一、為什么你的AI應(yīng)用那么慢?先講個(gè)早餐店的故事
1.1 同步模式:一個(gè)人排隊(duì),全店干等
假設(shè)你去一家早餐店買油條。店里有口鍋,一次只能炸一根油條。你點(diǎn)了一根,老板開始炸,你就站在鍋前盯著。兩分鐘后油條好了,你拿走。后面還有五個(gè)人排隊(duì),每個(gè)人都要等前一個(gè)人拿到油條才開始炸自己的。
這種方式下,第六個(gè)人要等12分鐘。這就是同步串行——每個(gè)任務(wù)必須等前一個(gè)完全結(jié)束后才能開始,總時(shí)間等于所有任務(wù)時(shí)間相加。
1.2 異步模式:拿號(hào)排隊(duì),邊等邊干別的事
換一種方式:你點(diǎn)完油條,老板給你一個(gè)振動(dòng)號(hào)牌,讓你先去買豆?jié){。老板同時(shí)接了五個(gè)人的訂單,鍋里同時(shí)炸著五根油條。誰(shuí)的那根好了,號(hào)牌震動(dòng),誰(shuí)就來(lái)取。
這種方式下,五個(gè)人同時(shí)點(diǎn)餐,最慢的一根油條2分鐘出鍋,所有人都能在2分鐘左右拿到。這就是異步并發(fā)——多個(gè)任務(wù)同時(shí)推進(jìn),總時(shí)間約等于最慢的那個(gè)任務(wù),而不是所有任務(wù)之和。

1.3 你的AI應(yīng)用正在犯同樣的錯(cuò)誤
絕大多數(shù)大語(yǔ)言模型應(yīng)用,比如智能客服、文檔問(wèn)答、多輪對(duì)話機(jī)器人,90%的時(shí)間都花在等網(wǎng)絡(luò)上——等 OpenAI 的API返回、等數(shù)據(jù)庫(kù)查詢結(jié)果、等搜索引擎響應(yīng)。
如果你寫的代碼是這種風(fēng)格:
result1 = call_openai("問(wèn)題1") ? # 等2秒
result2 = call_openai("問(wèn)題2") ? # 再等2秒
result3 = call_openai("問(wèn)題3") ? # 再等2秒
# 總耗時(shí)6秒
那你的代碼就相當(dāng)于早餐店里一次只炸一根油條。用戶點(diǎn)一次提問(wèn),要等6秒才有回復(fù),早就劃走了。
解決方案就是異步并發(fā):
results = await asyncio.gather(
? ? call_openai("問(wèn)題1"),
? ? call_openai("問(wèn)題2"),
? ? call_openai("問(wèn)題3")
)
# 總耗時(shí)約2秒
本文用LangGraph 1.0完整演示如何將這種能力應(yīng)用到結(jié)構(gòu)化的 AI工作流 中。代碼完整可運(yùn)行,每個(gè)概念都用白話解釋。
二、必須先搞懂三個(gè)關(guān)鍵詞(不用懂底層,懂比喻就行)
2.1 async def —— “我可以中途去做別的事”
在Python里,普通函數(shù) def 就像你坐在工位上專心寫一份報(bào)告——寫完之前,任何其他事情都不能打斷你,電話響了也不接。
而 async def 定義的異步函數(shù),就像你寫報(bào)告時(shí)可以隨時(shí)接電話、回消息、倒杯水。當(dāng)遇到需要等的事情(比如等同事發(fā)資料給你),你會(huì)說(shuō)“你先發(fā),我繼續(xù)寫報(bào)告”,等資料到了你再接著處理。
2.2 await —— “停在這,有結(jié)果了叫我”
await 是一個(gè)暫停標(biāo)志。它告訴Python:這里要執(zhí)行一個(gè)慢操作(網(wǎng)絡(luò)請(qǐng)求、讀文件、查數(shù)據(jù)庫(kù)),當(dāng)前任務(wù)先讓出CPU,去做其他任務(wù)。等這個(gè)慢操作完成了,再回到這里繼續(xù)往下跑。
你可以把 await 理解為“店小二喊號(hào)”——你聽到自己的號(hào),才從座位上起來(lái)去取餐。在等號(hào)的時(shí)間里,你該刷手機(jī)刷手機(jī),該聊天聊天。
2.3 asyncio.gather() —— “同時(shí)發(fā)起,統(tǒng)一收結(jié)果”
當(dāng)你有一堆互不依賴的任務(wù)時(shí),最直接的高效做法就是把它們?nèi)糠胚M(jìn) asyncio.gather()。
這個(gè)函數(shù)會(huì)做三件事:
-
同時(shí)啟動(dòng)所有任務(wù)
-
自動(dòng)等待所有任務(wù)完成
-
按你傳入的順序返回結(jié)果列表
串行執(zhí)行四個(gè)任務(wù):耗時(shí) 1+2+1+3 = 7秒。用 gather 并發(fā)執(zhí)行:耗時(shí)約等于最慢的那個(gè)任務(wù)(3秒)。省下來(lái)的4秒,就是用戶愿意繼續(xù)用你產(chǎn)品的理由。
三、完整代碼(復(fù)制即跑,每行都有注釋)
3.1 安裝依賴
只需要安裝 LangGraph 1.0 或更高版本:
pip install langgraph
3.2 完整代碼文件
將以下代碼保存為 async_demo.py,然后用 python async_demo.py 運(yùn)行。
# -*- coding: utf-8 -*-
"""
LangGraph 異步并發(fā)完整示例
功能:模擬4個(gè)API請(qǐng)求,對(duì)比同步串行 vs 異步并發(fā) vs LangGraph工作流
說(shuō)明:不需要真實(shí)API key,用 sleep 模擬網(wǎng)絡(luò)延遲
"""
import asyncio
import time
from typing import Annotated, List
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
# ========== 第一步:定義狀態(tài)(所有節(jié)點(diǎn)共享的數(shù)據(jù)結(jié)構(gòu)) ==========
# 狀態(tài)就像工作流里的小黑板,每個(gè)節(jié)點(diǎn)都能寫,LangGraph自動(dòng)幫你合并
class IOState(TypedDict):
? ? # results: 存放每個(gè)任務(wù)返回的結(jié)果字符串
? ? # Annotated[list, 合并函數(shù)] 的意思是:多個(gè)節(jié)點(diǎn)同時(shí)寫入時(shí),自動(dòng)把列表拼接起來(lái)
? ? results: Annotated[List[str], lambda x, y: x + y]
? ? # task_count: 記錄總共有多少任務(wù),僅用于展示
? ? task_count: int
# ========== 第二步:模擬慢速API調(diào)用 ==========
# 這個(gè)函數(shù)假裝是向大模型發(fā)請(qǐng)求。實(shí)際項(xiàng)目中,將 asyncio.sleep 替換為真實(shí)的 async HTTP 請(qǐng)求即可
async def slow_api_call(task_id: int, delay: float = 1.0) -> str:
? ? """
? ? 模擬一個(gè)耗時(shí)的網(wǎng)絡(luò)請(qǐng)求
? ? task_id: 任務(wù)編號(hào)
? ? delay: 模擬的網(wǎng)絡(luò)延遲(秒)
? ? 返回: 模擬的響應(yīng)結(jié)果
? ? """
? ? current_time = time.strftime("%H:%M:%S")
? ? print(f"[{current_time}] 任務(wù){(diào)task_id} 開始,預(yù)計(jì)耗時(shí){delay}秒")
? ? # 關(guān)鍵:await asyncio.sleep 模擬 IO 等待
? ? # 在執(zhí)行這行期間,Python 會(huì)去運(yùn)行其他協(xié)程,而不是空等
? ? await asyncio.sleep(delay)
? ? current_time = time.strftime("%H:%M:%S")
? ? result = f"任務(wù){(diào)task_id}的結(jié)果(實(shí)際等待{delay}秒)"
? ? print(f"[{current_time}] 任務(wù){(diào)task_id} 完成")
? ? return result
# ========== 第三步:定義四個(gè)獨(dú)立的異步節(jié)點(diǎn) ==========
# 每個(gè)節(jié)點(diǎn)對(duì)應(yīng)工作流中的一個(gè)步驟。它們之間沒(méi)有依賴,所以可以并發(fā)執(zhí)行
async def node_1(state: IOState):
? ? result = await slow_api_call(1, 1.0)
? ? return {"results": [result]} ? # 返回字典,LangGraph 會(huì)自動(dòng)合并到全局狀態(tài)中
async def node_2(state: IOState):
? ? result = await slow_api_call(2, 1.5)
? ? return {"results": [result]}
async def node_3(state: IOState):
? ? result = await slow_api_call(3, 0.8)
? ? return {"results": [result]}
async def node_4(state: IOState):
? ? result = await slow_api_call(4, 1.2)
? ? return {"results": [result]}
# ========== 第四步:匯總節(jié)點(diǎn)(所有并發(fā)任務(wù)完成后執(zhí)行) ==========
# 這個(gè)節(jié)點(diǎn)不需要異步,因?yàn)樗蛔?CPU 計(jì)算(打印、整理數(shù)據(jù)),不涉及 IO 等待
def summary_node(state: IOState) -> dict:
? ? print("\n" + "="*50)
? ? print("匯總報(bào)告:所有并發(fā)任務(wù)已完成")
? ? print("="*50)
? ? print(f"總?cè)蝿?wù)數(shù): {state['task_count']}")
? ? print(f"收到結(jié)果數(shù): {len(state['results'])}")
? ? print("詳細(xì)結(jié)果:")
? ? for idx, res in enumerate(state['results'], 1):
? ? ? ? print(f" ?{idx}. {res}")
? ? return {"results": state["results"] + ["匯總節(jié)點(diǎn)執(zhí)行完畢"]}
# ========== 第五部分:同步版本(用于性能對(duì)比) ==========
def run_sync_version():
? ? """同步串行執(zhí)行版本,作為性能對(duì)比的基準(zhǔn)"""
? ? print("\n" + "="*40)
? ? print("【同步執(zhí)行版本】一個(gè)任務(wù)完成才做下一個(gè)")
? ? print("="*40)
? ? start_time = time.time()
? ? delays = [1.0, 1.5, 0.8, 1.2]
? ? results = []
? ? for i, delay in enumerate(delays, 1):
? ? ? ? current = time.strftime("%H:%M:%S")
? ? ? ? print(f"[{current}] 同步任務(wù){(diào)i} 開始...")
? ? ? ? # time.sleep 是阻塞的:CPU 在這里什么都不做,就干等
? ? ? ? time.sleep(delay)
? ? ? ? results.append(f"同步任務(wù){(diào)i}的結(jié)果")
? ? ? ? current = time.strftime("%H:%M:%S")
? ? ? ? print(f"[{current}] 同步任務(wù){(diào)i} 完成")
? ? elapsed = time.time() - start_time
? ? print(f"\n同步執(zhí)行總耗時(shí): {elapsed:.2f} 秒")
? ? print(f"理論串行總時(shí)間: {sum(delays)} 秒")
? ? return results
# ========== 第六部分:純異步版本(只用 asyncio.gather,不用 LangGraph) ==========
async def run_async_gather():
? ? """使用 asyncio.gather 并發(fā)執(zhí)行,展示最基本的并發(fā)能力"""
? ? print("\n" + "="*40)
? ? print("【純異步并發(fā)版本】使用 asyncio.gather 同時(shí)發(fā)起所有請(qǐng)求")
? ? print("="*40)
? ? start_time = time.time()
? ? # 創(chuàng)建四個(gè)協(xié)程對(duì)象(此時(shí)尚未執(zhí)行)
? ? tasks = [
? ? ? ? slow_api_call(1, 1.0),
? ? ? ? slow_api_call(2, 1.5),
? ? ? ? slow_api_call(3, 0.8),
? ? ? ? slow_api_call(4, 1.2),
? ? ]
? ? # asyncio.gather 同時(shí)啟動(dòng)所有任務(wù),等待全部完成,按傳入順序返回結(jié)果
? ? results = await asyncio.gather(*tasks)
? ? elapsed = time.time() - start_time
? ? print(f"\n異步并發(fā)總耗時(shí): {elapsed:.2f} 秒")
? ? print(f"理論最優(yōu)耗時(shí): {max([1.0,1.5,0.8,1.2])} 秒")
? ? return results
# ========== 第七部分:構(gòu)建 LangGraph 異步工作流 ==========
def build_langgraph_async():
? ? """
? ? 構(gòu)建一個(gè)狀態(tài)圖,實(shí)現(xiàn)結(jié)構(gòu)化的異步并發(fā)
? ? 圖結(jié)構(gòu):
? ? ? ? ? ? ? ? ? ? ┌─→ node_1 ──┐
? ? ? ? ? ? ? ? ? ? ├─→ node_2 ──┤
? ? ? ? ? ? START ──┼─→ node_3 ──┼─→ summary ──→ END
? ? ? ? ? ? ? ? ? ? ├─→ node_4 ──┤
? ? 關(guān)鍵點(diǎn):
? ? - 四個(gè) node 都從 START 出發(fā),因此它們會(huì)并發(fā)執(zhí)行
? ? - 四個(gè) node 都連接到 summary,因此 summary 會(huì)等待所有 node 完成后才執(zhí)行
? ? """
? ? builder = StateGraph(IOState)
? ? # 添加節(jié)點(diǎn)
? ? builder.add_node("node_1", node_1)
? ? builder.add_node("node_2", node_2)
? ? builder.add_node("node_3", node_3)
? ? builder.add_node("node_4", node_4)
? ? builder.add_node("summary", summary_node)
? ? # 定義邊:所有 IO 節(jié)點(diǎn)從 START 開始(并發(fā)啟動(dòng))
? ? builder.add_edge(START, "node_1")
? ? builder.add_edge(START, "node_2")
? ? builder.add_edge(START, "node_3")
? ? builder.add_edge(START, "node_4")
? ? # 所有 IO 節(jié)點(diǎn)完成后,匯聚到 summary 節(jié)點(diǎn)
? ? builder.add_edge("node_1", "summary")
? ? builder.add_edge("node_2", "summary")
? ? builder.add_edge("node_3", "summary")
? ? builder.add_edge("node_4", "summary")
? ? # summary 之后結(jié)束
? ? builder.add_edge("summary", END)
? ? # 編譯成可執(zhí)行的圖
? ? return builder.compile()
async def run_langgraph_version():
? ? """運(yùn)行 LangGraph 異步工作流"""
? ? print("\n" + "="*40)
? ? print("【LangGraph 異步工作流】結(jié)構(gòu)化并發(fā) + 狀態(tài)管理")
? ? print("="*40)
? ? start_time = time.time()
? ? # 構(gòu)建圖
? ? graph = build_langgraph_async()
? ? # 關(guān)鍵:必須使用 ainvoke 而不是 invoke,因?yàn)楣?jié)點(diǎn)是異步的
? ? final_state = await graph.ainvoke({
? ? ? ? "results": [], ? ? ?# 初始結(jié)果列表為空
? ? ? ? "task_count": 4, ? ?# 共有4個(gè)并發(fā)任務(wù)
? ? })
? ? elapsed = time.time() - start_time
? ? print(f"\nLangGraph 異步工作流總耗時(shí): {elapsed:.2f} 秒")
? ? return final_state
# ========== 第八部分:主函數(shù),依次運(yùn)行三個(gè)版本進(jìn)行對(duì)比 ==========
async def main():
? ? print("\n" + "="*60)
? ? print("LangGraph 異步編程完整演示 - 性能對(duì)比")
? ? print("="*60)
? ? # 1. 同步版本
? ? run_sync_version()
? ? # 2. 純異步 gather 版本
? ? await run_async_gather()
? ? # 3. LangGraph 異步工作流版本
? ? await run_langgraph_version()
? ? # 打印最終對(duì)比結(jié)論
? ? print("\n" + "="*60)
? ? print("性能對(duì)比總結(jié)")
? ? print("="*60)
? ? print("""
? ? 執(zhí)行方式 ? ? ? ? ? ?總耗時(shí)(約) ? ? 說(shuō)明
? ? ------------------------------------------------
? ? 同步串行 ? ? ? ? ? 4.5 秒 ? ? ? ?任務(wù)依次執(zhí)行,總時(shí)間 = 各任務(wù)時(shí)間之和
? ? 異步 gather ? ? ? ?1.5 秒 ? ? ? ?同時(shí)啟動(dòng),總時(shí)間 ≈ 最慢的任務(wù)時(shí)間
? ? LangGraph異步 ? ? ?1.5 秒 ? ? ? ?同樣快,但額外提供狀態(tài)管理和工作流編排能力
? ? 結(jié)論:
? ? 1. 對(duì)于 IO 密集型任務(wù)(網(wǎng)絡(luò)請(qǐng)求、文件讀寫、數(shù)據(jù)庫(kù)查詢),異步并發(fā)能帶來(lái)數(shù)倍性能提升。
? ? 2. asyncio.gather 適用于簡(jiǎn)單的一次性批量并發(fā)。
? ? 3. LangGraph 適用于需要狀態(tài)共享、條件分支、錯(cuò)誤重試的復(fù)雜工作流。
? ? """)
if __name__ == "__main__":
? ? # asyncio.run 是啟動(dòng)異步事件循環(huán)的標(biāo)準(zhǔn)入口
? ? asyncio.run(main())
四、運(yùn)行結(jié)果解讀
運(yùn)行上述代碼后,你會(huì)看到清晰的對(duì)比:
-
同步版本
:四個(gè)任務(wù)一個(gè)接一個(gè)執(zhí)行,總耗時(shí)約4.5秒。每個(gè)任務(wù)的開始和結(jié)束時(shí)間明顯錯(cuò)開。
-
純異步版本
:四個(gè)任務(wù)幾乎同時(shí)打印“開始”,然后各自在對(duì)應(yīng)的延遲后打印“完成”??偤臅r(shí)約1.5秒(取決于最慢的那個(gè)1.5秒任務(wù))。
-
LangGraph版本
:執(zhí)行時(shí)間與純異步版本相同,但多了一個(gè)匯總節(jié)點(diǎn),自動(dòng)合并了所有結(jié)果并打印報(bào)告。
通過(guò)對(duì)比可以直觀感受到:異步并發(fā)比同步串行快3倍左右。
五、幾個(gè)常見(jiàn)疑問(wèn)解答
5.1 我什么時(shí)候該用異步?
只要你的代碼存在“等待”外部操作的情況,就應(yīng)該考慮異步。典型場(chǎng)景包括:
-
調(diào)用大模型API(OpenAI、Claude、通義千問(wèn)等)
-
查詢遠(yuǎn)程數(shù)據(jù)庫(kù)(MySQL、PostgreSQL、MongoDB)
-
爬取網(wǎng)頁(yè)或調(diào)用第三方HTTP接口
-
讀寫大文件(尤其是網(wǎng)絡(luò)文件系統(tǒng))
這些操作的特點(diǎn)都是:CPU基本不干活,主要是網(wǎng)絡(luò)或磁盤在忙。異步就是利用這段空閑時(shí)間去做別的事情。
5.2 什么時(shí)候不能用異步?
如果你的任務(wù)是CPU密集型的,例如:
-
大量數(shù)值計(jì)算、矩陣運(yùn)算
-
視頻編解碼、圖像處理
-
復(fù)雜的加密或壓縮算法
那么異步幫不上什么忙,因?yàn)镻ython的異步本質(zhì)上是單線程的,CPU本身就忙不過(guò)來(lái)時(shí),異步切換反而增加開銷。這種場(chǎng)景應(yīng)該使用多進(jìn)程(ProcessPoolExecutor)。
5.3 直接用 asyncio.gather 不就行了,為什么還要 LangGraph?
asyncio.gather 適合“一次性并發(fā)一堆獨(dú)立任務(wù)”這種簡(jiǎn)單場(chǎng)景。但真實(shí)的工作流往往更復(fù)雜:
-
任務(wù)B需要等待任務(wù)A的結(jié)果才能開始,同時(shí)任務(wù)C和D可以并行
-
需要處理失敗重試、超時(shí)控制
-
需要?jiǎng)討B(tài)決定下一步執(zhí)行哪個(gè)節(jié)點(diǎn)(比如根據(jù)用戶意圖分支)
-
需要持久化狀態(tài)、支持?jǐn)帱c(diǎn)續(xù)跑
LangGraph 正是為這種場(chǎng)景設(shè)計(jì)的。它用狀態(tài)圖的方式讓你聲明節(jié)點(diǎn)之間的依賴關(guān)系,底層自動(dòng)處理并發(fā)調(diào)度、狀態(tài)合并、條件路由等復(fù)雜邏輯。
5.4 代碼里的 Annotated 是什么意思?
results: Annotated[List[str], lambda x, y: x + y]
這是 LangGraph 的狀態(tài)歸約器(Reducer)。因?yàn)槎鄠€(gè)并發(fā)節(jié)點(diǎn)可能同時(shí)向 results 字段寫入數(shù)據(jù),如果不指定合并規(guī)則,后寫入的可能會(huì)覆蓋前面的。lambda x, y: x + y 告訴 LangGraph:把多個(gè)節(jié)點(diǎn)返回的列表拼接成一個(gè)更大的列表。這樣最終 results 就會(huì)包含所有節(jié)點(diǎn)的輸出。
六、總結(jié)
本文用一個(gè)早餐店的類比講清楚了同步與異步的核心區(qū)別,并用完整可運(yùn)行的代碼演示了三種執(zhí)行方式的性能差異:
-
同步串行
:任務(wù)排隊(duì)執(zhí)行,總時(shí)間為各任務(wù)耗時(shí)之和。代碼簡(jiǎn)單但性能差。
-
純異步 concurrent
:使用 asyncio.gather 同時(shí)啟動(dòng)多個(gè)獨(dú)立任務(wù),總時(shí)間約等于最慢任務(wù)。性能大幅提升,適合簡(jiǎn)單批量并發(fā)。
-
LangGraph 異步工作流
:在保持同等性能的基礎(chǔ)上,增加了結(jié)構(gòu)化狀態(tài)管理、節(jié)點(diǎn)依賴編排、自動(dòng)結(jié)果合并等能力,適合生產(chǎn)級(jí)復(fù)雜工作流。
對(duì)于所有 LLM 應(yīng)用開發(fā)者來(lái)說(shuō),掌握異步編程不再是一個(gè)“高級(jí)技巧”,而是提升產(chǎn)品響應(yīng)速度和用戶體驗(yàn)的必備技能。建議先從本文的完整示例開始運(yùn)行和修改,逐步理解異步狀態(tài)圖的構(gòu)建邏輯,再應(yīng)用到自己的實(shí)際項(xiàng)目中。