// query-pool.test.ts
  /**
   * QueryPool — the off-loop worker pool that keeps the shared daemon's main
   * event loop free for the MCP transport under concurrent read load (the
   * "10 subagents time out" report). These tests drive the pool's queue / growth /
   * crash-recovery / backstop logic with INJECTED fake workers, so they exercise
   * the real scheduling code without spawning threads or needing a built dist.
   *
   * End-to-end behavior with real worker threads (a worker opens its own WAL read
   * connection and runs codegraph_explore) is validated separately against a real
   * index; here we pin the orchestration that makes that safe and fair.
   */
  12. import { describe, it, expect } from 'vitest';
  13. import { QueryPool, resolvePoolSize, type PoolWorker } from '../src/mcp/query-pool';
  14. import type { ToolResult } from '../src/mcp/tools';
  /**
   * Resolves (with no value) once a `setTimeout` of `ms` milliseconds fires.
   * The tests use it to let the fake workers' queued timer callbacks run.
   */
  const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => { setTimeout(resolve, ms); });

  /** The only message shape `FakeWorker.postMessage` acts on; anything else is ignored. */
  interface CallMsg { type: 'call'; id: number; toolName: string; args: Record<string, unknown> }

  /**
   * What a fake worker does with one call, distinguished by which key is present:
   * `result` — reply with that result; `crash` — report exit code 13 and never
   * reply; `hang` — never reply; `wait` — reply once the promise resolves.
   */
  type Action = { result: ToolResult } | { crash: true } | { hang: true } | { wait: Promise<ToolResult> };
  /**
   * Fake worker speaking the same {type:'ready'|'result'} protocol as the real
   * one. `behavior` decides per call whether to return a result, crash (exit≠0),
   * hang (never reply — exercises the backstop), or wait on a promise (lets a test
   * hold a call in-flight to observe concurrency). Emits 'ready' on a macrotask so
   * the pool has wired its listeners first.
   *
   * Once the worker has crashed or been terminated (`alive === false`) it is
   * inert: further `postMessage` calls are dropped without consulting `behavior`,
   * so a dead fake can neither reply nor report a second exit.
   */
  class FakeWorker implements PoolWorker {
    private msgCb?: (m: unknown) => void;
    private exitCb?: (code: number) => void;
    /** False after a simulated crash or `terminate()`; gates every outgoing event. */
    alive = true;

    /**
     * @param behavior Called once per accepted 'call' message to choose the action.
     * @param readyOk  Value sent as `ok` in the initial 'ready' message.
     */
    constructor(private readonly behavior: (m: CallMsg) => Action, readyOk = true) {
      // Deferred so listeners registered right after construction still see 'ready'.
      setTimeout(() => { if (this.alive) this.msgCb?.({ type: 'ready', ok: readyOk }); }, 0);
    }

    /** Registers the single 'message' / 'exit' listener; a later registration replaces the earlier one. */
    on(event: string, cb: (...args: any[]) => void): void {
      if (event === 'message') this.msgCb = cb;
      else if (event === 'exit') this.exitCb = cb;
      // 'error' unused by the fakes
    }

    /** Delivers a 'result' message for call `id`, unless the worker has died in the meantime. */
    private reply(id: number, result: ToolResult): void {
      if (this.alive) this.msgCb?.({ type: 'result', id, result });
    }

    /** Handles one 'call' message according to `behavior`; other messages are ignored. */
    postMessage(msg: unknown): void {
      // A crashed/terminated worker must stay silent: without this guard a second
      // post to a crashed fake would run `behavior` again and schedule a duplicate exit.
      if (!this.alive) return;
      const m = msg as CallMsg;
      if (!m || m.type !== 'call') return;
      const action = this.behavior(m);
      if ('crash' in action) {
        this.alive = false;
        setTimeout(() => this.exitCb?.(13), 0); // simulate a crash exit
        return;
      }
      if ('hang' in action) return; // never reply
      if ('wait' in action) { void action.wait.then((r) => this.reply(m.id, r)); return; }
      setTimeout(() => this.reply(m.id, action.result), 0);
    }

    /** Marks the worker dead and resolves with exit code 0; no 'exit' callback is fired. */
    terminate(): Promise<number> { this.alive = false; return Promise.resolve(0); }
  }
  /** Builds a ToolResult whose content is a single text item carrying `text`. */
  const ok = (text: string): ToolResult => {
    return { content: [{ type: 'text', text: text }] };
  };
  // Pins resolvePoolSize(override, cores): explicit overrides, the hard ceiling,
  // and the cores-derived default.
  describe('resolvePoolSize', () => {
    it('honors a numeric override and disables on 0', () => {
      const overrides: Array<[string, number]> = [
        ['0', 0],
        ['3', 3],
      ];
      for (const [raw, expected] of overrides) {
        expect(resolvePoolSize(raw, 8)).toBe(expected);
      }
    });

    it('caps the override at the hard ceiling', () => {
      const capped = resolvePoolSize('999', 8);
      expect(capped).toBe(16);
    });

    it('defaults to clamp(cores-1, 1, 16) when unset/blank/non-numeric', () => {
      // [override, cores, expected size]
      const cases: Array<[string | undefined, number, number]> = [
        [undefined, 8, 7],
        ['', 8, 7],
        ['abc', 8, 7],
        [undefined, 1, 1], // never zero
        [undefined, 64, 16], // never above the ceiling
      ];
      for (const [raw, cores, expected] of cases) {
        expect(resolvePoolSize(raw, cores)).toBe(expected);
      }
    });
  });
  // Orchestration tests for QueryPool, driven entirely by FakeWorker instances.
  // Each test destroys its pool in `finally`, so a failed expectation cannot skip
  // the teardown and leave a pool alive while later tests run.
  describe('QueryPool', () => {
    it('dispatches a call and returns the worker result', async () => {
      const pool = new QueryPool({ root: '/x', size: 1, createWorker: () => new FakeWorker((m) => ({ result: ok(`r:${m.toolName}`) })) });
      try {
        const res = await pool.run('codegraph_explore', { query: 'q' });
        expect(res.content[0].text).toBe('r:codegraph_explore');
      } finally {
        await pool.destroy();
      }
    });

    it('runs N concurrent calls in parallel (not serialized)', async () => {
      let active = 0, maxActive = 0;
      let release!: () => void;
      const gate = new Promise<void>((r) => { release = r; });
      // Each call holds in-flight until the gate opens, so max concurrency across
      // the pool is observable: with size=5 and 5 calls, all 5 should run at once.
      const behavior = (m: CallMsg): Action => ({
        wait: (async () => {
          active++; maxActive = Math.max(maxActive, active);
          await gate;
          active--;
          return ok(`r${m.id}`);
        })(),
      });
      const pool = new QueryPool({ root: '/x', size: 5, createWorker: () => new FakeWorker(behavior) });
      try {
        const calls = Promise.all(Array.from({ length: 5 }, (_, i) => pool.run('codegraph_search', { i })));
        // Let all workers spawn (cold-start cap → a few generations) + dispatch.
        // Poll with a deadline instead of one fixed sleep so a slow machine does
        // not fail the test and a fast one does not wait longer than needed.
        for (let waited = 0; maxActive < 5 && waited < 1000; waited += 5) await sleep(5);
        expect(maxActive).toBe(5);
        release();
        const results = await calls;
        expect(results.every((r) => /^r\d+$/.test(r.content[0].text))).toBe(true);
      } finally {
        release(); // no-op if already opened; lets held calls settle on the failure path
        await pool.destroy();
      }
    });

    it('does not spawn the whole pool for a single call (pending-aware growth)', async () => {
      let created = 0;
      const pool = new QueryPool({ root: '/x', size: 8, createWorker: () => { created++; return new FakeWorker((m) => ({ result: ok(`r${m.id}`) })); } });
      try {
        await pool.run('codegraph_node', { symbol: 's' });
        // One eager worker + at most the cold-start cap — never all 8.
        expect(created).toBeLessThanOrEqual(2);
      } finally {
        await pool.destroy();
      }
    });

    it('recovers from a worker crash: retries the in-flight call and respawns', async () => {
      let calls = 0;
      const pool = new QueryPool({
        root: '/x', size: 2, maxRetries: 1,
        // First dispatch crashes its worker; the retry (on a respawn/other worker) succeeds.
        createWorker: () => new FakeWorker((m) => (++calls === 1 ? { crash: true } : { result: ok(`recovered:${m.id}`) })),
      });
      try {
        const res = await pool.run('codegraph_explore', { query: 'q' });
        expect(res.isError).toBeFalsy();
        expect(res.content[0].text).toBe('recovered:1');
        await sleep(10);
        // The pool grows lazily, so one call keeps one worker — but the crash must
        // have been replaced (not dropped to zero) and the pool stays healthy and
        // keeps serving.
        expect(pool.liveWorkers).toBeGreaterThanOrEqual(1);
        expect(pool.healthy).toBe(true);
        const again = await pool.run('codegraph_node', { symbol: 's' });
        expect(again.isError).toBeFalsy();
      } finally {
        await pool.destroy();
      }
    });

    it('fails a poison call gracefully without wedging the pool', async () => {
      // This specific call always crashes its worker; a normal call still works.
      const poison = (m: CallMsg) => m.toolName === 'codegraph_explore';
      const pool = new QueryPool({
        root: '/x', size: 3, maxRetries: 1,
        createWorker: () => new FakeWorker((m) => (poison(m) ? { crash: true } : { result: ok(`ok:${m.id}`) })),
      });
      try {
        const bad = await pool.run('codegraph_explore', { query: 'boom' });
        expect(bad.isError).toBe(true); // graceful, after retries
        const good = await pool.run('codegraph_search', { query: 'fine' });
        expect(good.isError).toBeFalsy();
        expect(good.content[0].text).toMatch(/^ok:/);
      } finally {
        await pool.destroy();
      }
    });

    it('graceful backstop: a call that can\'t be served in time gets success-shaped busy guidance', async () => {
      // 1 worker, every call hangs; soft-timeout small → the caller gets guidance,
      // never a hard error, never a hang.
      const pool = new QueryPool({ root: '/x', size: 1, softTimeoutMs: 60, createWorker: () => new FakeWorker(() => ({ hang: true })) });
      try {
        const res = await pool.run('codegraph_explore', { query: 'q' });
        expect(res.isError).toBeFalsy(); // NOT an error (abandonment rule)
        expect(res.content[0].text).toMatch(/busy|retry/i);
      } finally {
        await pool.destroy();
      }
    });

    it('destroy settles outstanding calls instead of hanging', async () => {
      // No try/finally here: destroy() is the behavior under test and already runs
      // before any expectation, so a failed assertion cannot skip it.
      const pool = new QueryPool({ root: '/x', size: 1, softTimeoutMs: 10_000, createWorker: () => new FakeWorker(() => ({ hang: true })) });
      const pending = pool.run('codegraph_explore', { query: 'q' });
      await sleep(5);
      await pool.destroy();
      const res = await pending; // must resolve, not hang
      expect(res.isError).toBe(true);
      expect(pool.healthy).toBe(false);
    });
  });