RabbitMQ Topics (Topic Mode)

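Topic mode is similar to routing mode.

Routing mode matches the routing key exactly, whereas topic mode matches it against binding keys that may contain wildcards: "*" stands for exactly one word and "#" for zero or more words, with words separated by dots.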

graph LR;
p([Producer]);
ex([Exchange]);
mq1([amq.gen-fMFRcKxaTxM-o_ApPe_AHw])
mq2([amq.gen-jWFR9bCh4_b52j6KUDt1Sw])
mq3([amq.gen-kkOjkWx9if2mQB_3gcfO4w])
mq4([amq.gen-tNqCT75w_QqSJbVKrJapQQ])
c1([C1])
c2([C2])
c3([C3])
c4([C4])

style p fill:#0ff,stroke:#333;
style ex fill:#33c,stroke:#333;
style mq1 fill:#f00,stroke:#333;
style mq2 fill:#f00,stroke:#333;
style mq3 fill:#f00,stroke:#333;
style mq4 fill:#f00,stroke:#333;
style c1 fill:#3cf,stroke:#333;
style c2 fill:#3cf,stroke:#333;
style c3 fill:#3cf,stroke:#333;
style c4 fill:#3cf,stroke:#333;

p-->ex;

ex-->|#|mq1;
mq1-->c1;

ex-->|kern.*|mq2;
mq2-->c2;

ex-->|*.critical|mq3;
mq3-->c3;

ex-->|kern.*|mq4;
ex-->|*.critical|mq4;
mq4-->c4;
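To make the wildcard rules concrete, here is a minimal sketch of topic matching in plain C#, checked word by word. It is not RabbitMQ's own implementation. The class name `TopicMatcher` and its methods are hypothetical and exist only for illustration, and treating an empty key as zero words is an assumption.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true when routingKey satisfies bindingKey under topic rules:
    // "*" stands for exactly one word, "#" for zero or more words.
    public static bool Matches(string bindingKey, string routingKey)
    {
        return Match(Split(bindingKey), 0, Split(routingKey), 0);
    }

    // Assumption: an empty key contains zero words.
    private static string[] Split(string key)
    {
        return key.Length == 0 ? new string[0] : key.Split('.');
    }

    // p = binding key words, k = routing key words; i and j are the current positions.
    private static bool Match(string[] p, int i, string[] k, int j)
    {
        if (i == p.Length)
        {
            // Binding key exhausted: succeed only if the routing key is too.
            return j == k.Length;
        }

        if (p[i] == "#")
        {
            // "#" either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return Match(p, i + 1, k, j) || (j < k.Length && Match(p, i, k, j + 1));
        }

        if (j == k.Length)
        {
            // A literal word or "*" needs a routing key word to consume.
            return false;
        }

        return (p[i] == "*" || p[i] == k[j]) && Match(p, i + 1, k, j + 1);
    }
}
```

With the binding keys from the diagram above, `TopicMatcher.Matches("kern.*", "kern.critic")` is true, while `TopicMatcher.Matches("*.critical", "kern.critic")` is false because "critic" and "critical" are different words. Routing mode, by contrast, would amount to a plain `bindingKey == routingKey` comparison.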

Producer code:

using RabbitMQ.Client;
using System;
using System.Text;

namespace TopicsMQConsumer
{
    class EmitLogTopic
    {
        static string ExchangeName = "topic_logs";

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                VirtualHost = "frexport"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare the exchange and its type; durable defaults to false when omitted
                channel.ExchangeDeclare(exchange: ExchangeName,
                                        type: ExchangeType.Topic,
                                        durable: false);

                // Routing key: first argument, or a default
                var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";

                // Message: second argument, or a default
                var message = (args.Length > 1) ? args[1] : "HelloWorld!";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: ExchangeName,
                                     routingKey: routingKey,
                                     basicProperties: null,
                                     body: body);

                Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

Consumer code:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Linq;
using System.Text;

namespace TopicsMQConsumer
{
    class ReceiveLogsTopic
    {
        static string ExchangeName = "topic_logs";

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                VirtualHost = "frexport"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare the exchange and its type; durable defaults to false when omitted
                channel.ExchangeDeclare(exchange: ExchangeName,
                                        type: ExchangeType.Topic,
                                        durable: false);

                // Server-named queue; the broker generates a name such as amq.gen-...
                var queueName = channel.QueueDeclare().QueueName;

                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [binding_key...]",
                                            Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }

                // Bind the queue to the exchange once per binding key
                foreach (var bindingKey in args)
                {
                    channel.QueueBind(queue: queueName,
                                      exchange: ExchangeName,
                                      routingKey: bindingKey);
                }

                Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                                      routingKey,
                                      message);
                };

                // autoAck was named noAck in older client versions;
                // ea.Body.ToArray() above requires a client where it is autoAck
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

Configuring RabbitMQ

Add the exchange (topic_logs, type topic, in the frexport virtual host)

Testing:

Consumer 1 binds with "#" and receives every message:

$ ./bin/Debug/TopicsMQConsumer.exe "#"

Consumer 2 binds with "kern.*" only:

$ ./bin/Debug/TopicsMQConsumer.exe "kern.*"

Consumer 3 binds with "*.critical" only:

$ ./bin/Debug/TopicsMQConsumer.exe "*.critical"

Consumer 4 binds with both "kern.*" and "*.critical":

$ ./bin/Debug/TopicsMQConsumer.exe "kern.*" "*.critical"

The producer sends messages:

$ ./bin/Debug/TopicsMQProduct.exe kern.critic wwww
$ ./bin/Debug/TopicsMQProduct.exe kern.1 wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww.critic wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww.criticical wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww.critical wwww

Output:
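The deliveries to expect from this test can be worked out from the binding keys alone. The sketch below does that offline, without a broker, for the four consumers and six routing keys used above. The class `TopicTestRun` and its members are hypothetical names. The matching is deliberately simplified (an assumption, not RabbitMQ's algorithm): "#" is only handled when it is the entire binding key, and "*" is translated to one non-empty, dot-free word, which is all this test needs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class TopicTestRun
{
    // Binding keys per consumer, as started in the test above.
    public static readonly Dictionary<string, string[]> Bindings =
        new Dictionary<string, string[]>
        {
            { "C1", new[] { "#" } },
            { "C2", new[] { "kern.*" } },
            { "C3", new[] { "*.critical" } },
            { "C4", new[] { "kern.*", "*.critical" } },
        };

    // Simplified matching: "#" only as a whole key, "*" as one dot-free word.
    public static bool Matches(string bindingKey, string routingKey)
    {
        if (bindingKey == "#")
        {
            return true;
        }

        // Escape the key, then turn each escaped "*" into "one word without dots".
        var pattern = "^" + Regex.Escape(bindingKey).Replace("\\*", "[^.]+") + "$";
        return Regex.IsMatch(routingKey, pattern);
    }

    // Names of the consumers whose queue has at least one matching binding.
    public static string[] Receivers(string routingKey)
    {
        return Bindings
            .Where(b => b.Value.Any(k => Matches(k, routingKey)))
            .Select(b => b.Key)
            .OrderBy(name => name)
            .ToArray();
    }

    // Prints one line per routing key sent by the producer in the test.
    public static void PrintTable()
    {
        var keys = new[]
        {
            "kern.critic", "kern.1", "kwww",
            "kwww.critic", "kwww.criticical", "kwww.critical"
        };

        foreach (var key in keys)
        {
            Console.WriteLine("{0,-16} -> {1}", key, string.Join(", ", Receivers(key)));
        }
    }
}
```

Calling `TopicTestRun.PrintTable()` from a `Main` method lists C1, C2 and C4 for kern.critic and kern.1, only C1 for kwww, kwww.critic and kwww.criticical, and C1, C3 and C4 for kwww.critical.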
