WEBKT

Spring Boot + Seata 实现 MySQL 与 MongoDB 的分布式事务

63 0 0 0

在微服务架构改造中,我们经常会遇到跨数据库的事务问题。最近团队在进行微服务改造,涉及到将老系统的 MySQL 数据迁移到新业务的 MongoDB,同时还需要保证数据的一致性。领导要求在不引入过于复杂第三方组件的前提下,实现数据强一致性。经过调研,我们选择了 Seata 作为分布式事务解决方案。

本文将通过一个具体的案例,详细介绍如何在 Spring Boot 中优雅地配置 Seata,并尽量避免编写过多的模板代码。

案例背景

假设我们有一个订单服务,需要同时更新 MySQL 中的订单表和 MongoDB 中的订单日志表。为了保证数据一致性,我们需要使用分布式事务。

  • MySQL: 存储订单的基本信息。
  • MongoDB: 存储订单的操作日志。

Seata 简介

Seata 是一款开源的分布式事务解决方案,提供了 AT、TCC、SAGA 和 XA 事务模式。 在本例中,我们使用 AT 模式,它具有对业务代码侵入性小的优点。

环境准备

  1. MySQL: 确保 MySQL 服务正常运行,并创建订单表。
  2. MongoDB: 确保 MongoDB 服务正常运行,并创建订单日志表。
  3. Seata Server: 下载 Seata Server 并启动。 可以从 Seata 的官方 GitHub 仓库下载最新版本。
  4. Spring Boot 项目: 创建一个 Spring Boot 项目,并引入以下依赖:
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

配置 Seata

  1. application.yml 配置:
spring:
  application:
    name: order-service
  cloud:
    alibaba:
      seata:
        tx-service-group: order-service_tx_group #事务组名
  datasource:
    names: db_order,mongo_order # 数据源名称
    db_order: # MySQL 数据源配置
      url: jdbc:mysql://localhost:3306/order_db?useUnicode=true&characterEncoding=utf8&useSSL=false
      username: root
      password: your_mysql_password
      driver-class-name: com.mysql.cj.jdbc.Driver
    mongo_order: # MongoDB 数据源配置
      uri: mongodb://localhost:27017/order_log
  jpa:
    hibernate:
      ddl-auto: update
server:
  port: 8081

seata:
  enabled: true # 启用 Seata
  application-id: order-service
  tx-service-group: order-service_tx_group
  service:
    vgroup-mapping:
      order-service_tx_group: default # 事务组映射到 Seata Server
  1. DataSourceProxy 配置:

Seata 需要对数据源进行代理,才能实现 AT 模式的事务控制。

import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.db_order")
    public DataSourceProperties orderDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean("orderDataSource")
    @Primary
    public DataSource orderDataSource(DataSourceProperties orderDataSourceProperties) {
        return orderDataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
    }

    @Bean("orderDataSourceProxy")
    public DataSourceProxy orderDataSourceProxy(@Qualifier("orderDataSource") DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

}
  1. GlobalTransactionScanner 配置:

Spring Boot 启动时,Seata 会自动扫描被 @GlobalTransactional 注解的方法,并开启分布式事务。

import io.seata.spring.annotation.GlobalTransactionScanner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SeataConfig {

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        return new GlobalTransactionScanner("order-service", "order-service_tx_group");
    }
}

业务代码实现

  1. 定义实体类:
import javax.persistence.*;
import java.math.BigDecimal;

@Entity
@Table(name = "orders")
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String orderNo;
    private BigDecimal amount;

    // 省略 getter 和 setter
}
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.util.Date;

@Document(collection = "order_logs")
public class OrderLog {

    @Id
    private String id;
    private String orderNo;
    private String operation;
    private Date createTime;

    // 省略 getter 和 setter
}
  1. 定义 Repository:
import org.springframework.data.jpa.repository.JpaRepository;

public interface OrderRepository extends JpaRepository<Order, Long> {
}
import org.springframework.data.mongodb.repository.MongoRepository;

public interface OrderLogRepository extends MongoRepository<OrderLog, String> {
}
  1. 定义 Service:
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.util.Date;
import java.util.UUID;

@Service
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private OrderLogRepository orderLogRepository;

    @GlobalTransactional(timeoutMills = 300000, name = "createOrder")
    @Transactional
    public void createOrder(BigDecimal amount) {
        // 1. 创建订单
        Order order = new Order();
        order.setOrderNo(UUID.randomUUID().toString());
        order.setAmount(amount);
        orderRepository.save(order);

        // 2. 记录订单日志
        OrderLog orderLog = new OrderLog();
        orderLog.setOrderNo(order.getOrderNo());
        orderLog.setOperation("创建订单");
        orderLog.setCreateTime(new Date());
        orderLogRepository.save(orderLog);

        // 模拟异常
        if (amount.compareTo(new BigDecimal("1000")) > 0) {
            throw new RuntimeException("订单金额超过 1000,触发异常");
        }
    }
}
  1. Controller:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;

@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("/order/create")
    public String createOrder(@RequestParam("amount") BigDecimal amount) {
        try {
            orderService.createOrder(amount);
            return "创建订单成功";
        } catch (Exception e) {
            return "创建订单失败:" + e.getMessage();
        }
    }
}

测试

  1. 启动 Seata Server 和 Spring Boot 项目。
  2. 调用 POST /order/create 接口,传入不同的金额参数。
  3. 观察 MySQL 和 MongoDB 中的数据,验证事务是否生效。
  • 当金额小于等于 1000 时,MySQL 和 MongoDB 都会成功插入数据。
  • 当金额大于 1000 时,由于模拟异常,MySQL 和 MongoDB 都会回滚,保证数据一致性。

总结

本文通过一个简单的案例,介绍了如何在 Spring Boot 中使用 Seata 解决 MySQL 和 MongoDB 的分布式事务问题。 通过 @GlobalTransactional 注解,我们可以轻松地实现 AT 模式的事务控制,避免了编写大量的模板代码。 在实际项目中,我们可以根据具体的业务场景,选择合适的 Seata 事务模式,并进行相应的配置。

需要注意的是,AT 模式虽然对业务代码侵入性小,但仍然需要对数据源进行代理。 在生产环境中,需要仔细评估其性能影响,并进行相应的优化。

此外,Seata 还提供了 TCC、SAGA 和 XA 等事务模式,可以根据具体的业务场景选择合适的模式。 例如,TCC 模式适用于对性能要求较高的场景,SAGA 模式适用于长事务场景。

希望本文能够帮助大家更好地理解和使用 Seata,解决微服务架构中的分布式事务问题。

TechLead8 Seata分布式事务

评论点评