1. 核心通信機(jī)制
claude code采用了基于文件系統(tǒng)的郵箱系統(tǒng)作為Agent之間的主要通信機(jī)制,具有以下特點(diǎn):
1.1 郵箱系統(tǒng)架構(gòu)
-
存儲(chǔ)結(jié)構(gòu):每個(gè)Agent都有獨(dú)立的郵箱文件,位于
.claude/teams/{team_name}/inboxes/{agent_name}.json - 消息格式:使用JSON格式存儲(chǔ)消息,包含發(fā)送者、內(nèi)容、時(shí)間戳等信息
- 并發(fā)控制:通過文件鎖機(jī)制防止多Agent同時(shí)寫入造成的沖突
- 消息類型:支持多種結(jié)構(gòu)化消息類型,如權(quán)限請(qǐng)求、任務(wù)分配、關(guān)閉請(qǐng)求等
1.2 消息傳遞流程
-
發(fā)送消息:Agent通過
writeToMailbox()方法向目標(biāo)Agent的郵箱寫入消息 -
讀取消息:Agent通過
readMailbox()或readUnreadMessages()讀取自己郵箱中的消息 -
消息處理:Agent通過輪詢機(jī)制(如
waitForNextPromptOrShutdown())監(jiān)聽新消息 - 消息標(biāo)記:讀取后的消息會(huì)被標(biāo)記為已讀,避免重復(fù)處理
2. 設(shè)計(jì)模式應(yīng)用
本項(xiàng)目在Agent通信和管理中應(yīng)用了多種設(shè)計(jì)模式:
2.1 發(fā)布-訂閱模式 (Publish-Subscribe)
- 實(shí)現(xiàn)方式:通過郵箱系統(tǒng),Agent可以向特定Agent或團(tuán)隊(duì)發(fā)送消息
-
核心組件:
teammateMailbox.ts中的writeToMailbox()和readMailbox()方法 - 優(yōu)勢(shì):解耦發(fā)送者和接收者,支持異步通信,無需直接引用
2.2 代理模式 (Proxy)
-
實(shí)現(xiàn)方式:通過
runWithTeammateContext()和runWithAgentContext()為Agent提供上下文隔離 -
核心組件:
agentContext.ts和teammateContext.ts中的上下文管理 - 優(yōu)勢(shì):確保Agent之間的狀態(tài)隔離,同時(shí)共享必要的系統(tǒng)資源
2.3 觀察者模式 (Observer)
- 實(shí)現(xiàn)方式:Agent通過輪詢機(jī)制監(jiān)聽郵箱中的新消息
-
核心組件:
inProcessRunner.ts中的waitForNextPromptOrShutdown()方法 - 優(yōu)勢(shì):實(shí)時(shí)響應(yīng)新消息,支持Agent的持續(xù)運(yùn)行和交互
2.4 命令模式 (Command)
- 實(shí)現(xiàn)方式:通過結(jié)構(gòu)化消息(如關(guān)閉請(qǐng)求、權(quán)限請(qǐng)求等)執(zhí)行特定操作
-
核心組件:
teammateMailbox.ts中的各種消息類型定義和處理函數(shù) - 優(yōu)勢(shì):標(biāo)準(zhǔn)化消息格式,便于擴(kuò)展和維護(hù)
2.5 責(zé)任鏈模式 (Chain of Responsibility)
- 實(shí)現(xiàn)方式:消息處理通過不同的處理器進(jìn)行分發(fā)
-
核心組件:
inProcessRunner.ts中的消息處理邏輯 - 優(yōu)勢(shì):分離消息類型和處理邏輯,提高代碼可維護(hù)性
2.6 工廠模式 (Factory)
-
實(shí)現(xiàn)方式:通過
getAgentDefinitionsWithOverrides()加載和創(chuàng)建Agent實(shí)例 -
核心組件:
loadAgentsDir.ts中的Agent加載和解析邏輯 - 優(yōu)勢(shì):統(tǒng)一Agent的創(chuàng)建和配置,支持多種來源的Agent定義
3. 關(guān)鍵實(shí)現(xiàn)細(xì)節(jié)
3.1 郵箱系統(tǒng)實(shí)現(xiàn)
// 讀取郵箱消息
export async function readMailbox(
agentName: string,
teamName?: string,
): Promise<TeammateMessage[]> {
const inboxPath = getInboxPath(agentName, teamName);
try {
const content = await readFile(inboxPath, 'utf-8');
const messages = jsonParse(content) as TeammateMessage[];
return messages;
} catch (error) {
const code = getErrnoCode(error);
if (code === 'ENOENT') {
return [];
}
logError(error);
return [];
}
}
// 寫入郵箱消息
export async function writeToMailbox(
recipientName: string,
message: Omit<TeammateMessage, 'read'>,
teamName?: string,
): Promise<void> {
await ensureInboxDir(teamName);
const inboxPath = getInboxPath(recipientName, teamName);
const lockFilePath = `${inboxPath}.lock`;
// 確保郵箱文件存在
try {
await writeFile(inboxPath, '[]', { encoding: 'utf-8', flag: 'wx' });
} catch (error) {
const code = getErrnoCode(error);
if (code !== 'EEXIST') {
logError(error);
return;
}
}
// 使用文件鎖防止并發(fā)寫入
let release: (() => Promise<void>) | undefined;
try {
release = await lockfile.lock(inboxPath, {
lockfilePath: lockFilePath,
...LOCK_OPTIONS,
});
// 讀取現(xiàn)有消息
const messages = await readMailbox(recipientName, teamName);
// 添加新消息
const newMessage: TeammateMessage = {
...message,
read: false,
};
messages.push(newMessage);
// 寫回郵箱文件
await writeFile(inboxPath, jsonStringify(messages, null, 2), 'utf-8');
} catch (error) {
logError(error);
} finally {
if (release) {
await release();
}
}
}
3.2 Agent運(yùn)行和通信
// 運(yùn)行進(jìn)程內(nèi)Agent
export async function runInProcessTeammate(
config: InProcessRunnerConfig,
): Promise<InProcessRunnerResult> {
const {
identity,
taskId,
prompt,
agentDefinition,
teammateContext,
toolUseContext,
abortController,
} = config;
// 創(chuàng)建Agent上下文
const agentContext: AgentContext = {
agentId: identity.agentId,
parentSessionId: identity.parentSessionId,
agentName: identity.agentName,
teamName: identity.teamName,
agentColor: identity.color,
planModeRequired: identity.planModeRequired,
isTeamLead: false,
agentType: 'teammate',
invocationKind: 'spawn',
invocationEmitted: false,
};
// 構(gòu)建系統(tǒng)提示
let teammateSystemPrompt: string;
// ... 系統(tǒng)提示構(gòu)建邏輯 ...
// 解析Agent定義
const resolvedAgentDefinition: CustomAgentDefinition = {
agentType: identity.agentName,
whenToUse: `In-process teammate: ${identity.agentName}`,
getSystemPrompt: () => teammateSystemPrompt,
tools: agentDefinition?.tools
? [
...new Set([
...agentDefinition.tools,
SEND_MESSAGE_TOOL_NAME,
TEAM_CREATE_TOOL_NAME,
TEAM_DELETE_TOOL_NAME,
TASK_CREATE_TOOL_NAME,
TASK_GET_TOOL_NAME,
TASK_LIST_TOOL_NAME,
TASK_UPDATE_TOOL_NAME,
]),
]
: ['*'],
source: 'projectSettings',
permissionMode: 'default',
...(agentDefinition?.model ? { model: agentDefinition.model } : {}),
};
// 主Agent循環(huán)
while (!abortController.signal.aborted && !shouldExit) {
// 處理當(dāng)前提示
// ... 提示處理邏輯 ...
// 運(yùn)行Agent
await runWithTeammateContext(teammateContext, async () => {
return runWithAgentContext(agentContext, async () => {
// 標(biāo)記任務(wù)為運(yùn)行中
updateTaskState(
taskId,
task => ({ ...task, status: 'running', isIdle: false }),
setAppState,
);
// 運(yùn)行Agent循環(huán)
for await (const message of runAgent({
agentDefinition: iterationAgentDefinition,
promptMessages,
toolUseContext,
canUseTool: createInProcessCanUseTool(
identity,
currentWorkAbortController,
(waitMs: number) => {
updateTaskState(
taskId,
task => ({ ...task, permissionWaitMs: (task.permissionWaitMs || 0) + waitMs }),
setAppState,
);
},
),
abortController: currentWorkAbortController,
options: {
...toolUseContext.options,
model,
forkContextMessages,
contentReplacementState: teammateReplacementState,
isTeammate: true,
teammateIdentity: identity,
},
})) {
// 處理Agent消息
// ... 消息處理邏輯 ...
}
});
});
// 發(fā)送 idle 通知
await sendIdleNotification(
identity.agentName,
identity.color,
identity.teamName,
{
idleReason: workWasAborted ? 'interrupted' : 'available',
summary: getLastPeerDmSummary(allMessages),
},
);
// 等待下一個(gè)提示或關(guān)閉請(qǐng)求
const waitResult = await waitForNextPromptOrShutdown(
identity,
abortController,
taskId,
toolUseContext.getAppState,
setAppState,
identity.parentSessionId,
);
// 處理等待結(jié)果
// ... 等待結(jié)果處理邏輯 ...
}
// 返回結(jié)果
return {
success: true,
messages: allMessages,
};
}
4. 優(yōu)勢(shì)與特點(diǎn)
- 去中心化通信:基于文件系統(tǒng)的郵箱系統(tǒng)實(shí)現(xiàn)了去中心化的通信機(jī)制,Agent之間無需直接引用即可通信
- 可靠性:通過文件鎖機(jī)制確保消息傳遞的可靠性,避免并發(fā)寫入沖突
- 可擴(kuò)展性:支持多種消息類型和處理邏輯,易于擴(kuò)展新的通信功能
- 上下文隔離:通過AsyncLocalStorage為每個(gè)Agent提供獨(dú)立的上下文,確保狀態(tài)隔離
- 靈活性:支持進(jìn)程內(nèi)和進(jìn)程間的Agent通信,適應(yīng)不同的部署場(chǎng)景
5. 應(yīng)用場(chǎng)景
- 團(tuán)隊(duì)協(xié)作:多個(gè)Agent可以組成團(tuán)隊(duì),共同完成復(fù)雜任務(wù)
- 任務(wù)分配:通過任務(wù)分配消息實(shí)現(xiàn)Agent之間的工作分配
- 權(quán)限管理:通過權(quán)限請(qǐng)求/響應(yīng)機(jī)制實(shí)現(xiàn)Agent的權(quán)限控制
- 狀態(tài)同步:通過 idle 通知等機(jī)制同步Agent的狀態(tài)
- 資源共享:Agent之間可以共享工具、模型等資源
6. 總結(jié)
本項(xiàng)目采用了基于文件系統(tǒng)的郵箱系統(tǒng)作為Agent通信的核心機(jī)制,結(jié)合多種設(shè)計(jì)模式實(shí)現(xiàn)了靈活、可靠的Agent協(xié)作系統(tǒng)。這種設(shè)計(jì)不僅支持簡(jiǎn)單的消息傳遞,還能處理復(fù)雜的權(quán)限管理、任務(wù)分配等場(chǎng)景,為多Agent系統(tǒng)提供了強(qiáng)大的通信基礎(chǔ)設(shè)施。
通過發(fā)布-訂閱、代理、觀察者等設(shè)計(jì)模式的應(yīng)用,本項(xiàng)目實(shí)現(xiàn)了Agent之間的解耦和高效協(xié)作,同時(shí)保持了系統(tǒng)的可擴(kuò)展性和可維護(hù)性。這種設(shè)計(jì)思路對(duì)于構(gòu)建復(fù)雜的多Agent系統(tǒng)具有重要的參考價(jià)值。