前言
日常開發(fā)中經(jīng)常遇到一個業(yè)務(wù)發(fā)生之后需要觸發(fā)好幾個業(yè)務(wù)點,比如訂單支付完成之后需要發(fā)送短信、送會員積分、發(fā)送優(yōu)惠券等。在分布式系統(tǒng)中我們可以通過消息隊列實現(xiàn),各個系統(tǒng)之間訂閱支付成功事件,然后實現(xiàn)各自的業(yè)務(wù),達到一個系統(tǒng)之間的解耦和異步的目的。
如果在同一個進程中也存在類似的通知需求,通過消息隊列顯得太笨重而且也沒有跨進程或者系統(tǒng)架構(gòu)中都沒引入消息隊列。這時候要實現(xiàn)進程內(nèi)的消息通訊就可以通過Spring自帶的Event事件或者google的EventBus。
觀察者模式
不管是SpringEvent還是EventBus都是對觀察者模式的實現(xiàn)。與傳統(tǒng)的觀察者模式不同的是,它是觀察者模式的非顯示實現(xiàn),說白了就是通過第三方將消息發(fā)布者與訂閱者解耦。
傳統(tǒng)的觀察的模式需要在消息發(fā)布方維護一個訂閱者的隊列,耦合性是比較強的。而SpringEvent和EventBus是通過自身來管理發(fā)布者與訂閱者的關(guān)系,發(fā)布者不再關(guān)心有多少訂閱者,達到一個解耦的效果。
-
傳統(tǒng)觀察者模式
觀察者模式
如上圖傳統(tǒng)的觀察者模式,事件發(fā)布方需要自身維護和監(jiān)聽者的關(guān)系,這樣做耦合性比較高。
-
消息訂閱
消息訂閱
采用這種形式,事件發(fā)布者和監(jiān)聽者通過第三方來管理它們之間的對應(yīng)關(guān)系,這樣不要顯示的去訂閱和發(fā)布,達到一個解耦的效果。
SpringEvent
使用
- 事件
定義一個消息事件非常簡單,繼承ApplicationEvent就可以。
public class SpringObjectEvent extends ApplicationEvent {
private String msg;
public SpringObjectEvent(Object source) {
super(source);
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
- 發(fā)布
通過ApplicationContext發(fā)布事件
@Resource
private ApplicationContext applicationContext;
public void publishSpringEvent(String msg){
SpringObjectEvent springObjectEvent = new SpringObjectEvent(this);
springObjectEvent.setMsg(msg);
applicationContext.publishEvent(springObjectEvent);
}
- 訂閱
訂閱消息有兩種方式一個是通過繼承另外一個是通過注解的方式。
繼承ApplicationListener
@Component
public class SpringObjectEventListener implements ApplicationListener<SpringObjectEvent> {
@Override
public void onApplicationEvent(SpringObjectEvent event) {
System.out.println("Object:"+event.getMsg());
}
}
注解 @EventListener
@Component
public class SpringAnnotationEventListener {
@EventListener
public void processSpringObjectEvent(SpringObjectEvent springObjectEvent){
System.out.println("Annotation:"+springObjectEvent.getMsg());
}
}
原理分析
本著知其然也知其所以然的原則,簡單分析一下SpringEvent的原理。
監(jiān)聽加載
第一個問題通過繼承或者注解定義的觀察者是什么時候添加到spring中去的。
這時候就需要找到spring的經(jīng)典方法org.springframework.context.support.AbstractApplicationContext#refresh,這方法在spring容器加載或者刷新的時候?qū)徽{(diào)用,如果要分析Spring的源碼這個方法是必須要看的。本文主要研究SpringEvent,這里也不會去詳細分析這個方法,只分析其中的兩步
// 創(chuàng)建消息分發(fā)通知器
initApplicationEventMulticaster();
......
// 注冊監(jiān)聽器
registerListeners();
initApplicationEventMulticaster()
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//如果通知器已經(jīng)存在,給applicationEventMulticaster做一個簡單的賦值
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
//不存在則創(chuàng)建一個SimpleApplicationEventMulticaster
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
整體流程很簡單如果applicationEventMulticaster在Spring容器中已經(jīng)存在則做一個賦值操作,如果不存在就創(chuàng)建一個SimpleApplicationEventMulticaster。
registerListeners()
protected void registerListeners() {
// 首先添加靜態(tài)指定的listener
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
//將繼承自ApplicationListener接口的bean加載到通知器中
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// 開始發(fā)布一些早期的事件
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
這個方法主要就是將定義的listener加載到通知器中,有兩個來源:一是Spring內(nèi)置的一些監(jiān)聽器,二是繼承ApplicationListener的listener。最后一步是將一些預(yù)存的事件通知出去。
在這個方法里面其實是沒有將通過@EventListener注解定義的listener加載進去的。那它是在什么時候加載進去的呢?找到類EventListenerMethodProcessor,看命名就知道是在這里做的處理,簡單分析一下。
EventListenerMethodProcessor中有個關(guān)鍵方法afterSingletonsInstantiated(),在bean實例化之后可以做一個切入,跟進去找到方法processBean
簡化流程
//帶有@EventListener的方法集合
Map<Method, EventListener> annotatedMethods = null;
//遍歷集合
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
//創(chuàng)建listener為ApplicationListenerMethodAdapter
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
//初始化listener
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
//添加listener到ioc容器中
context.addApplicationListener(applicationListener);
break;
}
}
}
上訴流程可以知道,spring在單例的bean實例化之后做了一個切入,將類中所有帶@EventListener注解的方法轉(zhuǎn)換為ApplicationListenerMethodAdapter(繼承自ApplicationListener),然后添加到容器中。
listener加載到這里分析完了,做一個簡單的總結(jié)
- 在容器初始化的時候加載listener
- 如果applicationEventMulticaster不存在則創(chuàng)建applicationEventMulticaster
- 添加listener,包含內(nèi)置的listener,繼承ApplicationListener接口的listener,帶有@EventListener注解的方法。
通知
void publishEvent(Object event)
找到具體的實現(xiàn)org.springframework.context.event.SimpleApplicationEventMulticaster#multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType)
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
介紹流程之前,先說明SimpleApplicationEventMulticaster的兩個類變量
//調(diào)用監(jiān)聽器的執(zhí)行器
private Executor taskExecutor;
//調(diào)用監(jiān)聽器的異常處理機制
private ErrorHandler errorHandler;
事件通知流程非常的簡單,獲取當前事件監(jiān)聽的listener,如果執(zhí)行器不為空就交個執(zhí)行器執(zhí)行,否則交給調(diào)用線程執(zhí)行。這里的流程很簡單,就不具體分析了,這里有個效率優(yōu)化的地方就是獲取當前事件監(jiān)聽的listener時,其實是維護了一個ConcurrentHashMap作為緩存,避免每次通知都遍歷所有的listener。
同步異步
通過上面的源碼分析可以知道SpringEvent默認是同步實現(xiàn),如果要實現(xiàn)異步通知兩種方式
- 監(jiān)聽者異步
public class SpringObjectEventListener implements ApplicationListener<SpringObjectEvent> {
//添加異步注解
@Async
@Override
public void onApplicationEvent(SpringObjectEvent event) {
System.out.println("Object:"+event.getMsg());
}
}
- 自定義applicationEventMulticaster
前面源碼分析當容器中不存在applicationEventMulticaster才會創(chuàng)建一個默認的SimpleApplicationEventMulticaster,而且SimpleApplicationEventMulticaster默認taskExecutor為null,所以我們可以定義一個SimpleApplicationEventMulticaster并將taskExecutor設(shè)置一個執(zhí)行l(wèi)istener的線程池,從而達到異步執(zhí)行的效果。
@Bean(name = "applicationEventMulticaster")
public ApplicationEventMulticaster createApplicationEventMulticaster(){
SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster();
//創(chuàng)建線程池
DefaultThreadFactory threadFactory = new DefaultThreadFactory("listener-executor", false);
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8,
60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), threadFactory);
applicationEventMulticaster.setTaskExecutor(executor);
return applicationEventMulticaster;
}
EventBus
使用
- 引入
EventBus在google的guava包中,通過maven引入。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
- 事件
EventBus的事件定義只要是Object類型就行
public class EventBusEvent {
private String msg;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
- 初始化
EventBus eventBus = new EventBus();
- 發(fā)布
EventBusEvent eventBusEvent = new EventBusEvent();
eventBusEvent.setMsg(msg);
eventBus.post(eventBusEvent);
- 訂閱
定于一個事件需要在方法上添加@Subscribe注解,然后再注冊到eventBus中
public class EventBusListener {
@Subscribe
public void subscribe(EventBusEvent e) {
System.out.println(e.getMsg());
}
}
eventBus.register(new EventBusListener());
原理分析
在具體分析源碼之前,我們通過已經(jīng)分析過的SpringEvent的一個基本原理,參考EventBus的一個使用流程。在不分析源碼的情況下,去猜測它的一個實現(xiàn)原理是怎么樣的。當一個監(jiān)聽者注冊到EventBus中的時候,是不是會將這個類上帶有@Subscribe的方法都當做一個監(jiān)聽者,然后有一個數(shù)據(jù)結(jié)構(gòu)去保存事件和監(jiān)聽者的一個關(guān)聯(lián)關(guān)系。
然后通過EventBus發(fā)布一個事件的時候,通過內(nèi)部保存的一個事件與監(jiān)聽者的管理關(guān)系,就可以找到監(jiān)聽者,然后就可以對其通知了呢?
接下來簡單分析一下EventBus的源碼,看對它的猜測對不對。
監(jiān)聽加載
注冊監(jiān)聽器
public void register(Object object) {
subscribers.register(object);
}
這里的subscribers就是SubscriberRegistry
先看SubscriberRegistry的成員變量subscribers
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
可以知道EventBus是通過一個ConcurrentMap保存事件與監(jiān)聽者的關(guān)系的,監(jiān)聽者是讀多寫少的創(chuàng)建所以用一個CopyOnWriteArraySet存儲。
register
void register(Object listener) {
//查找?guī)в蠤Subscribe的方法、并封裝成Subscriber
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
//遍歷集合,將監(jiān)聽器添加到subscribers中
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
上面流程比較簡單清晰,和我們預(yù)先分析的差不多,先解析帶有@Subscriber注解的方法,然后將這些監(jiān)聽器添加到一個Map中,這個Map用來保存事件與監(jiān)聽者的關(guān)系。
通知
public void post(Object event) {
//獲取事件的監(jiān)聽者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//分發(fā)事件
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
上訴流程也很簡單,從subscribers獲取該事件的監(jiān)聽者,獲取方式就是map.get(Object key)。如果有監(jiān)聽者則開始分發(fā)事情,如果又沒有監(jiān)聽者也不是DeadEvent,將會把該事件封裝一個DeadEvent然后再調(diào)用post方法。
這里需要分析一下dispatcher事件分發(fā)器。
/** Dispatches the given {@code event} to the given {@code subscribers}. */
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
這是一個抽象方法,一共有三個實現(xiàn)類ImmediateDispatcher、PerThreadQueuedDispatcher、LegacyAsyncDispatcher。
- ImmediateDispatcher:直接遍歷subscribers,并且立即執(zhí)行。
- PerThreadQueuedDispatcher:(EventBus默認的分發(fā)器)內(nèi)部用ThreadLocal為每個調(diào)用線程維護了一個隊列,保證各個調(diào)用線程的subscribers執(zhí)行的有序性。主要流程為先把subscribers加入隊列,然后再從隊列中取出執(zhí)行,保證監(jiān)聽者在所有調(diào)用線程執(zhí)行的順序性。
- LegacyAsyncDispatcher:傳統(tǒng)異步分發(fā)器AsyncEventBus(AsyncEventBus默認的分發(fā)器),內(nèi)部通過一個全局的queue去保存subscribers。執(zhí)行流程與PerThreadQueuedDispatcher差不多,subscribers加入隊列,然后再從隊列中取出執(zhí)行,區(qū)別在于PerThreadQueuedDispatcher為每個調(diào)用線程都維護一個隊列,而這里是所有線程共享一個全局隊列。這不過這里的任務(wù)執(zhí)行是異步執(zhí)行的,所以其實這個queue并沒有保證subscribers執(zhí)行的總體有序性,而是盡量去保證一個執(zhí)行順序。
同步異步
EventBus默認是同步的,要構(gòu)造異步的通知使用AsyncEventBus,傳入任務(wù)執(zhí)行的線程池就可以了。
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
總結(jié)
分析了上面兩種關(guān)于進程內(nèi)的通訊組件,其實有很多相同的設(shè)計思路。如果現(xiàn)在讓你自己實現(xiàn)一個類似的組件相信也可以輕松的實現(xiàn)了。那如果是實現(xiàn)進程之間的消息隊列呢?一個可持久化的消息隊列、一個高可用的消息隊列、一個高吞吐的消息隊列......這時候就需要去學習Kafka、RocketMQ才能回答這個問題了。
對比了SpringEvent與EventBus,感覺兩個的功能都差不多,都能實現(xiàn)同步與異步,實現(xiàn)消息通知也能簡單。但是個人覺得如果項目中使用了Spring,還是使用Spring自帶的通知機制比較好,不用引入第三方的包,而且idea也自帶快捷鍵查找消息的訂閱者,會比較方便。
大家這做項目中如果需要一個業(yè)務(wù)的觸發(fā)點需要多個地方去處理的情況下,不妨考慮這種實現(xiàn)方式。

