celery 定時(shí)任務(wù)實(shí)現(xiàn)

Celery 是什么?

  • 異步任務(wù)隊(duì)列工具,主要解決 realtime 事件的異步操作,但也支持定時(shí)任務(wù)。
  • 什么是異步?那要先理解什么是同步,比如我去麥當(dāng)勞吃飯,如果麥當(dāng)勞前一個(gè)顧客點(diǎn)完單,拿到餐,吃完走人之后才能接待下一個(gè)顧客,就是同步。反過(guò)來(lái)我點(diǎn)完單,它馬上就接下一個(gè)客人的單,我的流程雖然還沒(méi)有走完(time-consuming),但也不影響下一個(gè)顧客點(diǎn)單(blocked),這就是異步。

Celery 安裝

  • pip install celery,由于 celery 自己并不帶隊(duì)列存儲(chǔ),所以根據(jù)官方推薦,還需要安裝 RabbitMQ 或者 Redis 來(lái)存儲(chǔ)隊(duì)列。方便起見(jiàn),本文用 Redis。

Celery 機(jī)制

  1. celery 是一個(gè)裝飾器類,本質(zhì)上是把一個(gè)函數(shù)變成一個(gè)可以異步調(diào)用的函數(shù)。
  2. 開(kāi)啟 celery 進(jìn)程。
  3. 在另一段程序中導(dǎo)入這個(gè)函數(shù),當(dāng)這個(gè)函數(shù)被以 delay 模式調(diào)用時(shí):xxx.delay(),celery 會(huì)把這個(gè)任務(wù)寫(xiě)到任務(wù)隊(duì)列然后返回,原程序不會(huì)被阻塞可以往下跑。注意這里需要用 delay 模式去跑這個(gè)函數(shù),同時(shí)注意不要把 xxx.delay() 的結(jié)果賦值給一個(gè)變量,否則依然會(huì)被阻塞。
  4. celery 的另一個(gè)進(jìn)程會(huì)去這個(gè)任務(wù)隊(duì)列里取任務(wù),完成之后寫(xiě)到 result 隊(duì)列里面。或者多數(shù)情況下,這個(gè)被異步調(diào)用的函數(shù)不需要返回結(jié)果,比如發(fā)送一個(gè)郵件,提醒之類的,連 result 隊(duì)列都不用。

Celery first blood

  1. 開(kāi)一個(gè) python 腳本,比如tasks.py

  2. 生成一個(gè) Celery 對(duì)象實(shí)例,是一個(gè)裝飾器。

    from celery import Celery
    app = Celery('__name__', broker='redis://localhost:6379') 
    
  3. 定義一個(gè)你想異步操作的函數(shù),并加上 celery 裝飾器 @app.task。

    @app.task
    def add(x, y):
        return x + y
    
  4. 保存,退出,然后在 terminal 啟動(dòng) celery 服務(wù)。

    celery -A tasks worker --loglevel=info
    

做個(gè)定時(shí)任務(wù):每天發(fā)問(wèn)候

  1. 接下來(lái),我要搭配釘釘機(jī)器人了,我希望小仙女每天早上7點(diǎn)給我發(fā)個(gè)問(wèn)候,然后在7點(diǎn)半的時(shí)候確認(rèn)我有沒(méi)有開(kāi)始干活了。

  2. 先寫(xiě)一個(gè)簡(jiǎn)單的釘釘提醒程序,命名celery_worker.py

    #! /usr/bin/env python
    # coding: utf-8
    
    import requests
    import json
    import time
    
    from config import HOST_IP, NOTIFY_URL, MOBILE_NUMBER
    
    def notify_dingding(msg):
        headers = {"Content-Type": "application/json; charset=utf-8"}
        post_data = {
            "msgtype": "text",
            "text": {
                "content": msg
            },
            "at": {
                "atMobiles": [MOBILE_NUMBER]
            }
        }
    
        r = requests.post(NOTIFY_URL, headers=headers, data=json.dumps(post_data))
        print(r.content)
    

    注意這里從 config 里導(dǎo)入了一些參數(shù),所以要在這個(gè)程序的同一層寫(xiě)一個(gè) config.py 的配置文件。

    # config.py
    NOTIFY_URL = ("https://oapi.dingtalk.com/robot/send?access_token="
        "c6d5a2936381dfc29394f3c336bea5fad962d90ffd31809e92d95a1xxxxxxxx")
    MOBILE_NUMBER = "176xxxxx619"
    HOST_IP = "127.0.0.1"
    
  3. 導(dǎo)入 celery 包,給函數(shù)加上裝飾器:

    from celery import Celery
    
    BROKER_URI = 'redis://%s:6379/6' % HOST_IP
    BACKEND_URI = 'redis://%s:6379/5' % HOST_IP
    
    worker = Celery('celery_worker', broker=BROKER_URI, backend=BACKEND_URI)
    
    @worker.task
    def notify_dingding(msg):
        ...
    
  4. 簡(jiǎn)單學(xué)習(xí)一下celery 的 crontab 定時(shí)任務(wù)。

  5. 給 worker 加上定時(shí)任務(wù)

    from celery.schedules import crontab
    
    worker.conf.update(
        timezone='Asia/Shanghai',
        enable_utc=True,
        beat_schedule={
            "morning_msg_1": {
                "task": "celery_worker.notify_dingding",
                "schedule": crontab(minute=0, hour=7),
                "args": ("早,起床了喲,先去做個(gè)早飯吧",)
            },
            "morning_msg_2": {
                "task": "celery_worker.notify_dingding",
                "schedule": crontab(minute=30, hour=7),
                "args": ("我就問(wèn)問(wèn)你在干活咩?",)
            }
        }
    )
    
  6. 最后程序末尾加一個(gè)小測(cè)試,看看服務(wù)是不是起來(lái)了:

    notify_dingding("小仙女上線啦")
    
  7. 開(kāi)啟 redis-server

    nohup redis-server &
    
  8. 開(kāi)啟我們的 celery worker,這里的-B 是 celery 的 beat 服務(wù),可以理解為一個(gè)周期任務(wù)。

    celery -A celery_worker worker -B
    
  9. 服務(wù)起來(lái)嘍:

QQ20170818-163110.jpg
QQ20170818-162958.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容