由于涉及大量文章内容的审核,我们设计了这个方案。
该方案并不违规,更不是广告。
一、系统设计理念
1.1 设计目标
┌─────────────────────────────────────────┐
│         违禁词预审系统设计目标          │
├─────────────────────────────────────────┤
│ ❶ 实时检测:毫秒级响应                  │
│ ❷ 智能过滤:上下文感知判断              │
│ ❸ 多级处置:过滤/替换/标记/拦截         │
│ ❹ 高可扩展:支持动态词库更新            │
│ ❺ 人机协同:AI识别+人工复核             │
└─────────────────────────────────────────┘

1.2 核心原则
安全第一:确保信件内容符合监管要求
最小干预:最大程度保留原意,只做必要修改
透明可查:所有修改记录可追溯、可复核
持续学习:基于反馈持续优化识别能力
二、系统架构设计
2.1 整体架构
┌─────────────────────────────────────────────────────┐
│                     用户端界面                      │
└─────────────────┬───────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────┐
│                     API网关层                       │
│   ┌──────────────┐        ┌──────────────┐          │
│   │   负载均衡   │        │   限流熔断   │          │
│   └──────────────┘        └──────────────┘          │
└─────────────────┬───────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────┐
│                   内容预审服务层                    │
│   ┌──────┐   ┌──────┐   ┌──────┐   ┌──────┐         │
│   │ 快速 │   │ 深度 │   │ 情感 │   │ 语义 │         │
│   │ 过滤 │   │ 分析 │   │ 分析 │   │ 理解 │         │
│   └──────┘   └──────┘   └──────┘   └──────┘         │
└─────────────────┬───────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────┐
│                    数据处理层                       │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐     │
│  │ 违禁词库   │  │ 上下文规则 │  │ 模型推理   │     │
│  │ 管理       │  │ 引擎       │  │ 引擎       │     │
│  └────────────┘  └────────────┘  └────────────┘     │
└─────────────────┬───────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────┐
│                      存储层                         │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐     │
│  │ Redis缓存  │  │ MySQL主库  │  │ ES检索     │     │
│  └────────────┘  └────────────┘  └────────────┘     │
└─────────────────────────────────────────────────────┘

2.2 处理流程
三、核心技术实现
3.1 违禁词库管理系统
from typing import Dict, List, Set, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import json
import re
import ahocorasick
from collections import defaultdict
import datetime
import hashlib


class ProhibitedLevel(Enum):
    """Severity levels for prohibited words.

    Lower numeric value means higher severity (CRITICAL=1 is the most severe);
    comparisons elsewhere in this file rely on that ordering.
    """
    CRITICAL = 1  # critical: block the content outright
    HIGH = 2      # high: must be replaced
    MEDIUM = 3    # medium: replacement is suggested
    LOW = 4       # low: only mark the occurrence
    WARNING = 5   # warning: just remind the author


class MatchType(Enum):
    """Strategies used to match a prohibited word in text."""
    EXACT = "exact"        # exact substring match
    FUZZY = "fuzzy"        # fuzzy match (pinyin / abbreviation / split-character variants)
    SEMANTIC = "semantic"  # semantic-similarity match
    CONTEXT = "context"    # match only when certain context keywords are nearby


@dataclass
class ProhibitedWord:
    """Definition of a single prohibited word and its handling policy."""
    word: str                # the prohibited word itself
    level: ProhibitedLevel   # severity level
    categories: Set[str]     # categories this word belongs to
    match_type: MatchType    # matching strategy
    synonyms: List[str] = field(default_factory=list)    # synonyms matched the same way
    patterns: List[str] = field(default_factory=list)    # regex patterns (not used by the code shown here)
    replacement: Optional[str] = None                    # suggested replacement text
    contexts: List[str] = field(default_factory=list)    # context keywords that trigger a context match
    exceptions: List[str] = field(default_factory=list)  # phrases that suppress a match
    enabled: bool = True     # whether this entry is active
    created_at: datetime.datetime = field(default_factory=datetime.datetime.now)
    updated_at: datetime.datetime = field(default_factory=datetime.datetime.now)


class ProhibitedWordLibrary:
    """In-memory prohibited-word library with category/level indexes and an
    Aho-Corasick automaton for fast multi-pattern matching.

    Keys in ``self.words`` are the lowercased word text.
    """

    def __init__(self):
        self.words: Dict[str, ProhibitedWord] = {}
        self.word_tree = ahocorasick.Automaton()  # Aho-Corasick automaton over enabled words + synonyms
        self.category_index: Dict[str, Set[str]] = defaultdict(set)
        self.level_index: Dict[ProhibitedLevel, Set[str]] = defaultdict(set)
        self.enabled_words: Set[str] = set()
        # Load the built-in word list.
        self._load_builtin_library()

    def _load_builtin_library(self):
        """Load the built-in prohibited-word entries."""
        builtin_words = [
            ProhibitedWord(
                word="越狱",
                level=ProhibitedLevel.CRITICAL,
                categories={"逃跑", "违规"},
                match_type=MatchType.EXACT,
                replacement="",
                contexts=["计划", "策划", "企图"]
            ),
            ProhibitedWord(
                word="毒品",
                level=ProhibitedLevel.HIGH,
                categories={"违禁品", "非法"},
                match_type=MatchType.FUZZY,
                synonyms=["白粉", "海洛因", "冰毒"],
                replacement="违禁物品",
                exceptions=["戒毒", "远离毒品"]
            ),
            ProhibitedWord(
                word="自杀",
                level=ProhibitedLevel.HIGH,
                categories={"自残", "危险"},
                match_type=MatchType.SEMANTIC,
                synonyms=["自尽", "轻生", "寻短见"],
                replacement="不珍惜生命",
                contexts=["想要", "打算", "准备"]
            ),
            ProhibitedWord(
                word="打架",
                level=ProhibitedLevel.MEDIUM,
                categories={"暴力", "冲突"},
                match_type=MatchType.EXACT,
                replacement="发生冲突",
                exceptions=["不要打架", "反对打架"]
            ),
            ProhibitedWord(
                word="投诉",
                level=ProhibitedLevel.WARNING,
                categories={"管理", "意见"},
                match_type=MatchType.EXACT,
                replacement="反映情况"
            ),
        ]
        for word_obj in builtin_words:
            self.add_word(word_obj)

    def add_word(self, word_obj: ProhibitedWord):
        """Add a prohibited word and refresh indexes and the automaton.

        NOTE(review): pyahocorasick's ``Automaton.add_word`` is normally called
        as ``add_word(key, value)``; the matcher later expects ``iter()`` to
        yield the matched key as the value — confirm the stored payload.
        NOTE(review): ``make_automaton()`` is rebuilt on every single add, which
        is O(n^2) over a bulk load — acceptable for small libraries only.
        """
        word_key = word_obj.word.lower()
        self.words[word_key] = word_obj
        # Update the secondary indexes.
        for category in word_obj.categories:
            self.category_index[category].add(word_key)
        self.level_index[word_obj.level].add(word_key)
        if word_obj.enabled:
            self.enabled_words.add(word_key)
            # Add the word and its synonyms to the AC automaton.
            self.word_tree.add_word(word_key)
            for synonym in word_obj.synonyms:
                self.word_tree.add_word(synonym.lower())
        # (Re)build the AC automaton so new entries become searchable.
        self.word_tree.make_automaton()

    def remove_word(self, word: str):
        """Remove a prohibited word; no-op if the word is unknown."""
        word_key = word.lower()
        if word_key in self.words:
            word_obj = self.words[word_key]
            # Drop the word from every index.
            for category in word_obj.categories:
                self.category_index[category].discard(word_key)
            self.level_index[word_obj.level].discard(word_key)
            self.enabled_words.discard(word_key)
            # The AC automaton does not support removal, so rebuild it.
            del self.words[word_key]
            self._rebuild_automaton()

    def update_word(self, word: str, **kwargs):
        """Update attributes of an existing word (unknown attributes ignored)."""
        word_key = word.lower()
        if word_key in self.words:
            word_obj = self.words[word_key]
            # Apply only attributes that actually exist on the entry.
            for key, value in kwargs.items():
                if hasattr(word_obj, key):
                    setattr(word_obj, key, value)
            word_obj.updated_at = datetime.datetime.now()
            # Rebuild indexes/automaton to reflect the change.
            self._rebuild_automaton()

    def _rebuild_automaton(self):
        """Rebuild the AC automaton from the currently enabled words."""
        self.word_tree = ahocorasick.Automaton()
        for word_key in self.enabled_words:
            self.word_tree.add_word(word_key)
            word_obj = self.words[word_key]
            for synonym in word_obj.synonyms:
                self.word_tree.add_word(synonym.lower())
        self.word_tree.make_automaton()

    def search_words(self, category: Optional[str] = None, level: Optional[ProhibitedLevel] = None, enabled: Optional[bool] = None) -> List[ProhibitedWord]:
        """Return entries matching all provided filters (None = no filter)."""
        results = []
        for word_key, word_obj in self.words.items():
            if category and category not in word_obj.categories:
                continue
            if level and word_obj.level != level:
                continue
            if enabled is not None and word_obj.enabled != enabled:
                continue
            results.append(word_obj)
        return results

    def export_library(self, format: str = "json") -> str:
        """Serialize the whole library; only ``format="json"`` is supported.

        Raises ValueError for any other format.
        """
        data = {
            "version": "1.0.0",
            "export_time": datetime.datetime.now().isoformat(),
            "word_count": len(self.words),
            "words": []
        }
        for word_obj in self.words.values():
            word_data = {
                "word": word_obj.word,
                "level": word_obj.level.value,
                "categories": list(word_obj.categories),
                "match_type": word_obj.match_type.value,
                "synonyms": word_obj.synonyms,
                "replacement": word_obj.replacement,
                "enabled": word_obj.enabled,
                "created_at": word_obj.created_at.isoformat(),
                "updated_at": word_obj.updated_at.isoformat()
            }
            data["words"].append(word_data)
        if format == "json":
            return json.dumps(data, ensure_ascii=False, indent=2)
        else:
            raise ValueError(f"Unsupported format: {format}")

    def import_library(self, data: str, format: str = "json", merge: bool = True):
        """Import entries from an exported payload.

        With ``merge=False`` the current library is cleared first.
        NOTE(review): ``fromisoformat(word_data.get("created_at"))`` raises a
        TypeError if the key is missing (``.get`` returns None) — confirm the
        payload always carries both timestamps.
        """
        if format == "json":
            imported_data = json.loads(data)
            if not merge:
                self.words.clear()
                self.category_index.clear()
                self.level_index.clear()
                self.enabled_words.clear()
                self.word_tree = ahocorasick.Automaton()
            for word_data in imported_data.get("words", []):
                word_obj = ProhibitedWord(
                    word=word_data["word"],
                    level=ProhibitedLevel(word_data["level"]),
                    categories=set(word_data.get("categories", [])),
                    match_type=MatchType(word_data.get("match_type", "exact")),
                    synonyms=word_data.get("synonyms", []),
                    replacement=word_data.get("replacement"),
                    enabled=word_data.get("enabled", True),
                    created_at=datetime.datetime.fromisoformat(word_data.get("created_at")),
                    updated_at=datetime.datetime.fromisoformat(word_data.get("updated_at"))
                )
                self.add_word(word_obj)
        else:
            raise ValueError(f"Unsupported format: {format}")

3.2 智能过滤引擎
import jieba
import jieba.posseg as pseg
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
import re


class IntelligentFilterEngine:
    """Multi-strategy prohibited-content detector.

    Combines four passes over the input — AC-automaton exact matching, fuzzy
    matching, (simplified) semantic matching, and context-keyword analysis —
    then merges overlapping hits and produces a suggested rewrite.
    NOTE(review): ``Set`` in ``_load_stop_words``'s annotation relies on the
    ``from typing import ... Set`` of the previous section — confirm it is in
    scope at file level.
    """

    def __init__(self, word_library: ProhibitedWordLibrary):
        self.library = word_library
        self.stop_words = self._load_stop_words()
        # Semantic-similarity model (simplified; a real system would use BERT or similar).
        self.semantic_similarity_threshold = 0.7
        # Context window size, in tokens, around a semantic hit.
        self.context_window = 3
        # Initialize the jieba tokenizer eagerly.
        jieba.initialize()
        # Load the library words into jieba's custom dictionary.
        self._load_custom_dict()

    def _load_stop_words(self) -> Set[str]:
        """Return the built-in Chinese stop-word set (skipped by semantic matching)."""
        stop_words = {
            "的", "了", "在", "是", "我", "有", "和", "就", "不", "人",
            "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去",
            "你", "会", "着", "没有", "看", "好", "自己", "这", "那", "她",
            "他", "它"
        }
        return stop_words

    def _load_custom_dict(self):
        """Feed library words/synonyms to jieba so they tokenize as single words.

        NOTE(review): jieba has no ``load_userdict_from_fd``; the public API is
        ``jieba.load_userdict(f)`` with a path or file-like object — this call
        will raise AttributeError as written. Confirm and fix.
        """
        custom_words = []
        for word_obj in self.library.words.values():
            custom_words.append(f"{word_obj.word} 1000 n")
            for synonym in word_obj.synonyms:
                custom_words.append(f"{synonym} 1000 n")
        dict_content = "\n".join(custom_words)
        jieba.load_userdict_from_fd(dict_content.splitlines())

    def preprocess_text(self, text: str) -> Tuple[str, List[str], List[str]]:
        """Clean, tokenize, and POS-tag the input.

        Returns:
            (cleaned text, token list, "token/flag" POS strings)
        """
        # Clean the raw text first.
        cleaned_text = self._clean_text(text)
        # Tokenize.
        words = list(jieba.cut(cleaned_text))
        # POS tagging (runs a second segmentation pass).
        word_tags = []
        for word, flag in pseg.cut(cleaned_text):
            word_tags.append(f"{word}/{flag}")
        return cleaned_text, words, word_tags

    def _clean_text(self, text: str) -> str:
        """Strip markup/noise and normalize punctuation and whitespace."""
        # Remove HTML tags.
        text = re.sub(r'<[^>]+>', '', text)
        # Drop everything except CJK, ASCII letters/digits, and common punctuation.
        text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9,。!?、;:"\'()《》【】\s]', '', text)
        # Normalize full-width punctuation to ASCII equivalents.
        text = text.replace(',', ',').replace('。', '.').replace('!', '!').replace('?', '?')
        # Collapse runs of whitespace.
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def detect_prohibited_content(self, text: str) -> Dict[str, Any]:
        """Run all detection passes over ``text`` and return a result dict with
        keys: original_text, cleaned_text, violations, risk_level,
        suggested_text, needs_review.

        NOTE(review): ``highest_level`` starts at LOW, so WARNING-only hits can
        never raise it and the risk level reports "low" — confirm intended.
        """
        # Preprocess once; all passes work on the cleaned text.
        cleaned_text, words, word_tags = self.preprocess_text(text)
        text_lower = cleaned_text.lower()
        results = {
            "original_text": text,
            "cleaned_text": cleaned_text,
            "violations": [],
            "risk_level": "safe",
            "suggested_text": cleaned_text,
            "needs_review": False
        }
        # 1. Fast AC-automaton matching.
        fast_matches = self._fast_ac_match(text_lower)
        # 2. Fuzzy matching.
        fuzzy_matches = self._fuzzy_match(text_lower, words)
        # 3. Semantic matching.
        semantic_matches = self._semantic_match(cleaned_text, words)
        # 4. Context analysis.
        context_matches = self._context_analysis(cleaned_text, words, word_tags)
        # Merge all overlapping matches into a flat list.
        all_matches = self._merge_matches(fast_matches + fuzzy_matches + semantic_matches + context_matches)
        if not all_matches:
            return results
        # Build violation records and track the most severe level seen.
        violations = []
        highest_level = ProhibitedLevel.LOW
        for match in all_matches:
            violation = {
                "matched_text": match["text"],
                "matched_type": match["type"].value,
                "prohibited_word": match["word_obj"].word,
                "level": match["word_obj"].level.value,
                "categories": list(match["word_obj"].categories),
                "position": match["position"],
                "context": match.get("context", ""),
                "suggested_replacement": match["word_obj"].replacement or "***",
                "confidence": match.get("confidence", 1.0)
            }
            violations.append(violation)
            # Lower enum value = more severe.
            if match["word_obj"].level.value < highest_level.value:
                highest_level = match["word_obj"].level
        results["violations"] = violations
        results["risk_level"] = self._get_risk_level(highest_level)
        results["needs_review"] = highest_level.value <= ProhibitedLevel.MEDIUM.value
        # Produce the rewritten text suggestion.
        results["suggested_text"] = self._generate_suggested_text(cleaned_text, violations)
        return results

    def _fast_ac_match(self, text: str) -> List[Dict]:
        """Exact matching via the library's Aho-Corasick automaton.

        NOTE(review): expects ``word_tree.iter(text)`` to yield
        ``(end_index, word_key)`` — this depends on what value was stored in
        ``add_word``; verify against the library's automaton setup.
        """
        matches = []
        for end_index, word_key in self.library.word_tree.iter(text):
            start_index = end_index - len(word_key) + 1
            matched_text = text[start_index:end_index + 1]
            # Resolve the library entry for this key.
            word_obj = self.library.words.get(word_key)
            if not word_obj:
                # Probably a synonym; find the canonical entry (linear scan).
                for obj in self.library.words.values():
                    if word_key in [w.lower() for w in obj.synonyms]:
                        word_obj = obj
                        break
            if word_obj:
                # Suppress the hit when an exception phrase covers it.
                if not self._is_exception(text, start_index, end_index, word_obj):
                    match = {
                        "text": matched_text,
                        "type": MatchType.EXACT,
                        "word_obj": word_obj,
                        "position": (start_index, end_index + 1),
                        "confidence": 1.0
                    }
                    matches.append(match)
        return matches

    def _fuzzy_match(self, text: str, words: List[str]) -> List[Dict]:
        """Fuzzy matching: pinyin variants, abbreviations, split characters."""
        matches = []
        # Check every enabled FUZZY-type entry against the text.
        for word_obj in self.library.search_words(enabled=True):
            if word_obj.match_type != MatchType.FUZZY:
                continue
            # Pinyin variants.
            pinyin_matches = self._match_pinyin_variants(text, word_obj)
            matches.extend(pinyin_matches)
            # Abbreviations.
            abbreviation_matches = self._match_abbreviations(text, word_obj)
            matches.extend(abbreviation_matches)
            # Split-character variants.
            split_matches = self._match_split_words(text, word_obj, words)
            matches.extend(split_matches)
        return matches

    def _semantic_match(self, text: str, words: List[str]) -> List[Dict]:
        """Simplified semantic matching (a real system would use BERT embeddings)."""
        matches = []
        for word_obj in self.library.search_words(enabled=True):
            if word_obj.match_type != MatchType.SEMANTIC:
                continue
            # Compare every non-stop-word token against the entry.
            for i, word in enumerate(words):
                if word in self.stop_words:
                    continue
                similarity = self._calculate_semantic_similarity(word, word_obj.word)
                if similarity >= self.semantic_similarity_threshold:
                    # Collect a token window around the hit for reviewers.
                    context_start = max(0, i - self.context_window)
                    context_end = min(len(words), i + self.context_window + 1)
                    context = " ".join(words[context_start:context_end])
                    match = {
                        "text": word,
                        "type": MatchType.SEMANTIC,
                        "word_obj": word_obj,
                        "position": self._find_word_position(text, word, i),
                        "context": context,
                        "confidence": similarity
                    }
                    # Suppress when covered by an exception phrase.
                    if not self._is_exception(text, match["position"][0], match["position"][1] - 1, word_obj):
                        matches.append(match)
        return matches

    def _context_analysis(self, text: str, words: List[str], word_tags: List[str]) -> List[Dict]:
        """Flag entries whose context keywords appear within ~50 chars of the word."""
        matches = []
        for word_obj in self.library.search_words(enabled=True):
            if not word_obj.contexts:
                continue
            # Look for each context keyword; only the first occurrence is used.
            for context_word in word_obj.contexts:
                if context_word in text:
                    # Search a +/-50 char region around the context keyword.
                    context_pos = text.find(context_word)
                    search_start = max(0, context_pos - 50)
                    search_end = min(len(text), context_pos + len(context_word) + 50)
                    context_region = text[search_start:search_end]
                    # Does the prohibited word itself occur in that region?
                    if word_obj.word in context_region:
                        word_pos = context_region.find(word_obj.word)
                        actual_pos = search_start + word_pos
                        match = {
                            "text": word_obj.word,
                            "type": MatchType.CONTEXT,
                            "word_obj": word_obj,
                            "position": (actual_pos, actual_pos + len(word_obj.word)),
                            "context": context_region,
                            "confidence": 0.9
                        }
                        if not self._is_exception(text, match["position"][0], match["position"][1] - 1, word_obj):
                            matches.append(match)
        return matches

    def _is_exception(self, text: str, start: int, end: int, word_obj: ProhibitedWord) -> bool:
        """Return True when the hit at [start, end] (end inclusive) should be suppressed.

        Two checks: an exception phrase that covers the span, or a negation
        pattern ("不要/反对/禁止/远离/抵制 ...") in a small window around it.
        NOTE(review): the 5-char context window may be too short for the
        negation regexes to ever match the full word — confirm window size.
        """
        # Check the entry's explicit exception phrases.
        for exception in word_obj.exceptions:
            if exception in text:
                # Only suppress when the exception phrase covers the hit span.
                exception_pos = text.find(exception)
                if exception_pos <= start and exception_pos + len(exception) >= end:
                    return True
        # Check for a negating context immediately around the hit.
        context_start = max(0, start - 5)
        context_end = min(len(text), end + 5)
        context = text[context_start:context_end]
        negative_patterns = [
            r"不(要|会|能|可以|应该|可能).{0,3}" + re.escape(word_obj.word),
            r"反对.{0,3}" + re.escape(word_obj.word),
            r"禁止.{0,3}" + re.escape(word_obj.word),
            r"远离.{0,3}" + re.escape(word_obj.word),
            r"抵制.{0,3}" + re.escape(word_obj.word)
        ]
        for pattern in negative_patterns:
            if re.search(pattern, context):
                return True
        return False

    def _merge_matches(self, matches: List[Dict]) -> List[Dict]:
        """Merge overlapping matches, preferring higher confidence / longer spans.

        NOTE(review): this mutates the winning match's "position" in place and,
        after ``current = match``, the extension uses the new match's start —
        an earlier, lower-confidence start can be dropped. Confirm intended.
        """
        if not matches:
            return []
        # Sort by span start so overlaps are adjacent.
        sorted_matches = sorted(matches, key=lambda x: x["position"][0])
        merged = []
        current = sorted_matches[0]
        for match in sorted_matches[1:]:
            # Overlap with the span currently being merged?
            if match["position"][0] <= current["position"][1]:
                # Keep the higher-confidence or longer match.
                if (match.get("confidence", 0) > current.get("confidence", 0) or
                        (match["position"][1] - match["position"][0]) > (current["position"][1] - current["position"][0])):
                    current = match
                # Extend the merged span's end.
                current["position"] = (current["position"][0], max(current["position"][1], match["position"][1]))
            else:
                # No overlap: emit the current span and start a new one.
                merged.append(current)
                current = match
        merged.append(current)
        return merged

    def _get_risk_level(self, highest_prohibited_level: ProhibitedLevel) -> str:
        """Map the most severe matched level to a coarse risk label."""
        if highest_prohibited_level == ProhibitedLevel.CRITICAL:
            return "critical"
        elif highest_prohibited_level == ProhibitedLevel.HIGH:
            return "high"
        elif highest_prohibited_level == ProhibitedLevel.MEDIUM:
            return "medium"
        elif highest_prohibited_level == ProhibitedLevel.LOW:
            return "low"
        else:
            return "safe"

    def _generate_suggested_text(self, text: str, violations: List[Dict]) -> str:
        """Apply replacements/markers to ``text`` (the cleaned text — positions
        refer to it, not the original input)."""
        if not violations:
            return text
        # Process right-to-left so earlier spans' indices stay valid.
        sorted_violations = sorted(violations, key=lambda x: x["position"][0], reverse=True)
        suggested_text = text
        for violation in sorted_violations:
            start, end = violation["position"]
            replacement = violation["suggested_replacement"]
            # Strategy depends on severity (violation["level"] is the enum's int value).
            level = violation["level"]
            if level <= ProhibitedLevel.HIGH.value:
                # Critical/high: replace outright.
                suggested_text = suggested_text[:start] + replacement + suggested_text[end:]
            elif level == ProhibitedLevel.MEDIUM.value:
                # Medium: replace and bracket to flag the edit.
                marked_replacement = f"[{replacement}]"
                suggested_text = suggested_text[:start] + marked_replacement + suggested_text[end:]
            else:
                # Low/warning: keep the text but parenthesize it.
                marked_text = f"({suggested_text[start:end]})"
                suggested_text = suggested_text[:start] + marked_text + suggested_text[end:]
        return suggested_text

    # The helpers below are simplified placeholder implementations.
    def _match_pinyin_variants(self, text: str, word_obj: ProhibitedWord) -> List[Dict]:
        """Match pinyin variants (stub; a real implementation needs a pinyin library)."""
        return []

    def _match_abbreviations(self, text: str, word_obj: ProhibitedWord) -> List[Dict]:
        """Match abbreviations (stub)."""
        return []

    def _match_split_words(self, text: str, word_obj: ProhibitedWord, words: List[str]) -> List[Dict]:
        """Match split-character variants (stub)."""
        return []

    def _calculate_semantic_similarity(self, word1: str, word2: str) -> float:
        """Character-overlap Jaccard similarity (stand-in for word vectors)."""
        if word1 == word2:
            return 1.0
        # Simple similarity based on character-set overlap.
        set1 = set(word1)
        set2 = set(word2)
        intersection = len(set1 & set2)
        union = len(set1 | set2)
        return intersection / union if union > 0 else 0

    def _find_word_position(self, text: str, word: str, word_index: int) -> Tuple[int, int]:
        """Locate ``word`` in ``text`` (first occurrence only; ``word_index`` unused)."""
        if word in text:
            pos = text.find(word)
            return (pos, pos + len(word))
        return (0, 0)

3.3 内容预审API服务
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import uvicorn
import asyncio
import redis
import pickle
from datetime import datetime, timedelta

app = FastAPI(title="微爱帮内容预审API", version="1.0.0")

# Redis cache client (binary-safe: cached values are pickled).
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=False)

# Module-level singletons shared by all requests.
word_library = ProhibitedWordLibrary()
filter_engine = IntelligentFilterEngine(word_library)


class ContentReviewRequest(BaseModel):
    """Request payload for a content review."""
    content: str = Field(..., description="待审核内容", min_length=1, max_length=10000)
    user_id: str = Field(..., description="用户ID")
    letter_id: Optional[str] = Field(None, description="信件ID")
    check_level: str = Field("normal", description="审核级别:strict/normal/lenient")
    auto_replace: bool = Field(True, description="是否自动替换违禁词")


class ContentReviewResponse(BaseModel):
    """Response payload for a content review."""
    request_id: str
    status: str
    risk_level: str
    is_passed: bool
    violations_count: int
    violations: List[Dict]
    original_content: str
    suggested_content: str
    needs_human_review: bool
    review_notes: Optional[str] = None
    processing_time_ms: int
    timestamp: str


class ReviewRecord(BaseModel):
    """Persisted review record (not used by the endpoints shown here)."""
    record_id: str
    user_id: str
    letter_id: Optional[str]
    original_content: str
    reviewed_content: str
    violations: List[Dict]
    risk_level: str
    reviewer_id: Optional[str] = None
    review_status: str
    review_notes: Optional[str] = None
    created_at: str
    reviewed_at: Optional[str] = None


def get_cache_key(content: str, check_level: str) -> str:
    """Build the Redis cache key from the content hash and check level."""
    content_hash = hashlib.md5(content.encode()).hexdigest()
    return f"content_review:{content_hash}:{check_level}"


@app.post("/api/v1/content/review", response_model=ContentReviewResponse)
async def review_content(
    request: ContentReviewRequest,
    background_tasks: BackgroundTasks
):
    """Review one piece of content; results are cached in Redis for 5 minutes.

    NOTE(review): ``(datetime.now() - start_time).microseconds // 1000`` only
    reads the sub-second component — use ``total_seconds() * 1000``.
    NOTE(review): ``hash(request.content)`` is salted per process, so
    request_id is not reproducible across restarts.
    NOTE(review): cached values are deserialized with ``pickle.loads`` — safe
    only while the Redis instance is fully trusted.
    """
    start_time = datetime.now()
    # Serve from cache when an identical (content, level) was reviewed recently.
    cache_key = get_cache_key(request.content, request.check_level)
    cached_result = redis_client.get(cache_key)
    if cached_result:
        result = pickle.loads(cached_result)
        result["processing_time_ms"] = (datetime.now() - start_time).microseconds // 1000
        return ContentReviewResponse(**result)
    try:
        # Run the detection pipeline.
        detection_result = filter_engine.detect_prohibited_content(request.content)
        # Adjust the violation set by check level.
        if request.check_level == "lenient":
            # Lenient mode: keep only critical/high findings.
            detection_result["violations"] = [
                v for v in detection_result["violations"]
                if v["level"] <= ProhibitedLevel.HIGH.value
            ]
        elif request.check_level == "strict":
            # Strict mode: keep every finding.
            pass
        # Pass when nothing was found or only low-risk findings remain.
        is_passed = len(detection_result["violations"]) == 0 or detection_result["risk_level"] == "low"
        # Escalate to a human only for non-passing, review-worthy content.
        needs_human_review = detection_result["needs_review"] and not is_passed
        # Assemble the response payload.
        response_data = {
            "request_id": f"req_{int(start_time.timestamp())}_{hash(request.content) % 10000}",
            "status": "completed",
            "risk_level": detection_result["risk_level"],
            "is_passed": is_passed,
            "violations_count": len(detection_result["violations"]),
            "violations": detection_result["violations"],
            "original_content": request.content,
            "suggested_content": detection_result["suggested_text"],
            "needs_human_review": needs_human_review,
            "processing_time_ms": (datetime.now() - start_time).microseconds // 1000,
            "timestamp": start_time.isoformat()
        }
        # Summarize the findings for the reviewer.
        if detection_result["violations"]:
            violation_types = set(v["matched_type"] for v in detection_result["violations"])
            response_data["review_notes"] = f"发现{len(detection_result['violations'])}处违禁内容,类型:{', '.join(violation_types)}"
        response = ContentReviewResponse(**response_data)
        # Cache the result for 5 minutes.
        redis_client.setex(
            cache_key,
            timedelta(minutes=5),
            pickle.dumps(response_data)
        )
        # Write the audit log after the response is sent.
        background_tasks.add_task(
            record_review_log,
            request.user_id,
            request.letter_id,
            request.content,
            detection_result
        )
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"内容审核失败: {str(e)}")


async def record_review_log(user_id: str, letter_id: Optional[str],
                            content: str, detection_result: Dict):
    """Record an audit-log entry for a completed review (stores a content hash,
    not the content itself)."""
    log_entry = {
        "user_id": user_id,
        "letter_id": letter_id,
        "original_content_hash": hashlib.md5(content.encode()).hexdigest(),
        "violations_count": len(detection_result["violations"]),
        "risk_level": detection_result["risk_level"],
        "violation_types": list(set(v["matched_type"] for v in detection_result["violations"])),
        "timestamp": datetime.now().isoformat()
    }
    # A real implementation would persist this to a database.
    # Simplified: print the log entry.
    print(f"Review log: {json.dumps(log_entry, ensure_ascii=False)}")


@app.post("/api/v1/content/batch-review")
async def batch_review_content(requests: List[ContentReviewRequest]):
    """Review multiple contents concurrently.

    NOTE(review): each call gets a manually constructed ``BackgroundTasks()``
    that FastAPI never executes, so batch reviews produce no audit logs —
    confirm whether that is acceptable.
    """
    tasks = []
    for request in requests:
        tasks.append(review_content(request, BackgroundTasks()))
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {
        "total_count": len(requests),
        "success_count": sum(1 for r in results if not isinstance(r, Exception)),
        "failed_count": sum(1 for r in results if isinstance(r, Exception)),
        "results": results
    }


@app.get("/api/v1/content/stats")
async def get_review_statistics(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None
):
    """Return review statistics (mock data; a real implementation queries the DB;
    the date parameters are currently ignored)."""
    return {
        "total_reviews": 1000,
        "passed_count": 850,
        "blocked_count": 150,
        "human_review_count": 50,
        "average_processing_time_ms": 120,
        "common_violations": [
            {"word": "打架", "count": 45},
            {"word": "投诉", "count": 32},
            {"word": "违禁品", "count": 28}
        ]
    }


@app.post("/api/v1/word-library/update")
async def update_word_library(
    words: List[Dict],
    operation: str = "add"  # one of: add / update / remove
):
    """Bulk add/update/remove entries in the shared word library.

    NOTE(review): this endpoint has no authentication in the code shown —
    confirm it is protected at the gateway.
    """
    try:
        for word_data in words:
            if operation == "add":
                word_obj = ProhibitedWord(
                    word=word_data["word"],
                    level=ProhibitedLevel(word_data["level"]),
                    categories=set(word_data.get("categories", [])),
                    match_type=MatchType(word_data.get("match_type", "exact")),
                    synonyms=word_data.get("synonyms", []),
                    replacement=word_data.get("replacement")
                )
                word_library.add_word(word_obj)
            elif operation == "update":
                word_library.update_word(
                    word_data["word"],
                    **{k: v for k, v in word_data.items() if k != "word"}
                )
            elif operation == "remove":
                word_library.remove_word(word_data["word"])
        return {"status": "success", "updated_count": len(words)}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"更新词库失败: {str(e)}")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

3.4 前端集成示例
// content-review.js
// Client-side wrapper around the content pre-review API: queues review
// requests, dispatches lifecycle events, and renders suggestion/warning
// dialogs in the page.
class WeiAiBangContentReview {
  constructor(options = {}) {
    // Defaults, overridable via the options argument.
    this.options = {
      apiEndpoint: 'https://review.weiaibang.com/api/v1/content',
      checkLevel: 'normal',
      autoReplace: true,
      showSuggestions: true,
      realtimeCheck: false,
      ...options
    };
    this.reviewQueue = [];        // pending review tasks (FIFO)
    this.isProcessing = false;    // guards against concurrent queue drains
    this.eventListeners = new Map();
  }

  /**
   * Check content: enqueue a review task, start draining the queue if
   * idle, and return the generated request id immediately.
   */
  async checkContent(content, userInfo = {}) {
    const requestId = `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
    // Enqueue the task
    this.reviewQueue.push({
      requestId,
      content,
      userInfo,
      timestamp: Date.now()
    });
    // Drain the queue unless a drain is already in flight
    if (!this.isProcessing) {
      this.isProcessing = true;
      await this.processQueue();
    }
    return requestId;
  }

  // Sequentially process queued tasks, emitting events per outcome.
  async processQueue() {
    while (this.reviewQueue.length > 0) {
      const task = this.reviewQueue.shift();
      try {
        const result = await this.sendReviewRequest(
          task.content,
          task.userInfo
        );
        // Notify listeners that the round-trip completed
        this.dispatchEvent('reviewcomplete', {
          requestId: task.requestId,
          result
        });
        // Route by verdict
        if (result.is_passed) {
          this.dispatchEvent('contentapproved', {
            requestId: task.requestId,
            content: task.content
          });
        } else {
          this.handleViolations(task.requestId, task.content, result);
        }
      } catch (error) {
        this.dispatchEvent('reviewerror', {
          requestId: task.requestId,
          error: error.message
        });
      }
    }
    this.isProcessing = false;
  }

  // POST a single review request; throws on any non-2xx response.
  async sendReviewRequest(content, userInfo) {
    const requestBody = {
      content: content,
      user_id: userInfo.userId || this.getUserId(),
      check_level: this.options.checkLevel,
      auto_replace: this.options.autoReplace
    };
    const response = await fetch(`${this.options.apiEndpoint}/review`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
      },
      body: JSON.stringify(requestBody)
    });
    if (!response.ok) {
      throw new Error(`审核请求失败: ${response.status}`);
    }
    return await response.json();
  }

  // Route violation handling by risk level / human-review flag.
  handleViolations(requestId, originalContent, reviewResult) {
    const violations = reviewResult.violations;
    const suggestedContent = reviewResult.suggested_content;
    // Emit the raw violation event first
    this.dispatchEvent('violationsfound', {
      requestId,
      violations,
      originalContent,
      suggestedContent
    });
    // Then route by risk level
    if (reviewResult.risk_level === 'critical') {
      // Critical content: hard block
      this.dispatchEvent('contentblocked', {
        requestId,
        reason: '包含高危违禁内容',
        violations
      });
    } else if (reviewResult.risk_level === 'high') {
      // High risk: offer the suggested replacement
      this.showReplacementDialog(
        originalContent,
        suggestedContent,
        violations
      );
    } else if (reviewResult.needs_human_review) {
      // Needs human review
      this.dispatchEvent('needshumanreview', {
        requestId,
        originalContent,
        violations
      });
    } else {
      // Low risk: warn only
      this.showWarningDialog(violations);
    }
  }

  // Modal offering the suggested rewrite; the buttons emit events.
  showReplacementDialog(originalContent, suggestedContent, violations) {
    // Build the replacement dialog
    const dialog = document.createElement('div');
    dialog.className = 'weiaibang-review-dialog';
    // NOTE(review): matched_text / originalContent / suggestedContent are
    // interpolated without HTML-escaping -- potential XSS if any of them
    // can carry markup; confirm upstream sanitization.
    dialog.innerHTML = `
      <div class="review-dialog-content">
        <h3>内容优化建议</h3>
        <p>系统检测到${violations.length}处需要修改的内容:</p>
        <div class="violation-list">
          ${violations.map(v => `
            <div class="violation-item">
              <span class="violation-text">"${v.matched_text}"</span>
              <span class="violation-level level-${v.level}">${this.getLevelText(v.level)}</span>
              <span class="violation-suggestion">建议改为:"${v.suggested_replacement}"</span>
            </div>
          `).join('')}
        </div>
        <div class="original-content">
          <h4>原文:</h4>
          <pre>${originalContent}</pre>
        </div>
        <div class="suggested-content">
          <h4>建议修改为:</h4>
          <pre>${suggestedContent}</pre>
        </div>
        <div class="dialog-actions">
          <button class="btn-accept">接受建议</button>
          <button class="btn-edit">手动修改</button>
          <button class="btn-cancel">取消发送</button>
        </div>
      </div>
    `;
    document.body.appendChild(dialog);
    // Wire up the action buttons
    dialog.querySelector('.btn-accept').addEventListener('click', () => {
      this.dispatchEvent('contentreplaced', {
        originalContent,
        newContent: suggestedContent,
        violations
      });
      dialog.remove();
    });
    dialog.querySelector('.btn-edit').addEventListener('click', () => {
      this.showEditor(originalContent, violations);
      dialog.remove();
    });
    dialog.querySelector('.btn-cancel').addEventListener('click', () => {
      this.dispatchEvent('reviewcancelled', {
        originalContent
      });
      dialog.remove();
    });
  }

  // Non-blocking notice listing low-risk issues.
  showWarningDialog(violations) {
    // Build the warning dialog
    const warning = document.createElement('div');
    warning.className = 'weiaibang-warning-dialog';
    warning.innerHTML = `
      <div class="warning-content">
        <h3>请注意用语规范</h3>
        <p>检测到${violations.length}处需要注意的内容:</p>
        <ul>
          ${violations.map(v => `<li>"${v.matched_text}" - ${v.suggested_replacement || '建议修改'}</li>`).join('')}
        </ul>
        <button class="btn-continue">继续发送</button>
      </div>
    `;
    document.body.appendChild(warning);
    warning.querySelector('.btn-continue').addEventListener('click', () => {
      warning.remove();
    });
  }

  // Placeholder for an inline editing UI (intentionally a stub).
  showEditor(originalContent, violations) {
    // Build the editor surface
    const editor = document.createElement('div');
    editor.className = 'weiaibang-content-editor';
    // Editor UI construction
    // Simplified implementation
  }

  // Map a numeric violation level to its display label.
  getLevelText(level) {
    const levelMap = {
      1: '高危',
      2: '高风险',
      3: '中风险',
      4: '低风险',
      5: '注意'
    };
    return levelMap[level] || '注意';
  }

  // Lazily create and persist an anonymous user id in localStorage.
  getUserId() {
    let userId = localStorage.getItem('weiaibang_user_id');
    if (!userId) {
      userId = 'user_' + Math.random().toString(36).substr(2, 9);
      localStorage.setItem('weiaibang_user_id', userId);
    }
    return userId;
  }

  /**
   * Register an event listener for a named lifecycle event.
   */
  on(eventName, callback) {
    if (!this.eventListeners.has(eventName)) {
      this.eventListeners.set(eventName, []);
    }
    this.eventListeners.get(eventName).push(callback);
  }

  // Invoke listeners; one failing callback does not stop the others.
  dispatchEvent(eventName, data) {
    const listeners = this.eventListeners.get(eventName) || [];
    listeners.forEach(callback => {
      try {
        callback(data);
      } catch (error) {
        console.error(`Error in event listener for ${eventName}:`, error);
      }
    });
  }

  /**
   * Realtime checking for an input box: debounce input events and
   * submit the current value for review after 1s of inactivity.
   */
  enableRealtimeCheck(textareaSelector) {
    if (!this.options.realtimeCheck) return;
    const textarea = document.querySelector(textareaSelector);
    if (!textarea) return;
    let checkTimer;
    const checkInterval = 1000; // 1 second debounce
    textarea.addEventListener('input', () => {
      clearTimeout(checkTimer);
      checkTimer = setTimeout(() => {
        const content = textarea.value;
        if (content.trim().length > 0) {
          this.checkContent(content);
        }
      }, checkInterval);
    });
  }
}

// CSS styles for the dialogs
const reviewStyles = `
  .weiaibang-review-dialog {
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 100%;
    background: rgba(0, 0, 0, 0.5);
    display: flex;
    align-items: center;
    justify-content: center;
    z-index: 10000;
  }
  .review-dialog-content {
    background: white;
    border-radius: 8px;
    padding: 24px;
    max-width: 600px;
    max-height: 80vh;
    overflow-y: auto;
    box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
  }
  .violation-list {
    margin: 16px 0;
    border: 1px solid #eee;
    border-radius: 4px;
    padding: 12px;
  }
  .violation-item {
    padding: 8px;
    border-bottom: 1px solid #f5f5f5;
  }
  .violation-item:last-child {
    border-bottom: none;
  }
  .violation-text {
    font-weight: bold;
    color: #e74c3c;
  }
  .violation-level {
    display: inline-block;
    padding: 2px 8px;
    border-radius: 12px;
    font-size: 12px;
    margin-left: 8px;
  }
  .level-1 { background: #ff4757; color: white; }
  .level-2 { background: #ff6b81; color: white; }
  .level-3 { background: #ffa502; color: white; }
  .level-4 { background: #2ed573; color: white; }
  .level-5 { background: #70a1ff; color: white; }
  .dialog-actions {
    margin-top: 20px;
    text-align: right;
  }
  .dialog-actions button {
    padding: 8px 16px;
    margin-left: 8px;
    border: none;
    border-radius: 4px;
    cursor: pointer;
  }
  .btn-accept { background: #2ed573; color: white; }
  .btn-edit { background: #70a1ff; color: white; }
  .btn-cancel { background: #ff6b81; color: white; }
`;

// Inject the styles into the page once at load
const styleElement = document.createElement('style');
styleElement.textContent = reviewStyles;
document.head.appendChild(styleElement);

// Usage example
document.addEventListener('DOMContentLoaded', () => {
  const contentReview = new WeiAiBangContentReview({
    checkLevel: 'normal',
    realtimeCheck: true,
    autoReplace: true
  });

  // Subscribe to review lifecycle events
  contentReview.on('reviewcomplete', (data) => {
    console.log('审核完成:', data.requestId);
  });

  contentReview.on('violationsfound', (data) => {
    console.log('发现违禁内容:', data.violations);
  });

  contentReview.on('contentapproved', (data) => {
    console.log('内容通过审核,可以发送');
    // Send logic can run here
  });

  contentReview.on('contentblocked', (data) => {
    alert('内容包含高危违禁词,无法发送');
  });

  // Realtime checking on the letter input box
  contentReview.enableRealtimeCheck('#letterContent');

  // Pre-submit check
  document.getElementById('sendButton').addEventListener('click', async (e) => {
    e.preventDefault();
    const content = document.getElementById('letterContent').value;
    if (!content.trim()) {
      alert('请输入信件内容');
      return;
    }
    // Kick off the review
    const requestId = await contentReview.checkContent(content, {
      userId: 'user_123'
    });
    console.log('审核请求已提交:', requestId);
  });
});

// 四、部署与运维
4.1 Docker部署配置
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install build toolchain (gcc/g++, presumably needed to compile native
# wheels such as pyahocorasick -- TODO confirm), then trim the apt cache
# to keep the image small.
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies first so this layer is cached independently
# of source-code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create a non-root user and drop privileges
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose the service port
EXPOSE 8000

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# 4.2 监控与告警配置
# prometheus.yml -- scrape targets for the review service
scrape_configs:
  - job_name: 'weiaibang_review_service'
    static_configs:
      - targets: ['review-service:8000']
    metrics_path: '/metrics'

  - job_name: 'review_api'
    static_configs:
      - targets: ['review-service:8000']
    metrics_path: '/api/v1/content/metrics'

# alerting.yml -- alert rules for block rate, latency and liveness
groups:
  - name: content_review_alerts
    rules:
      # Block rate above 10% sustained for 5 minutes
      - alert: HighBlockRate
        expr: rate(content_review_blocked_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "内容拦截率过高"
          description: "过去5分钟内内容拦截率超过10%"

      # p90 processing latency above 1 second for 10 minutes
      - alert: SlowProcessing
        expr: content_review_processing_time_seconds{quantile="0.9"} > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "内容审核处理缓慢"
          description: "90%的审核请求处理时间超过1秒"

      # Scrape target down for 1 minute
      - alert: ServiceDown
        expr: up{job="weiaibang_review_service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "审核服务不可用"
          description: "内容审核服务已下线超过1分钟"

# 五、性能优化策略
5.1 缓存策略
# cache_optimizer.py
class ReviewCacheOptimizer:
    """Two-tier cache for review results keyed by content hash.

    Lookup order: in-process dict first, shared Redis second. The local
    tier is bounded (the original grew without limit, which leaks memory
    in a long-lived service); eviction drops the oldest insertion, since
    dicts preserve insertion order.
    """

    # Upper bound for the in-process cache tier.
    LOCAL_CACHE_MAX = 10000

    def __init__(self):
        # NOTE(review): connection parameters are hard-coded; consider
        # promoting host/port to constructor arguments. `redis` must be
        # imported by the enclosing module.
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=False  # raw bytes: values are pickled blobs
        )
        self.local_cache: Dict[str, Dict] = {}
        self.cache_hits = 0
        self.cache_misses = 0

    async def get_cached_result(self, content_hash: str) -> Optional[Dict]:
        """Look up a cached review result (local memory, then Redis).

        Returns the cached dict, or None on a complete miss. Updates the
        hit/miss counters either way.
        """
        # 1. In-process cache
        if content_hash in self.local_cache:
            self.cache_hits += 1
            return self.local_cache[content_hash]

        # 2. Redis cache
        # NOTE(review): redis-py calls are blocking even though this method
        # is async -- they will stall the event loop under load; consider
        # redis.asyncio or run_in_executor.
        redis_key = f"review:cache:{content_hash}"
        cached_data = self.redis_client.get(redis_key)
        if cached_data:
            # SECURITY: pickle.loads must only ever see data written by
            # set_cache_result below -- never untrusted bytes.
            result = pickle.loads(cached_data)
            self._store_locally(content_hash, result)
            self.cache_hits += 1
            return result

        self.cache_misses += 1
        return None

    async def set_cache_result(self, content_hash: str, result: Dict,
                               ttl: int = 300):
        """Cache a review result locally and in Redis (ttl in seconds)."""
        self._store_locally(content_hash, result)
        redis_key = f"review:cache:{content_hash}"
        self.redis_client.setex(
            redis_key,
            ttl,
            pickle.dumps(result)
        )

    def _store_locally(self, content_hash: str, result: Dict) -> None:
        """Insert into the in-process tier, evicting the oldest entries
        once LOCAL_CACHE_MAX is exceeded."""
        # Re-inserting refreshes the entry's position in insertion order.
        self.local_cache.pop(content_hash, None)
        self.local_cache[content_hash] = result
        while len(self.local_cache) > self.LOCAL_CACHE_MAX:
            self.local_cache.pop(next(iter(self.local_cache)))

    def get_cache_stats(self) -> Dict:
        """Return hit/miss counters, hit rate, and local tier size."""
        total = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total if total > 0 else 0
        return {
            "hits": self.cache_hits,
            "misses": self.cache_misses,
            "hit_rate": f"{hit_rate:.2%}",
            "local_cache_size": len(self.local_cache)
        }


# 5.2 异步处理优化
# async_processor.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
from queue import PriorityQueue
from typing import Dict, List
import time


class AsyncReviewProcessor:
    """Asynchronous review processor.

    Splits an incoming list of contents into batches and runs the
    CPU-bound detection for each batch on a thread pool, gathering the
    per-batch results back in input order.
    """

    def __init__(self, max_workers: int = 10):
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        self.priority_queue = PriorityQueue()  # reserved for prioritized scheduling
        self.processing_tasks = {}
        self.max_batch_size = 50

    async def process_batch_async(self, contents: List[str]) -> List[Dict]:
        """Review all contents in parallel batches.

        Returns exactly one result dict per input, in input order. A
        failed batch contributes one {"error": ...} entry per item.
        """
        # Guard: the original computed batch_size = min(0, 50) = 0 for an
        # empty input, making range() raise ValueError (zero step).
        if not contents:
            return []

        batch_size = min(len(contents), self.max_batch_size)
        # Materialize the batches so error padding can use each batch's
        # real length: the original padded with the global batch_size,
        # misaligning output whenever the final batch was short.
        batches = [contents[i:i + batch_size]
                   for i in range(0, len(contents), batch_size)]
        tasks = [asyncio.create_task(self._process_batch(batch))
                 for batch in batches]

        # Keep exceptions instead of aborting the whole run.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_results: List[Dict] = []
        for batch, result in zip(batches, results):
            if isinstance(result, Exception):
                all_results.extend([{"error": str(result)}] * len(batch))
            else:
                all_results.extend(result)
        return all_results

    async def _process_batch(self, batch: List[str]) -> List[Dict]:
        """Run one batch on the thread pool without blocking the loop."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.thread_pool,
            self._process_batch_sync,
            batch
        )

    def _process_batch_sync(self, batch: List[str]) -> List[Dict]:
        """Synchronous detection for one batch (runs on a worker thread)."""
        # NOTE(review): filter_engine is a module-level dependency defined
        # elsewhere in the project.
        results = []
        for content in batch:
            results.append(filter_engine.detect_prohibited_content(content))
        return results


# 六、测试方案
6.1 单元测试
# test_content_review.py import pytest from unittest.mock import Mock, patch class TestContentReviewSystem: @pytest.fixture def review_system(self): from content_review import IntelligentFilterEngine, ProhibitedWordLibrary library = ProhibitedWordLibrary() engine = IntelligentFilterEngine(library) return engine def test_clean_text(self, review_system): """测试文本清洗""" dirty_text = "这是一段<html>标签</html>和特殊@#字符的文本!" cleaned = review_system._clean_text(dirty_text) assert "html" not in cleaned assert "@" not in cleaned assert "!" in cleaned def test_exact_match(self, review_system): """测试精确匹配""" text = "我计划越狱逃跑" result = review_system.detect_prohibited_content(text) assert len(result["violations"]) > 0 assert result["risk_level"] == "critical" def test_exception_case(self, review_system): """测试例外情况""" text = "我们要远离毒品" result = review_system.detect_prohibited_content(text) # 远离毒品应该被识别为例外 assert len(result["violations"]) == 0 def test_suggested_replacement(self, review_system): """测试建议替换""" text = "他们昨天打架了" result = review_system.detect_prohibited_content(text) if result["violations"]: violation = result["violations"][0] assert "发生冲突" in result["suggested_content"] @pytest.mark.asyncio async def test_api_endpoint(self): """测试API端点""" from fastapi.testclient import TestClient from main import app client = TestClient(app) response = client.post("/api/v1/content/review", json={ "content": "测试内容", "user_id": "test_user", "check_level": "normal" }) assert response.status_code == 200 data = response.json() assert "request_id" in data assert "risk_level" in data七、安全与合规
7.1 数据安全措施
内容加密存储:所有审核记录加密存储
访问审计:记录所有API访问日志
权限控制:基于角色的访问控制
数据脱敏:日志中的敏感信息脱敏处理
定期清理:自动清理过期审核记录
7.2 合规性保障
# compliance_manager.py
import hashlib
from datetime import datetime, timedelta
from typing import Dict


class ReviewComplianceManager:
    """Compliance management for review results.

    Strips raw content from persisted records, builds audit entries, and
    stamps retention expiry dates.
    """

    @staticmethod
    def ensure_compliance(review_result: Dict) -> Dict:
        """Return a compliance-safe copy of a review result.

        - Privacy: replaces the raw content with an MD5 digest.
        - Audit: builds an audit-log record of the review action.
        - Retention: stamps an ``expires_at`` date (90 days out).

        The input dict is not mutated (shallow copy).
        """
        compliant_result = review_result.copy()

        # 1. Privacy: never persist raw letter content, only a digest.
        # (MD5 serves as a fingerprint here, not as a security primitive.)
        if "original_content" in compliant_result:
            compliant_result["original_content_hash"] = hashlib.md5(
                compliant_result["original_content"].encode()
            ).hexdigest()
            del compliant_result["original_content"]

        # 2. Audit log. .get() defaults make this robust against partial
        # result dicts -- the original raised KeyError when "risk_level",
        # "violations_count" or "needs_human_review" was absent.
        audit_log = {
            "timestamp": datetime.now().isoformat(),
            "action": "content_review",
            "result": compliant_result.get("risk_level"),
            "violation_count": compliant_result.get("violations_count", 0),
            "reviewer": ("human" if compliant_result.get("needs_human_review")
                         else "system")
        }
        # TODO: persist audit_log -- the original built it and dropped it.
        _ = audit_log

        # 3. Data retention policy: keep records for 90 days.
        retention_days = 90
        compliant_result["expires_at"] = (
            datetime.now() + timedelta(days=retention_days)
        ).isoformat()

        return compliant_result

    @staticmethod
    def generate_compliance_report(start_date: datetime,
                                   end_date: datetime) -> Dict:
        """Generate a compliance report for the given period.

        TODO: pull real counters from the database; currently a stub that
        returns zeroed statistics.
        """
        stats = {
            "period": f"{start_date.date()} 至 {end_date.date()}",
            "total_reviews": 0,
            "blocked_content": 0,
            "human_reviews": 0,
            "false_positives": 0,
            "compliance_rate": 1.0
        }
        return stats


# 八、总结
8.1 系统特点
智能识别:结合规则匹配和语义分析
精准过滤:考虑上下文和例外情况
柔性处理:分级处置,最小干预
实时响应:毫秒级审核,不影响用户体验
持续进化:基于反馈持续优化词库和算法
8.2 社会价值
安全保障:确保通信内容安全合规
人文关怀:保留情感表达,只做必要修改
教育引导:帮助用户规范用语
信任建立:增强平台公信力和用户信任
8.3 技术亮点
多级过滤策略:快速匹配 + 深度分析
智能替换算法:保留原意的精准替换
上下文感知:准确判断违禁词的真实意图
高性能架构:支持高并发实时审核
可扩展设计:支持动态词库和算法升级
微爱帮的违禁词预审系统不仅是一个技术工具,更是我们守护特殊群体通信安全的承诺。我们用最严谨的技术,传递最温暖的关怀,让每一封家书都能安全、温暖地抵达。