0%

RabbitMQ 路由模式

路由模式

Routing 路由模式

graph LR;
    id1([Product]);
    id2([ExChange]);
    id3([amq.gen-DjtYso1eaz52eM3mAJToaw])
    id4([amq.gen-nLrD6gHpPBMY-oqM-tBVcQ])
    id5([C1])
    id6([C2])

    style id1 fill:#0ff,stroke:#333;
    style id2 fill:#33c,stroke:#333;
    style id3 fill:#f00,stroke:#333;
    style id4 fill:#f00,stroke:#333;
    style id5 fill:#3cf,stroke:#333;
    style id6 fill:#3cf,stroke:#333;

    id1-->id2;

    id2-->|error|id3;
    id3-->id5;

    id2-->|error|id4;
    id2-->|info|id4;

    id4-->id6;

注意

  1. 生产者发送消息到交换机,要指定路由Key
  2. 消费者将队列绑定到交换机时需要指定路由Key

这个是一种 完全匹配 只有匹配到的消费者才能消费消息

消息中的路由键值如果和Binding中的binding key 一致,交换机就将消息发送到对应的队列中。
路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为”dog”,则只转发routingkey 标记为”dog”的消息,不会转发”dog.puppy”,也不会转发”dog.guard”等等。这个是时 完全匹配、单播的模式

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
using System;
using RabbitMQ.Client;

namespace RoutingMQProduct
{
class EmitLogDirect
{
static string ExchangeName = "direct_logs";

static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport"
};

using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//指定交换机以及交换机模式
channel.ExchangeDeclare(exchange: ExchangeName,
type: ExchangeType.Direct,
durable: true);

var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1) ? args[1] : "HelloWorld!";

var body = System.Text.Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: ExchangeName,
routingKey:severity,
basicProperties:null,
body:body);

Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);

}
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RoutingMQConsumer1
{
class ReceiveLogsDirect
{
static string ExchangeName = "direct_logs";

static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport"
};

using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 设置交换机以及交换机模式
channel.ExchangeDeclare(exchange: ExchangeName,
type: ExchangeType.Direct,
durable:true);

//获取当前消息队列名称
var queueName = channel.QueueDeclare().QueueName;

if (args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}

//当前消息队列 和 指定交换机的路由进行 绑定
foreach (var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: ExchangeName,
routingKey: severity);
}

Console.WriteLine(" [*] Waiting for message.");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey, message);
};

channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}

配置RabbitMQ

添加交换机

测试:

消费者1 监听 errorinfo 两个路由

1
$ ./bin/Debug/RoutingMQConsumer1.exe error info

消费者2 只监听 error 路由

1
$ ./bin/Debug/RoutingMQConsumer1.exe error

生产者 分别对 info/error/warn 路由 各发送一条纤细

1
2
3
$ ./bin/Debug/RoutingMQProduct.exe info hhh
$ ./bin/Debug/RoutingMQProduct.exe error 11111
$ ./bin/Debug/RoutingMQProduct.exe warn www

RabbitMQ Model介绍

欢迎关注我的其它发布渠道