【RocketMQ 高可用】- 主从同步(一主一从测试)
【RocketMQ 高可用】- 主从同步的前置(自己实现的简单通信流程)
本文章基于 RocketMQ 4.9.3
1. 前言
前置文章:
2. 搭建主从集群
因为下面要解析源码,所以这里直接在本地搭建,首先就是 broker-master 的配置文件。
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# 删除文件的时间点,默认为凌晨4点执行文件清除操作
deleteWhen = 04
# 文件保留时间,当前时间距离上一次超过了 48 小时就认为这个文件过期了
fileReservedTime = 48
# broker的角色
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# nameserver的地址
namesrvAddr=127.0.0.1:9876
# brokerIp
brokerIP1=127.0.0.1
brokerIP2=127.0.0.1:10912
# 消息存储根路径
storePathRootDir=D:\\javaCode\\rocketmq-source\\config\\store
# commitLog文件的存储路径
storePathCommitLog=D:\\javaCode\\rocketmq-source\\config\\store\\commitlog
# consume queue文件的存储路径
storePathConsumeQueue=D:\\javaCode\\rocketmq-source\\config\\store\\consumequeue
# 消息索引文件的存储路径
storePathIndex=D:\\javaCode\\rocketmq-source\\config\\store\\index
# checkpoint文件的存储路径
storeCheckpoint=D:\\javaCode\\rocketmq-source\\config\\store\\checkpoint
# abort文件的存储路径
abortFile=D:\\javaCode\\rocketmq-source\\config\\store\\abort
里面还有一个配置是 listenPort,这个就是用来标识当前 broker 监听的端口,因为是本地启动两个进程,所以第二个从节点的文件需要配置下这个项,下面是从节点的配置。
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = SYNC_FLUSH
namesrvAddr=127.0.0.1:9876
brokerIP1=127.0.0.1
storePathRootDir=D:\\javaCode\\rocketmq-source\\config\\slave\\store
storePathCommitLog=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\commitlog
storePathConsumeQueue=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\consumequeue
storePathIndex=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\index
storeCheckpoint=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\checkpoint
abortFile=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\abort
listenPort = 10921
slave 节点监听 10921 端口,所以全部地址是 brokerIP1 + listenPort = 127.0.0.1:10921
,同时 slave 的 brokerId
设置为 1 表示从节点,RocketMQ 中使用 1 和 0 来区分主从节点,同时 brokerRole
也设置成了 SLAVE
表示从节点。
这些配置里面还有一个关键点就是 brokerIP:
- brokerIP1 是网卡的 InetAddress,也就是当前 broker 监听的 IP。
- brokerIP2 跟 brokerIP1 一样,但是存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步,所以我们这里就配置下主节点的 brokerIP2,让从节点和主节点进行同步。
下面启动 namesrv,启动如下图所示,注意在启动的时候设置环境变量:ROCKETMQ_HOME=D:\javaCode\rocketmq-source\config
。
下面先启动主节点,也就是 BrokerStartup,也需要设置环境变量 ROCKETMQ_HOME=D:\javaCode\rocketmq-source\config
,同时需要设置启动参数 -c D:\javaCode\rocketmq-source\config\conf\broker.conf
。
这里输出的 null 是主节点地址,如果是 slave 这里就会输出,master 会输出 null,算是我自己调试的时候加的,下面启动 slave 节点,环境变量是 ROCKETMQ_HOME=D:\javaCode\rocketmq-source\config
,设置的配置文件是 -c D:\javaCode\rocketmq-source\config\conf\slave\broker.conf
。
上面输出的 192.168.1.101
就是主节点的地址,端口是 10912
。
下面我们就使用 Producer 往主节点发送 10 条消息,RocketMQ 中消息写入都是写入主节点的,从节点负责数据同步和消息读取,不过由于我们没有使用 Dledger 高可用,现在单纯是搭建一主一从的框架,这种情况下主节点挂了从节点是不会成为主节点的。
上面就是控制台的展示了,可以看到确实是搭建了一主一从的集群。下面我们就用生产者往主节点发送消息,生产者指定了 Namesrv 地址之后发送的消息要发送到哪个节点是 namesrv 决定的,不过这里就是往主节点去写入。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicMasterSlave" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
这里就是消息发送成功了,下面来读取消息,至于主节点肯定是能读取到的,所以我们直接把主节点给停掉,看看能不能从从节点读取到消息。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicMasterSlave", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
可以看到从节点确实消费到了消息,如下图所示。
当主节点关闭之后继续发送消息也是会报错,如下所示:
3. 小结
这篇文章只是从本地搭建了一个集群,监听不同端口来模拟集群,实际上真正搭建集群还是得弄几台机子来搭建,只是内存不够开个虚拟机能给我卡死,所以也就没弄了,下一篇文章就正式来说下主从同步的逻辑。
如有错误,欢迎指出!!!
更多推荐
所有评论(0)