一、前言:從 "路由" 轉(zhuǎn)向 "表達(dá)式"
在傳統(tǒng)編程范式中,我們往往通過(guò)命令式結(jié)構(gòu)(如 if/else、for、try/except)明確編排程序的執(zhí)行路徑,就像手動(dòng)設(shè)計(jì)一張流程圖。
LangChain 通過(guò) LCEL(LangChain 表達(dá)式語(yǔ)言)將這類流程控制抽象為“可組合的執(zhí)行表達(dá)式”。每個(gè)模塊都被封裝為 Runnable,多個(gè)模塊之間通過(guò)鏈?zhǔn)秸Z(yǔ)法自然組合,系統(tǒng)根據(jù)數(shù)據(jù)流自動(dòng)推導(dǎo)執(zhí)行順序。
這不僅僅是語(yǔ)法層面的變化,更代表著一種編程思維的演進(jìn):從“命令式控制”轉(zhuǎn)向“表達(dá)式組合”,從“顯式邏輯”轉(zhuǎn)向“模塊化構(gòu)建”。
二、核心概念:Runnable 是什么?
Runnable 是 LangChain 架構(gòu)中的核心抽象單元,是 LCEL 表達(dá)式語(yǔ)言的最小執(zhí)行模塊和構(gòu)建基石。
任何可執(zhí)行的單元都是 Runnable
核心基類
class Runnable(ABC, Generic[Input, Output]):
"""A unit of work that can be invoked, batched, streamed, transformed and composed.
...
"""
@abstractmethod
def invoke(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Output:
"""Transform a single input into an output.
...
"""
.invoke(input) 是 LangChain 中 Runnable 接口的核心同步調(diào)用方法,用于將輸入傳入模塊,執(zhí)行并返回結(jié)果。適用于單次執(zhí)行、測(cè)試鏈路或組合多個(gè)模塊后的最終觸發(fā)。
特點(diǎn):
- 同步執(zhí)行:立即返回處理結(jié)果
- 通用入口:適用于任何 Runnable 對(duì)象(LLM、鏈、工具等)
- 可組合:支持與 .with_retry()、.transform() 等方法鏈?zhǔn)绞褂?/li>
LangChain 將你所知道的:
- LLM
- PromptTemplate
- ChatHistory
- Tool
- AgentExecutor
通通都抽象為 Runnable,不同類型只是指定了不同的輸入/輸出數(shù)據(jù)類型。
- .stream() 將普通的執(zhí)行過(guò)程變成流式執(zhí)行,允許逐步產(chǎn)出結(jié)果,而不是等待全部完成后一次性返回。
- 在調(diào)用大型語(yǔ)言模型(LLM)或其它需要長(zhǎng)時(shí)間運(yùn)行的任務(wù)時(shí),輸出結(jié)果往往是增量產(chǎn)生的。使用 .stream(),你可以實(shí)時(shí)接收部分結(jié)果,實(shí)現(xiàn)更流暢的交互體驗(yàn)。
- .batch() 支持一次性處理一批輸入,返回一批對(duì)應(yīng)輸出,提高整體吞吐量。
- .transform() 為輸出添加一個(gè)轉(zhuǎn)換器(函數(shù)),對(duì)結(jié)果進(jìn)行進(jìn)一步加工或格式化。
- .with_fallbacks() 設(shè)置備用執(zhí)行路徑,當(dāng)主任務(wù)失敗時(shí)自動(dòng)切換執(zhí)行備用任務(wù)。
- .with_retry() 為當(dāng)前任務(wù)添加自動(dòng)重試功能,遇到異常自動(dòng)按重試策略重試。
| 方法 | 作用說(shuō)明 | 典型應(yīng)用場(chǎng)景 |
|---|---|---|
.stream() |
實(shí)時(shí)流式輸出,邊生成邊返回 | 聊天機(jī)器人、文本生成 |
.batch() |
批量處理輸入,提高吞吐量 | 批量分類、批量生成 |
.transform() |
對(duì)輸出結(jié)果進(jìn)行后續(xù)轉(zhuǎn)換處理 | 格式化、過(guò)濾、標(biāo)注 |
.with_fallbacks() |
多路徑備選執(zhí)行,容錯(cuò)降級(jí) | 多模型容災(zāi)、工具降級(jí) |
.with_retry() |
失敗自動(dòng)重試,保證穩(wěn)定性 | 網(wǎng)絡(luò)異常、API 失敗重試 |
三、LCEL 組合式表達(dá)式:從幾個(gè) Runnable 到一條“表達(dá)式鏈”
LangChain Expression Language(LCEL)是 LangChain 提供的一種 聲明式編排語(yǔ)言,它讓你用鏈?zhǔn)讲僮鞣▅)將多個(gè) Runnable 組合成一個(gè)數(shù)據(jù)流管道,就像使用 Unix 管道 | 或 Pandas 的 .pipe() 一樣,把一個(gè)處理單元的輸出自動(dòng)傳給下一個(gè)。
LCEL 的目標(biāo)是:
“用最少的代碼,構(gòu)建最清晰、最靈活的 AI 執(zhí)行流程?!?/p>
示例代碼:
from langchain_core.runnables import RunnableMap, RunnableLambda
chain = RunnableMap({"name": lambda _: "LangChain"}) \
| RunnableLambda(lambda d: f"Hello, {d['name']}!")
print(chain.invoke({}))
# 輸出:Hello, LangChain!
執(zhí)行過(guò)程分解:
RunnableMap(...):將輸入 {} 映射為 {"name": "LangChain"}。
RunnableLambda(...):提取字典中的 "name" 字段并格式化字符串。
|:使用 LCEL 的組合語(yǔ)法把兩者連接成執(zhí)行鏈。
這就是 LCEL 最基礎(chǔ)的組合思想:每個(gè) Runnable 就像函數(shù),每個(gè) | 表示把上一個(gè)的輸出作為下一個(gè)的輸入。
那么,為什么 Runnable 可以用 | 運(yùn)算符組合?
這是因?yàn)?LangChain 的 Runnable 類型實(shí)現(xiàn)了 Python 的運(yùn)算符重載機(jī)制。
在 Python 中,a | b 實(shí)際調(diào)用的是 a.or(b)。LangChain 通過(guò)實(shí)現(xiàn) or() 方法,使得 Runnable 可以像表達(dá)式一樣組合使用:
class Runnable:
...
def __or__(
self,
other: Union[
Runnable[Any, Other],
Callable[[Any], Other],
Callable[[Iterator[Any]], Iterator[Other]],
Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
],
) -> RunnableSerializable[Input, Other]:
你可以把 RunnableSequence 理解為“執(zhí)行鏈容器”,它按順序執(zhí)行每個(gè)子模塊的 invoke(),自動(dòng)將數(shù)據(jù)流傳遞下去。
上面的執(zhí)行鏈后續(xù)可以這樣擴(kuò)展:
| some_chat_model \
| some_output_parser \
| final_formatter
四、實(shí)現(xiàn)解析:執(zhí)行鏈條與數(shù)據(jù)流通道
LangChain 的 Runnable 原理根基于一套“輸入 - 運(yùn)行 - 輸出”的基本通道模型:
通過(guò) invoke() 啟動(dòng)
def invoke(self, input: Input) -> Output:
return self._call_with_config(input, config)
配合 Configurable 和 RunnableBinding
它們會(huì)為 chain 中的每個(gè) Runnable 自動(dòng)分配唯一的 ID,并附加配置(config),以便于后續(xù)的日志記錄、異常追蹤以及 fallback 分支的排查與切換。
支持 RunnableSequence 系列化
用 | 連接
內(nèi)部編排 _invoke, _batch, _stream
總結(jié)核心意圖
LangChain 在執(zhí)行層的設(shè)計(jì)并不僅僅是為了“串起來(lái)能用”,而是希望通過(guò) Runnable + LCEL 表達(dá)式系統(tǒng),構(gòu)建一個(gè)可觀測(cè)、可控制、易擴(kuò)展、可組合、可替換的 AI 應(yīng)用執(zhí)行基座。
下圖是典型執(zhí)行鏈條的簡(jiǎn)化視圖,展示了 LangChain 中各個(gè)模塊的基本作用和周邊能力增強(qiáng)點(diǎn):
[Prompt] -> [LLM] -> [Parser] -> [PostProcess]
\ | | |
Config Retry OutputMap LangSmith Trace
每個(gè)模塊之間通過(guò) Runnable 接口無(wú)縫銜接
任意節(jié)點(diǎn)都可以掛接 Retry / Config / Debug / Tracing 能力
整條鏈條既是執(zhí)行流程,也是組合表達(dá)式(LCEL)
這種結(jié)構(gòu)讓 LangChain 能像搭積木一樣構(gòu)建復(fù)雜任務(wù),又能像監(jiān)控系統(tǒng)一樣追蹤每個(gè)模塊的執(zhí)行情況。
五、擴(kuò)展方案:如何實(shí)現(xiàn)自定義 Runnable
所有 Runnable 只需要繼承 Runnable,重寫 _invoke:
示例:實(shí)現(xiàn)一個(gè)簡(jiǎn)單 Tool
from langchain_core.runnables import Runnable
class AddOne(Runnable[int, int]):
def invoke(self, input: int, config=None) -> int:
return input + 1
- AddOne 繼承了 Runnable[int, int],表示這個(gè)類輸入是 int,輸出也是 int。
這個(gè)簡(jiǎn)單的 AddOne 類展示了 LangChain 架構(gòu)的核心理念:將所有可執(zhí)行邏輯抽象為統(tǒng)一接口 Runnable,使其天然具備組合性、控制性和可觀測(cè)性。
通過(guò)繼承 Runnable,即便是最簡(jiǎn)單的 Python 函數(shù),也能被納入 LCEL 表達(dá)式系統(tǒng),與 LLM、Prompt、Parser 等模塊無(wú)縫集成,構(gòu)建出結(jié)構(gòu)清晰、邏輯強(qiáng)大、易于維護(hù)的 AI 應(yīng)用流程。
與其他 Runnable 組合
from langchain_core.runnables import RunnableLambda
add = AddOne()
square = RunnableLambda(lambda x: x * x)
workflow = add | square
workflow.invoke(2) # 輸出:9
六、相關(guān)擴(kuò)展功能
功能說(shuō)明和代碼示例
- with_retry(RetryConfig):自動(dòng)重試機(jī)制,增強(qiáng)魯棒性
- 緩解 LLM 調(diào)用時(shí)的 timeout、RateLimitError 或網(wǎng)絡(luò)故障問(wèn)題。
- 就像給函數(shù)加上 try...except...retry,但更優(yōu)雅、自動(dòng)、聲明式。
from langchain_core.runnables.retry import RetryHandler
retryable = llm.with_retry(RetryHandler(max_attempts=3))
response = retryable.invoke("你好")
- with_fallbacks([B, C]):備用路徑,失敗時(shí)自動(dòng)切換
- 當(dāng)主要模塊(如 LLM A)不可用時(shí),自動(dòng)嘗試備用方案(如 B 和 C)。
- 像 try A except → try B → try C 的自動(dòng)化策略,非常適合生產(chǎn)場(chǎng)景。
robust_chain = llm.with_fallbacks([openai_llm, local_llm])
- with_config(tags=[...], run_name=...):執(zhí)行元信息,便于可觀測(cè)性
- 為每次執(zhí)行添加追蹤信息,方便在 LangSmith 等調(diào)試平臺(tái)中記錄來(lái)源與上下文。
- 相當(dāng)于“打標(biāo)簽 + 命名”,你在 LangSmith 中可以看到漂亮的執(zhí)行軌跡。
configured = chain.with_config(tags=["query", "qa"], run_name="qa_main_chain")
- RunnableParallel:并行執(zhí)行多個(gè)任務(wù),提高吞吐率
- 一次性運(yùn)行多個(gè) Runnable,并收集所有結(jié)果,適合處理多個(gè)輸入源或分任務(wù)處理。
- 輸出是一個(gè)字典:{"qa": ..., "summary": ...},并行執(zhí)行效率更高。
from langchain_core.runnables import RunnableParallel
parallel_chain = RunnableParallel({
"qa": qa_chain,
"summary": summary_chain,
})
result = parallel_chain.invoke({"input": "今天的新聞..."})
- RunnableBranch:條件分支邏輯,像 if-else 一樣靈活
- 根據(jù)輸入內(nèi)容的特征動(dòng)態(tài)選擇執(zhí)行路徑。
- 把控制流 if/elif/else 模塊化、聲明式地掛接到鏈路中。
from langchain_core.runnables import RunnableBranch
branch = RunnableBranch(
(lambda x: "help" in x.lower(), help_chain),
(lambda x: "buy" in x.lower(), shopping_chain),
default_chain
)
擴(kuò)展功能與 .stream() / .batch() 聯(lián)合使用
LangChain 中的 Runnable 擁有統(tǒng)一的三種執(zhí)行入口:
- .invoke():同步執(zhí)行單次輸入
- .batch():同步執(zhí)行一批輸入
- .stream():逐步返回輸出(生成式任務(wù)常用)
下面我們按擴(kuò)展能力逐一說(shuō)明如何結(jié)合使用:
1. with_retry() + .batch() / .stream()
- 作用:為一批請(qǐng)求或流式數(shù)據(jù)執(zhí)行增加自動(dòng)重試機(jī)制,增強(qiáng)穩(wěn)健性。
- 示例:
retry_llm = llm.with_retry()
results = retry_llm.batch(["你好", "請(qǐng)翻譯", "幫我寫一封郵件"])
for chunk in llm.with_retry().stream("請(qǐng)寫一段詩(shī)"):
print(chunk, end="")
提示:
- RetryHandler 會(huì)針對(duì)每個(gè)失敗樣本重試,不會(huì)重跑整個(gè) batch。
- 如果用于 .stream(),只會(huì)在流開(kāi)始失敗時(shí)重試,流中間錯(cuò)誤不會(huì)回滾。
2. with_fallbacks() + .batch() / .stream()
- 作用:當(dāng)主模塊失敗時(shí),自動(dòng)切換到備用模塊處理批量請(qǐng)求或流式任務(wù)。
- 示例:
robust = llm.with_fallbacks([backup_llm])
results = robust.batch(["你是誰(shuí)", "請(qǐng)總結(jié)這段文字"])
for chunk in robust.stream("用 fallback 模式生成一段回答"):
print(chunk, end="")
提示:
- fallback 是按整個(gè)輸入粒度判斷是否失敗,不是逐個(gè) token fallback
- 不適合頻繁局部失敗但整體還有效的情況(建議配合 RetryHandler)
3. with_config() + .batch() / .stream()
- 作用:批處理或流式調(diào)用時(shí),添加執(zhí)行標(biāo)識(shí),用于日志標(biāo)注、鏈路追蹤(LangSmith 等)。
- 示例:
configured = chain.with_config(tags=["批處理任務(wù)"], run_name="qa_batch")
results = configured.batch(list_of_questions)
for token in chain.with_config(run_name="stream_chat").stream("你好啊"):
print(token, end="")
提示:
- 在調(diào)試多鏈條并行執(zhí)行時(shí)非常有用
- tags 和 run_name 可配合 LangSmith 實(shí)現(xiàn)全鏈路調(diào)用樹分析
4. RunnableParallel + .batch() / .stream()
作用:并行執(zhí)行多個(gè)子鏈,支持同時(shí)流式或批量處理多個(gè)子任務(wù)。
示例:
parallel = RunnableParallel({
"translate": translator,
"summary": summarizer
})
results = parallel.batch([{"text": "A"}, {"text": "B"}])
輸出結(jié)果結(jié)構(gòu):
[
{"translate": "...", "summary": "..."},
{"translate": "...", "summary": "..."}
]
目前 stream + 并行使用受限,不建議用于多路 stream 合并,需自定義包裝
5. RunnableBranch + .batch() / .stream()
作用:批量輸入中可根據(jù)條件路由到不同模塊,或?qū)γ織l輸入動(dòng)態(tài)選擇處理邏輯。
示例:
branch = RunnableBranch(
(lambda x: "翻譯" in x, translator),
(lambda x: "摘要" in x, summarizer),
default_chain
)
results = branch.batch(["請(qǐng)翻譯", "請(qǐng)摘要", "默認(rèn)處理"])
七、總結(jié)思維
?Runnable 和 LCEL 不是一套 API,而是一套 執(zhí)行邏輯架構(gòu)模型。
它為 LLM 應(yīng)用提供了一套通用型執(zhí)行單元,并能夠被分層、并行、切換、監(jiān)控和應(yīng)急處理,是 LangChain 搭建模塊化調(diào)度器的基礎(chǔ)。
接下來(lái)我們將手工實(shí)現(xiàn)一個(gè)完整的 Runnable 模塊,并構(gòu)建屬于自己的 LangChain 流式鏈條。