| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- #!/usr/bin/env python3
- """
- Package discovery and context output.
- Provides:
- get_packages_info - Get structured package info
- get_packages_section - Build PACKAGES text section
- get_context_packages_text - Full packages text output (--mode packages)
- get_context_packages_json - Full packages JSON output (--mode packages --json)
- """
- from __future__ import annotations
- from pathlib import Path
- from .config import _is_true_config_value, get_default_package, get_packages, get_spec_scope
- from .paths import (
- DIR_SPEC,
- DIR_WORKFLOW,
- get_current_task,
- get_repo_root,
- )
- from .tasks import load_task
- # =============================================================================
- # Internal Helpers
- # =============================================================================
- def _scan_spec_layers(spec_dir: Path, package: str | None = None) -> list[str]:
- """Scan spec directory for available layers (subdirectories).
- For monorepo: scans spec/<package>/
- For single-repo: scans spec/
- """
- target = spec_dir / package if package else spec_dir
- if not target.is_dir():
- return []
- return sorted(
- d.name for d in target.iterdir() if d.is_dir() and d.name != "guides"
- )
- def _get_active_task_package(repo_root: Path) -> str | None:
- """Get the package field from the active task's task.json."""
- current = get_current_task(repo_root)
- if not current:
- return None
- ct = load_task(repo_root / current)
- return ct.package if ct and ct.package else None
- def _resolve_scope_set(
- packages: dict,
- spec_scope,
- task_pkg: str | None,
- default_pkg: str | None,
- ) -> set | None:
- """Resolve spec_scope to a set of allowed package names, or None for full scan."""
- if not packages:
- return None
- if spec_scope is None:
- return None
- if isinstance(spec_scope, str) and spec_scope == "active_task":
- if task_pkg and task_pkg in packages:
- return {task_pkg}
- if default_pkg and default_pkg in packages:
- return {default_pkg}
- return None
- if isinstance(spec_scope, list):
- valid = {e for e in spec_scope if e in packages}
- if valid:
- return valid
- # All invalid: fallback
- if task_pkg and task_pkg in packages:
- return {task_pkg}
- if default_pkg and default_pkg in packages:
- return {default_pkg}
- return None
- return None
- # =============================================================================
- # Public Functions
- # =============================================================================
- def get_packages_info(repo_root: Path) -> list[dict]:
- """Get structured package info for monorepo projects.
- Returns list of dicts with keys: name, path, type, default, specLayers,
- isSubmodule, isGitRepo.
- Returns empty list for single-repo projects.
- """
- packages = get_packages(repo_root)
- if not packages:
- return []
- default_pkg = get_default_package(repo_root)
- spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
- result = []
- for pkg_name, pkg_config in packages.items():
- pkg_path = pkg_config.get("path", pkg_name) if isinstance(pkg_config, dict) else str(pkg_config)
- pkg_type = pkg_config.get("type", "local") if isinstance(pkg_config, dict) else "local"
- pkg_git = pkg_config.get("git", False) if isinstance(pkg_config, dict) else False
- layers = _scan_spec_layers(spec_dir, pkg_name)
- result.append({
- "name": pkg_name,
- "path": pkg_path,
- "type": pkg_type,
- "default": pkg_name == default_pkg,
- "specLayers": layers,
- "isSubmodule": pkg_type == "submodule",
- "isGitRepo": _is_true_config_value(pkg_git),
- })
- return result
- def get_packages_section(repo_root: Path) -> str:
- """Build the PACKAGES section for text output."""
- spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
- pkg_info = get_packages_info(repo_root)
- lines: list[str] = []
- lines.append("## PACKAGES")
- if not pkg_info:
- lines.append("(single-repo mode)")
- layers = _scan_spec_layers(spec_dir)
- if layers:
- lines.append(f"Spec layers: {', '.join(layers)}")
- return "\n".join(lines)
- default_pkg = get_default_package(repo_root)
- for pkg in pkg_info:
- layers_str = f" [{', '.join(pkg['specLayers'])}]" if pkg["specLayers"] else ""
- submodule_tag = " (submodule)" if pkg["isSubmodule"] else ""
- git_repo_tag = " (git repo)" if pkg["isGitRepo"] else ""
- default_tag = " *" if pkg["default"] else ""
- lines.append(
- f"- {pkg['name']:<16} {pkg['path']:<20}{layers_str}{submodule_tag}{git_repo_tag}{default_tag}"
- )
- if default_pkg:
- lines.append(f"Default package: {default_pkg}")
- return "\n".join(lines)
- def get_context_packages_text(repo_root: Path | None = None) -> str:
- """Get packages context as formatted text (for --mode packages)."""
- if repo_root is None:
- repo_root = get_repo_root()
- pkg_info = get_packages_info(repo_root)
- lines: list[str] = []
- if not pkg_info:
- spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
- lines.append("Single-repo project (no packages configured)")
- lines.append("")
- layers = _scan_spec_layers(spec_dir)
- if layers:
- lines.append(f"Spec layers: {', '.join(layers)}")
- return "\n".join(lines)
- # Resolve scope for annotations
- packages_dict = get_packages(repo_root) or {}
- default_pkg = get_default_package(repo_root)
- spec_scope = get_spec_scope(repo_root)
- task_pkg = _get_active_task_package(repo_root)
- scope_set = _resolve_scope_set(packages_dict, spec_scope, task_pkg, default_pkg)
- lines.append("## PACKAGES")
- lines.append("")
- for pkg in pkg_info:
- default_tag = " (default)" if pkg["default"] else ""
- type_tag = f" [{pkg['type']}]" if pkg["type"] != "local" else ""
- git_tag = " [git repo]" if pkg["isGitRepo"] else ""
- # Scope annotation
- scope_tag = ""
- if scope_set is not None and pkg["name"] not in scope_set:
- scope_tag = " (out of scope)"
- lines.append(f"### {pkg['name']}{default_tag}{type_tag}{git_tag}{scope_tag}")
- lines.append(f"Path: {pkg['path']}")
- if pkg["specLayers"]:
- lines.append(f"Spec layers: {', '.join(pkg['specLayers'])}")
- for layer in pkg["specLayers"]:
- lines.append(f" - .trellis/spec/{pkg['name']}/{layer}/index.md")
- else:
- lines.append("Spec: not configured")
- lines.append("")
- # Also show shared guides
- guides_dir = repo_root / DIR_WORKFLOW / DIR_SPEC / "guides"
- if guides_dir.is_dir():
- lines.append("### Shared Guides (always included)")
- lines.append("Path: .trellis/spec/guides/index.md")
- lines.append("")
- return "\n".join(lines)
- def get_context_packages_json(repo_root: Path | None = None) -> dict:
- """Get packages context as a dictionary (for --mode packages --json)."""
- if repo_root is None:
- repo_root = get_repo_root()
- pkg_info = get_packages_info(repo_root)
- if not pkg_info:
- spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
- layers = _scan_spec_layers(spec_dir)
- return {
- "mode": "single-repo",
- "specLayers": layers,
- }
- default_pkg = get_default_package(repo_root)
- spec_scope = get_spec_scope(repo_root)
- task_pkg = _get_active_task_package(repo_root)
- return {
- "mode": "monorepo",
- "packages": pkg_info,
- "defaultPackage": default_pkg,
- "specScope": spec_scope,
- "activeTaskPackage": task_pkg,
- }
|