ソースを参照

feat(resolution): bridge Celery .delay()/.apply_async() dispatch to the task body

Celery decouples a task's call site from its body: a @shared_task / @app.task
decorated def is invoked via task.delay(...) / task.apply_async(...), a dynamic
hop with no static edge, so flows dead-end at the dispatch and the agent reads
tasks.py to reconstruct them. celeryDispatchEdges links the enclosing function
at each .delay/.apply_async site -> the task function body.

Precision rests on a DECORATOR gate: the dispatched name must resolve to a
Python function carrying a task decorator, read from the source lines ABOVE its
def (the def's startLine excludes the decorator, and no decorates edge exists
since @shared_task is an unresolved external import). The kind==='function'
filter drops same-named test-method collisions; canvas forms (group(t).delay(),
t.s()/.si()) have no single identifier before .delay so they're skipped, not
mis-bridged; cross-module name collisions prefer a same-file task else bail.

Surfaces as `dynamic: celery dispatch` via the generic synth-edge fallback.

Validated 100% precision on two grep-confirmed repos exercising both decorator
dialects: paperless-ngx (small, @shared_task, 31 edges, 31/31 real) and pretix
(medium, @app.task, 63 edges across 21 tasks, 0/21 false positives); 0 on the
httpie control (no Celery). Node-stable (pure edge synth). Suite 1615 green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Colby McHenry 2 日 前
コミット
6e5c3a9336

+ 1 - 0
CHANGELOG.md

@@ -15,6 +15,7 @@ and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 - `codegraph_explore` now connects a Vue component to the **Pinia** store action it calls. When code does `const store = useUserStore()` and then `store.fetchUser()`, that call now links through to the `fetchUser` action in the store module — so "what happens when this view loads its data?" traces from the component into the action's body instead of stopping at the `store.fetchUser()` line. Works for both Pinia store styles (options and setup), and stays precise (a built-in like `store.$patch()` or an unrelated same-named method isn't mislinked).
 - `codegraph_explore` now follows **Vuex** string dispatch. A `dispatch('user/login')` or `commit('SET_TOKEN')` call — namespaced `'module/action'` keys included — now links to the action or mutation it names, resolved to the correct store module even when several modules share an action name (and without being fooled by a same-named `api/` helper). So "what runs when this dispatches?" traces from the call into the store handler and on to the mutations it commits. Vuex's canonical `export default { namespaced, actions, mutations }` module shape is now indexed too, so those handlers are findable symbols.
 - `codegraph_explore` now connects React data-fetching flows built on **RTK Query** (Redux Toolkit's `createApi`). An endpoint defined inside `createApi({ endpoints })` and the `useGetXQuery` / `useUpdateYMutation` hook it generates were both invisible to analysis — so "what does this component fetch?" or "where does `useGetThingQuery` get its data?" dead-ended, because the hook, the endpoint, and the component had nothing linking them. CodeGraph now indexes each endpoint and each generated hook as real symbols and wires the path `component → useGetXQuery → getX → queryFn`, so the flow resolves in one explore call instead of reading the API slice by hand. Both the arrow (`endpoints: build => ({ … })`) and method (`endpoints(builder) { return { … } }`) styles are recognized, along with the `useLazyGetXQuery` variant; hand-written hooks of a similar name are left untouched.
+- `codegraph_explore` now follows **Celery** task dispatch in Python. A `send_email.delay(...)` or `send_email.apply_async(...)` call now links to the `@shared_task` / `@app.task` function it runs — typically defined in a different module (`tasks.py`) from where it's triggered (a view or service) — so "what actually happens when this is dispatched?" traces from the call site straight into the task body instead of stopping at the `.delay()` line. Both decorator dialects are recognized (bare `@shared_task` and the arg'd `@app.task(bind=True, …)` form), including the module-qualified `tasks.invalidate_cache.apply_async()` call style. It stays precise: a `.delay()` on something that isn't a Celery task is never mislinked, so a project that doesn't use Celery is unaffected.
 
 - `codegraph_explore` now surfaces the right code in large multi-layer projects. When you ask a backend-flow question in a repo that pairs an API server with a big frontend that mirrors the same domain words — say an `app/` admin UI sitting over an `api/` server — the server-side file that genuinely matches several of your query's terms is no longer pushed out of the results by the larger, more interconnected frontend layer. A file corroborated by two or more distinct query terms is now kept in the answer even when a denser unrelated layer would otherwise crowd it out, so "how does X read items / handle the request" returns the service or handler that does the work instead of a wall of frontend views. Single-layer projects are unaffected; set `CODEGRAPH_RANK_NO_MULTITERM=1` to revert to the previous ranking.
 - Impact and blast-radius analysis for TypeScript, JavaScript, Go, Python, Rust, Ruby, C, Java, C#, PHP, Scala, Kotlin, Swift, Dart, and Pascal/Delphi now understands the readers of a constant. When you change a file-scope, package-level, module-level, or class-level constant — a config object, a lookup table, a shared constant — the other symbols in that file that read it now show up as affected, where before they were invisible (impact only followed calls, imports, and inheritance, so a constant's consumers looked like "nothing depends on this"). This makes `codegraph impact`, and the impact trail in `codegraph_explore`/`codegraph_node`, catch the "change this table, break its readers" class of change. It's on by default and adds no nodes to your graph; bundled/minified files and ambiguously-shadowed names are skipped to keep results precise. Set `CODEGRAPH_VALUE_REFS=0` to turn it off.

+ 129 - 0
__tests__/celery-dispatch-synthesizer.test.ts

@@ -0,0 +1,129 @@
+/**
+ * Celery task-dispatch bridge (Python).
+ *
+ * Celery decouples a task's call site from its body: a `@shared_task` / `@app.task`
+ * decorated `def` is invoked through `task.delay(...)` / `task.apply_async(...)`, a
+ * dynamic hop with no static edge. This bridges each `.delay`/`.apply_async` site to
+ * the task function, gated on the DECORATOR (read from the source above the `def`) so a
+ * `.delay()` on a non-task object resolves to nothing. Covers both decorator dialects
+ * (`@shared_task`, `@app.task(...)`), the module-qualified `mod.task.apply_async()` form,
+ * and proves the precision gates: a plain function called with `.delay()` and a canvas
+ * `group(...).delay()` (no single identifier before `.delay`) both contribute no edge.
+ */
+import { describe, it, expect, beforeEach, afterEach } from 'vitest';
+import * as fs from 'node:fs';
+import * as path from 'node:path';
+import * as os from 'node:os';
+import { CodeGraph } from '../src';
+
+describe('celery-dispatch synthesizer', () => {
+  let dir: string;
+  beforeEach(() => { dir = fs.mkdtempSync(path.join(os.tmpdir(), 'celery-dispatch-')); });
+  afterEach(() => { fs.rmSync(dir, { recursive: true, force: true }); });
+
+  it('bridges .delay()/.apply_async() to decorated tasks, ignoring non-task and canvas dispatch', async () => {
+    // Two decorator dialects: bare @shared_task and arg'd @app.task(...).
+    fs.writeFileSync(
+      path.join(dir, 'tasks.py'),
+      `from celery import shared_task
+from myapp.celery import app
+
+
+@shared_task
+def send_email(to):
+    return to
+
+
+@app.task(bind=True, max_retries=3)
+def crunch(self, n):
+    return n * 2
+`
+    );
+    fs.mkdirSync(path.join(dir, 'services'), { recursive: true });
+    fs.writeFileSync(
+      path.join(dir, 'services', 'tickets.py'),
+      `from celery import shared_task
+
+
+@shared_task
+def invalidate_cache():
+    return None
+`
+    );
+    // A plain function — NOT a celery task — that nonetheless has .delay() called on it.
+    fs.writeFileSync(
+      path.join(dir, 'utils.py'),
+      `def process_data(x):
+    return x
+`
+    );
+    // Dispatch sites, all inside one enclosing function.
+    fs.writeFileSync(
+      path.join(dir, 'views.py'),
+      `from tasks import send_email, crunch
+from services import tickets
+from utils import process_data
+from celery import group
+
+
+def handle_request(req):
+    send_email.delay(req.addr)                 # → send_email task (cross-file)
+    crunch.apply_async(args=[5])               # → crunch task (@app.task dialect)
+    tickets.invalidate_cache.apply_async()     # module-qualified → invalidate_cache
+    process_data.delay(req.x)                  # NOT a task → no edge
+    group([send_email.s(a) for a in req.addrs]).delay()  # canvas → no edge
+`
+    );
+
+    const cg = await CodeGraph.init(dir, { silent: true });
+    await cg.indexAll();
+    const db = (cg as any).db.db;
+
+    const edges = db
+      .prepare(
+        `SELECT s.name source, t.name target, t.file_path tf, json_extract(e.metadata,'$.via') via
+         FROM edges e JOIN nodes s ON s.id = e.source JOIN nodes t ON t.id = e.target
+         WHERE json_extract(e.metadata,'$.synthesizedBy') = 'celery-dispatch'`
+      )
+      .all();
+
+    const targets = (src: string) => edges.filter((r: any) => r.source === src).map((r: any) => r.target).sort();
+    // handle_request dispatches exactly the three real tasks (both dialects + module-qualified).
+    expect(targets('handle_request')).toEqual(['crunch', 'invalidate_cache', 'send_email']);
+    // The @app.task target resolved to the task def, not anything else.
+    const crunchEdge = edges.find((r: any) => r.target === 'crunch');
+    expect(crunchEdge.tf).toMatch(/tasks\.py$/);
+    // Module-qualified `tickets.invalidate_cache.apply_async()` resolved by the last identifier.
+    const cacheEdge = edges.find((r: any) => r.target === 'invalidate_cache');
+    expect(cacheEdge.tf).toMatch(/services[\\/]tickets\.py$/);
+    expect(cacheEdge.via).toBe('invalidate_cache');
+    // PRECISION: a plain function called with .delay() is never targeted (no decorator).
+    expect(edges.some((r: any) => r.target === 'process_data')).toBe(false);
+
+    cg.close?.();
+  });
+
+  it('produces no edges in a Celery-free project (clean control)', async () => {
+    fs.writeFileSync(
+      path.join(dir, 'app.py'),
+      `def schedule(job):
+    job.delay()          # a .delay() that has nothing to do with Celery
+    return job
+
+
+def run():
+    schedule(make_job())
+`
+    );
+    const cg = await CodeGraph.init(dir, { silent: true });
+    await cg.indexAll();
+    const db = (cg as any).db.db;
+    const count = db
+      .prepare(
+        `SELECT count(*) c FROM edges WHERE json_extract(metadata,'$.synthesizedBy') = 'celery-dispatch'`
+      )
+      .get();
+    expect(count.c).toBe(0);
+    cg.close?.();
+  });
+});

+ 5 - 1
docs/design/dispatch-synthesizer-backlog.md

@@ -64,7 +64,8 @@ Status legend (matches the playbook): ✅ done+validated · 🟡 shipped but und
 | Shape | Ecosystem | Anchor | Mechanism | Status |
 |---|---|---|---|---|
 | **MediatR / CQRS** | .NET | `IRequest<T>` → `IRequestHandler<TReq,T>` by the generic request type; `_mediator.Send(new GetFooQuery())` → handler | S (generic-type-keyed) | 🔬 named a frontier in CLAUDE.md, but it's statically keyable via the generic — worth a real attempt |
-| **Celery / Sidekiq** | Python / Ruby | `@task`/`@shared_task` + `.delay()`/`.apply_async()`; `Worker.perform_async` → `perform` | R/X (decorator + name) | ⬜ |
+| **Celery** | Python | `@shared_task`/`@app.task`/`@<app>.task`/`@task` def + `.delay()`/`.apply_async()` call → task body | S (decorator-gated name) | ✅ **SHIPPED (2026-06-20)** — `synthesizedBy:'celery-dispatch'` (`celeryDispatchEdges`). Link the enclosing fn at each `.delay(`/`.apply_async(` site → the task fn. Precision rests on the DECORATOR gate: the dispatched name must resolve to a Python `function` carrying a task decorator, read from the source lines ABOVE its `def` (the def's own startLine excludes the decorator; no `decorates` edge exists — `@shared_task` is an unresolved external import). `kind==='function'` filter drops the same-named test-method collision (`consume_file`). Canvas forms (`group(t).delay()`, `t.s()`/`.si()`) have no single identifier before `.delay` → skipped, not mis-bridged. Cross-module name collision → same-file preference else bail. **100% precision: paperless-ngx (small, `@shared_task`, 31 edges, 31/31 real), pretix (medium, `@app.task`, 63 edges across 21 tasks, 0/21 FP); 0 on the httpie control (no Celery).** Node-stable (pure edge synth, no extraction change). Surfaces as `dynamic: celery dispatch @site` via the generic fallback. `+ celery-dispatch-synthesizer.test.ts`. **Deferred (recall):** canvas dispatch, class-based `Task` subclasses, `app.send_task('dotted.name')` string dispatch, aliased imports (`import send_email as s; s.delay()`). |
+| **Sidekiq** | Ruby | `class W; include Sidekiq::Job; def perform; end` + `W.perform_async(...)` → `perform` | S (class→perform) | ⬜ — the Ruby sibling of Celery; build next-to-it (grep-confirm ≥2 `perform_async` repos). |
 | **Laravel / Spring events** | PHP / Java | `event(OrderShipped::class)` → `EventServiceProvider` listener map; `@EventListener onX(EventT)` → publisher by event type | R (mapped) | ⬜ |
 
 ### Tier C — frontier, ⛔ do **not** build (no static anchor; would add noise)
@@ -82,6 +83,9 @@ Status legend (matches the playbook): ✅ done+validated · 🟡 shipped but und
 | Redux thunk | `redux-thunk` | ✅ **generalizes (2026-06-20)** — precise on uwave-web (small, 5 edges), session-desktop (medium, 2), trezor (large, 211); control shapeshift (RTK Query, no thunks) = 0. Receiver-agnostic (`api.dispatch`/`thunkApi.dispatch`/`window.…dispatch` all matched). **⚠️ 2 follow-ups below.** |
 | Object-literal registry | `object-registry` | ✅ **shipped (2026-06-20)** — xrengine `CommandManager` (64), Prebid.js (7), warp-drive (1); 0 false positives after 4 precision gates. |
 | RTK Query | `rtk-query` | ✅ **shipped (2026-06-20)** — 100% precision (hooks == synth edges, 0 cross-file) on basetool (54), minusx-metabase (11), shapeshift (13); 0 on uwave-web control. Extraction mints endpoint + generated-hook nodes; synth bridges hook→endpoint by convention. |
+| Pinia store | `pinia-store` | ✅ **shipped (2026-06-20)** — `useStore().action()` instance dispatch → action; 100% precision Geeker (41) / MallChat (64), 0 on element-admin (Vuex) control. |
+| Vuex dispatch | `vuex-dispatch` | ✅ **shipped (2026-06-20)** — string `dispatch('ns/action')`/`commit('M')` → handler; 100% precision element-admin (55) / vue-admin-template (12) / d2-admin (63), 0 on Redux controls. |
+| Celery | `celery-dispatch` | ✅ **shipped (2026-06-20)** — `.delay()`/`.apply_async()` → `@shared_task`/`@app.task` body; 100% precision paperless-ngx (31) / pretix (63 across 21 tasks), 0 on httpie control. Decorator-gated via source above the `def`. |
 | (see playbook §6 / `callback-synthesizer.ts` for the other ~20 channels) | | |
 
 ### redux-thunk follow-ups (found by the n>1 validation — this is exactly what it's for)

+ 98 - 1
src/resolution/callback-synthesizer.ts

@@ -2092,12 +2092,107 @@ function vuexDispatchEdges(ctx: ResolutionContext): Edge[] {
   return edges;
 }
 
+// ── Celery task dispatch (Python) ─────────────────────────────────────────────
+// Celery decouples a task's call site from its body through async dispatch:
+//   # tasks.py
+//   @shared_task                       # also @app.task / @celery_app.task / @<app>.task / @task
+//   def process(account_ids): ...
+//   # views.py — a DIFFERENT module
+//   process.apply_async(kwargs={...})  # or process.delay(...) — dynamic, no static edge
+// Bridge it: link the enclosing function/method at each `.delay(`/`.apply_async(` site → the
+// task function body. Precision rests on the DECORATOR gate — the dispatched name must resolve
+// to a Python function carrying a celery task decorator (read from the source lines above its
+// `def`, since the def's own startLine excludes the decorator). A `.delay()` on a non-task
+// object resolves to no task node → no edge, so a Celery-free repo yields 0. Same-file /
+// unique-candidate disambiguation like vuex. (Canvas forms — `group(t).delay()`, `t.s()`/`.si()`
+// — have no single identifier before `.delay`/`.apply_async`, so they're skipped, not mis-bridged.)
+const CELERY_DISPATCH_RE = /\b([A-Za-z_]\w*)\s*\.\s*(?:delay|apply_async)\s*\(/g;
+// A task decorator: bare `@shared_task`/`@task` or attribute `@app.task`/`@celery_app.task`,
+// each optionally called with args. `\b`-bounded and `@`-anchored so `@mytask`, or a symbol
+// merely named `task`, can't match. No `/g`, so `.test()` is stateless across reuse.
+const CELERY_TASK_DECORATOR_RE = /@\s*(?:[A-Za-z_][\w.]*\.)?(?:shared_task|task)\b/;
+const CELERY_PY_EXT = /\.py$/;
+const CELERY_FANOUT_CAP = 80;
+const CELERY_DECORATOR_LOOKBACK = 12; // max lines above a `def` to scan for its decorators
+
+function celeryDispatchEdges(ctx: ResolutionContext): Edge[] {
+  // Memoize the decorator check per task-candidate node: it reads the file and scans a few
+  // lines above the def. Only called on names that are actually `.delay`/`.apply_async`
+  // receivers, so the candidate set stays small.
+  const taskCache = new Map<string, boolean>();
+  const isCeleryTask = (node: Node): boolean => {
+    let v = taskCache.get(node.id);
+    if (v !== undefined) return v;
+    v = false;
+    if (node.kind === 'function' && CELERY_PY_EXT.test(node.filePath)) {
+      const content = ctx.readFile(node.filePath);
+      if (content) {
+        const lines = content.split('\n');
+        // startLine is the `def` line (decorators sit ABOVE it). Walk upward, stopping at the
+        // previous declaration so a non-task def can never inherit the prior def's decorator.
+        const stop = Math.max(0, node.startLine - 1 - CELERY_DECORATOR_LOOKBACK);
+        for (let i = node.startLine - 2; i >= stop; i--) {
+          const t = (lines[i] ?? '').trim();
+          if (/^(?:async\s+def|def|class)\b/.test(t)) break; // previous decl → stop
+          if (CELERY_TASK_DECORATOR_RE.test(t)) { v = true; break; }
+        }
+      }
+    }
+    taskCache.set(node.id, v);
+    return v;
+  };
+
+  const resolve = (name: string, dispatchFile: string): Node | null => {
+    const cands = ctx.getNodesByName(name).filter((n) => n.kind === 'function' && isCeleryTask(n));
+    if (!cands.length) return null;
+    if (cands.length === 1) return cands[0]!;
+    // Cross-module name collision: prefer a task defined in the dispatching file, else bail
+    // (ambiguous — precision over recall, like vuex's root-key resolution).
+    return cands.find((c) => c.filePath === dispatchFile) ?? null;
+  };
+
+  const edges: Edge[] = [];
+  const seen = new Set<string>();
+  for (const file of ctx.getAllFiles()) {
+    if (!CELERY_PY_EXT.test(file)) continue;
+    const content = ctx.readFile(file);
+    if (!content || (!content.includes('.delay(') && !content.includes('.apply_async('))) continue;
+    const safe = stripCommentsForRegex(content, 'python');
+    const nodesInFile = ctx.getNodesInFile(file);
+    CELERY_DISPATCH_RE.lastIndex = 0;
+    let m: RegExpExecArray | null;
+    let added = 0;
+    while ((m = CELERY_DISPATCH_RE.exec(safe)) && added < CELERY_FANOUT_CAP) {
+      const name = m[1]!;
+      const line = safe.slice(0, m.index).split('\n').length;
+      const disp = enclosingFn(nodesInFile, line);
+      if (!disp) continue; // module-level dispatch — no source symbol to attribute
+      const target = resolve(name, file);
+      if (!target || target.id === disp.id) continue;
+      const key = `${disp.id}>${target.id}`;
+      if (seen.has(key)) continue;
+      seen.add(key);
+      edges.push({
+        source: disp.id,
+        target: target.id,
+        kind: 'calls',
+        line,
+        provenance: 'heuristic',
+        metadata: { synthesizedBy: 'celery-dispatch', via: name, registeredAt: `${file}:${line}` },
+      });
+      added++;
+    }
+  }
+  return edges;
+}
+
 /**
  * Synthesize dispatcher→callback edges (field observers + EventEmitters +
  * React re-render + JSX children + Vue templates + SvelteKit load + RN event
  * channel + Fabric native-impl + MyBatis Java↔XML + Gin middleware chain +
  * Redux-thunk dispatch chain + object-literal registry dispatch + RTK Query
- * generated-hook → endpoint + Pinia useStore().action() + Vuex string dispatch).
+ * generated-hook → endpoint + Pinia useStore().action() + Vuex string dispatch +
+ * Celery task .delay()/.apply_async() → task body).
  * Returns the count added. Never throws into indexing — callers wrap in try/catch.
  */
 export function synthesizeCallbackEdges(queries: QueryBuilder, ctx: ResolutionContext): number {
@@ -2140,6 +2235,7 @@ export function synthesizeCallbackEdges(queries: QueryBuilder, ctx: ResolutionCo
   const rtkEdges = rtkQueryEdges(queries, ctx);
   const piniaEdges = piniaStoreEdges(ctx);
   const vuexEdges = vuexDispatchEdges(ctx);
+  const celeryEdges = celeryDispatchEdges(ctx);
 
   const merged: Edge[] = [];
   const seen = new Set<string>();
@@ -2168,6 +2264,7 @@ export function synthesizeCallbackEdges(queries: QueryBuilder, ctx: ResolutionCo
     ...rtkEdges,
     ...piniaEdges,
     ...vuexEdges,
+    ...celeryEdges,
   ]) {
     const key = `${e.source}>${e.target}`;
     if (seen.has(key)) continue;