cover_image

AI王炸:MCP服务端客户端的完整实现

空山雪林 Sumslack团队
2025年04月16日 11:46

概述

试想一下,如果要想在现有应用上构建,让AI读取引用我们功能和数据,该怎么办,比如询问某个城市的天气,我们希望AI能调用天气函数返回相应结果,这时MCP (Model Context Protocol)就可以派上用场了,它相当于我们电脑的USB-C接口,提供了一个标准方式让AI模型连接不同的应用和工具。 我们可以建一个MUP Server来处理这类业务,比如市面上已有各类MUP Server,比较典型的高德地图MCP,除此之外,还有旅行交通的 AirbnbMCPServer,提供房源问询,版本控制的 gitlab-mr-mcp,工具类mcp-openai,开发类 mcp-server-and-gw等,更多工具可查看:https://github.com/punkpeye/awesome-mcp-servers/blob/main/README-zh.md#%E6%9C%8D%E5%8A%A1%E5%99%A8%E5%AE%9E%E7%8E%B0

图片

MCP 服务器的职能变得非常容易理解:即遵循 MCP 协议来暴露其可提供的 Resources、Tools 或 Prompts:

  • Resources:结构化数据(如文件、API 响应)
  • Tools:可执行函数(如查询数据库、发送邮件)
  • Prompts:预定义的交互模板

MCP与Function Calling的区别:

图片

开发一个MCP Server

使用Python为例。

  • 安装uv
  1. windows
  2. powershell -ExecutionPolicy ByPass -"irm https://astral.sh/uv/install.ps1 | iex"
  3. linux/mac
  4. curl -LsSf https://astral.sh/uv/install.sh | sh
  • 安装依赖
  1. pip install mcp
  2. pip install mcp[cli]
  3. pip install httpx==0.27
  • 编写代码:非常简单,将普通的py函数添加一个注解即可。
  1. import os
  2. from mcp.server.fastmcp import FastMCP
  3. mcp = FastMCP("债券服务Demo")
  4. @mcp.tool()
  5. def filterByRate(a:str) -> list:
  6.     """根据债券评级的条件筛选债券"""
  7.     print('债券评级过滤条件:',a)
  8.     return ["债券A","债券B","债券C"]
  9. @mcp.tool()
  10. def filterByType(t:str) -> list:
  11.     """根据债券类型的条件筛选债券"""
  12.     print('债券类型过滤条件:',t)
  13.     return ["债券A","债券C","债券D"]
  14. @mcp.tool()
  15. def filterByBidRange(a1:float,a2:float) -> list:
  16.     """根据债券bid收益率区间筛选债券"""
  17.     print('债券类型过滤条件:',a1,a2)
  18.     return ["债券A2","债券C2","债券D2"]
  19. @mcp.tool()
  20. def filterResult(**kwargs) -> list:
  21.     """获取所有符合条件的债券结果"""
  22.     print('filterResult:',kwargs)
  23.     return ["债券A111","债券C222","债券D3333"]
  24. if __name__ == "__main__":
  25.     # mcp.run(transport='stdio')
  26.     mcp.run(transport='sse')
  • 启动服务: mcp dev.\mcp\hello.py

客户端集成

工具集成

以 CherryStudio为例,其他类型,添加一个: MCPServer,问答时选中MCP Server即可.

图片
图片

代码集成

程序集成:编写一个客户端程序即可,我们以本地ollama部署的qwen为例,代码如下:

  1. client_mcp.py
  2. import asyncio
  3. import json
  4. import sys
  5. import time
  6. from typing import Optional
  7. from contextlib import AsyncExitStack
  8. from mcp.client.sse import sse_client
  9. from mcp import ClientSession, StdioServerParameters
  10. from mcp.client.stdio import stdio_client
  11. from openai import AsyncOpenAI
  12. class MCPClient:
  13.     def __init__(self):
  14.         # Initialize session and client objects
  15.         self.session: Optional[ClientSession] = None
  16.         self.exit_stack = AsyncExitStack()
  17.         self.client = AsyncOpenAI(
  18.             api_key="test",
  19.             base_url="http://ollama地址:11434/v1"
  20.         )
  21.     async def connect_to_server(self, server_script_path: str):
  22.         """Connect to an MCP server
  23.         Args:
  24.             server_script_path: Path to the server script (.py or .js)
  25.         """
  26.         is_python = server_script_path.endswith(".py")
  27.         is_js = server_script_path.endswith(".js")
  28.         if not (is_python or is_js):
  29.             raise ValueError("Server script must be a .py or .js file")
  30.         command = "python" if is_python else "node"
  31.         server_params = StdioServerParameters(
  32.             command=command, args=[server_script_path], env=None
  33.         )
  34.         stdio_transport = await self.exit_stack.enter_async_context(
  35.             stdio_client(server_params)
  36.         )
  37.         self.stdio, self.write = stdio_transport
  38.         self.session = await self.exit_stack.enter_async_context(
  39.             ClientSession(self.stdio, self.write)
  40.         )
  41.         await self.session.initialize()
  42.         # List available tools
  43.         response = await self.session.list_tools()
  44.         tools = response.tools
  45.         print("\nConnected to server with tools:", [tool.name for tool in tools])
  46.     async def connect_to_sse_server(self, server_url: str):
  47.         """Connect to an MCP server
  48.         Args:
  49.             server_script_path: Path to the server script (.py or .js)
  50.         """
  51.         self._streams_context = sse_client(url=server_url)
  52.         streams = await self._streams_context.__aenter__()
  53.         self._session_context = ClientSession(*streams)
  54.         self.session = await self._session_context.__aenter__()
  55.         await self.session.initialize()
  56.         # List available tools
  57.         response = await self.session.list_tools()
  58.         tools = response.tools
  59.         print("\nConnected to server with tools:", [tool.name for tool in tools])
  60.     async def process_query(self, query: str) -> str:
  61.         """使用 LLM 和 MCP 服务器提供的工具处理查询"""
  62.         messages = [
  63.             {
  64.                 "role": "user",
  65.                 "content": query
  66.             }
  67.         ]
  68.         response = await self.session.list_tools()
  69.         available_tools = [{
  70.             "type": "function",
  71.             "function": {
  72.                 "name": tool.name,
  73.                 "description": tool.description,
  74.                 "parameters": tool.inputSchema
  75.             }
  76.         } for tool in response.tools]
  77.         # 初始化 LLM API 调用
  78.         response = await self.client.chat.completions.create(
  79.             model="qwen2.5:14b",
  80.             messages=messages,
  81.             tools=available_tools  # 将工具列表传递给 LLM
  82.         )
  83.         final_text = []
  84.         message = response.choices[0].message
  85.         print(response.choices[0])
  86.         final_text.append(message.content or "")
  87.         # 处理响应并处理工具调用
  88.         if message.tool_calls:
  89.             # 处理每个工具调用
  90.             for tool_call in message.tool_calls:
  91.                 tool_name = tool_call.function.name
  92.                 tool_args = json.loads(tool_call.function.arguments)
  93.                 # 执行工具调用
  94.                 start_time = time.time()
  95.                 result = await self.session.call_tool(tool_name, tool_args)
  96.                 end_time = time.time()
  97.                 print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
  98.                 final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
  99.                 # 将工具调用和结果添加到消息历史
  100.                 messages.append({
  101.                     "role": "assistant",
  102.                     "tool_calls": [
  103.                         {
  104.                             "id": tool_call.id,
  105.                             "type": "function",
  106.                             "function": {
  107.                                 "name": tool_name,
  108.                                 "arguments": json.dumps(tool_args)
  109.                             }
  110.                         }
  111.                     ]
  112.                 })
  113.                 messages.append({
  114.                     "role": "tool",
  115.                     "tool_call_id": tool_call.id,
  116.                     "content": str(result.content)
  117.                 })
  118.             # 将工具调用的结果交给 LLM
  119.             response = await self.client.chat.completions.create(
  120.                 model="qwen2.5:14b",
  121.                 messages=messages,
  122.                 tools=available_tools
  123.             )
  124.             message = response.choices[0].message
  125.             if message.content:
  126.                 final_text.append(message.content)
  127.         return "\n".join(final_text)
  128.     async def chat_loop(self):
  129.         """Run an interactive chat loop"""
  130.         print("\nMCP Client Started!")
  131.         print("Type your queries or 'quit' to exit.")
  132.         while True:
  133.             try:
  134.                 query = input("\nQuery: ").strip()
  135.                 if query.lower() == 'quit':
  136.                     break
  137.                 response = await self.process_query(query)
  138.                 print("\n" + response)
  139.             except Exception as e:
  140.                 print(f"\nError: {str(e)}")
  141.     async def cleanup(self):
  142.         """Clean up resources"""
  143.         await self.exit_stack.aclose()
  144. main.py
  145. import requests
  146. import json
  147. import httpx
  148. import asyncio
  149. from client_mcp import MCPClient
  150. import sys
  151. async def main():
  152.     url_server_mcp = 'http://localhost:8000/sse'
  153.     client = MCPClient()
  154.     try:
  155.         # 根据MCP Server传输协议进行选择
  156.         await client.connect_to_sse_server(url_server_mcp)
  157.         await client.chat_loop()
  158.     finally:
  159.         await client.cleanup()
  160. if __name__ == '__main__':
  161.     loop = asyncio.get_event_loop()
  162.     loop.run_until_complete(main())



欢迎关注我的公众号“Sumslack团队”,原创技术文章第一时间推送。

修改于2025年04月16日
继续滑动看下一个
Sumslack团队
向上滑动看下一个