#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CSV data validation tool.

Validates the data quality of every table under references/csv/ against
CSV_CONFIG and the canonical genre enumeration.
"""
from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

# Put this script's directory first on sys.path before the project imports below.
sys.path.insert(0, str(Path(__file__).resolve().parent))

from reference_search import CSV_CONFIG, GENRE_CANONICAL, split_multi_value
from genre_taxonomy import default_taxonomy_path, load_genre_taxonomy
  18. _CHINESE_COMMA_RE = re.compile(r",")
  19. _MULTI_VALUE_COLUMNS = ("适用技能", "关键词", "意图与同义词", "适用题材")
  20. _ROUTE_TABLE = "题材与调性推理"
  21. _REASONING_TABLE = "裁决规则"
  22. _MIN_ROUTE_ROWS = 16
  23. _MIN_REASONING_ROWS = 14
  24. _VALID_SKILLS = {"init", "plan", "write", "review", "query", "learn", "dashboard", "story-system"}
  25. _VALID_LEVELS = {"提醒", "缺陷补偿", "知识补充"}
  26. def _split_multi_value(cell: str) -> List[str]:
  27. return split_multi_value(cell)
  28. def _default_csv_dir() -> Path:
  29. return Path(__file__).resolve().parent.parent / "references" / "csv"
  30. def _read_csv(path: Path) -> tuple[List[str], List[Dict[str, Any]]]:
  31. with open(path, "r", encoding="utf-8-sig", newline="") as f:
  32. reader = csv.DictReader(f)
  33. rows = list(reader)
  34. headers = list(reader.fieldnames or [])
  35. return headers, rows
  36. def _validate_genre_taxonomy(errors: List[str], warnings: List[str]) -> None:
  37. taxonomy_path = default_taxonomy_path()
  38. if not taxonomy_path.exists():
  39. errors.append(f"[genre-index] 文件不存在: {taxonomy_path}")
  40. return
  41. try:
  42. taxonomy = load_genre_taxonomy(str(taxonomy_path))
  43. except Exception as exc:
  44. errors.append(f"[genre-index] 加载失败: {exc}")
  45. return
  46. templates_dir = Path(__file__).resolve().parent.parent / "templates" / "genres"
  47. template_files = {path.name for path in templates_dir.glob("*.md")}
  48. referenced_files = {entry.template_file for entry in taxonomy.entries if entry.template_file}
  49. missing_templates = sorted(referenced_files - template_files)
  50. if missing_templates:
  51. errors.append(f"[genre-index] template_file 不存在: {', '.join(missing_templates)}")
  52. unreferenced_templates = sorted(template_files - referenced_files)
  53. if unreferenced_templates:
  54. errors.append(f"[genre-index] 模板未被 index 覆盖: {', '.join(unreferenced_templates)}")
  55. for entry in taxonomy.entries:
  56. if entry.template_file and not entry.template_file.endswith(".md"):
  57. errors.append(f"[genre-index] {entry.label} template_file 应以 .md 结尾: {entry.template_file}")
  58. if entry.canonical_genre != "全部" and entry.canonical_genre not in GENRE_CANONICAL:
  59. errors.append(f"[genre-index] {entry.label} canonical_genre 不合法: {entry.canonical_genre}")
  60. if len(template_files) != 37:
  61. warnings.append(f"[genre-index] 当前模板数量为 {len(template_files)},预期 37")
  62. def validate(csv_dir: Path) -> Dict[str, List[str]]:
  63. errors: List[str] = []
  64. warnings: List[str] = []
  65. all_ids: Dict[str, str] = {}
  66. valid_genres = GENRE_CANONICAL | {"全部"}
  67. _validate_genre_taxonomy(errors, warnings)
  68. for table_name, config in CSV_CONFIG.items():
  69. csv_path = csv_dir / config["file"]
  70. if not csv_path.exists():
  71. errors.append(f"[{table_name}] 文件不存在: {config['file']}")
  72. continue
  73. headers, rows = _read_csv(csv_path)
  74. header_set = set(headers)
  75. prefix = str(config.get("prefix", "")).strip()
  76. required_cols = list(config.get("required_cols", []))
  77. declared_cols = set(config.get("search_cols", {}).keys())
  78. declared_cols.update(config.get("output_cols", []))
  79. declared_cols.update(required_cols)
  80. poison_col = str(config.get("poison_col", "")).strip()
  81. if poison_col:
  82. declared_cols.add(poison_col)
  83. missing_headers = declared_cols - header_set
  84. if missing_headers:
  85. joined = ", ".join(sorted(missing_headers))
  86. errors.append(f"[{table_name}] CSV 缺少列头: {joined}")
  87. for line_no, row in enumerate(rows, start=2):
  88. row_id = (row.get("编号") or "").strip()
  89. if None in row:
  90. extras = row.get(None) or []
  91. errors.append(
  92. f"[{table_name}] 行{line_no} ({row_id or '无编号'}) 字段数超过表头: {extras}"
  93. )
  94. if row_id:
  95. if row_id in all_ids:
  96. errors.append(
  97. f"[{table_name}] 行{line_no} 编号 {row_id} 重复(首次出现于 {all_ids[row_id]})"
  98. )
  99. else:
  100. all_ids[row_id] = table_name
  101. if prefix and row_id and not row_id.startswith(f"{prefix}-"):
  102. errors.append(f"[{table_name}] 行{line_no} 编号 {row_id} 应以 {prefix}- 开头")
  103. for col in required_cols:
  104. value = (row.get(col) or "").strip()
  105. if not value:
  106. errors.append(f"[{table_name}] 行{line_no} ({row_id}) 必填列 {col} 为空")
  107. for col in _MULTI_VALUE_COLUMNS:
  108. value = row.get(col) or ""
  109. if _CHINESE_COMMA_RE.search(value):
  110. errors.append(
  111. f"[{table_name}] 行{line_no} ({row_id}) {col} 含中文逗号,应使用 |"
  112. )
  113. skill_cell = (row.get("适用技能") or "").strip()
  114. if "适用技能" in header_set:
  115. skill_tokens = _split_multi_value(skill_cell)
  116. if not skill_tokens:
  117. errors.append(f"[{table_name}] 行{line_no} ({row_id}) 适用技能为空")
  118. for skill in skill_tokens:
  119. if skill not in _VALID_SKILLS:
  120. errors.append(f"[{table_name}] 行{line_no} ({row_id}) 适用技能值 '{skill}' 不合法")
  121. if "层级" in header_set:
  122. level = (row.get("层级") or "").strip()
  123. allowed_levels = set(_VALID_LEVELS)
  124. if table_name == _REASONING_TABLE:
  125. allowed_levels.add("推理层")
  126. if not level:
  127. errors.append(f"[{table_name}] 行{line_no} ({row_id}) 层级为空")
  128. elif level not in allowed_levels:
  129. errors.append(f"[{table_name}] 行{line_no} ({row_id}) 层级值 '{level}' 不合法")
  130. genre_cell = (row.get("适用题材") or "").strip()
  131. if genre_cell:
  132. for genre in _split_multi_value(genre_cell):
  133. if genre not in valid_genres:
  134. warnings.append(
  135. f"[{table_name}] 行{line_no} ({row_id}) 适用题材值 '{genre}' 不在 canonical 枚举中"
  136. )
  137. route_path = csv_dir / f"{_ROUTE_TABLE}.csv"
  138. route_canonicals: set[str] = set()
  139. route_rows: List[Dict[str, str]] = []
  140. if route_path.exists():
  141. _, route_rows = _read_csv(route_path)
  142. if len(route_rows) < _MIN_ROUTE_ROWS:
  143. warnings.append(
  144. f"[{_ROUTE_TABLE}] 路由行数 {len(route_rows)} 低于 Phase 2 验收线 {_MIN_ROUTE_ROWS}"
  145. )
  146. for line_no, row in enumerate(route_rows, start=2):
  147. row_id = (row.get("编号") or "").strip()
  148. canonical = (row.get("canonical_genre") or "").strip()
  149. if not canonical:
  150. warnings.append(f"[{_ROUTE_TABLE}] 行{line_no} ({row_id}) canonical_genre 为空")
  151. continue
  152. if canonical == "全部":
  153. continue
  154. if canonical not in GENRE_CANONICAL:
  155. warnings.append(
  156. f"[{_ROUTE_TABLE}] 行{line_no} ({row_id}) canonical_genre '{canonical}' 不在 canonical 枚举中"
  157. )
  158. continue
  159. route_canonicals.add(canonical)
  160. reasoning_path = csv_dir / f"{_REASONING_TABLE}.csv"
  161. reasoning_rows: List[Dict[str, str]] = []
  162. reasoning_genres: set[str] = set()
  163. if reasoning_path.exists():
  164. _, reasoning_rows = _read_csv(reasoning_path)
  165. if len(reasoning_rows) < _MIN_REASONING_ROWS:
  166. warnings.append(
  167. f"[{_REASONING_TABLE}] 裁决行数 {len(reasoning_rows)} 低于 Phase 2 验收线 {_MIN_REASONING_ROWS}"
  168. )
  169. for line_no, row in enumerate(reasoning_rows, start=2):
  170. row_id = (row.get("编号") or "").strip()
  171. genre = (row.get("题材") or "").strip()
  172. if not genre:
  173. continue
  174. if genre not in GENRE_CANONICAL:
  175. warnings.append(f"[{_REASONING_TABLE}] 行{line_no} ({row_id}) 题材 '{genre}' 不在 canonical 枚举中")
  176. continue
  177. reasoning_genres.add(genre)
  178. for canonical_genre in sorted(GENRE_CANONICAL):
  179. if canonical_genre not in reasoning_genres:
  180. warnings.append(f"[{_REASONING_TABLE}] canonical genre '{canonical_genre}' 无对应裁决行")
  181. for canonical_genre in sorted(route_canonicals):
  182. if canonical_genre not in reasoning_genres:
  183. warnings.append(f"[{_ROUTE_TABLE}] canonical genre '{canonical_genre}' 无对应裁决行")
  184. for canonical_genre in sorted(reasoning_genres):
  185. if route_rows and canonical_genre not in route_canonicals:
  186. warnings.append(f"[{_REASONING_TABLE}] canonical genre '{canonical_genre}' 无对应路由行")
  187. return {"errors": errors, "warnings": warnings}
  188. def main(argv: Optional[List[str]] = None) -> int:
  189. parser = argparse.ArgumentParser(description="Validate reference CSV files")
  190. parser.add_argument("--csv-dir", default=None, help="Override CSV directory")
  191. parser.add_argument("--format", choices=["text", "json"], default="text")
  192. args = parser.parse_args(argv)
  193. csv_dir = Path(args.csv_dir) if args.csv_dir else _default_csv_dir()
  194. result = validate(csv_dir)
  195. if args.format == "json":
  196. print(json.dumps(result, ensure_ascii=False, indent=2))
  197. else:
  198. for error in result["errors"]:
  199. print(f"ERROR: {error}")
  200. for warning in result["warnings"]:
  201. print(f"WARN: {warning}")
  202. print(f"\n--- {len(result['errors'])} error(s), {len(result['warnings'])} warning(s) ---")
  203. return 1 if result["errors"] else 0
  204. if __name__ == "__main__":
  205. raise SystemExit(main())