proxy.ts 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. /**
  2. * MCP proxy mode — issue #411.
  3. *
  4. * The proxy is a near-transparent stdio↔socket pipe. Once it has verified
  5. * the daemon's hello line (same major.minor.patch as ours), it does no
  6. * protocol parsing of its own: every byte the MCP host writes to the proxy's
  7. * stdin goes straight to the daemon socket, and every byte the daemon emits
  8. * goes straight to the host's stdout. Server-initiated JSON-RPC requests
  9. * (e.g. `roots/list`) flow through the same pipe transparently.
  10. *
  11. * Lifecycle expectations:
  12. * - The proxy exits when *either* stream closes (host stdin closed →
  13. * daemon socket end, or daemon-side socket close → host stdout end).
  14. * - Closing the socket on the proxy side is what tells the daemon to
  15. * decrement its connected-clients refcount.
  16. * - On a parent-process death we can't detect via stdin close (e.g. SIGKILL
  17. * of the MCP host), the proxy's PPID watchdog catches it — same logic
  18. * the direct-mode server uses; see issue #277.
  19. */
  20. import * as fs from 'fs';
  21. import * as net from 'net';
  22. import { HOST_PPID_ENV } from '../extraction/wasm-runtime-flags';
  23. import { DaemonClientHello, DaemonHello, MAX_HELLO_LINE_BYTES } from './daemon';
  24. import { supervisionLostReason } from './ppid-watchdog';
  25. import { CodeGraphPackageVersion } from './version';
  26. import { SERVER_INFO, PROTOCOL_VERSION } from './session';
  27. import { SERVER_INSTRUCTIONS } from './server-instructions';
  28. import { getStaticTools } from './tools';
  29. import type { MCPEngine } from './engine';
/**
 * Default poll cadence for the PPID watchdog, in milliseconds (same as the
 * direct server). `parsePollMs` falls back to this value when
 * `CODEGRAPH_PPID_POLL_MS` is unset, empty, non-finite, or negative.
 */
const DEFAULT_PPID_POLL_MS = 5000;
/**
 * Result of a `runProxy` attempt.
 *
 * NOTE(review): in this file `runProxy` calls `process.exit(0)` once the pipe
 * closes, so it only ever resolves with `fallback-needed`; `proxied` is part
 * of the type but is not produced by the code visible here.
 */
export interface ProxyResult {
  /**
   * `proxied` — successfully attached to a same-version daemon and piped
   * stdio. The proxy stays alive until either end closes.
   * `fallback-needed` — the daemon rejected us (version mismatch / unreachable
   * socket) and the caller should run the server in direct mode.
   */
  outcome: 'proxied' | 'fallback-needed';
  /**
   * Short human-readable cause, set alongside `fallback-needed`: e.g.
   * `'socket file missing'`, `'version mismatch'`, or the text of the error
   * raised while reading the daemon hello.
   */
  reason?: string;
}
  42. /**
  43. * Attempt to connect to the daemon at `socketPath` and pipe stdio through it.
  44. *
  45. * Returns a promise that resolves when either:
  46. * - the connection succeeded and one of stdin/socket has now closed
  47. * (after which the process should exit), or
  48. * - the connection failed early enough that the caller can still fall
  49. * back to direct mode.
  50. *
  51. * The `expectedVersion` param defaults to the package's own version — daemon
  52. * and proxy MUST match exactly. Mismatch resolves with
  53. * `outcome: 'fallback-needed'` so the caller can transparently start its own
  54. * server. (We accept the cost of two concurrent servers in this case as the
  55. * price of never silently running a stale daemon against newer client code.)
  56. */
  57. export async function runProxy(
  58. socketPath: string,
  59. expectedVersion: string = CodeGraphPackageVersion,
  60. ): Promise<ProxyResult> {
  61. // POSIX: refuse to connect to a stale socket file that points at no
  62. // listening process. `fs.existsSync` is a cheap pre-check; a real
  63. // ECONNREFUSED below catches the rare "exists but unbound" race.
  64. if (process.platform !== 'win32' && !fs.existsSync(socketPath)) {
  65. return { outcome: 'fallback-needed', reason: 'socket file missing' };
  66. }
  67. const socket = net.createConnection(socketPath);
  68. socket.setEncoding('utf8');
  69. const hello = await readHelloLine(socket).catch((err) => {
  70. socket.destroy();
  71. return new Error(String(err));
  72. });
  73. if (hello instanceof Error) {
  74. return { outcome: 'fallback-needed', reason: hello.message };
  75. }
  76. if (hello.codegraph !== expectedVersion) {
  77. process.stderr.write(
  78. `[CodeGraph MCP] Found a daemon on ${socketPath} but version (${hello.codegraph}) ` +
  79. `differs from ours (${expectedVersion}); falling back to direct mode.\n`
  80. );
  81. socket.destroy();
  82. return { outcome: 'fallback-needed', reason: 'version mismatch' };
  83. }
  84. process.stderr.write(
  85. `[CodeGraph MCP] Attached to shared daemon on ${socketPath} (pid ${hello.pid}, v${hello.codegraph}).\n`
  86. );
  87. sendClientHello(socket);
  88. startPpidWatchdog(socket);
  89. await pipeUntilClose(socket);
  90. // Host disconnected (or the daemon went away). The proxy's only job is the
  91. // pipe; exit now so we don't linger — process.stdin's 'data' listener would
  92. // otherwise keep the event loop alive and leave a zombie launcher behind.
  93. process.exit(0);
  94. }
  95. /**
  96. * Connect to a daemon at `socketPath` and verify its hello (exact version match).
  97. * Returns the live socket (hello already consumed) or null if unreachable / stale
  98. * / version-mismatched. Unlike {@link runProxy} it does NOT pipe — the caller
  99. * owns the socket. Used by the local-handshake proxy's background connect.
  100. */
  101. export async function connectWithHello(
  102. socketPath: string,
  103. expectedVersion: string = CodeGraphPackageVersion,
  104. ): Promise<net.Socket | 'version-mismatch' | null> {
  105. if (process.platform !== 'win32' && !fs.existsSync(socketPath)) return null;
  106. const socket = net.createConnection(socketPath);
  107. socket.setEncoding('utf8');
  108. const hello = await readHelloLine(socket).catch(() => null);
  109. if (!hello) {
  110. socket.destroy();
  111. return null; // no daemon yet — caller should keep polling
  112. }
  113. if (hello.codegraph !== expectedVersion) {
  114. // A daemon IS up but it's the wrong version — definitive, not a "not yet".
  115. // Don't poll; the caller serves in-process so we never run stale-vs-new.
  116. process.stderr.write(
  117. `[CodeGraph MCP] Found a daemon on ${socketPath} but version (${hello.codegraph}) ` +
  118. `differs from ours (${expectedVersion}); serving this session in-process.\n`
  119. );
  120. socket.destroy();
  121. return 'version-mismatch';
  122. }
  123. process.stderr.write(
  124. `[CodeGraph MCP] Attached to shared daemon on ${socketPath} (pid ${hello.pid}, v${hello.codegraph}).\n`
  125. );
  126. sendClientHello(socket);
  127. return socket;
  128. }
  129. /**
  130. * Tell the daemon our pids right after we verify its hello, so its liveness
  131. * sweep can reap this client if our process dies without the socket ever
  132. * signalling close (the Windows named-pipe hazard behind #692). Best-effort:
  133. * sent before any piped bytes so it's always the daemon's first line from us,
  134. * and a write failure here is harmless (the daemon just falls back to the
  135. * socket-close lifecycle). `hostPid` mirrors the PPID watchdog: the threaded
  136. * host pid if set, else our own parent (the host, on a no-relaunch bundle).
  137. */
  138. function sendClientHello(socket: net.Socket): void {
  139. const clientHello: DaemonClientHello = {
  140. codegraph_client: 1,
  141. pid: process.pid,
  142. hostPid: parseHostPpid(process.env[HOST_PPID_ENV]) ?? process.ppid,
  143. };
  144. try { socket.write(JSON.stringify(clientHello) + '\n'); } catch { /* best-effort */ }
  145. }
/** Loosely-typed JSON-RPC message: an object whose fields (`id`, `method`,
 * `params`, `result`, `error`, …) are narrowed at the point of use. */
type JsonRpc = Record<string, unknown>;
/** Dependencies the local-handshake proxy needs, injected by MCPServer (which
 * owns the daemon-spawn machinery and the engine factory). */
export interface LocalHandshakeDeps {
  /** Probe → spawn → retry → hello-verify; resolves a connected daemon socket,
   * or null when the daemon path is genuinely unavailable (→ in-process fallback).
   * A rejection is treated by `runLocalHandshakeProxy` the same as null. */
  getDaemonSocket(): Promise<net.Socket | null>;
  /** Lazily create an in-process engine — used ONLY if the daemon never comes up,
   * preserving the "a broken daemon never wedges a session" guarantee.
   * Called at most once per proxy session (the result is cached). */
  makeEngine(): MCPEngine;
  /** Project root for the fallback engine's lazy init (passed to
   * `engine.ensureInitialized`). */
  root: string;
}
  159. /**
  160. * Local-handshake proxy (the cold-start fix).
  161. *
  162. * Answers `initialize` + `tools/list` from STATIC constants the instant the
  163. * client asks — tools register in ~process-startup time instead of waiting
  164. * ~600ms for the daemon to spawn+bind, which is what produced the "No such tool
  165. * available" race that made headless agents flail into grep/Read. Tool CALLS are
  166. * forwarded to the shared daemon (connected in the background); the daemon's
  167. * response to the forwarded `initialize` is suppressed (the client already got
  168. * the local one). If the daemon never comes up (version mismatch / spawn fail),
  169. * a lazily-created in-process engine serves the calls — so the handshake speedup
  170. * never costs the old fall-back-to-direct robustness.
  171. */
  172. export async function runLocalHandshakeProxy(deps: LocalHandshakeDeps): Promise<void> {
  173. let daemonStatus: 'connecting' | 'ready' | 'failed' = 'connecting';
  174. let daemonSocket: net.Socket | null = null;
  175. let clientInitId: unknown = undefined; // suppress the daemon's reply to the forwarded initialize
  176. const pending: string[] = []; // client lines buffered until the daemon resolves
  177. let engine: MCPEngine | null = null;
  178. let engineReady: Promise<void> | null = null;
  179. let shuttingDown = false;
  180. const writeClient = (obj: JsonRpc | string): void => {
  181. try { process.stdout.write((typeof obj === 'string' ? obj : JSON.stringify(obj)) + '\n'); } catch { /* host gone */ }
  182. };
  183. const shutdown = (): void => {
  184. if (shuttingDown) return; shuttingDown = true;
  185. try { daemonSocket?.destroy(); } catch { /* ignore */ }
  186. try { engine?.stop(); } catch { /* ignore */ }
  187. process.exit(0);
  188. };
  189. const ensureEngine = (): Promise<void> => {
  190. if (!engine) engine = deps.makeEngine();
  191. if (!engineReady) engineReady = engine.ensureInitialized(deps.root).catch(() => { /* degraded */ });
  192. return engineReady;
  193. };
  194. // Daemon-unavailable fallback: serve a client message in-process.
  195. const handleLocally = async (line: string): Promise<void> => {
  196. let msg: JsonRpc; try { msg = JSON.parse(line) as JsonRpc; } catch { return; }
  197. const id = msg.id;
  198. if (msg.method === 'tools/call' && id !== undefined) {
  199. try {
  200. await ensureEngine();
  201. const params = (msg.params || {}) as { name: string; arguments?: Record<string, unknown> };
  202. const result = await engine!.getToolHandler().execute(params.name, params.arguments || {});
  203. writeClient({ jsonrpc: '2.0', id, result });
  204. } catch (err) {
  205. writeClient({ jsonrpc: '2.0', id, error: { code: -32603, message: err instanceof Error ? err.message : String(err) } });
  206. }
  207. } else if (msg.method === 'ping' && id !== undefined) {
  208. writeClient({ jsonrpc: '2.0', id, result: {} });
  209. }
  210. // initialize already answered locally; notifications (initialized) need no reply.
  211. };
  212. const routeToDaemon = (line: string): void => {
  213. if (daemonStatus === 'ready' && daemonSocket) {
  214. try { daemonSocket.write(line.endsWith('\n') ? line : line + '\n'); } catch { /* close path */ }
  215. } else if (daemonStatus === 'failed') {
  216. void handleLocally(line);
  217. } else {
  218. pending.push(line);
  219. }
  220. };
  221. // ---- client (stdin) ----
  222. let stdinBuf = '';
  223. process.stdin.setEncoding('utf8');
  224. process.stdin.on('data', (chunk: string) => {
  225. stdinBuf += chunk;
  226. let idx: number;
  227. while ((idx = stdinBuf.indexOf('\n')) !== -1) {
  228. const line = stdinBuf.slice(0, idx).trim();
  229. stdinBuf = stdinBuf.slice(idx + 1);
  230. if (!line) continue;
  231. let msg: JsonRpc; try { msg = JSON.parse(line) as JsonRpc; } catch { routeToDaemon(line); continue; }
  232. if (msg.method === 'initialize') {
  233. clientInitId = msg.id;
  234. writeClient({ jsonrpc: '2.0', id: msg.id, result: { protocolVersion: PROTOCOL_VERSION, capabilities: { tools: {} }, serverInfo: SERVER_INFO, instructions: SERVER_INSTRUCTIONS } });
  235. routeToDaemon(line); // prime the daemon so it resolves the project (its reply is suppressed below)
  236. } else if (msg.method === 'tools/list') {
  237. writeClient({ jsonrpc: '2.0', id: msg.id, result: { tools: getStaticTools() } });
  238. } else if (msg.method === 'resources/list') {
  239. // No resources exposed — answer the probe locally so it never reaches
  240. // the daemon as an unhandled method and logs `-32601`. (#621)
  241. writeClient({ jsonrpc: '2.0', id: msg.id, result: { resources: [] } });
  242. } else if (msg.method === 'resources/templates/list') {
  243. writeClient({ jsonrpc: '2.0', id: msg.id, result: { resourceTemplates: [] } });
  244. } else if (msg.method === 'prompts/list') {
  245. writeClient({ jsonrpc: '2.0', id: msg.id, result: { prompts: [] } });
  246. } else {
  247. routeToDaemon(line);
  248. }
  249. }
  250. });
  251. process.stdin.on('end', shutdown);
  252. process.stdin.on('close', shutdown);
  253. startPpidWatchdogNoSocket(shutdown);
  254. // ---- daemon connection (background) ----
  255. let socket: net.Socket | null = null;
  256. try { socket = await deps.getDaemonSocket(); } catch { socket = null; }
  257. if (socket && !shuttingDown) {
  258. daemonSocket = socket;
  259. daemonStatus = 'ready';
  260. let sockBuf = '';
  261. socket.setEncoding('utf8');
  262. socket.on('data', (chunk: string) => {
  263. sockBuf += chunk;
  264. let idx: number;
  265. while ((idx = sockBuf.indexOf('\n')) !== -1) {
  266. const line = sockBuf.slice(0, idx);
  267. sockBuf = sockBuf.slice(idx + 1);
  268. if (!line.trim()) continue;
  269. if (clientInitId !== undefined) {
  270. try { const m = JSON.parse(line) as JsonRpc; if (m.id === clientInitId && ('result' in m || 'error' in m)) continue; } catch { /* relay */ }
  271. }
  272. writeClient(line);
  273. }
  274. });
  275. socket.on('close', shutdown);
  276. socket.on('error', shutdown);
  277. for (const line of pending) { try { socket.write(line + '\n'); } catch { /* ignore */ } }
  278. pending.length = 0;
  279. } else if (!shuttingDown) {
  280. daemonStatus = 'failed';
  281. process.stderr.write('[CodeGraph MCP] Shared daemon unavailable; serving this session in-process (degraded).\n');
  282. const buffered = pending.splice(0);
  283. for (const line of buffered) await handleLocally(line);
  284. }
  285. await new Promise<void>(() => { /* stdin keeps the loop alive; exit via shutdown() */ });
  286. }
  287. /** PPID watchdog for the local-handshake proxy — same #277 logic as
  288. * {@link startPpidWatchdog} but with no socket to close (the caller's shutdown
  289. * handles teardown). */
  290. function startPpidWatchdogNoSocket(onDeath: () => void): void {
  291. const pollMs = parsePollMs(process.env.CODEGRAPH_PPID_POLL_MS);
  292. if (pollMs <= 0) return;
  293. const originalPpid = process.ppid;
  294. const hostPpid = parseHostPpid(process.env[HOST_PPID_ENV]);
  295. const timer = setInterval(() => {
  296. const reason = supervisionLostReason({
  297. originalPpid,
  298. currentPpid: process.ppid,
  299. hostPpid,
  300. isAlive: isProcessAliveLocal,
  301. });
  302. if (reason) {
  303. process.stderr.write(`[CodeGraph MCP] Parent process exited (${reason}); shutting down.\n`);
  304. onDeath();
  305. }
  306. }, pollMs);
  307. timer.unref?.();
  308. }
  309. /**
  310. * Read one CRLF/LF-terminated JSON line from the socket, parse it as the
  311. * daemon hello, and return it. Bounded to {@link MAX_HELLO_LINE_BYTES} so a
  312. * malicious or broken peer can't OOM us. Times out at 3s — a healthy daemon
  313. * sends hello immediately on accept.
  314. */
  315. function readHelloLine(socket: net.Socket): Promise<DaemonHello> {
  316. return new Promise((resolve, reject) => {
  317. let buffer = '';
  318. const cleanup = () => {
  319. socket.removeListener('data', onData);
  320. socket.removeListener('error', onError);
  321. socket.removeListener('close', onClose);
  322. clearTimeout(timer);
  323. };
  324. const onData = (chunk: string | Buffer) => {
  325. buffer += typeof chunk === 'string' ? chunk : chunk.toString('utf8');
  326. const idx = buffer.indexOf('\n');
  327. if (idx === -1) {
  328. if (buffer.length > MAX_HELLO_LINE_BYTES) {
  329. cleanup();
  330. reject(new Error('daemon hello line exceeded size limit'));
  331. }
  332. return;
  333. }
  334. const line = buffer.slice(0, idx);
  335. // Re-emit anything past the newline so the pipe-stage sees it.
  336. const tail = buffer.slice(idx + 1);
  337. cleanup();
  338. if (tail.length > 0) {
  339. // Push back via unshift — Node's net.Socket supports it on readable streams.
  340. socket.unshift(tail);
  341. }
  342. try {
  343. const parsed = JSON.parse(line) as DaemonHello;
  344. if (typeof parsed.codegraph !== 'string' || typeof parsed.pid !== 'number') {
  345. reject(new Error('daemon hello missing required fields'));
  346. return;
  347. }
  348. resolve(parsed);
  349. } catch (err) {
  350. reject(new Error(`daemon hello not JSON: ${err instanceof Error ? err.message : String(err)}`));
  351. }
  352. };
  353. const onError = (err: Error) => { cleanup(); reject(err); };
  354. const onClose = () => { cleanup(); reject(new Error('daemon closed connection before hello')); };
  355. const timer = setTimeout(() => {
  356. cleanup();
  357. reject(new Error('timed out waiting for daemon hello'));
  358. }, 3000);
  359. timer.unref?.();
  360. socket.on('data', onData);
  361. socket.on('error', onError);
  362. socket.on('close', onClose);
  363. });
  364. }
  365. /**
  366. * Pipe stdin → socket and socket → stdout. Resolves once either end closes
  367. * so the process can exit. Note: we deliberately do NOT use
  368. * `process.stdin.pipe(socket)` because pipe propagates 'end' onto the
  369. * downstream, which would close the socket prematurely if stdin happens to
  370. * end early — the MCP spec allows it to stay open across reconnects.
  371. */
  372. function pipeUntilClose(socket: net.Socket): Promise<void> {
  373. return new Promise((resolve) => {
  374. let resolved = false;
  375. const done = () => { if (!resolved) { resolved = true; resolve(); } };
  376. process.stdin.on('data', (chunk) => {
  377. try { socket.write(chunk); } catch { /* socket may have errored — close path catches it */ }
  378. });
  379. process.stdin.on('end', () => {
  380. try { socket.end(); } catch { /* ignore */ }
  381. done();
  382. });
  383. process.stdin.on('close', () => {
  384. try { socket.destroy(); } catch { /* ignore */ }
  385. done();
  386. });
  387. socket.on('data', (chunk) => {
  388. try { process.stdout.write(chunk); } catch { /* ignore */ }
  389. });
  390. socket.on('end', () => done());
  391. socket.on('close', () => done());
  392. socket.on('error', (err) => {
  393. process.stderr.write(`[CodeGraph MCP] daemon socket error: ${err.message}\n`);
  394. done();
  395. });
  396. });
  397. }
  398. /**
  399. * PPID watchdog mirroring the one in `MCPServer.start` — kills the proxy if
  400. * the MCP host (or its proxy of a host, see HOST_PPID_ENV) goes away without
  401. * closing stdin. Issue #277 documents why we can't rely on stdin EOF on
  402. * Linux: the parent may be SIGKILL'd and reparenting doesn't close pipes.
  403. *
  404. * The proxy's "kill" is just a socket close + process.exit — no SQLite or
  405. * watchers to clean up, so this is cheap.
  406. */
  407. function startPpidWatchdog(socket: net.Socket): void {
  408. const pollMs = parsePollMs(process.env.CODEGRAPH_PPID_POLL_MS);
  409. if (pollMs <= 0) return;
  410. const originalPpid = process.ppid;
  411. const hostPpid = parseHostPpid(process.env[HOST_PPID_ENV]);
  412. const timer = setInterval(() => {
  413. const reason = supervisionLostReason({
  414. originalPpid,
  415. currentPpid: process.ppid,
  416. hostPpid,
  417. isAlive: isProcessAliveLocal,
  418. });
  419. if (reason) {
  420. process.stderr.write(`[CodeGraph MCP] Parent process exited (${reason}); shutting down.\n`);
  421. try { socket.destroy(); } catch { /* ignore */ }
  422. process.exit(0);
  423. }
  424. }, pollMs);
  425. timer.unref?.();
  426. }
  427. function parsePollMs(raw: string | undefined): number {
  428. if (raw === undefined || raw === '') return DEFAULT_PPID_POLL_MS;
  429. const parsed = Number(raw);
  430. if (!Number.isFinite(parsed)) return DEFAULT_PPID_POLL_MS;
  431. if (parsed < 0) return DEFAULT_PPID_POLL_MS;
  432. return Math.floor(parsed);
  433. }
  434. function parseHostPpid(raw: string | undefined): number | null {
  435. if (raw === undefined || raw === '') return null;
  436. const parsed = Number(raw);
  437. if (!Number.isInteger(parsed) || parsed <= 1) return null;
  438. return parsed;
  439. }
  440. function isProcessAliveLocal(pid: number): boolean {
  441. try {
  442. process.kill(pid, 0);
  443. return true;
  444. } catch (err: unknown) {
  445. const e = err as NodeJS.ErrnoException;
  446. if (e.code === 'EPERM') return true;
  447. return false;
  448. }
  449. }