
Python Web Scraping in Practice: Collecting Large-Scale Local Life Service Data with Asynchronous Crawlers and Intelligent Parsing


张小明

Front-end Development Engineer


Introduction

In the digital economy, local life service data has become an important basis for business decisions and consumer choices. From restaurant reviews and hotel bookings to leisure and entertainment, this data carries enormous commercial value. This article shows how to build an efficient, stable collection system for local life service data with modern Python crawling techniques, covering asynchronous requests, intelligent parsing, anti-bot countermeasures, and other core topics.

Technology Stack Overview

  • HTTP requests: aiohttp (asynchronous HTTP client)

  • Parsing: parsel + lxml (XPath and CSS selectors)

  • Browser automation: Playwright (for dynamically loaded content)

  • Data storage: PostgreSQL + SQLAlchemy ORM

  • Proxy middleware: intelligent proxy rotation

  • Task scheduling: Celery + Redis (distributed crawling)

  • Data cleaning: Pandas + regular expressions
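
For reference, here is a hedged sketch of a matching requirements.txt: these are the public PyPI names for the stack above, plus psycopg2-binary as the PostgreSQL driver and openpyxl for the Excel export. The original article does not pin versions, so pin whatever you validate locally; the Dockerfile in the deployment section expects this file to exist.

text

aiohttp
parsel
lxml
playwright
sqlalchemy
psycopg2-binary
celery
redis
pandas
openpyxl
fake-useragent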

Core Code Implementation

1. Async Spider Base Class

python

import asyncio
import aiohttp
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
import logging
from urllib.parse import urljoin
import hashlib
import json


class AsyncBaseSpider(ABC):
    """Async spider base class."""

    def __init__(self, name: str, concurrency: int = 10):
        self.name = name
        self.concurrency = concurrency
        self.session: Optional[aiohttp.ClientSession] = None
        self.logger = logging.getLogger(f"spider.{name}")
        self.semaphore = asyncio.Semaphore(concurrency)
        # Default request headers
        self.default_headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
        }

    async def init_session(self):
        """Initialize the aiohttp session."""
        timeout = aiohttp.ClientTimeout(total=30)
        connector = aiohttp.TCPConnector(
            limit=self.concurrency * 2,
            ssl=False,
            force_close=True,
            enable_cleanup_closed=True
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers=self.default_headers
        )

    @abstractmethod
    async def start(self, **kwargs):
        """Start the spider."""
        pass

    async def fetch(self, url: str, method: str = 'GET', **kwargs) -> Optional[str]:
        """Fetch a page asynchronously."""
        async with self.semaphore:
            try:
                async with self.session.request(
                    method=method,
                    url=url,
                    **kwargs
                ) as response:
                    if response.status == 200:
                        content = await response.read()
                        # Detect the response encoding automatically
                        encoding = response.get_encoding()
                        if not encoding:
                            encoding = 'utf-8'
                        return content.decode(encoding, errors='ignore')
                    else:
                        self.logger.warning(
                            f"Request failed: {url}, status: {response.status}"
                        )
                        return None
            except Exception as e:
                self.logger.error(f"Request error {url}: {str(e)}")
                return None

    async def close(self):
        """Close the session."""
        if self.session:
            await self.session.close()

    def generate_request_id(self, url: str, params: Dict = None) -> str:
        """Generate a request ID for deduplication."""
        data = url + (json.dumps(params, sort_keys=True) if params else '')
        return hashlib.md5(data.encode()).hexdigest()

    async def __aenter__(self):
        await self.init_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
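
To make the abstract contract concrete, here is a minimal, hypothetical subclass; the target URL and the single-page start() are illustrative placeholders, not part of the original article.

python

class DemoSpider(AsyncBaseSpider):
    """Hypothetical subclass illustrating the base-class contract."""

    async def start(self, **kwargs):
        # fetch() applies the semaphore, default headers, and encoding detection
        return await self.fetch(kwargs.get('url', 'https://example.com/'))


async def main():
    # The async context manager opens and closes the aiohttp session
    async with DemoSpider('demo', concurrency=5) as spider:
        html = await spider.start(url='https://example.com/')
        print(len(html or ''))

asyncio.run(main())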

2. Meituan Merchant Data Spider

python

import re
import random
import asyncio
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Any
from urllib.parse import urljoin
from parsel import Selector
import pandas as pd
from playwright.async_api import async_playwright


class MeituanSpider(AsyncBaseSpider):
    """Meituan merchant data spider."""

    def __init__(self, city: str = '北京'):
        super().__init__(f'meituan_{city}', concurrency=5)
        self.city = city
        self.base_url = "https://www.meituan.com"
        self.api_url = "https://apimobile.meituan.com"
        # Merchant record fields
        self.shop_fields = [
            'shop_id', 'shop_name', 'address', 'phone', 'latitude', 'longitude',
            'avg_price', 'score', 'comment_count', 'category', 'business_hours',
            'city', 'district', 'business_circle', 'has_coupon', 'discount_info',
            'source', 'crawl_time'
        ]

    async def start(self, categories: List[str] = None, max_pages: int = 50):
        """Start the spider."""
        if categories is None:
            categories = ['美食', '酒店', '休闲娱乐', '生活服务']

        tasks = []
        for category in categories:
            for page in range(1, max_pages + 1):
                task = self.crawl_category_page(category, page)
                tasks.append(task)

        # Run all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect results
        all_shops = []
        for result in results:
            if isinstance(result, Exception):
                self.logger.error(f"Task failed: {result}")
                continue
            if result:
                all_shops.extend(result)
        return all_shops

    async def crawl_category_page(self, category: str, page: int) -> List[Dict]:
        """Crawl one page of a category listing."""
        params = {
            'utm_source': 'shopList',
            'ci': self._get_city_code(self.city),
            'uuid': self._generate_uuid(),
            'userid': '',
            'limit': '20',
            'offset': str((page - 1) * 20),
            'cateId': self._get_category_id(category),
            'token': '',
            'partner': '126',
            'platform': '3',
            'riskLevel': '1',
            'optimusCode': '10',
            '_token': self._generate_token(),
        }
        url = f"{self.api_url}/group/v4/poi/pcsearch/1"
        headers = self._get_api_headers()
        try:
            html = await self.fetch(url, method='GET', params=params, headers=headers)
            if html:
                shops = self.parse_search_result(html)
                return shops
        except Exception as e:
            self.logger.error(f"Failed to crawl {category} page {page}: {e}")
        return []

    def parse_search_result(self, html: str) -> List[Dict]:
        """Parse the JSON search result."""
        data = json.loads(html)
        shops = []
        if data.get('code') == 0 and 'data' in data:
            for item in data['data']['searchResult']:
                shop = {
                    'shop_id': item.get('id'),
                    'shop_name': item.get('title', '').strip(),
                    'address': item.get('address', '').strip(),
                    'phone': item.get('phone', ''),
                    'latitude': item.get('latitude'),
                    'longitude': item.get('longitude'),
                    'avg_price': item.get('avgprice'),
                    'score': item.get('avgscore'),
                    'comment_count': item.get('comments'),
                    'category': item.get('backCateName', ''),
                    'business_hours': item.get('openinfo', ''),
                    'city': self.city,
                    'district': item.get('areaname', ''),
                    'business_circle': item.get('frontPoiName', ''),
                    'has_coupon': bool(item.get('deals')),
                    'discount_info': json.dumps(item.get('preferentialInfo', []), ensure_ascii=False),
                    'source': 'meituan',
                    'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                }
                shops.append(shop)
        return shops

    async def crawl_shop_detail(self, shop_id: str) -> Dict:
        """Crawl a merchant detail page (Playwright handles dynamically loaded content)."""
        detail_url = f"{self.base_url}/meishi/{shop_id}/"
        async with async_playwright() as p:
            # Launch the browser (headless mode is configurable)
            browser = await p.chromium.launch(
                headless=True,
                args=['--disable-blink-features=AutomationControlled']
            )
            # Create a browser context
            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self._get_random_user_agent()
            )
            # Open a new page
            page = await context.new_page()
            try:
                # Navigate to the detail page
                await page.goto(detail_url, wait_until='networkidle', timeout=30000)
                # Wait for the key element to load
                await page.wait_for_selector('.dp-header', timeout=10000)
                # Grab the rendered HTML
                content = await page.content()
                # Parse the detail data
                detail_data = self.parse_shop_detail(content)
                detail_data['shop_id'] = shop_id
                return detail_data
            except Exception as e:
                self.logger.error(f"Failed to crawl detail page {shop_id}: {e}")
                return {}
            finally:
                await browser.close()

    def parse_shop_detail(self, html: str) -> Dict:
        """Parse a merchant detail page."""
        selector = Selector(text=html)
        detail = {
            'shop_images': [],
            'recommended_dishes': [],
            'environment_score': 0,
            'service_score': 0,
            'taste_score': 0,
            'feature_tags': [],
            'facilities': [],
            'parking_info': '',
            'reservation_info': '',
        }
        # Images
        images = selector.css('.shop-images img::attr(src)').getall()
        detail['shop_images'] = [urljoin(self.base_url, img) for img in images]
        # Recommended dishes
        dishes = selector.css('.recommend-dish .dish-name::text').getall()
        detail['recommended_dishes'] = [d.strip() for d in dishes if d.strip()]
        # Sub-scores
        scores = selector.css('.score-info span::text').getall()
        if len(scores) >= 3:
            detail['environment_score'] = float(scores[0] or 0)
            detail['service_score'] = float(scores[1] or 0)
            detail['taste_score'] = float(scores[2] or 0)
        # Feature tags
        tags = selector.css('.feature-tags span::text').getall()
        detail['feature_tags'] = [tag.strip() for tag in tags if tag.strip()]
        return detail

    def _get_city_code(self, city_name: str) -> str:
        """Map a city name to Meituan's city code."""
        city_codes = {
            '北京': '1', '上海': '10', '广州': '20', '深圳': '30',
            '杭州': '50', '成都': '60', '重庆': '70', '武汉': '80'
        }
        return city_codes.get(city_name, '1')

    def _get_category_id(self, category: str) -> str:
        """Map a category name to its ID."""
        category_ids = {
            '美食': '1', '酒店': '12', '休闲娱乐': '5',
            '生活服务': '3', '购物': '4', '运动健身': '8'
        }
        return category_ids.get(category, '1')

    def _generate_uuid(self) -> str:
        """Generate a pseudo UUID."""
        return ''.join(random.choices('0123456789abcdef', k=32))

    def _generate_token(self) -> str:
        """Generate an access token."""
        import time
        timestamp = int(time.time() * 1000)
        return hashlib.md5(f"{timestamp}{random.random()}".encode()).hexdigest()

    def _get_api_headers(self) -> Dict:
        """Build API request headers."""
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Origin': 'https://www.meituan.com',
            'Referer': 'https://www.meituan.com/',
            'User-Agent': self._get_random_user_agent(),
            'X-Requested-With': 'XMLHttpRequest',
        }
        return headers

    def _get_random_user_agent(self) -> str:
        """Pick a random User-Agent."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/537.36',
        ]
        return random.choice(user_agents)
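
Before wiring in the database and Celery layers, the spider can be exercised on its own. This is a minimal sketch for local testing; it assumes the endpoint and parameters above still return data, which in practice depends on valid tokens and the platform's anti-bot checks.

python

async def demo():
    # Assumes the MeituanSpider class defined above; a real run is subject
    # to the platform's anti-bot checks and may return no data.
    async with MeituanSpider('北京') as spider:
        shops = await spider.start(categories=['美食'], max_pages=1)
        print(f"fetched {len(shops)} merchants")

asyncio.run(demo())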

3. Data Storage and Processing

python

from datetime import datetime
from typing import Dict, List
from contextlib import contextmanager
import json
import logging

import pandas as pd
from sqlalchemy import create_engine, Column, String, Integer, Float, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class LocalServiceShop(Base):
    """Data model for local life service merchants."""
    __tablename__ = 'local_service_shops'

    id = Column(Integer, primary_key=True, autoincrement=True)
    shop_id = Column(String(100), unique=True, index=True, nullable=False)
    shop_name = Column(String(200), nullable=False)
    address = Column(Text)
    phone = Column(String(50))
    latitude = Column(Float)
    longitude = Column(Float)
    avg_price = Column(Float)
    score = Column(Float)
    comment_count = Column(Integer)
    category = Column(String(100))
    business_hours = Column(String(200))
    city = Column(String(50))
    district = Column(String(100))
    business_circle = Column(String(100))
    has_coupon = Column(Integer, default=0)
    discount_info = Column(Text)
    source = Column(String(50))
    environment_score = Column(Float)
    service_score = Column(Float)
    taste_score = Column(Float)
    recommended_dishes = Column(Text)
    feature_tags = Column(Text)
    crawl_time = Column(DateTime)
    create_time = Column(DateTime, default=datetime.now)
    update_time = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class DatabaseManager:
    """Database access helper."""

    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string, pool_size=20, max_overflow=30)
        self.SessionLocal = sessionmaker(bind=self.engine, expire_on_commit=False)
        self.logger = logging.getLogger('database')
        # Create tables if they do not exist
        Base.metadata.create_all(bind=self.engine)

    @contextmanager
    def get_session(self):
        """Yield a database session with commit/rollback handling."""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            self.logger.error(f"Database operation failed: {e}")
            raise
        finally:
            session.close()

    async def batch_insert_shops(self, shops: List[Dict]):
        """Insert or update merchant records in batch."""
        if not shops:
            return
        with self.get_session() as session:
            # Insert or update row by row
            for shop_data in shops:
                # Check whether the record already exists
                existing = session.query(LocalServiceShop).filter_by(
                    shop_id=shop_data['shop_id'],
                    source=shop_data['source']
                ).first()
                if existing:
                    # Update the existing record
                    for key, value in shop_data.items():
                        if hasattr(existing, key) and key != 'id':
                            setattr(existing, key, value)
                    existing.update_time = datetime.now()
                else:
                    # Insert a new record
                    new_shop = LocalServiceShop(**shop_data)
                    session.add(new_shop)
        self.logger.info(f"Processed {len(shops)} merchant records")

    def export_to_excel(self, filepath: str, city: str = None):
        """Export data to an Excel file."""
        with self.get_session() as session:
            query = session.query(LocalServiceShop)
            if city:
                query = query.filter_by(city=city)
            df = pd.read_sql(query.statement, session.bind)

            # Data cleaning
            df['discount_info'] = df['discount_info'].apply(
                lambda x: json.loads(x) if x else []
            )

            # Write the Excel workbook
            with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='商家数据', index=False)
                # Add a summary sheet
                stats = self._calculate_statistics(df)
                stats_df = pd.DataFrame([stats])
                stats_df.to_excel(writer, sheet_name='统计信息', index=False)

    def _calculate_statistics(self, df: pd.DataFrame) -> Dict:
        """Compute summary statistics."""
        if df.empty:
            return {}
        return {
            '总商家数': len(df),
            '平均评分': df['score'].mean(),
            '平均价格': df['avg_price'].mean(),
            '总评论数': df['comment_count'].sum(),
            '分类数量': df['category'].nunique(),
            '有优惠商家数': df['has_coupon'].sum(),
        }
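
The row-by-row upsert above is simple but slow for large batches. On PostgreSQL, a bulk alternative can be sketched with SQLAlchemy's INSERT ... ON CONFLICT support; this assumes the LocalServiceShop model above and its unique constraint on shop_id, and is an optional optimization rather than part of the original design.

python

from sqlalchemy.dialects.postgresql import insert


def bulk_upsert_shops(engine, shops):
    """Upsert a batch of merchant dicts in one statement (sketch)."""
    if not shops:
        return
    stmt = insert(LocalServiceShop).values(shops)
    # On conflict with an existing shop_id, overwrite everything except the keys
    update_cols = {
        c.name: stmt.excluded[c.name]
        for c in LocalServiceShop.__table__.columns
        if c.name not in ('id', 'shop_id', 'create_time')
    }
    stmt = stmt.on_conflict_do_update(index_elements=['shop_id'], set_=update_cols)
    with engine.begin() as conn:
        conn.execute(stmt)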

4. Distributed Task Scheduling

python

from celery import Celery
from celery.schedules import crontab
import redis
from typing import List, Dict


class DistributedCrawler:
    """Distributed crawl scheduler."""

    def __init__(self, redis_url: str = 'redis://localhost:6379/0'):
        self.celery_app = Celery(
            'crawler_tasks',
            broker=redis_url,
            backend=redis_url,
            include=['crawler.tasks']
        )
        # Celery configuration
        self.celery_app.conf.update(
            task_serializer='json',
            accept_content=['json'],
            result_serializer='json',
            timezone='Asia/Shanghai',
            enable_utc=True,
            task_routes={
                'crawl_meituan': {'queue': 'meituan'},
                'crawl_dianping': {'queue': 'dianping'},
                'crawl_eleme': {'queue': 'eleme'},
            },
            beat_schedule={
                'daily-crawl-meituan': {
                    'task': 'crawl_meituan',
                    'schedule': crontab(hour=2, minute=0),  # every day at 02:00
                    'args': (['北京', '上海', '广州', '深圳'],),
                },
                'weekly-full-crawl': {
                    'task': 'crawl_all_platforms',
                    'schedule': crontab(hour=3, minute=0, day_of_week=0),  # Sundays at 03:00
                },
            }
        )
        self.redis_client = redis.from_url(redis_url)

    def start_crawler(self, platform: str, cities: List[str], categories: List[str] = None):
        """Dispatch a crawl task."""
        task_map = {
            'meituan': 'crawl_meituan',
            'dianping': 'crawl_dianping',
            'eleme': 'crawl_eleme',
        }
        if platform not in task_map:
            raise ValueError(f"Unsupported platform: {platform}")
        task_name = task_map[platform]
        task = self.celery_app.send_task(
            task_name,
            args=[cities, categories],
            kwargs={}
        )
        return task.id

    def monitor_progress(self, task_id: str) -> Dict:
        """Monitor task progress."""
        task_result = self.celery_app.AsyncResult(task_id)
        # Read detailed progress from Redis
        progress_key = f"crawler:progress:{task_id}"
        progress = self.redis_client.hgetall(progress_key)
        return {
            'task_id': task_id,
            'status': task_result.status,
            'result': task_result.result if task_result.ready() else None,
            'progress': progress,
        }
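
The scheduler dispatches tasks by name ('crawl_meituan' and so on) and points Celery at a crawler.tasks module that the article never shows. The following is only a hedged sketch of what such a task could look like; the module path, task body, and retry policy are assumptions layered on top of MeituanSpider.

python

# crawler/tasks.py -- sketch only; this module is not part of the original article
import asyncio
from celery import shared_task

from crawler.spiders import MeituanSpider  # hypothetical module path


@shared_task(name='crawl_meituan', bind=True, max_retries=3)
def crawl_meituan(self, cities, categories=None):
    """Run the async MeituanSpider inside a Celery worker."""
    async def _run():
        results = []
        for city in cities:
            async with MeituanSpider(city) as spider:
                results.extend(await spider.start(categories=categories, max_pages=10))
        return results

    try:
        return asyncio.run(_run())
    except Exception as exc:
        # Exponential backoff between retries: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))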

5. Anti-Bot Countermeasures

python

import random
import time
import asyncio
from typing import Optional

import aiohttp
from fake_useragent import UserAgent


class AntiAntiCrawler:
    """Anti-anti-crawling strategy manager."""

    def __init__(self):
        self.ua = UserAgent()
        self.proxy_pool = []
        self.request_history = []
        self.cookie_jars = {}

    async def get_proxy(self) -> Optional[str]:
        """Get a proxy IP."""
        if not self.proxy_pool:
            await self.refresh_proxy_pool()
        if self.proxy_pool:
            proxy = random.choice(self.proxy_pool)
            # Verify that the proxy is usable
            if await self.validate_proxy(proxy):
                return proxy
        return None

    async def refresh_proxy_pool(self):
        """Refresh the proxy pool."""
        # Fetch proxies from proxy providers
        sources = [
            'http://api.proxy.com/v1/proxies',
            'http://proxy-pool.example.com/get',
        ]
        for source in sources:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(source, timeout=10) as response:
                        if response.status == 200:
                            data = await response.json()
                            self.proxy_pool.extend(data.get('proxies', []))
            except Exception as e:
                print(f"Failed to fetch proxies from {source}: {e}")
        # Deduplicate
        self.proxy_pool = list(set(self.proxy_pool))

    async def validate_proxy(self, proxy: str) -> bool:
        """Check whether a proxy works."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    'http://httpbin.org/ip',
                    proxy=f"http://{proxy}",
                    timeout=5
                ) as response:
                    return response.status == 200
        except Exception:
            return False

    def get_random_headers(self, referer: str = None) -> dict:
        """Build randomized request headers."""
        headers = {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        if referer:
            headers['Referer'] = referer
        # Randomly add extra headers
        extra_headers = {
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
            'DNT': '1',
        }
        if random.random() > 0.5:
            headers.update(extra_headers)
        return headers

    def get_random_delay(self, base_delay: float = 1.0, random_range: float = 2.0) -> float:
        """Generate a random delay."""
        return base_delay + random.random() * random_range

    def rotate_cookie(self, domain: str) -> dict:
        """Rotate cookies per domain."""
        if domain not in self.cookie_jars:
            self.cookie_jars[domain] = self._generate_cookies()
        return self.cookie_jars[domain]

    def _generate_cookies(self) -> dict:
        """Generate random cookies."""
        cookies = {
            'Hm_lvt_' + ''.join(random.choices('abcdef0123456789', k=32)): str(int(time.time())),
            'Hm_lpvt_' + ''.join(random.choices('abcdef0123456789', k=32)): str(int(time.time())),
            '__guid': ''.join(random.choices('0123456789abcdef', k=32)),
            'monitor_count': str(random.randint(1, 100)),
        }
        return cookies
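
One way to wire these strategies into the base spider is to override fetch() so every request gets a randomized delay, rotated headers, and an optional proxy. This is a hypothetical integration sketch, not code from the article.

python

class PoliteSpider(AsyncBaseSpider):
    """Hypothetical subclass combining AsyncBaseSpider with AntiAntiCrawler."""

    def __init__(self, name: str, concurrency: int = 5):
        super().__init__(name, concurrency)
        self.anti = AntiAntiCrawler()

    async def fetch(self, url: str, method: str = 'GET', **kwargs):
        # Random delay, rotated headers, and an optional proxy before each request
        await asyncio.sleep(self.anti.get_random_delay())
        kwargs.setdefault('headers', self.anti.get_random_headers())
        proxy = await self.anti.get_proxy()
        if proxy:
            kwargs.setdefault('proxy', f"http://{proxy}")
        return await super().fetch(url, method=method, **kwargs)

    async def start(self, **kwargs):
        return await self.fetch(kwargs.get('url', 'https://example.com/'))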

6. Main Program Entry Point

python

import asyncio
import argparse
import logging
import sys
from datetime import datetime
from typing import List


class MainCrawler:
    """Main crawler program."""

    def __init__(self):
        self.setup_logging()
        self.logger = logging.getLogger(__name__)

    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('crawler.log', encoding='utf-8'),
                logging.StreamHandler(sys.stdout)
            ]
        )

    async def run(self, platform: str, cities: List[str],
                  categories: List[str], max_pages: int = 10):
        """Run the crawler."""
        self.logger.info(f"Start crawling {platform}")

        # Pick the spider class for the platform
        # (DianpingSpider and ElemeSpider are referenced here but not implemented in this article)
        spider_map = {
            'meituan': MeituanSpider,
            'dianping': DianpingSpider,
            'eleme': ElemeSpider,
        }
        if platform not in spider_map:
            self.logger.error(f"Unsupported platform: {platform}")
            return

        SpiderClass = spider_map[platform]
        all_results = []
        for city in cities:
            self.logger.info(f"Start crawling city: {city}")
            async with SpiderClass(city) as spider:
                results = await spider.start(
                    categories=categories,
                    max_pages=max_pages
                )
                all_results.extend(results)
                self.logger.info(f"City {city} done, got {len(results)} records")
            # Avoid sending requests too fast
            await asyncio.sleep(2)

        # Persist the data
        if all_results:
            db_manager = DatabaseManager('postgresql://user:password@localhost/local_service')
            await db_manager.batch_insert_shops(all_results)

            # Export to Excel
            export_file = f"本地生活数据_{platform}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
            db_manager.export_to_excel(export_file)

            self.logger.info(f"Saved {len(all_results)} records")
            self.logger.info(f"Data exported to: {export_file}")
        return all_results


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(description='Local life service data crawler')
    parser.add_argument('--platform', '-p', required=True,
                        choices=['meituan', 'dianping', 'eleme'],
                        help='platform to crawl')
    parser.add_argument('--cities', '-c', nargs='+', default=['北京'],
                        help='list of cities')
    parser.add_argument('--categories', '-cat', nargs='+',
                        default=['美食', '酒店', '休闲娱乐'],
                        help='categories to crawl')
    parser.add_argument('--pages', type=int, default=10,
                        help='pages per category')
    parser.add_argument('--output', '-o', default='output.xlsx',
                        help='output file path')
    args = parser.parse_args()

    # Create and run the crawler
    crawler = MainCrawler()
    try:
        asyncio.run(crawler.run(
            platform=args.platform,
            cities=args.cities,
            categories=args.categories,
            max_pages=args.pages
        ))
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    except Exception as e:
        print(f"Program error: {e}")
        logging.exception("Unhandled exception")


if __name__ == '__main__':
    main()

Deployment and Optimization

1. Containerized Deployment with Docker

dockerfile

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libpq-dev \
    wget \
    gnupg \
    && wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list' \
    && apt-get update \
    && apt-get install -y google-chrome-stable \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install the Chromium build driven by Playwright (needed by the detail-page crawler)
RUN python -m playwright install --with-deps chromium

# Copy the application code
COPY . .

# Run the crawler
CMD ["python", "main.py", "--platform", "meituan", "--cities", "北京", "上海"]

2. Performance Tuning Tips

  1. Connection pool tuning: adjust aiohttp's connection-pool parameters

  2. Memory management: process large result sets with generators

  3. Caching: store already-crawled data in Redis

  4. Retry on failure: implement exponential-backoff retries (see the sketch after this list)

  5. Monitoring and alerting: integrate Prometheus metrics
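
A minimal sketch of the exponential-backoff retry mentioned in point 4, written as a decorator around an async fetch-style callable that returns None on failure; the limits and jitter are illustrative values, not from the article.

python

import asyncio
import functools
import random


def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
    """Retry an async callable that returns None on failure (sketch)."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                result = await func(*args, **kwargs)
                if result is not None or attempt == max_retries:
                    return result
                # Wait base_delay * 2^attempt seconds, plus a little jitter
                await asyncio.sleep(base_delay * (2 ** attempt) + random.random())
            return None
        return wrapper
    return decorator


# Usage: wrap the base spider's fetch so failed requests are retried
# fetch_with_retry = retry_with_backoff()(spider.fetch)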

Legal and Ethical Notes

  1. Respect robots.txt: honor each site's crawling policy

  2. Throttle request rates: avoid putting load pressure on target sites

  3. Data usage: use the collected data only for learning and research

  4. Privacy: do not collect users' personal information

  5. Copyright: comply with data copyright regulations
