顺序消息最主要的是让生产的消息进入同一个队列中

rocketmq-client版本

在这个sdk版本中是要指定一个消息队列,这样消息才能按顺序进入主题下的同一个队列

至于顺序消息的概念建议还是看官方文档,文档地址:https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage

普通java项目

消息生产

根据顺序消息的必要条件:

生产端:单一生产者和串行发送

服务端:确定服务器和确定的服务队列

所以我们在发送消息时要用到MessageQueue类,并在类里面设置服务提供者broker的名称和队列的id

示例代码如下:

 @Test
    public void  sendFIFOMsg() throws UnknownHostException, InterruptedException, MQClientException, MQBrokerException, RemotingException {
        String topic = "TestTopic";
        String tag = "tagA";
        String msg=null;
        String msgId = null;
        Message message =null;
        String brokeName="broker-a";
        MQProducer normalProduce = RocketMqUtil.getProduce("NORMAL");
        // 用于确定消息发送到那台服务器的哪个主题下的哪个队列
        MessageQueue messageQueue = new MessageQueue(topic,brokeName,0);
        for(int i=0;i<10;i++){
            msg="This is a normal message for Apache RocketMQ#序号:"+i;
            msgId = "msgIdA"+i;
            message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
            SendResult send = normalProduce.send(message,messageQueue);
            System.out.println(send);
        }

    }

消息消费

为了保证消息被顺序消费消费端也要满足以下条件:

  1. 业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。

  2. 设置合理的重试次数,避免参数不合理导致消息乱序。

顺序消费必须使用DefaultMQPushConsumer并将最大并发线程和消费批量拉取消息的数量设置为1 使用MessageListenerOrderly 实现类监听器

/**
     * 获取一个队列的消费者
     * @param consumerGroup 消费者分组(用于负载均衡)
     * @param topic 订阅的主题
     * @param subExpression  订阅规则,根据标签进行过滤 例如:"tag1 || tag2 || tag3"
     * @param messageListener 消息监听者
     * @return
     * @throws MQClientException
     * @throws UnknownHostException
     * @throws InterruptedException
     */
    public static DefaultMQPushConsumer getConsume(String consumerGroup, String topic, String subExpression, MessageListener messageListener) throws MQClientException, UnknownHostException, InterruptedException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
        defaultMQPushConsumer.resetClientConfig(getClientConfig());
        defaultMQPushConsumer.setConsumerGroup(consumerGroup);
        defaultMQPushConsumer.subscribe(topic, subExpression);
        //设置消费批量拉取消息的数量
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
        //设置消费者的最大线程数
        defaultMQPushConsumer.setConsumeThreadMax(1);
        //设置消费者的最小线程数
        defaultMQPushConsumer.setConsumeThreadMin(1);
      // 对于需要慢速拉取的情况(如流控制场景),暂停拉取时间 或者又称消费失败再次消费的间隔时间
        defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(30000);
        if(messageListener instanceof MessageListenerOrderly) {
            defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly)messageListener);
        } else if (messageListener instanceof MessageListenerConcurrently) {
            defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)messageListener);
        }else {
            defaultMQPushConsumer.registerMessageListener(messageListener);
        }
        defaultMQPushConsumer.start();
        return defaultMQPushConsumer;
    }
@Test
    public void receiveFIFOMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
        String topic = "TestTopic";
        String tag = "tagA";
        //顺序消费
        MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.println("msgs:"+JSON.toJSONString(msgs));
                System.out.println("context:"+JSON.toJSONString(context));
                return ConsumeOrderlyStatus.SUCCESS;
            }
        };
        DefaultMQPushConsumer normal = RocketMqUtil.getConsume("NORMAL", topic, tag, messageListenerOrderly);
        Thread.sleep(Long.MAX_VALUE);
        normal.shutdown();
    }

Springboot项目示例

目前找到的方法是创建一个只有一个队列的主题,然后使用普通的消息发送和接收的方法。其他方法还在探索中。

rocketmq-client-java版本

普通java项目

消息生产

消息消费

Springboot项目示例

消息生产

消息消费