前言
BufferQueue 是一個用 .NET 編寫的高性能的緩沖隊列實現,支持多線程并發操作。
項目地址:https://github.com/eventhorizon-cli/BufferQueue
項目是從 mocha 項目中獨立出來的一個組件,經過修改以提供更通用的緩沖隊列功能。
目前支持的緩沖區類型為內存緩沖區,后續會考慮支持更多類型的緩沖區。
適用場景
生產者和消費者之間的速度不一致,需要并發批量處理數據的場景。
因為目前只有內存版本,不適用于不允許數據丟失的業務場景。
功能說明
支持創建多個 Topic,每個 Topic 可以有多種數據類型。每一對 Topic 和數據類型對應一個獨立的緩沖區。
支持創建多個 Consumer Group,每個 Consumer Group 的消費進度都是獨立的。支持多個 Consumer Group 并發消費同一個 Topic。
支持同一個 Consumer Group 創建多個 Consumer,以負載均衡的方式消費數據。
支持數據的批量消費,可以一次性獲取多條數據。
支持 pull 模式和 push 模式兩種消費模式。
pull 模式下和 push 模式下都支持 auto commit 和 manual commit 兩種提交方式。auto commit 模式下,消費者在收到數據后自動提交消費進度,如果消費失敗不會重試。manual commit 模式下,消費者需要手動提交消費進度,如果消費失敗只要不提交進度就可以重試。
需要注意的是,當前版本出于簡化實現的考慮,暫不支持消費者的動態擴容和縮容,需要在創建消費者時指定消費者數量。
使用示例
安裝 Nuget 包:
dotnet add package BufferQueue
項目基于 Microsoft.Extensions.DependencyInjection,使用時需要先注冊服務。
BufferQueue 支持兩種消費模式:pull 模式和 push 模式。
builder.Services.AddBufferQueue(options =>
{
options.UseMemory(bufferOptions =>
{
// 每一對 Topic 和數據類型對應一個獨立的緩沖區,可以設置 partitionNumber
bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);
bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);
bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);
})
// 添加 push 模式的消費者
// 掃描指定程序集中的標記了 BufferPushCustomerAttribute 的類,
// 注冊為 push 模式的消費者
.AddPushCustomers(typeof(Program).Assembly);
});
// 在 HostedService 中使用 pull模式 消費數據
builder.Services.AddHostedService<Foo1PullConsumerHostService>();
pull 模式的消費者示例:
public class Foo1PullConsumerHostService(
IBufferQueue bufferQueue,
ILogger<Foo1PullConsumerHostService> logger) : IHostedService
{
private readonly CancellationTokenSource _cancellationTokenSource = new();
public Task StartAsync(CancellationToken cancellationToken)
{
var token = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)
.Token;
var consumers = bufferQueue.CreatePullConsumers<Foo>(
new BufferPullConsumerOptions
{
TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100,
}, consumerNumber: 4);
foreach (var consumer in consumers)
{
_ = ConsumeAsync(consumer, token);
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cancellationTokenSource.Cancel();
return Task.CompletedTask;
}
private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken)
{
await foreach (var buffer in consumer.ConsumeAsync(cancellationToken))
{
foreach (var foo in buffer)
{
// Process the foo
logger.LogInformation("Foo1PullConsumerHostService.ConsumeAsync: {Foo}", foo);
}
}
}
}
push 模式的消費者示例:
通過 BufferPushCustomer 特性注冊 push 模式的消費者。
push consumer 會被注冊到 DI 容器中,可以通過構造函數注入其他服務,可以通過設置 ServiceLifetime 來控制 consumer 的生命周期。
BufferPushCustomerAttribute 中的 concurrency 參數用于設置 push consumer 的消費并發數,對應 pull consumer 的 consumerNumber。
[BufferPushCustomer(
topicName: "topic-foo2",
groupName: "group-foo2",
batchSize: 100,
serviceLifetime: ServiceLifetime.Singleton,
concurrency: 2)]
public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo>
{
public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken)
{
foreach (var foo in buffer)
{
logger.LogInformation("Foo2PushConsumer.ConsumeAsync: {Foo}", foo);
}
return Task.CompletedTask;
}
}
[BufferPushCustomer(
"topic-bar",
"group-bar",
100,
ServiceLifetime.Scoped,
2)]
public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar>
{
public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer,
CancellationToken cancellationToken)
{
foreach (var bar in buffer)
{
logger.LogInformation("BarPushConsumer.ConsumeAsync: {Bar}", bar);
}
var commitTask = committer.CommitAsync();
if (!commitTask.IsCompletedSuccessfully)
{
await commitTask.AsTask();
}
}
}
Producer 示例:
通過 IBufferQueue 獲取到指定的 Producer,然后調用 ProduceAsync 方法發送數據。
[ApiController]
[Route("/api/[controller]")]
public class TestController(IBufferQueue bufferQueue) : ControllerBase
{
[HttpPost("foo1")]
public async Task<IActionResult> PostFoo1([FromBody] Foo foo)
{
var producer = bufferQueue.GetProducer<Foo>("topic-foo1");
await producer.ProduceAsync(foo);
return Ok();
}
[HttpPost("foo2")]
public async Task<IActionResult> PostFoo2([FromBody] Foo foo)
{
var producer = bufferQueue.GetProducer<Foo>("topic-foo2");
await producer.ProduceAsync(foo);
return Ok();
}
[HttpPost("bar")]
public async Task<IActionResult> PostBar([FromBody] Bar bar)
{
var producer = bufferQueue.GetProducer<Bar>("topic-bar");
await producer.ProduceAsync(bar);
return Ok();
}
}
BufferQueue 內部設計概述
Topic 的隔離
BufferQueue 有以下的特性:
同一個數據類型 下的 不同 Topic 的 BufferQueue 互不干擾。
同一個 Topic 下的 不同數據類型 的 BufferQueue 互不干擾。
這個特性是通過以下兩層接口設計實現的:
IBufferQueue:根據 TopicName 和 類型參數 T 將請求轉發給具體的 IBufferQueue<T> 實現(借助 KeyedService 實現),其中參數 T 代表 Buffer 所承載的數據實體的類型。
IBufferQueue<T>:具體的 BufferQueue 實現,負責管理 Topic 下的數據。屬于 Buffer 模塊的內部實現,不對外暴露。
Partition 的設計
為了保證消費速度,BufferQueue 將數據劃分為多個 Partition,每個 Partition 都是一個獨立的隊列,每個 Partition 都有一個對應的消費者線程。
Producer 以輪詢的方式往每個 Partition 中寫入數據。
Consumer 最多不允許超過 Partition 的數量,Partition 按平均分配到組內每個 Customer 上。
當一個 Consumer 被分配了多個 Partition 時,以輪訓的方式進行消費。
每個 Partition 上會記錄不同消費組的消費進度,不同組之間的消費進度互不干擾。
對并發的支持
Producer 支持并發寫入。
Consumer 消費時是綁定 Partition 的,為保證能正確管理 Partition 的消費進度,Consumer 不支持并發消費。
如果要增加消費速度,需創建多個 Consumer。
Partition 的動態擴容
Partition 的基本組成單元是 Segment,Segment 代表保存數據的數組,多個 Segment 通過鏈表的形式組合成一個 Partition。
當一個 Segment 寫滿后,通過在其后面追加一個 Segment 實現擴容。
Segment 中用于保存數據的數組的每一個元素稱為 Slot,每個 Slot 都有一個Partition 內唯一的自增 Offset。
Segment 的回收機制
每次在 Partition 中新增 Segment 時,會從頭判斷此前的 Segment 是否已經被所有消費組消費完,回收最后一個消費完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。
Benchmark
測試環境:Apple M2 Max 64GB
寫入性能測試
與 BlockingCollection 對比并發,并發線程數為 CPU 邏輯核心數 12, partitionNumber 為 1 和 12。
測試結果
在并發寫入時,BufferQueue 的寫入性能明顯優于 BlockingCollection。
消費性能測試
pull 模式 consumer 與 BlockingCollection 對比并發讀取性能,并發線程數為 CPU 邏輯核心數 12,partitionNumber 為 12。
測試結果
在批量消費時,隨著批量大小的增加,BufferQueue 的消費性能優勢更加明顯。
轉自https://www.cnblogs.com/eventhorizon/p/18331018
該文章在 2024/8/5 9:51:53 編輯過