从PDF到决策:构建生产就绪的AI系统自动化票据处理流程
TL;DR — 你将获得的内容
一套可运行、面向生产的设计方案,包含完整的代码片段,用于构建一个AI代理。该代理能够处理发票PDF文件、运行OCR、使用大语言模型提取结构化字段、验证数据、应用确定性业务逻辑并存储可审计的结果。所有内容都经过精心编写,工程师可以直接复制粘贴到代码仓库中并在本地运行。包含测试、数据库迁移、指标监控和操作控制。
语气:实用且有主见。本文旨在通过代码审查、合规性问题和凌晨三点的生产告警——而不是为了赢得奖项竞赛。
AI获胜的关键不在于聪明,而在于可靠。
为什么你应该关注
大多数生成式AI文章都止步于单一的大语言模型调用。在实践中,真正的系统会在那些看似无聊但至关重要的基础设施方面出问题——例如数据摄取的边界情况、重试机制、可观测性缺口以及静默的验证错误。
本文展示了如何构建一个AI系统——而不是一个演示——来替代人工发票处理流程,同时保持可审计性、安全性和可维护性。
仓库结构
ai-agent-invoice/ ├─ docker-compose.yml ├─ Dockerfile ├─ requirements.txt ├─ alembic/ │ └─ versions/001_create_tables.py ├─ app/ │ ├─ main.py │ ├─ config.py │ ├─ services/ │ │ ├─ ingestion.py │ │ ├─ preprocess.py │ │ ├─ ocr.py │ │ ├─ extractor.py │ │ ├─ llm_adapters/ │ │ ├─ validator.py │ │ ├─ decision.py │ │ └─ storage.py │ ├─ models/invoice.py │ ├─ db/repo.py │ └─ utils/ │ ├─ logging.py │ ├─ metrics.py │ └─ retry.py └─ infra/prometheus.yml快速浏览清单
- 架构先行:Pydantic v2模型是唯一的真相来源。
- 大语言模型作为提取器,而非决策器。业务规则存在于代码中。
- 使用Decimal处理金额;使用dateutil处理日期。
- 使用流式上传;采用幂等性和内容哈希。
- 持久化原始PDF、OCR输出和模型元数据以便审计。
- 使用后台工作器(开发环境:BackgroundTasks;生产环境:Celery/RQ/Kafka)。
- 全面监控:Prometheus指标、结构化JSON日志和追踪。
- 如果在六个月后无法解释某个决策的原因,那么这个系统就是不完整的。
高层流程(运行时顺序)
- 客户端上传PDF →
/ingest端点(流式传输文件并返回document_id) - 后台工作器将PDF转换为图像(进行去歪斜和清理)
- 对每个页面运行OCR(本地Tesseract或第三方服务)→ 页面级文本和置信度分数
- 大语言模型使用严格的模式提取结构化字段 → JSON(优先使用函数调用/结构化输出;自由文本解析是最后的手段)
- 使用Pydantic进行规范化与验证(Decimal用于金额,ISO格式日期)
- 确定性决策引擎自动批准或将发票加入人工审核队列
- 持久化结果(process_results JSONB + provider_meta)并发出指标和结构化日志
关键设计原则(简洁清晰)
- 架构先行。架构是真相的来源。大语言模型提取到架构字段中;Pydantic验证它们。
- 关注点分离。在不改变业务逻辑的情况下,更换OCR或大语言模型提供商。
- 确定性决策。如果你需要确定性的结果,请使用版本化的策略将其编码在代码中。
- 审计一切。持久化原始PDF、OCR文本、大语言模型输出、模型和修订详情,以及所有审核操作。
- 默认安全失败。低置信度的数据路由到人工审核。没有静默的自动批准。
- 幂等性。使用X-Idempotency-Key加上内容哈希来避免重复处理。
核心文件
这里只包含最核心的文件——那些审查者会首先阅读的文件。将它们放入仓库布局中所示的路径,根据你的环境进行调整,并根据需要进行修改。
app/models/invoice.py — 架构(Pydantic v2)
# app/models/invoice.pyfrompydanticimportBaseModel,FieldfromdecimalimportDecimalfromdatetimeimportdateclassInvoice(BaseModel):invoice_number:strinvoice_date:date vendor_name:strtotal_amount:Decimal=Field(...,gt=Decimal("0"))currency:str注意:使用Decimal避免金额四舍五入错误。日期存储为ISO格式。
app/config.py — 设置(环境驱动)
# app/config.pyfrompydanticimportBaseSettingsclassSettings(BaseSettings):DATABASE_URL:strMISTRAL_API_KEY:str|None=NoneMAX_UPLOAD_BYTES:int=20*1024*1024# 20MB defaultAPPROVAL_THRESHOLD:float=5000.0classConfig:env_file=".env"settings=Settings()app/services/ingestion.py — 流式传输 + 幂等性 + 内容哈希
# app/services/ingestion.pyimportos,uuid,hashlibfromfastapiimportUploadFilefromapp.db.repoimportsave_document,get_document_by_idempotencyfromapp.utils.retryimportretry_backofffromapp.configimportsettings UPLOAD_DIR=os.getenv("UPLOAD_DIR","/data/documents")os.makedirs(UPLOAD_DIR,exist_ok=True)@retry_backoff()asyncdefsave_uploaded_file(file:UploadFile,idempotency_key:str|None=None)->str:ifidempotency_key:existing=get_document_by_idempotency(idempotency_key)ifexisting:returnexisting.document_id document_id=str(uuid.uuid4())path=os.path.join(UPLOAD_DIR,f"{document_id}.pdf")size=0withopen(path,"wb")asout_f:whileTrue:chunk=awaitfile.read(1024*1024)ifnotchunk:breaksize+=len(chunk)ifsize>settings.MAX_UPLOAD_BYTES:out_f.close()os.remove(path)raiseValueError("File too large")out_f.write(chunk)sha256=hashlib.sha256()withopen(path,"rb")asf:forblockiniter(lambda:f.read(65536),b""):sha256.update(block)content_hash=sha256.hexdigest()save_document(document_id=document_id,path=path,idempotency_key=idempotency_key,content_hash=content_hash)returndocument_id流式传输可防止内存不足问题;内容哈希和幂等键可防止重复处理。我们在同一个发票以不同文件名上传两次并自动处理两次后,艰难地学到了这一点。
app/services/preprocess.py — PDF→图像 + 去歪斜
# app/services/preprocess.pyfrompdf2imageimportconvert_from_path,convert_from_bytesimportcv2,numpyasnpimporttempfile,osdefpreprocess_pdf_to_images(pdf_path:str,dpi:int=300)->list[str]:images=convert_from_path(pdf_path,dpi=dpi)out_paths=[]fori,pil_imginenumerate(images):img=cv2.cvtColor(np.array(pil_img),cv2.COLOR_RGB2BGR)img=_deskew_image_safe(img)tmp_path=os.path.join(tempfile.gettempdir(),f"{os.path.basename(pdf_path)}_page_{i}.png")cv2.imwrite(tmp_path,img)out_paths.append(tmp_path)returnout_pathsdef_deskew_image_safe(img):try:gray=cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)coords=np.column_stack(np.where(gray>0))ifcoords.size==0:returnimg angle=cv2.minAreaRect(coords)[-1]ifangle<-45:angle=-(90+angle)else:angle=-angle(h,w)=img.shape[:2]M=cv2.getRotationMatrix2D((w//2,h//2),angle,1.0)rotated=cv2.warpAffine(img,M,(w,h),flags=cv2.INTER_CUBIC,borderMode=cv2.BORDER_REPLICATE)returnrotatedexceptException:returnimg注意:pdf2image有系统级依赖。这是一个无聊的设置步骤,如果跳过,它会悄悄降低OCR质量。DPI 300是扫描文档的安全默认值。
app/services/ocr.py
# app/services/ocr.pyfromPILimportImageimportpytesseractfromtypingimportNamedTupleclassOCRResult(NamedTuple):text:strconfidence:floatdeflocal_tesseract_ocr(image_path:str)->OCRResult:img=Image.open(image_path)data=pytesseract.image_to_data(img,output_type=pytesseract.Output.DICT)lines=[]confidences=[]fori,textinenumerate(data.get("text",[])):ifstr(text).strip():lines.append(text)try:conf_val=data.get("conf",[])[i]conf=float(conf_val)ifconf>=0:confidences.append(conf)exceptException:continuetext="\n".join(lines)avg_conf=(sum(confidences)/len(confidences)/100.0)ifconfidenceselse0.0returnOCRResult(text=text,confidence=avg_conf)注意:实现提供商适配器(例如,某机构的Textract或Vision LLM),返回相同的OCRResult接口。这使得管道的其余部分与提供商无关。
app/services/extractor.py — 适配器 + 稳健的JSON提取
# app/services/extractor.pyimportjson,refromapp.configimportsettingsdefextract_json_from_text(text:str)->str:text=re.sub(r"```(?:json)?","",text)brace_idx=text.find("{")ifbrace_idx==-1:raiseValueError("No JSON object found in response")stack=0start=-1fori,chinenumerate(text[brace_idx:],start=brace_idx):ifch=="{":ifstart==-1:start=i stack+=1elifch=="}":stack-=1ifstack==0:json_str=text[start:i+1]try:parsed=json.loads(json_str)returnjson.dumps(parsed)exceptException:breakm=re.search(r"\{.*\}",text,flags=re.DOTALL)ifm:returnm.group(0)raiseValueError("Could not extract JSON from response")# Provider adapter example (conceptual)classOpenAIAdapter:def__init__(self,client):self.client=clientdefextract_with_schema(self,text:str,schema:dict)->dict:# Prefer function-calling / structured outputs if provider supports it.resp_raw=self.client.call_model(text,schema=schema,temperature=0.0)json_str=extract_json_from_text(resp_raw)returnjson.loads(json_str)注意:当提供商支持时,始终优先使用结构化输出(函数调用)。自由文本解析之所以存在,只是因为真实的模型仍然会以令人惊讶的方式失败。
app/services/validator.py — 规范化 & 验证(Decimal, dateutil)
# app/services/validator.pyfromapp.models.invoiceimportInvoicefrompydanticimportValidationErrorfromdateutilimportparserasdate_parserfromdecimalimportDecimalimportredefnormalize_numbers_and_dates(raw:dict)->dict:amount=raw.get("total_amount")ifisinstance(amount,str):amount=amount.replace(",","").strip()amount=re.sub(r"[^\d.\-]","",amount)raw["total_amount"]=Decimal(amount)ifamountelseNoneelifisinstance(amount,(int,float)):raw["total_amount"]=Decimal(str(amount))date_val=raw.get("invoice_date")ifisinstance(date_val,str):try:d=date_parser.parse(date_val,dayfirst=False)raw["invoice_date"]=d.date().isoformat()exceptException:passreturnrawdefvalidate_invoice(raw:dict)->Invoice:raw=normalize_numbers_and_dates(raw)try:invoice=Invoice.model_validate(raw)exceptValidationErrorase:raiseRuntimeError(f"Validation failed:{e}")returninvoice注意:在生产环境中明确处理区域设置差异(例如,日期格式和小数点分隔符)。
app/services/decision.py — 确定性策略
# app/services/decision.pyfromdecimalimportDecimalfromapp.configimportsettings APPROVAL_THRESHOLD=Decimal(str(settings.APPROVAL_THRESHOLD))defdecide(invoice):ifinvoice.total_amount<APPROVAL_THRESHOLD:return{"decision":"AUTO_APPROVED","reason":"Amount under threshold","policy_version":"v1"}return{"decision":"NEEDS_REVIEW","reason":"Amount exceeds threshold","policy_version":"v1"}重要:每个决策都持久化policy_version。当财务或合规部门问“为什么批准了这个?”,这个字段是你唯一可以辩护的答案。
app/db/repo.py — 简化持久化(概念性)
# app/db/repo.pyfromsqlalchemyimportcreate_engine,textfromsqlalchemy.ormimportsessionmakerfromapp.configimportsettings engine=create_engine(settings.DATABASE_URL)Session=sessionmaker(bind=engine)defsave_document(document_id,path,idempotency_key=None,content_hash=None):# Implement insert with unique constraints and transactionspassdefsave_process_result(document_id,result_json,provider_meta):# store JSONB recordpassdefget_document_by_idempotency(key):# return document row if existspass确保存储库包含完整的SQL/ORM实现和Alembic迁移(见下文),以获得完整、可运行的设置。
数据库DDL(Postgres) — infra/db_schema.sql
CREATETABLEdocuments(document_idTEXTPRIMARYKEY,pathTEXTNOTNULL,statusTEXTNOTNULL,idempotency_keyTEXTUNIQUE,content_hashTEXTUNIQUE,created_at TIMESTAMPTZDEFAULTnow());CREATETABLEprocess_results(id BIGSERIALPRIMARYKEY,document_idTEXTREFERENCESdocuments(document_id),result_json JSONBNOTNULL,provider_meta JSONB,created_at TIMESTAMPTZDEFAULTnow());CREATETABLEreview_queue(id BIGSERIALPRIMARYKEY,document_idTEXT,reasonTEXT,added_at TIMESTAMPTZDEFAULTnow(),resolvedBOOLEANDEFAULTFALSE);提示:对result_json和provider_meta使用JSONB来存储所有相关元数据,包括llm_provider、model、revision和prompt_hash。
可观测性(快速指南)
- 结构化日志:使用structlog发出JSON日志,包括trace_id、document_id、step和status。
- 指标(Prometheus):跟踪计数器,如documents_ingested_total和documents_processed_total,并使用直方图记录processing_duration_seconds。
- 追踪:使用OpenTelemetry检测FastAPI和数据库,导出到OTLP收集器。在日志中包含trace_id以实现端到端的可追溯性。
Alembic示例(迁移框架)
alembic/versions/001_create_tables.py
fromalembicimportopimportsqlalchemyassa revision='001'defupgrade():op.create_table('documents',sa.Column('document_id',sa.Text,primary_key=True),sa.Column('path',sa.Text,nullable=False),sa.Column('status',sa.Text,nullable=False),sa.Column('idempotency_key',sa.Text,nullable=True),sa.Column('content_hash',sa.Text,nullable=True),sa.Column('created_at',sa.TIMESTAMP(timezone=True),server_default=sa.text('now()')))# create other tables...defdowngrade():op.drop_table('documents')人工干预(审核)端点 — 框架
GET /reviews/queue— 获取待审核文档列表POST /reviews/{document_id}/resolve— 提交带有 { decision, note, override_by } 的操作
记录每次审核操作,附带user_id和时间戳以便审计。
UI:显示原始PDF、页面级OCR结果、带有置信度得分的提取JSON,并提供操作按钮:批准、拒绝和编辑。
数据保留和删除(合规性)
实现DELETE /documents/{id}作为软删除:将文档标记为已删除,并在保留TTL(生存时间)后安排数据块清除。
始终尊重法律保留要求,并维护完整的删除审计追踪。
必须包含的测试
- 验证器测试:覆盖日期格式和货币格式的所有边界情况。
- 提取器测试:稳健地解析JSON,包括围栏式代码块和额外注释。
- 端到端测试:使用提供商模拟器来模拟上传 → 处理 → 数据库断言。
- 安全测试:验证RBAC端点和权限。
快速本地运行
- 启动应用:
docker-compose up --build - 上传示例发票:
curl -F "file=@tests/fixtures/invoice_sample.pdf" http://localhost:8000/ingest - 通过在process_results表中检查JSON和provider_meta来验证结果。
- 爬取 /metrics 端点(Prometheus)查看计数器是否递增。
清单
- 上传使用流式传输(不要将整个文件加载到内存中)。
- 一致使用Invoice.model_validate / model_dump。
- 对total_amount使用Decimal并稳健地处理日期解析。
- 确保提供商适配器存在,并在测试中正确模拟。
- 在content_hash和idempotency_key上应用唯一约束。
- 为审计持久化原始OCR和大语言模型输出。
- 包含Prometheus /metrics和示例仪表板。
- Alembic迁移应被包含并经过测试。
- 记录后台处理/队列模式。
潜在陷阱
- 扫描图像质量极差:OCR可能会失败。考虑添加Vision LLM或云端OCR以获得更好的结果。
- 模糊的日期格式(DD/MM vs MM/DD):确保你的策略明确选择一个区域设置。
- 意外的大语言模型输出结构:如果可用,使用函数调用或架构强制执行;或者进行防御性解析并记录原始输出。
注意:这些不是错误——它们是你在生产的头几周内会遇到的操作现实。
总结
本文像为队友编写代码一样编写:小巧、清晰、可靠的组件,附带审计追踪和操作控制。它很紧凑,所以你可以快速掌握构建生产就绪AI系统的关键实践,这些系统是安全、可审计和可维护的。将此视为你进行真实世界AI工程的实用蓝图。
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)或者 我的个人博客 https://blog.qife122.com/
对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)