创建生产者并发送消息

普通 Java 项目

首先我们创建一个工具类来获取与服务端通信的相关配置

/**
 * Static helper that keeps the settings used to talk to the RocketMQ server and
 * builds started producers from them.
 */
public class RocketMqUtil {

    // Address and port of the RocketMQ name server.
    private static String ENDPOINT = "192.168.183.129:30885";

    // Instance name written into every client configuration built by this class.
    private static String INSTANCE_NAME = "CUSTOMER";

    // Only stored and exposed through its accessors; the factory methods below do not read it.
    private static Integer INSTANCE_INDEX = 0;

    /** Returns the name server address currently in use. */
    public static String getENDPOINT() {
        return RocketMqUtil.ENDPOINT;
    }

    /** Replaces the name server address used for clients created afterwards. */
    public static void setENDPOINT(String ENDPOINT) {
        RocketMqUtil.ENDPOINT = ENDPOINT;
    }

    /** Returns the instance name applied to new client configurations. */
    public static String getInstanceName() {
        return RocketMqUtil.INSTANCE_NAME;
    }

    /** Replaces the instance name applied to new client configurations. */
    public static void setInstanceName(String instanceName) {
        RocketMqUtil.INSTANCE_NAME = instanceName;
    }

    /** Returns the stored instance index. */
    public static Integer getInstanceIndex() {
        return RocketMqUtil.INSTANCE_INDEX;
    }

    /** Replaces the stored instance index. */
    public static void setInstanceIndex(Integer instanceIndex) {
        RocketMqUtil.INSTANCE_INDEX = instanceIndex;
    }

    /**
     * Builds a client configuration from the current static settings.
     *
     * @return a configuration pointing at {@code ENDPOINT}, using the LOCAL access channel,
     *         the local host address as client IP and {@code INSTANCE_NAME} as instance name
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    private static ClientConfig getClientConfig() throws UnknownHostException {
        ClientConfig config = new ClientConfig();
        config.setNamesrvAddr(ENDPOINT);
        config.setAccessChannel(AccessChannel.LOCAL);
        String localAddress = InetAddress.getLocalHost().getHostAddress();
        config.setClientIP(localAddress);
        config.setInstanceName(INSTANCE_NAME);
        return config;
    }

    /**
     * Creates a producer for the given group, applies the shared client configuration,
     * sets a 30000 send timeout and starts it.
     *
     * @param producerGroup the producer group name
     * @return the started producer; the caller is responsible for shutting it down
     */
    public static DefaultMQProducer getProduce(String producerGroup) throws MQClientException, UnknownHostException, InterruptedException {
        DefaultMQProducer mqProducer = new DefaultMQProducer(producerGroup);
        ClientConfig config = getClientConfig();
        mqProducer.resetClientConfig(config);
        mqProducer.setSendMsgTimeout(30000);
        mqProducer.start();
        return mqProducer;
    }

}

编写发送消息测试用例

/**
 * Manual tests that send normal messages to the {@code TestTopic} topic through
 * producers obtained from {@code RocketMqUtil}.
 */
public class NormalMsgTest {

    /** Upper bound, in seconds, on how long the async test waits for its send callback. */
    private static final long ASYNC_WAIT_SECONDS = 60;

    /**
     * Sends two normal messages synchronously and prints each send result.
     * The producer is shut down even when a send fails.
     */
    @Test
    public void sendNormalMsg() throws UnknownHostException, MQClientException, MQBrokerException, RemotingException, InterruptedException {
        String topic = "TestTopic";
        DefaultMQProducer normalProduce = RocketMqUtil.getProduce("NORMAL");
        try {
            String msg = "This is a normal message for Apache RocketMQ2";
            String tag = "tagA";
            String msgId = "msgIdA";
            Message message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
            SendResult send = normalProduce.send(message);
            System.out.println(send);

            msg = "This is a normal message for Apache RocketMQ#";
            msgId = "msgIdA2";
            message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
            send = normalProduce.send(message);
            System.out.println(send);
        } finally {
            // Release the client's resources regardless of the send outcome.
            normalProduce.shutdown();
        }
    }

    /**
     * Sends one normal message asynchronously and waits (bounded) for the callback.
     * The test fails if the callback reports an error or never arrives, and the
     * producer is always shut down afterwards.
     */
    @Test
    public void sendAsyncNormalMsg() throws InterruptedException, UnknownHostException, MQClientException, MQBrokerException, RemotingException {
        String topic = "TestTopic";
        String msg = "This is a normal message for Apache RocketMQ";
        String tag = "tagB";
        String msgId = "msgIdB";
        MQProducer normalProduce = RocketMqUtil.getProduce("NORMAL");
        // Fully qualified because the listing has no import section to extend.
        final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
        final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
                new java.util.concurrent.atomic.AtomicReference<>();
        try {
            // Pass msgId as the message key, as the synchronous test does.
            Message message = new Message(topic, tag, msgId, msg.getBytes(StandardCharsets.UTF_8));
            normalProduce.send(message, new SendCallback() {

                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                    done.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    failure.set(e);
                    done.countDown();
                }
            });
            // Wait for the callback instead of sleeping forever.
            if (!done.await(ASYNC_WAIT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
                throw new IllegalStateException(
                        "No send callback received within " + ASYNC_WAIT_SECONDS + " seconds");
            }
            if (failure.get() != null) {
                throw new IllegalStateException("Asynchronous send failed", failure.get());
            }
        } finally {
            normalProduce.shutdown();
        }
    }
}

然后控制台就可以看到我们发送的消息

Spring Boot 项目

更改API接口连接超时时间

从 GitHub 下载 rocketmq-spring 源码，源码地址：https://github.com/apache/rocketmq-spring.git

我们要修改rocketmq-spring-boot模块下的三个文件,如下图

/**
 * Configuration properties bound from the {@code rocketmq} prefix: name server,
 * access channel, client API timeout, and nested producer / consumer settings.
 */
@SuppressWarnings("WeakerAccess")
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {

    /**
     * The name server for rocketMQ, formats: `host:port;host:port`.
     */
    private String nameServer;

    /**
     * Enum type for accessChannel, values: LOCAL, CLOUD
     */
    private String accessChannel;

    /**
     * MQ client API request timeout; defaults to 3000.
     * NOTE(review): the unit is presumably milliseconds - confirm against the client API.
     */
    private int apiTimeout = 3000;



    /**
     * Producer settings. Has no default instance, so {@link #getProducer()} returns
     * {@code null} until a value is set.
     */
    private Producer producer;

    /**
     * for pull consumer only
     *
     * @see org.apache.rocketmq.spring.annotation.RocketMQMessageListener for a push consumer
     */
    private PullConsumer pullConsumer = new PullConsumer();
    /**
     * Push consumer settings, including the per-listener enable switches.
     * In some particular cases, if you don't want the listener to be enabled when the container starts up,
     * the configuration pattern is like this :
     * rocketmq.push-consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
     * <p>
     * the listener is enabled by default.
     * <p>
     * NOTE(review): this field is named {@code consumer}, so the bound key is presumably
     * {@code rocketmq.consumer.listeners...} rather than {@code rocketmq.push-consumer...} - confirm.
     *
     */
    private PushConsumer consumer = new PushConsumer();

    public String getNameServer() {
        return nameServer;
    }

    public void setNameServer(String nameServer) {
        this.nameServer = nameServer;
    }

    public String getAccessChannel() {
        return accessChannel;
    }

    public void setAccessChannel(String accessChannel) {
        this.accessChannel = accessChannel;
    }

    public int getApiTimeout() {
        return apiTimeout;
    }

    public void setApiTimeout(int apiTimeout) {
        this.apiTimeout = apiTimeout;
    }

    public RocketMQProperties.Producer getProducer() {
        return producer;
    }

    public void setProducer(RocketMQProperties.Producer producer) {
        this.producer = producer;
    }

    public PullConsumer getPullConsumer() {
        return pullConsumer;
    }

    public void setPullConsumer(PullConsumer pullConsumer) {
        this.pullConsumer = pullConsumer;
    }

    public PushConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(PushConsumer consumer) {
        this.consumer = consumer;
    }

    /**
     * Settings for the message producer.
     */
    public static class Producer {

        /**
         * Group name of producer.
         */
        private String group;

        /**
         * Namespace for this MQ Producer instance.
         */
        private String namespace;

        /**
         * The namespace v2 version of producer, it can not be used in combination with namespace.
         */
        private String namespaceV2;

        /**
         * Millis of send message timeout.
         */
        private int sendMessageTimeout = 3000;

        /**
         * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
         */
        private int compressMessageBodyThreshold = 1024 * 4;

        /**
         * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        private int retryTimesWhenSendFailed = 2;

        /**
         * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        private int retryTimesWhenSendAsyncFailed = 2;

        /**
         * Indicate whether to retry another broker on sending failure internally.
         */
        private boolean retryNextServer = false;

        /**
         * Maximum allowed message size in bytes.
         */
        private int maxMessageSize = 1024 * 1024 * 4;

        /**
         * The property of "access-key".
         */
        private String accessKey;

        /**
         * The property of "secret-key".
         */
        private String secretKey;

        /**
         * Switch flag instance for message trace.
         */
        private boolean enableMsgTrace = false;

        /**
         * The name value of message trace topic. If you don't configure it, the default trace topic name is used.
         */
        private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;

        /**
         * The property of "tlsEnable".
         */
        private boolean tlsEnable = false;

        /**
         * The property of "instanceName".
         */
        private String instanceName = "DEFAULT";

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public int getSendMessageTimeout() {
            return sendMessageTimeout;
        }

        public void setSendMessageTimeout(int sendMessageTimeout) {
            this.sendMessageTimeout = sendMessageTimeout;
        }

        public int getCompressMessageBodyThreshold() {
            return compressMessageBodyThreshold;
        }

        public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
            this.compressMessageBodyThreshold = compressMessageBodyThreshold;
        }

        public int getRetryTimesWhenSendFailed() {
            return retryTimesWhenSendFailed;
        }

        public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
            this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
        }

        public int getRetryTimesWhenSendAsyncFailed() {
            return retryTimesWhenSendAsyncFailed;
        }

        public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
            this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
        }

        public boolean isRetryNextServer() {
            return retryNextServer;
        }

        public void setRetryNextServer(boolean retryNextServer) {
            this.retryNextServer = retryNextServer;
        }

        public int getMaxMessageSize() {
            return maxMessageSize;
        }

        public void setMaxMessageSize(int maxMessageSize) {
            this.maxMessageSize = maxMessageSize;
        }

        public String getAccessKey() {
            return accessKey;
        }

        public void setAccessKey(String accessKey) {
            this.accessKey = accessKey;
        }

        public String getSecretKey() {
            return secretKey;
        }

        public void setSecretKey(String secretKey) {
            this.secretKey = secretKey;
        }

        public boolean isEnableMsgTrace() {
            return enableMsgTrace;
        }

        public void setEnableMsgTrace(boolean enableMsgTrace) {
            this.enableMsgTrace = enableMsgTrace;
        }

        public String getCustomizedTraceTopic() {
            return customizedTraceTopic;
        }

        public void setCustomizedTraceTopic(String customizedTraceTopic) {
            this.customizedTraceTopic = customizedTraceTopic;
        }

        public boolean isTlsEnable() {
            return tlsEnable;
        }

        public void setTlsEnable(boolean tlsEnable) {
            this.tlsEnable = tlsEnable;
        }

        public String getNamespace() {
            return namespace;
        }

        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }

        public String getNamespaceV2() {
            return namespaceV2;
        }

        public void setNamespaceV2(String namespaceV2) {
            this.namespaceV2 = namespaceV2;
        }

        public String getInstanceName() {
            return instanceName;
        }

        public void setInstanceName(String instanceName) {
            this.instanceName = instanceName;
        }
    }

    /**
     * Settings for the pull consumer; also the base class of {@link PushConsumer}.
     */
    public static class PullConsumer {
        /**
         * Group name of consumer.
         */
        private String group;

        /**
         * Namespace for this MQ Consumer instance.
         */
        private String namespace;

        /**
         * The namespace v2 version of consumer, it can not be used in combination with namespace.
         */
        private String namespaceV2;

        /**
         * Topic name of consumer.
         */
        private String topic;

        /**
         * Controls the message mode; if you want every subscriber to receive every message, broadcasting is a good choice.
         */
        private String messageModel = "CLUSTERING";

        /**
         * Controls how messages are selected.
         */
        private String selectorType = "TAG";

        /**
         * Controls which messages can be selected.
         */
        private String selectorExpression = "*";

        /**
         * The property of "access-key".
         */
        private String accessKey;

        /**
         * The property of "secret-key".
         */
        private String secretKey;

        /**
         * Maximum number of messages pulled each time.
         */
        private int pullBatchSize = 10;

        /**
         * Switch flag instance for message trace.
         */
        private boolean enableMsgTrace = false;

        /**
         * The name value of message trace topic. If you don't configure it, the default trace topic name is used.
         */
        private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;

        /**
         * The property of "tlsEnable".
         */
        private boolean tlsEnable = false;

        /**
         * The property of "instanceName".
         */
        private String instanceName = "DEFAULT";

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getMessageModel() {
            return messageModel;
        }

        public void setMessageModel(String messageModel) {
            this.messageModel = messageModel;
        }

        public String getSelectorType() {
            return selectorType;
        }

        public void setSelectorType(String selectorType) {
            this.selectorType = selectorType;
        }

        public String getSelectorExpression() {
            return selectorExpression;
        }

        public void setSelectorExpression(String selectorExpression) {
            this.selectorExpression = selectorExpression;
        }

        public String getAccessKey() {
            return accessKey;
        }

        public void setAccessKey(String accessKey) {
            this.accessKey = accessKey;
        }

        public String getSecretKey() {
            return secretKey;
        }

        public void setSecretKey(String secretKey) {
            this.secretKey = secretKey;
        }

        public int getPullBatchSize() {
            return pullBatchSize;
        }

        public void setPullBatchSize(int pullBatchSize) {
            this.pullBatchSize = pullBatchSize;
        }

        public boolean isEnableMsgTrace() {
            return enableMsgTrace;
        }

        public void setEnableMsgTrace(boolean enableMsgTrace) {
            this.enableMsgTrace = enableMsgTrace;
        }

        public String getCustomizedTraceTopic() {
            return customizedTraceTopic;
        }

        public void setCustomizedTraceTopic(String customizedTraceTopic) {
            this.customizedTraceTopic = customizedTraceTopic;
        }

        public boolean isTlsEnable() {
            return tlsEnable;
        }

        public void setTlsEnable(boolean tlsEnable) {
            this.tlsEnable = tlsEnable;
        }

        public String getNamespace() {
            return namespace;
        }

        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }

        public String getNamespaceV2() {
            return namespaceV2;
        }

        public void setNamespaceV2(String namespaceV2) {
            this.namespaceV2 = namespaceV2;
        }

        public String getInstanceName() {
            return instanceName;
        }

        public void setInstanceName(String instanceName) {
            this.instanceName = instanceName;
        }
    }

    /**
     * Settings for push consumers: everything from {@link PullConsumer} plus the
     * per-listener enable switches.
     */
    public static class PushConsumer extends PullConsumer {
        /**
         * listener configuration container
         * the pattern is like this:
         * group1.topic1 = false
         * group2.topic2 = true
         * group3.topic3 = false
         */
        private Map<String, Map<String, Boolean>> listeners = new HashMap<>();

        public Map<String, Map<String, Boolean>> getListeners() {
            return listeners;
        }

        public void setListeners(Map<String, Map<String, Boolean>> listeners) {
            this.listeners = listeners;
        }
    }

}
@SuppressWarnings("WeakerAccess")
public class DefaultRocketMQListenerContainer implements InitializingBean,
        RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);

    private ApplicationContext applicationContext;

    /**
     * The name of the DefaultRocketMQListenerContainer instance
     */
    private String name;

    /**
     * Suspending pulling time in orderly mode.
     * <p>
     * The minimum value is 10 and the maximum is 30000.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    /**
     * Message consume retry strategy in concurrently mode.
     * <p>
     * -1,no retry,put into DLQ directly
     * 0,broker control retry frequency
     * >0,client control retry frequency
     */
    private int delayLevelWhenNextConsume = 0;

    private String nameServer;

    private AccessChannel accessChannel = AccessChannel.LOCAL;

    private String consumerGroup;

    private String topic;

    private int consumeThreadMax = 64;

    private int consumeThreadNumber = 20;

    private String charset = "UTF-8";

    private MessageConverter messageConverter;

    private RocketMQListener rocketMQListener;

    private RocketMQReplyListener rocketMQReplyListener;

    private RocketMQMessageListener rocketMQMessageListener;

    private DefaultMQPushConsumer consumer;

    private Type messageType;

    private MethodParameter methodParameter;

    private boolean running;

    // The following properties came from @RocketMQMessageListener.
    private ConsumeMode consumeMode;
    private SelectorType selectorType;
    private String selectorExpression;
    private MessageModel messageModel;
    private long consumeTimeout;
    private int maxReconsumeTimes;
    private int replyTimeout;
    private String tlsEnable;
    private String namespace;
    private String namespaceV2;
    private long awaitTerminationMillisWhenShutdown;

    private String instanceName;

    private int mqClientApiTimeout;

    // ---- Plain accessors for the container's configuration fields ----

    /** Returns the suspend time passed to the orderly consume context when a message fails. */
    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    /** Returns the delay level passed to the concurrent consume context when a message fails. */
    public int getDelayLevelWhenNextConsume() {
        return delayLevelWhenNextConsume;
    }

    public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
        this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
    }

    public String getNameServer() {
        return nameServer;
    }

    public void setNameServer(String nameServer) {
        this.nameServer = nameServer;
    }

    public AccessChannel getAccessChannel() {
        return accessChannel;
    }

    public void setAccessChannel(AccessChannel accessChannel) {
        this.accessChannel = accessChannel;
    }

    public String getConsumerGroup() {
        return consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** Returns the maximum consume thread count; it is overwritten from the listener annotation. */
    public int getConsumeThreadMax() {
        return consumeThreadMax;
    }

    /** Returns the consume thread count; it is overwritten from the listener annotation. */
    public int getConsumeThreadNumber() {
        return consumeThreadNumber;
    }

    /** Returns the charset name used to encode and decode message bodies. */
    public String getCharset() {
        return charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    /**
     * Returns the MQ client API timeout held by this container.
     * NOTE(review): where this value is applied to the consumer is not visible here - confirm.
     */
    public int getMqClientApiTimeout() {
        return mqClientApiTimeout;
    }

    public void setMqClientApiTimeout(int mqClientApiTimeout) {
        this.mqClientApiTimeout = mqClientApiTimeout;
    }

    public MessageConverter getMessageConverter() {
        return messageConverter;
    }

    /**
     * Sets the message converter.
     *
     * @return this container, so calls can be chained
     */
    public DefaultRocketMQListenerContainer setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
        return this;
    }

    public RocketMQListener getRocketMQListener() {
        return rocketMQListener;
    }

    public void setRocketMQListener(RocketMQListener rocketMQListener) {
        this.rocketMQListener = rocketMQListener;
    }

    public RocketMQReplyListener getRocketMQReplyListener() {
        return rocketMQReplyListener;
    }

    public void setRocketMQReplyListener(RocketMQReplyListener rocketMQReplyListener) {
        this.rocketMQReplyListener = rocketMQReplyListener;
    }

    public RocketMQMessageListener getRocketMQMessageListener() {
        return rocketMQMessageListener;
    }

    /**
     * Stores the listener annotation and copies its attributes into this container's fields.
     *
     * @param anno the {@code @RocketMQMessageListener} annotation to read settings from
     */
    public void setRocketMQMessageListener(RocketMQMessageListener anno) {
        this.rocketMQMessageListener = anno;

        // Consumption mode and threading.
        this.consumeMode = anno.consumeMode();
        this.messageModel = anno.messageModel();
        this.consumeThreadMax = anno.consumeThreadMax();
        this.consumeThreadNumber = anno.consumeThreadNumber();

        // Message selection.
        this.selectorType = anno.selectorType();
        this.selectorExpression = anno.selectorExpression();

        // Timeouts and retry behaviour.
        this.consumeTimeout = anno.consumeTimeout();
        this.replyTimeout = anno.replyTimeout();
        this.maxReconsumeTimes = anno.maxReconsumeTimes();
        this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
        this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
        // Negative values from the annotation are clamped to zero.
        this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());

        // Client identity and transport.
        this.tlsEnable = anno.tlsEnable();
        this.namespace = anno.namespace();
        this.namespaceV2 = anno.namespaceV2();
        this.instanceName = anno.instanceName();
    }

    // ---- Accessors for values that originate from @RocketMQMessageListener ----

    public ConsumeMode getConsumeMode() {
        return consumeMode;
    }

    public SelectorType getSelectorType() {
        return selectorType;
    }

    public void setSelectorExpression(String selectorExpression) {
        this.selectorExpression = selectorExpression;
    }

    public String getSelectorExpression() {
        return selectorExpression;
    }

    public MessageModel getMessageModel() {
        return messageModel;
    }

    /** Returns the TLS switch as the string value it was configured with. */
    public String getTlsEnable() {
        return tlsEnable;
    }

    public void setTlsEnable(String tlsEnable) {
        this.tlsEnable = tlsEnable;
    }

    public String getNamespace() {
        return namespace;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public String getNamespaceV2() {
        return namespaceV2;
    }

    public void setNamespaceV2(String namespaceV2) {
        this.namespaceV2 = namespaceV2;
    }

    /** Returns the underlying push consumer; may be {@code null} if none has been set. */
    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    public long getAwaitTerminationMillisWhenShutdown() {
        return awaitTerminationMillisWhenShutdown;
    }

    public String getInstanceName() {
        return instanceName;
    }

    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    /**
     * Sets how long to wait for termination on shutdown.
     * NOTE(review): unlike setRocketMQMessageListener, this setter does not clamp
     * negative values to zero - confirm whether that is intended.
     *
     * @return this container, so calls can be chained
     */
    public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
        this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
        return this;
    }

    /**
     * Marks the container as stopped, shuts the consumer down if one exists,
     * and logs the destruction.
     */
    @Override
    public void destroy() {
        setRunning(false);
        if (consumer != null) {
            consumer.shutdown();
        }
        log.info("container destroyed, {}", toString());
    }

    /** Always reports {@code true}, i.e. the container asks to be started automatically. */
    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Stops the container synchronously and then invokes the callback.
     *
     * @param callback run after {@link #stop()} has returned
     */
    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    /**
     * Starts the underlying push consumer and marks the container as running.
     *
     * @throws IllegalStateException if the container is already running or the consumer fails to start
     */
    @Override
    public void start() {
        final boolean alreadyRunning = isRunning();
        if (alreadyRunning) {
            throw new IllegalStateException("container already running. " + toString());
        }

        try {
            consumer.start();
        } catch (MQClientException startFailure) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", startFailure);
        }

        // Only reached when the consumer started successfully.
        setRunning(true);
        log.info("running container: {}", toString());
    }

    /**
     * Shuts the consumer down and clears the running flag; does nothing when the
     * container is not running.
     */
    @Override
    public void stop() {
        if (!isRunning()) {
            return;
        }
        if (consumer != null) {
            consumer.shutdown();
        }
        setRunning(false);
    }

    /** Returns the running flag maintained by start(), stop() and destroy(). */
    @Override
    public boolean isRunning() {
        return running;
    }

    /** Updates the running flag; only used internally by the lifecycle methods. */
    private void setRunning(boolean running) {
        this.running = running;
    }

    /** Returns the lifecycle phase of this container, fixed at {@code Integer.MAX_VALUE}. */
    @Override
    public int getPhase() {
        // Returning Integer.MAX_VALUE only suggests that
        // we will be the first bean to shutdown and last bean to start
        return Integer.MAX_VALUE;
    }

    /**
     * Initializes the push consumer, then resolves and caches the listener's message type
     * and method parameter.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // NOTE(review): the helpers below are defined outside this excerpt; the call order
        // is kept as-is because their interdependencies cannot be confirmed from here.
        initRocketMQPushConsumer();

        this.messageType = getMessageType();
        this.methodParameter = getMethodParameter();
        log.debug("RocketMQ messageType: {}", messageType);
    }

    /** Stores the application context supplied through {@code ApplicationContextAware}. */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Describes the container by its main settings; used in log and exception messages.
     * String-valued settings are single-quoted, enum-valued ones are printed bare.
     */
    @Override
    public String toString() {
        return "DefaultRocketMQListenerContainer{" +
                "consumerGroup='" + consumerGroup + '\'' +
                ", namespace='" + namespace + '\'' +
                ", namespaceV2='" + namespaceV2 + '\'' +
                ", nameServer='" + nameServer + '\'' +
                ", topic='" + topic + '\'' +
                ", consumeMode=" + consumeMode +
                ", selectorType=" + selectorType +
                ", selectorExpression='" + selectorExpression + '\'' +
                // messageModel has no opening quote, so it must not get a closing one either.
                ", messageModel=" + messageModel +
                ", tlsEnable=" + tlsEnable +
                ", instanceName=" + instanceName +
                '}';
    }

    /** Sets the name of this container instance. */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Concurrent-mode listener that hands each message to {@code handleMessage} and asks
     * for redelivery of the batch as soon as one message fails.
     */
    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        /**
         * Consumes the batch in order.
         *
         * @return {@code CONSUME_SUCCESS} when every message was handled, otherwise
         *         {@code RECONSUME_LATER} after applying the configured delay level
         */
        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (final MessageExt current : msgs) {
                log.debug("received msg: {}", current);
                try {
                    final long startedAt = System.currentTimeMillis();
                    handleMessage(current);
                    final long elapsed = System.currentTimeMillis() - startedAt;
                    log.debug("consume {} cost: {} ms", current.getMsgId(), elapsed);
                } catch (Exception failure) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", current.getMsgId(), current.getTopic(), current.getReconsumeTimes(), failure);
                    // Stop at the first failure and let the configured delay level drive the retry.
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    /**
     * Orderly-mode listener that hands each message to {@code handleMessage} and suspends
     * the current queue as soon as one message fails.
     */
    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        /**
         * Consumes the batch in order.
         *
         * @return {@code SUCCESS} when every message was handled, otherwise
         *         {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} after applying the configured suspend time
         */
        @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (final MessageExt current : msgs) {
                log.debug("received msg: {}", current);
                try {
                    final long startedAt = System.currentTimeMillis();
                    handleMessage(current);
                    final long elapsed = System.currentTimeMillis() - startedAt;
                    log.debug("consume {} cost: {} ms", current.getMsgId(), elapsed);
                } catch (Exception failure) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", current.getMsgId(), current.getTopic(), current.getReconsumeTimes(), failure);
                    // Stop at the first failure and pause this queue for the configured time.
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    /**
     * Dispatches a received message to the configured listener.
     * <p>
     * With a plain listener the converted message is simply delivered. With a reply listener
     * the listener's return value is wrapped into a reply message and sent back asynchronously
     * through the client factory's default producer, using {@code replyTimeout} as send timeout.
     * When neither listener is set the message is ignored.
     *
     * @param messageExt the message received from the broker
     * @throws MQClientException    if sending the reply fails on the client side
     * @throws RemotingException    if sending the reply fails at the remoting layer
     * @throws InterruptedException if the thread is interrupted while sending the reply
     */
    public void handleMessage(
            MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
        if (rocketMQListener != null) {
            rocketMQListener.onMessage(doConvertMessage(messageExt));
        } else if (rocketMQReplyListener != null) {
            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
            Message<?> message = MessageBuilder.withPayload(replyContent).build();

            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
            DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
            producer.setSendMsgTimeout(replyTimeout);
            producer.send(replyMessage, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                    } else {
                        log.debug("Consumer replies message success.");
                    }
                }

                @Override
                public void onException(Throwable e) {
                    // Pass the throwable as the trailing argument so the stack trace is logged too.
                    log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage(), e);
                }
            });
        }
    }

    /**
     * Serializes a Spring message's payload into the bytes of a RocketMQ message body.
     * <p>
     * String payloads are encoded with the configured charset, byte arrays are used as-is,
     * and any other payload is first turned into a String by the message converter.
     *
     * @param message the message whose payload is to be serialized
     * @return the encoded payload
     * @throws RuntimeException wrapping any failure, including a null payload or an empty conversion result
     */
    private byte[] convertToBytes(Message<?> message) {
        Message<?> converted = doConvert(message.getPayload(), message.getHeaders());
        Object body = converted.getPayload();
        try {
            if (body == null) {
                throw new RuntimeException("the message cannot be empty");
            }
            if (body instanceof String) {
                return ((String) body).getBytes(Charset.forName(charset));
            }
            if (body instanceof byte[]) {
                return (byte[]) converted.getPayload();
            }
            // Anything else goes through the converter, which is expected to yield a String.
            String json = (String) this.messageConverter.fromMessage(converted, body.getClass());
            if (json == null) {
                throw new RuntimeException(String.format(
                        "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
                        this.messageConverter.getClass(), body.getClass(), body));
            }
            return json.getBytes(Charset.forName(charset));
        } catch (Exception e) {
            throw new RuntimeException("convert to bytes failed.", e);
        }
    }

    /**
     * Runs the payload through the message converter and makes sure the result carries a
     * content type, defaulting to {@code text/plain}.
     *
     * @param payload the payload to convert
     * @param headers the headers to convert with; may be {@code null}
     * @return the converted message with a content-type header present
     * @throws MessageConversionException if the converter returns {@code null}
     */
    private Message<?> doConvert(Object payload, MessageHeaders headers) {
        Message<?> converted;
        if (this.messageConverter instanceof SmartMessageConverter) {
            converted = ((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null);
        } else {
            converted = this.messageConverter.toMessage(payload, headers);
        }

        if (converted == null) {
            String payloadType = payload.getClass().getName();
            Object contentType = null;
            if (headers != null) {
                contentType = headers.get(MessageHeaders.CONTENT_TYPE);
            }
            throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
                    "', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
        }

        MessageBuilder<?> withContentType = MessageBuilder.fromMessage(converted);
        withContentType.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
        return withContentType.build();
    }

    /**
     * Converts a received RocketMQ message into the type expected by the listener.
     *
     * <p>If the listener's message type is {@code MessageExt} or the RocketMQ {@code Message}
     * class, the message is returned as-is. Otherwise the body is decoded with the configured
     * {@code charset}; a {@code String} message type gets the decoded text, and every other type
     * is produced by the message converter.
     *
     * @param messageExt the received message
     * @return the object to pass to the listener
     * @throws RuntimeException if the converter fails
     */
    @SuppressWarnings("unchecked")
    private Object doConvertMessage(MessageExt messageExt) {
        boolean wantsRawMessage = Objects.equals(messageType, MessageExt.class)
                || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class);
        if (wantsRawMessage) {
            return messageExt;
        }

        String body = new String(messageExt.getBody(), Charset.forName(charset));
        if (Objects.equals(messageType, String.class)) {
            return body;
        }

        // Neither raw nor String: delegate to the message converter.
        try {
            if (messageType instanceof Class) {
                // The message type carries no generic parameter.
                return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(body).build(), (Class<?>) messageType);
            }
            // The message type is parameterized, so use SmartMessageConverter#fromMessage with the
            // method parameter as conversion hint. getMethodParameter() has already checked that
            // the converter is a SmartMessageConverter.
            return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(body).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
        } catch (Exception e) {
            log.info("convert failed. str:{}, msgType:{}", body, messageType);
            throw new RuntimeException("cannot convert message to " + messageType, e);
        }
    }

    /**
     * Resolves the first parameter of the listener's public {@code onMessage} method.
     *
     * <p>The target class is the ultimate (un-proxied) class of whichever listener is set. The
     * parameter class is the raw type of a parameterized message type (only accepted when the
     * converter is a {@link SmartMessageConverter}) or the message type itself when it is a plain
     * class.
     *
     * @return a {@link MethodParameter} describing parameter 0 of {@code onMessage}
     * @throws RuntimeException if the message type is unsupported or no matching
     *                          {@code onMessage} method exists; in the latter case the
     *                          {@link NoSuchMethodException} is attached as the cause
     */
    private MethodParameter getMethodParameter() {
        Class<?> targetClass;
        if (rocketMQListener != null) {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
        } else {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
        }
        Type messageType = this.getMessageType();
        Class<?> clazz;
        if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
            clazz = (Class<?>) ((ParameterizedType) messageType).getRawType();
        } else if (messageType instanceof Class) {
            clazz = (Class<?>) messageType;
        } else {
            throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
        }
        try {
            final Method method = targetClass.getMethod("onMessage", clazz);
            return new MethodParameter(method, 0);
        } catch (NoSuchMethodException e) {
            // Keep the reflection failure as the cause instead of printing it to stderr.
            throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported", e);
        }
    }

    /**
     * Determines the message type the listener is declared to consume.
     *
     * <p>Starting at the ultimate (un-proxied) class of whichever listener is set, the class
     * hierarchy is searched upwards for a parameterized {@code RocketMQListener} or
     * {@code RocketMQReplyListener} interface. The search stops at the first match, so the
     * declaration closest to the concrete listener class wins and is not overwritten by a generic
     * superclass that re-declares the interface with a type variable.
     *
     * @return the first type argument of the matched interface, or {@code Object.class} when no
     *         parameterized listener interface is found
     */
    protected Type getMessageType() {
        Class<?> targetClass;
        if (rocketMQListener != null) {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
        } else {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
        }
        Type matchedGenericInterface = null;
        // Leave the hierarchy walk as soon as a match is found; continuing would let a superclass
        // declaration replace the more specific one found on a subclass.
        while (Objects.nonNull(targetClass) && Objects.isNull(matchedGenericInterface)) {
            for (Type type : targetClass.getGenericInterfaces()) {
                if (type instanceof ParameterizedType) {
                    Type rawType = ((ParameterizedType) type).getRawType();
                    if (Objects.equals(rawType, RocketMQListener.class) || Objects.equals(rawType, RocketMQReplyListener.class)) {
                        matchedGenericInterface = type;
                        break;
                    }
                }
            }
            targetClass = targetClass.getSuperclass();
        }
        if (Objects.isNull(matchedGenericInterface)) {
            return Object.class;
        }

        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
            return actualTypeArguments[0];
        }
        return Object.class;
    }

    /**
     * Creates and configures the {@link DefaultMQPushConsumer} held in {@code consumer}.
     *
     * <p>The consumer is built with an RPC hook when one can be derived from the annotation's
     * access/secret key, then receives namespace, name server, access channel, timeouts, thread
     * limits, message model, subscription and message listener. Finally, a listener that
     * implements {@link RocketMQPushConsumerLifecycleListener} gets a chance to adjust the
     * consumer via {@code prepareStart}. The consumer is not started here.
     *
     * @throws IllegalArgumentException if no listener is set, a required property is missing, or
     *                                  {@code messageModel}, {@code selectorType} or
     *                                  {@code consumeMode} has an unexpected value
     * @throws MQClientException        if subscribing to the topic fails
     */
    private void initRocketMQPushConsumer() throws MQClientException {
        if (rocketMQListener == null && rocketMQReplyListener == null) {
            throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
        }
        Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
        Assert.notNull(nameServer, "Property 'nameServer' is required");
        Assert.notNull(topic, "Property 'topic' is required");

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
                this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
        boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                    enableMsgTrace, this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
        } else {
            log.debug("Access-key or secret-key not configure in {}.", this);
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                    this.applicationContext.getEnvironment().
                            resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }
        consumer.setNamespace(namespace);
        consumer.setNamespaceV2(namespaceV2);

        // resolveRequiredPlaceholders never yields null, so test for an empty result as well;
        // otherwise the fallback to the container-level name server could never be taken.
        String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
        if (customizedNameServer != null && !customizedNameServer.isEmpty()) {
            consumer.setNamesrvAddr(customizedNameServer);
        } else {
            consumer.setNamesrvAddr(nameServer);
        }
        if (accessChannel != null) {
            consumer.setAccessChannel(accessChannel);
        }
        // A value of 0 leaves the client's own API timeout untouched.
        if (mqClientApiTimeout != 0) {
            consumer.setMqClientApiTimeout(mqClientApiTimeout);
        }
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setConsumeThreadMin(consumeThreadNumber);
        consumer.setConsumeTimeout(consumeTimeout);
        consumer.setMaxReconsumeTimes(maxReconsumeTimes);
        consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
        consumer.setInstanceName(instanceName);
        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }

        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }

        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

        // Any value other than "true" (case-insensitive), including null, disables TLS.
        consumer.setUseTLS(Boolean.parseBoolean(tlsEnable));

        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }

    }

}
/**
 * Registers one {@link DefaultRocketMQListenerContainer} bean per annotated listener bean it is
 * handed and starts the collected containers on request.
 */
public class RocketMQMessageListenerContainerRegistrar implements ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageListenerContainerRegistrar.class);

    private ConfigurableApplicationContext applicationContext;

    /** Source of the numeric suffix used in generated container bean names. */
    private final AtomicLong counter = new AtomicLong(0);

    private final ConfigurableEnvironment environment;

    private final RocketMQProperties rocketMQProperties;

    private final RocketMQMessageConverter rocketMQMessageConverter;

    /** Containers registered so far, in registration order. */
    private final List<DefaultRocketMQListenerContainer> containers = new ArrayList<>();

    public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter,
                                                     ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
        this.rocketMQMessageConverter = rocketMQMessageConverter;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    /**
     * Registers a listener container bean for the given listener bean.
     *
     * <p>Nothing is registered when the listener is switched off through the consumer
     * {@code listeners} configuration for its resolved consumer group and topic.
     *
     * @param beanName   name of the listener bean (used for logging only)
     * @param bean       the listener bean; must implement exactly one of {@code RocketMQListener}
     *                   and {@code RocketMQReplyListener}
     * @param annotation the listener annotation describing the subscription
     * @throws IllegalStateException if the bean implements both or neither listener interface
     */
    public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        boolean plainListener = bean instanceof RocketMQListener;
        boolean replyListener = bean instanceof RocketMQReplyListener;

        if (plainListener && replyListener) {
            throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
        }

        if (!plainListener && !replyListener) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
        }

        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());

        // Listeners are enabled unless the configuration explicitly maps group -> topic -> false.
        boolean listenerEnabled =
                (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                        .getOrDefault(topic, true);

        if (!listenerEnabled) {
            log.debug(
                    "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                    consumerGroup, topic);
            return;
        }
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                counter.incrementAndGet());
        GenericApplicationContext beanRegistry = (GenericApplicationContext) applicationContext;

        beanRegistry.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        // Fetching the bean right away instantiates the container through the supplier above.
        DefaultRocketMQListenerContainer registered =
                beanRegistry.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);

        containers.add(registered);

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    /**
     * Starts every registered container that is not running yet.
     *
     * @throws RuntimeException wrapping the first start failure
     */
    public void startContainer() {
        for (DefaultRocketMQListenerContainer container : containers) {
            if (container.isRunning()) {
                continue;
            }
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds a listener container from the annotation, falling back to the global properties for
     * the name server when the annotation value resolves to an empty string.
     */
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
                                                                             RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        container.setRocketMQMessageListener(annotation);

        String annotatedNameServer = environment.resolvePlaceholders(annotation.nameServer());
        String effectiveNameServer = StringUtils.hasLength(annotatedNameServer)
                ? annotatedNameServer
                : rocketMQProperties.getNameServer();
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(effectiveNameServer);
        if (StringUtils.hasLength(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        String selector = environment.resolvePlaceholders(annotation.selectorExpression());
        if (StringUtils.hasLength(selector)) {
            container.setSelectorExpression(selector);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
        if (bean instanceof RocketMQListener) {
            container.setRocketMQListener((RocketMQListener) bean);
        } else if (bean instanceof RocketMQReplyListener) {
            container.setRocketMQReplyListener((RocketMQReplyListener) bean);
        }
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);

        String annotatedNamespace = environment.resolvePlaceholders(annotation.namespace());
        container.setNamespace(RocketMQUtil.getNamespace(annotatedNamespace,
                rocketMQProperties.getConsumer().getNamespace()));

        String annotatedNamespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
        container.setNamespaceV2(RocketMQUtil.getNamespace(annotatedNamespaceV2,
                rocketMQProperties.getConsumer().getNamespaceV2()));
        container.setMqClientApiTimeout(rocketMQProperties.getApiTimeout());
        return container;
    }

    /**
     * Rejects the combination of orderly consumption with the broadcasting message model.
     */
    private void validate(RocketMQMessageListener annotation) {
        boolean orderly = annotation.consumeMode() == ConsumeMode.ORDERLY;
        boolean broadcasting = annotation.messageModel() == MessageModel.BROADCASTING;
        if (orderly && broadcasting) {
            throw new BeanDefinitionValidationException(
                    "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }
}

然后我们重新构建一个jar包并引入依赖

在application.yaml中增加api-timeout时间配置

# Port of the embedded web server.
server:
  port: 8080

# RocketMQ Spring Boot starter settings.
rocketmq:
  # Name server address in host:port form.
  name-server: '192.168.183.129:30885'
  access-channel: LOCAL
  # Client API timeout introduced by the rebuilt starter; presumably milliseconds (30 s) - confirm.
  api-timeout: 30000
  producer:
    group: "SpringBootProducer"
    # Presumably milliseconds as well - confirm against the starter documentation.
    send-message-timeout: 30000
  consumer:
    # Referenced from @RocketMQMessageListener via ${rocketmq.consumer.group}.
    group: "SpringBootConsumer"

发送消息代码如下

@Slf4j
@SpringBootTest
class RocketMqTestApplicationTests {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通消息
     */
    @Test
    void sendNormalMsg() {
        SendResult sendResult = rocketMQTemplate.syncSend("TestTopic2:Springboot", "Springboot send Msg");
        System.out.println(sendResult);
    }


    /**
     * 异步发送普通消息
     * @throws InterruptedException
     */
    @Test
    void sendAsyncNormalMsg() throws InterruptedException {
        rocketMQTemplate.asyncSend("TestTopic2:Springboot","Springboot Send Async Msg",new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(sendResult.toString());
            }

            @Override
            public void onException(Throwable e) {
                log.error(e.toString());
            }
        });
        Thread.sleep(Long.MAX_VALUE);
    }

}

如果代码中出现异常 invokeSync call the addr[] timeout,可以通过覆盖 DefaultMQProducer Bean 的方式来增加 API 调用的超时时长

出现这个异常的原因是:客户端调用服务端 API 的超时限制是3秒钟,而获取 broker 路由的耗时超过了3秒,因此抛出异常,位置如下图所示。当然,更深层的原因是使用 Netty 时与服务端还未建立 channel,有兴趣的小伙伴可以继续探索

创建消费者并接收消息

SpringBoot项目

/**
 * Listener for topic {@code TestTopic2} that logs every received message. The consumer group is
 * taken from the {@code rocketmq.consumer.group} property and the selector expression is
 * {@code *}.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "TestTopic2",
        consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "*")
public class ConsumerService implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message the message as a string
     */
    @Override
    public void onMessage(String message) {
        log.info("接收到的消息为:{}", message);
    }
}

普通java项目

在RocketMqUtil中创建一个获取消费者的方法

 /**
     * Creates, configures and starts a push consumer for a single topic.
     *
     * @param consumerGroup   consumer group (used for load balancing)
     * @param topic           topic to subscribe to
     * @param subExpression   subscription rule that filters by tag, e.g. "tag1 || tag2 || tag3"
     * @param messageListener listener that receives the messages
     * @return the started consumer
     * @throws MQClientException    if subscribing or starting the consumer fails
     * @throws UnknownHostException if the local host address cannot be resolved for the client config
     * @throws InterruptedException declared for callers; not raised by the visible code
     */
    public static DefaultMQPushConsumer getConsume(String consumerGroup, String topic, String subExpression, MessageListener messageListener) throws MQClientException, UnknownHostException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.resetClientConfig(getClientConfig());
        consumer.setConsumerGroup(consumerGroup);
        consumer.subscribe(topic, subExpression);

        // Batch size for consumption.
        consumer.setConsumeMessageBatchMaxSize(3);
        // Maximum and minimum number of consumer threads.
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);

        // Pick the overload matching the listener flavour; orderly is checked first.
        if (messageListener instanceof MessageListenerOrderly) {
            MessageListenerOrderly orderly = (MessageListenerOrderly) messageListener;
            consumer.registerMessageListener(orderly);
        } else if (messageListener instanceof MessageListenerConcurrently) {
            MessageListenerConcurrently concurrently = (MessageListenerConcurrently) messageListener;
            consumer.registerMessageListener(concurrently);
        } else {
            consumer.registerMessageListener(messageListener);
        }

        consumer.start();
        return consumer;
    }

创建消费者实例

@Test
    public void receiveNormalMsg() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
        String topic = "TestTopic";
        String tag = "tagA";
        // Orderly consumption: a MessageListenerOrderly receives messages in order.
        MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.println("msgs:"+JSON.toJSONString(msgs));
                System.out.println("context:"+JSON.toJSONString(context));
                return ConsumeOrderlyStatus.SUCCESS;
            }
        };
        // Concurrent consumption: a MessageListenerConcurrently receives messages concurrently.
        // Not wired up below; pass it to getConsume instead of the orderly listener to try it.
        MessageListenerConcurrently messageListenerConcurrently = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("msgs:" + JSON.toJSONString(msgs));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        DefaultMQPushConsumer normal = RocketMqUtil.getConsume("NORMAL", topic, tag, messageListenerOrderly);
        try {
            // Keep the test thread alive so the consumer keeps receiving messages.
            Thread.sleep(Long.MAX_VALUE);
        } finally {
            // Runs even when the sleep is interrupted, so the consumer is always released.
            normal.shutdown();
        }
    }