主题模式
Topics 主题模式
主题模式类似 路由模式
路由模式是 完全匹配 模式,主题模式匹配 通配符
graph LR;
p([Product]);
ex([ExChange]);
mq1([amq.gen-fMFRcKxaTxM-o_ApPe_AHw])
mq2([amq.gen-jWFR9bCh4_b52j6KUDt1Sw])
mq3([amq.gen-kkOjkWx9if2mQB_3gcfO4w])
mq4([amq.gen-tNqCT75w_QqSJbVKrJapQQ])
c1([C1])
c2([C2])
c3([C3])
c4([C4])style p fill:#0ff,stroke:#333; style ex fill:#33c,stroke:#333; style mq1 fill:#f00,stroke:#333; style mq2 fill:#f00,stroke:#333; style mq3 fill:#f00,stroke:#333; style mq4 fill:#f00,stroke:#333; style c1 fill:#3cf,stroke:#333; style c2 fill:#3cf,stroke:#333; style c3 fill:#3cf,stroke:#333; style c4 fill:#3cf,stroke:#333; p-->ex; ex-->|#|mq1; mq1-->c1; ex-->|kern.*|mq2; mq2-->c2; ex-->|*.critical|mq3; mq3-->c3; ex-->|kern.*|mq4; ex-->|*.critical|mq4; mq4-->c4;</pre>
生产者代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 using RabbitMQ.Client;
using System;
using System.Text;
namespace TopicsMQConsumer
{
class EmitLogTopic
{
static string ExchangeName = "topic_logs";
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 设置交换机以及交换机模式 durable 不设置的话,默认为false
channel.ExchangeDeclare(exchange: ExchangeName,
type: ExchangeType.Topic,
durable: false);
//路由信息
var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
//消息
var message = (args.Length > 1) ? args[1] : "HelloWorld!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: ExchangeName,
routingKey: routingKey,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Linq;
using System.Text;
namespace TopicsMQConsumer
{
class ReceiveLogsTopic
{
static string ExchangeName = "topic_logs";
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 设置交换机以及交换机模式 durable 不设置的话,默认为false
channel.ExchangeDeclare(exchange: ExchangeName,
type: ExchangeType.Topic,
durable: false);
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [binding_key...]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
//消息队列 绑定到 对应交换机的路由上
foreach (var bindingKey in args)
{
channel.QueueBind(queue: queueName,
exchange: ExchangeName,
routingKey: bindingKey);
}
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey,
message);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}配置RabbitMQ
添加交换机
测试:
消费者1 监听 “#” 所有消息
1 $ ./bin/Debug/TopicsMQConsumer.exe "#"消费者2 只监听 kern.* 通配符
1 $ ./bin/Debug/TopicsMQConsumer.exe "kern.*"消费者3 只监听 “*.critical” 通配符
1 $ ./bin/Debug/TopicsMQConsumer.exe "*.critical"消费者4 监听 “kern.“ “.critical” 通配符
1 $ ./bin/Debug/TopicsMQConsumer.exe "kern.*" "*.critical"生产者发送消息
1
2
3
4
5
6 $ ./bin/Debug/TopicsMQProduct.exe kern.critic wwww
$ ./bin/Debug/TopicsMQProduct.exe kern.1 wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww.critic wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww.criticical wwww
$ ./bin/Debug/TopicsMQProduct.exe kwww.critical wwww输出结果
RabbitMQ Model介绍