Anton Martyniuk
Việc xây dựng các ứng dụng .NET đáng tin cậy, có khả năng mở rộng và hiệu suất cao thường phụ thuộc vào cách bạn xử lý concurrency và xử lý dữ liệu. C# Channels mang đến một cách tiếp cận hiện đại để xây dựng các pipeline an toàn, bất đồng bộ và có thông lượng cao trong .NET.
Channels cho phép bạn tạo các hàng đợi producer-consumer trong bộ nhớ tự động scale qua các workflow bất đồng bộ và dịch vụ nền. Tuy nhiên, một quyết định kiến trúc quan trọng là lựa chọn giữa các channel có giới hạn và không giới hạn.
Trong bài viết hôm nay, chúng ta sẽ khám phá:
- C# Channels là gì?
- Channel có giới hạn vs Channel không giới hạn
- Xử lý nền với Channels
- Sử dụng channels trong ứng dụng ASP.NET Core thực tế
- Các best practices và tips khi làm việc với Channels
Hãy cùng bắt đầu!
Mục lục
C# Channels là gì?
Khi xây dựng các ứng dụng .NET, bạn thường cần gửi dữ liệu từ một phần của code sang một phần khác.
Trong quá khứ, các nhà phát triển đã sử dụng các cấu trúc như Queue<T>, ConcurrentQueue<T>, hoặc BlockingCollection<T> cho mục đích này. Họ đóng gói các hàng đợi này vào các lớp và sử dụng chúng để quản lý luồng dữ liệu.
Tuy nhiên, các triển khai như vậy có một nhược điểm đáng kể: coupling chặt chẽ giữa các đoạn code.
C# Channels giải quyết vấn đề này. Chúng triển khai pattern producer-consumer. Một lớp tạo dữ liệu và lớp khác tiêu thụ nó, mà không cần biết về nhau.
C# Channels đến từ namespace System.Threading.Channels
. Channels làm cho việc gửi dữ liệu giữa producers và consumers trở nên đơn giản, giúp bạn tránh các vấn đề threading phổ biến.
Một channel có hai phần:
- Writer: đẩy dữ liệu vào Channel.
- Reader: kéo dữ liệu ra khỏi Channel.
Cả đọc và ghi đều có thể xảy ra trên các thread khác nhau, và Channels đảm bảo thread safety. Chúng cho phép bạn sử dụng code async ở mọi nơi, vì vậy ứng dụng của bạn có thể xử lý lượng lớn dữ liệu mà không block threads hoặc locking.
Dưới đây là một ví dụ cơ bản về sử dụng channels trong C#. Ở đây chúng ta tạo ra các số một cách bất đồng bộ và tiêu thụ chúng từ Channel:
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
var channel = Channel.CreateUnbounded<int>();
// Producer
_ = Task.Run(async () =>
{
for (var i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
await Task.Delay(100); // simulate work
}
channel.Writer.Complete();
});
// Consumer
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumed: {item}");
await Task.Delay(150); // simulate processing
}
Console.WriteLine("Processing complete.");
Dưới đây là cách nó hoạt động:
- Producer chạy trên một Thread riêng (Task), ghi dữ liệu vào Channel.
- Consumer đọc dữ liệu từ Channel và xử lý nó.
- Channel xử lý tất cả các kết nối và thread safety.
Channels phù hợp khi:
- Bạn cần kết nối producers và consumers sử dụng async/await.
- Bạn muốn tách biệt logic producing và consuming
- Bạn muốn kiểm soát luồng dữ liệu (ví dụ: làm chậm producers khi consumers không theo kịp).
- Bạn muốn tránh threading ở mức thấp, locking, hoặc synchronization thủ công.
Channels là lựa chọn lý tưởng cho streaming events và processing background tasks. Chúng là một lựa chọn trong bộ nhớ đơn giản thay thế cho message queue trong một ứng dụng duy nhất.
Channel có giới hạn vs Channel không giới hạn
C# channels có thể có 2 loại: bounded và unbounded. Cả hai đều cho phép bạn truyền dữ liệu từ producer đến consumer, nhưng chúng xử lý flow control và memory khác nhau.
Channel có giới hạn là gì?
Một channel có giới hạn có dung lượng tối đa cố định. Khi bạn tạo nó, bạn đặt giới hạn cho số lượng mục nó có thể chứa tại một thời điểm.
Nếu producer cố gắng thêm nhiều mục hơn sau khi Channel đã đầy, nó phải chờ cho đến khi có không gian.
Nên sử dụng channel có giới hạn khi nào?
- Khi bạn muốn giới hạn việc sử dụng memory và防止 overload.
- Khi consumer đôi khi chậm hơn producer.
- Khi bạn cần backpressure để tránh làm hệ thống bị ngập.
var channel = Channel.CreateBounded<int>(5);
// Producer
await channel.Writer.WriteAsync(1); // Adds item if there's space, waits if full
// Consumer
var item = await channel.Reader.ReadAsync(); // Takes an item
Channels có giới hạn là lựa chọn tốt cho background processing, job queues, và bất kỳ ứng dụng nào bạn cần kiểm soát việc sử dụng tài nguyên. Chúng giúp bảo vệ ứng dụng của bạn khỏi các đột ngột và runaway producers.
Channel không giới hạn là gì?
Một channel không giới hạn không có giới hạn cố định. Producer có thể tiếp tục thêm mục nhanh như họ muốn.
Channel sẽ phát triển để xử lý càng nhiều mục càng tốt, chỉ bị giới hạn bởi memory hệ thống có sẵn.
Nên sử dụng channel không giới hạn khi nào?
- Khi bạn chắc chắn producer sẽ không bao giờ vượt qua consumer trong thời gian dài.
- Các trường hợp đơn giản mà flow control không phải là vấn đề.
- Khi bạn chỉ mong đợi một lượng nhỏ hoặc ổn định các mục.
Dưới đây là cách tạo một channel không giới hạn:
var channel = Channel.CreateUnbounded<int>();
// Producer
await channel.Writer.WriteAsync(42); // Always accepts new items
// Consumer
var item = await channel.Reader.ReadAsync(); // Takes an item
Channels không giới hạn dễ sử dụng, nhưng chúng có thể rủi ro trong các tình huống high-load. Nếu producer ghi mục nhanh hơn consumer đọc chúng, bạn có thể hết memory.
Nên sử dụng loại nào?
Nếu bạn muốn an toàn và ổn định trong hệ thống, hãy bắt đầu với channels có giới hạn. Chúng bảo vệ ứng dụng của bạn nếu consumer bị tụt lại phía sau.
Nếu bạn biết producer luôn kiểm soát được và tốc độ dữ liệu thấp, bạn có thể sử dụng channels không giới hạn.
Trong hầu hết các dịch vụ .NET thực tế, channels có giới hạn là mặc định an toàn hơn.
Background Processor với Channel có giới hạn
Trong production, channels thường được sử dụng trong ASP.NET Core background services.
Một pattern phổ biến là thiết lập một background service đọc messages hoặc tasks từ một channel và xử lý chúng lần lượt. Điều này làm cho code của bạn dễ dàng scale và giữ cho main thread được tự do cho các công việc khác.
Hãy cùng khám phá một ví dụ về BackgroundService xử lý items từ một Channel:
builder.Services.AddSingleton(_ => Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
}));
public class MessageProcessor : BackgroundService
{
private readonly Channel<string> _channel;
private readonly ILogger<MessageProcessor> _logger;
public MessageProcessor(Channel<string> channel, ILogger<MessageProcessor> logger)
{
_channel = channel;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Message processor starting");
await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
_logger.LogInformation("Processing message: {Message}", message);
await Task.Delay(100, stoppingToken);
_logger.LogInformation("Message processed successfully: {Message}", message);
}
}
}
Để publish items vào Channel, bạn cần gọi phương thức WriteAsync
:
private Task AddSampleMessagesAsync(CancellationToken stoppingToken)
{
_ = Task.Run(async () =>
{
// Add 150 messages (more than channel capacity of 100)
for (var i = 1; i <= 150 && !stoppingToken.IsCancellationRequested; i++)
{
var message = $"Sample message #{i}";
// Wait until there's space in the Channel (this will block when the Channel is full)
await _channel.Writer.WriteAsync(message, stoppingToken);
_logger.LogInformation("Added message to channel: {Message}", message);
await Task.Delay(50, stoppingToken);
}
// Complete the Channel when done adding messages
_channel.Writer.Complete();
}, stoppingToken);
return Task.CompletedTask;
}
Lưu ý rằng chúng ta chạy vòng lặp foreach trên AsyncEnumerable
:
await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
// Process message
}
_channel.Reader.ReadAllAsync
chờ một message mới xuất hiện trong Channel. Khi hoàn tất việc publish messages, bạn có thể gọi _channel.Writer.Complete()
và vòng lặp sẽ kết thúc.
Channel có giới hạn với dung lượng 100, vì vậy nếu 100 messages đang chờ, producer sẽ tạm dừng cho đến khi có slot trống. Background service đọc messages nhanh như nó có thể xử lý chúng.
Nếu bạn cố gắng thêm messages quá nhanh, producer sẽ chậm lại, điều này giúp việc sử dụng memory của bạn được kiểm soát.
Khi tạo một channel có giới hạn, bạn có thể đặt BoundedChannelFullMode
để kiểm soát điều gì xảy ra khi Channel đầy:
- Wait: Writer chờ cho đến khi có không gian (phổ biến nhất, an toàn nhất).
- DropWrite: Các mục mới bị loại bỏ nếu Channel đầy.
- DropOldest: Mục cũ nhất bị loại bỏ để tạo không gian cho mục mới.
- DropNewest: Mục mới nhất bị loại bỏ thay vào đó.
Đối với hầu hết các background tasks, sử dụng Wait
. Khi mất một vài message là chấp nhận được, DropWrite
hoặc DropOldest
có thể là lựa chọn phù hợp. Bạn có thể sử dụng DropOldest
khi event mới nhất liên quan hơn các event cũ.
Ứng dụng thực tế với Channels
Trong một ứng dụng ASP.NET Core thực tế, bạn có thể sử dụng Channels để triển khai Write Back Caching Strategy.
Chiến lược caching này được sử dụng trong các tình huống high-speed, write-intensive.
Ý tưởng chính là dữ liệu được viết vào cache trước. Cache sau đó viết dữ liệu trở lại database một cách bất đồng bộ sau một điều kiện hoặc khoảng thời gian nhất định.
Hãy cùng khám phá một ví dụ thực tế: một ứng dụng Online Store.
p>Người dùng thường thêm hoặc loại bỏ các items từ giỏ hàng trực tuyến. Trạng thái cuối cùng chỉ thực sự quan trọng tại thời điểm thanh toán. Việc ghi có thể rất nhanh vì nó hit cache trước, và hệ thống có thể periodically flush updates đến database.
Điều này cho phép high write throughput trong các thời điểm mua sắm cao điểm, với database cuối cùng nhận được các chi tiết giỏ hàng cuối cùng.
Dưới đây là WebApi endpoint tạo ra một product cart:
public record ProductCartRequest(string UserId, List<ProductCartItemRequest> ProductCartItems);
public record ProductCartItemRequest(Guid ProductId, int Quantity);
[HttpPost]
public async Task<ActionResult<ProductCartResponse>> CreateCart(ProductCartRequest request)
{
var response = await _service.AddAsync(request);
return CreatedAtAction(nameof(GetCart), new { id = response.Id }, response);
}
Khi tạo một ProductCart
mới, nó được thêm vào cache ngay lập tức:
public class WriteBackCacheProductCartService
{
private readonly HybridCache _cache;
private readonly IProductCartRepository _repository;
private readonly Channel<ProductCartDispatchEvent> _channel;
public WriteBackCacheProductCartService(
HybridCache cache,
IProductCartRepository repository,
Channel<ProductCartDispatchEvent> channel)
{
_cache = cache;
_repository = repository;
_channel = channel;
}
public async Task<ProductCartResponse> AddAsync(ProductCartRequest request)
{
var productCart = new ProductCart
{
Id = Guid.NewGuid(),
UserId = request.UserId,
CartItems = request.ProductCartItems.Select(x => new CartItem
{
Id = Guid.NewGuid(),
Quantity = x.Quantity,
Price = Random.Shared.Next(100, 1000)
}).ToList()
};
var cacheKey = $"productCart:{productCart.Id}";
var productCartResponse = MapToProductCartResponse(productCart);
await _cache.SetAsync(cacheKey, productCartResponse);
await _channel.Writer.WriteAsync(new ProductCartDispatchEvent(productCart));
return productCartResponse;
}
}
Ở đây chúng ta sử dụng một Bounded Channel để publish ProductCartDispatchEvent
.
public record ProductCartDispatchEvent(ProductCart ProductCart);
builder.Services.AddSingleton(_ => Channel.CreateBounded<ProductCartDispatchEvent>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
}));
Một background đọc từ Channel và viết cart data đến database một cách bất đồng bộ:
public class WriteBackCacheBackgroundService(IServiceScopeFactory scopeFactory,
Channel<ProductCartDispatchEvent> channel) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var command in channel.Reader.ReadAllAsync(stoppingToken))
{
using var scope = scopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IProductCartRepository>();
var existingCart = await repository.GetByIdAsync(command.ProductCart.Id);
if (existingCart is null)
{
await repository.AddAsync(command.ProductCart);
return;
}
existingCart.CartItems = command.ProductCart.CartItems;
await repository.UpdateAsync(existingCart);
}
}
}
Lưu ý: Pattern này có thể tăng tốc độ ghi đáng kể, nhưng nó yêu cầu conflict resolution và failure handling mạnh mẽ để đảm bảo consistency. Đảm bảo trạng thái cart không bao giờ bị mất trong trường hợp cache failure, đòi hỏi các cơ chế replication hoặc backup mạnh mẽ.
Các Best Practices và Tips khi làm việc với Channels
Channels là một công cụ mạnh mẽ, nhưng để tận dụng tối đa chúng (và tránh các vấn đề), việc theo đuổi một vài best practices quan trọng là rất quan trọng. Dưới đây là một số tips thực tế khi sử dụng channels trong các ứng dụng .NET của bạn.
1. Ưu tiên Channels có giới hạn cho An toàn:
Hầu hết thời gian, bạn nên bắt đầu với một channel có giới hạn. Channel có giới hạn đặt ra giới hạn cho lượng dữ liệu có thể chờ được xử lý. Điều này làm cho ứng dụng của bạn ổn định hơn dưới tải nặng.
Đặt dung lượng thành một số hợp lý cho workload của bạn. Nếu bạn mong đợi các đột ngột dữ liệu, hãy chọn một giá trị bao gồm các burst điển hình nhưng không quá lớn.
Channels không giới hạn chỉ an toàn khi bạn biết tốc độ dữ liệu luôn thấp.
2. Nhớ Complete Channel:
Khi producer hoàn tất việc ghi, hãy gọi .Writer.Complete()
trên Channel. Điều này cho consumer biết sẽ không còn dữ liệu nữa, cho phép nó kết thúc.
Nếu bạn không complete Channel, vòng lặp ReadAllAsync của consumer sẽ không bao giờ kết thúc.
3. Sử dụng await với Write và Read Operations:
Channels được thiết kế cho code async. Luôn sử dụng await
khi viết hoặc đọc để tránh blocking threads và giữ ứng dụng của bạn responsive.
await Channel.Writer.WriteAsync(data, cancellationToken);
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
// process item
}
4. Xử lý Cancellation Properly:
Luôn truyền một CancellationToken
khi đọc hoặc ghi vào một channel. Điều này cho phép bạn dừng xử lý nếu ứng dụng của bạn đang shutting down hoặc nếu một user hủy một operation.
await channel.Writer.WriteAsync(message, cancellationToken);
5. Không Share Channels giữa quá nhiều Producers hoặc Consumers:
Mặc dù channels có thể xử lý nhiều writers và readers, nhưng có quá nhiều có thể khiến việc gỡ lỗi trở nên khó khăn. Trong hầu hết các trường hợp, hãy bám sát một producer và một consumer duy nhất để có hiệu suất tốt nhất và dễ dàng hiểu nhất.
Nếu bạn cần nhiều hơn, .NET channels hỗ trợ multiple readers và writers, nhưng bạn nên thiết kế cẩn thận để tránh các bất ngờ.
6. Monitor việc sử dụng Channel của bạn:
Theo dõi dung lượng Channel của bạn trong production. Nếu bạn thường thấy Channel đầy (producers chờ để ghi), điều đó có nghĩa là consumers của bạn quá chậm. Bạn có thể cần tăng tốc xử lý hoặc tăng kích thước channel.
7. Chọn đúng FullMode (cho Channels có giới hạn):
Khi tạo một channel có giới hạn, bạn có thể đặt BoundedChannelFullMode
để kiểm soát điều gì xảy ra khi Channel đầy.
Hy vọng bạn thấy newsletter này hữu ích. Hẹn gặp lại lần sau.
Bạn có thể tải source code cho newsletter này miễn phí
[Download source code](https://antondevtips.com/source-code/building-high-performance-dotnet-apps-with-csharp-channels)
Thích bài viết này? Chia sẻ với mạng lưới của bạn
[LinkedIn](https://www.linkedin.com/sharing/share-offsite/?url=https%3A%2F%2Fantondevtips.com%2Fblog%2Fbuilding-high-performance-dotnet-apps-with-csharp-channels&title=Building%20High-Performance%20.NET%20Apps%20With%20C%23%20Channels)[X](https://twitter.com/intent/tweet?text=Building%20High-Performance%20.NET%20Apps%20With%20C%23%20Channels&url=https%3A%2F%2Fantondevtips.com%2Fblog%2Fbuilding-high-performance-dotnet-apps-with-csharp-channels)[Facebook](https://www.facebook.com/sharer/sharer.php?u=https%3A%2F%2Fantondevtips.com%2Fblog%2Fbuilding-high-performance-dotnet-apps-with-csharp-channels)
Cải thiện kỹ năng **.NET** và Architecture của bạn
Tham gia cộng đồng của **12,000+** nhà phát triển và kiến trúc sư.
Mỗi tuần bạn sẽ nhận được 1 tip thực tế với best practices và ví dụ thực tế.
Học cách xây dựng phần mềm tốt hơn với source code có sẵn cho newsletter của tôi.
Subscribe
Email address Join 12,000+ Subscribers
Tham gia 12,000+ nhà phát triển đã đọc
No spam. Unsubscribe any time.
© 2025 Anton Martyniuk. All rights reserved.
Hãy kết nối
[Email](mailto:[email protected])[LinkedIn](https://www.linkedin.com/in/anton-martyniuk/)[Twitter](https://twitter.com/AntonMartyniuk)[GitHub](https://github.com/anton-martyniuk)[Patreon](https://patreon.com/AntonMartyniuk)