celery實(shí)現(xiàn)微信小程序推送消息 自動(dòng)化檢索

celery實(shí)現(xiàn)微信小程序推送消息 自動(dòng)化檢索

一、目錄結(jié)構(gòu)

使用如下目錄結(jié)構(gòu):

目錄結(jié)構(gòu)

相關(guān)文件及說明:

celery.py 創(chuàng)建應(yīng)用實(shí)例
config.py 參數(shù)配置文件
tasks.py 執(zhí)行任務(wù)文件(消費(fèi)者文件)

二、celery文件說明及介紹

celery是一個(gè)性能良好的python異步協(xié)調(diào)庫(kù) 可以設(shè)置定時(shí)任務(wù),協(xié)調(diào)請(qǐng)求方(生產(chǎn)者)與提供方(消費(fèi)者)的庫(kù)

</br>

我的另外一篇博客是關(guān)于celery入門使用理解處理機(jī)制

celery.py(創(chuàng)建app實(shí)例)

from __future__ import absolute_import
from celery import Celery

app = Celery('celery_proj', include=['celery_proj.tasks'])

app.config_from_object('celery_proj.config')

if __name__ == '__main__':
   app.start()

config.py(配置文件)

from __future__ import absolute_import
from datetime import timedelta

CELERY_RESULT_BACKEND = 'redis://:123456qw@@127.0.0.1:6379/6'
BROKER_URL = 'redis://:123456qw@@127.0.0.1:6379/5'
CELERY_TIMEZONE = 'Asia/Shanghai'


CELERYBEAT_SCHEDULE = {
   # 時(shí)鐘周期為30 沒30秒添加一次檢索是否有需要推送消息的任務(wù)    
   'add-every-30-seconds': {
        'task': 'celery_proj.tasks.pullMsg', # 添加生產(chǎn)者函數(shù)
        'schedule': timedelta(seconds=30), #30秒后執(zhí)行 建議和時(shí)鐘同步
        'args': (60,)
   },
}

task.py(生產(chǎn)者函數(shù))

from __future__ import absolute_import
import requests
from celery_proj.celery import app as ap # 由于后臺(tái)使用的是flask框架 防止重復(fù)導(dǎo)入app
import sys
sys.path.append("..")
#調(diào)用封裝好的redis數(shù)據(jù)庫(kù)記錄推送狀態(tài) 已推送?未推送? 可以通過設(shè)置key的過期時(shí)間設(shè)置最小推送時(shí)間間隔
from model.redis_models import Redis_db 
from model.ord_models import *
import datetime

template_id = 'WJuAWZ1DGHzjcOk0W2BevWkZdORxITBNZE5gUfHEr9M'
app_id = 'wx64118b44bbd3bfa3'
secret_key = '微信小程序的密鑰'


def get_access_token(app_id=app_id, secret_key=secret_key):
    ''' 獲取微信access_token'''
    try:
        payload = {
            'grant_type': 'client_credential',
            'appid': app_id,
            'secret': secret_key
        }
        req = requests.get(
            'https://api.weixin.qq.com/cgi-bin/token',
            params=payload,
            timeout=3,
            verify=False)
        access_token = req.json().get('access_token', "")
        print('access_token', access_token)
        return access_token
    except Exception as e:
        print(e)


def push(
        openid,
        formId,
        item_info: dict,
        access_token,
        template_id=template_id):
    '''推送'''
    data = {
        "touser": openid,
        "template_id": template_id,
        "form_id": formId,
        'page': 'pages\person\myjoinedlist\myjoineditem_detail\myjoineditem_detail?wd={}'.format(
            item_info.get(
                'item_pass_id',
                '未知')),
        "data": {
            'keyword1': {
                'value': item_info.get(
                    'item_name',
                    '未知')},
            'keyword2': {
                'value': item_info.get(
                    'item_address',
                    '未知')},
            'keyword3': {
                'value': item_info.get(
                    'item_start_time',
                    '未知')}},
        "emphasis_keyword": ''}
    push_url = 'https://api.weixin.qq.com/cgi-bin/message/wxopen/template/send?access_token={}'.format(
        access_token)
    result = requests.post(push_url, json=data, timeout=3, verify=False)
    return result

@ap.task
def pullMsg(delta=60):
    '''推送消息'''
    access_token = get_access_token()
    now = datetime.datetime.now()
    timedel = datetime.datetime.now() + datetime.timedelta(minutes=delta)
    # 搜索所有快要開始的活動(dòng)
    objlist = db.session.query(OrdObject.obj_id).filter(
        OrdObject.startOrd_time <= timedel,
        OrdObject.startOrd_time >= now)
    # 搜索相關(guān)訂單
    ordlist = db.session.query(Orderinfo.ordNum).filter(
        Orderinfo.objId.in_(objlist)
    )
    # 查詢可推送的用戶和實(shí)例 此處為項(xiàng)目/活動(dòng)
    # 搜索所有符合的用戶,預(yù)定信息,項(xiàng)目名稱,地址, 并且逐個(gè)推送
    # all (1, 'jhon', 'xixixi', 'bbbUI0egBJY1zhBYw2KhdUfwVJJE', datetime.datetime(2019, 5, 5, 14, 0))
    result = db.session.query(
        Item.item_id,
        Item.item_name,
        Item.item_address,
        Item.pass_id,
        Order.ord_usId,
        db.func.min(
            OrdObject.startOrd_time)).join(
        Orderinfo,
        Order.ord_num == Orderinfo.ordNum).outerjoin(OrdObject).filter(
                OrdObject.obj_id == Orderinfo.objId,
                OrdObject.startOrd_time <= timedel,
                OrdObject.startOrd_time <= now).group_by(
                    Item.item_id,
        Order.ord_usId).all()
    # 按狀態(tài)推送 已推送的不推送
    status_db = Redis_db('pushMsg_status')
    formId_db = Redis_db('formId')
    if result and access_token:
        for i in result:
            item_id, openid = i[0], i[4]
            # 檢查是否已推送
            response = status_db.check_pushMsg_status(item_id, openid)
            if response:
                # 已推送 下個(gè)
                continue
            else:

                # 未推送
                # 記錄狀態(tài)為已推送 假設(shè)未推送成功也最少24小時(shí)后再試
                response = status_db.set_pushMsg_status(item_id, openid)
                formId = formId_db.get_formId(openid)
                if formId:
                    # 推送 無法推送的情況,需要 改進(jìn)redis過期時(shí)間
                    item_info = {
                        'item_name': i[1],
                        'item_address': i[2],
                        'item_pass_id': i[3],
                        'item_start_time': '{0}.{1}.{2} {3}:{4}'.format(
                            i[5].year,
                            i[5].month,
                            i[5].day,
                            i[5].hour,
                            i[5].minute)}
                    return push(openid, formId, item_info, access_token)
                else:
                    # 無法推送
                    return 'without formId!'
        return 'NO RESULT for!'
    else:
        return 'NO RESULT!'

三、通過守護(hù)進(jìn)程啟動(dòng)

  • celery multi start w1 -A celery_proj -B -l info --logfile = celerylog.log (以守護(hù)進(jìn)程啟動(dòng) 停止改成stop就行了 日志指定為celerylog.log)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容