0%

RabbitMQ 发布/订阅模式

发布/订阅模式

Publish/Subscribe 发布/订阅模式

一个生成者对应多个消费者

之前我们创建了一个工作队列。工作队列背后的假设是,每个任务都恰好交付给一个工人。
在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为“发布/订阅”。

为了说明这种模式,我们将构建一个简单的日志记录系统。它包含两个程序-第一个程序将发出日志消息,第二个程序将接收并打印它们。

graph LR;
    id1([Product]);
    id2([ExChange]);
    id3([amq.gen-2G4YaJ2P3JcJEwHHiRL5JA])
    id4([amq.gen-tsfVrHogVGKF3vGv6-rPWg])
    id5([C1])
    id6([C2])

    style id1 fill:#0ff,stroke:#333;
    style id2 fill:#33c,stroke:#333;
    style id3 fill:#f00,stroke:#333;
    style id4 fill:#f00,stroke:#333;
    style id5 fill:#3cf,stroke:#333;
    style id6 fill:#3cf,stroke:#333;

    id1-->id2;

    id2-->id3-->id5;
    id2-->id4-->id6;
  • 生产者:将消息发送到 交换机/队列
  • 消费者:只能从队列中获取消息
  • 队列可以绑定交换机

如果消息发送到没有队列绑定的交换机上,那么消息将丢失

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
using RabbitMQ.Client;
using System;
using System.Text;

namespace SubscribeMQProduct
{
/// <summary>
/// 发布订阅模式生产者
/// </summary>

class EmitLog
{
static string EXCHANGE_NAME = "logs";

static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "frexport" };

using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种
channel.ExchangeDeclare(exchange: EXCHANGE_NAME,
type: ExchangeType.Fanout,
durable: true);
for (int i = 0; i < 10; i++)
{
var message = "Message-" + i;
var body = Encoding.UTF8.GetBytes(message);

//发送消息
channel.BasicPublish(exchange: EXCHANGE_NAME,
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Linq;
using System.Text;

namespace SubscribeMQConsumer
{
/// <summary>
/// 发布订阅模式消费者
/// </summary>
class ReceiveLogs
{
static string EXCHANGE_NAME = "logs";
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "frexport" };

using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//声明 交换机
channel.ExchangeDeclare(exchange: EXCHANGE_NAME,
type: ExchangeType.Fanout,
durable:true);

var queueName = channel.QueueDeclare().QueueName;

//队列和交换机绑定
channel.QueueBind(queue: queueName,
exchange: EXCHANGE_NAME,
routingKey: "");

Console.WriteLine(" [*] Waiting for logs.");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};

channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);


Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}

配置RabbitMQ

添加交换机

测试

还可以看RabbitMQ 自动生成了两个队列绑定路由

总结:

这个模式下,消息会被交换机转发给每个订阅者,每个订阅消费者都会在MQ端有一个Queue队列。
生产者的消息会转到所有绑定交换机的队列上,消费者消费所有队列消息

RabbitMQ Model介绍

欢迎关注我的其它发布渠道