RocketMQ普通消息示例
创建生产者并发送消息
普通java项目
首先我们创建一个工具类来获取与服务端通信的相关配置
public class RocketMqUtil {
// 设置成nameser的地址端口。
private static String ENDPOINT = "192.168.183.129:30885";
private static String INSTANCE_NAME = "CUSTOMER";
private static Integer INSTANCE_INDEX = 0;
public static String getENDPOINT() {
return ENDPOINT;
}
public static void setENDPOINT(String ENDPOINT) {
RocketMqUtil.ENDPOINT = ENDPOINT;
}
public static String getInstanceName() {
return INSTANCE_NAME;
}
public static void setInstanceName(String instanceName) {
INSTANCE_NAME = instanceName;
}
public static Integer getInstanceIndex() {
return INSTANCE_INDEX;
}
public static void setInstanceIndex(Integer instanceIndex) {
INSTANCE_INDEX = instanceIndex;
}
private static ClientConfig getClientConfig() throws UnknownHostException {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setNamesrvAddr(ENDPOINT);
clientConfig.setAccessChannel(AccessChannel.LOCAL);
clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
clientConfig.setInstanceName(INSTANCE_NAME);
return clientConfig;
}
public static DefaultMQProducer getProduce(String producerGroup) throws MQClientException, UnknownHostException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.resetClientConfig(getClientConfig());
producer.setSendMsgTimeout(30000);
producer.start();
return producer;
}
}
编写发送消息测试用例
public class NormalMsgTest {
/**
* 发送普通同步消息
*/
@Test
public void sendNormalMsg() throws UnknownHostException, MQClientException, MQBrokerException, RemotingException, InterruptedException {
String topic = "TestTopic";
DefaultMQProducer normalProduce = RocketMqUtil.getProduce("NORMAL");
String msg="This is a normal message for Apache RocketMQ2";
String tag = "tagA";
String msgId = "msgIdA";
Message message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
SendResult send = normalProduce.send(message);
System.out.println(send);
msg="This is a normal message for Apache RocketMQ#";
msgId = "msgIdA2";
message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
send = normalProduce.send(message);
System.out.println(send);
normalProduce.shutdown();
}
/**
* 发送普通异步消息
*/
@Test
public void sendAsyncNormalMsg() throws InterruptedException, UnknownHostException, MQClientException, MQBrokerException, RemotingException {
String topic = "TestTopic";
String msg="This is a normal message for Apache RocketMQ";
String tag = "tagB";
String msgId = "msgIdB";
MQProducer normalProduce = RocketMqUtil.getProduce("NORMAL");
Message message = new Message(topic, tag, msg.getBytes(StandardCharsets.UTF_8));
normalProduce.send(message,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
Thread.sleep(Long.MAX_VALUE);
}
}
然后控制台就可以看到我们发送的消息
Springboot项目
更改API接口连接超时时间
从git 下载spring-rocketmq源码 源码地址:https://github.com/apache/rocketmq-spring.git
我们要修改rocketmq-spring-boot模块下的三个文件,如下图
@SuppressWarnings("WeakerAccess")
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
/**
* The name server for rocketMQ, formats: `host:port;host:port`.
*/
private String nameServer;
/**
* Enum type for accessChannel, values: LOCAL, CLOUD
*/
private String accessChannel;
/**
* mqClient Api request timeout.
*/
private int apiTimeout = 3000;
private Producer producer;
/**
* for pull consumer only
*
* @see org.apache.rocketmq.spring.annotation.RocketMQMessageListener for a push consumer
*/
private PullConsumer pullConsumer = new PullConsumer();
/**
* Configure enable listener or not.
* In some particular cases, if you don't want the the listener is enabled when container startup,
* the configuration pattern is like this :
* rocketmq.push-consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
* <p>
* the listener is enabled by default.
*
*/
private PushConsumer consumer = new PushConsumer();
public String getNameServer() {
return nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}
public String getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(String accessChannel) {
this.accessChannel = accessChannel;
}
public int getApiTimeout() {
return apiTimeout;
}
public void setApiTimeout(int apiTimeout) {
this.apiTimeout = apiTimeout;
}
public RocketMQProperties.Producer getProducer() {
return producer;
}
public void setProducer(RocketMQProperties.Producer producer) {
this.producer = producer;
}
public PullConsumer getPullConsumer() {
return pullConsumer;
}
public void setPullConsumer(PullConsumer pullConsumer) {
this.pullConsumer = pullConsumer;
}
public PushConsumer getConsumer() {
return consumer;
}
public void setConsumer(PushConsumer consumer) {
this.consumer = consumer;
}
public static class Producer {
/**
* Group name of producer.
*/
private String group;
/**
* Namespace for this MQ Producer instance.
*/
private String namespace;
/**
* The namespace v2 version of producer, it can not be used in combination with namespace.
*/
private String namespaceV2;
/**
* Millis of send message timeout.
*/
private int sendMessageTimeout = 3000;
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
private int compressMessageBodyThreshold = 1024 * 4;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryNextServer = false;
/**
* Maximum allowed message size in bytes.
*/
private int maxMessageSize = 1024 * 1024 * 4;
/**
* The property of "access-key".
*/
private String accessKey;
/**
* The property of "secret-key".
*/
private String secretKey;
/**
* Switch flag instance for message trace.
*/
private boolean enableMsgTrace = false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
/**
* The property of "tlsEnable".
*/
private boolean tlsEnable = false;
/**
* The property of "instanceName".
*/
private String instanceName = "DEFAULT";
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public int getSendMessageTimeout() {
return sendMessageTimeout;
}
public void setSendMessageTimeout(int sendMessageTimeout) {
this.sendMessageTimeout = sendMessageTimeout;
}
public int getCompressMessageBodyThreshold() {
return compressMessageBodyThreshold;
}
public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
this.compressMessageBodyThreshold = compressMessageBodyThreshold;
}
public int getRetryTimesWhenSendFailed() {
return retryTimesWhenSendFailed;
}
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
public int getRetryTimesWhenSendAsyncFailed() {
return retryTimesWhenSendAsyncFailed;
}
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
public boolean isRetryNextServer() {
return retryNextServer;
}
public void setRetryNextServer(boolean retryNextServer) {
this.retryNextServer = retryNextServer;
}
public int getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
public boolean isTlsEnable() {
return tlsEnable;
}
public void setTlsEnable(boolean tlsEnable) {
this.tlsEnable = tlsEnable;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getNamespaceV2() {
return namespaceV2;
}
public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
}
public static class PullConsumer {
/**
* Group name of consumer.
*/
private String group;
/**
* Namespace for this MQ Consumer instance.
*/
private String namespace;
/**
* The namespace v2 version of consumer, it can not be used in combination with namespace.
*/
private String namespaceV2;
/**
* Topic name of consumer.
*/
private String topic;
/**
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
private String messageModel = "CLUSTERING";
/**
* Control how to selector message.
*/
private String selectorType = "TAG";
/**
* Control which message can be select.
*/
private String selectorExpression = "*";
/**
* The property of "access-key".
*/
private String accessKey;
/**
* The property of "secret-key".
*/
private String secretKey;
/**
* Maximum number of messages pulled each time.
*/
private int pullBatchSize = 10;
/**
* Switch flag instance for message trace.
*/
private boolean enableMsgTrace = false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
/**
* The property of "tlsEnable".
*/
private boolean tlsEnable = false;
/**
* The property of "instanceName".
*/
private String instanceName = "DEFAULT";
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getMessageModel() {
return messageModel;
}
public void setMessageModel(String messageModel) {
this.messageModel = messageModel;
}
public String getSelectorType() {
return selectorType;
}
public void setSelectorType(String selectorType) {
this.selectorType = selectorType;
}
public String getSelectorExpression() {
return selectorExpression;
}
public void setSelectorExpression(String selectorExpression) {
this.selectorExpression = selectorExpression;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public int getPullBatchSize() {
return pullBatchSize;
}
public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchSize;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
public boolean isTlsEnable() {
return tlsEnable;
}
public void setTlsEnable(boolean tlsEnable) {
this.tlsEnable = tlsEnable;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getNamespaceV2() {
return namespaceV2;
}
public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
}
public static class PushConsumer extends PullConsumer {
/**
* listener configuration container
* the pattern is like this:
* group1.topic1 = false
* group2.topic2 = true
* group3.topic3 = false
*/
private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
public Map<String, Map<String, Boolean>> getListeners() {
return listeners;
}
public void setListeners(Map<String, Map<String, Boolean>> listeners) {
this.listeners = listeners;
}
}
}
@SuppressWarnings("WeakerAccess")
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
private ApplicationContext applicationContext;
/**
* The name of the DefaultRocketMQListenerContainer instance
*/
private String name;
/**
* Suspending pulling time in orderly mode.
* <p>
* The minimum value is 10 and the maximum is 30000.
*/
private long suspendCurrentQueueTimeMillis = 1000;
/**
* Message consume retry strategy in concurrently mode.
* <p>
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
*/
private int delayLevelWhenNextConsume = 0;
private String nameServer;
private AccessChannel accessChannel = AccessChannel.LOCAL;
private String consumerGroup;
private String topic;
private int consumeThreadMax = 64;
private int consumeThreadNumber = 20;
private String charset = "UTF-8";
private MessageConverter messageConverter;
private RocketMQListener rocketMQListener;
private RocketMQReplyListener rocketMQReplyListener;
private RocketMQMessageListener rocketMQMessageListener;
private DefaultMQPushConsumer consumer;
private Type messageType;
private MethodParameter methodParameter;
private boolean running;
// The following properties came from @RocketMQMessageListener.
private ConsumeMode consumeMode;
private SelectorType selectorType;
private String selectorExpression;
private MessageModel messageModel;
private long consumeTimeout;
private int maxReconsumeTimes;
private int replyTimeout;
private String tlsEnable;
private String namespace;
private String namespaceV2;
private long awaitTerminationMillisWhenShutdown;
private String instanceName;
private int mqClientApiTimeout;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public int getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume;
}
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
public String getNameServer() {
return nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}
public AccessChannel getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getConsumeThreadMax() {
return consumeThreadMax;
}
public int getConsumeThreadNumber() {
return consumeThreadNumber;
}
public String getCharset() {
return charset;
}
public void setCharset(String charset) {
this.charset = charset;
}
public int getMqClientApiTimeout() {
return mqClientApiTimeout;
}
public void setMqClientApiTimeout(int mqClientApiTimeout) {
this.mqClientApiTimeout = mqClientApiTimeout;
}
public MessageConverter getMessageConverter() {
return messageConverter;
}
public DefaultRocketMQListenerContainer setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
return this;
}
public RocketMQListener getRocketMQListener() {
return rocketMQListener;
}
public void setRocketMQListener(RocketMQListener rocketMQListener) {
this.rocketMQListener = rocketMQListener;
}
public RocketMQReplyListener getRocketMQReplyListener() {
return rocketMQReplyListener;
}
public void setRocketMQReplyListener(RocketMQReplyListener rocketMQReplyListener) {
this.rocketMQReplyListener = rocketMQReplyListener;
}
public RocketMQMessageListener getRocketMQMessageListener() {
return rocketMQMessageListener;
}
public void setRocketMQMessageListener(RocketMQMessageListener anno) {
this.rocketMQMessageListener = anno;
this.consumeMode = anno.consumeMode();
this.consumeThreadMax = anno.consumeThreadMax();
this.consumeThreadNumber = anno.consumeThreadNumber();
this.messageModel = anno.messageModel();
this.selectorType = anno.selectorType();
this.selectorExpression = anno.selectorExpression();
this.consumeTimeout = anno.consumeTimeout();
this.maxReconsumeTimes = anno.maxReconsumeTimes();
this.replyTimeout = anno.replyTimeout();
this.tlsEnable = anno.tlsEnable();
this.namespace = anno.namespace();
this.namespaceV2 = anno.namespaceV2();
this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
this.instanceName = anno.instanceName();
}
public ConsumeMode getConsumeMode() {
return consumeMode;
}
public SelectorType getSelectorType() {
return selectorType;
}
public void setSelectorExpression(String selectorExpression) {
this.selectorExpression = selectorExpression;
}
public String getSelectorExpression() {
return selectorExpression;
}
public MessageModel getMessageModel() {
return messageModel;
}
public String getTlsEnable() {
return tlsEnable;
}
public void setTlsEnable(String tlsEnable) {
this.tlsEnable = tlsEnable;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getNamespaceV2() {
return namespaceV2;
}
public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
}
public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
return this;
}
@Override
public void destroy() {
this.setRunning(false);
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
log.info("container destroyed, {}", this.toString());
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException("container already running. " + this.toString());
}
try {
consumer.start();
} catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
@Override
public void stop() {
if (this.isRunning()) {
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
setRunning(false);
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
@Override
public int getPhase() {
// Returning Integer.MAX_VALUE only suggests that
// we will be the first bean to shutdown and last bean to start
return Integer.MAX_VALUE;
}
@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public String toString() {
return "DefaultRocketMQListenerContainer{" +
"consumerGroup='" + consumerGroup + '\'' +
", namespace='" + namespace + '\'' +
", namespaceV2='" + namespaceV2 + '\'' +
", nameServer='" + nameServer + '\'' +
", topic='" + topic + '\'' +
", consumeMode=" + consumeMode +
", selectorType=" + selectorType +
", selectorExpression='" + selectorExpression + '\'' +
", messageModel=" + messageModel + '\'' +
", tlsEnable=" + tlsEnable +
", instanceName=" + instanceName +
'}';
}
public void setName(String name) {
this.name = name;
}
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
public void handleMessage(
MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
if (rocketMQListener != null) {
rocketMQListener.onMessage(doConvertMessage(messageExt));
} else if (rocketMQReplyListener != null) {
Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
Message<?> message = MessageBuilder.withPayload(replyContent).build();
org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
producer.setSendMsgTimeout(replyTimeout);
producer.send(replyMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
} else {
log.debug("Consumer replies message success.");
}
}
@Override
public void onException(Throwable e) {
log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
}
});
}
}
private byte[] convertToBytes(Message<?> message) {
Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders());
Object payloadObj = messageWithSerializedPayload.getPayload();
byte[] payloads;
try {
if (null == payloadObj) {
throw new RuntimeException("the message cannot be empty");
}
if (payloadObj instanceof String) {
payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
} else if (payloadObj instanceof byte[]) {
payloads = (byte[]) messageWithSerializedPayload.getPayload();
} else {
String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass());
if (null == jsonObj) {
throw new RuntimeException(String.format(
"empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
this.messageConverter.getClass(), payloadObj.getClass(), payloadObj));
}
payloads = jsonObj.getBytes(Charset.forName(charset));
}
} catch (Exception e) {
throw new RuntimeException("convert to bytes failed.", e);
}
return payloads;
}
private Message<?> doConvert(Object payload, MessageHeaders headers) {
Message<?> message = this.messageConverter instanceof SmartMessageConverter ?
((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) :
this.messageConverter.toMessage(payload, headers);
if (message == null) {
String payloadType = payload.getClass().getName();
Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
"', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
}
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
return builder.build();
}
@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
return messageExt;
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(messageType, String.class)) {
return str;
} else {
// If msgType not string, use objectMapper change it.
try {
if (messageType instanceof Class) {
//if the messageType has not Generic Parameter
return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
} else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
}
} catch (Exception e) {
log.info("convert failed. str:{}, msgType:{}", str, messageType);
throw new RuntimeException("cannot convert message to " + messageType, e);
}
}
}
}
private MethodParameter getMethodParameter() {
Class<?> targetClass;
if (rocketMQListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
}
Type messageType = this.getMessageType();
Class clazz = null;
if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
clazz = (Class) ((ParameterizedType) messageType).getRawType();
} else if (messageType instanceof Class) {
clazz = (Class) messageType;
} else {
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
try {
final Method method = targetClass.getMethod("onMessage", clazz);
return new MethodParameter(method, 0);
} catch (NoSuchMethodException e) {
e.printStackTrace();
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
}
protected Type getMessageType() {
Class<?> targetClass;
if (rocketMQListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
}
Type matchedGenericInterface = null;
while (Objects.nonNull(targetClass)) {
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
if (type instanceof ParameterizedType &&
(Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) {
matchedGenericInterface = type;
break;
}
}
}
targetClass = targetClass.getSuperclass();
}
if (Objects.isNull(matchedGenericInterface)) {
return Object.class;
}
Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
}
private void initRocketMQPushConsumer() throws MQClientException {
if (rocketMQListener == null && rocketMQReplyListener == null) {
throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
}
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
if (Objects.nonNull(rpcHook)) {
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
consumer.setNamespaceV2(namespaceV2);
String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
consumer.setNamesrvAddr(customizedNameServer);
} else {
consumer.setNamesrvAddr(nameServer);
}
if (accessChannel != null) {
consumer.setAccessChannel(accessChannel);
}
if (mqClientApiTimeout != 0) {
consumer.setMqClientApiTimeout(mqClientApiTimeout);
}
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeThreadMin(consumeThreadNumber);
consumer.setConsumeTimeout(consumeTimeout);
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
consumer.setInstanceName(instanceName);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
//if String is not is equal "true" TLS mode will represent the as default value false
consumer.setUseTLS(new Boolean(tlsEnable));
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}
}
}
public class RocketMQMessageListenerContainerRegistrar implements ApplicationContextAware {
private final static Logger log = LoggerFactory.getLogger(RocketMQMessageListenerContainerRegistrar.class);
private ConfigurableApplicationContext applicationContext;
private final AtomicLong counter = new AtomicLong(0);
private final ConfigurableEnvironment environment;
private final RocketMQProperties rocketMQProperties;
private final RocketMQMessageConverter rocketMQMessageConverter;
private final List<DefaultRocketMQListenerContainer> containers = new ArrayList<>();
public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
}
if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
}
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled =
(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
containers.add(container);
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
public void startContainer() {
for (DefaultRocketMQListenerContainer container : containers) {
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
}
}
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.hasLength(nameServer) ? nameServer : rocketMQProperties.getNameServer();
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (StringUtils.hasLength(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if (StringUtils.hasLength(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
}
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name);
String namespace = environment.resolvePlaceholders(annotation.namespace());
container.setNamespace(RocketMQUtil.getNamespace(namespace,
rocketMQProperties.getConsumer().getNamespace()));
String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
container.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2,
rocketMQProperties.getConsumer().getNamespaceV2()));
container.setMqClientApiTimeout(rocketMQProperties.getApiTimeout());
return container;
}
private void validate(RocketMQMessageListener annotation) {
if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
annotation.messageModel() == MessageModel.BROADCASTING) {
throw new BeanDefinitionValidationException(
"Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
}
}
}
然后我们重新构建一个jar包并引入依赖
在application.yaml中增加api-timeout时间配置
server:
port: 8080
rocketmq:
name-server: '192.168.183.129:30885'
access-channel: LOCAL
api-timeout: 30000
producer:
group: "SpringBootProducer"
send-message-timeout: 30000
consumer:
group: "SpringBootConsumer"
发送消息代码如下
@Slf4j
@SpringBootTest
class RocketMqTestApplicationTests {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*/
@Test
void sendNormalMsg() {
SendResult sendResult = rocketMQTemplate.syncSend("TestTopic2:Springboot", "Springboot send Msg");
System.out.println(sendResult);
}
/**
* 异步发送普通消息
* @throws InterruptedException
*/
@Test
void sendAsyncNormalMsg() throws InterruptedException {
rocketMQTemplate.asyncSend("TestTopic2:Springboot","Springboot Send Async Msg",new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(sendResult.toString());
}
@Override
public void onException(Throwable e) {
log.error(e.toString());
}
});
Thread.sleep(Long.MAX_VALUE);
}
}
如果代码中出现异常invokeSync call the addr[] timeout
则采用覆盖DefaultMQProducer Bean的方法增加api连接时长的方法
出现这个异常的原因是客户端与服务api调用的时长限制是3秒钟,而当获取broker路由时,时长超过3秒而爆出的异常,如下图所示的位置,当然更深的原因是使用Netty 时与服务端还未建立channel,有兴趣的小伙伴可以继续探索
创建消费者并接收消息
SpringBoot项目
@Slf4j
@Component
@RocketMQMessageListener(topic = "TestTopic2",consumerGroup = "${rocketmq.consumer.group}",selectorExpression="*")
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("接收到的消息为:{}", message);
}
}
普通java项目
在RocketmqUtil中创建一个获取消费者的方法
/**
* 获取一个队列的消费者
* @param consumerGroup 消费者分组(用于负载均衡)
* @param topic 订阅的主题
* @param subExpression 订阅规则,根据标签进行过滤 例如:"tag1 || tag2 || tag3"
* @param messageListener 消息监听者
* @return
* @throws MQClientException
* @throws UnknownHostException
* @throws InterruptedException
*/
public static DefaultMQPushConsumer getConsume(String consumerGroup, String topic, String subExpression, MessageListener messageListener) throws MQClientException, UnknownHostException, InterruptedException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.resetClientConfig(getClientConfig());
defaultMQPushConsumer.setConsumerGroup(consumerGroup);
defaultMQPushConsumer.subscribe(topic, subExpression);
//设置消费批量拉取消息的数量
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(3);
//设置消费者的最大线程数
defaultMQPushConsumer.setConsumeThreadMax(1);
//设置消费者的最小线程数
defaultMQPushConsumer.setConsumeThreadMin(1);
// defaultMQPushConsumer.setPullBatchSize(1);
// defaultMQPushConsumer.setPopBatchNums(1);
if(messageListener instanceof MessageListenerOrderly) {
defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly)messageListener);
} else if (messageListener instanceof MessageListenerConcurrently) {
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)messageListener);
}else {
defaultMQPushConsumer.registerMessageListener(messageListener);
}
defaultMQPushConsumer.start();
return defaultMQPushConsumer;
}
创建消费者实例
@Test
public void receiveNormalMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
String topic = "TestTopic";
String tag = "tagA";
//顺序消费 MessageListenerOrderly 对象用于按顺序接收消息。一个队列逐个线程
MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println("msgs:"+JSON.toJSONString(msgs));
System.out.println("context:"+JSON.toJSONString(context));
return ConsumeOrderlyStatus.SUCCESS;
}
};
//并发接收消息 MessageListenerConcurrently 对象用于并发接收异步传输的消息
MessageListenerConcurrently messageListenerConcurrently = new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("msgs:" + JSON.toJSONString(msgs));
// System.out.println("context:" + JSON.toJSONString(context));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
};
DefaultMQPushConsumer normal = RocketMqUtil.getConsume("NORMAL", topic, tag, messageListenerOrderly);
Thread.sleep(Long.MAX_VALUE);
normal.shutdown();
}
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员流动的零一
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果