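This article was first published on 泊浮目's column: https://segmentfault.com/blog/camile
| Version | Date | Notes |
|---|---|---|
| 1.0 | 2017.12.10 | Initial publication |
| 1.1 | 2020.9.6 | Proposed some possible solutions for the shortcomings |
| 1.2 | 2021.7.13 | Added diagrams |
| 1.3 | 2021.8.10 | Updated the improvement proposals |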
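Preface
In ZStack, the most basic unit of execution is not only a function; it can also be a task (Task, which in essence implements Java's Callable interface). These tasks are consumed in parallel by a reasonably sized thread pool, which keeps ZStack, an IaaS software, running in an orderly fashion in large data centers.
Readers who are not familiar with thread pools may want to read one of my earlier posts first: Java Multithreading Notes (3): Thread Pools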

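Demo code
Here, the most commonly used methods of ThreadFacade in ZStack serve as examples.
syncSubmit
Submits a synchronous task; the thread waits for the result to complete before moving on to the next task.
Let's first look at ApiMediatorImpl in ZStack, which contains a piece of logic for dispatching API messages.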
@Override
public void handleMessage(final Message msg) {
thdf.syncSubmit(new SyncTask<Object>() {
@Override
public String getSyncSignature() {
return "api.worker";
}
@Override
public int getSyncLevel() {
return apiWorkerNum;
}
@Override
public String getName() {
return "api.worker";
}
@MessageSafe
public void handleMessage(Message msg) {
if (msg instanceof APIIsReadyToGoMsg) {
handle((APIIsReadyToGoMsg) msg);
} else if (msg instanceof APIGetVersionMsg) {
handle((APIGetVersionMsg) msg);
} else if (msg instanceof APIGetCurrentTimeMsg) {
handle((APIGetCurrentTimeMsg) msg);
} else if (msg instanceof APIMessage) {
dispatchMessage((APIMessage) msg);
} else {
logger.debug("Not an APIMessage.Message ID is " + msg.getId());
}
}
@Override
public Object call() throws Exception {
handleMessage(msg);
return null;
}
});
}
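Each API message is consumed by one thread, with a maximum concurrency of 5 (apiWorkerNum=5). Each thread waits for the reply to its API message and, once the reply arrives, returns it to the user.
chainSubmit
Submits an asynchronous task; after a task here is executed, the next task in the queue is executed without waiting for the result.
See the code in VmInstanceBase for starting, rebooting, and stopping a virtual machine:
// stop the virtual machine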
protected void handle(final APIStopVmInstanceMsg msg) {
thdf.chainSubmit(new ChainTask(msg) {
@Override
public String getName() {
return String.format("stop-vm-%s", self.getUuid());
}
@Override
public String getSyncSignature() {
return syncThreadName;
}
@Override
public void run(SyncTaskChain chain) {
stopVm(msg, chain);
}
});
}
// reboot the virtual machine
protected void handle(final APIRebootVmInstanceMsg msg) {
thdf.chainSubmit(new ChainTask(msg) {
@Override
public String getName() {
return String.format("reboot-vm-%s", self.getUuid());
}
@Override
public String getSyncSignature() {
return syncThreadName;
}
@Override
public void run(SyncTaskChain chain) {
rebootVm(msg, chain);
}
});
}
// start the virtual machine
protected void handle(final APIStartVmInstanceMsg msg) {
thdf.chainSubmit(new ChainTask(msg) {
@Override
public String getName() {
return String.format("start-vm-%s", self.getUuid());
}
@Override
public String getSyncSignature() {
return syncThreadName;
}
@Override
public void run(SyncTaskChain chain) {
startVm(msg, chain);
}
});
}
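Common characteristics
getSyncSignature specifies the key of the task's queue; the task queue is essentially a Map. Tasks with the same key are placed into the map as values, in order, and executed. Judging from the business logic here alone, this effectively prevents the virtual machine's state from becoming inconsistent.
The default concurrency of a chainTask is 1, which means tasks with the same signature run one at a time. We will see this in the source code analysis later.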
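To make the "Map of queues keyed by signature" idea concrete, here is a minimal sketch. It is not ZStack code: KeyedSerialExecutor, its methods, and the "vm-1" key are hypothetical names chosen for illustration, and the concurrency per key is fixed at 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a ZStack class: tasks sharing a key run one at a time, in order.
class KeyedSerialExecutor {
    private final Map<String, Deque<Runnable>> queues = new HashMap<>();
    private final ExecutorService pool;

    KeyedSerialExecutor(ExecutorService pool) {
        this.pool = pool;
    }

    void submit(String key, Runnable task) {
        synchronized (queues) {
            Deque<Runnable> queue = queues.get(key);
            if (queue != null) {
                // A drainer is already active for this key; it will pick the task up.
                queue.offer(task);
                return;
            }
            queue = new ArrayDeque<>();
            queue.offer(task);
            queues.put(key, queue);
        }
        // First task for this key: start a single drainer on the shared pool.
        pool.execute(() -> drain(key));
    }

    private void drain(String key) {
        while (true) {
            Runnable next;
            synchronized (queues) {
                next = queues.get(key).poll();
                if (next == null) {
                    // Queue is empty: drop the map entry so idle keys do not hold memory.
                    queues.remove(key);
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                // Keep draining even if one task fails.
            }
        }
    }

    // Submits 100 tasks under one key and returns the order in which they ran.
    static List<Integer> demo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyedSerialExecutor executor = new KeyedSerialExecutor(pool);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 100; i++) {
            final int n = i;
            executor.submit("vm-1", () -> order.add(n));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }
}
```

Even though the pool has four threads, the tasks submitted under the same key run in submission order, which is the property that keeps operations on one VM from interleaving.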
Its implementation
First, let's look at the method signatures of the ThreadFacade interface:
public interface ThreadFacade extends Component {
<T> Future<T> submit(Task<T> task); // submit a task
<T> Future<T> syncSubmit(SyncTask<T> task); // submit a task that has a return value
Future<Void> chainSubmit(ChainTask task); // submit a task that has no return value
Future<Void> submitPeriodicTask(PeriodicTask task, long delay); // submit a periodic task that starts after the given delay
Future<Void> submitPeriodicTask(PeriodicTask task); // submit a periodic task
Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task); // submit a cancelable periodic task
Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task, long delay); // submit a cancelable periodic task that starts after the given delay
void registerHook(ThreadAroundHook hook); // register a hook
void unregisterHook(ThreadAroundHook hook); // unregister a hook
ThreadFacadeImpl.TimeoutTaskReceipt submitTimeoutTask(Runnable task, TimeUnit unit, long delay); // submit a task that counts as timed out after the given time
void submitTimerTask(TimerTask task, TimeUnit unit, long delay); // submit a timer task
}
And here are several member variables of DispatchQueueImpl, the class that implements the logic behind several of these methods.
private static final CLogger logger = Utils.getLogger(DispatchQueueImpl.class);
@Autowired
ThreadFacade _threadFacade;
private final HashMap<String, SyncTaskQueueWrapper> syncTasks = new HashMap<String, SyncTaskQueueWrapper>();
private final HashMap<String, ChainTaskQueueWrapper> chainTasks = new HashMap<String, ChainTaskQueueWrapper>();
private static final CLogger _logger = CLoggerImpl.getLogger(DispatchQueueImpl.class);
public static final String DUMP_TASK_DEBUG_SINGAL = "DumpTaskQueue";
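The key ones are syncTasks (synchronous queues) and chainTasks (asynchronous queues), which store the two types of task queues.
Therefore, when we submit a chainTask, we must remember to call the next method explicitly; otherwise the tasks behind it will never be scheduled.
Next, let's read the code, starting with the most commonly used methods.
The chainSubmit method
ThreadFacadeImpl is the entry point: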
@Override
public Future<Void> chainSubmit(ChainTask task) {
return dpq.chainSubmit(task);
}
The logic in DispatchQueue:
// public method, i.e. one of the entry points
@Override
public Future<Void> chainSubmit(ChainTask task) {
return doChainSyncSubmit(task);
}
// internal logic
private <T> Future<T> doChainSyncSubmit(final ChainTask task) {
assert task.getSyncSignature() != null : "How can you submit a chain task without sync signature ???";
DebugUtils.Assert(task.getSyncLevel() >= 1, String.format("getSyncLevel() must return 1 at least "));
synchronized (chainTasks) {
final String signature = task.getSyncSignature();
ChainTaskQueueWrapper wrapper = chainTasks.get(signature);
if (wrapper == null) {
wrapper = new ChainTaskQueueWrapper();
chainTasks.put(signature, wrapper);
}
ChainFuture cf = new ChainFuture(task);
wrapper.addTask(cf);
wrapper.startThreadIfNeeded();
return cf;
}
}
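This logic is roughly as follows:
- Assert that syncSignature is not null and that the concurrency level is at least 1. A level of 1 turns the tasks into a queue handled by a single thread, while a level above 1 specifies how many threads may work on tasks with the same signature.
- Lock HashMap<String, ChainTaskQueueWrapper> chainTasks and try to fetch the queue with the same signature. If there is none, create a new queue for that signature; its thread count and signature are initialized when the first task is added. Either way, the task is placed into the queue.
- Next comes startThreadIfNeeded. "IfNeeded" means that the queue still has spare threads available. A task is then submitted to the thread pool, and its job is: take a Future from the pending queue; if the pending queue is empty, remove this queue's entry from the Map.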
private class ChainTaskQueueWrapper {
LinkedList pendingQueue = new LinkedList();
final LinkedList runningQueue = new LinkedList();
AtomicInteger counter = new AtomicInteger(0);
int maxThreadNum = -1;
String syncSignature;
void addTask(ChainFuture task) {
pendingQueue.offer(task);
if (maxThreadNum == -1) {
maxThreadNum = task.getSyncLevel();
}
if (syncSignature == null) {
syncSignature = task.getSyncSignature();
}
}
void startThreadIfNeeded() {
// if the number of running threads has already reached the limit, do not start another one
if (counter.get() >= maxThreadNum) {
return;
}
counter.incrementAndGet();
_threadFacade.submit(new Task<Void>() {
@Override
public String getName() {
return "sync-chain-thread";
}
// start a new thread every time to avoid stack overflow
@AsyncThread
private void runQueue() {
ChainFuture cf;
synchronized (chainTasks) {
// remove from pending queue and add to running queue later
cf = (ChainFuture) pendingQueue.poll();
if (cf == null) {
if (counter.decrementAndGet() == 0) {
// and if this was the last running thread (nothing is left once it finishes), remove the queue for this signature so it does not hold memory
chainTasks.remove(syncSignature);
}
// null means there is no task left, so return
return;
}
}
synchronized (runningQueue) {
// add to running queue
runningQueue.offer(cf);
}
// once the task finishes, move it out of the running queue
cf.run(new SyncTaskChain() {
@Override
public void next() {
synchronized (runningQueue) {
runningQueue.remove(cf);
}
runQueue();
}
});
}
// this method is called by the thread pool and serves as the entry point
@Override
public Void call() throws Exception {
runQueue();
return null;
}
});
}
}
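The role of next() is easier to see in a stripped-down model. The sketch below is a single-threaded simplification under my own assumptions: MiniChainQueue, Chain, and ChainTask here are hypothetical stand-ins, not the ZStack types, and next() recurses directly instead of hopping to a new thread as the @AsyncThread version above does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model of a chain queue with concurrency 1.
class MiniChainQueue {
    interface Chain {
        void next();
    }

    interface ChainTask {
        void run(Chain chain);
    }

    private final Queue<ChainTask> pending = new ArrayDeque<>();
    private boolean running = false;

    void submit(ChainTask task) {
        pending.offer(task);
        if (!running) {
            running = true;
            runNext();
        }
    }

    private void runNext() {
        ChainTask task = pending.poll();
        if (task == null) {
            // Nothing left: mark the queue idle so the next submit restarts it.
            running = false;
            return;
        }
        // The queue only advances when the task itself calls chain.next().
        task.run(this::runNext);
    }

    // Returns the names of the tasks that actually ran.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniChainQueue queue = new MiniChainQueue();
        queue.submit(chain -> {
            log.add("start-vm");
            chain.next();
        });
        queue.submit(chain -> log.add("stop-vm")); // forgets to call chain.next()
        queue.submit(chain -> {
            log.add("reboot-vm");
            chain.next();
        });
        return log;
    }
}
```

In demo(), the second task never calls next(), so the queue stays marked as running and the third task is left in the pending queue. This is the stall the article warns about.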
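The syncSubmit method
The internal logic of syncSubmit is very similar to that of chainSubmit, which we analyzed above; the tasks are just placed into a different queue.
Likewise, ThreadFacadeImpl is the entry point: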
@Override
public <T> Future<T> syncSubmit(SyncTask<T> task) {
return dpq.syncSubmit(task);
}
Then comes the implementation in DispatchQueue:
@Override
public <T> Future<T> syncSubmit(SyncTask<T> task) {
if (task.getSyncLevel() <= 0) {
return _threadFacade.submit(task);
} else {
return doSyncSubmit(task);
}
}
Internal logic (private method):
private <T> Future<T> doSyncSubmit(final SyncTask<T> syncTask) {
assert syncTask.getSyncSignature() != null : "How can you submit a sync task without sync signature ???";
SyncTaskFuture f;
synchronized (syncTasks) {
SyncTaskQueueWrapper wrapper = syncTasks.get(syncTask.getSyncSignature());
if (wrapper == null) {
wrapper = new SyncTaskQueueWrapper();
// put the new wrapper into the syncTasks map
syncTasks.put(syncTask.getSyncSignature(), wrapper);
}
f = new SyncTaskFuture(syncTask);
wrapper.addTask(f);
wrapper.startThreadIfNeeded();
}
return f;
}
submitPeriodicTask
Submitting a periodic task is essentially implemented through the thread pool's scheduleAtFixedRate. This method schedules a task periodically at a fixed rate: taking the start time of the previous execution as the starting point, it schedules the next execution one period later. If a task's execution time exceeds the period, the next execution starts immediately after the previous one finishes.
Calling this method puts the task into the periodic task map. When the task throws an exception, its Future is cancelled and the task is removed from the map.
public Future<Void> submitPeriodicTask(final PeriodicTask task, long delay) {
assert task.getInterval() != 0;
assert task.getTimeUnit() != null;
ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
task.run();
} catch (Throwable e) {
_logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
final Map<PeriodicTask, ScheduledFuture<?>> periodicTasks = getPeriodicTasks();
final ScheduledFuture<?> ft = periodicTasks.get(task);
if (ft != null) {
ft.cancel(true);
periodicTasks.remove(task);
} else {
_logger.warn("Not found feature for task " + task.getName()
+ ", the exception happened too soon, will try to cancel the task next time the exception happens");
}
}
}
}, delay, task.getInterval(), task.getTimeUnit());
_periodicTasks.put(task, ret);
return ret;
}
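The cancel-on-exception pattern can be tried out with the plain JDK scheduler. This is a sketch under assumptions: PeriodicCancelDemo and the failing task are made up for illustration, and an AtomicReference stands in for the _periodicTasks map lookup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of ZStack.
class PeriodicCancelDemo {
    // Schedules a task that starts failing on its third run; returns how many times it ran.
    static int runUntilFailure() throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        AtomicReference<ScheduledFuture<?>> handle = new AtomicReference<>();
        CountDownLatch cancelled = new CountDownLatch(1);

        Runnable wrapper = () -> {
            try {
                if (runs.incrementAndGet() >= 3) {
                    throw new IllegalStateException("simulated failure");
                }
            } catch (Throwable t) {
                // Because the wrapper swallows the exception, the scheduler would keep
                // running the task; it has to be cancelled explicitly.
                ScheduledFuture<?> future = handle.get();
                if (future != null) {
                    future.cancel(true);
                    cancelled.countDown();
                }
                // If the handle is not stored yet, the next failure retries the cancel.
            }
        };

        handle.set(pool.scheduleAtFixedRate(wrapper, 20, 10, TimeUnit.MILLISECONDS));
        cancelled.await();
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        return runs.get();
    }
}
```

The null check mirrors the "exception happened too soon" branch in the original code: the future is only stored after scheduleAtFixedRate returns, so a very early failure may not find it yet.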
submitCancelablePeriodicTask
submitCancelablePeriodicTask, by contrast, checks on every execution whether the task asks to be cancelled (task.run() returns true) and, if so, cancels its ScheduledFuture.
@Override
public Future<Void> submitCancelablePeriodicTask(final CancelablePeriodicTask task, long delay) {
ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
private void cancelTask() {
ScheduledFuture<?> ft = cancelablePeriodicTasks.get(task);
if (ft != null) {
ft.cancel(true);
cancelablePeriodicTasks.remove(task);
} else {
_logger.warn("cannot find feature for task " + task.getName()
+ ", the exception happened too soon, will try to cancel the task next time the exception happens");
}
}
public void run() {
try {
boolean cancel = task.run();
if (cancel) {
cancelTask();
}
} catch (Throwable e) {
_logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
cancelTask();
}
}
}, delay, task.getInterval(), task.getTimeUnit());
cancelablePeriodicTasks.put(task, ret);
return ret;
}
Initialization
Unlike typical ZStack components, although it implements the Component interface, the logic in its start method is not complete; initialization is driven by the Spring bean lifecycle. See ThreadFacade.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://zstack.org/schema/zstack
http://zstack.org/schema/zstack/plugin.xsd"
default-init-method="init" default-destroy-method="destroy">
<bean id="ThreadFacade" class="org.zstack.core.thread.ThreadFacadeImpl">
<property name="totalThreadNum" value="500" />
<!-- don't declare Component extension, it's specially handled -->
</bean>
<bean id="ThreadAspectj" class="org.zstack.core.aspect.ThreadAspect" factory-method="aspectOf" />
</beans>
Now let's look back at the init and destroy operations of ThreadFacadeImpl.
// init operation
public void init() {
// read the maximum number of pool threads from the global configuration
totalThreadNum = ThreadGlobalProperty.MAX_THREAD_NUM;
if (totalThreadNum < 10) {
_logger.warn(String.format("ThreadFacade.maxThreadNum is configured to %s, which is too small for running zstack. Change it to 10", ThreadGlobalProperty.MAX_THREAD_NUM));
totalThreadNum = 10;
}
// build a thread pool that supports delayed tasks
_pool = new ScheduledThreadPoolExecutorExt(totalThreadNum, this, this);
_logger.debug(String.format("create ThreadFacade with max thread number:%s", totalThreadNum));
// build a DispatchQueue
dpq = new DispatchQueueImpl();
jmxf.registerBean("ThreadFacade", this);
}
// destroy
public void destroy() {
_pool.shutdownNow();
}
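Seeing this, some readers may wonder whether this way of shutting down is too brutal (all threads that are executing tasks get interrupted). As mentioned earlier, it implements the Component interface. This interface has a start method and a stop method, which make it easy to register hooks for a component's lifecycle in ZStack.
// stop method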
@Override
public boolean stop() {
_pool.shutdown();
timerPool.stop();
return true;
}
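The difference between the two calls is easier to see when they are combined. The following is a sketch of the common two-phase shutdown pattern from the JDK's ExecutorService documentation, not ZStack's own code; GracefulShutdown and the timeout values are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of ZStack.
class GracefulShutdown {
    // Returns true if all tasks finished within the grace period.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued and running tasks continue
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // grace period expired: interrupt the running tasks
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    // Runs one short task and checks that a graceful shutdown lets it finish.
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicBoolean finished = new AtomicBoolean(false);
        pool.submit(() -> {
            try {
                Thread.sleep(50);
                finished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean clean = shutdownGracefully(pool, 2, TimeUnit.SECONDS);
        return clean && finished.get();
    }
}
```

In the article's terms, stop() corresponds to the gentle first phase (shutdown), while destroy() corresponds to the forceful fallback (shutdownNow).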
Thread factory
ThreadFacadeImpl also implements ThreadFactory, which lets it perform some operations whenever a thread is created.
@Override
public Thread newThread(Runnable arg0) {
return new Thread(arg0, "zs-thread-" + String.valueOf(seqNum.getAndIncrement()));
}
Here we can see that ZStack gives every new thread a name.
Thread pool
ZStack extends the JDK thread pool to some extent: hook functions run before and after each task is executed, and other components can register their own hooks.
package org.zstack.core.thread;
import org.apache.logging.log4j.ThreadContext;
import org.zstack.utils.logging.CLogger;
import org.zstack.utils.logging.CLoggerImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
public class ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor {
private static final CLogger _logger =CLoggerImpl.getLogger(ScheduledThreadPoolExecutorExt.class);
List<ThreadAroundHook> _hooks = new ArrayList<ThreadAroundHook>(8);
public ScheduledThreadPoolExecutorExt(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
this.setMaximumPoolSize(corePoolSize);
}
public void registerHook(ThreadAroundHook hook) {
synchronized (_hooks) {
_hooks.add(hook);
}
}
public void unregisterHook(ThreadAroundHook hook) {
synchronized (_hooks) {
_hooks.remove(hook);
}
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
ThreadContext.clearMap();
ThreadContext.clearStack();
ThreadAroundHook debugHook = null;
List<ThreadAroundHook> tmpHooks;
synchronized (_hooks) {
tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
}
for (ThreadAroundHook hook : tmpHooks) {
debugHook = hook;
try {
hook.beforeExecute(t, r);
} catch (Exception e) {
_logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
}
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ThreadContext.clearMap();
ThreadContext.clearStack();
ThreadAroundHook debugHook = null;
List<ThreadAroundHook> tmpHooks;
synchronized (_hooks) {
tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
}
for (ThreadAroundHook hook : tmpHooks) {
debugHook = hook;
try {
hook.afterExecute(r, t);
} catch (Exception e) {
_logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
}
}
}
}
In addition, ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor. It is essentially a task-scheduling thread pool, and the work queue it uses is a delayed work queue.
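For readers who want to experiment with the hook mechanism outside ZStack, here is a reduced sketch built on the JDK's ThreadPoolExecutor.beforeExecute and afterExecute extension points. HookedPool and AroundHook are hypothetical names, and a CopyOnWriteArrayList replaces the synchronized copy used in the original class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduced version of a hook-enabled pool; not the ZStack class.
class HookedPool extends ThreadPoolExecutor {
    interface AroundHook {
        void before(Thread t, Runnable r);
        void after(Runnable r, Throwable error);
    }

    // Copy-on-write list: iteration needs no lock while hooks are being (un)registered.
    private final List<AroundHook> hooks = new CopyOnWriteArrayList<>();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    void registerHook(AroundHook hook) {
        hooks.add(hook);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        for (AroundHook hook : hooks) {
            try {
                hook.before(t, r);
            } catch (RuntimeException e) {
                // A failing hook must not prevent the task from running.
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable error) {
        for (AroundHook hook : hooks) {
            try {
                hook.after(r, error);
            } catch (RuntimeException e) {
                // Ignore hook failures here as well.
            }
        }
        super.afterExecute(r, error);
    }

    // Runs three empty tasks and returns {beforeCount, afterCount}.
    static int[] demo() throws InterruptedException {
        AtomicInteger before = new AtomicInteger();
        AtomicInteger after = new AtomicInteger();
        HookedPool pool = new HookedPool(2);
        pool.registerHook(new AroundHook() {
            @Override
            public void before(Thread t, Runnable r) {
                before.incrementAndGet();
            }

            @Override
            public void after(Runnable r, Throwable error) {
                after.incrementAndGet();
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        // afterExecute runs on the worker thread, so wait for the workers to exit.
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {before.get(), after.get()};
    }
}
```

Waiting for termination before reading the counters matters: a task's Future can complete before afterExecute has run on the worker thread.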
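Summary
This article analyzed the thread pool, a core component of ZStack that has been proven in production over a long time. With the thread pool, parallel programming becomes much less complicated.
Of course, there is also room for improvement:
- Some of the locking (synchronized) could be replaced with concurrent containers. This can effectively improve throughput and save the overhead caused by lock contention.
- When a large number of tasks are submitted, HashMap loses performance because of resizing. One option is a fixed-size map that hashes each key to a fixed entry, so that the data structure does not keep growing.
- The queues are unbounded. When a large number of task requests are blocked, this puts a heavy burden on memory.
- The task queues have no timeout logic. Most calls in ZStack go through MQ, and every msg has a corresponding timeout. The tasks themselves, however, have no timeout check. This means that when one task runs for too long, the tasks behind it may already have timed out without being removed from the queue. Combined with the unbounded queues mentioned above, this is a potential disaster. For this problem, Zookeeper's SessionBucket or Kafka's TimingWheel can serve as references.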
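As a rough illustration of the last two points, the sketch below bounds the queue and drops expired entries lazily at dequeue time. It is a deliberately simpler technique than a timing wheel, it is not thread-safe, and DeadlineQueue with all of its methods is a hypothetical name introduced only for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded queue with lazy expiry; not thread-safe and not ZStack code.
class DeadlineQueue<T> {
    private static final class Entry<T> {
        final T item;
        final long deadlineMillis;

        Entry(T item, long deadlineMillis) {
            this.item = item;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Deque<Entry<T>> pending = new ArrayDeque<>();
    private final int capacity;

    DeadlineQueue(int capacity) {
        this.capacity = capacity;
    }

    // Rejects the item instead of growing without bound.
    boolean offer(T item, long deadlineMillis) {
        if (pending.size() >= capacity) {
            return false;
        }
        pending.offer(new Entry<>(item, deadlineMillis));
        return true;
    }

    // Returns the next item whose deadline has not passed, discarding expired ones;
    // returns null when nothing live is left.
    T pollLive(long nowMillis) {
        Entry<T> entry;
        while ((entry = pending.poll()) != null) {
            if (entry.deadlineMillis > nowMillis) {
                return entry.item;
            }
        }
        return null;
    }

    int size() {
        return pending.size();
    }
}
```

The current time is passed in as a parameter so the behavior can be checked without sleeping. A timing wheel would additionally evict expired entries proactively, instead of only when a worker polls.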