JAVA進程內(nèi)消息組件分析SpringEvent與EventBus

前言

日常開發(fā)中經(jīng)常遇到一個業(yè)務(wù)發(fā)生之后需要觸發(fā)好幾個業(yè)務(wù)點,比如訂單支付完成之后需要發(fā)送短信、送會員積分、發(fā)送優(yōu)惠券等。在分布式系統(tǒng)中我們可以通過消息隊列實現(xiàn),各個系統(tǒng)之間訂閱支付成功事件,然后實現(xiàn)各自的業(yè)務(wù),達到一個系統(tǒng)之間的解耦和異步的目的。

如果在同一個進程中也存在類似的通知需求,通過消息隊列顯得太笨重而且也沒有跨進程或者系統(tǒng)架構(gòu)中都沒引入消息隊列。這時候要實現(xiàn)進程內(nèi)的消息通訊就可以通過Spring自帶的Event事件或者google的EventBus。

觀察者模式

不管是SpringEvent還是EventBus都是對觀察者模式的實現(xiàn)。與傳統(tǒng)的觀察者模式不同的是,它是觀察者模式的非顯示實現(xiàn),說白了就是通過第三方將消息發(fā)布者與訂閱者解耦。
傳統(tǒng)的觀察的模式需要在消息發(fā)布方維護一個訂閱者的隊列,耦合性是比較強的。而SpringEvent和EventBus是通過自身來管理發(fā)布者與訂閱者的關(guān)系,發(fā)布者不再關(guān)心有多少訂閱者,達到一個解耦的效果。

  • 傳統(tǒng)觀察者模式


    觀察者模式

    如上圖傳統(tǒng)的觀察者模式,事件發(fā)布方需要自身維護和監(jiān)聽者的關(guān)系,這樣做耦合性比較高。

  • 消息訂閱


    消息訂閱

    采用這種形式,事件發(fā)布者和監(jiān)聽者通過第三方來管理它們之間的對應(yīng)關(guān)系,這樣不要顯示的去訂閱和發(fā)布,達到一個解耦的效果。

SpringEvent

使用

  • 事件
    定義一個消息事件非常簡單,繼承ApplicationEvent就可以。
public class SpringObjectEvent extends ApplicationEvent {


    private String msg;

    public SpringObjectEvent(Object source) {
        super(source);
    }


    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
  • 發(fā)布
    通過ApplicationContext發(fā)布事件
    @Resource
    private ApplicationContext applicationContext;

    public void publishSpringEvent(String msg){
        SpringObjectEvent springObjectEvent = new SpringObjectEvent(this);
        springObjectEvent.setMsg(msg);

        applicationContext.publishEvent(springObjectEvent);
    }
  • 訂閱
    訂閱消息有兩種方式一個是通過繼承另外一個是通過注解的方式。

繼承ApplicationListener

@Component
public class SpringObjectEventListener implements ApplicationListener<SpringObjectEvent> {


    @Override
    public void onApplicationEvent(SpringObjectEvent event) {
        System.out.println("Object:"+event.getMsg());
    }
}

注解 @EventListener

@Component
public class SpringAnnotationEventListener {

    @EventListener
    public void processSpringObjectEvent(SpringObjectEvent springObjectEvent){

        System.out.println("Annotation:"+springObjectEvent.getMsg());
    }
}

原理分析

本著知其然也知其所以然的原則,簡單分析一下SpringEvent的原理。
監(jiān)聽加載
第一個問題通過繼承或者注解定義的觀察者是什么時候添加到spring中去的。
這時候就需要找到spring的經(jīng)典方法org.springframework.context.support.AbstractApplicationContext#refresh,這方法在spring容器加載或者刷新的時候?qū)徽{(diào)用,如果要分析Spring的源碼這個方法是必須要看的。本文主要研究SpringEvent,這里也不會去詳細分析這個方法,只分析其中的兩步

                // 創(chuàng)建消息分發(fā)通知器
                initApplicationEventMulticaster();
                ......
                // 注冊監(jiān)聽器
                registerListeners();

initApplicationEventMulticaster()

    protected void initApplicationEventMulticaster() {
        ConfigurableListableBeanFactory beanFactory = getBeanFactory();
        //如果通知器已經(jīng)存在,給applicationEventMulticaster做一個簡單的賦值
        if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
            this.applicationEventMulticaster =
                    beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
            if (logger.isTraceEnabled()) {
                logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
            }
        }
        else {
            //不存在則創(chuàng)建一個SimpleApplicationEventMulticaster
            this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
            if (logger.isTraceEnabled()) {
                logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
                        "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
            }
        }
    }

整體流程很簡單如果applicationEventMulticaster在Spring容器中已經(jīng)存在則做一個賦值操作,如果不存在就創(chuàng)建一個SimpleApplicationEventMulticaster。
registerListeners()

    protected void registerListeners() {
        // 首先添加靜態(tài)指定的listener
        for (ApplicationListener<?> listener : getApplicationListeners()) {
            getApplicationEventMulticaster().addApplicationListener(listener);
        }

    
        //將繼承自ApplicationListener接口的bean加載到通知器中
        String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
        for (String listenerBeanName : listenerBeanNames) {
            getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
        }

        // 開始發(fā)布一些早期的事件
        Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
        this.earlyApplicationEvents = null;
        if (earlyEventsToProcess != null) {
            for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
                getApplicationEventMulticaster().multicastEvent(earlyEvent);
            }
        }
    }

這個方法主要就是將定義的listener加載到通知器中,有兩個來源:一是Spring內(nèi)置的一些監(jiān)聽器,二是繼承ApplicationListener的listener。最后一步是將一些預(yù)存的事件通知出去。
在這個方法里面其實是沒有將通過@EventListener注解定義的listener加載進去的。那它是在什么時候加載進去的呢?找到類EventListenerMethodProcessor,看命名就知道是在這里做的處理,簡單分析一下。
EventListenerMethodProcessor中有個關(guān)鍵方法afterSingletonsInstantiated(),在bean實例化之后可以做一個切入,跟進去找到方法processBean
簡化流程

    //帶有@EventListener的方法集合
    Map<Method, EventListener> annotatedMethods = null;
        //遍歷集合
        for (Method method : annotatedMethods.keySet()) {
            for (EventListenerFactory factory : factories) {
                if (factory.supportsMethod(method)) {
                    Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
                    //創(chuàng)建listener為ApplicationListenerMethodAdapter 
                    ApplicationListener<?> applicationListener =
                            factory.createApplicationListener(beanName, targetType, methodToUse);
                    if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                      //初始化listener
                        ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
                    }
                    //添加listener到ioc容器中
                    context.addApplicationListener(applicationListener);
                    break;
                }
            }
        }

上訴流程可以知道,spring在單例的bean實例化之后做了一個切入,將類中所有帶@EventListener注解的方法轉(zhuǎn)換為ApplicationListenerMethodAdapter(繼承自ApplicationListener),然后添加到容器中。
listener加載到這里分析完了,做一個簡單的總結(jié)

  • 在容器初始化的時候加載listener
  • 如果applicationEventMulticaster不存在則創(chuàng)建applicationEventMulticaster
  • 添加listener,包含內(nèi)置的listener,繼承ApplicationListener接口的listener,帶有@EventListener注解的方法。

通知
void publishEvent(Object event)
找到具體的實現(xiàn)org.springframework.context.event.SimpleApplicationEventMulticaster#multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType)

    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        Executor executor = getTaskExecutor();
        for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                invokeListener(listener, event);
            }
        }
    }

介紹流程之前,先說明SimpleApplicationEventMulticaster的兩個類變量

    //調(diào)用監(jiān)聽器的執(zhí)行器
    private Executor taskExecutor;

    //調(diào)用監(jiān)聽器的異常處理機制
    private ErrorHandler errorHandler;

事件通知流程非常的簡單,獲取當前事件監(jiān)聽的listener,如果執(zhí)行器不為空就交個執(zhí)行器執(zhí)行,否則交給調(diào)用線程執(zhí)行。這里的流程很簡單,就不具體分析了,這里有個效率優(yōu)化的地方就是獲取當前事件監(jiān)聽的listener時,其實是維護了一個ConcurrentHashMap作為緩存,避免每次通知都遍歷所有的listener。

同步異步

通過上面的源碼分析可以知道SpringEvent默認是同步實現(xiàn),如果要實現(xiàn)異步通知兩種方式

  1. 監(jiān)聽者異步
public class SpringObjectEventListener implements ApplicationListener<SpringObjectEvent> {
    //添加異步注解
    @Async
    @Override
    public void onApplicationEvent(SpringObjectEvent event) {
        System.out.println("Object:"+event.getMsg());
    }
}
  1. 自定義applicationEventMulticaster

前面源碼分析當容器中不存在applicationEventMulticaster才會創(chuàng)建一個默認的SimpleApplicationEventMulticaster,而且SimpleApplicationEventMulticaster默認taskExecutor為null,所以我們可以定義一個SimpleApplicationEventMulticaster并將taskExecutor設(shè)置一個執(zhí)行l(wèi)istener的線程池,從而達到異步執(zhí)行的效果。

    @Bean(name = "applicationEventMulticaster")
    public ApplicationEventMulticaster createApplicationEventMulticaster(){
        SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster();
        //創(chuàng)建線程池
        DefaultThreadFactory threadFactory = new DefaultThreadFactory("listener-executor", false);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8,
                60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), threadFactory);
        applicationEventMulticaster.setTaskExecutor(executor);
        return applicationEventMulticaster;
    }

EventBus

使用

  • 引入
    EventBus在google的guava包中,通過maven引入。
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>
  • 事件
    EventBus的事件定義只要是Object類型就行
public class EventBusEvent {

    private String msg;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
  • 初始化
   EventBus eventBus = new EventBus();
  • 發(fā)布
        EventBusEvent eventBusEvent = new EventBusEvent();
        eventBusEvent.setMsg(msg);
        eventBus.post(eventBusEvent);
  • 訂閱
    定于一個事件需要在方法上添加@Subscribe注解,然后再注冊到eventBus中
public class EventBusListener {

    @Subscribe
    public void subscribe(EventBusEvent e) {
        System.out.println(e.getMsg());
    }

}

    eventBus.register(new EventBusListener());

原理分析

在具體分析源碼之前,我們通過已經(jīng)分析過的SpringEvent的一個基本原理,參考EventBus的一個使用流程。在不分析源碼的情況下,去猜測它的一個實現(xiàn)原理是怎么樣的。當一個監(jiān)聽者注冊到EventBus中的時候,是不是會將這個類上帶有@Subscribe的方法都當做一個監(jiān)聽者,然后有一個數(shù)據(jù)結(jié)構(gòu)去保存事件和監(jiān)聽者的一個關(guān)聯(lián)關(guān)系。
然后通過EventBus發(fā)布一個事件的時候,通過內(nèi)部保存的一個事件與監(jiān)聽者的管理關(guān)系,就可以找到監(jiān)聽者,然后就可以對其通知了呢?
接下來簡單分析一下EventBus的源碼,看對它的猜測對不對。

監(jiān)聽加載

注冊監(jiān)聽器

  public void register(Object object) {
    subscribers.register(object);
  }

這里的subscribers就是SubscriberRegistry
先看SubscriberRegistry的成員變量subscribers

  private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();

可以知道EventBus是通過一個ConcurrentMap保存事件與監(jiān)聽者的關(guān)系的,監(jiān)聽者是讀多寫少的創(chuàng)建所以用一個CopyOnWriteArraySet存儲。
register

  void register(Object listener) {
    //查找?guī)в蠤Subscribe的方法、并封裝成Subscriber
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    //遍歷集合,將監(jiān)聽器添加到subscribers中
    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

上面流程比較簡單清晰,和我們預(yù)先分析的差不多,先解析帶有@Subscriber注解的方法,然后將這些監(jiān)聽器添加到一個Map中,這個Map用來保存事件與監(jiān)聽者的關(guān)系。
通知

  public void post(Object event) {
     //獲取事件的監(jiān)聽者
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      //分發(fā)事件
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

上訴流程也很簡單,從subscribers獲取該事件的監(jiān)聽者,獲取方式就是map.get(Object key)。如果有監(jiān)聽者則開始分發(fā)事情,如果又沒有監(jiān)聽者也不是DeadEvent,將會把該事件封裝一個DeadEvent然后再調(diào)用post方法。
這里需要分析一下dispatcher事件分發(fā)器。

  /** Dispatches the given {@code event} to the given {@code subscribers}. */
  abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

這是一個抽象方法,一共有三個實現(xiàn)類ImmediateDispatcher、PerThreadQueuedDispatcher、LegacyAsyncDispatcher。

  • ImmediateDispatcher:直接遍歷subscribers,并且立即執(zhí)行。
  • PerThreadQueuedDispatcher:(EventBus默認的分發(fā)器)內(nèi)部用ThreadLocal為每個調(diào)用線程維護了一個隊列,保證各個調(diào)用線程的subscribers執(zhí)行的有序性。主要流程為先把subscribers加入隊列,然后再從隊列中取出執(zhí)行,保證監(jiān)聽者在所有調(diào)用線程執(zhí)行的順序性。
  • LegacyAsyncDispatcher:傳統(tǒng)異步分發(fā)器AsyncEventBus(AsyncEventBus默認的分發(fā)器),內(nèi)部通過一個全局的queue去保存subscribers。執(zhí)行流程與PerThreadQueuedDispatcher差不多,subscribers加入隊列,然后再從隊列中取出執(zhí)行,區(qū)別在于PerThreadQueuedDispatcher為每個調(diào)用線程都維護一個隊列,而這里是所有線程共享一個全局隊列。這不過這里的任務(wù)執(zhí)行是異步執(zhí)行的,所以其實這個queue并沒有保證subscribers執(zhí)行的總體有序性,而是盡量去保證一個執(zhí)行順序。

同步異步

EventBus默認是同步的,要構(gòu)造異步的通知使用AsyncEventBus,傳入任務(wù)執(zhí)行的線程池就可以了。

  public AsyncEventBus(Executor executor) {
    super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }

總結(jié)

分析了上面兩種關(guān)于進程內(nèi)的通訊組件,其實有很多相同的設(shè)計思路。如果現(xiàn)在讓你自己實現(xiàn)一個類似的組件相信也可以輕松的實現(xiàn)了。那如果是實現(xiàn)進程之間的消息隊列呢?一個可持久化的消息隊列、一個高可用的消息隊列、一個高吞吐的消息隊列......這時候就需要去學習Kafka、RocketMQ才能回答這個問題了。

對比了SpringEvent與EventBus,感覺兩個的功能都差不多,都能實現(xiàn)同步與異步,實現(xiàn)消息通知也能簡單。但是個人覺得如果項目中使用了Spring,還是使用Spring自帶的通知機制比較好,不用引入第三方的包,而且idea也自帶快捷鍵查找消息的訂閱者,會比較方便。
大家這做項目中如果需要一個業(yè)務(wù)的觸發(fā)點需要多個地方去處理的情況下,不妨考慮這種實現(xiàn)方式。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容