MyBatisPlus缓存机制?我们采用Redis加速推理队列
在AI服务日益普及的今天,一个看似简单的“文本转语音”请求背后,往往隐藏着复杂的系统挑战。用户点击“生成语音”的瞬间,系统不仅要快速响应,还要处理可能长达数秒甚至数十秒的模型推理任务。如果直接同步执行,轻则接口超时,重则数据库被打满——这正是我们在构建基于VoxCPM-1.5-TTS的语音合成平台时面临的真实困境。
我们的解决方案没有选择复杂的消息中间件或昂贵的服务治理框架,而是回归本质:用Redis这一把“瑞士军刀”,同时解决数据缓存与任务调度两大难题。通过深度整合MyBatisPlus与Redis,我们实现了一套轻量、高效且极具工程实用性的AI服务架构。
从ORM到分布式缓存:为什么默认缓存走不通?
MyBatisPlus作为Java生态中广受欢迎的持久层框架,自带了两级缓存机制。一级缓存在SqlSession生命周期内有效,适合单次会话中的重复查询;二级缓存则作用于Mapper命名空间级别,理论上可以跨会话共享结果。
但现实很骨感。默认的二级缓存使用PerpetualCache,底层是JVM堆内的HashMap,这意味着:
- 多实例部署时,每个节点都有自己独立的缓存副本;
- 没有过期策略,容易造成内存泄漏;
- 写操作后无法保证其他节点及时失效,极易出现脏读。
举个例子:当多个API实例同时处理TTS任务状态查询时,某个节点更新了任务为“已完成”,而其他节点仍从本地缓存返回“处理中”——这种不一致会直接导致前端轮询永无止境。
所以,真正的生产级应用必须打破本地缓存的局限,引入像Redis这样的外部存储作为统一的缓存中枢。
让MyBatisPlus真正支持分布式缓存
要让MyBatisPlus接入Redis,并非简单替换模板变量,而是需要深入其缓存扩展机制。
自定义RedisCache:不只是桥接,更是可控
MyBatis提供了Cache接口,允许开发者自定义缓存实现。我们编写了一个RedisCache类,不仅完成基础的数据读写,更关注以下几点工程细节:
public class RedisCache implements Cache { private final String id; private static RedisTemplate<String, Object> redisTemplate; public RedisCache(String id) { if (id == null) throw new IllegalArgumentException("Cache instances require an ID"); this.id = id; } @Override public void putObject(Object key, Object value) { if (key != null && value != null) { getRedisTemplate().opsForValue().set(key.toString(), value, Duration.ofMinutes(30)); } } @Override public Object getObject(Object key) { return key == null ? null : getRedisTemplate().opsForValue().get(key.toString()); } @Override public void clear() { Set<String> keys = getRedisTemplate().keys(this.id + ":*"); if (keys != null) { getRedisTemplate().delete(keys); } } }这里有几个关键设计点值得强调:
- 命名空间隔离:
clear()方法只清除当前Mapper相关的键(以namespace:*为前缀),避免误删其他模块缓存; - 自动过期:所有缓存项设置30分钟TTL,防止冷数据长期驻留;
- 延迟初始化:通过
SpringContextHolder.getBean()获取redisTemplate,规避构造器注入失败问题; - 序列化兼容性:要求实体类实现
Serializable,确保跨JVM传输安全。
然后在Mapper接口上启用该缓存:
@Mapper @CacheNamespace(implementation = RedisCache.class, flushInterval = 60000, size = 1024) public interface TTSTaskMapper extends BaseMapper<TTSTask> { }配合配置文件开启全局缓存:
mybatis-plus: configuration: cache-enabled: true这样一来,任何对TTSTaskMapper的查询都会优先走Redis,命中率高的场景下数据库压力可下降80%以上。
Redis不止是缓存:它还是轻量级任务队列
很多人习惯将任务队列交给RabbitMQ或Kafka,但在中小型AI服务中,这些组件反而带来了额外的运维负担。而Redis凭借其丰富的数据结构和高性能特性,完全可以胜任“兼职队列”的角色。
为什么选择Redis做推理队列?
| 维度 | Redis | 传统消息队列 |
|---|---|---|
| 部署成本 | 极低(已有缓存实例) | 高(需独立集群) |
| 延迟 | 微秒级 | 毫秒级起 |
| 功能完备性 | 中等(需自行补全ACK、重试) | 完善 |
| 学习曲线 | 平缓 | 较陡 |
对于我们的TTS服务来说,任务类型单一、吞吐适中、对极致可靠性要求不高,Redis是更务实的选择。
核心流程设计:List + Hash + TTL
我们利用三种数据结构协同工作:
- List (
tts_task_queue):作为FIFO队列,存放待处理的任务ID; - String (
tts_task:{id}):缓存完整任务对象,供Worker拉取详情; - Hash (
tts_result:{id}):存储推理结果及状态,支持字段级更新。
整个流程如下:
- 用户提交请求 → 后端生成唯一
taskId; - 将任务元数据存入
"tts_task:" + taskId,设置1小时过期; - 将
taskId推入LPUSH tts_task_queue; - Worker通过
BRPOP tts_task_queue 5阻塞监听; - 获取任务后调用本地TTS模型服务(如6006端口);
- 成功则写入
HSET tts_result:{id} status success url {url},并设1小时TTL; - 前端轮询
/result/{taskId}接口获取最新状态。
这种方式既解耦了请求接入与模型计算,又避免了数据库频繁更新带来的锁竞争。
关键代码落地:生产可用的细节打磨
Java侧:任务提交与幂等控制
@Service public class TTSTaskService { @Autowired private RedisTemplate<String, String> redisTemplate; public String submitTask(TTSTask task) { // 使用雪花算法或UUID确保全局唯一 String taskId = IdUtil.fastSimpleUUID(); task.setId(taskId); task.setStatus("pending"); String taskKey = "tts_task:" + taskId; // 先检查是否已存在(防重复提交) if (Boolean.TRUE.equals(redisTemplate.hasKey(taskKey))) { return taskId; // 或抛异常 } // 缓存任务信息 redisTemplate.opsForValue().set( taskKey, JSON.toJSONString(task), Duration.ofHours(1) ); // 入队(左侧推入) redisTemplate.opsForList().leftPush("tts_task_queue", taskId); return taskId; } }注意这里的幂等性设计:即使同一请求被多次触发,也只会入队一次。
Python Worker:稳定消费与错误捕获
import redis import json import requests r = redis.Redis(host='localhost', port=6379, db=0) while True: try: # 阻塞式拉取,节省CPU资源 result = r.brpop('tts_task_queue', timeout=5) if not result: continue # 超时继续循环 _, task_id_bytes = result task_id = task_id_bytes.decode('utf-8') task_data = r.get(f'tts_task:{task_id}') if not task_data: print(f"[WARN] Task {task_id} not found in cache") continue task = json.loads(task_data) # 调用TTS模型服务 resp = requests.post( 'http://localhost:6006/api/tts', json={ 'text': task['text'], 'voice_id': task.get('voice_id', 'default') }, timeout=30 ) resp.raise_for_status() audio_url = resp.json()['url'] # 写回结果 result_key = f'tts_result:{task_id}' r.hset(result_key, 'status', 'success') r.hset(result_key, 'url', audio_url) r.expire(result_key, 3600) # 1小时后自动清理 except Exception as e: # 错误也要记录,便于前端展示失败原因 err_key = f'tts_result:{task_id}' r.hset(err_key, 'status', 'failed') r.hset(err_key, 'error', str(e)[:500]) # 截断防止过大 r.expire(err_key, 3600)这个Worker进程可以水平扩展多个实例,共同消费同一个队列,天然支持负载均衡。
前端对接:轮询还是WebSocket?
虽然WebSocket体验更好,但在初期阶段,简单的HTTP轮询已经足够:
@GetMapping("/result/{taskId}") public ResponseEntity<?> getResult(@PathVariable String taskId) { String resultKey = "tts_result:" + taskId; if (!redisTemplate.hasKey(resultKey)) { return ResponseEntity.notFound().build(); } Map<Object, Object> result = redisTemplate.opsForHash().entries(resultKey); return ResponseEntity.ok(result); }前端每2秒轮询一次,直到status变为success或failed为止。未来可升级为Redis Pub/Sub模式,由Worker推送完成事件,进一步降低延迟。
架构全景与设计哲学
+------------------+ +---------------------+ | Web Frontend | <-> | Spring Boot API | +------------------+ +----------+----------+ | v +----------------------------+ | Redis (Cache & Queue) | | - tts_task_queue (List) | | - tts_task:{id} (String) | | - tts_result:{id} (Hash) | +--------------+-------------+ | v +------------------------------+ | TTS Inference Workers | | (VoxCPM-1.5-TTS Model) | | Port: 6006 | +------------------------------+这套架构的核心思想是极简主义下的功能复用:Redis不再只是缓存,而是承担了三大职责——
- 共享状态中心:替代数据库成为高频读写的首选存储;
- 异步任务队列:实现请求与计算的时空解耦;
- 结果暂存区:提供临时的结果访问入口,减轻OSS回源压力。
更重要的是,它避开了多组件集成的复杂性,在保证性能的同时极大降低了运维门槛。
工程实践建议:如何避免踩坑?
合理设置TTL
任务缓存建议1~2小时,结果缓存可根据业务需求设定(如24小时)。过长会占用内存,过短可能导致前端查不到结果。监控队列积压
定期通过LLEN tts_task_queue检查队列长度,超过阈值时告警扩容Worker。限流保护API入口
利用Redis实现令牌桶:java String limitKey = "rate_limit:" + ip; Long count = redisTemplate.execute((RedisCallback<Long>) conn -> conn.eval("redis.call('INCR', KEYS[1]); if tonumber(redis.call('GET', KEYS[1])) > tonumber(ARGV[1]) then return 0; else redis.call('EXPIRE', KEYS[1], ARGV[2]); return 1; end", Arrays.asList(limitKey.getBytes()), Arrays.asList("100", "3600")));考虑失败重试与死信机制
可在Hash中增加retry_count字段,超过阈值后转入dead_letter_queue人工排查。安全性补充
- 对外暴露的taskId应避免泄露业务含义(不要用自增ID);
- 敏感结果可通过签名URL访问,而非长期公开链接。
这种高度集成的设计思路,正引领着智能音频设备向更可靠、更高效的方向演进。