Hướng dẫn Xây dựng Hàng đợi Công việc Nền Trong Bộ nhớ Trong ASP.NET Core từ Đầu

Trong các ứng dụng web, việc cung cấp API nhanh và phản hồi nhanh là rất quan trọng đối với trải nghiệm người dùng. Tuy nhiên, một số hoạt động như gửi email, tạo báo cáo hoặc thêm người dùng hàng loạt có thể mất nhiều thời gian. Thực hiện các tác vụ này trong quá trình xử lý yêu cầu có thể chặn các luồng, dẫn đến thời gian phản hồi chậm.

Để giải quyết vấn đề này một cách hiệu quả, việc chạy các tác vụ dài hạn trong một quy trình nền sẽ tốt hơn. Thay vì chờ đợi yêu cầu hoàn thành, chúng ta có thể đưa tác vụ vào hàng đợi, cung cấp phản hồi ngay lập tức cho người dùng và thực hiện công việc trong một quy trình nền riêng biệt.

Mặc dù có các thư viện như Hangfire hoặc message broker như RabbitMQ cho mục đích này, bạn không cần bất kỳ phụ thuộc bên ngoài nào. Trong bài đăng blog này, chúng ta sẽ xây dựng một hàng đợi công việc nền trong bộ nhớ từ đầu trong ASP.NET Core chỉ sử dụng các tính năng .NET tích hợp.

Các Thành phần Cốt lõi của Hệ thống Công việc Nền

Trước khi triển khai code, điều quan trọng là đầu tiên cần hiểu các phần cơ bản tạo nên hệ thống công việc nền và cách chúng hoạt động để xử lý các tác vụ đã đưa vào hàng đợi.

IHostedService và BackgroundService là gì?

IHostedService là một interface được sử dụng trong ASP.NET Core để triển khai các tác vụ nền chạy dài được quản lý bởi vòng đời ứng dụng. Khi ứng dụng của bạn khởi động, nó sẽ khởi động tất cả các ứng dụng IHostedService đã đăng ký. Khi tắt, nó sẽ hướng dẫn chúng dừng lại.

BackgroundService là một lớp trừu tượng cơ sở triển khai IHostedService. Nó cung cấp cho bạn một cách hiệu quả hơn để tạo một dịch vụ định thời bằng cách cung cấp một phương thức có thể ghi đè duy nhất ExecuteAsync(CancellationToken stoppingToken). Chúng ta sẽ sử dụng điều này làm cơ sở cho bộ xử lý công việc của mình.

Cấu trúc Producer/Consumer

Producer (Nhà sản xuất): Đây là phần ứng dụng của bạn tạo ra các công việc và thêm chúng vào hàng đợi. Trong ví dụ của chúng ta, producer sẽ là một API Controller.

Consumer (Người tiêu dùng): Đây là một dịch vụ nền liên tục giám sát hàng đợi, lấy công việc từ hàng đợi và thực thi chúng. Ứng dụng BackgroundService của chúng ta sẽ là consumer.

Queue (Hàng đợi): Đây là cấu trúc dữ liệu giữa producer và consumer chứa các công việc đang chờ được xử lý.

Tại sao nên sử dụng System.Threading.Channels?

Được giới thiệu trong .NET Core 3.0, System.Threading.Channels cung cấp một cấu trúc dữ liệu đồng bộ hóa được thiết kế cho các kịch bản producer, consumer không đồng bộ.

  • An toàn luồng: Xử lý tất cả các thao tác khóa và đồng bộ hóa phức tạp bên trong, làm cho nó có thể sử dụng đồng thời từ nhiều luồng.
  • Async Native: Được thiết kế để sử dụng với async/await, ngăn chặn việc chặn luồng.
  • Hiệu suất: Được tối ưu hóa cho tốc độ và phân bổ bộ nhớ thấp.

Triển khai Hàng đợi Công việc Nền

Bây giờ chúng ta đã hiểu kiến trúc, chúng ta có thể triển khai hàng đợi công việc nền.

Tạo Project

Đầu tiên, tạo một project ASP.NET Core Web API mới sử dụng .NET CLI.

dotnet new sln -n BackgroundJobQueue

dotnet new webapi -n BackgroundJobQueue.Api

dotnet sln BackgroundJobQueue.sln add BackgroundJobQueue.Api/BackgroundJobQueue.Api.csproj

Định nghĩa Interface cho Hàng đợi Công việc

Để dependency injection và khả năng kiểm thử, chúng ta sẽ bắt đầu bằng cách định nghĩa một interface cho hàng đợi của mình. Interface này sẽ định nghĩa các phương thức để thêm một công việc (EnqueueAsync) và loại bỏ một công việc (DequeueAsync).

Tạo một file mới có tên IBackgroundTaskQueue.cs.

namespace BackgroundJobQueue;

public interface IBackgroundTaskQueue
{
// Thêm một mục công việc vào hàng đợi
ValueTask EnqueueAsync(Func<CancellationToken, ValueTask> workItem);

// Xóa và trả về một mục công việc từ hàng đợi
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}

Triển khai Dịch vụ Hàng đợi với Channels

Bây giờ, hãy triển khai interface IBackgroundTaskQueue bằng cách sử dụng lớp System.Threading.Channels. Lớp này sẽ quản lý channel trong bộ nhớ.

Tạo một file mới có tên BackgroundTaskQueue.cs.

using System.Threading.Channels;

namespace BackgroundJobQueue;

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

public BackgroundTaskQueue(int capacity)
{
// BoundedChannelOptions xác định hành vi của channel
var options = new BoundedChannelOptions(capacity)
{
// FullMode.Wait yêu cầu writer chờ không đồng bộ nếu hàng đợi đầy
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}

public async ValueTask EnqueueAsync(Func<CancellationToken, ValueTask> workItem)
{
if (workItem is null)
{
throw new ArgumentNullException(nameof(workItem));
}

// Ghi một mục vào channel. Nếu channel đầy,
await _queue.Writer.WriteAsync(workItem);
}

public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
{
// Đọc một mục từ channel. Nếu channel trống,
var workItem = await _queue.Reader.ReadAsync(cancellationToken);

return workItem;
}
}

Tạo Dịch vụ Consumer (QueuedHostedService)

Đây là consumer của chúng ta. Nó là một BackgroundService liên tục lấy công việc từ hàng đợi và thực thi chúng.

Tạo một file mới có tên QueuedHostedService.cs.

namespace BackgroundJobQueue;

public class QueuedHostedService : BackgroundService
{
private readonly IBackgroundTaskQueue _taskQueue;
private readonly ILogger<QueuedHostedService> _logger;

public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILogger<QueuedHostedService> logger)
{
_taskQueue = taskQueue;
_logger = logger;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Queued Hosted Service đang chạy.");

while (!stoppingToken.IsCancellationRequested)
{
// Lấy một mục công việc từ hàng đợi
var workItem = await _taskQueue.DequeueAsync(stoppingToken);

try
{
// Thực thi mục công việc
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Lỗi xảy ra khi thực thi {WorkItem}.", nameof(workItem));
}
}

_logger.LogInformation("Queued Hosted Service đang dừng.");
}
}

Khối try-catch ở đây rất quan trọng. Nó đảm bảo rằng nếu một công việc thất bại với một ngoại lệ, nó sẽ không làm sập toàn bộ dịch vụ nền.

Đăng ký Dịch vụ trong Program.cs

Tiếp theo, chúng ta cần đăng ký hàng đợi và dịch vụ được lưu trú trong container dependency injection bên trong Program.cs.

using BackgroundJobQueue;

var builder = WebApplication.CreateBuilder(args);

//...

// Đăng ký hàng đợi công việc nền dưới dạng Singleton
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ =>
{
// Bạn có thể cấu hình dung lượng của hàng đợi ở đây
if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))
{
queueCapacity = 100;
}
return new BackgroundTaskQueue(queueCapacity);
});

// Đăng ký dịch vụ được lưu trú
builder.Services.AddHostedService<QueuedHostedService>();

var app = builder.Build();

//...

app.Run();

Chúng ta đăng ký IBackgroundTaskQueue dưới dạng Singleton vì chúng ta cần một instance hàng đợi duy nhất, được chia sẻ cho toàn bộ ứng dụng.

Tạo một Producer (API Controller)

Cuối cùng, hãy tạo một producer. Đây sẽ là một API controller đơn giản với một endpoint đưa một công việc nền mới vào hàng đợi.

Tạo một controller mới có tên JobsController.cs.

using Microsoft.AspNetCore.Mvc;

namespace BackgroundJobQueue.Controllers;

[ApiController]
[Route("[controller]")]
public class JobsController : ControllerBase
{
private readonly IBackgroundTaskQueue _queue;
private readonly ILogger<JobsController> _logger;

public JobsController(IBackgroundTaskQueue queue, ILogger<JobsController> logger)
{
_queue = queue;
_logger = logger;
}

[HttpPost]
public async Task<IActionResult> EnqueueJob()
{
// Đưa một công việc mô phỏng tác vụ chạy dài vào hàng đợi
await _queue.EnqueueAsync(async token =>
{
// Mô phỏng một tác vụ 5 giây
var guid = Guid.NewGuid();
_logger.LogInformation("Công việc {Guid} đã bắt đầu.", guid);
await Task.Delay(TimeSpan.FromSeconds(5), token);
_logger.LogInformation("Công việc {Guid} đã hoàn thành.", guid);
});

return Ok("Công việc đã được đưa vào hàng đợi.");
}
}

Bây giờ, khi bạn chạy ứng dụng của mình và gửi một yêu cầu POST đến /jobs, API sẽ phản hồi ngay lập tức với “Công việc đã được đưa vào hàng đợi.” trong khi tác vụ 5 giây chạy ở nền. Bạn sẽ thấy các tin nhắn log xuất hiện trong console của mình sau độ trễ.

Các Vấn đề Quan trọng và Chủ đề Nâng cao

Shutdown

CancellationToken được truyền đến ExecuteAsync rất quan trọng. Host kích hoạt token này khi bạn dừng ứng dụng của mình. Điều kiện vòng lặp while (!stoppingToken.IsCancellationRequested) và việc truyền token đến DequeueAsync khiến dịch vụ dừng lắng nghe các mục mới và thoát.

Xử lý Lỗi

Khối try-catch của chúng ta ngăn chặn một công việc thất bại duy nhất làm sập consumer. Đối với các kịch bản sản xuất, bạn có thể xem xét một chiến lược xử lý lỗi nâng cao hơn, chẳng hạn như cơ chế thử lại. Các thư viện như Polly có thể được tích hợp ở đây để tự động thử lại các công việc thất bại với các chính sách như fallback.

Giới hạn của Hàng đợi Trong Bộ nhớ

Nhược điểm lớn nhất của cách tiếp cận này là hàng đợi chỉ tồn tại trong bộ nhớ. Nếu quy trình ứng dụng của bạn khởi động lại hoặc sập vì bất kỳ lý do gì, bất kỳ công việc nào đang chờ trong hàng đợi sẽ bị mất. Giải pháp này tốt nhất cho các tác vụ không quan trọng hoặc idempotent.

Khi nào Bạn Nên Sử dụng Các Thư viện Bên ngoài?

Được xây dựng từ đầu, giải pháp này rất mạnh mẽ, nhưng điều quan trọng là phải biết khi nào cần chuyển sang các công cụ tiên tiến hơn.

  • Hangfire: Sử dụng Hangfire khi bạn cần độ bền công việc (lưu công việc vào cơ sở dữ liệu), tự động thử lại, bảng điều khiển quản lý, công việc được lên lịch và thực thi trì hoãn.
  • RabbitMQ: Khi xây dựng một hệ thống phân tán (chẳng hạn như kiến trúc microservice), bạn có thể sử dụng một message broker tùy chỉnh. Các broker này cung cấp các hàng đợi có thể mở rộng cách ly các dịch vụ của bạn và đảm bảo giao hàng tin nhắn.

Câu hỏi Thường gặp

Q1: Điều gì xảy ra với các công việc trong hàng đợi nếu ứng dụng của tôi khởi động lại hoặc sập?

A: Vì đây là hàng đợi trong bộ nhớ, bất kỳ công việc chưa xử lý nào sẽ bị mất khi ứng dụng đóng hoặc sập. Giải pháp này phù hợp nhất cho các tác vụ không quan trọng hoặc idempotent. Để đảm bảo tính liên tục kinh doanh, bạn nên sử dụng giải pháp lưu trữ công việc trong cơ sở dữ liệu hoặc message broker, chẳng hạn như Hangfire hoặc RabbitMQ.

Q2: Làm thế nào tôi có thể triển khai logic thử lại cho các công việc thất bại do ngoại lệ?

A: Khối try-catch trong QueuedHostedService ngăn chặn toàn bộ dịch vụ nền bị sập, nhưng nó không tự động thử lại công việc thất bại. Bạn có thể tích hợp một thư viện như Polly để thêm khả năng thử lại mạnh mẽ. Bạn có thể bọc lệnh gọi await workItem(stoppingToken); trong chính sách thử lại của Polly để tự động chạy lại công việc một số lần đã cấu hình với độ trễ.

Q3: Điều này có hoạt động trong môi trường cân bằng tải với nhiều instance máy chủ không?

A: Không, triển khai này chỉ dành riêng cho instance. Hàng đợi chỉ tồn tại trong bộ nhớ của instance máy chủ nhận được yêu cầu API. Nếu bạn triển khai ứng dụng của mình trên nhiều máy chủ, một công việc được đưa vào hàng đợi trên Máy chủ A sẽ chỉ được xử lý bởi Máy chủ A. Đối với một hàng đợi chia sẻ có thể được xử lý bởi bất kỳ máy chủ nào trong môi trường web, bạn nên sử dụng một message broker phân tán tập trung như RabbitMQ, Azure Service Bus.

Q4: Điều gì xảy ra nếu các công việc được thêm vào hàng đợi nhanh hơn dịch vụ nền có thể xử lý chúng?

A: Ứng dụng này sử dụng BoundedChannel với dung lượng cố định. Khi hàng đợi đầy, việc đặt FullMode = BoundedChannelFullMode.Wait khiến producer (API controller) chờ không đồng bộ cho đến khi có chỗ trống trong hàng đợi. Điều này tạo ra “backpressure” ngăn ứng dụng của bạn hết bộ nhớ.

Q5: Làm thế nào tôi có thể giám sát số lượng công việc hiện tại trong hàng đợi?

A: Để giám sát cơ bản, bạn có thể tiêm IBackgroundTaskQueue vào một dịch vụ và ghi lại số lượng hàng đợi định kỳ. Cách tiếp cận nâng cao hơn là tạo một endpoint metrics tùy chỉnh.

Kết luận

Chúng ta đã xây dựng một hàng đợi công việc nền trong bộ nhớ hiệu suất cao trong ASP.NET Core chỉ sử dụng các tính năng framework tích hợp. Đây là một cách để tăng khả năng phản hồi API của bạn và cải thiện trải nghiệm người dùng bằng cách vô hiệu hóa các tác vụ chạy dài.

Sử dụng IHostedServiceSystem.Threading.Channels, chúng ta đã tạo ra một triển khai hiệu quả của mô hình producer, consumer. Mặc dù cách tiếp cận trong bộ nhớ này có một số hạn chế, nhưng nó là một công cụ mạnh mẽ cho nhiều kịch bản phổ biến. Khi nhu cầu của bạn phát triển, bạn có thể xem xét các công cụ nhiều tính năng hơn như Hangfire hoặc RabbitMQ, biết rằng bạn đã nắm vững các nguyên tắc cơ bản.

Chỉ mục