rocketmq从节点没变化

两台电脑,没有虚拟机情况下各起两个broker,发100条消息,两个主节点都能接收,有两个问题:
1,明明一共100条,两个主节点消费总数是对的100条,但总生产数却是105这样多一点得数
2,两个从节点一直是0/0,该怎么调整,正常情况生产数不应该和主节点相同吗
〔系统是windows〕
(不要chatgpt ,我就是搜过解决不了才来问的,来个实际上真会的帮我解答一下,gpt选手不要回答了)

这是从节点配置文件

#所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
    brokerName=bro-bs
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=172.18.83.146:9876;172.18.83.160:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口,
    listenPort=10951
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s
    #commitLog 存储路径
    storePathCommitLog=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s/consumequeue
    #消息索引存储路径
    storePathIndex=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s/index
    #checkpoint 文件存储路径
    storeCheckpoint=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/checkpoint
    #abort 文件存储路径
    abortFile=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNu

该回答引用ChatGPT
问题1:生产数多出5条的原因可能是消息发送时出现了重试,导致消息被发送了多次。可以在消息发送时设置重试次数和重试间隔时间,避免重复发送。
问题2:从节点消费数为0的原因可能是从节点没有正确配置,或者从节点所在的机器无法连接到主节点。可以检查从节点的配置文件是否正确,以及网络连接是否正常。
以下是可能的解决方案:
1. 检查消息发送代码,确保没有重复发送消息的情况。
2. 检查从节点的配置文件,确保从节点的配置正确,包括主节点的地址和端口等信息。
3. 检查网络连接,确保从节点所在的机器可以连接到主节点。
4. 检查RocketMQ的日志,查看是否有相关的错误信息,以便更好地定位问题。
以下是示例代码,用于设置消息发送的重试次数和重试间隔时间:

// 创建消息发送者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置重试次数
producer.setRetryTimesWhenSendFailed(3);
// 设置重试间隔时间
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.start();
// 创建消息对象
Message message = new Message("topic", "tag", "key", "body".getBytes());
// 发送消息
SendResult result = producer.send(message);

在上述代码中,`setRetryTimesWhenSendFailed`方法用于设置消息发送失败时的重试次数,`setRetryAnotherBrokerWhenNotStoreOK`方法用于设置是否在发送失败时尝试连接其他Broker节点。这样可以避免因为某个Broker节点宕机导致消息发送失败。不知道你这个问题是否已经解决, 如果还没有解决的话:

如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^

以下答案由GPT-3.5大模型与博主波罗歌共同编写:
问题一:明明一共100条,两个主节点消费总数是对的100条,但总生产数却是多出了5条,可能的原因有:

1.可能是重复发送了5条信息,可以检查发送端的代码,看是否有重复发送的情况。
2.可能是消息生产端发送消息时,启用了事务机制,但事务未提交,导致消息未被成功发送,但是计数器已经加1。

问题二:从节点消费量一直为0,可能的原因有:

1.可能是从节点的配置文件有误,请检查配置文件是否正确。
2.可能是从节点和主节点的网络连接出现了问题,导致从节点无法消费数据。
3.从节点的消费者组名称和主节点不一致,导致从节点无法消费到消息。请检查消费者组名称是否相同。
4.从节点上的消费者未正确启动或者实现有误,导致无法消费消息。请检查消费者的代码实现是否正确。

以下是一个简单的消息发送和消费示例,使用Java实现:

消息发送端代码:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        // 实例化生产者组
        DefaultMQProducer producer = new DefaultMQProducer("example_group");
        // 设置NameServer地址-可多个,以";"分隔
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        try {
            // 创建消息
            Message msg = new Message("TopicTest", // Topic
                    "TagA", // Tag
                    ("Hello RocketMQ ").getBytes("UTF-8") // Message body
            );
            // 发送消息并保持发送结果
            producer.send(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 关闭生产者
        producer.shutdown();
    }
}

消息消费端代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 实例化消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
        // 设置NameServer地址-可多个,以";"分隔
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("TopicTest", "*");
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // 处理消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回消费状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.printf("consumer has started.%n");
    }
}

如果我的回答解决了您的问题,请采纳!