ambari-agent心跳流程分析

ambari-agent啟動流程

??ambari-agent啟動通過命令ambari-agent start,命令實際執(zhí)行nohup $PYTHON $AMBARI_AGENT_PY_SCRIPTpython AmbariAgent.py

AMBARI_AGENT_PY_SCRIPT=/usr/lib/ambari-agent/lib/ambari_agent/AmbariAgent.py

??AmbariAgent.py通過subprocess子進程執(zhí)行main.py,該進程會會從配置文件中獲取相關需要的配置信息,數據文件清理(err,auto,output*等文件),通過啟動參數來重啟或關閉agent,監(jiān)聽端口,獲取server的地址,然后和server建立連接,一旦建立連接,調用run-thread方法開始agent和server的通信過程,在run_threads中啟動了Controller線程。
??Ambari-agent 通過Controller.py的方法與Ambari-server 中的AgentResource(org.apache.ambari.server.agent.rest) 進行交互(獲得集群配置變更,報告節(jié)點屬性,以及節(jié)點上運行服務運行狀態(tài)),并通過HTTP Response返回ambari-server投遞過來的狀態(tài)操作到操作隊列ActionQueue。ActionQueue默認是使用并行模式執(zhí)行command

Post請求 注冊關于主機的信息
Post請求 心跳連接(更新節(jié)點的狀態(tài))
Get 請求 檢索用于集群上的組件映射

基礎參數釋義

ExecuteCommand:對服務組件執(zhí)行INSTALL/START/STOP等操作。

StatusCommand:對服務組件執(zhí)行死活檢查(由Server定期下發(fā))。需要server確定

CancelCommand:取消其他已經下發(fā)的Task(當Stage中的某個Task失敗時)。

RegistrationCommand:要求Agent向Server重新注冊(當發(fā)現Server維護的心跳序號與Agent上報的不一致時)

心跳流程源碼分析

heartbeatWithServer方法屬性釋義

self.DEBUG_HEARTBEAT_RETRIES = 0 # debug心跳重試次數
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0 # debug心跳成功次數 
retry = False # 是否重試,心跳開始的時候默認False
certVerifFailed = False 
state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60')) # 日志記錄間隔 默認60s
 # last time when state was successfully sent to server
last_state_timestamp = 0.0 # 上一次記錄日志的時間
# 為了確保心跳的正常運行,我們會記錄一些日志,但是為了避免高頻次的日志記錄,引用了state_interval來間隔記錄日志
heartbeat_running_msg_timestamp = 0.0 # 

# 通過只在特定的時間間隔內進行日志記錄來防止過度日志記錄
getrecoverycommands_timestamp = 0.0 # 獲取recoverycommands的時間戳
getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC #獲取recoverycommands時間間隔默認為10s
heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC # 心跳的默認時間間隔是10s

出于測試目的

DEBUG_HEARTBEAT_RETRIES = 0
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False

非測試環(huán)境下初始化心跳進入while循環(huán)

while not self.DEBUG_STOP_HEARTBEATING:

while循環(huán)體內部解析:

  1. 默認的日志級別為DEBUG,如果當前時間 - 心跳日志輸出時間 > 日志輸出間隔 則將日志級別改成INFO,將心跳日志輸出時間設為當前時間
while not self.DEBUG_STOP_HEARTBEATING:
    current_time = time.time()
    logging_level = logging.DEBUG
    if current_time - heartbeat_running_msg_timestamp > state_interval:
        # log more steps every minute or so
        logging_level = logging.INFO
        heartbeat_running_msg_timestamp = current_time
  1. 發(fā)送心跳前的數據準備
  • 日志記錄當前的responseId,注冊系統(tǒng)時返回的或者默認的-1
  • send_state 是否發(fā)送系統(tǒng)狀態(tài)信息(通過shell命令獲取主機信息) ,默認不發(fā)送
  • 判斷是否重試retry,如果為true,則DEBUG_HEARTBEAT_RETRIES +1 否則判斷當前時間 - 上一次發(fā)送系統(tǒng)狀態(tài)信息的時間 > state_interval(10s) 如果為true,則send_state 置為true,意味著需要發(fā)送系統(tǒng)狀態(tài)信息給ambari-server端
  • 組織需要發(fā)送給ambari-server的json數據
  • 根據當前設置的日志級別 進行日志記錄.如果是INFO級別的則會記錄下當前發(fā)出的json數據
try:
    logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
    send_state = False
    if not retry:
        if current_time - last_state_timestamp > state_interval:
            send_state = True
        logger.log(logging_level, "Building heartbeat message")
        data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
    else:
        self.DEBUG_HEARTBEAT_RETRIES += 1
    if logger.isEnabledFor(logging.DEBUG):
        logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
    else:
        logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
  1. 與ambari-server建立通信并解析response
    3.1 發(fā)送請求至ambari-server,并接受server傳過來的response
    • exitstatus==0則表示本次通信成功, 不為0則拋出異常
    • 獲取responseId,并記錄日志
    • 或者集群節(jié)點數,來及時調整心跳的頻率,如果集群節(jié)點>0 則 集群節(jié)點//HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC(10s)取整除 - 返回商的整數部分(向下取整)
    # HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 10
    # HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC = 1
    if(0 < cluster_size and cluster_size < 9 ) 
        cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 0 ==> 則心跳頻率為1s
    if(cluster_size > 9 and cluster_size <=100)
        cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC 整除取返回商部分
    if(cluster_size > 100 )
        cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC =10===>則心跳頻率為10s
    
    • 日志記錄新的心跳頻率
response = self.sendRequest(self.heartbeatUrl, data)
exitStatus = 0
if 'exitstatus' in response.keys():
    exitStatus = int(response['exitstatus'])
if exitStatus != 0:
    raise Exception(response)
serverId = int(response['responseId'])
logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
# TODO: this needs to be revised if hosts can be shared across multiple clusters
heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
    if cluster_size > 0 \
    else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
  1. 根據ambari-server 返回的值來調整部分設置
    4.1 是否有映射的組件信息-----主要是心跳發(fā)送數據的時候如果無組件映射的話,會執(zhí)行一些比較耗費性能的操作,根據response返回的信息來決定是否執(zhí)行該操作
    4.2 是否有處于等待的task,如果有的話,會暫停 自動恢復管理機(recovery_manager)的操作
    4.3 是否存在注冊命令,如果存在的話,則退出心跳,將設置isRegistered = False 以及repeatRegistration = True 并再次進行注冊操作,意味著從頭再來一次
if 'hasMappedComponents' in response.keys():
    self.hasMappedComponents = response['hasMappedComponents'] is not False
if 'hasPendingTasks' in response.keys():
    has_pending_tasks = bool(response['hasPendingTasks'])
    self.recovery_manager.set_paused(has_pending_tasks)
if 'registrationCommand' in response.keys():
    # check if the registration command is None. If none skip
    if response['registrationCommand'] is not None:
        logger.info("RegistrationCommand received - repeat agent registration")
        self.isRegistered = False
        self.repeatRegistration = True
        return
  1. 根據指標來決定是否需要重啟agent
    5.1 獲取當前進程使用的內存大小(單位KB)/1000 = MB
    默認的軟性指標為400MB 硬性指標為1000GB 可通過ambari-agent.ini配置
    • 當超過軟性指標時且沒有正在處理的任務時,進行agent重啟
    • 當大于等于硬性指標時,則強制進行agent重啟

5.2 ambari-server返回的responseId與agent當前記錄的responseId+1做比較,如果不相符則進入重啟反之則更新最新的responseId,并更新上一次獲取主機狀態(tài)信息的時間last_state_timestamp=current_time

used_ram = get_used_ram() / 1000
# dealing with a possible memory leaks
if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
    logger.error(
        AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb",
                                         max_ram=self.max_ram_soft))
    self.restartAgent()
if self.max_ram_hard and used_ram >= self.max_ram_hard:
    logger.error(
        AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb",
                                         max_ram=self.max_ram_hard))
    self.restartAgent()
if serverId != self.responseId + 1:
    logger.error(
        "Error in responseId sequence - received responseId={0} from server while expecting {1} - restarting..."
        .format(serverId, self.responseId + 1))
    self.restartAgent()
else:
    self.responseId = serverId
    if send_state:
        last_state_timestamp = current_time
  1. 通過心跳返回信息更新agent配置
# if the response contains configurations, update the in-memory and
# disk-based configuration cache (execution and alert commands have this)
logger.log(logging_level, "Updating configurations from heartbeat")
self.cluster_configuration.update_configurations_from_heartbeat(response)

7.根據ambari-server返回的不同command來分別執(zhí)行相應的操作

  • 因為需要取消的Commands可能會在其它類型Commands之前進行,這可能導致actionQueue執(zhí)行操作的混亂,所以為了避免命令執(zhí)行失敗,所以會將actionQueue進行原子性操作,
  • 先將cancelCommand進行移除(從actionQueue中remove假如它還沒執(zhí)行的話,或者直接kill假如它已經在執(zhí)行中)
  • executionCommands存在的話,則recovery_manager會根據執(zhí)行的操作來動態(tài)調整預期狀態(tài)方便之后進行recover,緊接著將executionCommands放入actionQueue
  • statusCommands存在的話,recovery_manager會執(zhí)行相應的操作,并放入statusCommandsExecutor通過線程去執(zhí)行
  • 通過時間的間隔查詢來定期生成recovery_commands
# there's case when canceled task can be processed in
# Action Queue.execute before adding rescheduled task to queue
# this can cause command failure instead result suppression
# so canceling and putting rescheduled commands should be executed atomically
if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
    logger.log(logging_level, "Adding cancel/execution commands")
with self.actionQueue.lock:
    if 'cancelCommands' in response_keys:
        self.cancelCommandInQueue(response['cancelCommands'])

    if 'executionCommands' in response_keys:
        execution_commands = response['executionCommands']
        self.recovery_manager.process_execution_commands(execution_commands)
        self.addToQueue(execution_commands)

if 'statusCommands' in response_keys:
    # try storing execution command details and desired state
    self.addToStatusQueue(response['statusCommands'])

if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
    getrecoverycommands_timestamp = current_time
    if not self.actionQueue.tasks_in_progress_or_pending():
        logger.log(logging_level, "Adding recovery commands")
        recovery_commands = self.recovery_manager.get_recovery_commands()
        for recovery_command in recovery_commands:
            logger.info("Adding recovery command %s for component %s",
                        recovery_command['roleCommand'], recovery_command['role'])
            self.addToQueue([recovery_command])

if 'alertDefinitionCommands' in response_keys:
    logger.log(logging_level, "Updating alert definitions")
    self.alert_scheduler_handler.update_definitions(response)

if 'alertExecutionCommands' in response_keys:
    logger.log(logging_level, "Executing alert commands")
    self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])

  1. server下發(fā)restart命令的話則進行agent重啟,如果response中存在recoveryConfig則進行配置更新
if "true" == response['restartAgent']:
    logger.error("Received the restartAgent command")
    self.restartAgent()
else:
    logger.debug("No commands sent from %s", self.serverHostname)

if retry:
    logger.info("Reconnected to %s", self.heartbeatUrl)

if "recoveryConfig" in response:
    # update the list of components enabled for recovery
    logger.log(logging_level, "Updating recovery config")
    self.recovery_manager.update_configuration_from_registration(response)

retry = False
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
self.heartbeat_stop_callback.reset_heartbeat()
  1. heartbeat_stop_callback 這個方法使用了python的threading,每一次心跳結束后進入阻塞,thread.wait(timeout)后才會進入下一次心跳,特殊情況,一旦非statusCommand類型的命令執(zhí)行完成,也會立即發(fā)送心跳,觸發(fā)threading.event.set(),立即進入下一次心跳

相關博客

ambari-agent 源碼梳理

如果有相關問題的話,可以給我留言,歡迎一起探討ambari-agent !!

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容