Hướng dẫn Xây dựng Hàng đợi Công việc Nền (Background Job Queue) trong Bộ Nhớ với ASP.NET Core từ Đầu

Trong các ứng dụng web, việc cung cấp API nhanh và phản hồi nhanh là rất quan trọng đối với trải nghiệm người dùng. Tuy nhiên, một số thao tác như gửi email, tạo báo cáo hoặc thêm người dùng hàng loạt có thể mất nhiều thời gian. Thực hiện các tác vụ này trong một yêu cầu có thể chặn các luồng, dẫn đến thời gian phản hồi chậm.

Core Components of Background Job System

IHostedService và BackgroundService là gì?

IHostedService là một interface được sử dụng trong ASP.NET Core để triển khai các tác vụ nền chạy dài được quản lý bởi vòng đời ứng dụng. Khi ứng dụng của bạn khởi động, nó bắt đầu tất cả các ứng dụng IHostedService đã đăng ký. Khi tắt, nó hướng dẫn chúng dừng lại.

Cấu trúc Producer/Consumer

  • Producer: Đây là phần của ứng dụng tạo ra các công việc và thêm chúng vào hàng đợi. Trong ví dụ của chúng tôi, producer sẽ là một API Controller.
  • Consumer: Đây là một dịch vụ nền liên tục theo dõi hàng đợi, lấy công việc từ hàng đợi và thực thi chúng. Ứng dụng BackgroundService của chúng ta sẽ là consumer.
  • Queue: Đây là cấu trúc dữ liệu giữa producer và consumer giữ các công việc đang chờ được xử lý.

Tại sao sử dụng System.Threading.Channels?

Được giới thiệu trong .NET Core 3.0, System.Threading.Channels cung cấp một cấu trúc dữ liệu đồng bộ hóa được thiết kế cho các kịch bản producer-consumer không đồng bộ.

Triển khai Background Job Queue

Tạo Project

Đầu tiên, tạo một project ASP.NET Core Web API mới bằng .NET CLI.


dotnet new sln -n BackgroundJobQueue<br>
dotnet new webapi -n BackgroundJobQueue.Api<br>
dotnet sln BackgroundJobQueue.sln add BackgroundJobQueue.Api/BackgroundJobQueue.Api.csproj

Định nghĩa Interface cho Job Queue

Tạo file IBackgroundTaskQueue.cs.


namespace BackgroundJobQueue;<br><br>
public interface IBackgroundTaskQueue<br>
{<br>
ValueTask EnqueueAsync(Func<CancellationToken, ValueTask> workItem);<br>
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);<br>
}

Triển khai Queue Service với Channels

Tạo file BackgroundTaskQueue.cs.


using System.Threading.Channels;<br><br>
namespace BackgroundJobQueue;<br><br>
public class BackgroundTaskQueue : IBackgroundTaskQueue<br>
{<br>
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;<br><br>
public BackgroundTaskQueue(int capacity)<br>
{<br>
var options = new BoundedChannelOptions(capacity)<br>
{<br>
FullMode = BoundedChannelFullMode.Wait<br>
};<br>
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);<br>
}<br><br>
public async ValueTask EnqueueAsync(Func<CancellationToken, ValueTask> workItem)<br>
{<br>
if (workItem is null)<br>
{<br>
throw new ArgumentNullException(nameof(workItem));<br>
}<br>
await _queue.Writer.WriteAsync(workItem);<br>
}<br><br>
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)<br>
{<br>
var workItem = await _queue.Reader.ReadAsync(cancellationToken);<br>
return workItem;<br>
}<br>
}

Tạo Consumer Service (QueuedHostedService)

Tạo file QueuedHostedService.cs.


namespace BackgroundJobQueue;<br><br>
public class QueuedHostedService : BackgroundService<br>
{<br>
private readonly IBackgroundTaskQueue _taskQueue;<br>
private readonly ILogger<QueuedHostedService> _logger;<br><br>
public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILogger<QueuedHostedService> logger)<br>
{<br>
_taskQueue = taskQueue;<br>
_logger = logger;<br>
}<br><br>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)<br>
{<br>
_logger.LogInformation("Queued Hosted Service is running.");<br><br>
while (!stoppingToken.IsCancellationRequested)<br>
{<br>
var workItem = await _taskQueue.DequeueAsync(stoppingToken);<br><br>
try<br>
{<br>
await workItem(stoppingToken);<br>
}<br>
catch (Exception ex)<br>
{<br>
_logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));<br>
}<br>
}<br><br>
_logger.LogInformation("Queued Hosted Service is stopping.");<br>
}<br>
}

Đăng ký Services trong Program.cs


using BackgroundJobQueue;<br><br>
var builder = WebApplication.CreateBuilder(args);<br><br>
// Register the background task queue as a Singleton<br>builder.Services.AddSingleton<IBackgroundTaskQueue>(_ => <br>{<br> if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))<br> {<br> queueCapacity = 100;<br> }<br> return new BackgroundTaskQueue(queueCapacity);<br>});<br><br>// Register the hosted service<br>builder.Services.AddHostedService<QueuedHostedService>();<br><br>var app = builder.Build();<br><br>app.Run();

Tạo Producer (API Controller)

Tạo JobsController.cs.


using Microsoft.AspNetCore.Mvc;<br><br>
namespace BackgroundJobQueue.Controllers;<br><br>
[ApiController]<br>
[Route("[controller]")]<br>
public class JobsController : ControllerBase<br>
{<br>
private readonly IBackgroundTaskQueue _queue;<br>
private readonly ILogger<JobsController> _logger;<br><br>
public JobsController(IBackgroundTaskQueue queue, ILogger<JobsController> logger)<br>
{<br>
_queue = queue;<br>
_logger = logger;<br>
}<br><br>
[HttpPost]<br>
public async Task<IActionResult> EnqueueJob()<br>
{<br>
await _queue.EnqueueAsync(async token =><br>
{<br>
var guid = Guid.NewGuid();<br>
_logger.LogInformation("Job {Guid} started.", guid);<br>
await Task.Delay(TimeSpan.FromSeconds(5), token);<br>
_logger.LogInformation("Job {Guid} finished.", guid);<br>
});<br><br>
return Ok("Job has been enqueued.");<br>
}<br>
}

Các Xem xét Quan trọng và Chủ đề Nâng cao

Tắt ứng dụng

CancellationToken được truyền cho ExecuteAsync rất quan trọng. Host kích hoạt token này khi bạn dừng ứng dụng.

Xử lý Lỗi

try-catch block ngăn một công việc bị lỗi làm crash toàn bộ consumer.

Giới hạn của In Memory Queue

Nhược điểm lớn nhất là hàng đợi trong bộ nhớ. Nếu quá trình ứng dụng khởi động lại hoặc crash vì bất kỳ lý do gì, mọi công việc đang chờ trong hàng đợi sẽ bị mất.

Khi nào nên sử dụng thư viện bên ngoài?

  • Hangfire: Khi bạn cần tính liên tục của công việc (lưu công việc vào database), tự động retry, bảng quản lý và công việc được lên lịch.
  • RabbitMQ: Khi xây dựng hệ thống phân tán (kiến trúc microservices).

Câu hỏi Thường gặp (FAQ)

Q1: Điều gì xảy ra với các công việc trong hàng đợi nếu ứng dụng của tôi khởi động lại hoặc bị crash?
A: Vì đây là hàng đợi trong bộ nhớ, mọi công việc chưa được xử lý sẽ bị mất khi ứng dụng đóng hoặc crash.

Q2: Kết quả này có hoạt động trong môi trường load balanced với nhiều máy chủ không?
A: Không, triển khai này chỉ cụ thể cho instance. Hàng đợi chỉ tồn tại trong bộ nhớ của máy chủ nhận yêu cầu API.

Q3: Làm thế nào để giám sát số lượng công việc hiện tại trong hàng đợi?
A: Tiêm IBackgroundTaskQueue vào một dịch vụ và ghi lại số lượng hàng đợi định kỳ.

Kết luận

Chúng ta đã xây dựng một hàng đợi công việc nền hiệu suất cao trong bộ nhớ với ASP.NET Core chỉ sử dụng các tính năng framework tích hợp sẵn. Sử dụng IHostedService và System.Threading.Channels, chúng ta đã tạo một triển khai hiệu quả của mô hình producer-consumer.

Tài liệu Tham khảo

Chỉ mục