fix(classify): 적대 리뷰 반영 — use_deep 스레딩(B1)·StageDeferred 전파(B2)·legacy 호출 deep 경유(M3)

- _run_tier_triage(use_deep) 스레딩 — 미배선 NameError(전 classify 파괴) fix
- process 의 triage try 에 except StageDeferred: raise 선행 (drain 보류 시멘틱 복구)
- legacy classify()/summarize() 에 cfg 파라미터 — use_deep 시 deep 슬롯 경유 +
  is_deferrable_error → StageDeferred 변환(첫 호출 = 최저비용 지점에서 보류, doc 쓰기 0)
- ai_model_version = 실제 처리 경로 모델 (drain=qwen-macbook 귀속)
- analyze_event model_name 스레딩 + deep triage cfg 에 top_p 동승

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-12 07:12:40 +09:00
parent 235bbf9881
commit d667545185
2 changed files with 57 additions and 23 deletions
+8 -5
View File
@@ -242,20 +242,23 @@ class AIClient:
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str) -> dict:
async def classify(self, text: str, cfg=None) -> dict:
"""[DEPRECATED] 기존 classify_worker 전용. B-1 에서 summary_triage 로 대체.
호출부 정리 전 존속. 신규 코드는 call_triage + prompt_render 를 쓸 것.
cfg (2026-06-12 fair-share): 지정 시 primary 대신 해당 config 로 호출 —
drain classify 가 deep 슬롯(맥북) 경유에 사용. cfg != ai.primary 라
_call_chat 의 primary→fallback 자동 전환은 발동하지 않는다 (에러 raw 전파).
"""
prompt = CLASSIFY_PROMPT.replace("{document_text}", text)
response = await self._call_chat(self.ai.primary, prompt)
response = await self._call_chat(cfg or self.ai.primary, prompt)
return response
async def summarize(self, text: str, force_premium: bool = False) -> str:
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체."""
async def summarize(self, text: str, force_premium: bool = False, cfg=None) -> str:
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체. cfg = classify() 와 동일."""
if force_premium:
return await self._call_chat(self.ai.premium, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
return await self._call_chat(self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
return await self._call_chat(cfg or self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
async def embed(self, text: str) -> list[float]:
"""벡터 임베딩 — GPU 서버 전용"""
+49 -18
View File
@@ -31,7 +31,13 @@ from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking
from ai.client import (
AIClient,
call_deep_or_defer,
is_deferrable_error,
parse_json_response,
strip_thinking,
)
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
@@ -453,10 +459,20 @@ async def process(
logger.info(f"doc {document_id}: frontmatter 부분 인식 → LLM 으로 미설정 필드 보완")
client = AIClient()
# fair-share (2026-06-12): use_deep 시 legacy classify/summarize 도 deep 슬롯(맥북)
# 경유 — 그래야 drain 의 "맥북 분담" 이 실제로 성립 (triage 만 보내면 50K 요약
# 프리필이 맥미니에 남는다). deep 슬롯 sampling = primary 와 동일(0.3/0.9/8192).
legacy_cfg = settings.ai.deep if (use_deep and settings.ai.deep is not None) else None
try:
# ─── 1. Legacy classify (primary 26B) ───
# ─── 1. Legacy classify (primary 또는 deep) ───
truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
raw_response = await client.classify(truncated)
try:
raw_response = await client.classify(truncated, cfg=legacy_cfg)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
# 맥북 불가 — 첫 호출(최저 비용 지점)에서 보류로 전환, doc 쓰기 0
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
parsed = parse_json_response(raw_response)
if not parsed:
@@ -524,12 +540,17 @@ async def process(
"reason": "classify pipeline",
}
# ─── 2. Legacy 요약 (primary 26B) ───
summary = await client.summarize(doc.extracted_text[:50000])
# ─── 2. Legacy 요약 (primary 또는 deep) ───
try:
summary = await client.summarize(doc.extracted_text[:50000], cfg=legacy_cfg)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
doc.ai_summary = strip_thinking(summary)
# ─── 메타데이터 (legacy 완료) ───
doc.ai_model_version = settings.ai.primary.model
# ─── 메타데이터 (legacy 완료) — 실제 처리 머신 귀속 (drain=qwen-macbook) ───
doc.ai_model_version = (legacy_cfg or settings.ai.primary).model
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
@@ -540,7 +561,9 @@ async def process(
# ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ───
try:
await _run_tier_triage(client, doc, session)
await _run_tier_triage(client, doc, session, use_deep=use_deep)
except StageDeferred:
raise # 보류는 실패가 아님 — drain/consumer 가 attempts 미소모 처리
except Exception as exc:
logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}")
@@ -548,8 +571,10 @@ async def process(
await client.close()
async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None:
"""summary_triage (p3a_short_summary) 경로."""
async def _run_tier_triage(
client: AIClient, doc: Document, session: AsyncSession, *, use_deep: bool = False
) -> None:
"""summary_triage (p3a_short_summary) 경로. use_deep = process() 에서 전달 (drain 전용)."""
document_id = doc.id
text = doc.extracted_text or ""
input_chars = len(text)
@@ -557,6 +582,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
triage_start = time.perf_counter()
parse_error: str | None = None
triage_out = TriageOutput()
# drain 경유 시 triage 도 deep 슬롯(맥북) — sampling 은 triage 것 유지(결정성).
deep_triage_cfg = None
if use_deep and settings.ai.deep is not None:
deep_triage_cfg = settings.ai.deep.model_copy(update={
"temperature": settings.ai.triage.temperature,
"top_p": settings.ai.triage.top_p,
"max_tokens": settings.ai.triage.max_tokens,
})
# 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate
if input_chars > TRIAGE_TEXT_LIMIT:
@@ -597,13 +630,9 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT])
try:
if use_deep and settings.ai.deep is not None:
# drain 전용 — deep 슬롯 endpoint + triage sampling (결정성 유지).
# 맥북 불가(StageDeferred)는 아래 generic except 에 먹히지 않게 먼저 전파.
deep_triage_cfg = settings.ai.deep.model_copy(update={
"temperature": settings.ai.triage.temperature,
"max_tokens": settings.ai.triage.max_tokens,
})
if deep_triage_cfg is not None:
# drain 전용 — deep 슬롯 endpoint + triage sampling. 맥북 불가(StageDeferred)
# 는 아래 generic except 에 먹히지 않게 먼저 전파.
raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg)
else:
raw_triage = await client.call_triage(prompt)
@@ -674,6 +703,7 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
escalation_reason=escalation_reason,
parse_error=parse_error,
routing_decision=routing_decision,
model_name=(deep_triage_cfg.model if deep_triage_cfg is not None else None),
)
@@ -688,6 +718,7 @@ async def _apply_triage_result(
escalation_reason: str | None,
parse_error: str | None,
routing_decision=None,
model_name: str | None = None, # fair-share: 실제 호출 경로 모델 (None=triage 기본)
) -> None:
"""TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit.
@@ -778,7 +809,7 @@ async def _apply_triage_result(
layers_returned=["tldr", "bullets"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=settings.ai.triage.model,
model_name=(model_name or settings.ai.triage.model),
prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK),
error_code=parse_error,
source="document_server",