Files
hyungi_document_server/app/services/search/verifier_service.py
T
hyungi 118f32f9b1 refactor(ai): PR #20 reframe cleanup — Ollama LLM 잔재 주석 정정
PR #20 (2026-05-14, GPU LLM 제거 + Mac mini 26B MLX 흡수) 의 swap 이
backends.json + 코드 주석/docstring 까지 따라가지 못한 표현 잔재 정리.

- app/ai/client.py: AIClient docstring 및 call_triage / call_fallback
  docstring 의 "4B Ollama" → "Mac mini 26B MLX" / "현재는 triage 와
  동일 엔드포인트" → "Claude Sonnet 4 API (PR #20 swap 완료)"
- app/core/config.py: triage/primary/fallback 주석 통합 + Phase 3.5
  classifier/verifier 주석에 PR #20 endpoint 명시 (history 보존)
- app/services/search/{llm_gate,classifier_service,verifier_service,
  evidence_service}.py: "fallback(Ollama)" / "Ollama concurrent OK"
  / "triage(4B Ollama)" 표현을 Mac mini 26B MLX endpoint 기준으로
  정정 + concurrent 안전성 별 검토 마커 추가
- app/services/digest/summarizer.py: "MLX hang/Ollama stall 방어"
  → "MLX hang / fallback Claude API stall 방어"
- app/services/prompt_versions.py: SUMMARY_TRIAGE_TASK + ASK_PROMPT_VERSION
  주석의 "4B Ollama" / "4B gemma Ollama" → Mac mini 26B MLX
- app/workers/classify_worker.py: B-1 tier triage docstring 정정

코드 동작 변경 0 (주석/docstring 만). embed_worker / study_question_embed_worker
의 "Ollama bge-m3" 표현은 사실 정확이라 유지.

검증:
- ollama list → bge-m3:latest 잔존 (embedding owner)
- /api/embeddings probe → 1024-dim 200 OK
- fastapi embed/ollama error 0 (last 10min)
- document.hyungi.net 200

plan: ~/.claude/plans/4-stateless-dongarra.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 12:09:15 +00:00

195 lines
6.7 KiB
Python

"""Exaone semantic verifier (Phase 3.5b).
답변-근거 간 의미적 모순(contradiction) 감지. rule-based grounding_check 가 못 잡는
미묘한 모순 포착. classifier 와 동일 패턴: circuit breaker + timeout + fail open.
## Severity 3단계
- strong: direct_negation (완전 모순) → re-gate 교차 자격
- medium: numeric_conflict, intent_core_mismatch → confidence 하향 (누적 시 강제 low)
- weak: nuance, unsupported_claim → 로깅 + mild confidence 하향
## 핵심 원칙
- **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse
- **Timeout 3s** — 느리면 없는 게 낫다 (fail open)
- MLX gate 미사용 (PR #20 이후 Mac mini 26B endpoint — concurrent 안전성 별 검토)
"""
from __future__ import annotations
import asyncio
import os
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
logger = setup_logger("verifier")
LLM_TIMEOUT_MS = 3000
CIRCUIT_THRESHOLD = 5
CIRCUIT_RECOVERY_SEC = 60
_failure_count = 0
_circuit_open_until: float | None = None
# Phase 3.5 B2: numeric_conflict severity promote 실험.
# import time 평가 — env 변경 후 process restart 필수 (docker compose restart fastapi).
# default=0 (off). production 적용은 B3 FP 검증 통과 후만.
_NUMERIC_PROMOTE = os.getenv("VERIFIER_NUMERIC_PROMOTE", "0") == "1"
# severity 매핑 (프롬프트 "critical"/"minor" → 코드 strong/medium/weak)
# Tier 4 (B2): _NUMERIC_PROMOTE=1 일 때 numeric_conflict critical → strong 으로 격상.
# minor 는 medium 유지 (FP 위험 분리).
_SEVERITY_MAP: dict[str, dict[str, Literal["strong", "medium", "weak"]]] = {
"direct_negation": {"critical": "strong", "minor": "strong"},
"numeric_conflict": (
{"critical": "strong", "minor": "medium"} if _NUMERIC_PROMOTE
else {"critical": "medium", "minor": "medium"}
),
"intent_core_mismatch": {"critical": "medium", "minor": "medium"},
"nuance": {"critical": "weak", "minor": "weak"},
"unsupported_claim": {"critical": "weak", "minor": "weak"},
}
@dataclass(slots=True)
class Contradiction:
"""개별 모순 발견."""
type: str # direct_negation / numeric_conflict / intent_core_mismatch / nuance / unsupported_claim
severity: Literal["strong", "medium", "weak"]
claim: str
evidence_ref: str
explanation: str
@dataclass(slots=True)
class VerifierResult:
status: Literal["ok", "timeout", "error", "circuit_open", "skipped"]
contradictions: list[Contradiction]
elapsed_ms: float
try:
VERIFIER_PROMPT = _load_prompt("verifier.txt")
except FileNotFoundError:
VERIFIER_PROMPT = ""
logger.warning("verifier.txt not found — verifier will always skip")
def _build_input(
answer: str,
evidence: list[EvidenceItem],
) -> str:
"""답변 + evidence spans → 프롬프트."""
spans = "\n\n".join(
f"[{e.n}] {(e.title or '').strip()}\n{e.span_text}"
for e in evidence
)
return (
VERIFIER_PROMPT
.replace("{answer}", answer)
.replace("{numbered_evidence}", spans)
)
def _map_severity(ctype: str, raw_severity: str) -> Literal["strong", "medium", "weak"]:
"""type + raw severity → 코드 severity 3단계."""
type_map = _SEVERITY_MAP.get(ctype, {"critical": "weak", "minor": "weak"})
return type_map.get(raw_severity, "weak")
async def verify(
query: str,
answer: str,
evidence: list[EvidenceItem],
) -> VerifierResult:
"""답변-근거 semantic 검증. Parallel with grounding_check.
Returns:
VerifierResult. status "ok" 이 아니면 contradictions 빈 리스트 (fail open).
"""
global _failure_count, _circuit_open_until
t_start = time.perf_counter()
if _circuit_open_until and time.time() < _circuit_open_until:
return VerifierResult("circuit_open", [], 0.0)
if not VERIFIER_PROMPT:
return VerifierResult("skipped", [], 0.0)
if not hasattr(settings.ai, "verifier") or settings.ai.verifier is None:
return VerifierResult("skipped", [], 0.0)
if not answer or not evidence:
return VerifierResult("skipped", [], 0.0)
prompt = _build_input(answer, evidence)
client = AIClient()
try:
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client._request(settings.ai.verifier, prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning("verifier timeout")
return VerifierResult(
"timeout", [],
(time.perf_counter() - t_start) * 1000,
)
except Exception as e:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning(f"verifier error: {e}")
return VerifierResult(
"error", [],
(time.perf_counter() - t_start) * 1000,
)
finally:
await client.close()
elapsed_ms = (time.perf_counter() - t_start) * 1000
parsed = parse_json_response(raw)
if not isinstance(parsed, dict):
logger.warning("verifier parse failed raw=%r", (raw or "")[:200])
return VerifierResult("error", [], elapsed_ms)
# contradiction 파싱
raw_items = parsed.get("contradictions") or []
if not isinstance(raw_items, list):
raw_items = []
results: list[Contradiction] = []
for item in raw_items[:5]:
if not isinstance(item, dict):
continue
ctype = item.get("type", "")
if ctype not in _SEVERITY_MAP:
ctype = "unsupported_claim"
raw_sev = item.get("severity", "minor")
severity = _map_severity(ctype, raw_sev)
claim = str(item.get("claim", ""))[:50]
ev_ref = str(item.get("evidence_ref", ""))[:50]
explanation = str(item.get("explanation", ""))[:30]
results.append(Contradiction(ctype, severity, claim, ev_ref, explanation))
logger.info(
"verifier ok query=%r contradictions=%d strong=%d medium=%d elapsed_ms=%.0f",
query[:60],
len(results),
sum(1 for c in results if c.severity == "strong"),
sum(1 for c in results if c.severity == "medium"),
elapsed_ms,
)
return VerifierResult("ok", results, elapsed_ms)