RocketMQ死信队列

一、RocketMQ死信队列概述

RocketMQ是一个高性能、可靠性、可扩展性 broker 致力于处理大量数据,包括流式数据和批量数据,并且能够实现在线扩容。RocketMQ支持死信队列(Dead Letter Queue,DLQ),可以将无法被使用者消费的消息转发到指定的 topic 或者 queue 中去,死信队列就是为了解决这些问题而生的。

对于一些无法被用户正常消费的消息,RocketMQ会根据一些设定规则,最终将这些问题消息发送到RocketMQ死信队列中,从而引导一系列的流程操作。

二、RocketMQ死信队列要自己配

RocketMQ死信队列需要手动配制,这个最好在消费者端配制,如果在消费者配置消费者类为顺序消息消费者,需要在组消费者中增加一个成员来消费死信消息。以下是使用Java SDK 实现创建消费者及相关配制的示例:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic_name", "*");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setInstanceName("consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeTimeout(60);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() {
  public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    // business logic
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
consumer.start();

// 死信队列配制
String groupName = "your_group_name";
String topic = "your_topic_name";
String origTopic = "your_topic_name";
String subExpression = "your_sub_expression";
int maxRetryTimes = 5;
long retryInterval = 60L;

String dlqTopic = MixAll.getRetryTopic(groupName);
String dlqRealTopic = String.format("%s%s", dlqTopic, System.currentTimeMillis());
String dlqSubExpression = String.format("%s&&%s", subExpression, MessageConst.PROPERTY_RETRY_TOPIC + "==" + dlqTopic);
CreateTopicKey topicKey = new CreateTopicKey(dlqTopic, org.apache.rocketmq.common.protocol.RequestCode.UPDATE_AND_CREATE_TOPIC);
TopicConfig topicConfig = new TopicConfig(dlqTopic);
groupConfig.getTopicConfigTable().put(dlqTopic, topicConfig);
table.put(topicKey, topicConfig.buildTopicSetting());

producer.send(new Message(origTopic, dlqSubExpression, "", new byte[] {1, 2, 3}), new SendCallback() {
  public void onSuccess(SendResult sendResult) {
    // do something
  }
  public void onException(Throwable e) {
    // do something
  }
});

三、RocketMQ死信队列保证消息被消费

RocketMQ生产者将消息发送到 broker 后,broker会将消息持久化到磁盘,保证消息不会丢失。RocketMQ消费者一旦收到消息后,就会自动提交 offset,表明将此条消息消费完整,所以在 broker处于重启等一些情况下,已经消费成功但未来得及提交 offset 的消息会再次被 broker 发送给消费者消费。

RocketMQ的重复删除机制有两种,第一种是使用默认的消费重试次数机制,重试这个机制在一定时间内会不重不漏地把消息发送给消费者,保障消息不丢失。如果运行消费者的服务器出现宕机,这个时间间隔可能会被限制。第二种是使用死信队列。

四、RocketMQ死信队列消费

RocketMQ的死信队列机制为保障消息不丢失提供了一种方式,分别从定义、实现、应用场景几个方面介绍死信队列的消费机制。

定义:

public enum ConsumeStatus {
    SUCCESS,
    FAIL,
    EXCEPTION,
    ;
}

实现:

public interface MessageListener {
    ConsumeStatus consumeMessage(MessageExt messageExt);
}

应用场景:

比如一个新用户提交了订单,但是由于某些原因,该订单的数据中缺少了一些必须的字段。这部分订单消息就可以通过 RocketMQ发往死信队列,等待相关的人员来处理。又比如当 RocketMQ 消费端出现异常,RocketMQ的死信队列可以帮助消费者重新消费这些过期或异常的消息,确保消息不丢失。

五、RocketMQ死信队列处理

RocketMQ消费者接收到消息后,可以判断一下消费是否成功。如果消费失败,则可以根据消息中的关联信息尝试进行重试、或者将消息发送到死信队列。

业务处理时如果出现异常,可以在消费端将消息发送到RocketMQ死信队列中,业务处理时,调用方法:reconsumeLater();后的传入为设置该消息再次消费的时间,此时消息会被放入到 RocketMQ 的 “重试队列”,若多次发生了消费失败,则最终会被发送到 DLQ 中。

以下是使用Java SDK在消费者端将消息发送到RocketMQ死信队列的示例:

public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
  // 处理业务逻辑
  return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 如果消费失败,将该消息告知 broker,等待 broker 根据系统设置进行后续操作,如重新消费或进入死信队列
}

六、RocketMQ死信队列的使用场景

RocketMQ死信队列的具体使用场景如下:

1.消息延迟消费

我们可以将一些带有延迟消费的消息通过 RocketMQ定时储存在 broker 中,等到指定时间后再由 RocketMQ 发送给消费方进行消费。如果在指定时间内消费成功,则 broker 会将消息标记为已消费。否则,就会自动发送到死信队列。

2.异常消息重新消费

考虑到消费者在消息处理过程中,可能会出现脏数据、数据异常或者处理失败的情况,在消息重试了几次之后,依然不能被正常地消费,那么就可以将这些消费失败、过期数据等通过设置重试机制和死信队列机制来优雅地处理。RocketMQ死信队列就是可以让这些消费无法完成的消息正常被消费的解决方案。

3.流量负载与限流

可以通过死信队列来实现简单的负载均衡,将某些 topic 的消息发送到这个队列,这样的话就可以避免在消息消费处理失败过程中对 consumer 造成过于严重的负载压力,从而降低服务的风险,提高整个架构的稳定性。

七、RabbitMQ死信队列使用场景

RabbitMQ同样支持死信队列,但它与RocketMQ实现方式并不相同。RabbitMQ的 DLQ 需要为每个需要死信队列的 queue 配置一个单独的 exchange,而配置与代码示例如下:

//在生产端声明死信exchange
channel.exchangeDeclare("dlx.exchange", "topic", true);

//声明队列,并设置队列的 x-dead-letter-exchange 和 x-dead-letter-routing-key
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
arguments.put("x-dead-letter-routing-key", "dlqKey");
channel.queueDeclare("test.queue", true, false, false, arguments);

以上示例中death_exchange就是死信exchange,必须先绑定一个死信的exchange,然后在设置原队列的死信队列绑定到它上面,这样就完成了死信的设置。

请登录后发表评论

    没有回复内容