简单模式
简单模式
一个生产者对一个消费者
graph LR;
id1([Product]);
id2([Consumer]);
id3([Message Quene])
style id1 fill:#0ff,stroke:#333;
style id2 fill:#3cf,stroke:#333;
style id3 fill:#f00,stroke:#333;
id1-->id3;
id3-->id2;
- Product : 发送消息
- Consumer : 消费者 接受消息
- Message Quene 消息队列,消息只会存储在队列中,不会处理,等待Consumer处理
请注意,生产者,消费者和经纪人不必位于同一主机上。实际上,在大多数应用程序中它们不是。一个应用程序既可以是生产者,也可以是消费者。
RabbitMQ使用多种协议。本教程使用AMQP 0-9-1,这是一种开放的通用消息传递协议。RabbitMQ有许多不同语言的客户。我们将使用RabbitMQ提供的.NET客户端。
环境
- Net 35
- RabbitMQ 3.4.3 点击下载
生产者
新建项目 .net35 控制台项目
SimpleMQProduct
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| using RabbitMQ.Client; using System; using System.Text; /// <summary> /// 生产者 /// </summary> namespace SimpleMQProduct { class Send { static void Main(string[] args) { //定义连接工厂 var factory = new ConnectionFactory() { HostName = "127.0.0.1" }; //通过工厂获取连接 using (var connection = factory.CreateConnection()) { //创建通道 using (var channel = connection.CreateModel()) { //声明创建队列 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //消息内容 string message = "Hello,World!"; var body = Encoding.UTF8.GetBytes(message); //发送消息 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine(" [x] Send {0}", message); Console.ReadLine(); } } } } }
|
消费者
新建项目 .net35 控制台项目
SimpleMQConsumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System; /// <summary> /// 消费者 /// </summary> namespace SimpleMQConsumer { class Receive { static void Main(string[] args) { //定义连接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //通过工厂获取连接 using (var connection = factory.CreateConnection()) { //创建通道 using (var channel = connection.CreateModel()) { //声明通道 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//定义消费者 var consumer = new EventingBasicConsumer(channel); //消息接收后处理 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; //监听队列 noAck:True ,自动消费模式; 消息消费时无需给MQ返回Ack channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit!"); Console.ReadLine(); } }
} } }
|
通过 ConnectionFactory 设置 RabbitMQ 连接参数
1
| var factory = new ConnectionFactory() { HostName = "localhost", Port=5672, UserName = "guest", Password = "guest", VirtualHost = "frexport" };
|
这里的参数
参数 |
参数类型 |
参数说明 |
默认值 |
HostName |
string |
主机的IP |
|
Port |
int |
主机通信端口 |
5672 |
UserName |
string |
连接账户 |
guest |
Password |
string |
连接账户密码 |
guest |
VirtualHost |
string |
访问的虚拟主机,可以理解为一个应用MQ |
/ |
对应的RabbitMQ操作
RabbitMQ 控制台操作
- 添加guest用户
guest用户设置密码为 guest
- 添加 vhost -> frexport
设置VHost权限,添加guest用户权限
设置后如下
回到User界面
- 添加Quene 队列 hello
我们使用 frexport 虚拟主机创建一个队列 hello
添加后
运行程序
Product
Consumer
RabbitMQ Model介绍