|
|
@@ -31,6 +31,22 @@ const FILE_IO_BATCH_SIZE = 10;
|
|
|
|
|
|
// PARSER_RESET_INTERVAL moved to parse-worker.ts (runs in worker thread)
|
|
|
|
|
|
+/**
|
|
|
+ * Maximum time (ms) to wait for a single file to parse in the worker thread.
|
|
|
+ * If tree-sitter hangs or WASM runs out of memory, this prevents the entire
|
|
|
+ * indexing run from freezing. The worker is restarted after a timeout.
|
|
|
+ */
|
|
|
+const PARSE_TIMEOUT_MS = 10_000;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Number of files to parse before recycling the worker thread.
|
|
|
+ * WASM linear memory can grow but NEVER shrink (WebAssembly spec limitation).
|
|
|
+ * The only way to reclaim tree-sitter's WASM heap is to destroy the entire
|
|
|
+ * V8 isolate by terminating the worker thread and spawning a fresh one.
|
|
|
+ * This interval balances memory usage against the cost of reloading grammars.
|
|
|
+ */
|
|
|
+const WORKER_RECYCLE_INTERVAL = 500;
|
|
|
+
|
|
|
/**
|
|
|
* Progress callback for indexing operations
|
|
|
*/
|
|
|
@@ -395,7 +411,8 @@ export class ExtractionOrchestrator {
|
|
|
*/
|
|
|
async indexAll(
|
|
|
onProgress?: (progress: IndexProgress) => void,
|
|
|
- signal?: AbortSignal
|
|
|
+ signal?: AbortSignal,
|
|
|
+ verbose?: boolean
|
|
|
): Promise<IndexResult> {
|
|
|
await initGrammars();
|
|
|
const startTime = Date.now();
|
|
|
@@ -406,6 +423,10 @@ export class ExtractionOrchestrator {
|
|
|
let totalNodes = 0;
|
|
|
let totalEdges = 0;
|
|
|
|
|
|
+ const log = verbose
|
|
|
+ ? (msg: string) => { console.log(`[worker] ${msg}`); }
|
|
|
+ : (_msg: string) => {};
|
|
|
+
|
|
|
// Phase 1: Scan for files
|
|
|
onProgress?.({
|
|
|
phase: 'scanning',
|
|
|
@@ -446,58 +467,139 @@ export class ExtractionOrchestrator {
|
|
|
// Falls back to in-process parsing if the compiled worker is unavailable (e.g. tests).
|
|
|
const parseWorkerPath = path.join(__dirname, 'parse-worker.js');
|
|
|
const useWorker = fs.existsSync(parseWorkerPath);
|
|
|
- let parseWorker: import('worker_threads').Worker | null = null;
|
|
|
+ let WorkerClass: typeof import('worker_threads').Worker | null = null;
|
|
|
|
|
|
if (useWorker) {
|
|
|
const { Worker } = await import('worker_threads');
|
|
|
- parseWorker = new Worker(parseWorkerPath);
|
|
|
+ WorkerClass = Worker;
|
|
|
} else {
|
|
|
// In-process fallback: load grammars locally
|
|
|
await loadGrammarsForLanguages(neededLanguages);
|
|
|
}
|
|
|
|
|
|
- // Set up worker-based or in-process parsing
|
|
|
+ // --- Worker lifecycle management ---
|
|
|
+ // The worker can crash (OOM in WASM) or hang on pathological files.
|
|
|
+ // We track pending parse promises and handle both cases:
|
|
|
+ // - Timeout: terminate + restart the worker, reject the timed-out request
|
|
|
+ // - Crash: reject all pending promises, restart for remaining files
|
|
|
+ let parseWorker: import('worker_threads').Worker | null = null;
|
|
|
let nextId = 0;
|
|
|
+ let workerParseCount = 0;
|
|
|
const pendingParses = new Map<number, {
|
|
|
resolve: (result: ExtractionResult) => void;
|
|
|
+ reject: (err: Error) => void;
|
|
|
+ timer: ReturnType<typeof setTimeout>;
|
|
|
}>();
|
|
|
|
|
|
- if (parseWorker) {
|
|
|
- // Wait for grammars to load in the worker
|
|
|
- await new Promise<void>((resolve, reject) => {
|
|
|
- parseWorker!.once('message', (msg: { type: string }) => {
|
|
|
- if (msg.type === 'grammars-loaded') resolve();
|
|
|
- else reject(new Error(`Unexpected message: ${msg.type}`));
|
|
|
- });
|
|
|
- parseWorker!.postMessage({ type: 'load-grammars', languages: neededLanguages });
|
|
|
- });
|
|
|
+    /**
+     * Fail every in-flight parse request.
+     *
+     * Each entry's timeout timer is cancelled, the entry is removed from
+     * `pendingParses`, and its promise is rejected with `new Error(reason)`.
+     *
+     * @param reason - Message used for the rejection Error of every entry.
+     */
+    function rejectAllPending(reason: string): void {
+      // Snapshot first so the map is already empty when the rejections are issued.
+      const inFlight = Array.from(pendingParses.values());
+      pendingParses.clear();
+      inFlight.forEach((entry) => {
+        clearTimeout(entry.timer);
+        entry.reject(new Error(reason));
+      });
+    }
|
|
|
|
|
|
- parseWorker.on('message', (msg: { type: string; id?: number; result?: ExtractionResult }) => {
|
|
|
+    /**
+     * Attach the long-lived `message`, `error` and `exit` listeners to a worker.
+     *
+     * - `message`: resolves the pending request whose id matches a
+     *   `parse-result` message and cancels its timeout timer.
+     * - `error` / `exit`: fail the in-flight requests, but only while `w` is
+     *   still the current `parseWorker`. A worker that was retired (recycled or
+     *   timed out) is terminated without awaiting, so its events can arrive
+     *   after a replacement has taken over; those late events must not reject
+     *   requests that belong to the replacement.
+     *
+     * @param w - The worker to attach the listeners to.
+     */
+    function attachWorkerHandlers(w: import('worker_threads').Worker): void {
+      w.on('message', (msg: { type: string; id?: number; result?: ExtractionResult }) => {
         if (msg.type === 'parse-result' && msg.id !== undefined) {
           const pending = pendingParses.get(msg.id);
           if (pending) {
+            clearTimeout(pending.timer);
             pendingParses.delete(msg.id);
             pending.resolve(msg.result!);
           }
         }
       });
+
+      w.on('error', (err) => {
+        logWarn('Parse worker error', { error: err.message });
+        // Only the current worker owns the entries in pendingParses.
+        if (parseWorker === w) {
+          rejectAllPending(`Worker error: ${err.message}`);
+        }
+      });
+
+      w.on('exit', (code) => {
+        // A retired worker is no longer referenced by parseWorker; ignore its exit.
+        if (parseWorker !== w) return;
+        if (code !== 0 && pendingParses.size > 0) {
+          logWarn('Parse worker exited unexpectedly', { code });
+          rejectAllPending(`Worker exited with code ${code}`);
+        }
+        // Clear reference so we know to respawn
+        parseWorker = null;
+      });
     }
|
|
|
|
|
|
- function requestParse(filePath: string, content: string): Promise<ExtractionResult> {
|
|
|
- if (parseWorker) {
|
|
|
- return new Promise<ExtractionResult>((resolve) => {
|
|
|
- const id = nextId++;
|
|
|
- pendingParses.set(id, { resolve });
|
|
|
- parseWorker!.postMessage({ type: 'parse', id, filePath, content });
|
|
|
+    /**
+     * Return the current parse worker, spawning and initialising one if needed.
+     *
+     * A new worker gets the shared listeners via attachWorkerHandlers() and is
+     * then sent a `load-grammars` request; the promise resolves once the worker
+     * answers `grammars-loaded`.
+     *
+     * @returns The worker that is ready to receive `parse` requests.
+     * @throws Error if the worker answers with an unexpected message, emits
+     *   `error`, or exits before the grammars are loaded. In that case the
+     *   worker is terminated and `parseWorker` is cleared, so the next call
+     *   spawns a fresh one instead of reusing an uninitialised worker.
+     */
+    async function ensureWorker(): Promise<import('worker_threads').Worker> {
+      if (parseWorker) return parseWorker;
+      log('Spawning new parse worker...');
+      const spawned = new WorkerClass!(parseWorkerPath);
+      parseWorker = spawned;
+      attachWorkerHandlers(spawned);
+
+      // Load grammars in the new worker
+      try {
+        await new Promise<void>((resolve, reject) => {
+          // Remove all three handshake listeners once any one of them fires.
+          const stopListening = (): void => {
+            spawned.off('message', onMessage);
+            spawned.off('error', onError);
+            spawned.off('exit', onExit);
+          };
+          const onMessage = (msg: { type: string }): void => {
+            stopListening();
+            if (msg.type === 'grammars-loaded') resolve();
+            else reject(new Error(`Unexpected message: ${msg.type}`));
+          };
+          // Without these two, a worker that dies during load leaves this
+          // promise pending forever and the whole indexing run hangs.
+          const onError = (err: Error): void => {
+            stopListening();
+            reject(err);
+          };
+          const onExit = (code: number): void => {
+            stopListening();
+            reject(new Error(`Worker exited with code ${code} while loading grammars`));
+          };
+          spawned.once('message', onMessage);
+          spawned.once('error', onError);
+          spawned.once('exit', onExit);
+          spawned.postMessage({ type: 'load-grammars', languages: neededLanguages });
         });
+      } catch (err) {
+        // Never hand out a worker whose grammars did not load.
+        if (parseWorker === spawned) parseWorker = null;
+        // Fire-and-forget, matching how workers are terminated elsewhere here.
+        spawned.terminate().catch(() => {});
+        throw err;
+      }
+
+      return spawned;
+    }
|
|
|
+
|
|
|
+ if (WorkerClass) {
|
|
|
+ await ensureWorker();
|
|
|
+ }
|
|
|
+
|
|
|
+    /**
+     * Retire the current parse worker so a fresh one replaces it.
+     *
+     * Drops the shared `parseWorker` reference and resets the per-worker parse
+     * counter, which makes the next ensureWorker() call spawn a new worker.
+     * Does nothing when no worker is currently held.
+     */
+    function recycleWorker(): void {
+      const retiring = parseWorker;
+      if (retiring === null) return;
+
+      const rssMb = Math.round(process.memoryUsage().rss / 1024 / 1024);
+      log(`Recycling worker after ${workerParseCount} parses (heap: ${rssMb}MB RSS)`);
+
+      parseWorker = null;
+      workerParseCount = 0;
+
+      // Termination is not awaited: terminate() can hang if WASM is stuck,
+      // and any rejection from it is deliberately ignored.
+      retiring.terminate().catch(() => {});
+    }
|
|
|
+
|
|
|
+    /**
+     * Parse one file, in the worker thread when available, otherwise in-process.
+     *
+     * In worker mode the request is registered in `pendingParses` together with
+     * a timer. If no result arrives within PARSE_TIMEOUT_MS the request is
+     * rejected and the worker is terminated in the background.
+     *
+     * @param filePath - Path of the file being parsed (sent to the worker as-is).
+     * @param content - Source text of the file.
+     * @returns The extraction result produced for the file.
+     * @throws Error when the parse times out, or when the request is rejected
+     *   through rejectAllPending() (worker error or unexpected exit).
+     */
+    async function requestParse(filePath: string, content: string): Promise<ExtractionResult> {
+      if (!WorkerClass) {
+        // In-process fallback
+        return extractFromSource(filePath, content, detectLanguage(filePath));
       }
-      // In-process fallback
-      return Promise.resolve(extractFromSource(filePath, content, detectLanguage(filePath)));
+
+      // Recycle the worker before the next parse if we've hit the threshold.
+      // This destroys the WASM linear memory (which can grow but never shrink)
+      // and starts a fresh worker with a clean heap.
+      // recycleWorker() is synchronous (returns void), so there is nothing to await.
+      if (workerParseCount >= WORKER_RECYCLE_INTERVAL) {
+        recycleWorker();
+      }
+
+      const worker = await ensureWorker();
+      const id = nextId++;
+      workerParseCount++;
+
+      return new Promise<ExtractionResult>((resolve, reject) => {
+        const timer = setTimeout(() => {
+          pendingParses.delete(id);
+          log(`TIMEOUT: ${filePath} exceeded ${PARSE_TIMEOUT_MS}ms — killing worker`);
+          // Only forget the worker this request was sent to; never drop the
+          // reference to a different worker that may have replaced it.
+          if (parseWorker === worker) {
+            parseWorker = null;
+            workerParseCount = 0;
+          }
+          // Reject FIRST — worker.terminate() can hang if WASM is stuck
+          reject(new Error(`Parse timed out after ${PARSE_TIMEOUT_MS}ms`));
+          // Fire-and-forget: kill the stuck worker in the background
+          worker.terminate().catch(() => {});
+        }, PARSE_TIMEOUT_MS);
+
+        pendingParses.set(id, { resolve, reject, timer });
+        worker.postMessage({ type: 'parse', id, filePath, content });
+      });
     }
|
|
|
|
|
|
for (let i = 0; i < files.length; i += FILE_IO_BATCH_SIZE) {
|
|
|
if (signal?.aborted) {
|
|
|
- if (parseWorker) await parseWorker.terminate();
|
|
|
+ if (parseWorker) (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
|
|
|
return {
|
|
|
success: false,
|
|
|
filesIndexed,
|
|
|
@@ -533,7 +635,7 @@ export class ExtractionOrchestrator {
|
|
|
// Send to worker for parsing, store results on main thread
|
|
|
for (const { filePath, content, stats, error } of fileContents) {
|
|
|
if (signal?.aborted) {
|
|
|
- if (parseWorker) await parseWorker.terminate();
|
|
|
+ if (parseWorker) (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
|
|
|
return {
|
|
|
success: false,
|
|
|
filesIndexed,
|
|
|
@@ -546,7 +648,7 @@ export class ExtractionOrchestrator {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- processed++;
|
|
|
+ // Report progress before parsing (show current file being worked on)
|
|
|
onProgress?.({
|
|
|
phase: 'parsing',
|
|
|
current: processed,
|
|
|
@@ -555,6 +657,7 @@ export class ExtractionOrchestrator {
|
|
|
});
|
|
|
|
|
|
if (error || content === null || stats === null) {
|
|
|
+ processed++;
|
|
|
filesErrored++;
|
|
|
errors.push({
|
|
|
message: `Failed to read file: ${error instanceof Error ? error.message : String(error)}`,
|
|
|
@@ -565,8 +668,24 @@ export class ExtractionOrchestrator {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- // Parse in worker thread (main thread stays unblocked)
|
|
|
- const result = await requestParse(filePath, content);
|
|
|
+ // Parse in worker thread (main thread stays unblocked).
|
|
|
+ // Wrapped in try/catch to handle worker timeouts and crashes gracefully.
|
|
|
+ let result: ExtractionResult;
|
|
|
+ try {
|
|
|
+ result = await requestParse(filePath, content);
|
|
|
+ } catch (parseErr) {
|
|
|
+ processed++;
|
|
|
+ filesErrored++;
|
|
|
+ errors.push({
|
|
|
+ message: parseErr instanceof Error ? parseErr.message : String(parseErr),
|
|
|
+ filePath,
|
|
|
+ severity: 'error',
|
|
|
+ code: 'parse_error',
|
|
|
+ });
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ processed++;
|
|
|
|
|
|
// Store in database on main thread (SQLite is not thread-safe)
|
|
|
if (result.nodes.length > 0 || result.errors.length === 0) {
|
|
|
@@ -593,8 +712,11 @@ export class ExtractionOrchestrator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Shut down parse worker
|
|
|
- if (parseWorker) await parseWorker.terminate();
|
|
|
+ // Shut down parse worker and clear any pending timers
|
|
|
+ rejectAllPending('Indexing complete');
|
|
|
+ if (parseWorker) {
|
|
|
+ (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
|
|
|
+ }
|
|
|
|
|
|
// Phase 3: Resolve references
|
|
|
onProgress?.({
|