3b7fd900e4
60254 라이브 E2E 에서 발견: 완주는 성공했으나 payload.presegment.map_results 에 unit 0 만 persist. 원인 = map_results dict 를 in-place 변경 → 직전 commit 의 SQLAlchemy committed 스냅샷이 같은 중첩 객체를 참조 → old==new 판정 → 2번째 commit 부터 UPDATE 스킵. 멱등 재개 시 완료 유닛 재호출 비용 발생(정확성 무영향). fix = 매 유닛 map_results/preseg/payload 전부 새 dict 재구성(공유 참조 0). test = FakeSession 이 commit 시점 payload 객체 참조를 박제, 사후 직렬화로 스냅샷 유닛 수가 1..n 단조 증가 단정 — 구 코드에 대해 FAILED 네거티브 검증 완료. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
267 lines
10 KiB
Python
267 lines
10 KiB
Python
"""presegment PR2 — deep_summary_worker map-reduce/HOLD 배선 단위테스트.
|
||
|
||
worker-process 레벨(DB 필요)의 큐 상태 전이는 라이브 E2E 로 검증하고, 여기서는
|
||
새 메커니즘의 seam 을 단위 검증한다 (test_fair_share.py 선례):
|
||
- _hold_awaiting_split: payload 마킹 commit 후 StageDeferred(HOLD_RETRY_MINUTES).
|
||
- _process_map_reduce: 유닛별 map → reduce → doc 필드 기록 / 모든 콜 캡 준수 /
|
||
payload.presegment.map_results 유닛 단위 persist(멱등 재개) / 실패 유닛 raise /
|
||
drain 보류(StageDeferred) 시 완료 유닛 보존.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import sys
|
||
from types import SimpleNamespace
|
||
|
||
import pytest
|
||
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||
|
||
from ai.envelope import EscalationEnvelope # noqa: E402
|
||
from models.queue import StageDeferred # noqa: E402
|
||
from services.summarize_units import ( # noqa: E402
|
||
CAP_TOKENS,
|
||
estimate_tokens,
|
||
plan_summarize_units,
|
||
)
|
||
import workers.deep_summary_worker as dsw # noqa: E402
|
||
|
||
|
||
# ─── fixtures ────────────────────────────────────────────────────────────────
|
||
|
||
# 30 절 × 한글 2,000자 ≈ 31.7K tok (> TRIGGER 25K) · 절당 ≈ 1,060 tok (< CAP) → auto
|
||
GIANT_AUTO_MD = "\n".join(f"# 절 {i}\n" + ("가" * 2_000) for i in range(30))
|
||
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole
|
||
GIANT_WHOLE_MD = "# 통짜\n" + ("가" * 60_000)
|
||
|
||
MAP_JSON = (
|
||
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
|
||
' "inconsistencies": [{"kind": "version_drift", "desc": "개정판 차이"}],'
|
||
' "confidence": 0.9}'
|
||
)
|
||
REDUCE_JSON = (
|
||
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
|
||
' "inconsistencies": [], "confidence": 0.8}'
|
||
)
|
||
|
||
|
||
class FakeSession:
|
||
"""commit 시점의 queue_row.payload 를 **객체 참조**로 박제 — SQLAlchemy 의 committed
|
||
스냅샷과 동일하게, 이후 in-place 변경이 과거 커밋 객체에 소급 반영되는 aliasing
|
||
(60254 라이브에서 unit 0 만 persist 된 버그)을 검증 시점 직렬화로 탐지한다."""
|
||
|
||
def __init__(self, row=None):
|
||
self.commits = 0
|
||
self._row = row
|
||
self.snapshots: list = []
|
||
|
||
async def commit(self):
|
||
self.commits += 1
|
||
if self._row is not None:
|
||
self.snapshots.append(self._row.payload) # 참조 박제 — 복사 금지(의도)
|
||
|
||
|
||
class FakeClient:
|
||
"""deep 슬롯 보유 클라이언트 — call_deep_or_defer 가 call_deep 을 타게 한다."""
|
||
|
||
def __init__(self, responses=None, fail_indexes=frozenset(), defer_from=None):
|
||
self.ai = SimpleNamespace(
|
||
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
|
||
)
|
||
self.prompts: list[str] = []
|
||
self._fail_indexes = fail_indexes # 이 순번(0-based) 콜은 파싱 불가 응답
|
||
self._defer_from = defer_from # 이 순번부터 연결 실패(StageDeferred 변환 대상)
|
||
|
||
async def call_deep(self, prompt: str, system=None) -> str:
|
||
import httpx
|
||
|
||
idx = len(self.prompts)
|
||
if self._defer_from is not None and idx >= self._defer_from:
|
||
raise httpx.ConnectError("macbook down")
|
||
self.prompts.append(prompt)
|
||
if idx in self._fail_indexes:
|
||
return "정상 JSON 아님"
|
||
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
|
||
return REDUCE_JSON
|
||
return MAP_JSON
|
||
|
||
async def close(self):
|
||
pass
|
||
|
||
|
||
def _doc():
|
||
return SimpleNamespace(
|
||
id=999,
|
||
extracted_text=GIANT_AUTO_MD,
|
||
ai_detail_summary=None,
|
||
ai_inconsistencies=None,
|
||
ai_analysis_tier="triage",
|
||
ai_processed_at=None,
|
||
)
|
||
|
||
|
||
def _envelope():
|
||
return EscalationEnvelope(
|
||
from_stage="classify",
|
||
escalation_reasons=("long_context",),
|
||
risk_flags=(),
|
||
distilled_context="4B 요지",
|
||
original_pointers={"doc_ids": [999]},
|
||
)
|
||
|
||
|
||
@pytest.fixture
|
||
def _patch_telemetry(monkeypatch):
|
||
events: list[dict] = []
|
||
|
||
async def fake_record(**kwargs):
|
||
events.append(kwargs)
|
||
|
||
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
|
||
return events
|
||
|
||
|
||
# ─── _hold_awaiting_split ────────────────────────────────────────────────────
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_hold_marks_payload_and_defers():
|
||
plan = plan_summarize_units(GIANT_WHOLE_MD)
|
||
assert plan.mode == "map_reduce" and plan.tier == "whole"
|
||
|
||
session, row = FakeSession(), SimpleNamespace(payload={"envelope": {"x": 1}})
|
||
with pytest.raises(StageDeferred) as ei:
|
||
await dsw._hold_awaiting_split(session, row, plan, document_id=999)
|
||
|
||
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
|
||
assert session.commits == 1 # 마킹이 defer 전에 commit — consumer 재읽기에서 보존
|
||
preseg = row.payload["presegment"]
|
||
assert preseg["awaiting_split"] is True
|
||
assert preseg["tier"] == "whole"
|
||
assert preseg["units"] == len(plan.units)
|
||
assert row.payload["envelope"] == {"x": 1} # 기존 payload 병합 보존
|
||
|
||
|
||
# ─── _process_map_reduce — 정상 경로 ────────────────────────────────────────
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry):
|
||
plan = plan_summarize_units(GIANT_AUTO_MD)
|
||
assert plan.mode == "map_reduce" and plan.tier == "auto"
|
||
n = len(plan.units)
|
||
assert n >= 2 # greedy-pack 이 실제로 유닛을 나눴는지
|
||
|
||
client = FakeClient()
|
||
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||
doc = _doc()
|
||
row = SimpleNamespace(payload={"envelope": {"x": 1}})
|
||
session = FakeSession(row)
|
||
|
||
await dsw._process_map_reduce(
|
||
doc, row, _envelope(), "generic", plan, session,
|
||
defer_on_deep_unavailable=False,
|
||
)
|
||
|
||
# 콜 수 = 유닛 map n + reduce 1
|
||
assert len(client.prompts) == n + 1
|
||
# 검증 게이트: 모든 콜 est_tokens <= CAP + 오버헤드(정책 템플릿+envelope ~3K)
|
||
for p in client.prompts:
|
||
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
|
||
# doc 기록 = reduce 출력, 불일치 = map 유닛 합본 dedup
|
||
assert doc.ai_detail_summary == "최종 상세."
|
||
assert doc.ai_analysis_tier == "deep"
|
||
assert doc.ai_inconsistencies == [{"kind": "version_drift", "desc": "개정판 차이"}]
|
||
# 유닛 단위 persist — 유닛마다 commit
|
||
assert row.payload["presegment"]["units"] == n
|
||
assert len(row.payload["presegment"]["map_results"]) == n
|
||
assert session.commits == n
|
||
# ★aliasing 회귀 방지: 각 commit 이 박제한 payload 객체를 사후에 봤을 때
|
||
# map_results 가 1,2,...,n 로 단조 증가해야 한다. in-place 변경(구 버그)이면
|
||
# 모든 스냅샷이 같은 dict 를 공유해 [n,n,...,n] 으로 보인다 = SQLAlchemy 가
|
||
# committed 스냅샷과 new 가 같다고 판정해 UPDATE 를 스킵하는 것과 등가.
|
||
per_commit_units = [
|
||
len(s["presegment"]["map_results"]) for s in session.snapshots
|
||
]
|
||
assert per_commit_units == list(range(1, n + 1))
|
||
# telemetry 1건 (reduce 기준)
|
||
events = _patch_telemetry
|
||
assert len(events) == 1 and events[0]["error_code"] is None
|
||
|
||
|
||
# ─── 멱등 재개 ───────────────────────────────────────────────────────────────
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_map_reduce_resume_skips_done_units(monkeypatch, _patch_telemetry):
|
||
plan = plan_summarize_units(GIANT_AUTO_MD)
|
||
n = len(plan.units)
|
||
|
||
client = FakeClient()
|
||
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||
done_unit = {
|
||
"index": 0, "titles": ["절 0"], "tldr": "이전 요약", "detail": "이전 상세.",
|
||
"inconsistencies": [],
|
||
}
|
||
row = SimpleNamespace(payload={
|
||
"envelope": {"x": 1},
|
||
"presegment": {"map_results": {"0": done_unit}},
|
||
})
|
||
doc, session = _doc(), FakeSession()
|
||
|
||
await dsw._process_map_reduce(
|
||
doc, row, _envelope(), "generic", plan, session,
|
||
defer_on_deep_unavailable=False,
|
||
)
|
||
|
||
# 유닛 0 은 재호출 안 함 — map (n-1) + reduce 1
|
||
assert len(client.prompts) == n
|
||
assert row.payload["presegment"]["map_results"]["0"]["detail"] == "이전 상세."
|
||
assert doc.ai_detail_summary == "최종 상세."
|
||
|
||
|
||
# ─── map 유닛 실패 → raise (성공분 persist) ─────────────────────────────────
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_map_unit_parse_failure_raises_but_persists_good_units(
|
||
monkeypatch, _patch_telemetry
|
||
):
|
||
plan = plan_summarize_units(GIANT_AUTO_MD)
|
||
n = len(plan.units)
|
||
|
||
client = FakeClient(fail_indexes={1}) # 두 번째 map 콜만 파싱 불가
|
||
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||
doc, session = _doc(), FakeSession()
|
||
row = SimpleNamespace(payload={"envelope": {"x": 1}})
|
||
|
||
with pytest.raises(ValueError, match="map 유닛"):
|
||
await dsw._process_map_reduce(
|
||
doc, row, _envelope(), "generic", plan, session,
|
||
defer_on_deep_unavailable=False,
|
||
)
|
||
|
||
# 성공 유닛(n-1)은 persist — 재시도 시 실패 1건만 재호출
|
||
assert len(row.payload["presegment"]["map_results"]) == n - 1
|
||
assert "1" not in row.payload["presegment"]["map_results"]
|
||
assert doc.ai_detail_summary is None # doc 은 미기록
|
||
assert _patch_telemetry == [] # 가짜 완료 이벤트 없음
|
||
|
||
|
||
# ─── drain 보류 — 완료 유닛 보존 + StageDeferred 전파 ───────────────────────
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_map_defer_propagates_and_keeps_progress(monkeypatch, _patch_telemetry):
|
||
plan = plan_summarize_units(GIANT_AUTO_MD)
|
||
|
||
client = FakeClient(defer_from=1) # 첫 유닛 성공 후 맥북 절단
|
||
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||
doc, session = _doc(), FakeSession()
|
||
row = SimpleNamespace(payload={"envelope": {"x": 1}})
|
||
|
||
with pytest.raises(StageDeferred):
|
||
await dsw._process_map_reduce(
|
||
doc, row, _envelope(), "generic", plan, session,
|
||
defer_on_deep_unavailable=True, # drain 시멘틱 — 보류 전파
|
||
)
|
||
|
||
assert len(row.payload["presegment"]["map_results"]) == 1
|
||
assert doc.ai_detail_summary is None
|