【MCP-03】一次完整的MCP和LLM交互流程

前言

SpringAI MCP技術(shù)試用【01】JSON-RPC2.0協(xié)議,【02】SSE和StreamableHttp技術(shù)整理這幾篇總結(jié)大概說明了下MCP出現(xiàn)的原因,以及簡單說明了下技術(shù)細節(jié),但是MCP還是主要為LLM大模型服務(wù)的,那MCP和LLM大模型是怎么交互的呢?這里簡單總結(jié)下交互流程。

整體流程

image-2025-7-31_20-20-13.png

image-2025-7-31_20-8-3.png

一些說明:
1)用戶向AIAgent問:幫我計算3*111,此時AIAgent就是MCP Client,MCP Client會先拉取所有注冊到AIAgent的MCPServer元數(shù)據(jù)信息,然后把用戶Query和所有的MCP Server以及MCP Tool的信息一起發(fā)送給 LLM。
2)LLM拿到信息后開始推理,基于用戶的Query和MCP Server的信息,選出解決用戶問題最合適的那個MCP Server和MCP Tool,然后返回給AIAgent(MCP Client)。這里L(fēng)LM返回給AIAgent的信息是:“你用multiply這個 MCP Server里的Calculate_McpServer這個MCP Tool吧,它可以解決用戶的問題”
3)AIAgent(MCP Client)現(xiàn)在知道該使用哪個MCP Server里的哪個MCP Tool了,直接調(diào)用那個MCP Tool,獲取結(jié)果。調(diào)用Calculate_McpServer這個MCP Server里的multiply這個MCP Tool。
4)Calculate_McpServer 返回結(jié)果(計算乘法后的結(jié)果)給 AIAgent(MCP Client)。
5)AIAgent(MCP Client)把用戶的問題和從Calculate_McpServer處拿到的結(jié)果再一次給了LLM,目的是讓LLM結(jié)合問題和答案判斷下是否要做上述的循環(huán),如果沒有則規(guī)整一下內(nèi)容返回。
6)LLM把整理后的內(nèi)容返回給AIAgent(MCP Client),最后AIAgent(MCP Client)再返回整合后的內(nèi)容給用戶。
需要注意的是 不是所有的大模型都支持FunctionCall的方式,還有一種通用的方式是走系統(tǒng)提示詞(system prompt),大體思路和FunctionCall差不多,只是系統(tǒng)提示詞(system prompt)需要LLM打模型按指定的格式返回MCPServer的名稱Tool以及args參數(shù)。

Demo

下文用Python編寫上文時序圖中的Calculate_McpServer的Demo,嘗試LLM使用MCP Tool完成四則計算任務(wù),由于HTTP+SSE已經(jīng)被官方廢棄,下文只使用streamable_http傳輸協(xié)議編寫Demo。

Python版本

MCPServer.py

# MCPServer
from mcp.server.fastmcp import FastMCP
import logging

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

MCP_SERVER_NAME = "Calculate_McpServer"

logger = logging.getLogger(MCP_SERVER_NAME)
# 初始化FastMCP服務(wù)器
mcp = FastMCP(name=MCP_SERVER_NAME,instructions="數(shù)學(xué)四則計算")

@mcp.tool(name="add", description="對兩個數(shù)字進行加法")
def add(a: float, b: float) -> float:
    """
    Add two numbers.

    Parameters:
    - a (float): First number (required)
    - b (float): Second number (required)

    Returns:
    - float: The result of a + b
    """
    return a + b

@mcp.tool(name="subtract", description="對兩個數(shù)字進行減法")
def subtract(a: float, b: float) -> float:
    """
    Subtract two numbers.

    Parameters:
    - a (float): The number to subtract from (required)
    - b (float): The number to subtract (required)

    Returns:
    - float: The result of a - b
    """
    return a - b

@mcp.tool(name="multiply", description="對兩個數(shù)字進行乘法")
def multiply(a: float, b: float) -> float:
    """
    Multiply two numbers.

    Parameters:
    - a (float): First number (required)
    - b (float): Second number (required)

    Returns:
    - float: The result of a * b
    """
    return a * b

@mcp.tool(name="divide", description="對兩個數(shù)字進行除法")
def divide(a: float, b: float) -> float:
    """
    Divide two numbers.

    Parameters:
    - a (float): Numerator (required)
    - b (float): Denominator (required, must not be zero)

    Returns:
    - float: The result of a / b
    """
    if b == 0:
        raise ValueError("Division by zero is not allowed")
    return a / b

if __name__ == "__main__":
    mcp.settings.host = "0.0.0.0"
    mcp.settings.port = 8000
    mcp.settings.log_level = "INFO"

    # stateless_http和json_response,兩個參數(shù)默認(rèn)都為False
    # stateless_http
    # 控制是否開啟SSE通道和是否對對客戶端會話進行管理
    # json_response
    # 控制Post請求響應(yīng)結(jié)果數(shù)據(jù)結(jié)構(gòu)是否用JSON還是SSE事件數(shù)據(jù)流(不是走SSE通道,只是用SSE事件數(shù)據(jù)格式)
    # mcp.settings.json_response = True
    # mcp.settings.stateless_http = True
    
    # 初始化并運行服務(wù)器
    print("Starting MCPServer...")
    # mcp.run(transport='sse')
    mcp.run(transport="streamable-http")

MCPClient.py

# MCPClient.py
from typing import Optional
import asyncio
from contextlib import AsyncExitStack
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession, Tool
from typing import Optional, List
from utils import logger

""" 
官方MCP client demo:https://github.com/modelcontextprotocol/quickstart-resources/blob/main/mcp-client-python/client.py
 """

# 配置日志記錄
logger = logger.setup_logging()


def convert_tools(tools: List[Tool]):
    """
    將MCP Server的list tools獲取到的工具列表,轉(zhuǎn)換為OpenAI API的可用工具列表
    """
    ret = []
    for tool in tools:
        parameters = {
            "type": "object",
            "properties": {},
            "required": (
                tool.inputSchema["required"] if "required" in tool.inputSchema else []
            ),
        }
        properties = tool.inputSchema["properties"]
        for param_name in properties:
            if "type" in properties[param_name]:
                param_type = properties[param_name]["type"]
            elif (
                "anyOf" in properties[param_name]
                and len(properties[param_name]["anyOf"]) > 0
            ):
                param_type = properties[param_name]["anyOf"][0]["type"]
            else:
                param_type = "string"
            parameters["properties"][param_name] = {
                "type": param_type,
                "description": properties[param_name].get("description", ""),
            }

        ret.append(
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": parameters,
                },
            }
        )
    return ret

class MCPClient:
    def __init__(self):
        # 初始化會話和客戶端對象
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.stream_client = None  # 修改為更通用的名稱

    async def connect_to_server(self, url):
        """Connect to an MCP server using Streamable HTTP.
        Args:
            url: Streamable HTTP 地址
        """
        mcp_timeout = 300
        logger.info(f"mcp_timeout: {mcp_timeout}")

        # 使用 streamable_http_client 替代 sse_client
        http_transport = await self.exit_stack.enter_async_context(
            streamablehttp_client(url=url, timeout=mcp_timeout)  # 修改為 streamable_http_client
        )
        self.streamable_http, self.write, _ = http_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.streamable_http, self.write)
        )

        logger.info(f"connect_to_server successful, streamable http url: {url}")
        await self.session.initialize()

    async def cleanup(self):
        await self.exit_stack.aclose()

    @classmethod
    def list_tools(cls, mcp_server_url, reqid):
        """
        列出指定MCP Server上可用的工具
        Args:
            mcp_server_url: MCP Server的URL
        Returns:
            可用的工具列表
        """
        async def async_task():
            client = cls()
            await client.connect_to_server(mcp_server_url)
            response = await client.session.list_tools()
            mcp_tools = response.tools
            available_tools = convert_tools(mcp_tools)
            logger.info(f"reqid:{reqid},tool size:{len(available_tools)},tool name:{",".join([tool["function"]["name"] for tool in available_tools])}")
            # logger.info([tool["function"]["name"] for tool in available_tools])
            await client.cleanup()
            return available_tools

        available_tools = asyncio.run(async_task())
        return available_tools

    @classmethod
    def call_tool(cls, mcp_server_url: str, tool_name: str, tool_args: dict, reqid):
        """
        調(diào)用指定MCP Server上的工具
        Args:
            mcp_server_url: MCP Server的URL
            tool_name: 工具名稱
            tool_args: 工具參數(shù)
        Returns:
            工具調(diào)用結(jié)果
        """
        async def async_task():
            client = cls()
            await client.connect_to_server(mcp_server_url)
            logger.info(f"reqid;{reqid}, mcp client, tool_name: {tool_name}, tool_args: {tool_args}")
            result = await client.session.call_tool(tool_name, tool_args)
            logger.info(f"reqid;{reqid}, mcp client, tool_args: {tool_args}, result: {result}")
            await client.cleanup()
            return result

        result = asyncio.run(async_task())
        return result

if __name__ == "__main__":
    mcpserver_url = "http://127.0.0.1:8000/mcp"  # 修改為stream端點
    # MCPClient.list_tools(mcpserver_url, "mock reqid")
    MCPClient.call_tool(mcpserver_url,"multiply",{"a": 3, "b": 111},"mock reqid")

LLMChat2MCP_FunctionCall.py

from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback

logger = logger.setup_logging()

# load environment variables from .env
load_dotenv()

API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]


def chat_loop(mcp_server_url):
    """Run an interactive chat loop"""
    logger.info("\nMCP Client Started!")
    logger.info("Type your queries or 'quit' to exit.")

    while True:
        try:
            query = input("\nQuery: ").strip()
            if not query:  # 如果輸入為空,直接跳過
                continue
            if query.lower() == "quit":
                break
            random_uuid = uuid.uuid4()
            response = process_query(mcp_server_url, random_uuid, query)
            logger.info(response)
        except Exception as e:
            logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")


def process_query(mcp_server_url, reqid, query):
    """
    處理查詢,使用OpenAI API和MCP工具
    Args:
        mcp_server_url: MCP Server的URL
        reqid: 請求ID
        query: 用戶查詢
    Returns:
        處理結(jié)果
    """
    available_tools = MCPClient.list_tools(mcp_server_url, reqid)
    openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)

    messages = [{"role": "user", "content": query}]

    try:
        current_response = openai.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            tools=available_tools,
            stream=False,
        )

        final_text = []

        if current_response.choices[0].message.content:
            final_result = current_response.choices[0].message.content
            final_text.append(final_result)
            # logger.info("AI:" + final_result)

        # 處理返回的內(nèi)容
        content = current_response.choices[0]
        # logger.info(
        #     "OpenAI Response JSON:\n%s",
        #     json.dumps(current_response.model_dump(), indent=4),
        # )
        if content.finish_reason == "tool_calls":
            # 如果需要使用工具,解析工具調(diào)用
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            callInfoStr = f"[Calling tool {tool_name} with args {tool_args}]"
            logger.info(callInfoStr)
            # 執(zhí)行工具
            result = MCPClient.call_tool(mcp_server_url, tool_name, tool_args, "LLMreqId1")
            final_text.append(callInfoStr)

            # 將結(jié)果存入消息歷史
            # 檢查 result 和 result.content 是否存在
            tool_response = ""
            if result and hasattr(result, "content") and result.content:
                tool_response = result.content[0].text
            else:
                tool_response = "Tool returned empty or invalid response"

            messages.append(content.message.model_dump())
            messages.append(
                {
                    "role": "tool",
                    "content": tool_response,
                    "tool_call_id": tool_call.id,
                }
            )

            # 將結(jié)果返回給大模型生成最終響應(yīng)
            current_response = openai.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                tools=available_tools,
                stream=False,
            )
            final_result = current_response.choices[0].message.content
            final_text.append(final_result)
        return "\n".join(final_text)
    except Exception as e:
        logger.error(
            f"process_query Error processing query: {e}\n{traceback.format_exc()}"
        )
        return None
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="mcp client call tool")
    parser.add_argument(
        "--url", required=True, help="Full URL, e.g. http://localhost:8114/sse"
    )
    args = parser.parse_args()
    # query = "你是誰?"
    # result = process_query("http://localhost:8114/sse", "mock reqid", query)
    # logger.info(f"Query result: {result}")
    chat_loop(args.url)

LLMChat2MCP_SystemPrompt.py

# LLMChat2MCP_FunctionCall.py
from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback
import re

logger = logger.setup_logging()

# 加載環(huán)境變量
load_dotenv()

API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]
MAX_TOOL_CALLS = 3  # 最大工具調(diào)用次數(shù),防止無限循環(huán)


def build_system_prompt(available_tools):
    """構(gòu)建系統(tǒng)提示詞,描述所有可用的MCP工具"""
    tools_desc = []

    # 解析工具列表并生成描述
    for tool in available_tools:
        func = tool["function"]
        name = func["name"]
        desc = func["description"]
        params = func["parameters"]["properties"]

        # 生成參數(shù)描述
        param_desc = []
        for param_name, param_info in params.items():
            param_desc.append(
                f"{param_name} ({param_info.get('type', 'string')}): "
                f"{param_info.get('description', 'No description')}"
            )

        tools_desc.append(
            f"工具名稱: {name}\n"
            f"描述: {desc}\n"
            f"參數(shù): {', '.join(param_desc)}\n"
            "---"
        )

    # 構(gòu)建完整的系統(tǒng)提示詞
    return (
        "你是一個智能助手,可以使用以下工具解決問題。當(dāng)用戶請求需要工具時,"
        "請嚴(yán)格按以下格式響應(yīng):\n"
        'TOOL_CALL: {"tool": "工具名稱", "arguments": {"參數(shù)1": "值1", ...}}\n\n'
        "可用工具列表:\n" + "\n".join(tools_desc) + "\n\n" + "重要規(guī)則:\n"
        "1. 只有在需要時才調(diào)用工具\n"
        "2. 響應(yīng)必須包含TOOL_CALL: 前綴\n"
        "3. 不要解釋工具調(diào)用,只需輸出JSON\n"
        "4. 如果不需要工具,直接回答用戶問題"
    )


def extract_tool_call(response_content):
    """從LLM響應(yīng)中提取工具調(diào)用信息"""
    # 使用正則表達式匹配TOOL_CALL: {...} 格式
    pattern = r'TOOL_CALL:\s*(\{.*\})'
    match = re.search(pattern, response_content, re.DOTALL)
    if not match:
        return None
    jsonStr=match.group(1)
    try:
        tool_call = json.loads(jsonStr)
        return tool_call
    except json.JSONDecodeError:
        logger.error(f"JSON解析錯誤: {jsonStr}")
        return None


def process_query(mcp_server_url, reqid, query):
    """
    處理查詢,使用OpenAI API和MCP工具(系統(tǒng)提示詞方式)
    """
    # 獲取可用工具列表
    available_tools = MCPClient.list_tools(mcp_server_url, reqid)

    # 構(gòu)建系統(tǒng)提示詞
    system_prompt = build_system_prompt(available_tools)

    openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]

    tool_calls_count = 0
    final_response = None

    while tool_calls_count < MAX_TOOL_CALLS:
        try:
            # 調(diào)用OpenAI API
            response = openai.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                stream=False,
            )

            # 獲取響應(yīng)內(nèi)容
            response_content = response.choices[0].message.content
            # logger.info(f"LLM原始響應(yīng): {response_content}")

            # 檢查是否需要調(diào)用工具
            tool_call = extract_tool_call(response_content)

            if tool_call:
                # 處理工具調(diào)用
                tool_name = tool_call["tool"]
                tool_args = tool_call["arguments"]

                call_info = f"[調(diào)用工具 {tool_name} 參數(shù): {tool_args}]"
                logger.info(call_info)

                # 執(zhí)行工具調(diào)用
                result = MCPClient.call_tool(
                    mcp_server_url, tool_name, tool_args, reqid
                )

                # 處理工具響應(yīng)
                tool_response = ""
                if result and hasattr(result, "content") and result.content:
                    tool_response = result.content[0].text
                else:
                    tool_response = "工具返回空響應(yīng)或無效響應(yīng)"

                logger.info(f"工具響應(yīng): {tool_response}")

                # 添加到消息歷史
                messages.append({"role": "assistant", "content": response_content})
                messages.append(
                    {
                        "role": "user",
                        "content": f"工具調(diào)用結(jié)果: {tool_response}\n\n請根據(jù)此結(jié)果回答用戶問題",
                    }
                )

                tool_calls_count += 1
            else:
                # 沒有工具調(diào)用,直接返回結(jié)果
                final_response = response_content
                break

        except Exception as e:
            logger.error(f"處理錯誤: {e}\n{traceback.format_exc()}")
            final_response = "處理查詢時出錯"
            break

    # 如果達到最大調(diào)用次數(shù)仍未獲得最終響應(yīng)
    if not final_response:
        final_response = "已達到最大工具調(diào)用次數(shù)。" "最后響應(yīng): " + response_content

    return final_response


def chat_loop(mcp_server_url):
    """Run an interactive chat loop"""
    logger.info("\nMCP Client Started!")
    logger.info("Type your queries or 'quit' to exit.")

    while True:
        try:
            query = input("\nQuery: ").strip()
            if not query:  # 如果輸入為空,直接跳過
                continue
            if query.lower() == "quit":
                break
            random_uuid = uuid.uuid4()
            response = process_query(mcp_server_url, random_uuid, query)
            logger.info(response)
        except Exception as e:
            logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")


# chat_loop 和 __main__ 部分保持不變(與原始代碼相同)
if __name__ == "__main__":
#    tool_call= extract_tool_call('TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 77}}')
#    logger.info(tool_call)
    parser = argparse.ArgumentParser(description="mcp client call tool")
    parser.add_argument(
        "--url", required=True, help="Full URL, e.g. http://localhost:8114/sse"
    )
    args = parser.parse_args()
    # query = "你是誰?"
    # result = process_query("http://localhost:8114/sse", "mock reqid", query)
    # logger.info(f"Query result: {result}")
    chat_loop(args.url)
image-2025-8-1_14-10-50.png

image-2025-8-1_14-11-19.png

總結(jié)

1,MCP HTTP+SSE傳輸方式官方已經(jīng)廢棄,后續(xù)主要使用StreamableHTTP傳輸方式。
2,MCP StreamableHTTP提供了stateless_http和json_response兩個重要參數(shù)細化對的AI不同場景處理能力。
3,不是所有的大模型都支持FunctionCall的方式,還有一種通用的方式是走系統(tǒng)提示詞(system prompt),大體思路和FunctionCall差不多,只是系統(tǒng)提示詞(system prompt)需要LLM打模型按指定的格式返回MCPServer的名稱Tool以及args參數(shù)。
4,通過上述代碼可知,在在AIAgent的整個調(diào)用流程中,LLM大模型只做推理,真正的ToolCall還是AIAgent角色,LLM大模型會返回需要調(diào)用的MCPServer、調(diào)用的Tool、調(diào)用的參數(shù)給到AIAgent,AIAgent在使用MCPClient調(diào)用MCPServer,在將返回的結(jié)果給到LLM做下一步的推理,重復(fù)上述過程,直到LLM認(rèn)為任務(wù)結(jié)束。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容