news 2026/1/12 2:13:02

Sharding分库分表复杂SQL之数据源路由

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Sharding分库分表复杂SQL之数据源路由

Sharding分库分表复杂SQL之数据源路由

  • 一、问题及分析
    • 1. 背景
    • 2. 方案
  • 二、数据源动态切换
    • 1. 配置及代码实现
    • 2. 动态数据源使用
    • 3. `事务拦截器TransactionInterceptor`
    • 4. 数据源动态切换流程图
  • 三、关于事务管理
    • 1. 混合事务
      • 方案一:分布式事务
      • 方案二:拆分事务(妥协方案)

一、问题及分析

1. 背景

数据库中有20张业务表,其中 有两张表(订单表、交易明细表)因为数据量太大进行了分库分表,其余18张表保持单库单表结构。现有系统架构变为:Spring Boot + Spring Cloud + ShardingSphere 的微服务系统。但是,ShardingSphere 对【复杂 SQL】(如多表关联、子查询、窗口函数等)支持度不足 和 性能问题。对此有这样的需求:既需要支持复杂查询,又要对分片表进行有效的水平拆分。


2. 方案

混合数据源方案:在部分表分片、部分表不分片的场景下,混合使用不同数据源(这是一种标准做法)。

具体就是使用Spring的AbstractRoutingDataSource来动态路由数据源(自定义一个路由数据源,继承AbstractRoutingDataSource)。

我们需要在两个数据源之间做路由:一个是ShardingSphereDataSource(负责分片表,也就是分库分表的表),另一个是普通数据源defaultDataSource(负责其他 非分片表)。

但是,又有新的问题如何决定使用哪个数据源

  • 方案一:通过解析sql,判断是否涉及分片表,从而决定要使用哪个数据源。
    • 该方案的缺点:通过解析SQL去判断是否涉及分片表可能比较复杂,而且解析可能不准确。
    • 该方案存在的问题:通常来说,我们会将业务逻辑放在 Service 层,具体要去 操作哪几张表、用哪个数据源,都是业务逻辑决定的。所以说,数据源切换 应该在 “方法开始前” 决定,而不是在 SQL 执行前。方案一不考虑。
  • 方案二通过AOP切面在Service层 根据方法名或注解来切换数据源。当然,像这样的处理方案是需要写入项目的编码规则文档中。
    • 缺点:需要开发人员明确指定每个方法使用哪个数据源。

注意事项

  • ① 如果一条SQL只操作非分片表,就用defaultDataSource;如果一条SQL只操作分片表,就用ShardingDataSource;但是如果一条SQL同时操作了分片表和非分片表,那么就需要同时使用两个数据源,这就会涉及到 【分布式事务】。假设目前没有同时操作两个数据源的业务,所以暂不讨论这种情况。
  • ② 目前没有同时操作两个数据源的情况,也就是说,在业务上能够保证 在同一个事务中不会同时操作两个数据源,所以说,我们就可以使用Spring的事务管理,我们可以配置两个事务管理器,并且指定每个事务管理器对应哪个数据源。这样做方便后期扩展,当然目前还不需要,可以不用配置,直接使用 @Transaction注解即可。
  • ③ Spring事务管理是通过AOP实现的,而我们的数据源动态路由也是通过AOP实现的。所以说,数据源动态路由切面 要在 事务切面 之前执行,否则会导致数据源切换失效。我们可以通过调整切面顺序来解决,通常使用@Order注解。

二、数据源动态切换

  • 可以先看一下第4小节【4. 数据源动态切换流程图】。

1. 配置及代码实现

使用AbstractRoutingDataSource+AOP的方案:

1). 数据源配置

确保两个数据源使用独立的连接池配置

# application.ymlspring:# 默认数据源配置(用于不分片的18张表)datasource:url:jdbc:mysql://localhost:3306/db0username:rootpassword:rootdriver-class-name:com.mysql.cj.jdbc.Driver# ShardingSphere 数据源配置(用于分片的2张表)shardingsphere:datasource:names:ds0,ds1ds0:url:jdbc:mysql://localhost:3306/db0ds1:url:jdbc:mysql://localhost:3307/db1# ... 其他ShardingSphere配置

2). 数据源类型枚举

publicenumDataSourceType{DEFAULT,// 默认数据源(不分片表)SHARDING// ShardingSphere数据源(分片表)}

3). 数据源上下文

publicclassDataSourceContextHolder{// ThreadLocal存储当前要切到哪个数据源privatestaticfinalThreadLocal<DataSourceType>CONTEXT_HOLDER=newThreadLocal<>();publicstaticvoidsetDataSourceType(DataSourceTypedataSourceType){CONTEXT_HOLDER.set(dataSourceType);}publicstaticDataSourceTypegetDataSourceType(){returnCONTEXT_HOLDER.get();}publicstaticvoidclearDataSourceType(){CONTEXT_HOLDER.remove();}publicstaticbooleanisShardingDataSource(){returnDataSourceType.SHARDING.equals(CONTEXT_HOLDER.get());}}

4). 自定义数据源路由

// 继承AbstractRoutingDataSource@ComponentpublicclassDynamicDataSourceextendsAbstractRoutingDataSource{@OverrideprotectedObjectdetermineCurrentLookupKey(){returnDataSourceContextHolder.getDataSourceType();}}

5). 数据源配置类

@ConfigurationpublicclassDataSourceConfig{// ==================== 1. 默认数据源(Spring Boot自动配置的数据源,不分片表使用)@Bean("defaultDataSource")@ConfigurationProperties(prefix="spring.datasource")publicDataSourcedefaultDataSource(){returnDataSourceBuilder.create().build();}// ==================== 2. ShardingSphere数据源配置// ShardingSphere数据源(分片表使用)@Bean("shardingDataSource")publicDataSourceshardingDataSource()throwsSQLException{// 通过 Yaml 的方式创建数据源URLyamlResource=ClassLoader.getSystemClassLoader().getResource("sharding.yaml");returnYamlShardingSphereDataSourceFactory.createDataSource(newFile(yamlResource.toURI()));}// ==================== 3. 动态数据源配置 ==========@Primary@Bean("dynamicDataSource")publicDataSourcedataSource(@Qualifier("defaultDataSource")DataSourcedefaultDataSource,@Qualifier("shardingDataSource")DataSourceshardingDataSource){// 创建DynamicDataSource,绑定两个数据源DynamicDataSourcedynamicDataSource=newDynamicDataSource();// 设置数据源映射Map<Object,Object>targetDataSources=newHashMap<>();targetDataSources.put(DataSourceType.DEFAULT,defaultDataSource);targetDataSources.put(DataSourceType.SHARDING,shardingDataSource);dynamicDataSource.setTargetDataSources(targetDataSources);// 设置默认数据源dynamicDataSource.setDefaultTargetDataSource(defaultDataSource);// 数据源初始化后执行dynamicDataSource.afterPropertiesSet();returndynamicDataSource;}// ==================== 4. 事务管理器配置 ====================@Bean(name="transactionManager")publicPlatformTransactionManagertransactionManager(@Qualifier("dynamicDataSource")DataSourcedynamicDataSource){// 事务管理器 绑定 动态数据源returnnewDataSourceTransactionManager(dynamicDataSource);}}

6). 自动切换数据源

// 1. 自定义注解@Target({ElementType.METHOD,ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public@interfaceDataSourceAnno{DataSourceTypevalue()defaultDataSourceType.DEFAULT;// 默认用 DEFAULT 数据源}// 2. 动态数据源AOP切面@Aspect@Component@Order(-1)// 值比默认值小,确保先于事务拦截器(或 事务注解)执行(这个注解的默认值为 Integer.MAX_VALUE)publicclassDataSourceAspect{// 方法1:基于自定义注解切换@Before("@annotation(dataSourceAnno)")publicvoidswitchDataSourceByAnnotation(JoinPointjoinPoint,DataSourcedataSource){DataSourceContextHolder.setDataSourceType(dataSource.value());}// 方法2:基于方法名自动识别@Before("execution(* com.sh.service.*.*(..))")publicvoidswitchDataSourceByMethod(JoinPointjoinPoint){StringmethodName=joinPoint.getSignature().getName();Object[]args=joinPoint.getArgs();// 根据方法名判断是否需要分片表操作if(isShardingTableOperation(methodName,args)){DataSourceContextHolder.setDataSourceType(DataSourceType.SHARDING);}else{DataSourceContextHolder.setDataSourceType(DataSourceType.DEFAULT);}}privatebooleanisShardingTableOperation(StringmethodName,Object[]args){// 分片表识别逻辑,比如:我们可以规定使用 ShardingSphereDataSource 的方法名 前缀统一为:shardingDsreturnmethodName.contains("shardingDs");}@After("@annotation(dataSourceAnno)")// 或者 @After("execution(* com.sh.service.*.*(..))")publicvoidrestoreDataSource(JoinPointjoinPoint){// 清理,防止内存泄漏DataSourceContextHolder.clearDataSourceType();}}

2. 动态数据源使用

由于目前一次操作只涉及一个数据源(要么全部是分片表,要么全部是非分片表),所以,可以使用Spring的本地事务管理。

// 在 Service 层使用注解@ServicepublicclassOrderService{// 1.操作分表时用 sharding@DataSourceAnno(DataSourceType.SHARDING)@Transactional// 【Spring本地事务】publicvoidprocessOrder(){orderMapper.insertOrderItem(...);// order_item(分表)orderMapper.insertOrderInfo(...);// order_info(分表)}// 2.操作普通表时用 DEFAULT@DataSourceAnno(DataSourceType.DEFAULT)@Transactional// 【Spring本地事务】publicvoidprocessUser(){userMapper.insertUser(...);// user(普通表)productMapper.insertProduct(...);// product(普通表)}}

3.事务拦截器TransactionInterceptor

上面我们在使用 Spirng的本地事务管理时 用的是:@Transactional注解,但是,有时候我们会忘记添加这个注解或者说到处写这个注解太麻烦了。我们就可以使用@Aspect配置式AOP 来装配 事务拦截器(TransactionInterceptor)的方案,完全摆脱@Transactional注解。这是Spring框架内更原生的编程式事务管理方式。

importorg.aspectj.lang.annotation.Aspect;importorg.springframework.aop.Advisor;importorg.springframework.aop.aspectj.AspectJExpressionPointcut;importorg.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor;importorg.springframework.aop.support.DefaultPointcutAdvisor;importorg.springframework.aop.support.NameMatchMethodPointcut;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.transaction.PlatformTransactionManager;importorg.springframework.transaction.TransactionDefinition;importorg.springframework.transaction.interceptor.*;importjava.util.Collections;importjava.util.HashMap;importjava.util.Map;@Aspect@ConfigurationpublicclassTransactionConfig{// 1. 定义切入点(拦截 com.pro.service 包下所有方法)privatestaticfinalStringAOP_POINTCUT_EXPRESSION="execution(* com.sh.service..*.*(..))";// 2. 定义增删改方法前缀(自动加事务)privatestaticfinalString[]REQUIRED_RULE_TRANSACTION={"insert*","create*","add*","save*","update*","modify*","del*","delete*","remove*"};// 3. 定义查询方法前缀(自动加只读事务)privatestaticfinalString[]READ_RULE_TRANSACTION={"select*","get*","query*","search*","count*","find*","list*","page*"};// 4. 注入事务管理器@AutowiredprivatePlatformTransactionManagertransactionManager;// ========== 5. 核心:配置事务拦截器 (TransactionInterceptor)@BeanpublicTransactionInterceptortxAdvice(){// 5.1 创建事务属性源NameMatchTransactionAttributeSourcetas=newNameMatchTransactionAttributeSource();// 5.2 配置增删改事务属性(REQUIRED)RuleBasedTransactionAttributerequiredTx=newRuleBasedTransactionAttribute();requiredTx.setRollbackRules(Collections.singletonList(newRollbackRuleAttribute(Exception.class)));// 发生异常回滚requiredTx.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);// 事务隔离级别:读已提交requiredTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);// 事务传播行为:REQUIREDrequiredTx.setTimeout(30);// 超时30秒// 5.3 配置查询事务属性(只读)RuleBasedTransactionAttributereadOnlyTx=newRuleBasedTransactionAttribute();readOnlyTx.setRollbackRules(Collections.singletonList(newRollbackRuleAttribute(Exception.class)));readOnlyTx.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);readOnlyTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);// 支持当前事务,不存在也不新建readOnlyTx.setReadOnly(true);// 关键:设置为只读readOnlyTx.setTimeout(20);// 查询超时可设短些// 5.4 将方法名模式映射到事务属性Map<String,TransactionAttribute>txMap=newHashMap<>();for(StringmethodName:REQUIRED_RULE_TRANSACTION){txMap.put(methodName,requiredTx);}for(StringmethodName:READ_RULE_TRANSACTION){txMap.put(methodName,readOnlyTx);}tas.setNameMap(txMap);// 5.5 创建并返回事务拦截器returnnewTransactionInterceptor(transactionManager,tas);}// ========== 6. 核心:配置切面(Advisor),将切入点和拦截器关联@BeanpublicAdvisortxAdviceAdvisor(){// 6.1 创建切入点(使用AspectJ表达式)AspectJExpressionPointcutpointcut=newAspectJExpressionPointcut();pointcut.setExpression(AOP_POINTCUT_EXPRESSION);// 6.2 创建Advisor(通知器),将切入点和事务通知绑定returnnewDefaultPointcutAdvisor(pointcut,txAdvice());}}

4. 数据源动态切换流程图

before(): 设置数据源Key
到ThreadLocal
1. 根据方法名匹配规则
2. 从事务管理器获取连接
此时ThreadLocal中
已有正确数据源Key
成功
异常
客户端调用 Service 方法
动态数据源代理
数据源切面
DataSourceAspect
事务切面代理
事务拦截器
TransactionInterceptor
获取事务属性
DataSourceTransactionManager
动态数据源
determineCurrentLookupKey()
路由到真实物理数据源
执行SQL
方法执行结束?
TransactionInterceptor
提交事务
TransactionInterceptor
回滚事务
DataSourceAspect.after()
清理ThreadLocal
返回结果给客户端

三、关于事务管理

1. 单数据源事务:如果一次操作只涉及一个数据源(要么全部是分片表,要么全部是非分片表)。就可以使用Spring的本地事务管理,因为动态数据源会路由到同一个物理数据源。

2.混合数据源事务如果需要同时操作分片表和非分片表,则需要引入分布式事务方案(如Seata)或者 【将操作拆分为两个独立的事务


1. 混合事务

问题
当一次业务操作需要同时更新分片表和非分片表时,由于这两个表【可能】位于不同的物理数据库(或同一数据库的不同数据源管理),我们需要考虑事务的一致性。

方案一:分布式事务

分布式事务的内容有点多,这里就先用伪代码:

// 使用 Seata 等分布式事务框架@GlobalTransactional// 全局分布式事务注解publicvoidmixedOperation(){// 操作非分片表(默认数据源)defaultService.updateNormalTable();// 操作分片表(ShardingSphere 数据源)shardingService.updateShardingTable();}

方案二:拆分事务(妥协方案)

将原本在一个事务中的操作,拆分成两个独立的事务,每个事务只操作一个数据源(要么是分片数据源,要么是默认数据源)。这样,每个事务都是本地事务,由各自的数据源事务管理器管理。

假设我们有一个业务方法,需要先更新非分片表A,然后更新分片表B。

原本的设计(问题代码)

// ❌ 错误做法:试图用一个事务控制两个数据源@TransactionalpublicvoidcreateOrderWithUserInfo(Useruser,Orderorder){// 更新用户信息(非分片表,使用默认数据源)userMapper.update(user);// 默认数据源// 创建订单(分片表,使用ShardingSphere数据源)orderMapper.insert(order);// ShardingSphere数据源// 这里会出现事务问题!}

拆分后的设计

@AutowiredprivateTransactionTemplatetransactionTemplate;// ✅ 将业务拆分成两个独立的事务publicvoiddoBusiness(){// 第一个事务,操作非分片表AtransactionTemplate.execute(status->{updateNonShardingTableA();returnnull;});// 第二个事务,操作分片表BtransactionTemplate.execute(status->{updateShardingTableB();returnnull;});}

拆分事务的优缺点

  • 优点:
    • 实现简单:不需要引入复杂的分布式事务框架
    • 性能较好:避免了分布式事务的网络开销和锁竞争
    • 技术栈轻量:减少了系统复杂度
  • 缺点:
    • 数据一致性风险:如果第二个事务失败,第一个事务已经提交,则无法回滚。因此,这种拆分需要根据业务场景来判断是否可接受如果业务要求两个操作要么都成功,要么都失败,那么就需要引入分布式事务
    • 业务逻辑复杂:
      • 需要实现补偿机制。
      • 需要考虑幂等性。
    • 业务限制:
      • 如果两个操作必须同时成功或同时失败,此方案不适用。
      • 如果第二个操作依赖第一个操作的结果,拆分不可行。

补充说明

在拆分事务时,我们通常需要根据业务逻辑考虑是否允许中间状态。如果业务允许(例如,先记录日志,再更新分片表,即使分片表更新失败,日志也可以保留),那么可以拆分。如果不允许,则需要考虑其他方案,如:

  • 使用分布式事务(如Seata)保证两个数据源的事务一致性。
  • 重新设计数据模型,将相关操作放到同一个数据源中。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/9 19:05:08

解决failed to connect to api.anthropic.c错误,转向国产Qwen方案

解决 failed to connect 到境外 API 的难题&#xff0c;转向国产 Qwen 图像编辑方案 在跨境电商运营的某个清晨&#xff0c;设计师正准备批量生成新品主图&#xff0c;却突然发现图像编辑服务卡住了——日志里反复出现 failed to connect to api.anthropic.com。这不是个例。过…

作者头像 李华
网站建设 2026/1/10 1:19:47

ComfyUI与Cherry Studio协作:打造个性化AI创作空间

ComfyUI与Cherry Studio协作&#xff1a;打造个性化AI创作空间 在AI内容生成的浪潮中&#xff0c;越来越多的创作者发现&#xff0c;传统的“一键出图”工具虽然上手快&#xff0c;但一旦进入复杂项目或团队协作场景&#xff0c;便暴露出流程不可控、设置难复用、调试像猜谜等痛…

作者头像 李华
网站建设 2026/1/10 5:51:52

终极轨道计算指南:3个实战技巧解析

终极轨道计算指南&#xff1a;3个实战技巧解析 【免费下载链接】sgp4 Simplified perturbations models 项目地址: https://gitcode.com/gh_mirrors/sg/sgp4 轨道计算是航天工程中至关重要的基础技术&#xff0c;它能够精确预测卫星在太空中的位置和运动轨迹。SGP4&…

作者头像 李华
网站建设 2026/1/7 8:54:03

时区相关的问题,开发如何自测?

时区相关的问题&#xff0c;开发如何自测&#xff1f; 在java服务启动时&#xff0c;通过 Intellij Idea 的 Vm Option 加上启动参数。。 比如 -Duser.timezoneUTC &#xff0c; 就可以指定时区为标准的 UTC 0时区。 通过这些启动参数&#xff0c;开发就可以直接在本地自测时区…

作者头像 李华
网站建设 2026/1/10 13:27:35

城通网盘直链解析神器:三步解锁高速下载新体验

城通网盘直链解析神器&#xff1a;三步解锁高速下载新体验 【免费下载链接】ctfileGet 获取城通网盘一次性直连地址 项目地址: https://gitcode.com/gh_mirrors/ct/ctfileGet 还在为城通网盘繁琐的下载流程而烦恼吗&#xff1f;城通网盘直链解析工具正是你需要的解决方案…

作者头像 李华
网站建设 2025/12/30 15:35:16

21届智能车赛外延创意:用车载语音指令触发ACE-Step音乐生成

智能车赛外延创意&#xff1a;用车载语音指令触发ACE-Step音乐生成 在一辆飞驰的智能汽车中&#xff0c;驾驶员轻声说了一句&#xff1a;“来点轻松的爵士乐&#xff0c;带点萨克斯&#xff0c;像深夜电台那样。”话音刚落&#xff0c;车内音响便流淌出一段即兴创作的原创旋律—…

作者头像 李华