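Chapter Goals
Configure RabbitMQ services in ASP.NET Core and register them through dependency injection.
Use IHostedService/BackgroundService to implement long-running consumer services.
Implement the request-response pattern on top of RabbitMQ.
Build a complete asynchronous communication solution between microservices.
Set up configuration management and health checks.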
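Part 1: Theory
1. ASP.NET Core integration patterns
Integrating RabbitMQ into an ASP.NET Core application involves several key concerns:
Dependency injection: manage the lifetimes of connections and channels correctly.
Hosted services: run message consumers in the background.
Configuration management: read the RabbitMQ connection settings from configuration files.
Health checks: monitor the state of the RabbitMQ connection.
Logging: use the ASP.NET Core logging system.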
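2. Lifetime management
IConnection: register as a singleton, because opening a TCP connection is expensive.
IModel: register as transient or scoped, because channels are not thread-safe.
Producer services: can be registered as scoped or transient.
Consumer services: usually managed inside a hosted service.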
3. Hosted services
ASP.NET Core provides the IHostedService interface and the BackgroundService base class for long-running background tasks. They are a natural fit for RabbitMQ consumers.
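4. Messaging patterns in a microservice architecture
Asynchronous commands: send an instruction without expecting an immediate response.
Event notifications: broadcast state changes.
Request-response: similar to RPC, but carried over the message broker.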
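The request-response pattern usually relies on a correlation ID: the requester tags each message with a unique ID and a reply queue name, and matches incoming replies against the IDs it is still waiting for. The sketch below shows only that matching step, with no broker involved. `PendingRequests` and its members are hypothetical names introduced for illustration, and timeouts are left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks in-flight requests by correlation ID.
public sealed class PendingRequests
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending = new();

    // Called by the requester before publishing. The returned ID would go into
    // the message's CorrelationId property; the task completes when the reply arrives.
    public (string CorrelationId, Task<string> Reply) Register()
    {
        var id = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, tcs.Task);
    }

    // Called by the reply-queue consumer. Returns false when the ID is unknown
    // or the reply was already delivered.
    public bool TryComplete(string correlationId, string body)
        => _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(body);
}
```

In a real requester, the publish step would set `CorrelationId` and `ReplyTo` on the message properties, and the consumer on the reply queue would call `TryComplete` with the values it reads from each delivery.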
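Part 2: Hands-On: Building an Order-Processing Microservice
We will build a complete order-processing system consisting of:
Order.API: accepts order requests over HTTP and publishes messages
OrderProcessor.Service: a background service that processes orders
An order status query API
Health checks
Configuration management
Step 1: Create the project structure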
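# Create the solution
dotnet new sln -n OrderSystem
# Create the Web API project and the class libraries
dotnet new webapi -n Order.API
dotnet new classlib -n Order.Core
dotnet new classlib -n Order.Infrastructure
# OrderProcessor.Service has its own Program.cs and is started with `dotnet run`,
# so it is created from the worker template rather than as a class library
dotnet new worker -n OrderProcessor.Service
# Add the projects to the solution
dotnet sln add Order.API/Order.API.csproj
dotnet sln add Order.Core/Order.Core.csproj
dotnet sln add Order.Infrastructure/Order.Infrastructure.csproj
dotnet sln add OrderProcessor.Service/OrderProcessor.Service.csproj
# Add project references
dotnet add Order.API reference Order.Core
dotnet add Order.API reference Order.Infrastructure
dotnet add OrderProcessor.Service reference Order.Core
dotnet add OrderProcessor.Service reference Order.Infrastructure
dotnet add Order.Infrastructure reference Order.Core
# Add NuGet packages
# The code in this chapter uses the RabbitMQ.Client 6.x API (IModel, DispatchConsumersAsync);
# 7.x replaces IModel with IChannel, so the version is pinned here
cd Order.API
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet add package Microsoft.Extensions.Diagnostics.HealthChecks
# Provides the AddRabbitMQ() health check extension used in Program.cs;
# choose a release built against RabbitMQ.Client 6.x
dotnet add package AspNetCore.HealthChecks.Rabbitmq
cd ../Order.Infrastructure
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet add package Microsoft.Extensions.Configuration
# ILogger<T> is used by the infrastructure services
dotnet add package Microsoft.Extensions.Logging.Abstractions
cd ../OrderProcessor.Service
dotnet add package RabbitMQ.Client --version 6.8.1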
Step 2: Define the domain model (Order.Core)
Models/Order.cs
namespace Order.Core.Models
{
    public class Order
    {
        public string Id { get; set; } = Guid.NewGuid().ToString();
        public string CustomerId { get; set; }
        public string ProductId { get; set; }
        public int Quantity { get; set; }
        public decimal TotalAmount { get; set; }
        public OrderStatus Status { get; set; } = OrderStatus.Pending;
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
        public DateTime? ProcessedAt { get; set; }
    }

    public enum OrderStatus
    {
        Pending,
        Processing,
        Completed,
        Failed,
        Cancelled
    }
}
Messages/OrderMessage.cs
using Order.Core.Models; // OrderStatus lives in the Models namespace

namespace Order.Core.Messages
{
    public class OrderMessage
    {
        public string OrderId { get; set; }
        public string CustomerId { get; set; }
        public string ProductId { get; set; }
        public int Quantity { get; set; }
        public decimal TotalAmount { get; set; }
        public string Action { get; set; } // "create", "cancel"
    }

    public class OrderStatusMessage
    {
        public string OrderId { get; set; }
        public OrderStatus Status { get; set; }
        public string Message { get; set; }
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;
    }
}
Step 3: Infrastructure layer (Order.Infrastructure)
Services/IRabbitMQConnection.cs
using RabbitMQ.Client;

namespace Order.Infrastructure.Services
{
    public interface IRabbitMQConnection : IDisposable
    {
        bool IsConnected { get; }
        IModel CreateModel();
        bool TryConnect();
    }
}
Services/RabbitMQConnection.cs
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Order.Infrastructure.Services
{
    public class RabbitMQConnection : IRabbitMQConnection
    {
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<RabbitMQConnection> _logger;
        private IConnection _connection;
        private bool _disposed;
        private readonly object _syncRoot = new object();

        public RabbitMQConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQConnection> logger)
        {
            _connectionFactory = connectionFactory;
            _logger = logger;
        }

        public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;

        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }
            return _connection.CreateModel();
        }

        public bool TryConnect()
        {
            // The lock keeps concurrent callers from opening more than one connection.
            lock (_syncRoot)
            {
                if (IsConnected) return true;
                _logger.LogInformation("RabbitMQ Client is trying to connect");
                try
                {
                    _connection = _connectionFactory.CreateConnection();
                    _connection.ConnectionShutdown += OnConnectionShutdown;
                    _connection.CallbackException += OnCallbackException;
                    _connection.ConnectionBlocked += OnConnectionBlocked;
                    // IConnectionFactory does not expose HostName, so read it from the connection endpoint.
                    _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events",
                        _connection.Endpoint.HostName);
                    return true;
                }
                catch (BrokerUnreachableException ex)
                {
                    _logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
                    return false;
                }
                catch (SocketException ex)
                {
                    _logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
                    return false;
                }
            }
        }

        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;
            _logger.LogWarning("A RabbitMQ connection is blocked. Reason: {Reason}", e.Reason);
            // Reconnect logic can be implemented here
            TryConnect();
        }

        private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            if (_disposed) return;
            _logger.LogWarning(e.Exception, "A RabbitMQ connection threw an exception. Trying to re-connect...");
            // Reconnect logic can be implemented here
            TryConnect();
        }

        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {
            if (_disposed) return;
            _logger.LogWarning("A RabbitMQ connection is shutting down. Trying to re-connect...");
            // Reconnect logic can be implemented here
            TryConnect();
        }

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            try
            {
                _connection?.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex, "Error disposing RabbitMQ connection");
            }
        }
    }
}
Services/IOrderPublisher.cs
using Order.Core.Messages;

namespace Order.Infrastructure.Services
{
    public interface IOrderPublisher
    {
        Task PublishOrderCreatedAsync(OrderMessage order);
        Task PublishOrderStatusAsync(OrderStatusMessage status);
    }
}
Services/OrderPublisher.cs
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Order.Core.Messages;
using RabbitMQ.Client;

namespace Order.Infrastructure.Services
{
    public class OrderPublisher : IOrderPublisher
    {
        private readonly IRabbitMQConnection _connection;
        private readonly ILogger<OrderPublisher> _logger;
        private const string ExchangeName = "order.events";
        private const string OrderCreatedRoutingKey = "order.created";
        private const string OrderStatusRoutingKey = "order.status";

        public OrderPublisher(IRabbitMQConnection connection, ILogger<OrderPublisher> logger)
        {
            _connection = connection;
            _logger = logger;
            // Make sure the exchange and queues exist
            InitializeInfrastructure();
        }

        private void InitializeInfrastructure()
        {
            // CreateModel() throws when no connection is open, so connect first.
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }
            using var channel = _connection.CreateModel();
            // Declare the topic exchange
            channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: true);
            // Declare the order-created queue
            channel.QueueDeclare("order.created.queue", durable: true, exclusive: false, autoDelete: false);
            channel.QueueBind("order.created.queue", ExchangeName, OrderCreatedRoutingKey);
            // Declare the order-status queue
            channel.QueueDeclare("order.status.queue", durable: true, exclusive: false, autoDelete: false);
            channel.QueueBind("order.status.queue", ExchangeName, OrderStatusRoutingKey);
            _logger.LogInformation("RabbitMQ infrastructure initialized");
        }

        public async Task PublishOrderCreatedAsync(OrderMessage order)
        {
            await PublishMessageAsync(order, OrderCreatedRoutingKey, "OrderCreated");
        }

        public async Task PublishOrderStatusAsync(OrderStatusMessage status)
        {
            await PublishMessageAsync(status, OrderStatusRoutingKey, "OrderStatus");
        }

        private async Task PublishMessageAsync<T>(T message, string routingKey, string messageType)
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }
            // Channels are not thread-safe, so each publish uses its own short-lived channel.
            using var channel = _connection.CreateModel();
            var json = JsonSerializer.Serialize(message);
            var body = Encoding.UTF8.GetBytes(json);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true; // survive a broker restart (the queues are durable)
            properties.ContentType = "application/json";
            properties.Type = messageType;
            try
            {
                channel.BasicPublish(
                    exchange: ExchangeName,
                    routingKey: routingKey,
                    mandatory: true,
                    basicProperties: properties,
                    body: body);
                _logger.LogInformation("Published {MessageType} message for Order {OrderId}",
                    messageType, GetOrderId(message));
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error publishing {MessageType} message for Order {OrderId}",
                    messageType, GetOrderId(message));
                throw;
            }
            // BasicPublish is synchronous in RabbitMQ.Client 6.x; this only keeps the async signature.
            await Task.CompletedTask;
        }

        private static string GetOrderId<T>(T message)
        {
            return message switch
            {
                OrderMessage order => order.OrderId,
                OrderStatusMessage status => status.OrderId,
                _ => "unknown"
            };
        }
    }
}
Step 4: Configure the Order.API project
appsettings.json
{
  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "myuser",
    "Password": "mypassword",
    "Port": 5672,
    "VirtualHost": "/"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*"
}
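As an alternative to reading each key by string, the "RabbitMQ" section can be bound to a typed class. The following is a minimal sketch under the assumption that the configuration binder (Microsoft.Extensions.Configuration.Binder, already available in ASP.NET Core web projects) is referenced. `RabbitMQOptions` and `RabbitMQOptionsLoader` are hypothetical names, and the default values are placeholders rather than values from this chapter.

```csharp
using Microsoft.Extensions.Configuration;

// Hypothetical options class mirroring the "RabbitMQ" section of appsettings.json.
public sealed class RabbitMQOptions
{
    public string HostName { get; set; } = "localhost";
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public int Port { get; set; } = 5672;
    public string VirtualHost { get; set; } = "/";
}

public static class RabbitMQOptionsLoader
{
    // Binds the "RabbitMQ" section; falls back to the defaults above when
    // the section is missing entirely.
    public static RabbitMQOptions Load(IConfiguration configuration)
        => configuration.GetSection("RabbitMQ").Get<RabbitMQOptions>() ?? new RabbitMQOptions();
}
```

In Program.cs this could be called as `RabbitMQOptionsLoader.Load(builder.Configuration)` before the ConnectionFactory is constructed, which moves the port parsing and default handling out of the registration lambda.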
Program.cs
using Order.API.Controllers;
using Order.API.Services;
using Order.Core.Models;
using Order.Infrastructure.Services;
using RabbitMQ.Client;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Configure RabbitMQ
builder.Services.AddSingleton<IConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new ConnectionFactory
    {
        HostName = configuration["RabbitMQ:HostName"],
        UserName = configuration["RabbitMQ:UserName"],
        Password = configuration["RabbitMQ:Password"],
        Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
        VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
        // Required for AsyncEventingBasicConsumer in RabbitMQ.Client 6.x
        DispatchConsumersAsync = true
    };
});

// Register RabbitMQ services
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();
builder.Services.AddScoped<IOrderPublisher, OrderPublisher>();
builder.Services.AddScoped<IOrderService, OrderService>();

// Add Health Checks
// AddRabbitMQ() comes from the AspNetCore.HealthChecks.Rabbitmq package.
// Note that this delegate opens a connection through the factory each time it runs.
builder.Services.AddHealthChecks()
    .AddRabbitMQ(provider =>
    {
        var factory = provider.GetRequiredService<IConnectionFactory>();
        return factory.CreateConnection();
    }, name: "rabbitmq");

// Add hosted service for status updates consumer
builder.Services.AddHostedService<OrderStatusConsumerService>();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();

// Add health check endpoint
app.MapHealthChecks("/health");

app.Run();
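A health check can also report on a connection that already exists instead of opening one per probe. The sketch below is one possible shape, built only on the IHealthCheck interface from Microsoft.Extensions.Diagnostics.HealthChecks. `RabbitMQConnectionHealthCheck` is a hypothetical class, and the `Func<bool>` it receives is assumed to wrap something like `IRabbitMQConnection.IsConnected`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

// Hypothetical health check that reports the state of a shared connection.
public sealed class RabbitMQConnectionHealthCheck : IHealthCheck
{
    private readonly Func<bool> _isConnected;

    // isConnected is an assumption: a delegate that returns the current
    // connection state, for example () => connection.IsConnected.
    public RabbitMQConnectionHealthCheck(Func<bool> isConnected) => _isConnected = isConnected;

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        var result = _isConnected()
            ? HealthCheckResult.Healthy("RabbitMQ connection is open")
            : HealthCheckResult.Unhealthy("RabbitMQ connection is closed");
        return Task.FromResult(result);
    }
}
```

One way to register it would be `builder.Services.AddHealthChecks().Add(new HealthCheckRegistration("rabbitmq", sp => new RabbitMQConnectionHealthCheck(() => sp.GetRequiredService<IRabbitMQConnection>().IsConnected), failureStatus: null, tags: null));`. The trade-off is that this reports what the application currently believes about its connection rather than actively probing the broker.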
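Services/IOrderService.cs
using Order.Core.Models;
// Inside the Order.* namespaces the bare name "Order" resolves to the root namespace,
// not the model class, so the class is referenced through an alias.
using OrderModel = Order.Core.Models.Order;

namespace Order.API.Services
{
    public interface IOrderService
    {
        Task<OrderModel> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice);
        Task<OrderModel?> GetOrderAsync(string orderId);
        Task UpdateOrderStatusAsync(string orderId, OrderStatus status);
    }
}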
Services/OrderService.cs
Services/OrderStatusConsumerService.cs
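A consumer built on BackgroundService typically opens a channel in ExecuteAsync, attaches an event-driven consumer, and acknowledges each message after handling it. The sketch below illustrates that shape against the RabbitMQ.Client 6.x API. It is not the chapter's OrderStatusConsumerService: `StatusConsumerSketch` is a hypothetical class, it takes a plain IConnection rather than IRabbitMQConnection to stay self-contained, and the prefetch count of 10 is an arbitrary choice. It assumes the ConnectionFactory has DispatchConsumersAsync = true, as configured in Program.cs.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical sketch of a long-running consumer for order status updates.
public sealed class StatusConsumerSketch : BackgroundService
{
    private const string QueueName = "order.status.queue"; // queue name used by OrderPublisher
    private readonly IConnection _connection;
    private readonly ILogger<StatusConsumerSketch> _logger;
    private IModel? _channel;

    public StatusConsumerSketch(IConnection connection, ILogger<StatusConsumerSketch> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    // Decodes a UTF-8 message body into its JSON text.
    public static string Decode(ReadOnlyMemory<byte> body) => Encoding.UTF8.GetString(body.Span);

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // One dedicated channel for this consumer; channels are not thread-safe.
        var channel = _connection.CreateModel();
        _channel = channel;
        channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
        // Limit the number of unacknowledged messages delivered at once.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += (_, ea) =>
        {
            try
            {
                _logger.LogInformation("Status update received: {Json}", Decode(ea.Body));
                // Acknowledge only after the message has been handled.
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to handle status update");
                // Reject without requeue so a poison message is not redelivered forever.
                channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
            }
            return Task.CompletedTask;
        };

        channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        // Deliveries arrive through the event handler, so there is no loop to run here.
        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _channel?.Dispose();
        base.Dispose();
    }
}
```

Manual acknowledgements (autoAck: false) mean a message stays on the queue if the process dies before BasicAck is sent, which is what allows processing to resume after a restart.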
Controllers/OrdersController.cs
Step 5: Order processor service (OrderProcessor.Service)
Program.cs
Services/OrderProcessorService.cs
Step 6: Run and test
Start the services
# Terminal 1: start Order.API
cd Order.API
dotnet run
# Terminal 2: start OrderProcessor.Service
cd OrderProcessor.Service
dotnet run
Test the API
# Create an order
curl -X POST "https://localhost:7000/api/orders" \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "customer-123",
    "productId": "product-456",
    "quantity": 2,
    "unitPrice": 29.99
  }'
# Query the order status
curl "https://localhost:7000/api/orders/{orderId}"
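Test the health check
GET https://localhost:7000/health
Watch the log output
Order.API: receives the HTTP request and publishes the order-created message
OrderProcessor.Service: consumes the order message, runs the business logic, and publishes a status update
Order.API: consumes the status update message
Test failure scenarios
Stop the RabbitMQ service and watch the reconnect behavior
Stop OrderProcessor.Service and watch messages accumulate in the queue
Restart the service and watch message processing resume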
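Step 7: Advanced features: configuring retries and resilience
Add Polly support to Order.Infrastructure; the HTTP client policy below is registered in Program.cs, so Order.API needs the Microsoft.Extensions.Http.Polly package as well:
// Add the NuGet packages
dotnet add package Polly
dotnet add package Microsoft.Extensions.Http.Polly
// Add a retry policy in Program.cs. It applies to outbound HTTP calls made through
// the "retry-client" HttpClient: 3 retries with exponential backoff (2s, 4s, 8s)
builder.Services.AddHttpClient("retry-client")
    .AddTransientHttpErrorPolicy(policy =>
        policy.WaitAndRetryAsync(3, retryAttempt =>
            TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))));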
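The same Polly package can also wrap the RabbitMQ connection attempt. Since `TryConnect()` in this chapter reports failure by returning false rather than throwing, a result-based policy is one way to retry it. The following is a rough sketch using Polly's classic (v7-style) synchronous API. `ConnectRetry` and its parameters are hypothetical names, and the caller chooses the retry count and backoff.

```csharp
using System;
using Polly;
using Polly.Retry;

// Hypothetical helper: retries a connect attempt until it reports success
// or the retry budget is used up.
public static class ConnectRetry
{
    // tryConnect: returns true once connected (for example connection.TryConnect).
    // backoff:    maps the retry attempt number (1, 2, 3, ...) to a wait time.
    public static bool TryConnectWithRetry(Func<bool> tryConnect, int retryCount, Func<int, TimeSpan> backoff)
    {
        RetryPolicy<bool> policy = Policy
            .HandleResult<bool>(connected => !connected) // a false result counts as a failure
            .WaitAndRetry(retryCount, backoff);

        // Returns the result of the last attempt: true on success,
        // false when every attempt failed.
        return policy.Execute(tryConnect);
    }
}
```

A call such as `ConnectRetry.TryConnectWithRetry(connection.TryConnect, 5, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))` would make up to six attempts in total, waiting 2, 4, 8, 16, and 32 seconds between them.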