pyspider workflow

pyspider execution flow

The processor, result worker, and fetcher components

These three work in much the same way: each one reads tasks from a queue and executes them.
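The shared pattern can be sketched as follows. This is a minimal illustration that uses the standard-library queue module, not pyspider's actual worker code; run_worker, fetch_queue, process_queue, and the lambda handler are hypothetical names chosen for the example.

```python
import queue


def run_worker(inqueue, handle, outqueue=None):
    """Drain inqueue, apply handle to each task, forward results to outqueue."""
    handled = 0
    while True:
        try:
            task = inqueue.get_nowait()
        except queue.Empty:
            break  # nothing left; a long-running worker would sleep and poll again
        result = handle(task)
        if outqueue is not None:
            outqueue.put(result)
        handled += 1
    return handled


# Hypothetical wiring: one queue feeds the worker, another receives its output.
fetch_queue = queue.Queue()
process_queue = queue.Queue()
fetch_queue.put({'taskid': 't1', 'url': 'http://example.com/'})
count = run_worker(fetch_queue, lambda t: dict(t, fetched=True), process_queue)
```

Each component differs only in what its handler does and which queue it forwards to.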

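The scheduler and webui components

The scheduler dispatches tasks to the fetcher queue (scheduler2fetcher).
The webui communicates with the scheduler: interactions in the front end produce tasks, which are handed to the scheduler for dispatch.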

scheduler flow

The core is the code of run_once. In summary:

  • _update_projects: refreshes the projects and loads their tasks from the database into each project's task_queue in the scheduler
  • _check_task_done: checks status_queue (fed by the processor module) and puts tasks into project.task_queue
  • _check_request: checks _postpone_request and newtask_queue and puts tasks into project.task_queue
  • _check_select: takes tasks out of project.task_queue and puts them into the fetcher queue

def run_once(self):
    '''comsume queues and feed tasks to fetcher, once'''
    self._update_projects()
    self._check_task_done()
    self._check_request()
    while self._check_cronjob():
        pass
    self._check_select()
    self._check_delete()
    self._try_dump_cnt()
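To make the data flow concrete, here is a toy model of the four queue-moving steps. It is a sketch under simplifying assumptions (one project, no database, no cron jobs, no rate limiting), and MiniScheduler, drain, stored_tasks, and out_queue are hypothetical names rather than pyspider's API.

```python
import queue
from collections import deque


def drain(q):
    """Yield every item currently in q without blocking."""
    while True:
        try:
            yield q.get_nowait()
        except queue.Empty:
            return


class MiniScheduler:
    """Toy model of the scheduler loop; not pyspider's real class."""

    def __init__(self, stored_tasks):
        self.stored_tasks = list(stored_tasks)  # stands in for the task database
        self.loaded = False
        self.task_queue = deque()               # stands in for project.task_queue
        self.status_queue = queue.Queue()       # status reports from the processor
        self.newtask_queue = queue.Queue()      # new requests
        self.out_queue = queue.Queue()          # stands in for the fetcher queue

    def _update_projects(self):
        # Load stored tasks only once, like the task_loaded flag.
        if not self.loaded:
            self.task_queue.extend(self.stored_tasks)
            self.loaded = True

    def _check_task_done(self):
        # Re-queue only the tasks whose status report says they failed.
        for status in drain(self.status_queue):
            if not status['ok']:
                self.task_queue.append(status['taskid'])

    def _check_request(self):
        for taskid in drain(self.newtask_queue):
            self.task_queue.append(taskid)

    def _check_select(self):
        while self.task_queue:
            self.out_queue.put(self.task_queue.popleft())

    def run_once(self):
        self._update_projects()
        self._check_task_done()
        self._check_request()
        self._check_select()


s = MiniScheduler(stored_tasks=['a'])
s.status_queue.put({'taskid': 'b', 'ok': False})
s.status_queue.put({'taskid': 'c', 'ok': True})
s.newtask_queue.put('d')
s.run_once()
sent = list(drain(s.out_queue))
```

Every input path ends in the project's task queue, and only the select step hands work to the fetcher.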
_update_projects()

Refreshes project information: each updated project is instantiated and stored in the scheduler's self.projects.

for project in self.projectdb.check_update(self._last_update_project):
    self._update_project(project)

_update_project()

Sends a task with taskid _on_get_info to the fetcher queue (used to refresh the project's runtime info), then reads the project's tasks from the database and inserts them into self.project.task_queue.

if project._send_on_get_info:
          # update project runtime info from processor by sending a _on_get_info
          # request, result is in status_page.track.save
          project._send_on_get_info = False

          self.on_select_task({
              'taskid': '_on_get_info',
              'project': project.name,
              'url': 'data:,_on_get_info',
              'status': self.taskdb.SUCCESS,
              'fetch': {
                  'save': self.get_info_attributes,
              },
              'process': {
                  'callback': '_on_get_info',
              },
          })

if project.active:
    if not project.task_loaded:
        # _load_tasks reads the project's tasks from the database
        self._load_tasks(project)
        project.task_loaded = True

In addition, the webui's project-update interaction is implemented over RPC: the registered function sets _force_update_project.

def update_project():
    self._force_update_project = True
application.register_function(update_project, 'update_project')
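The flag-over-RPC idea can be tried in isolation with the standard library. The sketch below uses xmlrpc.server.SimpleXMLRPCDispatcher instead of pyspider's own application object, and MiniScheduler with its run_once return values is a hypothetical stand-in; only the pattern (the RPC call flips a flag that the loop consumes on its next pass) mirrors the excerpt.

```python
from xmlrpc.server import SimpleXMLRPCDispatcher


class MiniScheduler:
    """Hypothetical stand-in that only models the force-update flag."""

    def __init__(self):
        self._force_update_project = False
        self.rpc = SimpleXMLRPCDispatcher(allow_none=True)

        def update_project():
            # The RPC handler does no work itself; it only sets the flag.
            self._force_update_project = True
            return True

        self.rpc.register_function(update_project, 'update_project')

    def run_once(self):
        # The scheduler loop notices the flag on its next pass and resets it.
        if self._force_update_project:
            self._force_update_project = False
            return 'reloaded'
        return 'idle'
```

Keeping the handler this small means the RPC call returns immediately, and the actual reload happens inside the scheduler's own loop.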
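_check_task_done()

Checks status_queue. The name _check_task_done probably comes from the fact that the tasks in this queue are produced by the processor module, and this step checks whether each one was processed correctly.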

while True:
    # get_nowait raises Queue.Empty once the queue is drained, which ends the loop (handler omitted here)
    task = self.status_queue.get_nowait()
    if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
        if task['project'] not in self.projects:
            continue
        project = self.projects[task['project']]
        project.on_get_info(task['track'].get('save') or {})
        logger.info(
            '%s on_get_info %r', task['project'], task['track'].get('save', {})
        )
        continue
    elif not self.task_verify(task):  # skip tasks that fail verification
        continue
    # otherwise update the task's status
    self.on_task_status(task)

on_task_status

on_task_status calls on_task_done or on_task_failed, and then pushes the task onto active_tasks.

if procesok:
    ret = self.on_task_done(task)
else:
    ret = self.on_task_failed(task)
self.projects[task['project']].active_tasks.appendleft((time.time(), task))

on_task_done marks the task as successful and updates the database.
on_task_failed looks at next_exetime: if it is less than 0, the task is saved to the database with status failed; otherwise the task is updated in the database and put back into self.project.task_queue.
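The failure branch can be sketched as a small retry policy. The delay schedule below (exponential backoff with a cap of three retries) and the names next_retry_delay, FAILED, ACTIVE, retried, and exetime are assumptions made for illustration; they are not pyspider's actual retry rules.

```python
import time

FAILED, ACTIVE = 'FAILED', 'ACTIVE'  # hypothetical status labels


def next_retry_delay(retried, max_retries=3, base_delay=30):
    """Return seconds until the next retry, or -1 when retries are exhausted."""
    if retried >= max_retries:
        return -1
    return base_delay * (2 ** retried)


def on_task_failed(task, task_queue, now=None):
    """Either give up on the task or schedule it for another attempt."""
    now = time.time() if now is None else now
    retried = task.get('retried', 0)
    delay = next_retry_delay(retried)
    if delay < 0:
        task['status'] = FAILED          # no retries left: record the failure
        return task
    task['status'] = ACTIVE
    task['retried'] = retried + 1
    task['exetime'] = now + delay        # earliest time the retry may run
    task_queue.append(task)              # back into the project's task queue
    return task
```

A task that still has retries left goes back into the queue with a later execution time, while an exhausted task is only recorded as failed.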

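_check_request

Takes tasks from _postpone_request and newtask_queue and runs on_request on each. The _postpone_request list holds requests whose task is currently in the processing state, presumably tasks that were modified while they were still being executed.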

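todo = []
for task in self._postpone_request:
    if self.projects[task['project']].task_queue.is_processing(task['taskid']):
        # still being processed: keep it postponed
        todo.append(task)
    else:
        # no longer being processed: handle the request now
        self.on_request(task)
self._postpone_request = todo

tasks = {}
while len(tasks) < self.LOOP_LIMIT:
    try:
        task = self.newtask_queue.get_nowait()
    except Queue.Empty:
        break
    tasks[task['taskid']] = task  # simplified: collect by taskid so duplicates collapse

for task in itervalues(tasks):
    self.on_request(task)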
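The same two-phase logic can be exercised without a scheduler instance. In this sketch, check_requests and its parameters are hypothetical; a plain set of task ids stands in for task_queue.is_processing, and a list stands in for newtask_queue.

```python
def check_requests(postponed, incoming, processing_ids, on_request):
    """Handle requests that are ready; return the ones that stay postponed."""
    still_postponed = []
    for task in postponed:
        if task['taskid'] in processing_ids:
            still_postponed.append(task)   # still running: try again next loop
        else:
            on_request(task)
    # De-duplicate new requests by taskid; a later request replaces an earlier one.
    unique = {}
    for task in incoming:
        unique[task['taskid']] = task
    for task in unique.values():
        on_request(task)
    return still_postponed


handled = []
left = check_requests(
    postponed=[{'taskid': 'a'}, {'taskid': 'b'}],
    incoming=[{'taskid': 'c', 'v': 1}, {'taskid': 'c', 'v': 2}],
    processing_ids={'a'},
    on_request=handled.append,
)
```

Here task a stays postponed because it is still being processed, b is handled at once, and the two requests for c collapse into the later one.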

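on_request reads the old task from the database: if one exists it runs on_old_request, otherwise it runs on_new_request.

on_old_request

Decides whether the old task needs to be re-crawled or cancelled, updates the database, and puts the task into self.project.task_queue.

on_new_request

Inserts the task into the database and into self.project.task_queue.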
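A compact way to picture the old/new split is the sketch below. It assumes a dict as the task database and a list as the task queue, and it assumes that a request is re-crawled when it is forced or when the stored result is older than the request's age; the field names age, force_update, cancel, and lastcrawltime and the returned labels are illustrative, so the exact rules in pyspider may differ.

```python
def on_new_request(task, taskdb, task_queue):
    """First time this taskid is seen: store it and queue it."""
    taskdb[task['taskid']] = task
    task_queue.append(task['taskid'])
    return 'new'


def on_old_request(task, old, task_queue, now):
    """The taskid already exists: cancel, re-crawl, or ignore the request."""
    if task.get('cancel'):
        old['status'] = 'CANCELLED'
        return 'cancelled'
    expired = now - old.get('lastcrawltime', 0) > task.get('age', float('inf'))
    if task.get('force_update') or expired:
        old.update(task)                    # refresh the stored task
        task_queue.append(task['taskid'])   # and queue it for another crawl
        return 'restarted'
    return 'ignored'


def on_request(task, taskdb, task_queue, now):
    old = taskdb.get(task['taskid'])
    if old is None:
        return on_new_request(task, taskdb, task_queue)
    return on_old_request(task, old, task_queue, now)
```

A request whose stored result is still fresh is ignored, which is what keeps repeated submissions of the same URL from flooding the queue.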

_check_cronjob

Sends a task with taskid _on_cronjob to the fetcher queue and adds it to self.project.active_tasks.

def _check_cronjob(self):
    """Check projects cronjob tick, return True when a new tick is sended"""
    now = time.time()
    self._last_tick = int(self._last_tick)
    if now - self._last_tick < 1:
        return False
    self._last_tick += 1
    for project in itervalues(self.projects):
        if not project.active:
            continue
        if project.waiting_get_info:
            continue
        if int(project.min_tick) == 0:
            continue
        if self._last_tick % int(project.min_tick) != 0:
            continue
        self.on_select_task({
            'taskid': '_on_cronjob',
            'project': project.name,
            'url': 'data:,_on_cronjob',
            'status': self.taskdb.SUCCESS,
            'fetch': {
                'save': {
                    'tick': self._last_tick,
                },
            },
            'process': {
                'callback': '_on_cronjob',
            },
        })
    return True
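The tick arithmetic explains why run_once calls this method in a loop: each call advances _last_tick by at most one second, so repeated calls replay every second that was missed. The sketch below isolates that logic; due_projects, catch_up, and the min_ticks mapping are hypothetical names, and project activity checks are left out.

```python
def due_projects(last_tick, now, min_ticks):
    """Advance the tick by one second if due; return (new_tick, fired_projects)."""
    last_tick = int(last_tick)
    if now - last_tick < 1:
        return last_tick, None          # less than a second elapsed: no tick
    last_tick += 1
    fired = [name for name, min_tick in sorted(min_ticks.items())
             if int(min_tick) != 0 and last_tick % int(min_tick) == 0]
    return last_tick, fired


def catch_up(last_tick, now, min_ticks):
    """Replay every missed second, like `while self._check_cronjob(): pass`."""
    events = []
    while True:
        last_tick, fired = due_projects(last_tick, now, min_ticks)
        if fired is None:
            return last_tick, events
        events.extend((last_tick, name) for name in fired)


# min_tick 0 disables cron for a project; 2 and 3 fire every 2 and 3 seconds.
tick, events = catch_up(0, 6.5, {'every2': 2, 'every3': 3, 'off': 0})
```

Because ticks are replayed one by one, a project with min_tick 2 still receives its ticks 2, 4, and 6 even if the scheduler loop was busy for several seconds.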

_check_select

Takes tasks out of self.project.task_queue and puts them into the fetcher queue.

for project, taskid in taskids:
    self._load_put_task(project, taskid)    

The remaining two steps, _check_delete and _try_dump_cnt, delete projects and monitor queue counts.
