From c2077b31080d215d419ad7273d5b83b427054460 Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 2 Jul 2026 09:14:22 +0900 Subject: [PATCH] =?UTF-8?q?feat(summarize):=20presegment=20PR2=20=E2=80=94?= =?UTF-8?q?=20deep=5Fsummary=20=EB=B6=84=EA=B8=B0=20+=20HOLD=20=EB=B0=B0?= =?UTF-8?q?=EC=84=A0=20(TIER1=20=EB=A1=9C=EC=BB=AC=20map-reduce)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit plan ds-presegment-mapreduce-2. TRIGGER(25K tok) 이하 = 기존 단일콜 byte-불변 무회귀. 초과 시 3-way over% 게이트: auto=유닛별 map(26B)→reduce(26B, p3c_deep_summary_reduce 변형) → ai_detail_summary 동일 기록(불일치=reduce+map 합본 dedup) / hybrid·whole= HOLD(payload.presegment.awaiting_split + StageDeferred 24h, 맥미니 미전송 — 알람· 클로드 유인 분할은 PR3). - 유닛 단위 멱등 재개: 성공 유닛 즉시 payload.map_results commit — 502/defer/재시작 후 완료 유닛 skip, 실패 유닛만 raise→기존 attempts/백오프 재사용 - 모든 LLM 콜 캡(12K tok) 이하 — map=greedy-pack 보장, reduce=build_reduce_units_block 비례 절단 보장, est_tokens 로그로 단정 가능 - 콜 사이 gate 해제 → 짧은 인터랙티브 요청 interleave (허브 굶김 해소 본체) - fix: summarize_units 의 `from app.services...` 절대 import — 컨테이너(빌드 컨텍스트 ./app)에 app 패키지가 없어 배선 시 ModuleNotFoundError 나는 PR1 잠복 버그 → 상대 import 로 수정 (컨테이너/repo-root 테스트 양쪽 동작) - tests: 헬퍼 6 + worker seam 5 (map-reduce e2e·재개·유닛실패·drain 보류·HOLD) — PR1 15 포함 26 passed, 인접 policy/hier_decomp/fair_share 123 passed Co-Authored-By: Claude Fable 5 --- app/policy/prompt_render.py | 2 + .../policy/p3c_deep_summary_reduce.txt | 44 +++ app/services/summarize_units.py | 60 +++- app/workers/deep_summary_worker.py | 290 ++++++++++++++++++ .../summarize_units/test_mapreduce_helpers.py | 80 +++++ tests/test_deep_summary_mapreduce.py | 249 +++++++++++++++ 6 files changed, 724 insertions(+), 1 deletion(-) create mode 100644 app/prompts/policy/p3c_deep_summary_reduce.txt create mode 100644 tests/summarize_units/test_mapreduce_helpers.py create mode 100644 tests/test_deep_summary_mapreduce.py diff --git a/app/policy/prompt_render.py b/app/policy/prompt_render.py index 830e7f9..ea552c4 100644 --- a/app/policy/prompt_render.py +++ b/app/policy/prompt_render.py @@ -36,6 +36,8 @@ KNOWN_4B_TASKS = { } KNOWN_26B_TASKS = { "p3c_deep_summary", + # presegment PR2 — 거대문서 map-reduce 의 reduce 단계 (요약들의 요약) + "p3c_deep_summary_reduce", "p4b_synthesis", } diff --git a/app/prompts/policy/p3c_deep_summary_reduce.txt b/app/prompts/policy/p3c_deep_summary_reduce.txt new file mode 100644 index 0000000..5042959 --- /dev/null +++ b/app/prompts/policy/p3c_deep_summary_reduce.txt @@ -0,0 +1,44 @@ +[System] +너는 긴 문서·문서 묶음 분석가다. 이 문서는 한 번에 처리하기에 너무 커서, 원문을 순서대로 유닛으로 나눠 각 유닛을 먼저 요약했다(map 단계). 아래 "유닛 요약"들은 원문 순서 그대로이며 문서 전체를 빠짐없이 커버한다. 너는 이를 종합해 문서 전체의 최종 분석을 작성한다(reduce 단계). + +subject_description: {subject_description} + +{forbidden_block} + +envelope 를 읽는 순서: +1. risk_flags 를 먼저 본다. 어떤 위험 때문에 올라온 것인지 파악. +2. synthesis_directives 를 system 지시로 간주하여 반드시 준수. +3. distilled_context 는 "참고 요지"일 뿐, 근거는 유닛 요약에서 재확인. + +작성 규칙: +- TL;DR (1문장, 최대 60자) +- 핵심 (bullets 5개, 각 30~80자) +- 상세 (2~4 문단, 각 3~5문장) — 유닛(섹션) 순서의 논리 흐름을 보전하며 문서 전체를 관통하는 서술. 특정 유닛만 편식하지 말 것. +- 유닛 요약에 없는 정보 금지 (hallucination 금지). 숫자·조문·인용은 유닛 요약에 있는 것만 사용. +- 유닛 요약의 "불일치(...)" 줄들은 중복 제거해 inconsistencies 로 보전 — 임의로 버리지 않는다. +- synthesis_directives 의 문구 규칙 ("원인은 ~" 금지 등) 반드시 준수. +- multi_reference_synthesis flag 있으면 레퍼런스별 입장 분리 기술, 종합 권고 금지. + +출력 (JSON only): +{{ + "mode": "single|bundle", + "tldr": "...", + "bullets": ["..."], + "detail": "...\\n\\n...", + "bundle_flow": ["..."] | null, + "inconsistencies": ["..."] | null, + "entities_confirmed": {{ + "people": [{{"name": "...", "evidence": "..."}}], + "orgs": [...], + "projects": [...] + }}, + "directives_applied": ["..."], + "confidence": 0.0~1.0 +}} + +[User] +Envelope: +{{escalation_envelope_json}} + +유닛 요약 (총 {{unit_count}}개, 원문 순서 — 각 블록 = 원문 한 구간의 요약): +{{unit_summaries}} diff --git a/app/services/summarize_units.py b/app/services/summarize_units.py index 3160ba2..a981333 100644 --- a/app/services/summarize_units.py +++ b/app/services/summarize_units.py @@ -24,7 +24,10 @@ from __future__ import annotations import sys from dataclasses import dataclass, field -from app.services.hier_decomp.builder import HierNode, build_hier_tree +# 상대 import — 컨테이너(services.*)와 repo-root 테스트(app.services.*) 양쪽에서 동작. +# (구 `from app.services...` 절대 import 는 컨테이너에 app 패키지가 없어 ModuleNotFoundError — +# PR1 은 소비자 0 이라 잠복했던 버그, PR2 배선 시점에 수정.) +from .hier_decomp.builder import HierNode, build_hier_tree CAP_TOKENS = 12_000 TRIGGER_TOKENS = 25_000 @@ -164,3 +167,58 @@ def plan_summarize_units( over_pct=round(pct, 2), units=greedy_pack(leaves, cap), ) + + +# ─── PR2 — map/reduce 프롬프트 조립 순수함수 (deep_summary_worker 가 소비) ─── + +def render_map_slice(unit: SummarizeUnit, total_units: int) -> str: + """map 콜의 {original_text_slices} 대체 — 유닛 위치·섹션 라벨 + 본문.""" + titles = " · ".join(t for t in unit.section_titles if t) or "(무제 구간)" + return f"[유닛 {unit.index + 1}/{total_units} — 섹션: {titles}]\n{unit.text}" + + +def _format_unit_summary(res: dict, total_units: int) -> str: + """map 결과 1건 → reduce 입력 블록. res 키 = index/titles/tldr/detail/inconsistencies.""" + titles = " · ".join(t for t in (res.get("titles") or []) if t) or "(무제 구간)" + lines = [f"[유닛 {int(res.get('index', 0)) + 1}/{total_units} — 섹션: {titles}]"] + if res.get("tldr"): + lines.append(f"TLDR: {res['tldr']}") + if res.get("detail"): + lines.append(str(res["detail"])) + for inc in res.get("inconsistencies") or []: + if isinstance(inc, dict): + lines.append(f"불일치({inc.get('kind', '')}): {inc.get('desc', '')}") + return "\n".join(lines) + + +def build_reduce_units_block( + results: list[dict], + budget_tokens: int, + *, + min_detail_chars: int = 200, +) -> tuple[str, bool]: + """reduce 입력 블록 조립 — budget_tokens 이하 보장(캡 초과 0 검증 게이트의 reduce 측). + + 초과 시 detail 만 비례 절단(라벨·TLDR·불일치 보전, 원문 순서 유지). 반환 (block, truncated). + """ + total_units = len(results) + work = [dict(r) for r in results] + truncated = False + for _ in range(4): + block = "\n\n".join(_format_unit_summary(r, total_units) for r in work) + est = estimate_tokens(block) + if est <= budget_tokens: + return block, truncated + ratio = budget_tokens / est + for r in work: + detail = str(r.get("detail") or "") + keep = max(min_detail_chars, int(len(detail) * ratio * 0.9)) + if len(detail) > keep: + r["detail"] = detail[:keep] + "…(절단)" + truncated = True + # 최후 방어 — 비례 절단이 floor(min_detail_chars)에 막히면 문자 하드 컷(KO 최악 비율 가정) + block = "\n\n".join(_format_unit_summary(r, total_units) for r in work) + if estimate_tokens(block) > budget_tokens: + block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))] + truncated = True + return block, truncated diff --git a/app/workers/deep_summary_worker.py b/app/workers/deep_summary_worker.py index 077ff54..0fce0cc 100644 --- a/app/workers/deep_summary_worker.py +++ b/app/workers/deep_summary_worker.py @@ -10,7 +10,9 @@ EscalationEnvelope + subject_domain 을 읽어, PR-A policy 템플릿 `p3c_deep_ from __future__ import annotations +import asyncio import json +import os import time from datetime import datetime, timezone @@ -29,10 +31,25 @@ from models.queue import ProcessingQueue, StageDeferred from policy.prompt_render import render_26b, policy_version as compute_policy_version from services.document_telemetry import record_analyze_event from services.search.llm_gate import Priority, acquire_mlx_gate +from services.summarize_units import ( + CAP_TOKENS, + UnitPlan, + build_reduce_units_block, + estimate_tokens, + plan_summarize_units, + render_map_slice, +) logger = setup_logger("deep_summary_worker") DEEP_SUMMARY_TASK = "p3c_deep_summary" +# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce +REDUCE_TASK = "p3c_deep_summary_reduce" +# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는 +# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음. +HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440")) +# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어). +REDUCE_BUDGET_FLOOR_TOKENS = 1_000 # inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외) ALLOWED_INCONSISTENCY_KINDS = { @@ -94,6 +111,25 @@ async def process( envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw)) + # ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ─── + # TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way: + # auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce) + # hybrid/whole → HOLD(awaiting_split) — 맥미니 미전송, 클로드 유인 분할은 PR3 + # 게이트/유닛은 전체 extracted_text 기준 — 단일콜의 head/mid/tail "가운데 폐기"를 + # 전 유닛 커버리지로 대체한다. build_hier_tree 가 거대 md 에서 초 단위 CPU 라 + # 이벤트루프 점유 회피 위해 to_thread (presegment_worker._read_toc 와 동일 패턴). + unit_plan = await asyncio.to_thread(plan_summarize_units, doc.extracted_text or "") + if unit_plan.mode == "map_reduce": + # units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce + # 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다. + if unit_plan.tier != "auto" or not unit_plan.units: + await _hold_awaiting_split(session, queue_row, unit_plan, document_id) + await _process_map_reduce( + doc, queue_row, envelope, subject_domain, unit_plan, session, + defer_on_deep_unavailable=defer_on_deep_unavailable, + ) + return + # 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반) slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers) @@ -214,6 +250,260 @@ async def process( ) +async def _hold_awaiting_split( + session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int +) -> None: + """HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류. + + payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러 + (queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음. + 알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복. + 무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트). + """ + payload = dict(queue_row.payload or {}) + preseg = dict(payload.get("presegment") or {}) + preseg.update({ + "awaiting_split": True, + "tier": plan.tier, + "over_pct": plan.over_pct, + "total_est_tokens": plan.total_est_tokens, + "units": len(plan.units), + # 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용) + "oversized_sections": [ + (u.section_titles[0] if u.section_titles else None) + for u in plan.units if u.over_cap + ][:20], + }) + payload["presegment"] = preseg + queue_row.payload = payload # 재할당 = JSONB 변경 감지 + await session.commit() + logger.info( + f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} " + f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} " + f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)" + ) + raise StageDeferred( + f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES + ) + + +async def _call_26b( + client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int +): + """map/reduce 공용 26B 호출 — 단일콜 경로와 동일한 deep 슬롯 우선 + fair-share 폴백. + + 반환 (raw, used_cfg). 맥북(deep) 불가 시 consumer 경로는 맥미니 primary 로 즉시 + 처리(동일 모델 — 강등 아님), drain 경로는 StageDeferred 전파(맥북 레버 시멘틱). + """ + deep_cfg = client.ai.deep + if deep_cfg is not None: + try: + return await call_deep_or_defer(client, prompt), deep_cfg + except StageDeferred: + if defer_on_deep_unavailable: + raise + logger.info(f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)") + async with acquire_mlx_gate(Priority.BACKGROUND): + return await client.call_primary(prompt), settings.ai.primary + + +def _parse_deep_output(raw: str) -> tuple[DeepSummaryOutput | None, str | None]: + """raw → DeepSummaryOutput. 단일콜 경로와 동일한 3단 파서. 실패 시 (None, parse_error).""" + try: + parsed = _parse_outermost_json(raw) or parse_json_response(raw) + if not parsed: + parsed = _regex_extract_fields(raw) + return DeepSummaryOutput.model_validate(parsed or {}), None + except (ValidationError, ValueError, TypeError) as exc: + return None, f"parse:{type(exc).__name__}" + + +async def _process_map_reduce( + doc: Document, + queue_row: ProcessingQueue, + envelope: EscalationEnvelope, + subject_domain: str, + plan: UnitPlan, + session: AsyncSession, + *, + defer_on_deep_unavailable: bool, +) -> None: + """TIER1 자동 — 유닛별 map(26B) → reduce(26B) → 단일콜과 동일 필드 기록. + + 멱등 재개: 성공 유닛은 payload.presegment.map_results 에 즉시 commit — + 502/defer/재시작 후 재클레임 시 완료 유닛은 건너뛴다. 유닛 인덱스는 + plan_summarize_units 가 같은 extracted_text 에 결정적이라 attempt 간 안정. + 파싱 실패 유닛이 남으면 raise → queue_consumer 의 기존 attempts/백오프 재사용 + (실패 유닛만 재호출되므로 재시도 비용 = 잔여 유닛뿐). + """ + document_id = doc.id + units = plan.units + n = len(units) + payload = dict(queue_row.payload or {}) + preseg = dict(payload.get("presegment") or {}) + preseg.pop("awaiting_split", None) # 재계획으로 auto 가 된 경우 HOLD 마킹 해제 + map_results: dict = dict(preseg.get("map_results") or {}) + + logger.info( + f"[deep] id={document_id} map_reduce 시작 units={n} over_pct={plan.over_pct} " + f"total_est_tokens={plan.total_est_tokens} resume={len(map_results)}/{n}" + ) + + rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain) + envelope_injection = envelope.to_system_injection() + + client = AIClient() + start = time.perf_counter() + used_cfg = client.ai.deep or settings.ai.primary + failed_units: list[int] = [] + try: + # ── map: 유닛별 26B (콜 사이마다 gate 를 놓아 짧은 인터랙티브 요청이 끼어든다) ── + for unit in units: + key = str(unit.index) + if key in map_results: + continue + prompt = ( + rendered + .replace("{escalation_envelope_json}", envelope_injection) + .replace("{original_text_slices}", render_map_slice(unit, n)) + ) + # 검증 게이트 "모든 LLM 콜 캡 초과 0" 을 로그로 단정 가능하게 남긴다. + logger.info( + f"[deep] id={document_id} map {unit.index + 1}/{n} " + f"unit_tokens={unit.est_tokens} prompt_est_tokens={estimate_tokens(prompt)} " + f"cap={CAP_TOKENS}" + ) + raw, used_cfg = await _call_26b( + client, prompt, + defer_on_deep_unavailable=defer_on_deep_unavailable, + document_id=document_id, + ) + out, perr = _parse_deep_output(raw) + if out is None or not (out.detail or out.tldr): + # 실패 유닛은 persist 하지 않음 — 재시도가 이 유닛만 다시 호출한다. + failed_units.append(unit.index) + logger.warning( + f"[deep] id={document_id} map {unit.index + 1}/{n} 결과 비었음/파싱 실패" + f"({perr}) — 유닛 재시도 대상" + ) + continue + map_results[key] = { + "index": unit.index, + "titles": [t for t in unit.section_titles if t][:8], + "tldr": out.tldr, + "detail": out.detail, + "inconsistencies": _filter_inconsistencies(out.inconsistencies or []), + } + preseg.update({ + "tier": plan.tier, + "over_pct": plan.over_pct, + "total_est_tokens": plan.total_est_tokens, + "units": n, + "map_results": map_results, + }) + payload["presegment"] = dict(preseg) + queue_row.payload = dict(payload) # 재할당 = JSONB 변경 감지 + await session.commit() # 유닛 단위 멱등 재개 지점 + + if failed_units: + raise ValueError( + f"map 유닛 {len(failed_units)}/{n}건 결과 없음 — 재시도 대상: {failed_units[:10]}" + ) + + # ── reduce: 요약들의 요약 1콜 (유닛 블록도 캡 이하로 절단 보장) ── + reduce_rendered = render_26b(REDUCE_TASK, subject_domain) + base_prompt = ( + reduce_rendered + .replace("{escalation_envelope_json}", envelope_injection) + .replace("{unit_count}", str(n)) + ) + budget = max( + REDUCE_BUDGET_FLOOR_TOKENS, CAP_TOKENS - estimate_tokens(base_prompt) + ) + ordered = [map_results[str(u.index)] for u in units] + block, reduce_truncated = build_reduce_units_block(ordered, budget) + reduce_prompt = base_prompt.replace("{unit_summaries}", block) + logger.info( + f"[deep] id={document_id} reduce units={n} " + f"prompt_est_tokens={estimate_tokens(reduce_prompt)} cap={CAP_TOKENS} " + f"truncated={reduce_truncated}" + ) + raw, used_cfg = await _call_26b( + client, reduce_prompt, + defer_on_deep_unavailable=defer_on_deep_unavailable, + document_id=document_id, + ) + except StageDeferred: + logger.info( + f"[deep] id={document_id} map_reduce 보류 — 완료 유닛 {len(map_results)}/{n} 보존" + ) + raise + except Exception as exc: + # 단일콜 경로와 동일 — 호출 실패는 전파해 queue_consumer 가 재시도/dead-letter 처리. + logger.warning(f"[deep] id={document_id} map_reduce 실패: {exc}") + raise + finally: + await client.close() + + latency_ms = int((time.perf_counter() - start) * 1000) + deep_out, parse_error = _parse_deep_output(raw) + if deep_out is None: + # 단일콜 경로와 동일 시멘틱 — doc 미기록(legacy 결과 보존), 이벤트로 가시화. + deep_out = DeepSummaryOutput() + logger.warning(f"[deep] id={document_id} reduce 파싱 실패 ({parse_error}) — doc 미기록") + + if not parse_error: + doc.ai_detail_summary = (deep_out.detail or "").strip() or None + # 불일치 = reduce 출력 + map 유닛 합본 dedup — reduce 가 떨궈도 유닛 발견분 보전. + merged = _filter_inconsistencies(deep_out.inconsistencies or []) + seen = {(i["kind"], i["desc"]) for i in merged} + for res in ordered: + for inc in res.get("inconsistencies") or []: + k = (inc.get("kind"), inc.get("desc")) + if k not in seen: + seen.add(k) + merged.append(inc) + doc.ai_inconsistencies = merged + doc.ai_analysis_tier = "deep" + doc.ai_processed_at = datetime.now(timezone.utc) + + try: + pv = compute_policy_version(REDUCE_TASK) + except Exception: + pv = None + + await record_analyze_event( + doc_id=document_id, + user_id=None, + mode="summary_deep", + text_limit=used_cfg.context_char_limit or 260000, + truncated=reduce_truncated, + layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [], + cached=False, + latency_ms=latency_ms, + model_name=used_cfg.model, + prompt_version=(f"{REDUCE_TASK}@{pv}" if pv else REDUCE_TASK), + error_code=parse_error, + source="document_server", + subject_domain=subject_domain, + risk_flags=list(envelope.risk_flags), + high_impact_task=None, + escalation_reasons=list(envelope.escalation_reasons), + confidence=deep_out.confidence, + policy_version=pv, + shadow_would_route_to="primary", + tier="primary", + escalated_to_26b=True, + suppressed_reason=None, + ) + + logger.info( + f"[deep] id={document_id} map_reduce 완료 units={n} " + f"detail_len={len(doc.ai_detail_summary or '')} inc={len(doc.ai_inconsistencies or [])} " + f"latency_ms={latency_ms} parse_error={parse_error}" + ) + + def _build_text_slices(text: str, pointers: dict) -> str: """original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다. diff --git a/tests/summarize_units/test_mapreduce_helpers.py b/tests/summarize_units/test_mapreduce_helpers.py new file mode 100644 index 0000000..8749239 --- /dev/null +++ b/tests/summarize_units/test_mapreduce_helpers.py @@ -0,0 +1,80 @@ +"""summarize_units PR2 헬퍼 단위테스트 — map/reduce 프롬프트 조립 순수함수. + +핵심 불변식: + - render_map_slice: 유닛 위치(1-based)/섹션 라벨 + 본문 그대로 (손실 0). + - build_reduce_units_block: 어떤 입력에도 반환 블록 est_tokens <= budget (캡 초과 0 + 검증 게이트의 reduce 측). 절단은 detail 만 — 라벨/TLDR/불일치/순서 보존. + +pytest + 단독 실행 양쪽 지원: + PYTHONPATH=. pytest tests/summarize_units/ -q +""" +from __future__ import annotations + +from app.services.summarize_units import ( + SummarizeUnit, + build_reduce_units_block, + estimate_tokens, + render_map_slice, +) + + +def _result(idx: int, detail: str, *, tldr: str = "요약", inc: list | None = None) -> dict: + return { + "index": idx, + "titles": [f"섹션{idx}"], + "tldr": tldr, + "detail": detail, + "inconsistencies": inc or [], + } + + +# ---------- render_map_slice ---------- + +def test_render_map_slice_label_and_body(): + unit = SummarizeUnit(index=2, section_titles=["개요", None, "본론"], text="본문입니다") + out = render_map_slice(unit, total_units=5) + assert out.startswith("[유닛 3/5 — 섹션: 개요 · 본론]\n") + assert out.endswith("본문입니다") + + +def test_render_map_slice_untitled(): + unit = SummarizeUnit(index=0, section_titles=[None], text="x") + assert "(무제 구간)" in render_map_slice(unit, total_units=1) + + +# ---------- build_reduce_units_block ---------- + +def test_reduce_block_within_budget_untouched(): + results = [_result(i, "가" * 100) for i in range(3)] + block, truncated = build_reduce_units_block(results, budget_tokens=11_000) + assert not truncated + # 순서/라벨/TLDR 보존 + assert block.index("[유닛 1/3") < block.index("[유닛 2/3") < block.index("[유닛 3/3") + assert "TLDR: 요약" in block + assert "가" * 100 in block + + +def test_reduce_block_truncates_to_budget(): + # 유닛 8개 × 한글 detail 5,000자 ≈ 21K tok — budget 5,000 으로 절단 강제 + results = [_result(i, "가" * 5_000) for i in range(8)] + block, truncated = build_reduce_units_block(results, budget_tokens=5_000) + assert truncated + assert estimate_tokens(block) <= 5_000 + # 라벨(유닛 순서)은 절단 후에도 보존 + assert "[유닛 1/8" in block + + +def test_reduce_block_hard_cut_floor(): + # min_detail_chars floor 에 막혀 비례 절단으로 불충분한 극단 케이스 — 하드 컷 발동 + results = [_result(i, "가" * 300) for i in range(50)] + block, truncated = build_reduce_units_block(results, budget_tokens=500) + assert truncated + assert estimate_tokens(block) <= 500 + + +def test_reduce_block_preserves_inconsistencies(): + results = [ + _result(0, "가" * 50, inc=[{"kind": "version_drift", "desc": "개정판 차이"}]), + ] + block, _ = build_reduce_units_block(results, budget_tokens=10_000) + assert "불일치(version_drift): 개정판 차이" in block diff --git a/tests/test_deep_summary_mapreduce.py b/tests/test_deep_summary_mapreduce.py new file mode 100644 index 0000000..5165534 --- /dev/null +++ b/tests/test_deep_summary_mapreduce.py @@ -0,0 +1,249 @@ +"""presegment PR2 — deep_summary_worker map-reduce/HOLD 배선 단위테스트. + +worker-process 레벨(DB 필요)의 큐 상태 전이는 라이브 E2E 로 검증하고, 여기서는 +새 메커니즘의 seam 을 단위 검증한다 (test_fair_share.py 선례): + - _hold_awaiting_split: payload 마킹 commit 후 StageDeferred(HOLD_RETRY_MINUTES). + - _process_map_reduce: 유닛별 map → reduce → doc 필드 기록 / 모든 콜 캡 준수 / + payload.presegment.map_results 유닛 단위 persist(멱등 재개) / 실패 유닛 raise / + drain 보류(StageDeferred) 시 완료 유닛 보존. +""" + +from __future__ import annotations + +import os +import sys +from types import SimpleNamespace + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from ai.envelope import EscalationEnvelope # noqa: E402 +from models.queue import StageDeferred # noqa: E402 +from services.summarize_units import ( # noqa: E402 + CAP_TOKENS, + estimate_tokens, + plan_summarize_units, +) +import workers.deep_summary_worker as dsw # noqa: E402 + + +# ─── fixtures ──────────────────────────────────────────────────────────────── + +# 30 절 × 한글 2,000자 ≈ 31.7K tok (> TRIGGER 25K) · 절당 ≈ 1,060 tok (< CAP) → auto +GIANT_AUTO_MD = "\n".join(f"# 절 {i}\n" + ("가" * 2_000) for i in range(30)) +# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole +GIANT_WHOLE_MD = "# 통짜\n" + ("가" * 60_000) + +MAP_JSON = ( + '{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",' + ' "inconsistencies": [{"kind": "version_drift", "desc": "개정판 차이"}],' + ' "confidence": 0.9}' +) +REDUCE_JSON = ( + '{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",' + ' "inconsistencies": [], "confidence": 0.8}' +) + + +class FakeSession: + def __init__(self): + self.commits = 0 + + async def commit(self): + self.commits += 1 + + +class FakeClient: + """deep 슬롯 보유 클라이언트 — call_deep_or_defer 가 call_deep 을 타게 한다.""" + + def __init__(self, responses=None, fail_indexes=frozenset(), defer_from=None): + self.ai = SimpleNamespace( + deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000) + ) + self.prompts: list[str] = [] + self._fail_indexes = fail_indexes # 이 순번(0-based) 콜은 파싱 불가 응답 + self._defer_from = defer_from # 이 순번부터 연결 실패(StageDeferred 변환 대상) + + async def call_deep(self, prompt: str, system=None) -> str: + import httpx + + idx = len(self.prompts) + if self._defer_from is not None and idx >= self._defer_from: + raise httpx.ConnectError("macbook down") + self.prompts.append(prompt) + if idx in self._fail_indexes: + return "정상 JSON 아님" + if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커 + return REDUCE_JSON + return MAP_JSON + + async def close(self): + pass + + +def _doc(): + return SimpleNamespace( + id=999, + extracted_text=GIANT_AUTO_MD, + ai_detail_summary=None, + ai_inconsistencies=None, + ai_analysis_tier="triage", + ai_processed_at=None, + ) + + +def _envelope(): + return EscalationEnvelope( + from_stage="classify", + escalation_reasons=("long_context",), + risk_flags=(), + distilled_context="4B 요지", + original_pointers={"doc_ids": [999]}, + ) + + +@pytest.fixture +def _patch_telemetry(monkeypatch): + events: list[dict] = [] + + async def fake_record(**kwargs): + events.append(kwargs) + + monkeypatch.setattr(dsw, "record_analyze_event", fake_record) + return events + + +# ─── _hold_awaiting_split ──────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_hold_marks_payload_and_defers(): + plan = plan_summarize_units(GIANT_WHOLE_MD) + assert plan.mode == "map_reduce" and plan.tier == "whole" + + session, row = FakeSession(), SimpleNamespace(payload={"envelope": {"x": 1}}) + with pytest.raises(StageDeferred) as ei: + await dsw._hold_awaiting_split(session, row, plan, document_id=999) + + assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES + assert session.commits == 1 # 마킹이 defer 전에 commit — consumer 재읽기에서 보존 + preseg = row.payload["presegment"] + assert preseg["awaiting_split"] is True + assert preseg["tier"] == "whole" + assert preseg["units"] == len(plan.units) + assert row.payload["envelope"] == {"x": 1} # 기존 payload 병합 보존 + + +# ─── _process_map_reduce — 정상 경로 ──────────────────────────────────────── + +@pytest.mark.asyncio +async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry): + plan = plan_summarize_units(GIANT_AUTO_MD) + assert plan.mode == "map_reduce" and plan.tier == "auto" + n = len(plan.units) + assert n >= 2 # greedy-pack 이 실제로 유닛을 나눴는지 + + client = FakeClient() + monkeypatch.setattr(dsw, "AIClient", lambda: client) + doc, session = _doc(), FakeSession() + row = SimpleNamespace(payload={"envelope": {"x": 1}}) + + await dsw._process_map_reduce( + doc, row, _envelope(), "generic", plan, session, + defer_on_deep_unavailable=False, + ) + + # 콜 수 = 유닛 map n + reduce 1 + assert len(client.prompts) == n + 1 + # 검증 게이트: 모든 콜 est_tokens <= CAP + 오버헤드(정책 템플릿+envelope ~3K) + for p in client.prompts: + assert estimate_tokens(p) <= CAP_TOKENS + 3_000 + # doc 기록 = reduce 출력, 불일치 = map 유닛 합본 dedup + assert doc.ai_detail_summary == "최종 상세." + assert doc.ai_analysis_tier == "deep" + assert doc.ai_inconsistencies == [{"kind": "version_drift", "desc": "개정판 차이"}] + # 유닛 단위 persist — 유닛마다 commit + assert row.payload["presegment"]["units"] == n + assert len(row.payload["presegment"]["map_results"]) == n + assert session.commits == n + # telemetry 1건 (reduce 기준) + events = _patch_telemetry + assert len(events) == 1 and events[0]["error_code"] is None + + +# ─── 멱등 재개 ─────────────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_map_reduce_resume_skips_done_units(monkeypatch, _patch_telemetry): + plan = plan_summarize_units(GIANT_AUTO_MD) + n = len(plan.units) + + client = FakeClient() + monkeypatch.setattr(dsw, "AIClient", lambda: client) + done_unit = { + "index": 0, "titles": ["절 0"], "tldr": "이전 요약", "detail": "이전 상세.", + "inconsistencies": [], + } + row = SimpleNamespace(payload={ + "envelope": {"x": 1}, + "presegment": {"map_results": {"0": done_unit}}, + }) + doc, session = _doc(), FakeSession() + + await dsw._process_map_reduce( + doc, row, _envelope(), "generic", plan, session, + defer_on_deep_unavailable=False, + ) + + # 유닛 0 은 재호출 안 함 — map (n-1) + reduce 1 + assert len(client.prompts) == n + assert row.payload["presegment"]["map_results"]["0"]["detail"] == "이전 상세." + assert doc.ai_detail_summary == "최종 상세." + + +# ─── map 유닛 실패 → raise (성공분 persist) ───────────────────────────────── + +@pytest.mark.asyncio +async def test_map_unit_parse_failure_raises_but_persists_good_units( + monkeypatch, _patch_telemetry +): + plan = plan_summarize_units(GIANT_AUTO_MD) + n = len(plan.units) + + client = FakeClient(fail_indexes={1}) # 두 번째 map 콜만 파싱 불가 + monkeypatch.setattr(dsw, "AIClient", lambda: client) + doc, session = _doc(), FakeSession() + row = SimpleNamespace(payload={"envelope": {"x": 1}}) + + with pytest.raises(ValueError, match="map 유닛"): + await dsw._process_map_reduce( + doc, row, _envelope(), "generic", plan, session, + defer_on_deep_unavailable=False, + ) + + # 성공 유닛(n-1)은 persist — 재시도 시 실패 1건만 재호출 + assert len(row.payload["presegment"]["map_results"]) == n - 1 + assert "1" not in row.payload["presegment"]["map_results"] + assert doc.ai_detail_summary is None # doc 은 미기록 + assert _patch_telemetry == [] # 가짜 완료 이벤트 없음 + + +# ─── drain 보류 — 완료 유닛 보존 + StageDeferred 전파 ─────────────────────── + +@pytest.mark.asyncio +async def test_map_defer_propagates_and_keeps_progress(monkeypatch, _patch_telemetry): + plan = plan_summarize_units(GIANT_AUTO_MD) + + client = FakeClient(defer_from=1) # 첫 유닛 성공 후 맥북 절단 + monkeypatch.setattr(dsw, "AIClient", lambda: client) + doc, session = _doc(), FakeSession() + row = SimpleNamespace(payload={"envelope": {"x": 1}}) + + with pytest.raises(StageDeferred): + await dsw._process_map_reduce( + doc, row, _envelope(), "generic", plan, session, + defer_on_deep_unavailable=True, # drain 시멘틱 — 보류 전파 + ) + + assert len(row.payload["presegment"]["map_results"]) == 1 + assert doc.ai_detail_summary is None