Forráskód Böngészése

fix: bound json file arguments to project root

lingfengQAQ 3 hete
szülő
commit
3b8104c693

+ 17 - 2
webnovel-writer/scripts/data_modules/cli_args.py

@@ -74,12 +74,27 @@ def normalize_global_project_root(argv: List[str], *, flag: str = "--project-roo
     return [flag, value] + rest
 
 
-def load_json_arg(raw: str) -> Any:
+def _resolve_json_arg_file(target: str, *, base_dir: str | Path | None = None) -> Path:
+    path = Path(target).expanduser()
+    if not path.is_absolute() and base_dir is not None:
+        path = Path(base_dir) / path
+    resolved = path.resolve()
+    if base_dir is not None:
+        base = Path(base_dir).expanduser().resolve()
+        try:
+            resolved.relative_to(base)
+        except ValueError as exc:
+            raise ValueError(f"json arg file outside allowed directory: {resolved}") from exc
+    return resolved
+
+
+def load_json_arg(raw: str, *, base_dir: str | Path | None = None) -> Any:
     """
     解析 CLI 传入的 JSON 参数,支持两种形式:
     - 直接 JSON 字符串:'{"a":1}'
     - @ 文件路径:'@data.json'(从文件读取 JSON,避免 shell 引号地狱)
       - 特例:'@-' 表示从 stdin 读取
+      - 当传入 base_dir 时,@ 文件必须位于 base_dir 内
     """
     if raw is None:
         raise ValueError("missing json arg")
@@ -91,6 +106,6 @@ def load_json_arg(raw: str) -> Any:
         if target == "-":
             content = sys.stdin.read()
         else:
-            content = Path(target).read_text(encoding="utf-8")
+            content = _resolve_json_arg_file(target, base_dir=base_dir).read_text(encoding="utf-8")
         return json.loads(content)
     return json.loads(text)

+ 11 - 11
webnovel-writer/scripts/data_modules/index_manager.py

@@ -1030,8 +1030,8 @@ def main():
         emit_success(scenes, message="scenes")
 
     elif args.command == "process-chapter":
-        entities = load_json_arg(args.entities)
-        scenes = load_json_arg(args.scenes)
+        entities = load_json_arg(args.entities, base_dir=args.project_root)
+        scenes = load_json_arg(args.scenes, base_dir=args.project_root)
         stats = manager.process_chapter_data(
             chapter=args.chapter,
             title=args.title,
@@ -1132,7 +1132,7 @@ def main():
 
     elif args.command == "record-relationship-event":
         try:
-            data = load_json_arg(args.data)
+            data = load_json_arg(args.data, base_dir=args.project_root)
         except (TypeError, ValueError, json.JSONDecodeError):
             emit_error("INVALID_RELATIONSHIP_EVENT", "关系事件 JSON 无效")
         else:
@@ -1156,7 +1156,7 @@ def main():
                 emit_error("INVALID_RELATIONSHIP_EVENT", "关系事件参数无效,未写入")
 
     elif args.command == "upsert-entity":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         entity = EntityMeta(
             id=data["id"],
             type=data["type"],
@@ -1173,7 +1173,7 @@ def main():
         emit_success({"id": entity.id, "created": is_new}, message="entity_upserted")
 
     elif args.command == "upsert-relationship":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         rel = RelationshipMeta(
             from_entity=data["from_entity"],
             to_entity=data["to_entity"],
@@ -1188,7 +1188,7 @@ def main():
         )
 
     elif args.command == "record-state-change":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         change = StateChangeMeta(
             entity_id=data["entity_id"],
             field=data["field"],
@@ -1224,7 +1224,7 @@ def main():
         emit_success(rows, message="invalid_list")
 
     elif args.command == "save-review-metrics":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         metrics = ReviewMetrics(
             start_chapter=data["start_chapter"],
             end_chapter=data["end_chapter"],
@@ -1250,7 +1250,7 @@ def main():
         emit_success(stats, message="review_trend_stats")
 
     elif args.command == "save-writing-checklist-score":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         metrics = WritingChecklistScoreMeta(
             chapter=data["chapter"],
             template=data.get("template", "plot"),
@@ -1346,7 +1346,7 @@ def main():
             emit_success(result, message="debt_payment", chapter=args.chapter)
 
     elif args.command == "create-override-contract":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         contract = OverrideContractMeta(
             chapter=data["chapter"],
             constraint_type=data["constraint_type"],
@@ -1361,7 +1361,7 @@ def main():
         emit_success({"id": contract_id}, message="override_contract_created")
 
     elif args.command == "create-debt":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         debt = ChaseDebtMeta(
             debt_type=data["debt_type"],
             original_amount=data.get("original_amount", 1.0),
@@ -1383,7 +1383,7 @@ def main():
             emit_error("NOT_FOUND", f"未找到 Override Contract #{args.contract_id}")
 
     elif args.command == "save-chapter-reading-power":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         meta = ChapterReadingPowerMeta(
             chapter=data["chapter"],
             hook_type=data.get("hook_type", ""),

+ 3 - 2
webnovel-writer/scripts/data_modules/memory/store.py

@@ -233,8 +233,9 @@ def main() -> None:
     if args.command == "update":
         from .writer import MemoryWriter
 
-        payload = load_json_arg(args.data)
-        writer = MemoryWriter(config or get_config())
+        resolved_config = config or get_config()
+        payload = load_json_arg(args.data, base_dir=resolved_config.project_root)
+        writer = MemoryWriter(resolved_config)
         result = writer.update_from_chapter_result(args.chapter, payload)
         print_success(result, message="memory_updated")
         return

+ 1 - 1
webnovel-writer/scripts/data_modules/rag_adapter.py

@@ -1474,7 +1474,7 @@ def main():
         emit_success(stats, message="stats")
 
     elif args.command == "index-chapter":
-        scenes = load_json_arg(args.scenes)
+        scenes = load_json_arg(args.scenes, base_dir=config.project_root)
         chunks = []
 
         # summary chunk

+ 1 - 1
webnovel-writer/scripts/data_modules/sql_state_manager.py

@@ -603,7 +603,7 @@ def main():
         emit_success(data, message="alias_index")
 
     elif args.command == "process-chapter":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=config.project_root)
         stats = manager.process_chapter_entities(
             chapter=args.chapter,
             entities_appeared=data.get("entities_appeared", []),

+ 1 - 1
webnovel-writer/scripts/data_modules/state_manager.py

@@ -1406,7 +1406,7 @@ def main():
         emit_success(payload, message="entities")
 
     elif args.command == "process-chapter":
-        data = load_json_arg(args.data)
+        data = load_json_arg(args.data, base_dir=args.project_root)
         validated = None
         last_exc = None
         for _ in range(3):

+ 1 - 1
webnovel-writer/scripts/data_modules/style_sampler.py

@@ -397,7 +397,7 @@ def main():
         emit_success([s.__dict__ for s in samples], message="samples")
 
     elif args.command == "extract":
-        scenes = load_json_arg(args.scenes)
+        scenes = load_json_arg(args.scenes, base_dir=sampler.config.project_root)
         candidates = sampler.extract_candidates(
             chapter=args.chapter,
             content="",

+ 25 - 0
webnovel-writer/scripts/data_modules/tests/test_coverage_boost.py

@@ -73,6 +73,31 @@ def test_load_json_arg_from_file(tmp_path):
     assert result == {"a": 1}
 
 
+def test_load_json_arg_rejects_file_outside_base_dir(tmp_path):
+    project = tmp_path / "project"
+    project.mkdir()
+    outside = tmp_path / "secret.json"
+    outside.write_text('{"secret": true}', encoding="utf-8")
+
+    with pytest.raises(ValueError, match="outside allowed directory"):
+        load_json_arg(f"@{outside}", base_dir=project)
+
+
+def test_load_json_arg_allows_file_inside_base_dir(tmp_path):
+    project = tmp_path / "project"
+    project.mkdir()
+    payload = project / "payload.json"
+    payload.write_text('{"ok": true}', encoding="utf-8")
+
+    assert load_json_arg(f"@{payload}", base_dir=project) == {"ok": True}
+
+
+def test_load_json_arg_stdin_ignores_base_dir(monkeypatch, tmp_path):
+    monkeypatch.setattr(sys, "stdin", StringIO('{"stdin": true}'))
+
+    assert load_json_arg("@-", base_dir=tmp_path) == {"stdin": True}
+
+
 def test_load_json_arg_from_stdin(monkeypatch):
     monkeypatch.setattr(sys, "stdin", StringIO('{"b":2}'))
     result = load_json_arg("@-")