| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Workflow Phase Extraction.
- Extracts step-level content from .trellis/workflow.md and optionally filters
- platform-specific blocks.
- Platform marker syntax in workflow.md:
- [Claude Code, Cursor, ...]
- agent-capable content
- [/Claude Code, Cursor, ...]
- Provides:
- get_phase_index - Extract the Phase Index section (no --step)
- get_step - Extract a single step (#### X.X) section
- filter_platform - Strip platform blocks that don't include the given name
- """
- from __future__ import annotations
- import re
- from .paths import DIR_WORKFLOW, get_repo_root
- def _workflow_md_path():
- return get_repo_root() / DIR_WORKFLOW / "workflow.md"
- # Match a line that *is* a platform marker: "[A, B, C]" or "[/A, B, C]"
- _MARKER_RE = re.compile(r"^\[(/?)([A-Za-z][^\[\]]*)\]\s*$")
- # Step heading: "#### 1.0 Title" or "#### 1.0 ..."
- _STEP_HEADING_RE = re.compile(r"^####\s+(\d+\.\d+)\b.*$")
- # Phase Index starts here; Phase 1/2/3 step bodies follow; ends at Breadcrumbs.
- _PHASE_INDEX_HEADING = "## Phase Index"
- def _read_workflow() -> str:
- path = _workflow_md_path()
- if not path.exists():
- raise FileNotFoundError(f"workflow.md not found: {path}")
- return path.read_text(encoding="utf-8")
- def _parse_marker(line: str) -> tuple[bool, list[str]] | None:
- """Parse a platform marker line.
- Returns:
- (is_closing, [platform_names]) if line is a marker, else None.
- """
- m = _MARKER_RE.match(line)
- if not m:
- return None
- is_closing = m.group(1) == "/"
- names = [p.strip() for p in m.group(2).split(",") if p.strip()]
- return is_closing, names
- def get_phase_index() -> str:
- """Return Phase Index + Phase 1/2/3 step bodies from workflow.md.
- Matches what the SessionStart hook injects into the `<workflow>` block:
- starts at `## Phase Index`, continues through `## Phase 1: Plan`,
- `## Phase 2: Execute`, `## Phase 3: Finish`, stops at
- `## Customizing Trellis (for forks)` (the docs-for-forks footer).
- `[workflow-state:STATUS]` tag blocks (now embedded in Phase Index since
- v0.5.0-rc.0) are consumed by the UserPromptSubmit hook so they're
- stripped from this output.
- """
- text = _read_workflow()
- lines = text.splitlines()
- start: int | None = None
- end: int | None = None
- for i, line in enumerate(lines):
- stripped = line.strip()
- if start is None and stripped == _PHASE_INDEX_HEADING:
- start = i
- continue
- if start is not None and stripped == "## Customizing Trellis (for forks)":
- end = i
- break
- if start is None:
- return ""
- if end is None:
- end = len(lines)
- section = "\n".join(lines[start:end]).rstrip()
- # Strip [workflow-state:STATUS]...[/workflow-state:STATUS] blocks since
- # they're injected separately by inject-workflow-state.py per-turn.
- import re as _re
- tag_re = _re.compile(
- r"\[workflow-state:([A-Za-z0-9_-]+)\]\s*\n.*?\n\s*\[/workflow-state:\1\]\n?",
- _re.DOTALL,
- )
- return tag_re.sub("", section).rstrip() + "\n"
- def get_step(step_id: str) -> str:
- """Return the `#### X.X` section matching step_id (header + body).
- Body ends at the next `####` or `---` or `##` heading (whichever comes first).
- """
- text = _read_workflow()
- lines = text.splitlines()
- start: int | None = None
- for i, line in enumerate(lines):
- m = _STEP_HEADING_RE.match(line)
- if m and m.group(1) == step_id:
- start = i
- break
- if start is None:
- return ""
- end: int = len(lines)
- for j in range(start + 1, len(lines)):
- line = lines[j]
- if line.startswith("#### "):
- end = j
- break
- if line.startswith("## "):
- end = j
- break
- # Horizontal rule at column 0
- if line.strip() == "---":
- end = j
- break
- return "\n".join(lines[start:end]).rstrip() + "\n"
- def _platform_matches(platform: str, block_names: list[str]) -> bool:
- """Case-insensitive fuzzy match: accept 'cursor', 'Cursor', 'claude-code', 'Claude Code'."""
- needle = platform.lower().replace("-", "").replace("_", "").replace(" ", "")
- for name in block_names:
- hay = name.lower().replace("-", "").replace("_", "").replace(" ", "")
- if needle == hay:
- return True
- return False
- def resolve_effective_platform(platform: str, config: dict) -> str:
- """Map ``codex`` to a dispatch-mode-namespaced virtual platform name.
- When ``--platform codex`` is passed, return ``"codex-inline"`` (default)
- or ``"codex-sub-agent"`` based on ``.trellis/config.yaml`` ``codex.dispatch_mode``.
- ``filter_platform`` then surfaces blocks whose marker lists include the
- namespaced name (e.g. ``[codex-sub-agent, ...]`` or ``[codex-inline, Kilo,
- Antigravity, Windsurf]``).
- Default is ``inline`` because Codex sub-agents run with ``fork_turns="none"``
- isolation and can't inherit the parent session's task context — inline
- keeps the main agent in charge so context isn't lost. Invalid / missing
- values also fall back to inline.
- Other platforms are returned unchanged.
- """
- if platform == "codex":
- mode = "inline"
- codex_cfg = config.get("codex") if isinstance(config, dict) else None
- if isinstance(codex_cfg, dict):
- cfg_mode = codex_cfg.get("dispatch_mode")
- if cfg_mode in ("inline", "sub-agent"):
- mode = cfg_mode
- return f"codex-{mode}"
- return platform
- def filter_platform(content: str, platform: str) -> str:
- """Keep lines outside any `[...]` block + lines inside blocks that include platform.
- Marker lines themselves are dropped from the output.
- """
- lines = content.splitlines()
- out: list[str] = []
- in_block = False
- keep_block = False
- for line in lines:
- marker = _parse_marker(line)
- if marker is not None:
- is_closing, names = marker
- if not is_closing:
- in_block = True
- keep_block = _platform_matches(platform, names)
- else:
- in_block = False
- keep_block = False
- continue # drop the marker line itself
- if in_block:
- if keep_block:
- out.append(line)
- continue
- out.append(line)
- # Collapse runs of 3+ blank lines that may arise from dropped markers
- collapsed: list[str] = []
- blank_run = 0
- for line in out:
- if line.strip() == "":
- blank_run += 1
- if blank_run <= 2:
- collapsed.append(line)
- else:
- blank_run = 0
- collapsed.append(line)
- return "\n".join(collapsed).rstrip() + "\n"
|