diff --git a/app/services/search/grounding_check.py b/app/services/search/grounding_check.py index c1a2ca0..66108ca 100644 --- a/app/services/search/grounding_check.py +++ b/app/services/search/grounding_check.py @@ -42,36 +42,267 @@ class GroundingResult: weak_flags: list[str] -_UNIT_CHARS = r'명인개%년월일조항호세건원' +_UNIT_CHARS = r'명인개%년월일조항호세건원회' # "이상/이하/초과/미만" — threshold 표현 (numeric conflict 에서 skip 대상) _THRESHOLD_SUFFIXES = re.compile(r'이상|이하|초과|미만') +# 약칭/근사치 prefix — 매칭 전 제거 (Phase 3.5 B1). +# ⚠ 최대/최소 는 의도적으로 제외 — 이들은 bound operator 라 의미가 다름 (Phase 3.5 B1 fix3). +# 약/대략/거의/얼추 만 노이즈 prefix 로 strip. +_APPROX_PREFIX_RE = re.compile(r'(약|대략|거의|얼추)\s*') + +# 단위 동의어 dict — 추출 직후 정규화 (Phase 3.5 B1) +# 의미가 동일한 단위는 같은 표기로 통일해서 set 비교/range overlap 안정화. +_UNIT_SYNONYMS: dict[str, str] = { + "인": "명", + "사람": "명", + "퍼센트": "%", + "프로": "%", + "KRW": "원", + "krw": "원", +} + +# tolerance(±1%) 허용 단위 — 양적 측정값 (Phase 3.5 B1) +_TOLERANCE_UNITS: frozenset[str] = frozenset({"명", "원", "%", "건", "개"}) + +# tolerance 미적용 단위 — 식별자성 숫자 (연도/조문/횟수) +_EXACT_ONLY_UNITS: frozenset[str] = frozenset({"년", "월", "일", "조", "항", "호", "회"}) + +# 최대/최소 prefix 패턴 — bound operator (Phase 3.5 B1 fix3). +# 매칭된 숫자는 exact pool 에서 제외하고 one-sided range 로 변환. +# 경계값 자체는 clear 대상 아님 (Codex 권장: "최대 100명" + answer "100명" → flag 유지). +_BOUND_PATTERN_RE = re.compile( + rf'(최대|최소)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)' +) +_RANGE_INF = 10**18 # one-sided range 상한 sentinel + + +def _normalize_unit(unit: str) -> str: + """단위 동의어 → 대표 표기.""" + return _UNIT_SYNONYMS.get(unit, unit) + + +def _extract_unit(literal: str) -> str | None: + """리터럴에서 숫자 뒤 단위(한 글자 또는 동의어) 추출 + 정규화.""" + # 천단위 콤마 + 옵션 소수 + 한글 단위 한 글자 또는 동의어 + m = re.match(rf'[\d,.]+\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)', literal) + if not m: + return None + return _normalize_unit(m.group(1)) + + +def _extract_numeric_corpus(text: str) -> dict: + """단위별 숫자 + 범위 + bound 통합 추출 (Phase 3.5 B1 fix1+fix3). + + Returns: + { + "exact_by_unit": {unit_or_None: set(digits)}, # 평범한 숫자 (bound 제외) + "ranges_by_unit": {unit: [(lo, hi), ...]}, # 양방향(A~B) + 단방향(최대/최소) + } + + None 키는 단위 없는 bare 숫자. + `최대 N ` → ranges[(0, N-1)] (경계값 자체는 cleared 대상 아님) + `최소 N ` → ranges[(N+1, INF)] + """ + cleaned = _APPROX_PREFIX_RE.sub('', text) + + exact_by_unit: dict[str | None, set[str]] = {None: set()} + ranges_by_unit: dict[str, list[tuple[int, int]]] = {} + + # 1) 최대/최소 — bound. exact pool 에서 제외, one-sided range 로 변환. + bound_spans: list[tuple[int, int]] = [] # 매칭 substring 위치 — 이후 단계에서 skip + for m in _BOUND_PATTERN_RE.finditer(cleaned): + bound_kind = m.group(1) + try: + n = int(m.group(2).replace(',', '').split('.')[0]) + except ValueError: + continue + unit = _normalize_unit(m.group(3)) + if bound_kind == "최대": + ranges_by_unit.setdefault(unit, []).append((0, max(0, n - 1))) + else: # 최소 + ranges_by_unit.setdefault(unit, []).append((n + 1, _RANGE_INF)) + bound_spans.append((m.start(), m.end())) + + def _in_bound_span(pos: int) -> bool: + return any(s <= pos < e for s, e in bound_spans) + + # 2) 천단위 콤마 bare number + for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned): + if _in_bound_span(m.start()): + continue + exact_by_unit[None].add(m.group().replace(',', '')) + + # 3) 단위 있는 숫자 (단위 동의어 포함) + for m in re.finditer( + rf'(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)', + cleaned, + ): + if _in_bound_span(m.start()): + continue + digits = m.group(1).replace(',', '').split('.')[0] + if not digits: + continue + unit = _normalize_unit(m.group(2)) + exact_by_unit.setdefault(unit, set()).add(digits) + + # 4) 양방향 범위 표현 (A~B / A 부터 B) + for m in re.finditer( + rf'(\d[\d,.]*)\s*(?:[~\-–]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)', + cleaned, + ): + if _in_bound_span(m.start()): + continue + try: + lo = int(m.group(1).replace(',', '').split('.')[0]) + hi = int(m.group(2).replace(',', '').split('.')[0]) + except ValueError: + continue + unit = _normalize_unit(m.group(3)) + ranges_by_unit.setdefault(unit, []).append((min(lo, hi), max(lo, hi))) + + # 5) bare 2자리+ 단독 숫자 + for m in re.finditer(r'\b(\d{2,})\b', cleaned): + if _in_bound_span(m.start()): + continue + exact_by_unit[None].add(m.group()) + + return { + "exact_by_unit": exact_by_unit, + "ranges_by_unit": ranges_by_unit, + } + + +def _within_unit_range( + n: int, unit: str | None, ranges_by_unit: dict[str, list[tuple[int, int]]] +) -> bool: + """unit-matching range 검증. + + answer unit 이 None (bare 숫자) 면 보수적으로 False — bare 답변은 range clear 대상 아님. + """ + if unit is None: + return False + return any(lo <= n <= hi for lo, hi in ranges_by_unit.get(unit, [])) + + +def _close_to_unit_pool( + n: int, unit: str | None, exact_by_unit: dict[str | None, set[str]], tol: float +) -> bool: + """unit-matching tolerance 검증. + + answer unit 이 None 이면 False — bare 답변은 tolerance 대상 아님. + 같은 unit bucket 안의 후보만 비교. + """ + if unit is None: + return False + candidates = exact_by_unit.get(unit, set()) + for c in candidates: + try: + cn = int(c) + except ValueError: + continue + if cn == 0: + continue + if abs(n - cn) / cn <= tol: + return True + return False + def _extract_number_literals(text: str) -> set[str]: - """숫자 + 단위 추출 + normalize (Phase 3.5b 개선).""" - # 1. 숫자 + 한국어 단위 접미사 - raw = set(re.findall(rf'\d[\d,.]*\s*[{_UNIT_CHARS}]\w{{0,2}}', text)) - # 2. 범위 표현 (10~20%, 100-200명 등) — 양쪽 숫자 각각 추출 + """숫자 + 단위 추출 + normalize (Phase 3.5 B1: 6단계 확장). + + 1) 약칭 prefix 제거 ("약 100명" → "100명") + 2) 천단위 콤마 bare number 우선 ("1,000" → "1000" set 등록) + 3) 한국어 단위 접미사 매칭 (기존) + 4) 범위 표현 양쪽 숫자 추출 (separator: ~, -, –, 부터) + 5) 단위 동의어 정규화 (인→명, 퍼센트→%, KRW→원) + 6) bare 2자리+ 추출 (기존) + """ + # 1. 약칭 prefix 제거 (전체 텍스트에서) + cleaned = _APPROX_PREFIX_RE.sub('', text) + + # 2. 천단위 콤마 bare number — normalize 된 값을 set 에 선등록 + normalized: set[str] = set() + for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned): + normalized.add(m.group().replace(',', '')) + + # 3. 숫자 + 한국어 단위 접미사 (동의어 포함) + raw: set[str] = set(re.findall( + rf'\d[\d,.]*\s*(?:[{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)\w{{0,2}}', + cleaned, + )) + + # 4. 범위 표현 — separator 에 "부터" 추가 for m in re.finditer( - rf'(\d[\d,.]*)\s*[~\-–]\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}])', - text, + rf'(\d[\d,.]*)\s*(?:[~\-–]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)', + cleaned, ): - raw.add(m.group(1) + m.group(3)) - raw.add(m.group(2) + m.group(3)) - # 3. normalize - normalized = set() + unit_norm = _normalize_unit(m.group(3)) + raw.add(m.group(1) + unit_norm) + raw.add(m.group(2) + unit_norm) + + # 5. normalize: 단위 동의어 통일 + 콤마 제거 for r in raw: + # 단위 부분 정규화 + m = re.match(r'([\d,.]+)\s*([^\d\s]+)', r) + if m: + digits_part = m.group(1) + unit_part = _normalize_unit(m.group(2)) + normalized.add(digits_part + unit_part) + normalized.add(digits_part.replace(',', '') + unit_part) normalized.add(r.strip()) num_only = re.match(r'[\d,.]+', r) if num_only: normalized.add(num_only.group().replace(',', '')) - # 4. 단독 숫자 (2자리 이상만 — 1자리는 오탐 과다) - for d in re.findall(r'\b(\d{2,})\b', text): + + # 6. 단독 숫자 (2자리+ 만) + for d in re.findall(r'\b(\d{2,})\b', cleaned): normalized.add(d) return normalized +def _within_evidence_range(digits: str, raw: str, evidence_text: str) -> bool: + """evidence 에 'A~B 단위' 가 있고 answer 의 숫자가 그 범위 안이면 True. + + 범위 단위는 무시 (단위 비교는 호출 전 단계). digits = 정수 문자열. + """ + try: + n = int(digits) + except ValueError: + return False + cleaned_ev = _APPROX_PREFIX_RE.sub('', evidence_text) + for m in re.finditer( + rf'(\d[\d,.]*)\s*(?:[~\-–]|부터)\s*(\d[\d,.]*)\s*[{_UNIT_CHARS}]', + cleaned_ev, + ): + try: + lo = int(m.group(1).replace(',', '').split('.')[0]) + hi = int(m.group(2).replace(',', '').split('.')[0]) + if min(lo, hi) <= n <= max(lo, hi): + return True + except ValueError: + continue + return False + + +def _close_to_any(n: int, candidates: set[str], tol: float) -> bool: + """candidates 중 하나라도 (1±tol) 배율 안에 들어오면 True. + + n 은 정수, candidates 는 digits-only 문자열 집합. + """ + for c in candidates: + try: + cn = int(c) + except ValueError: + continue + if cn == 0: + continue + if abs(n - cn) / cn <= tol: + return True + return False + + def _extract_content_tokens(text: str) -> set[str]: """한국어 2자 이상 명사 + 영어 3자 이상 단어.""" return set(re.findall(r'[가-힣]{2,}|[a-zA-Z]{3,}', text)) @@ -156,19 +387,74 @@ def check( if not answer or not evidence: return GroundingResult([], []) - evidence_text = " ".join(e.span_text for e in evidence) + # ⚠ citation marker [n] 양측 제거 (대칭성 — Phase 3.5 B1) + evidence_text = re.sub(r'\[\d+\]', '', " ".join(e.span_text for e in evidence)) - # ── Strong 1: fabricated number (equality, not substring) ── - # ⚠ citation marker [n] 제거 후 숫자 추출 (안 그러면 [1][2][3] 이 fabricated 로 오탐) + # ── Strong 1: fabricated number (unit-aware 3단계 — Phase 3.5 B1 fix1+fix3) ── + # Codex 지적 반영: + # - fix1: range/tolerance/exact 모두 단위 일치 시에만 clear + # (예: "150원" vs "100~200명" → flag 유지) + # - fix3: 최대/최소 prefix 는 bound 의미 보존 + # (예: "최대 100명" + answer "100명" → flag 유지, "최대 100명" + answer "50명" → cleared) answer_clean = re.sub(r'\[\d+\]', '', answer) - answer_nums = _extract_number_literals(answer_clean) - evidence_nums = _extract_number_literals(evidence_text) - evidence_digits = {re.sub(r'[^\d]', '', en) for en in evidence_nums} - evidence_digits.discard('') - for num in answer_nums: - digits_only = re.sub(r'[^\d]', '', num) - if digits_only and digits_only not in evidence_digits: - strong.append(f"fabricated_number:{num}") + answer_corpus = _extract_numeric_corpus(answer_clean) + evidence_corpus = _extract_numeric_corpus(evidence_text) + ev_exact_by_unit = evidence_corpus["exact_by_unit"] + ev_ranges_by_unit = evidence_corpus["ranges_by_unit"] + + # cleared 는 (unit, digits) 쌍 단위로 추적 — 단위 충돌 케이스 방어 + cleared_pairs: set[tuple[str | None, str]] = set() + + # Pass 1: 각 (unit, digits) 가 evidence 에서 정당화되는지 판정 + for unit, digits_set in answer_corpus["exact_by_unit"].items(): + for d in digits_set: + # 1) exact match — 같은 unit bucket 내에서만 + if d in ev_exact_by_unit.get(unit, set()): + cleared_pairs.add((unit, d)) + continue + # bare answer (unit=None) 는 evidence bare bucket 도 보조 매칭 + if unit is None and d in ev_exact_by_unit.get(None, set()): + cleared_pairs.add((unit, d)) + continue + try: + n = int(d) + except ValueError: + continue + # 2) range — same-unit 만 (bare answer 는 range clear 대상 아님) + if _within_unit_range(n, unit, ev_ranges_by_unit): + cleared_pairs.add((unit, d)) + continue + # 3) ±1% tolerance — 단위가 양적(_TOLERANCE_UNITS) + 4자리+ + same-unit + if ( + unit in _TOLERANCE_UNITS + and len(d) >= 4 + and _close_to_unit_pool(n, unit, ev_exact_by_unit, tol=0.01) + ): + cleared_pairs.add((unit, d)) + continue + # 식별자성 단위(_EXACT_ONLY_UNITS) 는 tolerance 패스 X. + + # Pass 2: cleared 되지 않은 (unit, digits) 를 strong flag. + # 1자리 무시는 unit 이 식별자성(_EXACT_ONLY_UNITS: 년/월/일/조/항/호/회) 이 아닐 때만 적용. + # bare(None) 답변 숫자는 같은 digit 이 다른 unit 에서 cleared 됐으면 skip — 추출 부산물 방어. + # ⚠ 단위 cross-clear (예: "원" cleared → "명" 도 skip) 은 금지: Codex unit-mismatch 케이스가 깨짐. + unit_anchored_cleared: set[str] = {d for (u, d) in cleared_pairs if u is not None} + flagged_keys: set[tuple[str | None, str]] = set() + for unit, digits_set in answer_corpus["exact_by_unit"].items(): + for d in digits_set: + if (unit, d) in cleared_pairs or (unit, d) in flagged_keys: + continue + # bare(None) 답변 숫자가 임의의 단위 bucket 에서 cleared 됐으면 duplicate 로 처리. + # 사례: "1,000명" → unit bucket "명" 에 1000 + bare bucket None 에 1000 (comma normalize 부산물). + # 이미 ("명", "1000") 가 cleared 라면 (None, "1000") 도 같은 사실을 가리키므로 skip. + if unit is None and d in unit_anchored_cleared: + continue + if len(d) < 2 and unit not in _EXACT_ONLY_UNITS: + continue + flagged_keys.add((unit, d)) + # 사람이 읽기 좋게 "{digits}{unit}" 또는 bare 형태로 표기 + label = f"{d}{unit}" if unit else d + strong.append(f"fabricated_number:{label}") # ── Strong/Weak 2: query-answer intent alignment ── query_content = _extract_content_tokens(query) diff --git a/tests/test_grounding_fabricated_number.py b/tests/test_grounding_fabricated_number.py new file mode 100644 index 0000000..ae76b6d --- /dev/null +++ b/tests/test_grounding_fabricated_number.py @@ -0,0 +1,188 @@ +"""Phase 3.5 B1 (fix1+fix3): unit-aware fabricated_number + bound semantics. + +기준: +- 단위 일치 시에만 exact/range/tolerance clear (fix1: Codex unit-mismatch regression 방지) +- 약/대략/거의/얼추 만 approx prefix strip; 최대/최소 는 bound operator 로 보존 (fix3) +- tolerance 는 양적 단위(_TOLERANCE_UNITS) + 4자리+ 만; 식별자성(_EXACT_ONLY_UNITS) 은 strict +""" + +from __future__ import annotations + +import os +import sys + +# tests/ → 프로젝트 루트 → app/ +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +import pytest + +from services.search.evidence_service import EvidenceItem +from services.search.grounding_check import check + + +def _ev(text: str, n: int = 1) -> EvidenceItem: + return EvidenceItem( + n=n, + chunk_id=None, + doc_id=100 + n, + title=f"doc{n}", + section_title=None, + span_text=text, + relevance=0.9, + rerank_score=0.85, + full_snippet=text, + source="llm", + ) + + +def _has_fabricated(result, sub: str | None = None) -> bool: + for f in result.strong_flags: + if not f.startswith("fabricated_number:"): + continue + if sub is None or sub in f: + return True + return False + + +# ─── 콤마/prefix/range/단위 동의어/citation (기존 17 케이스) ────── + + +def test_comma_thousand_match(): + r = check("질문", "총 1,000명 [1]", [_ev("총원은 1000명입니다.")]) + assert not _has_fabricated(r, "1000") + + +def test_comma_thousand_reverse(): + r = check("질문", "총 1000명 [1]", [_ev("총원은 1,000명입니다.")]) + assert not _has_fabricated(r) + + +def test_approx_prefix_in_answer(): + r = check("질문", "약 100명이 참여 [1]", [_ev("100명이 참여")]) + assert not _has_fabricated(r) + + +def test_approx_prefix_in_evidence(): + r = check("질문", "100명이 참여 [1]", [_ev("약 100명이 참여")]) + assert not _has_fabricated(r) + + +def test_range_inner_value_passes(): + r = check("질문", "약 150명 [1]", [_ev("100~200명 사이 추정")]) + assert not _has_fabricated(r, "150") + + +def test_range_outer_value_flagged(): + r = check("질문", "300명 [1]", [_ev("100~200명 사이 추정")]) + assert _has_fabricated(r, "300") + + +def test_unit_synonym_in_to_myeong(): + r = check("질문", "총 50인이 모임 [1]", [_ev("총 50명이 모임.")]) + assert not _has_fabricated(r) + + +def test_unit_synonym_percent_to_pct(): + r = check("질문", "비율 30퍼센트 [1]", [_ev("비율 30%이다.")]) + assert not _has_fabricated(r) + + +def test_citation_marker_both_sides(): + """bug fix: evidence 측 [n] 미제거로 디지트 합쳐지던 케이스.""" + r = check("질문", "가격 [1] 5,000원", [_ev("[2] 5,000원이 정확")]) + assert not _has_fabricated(r) + + +def test_genuine_fabricated_number(): + r = check("질문", "결과 777명 [1]", [_ev("500명, 300명을 받음.")]) + assert _has_fabricated(r, "777") + + +def test_amount_4digit_tolerance_passes(): + r = check("질문", "9,990원 [1]", [_ev("10,000원입니다.")]) + assert not _has_fabricated(r) + + +def test_year_no_tolerance_flagged(): + r = check("질문", "2024년 [1]", [_ev("2026년에 발효")]) + assert _has_fabricated(r, "2024") + + +def test_article_no_tolerance_flagged(): + r = check("질문", "제5조에 명시 [1]", [_ev("제6조에 따라")]) + assert _has_fabricated(r) + + +def test_count_no_tolerance_flagged(): + r = check("질문", "총 3회 위반 [1]", [_ev("총 4회 적발")]) + assert _has_fabricated(r) + + +def test_three_digit_strict(): + r = check("질문", "총 15개 [1]", [_ev("총 10개")]) + assert _has_fabricated(r, "15") + + +def test_single_digit_ignored(): + """1자리 + 양적 단위 → 무시 (오탐 방지).""" + r = check("질문", "총 3개 발생 [1]", [_ev("관련 통계 별도")]) + assert not _has_fabricated(r, "3개") + + +def test_range_korean_butter_separator(): + r = check("질문", "약 150명 [1]", [_ev("100부터 200명까지 대상.")]) + assert not _has_fabricated(r, "150") + + +# ─── fix1: unit-mismatch (Codex no-ship) ────────────────── + + +def test_won_vs_myeong_range_flagged(): + """answer '150원' vs evidence '100~200명' → 단위 불일치, flag 유지.""" + r = check("질문", "약 150원이 든다 [1]", [_ev("대상은 100~200명")]) + assert _has_fabricated(r, "150") + + +def test_won_vs_myeong_tolerance_flagged(): + """answer '9,990원' vs evidence '10,000명' → tolerance pool 단위 다름, flag 유지.""" + r = check("질문", "9,990원 [1]", [_ev("10,000명입니다.")]) + assert _has_fabricated(r, "9990") + + +def test_pct_vs_myeong_range_flagged(): + """answer '15%' vs evidence '10~20명' → 단위 불일치, flag 유지.""" + r = check("질문", "약 15% [1]", [_ev("대상 10~20명")]) + assert _has_fabricated(r, "15") + + +# ─── fix3: 최대/최소 bound semantics ─────────────────────── + + +def test_choedae_exact_boundary_flagged(): + """evidence '최대 100명' + answer '100명' → 경계값 자체는 cleared 아님.""" + r = check("질문", "100명이다 [1]", [_ev("최대 100명까지 가능")]) + assert _has_fabricated(r, "100") + + +def test_choeso_exact_boundary_flagged(): + """evidence '최소 100명' + answer '100명' → 경계값 자체는 cleared 아님.""" + r = check("질문", "100명이다 [1]", [_ev("최소 100명 이상 필요")]) + assert _has_fabricated(r, "100") + + +def test_choedae_inner_value_passes(): + """evidence '최대 100명' + answer '50명' → bound 안, cleared.""" + r = check("질문", "50명이다 [1]", [_ev("최대 100명까지 가능")]) + assert not _has_fabricated(r, "50") + + +def test_choeso_above_value_passes(): + """evidence '최소 100명' + answer '150명' → bound 안, cleared.""" + r = check("질문", "150명이다 [1]", [_ev("최소 100명 이상 필요")]) + assert not _has_fabricated(r, "150") + + +def test_choedae_outer_value_flagged(): + """evidence '최대 100명' + answer '200명' → bound 밖, flag.""" + r = check("질문", "200명이다 [1]", [_ev("최대 100명까지 가능")]) + assert _has_fabricated(r, "200")