test_memory_contract_adapter.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """MemoryContractAdapter 集成测试。"""
  4. from __future__ import annotations
  5. import json
  6. import sys
  7. from pathlib import Path
  8. import pytest
  9. # 确保 scripts/ 在 sys.path 中
  10. _scripts_dir = str(Path(__file__).resolve().parent.parent.parent)
  11. if _scripts_dir not in sys.path:
  12. sys.path.insert(0, _scripts_dir)
  13. from data_modules.config import DataModulesConfig
  14. from data_modules.memory_contract import (
  15. CommitResult,
  16. ContextPack,
  17. EntitySnapshot,
  18. MemoryContract,
  19. OpenLoop,
  20. Rule,
  21. TimelineEvent,
  22. )
  23. from data_modules.memory_contract_adapter import MemoryContractAdapter
  24. def _make_project(tmp_path: Path) -> DataModulesConfig:
  25. """创建最小项目结构并返回配置。"""
  26. webnovel_dir = tmp_path / ".webnovel"
  27. webnovel_dir.mkdir(parents=True, exist_ok=True)
  28. (webnovel_dir / "state.json").write_text("{}", encoding="utf-8")
  29. (webnovel_dir / "summaries").mkdir(exist_ok=True)
  30. return DataModulesConfig.from_project_root(tmp_path)
  31. class TestAdapterSatisfiesProtocol:
  32. def test_isinstance_check(self, tmp_path):
  33. cfg = _make_project(tmp_path)
  34. adapter = MemoryContractAdapter(cfg)
  35. assert isinstance(adapter, MemoryContract)
  36. class TestReadSummary:
  37. def test_read_existing_summary(self, tmp_path):
  38. cfg = _make_project(tmp_path)
  39. summary_dir = cfg.webnovel_dir / "summaries"
  40. summary_dir.mkdir(parents=True, exist_ok=True)
  41. (summary_dir / "ch0010.md").write_text("第10章摘要", encoding="utf-8")
  42. adapter = MemoryContractAdapter(cfg)
  43. text = adapter.read_summary(10)
  44. assert text == "第10章摘要"
  45. def test_read_missing_summary(self, tmp_path):
  46. cfg = _make_project(tmp_path)
  47. adapter = MemoryContractAdapter(cfg)
  48. assert adapter.read_summary(999) == ""
  49. class TestQueryEntity:
  50. def test_query_nonexistent_entity(self, tmp_path):
  51. cfg = _make_project(tmp_path)
  52. adapter = MemoryContractAdapter(cfg)
  53. assert adapter.query_entity("nobody") is None
  54. def test_query_existing_entity(self, tmp_path):
  55. cfg = _make_project(tmp_path)
  56. # 写入包含实体的 state.json
  57. state = {
  58. "entities_v3": {
  59. "角色": {
  60. "xiaoyan": {
  61. "name": "萧炎",
  62. "tier": "核心",
  63. "aliases": ["他"],
  64. "realm": "斗帝",
  65. "first_appearance": 1,
  66. "last_appearance": 100,
  67. }
  68. }
  69. },
  70. "state_changes": [
  71. {"entity_id": "xiaoyan", "field": "realm", "old": "斗圣", "new": "斗帝", "chapter": 100}
  72. ],
  73. }
  74. (cfg.state_file).write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
  75. adapter = MemoryContractAdapter(cfg)
  76. snap = adapter.query_entity("xiaoyan")
  77. assert snap is not None
  78. assert snap.name == "萧炎"
  79. assert snap.type == "角色"
  80. assert snap.tier == "核心"
  81. assert "他" in snap.aliases
  82. assert len(snap.recent_state_changes) == 1
  83. class TestQueryRules:
  84. def test_query_rules_empty(self, tmp_path):
  85. cfg = _make_project(tmp_path)
  86. adapter = MemoryContractAdapter(cfg)
  87. assert adapter.query_rules() == []
  88. def test_query_rules_with_data(self, tmp_path):
  89. cfg = _make_project(tmp_path)
  90. # 写入 scratchpad 数据
  91. from data_modules.memory.schema import MemoryItem
  92. from data_modules.memory.store import ScratchpadManager
  93. store = ScratchpadManager(cfg)
  94. store.upsert_item(MemoryItem(
  95. id="rule-1", layer="semantic", category="world_rule",
  96. subject="力量体系", field="异火数量", value="23种",
  97. status="active", source_chapter=1,
  98. ))
  99. adapter = MemoryContractAdapter(cfg)
  100. rules = adapter.query_rules()
  101. assert len(rules) == 1
  102. assert rules[0].value == "23种"
  103. assert rules[0].domain == "力量体系"
  104. def test_query_rules_filter_by_domain(self, tmp_path):
  105. cfg = _make_project(tmp_path)
  106. from data_modules.memory.schema import MemoryItem
  107. from data_modules.memory.store import ScratchpadManager
  108. store = ScratchpadManager(cfg)
  109. store.upsert_item(MemoryItem(
  110. id="rule-1", layer="semantic", category="world_rule",
  111. subject="力量体系", field="异火数量", value="23种",
  112. status="active", source_chapter=1,
  113. ))
  114. store.upsert_item(MemoryItem(
  115. id="rule-2", layer="semantic", category="world_rule",
  116. subject="社会结构", field="帝国数量", value="4个",
  117. status="active", source_chapter=2,
  118. ))
  119. adapter = MemoryContractAdapter(cfg)
  120. rules = adapter.query_rules(domain="力量体系")
  121. assert len(rules) == 1
  122. assert rules[0].field == "异火数量"
  123. class TestGetOpenLoops:
  124. def test_get_open_loops_empty(self, tmp_path):
  125. cfg = _make_project(tmp_path)
  126. adapter = MemoryContractAdapter(cfg)
  127. assert adapter.get_open_loops() == []
  128. def test_get_open_loops_with_data(self, tmp_path):
  129. cfg = _make_project(tmp_path)
  130. from data_modules.memory.schema import MemoryItem
  131. from data_modules.memory.store import ScratchpadManager
  132. store = ScratchpadManager(cfg)
  133. store.upsert_item(MemoryItem(
  134. id="ol-1", layer="semantic", category="open_loop",
  135. subject="三年之约", field="", value="萧炎与纳兰嫣然三年之约",
  136. status="active", source_chapter=1,
  137. payload={"expected_payoff": "大比", "urgency": 0.9},
  138. ))
  139. adapter = MemoryContractAdapter(cfg)
  140. loops = adapter.get_open_loops()
  141. assert len(loops) == 1
  142. assert loops[0].content == "萧炎与纳兰嫣然三年之约"
  143. assert loops[0].urgency == 0.9
  144. class TestGetTimeline:
  145. def test_get_timeline_empty(self, tmp_path):
  146. cfg = _make_project(tmp_path)
  147. adapter = MemoryContractAdapter(cfg)
  148. assert adapter.get_timeline(1, 100) == []
  149. def test_get_timeline_filters_by_range(self, tmp_path):
  150. cfg = _make_project(tmp_path)
  151. from data_modules.memory.schema import MemoryItem
  152. from data_modules.memory.store import ScratchpadManager
  153. store = ScratchpadManager(cfg)
  154. for ch in [5, 10, 50, 100]:
  155. store.upsert_item(MemoryItem(
  156. id=f"tl-{ch}", layer="semantic", category="timeline",
  157. subject="事件", field=f"第{ch}章时", value=f"事件{ch}",
  158. status="active", source_chapter=ch,
  159. ))
  160. adapter = MemoryContractAdapter(cfg)
  161. events = adapter.get_timeline(8, 55)
  162. assert len(events) == 2
  163. assert events[0].chapter == 10
  164. assert events[1].chapter == 50
  165. class TestLoadContext:
  166. def test_load_context_returns_context_pack(self, tmp_path):
  167. cfg = _make_project(tmp_path)
  168. adapter = MemoryContractAdapter(cfg)
  169. pack = adapter.load_context(10)
  170. assert isinstance(pack, ContextPack)
  171. assert pack.chapter == 10
  172. class TestCommitChapter:
  173. def test_commit_chapter_basic(self, tmp_path):
  174. cfg = _make_project(tmp_path)
  175. adapter = MemoryContractAdapter(cfg)
  176. result = adapter.commit_chapter(1, {
  177. "entities_appeared": [{"id": "xiaoyan", "type": "角色"}],
  178. "entities_new": [],
  179. "state_changes": [],
  180. "relationships_new": [],
  181. })
  182. assert isinstance(result, CommitResult)
  183. assert result.chapter == 1
  184. assert result.entities_updated == 1