news 2026/6/24 0:33:37

Python 爬虫框架设计:类封装与工程化实践

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 爬虫框架设计:类封装与工程化实践

在 1688 数据采集等爬虫场景中,类封装能实现代码的复用与解耦,工程化则保障爬虫的稳定性、可维护性和可扩展性。本文将结合 1688 爬虫的实际需求,从框架设计原则核心类封装工程化配套模块实战落地,完整讲解爬虫框架的设计与实现。

一、框架设计原则与整体架构

1. 核心设计原则

爬虫框架需遵循开闭原则(对扩展开放、对修改关闭)、单一职责(每个模块只做一件事)和依赖注入(模块间通过配置解耦),同时需适配 1688 的反爬特性(如动态渲染、IP 封禁)。

2. 整体架构分层

将爬虫拆分为5 个核心层,层与层之间通过接口交互,降低耦合:

层级职责核心实现方式
配置层管理爬虫参数(如代理、UA、爬取关键词)、存储配置等YAML/JSON 配置文件 + 配置类
请求层封装 HTTP 请求,处理反爬(代理、UA、延迟)、异常重试基础请求类 + 反爬中间件
解析层解析网页 / 接口数据,提取目标字段(如 1688 商品标题、价格)解析器基类 + 业务解析子类
存储层处理数据持久化(CSV/MySQL/MongoDB),支持数据去重存储基类 + 多存储实现子类
调度层管理爬取任务(分页、多线程 / 异步)、监控任务状态调度器类 + 任务队列

二、核心类封装实现

基于分层架构,我们通过类的继承与多态封装通用逻辑,再针对 1688 场景实现具体业务。

1. 环境准备

安装必备依赖:

bash

运行

pip install requests beautifulsoup4 pyyaml fake-useragent playwright pymongo mysql-connector-python playwright install chromium # 处理动态页面

2. 配置层封装(Config 类)

通过 YAML 配置文件管理参数,避免硬编码,便于后续修改。

配置文件(config.yaml)

yaml

# 爬虫基础配置 spider: keyword: "手机壳" # 1688搜索关键词 max_page: 5 # 最大爬取页数 delay: 3 # 请求延迟(秒) retry_times: 3 # 失败重试次数 # 反爬配置 anti_crawl: user_agent_pool: ["Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ..."] proxy_pool: ["http://127.0.0.1:7890", "http://username:password@proxy.example.com:8080"] # 代理池 # 存储配置 storage: type: "csv" # 可选:csv/mongo/mysql csv_path: "./1688_products.csv" mongo: uri: "mongodb://localhost:27017/" db: "1688_spider" collection: "products" mysql: host: "localhost" port: 3306 user: "root" password: "123456" db: "1688_spider"

配置类封装(config.py)

python

运行

import yaml from typing import Dict, List, Any class Config: """配置管理类,加载并解析YAML配置文件""" def __init__(self, config_path: str = "./config.yaml"): self.config_path = config_path self.config = self._load_config() def _load_config(self) -> Dict[str, Any]: """加载YAML配置""" try: with open(self.config_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) except FileNotFoundError: raise Exception(f"配置文件{self.config_path}不存在") except yaml.YAMLError as e: raise Exception(f"配置文件解析错误:{e}") def get(self, key: str, default: Any = None) -> Any: """按层级获取配置,如'spider.keyword'""" keys = key.split(".") value = self.config for k in keys: if k not in value: return default value = value[k] return value # 测试配置类 if __name__ == "__main__": config = Config() print(config.get("spider.keyword")) # 输出:手机壳 print(config.get("anti_crawl.proxy_pool")) # 输出代理池列表

3. 请求层封装(BaseRequest 类)

封装 HTTP 请求的通用逻辑(反爬、重试、延迟),支持同步请求和动态页面请求(Playwright)。

python

运行

import requests import time import random from typing import Dict, Any, Optional from fake_useragent import UserAgent from playwright.sync_api import sync_playwright from config import Config class BaseRequest: """请求基类,封装通用请求逻辑""" def __init__(self, config: Config): self.config = config self.ua = UserAgent() self.retry_times = self.config.get("spider.retry_times", 3) self.delay = self.config.get("spider.delay", 2) self.proxy_pool = self.config.get("anti_crawl.proxy_pool", []) self.ua_pool = self.config.get("anti_crawl.user_agent_pool", []) def _get_random_proxy(self) -> Optional[str]: """随机获取代理""" return random.choice(self.proxy_pool) if self.proxy_pool else None def _get_random_ua(self) -> str: """随机获取User-Agent""" return random.choice(self.ua_pool) if self.ua_pool else self.ua.random def _add_delay(self) -> None: """请求延迟,防反爬""" time.sleep(random.uniform(self.delay, self.delay + 2)) def get(self, url: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[str]: """同步GET请求,支持重试和反爬""" headers = headers or {} headers["User-Agent"] = self._get_random_ua() proxies = {"http": self._get_random_proxy(), "https": self._get_random_proxy()} if self._get_random_proxy() else None for retry in range(self.retry_times): try: self._add_delay() resp = requests.get(url, params=params, headers=headers, proxies=proxies, timeout=10) resp.raise_for_status() # 抛出HTTP错误 return resp.text except Exception as e: print(f"请求失败(第{retry+1}次重试):{e}") time.sleep(2 ** retry) # 指数退避重试 return None def get_dynamic(self, url: str) -> Optional[Dict[str, Any]]: """动态页面请求(Playwright),返回关键数据""" result = {"title": None, "price": None, "sales": None} with sync_playwright() as p: browser = p.chromium.launch(headless=True) page = browser.new_page(user_agent=self._get_random_ua()) try: page.goto(url, timeout=30000) # 提取1688商品详情页关键数据(需根据页面结构调整) result["title"] = page.locator(".detail-title").inner_text() if page.locator(".detail-title").count() > 0 else None result["price"] = page.locator(".price").inner_text() if page.locator(".price").count() > 0 else None result["sales"] = page.locator(".sales-volume").inner_text() if page.locator(".sales-volume").count() > 0 else None self._add_delay() except Exception as e: print(f"动态页面请求失败:{e}") finally: browser.close() return result # 测试请求类 if __name__ == "__main__": config = Config() request = BaseRequest(config) html = request.get("https://s.1688.com/selloffer/offer_search.htm?keywords=手机壳") print(html[:500]) # 输出页面前500字符

4. 解析层封装(BaseParser 类)

封装数据解析的通用接口,子类实现具体的 1688 页面解析逻辑。

python

运行

from bs4 import BeautifulSoup from typing import List, Dict, Any from config import Config class BaseParser: """解析器基类,定义解析接口""" def __init__(self, config: Config): self.config = config def parse(self, html: str) -> List[Dict[str, Any]]: """解析接口,子类必须实现""" raise NotImplementedError("子类需实现parse方法") class Ali1688ListParser(BaseParser): """1688商品列表页解析器""" def parse(self, html: str) -> List[Dict[str, Any]]: """解析商品列表页,提取标题、价格、链接""" soup = BeautifulSoup(html, "lxml") products = soup.select(".sm-offer-item") result = [] for item in products: # 提取字段(需根据1688页面结构实时调整) title_elem = item.select_one(".offer-title a") price_elem = item.select_one(".price") link_elem = item.select_one(".offer-title a") if not (title_elem and price_elem and link_elem): continue product = { "title": title_elem.get("title", "").strip(), "price": price_elem.text.strip(), "link": link_elem.get("href", "").strip(), "source": "1688" } result.append(product) return result # 测试解析类 if __name__ == "__main__": config = Config() request = BaseRequest(config) parser = Ali1688ListParser(config) html = request.get("https://s.1688.com/selloffer/offer_search.htm?keywords=手机壳") if html: products = parser.parse(html) print(f"解析到{len(products)}个商品:") print(products[:2])

5. 存储层封装(BaseStorage 类)

支持多存储方式(CSV/MySQL/MongoDB),通过子类实现具体存储逻辑。

python

运行

import csv import pymongo import mysql.connector from typing import List, Dict, Any from config import Config class BaseStorage: """存储基类,定义存储接口""" def __init__(self, config: Config): self.config = config def save(self, data: List[Dict[str, Any]]) -> None: """存储接口,子类必须实现""" raise NotImplementedError("子类需实现save方法") class CsvStorage(BaseStorage): """CSV存储类""" def __init__(self, config: Config): super().__init__(config) self.csv_path = self.config.get("storage.csv_path", "./products.csv") # 初始化CSV文件并写入表头 with open(self.csv_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["title", "price", "link", "source"]) writer.writeheader() def save(self, data: List[Dict[str, Any]]) -> None: """将数据追加写入CSV""" with open(self.csv_path, "a", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["title", "price", "link", "source"]) writer.writerows(data) print(f"成功写入{len(data)}条数据到CSV:{self.csv_path}") class MongoStorage(BaseStorage): """MongoDB存储类""" def __init__(self, config: Config): super().__init__(config) self.client = pymongo.MongoClient(self.config.get("storage.mongo.uri")) self.db = self.client[self.config.get("storage.mongo.db")] self.collection = self.db[self.config.get("storage.mongo.collection")] # 创建唯一索引,避免重复存储 self.collection.create_index("link", unique=True) def save(self, data: List[Dict[str, Any]]) -> None: """将数据写入MongoDB,自动去重""" if not data: return try: self.collection.insert_many(data, ordered=False) print(f"成功写入{len(data)}条数据到MongoDB") except pymongo.errors.BulkWriteError as e: # 忽略重复数据错误 print(f"部分数据重复,实际写入{len(data) - len(e.details['writeErrors'])}条") # 测试存储类 if __name__ == "__main__": config = Config() storage = CsvStorage(config) # 模拟数据 test_data = [ {"title": "苹果15手机壳", "price": "10.00", "link": "https://example.com/1", "source": "1688"}, {"title": "华为Mate60手机壳", "price": "8.50", "link": "https://example.com/2", "source": "1688"} ] storage.save(test_data)

6. 调度层封装(SpiderScheduler 类)

管理爬取任务的生命周期(分页、任务分发),整合请求、解析、存储模块。

python

运行

from typing import List, Dict, Any from config import Config from request import BaseRequest from parser import Ali1688ListParser from storage import BaseStorage, CsvStorage, MongoStorage class SpiderScheduler: """爬虫调度器,整合各模块并管理爬取任务""" def __init__(self, config: Config): self.config = config self.request = BaseRequest(config) self.parser = Ali1688ListParser(config) self.storage = self._init_storage() self.keyword = self.config.get("spider.keyword") self.max_page = self.config.get("spider.max_page") def _init_storage(self) -> BaseStorage: """根据配置初始化存储类""" storage_type = self.config.get("storage.type", "csv") if storage_type == "csv": return CsvStorage(self.config) elif storage_type == "mongo": return MongoStorage(self.config) else: raise ValueError(f"不支持的存储类型:{storage_type}") def build_url(self, page: int) -> str: """构建1688搜索页URL""" from urllib.parse import quote return f"https://s.1688.com/selloffer/offer_search.htm?keywords={quote(self.keyword)}&page={page}" def run(self) -> None: """启动爬虫任务""" print(f"开始爬取1688关键词【{self.keyword}】,共{self.max_page}页") all_data = [] for page in range(1, self.max_page + 1): print(f"正在爬取第{page}页...") url = self.build_url(page) html = self.request.get(url) if not html: print(f"第{page}页爬取失败,跳过") continue # 解析数据 page_data = self.parser.parse(html) if page_data: all_data.extend(page_data) # 实时存储 self.storage.save(page_data) print(f"爬取完成,总计获取{len(all_data)}条商品数据") # 测试调度器 if __name__ == "__main__": config = Config() scheduler = SpiderScheduler(config) scheduler.run()

三、工程化配套模块

1. 日志系统(Logging)

替换 print 语句,使用 Python 标准库logging实现分级日志(INFO/ERROR),便于问题排查。

python

运行

import logging import os def init_logger() -> None: """初始化日志系统""" # 创建日志目录 if not os.path.exists("logs"): os.makedirs("logs") # 配置日志格式 log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" # 写入文件 + 控制台输出 logging.basicConfig( level=logging.INFO, format=log_format, handlers=[ logging.FileHandler("logs/1688_spider.log", encoding="utf-8"), logging.StreamHandler() ] ) # 在调度器中使用日志 if __name__ == "__main__": init_logger() logger = logging.getLogger(__name__) logger.info("爬虫启动") try: config = Config() scheduler = SpiderScheduler(config) scheduler.run() logger.info("爬虫结束") except Exception as e: logger.error(f"爬虫异常:{e}", exc_info=True)

2. 异常处理体系

在核心模块中定义自定义异常,便于精准捕获和处理不同类型的错误:

python

运行

# exceptions.py class SpiderRequestError(Exception): """请求异常""" pass class SpiderParseError(Exception): """解析异常""" pass class SpiderStorageError(Exception): """存储异常""" pass # 在请求层中抛出自定义异常 def get(self, url: str) -> Optional[str]: for retry in range(self.retry_times): try: # ... 原有逻辑 ... return resp.text except Exception as e: if retry == self.retry_times - 1: raise SpiderRequestError(f"请求{url}失败:{e}") time.sleep(2 ** retry) return None

3. 代理池集成

对于大规模爬取,可对接第三方代理池(如阿布云、快代理)或自建代理池,通过 API 动态获取可用代理:

python

运行

def _get_proxy_from_pool(self) -> Optional[str]: """从代理池API获取可用代理""" proxy_api = "http://proxy.example.com/get_proxy" try: resp = requests.get(proxy_api, timeout=5) return resp.json().get("proxy") except Exception as e: print(f"获取代理失败:{e}") return None

四、工程化扩展与最佳实践

1. 多线程 / 异步爬取

针对单线程效率低的问题,可使用concurrent.futures.ThreadPoolExecutor实现多线程,或用aiohttp实现异步爬取(注意 1688 的反爬限制,避免并发过高)。

python

运行

from concurrent.futures import ThreadPoolExecutor def run_multi_thread(self) -> None: """多线程爬取""" with ThreadPoolExecutor(max_workers=3) as executor: # 控制并发数 executor.map(self.crawl_page, range(1, self.max_page + 1)) def crawl_page(self, page: int) -> None: """单页爬取逻辑,供线程调用""" url = self.build_url(page) html = self.request.get(url) if html: page_data = self.parser.parse(html) self.storage.save(page_data)

2. 爬虫监控与告警

通过prometheus+grafana监控爬虫的爬取量、失败率,或通过邮件 / 钉钉机器人在爬虫异常时发送告警:

python

运行

import smtplib from email.mime.text import MIMEText def send_alert_email(message: str) -> None: """发送告警邮件""" msg = MIMEText(message, "plain", "utf-8") msg["Subject"] = "1688爬虫异常告警" msg["From"] = "sender@example.com" msg["To"] = "receiver@example.com" smtp = smtplib.SMTP_SSL("smtp.example.com", 465) smtp.login("sender@example.com", "password") smtp.sendmail("sender@example.com", ["receiver@example.com"], msg.as_string()) smtp.quit()

3. 合规与维护

  1. 遵守 robots 协议:1688 的robots.txt(https://www.1688.com/robots.txt)明确禁止爬取的路径需严格规避。
  2. 定期更新解析规则:1688 页面结构会频繁变更,需定期检查并调整 CSS 选择器 / XPath。
  3. 数据去重与清洗:通过商品链接、ID 等唯一键去重,对价格、销量等字段做格式清洗(如去除非数字字符)。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 4:34:58

Open-AutoGLM多语言适配技术内幕(仅限资深工程师查看)

第一章:Open-AutoGLM多语言支持开发实现为实现 Open-AutoGLM 框架的全球化应用,多语言支持成为核心功能之一。系统采用模块化设计,将语言资源与核心逻辑解耦,确保高可维护性与扩展性。国际化架构设计 系统基于 ICU 国际化标准构建…

作者头像 李华
网站建设 2026/6/23 20:29:08

【第65套】加油,同学们!

写在前面车门焊死,考研将至,准备冲刺!我将持续为大家更新25最新真题解析!学得快的同学可以和我一起,全力冲刺~注意,目前我只发布最新年份的真题,其他年份的真题,一个是很…

作者头像 李华
网站建设 2026/6/23 16:01:22

【紧急预警】Open-AutoGLM与旧系统兼容性问题正在摧毁生产环境?

第一章:Open-AutoGLM 与现有系统集成案例在企业级AI应用部署中,Open-AutoGLM 凭借其灵活的接口设计和标准化协议支持,已成功集成至多个异构系统环境中。以下展示其在典型业务场景中的实际对接方案。与企业CRM系统的自然语言工单处理集成 通过…

作者头像 李华
网站建设 2026/6/23 21:16:41

Linly-Talker支持动态光照渲染,提升画面质感

Linly-Talker 支持动态光照渲染,提升画面质感 在虚拟主播、AI客服和数字员工日益普及的今天,用户对数字人“像不像真人”越来越敏感。不只是嘴型能不能对上语音,更在于——这个虚拟形象有没有“灵魂”。而所谓“灵魂”,往往藏在细…

作者头像 李华
网站建设 2026/6/23 18:23:49

为什么你的Open-AutoGLM总是输出不准?3步定位提示词设计缺陷

第一章:为什么你的Open-AutoGLM总是输出不准?Open-AutoGLM 作为一款基于开源大语言模型的自动化推理框架,其输出准确性受多种因素影响。理解这些潜在问题源,是提升模型表现的关键。输入提示设计不合理 模糊或歧义的提示词会导致模…

作者头像 李华
网站建设 2026/6/23 11:52:18

【工业级AI系统设计指南】:基于Open-AutoGLM的任务层级拆解模型

第一章:工业级AI系统设计的核心挑战在构建工业级人工智能系统时,开发者面临远超实验室环境的复杂性。这类系统不仅要求高精度的模型表现,还需在稳定性、可扩展性和实时性之间取得平衡。模型部署与服务化 将训练好的模型集成到生产环境中&…

作者头像 李华