RocketMq定时/延时消息示例
什么是定时消息
定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
定时时间设置原则
Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。
定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式
定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。
定时时长最大值默认为24小时,不支持自定义修改
定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。
使用限制
消息类型一致性
定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。
定时精度约束
Apache RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。
Apache RocketMQ 定时消息的状态支持持久化存储,系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递。若存储系统异常重启,可能会导致定时消息投递出现一定延迟
rocketmq-client版本
这个定时和延时两种方式设置方式,两种设置方式都需转成毫秒
使用延时偏移间隔时间时注意以下两点
客户端时间和服务端时间要保持一致
设置最大的偏移时间量不能超过24小时
延时/定时消息接收和普通消息接收一样
为了方便计算时间相关项目引入hutool相关工具包
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.26</version>
</dependency>
普通java项目
定时代码示例
@Test
public void sendDeliverMsg() throws UnknownHostException, InterruptedException, MQClientException, MQBrokerException, RemotingException {
String topic = "DelayTopic";
String tag = "Java";
String msg=null;
String msgKey = null;
Message message =null;
MQProducer delayProduce = RocketMqUtil.getProduce("DelayProduce");
Date now = new Date();
String nowDateString = DateUtil.formatDateTime(now);
System.out.println("当前时间:"+nowDateString);
for(int i=0;i<10;i++){
msg="This is a delay message for Apache RocketMQ#序号:"+i;
msgKey = "delayMsgKey:"+i;
DateTime delayTime = DateUtil.offsetMinute(now, i+3);
message = new Message(topic, tag, msgKey, msg.getBytes(StandardCharsets.UTF_8));
// 设置指定过期时间
message.setDeliverTimeMs(delayTime.getTime());
System.out.println("当前消息Key"+msgKey+"过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
SendResult send = delayProduce.send(message);
System.out.println(send);
}
}
延时消息代码示例
@Test
public void sendDelayMsg() throws UnknownHostException, InterruptedException, MQClientException, MQBrokerException, RemotingException {
String topic = "DelayTopic";
String tag = "Java";
String msg=null;
String msgKey = null;
Message message =null;
MQProducer delayProduce = RocketMqUtil.getProduce("DelayProduce");
Date now = new Date();
String nowDateString = DateUtil.formatDateTime(now);
System.out.println("当前时间:"+nowDateString);
for(int i=0;i<10;i++){
msg="This is a delay message for Apache RocketMQ#序号:"+i;
msgKey = "delayMsgKey:"+i;
DateTime delayTime = DateUtil.offsetMinute(now, i+3);
message = new Message(topic, tag, msgKey, msg.getBytes(StandardCharsets.UTF_8));
// 设置间隔过期时间
message.setDelayTimeMs(delayTime.getTime()-System.currentTimeMillis());
System.out.println("当前消息Key"+msgKey+"过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
SendResult send = delayProduce.send(message);
System.out.println(send);
}
}
接收消息示例
@Test
public void receiveDelayMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
String topic = "DelayTopic";
String tag = "Java";
MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println("MsgKey:" + JSON.toJSONString(msgs.get(0).getKeys()));
return ConsumeOrderlyStatus.SUCCESS;
}
};
DefaultMQPushConsumer delayConsumer = RocketMqUtil.getConsume("DelayConsumer", topic, tag, messageListenerOrderly);
Thread.sleep(Long.MAX_VALUE);
delayConsumer.shutdown();
}
Springboot项目示例
定时代码示例
@Test
void sendDeliverMsg() {
Date now = new Date();
String nowDateString = DateUtil.formatDateTime(now);
log.info("当前时间:"+nowDateString);
for (int i = 0; i < 10; i++) {
DateTime delayTime = DateUtil.offsetMinute(now, i);
log.info("过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
SendResult sendResult = rocketMQTemplate.syncSendDeliverTimeMills("DelayTopic:spring-boot", "Delay Msg "+i,delayTime.getTime());
log.info(String.valueOf(sendResult));
}
}
延时消息代码示例
@Test
void sendDelayMsg() {
Date now = new Date();
String nowDateString = DateUtil.formatDateTime(now);
log.info("当前时间:"+nowDateString);
for (int i = 0; i < 10; i++) {
DateTime delayTime = DateUtil.offsetMinute(now, i);
log.info("过期时间为:"+DateUtil.formatDateTime(delayTime)+",过期时间戳:"+delayTime.getTime());
SendResult sendResult = rocketMQTemplate.syncSendDelayTimeMills("DelayTopic:spring-boot", "Delay Msg "+i,delayTime.getTime()-System.currentTimeMillis());
log.info(String.valueOf(sendResult));
}
}
接收消息示例
@Slf4j
@Component
@RocketMQMessageListener(topic = "DelayTopic",consumerGroup = "${rocketmq.consumer.group}",selectorExpression="spring-boot")
public class DelayConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("接收到的消息为:{}", message);
}
}
rocketmq-client-java版本
普通java项目
Springboot项目示例
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员流动的零一
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果