news 2026/3/10 6:13:21

项目1-通过RocketMQ 将短链接统计

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
项目1-通过RocketMQ 将短链接统计

这是一份关于 “短链接访问统计系统”(基于 RocketMQ)的笔记,整合了我们之前讨论的所有核心知识点、代码逻辑、设计思想和技术细节,方便你系统复习和查阅。


短链接访问统计系统(基于 RocketMQ)笔记

一、系统核心目标
  • 核心功能:记录短链接的每一次访问,并进行多维度的统计分析(PV/UV/UIP、地域、设备、浏览器、操作系统等)。
  • 核心挑战
    1. 高性能:短链接跳转是核心入口,必须保证用户访问速度快。
    2. 高并发:可能面临瞬间大量的访问请求(如热点链接)。
    3. 数据可靠:每一次访问的统计数据都不能丢失,也不能重复计算。
    4. 系统解耦:统计功能不能影响核心的跳转功能。
二、技术架构与核心组件

整个系统采用生产者 - 消费者(Producer-Consumer)模式,核心组件如下:

  1. 生产者(Producer)ShortLinkStatsSaveProducer

    • 角色:在短链接被访问时,负责收集访问数据并发送到消息队列。
    • 核心任务:将同步的统计入库操作,转化为异步的消息发送。
  2. 消息队列(Message Queue):RocketMQ

    • 角色:作为生产者和消费者之间的 “桥梁”,存储和转发消息。
    • 核心任务:解耦、削峰填谷、保证消息可靠传输。
  3. 消费者(Consumer)ShortLinkStatsSaveConsumer

    • 角色:监听消息队列,消费统计消息,并将数据持久化到数据库。
    • 核心任务:执行耗时的统计入库操作,保证数据最终一致性。
  4. 幂等处理器(Idempotent Handler)MessageQueueIdempotentHandler

    • 角色:基于 Redis 实现,防止同一条消息被重复消费,导致统计数据重复。
    • 核心任务:保证消费的幂等性。
  5. 分布式锁(Distributed Lock):Redisson RReadWriteLock

    • 角色:在消费者入库时,保证并发场景下数据的一致性。
    • 核心任务:防止在统计过程中,短链接的 GID 被修改,导致数据归属错误。
三、核心流程详解
1. 生产者流程 (ShortLinkStatsSaveProducer)

java

运行

public void send(Map<String, String> producerMap) { // 1. 生成唯一的消息Key(UUID),用于幂等性保证 String keys = UUID.randomUUID().toString(); producerMap.put("keys", keys); // 2. 构建RocketMQ消息,设置消息体和消息头 Message<Map<String, String>> build = MessageBuilder .withPayload(producerMap) // 消息体:包含统计数据 .setHeader(MessageConst.PROPERTY_KEYS, keys) // 消息头:设置消息Key .build(); try { // 3. 同步发送消息到指定的Topic SendResult sendResult = rocketMQTemplate.syncSend(statsSaveTopic, build, 2000L); log.info("消息发送成功,ID: {}, Keys: {}", sendResult.getMsgId(), keys); } catch (Throwable ex) { log.error("消息发送失败", ex); // 可扩展:发送失败后的重试或告警逻辑 } }
  • 关键操作
    • 生成消息 Key:使用UUID.randomUUID(),确保每条消息的唯一性,是实现幂等的基础。
    • 同步发送 (syncSend):最可靠的发送方式。生产者会等待 Broker 返回发送结果,确保消息至少被 Broker 接收一次。
    • 设置超时时间2000L(2 秒),防止生产者无限期阻塞。
2. 消费者流程 (ShortLinkStatsSaveConsumer)

java

运行

@Override public void onMessage(Map<String, String> producerMap) { String keys = producerMap.get("keys"); // ========== 核心步骤1:幂等校验 ========== if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) { if (messageQueueIdempotentHandler.isAccomplish(keys)) { return; // 消息已处理完成,直接返回 } throw new ServiceException("消息处理中,需要重试"); // 触发MQ重试 } try { String fullShortUrl = producerMap.get("fullShortUrl"); if (StrUtil.isNotBlank(fullShortUrl)) { String gid = producerMap.get("gid"); ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class); // ========== 核心步骤2:执行业务逻辑 ========== actualSaveShortLinkStats(fullShortUrl, gid, statsRecord); } } catch (Throwable ex) { log.error("消费异常", ex); try { // ========== 核心步骤3:异常处理 ========== messageQueueIdempotentHandler.delMessageProcessed(keys); // 删除幂等标识,允许重试 } catch (Throwable remoteEx) { log.error("删除幂等标识失败", remoteEx); } throw ex; // 抛出异常,触发RocketMQ重试 } // ========== 核心步骤4:标记完成 ========== messageQueueIdempotentHandler.setAccomplish(keys); // 标记消息处理完成 }
  • 关键操作
    • 幂等校验:通过MessageQueueIdempotentHandler确保消息只被处理一次。
    • 执行业务逻辑:调用actualSaveShortLinkStats方法,将统计数据入库。
    • 异常处理
      • 消费失败时,必须删除幂等标识,否则消息将无法被重试。
      • 抛出异常,触发 RocketMQ 的重试机制。
    • 标记完成:消费成功后,标记消息为 “已完成”,防止后续重复消费。
3. 实际入库逻辑 (actualSaveShortLinkStats)

java

运行

public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) { // ========== 核心步骤1:加分布式读锁 ========== RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl)); RLock rLock = readWriteLock.readLock(); rLock.lock(); // 加锁 try { // 1. 补全GID(如果生产者未传入) if (StrUtil.isBlank(gid)) { ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(Wrappers.lambdaQuery(ShortLinkGotoDO.class) .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl)); gid = shortLinkGotoDO.getGid(); } // 2. 解析时间维度(小时、星期) int hour = DateUtil.hour(new Date(), true); int weekValue = DateUtil.dayOfWeekEnum(new Date()).getIso8601Value(); // ========== 核心步骤2:多维度统计入库 ========== // a. PV/UV/UIP统计 LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()...build(); linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO); // 自定义的增量更新方法 // b. 地域统计(调用高德API) Map<String, Object> localeParamMap = new HashMap<>(); localeParamMap.put("key", statsLocaleAmapKey); localeParamMap.put("ip", statsRecord.getRemoteAddr()); String localeResultStr = HttpUtil.get(AMAP_REMOTE_URL, localeParamMap); // ... 解析结果并入库 ... // c. 操作系统、浏览器、设备、网络等统计(类似) LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()...build(); linkOsStatsMapper.shortLinkOsState(linkOsStatsDO); // ... // d. 原始访问日志 LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()...build(); linkAccessLogsMapper.insert(linkAccessLogsDO); // e. 更新短链接核心表的总统计 shortLinkMapper.incrementStats(gid, fullShortUrl, 1, ...); // f. 今日统计 LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()...build(); linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO); } catch (Throwable ex) { log.error("统计入库异常", ex); } finally { // ========== 核心步骤3:释放锁 ========== rLock.unlock(); // 最终释放锁,避免死锁 } }
  • 关键操作
    • 加分布式读锁
      • 锁的 KeyLOCK_GID_UPDATE_KEY + fullShortUrl,保证锁的粒度是单个短链接,避免全局锁。
      • 读锁 (Read Lock)
        • 允许多个读操作并发执行(多个统计线程可以同时处理同一个短链接)。
        • 阻塞写操作(修改 GID 的操作),保证在统计过程中 GID 不会被修改。
    • 多维度统计入库
      • 增量更新:大部分统计表(如link_access_stats)使用自定义的shortLinkStats方法,实现 “不存在则插入,存在则更新(累加)” 的逻辑,避免全量更新的性能问题。
      • 原始日志link_access_logs表直接插入原始访问记录,用于后续的明细查询和数据分析。
    • 释放锁:在finally块中释放锁,确保无论代码是否异常,锁都能被释放,防止死锁。
4. 幂等处理器 (MessageQueueIdempotentHandler)

基于 Redis 的SETNXsetIfAbsent)命令实现,保证分布式环境下的原子性。

方法名作用Redis KeyValue核心逻辑
isMessageProcessed判断消息是否可被处理short-link:idempotent:{keys}0使用setIfAbsent尝试设置 Key。-true:Key 不存在,消息可处理,设置 Value 为0(处理中)。-false:Key 已存在,消息不可处理。
isAccomplish判断消息是否处理完成short-link:idempotent:{keys}1检查 Key 对应的 Value 是否为1
setAccomplish标记消息处理完成short-link:idempotent:{keys}1将 Value 设置为1,并设置过期时间。
delMessageProcessed删除幂等标识short-link:idempotent:{keys}-删除 Key,允许消息被重试。
  • 核心思想先占坑,后处理
    1. 处理前,用SETNX占坑(Value=0)。
    2. 处理中,其他线程看到坑被占,要么等待要么拒绝。
    3. 处理成功,将坑标记为完成(Value=1)。
    4. 处理失败,把坑让出来(删除 Key)。
四、为什么选择 RocketMQ?
  1. 异步解耦:将耗时的统计入库操作从同步的跳转流程中剥离出来,极大提升了核心接口的响应速度。
  2. 削峰填谷:面对突发的高并发访问,RocketMQ 可以缓冲大量消息,避免直接冲击数据库,保证系统稳定。
  3. 消息可靠性
    • 生产者同步发送:确保消息至少被 Broker 接收一次。
    • Broker 持久化:消息存储在磁盘,即使 Broker 宕机,消息也不会丢失。
    • 消费者重试机制:消费失败时,RocketMQ 会自动重试,保证消息最终被处理。
  4. 负载均衡:RocketMQ 的消费者组(Consumer Group)机制,可以轻松实现多个消费者实例共同消费一个 Topic 的消息,提高处理能力。
  5. 可扩展性
    • 水平扩展:可以通过增加 Broker 节点和消费者实例来提升系统的吞吐量。
    • 功能扩展:RocketMQ 支持定时消息、事务消息等高级特性,便于未来功能扩展。
五、核心技术亮点与设计模式
  1. 读写锁分离
    • 使用 Redisson 的RReadWriteLock,统计入库时加读锁,修改 GID 时加写锁
    • 好处:允许多个统计操作并发执行,同时保证 GID 不被并发修改,兼顾了性能和数据一致性。
  2. 幂等性设计
    • 基于 Redis 的SETNX命令,是分布式系统中实现幂等的经典方案。
    • 好处:有效防止了因网络抖动或 MQ 重试导致的重复消费问题,保证了统计数据的准确性。
  3. 增量更新
    • 统计表的INSERT OR UPDATE操作(如linkAccessStatsMapper.shortLinkStats)。
    • 好处:相比先查询后更新,减少了一次数据库交互,提升了入库性能。
  4. 最小锁粒度
    • 锁的 Key 是fullShortUrl,而不是全局锁。
    • 好处:只对同一个短链接的操作进行同步,不同短链接的操作互不影响,最大化了并发性能。
  5. 异常处理与重试
    • 消费失败时,删除幂等标识并抛出异常,触发 MQ 重试。
    • 好处:保证了消息的最终一致性,即使中间环节出错,数据也不会丢失。
六、总结

这套短链接统计系统是一个高性能、高可用、高并发的分布式系统设计典范。它巧妙地运用了 RocketMQ 实现异步解耦和削峰填谷,通过 Redis 实现了分布式锁和幂等性保证,最终达到了 “用户访问快、统计数据准、系统运行稳” 的目标。

核心设计思想可以概括为:将同步操作异步化,将串行操作并行化,在性能和数据一致性之间找到最佳平衡点。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/10 1:47:39

DUT硬件调试接口集成:JTAG与UART配置指南

DUT调试接口实战&#xff1a;JTAG与UART如何协同构建可靠调试链路你有没有遇到过这样的场景&#xff1f;新板子第一次上电&#xff0c;烧录完固件却毫无反应——串口没输出、JTAG连不上、LED也不闪。这时候&#xff0c;是电源问题&#xff1f;晶振坏了&#xff1f;还是Bootload…

作者头像 李华
网站建设 2026/3/8 1:27:32

金仓数据库MongoDB兼容版深度评测:从性能到实战的全面解析

一、引言:数字化转型下的数据库新选择 如今企业数字化转型进入深水区,大家对数据库的要求早就不是"能存能取"那么简单。文档数据库因为天生适合处理半结构化数据,成了很多现代应用的标配。可现实情况是,随着技术自主可控、供应链安全成为必答题,再加上业务常常需要同…

作者头像 李华
网站建设 2026/3/8 18:21:21

PyTorch-CUDA-v2.6镜像适配主流GPU,训练速度提升3倍以上

PyTorch-CUDA-v2.6镜像适配主流GPU&#xff0c;训练速度提升3倍以上 在深度学习项目从实验室走向生产的今天&#xff0c;一个常见的痛点是&#xff1a;为什么同样的模型代码&#xff0c;在同事的机器上跑得飞快&#xff0c;而在自己的环境里却频频报错、训练缓慢&#xff1f;答…

作者头像 李华
网站建设 2026/3/3 15:48:04

Anaconda配置PyTorch环境太难?试试预装CUDA的v2.6镜像

告别环境配置噩梦&#xff1a;用预装CUDA的PyTorch镜像加速AI开发 在深度学习项目中&#xff0c;你是否曾经历过这样的场景&#xff1f; 刚拿到一台新服务器&#xff0c;兴致勃勃准备训练模型&#xff0c;结果 torch.cuda.is_available() 返回了 False。 翻文档、查社区、试了十…

作者头像 李华
网站建设 2026/3/2 23:51:15

基于双层优化的微电网系统规划容量配置方法

基于双层优化的微电网系统规划容量配置方法 摘要&#xff1a;与目前大部分的微网优化调度代码不同&#xff0c;本代码主要做的是微网的多电源容量优化配置&#xff0c;规划出最佳的微电网光伏、风电、储能等多电源的容量配置方案&#xff0c;此外&#xff0c;代码采用双层模型&…

作者头像 李华
网站建设 2026/3/9 22:37:07

vscode 是盈利的吗?微软为什么要持续投入开发资源?

开源不代表不盈利&#xff0c;vscode 本身就是一个非常好的流量入口&#xff0c;程序员用户数多了&#xff0c;自然商业变现的方式就多了。比如通过插件市场提供Copilot等增值服务&#xff0c;为他的大哥Visual Studio引流&#xff0c;为Azure引流。总体来说&#xff1a;先免费…

作者头像 李华