transport.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. /**
  2. * MCP JSON-RPC Transports
  3. *
  4. * Two flavors share the same wire format (newline-delimited JSON-RPC 2.0):
  5. *
  6. * - `StdioTransport` — original transport; reads/writes the process's
  7. * stdin/stdout. Used by direct-mode MCP servers.
  8. * - `SocketTransport` — wraps a single `net.Socket`. Used by the shared-daemon
  9. * architecture (see {@link ./daemon}) to multiplex multiple MCP clients onto
  10. * one CodeGraph instance via per-connection sessions.
  11. *
  12. * Both implement {@link JsonRpcTransport} so the session-level protocol logic
  13. * (initialize / tools/list / tools/call, plus server-initiated `roots/list`)
  14. * is identical regardless of where the bytes come from.
  15. */
  16. import * as readline from 'readline';
  17. import type { Socket } from 'net';
  /**
   * JSON-RPC 2.0 request message: a method call tagged with an `id`, which is
   * what distinguishes it from a {@link JsonRpcNotification}.
   */
  export interface JsonRpcRequest {
    /** Protocol version marker; always the literal `'2.0'`. */
    jsonrpc: '2.0';
    /** Correlation id for this call. */
    id: string | number;
    /** Name of the method being invoked. */
    method: string;
    /** Optional method arguments; left untyped at the transport layer. */
    params?: unknown;
  }
  /**
   * JSON-RPC 2.0 response message. Carries either a `result` or an `error`
   * alongside the `id` of the request it answers.
   */
  export interface JsonRpcResponse {
    /** Protocol version marker; always the literal `'2.0'`. */
    jsonrpc: '2.0';
    /** Id of the request being answered; may be `null`. */
    id: string | number | null;
    /** Payload of a successful call. */
    result?: unknown;
    /** Failure details of an unsuccessful call. */
    error?: JsonRpcError;
  }
  /**
   * JSON-RPC 2.0 error object, embedded in a response's `error` field.
   */
  export interface JsonRpcError {
    /** Numeric error code (see `ErrorCodes` for the standard ones). */
    code: number;
    /** Short human-readable description of the failure. */
    message: string;
    /** Optional extra detail attached by the sender. */
    data?: unknown;
  }
  /**
   * JSON-RPC 2.0 notification: a one-way message. It carries no `id`, and no
   * response is expected for it.
   */
  export interface JsonRpcNotification {
    /** Protocol version marker; always the literal `'2.0'`. */
    jsonrpc: '2.0';
    /** Name of the notification being delivered. */
    method: string;
    /** Optional payload; left untyped at the transport layer. */
    params?: unknown;
  }
  /**
   * Standard JSON-RPC 2.0 error codes, exposed as a readonly lookup table so
   * call sites can refer to them by name.
   */
  export const ErrorCodes = {
    /** The received text was not valid JSON. */
    ParseError: -32700,
    /** The JSON was valid but is not a well-formed request object. */
    InvalidRequest: -32600,
    /** The requested method does not exist or is unavailable. */
    MethodNotFound: -32601,
    /** The method parameters are invalid. */
    InvalidParams: -32602,
    /** Internal error while processing the message. */
    InternalError: -32603,
  } as const;
  /**
   * Callback a transport invokes for every inbound request or notification.
   * It returns a promise that settles once the message has been handled.
   */
  export type MessageHandler = (
    message: JsonRpcRequest | JsonRpcNotification,
  ) => Promise<void>;
  /**
   * Carrier-agnostic JSON-RPC transport contract, shared by the stdio and socket
   * implementations. Session-level code (initialize, tool dispatch, etc.) is
   * written against this interface rather than a concrete transport class.
   */
  export interface JsonRpcTransport {
    /** Begin delivering inbound requests/notifications to `handler`. */
    start(handler: MessageHandler): void;

    /** Shut the transport down. */
    stop(): void;

    /** Send a fully-formed response object. */
    send(response: JsonRpcResponse): void;

    /** Send a notification (no `id`, no reply expected). */
    notify(method: string, params?: unknown): void;

    /**
     * Issue a request to the peer and resolve with its reply.
     * `timeoutMs` optionally bounds how long to wait.
     */
    request(method: string, params?: unknown, timeoutMs?: number): Promise<unknown>;

    /** Convenience wrapper: send a success response for request `id`. */
    sendResult(id: string | number, result: unknown): void;

    /** Convenience wrapper: send an error response for request `id`. */
    sendError(id: string | number | null, code: number, message: string, data?: unknown): void;
  }
  /**
   * Shared implementation of newline-delimited JSON-RPC 2.0. Subclasses supply
   * the byte carrier by implementing `write()` (outbound lines) and feeding
   * inbound lines to `handleLine()`; everything else — framing, request/response
   * correlation, validation, error replies — lives here.
   */
  abstract class LineBasedJsonRpcTransport implements JsonRpcTransport {
    /** Receiver for inbound requests/notifications; subclasses set it in `start()`. */
    protected messageHandler: MessageHandler | null = null;

    // Outstanding server-initiated requests (e.g. roots/list), keyed by the id
    // we sent. Responses from the client are matched back here.
    protected pending = new Map<string | number, {
      resolve: (value: unknown) => void;
      reject: (error: Error) => void;
    }>();

    /** Monotonic counter used to build unique outbound request ids. */
    protected nextRequestId = 1;

    /** Set once the transport has been shut down; no new requests are issued after that. */
    protected stopped = false;

    abstract start(handler: MessageHandler): void;

    /** Emit one serialized message; the implementation appends the newline. */
    protected abstract write(line: string): void;

    /** Prefix for outbound request ids. */
    protected abstract idPrefix(): string;

    abstract stop(): void;

    /**
     * Send a server-initiated request to the client and await its response.
     *
     * MCP is bidirectional: the server can ask the client questions too. We use
     * this for `roots/list` — the spec-blessed way to learn the workspace root
     * when the client didn't pass one in `initialize` (see issue #196).
     *
     * @param method    JSON-RPC method name to invoke on the client.
     * @param params    Optional parameters; omitted from the wire when undefined.
     * @param timeoutMs How long to wait for the reply before rejecting.
     * @returns A promise resolving with the response's `result`. It rejects when
     *   the client answers with an error (the `Error` carries the JSON-RPC `code`
     *   and `data` when present), on timeout, when the message cannot be written,
     *   or immediately when the transport is already stopped — so callers can
     *   fall back rather than hang.
     */
    request(method: string, params?: unknown, timeoutMs = 5000): Promise<unknown> {
      // After shutdown nobody will ever answer; fail fast instead of writing to
      // a dead stream and making the caller sit through the full timeout.
      if (this.stopped) {
        return Promise.reject(new Error(`Cannot send "${method}" request: transport stopped`));
      }

      const id = `${this.idPrefix()}-${this.nextRequestId++}`;
      return new Promise<unknown>((resolve, reject) => {
        const timer = setTimeout(() => {
          this.pending.delete(id);
          reject(new Error(`Timed out after ${timeoutMs}ms waiting for "${method}" response`));
        }, timeoutMs);
        // Don't let a pending request keep the process alive on shutdown.
        timer.unref?.();

        this.pending.set(id, {
          resolve: (value) => { clearTimeout(timer); resolve(value); },
          reject: (error) => { clearTimeout(timer); reject(error); },
        });

        try {
          this.write(JSON.stringify({ jsonrpc: '2.0', id, method, params }));
        } catch (err) {
          // Serialization or write failed: the request never left, so release
          // the bookkeeping now rather than leaving it until the timer fires.
          clearTimeout(timer);
          this.pending.delete(id);
          reject(err instanceof Error ? err : new Error(String(err)));
        }
      });
    }

    /** Serialize and send a complete response object. */
    send(response: JsonRpcResponse): void {
      this.write(JSON.stringify(response));
    }

    /** Send a notification; `params` is omitted from the wire when undefined. */
    notify(method: string, params?: unknown): void {
      const notification: JsonRpcNotification = { jsonrpc: '2.0', method, params };
      this.write(JSON.stringify(notification));
    }

    /** Send a success response for request `id`. */
    sendResult(id: string | number, result: unknown): void {
      this.send({ jsonrpc: '2.0', id, result });
    }

    /** Send an error response; `id` is `null` when the offending request's id is unknown. */
    sendError(id: string | number | null, code: number, message: string, data?: unknown): void {
      this.send({ jsonrpc: '2.0', id, error: { code, message, data } });
    }

    /**
     * Fail any in-flight server-initiated requests so their awaiters don't hang.
     * Called from `stop()` in subclasses.
     */
    protected rejectPending(reason: string): void {
      for (const { reject } of this.pending.values()) {
        reject(new Error(reason));
      }
      this.pending.clear();
    }

    /**
     * Handle an incoming line of JSON. Both transports feed lines here.
     *
     * Blank lines are ignored. Unparseable JSON and structurally invalid
     * messages are answered with an error response carrying a `null` id.
     * Responses to our own requests are routed to the awaiting `request()`
     * call; everything else goes to the message handler.
     */
    protected async handleLine(line: string): Promise<void> {
      const trimmed = line.trim();
      if (!trimmed) return;

      let parsed: unknown;
      try {
        parsed = JSON.parse(trimmed);
      } catch {
        this.sendError(null, ErrorCodes.ParseError, 'Parse error: invalid JSON');
        return;
      }

      // Response to a server-initiated request (has id + result/error, no method).
      // Route it to the awaiting requester instead of the message handler — these
      // used to be dropped as "Invalid Request" because they carry no method.
      const obj = parsed as Record<string, unknown>;
      if (
        obj?.jsonrpc === '2.0' &&
        typeof obj.method !== 'string' &&
        'id' in obj &&
        ('result' in obj || 'error' in obj)
      ) {
        this.handleResponse(obj);
        return;
      }

      // Validate basic JSON-RPC structure
      if (!this.isValidMessage(parsed)) {
        this.sendError(null, ErrorCodes.InvalidRequest, 'Invalid Request: not a valid JSON-RPC 2.0 message');
        return;
      }

      if (!this.messageHandler) return;

      const message = parsed;
      try {
        await this.messageHandler(message);
      } catch (err) {
        // Only requests get an error reply; a notification has no id to answer,
        // so a failure while handling one is intentionally not reported back.
        if ('id' in message) {
          this.sendError(
            message.id,
            ErrorCodes.InternalError,
            `Internal error: ${err instanceof Error ? err.message : String(err)}`
          );
        }
      }
    }

    /**
     * Resolve (or reject) the pending server-initiated request matching this
     * response's id. Unknown ids are ignored — the client may echo something we
     * never sent, or a request may have already timed out.
     *
     * On an error response the rejection `Error` additionally carries the
     * JSON-RPC `code` (when numeric) and `data` (when present), so callers can
     * tell e.g. "method not found" apart from other failures.
     */
    private handleResponse(msg: Record<string, unknown>): void {
      const id = msg.id as string | number;
      const waiter = this.pending.get(id);
      if (!waiter) return;
      this.pending.delete(id);

      if ('error' in msg && msg.error) {
        const details = msg.error as { code?: unknown; message?: string; data?: unknown };
        const failure = new Error(details.message || 'Request failed');
        const extras: { code?: number; data?: unknown } = {};
        if (typeof details.code === 'number') extras.code = details.code;
        if (details.data !== undefined) extras.data = details.data;
        waiter.reject(Object.assign(failure, extras));
        return;
      }
      waiter.resolve(msg.result);
    }

    /**
     * Check whether `msg` is a JSON-RPC 2.0 request or notification: a non-null
     * object with `jsonrpc === '2.0'` and a string `method`.
     */
    private isValidMessage(msg: unknown): msg is JsonRpcRequest | JsonRpcNotification {
      if (typeof msg !== 'object' || msg === null) return false;
      const candidate = msg as Record<string, unknown>;
      return candidate.jsonrpc === '2.0' && typeof candidate.method === 'string';
    }
  }
  /**
   * Construction options for {@link StdioTransport}. Both fields are optional.
   */
  export interface StdioTransportOptions {
    /**
     * Whether the transport calls `process.exit(0)` once stdin closes.
     *
     * Defaults to `true`, matching the original single-process behavior callers
     * rely on. Pass `false` in shared-daemon mode, where the stdio "session" is
     * just *one* of many clients and losing it shouldn't drag the daemon down.
     */
    exitOnClose?: boolean;

    /**
     * Optional hook invoked when the stdin stream closes. The daemon uses it to
     * decrement its connected-clients refcount.
     */
    onClose?: () => void;
  }
  /**
   * Stdio Transport for MCP
   *
   * Reads JSON-RPC messages from stdin and writes responses to stdout. Used by
   * the direct (single-process) MCP server path, where the MCP host launches
   * one server per session and talks to it over the child's stdio. Also used by
   * shared-daemon mode for the launcher's session (with `exitOnClose: false`)
   * so the daemon outlives its launcher.
   */
  export class StdioTransport extends LineBasedJsonRpcTransport {
    private rl: readline.Interface | null = null;
    private opts: Required<StdioTransportOptions>;

    /**
     * @param opts Optional behavior switches; `exitOnClose` defaults to `true`
     *   and `onClose` to a no-op.
     */
    constructor(opts: StdioTransportOptions = {}) {
      super();
      this.opts = {
        exitOnClose: opts.exitOnClose ?? true,
        onClose: opts.onClose ?? (() => { /* no-op */ }),
      };
    }

    /**
     * Start reading newline-delimited messages from stdin and dispatching them
     * to `handler`.
     *
     * When stdin closes, outstanding server-initiated requests are rejected,
     * the `onClose` callback runs, and — if `exitOnClose` is set — the process
     * exits with code 0.
     */
    start(handler: MessageHandler): void {
      this.messageHandler = handler;
      this.rl = readline.createInterface({
        input: process.stdin,
        output: process.stdout,
        terminal: false,
      });

      this.rl.on('line', (line) => {
        // Lines are dispatched without waiting on earlier ones. Catch failures
        // here so a rejected handleLine() can't surface as an unhandled
        // rejection and take the process down.
        this.handleLine(line).catch((err: unknown) => {
          const detail = err instanceof Error ? err.message : String(err);
          process.stderr.write(`[CodeGraph] failed to handle stdin message: ${detail}\n`);
        });
      });

      this.rl.on('close', () => {
        // With stdin gone the client can never answer an outstanding
        // server-initiated request; fail them now instead of letting each one
        // wait out its timeout (matters when exitOnClose is false).
        this.stopped = true;
        this.rejectPending('Stdin closed');

        // A misbehaving callback must not prevent the exit below.
        try {
          this.opts.onClose();
        } catch (err) {
          const detail = err instanceof Error ? err.message : String(err);
          process.stderr.write(`[CodeGraph] stdio onClose handler failed: ${detail}\n`);
        }

        if (this.opts.exitOnClose) {
          process.exit(0);
        }
      });
    }

    /**
     * Stop the transport: reject in-flight server-initiated requests and close
     * the readline interface. Idempotent. Closing the interface emits its
     * `'close'` event, so the close handling installed by `start()` (including
     * the optional process exit) runs as part of this call.
     */
    stop(): void {
      if (this.stopped) return;
      this.stopped = true;
      this.rejectPending('Transport stopped');
      if (this.rl) {
        this.rl.close();
        this.rl = null;
      }
    }

    /** Write one serialized message to stdout, newline-terminated. */
    protected write(line: string): void {
      process.stdout.write(line + '\n');
    }

    /** Id prefix for server-initiated requests sent over stdio. */
    protected idPrefix(): string {
      return 'cg-srv';
    }
  }
  /**
   * Socket Transport for MCP daemon sessions.
   *
   * Wraps a single `net.Socket` (Unix domain socket on POSIX, named pipe on
   * Windows). One instance per connected MCP client. Unlike {@link StdioTransport},
   * `stop()` and stream-close *don't* call `process.exit` — a daemon-side session
   * ending must not bring down the whole daemon.
   */
  export class SocketTransport extends LineBasedJsonRpcTransport {
    /** Bytes received so far that do not yet form a complete line. */
    private buffer = '';
    private closeHandlers: Array<() => void> = [];
    /**
     * Tracks whether close handlers have run. Kept separate from `stopped`
     * because a local `stop()` sets `stopped` *before* the socket's `'close'`
     * event arrives, and the handlers must still fire for that event.
     */
    private closeNotified = false;

    /**
     * @param socket Connected socket to speak JSON-RPC over.
     * @param prefix Prefix for ids of server-initiated requests on this connection.
     */
    constructor(private socket: Socket, private prefix: string = 'cg-sock') {
      super();
    }

    /**
     * Register a callback fired exactly once when the socket closes (from either
     * side). Used by the daemon to decrement its connected-clients refcount.
     */
    onClose(handler: () => void): void {
      this.closeHandlers.push(handler);
    }

    /**
     * Start reading newline-delimited messages from the socket and dispatching
     * them to `handler`. Also wires up close/error handling.
     */
    start(handler: MessageHandler): void {
      this.messageHandler = handler;
      this.socket.setEncoding('utf8');

      this.socket.on('data', (chunk: string) => {
        this.buffer += chunk;
        let idx;
        // Drain every complete line; tail-fragment stays in the buffer for the
        // next chunk. The handler is async but we don't await it here — JSON-RPC
        // permits out-of-order responses, and serializing here would deadlock if
        // a handler issued a server-initiated request that needed a *later* line
        // to arrive (e.g. roots/list mid-tools-call).
        while ((idx = this.buffer.indexOf('\n')) !== -1) {
          const line = this.buffer.slice(0, idx);
          this.buffer = this.buffer.slice(idx + 1);
          // Not awaited (see above), but a rejection must not become an
          // unhandled rejection that could bring down the daemon.
          this.handleLine(line).catch((err: unknown) => {
            const detail = err instanceof Error ? err.message : String(err);
            process.stderr.write(`[CodeGraph daemon] failed to handle message: ${detail}\n`);
          });
        }
      });

      this.socket.on('close', () => this.handleSocketClose());

      this.socket.on('error', (err) => {
        // Don't crash the daemon over a broken pipe; just shut this connection.
        process.stderr.write(`[CodeGraph daemon] socket error: ${err.message}\n`);
        this.handleSocketClose();
      });
    }

    /**
     * Stop the transport: reject in-flight server-initiated requests and tear
     * down the socket. Idempotent. Close handlers registered via `onClose()`
     * run when the socket's `'close'` event subsequently fires.
     */
    stop(): void {
      if (this.stopped) return;
      this.stopped = true;
      this.rejectPending('Transport stopped');
      if (!this.socket.destroyed) {
        this.socket.end();
        this.socket.destroy();
      }
    }

    /**
     * Write a one-shot line directly to the socket (no JSON-RPC framing applied
     * by this class — caller produces the line). The daemon uses this for the
     * hello/handshake line that precedes the JSON-RPC stream. A trailing newline
     * is appended only if the line lacks one; nothing is written once the socket
     * is destroyed.
     */
    writeRaw(line: string): void {
      if (!this.socket.destroyed) {
        this.socket.write(line.endsWith('\n') ? line : line + '\n');
      }
    }

    /** Write one serialized message, newline-terminated; a no-op once the socket is destroyed. */
    protected write(line: string): void {
      if (!this.socket.destroyed) {
        this.socket.write(line + '\n');
      }
    }

    /** Id prefix for server-initiated requests, as passed to the constructor. */
    protected idPrefix(): string {
      return this.prefix;
    }

    /**
     * React to the socket closing or erroring. Marks the transport stopped and
     * rejects pending requests (unless `stop()` already did), then runs the
     * registered close handlers — at most once, even though an `'error'` is
     * followed by a `'close'` event, and even when the close was triggered by
     * a local `stop()`.
     */
    private handleSocketClose(): void {
      if (!this.stopped) {
        this.stopped = true;
        this.rejectPending('Socket closed');
      }

      if (this.closeNotified) return;
      this.closeNotified = true;

      const handlers = this.closeHandlers;
      this.closeHandlers = [];
      for (const h of handlers) {
        try { h(); } catch { /* never let a close-handler take the daemon down */ }
      }
    }
  }