2d86683636
코드리뷰 AIClient 정비 PR-B (#2 gate·#3 httpx·#4 public). #2 gate 구조 (call-site 컨벤션 — gate 는 caller-managed, AIClient self-gate 금지): · classify_worker consumer call_triage: gate 없이 Mac mini 직타하던 것 → acquire_mlx_gate(BACKGROUND). (drain 경로 call_deep_or_defer 는 맥북 deep 슬롯이라 mini gate 무관, 미적용.) · verifier_service: gate 없이 _request(verifier) 하던 것 → acquire_mlx_gate(FOREGROUND) + call_verifier. classifier/evidence 와 동일 gate 공유로 thundering-herd(22-timeout 사고) 방어. ★재진입 안전 검증: AIClient 메서드 내부 self-gate 0(전부 call-site) + evidence/classifier 는 이미 독립 gate 보유 + api/search 오케스트레이터 gate 미보유 → double-acquire 데드락 불가. #4 public 메서드: call_classifier/call_verifier 추가 → classifier/verifier_service 의 private _request 직접호출 봉인(egress 가드 일관 적용). gate 는 caller-managed 유지(call_primary 와 동일 계약). #3 공유 httpx: 호출마다 AsyncClient 생성(30+ 사이트)을 _get_shared_http() 단일 풀로 — keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(테스트) 시 재생성, close() 는 no-op. py_compile PASS. (잔여 #4: query_analyzer/digest/backends 의 _request·_call_chat 직접호출은 gated 라 안전, 후속 sweep.) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
197 lines
7.0 KiB
Python
197 lines
7.0 KiB
Python
"""Exaone semantic verifier (Phase 3.5b).
|
|
|
|
답변-근거 간 의미적 모순(contradiction) 감지. rule-based grounding_check 가 못 잡는
|
|
미묘한 모순 포착. classifier 와 동일 패턴: circuit breaker + timeout + fail open.
|
|
|
|
## Severity 3단계
|
|
- strong: direct_negation (완전 모순) → re-gate 교차 자격
|
|
- medium: numeric_conflict, intent_core_mismatch → confidence 하향 (누적 시 강제 low)
|
|
- weak: nuance, unsupported_claim → 로깅 + mild confidence 하향
|
|
|
|
## 핵심 원칙
|
|
- **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse
|
|
- **Timeout 3s** — 느리면 없는 게 낫다 (fail open)
|
|
- MLX gate 사용 (Mac mini 26B endpoint — classifier/evidence 와 동일 gate 공유, 동시 race 방지)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import TYPE_CHECKING, Literal
|
|
|
|
from ai.client import AIClient, _load_prompt, parse_json_response
|
|
from core.config import settings
|
|
from core.utils import setup_logger
|
|
from .llm_gate import Priority, acquire_mlx_gate
|
|
|
|
if TYPE_CHECKING:
|
|
from .evidence_service import EvidenceItem
|
|
|
|
logger = setup_logger("verifier")
|
|
|
|
LLM_TIMEOUT_MS = 10000 # 2026-05-17 B-3: 3s 시 동시 부하 시 verifier 빈발 skip → grounding 약화. Mac mini 26B 가 verifier-style 짧은 LLM call 도 concurrent 호출 시 3s 초과 빈번 — 10s 로 raise
|
|
CIRCUIT_THRESHOLD = 5
|
|
CIRCUIT_RECOVERY_SEC = 60
|
|
|
|
_failure_count = 0
|
|
_circuit_open_until: float | None = None
|
|
|
|
# Phase 3.5 B2: numeric_conflict severity promote 실험.
|
|
# import time 평가 — env 변경 후 process restart 필수 (docker compose restart fastapi).
|
|
# default=0 (off). production 적용은 B3 FP 검증 통과 후만.
|
|
_NUMERIC_PROMOTE = os.getenv("VERIFIER_NUMERIC_PROMOTE", "0") == "1"
|
|
|
|
# severity 매핑 (프롬프트 "critical"/"minor" → 코드 strong/medium/weak)
|
|
# Tier 4 (B2): _NUMERIC_PROMOTE=1 일 때 numeric_conflict critical → strong 으로 격상.
|
|
# minor 는 medium 유지 (FP 위험 분리).
|
|
_SEVERITY_MAP: dict[str, dict[str, Literal["strong", "medium", "weak"]]] = {
|
|
"direct_negation": {"critical": "strong", "minor": "strong"},
|
|
"numeric_conflict": (
|
|
{"critical": "strong", "minor": "medium"} if _NUMERIC_PROMOTE
|
|
else {"critical": "medium", "minor": "medium"}
|
|
),
|
|
"intent_core_mismatch": {"critical": "medium", "minor": "medium"},
|
|
"nuance": {"critical": "weak", "minor": "weak"},
|
|
"unsupported_claim": {"critical": "weak", "minor": "weak"},
|
|
}
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class Contradiction:
|
|
"""개별 모순 발견."""
|
|
type: str # direct_negation / numeric_conflict / intent_core_mismatch / nuance / unsupported_claim
|
|
severity: Literal["strong", "medium", "weak"]
|
|
claim: str
|
|
evidence_ref: str
|
|
explanation: str
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class VerifierResult:
|
|
status: Literal["ok", "timeout", "error", "circuit_open", "skipped"]
|
|
contradictions: list[Contradiction]
|
|
elapsed_ms: float
|
|
|
|
|
|
try:
|
|
VERIFIER_PROMPT = _load_prompt("verifier.txt")
|
|
except FileNotFoundError:
|
|
VERIFIER_PROMPT = ""
|
|
logger.warning("verifier.txt not found — verifier will always skip")
|
|
|
|
|
|
def _build_input(
|
|
answer: str,
|
|
evidence: list[EvidenceItem],
|
|
) -> str:
|
|
"""답변 + evidence spans → 프롬프트."""
|
|
spans = "\n\n".join(
|
|
f"[{e.n}] {(e.title or '').strip()}\n{e.span_text}"
|
|
for e in evidence
|
|
)
|
|
return (
|
|
VERIFIER_PROMPT
|
|
.replace("{answer}", answer)
|
|
.replace("{numbered_evidence}", spans)
|
|
)
|
|
|
|
|
|
def _map_severity(ctype: str, raw_severity: str) -> Literal["strong", "medium", "weak"]:
|
|
"""type + raw severity → 코드 severity 3단계."""
|
|
type_map = _SEVERITY_MAP.get(ctype, {"critical": "weak", "minor": "weak"})
|
|
return type_map.get(raw_severity, "weak")
|
|
|
|
|
|
async def verify(
|
|
query: str,
|
|
answer: str,
|
|
evidence: list[EvidenceItem],
|
|
) -> VerifierResult:
|
|
"""답변-근거 semantic 검증. Parallel with grounding_check.
|
|
|
|
Returns:
|
|
VerifierResult. status "ok" 이 아니면 contradictions 빈 리스트 (fail open).
|
|
"""
|
|
global _failure_count, _circuit_open_until
|
|
t_start = time.perf_counter()
|
|
|
|
if _circuit_open_until and time.time() < _circuit_open_until:
|
|
return VerifierResult("circuit_open", [], 0.0)
|
|
|
|
if not VERIFIER_PROMPT:
|
|
return VerifierResult("skipped", [], 0.0)
|
|
|
|
if not hasattr(settings.ai, "verifier") or settings.ai.verifier is None:
|
|
return VerifierResult("skipped", [], 0.0)
|
|
|
|
if not answer or not evidence:
|
|
return VerifierResult("skipped", [], 0.0)
|
|
|
|
prompt = _build_input(answer, evidence)
|
|
client = AIClient()
|
|
try:
|
|
async with acquire_mlx_gate(Priority.FOREGROUND):
|
|
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
|
raw = await client.call_verifier(prompt)
|
|
_failure_count = 0
|
|
except asyncio.TimeoutError:
|
|
_failure_count += 1
|
|
if _failure_count >= CIRCUIT_THRESHOLD:
|
|
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
|
|
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
|
|
logger.warning("verifier timeout")
|
|
return VerifierResult(
|
|
"timeout", [],
|
|
(time.perf_counter() - t_start) * 1000,
|
|
)
|
|
except Exception as e:
|
|
_failure_count += 1
|
|
if _failure_count >= CIRCUIT_THRESHOLD:
|
|
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
|
|
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
|
|
logger.warning(f"verifier error: {e}")
|
|
return VerifierResult(
|
|
"error", [],
|
|
(time.perf_counter() - t_start) * 1000,
|
|
)
|
|
finally:
|
|
await client.close()
|
|
|
|
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
|
parsed = parse_json_response(raw)
|
|
if not isinstance(parsed, dict):
|
|
logger.warning("verifier parse failed raw=%r", (raw or "")[:200])
|
|
return VerifierResult("error", [], elapsed_ms)
|
|
|
|
# contradiction 파싱
|
|
raw_items = parsed.get("contradictions") or []
|
|
if not isinstance(raw_items, list):
|
|
raw_items = []
|
|
|
|
results: list[Contradiction] = []
|
|
for item in raw_items[:5]:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
ctype = item.get("type", "")
|
|
if ctype not in _SEVERITY_MAP:
|
|
ctype = "unsupported_claim"
|
|
raw_sev = item.get("severity", "minor")
|
|
severity = _map_severity(ctype, raw_sev)
|
|
claim = str(item.get("claim", ""))[:50]
|
|
ev_ref = str(item.get("evidence_ref", ""))[:50]
|
|
explanation = str(item.get("explanation", ""))[:30]
|
|
results.append(Contradiction(ctype, severity, claim, ev_ref, explanation))
|
|
|
|
logger.info(
|
|
"verifier ok query=%r contradictions=%d strong=%d medium=%d elapsed_ms=%.0f",
|
|
query[:60],
|
|
len(results),
|
|
sum(1 for c in results if c.severity == "strong"),
|
|
sum(1 for c in results if c.severity == "medium"),
|
|
elapsed_ms,
|
|
)
|
|
return VerifierResult("ok", results, elapsed_ms)
|