znlgis 博客

GIS开发与技术分享

第十一章:事件总线

一、事件总线概述

事件总线(Event Bus)是一种用于实现组件间松耦合通信的设计模式。在 Furion 框架中,事件总线提供了强大的进程内和跨进程事件通信能力,使得应用程序各模块之间能够通过发布/订阅模式进行高效协作。

1.1 什么是事件总线

事件总线的核心思想是:发布者发布事件,订阅者订阅并处理事件,两者之间不需要直接引用。这种模式带来以下好处:

  • 松耦合:发布者和订阅者不需要知道彼此的存在
  • 可扩展性:可以轻松添加新的订阅者而不影响现有代码
  • 异步处理:事件可以异步处理,提升应用性能
  • 模块化:各模块通过事件进行通信,便于独立开发和测试

1.2 进程内事件与跨进程事件

特性 进程内事件 跨进程事件
通信范围 同一应用程序内 不同应用程序之间
传输方式 内存传递 消息队列(如 RabbitMQ、Kafka)
性能 极高 取决于消息中间件
可靠性 进程崩溃则丢失 支持持久化
适用场景 模块解耦、业务流程编排 微服务通信、分布式系统
Furion 支持 内置支持 需集成第三方中间件

1.3 注册事件总线服务

Startup.csProgram.cs 中注册事件总线:

// Program.cs (.NET 6+)
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddEventBus(builder =>
{
    // 注册所有事件订阅者
    builder.AddSubscribers();
});

var app = builder.Build();
app.Run();

二、核心接口与特性

2.1 IEventPublisher 接口

IEventPublisher 是事件发布者接口,用于发布事件消息:

public interface IEventPublisher
{
    /// <summary>
    /// 发布事件
    /// </summary>
    /// <param name="eventSource">事件源</param>
    /// <returns></returns>
    Task PublishAsync(IEventSource eventSource);

    /// <summary>
    /// 延迟发布事件
    /// </summary>
    /// <param name="eventId">事件 Id</param>
    /// <param name="payload">事件载荷</param>
    /// <param name="delay">延迟时间(毫秒)</param>
    /// <returns></returns>
    Task PublishDelayAsync(string eventId, object payload, long delay);
}

2.2 IEventSubscriber 接口

IEventSubscriber 是事件订阅者标记接口,所有订阅者必须实现此接口:

public class UserEventSubscriber : IEventSubscriber
{
    private readonly ILogger<UserEventSubscriber> _logger;

    public UserEventSubscriber(ILogger<UserEventSubscriber> logger)
    {
        _logger = logger;
    }

    [EventSubscribe("user:created")]
    public async Task HandleUserCreated(EventHandlerExecutingContext context)
    {
        var user = context.Source.Payload as UserDto;
        _logger.LogInformation("新用户注册:{UserName}", user?.UserName);

        // 处理用户创建后的逻辑,如发送欢迎邮件
        await SendWelcomeEmailAsync(user);
    }

    private Task SendWelcomeEmailAsync(UserDto user)
    {
        // 发送欢迎邮件逻辑
        return Task.CompletedTask;
    }
}

2.3 EventSubscribe 特性

[EventSubscribe] 特性用于标记事件处理方法,指定监听的事件 Id:

// 基本用法
[EventSubscribe("order:created")]
public async Task OnOrderCreated(EventHandlerExecutingContext context)
{
    // 处理订单创建事件
}

// 指定执行顺序
[EventSubscribe("order:created", Order = 1)]
public async Task OnOrderCreatedFirst(EventHandlerExecutingContext context)
{
    // 优先执行
}

// 支持正则表达式匹配
[EventSubscribe("order:(.*)", FuzzyMatch = true)]
public async Task OnAnyOrderEvent(EventHandlerExecutingContext context)
{
    // 匹配所有以 "order:" 开头的事件
}

三、事件消息定义

3.1 事件源(EventSource)

事件源是事件消息的载体,包含事件 Id 和事件数据:

// 使用内置的 ChannelEventSource
var eventSource = new ChannelEventSource("user:created", new UserDto
{
    Id = 1,
    UserName = "张三",
    Email = "zhangsan@example.com"
});

// 发布事件
await _eventPublisher.PublishAsync(eventSource);

3.2 自定义事件消息

对于复杂场景,可以定义自定义事件消息类:

/// <summary>
/// 订单事件消息
/// </summary>
public class OrderEventMessage
{
    /// <summary>
    /// 订单Id
    /// </summary>
    public long OrderId { get; set; }

    /// <summary>
    /// 订单金额
    /// </summary>
    public decimal Amount { get; set; }

    /// <summary>
    /// 用户Id
    /// </summary>
    public long UserId { get; set; }

    /// <summary>
    /// 创建时间
    /// </summary>
    public DateTime CreatedTime { get; set; } = DateTime.Now;

    /// <summary>
    /// 订单状态
    /// </summary>
    public OrderStatus Status { get; set; }
}

public enum OrderStatus
{
    Pending,
    Paid,
    Shipped,
    Completed,
    Cancelled
}

四、事件发布

4.1 基本发布

[ApiDescriptionSettings("订单管理")]
public class OrderAppService : IDynamicApiController
{
    private readonly IEventPublisher _eventPublisher;
    private readonly IRepository<Order> _orderRepository;

    public OrderAppService(
        IEventPublisher eventPublisher,
        IRepository<Order> orderRepository)
    {
        _eventPublisher = eventPublisher;
        _orderRepository = orderRepository;
    }

    /// <summary>
    /// 创建订单
    /// </summary>
    public async Task<OrderDto> CreateOrder(CreateOrderInput input)
    {
        // 1. 创建订单
        var order = input.Adapt<Order>();
        var newOrder = await _orderRepository.InsertNowAsync(order);

        // 2. 发布订单创建事件
        await _eventPublisher.PublishAsync(new ChannelEventSource("order:created", new OrderEventMessage
        {
            OrderId = newOrder.Entity.Id,
            Amount = newOrder.Entity.TotalAmount,
            UserId = newOrder.Entity.UserId,
            Status = OrderStatus.Pending
        }));

        return newOrder.Entity.Adapt<OrderDto>();
    }
}

4.2 延迟发布

延迟发布允许事件在指定时间后才被触发:

// 延迟5秒发布
await _eventPublisher.PublishDelayAsync("order:timeout:check", new
{
    OrderId = orderId
}, 5000);

// 延迟30分钟检查订单支付状态
await _eventPublisher.PublishDelayAsync("order:payment:check", new
{
    OrderId = orderId,
    CreatedTime = DateTime.Now
}, 30 * 60 * 1000);

4.3 批量发布

/// <summary>
/// 批量发布事件
/// </summary>
public async Task PublishBatchEvents(List<OrderDto> orders)
{
    var tasks = orders.Select(order =>
        _eventPublisher.PublishAsync(new ChannelEventSource("order:created", order))
    );

    await Task.WhenAll(tasks);
}

五、事件订阅

5.1 基本订阅

public class OrderEventSubscriber : IEventSubscriber
{
    private readonly ILogger<OrderEventSubscriber> _logger;

    public OrderEventSubscriber(ILogger<OrderEventSubscriber> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// 处理订单创建事件
    /// </summary>
    [EventSubscribe("order:created")]
    public async Task HandleOrderCreated(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderEventMessage;
        if (order == null) return;

        _logger.LogInformation("收到订单创建事件,订单Id:{OrderId},金额:{Amount}",
            order.OrderId, order.Amount);

        // 执行业务逻辑
        await ProcessNewOrder(order);
    }

    /// <summary>
    /// 处理订单支付事件
    /// </summary>
    [EventSubscribe("order:paid")]
    public async Task HandleOrderPaid(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderEventMessage;
        _logger.LogInformation("订单已支付,订单Id:{OrderId}", order?.OrderId);

        // 通知仓库发货
        await NotifyWarehouse(order);
    }

    private Task ProcessNewOrder(OrderEventMessage order) => Task.CompletedTask;
    private Task NotifyWarehouse(OrderEventMessage order) => Task.CompletedTask;
}

5.2 同一事件多个订阅者

一个事件可以有多个订阅者,它们会按照注册顺序依次执行:

// 订阅者1:发送通知
public class NotificationSubscriber : IEventSubscriber
{
    [EventSubscribe("order:created")]
    public async Task SendNotification(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderEventMessage;
        // 发送通知给用户
        await SendPushNotification(order.UserId, "您的订单已创建成功!");
    }

    private Task SendPushNotification(long userId, string message) => Task.CompletedTask;
}

// 订阅者2:记录日志
public class AuditLogSubscriber : IEventSubscriber
{
    [EventSubscribe("order:created")]
    public async Task WriteAuditLog(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderEventMessage;
        // 写入审计日志
        await WriteLog("OrderCreated", order);
    }

    private Task WriteLog(string action, object data) => Task.CompletedTask;
}

// 订阅者3:更新统计
public class StatisticsSubscriber : IEventSubscriber
{
    [EventSubscribe("order:created")]
    public async Task UpdateStatistics(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderEventMessage;
        // 更新订单统计数据
        await UpdateOrderCount(order.UserId);
    }

    private Task UpdateOrderCount(long userId) => Task.CompletedTask;
}

六、异步事件处理

6.1 异步处理模式

Furion 事件总线天然支持异步处理,所有事件处理方法都返回 Task

public class AsyncEventSubscriber : IEventSubscriber
{
    private readonly IServiceScopeFactory _scopeFactory;

    public AsyncEventSubscriber(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
    }

    [EventSubscribe("data:import")]
    public async Task HandleDataImport(EventHandlerExecutingContext context)
    {
        var importData = context.Source.Payload as DataImportMessage;

        // 在新的作用域中处理,避免 DbContext 共享问题
        using var scope = _scopeFactory.CreateScope();
        var repository = scope.ServiceProvider.GetRequiredService<IRepository<DataRecord>>();

        foreach (var record in importData.Records)
        {
            await repository.InsertAsync(record.Adapt<DataRecord>());
        }

        await repository.SaveNowAsync();
    }
}

6.2 使用 CancellationToken

[EventSubscribe("longrunning:task")]
public async Task HandleLongRunningTask(EventHandlerExecutingContext context)
{
    var cancellationToken = context.CancellationToken;

    for (int i = 0; i < 1000; i++)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            // 任务被取消
            break;
        }

        await Task.Delay(100, cancellationToken);
        // 执行耗时操作
    }
}

七、事件重试机制

7.1 配置重试策略

Furion 支持事件处理失败后的自动重试:

[EventSubscribe("order:payment", NumRetries = 3)]
public async Task HandlePayment(EventHandlerExecutingContext context)
{
    var order = context.Source.Payload as OrderEventMessage;

    // 调用支付接口,可能因网络问题失败
    var result = await CallPaymentApi(order);

    if (!result.Success)
    {
        // 抛出异常将触发重试
        throw new PaymentException($"支付失败:{result.Message}");
    }
}

7.2 自定义重试策略

public class PaymentEventSubscriber : IEventSubscriber
{
    [EventSubscribe("payment:process", NumRetries = 5)]
    public async Task HandlePayment(EventHandlerExecutingContext context)
    {
        var retryCount = context.Attribute.NumRetries;
        var currentRetry = 0;

        while (currentRetry < retryCount)
        {
            try
            {
                await ProcessPayment(context.Source.Payload);
                break;
            }
            catch (Exception ex) when (currentRetry < retryCount - 1)
            {
                currentRetry++;
                // 指数退避
                var delay = Math.Pow(2, currentRetry) * 1000;
                await Task.Delay((int)delay);
            }
        }
    }

    private Task ProcessPayment(object payload) => Task.CompletedTask;
}

7.3 重试与异常处理

[EventSubscribe("critical:operation", NumRetries = 3)]
public async Task HandleCriticalOperation(EventHandlerExecutingContext context)
{
    try
    {
        await ExecuteCriticalOperation(context.Source.Payload);
    }
    catch (Exception ex)
    {
        // 记录异常信息
        context.Attribute.SetProperty("LastError", ex.Message);

        // 如果不是最后一次重试,继续抛出触发重试
        if (context.Source is ChannelEventSource source)
        {
            throw; // 触发重试
        }
    }
}

private Task ExecuteCriticalOperation(object payload) => Task.CompletedTask;

八、事件执行顺序控制

8.1 使用 Order 属性

通过 Order 属性控制同一事件的多个处理器的执行顺序:

public class OrderedEventSubscriber : IEventSubscriber
{
    /// <summary>
    /// 第一步:验证数据
    /// </summary>
    [EventSubscribe("workflow:start", Order = 1)]
    public async Task Step1_ValidateData(EventHandlerExecutingContext context)
    {
        var data = context.Source.Payload;
        // 数据验证逻辑
        await Task.CompletedTask;
    }

    /// <summary>
    /// 第二步:处理业务
    /// </summary>
    [EventSubscribe("workflow:start", Order = 2)]
    public async Task Step2_ProcessBusiness(EventHandlerExecutingContext context)
    {
        // 业务处理逻辑
        await Task.CompletedTask;
    }

    /// <summary>
    /// 第三步:发送通知
    /// </summary>
    [EventSubscribe("workflow:start", Order = 3)]
    public async Task Step3_SendNotification(EventHandlerExecutingContext context)
    {
        // 发送通知逻辑
        await Task.CompletedTask;
    }
}

8.2 执行顺序说明

Order 值 执行优先级 说明
1 最高 最先执行
2 次高 第二执行
不指定 默认 按注册顺序
100 后执行

九、正则匹配事件

9.1 模糊匹配

使用 FuzzyMatch = true 启用正则表达式匹配:

public class RegexEventSubscriber : IEventSubscriber
{
    /// <summary>
    /// 匹配所有订单相关事件
    /// </summary>
    [EventSubscribe("order:(.*)", FuzzyMatch = true)]
    public async Task HandleAllOrderEvents(EventHandlerExecutingContext context)
    {
        var eventId = context.Source.EventId;
        var payload = context.Source.Payload;

        switch (eventId)
        {
            case "order:created":
                await HandleOrderCreated(payload);
                break;
            case "order:paid":
                await HandleOrderPaid(payload);
                break;
            case "order:cancelled":
                await HandleOrderCancelled(payload);
                break;
            default:
                // 处理其他订单事件
                break;
        }
    }

    /// <summary>
    /// 匹配所有以 "log:" 开头的事件
    /// </summary>
    [EventSubscribe("log:(info|warn|error)", FuzzyMatch = true)]
    public async Task HandleLogEvents(EventHandlerExecutingContext context)
    {
        var level = context.Source.EventId.Replace("log:", "");
        var message = context.Source.Payload as string;

        // 统一处理日志事件
        await WriteLog(level, message);
    }

    private Task HandleOrderCreated(object payload) => Task.CompletedTask;
    private Task HandleOrderPaid(object payload) => Task.CompletedTask;
    private Task HandleOrderCancelled(object payload) => Task.CompletedTask;
    private Task WriteLog(string level, string message) => Task.CompletedTask;
}

9.2 正则匹配常用模式

模式 说明 匹配示例
order:(.*) 匹配所有 order 前缀事件 order:created, order:paid
(user\|order):created 匹配多个模块的 created 事件 user:created, order:created
log:(info\|warn\|error) 匹配指定级别的日志事件 log:info, log:error
.*:deleted 匹配所有 deleted 后缀事件 user:deleted, order:deleted

十、自定义事件存储

10.1 实现自定义事件源存储器

默认情况下,Furion 使用 Channel 作为事件存储。可以通过实现 IEventSourceStorer 来自定义存储方式:

/// <summary>
/// 基于 Redis 的事件存储器
/// </summary>
public class RedisEventSourceStorer : IEventSourceStorer
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisEventSourceStorer> _logger;
    private const string QueueKey = "furion:event:queue";

    public RedisEventSourceStorer(
        IConnectionMultiplexer redis,
        ILogger<RedisEventSourceStorer> logger)
    {
        _redis = redis;
        _logger = logger;
    }

    /// <summary>
    /// 写入事件源
    /// </summary>
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
    {
        var db = _redis.GetDatabase();
        var json = JsonSerializer.Serialize(eventSource);
        await db.ListRightPushAsync(QueueKey, json);
        _logger.LogDebug("事件已写入 Redis:{EventId}", eventSource.EventId);
    }

    /// <summary>
    /// 读取事件源
    /// </summary>
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
    {
        var db = _redis.GetDatabase();

        while (!cancellationToken.IsCancellationRequested)
        {
            var value = await db.ListLeftPopAsync(QueueKey);
            if (value.HasValue)
            {
                var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(value);
                return eventSource;
            }

            // 没有消息时等待一段时间
            await Task.Delay(100, cancellationToken);
        }

        return default;
    }
}

10.2 注册自定义存储器

builder.Services.AddEventBus(options =>
{
    // 替换默认的事件存储器
    options.ReplaceStorer<RedisEventSourceStorer>();

    options.AddSubscribers();
});

10.3 基于数据库的事件存储

/// <summary>
/// 基于数据库的事件存储器(用于事件溯源)
/// </summary>
public class DbEventSourceStorer : IEventSourceStorer
{
    private readonly Channel<IEventSource> _channel;
    private readonly IServiceScopeFactory _scopeFactory;

    public DbEventSourceStorer(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
        _channel = Channel.CreateUnbounded<IEventSource>();
    }

    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
    {
        // 先持久化到数据库
        using var scope = _scopeFactory.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();

        dbContext.EventLogs.Add(new EventLog
        {
            EventId = eventSource.EventId,
            Payload = JsonSerializer.Serialize(eventSource.Payload),
            CreatedTime = DateTime.Now,
            Status = "Pending"
        });
        await dbContext.SaveChangesAsync(cancellationToken);

        // 再写入 Channel 供消费
        await _channel.Writer.WriteAsync(eventSource, cancellationToken);
    }

    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
    {
        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
        return eventSource;
    }
}

十一、事件总线最佳实践

11.1 事件命名规范

/// <summary>
/// 事件 Id 常量定义
/// 命名规范:{模块}:{动作}
/// </summary>
public static class EventIds
{
    // 用户模块
    public const string UserCreated = "user:created";
    public const string UserUpdated = "user:updated";
    public const string UserDeleted = "user:deleted";
    public const string UserLoggedIn = "user:loggedin";

    // 订单模块
    public const string OrderCreated = "order:created";
    public const string OrderPaid = "order:paid";
    public const string OrderShipped = "order:shipped";
    public const string OrderCompleted = "order:completed";
    public const string OrderCancelled = "order:cancelled";

    // 系统模块
    public const string SystemAlert = "system:alert";
    public const string SystemHealthCheck = "system:healthcheck";
}

11.2 事件处理中的依赖注入

public class ScopedEventSubscriber : IEventSubscriber
{
    private readonly IServiceScopeFactory _scopeFactory;

    public ScopedEventSubscriber(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
    }

    [EventSubscribe("data:process")]
    public async Task HandleDataProcess(EventHandlerExecutingContext context)
    {
        // 在新作用域中获取 Scoped 服务
        using var scope = _scopeFactory.CreateScope();
        var service = scope.ServiceProvider.GetRequiredService<IDataProcessService>();
        await service.ProcessAsync(context.Source.Payload);
    }
}

11.3 避免事件循环

// ❌ 错误示范:可能导致无限循环
[EventSubscribe("order:updated")]
public async Task HandleOrderUpdated(EventHandlerExecutingContext context)
{
    // 更新订单后又发布 order:updated 事件,导致死循环
    await _eventPublisher.PublishAsync(new ChannelEventSource("order:updated", newData));
}

// ✅ 正确做法:使用不同的事件 Id
[EventSubscribe("order:updated")]
public async Task HandleOrderUpdated(EventHandlerExecutingContext context)
{
    // 处理完成后发布不同的事件
    await _eventPublisher.PublishAsync(new ChannelEventSource("order:update:completed", result));
}

十二、与 MediatR 对比

12.1 功能对比

特性 Furion 事件总线 MediatR
通信模式 发布/订阅 请求/响应 + 通知
异步支持 ✅ 原生支持 ✅ 支持
重试机制 ✅ 内置支持 ❌ 需自行实现
正则匹配 ✅ 支持 ❌ 不支持
执行顺序 ✅ Order 控制 ✅ Pipeline 控制
依赖 无额外依赖 需要安装 NuGet 包
集成度 深度集成 Furion 独立库
延迟发布 ✅ 支持 ❌ 不支持
自定义存储 ✅ 支持 ❌ 不支持
学习曲线

12.2 MediatR 使用示例(对比参考)

// MediatR 方式
public class CreateOrderCommand : IRequest<OrderDto>
{
    public string ProductName { get; set; }
    public decimal Amount { get; set; }
}

public class CreateOrderHandler : IRequestHandler<CreateOrderCommand, OrderDto>
{
    public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken ct)
    {
        // 处理逻辑
        return new OrderDto();
    }
}

// Furion 事件总线方式
// 发布
await _eventPublisher.PublishAsync(new ChannelEventSource("order:created", orderData));

// 订阅
[EventSubscribe("order:created")]
public async Task OnOrderCreated(EventHandlerExecutingContext context)
{
    // 处理逻辑
}

12.3 选型建议

  • 选择 Furion 事件总线:项目已使用 Furion 框架、需要重试机制、需要事件模式匹配、偏好零依赖方案
  • 选择 MediatR:需要请求/响应模式、项目不使用 Furion、团队熟悉 CQRS 模式、需要丰富的中间件管道

十三、完整示例

13.1 电商订单流程事件驱动

// 1. 定义事件消息
public class OrderCreatedEvent
{
    public long OrderId { get; set; }
    public long UserId { get; set; }
    public List<OrderItemDto> Items { get; set; }
    public decimal TotalAmount { get; set; }
}

// 2. 发布事件
[ApiDescriptionSettings("订单")]
public class OrderAppService : IDynamicApiController
{
    private readonly IEventPublisher _publisher;

    public OrderAppService(IEventPublisher publisher) => _publisher = publisher;

    public async Task<long> CreateOrder(CreateOrderInput input)
    {
        // ... 创建订单逻辑
        long orderId = 12345;

        await _publisher.PublishAsync(new ChannelEventSource(EventIds.OrderCreated, new OrderCreatedEvent
        {
            OrderId = orderId,
            UserId = input.UserId,
            Items = input.Items,
            TotalAmount = input.Items.Sum(x => x.Price * x.Quantity)
        }));

        return orderId;
    }
}

// 3. 库存扣减订阅者
public class InventorySubscriber : IEventSubscriber
{
    private readonly IServiceScopeFactory _scopeFactory;

    public InventorySubscriber(IServiceScopeFactory scopeFactory) => _scopeFactory = scopeFactory;

    [EventSubscribe("order:created", Order = 1)]
    public async Task DeductInventory(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderCreatedEvent;
        using var scope = _scopeFactory.CreateScope();
        var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();

        foreach (var item in order.Items)
        {
            await inventoryService.DeductAsync(item.ProductId, item.Quantity);
        }
    }
}

// 4. 积分增加订阅者
public class PointsSubscriber : IEventSubscriber
{
    [EventSubscribe("order:created", Order = 2)]
    public async Task AddPoints(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderCreatedEvent;
        // 根据订单金额增加积分
        var points = (int)(order.TotalAmount);
        // ... 增加积分逻辑
        await Task.CompletedTask;
    }
}

// 5. 通知订阅者
public class OrderNotificationSubscriber : IEventSubscriber
{
    [EventSubscribe("order:created", Order = 3)]
    public async Task SendNotification(EventHandlerExecutingContext context)
    {
        var order = context.Source.Payload as OrderCreatedEvent;
        // 发送短信、推送通知等
        await Task.CompletedTask;
    }
}

通过事件总线,订单创建后的库存扣减、积分增加、通知发送等操作被解耦为独立的订阅者,每个订阅者可以独立开发、测试和维护,极大地提升了系统的可维护性和可扩展性。