AI

MCP

MCP实现原理分析

Posted by Carlo on November 28, 2024

MCP实现原理

文档 仓库

MCP介绍

概述

MCP 遵循 客户端-服务器 架构,其中:

  • 主机是LLM应用程序(如 Claude 桌面或 IDE)发起连接
  • 客户端与服务器在主机应用程序内保持 1:1 连接
  • 服务器为客户端提供上下文、工具和提示

    核心组件

    协议层

  • 关键类包括
    • Protocol
    • Client
    • Server
export declare class Protocol<SendRequestT extends Request, SendNotificationT extends Notification, SendResultT extends Result> {
  private _transport?;
  private _requestMessageId;
  protected _requestHandlers: Map<string, (request: JSONRPCRequest) => Promise<SendResultT>>;
  private _notificationHandlers;
  private _responseHandlers;
  private _progressHandlers;
  /**
   * Callback for when the connection is closed for any reason.
   *
   * This is invoked when close() is called as well.
   */
  onclose?: () => void;
  /**
   * Callback for when an error occurs.
   *
   * Note that errors are not necessarily fatal; they are used for reporting any kind of exceptional condition out of band.
   */
  onerror?: (error: Error) => void;
  /**
   * A handler to invoke for any request types that do not have their own handler installed.
   */
  fallbackRequestHandler?: (request: Request) => Promise<SendResultT>;
  /**
   * A handler to invoke for any notification types that do not have their own handler installed.
   */
  fallbackNotificationHandler?: (notification: Notification) => Promise<void>;
  constructor();
  /**
   * Attaches to the given transport, starts it, and starts listening for messages.
   *
   * The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward.
   */
  connect(transport: Transport): Promise<void>;
  private _onclose;
  private _onerror;
  private _onnotification;
  private _onrequest;
  private _onprogress;
  private _onresponse;
  get transport(): Transport | undefined;
  /**
   * Closes the connection.
   */
  close(): Promise<void>;
  /**
   * Sends a request and wait for a response, with optional progress notifications in the meantime (if supported by the server).
   *
   * Do not use this method to emit notifications! Use notification() instead.
   */
  request<T extends ZodType<object>>(request: SendRequestT, resultSchema: T, onprogress?: ProgressCallback): Promise<z.infer<T>>;
  /**
   * Emits a notification, which is a one-way message that does not expect a response.
   */
  notification(notification: SendNotificationT): Promise<void>;
  /**
   * Registers a handler to invoke when this protocol object receives a request with the given method.
   *
   * Note that this will replace any previous request handler for the same method.
   */
  setRequestHandler<T extends ZodObject<{
    method: ZodLiteral<string>;
  }>>(requestSchema: T, handler: (request: z.infer<T>) => SendResultT | Promise<SendResultT>): void;
  /**
   * Removes the request handler for the given method.
   */
  removeRequestHandler(method: string): void;
  /**
   * Registers a handler to invoke when this protocol object receives a notification with the given method.
   *
   * Note that this will replace any previous notification handler for the same method.
   */
  setNotificationHandler<T extends ZodObject<{
    method: ZodLiteral<string>;
  }>>(notificationSchema: T, handler: (notification: z.infer<T>) => void | Promise<void>): void;
  /**
   * Removes the notification handler for the given method.
   */
  removeNotificationHandler(method: string): void;
}

export declare class Server<RequestT extends Request = Request, NotificationT extends Notification = Notification, ResultT extends Result = Result> extends Protocol<ServerRequest | RequestT, ServerNotification | NotificationT, ServerResult | ResultT> {
  private _serverInfo;
  private _clientCapabilities?;
  private _clientVersion?;
  /**
   * Callback for when initialization has fully completed (i.e., the client has sent an `initialized` notification).
   */
  oninitialized?: () => void;
  /**
   * Initializes this server with the given name and version information.
   */
  constructor(_serverInfo: Implementation);
  private _oninitialize;
  /**
   * After initialization has completed, this will be populated with the client's reported capabilities.
   */
  getClientCapabilities(): ClientCapabilities | undefined;
  /**
   * After initialization has completed, this will be populated with information about the client's name and version.
   */
  getClientVersion(): Implementation | undefined;
  private getCapabilities;
  ping(): Promise<{
    _meta?: import("zod").objectOutputType<{}, import("zod").ZodTypeAny, "passthrough"> | undefined;
  }>;
  createMessage(params: CreateMessageRequest["params"], onprogress?: ProgressCallback): Promise<import("zod").objectOutputType<import("zod").objectUtil.extendShape<{
    _meta: import("zod").ZodOptional<import("zod").ZodObject<{}, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{}, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{}, import("zod").ZodTypeAny, "passthrough">>>;
  }, {
    model: import("zod").ZodString;
    stopReason: import("zod").ZodOptional<import("zod").ZodUnion<[import("zod").ZodEnum<["endTurn", "stopSequence", "maxTokens"]>, import("zod").ZodString]>>;
    role: import("zod").ZodEnum<["user", "assistant"]>;
    content: import("zod").ZodDiscriminatedUnion<"type", [import("zod").ZodObject<{
      type: import("zod").ZodLiteral<"text">;
      text: import("zod").ZodString;
    }, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{
      type: import("zod").ZodLiteral<"text">;
      text: import("zod").ZodString;
    }, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{
      type: import("zod").ZodLiteral<"text">;
      text: import("zod").ZodString;
    }, import("zod").ZodTypeAny, "passthrough">>, import("zod").ZodObject<{
      type: import("zod").ZodLiteral<"image">;
      data: import("zod").ZodString;
      mimeType: import("zod").ZodString;
    }, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{
      type: import("zod").ZodLiteral<"image">;
      data: import("zod").ZodString;
      mimeType: import("zod").ZodString;
    }, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{
      type: import("zod").ZodLiteral<"image">;
      data: import("zod").ZodString;
      mimeType: import("zod").ZodString;
    }, import("zod").ZodTypeAny, "passthrough">>]>;
  }>, import("zod").ZodTypeAny, "passthrough">>;
  listRoots(params?: ListRootsRequest["params"], onprogress?: ProgressCallback): Promise<import("zod").objectOutputType<import("zod").objectUtil.extendShape<{
    _meta: import("zod").ZodOptional<import("zod").ZodObject<{}, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{}, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{}, import("zod").ZodTypeAny, "passthrough">>>;
  }, {
    roots: import("zod").ZodArray<import("zod").ZodObject<{
      uri: import("zod").ZodString;
      name: import("zod").ZodOptional<import("zod").ZodString>;
    }, "passthrough", import("zod").ZodTypeAny, import("zod").objectOutputType<{
      uri: import("zod").ZodString;
      name: import("zod").ZodOptional<import("zod").ZodString>;
    }, import("zod").ZodTypeAny, "passthrough">, import("zod").objectInputType<{
      uri: import("zod").ZodString;
      name: import("zod").ZodOptional<import("zod").ZodString>;
    }, import("zod").ZodTypeAny, "passthrough">>, "many">;
  }>, import("zod").ZodTypeAny, "passthrough">>;
  sendLoggingMessage(params: LoggingMessageNotification["params"]): Promise<void>;
  sendResourceUpdated(params: ResourceUpdatedNotification["params"]): Promise<void>;
  sendResourceListChanged(): Promise<void>;
  sendToolListChanged(): Promise<void>;
  sendPromptListChanged(): Promise<void>;
}

export class Client extends Protocol {
  /**
   * Initializes this client with the given name and version information.
   */
  constructor(_clientInfo) {
    super();
    this._clientInfo = _clientInfo;
  }
  async connect(transport) {
    await super.connect(transport);
    const result = await this.request({
      method: "initialize",
      params: {
        protocolVersion: LATEST_PROTOCOL_VERSION,
        capabilities: {},
        clientInfo: this._clientInfo,
      },
    }, InitializeResultSchema);
    if (result === undefined) {
      throw new Error(`Server sent invalid initialize result: ${result}`);
    }
    if (!SUPPORTED_PROTOCOL_VERSIONS.includes(result.protocolVersion)) {
      throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`);
    }
    this._serverCapabilities = result.capabilities;
    this._serverVersion = result.serverInfo;
    await this.notification({
      method: "notifications/initialized",
    });
  }
  /**
   * After initialization has completed, this will be populated with the server's reported capabilities.
   */
  getServerCapabilities() {
    return this._serverCapabilities;
  }
  /**
   * After initialization has completed, this will be populated with information about the server's name and version.
   */
  getServerVersion() {
    return this._serverVersion;
  }
  async ping() {
    return this.request({ method: "ping" }, EmptyResultSchema);
  }
  async complete(params, onprogress) {
    return this.request({ method: "completion/complete", params }, CompleteResultSchema, onprogress);
  }
  async setLoggingLevel(level) {
    return this.request({ method: "logging/setLevel", params: { level } }, EmptyResultSchema);
  }
  async getPrompt(params, onprogress) {
    return this.request({ method: "prompts/get", params }, GetPromptResultSchema, onprogress);
  }
  async listPrompts(params, onprogress) {
    return this.request({ method: "prompts/list", params }, ListPromptsResultSchema, onprogress);
  }
  async listResources(params, onprogress) {
    return this.request({ method: "resources/list", params }, ListResourcesResultSchema, onprogress);
  }
  async listResourceTemplates(params, onprogress) {
    return this.request({ method: "resources/templates/list", params }, ListResourceTemplatesResultSchema, onprogress);
  }
  async readResource(params, onprogress) {
    return this.request({ method: "resources/read", params }, ReadResourceResultSchema, onprogress);
  }
  async subscribeResource(params) {
    return this.request({ method: "resources/subscribe", params }, EmptyResultSchema);
  }
  async unsubscribeResource(params) {
    return this.request({ method: "resources/unsubscribe", params }, EmptyResultSchema);
  }
  async callTool(params, resultSchema = CallToolResultSchema, onprogress) {
    return this.request({ method: "tools/call", params }, resultSchema, onprogress);
  }
  async listTools(params, onprogress) {
    return this.request({ method: "tools/list", params }, ListToolsResultSchema, onprogress);
  }
  async sendRootsListChanged() {
    return this.notification({ method: "notifications/roots/list_changed" });
  }
}

传输层

  • Stdio transport 标准输入输出传输
    • 使用标准输入/输出进行通信
    • 非常适合本地流程
  • SSE transport 使用 SSE 传输
    • 使用服务器发送事件执行服务器到客户端的消息
    • HTTP POST 用于客户端到服务器的消息
  • 所有传输都使用 JSON-RPC 2.0 来交换消息。请参阅规范以获取有关模型上下文协议消息格式的详细信息。
import { JSONRPCMessage } from "../types.js";
import { Transport } from "../shared/transport.js";
export type StdioServerParameters = {
    /**
     * The executable to run to start the server.
     */
    command: string;
    /**
     * Command line arguments to pass to the executable.
     */
    args?: string[];
    /**
     * The environment to use when spawning the process.
     *
     * If not specified, the result of getDefaultEnvironment() will be used.
     */
    env?: Record<string, string>;
};
/**
 * Environment variables to inherit by default, if an environment is not explicitly given.
 */
export declare const DEFAULT_INHERITED_ENV_VARS: string[];
/**
 * Returns a default environment object including only environment variables deemed safe to inherit.
 */
export declare function getDefaultEnvironment(): Record<string, string>;
/**
 * Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout.
 *
 * This transport is only available in Node.js environments.
 */
export declare class StdioClientTransport implements Transport {
    private _process?;
    private _abortController;
    private _readBuffer;
    private _serverParams;
    onclose?: () => void;
    onerror?: (error: Error) => void;
    onmessage?: (message: JSONRPCMessage) => void;
    constructor(server: StdioServerParameters);
    /**
     * Starts the server process and prepares to communicate with it.
     */
    start(): Promise<void>;
    private processReadBuffer;
    close(): Promise<void>;
    send(message: JSONRPCMessage): Promise<void>;
}
//# sourceMappingURL=stdio.d.ts.map

import { Transport } from "../shared/transport.js";
import { JSONRPCMessage } from "../types.js";
/**
 * Client transport for WebSocket: this will connect to a server over the WebSocket protocol.
 */
export declare class WebSocketClientTransport implements Transport {
  private _socket?;
  private _url;
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
  constructor(url: URL);
  start(): Promise<void>;
  close(): Promise<void>;
  send(message: JSONRPCMessage): Promise<void>;
}
//# sourceMappingURL=websocket.d.ts.map

import { Transport } from "../shared/transport.js";
import { JSONRPCMessage } from "../types.js";
/**
 * Client transport for SSE: this will connect to a server using Server-Sent Events for receiving
 * messages and make separate POST requests for sending messages.
 *
 * This uses the EventSource API in browsers. You can install the `eventsource` package for Node.js.
 */
export declare class SSEClientTransport implements Transport {
  private _eventSource?;
  private _endpoint?;
  private _abortController?;
  private _url;
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
  constructor(url: URL);
  start(): Promise<void>;
  close(): Promise<void>;
  send(message: JSONRPCMessage): Promise<void>;
}
//# sourceMappingURL=sse.d.ts.map

消息类型

  • Requests 请求期望从另一端得到响应
    interface Request {
    method: string;
    params?: { };
    }
    
  • Notifications 通知是一方向的消息,不需要回复 ```ts

interface Notification { method: string; params?: { }; }

- Results 结果是请求的成功响应:
```ts

interface Result {
  [key: string]: unknown;
}

  • Errors 错误表明请求失败: ```ts interface Error { code: number; message: string; data?: unknown; } enum ErrorCode { // Standard JSON-RPC error codes ParseError = -32700, InvalidRequest = -32600, MethodNotFound = -32601, InvalidParams = -32602, InternalError = -32603 }
SDKs 和应用程序可以定义自己的错误代码,大于-32000。 错误通过以下方式传播:
- 错误响应请求
- 传输错误事件
- 协议级错误处理器
### 连接生命周期
1. 初始化
    - 客户端发送带有协议版本和功能的 initialize 请求
    - 服务器响应其协议版本和功能
    - 客户端发送 initialized 通知作为确认
    - 普通消息交换开始
2. 消息交换
    - 请求-响应:客户端或服务器发送请求,另一方响应
    - 通知:任何一方发送单向消息
3. 终止
    - 任何一方都可以终止连接:
        - 通过 close() 进行干净关闭
        - 网络端口
        - 出现错误
## Inspector 检查员
MCP 检查员是一个用于测试和调试 MCP 服务器的交互式开发工具。
检查员直接运行 npx ,无需安装:
```shell
npx -y @modelcontextprotocol/inspector npx server-postgres postgresql://postgres:123456@localhost:5432/postgres

服务器连接面板

  • 允许选择连接到服务器的传输方式
  • 对于本地服务器,支持自定义命令行参数和环境变量

资源标签

  • 列出所有可用资源
  • 显示资源元数据(MIME 类型、描述)
  • 允许资源内容检查
  • 支持订阅测试

提示标签

  • 显示可用的提示模板
  • 显示提示参数和描述
  • 启用带自定义参数的快速测试
  • 预览生成的消息

工具标签

  • 工具列表
  • 显示工具架构和描述
  • 启用具有自定义输入的工具测试
  • 显示工具执行结果

通知面板

  • 展示服务器记录的所有日志
  • 显示来自服务器的通知

MCP工程实践

zed插件实现

use serde::Deserialize;
use std::env;
use zed::settings::ContextServerSettings;
use zed_extension_api::{self as zed, serde_json, Command, ContextServerId, Project, Result};

const PACKAGE_NAME: &str = "@zeddotdev/postgres-context-server";
const PACKAGE_VERSION: &str = "0.1.2";
const SERVER_PATH: &str = "node_modules/@zeddotdev/postgres-context-server/index.mjs";
//定义了主扩展结构
struct PostgresModelContextExtension;
//定义了配置结构
#[derive(Debug, Deserialize)]
struct PostgresContextServerSettings {
    database_url: String,
}
//扩展实现
impl zed::Extension for PostgresModelContextExtension {
    fn new() -> Self {
        Self
    }
    //主要逻辑
    fn context_server_command(
        &mut self,
        _context_server_id: &ContextServerId,
        project: &Project,
    ) -> Result<Command> {
        //包管理 
        let version = zed::npm_package_installed_version(PACKAGE_NAME)?;
        if version.as_deref() != Some(PACKAGE_VERSION) {
            //检查并确保正确版本的 Node.js 包已安装
            zed::npm_install_package(PACKAGE_NAME, PACKAGE_VERSION)?;
        }
        //配置管理
        let settings = ContextServerSettings::for_project("postgres-context-server", project)?;
        let Some(settings) = settings.settings else {
            //验证必要的 `database_url` 设置
            return Err("missing `database_url` setting".into());
        };
        let settings: PostgresContextServerSettings =
            serde_json::from_value(settings).map_err(|e| e.to_string())?;
        //启动上下文服务器
        Ok(Command {
            //创建命令来启动 Node.js 服务器
            command: "node".to_string(),
            args: vec![
                env::current_dir()
                    .unwrap()
                    .join(SERVER_PATH)
                    .to_string_lossy()
                    .to_string(),
            ],
            env: vec![
                //传递数据库 URL 作为环境变量
                ("DATABASE_URL".into(), settings.database_url)
            ],
        })
    }
}

zed::register_extension!(PostgresModelContextExtension);

node.js 服务器

#!/usr/bin/env node
// 指定使用 Node.js 运行此脚本
// 导入必要的依赖包
import pg from "pg";// 导入 PostgreSQL 客户端
//https://github.com/modelcontextprotocol/typescript-sdk MCP SDK
import { Server } from "@modelcontextprotocol/sdk/server/index.js";// 导入 MCP 服务器 
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";// 导入标准输入输出传输
// 导入各种请求模式定义
import {
  CallToolRequestSchema, // 调用工具请求模式
  ListResourcesRequestSchema, // 列出资源请求模式
  ListPromptsRequestSchema, // 列出提示请求模式
  ListToolsRequestSchema, // 列出工具请求模式
  ReadResourceRequestSchema, // 读取资源请求模式
  GetPromptRequestSchema, // 获取提示请求模式
  CompleteRequestSchema, // 完成请求模式
} from "@modelcontextprotocol/sdk/types.js";

// 创建新的 MCP 服务器实例
const server = new Server({
  name: "postgres-context-server", // 服务器名称
  version: "0.1.0", // 版本号
});

// 获取数据库连接 URL
const databaseUrl = process.env.DATABASE_URL;
// 验证数据库 URL 是否有效
if (typeof databaseUrl == null || databaseUrl.trim().length === 0) {
  console.error("Please provide a DATABASE_URL environment variable"); // 输出错误信息
  process.exit(1); // 退出程序
}

// 创建资源基础 URL
const resourceBaseUrl = new URL(databaseUrl);
resourceBaseUrl.protocol = "postgres:"; // 设置协议
resourceBaseUrl.password = ""; // 清空密码

// 输出服务器启动信息
process.stderr.write("starting server. url: " + databaseUrl + "\n");
// 创建数据库连接池
const pool = new pg.Pool({
  connectionString: databaseUrl,
});

// 定义常量
const SCHEMA_PATH = "schema"; // schema 路径
const SCHEMA_PROMPT_NAME = "pg-schema"; // schema 提示名称
const ALL_TABLES = "all-tables"; // 所有表标识符

// 处理列出资源请求
// 工作流程:
// - 连接数据库
// - 查询所有公共 schema 的表
// - 为每个表构建资源 URI
// - 返回资源列表
// - 释放连接
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  const client = await pool.connect(); // 获取数据库连接
  try {
    // 查询所有公共schema的表
    const result = await client.query(
            "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'",
    );
    return {
      resources: result.rows.map((row) => ({
        uri: new URL(`${row.table_name}/${SCHEMA_PATH}`, resourceBaseUrl).href, // 构建资源URI
        mimeType: "application/json", // 设置MIME类型
        name: `"${row.table_name}" database schema`, // 设置资源名称
      })),
    };
  } finally {
    client.release(); // 释放数据库连接
  }
});

// 处理读取资源请求
// 工作流程:
// - 解析请求的资源 URI
// - 提取表名和 schema 信息
// - 查询指定表的列信息
// - 返回 JSON 格式的列信息
// - 释放数据库连接
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const resourceUrl = new URL(request.params.uri); // 解析资源URL

  // 解析路径组件
  const pathComponents = resourceUrl.pathname.split("/");
  const schema = pathComponents.pop();
  const tableName = pathComponents.pop();

  // 验证schema路径
  if (schema !== SCHEMA_PATH) {
    throw new Error("Invalid resource URI");
  }

  const client = await pool.connect(); // 获取数据库连接
  try {
    // 查询表的列信息
    const result = await client.query(
            "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1",
            [tableName],
    );

    return {
      contents: [
        {
          uri: request.params.uri, // 资源URI
          mimeType: "application/json", // MIME类型
          text: JSON.stringify(result.rows, null, 2), // 格式化JSON输出
        },
      ],
    };
  } finally {
    client.release(); // 释放数据库连接
  }
});

// 处理列出工具请求
// 工作流程:
// - 返回可用工具列表
// - 当前只提供一个 "query" 工具,用于执行只读 SQL 查询
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "query", // 工具名称
        description: "Run a read-only SQL query", // 工具描述
        inputSchema: {
          type: "object", // 输入模式类型
          properties: {
            sql: { type: "string" }, // SQL查询字符串
          },
        },
      },
    ],
  };
});

// 处理调用工具请求
// 工作流程:
// - 验证工具名称
// - 开始只读事务
// - 执行 SQL 查询
// - 返回查询结果
// - 回滚事务并释放连接
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "query") {
    const sql = request.params.arguments?.sql; // 获取SQL查询

    const client = await pool.connect(); // 获取数据库连接
    try {
      await client.query("BEGIN TRANSACTION READ ONLY"); // 开始只读事务
      const result = await client.query(sql); // 执行查询
      return { toolResult: result.rows }; // 返回查询结果
    } catch (error) {
      throw error; // 抛出错误
    } finally {
      // 回滚事务
      client
              .query("ROLLBACK")
              .catch((error) =>
                      console.warn("Could not roll back transaction:", error),
              );

      client.release(); // 释放数据库连接
    }
  }
  throw new Error("Tool not found"); // 工具未找到错误
});

// 处理自动完成请求
// 工作流程:
// - 验证提示名称
// - 检查是否已有参数
// - 查询所有表名
// - 返回可用的表名列表作为自动完成选项
server.setRequestHandler(CompleteRequestSchema, async (request) => {
  process.stderr.write("Handling completions/complete request\n"); // 输出调试信息

  if (request.params.ref.name === SCHEMA_PROMPT_NAME) {
    const tableNameQuery = request.params.argument.value; // 获取表名查询
    const alreadyHasArg = /\S*\s/.test(tableNameQuery); // 检查是否已有参数

    // 如果已有参数,返回空结果
    if (alreadyHasArg) {
      return {
        completion: {
          values: [],
        },
      };
    }

    const client = await pool.connect(); // 获取数据库连接
    try {
      // 查询所有表名
      const result = await client.query(
              "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'",
      );
      const tables = result.rows.map((row) => row.table_name); // 提取表名
      return {
        completion: {
          values: [ALL_TABLES, ...tables], // 返回所有表名和特殊标识符
        },
      };
    } finally {
      client.release(); // 释放数据库连接
    }
  }

  throw new Error("unknown prompt"); // 未知提示错误
});

// 处理列出提示请求
// 工作流程:
// - 返回可用提示列表
// - 包含 schema 提示的描述和参数信息 
server.setRequestHandler(ListPromptsRequestSchema, async () => {
  process.stderr.write("Handling prompts/list request\n"); // 输出调试信息

  return {
    prompts: [
      {
        name: SCHEMA_PROMPT_NAME, // 提示名称
        description: "Retrieve the schema for a given table in the postgres database", // 提示描述
        arguments: [
          {
            name: "tableName", // 参数名称
            description: "the table to describe", // 参数描述
            required: true, // 必填参数
          },
        ],
      },
    ],
  };
});

// 处理获取提示请求
// 工作流程:
// - 验证表名参数
// - 查询表的详细信息
// - 生成 CREATE TABLE SQL 语句
// - 返回格式化的 SQL 代码
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  process.stderr.write("Handling prompts/get request\n"); // 输出调试信息

  if (request.params.name === SCHEMA_PROMPT_NAME) {
    const tableName = request.params.arguments?.tableName; // 获取表名

    // 验证表名
    if (typeof tableName !== "string" || tableName.length === 0) {
      throw new Error(`Invalid tableName: ${tableName}`);
    }

    const client = await pool.connect(); // 获取数据库连接

    try {
      // 构建基础查询
      const select =
              "SELECT column_name, data_type, is_nullable, column_default, table_name FROM information_schema.columns";

      let result;
      // 根据是否查询所有表执行不同的查询
      if (tableName === ALL_TABLES) {
        result = await client.query(
                `${select} WHERE table_schema NOT IN ('pg_catalog', 'information_schema')`,
        );
      } else {
        result = await client.query(
                `${select} WHERE table_name = $1`, [tableName],
        );
      }

      // 获取所有唯一表名并排序
      const allTableNames = Array.from(
              new Set(result.rows.map((row) => row.table_name).sort()),
      );

      // 构建SQL创建表语句
      let sql = "```sql\n";
      for (let i = 0, len = allTableNames.length; i < len; i++) {
        const tableName = allTableNames[i];
        if (i > 0) {
          sql += "\n";
        }

        // 为每个表构建创建表语句
        sql += [
          `create table "${tableName}" (`,
          result.rows
                  .filter((row) => row.table_name === tableName)
                  .map((row) => {
                    const notNull = row.is_nullable === "NO" ? "" : " not null"; // 处理非空约束
                    const defaultValue =
                            row.column_default != null
                                    ? ` default ${row.column_default}` // 处理默认值
                                    : "";
                    return `    "${row.column_name}" ${row.data_type}${notNull}${defaultValue}`; // 构建列定义
                  })
                  .join(",\n"),
          ");",
        ].join("\n");
        sql += "\n";
      }
      sql += "```";

      // 返回提示响应
      return {
        description:
                tableName === ALL_TABLES
                        ? "all table schemas" // 所有表的描述
                        : `${tableName} schema`, // 单个表的描述
        messages: [
          {
            role: "user",
            content: {
              type: "text",
              text: sql, // SQL创建表语句
            },
          },
        ],
      };
    } finally {
      client.release(); // 释放数据库连接
    }
  }

  throw new Error(`Prompt '${request.params.name}' not implemented`); // 提示未实现错误
});

// 运行服务器的主函数
// 工作流程:
// - 创建标准输入输出传输
// - 连接服务器
// - 开始处理请求
async function runServer() {
  const transport = new StdioServerTransport(); // 创建标准输入输出传输
  await server.connect(transport); // 连接服务器
}

// 启动服务器并处理错误
runServer().catch((error) => {
  console.error(error); // 输出错误信息
  process.exit(1); // 退出程序
});

完整的请求处理流程图:

graph TD
    A[客户端请求] --> B{请求类型判断}
    B -->|ListResources| C[查询所有表]
    B -->|ReadResource| D[查询表结构]
    B -->|ListTools| E[返回工具列表]
    B -->|CallTool| F[执行SQL查询]
    B -->|Complete| G[提供自动完成]
    B -->|ListPrompts| H[返回提示列表]
    B -->|GetPrompt| I[生成Schema SQL]
    
    C --> J[返回响应]
    D --> J
    E --> J
    F --> J
    G --> J
    H --> J
    I --> J
    
    J --> K[释放资源]
    K --> L[结束请求]

每个处理函数都遵循以下通用模式:

  1. 验证输入参数
  2. 获取数据库连接
  3. 执行必要的查询
  4. 处理结果
  5. 释放资源
  6. 返回响应

这种模式确保了:

  • 资源的正确管理
  • 错误的适当处理
  • 连接的及时释放
  • 响应的一致性

    Zed 编辑器和 MCP 的交互流程

    sequenceDiagram
      participant Zed as Zed Editor
      participant Rust as Rust Extension
      participant Node as Node.js Server
      participant PG as PostgreSQL
    
      Zed->>Rust: 启动 MCP
      Rust->>Node: 启动服务器
      Node->>PG: 连接数据库
      Node-->>Zed: 返回服务器端口
        
      loop Context Requests
          Zed->>Node: 发送上下文请求
          Node->>PG: 查询数据库
          PG-->>Node: 返回数据
          Node-->>Zed: 返回上下文信息
      end
    
  1. 提供的主要信息类型:
    • 数据库表结构
    • 列名和数据类型
    • 表之间的关系
    • 约束信息
    • 索引信息
    • 视图定义
    • 存储过程和函数
  2. 使用场景示例:
    SELECT * FROM us|  -- 光标位置
    

    当用户在此处输入时,MCP 会:

  3. 检测到用户正在输入表名
  4. 查询数据库中的所有表
  5. 提供以 “us” 开头的表名建议
  6. 在用户选择表后,可以继续提供该表的列信息

这样的架构允许:

  • 实时数据库结构信息
  • 智能代码补全
  • SQL 语法验证
  • 表关系提示
  • 字段类型检查

通过这种方式,Postgres SQL MCP 为 Zed 编辑器提供了丰富的数据库上下文信息,提高了 SQL 编写的效率和准确性。