建立连接时,使用using 每次连接完毕即释放,这样导致了一个问题,除非是进程正在监听中,否则生产出来的消息,消费者接收不到
如果不使用using,保持channel活跃,这样可以将消息队列里面的消息挨个取出来,但是无法使用消息的手动确认
如果在连接服务器处也不使用using,保持连接活跃,这样会保持监听,但是会将所有的消息都取出来
以下代码为片段
//using (var connection = factory.CreateConnection())//连接服务器,即正在创建终结点。
//using (var channel = connection.CreateModel())
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: config.QUEENNAME, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: config.EXCHANGENAME, type: "topic", durable: true);
channel.QueueBind(queue: config.QUEENNAME, exchange: config.EXCHANGENAME, routingKey: config.ROUNTINGKEY);
var consumer = new EventingBasicConsumer(channel);//消费者
Utility.WriteLog("连接至消费者");
//确认消息,noAck: false,手动确认
//channel.BasicConsume(queue: config.QUEENNAME, noAck: false, consumer: consumer);
Utility.WriteLog("next1");
//while (true)
//{
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Utility.WriteLog("W-messagr" + message);
//消息确认
try
{
Utility.WriteLog("W-BasicAck" + System.DateTime.Now.ToString());
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Utility.WriteLog("W-BasicAck-end" + System.DateTime.Now.ToString());
}
catch(Exception ex)
{
Utility.WriteLog("确认失败" + ex);
}
//channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Utility.WriteLog("返回ACK");
};
//}
//}
Utility.WriteLog("next2");
channel.BasicConsume(queue: config.QUEENNAME, noAck: false, consumer: consumer);
不需要using释放掉连接,消费端本来就是需要保持长久连接的。消息是可以手动确认的,每处理一条就手动确认一条,确认后队列就会移除该条消息。
看我的代码示例
//队列消费者
public class ReceiveMq
{
private ConnectionFactory factory;
public ReceiveMq() {
factory = new ConnectionFactory() { HostName = "localhost" };
}
public void receive(Action<string> action) {
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: "helloaa",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) => {
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
action(message);
channel.BasicAck(e.DeliveryTag, false); //确认消息
};
//channel.BasicConsume(queue: "helloaa", autoAck: true, consumer: consumer); //自动确认消息
}
}
//调用
ReceiveMq receiveMq = new ReceiveMq();
receiveMq.receive(msg => {
Console.WriteLine(" [x] Received {0}", msg);
});
有帮助麻烦点个采纳【本回答右上角】,谢谢~~有其他问题可以继续交流~