RPC——Remote Procedure Call,远程过程调用。 那RabbitMQ如何进行远程调用呢?
示意图
如下:
graph TB;
C([Client]);
mq1([rpc_queue])
mq2([reply_to=amq.gen-..])
S([Server])
style C fill:#3cf,stroke:#333;
style mq1 fill:#f00,stroke:#333;
style mq2 fill:#f00,stroke:#333;
style S fill:#3cf,stroke:#333;
C-->|Request reply_to=amq.gen-.. correlation_id=abc |mq1;
mq1-->S;
S-->mq2;
mq2-->|Reply correlation_id=abc|C;
解释
- 第一步,进行远程调用的客户端需要指定接收远程回调的队列,并声明消费者监听此队列。
- 第二步,远程调用的服务端除了要声明消费端接收远程调用请求外,还要将结果发送到客户端用来监听回调结果的队列中去。
代码
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
/// <summary>
/// Blocking RPC client on top of RabbitMQ. A request is published to
/// <c>rpc_queue</c>; the reply is awaited on a callback queue whose name is
/// sent along with the request in the <c>ReplyTo</c> property.
/// </summary>
public class RpcClient
{
    /// <summary>
    /// Name of the queue that requests are published to.
    /// </summary>
    private const string QueueName = "rpc_queue";

    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

    /// <summary>
    /// Connects to the broker, declares the callback queue and starts
    /// consuming replies from it.
    /// </summary>
    public RpcClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "frexport" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();

        // Declare the callback queue and remember the name the broker reports for it.
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        // Request properties: tell the server where to reply (ReplyTo) and
        // which id to echo back (CorrelationId) so replies can be matched.
        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        // Hand matching replies over to the thread blocked in Call().
        consumer.Received += (model, ea) =>
        {
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(Encoding.UTF8.GetString(ea.Body));
            }
        };

        // Subscribe exactly once, and before any request is published.
        // Subscribing inside Call() registered an additional consumer on the
        // reply queue for every call made on the same client.
        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            noAck: true);
    }

    /// <summary>
    /// Sends <paramref name="message"/> as an RPC request and blocks until
    /// the reply arrives.
    /// </summary>
    /// <param name="message">Request payload; it is sent UTF-8 encoded.</param>
    /// <returns>The reply body decoded as UTF-8.</returns>
    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);

        // Publish through the default exchange, routed by queue name.
        channel.BasicPublish(
            exchange: "",
            routingKey: QueueName,
            basicProperties: props,
            body: messageBytes);

        // Blocks until the consumer callback adds a reply; there is no timeout.
        return respQueue.Take();
    }

    /// <summary>
    /// Closes the underlying connection.
    /// </summary>
    public void Close()
    {
        connection.Close();
    }
}
/// <summary>
/// Console entry point: requests fib(30) through <see cref="RpcClient"/>
/// and prints the reply.
/// </summary>
public class Rpc
{
    public static void Main()
    {
        RpcClient client = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        string reply = client.Call("30");
        Console.WriteLine(" [.] Got '{0}'", reply);

        client.Close();
    }
}
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading;
namespace RPCServer
{
    /// <summary>
    /// RPC server: consumes requests from <c>rpc_queue</c>, computes the
    /// requested Fibonacci number and publishes the result to the queue named
    /// in the request's <c>ReplyTo</c> property.
    /// </summary>
    class RPCServer
    {
        static string QueueName = "rpc_queue";

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "frexport" };

            using (var conn = factory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    // Declare the request queue.
                    channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

                    // Create the consumer for incoming requests.
                    var consumer = new EventingBasicConsumer(channel);
                    Console.WriteLine(" [x] Awaiting RPC requests");

                    // Request handling.
                    consumer.Received += (sender, ea) =>
                    {
                        string response = null;
                        var body = ea.Body;
                        var props = ea.BasicProperties;

                        // Echo the CorrelationId back so the client can match the reply.
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;

                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            int n = int.Parse(message);
                            Console.WriteLine(" [.] fib({0})", message);
                            response = fib(n).ToString();
                        }
                        catch (Exception e)
                        {
                            // Malformed or out-of-range input: log it and reply with an empty body.
                            Console.WriteLine(" [.] " + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);

                            // Publish the reply to the callback queue named by the client.
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);

                            // Acknowledge the request manually.
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                    };

                    // Start consuming requests with manual acknowledgements.
                    channel.BasicConsume(queue: QueueName, noAck: false, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");

                    // Console.ReadLine() is required here: without it execution leaves the
                    // using blocks, channel and conn are disposed and the server disconnects.
                    Console.ReadLine();
                }
            }
        }

        /// <summary>
        /// Computes the n-th Fibonacci number recursively.
        /// </summary>
        /// <param name="n">Zero-based index; must not be negative.</param>
        /// <returns>fib(n), where fib(0) = 0 and fib(1) = 1.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="n"/> is negative.
        /// </exception>
        private static int fib(int n)
        {
            // n comes straight from a message body. A negative value never reaches
            // the base case, so the recursion would run until the stack overflows,
            // which cannot be caught and terminates the server process.
            if (n < 0)
            {
                throw new ArgumentOutOfRangeException("n", "fib is only defined for non-negative n");
            }

            if (n == 0 || n == 1)
            {
                return n;
            }

            // NOTE(review): the result exceeds int.MaxValue for n > 46; confirm whether
            // larger inputs must be supported.
            return fib(n - 1) + fib(n - 2);
        }
    }
}
|
演示
RabbitMQ Model介绍