通过命令行创建

使用docker命令部署的进入容器内,进入命令如下

docker exec -it [容器id或容器名称] /bin/bash 

无论是否在容器内进入到 rocketmq5.3.0/bin 目录下,执行如下命令

./mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster

当显示如下显示时表示主题创建成功,然后可以到控制台查看创建的主题

create topic to xx.xx.xx.xx:10911 success.
TopicConfig [topicName=TestTopic2, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]

通集成代码创建topic

普通Maven项目java代码创建

引入maven依赖 ,写这篇文章时最新版的是5.3.0

<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>5.3.0</version>
   </dependency>

测试代码如下

public class TopicTest {

    // 设置成nameser的地址端口。
    String endpoint="192.168.183.129:30885";

    @Test
    public void test() throws UnknownHostException, MQClientException {
        // 配置客户端连接的配置
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setNamesrvAddr(endpoint);
        clientConfig.setAccessChannel(AccessChannel.LOCAL);
        clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
        clientConfig.setInstanceName("CUSTOMER");
        MQClientInstance clientInstance = new MQClientInstance(clientConfig, 0, clientConfig.buildMQClientId());
        //客户端实例的开始方法会调用与服务端通信监听,包括路由地址的获取,主题的更新,消息的拉取,将生产者的信息注册到服务端等等
        clientInstance.start();
        // 简单配置topic就使用 MQAdminImpl
        MQAdminImpl mqAdminImpl = clientInstance.getMQAdminImpl();
//        想更详细的定义topic 就使用 MQClientAPIImpl.createTopic()  MQAdminImpl.createTopic()还是调用的MQClientAPIImpl方法
//        MQClientAPIImpl mqClientAPIImpl = clientInstance.getMQClientAPIImpl();

        // 介绍一下这三个参数 第一个是rocketmq 已经存在的主题 ,第二个是 新建主题的名称 ,第三个是主题下建的队列的数量
        // 第一个为什么要配置成已有的主题是根据主题获取nameservice下的所有节点信息,在所有节点中都创建该主题
        mqAdminImpl.createTopic("DefaultCluster","java",8);
        clientInstance.shutdown();
    }
}

注:创建客户端实例(MQClientInstance )时同一个进程内可以创建多个,但是每个实例的InstanceName必须是唯一的,建议我们使用完成之后最好使用shutdown关闭一下,不然后面我们创建produce(生产者时)如果使用相同的ClientConfig 会报重复异常

通过springboot集成RocketMQ依赖创建

在yaml中配置nameserver的地址信息

rocketmq:
  name-server: '192.168.183.129:30885'
  access-channel: LOCAL

将config和instance注册为bean

@Component
@EnableConfigurationProperties({RocketMQProperties.class})
public class RocketMQConfig {

    @Bean
    @ConditionalOnMissingBean({ClientConfig.class})
    @ConditionalOnProperty(
            prefix = "rocketmq",
            value = {"name-server"}
    )
    public  ClientConfig clientConfig(RocketMQProperties rocketMQProperties) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setNamesrvAddr(rocketMQProperties.getNameServer());
        clientConfig.setInstanceName("INIT");
        clientConfig.setAccessChannel(AccessChannel.valueOf(rocketMQProperties.getAccessChannel()));
        return clientConfig;
    }
    @Bean
    @ConditionalOnBean({ClientConfig.class})
    @ConditionalOnMissingBean({MQClientInstance.class})
    public MQClientInstance mqClientInstance(ClientConfig clientConfig) {
        return new MQClientInstance(clientConfig, 0, clientConfig.buildMQClientId());
    }

}

创建对应的主题

@SpringBootApplication
public class RocketMqTestApplication {

    public static void main(String[] args) throws MQClientException {
        ConfigurableApplicationContext context = SpringApplication.run(RocketMqTestApplication.class, args);
        MQClientInstance instance = context.getBean(MQClientInstance.class);
        instance.start();
        instance.getMQAdminImpl().createTopic("DefaultCluster","test",8);
        instance.shutdown();
    }

}

在配置开启自动创建

broker.conf中增加下面的配置项

autoCreateTopicEnable = trueautoCreateTopicEnable = true

这样我们在推送消息到服务端时,没有主题会自动创建主题。不过这个配置最好是在开发端开启线上关闭。这样设置的原因网上大部分说的 自动创建主题那么有可能该主题的消息都只会发往一台 Broker,起不到负载均衡的作用

详细的介绍推荐看这篇博客:【RocketMQ进阶二】深度解析RocketMQ 主题的创建机制,为何生产建议关掉自动创建Topic

控制面板中队列参数介绍

BROKER_NAME:当前主题在那个broker(实例机器)中部署。

topicName:当前主题的名称

wirteQueueNums:写队列的数量。写队列是会真实创建存储文件的,负责数据的写入。

readQueueNums:读队列的数量。读队列会记录consumer消费的offset,负责消息的读取。

perm:perm 参数是设置队列的读写权限。

6:同时开启读写

4:禁写

2:禁读

一般情况下,一个写队列会对应一个读队列,因为在往写队列里写Message时,会同步写入到一个对应的读队列中。

  • 如果写队列比读队列多时:因为读写队列是一一对应的,如果写比多时,那么就会导致一些写队列中的一些无法被消费掉,造成消息积压和丢失。

  • 读队列比写队列多时:这样就会导致读队列没有消息写入,如果一些消费者分配到了这些没有消息的读队列,就会导致一些consumer的空转,造成性能的浪费