0%

RabbitMQ 使用

引子

学习下如何使用rabbitMQ

MQ 即 (Message Quene),消息队列

在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

实际上,消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。
目前,有很多消息队列有很多开源的实现,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、RabbitMQ、IBM MQ、Apache Qpid和HTTPSQS。

RabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ支持以下操作系统:

- Windows
- Linux/Unix
- MAC OS X

RabbitMQ支持下列编程语言:

- C# (using .net/c# client)
- clojure (using Langohr)
- erlang (using erlang client)
- java (using java client)
- javascript/node.js (using amqp.node)
- perl (using Net::RabbitFoot)
- python (using pika)
- python-puka (using puka)
- ruby (using Bunny)
- ruby (using amqp gem)

RabbitMQ官网

安装 RabbitMQ

安装: RabbitMQ 安装

RabbitMQ 概念

一、队列、生产者、消费者

    1. 队列: RabbitMQ的内部对象,用于存储消息。
    1. 生产者: (下图中的Product)生产消息并投递到队列中
    1. 消费者: (下图中的Consumer)可以从队列中获取消息并消费。
graph LR;
    P([Product]);
    C([Consumer]);
    mq([Message Quene])
    style P fill:#0ff,stroke:#333;
    style C fill:#3cf,stroke:#333;
    style mq fill:#f00,stroke:#333;
    P-->mq-->C;

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

graph LR;
    P([Product]);
    mq([Message Quene])
    C1([Consumer1]);
    C2([Consumer2]);

    style P fill:#0ff,stroke:#333;
    style mq fill:#f00,stroke:#333;
    style C1 fill:#3cf,stroke:#333;
    style C2 fill:#3cf,stroke:#333;

    P-->mq;
    mq-->C1;
    mq-->C2;

二、Exchange/交换机、Binding/绑定

刚才我们看到生产者将消息投递到队列中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的Exchange),再通过Binding将Exchange与Queue关联起来。

graph LR;
    P([Product]);
    Ex([ExChange]);
    mq1([amq.gen-2G4YaJ2P3JcJEwHHiRL5JA])
    mq2([amq.gen-tsfVrHogVGKF3vGv6-rPWg])
    C1([Consumer1])
    C2([Consumer2])

    style P fill:#0ff,stroke:#333;
    style Ex fill:#33c,stroke:#333;
    style mq1 fill:#f00,stroke:#333;
    style mq2 fill:#f00,stroke:#333;
    style C1 fill:#3cf,stroke:#333;
    style C2 fill:#3cf,stroke:#333;

    P-->Ex;

    Ex-->mq1-->C1;
    Ex-->mq2-->C2;

三、Exchange Type/交换机类型、Bingding key/绑定Key、routing key/路由Key

相关信息: RabbitMQ交换机

四、几种消息模型

五、消息持久化

要持久化队列queue的持久化需要在声明时指定durable=True;
这里要注意,队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性.
队列和交换机有一个创建时候指定的标志durable,durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复

消息持久化包括3部分

  • exchange 持久化,在声明时指定durable => true
    1
    hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明交换机信息,且为可持久化的
  • queue持久化,在声明时指定durable => true
    1
    channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
  • 消息持久化,在投递时指定delivery_mode => 2(1是非持久化).
    1
    channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); 

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定.
注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。

六、任务分发机制

相关信息: RabbitMQ任务分发

RabbitMQ 应用场景

一、异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种

  • 1.串行的方式;
  • 2.并行的方式

(1)串行方式:

将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

(2)并行方式:

将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回.

(3)消息队列
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

二、应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

这种做法有一个缺点:

当库存系统出现故障时,订单就会失败。
订单系统和库存系统高耦合.
引入消息队列

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,获取下单消息,进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.

三、流量削峰

流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.

运行环境

.Net 的 ConnectionFactory

通过 ConnectionFactory 设置 RabbitMQ 连接参数

1
2
3
4
5
var factory = new ConnectionFactory() { HostName = "localhost", 
Port=5672,
UserName = "guest",
Password = "guest",
VirtualHost = "frexport" };

这里的参数

参数 参数类型 参数说明 默认值
HostName string 主机的IP
Port int 主机通信端口 5672
UserName string 连接账户 guest
Password string 连接账户密码 guest
VirtualHost string 访问的虚拟主机,可以理解为一个应用MQ /

Queue 队列

queue(队列,task-queueing系统),主要存储消息被提供消费者进行消费。
queue还有以下属性:

属性 type 描述
name string queue的名称
durability bool queue是否持久化
exclusive bool 当消费者断开连接后是否删除该队列
Auto-delete bool 当所有消费客户端连接断开后,是否自动删除队列
Arguments object 使用 broker-specific 时候的参数

Message Ack 消息应答

消息应答。执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。

Message Reject

拒绝消息。当消费者应用程序收到消息时,该消息的处理可能会成功,也可能不会成功。 消费者可以通过拒绝消息向代理指出消息处理失败(或当时无法完成)。 当拒绝消息时,消费者可以要求代理丢弃或重新发送消息。 当队列中只有一个消费者时,确保您不会通过一次又一次地拒绝并重新发送来自同一个消费者的消息来创建无限的消息传递循环。

Message Nack

拒绝应答。消费者使用 basic.reject拒绝消息,则该消息为Rejecting Messages。AMQP
只能一次拒绝一条消息,但是如果用的rabbitmq则可以拒绝多个消息。

Prefetching Messages

预取消息。指定channel(通道)的等待处理的消息个数,如果等待的消息已经达到该值,则该消费者不再接受新的消息。默认的channel不限制个数。最好的方式是设置该值在一个合理的数值,达到多消费者之间的简单负载均衡。

对应的RabbitMQ控制台操作

RabbitMQ控制台: RabbitMQ 控制台操作

消息消费的两种模式

  1. 自动模式

消费者从消息队列获取消息后,服务端就认为该消息已经成功消费。

1
2
3
4
5
6
7
8
9
10
11
12
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
//无需反馈
};

channel.BasicConsume(queue: "hello",
noAck: true,
consumer: consumer);
  1. 手动模式

消费者从消息队列获取消息后,服务端并没有标记为成功消费
​消费者成功消费后需要将状态返回到服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var consumer = new EventingBasicConsumer(channel);

consumer.Received += (sender, ea) =>
{
var body = ea.Body;
var message = System.Text.Encoding.UTF8.GetString(body);

Console.WriteLine(" [x] Receive {0} {1}", message, DateTime.Now);

Thread.Sleep(1000);

//消息消费完给服务器返回确认状态,表示该消息已被消费
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: QueueName,
noAck: false,
consumer: consumer);

RabbitMQ 基础知识

欢迎关注我的其它发布渠道