0%

RabbitMQ 发布者确认

发布者确认

发布者确认

发布者确认是RabbitMQ的扩展,可以实现可靠的发布。在channel上启用发布者确认后,代理将异步确认客户端发布的消息,这意味着他们已在服务器端处理。

在频道上启用发布者确认

发布者确认是AMQP 0.9.1协议的RabbitMQ扩展,因此默认情况下未启用它们。发布者确认是通过ConfirmSelect方法在通道级别启用的:

1
2
var channel = connection.CreateModel();
channel.ConfirmSelect();

必须在希望使用发布者确认的每个频道上调用此方法。确认仅应启用一次,而不是对每个已发布的消息都启用

策略1:Publishing Messages Individually

每条消息发步后,等待确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static void PublishMessagesIndividually()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}
timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {timer.ElapsedMilliseconds:N0} ms");
}
}

在前面的示例中,我们像往常一样发布一条消息,并等待通过Channel#WaitForConfirmsOrDie(TimeSpan)方法进行确认。确认消息后,该方法立即返回。如果未在超时时间内确认该消息或该消息没有被确认(这意味着代理出于某种原因无法处理该消息),则该方法将引发异常。异常的处理通常包括记录错误消息和/或重试发送消息。

此方法非常简单,但也有一个主要缺点:由于消息的确认会阻止所有后续消息的发布,因此它会大大降低发布速度。这种方法不会提供每秒超过数百条已发布消息的吞吐量。但是,对于某些应用程序来说这可能已经足够了。

发布者确认异步吗?

我们在一开始提到代理程序以异步方式确认发布的消息,但是在第一个示例中,代码同步等待直到消息被确认。
客户端实际上异步接收确认,并相应地取消阻止对WaitForConfirmsOrDie的调用 。将WaitForConfirmsOrDie视为依赖于后台异步通知的同步。

策略2:批量发布消息

为了改进前面的示例,我们可以发布一批消息,并等待整个批次被确认。以下示例使用了100个批次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static void PublishMessagesInBatch()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var batchSize = 100;
var outstandingMessageCount = 0;
var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
outstandingMessageCount++;

if (outstandingMessageCount == batchSize)
{
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
outstandingMessageCount = 0;
}
}

if (outstandingMessageCount > 0)
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));

timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages in batch in {timer.ElapsedMilliseconds:N0} ms");
}
}

与等待确认单个消息相比,等待一批消息被确认可以极大地提高吞吐量(对于远程RabbitMQ节点,这最多可以达到20-30倍)。
缺点之一是我们不知道发生故障时到底出了什么问题,因此我们可能必须将整个批处理保存在内存中,以记录有意义的内容或重新发布消息。而且该解决方案仍然是同步的,因此它阻止了消息的发布。

策略3:处理发布者异步确认

代理异步确认已发布的消息,只需在客户端上注册一个回调即可收到这些确认的通知:

1
2
3
4
5
6
7
8
9
10
var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
// RabbitMQ 确认的消息
};
channel.BasicNacks += (sender, ea) =>
{
// RabbitMQ 未确认的消息(可以认为是丢失的消息)
};

有2个回调:一个用于确认的消息,另一个用于未确认的消息(代理可以认为丢失的消息)。这两个回调都有一个对应的 EventArgs 参数(ea),其中包含:

  • DeliveryTag : 标识 已确认或丢失 消息的序列号。
  • multiple : 这是一个布尔值。如果为false,则仅 确认/丢失 一条消息;如果为true,则将 确认/丢失 序列号较低或相等的所有消息。

可以在消息发布之前通过 Channel#NextPublishSeqNo 获取序列号

1
2
var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange, queue, properties, body);

将消息与序列号关联的一种简单方法是使用字典。假设我们要发布字符串,因为它们很容易变成要发布的字节数组。这是一个代码示例,该示例使用字典将发布序列号与消息的字符串主体相关联:

1
2
3
4
5
var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));

现在,发布消息 使用字典来跟踪 消息是否被确认。
我们需要在消息确认回调时清理此字典,并做一些类似在消息丢失警告的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void ​cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
​if (multiple)
​{
​var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
​foreach (var entry in confirmed)
​{
​outstandingConfirms.TryRemove(entry.Key, out _);
​}
​}
​else
​{
​outstandingConfirms.TryRemove(sequenceNumber, out _);
​}
}

channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
​outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
​Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
​cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};
// ... publishing code

所有代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
using System.Linq;
using System.Threading;

class PublisherConfirms
{
private const int MESSAGE_COUNT = 50_000;

public static void Main()
{
PublishMessagesIndividually();
PublishMessagesInBatch();
HandlePublishConfirmsAsynchronously();
Console.ReadLine();
}

private static IConnection CreateConnection()
{
var factory = new ConnectionFactory { HostName = "localhost", VirtualHost = "frexport" };
return factory.CreateConnection();
}

private static void PublishMessagesIndividually()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}
timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {timer.ElapsedMilliseconds:N0} ms");
}
}

private static void PublishMessagesInBatch()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var batchSize = 100;
var outstandingMessageCount = 0;
var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
outstandingMessageCount++;

if (outstandingMessageCount == batchSize)
{
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
outstandingMessageCount = 0;
}
}

if (outstandingMessageCount > 0)
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));

timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages in batch in {timer.ElapsedMilliseconds:N0} ms");
}
}

private static void HandlePublishConfirmsAsynchronously()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
if (multiple)
{
var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
foreach (var entry in confirmed)
outstandingConfirms.TryRemove(entry.Key, out _);
}
else
outstandingConfirms.TryRemove(sequenceNumber, out _);
}

channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = i.ToString();
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: Encoding.UTF8.GetBytes(body));
}

if (!WaitUntil(60, () => outstandingConfirms.IsEmpty))
throw new Exception("All messages could not be confirmed in 60 seconds");

timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {timer.ElapsedMilliseconds:N0} ms");
}
}

private static bool WaitUntil(int numberOfSeconds, Func<bool> condition)
{
int waited = 0;
while (!condition() && waited < numberOfSeconds * 1000)
{
Thread.Sleep(100);
waited += 100;
}

return condition();
}
}

RabbitMQ在本机运行的效果

1
2
3
Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

RabbitMQ在远程运行的效果

1
2
3
Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

RabbitMQ Model介绍

欢迎关注我的其它发布渠道