0%

RabbitMQ RPC Model

RPC——Remote Procedure Call,远程过程调用。 那RabbitMQ如何进行远程调用呢?

示意图

如下:

graph TB;
    C([Client]);
    mq1([rpc_queue])
    mq2([reply_to=amq.gen-..])
    S([Server])

    style C fill:#3cf,stroke:#333;
    style mq1 fill:#f00,stroke:#333;
    style mq2 fill:#f00,stroke:#333;
    style S fill:#3cf,stroke:#333;

    C-->|Request reply_to=amq.gen-.. correlation_id=abc |mq1;
    mq1-->S;

    S-->mq2;
    mq2-->|Reply correlation_id=abc|C;

解释

  • 第一步,主要是进行远程调用的客户端需要指定接收远程回调的队列,并申明消费者监听此队列。
  • 第二步,远程调用的服务端除了要申明消费端接收远程调用请求外,还要将结果发送到客户端用来监听回调结果的队列中去。

代码

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;

/// <summary>
/// 队列名
/// </summary>
static string QueueName = "rpc_queue";

public RpcClient()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport"
};

connection = factory.CreateConnection();
channel = connection.CreateModel();

//创建一个回调MQ队列,获取消息队列名
replyQueueName = channel.QueueDeclare().QueueName;
//创建自动队列的消费者,接收此队列的消息
consumer = new EventingBasicConsumer(channel);

//创建消息信息,在信息里面填入消息回调参数,让服务器知道消息来源
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;

//消息消费
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
}

public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);

//发送消息
channel.BasicPublish(
exchange: "",
routingKey: QueueName,
basicProperties: props,
body: messageBytes);

//接收回调消息
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
noAck: true);
//如果有回调,则respQueue有数据,返回,否则线程会阻塞在这个位置
return respQueue.Take();
}

public void Close()
{
connection.Close();
}
}

public class Rpc
{
public static void Main()
{
var rpcClient = new RpcClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");

Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RPCServer
{
class RPCServer
{
static string QueueName = "rpc_queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport"
};

using (var conn = factory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
//定义消息队列
channel.QueueDeclare(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//消费者建立
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine(" [x] Awaiting RPC requests");

//消息处理
consumer.Received += (sender, ea) =>
{
string response = null;

var body = ea.Body;

var props = ea.BasicProperties;

//回调属性带上 CorrelationId,以便客户端识别
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;

try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);

//消息从回调通道,发送RabbitMQ
channel.BasicPublish(exchange: "",
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: responseBytes);
//手动完成消息
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};

//消费消息
channel.BasicConsume(queue: QueueName,
noAck: false,
consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
//这个位置必须要加入 Console.ReadLine(); 否则程序继续往下执行,channel和conn 会被销毁,通道会断开
Console.ReadLine();
}
}
}

/// <summary>
/// 求 fib 数
/// </summary>
/// <param name="n"></param>
/// <returns></returns>
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}

return fib(n - 1) + fib(n - 2);
}
}
}

演示

RabbitMQ Model介绍

欢迎关注我的其它发布渠道