1
0

celery-dispatch-synthesizer.test.ts 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. /**
  2. * Celery task-dispatch bridge (Python).
  3. *
  4. * Celery decouples a task's call site from its body: a `@shared_task` / `@app.task`
  5. * decorated `def` is invoked through `task.delay(...)` / `task.apply_async(...)`, a
  6. * dynamic hop with no static edge. This bridges each `.delay`/`.apply_async` site to
  7. * the task function, gated on the DECORATOR (read from the source above the `def`) so a
  8. * `.delay()` on a non-task object resolves to nothing. Covers both decorator dialects
  9. * (`@shared_task`, `@app.task(...)`), the module-qualified `mod.task.apply_async()` form,
  10. * and proves the precision gates: a plain function called with `.delay()` and a canvas
  11. * `group(...).delay()` (no single identifier before `.delay`) both contribute no edge.
  12. */
  13. import { describe, it, expect, beforeEach, afterEach } from 'vitest';
  14. import * as fs from 'node:fs';
  15. import * as path from 'node:path';
  16. import * as os from 'node:os';
  17. import { CodeGraph } from '../src';
  18. describe('celery-dispatch synthesizer', () => {
  19. let dir: string;
  20. beforeEach(() => { dir = fs.mkdtempSync(path.join(os.tmpdir(), 'celery-dispatch-')); });
  21. afterEach(() => { fs.rmSync(dir, { recursive: true, force: true }); });
  22. it('bridges .delay()/.apply_async() to decorated tasks, ignoring non-task and canvas dispatch', async () => {
  23. // Two decorator dialects: bare @shared_task and arg'd @app.task(...).
  24. fs.writeFileSync(
  25. path.join(dir, 'tasks.py'),
  26. `from celery import shared_task
  27. from myapp.celery import app
  28. @shared_task
  29. def send_email(to):
  30. return to
  31. @app.task(bind=True, max_retries=3)
  32. def crunch(self, n):
  33. return n * 2
  34. `
  35. );
  36. fs.mkdirSync(path.join(dir, 'services'), { recursive: true });
  37. fs.writeFileSync(
  38. path.join(dir, 'services', 'tickets.py'),
  39. `from celery import shared_task
  40. @shared_task
  41. def invalidate_cache():
  42. return None
  43. `
  44. );
  45. // A plain function — NOT a celery task — that nonetheless has .delay() called on it.
  46. fs.writeFileSync(
  47. path.join(dir, 'utils.py'),
  48. `def process_data(x):
  49. return x
  50. `
  51. );
  52. // Dispatch sites, all inside one enclosing function.
  53. fs.writeFileSync(
  54. path.join(dir, 'views.py'),
  55. `from tasks import send_email, crunch
  56. from services import tickets
  57. from utils import process_data
  58. from celery import group
  59. def handle_request(req):
  60. send_email.delay(req.addr) # → send_email task (cross-file)
  61. crunch.apply_async(args=[5]) # → crunch task (@app.task dialect)
  62. tickets.invalidate_cache.apply_async() # module-qualified → invalidate_cache
  63. process_data.delay(req.x) # NOT a task → no edge
  64. group([send_email.s(a) for a in req.addrs]).delay() # canvas → no edge
  65. `
  66. );
  67. const cg = await CodeGraph.init(dir, { silent: true });
  68. await cg.indexAll();
  69. const db = (cg as any).db.db;
  70. const edges = db
  71. .prepare(
  72. `SELECT s.name source, t.name target, t.file_path tf, json_extract(e.metadata,'$.via') via
  73. FROM edges e JOIN nodes s ON s.id = e.source JOIN nodes t ON t.id = e.target
  74. WHERE json_extract(e.metadata,'$.synthesizedBy') = 'celery-dispatch'`
  75. )
  76. .all();
  77. const targets = (src: string) => edges.filter((r: any) => r.source === src).map((r: any) => r.target).sort();
  78. // handle_request dispatches exactly the three real tasks (both dialects + module-qualified).
  79. expect(targets('handle_request')).toEqual(['crunch', 'invalidate_cache', 'send_email']);
  80. // The @app.task target resolved to the task def, not anything else.
  81. const crunchEdge = edges.find((r: any) => r.target === 'crunch');
  82. expect(crunchEdge.tf).toMatch(/tasks\.py$/);
  83. // Module-qualified `tickets.invalidate_cache.apply_async()` resolved by the last identifier.
  84. const cacheEdge = edges.find((r: any) => r.target === 'invalidate_cache');
  85. expect(cacheEdge.tf).toMatch(/services[\\/]tickets\.py$/);
  86. expect(cacheEdge.via).toBe('invalidate_cache');
  87. // PRECISION: a plain function called with .delay() is never targeted (no decorator).
  88. expect(edges.some((r: any) => r.target === 'process_data')).toBe(false);
  89. cg.close?.();
  90. });
  91. it('produces no edges in a Celery-free project (clean control)', async () => {
  92. fs.writeFileSync(
  93. path.join(dir, 'app.py'),
  94. `def schedule(job):
  95. job.delay() # a .delay() that has nothing to do with Celery
  96. return job
  97. def run():
  98. schedule(make_job())
  99. `
  100. );
  101. const cg = await CodeGraph.init(dir, { silent: true });
  102. await cg.indexAll();
  103. const db = (cg as any).db.db;
  104. const count = db
  105. .prepare(
  106. `SELECT count(*) c FROM edges WHERE json_extract(metadata,'$.synthesizedBy') = 'celery-dispatch'`
  107. )
  108. .get();
  109. expect(count.c).toBe(0);
  110. cg.close?.();
  111. });
  112. });