本文章基于 RocketMQ 4.9.3

1. 前言

前置文章:


2. 搭建主从集群

因为下面要解析源码,所以这里直接在本地搭建,首先就是 broker-master 的配置文件。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# 删除文件的时间点,默认为凌晨4点执行文件清除操作
deleteWhen = 04
# 文件保留时间,当前时间距离上一次超过了 48 小时就认为这个文件过期了
fileReservedTime = 48
# broker的角色
brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

# nameserver的地址
namesrvAddr=127.0.0.1:9876
# brokerIp
brokerIP1=127.0.0.1
brokerIP2=127.0.0.1:10912
# 消息存储根路径
storePathRootDir=D:\\javaCode\\rocketmq-source\\config\\store
# commitLog文件的存储路径
storePathCommitLog=D:\\javaCode\\rocketmq-source\\config\\store\\commitlog
# consume queue文件的存储路径
storePathConsumeQueue=D:\\javaCode\\rocketmq-source\\config\\store\\consumequeue
# 消息索引文件的存储路径
storePathIndex=D:\\javaCode\\rocketmq-source\\config\\store\\index
# checkpoint文件的存储路径
storeCheckpoint=D:\\javaCode\\rocketmq-source\\config\\store\\checkpoint
# abort文件的存储路径
abortFile=D:\\javaCode\\rocketmq-source\\config\\store\\abort

里面还有一个配置是 listenPort,这个就是用来标识当前 broker 监听的端口,因为是本地启动两个进程,所以第二个从节点的文件需要配置下这个项,下面是从节点的配置。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04

fileReservedTime = 48

brokerRole = SLAVE

flushDiskType = SYNC_FLUSH

namesrvAddr=127.0.0.1:9876

brokerIP1=127.0.0.1

storePathRootDir=D:\\javaCode\\rocketmq-source\\config\\slave\\store

storePathCommitLog=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\commitlog

storePathConsumeQueue=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\consumequeue

storePathIndex=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\index

storeCheckpoint=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\checkpoint

abortFile=D:\\javaCode\\rocketmq-source\\config\\store\\slave\\abort

listenPort = 10921

slave 节点监听 10921 端口,所以全部地址是 brokerIP1 + listenPort = 127.0.0.1:10921,同时 slave 的 brokerId 设置为 1 表示从节点,RocketMQ 中使用 1 和 0 来区分主从节点,同时 brokerRole 也设置成了 SLAVE 表示从节点。

这些配置里面还有一个关键点就是 brokerIP:

  • brokerIP1 是网卡的 InetAddress,也就是当前 broker 监听的 IP。
  • brokerIP2 跟 brokerIP1 一样,但是存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步,所以我们这里就配置下主节点的 brokerIP2,让从节点和主节点进行同步。

下面启动 namesrv,启动如下图所示,注意在启动的时候设置环境变量:ROCKETMQ_HOME=D:\javaCode\rocketmq-source\config
在这里插入图片描述
在这里插入图片描述
下面先启动主节点,也就是 BrokerStartup,也需要设置环境变量 ROCKETMQ_HOME=D:\javaCode\rocketmq-source\config,同时需要设置启动参数 -c D:\javaCode\rocketmq-source\config\conf\broker.conf
在这里插入图片描述
这里输出的 null 是主节点地址,如果是 slave 这里就会输出,master 会输出 null,算是我自己调试的时候加的,下面启动 slave 节点,环境变量是 ROCKETMQ_HOME=D:\javaCode\rocketmq-source\config,设置的配置文件是 -c D:\javaCode\rocketmq-source\config\conf\slave\broker.conf
在这里插入图片描述
上面输出的 192.168.1.101 就是主节点的地址,端口是 10912

下面我们就使用 Producer 往主节点发送 10 条消息,RocketMQ 中消息写入都是写入主节点的,从节点负责数据同步和消息读取,不过由于我们没有使用 Dledger 高可用,现在单纯是搭建一主一从的框架,这种情况下主节点挂了从节点是不会成为主节点的。
在这里插入图片描述
上面就是控制台的展示了,可以看到确实是搭建了一主一从的集群。下面我们就用生产者往主节点发送消息,生产者指定了 Namesrv 地址之后发送的消息要发送到哪个节点是 namesrv 决定的,不过这里就是往主节点去写入。

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("testGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicMasterSlave" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

在这里插入图片描述
在这里插入图片描述

这里就是消息发送成功了,下面来读取消息,至于主节点肯定是能读取到的,所以我们直接把主节点给停掉,看看能不能从从节点读取到消息。
在这里插入图片描述

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicMasterSlave", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

可以看到从节点确实消费到了消息,如下图所示。
在这里插入图片描述在这里插入图片描述
当主节点关闭之后继续发送消息也是会报错,如下所示:
在这里插入图片描述


3. 小结

这篇文章只是从本地搭建了一个集群,监听不同端口来模拟集群,实际上真正搭建集群还是得弄几台机子来搭建,只是内存不够开个虚拟机能给我卡死,所以也就没弄了,下一篇文章就正式来说下主从同步的逻辑。





如有错误,欢迎指出!!!

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐