feat: Phase 3.5 calibration — telemetry source + grounding/verifier 강화 #1

Merged
hyungi merged 6 commits from feat/phase3-5-calibration into main 2026-04-17 08:11:07 +09:00
20 changed files with 1615 additions and 33 deletions
+5
View File
@@ -17,6 +17,11 @@ logs/
# 데이터 (법령 다운로드 등)
data/
# eval/calibration 실행 결과 (baseline jsonl 등)
# reports/ 는 이미 tracked 파일 있음 → 전체 ignore 하지 않음
results/
artifacts/
# macOS
.DS_Store
._*
+83 -5
View File
@@ -10,17 +10,20 @@
"""
import asyncio
import hmac
import time
from typing import Annotated, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from fastapi import APIRouter, BackgroundTasks, Depends, Header, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.document_telemetry import sanitize_source
from services.search.classifier_service import ClassifierResult, classify
from services.search.evidence_service import EvidenceItem, extract_evidence
from services.search.fusion_service import DEFAULT_FUSION
@@ -367,6 +370,48 @@ def _build_ask_debug(
)
def _resolve_eval_identity(
    x_source: str | None,
    x_eval_case_id: str | None,
    x_eval_token: str | None,
) -> tuple[str, str | None]:
    """Validate the eval-related request headers and pick the telemetry labels.

    Rules (Phase 3.5 fix2):
    - A request is treated as an "eval claim" when the sanitized X-Source is
      ``"eval"`` or an X-Eval-Case-Id header is present.
    - Requests that are not eval claims keep their sanitized source and always
      get ``eval_case_id=None``.
    - An eval claim is accepted only when X-Eval-Token matches
      ``settings.eval_runner_token`` under a constant-time comparison. An
      empty configured token rejects every claim.
    - A rejected claim is logged at warning level and downgraded: a claimed
      ``"eval"`` source becomes ``"document_server"``, any other sanitized
      source is kept, and the case id is dropped.

    Args:
        x_source: Raw X-Source header value, or None when absent.
        x_eval_case_id: Raw X-Eval-Case-Id header value, or None when absent.
        x_eval_token: Raw X-Eval-Token header value, or None when absent.

    Returns:
        ``(source, eval_case_id)`` to store with the ask event.
    """
    claimed_source = sanitize_source(x_source)
    is_eval_claim = (claimed_source == "eval") or bool(x_eval_case_id)
    if not is_eval_claim:
        # Ordinary call: a case id is meaningless unless source == "eval".
        return claimed_source, None

    # Eval claim: verify the shared secret.
    expected = settings.eval_runner_token
    presented = x_eval_token or ""
    # Compare as UTF-8 bytes. hmac.compare_digest() raises TypeError when a
    # str argument contains non-ASCII characters, which would let a crafted
    # header turn a rejection into an unhandled error.
    token_valid = bool(expected) and hmac.compare_digest(
        presented.encode("utf-8"), expected.encode("utf-8")
    )
    if not token_valid:
        logger.warning(
            "eval header rejected: source=%s case_id=%s token_present=%s expected_set=%s",
            x_source, x_eval_case_id, bool(x_eval_token), bool(expected),
        )
        # Downgrade to an ordinary call: ignore both the "eval" source claim
        # and the case id.
        if claimed_source == "eval":
            return "document_server", None
        return claimed_source, None

    # Token verified: accept the eval labels.
    return "eval", x_eval_case_id
@router.get("/ask", response_model=AskResponse)
async def ask(
q: str,
@@ -375,14 +420,24 @@ async def ask(
background_tasks: BackgroundTasks,
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
x_source: Annotated[str | None, Header(alias="X-Source")] = None,
x_eval_case_id: Annotated[str | None, Header(alias="X-Eval-Case-Id")] = None,
x_eval_token: Annotated[str | None, Header(alias="X-Eval-Token")] = None,
):
"""근거 기반 AI 답변 (Phase 3.5a).
Phase 3.3 기반 + classifier parallel + refusal gate + grounding re-gate.
실패 경로에서도 `results` 는 항상 반환.
Phase 3.5 calibration trust boundary (fix2):
- X-Source / X-Eval-Case-Id 는 X-Eval-Token 이 EVAL_RUNNER_TOKEN 와 일치하는
trusted internal eval runner 에서만 수용된다.
- 일반 client 의 X-Source=eval 시도는 무시되고 source='document_server' 로 강제.
- source != 'eval' 이면 eval_case_id 항상 None.
"""
t_total = time.perf_counter()
defense_log: dict = {} # per-layer flag snapshot
source, eval_case_id = _resolve_eval_identity(x_source, x_eval_case_id, x_eval_token)
# 1. 검색 파이프라인
pr = await run_search(
@@ -500,6 +555,9 @@ async def ask(
missing_aspects=classifier_result.missing_aspects or None,
model_name=resolve_primary_model(),
prompt_version=ASK_PROMPT_VERSION,
# Phase 3.5 calibration
source=source,
eval_case_id=eval_case_id,
)
debug_obj = None
if debug:
@@ -580,7 +638,10 @@ async def ask(
"elapsed_ms": verifier_result.elapsed_ms,
}
# ── Re-gate: 6-tier completeness 결정 (Phase 3.5b 4차 리뷰 확정) ──
# ── Re-gate: 7-tier completeness 결정 (Phase 3.5 B2 — Tier 4 신규 삽입, 재번호) ──
# 기존 6-tier (3.5b 4차 리뷰) + Tier 4(g_strong + v_strong_numeric + low_conf → refuse).
# 호환성: defense_layers["re_gate"] 의 string literal 들은 기존 그대로 유지.
# 신규 "refuse(grounding+verifier_numeric)" 만 추가.
completeness: Literal["full", "partial", "insufficient"] = "full"
covered_aspects = classifier_result.covered_aspects or None
missing_aspects = classifier_result.missing_aspects or None
@@ -591,6 +652,12 @@ async def ask(
v_strong = [f for f in grounding.strong_flags if f.startswith("verifier_")]
v_medium = [f for f in grounding.weak_flags if f.startswith("verifier_") and "_medium:" in f]
has_direct_negation = any("direct_negation" in f for f in v_strong)
# Phase 3.5 B2: verifier strong flags 중 numeric_conflict 만 카운트.
# promote(VERIFIER_NUMERIC_PROMOTE=1) 활성 시 critical numeric_conflict 가 strong 으로 승격되며
# 여기 카운트에 잡힘. promote off 면 항상 0 → Tier 4 활성 안 됨 (기존 동작 유지).
v_strong_numeric = sum(
1 for f in v_strong if f.startswith("verifier_numeric_conflict")
)
if len(g_strong) >= 2:
# Tier 1: grounding strong 2+ → refuse
@@ -613,13 +680,21 @@ async def ask(
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = "refuse(grounding+low_conf+weak_ev)"
elif g_strong and v_strong_numeric >= 1 and sr.confidence == "low":
# Tier 4 (B2 신규): grounding strong + verifier numeric_conflict strong + low conf → refuse.
# verifier strong 단독 refuse 금지 원칙 유지 — g_strong 교차 필수.
completeness = "insufficient"
sr.answer = None
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = "refuse(grounding+verifier_numeric)"
elif g_strong or has_direct_negation:
# Tier 4: grounding strong 1 또는 verifier direct_negation 단독 → partial
# Tier 5 (기존 4): grounding strong 1 또는 verifier direct_negation 단독 → partial
completeness = "partial"
sr.confidence = "low"
defense_log["re_gate"] = "partial(strong_or_negation)"
elif v_medium:
# Tier 5: verifier medium 누적 → count 기반 confidence 하향
# Tier 6 (기존 5): verifier medium 누적 → count 기반 confidence 하향
medium_count = len(v_medium)
if medium_count >= 3:
sr.confidence = "low"
@@ -630,7 +705,7 @@ async def ask(
else:
defense_log["re_gate"] = f"medium_x{medium_count}(no_action)"
elif grounding.weak_flags:
# Tier 6: weak → confidence 한 단계 하향
# Tier 7 (기존 6): weak → confidence 한 단계 하향
if sr.confidence == "high":
sr.confidence = "medium"
defense_log["re_gate"] = "conf_lower(weak)"
@@ -697,6 +772,9 @@ async def ask(
missing_aspects=missing_aspects,
model_name=resolve_primary_model(),
prompt_version=ASK_PROMPT_VERSION,
# Phase 3.5 calibration
source=source,
eval_case_id=eval_case_id,
)
debug_obj = None
+6
View File
@@ -45,6 +45,10 @@ class Settings(BaseModel):
jwt_secret: str = ""
totp_secret: str = ""
# Phase 3.5: eval runner shared secret — X-Source=eval / X-Eval-Case-Id 헤더 신뢰 검증.
# 비어있으면 모든 eval 헤더 거부 (부재 = 비활성).
eval_runner_token: str = ""
# kordoc
kordoc_endpoint: str = "http://kordoc-service:3100"
@@ -62,6 +66,7 @@ def load_settings() -> Settings:
database_url = os.getenv("DATABASE_URL", "")
jwt_secret = os.getenv("JWT_SECRET", "")
totp_secret = os.getenv("TOTP_SECRET", "")
eval_runner_token = os.getenv("EVAL_RUNNER_TOKEN", "")
kordoc_endpoint = os.getenv("KORDOC_ENDPOINT", "http://kordoc-service:3100")
ocr_endpoint = os.getenv("OCR_ENDPOINT", "http://ocr-service:3200")
@@ -113,6 +118,7 @@ def load_settings() -> Settings:
nas_pkm_root=nas_pkm,
jwt_secret=jwt_secret,
totp_secret=totp_secret,
eval_runner_token=eval_runner_token,
kordoc_endpoint=kordoc_endpoint,
ocr_endpoint=ocr_endpoint,
taxonomy=taxonomy,
+4
View File
@@ -39,6 +39,10 @@ class AskEvent(Base):
missing_aspects: Mapped[list[Any] | None] = mapped_column(JSONB)
model_name: Mapped[str | None] = mapped_column(Text)
prompt_version: Mapped[str | None] = mapped_column(Text)
# Phase 3.5 calibration: eval/production 분리 + golden join 키
# 138~141 단계: nullable. 142 적용 후 source 는 NOT NULL (DB 강제, 앱은 항상 채움).
source: Mapped[str | None] = mapped_column(Text)
eval_case_id: Mapped[str | None] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
+4 -3
View File
@@ -2,7 +2,7 @@ You are a grounding verifier. Given an answer and its evidence sources, check if
## Contradiction Types (IMPORTANT — severity depends on type)
- **direct_negation** (CRITICAL): Answer directly contradicts evidence. Examples: evidence "의무" but answer "권고"; evidence "금지" but answer "허용"; negation reversal ("~해야 한다" vs "~할 필요 없다").
- **numeric_conflict**: Answer states a number different from evidence. "50명" in evidence but "100명" in answer. Only flag if the same concept is referenced.
- **numeric_conflict**: Answer states a number different from evidence. "50명" in evidence but "100명" in answer. Only flag if the same concept is referenced. severity=critical when the number is the CORE answered quantity (amount/count/rate/date/duration that the query asked for); severity=minor when the number is peripheral (e.g., example/footnote).
- **intent_core_mismatch**: Answer addresses a fundamentally different topic than the query asked about.
- **nuance**: Answer overgeneralizes or adds qualifiers not in evidence (e.g., "모든" when evidence says "일부").
- **unsupported_claim**: Answer makes a factual claim with no basis in any evidence.
@@ -10,7 +10,7 @@ You are a grounding verifier. Given an answer and its evidence sources, check if
## Rules
1. Compare each claim in the answer against the cited evidence. A claim with [n] citation should be checked against evidence [n].
2. NOT a contradiction: Paraphrasing, summarizing, or restating the same fact in different words. Korean formal/informal style (합니다/한다) differences.
3. Numbers must match exactly after normalization (1,000 = 1000).
3. Numbers must match exactly after normalization (1,000 = 1000). Range values (e.g., "100~200명") satisfy any answer within range.
4. Legal/regulatory terms must preserve original meaning (의무 ≠ 권고, 금지 ≠ 제한, 허용 ≠ 금지).
5. Maximum 5 contradictions (most severe first: direct_negation > numeric_conflict > intent_core_mismatch > nuance > unsupported_claim).
@@ -30,7 +30,8 @@ You are a grounding verifier. Given an answer and its evidence sources, check if
severity mapping:
- direct_negation → "critical"
- All others → "minor"
- numeric_conflict → "critical" if the number is the CORE answered quantity, else "minor"
- All other types → "minor"
If no contradictions: {"contradictions": [], "verdict": "clean"}
+310 -24
View File
@@ -42,36 +42,267 @@ class GroundingResult:
weak_flags: list[str]
_UNIT_CHARS = r'명인개%년월일조항호세건원'
_UNIT_CHARS = r'명인개%년월일조항호세건원'
# "이상/이하/초과/미만" — threshold 표현 (numeric conflict 에서 skip 대상)
_THRESHOLD_SUFFIXES = re.compile(r'이상|이하|초과|미만')
# 약칭/근사치 prefix — 매칭 전 제거 (Phase 3.5 B1).
# ⚠ 최대/최소 는 의도적으로 제외 — 이들은 bound operator 라 의미가 다름 (Phase 3.5 B1 fix3).
# 약/대략/거의/얼추 만 노이즈 prefix 로 strip.
_APPROX_PREFIX_RE = re.compile(r'(약|대략|거의|얼추)\s*')
# 단위 동의어 dict — 추출 직후 정규화 (Phase 3.5 B1)
# 의미가 동일한 단위는 같은 표기로 통일해서 set 비교/range overlap 안정화.
_UNIT_SYNONYMS: dict[str, str] = {
"": "",
"사람": "",
"퍼센트": "%",
"프로": "%",
"KRW": "",
"krw": "",
}
# Units that qualify for the ±1% tolerance check: quantitative measurements (Phase 3.5 B1).
# NOTE(review): four of the five members below are empty strings, which collapse
# into a single "" entry. The intended unit characters appear to be missing —
# restore them before relying on this set.
_TOLERANCE_UNITS: frozenset[str] = frozenset({"", "", "%", "", ""})
# tolerance 미적용 단위 — 식별자성 숫자 (연도/조문/횟수)
_EXACT_ONLY_UNITS: frozenset[str] = frozenset({"", "", "", "", "", "", ""})
# 최대/최소 prefix 패턴 — bound operator (Phase 3.5 B1 fix3).
# 매칭된 숫자는 exact pool 에서 제외하고 one-sided range 로 변환.
# 경계값 자체는 clear 대상 아님 (Codex 권장: "최대 100명" + answer "100명" → flag 유지).
_BOUND_PATTERN_RE = re.compile(
rf'(최대|최소)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)'
)
_RANGE_INF = 10**18 # one-sided range 상한 sentinel
def _normalize_unit(unit: str) -> str:
    """Return the canonical spelling for a unit synonym; other units pass through unchanged."""
    try:
        return _UNIT_SYNONYMS[unit]
    except KeyError:
        return unit
def _extract_unit(literal: str) -> str | None:
    """Return the normalized unit that follows the leading number in a literal.

    The literal must start with digits (commas and dots allowed), optionally
    followed by whitespace, then either a single unit character or one of the
    multi-character synonyms. Returns None when no such prefix is present.
    """
    unit_pattern = rf'[\d,.]+\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)'
    found = re.match(unit_pattern, literal)
    if found is None:
        return None
    return _normalize_unit(found.group(1))
def _extract_numeric_corpus(text: str) -> dict:
    """Extract per-unit numbers, ranges and bounds from text (Phase 3.5 B1 fix1+fix3).

    Approximation prefixes are stripped first; every later pass runs on the
    stripped text. Numbers are stored as digit strings with commas removed and
    anything after the first '.' dropped.

    Returns:
        {
            "exact_by_unit": {unit_or_None: set(digits)},  # plain numbers (bounds excluded)
            "ranges_by_unit": {unit: [(lo, hi), ...]},     # two-sided (A~B) and one-sided (최대/최소)
        }

    The None key holds bare numbers that carry no unit.
    `최대 N <unit>` -> ranges[(0, N-1)]   (the boundary value itself is not cleared)
    `최소 N <unit>` -> ranges[(N+1, INF)]
    """
    cleaned = _APPROX_PREFIX_RE.sub('', text)
    exact_by_unit: dict[str | None, set[str]] = {None: set()}
    ranges_by_unit: dict[str, list[tuple[int, int]]] = {}

    # 1) 최대/최소 bounds: kept out of the exact pool and stored as one-sided ranges.
    bound_spans: list[tuple[int, int]] = []  # match positions, skipped by the later passes
    for m in _BOUND_PATTERN_RE.finditer(cleaned):
        bound_kind = m.group(1)
        try:
            n = int(m.group(2).replace(',', '').split('.')[0])
        except ValueError:
            continue
        unit = _normalize_unit(m.group(3))
        if bound_kind == "최대":
            # Upper bound: N itself is excluded; max() keeps the range non-negative.
            ranges_by_unit.setdefault(unit, []).append((0, max(0, n - 1)))
        else:  # 최소 (lower bound): N itself is excluded
            ranges_by_unit.setdefault(unit, []).append((n + 1, _RANGE_INF))
        bound_spans.append((m.start(), m.end()))

    def _in_bound_span(pos: int) -> bool:
        # True when pos falls inside any span already consumed as a bound.
        return any(s <= pos < e for s, e in bound_spans)

    # 2) Bare numbers written with thousands separators.
    for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned):
        if _in_bound_span(m.start()):
            continue
        exact_by_unit[None].add(m.group().replace(',', ''))

    # 3) Numbers followed by a unit (unit synonyms included).
    for m in re.finditer(
        rf'(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)',
        cleaned,
    ):
        if _in_bound_span(m.start()):
            continue
        digits = m.group(1).replace(',', '').split('.')[0]
        if not digits:
            continue
        unit = _normalize_unit(m.group(2))
        exact_by_unit.setdefault(unit, set()).add(digits)

    # 4) Two-sided ranges (A~B / A-B / A 부터 B).
    # NOTE(review): unlike passes 1 and 3, this pattern does not accept the
    # KRW/krw unit alternatives — confirm whether that is intentional.
    for m in re.finditer(
        rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)',
        cleaned,
    ):
        if _in_bound_span(m.start()):
            continue
        try:
            lo = int(m.group(1).replace(',', '').split('.')[0])
            hi = int(m.group(2).replace(',', '').split('.')[0])
        except ValueError:
            continue
        unit = _normalize_unit(m.group(3))
        # Endpoints are stored in ascending order regardless of how they were written.
        ranges_by_unit.setdefault(unit, []).append((min(lo, hi), max(lo, hi)))

    # 5) Stand-alone bare numbers of two or more digits.
    for m in re.finditer(r'\b(\d{2,})\b', cleaned):
        if _in_bound_span(m.start()):
            continue
        exact_by_unit[None].add(m.group())

    return {
        "exact_by_unit": exact_by_unit,
        "ranges_by_unit": ranges_by_unit,
    }
def _within_unit_range(
n: int, unit: str | None, ranges_by_unit: dict[str, list[tuple[int, int]]]
) -> bool:
"""unit-matching range 검증.
answer unit 이 None (bare 숫자) 면 보수적으로 False — bare 답변은 range clear 대상 아님.
"""
if unit is None:
return False
return any(lo <= n <= hi for lo, hi in ranges_by_unit.get(unit, []))
def _close_to_unit_pool(
n: int, unit: str | None, exact_by_unit: dict[str | None, set[str]], tol: float
) -> bool:
"""unit-matching tolerance 검증.
answer unit 이 None 이면 False — bare 답변은 tolerance 대상 아님.
같은 unit bucket 안의 후보만 비교.
"""
if unit is None:
return False
candidates = exact_by_unit.get(unit, set())
for c in candidates:
try:
cn = int(c)
except ValueError:
continue
if cn == 0:
continue
if abs(n - cn) / cn <= tol:
return True
return False
def _extract_number_literals(text: str) -> set[str]:
"""숫자 + 단위 추출 + normalize (Phase 3.5b 개선)."""
# 1. 숫자 + 한국어 단위 접미사
raw = set(re.findall(rf'\d[\d,.]*\s*[{_UNIT_CHARS}]\w{{0,2}}', text))
# 2. 범위 표현 (10~20%, 100-200명 등) — 양쪽 숫자 각각 추출
"""숫자 + 단위 추출 + normalize (Phase 3.5 B1: 6단계 확장).
1) 약칭 prefix 제거 ("약 100명""100명")
2) 천단위 콤마 bare number 우선 ("1,000""1000" set 등록)
3) 한국어 단위 접미사 매칭 (기존)
4) 범위 표현 양쪽 숫자 추출 (separator: ~, -, , 부터)
5) 단위 동의어 정규화 (인→명, 퍼센트→%, KRW→원)
6) bare 2자리+ 추출 (기존)
"""
# 1. 약칭 prefix 제거 (전체 텍스트에서)
cleaned = _APPROX_PREFIX_RE.sub('', text)
# 2. 천단위 콤마 bare number — normalize 된 값을 set 에 선등록
normalized: set[str] = set()
for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned):
normalized.add(m.group().replace(',', ''))
# 3. 숫자 + 한국어 단위 접미사 (동의어 포함)
raw: set[str] = set(re.findall(
rf'\d[\d,.]*\s*(?:[{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)\w{{0,2}}',
cleaned,
))
# 4. 범위 표현 — separator 에 "부터" 추가
for m in re.finditer(
rf'(\d[\d,.]*)\s*[~\-]\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}])',
text,
rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)',
cleaned,
):
raw.add(m.group(1) + m.group(3))
raw.add(m.group(2) + m.group(3))
# 3. normalize
normalized = set()
unit_norm = _normalize_unit(m.group(3))
raw.add(m.group(1) + unit_norm)
raw.add(m.group(2) + unit_norm)
# 5. normalize: 단위 동의어 통일 + 콤마 제거
for r in raw:
# 단위 부분 정규화
m = re.match(r'([\d,.]+)\s*([^\d\s]+)', r)
if m:
digits_part = m.group(1)
unit_part = _normalize_unit(m.group(2))
normalized.add(digits_part + unit_part)
normalized.add(digits_part.replace(',', '') + unit_part)
normalized.add(r.strip())
num_only = re.match(r'[\d,.]+', r)
if num_only:
normalized.add(num_only.group().replace(',', ''))
# 4. 단독 숫자 (2자리 이상만 — 1자리는 오탐 과다)
for d in re.findall(r'\b(\d{2,})\b', text):
# 6. 단독 숫자 (2자리+ 만)
for d in re.findall(r'\b(\d{2,})\b', cleaned):
normalized.add(d)
return normalized
def _within_evidence_range(digits: str, raw: str, evidence_text: str) -> bool:
    """Report whether the answer number falls inside an 'A~B <unit>' range in the evidence.

    The unit of the range is not compared here. `digits` must be an integer
    string; anything else yields False. `raw` is accepted but not used.
    """
    try:
        target = int(digits)
    except ValueError:
        return False

    range_pattern = rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*[{_UNIT_CHARS}]'
    haystack = _APPROX_PREFIX_RE.sub('', evidence_text)
    for hit in re.finditer(range_pattern, haystack):
        try:
            # Both endpoints: commas removed, fractional part dropped.
            ends = [int(hit.group(i).replace(',', '').split('.')[0]) for i in (1, 2)]
        except ValueError:
            continue
        if min(ends) <= target <= max(ends):
            return True
    return False
def _close_to_any(n: int, candidates: set[str], tol: float) -> bool:
"""candidates 중 하나라도 (1±tol) 배율 안에 들어오면 True.
n 은 정수, candidates 는 digits-only 문자열 집합.
"""
for c in candidates:
try:
cn = int(c)
except ValueError:
continue
if cn == 0:
continue
if abs(n - cn) / cn <= tol:
return True
return False
def _extract_content_tokens(text: str) -> set[str]:
"""한국어 2자 이상 명사 + 영어 3자 이상 단어."""
return set(re.findall(r'[가-힣]{2,}|[a-zA-Z]{3,}', text))
@@ -156,19 +387,74 @@ def check(
if not answer or not evidence:
return GroundingResult([], [])
evidence_text = " ".join(e.span_text for e in evidence)
# ⚠ citation marker [n] 양측 제거 (대칭성 — Phase 3.5 B1)
evidence_text = re.sub(r'\[\d+\]', '', " ".join(e.span_text for e in evidence))
# ── Strong 1: fabricated number (equality, not substring) ──
# ⚠ citation marker [n] 제거 후 숫자 추출 (안 그러면 [1][2][3] 이 fabricated 로 오탐)
# ── Strong 1: fabricated number (unit-aware 3단계 — Phase 3.5 B1 fix1+fix3) ──
# Codex 지적 반영:
# - fix1: range/tolerance/exact 모두 단위 일치 시에만 clear
# (예: "150원" vs "100~200명" → flag 유지)
# - fix3: 최대/최소 prefix 는 bound 의미 보존
# (예: "최대 100명" + answer "100명" → flag 유지, "최대 100명" + answer "50명" → cleared)
answer_clean = re.sub(r'\[\d+\]', '', answer)
answer_nums = _extract_number_literals(answer_clean)
evidence_nums = _extract_number_literals(evidence_text)
evidence_digits = {re.sub(r'[^\d]', '', en) for en in evidence_nums}
evidence_digits.discard('')
for num in answer_nums:
digits_only = re.sub(r'[^\d]', '', num)
if digits_only and digits_only not in evidence_digits:
strong.append(f"fabricated_number:{num}")
answer_corpus = _extract_numeric_corpus(answer_clean)
evidence_corpus = _extract_numeric_corpus(evidence_text)
ev_exact_by_unit = evidence_corpus["exact_by_unit"]
ev_ranges_by_unit = evidence_corpus["ranges_by_unit"]
# cleared 는 (unit, digits) 쌍 단위로 추적 — 단위 충돌 케이스 방어
cleared_pairs: set[tuple[str | None, str]] = set()
# Pass 1: 각 (unit, digits) 가 evidence 에서 정당화되는지 판정
for unit, digits_set in answer_corpus["exact_by_unit"].items():
for d in digits_set:
# 1) exact match — 같은 unit bucket 내에서만
if d in ev_exact_by_unit.get(unit, set()):
cleared_pairs.add((unit, d))
continue
# bare answer (unit=None) 는 evidence bare bucket 도 보조 매칭
if unit is None and d in ev_exact_by_unit.get(None, set()):
cleared_pairs.add((unit, d))
continue
try:
n = int(d)
except ValueError:
continue
# 2) range — same-unit 만 (bare answer 는 range clear 대상 아님)
if _within_unit_range(n, unit, ev_ranges_by_unit):
cleared_pairs.add((unit, d))
continue
# 3) ±1% tolerance — 단위가 양적(_TOLERANCE_UNITS) + 4자리+ + same-unit
if (
unit in _TOLERANCE_UNITS
and len(d) >= 4
and _close_to_unit_pool(n, unit, ev_exact_by_unit, tol=0.01)
):
cleared_pairs.add((unit, d))
continue
# 식별자성 단위(_EXACT_ONLY_UNITS) 는 tolerance 패스 X.
# Pass 2: cleared 되지 않은 (unit, digits) 를 strong flag.
# 1자리 무시는 unit 이 식별자성(_EXACT_ONLY_UNITS: 년/월/일/조/항/호/회) 이 아닐 때만 적용.
# bare(None) 답변 숫자는 같은 digit 이 다른 unit 에서 cleared 됐으면 skip — 추출 부산물 방어.
# ⚠ 단위 cross-clear (예: "원" cleared → "명" 도 skip) 은 금지: Codex unit-mismatch 케이스가 깨짐.
unit_anchored_cleared: set[str] = {d for (u, d) in cleared_pairs if u is not None}
flagged_keys: set[tuple[str | None, str]] = set()
for unit, digits_set in answer_corpus["exact_by_unit"].items():
for d in digits_set:
if (unit, d) in cleared_pairs or (unit, d) in flagged_keys:
continue
# bare(None) 답변 숫자가 임의의 단위 bucket 에서 cleared 됐으면 duplicate 로 처리.
# 사례: "1,000명" → unit bucket "명" 에 1000 + bare bucket None 에 1000 (comma normalize 부산물).
# 이미 ("명", "1000") 가 cleared 라면 (None, "1000") 도 같은 사실을 가리키므로 skip.
if unit is None and d in unit_anchored_cleared:
continue
if len(d) < 2 and unit not in _EXACT_ONLY_UNITS:
continue
flagged_keys.add((unit, d))
# 사람이 읽기 좋게 "{digits}{unit}" 또는 bare 형태로 표기
label = f"{d}{unit}" if unit else d
strong.append(f"fabricated_number:{label}")
# ── Strong/Weak 2: query-answer intent alignment ──
query_content = _extract_content_tokens(query)
+12 -1
View File
@@ -17,6 +17,7 @@
from __future__ import annotations
import asyncio
import os
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
@@ -37,10 +38,20 @@ CIRCUIT_RECOVERY_SEC = 60
_failure_count = 0
_circuit_open_until: float | None = None
# Phase 3.5 B2: numeric_conflict severity promote 실험.
# import time 평가 — env 변경 후 process restart 필수 (docker compose restart fastapi).
# default=0 (off). production 적용은 B3 FP 검증 통과 후만.
_NUMERIC_PROMOTE = os.getenv("VERIFIER_NUMERIC_PROMOTE", "0") == "1"
# severity 매핑 (프롬프트 "critical"/"minor" → 코드 strong/medium/weak)
# Tier 4 (B2): _NUMERIC_PROMOTE=1 일 때 numeric_conflict critical → strong 으로 격상.
# minor 는 medium 유지 (FP 위험 분리).
_SEVERITY_MAP: dict[str, dict[str, Literal["strong", "medium", "weak"]]] = {
"direct_negation": {"critical": "strong", "minor": "strong"},
"numeric_conflict": {"critical": "medium", "minor": "medium"},
"numeric_conflict": (
{"critical": "strong", "minor": "medium"} if _NUMERIC_PROMOTE
else {"critical": "medium", "minor": "medium"}
),
"intent_core_mismatch": {"critical": "medium", "minor": "medium"},
"nuance": {"critical": "weak", "minor": "weak"},
"unsupported_claim": {"critical": "weak", "minor": "weak"},
+9
View File
@@ -333,6 +333,9 @@ async def record_ask_event(
missing_aspects: list[str] | None = None,
model_name: str | None = None,
prompt_version: str | None = None,
# Phase 3.5 calibration: source 분리 + golden join
source: str | None = None,
eval_case_id: str | None = None,
) -> None:
"""ask_events INSERT. background task에서 호출 — 에러 삼킴.
@@ -341,6 +344,10 @@ async def record_ask_event(
- covered_aspects / missing_aspects: classifier 결과 그대로
- model_name: resolve_primary_model() 또는 호출사이트 명시
- prompt_version: ASK_PROMPT_VERSION 상수
Phase 3.5 calibration:
- source: sanitize_source(X-Source 헤더) — eval/ui_search/ui_detail/...
- eval_case_id: X-Eval-Case-Id 헤더 (eval 호출만 채움)
"""
try:
async with async_session() as session:
@@ -364,6 +371,8 @@ async def record_ask_event(
missing_aspects=missing_aspects,
model_name=model_name,
prompt_version=prompt_version,
source=source,
eval_case_id=eval_case_id,
)
session.add(row)
await session.commit()
+13
View File
@@ -50,3 +50,16 @@ NYT_API_KEY=
# ─── 국가법령정보센터 (법령 모니터링) ───
LAW_OC=
# ─── Phase 3.5 B2: verifier numeric_conflict promote 실험 ───
# 0=off (기본, critical/minor 둘 다 medium), 1=on (critical → strong, minor 는 medium 유지).
# ⚠ env 변경 후 process restart 필수 (docker compose restart fastapi) — _SEVERITY_MAP 가 import time 평가됨.
# B3 FP 검증 (true FP < 20%) 통과 후만 production 적용.
VERIFIER_NUMERIC_PROMOTE=0
# ─── Phase 3.5 fix2: eval runner shared secret ───
# /ask 엔드포인트의 X-Source=eval / X-Eval-Case-Id 헤더 신뢰 검증 토큰.
# 비어있거나 클라이언트 X-Eval-Token 와 불일치 시 eval 헤더 거부 (warning log + source='document_server' 강등).
# 충분히 긴 random secret 권장 (예: openssl rand -hex 32).
# scripts/run_eval_ask.py runner 가 동일 값을 X-Eval-Token 헤더로 전송해야 eval telemetry 적재됨.
EVAL_RUNNER_TOKEN=
@@ -0,0 +1 @@
ALTER TABLE ask_events ADD COLUMN IF NOT EXISTS source TEXT DEFAULT 'document_server', ADD COLUMN IF NOT EXISTS eval_case_id TEXT
@@ -0,0 +1 @@
CREATE INDEX IF NOT EXISTS idx_ask_events_source_created ON ask_events(source, created_at DESC)
@@ -0,0 +1 @@
CREATE INDEX IF NOT EXISTS idx_ask_events_eval_case_id ON ask_events(eval_case_id) WHERE eval_case_id IS NOT NULL
@@ -0,0 +1 @@
UPDATE ask_events SET source = 'document_server' WHERE source IS NULL
@@ -0,0 +1 @@
ALTER TABLE ask_events ALTER COLUMN source SET NOT NULL
+18
View File
@@ -0,0 +1,18 @@
# Deferred migrations
이 디렉토리의 `*.sql` 파일은 `app/core/database.py:_parse_migration_files()` 의
`migrations_dir.glob("*.sql")` (non-recursive) 에 잡히지 않으므로 자동 적용되지 않는다.
활성화 절차: `git mv migrations/_deferred/<file>.sql migrations/<file>.sql` 후 deploy.
## 142_ask_events_source_notnull.sql
`source` 컬럼에 NOT NULL 제약 추가. **1주 운영 관찰 후 적용 권장**:
조건:
- 138~141 적용 후 7일 운영
- `SELECT COUNT(*) FROM ask_events WHERE source IS NULL AND created_at > <deploy_time>;`
결과 0 확인 — 즉, 새 코드가 모든 INSERT 에 source 를 항상 채우는지 empirical 검증
- 위 0 이면 142 활성화 → docker compose restart fastapi (init_db 가 자동 적용)
이유: NOT NULL 적용 후 NULL INSERT 시도 시 ask_events 기록 실패 (data loss).
+745
View File
@@ -0,0 +1,745 @@
"""Phase 3.5 calibration CLI — ask_events 집계 + markdown report 생성.
사용법:
# Docker 컨테이너 내부 (권장 — DATABASE_URL 자동 주입)
docker compose exec fastapi python /app/scripts/calibrate_ask.py \\
--source eval --prompt-version search_synthesis.v1-400char \\
--run-label baseline_v1 --output reports/calibration_baseline_v1.md
# 로컬 (DATABASE_URL 환경변수 필요)
python scripts/calibrate_ask.py --inspect-shape
옵션:
--source eval / ui_search / ui_detail / document_server / ... (미지정=전체)
--prompt-version search_synthesis.v1-400char
--since / --until ISO8601, created_at 범위
--eval-split tuning(200) / confirm(100) / all (id 해시 기반 deterministic)
--run-label report 제목/파일명 라벨
--output .md 경로 (기본 reports/calibration.md). --format json 이면 .json 생성
--format md (사람용) | json (compare baseline)
--compare-against 비교 대상 .json baseline 경로 (Δ 컬럼 출력)
--sample-limit FP candidate CSV 행수 (기본 30, 케이스별 분배)
--fp-artifacts FP CSV 경로 (기본 artifacts/fp_candidates_{run_label}.csv)
--inspect-shape defense_layers JSON sample 5건 출력 후 abort (Q0)
--threshold-overrides config/threshold_candidate.yaml — Step 0 feasibility 미해결, v2 미구현
--dry-run DB 미접속, tests/calibrate_fixtures/sample_ask_events.json 로드
읽기 전용 — INSERT/UPDATE/DELETE/ALTER 0건. SELECT 만 수행.
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import hashlib
import json
import os
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
# 프로젝트 루트의 app/ 디렉토리를 경로에 추가 (seed_admin.py 패턴)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
# ─── 경로 / 기본값 ─────────────────────────────────────────
PROJECT_ROOT = Path(__file__).resolve().parent.parent
EVAL_GOLDEN_PATH = PROJECT_ROOT / "evals" / "ask_analyze_v1.jsonl"
DEFAULT_REPORT = PROJECT_ROOT / "reports" / "calibration.md"
ARTIFACTS_DIR = PROJECT_ROOT / "artifacts"
DRY_RUN_FIXTURE = PROJECT_ROOT / "tests" / "calibrate_fixtures" / "sample_ask_events.json"
# eval split 비율 (id 해시 기반 deterministic)
TUNING_RATIO = 0.667 # 200 / 300
# ─── argparse ────────────────────────────────────────────
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the calibration CLI options.

    Args:
        argv: Argument list to parse. None (the default) makes argparse read
            ``sys.argv[1:]``, which matches the previous no-argument call.

    Returns:
        The parsed namespace. ``run_label`` is always set: when it is not
        given, a ``calibration_<YYYYmmdd_HHMMSS>`` label is generated from the
        current local time.

    Raises:
        SystemExit: When ``--threshold-overrides`` is supplied (the option is
            not implemented), or on the usual argparse usage errors.
    """
    p = argparse.ArgumentParser(description="Phase 3.5 ask_events calibration report")
    p.add_argument("--source", default=None,
                   help="ask_events.source 필터 (eval / ui_search / ui_detail / 미지정=전체)")
    p.add_argument("--prompt-version", default=None,
                   help="ask_events.prompt_version 필터 (예: search_synthesis.v1-400char)")
    p.add_argument("--since", default=None, help="ISO8601, created_at >= since")
    p.add_argument("--until", default=None, help="ISO8601, created_at < until")
    p.add_argument("--eval-split", choices=["tuning", "confirm", "all"], default="all",
                   help="source='eval' 일 때 holdout split")
    p.add_argument("--run-label", default=None, help="report 제목/파일명 라벨")
    p.add_argument("--output", default=str(DEFAULT_REPORT), help="md 출력 경로")
    p.add_argument("--format", choices=["md", "json"], default="md",
                   help="md 만 생성 또는 md+json 둘 다 (--format json 시)")
    p.add_argument("--compare-against", default=None, help="비교 대상 .json baseline 경로")
    p.add_argument("--sample-limit", type=int, default=30, help="FP candidate CSV 총 행수")
    p.add_argument("--fp-artifacts", default=None, help="FP CSV 경로")
    p.add_argument("--inspect-shape", action="store_true",
                   help="defense_layers JSON sample 5건 출력 후 abort")
    p.add_argument("--threshold-overrides", default=None,
                   help="config/threshold_candidate.yaml — Step 0 feasibility 미해결로 v2 미구현")
    p.add_argument("--dry-run", action="store_true",
                   help="DB 미접속, fixtures 로 출력 검증")
    args = p.parse_args(argv)

    # The option is accepted by the parser only so that it can fail loudly
    # with an explanation instead of being silently ignored.
    if args.threshold_overrides:
        raise SystemExit(
            "--threshold-overrides 는 v2 미구현. Step 0 feasibility 통과 후 SQL "
            "reclassification 추가 예정. 1차는 baseline/candidate 를 코드 분기 run "
            "(코드 일시 수정 → eval replay 2회) 으로 측정."
        )

    if not args.run_label:
        args.run_label = f"calibration_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    return args
# ─── 공통 WHERE 조립 ──────────────────────────────────────
def build_filters(args: argparse.Namespace) -> tuple[str, dict[str, Any]]:
    """Build the shared WHERE clause and its bind parameters.

    Four optional filters are supported: source, prompt_version, since and
    until. Each clause has the form ``(:p IS NULL OR <condition on :p>)`` so
    that a None parameter disables that filter without changing the SQL text.

    Args:
        args: Parsed CLI namespace providing ``source``, ``prompt_version``,
            ``since`` and ``until`` (each may be None).

    Returns:
        ``(where_sql, params)`` — the clauses joined with ``AND`` and the
        bind-parameter dict to pass alongside the statement.
    """
    clauses = [
        "(:source IS NULL OR source = :source)",
        "(:prompt_version IS NULL OR prompt_version = :prompt_version)",
        # CAST(...) instead of the ':name::type' shorthand: a '::' directly
        # after a bind name conflicts with SQLAlchemy text() bind-parameter
        # parsing. The two forms are equivalent in PostgreSQL.
        "(CAST(:since AS timestamptz) IS NULL OR created_at >= CAST(:since AS timestamptz))",
        "(CAST(:until AS timestamptz) IS NULL OR created_at < CAST(:until AS timestamptz))",
    ]
    params: dict[str, Any] = {
        "source": args.source,
        "prompt_version": args.prompt_version,
        "since": args.since,
        "until": args.until,
    }
    return " AND ".join(clauses), params
# ─── eval split (id 해시) ────────────────────────────────
def split_by_id_hash(case_id: str, ratio: float = TUNING_RATIO) -> str:
    """Deterministically assign an eval case to the 'tuning' or 'confirm' split.

    The first 32 bits of sha256(case_id) are scaled into the half-open
    interval [0, 1). A bucket below `ratio` is 'tuning'; otherwise 'confirm'.

    Args:
        case_id: Eval case identifier; hashed as UTF-8.
        ratio: Fraction of the hash space assigned to 'tuning'.

    Returns:
        ``"tuning"`` or ``"confirm"``.
    """
    digest = hashlib.sha256(case_id.encode()).digest()
    # Divide by 2**32, not 0xFFFFFFFF: the largest 32-bit prefix must map
    # strictly below 1.0 so that ratio=1.0 sends every case to 'tuning'.
    bucket = int.from_bytes(digest[:4], "big") / 2**32
    return "tuning" if bucket < ratio else "confirm"
def load_eval_golden(path: Path) -> dict[str, dict[str, Any]]:
    """Load a JSONL golden set into an ``{id: case_dict}`` mapping.

    Blank lines, lines that are not valid JSON, lines whose JSON value is not
    an object, and objects without a truthy ``"id"`` are skipped. When an id
    repeats, the later line wins.

    Args:
        path: Location of the JSONL file.

    Returns:
        Mapping from case id to the parsed case object; empty when the file
        does not exist.
    """
    if not path.exists():
        return {}
    cases: dict[str, dict[str, Any]] = {}
    with path.open("r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line can be valid JSON without being an object (e.g. a list or
            # a number); such values have no .get() and are skipped.
            if not isinstance(obj, dict):
                continue
            cid = obj.get("id")
            if cid:
                cases[cid] = obj
    return cases
def filter_eval_split(cases: dict[str, dict], split: str) -> set[str]:
    """Return the case ids that belong to `split`; 'all' returns every id."""
    case_ids = set(cases)
    if split == "all":
        return case_ids
    return {case_id for case_id in case_ids if split_by_id_hash(case_id) == split}
# ─── DB fetchers (Q0~Q8) ─────────────────────────────────
async def fetch_shape_inspect(session: AsyncSession) -> list[dict]:
    """Q0: fetch the five most recent rows that have defense_layers, for eyeballing on stdout."""
    sample_query = text("""
SELECT id, defense_layers, created_at
FROM ask_events
WHERE defense_layers IS NOT NULL
ORDER BY created_at DESC
LIMIT 5
""")
    outcome = await session.execute(sample_query)
    return [dict(record) for record in outcome.mappings().all()]
async def fetch_total_rows(session: AsyncSession, where: str, params: dict) -> int:
    """Count the ask_events rows matching the WHERE clause and its bind parameters."""
    # `where` is interpolated into the statement text; values travel through `params`.
    count_query = text(f"SELECT COUNT(*) AS n FROM ask_events WHERE {where}")
    outcome = await session.execute(count_query, params)
    return outcome.scalar_one()
async def fetch_regate_distribution(session, where, params) -> list[dict]:
    """Q1: distribution of defense_layers->>'re_gate' (tier, n, pct), most frequent first."""
    query = text(f"""
        SELECT
        COALESCE(defense_layers->>'re_gate', '(null)') AS tier,
        COUNT(*) AS n,
        ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
        FROM ask_events
        WHERE {where}
        GROUP BY 1
        ORDER BY n DESC
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_score_histogram(session, where, params) -> list[dict]:
    """Q2: max_rerank_score histogram per outcome bucket (bucket, bin, n, avg_score)."""
    query = text(f"""
        SELECT
        CASE WHEN refused THEN 'refused'
        WHEN completeness = 'full' THEN 'full'
        WHEN completeness = 'partial' THEN 'partial'
        ELSE 'insufficient' END AS bucket,
        WIDTH_BUCKET(COALESCE(max_rerank_score, 0.0), 0.0, 1.0, 10) AS bin,
        COUNT(*) AS n,
        ROUND(AVG(max_rerank_score)::numeric, 3) AS avg_score
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2
        ORDER BY 1, 2
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_classifier_confusion(session, where, params) -> list[dict]:
    """Q3: counts per classifier_verdict x completeness x refused, most frequent first."""
    query = text(f"""
        SELECT
        COALESCE(classifier_verdict, '(null)') AS verdict,
        COALESCE(completeness, '(null)') AS completeness,
        refused,
        COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3
        ORDER BY n DESC
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_verifier_distribution(session, where, params) -> list[dict]:
    """Q4: verifier severity distribution; counts are cast to int and NULLs coalesced."""
    query = text(f"""
        SELECT
        COALESCE(defense_layers->'verifier'->>'status', 'n/a') AS status,
        COALESCE((defense_layers->'verifier'->>'medium_count')::int, 0) AS medium_count,
        COALESCE((defense_layers->'verifier'->>'strong_count')::int, 0) AS strong_count,
        COALESCE(completeness, '(null)') AS completeness,
        COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3, 4
        ORDER BY 1, 2, 3, 4
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_flag_frequencies(session, where, params) -> list[dict]:
    """Q5: most frequent grounding flags (strong and weak combined via UNION ALL).

    Returns rows of {flag_type, strength, n}, ordered by n descending, limited to 40.
    The flag type is the part of each flag string before the first ':'.
    """
    query = text(f"""
        SELECT * FROM (
        SELECT split_part(flag, ':', 1) AS flag_type, 'strong' AS strength, COUNT(*) AS n
        FROM ask_events,
        jsonb_array_elements_text(defense_layers->'grounding'->'strong') AS flag
        WHERE {where}
        GROUP BY split_part(flag, ':', 1)
        UNION ALL
        SELECT split_part(flag, ':', 1) AS flag_type, 'weak' AS strength, COUNT(*) AS n
        FROM ask_events,
        jsonb_array_elements_text(defense_layers->'grounding'->'weak') AS flag
        WHERE {where}
        GROUP BY split_part(flag, ':', 1)
        ) u
        ORDER BY n DESC
        LIMIT 40
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_fabricated_strong_rate(session, where, params) -> dict[str, float]:
    """B1 check: rate of rows with a strong fabricated_number flag (a rate, not a raw count).

    rate = (rows with at least one strong 'fabricated_number:' flag) / (all matching rows),
    rounded to 4 decimals; 0.0 when no row matches.
    """
    query = text(f"""
        SELECT
        COUNT(*) AS total,
        SUM(CASE WHEN EXISTS (
        SELECT 1 FROM jsonb_array_elements_text(defense_layers->'grounding'->'strong') f
        WHERE f LIKE 'fabricated_number:%%'
        ) THEN 1 ELSE 0 END) AS hit
        FROM ask_events
        WHERE {where}
    """)
    result = await session.execute(query, params)
    counts = result.mappings().one()
    # SUM over zero rows is NULL, hence the `or 0` fallbacks.
    total = int(counts["total"] or 0)
    hit = int(counts["hit"] or 0)
    if total > 0:
        rate = hit / total
    else:
        rate = 0.0
    return {"total": total, "fabricated_strong_hit": hit, "rate": round(rate, 4)}
async def fetch_eval_join_with_split(
    session, where, params, eval_cases: dict[str, dict], split_filter: set[str] | None,
) -> dict[str, Any]:
    """Q6: join eval-sourced ask_events to golden cases by eval_case_id, falling back to query text.

    Only the newest row per COALESCE(eval_case_id, query) is considered.

    Returns a dict with:
        - mismatch_groups: [{expected, actual, n, sample_queries}], largest group first
        - eval_case_id_present: rows that carried an eval_case_id
        - eval_case_id_null: rows without one (query-text fallback attempted)
        - join_failed_count: rows that could not be resolved to a golden case
        - matched_total: rows resolved and kept after the split filter
    """
    import re as _re

    query = text(f"""
        WITH ranked AS (
        SELECT
        id, eval_case_id, query, completeness, refused,
        ROW_NUMBER() OVER (PARTITION BY COALESCE(eval_case_id, query)
        ORDER BY created_at DESC) AS rn
        FROM ask_events
        WHERE {where} AND source = 'eval'
        )
        SELECT id, eval_case_id, query, completeness, refused
        FROM ranked WHERE rn = 1
    """)
    result = await session.execute(query, params)
    rows = [dict(r) for r in result.mappings()]

    def norm(q: str | None) -> str:
        # Normalise a query string: collapse whitespace runs, trim, lowercase.
        if not q:
            return ""
        return _re.sub(r"\s+", " ", q).strip().lower()

    norm_to_id: dict[str, str] = {}
    for golden_id, golden in eval_cases.items():
        if golden.get("query"):
            norm_to_id[norm(golden.get("query"))] = golden_id

    present = 0
    missing = 0
    failed = 0
    # (cid, case, actual_completeness, actual_refused)
    matched_pairs: list[tuple[str, dict, str, bool]] = []
    for row in rows:
        cid = row.get("eval_case_id")
        if cid:
            present += 1
        else:
            missing += 1
            cid = norm_to_id.get(norm(row.get("query")))
        case = eval_cases.get(cid) if cid else None
        if not case:
            failed += 1
            continue
        if split_filter is not None and cid not in split_filter:
            continue
        was_refused = row.get("refused")
        observed = row.get("completeness") or ("refused" if was_refused else "(null)")
        matched_pairs.append((cid, case, observed, bool(was_refused)))

    # Group sample queries by (expected_behavior, normalised actual outcome).
    groups: dict[tuple[str, str], list[str]] = {}
    for _cid, case, actual, refused in matched_pairs:
        expected = case.get("expected_behavior", "(unknown)")
        # A refusal overrides whatever completeness was recorded.
        actual_norm = "refused" if refused else (actual or "(null)")
        groups.setdefault((expected, actual_norm), []).append(case.get("query", ""))

    ordered = sorted(groups.items(), key=lambda kv: len(kv[1]), reverse=True)
    mismatch_groups = [
        {
            "expected": exp,
            "actual": act,
            "n": len(queries),
            "sample_queries": queries[:3],
        }
        for (exp, act), queries in ordered
    ]
    return {
        "mismatch_groups": mismatch_groups,
        "eval_case_id_present": present,
        "eval_case_id_null": missing,
        "join_failed_count": failed,
        "matched_total": len(matched_pairs),
    }
async def fetch_fp_candidates(session, where, params, limit: int) -> list[dict]:
    """Q7: false-positive candidates from three cases (A/B/C) combined with UNION ALL.

    Every row carries a candidate_reason column. Each case receives limit // 3
    rows (at least 1) so the cases stay balanced.
    """
    per_case = max(1, limit // 3)
    query = text(f"""
        WITH base AS (
        SELECT
        id, query, completeness, refused, classifier_verdict,
        max_rerank_score, aggregate_score,
        defense_layers->'grounding'->'strong' AS g_strong,
        defense_layers->'verifier'->>'medium_count' AS v_medium,
        defense_layers->>'re_gate' AS re_gate,
        answer_length, prompt_version, source, eval_case_id, created_at
        FROM ask_events WHERE {where}
        ),
        case_a AS (
        SELECT *, 'refused_high_rerank' AS candidate_reason
        FROM base
        WHERE refused = true AND COALESCE(max_rerank_score, 0.0) >= 0.35
        ORDER BY created_at DESC LIMIT :per_case
        ),
        case_b AS (
        SELECT *, 'insufficient_classifier_sufficient' AS candidate_reason
        FROM base
        WHERE completeness = 'insufficient' AND classifier_verdict = 'sufficient'
        ORDER BY created_at DESC LIMIT :per_case
        ),
        case_c AS (
        SELECT *, 'partial_only_fabricated_number' AS candidate_reason
        FROM base
        WHERE completeness = 'partial'
        AND jsonb_array_length(COALESCE(g_strong, '[]'::jsonb)) = 1
        AND (g_strong->>0) LIKE 'fabricated_number:%%'
        ORDER BY created_at DESC LIMIT :per_case
        )
        SELECT * FROM case_a
        UNION ALL SELECT * FROM case_b
        UNION ALL SELECT * FROM case_c
    """)
    # Copy so the caller's filter params are not mutated.
    bound = dict(params)
    bound["per_case"] = per_case
    result = await session.execute(query, bound)
    return [dict(row) for row in result.mappings()]
async def fetch_answer_length_distribution(session, where, params) -> list[dict]:
    """Q8: answer_length p25/p50/p75, average and count per outcome bucket."""
    query = text(f"""
        SELECT
        CASE WHEN refused THEN 'refused' ELSE COALESCE(completeness, '(null)') END AS bucket,
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY answer_length) AS p25,
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY answer_length) AS p50,
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY answer_length) AS p75,
        AVG(answer_length)::int AS avg,
        COUNT(*) AS n
        FROM ask_events
        WHERE {where} AND answer_length IS NOT NULL
        GROUP BY 1
        ORDER BY 1
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
# ─── rendering ───────────────────────────────────────────
def _md_cell(value: Any) -> str:
    """Stringify one cell so it cannot break a table row: escape pipes, flatten newlines."""
    return str(value).replace("|", "\\|").replace("\r\n", " ").replace("\n", " ")


def _md_table(headers: list[str], rows: list[list[Any]]) -> str:
    """Render a Markdown pipe table.

    Args:
        headers: Column titles (emitted as-is).
        rows: One list of cell values per row; each value is passed through str().

    Returns:
        The table text ending in a newline, or the placeholder "_(empty)_\\n"
        when there are no rows.
    """
    if not rows:
        return "_(empty)_\n"
    lines = ["| " + " | ".join(headers) + " |",
             "|" + "|".join(["---"] * len(headers)) + "|"]
    # Cells may hold free text (user queries, joined samples); an unescaped
    # '|' or newline would split the row into extra columns or lines.
    lines.extend("| " + " | ".join(_md_cell(v) for v in row) + " |" for row in rows)
    return "\n".join(lines) + "\n"
def render_markdown(sections: dict[str, Any], args: argparse.Namespace,
                    delta: dict[str, Any] | None = None) -> str:
    """Render the calibration report as a single Markdown string.

    Args:
        sections: Query results keyed by section name. Reads 'total_rows',
            'regate', 'score_hist', 'classifier', 'verifier', 'flags',
            'fabricated_rate', 'fp_candidates' and 'answer_length'; the keys
            'shape_sample', 'eval' and 'fp_csv_path' are optional.
        args: Parsed CLI arguments; run_label and the filter options are
            echoed in the report header.
        delta: Optional comparison against a baseline; when truthy it is
            appended as a JSON code block (section 9).

    Returns:
        The concatenated Markdown text.
    """
    label = args.run_label
    out: list[str] = [f"# Calibration Report — {label}\n"]
    out.append(f"Filter: source={args.source} prompt_version={args.prompt_version} "
               f"since={args.since} until={args.until} eval_split={args.eval_split}\n")
    out.append(f"Total rows: **{sections['total_rows']}**\n")
    # 0. shape inspect (with --inspect-shape this output itself is the sample)
    if "shape_sample" in sections:
        out.append("## 0. defense_layers shape sample (latest 5)\n")
        for s in sections["shape_sample"]:
            out.append(f"- id={s['id']} created_at={s['created_at']}\n")
            out.append(" ```json\n")
            # Re-indent every JSON line so the block stays nested under the list item.
            out.append(" " + json.dumps(s["defense_layers"], ensure_ascii=False, indent=2).replace("\n", "\n ") + "\n")
            out.append(" ```\n")
    # 1. re-gate
    out.append("## 1. Re-gate tier 분포\n")
    out.append(_md_table(["tier", "n", "pct"],
                         [[r["tier"], r["n"], f"{r['pct']}%"] for r in sections["regate"]]))
    # 2. score histogram
    out.append("## 2. max_rerank_score 히스토그램 (bucket × bin 0~10)\n")
    out.append(_md_table(["bucket", "bin", "n", "avg_score"],
                         [[r["bucket"], r["bin"], r["n"], r["avg_score"]] for r in sections["score_hist"]]))
    # 3. classifier confusion
    out.append("## 3. Classifier 혼동행렬 (verdict × completeness × refused)\n")
    out.append(_md_table(["verdict", "completeness", "refused", "n"],
                         [[r["verdict"], r["completeness"], r["refused"], r["n"]] for r in sections["classifier"]]))
    # 4. verifier
    out.append("## 4. Verifier severity 분포\n")
    out.append(_md_table(["status", "medium_count", "strong_count", "completeness", "n"],
                         [[r["status"], r["medium_count"], r["strong_count"], r["completeness"], r["n"]]
                          for r in sections["verifier"]]))
    # 5. flags — three tables (all / strong / weak)
    flags = sections["flags"]
    flags_strong = [f for f in flags if f["strength"] == "strong"]
    flags_weak = [f for f in flags if f["strength"] == "weak"]
    out.append("## 5. Hallucination flags top-K\n")
    out.append("### 5.1 전체 top-20\n")
    out.append(_md_table(["flag_type", "strength", "n"],
                         [[r["flag_type"], r["strength"], r["n"]] for r in flags[:20]]))
    out.append("### 5.2 strong only top-10\n")
    out.append(_md_table(["flag_type", "n"],
                         [[r["flag_type"], r["n"]] for r in flags_strong[:10]]))
    out.append("### 5.3 weak only top-10\n")
    out.append(_md_table(["flag_type", "n"],
                         [[r["flag_type"], r["n"]] for r in flags_weak[:10]]))
    # B1 monitoring — fabricated_number strong rate
    fab = sections["fabricated_rate"]
    out.append("### 5.4 fabricated_number strong rate (B1 추적용)\n")
    out.append(f"- total rows: {fab['total']}\n")
    out.append(f"- fabricated_strong hit: {fab['fabricated_strong_hit']}\n")
    out.append(f"- **rate: {fab['rate'] * 100:.2f}%**\n")
    # 6. eval mismatch (only when the 'eval' section is present)
    if "eval" in sections:
        ev = sections["eval"]
        out.append("## 6. Eval golden mismatch (eval_case_id 기반)\n")
        out.append(f"- eval_case_id present: {ev['eval_case_id_present']}\n")
        out.append(f"- eval_case_id null (fallback): {ev['eval_case_id_null']}\n")
        out.append(f"- join_failed_count: **{ev['join_failed_count']}**\n")
        out.append(f"- matched total: {ev['matched_total']}\n\n")
        # The joined sample text is truncated to 120 characters.
        out.append(_md_table(["expected", "actual", "n", "sample"],
                             [[g["expected"], g["actual"], g["n"], " | ".join(g["sample_queries"])[:120]]
                              for g in ev["mismatch_groups"]]))
    # 7. FP candidates
    fps = sections["fp_candidates"]
    out.append(f"## 7. FP candidate sample (n={len(fps)}, case A/B/C 분리)\n")
    out.append(f"전체 CSV: `{sections.get('fp_csv_path', '(미생성)')}`\n\n")
    out.append(_md_table(
        ["case", "id", "completeness", "refused", "verdict", "max_score", "re_gate", "query"],
        [[r["candidate_reason"], r["id"], r["completeness"], r["refused"],
          r["classifier_verdict"], r["max_rerank_score"], r["re_gate"],
          (r["query"] or "")[:60]] for r in fps]))
    # 8. answer_length
    out.append("## 8. answer_length 분포 (bucket × percentile)\n")
    out.append(_md_table(["bucket", "p25", "p50", "p75", "avg", "n"],
                         [[r["bucket"], r["p25"], r["p50"], r["p75"], r["avg"], r["n"]]
                          for r in sections["answer_length"]]))
    # 9. delta vs baseline
    if delta:
        out.append("## 9. Delta vs baseline\n")
        out.append("```json\n")
        out.append(json.dumps(delta, ensure_ascii=False, indent=2, default=str))
        out.append("\n```\n")
    return "".join(out)
def render_json(sections: dict[str, Any]) -> str:
    """Serialise the sections as indented JSON; non-JSON values are stringified via str()."""
    dump_options = {"ensure_ascii": False, "indent": 2, "default": str}
    return json.dumps(sections, **dump_options)
def compute_delta(current: dict[str, Any], baseline: dict[str, Any]) -> dict[str, Any]:
    """Compute a coarse delta between the current sections and a baseline.

    Covers total_rows, the per-tier re-gate percentage, and the
    fabricated_number strong rate. Finer-grained comparison is left for later.

    Args:
        current: Sections of the present run.
        baseline: Sections loaded from a previous run's JSON.

    Returns:
        Dict with 'total_rows', 'regate_pct_diff_pp' (percentage-point diff per
        tier, keys in sorted order) and 'fabricated_strong_rate'.
    """
    delta: dict[str, Any] = {}
    delta["total_rows"] = {
        "current": current.get("total_rows"),
        "baseline": baseline.get("total_rows"),
        "diff": (current.get("total_rows") or 0) - (baseline.get("total_rows") or 0),
    }
    # Per-tier pct; float() because a JSON baseline may carry pct as a string.
    base_regate = {r["tier"]: float(r["pct"]) for r in baseline.get("regate", [])}
    cur_regate = {r["tier"]: float(r["pct"]) for r in current.get("regate", [])}
    # Sort the tier union: set iteration order varies between runs, which would
    # shuffle the keys of the rendered delta JSON.
    delta["regate_pct_diff_pp"] = {
        tier: round(cur_regate.get(tier, 0.0) - base_regate.get(tier, 0.0), 2)
        for tier in sorted(set(base_regate) | set(cur_regate))
    }
    # fabricated rate delta
    cur_fr = current.get("fabricated_rate", {}).get("rate", 0.0)
    base_fr = baseline.get("fabricated_rate", {}).get("rate", 0.0)
    delta["fabricated_strong_rate"] = {
        "current": cur_fr, "baseline": base_fr,
        "diff_pp": round((cur_fr - base_fr) * 100, 2),
        # Relative change is undefined for a zero baseline.
        "rel_change_pct": (round((cur_fr - base_fr) / base_fr * 100, 2)
                           if base_fr > 0 else None),
    }
    return delta
# ─── FP CSV dump ──────────────────────────────────────────
def dump_fp_csv(rows: list[dict], path: Path) -> None:
    """Write the FP candidate rows to a CSV file, creating parent directories.

    With no rows an empty file is written (no header). Otherwise the columns
    follow a fixed order and end with a blank 'is_true_fp' column meant for
    manual labelling; list/dict values are serialised as JSON text.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        path.write_text("", encoding="utf-8")
        return

    # Stable column order (per the plan spec).
    data_cols = [
        "id", "candidate_reason", "query", "completeness", "refused",
        "classifier_verdict", "max_rerank_score", "aggregate_score",
        "g_strong", "v_medium", "re_gate", "answer_length",
        "prompt_version", "source", "eval_case_id", "created_at",
    ]
    fieldnames = data_cols + ["is_true_fp"]

    def _cell(value):
        # JSONB / dict values become JSON strings.
        if isinstance(value, (list, dict)):
            return json.dumps(value, ensure_ascii=False)
        return value

    with path.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        for record in rows:
            line = {col: _cell(record.get(col)) for col in data_cols}
            line["is_true_fp"] = ""  # left blank for the reviewer
            writer.writerow(line)
# ─── dry-run (DB 없이 fixture 로드) ───────────────────────
def dry_run_sections() -> dict[str, Any]:
    """Return report sections without touching the DB.

    Loads the JSON fixture at DRY_RUN_FIXTURE when it exists; otherwise falls
    back to a minimal inline fixture.
    """
    if DRY_RUN_FIXTURE.exists():
        return json.loads(DRY_RUN_FIXTURE.read_text(encoding="utf-8"))
    # Minimal inline fixture.
    inline_regate = [
        {"tier": "clean", "n": 2, "pct": 66.67},
        {"tier": "refuse(grounding_2+strong)", "n": 1, "pct": 33.33},
    ]
    return {
        "total_rows": 3,
        "regate": inline_regate,
        "score_hist": [],
        "classifier": [],
        "verifier": [],
        "flags": [],
        "fabricated_rate": {"total": 3, "fabricated_strong_hit": 0, "rate": 0.0},
        "fp_candidates": [],
        "answer_length": [],
    }
# ─── main ─────────────────────────────────────────────────
async def run(args: argparse.Namespace) -> None:
    """Run the calibration pipeline: query the DB (or a fixture) and emit the report.

    Modes:
        - args.dry_run: use dry_run_sections(), skip the DB and the CSV dump.
        - args.inspect_shape: print the latest defense_layers samples as JSON and stop.
        - otherwise: run every query, dump the FP candidate CSV, render the report.

    The DB URL comes from the DATABASE_URL environment variable, with a local
    default.
    """
    if args.dry_run:
        sections = dry_run_sections()
        sections.setdefault("fp_csv_path", "(dry-run, CSV skipped)")
        _emit(args, sections)
        return

    # DB connection
    database_url = os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
    )
    engine = create_async_engine(database_url, echo=False)
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with session_factory() as session:
            if args.inspect_shape:
                sample = await fetch_shape_inspect(session)
                print(json.dumps(
                    [{"id": s["id"], "created_at": str(s["created_at"]),
                      "defense_layers": s["defense_layers"]} for s in sample],
                    ensure_ascii=False, indent=2, default=str,
                ))
                return

            where, params = build_filters(args)
            total = await fetch_total_rows(session, where, params)
            if total == 0:
                # Warn only; the report is still produced with empty tables.
                print(f"WARNING: 필터 조건에 매칭되는 ask_events 행 0건. "
                      f"source={args.source} prompt_version={args.prompt_version} "
                      f"since={args.since} until={args.until}")
            sections: dict[str, Any] = {"total_rows": total}
            sections["regate"] = await fetch_regate_distribution(session, where, params)
            sections["score_hist"] = await fetch_score_histogram(session, where, params)
            sections["classifier"] = await fetch_classifier_confusion(session, where, params)
            sections["verifier"] = await fetch_verifier_distribution(session, where, params)
            sections["flags"] = await fetch_flag_frequencies(session, where, params)
            sections["fabricated_rate"] = await fetch_fabricated_strong_rate(session, where, params)
            sections["fp_candidates"] = await fetch_fp_candidates(
                session, where, params, args.sample_limit)
            sections["answer_length"] = await fetch_answer_length_distribution(
                session, where, params)
            # eval only
            if args.source == "eval":
                cases = load_eval_golden(EVAL_GOLDEN_PATH)
                split_filter = (filter_eval_split(cases, args.eval_split)
                                if args.eval_split != "all" else None)
                sections["eval"] = await fetch_eval_join_with_split(
                    session, where, params, cases, split_filter)
    finally:
        # Dispose exactly once on every exit path (early return or a failing
        # query) so the connection pool is never leaked.
        await engine.dispose()

    # FP CSV dump
    fp_csv = (Path(args.fp_artifacts) if args.fp_artifacts else
              ARTIFACTS_DIR / f"fp_candidates_{args.run_label}.csv")
    dump_fp_csv(sections["fp_candidates"], fp_csv)
    sections["fp_csv_path"] = str(fp_csv)
    _emit(args, sections)
def _emit(args: argparse.Namespace, sections: dict[str, Any]) -> None:
    """Render the report and write it to disk, handling --compare-against.

    Always writes the Markdown report to args.output. When args.format is
    'json', the sections are also written next to it with a .json suffix.
    """
    delta = None
    baseline_arg = args.compare_against
    if baseline_arg:
        baseline_path = Path(baseline_arg)
        if not baseline_path.exists():
            print(f"WARNING: compare-against baseline not found: {baseline_path}")
        else:
            baseline = json.loads(baseline_path.read_text(encoding="utf-8"))
            delta = compute_delta(sections, baseline)

    report = render_markdown(sections, args, delta)
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(report, encoding="utf-8")
    print(f"✓ markdown report: {out_path}")

    if args.format != "json":
        return
    json_path = out_path.with_suffix(".json")
    json_path.write_text(render_json(sections), encoding="utf-8")
    print(f"✓ json baseline: {json_path}")
def main() -> None:
    """CLI entry point: parse the arguments and drive the async pipeline."""
    cli_args = parse_args()
    pipeline = run(cli_args)
    asyncio.run(pipeline)


if __name__ == "__main__":
    main()
@@ -0,0 +1,63 @@
{
"total_rows": 10,
"regate": [
{"tier": "clean", "n": 5, "pct": 50.0},
{"tier": "partial(strong_or_negation)", "n": 3, "pct": 30.0},
{"tier": "refuse(grounding_2+strong)", "n": 1, "pct": 10.0},
{"tier": "conf_low(medium_x3)", "n": 1, "pct": 10.0}
],
"score_hist": [
{"bucket": "full", "bin": 9, "n": 4, "avg_score": 0.87},
{"bucket": "full", "bin": 8, "n": 1, "avg_score": 0.78},
{"bucket": "partial", "bin": 5, "n": 3, "avg_score": 0.51},
{"bucket": "refused", "bin": 2, "n": 1, "avg_score": 0.18},
{"bucket": "insufficient", "bin": 1, "n": 1, "avg_score": 0.08}
],
"classifier": [
{"verdict": "sufficient", "completeness": "full", "refused": false, "n": 5},
{"verdict": "sufficient", "completeness": "partial", "refused": false, "n": 3},
{"verdict": "insufficient", "completeness": "insufficient", "refused": true, "n": 2}
],
"verifier": [
{"status": "ok", "medium_count": 0, "strong_count": 0, "completeness": "full", "n": 5},
{"status": "ok", "medium_count": 1, "strong_count": 0, "completeness": "partial", "n": 2},
{"status": "ok", "medium_count": 3, "strong_count": 0, "completeness": "partial", "n": 1},
{"status": "skipped", "medium_count": 0, "strong_count": 0, "completeness": "insufficient", "n": 2}
],
"flags": [
{"flag_type": "fabricated_number", "strength": "strong", "n": 2},
{"flag_type": "uncited_claim", "strength": "weak", "n": 4},
{"flag_type": "low_overlap", "strength": "weak", "n": 3},
{"flag_type": "intent_misalignment", "strength": "strong", "n": 1}
],
"fabricated_rate": {
"total": 10,
"fabricated_strong_hit": 2,
"rate": 0.2
},
"fp_candidates": [
{
"id": 101,
"candidate_reason": "refused_high_rerank",
"query": "샘플 질의 1",
"completeness": "insufficient",
"refused": true,
"classifier_verdict": "insufficient",
"max_rerank_score": 0.42,
"aggregate_score": 1.05,
"g_strong": [],
"v_medium": "0",
"re_gate": "refuse(score_gate)",
"answer_length": 0,
"prompt_version": "search_synthesis.v1-400char",
"source": "eval",
"eval_case_id": "ask_def_001",
"created_at": "2026-04-17T08:00:00+00:00"
}
],
"answer_length": [
{"bucket": "full", "p25": 280, "p50": 350, "p75": 395, "avg": 340, "n": 5},
{"bucket": "partial", "p25": 200, "p50": 260, "p75": 320, "avg": 255, "n": 3},
{"bucket": "refused", "p25": 0, "p50": 0, "p75": 0, "avg": 0, "n": 2}
]
}
+92
View File
@@ -0,0 +1,92 @@
"""Phase 3.5 fix2: /ask 의 X-Source / X-Eval-Case-Id trust boundary.
`_resolve_eval_identity()` 단위 테스트.
- token 없음/틀림 + X-Source=eval → source='document_server', eval_case_id=None
- token 일치 + X-Source=eval + X-Eval-Case-Id=case_xxx → ('eval', 'case_xxx')
- token 틀림 + X-Eval-Case-Id 만 (X-Source 미지정) → eval_case_id=None
- 일반 호출 (X-Source=ui_search, no eval headers) → ('ui_search', None)
- env 미설정 (eval_runner_token='') → 모든 eval claim 거부
"""
from __future__ import annotations
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
@pytest.fixture
def resolve_with_token(monkeypatch):
    """Factory fixture: patch settings.eval_runner_token and return _resolve_eval_identity.

    Usage: ``resolve = resolve_with_token("secret")``. The patch is undone by
    monkeypatch at test teardown.
    """
    def _make(token: str):
        from api import search as search_mod
        # Patch the attribute on the settings object that api.search references,
        # so the function under test sees the overridden token.
        monkeypatch.setattr(search_mod.settings, "eval_runner_token", token)
        return search_mod._resolve_eval_identity
    return _make
def test_no_token_no_eval_headers_default(resolve_with_token):
    """Plain call with no eval headers yields the default source."""
    identity = resolve_with_token("secret123")(None, None, None)
    assert identity == ("document_server", None)
def test_normal_source_with_token(resolve_with_token):
    """A ui_search call is not an eval claim, so the token is irrelevant."""
    identity = resolve_with_token("secret123")("ui_search", None, None)
    assert identity == ("ui_search", None)
def test_eval_claim_no_token_rejected(resolve_with_token):
    """X-Source=eval without a token is rejected: source falls back to 'document_server'."""
    identity = resolve_with_token("secret123")("eval", "case_001", None)
    assert identity == ("document_server", None)
def test_eval_claim_wrong_token_rejected(resolve_with_token):
    """An eval claim with a wrong token is rejected."""
    identity = resolve_with_token("secret123")("eval", "case_001", "wrong_token")
    assert identity == ("document_server", None)
def test_eval_claim_correct_token_accepted(resolve_with_token):
    """A matching token accepts the claim: source 'eval' and the case id are returned."""
    identity = resolve_with_token("secret123")("eval", "case_001", "secret123")
    assert identity == ("eval", "case_001")
def test_eval_case_id_only_no_source_no_token(resolve_with_token):
    """Only X-Eval-Case-Id, no token: rejected, case id becomes None."""
    identity = resolve_with_token("secret123")(None, "case_001", None)
    assert identity == ("document_server", None)
def test_eval_case_id_only_wrong_token(resolve_with_token):
    """Only X-Eval-Case-Id with a wrong token: rejected."""
    identity = resolve_with_token("secret123")(None, "case_001", "wrong")
    assert identity == ("document_server", None)
def test_env_unset_rejects_even_correct_format(resolve_with_token):
    """With settings.eval_runner_token == '' every eval claim is rejected."""
    resolve = resolve_with_token("")
    # Even an empty token header (equal to the empty server value) must not pass.
    for presented in ("", "anything"):
        assert resolve("eval", "case_001", presented) == ("document_server", None)
def test_non_eval_source_forces_case_id_none(resolve_with_token):
    """X-Source=ui_detail sent together with X-Eval-Case-Id keeps the source, drops the case id.

    The case id alone counts as an eval claim, and without a token it is
    rejected. The claimed source is not 'eval', so it is returned unchanged,
    with eval_case_id=None.
    """
    identity = resolve_with_token("secret123")("ui_detail", "case_001", None)
    assert identity == ("ui_detail", None)
+188
View File
@@ -0,0 +1,188 @@
"""Phase 3.5 B1 (fix1+fix3): unit-aware fabricated_number + bound semantics.
기준:
- 단위 일치 시에만 exact/range/tolerance clear (fix1: Codex unit-mismatch regression 방지)
- 약/대략/거의/얼추 는 approx prefix 로 strip; 최대/최소 는 bound operator 로 보존 (fix3)
- tolerance 는 양적 단위(_TOLERANCE_UNITS) + 4자리 이상일 때만; 식별자성 단위(_EXACT_ONLY_UNITS) 는 strict
"""
from __future__ import annotations
import os
import sys
# tests/ → 프로젝트 루트 → app/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
from services.search.evidence_service import EvidenceItem
from services.search.grounding_check import check
def _ev(text: str, n: int = 1) -> EvidenceItem:
    """Build an EvidenceItem whose span_text and full_snippet are both ``text``."""
    fields = {
        "n": n,
        "chunk_id": None,
        "doc_id": 100 + n,
        "title": f"doc{n}",
        "section_title": None,
        "span_text": text,
        "relevance": 0.9,
        "rerank_score": 0.85,
        "full_snippet": text,
        "source": "llm",
    }
    return EvidenceItem(**fields)
def _has_fabricated(result, sub: str | None = None) -> bool:
    """Tell whether result.strong_flags holds a 'fabricated_number:' flag.

    When ``sub`` is given, the flag must also contain that substring.
    """
    return any(
        flag.startswith("fabricated_number:") and (sub is None or sub in flag)
        for flag in result.strong_flags
    )
# ─── 콤마/prefix/range/단위 동의어/citation (기존 17 케이스) ──────
def test_comma_thousand_match():
    """Answer '1,000명' against evidence '1000명' is not flagged for 1000."""
    evidence = [_ev("총원은 1000명입니다.")]
    outcome = check("질문", "총 1,000명 [1]", evidence)
    assert not _has_fabricated(outcome, "1000")
def test_comma_thousand_reverse():
    """Answer '1000명' against evidence '1,000명' is not flagged."""
    evidence = [_ev("총원은 1,000명입니다.")]
    outcome = check("질문", "총 1000명 [1]", evidence)
    assert not _has_fabricated(outcome)
def test_approx_prefix_in_answer():
    """An approximate prefix ('약') in the answer does not cause a flag."""
    evidence = [_ev("100명이 참여")]
    outcome = check("질문", "약 100명이 참여 [1]", evidence)
    assert not _has_fabricated(outcome)
def test_approx_prefix_in_evidence():
    """An approximate prefix ('약') in the evidence does not cause a flag."""
    evidence = [_ev("약 100명이 참여")]
    outcome = check("질문", "100명이 참여 [1]", evidence)
    assert not _has_fabricated(outcome)
def test_range_inner_value_passes():
    """A value inside the evidence range '100~200명' is not flagged."""
    evidence = [_ev("100~200명 사이 추정")]
    outcome = check("질문", "약 150명 [1]", evidence)
    assert not _has_fabricated(outcome, "150")
def test_range_outer_value_flagged():
    """A value outside the evidence range '100~200명' is flagged."""
    evidence = [_ev("100~200명 사이 추정")]
    outcome = check("질문", "300명 [1]", evidence)
    assert _has_fabricated(outcome, "300")
def test_unit_synonym_in_to_myeong():
    """Unit '인' in the answer against '명' in the evidence is not flagged."""
    evidence = [_ev("총 50명이 모임.")]
    outcome = check("질문", "총 50인이 모임 [1]", evidence)
    assert not _has_fabricated(outcome)
def test_unit_synonym_percent_to_pct():
    """Unit '퍼센트' in the answer against '%' in the evidence is not flagged."""
    evidence = [_ev("비율 30%이다.")]
    outcome = check("질문", "비율 30퍼센트 [1]", evidence)
    assert not _has_fabricated(outcome)
def test_citation_marker_both_sides():
    """Bug-fix regression: an unremoved [n] marker on the evidence side used to merge digits."""
    evidence = [_ev("[2] 5,000원이 정확")]
    outcome = check("질문", "가격 [1] 5,000원", evidence)
    assert not _has_fabricated(outcome)
def test_genuine_fabricated_number():
    """A number that appears nowhere in the evidence is flagged."""
    evidence = [_ev("500명, 300명을 받음.")]
    outcome = check("질문", "결과 777명 [1]", evidence)
    assert _has_fabricated(outcome, "777")
def test_amount_4digit_tolerance_passes():
    """Answer '9,990원' against evidence '10,000원' is not flagged."""
    evidence = [_ev("10,000원입니다.")]
    outcome = check("질문", "9,990원 [1]", evidence)
    assert not _has_fabricated(outcome)
def test_year_no_tolerance_flagged():
    """A differing year ('2024년' vs '2026년') is flagged."""
    evidence = [_ev("2026년에 발효")]
    outcome = check("질문", "2024년 [1]", evidence)
    assert _has_fabricated(outcome, "2024")
def test_article_no_tolerance_flagged():
    """A differing article number ('제5조' vs '제6조') is flagged."""
    evidence = [_ev("제6조에 따라")]
    outcome = check("질문", "제5조에 명시 [1]", evidence)
    assert _has_fabricated(outcome)
def test_count_no_tolerance_flagged():
    """A differing occurrence count ('3회' vs '4회') is flagged."""
    evidence = [_ev("총 4회 적발")]
    outcome = check("질문", "총 3회 위반 [1]", evidence)
    assert _has_fabricated(outcome)
def test_three_digit_strict():
    """A small differing quantity ('15개' vs '10개') is flagged."""
    evidence = [_ev("총 10개")]
    outcome = check("질문", "총 15개 [1]", evidence)
    assert _has_fabricated(outcome, "15")
def test_single_digit_ignored():
    """A single-digit number with a quantity unit is ignored (avoids false positives)."""
    evidence = [_ev("관련 통계 별도")]
    outcome = check("질문", "총 3개 발생 [1]", evidence)
    # NOTE(review): the substring includes the unit ('3개') while sibling tests
    # match digits only — confirm the flag text can contain the unit, otherwise
    # this assertion passes vacuously.
    assert not _has_fabricated(outcome, "3개")
def test_range_korean_butter_separator():
    """A range written as '100부터 200명까지' covers 150, so it is not flagged."""
    evidence = [_ev("100부터 200명까지 대상.")]
    outcome = check("질문", "약 150명 [1]", evidence)
    assert not _has_fabricated(outcome, "150")
# ─── fix1: unit-mismatch (Codex no-ship) ──────────────────
def test_won_vs_myeong_range_flagged():
    """Answer '150원' vs evidence range '100~200명': unit mismatch, flag is kept."""
    evidence = [_ev("대상은 100~200명")]
    outcome = check("질문", "약 150원이 든다 [1]", evidence)
    assert _has_fabricated(outcome, "150")
def test_won_vs_myeong_tolerance_flagged():
    """Answer '9,990원' vs evidence '10,000명': units differ, flag is kept."""
    evidence = [_ev("10,000명입니다.")]
    outcome = check("질문", "9,990원 [1]", evidence)
    assert _has_fabricated(outcome, "9990")
def test_pct_vs_myeong_range_flagged():
    """Answer '15%' vs evidence range '10~20명': unit mismatch, flag is kept."""
    evidence = [_ev("대상 10~20명")]
    outcome = check("질문", "약 15% [1]", evidence)
    assert _has_fabricated(outcome, "15")
# ─── fix3: 최대/최소 bound semantics ───────────────────────
def test_choedae_exact_boundary_flagged():
    """Evidence '최대 100명' with answer '100명': the boundary value itself is not cleared."""
    evidence = [_ev("최대 100명까지 가능")]
    outcome = check("질문", "100명이다 [1]", evidence)
    assert _has_fabricated(outcome, "100")
def test_choeso_exact_boundary_flagged():
    """Evidence '최소 100명' with answer '100명': the boundary value itself is not cleared."""
    evidence = [_ev("최소 100명 이상 필요")]
    outcome = check("질문", "100명이다 [1]", evidence)
    assert _has_fabricated(outcome, "100")
def test_choedae_inner_value_passes():
    """Evidence '최대 100명' with answer '50명': within the bound, cleared."""
    evidence = [_ev("최대 100명까지 가능")]
    outcome = check("질문", "50명이다 [1]", evidence)
    assert not _has_fabricated(outcome, "50")
def test_choeso_above_value_passes():
    """Evidence '최소 100명' with answer '150명': within the bound, cleared."""
    evidence = [_ev("최소 100명 이상 필요")]
    outcome = check("질문", "150명이다 [1]", evidence)
    assert not _has_fabricated(outcome, "150")
def test_choedae_outer_value_flagged():
    """Evidence '최대 100명' with answer '200명': outside the bound, flagged."""
    evidence = [_ev("최대 100명까지 가능")]
    outcome = check("질문", "200명이다 [1]", evidence)
    assert _has_fabricated(outcome, "200")
+58
View File
@@ -0,0 +1,58 @@
"""Phase 3.5 B2: verifier _SEVERITY_MAP env flag 테스트.
VERIFIER_NUMERIC_PROMOTE 환경변수에 따른 _SEVERITY_MAP 변화 검증.
모듈은 import time 에 env 를 평가하므로 reload 가 필요.
"""
from __future__ import annotations
import importlib
import os
import sys
# tests/ → 프로젝트 루트 → app/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
def _reload_verifier(monkeypatch, value: str | None):
    """Set (or unset, for None) VERIFIER_NUMERIC_PROMOTE, then reload verifier_service.

    Returns the reloaded module so its _SEVERITY_MAP reflects the new env value.
    """
    env_key = "VERIFIER_NUMERIC_PROMOTE"
    if value is not None:
        monkeypatch.setenv(env_key, value)
    else:
        monkeypatch.delenv(env_key, raising=False)
    from services.search import verifier_service
    importlib.reload(verifier_service)
    return verifier_service
def test_severity_map_off_default(monkeypatch):
    """Env unset: numeric_conflict stays 'medium' for critical and minor (existing behaviour)."""
    module = _reload_verifier(monkeypatch, None)
    numeric = module._SEVERITY_MAP["numeric_conflict"]
    assert numeric["critical"] == "medium"
    assert numeric["minor"] == "medium"
    assert module._NUMERIC_PROMOTE is False
def test_severity_map_on_critical_promoted(monkeypatch):
    """VERIFIER_NUMERIC_PROMOTE=1: only critical becomes 'strong'; minor stays 'medium'."""
    module = _reload_verifier(monkeypatch, "1")
    numeric = module._SEVERITY_MAP["numeric_conflict"]
    assert numeric["critical"] == "strong"
    assert numeric["minor"] == "medium"
    assert module._NUMERIC_PROMOTE is True
def test_severity_map_off_explicit_zero(monkeypatch):
    """Explicit VERIFIER_NUMERIC_PROMOTE=0 is off, same as the default."""
    module = _reload_verifier(monkeypatch, "0")
    numeric = module._SEVERITY_MAP["numeric_conflict"]
    assert numeric["critical"] == "medium"
    assert module._NUMERIC_PROMOTE is False
def test_direct_negation_invariant(monkeypatch):
    """direct_negation is 'strong' regardless of the env value (invariant safety net)."""
    for env_value in (None, "0", "1"):
        negation = _reload_verifier(monkeypatch, env_value)._SEVERITY_MAP["direct_negation"]
        assert negation["critical"] == "strong"
        assert negation["minor"] == "strong"