如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的熔斷以及Hystrix原理分析

前言

隨著業(yè)務(wù)的越來(lái)越復(fù)雜,保證程序的健壯性對(duì)程序猿來(lái)說也變得更加的重要,畢竟不寫B(tài)ug的程序猿不是一個(gè)好的程序猿。但怎樣盡可能的保證咱們的程序能夠穩(wěn)定的運(yùn)行,以及出錯(cuò)后能夠進(jìn)行相應(yīng)的補(bǔ)償,這里就需要咱們使用熔斷機(jī)制了。

PS:在進(jìn)入正文之前,不妨思考一下兩個(gè)問題:
①熔斷機(jī)制究竟為我們解決了什么問題?
②我們?cè)鯓尤プ约簩?shí)現(xiàn)一個(gè)簡(jiǎn)單的熔斷?


自定義熔斷的實(shí)現(xiàn)

這里咱們簡(jiǎn)單的實(shí)現(xiàn)了一個(gè)超時(shí)后進(jìn)行熔斷的例子,這里有用到AspectJ的相關(guān)知識(shí),對(duì)于熟悉Spring AOP知識(shí)的同學(xué)應(yīng)該沒什么問題。

主要分為兩步:

  1. 使用Future控制是否超時(shí),超時(shí)后將任務(wù)cancel掉。
  2. 調(diào)用咱們自己定義好的fallback方法進(jìn)行處理。在這里需要注意的是,fallback方法參數(shù)應(yīng)該要與原方法相同,這樣咱們才能進(jìn)行補(bǔ)償措施。例如:咱們可以在fallback方法借助消息中間件將這些參數(shù)進(jìn)行存儲(chǔ),然后在適當(dāng)?shù)臅r(shí)候從消息中間件中讀取出來(lái)進(jìn)行補(bǔ)償消費(fèi)處理。
@RestController
public class HelloController {
    private Random random = new Random();

    @MyHystrixCommand(fallback="errorMethod")
    @RequestMapping("/hello")
    public String hello(@RequestParam("name") String message) throws InterruptedException {
        int time = random.nextInt(200);
        System.out.println("spend time : " + time + "ms");
        Thread.sleep(time);
        System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh");
        return "hello world:" + message;
    }

    public String errorMethod(String message) {
        return "error message";
    }
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyHystrixCommand {
    int value() default 100;
    String fallback() default "";
}
@Aspect
@Component
public class MyHystrixCommandAspect {

    ExecutorService executor = Executors.newFixedThreadPool(10);

    @Pointcut(value = "@annotation(MyHystrixCommand)")
    public void pointCut() {

    }

    @Around(value = "pointCut()&&@annotation(hystrixCommand)")
    public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable {
        int timeout = hystrixCommand.value();
        Future future = executor.submit(() -> {
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
            }
            return null;
        });
        Object returnValue = null;
        try {
            returnValue = future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            future.cancel(true);
            if (StringUtils.isBlank(hystrixCommand.fallback())){
                throw new Exception("fallback is null");
            }
            returnValue = invokeFallbackMethod(joinPoint, hystrixCommand.fallback());
        }
        return returnValue;
    }

    private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception {
        Method method = findFallbackMethod(joinPoint, fallback);
        if (method == null) {
            throw new Exception("can not find fallback :" + fallback + " method");
        } else {
            method.setAccessible(true);
            try {
                Object invoke = method.invoke(joinPoint.getTarget(), joinPoint.getArgs());
                return invoke;
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw e;
            }
        }
    }


    private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) {
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method method = methodSignature.getMethod();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Method fallbackMethod = null;
        try {
        //這里通過判斷必須取和原方法一樣參數(shù)的fallback方法
            fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes);
        } catch (NoSuchMethodException e) {
        }
        return fallbackMethod;
    }

}

當(dāng)然,上述例子只是一個(gè)簡(jiǎn)單的超時(shí)后熔斷處理的實(shí)現(xiàn)方式。咱們?cè)趯?shí)際應(yīng)用中,還有可能并發(fā)超過指定閾值后咱們也需要進(jìn)行降級(jí)處理,一個(gè)最普通的場(chǎng)景:秒殺案例。這些東西在Hystrix中都有相應(yīng)的處理,它提供了線程池和信號(hào)量這兩種方式去解決并發(fā)的問題。


什么是Hystrix?

咱們看一下官方介紹

In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, and providing fallback options, all of which improve your system’s overall resiliency.

在分布式環(huán)境中,調(diào)用一些服務(wù)不可避免的會(huì)出現(xiàn)失敗,Hystrix幫助咱們添加了一些容忍策略,并且將服務(wù)進(jìn)行隔離處理,防止一個(gè)服務(wù)的失敗影響到了另一個(gè)服務(wù)的調(diào)用,這些都提高了咱們系統(tǒng)的彈性。


Hystrix的處理流程

這里咱們結(jié)合一下Spring Cloud Hystrix進(jìn)行說明,從HystrixCommandAspect開始分析:

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        ...
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);//第一步
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);//第二步
        ...
        Object result;
        try {
            //第三步
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } 
        ....
        return result;
    }

這個(gè)切面主要針對(duì)HystrixCommandHystrixCollapser這兩個(gè)注解,前者用于進(jìn)行熔斷降級(jí)處理,后者用來(lái)根據(jù)配置進(jìn)行合并請(qǐng)求(類比數(shù)據(jù)庫(kù)操作,將多個(gè)insert語(yǔ)句合并成一個(gè)insert batch語(yǔ)句)。咱們側(cè)重進(jìn)行HystrixCommand這一塊的分析。

第一步:獲取元數(shù)據(jù)(MetaHolder)

這段代碼對(duì)應(yīng)上面的MetaHolder metaHolder = metaHolderFactory.create(joinPoint);,里面封裝了比如調(diào)用方法method,參數(shù)args,方法所屬對(duì)象target,動(dòng)態(tài)代理對(duì)象proxy,回調(diào)方法fallbackMethod等等一些元數(shù)據(jù)的封裝。這些數(shù)據(jù)在創(chuàng)建命令對(duì)象時(shí)會(huì)被使用。

第二步:獲取調(diào)用者(HystrixInvokable)

它持有一個(gè)命令對(duì)象,并且可以在合適的時(shí)候通過這個(gè)命令對(duì)象完成具體的業(yè)務(wù)邏輯,針對(duì)HystrixCommand上述的命令對(duì)象就是GenericObservableCommandGenericCommand的一種,這里命令對(duì)象的選擇和方法的返回值有關(guān),如果返回值為Observable類型,則創(chuàng)建GenericObservableCommand命令,否則創(chuàng)建GenericCommand命令。

第三步:執(zhí)行命令(execute)
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        ...
        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            ...
        }
    }

從上面的代碼段中,可以很容易的看出共有三種策略,同步、異步、OBSERVABLE,而Observable又分為Cold Observable(observable.toObservable())Hot Observable(observable.observe())。所以說總共有四種執(zhí)行方式。但是底層都會(huì)調(diào)用到AbstractCommand.toObservable()方法。

  • execute():同步執(zhí)行,返回一個(gè)單一的對(duì)象結(jié)果,發(fā)生錯(cuò)誤時(shí)拋出異常。
  • queue():異步執(zhí)行,返回一個(gè)Future對(duì)象,包含著執(zhí)行結(jié)束后返回的單一結(jié)果。
  • observe():這個(gè)方法返回一個(gè)Observable對(duì)象,它代表操作的多個(gè)結(jié)果,但是已經(jīng)被訂閱者消費(fèi)掉了。
  • toObservable():這個(gè)方法返回一個(gè)Observable對(duì)象,它代表操作的多個(gè)結(jié)果,需要咱們自己手動(dòng)訂閱并消費(fèi)掉。

在執(zhí)行邏輯中,大量用到了RxJava,各種回調(diào)處理,看的著實(shí)頭暈,感興趣的同學(xué)可以自行閱讀源碼,我這里只是介紹一些關(guān)鍵的流程點(diǎn)。

①首先會(huì)檢查是否命中緩存(toObservable方法中),命中緩存則直接返回:

/* try from cache first */
 if (requestCacheEnabled) {
      HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
       if (fromCache != null) {
           isResponseFromCache = true;
           return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
        }
}

②檢查斷路器是否打開,如果斷路器打開,則通過handleShortCircuitViaFallback直接進(jìn)行fallback處理:

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
        }else {
            return handleShortCircuitViaFallback();
        }
        ...
}

③檢查是否用了信號(hào)量,如果用了,則判斷是否被占滿,占滿后則拋出異常,通過handleSemaphoreRejectionViaFallback直接轉(zhuǎn)到fallback中進(jìn)行執(zhí)行,不執(zhí)行后面的邏輯。如果沒用,則會(huì)返回一個(gè)默認(rèn)的TryableSemaphoreNoOp.DEFAULT,在進(jìn)行executionSemaphore.tryAcquire()時(shí)始終返回true。

if (executionSemaphore.tryAcquire()) {
  try {
    /* used to track userThreadExecutionTime */
    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
    return executeCommandAndObserve(_cmd)
            .doOnError(markExceptionThrown)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
} else {
    return handleSemaphoreRejectionViaFallback();
}

④執(zhí)行命令中的邏輯

通過重寫AbstractCommand中的getExecutionObservable()方法使得下面兩個(gè)命令類中的相應(yīng)邏輯被調(diào)用。

  • GenericCommand中的run()方法
  • GenericObservableCommand中的construct()方法

如果run或者construct中設(shè)置了超時(shí)時(shí)間,如果執(zhí)行時(shí)間超過了閾值,則會(huì)拋出TimeoutException,或者在執(zhí)行過程中拋出其他異常,都會(huì)進(jìn)入fallback中進(jìn)行處理邏輯。

⑤發(fā)生異常后執(zhí)行fallback

   private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, 
         final HystrixEventType eventType,
         final FailureType failureType, 
         final String message,
         final Exception originalException) {
}

最終都會(huì)調(diào)用到這個(gè)方法,咱們看看FailureType具體有哪幾種類型。

  • COMMAND_EXCEPTION:執(zhí)行run方法或者construct方法拋出異常時(shí)。
  • TIMEOUT:超時(shí)情況下。
  • SHORTCIRCUIT:斷路器直接打開時(shí),直接執(zhí)行handleShortCircuitViaFallback方法。
  • REJECTED_THREAD_EXECUTION:線程池、請(qǐng)求隊(duì)列被占滿的情況下。
  • REJECTED_SEMAPHORE_EXECUTION:信號(hào)量占滿情況下。
  • BAD_REQUEST_EXCEPTION:
  • REJECTED_SEMAPHORE_FALLBACK:

總結(jié)

Hystrix中大量用了RxJava,閱讀源碼看起來(lái)不免會(huì)覺得頭暈,可以考慮在關(guān)鍵點(diǎn)打幾個(gè)斷點(diǎn)看看,不然各種回調(diào)會(huì)讓你繞圈圈。不過個(gè)人覺得RxJava代碼看起來(lái)還是蠻優(yōu)美的,只不過有些許不適應(yīng)而已,后面有時(shí)間會(huì)研究一下RxJava。


END

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容