轉載請注明:
原始地址: http://www.itdecent.cn/p/9f2fb27062cb
原作者:wonder
1、寫在前面
owt-server使用node.js開發(fā),涉及node.js c++混合開發(fā)。
owt-server的目錄結構如下:
root@ubuntu:/home/wonder/OWT/owt-server-master# ls
build cert doc docker LICENSE node_modules README.md scripts source test third_party
各種環(huán)境安裝腳本在 scripts/ 下,參考README.md進行編譯、安裝、運行、測試即可
2、owt-server簡要介紹
owt-server是集群式的媒體服務。每種功能模塊可以是集群(cluster)的一個工作站(worker),多個worker由中心管理者(manager)管理,管理者有主(master)/備(slave)/候選者(candidate)之分。有些模塊可以復用同一個worker。
worker、manager之間通過消息隊列進行 任務傳遞/rpc調用。owt-server使用了node.js中的amqp庫模塊連接宿主機中運行的rabbitmq,以此作為消息隊列的底層實現(xiàn)。
3、clusterManager集群管理者模塊概述
查看目錄 source/cluster_manager/
root@ubuntu:/home/wonder/OWT/owt-server-master# ls source/cluster_manager/
clusterManager.js cluster_manager.toml dist.json index.js log4js_configuration.json matcher.js package.json scheduler.js strategy.js
其中index.js,為該模塊的啟動入口。**index.js **代碼前段是引用必要的庫,重點庫有:
1)amqp_client (庫實現(xiàn)位于source/common/amqp_client.js):主要用是owt-server的 rpc 封裝(利用amqp實現(xiàn)RPC角色的封裝定義,如rpcClient、rpcServer等),后文會介紹
2)clusterManager(位于source/cluster_manager/clusterManager.js):主要定義了集群管理中心之管理者(manager)、候選者(candidate)自薦競選、主(master)/備(slave)、同步、?;畹确椒?。
3.1 clusterManager模塊詳細分解
clusterManager.js定義了主要的4內部函數變量(var ClusterManager、var runAsSlave、var runAsMaster和var runAsCandidate)和一個導出函數變量(exports.run)
下面就是 clusterManager.js,源碼沒有注釋,筆者在走讀源碼后根據自己的理解添加了注釋。涉及到對專業(yè)術語有疑問的,如rpc、主\備\候選者(master/slave/candidate)、以及 js 語法,請自行百度。
推薦兩篇介紹消息隊列的網址,個人覺得很不錯:
【推薦看這篇,全面且有拓展】:[http://www.itdecent.cn/p/a4d92d0d7e19](http://www.itdecent.cn/p/a4d92d0d7e19)
rabbitmq 對于AMQP的介紹:[https://www.rabbitmq.com/tutorials/amqp-concepts.html#message-acknowledge]
var ClusterManager = function (clusterName, selfId, spec) { //集群管理者定義**
//省略一些定義
......
// var ClusterManager 定義了集群管理中心(manager)的內部函數變量和返回值:**
// 以下為內部函數變量,函數體均省略,請查看源碼**
var createScheduler = function (purpose); /*創(chuàng)建某一類任務的調度器(Scheduler,記錄\管理\執(zhí)行)*/
var checkAlive = function (); /*檢查該manager 所管理的工作站 (worker)的存活情況*/
var workerJoin = function (purpose, worker, info); /*執(zhí)行某一類任務的worker加入該manager */
var workerQuit = function (worker); /*一個 worker 從該manager 退出該*/
var keepAlive = function (worker, on_result); /*一個 worker 向該 manager 申請?;?/
var reportState = function (worker, state); /* 該 manager 報告某 worker 的狀態(tài)*/
var reportLoad = function (worker, load); /*該 manager 報告某 woker 的負載*/
var pickUpTasks = function (worker, tasks); /*令某 worker 執(zhí)行某些任務*/
var layDownTask = function (worker, task); /*令某 worker 放棄執(zhí)行某任務*/
var schedule = function (purpose, task, preference, reserveTime, on_ok, on_error); /*對某類型(purpose) 的任務 (task) 按照指定配置 (preference,reserveTime) 分配worker*/
var unschedule = function (worker, task); /*撤銷某 worker 下分配的任務*/
var getWorkerAttr = function (worker, on_ok, on_error); /*獲取某 worker 的屬性*/
var getWorkers = function (purpose, on_ok); /*獲取某類 workers*/
var getTasks = function (worker, on_ok); /*獲取某 worker 的任務*/
var getScheduled = function (purpose, task, on_ok, on_error); /*獲取某類型任務的 worker*/
// 以下為返回值:
var that = {name: clusterName, id: selfId}; /*ClusterManager(...)的返回值that*/
that.getRuntimeData = function (on_data); /*收集該 manager 管理的每類Scheduler、每個 worker 、每個 task*/
that.registerDataUpdate = function (on_updated_data); /*向該 manager 注冊消息同步實例*/
that.setRuntimeData = function (data); /*向該 manager 配置 data 中記錄的 Scheduler、worker和 task*/
that.setUpdatedData = function (data); /*向該 manager 更新信息,data.type∈{"worker_join "," worker_quit"," worker_state"," worker_load","worker_pickup "," worker_laydown"," scheduled"," unscheduled" },data具體數據結構,請查看源碼*/
that.serve = function (monitoringTgt); /*啟用 manager 服務,并注冊管理目標*/
that.rpcAPI = { /*rpc接口函數,以下函數體均省略,請查看源碼。它們與內部函數變量是對應的*/
join: function (purpose, worker, info, callback) { ...},
quit: function (worker) { ...},
keepAlive: function (worker, callback) { ...},
reportState: function (worker, state) { ...},
reportLoad: function (worker, load) { ...},
pickUpTasks: function (worker, tasks) { ...},
layDownTask: function (worker, task) { ...},
schedule: function (purpose, task, preference, reserveTime, callback) { ...},
unschedule: function (worker, task) { ...},
getWorkerAttr: function (worker, callback) { ...},
getWorkers: function (purpose, callback) { ...},
getTasks: function (worker, callback) { ...},
getScheduled: function (purpose, task, callback) { ...}
}
}
var runAsSlave= function(topicChannel, manager) { //集群管理者作為 “備份”(salve) 的身份運行**
//省略一些定義
......
**//以下為內部函數變量,函數體均省略,請查看源碼**
var requestRuntimeData = function (); /*向首要集群管理者 (master) 請求運行期間的數據,數據內容參閱上文var ClusterManager 返回值中的 getRuntimeData 函數*/
var onTopicMessage = function(message); /*接收到主題消息時的處理函數,message.type∈{ “runtimeData”,“updateData”,“declareMaster” },分別對應著 “收到運行期間的數據”,“收到數據更新”,“收到 master 的角色申明 ” */
var superviseMaster = function (); /*監(jiān)督 master 的定時任務(30ms檢查一次),若當前master失聯(lián)(matster心跳超時大于2次),則該 salve 將進入候選者身份(candidate) 的狀態(tài)*/
**//以下為調用 runAsSlave 將執(zhí)行的函數體**
topicChannel.subscribe( //在指定的主題信道下(基于消息隊列) 訂閱兩種主題消息
['clusterManager.slave.#', 'clusterManager.*.' + manager.id] , //兩種主題的關鍵id
onTopicMessage , //主題消息處理函數
function () { //訂閱成功后,執(zhí)行的函數體
requestRuntimeData(); //向 master 請求運行期間的數據
superviseMaster(); //監(jiān)督 master
}
)
**}**
**var runAsMaster = function(topicChannel, manager) {**
//省略一些定義
......
topicChannel.bus.asRpcServer( //啟用遠程調用服務
manager.name, //master 名稱
manager.rpcAPI, //master的rpc接口
function(rpcSvr) { //rpc服務啟用成功后執(zhí)行的函數體
topicChannel.bus.asMonitoringTarget(function(monitoringTgt) { //啟用worker監(jiān)管服務,主要用于在worker遠程調用登出master時的消息回傳,可類比為消息確認ACK
manager.serve(monitoringTgt); //啟用 master 集群管理服務
setInterval( //設置定時器
function () { // 向消息隊列的三種主題 發(fā)送 “declareMaster ”消息
topicChannel.publish( //主題 'clusterManager.slave' 消息
'clusterManager.slave',
{type: 'declareMaster', data: {id: manager.id, life_time: life_time}}
);
topicChannel.publish( //主題 'clusterManager.candidate' 消息
'clusterManager.candidate',
{type: 'declareMaster', data: {id: manager.id, life_time: life_time}}
);
topicChannel.publish( //主題 'clusterManager.master' 消息
'clusterManager.master',
{type: 'declareMaster', data: {id: manager.id, life_time: life_time}}
);
},
20 //時間間隔20ms
);
var onTopicMessage = function (message); //消息處理函數
topicChannel.subscribe( //訂閱主題消息
['clusterManager.master.#', 'clusterManager.*.' + manager.id],
onTopicMessage, //消息處理函數
function () { //訂閱成功執(zhí)行的函數體
manager.registerDataUpdate( //注冊通知slave的具體方法
topicChannel.publish( //通過消息隊列發(fā)送主題為 'clusterManager.slave' 的消息
'clusterManager.slave',
{type: 'updateData', data: data}
)
);
}
);
},
function(reason) { process.exit();}; //asMonitoringTarget 失敗
},
function(reason) {process.exit(); }; //as RPC server 失敗
}
}
}
var runAsCandidate = function(topicChannel, manager) {
//省略一些定義
......
var electMaster = function () { ...} //該候選者決定自身身份:是 master 還是 slave
var selfRecommend = function () { ...} //該候選者自薦,每30ms向消息隊列發(fā)送'clusterManager.candidate' 主題消息“selfRecommend”
var onTopicMessage = function (message) { ...} //消息處理函數。初始化后定時160ms決定自身身份;收到“selfRecommend”,若消息中id大于自身id,放棄晉升master;收到“declareMaster”,停止自薦,清除定時,成為slave身份
topicChannel.subscribe( //訂閱 “clusterManager.candidate.#”主題消息
['clusterManager.candidate.#'],
onTopicMessage, //消息處理函數
function () {
selfRecommend(); //訂閱成功,該參與者開始自薦
}
);
}
exports.run= function (topicChannel, clusterName, id, spec) {
//該js庫的導出函數
var manager = new ClusterManager(clusterName, id, spec); //生成一個集群管理者(manager)實例
runAsCandidate(topicChannel, manager); //該manager立即作為候選者(candiate)運行
}
思考:主/備方式的好處,在于:一定程度減少了中心式集群管理的風險,即中心管理者宕機造成集群失效的風險。其缺點也是存在的,即調度集中于主管理者,主管理者僅于備管理者進行同步,在調度請求非常頻繁時,主管理者性能會成為瓶頸,這也是中心式網絡應用的通病。
閱讀clusterManager.js文件的總結:
a) 通過源碼走讀,可以明確管理者(manager)、主(master)/備(slave)/候選者(candidate)的分工以及競選方式。
b) 這部分僅是對集群管理者(cluster_manager)的定義,對于集群工作站(worker)的定義還沒有概念。目前僅知道,master暴露了一下rpc接口供調用。
c) 這部分對rpc的調用是比較高層次的,確實在owt-server的代碼中,amqp_client.js文件對node.js中的amqp進行了封裝,以amqp為基礎實現(xiàn)了底層的消息收發(fā)、通知機制。
d) 這部分提到了Scheduler,它是作為某種類型的任務的管理器,供clusterManager.js使用的。它內部實現(xiàn)了task的記錄、worker的記錄、超時管理、task與worker的關聯(lián)、task和worker的調度分配細節(jié)。對于記錄、關聯(lián)、超時管理等功能,下文不做詳細描述,因為相關的接口基本與clusterManager.js 文件中 var ClusterManager 提供的接口一致。
因此,下文將僅對scheduler.js中的任務調度部分做詳細分解。
( 消息隊列重要文件amqp_client.js將在下一篇《owt-server 的集群管理、集群工作站、消息隊列(二)》進行分解; 集群工作站(worker)將在《owt-server 的集群管理、集群工作站、消息隊列(三)》結合具體應用類型進行分解)
3.2 scheduler模塊---任務調度部分詳細分解
話不多說,上干貨。
首先,放兩個scheduler 模塊---任務調度部分需要使用的模塊。
1) strategy.js
調度策略模塊,描述了不同的調度準則:最近使用、最常使用、最少使用、roundRobin(輪詢)、隨機選取
這里貼兩個(最常使用、roundRobin)進行說明
var mostUsed = function () {
this.allocate = function (workers, candidates, on_ok, on_error) { //獲取該策略選中的某個 work 在 workers 中的標號,candidates中存放標號
var most = 0, found = undefined;
for (var i in candidates) { //在提前篩選出的候選candidates中搜索,(提前篩選好處是縮小策略算法運算的空間范圍)
var id = candidates[i];
if (workers[id].load >= most) { //檢查id所對應work的負載,選取最大負載的work的標號
most = workers[id].load;
found = id;
}
}
on_ok(found); //回調選中的標號
};};
var roundRobin = function () {
var latest_used = 65536 * 65536;
this.allocate = function (workers, candidates, on_ok, on_error) {
var i = candidates.indexOf(latest_used); //初始返回-1
if (i === -1) {
latest_used = candidates[0]; //初始選第一個候選
} else {
latest_used = (i === candidates.length - 1) ? candidates[0] : candidates[i + 1]; //選擇下一個candidates[xxx]中存放的標號
}
on_ok(latest_used); //回調選中的標號
};};
2) matcher.js
條件匹配模塊,描述了不同類型work的匹配準則。owt-server提供了多種類型的服務:portal、webrtc、video、audio、analytics、conference、recording、streaming。其中有些服務需要有獨特的任務task和工作站worker的匹配準則。
舉兩個栗子(上干貨):
var webrtcMatcher = function () {
this.match = function (preference, workers, candidates) { //參數1是配置喜好
var result = [],
found_sweet = false; //找到甜心?!?!?!?!!!,源碼作者有點意思的(奸笑~)
for (var i in candidates) {
var id = candidates[i];
var capacity = workers[id].info.capacity; //每個worker在向master登記時,都會把自身能力帶上
if (is_isp_applicable(capacity.isps, preference.isp)) { //這個isp是什么作用還不清楚,直譯是“運營商”?懂的朋友可以交流一下
if (is_region_suited(capacity.regions, preference.region)) { //這個region也不太明確,根據字面直覺上和域控相關
if (!found_sweet) {
found_sweet = true;
result = [id];
} else {
result.push(id);
}
} else { //不在region里,并且沒有找到甜心,強行指定甜心嗎?有點迷
if (!found_sweet) {
result.push(id);
}
}
}
}
return result;
};};
var videoMatcher = function () {
this.match = function (preference, workers, candidates) {
if (!preference || !preference.video)
return candidates;
var formatContain = function (listA, listB) { //函數,統(tǒng)計B在A中的數量
var count = 0;
listB.forEach((fmtB) => {
if (listA.indexOf(fmtB) > -1)
count++;
});
return (count === listB.length);
};
var result = candidates.filter(function(cid) { //篩選結果
var capacity = workers[cid].info.capacity;
var encodeOk = false;
var decodeOk = false;
if (capacity.video) {
encodeOk = formatContain(capacity.video.encode, preference.video.encode); //判斷偏好的視頻編碼器是否在worker的能力中
decodeOk = formatContain(capacity.video.decode, preference.video.decode); //判斷偏好的視頻解碼器是否在worker的能力中
}
if (!encodeOk) { //編碼不匹配
log.warn('No available workers for encoding:', JSON.stringify(preference.video.encode));
}
if (!decodeOk) { //解碼不匹配
log.warn('No available workers for decoding:', JSON.stringify(preference.video.decode));
}
return (encodeOk && decodeOk); //編解碼都匹配才行嘛!
});
return result;
};};
終于,
3)scheduler.js
代碼不多,就是淦~
exports.Scheduler = function(spec) {
/*State <- [0 | 1 | 2]*/ //官方注釋最為致命,這種魔數是看代碼最大的障礙之一,尤其是表示狀態(tài)的魔數
/*{WorkerId: {state: State, load: Number, info: info, tasks:[Task]}*/ // Scheduler 中worker表中的屬性,以及tasks表屬性
var workers = {};
/*{Task: {reserve_timer: TimerId,
reserve_time: Number,
worker: WorkerId} }*/
var tasks = {};
var matcher = Matcher.create(spec.purpose), //根據指定的應用類型名稱創(chuàng)建對應matcher
strategy = Strategy.create(spec.strategy), //根據指定的策略創(chuàng)建對應matcher
schedule_reserve_time = spec.scheduleReserveTime;
that.schedule = function (task, preference, reserveTime, on_ok, on_error) { //參數1是需要調度的任務編號,參數2是該任務的偏好配置
if (tasks[task]) { //該任務編號在處理記錄中
var newReserveTime = reserveTime && tasks[task].reserve_time < reserveTime ? reserveTime : tasks[task].reserve_time, //更新保留時長
worker = tasks[task].worker; //正在處理該 task 的 worker
if (workers[worker]) { // worker還在記錄中
if (isTaskInExecution(task, worker)) { //該任務正在執(zhí)行
tasks[task].reserve_time = newReserveTime; //更新任務保留時長
} else { //任務沒在執(zhí)行
reserveWorkerForTask(task, worker, newReserveTime); //向worker申請該task執(zhí)行時長
}
return on_ok(worker, workers[worker].info); //回調指定的 worker 和 它的信息,并返回
} else { //如果 worker 沒了
repealTask(task); //清理該任務記錄,準備重新分派
}
}
var candidates = [];
for (var worker in workers) {
if (isWorkerAvailable(workers[worker])) { //衡量worker負載和狀態(tài)
candidates.push(worker); //加入候選
}
}
if (candidates.length < 1) {
return on_error('No worker available, all in full load.');
}
candidates = matcher.match(preference, workers, candidates); //matcher它來了,基于任務偏好、matcher類型篩選候選者
if (candidates.length < 1) {
return on_error('No worker matches the preference.');
} else {
strategy.allocate(workers, candidates, function (worker) { //strategy它來了,基于strategy 策略選擇合適的 worker
reserveWorkerForTask(task, worker, (reserveTime && reserveTime > 0 ? reserveTime : schedule_reserve_time)); //為該task分配記錄
on_ok(worker, workers[worker].info); //回調
}, on_error);
}
};
}
總結:scheduler 使用了策略方式,結合strategy 和matcher 模塊,將不同配置對應的策略與策略的執(zhí)行進行解耦,方便擴展,是良好設計模式的體現(xiàn)。
3.3 index.js --- cluster_manager 模塊入口
在節(jié)3中,提到了該模塊下的index.js是入口程序,下面簡單介紹一下,以形成模塊到進程(程序)的概念。
首先是引用的庫
var amqper = require('./amqp_client')); //amqp封裝模塊
var logger = require('./logger').logger; //日志
var log = logger.getLogger('Main');
var ClusterManager = require('./clusterManager'); //cluster_manager模塊
var toml = require('toml'); //配置文件模塊
var fs = require('fs'); //文件系統(tǒng)模塊
其次,配置文件讀取、配置設置
var config;
try {
config = toml.parse(fs.readFileSync('./cluster_manager.toml')); //可以讀一下配置文件,更加清晰
} catch (e) {
log.error('Parsing config error on line ' + e.line + ', column ' + e.column + ': ' + e.message);
process.exit(1);
}
config.manager = config.manager || {}; //manager配置
config.manager.name = config.manager.name || 'owt-cluster'; //manager名字
config.manager.initial_time = config.manager.initial_time || 10 * 1000; //啟動時間
config.manager.check_alive_interval = config.manager.check_alive_interval || 1000; //manager 檢查 worker 失聯(lián)的時間間隔
config.manager.check_alive_count = config.manager.check_alive_count || 10; //manager 剔除失聯(lián) worker 前的最大檢查次數
config.manager.schedule_reserve_time = config.manager.schedule_reserve_time || 60 * 1000; //調度默認保留時間(僅當調度請求沒有該字段時)
config.strategy = config.strategy || {}; //調度策略, 以下為各種應用類型的默認調度策略
config.strategy.general = config.strategy.general || 'round-robin';
config.strategy.portal = config.strategy.portal || 'last-used';
config.strategy.conference = config.strategy.conference || 'last-used';
config.strategy.webrtc = config.strategy.webrtc || 'last-used';
config.strategy.sip = config.strategy.sip || 'round-robin';
config.strategy.streaming = config.strategy.streaming || 'round-robin';
config.strategy.recording = config.strategy.recording || 'randomly-pick';
config.strategy.audio = config.strategy.audio || 'most-used';
config.strategy.video = config.strategy.video || 'least-used';
config.strategy.analytics = config.strategy.analytics || 'least-used';
config.rabbit = config.rabbit || {}; //rabbitmq配置
config.rabbit.host = config.rabbit.host || 'localhost'; //rabbitmq地址
config.rabbit.port = config.rabbit.port || 5672; //rabbitmq端口
最后,
function startup () {
var enableService = function () {
var id = Math.floor(Math.random() * 1000000000); //生成隨機id
var spec = {initialTime: config.manager.initial_time,
checkAlivePeriod: config.manager.check_alive_interval,
checkAliveCount: config.manager.check_alive_count,
scheduleKeepTime: config.manager.schedule_reserve_time,
strategy: config.strategy
};
amqper.asTopicParticipant(config.manager.name + '.management', function(channel) { //利用amqp封裝庫加入主題,得到句柄channel
log.info('Cluster manager up! id:', id);
ClusterManager.run(channel, config.manager.name, id, spec); //使用配置、隨機、句柄,啟動ClusterManager
}, function(reason) {
log.error('Cluster manager initializing failed, reason:', reason);
process.exit();
});
};
amqper.connect(config.rabbit, function () { //amqp封裝庫連接rabbitmq消息隊列
enableService(); //啟動上述服務
}, function(reason) {
log.error('Cluster manager connect to rabbitMQ server failed, reason:', reason);
process.exit();
});
}
startup(); //啟動
... //省略其他系統(tǒng)信號設置
至此,集群管理者模塊,即cluster_manager模塊以及基本分解完畢。下一步就是需要理清集群工作站,以及二者如何協(xié)調工作。此外就是二者所依賴的底層消息隊列究竟做了些什么,還有怎么實現(xiàn)的。
