RocketMQ业务保障-中间件专区论坛-技术-SpringForAll社区

RocketMQ业务保障

1. 为什么从业务上保证

1.1 消息丢失问题

RocketMQ虽然号称消息不会丢失,但是还是有几率存在MQ宕机以及rocketMQ使用上的问题可能存在消息丢失等,对于类似于支付确认的消息一般来说是一条都不允许丢失的

1.2 消息幂等

在网络环境中,由于网络不稳定等因素,消息队列的消息有可能出现重复,大概有以下几种:

发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

投递时消息重复 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

负载均衡时消息重复

包括但不限于网络抖动、Broker 重启以及订阅方应用重启.

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

结合三种情况,可以发现消息重发的最后结果都是,消费者接收到了重复消息,那么,我们只需要在消费者端统一进行幂等处理就能够实现消息幂等。

2. 整体架构

上面我们是从MQ本身来保证消息的的可靠性,下面我们从业务上来分析如何保障MQ的可靠性,Mq 的消息成功投递和消费是比较难的,这里提供一个思路,如何保证消息成功投递并且消息是幂等的。 在这里插入图片描述

2.1 表结构设计

这里面涉及到两张表

2.1.1 消息存根

消息发送到 mq 之前先把消息存储到 mongodb 或者 mysql 中,消息有一个存根是为了避免消息在 mq 中丢失后就找不到发送的消息,主要需要保存几个关键信息:

  • messageId: 消息流水号,用来在消息的生产端和消费端串联使用,根据这个id找到消费端和生产端唯一的消息

  • messageContent:消息内容

  • count:发送到 broker 的次数,如果超过特定次数就不往 broker 发

  • status:消息状态,已确认、未确认、已作废,如果消费端已经确认就需要修改该状态

  • offset:偏移量,发送的时候记录消息的偏移量。

  • resendTime:重发时间,需要加上消息可能在队列的堆积时间,否则可能造成消息还未被消费到就被重发了

  • createDatre:记录消息的发送时间

2.1.2 消息日志

为了保障消息能够成功发送到MQ,需要在拉取到消息后的第一时间将消息ID保存到消息日志表,用来让发送端知道消息消息是否成功发送到了MQ,需要包含一下字段:

  • messageId: 消息流水号,为了和消息存根来保持一致

  • offset:用来记录当前接收到的消息的偏移量,重发时需要通过统计最大偏移量来确定消费者是否堆积来确定是否重发

  • createDate:用来记录当前消息接收到的时间

3. 幂等性校验

3.1 Redis幂等性校验

首先进行Redis的setNX进行幂等性校验,有以下情况

  • 成功:代表redis幂等性校验通过,为了防止Redis数据误删或者过期,需要进行数据库的幂等性校验,检查消息存根是否是未处理状态,如果是未处理状态则进行业务,则说明消息是未处理状态,否则幂等性校验失败

  • 失败:代表redis幂等性校验未通过,则不通过数据库直接进行校验失败处理

3.2 DB幂等性校验

如果Redis校验通过,还需要DB进行幂等性校验,有以下情况

  • 成功:说明消息确定时未处理的,需要进行处理

  • 失败:说明Redis缓存已经过期或者误删,这里做兜底处理

3.3 校验失败处理

幂等性校验失败则说明消息是重复,存在一下两种情况

Redis校验失败 如果直接Redis的setNX校验失败,说明是重复消息,但是这个时候消息是不知道消息是否处理完成,有以下两种情况:

  • 消息处理完成:需要删除消息日志以及返回成功消费的标志

  • 消息未处理完成:这个时候说明已经有一个线程在处理消息了,直接结束并返回成功消费的标志。

DB校验失败 这个时候说明消息已经处理完成了,redis并且已经通过setNX已经设置标志了,需要删除消息日志,并且返回成功标志。

3.4 校验成功

幂等性校验成功后就需要处理业务操作了

4. 业务处理

业务操作分为两种情况:

4.1 操作成功

如果操作成功,需要修改消息存根状态,并且删除消息日志,然后返回成功标志

4.2 操作失败

如果操作失败,需要消费端重试,这个时候删除redis的setNX的值,并且返回重试指令,让消费端进行重试,但是重试可能一直不成功,RocketMQ的消费端重试机制,达到上限后会投递到死信队列,后期需要人工处理。

5. 定时重发消息

5.1 重发消息筛选

需要符合一下条件的数据才会被筛选出来

  1. 消息存根中未确认的消息

  2. 消息存根中发送次数小于最大发送次数的消息

  3. 消息存根中下次发送时间大于当前时间的消息

  4. 消息存根中的偏移量小于消息日志中的最大偏移量的消息

  5. 消息存根中的消息ID在消息日志中不存在的消息

5.2 重发消息

符合以上条件的消息需要进行重发,调用MQProducer客户端进行重发消息,重发完成后还需要做以下事情

  1. 修改消息的重发次数,当前次数+1

  2. 修改消息的下次重发时间,通过规则计算需要间隔多长时间才需要重发

  3. 修改消息的偏移量,设置为当前发送消息的的偏移量

5.3 人工处理

如果消息的发送次数达到最大的发送次数,将无法进行重发,需要人工进行处理,可以通过发邮件以及其他的方式通知开发人员进行后续的人工处理

请登录后发表评论

    没有回复内容