什么是定时消息

定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

定时时间设置原则

  • Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。

  • 定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式

  • 定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。

  • 定时时长最大值默认为24小时,不支持自定义修改

  • 定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。

使用限制

消息类型一致性

定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。

定时精度约束

Apache RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。

Apache RocketMQ 定时消息的状态支持持久化存储,系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递。若存储系统异常重启,可能会导致定时消息投递出现一定延迟

rocketmq-client版本

  • 这个定时和延时两种方式设置方式,两种设置方式都需转成毫秒

  • 使用延时偏移间隔时间时注意以下两点

    • 客户端时间和服务端时间要保持一致

    • 设置最大的偏移时间量不能超过24小时

  • 延时/定时消息接收和普通消息接收一样

为了方便计算时间相关项目引入hutool相关工具包

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-core</artifactId>
    <version>5.8.26</version>
</dependency>

普通java项目

定时代码示例

@Test
public void  sendDeliverMsg() throws UnknownHostException, InterruptedException, MQClientException, MQBrokerException, RemotingException {
    String topic = "DelayTopic";
    String tag = "Java";
    String msg=null;
    String msgKey = null;
    Message message =null;
    MQProducer delayProduce = RocketMqUtil.getProduce("DelayProduce");
    Date now = new Date();
    String nowDateString = DateUtil.formatDateTime(now);
    System.out.println("当前时间:"+nowDateString);
    for(int i=0;i<10;i++){
        msg="This is a delay message for Apache RocketMQ#序号:"+i;
        msgKey = "delayMsgKey:"+i;
        DateTime delayTime = DateUtil.offsetMinute(now, i+3);
        message = new Message(topic, tag, msgKey, msg.getBytes(StandardCharsets.UTF_8));
        // 设置指定过期时间
        message.setDeliverTimeMs(delayTime.getTime());
        System.out.println("当前消息Key"+msgKey+"过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
        SendResult send = delayProduce.send(message);

        System.out.println(send);
    }
}

延时消息代码示例

@Test
public void  sendDelayMsg() throws UnknownHostException, InterruptedException, MQClientException, MQBrokerException, RemotingException {
    String topic = "DelayTopic";
    String tag = "Java";
    String msg=null;
    String msgKey = null;
    Message message =null;
    MQProducer delayProduce = RocketMqUtil.getProduce("DelayProduce");
    Date now = new Date();
    String nowDateString = DateUtil.formatDateTime(now);
    System.out.println("当前时间:"+nowDateString);
    for(int i=0;i<10;i++){
        msg="This is a delay message for Apache RocketMQ#序号:"+i;
        msgKey = "delayMsgKey:"+i;
        DateTime delayTime = DateUtil.offsetMinute(now, i+3);
        message = new Message(topic, tag, msgKey, msg.getBytes(StandardCharsets.UTF_8));
        // 设置间隔过期时间
        message.setDelayTimeMs(delayTime.getTime()-System.currentTimeMillis());
        System.out.println("当前消息Key"+msgKey+"过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
        SendResult send = delayProduce.send(message);

        System.out.println(send);
    }
}

接收消息示例

@Test
public void receiveDelayMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
    String topic = "DelayTopic";
    String tag = "Java";
    MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            System.out.println("MsgKey:" + JSON.toJSONString(msgs.get(0).getKeys()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    };
    DefaultMQPushConsumer delayConsumer = RocketMqUtil.getConsume("DelayConsumer", topic, tag, messageListenerOrderly);
    Thread.sleep(Long.MAX_VALUE);
    delayConsumer.shutdown();
}

Springboot项目示例

定时代码示例

@Test
void sendDeliverMsg() {
    Date now = new Date();
    String nowDateString = DateUtil.formatDateTime(now);
    log.info("当前时间:"+nowDateString);
    for (int i = 0; i < 10; i++) {
        DateTime delayTime = DateUtil.offsetMinute(now, i);
        log.info("过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
        SendResult sendResult = rocketMQTemplate.syncSendDeliverTimeMills("DelayTopic:spring-boot", "Delay Msg "+i,delayTime.getTime());
        log.info(String.valueOf(sendResult));
    }
}

延时消息代码示例

@Test
void sendDelayMsg() {
    Date now = new Date();
    String nowDateString = DateUtil.formatDateTime(now);
    log.info("当前时间:"+nowDateString);
    for (int i = 0; i < 10; i++) {
        DateTime delayTime = DateUtil.offsetMinute(now, i);
        log.info("过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
        SendResult sendResult = rocketMQTemplate.syncSendDelayTimeMills("DelayTopic:spring-boot", "Delay Msg "+i,delayTime.getTime()-System.currentTimeMillis());
        log.info(String.valueOf(sendResult));
    }
}

接收消息示例

@Slf4j
@Component
@RocketMQMessageListener(topic = "DelayTopic",consumerGroup = "${rocketmq.consumer.group}",selectorExpression="spring-boot")
public class DelayConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("接收到的消息为:{}", message);
    }
}

rocketmq-client-java版本

普通java项目

Springboot项目示例