Files
hyungi_document_server/app/workers/deep_summary_worker.py
T
hyungi 33427d4a42 feat(presegment): PR3 A+B — HOLD 웹훅 알람 + units_override 재개 경로
A. services/alerts.py 신설 — send_alert(title, message):
   ALERT_WEBHOOK_URL 미설정=no-op(프로세스당 1회 INFO), ALERT_WEBHOOK_KIND
   synochat(기본)|ntfy, httpx 5s, 실패=WARNING만(절대 raise 금지).
   deep_summary HOLD/override 거부 시 발화 — 문서 id·제목·tier·over%·토큰·
   초과 섹션 상위3·재개 예정시각·유인 분할 힌트. dedupe=payload.presegment
   .alerted_at(7일) — 매 24h 재보류마다 재알람 방지.

B. units_override 재개 — payload.presegment.units_override 존재 시 tier
   재판정·HOLD 없이 (start,end,title) 문자 오프셋 경계로 유닛 구성 후 기존
   PR2 map-reduce 그대로(유닛 단위 멱등 commit·reduce·doc 기록). 방어:
   source_len 불일치·형식 오류·유닛 추정토큰 > CAP*1.1 이면 실패 대신
   재-HOLD + 알람(잘못된 override 의 900s 콜 재생산 차단). override 없는
   문서는 기존 경로 무회귀.

summarize_units 에 공용 순수함수 추가: choose_override_source(md_content
우선)·validate_override_boundaries(단조·비중첩·범위·커버리지 90%·유닛 캡)·
units_from_boundaries·leaf_spans + greedy_pack 유닛에 leaf_indexes 기록
(export CLI 스팬 계산용).

plan ds-presegment-mapreduce-2 / env DEEP_SUMMARY_HOLD_RETRY_MINUTES 유지.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:02 +09:00

831 lines
35 KiB
Python

"""PR-B B-1 Deep Summary 워커 — 26B (primary MLX) 에스컬레이션 분석.
큐 stage 'deep_summary' 에서 pickup. classify_worker 가 enqueue 시 payload 로 실은
EscalationEnvelope + subject_domain 을 읽어, PR-A policy 템플릿 `p3c_deep_summary` 를
렌더링한 뒤 26B primary 를 호출한다. llm_gate Semaphore(1) 경유 — MLX 단일 인퍼런스 보호.
출력을 documents.ai_detail_summary / ai_inconsistencies 에 저장하고 ai_analysis_tier 를
'deep' 으로 전이. 실패는 무해하게 legacy 결과 보존.
"""
from __future__ import annotations
import asyncio
import json
import os
import time
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession
import json
import re
from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.alerts import send_alert
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.summarize_units import (
CAP_TOKENS,
UnitPlan,
build_reduce_units_block,
choose_override_source,
estimate_tokens,
plan_summarize_units,
render_map_slice,
units_from_boundaries,
validate_override_boundaries,
)
logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary"
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
REDUCE_TASK = "p3c_deep_summary_reduce"
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
# 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
ALERT_DEDUPE_DAYS = 7
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
OVERRIDE_CAP_SLACK = 1.1
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
# inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외)
ALLOWED_INCONSISTENCY_KINDS = {
"version_drift", "procedure_conflict", "source_conflict", "missing_basis",
}
class DeepSummaryOutput(BaseModel):
"""p3c_deep_summary (26B) 응답 스키마. 파싱 실패 시 기본값 + skip."""
mode: str = "single"
tldr: str = ""
bullets: list[str] = Field(default_factory=list)
detail: str = "" # 26B 가 채우는 상세 (detail_summary 로 저장)
bundle_flow: list[str] | None = None
inconsistencies: list[dict] | None = None
entities_confirmed: dict | None = None
directives_applied: list[str] | None = None
confidence: float = 0.5
async def process(
document_id: int, session: AsyncSession, *, defer_on_deep_unavailable: bool = False
) -> None:
"""deep_summary 큐 pickup → LLM 호출 → 필드 저장.
defer_on_deep_unavailable:
False (기본, consumer 경로) = 맥북(deep 슬롯) 우선 시도, 불가 시 즉시
맥미니 primary 로 처리. 2026-06-12 fair-share: 양 머신이 동일 모델
(Qwen3.6-27B-6bit)이라 폴백 = 품질 강등이 아니라 단순 분배.
True (queue_drain 전용) = 맥북 불가를 StageDeferred 로 올려 drain 이
보류 후 run 을 멈춘다 (drain = 맥북 분담 전용 레버 시멘틱 유지).
"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"deep_summary: document id={document_id} 없음")
# 최신 deep_summary 큐 행의 payload 조회 (queue_consumer 가 status='processing' 으로 세팅한 상태)
queue_row = (await session.execute(
select(ProcessingQueue)
.where(
ProcessingQueue.document_id == document_id,
ProcessingQueue.stage == "deep_summary",
ProcessingQueue.status == "processing",
)
.order_by(desc(ProcessingQueue.id))
.limit(1)
)).scalar_one_or_none()
if not queue_row:
logger.warning(f"[deep] processing 상태의 deep_summary row 없음 id={document_id}")
return
payload = queue_row.payload or {}
envelope_raw = payload.get("envelope")
subject_domain = payload.get("subject_domain") or "generic"
if not envelope_raw:
logger.error(f"[deep] envelope 없음 id={document_id} payload_keys={list(payload.keys())}")
raise ValueError("deep_summary payload 에 envelope 없음")
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
if (payload.get("presegment") or {}).get("units_override"):
await _process_units_override(
doc, queue_row, envelope, subject_domain, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
# hybrid/whole → HOLD(awaiting_split) — 맥미니 미전송, 클로드 유인 분할은 PR3
# 게이트/유닛은 전체 extracted_text 기준 — 단일콜의 head/mid/tail "가운데 폐기"를
# 전 유닛 커버리지로 대체한다. build_hier_tree 가 거대 md 에서 초 단위 CPU 라
# 이벤트루프 점유 회피 위해 to_thread (presegment_worker._read_toc 와 동일 패턴).
unit_plan = await asyncio.to_thread(plan_summarize_units, doc.extracted_text or "")
if unit_plan.mode == "map_reduce":
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
if unit_plan.tier != "auto" or not unit_plan.units:
await _hold_awaiting_split(
session, queue_row, unit_plan, document_id, doc_title=doc.title
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, unit_plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반)
slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers)
# PR-A 템플릿 렌더
try:
rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
except Exception as exc:
logger.exception(f"[deep] render_26b 실패 subject={subject_domain}: {exc}")
raise
prompt = (
rendered
.replace("{escalation_envelope_json}", envelope.to_system_injection())
.replace("{original_text_slices}", slices)
)
client = AIClient()
# ds-macbook-offload-1: deep 슬롯 구성 시 맥북 M5 Max 경유(라우터). 부재 시 기존 경로 그대로.
deep_cfg = client.ai.deep
used_cfg = deep_cfg or settings.ai.primary
latency_ms = 0
parse_error: str | None = None
deep_out = DeepSummaryOutput()
try:
start = time.perf_counter()
if deep_cfg is not None:
# 맥북 우선 — 맥미니 mlx gate 미점유(별 endpoint). doc 쓰기는 완주+파싱
# 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
try:
raw = await call_deep_or_defer(client, prompt)
except StageDeferred:
if defer_on_deep_unavailable:
raise # drain 전용 — 맥북 레버 시멘틱 (보류 후 run 종료)
# consumer 경로: 동일 모델이라 강등 아님 — 맥미니가 즉시 처리 (2026-06-12)
logger.info(
f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)"
)
used_cfg = settings.ai.primary
async with acquire_mlx_gate(Priority.BACKGROUND):
raw = await client.call_primary(prompt)
else:
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
raw = await client.call_primary(prompt)
latency_ms = int((time.perf_counter() - start) * 1000)
except StageDeferred:
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), drain 이 백오프 기록.
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
raise
except Exception as exc:
# 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가
# attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면
# worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 +
# tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치.
# 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전).
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
raise
finally:
await client.close()
if raw:
try:
# parse_json_response 는 중첩 JSON (entities_confirmed) 을 최외곽으로 오인하는
# 케이스가 있어 — deep_summary 응답에서 자주 발생 — 최외곽 추출 전용 helper 사용.
parsed = _parse_outermost_json(raw) or parse_json_response(raw)
if not parsed:
# 잘린 응답 fallback — field-level regex 로 detail/tldr/inconsistencies 추출
parsed = _regex_extract_fields(raw)
deep_out = DeepSummaryOutput.model_validate(parsed or {})
if not deep_out.detail and parsed and parsed.get("_fallback"):
logger.info(f"[deep] id={document_id} regex fallback parsed keys={list(parsed.keys())}")
except (ValidationError, ValueError, TypeError) as exc:
parse_error = f"parse:{type(exc).__name__}"
logger.warning(f"[deep] JSON 파싱/검증 실패 id={document_id}: {exc}")
if not parse_error:
doc.ai_detail_summary = (deep_out.detail or "").strip() or None
doc.ai_inconsistencies = _filter_inconsistencies(deep_out.inconsistencies or [])
doc.ai_analysis_tier = "deep"
doc.ai_processed_at = datetime.now(timezone.utc)
try:
pv = compute_policy_version(DEEP_SUMMARY_TASK)
except Exception:
pv = None
await record_analyze_event(
doc_id=document_id,
user_id=None,
mode="summary_deep",
text_limit=used_cfg.context_char_limit or 260000,
truncated=False,
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
# deep 슬롯 사용 시 실처리 모델(qwen-macbook alias) 기록 — 어느 머신이 처리했는지 추적
model_name=used_cfg.model,
prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
error_code=parse_error,
source="document_server",
subject_domain=subject_domain,
risk_flags=list(envelope.risk_flags),
high_impact_task=None,
escalation_reasons=list(envelope.escalation_reasons),
confidence=deep_out.confidence,
policy_version=pv,
shadow_would_route_to="primary",
tier="primary",
escalated_to_26b=True,
suppressed_reason=None,
)
logger.info(
f"[deep] id={document_id} subject={subject_domain} "
f"detail_len={len(doc.ai_detail_summary or '')} "
f"inc={len(doc.ai_inconsistencies or [])} latency_ms={latency_ms} "
f"parse_error={parse_error}"
)
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
ts = preseg.get("alerted_at")
if not ts:
return True
try:
prev = datetime.fromisoformat(str(ts))
except ValueError:
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
if prev.tzinfo is None:
prev = prev.replace(tzinfo=timezone.utc)
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
async def _hold_awaiting_split(
session: AsyncSession,
queue_row: ProcessingQueue,
plan: UnitPlan,
document_id: int,
doc_title: str | None = None,
) -> None:
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
"""
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
oversized = [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20]
preseg.update({
"awaiting_split": True,
"tier": plan.tier,
"over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens,
"units": len(plan.units),
# 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
"oversized_sections": oversized,
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.info(
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
)
if alert_due:
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
await send_alert(
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
(
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
f"tier={plan.tier} / over%={plan.over_pct} / "
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
f"초과 섹션(상위 3): {top3}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
),
)
raise StageDeferred(
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _rehold_bad_override(
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
) -> None:
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
"""
document_id = doc.id
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
preseg.update({
"awaiting_split": True,
"override_rejected": reason,
"override_rejected_at": now.isoformat(),
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
if alert_due:
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
await send_alert(
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
(
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
f"사유: {reason}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"수정: scripts/presegment_attended.py export --doc {document_id} "
f"→ apply --doc {document_id} --boundaries FILE"
),
)
raise StageDeferred(
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _process_units_override(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
"""
document_id = doc.id
preseg = dict((queue_row.payload or {}).get("presegment") or {})
override = preseg.get("units_override")
if isinstance(override, (list, tuple)):
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
override = {"boundaries": list(override)}
if not isinstance(override, dict):
await _rehold_bad_override(
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
)
source = override.get("source")
if source is None:
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
elif source in ("md_content", "extracted_text"):
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
else:
await _rehold_bad_override(
session, queue_row, doc, f"units_override.source={source!r} 미지원"
)
expected_len = override.get("source_len")
if expected_len is not None and expected_len != len(text_src):
await _rehold_bad_override(
session, queue_row, doc,
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
" (본문 재생성됨 — export 부터 재실행)",
)
check = validate_override_boundaries(
text_src,
override.get("boundaries") or [],
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
)
if not check.ok:
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
units = units_from_boundaries(text_src, check.boundaries)
plan = UnitPlan(
mode="map_reduce",
tier="override",
total_est_tokens=estimate_tokens(text_src),
over_pct=float(preseg.get("over_pct") or 0.0),
units=units,
)
logger.info(
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
async def _call_26b(
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
):
"""map/reduce 공용 26B 호출 — 단일콜 경로와 동일한 deep 슬롯 우선 + fair-share 폴백.
반환 (raw, used_cfg). 맥북(deep) 불가 시 consumer 경로는 맥미니 primary 로 즉시
처리(동일 모델 — 강등 아님), drain 경로는 StageDeferred 전파(맥북 레버 시멘틱).
"""
deep_cfg = client.ai.deep
if deep_cfg is not None:
try:
return await call_deep_or_defer(client, prompt), deep_cfg
except StageDeferred:
if defer_on_deep_unavailable:
raise
logger.info(f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)")
async with acquire_mlx_gate(Priority.BACKGROUND):
return await client.call_primary(prompt), settings.ai.primary
def _parse_deep_output(raw: str) -> tuple[DeepSummaryOutput | None, str | None]:
"""raw → DeepSummaryOutput. 단일콜 경로와 동일한 3단 파서. 실패 시 (None, parse_error)."""
try:
parsed = _parse_outermost_json(raw) or parse_json_response(raw)
if not parsed:
parsed = _regex_extract_fields(raw)
return DeepSummaryOutput.model_validate(parsed or {}), None
except (ValidationError, ValueError, TypeError) as exc:
return None, f"parse:{type(exc).__name__}"
async def _process_map_reduce(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
plan: UnitPlan,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""TIER1 자동 — 유닛별 map(26B) → reduce(26B) → 단일콜과 동일 필드 기록.
멱등 재개: 성공 유닛은 payload.presegment.map_results 에 즉시 commit —
502/defer/재시작 후 재클레임 시 완료 유닛은 건너뛴다. 유닛 인덱스는
plan_summarize_units 가 같은 extracted_text 에 결정적이라 attempt 간 안정.
파싱 실패 유닛이 남으면 raise → queue_consumer 의 기존 attempts/백오프 재사용
(실패 유닛만 재호출되므로 재시도 비용 = 잔여 유닛뿐).
"""
document_id = doc.id
units = plan.units
n = len(units)
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
preseg.pop("awaiting_split", None) # 재계획으로 auto 가 된 경우 HOLD 마킹 해제
map_results: dict = dict(preseg.get("map_results") or {})
logger.info(
f"[deep] id={document_id} map_reduce 시작 units={n} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} resume={len(map_results)}/{n}"
)
rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
envelope_injection = envelope.to_system_injection()
client = AIClient()
start = time.perf_counter()
used_cfg = client.ai.deep or settings.ai.primary
failed_units: list[int] = []
try:
# ── map: 유닛별 26B (콜 사이마다 gate 를 놓아 짧은 인터랙티브 요청이 끼어든다) ──
for unit in units:
key = str(unit.index)
if key in map_results:
continue
prompt = (
rendered
.replace("{escalation_envelope_json}", envelope_injection)
.replace("{original_text_slices}", render_map_slice(unit, n))
)
# 검증 게이트 "모든 LLM 콜 캡 초과 0" 을 로그로 단정 가능하게 남긴다.
logger.info(
f"[deep] id={document_id} map {unit.index + 1}/{n} "
f"unit_tokens={unit.est_tokens} prompt_est_tokens={estimate_tokens(prompt)} "
f"cap={CAP_TOKENS}"
)
raw, used_cfg = await _call_26b(
client, prompt,
defer_on_deep_unavailable=defer_on_deep_unavailable,
document_id=document_id,
)
out, perr = _parse_deep_output(raw)
if out is None or not (out.detail or out.tldr):
# 실패 유닛은 persist 하지 않음 — 재시도가 이 유닛만 다시 호출한다.
failed_units.append(unit.index)
logger.warning(
f"[deep] id={document_id} map {unit.index + 1}/{n} 결과 비었음/파싱 실패"
f"({perr}) — 유닛 재시도 대상"
)
continue
# ★매 유닛 새 dict 로 재구성 (in-place 변경 금지) — 직전 commit 의 committed
# 스냅샷이 같은 중첩 객체를 참조하면 old==new 로 보여 SQLAlchemy 가 UPDATE 를
# 스킵한다(60254 라이브에서 unit 0 만 persist 된 aliasing 버그의 fix).
map_results = {
**map_results,
key: {
"index": unit.index,
"titles": [t for t in unit.section_titles if t][:8],
"tldr": out.tldr,
"detail": out.detail,
"inconsistencies": _filter_inconsistencies(out.inconsistencies or []),
},
}
preseg = {
**preseg,
"tier": plan.tier,
"over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens,
"units": n,
"map_results": map_results,
}
payload = {**payload, "presegment": preseg}
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit() # 유닛 단위 멱등 재개 지점
if failed_units:
raise ValueError(
f"map 유닛 {len(failed_units)}/{n}건 결과 없음 — 재시도 대상: {failed_units[:10]}"
)
# ── reduce: 요약들의 요약 1콜 (유닛 블록도 캡 이하로 절단 보장) ──
reduce_rendered = render_26b(REDUCE_TASK, subject_domain)
base_prompt = (
reduce_rendered
.replace("{escalation_envelope_json}", envelope_injection)
.replace("{unit_count}", str(n))
)
budget = max(
REDUCE_BUDGET_FLOOR_TOKENS, CAP_TOKENS - estimate_tokens(base_prompt)
)
ordered = [map_results[str(u.index)] for u in units]
block, reduce_truncated = build_reduce_units_block(ordered, budget)
reduce_prompt = base_prompt.replace("{unit_summaries}", block)
logger.info(
f"[deep] id={document_id} reduce units={n} "
f"prompt_est_tokens={estimate_tokens(reduce_prompt)} cap={CAP_TOKENS} "
f"truncated={reduce_truncated}"
)
raw, used_cfg = await _call_26b(
client, reduce_prompt,
defer_on_deep_unavailable=defer_on_deep_unavailable,
document_id=document_id,
)
except StageDeferred:
logger.info(
f"[deep] id={document_id} map_reduce 보류 — 완료 유닛 {len(map_results)}/{n} 보존"
)
raise
except Exception as exc:
# 단일콜 경로와 동일 — 호출 실패는 전파해 queue_consumer 가 재시도/dead-letter 처리.
logger.warning(f"[deep] id={document_id} map_reduce 실패: {exc}")
raise
finally:
await client.close()
latency_ms = int((time.perf_counter() - start) * 1000)
deep_out, parse_error = _parse_deep_output(raw)
if deep_out is None:
# 단일콜 경로와 동일 시멘틱 — doc 미기록(legacy 결과 보존), 이벤트로 가시화.
deep_out = DeepSummaryOutput()
logger.warning(f"[deep] id={document_id} reduce 파싱 실패 ({parse_error}) — doc 미기록")
if not parse_error:
doc.ai_detail_summary = (deep_out.detail or "").strip() or None
# 불일치 = reduce 출력 + map 유닛 합본 dedup — reduce 가 떨궈도 유닛 발견분 보전.
merged = _filter_inconsistencies(deep_out.inconsistencies or [])
seen = {(i["kind"], i["desc"]) for i in merged}
for res in ordered:
for inc in res.get("inconsistencies") or []:
k = (inc.get("kind"), inc.get("desc"))
if k not in seen:
seen.add(k)
merged.append(inc)
doc.ai_inconsistencies = merged
doc.ai_analysis_tier = "deep"
doc.ai_processed_at = datetime.now(timezone.utc)
try:
pv = compute_policy_version(REDUCE_TASK)
except Exception:
pv = None
await record_analyze_event(
doc_id=document_id,
user_id=None,
mode="summary_deep",
text_limit=used_cfg.context_char_limit or 260000,
truncated=reduce_truncated,
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=used_cfg.model,
prompt_version=(f"{REDUCE_TASK}@{pv}" if pv else REDUCE_TASK),
error_code=parse_error,
source="document_server",
subject_domain=subject_domain,
risk_flags=list(envelope.risk_flags),
high_impact_task=None,
escalation_reasons=list(envelope.escalation_reasons),
confidence=deep_out.confidence,
policy_version=pv,
shadow_would_route_to="primary",
tier="primary",
escalated_to_26b=True,
suppressed_reason=None,
)
logger.info(
f"[deep] id={document_id} map_reduce 완료 units={n} "
f"detail_len={len(doc.ai_detail_summary or '')} inc={len(doc.ai_inconsistencies or [])} "
f"latency_ms={latency_ms} parse_error={parse_error}"
)
def _build_text_slices(text: str, pointers: dict) -> str:
"""original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다.
3조각 이상이면 각 조각 앞에 [slice N — head/middle/tail] 라벨을 붙여 26B 가 순서 인지.
단일 슬라이스면 라벨 없이 원문 그대로.
"""
ranges = (pointers or {}).get("text_ranges") or []
if not ranges:
return text[:260_000]
if len(ranges) == 1:
r = ranges[0]
return text[r.get("start", 0):r.get("end", len(text))]
labels = ["head", "middle", "tail"]
parts: list[str] = []
for idx, r in enumerate(ranges):
label = labels[idx] if idx < len(labels) else f"slice{idx}"
chunk = text[r.get("start", 0):r.get("end", len(text))]
parts.append(f"[slice {idx}{label}]\n{chunk}")
return "\n\n".join(parts)
def _parse_outermost_json(raw: str) -> dict | None:
"""Response 의 첫 '{' 부터 brace balance 로 최외곽 JSON 추출.
parse_json_response 의 re.finditer 패턴이 1단계 중첩까지만 매치해서 deep_summary
응답처럼 `entities_confirmed: {...}` 2단계 중첩이 포함된 경우 최외곽 대신 내부
객체만 반환되는 문제를 우회. 또한 응답이 잘려 closure `}` 가 없으면 강제로
`}` 추가 시도하여 부분 파싱.
"""
cleaned = strip_thinking(raw)
code_match = re.search(r"```(?:json)?\s*(\{.*)", cleaned, re.DOTALL)
if code_match:
cleaned = code_match.group(1)
start = cleaned.find("{")
if start < 0:
return None
depth = 0
end = -1
in_str = False
esc = False
for i in range(start, len(cleaned)):
ch = cleaned[i]
if esc:
esc = False
continue
if ch == "\\":
esc = True
continue
if ch == '"':
in_str = not in_str
continue
if in_str:
continue
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
end = i + 1
break
if end > 0:
try:
return json.loads(cleaned[start:end])
except json.JSONDecodeError:
pass
# 응답 잘림 — 남은 depth 만큼 `}` 보강 후 재시도
if depth > 0:
candidate = cleaned[start:].rstrip().rstrip(",") + ("}" * depth)
try:
return json.loads(candidate)
except json.JSONDecodeError:
pass
return None
def _regex_extract_fields(raw: str) -> dict:
"""JSON parse 실패 시 field-level regex 로 detail/tldr/mode/inconsistencies 추출.
응답이 잘렸거나 중간에 문자열이 끊긴 경우에도 앞쪽에 완결된 필드는 건진다.
`"detail": ""` 처럼 key-value 쌍을 개별 매칭.
"""
def _str_field(key: str) -> str | None:
m = re.search(rf'"{key}"\s*:\s*"((?:[^"\\]|\\.)*)"', raw, re.DOTALL)
if not m:
return None
try:
# JSON string escape 복원 (\n, \\, \" 등)
return json.loads('"' + m.group(1) + '"')
except json.JSONDecodeError:
return m.group(1)
def _arr_field(key: str) -> list | None:
# 단순 문자열 배열만 지원 — bullets / inconsistencies_desc 등
m = re.search(rf'"{key}"\s*:\s*(\[[^\]]*\])', raw, re.DOTALL)
if not m:
return None
try:
return json.loads(m.group(1))
except json.JSONDecodeError:
return None
out: dict = {"_fallback": True}
mode = _str_field("mode")
if mode:
out["mode"] = mode
tldr = _str_field("tldr")
if tldr:
out["tldr"] = tldr
detail = _str_field("detail")
if detail:
out["detail"] = detail
bullets = _arr_field("bullets")
if bullets is not None:
out["bullets"] = bullets
inc = _arr_field("inconsistencies")
if inc is not None:
out["inconsistencies"] = inc
return out
def _filter_inconsistencies(items: list) -> list[dict]:
"""허용 kind 목록 (safety/news 도메인 한정) 만 통과시킨다.
`amount_mismatch` 같은 구매/계약 kind 는 여기서 drop (feedback_document_server_domain_scope.md).
구조 오류 (kind/desc 누락) 도 drop.
"""
out: list[dict] = []
for it in items or []:
if not isinstance(it, dict):
continue
kind = str(it.get("kind") or "")
desc_ = str(it.get("desc") or "").strip()
if kind not in ALLOWED_INCONSISTENCY_KINDS:
continue
if not desc_:
continue
out.append({"kind": kind, "desc": desc_})
return out