33427d4a42
A. services/alerts.py 신설 — send_alert(title, message): ALERT_WEBHOOK_URL 미설정=no-op(프로세스당 1회 INFO), ALERT_WEBHOOK_KIND synochat(기본)|ntfy, httpx 5s, 실패=WARNING만(절대 raise 금지). deep_summary HOLD/override 거부 시 발화 — 문서 id·제목·tier·over%·토큰· 초과 섹션 상위3·재개 예정시각·유인 분할 힌트. dedupe=payload.presegment .alerted_at(7일) — 매 24h 재보류마다 재알람 방지. B. units_override 재개 — payload.presegment.units_override 존재 시 tier 재판정·HOLD 없이 (start,end,title) 문자 오프셋 경계로 유닛 구성 후 기존 PR2 map-reduce 그대로(유닛 단위 멱등 commit·reduce·doc 기록). 방어: source_len 불일치·형식 오류·유닛 추정토큰 > CAP*1.1 이면 실패 대신 재-HOLD + 알람(잘못된 override 의 900s 콜 재생산 차단). override 없는 문서는 기존 경로 무회귀. summarize_units 에 공용 순수함수 추가: choose_override_source(md_content 우선)·validate_override_boundaries(단조·비중첩·범위·커버리지 90%·유닛 캡)· units_from_boundaries·leaf_spans + greedy_pack 유닛에 leaf_indexes 기록 (export CLI 스팬 계산용). plan ds-presegment-mapreduce-2 / env DEEP_SUMMARY_HOLD_RETRY_MINUTES 유지. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
831 lines
35 KiB
Python
831 lines
35 KiB
Python
"""PR-B B-1 Deep Summary 워커 — 26B (primary MLX) 에스컬레이션 분석.
|
|
|
|
큐 stage 'deep_summary' 에서 pickup. classify_worker 가 enqueue 시 payload 로 실은
|
|
EscalationEnvelope + subject_domain 을 읽어, PR-A policy 템플릿 `p3c_deep_summary` 를
|
|
렌더링한 뒤 26B primary 를 호출한다. llm_gate Semaphore(1) 경유 — MLX 단일 인퍼런스 보호.
|
|
|
|
출력을 documents.ai_detail_summary / ai_inconsistencies 에 저장하고 ai_analysis_tier 를
|
|
'deep' 으로 전이. 실패는 무해하게 legacy 결과 보존.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
from sqlalchemy import desc, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
import json
|
|
import re
|
|
from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking
|
|
from ai.envelope import EscalationEnvelope
|
|
from core.config import settings
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.queue import ProcessingQueue, StageDeferred
|
|
from policy.prompt_render import render_26b, policy_version as compute_policy_version
|
|
from services.alerts import send_alert
|
|
from services.document_telemetry import record_analyze_event
|
|
from services.search.llm_gate import Priority, acquire_mlx_gate
|
|
from services.summarize_units import (
|
|
CAP_TOKENS,
|
|
UnitPlan,
|
|
build_reduce_units_block,
|
|
choose_override_source,
|
|
estimate_tokens,
|
|
plan_summarize_units,
|
|
render_map_slice,
|
|
units_from_boundaries,
|
|
validate_override_boundaries,
|
|
)
|
|
|
|
logger = setup_logger("deep_summary_worker")
|
|
|
|
DEEP_SUMMARY_TASK = "p3c_deep_summary"
|
|
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
|
|
REDUCE_TASK = "p3c_deep_summary_reduce"
|
|
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
|
|
# 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
|
|
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
|
|
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
|
|
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
|
|
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
|
|
ALERT_DEDUPE_DAYS = 7
|
|
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
|
|
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
|
|
OVERRIDE_CAP_SLACK = 1.1
|
|
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
|
|
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
|
|
|
|
# inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외)
|
|
ALLOWED_INCONSISTENCY_KINDS = {
|
|
"version_drift", "procedure_conflict", "source_conflict", "missing_basis",
|
|
}
|
|
|
|
|
|
class DeepSummaryOutput(BaseModel):
|
|
"""p3c_deep_summary (26B) 응답 스키마. 파싱 실패 시 기본값 + skip."""
|
|
|
|
mode: str = "single"
|
|
tldr: str = ""
|
|
bullets: list[str] = Field(default_factory=list)
|
|
detail: str = "" # 26B 가 채우는 상세 (detail_summary 로 저장)
|
|
bundle_flow: list[str] | None = None
|
|
inconsistencies: list[dict] | None = None
|
|
entities_confirmed: dict | None = None
|
|
directives_applied: list[str] | None = None
|
|
confidence: float = 0.5
|
|
|
|
|
|
async def process(
|
|
document_id: int, session: AsyncSession, *, defer_on_deep_unavailable: bool = False
|
|
) -> None:
|
|
"""deep_summary 큐 pickup → LLM 호출 → 필드 저장.
|
|
|
|
defer_on_deep_unavailable:
|
|
False (기본, consumer 경로) = 맥북(deep 슬롯) 우선 시도, 불가 시 즉시
|
|
맥미니 primary 로 처리. 2026-06-12 fair-share: 양 머신이 동일 모델
|
|
(Qwen3.6-27B-6bit)이라 폴백 = 품질 강등이 아니라 단순 분배.
|
|
True (queue_drain 전용) = 맥북 불가를 StageDeferred 로 올려 drain 이
|
|
보류 후 run 을 멈춘다 (drain = 맥북 분담 전용 레버 시멘틱 유지).
|
|
"""
|
|
doc = await session.get(Document, document_id)
|
|
if not doc:
|
|
raise ValueError(f"deep_summary: document id={document_id} 없음")
|
|
|
|
# 최신 deep_summary 큐 행의 payload 조회 (queue_consumer 가 status='processing' 으로 세팅한 상태)
|
|
queue_row = (await session.execute(
|
|
select(ProcessingQueue)
|
|
.where(
|
|
ProcessingQueue.document_id == document_id,
|
|
ProcessingQueue.stage == "deep_summary",
|
|
ProcessingQueue.status == "processing",
|
|
)
|
|
.order_by(desc(ProcessingQueue.id))
|
|
.limit(1)
|
|
)).scalar_one_or_none()
|
|
if not queue_row:
|
|
logger.warning(f"[deep] processing 상태의 deep_summary row 없음 id={document_id}")
|
|
return
|
|
|
|
payload = queue_row.payload or {}
|
|
envelope_raw = payload.get("envelope")
|
|
subject_domain = payload.get("subject_domain") or "generic"
|
|
if not envelope_raw:
|
|
logger.error(f"[deep] envelope 없음 id={document_id} payload_keys={list(payload.keys())}")
|
|
raise ValueError("deep_summary payload 에 envelope 없음")
|
|
|
|
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
|
|
|
|
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
|
|
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
|
|
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
|
|
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
|
|
if (payload.get("presegment") or {}).get("units_override"):
|
|
await _process_units_override(
|
|
doc, queue_row, envelope, subject_domain, session,
|
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
|
)
|
|
return
|
|
|
|
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
|
|
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
|
|
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
|
|
# hybrid/whole → HOLD(awaiting_split) — 맥미니 미전송, 클로드 유인 분할은 PR3
|
|
# 게이트/유닛은 전체 extracted_text 기준 — 단일콜의 head/mid/tail "가운데 폐기"를
|
|
# 전 유닛 커버리지로 대체한다. build_hier_tree 가 거대 md 에서 초 단위 CPU 라
|
|
# 이벤트루프 점유 회피 위해 to_thread (presegment_worker._read_toc 와 동일 패턴).
|
|
unit_plan = await asyncio.to_thread(plan_summarize_units, doc.extracted_text or "")
|
|
if unit_plan.mode == "map_reduce":
|
|
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
|
|
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
|
|
if unit_plan.tier != "auto" or not unit_plan.units:
|
|
await _hold_awaiting_split(
|
|
session, queue_row, unit_plan, document_id, doc_title=doc.title
|
|
)
|
|
await _process_map_reduce(
|
|
doc, queue_row, envelope, subject_domain, unit_plan, session,
|
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
|
)
|
|
return
|
|
|
|
# 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반)
|
|
slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers)
|
|
|
|
# PR-A 템플릿 렌더
|
|
try:
|
|
rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
|
|
except Exception as exc:
|
|
logger.exception(f"[deep] render_26b 실패 subject={subject_domain}: {exc}")
|
|
raise
|
|
|
|
prompt = (
|
|
rendered
|
|
.replace("{escalation_envelope_json}", envelope.to_system_injection())
|
|
.replace("{original_text_slices}", slices)
|
|
)
|
|
|
|
client = AIClient()
|
|
# ds-macbook-offload-1: deep 슬롯 구성 시 맥북 M5 Max 경유(라우터). 부재 시 기존 경로 그대로.
|
|
deep_cfg = client.ai.deep
|
|
used_cfg = deep_cfg or settings.ai.primary
|
|
latency_ms = 0
|
|
parse_error: str | None = None
|
|
deep_out = DeepSummaryOutput()
|
|
|
|
try:
|
|
start = time.perf_counter()
|
|
if deep_cfg is not None:
|
|
# 맥북 우선 — 맥미니 mlx gate 미점유(별 endpoint). doc 쓰기는 완주+파싱
|
|
# 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
|
|
try:
|
|
raw = await call_deep_or_defer(client, prompt)
|
|
except StageDeferred:
|
|
if defer_on_deep_unavailable:
|
|
raise # drain 전용 — 맥북 레버 시멘틱 (보류 후 run 종료)
|
|
# consumer 경로: 동일 모델이라 강등 아님 — 맥미니가 즉시 처리 (2026-06-12)
|
|
logger.info(
|
|
f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)"
|
|
)
|
|
used_cfg = settings.ai.primary
|
|
async with acquire_mlx_gate(Priority.BACKGROUND):
|
|
raw = await client.call_primary(prompt)
|
|
else:
|
|
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
|
|
raw = await client.call_primary(prompt)
|
|
latency_ms = int((time.perf_counter() - start) * 1000)
|
|
except StageDeferred:
|
|
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), drain 이 백오프 기록.
|
|
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
|
|
raise
|
|
except Exception as exc:
|
|
# 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가
|
|
# attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면
|
|
# worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 +
|
|
# tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치.
|
|
# 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전).
|
|
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
|
|
raise
|
|
finally:
|
|
await client.close()
|
|
|
|
if raw:
|
|
try:
|
|
# parse_json_response 는 중첩 JSON (entities_confirmed) 을 최외곽으로 오인하는
|
|
# 케이스가 있어 — deep_summary 응답에서 자주 발생 — 최외곽 추출 전용 helper 사용.
|
|
parsed = _parse_outermost_json(raw) or parse_json_response(raw)
|
|
if not parsed:
|
|
# 잘린 응답 fallback — field-level regex 로 detail/tldr/inconsistencies 추출
|
|
parsed = _regex_extract_fields(raw)
|
|
deep_out = DeepSummaryOutput.model_validate(parsed or {})
|
|
if not deep_out.detail and parsed and parsed.get("_fallback"):
|
|
logger.info(f"[deep] id={document_id} regex fallback parsed keys={list(parsed.keys())}")
|
|
except (ValidationError, ValueError, TypeError) as exc:
|
|
parse_error = f"parse:{type(exc).__name__}"
|
|
logger.warning(f"[deep] JSON 파싱/검증 실패 id={document_id}: {exc}")
|
|
|
|
if not parse_error:
|
|
doc.ai_detail_summary = (deep_out.detail or "").strip() or None
|
|
doc.ai_inconsistencies = _filter_inconsistencies(deep_out.inconsistencies or [])
|
|
doc.ai_analysis_tier = "deep"
|
|
doc.ai_processed_at = datetime.now(timezone.utc)
|
|
|
|
try:
|
|
pv = compute_policy_version(DEEP_SUMMARY_TASK)
|
|
except Exception:
|
|
pv = None
|
|
|
|
await record_analyze_event(
|
|
doc_id=document_id,
|
|
user_id=None,
|
|
mode="summary_deep",
|
|
text_limit=used_cfg.context_char_limit or 260000,
|
|
truncated=False,
|
|
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
|
|
cached=False,
|
|
latency_ms=latency_ms,
|
|
# deep 슬롯 사용 시 실처리 모델(qwen-macbook alias) 기록 — 어느 머신이 처리했는지 추적
|
|
model_name=used_cfg.model,
|
|
prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
|
|
error_code=parse_error,
|
|
source="document_server",
|
|
subject_domain=subject_domain,
|
|
risk_flags=list(envelope.risk_flags),
|
|
high_impact_task=None,
|
|
escalation_reasons=list(envelope.escalation_reasons),
|
|
confidence=deep_out.confidence,
|
|
policy_version=pv,
|
|
shadow_would_route_to="primary",
|
|
tier="primary",
|
|
escalated_to_26b=True,
|
|
suppressed_reason=None,
|
|
)
|
|
|
|
logger.info(
|
|
f"[deep] id={document_id} subject={subject_domain} "
|
|
f"detail_len={len(doc.ai_detail_summary or '')} "
|
|
f"inc={len(doc.ai_inconsistencies or [])} latency_ms={latency_ms} "
|
|
f"parse_error={parse_error}"
|
|
)
|
|
|
|
|
|
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
|
|
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
|
|
ts = preseg.get("alerted_at")
|
|
if not ts:
|
|
return True
|
|
try:
|
|
prev = datetime.fromisoformat(str(ts))
|
|
except ValueError:
|
|
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
|
|
if prev.tzinfo is None:
|
|
prev = prev.replace(tzinfo=timezone.utc)
|
|
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
|
|
|
|
|
|
async def _hold_awaiting_split(
|
|
session: AsyncSession,
|
|
queue_row: ProcessingQueue,
|
|
plan: UnitPlan,
|
|
document_id: int,
|
|
doc_title: str | None = None,
|
|
) -> None:
|
|
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
|
|
|
|
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
|
|
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
|
|
PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
|
|
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
|
|
"""
|
|
payload = dict(queue_row.payload or {})
|
|
preseg = dict(payload.get("presegment") or {})
|
|
now = datetime.now(timezone.utc)
|
|
alert_due = _hold_alert_due(preseg, now)
|
|
oversized = [
|
|
(u.section_titles[0] if u.section_titles else None)
|
|
for u in plan.units if u.over_cap
|
|
][:20]
|
|
preseg.update({
|
|
"awaiting_split": True,
|
|
"tier": plan.tier,
|
|
"over_pct": plan.over_pct,
|
|
"total_est_tokens": plan.total_est_tokens,
|
|
"units": len(plan.units),
|
|
# 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
|
|
"oversized_sections": oversized,
|
|
})
|
|
if alert_due:
|
|
preseg["alerted_at"] = now.isoformat()
|
|
payload["presegment"] = preseg
|
|
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
|
await session.commit()
|
|
logger.info(
|
|
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
|
|
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
|
|
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
|
|
)
|
|
if alert_due:
|
|
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
|
|
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
|
|
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
|
|
await send_alert(
|
|
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
|
|
(
|
|
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
|
|
f"tier={plan.tier} / over%={plan.over_pct} / "
|
|
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
|
|
f"초과 섹션(상위 3): {top3}\n"
|
|
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
|
|
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
|
|
),
|
|
)
|
|
raise StageDeferred(
|
|
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
|
|
)
|
|
|
|
|
|
async def _rehold_bad_override(
|
|
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
|
|
) -> None:
|
|
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
|
|
|
|
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
|
|
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
|
|
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
|
|
"""
|
|
document_id = doc.id
|
|
payload = dict(queue_row.payload or {})
|
|
preseg = dict(payload.get("presegment") or {})
|
|
now = datetime.now(timezone.utc)
|
|
alert_due = _hold_alert_due(preseg, now)
|
|
preseg.update({
|
|
"awaiting_split": True,
|
|
"override_rejected": reason,
|
|
"override_rejected_at": now.isoformat(),
|
|
})
|
|
if alert_due:
|
|
preseg["alerted_at"] = now.isoformat()
|
|
payload["presegment"] = preseg
|
|
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
|
await session.commit()
|
|
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
|
|
if alert_due:
|
|
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
|
|
await send_alert(
|
|
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
|
|
(
|
|
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
|
|
f"사유: {reason}\n"
|
|
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
|
|
f"수정: scripts/presegment_attended.py export --doc {document_id} "
|
|
f"→ apply --doc {document_id} --boundaries FILE"
|
|
),
|
|
)
|
|
raise StageDeferred(
|
|
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
|
|
)
|
|
|
|
|
|
async def _process_units_override(
|
|
doc: Document,
|
|
queue_row: ProcessingQueue,
|
|
envelope: EscalationEnvelope,
|
|
subject_domain: str,
|
|
session: AsyncSession,
|
|
*,
|
|
defer_on_deep_unavailable: bool,
|
|
) -> None:
|
|
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
|
|
|
|
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
|
|
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
|
|
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
|
|
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
|
|
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
|
|
"""
|
|
document_id = doc.id
|
|
preseg = dict((queue_row.payload or {}).get("presegment") or {})
|
|
override = preseg.get("units_override")
|
|
if isinstance(override, (list, tuple)):
|
|
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
|
|
override = {"boundaries": list(override)}
|
|
if not isinstance(override, dict):
|
|
await _rehold_bad_override(
|
|
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
|
|
)
|
|
|
|
source = override.get("source")
|
|
if source is None:
|
|
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
|
|
elif source in ("md_content", "extracted_text"):
|
|
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
|
|
else:
|
|
await _rehold_bad_override(
|
|
session, queue_row, doc, f"units_override.source={source!r} 미지원"
|
|
)
|
|
|
|
expected_len = override.get("source_len")
|
|
if expected_len is not None and expected_len != len(text_src):
|
|
await _rehold_bad_override(
|
|
session, queue_row, doc,
|
|
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
|
|
" (본문 재생성됨 — export 부터 재실행)",
|
|
)
|
|
|
|
check = validate_override_boundaries(
|
|
text_src,
|
|
override.get("boundaries") or [],
|
|
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
|
|
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
|
|
)
|
|
if not check.ok:
|
|
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
|
|
|
|
units = units_from_boundaries(text_src, check.boundaries)
|
|
plan = UnitPlan(
|
|
mode="map_reduce",
|
|
tier="override",
|
|
total_est_tokens=estimate_tokens(text_src),
|
|
over_pct=float(preseg.get("over_pct") or 0.0),
|
|
units=units,
|
|
)
|
|
logger.info(
|
|
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
|
|
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
|
|
)
|
|
await _process_map_reduce(
|
|
doc, queue_row, envelope, subject_domain, plan, session,
|
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
|
)
|
|
|
|
|
|
async def _call_26b(
|
|
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
|
|
):
|
|
"""map/reduce 공용 26B 호출 — 단일콜 경로와 동일한 deep 슬롯 우선 + fair-share 폴백.
|
|
|
|
반환 (raw, used_cfg). 맥북(deep) 불가 시 consumer 경로는 맥미니 primary 로 즉시
|
|
처리(동일 모델 — 강등 아님), drain 경로는 StageDeferred 전파(맥북 레버 시멘틱).
|
|
"""
|
|
deep_cfg = client.ai.deep
|
|
if deep_cfg is not None:
|
|
try:
|
|
return await call_deep_or_defer(client, prompt), deep_cfg
|
|
except StageDeferred:
|
|
if defer_on_deep_unavailable:
|
|
raise
|
|
logger.info(f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)")
|
|
async with acquire_mlx_gate(Priority.BACKGROUND):
|
|
return await client.call_primary(prompt), settings.ai.primary
|
|
|
|
|
|
def _parse_deep_output(raw: str) -> tuple[DeepSummaryOutput | None, str | None]:
|
|
"""raw → DeepSummaryOutput. 단일콜 경로와 동일한 3단 파서. 실패 시 (None, parse_error)."""
|
|
try:
|
|
parsed = _parse_outermost_json(raw) or parse_json_response(raw)
|
|
if not parsed:
|
|
parsed = _regex_extract_fields(raw)
|
|
return DeepSummaryOutput.model_validate(parsed or {}), None
|
|
except (ValidationError, ValueError, TypeError) as exc:
|
|
return None, f"parse:{type(exc).__name__}"
|
|
|
|
|
|
async def _process_map_reduce(
|
|
doc: Document,
|
|
queue_row: ProcessingQueue,
|
|
envelope: EscalationEnvelope,
|
|
subject_domain: str,
|
|
plan: UnitPlan,
|
|
session: AsyncSession,
|
|
*,
|
|
defer_on_deep_unavailable: bool,
|
|
) -> None:
|
|
"""TIER1 자동 — 유닛별 map(26B) → reduce(26B) → 단일콜과 동일 필드 기록.
|
|
|
|
멱등 재개: 성공 유닛은 payload.presegment.map_results 에 즉시 commit —
|
|
502/defer/재시작 후 재클레임 시 완료 유닛은 건너뛴다. 유닛 인덱스는
|
|
plan_summarize_units 가 같은 extracted_text 에 결정적이라 attempt 간 안정.
|
|
파싱 실패 유닛이 남으면 raise → queue_consumer 의 기존 attempts/백오프 재사용
|
|
(실패 유닛만 재호출되므로 재시도 비용 = 잔여 유닛뿐).
|
|
"""
|
|
document_id = doc.id
|
|
units = plan.units
|
|
n = len(units)
|
|
payload = dict(queue_row.payload or {})
|
|
preseg = dict(payload.get("presegment") or {})
|
|
preseg.pop("awaiting_split", None) # 재계획으로 auto 가 된 경우 HOLD 마킹 해제
|
|
map_results: dict = dict(preseg.get("map_results") or {})
|
|
|
|
logger.info(
|
|
f"[deep] id={document_id} map_reduce 시작 units={n} over_pct={plan.over_pct} "
|
|
f"total_est_tokens={plan.total_est_tokens} resume={len(map_results)}/{n}"
|
|
)
|
|
|
|
rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
|
|
envelope_injection = envelope.to_system_injection()
|
|
|
|
client = AIClient()
|
|
start = time.perf_counter()
|
|
used_cfg = client.ai.deep or settings.ai.primary
|
|
failed_units: list[int] = []
|
|
try:
|
|
# ── map: 유닛별 26B (콜 사이마다 gate 를 놓아 짧은 인터랙티브 요청이 끼어든다) ──
|
|
for unit in units:
|
|
key = str(unit.index)
|
|
if key in map_results:
|
|
continue
|
|
prompt = (
|
|
rendered
|
|
.replace("{escalation_envelope_json}", envelope_injection)
|
|
.replace("{original_text_slices}", render_map_slice(unit, n))
|
|
)
|
|
# 검증 게이트 "모든 LLM 콜 캡 초과 0" 을 로그로 단정 가능하게 남긴다.
|
|
logger.info(
|
|
f"[deep] id={document_id} map {unit.index + 1}/{n} "
|
|
f"unit_tokens={unit.est_tokens} prompt_est_tokens={estimate_tokens(prompt)} "
|
|
f"cap={CAP_TOKENS}"
|
|
)
|
|
raw, used_cfg = await _call_26b(
|
|
client, prompt,
|
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
|
document_id=document_id,
|
|
)
|
|
out, perr = _parse_deep_output(raw)
|
|
if out is None or not (out.detail or out.tldr):
|
|
# 실패 유닛은 persist 하지 않음 — 재시도가 이 유닛만 다시 호출한다.
|
|
failed_units.append(unit.index)
|
|
logger.warning(
|
|
f"[deep] id={document_id} map {unit.index + 1}/{n} 결과 비었음/파싱 실패"
|
|
f"({perr}) — 유닛 재시도 대상"
|
|
)
|
|
continue
|
|
# ★매 유닛 새 dict 로 재구성 (in-place 변경 금지) — 직전 commit 의 committed
|
|
# 스냅샷이 같은 중첩 객체를 참조하면 old==new 로 보여 SQLAlchemy 가 UPDATE 를
|
|
# 스킵한다(60254 라이브에서 unit 0 만 persist 된 aliasing 버그의 fix).
|
|
map_results = {
|
|
**map_results,
|
|
key: {
|
|
"index": unit.index,
|
|
"titles": [t for t in unit.section_titles if t][:8],
|
|
"tldr": out.tldr,
|
|
"detail": out.detail,
|
|
"inconsistencies": _filter_inconsistencies(out.inconsistencies or []),
|
|
},
|
|
}
|
|
preseg = {
|
|
**preseg,
|
|
"tier": plan.tier,
|
|
"over_pct": plan.over_pct,
|
|
"total_est_tokens": plan.total_est_tokens,
|
|
"units": n,
|
|
"map_results": map_results,
|
|
}
|
|
payload = {**payload, "presegment": preseg}
|
|
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
|
await session.commit() # 유닛 단위 멱등 재개 지점
|
|
|
|
if failed_units:
|
|
raise ValueError(
|
|
f"map 유닛 {len(failed_units)}/{n}건 결과 없음 — 재시도 대상: {failed_units[:10]}"
|
|
)
|
|
|
|
# ── reduce: 요약들의 요약 1콜 (유닛 블록도 캡 이하로 절단 보장) ──
|
|
reduce_rendered = render_26b(REDUCE_TASK, subject_domain)
|
|
base_prompt = (
|
|
reduce_rendered
|
|
.replace("{escalation_envelope_json}", envelope_injection)
|
|
.replace("{unit_count}", str(n))
|
|
)
|
|
budget = max(
|
|
REDUCE_BUDGET_FLOOR_TOKENS, CAP_TOKENS - estimate_tokens(base_prompt)
|
|
)
|
|
ordered = [map_results[str(u.index)] for u in units]
|
|
block, reduce_truncated = build_reduce_units_block(ordered, budget)
|
|
reduce_prompt = base_prompt.replace("{unit_summaries}", block)
|
|
logger.info(
|
|
f"[deep] id={document_id} reduce units={n} "
|
|
f"prompt_est_tokens={estimate_tokens(reduce_prompt)} cap={CAP_TOKENS} "
|
|
f"truncated={reduce_truncated}"
|
|
)
|
|
raw, used_cfg = await _call_26b(
|
|
client, reduce_prompt,
|
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
|
document_id=document_id,
|
|
)
|
|
except StageDeferred:
|
|
logger.info(
|
|
f"[deep] id={document_id} map_reduce 보류 — 완료 유닛 {len(map_results)}/{n} 보존"
|
|
)
|
|
raise
|
|
except Exception as exc:
|
|
# 단일콜 경로와 동일 — 호출 실패는 전파해 queue_consumer 가 재시도/dead-letter 처리.
|
|
logger.warning(f"[deep] id={document_id} map_reduce 실패: {exc}")
|
|
raise
|
|
finally:
|
|
await client.close()
|
|
|
|
latency_ms = int((time.perf_counter() - start) * 1000)
|
|
deep_out, parse_error = _parse_deep_output(raw)
|
|
if deep_out is None:
|
|
# 단일콜 경로와 동일 시멘틱 — doc 미기록(legacy 결과 보존), 이벤트로 가시화.
|
|
deep_out = DeepSummaryOutput()
|
|
logger.warning(f"[deep] id={document_id} reduce 파싱 실패 ({parse_error}) — doc 미기록")
|
|
|
|
if not parse_error:
|
|
doc.ai_detail_summary = (deep_out.detail or "").strip() or None
|
|
# 불일치 = reduce 출력 + map 유닛 합본 dedup — reduce 가 떨궈도 유닛 발견분 보전.
|
|
merged = _filter_inconsistencies(deep_out.inconsistencies or [])
|
|
seen = {(i["kind"], i["desc"]) for i in merged}
|
|
for res in ordered:
|
|
for inc in res.get("inconsistencies") or []:
|
|
k = (inc.get("kind"), inc.get("desc"))
|
|
if k not in seen:
|
|
seen.add(k)
|
|
merged.append(inc)
|
|
doc.ai_inconsistencies = merged
|
|
doc.ai_analysis_tier = "deep"
|
|
doc.ai_processed_at = datetime.now(timezone.utc)
|
|
|
|
try:
|
|
pv = compute_policy_version(REDUCE_TASK)
|
|
except Exception:
|
|
pv = None
|
|
|
|
await record_analyze_event(
|
|
doc_id=document_id,
|
|
user_id=None,
|
|
mode="summary_deep",
|
|
text_limit=used_cfg.context_char_limit or 260000,
|
|
truncated=reduce_truncated,
|
|
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
|
|
cached=False,
|
|
latency_ms=latency_ms,
|
|
model_name=used_cfg.model,
|
|
prompt_version=(f"{REDUCE_TASK}@{pv}" if pv else REDUCE_TASK),
|
|
error_code=parse_error,
|
|
source="document_server",
|
|
subject_domain=subject_domain,
|
|
risk_flags=list(envelope.risk_flags),
|
|
high_impact_task=None,
|
|
escalation_reasons=list(envelope.escalation_reasons),
|
|
confidence=deep_out.confidence,
|
|
policy_version=pv,
|
|
shadow_would_route_to="primary",
|
|
tier="primary",
|
|
escalated_to_26b=True,
|
|
suppressed_reason=None,
|
|
)
|
|
|
|
logger.info(
|
|
f"[deep] id={document_id} map_reduce 완료 units={n} "
|
|
f"detail_len={len(doc.ai_detail_summary or '')} inc={len(doc.ai_inconsistencies or [])} "
|
|
f"latency_ms={latency_ms} parse_error={parse_error}"
|
|
)
|
|
|
|
|
|
def _build_text_slices(text: str, pointers: dict) -> str:
|
|
"""original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다.
|
|
|
|
3조각 이상이면 각 조각 앞에 [slice N — head/middle/tail] 라벨을 붙여 26B 가 순서 인지.
|
|
단일 슬라이스면 라벨 없이 원문 그대로.
|
|
"""
|
|
ranges = (pointers or {}).get("text_ranges") or []
|
|
if not ranges:
|
|
return text[:260_000]
|
|
|
|
if len(ranges) == 1:
|
|
r = ranges[0]
|
|
return text[r.get("start", 0):r.get("end", len(text))]
|
|
|
|
labels = ["head", "middle", "tail"]
|
|
parts: list[str] = []
|
|
for idx, r in enumerate(ranges):
|
|
label = labels[idx] if idx < len(labels) else f"slice{idx}"
|
|
chunk = text[r.get("start", 0):r.get("end", len(text))]
|
|
parts.append(f"[slice {idx} — {label}]\n{chunk}")
|
|
return "\n\n".join(parts)
|
|
|
|
|
|
def _parse_outermost_json(raw: str) -> dict | None:
|
|
"""Response 의 첫 '{' 부터 brace balance 로 최외곽 JSON 추출.
|
|
|
|
parse_json_response 의 re.finditer 패턴이 1단계 중첩까지만 매치해서 deep_summary
|
|
응답처럼 `entities_confirmed: {...}` 2단계 중첩이 포함된 경우 최외곽 대신 내부
|
|
객체만 반환되는 문제를 우회. 또한 응답이 잘려 closure `}` 가 없으면 강제로
|
|
`}` 추가 시도하여 부분 파싱.
|
|
"""
|
|
cleaned = strip_thinking(raw)
|
|
code_match = re.search(r"```(?:json)?\s*(\{.*)", cleaned, re.DOTALL)
|
|
if code_match:
|
|
cleaned = code_match.group(1)
|
|
start = cleaned.find("{")
|
|
if start < 0:
|
|
return None
|
|
depth = 0
|
|
end = -1
|
|
in_str = False
|
|
esc = False
|
|
for i in range(start, len(cleaned)):
|
|
ch = cleaned[i]
|
|
if esc:
|
|
esc = False
|
|
continue
|
|
if ch == "\\":
|
|
esc = True
|
|
continue
|
|
if ch == '"':
|
|
in_str = not in_str
|
|
continue
|
|
if in_str:
|
|
continue
|
|
if ch == "{":
|
|
depth += 1
|
|
elif ch == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
end = i + 1
|
|
break
|
|
if end > 0:
|
|
try:
|
|
return json.loads(cleaned[start:end])
|
|
except json.JSONDecodeError:
|
|
pass
|
|
# 응답 잘림 — 남은 depth 만큼 `}` 보강 후 재시도
|
|
if depth > 0:
|
|
candidate = cleaned[start:].rstrip().rstrip(",") + ("}" * depth)
|
|
try:
|
|
return json.loads(candidate)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _regex_extract_fields(raw: str) -> dict:
|
|
"""JSON parse 실패 시 field-level regex 로 detail/tldr/mode/inconsistencies 추출.
|
|
|
|
응답이 잘렸거나 중간에 문자열이 끊긴 경우에도 앞쪽에 완결된 필드는 건진다.
|
|
`"detail": "…"` 처럼 key-value 쌍을 개별 매칭.
|
|
"""
|
|
def _str_field(key: str) -> str | None:
|
|
m = re.search(rf'"{key}"\s*:\s*"((?:[^"\\]|\\.)*)"', raw, re.DOTALL)
|
|
if not m:
|
|
return None
|
|
try:
|
|
# JSON string escape 복원 (\n, \\, \" 등)
|
|
return json.loads('"' + m.group(1) + '"')
|
|
except json.JSONDecodeError:
|
|
return m.group(1)
|
|
|
|
def _arr_field(key: str) -> list | None:
|
|
# 단순 문자열 배열만 지원 — bullets / inconsistencies_desc 등
|
|
m = re.search(rf'"{key}"\s*:\s*(\[[^\]]*\])', raw, re.DOTALL)
|
|
if not m:
|
|
return None
|
|
try:
|
|
return json.loads(m.group(1))
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
out: dict = {"_fallback": True}
|
|
mode = _str_field("mode")
|
|
if mode:
|
|
out["mode"] = mode
|
|
tldr = _str_field("tldr")
|
|
if tldr:
|
|
out["tldr"] = tldr
|
|
detail = _str_field("detail")
|
|
if detail:
|
|
out["detail"] = detail
|
|
bullets = _arr_field("bullets")
|
|
if bullets is not None:
|
|
out["bullets"] = bullets
|
|
inc = _arr_field("inconsistencies")
|
|
if inc is not None:
|
|
out["inconsistencies"] = inc
|
|
return out
|
|
|
|
|
|
def _filter_inconsistencies(items: list) -> list[dict]:
|
|
"""허용 kind 목록 (safety/news 도메인 한정) 만 통과시킨다.
|
|
|
|
`amount_mismatch` 같은 구매/계약 kind 는 여기서 drop (feedback_document_server_domain_scope.md).
|
|
구조 오류 (kind/desc 누락) 도 drop.
|
|
"""
|
|
out: list[dict] = []
|
|
for it in items or []:
|
|
if not isinstance(it, dict):
|
|
continue
|
|
kind = str(it.get("kind") or "")
|
|
desc_ = str(it.get("desc") or "").strip()
|
|
if kind not in ALLOWED_INCONSISTENCY_KINDS:
|
|
continue
|
|
if not desc_:
|
|
continue
|
|
out.append({"kind": kind, "desc": desc_})
|
|
return out
|