celery實(shí)現(xiàn)微信小程序推送消息 自動(dòng)化檢索
一、目錄結(jié)構(gòu)
使用如下目錄結(jié)構(gòu):
目錄結(jié)構(gòu)
相關(guān)文件及說明:
celery.py 創(chuàng)建應(yīng)用實(shí)例
config.py 參數(shù)配置文件
tasks.py 執(zhí)行任務(wù)文件(消費(fèi)者文件)
二、celery文件說明及介紹
celery是一個(gè)性能良好的python異步協(xié)調(diào)庫(kù) 可以設(shè)置定時(shí)任務(wù),協(xié)調(diào)請(qǐng)求方(生產(chǎn)者)與提供方(消費(fèi)者)的庫(kù)
</br>
我的另外一篇博客是關(guān)于celery入門使用理解處理機(jī)制
celery.py(創(chuàng)建app實(shí)例)from __future__ import absolute_import from celery import Celery app = Celery('celery_proj', include=['celery_proj.tasks']) app.config_from_object('celery_proj.config') if __name__ == '__main__': app.start()
config.py(配置文件)from __future__ import absolute_import from datetime import timedelta CELERY_RESULT_BACKEND = 'redis://:123456qw@@127.0.0.1:6379/6' BROKER_URL = 'redis://:123456qw@@127.0.0.1:6379/5' CELERY_TIMEZONE = 'Asia/Shanghai' CELERYBEAT_SCHEDULE = { # 時(shí)鐘周期為30 沒30秒添加一次檢索是否有需要推送消息的任務(wù) 'add-every-30-seconds': { 'task': 'celery_proj.tasks.pullMsg', # 添加生產(chǎn)者函數(shù) 'schedule': timedelta(seconds=30), #30秒后執(zhí)行 建議和時(shí)鐘同步 'args': (60,) }, }
task.py(生產(chǎn)者函數(shù))
from __future__ import absolute_import
import requests
from celery_proj.celery import app as ap # 由于后臺(tái)使用的是flask框架 防止重復(fù)導(dǎo)入app
import sys
sys.path.append("..")
#調(diào)用封裝好的redis數(shù)據(jù)庫(kù)記錄推送狀態(tài) 已推送?未推送? 可以通過設(shè)置key的過期時(shí)間設(shè)置最小推送時(shí)間間隔
from model.redis_models import Redis_db
from model.ord_models import *
import datetime
template_id = 'WJuAWZ1DGHzjcOk0W2BevWkZdORxITBNZE5gUfHEr9M'
app_id = 'wx64118b44bbd3bfa3'
secret_key = '微信小程序的密鑰'
def get_access_token(app_id=app_id, secret_key=secret_key):
''' 獲取微信access_token'''
try:
payload = {
'grant_type': 'client_credential',
'appid': app_id,
'secret': secret_key
}
req = requests.get(
'https://api.weixin.qq.com/cgi-bin/token',
params=payload,
timeout=3,
verify=False)
access_token = req.json().get('access_token', "")
print('access_token', access_token)
return access_token
except Exception as e:
print(e)
def push(
openid,
formId,
item_info: dict,
access_token,
template_id=template_id):
'''推送'''
data = {
"touser": openid,
"template_id": template_id,
"form_id": formId,
'page': 'pages\person\myjoinedlist\myjoineditem_detail\myjoineditem_detail?wd={}'.format(
item_info.get(
'item_pass_id',
'未知')),
"data": {
'keyword1': {
'value': item_info.get(
'item_name',
'未知')},
'keyword2': {
'value': item_info.get(
'item_address',
'未知')},
'keyword3': {
'value': item_info.get(
'item_start_time',
'未知')}},
"emphasis_keyword": ''}
push_url = 'https://api.weixin.qq.com/cgi-bin/message/wxopen/template/send?access_token={}'.format(
access_token)
result = requests.post(push_url, json=data, timeout=3, verify=False)
return result
@ap.task
def pullMsg(delta=60):
'''推送消息'''
access_token = get_access_token()
now = datetime.datetime.now()
timedel = datetime.datetime.now() + datetime.timedelta(minutes=delta)
# 搜索所有快要開始的活動(dòng)
objlist = db.session.query(OrdObject.obj_id).filter(
OrdObject.startOrd_time <= timedel,
OrdObject.startOrd_time >= now)
# 搜索相關(guān)訂單
ordlist = db.session.query(Orderinfo.ordNum).filter(
Orderinfo.objId.in_(objlist)
)
# 查詢可推送的用戶和實(shí)例 此處為項(xiàng)目/活動(dòng)
# 搜索所有符合的用戶,預(yù)定信息,項(xiàng)目名稱,地址, 并且逐個(gè)推送
# all (1, 'jhon', 'xixixi', 'bbbUI0egBJY1zhBYw2KhdUfwVJJE', datetime.datetime(2019, 5, 5, 14, 0))
result = db.session.query(
Item.item_id,
Item.item_name,
Item.item_address,
Item.pass_id,
Order.ord_usId,
db.func.min(
OrdObject.startOrd_time)).join(
Orderinfo,
Order.ord_num == Orderinfo.ordNum).outerjoin(OrdObject).filter(
OrdObject.obj_id == Orderinfo.objId,
OrdObject.startOrd_time <= timedel,
OrdObject.startOrd_time <= now).group_by(
Item.item_id,
Order.ord_usId).all()
# 按狀態(tài)推送 已推送的不推送
status_db = Redis_db('pushMsg_status')
formId_db = Redis_db('formId')
if result and access_token:
for i in result:
item_id, openid = i[0], i[4]
# 檢查是否已推送
response = status_db.check_pushMsg_status(item_id, openid)
if response:
# 已推送 下個(gè)
continue
else:
# 未推送
# 記錄狀態(tài)為已推送 假設(shè)未推送成功也最少24小時(shí)后再試
response = status_db.set_pushMsg_status(item_id, openid)
formId = formId_db.get_formId(openid)
if formId:
# 推送 無法推送的情況,需要 改進(jìn)redis過期時(shí)間
item_info = {
'item_name': i[1],
'item_address': i[2],
'item_pass_id': i[3],
'item_start_time': '{0}.{1}.{2} {3}:{4}'.format(
i[5].year,
i[5].month,
i[5].day,
i[5].hour,
i[5].minute)}
return push(openid, formId, item_info, access_token)
else:
# 無法推送
return 'without formId!'
return 'NO RESULT for!'
else:
return 'NO RESULT!'
三、通過守護(hù)進(jìn)程啟動(dòng)
-
celery multi start w1 -A celery_proj -B -l info --logfile = celerylog.log (以守護(hù)進(jìn)程啟動(dòng) 停止改成stop就行了 日志指定為celerylog.log)
