feat(ask): Phase 3.5b guardrails — verifier + telemetry + grounding 강화

Phase 3.5a(classifier+refusal gate+grounding) 위에 4개 Item 추가:

Item 0: ask_events telemetry 배선
- AskEvent ORM 모델 + record_ask_event() — ask_events INSERT 완성
- defense_layers에 input_snapshot(query, chunks, answer) 저장
- refused/normal 두 경로 모두 telemetry 호출

Item 3: evidence 간 numeric conflict detection
- 동일 단위 다른 숫자 → weak flag
- "이상/이하/초과/미만" threshold 표현 → skip (FP 방지)

Item 4: fabricated_number normalization 개선
- 단위 접미사 건/원 추가, 범위 표현(10~20%) 양쪽 추출
- bare number 2자리 이상만 (1자리 FP 제거)

Item 1: exaone semantic verifier (판단권 잠금 배선)
- verifier_service.py — 3s timeout, circuit breaker, severity 3단계
- direct_negation만 strong, numeric/intent→medium, 나머지→weak
- verifier strong 단독 refuse 금지 — grounding과 교차 필수
- 6-tier re-gate (4라운드 리뷰 확정)
- grounding strong 2+ OR max_score<0.2 → verifier skip

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-10 09:49:56 +09:00
parent a0e1717206
commit b2306c3afd
9 changed files with 533 additions and 20 deletions

View File

@@ -42,17 +42,32 @@ class GroundingResult:
weak_flags: list[str]
_UNIT_CHARS = r'명인개%년월일조항호세건원'
# "이상/이하/초과/미만" — threshold 표현 (numeric conflict 에서 skip 대상)
_THRESHOLD_SUFFIXES = re.compile(r'이상|이하|초과|미만')
def _extract_number_literals(text: str) -> set[str]:
"""숫자 + 단위 추출 + normalize."""
raw = set(re.findall(r'\d[\d,.]*\s*[명인개%년월일조항호세]\w{0,2}', text))
"""숫자 + 단위 추출 + normalize (Phase 3.5b 개선)."""
# 1. 숫자 + 한국어 단위 접미사
raw = set(re.findall(rf'\d[\d,.]*\s*[{_UNIT_CHARS}]\w{{0,2}}', text))
# 2. 범위 표현 (10~20%, 100-200명 등) — 양쪽 숫자 각각 추출
for m in re.finditer(
rf'(\d[\d,.]*)\s*[~\-]\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}])',
text,
):
raw.add(m.group(1) + m.group(3))
raw.add(m.group(2) + m.group(3))
# 3. normalize
normalized = set()
for r in raw:
normalized.add(r.strip())
num_only = re.match(r'[\d,.]+', r)
if num_only:
normalized.add(num_only.group().replace(',', ''))
# 단독 숫자도 추출
for d in re.findall(r'\b\d+\b', text):
# 4. 단독 숫자 (2자리 이상만 — 1자리는 오탐 과다)
for d in re.findall(r'\b(\d{2,})\b', text):
normalized.add(d)
return normalized
@@ -62,6 +77,73 @@ def _extract_content_tokens(text: str) -> set[str]:
return set(re.findall(r'[가-힣]{2,}|[a-zA-Z]{3,}', text))
def _parse_number_with_unit(literal: str) -> tuple[str, str] | None:
"""숫자 리터럴에서 (digits_only, unit) 분리. 단위 없으면 None."""
m = re.match(rf'([\d,.]+)\s*([{_UNIT_CHARS}])', literal)
if not m:
return None
digits = m.group(1).replace(',', '')
unit = m.group(2)
return (digits, unit)
def _check_evidence_numeric_conflicts(evidence: list["EvidenceItem"]) -> list[str]:
"""evidence 간 숫자 충돌 감지 (Phase 3.5b). evidence >= 2 일 때만 활성.
동일 단위, 다른 숫자 → weak flag. "이상/이하/초과/미만" 포함 시 skip.
bare number 는 비교 안 함 (조항 번호 등 false positive 방지).
"""
if len(evidence) < 2:
return []
# 각 evidence 에서 단위 있는 숫자 + threshold 여부 추출
# {evidence_idx: [(digits, unit, has_threshold), ...]}
per_evidence: dict[int, list[tuple[str, str, bool]]] = {}
for idx, ev in enumerate(evidence):
nums = re.findall(
rf'\d[\d,.]*\s*[{_UNIT_CHARS}]\w{{0,4}}',
ev.span_text,
)
entries = []
for raw in nums:
parsed = _parse_number_with_unit(raw)
if not parsed:
continue
has_thr = bool(_THRESHOLD_SUFFIXES.search(raw))
entries.append((parsed[0], parsed[1], has_thr))
if entries:
per_evidence[idx] = entries
if len(per_evidence) < 2:
return []
# 단위별로 evidence 간 숫자 비교
# {unit: {digits: [evidence_idx, ...]}}
unit_map: dict[str, dict[str, list[int]]] = {}
for idx, entries in per_evidence.items():
for digits, unit, has_thr in entries:
if has_thr:
continue # threshold 표현은 skip
if unit not in unit_map:
unit_map[unit] = {}
if digits not in unit_map[unit]:
unit_map[unit][digits] = []
if idx not in unit_map[unit][digits]:
unit_map[unit][digits].append(idx)
flags: list[str] = []
for unit, digits_map in unit_map.items():
distinct_values = list(digits_map.keys())
if len(distinct_values) >= 2:
# 가장 많이 등장하는 2개 비교
top2 = sorted(distinct_values, key=lambda d: len(digits_map[d]), reverse=True)[:2]
flags.append(
f"evidence_numeric_conflict:{top2[0]}{unit}_vs_{top2[1]}{unit}"
)
return flags
def check(
query: str,
answer: str,
@@ -113,6 +195,10 @@ def check(
if len(s.strip()) > 20 and not re.search(r'\[\d+\]', s):
weak.append(f"uncited_claim:{s[:40]}")
# ── Weak: evidence 간 숫자 충돌 (Phase 3.5b) ──
conflicts = _check_evidence_numeric_conflicts(evidence)
weak.extend(conflicts)
# ── Weak 2: token overlap ──
answer_tokens = _extract_content_tokens(answer)
evidence_tokens = _extract_content_tokens(evidence_text)

View File

@@ -0,0 +1,183 @@
"""Exaone semantic verifier (Phase 3.5b).
답변-근거 간 의미적 모순(contradiction) 감지. rule-based grounding_check 가 못 잡는
미묘한 모순 포착. classifier 와 동일 패턴: circuit breaker + timeout + fail open.
## Severity 3단계
- strong: direct_negation (완전 모순) → re-gate 교차 자격
- medium: numeric_conflict, intent_core_mismatch → confidence 하향 (누적 시 강제 low)
- weak: nuance, unsupported_claim → 로깅 + mild confidence 하향
## 핵심 원칙
- **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse
- **Timeout 3s** — 느리면 없는 게 낫다 (fail open)
- MLX gate 미사용 (GPU Ollama concurrent OK)
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
logger = setup_logger("verifier")
LLM_TIMEOUT_MS = 3000
CIRCUIT_THRESHOLD = 5
CIRCUIT_RECOVERY_SEC = 60
_failure_count = 0
_circuit_open_until: float | None = None
# severity 매핑 (프롬프트 "critical"/"minor" → 코드 strong/medium/weak)
_SEVERITY_MAP: dict[str, dict[str, Literal["strong", "medium", "weak"]]] = {
"direct_negation": {"critical": "strong", "minor": "strong"},
"numeric_conflict": {"critical": "medium", "minor": "medium"},
"intent_core_mismatch": {"critical": "medium", "minor": "medium"},
"nuance": {"critical": "weak", "minor": "weak"},
"unsupported_claim": {"critical": "weak", "minor": "weak"},
}
@dataclass(slots=True)
class Contradiction:
"""개별 모순 발견."""
type: str # direct_negation / numeric_conflict / intent_core_mismatch / nuance / unsupported_claim
severity: Literal["strong", "medium", "weak"]
claim: str
evidence_ref: str
explanation: str
@dataclass(slots=True)
class VerifierResult:
status: Literal["ok", "timeout", "error", "circuit_open", "skipped"]
contradictions: list[Contradiction]
elapsed_ms: float
try:
VERIFIER_PROMPT = _load_prompt("verifier.txt")
except FileNotFoundError:
VERIFIER_PROMPT = ""
logger.warning("verifier.txt not found — verifier will always skip")
def _build_input(
answer: str,
evidence: list[EvidenceItem],
) -> str:
"""답변 + evidence spans → 프롬프트."""
spans = "\n\n".join(
f"[{e.n}] {(e.title or '').strip()}\n{e.span_text}"
for e in evidence
)
return (
VERIFIER_PROMPT
.replace("{answer}", answer)
.replace("{numbered_evidence}", spans)
)
def _map_severity(ctype: str, raw_severity: str) -> Literal["strong", "medium", "weak"]:
"""type + raw severity → 코드 severity 3단계."""
type_map = _SEVERITY_MAP.get(ctype, {"critical": "weak", "minor": "weak"})
return type_map.get(raw_severity, "weak")
async def verify(
query: str,
answer: str,
evidence: list[EvidenceItem],
) -> VerifierResult:
"""답변-근거 semantic 검증. Parallel with grounding_check.
Returns:
VerifierResult. status "ok" 이 아니면 contradictions 빈 리스트 (fail open).
"""
global _failure_count, _circuit_open_until
t_start = time.perf_counter()
if _circuit_open_until and time.time() < _circuit_open_until:
return VerifierResult("circuit_open", [], 0.0)
if not VERIFIER_PROMPT:
return VerifierResult("skipped", [], 0.0)
if not hasattr(settings.ai, "verifier") or settings.ai.verifier is None:
return VerifierResult("skipped", [], 0.0)
if not answer or not evidence:
return VerifierResult("skipped", [], 0.0)
prompt = _build_input(answer, evidence)
client = AIClient()
try:
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client._request(settings.ai.verifier, prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning("verifier timeout")
return VerifierResult(
"timeout", [],
(time.perf_counter() - t_start) * 1000,
)
except Exception as e:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning(f"verifier error: {e}")
return VerifierResult(
"error", [],
(time.perf_counter() - t_start) * 1000,
)
finally:
await client.close()
elapsed_ms = (time.perf_counter() - t_start) * 1000
parsed = parse_json_response(raw)
if not isinstance(parsed, dict):
logger.warning("verifier parse failed raw=%r", (raw or "")[:200])
return VerifierResult("error", [], elapsed_ms)
# contradiction 파싱
raw_items = parsed.get("contradictions") or []
if not isinstance(raw_items, list):
raw_items = []
results: list[Contradiction] = []
for item in raw_items[:5]:
if not isinstance(item, dict):
continue
ctype = item.get("type", "")
if ctype not in _SEVERITY_MAP:
ctype = "unsupported_claim"
raw_sev = item.get("severity", "minor")
severity = _map_severity(ctype, raw_sev)
claim = str(item.get("claim", ""))[:50]
ev_ref = str(item.get("evidence_ref", ""))[:50]
explanation = str(item.get("explanation", ""))[:30]
results.append(Contradiction(ctype, severity, claim, ev_ref, explanation))
logger.info(
"verifier ok query=%r contradictions=%d strong=%d medium=%d elapsed_ms=%.0f",
query[:60],
len(results),
sum(1 for c in results if c.severity == "strong"),
sum(1 for c in results if c.severity == "medium"),
elapsed_ms,
)
return VerifierResult("ok", results, elapsed_ms)