概述
DBLE在啟動時,由作為入口的com.actiontech.dble.DbleServerStartup.main()控制,依次調(diào)用com.actiontech.dble.DbleServer的以下函數(shù),來完成整個啟動過程。這與MyCat的行為相仿。
com.actiontech.dble.DbleServer()
com.actiontech.dble.DbleServer.beforeStart()
com.actiontech.dble.DbleServer.startup()
無論是MyCat的MycatServer,還是DBLE的DbleServer,XXXServer這個類并沒有自己的線程。完成上面的函數(shù)之后,主線程離開DbleServer的相關(guān)函數(shù),并完全交出CPU控制權(quán),后續(xù)將由持有子線程的其他模塊來驅(qū)動程序運(yùn)作。
換句話說,DbleServer是把各個功能模塊封裝起來的邏輯盒子,它主要的工作就是初始化和啟動DBLE的各種功能模塊。
第一階段:com.actiontech.dble.DbleServer()
DBLE在全局公用類的設(shè)計(jì)上,繼承了MyCat的做法,大量使用單實(shí)例設(shè)計(jì)模式(Singleton)。DbleServer本身是單實(shí)例設(shè)計(jì),其中還包含了大量非傳遞(non-transitivity)成員變量,并以此來實(shí)現(xiàn)其他一些類(或者說是功能模塊)的單實(shí)例設(shè)計(jì)。
DbleServer()的主要工作就是對非傳遞成員變量進(jìn)行賦值:
- 讀取配置文件
private final ServerConfig config;
// ...
this.config = new ServerConfig();
- 創(chuàng)建定時任務(wù)的計(jì)時線程池
private final ScheduledExecutorService scheduler;
// ...
scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build());
- 設(shè)置服務(wù)上線標(biāo)識
private final AtomicBoolean isOnline;
// ...
this.isOnline = new AtomicBoolean(true);
- 創(chuàng)建緩存服務(wù)
private final CacheService cacheService;
// ...
cacheService = new CacheService(config.getSystem().isLowerCaseTableNames());
- 創(chuàng)建路由服務(wù)
private final RouteService routerService;
// ...
routerService = new RouteService(cacheService);
- 記錄啟動時間
private final long startupTime;
// ...
this.startupTime = TimeUtil.currentTimeMillis();
- 創(chuàng)建會話XA檢查服務(wù)
這是MyCat沒有,DBLE特有的功能組件。原理上是提供commitSession和rollbackSession這兩個map,和它們的commit/rollback狀態(tài)檢查,來協(xié)助實(shí)現(xiàn)XA中的全局事務(wù)管理器概念。
雖然沒有進(jìn)行final修飾,但是它是實(shí)例私有(private),也沒有提供setter,而且DbleServer中沒有更改過它的對象引用,所以也可以認(rèn)為它是單實(shí)例模式的設(shè)計(jì)。
private XASessionCheck xaSessionCheck;
// ...
xaSessionCheck = new XASessionCheck();
- 創(chuàng)建序列號發(fā)生器
private final SequenceHandler sequenceHandler;
// ...
sequenceHandler = initSequenceHandler(config.getSystem().getSequnceHandlerType());
與MycatServer相比,DbleServer在這個階段(MycatServer的實(shí)例化過程中),并不實(shí)例化DynaClassLoader(catlet加載器)和SQLInterceptor(SQL解析器,封裝了druid)。這是因?yàn)?code>DbleServer從設(shè)計(jì)上就移除了這兩個成員,是設(shè)計(jì)上的顯著不同。
第二階段:com.actiontech.dble.DbleServer.beforeStart()
和MyCat一樣,beforeStart()僅僅調(diào)用SystemConfig.getHomePath(),這是一個只讀不寫的靜態(tài)方法,所以并不會引起任何變化。
public void beforeStart() {
SystemConfig.getHomePath();
}
另外,com.actiontech.dble.DbleServerStartup.main()在實(shí)例化DbleServer之前也調(diào)用同一方法,用來確認(rèn)DBLE_HOME。
所以,從功能上來看這個步驟是沒有意義的一步。MyCat和DBLE都沒有移除它,可能是想保留一個擴(kuò)展點(diǎn)吧。
第三階段:com.actiontech.dble.DbleServer.startup()
該階段主要是創(chuàng)建了使外部能夠訪問和利用DBLE的協(xié)作類:業(yè)務(wù)前端SocketAcceptor server、管理前端SocketAcceptor manager和處理器組NIOProcessor processors[]。
但是,從最終效果來說,其實(shí)這些類的實(shí)例化與初始化工作前移到DbleServer的實(shí)例化階段(啟動的第一階段)也沒有問題(reload操作針對的是ServerConfig)。將這些類的初始化和啟動混放在本階段,各階段間的邏輯區(qū)分不太清晰,應(yīng)該是來自MyCat的的歷史包袱。
- 創(chuàng)建備份時用的業(yè)務(wù)阻塞鎖
這是MyCat所不具備的特性,這個特性對于集群備份非常必要。
private AtomicBoolean backupLocked;
// ...
backupLocked = new AtomicBoolean(false);
- 創(chuàng)建業(yè)務(wù)端口(TCP 8066)的聆聽器
ServerConnectionFactory sf = new ServerConnectionFactory();
SocketAcceptor server = null;
// ...
server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Server", system.getBindIp(),
system.getServerPort(), system.getServerBacklog(), sf, reactorPool);
// ...
server.start();
- 創(chuàng)建管理端口(TCP 9066)的聆聽器
ManagerConnectionFactory mf = new ManagerConnectionFactory();
SocketAcceptor manager = null;
// ...
manager = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Manager", system.getBindIp(),
system.getManagerPort(), 100, mf, reactorPool);
// ...
manager.start();
- 分配內(nèi)存緩沖池
int bufferPoolPageSize = system.getBufferPoolPageSize();
// total page number
short bufferPoolPageNumber = system.getBufferPoolPageNumber();
//minimum allocation unit
short bufferPoolChunkSize = system.getBufferPoolChunkSize();
totalNetWorkBufferSize = bufferPoolPageSize * bufferPoolPageNumber;
if (totalNetWorkBufferSize > Platform.getMaxDirectMemory()) {
LOGGER.error("Direct BufferPool size lager than MaxDirectMemory");
throw new IOException("Direct BufferPool size lager than MaxDirectMemory");
}
bufferPool = new DirectByteBufferPool(bufferPoolPageSize, bufferPoolChunkSize, bufferPoolPageNumber);
- 分配用于跨節(jié)點(diǎn)結(jié)果整合/排序/分組/分頁用的堆外內(nèi)存
if (system.getUseOffHeapForMerge() == 1) {
try {
serverMemory = new SeverMemory(system, totalNetWorkBufferSize);
} catch (NoSuchFieldException e) {
LOGGER.error("NoSuchFieldException", e);
} catch (IllegalAccessException e) {
LOGGER.error("Error", e);
}
}
- 創(chuàng)建業(yè)務(wù)處理池
int threadPoolSize = system.getProcessorExecutor();
// ...
businessExecutor = ExecutorUtil.createFixed("BusinessExecutor", threadPoolSize);
- 創(chuàng)建復(fù)雜查詢處理池
int threadPoolSize = system.getProcessorExecutor();
// ...
complexQueryExecutor = ExecutorUtil.createCached("complexQueryExecutor", threadPoolSize);
- 創(chuàng)建定時任務(wù)處理池
timerExecutor = ExecutorUtil.createFixed("Timer", 1);
- 將內(nèi)存緩沖池和業(yè)務(wù)處理池組合成一個抽象對象,處理器
int processorCount = system.getProcessors();
processors = new NIOProcessor[processorCount];
// ...
for (int i = 0; i < processors.length; i++) {
processors[i] = new NIOProcessor("Processor" + i, bufferPool, businessExecutor);
}
- 創(chuàng)建交易語句記錄器
if (config.getSystem().getRecordTxn() == 1) {
txnLogProcessor = new TxnLogProcessor(bufferPool);
txnLogProcessor.setName("TxnLogProcessor");
txnLogProcessor.start();
}
- 創(chuàng)建互斥元數(shù)據(jù)管理器
tmManager = new ProxyMetaManager();
if (!this.getConfig().isDataHostWithoutWR()) {
//init tmManager
try {
tmManager.init(this.getConfig());
} catch (Exception e) {
throw new IOException(e);
}
}
- 進(jìn)行XA的crash recovery
performXARecoveryLog();
- 注冊定時任務(wù)
long dataNodeIdleCheckPeriod = system.getDataNodeIdleCheckPeriod();
scheduler.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(processorCheck(), 0L, system.getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleAtFixedRate(dataNodeConHeartBeatCheck(dataNodeIdleCheckPeriod), 0L, dataNodeIdleCheckPeriod, TimeUnit.MILLISECONDS);
//dataHost heartBeat will be influence by dataHostWithoutWR
scheduler.scheduleAtFixedRate(dataNodeHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleAtFixedRate(dataSourceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(xaSessionCheck(), 0L, system.getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(xaLogClean(), 0L, system.getXaLogCleanPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(resultSetMapClear(), 0L, system.getClearBigSqLResultSetMapMs(), TimeUnit.MILLISECONDS);
if (system.getUseSqlStat() == 1) {
//sql record detail timing clean
scheduler.scheduleWithFixedDelay(recycleSqlStat(), 0L, DEFAULT_SQL_STAT_RECYCLE_PERIOD, TimeUnit.MILLISECONDS);
}
if (system.getUseGlobleTableCheck() == 1) { // will be influence by dataHostWithoutWR
scheduler.scheduleWithFixedDelay(globalTableConsistencyCheck(), 0L, system.getGlableTableCheckPeriod(), TimeUnit.MILLISECONDS);
}
- 初始化到數(shù)據(jù)庫的連接
if (!this.getConfig().isDataHostWithoutWR()) {
// init datahost
// connect to Databases using conf settings
Map<String, PhysicalDBPool> dataHosts = this.getConfig().getDataHosts();
LOGGER.info("Initialize dataHost ...");
for (PhysicalDBPool node : dataHosts.values()) {
String index = dnIndexProperties.getProperty(node.getHostName(), "0");
if (!"0".equals(index)) {
LOGGER.info("init datahost: " + node.getHostName() + " to use datasource index:" + index);
}
int activeIndex = node.init(Integer.parseInt(index));
saveDataHostIndex(node.getHostName(), activeIndex);
node.startHeartbeat();
}
}