介绍

如果消息重试次数超过了最大的重试次数,那么RocketMQ就会把这个消息丢到死信队列中。RocketMQ默认的重试次数是16次。见源码 org.apache.rocketmq.common.subscription.SubscriptionGroupConfig中的retryMaxTimes属性。 这个重试次数可以在消费者端进行配置。 例如 DefaultMQPushConsumer实例中有个setMaxReconsumeTimes方法 指定重试次数。

  • 一个死信队列对应的消费者组,而且会包含这个消费者组的所有死信消息,而不会区分该消息属于哪个topic

  • 如果消费者组没有产生死信消息,RocketMQ就不会为其创建相应的死信队列

  • 默认创建的死信队列,消息是无法被读取的,因为权限perm默认为2,如果想要被消费,需要更改perm的值

消息重试策略差异

据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下

消费者类型

重试过程状态机

重试间隔

最大重试次数

PushConsumer

已就绪 处理中 待重试 提交 * 死信

消费者分组创建时元数据控制。 无序消息:阶梯间隔 顺序消息:固定间隔时间

消费者分组创建时的元数据控制。

SimpleConsumer

已就绪 处理中 提交 死信

通过API修改获取消息时的不可见时间。

消费者分组创建时的元数据控制。

至于PushConsumer和SimpleConsumer的详细差异,请参见官网PushConsumer消费重试策略SimpleConsumer消费重试策略

在这里我们只是了解一下死信队列创建和消费其中消息的过程。所以我们以PushConsumer举例

PushConsumer的最大重试次数由消费者分组创建时的元数据控制,具体参数为 maxReconsumeTimes 最大投递次数。默认为16

例如,最大重试次数为3次,则该消息最多可被投递4次,1次为原始消息,3次为重试投递次数。

重试投递还与消息的类型有关联

  • 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下:

第几次重试

与上次重试的间隔时间

第几次重试

与上次重试的间隔时间

1

10秒

9

7分钟

2

30秒

10

8分钟

3

1分钟

11

9分钟

4

2分钟

12

10分钟

5

3分钟

13

20分钟

6

4分钟

14

30分钟

7

5分钟

15

1小时

8

6分钟

16

2小时

若重试次数超过16次,后面每次重试间隔都为2小时

  • 顺序消息:重试间隔为固定时间,具体取值参数为 suspendCurrentQueueTimeMillis 暂停拉取时间

代码示例

消息生产者

    private static ClientConfig getClientConfig() throws UnknownHostException {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setNamesrvAddr(ENDPOINT);
        clientConfig.setAccessChannel(AccessChannel.LOCAL);
        clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
        clientConfig.setInstanceName(INSTANCE_NAME);
        clientConfig.setMqClientApiTimeout(30000);
        return clientConfig;
    }

public static DefaultMQProducer getProduce(String producerGroup) throws MQClientException, UnknownHostException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.resetClientConfig(getClientConfig());
        producer.setSendMsgTimeout(30000);
        producer.start();
        return producer;
    }


@Test
public void sendNormalMsg() throws UnknownHostException, MQClientException, MQBrokerException, RemotingException, InterruptedException {
        String topic = "TestTopic";
        DefaultMQProducer normalProduce = getProduce("NORMAL");
        String msg=null;
        String tag = "tagA";
        String msgId = null;
        Message message =null;
        for(int i=0;i<10;i++){
            msg="This is a normal message for Apache RocketMQ#序号:"+i;
            msgId = "msgIdA"+i;
            message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
            SendResult send = normalProduce.send(message);
            System.out.println(send);
        }
//        normalProduce.shutdown();
    }

消息消费者,为了快速看到效果把消息最大重试次数设置为1,并返回消息失败

public static DefaultMQPushConsumer getConsume(String consumerGroup, String topic, String subExpression, MessageListener messageListener) throws MQClientException, UnknownHostException, InterruptedException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
        defaultMQPushConsumer.resetClientConfig(getClientConfig());
        defaultMQPushConsumer.setConsumerGroup(consumerGroup);
        defaultMQPushConsumer.subscribe(topic, subExpression);
        //设置消费批量拉取消息的数量
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
        //设置消费者的最大线程数
        defaultMQPushConsumer.setConsumeThreadMax(1);
        //设置消费者的最小线程数
        defaultMQPushConsumer.setConsumeThreadMin(1);
        // 对于需要慢速拉取的情况(如流控制场景),暂停拉取时间 或者又称消费失败再次消费的间隔时间
        defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(10000);
        //设置消费模式 集群消费还是广播消费
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 设置最大重试次数
        defaultMQPushConsumer.setMaxReconsumeTimes(1);
//        defaultMQPushConsumer.setPullBatchSize(1);
//        defaultMQPushConsumer.setPopBatchNums(1);
        if(messageListener instanceof MessageListenerOrderly) {
            defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly)messageListener);
        } else if (messageListener instanceof MessageListenerConcurrently) {
            defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)messageListener);
        }else {
            defaultMQPushConsumer.registerMessageListener(messageListener);
        }
        defaultMQPushConsumer.start();
        return defaultMQPushConsumer;
 }



 @Test
public void receiveNormalMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
        String topic = "TestTopic";
        String tag = "tagA";
        //顺序消费
        MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.println("msgs:"+JSON.toJSONString(msgs));
                System.out.println("context:"+JSON.toJSONString(context));
                return ConsumeOrderlyStatus.SUCCESS;
            }
        };
        //并发接收消息
        MessageListenerConcurrently messageListenerConcurrently = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("msgs:" + JSON.toJSONString(msgs));
//                System.out.println("context:" + JSON.toJSONString(context));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        DefaultMQPushConsumer normal = getConsume("NORMAL", topic, tag, messageListenerConcurrently);
        Thread.sleep(Long.MAX_VALUE);
        normal.shutdown();
    }

然后我们就可以在控制台看到对应的死信队列了

如果perm是2一定要改为6

死信队列消费者

@Test
    public void receiveDLQMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
        String topic = "%DLQ%NORMAL";
        String tag = "*";
        //并发接收消息
        MessageListenerConcurrently messageListenerConcurrently = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("msgs:" + JSON.toJSONString(msgs));
//                System.out.println("context:" + JSON.toJSONString(context));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        DefaultMQPushConsumer normal = RocketMqUtil.getConsume("NORMAL", topic, tag, messageListenerConcurrently);
        Thread.sleep(Long.MAX_VALUE);
        normal.shutdown();
    }