news 2026/6/23 19:10:20

Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

目前 MyBatis-Plus 官方还不完全支持响应式编程,但我们可以结合 R2DBC 和 MyBatis-Plus 的部分特性来实现。这里提供两种方案:

方案一:使用 MyBatis-Plus 增强 R2DBC (推荐)

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

1. 项目依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version><!-- 注意:只使用其工具类,不启用SQL执行 --></dependency><!-- 数据库驱动 --><dependency><groupId>io.asyncer</groupId><artifactId>r2dbc-mysql</artifactId><version>1.0.2</version></dependency><!-- 或 PostgreSQL --><!-- <dependency> <groupId>org.postgresql</groupId> <artifactId>r2dbc-postgresql</artifactId> <scope>runtime</scope> </dependency> -->

2. 配置类

importcom.baomidou.mybatisplus.annotation.DbType;importcom.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;importcom.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.r2dbc.config.EnableR2dbcAuditing;@Configuration@EnableR2dbcAuditingpublicclassR2dbcMybatisConfig{/** * 只使用 MyBatis-Plus 的分页插件 */@BeanpublicMybatisPlusInterceptormybatisPlusInterceptor(){MybatisPlusInterceptorinterceptor=newMybatisPlusInterceptor();interceptor.addInnerInterceptor(newPaginationInnerInterceptor(DbType.MYSQL));returninterceptor;}}

3. 实体类 (使用 MyBatis-Plus 注解)

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.annotation.*;importlombok.Data;importorg.springframework.data.annotation.CreatedDate;importorg.springframework.data.annotation.LastModifiedDate;importorg.springframework.data.relational.core.mapping.Table;importjava.time.LocalDateTime;@Data@Table("users")publicclassUser{@TableId(type=IdType.AUTO)privateLongid;@TableField("username")privateStringusername;@TableField("email")privateStringemail;@TableField("password")privateStringpassword;@TableField("age")privateIntegerage;@TableLogic@TableField("deleted")privateIntegerdeleted=0;@TableField(value="version",fill=FieldFill.INSERT)@VersionprivateIntegerversion=1;@CreatedDate@TableField("create_time")privateLocalDateTimecreateTime;@LastModifiedDate@TableField("update_time")privateLocalDateTimeupdateTime;// 响应式编程友好的构造方法publicstaticMono<User>of(Stringusername,Stringemail){Useruser=newUser();user.setUsername(username);user.setEmail(email);returnMono.just(user);}}

4. Repository 接口 (R2DBC)

importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.r2dbc.repository.Query;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceUserR2dbcRepositoryextendsR2dbcRepository<User,Long>{Mono<User>findByUsername(Stringusername);Mono<User>findByEmail(Stringemail);Flux<User>findByAgeGreaterThan(Integerage);@Query("SELECT * FROM users WHERE username LIKE :keyword OR email LIKE :keyword")Flux<User>searchUsers(Stringkeyword);@Query("UPDATE users SET age = :age WHERE id = :id")Mono<Integer>updateAgeById(Longid,Integerage);}

5. Service 层 (结合 MyBatis-Plus 工具)

importcom.baomidou.mybatisplus.core.conditions.query.QueryWrapper;importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.Pageable;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importreactor.core.scheduler.Schedulers;importjava.time.Duration;importjava.util.Map;@Service@RequiredArgsConstructorpublicclassReactiveUserService{privatefinalUserR2dbcRepositoryuserRepository;privatefinalDatabaseClientdatabaseClient;publicMono<User>createUser(Useruser){returnuserRepository.save(user);}publicMono<User>getUserById(Longid){returnuserRepository.findById(id).switchIfEmpty(Mono.error(newRuntimeException("User not found")));}publicFlux<User>getAllUsers(){returnuserRepository.findAll().delayElements(Duration.ofMillis(100))// 模拟流处理.subscribeOn(Schedulers.boundedElastic());}publicMono<User>updateUser(Longid,Useruser){returnuserRepository.findById(id).flatMap(existing->{existing.setUsername(user.getUsername());existing.setEmail(user.getEmail());existing.setAge(user.getAge());returnuserRepository.save(existing);});}@TransactionalpublicMono<Void>deleteUser(Longid){returnuserRepository.deleteById(id).then(Mono.fromRunnable(()->System.out.println("User deleted: "+id)));}/** * 使用 MyBatis-Plus 的 QueryWrapper 构建查询条件 * 然后转换为 R2DBC 查询 */publicFlux<User>queryUsers(Map<String,Object>params){// 使用 MyBatis-Plus 的 QueryWrapper 构建条件QueryWrapper<User>queryWrapper=newQueryWrapper<>();if(params.containsKey("username")){queryWrapper.like("username",params.get("username"));}if(params.containsKey("email")){queryWrapper.like("email",params.get("email"));}if(params.containsKey("minAge")){queryWrapper.ge("age",params.get("minAge"));}if(params.containsKey("maxAge")){queryWrapper.le("age",params.get("maxAge"));}queryWrapper.orderByDesc("create_time");// 将 QueryWrapper 转换为 SQLStringsql=buildQueryWrapperSql(queryWrapper);// 执行响应式查询returndatabaseClient.sql(sql).fetch().all().map(row->{Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;});}/** * 响应式分页查询 */publicMono<Page<User>>getUsersPage(Pageablepageable){// 使用 MyBatis-Plus 的 Page 对象Page<User>mybatisPage=newPage<>(pageable.getPageNumber(),pageable.getPageSize());// 计算总数Mono<Long>countMono=databaseClient.sql("SELECT COUNT(*) FROM users").map(row->row.get(0,Long.class)).one();// 查询数据Flux<User>usersFlux=databaseClient.sql("SELECT * FROM users ORDER BY create_time DESC LIMIT :limit OFFSET :offset").bind("limit",pageable.getPageSize()).bind("offset",pageable.getOffset()).fetch().all().map(this::mapRowToUser);returnMono.zip(countMono,usersFlux.collectList()).map(tuple->{mybatisPage.setTotal(tuple.getT1());mybatisPage.setRecords(tuple.getT2());returnmybatisPage;});}privateStringbuildQueryWrapperSql(QueryWrapper<User>queryWrapper){// 简化示例,实际需要更复杂的转换return"SELECT * FROM users WHERE "+queryWrapper.getTargetSql();}privateUsermapRowToUser(Map<String,Object>row){Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;}/** * 批量保存 */publicFlux<User>saveAll(Flux<User>users){returnuserRepository.saveAll(users).onErrorContinue((error,user)->System.err.println("Error saving user: "+error.getMessage()));}/** * 流式查询 */publicFlux<User>streamUsers(){returndatabaseClient.sql("SELECT * FROM users").fetch().all().delayElements(Duration.ofMillis(50))// 控制流速度.map(this::mapRowToUser);}}

6. Controller 层

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.PageRequest;importorg.springframework.http.HttpStatus;importorg.springframework.http.MediaType;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.annotation.*;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importjavax.validation.Valid;importjava.time.Duration;importjava.util.Map;@RestController@RequestMapping("/api/reactive/users")@RequiredArgsConstructorpublicclassReactiveUserController{privatefinalReactiveUserServiceuserService;@PostMapping@ResponseStatus(HttpStatus.CREATED)publicMono<ResponseEntity<User>>create(@Valid@RequestBodyUseruser){returnuserService.createUser(user).map(saved->ResponseEntity.status(HttpStatus.CREATED).body(saved)).onErrorResume(e->Mono.just(ResponseEntity.badRequest().build()));}@GetMapping("/{id}")publicMono<ResponseEntity<User>>getById(@PathVariableLongid){returnuserService.getUserById(id).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@GetMappingpublicFlux<User>getAll(){returnuserService.getAllUsers();}@GetMapping("/search")publicFlux<User>search(@RequestParamMap<String,Object>params){returnuserService.queryUsers(params);}@GetMapping("/page")publicMono<Page<User>>getPage(@RequestParam(defaultValue="0")intpage,@RequestParam(defaultValue="10")intsize){PageRequestpageRequest=PageRequest.of(page,size);returnuserService.getUsersPage(pageRequest);}@PutMapping("/{id}")publicMono<ResponseEntity<User>>update(@PathVariableLongid,@Valid@RequestBodyUseruser){returnuserService.updateUser(id,user).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@DeleteMapping("/{id}")publicMono<ResponseEntity<Void>>delete(@PathVariableLongid){returnuserService.deleteUser(id).then(Mono.just(ResponseEntity.noContent().<Void>build())).defaultIfEmpty(ResponseEntity.notFound().build());}/** * Server-Sent Events (SSE) 流式接口 */@GetMapping(value="/stream",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<User>stream(){returnuserService.streamUsers();}/** * WebFlux WebSocket 支持 */@MessageMapping("users.chat")publicFlux<UserMessage>userChat(Flux<UserMessage>messages){returnmessages.doOnNext(message->System.out.println("Received: "+message.getContent())).map(message->newUserMessage("Server: "+message.getContent())).delayElements(Duration.ofSeconds(1));}/** * 批量操作 */@PostMapping("/batch")publicFlux<User>batchCreate(@RequestBodyFlux<User>users){returnuserService.saveAll(users);}}

7. 自定义响应式 Repository

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceCustomReactiveRepository{/** * 使用 MyBatis-Plus 的 Lambda 查询 */Flux<User>findUsersByCondition(LambdaQueryWrapper<User>wrapper);/** * 响应式分页查询 */Mono<Page<User>>findPage(Page<User>page,LambdaQueryWrapper<User>wrapper);}

8. 响应式事务配置

importorg.springframework.context.annotation.Configuration;importorg.springframework.transaction.ReactiveTransactionManager;importorg.springframework.transaction.reactive.TransactionalOperator;@ConfigurationpublicclassReactiveTransactionConfig{@BeanpublicTransactionalOperatortransactionalOperator(ReactiveTransactionManagertransactionManager){returnTransactionalOperator.create(transactionManager);}}

方案二:使用 MyBatis-Plus 响应式扩展 (第三方)

有一些第三方项目正在尝试为 MyBatis-Plus 添加响应式支持:
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

1. 添加依赖

<!-- 第三方响应式扩展 --><dependency><groupId>com.github.yulichang</groupId><artifactId>mybatis-plus-join</artifactId><version>1.4.6</version></dependency>

2. 自定义响应式 Mapper

importorg.apache.ibatis.annotations.SelectProvider;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicinterfaceReactiveBaseMapper<T>{Mono<Integer>insertReactive(Tentity);Mono<Integer>updateByIdReactive(Tentity);Mono<T>selectByIdReactive(Serializableid);Flux<T>selectListReactive(Wrapper<T>queryWrapper);Mono<Integer>deleteByIdReactive(Serializableid);}

重要提示

  1. MyBatis-Plus 官方还不完全支持响应式,上述方案是结合 R2DBC 和 MyBatis-Plus 的工具类
  2. 真正的响应式编程需要使用 R2DBC 或 MongoDB Reactive
  3. 如果需要复杂 SQL 查询,可以使用 DatabaseClient 或 R2DBC Entity Callbacks
  4. 生产环境建议使用成熟的响应式数据库驱动

完整配置 application.yml

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

spring:r2dbc:url:r2dbc:mysql://localhost:3306/reactive_dbusername:rootpassword:passwordpool:initial-size:5max-size:20max-idle-time:30mwebflux:base-path:/apistatic-path-pattern:/static/**codec:max-in-memory-size:10MBlogging:level:org.springframework.r2dbc:DEBUGio.r2dbc:DEBUG

这种架构结合了 MyBatis-Plus 的便利性和 R2DBC 的响应式能力,适合需要复杂查询的场景。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 7:52:15

2026 等保测评趋势:从被动合规到主动安全的转型路径

2025 年等保体系系列新规的集中落地&#xff0c;正式标志着网络安全合规进入 “实效防护” 新阶段。 过去 “文档凑数、临时整改” 的被动合规模式已难以为继&#xff0c;而2026 年行业将全面转向以主动安全为核心的合规新生态 —— 这一转型既是政策持续收紧的必然结果&#x…

作者头像 李华
网站建设 2026/6/22 18:39:46

微服务架构设计 - 可降级设计

引言 在金融科技领域&#xff0c;系统的稳定性和连续性是企业的生命线。面对突发故障或流量洪峰&#xff0c;简单粗暴的“挂维护页”或“整体下线”策略不仅造成巨大的业务损失&#xff0c;更可能因引发用户恐慌和资金流动性问题而威胁企业生存。真正的鲁棒性&#xff0c;在于…

作者头像 李华
网站建设 2026/6/22 23:11:21

计算机图形学·25 消隐2 区域子分算法-光线投射算法

本文为记录专业课计算机图形学的部分笔记&#xff0c;参考教材为Angel的第八版交互式计算机图形学——基于WebGL 2.0的自顶向下方法。1、区域子分算法的由来&#xff1a;①Z缓存器算法&#xff0c;将像素孤立来考虑&#xff0c;未利用相邻像素之间存在的属性的连贯性&#xff0…

作者头像 李华
网站建设 2026/6/23 10:59:28

MinerU升级终极指南:避坑技巧与实战解决方案

还在为MinerU版本升级踩坑而头疼&#xff1f;作为资深技术老司机&#xff0c;我整理了这份避坑指南&#xff0c;帮你用最短时间、最少成本完成MinerU升级。这份指南将完全重构传统升级流程&#xff0c;采用"问题导向"思维&#xff0c;直击升级痛点。 【免费下载链接】…

作者头像 李华