Sharding分库分表复杂SQL之数据源路由
- 一、问题及分析
- 1. 背景
- 2. 方案
- 二、数据源动态切换
- 1. 配置及代码实现
- 2. 动态数据源使用
- 3. `事务拦截器TransactionInterceptor`
- 4. 数据源动态切换流程图
- 三、关于事务管理
- 1. 混合事务
- 方案一:分布式事务
- 方案二:拆分事务(妥协方案)
一、问题及分析
1. 背景
数据库中有20张业务表,其中 有两张表(订单表、交易明细表)因为数据量太大进行了分库分表,其余18张表保持单库单表结构。现有系统架构变为:Spring Boot + Spring Cloud + ShardingSphere 的微服务系统。但是,ShardingSphere 对【复杂 SQL】(如多表关联、子查询、窗口函数等)支持度不足 和 性能问题。对此有这样的需求:既需要支持复杂查询,又要对分片表进行有效的水平拆分。
2. 方案
混合数据源方案:在部分表分片、部分表不分片的场景下,混合使用不同数据源(这是一种标准做法)。
具体就是使用Spring的AbstractRoutingDataSource来动态路由数据源(自定义一个路由数据源,继承AbstractRoutingDataSource)。
我们需要在两个数据源之间做路由:一个是ShardingSphereDataSource(负责分片表,也就是分库分表的表),另一个是普通数据源defaultDataSource(负责其他 非分片表)。
但是,又有新的问题:如何决定使用哪个数据源?
- 方案一:通过解析sql,判断是否涉及分片表,从而决定要使用哪个数据源。
- 该方案的缺点:通过解析SQL去判断是否涉及分片表可能比较复杂,而且解析可能不准确。
- 该方案存在的问题:通常来说,我们会将业务逻辑放在 Service 层,
具体要去 操作哪几张表、用哪个数据源,都是业务逻辑决定的。所以说,数据源切换 应该在 “方法开始前” 决定,而不是在 SQL 执行前。方案一不考虑。
- 方案二:
通过AOP切面在Service层 根据方法名或注解来切换数据源。当然,像这样的处理方案是需要写入项目的编码规则文档中。- 缺点:需要开发人员明确指定每个方法使用哪个数据源。
注意事项:
- ① 如果一条SQL只操作非分片表,就用defaultDataSource;如果一条SQL只操作分片表,就用ShardingDataSource;但是
如果一条SQL同时操作了分片表和非分片表,那么就需要同时使用两个数据源,这就会涉及到 【分布式事务】。假设目前没有同时操作两个数据源的业务,所以暂不讨论这种情况。 - ② 目前没有同时操作两个数据源的情况,也就是说,在业务上能够保证 在同一个事务中不会同时操作两个数据源,所以说,我们就可以使用Spring的事务管理,我们可以
配置两个事务管理器,并且指定每个事务管理器对应哪个数据源。这样做方便后期扩展,当然目前还不需要,可以不用配置,直接使用 @Transaction注解即可。 - ③ Spring事务管理是通过AOP实现的,而我们的数据源动态路由也是通过AOP实现的。所以说,
数据源动态路由切面 要在 事务切面 之前执行,否则会导致数据源切换失效。我们可以通过调整切面顺序来解决,通常使用@Order注解。
二、数据源动态切换
- 可以先看一下第4小节【4. 数据源动态切换流程图】。
1. 配置及代码实现
使用AbstractRoutingDataSource+AOP的方案:
1). 数据源配置
确保两个数据源使用独立的连接池配置。
# application.ymlspring:# 默认数据源配置(用于不分片的18张表)datasource:url:jdbc:mysql://localhost:3306/db0username:rootpassword:rootdriver-class-name:com.mysql.cj.jdbc.Driver# ShardingSphere 数据源配置(用于分片的2张表)shardingsphere:datasource:names:ds0,ds1ds0:url:jdbc:mysql://localhost:3306/db0ds1:url:jdbc:mysql://localhost:3307/db1# ... 其他ShardingSphere配置2). 数据源类型枚举
publicenumDataSourceType{DEFAULT,// 默认数据源(不分片表)SHARDING// ShardingSphere数据源(分片表)}3). 数据源上下文
publicclassDataSourceContextHolder{// ThreadLocal存储当前要切到哪个数据源privatestaticfinalThreadLocal<DataSourceType>CONTEXT_HOLDER=newThreadLocal<>();publicstaticvoidsetDataSourceType(DataSourceTypedataSourceType){CONTEXT_HOLDER.set(dataSourceType);}publicstaticDataSourceTypegetDataSourceType(){returnCONTEXT_HOLDER.get();}publicstaticvoidclearDataSourceType(){CONTEXT_HOLDER.remove();}publicstaticbooleanisShardingDataSource(){returnDataSourceType.SHARDING.equals(CONTEXT_HOLDER.get());}}4). 自定义数据源路由
// 继承AbstractRoutingDataSource@ComponentpublicclassDynamicDataSourceextendsAbstractRoutingDataSource{@OverrideprotectedObjectdetermineCurrentLookupKey(){returnDataSourceContextHolder.getDataSourceType();}}5). 数据源配置类
@ConfigurationpublicclassDataSourceConfig{// ==================== 1. 默认数据源(Spring Boot自动配置的数据源,不分片表使用)@Bean("defaultDataSource")@ConfigurationProperties(prefix="spring.datasource")publicDataSourcedefaultDataSource(){returnDataSourceBuilder.create().build();}// ==================== 2. ShardingSphere数据源配置// ShardingSphere数据源(分片表使用)@Bean("shardingDataSource")publicDataSourceshardingDataSource()throwsSQLException{// 通过 Yaml 的方式创建数据源URLyamlResource=ClassLoader.getSystemClassLoader().getResource("sharding.yaml");returnYamlShardingSphereDataSourceFactory.createDataSource(newFile(yamlResource.toURI()));}// ==================== 3. 动态数据源配置 ==========@Primary@Bean("dynamicDataSource")publicDataSourcedataSource(@Qualifier("defaultDataSource")DataSourcedefaultDataSource,@Qualifier("shardingDataSource")DataSourceshardingDataSource){// 创建DynamicDataSource,绑定两个数据源DynamicDataSourcedynamicDataSource=newDynamicDataSource();// 设置数据源映射Map<Object,Object>targetDataSources=newHashMap<>();targetDataSources.put(DataSourceType.DEFAULT,defaultDataSource);targetDataSources.put(DataSourceType.SHARDING,shardingDataSource);dynamicDataSource.setTargetDataSources(targetDataSources);// 设置默认数据源dynamicDataSource.setDefaultTargetDataSource(defaultDataSource);// 数据源初始化后执行dynamicDataSource.afterPropertiesSet();returndynamicDataSource;}// ==================== 4. 事务管理器配置 ====================@Bean(name="transactionManager")publicPlatformTransactionManagertransactionManager(@Qualifier("dynamicDataSource")DataSourcedynamicDataSource){// 事务管理器 绑定 动态数据源returnnewDataSourceTransactionManager(dynamicDataSource);}}6). 自动切换数据源
// 1. 自定义注解@Target({ElementType.METHOD,ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public@interfaceDataSourceAnno{DataSourceTypevalue()defaultDataSourceType.DEFAULT;// 默认用 DEFAULT 数据源}// 2. 动态数据源AOP切面@Aspect@Component@Order(-1)// 值比默认值小,确保先于事务拦截器(或 事务注解)执行(这个注解的默认值为 Integer.MAX_VALUE)publicclassDataSourceAspect{// 方法1:基于自定义注解切换@Before("@annotation(dataSourceAnno)")publicvoidswitchDataSourceByAnnotation(JoinPointjoinPoint,DataSourcedataSource){DataSourceContextHolder.setDataSourceType(dataSource.value());}// 方法2:基于方法名自动识别@Before("execution(* com.sh.service.*.*(..))")publicvoidswitchDataSourceByMethod(JoinPointjoinPoint){StringmethodName=joinPoint.getSignature().getName();Object[]args=joinPoint.getArgs();// 根据方法名判断是否需要分片表操作if(isShardingTableOperation(methodName,args)){DataSourceContextHolder.setDataSourceType(DataSourceType.SHARDING);}else{DataSourceContextHolder.setDataSourceType(DataSourceType.DEFAULT);}}privatebooleanisShardingTableOperation(StringmethodName,Object[]args){// 分片表识别逻辑,比如:我们可以规定使用 ShardingSphereDataSource 的方法名 前缀统一为:shardingDsreturnmethodName.contains("shardingDs");}@After("@annotation(dataSourceAnno)")// 或者 @After("execution(* com.sh.service.*.*(..))")publicvoidrestoreDataSource(JoinPointjoinPoint){// 清理,防止内存泄漏DataSourceContextHolder.clearDataSourceType();}}2. 动态数据源使用
由于目前一次操作只涉及一个数据源(要么全部是分片表,要么全部是非分片表),所以,可以使用Spring的本地事务管理。
// 在 Service 层使用注解@ServicepublicclassOrderService{// 1.操作分表时用 sharding@DataSourceAnno(DataSourceType.SHARDING)@Transactional// 【Spring本地事务】publicvoidprocessOrder(){orderMapper.insertOrderItem(...);// order_item(分表)orderMapper.insertOrderInfo(...);// order_info(分表)}// 2.操作普通表时用 DEFAULT@DataSourceAnno(DataSourceType.DEFAULT)@Transactional// 【Spring本地事务】publicvoidprocessUser(){userMapper.insertUser(...);// user(普通表)productMapper.insertProduct(...);// product(普通表)}}3.事务拦截器TransactionInterceptor
上面我们在使用 Spirng的本地事务管理时 用的是:@Transactional注解,但是,有时候我们会忘记添加这个注解或者说到处写这个注解太麻烦了。我们就可以使用@Aspect配置式AOP 来装配 事务拦截器(TransactionInterceptor)的方案,完全摆脱@Transactional注解。这是Spring框架内更原生的编程式事务管理方式。
importorg.aspectj.lang.annotation.Aspect;importorg.springframework.aop.Advisor;importorg.springframework.aop.aspectj.AspectJExpressionPointcut;importorg.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor;importorg.springframework.aop.support.DefaultPointcutAdvisor;importorg.springframework.aop.support.NameMatchMethodPointcut;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.transaction.PlatformTransactionManager;importorg.springframework.transaction.TransactionDefinition;importorg.springframework.transaction.interceptor.*;importjava.util.Collections;importjava.util.HashMap;importjava.util.Map;@Aspect@ConfigurationpublicclassTransactionConfig{// 1. 定义切入点(拦截 com.pro.service 包下所有方法)privatestaticfinalStringAOP_POINTCUT_EXPRESSION="execution(* com.sh.service..*.*(..))";// 2. 定义增删改方法前缀(自动加事务)privatestaticfinalString[]REQUIRED_RULE_TRANSACTION={"insert*","create*","add*","save*","update*","modify*","del*","delete*","remove*"};// 3. 定义查询方法前缀(自动加只读事务)privatestaticfinalString[]READ_RULE_TRANSACTION={"select*","get*","query*","search*","count*","find*","list*","page*"};// 4. 注入事务管理器@AutowiredprivatePlatformTransactionManagertransactionManager;// ========== 5. 核心:配置事务拦截器 (TransactionInterceptor)@BeanpublicTransactionInterceptortxAdvice(){// 5.1 创建事务属性源NameMatchTransactionAttributeSourcetas=newNameMatchTransactionAttributeSource();// 5.2 配置增删改事务属性(REQUIRED)RuleBasedTransactionAttributerequiredTx=newRuleBasedTransactionAttribute();requiredTx.setRollbackRules(Collections.singletonList(newRollbackRuleAttribute(Exception.class)));// 发生异常回滚requiredTx.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);// 事务隔离级别:读已提交requiredTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);// 事务传播行为:REQUIREDrequiredTx.setTimeout(30);// 超时30秒// 5.3 配置查询事务属性(只读)RuleBasedTransactionAttributereadOnlyTx=newRuleBasedTransactionAttribute();readOnlyTx.setRollbackRules(Collections.singletonList(newRollbackRuleAttribute(Exception.class)));readOnlyTx.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);readOnlyTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);// 支持当前事务,不存在也不新建readOnlyTx.setReadOnly(true);// 关键:设置为只读readOnlyTx.setTimeout(20);// 查询超时可设短些// 5.4 将方法名模式映射到事务属性Map<String,TransactionAttribute>txMap=newHashMap<>();for(StringmethodName:REQUIRED_RULE_TRANSACTION){txMap.put(methodName,requiredTx);}for(StringmethodName:READ_RULE_TRANSACTION){txMap.put(methodName,readOnlyTx);}tas.setNameMap(txMap);// 5.5 创建并返回事务拦截器returnnewTransactionInterceptor(transactionManager,tas);}// ========== 6. 核心:配置切面(Advisor),将切入点和拦截器关联@BeanpublicAdvisortxAdviceAdvisor(){// 6.1 创建切入点(使用AspectJ表达式)AspectJExpressionPointcutpointcut=newAspectJExpressionPointcut();pointcut.setExpression(AOP_POINTCUT_EXPRESSION);// 6.2 创建Advisor(通知器),将切入点和事务通知绑定returnnewDefaultPointcutAdvisor(pointcut,txAdvice());}}4. 数据源动态切换流程图
三、关于事务管理
1. 单数据源事务:如果一次操作只涉及一个数据源(要么全部是分片表,要么全部是非分片表)。就可以使用Spring的本地事务管理,因为动态数据源会路由到同一个物理数据源。
2.混合数据源事务:如果需要同时操作分片表和非分片表,则需要引入分布式事务方案(如Seata)或者 【将操作拆分为两个独立的事务】。
1. 混合事务
问题:
当一次业务操作需要同时更新分片表和非分片表时,由于这两个表【可能】位于不同的物理数据库(或同一数据库的不同数据源管理),我们需要考虑事务的一致性。
方案一:分布式事务
分布式事务的内容有点多,这里就先用伪代码:
// 使用 Seata 等分布式事务框架@GlobalTransactional// 全局分布式事务注解publicvoidmixedOperation(){// 操作非分片表(默认数据源)defaultService.updateNormalTable();// 操作分片表(ShardingSphere 数据源)shardingService.updateShardingTable();}方案二:拆分事务(妥协方案)
将原本在一个事务中的操作,拆分成两个独立的事务,每个事务只操作一个数据源(要么是分片数据源,要么是默认数据源)。这样,每个事务都是本地事务,由各自的数据源事务管理器管理。
假设我们有一个业务方法,需要先更新非分片表A,然后更新分片表B。
原本的设计(问题代码):
// ❌ 错误做法:试图用一个事务控制两个数据源@TransactionalpublicvoidcreateOrderWithUserInfo(Useruser,Orderorder){// 更新用户信息(非分片表,使用默认数据源)userMapper.update(user);// 默认数据源// 创建订单(分片表,使用ShardingSphere数据源)orderMapper.insert(order);// ShardingSphere数据源// 这里会出现事务问题!}拆分后的设计:
@AutowiredprivateTransactionTemplatetransactionTemplate;// ✅ 将业务拆分成两个独立的事务publicvoiddoBusiness(){// 第一个事务,操作非分片表AtransactionTemplate.execute(status->{updateNonShardingTableA();returnnull;});// 第二个事务,操作分片表BtransactionTemplate.execute(status->{updateShardingTableB();returnnull;});}拆分事务的优缺点
- 优点:
- 实现简单:不需要引入复杂的分布式事务框架
- 性能较好:避免了分布式事务的网络开销和锁竞争
- 技术栈轻量:减少了系统复杂度
- 缺点:
- 数据一致性风险:
如果第二个事务失败,第一个事务已经提交,则无法回滚。因此,这种拆分需要根据业务场景来判断是否可接受。如果业务要求两个操作要么都成功,要么都失败,那么就需要引入分布式事务。 - 业务逻辑复杂:
- 需要实现补偿机制。
- 需要考虑幂等性。
- 业务限制:
- 如果两个操作必须同时成功或同时失败,此方案不适用。
- 如果第二个操作依赖第一个操作的结果,拆分不可行。
- 数据一致性风险:
补充说明:
在拆分事务时,我们通常需要根据业务逻辑考虑是否允许中间状态。如果业务允许(例如,先记录日志,再更新分片表,即使分片表更新失败,日志也可以保留),那么可以拆分。如果不允许,则需要考虑其他方案,如:
- 使用分布式事务(如Seata)保证两个数据源的事务一致性。
- 重新设计数据模型,将相关操作放到同一个数据源中。