| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Step 3 审查结果处理。
- 读取 reviewer agent 的原始输出 JSON,解析为 ReviewResult,
- 生成 metrics 用于 index.db 沉淀。
- """
- from __future__ import annotations
- import argparse
- import json
- import sys
- from pathlib import Path
- from typing import Any, Dict, List
- from runtime_compat import enable_windows_utf8_stdio
- def _ensure_scripts_path() -> None:
- scripts_dir = Path(__file__).resolve().parent
- if str(scripts_dir) not in sys.path:
- sys.path.insert(0, str(scripts_dir))
- _ensure_scripts_path()
- from data_modules.review_author_view import render_review_author_view
- from data_modules.review_schema import append_ai_flavor_anti_patterns, parse_review_output
- def _resolve_report_path(project_root: Path, report_file: str) -> Path:
- root = project_root.expanduser().resolve()
- report_path = Path(report_file).expanduser()
- if not report_path.is_absolute():
- report_path = root / report_path
- report_path = report_path.resolve()
- try:
- report_path.relative_to(root)
- except ValueError as exc:
- raise ValueError("report-file 必须位于 project_root 目录内") from exc
- return report_path
- def _format_issue(issue: Dict[str, Any], index: int) -> List[str]:
- description = str(issue.get("description") or "未填写问题描述")
- severity = str(issue.get("severity") or "medium")
- category = str(issue.get("category") or "other")
- location = str(issue.get("location") or "未标注位置")
- evidence = str(issue.get("evidence") or "未提供证据")
- fix_hint = str(issue.get("fix_hint") or "未提供修复方向")
- blocking = "是" if issue.get("blocking") else "否"
- return [
- f"{index}. **{description}**",
- f" - 严重级别:{severity}",
- f" - 分类:{category}",
- f" - 位置:{location}",
- f" - 阻断:{blocking}",
- f" - 证据:{evidence}",
- f" - 修复方向:{fix_hint}",
- ]
- def render_review_report(payload: Dict[str, Any]) -> str:
- result = payload["review_result"]
- metrics = payload["metrics"]
- issues = list(result.get("issues", []))
- blocking_issues = [issue for issue in issues if issue.get("blocking")]
- non_blocking_issues = [issue for issue in issues if not issue.get("blocking")]
- severity_counts = metrics.get("severity_counts", {})
- lines: List[str] = [
- f"# 第{payload['chapter']}章审查报告",
- "",
- render_review_author_view(payload).rstrip(),
- "",
- "## 总览",
- "",
- f"- 问题数:{result.get('issues_count', 0)}",
- f"- 阻断数:{result.get('blocking_count', 0)}",
- f"- 结论:{'需修复后重审' if result.get('has_blocking') else '无阻断问题'}",
- ]
- summary = str(result.get("summary") or "").strip()
- if summary:
- lines.append(f"- 摘要:{summary}")
- if severity_counts:
- ordered = [
- f"{level}={severity_counts.get(level, 0)}"
- for level in ("critical", "high", "medium", "low")
- ]
- lines.append(f"- 严重级别统计:{', '.join(ordered)}")
- lines.extend(["", "## 阻断问题", ""])
- if blocking_issues:
- for index, issue in enumerate(blocking_issues, start=1):
- lines.extend(_format_issue(issue, index))
- lines.append("")
- else:
- lines.append("无。")
- lines.append("")
- lines.extend(["## 其他问题", ""])
- if non_blocking_issues:
- for index, issue in enumerate(non_blocking_issues, start=1):
- lines.extend(_format_issue(issue, index))
- lines.append("")
- else:
- lines.append("无。")
- lines.append("")
- lines.extend(["## 修复方向", ""])
- if issues:
- ordered_issues = [*blocking_issues, *non_blocking_issues]
- for index, issue in enumerate(ordered_issues, start=1):
- description = str(issue.get("description") or "未填写问题描述")
- fix_hint = str(issue.get("fix_hint") or "未提供修复方向")
- lines.append(f"{index}. {description}:{fix_hint}")
- else:
- lines.append("暂无需要修复的问题。")
- return "\n".join(lines).rstrip() + "\n"
- def write_review_report(project_root: Path, report_file: str, payload: Dict[str, Any]) -> Path:
- report_path = _resolve_report_path(project_root, report_file)
- report_path.parent.mkdir(parents=True, exist_ok=True)
- report_path.write_text(render_review_report(payload), encoding="utf-8")
- return report_path
- def _build_review_metrics_record(metrics: Dict[str, Any]):
- from data_modules.index_manager import ReviewMetrics
- return ReviewMetrics(
- start_chapter=int(metrics["start_chapter"]),
- end_chapter=int(metrics["end_chapter"]),
- overall_score=float(metrics.get("overall_score", 0.0)),
- dimension_scores=dict(metrics.get("dimension_scores", {})),
- severity_counts=dict(metrics.get("severity_counts", {})),
- critical_issues=list(metrics.get("critical_issues", [])),
- report_file=str(metrics.get("report_file", "")),
- notes=str(metrics.get("notes", "")),
- )
- def build_review_artifacts(
- project_root: Path,
- chapter: int,
- review_results_path: Path,
- report_file: str = "",
- ) -> Dict[str, Any]:
- raw = json.loads(review_results_path.read_text(encoding="utf-8"))
- result = parse_review_output(chapter=chapter, raw=raw)
- anti_patterns_added = append_ai_flavor_anti_patterns(project_root, result)
- metrics = result.to_metrics_dict(report_file=report_file)
- normalized_review = result.to_dict()
- review_results_path.parent.mkdir(parents=True, exist_ok=True)
- review_results_path.write_text(
- json.dumps(normalized_review, ensure_ascii=False, indent=2),
- encoding="utf-8",
- )
- return {
- "chapter": chapter,
- "review_result": normalized_review,
- "metrics": metrics,
- "anti_patterns_added": anti_patterns_added,
- }
- def main() -> None:
- parser = argparse.ArgumentParser(description="Review pipeline v6")
- parser.add_argument("--project-root", required=True)
- parser.add_argument("--chapter", type=int, required=True)
- parser.add_argument("--review-results", required=True)
- parser.add_argument("--metrics-out", default="")
- parser.add_argument("--report-file", default="")
- parser.add_argument("--save-metrics", action="store_true",
- help="直接写入 index.db,省去单独调用 save-review-metrics")
- args = parser.parse_args()
- project_root = Path(args.project_root)
- review_results_path = Path(args.review_results)
- payload = build_review_artifacts(
- project_root=project_root,
- chapter=args.chapter,
- review_results_path=review_results_path,
- report_file=args.report_file,
- )
- if args.metrics_out:
- out_path = Path(args.metrics_out)
- out_path.parent.mkdir(parents=True, exist_ok=True)
- out_path.write_text(
- json.dumps(payload["metrics"], ensure_ascii=False, indent=2),
- encoding="utf-8",
- )
- if args.report_file:
- write_review_report(
- project_root=project_root,
- report_file=args.report_file,
- payload=payload,
- )
- if args.save_metrics:
- from data_modules.config import DataModulesConfig
- from data_modules.index_manager import IndexManager
- config = DataModulesConfig.from_project_root(project_root)
- manager = IndexManager(config)
- manager.save_review_metrics(_build_review_metrics_record(payload["metrics"]))
- print(json.dumps(payload, ensure_ascii=False, indent=2))
- if __name__ == "__main__":
- if sys.platform == "win32":
- enable_windows_utf8_stdio()
- main()
|