0%

简单模式

简单模式

一个生产者对一个消费者

graph LR;
    id1([Product]);
    id2([Consumer]);
    id3([Message Quene])
    style id1 fill:#0ff,stroke:#333;
    style id2 fill:#3cf,stroke:#333;
    style id3 fill:#f00,stroke:#333;
    id1-->id3;
    id3-->id2;
  • Product : 发送消息
  • Consumer : 消费者 接受消息
  • Message Quene 消息队列,消息只会存储在队列中,不会处理,等待Consumer处理

请注意,生产者,消费者和经纪人不必位于同一主机上。实际上,在大多数应用程序中它们不是。一个应用程序既可以是生产者,也可以是消费者。

RabbitMQ使用多种协议。本教程使用AMQP 0-9-1,这是一种开放的通用消息传递协议。RabbitMQ有许多不同语言的客户。我们将使用RabbitMQ提供的.NET客户端。

环境

阅读全文 »

发布/订阅模式

Publish/Subscribe 发布/订阅模式

一个生成者对应多个消费者

之前我们创建了一个工作队列。工作队列背后的假设是,每个任务都恰好交付给一个工人。
在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为“发布/订阅”。

为了说明这种模式,我们将构建一个简单的日志记录系统。它包含两个程序-第一个程序将发出日志消息,第二个程序将接收并打印它们。

graph LR;
    id1([Product]);
    id2([ExChange]);
    id3([amq.gen-2G4YaJ2P3JcJEwHHiRL5JA])
    id4([amq.gen-tsfVrHogVGKF3vGv6-rPWg])
    id5([C1])
    id6([C2])

    style id1 fill:#0ff,stroke:#333;
    style id2 fill:#33c,stroke:#333;
    style id3 fill:#f00,stroke:#333;
    style id4 fill:#f00,stroke:#333;
    style id5 fill:#3cf,stroke:#333;
    style id6 fill:#3cf,stroke:#333;

    id1-->id2;

    id2-->id3-->id5;
    id2-->id4-->id6;
  • 生产者:将消息发送到 交换机/队列
  • 消费者:只能从队列中获取消息
  • 队列可以绑定交换机

如果消息发送到没有队列绑定的交换机上,那么消息将丢失

阅读全文 »

主题模式

Topics 主题模式

主题模式类似 路由模式

路由模式是 完全匹配 模式,主题模式匹配 通配符

graph LR;
p([Product]);
ex([ExChange]);
mq1([amq.gen-fMFRcKxaTxM-o_ApPe_AHw])
mq2([amq.gen-jWFR9bCh4_b52j6KUDt1Sw])
mq3([amq.gen-kkOjkWx9if2mQB_3gcfO4w])
mq4([amq.gen-tNqCT75w_QqSJbVKrJapQQ])
c1([C1])
c2([C2])
c3([C3])
c4([C4])

style p fill:#0ff,stroke:#333;
style ex fill:#33c,stroke:#333;
style mq1 fill:#f00,stroke:#333;
style mq2 fill:#f00,stroke:#333;
style mq3 fill:#f00,stroke:#333;
style mq4 fill:#f00,stroke:#333;
style c1 fill:#3cf,stroke:#333;
style c2 fill:#3cf,stroke:#333;
style c3 fill:#3cf,stroke:#333;
style c4 fill:#3cf,stroke:#333;

p-->ex;

ex-->|#|mq1;
mq1-->c1;

ex-->|kern.*|mq2;
mq2-->c2;

ex-->|*.critical|mq3;
mq3-->c3;

ex-->|kern.*|mq4;
ex-->|*.critical|mq4;
mq4-->c4;</pre>
阅读全文 »

工作队列

Work Queues 工作队列

工作队列背后的假设是,每个任务都恰好交付给一个工人

一个生成者对应多个消费者

graph LR;
    id1([Product]);
    id2([Message Quene])
    id3([Consumer1]);
    id4([Consumer2]);

    style id1 fill:#0ff,stroke:#333;
    style id2 fill:#f00,stroke:#333;
    style id3 fill:#3cf,stroke:#333;
    style id4 fill:#3cf,stroke:#333;

    id1-->id2;

    id2-->id3;
    id2-->id4;

将比较复杂比较耗时的任务放在任务队列中,不必立即执行。

任务队列用来管理任务列表,我们在后台的工作可以交给多个线程来完成。

阅读全文 »

本文以淘宝作为例子,介绍从一百个并发到千万级并发情况下服务端的架构的演进过程

1、 概述

本文以淘宝作为例子,介绍从一百个并发到千万级并发情况下服务端的架构的演进过程,同时列举出每个演进阶段会遇到的相关技术,让大家对架构的演进有一个整体的认知,文章最后汇总了一些架构设计的原则。

2、基本概念

在介绍架构之前,为了避免部分读者对架构设计中的一些概念不了解,下面对几个最基础的概念进行介绍。

1)什么是分布式?

系统中的多个模块在不同服务器上部署,即可称为分布式系统,如Tomcat和数据库分别部署在不同的服务器上,或两个相同功能的Tomcat分别部署在不同服务器上。

2)什么是高可用?

系统中部分节点失效时,其他节点能够接替它继续提供服务,则可认为系统具有高可用性。

3)什么是集群?

一个特定领域的软件部署在多台服务器上并作为一个整体提供一类服务,这个整体称为集群。
如Zookeeper中的Master和Slave分别部署在多台服务器上,共同组成一个整体提供集中配置服务。
在常见的集群中,客户端往往能够连接任意一个节点获得服务,并且当集群中一个节点掉线时,其他节点往往能够自动的接替它继续提供服务,这时候说明集群具有高可用性。

4)什么是负载均衡?

请求发送到系统时,通过某些方式把请求均匀分发到多个节点上,使系统中每个节点能够均匀的处理请求负载,则可认为系统是负载均衡的。

5)什么是正向代理和反向代理?

系统内部要访问外部网络时,统一通过一个代理服务器把请求转发出去,在外部网络看来就是代理服务器发起的访问,此时代理服务器实现的是正向代理;
当外部请求进入系统时,代理服务器把该请求转发到系统中的某台服务器上,对外部请求来说,与之交互的只有代理服务器,此时代理服务器实现的是反向代理。
简单来说,正向代理是代理服务器代替系统内部来访问外部网络的过程,反向代理是外部请求访问系统时通过代理服务器转发到内部服务器的过程。

阅读全文 »

RabbitMQ 控制台操作

对应的RabbitMQ操作

RabbitMQ 控制台操作
  1. 添加guest用户

guest用户设置密码为 guest

  1. 添加 vhost -> frexport

设置VHost权限,添加guest用户权限

设置后如下

回到User界面

  1. 添加Quene 队列 hello

我们使用 frexport 虚拟主机创建一个队列 hello

添加后

阅读全文 »

RabbitMQ 交换机概念

Exchange 交换机

RabbitMQ 中的消息 不是直接发送到Queue中的,中间有一个Exchange 做消息分发。
producer甚至都不知道消息发送到哪个队列中去。因此,当Exchange收到message时,必须知道如何准备分发消息。
具体是append 到一定规则的queue,还是append到多个queue中,还是被丢弃?这些都是通过 exchange的类型定义的。
|type|作用|创建vhost时默认创建的exchange的名称|
|-|-|-|
|direct|路由模式|(Empty string) and amq.direct|
|fanout|发布/订阅模式|amq.fanout|
|Topic|主题模式|amq.topic|
|headers||amq.match (and amq.headers in RabbitMQ)|

一:Direct Exchange

它处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。

直接交换通常用于:

  • 以循环方式在多个 workers(同一应用程序的实例)之间分配任务。当这样做时,消息在消费者之间而不是在队列之间是负载平衡的。
阅读全文 »

RabbitMQ 安装

RabbitMQ官网

安装 RabbitMQ

docker需要先装好

使用 docker-compose

1
2
3
4
5
[root@localhost ~]# mkdir rabbitmq
[root@localhost ~]# cd rabbitmq/
[root@localhost rabbitmq]# mkdir data
[root@localhost rabbitmq]# mkdir log
[root@localhost rabbitmq]# vi docker-compose.yml

docker-compose.yml 内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
version: '2'
services:
rabbitmq:
hostname: rabbitmq
environment:
RABBITMQ_DEFAULT_VHOST: "xxx"
RABBITMQ_DEFAULT_USER: "admin"
RABBITMQ_DEFAULT_PASS: "admin"
image: "rabbitmq:3-management"
restart: always
volumes:
- "./data:/var/lib/rabbitmq"
- "./log:/var/log/rabbitmq/log"
ports:
- "4369:4369"
- "5672:5672"
- "15672:15672"
- "25672:25672"
阅读全文 »

任务分发

任务分发机制

使用任务队列的优点之一是可以轻易的进行一步工作。

如果我们现在积压了很多工作,可以通过增加消费者来解决这个问题,使得系统伸缩性更加容易

Round-robin(轮询分发)

发布者 RabbitMQ 发送几条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace WorkMQProduct
{
class NewTask
{
static string QueueName = "task_queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport",
UserName = "guest",
Password = "guest"
};
List<int> taskMessages = new List<int> { 2, 7, 2, 6, 5, 2, 2, 3 };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

for (int i = 0; i < 8; i++)
{
string message = taskMessages[i] + "";
var body = Encoding.UTF8.GetBytes(message);

properties.CorrelationId = i + "";
channel.BasicPublish(exchange: "",
routingKey: QueueName,
basicProperties: properties,
body: body);
Console.WriteLine(" [x] {0} Sent {1}", properties.CorrelationId, message);
}
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}

}
}

<!--more-->

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Threading;

namespace WorkMQConsumer
{
class Worker
{
static string QueueName = "task_queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory() { };
factory.HostName = "localhost";
factory.VirtualHost = "frexport";
factory.UserName = "guest";
factory.Password = "guest";

using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

Console.WriteLine(" [*] Waiting for message.");

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (sender, ea) =>
{
var body = ea.Body;

var message = System.Text.Encoding.UTF8.GetString(body);
int x = int.Parse(message);

Console.WriteLine(" [x] Task {0} Receive {1} {2}", ea.BasicProperties.CorrelationId, message, DateTime.Now);

Thread.Sleep(1000*x);

Console.WriteLine(" [x] Done! at {0}", DateTime.Now);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: QueueName,
noAck: false,
consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}

先启动两个消费者

1
2
$ MDConsumer/bin/Debug/MDConsumer.exe
$ MDConsumer/bin/Debug/MDConsumer.exe

再启动一个生产者
1
$ MDProduct/bin/Debug/MDProduct.exe

效果

这个地方其实,所有消息会很快传给消费者,虽然没有消息应答

从上述的结果中,我们可以得知,在默认情况下,RabbitMQ不会顾虑消息者处理消息的能力,即使其中有的消费者闲置有的消费者高负荷。RabbitMQ会逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息,这种方式分发消息机制称为Round-Robin(轮询)。

Fair dispatch(公平分发)

您可能已经注意到,任务分发仍然没有完全按照我们想要的那样。比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。

公平分发,则是根据消费者的处理能力来进行分发处理的。这里主要是通过设置prefetchCount 参数来实现的。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理规定的数量级个数的Message。换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它。 比如prefetchCount=1,则在同一时间下,每个Consumer在同一个时间点最多处理1个Message,同时在收到Consumer的ack前,它不会将新的Message分发给它。

graph LR;
    P([Product]);
    mq([Message Quene])
    C1([Consumer1]);
    C2([Consumer2]);

    style P fill:#0ff,stroke:#333;
    style mq fill:#f00,stroke:#333;
    style C1 fill:#3cf,stroke:#333;
    style C2 fill:#3cf,stroke:#333;

    P-->mq;

    mq-->|prefetch=1|C1;
    mq-->|prefetch=1|C2;

修改工作线程

1
channel.BasicQos(prefetchCount: 1, prefetchSize: 0, global: false);

注:如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

效果

消息每次只会发送一条给消费者,只有消费者处理完成后,才会分发新的消息

RabbitMQ 基础知识

总览

  • 为什么需要确认
  • 手动和自动确认模式
  • 确认API[多次确认和重新排队]
  • 连接丢失或通道关闭时自动重新排队
  • 通道预取及其对吞吐量的影响
  • 最常见的客户错误
  • 发布者确认和相关发布者数据安全主题

基础

按照定义,使用消息传递代理(RabbitMQ)的系统是分布式的。由于不能保证发送的消息可以到达对方或者被其成功处理,因此发布者和消费者都需要一种机制来进行传递和处理确认。

从消费者到RabbitMQ的消息确认被称为消息传递协议的确认
对发布者的去人称为发布者确认。两种功能都基于相同的思想,启发于TCP.

这对于 发布者到RabbitMQ,RabbitMQ到消费者的可靠交付都是必不可少的。 他们对于数据安全至关重要。

消费者确认

RabbitMQ 将消息传递给使用者的时候,需要知道何时消息被处理成功。具体逻辑取决于系统。因此这个是应用程序的决策.

阅读全文 »