eShopOnContainers 知多少[5]:EventBus With RabbitMQ

事件总线这个概念对你来说可能很陌生,但提到观察者(发布-订阅)模式,你也许就很熟悉。事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
eShopOnContainers 知多少[5]:EventBus With RabbitMQ
从上图可知,核心就4个角色:

  1. 事件(事件源+事件处理)
  2. 事件发布者
  3. 事件订阅者
  4. 事件总线

实现事件总线的关键是:

  1. 事件总线维护一个事件源与事件处理的映射字典;
  2. 通过单例模式,确保事件总线的唯一入口;
  3. 利用反射完成事件源与事件处理的初始化绑定;
  4. 提供统一的事件注册、取消注册和触发接口。

我们直接以上帝视角,来看下其实现机制,上类图。
eShopOnContainers 知多少[5]:EventBus With RabbitMQ

我们知道事件的本质是:事件源+事件处理
针对事件源,其定义了IntegrationEvent基类来处理。默认仅包含一个guid和一个创建日期,具体的事件可以通过继承该类,来完善事件的描述信息。

这里有必要解释下Integration Event(集成事件)。因为在微服务中事件的消费不再局限于当前领域内,而是多个微服务可能共享同一个事件,所以这里要和DDD中的领域事件区分开来。集成事件可用于跨多个微服务或外部系统同步领域状态,这是通过在微服务之外发布集成事件来实现的。

针对事件处理,其本质是对事件的反应,一个事件可引起多个反应,所以,它们之间是一对多的关系。
eShopOnContainers中抽象了两个事件处理的接口:

  1. IIntegrationEventHandler
  2. IDynamicIntegrationEventHandler

二者都定义了一个Handle方法用于响应事件。不同之处在于方法参数的类型:
第一个接受的是一个强类型的IntegrationEvent。第二个接收的是一个动态类型dynamic
为什么要单独提供一个事件源为dynamic类型的接口呢?
不是每一个事件源都需要详细的事件信息,所以一个强类型的参数约束就没有必要,通过dynamic可以简化事件源的构建,更趋于灵活。

有了事件源和事件处理,接下来就是事件的注册和订阅了。为了方便进行订阅管理,系统提供了额外的一层抽象IEventBusSubscriptionsManager,其用于维护事件的订阅和注销,以及订阅信息的持久化。其默认的实现InMemoryEventBusSubscriptionsManager就是使用内存进行存储事件源和事件处理的映射字典。
从类图中看InMemoryEventBusSubscriptionsManager中定义了一个内部类SubscriptionInfo,其主要用于表示事件订阅方的订阅类型和事件处理的类型。

我们来近距离看下InMemoryEventBusSubscriptionsManager的定义:

//InMemoryEventBusSubscriptionsManager.cs
//定义的事件名称和事件订阅的字典映射(1:N)
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
//保存所有的事件处理类型
private readonly List<Type> _eventTypes;
//定义事件移除后事件
public event EventHandler<string> OnEventRemoved;

//构造函数初始化
public InMemoryEventBusSubscriptionsManager()
{
    _handlers = new Dictionary<string, List<SubscriptionInfo>>();
    _eventTypes = new List<Type>();
}
//添加动态类型事件订阅(需要手动指定事件名称)
public void AddDynamicSubscription<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    DoAddSubscription(typeof(TH), eventName, isDynamic: true);
}
//添加强类型事件订阅(事件名称为事件源类型)
public void AddSubscription<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = GetEventKey<T>();

    DoAddSubscription(typeof(TH), eventName, isDynamic: false);

    if (!_eventTypes.Contains(typeof(T)))
    {
        _eventTypes.Add(typeof(T));
    }
}
//移除动态类型事件订阅
public void RemoveDynamicSubscription<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
    DoRemoveHandler(eventName, handlerToRemove);
}

//移除强类型事件订阅
public void RemoveSubscription<T, TH>()
    where TH : IIntegrationEventHandler<T>
    where T : IntegrationEvent
{
    var handlerToRemove = FindSubscriptionToRemove<T, TH>();
    var eventName = GetEventKey<T>();
    DoRemoveHandler(eventName, handlerToRemove);
}

添加了这么一层抽象,即符合了单一职责原则,又完成了代码重用。IEventBus的具体实现通过注入对IEventBusSubscriptionsManager的依赖,即可完成订阅管理。
你这里可能会好奇,为什么要暴露一个OnEventRemoved事件?这里先按住不表,留给大家思考。