Переглянути джерело

perf(mcp): proxy answers initialize/tools-list locally — cold-start handshake ~600ms→~90ms

The proxy used to WAIT for the shared daemon to spawn + bind (~600ms) before the
MCP handshake completed, so a headless agent firing its first turn inside that
window hit "No such tool available" and flailed into grep/Read (the cold-start
race). But initialize + tools/list are STATIC — they don't need the daemon.

`runLocalHandshakeProxy` (proxy.ts) now answers them instantly from constants
(PROTOCOL_VERSION/SERVER_INFO/SERVER_INSTRUCTIONS + getStaticTools()), and only
tool CALLS are forwarded to the daemon — which connects in the background while
the agent thinks between tools/list and its first call. The daemon's reply to the
forwarded initialize is suppressed (the client already got the local one).

Robustness preserved: if the daemon never comes up (version mismatch / spawn
fail), a lazily-created in-process engine serves the calls — the old fall-back-
to-direct guarantee, just deferred. connectWithHello distinguishes
'version-mismatch' (definitive → serve in-process immediately, no 6s poll) from
'not yet' (keep polling), and emits the "Attached to shared daemon" line the
daemon tests key on.

Result: tool registration cold 91ms / warm 88ms (was ~600–811ms) — the race is
ELIMINATED, not just narrowed. First tool call still waits for the daemon's index
load (a call that returns late, not a missing-tool error). Validated: 28/28
MCP+daemon tests (sharing, #277 survive-client-death, version-mismatch fallback,
idle-timeout), full suite 1090 pass (only pre-existing npm-shim network fails),
end-to-end smoke initialize→tools/list→tools/call returns correct results.

(The old transparent-pipe runProxy/pipeUntilClose are now superseded and unused —
left in place for this change; safe to prune in a follow-up.)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Colby McHenry 3 тижнів тому
батько
коміт
82ae484f5a
5 змінених файлів з 250 додано та 40 видалено
  1. 3 2
      __tests__/mcp-daemon.test.ts
  2. 34 36
      src/mcp/index.ts
  3. 196 0
      src/mcp/proxy.ts
  4. 4 2
      src/mcp/session.ts
  5. 13 0
      src/mcp/tools.ts

+ 3 - 2
__tests__/mcp-daemon.test.ts

@@ -346,11 +346,12 @@ describe('Shared MCP daemon (issue #411)', () => {
       servers.push(server);
       sendInitialize(server.child, `file://${tempDir}`, 1);
       // Despite the mismatched daemon, the client still gets an initialize
-      // response — the proxy refuses to attach and falls back to direct mode.
+      // response — the proxy answers the handshake locally and, refusing to
+      // attach across the version mismatch, serves the session in-process.
       const resp = await waitFor(() => findResponse(server.stdout, 1), 10000);
       expect(resp.result.serverInfo.name).toBe('codegraph');
       await waitFor(
-        () => server.stderr.some((l) => l.includes('falling back to direct mode')),
+        () => server.stderr.some((l) => l.includes('serving this session in-process')),
         6000,
       );
     } finally {

+ 34 - 36
src/mcp/index.ts

@@ -47,7 +47,7 @@ import {
   isProcessAlive,
   tryAcquireDaemonLock,
 } from './daemon';
-import { runProxy } from './proxy';
+import { connectWithHello, runLocalHandshakeProxy } from './proxy';
 import { getDaemonSocketPath } from './daemon-paths';
 import { HOST_PPID_ENV } from '../extraction/wasm-runtime-flags';
 
@@ -263,21 +263,20 @@ export class MCPServer {
     }
 
     try {
-      const mode = await this.connectOrSpawnDaemon(root);
-      if (mode === 'fallback') {
-        return this.startDirect('daemon unavailable; fallback to direct');
-      }
-      // 'proxy': connectOrSpawnDaemon ran the stdio↔socket pipe to completion
-      // (it only returns once the host disconnected). The process is now
-      // expected to terminate naturally — the proxy installed its own watchdog.
+      // Answer the MCP handshake LOCALLY (instant tool registration — no waiting
+      // ~600ms for the daemon to spawn+bind, which produced the cold-start race)
+      // and forward tool CALLS to the shared daemon, connected in the background.
+      // Runs until the host disconnects; the proxy installs its own watchdog and
+      // falls back to an in-process engine if the daemon never comes up.
       this.mode = 'proxy';
+      await this.runProxyWithLocalHandshake(root);
       return;
     } catch (err) {
-      // Belt-and-braces: if anything throws inside the daemon machinery,
-      // never wedge the user — fall back to a working direct-mode session.
+      // Belt-and-braces: a throw during proxy SETUP (before the client was served)
+      // is still safe to recover from with a direct-mode session.
       const msg = err instanceof Error ? err.message : String(err);
-      process.stderr.write(`[CodeGraph MCP] Daemon path failed (${msg}); falling back to direct mode.\n`);
-      return this.startDirect('daemon path threw');
+      process.stderr.write(`[CodeGraph MCP] Proxy path failed (${msg}); falling back to direct mode.\n`);
+      return this.startDirect('proxy path threw');
     }
   }
 
@@ -381,32 +380,31 @@ export class MCPServer {
   }
 
   /**
-   * Become a proxy to the shared daemon, spawning the daemon first if none is
-   * reachable. Returns 'proxy' once the proxied session has run to completion
-   * (the host disconnected), or 'fallback' if the caller should run in-process.
+   * Proxy mode (the common case). Serve the MCP handshake LOCALLY for instant
+   * tool registration, forwarding tool calls to the shared daemon — which is
+   * connected in the background (probed, then spawned + polled if absent) so the
+   * handshake never waits ~600ms on it. Runs until the host disconnects; the
+   * proxy falls back to an in-process engine if the daemon never binds, so this
+   * never wedges a session.
    */
-  private async connectOrSpawnDaemon(root: string): Promise<'proxy' | 'fallback'> {
+  private async runProxyWithLocalHandshake(root: string): Promise<void> {
     const socketPath = getDaemonSocketPath(root);
-
-    // Fast path: a daemon may already be listening. On success runProxy pipes
-    // stdio until the host disconnects, so a 'proxied' outcome means this
-    // process has finished its entire job.
-    let probe = await runProxy(socketPath);
-    if (probe.outcome === 'proxied') return 'proxy';
-    if (probe.reason === 'version mismatch') return 'fallback';
-
-    // No reachable daemon — spawn one (detached) and wait for it to bind.
-    spawnDetachedDaemon(root);
-
-    for (let attempt = 0; attempt < DAEMON_CONNECT_MAX_RETRIES; attempt++) {
-      await sleep(DAEMON_CONNECT_RETRY_DELAY_MS);
-      probe = await runProxy(socketPath);
-      if (probe.outcome === 'proxied') return 'proxy';
-      if (probe.reason === 'version mismatch') return 'fallback';
-    }
-
-    // Daemon never came up in time — run in-process so the user is never blocked.
-    return 'fallback';
+    const getDaemonSocket = async () => {
+      // Fast path: a daemon may already be listening.
+      const probe = await connectWithHello(socketPath);
+      if (probe === 'version-mismatch') return null; // definitive — serve in-process, don't poll for 6s
+      if (probe) return probe;
+      // None reachable — spawn one (detached) and poll for its bind.
+      spawnDetachedDaemon(root);
+      for (let attempt = 0; attempt < DAEMON_CONNECT_MAX_RETRIES; attempt++) {
+        await sleep(DAEMON_CONNECT_RETRY_DELAY_MS);
+        const s = await connectWithHello(socketPath);
+        if (s === 'version-mismatch') return null;
+        if (s) return s;
+      }
+      return null; // never bound — the proxy serves this session in-process
+    };
+    await runLocalHandshakeProxy({ getDaemonSocket, makeEngine: () => new MCPEngine(), root });
   }
 
   /** Standard SIGINT/SIGTERM handlers that route to our `stop()` (direct mode). */

+ 196 - 0
src/mcp/proxy.ts

@@ -23,6 +23,10 @@ import * as net from 'net';
 import { HOST_PPID_ENV } from '../extraction/wasm-runtime-flags';
 import { DaemonHello, MAX_HELLO_LINE_BYTES } from './daemon';
 import { CodeGraphPackageVersion } from './version';
+import { SERVER_INFO, PROTOCOL_VERSION } from './session';
+import { SERVER_INSTRUCTIONS } from './server-instructions';
+import { getStaticTools } from './tools';
+import type { MCPEngine } from './engine';
 
 /** Default poll cadence for the PPID watchdog (same as the direct server). */
 const DEFAULT_PPID_POLL_MS = 5000;
@@ -96,6 +100,198 @@ export async function runProxy(
   process.exit(0);
 }
 
+/**
+ * Connect to a daemon at `socketPath` and verify its hello (exact version match).
+ * Returns the live socket (hello already consumed) or null if unreachable / stale
+ * / version-mismatched. Unlike {@link runProxy} it does NOT pipe — the caller
+ * owns the socket. Used by the local-handshake proxy's background connect.
+ */
+export async function connectWithHello(
+  socketPath: string,
+  expectedVersion: string = CodeGraphPackageVersion,
+): Promise<net.Socket | 'version-mismatch' | null> {
+  if (process.platform !== 'win32' && !fs.existsSync(socketPath)) return null;
+  const socket = net.createConnection(socketPath);
+  socket.setEncoding('utf8');
+  const hello = await readHelloLine(socket).catch(() => null);
+  if (!hello) {
+    socket.destroy();
+    return null; // no daemon yet — caller should keep polling
+  }
+  if (hello.codegraph !== expectedVersion) {
+    // A daemon IS up but it's the wrong version — definitive, not a "not yet".
+    // Don't poll; the caller serves in-process so we never run stale-vs-new.
+    process.stderr.write(
+      `[CodeGraph MCP] Found a daemon on ${socketPath} but version (${hello.codegraph}) ` +
+      `differs from ours (${expectedVersion}); serving this session in-process.\n`
+    );
+    socket.destroy();
+    return 'version-mismatch';
+  }
+  process.stderr.write(
+    `[CodeGraph MCP] Attached to shared daemon on ${socketPath} (pid ${hello.pid}, v${hello.codegraph}).\n`
+  );
+  return socket;
+}
+
+type JsonRpc = Record<string, unknown>;
+
+/** Dependencies the local-handshake proxy needs, injected by MCPServer (which
+ *  owns the daemon-spawn machinery and the engine factory). */
+export interface LocalHandshakeDeps {
+  /** Probe → spawn → retry → hello-verify; resolves a connected daemon socket,
+   *  or null when the daemon path is genuinely unavailable (→ in-process fallback). */
+  getDaemonSocket(): Promise<net.Socket | null>;
+  /** Lazily create an in-process engine — used ONLY if the daemon never comes up,
+   *  preserving the "a broken daemon never wedges a session" guarantee. */
+  makeEngine(): MCPEngine;
+  /** Project root for the fallback engine's lazy init. */
+  root: string;
+}
+
+/**
+ * Local-handshake proxy (the cold-start fix).
+ *
+ * Answers `initialize` + `tools/list` from STATIC constants the instant the
+ * client asks — tools register in ~process-startup time instead of waiting
+ * ~600ms for the daemon to spawn+bind, which is what produced the "No such tool
+ * available" race that made headless agents flail into grep/Read. Tool CALLS are
+ * forwarded to the shared daemon (connected in the background); the daemon's
+ * response to the forwarded `initialize` is suppressed (the client already got
+ * the local one). If the daemon never comes up (version mismatch / spawn fail),
+ * a lazily-created in-process engine serves the calls — so the handshake speedup
+ * never costs the old fall-back-to-direct robustness.
+ */
+export async function runLocalHandshakeProxy(deps: LocalHandshakeDeps): Promise<void> {
+  let daemonStatus: 'connecting' | 'ready' | 'failed' = 'connecting';
+  let daemonSocket: net.Socket | null = null;
+  let clientInitId: unknown = undefined;   // suppress the daemon's reply to the forwarded initialize
+  const pending: string[] = [];            // client lines buffered until the daemon resolves
+  let engine: MCPEngine | null = null;
+  let engineReady: Promise<void> | null = null;
+  let shuttingDown = false;
+
+  const writeClient = (obj: JsonRpc | string): void => {
+    try { process.stdout.write((typeof obj === 'string' ? obj : JSON.stringify(obj)) + '\n'); } catch { /* host gone */ }
+  };
+  const shutdown = (): void => {
+    if (shuttingDown) return; shuttingDown = true;
+    try { daemonSocket?.destroy(); } catch { /* ignore */ }
+    try { engine?.stop(); } catch { /* ignore */ }
+    process.exit(0);
+  };
+  const ensureEngine = (): Promise<void> => {
+    if (!engine) engine = deps.makeEngine();
+    if (!engineReady) engineReady = engine.ensureInitialized(deps.root).catch(() => { /* degraded */ });
+    return engineReady;
+  };
+  // Daemon-unavailable fallback: serve a client message in-process.
+  const handleLocally = async (line: string): Promise<void> => {
+    let msg: JsonRpc; try { msg = JSON.parse(line) as JsonRpc; } catch { return; }
+    const id = msg.id;
+    if (msg.method === 'tools/call' && id !== undefined) {
+      try {
+        await ensureEngine();
+        const params = (msg.params || {}) as { name: string; arguments?: Record<string, unknown> };
+        const result = await engine!.getToolHandler().execute(params.name, params.arguments || {});
+        writeClient({ jsonrpc: '2.0', id, result });
+      } catch (err) {
+        writeClient({ jsonrpc: '2.0', id, error: { code: -32603, message: err instanceof Error ? err.message : String(err) } });
+      }
+    } else if (msg.method === 'ping' && id !== undefined) {
+      writeClient({ jsonrpc: '2.0', id, result: {} });
+    }
+    // initialize already answered locally; notifications (initialized) need no reply.
+  };
+  const routeToDaemon = (line: string): void => {
+    if (daemonStatus === 'ready' && daemonSocket) {
+      try { daemonSocket.write(line.endsWith('\n') ? line : line + '\n'); } catch { /* close path */ }
+    } else if (daemonStatus === 'failed') {
+      void handleLocally(line);
+    } else {
+      pending.push(line);
+    }
+  };
+
+  // ---- client (stdin) ----
+  let stdinBuf = '';
+  process.stdin.setEncoding('utf8');
+  process.stdin.on('data', (chunk: string) => {
+    stdinBuf += chunk;
+    let idx: number;
+    while ((idx = stdinBuf.indexOf('\n')) !== -1) {
+      const line = stdinBuf.slice(0, idx).trim();
+      stdinBuf = stdinBuf.slice(idx + 1);
+      if (!line) continue;
+      let msg: JsonRpc; try { msg = JSON.parse(line) as JsonRpc; } catch { routeToDaemon(line); continue; }
+      if (msg.method === 'initialize') {
+        clientInitId = msg.id;
+        writeClient({ jsonrpc: '2.0', id: msg.id, result: { protocolVersion: PROTOCOL_VERSION, capabilities: { tools: {} }, serverInfo: SERVER_INFO, instructions: SERVER_INSTRUCTIONS } });
+        routeToDaemon(line); // prime the daemon so it resolves the project (its reply is suppressed below)
+      } else if (msg.method === 'tools/list') {
+        writeClient({ jsonrpc: '2.0', id: msg.id, result: { tools: getStaticTools() } });
+      } else {
+        routeToDaemon(line);
+      }
+    }
+  });
+  process.stdin.on('end', shutdown);
+  process.stdin.on('close', shutdown);
+  startPpidWatchdogNoSocket(shutdown);
+
+  // ---- daemon connection (background) ----
+  let socket: net.Socket | null = null;
+  try { socket = await deps.getDaemonSocket(); } catch { socket = null; }
+
+  if (socket && !shuttingDown) {
+    daemonSocket = socket;
+    daemonStatus = 'ready';
+    let sockBuf = '';
+    socket.setEncoding('utf8');
+    socket.on('data', (chunk: string) => {
+      sockBuf += chunk;
+      let idx: number;
+      while ((idx = sockBuf.indexOf('\n')) !== -1) {
+        const line = sockBuf.slice(0, idx);
+        sockBuf = sockBuf.slice(idx + 1);
+        if (!line.trim()) continue;
+        if (clientInitId !== undefined) {
+          try { const m = JSON.parse(line) as JsonRpc; if (m.id === clientInitId && ('result' in m || 'error' in m)) continue; } catch { /* relay */ }
+        }
+        writeClient(line);
+      }
+    });
+    socket.on('close', shutdown);
+    socket.on('error', shutdown);
+    for (const line of pending) { try { socket.write(line + '\n'); } catch { /* ignore */ } }
+    pending.length = 0;
+  } else if (!shuttingDown) {
+    daemonStatus = 'failed';
+    process.stderr.write('[CodeGraph MCP] Shared daemon unavailable; serving this session in-process (degraded).\n');
+    const buffered = pending.splice(0);
+    for (const line of buffered) await handleLocally(line);
+  }
+
+  await new Promise<void>(() => { /* stdin keeps the loop alive; exit via shutdown() */ });
+}
+
+/** PPID watchdog for the local-handshake proxy — same #277 logic as
+ *  {@link startPpidWatchdog} but with no socket to close (the caller's shutdown
+ *  handles teardown). */
+function startPpidWatchdogNoSocket(onDeath: () => void): void {
+  const pollMs = parsePollMs(process.env.CODEGRAPH_PPID_POLL_MS);
+  if (pollMs <= 0) return;
+  const originalPpid = process.ppid;
+  const hostPpid = parseHostPpid(process.env[HOST_PPID_ENV]);
+  const timer = setInterval(() => {
+    if (process.ppid !== originalPpid || (hostPpid !== null && !isProcessAliveLocal(hostPpid))) {
+      process.stderr.write('[CodeGraph MCP] Parent process exited; shutting down.\n');
+      onDeath();
+    }
+  }, pollMs);
+  timer.unref?.();
+}
+
 /**
  * Read one CRLF/LF-terminated JSON line from the socket, parse it as the
  * daemon hello, and return it. Bounded to {@link MAX_HELLO_LINE_BYTES} so a

+ 4 - 2
src/mcp/session.ts

@@ -23,13 +23,15 @@ import { CodeGraphPackageVersion } from './version';
  * MCP Server Info — kept on the session because some clients log it. The
  * version tracks the real package version (was a hard-coded '0.1.0').
  */
-const SERVER_INFO = {
+// Exported so the proxy can answer `initialize` locally with the IDENTICAL
+// payload the daemon would send — no drift between the two handshake paths.
+export const SERVER_INFO = {
   name: 'codegraph',
   version: CodeGraphPackageVersion,
 };
 
 /** MCP Protocol Version (latest the server claims). */
-const PROTOCOL_VERSION = '2024-11-05';
+export const PROTOCOL_VERSION = '2024-11-05';
 
 /**
  * How long to wait for the client's `roots/list` response before giving up

+ 13 - 0
src/mcp/tools.ts

@@ -630,6 +630,19 @@ export const tools: ToolDefinition[] = [
   },
 ];
 
+/**
+ * Allowlist-filtered tool definitions WITHOUT an engine — the static surface the
+ * proxy answers `tools/list` with before any project is open. Mirrors
+ * `ToolHandler.getTools()` in the no-CodeGraph case (the dynamic per-repo budget
+ * note in a description only adds once `cg` is loaded; the schemas are static).
+ */
+export function getStaticTools(): ToolDefinition[] {
+  const raw = process.env.CODEGRAPH_MCP_TOOLS;
+  if (!raw || !raw.trim()) return tools;
+  const allow = new Set(raw.split(',').map(s => s.trim().replace(/^codegraph_/, '')).filter(Boolean));
+  return allow.size ? tools.filter(t => allow.has(t.name.replace(/^codegraph_/, ''))) : tools;
+}
+
 /**
  * Tool handler that executes tools against a CodeGraph instance
  *