前言
EventBus的核心思想是观察者模式 (生产/消费者编程模型) 。
SpringBoot+EventBus使用教程(一)
SpringBoot+EventBus使用教程(二)
通过前面的文章我们已经知道,如何使用eventBus了。我们需要先定义一个Observer(前文中的EventListener类),然后将其注册到eventBus里,通过 @Subscribe 定义消息回调函数。
那我们先看看register(Object object) 和unregister(Object object) 方法。
register (Object object) 解析
1 2 3 4 5 6 7 8 9 10 |
public void register(Object object) { Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object); subscribersByTypeLock.writeLock().lock(); try { subscribersByType.putAll(methodsInListener); } finally { subscribersByTypeLock.writeLock().unlock(); } } |
可以看到是先通过SubscriberFindingStrategy接口里的findAllSubscribers方法获取所有标记了@ Subscribe 注解的方法,其中该接口的具体实现是AnnotatedSubscriberFinder类。放到一个guava里定义的Multimap里。然后是把获取到的methodsInListener放到一个叫subscribersByType的 guava里定义的SetMultimap里 。
1 2 3 4 5 6 7 8 9 10 11 |
public Multimap<Class<?>, EventSubscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, EventSubscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; EventSubscriber subscriber = makeSubscriber(listener, method); methodsInListener.put(eventType, subscriber); } return methodsInListener; } |
findAllSubscribers方法里,最重要的是methodsInListener,它的结构可以简单理解为一个map,其中key是eventType,在我前文写的例子中就是com.sww.eventbus.domain.MessageEvent,其中value是subscriber,就是例子中的com.sww.eventbus.listener.EventListener#onMessageEvent。
总之,一句话就是先通过标记找到所有已经注册进来的观察者,然后存放到容器里备用。
那unregister就是从容器删除它们,
unRegister (Object object) 解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public void unregister(Object object) { Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object); for (Entry<Class<?>, Collection<EventSubscriber>> entry : methodsInListener.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<EventSubscriber> eventMethodsInListener = entry.getValue(); subscribersByTypeLock.writeLock().lock(); try { Set<EventSubscriber> currentSubscribers = subscribersByType.get(eventType); if (!currentSubscribers.containsAll(eventMethodsInListener)) { throw new IllegalArgumentException( "missing event subscriber for an annotated method. Is " + object + " registered?"); } currentSubscribers.removeAll(eventMethodsInListener); } finally { subscribersByTypeLock.writeLock().unlock(); } } } |
post( Object event)解析
有了观察者,下面就是发送事件了,阅读过前文会知道是通过eventBus.post(Object event)来发送事件消息。那咱们来看看这个post方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public void post(Object event) { Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass()); boolean dispatched = false; for (Class<?> eventType : dispatchTypes) { subscribersByTypeLock.readLock().lock(); try { Set<EventSubscriber> wrappers = subscribersByType.get(eventType); if (!wrappers.isEmpty()) { dispatched = true; for (EventSubscriber wrapper : wrappers) { enqueueEvent(event, wrapper); } } } finally { subscribersByTypeLock.readLock().unlock(); } } if (!dispatched && !(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } dispatchQueuedEvents(); } |
该方法就是从之前的容器subscribersByType里获取到eventType对应的观察者,然后组装成EventWithSubscriber放到队列里。
1 2 3 |
void enqueueEvent(Object event, EventSubscriber subscriber) { eventsToDispatch.get().offer(new EventWithSubscriber(event, subscriber)); } |
然后就是最后的dispatchQueuedEvents(),经过一层层深入进去,可以发现wrapper.handleEvent(event),其中 handleEvent方法就是最终的关键了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public void handleEvent(Object event) throws InvocationTargetException { checkNotNull(event); try { method.invoke(target, new Object[] { event }); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } |
就是通过Java的反射机制实现。
需要说明的是,如果没有订阅者注册到要发送的event事件上,并且该event不是DeadEvent,那么它将被包装成DeadEvent中并重新发布。也就是其中这三行代码索要做的
1 2 3 |
if (!dispatched && !(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } |