hyungi_document_server/app/workers/deep_summary_worker.py
Hyungi Ahn 6fdc48e5b6 feat(ai): B-1 split summary into tiers: triage(4B) + deep_summary(26B)
Reuses the PR-A policy layer to add a tier-triage path to classify_worker.
Legacy ai_summary / ai_domain / ai_suggestion are kept (zero regressions);
tldr/bullets/detail/inconsistencies are split out into separate fields.

Migrations (156-160):
- 156 documents: 5 new columns ai_tldr, ai_bullets, ai_detail_summary,
  ai_inconsistencies, ai_analysis_tier
- 157 process_stage: ADD VALUE 'deep_summary' in a migration of its own (works
  around the Postgres rule that a newly added enum value cannot be used in the
  same transaction)
- 158 processing_queue.payload JSONB (carries the envelope)
- 159 analyze_events: tier + suppressed_reason
- 160 partial index on suppressed_reason

Models/ORM:
- Document: 5 new Mapped columns
- ProcessingQueue: deep_summary added to the enum + payload field; enqueue_stage
  gains a payload option
- AnalyzeEvent: PR-A shadow columns (6) + PR-B tier/suppressed_reason

Workers:
- classify_worker: adds _run_tier_triage after the existing legacy path.
  - _match_subject_domain(doc, text): picks the PR-A policy subject_domain name
    from source_channel + body keywords + ai_domain prefix (no matching on
    category).
  - R1: TriageOutput pydantic model + fallback when the JSON is broken
    (triage_json_invalid).
  - R2: _check_backlog_guard(): suppresses soft escalations when the 30-minute
    window ratio exceeds the threshold OR the pending count exceeds its limit.
    Hard escalations pass through.
  - R3: _slice_text_ranges(): above 260k characters, cuts three pieces: head
    120k + middle 20k + tail 120k.
  - On escalation, builds an EscalationEnvelope and enqueues deep_summary with
    an {envelope, subject_domain} payload.
- deep_summary_worker (new): reads envelope + subject_domain from the queue
  payload → render_26b("p3c_deep_summary", subject_domain) + MLX call (through
  the llm_gate Semaphore(1)) → saves ai_detail_summary + ai_inconsistencies and
  sets ai_analysis_tier='deep'. _filter_inconsistencies passes only the allowed
  kinds (version_drift / procedure_conflict / source_conflict /
  missing_basis); purchasing/contract kinds are dropped.
- queue_consumer: deep_summary added to the workers dict + BATCH_SIZE=1.
  next_stages is untouched: classify → embed/chunk stays as-is, and
  deep_summary is an independent chain.
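The R3 cut can be pictured as a function that emits the `[{start, end}]` ranges that `_build_text_slices` later reads from `original_pointers.text_ranges`. This is a minimal sketch, not the committed `_slice_text_ranges`: the name `slice_text_ranges`, its keyword defaults, and centering the 20k middle window on the text's midpoint are assumptions. The commit only gives the 260k threshold and the 120k/20k/120k sizes.

```python
def slice_text_ranges(
    text_len: int,
    limit: int = 260_000,
    head: int = 120_000,
    mid: int = 20_000,
    tail: int = 120_000,
) -> list[dict[str, int]]:
    """Hypothetical sketch of R3: return [{start, end}] character ranges.

    Texts up to `limit` characters stay whole; longer ones become
    head + middle + tail.
    """
    if text_len <= limit:
        return [{"start": 0, "end": text_len}]
    # Assumed: the middle window is centered on the text's midpoint
    mid_start = text_len // 2 - mid // 2
    return [
        {"start": 0, "end": head},
        {"start": mid_start, "end": mid_start + mid},
        {"start": text_len - tail, "end": text_len},
    ]


print(slice_text_ranges(300_000))
# [{'start': 0, 'end': 120000}, {'start': 140000, 'end': 160000}, {'start': 180000, 'end': 300000}]
```

With the default sizes, for any length above the 260k threshold the middle window starts at or after 120,000 and ends at or before the tail's start, so the three pieces never overlap and always total 260k characters.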

Telemetry:
- record_analyze_event: new parameters subject_domain / risk_flags /
  escalation_reasons / confidence / policy_version / shadow_would_route_to /
  tier / escalated_to_26b / suppressed_reason. The classify and deep workers
  record with mode="summary_triage" and mode="summary_deep", respectively.
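The R2 backlog guard from the Workers list can be sketched on top of these telemetry fields. This is a minimal sketch, assuming the "window ratio" is the share of triage events in the last 30 minutes that escalated to the 26B. The `TriageEvent` type, the `check_backlog_guard` name, the 0.3 ratio and 20-pending defaults, and the `backlog_ratio` / `backlog_pending` reason strings are all hypothetical, not the committed `_check_backlog_guard`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class TriageEvent:
    """Hypothetical slice of an analyze_events row."""
    at: datetime
    escalated_to_26b: bool


def check_backlog_guard(
    events: list[TriageEvent],
    now: datetime,
    pending: int,
    hard: bool,
    ratio_threshold: float = 0.3,  # assumed value
    max_pending: int = 20,  # assumed value
    window: timedelta = timedelta(minutes=30),
) -> Optional[str]:
    """Return a suppressed_reason for a soft escalation, or None to let it through."""
    if hard:
        return None  # hard escalations always pass
    recent = [e for e in events if now - e.at <= window]
    if recent:
        # Share of recent triage events that escalated to the 26B
        ratio = sum(e.escalated_to_26b for e in recent) / len(recent)
        if ratio > ratio_threshold:
            return "backlog_ratio"
    if pending > max_pending:
        return "backlog_pending"
    return None


now = datetime(2026, 4, 24, 1, 0, tzinfo=timezone.utc)
events = [TriageEvent(now - timedelta(minutes=m), m % 2 == 0) for m in range(10)]
print(check_backlog_guard(events, now, pending=0, hard=False))  # backlog_ratio
print(check_backlog_guard(events, now, pending=0, hard=True))   # None
```

Checking `hard` first mirrors the commit's rule that hard escalations bypass the guard regardless of load; in this sketch the returned string is what would be recorded as `suppressed_reason`.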

API:
- DocumentResponse exposes 5 fields: ai_tldr / ai_bullets / ai_detail_summary /
  ai_inconsistencies / ai_analysis_tier.
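For API consumers, the five new fields can be pictured as a flat, nullable group. This is a minimal sketch that uses a plain dataclass as a stand-in for DocumentResponse. The `DocumentSummaryFields` name and the field types are assumptions inferred from what deep_summary_worker writes (text for the detail summary, a list of `{kind, desc}` dicts for inconsistencies, `'deep'` for the tier).

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class DocumentSummaryFields:
    """Hypothetical stand-in for the five new DocumentResponse fields."""
    ai_tldr: Optional[str] = None
    ai_bullets: Optional[list[str]] = None
    ai_detail_summary: Optional[str] = None
    ai_inconsistencies: Optional[list[dict]] = None
    ai_analysis_tier: Optional[str] = None  # 'deep' once deep_summary_worker has run


resp = DocumentSummaryFields(
    ai_detail_summary="Procedure revision 3 replaces revision 2.",
    ai_inconsistencies=[{"kind": "version_drift", "desc": "Rev.2 and Rev.3 cite different limits"}],
    ai_analysis_tier="deep",
)
print(json.dumps(asdict(resp), ensure_ascii=False))
```

Fields that no worker has filled serialize as `null`.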

Prompts:
- classify.txt: only a DEPRECATED comment added (file kept to preserve the
  rollback path).
- Uses PR-A's app/prompts/policy/p3a_short_summary.txt (4B) and
  p3c_deep_summary.txt (26B) as-is. My own summary_triage.txt /
  summary_deep.txt duplicated them, so they were deleted before being
  committed instead of being removed in a separate commit.
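The deep worker fills the PR-A template's `{escalation_envelope_json}` and `{original_text_slices}` placeholders with chained `str.replace` calls rather than `str.format`. A minimal sketch of why that matters, assuming the template also contains literal JSON braces (the template text below is invented for illustration): `str.format` treats every brace pair as a replacement field, while `replace` touches only the two named placeholders.

```python
# Invented template text; the real p3c_deep_summary.txt is not reproduced here.
TEMPLATE = (
    'Reply with JSON like {"tldr": "...", "detail": "..."}.\n'
    "Envelope:\n{escalation_envelope_json}\n"
    "Source:\n{original_text_slices}\n"
)


def fill(template: str, envelope_json: str, slices: str) -> str:
    """Substitute only the two named placeholders, as deep_summary_worker does."""
    return (
        template
        .replace("{escalation_envelope_json}", envelope_json)
        .replace("{original_text_slices}", slices)
    )


try:
    TEMPLATE.format(escalation_envelope_json="{}", original_text_slices="...")
except (KeyError, ValueError) as exc:
    # The JSON example's braces are parsed as a field named '"tldr"'
    print("str.format fails:", type(exc).__name__)

print(fill(TEMPLATE, '{"risk_flags": []}', "[slice 0 - head]\n..."))
```

Because the slices are substituted last, a document whose text happens to contain `{escalation_envelope_json}` is not substituted a second time.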

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 10:22:40 +09:00

211 lines
7.8 KiB
Python

"""PR-B B-1 Deep Summary worker: escalation analysis on the 26B (primary MLX) model.

Picked up from queue stage 'deep_summary'. Reads the EscalationEnvelope +
subject_domain that classify_worker put in the payload at enqueue time, renders
the PR-A policy template `p3c_deep_summary`, and calls the 26B primary. The call
goes through the llm_gate Semaphore(1) to protect single-inference MLX.
The output is stored in documents.ai_detail_summary / ai_inconsistencies and
ai_analysis_tier moves to 'deep'. On failure, the legacy results are left
untouched.
"""
from __future__ import annotations
import json
import time
from datetime import datetime, timezone
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import get_mlx_gate
logger = setup_logger("deep_summary_worker")

DEEP_SUMMARY_TASK = "p3c_deep_summary"

# Allow-list of inconsistency kinds (feedback_document_server_domain_scope.md:
# purchasing/contract kinds are excluded)
ALLOWED_INCONSISTENCY_KINDS = {
    "version_drift", "procedure_conflict", "source_conflict", "missing_basis",
}


class DeepSummaryOutput(BaseModel):
    """Response schema for p3c_deep_summary (26B). On parse failure the defaults are kept and the save is skipped."""

    mode: str = "single"
    tldr: str = ""
    bullets: list[str] = Field(default_factory=list)
    detail: str = ""  # detailed text produced by the 26B (stored as ai_detail_summary)
    bundle_flow: list[str] | None = None
    inconsistencies: list[dict] | None = None
    entities_confirmed: dict | None = None
    directives_applied: list[str] | None = None
    confidence: float = 0.5


async def process(document_id: int, session: AsyncSession) -> None:
    """Pick up a deep_summary queue row → call the 26B → save the fields."""
    doc = await session.get(Document, document_id)
    if not doc:
        raise ValueError(f"deep_summary: document id={document_id} not found")

    # Read the payload of the latest deep_summary queue row
    # (queue_consumer has already set its status to 'processing')
    queue_row = (await session.execute(
        select(ProcessingQueue)
        .where(
            ProcessingQueue.document_id == document_id,
            ProcessingQueue.stage == "deep_summary",
            ProcessingQueue.status == "processing",
        )
        .order_by(desc(ProcessingQueue.id))
        .limit(1)
    )).scalar_one_or_none()
    if not queue_row:
        logger.warning(f"[deep] no deep_summary row in 'processing' state id={document_id}")
        return

    payload = queue_row.payload or {}
    envelope_raw = payload.get("envelope")
    subject_domain = payload.get("subject_domain") or "generic"
    if not envelope_raw:
        logger.error(f"[deep] envelope missing id={document_id} payload_keys={list(payload.keys())}")
        raise ValueError("deep_summary payload has no envelope")
    envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))

    # Extract source-text slices (from envelope.original_pointers.text_ranges)
    slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers)

    # Render the PR-A template
    try:
        rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
    except Exception as exc:
        logger.exception(f"[deep] render_26b failed subject={subject_domain}: {exc}")
        raise
    # str.replace touches only these two placeholders; any other braces in the
    # template are left as-is
    prompt = (
        rendered
        .replace("{escalation_envelope_json}", envelope.to_system_injection())
        .replace("{original_text_slices}", slices)
    )

    client = AIClient()
    latency_ms = 0
    parse_error: str | None = None
    deep_out = DeepSummaryOutput()
    try:
        start = time.perf_counter()
        async with get_mlx_gate():  # Semaphore(1) protecting the primary (26B)
            raw = await client.call_primary(prompt)
        latency_ms = int((time.perf_counter() - start) * 1000)
    except Exception as exc:
        logger.warning(f"[deep] 26B call failed id={document_id}: {exc}")
        parse_error = "call_failed"
        raw = ""
    finally:
        await client.close()

    if raw:
        try:
            parsed = parse_json_response(raw) or {}
            deep_out = DeepSummaryOutput.model_validate(parsed)
        except (ValidationError, ValueError, TypeError) as exc:
            parse_error = f"parse:{type(exc).__name__}"
            logger.warning(f"[deep] JSON parse/validation failed id={document_id}: {exc}")
    elif not parse_error:
        # Treat an empty response as a failure so the legacy results are kept
        parse_error = "empty_response"

    if not parse_error:
        doc.ai_detail_summary = (deep_out.detail or "").strip() or None
        doc.ai_inconsistencies = _filter_inconsistencies(deep_out.inconsistencies or [])
        doc.ai_analysis_tier = "deep"
        doc.ai_processed_at = datetime.now(timezone.utc)

    try:
        pv = compute_policy_version(DEEP_SUMMARY_TASK)
    except Exception:
        pv = None
    await record_analyze_event(
        doc_id=document_id,
        user_id=None,
        mode="summary_deep",
        text_limit=settings.ai.primary.context_char_limit or 260000,
        truncated=False,
        layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
        cached=False,
        latency_ms=latency_ms,
        model_name=settings.ai.primary.model,
        prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
        error_code=parse_error,
        source="document_server",
        subject_domain=subject_domain,
        risk_flags=list(envelope.risk_flags),
        high_impact_task=None,
        escalation_reasons=list(envelope.escalation_reasons),
        confidence=deep_out.confidence,
        policy_version=pv,
        shadow_would_route_to="primary",
        tier="primary",
        escalated_to_26b=True,
        suppressed_reason=None,
    )
    logger.info(
        f"[deep] id={document_id} subject={subject_domain} "
        f"detail_len={len(doc.ai_detail_summary or '')} "
        f"inc={len(doc.ai_inconsistencies or [])} latency_ms={latency_ms} "
        f"parse_error={parse_error}"
    )


def _build_text_slices(text: str, pointers: dict) -> str:
    """Join the [{start, end}] entries of original_pointers.text_ranges into actual body-text slices.

    With more than one range, each piece is prefixed with a [slice N - head/middle/tail]
    label so the 26B can tell their order. A single range is returned as plain text
    without a label; with no ranges at all, the first 260,000 characters are used.
    """
    ranges = (pointers or {}).get("text_ranges") or []
    if not ranges:
        return text[:260_000]
    if len(ranges) == 1:
        r = ranges[0]
        return text[r.get("start", 0):r.get("end", len(text))]
    labels = ["head", "middle", "tail"]
    parts: list[str] = []
    for idx, r in enumerate(ranges):
        label = labels[idx] if idx < len(labels) else f"slice{idx}"
        chunk = text[r.get("start", 0):r.get("end", len(text))]
        parts.append(f"[slice {idx} - {label}]\n{chunk}")
    return "\n\n".join(parts)


def _filter_inconsistencies(items: list) -> list[dict]:
    """Pass through only allow-listed kinds (scoped to the safety/news domains).

    Purchasing/contract kinds such as `amount_mismatch` are dropped here
    (feedback_document_server_domain_scope.md). Structurally invalid items
    (missing kind/desc) are dropped too.
    """
    out: list[dict] = []
    for it in items or []:
        if not isinstance(it, dict):
            continue
        kind = str(it.get("kind") or "")
        desc_ = str(it.get("desc") or "").strip()
        if kind not in ALLOWED_INCONSISTENCY_KINDS:
            continue
        if not desc_:
            continue
        out.append({"kind": kind, "desc": desc_})
    return out