Procházet zdrojové kódy

feat(extraction): parallelize indexing across a parse worker pool (#1015) (#1025)

indexAll parsed every file through a single worker thread, so a full `codegraph index` used one core no matter the machine. Add ParseWorkerPool (src/extraction/parse-pool.ts), modeled on the shipped QueryPool: indexAll now parses across clamp(cores-1,1,8) workers. CODEGRAPH_PARSE_WORKERS overrides the count; 1 reproduces the previous single-worker path exactly (the rollback).

Parses run concurrently but results commit to SQLite in file order. This matters: the post-index resolution phase selects among ambiguous same-named candidates by node DB-insertion order, so a stable commit order keeps the graph deterministic — byte-identical to the serial path — instead of drifting with parse-completion timing. A bounded reorder buffer (backpressure on dispatched-but-uncommitted count) keeps memory flat even if a file is slow at the commit cursor.

Crash/timeout of a worker rejects only that file's parse (feeding the existing retry pass) and respawns; per-worker recycle every 250 parses reclaims WASM heap. In-process fallback unchanged when the compiled worker is absent (tests).

Validated on real OSS (django +9%, redis +17%; modest and parse-fraction-dependent), graph byte-identical across worker counts, peak memory flat-to-lower since workers recycle independently — so the #320 OOM concern doesn't materialize. Adds 11 pool unit tests.

Closes #1015. Refs #320.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Colby Mchenry před 18 hodinami
rodič
revize
9716fb27ae
4 změnil soubory, kde provedl 708 přidání a 226 odebrání
  1. 1 0
      CHANGELOG.md
  2. 178 0
      __tests__/parse-pool.test.ts
  3. 169 226
      src/extraction/index.ts
  4. 360 0
      src/extraction/parse-pool.ts

+ 1 - 0
CHANGELOG.md

@@ -14,6 +14,7 @@ and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 - You can now exclude committed directories from the index with an `exclude` list in `codegraph.json` — even when they're git-tracked. `.gitignore` can't drop a directory git already tracks, so a vendored theme or SDK that's checked into your repo (a committed Metronic theme under `static/`, a bundled vendor library) had no supported way to be kept out — it just bloated the graph and slowed indexing. Add a root `codegraph.json` with, e.g., `{ "exclude": ["static/", "**/vendor/**"] }` and those paths are skipped on indexing, sync, and file-watching, on both git and non-git projects. Patterns are gitignore-style and matched against repo-root-relative paths. This complements the existing `includeIgnored` (its opposite — opt *in* to gitignored embedded repos). (#999)
 - CodeGraph now follows C/C++ commands that are dispatched through macro-built function-pointer tables, so the handler functions they reach are no longer dead-ends in the graph. Many C projects register a handler into a struct's function-pointer field through a macro and a generated table — redis is the classic case: every command (`getCommand`, `decrbyCommand`, …) is wired into the command struct's `proc` field by a `MAKE_CMD(…)` table that lives in a generated, `#include`-d file, then invoked as `c->cmd->proc(c)`. CodeGraph now reads those macro-built tables — including ones whose struct type is itself a macro alias, whose table sits in an `#include`-d file that is never indexed on its own, or that are wrapped in conditional compilation (`#ifdef`) and defined inline with the struct. It recognizes function-pointer fields declared through a function typedef, and follows the receiver — a chained access (`c->cmd->proc`) or an array subscript through a file-scope table (`(cmdnames[i].cmd_func)(…)`) — across field types. It also follows dispatch through a bare array of function pointers with no struct wrapper at all — the opcode/handler-table pattern common in interpreters and emulators, where a table like `opcodes[op](…)` invokes one of many registered handler functions by index — linking the dispatcher to every handler in the array. The upshot: asking for the callers or blast radius of a command handler now finds the dispatcher that reaches it. For redis, `call` shows up as a caller of every command; for SQLite, the builtin SQL functions registered through `FUNCTION(...)` link to where they're invoked; for Vim, every `:ex` and normal-mode command links from the dispatcher. (#991, extending #932)
 - CodeGraph no longer times out when many agents query it at once. The shared background server that serves all your editor and agent sessions used to run every query on a single thread, so a burst of concurrent requests — for example a swarm of subagents exploring a large monorepo together — queued up behind one another and, while the heavy ones ran, froze the connection so finished answers couldn't even be sent back until the whole batch drained. Past a handful of simultaneous callers that routinely surfaced as MCP request timeouts. The shared server now answers queries across a pool of worker threads, so concurrent requests run in parallel and the connection stays responsive the whole time; when it's genuinely saturated a call returns a brief "busy, retry shortly" note (not an error) instead of hanging past your client's timeout. The pool sizes itself to your machine — roughly one worker per core, leaving one for coordination — and a single editor session is unaffected (no pool, no overhead). Set `CODEGRAPH_QUERY_POOL_SIZE` to choose a specific number of workers, or `0` to revert to single-threaded in-process queries.
+- Indexing now parses files across multiple CPU cores instead of one, so building a project's graph — `codegraph index`, the first index of a project, and the background re-index after changes — is faster on multi-core machines, most noticeably on large or parse-heavy codebases. The graph it produces is identical to before and re-indexing stays deterministic: parsing runs in parallel, but results are still committed in a fixed order, so the same project always yields the same graph. CodeGraph sizes the pool to your machine automatically (leaving a core free for everything else); set `CODEGRAPH_PARSE_WORKERS` to choose a specific number of parse workers, or `CODEGRAPH_PARSE_WORKERS=1` to restore the previous single-core behavior. Peak memory is unchanged — workers reclaim parser memory independently, so it doesn't grow with the number of cores. (#1015, #320)
 - When CodeGraph's MCP server runs with no default project of its own — started outside any repository (for example behind an MCP gateway), or at a monorepo root whose indexes live in sub-projects — it now marks `projectPath` as a required argument on every tool call. Before, `projectPath` was always optional, so an agent talking to such a server would often omit it, get back guidance to pass it, and not reliably retry — you had to nudge it by hand every time. Now the requirement is part of the tool definition the agent sees, so it supplies the path to the project it's working on the first time. When the server does have a default project — the normal case, launched inside your repo — `projectPath` stays optional and a call without it falls back to that project exactly as before. Thanks @wauxhall for the report. (#993)
 
 ### Fixes

+ 178 - 0
__tests__/parse-pool.test.ts

@@ -0,0 +1,178 @@
+/**
+ * ParseWorkerPool — the worker pool that parses files across cores during a full
+ * `codegraph index` (issue #1015). These tests drive the pool's queue / growth /
+ * recycle / crash-recovery / timeout / teardown logic with INJECTED fake
+ * workers, so they exercise the real scheduling code without spawning threads or
+ * needing a built dist.
+ *
+ * End-to-end behavior with real worker threads (each worker owns a tree-sitter
+ * WASM heap and runs extractFromSource) is covered by the extraction suite
+ * against a real temp project; here we pin the orchestration that makes the
+ * parallelism safe.
+ */
+import { describe, it, expect } from 'vitest';
+import { ParseWorkerPool, resolveParsePoolSize, type ParsePoolWorker, type ParseTask } from '../src/extraction/parse-pool';
+import type { Language, ExtractionResult } from '../src/types';
+
+const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
+
+interface ParseMsg { type: 'parse'; id: number; filePath: string; content: string; language: Language }
+type Action = { result: ExtractionResult } | { crash: true } | { hang: true } | { wait: Promise<ExtractionResult> };
+
+/**
+ * Fake worker speaking the same {load-grammars → grammars-loaded} /
+ * {parse → parse-result} protocol as the real parse-worker. `behavior` decides
+ * per parse whether to return a result, crash (exit≠0), hang (never reply —
+ * exercises the timeout), or wait on a promise (hold a parse in-flight to
+ * observe concurrency). Emits 'grammars-loaded' on a macrotask so the pool has
+ * wired its listeners first.
+ */
+class FakeWorker implements ParsePoolWorker {
+  private msgCb?: (m: unknown) => void;
+  private exitCb?: (code: number) => void;
+  alive = true;
+  constructor(private behavior: (m: ParseMsg) => Action, private onTerminate?: () => void) {}
+  on(event: string, cb: (...args: any[]) => void): void {
+    if (event === 'message') this.msgCb = cb;
+    else if (event === 'exit') this.exitCb = cb;
+    // 'error' unused by the fakes
+  }
+  private reply(id: number, result: ExtractionResult): void {
+    if (this.alive) this.msgCb?.({ type: 'parse-result', id, result });
+  }
+  postMessage(msg: unknown): void {
+    const m = msg as { type: string } & Partial<ParseMsg>;
+    if (m.type === 'load-grammars') {
+      setTimeout(() => { if (this.alive) this.msgCb?.({ type: 'grammars-loaded' }); }, 0);
+      return;
+    }
+    if (m.type !== 'parse') return;
+    const action = this.behavior(m as ParseMsg);
+    if ('crash' in action) {
+      this.alive = false;
+      setTimeout(() => this.exitCb?.(1), 0); // simulate a WASM-OOM exit(1)
+      return;
+    }
+    if ('hang' in action) return; // never reply → timeout path
+    if ('wait' in action) { void action.wait.then((r) => this.reply(m.id!, r)); return; }
+    setTimeout(() => this.reply(m.id!, action.result), 0);
+  }
+  terminate(): Promise<number> { this.alive = false; this.onTerminate?.(); return Promise.resolve(0); }
+}
+
+const task = (filePath: string, content = 'code'): ParseTask => ({ filePath, content, language: 'typescript' as Language });
+const result = (tag = 0): ExtractionResult => ({ nodes: [], edges: [], unresolvedReferences: [], errors: [], durationMs: tag });
+
+/** Build a pool with a counting fake-worker factory. */
+function makePool(
+  size: number,
+  behavior: (m: ParseMsg) => Action,
+  opts: Partial<{ recycleInterval: number; parseTimeoutMs: number }> = {}
+) {
+  let spawned = 0, terminated = 0;
+  const pool = new ParseWorkerPool({
+    languages: ['typescript'] as Language[],
+    size,
+    recycleInterval: opts.recycleInterval,
+    parseTimeoutMs: opts.parseTimeoutMs,
+    createWorker: () => { spawned++; return new FakeWorker(behavior, () => { terminated++; }); },
+  });
+  return { pool, counts: () => ({ spawned, terminated }) };
+}
+
+describe('resolveParsePoolSize', () => {
+  it('treats explicit 0 and 1 as a single worker (the rollback path)', () => {
+    expect(resolveParsePoolSize('0', 8)).toBe(1);
+    expect(resolveParsePoolSize('1', 8)).toBe(1);
+  });
+  it('honors a numeric override, capped at the hard ceiling', () => {
+    expect(resolveParsePoolSize('4', 8)).toBe(4);
+    expect(resolveParsePoolSize('999', 8)).toBe(16);
+  });
+  it('defaults to clamp(cores-1, 1, 8) when unset/blank/non-numeric', () => {
+    expect(resolveParsePoolSize(undefined, 8)).toBe(7);
+    expect(resolveParsePoolSize('', 8)).toBe(7);
+    expect(resolveParsePoolSize('abc', 8)).toBe(7);
+    expect(resolveParsePoolSize(undefined, 1)).toBe(1);   // never zero
+    expect(resolveParsePoolSize(undefined, 2)).toBe(1);   // leave a core
+    expect(resolveParsePoolSize(undefined, 64)).toBe(8);  // never above the default cap
+  });
+});
+
+describe('ParseWorkerPool', () => {
+  it('parses a file and returns the worker result', async () => {
+    const { pool } = makePool(1, () => ({ result: result(42) }));
+    const res = await pool.requestParse(task('a.ts'));
+    expect(res.durationMs).toBe(42);
+    await pool.destroy();
+  });
+
+  it('runs N parses in parallel across the pool (not serialized)', async () => {
+    let active = 0, maxActive = 0;
+    let release!: () => void;
+    const gate = new Promise<void>((r) => { release = r; });
+    const { pool } = makePool(4, () => ({
+      wait: (async () => { active++; maxActive = Math.max(maxActive, active); await gate; active--; return result(); })(),
+    }));
+    const ps = [0, 1, 2, 3].map((i) => pool.requestParse(task(`f${i}.ts`)));
+    await sleep(60); // let the pool grow to size and dispatch all four
+    expect(maxActive).toBe(4);
+    release();
+    await Promise.all(ps);
+    await pool.destroy();
+  });
+
+  it('grows lazily — a single parse does not spawn the whole pool', async () => {
+    const { pool, counts } = makePool(8, () => ({ result: result() }));
+    await pool.requestParse(task('only.ts'));
+    expect(counts().spawned).toBe(1); // just the eager warm worker
+    await pool.destroy();
+  });
+
+  it('recycles a worker after recycleInterval parses', async () => {
+    const { pool, counts } = makePool(1, () => ({ result: result() }), { recycleInterval: 3 });
+    for (let i = 0; i < 4; i++) await pool.requestParse(task(`f${i}.ts`));
+    // 3 parses on the first worker → recycle (terminate + respawn); the 4th runs
+    // on the fresh worker.
+    expect(counts().spawned).toBe(2);
+    expect(counts().terminated).toBeGreaterThanOrEqual(1);
+    await pool.destroy();
+  });
+
+  it('rejects a parse whose worker crashes (retry-pass-recognisable message) and keeps serving', async () => {
+    const { pool, counts } = makePool(1, (m) => (m.filePath === 'poison.ts' ? { crash: true } : { result: result(7) }));
+    // The message must contain "Worker exited" so the orchestrator's retry pass
+    // re-attempts it (that's the filter it uses).
+    await expect(pool.requestParse(task('poison.ts'))).rejects.toThrow(/Worker exited/);
+    const ok = await pool.requestParse(task('good.ts'));
+    expect(ok.durationMs).toBe(7);
+    expect(counts().spawned).toBe(2); // respawned after the crash
+    await pool.destroy();
+  });
+
+  it('times out a hung parse and stays usable', async () => {
+    const { pool } = makePool(1, (m) => (m.filePath === 'hang.ts' ? { hang: true } : { result: result(9) }), { parseTimeoutMs: 30 });
+    await expect(pool.requestParse(task('hang.ts'))).rejects.toThrow(/timed out/i);
+    const ok = await pool.requestParse(task('ok.ts'));
+    expect(ok.durationMs).toBe(9);
+    await pool.destroy();
+  });
+
+  it('serves a queue larger than the pool size', async () => {
+    const { pool } = makePool(2, (m) => ({ result: result(Number(m.filePath.replace(/\D/g, ''))) }));
+    const ps = Array.from({ length: 10 }, (_, i) => pool.requestParse(task(`${i}.ts`)));
+    const res = await Promise.all(ps);
+    expect(res.map((r) => r.durationMs).sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+    await pool.destroy();
+  });
+
+  it('destroy() rejects in-flight and subsequent parses', async () => {
+    const { pool } = makePool(1, () => ({ hang: true }));
+    const p = pool.requestParse(task('x.ts'));
+    p.catch(() => {}); // avoid an unhandled rejection before we assert
+    await sleep(10);
+    await pool.destroy();
+    await expect(p).rejects.toThrow(/destroyed/);
+    await expect(pool.requestParse(task('y.ts'))).rejects.toThrow(/destroyed/);
+  });
+});

+ 169 - 226
src/extraction/index.ts

@@ -7,6 +7,7 @@
 import * as fs from 'fs';
 import * as fsp from 'fs/promises';
 import * as path from 'path';
+import * as os from 'os';
 import * as crypto from 'crypto';
 import { execFileSync } from 'child_process';
 import {
@@ -18,6 +19,7 @@ import {
 } from '../types';
 import { QueryBuilder } from '../db/queries';
 import { extractFromSource } from './tree-sitter';
+import { ParseWorkerPool, resolveParsePoolSize } from './parse-pool';
 import { detectLanguage, isSourceFile, isLanguageSupported, isFileLevelOnlyLanguage, initGrammars, loadGrammarsForLanguages } from './grammars';
 import { loadExtensionOverrides, loadIncludeIgnoredPatterns, loadExcludePatterns } from '../project-config';
 import { isCodeGraphDataDir } from '../directory';
@@ -1151,171 +1153,155 @@ export class ExtractionOrchestrator {
       neededLanguages.push('cpp');
     }
 
-    // Try to use a worker thread for parsing (keeps main thread unblocked for UI).
-    // Falls back to in-process parsing if the compiled worker is unavailable (e.g. tests).
+    // Parse files on a pool of worker threads (keeps the main thread free for UI
+    // and uses every core). Falls back to in-process parsing when the compiled
+    // worker is unavailable (e.g. running from source in tests).
     const parseWorkerPath = path.join(__dirname, 'parse-worker.js');
     const useWorker = fs.existsSync(parseWorkerPath);
-    let WorkerClass: typeof import('worker_threads').Worker | null = null;
 
+    let pool: ParseWorkerPool | null = null;
     if (useWorker) {
-      const { Worker } = await import('worker_threads');
-      WorkerClass = Worker;
+      // CODEGRAPH_PARSE_WORKERS: explicit worker count; 1 = the old single-worker
+      // behaviour (the conservative rollback). Unset → clamp(cores-1, 1, 8).
+      const poolSize = resolveParsePoolSize(process.env.CODEGRAPH_PARSE_WORKERS, os.cpus().length);
+      pool = new ParseWorkerPool({
+        languages: neededLanguages,
+        size: poolSize,
+        workerScriptPath: parseWorkerPath,
+        recycleInterval: WORKER_RECYCLE_INTERVAL,
+        parseTimeoutMs: PARSE_TIMEOUT_MS,
+        log,
+      });
+      log(`Parse worker pool: ${poolSize} worker(s)`);
     } else {
-      // In-process fallback: load grammars locally
+      // In-process fallback: load grammars locally and parse on the main thread.
       await loadGrammarsForLanguages(neededLanguages);
     }
 
-    // --- Worker lifecycle management ---
-    // The worker can crash (OOM in WASM) or hang on pathological files.
-    // We track pending parse promises and handle both cases:
-    //   - Timeout: terminate + restart the worker, reject the timed-out request
-    //   - Crash: reject all pending promises, restart for remaining files
-    let parseWorker: import('worker_threads').Worker | null = null;
-    let nextId = 0;
-    let workerParseCount = 0;
-    const pendingParses = new Map<number, {
-      resolve: (result: ExtractionResult) => void;
-      reject: (err: Error) => void;
-      timer: ReturnType<typeof setTimeout>;
-    }>();
-
-    function rejectAllPending(reason: string): void {
-      for (const [id, pending] of pendingParses) {
-        clearTimeout(pending.timer);
-        pendingParses.delete(id);
-        pending.reject(new Error(reason));
+    /**
+     * Parse one file: on the pool when available (the promise REJECTS on a worker
+     * crash/timeout — the caller records it and the retry pass re-attempts), or
+     * in-process synchronously as the no-worker fallback. The language is resolved
+     * here on the main thread, where the codegraph.json overrides are loaded.
+     */
+    const parseFile = (filePath: string, content: string): Promise<ExtractionResult> => {
+      const language = detectLanguage(filePath, content, overrides);
+      if (!pool) return Promise.resolve(extractFromSource(filePath, content, language, frameworkNames));
+      return pool.requestParse({ filePath, content, language, frameworkNames });
+    };
+
+    // --- Bounded rolling-window dispatch, ordered commit ---
+    // Reads stay batched/parallel; parses run concurrently across the pool; the
+    // SQLite store stays on the main thread (it isn't thread-safe). Crucially we
+    // COMMIT results in original file order, not parse-completion order: the
+    // resolution phase (run after indexing) resolves an ambiguous reference to one
+    // of several same-named candidates by the nodes' DB insertion order, so a
+    // stable commit order keeps the resulting graph deterministic — byte-identical
+    // to the single-worker path — instead of drifting with parse timing. The
+    // `completed` buffer holds at most ~windowSize out-of-order results, so memory
+    // stays bounded.
+    const windowSize = pool ? Math.max(4, pool.size * 2) : 1;
+    const inFlight = new Set<Promise<void>>();
+    const completed = new Map<number,
+      | { ok: true; filePath: string; content: string; stats: fs.Stats; result: ExtractionResult }
+      | { ok: false; filePath: string; err: unknown }>();
+    let nextSeq = 0;       // file-order sequence assigned at dispatch
+    let nextToStore = 0;   // cursor: next sequence to commit
+    let aborted = false;
+
+    const storeResult = (filePath: string, content: string, stats: fs.Stats, result: ExtractionResult): void => {
+      processed++;
+
+      // Store in database on main thread (SQLite is not thread-safe)
+      if (result.nodes.length > 0 || result.errors.length === 0) {
+        const language = detectLanguage(filePath, content, overrides);
+        this.storeExtractionResult(filePath, content, language, stats, result);
       }
-    }
 
-    function attachWorkerHandlers(w: import('worker_threads').Worker): void {
-      w.on('message', (msg: { type: string; id?: number; result?: ExtractionResult }) => {
-        if (msg.type === 'parse-result' && msg.id !== undefined) {
-          const pending = pendingParses.get(msg.id);
-          if (pending) {
-            clearTimeout(pending.timer);
-            pendingParses.delete(msg.id);
-            pending.resolve(msg.result!);
-          }
+      if (result.errors.length > 0) {
+        for (const err of result.errors) {
+          if (!err.filePath) err.filePath = filePath;
         }
-      });
-
-      w.on('error', (err) => {
-        logWarn('Parse worker error', { error: err.message });
-        rejectAllPending(`Worker error: ${err.message}`);
-      });
+        errors.push(...result.errors);
+      }
 
-      w.on('exit', (code) => {
-        if (code !== 0 && pendingParses.size > 0) {
-          logWarn('Parse worker exited unexpectedly', { code });
-          rejectAllPending(`Worker exited with code ${code}`);
-        }
-        // Clear reference so we know to respawn, reset count so
-        // the fresh worker gets a full cycle before recycling.
-        if (parseWorker === w) {
-          parseWorker = null;
-          workerParseCount = 0;
+      if (result.nodes.length > 0) {
+        filesIndexed++;
+        totalNodes += result.nodes.length;
+        totalEdges += result.edges.length;
+      } else if (result.errors.some((e) => e.severity === 'error')) {
+        filesErrored++;
+      } else {
+        // Files with no symbols but no errors (yaml, twig, properties) are
+        // tracked at the file level — count them as indexed so the CLI doesn't
+        // misleadingly report "No files found to index".
+        const lang = detectLanguage(filePath, content, overrides);
+        if (isFileLevelOnlyLanguage(lang)) {
+          filesIndexed++;
+        } else {
+          filesSkipped++;
         }
-      });
-    }
-
-    async function ensureWorker(): Promise<import('worker_threads').Worker> {
-      if (parseWorker) return parseWorker;
-      log('Spawning new parse worker...');
-      parseWorker = new WorkerClass!(parseWorkerPath);
-      attachWorkerHandlers(parseWorker);
-
-      // Load grammars in the new worker
-      await new Promise<void>((resolve, reject) => {
-        parseWorker!.once('message', (msg: { type: string }) => {
-          if (msg.type === 'grammars-loaded') resolve();
-          else reject(new Error(`Unexpected message: ${msg.type}`));
-        });
-        parseWorker!.postMessage({ type: 'load-grammars', languages: neededLanguages });
-      });
-
-      return parseWorker;
-    }
-
-    if (WorkerClass) {
-      await ensureWorker();
-    }
+      }
 
-    /**
-     * Recycle the worker thread to reclaim WASM memory.
-     * Terminates the current worker and clears the reference so
-     * ensureWorker() will spawn a fresh one on the next call.
-     */
-    function recycleWorker(): void {
-      if (!parseWorker) return;
-      log(`Recycling worker after ${workerParseCount} parses (heap: ${Math.round(process.memoryUsage().rss / 1024 / 1024)}MB RSS)`);
-      const w = parseWorker;
-      parseWorker = null;
-      workerParseCount = 0;
-      // Fire-and-forget: worker.terminate() can hang if WASM is stuck
-      w.terminate().catch(() => {});
-    }
+      onProgress?.({ phase: 'parsing', current: processed, total, currentFile: filePath });
+    };
 
-    async function requestParse(filePath: string, content: string): Promise<ExtractionResult> {
-      // Resolve the language on the main thread (where the project's
-      // codegraph.json overrides are loaded) and hand it to the worker, so the
-      // worker never needs the override map itself.
-      const language = detectLanguage(filePath, content, overrides);
+    const recordParseFailure = (filePath: string, err: unknown): void => {
+      processed++;
+      filesErrored++;
+      errors.push({
+        message: err instanceof Error ? err.message : String(err),
+        filePath,
+        severity: 'error',
+        code: 'parse_error',
+      });
+      onProgress?.({ phase: 'parsing', current: processed, total });
+    };
 
-      if (!WorkerClass) {
-        // In-process fallback
-        return extractFromSource(
-          filePath,
-          content,
-          language,
-          frameworkNames
-        );
+    // Commit buffered parses to the DB in file order, advancing the cursor over
+    // contiguous completed results. Runs after each parse settles (and once more
+    // after the drain). storeResult / recordParseFailure run here single-threaded,
+    // so shared counters and SQLite writes never race despite parallel parsing.
+    const flushOrdered = (): void => {
+      if (aborted) return;
+      while (completed.has(nextToStore)) {
+        const item = completed.get(nextToStore)!;
+        completed.delete(nextToStore);
+        nextToStore++;
+        if (item.ok) storeResult(item.filePath, item.content, item.stats, item.result);
+        else recordParseFailure(item.filePath, item.err);
       }
+    };
 
-      // Recycle the worker before the next parse if we've hit the threshold.
-      // This destroys the WASM linear memory (which can grow but never shrink)
-      // and starts a fresh worker with a clean heap.
-      if (workerParseCount >= WORKER_RECYCLE_INTERVAL) {
-        await recycleWorker();
+    // Dispatch one file's parse (parses run concurrently across the pool), tagged
+    // with its file-order sequence so flushOrdered commits results in order. The
+    // backpressure below bounds how far parsing runs ahead of the in-order commit.
+    const feed = async (filePath: string, content: string, stats: fs.Stats): Promise<void> => {
+      const seq = nextSeq++;
+      const p = (async () => {
+        try {
+          const result = await parseFile(filePath, content);
+          completed.set(seq, { ok: true, filePath, content, stats, result });
+        } catch (parseErr) {
+          completed.set(seq, { ok: false, filePath, err: parseErr });
+        }
+        flushOrdered();
+      })();
+      const tracked = p.finally(() => { inFlight.delete(tracked); });
+      inFlight.add(tracked);
+      // Backpressure on the dispatched-but-not-yet-committed count (in-flight +
+      // buffered), not just in-flight: a slow file sitting at the commit cursor
+      // lets later parses finish and buffer, which would otherwise grow without
+      // bound. Wait for parses to settle (each may advance the cursor) until the
+      // window has room. `inFlight.size > 0` guards against an empty race — the
+      // cursor file is always still in flight when the window is full.
+      while (nextSeq - nextToStore >= windowSize && inFlight.size > 0) {
+        await Promise.race(inFlight);
       }
-
-      const worker = await ensureWorker();
-      const id = nextId++;
-      workerParseCount++;
-
-      // Scale timeout for large files: base 10s + 10s per 100KB
-      const timeoutMs = PARSE_TIMEOUT_MS + Math.floor(content.length / 100_000) * 10_000;
-
-      return new Promise<ExtractionResult>((resolve, reject) => {
-        const timer = setTimeout(() => {
-          pendingParses.delete(id);
-          log(`TIMEOUT: ${filePath} exceeded ${timeoutMs}ms — killing worker`);
-          // Reject FIRST — worker.terminate() can hang if WASM is stuck
-          parseWorker = null;
-          workerParseCount = 0;
-          reject(new Error(`Parse timed out after ${timeoutMs}ms`));
-          // Fire-and-forget: kill the stuck worker in the background
-          worker.terminate().catch(() => {});
-        }, timeoutMs);
-
-        pendingParses.set(id, { resolve, reject, timer });
-        worker.postMessage({ type: 'parse', id, filePath, content, frameworkNames, language });
-      });
-    }
+    };
 
     for (let i = 0; i < files.length; i += FILE_IO_BATCH_SIZE) {
-      if (signal?.aborted) {
-        if (parseWorker) (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
-        return {
-          success: false,
-          filesIndexed,
-          filesSkipped,
-          filesErrored,
-          nodesCreated: totalNodes,
-          edgesCreated: totalEdges,
-          errors: [{ message: 'Aborted', severity: 'error' }, ...errors],
-          durationMs: Date.now() - startTime,
-        };
-      }
+      if (signal?.aborted) { aborted = true; break; }
 
       const batch = files.slice(i, i + FILE_IO_BATCH_SIZE);
 
@@ -1340,29 +1326,10 @@ export class ExtractionOrchestrator {
         })
       );
 
-      // Send to worker for parsing, store results on main thread
+      // Dispatch each readable file into the bounded parse window; the window
+      // stores results on the main thread as they arrive.
       for (const { filePath, content, stats, error } of fileContents) {
-        if (signal?.aborted) {
-          if (parseWorker) (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
-          return {
-            success: false,
-            filesIndexed,
-            filesSkipped,
-            filesErrored,
-            nodesCreated: totalNodes,
-            edgesCreated: totalEdges,
-            errors: [{ message: 'Aborted', severity: 'error' }, ...errors],
-            durationMs: Date.now() - startTime,
-          };
-        }
-
-        // Report progress before parsing (show current file being worked on)
-        onProgress?.({
-          phase: 'parsing',
-          current: processed,
-          total,
-          currentFile: filePath,
-        });
+        if (signal?.aborted) { aborted = true; break; }
 
         if (error || content === null || stats === null) {
           processed++;
@@ -1373,6 +1340,7 @@ export class ExtractionOrchestrator {
             severity: 'error',
             code: 'read_error',
           });
+          onProgress?.({ phase: 'parsing', current: processed, total });
           continue;
         }
 
@@ -1394,56 +1362,33 @@ export class ExtractionOrchestrator {
           continue;
         }
 
-        // Parse in worker thread (main thread stays unblocked).
-        // Wrapped in try/catch to handle worker timeouts and crashes gracefully.
-        let result: ExtractionResult;
-        try {
-          result = await requestParse(filePath, content);
-        } catch (parseErr) {
-          processed++;
-          filesErrored++;
-          errors.push({
-            message: parseErr instanceof Error ? parseErr.message : String(parseErr),
-            filePath,
-            severity: 'error',
-            code: 'parse_error',
-          });
-          continue;
-        }
-
-        processed++;
+        // Parse on the pool (main thread stays unblocked). Errors/timeouts are
+        // handled inside feed() → recordParseFailure, feeding the retry pass.
+        await feed(filePath, content, stats);
+      }
 
-        // Store in database on main thread (SQLite is not thread-safe)
-        if (result.nodes.length > 0 || result.errors.length === 0) {
-          const language = detectLanguage(filePath, content, overrides);
-          this.storeExtractionResult(filePath, content, language, stats, result);
-        }
+      if (aborted) break;
+    }
 
-        if (result.errors.length > 0) {
-          for (const err of result.errors) {
-            if (!err.filePath) err.filePath = filePath;
-          }
-          errors.push(...result.errors);
-        }
+    // Drain parses still in flight (skip on abort — we tear down below instead),
+    // then commit any results the cursor hasn't reached yet.
+    if (!aborted) {
+      await Promise.all(inFlight);
+      flushOrdered();
+    }
 
-        if (result.nodes.length > 0) {
-          filesIndexed++;
-          totalNodes += result.nodes.length;
-          totalEdges += result.edges.length;
-        } else if (result.errors.some((e) => e.severity === 'error')) {
-          filesErrored++;
-        } else {
-          // Files with no symbols but no errors (yaml, twig, properties) are
-          // tracked at the file level — count them as indexed so the CLI
-          // doesn't misleadingly report "No files found to index".
-          const lang = detectLanguage(filePath, content, overrides);
-          if (isFileLevelOnlyLanguage(lang)) {
-            filesIndexed++;
-          } else {
-            filesSkipped++;
-          }
-        }
-      }
+    if (signal?.aborted || aborted) {
+      if (pool) await pool.destroy();
+      return {
+        success: false,
+        filesIndexed,
+        filesSkipped,
+        filesErrored,
+        nodesCreated: totalNodes,
+        edgesCreated: totalEdges,
+        errors: [{ message: 'Aborted', severity: 'error' }, ...errors],
+        durationMs: Date.now() - startTime,
+      };
     }
 
     // Report 100% so the progress bar doesn't hang at 99%
@@ -1466,18 +1411,20 @@ export class ExtractionOrchestrator {
         (e.message.includes('Worker exited') || e.message.includes('memory access out of bounds'))
     );
 
-    if (retryableErrors.length > 0 && WorkerClass) {
+    if (retryableErrors.length > 0 && pool) {
       log(`Retrying ${retryableErrors.length} files that failed due to WASM memory errors...`);
 
+      // Fresh WASM heaps for the retry phase. A retry that still crashes its
+      // worker makes the pool respawn it, so later retries keep landing on clean
+      // workers too.
+      pool.recycleAll();
+
       const stillFailing: typeof retryableErrors = [];
 
       for (const errEntry of retryableErrors) {
         const filePath = errEntry.filePath!;
         if (signal?.aborted) break;
 
-        // Fresh worker for every retry — maximum WASM headroom
-        recycleWorker();
-
         let content: string;
         try {
           const fullPath = validatePathWithinRoot(this.rootDir, filePath);
@@ -1489,7 +1436,7 @@ export class ExtractionOrchestrator {
 
         let result: ExtractionResult;
         try {
-          result = await requestParse(filePath, content);
+          result = await parseFile(filePath, content);
         } catch {
           stillFailing.push(errEntry);
           continue;
@@ -1516,13 +1463,12 @@ export class ExtractionOrchestrator {
       // code nodes but consume parser memory.
       if (stillFailing.length > 0) {
         log(`${stillFailing.length} files still failing — retrying with comments stripped...`);
+        pool.recycleAll();
 
         for (const errEntry of stillFailing) {
           const filePath = errEntry.filePath!;
           if (signal?.aborted) break;
 
-          recycleWorker();
-
           let fullContent: string;
           try {
             const fullPath = validatePathWithinRoot(this.rootDir, filePath);
@@ -1541,7 +1487,7 @@ export class ExtractionOrchestrator {
 
           let result: ExtractionResult;
           try {
-            result = await requestParse(filePath, stripped);
+            result = await parseFile(filePath, stripped);
           } catch {
             continue;
           }
@@ -1563,11 +1509,8 @@ export class ExtractionOrchestrator {
       }
     }
 
-    // Shut down parse worker and clear any pending timers
-    rejectAllPending('Indexing complete');
-    if (parseWorker) {
-      (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
-    }
+    // Shut down the parse worker pool.
+    if (pool) await pool.destroy();
 
     return {
       success: filesIndexed > 0 || errors.filter((e) => e.severity === 'error').length === 0,

+ 360 - 0
src/extraction/parse-pool.ts

@@ -0,0 +1,360 @@
+/**
+ * Parse worker pool — runs tree-sitter parsing across N worker threads so a full
+ * `codegraph index` uses every core instead of pinning one.
+ *
+ * Why this exists: `ExtractionOrchestrator.indexAll()` already reads files in
+ * parallel, but it parsed them through a SINGLE worker thread, so on an
+ * N-core machine indexing a large repo used one core and left the rest idle
+ * (issue #1015, the parse-time half of #320). Spreading the parse calls across a
+ * pool of workers — each its own tree-sitter WASM heap — restores multi-core
+ * throughput. SQLite storage stays on the main thread (it isn't thread-safe), so
+ * only the CPU-bound parse step is parallelised; results are stored as they
+ * arrive, in whatever order they finish.
+ *
+ * Design mirrors {@link ../mcp/query-pool} (idle-list dispatch, lazy growth,
+ * throttled cold-starts, crash recovery), with parse-specific behaviour:
+ *   - per-worker recycle: WASM linear memory grows but never shrinks, so each
+ *     worker is torn down and replaced after `recycleInterval` parses to reclaim
+ *     its heap — the same reason the old single worker recycled.
+ *   - reject, don't retry: a parse that crashes or times out its worker REJECTS
+ *     (with a message the orchestrator's retry pass recognises) rather than being
+ *     silently requeued — the orchestrator owns the smarter two-stage retry
+ *     (fresh worker, then comment-stripped) on a clean WASM heap.
+ *   - a size-1 pool reproduces the old single-worker path exactly, which is the
+ *     conservative rollback: set `CODEGRAPH_PARSE_WORKERS=1`.
+ *
+ * Memory: peak scales with pool size (≈ size × a worker's pre-recycle heap), so
+ * the default is capped and the env var lets constrained machines dial it down.
+ */
+
+import { Worker } from 'worker_threads';
+import type { Language, ExtractionResult } from '../types';
+
+/**
+ * Minimal worker surface the pool drives — satisfied by a real `worker_threads`
+ * Worker. Abstracted so tests can inject a fake worker and exercise the pool's
+ * queue / growth / recycle / crash-recovery logic without spawning threads or a
+ * built `dist/`.
+ */
+export interface ParsePoolWorker {
+  postMessage(msg: unknown): void;
+  terminate(): Promise<number> | void;
+  on(event: 'message', cb: (m: unknown) => void): void;
+  on(event: 'error', cb: (e: Error) => void): void;
+  on(event: 'exit', cb: (code: number) => void): void;
+}
+
+/** A single file to parse. `language` is resolved on the main thread (it holds
+ *  the project's codegraph.json extension overrides) and handed to the worker. */
+export interface ParseTask {
+  filePath: string;
+  content: string;
+  language: Language;
+  frameworkNames?: string[];
+}
+
+/** Default upper bound on the pool size derived from the core count. */
+const DEFAULT_PARSE_POOL_CAP = 8;
+/** Hard ceiling on pool size regardless of an explicit env override. */
+const MAX_PARSE_POOL_SIZE = 16;
+/** Parses a worker performs before it's recycled to reclaim WASM heap. */
+const DEFAULT_RECYCLE_INTERVAL = 250;
+/** Base per-parse timeout; scaled up for large files by the caller's formula. */
+const DEFAULT_PARSE_TIMEOUT_MS = 10_000;
+/**
+ * Max workers cold-starting at once. A worker's cold start is heavy (module load
+ * + grammar WASM compile); starting the whole pool simultaneously thrashes CPU.
+ * Warming a couple at a time keeps each start fast while the pool still reaches
+ * full size within a few parses of a large run.
+ */
+const MAX_CONCURRENT_SPAWN = 2;
+/**
+ * Total worker deaths before the pool stops respawning and fails outstanding
+ * work, so a systematically-broken worker platform degrades instead of
+ * respawning forever. Set high: normal per-file WASM crashes are cleared by the
+ * orchestrator's retry pass and shouldn't trip this on a merely-crashy repo.
+ */
+const CRASH_BUDGET = 100;
+
+/**
+ * Resolve the pool size from the `CODEGRAPH_PARSE_WORKERS` override and the
+ * machine's core count.
+ *   - explicit `0` or `1` → 1 worker (the old single-worker path; the rollback).
+ *   - explicit `N` → N, clamped to [1, 16].
+ *   - unset / blank / non-numeric → `clamp(cores - 1, 1, 8)` (leave a core for
+ *     the main thread + UI; never zero — parsing always needs a worker).
+ */
+export function resolveParsePoolSize(envVal: string | undefined, cpuCount: number): number {
+  if (envVal !== undefined && envVal !== '') {
+    const n = Number(envVal);
+    if (Number.isFinite(n) && n >= 0) {
+      return Math.max(1, Math.min(Math.floor(n), MAX_PARSE_POOL_SIZE));
+    }
+    // non-numeric / negative → fall through to the default
+  }
+  return Math.max(1, Math.min(cpuCount - 1, DEFAULT_PARSE_POOL_CAP));
+}
+
+interface ParseJob {
+  id: number;
+  task: ParseTask;
+  resolve: (r: ExtractionResult) => void;
+  reject: (e: Error) => void;
+  settled: boolean;
+  timer?: ReturnType<typeof setTimeout>;
+}
+
+/** Shape of a message a worker posts back (grammar-load ack or a parse result). */
+interface ParseWorkerMessage {
+  type?: string;
+  id?: number;
+  result?: ExtractionResult;
+}
+
+export interface ParseWorkerPoolOptions {
+  /** Languages to load grammars for in every worker at spawn. */
+  languages: Language[];
+  /** Number of worker threads (≥1). Clamp the resolved value before passing. */
+  size: number;
+  /** Compiled `parse-worker.js` path. Required unless `createWorker` is given. */
+  workerScriptPath?: string;
+  /** Parses per worker before recycle. Default 250. */
+  recycleInterval?: number;
+  /** Base per-parse timeout (ms); scaled by file size per parse. Default 10s. */
+  parseTimeoutMs?: number;
+  /** Worker factory (tests inject a fake). Defaults to a real `worker_threads` Worker. */
+  createWorker?: () => ParsePoolWorker;
+  /** Optional verbose logger (the orchestrator's `[worker] …` logger). */
+  log?: (msg: string) => void;
+}
+
+export class ParseWorkerPool {
+  private idle: ParsePoolWorker[] = [];
+  private queue: ParseJob[] = [];
+  private inflight = new Map<ParsePoolWorker, ParseJob>();
+  private workers = new Set<ParsePoolWorker>();
+  // Spawned but not yet 'grammars-loaded'. Growth counts these so a single first
+  // parse doesn't spawn the whole pool before the eager worker reports ready.
+  private pending = new Set<ParsePoolWorker>();
+  private parseCounts = new Map<ParsePoolWorker, number>();
+  private nextId = 1;
+  private totalCrashes = 0;
+  private destroyed = false;
+
+  private readonly languages: Language[];
+  private readonly maxSize: number;
+  private readonly recycleInterval: number;
+  private readonly parseTimeoutMs: number;
+  private readonly createWorker: () => ParsePoolWorker;
+  private readonly log: (msg: string) => void;
+
+  constructor(opts: ParseWorkerPoolOptions) {
+    this.languages = opts.languages;
+    this.maxSize = Math.max(1, Math.min(opts.size, MAX_PARSE_POOL_SIZE));
+    this.recycleInterval = opts.recycleInterval ?? DEFAULT_RECYCLE_INTERVAL;
+    this.parseTimeoutMs = opts.parseTimeoutMs ?? DEFAULT_PARSE_TIMEOUT_MS;
+    this.log = opts.log ?? (() => {});
+    if (opts.createWorker) {
+      this.createWorker = opts.createWorker;
+    } else if (opts.workerScriptPath) {
+      const scriptPath = opts.workerScriptPath;
+      this.createWorker = () => new Worker(scriptPath);
+    } else {
+      throw new Error('ParseWorkerPool requires workerScriptPath or createWorker');
+    }
+    this.spawnOne(); // one eager warm worker, ready for the first parse
+  }
+
+  /** Pool size cap (for logging). */
+  get size(): number { return this.maxSize; }
+
+  /** Live worker count (for tests). */
+  get liveWorkers(): number { return this.workers.size; }
+
+  /** False once the crash budget is exhausted (or after destroy). */
+  get healthy(): boolean {
+    return !this.destroyed && this.totalCrashes < CRASH_BUDGET;
+  }
+
+  /**
+   * Parse one file on the pool. Resolves with the extraction result, or REJECTS
+   * if the parse times out or its worker crashes — the caller records the error
+   * and (for worker-exit/OOM rejections) re-attempts in its retry pass.
+   */
+  requestParse(task: ParseTask): Promise<ExtractionResult> {
+    if (this.destroyed) return Promise.reject(new Error('Parse pool destroyed'));
+    return new Promise<ExtractionResult>((resolve, reject) => {
+      this.queue.push({ id: this.nextId++, task, resolve, reject, settled: false });
+      this.drain();
+    });
+  }
+
+  private spawnOne(): void {
+    if (this.destroyed || this.workers.size >= this.maxSize || !this.healthy) return;
+    let w: ParsePoolWorker;
+    try {
+      w = this.createWorker();
+    } catch {
+      this.totalCrashes++; // counts toward the circuit breaker
+      return;
+    }
+    this.workers.add(w);
+    this.pending.add(w);
+    this.parseCounts.set(w, 0);
+    w.on('message', (m) => this.onMessage(w, (m ?? {}) as ParseWorkerMessage));
+    w.on('error', (e) => this.onWorkerGone(w, `Worker error: ${e?.message ?? 'unknown'}`));
+    w.on('exit', (code) => { if (code !== 0) this.onWorkerGone(w, `Worker exited with code ${code}`); });
+    // Load grammars; the worker replies 'grammars-loaded' and only then is idle.
+    w.postMessage({ type: 'load-grammars', languages: this.languages });
+  }
+
+  private onMessage(w: ParsePoolWorker, m: ParseWorkerMessage): void {
+    if (m.type === 'grammars-loaded') {
+      if (!this.workers.has(w)) return; // recycled/destroyed before ready
+      this.pending.delete(w);
+      this.idle.push(w);
+      this.drain();
+      return;
+    }
+    if (m.type === 'parse-result') {
+      const job = this.inflight.get(w);
+      if (!job || (m.id !== undefined && m.id !== job.id)) return; // stale (post-recycle)
+      this.inflight.delete(w);
+      // Recycle the worker once it's done enough parses to have grown its WASM
+      // heap; otherwise return it to the idle set for the next job.
+      if ((this.parseCounts.get(w) ?? 0) >= this.recycleInterval) {
+        this.recycle(w);
+      } else {
+        this.idle.push(w);
+      }
+      this.settle(job, m.result);
+      this.drain();
+    }
+  }
+
+  /** A worker died (crash hook / OOM exit / spawn error). Reject its in-flight
+   *  parse so the caller's retry pass can re-attempt it, then respawn. */
+  private onWorkerGone(w: ParsePoolWorker, message: string): void {
+    if (!this.workers.has(w)) return; // already handled (error+exit both fire), or recycled
+    this.removeWorker(w);
+    this.totalCrashes++;
+    const job = this.inflight.get(w);
+    this.inflight.delete(w);
+    try { void w.terminate(); } catch { /* already gone */ }
+    if (job) this.settle(job, undefined, new Error(message));
+    if (this.healthy) this.spawnOne(); // keep capacity
+    this.drain();
+  }
+
+  /** Tear down a worker that has hit its recycle threshold and replace it. Not a
+   *  crash, so it doesn't count against the budget. */
+  private recycle(w: ParsePoolWorker): void {
+    this.log(`Recycling worker after ${this.parseCounts.get(w)} parses (heap: ${Math.round(process.memoryUsage().rss / 1024 / 1024)}MB RSS)`);
+    this.removeWorker(w);
+    // Fire-and-forget: worker.terminate() can hang if WASM is wedged.
+    try { void w.terminate(); } catch { /* already gone */ }
+    if (this.healthy && !this.destroyed) this.spawnOne();
+  }
+
+  private removeWorker(w: ParsePoolWorker): void {
+    this.workers.delete(w);
+    this.pending.delete(w);
+    this.parseCounts.delete(w);
+    this.idle = this.idle.filter((x) => x !== w);
+  }
+
+  private dispatch(w: ParsePoolWorker, job: ParseJob): void {
+    this.inflight.set(w, job);
+    this.parseCounts.set(w, (this.parseCounts.get(w) ?? 0) + 1);
+    // Scale the timeout for large files: base + 10s per 100KB (matches the
+    // original single-worker formula so pathological-file behaviour is unchanged).
+    const timeoutMs = this.parseTimeoutMs + Math.floor(job.task.content.length / 100_000) * 10_000;
+    job.timer = setTimeout(() => this.onTimeout(w, job, timeoutMs), timeoutMs);
+    job.timer.unref?.();
+    w.postMessage({
+      type: 'parse',
+      id: job.id,
+      filePath: job.task.filePath,
+      content: job.task.content,
+      frameworkNames: job.task.frameworkNames,
+      language: job.task.language,
+    });
+  }
+
+  private onTimeout(w: ParsePoolWorker, job: ParseJob, ms: number): void {
+    if (job.settled || !this.workers.has(w)) return;
+    this.log(`TIMEOUT: ${job.task.filePath} exceeded ${ms}ms — killing worker`);
+    // Kill the (possibly WASM-wedged) worker and reject this parse. A timeout
+    // isn't a crash — don't charge the budget — but the worker is gone, so spawn
+    // a replacement to keep capacity.
+    this.removeWorker(w);
+    this.inflight.delete(w);
+    try { void w.terminate(); } catch { /* already gone */ }
+    this.settle(job, undefined, new Error(`Parse timed out after ${ms}ms`));
+    if (this.healthy) this.spawnOne();
+    this.drain();
+  }
+
+  private drain(): void {
+    // Grow toward maxSize while queued work outstrips workers that are idle OR
+    // already on their way up — throttled so we never cold-start the whole pool
+    // at once.
+    while (
+      this.queue.length > this.idle.length + this.pending.size &&
+      this.workers.size < this.maxSize &&
+      this.pending.size < MAX_CONCURRENT_SPAWN &&
+      !this.destroyed &&
+      this.healthy
+    ) {
+      this.spawnOne();
+    }
+    // Dispatch queued jobs to idle workers.
+    while (this.idle.length && this.queue.length) {
+      let job: ParseJob | undefined;
+      while (this.queue.length && (job = this.queue.shift()) && job.settled) job = undefined;
+      if (!job || job.settled) break;
+      const w = this.idle.pop()!;
+      this.dispatch(w, job);
+    }
+    // Hang-prevention: if there's queued work but nothing can ever run it (no
+    // idle workers, none spawning, none alive), fail it instead of hanging
+    // forever. Reached only when the crash budget is exhausted or after destroy.
+    if (this.queue.length && this.idle.length === 0 && this.pending.size === 0 && this.workers.size === 0) {
+      const reason = this.destroyed ? 'parse pool destroyed' : 'parse pool exhausted its worker crash budget';
+      for (const job of this.queue.splice(0)) this.settle(job, undefined, new Error(reason));
+    }
+  }
+
+  private settle(job: ParseJob, result?: ExtractionResult, err?: Error): void {
+    if (job.settled) return;
+    job.settled = true;
+    if (job.timer) clearTimeout(job.timer);
+    if (err) job.reject(err);
+    else job.resolve(result!);
+  }
+
+  /**
+   * Recycle every idle worker now (fresh WASM heaps). The orchestrator calls
+   * this before its retry pass so crash-on-memory files get the cleanest heap.
+   */
+  recycleAll(): void {
+    for (const w of [...this.idle]) this.recycle(w);
+  }
+
+  /** Terminate all workers and reject any outstanding parses. */
+  async destroy(): Promise<void> {
+    if (this.destroyed) return;
+    this.destroyed = true;
+    const ws = [...this.workers];
+    this.workers.clear();
+    this.pending.clear();
+    this.parseCounts.clear();
+    this.idle = [];
+    for (const job of [...this.inflight.values(), ...this.queue]) {
+      this.settle(job, undefined, new Error('parse pool destroyed'));
+    }
+    this.inflight.clear();
+    this.queue = [];
+    await Promise.all(ws.map((w) => Promise.resolve(w.terminate()).catch(() => { /* already gone */ })));
+  }
+}