0%

RabbitMQ 任务分发

任务分发

任务分发机制

使用任务队列的优点之一是可以轻易的进行一步工作。

如果我们现在积压了很多工作,可以通过增加消费者来解决这个问题,使得系统伸缩性更加容易

Round-robin(轮询分发)

发布者 RabbitMQ 发送几条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace WorkMQProduct
{
class NewTask
{
static string QueueName = "task_queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "frexport",
UserName = "guest",
Password = "guest"
};
List<int> taskMessages = new List<int> { 2, 7, 2, 6, 5, 2, 2, 3 };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

for (int i = 0; i < 8; i++)
{
string message = taskMessages[i] + "";
var body = Encoding.UTF8.GetBytes(message);

properties.CorrelationId = i + "";
channel.BasicPublish(exchange: "",
routingKey: QueueName,
basicProperties: properties,
body: body);
Console.WriteLine(" [x] {0} Sent {1}", properties.CorrelationId, message);
}
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}

}
}

<!--more-->

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Threading;

namespace WorkMQConsumer
{
class Worker
{
static string QueueName = "task_queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory() { };
factory.HostName = "localhost";
factory.VirtualHost = "frexport";
factory.UserName = "guest";
factory.Password = "guest";

using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

Console.WriteLine(" [*] Waiting for message.");

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (sender, ea) =>
{
var body = ea.Body;

var message = System.Text.Encoding.UTF8.GetString(body);
int x = int.Parse(message);

Console.WriteLine(" [x] Task {0} Receive {1} {2}", ea.BasicProperties.CorrelationId, message, DateTime.Now);

Thread.Sleep(1000*x);

Console.WriteLine(" [x] Done! at {0}", DateTime.Now);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: QueueName,
noAck: false,
consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}

先启动两个消费者

1
2
$ MDConsumer/bin/Debug/MDConsumer.exe
$ MDConsumer/bin/Debug/MDConsumer.exe

再启动一个生产者
1
$ MDProduct/bin/Debug/MDProduct.exe

效果

这个地方其实,所有消息会很快传给消费者,虽然没有消息应答

从上述的结果中,我们可以得知,在默认情况下,RabbitMQ不会顾虑消息者处理消息的能力,即使其中有的消费者闲置有的消费者高负荷。RabbitMQ会逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息,这种方式分发消息机制称为Round-Robin(轮询)。

Fair dispatch(公平分发)

您可能已经注意到,任务分发仍然没有完全按照我们想要的那样。比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。

公平分发,则是根据消费者的处理能力来进行分发处理的。这里主要是通过设置prefetchCount 参数来实现的。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理规定的数量级个数的Message。换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它。 比如prefetchCount=1,则在同一时间下,每个Consumer在同一个时间点最多处理1个Message,同时在收到Consumer的ack前,它不会将新的Message分发给它。

graph LR;
    P([Product]);
    mq([Message Quene])
    C1([Consumer1]);
    C2([Consumer2]);

    style P fill:#0ff,stroke:#333;
    style mq fill:#f00,stroke:#333;
    style C1 fill:#3cf,stroke:#333;
    style C2 fill:#3cf,stroke:#333;

    P-->mq;

    mq-->|prefetch=1|C1;
    mq-->|prefetch=1|C2;

修改工作线程

1
channel.BasicQos(prefetchCount: 1, prefetchSize: 0, global: false);

注:如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

效果

消息每次只会发送一条给消费者,只有消费者处理完成后,才会分发新的消息

RabbitMQ 基础知识

欢迎关注我的其它发布渠道