RocketMq顺序消息示例
顺序消息最主要的是让生产的消息进入同一个队列中
rocketmq-client版本
在这个sdk版本中是要指定一个消息队列,这样消息才能按顺序进入主题下的同一个队列
至于顺序消息的概念建议还是看官方文档,文档地址:https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage
普通java项目
消息生产
根据顺序消息的必要条件:
生产端:单一生产者和串行发送
服务端:确定服务器和确定的服务队列
所以我们在发送消息时要用到MessageQueue类,并在类里面设置服务提供者broker的名称和队列的id
示例代码如下:
@Test
public void sendFIFOMsg() throws UnknownHostException, InterruptedException, MQClientException, MQBrokerException, RemotingException {
String topic = "TestTopic";
String tag = "tagA";
String msg=null;
String msgId = null;
Message message =null;
String brokeName="broker-a";
MQProducer normalProduce = RocketMqUtil.getProduce("NORMAL");
// 用于确定消息发送到那台服务器的哪个主题下的哪个队列
MessageQueue messageQueue = new MessageQueue(topic,brokeName,0);
for(int i=0;i<10;i++){
msg="This is a normal message for Apache RocketMQ#序号:"+i;
msgId = "msgIdA"+i;
message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
SendResult send = normalProduce.send(message,messageQueue);
System.out.println(send);
}
}
消息消费
为了保证消息被顺序消费消费端也要满足以下条件:
业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。
设置合理的重试次数,避免参数不合理导致消息乱序。
顺序消费必须使用DefaultMQPushConsumer
并将最大并发线程和消费批量拉取消息的数量设置为1
使用MessageListenerOrderly
实现类监听器
/**
* 获取一个队列的消费者
* @param consumerGroup 消费者分组(用于负载均衡)
* @param topic 订阅的主题
* @param subExpression 订阅规则,根据标签进行过滤 例如:"tag1 || tag2 || tag3"
* @param messageListener 消息监听者
* @return
* @throws MQClientException
* @throws UnknownHostException
* @throws InterruptedException
*/
public static DefaultMQPushConsumer getConsume(String consumerGroup, String topic, String subExpression, MessageListener messageListener) throws MQClientException, UnknownHostException, InterruptedException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.resetClientConfig(getClientConfig());
defaultMQPushConsumer.setConsumerGroup(consumerGroup);
defaultMQPushConsumer.subscribe(topic, subExpression);
//设置消费批量拉取消息的数量
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
//设置消费者的最大线程数
defaultMQPushConsumer.setConsumeThreadMax(1);
//设置消费者的最小线程数
defaultMQPushConsumer.setConsumeThreadMin(1);
// 对于需要慢速拉取的情况(如流控制场景),暂停拉取时间 或者又称消费失败再次消费的间隔时间
defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(30000);
if(messageListener instanceof MessageListenerOrderly) {
defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly)messageListener);
} else if (messageListener instanceof MessageListenerConcurrently) {
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)messageListener);
}else {
defaultMQPushConsumer.registerMessageListener(messageListener);
}
defaultMQPushConsumer.start();
return defaultMQPushConsumer;
}
@Test
public void receiveFIFOMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
String topic = "TestTopic";
String tag = "tagA";
//顺序消费
MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println("msgs:"+JSON.toJSONString(msgs));
System.out.println("context:"+JSON.toJSONString(context));
return ConsumeOrderlyStatus.SUCCESS;
}
};
DefaultMQPushConsumer normal = RocketMqUtil.getConsume("NORMAL", topic, tag, messageListenerOrderly);
Thread.sleep(Long.MAX_VALUE);
normal.shutdown();
}
Springboot项目示例
目前找到的方法是创建一个只有一个队列的主题,然后使用普通的消息发送和接收的方法。其他方法还在探索中。
rocketmq-client-java版本
普通java项目
消息生产
消息消费
Springboot项目示例
消息生产
消息消费
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员流动的零一
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果