v1.33.0.0 feat: /sync-gbrain memory-stage batch-import refactor (D1-D8) + F6/F9 + signal cleanup (#1432)
* refactor: batch-import architecture (D1-D8) + F6 atomic state + F9 full-file hash
bin/gstack-memory-ingest.ts: rewrite memory ingest around `gbrain import <dir>`
batch path. Replaces per-file gbrainPutPage loop (~470s of subprocess startup
per cold run) with prepare-then-batch:
walkAllSources
-> preparePages: mtime-skip + optional gitleaks (--scan-secrets) + parse
-> writeStaged: mkdir -p per slug segment, hierarchical (D1)
-> snapshot ~/.gbrain/sync-failures.jsonl byte offset
-> runGbrainImport (async spawn) -> parseImportJson
-> readNewFailures: read appended bytes, map back to source paths (D7)
-> state.sessions[path] = {...} for files NOT in failed set
-> saveStateAtomic (F6) + cleanupStagingDir
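Condensed, the new pass has roughly this shape (a sketch using the names from the diff below; recordState is shorthand for the inline state.sessions[...] stamping in the real code):

    const prep = preparePages(args, ctx, state);                // walk + filter + parse + render
    const stagingDir = makeStagingDir();                        // ~/.gstack/.staging-ingest-<pid>-<ts>/
    const staging = writeStaged(prep.prepared, stagingDir);     // D1 hierarchical layout
    const syncFailuresPath = join(homedir(), ".gbrain", "sync-failures.jsonl");
    const preImportOffset = statSync(syncFailuresPath).size;    // D7 snapshot
    const res = await runGbrainImport(stagingDir, 30 * 60 * 1000);
    const importJson = parseImportJson(res.stdout);             // null -> ERR (no silent zeros)
    const failedSources = readNewFailures(syncFailuresPath, preImportOffset, staging.stagedPathToSource);
    for (const p of prep.prepared) {
      if (!failedSources.has(p.source_path)) recordState(p);    // failed files retry next run
    }
    saveState(state);                                           // F6 tmp+rename
    cleanupStagingDir(stagingDir);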
Architecture decisions:
D1 hierarchical staging dir
D2 cut over, deleted gbrainPutPage entirely
D3 source-file gitleaks made opt-in via --scan-secrets (gstack-brain-sync
owns the cross-machine boundary; the per-file scan was a redundant ~470s tax)
D4 OK/ERR verdict (no DEGRADED tri-state)
D5 unified state schema (no separate skip-list)
D6 trust gbrain content_hash idempotency (no skip_reason bookkeeping)
D7 byte-offset snapshot of sync-failures.jsonl + per-source mapping
F6 saveState uses tmp+rename atomic write
F9 fileSha256 removes 1MB cap; full-file hash (no more silent tail-edit
misses on long partial transcripts)
Signal handling: installSignalForwarder propagates SIGTERM/SIGINT to the
gbrain child process AND synchronously cleans the staging dir before
process.exit. Before this fix, orchestrator timeouts left gbrain processes
orphaned, holding the PGLite write lock (observed: a 15-hour-CPU-time
orphan still alive a day later).
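The staging-dir half of that handler guards against a Node pitfall worth spelling out (a standalone sketch, not gstack code): process.exit inside a signal handler terminates immediately, so `finally` blocks in suspended async frames never resume:

    import { mkdtempSync, rmSync } from "fs";
    import { tmpdir } from "os";
    import { join } from "path";

    async function run() {
      const dir = mkdtempSync(join(tmpdir(), "stage-"));
      try {
        await new Promise((r) => setTimeout(r, 60_000)); // long import in flight
      } finally {
        rmSync(dir, { recursive: true, force: true });   // skipped when the handler below fires
      }
    }
    process.on("SIGTERM", () => process.exit(143)); // exits now; the awaited frame never resumes
    run();

Hence the handler deletes the staging dir itself, synchronously, before exiting.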
parseImportJson returns null on unparseable output (treated as ERR by the
caller) instead of silently passing zeros through.
gbrainAvailable() probes for the `import` subcommand instead of `put`.
Plan + review chain at /Users/garrytan/.claude/plans/purrfect-tumbling-quiche.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat: orchestrator OK/ERR verdict parser for batch memory ingest
gstack-gbrain-sync.ts: memory-stage parser now picks [memory-ingest] ERR
lines preferentially over the latest [memory-ingest] line, strips the
prefix and any leading 'ERR: ' for cleaner summary output, and surfaces
'(killed by signal / timeout)' when the child exits with status=null.
Matches D6's OK/ERR contract: per-file failures (FILE_TOO_LARGE etc.)
show in the summary count, but only system-level failures (gbrain crash,
process kill, missing CLI) mark the stage ERR.
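The parser itself lives in gstack-gbrain-sync.ts and is not part of this file's diff; a minimal sketch of the selection rule described above (function name and inputs hypothetical):

    function summarizeMemoryStage(stderrLines: string[], status: number | null): string {
      const tagged = stderrLines.filter((l) => l.includes("[memory-ingest]"));
      // Prefer an explicit ERR line; otherwise fall back to the latest tagged line.
      const pick = tagged.find((l) => l.includes("[memory-ingest] ERR")) ?? tagged.at(-1) ?? "";
      let summary = pick.replace("[memory-ingest]", "").trim().replace(/^ERR:\s*/, "");
      if (status === null) summary += " (killed by signal / timeout)";
      return summary;
    }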
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* test: batch-ingest writer regressions + refresh golden ship fixtures
test/gstack-memory-ingest.test.ts: 5 new tests for the batch-import
architecture:
1. D1 hierarchical staging slug round-trip — asserts the staged file lives
in transcripts/claude-code/<dir>/*.md, not flat at the staging root
2. Frontmatter injection — asserts title/type/tags written into the
staged page's YAML block
3. D7 sync-failures.jsonl exclusion — files listed as failed by
gbrain do NOT get state-recorded; one of two test sessions lands,
the other stays un-ingested for retry next run
4. Missing-`import`-subcommand error path — when gbrain only advertises
legacy `put`, memory-ingest exits 1 with [memory-ingest] ERR
5. --scan-secrets opt-in path — verifies a dirty-source file is
skipped via the secret-scan match when the flag is on, while a
clean session in the same run still gets staged
Replaces the prior put-per-file shim with an import-batch shim. The
shim fails loudly (exit 99) if the new code ever regresses to per-file
`gbrain put` calls.
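The shim is test-internal and not shown in this diff; a sketch of the guard it implements, assuming the tests stub `gbrain` with a script on PATH:

    // Hypothetical fake-gbrain entry point (the real shim lives in the test file).
    const [cmd] = process.argv.slice(2);
    if (cmd === "put" || cmd === "put_page") {
      console.error("[shim] per-file put is a batch-architecture regression");
      process.exit(99); // fail loudly if ingest ever regresses to per-file puts
    }
    if (cmd === "--help") {
      console.log("Commands:\n  import <dir>   batch markdown import"); // satisfies gbrainAvailable()
      process.exit(0);
    }
    if (cmd === "import") {
      console.log(JSON.stringify({ imported: 1, skipped: 0, errors: 0 })); // parseImportJson-compatible
      process.exit(0);
    }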
test/fixtures/golden/{claude,codex,factory}-ship-SKILL.md: refresh
golden baselines to match the current generated SKILL.md content after
the v1.31.0.0 AskUserQuestion fallback-clause deletion. Goldens were
stale from that release; the test was failing on origin/main before this
PR. Caught by the /ship test pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* v1.33.0.0 docs: design doc, P2 perf TODOs, gbrain guidance block, changelog
docs/designs/SYNC_GBRAIN_BATCH_INGEST.md: full design doc with the 8
decisions (D1-D8), source-verified gbrain behaviors (content_hash
idempotency, frontmatter parity, path-authoritative slug, per-file
failure surface), measured performance vs plan target, F9 hash
migration one-time cliff note, and follow-up TODOs.
CLAUDE.md: append `## GBrain Search Guidance` block from /sync-gbrain
indicating this worktree's pin and how the agent should prefer gbrain
search over Grep for semantic queries.
TODOS.md: P2 `gbrain import` perf-on-large-staging-dirs investigation
(5,131 files take >10min in gbrain while 501 take 10s — likely N+1
SQL or auto-link reconciliation). P3: cache no-changes-since-last-import
at the prepare-batch level for true no-op fast paths.
VERSION + package.json: bump to 1.33.0.0 (queue-aware via
bin/gstack-next-version — skipped v1.32.0.0 which is claimed by
sibling worktree garrytan/wellington / PR #1431).
CHANGELOG.md: v1.33.0.0 entry per the release-summary format.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* docs: setup-gbrain/memory.md reflects opt-in per-file gitleaks
Per-file gitleaks scanning during memory ingest is now opt-in via
--scan-secrets (or GSTACK_MEMORY_INGEST_SCAN_SECRETS=1). Update the
user-facing reference doc so it stops claiming "every page passes
through gitleaks." Also corrects the /gbrain-sync → /sync-gbrain
command typo and the post-incident recovery section.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@@ -47,9 +47,14 @@ import {
  statSync,
  mkdirSync,
  appendFileSync,
  renameSync,
  openSync,
  readSync,
  closeSync,
  rmSync,
} from "fs";
import { join, basename, dirname } from "path";
import { execSync, execFileSync } from "child_process";
import { execSync, execFileSync, spawnSync, spawn, type ChildProcess } from "child_process";
import { homedir } from "os";
import { createHash } from "crypto";

@@ -73,6 +78,12 @@ interface CliArgs {
  sources: Set<MemoryType>;
  limit: number | null;
  noWrite: boolean;
  /**
   * Opt-in per-file gitleaks scan during the prepare phase. Off by
   * default — the cross-machine boundary (gstack-brain-sync, git push)
   * has its own scanner. Setting this adds ~4-8 min to cold runs.
   */
  scanSecrets: boolean;
}

type MemoryType =
@@ -137,6 +148,14 @@ interface BulkResult {
  failed: number;
  duration_ms: number;
  partial_pages: number;
  /**
   * D6: when set, indicates a process-level failure (gbrain CLI missing
   * or `gbrain import` crashed). Per-file errors (FILE_TOO_LARGE etc.)
   * land in `failed` but do NOT set this flag — the orchestrator should
   * still treat the run as OK with summary mentioning the failure count.
   * Only when this is set does the verdict become ERR.
   */
  system_error?: string;
}

// ── Constants ──────────────────────────────────────────────────────────────
@@ -176,6 +195,9 @@ Options:
  --limit <N>       Stop after N pages written (smoke testing).
  --no-write        Skip gbrain put_page calls (still updates state file).
                    Used by tests + dry runs without actual ingest.
  --scan-secrets    Opt-in per-file gitleaks scan during prepare. Off by
                    default; gstack-brain-sync already gates the git-push
                    boundary. Adds ~4-8 min to cold runs.
  --help            This text.
`);
}
@@ -190,6 +212,7 @@ function parseArgs(): CliArgs {
  let limit: number | null = null;
  let sources: Set<MemoryType> = new Set(ALL_TYPES);
  let noWrite = process.env.GSTACK_MEMORY_INGEST_NO_WRITE === "1";
  let scanSecrets = process.env.GSTACK_MEMORY_INGEST_SCAN_SECRETS === "1";

  for (let i = 0; i < args.length; i++) {
    const a = args[i];
@@ -202,6 +225,7 @@ function parseArgs(): CliArgs {
      case "--include-unattributed": includeUnattributed = true; break;
      case "--all-history": allHistory = true; break;
      case "--no-write": noWrite = true; break;
      case "--scan-secrets": scanSecrets = true; break;
      case "--limit":
        limit = parseInt(args[++i] || "0", 10);
        if (!Number.isFinite(limit) || limit <= 0) {
@@ -229,7 +253,7 @@ function parseArgs(): CliArgs {
    }
  }

  return { mode, quiet, benchmark, includeUnattributed, allHistory, sources, limit, noWrite };
  return { mode, quiet, benchmark, includeUnattributed, allHistory, sources, limit, noWrite, scanSecrets };
}

// ── State file ─────────────────────────────────────────────────────────────
@@ -268,9 +292,14 @@ function loadState(): IngestState {
}

function saveState(state: IngestState): void {
  // F6 (Codex finding 6): tmp+rename atomic write so a crash mid-write
  // never leaves a truncated/corrupt state file. Matches the pattern
  // in gstack-gbrain-sync.ts:saveSyncState.
  try {
    mkdirSync(dirname(STATE_PATH), { recursive: true });
    writeFileSync(STATE_PATH, JSON.stringify(state, null, 2), "utf-8");
    const tmp = `${STATE_PATH}.tmp.${process.pid}`;
    writeFileSync(tmp, JSON.stringify(state, null, 2), "utf-8");
    renameSync(tmp, STATE_PATH);
  } catch (err) {
    console.error(`[state] write failed: ${(err as Error).message}`);
  }
@@ -278,12 +307,15 @@ function saveState(state: IngestState): void {

// ── File hash + change detection ───────────────────────────────────────────

function fileSha256(path: string, maxBytes = 1024 * 1024): string {
  // Hash the first 1MB only; sufficient for change detection on big JSONL.
function fileSha256(path: string): string {
  // F9 (Codex finding 9): full-file hash. The prior 1MB cap silently
  // missed tail edits to long partial transcripts — exactly the
  // recovery case this pipeline needs to handle correctly. Realistic
  // max for an ingest source is ~50MB (long JSONL); fine to load in
  // memory for hashing.
  try {
    const fd = readFileSync(path);
    const slice = fd.length > maxBytes ? fd.subarray(0, maxBytes) : fd;
    return createHash("sha256").update(slice).digest("hex");
    const buf = readFileSync(path);
    return createHash("sha256").update(buf).digest("hex");
  } catch {
    return "";
  }
}
@@ -753,51 +785,66 @@ function buildArtifactPage(path: string, type: MemoryType): PageRecord {
  };
}

// ── Writer (calls `gbrain put`) ────────────────────────────────────────────
// ── Writer (batch via `gbrain import <dir>`) ───────────────────────────────
//
// Architecture (post plan-eng-review + Codex outside-voice):
//
// walkAllSources(ctx)
//   → for each path: mtime-skip / source-file gitleaks (D3) / parse / buildPage
//   → renderPageBody injects title/type/tags into YAML frontmatter
//   → writeStaged: mkdir -p slug subdirs (D1), write ${slug}.md
//   → snapshot ~/.gbrain/sync-failures.jsonl byte-offset (D7)
//   → async-spawn `gbrain import <stagingDir> --no-embed --json` (D6)
//   → parseImportJson(stdout) → { imported, skipped, errors, ... } (D6 OK/ERR)
//   → readNewFailures(preImportOffset, slugMap) → Set<sourcePath> (D7)
//   → state.sessions[path] = { ... } for prepared files NOT in failed set
//   → saveStateAtomic (F6 tmp+rename) + cleanupStagingDir
//
// We trust gbrain's content_hash idempotency (verified in
// ~/git/gbrain/src/core/import-file.ts:242-243, :478) — repeated imports
// of identical content are cheap. So we do NOT track per-file skip_reasons,
// do NOT keep a SIGTERM checkpoint, and do NOT advance a three-state verdict.

let _gbrainAvailability: boolean | null = null;
function gbrainAvailable(): boolean {
  if (_gbrainAvailability !== null) return _gbrainAvailability;
  try {
    execSync("command -v gbrain", { stdio: "ignore" });
    // gbrain v0.27 retired the legacy `put_page` flag-form for `put <slug>`
    // (content via stdin, metadata as YAML frontmatter). Probe `--help` for
    // the `put` subcommand so we surface a single clean error here rather
    // than failing every page with "Unknown command: put_page". The regex
    // anchors on the indented subcommand format gbrain's help actually uses
    // (" put ..."), not any whitespace-bordered "put" word in prose.
    // Probe `--help` for the `import` subcommand. gbrain v0.20.0+ ships
    // `import <dir>` (batch markdown import via path-authoritative slugs).
    // If absent, we surface a single clean error here rather than failing
    // the whole stage with a confusing usage message from gbrain itself.
    const help = execFileSync("gbrain", ["--help"], {
      encoding: "utf-8",
      timeout: 5000,
      stdio: ["ignore", "pipe", "pipe"],
    });
    _gbrainAvailability = /^\s+put\s/m.test(help);
    _gbrainAvailability = /^\s+import\s/m.test(help);
  } catch {
    _gbrainAvailability = false;
  }
  return _gbrainAvailability;
}

function gbrainPutPage(page: PageRecord): { ok: boolean; error?: string } {
  if (!gbrainAvailable()) {
    return { ok: false, error: "gbrain CLI not in PATH or missing `put` subcommand" };
  }
  // gbrain v0.27+ uses `put <slug>` (positional, content via stdin) instead
  // of the legacy `put_page` flag form. Metadata rides as YAML frontmatter:
  // - When the page body already starts with frontmatter (transcripts), inject
  //   title/type/tags into the existing block so gbrain's frontmatter parser
  //   picks them up.
  // - When the page body has no frontmatter (raw artifacts: design-docs,
  //   learnings, builder-profile-entries), wrap with a fresh frontmatter
  //   carrying the same fields. Without this branch, artifact pages would
  //   land in gbrain with empty title/type/tags.
/**
 * Build the markdown body with YAML frontmatter (title/type/tags) injected.
 *
 * Two cases:
 * - Page body already starts with `---\n` (transcripts) — inject into the
 *   existing frontmatter block before its close fence so gbrain's frontmatter
 *   parser picks up the fields alongside any session-level metadata the
 *   transcript builder already wrote (session_id, cwd, git_remote, etc.).
 * - No leading frontmatter (raw artifacts: design-docs, learnings, etc.) —
 *   wrap with a fresh frontmatter block carrying title/type/tags. Without
 *   this branch, artifact pages would land in gbrain with empty metadata.
 *
 * gbrain enforces slug = path-derived (slugifyPath in gbrain's sync.ts).
 * We do NOT set `slug:` in frontmatter — the staging-dir filename is the
 * source of truth and gbrain rejects mismatches.
 */
function renderPageBody(page: PageRecord): string {
  let body = page.body;
  if (body.startsWith("---\n")) {
    // Locate the closing --- delimiter. buildTranscriptPage joins with "\n"
    // and does not append a trailing newline, so the close fence looks like
    // "...\n---" followed directly by body content (no "\n---\n" pattern).
    // Match the close on "\n---" only — the inject lands BEFORE the close
    // fence, inside the frontmatter block, regardless of what follows it.
    const end = body.indexOf("\n---", 4);
    if (end > 0) {
      const inject = [
@@ -822,27 +869,155 @@ function gbrainPutPage(page: PageRecord): { ok: boolean; error?: string } {
  // Strip NUL bytes — Postgres rejects 0x00 in UTF-8 text columns. Some Claude
  // Code transcripts contain NUL inside user-pasted content or tool output, and
  // surfacing those as `internal_error: invalid byte sequence` from the brain
  // is unhelpful when we can sanitize at write time.
  // is unhelpful when we can sanitize at write time. Originally landed in v1.32.0.0
  // (PR #1411) on the per-file `gbrain put` path; moved here so all staged
  // pages still get the same sanitization.
  body = body.replace(/\x00/g, "");
  try {
    execFileSync("gbrain", ["put", page.slug], {
      input: body,
      encoding: "utf-8",
      // Bumped from 30s: auto-link reconciliation on dense transcripts hits
      // 30s once the brain has a few hundred existing pages.
      timeout: 60000,
      // Bumped from default 1MB: without this, gbrain's actual stderr gets
      // truncated and callers see only "Command failed:" with no detail.
      maxBuffer: 16 * 1024 * 1024,
      stdio: ["pipe", "pipe", "pipe"],
    });
    return { ok: true };
  } catch (err: any) {
    const stderr = err?.stderr?.toString?.() ?? "";
    const stdout = err?.stdout?.toString?.() ?? "";
    const detail = stderr || stdout || (err instanceof Error ? err.message : String(err));
    return { ok: false, error: detail.split("\n")[0].slice(0, 300) };
  return body;
}

interface PreparedPage {
  /** Page slug (path-shaped, e.g. "transcripts/claude-code/foo"). */
  slug: string;
  /** Original source file on disk (e.g. ~/.claude/projects/.../foo.jsonl). */
  source_path: string;
  /** Full markdown including frontmatter — ready to write. */
  rendered_body: string;
  /** Carry-through fields for state recording on success. */
  page_slug: string;
  partial: boolean;
}

interface StagingResult {
  staging_dir: string;
  written: number;
  errors: Array<{ slug: string; error: string }>;
  /** Map from staging-dir-relative path (e.g. "transcripts/foo.md") → source path. */
  stagedPathToSource: Map<string, string>;
}

/**
 * Write prepared pages to a staging dir, mirroring slug hierarchy.
 *
 * D1: gbrain's `slugifyPath` (sync.ts:260) derives the slug from the
 * directory-aware relative path inside the import dir, so slugs containing
 * slashes (e.g. "transcripts/claude-code/foo") must live in matching
 * subdirectories of the staging dir. Otherwise the slug becomes flattened
 * or rejected by gbrain's path-vs-frontmatter slug check (import-file.ts:429).
 *
 * Filename = `${slug}.md`. mkdir is recursive. Existing files overwrite.
 * Errors per-file are collected; the whole batch is best-effort.
 */
function writeStaged(prepared: PreparedPage[], stagingDir: string): StagingResult {
  mkdirSync(stagingDir, { recursive: true });
  const stagedPathToSource = new Map<string, string>();
  const errors: Array<{ slug: string; error: string }> = [];
  let written = 0;
  for (const p of prepared) {
    const relPath = `${p.slug}.md`;
    const absPath = join(stagingDir, relPath);
    try {
      mkdirSync(dirname(absPath), { recursive: true });
      writeFileSync(absPath, p.rendered_body, "utf-8");
      stagedPathToSource.set(relPath, p.source_path);
      written++;
    } catch (err) {
      errors.push({ slug: p.slug, error: (err as Error).message });
    }
  }
  return { staging_dir: stagingDir, written, errors, stagedPathToSource };
}

interface ImportJsonResult {
  status?: string;
  duration_s?: number;
  imported?: number;
  skipped?: number;
  errors?: number;
  chunks?: number;
  total_files?: number;
}

/**
 * Parse the `gbrain import --json` stdout payload (single JSON object on
 * the last non-empty line per commands/import.ts:271-275).
 *
 * Returns parsed counts on success, or `null` to signal "unparseable" — the
 * caller treats null as ERR (system_error) rather than silently passing
 * through as zeros. Pre-2026-05-11 this returned zeros on parse failure,
 * which silently masked gbrain crashes as "0 imported, 0 failed = OK".
 */
function parseImportJson(stdout: string): ImportJsonResult | null {
  const lines = stdout.split("\n").map((s) => s.trim()).filter(Boolean);
  for (let i = lines.length - 1; i >= 0; i--) {
    const line = lines[i];
    if (line.startsWith("{") && line.endsWith("}")) {
      try {
        const parsed = JSON.parse(line);
        if (typeof parsed === "object" && parsed && "imported" in parsed) {
          return parsed as ImportJsonResult;
        }
      } catch {
        // try next line up
      }
    }
  }
  return null;
}

/**
 * Read failures appended to ~/.gbrain/sync-failures.jsonl since the
 * snapshotted byte offset, and map them back to source paths.
 *
 * D7: gbrain import writes per-file failures to sync-failures.jsonl
 * (commands/import.ts:308-310) explicitly so "callers can gate state
 * advances" (comment at :28). We snapshot the file size before import
 * and read only the appended bytes after, so we never confuse new
 * entries with prior-run leftovers.
 *
 * Each line is `{ path, error, code, commit, ts }`. The `path` is the
 * staging-dir-relative filename gbrain saw (e.g. "transcripts/foo.md").
 * stagedPathToSource maps that back to the original source file.
 */
function readNewFailures(
  syncFailuresPath: string,
  preImportOffset: number,
  stagedPathToSource: Map<string, string>,
): Set<string> {
  const failed = new Set<string>();
  try {
    if (!existsSync(syncFailuresPath)) return failed;
    const stat = statSync(syncFailuresPath);
    if (stat.size <= preImportOffset) return failed;
    // Read appended bytes only. readSync with a positional offset works
    // synchronously without slurping the whole file.
    const fd = openSync(syncFailuresPath, "r");
    try {
      const buf = Buffer.alloc(stat.size - preImportOffset);
      readSync(fd, buf, 0, buf.length, preImportOffset);
      const text = buf.toString("utf-8");
      for (const line of text.split("\n")) {
        const trimmed = line.trim();
        if (!trimmed) continue;
        try {
          const entry = JSON.parse(trimmed) as { path?: string };
          if (entry.path) {
            const source = stagedPathToSource.get(entry.path);
            if (source) failed.add(source);
          }
        } catch {
          // ignore malformed line
        }
      }
    } finally {
      closeSync(fd);
    }
  } catch {
    // Best-effort. If we can't read failures, we conservatively assume
    // none — caller will state-record all prepared files. Worst case:
    // failed files get a retry-on-next-run shot anyway via content_hash.
  }
  return failed;
}

// ── Main ingest passes ─────────────────────────────────────────────────────
@@ -901,34 +1076,72 @@ async function probeMode(args: CliArgs): Promise<ProbeReport> {
  };
}

async function ingestPass(args: CliArgs): Promise<BulkResult> {
  const t0 = Date.now();
  const state = loadState();
  const ctx = makeWalkContext(args, state);

  let written = 0;
/**
 * Prepare phase: walk sources, apply incremental + optional-secret-scan filters,
 * parse transcripts/artifacts into PageRecord, render bodies with
 * frontmatter. Returns the PreparedPage[] to stage + counts of files
 * filtered at each gate.
 *
 * Secret scanning policy (post 2026-05-10 perf review):
 *
 * The actual cross-machine exfiltration boundary is `gstack-brain-sync`,
 * which runs a regex-based secret scanner on the staged diff before
 * `git commit` (see bin/gstack-brain-sync:78-110: AWS keys, GitHub
 * tokens, OpenAI keys, PEM blocks, JWTs, bearer-token-in-JSON). That's
 * the right place — it gates content leaving the machine.
 *
 * memory-ingest, by contrast, moves data from one local file to a
 * local PGLite database. Scanning every source file at ingest time
 * doesn't change exposure (the secret already lives in plaintext
 * where the user keeps their transcripts and artifacts) but costs
 * ~470s on cold runs. We removed the per-file gitleaks gate as
 * redundant defense-in-depth and made it opt-in via `--scan-secrets`
 * for users who want belt-and-suspenders.
 */
function preparePages(
  args: CliArgs,
  ctx: WalkContext,
  state: IngestState,
): {
  prepared: PreparedPage[];
  skippedSecret: number;
  skippedDedup: number;
  skippedUnattributed: number;
  parseFailed: number;
  partialPages: number;
} {
  const prepared: PreparedPage[] = [];
  let skippedSecret = 0;
  let skippedDedup = 0;
  let skippedUnattributed = 0;
  let failed = 0;
  let parseFailed = 0;
  let partialPages = 0;

  for (const { path, type } of walkAllSources(ctx)) {
    if (args.limit !== null && written >= args.limit) break;
    if (args.limit !== null && prepared.length >= args.limit) break;

    if (args.mode === "incremental" && !fileChangedSinceState(path, state)) {
      skippedDedup++;
      continue;
    }

    // Secret scan first
    const scan = secretScanFile(path);
    if (scan.scanner === "gitleaks" && scan.findings.length > 0) {
      skippedSecret++;
      if (!args.quiet) {
        console.error(`[secret-scan match] ${path} (${scan.findings.length} finding${scan.findings.length === 1 ? "" : "s"}); skipped`);
    // Optional belt-and-suspenders: when --scan-secrets is set, scan the
    // source file with gitleaks and skip dirty ones. Off by default
    // because gstack-brain-sync already gates the cross-machine boundary
    // and per-file gitleaks costs ~256ms/file (4-8 min on a real corpus).
    if (args.scanSecrets) {
      const scan = secretScanFile(path);
      if (scan.scanner === "gitleaks" && scan.findings.length > 0) {
        skippedSecret++;
        if (!args.quiet) {
          console.error(
            `[secret-scan match] ${path} (${scan.findings.length} finding${
              scan.findings.length === 1 ? "" : "s"
            }); skipped`,
          );
        }
        continue;
      }
      continue;
    }

    let page: PageRecord;
@@ -936,7 +1149,7 @@ async function ingestPass(args: CliArgs): Promise<BulkResult> {
    if (type === "transcript") {
      const session = parseTranscriptJsonl(path);
      if (!session) {
        failed++;
        parseFailed++;
        continue;
      }
      if (!args.includeUnattributed && !session.cwd) {
@@ -953,38 +1166,373 @@ async function ingestPass(args: CliArgs): Promise<BulkResult> {
      page = buildArtifactPage(path, type);
    }
    } catch (err) {
      failed++;
      parseFailed++;
      console.error(`[parse-error] ${path}: ${(err as Error).message}`);
      continue;
    }

    const result = args.noWrite
      ? { ok: true }
      : await withErrorContext(
          `put_page:${page.slug}`,
          async () => gbrainPutPage(page),
          "gstack-memory-ingest"
        );
    if (!result.ok) {
      failed++;
      if (!args.quiet) {
        console.error(`[put-error] ${page.slug}: ${result.error || "unknown"}`);
    prepared.push({
      slug: page.slug,
      source_path: path,
      rendered_body: renderPageBody(page),
      page_slug: page.slug,
      partial: page.partial ?? false,
    });
  }

  return {
    prepared,
    skippedSecret,
    skippedDedup,
    skippedUnattributed,
    parseFailed,
    partialPages,
  };
}

/**
 * Make a per-run staging directory at ~/.gstack/.staging-ingest-<pid>-<ts>/
 * The pid+ts namespace avoids collisions when two ingest passes run
 * concurrently (the orchestrator's lock should prevent this, but
 * defense-in-depth).
 */
function makeStagingDir(): string {
  const dir = join(GSTACK_HOME, `.staging-ingest-${process.pid}-${Date.now()}`);
  mkdirSync(dir, { recursive: true });
  return dir;
}

/**
 * Best-effort recursive cleanup. Failures swallowed — at worst we leak a
 * staging dir to disk; the next run uses a new one and they age out via
 * normal disk hygiene. We deliberately do NOT crash the pipeline on
 * cleanup failure.
 */
function cleanupStagingDir(dir: string): void {
  try {
    rmSync(dir, { recursive: true, force: true });
  } catch {
    // best-effort
  }
}

/**
 * Track the currently-running gbrain import child + active staging dir so
 * SIGTERM/SIGINT on the parent process can:
 *  1. forward the signal to the child (otherwise gbrain orphans, holds the
 *     PGLite write lock, and burns CPU — observed during 2026-05-10 cold-run
 *     testing)
 *  2. synchronously clean up the staging dir BEFORE process.exit (otherwise
 *     finally blocks in async callers don't run after process.exit from
 *     inside a signal handler, leaking the staging dir on every interrupt)
 */
let _activeImportChild: ChildProcess | null = null;
let _activeStagingDir: string | null = null;
let _signalHandlersInstalled = false;
function installSignalForwarder(): void {
  if (_signalHandlersInstalled) return;
  _signalHandlersInstalled = true;
  const forward = (signal: NodeJS.Signals) => () => {
    if (_activeImportChild && _activeImportChild.pid && !_activeImportChild.killed) {
      try {
        process.kill(_activeImportChild.pid, signal);
      } catch {
        // child may have already exited between the alive-check and the kill
      }
    }
    // Synchronously clean up the active staging dir before exiting. The async
    // `finally` blocks in ingestPass never run after process.exit fires from
    // inside this handler, so cleanup has to happen here.
    if (_activeStagingDir) {
      cleanupStagingDir(_activeStagingDir);
      _activeStagingDir = null;
    }
    // Re-raise to default action so the parent actually exits. Without this,
    // a SIGTERM handler that doesn't exit holds the process alive.
    process.exit(signal === "SIGINT" ? 130 : 143);
  };
  process.on("SIGTERM", forward("SIGTERM"));
  process.on("SIGINT", forward("SIGINT"));
}

/**
 * Run gbrain import as an async child so we can install signal handlers
 * that kill the child on parent SIGTERM/SIGINT. Returns the same shape as
 * spawnSync's result so the caller doesn't care which mode was used.
 */
function runGbrainImport(
  stagingDir: string,
  timeoutMs: number,
): Promise<{ status: number | null; stdout: string; stderr: string }> {
  installSignalForwarder();
  return new Promise((resolve) => {
    const child = spawn(
      "gbrain",
      ["import", stagingDir, "--no-embed", "--json"],
      { stdio: ["ignore", "pipe", "pipe"] },
    );
    _activeImportChild = child;
    let stdout = "";
    let stderr = "";
    let timedOut = false;
    const timer = setTimeout(() => {
      timedOut = true;
      try {
        if (child.pid) process.kill(child.pid, "SIGTERM");
      } catch {
        // already gone
      }
    }, timeoutMs);
    child.stdout?.on("data", (chunk) => {
      stdout += chunk.toString("utf-8");
    });
    child.stderr?.on("data", (chunk) => {
      stderr += chunk.toString("utf-8");
    });
    child.on("close", (status) => {
      clearTimeout(timer);
      _activeImportChild = null;
      resolve({
        status: timedOut ? null : status,
        stdout,
        stderr,
      });
    });
    child.on("error", (err) => {
      clearTimeout(timer);
      _activeImportChild = null;
      resolve({
        status: null,
        stdout,
        stderr: stderr + `\n[spawn-error] ${(err as Error).message}`,
      });
    });
  });
}

async function ingestPass(args: CliArgs): Promise<BulkResult> {
  const t0 = Date.now();
  const state = loadState();
  const ctx = makeWalkContext(args, state);

  // Phase 1: prepare (parse + secret-scan + filter + render frontmatter).
  const prep = preparePages(args, ctx, state);

  let written = 0;
  let failed = 0;

  if (args.noWrite) {
    // --no-write: skip the gbrain import call but still record state for
    // prepared pages (treat them as ingested for dedup purposes). Matches
    // the prior contract from --help: "Skip gbrain put_page calls (still
    // updates state file)".
    const nowIso = new Date().toISOString();
    for (const p of prep.prepared) {
      try {
        state.sessions[p.source_path] = {
          mtime_ns: Math.floor(statSync(p.source_path).mtimeMs * 1e6),
          sha256: fileSha256(p.source_path),
          ingested_at: nowIso,
          page_slug: p.page_slug,
          partial: p.partial,
        };
        written++;
      } catch {
        // best-effort state record
      }
    }
    state.last_full_walk = new Date().toISOString();
    state.last_writer = "gstack-memory-ingest";
    saveState(state);
    return {
      written,
      skipped_secret: prep.skippedSecret,
      skipped_dedup: prep.skippedDedup,
      skipped_unattributed: prep.skippedUnattributed,
      failed: prep.parseFailed,
      duration_ms: Date.now() - t0,
      partial_pages: prep.partialPages,
    };
  }

  if (prep.prepared.length === 0) {
    // Nothing to import — still touch state.last_full_walk and exit.
    state.last_full_walk = new Date().toISOString();
    state.last_writer = "gstack-memory-ingest";
    saveState(state);
    return {
      written: 0,
      skipped_secret: prep.skippedSecret,
      skipped_dedup: prep.skippedDedup,
      skipped_unattributed: prep.skippedUnattributed,
      failed: prep.parseFailed,
      duration_ms: Date.now() - t0,
      partial_pages: prep.partialPages,
    };
  }

  if (!gbrainAvailable()) {
    const msg =
      "gbrain CLI not in PATH or missing `import` subcommand. Run /setup-gbrain.";
    console.error(`[memory-ingest] ERR: ${msg}`);
    return {
      written: 0,
      skipped_secret: prep.skippedSecret,
      skipped_dedup: prep.skippedDedup,
      skipped_unattributed: prep.skippedUnattributed,
      failed: prep.parseFailed + prep.prepared.length,
      duration_ms: Date.now() - t0,
      partial_pages: prep.partialPages,
      system_error: msg,
    };
  }

  // Phase 2: stage to a per-run dir + invoke gbrain import.
  const stagingDir = makeStagingDir();
  // Register staging dir with the signal forwarder so SIGTERM/SIGINT can
  // synchronously clean it up before process.exit (the async finally block
  // below does NOT run after a signal-handler exit).
  _activeStagingDir = stagingDir;
  try {
    const staging = writeStaged(prep.prepared, stagingDir);
    failed += staging.errors.length;
    if (!args.quiet && staging.errors.length > 0) {
      for (const e of staging.errors.slice(0, 5)) {
        console.error(`[stage-error] ${e.slug}: ${e.error}`);
      }
      continue;
    }

    state.sessions[path] = {
      mtime_ns: Math.floor(statSync(path).mtimeMs * 1e6),
      sha256: page.content_sha256,
      ingested_at: new Date().toISOString(),
      page_slug: page.slug,
      partial: page.partial,
    };
    written++;
    if (!args.quiet) {
      const tag = page.partial ? " [partial]" : "";
      console.log(`[${written}] ${page.slug}${tag}`);
    // D7: snapshot sync-failures.jsonl byte-offset before import so we
    // can read only newly-appended failure entries afterwards.
    const syncFailuresPath = join(homedir(), ".gbrain", "sync-failures.jsonl");
    let preImportOffset = 0;
    try {
      if (existsSync(syncFailuresPath)) {
        preImportOffset = statSync(syncFailuresPath).size;
      }
    } catch {
      // best-effort; absent file → 0 offset, all future entries are "new"
    }

    if (!args.quiet) {
      console.error(
        `[memory-ingest] staged ${staging.written} pages → ${stagingDir}; running gbrain import...`,
      );
    }

    // D6: single batch import. `--no-embed` matches the prior per-file
    // behavior (we never enabled embedding); embeddings happen on-demand
    // via gbrain's own pipelines. `--json` gives us structured counts.
    //
    // Async spawn (not spawnSync) so the signal forwarder installed in
    // runGbrainImport propagates SIGTERM/SIGINT to the child. With sync
    // spawn, parent termination orphans the gbrain process (observed
    // during 2026-05-10 cold-run testing — gbrain kept running 15 min
    // after the orchestrator timed out).
    const importResult = await runGbrainImport(stagingDir, 30 * 60 * 1000);

    const stdout = importResult.stdout || "";
    const stderr = importResult.stderr || "";
    const importJson = parseImportJson(stdout);

    if (importResult.status !== 0) {
      const tail = (stderr.trim().split("\n").pop() || "").slice(0, 300);
      const msg = `gbrain import exited ${importResult.status}: ${tail}`;
      console.error(`[memory-ingest] ERR: ${msg}`);
      // We conservatively state-record nothing on a non-zero exit — per-run
      // partial progress is invisible to us when the importer crashed.
      // sync-failures.jsonl entries may still hold per-file detail.
      failed += prep.prepared.length;
      return {
        written: 0,
        skipped_secret: prep.skippedSecret,
        skipped_dedup: prep.skippedDedup,
        skipped_unattributed: prep.skippedUnattributed,
        failed,
        duration_ms: Date.now() - t0,
        partial_pages: prep.partialPages,
        system_error: msg,
      };
    }

    if (!args.quiet) {
      // Echo gbrain's own progress lines on stderr through so the user sees
      // them when running interactively. Already on our stderr from the
      // child via `stdio: pipe`, but we explicitly forward for clarity.
      process.stderr.write(stderr);
    }

    if (importJson === null) {
      // gbrain exited 0 but didn't emit a parseable --json line. Treat as
      // ERR rather than silently passing zeros through — silent zeros let
      // a future gbrain-output regression mask data loss.
      const msg =
        "gbrain import exited 0 but emitted no parseable --json payload. " +
        "Refusing to advance state.";
      console.error(`[memory-ingest] ERR: ${msg}`);
      failed += prep.prepared.length;
      return {
        written: 0,
        skipped_secret: prep.skippedSecret,
        skipped_dedup: prep.skippedDedup,
        skipped_unattributed: prep.skippedUnattributed,
        failed,
        duration_ms: Date.now() - t0,
        partial_pages: prep.partialPages,
        system_error: msg,
      };
    }

    // D7: identify which staged files failed to import and exclude them
    // from state recording. Source paths get a retry on the next run.
    const failedSources = readNewFailures(
      syncFailuresPath,
      preImportOffset,
      staging.stagedPathToSource,
    );
    failed += failedSources.size;

    // Phase 3: state recording. Only files that landed in gbrain get
    // their mtime+sha256 stamped. Failed source paths are deliberately
    // left un-state'd so the next run re-prepares them and gbrain's
    // content_hash dedup short-circuits the import.
    const nowIso = new Date().toISOString();
    for (const p of prep.prepared) {
      if (failedSources.has(p.source_path)) continue;
      try {
        state.sessions[p.source_path] = {
          mtime_ns: Math.floor(statSync(p.source_path).mtimeMs * 1e6),
          sha256: fileSha256(p.source_path),
          ingested_at: nowIso,
          page_slug: p.page_slug,
          partial: p.partial,
        };
        written++;
        if (!args.quiet) {
          const tag = p.partial ? " [partial]" : "";
          console.log(`[${written}] ${p.page_slug}${tag}`);
        }
      } catch (err) {
        // statSync can fail if the source file was removed mid-run; skip
        // recording but don't fail the whole pass.
        console.error(
          `[state-record] ${p.source_path}: ${(err as Error).message}`,
        );
      }
    }

    if (!args.quiet) {
      console.error(
        `[memory-ingest] gbrain import: ${importJson.imported ?? 0} imported, ` +
          `${importJson.skipped ?? 0} unchanged, ${importJson.errors ?? 0} failed` +
          (failedSources.size > 0
            ? ` (see ~/.gbrain/sync-failures.jsonl for details)`
            : ""),
      );
    }
  } finally {
    cleanupStagingDir(stagingDir);
    _activeStagingDir = null;
  }

  state.last_full_walk = new Date().toISOString();
@@ -993,12 +1541,12 @@ async function ingestPass(args: CliArgs): Promise<BulkResult> {

  return {
    written,
    skipped_secret: skippedSecret,
    skipped_dedup: skippedDedup,
    skipped_unattributed: skippedUnattributed,
    failed,
    skipped_secret: prep.skippedSecret,
    skipped_dedup: prep.skippedDedup,
    skipped_unattributed: prep.skippedUnattributed,
    failed: failed + prep.parseFailed,
    duration_ms: Date.now() - t0,
    partial_pages: partialPages,
    partial_pages: prep.partialPages,
  };
}

@@ -1072,11 +1620,15 @@ async function main(): Promise<void> {
    if (result.written > 0 || result.failed > 0) {
      console.error(`[memory-ingest] ${result.written} written, ${result.failed} failed in ${dt}ms`);
    }
    // D6: system_error → process-level failure; orchestrator sees ERR.
    // Per-file errors do NOT exit non-zero.
    if (result.system_error) process.exit(1);
    return;
  }

  const result = await ingestPass(args);
  printBulkResult(result, args);
  if (result.system_error) process.exit(1);
}

main().catch((err) => {