发布/订阅模式
Publish/Subscribe 发布/订阅模式 一个生成者对应多个消费者
之前我们创建了一个工作队列。工作队列背后的假设是,每个任务都恰好交付给一个工人。 在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为“发布/订阅”。
为了说明这种模式,我们将构建一个简单的日志记录系统。它包含两个程序-第一个程序将发出日志消息,第二个程序将接收并打印它们。
graph LR;
id1([Product]);
id2([ExChange]);
id3([amq.gen-2G4YaJ2P3JcJEwHHiRL5JA])
id4([amq.gen-tsfVrHogVGKF3vGv6-rPWg])
id5([C1])
id6([C2])
style id1 fill:#0ff,stroke:#333;
style id2 fill:#33c,stroke:#333;
style id3 fill:#f00,stroke:#333;
style id4 fill:#f00,stroke:#333;
style id5 fill:#3cf,stroke:#333;
style id6 fill:#3cf,stroke:#333;
id1-->id2;
id2-->id3-->id5;
id2-->id4-->id6;
生产者:将消息发送到 交换机/队列
消费者:只能从队列中获取消息
队列可以绑定交换机
如果消息发送到没有队列绑定的交换机上,那么消息将丢失
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 using RabbitMQ.Client; using System; using System.Text; namespace SubscribeMQProduct { /// <summary> /// 发布订阅模式生产者 /// </summary> class EmitLog { static string EXCHANGE_NAME = "logs"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "frexport" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种 channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Fanout, durable: true); for (int i = 0; i < 10; i++) { var message = "Message-" + i; var body = Encoding.UTF8.GetBytes(message); //发送消息 channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Linq; using System.Text; namespace SubscribeMQConsumer { /// <summary> /// 发布订阅模式消费者 /// </summary> class ReceiveLogs { static string EXCHANGE_NAME = "logs"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "frexport" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //声明 交换机 channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Fanout, durable:true); var queueName = channel.QueueDeclare().QueueName; //队列和交换机绑定 channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
配置RabbitMQ 添加交换机
测试
还可以看RabbitMQ 自动生成了两个队列绑定路由
总结: 这个模式下,消息会被交换机转发给每个订阅者,每个订阅消费者都会在MQ端有一个Queue队列。 生产者的消息会转到所有绑定交换机的队列上,消费者消费所有队列消息
RabbitMQ Model介绍