فهرست منبع

feat(mcp): worker-thread liveness watchdog to self-kill a wedged main thread (#856)

Belt-and-suspenders follow-up to #855. Any non-yielding sync loop on the main
thread wedges the event loop, and nothing running on that loop (timers, signal
handlers, PPID watchdog) can recover it — only another thread can.

A tiny worker thread (in the detached daemon + direct modes) watches a
shared-memory heartbeat the main thread bumps each event-loop turn; if it stops
advancing across enough consecutive checks (~CODEGRAPH_WATCHDOG_TIMEOUT_MS,
default 60s) the worker SIGKILLs the process so a fresh daemon starts on the next
connection. Counts consecutive stale checks (not wall-clock) so it's immune to
clock jumps / sleep; tuned never to fire on real work; opt out with
CODEGRAPH_NO_WATCHDOG=1.
Colby Mchenry 1 هفته پیش
والد
کامیت
576149e062
4فایلهای تغییر یافته به همراه340 افزوده شده و 0 حذف شده
  1. 4 0
      CHANGELOG.md
  2. 148 0
      __tests__/liveness-watchdog.test.ts
  3. 13 0
      src/mcp/index.ts
  4. 175 0
      src/mcp/liveness-watchdog.ts

+ 4 - 0
CHANGELOG.md

@@ -9,6 +9,10 @@ and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
+### New Features
+
+- The CodeGraph MCP server now self-heals if its main thread ever locks up. A lightweight watchdog notices when the process has stopped responding and stops it so a fresh one starts on your next request — it can no longer sit pinned at 100% CPU with no way to recover. Tune the detection window with `CODEGRAPH_WATCHDOG_TIMEOUT_MS`, or turn it off entirely with `CODEGRAPH_NO_WATCHDOG=1`. (#850)
+
 ### Fixes
 
 - The CodeGraph MCP server no longer risks getting stuck at 100% CPU after an unexpected internal error. Previously such an error was logged but the process was left running in a broken state, where it could spin a CPU core indefinitely and had to be killed by hand. The server now logs the error and exits cleanly, so a fresh one starts on the next request. Thanks @songhlc. (#850)

+ 148 - 0
__tests__/liveness-watchdog.test.ts

@@ -0,0 +1,148 @@
+import { describe, it, expect, beforeAll } from 'vitest';
+import { spawn } from 'child_process';
+import * as fs from 'fs';
+import * as path from 'path';
+import {
+  stepHeartbeat,
+  parseWatchdogTimeoutMs,
+  deriveCheckIntervalMs,
+  installMainThreadWatchdog,
+  DEFAULT_WATCHDOG_TIMEOUT_MS,
+} from '../src/mcp/liveness-watchdog';
+
+describe('stepHeartbeat (wedge-detection reducer)', () => {
+  it('resets the stale count when the counter advances', () => {
+    const r = stepHeartbeat({ lastCounter: 5, staleChecks: 3 }, 6, 4);
+    expect(r.wedged).toBe(false);
+    expect(r.next).toEqual({ lastCounter: 6, staleChecks: 0 });
+  });
+
+  it('accumulates stale checks while the counter is frozen', () => {
+    let s = { lastCounter: 9, staleChecks: 0 };
+    for (let i = 1; i < 4; i++) {
+      const r = stepHeartbeat(s, 9, 4);
+      expect(r.wedged).toBe(false);
+      expect(r.next.staleChecks).toBe(i);
+      s = r.next;
+    }
+  });
+
+  it('reports wedged once the stale count reaches the threshold', () => {
+    const r = stepHeartbeat({ lastCounter: 9, staleChecks: 3 }, 9, 4);
+    expect(r.wedged).toBe(true);
+  });
+
+  it('a single late heartbeat rescues the process (sleep/clock-jump safety)', () => {
+    // 3 stale checks, then progress (as if the main thread resumed after a
+    // system sleep) — must NOT be considered wedged.
+    let s = { lastCounter: 1, staleChecks: 0 };
+    s = stepHeartbeat(s, 1, 4).next; // stale 1
+    s = stepHeartbeat(s, 1, 4).next; // stale 2
+    s = stepHeartbeat(s, 1, 4).next; // stale 3
+    const resumed = stepHeartbeat(s, 2, 4); // counter advanced
+    expect(resumed.wedged).toBe(false);
+    expect(resumed.next.staleChecks).toBe(0);
+  });
+});
+
+describe('config parsing', () => {
+  it('parseWatchdogTimeoutMs falls back for missing/invalid input', () => {
+    expect(parseWatchdogTimeoutMs(undefined)).toBe(DEFAULT_WATCHDOG_TIMEOUT_MS);
+    expect(parseWatchdogTimeoutMs('not-a-number')).toBe(DEFAULT_WATCHDOG_TIMEOUT_MS);
+    expect(parseWatchdogTimeoutMs('0')).toBe(DEFAULT_WATCHDOG_TIMEOUT_MS);
+    expect(parseWatchdogTimeoutMs('-5')).toBe(DEFAULT_WATCHDOG_TIMEOUT_MS);
+    expect(parseWatchdogTimeoutMs('1500')).toBe(1500);
+  });
+
+  it('deriveCheckIntervalMs stays within [50, 2000] and scales with the timeout', () => {
+    expect(deriveCheckIntervalMs(60_000)).toBe(2000); // clamped high
+    expect(deriveCheckIntervalMs(500)).toBe(100); // 500/5
+    expect(deriveCheckIntervalMs(10)).toBe(50); // clamped low
+  });
+});
+
+describe('installMainThreadWatchdog opt-out', () => {
+  it('returns null (no worker) when CODEGRAPH_NO_WATCHDOG is set', () => {
+    const prev = process.env.CODEGRAPH_NO_WATCHDOG;
+    process.env.CODEGRAPH_NO_WATCHDOG = '1';
+    try {
+      expect(installMainThreadWatchdog()).toBeNull();
+    } finally {
+      if (prev === undefined) delete process.env.CODEGRAPH_NO_WATCHDOG;
+      else process.env.CODEGRAPH_NO_WATCHDOG = prev;
+    }
+  });
+});
+
+/**
+ * End-to-end: spawn a real process, install the real worker, and prove it kills
+ * a wedged main thread (and ONLY a wedged one). Drives the built module the same
+ * way mcp-ppid-watchdog.test.ts drives the built CLI.
+ */
+describe('liveness watchdog (spawned, real worker)', () => {
+  const MODULE = path.resolve(__dirname, '../dist/mcp/liveness-watchdog.js');
+
+  beforeAll(() => {
+    if (!fs.existsSync(MODULE)) {
+      throw new Error(`Build the project first: ${MODULE} is missing (run npm run build).`);
+    }
+  });
+
+  function runChild(
+    env: Record<string, string>,
+    body: string,
+    hardTimeoutMs: number
+  ): Promise<{ code: number | null; signal: NodeJS.Signals | 'TIMEOUT' | null }> {
+    const src = `
+      const { installMainThreadWatchdog } = require(${JSON.stringify(MODULE)});
+      installMainThreadWatchdog();
+      ${body}
+    `;
+    const child = spawn(process.execPath, ['-e', src], {
+      env: { ...process.env, ...env },
+      stdio: ['ignore', 'ignore', 'ignore'],
+    });
+    return new Promise((resolve) => {
+      const timer = setTimeout(() => {
+        child.kill('SIGKILL');
+        resolve({ code: null, signal: 'TIMEOUT' });
+      }, hardTimeoutMs);
+      child.on('exit', (code, signal) => {
+        clearTimeout(timer);
+        resolve({ code, signal });
+      });
+    });
+  }
+
+  it('SIGKILLs a process whose main thread wedges in a sync loop', async () => {
+    const { signal } = await runChild(
+      { CODEGRAPH_WATCHDOG_TIMEOUT_MS: '500' },
+      'setTimeout(() => { while (true) {} }, 150);', // wedge the event loop forever
+      8000
+    );
+    expect(signal).toBe('SIGKILL');
+  }, 12000);
+
+  it('does NOT kill a healthy process that keeps its event loop turning', async () => {
+    const { code, signal } = await runChild(
+      { CODEGRAPH_WATCHDOG_TIMEOUT_MS: '500' },
+      // Stay responsive for 1.5s (3× the timeout), then exit cleanly with 7.
+      'const iv = setInterval(() => {}, 50); setTimeout(() => { clearInterval(iv); process.exit(7); }, 1500);',
+      8000
+    );
+    expect(signal).toBeNull(); // never signalled
+    expect(code).toBe(7); // exited on its own terms
+  }, 12000);
+
+  it('does NOT kill a wedged process when CODEGRAPH_NO_WATCHDOG=1', async () => {
+    const { signal } = await runChild(
+      { CODEGRAPH_WATCHDOG_TIMEOUT_MS: '500', CODEGRAPH_NO_WATCHDOG: '1' },
+      // Wedge briefly, but the test's hard timeout reaps it (the watchdog must not).
+      'setTimeout(() => { const end = Date.now() + 1500; while (Date.now() < end) {} process.exit(3); }, 150);',
+      8000
+    );
+    // Killed by neither the watchdog (disabled) nor the hard timeout — it ran
+    // its bounded busy-loop and exited 3 on its own.
+    expect(signal).toBeNull();
+  }, 12000);
+});

+ 13 - 0
src/mcp/index.ts

@@ -51,6 +51,7 @@ import { connectWithHello, runLocalHandshakeProxy } from './proxy';
 import { getDaemonSocketPath } from './daemon-paths';
 import { getTelemetry } from '../telemetry';
 import { supervisionLostReason } from './ppid-watchdog';
+import { installMainThreadWatchdog, WatchdogHandle } from './liveness-watchdog';
 import { treatStdinFailureAsShutdown } from './stdin-teardown';
 import { HOST_PPID_ENV } from '../extraction/wasm-runtime-flags';
 
@@ -220,6 +221,9 @@ export class MCPServer {
   private engine: MCPEngine | null = null;
   private daemon: Daemon | null = null;
   private ppidWatchdog: ReturnType<typeof setInterval> | null = null;
+  // Worker-thread liveness watchdog (#850). Long-lived modes only; SIGKILLs the
+  // process if the main thread wedges in a non-yielding sync loop.
+  private livenessWatchdog: WatchdogHandle | null = null;
   // PPID watchdog baseline — captured at construction so we always have a
   // baseline, even if start() runs after a fork-style reparent.
   private originalPpid: number = process.ppid;
@@ -300,6 +304,10 @@ export class MCPServer {
       clearInterval(this.ppidWatchdog);
       this.ppidWatchdog = null;
     }
+    if (this.livenessWatchdog) {
+      this.livenessWatchdog.stop();
+      this.livenessWatchdog = null;
+    }
     if (this.daemon) {
       void this.daemon.stop('stop()');
       // Daemon.stop calls process.exit; nothing else to do.
@@ -345,6 +353,7 @@ export class MCPServer {
     this.mode = 'direct';
     this.installSignalHandlers();
     this.installPpidWatchdog();
+    this.livenessWatchdog = installMainThreadWatchdog();
   }
 
   /**
@@ -366,6 +375,10 @@ export class MCPServer {
         await daemon.start();
         this.daemon = daemon;
         this.mode = 'daemon';
+        // The detached daemon has no PPID watchdog or stdin lifeline, so a
+        // wedged main thread would pin a core forever (#850). The liveness
+        // watchdog is its only recovery path.
+        this.livenessWatchdog = installMainThreadWatchdog();
         return; // the net.Server keeps the process alive
       }
 

+ 175 - 0
src/mcp/liveness-watchdog.ts

@@ -0,0 +1,175 @@
+/**
+ * Main-thread liveness watchdog — belt-and-suspenders for #850.
+ *
+ * The #850 fix removes the one *known* trigger (the uncaught-exception handler
+ * no longer formats a raw Error's `.stack` — the step that could enter a
+ * non-terminating V8 source-position loop). But ANY synchronous, non-yielding
+ * loop on the main thread — a future V8 stack-format pathology, a runaway
+ * regex, an accidental `while (true)` — wedges the event loop, and from JS you
+ * cannot interrupt it: timers, signal handlers, and the PPID watchdog all run
+ * *on* that blocked loop, so the process pins a core forever with no
+ * self-recovery (the exact unrecoverable state #850 reported).
+ *
+ * The only observer still running when the main thread is wedged is another
+ * THREAD. This installs a tiny worker thread that watches a heartbeat the main
+ * thread bumps through shared memory. If the heartbeat stops advancing across
+ * enough consecutive checks (~`timeoutMs` of real time), the worker concludes
+ * the main thread is wedged and SIGKILLs the process — the one signal a wedged
+ * event loop can't swallow — so a fresh daemon starts on the next connection
+ * instead of a zombie pinning a core.
+ *
+ * **Why count checks, not elapsed wall-clock.** A laptop that sleeps freezes
+ * both threads; on wake `Date.now()` has jumped hours but the heartbeat sat
+ * still — a wall-clock delta would false-positive and kill a perfectly healthy
+ * daemon. Counting *consecutive worker iterations* with no progress is immune:
+ * a healthy main thread resumes and bumps the heartbeat within one interval of
+ * waking, resetting the count; only a thread that never resumes keeps it
+ * climbing. {@link stepHeartbeat} is the pure reducer behind both the worker
+ * and the unit tests.
+ *
+ * **Why it won't fire on real work.** Heavy parsing runs in the parse worker
+ * (off this thread) and indexing shells out to a child process, so the daemon's
+ * main thread only ever does fast, bounded work (socket handling + sub-second
+ * SQLite reads). The default timeout is therefore vastly larger than any
+ * legitimate main-thread block yet vastly smaller than "forever". Opt out with
+ * `CODEGRAPH_NO_WATCHDOG=1`; tune with `CODEGRAPH_WATCHDOG_TIMEOUT_MS`.
+ */
+import { Worker } from 'worker_threads';
+
+/** Default: 60s — ~300× shorter than the 5h #850 wedge, far longer than any real main-thread block. */
+export const DEFAULT_WATCHDOG_TIMEOUT_MS = 60_000;
+
+export interface HeartbeatState {
+  /** Last heartbeat counter the worker observed. */
+  lastCounter: number;
+  /** Consecutive checks the counter has NOT advanced. */
+  staleChecks: number;
+}
+
+/**
+ * Pure reducer for one worker check. `maxStaleChecks` consecutive no-progress
+ * checks → wedged. Counting iterations (not wall-clock) is what makes this
+ * robust to clock jumps / system sleep.
+ */
+export function stepHeartbeat(
+  state: HeartbeatState,
+  counter: number,
+  maxStaleChecks: number
+): { next: HeartbeatState; wedged: boolean } {
+  if (counter !== state.lastCounter) {
+    return { next: { lastCounter: counter, staleChecks: 0 }, wedged: false };
+  }
+  const staleChecks = state.staleChecks + 1;
+  return {
+    next: { lastCounter: counter, staleChecks },
+    wedged: staleChecks >= maxStaleChecks,
+  };
+}
+
+/** `true` for `1/true/yes/on` (case-insensitive); `false` otherwise. */
+function isEnvTruthy(raw: string | undefined): boolean {
+  if (!raw) return false;
+  return ['1', 'true', 'yes', 'on'].includes(raw.trim().toLowerCase());
+}
+
+/** Parse the timeout env, falling back to the default for missing/invalid values. */
+export function parseWatchdogTimeoutMs(
+  raw: string | undefined,
+  fallback: number = DEFAULT_WATCHDOG_TIMEOUT_MS
+): number {
+  if (raw === undefined) return fallback;
+  const n = Number(raw);
+  return Number.isFinite(n) && n > 0 ? n : fallback;
+}
+
+/** Derive a heartbeat/check cadence that fires several times inside the timeout window. */
+export function deriveCheckIntervalMs(timeoutMs: number): number {
+  return Math.min(2000, Math.max(50, Math.round(timeoutMs / 5)));
+}
+
+export interface WatchdogHandle {
+  /** Stop heartbeating and terminate the worker. Idempotent. */
+  stop(): void;
+}
+
+/**
+ * The worker body, run via `new Worker(src, { eval: true })`. Inlined as a
+ * string (not a shipped `.js`) so there is no dist-vs-src path to resolve — it
+ * runs identically under `tsx` in tests and under the bundle in production.
+ * Mirrors {@link stepHeartbeat}; keep the two in sync (the unit test pins the
+ * algorithm, the integration test pins this exact body end-to-end).
+ */
+const WORKER_SOURCE = `
+const { workerData } = require('worker_threads');
+const fs = require('fs');
+const beat = new Int32Array(workerData.sab);
+const { checkMs, maxStaleChecks } = workerData;
+let lastCounter = Atomics.load(beat, 0);
+let staleChecks = 0;
+const timer = setInterval(() => {
+  const counter = Atomics.load(beat, 0);
+  if (counter !== lastCounter) { lastCounter = counter; staleChecks = 0; return; }
+  if (++staleChecks < maxStaleChecks) return;
+  clearInterval(timer);
+  const secs = Math.round((staleChecks * checkMs) / 1000);
+  try {
+    fs.writeSync(2, '[CodeGraph] Main thread unresponsive for ~' + secs + 's — killing the wedged process so a fresh one can start (#850). Disable with CODEGRAPH_NO_WATCHDOG=1.\\n');
+  } catch (e) { /* stderr gone */ }
+  try { process.kill(process.pid, 'SIGKILL'); } catch (e) { /* nothing left to try */ }
+}, checkMs);
+`;
+
+/**
+ * Install the main-thread liveness watchdog for a long-lived process. Returns a
+ * handle to stop it, or `null` when disabled or when the worker can't be
+ * spawned (degraded, never throws — a missing watchdog must never keep a
+ * process from starting).
+ */
+export function installMainThreadWatchdog(): WatchdogHandle | null {
+  if (isEnvTruthy(process.env.CODEGRAPH_NO_WATCHDOG)) return null;
+
+  const timeoutMs = parseWatchdogTimeoutMs(process.env.CODEGRAPH_WATCHDOG_TIMEOUT_MS);
+  const checkMs = deriveCheckIntervalMs(timeoutMs);
+  const maxStaleChecks = Math.max(1, Math.ceil(timeoutMs / checkMs));
+
+  // Single Int32 counter in shared memory. The main thread bumps it each tick;
+  // the worker reads it. Atomics make the write visible across threads.
+  const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
+  const beat = new Int32Array(sab);
+
+  // The heartbeat: firing at all means the event loop is turning. unref'd so it
+  // never keeps the process alive on its own (the server's socket does that).
+  const heartbeat = setInterval(() => {
+    Atomics.add(beat, 0, 1);
+  }, checkMs);
+  heartbeat.unref();
+
+  let worker: Worker;
+  try {
+    worker = new Worker(WORKER_SOURCE, {
+      eval: true,
+      workerData: { sab, checkMs, maxStaleChecks },
+    });
+  } catch {
+    // Worker threads unavailable — fall back to no watchdog rather than refuse
+    // to start. Degraded (a future wedge wouldn't self-kill) but not broken.
+    clearInterval(heartbeat);
+    return null;
+  }
+
+  // A watchdog-worker error must never escalate to the global handler (which now
+  // exits, #850): swallow it and run degraded.
+  worker.on('error', () => { /* watchdog gone; nothing safe to do here */ });
+  // Don't let the watchdog keep the process alive past its real work.
+  worker.unref();
+
+  let stopped = false;
+  return {
+    stop(): void {
+      if (stopped) return;
+      stopped = true;
+      clearInterval(heartbeat);
+      void worker.terminate();
+    },
+  };
+}