RocketMq死信队列
介绍
如果消息重试次数超过了最大的重试次数,那么RocketMQ就会把这个消息丢到死信队列中。RocketMQ默认的重试次数是16次。见源码 org.apache.rocketmq.common.subscription.SubscriptionGroupConfig中的retryMaxTimes属性。 这个重试次数可以在消费者端进行配置。 例如 DefaultMQPushConsumer实例中有个setMaxReconsumeTimes方法 指定重试次数。
一个死信队列对应的消费者组,而且会包含这个消费者组的所有死信消息,而不会区分该消息属于哪个topic
如果消费者组没有产生死信消息,RocketMQ就不会为其创建相应的死信队列
默认创建的死信队列,消息是无法被读取的,因为权限perm默认为2,如果想要被消费,需要更改perm的值
消息重试策略差异
据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下
至于PushConsumer和SimpleConsumer的详细差异,请参见官网PushConsumer消费重试策略和SimpleConsumer消费重试策略。
在这里我们只是了解一下死信队列创建和消费其中消息的过程。所以我们以PushConsumer举例
PushConsumer的最大重试次数由消费者分组创建时的元数据控制,具体参数为 maxReconsumeTimes
最大投递次数。默认为16
例如,最大重试次数为3次,则该消息最多可被投递4次,1次为原始消息,3次为重试投递次数。
重试投递还与消息的类型有关联
无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下:
若重试次数超过16次,后面每次重试间隔都为2小时
顺序消息:重试间隔为固定时间,具体取值参数为
suspendCurrentQueueTimeMillis
暂停拉取时间
代码示例
消息生产者
private static ClientConfig getClientConfig() throws UnknownHostException {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setNamesrvAddr(ENDPOINT);
clientConfig.setAccessChannel(AccessChannel.LOCAL);
clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
clientConfig.setInstanceName(INSTANCE_NAME);
clientConfig.setMqClientApiTimeout(30000);
return clientConfig;
}
public static DefaultMQProducer getProduce(String producerGroup) throws MQClientException, UnknownHostException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.resetClientConfig(getClientConfig());
producer.setSendMsgTimeout(30000);
producer.start();
return producer;
}
@Test
public void sendNormalMsg() throws UnknownHostException, MQClientException, MQBrokerException, RemotingException, InterruptedException {
String topic = "TestTopic";
DefaultMQProducer normalProduce = getProduce("NORMAL");
String msg=null;
String tag = "tagA";
String msgId = null;
Message message =null;
for(int i=0;i<10;i++){
msg="This is a normal message for Apache RocketMQ#序号:"+i;
msgId = "msgIdA"+i;
message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
SendResult send = normalProduce.send(message);
System.out.println(send);
}
// normalProduce.shutdown();
}
消息消费者,为了快速看到效果把消息最大重试次数设置为1,并返回消息失败
public static DefaultMQPushConsumer getConsume(String consumerGroup, String topic, String subExpression, MessageListener messageListener) throws MQClientException, UnknownHostException, InterruptedException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.resetClientConfig(getClientConfig());
defaultMQPushConsumer.setConsumerGroup(consumerGroup);
defaultMQPushConsumer.subscribe(topic, subExpression);
//设置消费批量拉取消息的数量
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
//设置消费者的最大线程数
defaultMQPushConsumer.setConsumeThreadMax(1);
//设置消费者的最小线程数
defaultMQPushConsumer.setConsumeThreadMin(1);
// 对于需要慢速拉取的情况(如流控制场景),暂停拉取时间 或者又称消费失败再次消费的间隔时间
defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(10000);
//设置消费模式 集群消费还是广播消费
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 设置最大重试次数
defaultMQPushConsumer.setMaxReconsumeTimes(1);
// defaultMQPushConsumer.setPullBatchSize(1);
// defaultMQPushConsumer.setPopBatchNums(1);
if(messageListener instanceof MessageListenerOrderly) {
defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly)messageListener);
} else if (messageListener instanceof MessageListenerConcurrently) {
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)messageListener);
}else {
defaultMQPushConsumer.registerMessageListener(messageListener);
}
defaultMQPushConsumer.start();
return defaultMQPushConsumer;
}
@Test
public void receiveNormalMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
String topic = "TestTopic";
String tag = "tagA";
//顺序消费
MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println("msgs:"+JSON.toJSONString(msgs));
System.out.println("context:"+JSON.toJSONString(context));
return ConsumeOrderlyStatus.SUCCESS;
}
};
//并发接收消息
MessageListenerConcurrently messageListenerConcurrently = new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("msgs:" + JSON.toJSONString(msgs));
// System.out.println("context:" + JSON.toJSONString(context));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
};
DefaultMQPushConsumer normal = getConsume("NORMAL", topic, tag, messageListenerConcurrently);
Thread.sleep(Long.MAX_VALUE);
normal.shutdown();
}
然后我们就可以在控制台看到对应的死信队列了
如果perm是2一定要改为6
死信队列消费者
@Test
public void receiveDLQMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
String topic = "%DLQ%NORMAL";
String tag = "*";
//并发接收消息
MessageListenerConcurrently messageListenerConcurrently = new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("msgs:" + JSON.toJSONString(msgs));
// System.out.println("context:" + JSON.toJSONString(context));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
};
DefaultMQPushConsumer normal = RocketMqUtil.getConsume("NORMAL", topic, tag, messageListenerConcurrently);
Thread.sleep(Long.MAX_VALUE);
normal.shutdown();
}