1
0

parse-pool.test.ts 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /**
  2. * ParseWorkerPool — the worker pool that parses files across cores during a full
  3. * `codegraph index` (issue #1015). These tests drive the pool's queue / growth /
  4. * recycle / crash-recovery / timeout / teardown logic with INJECTED fake
  5. * workers, so they exercise the real scheduling code without spawning threads or
  6. * needing a built dist.
  7. *
  8. * End-to-end behavior with real worker threads (each worker owns a tree-sitter
  9. * WASM heap and runs extractFromSource) is covered by the extraction suite
  10. * against a real temp project; here we pin the orchestration that makes the
  11. * parallelism safe.
  12. */
  13. import { describe, it, expect } from 'vitest';
  14. import { ParseWorkerPool, resolveParsePoolSize, type ParsePoolWorker, type ParseTask } from '../src/extraction/parse-pool';
  15. import type { Language, ExtractionResult } from '../src/types';
  /**
   * Resolve after roughly `ms` milliseconds (via `setTimeout`).
   * Typed as `Promise<void>`: callers only await it for the delay, and the
   * resolution value carries no information.
   */
  const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });
  /** A `parse` request as posted by the pool to a (fake) worker. */
  interface ParseMsg {
    type: 'parse';
    id: number;
    filePath: string;
    content: string;
    language: Language;
  }

  /**
   * What a fake worker does with a single parse request:
   * - `result` — reply with this extraction result on a later macrotask;
   * - `crash`  — go dead and report exit code 1 without replying;
   * - `hang`   — never reply, so the pool's parse timeout must fire;
   * - `wait`   — reply once the promise resolves (holds the parse in flight).
   */
  type Action =
    | { result: ExtractionResult }
    | { crash: true }
    | { hang: true }
    | { wait: Promise<ExtractionResult> };
  /**
   * In-process stand-in for a parse worker thread. It answers the same two
   * exchanges the pool drives — `load-grammars` → `grammars-loaded` and
   * `parse` → `parse-result` — and lets each test script the fate of every parse
   * through `behavior`: reply with a result, crash with exit code 1, hang forever
   * (to exercise the pool's timeout), or hold the parse open until a promise
   * resolves (to observe concurrency).
   *
   * Messages are only delivered while the worker is alive. `grammars-loaded` is
   * sent on a later macrotask so the pool has attached its listeners first.
   * `terminate()` marks the worker dead and notifies `onTerminate`; it does not
   * emit 'exit'.
   */
  class FakeWorker implements ParsePoolWorker {
    private onMessage?: (m: unknown) => void;
    private onExit?: (code: number) => void;
    alive = true;

    constructor(private behavior: (m: ParseMsg) => Action, private onTerminate?: () => void) {}

    on(event: string, cb: (...args: any[]) => void): void {
      switch (event) {
        case 'message':
          this.onMessage = cb;
          break;
        case 'exit':
          this.onExit = cb;
          break;
        default:
          // Any other event (e.g. 'error') is ignored; the fakes never emit it.
          break;
      }
    }

    /** Hand `payload` to the message listener, unless this worker is dead. */
    private emit(payload: unknown): void {
      if (!this.alive) return;
      this.onMessage?.(payload);
    }

    postMessage(msg: unknown): void {
      const incoming = msg as { type: string } & Partial<ParseMsg>;
      switch (incoming.type) {
        case 'load-grammars':
          setTimeout(() => this.emit({ type: 'grammars-loaded' }), 0);
          return;
        case 'parse':
          break;
        default:
          return;
      }

      const request = incoming as ParseMsg;
      const action = this.behavior(request);
      if ('crash' in action) {
        // Simulate a WASM-OOM exit(1): go dead now, report the exit on a later tick.
        this.alive = false;
        setTimeout(() => this.onExit?.(1), 0);
      } else if ('hang' in action) {
        // Deliberately never reply, so the pool's timeout path is taken.
      } else if ('wait' in action) {
        void action.wait.then((r) => this.emit({ type: 'parse-result', id: request.id, result: r }));
      } else {
        setTimeout(() => this.emit({ type: 'parse-result', id: request.id, result: action.result }), 0);
      }
    }

    terminate(): Promise<number> {
      this.alive = false;
      this.onTerminate?.();
      return Promise.resolve(0);
    }
  }
  /** Build a TypeScript `ParseTask` for `filePath`; the content defaults to `'code'`. */
  const task = (filePath: string, content = 'code'): ParseTask => {
    const language = 'typescript' as Language;
    return { filePath, content, language };
  };
  /**
   * An empty `ExtractionResult`. `tag` is stored in `durationMs`, so a test can
   * tell which reply it got back.
   */
  const result = (tag = 0): ExtractionResult => ({
    nodes: [],
    edges: [],
    unresolvedReferences: [],
    errors: [],
    durationMs: tag,
  });
  /**
   * Build a `ParseWorkerPool` whose workers are `FakeWorker`s that all follow
   * `behavior`. The factory counts how many workers the pool spawns and how many
   * it terminates. `counts()` returns a snapshot of both tallies.
   */
  function makePool(
    size: number,
    behavior: (m: ParseMsg) => Action,
    opts: Partial<{ recycleInterval: number; parseTimeoutMs: number }> = {}
  ) {
    const tally = { spawned: 0, terminated: 0 };
    const createWorker = () => {
      tally.spawned += 1;
      return new FakeWorker(behavior, () => {
        tally.terminated += 1;
      });
    };
    const pool = new ParseWorkerPool({
      languages: ['typescript'] as Language[],
      size,
      recycleInterval: opts.recycleInterval,
      parseTimeoutMs: opts.parseTimeoutMs,
      createWorker,
    });
    return { pool, counts: () => ({ ...tally }) };
  }
  describe('resolveParsePoolSize', () => {
    it('treats explicit 0 and 1 as a single worker (the rollback path)', () => {
      for (const override of ['0', '1']) {
        expect(resolveParsePoolSize(override, 8)).toBe(1);
      }
    });

    it('honors a numeric override, capped at the hard ceiling', () => {
      const cases: Array<[string, number]> = [
        ['4', 4],
        ['999', 16],
      ];
      for (const [override, expected] of cases) {
        expect(resolveParsePoolSize(override, 8)).toBe(expected);
      }
    });

    it('defaults to clamp(cores-1, 1, 8) when unset/blank/non-numeric', () => {
      // With 8 cores, each unusable override falls back to cores - 1.
      for (const override of [undefined, '', 'abc']) {
        expect(resolveParsePoolSize(override, 8)).toBe(7);
      }
      expect(resolveParsePoolSize(undefined, 1)).toBe(1); // never zero
      expect(resolveParsePoolSize(undefined, 2)).toBe(1); // leave a core
      expect(resolveParsePoolSize(undefined, 64)).toBe(8); // never above the default cap
    });
  });
  describe('ParseWorkerPool', () => {
    it('parses a file and returns the worker result', async () => {
      const { pool } = makePool(1, () => ({ result: result(42) }));
      const res = await pool.requestParse(task('a.ts'));
      expect(res.durationMs).toBe(42);
      await pool.destroy();
    });

    it('runs N parses in parallel across the pool (not serialized)', async () => {
      let active = 0, maxActive = 0;
      let release!: () => void;
      const gate = new Promise<void>((r) => { release = r; });
      const { pool } = makePool(4, () => ({
        wait: (async () => { active++; maxActive = Math.max(maxActive, active); await gate; active--; return result(); })(),
      }));
      const ps = [0, 1, 2, 3].map((i) => pool.requestParse(task(`f${i}.ts`)));
      // If an assertion below fails, the finally block tears the pool down while
      // these parses may still be pending. Keep their late rejections from being
      // reported as unhandled (Promise.all below still observes them).
      for (const p of ps) void p.catch(() => {});
      try {
        // Poll with a bounded deadline instead of one fixed sleep. The pool needs
        // a few macrotasks to grow to size and dispatch all four parses. A fixed
        // delay is slower in the common case and flaky on a loaded machine.
        const deadline = Date.now() + 1000;
        while (maxActive < 4 && Date.now() < deadline) await sleep(5);
        expect(maxActive).toBe(4);
        release();
        await Promise.all(ps);
      } finally {
        // Always open the gate and destroy the pool, so a failed assertion does
        // not leave four parses blocked forever and the pool alive.
        release();
        await pool.destroy();
      }
    });

    it('grows lazily — a single parse does not spawn the whole pool', async () => {
      const { pool, counts } = makePool(8, () => ({ result: result() }));
      await pool.requestParse(task('only.ts'));
      expect(counts().spawned).toBe(1); // just the eager warm worker
      await pool.destroy();
    });

    it('recycles a worker after recycleInterval parses', async () => {
      const { pool, counts } = makePool(1, () => ({ result: result() }), { recycleInterval: 3 });
      for (let i = 0; i < 4; i++) await pool.requestParse(task(`f${i}.ts`));
      // 3 parses on the first worker → recycle (terminate + respawn); the 4th runs
      // on the fresh worker.
      expect(counts().spawned).toBe(2);
      expect(counts().terminated).toBeGreaterThanOrEqual(1);
      await pool.destroy();
    });

    it('rejects a parse whose worker crashes (retry-pass-recognisable message) and keeps serving', async () => {
      const { pool, counts } = makePool(1, (m) => (m.filePath === 'poison.ts' ? { crash: true } : { result: result(7) }));
      // The message must contain "Worker exited" so the orchestrator's retry pass
      // re-attempts it (that's the filter it uses).
      await expect(pool.requestParse(task('poison.ts'))).rejects.toThrow(/Worker exited/);
      const ok = await pool.requestParse(task('good.ts'));
      expect(ok.durationMs).toBe(7);
      expect(counts().spawned).toBe(2); // respawned after the crash
      await pool.destroy();
    });

    it('times out a hung parse and stays usable', async () => {
      const { pool } = makePool(1, (m) => (m.filePath === 'hang.ts' ? { hang: true } : { result: result(9) }), { parseTimeoutMs: 30 });
      await expect(pool.requestParse(task('hang.ts'))).rejects.toThrow(/timed out/i);
      const ok = await pool.requestParse(task('ok.ts'));
      expect(ok.durationMs).toBe(9);
      await pool.destroy();
    });

    it('serves a queue larger than the pool size', async () => {
      const { pool } = makePool(2, (m) => ({ result: result(Number(m.filePath.replace(/\D/g, ''))) }));
      const ps = Array.from({ length: 10 }, (_, i) => pool.requestParse(task(`${i}.ts`)));
      const res = await Promise.all(ps);
      expect(res.map((r) => r.durationMs).sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
      await pool.destroy();
    });

    it('destroy() rejects in-flight and subsequent parses', async () => {
      const { pool } = makePool(1, () => ({ hang: true }));
      const p = pool.requestParse(task('x.ts'));
      p.catch(() => {}); // avoid an unhandled rejection before we assert
      await sleep(10);
      await pool.destroy();
      await expect(p).rejects.toThrow(/destroyed/);
      await expect(pool.requestParse(task('y.ts'))).rejects.toThrow(/destroyed/);
    });
  });