A Look at HystrixConcurrencyStrategy

This article takes a look at HystrixConcurrencyStrategy.

HystrixConcurrencyStrategy

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java

/**
 * Abstract class for defining different behavior or implementations for concurrency related aspects of the system with default implementations.
 * <p>
 * For example, every {@link Callable} executed by {@link HystrixCommand} will call {@link #wrapCallable(Callable)} to give a chance for custom implementations to decorate the {@link Callable} with
 * additional behavior.
 * <p>
 * When you implement a concrete {@link HystrixConcurrencyStrategy}, you should make the strategy idempotent w.r.t ThreadLocals.
 * Since the usage of threads by Hystrix is internal, Hystrix does not attempt to apply the strategy in an idempotent way.
 * Instead, you should write your strategy to work idempotently.  See https://github.com/Netflix/Hystrix/issues/351 for a more detailed discussion.
 * <p>
 * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
 * href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
 */
public abstract class HystrixConcurrencyStrategy {

    private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class);

    /**
     * Factory method to provide {@link ThreadPoolExecutor} instances as desired.
     * <p>
     * Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
     * {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Implementation using standard java.util.concurrent.ThreadPoolExecutor
     * 
     * @param threadPoolKey
     *            {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
     * @param corePoolSize
     *            Core number of threads requested via properties (or system default if no properties set).
     * @param maximumPoolSize
     *            Max number of threads requested via properties (or system default if no properties set).
     * @param keepAliveTime
     *            Keep-alive time for threads requested via properties (or system default if no properties set).
     * @param unit
     *            {@link TimeUnit} corresponding with keepAliveTime
     * @param workQueue
     *            {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
     * @return instance of {@link ThreadPoolExecutor}
     */
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        //......
    }

    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        //......
    }

    /**
     * Factory method to provide instance of {@code BlockingQueue<Runnable>} used for each {@link ThreadPoolExecutor} as constructed in {@link #getThreadPool}.
     * <p>
     * Note: The maxQueueSize value is provided so any type of queue can be used but typically an implementation such as {@link SynchronousQueue} without a queue (just a handoff) is preferred as
     * queueing is an anti-pattern to be purposefully avoided for latency tolerance reasons.
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Implementation returns {@link SynchronousQueue} when maxQueueSize <= 0 or {@link LinkedBlockingQueue} when maxQueueSize > 0.
     * 
     * @param maxQueueSize
     *            The max size of the queue requested via properties (or system default if no properties set).
     * @return instance of {@code BlockingQueue<Runnable>}
     */
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        //......
    }

    /**
     * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
     * <p>
     * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Pass-thru that does no wrapping.
     * 
     * @param callable
     *            {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
     * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
     */
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }

    /**
     * Factory method to return an implementation of {@link HystrixRequestVariable} that behaves like a {@link ThreadLocal} except that it
     * is scoped to a request instead of a thread.
     * <p>
     * For example, if a request starts with an HTTP request and ends with the HTTP response, then {@link HystrixRequestVariable} should
     * be initialized at the beginning, available on any and all threads spawned during the request and then cleaned up once the HTTP request is completed.
     * <p>
     * If this method is implemented it is generally necessary to also implemented {@link #wrapCallable(Callable)} in order to copy state
     * from parent to child thread.
     * 
     * @param rv
     *            {@link HystrixRequestVariableLifecycle} with lifecycle implementations from Hystrix
     * @return {@code HystrixRequestVariable<T>}
     */
    public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
        return new HystrixLifecycleForwardingRequestVariable<T>(rv);
    }
    
}
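  • This class mainly provides methods that let you customize parts of the thread-isolation configuration.
  • getThreadPool() and getBlockingQueue() customize the thread pool and its work queue.
  • wrapCallable() lets you decorate a Callable, for example to pass context data on to the thread that executes it.
  • getRequestVariable() returns a HystrixRequestVariable, which behaves like a ThreadLocal but is scoped to a request rather than to a thread.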
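The Javadoc above mentions copying thread state as a typical use of wrapCallable(). The following is a minimal sketch of that idea using only java.util.concurrent, so it does not depend on Hystrix. The class ContextPropagationSketch, the CONTEXT ThreadLocal, and the wrap() and seenByWorker() helpers are hypothetical names for illustration; in a real strategy the body of wrap() would live in an override of wrapCallable(Callable). It assumes Java 8 or later for the lambdas.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {
    // Hypothetical per-thread context, for example a trace id.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Same shape as wrapCallable(Callable<T>): capture the value on the
    // calling thread, install it on the worker thread, then restore the
    // worker's previous value so repeated wrapping stays harmless.
    static <T> Callable<T> wrap(final Callable<T> delegate) {
        final String captured = CONTEXT.get();
        return () -> {
            String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                return delegate.call();
            } finally {
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    // Sets the context on the calling thread, runs a task on a separate
    // worker thread, and returns what the worker saw in CONTEXT.
    static String seenByWorker(String callerValue, boolean wrapped) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set(callerValue);
            Callable<String> task = () -> CONTEXT.get();
            return pool.submit(wrapped ? wrap(task) : task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByWorker("trace-42", false)); // prints "null"
        System.out.println(seenByWorker("trace-42", true));  // prints "trace-42"
    }
}
```

Restoring the previous value in the finally block is one way to follow the Javadoc's advice to keep the strategy idempotent with respect to ThreadLocals, since pooled threads are reused across tasks.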

getThreadPool

    /**
     * Factory method to provide {@link ThreadPoolExecutor} instances as desired.
     * <p>
     * Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
     * {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Implementation using standard java.util.concurrent.ThreadPoolExecutor
     * 
     * @param threadPoolKey
     *            {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
     * @param corePoolSize
     *            Core number of threads requested via properties (or system default if no properties set).
     * @param maximumPoolSize
     *            Max number of threads requested via properties (or system default if no properties set).
     * @param keepAliveTime
     *            Keep-alive time for threads requested via properties (or system default if no properties set).
     * @param unit
     *            {@link TimeUnit} corresponding with keepAliveTime
     * @param workQueue
     *            {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
     * @return instance of {@link ThreadPoolExecutor}
     */
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final int dynamicCoreSize = corePoolSize.get();
        final int dynamicMaximumSize = maximumPoolSize.get();

        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
        }
    }

    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

    private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
            return new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }

            };
        } else {
            return PlatformSpecific.getAppEngineThreadFactory();
        }
    }
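  • The thread pool is built from the HystrixThreadPoolKey and the HystrixThreadPoolProperties.
  • HystrixThreadPoolKey is used mainly to obtain the ThreadFactory, which names each thread hystrix-<threadPoolKey name>-<sequence number> and marks it as a daemon thread.
  • HystrixThreadPoolProperties supplies dynamicCoreSize (corePoolSize), dynamicMaximumSize (maximumPoolSize), keepAliveTime, and maxQueueSize. The maximum size is honored only when allowMaximumSizeToDivergeFromCoreSize is true and the value is not smaller than the core size; otherwise the pool's maximum size equals its core size.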
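The sizing and queue rules described above can be sketched with plain java.util.concurrent classes. This is an illustration under stated assumptions, not the Hystrix source: ThreadPoolSketch, queueFor(), namedDaemonFactory(), and build() are hypothetical names, plain int and String parameters stand in for HystrixThreadPoolProperties and HystrixThreadPoolKey, and bounding the LinkedBlockingQueue at maxQueueSize is an assumption, since the body of getBlockingQueue() is elided in the listing. Only the choice between SynchronousQueue and LinkedBlockingQueue comes from the Javadoc.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadPoolSketch {
    // Queue choice as described in the getBlockingQueue Javadoc:
    // a pure handoff when maxQueueSize <= 0, otherwise a linked queue.
    // The capacity bound is an assumption of this sketch.
    static BlockingQueue<Runnable> queueFor(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }

    // Daemon threads named hystrix-<poolName>-<n>, as in getThreadFactory.
    static ThreadFactory namedDaemonFactory(final String poolName) {
        final AtomicInteger threadNumber = new AtomicInteger(0);
        return r -> {
            Thread thread = new Thread(r, "hystrix-" + poolName + "-" + threadNumber.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    // The maximum size is used only when divergence is allowed and the
    // value is not below the core size; otherwise it falls back to core.
    static ThreadPoolExecutor build(String poolName, int coreSize, int maximumSize,
                                    boolean allowMaximumSizeToDiverge,
                                    int keepAliveMinutes, int maxQueueSize) {
        int effectiveMax = (allowMaximumSizeToDiverge && maximumSize >= coreSize) ? maximumSize : coreSize;
        return new ThreadPoolExecutor(coreSize, effectiveMax, keepAliveMinutes, TimeUnit.MINUTES,
                queueFor(maxQueueSize), namedDaemonFactory(poolName));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build("demo", 10, 5, true, 1, -1);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints "10/10"
        pool.shutdown();
    }
}
```

The real method additionally logs an error when coreSize exceeds maximumSize; the sketch only reproduces the resulting pool sizes.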

HystrixLifecycleForwardingRequestVariable

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixLifecycleForwardingRequestVariable.java

/**
 * Implementation of {@link HystrixRequestVariable} which forwards to the wrapped
 * {@link HystrixRequestVariableLifecycle}.
 * <p>
 * This implementation also returns null when {@link #get()} is called while the {@link HystrixRequestContext} has not
 * been initialized rather than throwing an exception, allowing for use in a {@link HystrixConcurrencyStrategy} which
 * does not depend on an a HystrixRequestContext
 */
public class HystrixLifecycleForwardingRequestVariable<T> extends HystrixRequestVariableDefault<T> {
    private final HystrixRequestVariableLifecycle<T> lifecycle;

    /**
     * Creates a HystrixRequestVariable which will return data as provided by the {@link HystrixRequestVariableLifecycle}
     * @param lifecycle lifecycle used to provide values. Must have the same type parameter as the constructed instance.
     */
    public HystrixLifecycleForwardingRequestVariable(HystrixRequestVariableLifecycle<T> lifecycle) {
        this.lifecycle = lifecycle;
    }

    /**
     * Delegates to the wrapped {@link HystrixRequestVariableLifecycle}
     * @return T with initial value or null if none.
     */
    @Override
    public T initialValue() {
        return lifecycle.initialValue();
    }

    /**
     * Delegates to the wrapped {@link HystrixRequestVariableLifecycle}
     * @param value
     *            of request variable to allow cleanup activity.
     *            <p>
     *            If nothing needs to be cleaned up then nothing needs to be done in this method.
     */
    @Override
    public void shutdown(T value) {
        lifecycle.shutdown(value);
    }

    /**
     * Return null if the {@link HystrixRequestContext} has not been initialized for the current thread.
     * <p>
     * If {@link HystrixRequestContext} has been initialized then call method in superclass:
     * {@link HystrixRequestVariableDefault#get()}
     */
    @Override
    public T get() {
        if (!HystrixRequestContext.isCurrentThreadInitialized()) {
            return null;
        }
        return super.get();
    }

}
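  • It extends HystrixRequestVariableDefault and delegates initialization and cleanup (initialValue() and shutdown()) to the wrapped HystrixRequestVariableLifecycle. Its get() returns null instead of throwing when the HystrixRequestContext has not been initialized on the current thread.
  • HystrixRequestVariableDefault mainly operates on the HystrixRequestContext.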
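To make the "returns null when the context is not initialized" behavior concrete, here is a small stand-in written without Hystrix. Every type in it (RequestScopeSketch, Lifecycle, Context, Variable) is hypothetical and only loosely mirrors HystrixRequestVariableLifecycle, HystrixRequestContext, and HystrixLifecycleForwardingRequestVariable; how the real HystrixRequestVariableDefault stores its state is not shown in this article, so the map-based storage is purely an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RequestScopeSketch {
    // Hypothetical counterpart of HystrixRequestVariableLifecycle.
    interface Lifecycle<T> {
        T initialValue();
        void shutdown(T value);
    }

    // Hypothetical counterpart of HystrixRequestContext: per-request state
    // attached to the current thread between initialize() and shutdown().
    static final class Context {
        private static final ThreadLocal<Context> CURRENT = new ThreadLocal<>();
        private final Map<Variable<?>, Object> state = new HashMap<>();

        static void initialize() { CURRENT.set(new Context()); }

        static boolean isCurrentThreadInitialized() { return CURRENT.get() != null; }

        static void shutdown() {
            Context context = CURRENT.get();
            if (context == null) return;
            for (Map.Entry<Variable<?>, Object> e : context.state.entrySet()) {
                e.getKey().shutdownRaw(e.getValue());
            }
            CURRENT.remove();
        }
    }

    // Forwards initialization and cleanup to the wrapped lifecycle and
    // returns null from get() when no context has been initialized.
    static final class Variable<T> {
        private final Lifecycle<T> lifecycle;

        Variable(Lifecycle<T> lifecycle) { this.lifecycle = lifecycle; }

        @SuppressWarnings("unchecked")
        T get() {
            Context context = Context.CURRENT.get();
            if (context == null) return null;
            return (T) context.state.computeIfAbsent(this, k -> lifecycle.initialValue());
        }

        @SuppressWarnings("unchecked")
        void shutdownRaw(Object value) { lifecycle.shutdown((T) value); }
    }

    // Reads the variable before, during, and after a request scope.
    static String demo() {
        final List<String> cleaned = new ArrayList<>();
        Variable<String> user = new Variable<>(new Lifecycle<String>() {
            @Override public String initialValue() { return "user-1"; }
            @Override public void shutdown(String value) { cleaned.add(value); }
        });
        String before = user.get();
        Context.initialize();
        String during = user.get();
        Context.shutdown();
        String after = user.get();
        return before + "|" + during + "|" + cleaned + "|" + after;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "null|user-1|[user-1]|null"
    }
}
```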

HystrixConcurrencyStrategyDefault

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategyDefault.java

/**
 * Default implementation of {@link HystrixConcurrencyStrategy} using standard java.util.concurrent.* implementations.
 * 
 * @ExcludeFromJavadoc
 */
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {

    private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();

    public static HystrixConcurrencyStrategy getInstance() {
        return INSTANCE;
    }

    private HystrixConcurrencyStrategyDefault() {
    }

}

The default implementation does not override any methods; it uses the parent class's implementation for everything.

Summary

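HystrixConcurrencyStrategy lets developers customize Hystrix's internal thread pools and their queues. It also provides a method for wrapping Callables and a method for passing request-scoped context variables.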

