ambari-agent startup flow
ambari-agent is started with the command ambari-agent start, which in effect runs nohup $PYTHON $AMBARI_AGENT_PY_SCRIPT, i.e. python AmbariAgent.py
AMBARI_AGENT_PY_SCRIPT=/usr/lib/ambari-agent/lib/ambari_agent/AmbariAgent.py
AmbariAgent.py runs main.py in a subprocess. That process reads the configuration it needs from the config file, cleans up data files (err, auto, output* and the like), restarts or stops the agent according to the startup arguments, listens on its port, resolves the server address, and establishes a connection to the server. Once connected, it calls run_threads to start the agent-server communication; run_threads launches the Controller thread.
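The parent/child relationship between AmbariAgent.py and main.py can be sketched as a relaunch loop. This is a minimal sketch, not the actual AmbariAgent.py code: the sentinel exit code and the function name are assumptions for illustration.

```python
import subprocess
import sys

# Assumed sentinel exit code meaning "please restart me"; the value the
# real restartAgent() uses may differ.
AGENT_AUTO_RESTART_EXIT_CODE = 77

def run_agent(cmd):
    # Keep relaunching the agent subprocess for as long as it exits with
    # the restart sentinel; any other exit code ends the loop.
    status = AGENT_AUTO_RESTART_EXIT_CODE
    while status == AGENT_AUTO_RESTART_EXIT_CODE:
        status = subprocess.call(cmd)
    return status
```

In the real agent, cmd would point the interpreter at main.py (presumably next to AmbariAgent.py under /usr/lib/ambari-agent/lib/ambari_agent/), so a restartAgent() deep inside the heartbeat loop only has to exit with the sentinel to get a fresh process.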
ambari-agent talks to AgentResource (org.apache.ambari.server.agent.rest) on ambari-server through the methods in Controller.py (fetching cluster configuration changes, reporting host attributes, and reporting the running state of the services on the host). The commands that ambari-server delivers in the HTTP response are put into the ActionQueue, which by default executes commands in parallel.
POST request: register the host's information
POST request: heartbeat (update the node's state)
GET request: retrieve the component mapping used on the cluster
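The heartbeat POST above can be sketched with the standard library. The URL path and payload fields here are illustrative assumptions, not Ambari's actual wire format; only the exchange pattern (JSON in, JSON with responseId/exitstatus out) follows the description.

```python
import json
import urllib.request

def post_heartbeat(server_url, response_id, node_status):
    # Assumed payload shape: the agent echoes the last responseId it saw
    # plus some host state; field names are hypothetical.
    payload = {
        "responseId": response_id,
        "nodeStatus": node_status,
    }
    req = urllib.request.Request(
        server_url + "/agent/v1/heartbeat",   # hypothetical path
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    # The server's JSON reply carries the next responseId and an exitstatus
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```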
Basic terms
ExecuteCommand: perform INSTALL/START/STOP and similar operations on a service component.
StatusCommand: run a liveness check on a service component (issued periodically, as determined by the server).
CancelCommand: cancel already-issued tasks (when a task in a Stage fails).
RegistrationCommand: ask the agent to re-register with the server (when the heartbeat sequence number kept by the server does not match the one reported by the agent).
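A toy dispatcher makes the division of labor among these command types concrete. The commandType string values and the return labels are illustrative assumptions, not Ambari's exact constants.

```python
def dispatch(command):
    # Route a server command to the agent-side handler described above.
    kind = command.get("commandType")
    if kind == "EXECUTION_COMMAND":      # INSTALL/START/STOP ...
        return "queue"                   # goes to the ActionQueue
    if kind == "STATUS_COMMAND":         # liveness check from the server
        return "status_queue"            # goes to the status executor
    if kind == "CANCEL_COMMAND":         # cancel a previously issued task
        return "cancel"
    if kind == "REGISTRATION_COMMAND":   # force the agent to re-register
        return "reregister"
    raise ValueError("unknown command type: %r" % kind)
```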
Heartbeat flow: source-code analysis
Attributes used in heartbeatWithServer
self.DEBUG_HEARTBEAT_RETRIES = 0 # number of debug heartbeat retries
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0 # number of successful debug heartbeats
retry = False # whether to retry; False at the start of a heartbeat
certVerifFailed = False
state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60')) # logging interval, 60s by default
# last time when state was successfully sent to server
last_state_timestamp = 0.0 # last time host state was successfully sent
# To keep an eye on the heartbeat we log its progress, but to avoid logging too often, state_interval is used to space the log entries out
heartbeat_running_msg_timestamp = 0.0 # time of the last "heartbeat is running" log entry
# prevent excessive logging by logging only at a certain interval
getrecoverycommands_timestamp = 0.0 # time of the last fetch of recovery commands
getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC # interval between fetches of recovery commands, 10s by default
heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC # default heartbeat interval, 10s
For testing purposes:
DEBUG_HEARTBEAT_RETRIES = 0
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False
Outside of a test environment, the heartbeat is initialized and enters a while loop:
while not self.DEBUG_STOP_HEARTBEATING:
Breakdown of the while loop body:
- The default logging level is DEBUG; if current time - time of the last heartbeat log entry > the logging interval, the level is raised to INFO and the heartbeat log timestamp is set to the current time
while not self.DEBUG_STOP_HEARTBEATING:
  current_time = time.time()
  logging_level = logging.DEBUG
  if current_time - heartbeat_running_msg_timestamp > state_interval:
    # log more steps every minute or so
    logging_level = logging.INFO
    heartbeat_running_msg_timestamp = current_time
- Preparing the data before the heartbeat is sent
- Log the current responseId (the one returned at registration, or the default -1)
- send_state: whether to send host state information (gathered via shell commands); not sent by default
- Check the retry flag: if true, increment DEBUG_HEARTBEAT_RETRIES; otherwise check whether current time - time state was last sent > state_interval (60s by default), and if so set send_state to true, meaning host state information must be sent to ambari-server
- Build the JSON data to send to ambari-server
- Log according to the current logging level; if DEBUG logging is enabled, the outgoing JSON data is logged as well
try:
  logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
  send_state = False
  if not retry:
    if current_time - last_state_timestamp > state_interval:
      send_state = True
    logger.log(logging_level, "Building heartbeat message")
    data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
  else:
    self.DEBUG_HEARTBEAT_RETRIES += 1
  if logger.isEnabledFor(logging.DEBUG):
    logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
  else:
    logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
- Communicating with ambari-server and parsing the response
3.1 Send the request to ambari-server and receive its response; exitstatus == 0 means the exchange succeeded, any other value raises an exception
- Read the responseId and log it
- Read the cluster size to adjust the heartbeat frequency in time: if the cluster size > 0, compute cluster size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC (floor division, keeping the integer part of the quotient)
# HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 10, HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC = 1
# if 0 < cluster_size < 10: cluster_size // 10 == 0, so the interval is clamped up to the 1s minimum
# if 10 <= cluster_size <= 100: the interval is cluster_size // 10 seconds
# if cluster_size > 100: the interval is clamped down to the 10s maximum
- Log the new heartbeat interval
response = self.sendRequest(self.heartbeatUrl, data)
exitStatus = 0
if 'exitstatus' in response.keys():
  exitStatus = int(response['exitstatus'])
if exitStatus != 0:
  raise Exception(response)
serverId = int(response['responseId'])
logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
# TODO: this needs to be revised if hosts can be shared across multiple clusters
heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
  if cluster_size > 0 \
  else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
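The clamping behavior described above can be written as a small pure function. This is a sketch of the logic as described (floor division bounded by the min/max constants), not Controller.py's actual get_heartbeat_interval.

```python
HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC = 1
HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 10

def get_heartbeat_interval(cluster_size):
    # ~1s for small clusters, growing by 1s per 10 hosts,
    # capped at 10s for clusters of more than 100 hosts
    interval = cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
    return max(HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC,
               min(interval, HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC))
```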
- Adjusting settings based on the values ambari-server returned
4.1 Whether there are mapped components: when building heartbeat data, the absence of component mappings triggers some fairly expensive operations, so the response decides whether those operations are performed
4.2 Whether there are pending tasks: if so, the auto-recovery manager (recovery_manager) is paused
4.3 Whether a registration command is present: if so, the heartbeat loop exits, isRegistered is set to False and repeatRegistration to True, and registration is performed again, i.e. everything starts over
if 'hasMappedComponents' in response.keys():
  self.hasMappedComponents = response['hasMappedComponents'] is not False
if 'hasPendingTasks' in response.keys():
  has_pending_tasks = bool(response['hasPendingTasks'])
  self.recovery_manager.set_paused(has_pending_tasks)
if 'registrationCommand' in response.keys():
  # check if the registration command is None. If none skip
  if response['registrationCommand'] is not None:
    logger.info("RegistrationCommand received - repeat agent registration")
    self.isRegistered = False
    self.repeatRegistration = True
    return
- Deciding from the metrics whether the agent needs a restart
5.1 Memory used by the current process (in KB) / 1000 = MB
The default soft limit is 400 MB and the hard limit 1000 MB; both can be configured in ambari-agent.ini
- When the soft limit is exceeded and no tasks are in progress or pending, the agent is restarted
- When usage reaches the hard limit, the agent restart is forced
5.2 The responseId returned by ambari-server is compared with the agent's current responseId + 1; on a mismatch the agent restarts, otherwise the agent stores the new responseId and, if state was sent, updates last_state_timestamp = current_time
used_ram = get_used_ram() / 1000
# dealing with a possible memory leaks
if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
  logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb",
                                                max_ram=self.max_ram_soft))
  self.restartAgent()
if self.max_ram_hard and used_ram >= self.max_ram_hard:
  logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb",
                                                max_ram=self.max_ram_hard))
  self.restartAgent()

if serverId != self.responseId + 1:
  logger.error(
    "Error in responseId sequence - received responseId={0} from server while expecting {1} - restarting..."
    .format(serverId, self.responseId + 1))
  self.restartAgent()
else:
  self.responseId = serverId
  if send_state:
    last_state_timestamp = current_time
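The responseId handshake in 5.2 can be modeled as a pure function. This is illustrative only; the real code restarts the agent rather than returning a label.

```python
def check_response_id(server_id, agent_id):
    # The server must always answer with the agent's last id + 1;
    # anything else signals a lost or duplicated heartbeat.
    if server_id != agent_id + 1:
        return "restart", agent_id    # agent keeps its id and restarts
    return "ok", server_id            # agent adopts the new id
```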
- Updating the agent's configuration from the heartbeat response
# if the response contains configurations, update the in-memory and
# disk-based configuration cache (execution and alert commands have this)
logger.log(logging_level, "Updating configurations from heartbeat")
self.cluster_configuration.update_configurations_from_heartbeat(response)
7. Executing the appropriate action for each command type returned by ambari-server
- Commands to be canceled may be processed before commands of other types, which could garble the order in which the actionQueue executes; to avoid command failures, the operations on the actionQueue are performed atomically
- First the cancelCommands are handled (removed from the actionQueue if not yet running, or killed directly if already executing)
- If executionCommands are present, the recovery_manager adjusts the desired state according to the operations being executed, so that recovery is possible later; the executionCommands are then put into the actionQueue
- If statusCommands are present, the recovery_manager performs the corresponding bookkeeping and the commands are put into the statusCommandsExecutor to be run on a thread
- Recovery commands are generated periodically, on a timed interval
# there's case when canceled task can be processed in
# Action Queue.execute before adding rescheduled task to queue
# this can cause command failure instead result suppression
# so canceling and putting rescheduled commands should be executed atomically
if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
  logger.log(logging_level, "Adding cancel/execution commands")
with self.actionQueue.lock:
  if 'cancelCommands' in response_keys:
    self.cancelCommandInQueue(response['cancelCommands'])
  if 'executionCommands' in response_keys:
    execution_commands = response['executionCommands']
    self.recovery_manager.process_execution_commands(execution_commands)
    self.addToQueue(execution_commands)

if 'statusCommands' in response_keys:
  # try storing execution command details and desired state
  self.addToStatusQueue(response['statusCommands'])

if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
  getrecoverycommands_timestamp = current_time
  if not self.actionQueue.tasks_in_progress_or_pending():
    logger.log(logging_level, "Adding recovery commands")
    recovery_commands = self.recovery_manager.get_recovery_commands()
    for recovery_command in recovery_commands:
      logger.info("Adding recovery command %s for component %s",
                  recovery_command['roleCommand'], recovery_command['role'])
      self.addToQueue([recovery_command])

if 'alertDefinitionCommands' in response_keys:
  logger.log(logging_level, "Updating alert definitions")
  self.alert_scheduler_handler.update_definitions(response)

if 'alertExecutionCommands' in response_keys:
  logger.log(logging_level, "Executing alert commands")
  self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
- If the server sends a restartAgent command, the agent restarts; if the response contains recoveryConfig, the recovery configuration is updated
if "true" == response['restartAgent']:
  logger.error("Received the restartAgent command")
  self.restartAgent()
else:
  logger.debug("No commands sent from %s", self.serverHostname)

if retry:
  logger.info("Reconnected to %s", self.heartbeatUrl)

if "recoveryConfig" in response:
  # update the list of components enabled for recovery
  logger.log(logging_level, "Updating recovery config")
  self.recovery_manager.update_configuration_from_registration(response)

retry = False
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
self.heartbeat_stop_callback.reset_heartbeat()
- heartbeat_stop_callback uses Python threading: after each heartbeat the loop blocks, and only after thread.wait(timeout) does it enter the next heartbeat. As a special case, as soon as a command that is not a statusCommand finishes executing, threading.Event.set() is triggered and the next heartbeat fires immediately
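This wait/wake mechanism can be sketched with threading.Event. The class and method names below are assumptions that mirror the description, not the exact Controller API.

```python
import threading

class HeartbeatStopCallback:
    # Interruptible pause between heartbeats: normally the loop sleeps for
    # the full timeout, but a finished (non-status) command can wake it early.
    def __init__(self):
        self._event = threading.Event()

    def wait(self, timeout):
        # Returns True if trigger_heartbeat() was called, False on timeout.
        return self._event.wait(timeout)

    def trigger_heartbeat(self):
        # Called when a non-statusCommand finishes, to heartbeat immediately.
        self._event.set()

    def reset_heartbeat(self):
        # Re-arm the event after each heartbeat completes.
        self._event.clear()
```

The design choice here is the usual one for interruptible sleeps: Event.wait(timeout) gives the fixed heartbeat cadence for free, while set() turns any command completion into an immediate wake-up without polling.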
If you have questions about any of this, leave me a comment; I'm always happy to discuss ambari-agent!