利用RocketMQ解决分布式事务-Spring专区论坛-技术-SpringForAll社区

利用RocketMQ解决分布式事务

一、什么是分布式事务

1、问题场景

image

单服务操作多数据库情况

image

多服务操作单数据库情况

image

多服务操作多数据库,服务a和服务b之间相互调用情况

2、事务的特性

原子性、一致性、隔离性、持久化

3、常见的分布式解决方案

3.1 二阶段提交(2PC)

image

①阶段一:表决阶段,协调者将事务信息发送给各参与者,然后各参与者接收到事务请求后向协调者反馈自身是否有能力执行事务提交并记录undo和redo日志

②阶段二:执行阶段,协调者接收到各参与者的反馈后,再通知各参与者进行真正的事务提交或回滚,如果各个参与者都收到回滚,则会根据undo日志执行回滚操作

 

优点:

提高了达到数据一致性的可能性,原理简单,实现成本低。

缺点:

①单点问题:如果协调者宕机,整个流程将不可用

②性能问题,在第一阶段要等待所有节点反馈,才能进入第二阶段

③数据不一致,在执行阶段如果协调者发送崩溃,导致只有部分参与者收到提交消息,那么就会存在数据不一致的问题

 

3.2 三阶段提交(3PC)

image

三阶段提交是二阶段提交的改进版,主要有两个改动点:

(1) 引入超时机制

(2)插入一个准备阶段,三阶段提交分为CanCommit、PreCommit、DoCommit三个阶段

三个阶段的具体内容:

①CanCommit阶段:协调者向各参与者发送CanCommit请求,询问是否可以执行事务提交操作,各参与者都会响应Yes或No,如果是全部响应Yes,则会进入下一个阶段

②PreCommit阶段:协调者向各个参与者发送PreCommit请求,进入Prepared阶段。各参与者接收到PreCommit请求后,执行事务操作,并将undo和redo信息记录到事务日志中。如果都执行成功,则向协调者反馈成功指令,并等待协调者的下一次请求

③Docommit阶段:前两个阶段各参与者均反馈成功之后,协调者再向各参与者发送DoCommit请求真正提交事务,各参与者收到DoCommit请求之后,执行正式的事务提交。参与者完成事务提交之后向协调者反馈成功,协调者收到所有参与者成功反馈之后,完成事务。

二、RecketMQ实现分布式事务原理

RocketMQ虽然之前也支持分布式事务,但并没有开源,等到RocketMQ 4.3才正式开源。

1、基础概念

最终一致性

RocketMQ是一种最终一致性的分布式事务,就是说它保证的是消息最终一致性

Half Message(半消息)

是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer对消息的二次确认后,Consumer才能去消费它。

消息回查

由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Brock服务器会定时扫描长期处于半消息的消息,会

主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查

2、分布式事务交互流程

image

流程说明:

1、A服务先发送个Half Message给Brock端,消息中携带 B服务的信息。

2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。

3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)

4.1)、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。

4.2)、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。

4.3)、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。

3、代码实战

实战场景:下单派送服务,用户选择商品之后调用订单服务创建订单,再调用派送服务发起派送

核心代码如下:

3.1 项目目录

image

3.2 项目依赖

<!--开启web服务-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--jdbc-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<!--mysql驱动连接-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
<!--工具包-->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.22</version>
</dependency>

3.3 创建订单服务,发送半消息

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    JdbcTemplate jdbcTemplate;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Override
    @Transactional
    public Integer createOrder(OrderDto orderDto) {
        String sql = "insert into t_order(order_id,shop_id) values(?,?)";
        int result = jdbcTemplate.update(sql, new Object[]{orderDto.getOrderId(), orderDto.getShopId()});
        return result;
    }
    @Override
    public String createOrderId(Integer id) {
        //生成订单id
        String order_id = UUID.randomUUID().toString();
        //通过商品信息生成订单信息
        OrderDto orderDto = new OrderDto(order_id, id);
        //组织生成mq消息
        MessageBuilder<String> tMessageBuilder = MessageBuilder.withPayload(JSONUtil.toJsonStr(orderDto));
        Message message = tMessageBuilder.build();
        //该消息不会被消费
        rocketMQTemplate.sendMessageInTransaction("springboot-producer-group", message, null);
        return order_id;
    }
    @Override
    public Integer queryOrder(String orderId) {
        String sql = "select count(1) from t_order where order_id=?";
        //根据订单查询是否存在订单信息
        Integer result = jdbcTemplate.queryForObject(sql, Integer.class, orderId);
        return result;
    }
}

3.4 创建RocketMQ事务监听

@Component
@RocketMQTransactionListener
public class OrderProducerListener implements RocketMQLocalTransactionListener {
    private static final Logger logger = LoggerFactory.getLogger(OrderProducerListener.class);
    @Autowired
    OrderService orderService;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        logger.info("开始执行本地事务{}", JSONUtil.toJsonStr(message));
        //根据不同的header信息分发到不同的业务场景中
        //MessageHeaders messageHeaders= message.getHeaders();
        //messageHeaders.get("");
        String msgJson = new String((byte[])message.getPayload());
        OrderDto orderDto=JSONUtil.toBean(msgJson,OrderDto.class);
        if (orderDto == null) {
            logger.info("订单数据为空事务状态设置为ROLLBACK。。。");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        try {
            //执行生成订单业务
            Integer result = orderService.createOrder(orderDto);
            if (result > 0) {
                //数据执行成功则确认消息
                logger.info("插入订单数据成功{}", result);
                //模拟发生异常
                //int i=1/0;
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                //数据执行失败则回滚消息
                logger.info("插入订单数据失败{}", result);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            //发生异常进行回滚
            logger.error("执行本地事务发生异常:{}", e);
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        logger.info("开始回查本地事务{}", JSONUtil.toJsonStr(message));
        try {
            String msgJson = new String((byte[])message.getPayload());
            //TODO
            if (ObjectUtil.isNotNull(msgJson)) {
                OrderDto orderDto=JSONUtil.toBean(msgJson,OrderDto.class);
                //回查时检查是否生成订单数据
                Integer result = orderService.queryOrder(orderDto.getOrderId());
                if (result > 0) {
                    logger.info("数据已插入{}", result);
                    return RocketMQLocalTransactionState.COMMIT;
                } else {
                    //如果没有数据可以重新提交或者进行其他处理
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            }
        }catch (Exception e){
            logger.error("检查本地事务发生异常:{}",e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        logger.info("结束本地事务状态查询:{}", JSONUtil.toJsonStr(message));
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

3.5 创建派送服务

@Service
public class DistributionServiceImpl implements DistributionService {
    private  final List<Integer> distributorList = new ArrayList<Integer>(){
        {
            //模拟配送员id
            add(1);
            add(2);
            add(3);
        }
    };
    @Autowired
    JdbcTemplate jdbcTemplate;
    @Override
    @Transactional
    public Integer distribution(String orderId) {
        String sql="insert into order_distribution (order_id,distributor) values (?,?)";
        //随机获取配送员id
        Integer distributor=distributorList.get(RandomUtil.randomInt(0,distributorList.size()));
        //插入数据
        int updateResult=jdbcTemplate.update(sql,new Object[]{orderId,distributor});
        return updateResult;
    }
}

3.6 创建Rocket消息监听

@RocketMQMessageListener(topic = "springboot-producer-group", consumerGroup = "spring_boot_distribution", selectorExpression = "*")
@Service
public class DistributionLisTener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    private static final Logger logger = LoggerFactory.getLogger(DistributionLisTener.class);
    @Autowired
    DistributionService distributionService;
    @Override
    public void onMessage(String o) {
        logger.info("开始消费消息:{}", o);
        Map<String, Object> msgMap = JSONUtil.toBean(o, Map.class);
        //模拟异常
        int i = 1 / 0;
        //执行分配业务
        distributionService.distribution((String) msgMap.get("orderId"));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        // 设置最大重试次数
        defaultMQPushConsumer.setMaxReconsumeTimes(2);
    }
}

3.7 执行流程

订单服务:

调用创建订单服务–>生成订单号,发送半消息–>rocketmq本地事务监听,二次确认–>执行本地事务发送确认状态

派送服务:

监听Rocketmq消息,消费消息–>分配派送员生成订单数据

4、问题思考

为什么要先发送Half Message(半消息)

1)可以先确认 Brock服务器是否正常 ,如果半消息都发送失败了 那说明Brock挂了。

2)可以通过半消息来回查事务,如果半消息发送成功后一直没有被二次确认,那么就会回查事务状态。

什么情况会回查

1)执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit或者rollback)导致最终返回UNKNOW,那么就会回查。

2) 本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在brock端它还是个Half Message(半消息),这也会回查。

 

为什么说MQ是最终一致性事务

首先事务不一致的情况有两种,一种是 服务A执行失败,服务B执行成功 另一种是 服务A执行成功,服务B执行失败

第一种情况永远不会发生,因为服务A执行失败MQ就不会发送半消息,那么服务B就不会进行执行

第二种情况可能会发生,如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。

如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理,人工兜底处理后,就可以让事务达到最终的一致性

请登录后发表评论

    没有回复内容