c11f113cf1
worker_fn 이 transient 실패를 삼켜 정상 반환하면 queue_consumer 가 status=completed 로 확정 → 영구 데이터 손실 + 재시도/추적 0. 정본(extract/marker/fulltext/stt 는 re-raise)과 어긋난 곳을 통일: - deep_summary: 호출 실패(call_failed)를 삼키지 않고 raise → 재시도→failed dead-letter (이전엔 ai_detail_summary 영구 누락 + tier triage 고착). - thumbnail: _extract_thumbnail 실패를 silent return → raise (썸네일 영구 누락 방지). - queue_consumer: 완료 커밋 후 enqueue_next_stage(정상·skip-note 2곳)를 자체 try 로 격리 — enqueue 실패가 outer except 로 전파돼 completed 항목을 재오픈(stage 재실행) 하던 결함 차단. 실패는 ERROR 로 가시화. - broad except 에 asyncio.CancelledError 명시 통과(embed worker / ask classifier·verifier). dead-letter = ProcessingQueue.status='failed'(기존 attempts/max_attempts 머신 재사용, 신규 컬럼 불필요). 검증: py_compile 통과. 큐 재시도 의미 synthetic smoke(staging) 예정. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
357 lines
14 KiB
Python
357 lines
14 KiB
Python
"""PR-B B-1 Deep Summary 워커 — 26B (primary MLX) 에스컬레이션 분석.
|
|
|
|
큐 stage 'deep_summary' 에서 pickup. classify_worker 가 enqueue 시 payload 로 실은
|
|
EscalationEnvelope + subject_domain 을 읽어, PR-A policy 템플릿 `p3c_deep_summary` 를
|
|
렌더링한 뒤 26B primary 를 호출한다. llm_gate Semaphore(1) 경유 — MLX 단일 인퍼런스 보호.
|
|
|
|
출력을 documents.ai_detail_summary / ai_inconsistencies 에 저장하고 ai_analysis_tier 를
|
|
'deep' 으로 전이. 실패는 무해하게 legacy 결과 보존.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
from sqlalchemy import desc, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
import json
|
|
import re
|
|
from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking
|
|
from ai.envelope import EscalationEnvelope
|
|
from core.config import settings
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.queue import ProcessingQueue, StageDeferred
|
|
from policy.prompt_render import render_26b, policy_version as compute_policy_version
|
|
from services.document_telemetry import record_analyze_event
|
|
from services.search.llm_gate import Priority, acquire_mlx_gate
|
|
|
|
logger = setup_logger("deep_summary_worker")
|
|
|
|
DEEP_SUMMARY_TASK = "p3c_deep_summary"
|
|
|
|
# inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외)
|
|
ALLOWED_INCONSISTENCY_KINDS = {
|
|
"version_drift", "procedure_conflict", "source_conflict", "missing_basis",
|
|
}
|
|
|
|
|
|
class DeepSummaryOutput(BaseModel):
|
|
"""p3c_deep_summary (26B) 응답 스키마. 파싱 실패 시 기본값 + skip."""
|
|
|
|
mode: str = "single"
|
|
tldr: str = ""
|
|
bullets: list[str] = Field(default_factory=list)
|
|
detail: str = "" # 26B 가 채우는 상세 (detail_summary 로 저장)
|
|
bundle_flow: list[str] | None = None
|
|
inconsistencies: list[dict] | None = None
|
|
entities_confirmed: dict | None = None
|
|
directives_applied: list[str] | None = None
|
|
confidence: float = 0.5
|
|
|
|
|
|
async def process(
|
|
document_id: int, session: AsyncSession, *, defer_on_deep_unavailable: bool = False
|
|
) -> None:
|
|
"""deep_summary 큐 pickup → LLM 호출 → 필드 저장.
|
|
|
|
defer_on_deep_unavailable:
|
|
False (기본, consumer 경로) = 맥북(deep 슬롯) 우선 시도, 불가 시 즉시
|
|
맥미니 primary 로 처리. 2026-06-12 fair-share: 양 머신이 동일 모델
|
|
(Qwen3.6-27B-6bit)이라 폴백 = 품질 강등이 아니라 단순 분배.
|
|
True (queue_drain 전용) = 맥북 불가를 StageDeferred 로 올려 drain 이
|
|
보류 후 run 을 멈춘다 (drain = 맥북 분담 전용 레버 시멘틱 유지).
|
|
"""
|
|
doc = await session.get(Document, document_id)
|
|
if not doc:
|
|
raise ValueError(f"deep_summary: document id={document_id} 없음")
|
|
|
|
# 최신 deep_summary 큐 행의 payload 조회 (queue_consumer 가 status='processing' 으로 세팅한 상태)
|
|
queue_row = (await session.execute(
|
|
select(ProcessingQueue)
|
|
.where(
|
|
ProcessingQueue.document_id == document_id,
|
|
ProcessingQueue.stage == "deep_summary",
|
|
ProcessingQueue.status == "processing",
|
|
)
|
|
.order_by(desc(ProcessingQueue.id))
|
|
.limit(1)
|
|
)).scalar_one_or_none()
|
|
if not queue_row:
|
|
logger.warning(f"[deep] processing 상태의 deep_summary row 없음 id={document_id}")
|
|
return
|
|
|
|
payload = queue_row.payload or {}
|
|
envelope_raw = payload.get("envelope")
|
|
subject_domain = payload.get("subject_domain") or "generic"
|
|
if not envelope_raw:
|
|
logger.error(f"[deep] envelope 없음 id={document_id} payload_keys={list(payload.keys())}")
|
|
raise ValueError("deep_summary payload 에 envelope 없음")
|
|
|
|
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
|
|
|
|
# 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반)
|
|
slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers)
|
|
|
|
# PR-A 템플릿 렌더
|
|
try:
|
|
rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
|
|
except Exception as exc:
|
|
logger.exception(f"[deep] render_26b 실패 subject={subject_domain}: {exc}")
|
|
raise
|
|
|
|
prompt = (
|
|
rendered
|
|
.replace("{escalation_envelope_json}", envelope.to_system_injection())
|
|
.replace("{original_text_slices}", slices)
|
|
)
|
|
|
|
client = AIClient()
|
|
# ds-macbook-offload-1: deep 슬롯 구성 시 맥북 M5 Max 경유(라우터). 부재 시 기존 경로 그대로.
|
|
deep_cfg = client.ai.deep
|
|
used_cfg = deep_cfg or settings.ai.primary
|
|
latency_ms = 0
|
|
parse_error: str | None = None
|
|
deep_out = DeepSummaryOutput()
|
|
|
|
try:
|
|
start = time.perf_counter()
|
|
if deep_cfg is not None:
|
|
# 맥북 우선 — 맥미니 mlx gate 미점유(별 endpoint). doc 쓰기는 완주+파싱
|
|
# 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
|
|
try:
|
|
raw = await call_deep_or_defer(client, prompt)
|
|
except StageDeferred:
|
|
if defer_on_deep_unavailable:
|
|
raise # drain 전용 — 맥북 레버 시멘틱 (보류 후 run 종료)
|
|
# consumer 경로: 동일 모델이라 강등 아님 — 맥미니가 즉시 처리 (2026-06-12)
|
|
logger.info(
|
|
f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)"
|
|
)
|
|
used_cfg = settings.ai.primary
|
|
async with acquire_mlx_gate(Priority.BACKGROUND):
|
|
raw = await client.call_primary(prompt)
|
|
else:
|
|
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
|
|
raw = await client.call_primary(prompt)
|
|
latency_ms = int((time.perf_counter() - start) * 1000)
|
|
except StageDeferred:
|
|
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), drain 이 백오프 기록.
|
|
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
|
|
raise
|
|
except Exception as exc:
|
|
# 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가
|
|
# attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면
|
|
# worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 +
|
|
# tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치.
|
|
# 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전).
|
|
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
|
|
raise
|
|
finally:
|
|
await client.close()
|
|
|
|
if raw:
|
|
try:
|
|
# parse_json_response 는 중첩 JSON (entities_confirmed) 을 최외곽으로 오인하는
|
|
# 케이스가 있어 — deep_summary 응답에서 자주 발생 — 최외곽 추출 전용 helper 사용.
|
|
parsed = _parse_outermost_json(raw) or parse_json_response(raw)
|
|
if not parsed:
|
|
# 잘린 응답 fallback — field-level regex 로 detail/tldr/inconsistencies 추출
|
|
parsed = _regex_extract_fields(raw)
|
|
deep_out = DeepSummaryOutput.model_validate(parsed or {})
|
|
if not deep_out.detail and parsed and parsed.get("_fallback"):
|
|
logger.info(f"[deep] id={document_id} regex fallback parsed keys={list(parsed.keys())}")
|
|
except (ValidationError, ValueError, TypeError) as exc:
|
|
parse_error = f"parse:{type(exc).__name__}"
|
|
logger.warning(f"[deep] JSON 파싱/검증 실패 id={document_id}: {exc}")
|
|
|
|
if not parse_error:
|
|
doc.ai_detail_summary = (deep_out.detail or "").strip() or None
|
|
doc.ai_inconsistencies = _filter_inconsistencies(deep_out.inconsistencies or [])
|
|
doc.ai_analysis_tier = "deep"
|
|
doc.ai_processed_at = datetime.now(timezone.utc)
|
|
|
|
try:
|
|
pv = compute_policy_version(DEEP_SUMMARY_TASK)
|
|
except Exception:
|
|
pv = None
|
|
|
|
await record_analyze_event(
|
|
doc_id=document_id,
|
|
user_id=None,
|
|
mode="summary_deep",
|
|
text_limit=used_cfg.context_char_limit or 260000,
|
|
truncated=False,
|
|
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
|
|
cached=False,
|
|
latency_ms=latency_ms,
|
|
# deep 슬롯 사용 시 실처리 모델(qwen-macbook alias) 기록 — 어느 머신이 처리했는지 추적
|
|
model_name=used_cfg.model,
|
|
prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
|
|
error_code=parse_error,
|
|
source="document_server",
|
|
subject_domain=subject_domain,
|
|
risk_flags=list(envelope.risk_flags),
|
|
high_impact_task=None,
|
|
escalation_reasons=list(envelope.escalation_reasons),
|
|
confidence=deep_out.confidence,
|
|
policy_version=pv,
|
|
shadow_would_route_to="primary",
|
|
tier="primary",
|
|
escalated_to_26b=True,
|
|
suppressed_reason=None,
|
|
)
|
|
|
|
logger.info(
|
|
f"[deep] id={document_id} subject={subject_domain} "
|
|
f"detail_len={len(doc.ai_detail_summary or '')} "
|
|
f"inc={len(doc.ai_inconsistencies or [])} latency_ms={latency_ms} "
|
|
f"parse_error={parse_error}"
|
|
)
|
|
|
|
|
|
def _build_text_slices(text: str, pointers: dict) -> str:
|
|
"""original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다.
|
|
|
|
3조각 이상이면 각 조각 앞에 [slice N — head/middle/tail] 라벨을 붙여 26B 가 순서 인지.
|
|
단일 슬라이스면 라벨 없이 원문 그대로.
|
|
"""
|
|
ranges = (pointers or {}).get("text_ranges") or []
|
|
if not ranges:
|
|
return text[:260_000]
|
|
|
|
if len(ranges) == 1:
|
|
r = ranges[0]
|
|
return text[r.get("start", 0):r.get("end", len(text))]
|
|
|
|
labels = ["head", "middle", "tail"]
|
|
parts: list[str] = []
|
|
for idx, r in enumerate(ranges):
|
|
label = labels[idx] if idx < len(labels) else f"slice{idx}"
|
|
chunk = text[r.get("start", 0):r.get("end", len(text))]
|
|
parts.append(f"[slice {idx} — {label}]\n{chunk}")
|
|
return "\n\n".join(parts)
|
|
|
|
|
|
def _parse_outermost_json(raw: str) -> dict | None:
|
|
"""Response 의 첫 '{' 부터 brace balance 로 최외곽 JSON 추출.
|
|
|
|
parse_json_response 의 re.finditer 패턴이 1단계 중첩까지만 매치해서 deep_summary
|
|
응답처럼 `entities_confirmed: {...}` 2단계 중첩이 포함된 경우 최외곽 대신 내부
|
|
객체만 반환되는 문제를 우회. 또한 응답이 잘려 closure `}` 가 없으면 강제로
|
|
`}` 추가 시도하여 부분 파싱.
|
|
"""
|
|
cleaned = strip_thinking(raw)
|
|
code_match = re.search(r"```(?:json)?\s*(\{.*)", cleaned, re.DOTALL)
|
|
if code_match:
|
|
cleaned = code_match.group(1)
|
|
start = cleaned.find("{")
|
|
if start < 0:
|
|
return None
|
|
depth = 0
|
|
end = -1
|
|
in_str = False
|
|
esc = False
|
|
for i in range(start, len(cleaned)):
|
|
ch = cleaned[i]
|
|
if esc:
|
|
esc = False
|
|
continue
|
|
if ch == "\\":
|
|
esc = True
|
|
continue
|
|
if ch == '"':
|
|
in_str = not in_str
|
|
continue
|
|
if in_str:
|
|
continue
|
|
if ch == "{":
|
|
depth += 1
|
|
elif ch == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
end = i + 1
|
|
break
|
|
if end > 0:
|
|
try:
|
|
return json.loads(cleaned[start:end])
|
|
except json.JSONDecodeError:
|
|
pass
|
|
# 응답 잘림 — 남은 depth 만큼 `}` 보강 후 재시도
|
|
if depth > 0:
|
|
candidate = cleaned[start:].rstrip().rstrip(",") + ("}" * depth)
|
|
try:
|
|
return json.loads(candidate)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _regex_extract_fields(raw: str) -> dict:
|
|
"""JSON parse 실패 시 field-level regex 로 detail/tldr/mode/inconsistencies 추출.
|
|
|
|
응답이 잘렸거나 중간에 문자열이 끊긴 경우에도 앞쪽에 완결된 필드는 건진다.
|
|
`"detail": "…"` 처럼 key-value 쌍을 개별 매칭.
|
|
"""
|
|
def _str_field(key: str) -> str | None:
|
|
m = re.search(rf'"{key}"\s*:\s*"((?:[^"\\]|\\.)*)"', raw, re.DOTALL)
|
|
if not m:
|
|
return None
|
|
try:
|
|
# JSON string escape 복원 (\n, \\, \" 등)
|
|
return json.loads('"' + m.group(1) + '"')
|
|
except json.JSONDecodeError:
|
|
return m.group(1)
|
|
|
|
def _arr_field(key: str) -> list | None:
|
|
# 단순 문자열 배열만 지원 — bullets / inconsistencies_desc 등
|
|
m = re.search(rf'"{key}"\s*:\s*(\[[^\]]*\])', raw, re.DOTALL)
|
|
if not m:
|
|
return None
|
|
try:
|
|
return json.loads(m.group(1))
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
out: dict = {"_fallback": True}
|
|
mode = _str_field("mode")
|
|
if mode:
|
|
out["mode"] = mode
|
|
tldr = _str_field("tldr")
|
|
if tldr:
|
|
out["tldr"] = tldr
|
|
detail = _str_field("detail")
|
|
if detail:
|
|
out["detail"] = detail
|
|
bullets = _arr_field("bullets")
|
|
if bullets is not None:
|
|
out["bullets"] = bullets
|
|
inc = _arr_field("inconsistencies")
|
|
if inc is not None:
|
|
out["inconsistencies"] = inc
|
|
return out
|
|
|
|
|
|
def _filter_inconsistencies(items: list) -> list[dict]:
|
|
"""허용 kind 목록 (safety/news 도메인 한정) 만 통과시킨다.
|
|
|
|
`amount_mismatch` 같은 구매/계약 kind 는 여기서 drop (feedback_document_server_domain_scope.md).
|
|
구조 오류 (kind/desc 누락) 도 drop.
|
|
"""
|
|
out: list[dict] = []
|
|
for it in items or []:
|
|
if not isinstance(it, dict):
|
|
continue
|
|
kind = str(it.get("kind") or "")
|
|
desc_ = str(it.get("desc") or "").strip()
|
|
if kind not in ALLOWED_INCONSISTENCY_KINDS:
|
|
continue
|
|
if not desc_:
|
|
continue
|
|
out.append({"kind": kind, "desc": desc_})
|
|
return out
|