1、第一種辦法是最簡(jiǎn)單又最暴力。那就是在一個(gè)死循環(huán)中,使用線程睡眠函數(shù) sleep()。
from datetime import datetime
import time
'''
每個(gè) 10 秒打印當(dāng)前時(shí)間。
'''
def timedTask():
while True:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
time.sleep(10)
if __name__ == '__main__':
timedTask()
'''
想最快的入門Python嗎?請(qǐng)搜索:"泉小朵",來學(xué)習(xí)Python最快入門教程。
也可以加入我們的Python學(xué)習(xí)Q群:902936549,看看前輩們是如何學(xué)習(xí)的。
'''
這種方法能夠執(zhí)行固定間隔時(shí)間的任務(wù)。如果timedTask()函數(shù)之后還有些操作,我們還使用死循環(huán) + 阻塞線程。這會(huì)使得timedTask()一直占有 CPU 資源,導(dǎo)致后續(xù)操作無法執(zhí)行。我建議謹(jǐn)重使用。
2、既然第一種方法暴力,那么有沒有比較優(yōu)雅地方法?答案是肯定的。Python 標(biāo)準(zhǔn)庫 threading 中有個(gè) Timer 類。它會(huì)新啟動(dòng)一個(gè)線程來執(zhí)行定時(shí)任務(wù),所以它是非阻塞函式。
如果你有使用多線程的話,需要關(guān)心線程安全問題。那么你可以選使用threading.Timer模塊。
from datetime import datetime
from threading import Timer
import time
'''
每個(gè) 10 秒打印當(dāng)前時(shí)間。
'''
def timedTask():
'''
第一個(gè)參數(shù): 延遲多長時(shí)間執(zhí)行任務(wù)(單位: 秒)
第二個(gè)參數(shù): 要執(zhí)行的任務(wù), 即函數(shù)
第三個(gè)參數(shù): 調(diào)用函數(shù)的參數(shù)(tuple)
'''
Timer(10, task, ()).start()
# 定時(shí)任務(wù)
def task():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
timedTask()
while True:
print(time.time())
time.sleep(5)
運(yùn)行結(jié)果:
1512486945.1196375
1512486950.119873
2017-12-05 23:15:50
1512486955.133385
3、使用標(biāo)準(zhǔn)庫中sched模塊。sched 是事件調(diào)度器,它通過 scheduler 類來調(diào)度事件,從而達(dá)到定時(shí)執(zhí)行任務(wù)的效果。
sched庫使用起來也是非常簡(jiǎn)單。
1)首先構(gòu)造一個(gè)sched.scheduler類
它接受兩個(gè)參數(shù):timefunc和 delayfunc。timefunc 應(yīng)該返回一個(gè)數(shù)字,代表當(dāng)前時(shí)間,delayfunc 函數(shù)接受一個(gè)參數(shù),用于暫停運(yùn)行的時(shí)間單元。
一般使用默認(rèn)參數(shù)就行,即傳入這兩個(gè)參數(shù) time.time 和 time.sleep.當(dāng)然,你也可以自己實(shí)現(xiàn)時(shí)間暫停的函數(shù)。
2)添加調(diào)度任務(wù)
scheduler 提供了兩個(gè)添加調(diào)度任務(wù)的函數(shù):
enter(delay, priority, action, argument=(), kwargs={})
1
該函數(shù)可以延遲一定時(shí)間執(zhí)行任務(wù)。delay 表示延遲多長時(shí)間執(zhí)行任務(wù),單位是秒。priority為優(yōu)先級(jí),越小優(yōu)先級(jí)越大。兩個(gè)任務(wù)指定相同的延遲時(shí)間,優(yōu)先級(jí)大的任務(wù)會(huì)向被執(zhí)行。action 即需要執(zhí)行的函數(shù),argument 和 kwargs 分別是函數(shù)的位置和關(guān)鍵字參數(shù)。
scheduler.enterabs(time, priority, action, argument=(), kwargs={})
1
添加一項(xiàng)任務(wù),但這個(gè)任務(wù)會(huì)在 time 這時(shí)刻執(zhí)行。因此,time 是絕對(duì)時(shí)間.其他參數(shù)用法與 enter() 中的參數(shù)用法是一致。
3)把任務(wù)運(yùn)行起來
調(diào)用 scheduler.run()函數(shù)就完事了。
下面是 sche 使用的簡(jiǎn)單示例:
from datetime import datetime
import sched
import time
'''
每個(gè) 10 秒打印當(dāng)前時(shí)間。
'''
def timedTask():
# 初始化 sched 模塊的 scheduler 類
scheduler = sched.scheduler(time.time, time.sleep)
# 增加調(diào)度任務(wù)
scheduler.enter(10, 1, task)
# 運(yùn)行任務(wù)
scheduler.run()
# 定時(shí)任務(wù)
def task():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
timedTask()
值得注意的是: scheduler 中的每個(gè)調(diào)度任務(wù)只會(huì)工作一次,不會(huì)無限循環(huán)被調(diào)用。如果想重復(fù)執(zhí)行同一任務(wù), 需要重復(fù)添加調(diào)度任務(wù)即可。
上面三種辦法能實(shí)現(xiàn)定時(shí)任務(wù),但是都無法做到循環(huán)執(zhí)行定時(shí)任務(wù)。因此,需要一個(gè)能夠擔(dān)當(dāng)此重任的庫。它就是APScheduler。
1 簡(jiǎn)介
APScheduler的全稱是Advanced Python Scheduler。它是一個(gè)輕量級(jí)的 Python 定時(shí)任務(wù)調(diào)度框架。APScheduler 支持三種調(diào)度任務(wù):固定時(shí)間間隔,固定時(shí)間點(diǎn)(日期),Linux 下的 Crontab 命令。同時(shí),它還支持異步執(zhí)行、后臺(tái)執(zhí)行調(diào)度任務(wù)。
2 安裝
使用 pip 包管理工具安裝 APScheduler 是最方便快捷的。
pip install APScheduler
# 如果出現(xiàn)因下載失敗導(dǎo)致安裝不上的情況,建議使用代理
pip --proxy http://代理ip:端口 install APScheduler
3 使用步驟
APScheduler 使用起來還算是比較簡(jiǎn)單。運(yùn)行一個(gè)調(diào)度任務(wù)只需要以下三部曲。
新建一個(gè) schedulers (調(diào)度器) 。
添加一個(gè)調(diào)度任務(wù)(job stores)。
運(yùn)行調(diào)度任務(wù)。
下面是執(zhí)行每 2 秒報(bào)時(shí)的簡(jiǎn)單示例代碼:
'''
想最快的入門Python嗎?請(qǐng)搜索:"泉小朵",來學(xué)習(xí)Python最快入門教程。
也可以加入我們的Python學(xué)習(xí)Q群:902936549,看看前輩們是如何學(xué)習(xí)的。
'''
import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler
def timedTask():
print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
if __name__ == '__main__':
# 創(chuàng)建后臺(tái)執(zhí)行的 schedulers
scheduler = BackgroundScheduler()
# 添加調(diào)度任務(wù)
# 調(diào)度方法為 timedTask,觸發(fā)器選擇 interval(間隔性),間隔時(shí)長為 2 秒
scheduler.add_job(timedTask, 'interval', seconds=2)
# 啟動(dòng)調(diào)度任務(wù)
scheduler.start()
while True:
print(time.time())
time.sleep(5)
4 基礎(chǔ)組件
APScheduler 有四種組件,分別是:調(diào)度器(scheduler),作業(yè)存儲(chǔ)(job store),觸發(fā)器(trigger),執(zhí)行器(executor)。
schedulers(調(diào)度器)
它是任務(wù)調(diào)度器,屬于控制器角色。它配置作業(yè)存儲(chǔ)器和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)。
triggers(觸發(fā)器)
描述調(diào)度任務(wù)被觸發(fā)的條件。不過觸發(fā)器完全是無狀態(tài)的。
job stores(作業(yè)存儲(chǔ)器)
任務(wù)持久化倉庫,默認(rèn)保存任務(wù)在內(nèi)存中,也可將任務(wù)保存都各種數(shù)據(jù)庫中,任務(wù)中的數(shù)據(jù)序列化后保存到持久化數(shù)據(jù)庫,從數(shù)據(jù)庫加載后又反序列化。
executors(執(zhí)行器)
負(fù)責(zé)處理作業(yè)的運(yùn)行,它們通常通過在作業(yè)中提交指定的可調(diào)用對(duì)象到一個(gè)線程或者進(jìn)城池來進(jìn)行。當(dāng)作業(yè)完成時(shí),執(zhí)行器將會(huì)通知調(diào)度器。
4.1 schedulers(調(diào)度器)
我個(gè)人覺得 APScheduler 非常好用的原因。它提供 7 種調(diào)度器,能夠滿足我們各種場(chǎng)景的需要。例如:后臺(tái)執(zhí)行某個(gè)操作,異步執(zhí)行操作等。調(diào)度器分別是:
BlockingScheduler : 調(diào)度器在當(dāng)前進(jìn)程的主線程中運(yùn)行,也就是會(huì)阻塞當(dāng)前線程。
BackgroundScheduler : 調(diào)度器在后臺(tái)線程中運(yùn)行,不會(huì)阻塞當(dāng)前線程。
AsyncIOScheduler : 結(jié)合 asyncio 模塊(一個(gè)異步框架)一起使用。
GeventScheduler : 程序中使用 gevent(高性能的Python并發(fā)框架)作為IO模型,和 GeventExecutor 配合使用。
TornadoScheduler : 程序中使用 Tornado(一個(gè)web框架)的IO模型,用 ioloop.add_timeout 完成定時(shí)喚醒。
TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定時(shí)喚醒。
QtScheduler : 你的應(yīng)用是一個(gè) Qt 應(yīng)用,需使用QTimer完成定時(shí)喚醒。
4.2 triggers(觸發(fā)器)
APScheduler 有三種內(nèi)建的 trigger:
1)date 觸發(fā)器
date 是最基本的一種調(diào)度,作業(yè)任務(wù)只會(huì)執(zhí)行一次。它表示特定的時(shí)間點(diǎn)觸發(fā)。它的參數(shù)如下:
參數(shù) 說明
run_date (datetime 或 str) 作業(yè)的運(yùn)行日期或時(shí)間
timezone (datetime.tzinfo 或 str) 指定時(shí)區(qū)
date 觸發(fā)器使用示例如下:
from datetime import datetime
from datetime import date
from apscheduler.schedulers.background import BackgroundScheduler
def job_func(text):
print(text)
scheduler = BackgroundScheduler()
# 在 2017-12-13 時(shí)刻運(yùn)行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 時(shí)刻運(yùn)行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2017-12-13 14:00:01 時(shí)刻運(yùn)行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text'])
scheduler.start()
2)interval 觸發(fā)器
固定時(shí)間間隔觸發(fā)。interval 間隔調(diào)度,參數(shù)如下:
參數(shù) 說明
weeks (int) 間隔幾周
days (int) 間隔幾天
hours (int) 間隔幾小時(shí)
minutes (int) 間隔幾分鐘
seconds (int) 間隔多少秒
start_date (datetime 或 str) 開始日期
end_date (datetime 或 str) 結(jié)束日期
timezone (datetime.tzinfo 或str) 時(shí)區(qū)
interval 觸發(fā)器使用示例如下:
'''
想最快的入門Python嗎?請(qǐng)搜索:"泉小朵",來學(xué)習(xí)Python最快入門教程。
也可以加入我們的Python學(xué)習(xí)Q群:902936549,看看前輩們是如何學(xué)習(xí)的。
'''
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
def job_func(text):
print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler()
# 每隔兩分鐘執(zhí)行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之間, 每隔兩分鐘執(zhí)行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
scheduler.start()
3)cron 觸發(fā)器
在特定時(shí)間周期性地觸發(fā),和Linux crontab格式兼容。它是功能最強(qiáng)大的觸發(fā)器。
我們先了解 cron 參數(shù):
參數(shù) 說明
year (int 或 str) 年,4位數(shù)字
month (int 或 str) 月 (范圍1-12)
day (int 或 str) 日 (范圍1-31
week (int 或 str) 周 (范圍1-53)
day_of_week (int 或 str) 周內(nèi)第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 時(shí) (范圍0-23)
minute (int 或 str) 分 (范圍0-59)
second (int 或 str) 秒 (范圍0-59)
start_date (datetime 或 str) 最早開始日期(包含)
end_date (datetime 或 str) 最晚結(jié)束時(shí)間(包含)
timezone (datetime.tzinfo 或str) 指定時(shí)區(qū)
這些參數(shù)是支持算數(shù)表達(dá)式,取值格式有如下:
cron 觸發(fā)器使用示例如下:
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
def job_func(text):
print("當(dāng)前時(shí)間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler()
# 在每年 1-3、7-9 月份中的每個(gè)星期一、二中的 00:00, 01:00, 02:00 和 03:00 執(zhí)行 job_func 任務(wù)
scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
scheduler.start()
4.3 作業(yè)存儲(chǔ)(job store)
該組件是對(duì)調(diào)度任務(wù)的管理。
1)添加 job
有兩種添加方法,其中一種上述代碼用到的 add_job(), 另一種則是scheduled_job()修飾器來修飾函數(shù)。
這個(gè)兩種辦法的區(qū)別是:第一種方法返回一個(gè) apscheduler.job.Job 的實(shí)例,可以用來改變或者移除 job。第二種方法只適用于應(yīng)用運(yùn)行期間不會(huì)改變的 job。
第二種添加任務(wù)方式的例子:
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
@scheduler.scheduled_job(job_func, 'interval', minutes=2)
def job_func(text):
print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler()
scheduler.start()
2)移除 job
移除 job 也有兩種方法:remove_job() 和 job.remove()。
remove_job() 是根據(jù) job 的 id 來移除,所以要在 job 創(chuàng)建的時(shí)候指定一個(gè) id。
job.remove() 則是對(duì) job 執(zhí)行 remove 方法即可
scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
scheduler.remove_job(job_one)
job = add_job(job_func, 'interval', minutes=2, id='job_one')
job.remvoe()
3)獲取 job 列表
通過 scheduler.get_jobs() 方法能夠獲取當(dāng)前調(diào)度器中的所有 job 的列表
修改 job
如果你因計(jì)劃改變要對(duì) job 進(jìn)行修改,可以使用Job.modify() 或者 modify_job()方法來修改 job 的屬性。但是值得注意的是,job 的 id 是無法被修改的。
scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
scheduler.start()
# 將觸發(fā)時(shí)間間隔修改成 5分鐘
scheduler.modify_job('job_one', minutes=5)
job = scheduler.add_job(job_func, 'interval', minutes=2)
# 將觸發(fā)時(shí)間間隔修改成 5分鐘
job.modify(minutes=5)
5)關(guān)閉 job
默認(rèn)情況下調(diào)度器會(huì)等待所有正在運(yùn)行的作業(yè)完成后,關(guān)閉所有的調(diào)度器和作業(yè)存儲(chǔ)。如果你不想等待,可以將 wait 選項(xiàng)設(shè)置為 False。
scheduler.shutdown()
scheduler.shutdown(wait=false)
4.4 執(zhí)行器(executor)
執(zhí)行器顧名思義是執(zhí)行調(diào)度任務(wù)的模塊。最常用的 executor 有兩種:ProcessPoolExecutor 和 ThreadPoolExecutor
下面是顯式設(shè)置 job store(使用mongo存儲(chǔ))和 executor 的代碼的示例。
注:本代碼來源于網(wǎng)絡(luò)
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def my_job():
print 'hello world'
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
jobstores = {
'mongo': MongoDBJobStore(collection='job', database='test', client=client),
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, 'interval', seconds=5)
try:
scheduler.start()
except SystemExit:
client.close()