feat(summarize): presegment PR2 — deep_summary 분기 + HOLD 배선 (TIER1 로컬 map-reduce)

plan ds-presegment-mapreduce-2. TRIGGER(25K tok) 이하 = 기존 단일콜 byte-불변 무회귀.
초과 시 3-way over% 게이트: auto=유닛별 map(26B)→reduce(26B, p3c_deep_summary_reduce
변형) → ai_detail_summary 동일 기록(불일치=reduce+map 합본 dedup) / hybrid·whole=
HOLD(payload.presegment.awaiting_split + StageDeferred 24h, 맥미니 미전송 — 알람·
클로드 유인 분할은 PR3).

- 유닛 단위 멱등 재개: 성공 유닛 즉시 payload.map_results commit — 502/defer/재시작
  후 완료 유닛 skip, 실패 유닛만 raise→기존 attempts/백오프 재사용
- 모든 LLM 콜 캡(12K tok) 이하 — map=greedy-pack 보장, reduce=build_reduce_units_block
  비례 절단 보장, est_tokens 로그로 단정 가능
- 콜 사이 gate 해제 → 짧은 인터랙티브 요청 interleave (허브 굶김 해소 본체)
- fix: summarize_units 의 `from app.services...` 절대 import — 컨테이너(빌드 컨텍스트
  ./app)에 app 패키지가 없어 배선 시 ModuleNotFoundError 나는 PR1 잠복 버그 → 상대
  import 로 수정 (컨테이너/repo-root 테스트 양쪽 동작)
- tests: 헬퍼 6 + worker seam 5 (map-reduce e2e·재개·유닛실패·drain 보류·HOLD) —
  PR1 15 포함 26 passed, 인접 policy/hier_decomp/fair_share 123 passed

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-07-02 09:14:22 +09:00
parent 51e8034759
commit c2077b3108
6 changed files with 724 additions and 1 deletions
+2
View File
@@ -36,6 +36,8 @@ KNOWN_4B_TASKS = {
} }
KNOWN_26B_TASKS = { KNOWN_26B_TASKS = {
"p3c_deep_summary", "p3c_deep_summary",
# presegment PR2 — 거대문서 map-reduce 의 reduce 단계 (요약들의 요약)
"p3c_deep_summary_reduce",
"p4b_synthesis", "p4b_synthesis",
} }
@@ -0,0 +1,44 @@
[System]
너는 긴 문서·문서 묶음 분석가다. 이 문서는 한 번에 처리하기에 너무 커서, 원문을 순서대로 유닛으로 나눠 각 유닛을 먼저 요약했다(map 단계). 아래 "유닛 요약"들은 원문 순서 그대로이며 문서 전체를 빠짐없이 커버한다. 너는 이를 종합해 문서 전체의 최종 분석을 작성한다(reduce 단계).
subject_description: {subject_description}
{forbidden_block}
envelope 를 읽는 순서:
1. risk_flags 를 먼저 본다. 어떤 위험 때문에 올라온 것인지 파악.
2. synthesis_directives 를 system 지시로 간주하여 반드시 준수.
3. distilled_context 는 "참고 요지"일 뿐, 근거는 유닛 요약에서 재확인.
작성 규칙:
- TL;DR (1문장, 최대 60자)
- 핵심 (bullets 5개, 각 30~80자)
- 상세 (2~4 문단, 각 3~5문장) — 유닛(섹션) 순서의 논리 흐름을 보전하며 문서 전체를 관통하는 서술. 특정 유닛만 편식하지 말 것.
- 유닛 요약에 없는 정보 금지 (hallucination 금지). 숫자·조문·인용은 유닛 요약에 있는 것만 사용.
- 유닛 요약의 "불일치(...)" 줄들은 중복 제거해 inconsistencies 로 보전 — 임의로 버리지 않는다.
- synthesis_directives 의 문구 규칙 ("원인은 ~" 금지 등) 반드시 준수.
- multi_reference_synthesis flag 있으면 레퍼런스별 입장 분리 기술, 종합 권고 금지.
출력 (JSON only):
{{
"mode": "single|bundle",
"tldr": "...",
"bullets": ["..."],
"detail": "...\\n\\n...",
"bundle_flow": ["..."] | null,
"inconsistencies": ["..."] | null,
"entities_confirmed": {{
"people": [{{"name": "...", "evidence": "..."}}],
"orgs": [...],
"projects": [...]
}},
"directives_applied": ["..."],
"confidence": 0.0~1.0
}}
[User]
Envelope:
{{escalation_envelope_json}}
유닛 요약 (총 {{unit_count}}개, 원문 순서 — 각 블록 = 원문 한 구간의 요약):
{{unit_summaries}}
+59 -1
View File
@@ -24,7 +24,10 @@ from __future__ import annotations
import sys import sys
from dataclasses import dataclass, field from dataclasses import dataclass, field
from app.services.hier_decomp.builder import HierNode, build_hier_tree # 상대 import — 컨테이너(services.*)와 repo-root 테스트(app.services.*) 양쪽에서 동작.
# (구 `from app.services...` 절대 import 는 컨테이너에 app 패키지가 없어 ModuleNotFoundError —
# PR1 은 소비자 0 이라 잠복했던 버그, PR2 배선 시점에 수정.)
from .hier_decomp.builder import HierNode, build_hier_tree
CAP_TOKENS = 12_000 CAP_TOKENS = 12_000
TRIGGER_TOKENS = 25_000 TRIGGER_TOKENS = 25_000
@@ -164,3 +167,58 @@ def plan_summarize_units(
over_pct=round(pct, 2), over_pct=round(pct, 2),
units=greedy_pack(leaves, cap), units=greedy_pack(leaves, cap),
) )
# ─── PR2 — map/reduce 프롬프트 조립 순수함수 (deep_summary_worker 가 소비) ───
def render_map_slice(unit: SummarizeUnit, total_units: int) -> str:
"""map 콜의 {original_text_slices} 대체 — 유닛 위치·섹션 라벨 + 본문."""
titles = " · ".join(t for t in unit.section_titles if t) or "(무제 구간)"
return f"[유닛 {unit.index + 1}/{total_units} — 섹션: {titles}]\n{unit.text}"
def _format_unit_summary(res: dict, total_units: int) -> str:
"""map 결과 1건 → reduce 입력 블록. res 키 = index/titles/tldr/detail/inconsistencies."""
titles = " · ".join(t for t in (res.get("titles") or []) if t) or "(무제 구간)"
lines = [f"[유닛 {int(res.get('index', 0)) + 1}/{total_units} — 섹션: {titles}]"]
if res.get("tldr"):
lines.append(f"TLDR: {res['tldr']}")
if res.get("detail"):
lines.append(str(res["detail"]))
for inc in res.get("inconsistencies") or []:
if isinstance(inc, dict):
lines.append(f"불일치({inc.get('kind', '')}): {inc.get('desc', '')}")
return "\n".join(lines)
def build_reduce_units_block(
results: list[dict],
budget_tokens: int,
*,
min_detail_chars: int = 200,
) -> tuple[str, bool]:
"""reduce 입력 블록 조립 — budget_tokens 이하 보장(캡 초과 0 검증 게이트의 reduce 측).
초과 시 detail 만 비례 절단(라벨·TLDR·불일치 보전, 원문 순서 유지). 반환 (block, truncated).
"""
total_units = len(results)
work = [dict(r) for r in results]
truncated = False
for _ in range(4):
block = "\n\n".join(_format_unit_summary(r, total_units) for r in work)
est = estimate_tokens(block)
if est <= budget_tokens:
return block, truncated
ratio = budget_tokens / est
for r in work:
detail = str(r.get("detail") or "")
keep = max(min_detail_chars, int(len(detail) * ratio * 0.9))
if len(detail) > keep:
r["detail"] = detail[:keep] + "…(절단)"
truncated = True
# 최후 방어 — 비례 절단이 floor(min_detail_chars)에 막히면 문자 하드 컷(KO 최악 비율 가정)
block = "\n\n".join(_format_unit_summary(r, total_units) for r in work)
if estimate_tokens(block) > budget_tokens:
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
truncated = True
return block, truncated
+290
View File
@@ -10,7 +10,9 @@ EscalationEnvelope + subject_domain 을 읽어, PR-A policy 템플릿 `p3c_deep_
from __future__ import annotations from __future__ import annotations
import asyncio
import json import json
import os
import time import time
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -29,10 +31,25 @@ from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.document_telemetry import record_analyze_event from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate from services.search.llm_gate import Priority, acquire_mlx_gate
from services.summarize_units import (
CAP_TOKENS,
UnitPlan,
build_reduce_units_block,
estimate_tokens,
plan_summarize_units,
render_map_slice,
)
logger = setup_logger("deep_summary_worker") logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary" DEEP_SUMMARY_TASK = "p3c_deep_summary"
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
REDUCE_TASK = "p3c_deep_summary_reduce"
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음.
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
# inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외) # inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외)
ALLOWED_INCONSISTENCY_KINDS = { ALLOWED_INCONSISTENCY_KINDS = {
@@ -94,6 +111,25 @@ async def process(
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw)) envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
# hybrid/whole → HOLD(awaiting_split) — 맥미니 미전송, 클로드 유인 분할은 PR3
# 게이트/유닛은 전체 extracted_text 기준 — 단일콜의 head/mid/tail "가운데 폐기"를
# 전 유닛 커버리지로 대체한다. build_hier_tree 가 거대 md 에서 초 단위 CPU 라
# 이벤트루프 점유 회피 위해 to_thread (presegment_worker._read_toc 와 동일 패턴).
unit_plan = await asyncio.to_thread(plan_summarize_units, doc.extracted_text or "")
if unit_plan.mode == "map_reduce":
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
if unit_plan.tier != "auto" or not unit_plan.units:
await _hold_awaiting_split(session, queue_row, unit_plan, document_id)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, unit_plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반) # 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반)
slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers) slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers)
@@ -214,6 +250,260 @@ async def process(
) )
async def _hold_awaiting_split(
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int
) -> None:
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복.
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
"""
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
preseg.update({
"awaiting_split": True,
"tier": plan.tier,
"over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens,
"units": len(plan.units),
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용)
"oversized_sections": [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20],
})
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.info(
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)"
)
raise StageDeferred(
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _call_26b(
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
):
"""map/reduce 공용 26B 호출 — 단일콜 경로와 동일한 deep 슬롯 우선 + fair-share 폴백.
반환 (raw, used_cfg). 맥북(deep) 불가 시 consumer 경로는 맥미니 primary 로 즉시
처리(동일 모델 — 강등 아님), drain 경로는 StageDeferred 전파(맥북 레버 시멘틱).
"""
deep_cfg = client.ai.deep
if deep_cfg is not None:
try:
return await call_deep_or_defer(client, prompt), deep_cfg
except StageDeferred:
if defer_on_deep_unavailable:
raise
logger.info(f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)")
async with acquire_mlx_gate(Priority.BACKGROUND):
return await client.call_primary(prompt), settings.ai.primary
def _parse_deep_output(raw: str) -> tuple[DeepSummaryOutput | None, str | None]:
"""raw → DeepSummaryOutput. 단일콜 경로와 동일한 3단 파서. 실패 시 (None, parse_error)."""
try:
parsed = _parse_outermost_json(raw) or parse_json_response(raw)
if not parsed:
parsed = _regex_extract_fields(raw)
return DeepSummaryOutput.model_validate(parsed or {}), None
except (ValidationError, ValueError, TypeError) as exc:
return None, f"parse:{type(exc).__name__}"
async def _process_map_reduce(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
plan: UnitPlan,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""TIER1 자동 — 유닛별 map(26B) → reduce(26B) → 단일콜과 동일 필드 기록.
멱등 재개: 성공 유닛은 payload.presegment.map_results 에 즉시 commit —
502/defer/재시작 후 재클레임 시 완료 유닛은 건너뛴다. 유닛 인덱스는
plan_summarize_units 가 같은 extracted_text 에 결정적이라 attempt 간 안정.
파싱 실패 유닛이 남으면 raise → queue_consumer 의 기존 attempts/백오프 재사용
(실패 유닛만 재호출되므로 재시도 비용 = 잔여 유닛뿐).
"""
document_id = doc.id
units = plan.units
n = len(units)
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
preseg.pop("awaiting_split", None) # 재계획으로 auto 가 된 경우 HOLD 마킹 해제
map_results: dict = dict(preseg.get("map_results") or {})
logger.info(
f"[deep] id={document_id} map_reduce 시작 units={n} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} resume={len(map_results)}/{n}"
)
rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
envelope_injection = envelope.to_system_injection()
client = AIClient()
start = time.perf_counter()
used_cfg = client.ai.deep or settings.ai.primary
failed_units: list[int] = []
try:
# ── map: 유닛별 26B (콜 사이마다 gate 를 놓아 짧은 인터랙티브 요청이 끼어든다) ──
for unit in units:
key = str(unit.index)
if key in map_results:
continue
prompt = (
rendered
.replace("{escalation_envelope_json}", envelope_injection)
.replace("{original_text_slices}", render_map_slice(unit, n))
)
# 검증 게이트 "모든 LLM 콜 캡 초과 0" 을 로그로 단정 가능하게 남긴다.
logger.info(
f"[deep] id={document_id} map {unit.index + 1}/{n} "
f"unit_tokens={unit.est_tokens} prompt_est_tokens={estimate_tokens(prompt)} "
f"cap={CAP_TOKENS}"
)
raw, used_cfg = await _call_26b(
client, prompt,
defer_on_deep_unavailable=defer_on_deep_unavailable,
document_id=document_id,
)
out, perr = _parse_deep_output(raw)
if out is None or not (out.detail or out.tldr):
# 실패 유닛은 persist 하지 않음 — 재시도가 이 유닛만 다시 호출한다.
failed_units.append(unit.index)
logger.warning(
f"[deep] id={document_id} map {unit.index + 1}/{n} 결과 비었음/파싱 실패"
f"({perr}) — 유닛 재시도 대상"
)
continue
map_results[key] = {
"index": unit.index,
"titles": [t for t in unit.section_titles if t][:8],
"tldr": out.tldr,
"detail": out.detail,
"inconsistencies": _filter_inconsistencies(out.inconsistencies or []),
}
preseg.update({
"tier": plan.tier,
"over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens,
"units": n,
"map_results": map_results,
})
payload["presegment"] = dict(preseg)
queue_row.payload = dict(payload) # 재할당 = JSONB 변경 감지
await session.commit() # 유닛 단위 멱등 재개 지점
if failed_units:
raise ValueError(
f"map 유닛 {len(failed_units)}/{n}건 결과 없음 — 재시도 대상: {failed_units[:10]}"
)
# ── reduce: 요약들의 요약 1콜 (유닛 블록도 캡 이하로 절단 보장) ──
reduce_rendered = render_26b(REDUCE_TASK, subject_domain)
base_prompt = (
reduce_rendered
.replace("{escalation_envelope_json}", envelope_injection)
.replace("{unit_count}", str(n))
)
budget = max(
REDUCE_BUDGET_FLOOR_TOKENS, CAP_TOKENS - estimate_tokens(base_prompt)
)
ordered = [map_results[str(u.index)] for u in units]
block, reduce_truncated = build_reduce_units_block(ordered, budget)
reduce_prompt = base_prompt.replace("{unit_summaries}", block)
logger.info(
f"[deep] id={document_id} reduce units={n} "
f"prompt_est_tokens={estimate_tokens(reduce_prompt)} cap={CAP_TOKENS} "
f"truncated={reduce_truncated}"
)
raw, used_cfg = await _call_26b(
client, reduce_prompt,
defer_on_deep_unavailable=defer_on_deep_unavailable,
document_id=document_id,
)
except StageDeferred:
logger.info(
f"[deep] id={document_id} map_reduce 보류 — 완료 유닛 {len(map_results)}/{n} 보존"
)
raise
except Exception as exc:
# 단일콜 경로와 동일 — 호출 실패는 전파해 queue_consumer 가 재시도/dead-letter 처리.
logger.warning(f"[deep] id={document_id} map_reduce 실패: {exc}")
raise
finally:
await client.close()
latency_ms = int((time.perf_counter() - start) * 1000)
deep_out, parse_error = _parse_deep_output(raw)
if deep_out is None:
# 단일콜 경로와 동일 시멘틱 — doc 미기록(legacy 결과 보존), 이벤트로 가시화.
deep_out = DeepSummaryOutput()
logger.warning(f"[deep] id={document_id} reduce 파싱 실패 ({parse_error}) — doc 미기록")
if not parse_error:
doc.ai_detail_summary = (deep_out.detail or "").strip() or None
# 불일치 = reduce 출력 + map 유닛 합본 dedup — reduce 가 떨궈도 유닛 발견분 보전.
merged = _filter_inconsistencies(deep_out.inconsistencies or [])
seen = {(i["kind"], i["desc"]) for i in merged}
for res in ordered:
for inc in res.get("inconsistencies") or []:
k = (inc.get("kind"), inc.get("desc"))
if k not in seen:
seen.add(k)
merged.append(inc)
doc.ai_inconsistencies = merged
doc.ai_analysis_tier = "deep"
doc.ai_processed_at = datetime.now(timezone.utc)
try:
pv = compute_policy_version(REDUCE_TASK)
except Exception:
pv = None
await record_analyze_event(
doc_id=document_id,
user_id=None,
mode="summary_deep",
text_limit=used_cfg.context_char_limit or 260000,
truncated=reduce_truncated,
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=used_cfg.model,
prompt_version=(f"{REDUCE_TASK}@{pv}" if pv else REDUCE_TASK),
error_code=parse_error,
source="document_server",
subject_domain=subject_domain,
risk_flags=list(envelope.risk_flags),
high_impact_task=None,
escalation_reasons=list(envelope.escalation_reasons),
confidence=deep_out.confidence,
policy_version=pv,
shadow_would_route_to="primary",
tier="primary",
escalated_to_26b=True,
suppressed_reason=None,
)
logger.info(
f"[deep] id={document_id} map_reduce 완료 units={n} "
f"detail_len={len(doc.ai_detail_summary or '')} inc={len(doc.ai_inconsistencies or [])} "
f"latency_ms={latency_ms} parse_error={parse_error}"
)
def _build_text_slices(text: str, pointers: dict) -> str: def _build_text_slices(text: str, pointers: dict) -> str:
"""original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다. """original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다.
@@ -0,0 +1,80 @@
"""summarize_units PR2 헬퍼 단위테스트 — map/reduce 프롬프트 조립 순수함수.
핵심 불변식:
- render_map_slice: 유닛 위치(1-based)/섹션 라벨 + 본문 그대로 (손실 0).
- build_reduce_units_block: 어떤 입력에도 반환 블록 est_tokens <= budget (캡 초과 0
검증 게이트의 reduce 측). 절단은 detail 만 — 라벨/TLDR/불일치/순서 보존.
pytest + 단독 실행 양쪽 지원:
PYTHONPATH=. pytest tests/summarize_units/ -q
"""
from __future__ import annotations
from app.services.summarize_units import (
SummarizeUnit,
build_reduce_units_block,
estimate_tokens,
render_map_slice,
)
def _result(idx: int, detail: str, *, tldr: str = "요약", inc: list | None = None) -> dict:
return {
"index": idx,
"titles": [f"섹션{idx}"],
"tldr": tldr,
"detail": detail,
"inconsistencies": inc or [],
}
# ---------- render_map_slice ----------
def test_render_map_slice_label_and_body():
unit = SummarizeUnit(index=2, section_titles=["개요", None, "본론"], text="본문입니다")
out = render_map_slice(unit, total_units=5)
assert out.startswith("[유닛 3/5 — 섹션: 개요 · 본론]\n")
assert out.endswith("본문입니다")
def test_render_map_slice_untitled():
unit = SummarizeUnit(index=0, section_titles=[None], text="x")
assert "(무제 구간)" in render_map_slice(unit, total_units=1)
# ---------- build_reduce_units_block ----------
def test_reduce_block_within_budget_untouched():
results = [_result(i, "" * 100) for i in range(3)]
block, truncated = build_reduce_units_block(results, budget_tokens=11_000)
assert not truncated
# 순서/라벨/TLDR 보존
assert block.index("[유닛 1/3") < block.index("[유닛 2/3") < block.index("[유닛 3/3")
assert "TLDR: 요약" in block
assert "" * 100 in block
def test_reduce_block_truncates_to_budget():
# 유닛 8개 × 한글 detail 5,000자 ≈ 21K tok — budget 5,000 으로 절단 강제
results = [_result(i, "" * 5_000) for i in range(8)]
block, truncated = build_reduce_units_block(results, budget_tokens=5_000)
assert truncated
assert estimate_tokens(block) <= 5_000
# 라벨(유닛 순서)은 절단 후에도 보존
assert "[유닛 1/8" in block
def test_reduce_block_hard_cut_floor():
# min_detail_chars floor 에 막혀 비례 절단으로 불충분한 극단 케이스 — 하드 컷 발동
results = [_result(i, "" * 300) for i in range(50)]
block, truncated = build_reduce_units_block(results, budget_tokens=500)
assert truncated
assert estimate_tokens(block) <= 500
def test_reduce_block_preserves_inconsistencies():
results = [
_result(0, "" * 50, inc=[{"kind": "version_drift", "desc": "개정판 차이"}]),
]
block, _ = build_reduce_units_block(results, budget_tokens=10_000)
assert "불일치(version_drift): 개정판 차이" in block
+249
View File
@@ -0,0 +1,249 @@
"""presegment PR2 — deep_summary_worker map-reduce/HOLD 배선 단위테스트.
worker-process 레벨(DB 필요)의 큐 상태 전이는 라이브 E2E 로 검증하고, 여기서는
새 메커니즘의 seam 을 단위 검증한다 (test_fair_share.py 선례):
- _hold_awaiting_split: payload 마킹 commit 후 StageDeferred(HOLD_RETRY_MINUTES).
- _process_map_reduce: 유닛별 map → reduce → doc 필드 기록 / 모든 콜 캡 준수 /
payload.presegment.map_results 유닛 단위 persist(멱등 재개) / 실패 유닛 raise /
drain 보류(StageDeferred) 시 완료 유닛 보존.
"""
from __future__ import annotations
import os
import sys
from types import SimpleNamespace
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from ai.envelope import EscalationEnvelope # noqa: E402
from models.queue import StageDeferred # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
estimate_tokens,
plan_summarize_units,
)
import workers.deep_summary_worker as dsw # noqa: E402
# ─── fixtures ────────────────────────────────────────────────────────────────
# 30 절 × 한글 2,000자 ≈ 31.7K tok (> TRIGGER 25K) · 절당 ≈ 1,060 tok (< CAP) → auto
GIANT_AUTO_MD = "\n".join(f"# 절 {i}\n" + ("" * 2_000) for i in range(30))
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole
GIANT_WHOLE_MD = "# 통짜\n" + ("" * 60_000)
MAP_JSON = (
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
' "inconsistencies": [{"kind": "version_drift", "desc": "개정판 차이"}],'
' "confidence": 0.9}'
)
REDUCE_JSON = (
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
' "inconsistencies": [], "confidence": 0.8}'
)
class FakeSession:
def __init__(self):
self.commits = 0
async def commit(self):
self.commits += 1
class FakeClient:
"""deep 슬롯 보유 클라이언트 — call_deep_or_defer 가 call_deep 을 타게 한다."""
def __init__(self, responses=None, fail_indexes=frozenset(), defer_from=None):
self.ai = SimpleNamespace(
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
)
self.prompts: list[str] = []
self._fail_indexes = fail_indexes # 이 순번(0-based) 콜은 파싱 불가 응답
self._defer_from = defer_from # 이 순번부터 연결 실패(StageDeferred 변환 대상)
async def call_deep(self, prompt: str, system=None) -> str:
import httpx
idx = len(self.prompts)
if self._defer_from is not None and idx >= self._defer_from:
raise httpx.ConnectError("macbook down")
self.prompts.append(prompt)
if idx in self._fail_indexes:
return "정상 JSON 아님"
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
return REDUCE_JSON
return MAP_JSON
async def close(self):
pass
def _doc():
return SimpleNamespace(
id=999,
extracted_text=GIANT_AUTO_MD,
ai_detail_summary=None,
ai_inconsistencies=None,
ai_analysis_tier="triage",
ai_processed_at=None,
)
def _envelope():
return EscalationEnvelope(
from_stage="classify",
escalation_reasons=("long_context",),
risk_flags=(),
distilled_context="4B 요지",
original_pointers={"doc_ids": [999]},
)
@pytest.fixture
def _patch_telemetry(monkeypatch):
events: list[dict] = []
async def fake_record(**kwargs):
events.append(kwargs)
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
return events
# ─── _hold_awaiting_split ────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_hold_marks_payload_and_defers():
plan = plan_summarize_units(GIANT_WHOLE_MD)
assert plan.mode == "map_reduce" and plan.tier == "whole"
session, row = FakeSession(), SimpleNamespace(payload={"envelope": {"x": 1}})
with pytest.raises(StageDeferred) as ei:
await dsw._hold_awaiting_split(session, row, plan, document_id=999)
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
assert session.commits == 1 # 마킹이 defer 전에 commit — consumer 재읽기에서 보존
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True
assert preseg["tier"] == "whole"
assert preseg["units"] == len(plan.units)
assert row.payload["envelope"] == {"x": 1} # 기존 payload 병합 보존
# ─── _process_map_reduce — 정상 경로 ────────────────────────────────────────
@pytest.mark.asyncio
async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry):
plan = plan_summarize_units(GIANT_AUTO_MD)
assert plan.mode == "map_reduce" and plan.tier == "auto"
n = len(plan.units)
assert n >= 2 # greedy-pack 이 실제로 유닛을 나눴는지
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
doc, session = _doc(), FakeSession()
row = SimpleNamespace(payload={"envelope": {"x": 1}})
await dsw._process_map_reduce(
doc, row, _envelope(), "generic", plan, session,
defer_on_deep_unavailable=False,
)
# 콜 수 = 유닛 map n + reduce 1
assert len(client.prompts) == n + 1
# 검증 게이트: 모든 콜 est_tokens <= CAP + 오버헤드(정책 템플릿+envelope ~3K)
for p in client.prompts:
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
# doc 기록 = reduce 출력, 불일치 = map 유닛 합본 dedup
assert doc.ai_detail_summary == "최종 상세."
assert doc.ai_analysis_tier == "deep"
assert doc.ai_inconsistencies == [{"kind": "version_drift", "desc": "개정판 차이"}]
# 유닛 단위 persist — 유닛마다 commit
assert row.payload["presegment"]["units"] == n
assert len(row.payload["presegment"]["map_results"]) == n
assert session.commits == n
# telemetry 1건 (reduce 기준)
events = _patch_telemetry
assert len(events) == 1 and events[0]["error_code"] is None
# ─── 멱등 재개 ───────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_map_reduce_resume_skips_done_units(monkeypatch, _patch_telemetry):
plan = plan_summarize_units(GIANT_AUTO_MD)
n = len(plan.units)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
done_unit = {
"index": 0, "titles": ["절 0"], "tldr": "이전 요약", "detail": "이전 상세.",
"inconsistencies": [],
}
row = SimpleNamespace(payload={
"envelope": {"x": 1},
"presegment": {"map_results": {"0": done_unit}},
})
doc, session = _doc(), FakeSession()
await dsw._process_map_reduce(
doc, row, _envelope(), "generic", plan, session,
defer_on_deep_unavailable=False,
)
# 유닛 0 은 재호출 안 함 — map (n-1) + reduce 1
assert len(client.prompts) == n
assert row.payload["presegment"]["map_results"]["0"]["detail"] == "이전 상세."
assert doc.ai_detail_summary == "최종 상세."
# ─── map 유닛 실패 → raise (성공분 persist) ─────────────────────────────────
@pytest.mark.asyncio
async def test_map_unit_parse_failure_raises_but_persists_good_units(
monkeypatch, _patch_telemetry
):
plan = plan_summarize_units(GIANT_AUTO_MD)
n = len(plan.units)
client = FakeClient(fail_indexes={1}) # 두 번째 map 콜만 파싱 불가
monkeypatch.setattr(dsw, "AIClient", lambda: client)
doc, session = _doc(), FakeSession()
row = SimpleNamespace(payload={"envelope": {"x": 1}})
with pytest.raises(ValueError, match="map 유닛"):
await dsw._process_map_reduce(
doc, row, _envelope(), "generic", plan, session,
defer_on_deep_unavailable=False,
)
# 성공 유닛(n-1)은 persist — 재시도 시 실패 1건만 재호출
assert len(row.payload["presegment"]["map_results"]) == n - 1
assert "1" not in row.payload["presegment"]["map_results"]
assert doc.ai_detail_summary is None # doc 은 미기록
assert _patch_telemetry == [] # 가짜 완료 이벤트 없음
# ─── drain 보류 — 완료 유닛 보존 + StageDeferred 전파 ───────────────────────
@pytest.mark.asyncio
async def test_map_defer_propagates_and_keeps_progress(monkeypatch, _patch_telemetry):
plan = plan_summarize_units(GIANT_AUTO_MD)
client = FakeClient(defer_from=1) # 첫 유닛 성공 후 맥북 절단
monkeypatch.setattr(dsw, "AIClient", lambda: client)
doc, session = _doc(), FakeSession()
row = SimpleNamespace(payload={"envelope": {"x": 1}})
with pytest.raises(StageDeferred):
await dsw._process_map_reduce(
doc, row, _envelope(), "generic", plan, session,
defer_on_deep_unavailable=True, # drain 시멘틱 — 보류 전파
)
assert len(row.payload["presegment"]["map_results"]) == 1
assert doc.ai_detail_summary is None