From d66754518553644abe265ba0b1b1c3b284d94db0 Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 12 Jun 2026 07:12:40 +0900 Subject: [PATCH] =?UTF-8?q?fix(classify):=20=EC=A0=81=EB=8C=80=20=EB=A6=AC?= =?UTF-8?q?=EB=B7=B0=20=EB=B0=98=EC=98=81=20=E2=80=94=20use=5Fdeep=20?= =?UTF-8?q?=EC=8A=A4=EB=A0=88=EB=94=A9(B1)=C2=B7StageDeferred=20=EC=A0=84?= =?UTF-8?q?=ED=8C=8C(B2)=C2=B7legacy=20=ED=98=B8=EC=B6=9C=20deep=20?= =?UTF-8?q?=EA=B2=BD=EC=9C=A0(M3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _run_tier_triage(use_deep) 스레딩 — 미배선 NameError(전 classify 파괴) fix - process 의 triage try 에 except StageDeferred: raise 선행 (drain 보류 시멘틱 복구) - legacy classify()/summarize() 에 cfg 파라미터 — use_deep 시 deep 슬롯 경유 + is_deferrable_error → StageDeferred 변환(첫 호출 = 최저비용 지점에서 보류, doc 쓰기 0) - ai_model_version = 실제 처리 경로 모델 (drain=qwen-macbook 귀속) - analyze_event model_name 스레딩 + deep triage cfg 에 top_p 동승 Co-Authored-By: Claude Fable 5 --- app/ai/client.py | 13 ++++--- app/workers/classify_worker.py | 67 +++++++++++++++++++++++++--------- 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/app/ai/client.py b/app/ai/client.py index 485d745..3684b76 100644 --- a/app/ai/client.py +++ b/app/ai/client.py @@ -242,20 +242,23 @@ class AIClient: # ─── Legacy API (classify_worker 교체 시 제거 예정) ─────────────────── - async def classify(self, text: str) -> dict: + async def classify(self, text: str, cfg=None) -> dict: """[DEPRECATED] 기존 classify_worker 전용. B-1 에서 summary_triage 로 대체. 호출부 정리 전 존속. 신규 코드는 call_triage + prompt_render 를 쓸 것. + cfg (2026-06-12 fair-share): 지정 시 primary 대신 해당 config 로 호출 — + drain classify 가 deep 슬롯(맥북) 경유에 사용. cfg != ai.primary 라 + _call_chat 의 primary→fallback 자동 전환은 발동하지 않는다 (에러 raw 전파). """ prompt = CLASSIFY_PROMPT.replace("{document_text}", text) - response = await self._call_chat(self.ai.primary, prompt) + response = await self._call_chat(cfg or self.ai.primary, prompt) return response - async def summarize(self, text: str, force_premium: bool = False) -> str: - """[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체.""" + async def summarize(self, text: str, force_premium: bool = False, cfg=None) -> str: + """[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체. cfg = classify() 와 동일.""" if force_premium: return await self._call_chat(self.ai.premium, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}") - return await self._call_chat(self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}") + return await self._call_chat(cfg or self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}") async def embed(self, text: str) -> list[float]: """벡터 임베딩 — GPU 서버 전용""" diff --git a/app/workers/classify_worker.py b/app/workers/classify_worker.py index 14e2548..b5c8908 100644 --- a/app/workers/classify_worker.py +++ b/app/workers/classify_worker.py @@ -31,7 +31,13 @@ from pydantic import BaseModel, Field, ValidationError from sqlalchemy import text as sql_text from sqlalchemy.ext.asyncio import AsyncSession -from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking +from ai.client import ( + AIClient, + call_deep_or_defer, + is_deferrable_error, + parse_json_response, + strip_thinking, +) from ai.envelope import EscalationEnvelope from core.config import settings from core.utils import setup_logger @@ -453,10 +459,20 @@ async def process( logger.info(f"doc {document_id}: frontmatter 부분 인식 → LLM 으로 미설정 필드 보완") client = AIClient() + # fair-share (2026-06-12): use_deep 시 legacy classify/summarize 도 deep 슬롯(맥북) + # 경유 — 그래야 drain 의 "맥북 분담" 이 실제로 성립 (triage 만 보내면 50K 요약 + # 프리필이 맥미니에 남는다). deep 슬롯 sampling = primary 와 동일(0.3/0.9/8192). + legacy_cfg = settings.ai.deep if (use_deep and settings.ai.deep is not None) else None try: - # ─── 1. Legacy classify (primary 26B) ─── + # ─── 1. Legacy classify (primary 또는 deep) ─── truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT] - raw_response = await client.classify(truncated) + try: + raw_response = await client.classify(truncated, cfg=legacy_cfg) + except Exception as exc: + if legacy_cfg is not None and is_deferrable_error(exc): + # 맥북 불가 — 첫 호출(최저 비용 지점)에서 보류로 전환, doc 쓰기 0 + raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc + raise parsed = parse_json_response(raw_response) if not parsed: @@ -524,12 +540,17 @@ async def process( "reason": "classify pipeline", } - # ─── 2. Legacy 요약 (primary 26B) ─── - summary = await client.summarize(doc.extracted_text[:50000]) + # ─── 2. Legacy 요약 (primary 또는 deep) ─── + try: + summary = await client.summarize(doc.extracted_text[:50000], cfg=legacy_cfg) + except Exception as exc: + if legacy_cfg is not None and is_deferrable_error(exc): + raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc + raise doc.ai_summary = strip_thinking(summary) - # ─── 메타데이터 (legacy 완료) ─── - doc.ai_model_version = settings.ai.primary.model + # ─── 메타데이터 (legacy 완료) — 실제 처리 머신 귀속 (drain=qwen-macbook) ─── + doc.ai_model_version = (legacy_cfg or settings.ai.primary).model doc.ai_processed_at = datetime.now(timezone.utc) logger.info( @@ -540,7 +561,9 @@ async def process( # ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ─── try: - await _run_tier_triage(client, doc, session) + await _run_tier_triage(client, doc, session, use_deep=use_deep) + except StageDeferred: + raise # 보류는 실패가 아님 — drain/consumer 가 attempts 미소모 처리 except Exception as exc: logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}") @@ -548,8 +571,10 @@ async def process( await client.close() -async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None: - """summary_triage (p3a_short_summary) 경로.""" +async def _run_tier_triage( + client: AIClient, doc: Document, session: AsyncSession, *, use_deep: bool = False +) -> None: + """summary_triage (p3a_short_summary) 경로. use_deep = process() 에서 전달 (drain 전용).""" document_id = doc.id text = doc.extracted_text or "" input_chars = len(text) @@ -557,6 +582,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio triage_start = time.perf_counter() parse_error: str | None = None triage_out = TriageOutput() + # drain 경유 시 triage 도 deep 슬롯(맥북) — sampling 은 triage 것 유지(결정성). + deep_triage_cfg = None + if use_deep and settings.ai.deep is not None: + deep_triage_cfg = settings.ai.deep.model_copy(update={ + "temperature": settings.ai.triage.temperature, + "top_p": settings.ai.triage.top_p, + "max_tokens": settings.ai.triage.max_tokens, + }) # 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate if input_chars > TRIAGE_TEXT_LIMIT: @@ -597,13 +630,9 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT]) try: - if use_deep and settings.ai.deep is not None: - # drain 전용 — deep 슬롯 endpoint + triage sampling (결정성 유지). - # 맥북 불가(StageDeferred)는 아래 generic except 에 먹히지 않게 먼저 전파. - deep_triage_cfg = settings.ai.deep.model_copy(update={ - "temperature": settings.ai.triage.temperature, - "max_tokens": settings.ai.triage.max_tokens, - }) + if deep_triage_cfg is not None: + # drain 전용 — deep 슬롯 endpoint + triage sampling. 맥북 불가(StageDeferred) + # 는 아래 generic except 에 먹히지 않게 먼저 전파. raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg) else: raw_triage = await client.call_triage(prompt) @@ -674,6 +703,7 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio escalation_reason=escalation_reason, parse_error=parse_error, routing_decision=routing_decision, + model_name=(deep_triage_cfg.model if deep_triage_cfg is not None else None), ) @@ -688,6 +718,7 @@ async def _apply_triage_result( escalation_reason: str | None, parse_error: str | None, routing_decision=None, + model_name: str | None = None, # fair-share: 실제 호출 경로 모델 (None=triage 기본) ) -> None: """TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit. @@ -778,7 +809,7 @@ async def _apply_triage_result( layers_returned=["tldr", "bullets"] if not parse_error else [], cached=False, latency_ms=latency_ms, - model_name=settings.ai.triage.model, + model_name=(model_name or settings.ai.triage.model), prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK), error_code=parse_error, source="document_server",