StackExchange.Redis TimeOut 記錄

前言

最近在用.Net Core 做業(yè)務(wù)模塊時,發(fā)現(xiàn)經(jīng)常會出現(xiàn)TimeOut 超時的情況。然后看了官方的解釋,說2.0版本之后維護了一個專用的線程池。我就打算閱讀源碼,看一下這個線程池的實現(xiàn)。

源碼

StackExchange.Redis源碼 中可以看到,其中維護了一個名為:DedicatedThreadPoolPipeScheduler 的線程池,此線程池引用了一個使用并不多的第三方開源庫Pipelines.Sockets.Unofficial。這個庫給我的第一感覺就是,用的很少,估計有坑。

我們先來看一下這個 DedicatedThreadPoolpipeScheduler 的實現(xiàn):

/// <summary>
    /// An implementation of a pipe-scheduler that uses a dedicated pool of threads, deferring to
    /// the thread-pool if that becomes too backlogged
    /// </summary>
    public sealed class DedicatedThreadPoolPipeScheduler : PipeScheduler, IDisposable
    {
        /// <summary>
        /// Reusable shared scheduler instance
        /// </summary>
        public static DedicatedThreadPoolPipeScheduler Default => StaticContext.Instance;

        private static class StaticContext
        {   // locating here rather than as a static field on DedicatedThreadPoolPipeScheduler so that it isn't instantiated too eagerly
            internal static readonly DedicatedThreadPoolPipeScheduler Instance = new DedicatedThreadPoolPipeScheduler(nameof(Default));
        }

        /// <summary>
        /// The name of the pool
        /// </summary>
        public override string ToString() => Name;

        /// <summary>
        /// The number of workers associated with this pool
        /// </summary>
        public int WorkerCount { get; }

        private int UseThreadPoolQueueLength { get; }

        private ThreadPriority Priority { get; }

        private string Name { get; }

        /// <summary>
        /// Create a new dedicated thread-pool
        /// </summary>
        public DedicatedThreadPoolPipeScheduler(string name = null, int workerCount = 5, int useThreadPoolQueueLength = 10,
            ThreadPriority priority = ThreadPriority.Normal)
        {
            if (workerCount < 0) throw new ArgumentNullException(nameof(workerCount));

            WorkerCount = workerCount;
            UseThreadPoolQueueLength = useThreadPoolQueueLength;
            if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
            Name = name.Trim();
            Priority = priority;
            for (int i = 0; i < workerCount; i++)
            {
                StartWorker(i);
            }
        }

        private long _totalServicedByQueue, _totalServicedByPool;

        /// <summary>
        /// The total number of operations serviced by the queue
        /// </summary>
        public long TotalServicedByQueue => Volatile.Read(ref _totalServicedByQueue);

        /// <summary>
        /// The total number of operations that could not be serviced by the queue, but which were sent to the thread-pool instead
        /// </summary>
        public long TotalServicedByPool => Volatile.Read(ref _totalServicedByPool);

        private readonly struct WorkItem
        {
            public readonly Action<object> Action;
            public readonly object State;
            public WorkItem(Action<object> action, object state)
            {
                Action = action;
                State = state;
            }
        }

        private volatile bool _disposed;

        private readonly Queue<WorkItem> _queue = new Queue<WorkItem>();
        private void StartWorker(int id)
        {
            var thread = new Thread(ThreadRunWorkLoop)
            {
                Name = $"{Name}:{id}",
                Priority = Priority,
                IsBackground = true
            };
            thread.Start(this);
            Helpers.Incr(Counter.ThreadPoolWorkerStarted);
        }

        /// <summary>
        /// Requests <paramref name="action"/> to be run on scheduler with <paramref name="state"/> being passed in
        /// </summary>
        public override void Schedule(Action<object> action, object state)
        {
            if (action == null) return; // nothing to do
            int queueLength;
            lock (_queue)
            {
                _queue.Enqueue(new WorkItem(action, state));
                if (_availableCount != 0)
                {
                    Monitor.Pulse(_queue); // wake up someone
                }
                queueLength = _queue.Count;
            }

            if (_disposed || queueLength > UseThreadPoolQueueLength)
            {
                Helpers.Incr(Counter.ThreadPoolPushedToMainThreadPool);
                System.Threading.ThreadPool.QueueUserWorkItem(ThreadPoolRunSingleItem, this);
            }
            else
            {
                Helpers.Incr(Counter.ThreadPoolScheduled);
            }
        }

        private static readonly ParameterizedThreadStart ThreadRunWorkLoop = state => ((DedicatedThreadPoolPipeScheduler)state).RunWorkLoop();
        private static readonly WaitCallback ThreadPoolRunSingleItem = state => ((DedicatedThreadPoolPipeScheduler)state).RunSingleItem();

        private int _availableCount;
        /// <summary>
        /// The number of workers currently actively engaged in work
        /// </summary>
        public int AvailableCount => Thread.VolatileRead(ref _availableCount);

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private void Execute(Action<object> action, object state)
        {
            try
            {
                action(state);
                Helpers.Incr(Counter.ThreadPoolExecuted);
                Helpers.Incr(action == SocketAwaitableEventArgs.InvokeStateAsAction ? ((Action)state).Method : action.Method);
            }
            catch (Exception ex)
            {
                Helpers.DebugLog(Name, ex.Message);
            }
        }

        private void RunSingleItem()
        {
            WorkItem next;
            lock (_queue)
            {
                if (_queue.Count == 0) return;
                next = _queue.Dequeue();
            }
            Interlocked.Increment(ref _totalServicedByPool);
            Execute(next.Action, next.State);
        }
        private void RunWorkLoop()
        {
            while (true)
            {
                WorkItem next;
                lock (_queue)
                {
                    while (_queue.Count == 0)
                    {
                        if (_disposed) break;
                        _availableCount++;
                        Monitor.Wait(_queue);
                        _availableCount--;
                    }
                    if (_queue.Count == 0)
                    {
                        if (_disposed) break;
                        else continue;
                    }
                    next = _queue.Dequeue();
                }
                Interlocked.Increment(ref _totalServicedByQueue);
                Execute(next.Action, next.State);
            }
        }
        /// <summary>
        /// Release the threads associated with this pool; if additional work is requested, it will
        /// be sent to the main thread-pool
        /// </summary>
        public void Dispose()
        {
            _disposed = true;
            lock (_queue)
            {
                Monitor.PulseAll(_queue);
            }
        }
    }

可以看到內(nèi)部維護了一個任務(wù)隊列,默認工作線程為5個。最重要的一段是這里:

    /// <summary>
        /// Requests <paramref name="action"/> to be run on scheduler with <paramref name="state"/> being passed in
        /// </summary>
        public override void Schedule(Action<object> action, object state)
        {
            if (action == null) return; // nothing to do
            int queueLength;
            lock (_queue)
            {
                _queue.Enqueue(new WorkItem(action, state));
                if (_availableCount != 0)
                {
                    Monitor.Pulse(_queue); // wake up someone
                }
                queueLength = _queue.Count;
            }

            if (_disposed || queueLength > UseThreadPoolQueueLength)
            {
                Helpers.Incr(Counter.ThreadPoolPushedToMainThreadPool);
                System.Threading.ThreadPool.QueueUserWorkItem(ThreadPoolRunSingleItem, this);
            }
            else
            {
                Helpers.Incr(Counter.ThreadPoolScheduled);
            }
        }

當任務(wù)隊列長度超過 UseThreadPoolQueueLength (默認為10)或者這個專用線程池被釋放而又存在沒有處理完的任務(wù)時,就會使用.Net全局線程池來幫助處理任務(wù)。
另外的問題就是,這個默認線程池工作線程數(shù)量,在StackExchange.Redis中是不可配置的,默認就是5。所以如果使用單實例模型來使用這個Redis庫時,一旦并發(fā)數(shù)高就會出現(xiàn)超時的任務(wù),根據(jù)現(xiàn)有資料可以看到

For these .NET-provided global thread pools: once the number of existing (busy) threads hits the "minimum" number of threads, the ThreadPool will throttle the rate at which is injects new threads to one thread per 500 milliseconds. This means that if your system gets a burst of work needing an IOCP thread, it will process that work very quickly. However, if the burst of work is more than the configured "Minimum" setting, there will be some delay in processing some of the work as the ThreadPool waits for one of two things to happen 1. An existing thread becomes free to process the work 2. No existing thread becomes free for 500ms, so a new thread is created.
Basically, if you're hitting the global thread pool (rather than the dedicated StackExchange.Redis thread-pool) it means that when the number of Busy threads is greater than Min threads, you are likely paying a 500ms delay before network traffic is processed by the application. Also, it is important to note that when an existing thread stays idle for longer than 15 seconds (based on what I remember), it will be cleaned up and this cycle of growth and shrinkage can repeat.

.Net全局線程池在超過最小線程數(shù)時,要花費500ms 創(chuàng)建一個線程,當線程閑時超過15秒時就會被回收,是一個動態(tài)伸縮的線程池。

那么總結(jié)一下就是:

  1. StackExchange.Redis維護了一個專有線程池,但是在線程數(shù)超過5,且并發(fā)數(shù)超過10(每個任務(wù)處理的非常慢的極端情況)就會使用.Net 全局線程池。
  2. StackExchange.Redis無法配置這個工作線程數(shù)。
  3. 當并發(fā)過多,且全局線程池的Minimum 很小時,就會出現(xiàn)超時TimeOut 的情況。

解決方案:

  1. 創(chuàng)建多個ConnectionMultiplexer 實例(每個實例有5個專用工作線程),理論上粗略計算可以處理N510個并發(fā)。
  2. 自己編譯源碼修改工作線程數(shù)。
  3. System.Threading.ThreadPool.SetMinThreads(200, 200); //根據(jù)并發(fā)估算,設(shè)置一下全局線程池的最小值。

但是以上的解決方案,感覺都不好。最好的方式應(yīng)該就是,自己寫一個Redis的客戶端,然后實現(xiàn)一個能夠動態(tài)擴容和收縮的線程池。StackExchange.Redis代碼結(jié)構(gòu)看著也不夠清晰。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi閱讀 7,872評論 0 10
  • 北京 第一教堂 婚禮進行時 熱熱鬧鬧的氛圍,伴隨著美麗新娘的到來,掌聲響起。 慕珂馨被慕父引著,走向她余生的眷...
    囖_25ba閱讀 222評論 0 1
  • 抓住終端色,提綱挈領(lǐng)!表達結(jié)構(gòu)規(guī)律,主觀塑造!強調(diào)瞬間感受,筆隨意走!有的放矢,一氣呵成!
    夜書童閱讀 146評論 0 0

友情鏈接更多精彩內(nèi)容