工作队列
Work Queues 工作队列 工作队列背后的假设是,每个任务都恰好交付给一个工人
一个生成者对应多个消费者
graph LR;
id1([Product]);
id2([Message Quene])
id3([Consumer1]);
id4([Consumer2]);
style id1 fill:#0ff,stroke:#333;
style id2 fill:#f00,stroke:#333;
style id3 fill:#3cf,stroke:#333;
style id4 fill:#3cf,stroke:#333;
id1-->id2;
id2-->id3;
id2-->id4;
将比较复杂比较耗时的任务放在任务队列中,不必立即执行。
任务队列用来管理任务列表,我们在后台的工作可以交给多个线程来完成。
准备工作 创建两个工程一个作为生产者,一个作为消费者
这个时候的消费者,不能立即处理完一个事情,需要消耗一定时间
我们同时开启多个消费者消费任务。
生产者不停的生产新的任务
以下是代码
生产者代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 using RabbitMQ.Client; using System; using System.Text; using System.Threading; namespace WorkMQProduct { class NewTask { static string QueueName = "task_queue"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "frexport", UserName = "guest", Password = "guest" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); for (int i = 0; i < 40; i++) { var message = "Task" + i; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); Thread.Sleep(100); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
消费者代码 同一队列可以有多个消费者同时消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace WorkMQConsumer { class Worker { static string QueueName = "task_queue"; static void Main(string[] args) { var factory = new ConnectionFactory() { }; factory.HostName = "localhost"; factory.VirtualHost = "frexport"; factory.UserName = "guest"; factory.Password = "guest"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); //同一时刻服务器只发送一条消息给消费端 channel.BasicQos(prefetchCount: 1, prefetchSize: 0, global: false); Console.WriteLine(" [*] Waiting for message."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { var body = ea.Body; var message = System.Text.Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Receive {0} {1}", message, DateTime.Now); Thread.Sleep(1000); //消息消费完给服务器返回确认状态,表示该消息已被消费 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: QueueName, noAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
当然需要添加Queue
消息消费的两种模式 1. 自动模式 消费者从消息队列获取消息后,服务端就认为该消息已经成功消费。
1 2 3 4 5 6 7 8 9 10 11 12 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); //无需反馈 }; channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
2. 手动模式 消费者从消息队列获取消息后,服务端并没有标记为成功消费 消费者成功消费后需要将状态返回到服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { var body = ea.Body; var message = System.Text.Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Receive {0} {1}", message, DateTime.Now); Thread.Sleep(1000); //消息消费完给服务器返回确认状态,表示该消息已被消费 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: QueueName, noAck: false, consumer: consumer);
RabbitMQ Model介绍