Imagine we want to build on an existing application and let AI read and invoke our features and data. For example, when asked about the weather in a city, we want the AI to call a weather function and return the result. This is where MCP (Model Context Protocol) comes in: like the USB-C port on a computer, it provides a standard way for AI models to connect to different applications and tools. We can build an MCP Server to handle this kind of business. Many MCP Servers are already available, a typical one being the AMap (Gaode Maps) MCP; there are also AirbnbMCPServer for travel (listing inquiries), gitlab-mr-mcp for version control, the utility mcp-openai, the development-oriented mcp-server-and-gw, and more. For a fuller list, see: https://github.com/punkpeye/awesome-mcp-servers/blob/main/README-zh.md#%E6%9C%8D%E5%8A%A1%E5%99%A8%E5%AE%9E%E7%8E%B0
The job of an MCP server is then easy to understand: it follows the MCP protocol to expose the Resources, Tools, or Prompts it can provide:
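As a minimal sketch of the three primitives (assuming the Python mcp SDK's FastMCP API; the server name, URI, and example data below are made up):

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("primitives-demo")  # hypothetical server name

@mcp.resource("config://app")  # Resource: read-only data a client can load into context
def app_config() -> str:
    return "theme=dark"

@mcp.tool()  # Tool: a function the model can call
def add(a: int, b: int) -> int:
    return a + b

@mcp.prompt()  # Prompt: a reusable message template
def review(code: str) -> str:
    return f"Please review this code:\n{code}"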
The difference between MCP and Function Calling: Function Calling is a per-model API feature where the application hard-codes each tool's schema into every request and handles execution itself, while MCP is an open protocol through which a client discovers the tools, resources, and prompts of any conforming server at runtime, so the same server can be reused across different models and hosts.
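A rough sketch of that contrast (get_weather and discover_tools are illustrative names, not part of any SDK):

from mcp import ClientSession

# Function Calling: the tool schema is hand-written into the application.
weather_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}

# MCP: the same schema shape is built from whatever the connected server advertises.
async def discover_tools(session: ClientSession) -> list:
    response = await session.list_tools()
    return [{
        "type": "function",
        "function": {
            "name": t.name,
            "description": t.description,
            "parameters": t.inputSchema,
        },
    } for t in response.tools]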
We use Python as the example. First install uv, then the MCP SDK and its dependencies:
Windows:
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
Linux/macOS:
curl -LsSf https://astral.sh/uv/install.sh | sh
pip install mcp
pip install "mcp[cli]"
pip install httpx==0.27
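To confirm the CLI installed correctly (the mcp command comes from the [cli] extra; availability may vary by SDK version):

mcp version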
hello.py

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Bond Service Demo")

@mcp.tool()
def filterByRate(a: str) -> list:
    """Filter bonds by bond rating."""
    print('Bond rating filter condition:', a)
    return ["Bond A", "Bond B", "Bond C"]

@mcp.tool()
def filterByType(t: str) -> list:
    """Filter bonds by bond type."""
    print('Bond type filter condition:', t)
    return ["Bond A", "Bond C", "Bond D"]

@mcp.tool()
def filterByBidRange(a1: float, a2: float) -> list:
    """Filter bonds whose bid yield falls within the given range."""
    print('Bid yield range filter condition:', a1, a2)
    return ["Bond A2", "Bond C2", "Bond D2"]

@mcp.tool()
def filterResult(**kwargs) -> list:
    """Return all bonds matching the accumulated conditions."""
    print('filterResult:', kwargs)
    return ["Bond A111", "Bond C222", "Bond D3333"]

if __name__ == "__main__":
    # mcp.run(transport='stdio')  # stdio: the client launches this script as a subprocess
    mcp.run(transport='sse')      # SSE: serves the tools over HTTP
Debugging: launch the MCP Inspector against the script:

mcp dev .\mcp\hello.py
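To expose the SSE endpoint that the client below connects to, run the script directly instead; FastMCP's SSE transport listens on port 8000 with path /sse by default (check your SDK version if the defaults differ):

python .\mcp\hello.py
# now reachable at http://localhost:8000/sse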
Tool integration: take CherryStudio as an example (other MCP-capable clients are similar): add an MCP Server entry in its settings, then select that MCP Server when chatting.
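For reference, many clients describe servers with a JSON block in the common mcpServers format (the entry name bond-demo is made up; CherryStudio's UI fields map onto the same ideas, though its exact format may differ):

{
  "mcpServers": {
    "bond-demo": {
      "url": "http://localhost:8000/sse"
    }
  }
}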
Program integration: simply write a client program. We use qwen deployed locally via ollama as an example; the code is as follows:
client_mcp.py
import json
import time
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI


class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = AsyncOpenAI(
            api_key="test",  # ollama ignores the key, but the SDK requires one
            base_url="http://ollama-address:11434/v1"
        )

    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server over stdio.

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command, args=[server_script_path], env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server over SSE.

        Args:
            server_url: URL of the server's SSE endpoint, e.g. http://localhost:8000/sse
        """
        # Enter both contexts through the exit stack so cleanup() closes them too
        streams = await self.exit_stack.enter_async_context(sse_client(url=server_url))
        self.session = await self.exit_stack.enter_async_context(ClientSession(*streams))
        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using the LLM and the tools exposed by the MCP server."""
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]
        response = await self.session.list_tools()
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]
        # Initial LLM API call
        response = await self.client.chat.completions.create(
            model="qwen2.5:14b",
            messages=messages,
            tools=available_tools  # pass the tool list to the LLM
        )
        final_text = []
        message = response.choices[0].message
        print(response.choices[0])
        final_text.append(message.content or "")
        # Handle any tool calls in the response
        if message.tool_calls:
            # Execute each tool call through the MCP session
            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                start_time = time.time()
                result = await self.session.call_tool(tool_name, tool_args)
                end_time = time.time()
                print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
                # Append the tool call and its result to the message history
                messages.append({
                    "role": "assistant",
                    "tool_calls": [
                        {
                            "id": tool_call.id,
                            "type": "function",
                            "function": {
                                "name": tool_name,
                                "arguments": json.dumps(tool_args)
                            }
                        }
                    ]
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(result.content)
                })
            # Hand the tool results back to the LLM for a final answer
            response = await self.client.chat.completions.create(
                model="qwen2.5:14b",
                messages=messages,
                tools=available_tools
            )
            message = response.choices[0].message
            if message.content:
                final_text.append(message.content)
        return "\n".join(final_text)

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()
main.py
import asyncio

from client_mcp import MCPClient


async def main():
    url_server_mcp = 'http://localhost:8000/sse'
    client = MCPClient()
    try:
        # Choose the connect method matching the MCP Server's transport
        await client.connect_to_sse_server(url_server_mcp)
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == '__main__':
    asyncio.run(main())
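Assuming hello.py sits under .\mcp\ as in the mcp dev command above, start the server in one terminal and the client in another (the model name qwen2.5:14b and the ollama base_url must match your local setup):

python .\mcp\hello.py   # terminal 1: serves http://localhost:8000/sse
python main.py          # terminal 2: connects, lists the bond tools, and starts the chat loop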
Follow my WeChat official account "Sumslack团队" to get original technical articles as soon as they are published.