Files
hyungi_document_server/app/services/search/grounding_check.py
T
Hyungi Ahn 2665d4eb60 feat(grounding): Phase 3.5 B1 — unit-aware fabricated_number + bound semantics
Codex adversarial review (no-ship) 반영:

fix1: unit-aware numeric clearing
- _extract_numeric_corpus(): 단위별 bucket dict (exact_by_unit) +
  ranges_by_unit (양방향 + 단방향 bound 통합)
- _within_unit_range / _close_to_unit_pool: 같은 unit 안에서만 매칭
  bare answer 는 보수적으로 range/tolerance 패스 X
- 2-pass cleared_pairs (unit, digits): cross-unit cleared 절대 skip 안 함.
  bare(None) 답변은 unit-anchored cleared 시 duplicate 로 skip
  (콤마 normalize 부산물 보호 — Codex 케이스는 그대로 flag)

fix3: 최대/최소 bound semantics
- _APPROX_PREFIX_RE 에서 최대/최소 제거 (약/대략/거의/얼추 만 strip)
- _BOUND_PATTERN_RE: 최대 N → range (0, N-1), 최소 N → range (N+1, 1e18)
- 경계값 자체는 cleared 대상 아님 ("최대 100명" + answer "100명" → flag)
- bound span 내 숫자는 exact pool 에서 제외

기존 prefix strip / 콤마 / 부터 separator / 단위 동의어 / tolerance 4자리+ /
식별자성 단위 1자리 flag 동작 모두 유지.

tests/test_grounding_fabricated_number.py: 25 케이스 — 기존 17 + Codex
unit-mismatch 3 (won_vs_myeong_range/tol, pct_vs_myeong_range) + bound 5
(최대/최소 boundary/inner/outer).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 08:11:06 +09:00

506 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Grounding check — post-synthesis 검증 (Phase 3.5a).
Strong/weak flag 분리:
- **Strong** (→ partial 강등 or refuse): fabricated_number, intent_misalignment(important)
- **Weak** (→ confidence lower only): uncited_claim, low_overlap, intent_misalignment(generic)
Re-gate 로직 (Phase 3.5a 9라운드 토론 결과):
- strong 1개 → partial 강등
- strong 2개 이상 → refuse
- weak → confidence "low"
Intent alignment (rule-based):
- query 의 핵심 명사가 answer 에 등장하는지 확인
- "처벌" 같은 중요 키워드 누락은 strong
- "주요", "관련" 같은 generic 은 무시
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING
from core.utils import setup_logger
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
logger = setup_logger("grounding")
# "주요", "관련" 등 intent alignment 에서 제외할 generic 단어
GENERIC_TERMS = frozenset({
"주요", "관련", "내용", "정의", "기준", "방법", "설명", "개요",
"대한", "위한", "대해", "무엇", "어떤", "어떻게", "있는",
"하는", "되는", "이런", "그런", "이것", "그것",
})
@dataclass(slots=True)
class GroundingResult:
strong_flags: list[str]
weak_flags: list[str]
_UNIT_CHARS = r'명인개%년월일조항호세건원회'
# "이상/이하/초과/미만" — threshold 표현 (numeric conflict 에서 skip 대상)
_THRESHOLD_SUFFIXES = re.compile(r'이상|이하|초과|미만')
# 약칭/근사치 prefix — 매칭 전 제거 (Phase 3.5 B1).
# ⚠ 최대/최소 는 의도적으로 제외 — 이들은 bound operator 라 의미가 다름 (Phase 3.5 B1 fix3).
# 약/대략/거의/얼추 만 노이즈 prefix 로 strip.
_APPROX_PREFIX_RE = re.compile(r'(약|대략|거의|얼추)\s*')
# 단위 동의어 dict — 추출 직후 정규화 (Phase 3.5 B1)
# 의미가 동일한 단위는 같은 표기로 통일해서 set 비교/range overlap 안정화.
_UNIT_SYNONYMS: dict[str, str] = {
"": "",
"사람": "",
"퍼센트": "%",
"프로": "%",
"KRW": "",
"krw": "",
}
# tolerance(±1%) 허용 단위 — 양적 측정값 (Phase 3.5 B1)
_TOLERANCE_UNITS: frozenset[str] = frozenset({"", "", "%", "", ""})
# tolerance 미적용 단위 — 식별자성 숫자 (연도/조문/횟수)
_EXACT_ONLY_UNITS: frozenset[str] = frozenset({"", "", "", "", "", "", ""})
# 최대/최소 prefix 패턴 — bound operator (Phase 3.5 B1 fix3).
# 매칭된 숫자는 exact pool 에서 제외하고 one-sided range 로 변환.
# 경계값 자체는 clear 대상 아님 (Codex 권장: "최대 100명" + answer "100명" → flag 유지).
_BOUND_PATTERN_RE = re.compile(
rf'(최대|최소)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)'
)
_RANGE_INF = 10**18 # one-sided range 상한 sentinel
def _normalize_unit(unit: str) -> str:
"""단위 동의어 → 대표 표기."""
return _UNIT_SYNONYMS.get(unit, unit)
def _extract_unit(literal: str) -> str | None:
"""리터럴에서 숫자 뒤 단위(한 글자 또는 동의어) 추출 + 정규화."""
# 천단위 콤마 + 옵션 소수 + 한글 단위 한 글자 또는 동의어
m = re.match(rf'[\d,.]+\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)', literal)
if not m:
return None
return _normalize_unit(m.group(1))
def _extract_numeric_corpus(text: str) -> dict:
"""단위별 숫자 + 범위 + bound 통합 추출 (Phase 3.5 B1 fix1+fix3).
Returns:
{
"exact_by_unit": {unit_or_None: set(digits)}, # 평범한 숫자 (bound 제외)
"ranges_by_unit": {unit: [(lo, hi), ...]}, # 양방향(A~B) + 단방향(최대/최소)
}
None 키는 단위 없는 bare 숫자.
`최대 N <unit>` → ranges[(0, N-1)] (경계값 자체는 cleared 대상 아님)
`최소 N <unit>` → ranges[(N+1, INF)]
"""
cleaned = _APPROX_PREFIX_RE.sub('', text)
exact_by_unit: dict[str | None, set[str]] = {None: set()}
ranges_by_unit: dict[str, list[tuple[int, int]]] = {}
# 1) 최대/최소 — bound. exact pool 에서 제외, one-sided range 로 변환.
bound_spans: list[tuple[int, int]] = [] # 매칭 substring 위치 — 이후 단계에서 skip
for m in _BOUND_PATTERN_RE.finditer(cleaned):
bound_kind = m.group(1)
try:
n = int(m.group(2).replace(',', '').split('.')[0])
except ValueError:
continue
unit = _normalize_unit(m.group(3))
if bound_kind == "최대":
ranges_by_unit.setdefault(unit, []).append((0, max(0, n - 1)))
else: # 최소
ranges_by_unit.setdefault(unit, []).append((n + 1, _RANGE_INF))
bound_spans.append((m.start(), m.end()))
def _in_bound_span(pos: int) -> bool:
return any(s <= pos < e for s, e in bound_spans)
# 2) 천단위 콤마 bare number
for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned):
if _in_bound_span(m.start()):
continue
exact_by_unit[None].add(m.group().replace(',', ''))
# 3) 단위 있는 숫자 (단위 동의어 포함)
for m in re.finditer(
rf'(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)',
cleaned,
):
if _in_bound_span(m.start()):
continue
digits = m.group(1).replace(',', '').split('.')[0]
if not digits:
continue
unit = _normalize_unit(m.group(2))
exact_by_unit.setdefault(unit, set()).add(digits)
# 4) 양방향 범위 표현 (A~B / A 부터 B)
for m in re.finditer(
rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)',
cleaned,
):
if _in_bound_span(m.start()):
continue
try:
lo = int(m.group(1).replace(',', '').split('.')[0])
hi = int(m.group(2).replace(',', '').split('.')[0])
except ValueError:
continue
unit = _normalize_unit(m.group(3))
ranges_by_unit.setdefault(unit, []).append((min(lo, hi), max(lo, hi)))
# 5) bare 2자리+ 단독 숫자
for m in re.finditer(r'\b(\d{2,})\b', cleaned):
if _in_bound_span(m.start()):
continue
exact_by_unit[None].add(m.group())
return {
"exact_by_unit": exact_by_unit,
"ranges_by_unit": ranges_by_unit,
}
def _within_unit_range(
n: int, unit: str | None, ranges_by_unit: dict[str, list[tuple[int, int]]]
) -> bool:
"""unit-matching range 검증.
answer unit 이 None (bare 숫자) 면 보수적으로 False — bare 답변은 range clear 대상 아님.
"""
if unit is None:
return False
return any(lo <= n <= hi for lo, hi in ranges_by_unit.get(unit, []))
def _close_to_unit_pool(
n: int, unit: str | None, exact_by_unit: dict[str | None, set[str]], tol: float
) -> bool:
"""unit-matching tolerance 검증.
answer unit 이 None 이면 False — bare 답변은 tolerance 대상 아님.
같은 unit bucket 안의 후보만 비교.
"""
if unit is None:
return False
candidates = exact_by_unit.get(unit, set())
for c in candidates:
try:
cn = int(c)
except ValueError:
continue
if cn == 0:
continue
if abs(n - cn) / cn <= tol:
return True
return False
def _extract_number_literals(text: str) -> set[str]:
"""숫자 + 단위 추출 + normalize (Phase 3.5 B1: 6단계 확장).
1) 약칭 prefix 제거 ("약 100명""100명")
2) 천단위 콤마 bare number 우선 ("1,000""1000" set 등록)
3) 한국어 단위 접미사 매칭 (기존)
4) 범위 표현 양쪽 숫자 추출 (separator: ~, -, , 부터)
5) 단위 동의어 정규화 (인→명, 퍼센트→%, KRW→원)
6) bare 2자리+ 추출 (기존)
"""
# 1. 약칭 prefix 제거 (전체 텍스트에서)
cleaned = _APPROX_PREFIX_RE.sub('', text)
# 2. 천단위 콤마 bare number — normalize 된 값을 set 에 선등록
normalized: set[str] = set()
for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned):
normalized.add(m.group().replace(',', ''))
# 3. 숫자 + 한국어 단위 접미사 (동의어 포함)
raw: set[str] = set(re.findall(
rf'\d[\d,.]*\s*(?:[{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)\w{{0,2}}',
cleaned,
))
# 4. 범위 표현 — separator 에 "부터" 추가
for m in re.finditer(
rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)',
cleaned,
):
unit_norm = _normalize_unit(m.group(3))
raw.add(m.group(1) + unit_norm)
raw.add(m.group(2) + unit_norm)
# 5. normalize: 단위 동의어 통일 + 콤마 제거
for r in raw:
# 단위 부분 정규화
m = re.match(r'([\d,.]+)\s*([^\d\s]+)', r)
if m:
digits_part = m.group(1)
unit_part = _normalize_unit(m.group(2))
normalized.add(digits_part + unit_part)
normalized.add(digits_part.replace(',', '') + unit_part)
normalized.add(r.strip())
num_only = re.match(r'[\d,.]+', r)
if num_only:
normalized.add(num_only.group().replace(',', ''))
# 6. 단독 숫자 (2자리+ 만)
for d in re.findall(r'\b(\d{2,})\b', cleaned):
normalized.add(d)
return normalized
def _within_evidence_range(digits: str, raw: str, evidence_text: str) -> bool:
"""evidence 에 'A~B 단위' 가 있고 answer 의 숫자가 그 범위 안이면 True.
범위 단위는 무시 (단위 비교는 호출 전 단계). digits = 정수 문자열.
"""
try:
n = int(digits)
except ValueError:
return False
cleaned_ev = _APPROX_PREFIX_RE.sub('', evidence_text)
for m in re.finditer(
rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*[{_UNIT_CHARS}]',
cleaned_ev,
):
try:
lo = int(m.group(1).replace(',', '').split('.')[0])
hi = int(m.group(2).replace(',', '').split('.')[0])
if min(lo, hi) <= n <= max(lo, hi):
return True
except ValueError:
continue
return False
def _close_to_any(n: int, candidates: set[str], tol: float) -> bool:
"""candidates 중 하나라도 (1±tol) 배율 안에 들어오면 True.
n 은 정수, candidates 는 digits-only 문자열 집합.
"""
for c in candidates:
try:
cn = int(c)
except ValueError:
continue
if cn == 0:
continue
if abs(n - cn) / cn <= tol:
return True
return False
def _extract_content_tokens(text: str) -> set[str]:
"""한국어 2자 이상 명사 + 영어 3자 이상 단어."""
return set(re.findall(r'[가-힣]{2,}|[a-zA-Z]{3,}', text))
def _parse_number_with_unit(literal: str) -> tuple[str, str] | None:
"""숫자 리터럴에서 (digits_only, unit) 분리. 단위 없으면 None."""
m = re.match(rf'([\d,.]+)\s*([{_UNIT_CHARS}])', literal)
if not m:
return None
digits = m.group(1).replace(',', '')
unit = m.group(2)
return (digits, unit)
def _check_evidence_numeric_conflicts(evidence: list["EvidenceItem"]) -> list[str]:
"""evidence 간 숫자 충돌 감지 (Phase 3.5b). evidence >= 2 일 때만 활성.
동일 단위, 다른 숫자 → weak flag. "이상/이하/초과/미만" 포함 시 skip.
bare number 는 비교 안 함 (조항 번호 등 false positive 방지).
"""
if len(evidence) < 2:
return []
# 각 evidence 에서 단위 있는 숫자 + threshold 여부 추출
# {evidence_idx: [(digits, unit, has_threshold), ...]}
per_evidence: dict[int, list[tuple[str, str, bool]]] = {}
for idx, ev in enumerate(evidence):
nums = re.findall(
rf'\d[\d,.]*\s*[{_UNIT_CHARS}]\w{{0,4}}',
ev.span_text,
)
entries = []
for raw in nums:
parsed = _parse_number_with_unit(raw)
if not parsed:
continue
has_thr = bool(_THRESHOLD_SUFFIXES.search(raw))
entries.append((parsed[0], parsed[1], has_thr))
if entries:
per_evidence[idx] = entries
if len(per_evidence) < 2:
return []
# 단위별로 evidence 간 숫자 비교
# {unit: {digits: [evidence_idx, ...]}}
unit_map: dict[str, dict[str, list[int]]] = {}
for idx, entries in per_evidence.items():
for digits, unit, has_thr in entries:
if has_thr:
continue # threshold 표현은 skip
if unit not in unit_map:
unit_map[unit] = {}
if digits not in unit_map[unit]:
unit_map[unit][digits] = []
if idx not in unit_map[unit][digits]:
unit_map[unit][digits].append(idx)
flags: list[str] = []
for unit, digits_map in unit_map.items():
distinct_values = list(digits_map.keys())
if len(distinct_values) >= 2:
# 가장 많이 등장하는 2개 비교
top2 = sorted(distinct_values, key=lambda d: len(digits_map[d]), reverse=True)[:2]
flags.append(
f"evidence_numeric_conflict:{top2[0]}{unit}_vs_{top2[1]}{unit}"
)
return flags
def check(
query: str,
answer: str,
evidence: list[EvidenceItem],
) -> GroundingResult:
"""답변 vs evidence grounding 검증 + query intent alignment."""
strong: list[str] = []
weak: list[str] = []
if not answer or not evidence:
return GroundingResult([], [])
# ⚠ citation marker [n] 양측 제거 (대칭성 — Phase 3.5 B1)
evidence_text = re.sub(r'\[\d+\]', '', " ".join(e.span_text for e in evidence))
# ── Strong 1: fabricated number (unit-aware 3단계 — Phase 3.5 B1 fix1+fix3) ──
# Codex 지적 반영:
# - fix1: range/tolerance/exact 모두 단위 일치 시에만 clear
# (예: "150원" vs "100~200명" → flag 유지)
# - fix3: 최대/최소 prefix 는 bound 의미 보존
# (예: "최대 100명" + answer "100명" → flag 유지, "최대 100명" + answer "50명" → cleared)
answer_clean = re.sub(r'\[\d+\]', '', answer)
answer_corpus = _extract_numeric_corpus(answer_clean)
evidence_corpus = _extract_numeric_corpus(evidence_text)
ev_exact_by_unit = evidence_corpus["exact_by_unit"]
ev_ranges_by_unit = evidence_corpus["ranges_by_unit"]
# cleared 는 (unit, digits) 쌍 단위로 추적 — 단위 충돌 케이스 방어
cleared_pairs: set[tuple[str | None, str]] = set()
# Pass 1: 각 (unit, digits) 가 evidence 에서 정당화되는지 판정
for unit, digits_set in answer_corpus["exact_by_unit"].items():
for d in digits_set:
# 1) exact match — 같은 unit bucket 내에서만
if d in ev_exact_by_unit.get(unit, set()):
cleared_pairs.add((unit, d))
continue
# bare answer (unit=None) 는 evidence bare bucket 도 보조 매칭
if unit is None and d in ev_exact_by_unit.get(None, set()):
cleared_pairs.add((unit, d))
continue
try:
n = int(d)
except ValueError:
continue
# 2) range — same-unit 만 (bare answer 는 range clear 대상 아님)
if _within_unit_range(n, unit, ev_ranges_by_unit):
cleared_pairs.add((unit, d))
continue
# 3) ±1% tolerance — 단위가 양적(_TOLERANCE_UNITS) + 4자리+ + same-unit
if (
unit in _TOLERANCE_UNITS
and len(d) >= 4
and _close_to_unit_pool(n, unit, ev_exact_by_unit, tol=0.01)
):
cleared_pairs.add((unit, d))
continue
# 식별자성 단위(_EXACT_ONLY_UNITS) 는 tolerance 패스 X.
# Pass 2: cleared 되지 않은 (unit, digits) 를 strong flag.
# 1자리 무시는 unit 이 식별자성(_EXACT_ONLY_UNITS: 년/월/일/조/항/호/회) 이 아닐 때만 적용.
# bare(None) 답변 숫자는 같은 digit 이 다른 unit 에서 cleared 됐으면 skip — 추출 부산물 방어.
# ⚠ 단위 cross-clear (예: "원" cleared → "명" 도 skip) 은 금지: Codex unit-mismatch 케이스가 깨짐.
unit_anchored_cleared: set[str] = {d for (u, d) in cleared_pairs if u is not None}
flagged_keys: set[tuple[str | None, str]] = set()
for unit, digits_set in answer_corpus["exact_by_unit"].items():
for d in digits_set:
if (unit, d) in cleared_pairs or (unit, d) in flagged_keys:
continue
# bare(None) 답변 숫자가 임의의 단위 bucket 에서 cleared 됐으면 duplicate 로 처리.
# 사례: "1,000명" → unit bucket "명" 에 1000 + bare bucket None 에 1000 (comma normalize 부산물).
# 이미 ("명", "1000") 가 cleared 라면 (None, "1000") 도 같은 사실을 가리키므로 skip.
if unit is None and d in unit_anchored_cleared:
continue
if len(d) < 2 and unit not in _EXACT_ONLY_UNITS:
continue
flagged_keys.add((unit, d))
# 사람이 읽기 좋게 "{digits}{unit}" 또는 bare 형태로 표기
label = f"{d}{unit}" if unit else d
strong.append(f"fabricated_number:{label}")
# ── Strong/Weak 2: query-answer intent alignment ──
query_content = _extract_content_tokens(query)
answer_content = _extract_content_tokens(answer)
if query_content:
missing_terms = query_content - answer_content
important_missing = [
t for t in missing_terms
if t not in GENERIC_TERMS and len(t) >= 2
]
if important_missing:
strong.append(
f"intent_misalignment:{','.join(important_missing[:3])}"
)
elif len(missing_terms) > len(query_content) * 0.5:
weak.append(
f"intent_misalignment_generic:"
f"missing({','.join(list(missing_terms)[:5])})"
)
# ── Weak 1: uncited claim ──
sentences = re.split(r'(?<=[.!?。])\s+', answer)
for s in sentences:
if len(s.strip()) > 20 and not re.search(r'\[\d+\]', s):
weak.append(f"uncited_claim:{s[:40]}")
# ── Weak: evidence 간 숫자 충돌 (Phase 3.5b) ──
conflicts = _check_evidence_numeric_conflicts(evidence)
weak.extend(conflicts)
# ── Weak 2: token overlap ──
answer_tokens = _extract_content_tokens(answer)
evidence_tokens = _extract_content_tokens(evidence_text)
if answer_tokens:
overlap = len(answer_tokens & evidence_tokens) / len(answer_tokens)
if overlap < 0.4:
weak.append(f"low_overlap:{overlap:.2f}")
if strong or weak:
logger.info(
"grounding query=%r strong=%d weak=%d flags=%s",
query[:60],
len(strong),
len(weak),
",".join(strong[:3] + weak[:3]),
)
return GroundingResult(strong, weak)