两台电脑,没有虚拟机情况下各起两个broker,发100条消息,两个主节点都能接收,有两个问题:
1,明明一共100条,两个主节点消费总数是对的100条,但总生产数却是105这样多一点得数
2,两个从节点一直是0/0,该怎么调整,正常情况生产数不应该和主节点相同吗
〔系统是windows〕
(不要chatgpt ,我就是搜过解决不了才来问的,来个实际上真会的帮我解答一下,gpt选手不要回答了)
这是从节点配置文件
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=bro-bs
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=172.18.83.146:9876;172.18.83.160:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10951
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s
#commitLog 存储路径
storePathCommitLog=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s/consumequeue
#消息索引存储路径
storePathIndex=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/broker-b-s/index
#checkpoint 文件存储路径
storeCheckpoint=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/checkpoint
#abort 文件存储路径
abortFile=E:/soft/java/RocketMQ/many/rocketmq-4.9.3/store/slave2/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNu
该回答引用ChatGPT
// 创建消息发送者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置重试次数
producer.setRetryTimesWhenSendFailed(3);
// 设置重试间隔时间
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.start();
// 创建消息对象
Message message = new Message("topic", "tag", "key", "body".getBytes());
// 发送消息
SendResult result = producer.send(message);
还是在当前目录,执行“start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true”。启动broker可能会出现以下三个问题:
以下答案由GPT-3.5大模型与博主波罗歌共同编写:
问题一:明明一共100条,两个主节点消费总数是对的100条,但总生产数却是多出了5条,可能的原因有:
1.可能是重复发送了5条信息,可以检查发送端的代码,看是否有重复发送的情况。
2.可能是消息生产端发送消息时,启用了事务机制,但事务未提交,导致消息未被成功发送,但是计数器已经加1。
问题二:从节点消费量一直为0,可能的原因有:
1.可能是从节点的配置文件有误,请检查配置文件是否正确。
2.可能是从节点和主节点的网络连接出现了问题,导致从节点无法消费数据。
3.从节点的消费者组名称和主节点不一致,导致从节点无法消费到消息。请检查消费者组名称是否相同。
4.从节点上的消费者未正确启动或者实现有误,导致无法消费消息。请检查消费者的代码实现是否正确。
以下是一个简单的消息发送和消费示例,使用Java实现:
消息发送端代码:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws MQClientException {
// 实例化生产者组
DefaultMQProducer producer = new DefaultMQProducer("example_group");
// 设置NameServer地址-可多个,以";"分隔
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息
Message msg = new Message("TopicTest", // Topic
"TagA", // Tag
("Hello RocketMQ ").getBytes("UTF-8") // Message body
);
// 发送消息并保持发送结果
producer.send(msg);
} catch (Exception e) {
e.printStackTrace();
}
// 关闭生产者
producer.shutdown();
}
}
消息消费端代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 实例化消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
// 设置NameServer地址-可多个,以";"分隔
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题和标签
consumer.subscribe("TopicTest", "*");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 处理消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.printf("consumer has started.%n");
}
}
如果我的回答解决了您的问题,请采纳!