RQ源碼閱讀

數(shù)據(jù)字典

rq:worker:名稱

  • 用途:記錄每個worker的相關信息
  • 數(shù)據(jù)類型:Hash
  • 字段:
    • birth:開始工作的時間
    • queues:工作的隊列名稱,多個隊列以逗號分隔
    • death:「死亡」的時間
    • shutdown_requested_date:worker被停掉的時間
    • state:當前的狀態(tài)
    • current_job:正在進行中的Job ID
    • started_at:開始當前執(zhí)行當前的job的時間

rq:workers

  • 用途:記錄當前正在工作的worker名稱
  • 數(shù)據(jù)類型:Set

rq:wip:隊列名稱

  • 用途:記錄每個隊列正在執(zhí)行中的任務
  • 數(shù)據(jù)結(jié)構(gòu):SortedSet
  • 說明:每個member的score是job的expiration time(unix時間戳)

rq:deferred:隊列名稱

TBD

rq:job:任務ID:dependents

  • 用途:
  • 數(shù)據(jù)結(jié)構(gòu)

rq:job:任務ID

  • 用途:記錄每個任務的信息
  • 數(shù)據(jù)結(jié)構(gòu):Hash
  • 字段:
    • created_at: 創(chuàng)建時間
    • data:
    • origin: 隊列名稱
    • description: 任務的描述信息,主要是寫日志時用于區(qū)分不同的任務
    • enqueued_at: 進入隊列時間(UNIX時間戳)
    • started_at: 開始執(zhí)行時間(UNIX時間戳)
    • ended_at: 執(zhí)行結(jié)束時間(UNIX時間戳)
    • result: 任務執(zhí)行結(jié)果
    • exc_info: 拋出異常的相關信息
    • timeout: 任務停留在隊列中的最長時間
    • result_ttl: 任務執(zhí)行結(jié)果的過期時間
    • status: 任務的當前狀態(tài)
    • dependency_id: 依賴的任務的ID
    • meta: 供任務生產(chǎn)者自由設置的meta信息,原數(shù)據(jù)是字典,這里pickle.dumps后存入redis
    • ttl: 任務的最長執(zhí)行時間

rq:queues

  • 用途: 記錄當前存在的隊列名稱
  • 數(shù)據(jù)結(jié)構(gòu): Set

rq:queue:隊列名稱

  • 用途: 記錄當前隊列排隊中的任務ID
  • 數(shù)據(jù)結(jié)構(gòu): List

rq:finished:隊列名稱

  • 用途: 記錄當前已經(jīng)完成的任務ID
  • 數(shù)據(jù)結(jié)構(gòu): SortedSet
  • 說明: 每個member的score是job的「執(zhí)行結(jié)果」過期時間(unix時間戳)

worker的生命周期

首先貼一下官方文檔中關于worker的生命周期的說明,來一個整體的認識:

The life-cycle of a worker consists of a few phases:

  1. Boot. Loading the Python environment.
    2.Birth registration. The worker registers itself to the system so it knows of this worker.
    3.Start listening. A job is popped from any of the given Redis queues. If all queues are empty and the worker is running in burst mode, quit now. Else, wait until jobs arrive.
    4.Prepare job execution. The worker tells the system that it will begin work by setting its status to busy and registers job in the StartedJobRegistry.
    5.Fork a child process. A child process (the "work horse") is forked off to do the actual work in a fail-safe context.
    6.Process work. This performs the actual job work in the work horse.
    7.Cleanup job execution. The worker sets its status to idle and sets both the job and its result to expire based on result_ttl. Job is also removed from StartedJobRegistry and added to to FinishedJobRegistry in the case of successful execution, or FailedQueue in the case of failure.
    8.Loop. Repeat from step 3.

任務的幾種狀態(tài)

def enum(name, *sequential, **named):
    """
    通過元類來實現(xiàn)枚舉類型
    """
    values = dict(zip(sequential, range(len(sequential))), **named)   
    
    # NOTE: Yes, we *really* want to cast using str() here.
    # On Python 2 type() requires a byte string (which is str() on Python 2).
    # On Python 3 it does not matter, so we'll use str(), which acts as
    # a no-op.    
    return type(str(name), (), values)

# 這里看到rq中的job的幾種狀態(tài)
JobStatus = enum(
    'JobStatus',
    QUEUED='queued',
    FINISHED='finished',
    FAILED='failed',
    STARTED='started',
    DEFERRED='deferred'
)

異常處理機制

如果執(zhí)行任務過程中拋出異常怎么辦?

# worker.py
def perform_job(self, job, queue):
    # 省略與異常處理無關的代碼
    try:
        job.perform()
    except Exception:
        # 在實際執(zhí)行任務的函數(shù)外捕獲所有異常(也可能因為超時)

        """Handles the failure or an executing job by:    
            1. Setting the job status to failed
            2. Removing the job from the started_job_registry
            3. Setting the workers current job to None
        """
        self.handle_job_failure(
            job=job,
            started_job_registry=started_job_registry
        )
        
        # 這里值得留意sys.exc_info()的用法
        self.handle_exception(job, *sys.exc_info())
        return False

# 再看看self.handle_exception()方法
def handle_exception(self, job, *exc_info):
    """Walks the exception handler stack to delegate exception handling."""
    exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) +
                         traceback.format_exception(*exc_info))
    self.log.error(exc_string, exc_info=True, extra={
        'func': job.func_name,
        'arguments': job.args,
        'kwargs': job.kwargs,
        'queue': job.origin,
    })

    for handler in reversed(self._exc_handlers):    
        self.log.debug('Invoking exception handler {0}'.format(handler)
        fallthrough = handler(job, *exc_info)   
      
        # Only handlers with explicit return values should disable further
        # exc handling, so interpret a None return value as True.    
        if fallthrough is None:
            fallthrough = True
        
        if not fallthrough:
            break

從handle_exception方法中可以看到,worker對象的_exc_handlers中可以注冊一系列的異常處理方法,當worker捕獲到異常之后會按handler注冊的先后順序委托它們來處理異常,這樣就可以靈活定制job執(zhí)行失敗的異常處理邏輯了。當然worker也提供了注冊exc_handlers的方法,這里就不展開了。

與任務的執(zhí)行效率有關的數(shù)據(jù)

每個job的信息中有幾個字段可以體現(xiàn)每個job的執(zhí)行效率,分別是:

  • created_at: 進入隊列時記錄
  • started_at: 開始執(zhí)行時記錄
  • ended_at: 執(zhí)行成功時記錄

如果要統(tǒng)計任務隊列的效率問題就可以使用這三個值來進行統(tǒng)計。

任務的優(yōu)先級

首先,每個worker可以同時處理多個隊列中的任務,從隊列中獲取任務的方法如下:

# queue.py
@classmethod
def dequeue_any(cls, queues, timeout, connection=None):
    """Class method returning the job_class instance at the front of the given
    set of Queues, where the order of the queues is important.    

    When all of the Queues are empty, depending on the `timeout` argument, 
    either blocks execution of this function for the duration of the
    timeout or until new messages arrive on any of the queues, or returns    
    None.    

    See the documentation of cls.lpop for the interpretation of timeout.
    """    
    while True:        
        queue_keys = [q.key for q in queues]        
        result = cls.lpop(queue_keys, timeout, connection=connection) 
        # 后面的代碼省略

這里的queue對象的lpop方法是封裝好的,主要是利用redis中的lpop和blpop方法。無論是阻塞式還是非阻塞式地從隊列中獲取任務,如果要同時從多個隊列中獲取任務,優(yōu)先級都跟對worker指定隊列時的先后順序有關。

例如運行worker方式是這樣的:

$ rq worker high normal low

如果high隊列中還有任務,這個worker是永遠不會開始執(zhí)行normal中的任務的。

而對于同一個隊列中的任務,由于使用的是Redis的List結(jié)構(gòu),要么從隊列頭部插入要么從尾部插入,所以正常情況下是FIFO的,當然rq也提供了at_front的選項,能夠從隊列的頭部插入任務。

任務的存活時間

官方文檔是這么說的:

A job has two TTLs, one for the job result and one for the job itself. This means that if you have job that shouldn't be executed after a certain amount of time, you can define a TTL as such:

# When creating the job:
job = Job.create(func=say_hello, ttl=43)
# or when queueing a new job:
job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)

對于每個任務來說,一共有三個限制值:

  • timeout:執(zhí)行任務的超時時間,如果任務從開始執(zhí)行經(jīng)過timeout秒后還沒完成則視為「lost」
  • result_ttl:結(jié)果的保存時間
  • ttl:任務在隊列中的存活時間

TIMEOUT

這部分的整個機制在別的地方都可以照搬借用。先看源碼:

with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):    
    rv = job.perform()

death_penalty_class的源碼在timeouts.py模塊中

這里的death_penalty_class利用了信號中的SIGALRM,超時就會拋出異常。通過這樣的方式限制了任務的執(zhí)行時長:

class UnixSignalDeathPenalty(BaseDeathPenalty):    
    def handle_death_penalty(self, signum, frame):  
        raise JobTimeoutException('Job exceeded maximum timeout '                                  
                                  'value ({0} seconds)'.format(self._timeout))    
    
    def setup_death_penalty(self):      
        """Sets up an alarm signal and a signal handler that raises   
        a JobTimeoutException after the timeout amount (expressed in        
        seconds).        
        """        
        signal.signal(signal.SIGALRM, self.handle_death_penalty)
        signal.alarm(self._timeout)    

    def cancel_death_penalty(self):       
        """Removes the death penalty alarm and puts back the system into        
        default signal handling.        
        """       
        signal.alarm(0)       
        signal.signal(signal.SIGALRM, signal.SIG_DFL)

任務執(zhí)行完畢跳出了context manager之后就會執(zhí)行cancel_death_penalty。

RESULT_TTL

result_ttl最容易理解,result就是每個任務執(zhí)行完畢的返回值,result_ttl就是這個結(jié)果保存在redis中的時間。

如果result_ttl設置成0,這個任務的所有信息(也就是rq:job:[任務ID]這個redis key)會馬上清除;如果是None,該任務信息會永久保存;如果是大于0的值,則會設置rq:jog:[任務就ID]這個key的過期時間,到期自動由redis清除,然后任務的執(zhí)行結(jié)果則會先使用python的pickle.dumps處理,然后保存在result這個字段中。

其實還有一個叫FinishedJobRegistry的東西,它使用的是上文提到的數(shù)據(jù)字典中所說的名為rq:finished:[隊列名稱]的SortedSet。里面保存著每個隊列已經(jīng)執(zhí)行完畢的任務,估計主要是monitor用的。

TTL

設定任務在隊列中的存活時長是通過設置rq:job:[任務ID]這個redis key的超時時間來實現(xiàn)的。達到了超時時間后key都被刪掉了自然就拿不到這個任務了。如果ttl設為None就是不設置超時時間。

平滑關閉worker

為了平滑地關閉,rq的worker注冊了兩個信號的handler:

# worker.py
def _install_signal_handlers(self):    
    """Installs signal handlers for handling SIGINT and SIGTERM
    gracefully.   
    """    
    signal.signal(signal.SIGINT, self.request_stop)
    signal.signal(signal.SIGTERM, self.request_stop)

所以使用SIGINT和SIGTERM都能平滑地關閉worker。再看request_stop方法:

# worker.py
def request_stop(self, signum, frame):    
    """Stops the current worker loop but waits for child processes to    end gracefully (warm shutdown).   
    """   
    self.log.debug('Got signal {0}'.format(signal_name(signum)))
    
    signal.signal(signal.SIGINT, self.request_force_stop)
    signal.signal(signal.SIGTERM, self.request_force_stop)
  
    self.handle_warm_shutdown_request()   
   
    # If shutdown is requested in the middle of a job, wait until   
    # finish before shutting down and save the request in redis
    if self.get_state() == 'busy':  
        self._stop_requested = True
        self.set_shutdown_requested_date() 
        self.log.debug('Stopping after current horse is finished. '                       'Press Ctrl+C again for a cold shutdown.') 
    else:     
        raise StopRequested()

這里有幾個看點:

  • 重新注冊了SIGINT和SIGTERM的handler,意味著連續(xù)兩次發(fā)出這兩個類型的信號就會進行cold shut down
  • worker處于busy狀態(tài)和非busy狀態(tài)的處理方式是不一樣的:處于非busy狀態(tài)時直接結(jié)束worker,處于busy狀態(tài)時就設置_stop_requested屬性,等結(jié)束這一趟job之后再自殺
  • 使用拋出異常的方式來結(jié)束worker,這個StopRequested的異常會在worker的主循環(huán)中捕獲,然后跳出worker循環(huán),結(jié)束worker進程

日志

只有worker模塊有打日志。首先一個常規(guī)套路,就是用模塊名稱來命名logger:

# worker.py
logger = logging.getLogger(__name__)

然后在worker開始工作的時候調(diào)用setup_loghandlers函數(shù)配置logger的handler:

# logutils.py
def setup_loghandlers(level):    
    logger = logging.getLogger('rq.worker')   
    if not _has_effective_handler(logger):   
        logger.setLevel(level)       
        # This statement doesn't set level properly in Python-2.6      
        # Following is an additional check to see if level has been set to       
        # appropriate(int) value       
        if logger.getEffectiveLevel() == level:       
            # Python-2.6. Set again by using logging.INFO etc.
            level_int = getattr(logging, level)   
            logger.setLevel(level_int)    
        formatter = logging.Formatter(fmt='%(asctime)s %(message)s',
                                      datefmt='%H:%M:%S')
        handler = ColorizingStreamHandler()
        handler.setFormatter(formatter)
        logger.addHandler(handler)

def _has_effective_handler(logger):    
    """ 
    Checks if a logger has a handler that will catch its messages in its logger hierarchy.   
    :param `logging.Logger` logger: The logger to be checked.
    :return: True if a handler is found for the logger, False otherwise.    
    :rtype: bool    
    """    
    while True:  
        if logger.handlers:          
            return True     
        if not logger.parent:        
            return False    
        logger = logger.parent

以上代碼主要有兩點:

  • 如果名為rq.worker這個logger未設置handler的話就在setup_loghandlers里面設置一下,從_has_effective_handler里面可以體現(xiàn)出logger的繼承性;
  • 使用了一個自定義的名為ColorizingStreamHandler的Handler,它的作用是為輸出到終端的日志加上顏色,這里就不貼出具體的代碼了,有需要再參考源碼。

但是按照這個默認配置也會有問題,因為在整個worker的運行周期中,只配置了rq.worker這個logger,所以只有rq.worker或者繼承rq.worker的logger可以使用setup_loghandlers函數(shù)里面的配置,這樣看來rq的logger配置方面做得還不夠靈活。恐怕這方面只能靠自定義worker才能達到靈活配置logging的目標。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容