Spring的事件驅(qū)動(dòng)模型

1. 簡(jiǎn)單介紹:

事件Event驅(qū)動(dòng)模型實(shí)際也被稱之為觀察者模式,或者發(fā)布/訂閱模型。
Spring中,也為我們提供了這樣的框架,采用Event/Listener這樣的好處自然不用多說(shuō),就是解耦,利于擴(kuò)展,并且利于一對(duì)多這種形式,我們下面就來(lái)介紹下Spring的Event模式:

2. 核心類:

想要了解Spring的事件模型,需要了解一下幾個(gè)類:

  • ApplicationEvent
    事件本身,繼承自Java EventObject。
  • ApplicationListener
    監(jiān)聽(tīng)者對(duì)象,能夠選擇監(jiān)聽(tīng)不同的事件Event
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener
  • ApplicationEventMulticaster
    事件廣播,也是Spring事件模型中的核心對(duì)象,事件發(fā)布-監(jiān)聽(tīng)依靠的就是ApplicationEventMulticaster,其內(nèi)部類ListenerRetriever按照不同的ApplicationEvent類型對(duì)Listener進(jìn)行了保存,這也是我們?yōu)槭裁茨軌蜥槍?duì)不同的Event進(jìn)行過(guò)濾從而喚醒Listener的過(guò)程。
  • ApplicationEventPublisher
    事件的發(fā)布者,通過(guò)ApplicationEventPublisher#publishEvent的方法進(jìn)行事件的廣播。
  • ApplicationEventPublisherAware
    獲取ApplicationEventPublisher的方法。
  • AbstractApplicationContext
    Spring的事件模型框架的主要邏輯的內(nèi)容出。AbstractApplicationContext中實(shí)現(xiàn)了ApplicationEventPublisher接口,從而實(shí)際在Spring中,默認(rèn)的ApplicationEventPublisher實(shí)現(xiàn)就是AbstractApplicationContext。

3. 例子

我們以如下的情況為例:
以問(wèn)答知乎為例,我們?cè)谛略鲆粋€(gè)問(wèn)題時(shí),需要異步的去建立索引。而建立索引的過(guò)程由于和新增的邏輯沒(méi)有必然的聯(lián)系,所以可以通過(guò)異步的方式來(lái)進(jìn)行,而這種事件的方式能夠很好的進(jìn)行解耦,或者異步來(lái)執(zhí)行,所以我們可以采用事件驅(qū)動(dòng)來(lái)實(shí)現(xiàn)。

// QuestionEvent
public class QuestionEvent extends ApplicationEvent {
    private String question;
    public String getQuestion() {
        return question;
    }
    public void setQuestion(String question) {
        this.question = question;
    }
    public QuestionEvent(String question) {
        super(question);
        this.question = question;
    }
}

// QuestionListener
@Component("questionListener")
public class QuestionListener implements ApplicationListener<QuestionEvent> {
    public void onApplicationEvent(QuestionEvent questionEvent) {
        System.out.println("index question : " + questionEvent.getQuestion());
    }
}

// EventPublisher
@Component(value = "eventPublisher")
public class EventPublisher implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher applicationEventPublisher;

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public ApplicationEventPublisher getApplicationEventPublisher() {
        return applicationEventPublisher;
    }
}

// EventTest
public class EventTest extends BaseTestNG {

    @Resource
    private ApplicationEventPublisher applicationEventPublisher;

    @Test
    public void testEvent() {
        String question = "這是一個(gè)問(wèn)題";
        applicationEventPublisher.publishEvent(new QuestionEvent(question));
    }
}

輸出:
index question : 這是一個(gè)問(wèn)題

4. 源碼分析:

在分析源碼之前,我們首先要明白Event-Listener模式實(shí)際就是觀察者模式。如果看過(guò)之前寫(xiě)過(guò)的觀察者的模式的文章,將會(huì)對(duì)源碼的內(nèi)容較容易明白。
首先是Listener的注冊(cè):
我們定義了Listener(實(shí)現(xiàn)了ApplicationListener),那又是什么時(shí)候被注冊(cè)到BeanFactory中呢,如果了解Spring的Bean的生命周期的話,會(huì)知道BeanPostProcess是在AbstractApplicationContext的refresh時(shí)候被實(shí)例化并初始化的,這個(gè)過(guò)程是要由于非lazy-init Bean的實(shí)例化過(guò)程的。而實(shí)際,Listener的注冊(cè)過(guò)程也是處在refresh的過(guò)程中的,我們來(lái)看AbstractApplicationContext#refresh的代碼:

@Override
    public void refresh() throws BeansException, IllegalStateException {
        synchronized (this.startupShutdownMonitor) {
            // Prepare this context for refreshing.
            prepareRefresh();

            // Tell the subclass to refresh the internal bean factory.
            ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

            // Prepare the bean factory for use in this context.
            prepareBeanFactory(beanFactory);

            try {
                // Allows post-processing of the bean factory in context subclasses.
                postProcessBeanFactory(beanFactory);

                // Invoke factory processors registered as beans in the context.
                invokeBeanFactoryPostProcessors(beanFactory);

                // Register bean processors that intercept bean creation.
                registerBeanPostProcessors(beanFactory);

                // Initialize message source for this context.
                initMessageSource();

                // 初始化傳播器
                initApplicationEventMulticaster();

                // Initialize other special beans in specific context subclasses.
                onRefresh();

                // 注冊(cè)監(jiān)聽(tīng)器
                registerListeners();

                // Instantiate all remaining (non-lazy-init) singletons.
                finishBeanFactoryInitialization(beanFactory);

                // Last step: publish corresponding event.
                finishRefresh();
            }

            catch (BeansException ex) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Exception encountered during context initialization - " +
                            "cancelling refresh attempt: " + ex);
                }

                // Destroy already created singletons to avoid dangling resources.
                destroyBeans();

                // Reset 'active' flag.
                cancelRefresh(ex);

                // Propagate exception to caller.
                throw ex;
            }

            finally {
                // Reset common introspection caches in Spring's core, since we
                // might not ever need metadata for singleton beans anymore...
                resetCommonCaches();
            }
        }
    }

上面代碼中,與Event有關(guān)的是兩個(gè)內(nèi)容,第一個(gè)是initApplicationEventMulticaster(),用于初始化事件傳播器,而第二個(gè)registerListeners(),便是注冊(cè)監(jiān)聽(tīng)器,實(shí)際就是掃描實(shí)現(xiàn)了ApplicationListener接口的類并注冊(cè)到事件傳播器。
我們來(lái)看InitApplicationEventMulticaster:

    protected void initApplicationEventMulticaster() {
        ConfigurableListableBeanFactory beanFactory = getBeanFactory();
        if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
            this.applicationEventMulticaster =
                    beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
            if (logger.isDebugEnabled()) {
                logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
            }
        }
        else {
            this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
            if (logger.isDebugEnabled()) {
                logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
                        APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
                        "': using default [" + this.applicationEventMulticaster + "]");
            }
        }
    }

做的內(nèi)容就是一件事,如果容器中沒(méi)有name=“applicationEventMulticaster”的類,則認(rèn)為沒(méi)有自定義的ApplicationEventMulticaster,此時(shí)會(huì)自己new一個(gè)SimpleApplicationEventMulticaster。

那我們來(lái)看看這個(gè)默認(rèn)的廣播器的實(shí)現(xiàn)SimpleApplicationEventMulticaster,其中核心的方法是multicastEvent,從字面來(lái)看是多路廣播事件的意思:

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                invokeListener(listener, event);
            }
        }
    }

protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            String msg = ex.getMessage();
            if (msg == null || matchesClassCastMessage(msg, event.getClass().getName())) {
                // Possibly a lambda-defined listener which we could not resolve the generic event type for
                // -> let's suppress the exception and just log a debug message.
                Log logger = LogFactory.getLog(getClass());
                if (logger.isDebugEnabled()) {
                    logger.debug("Non-matching event type for listener: " + listener, ex);
                }
            }
            else {
                throw ex;
            }
        }
    }

可見(jiàn)multicastEvent是獲取對(duì)應(yīng)的Event的listener,并進(jìn)行調(diào)用的,也就是listener.onApplicationEvent(event)。在這里面有點(diǎn)需要注意的是在進(jìn)行調(diào)用的時(shí)候,我們會(huì)先去獲得Executor executor = getTaskExecutor();但是如果我們使用的是默認(rèn)的Executor的話,則實(shí)際executor = null,則此時(shí)喚醒的是同步的,如果我們希望采用多線程的方式,則實(shí)際需要配置自己事件傳播器,并通過(guò)setter的方式將executor配置進(jìn)去。
而關(guān)于SimpleApplicationEventMulticaster是如何將Listener配置進(jìn)去的,下面的代碼顯示了細(xì)節(jié):

public abstract class AbstractApplicationEventMulticaster
        implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

    final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);
...
private class ListenerRetriever {

        public final Set<ApplicationListener<?>> applicationListeners;

        public final Set<String> applicationListenerBeans;

        private final boolean preFiltered;

        public ListenerRetriever(boolean preFiltered) {
            this.applicationListeners = new LinkedHashSet<>();
            this.applicationListenerBeans = new LinkedHashSet<>();
            this.preFiltered = preFiltered;
        }

        public Collection<ApplicationListener<?>> getApplicationListeners() {
            LinkedList<ApplicationListener<?>> allListeners = new LinkedList<>();
            for (ApplicationListener<?> listener : this.applicationListeners) {
                allListeners.add(listener);
            }
            if (!this.applicationListenerBeans.isEmpty()) {
                BeanFactory beanFactory = getBeanFactory();
                for (String listenerBeanName : this.applicationListenerBeans) {
                    try {
                        ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                        if (this.preFiltered || !allListeners.contains(listener)) {
                            allListeners.add(listener);
                        }
                    }
                    catch (NoSuchBeanDefinitionException ex) {
                        // Singleton listener instance (without backing bean definition) disappeared -
                        // probably in the middle of the destruction phase
                    }
                }
            }
            AnnotationAwareOrderComparator.sort(allListeners);
            return allListeners;
        }
    }
...

}

在AbstractApplicationEventMulticaster中定義了一個(gè)內(nèi)部類ListenerRetriever,實(shí)際ListenerRetriever對(duì)應(yīng)的是同一類ApplicationEvent的事件監(jiān)聽(tīng)器,而通過(guò)了一個(gè)Map當(dāng)做緩存來(lái)取得相應(yīng)的Listener。
每次獲取的時(shí)候,會(huì)根據(jù)EventType和SourceType來(lái)生成相應(yīng)的ListenerCacheKey,從而獲得相應(yīng)的監(jiān)聽(tīng)器。

5. 定義有序的監(jiān)聽(tīng)器:

如果我們希望監(jiān)聽(tīng)器有序的話,實(shí)際只要實(shí)現(xiàn)SmartApplicationListener接口即可。

6. 參考文章:

詳解Spring事件驅(qū)動(dòng)模型
Spring源碼

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,568評(píng)論 19 139
  • 事件驅(qū)動(dòng)模型簡(jiǎn)介 事件驅(qū)動(dòng)模型也就是我們常說(shuō)的觀察者,或者發(fā)布-訂閱模型;理解它的幾個(gè)關(guān)鍵點(diǎn): 1.首先是一種對(duì)象...
    algernoon閱讀 1,776評(píng)論 0 4
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來(lái)...
    Chandler_玨瑜閱讀 6,787評(píng)論 2 39
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,273評(píng)論 6 342
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評(píng)論 2 11

友情鏈接更多精彩內(nèi)容