RocketMq创建主题
通过命令行创建
使用docker命令部署的进入容器内,进入命令如下
docker exec -it [容器id或容器名称] /bin/bash
无论是否在容器内进入到 rocketmq5.3.0/bin
目录下,执行如下命令
./mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
当显示如下显示时表示主题创建成功,然后可以到控制台查看创建的主题
create topic to xx.xx.xx.xx:10911 success.
TopicConfig [topicName=TestTopic2, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
通集成代码创建topic
普通Maven项目java代码创建
引入maven依赖 ,写这篇文章时最新版的是5.3.0
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
测试代码如下
public class TopicTest {
// 设置成nameser的地址端口。
String endpoint="192.168.183.129:30885";
@Test
public void test() throws UnknownHostException, MQClientException {
// 配置客户端连接的配置
ClientConfig clientConfig = new ClientConfig();
clientConfig.setNamesrvAddr(endpoint);
clientConfig.setAccessChannel(AccessChannel.LOCAL);
clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
clientConfig.setInstanceName("CUSTOMER");
MQClientInstance clientInstance = new MQClientInstance(clientConfig, 0, clientConfig.buildMQClientId());
//客户端实例的开始方法会调用与服务端通信监听,包括路由地址的获取,主题的更新,消息的拉取,将生产者的信息注册到服务端等等
clientInstance.start();
// 简单配置topic就使用 MQAdminImpl
MQAdminImpl mqAdminImpl = clientInstance.getMQAdminImpl();
// 想更详细的定义topic 就使用 MQClientAPIImpl.createTopic() MQAdminImpl.createTopic()还是调用的MQClientAPIImpl方法
// MQClientAPIImpl mqClientAPIImpl = clientInstance.getMQClientAPIImpl();
// 介绍一下这三个参数 第一个是rocketmq 已经存在的主题 ,第二个是 新建主题的名称 ,第三个是主题下建的队列的数量
// 第一个为什么要配置成已有的主题是根据主题获取nameservice下的所有节点信息,在所有节点中都创建该主题
mqAdminImpl.createTopic("DefaultCluster","java",8);
clientInstance.shutdown();
}
}
注:创建客户端实例(MQClientInstance )时同一个进程内可以创建多个,但是每个实例的InstanceName必须是唯一的,建议我们使用完成之后最好使用shutdown关闭一下,不然后面我们创建produce(生产者时)如果使用相同的ClientConfig 会报重复异常
通过springboot集成RocketMQ依赖创建
在yaml中配置nameserver的地址信息
rocketmq:
name-server: '192.168.183.129:30885'
access-channel: LOCAL
将config和instance注册为bean
@Component
@EnableConfigurationProperties({RocketMQProperties.class})
public class RocketMQConfig {
@Bean
@ConditionalOnMissingBean({ClientConfig.class})
@ConditionalOnProperty(
prefix = "rocketmq",
value = {"name-server"}
)
public ClientConfig clientConfig(RocketMQProperties rocketMQProperties) {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setNamesrvAddr(rocketMQProperties.getNameServer());
clientConfig.setInstanceName("INIT");
clientConfig.setAccessChannel(AccessChannel.valueOf(rocketMQProperties.getAccessChannel()));
return clientConfig;
}
@Bean
@ConditionalOnBean({ClientConfig.class})
@ConditionalOnMissingBean({MQClientInstance.class})
public MQClientInstance mqClientInstance(ClientConfig clientConfig) {
return new MQClientInstance(clientConfig, 0, clientConfig.buildMQClientId());
}
}
创建对应的主题
@SpringBootApplication
public class RocketMqTestApplication {
public static void main(String[] args) throws MQClientException {
ConfigurableApplicationContext context = SpringApplication.run(RocketMqTestApplication.class, args);
MQClientInstance instance = context.getBean(MQClientInstance.class);
instance.start();
instance.getMQAdminImpl().createTopic("DefaultCluster","test",8);
instance.shutdown();
}
}
在配置开启自动创建
在broker.conf中增加下面的配置项
autoCreateTopicEnable = trueautoCreateTopicEnable = true
这样我们在推送消息到服务端时,没有主题会自动创建主题。不过这个配置最好是在开发端开启线上关闭。这样设置的原因网上大部分说的 自动创建主题那么有可能该主题的消息都只会发往一台 Broker,起不到负载均衡的作用。
详细的介绍推荐看这篇博客:【RocketMQ进阶二】深度解析RocketMQ 主题的创建机制,为何生产建议关掉自动创建Topic
控制面板中队列参数介绍
BROKER_NAME:当前主题在那个broker(实例机器)中部署。
topicName:当前主题的名称
wirteQueueNums:写队列的数量。写队列是会真实创建存储文件的,负责数据的写入。
readQueueNums:读队列的数量。读队列会记录consumer消费的offset,负责消息的读取。
perm:perm
参数是设置队列的读写权限。
6:同时开启读写
4:禁写
2:禁读
一般情况下,一个写队列会对应一个读队列,因为在往写队列里写Message时,会同步写入到一个对应的读队列中。
如果写队列比读队列多时:因为读写队列是一一对应的,如果写比多时,那么就会导致一些写队列中的一些无法被消费掉,造成消息积压和丢失。
读队列比写队列多时:这样就会导致读队列没有消息写入,如果一些消费者分配到了这些没有消息的读队列,就会导致一些consumer的空转,造成性能的浪费