Files
hyungi_document_server/app/services/search/classifier_service.py
T
Hyungi Ahn ad3d51e3e0 fix(search): classifier + evidence gate 안으로 이동 (Mac mini 26B race 종결)
llm_gate.py docstring 영구 룰: "MLX primary 호출 경로는 예외 없이 gate 획득 필수".
PR #20 이후 classifier (Mac mini 26B 신규) + evidence (triage→Mac mini 26B 통합)
모두 gate 외부 실행 — concurrent 안전성 별 검토 명시. 1주 관찰 결과: race 빈번.

본 PR-Hermes-Docsrv-Search-1 Layer 1 fixture 측정:
- 8/10 query "conservative_refuse(no_classifier)" — classifier 가 동시 부하 시
  거의 모두 ReadTimeout 또는 wait_for(6s) timeout
- evidence ev_ms=15005 — synthesis 와 race 로 15s 누적

영향:
- ask total 시간 증가 (parallel race → serialized): query_analyzer 5s +
  classifier 3-5s + evidence 5s + synthesis 30s ≈ 40-45s 상한 (현실 평균)
- 응답률 ↑: race timeout 으로 인한 conservative_refuse 해소
- 사용자 체감: 빠른 거절 → 의미있는 답변. 단 대기 시간 ↑

후속:
- skill `docsrv_ask` curl `--max-time 20` → 60s 상향 필요 (별 PR 또는 본 PR
  안의 follow-up)
- 본 메모리 `2026-05-21 Mac mini 26B 1주 부하 측정` observation 의 결정
  outcome: gate 복귀 (triage 별 작은 모델 재도입 옵션은 보류)
2026-05-16 19:54:55 +09:00

157 lines
5.5 KiB
Python

"""Answerability classifier (Phase 3.5a).
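Backed by the Mac mini 26B MLX model (config.yaml ai.models.classifier — since PR #20 triage/primary/classifier share the same endpoint). The model call is made while holding the MLX gate (llm_gate), so it is serialized with the other MLX callers instead of running in parallel with evidence extraction.
P1 measurements: the ternary verdict (full/partial/insufficient) was unstable → **binary (sufficient/insufficient)**.
The "full" vs "partial" distinction is handled by the intent alignment in grounding_check.
The classifier verdict is a binary judgment of whether relevant evidence exists.
covered_aspects / missing_aspects are kept for logging only (the refusal gate does not use them).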
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from typing import Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import get_mlx_gate
# Module logger, created through the project helper with the name "classifier".
logger = setup_logger("classifier")
# Upper bound, in milliseconds, for one classifier LLM request. classify()
# converts it to seconds and applies it only to the request itself, after the
# MLX gate has been acquired.
LLM_TIMEOUT_MS = 30000
# Number of failed calls (timeouts or other exceptions) after which the circuit
# opens. The counter is reset to 0 by every successful request.
CIRCUIT_THRESHOLD = 5
# How long, in seconds, the circuit stays open once it has tripped.
CIRCUIT_RECOVERY_SEC = 60
# Mutable circuit-breaker state shared by all classify() calls in this process.
_failure_count = 0
# time.time() timestamp until which calls are short-circuited; None means the
# circuit has never been opened.
_circuit_open_until: float | None = None
@dataclass(slots=True)
class ClassifierResult:
    """Outcome of one classify() call.

    Instances are built positionally inside this module, so the field order
    below is part of the interface.
    """

    # "ok": the model answered and its JSON was parsed into a dict.
    # "timeout": the request exceeded LLM_TIMEOUT_MS.
    # "error": the request raised, or the response was not a JSON object.
    # "circuit_open": the call was short-circuited by the circuit breaker.
    # "skipped": the prompt template or the classifier model config is missing.
    status: Literal["ok", "timeout", "error", "circuit_open", "skipped"]
    # Binary answerability verdict; None whenever status is not "ok", and also
    # when the model returned a verdict value that is not recognised.
    verdict: Literal["sufficient", "insufficient"] | None
    # Aspects the model reported as covered / missing (kept for logging).
    covered_aspects: list[str]
    missing_aspects: list[str]
    # Wall time spent in classify(), in milliseconds; 0.0 on the
    # "circuit_open" and "skipped" short-circuit paths.
    elapsed_ms: float
# Prompt template for the classifier. It starts out empty and is overwritten
# when classifier.txt can be loaded; an empty template makes classify() return
# a "skipped" result instead of calling the model.
CLASSIFIER_PROMPT = ""
try:
    CLASSIFIER_PROMPT = _load_prompt("classifier.txt")
except FileNotFoundError:
    logger.warning("classifier.txt not found — classifier will always skip")
def _build_input(
query: str,
top_chunks: list[dict],
rerank_scores: list[float],
) -> str:
"""Y+ input (content + scores with role separation)."""
chunk_block = "\n".join(
f"[{i+1}] title: {c.get('title','')}\n"
f" section: {c.get('section','')}\n"
f" snippet: {c.get('snippet','')}"
for i, c in enumerate(top_chunks[:3])
)
scores_str = ", ".join(f"{s:.2f}" for s in rerank_scores[:3])
return (
CLASSIFIER_PROMPT
.replace("{query}", query)
.replace("{chunks}", chunk_block)
.replace("{scores}", scores_str)
)
def _record_failure() -> None:
    """Count one failed classifier call and open the circuit at the threshold."""
    global _failure_count, _circuit_open_until
    _failure_count += 1
    if _failure_count >= CIRCUIT_THRESHOLD:
        _circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
        logger.error("classifier circuit OPEN for %ss", CIRCUIT_RECOVERY_SEC)


async def classify(
    query: str,
    top_chunks: list[dict],
    rerank_scores: list[float],
) -> ClassifierResult:
    """Run the always-on binary answerability classifier.

    The model request is issued while holding the MLX gate, so it is
    serialized with the other MLX callers rather than racing them. The
    ``LLM_TIMEOUT_MS`` timeout starts only after the gate has been acquired;
    the reported ``elapsed_ms`` is measured from function entry and therefore
    includes the time spent waiting for the gate.

    Args:
        query: The user question.
        top_chunks: Retrieved chunks as dicts, best first.
        rerank_scores: Rerank scores, parallel to ``top_chunks``.

    Returns:
        A ClassifierResult. ``verdict`` is ``None`` whenever ``status`` is not
        ``"ok"`` (the caller is expected to fall back). It is also ``None``
        with status ``"ok"`` when the model returned a verdict value that is
        not recognised.
    """
    global _failure_count
    t_start = time.perf_counter()

    # Circuit breaker: refuse immediately while the circuit is open.
    if _circuit_open_until and time.time() < _circuit_open_until:
        return ClassifierResult("circuit_open", None, [], [], 0.0)
    # Without a prompt template or a configured model there is nothing to run.
    if not CLASSIFIER_PROMPT:
        return ClassifierResult("skipped", None, [], [], 0.0)
    if getattr(settings.ai, "classifier", None) is None:
        return ClassifierResult("skipped", None, [], [], 0.0)

    prompt = _build_input(query, top_chunks, rerank_scores)
    client = AIClient()
    try:
        # 2026-05-17: since PR #20 the endpoint is the Mac mini 26B model, so the
        # llm_gate Semaphore(1) is mandatory. Without the gate, classifier +
        # evidence + synthesis hit the single-inference MLX server at once and
        # nearly all of them time out (measured: 8/10 fixture queries).
        # Permanent rule from the llm_gate docstring: every MLX primary call
        # path must acquire the gate, no exceptions.
        async with get_mlx_gate():
            async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
                raw = await client._request(settings.ai.classifier, prompt)
        _failure_count = 0
    except asyncio.TimeoutError:
        _record_failure()
        logger.warning("classifier timeout")
        return ClassifierResult(
            "timeout", None, [], [],
            (time.perf_counter() - t_start) * 1000,
        )
    except Exception as e:
        # Best-effort boundary: any client failure degrades to an "error"
        # result so the caller can apply its fallback.
        _record_failure()
        logger.warning("classifier error: type=%s repr=%r", type(e).__name__, e)
        return ClassifierResult(
            "error", None, [], [],
            (time.perf_counter() - t_start) * 1000,
        )
    finally:
        await client.close()

    elapsed_ms = (time.perf_counter() - t_start) * 1000
    parsed = parse_json_response(raw)
    if not isinstance(parsed, dict):
        logger.warning("classifier parse failed raw=%r", (raw or "")[:200])
        return ClassifierResult("error", None, [], [], elapsed_ms)

    # Ternary → binary mapping. Compare case- and whitespace-insensitively so
    # that answers such as "Sufficient" are not silently turned into None.
    raw_verdict = parsed.get("verdict", "")
    normalized = raw_verdict.strip().lower() if isinstance(raw_verdict, str) else ""
    verdict: Literal["sufficient", "insufficient"] | None
    if normalized == "insufficient":
        verdict = "insufficient"
    elif normalized in ("full", "partial", "sufficient"):
        verdict = "sufficient"
    else:
        verdict = None

    # Aspect lists are informational; anything that is not a list is dropped.
    covered = parsed.get("covered_aspects") or []
    missing = parsed.get("missing_aspects") or []
    if not isinstance(covered, list):
        covered = []
    if not isinstance(missing, list):
        missing = []

    logger.info(
        "classifier ok query=%r verdict=%s (raw=%s) covered=%d missing=%d elapsed_ms=%.0f",
        query[:60], verdict, raw_verdict, len(covered), len(missing), elapsed_ms,
    )
    return ClassifierResult("ok", verdict, covered, missing, elapsed_ms)