"""presegment PR2 — deep_summary_worker map-reduce/HOLD 배선 단위테스트. worker-process 레벨(DB 필요)의 큐 상태 전이는 라이브 E2E 로 검증하고, 여기서는 새 메커니즘의 seam 을 단위 검증한다 (test_fair_share.py 선례): - _hold_awaiting_split: payload 마킹 commit 후 StageDeferred(HOLD_RETRY_MINUTES). - _process_map_reduce: 유닛별 map → reduce → doc 필드 기록 / 모든 콜 캡 준수 / payload.presegment.map_results 유닛 단위 persist(멱등 재개) / 실패 유닛 raise / drain 보류(StageDeferred) 시 완료 유닛 보존. """ from __future__ import annotations import os import sys from types import SimpleNamespace import pytest sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from ai.envelope import EscalationEnvelope # noqa: E402 from models.queue import StageDeferred # noqa: E402 from services.summarize_units import ( # noqa: E402 CAP_TOKENS, estimate_tokens, plan_summarize_units, ) import workers.deep_summary_worker as dsw # noqa: E402 # ─── fixtures ──────────────────────────────────────────────────────────────── # 30 절 × 한글 2,000자 ≈ 31.7K tok (> TRIGGER 25K) · 절당 ≈ 1,060 tok (< CAP) → auto GIANT_AUTO_MD = "\n".join(f"# 절 {i}\n" + ("가" * 2_000) for i in range(30)) # 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole GIANT_WHOLE_MD = "# 통짜\n" + ("가" * 60_000) MAP_JSON = ( '{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",' ' "inconsistencies": [{"kind": "version_drift", "desc": "개정판 차이"}],' ' "confidence": 0.9}' ) REDUCE_JSON = ( '{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",' ' "inconsistencies": [], "confidence": 0.8}' ) class FakeSession: """commit 시점의 queue_row.payload 를 **객체 참조**로 박제 — SQLAlchemy 의 committed 스냅샷과 동일하게, 이후 in-place 변경이 과거 커밋 객체에 소급 반영되는 aliasing (60254 라이브에서 unit 0 만 persist 된 버그)을 검증 시점 직렬화로 탐지한다.""" def __init__(self, row=None): self.commits = 0 self._row = row self.snapshots: list = [] async def commit(self): self.commits += 1 if self._row is not None: self.snapshots.append(self._row.payload) # 참조 박제 — 복사 금지(의도) class FakeClient: """deep 슬롯 보유 클라이언트 — call_deep_or_defer 가 call_deep 을 타게 한다.""" def __init__(self, responses=None, fail_indexes=frozenset(), defer_from=None): self.ai = SimpleNamespace( deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000) ) self.prompts: list[str] = [] self._fail_indexes = fail_indexes # 이 순번(0-based) 콜은 파싱 불가 응답 self._defer_from = defer_from # 이 순번부터 연결 실패(StageDeferred 변환 대상) async def call_deep(self, prompt: str, system=None) -> str: import httpx idx = len(self.prompts) if self._defer_from is not None and idx >= self._defer_from: raise httpx.ConnectError("macbook down") self.prompts.append(prompt) if idx in self._fail_indexes: return "정상 JSON 아님" if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커 return REDUCE_JSON return MAP_JSON async def close(self): pass def _doc(): return SimpleNamespace( id=999, extracted_text=GIANT_AUTO_MD, ai_detail_summary=None, ai_inconsistencies=None, ai_analysis_tier="triage", ai_processed_at=None, ) def _envelope(): return EscalationEnvelope( from_stage="classify", escalation_reasons=("long_context",), risk_flags=(), distilled_context="4B 요지", original_pointers={"doc_ids": [999]}, ) @pytest.fixture def _patch_telemetry(monkeypatch): events: list[dict] = [] async def fake_record(**kwargs): events.append(kwargs) monkeypatch.setattr(dsw, "record_analyze_event", fake_record) return events # ─── _hold_awaiting_split ──────────────────────────────────────────────────── @pytest.mark.asyncio async def test_hold_marks_payload_and_defers(): plan = plan_summarize_units(GIANT_WHOLE_MD) assert plan.mode == "map_reduce" and plan.tier == "whole" session, row = FakeSession(), SimpleNamespace(payload={"envelope": {"x": 1}}) with pytest.raises(StageDeferred) as ei: await dsw._hold_awaiting_split(session, row, plan, document_id=999) assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES assert session.commits == 1 # 마킹이 defer 전에 commit — consumer 재읽기에서 보존 preseg = row.payload["presegment"] assert preseg["awaiting_split"] is True assert preseg["tier"] == "whole" assert preseg["units"] == len(plan.units) assert row.payload["envelope"] == {"x": 1} # 기존 payload 병합 보존 # ─── _process_map_reduce — 정상 경로 ──────────────────────────────────────── @pytest.mark.asyncio async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry): plan = plan_summarize_units(GIANT_AUTO_MD) assert plan.mode == "map_reduce" and plan.tier == "auto" n = len(plan.units) assert n >= 2 # greedy-pack 이 실제로 유닛을 나눴는지 client = FakeClient() monkeypatch.setattr(dsw, "AIClient", lambda: client) doc = _doc() row = SimpleNamespace(payload={"envelope": {"x": 1}}) session = FakeSession(row) await dsw._process_map_reduce( doc, row, _envelope(), "generic", plan, session, defer_on_deep_unavailable=False, ) # 콜 수 = 유닛 map n + reduce 1 assert len(client.prompts) == n + 1 # 검증 게이트: 모든 콜 est_tokens <= CAP + 오버헤드(정책 템플릿+envelope ~3K) for p in client.prompts: assert estimate_tokens(p) <= CAP_TOKENS + 3_000 # doc 기록 = reduce 출력, 불일치 = map 유닛 합본 dedup assert doc.ai_detail_summary == "최종 상세." assert doc.ai_analysis_tier == "deep" assert doc.ai_inconsistencies == [{"kind": "version_drift", "desc": "개정판 차이"}] # 유닛 단위 persist — 유닛마다 commit assert row.payload["presegment"]["units"] == n assert len(row.payload["presegment"]["map_results"]) == n assert session.commits == n # ★aliasing 회귀 방지: 각 commit 이 박제한 payload 객체를 사후에 봤을 때 # map_results 가 1,2,...,n 로 단조 증가해야 한다. in-place 변경(구 버그)이면 # 모든 스냅샷이 같은 dict 를 공유해 [n,n,...,n] 으로 보인다 = SQLAlchemy 가 # committed 스냅샷과 new 가 같다고 판정해 UPDATE 를 스킵하는 것과 등가. per_commit_units = [ len(s["presegment"]["map_results"]) for s in session.snapshots ] assert per_commit_units == list(range(1, n + 1)) # telemetry 1건 (reduce 기준) events = _patch_telemetry assert len(events) == 1 and events[0]["error_code"] is None # ─── 멱등 재개 ─────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_map_reduce_resume_skips_done_units(monkeypatch, _patch_telemetry): plan = plan_summarize_units(GIANT_AUTO_MD) n = len(plan.units) client = FakeClient() monkeypatch.setattr(dsw, "AIClient", lambda: client) done_unit = { "index": 0, "titles": ["절 0"], "tldr": "이전 요약", "detail": "이전 상세.", "inconsistencies": [], } row = SimpleNamespace(payload={ "envelope": {"x": 1}, "presegment": {"map_results": {"0": done_unit}}, }) doc, session = _doc(), FakeSession() await dsw._process_map_reduce( doc, row, _envelope(), "generic", plan, session, defer_on_deep_unavailable=False, ) # 유닛 0 은 재호출 안 함 — map (n-1) + reduce 1 assert len(client.prompts) == n assert row.payload["presegment"]["map_results"]["0"]["detail"] == "이전 상세." assert doc.ai_detail_summary == "최종 상세." # ─── map 유닛 실패 → raise (성공분 persist) ───────────────────────────────── @pytest.mark.asyncio async def test_map_unit_parse_failure_raises_but_persists_good_units( monkeypatch, _patch_telemetry ): plan = plan_summarize_units(GIANT_AUTO_MD) n = len(plan.units) client = FakeClient(fail_indexes={1}) # 두 번째 map 콜만 파싱 불가 monkeypatch.setattr(dsw, "AIClient", lambda: client) doc, session = _doc(), FakeSession() row = SimpleNamespace(payload={"envelope": {"x": 1}}) with pytest.raises(ValueError, match="map 유닛"): await dsw._process_map_reduce( doc, row, _envelope(), "generic", plan, session, defer_on_deep_unavailable=False, ) # 성공 유닛(n-1)은 persist — 재시도 시 실패 1건만 재호출 assert len(row.payload["presegment"]["map_results"]) == n - 1 assert "1" not in row.payload["presegment"]["map_results"] assert doc.ai_detail_summary is None # doc 은 미기록 assert _patch_telemetry == [] # 가짜 완료 이벤트 없음 # ─── drain 보류 — 완료 유닛 보존 + StageDeferred 전파 ─────────────────────── @pytest.mark.asyncio async def test_map_defer_propagates_and_keeps_progress(monkeypatch, _patch_telemetry): plan = plan_summarize_units(GIANT_AUTO_MD) client = FakeClient(defer_from=1) # 첫 유닛 성공 후 맥북 절단 monkeypatch.setattr(dsw, "AIClient", lambda: client) doc, session = _doc(), FakeSession() row = SimpleNamespace(payload={"envelope": {"x": 1}}) with pytest.raises(StageDeferred): await dsw._process_map_reduce( doc, row, _envelope(), "generic", plan, session, defer_on_deep_unavailable=True, # drain 시멘틱 — 보류 전파 ) assert len(row.payload["presegment"]["map_results"]) == 1 assert doc.ai_detail_summary is None