From a55bb3453ddd68eac5a23e360a1a9ea0077d0a9c Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 3 Jul 2026 05:54:23 +0900 Subject: [PATCH] =?UTF-8?q?feat(presegment):=20PR3=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=E2=80=94=20=EC=95=8C=EB=9E=8C=C2=B7dedupe=C2=B7ove?= =?UTF-8?q?rride=20=EA=B2=80=EC=A6=9D=C2=B7=EC=9E=AC=EA=B0=9C=C2=B7?= =?UTF-8?q?=EB=AC=B4=ED=9A=8C=EA=B7=80=2019=EA=B1=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tests/test_presegment_pr3.py: alerts no-op(env 미설정, 프로세스당 1회 로그)/ synochat·ntfy 포맷/실패 무raise(webhook 전부 fake — 실호출 0), HOLD 알람 발화+alerted_at 7일 dedupe, validate_override_boundaries(정상/dict형/중첩/ 캡초과/커버리지 부족/범위 밖/TODO 잔존/공백 경고), leaf_spans 원문 재구성, units_override 가 tier 판정(plan_summarize_units) 우회하고 map-reduce 재개, 잘못된 override(캡 초과·source_len 불일치)=재-HOLD+알람+LLM 콜 0, override 없는 소형(단일콜)·whole(HOLD+알람) 문서 무회귀. 기존 test_summarize_units 26 + test_deep_summary_mapreduce 등 인접 100건 pass 유지 (test_pipeline_hold 1건 실패는 main 기존 결함 — 본 PR 무관). Co-Authored-By: Claude Fable 5 --- tests/test_presegment_pr3.py | 486 +++++++++++++++++++++++++++++++++++ 1 file changed, 486 insertions(+) create mode 100644 tests/test_presegment_pr3.py diff --git a/tests/test_presegment_pr3.py b/tests/test_presegment_pr3.py new file mode 100644 index 0000000..9f26755 --- /dev/null +++ b/tests/test_presegment_pr3.py @@ -0,0 +1,486 @@ +"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트. + +test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증: + - services.alerts.send_alert: env 미설정 no-op(프로세스당 1회 로그)·synochat/ntfy + 포맷·실패 시 절대 raise 금지 (webhook 은 전부 fake — 실호출 0). + - _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7일). + - validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 밖. + - process(): units_override 존재 시 tier 판정(plan_summarize_units) 우회 → + map-reduce 재개 / 잘못된 override 는 재-HOLD + 알람 / override 없는 문서 무회귀. +""" + +from __future__ import annotations + +import json +import os +import sys +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +import httpx # noqa: E402 + +import services.alerts as alerts # noqa: E402 +from ai.envelope import EscalationEnvelope # noqa: E402 +from models.queue import StageDeferred # noqa: E402 +from services.summarize_units import ( # noqa: E402 + CAP_TOKENS, + choose_override_source, + estimate_tokens, + extract_leaves, + leaf_spans, + plan_summarize_units, + units_from_boundaries, + validate_override_boundaries, +) +import workers.deep_summary_worker as dsw # noqa: E402 + + +# ─── fixtures (PR2 테스트와 동일 계열) ─────────────────────────────────────── + +# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상) +GIANT_WHOLE_MD = "# 통짜\n" + ("가" * 60_000) +# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용) +SMALL_MD = "# 소형\n" + ("가" * 2_000) + +MAP_JSON = ( + '{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",' + ' "inconsistencies": [], "confidence": 0.9}' +) +REDUCE_JSON = ( + '{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",' + ' "inconsistencies": [], "confidence": 0.8}' +) + + +class FakeSession: + def __init__(self, row=None): + self.commits = 0 + self._row = row + + async def commit(self): + self.commits += 1 + + +class FakeProcSession(FakeSession): + """process() 레벨 — session.get(Document) + execute(select queue_row) fake.""" + + def __init__(self, doc, row): + super().__init__(row) + self._doc = doc + + async def get(self, model, pk): + return self._doc + + async def execute(self, stmt): + row = self._row + return SimpleNamespace(scalar_one_or_none=lambda: row) + + +class FakeClient: + """deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일).""" + + def __init__(self): + self.ai = SimpleNamespace( + deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000) + ) + self.prompts: list[str] = [] + + async def call_deep(self, prompt: str, system=None) -> str: + self.prompts.append(prompt) + if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커 + return REDUCE_JSON + return MAP_JSON + + async def close(self): + pass + + +def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None): + return SimpleNamespace( + id=999, + title="테스트 문서", + extracted_text=text, + md_content=md_content, + ai_detail_summary=None, + ai_inconsistencies=None, + ai_analysis_tier="triage", + ai_processed_at=None, + ) + + +def _envelope_raw(): + return { + "from_stage": "classify", + "escalation_reasons": ["long_context"], + "risk_flags": [], + "distilled_context": "4B 요지", + "original_pointers": {"doc_ids": [999]}, + } + + +def _envelope(): + return EscalationEnvelope.from_json(json.dumps(_envelope_raw())) + + +@pytest.fixture +def _patch_telemetry(monkeypatch): + events: list[dict] = [] + + async def fake_record(**kwargs): + events.append(kwargs) + + monkeypatch.setattr(dsw, "record_analyze_event", fake_record) + return events + + +@pytest.fixture +def _alert_recorder(monkeypatch): + calls: list[tuple[str, str]] = [] + + async def fake_alert(title: str, message: str) -> bool: + calls.append((title, message)) + return True + + monkeypatch.setattr(dsw, "send_alert", fake_alert) + return calls + + +# ─── A. services.alerts ────────────────────────────────────────────────────── + +class _FakeHttpxClient: + """httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0.""" + + captured: list[dict] = [] + fail_with: Exception | None = None + status_code = 200 + + def __init__(self, timeout=None): + self.timeout = timeout + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + async def post(self, url, **kwargs): + if _FakeHttpxClient.fail_with is not None: + raise _FakeHttpxClient.fail_with + _FakeHttpxClient.captured.append({"url": url, **kwargs}) + return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok") + + +@pytest.fixture +def _fake_httpx(monkeypatch): + _FakeHttpxClient.captured = [] + _FakeHttpxClient.fail_with = None + _FakeHttpxClient.status_code = 200 + monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient) + return _FakeHttpxClient + + +@pytest.mark.asyncio +async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx): + monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False) + monkeypatch.setattr(alerts, "_noop_logged", False) + infos: list[str] = [] + monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg)) + + assert await alerts.send_alert("t", "m") is False + assert await alerts.send_alert("t", "m") is False + assert len(infos) == 1 # 프로세스당 1회만 로그 + assert _fake_httpx.captured == [] # webhook 미호출 + + +@pytest.mark.asyncio +async def test_send_alert_synochat_format(monkeypatch, _fake_httpx): + monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook") + monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat + + assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True + assert len(_fake_httpx.captured) == 1 + call = _fake_httpx.captured[0] + assert call["url"] == "http://chat.local/webhook" + payload = json.loads(call["data"]["payload"]) + assert payload == {"text": "제목\n본문 줄1\n줄2"} + + +@pytest.mark.asyncio +async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx): + monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds") + monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy") + + assert await alerts.send_alert("한글 제목", "알람 본문") is True + call = _fake_httpx.captured[0] + # 한글 제목은 헤더(latin-1 한정) 대신 query param + assert call["params"] == {"title": "한글 제목"} + assert call["content"] == "알람 본문".encode("utf-8") + + +@pytest.mark.asyncio +async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx): + monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook") + _fake_httpx.fail_with = httpx.ConnectError("down") + assert await alerts.send_alert("t", "m") is False # raise 없이 False + + _fake_httpx.fail_with = None + _fake_httpx.status_code = 500 + assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False + + +# ─── B. HOLD 알람 + alerted_at dedupe ─────────────────────────────────────── + +@pytest.mark.asyncio +async def test_hold_alerts_once_then_dedupes(_alert_recorder): + plan = plan_summarize_units(GIANT_WHOLE_MD) + assert plan.tier == "whole" + row = SimpleNamespace(payload={"envelope": {"x": 1}}) + session = FakeSession() + + # 1차 HOLD — 알람 발화 + alerted_at 기록 + with pytest.raises(StageDeferred): + await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서") + assert len(_alert_recorder) == 1 + title, message = _alert_recorder[0] + assert "999" in title + assert "테스트 문서" in message and "whole" in message + assert f"export --doc 999" in message # 유인 분할 힌트 + alerted_at = row.payload["presegment"]["alerted_at"] + assert datetime.fromisoformat(alerted_at) + + # 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제 + with pytest.raises(StageDeferred): + await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서") + assert len(_alert_recorder) == 1 + assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신 + + # 3차 — alerted_at 이 8일 전이면 재발화 + 갱신 + stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat() + payload = dict(row.payload) + payload["presegment"] = {**payload["presegment"], "alerted_at": stale} + row.payload = payload + with pytest.raises(StageDeferred): + await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서") + assert len(_alert_recorder) == 2 + assert row.payload["presegment"]["alerted_at"] != stale + + +# ─── C. validate_override_boundaries ──────────────────────────────────────── + +def test_validate_ok_full_coverage(): + text = "가" * 40_000 # ≈ 21,160 tok + bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]] + check = validate_override_boundaries(text, bounds) + assert check.ok and check.errors == [] + assert check.coverage_pct == 100.0 + assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")] + assert all(t <= CAP_TOKENS for t in check.unit_tokens) + + +def test_validate_dict_form_and_units_from_boundaries(): + text = "가" * 10_000 + bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}] + check = validate_override_boundaries(text, bounds) + assert check.ok + units = units_from_boundaries(text, check.boundaries) + assert [u.index for u in units] == [0, 1] + assert units[0].text == text[0:5_000] + assert units[0].section_titles == ["a"] + assert units[0].est_tokens == estimate_tokens(text[0:5_000]) + + +def test_validate_rejects_overlap(): + text = "가" * 10_000 + check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]]) + assert not check.ok + assert any("중첩" in e for e in check.errors) + + +def test_validate_rejects_cap_exceed(): + text = "가" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000 + check = validate_override_boundaries(text, [[0, 40_000, "통짜"]]) + assert not check.ok + assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시 + + +def test_validate_rejects_low_coverage(): + text = "가" * 10_000 + check = validate_override_boundaries(text, [[0, 1_000, "머리만"]]) + assert not check.ok + assert any("커버리지" in e for e in check.errors) + + +def test_validate_rejects_out_of_range_and_todo(): + text = "가" * 1_000 + check = validate_override_boundaries(text, [[0, 2_000, "밖"]]) + assert not check.ok and any("범위 밖" in e for e in check.errors) + + check2 = validate_override_boundaries( + text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}] + ) + assert not check2.ok and any("TODO" in e for e in check2.errors) + + +def test_validate_warns_on_gap_but_passes_coverage(): + text = "가" * 100_000 + # 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고 + bounds = [[0, 20_000, "a"], [24_000, 100_000, None]] + # 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증 + check = validate_override_boundaries(text, bounds, cap=50_000) + assert check.ok + assert any("공백 구간" in w for w in check.warnings) + + +def test_choose_override_source_prefers_md_content(): + assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문") + assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본") + assert choose_override_source(None, None) == ("extracted_text", "") + + +def test_leaf_spans_reconstruct_source(): + md = "# 절 1\n" + "가" * 100 + "\n# 절 2\n" + "나" * 100 + leaves = extract_leaves(md) + spans = leaf_spans(md, leaves) + assert len(spans) == len(leaves) + for (s, e), leaf in zip(spans, leaves): + assert md[s:e] == leaf.text + # 인접 파티션 — 이어붙이면 원문 전체 + assert spans[0][0] == 0 and spans[-1][1] == len(md) + + +# ─── D. units_override 재개 경로 (worker process 레벨) ────────────────────── + +def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict: + step = len(text) // n_units + bounds = [] + for i in range(n_units): + s = i * step + e = len(text) if i == n_units - 1 else (i + 1) * step + bounds.append([s, e, f"파트 {i + 1}"]) + return { + "envelope": _envelope_raw(), + "subject_domain": "generic", + "presegment": { + "tier": "whole", "over_pct": 61.46, "awaiting_split": False, + "units_override": { + "source": source, "source_len": len(text), "boundaries": bounds, + }, + }, + } + + +@pytest.mark.asyncio +async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder): + doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서 + row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3)) + session = FakeProcSession(doc, row) + + client = FakeClient() + monkeypatch.setattr(dsw, "AIClient", lambda: client) + + def _boom(*a, **k): + raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨") + + monkeypatch.setattr(dsw, "plan_summarize_units", _boom) + + await dsw.process(999, session) + + # map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K) + assert len(client.prompts) == 4 + for p in client.prompts: + assert estimate_tokens(p) <= CAP_TOKENS + 3_000 + assert doc.ai_detail_summary == "최종 상세." + assert doc.ai_analysis_tier == "deep" + preseg = row.payload["presegment"] + assert preseg["tier"] == "override" + assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존 + assert len(preseg["map_results"]) == 3 + assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사) + assert _alert_recorder == [] # 정상 재개 — 알람 없음 + assert len(_patch_telemetry) == 1 + + +@pytest.mark.asyncio +async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder): + doc = _doc(GIANT_WHOLE_MD) + payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1 + row = SimpleNamespace(payload=payload) + session = FakeProcSession(doc, row) + + def _no_llm(): + raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨") + + monkeypatch.setattr(dsw, "AIClient", _no_llm) + + with pytest.raises(StageDeferred) as ei: + await dsw.process(999, session) + + assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES + preseg = row.payload["presegment"] + assert preseg["awaiting_split"] is True # 재-HOLD + assert "cap" in preseg["override_rejected"] + assert "units_override" in preseg # 원인 조사용 보존 + assert len(_alert_recorder) == 1 # 거부 알람 + assert "거부" in _alert_recorder[0][0] + assert doc.ai_detail_summary is None + assert _patch_telemetry == [] + + +@pytest.mark.asyncio +async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder): + doc = _doc(GIANT_WHOLE_MD) + payload = _override_payload(GIANT_WHOLE_MD, n_units=3) + payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오 + row = SimpleNamespace(payload=payload) + session = FakeProcSession(doc, row) + monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM"))) + + with pytest.raises(StageDeferred): + await dsw.process(999, session) + + assert row.payload["presegment"]["awaiting_split"] is True + assert "source_len 불일치" in row.payload["presegment"]["override_rejected"] + assert len(_alert_recorder) == 1 + + +@pytest.mark.asyncio +async def test_no_override_small_doc_keeps_single_call_path( + monkeypatch, _patch_telemetry, _alert_recorder +): + """무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로.""" + doc = _doc(SMALL_MD) + row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"}) + session = FakeProcSession(doc, row) + client = FakeClient() + monkeypatch.setattr(dsw, "AIClient", lambda: client) + + await dsw.process(999, session) + + assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님) + assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함 + assert doc.ai_detail_summary == "유닛 상세." + assert "presegment" not in row.payload # payload 무변경 + assert _alert_recorder == [] + + +@pytest.mark.asyncio +async def test_no_override_whole_doc_still_holds_with_alert( + monkeypatch, _patch_telemetry, _alert_recorder +): + """override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가.""" + doc = _doc(GIANT_WHOLE_MD) + row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"}) + session = FakeProcSession(doc, row) + monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM"))) + + with pytest.raises(StageDeferred): + await dsw.process(999, session) + + preseg = row.payload["presegment"] + assert preseg["awaiting_split"] is True and preseg["tier"] == "whole" + assert len(_alert_recorder) == 1 + assert "HOLD" in _alert_recorder[0][0]