Spring @Async 的使用與實現(xiàn)

首先Spring AOP有兩個重要的基礎接口,Advisor和PointcutAdvisor,接口聲明如下:

Advisor接口聲明:

public interface Advisor {
    Advice getAdvice();
    boolean isPerInstance();

}



PointcutAdvisor的接口聲明:

public interface PointcutAdvisor extends Advisor {

    /**
     * Get the Pointcut that drives this advisor.
     */
    Pointcut getPointcut();

}

PointcutAdvisor用來獲取一個切點以及這個切點的處理器(Advise)。

@Async注解使用后置處理器BeanPostProcessor的子類AsyncAnnotationBeanPostProcessor來實現(xiàn)bean處理 :

AsyncAnnotationAdvisor繼承了PointcutAdvisor接口。并且在AsyncAnnotationBeanPostProcessor實現(xiàn)了其父類接口的BeanFactoryAware中的setBeanFactory初始化。Spring一旦創(chuàng)建beanFactory回調成功,就會回調這個方法。保證Advisor對象最先被初始化。

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);

        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }

}

具體的后置處理是通過AsyncAnnotationBeanPostProcessor的后置bean處理是通過其父類AbstractAdvisingBeanPostProcessor來實現(xiàn)的。AbstractAdvisingBeanPostProcessor提供的后置bean處理方法對所有的自定義注解的bean處理方法時通用的。其具體的代碼如下:


@Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }
      
      /*
      
      * bean對象如果是一個ProxyFactory對象。ProxyFactory繼承了AdvisedSupport,而       AdvisedSupport又繼承了Advised接口。這個時候就把不同的Advisor添加起來。
      *
        if (bean instanceof Advised) {
            Advised advised = (Advised) bean;
            if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
                // Add our local Advisor to the existing proxy's Advisor chain...
                if (this.beforeExistingAdvisors) {
                    advised.addAdvisor(0, this.advisor);
                }
                else {
                    advised.addAdvisor(this.advisor);
                }
                return bean;
            }
        }

        if (isEligible(bean, beanName)) {
            ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            if (!proxyFactory.isProxyTargetClass()) {
                evaluateProxyInterfaces(bean.getClass(), proxyFactory);
            }
            proxyFactory.addAdvisor(this.advisor);
            customizeProxyFactory(proxyFactory);
            return proxyFactory.getProxy(getProxyClassLoader());
        }

可以看得出來,isEligible用于判斷這個類或者這個類中的某個方法是否含有注解。這個方法最終進入到AopUtils的canApply方法中間:

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
        if (advisor instanceof IntroductionAdvisor) {
            return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
        }
        else if (advisor instanceof PointcutAdvisor) {
            PointcutAdvisor pca = (PointcutAdvisor) advisor;
            return canApply(pca.getPointcut(), targetClass, hasIntroductions);
        }
        else {
            // It doesn't have a pointcut so we assume it applies.
            return true;
        }
    }

這里的advisor就是AsyncAnnotationAdvisor對象。然后調用AsyncAnnotationAdvisor對象的getPointcut()方法,得到了Pointcut對象。在AOP規(guī)范中間,表示一個具體的切點。那么在方法上注釋@Async注解,就意味著聲明了一個切點。

然后再根據(jù)Pointcut判斷是否含有指定的注解。

切點的執(zhí)行

由于生成了JDK動態(tài)代理對象,那么每一個方法的執(zhí)行必然進入到JdkDynamicAopProxy中的invoke方法中間去執(zhí)行:

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        MethodInvocation invocation;
        Object oldProxy = null;
        boolean setProxyContext = false;

        TargetSource targetSource = this.advised.targetSource;
        Class<?> targetClass = null;
        Object target = null;

        try {
            if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
                // The target does not implement the equals(Object) method itself.
                return equals(args[0]);
            }
            else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
                // The target does not implement the hashCode() method itself.
                return hashCode();
            }
            else if (method.getDeclaringClass() == DecoratingProxy.class) {
                // There is only getDecoratedClass() declared -> dispatch to proxy config.
                return AopProxyUtils.ultimateTargetClass(this.advised);
            }
            else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
                    method.getDeclaringClass().isAssignableFrom(Advised.class)) {
                // Service invocations on ProxyConfig with the proxy config...
                return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
            }

            Object retVal;

            if (this.advised.exposeProxy) {
                // Make invocation available if necessary.
                oldProxy = AopContext.setCurrentProxy(proxy);
                setProxyContext = true;
            }

            // May be null. Get as late as possible to minimize the time we "own" the target,
            // in case it comes from a pool.
            target = targetSource.getTarget();
            if (target != null) {
                targetClass = target.getClass();
            }

            // Get the interception chain for this method.
            List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

            // Check whether we have any advice. If we don't, we can fallback on direct
            // reflective invocation of the target, and avoid creating a MethodInvocation.
            if (chain.isEmpty()) {
                // We can skip creating a MethodInvocation: just invoke the target directly
                // Note that the final invoker must be an InvokerInterceptor so we know it does
                // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
                Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
                retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
            }
            else {
                // We need to create a method invocation...
                invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
                // Proceed to the joinpoint through the interceptor chain.
                retVal = invocation.proceed();
            }

            // Massage return value if necessary.
            Class<?> returnType = method.getReturnType();
            if (retVal != null && retVal == target && returnType.isInstance(proxy) &&
                    !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
                // Special case: it returned "this" and the return type of the method
                // is type-compatible. Note that we can't help if the target sets
                // a reference to itself in another returned object.
                retVal = proxy;
            }
            else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
                throw new AopInvocationException(
                        "Null return value from advice does not match primitive return type for: " + method);
            }
            return retVal;
        }
        finally {
            if (target != null && !targetSource.isStatic()) {
                // Must have come from TargetSource.
                targetSource.releaseTarget(target);
            }
            if (setProxyContext) {
                // Restore old proxy.
                AopContext.setCurrentProxy(oldProxy);
            }
        }
    }

重點的執(zhí)行語句:

// 獲取攔截器
            List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

            // Check whether we have any advice. If we don't, we can fallback on direct
            // reflective invocation of the target, and avoid creating a MethodInvocation.
            if (chain.isEmpty()) {
                // We can skip creating a MethodInvocation: just invoke the target directly
                // Note that the final invoker must be an InvokerInterceptor so we know it does
                // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
                Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
                retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
            }
            else {
            
                // 根據(jù)攔截器來執(zhí)行
                invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
                // Proceed to the joinpoint through the interceptor chain.
                retVal = invocation.proceed();
            }
            

@Async注解的攔截器是AsyncExecutionInterceptor,它繼承了MethodInterceptor接口。而MethodInterceptor就是AOP規(guī)范中的Advice(切點的處理器)。

自定義注解

由于其bean處理器是通用的,所以只要實現(xiàn)PointcutAdvisor和具體的處理器就好了。首先自定義一個注解,只要方法加入了這個注解,就可以輸出這個方法的開始時間和截止時間,注解的名字叫做@Log:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {

}

定義一個簡單的方法用于測試:

public interface IDemoService {

    void add(int a, int b);
    String getName();
}
@Service
public class DemoServiceImpl implements  IDemoService {

    
    @Log
    
    public void add(int a, int b) {
        System.out.println(Thread.currentThread().getName());
        System.out.println(a + b);

    }

    @Override
    public String getName() {
        System.out.println("DemoServiceImpl.getName");
        return "DemoServiceImpl";
    }

}

定義Advisor:

public class LogAnnotationAdvisor extends AbstractPointcutAdvisor  {

    private Advice advice;

    private Pointcut pointcut;

    public LogAnnotationAdvisor() {

        this.advice = new LogAnnotationInterceptor();
    }


    @Override
    public Advice getAdvice() {

        return this.advice;
    }

    @Override
    public boolean isPerInstance() {

        return false;
    }

    @Override
    public Pointcut getPointcut() {

        return this.pointcut;
    }

    public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
        Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<Class<? extends Annotation>>();
        asyncAnnotationTypes.add(asyncAnnotationType);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);
            if (result == null) {
                result = new ComposablePointcut(cpc).union(mpc);
            } else {
                result.union(cpc).union(mpc);
            }
        }
        return result;
    }

}

定義具體的處理器:

public class LogAnnotationInterceptor implements MethodInterceptor, Ordered {

    @Override
    public int getOrder() {

        return Ordered.HIGHEST_PRECEDENCE;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        System.out.println("開始執(zhí)行");
        Object result = invocation.proceed();
        System.out.println("結束執(zhí)行");
        return result;
    }

}

定義@Log專屬的BeanPostProcesser對象:

@SuppressWarnings("serial")
@Service
public class LogAnnotationBeanPostProcesser extends AbstractBeanFactoryAwareAdvisingPostProcessor {

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        LogAnnotationAdvisor advisor = new LogAnnotationAdvisor();
        advisor.setAsyncAnnotationType(Log.class);
        this.advisor = advisor;
    }



}

對bean的后置處理方法直接沿用其父類的方法。當然也可以自定義其后置處理方法,那么就需要自己判斷這個對象的方法是否含有注解,并且生成代理對象:

@Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {

        Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            if (method.isAnnotationPresent(Log.class)) {
                ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
                System.out.println(proxyFactory);
                if (!proxyFactory.isProxyTargetClass()) {
                    evaluateProxyInterfaces(bean.getClass(), proxyFactory);
                }
                proxyFactory.addAdvisor(this.advisor);    
                customizeProxyFactory(proxyFactory);
                return proxyFactory.getProxy(getProxyClassLoader());
            }
        }
        return bean;

    }

測試注解是否是正常運行的:

public class Main {
    public static void main(String[] args) {
        @SuppressWarnings("resource")
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application-context.xml"); 
         IDemoService demoService = context.getBean(IDemoService.class);     
         demoService.add(1, 2);
         demoService.getName();
////         AsyncAnnotationAdvisor
//       AsyncAnnotationBeanPostProcessor
         
        
    }
}

輸出:

開始執(zhí)行
main
3
結束執(zhí)行
DemoServiceImpl.getName

功能一切正常。

很顯然,這是一個最原始的做法,好處是不需要聲明XML注解,還有一種方式,就是需要申明XML的方式:org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator可以快速幫助實現(xiàn)一個注解。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,544評論 19 139
  • 什么是Spring Spring是一個開源的Java EE開發(fā)框架。Spring框架的核心功能可以應用在任何Jav...
    jemmm閱讀 16,771評論 1 133
  • 本章內容: 面向切面編程的基本原理 通過POJO創(chuàng)建切面 使用@AspectJ注解 為AspectJ切面注入依賴 ...
    謝隨安閱讀 3,413評論 0 9
  • Spring Boot 參考指南 介紹 轉載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,265評論 6 342
  • 故宮是皇朝的遺跡,歷史的影像,自然是不宜在喧鬧的環(huán)境中觀品的,因為喧鬧就像思維的鐵柵欄,會阻止你“進入”。 因此,...
    美文散文分享閱讀 410評論 0 0

友情鏈接更多精彩內容