在分布式系統(tǒng)中,將計算密集型任務(wù)(如 AI 圖像生成)從 Web 服務(wù)中剝離是常見的架構(gòu)模式。本文將介紹如何使用 Python 的 asyncio 和 aio_periodic 構(gòu)建一個健壯的 Worker,它能夠調(diào)用本地 MLX 推理引擎生成圖片,同時保持高并發(fā)下的穩(wěn)定性。
1. 架構(gòu)概述
我們的目標是構(gòu)建一個 Worker 節(jié)點,它負責(zé):
-
監(jiān)聽任務(wù):從
periodic任務(wù)服務(wù)器領(lǐng)取繪圖任務(wù)。 -
執(zhí)行推理:調(diào)用適配 Apple Silicon 的
z-image-turbo-mlx腳本。 - 非阻塞交互:在長達數(shù)秒的生成過程中,保持與服務(wù)器的心跳連接。
- 結(jié)果回傳:將生成的圖片轉(zhuǎn)為 Base64 傳回。
2. 核心挑戰(zhàn):同步 vs 異步
在早期實現(xiàn)中,開發(fā)者常直接使用 os.system 調(diào)用外部命令。這在異步框架(如 asyncio)中是致命的。
-
問題:
os.system會阻塞整個 Python 進程。在 AI 生成圖片的 5-10 秒內(nèi),Event Loop 停止運轉(zhuǎn),Worker 無法發(fā)送心跳包(Heartbeat),導(dǎo)致服務(wù)器誤判 Worker 掉線并重新分配任務(wù),造成“僵尸任務(wù)”循環(huán)。 -
解法:使用
asyncio.create_subprocess_exec。它允許我們在等待子進程結(jié)束時,讓出 CPU 控制權(quán),使 Worker 能繼續(xù)處理網(wǎng)絡(luò) IO。
3. 代碼實現(xiàn)亮點
3.1 安全的子進程調(diào)用
為了防止 Shell 命令注入攻擊(例如用戶在 prompt 中輸入 ; rm -rf /),我們放棄字符串拼接,改用列表傳參:
# 推薦做法:列表傳參
cmd = [
sys.executable, 'generate_mlx.py',
'--prompt', user_prompt,
'--output', output_filename
]
# asyncio 負責(zé)安全地將參數(shù)傳給子進程,無需經(jīng)過 Shell 解釋
process = await asyncio.create_subprocess_exec(*cmd)
3.2 保持 Event Loop 活躍
對于文件讀取這種 IO 操作,雖然 Python 的 open() 是同步的,但在高并發(fā)下依然可能造成微小的卡頓。我們利用 run_in_executor 將其放入線程池:
# 將同步的文件讀取扔到線程池,避免阻塞主循環(huán)
loop = asyncio.get_running_loop()
b64_str = await loop.run_in_executor(
None,
partial(blocking_image_to_base64, output_filename)
)
3.3 健壯的資源清理
AI 任務(wù)常因顯存不足或參數(shù)錯誤而失敗。使用 try...finally 模式確保即使生成失敗,臨時文件也能被清理,保持環(huán)境整潔。
try:
await run_generation()
except Exception:
logger.error("Job failed")
finally:
if os.path.exists(temp_file):
os.remove(temp_file) # 無論成功失敗,必須清理垃圾
4. 實時日志流
為了方便運維監(jiān)控,我們配置子進程直接繼承父進程的標準輸出。這意味著你在運行 Worker 的終端上,可以直接看到底層 generate_mlx.py 打印的進度條和日志,而無需復(fù)雜的管道轉(zhuǎn)發(fā)。
5. 總結(jié)
通過將 aio_periodic 的任務(wù)調(diào)度能力與 asyncio 的子進程管理結(jié)合,我們構(gòu)建了一個既能利用本地強大算力(MLX),又完全符合分布式系統(tǒng)穩(wěn)定性要求的 Worker。這種模式同樣適用于視頻轉(zhuǎn)碼、PDF 生成等其他耗時任務(wù)。