MCP实现原理
MCP介绍
概述
MCP 遵循 客户端-服务器 架构,其中:
- 主机是LLM应用程序(如 Claude 桌面或 IDE)发起连接
- 客户端与服务器在主机应用程序内保持 1:1 连接
- 服务器为客户端提供上下文、工具和提示
核心组件
协议层
- 关键类包括
- Protocol
- Client
- Server
export declare class Protocol<SendRequestT extends Request, SendNotificationT extends Notification, SendResultT extends Result> {
private _transport?;
private _requestMessageId;
protected _requestHandlers: Map<string, (request: JSONRPCRequest) => Promise<SendResultT>>;
private _notificationHandlers;
private _responseHandlers;
private _progressHandlers;
/**
* Callback for when the connection is closed for any reason.
*
* This is invoked when close() is called as well.
*/
onclose?: () => void;
/**
* Callback for when an error occurs.
*
* Note that errors are not necessarily fatal; they are used for reporting any kind of exceptional condition out of band.
*/
onerror?: (error: Error) => void;
/**
* A handler to invoke for any request types that do not have their own handler installed.
*/
fallbackRequestHandler?: (request: Request) => Promise<SendResultT>;
/**
* A handler to invoke for any notification types that do not have their own handler installed.
*/
fallbackNotificationHandler?: (notification: Notification) => Promise<void>;
constructor();
/**
* Attaches to the given transport, starts it, and starts listening for messages.
*
* The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward.
*/
connect(transport: Transport): Promise<void>;
private _onclose;
private _onerror;
private _onnotification;
private _onrequest;
private _onprogress;
private _onresponse;
get transport(): Transport | undefined;
/**
* Closes the connection.
*/
close(): Promise<void>;
/**
* Sends a request and wait for a response, with optional progress notifications in the meantime (if supported by the server).
*
* Do not use this method to emit notifications! Use notification() instead.
*/
request<T extends ZodType<object>>(request: SendRequestT, resultSchema: T, onprogress?: ProgressCallback): Promise<z.infer<T>>;
/**
* Emits a notification, which is a one-way message that does not expect a response.
*/
notification(notification: SendNotificationT): Promise<void>;
/**
* Registers a handler to invoke when this protocol object receives a request with the given method.
*
* Note that this will replace any previous request handler for the same method.
*/
setRequestHandler<T extends ZodObject<{
method: ZodLiteral<string>;
}>>(requestSchema: T, handler: (request: z.infer<T>) => SendResultT | Promise<SendResultT>): void;
/**
* Removes the request handler for the given method.
*/
removeRequestHandler(method: string): void;
/**
* Registers a handler to invoke when this protocol object receives a notification with the given method.
*
* Note that this will replace any previous notification handler for the same method.
*/
setNotificationHandler<T extends ZodObject<{
method: ZodLiteral<string>;
}>>(notificationSchema: T, handler: (notification: z.infer<T>) => void | Promise<void>): void;
/**
* Removes the notification handler for the given method.
*/
removeNotificationHandler(method: string): void;
}
export declare class Server<RequestT extends Request = Request, NotificationT extends Notification = Notification, ResultT extends Result = Result> extends Protocol<ServerRequest | RequestT, ServerNotification | NotificationT, ServerResult | ResultT> {
private _serverInfo;
private _clientCapabilities?;
private _clientVersion?;
/**
* Callback for when initialization has fully completed (i.e., the client has sent an `initialized` notification).
*/
oninitialized?: () => void;
/**
* Initializes this server with the given name and version information.
*/
constructor(_serverInfo: Implementation);
private _oninitialize;
/**
* After initialization has completed, this will be populated with the client's reported capabilities.
*/
getClientCapabilities(): ClientCapabilities | undefined;
/**
* After initialization has completed, this will be populated with information about the client's name and version.
*/
getClientVersion(): Implementation | undefined;
private getCapabilities;
ping(): Promise<{
_meta?: import("zod").objectOutputType<{}, import("zod").ZodTypeAny, "passthrough"> | undefined;
}>;
createMessage(params: CreateMessageRequest["params"], onprogress?: ProgressCallback): Promise<import("zod").objectOutputType<import("zod").objectUtil.extendShape<{
_meta: import("zod").ZodOptional<import("zod").ZodObject<{}, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{}, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{}, import("zod").ZodTypeAny, "passthrough">>>;
}, {
model: import("zod").ZodString;
stopReason: import("zod").ZodOptional<import("zod").ZodUnion<[import("zod").ZodEnum<["endTurn", "stopSequence", "maxTokens"]>, import("zod").ZodString]>>;
role: import("zod").ZodEnum<["user", "assistant"]>;
content: import("zod").ZodDiscriminatedUnion<"type", [import("zod").ZodObject<{
type: import("zod").ZodLiteral<"text">;
text: import("zod").ZodString;
}, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{
type: import("zod").ZodLiteral<"text">;
text: import("zod").ZodString;
}, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{
type: import("zod").ZodLiteral<"text">;
text: import("zod").ZodString;
}, import("zod").ZodTypeAny, "passthrough">>, import("zod").ZodObject<{
type: import("zod").ZodLiteral<"image">;
data: import("zod").ZodString;
mimeType: import("zod").ZodString;
}, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{
type: import("zod").ZodLiteral<"image">;
data: import("zod").ZodString;
mimeType: import("zod").ZodString;
}, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{
type: import("zod").ZodLiteral<"image">;
data: import("zod").ZodString;
mimeType: import("zod").ZodString;
}, import("zod").ZodTypeAny, "passthrough">>]>;
}>, import("zod").ZodTypeAny, "passthrough">>;
listRoots(params?: ListRootsRequest["params"], onprogress?: ProgressCallback): Promise<import("zod").objectOutputType<import("zod").objectUtil.extendShape<{
_meta: import("zod").ZodOptional<import("zod").ZodObject<{}, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{}, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{}, import("zod").ZodTypeAny, "passthrough">>>;
}, {
roots: import("zod").ZodArray<import("zod").ZodObject<{
uri: import("zod").ZodString;
name: import("zod").ZodOptional<import("zod").ZodString>;
}, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{
uri: import("zod").ZodString;
name: import("zod").ZodOptional<import("zod").ZodString>;
}, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{
uri: import("zod").ZodString;
name: import("zod").ZodOptional<import("zod").ZodString>;
}, import("zod").ZodTypeAny, "passthrough">>, "many">;
}>, import("zod").ZodTypeAny, "passthrough">>;
sendLoggingMessage(params: LoggingMessageNotification["params"]): Promise<void>;
sendResourceUpdated(params: ResourceUpdatedNotification["params"]): Promise<void>;
sendResourceListChanged(): Promise<void>;
sendToolListChanged(): Promise<void>;
sendPromptListChanged(): Promise<void>;
}
export class Client extends Protocol {
/**
* Initializes this client with the given name and version information.
*/
constructor(_clientInfo) {
super();
this._clientInfo = _clientInfo;
}
async connect(transport) {
await super.connect(transport);
const result = await this.request({
method: "initialize",
params: {
protocolVersion: LATEST_PROTOCOL_VERSION,
capabilities: {},
clientInfo: this._clientInfo,
},
}, InitializeResultSchema);
if (result === undefined) {
throw new Error(`Server sent invalid initialize result: ${result}`);
}
if (!SUPPORTED_PROTOCOL_VERSIONS.includes(result.protocolVersion)) {
throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`);
}
this._serverCapabilities = result.capabilities;
this._serverVersion = result.serverInfo;
await this.notification({
method: "notifications/initialized",
});
}
/**
* After initialization has completed, this will be populated with the server's reported capabilities.
*/
getServerCapabilities() {
return this._serverCapabilities;
}
/**
* After initialization has completed, this will be populated with information about the server's name and version.
*/
getServerVersion() {
return this._serverVersion;
}
async ping() {
return this.request({ method: "ping" }, EmptyResultSchema);
}
async complete(params, onprogress) {
return this.request({ method: "completion/complete", params }, CompleteResultSchema, onprogress);
}
async setLoggingLevel(level) {
return this.request({ method: "logging/setLevel", params: { level } }, EmptyResultSchema);
}
async getPrompt(params, onprogress) {
return this.request({ method: "prompts/get", params }, GetPromptResultSchema, onprogress);
}
async listPrompts(params, onprogress) {
return this.request({ method: "prompts/list", params }, ListPromptsResultSchema, onprogress);
}
async listResources(params, onprogress) {
return this.request({ method: "resources/list", params }, ListResourcesResultSchema, onprogress);
}
async listResourceTemplates(params, onprogress) {
return this.request({ method: "resources/templates/list", params }, ListResourceTemplatesResultSchema, onprogress);
}
async readResource(params, onprogress) {
return this.request({ method: "resources/read", params }, ReadResourceResultSchema, onprogress);
}
async subscribeResource(params) {
return this.request({ method: "resources/subscribe", params }, EmptyResultSchema);
}
async unsubscribeResource(params) {
return this.request({ method: "resources/unsubscribe", params }, EmptyResultSchema);
}
async callTool(params, resultSchema = CallToolResultSchema, onprogress) {
return this.request({ method: "tools/call", params }, resultSchema, onprogress);
}
async listTools(params, onprogress) {
return this.request({ method: "tools/list", params }, ListToolsResultSchema, onprogress);
}
async sendRootsListChanged() {
return this.notification({ method: "notifications/roots/list_changed" });
}
}
传输层
- Stdio transport 标准输入输出传输
- 使用标准输入/输出进行通信
- 非常适合本地流程
- SSE transport 使用 SSE 传输
- 使用服务器发送事件执行服务器到客户端的消息
- HTTP POST 用于客户端到服务器的消息
- 所有传输都使用 JSON-RPC 2.0 来交换消息。请参阅规范以获取有关模型上下文协议消息格式的详细信息。
import { JSONRPCMessage } from "../types.js";
import { Transport } from "../shared/transport.js";
export type StdioServerParameters = {
/**
* The executable to run to start the server.
*/
command: string;
/**
* Command line arguments to pass to the executable.
*/
args?: string[];
/**
* The environment to use when spawning the process.
*
* If not specified, the result of getDefaultEnvironment() will be used.
*/
env?: Record<string, string>;
};
/**
* Environment variables to inherit by default, if an environment is not explicitly given.
*/
export declare const DEFAULT_INHERITED_ENV_VARS: string[];
/**
* Returns a default environment object including only environment variables deemed safe to inherit.
*/
export declare function getDefaultEnvironment(): Record<string, string>;
/**
* Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout.
*
* This transport is only available in Node.js environments.
*/
export declare class StdioClientTransport implements Transport {
private _process?;
private _abortController;
private _readBuffer;
private _serverParams;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
constructor(server: StdioServerParameters);
/**
* Starts the server process and prepares to communicate with it.
*/
start(): Promise<void>;
private processReadBuffer;
close(): Promise<void>;
send(message: JSONRPCMessage): Promise<void>;
}
//# sourceMappingURL=stdio.d.ts.map
import { Transport } from "../shared/transport.js";
import { JSONRPCMessage } from "../types.js";
/**
* Client transport for WebSocket: this will connect to a server over the WebSocket protocol.
*/
export declare class WebSocketClientTransport implements Transport {
private _socket?;
private _url;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
constructor(url: URL);
start(): Promise<void>;
close(): Promise<void>;
send(message: JSONRPCMessage): Promise<void>;
}
//# sourceMappingURL=websocket.d.ts.map
import { Transport } from "../shared/transport.js";
import { JSONRPCMessage } from "../types.js";
/**
* Client transport for SSE: this will connect to a server using Server-Sent Events for receiving
* messages and make separate POST requests for sending messages.
*
* This uses the EventSource API in browsers. You can install the `eventsource` package for Node.js.
*/
export declare class SSEClientTransport implements Transport {
private _eventSource?;
private _endpoint?;
private _abortController?;
private _url;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
constructor(url: URL);
start(): Promise<void>;
close(): Promise<void>;
send(message: JSONRPCMessage): Promise<void>;
}
//# sourceMappingURL=sse.d.ts.map
消息类型
- Requests 请求期望从另一端得到响应
interface Request { method: string; params?: { }; }
- Notifications 通知是一方向的消息,不需要回复 ```ts
interface Notification { method: string; params?: { }; }
- Results 结果是请求的成功响应:
```ts
interface Result {
[key: string]: unknown;
}
- Errors 错误表明请求失败: ```ts interface Error { code: number; message: string; data?: unknown; } enum ErrorCode { // Standard JSON-RPC error codes ParseError = -32700, InvalidRequest = -32600, MethodNotFound = -32601, InvalidParams = -32602, InternalError = -32603 }
SDKs 和应用程序可以定义自己的错误代码,大于-32000。 错误通过以下方式传播:
- 错误响应请求
- 传输错误事件
- 协议级错误处理器
### 连接生命周期
1. 初始化
- 客户端发送带有协议版本和功能的 initialize 请求
- 服务器响应其协议版本和功能
- 客户端发送 initialized 通知作为确认
- 普通消息交换开始
2. 消息交换
- 请求-响应:客户端或服务器发送请求,另一方响应
- 通知:任何一方发送单向消息
3. 终止
- 任何一方都可以终止连接:
- 通过 close() 进行干净关闭
- 网络端口
- 出现错误
## Inspector 检查员
MCP 检查员是一个用于测试和调试 MCP 服务器的交互式开发工具。
检查员直接运行 npx ,无需安装:
```shell
npx -y @modelcontextprotocol/inspector npx server-postgres postgresql://postgres:123456@localhost:5432/postgres
服务器连接面板
- 允许选择连接到服务器的传输方式
- 对于本地服务器,支持自定义命令行参数和环境变量
资源标签
- 列出所有可用资源
- 显示资源元数据(MIME 类型、描述)
- 允许资源内容检查
- 支持订阅测试
提示标签
- 显示可用的提示模板
- 显示提示参数和描述
- 启用带自定义参数的快速测试
- 预览生成的消息
工具标签
- 工具列表
- 显示工具架构和描述
- 启用具有自定义输入的工具测试
- 显示工具执行结果
通知面板
- 展示服务器记录的所有日志
- 显示来自服务器的通知
MCP工程实践
zed插件实现
use serde::Deserialize;
use std::env;
use zed::settings::ContextServerSettings;
use zed_extension_api::{self as zed, serde_json, Command, ContextServerId, Project, Result};
const PACKAGE_NAME: &str = "@zeddotdev/postgres-context-server";
const PACKAGE_VERSION: &str = "0.1.2";
const SERVER_PATH: &str = "node_modules/@zeddotdev/postgres-context-server/index.mjs";
//定义了主扩展结构
struct PostgresModelContextExtension;
//定义了配置结构
#[derive(Debug, Deserialize)]
struct PostgresContextServerSettings {
database_url: String,
}
//扩展实现
impl zed::Extension for PostgresModelContextExtension {
fn new() -> Self {
Self
}
//主要逻辑
fn context_server_command(
&mut self,
_context_server_id: &ContextServerId,
project: &Project,
) -> Result<Command> {
//包管理
let version = zed::npm_package_installed_version(PACKAGE_NAME)?;
if version.as_deref() != Some(PACKAGE_VERSION) {
//检查并确保正确版本的 Node.js 包已安装
zed::npm_install_package(PACKAGE_NAME, PACKAGE_VERSION)?;
}
//配置管理
let settings = ContextServerSettings::for_project("postgres-context-server", project)?;
let Some(settings) = settings.settings else {
//验证必要的 `database_url` 设置
return Err("missing `database_url` setting".into());
};
let settings: PostgresContextServerSettings =
serde_json::from_value(settings).map_err(|e| e.to_string())?;
//启动上下文服务器
Ok(Command {
//创建命令来启动 Node.js 服务器
command: "node".to_string(),
args: vec![
env::current_dir()
.unwrap()
.join(SERVER_PATH)
.to_string_lossy()
.to_string(),
],
env: vec![
//传递数据库 URL 作为环境变量
("DATABASE_URL".into(), settings.database_url)
],
})
}
}
zed::register_extension!(PostgresModelContextExtension);
node.js 服务器
#!/usr/bin/env node
// 指定使用 Node.js 运行此脚本
// 导入必要的依赖包
import pg from "pg";// 导入 PostgreSQL 客户端
//https://github.com/modelcontextprotocol/typescript-sdk MCP SDK
import { Server } from "@modelcontextprotocol/sdk/server/index.js";// 导入 MCP 服务器
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";// 导入标准输入输出传输
// 导入各种请求模式定义
import {
CallToolRequestSchema, // 调用工具请求模式
ListResourcesRequestSchema, // 列出资源请求模式
ListPromptsRequestSchema, // 列出提示请求模式
ListToolsRequestSchema, // 列出工具请求模式
ReadResourceRequestSchema, // 读取资源请求模式
GetPromptRequestSchema, // 获取提示请求模式
CompleteRequestSchema, // 完成请求模式
} from "@modelcontextprotocol/sdk/types.js";
// 创建新的 MCP 服务器实例
const server = new Server({
name: "postgres-context-server", // 服务器名称
version: "0.1.0", // 版本号
});
// 获取数据库连接 URL
const databaseUrl = process.env.DATABASE_URL;
// 验证数据库 URL 是否有效
if (typeof databaseUrl == null || databaseUrl.trim().length === 0) {
console.error("Please provide a DATABASE_URL environment variable"); // 输出错误信息
process.exit(1); // 退出程序
}
// 创建资源基础 URL
const resourceBaseUrl = new URL(databaseUrl);
resourceBaseUrl.protocol = "postgres:"; // 设置协议
resourceBaseUrl.password = ""; // 清空密码
// 输出服务器启动信息
process.stderr.write("starting server. url: " + databaseUrl + "\n");
// 创建数据库连接池
const pool = new pg.Pool({
connectionString: databaseUrl,
});
// 定义常量
const SCHEMA_PATH = "schema"; // schema 路径
const SCHEMA_PROMPT_NAME = "pg-schema"; // schema 提示名称
const ALL_TABLES = "all-tables"; // 所有表标识符
// 处理列出资源请求
// 工作流程:
// - 连接数据库
// - 查询所有公共 schema 的表
// - 为每个表构建资源 URI
// - 返回资源列表
// - 释放连接
server.setRequestHandler(ListResourcesRequestSchema, async () => {
const client = await pool.connect(); // 获取数据库连接
try {
// 查询所有公共schema的表
const result = await client.query(
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'",
);
return {
resources: result.rows.map((row) => ({
uri: new URL(`${row.table_name}/${SCHEMA_PATH}`, resourceBaseUrl).href, // 构建资源URI
mimeType: "application/json", // 设置MIME类型
name: `"${row.table_name}" database schema`, // 设置资源名称
})),
};
} finally {
client.release(); // 释放数据库连接
}
});
// 处理读取资源请求
// 工作流程:
// - 解析请求的资源 URI
// - 提取表名和 schema 信息
// - 查询指定表的列信息
// - 返回 JSON 格式的列信息
// - 释放数据库连接
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const resourceUrl = new URL(request.params.uri); // 解析资源URL
// 解析路径组件
const pathComponents = resourceUrl.pathname.split("/");
const schema = pathComponents.pop();
const tableName = pathComponents.pop();
// 验证schema路径
if (schema !== SCHEMA_PATH) {
throw new Error("Invalid resource URI");
}
const client = await pool.connect(); // 获取数据库连接
try {
// 查询表的列信息
const result = await client.query(
"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1",
[tableName],
);
return {
contents: [
{
uri: request.params.uri, // 资源URI
mimeType: "application/json", // MIME类型
text: JSON.stringify(result.rows, null, 2), // 格式化JSON输出
},
],
};
} finally {
client.release(); // 释放数据库连接
}
});
// 处理列出工具请求
// 工作流程:
// - 返回可用工具列表
// - 当前只提供一个 "query" 工具,用于执行只读 SQL 查询
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "query", // 工具名称
description: "Run a read-only SQL query", // 工具描述
inputSchema: {
type: "object", // 输入模式类型
properties: {
sql: { type: "string" }, // SQL查询字符串
},
},
},
],
};
});
// 处理调用工具请求
// 工作流程:
// - 验证工具名称
// - 开始只读事务
// - 执行 SQL 查询
// - 返回查询结果
// - 回滚事务并释放连接
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "query") {
const sql = request.params.arguments?.sql; // 获取SQL查询
const client = await pool.connect(); // 获取数据库连接
try {
await client.query("BEGIN TRANSACTION READ ONLY"); // 开始只读事务
const result = await client.query(sql); // 执行查询
return { toolResult: result.rows }; // 返回查询结果
} catch (error) {
throw error; // 抛出错误
} finally {
// 回滚事务
client
.query("ROLLBACK")
.catch((error) =>
console.warn("Could not roll back transaction:", error),
);
client.release(); // 释放数据库连接
}
}
throw new Error("Tool not found"); // 工具未找到错误
});
// 处理自动完成请求
// 工作流程:
// - 验证提示名称
// - 检查是否已有参数
// - 查询所有表名
// - 返回可用的表名列表作为自动完成选项
server.setRequestHandler(CompleteRequestSchema, async (request) => {
process.stderr.write("Handling completions/complete request\n"); // 输出调试信息
if (request.params.ref.name === SCHEMA_PROMPT_NAME) {
const tableNameQuery = request.params.argument.value; // 获取表名查询
const alreadyHasArg = /\S*\s/.test(tableNameQuery); // 检查是否已有参数
// 如果已有参数,返回空结果
if (alreadyHasArg) {
return {
completion: {
values: [],
},
};
}
const client = await pool.connect(); // 获取数据库连接
try {
// 查询所有表名
const result = await client.query(
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'",
);
const tables = result.rows.map((row) => row.table_name); // 提取表名
return {
completion: {
values: [ALL_TABLES, ...tables], // 返回所有表名和特殊标识符
},
};
} finally {
client.release(); // 释放数据库连接
}
}
throw new Error("unknown prompt"); // 未知提示错误
});
// 处理列出提示请求
// 工作流程:
// - 返回可用提示列表
// - 包含 schema 提示的描述和参数信息
server.setRequestHandler(ListPromptsRequestSchema, async () => {
process.stderr.write("Handling prompts/list request\n"); // 输出调试信息
return {
prompts: [
{
name: SCHEMA_PROMPT_NAME, // 提示名称
description: "Retrieve the schema for a given table in the postgres database", // 提示描述
arguments: [
{
name: "tableName", // 参数名称
description: "the table to describe", // 参数描述
required: true, // 必填参数
},
],
},
],
};
});
// 处理获取提示请求
// 工作流程:
// - 验证表名参数
// - 查询表的详细信息
// - 生成 CREATE TABLE SQL 语句
// - 返回格式化的 SQL 代码
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
process.stderr.write("Handling prompts/get request\n"); // 输出调试信息
if (request.params.name === SCHEMA_PROMPT_NAME) {
const tableName = request.params.arguments?.tableName; // 获取表名
// 验证表名
if (typeof tableName !== "string" || tableName.length === 0) {
throw new Error(`Invalid tableName: ${tableName}`);
}
const client = await pool.connect(); // 获取数据库连接
try {
// 构建基础查询
const select =
"SELECT column_name, data_type, is_nullable, column_default, table_name FROM information_schema.columns";
let result;
// 根据是否查询所有表执行不同的查询
if (tableName === ALL_TABLES) {
result = await client.query(
`${select} WHERE table_schema NOT IN ('pg_catalog', 'information_schema')`,
);
} else {
result = await client.query(
`${select} WHERE table_name = $1`, [tableName],
);
}
// 获取所有唯一表名并排序
const allTableNames = Array.from(
new Set(result.rows.map((row) => row.table_name).sort()),
);
// 构建SQL创建表语句
let sql = "```sql\n";
for (let i = 0, len = allTableNames.length; i < len; i++) {
const tableName = allTableNames[i];
if (i > 0) {
sql += "\n";
}
// 为每个表构建创建表语句
sql += [
`create table "${tableName}" (`,
result.rows
.filter((row) => row.table_name === tableName)
.map((row) => {
const notNull = row.is_nullable === "NO" ? "" : " not null"; // 处理非空约束
const defaultValue =
row.column_default != null
? ` default ${row.column_default}` // 处理默认值
: "";
return ` "${row.column_name}" ${row.data_type}${notNull}${defaultValue}`; // 构建列定义
})
.join(",\n"),
");",
].join("\n");
sql += "\n";
}
sql += "```";
// 返回提示响应
return {
description:
tableName === ALL_TABLES
? "all table schemas" // 所有表的描述
: `${tableName} schema`, // 单个表的描述
messages: [
{
role: "user",
content: {
type: "text",
text: sql, // SQL创建表语句
},
},
],
};
} finally {
client.release(); // 释放数据库连接
}
}
throw new Error(`Prompt '${request.params.name}' not implemented`); // 提示未实现错误
});
// 运行服务器的主函数
// 工作流程:
// - 创建标准输入输出传输
// - 连接服务器
// - 开始处理请求
async function runServer() {
const transport = new StdioServerTransport(); // 创建标准输入输出传输
await server.connect(transport); // 连接服务器
}
// 启动服务器并处理错误
runServer().catch((error) => {
console.error(error); // 输出错误信息
process.exit(1); // 退出程序
});
完整的请求处理流程图:
graph TD
A[客户端请求] --> B{请求类型判断}
B -->|ListResources| C[查询所有表]
B -->|ReadResource| D[查询表结构]
B -->|ListTools| E[返回工具列表]
B -->|CallTool| F[执行SQL查询]
B -->|Complete| G[提供自动完成]
B -->|ListPrompts| H[返回提示列表]
B -->|GetPrompt| I[生成Schema SQL]
C --> J[返回响应]
D --> J
E --> J
F --> J
G --> J
H --> J
I --> J
J --> K[释放资源]
K --> L[结束请求]
每个处理函数都遵循以下通用模式:
- 验证输入参数
- 获取数据库连接
- 执行必要的查询
- 处理结果
- 释放资源
- 返回响应
这种模式确保了:
- 资源的正确管理
- 错误的适当处理
- 连接的及时释放
- 响应的一致性
Zed 编辑器和 MCP 的交互流程
sequenceDiagram participant Zed as Zed Editor participant Rust as Rust Extension participant Node as Node.js Server participant PG as PostgreSQL Zed->>Rust: 启动 MCP Rust->>Node: 启动服务器 Node->>PG: 连接数据库 Node-->>Zed: 返回服务器端口 loop Context Requests Zed->>Node: 发送上下文请求 Node->>PG: 查询数据库 PG-->>Node: 返回数据 Node-->>Zed: 返回上下文信息 end
- 提供的主要信息类型:
- 数据库表结构
- 列名和数据类型
- 表之间的关系
- 约束信息
- 索引信息
- 视图定义
- 存储过程和函数
- 使用场景示例:
SELECT * FROM us| -- 光标位置
当用户在此处输入时,MCP 会:
- 检测到用户正在输入表名
- 查询数据库中的所有表
- 提供以 “us” 开头的表名建议
- 在用户选择表后,可以继续提供该表的列信息
这样的架构允许:
- 实时数据库结构信息
- 智能代码补全
- SQL 语法验证
- 表关系提示
- 字段类型检查
通过这种方式,Postgres SQL MCP 为 Zed 编辑器提供了丰富的数据库上下文信息,提高了 SQL 编写的效率和准确性。