news 2026/1/31 20:15:50

【RabbitMQ】与ASP.NET Core集成

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【RabbitMQ】与ASP.NET Core集成

本章目标

掌握在ASP.NET Core中配置和依赖注入RabbitMQ服务。

学习使用IHostedService/BackgroundService实现常驻消费者服务。

实现基于RabbitMQ的请求-响应模式。

构建完整的微服务间异步通信解决方案。

学习配置管理和健康检查。

一、理论部分

1. ASP.NET Core集成模式

将RabbitMQ集成到ASP.NET Core应用程序时,我们需要考虑几个关键方面:

依赖注入:正确管理连接和通道的生命周期。

托管服务:实现后台消息消费者。

配置管理:从配置文件读取RabbitMQ连接设置。

健康检查:监控RabbitMQ连接状态。

日志记录:使用ASP.NET Core的日志系统。

2. 生命周期管理

IConnection:建议注册为单例,因为创建TCP连接开销大。

IModel:建议注册为瞬态或作用域,因为通道不是线程安全的。

生产者服务:可以注册为作用域或瞬态。

消费者服务:通常在托管服务中管理。

3. 托管服务(Hosted Services)

ASP.NET Core提供了IHostedService接口和BackgroundService基类,用于实现长时间运行的后台任务。这是实现RabbitMQ消费者的理想方式。

4. 微服务架构中的消息模式

异步命令:发送指令但不期待立即响应。

事件通知:广播状态变化。

请求-响应:类似RPC,但通过消息中间件。

二、实操部分:构建订单处理微服务

我们将创建一个完整的订单处理系统,包含:

Order.API:接收HTTP订单请求,发布消息

OrderProcessor.BackgroundService:后台处理订单

订单状态查询API

健康检查

配置管理

第1步:创建项目结构

复制代码

# 创建解决方案

dotnet new sln -n OrderSystem

# 创建Web API项目

dotnet new webapi -n Order.API

dotnet new classlib -n Order.Core

dotnet new classlib -n Order.Infrastructure

dotnet new classlib -n OrderProcessor.Service

# 添加到解决方案

dotnet sln add Order.API/Order.API.csproj

dotnet sln add Order.Core/Order.Core.csproj

dotnet sln add Order.Infrastructure/Order.Infrastructure.csproj

dotnet sln add OrderProcessor.Service/OrderProcessor.Service.csproj

# 添加项目引用

dotnet add Order.API reference Order.Core

dotnet add Order.API reference Order.Infrastructure

dotnet add OrderProcessor.Service reference Order.Core

dotnet add OrderProcessor.Service reference Order.Infrastructure

dotnet add Order.Infrastructure reference Order.Core

# 添加NuGet包

cd Order.API

dotnet add package RabbitMQ.Client

dotnet add package Microsoft.Extensions.Diagnostics.HealthChecks

cd ../Order.Infrastructure

dotnet add package RabbitMQ.Client

dotnet add package Microsoft.Extensions.Configuration

cd ../OrderProcessor.Service

dotnet add package RabbitMQ.Client

复制代码

第2步:定义领域模型(Order.Core)

Models/Order.cs

复制代码

namespace Order.Core.Models

{

public class Order

{

public string Id { get; set; } = Guid.NewGuid().ToString();

public string CustomerId { get; set; }

public string ProductId { get; set; }

public int Quantity { get; set; }

public decimal TotalAmount { get; set; }

public OrderStatus Status { get; set; } = OrderStatus.Pending;

public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

public DateTime? ProcessedAt { get; set; }

}

public enum OrderStatus

{

Pending,

Processing,

Completed,

Failed,

Cancelled

}

}

复制代码

Messages/OrderMessage.cs

复制代码

namespace Order.Core.Messages

{

public class OrderMessage

{

public string OrderId { get; set; }

public string CustomerId { get; set; }

public string ProductId { get; set; }

public int Quantity { get; set; }

public decimal TotalAmount { get; set; }

public string Action { get; set; } // "create", "cancel"

}

public class OrderStatusMessage

{

public string OrderId { get; set; }

public OrderStatus Status { get; set; }

public string Message { get; set; }

public DateTime Timestamp { get; set; } = DateTime.UtcNow;

}

}

复制代码

第3步:基础设施层(Order.Infrastructure)

Services/IRabbitMQConnection.cs

复制代码

using RabbitMQ.Client;

namespace Order.Infrastructure.Services

{

public interface IRabbitMQConnection : IDisposable

{

bool IsConnected { get; }

IModel CreateModel();

bool TryConnect();

}

}

复制代码

Services/RabbitMQConnection.cs

复制代码

using System.Net.Sockets;

using Microsoft.Extensions.Logging;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using RabbitMQ.Client.Exceptions;

namespace Order.Infrastructure.Services

{

public class RabbitMQConnection : IRabbitMQConnection

{

private readonly IConnectionFactory _connectionFactory;

private readonly ILogger<RabbitMQConnection> _logger;

private IConnection _connection;

private bool _disposed;

private readonly object _syncRoot = new object();

public RabbitMQConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQConnection> logger)

{

_connectionFactory = connectionFactory;

_logger = logger;

}

public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;

public IModel CreateModel()

{

if (!IsConnected)

{

throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");

}

return _connection.CreateModel();

}

public bool TryConnect()

{

lock (_syncRoot)

{

if (IsConnected) return true;

_logger.LogInformation("RabbitMQ Client is trying to connect");

try

{

_connection = _connectionFactory.CreateConnection();

_connection.ConnectionShutdown += OnConnectionShutdown;

_connection.CallbackException += OnCallbackException;

_connection.ConnectionBlocked += OnConnectionBlocked;

_logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events",

_connectionFactory.HostName);

return true;

}

catch (BrokerUnreachableException ex)

{

_logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);

return false;

}

catch (SocketException ex)

{

_logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);

return false;

}

}

}

private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)

{

if (_disposed) return;

_logger.LogWarning("A RabbitMQ connection is blocked. Reason: {Reason}", e.Reason);

// 这里可以实现重连逻辑

TryConnect();

}

private void OnCallbackException(object sender, CallbackExceptionEventArgs e)

{

if (_disposed) return;

_logger.LogWarning(e.Exception, "A RabbitMQ connection throw exception. Trying to re-connect...");

// 这里可以实现重连逻辑

TryConnect();

}

private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)

{

if (_disposed) return;

_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");

// 这里可以实现重连逻辑

TryConnect();

}

public void Dispose()

{

if (_disposed) return;

_disposed = true;

try

{

_connection?.Dispose();

}

catch (IOException ex)

{

_logger.LogCritical(ex, "Error disposing RabbitMQ connection");

}

}

}

}

复制代码

Services/IOrderPublisher.cs

复制代码

using Order.Core.Messages;

namespace Order.Infrastructure.Services

{

public interface IOrderPublisher

{

Task PublishOrderCreatedAsync(OrderMessage order);

Task PublishOrderStatusAsync(OrderStatusMessage status);

}

}

复制代码

Services/OrderPublisher.cs

复制代码

using System.Text;

using System.Text.Json;

using Microsoft.Extensions.Logging;

using Order.Core.Messages;

using RabbitMQ.Client;

namespace Order.Infrastructure.Services

{

public class OrderPublisher : IOrderPublisher

{

private readonly IRabbitMQConnection _connection;

private readonly ILogger<OrderPublisher> _logger;

private const string ExchangeName = "order.events";

private const string OrderCreatedRoutingKey = "order.created";

private const string OrderStatusRoutingKey = "order.status";

public OrderPublisher(IRabbitMQConnection connection, ILogger<OrderPublisher> logger)

{

_connection = connection;

_logger = logger;

// 确保交换机和队列存在

InitializeInfrastructure();

}

private void InitializeInfrastructure()

{

using var channel = _connection.CreateModel();

// 声明主题交换机

channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: true);

// 声明订单创建队列

channel.QueueDeclare("order.created.queue", durable: true, exclusive: false, autoDelete: false);

channel.QueueBind("order.created.queue", ExchangeName, OrderCreatedRoutingKey);

// 声明订单状态队列

channel.QueueDeclare("order.status.queue", durable: true, exclusive: false, autoDelete: false);

channel.QueueBind("order.status.queue", ExchangeName, OrderStatusRoutingKey);

_logger.LogInformation("RabbitMQ infrastructure initialized");

}

public async Task PublishOrderCreatedAsync(OrderMessage order)

{

await PublishMessageAsync(order, OrderCreatedRoutingKey, "OrderCreated");

}

public async Task PublishOrderStatusAsync(OrderStatusMessage status)

{

await PublishMessageAsync(status, OrderStatusRoutingKey, "OrderStatus");

}

private async Task PublishMessageAsync<T>(T message, string routingKey, string messageType)

{

if (!_connection.IsConnected)

{

_connection.TryConnect();

}

using var channel = _connection.CreateModel();

var json = JsonSerializer.Serialize(message);

var body = Encoding.UTF8.GetBytes(json);

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

properties.ContentType = "application/json";

properties.Type = messageType;

try

{

channel.BasicPublish(

exchange: ExchangeName,

routingKey: routingKey,

mandatory: true,

basicProperties: properties,

body: body);

_logger.LogInformation("Published {MessageType} message for Order {OrderId}",

messageType, GetOrderId(message));

}

catch (Exception ex)

{

_logger.LogError(ex, "Error publishing {MessageType} message for Order {OrderId}",

messageType, GetOrderId(message));

throw;

}

await Task.CompletedTask;

}

private static string GetOrderId<T>(T message)

{

return message switch

{

OrderMessage order => order.OrderId,

OrderStatusMessage status => status.OrderId,

_ => "unknown"

};

}

}

}

复制代码

第4步:Order.API项目配置

appsettings.json

复制代码

{

"RabbitMQ": {

"HostName": "localhost",

"UserName": "myuser",

"Password": "mypassword",

"Port": 5672,

"VirtualHost": "/"

},

"Logging": {

"LogLevel": {

"Default": "Information",

"Microsoft.AspNetCore": "Warning"

}

},

"AllowedHosts": "*"

}

复制代码

Program.cs

复制代码

using Order.API.Controllers;

using Order.API.Services;

using Order.Core.Models;

using Order.Infrastructure.Services;

using RabbitMQ.Client;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();

builder.Services.AddEndpointsApiExplorer();

builder.Services.AddSwaggerGen();

// Configure RabbitMQ

builder.Services.AddSingleton<IConnectionFactory>(sp =>

{

var configuration = sp.GetRequiredService<IConfiguration>();

return new ConnectionFactory

{

HostName = configuration["RabbitMQ:HostName"],

UserName = configuration["RabbitMQ:UserName"],

Password = configuration["RabbitMQ:Password"],

Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),

VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",

DispatchConsumersAsync = true

};

});

// Register RabbitMQ services

builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();

builder.Services.AddScoped<IOrderPublisher, OrderPublisher>();

builder.Services.AddScoped<IOrderService, OrderService>();

// Add Health Checks

builder.Services.AddHealthChecks()

.AddRabbitMQ(provider =>

{

var factory = provider.GetRequiredService<IConnectionFactory>();

return factory.CreateConnection();

}, name: "rabbitmq");

// Add hosted service for status updates consumer

builder.Services.AddHostedService<OrderStatusConsumerService>();

var app = builder.Build();

// Configure the HTTP request pipeline.

if (app.Environment.IsDevelopment())

{

app.UseSwagger();

app.UseSwaggerUI();

}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

// Add health check endpoint

app.MapHealthChecks("/health");

app.Run();

复制代码

Services/IOrderService.cs

复制代码

using Order.Core.Models;

namespace Order.API.Services

{

public interface IOrderService

{

Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice);

Task<Order?> GetOrderAsync(string orderId);

Task UpdateOrderStatusAsync(string orderId, OrderStatus status);

}

}

复制代码

Services/OrderService.cs

View Code

Services/OrderStatusConsumerService.cs

View Code

Controllers/OrdersController.cs

View Code

第5步:订单处理器服务(OrderProcessor.Service)

Program.cs

View Code

Services/OrderProcessorService.cs

View Code

第6步:运行与测试

启动服务

复制代码

# 终端1:启动Order.API

cd Order.API

dotnet run

# 终端2:启动OrderProcessor.Service

cd OrderProcessor.Service

dotnet run

复制代码

测试API

复制代码

# 创建订单

curl -X POST "https://localhost:7000/api/orders" \

-H "Content-Type: application/json" \

-d '{

"customerId": "customer-123",

"productId": "product-456",

"quantity": 2,

"unitPrice": 29.99

}'

# 查询订单状态

curl "https://localhost:7000/api/orders/{orderId}"

复制代码

测试健康检查

GET https://localhost:7000/health

观察日志输出

Order.API:接收HTTP请求,发布订单创建消息

OrderProcessor.Service:消费订单消息,处理业务逻辑,发布状态更新

Order.API:消费状态更新消息

测试错误场景

停止RabbitMQ服务,观察重连机制

停止OrderProcessor.Service,观察消息堆积

重启服务,观察消息恢复处理

第7步:高级特性 - 配置重试和 resilience

在Order.Infrastructure中添加Polly支持:

复制代码

// 添加NuGet包

dotnet add package Polly

dotnet add package Microsoft.Extensions.Http.Polly

// 在Program.cs中添加重试策略

builder.Services.AddHttpClient("retry-client")

.AddTransientHttpErrorPolicy(policy =>

policy.WaitAndRetryAsync(3, retryAttempt =>

TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))));

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/22 14:32:50

芜湖,千兆网络下载速率只有10MB秒,过的什么苦日子

第一坑&#xff1a;百度网盘的“灵魂限速”果然&#xff0c;下载链接指向了那个让人又爱又恨的百度网盘。非会员的下载速度&#xff1f;稳定在100KB/秒左右&#xff0c;好家伙&#xff0c;算下来得下一整天……我是那种坐以待毙的人吗&#xff1f;当然不&#xff01;我默默打开…

作者头像 李华
网站建设 2026/1/25 22:02:03

AI一周大事盘点(2025年12月14日~2025年12月20日)

【摘要】2025年12月第三周&#xff0c;全球AI领域呈现出三大核心趋势&#xff1a;首先&#xff0c;模型技术层面&#xff0c;以谷歌Gemini 3 Flash为代表的高性价比轻量级模型实现关键突破&#xff0c;为智能体&#xff08;Agent&#xff09;大规模应用奠定基础&#xff0c;同时…

作者头像 李华
网站建设 2026/1/29 0:04:07

K3s + Sysbox:让容器拥有“虚拟机的灵魂”

Containerd 与 Runc 的关系首先&#xff0c;让我们简要了解一下 containerd 是如何与 runc 协作的。containerd 是一个常驻的守护进程&#xff0c;主要负责以下任务&#xff1a;镜像管理&#xff1a;从镜像仓库拉取并存储镜像。容器管理&#xff1a;管理容器生命周期&#xff0…

作者头像 李华
网站建设 2026/1/29 22:43:22

8 个降AI率工具推荐,继续教育学生必备

8 个降AI率工具推荐&#xff0c;继续教育学生必备 AI降重工具&#xff0c;让论文更自然更安心 随着人工智能技术的不断进步&#xff0c;越来越多的学生和研究人员在撰写论文时会借助AI工具进行辅助。然而&#xff0c;AI生成的内容往往存在明显的痕迹&#xff0c;容易被查重系统…

作者头像 李华
网站建设 2026/1/26 12:35:05

从开发一个AI美女聊天群组开始

ramework。很多开发者可能会有疑问&#xff1a;为什么微软要推出这么多框架&#xff1f;它们之间有什么区别&#xff1f;本文将通过一个实际的AI美女聊天群组项目&#xff0c;带你深入理解Microsoft Agent Framework&#xff0c;掌握多智能体开发的核心概念。本文的示例代码已开…

作者头像 李华