#!/usr/bin/env python3 """Document Server 검색 평가 스크립트 (Phase 0.2) queries.yaml을 읽어 /api/search 엔드포인트에 호출하고 Recall@10, MRR@10, NDCG@10, Top3 hit-rate, Latency p50/p95를 계산한다. A/B 비교 모드: --baseline-url, --candidate-url 를 각각 지정하면 두 엔드포인트에 동일 쿼리셋을 던지고 결과를 비교한다. 발주건 단위 baseline 모드 (Phase 0 / plan: merry-yawning-owl): --queries-order + --order-groups + --output-order 로 xlsx/PDF 구조화 추출 gap 측정용 Tier 1A/1B/2 지표를 계산한다. 기존 --queries 경로와 CSV 스키마는 변경되지 않는다 (출력 소비자 보호). 사용 예: # 단일 평가 export DOCSRV_TOKEN="eyJ..." python tests/search_eval/run_eval.py \ --base-url https://docs.hyungi.net \ --output reports/baseline_2026-04-07.csv # A/B 비교 (같은 토큰) python tests/search_eval/run_eval.py \ --baseline-url https://docs.hyungi.net \ --candidate-url http://localhost:8000 \ --output reports/phase1_vs_baseline.csv # 발주건 단위 baseline python tests/search_eval/run_eval.py \ --base-url http://localhost:8000 \ --queries-order tests/search_eval/queries_order_baseline.yaml \ --order-groups tests/search_eval/order_groups.yaml \ --output-order reports/baseline_order_unit_2026-04-20.csv 토큰은 env DOCSRV_TOKEN 또는 --token 플래그로 전달. """ from __future__ import annotations import argparse import asyncio import csv import math import os import statistics import sys from dataclasses import dataclass, field from pathlib import Path from typing import Any import httpx import yaml # ───────────────────────────────────────────────────────── # 데이터 구조 # ───────────────────────────────────────────────────────── @dataclass class Query: id: str query: str category: str intent: str domain_hint: str relevant_ids: list[int] top3_ids: list[int] = field(default_factory=list) notes: str = "" # v0.2 schema additions (Phase 1 — graded relevance baseline) legacy_category: str = "" language: str = "ko" ocr_derived: bool = False graded_relevance: dict[int, int] = field(default_factory=dict) failure_expected: bool = False @dataclass class QueryResult: query: Query label: str # "baseline" or "candidate" returned_ids: list[int] latency_ms: float recall_at_10: float mrr_at_10: float ndcg_at_10: float top3_hit: bool # v0.2 graded scores (Phase 1) graded_ndcg_at_10: float = 0.0 graded_recall_at_10_t2: float = 0.0 graded_recall_at_10_t3: float = 0.0 error: str | None = None # ───────────────────────────────────────────────────────── # 평가 지표 # ───────────────────────────────────────────────────────── def recall_at_k(returned: list[int], relevant: list[int], k: int = 10) -> float: """top-k 안에 들어간 정답 비율. 정답 0개면 1.0(빈 케이스는 별도 fail metric).""" if not relevant: return 1.0 if not returned else 0.0 # 비어야 정상인 케이스: 결과 있으면 fail top_k = set(returned[:k]) hits = sum(1 for doc_id in relevant if doc_id in top_k) return hits / len(relevant) def mrr_at_k(returned: list[int], relevant: list[int], k: int = 10) -> float: """top-k 안 첫 정답의 reciprocal rank. 정답 없으면 0.""" if not relevant: return 0.0 relevant_set = set(relevant) for rank, doc_id in enumerate(returned[:k], start=1): if doc_id in relevant_set: return 1.0 / rank return 0.0 def ndcg_at_k(returned: list[int], relevant: list[int], k: int = 10) -> float: """binary relevance 기반 NDCG@k. top3_ids 같은 가중치는 v0.1에선 무시.""" if not relevant: return 0.0 relevant_set = set(relevant) dcg = 0.0 for rank, doc_id in enumerate(returned[:k], start=1): if doc_id in relevant_set: # binary gain = 1, DCG = 1 / log2(rank+1) dcg += 1.0 / math.log2(rank + 1) # ideal DCG: 정답을 1..min(len(relevant), k) 위치에 모두 채운 경우 ideal_hits = min(len(relevant), k) idcg = sum(1.0 / math.log2(r + 1) for r in range(1, ideal_hits + 1)) return dcg / idcg if idcg > 0 else 0.0 def graded_ndcg_at_k(returned: list[int], grades: dict[int, int], k: int = 10) -> float: """graded NDCG@k. grades[doc_id] in {0,1,2,3}. v0.2 산출물. gain = 2^grade - 1 (grade=0 → gain=0, grade=3 → gain=7). ideal DCG = grades 를 grade 내림차순으로 top-k 채운 경우. grades 비어 있으면 0.0 (failure_expected 케이스는 별도 처리). """ if not grades: return 0.0 dcg = 0.0 for rank, doc_id in enumerate(returned[:k], start=1): grade = grades.get(doc_id, 0) if grade > 0: dcg += (2 ** grade - 1) / math.log2(rank + 1) sorted_grades = sorted(grades.values(), reverse=True)[:k] idcg = sum( (2 ** g - 1) / math.log2(r + 1) for r, g in enumerate(sorted_grades, start=1) if g > 0 ) return dcg / idcg if idcg > 0 else 0.0 def graded_recall_at_k( returned: list[int], grades: dict[int, int], threshold: int = 2, k: int = 10, ) -> float: """grade >= threshold 만 정답으로 본 recall@k. v0.2 산출물. threshold=2 → grade 2/3 만 정답 (relevant 이상). threshold=3 → grade 3 만 정답 (highly relevant 만). """ relevant_set = {doc_id for doc_id, g in grades.items() if g >= threshold} if not relevant_set: return 1.0 if not returned else 0.0 top_k = set(returned[:k]) hits = sum(1 for doc_id in relevant_set if doc_id in top_k) return hits / len(relevant_set) def top3_hit(returned: list[int], top3_ids: list[int]) -> bool: """top3_ids가 비어있으면 True (체크 안함). 있으면 그 중 하나라도 top-3에 들어와야 함.""" if not top3_ids: return True top3 = set(returned[:3]) return any(doc_id in top3 for doc_id in top3_ids) # ───────────────────────────────────────────────────────── # API 호출 # ───────────────────────────────────────────────────────── async def call_search( client: httpx.AsyncClient, base_url: str, token: str, query: str, mode: str = "hybrid", limit: int = 20, fusion: str | None = None, rerank: str | None = None, analyze: str | None = None, embedding_backend: str | None = None, snapshot_doc_id_max: int | None = None, snapshot_chunk_id_max: int | None = None, reranker_backend: str | None = None, ) -> tuple[list[int], float]: """검색 API 호출 → (doc_ids, latency_ms).""" url = f"{base_url.rstrip('/')}/api/search/" headers = {"Authorization": f"Bearer {token}"} params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit} if fusion: params["fusion"] = fusion if rerank is not None: params["rerank"] = rerank if analyze is not None: params["analyze"] = analyze if embedding_backend is not None: params["embedding_backend"] = embedding_backend if snapshot_doc_id_max is not None: params["snapshot_doc_id_max"] = snapshot_doc_id_max if snapshot_chunk_id_max is not None: params["snapshot_chunk_id_max"] = snapshot_chunk_id_max if reranker_backend is not None: params["reranker_backend"] = reranker_backend import time start = time.perf_counter() response = await client.get(url, headers=headers, params=params, timeout=30.0) latency_ms = (time.perf_counter() - start) * 1000 response.raise_for_status() data = response.json() returned_ids = [r["id"] for r in data.get("results", [])] return returned_ids, latency_ms # ───────────────────────────────────────────────────────── # 평가 실행 # ───────────────────────────────────────────────────────── async def evaluate( queries: list[Query], base_url: str, token: str, label: str, mode: str = "hybrid", fusion: str | None = None, rerank: str | None = None, analyze: str | None = None, embedding_backend: str | None = None, snapshot_doc_id_max: int | None = None, snapshot_chunk_id_max: int | None = None, reranker_backend: str | None = None, ) -> list[QueryResult]: """전체 쿼리셋 평가.""" results: list[QueryResult] = [] async with httpx.AsyncClient() as client: for q in queries: try: returned_ids, latency_ms = await call_search( client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank, analyze=analyze, embedding_backend=embedding_backend, snapshot_doc_id_max=snapshot_doc_id_max, snapshot_chunk_id_max=snapshot_chunk_id_max, reranker_backend=reranker_backend, ) results.append( QueryResult( query=q, label=label, returned_ids=returned_ids, latency_ms=latency_ms, recall_at_10=recall_at_k(returned_ids, q.relevant_ids, 10), mrr_at_10=mrr_at_k(returned_ids, q.relevant_ids, 10), ndcg_at_10=ndcg_at_k(returned_ids, q.relevant_ids, 10), top3_hit=top3_hit(returned_ids, q.top3_ids), graded_ndcg_at_10=graded_ndcg_at_k(returned_ids, q.graded_relevance, 10), graded_recall_at_10_t2=graded_recall_at_k( returned_ids, q.graded_relevance, threshold=2, k=10 ), graded_recall_at_10_t3=graded_recall_at_k( returned_ids, q.graded_relevance, threshold=3, k=10 ), ) ) except Exception as exc: results.append( QueryResult( query=q, label=label, returned_ids=[], latency_ms=0.0, recall_at_10=0.0, mrr_at_10=0.0, ndcg_at_10=0.0, top3_hit=False, error=str(exc), ) ) return results # ───────────────────────────────────────────────────────── # 결과 집계 / 출력 # ───────────────────────────────────────────────────────── def percentile(values: list[float], p: float) -> float: if not values: return 0.0 s = sorted(values) k = (len(s) - 1) * p f = int(k) c = min(f + 1, len(s) - 1) if f == c: return s[f] return s[f] + (s[c] - s[f]) * (k - f) def print_summary( label: str, results: list[QueryResult], eval_version: str = "both", ) -> dict[str, Any]: """전체 + 카테고리별 요약 출력. 집계 dict 반환. eval_version: v0.1 — binary score 만 출력 (옛 점수 회귀 확인용) v0.2 — graded score + language/ocr_derived 별 집계 (Phase 1) both — 둘 다 출력 (default, baseline 박제용) """ n = len(results) if n == 0: return {} # 실패 케이스(relevant_ids=[])는 평균 recall/mrr/ndcg에서 제외 scored = [r for r in results if r.query.relevant_ids] failure_cases = [r for r in results if not r.query.relevant_ids] # v0.1 binary scores avg_recall = statistics.mean([r.recall_at_10 for r in scored]) if scored else 0.0 avg_mrr = statistics.mean([r.mrr_at_10 for r in scored]) if scored else 0.0 avg_ndcg = statistics.mean([r.ndcg_at_10 for r in scored]) if scored else 0.0 top3_rate = sum(1 for r in scored if r.top3_hit) / len(scored) if scored else 0.0 # v0.2 graded scores (graded_relevance 있는 케이스만 평균) graded_scored = [r for r in results if r.query.graded_relevance] avg_gndcg = ( statistics.mean([r.graded_ndcg_at_10 for r in graded_scored]) if graded_scored else 0.0 ) avg_grecall_t2 = ( statistics.mean([r.graded_recall_at_10_t2 for r in graded_scored]) if graded_scored else 0.0 ) avg_grecall_t3 = ( statistics.mean([r.graded_recall_at_10_t3 for r in graded_scored]) if graded_scored else 0.0 ) latencies = [r.latency_ms for r in results if r.latency_ms > 0] p50 = percentile(latencies, 0.50) p95 = percentile(latencies, 0.95) # 실패 케이스: 결과 0건이어야 정상 failure_correct = sum(1 for r in failure_cases if not r.returned_ids) failure_precision = ( failure_correct / len(failure_cases) if failure_cases else 0.0 ) show_v01 = eval_version in ("v0.1", "both") show_v02 = eval_version in ("v0.2", "both") print( f"\n=== {label} (n={n}, scored={len(scored)}, graded={len(graded_scored)}) ===" ) if show_v01: print(" -- v0.1 binary --") print(f" Recall@10 : {avg_recall:.3f}") print(f" MRR@10 : {avg_mrr:.3f}") print(f" NDCG@10 : {avg_ndcg:.3f}") print(f" Top-3 hit : {top3_rate:.3f}") if show_v02: print(" -- v0.2 graded --") print(f" NDCG@10 (graded) : {avg_gndcg:.3f}") print(f" Recall@10 (grade>=2) : {avg_grecall_t2:.3f}") print(f" Recall@10 (grade>=3) : {avg_grecall_t3:.3f}") print(f" Latency p50: {p50:.0f} ms") print(f" Latency p95: {p95:.0f} ms") if failure_cases: print( f" Failure-case precision: {failure_correct}/{len(failure_cases)}" f" ({failure_precision:.2f}) — empty result expected" ) # 카테고리별 by_cat: dict[str, list[QueryResult]] = {} for r in scored: by_cat.setdefault(r.query.category, []).append(r) by_cat_map: dict[str, dict[str, Any]] = {} print(" by category:") for cat, items in sorted(by_cat.items()): cat_recall = statistics.mean([r.recall_at_10 for r in items]) cat_ndcg = statistics.mean([r.ndcg_at_10 for r in items]) graded_items = [r for r in items if r.query.graded_relevance] cat_gndcg = ( statistics.mean([r.graded_ndcg_at_10 for r in graded_items]) if graded_items else 0.0 ) by_cat_map[cat] = { "n": len(items), "recall_at_10": cat_recall, "ndcg_at_10": cat_ndcg, "graded_ndcg_at_10": cat_gndcg, } if show_v02: print( f" {cat:<22} n={len(items):>2} recall={cat_recall:.2f} ndcg={cat_ndcg:.2f} gndcg={cat_gndcg:.2f}" ) else: print( f" {cat:<22} n={len(items):>2} recall={cat_recall:.2f} ndcg={cat_ndcg:.2f}" ) # v0.2: language 별 by_lang_map: dict[str, dict[str, Any]] = {} if show_v02: by_lang: dict[str, list[QueryResult]] = {} for r in scored: by_lang.setdefault(r.query.language, []).append(r) if by_lang: print(" by language:") for lang, items in sorted(by_lang.items()): lang_recall = statistics.mean([r.recall_at_10 for r in items]) graded_items = [r for r in items if r.query.graded_relevance] lang_gndcg = ( statistics.mean([r.graded_ndcg_at_10 for r in graded_items]) if graded_items else 0.0 ) by_lang_map[lang] = { "n": len(items), "recall_at_10": lang_recall, "graded_ndcg_at_10": lang_gndcg, } print( f" {lang:<10} n={len(items):>2} recall={lang_recall:.2f} gndcg={lang_gndcg:.2f}" ) # v0.2: ocr_derived 별 by_ocr_map: dict[str, dict[str, Any]] = {} if show_v02: by_ocr: dict[bool, list[QueryResult]] = {} for r in scored: by_ocr.setdefault(r.query.ocr_derived, []).append(r) # OCR-derived 케이스가 1개 이상일 때만 표시 if any(flag for flag in by_ocr.keys()): print(" by ocr_derived:") for flag, items in sorted(by_ocr.items()): ocr_recall = statistics.mean([r.recall_at_10 for r in items]) graded_items = [r for r in items if r.query.graded_relevance] ocr_gndcg = ( statistics.mean([r.graded_ndcg_at_10 for r in graded_items]) if graded_items else 0.0 ) by_ocr_map[str(flag).lower()] = { "n": len(items), "recall_at_10": ocr_recall, "graded_ndcg_at_10": ocr_gndcg, } print( f" {str(flag).lower():<10} n={len(items):>2} recall={ocr_recall:.2f} gndcg={ocr_gndcg:.2f}" ) # 에러 케이스 errors = [r for r in results if r.error] if errors: print(f" ERRORS ({len(errors)}):") for r in errors: print(f" [{r.query.id}] {r.error}") return { "n": n, "n_scored": len(scored), "n_graded": len(graded_scored), "recall_at_10": avg_recall, "mrr_at_10": avg_mrr, "ndcg_at_10": avg_ndcg, "top3_hit_rate": top3_rate, "graded_ndcg_at_10": avg_gndcg, "graded_recall_at_10_t2": avg_grecall_t2, "graded_recall_at_10_t3": avg_grecall_t3, "latency_p50": p50, "latency_p95": p95, "failure_precision": failure_precision, "by_category": by_cat_map, "by_language": by_lang_map, "by_ocr_derived": by_ocr_map, } def write_csv(results: list[QueryResult], output_path: Path) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) with output_path.open("w", newline="", encoding="utf-8") as f: writer = csv.writer(f) writer.writerow( [ "label", "id", "category", "legacy_category", "intent", "domain_hint", "language", "ocr_derived", "failure_expected", "query", "relevant_ids", "graded_relevance", "returned_ids_top10", "latency_ms", "recall_at_10", "mrr_at_10", "ndcg_at_10", "top3_hit", "graded_ndcg_at_10", "graded_recall_at_10_t2", "graded_recall_at_10_t3", "error", ] ) for r in results: graded_str = ";".join( f"{did}:{g}" for did, g in sorted(r.query.graded_relevance.items()) ) writer.writerow( [ r.label, r.query.id, r.query.category, r.query.legacy_category, r.query.intent, r.query.domain_hint, r.query.language, "1" if r.query.ocr_derived else "0", "1" if r.query.failure_expected else "0", r.query.query, ";".join(map(str, r.query.relevant_ids)), graded_str, ";".join(map(str, r.returned_ids[:10])), f"{r.latency_ms:.1f}", f"{r.recall_at_10:.3f}", f"{r.mrr_at_10:.3f}", f"{r.ndcg_at_10:.3f}", "1" if r.top3_hit else "0", f"{r.graded_ndcg_at_10:.3f}", f"{r.graded_recall_at_10_t2:.3f}", f"{r.graded_recall_at_10_t3:.3f}", r.error or "", ] ) print(f"\nCSV written: {output_path}") # ───────────────────────────────────────────────────────── # 로딩 # ───────────────────────────────────────────────────────── def load_queries(yaml_path: Path) -> list[Query]: with yaml_path.open(encoding="utf-8") as f: data = yaml.safe_load(f) queries: list[Query] = [] for q in data["queries"]: relevant_ids = q.get("relevant_ids", []) or [] graded_raw = q.get("graded_relevance", {}) or {} graded = {int(k): int(v) for k, v in graded_raw.items()} # v0.1 fallback: if no graded_relevance but has relevant_ids, # treat top3_ids as grade 3 and remaining relevant_ids as grade 2. if not graded and relevant_ids: top3 = set(q.get("top3_ids", []) or []) for rid in relevant_ids: graded[int(rid)] = 3 if int(rid) in top3 else 2 queries.append( Query( id=q["id"], query=q["query"], category=q["category"], intent=q["intent"], domain_hint=q["domain_hint"], relevant_ids=relevant_ids, top3_ids=q.get("top3_ids", []) or [], notes=q.get("notes", "") or "", # v0.2 columns (graceful default for v0.1 yaml) legacy_category=q.get("legacy_category", q.get("category", "")) or "", language=q.get("language", "ko") or "ko", ocr_derived=bool(q.get("ocr_derived", False)), graded_relevance=graded, failure_expected=bool( q.get("failure_expected", not relevant_ids) ), ) ) return queries # ═════════════════════════════════════════════════════════════════ # 발주건 단위 baseline (Phase 0 / plan: merry-yawning-owl) # ───────────────────────────────────────────────────────────────── # 아래 섹션은 "구조화 추출 gap 측정" 전용 코드 경로. 기존 legacy # 쿼리 평가는 위 섹션 그대로 — 스키마/값 불변. # ═════════════════════════════════════════════════════════════════ # ───────────────────────────────────────────────────────── # 발주건 데이터 모델 # ───────────────────────────────────────────────────────── @dataclass class OrderGroupDoc: doc_id: int role: str # order_xlsx | order_pdf | calc_pdf @dataclass class OrderGroup: order_group_id: str description: str docs: list[OrderGroupDoc] def role_of(self, doc_id: int) -> str | None: for d in self.docs: if d.doc_id == doc_id: return d.role return None def roles_set(self) -> set[str]: return {d.role for d in self.docs} @dataclass class ExpectedLocation: doc_id: int role: str location_type: str # sheet_range | page | document_only location_value: str | None is_primary: bool @dataclass class OrderQuery: id: str query: str category: str # A | B | C | D order_group_id: str intent: str expected_locations: list[ExpectedLocation] notes: str = "" @dataclass class OrderQueryResult: query: OrderQuery returned_results: list[dict] latency_ms: float doc_match_top5: bool cross_format_eligible: bool cross_format_link_success_top10: bool cross_format_link_success_top5: bool range_citation_available: bool page_citation_available: bool matched_location_value: str | None manual_refind_flag: bool chunk_idx_stddev_top10: float | None error: str | None = None # ───────────────────────────────────────────────────────── # 발주건 지표 (Tier 1A / 1B / 2) # ───────────────────────────────────────────────────────── def _collect_expected_doc_ids(locs: list[ExpectedLocation]) -> set[int]: return {loc.doc_id for loc in locs} def doc_match_at_k(returned_ids: list[int], expected_doc_ids: set[int], k: int = 5) -> bool: """Top-k에 expected doc_id 중 하나라도 있는가 (Tier 1A guardrail).""" if not expected_doc_ids: return False return any(doc_id in expected_doc_ids for doc_id in returned_ids[:k]) def cross_format_link_success( returned_ids: list[int], expected_locations: list[ExpectedLocation], group: OrderGroup, k: int, ) -> tuple[bool, bool]: """Tier 1A 공식 지표. (success, eligible) 반환. Eligible: order_group이 서로 다른 role을 2개 이상 보유 (즉 cross-format 연결을 측정할 의미가 있는 그룹). Success (3조건 동시): ① is_primary=true expected role의 doc이 top-k에 1개 이상 존재 ② 다른 role의 doc이 top-k에 1개 이상 존재 ③ 두 doc 모두 동일 order_group 소속 "role 다른 doc_id 2개" 단순 존재 검사는 false positive 있어 사용 금지. """ if len(group.roles_set()) < 2: return False, False # ineligible primary_roles = {loc.role for loc in expected_locations if loc.is_primary} if not primary_roles: # primary 라벨이 없으면 eligible이긴 해도 success 판정 불가 return False, True top_k_set = set(returned_ids[:k]) group_doc_ids_by_role: dict[str, list[int]] = {} for d in group.docs: group_doc_ids_by_role.setdefault(d.role, []).append(d.doc_id) has_primary = any( doc_id in top_k_set for role in primary_roles for doc_id in group_doc_ids_by_role.get(role, []) ) other_roles = group.roles_set() - primary_roles has_other = any( doc_id in top_k_set for role in other_roles for doc_id in group_doc_ids_by_role.get(role, []) ) return (has_primary and has_other), True def range_citation_available(returned_results: list[dict]) -> bool: """Tier 2: 응답에 sheet_name 또는 cell_range 필드가 존재하고 비어있지 않은가. 현재 API(`app/api/search.py`)에는 해당 필드 없음 → baseline = False (0%). """ for r in returned_results: if r.get("sheet_name") or r.get("cell_range"): return True return False def page_citation_available(returned_results: list[dict]) -> bool: """Tier 2: 응답에 page 필드가 존재하고 비어있지 않은가. 현재 chunk.page는 항상 null → baseline = False (0%). """ for r in returned_results: page = r.get("page") if page is not None and page != "": return True return False def _tokenize_query(q: str) -> list[str]: """간단한 토큰화: 공백 split 후 2자 이상만.""" return [t for t in q.lower().split() if len(t) >= 2] def manual_refind_flag_v0( returned_results: list[dict], query_text: str, score_threshold: float = 0.5, ) -> bool: """Tier 1B v0 heuristic. top_1 score < threshold AND snippet 핵심 토큰 미포함. 주의: v0. 점수 임계값 0.5는 **임시값** — 검색 score calibration 바뀌면 baseline 간 비교가 흔들릴 수 있다. 절대값처럼 취급 금지. 보고서에 "heuristic vs 실감각 수동 교차검증" 결과 병기 필수. """ if not returned_results: return True top_1 = returned_results[0] score = top_1.get("score", 0.0) if score is None: score = 0.0 if score >= score_threshold: return False snippet = (top_1.get("snippet") or "").lower() title = (top_1.get("title") or "").lower() haystack = f"{title} {snippet}" tokens = _tokenize_query(query_text) if not tokens: return False has_any_token = any(t in haystack for t in tokens) return not has_any_token def _chunk_idx_stddev_top10(returned_results: list[dict]) -> float | None: """Top-10의 chunk_index 분산 (낮을수록 한 섹션에 몰림). Observational only.""" idxs = [r.get("chunk_index") for r in returned_results[:10]] vals = [i for i in idxs if isinstance(i, int)] if len(vals) < 2: return None return statistics.stdev(vals) def _matched_location_value( returned_results: list[dict], expected_locations: list[ExpectedLocation], ) -> str | None: """Tier 2 matched_location: 현재 API는 location 필드를 노출하지 않으므로 baseline에선 항상 None. Phase 1A/1B 구현 이후 값이 채워진다. """ # 현재 API 응답에 location 정보 없음 → 항상 None # Phase 1A/1B 구현 후 r.get("cell_range") / r.get("page") 체크로 확장 return None # ───────────────────────────────────────────────────────── # 발주건 API 호출 (full result dict 반환) # ───────────────────────────────────────────────────────── async def call_search_full( client: httpx.AsyncClient, base_url: str, token: str, query: str, mode: str = "hybrid", limit: int = 20, fusion: str | None = None, rerank: str | None = None, analyze: str | None = None, debug: bool = False, embedding_backend: str | None = None, snapshot_doc_id_max: int | None = None, snapshot_chunk_id_max: int | None = None, reranker_backend: str | None = None, ) -> tuple[list[dict], float]: """call_search와 동일 로직. 단 full result dict 리스트 반환.""" url = f"{base_url.rstrip('/')}/api/search/" headers = {"Authorization": f"Bearer {token}"} params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit} if fusion: params["fusion"] = fusion if rerank is not None: params["rerank"] = rerank if analyze is not None: params["analyze"] = analyze if debug: params["debug"] = "true" if embedding_backend is not None: params["embedding_backend"] = embedding_backend if snapshot_doc_id_max is not None: params["snapshot_doc_id_max"] = snapshot_doc_id_max if snapshot_chunk_id_max is not None: params["snapshot_chunk_id_max"] = snapshot_chunk_id_max if reranker_backend is not None: params["reranker_backend"] = reranker_backend import time start = time.perf_counter() response = await client.get(url, headers=headers, params=params, timeout=30.0) latency_ms = (time.perf_counter() - start) * 1000 response.raise_for_status() data = response.json() return data.get("results", []), latency_ms # ───────────────────────────────────────────────────────── # 발주건 평가 실행 # ───────────────────────────────────────────────────────── async def evaluate_orders( queries: list[OrderQuery], groups: dict[str, OrderGroup], base_url: str, token: str, mode: str = "hybrid", fusion: str | None = None, rerank: str | None = None, analyze: str | None = None, debug: bool = False, ) -> list[OrderQueryResult]: """발주건 쿼리셋 평가.""" results: list[OrderQueryResult] = [] async with httpx.AsyncClient() as client: for q in queries: group = groups.get(q.order_group_id) if group is None: results.append( OrderQueryResult( query=q, returned_results=[], latency_ms=0.0, doc_match_top5=False, cross_format_eligible=False, cross_format_link_success_top10=False, cross_format_link_success_top5=False, range_citation_available=False, page_citation_available=False, matched_location_value=None, manual_refind_flag=True, chunk_idx_stddev_top10=None, error=f"unknown order_group_id={q.order_group_id}", ) ) continue try: returned, latency_ms = await call_search_full( client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank, analyze=analyze, debug=debug, ) returned_ids = [r["id"] for r in returned] expected_ids = _collect_expected_doc_ids(q.expected_locations) cf10, eligible10 = cross_format_link_success(returned_ids, q.expected_locations, group, 10) cf5, _eligible5 = cross_format_link_success(returned_ids, q.expected_locations, group, 5) results.append( OrderQueryResult( query=q, returned_results=returned, latency_ms=latency_ms, doc_match_top5=doc_match_at_k(returned_ids, expected_ids, 5), cross_format_eligible=eligible10, cross_format_link_success_top10=cf10 if eligible10 else False, cross_format_link_success_top5=cf5 if eligible10 else False, range_citation_available=range_citation_available(returned), page_citation_available=page_citation_available(returned), matched_location_value=_matched_location_value(returned, q.expected_locations), manual_refind_flag=manual_refind_flag_v0(returned, q.query), chunk_idx_stddev_top10=_chunk_idx_stddev_top10(returned), ) ) except Exception as exc: results.append( OrderQueryResult( query=q, returned_results=[], latency_ms=0.0, doc_match_top5=False, cross_format_eligible=False, cross_format_link_success_top10=False, cross_format_link_success_top5=False, range_citation_available=False, page_citation_available=False, matched_location_value=None, manual_refind_flag=True, chunk_idx_stddev_top10=None, error=str(exc), ) ) return results # ───────────────────────────────────────────────────────── # 발주건 결과 집계 / 출력 # ───────────────────────────────────────────────────────── def print_order_summary(results: list[OrderQueryResult]) -> dict[str, Any]: """Tier 1A/1B/2 지표 요약. 절대 건수 병기. 집계 dict 반환.""" n = len(results) if n == 0: return {} # Tier 1A doc_match_count = sum(1 for r in results if r.doc_match_top5) eligible_results = [r for r in results if r.cross_format_eligible] cf10_success = sum(1 for r in eligible_results if r.cross_format_link_success_top10) cf5_success = sum(1 for r in eligible_results if r.cross_format_link_success_top5) # Tier 1B refind_flag_count = sum(1 for r in results if r.manual_refind_flag) stddev_values = [r.chunk_idx_stddev_top10 for r in results if r.chunk_idx_stddev_top10 is not None] avg_stddev = statistics.mean(stddev_values) if stddev_values else None # Tier 2 range_avail_count = sum(1 for r in results if r.range_citation_available) page_avail_count = sum(1 for r in results if r.page_citation_available) # Latency latencies = [r.latency_ms for r in results if r.latency_ms > 0] p50 = percentile(latencies, 0.50) p95 = percentile(latencies, 0.95) print(f"\n=== Order-unit baseline (n={n}) ===") print(" Tier 1A (gate 후보 / guardrail):") print( f" top_5_document_match_rate : {doc_match_count}/{n}" f" ({doc_match_count / n:.1%}) — Guardrail, 비악화 강제" ) if eligible_results: print( f" cross_format_link top-10 : {cf10_success}/{len(eligible_results)}" f" ({cf10_success / len(eligible_results):.1%}) [공식 gate 후보]" ) print( f" cross_format_link top-5 : {cf5_success}/{len(eligible_results)}" f" ({cf5_success / len(eligible_results):.1%}) [보조 관찰]" ) else: print(" cross_format_link : no eligible queries (group roles<2)") print(" Tier 1B (관찰용):") print( f" manual_refind_flag (v0) : {refind_flag_count}/{n}" f" ({refind_flag_count / n:.1%}) — heuristic, 수동 교차검증 필수" ) if avg_stddev is not None: print(f" chunk_idx_stddev_top10 (mean) : {avg_stddev:.2f}") print(" Tier 2 (auto-eval 기준, 현재 시스템 baseline = 0):") print( f" range_citation_available : {range_avail_count}/{n}" f" ({range_avail_count / n:.1%})" ) print( f" page_citation_available : {page_avail_count}/{n}" f" ({page_avail_count / n:.1%})" ) print(f" Latency p50 / p95 : {p50:.0f} / {p95:.0f} ms") # 카테고리별 rollup by_cat: dict[str, list[OrderQueryResult]] = {} for r in results: by_cat.setdefault(r.query.category, []).append(r) print(" by category (A/B/C/D):") for cat in sorted(by_cat.keys()): items = by_cat[cat] cat_doc = sum(1 for r in items if r.doc_match_top5) cat_cf_eligible = [r for r in items if r.cross_format_eligible] cat_cf10 = sum(1 for r in cat_cf_eligible if r.cross_format_link_success_top10) cf_str = ( f"cf10 {cat_cf10}/{len(cat_cf_eligible)}" if cat_cf_eligible else "cf10 n/a" ) print(f" {cat} n={len(items):>2} doc_match {cat_doc}/{len(items)} {cf_str}") # 발주건별 rollup by_group: dict[str, list[OrderQueryResult]] = {} for r in results: by_group.setdefault(r.query.order_group_id, []).append(r) print(" by order_group:") for gid in sorted(by_group.keys()): items = by_group[gid] g_doc = sum(1 for r in items if r.doc_match_top5) print(f" {gid} n={len(items):>2} doc_match {g_doc}/{len(items)}") # 에러 errors = [r for r in results if r.error] if errors: print(f" ERRORS ({len(errors)}):") for r in errors: print(f" [{r.query.id}] {r.error}") return { "n": n, "doc_match_top5": (doc_match_count, n), "cross_format_link_top10": (cf10_success, len(eligible_results)), "cross_format_link_top5": (cf5_success, len(eligible_results)), "manual_refind_flag": (refind_flag_count, n), "range_citation_available": (range_avail_count, n), "page_citation_available": (page_avail_count, n), "latency_p50": p50, "latency_p95": p95, } def write_order_csv(results: list[OrderQueryResult], output_path: Path) -> None: """발주건 baseline 전용 CSV. 기존 write_csv와 분리 — 스키마 간섭 없음.""" output_path.parent.mkdir(parents=True, exist_ok=True) columns = [ "id", "query", "category_abcd", "order_group_id", "intent", "expected_doc_ids", "expected_roles", "expected_location_type", "expected_location_value", "returned_ids_top10", "latency_ms", "doc_match_top5", "cross_format_eligible", "cross_format_link_success_top10", "cross_format_link_success_top5", "range_citation_available", "page_citation_available", "matched_location_value", "manual_refind_flag", "chunk_idx_stddev_top10", "notes", "error", ] with output_path.open("w", newline="", encoding="utf-8") as f: writer = csv.writer(f) writer.writerow(columns) for r in results: returned_ids = [item["id"] for item in r.returned_results[:10]] # primary 우선으로 location_type/value 선택 primary_locs = [loc for loc in r.query.expected_locations if loc.is_primary] repr_loc = primary_locs[0] if primary_locs else ( r.query.expected_locations[0] if r.query.expected_locations else None ) writer.writerow( [ r.query.id, r.query.query, r.query.category, r.query.order_group_id, r.query.intent, ";".join(str(loc.doc_id) for loc in r.query.expected_locations), ";".join(loc.role for loc in r.query.expected_locations), repr_loc.location_type if repr_loc else "", repr_loc.location_value if repr_loc and repr_loc.location_value else "", ";".join(map(str, returned_ids)), f"{r.latency_ms:.1f}", "1" if r.doc_match_top5 else "0", "1" if r.cross_format_eligible else "0", "1" if r.cross_format_link_success_top10 else "0", "1" if r.cross_format_link_success_top5 else "0", "1" if r.range_citation_available else "0", "1" if r.page_citation_available else "0", r.matched_location_value or "", "1" if r.manual_refind_flag else "0", f"{r.chunk_idx_stddev_top10:.2f}" if r.chunk_idx_stddev_top10 is not None else "", r.query.notes, r.error or "", ] ) print(f"\nOrder baseline CSV written: {output_path}") # ───────────────────────────────────────────────────────── # 발주건 YAML 로딩 # ───────────────────────────────────────────────────────── def load_order_groups(yaml_path: Path) -> dict[str, OrderGroup]: with yaml_path.open(encoding="utf-8") as f: data = yaml.safe_load(f) groups: dict[str, OrderGroup] = {} for g in data.get("groups", []): docs = [ OrderGroupDoc(doc_id=int(d["doc_id"]), role=d["role"]) for d in g.get("docs", []) ] groups[g["order_group_id"]] = OrderGroup( order_group_id=g["order_group_id"], description=g.get("description", "") or "", docs=docs, ) return groups def load_order_queries(yaml_path: Path) -> list[OrderQuery]: with yaml_path.open(encoding="utf-8") as f: data = yaml.safe_load(f) queries: list[OrderQuery] = [] for q in data.get("questions", []): locs = [] for loc in q.get("expected_locations", []) or []: locs.append( ExpectedLocation( doc_id=int(loc["doc_id"]), role=loc["role"], location_type=loc["location_type"], location_value=loc.get("location_value"), is_primary=bool(loc.get("is_primary", False)), ) ) queries.append( OrderQuery( id=q["id"], query=q["query"], category=q["category"], order_group_id=q["order_group_id"], intent=q.get("intent", "") or "", expected_locations=locs, notes=q.get("notes", "") or "", ) ) return queries # ───────────────────────────────────────────────────────── # CLI # ───────────────────────────────────────────────────────── def main() -> int: parser = argparse.ArgumentParser(description="Document Server 검색 평가") parser.add_argument( "--queries", type=Path, default=Path(__file__).parent / "queries.yaml", help="평가셋 YAML 경로", ) parser.add_argument( "--base-url", type=str, default=None, help="단일 평가용 URL (예: https://docs.hyungi.net)", ) parser.add_argument( "--baseline-url", type=str, default=None, help="A/B 비교용 baseline URL", ) parser.add_argument( "--candidate-url", type=str, default=None, help="A/B 비교용 candidate URL", ) parser.add_argument( "--mode", type=str, default="hybrid", choices=["fts", "trgm", "vector", "hybrid"], help="검색 mode 파라미터", ) parser.add_argument( "--fusion", type=str, default=None, choices=["legacy", "rrf", "rrf_boost"], help="hybrid 모드 fusion 전략 (Phase 0.5+, 미지정 시 서버 기본값)", ) parser.add_argument( "--rerank", type=str, default=None, choices=["true", "false"], help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)", ) parser.add_argument( "--analyze", type=str, default=None, choices=["true", "false"], help="QueryAnalyzer 활성화 (Phase 2.1+, cache hit 시 multilingual 적용)", ) parser.add_argument( "--token", type=str, default=os.environ.get("DOCSRV_TOKEN"), help="Bearer 토큰 (env DOCSRV_TOKEN)", ) parser.add_argument( "--output", type=Path, default=None, help="CSV 출력 경로 (지정하면 raw 결과 저장)", ) # 발주건 단위 baseline (Phase 0 / plan: merry-yawning-owl) parser.add_argument( "--queries-order", type=Path, default=None, help="발주건 쿼리 YAML (queries_order_baseline.yaml)", ) parser.add_argument( "--order-groups", type=Path, default=None, help="발주건 그룹 매핑 YAML (order_groups.yaml)", ) parser.add_argument( "--output-order", type=Path, default=None, help="발주건 baseline 전용 CSV 출력 경로 (legacy --output과 분리)", ) parser.add_argument( "--debug", action="store_true", help="검색 API debug=true 요청 (발주건 모드에서 응답 검증용)", ) parser.add_argument( "--eval-version", type=str, default="both", choices=["v0.1", "v0.2", "both"], help="점수 출력 모드 (Phase 1, default both). v0.1=binary only / v0.2=graded only / both=둘 다", ) parser.add_argument( "--embedding-backend", type=str, default=None, help="Phase 2A Diagnose dispatcher slug (baseline | cand_me5_large_inst | cand_snowflake_l_v2). 미지정 = production.", ) parser.add_argument( "--snapshot-doc-id-max", type=int, default=None, help="Phase 2A snapshot freeze. documents.id <= 값 filter. baseline rebaseline 도 동일 적용.", ) parser.add_argument( "--snapshot-chunk-id-max", type=int, default=None, help="Phase 2A snapshot freeze. document_chunks.id <= 값 filter. baseline rebaseline 도 동일 적용.", ) parser.add_argument( "--reranker-backend", type=str, default=None, help="Phase 2B Diagnose reranker dispatcher slug (baseline | cand_gte_ml_base). 미지정 = production.", ) args = parser.parse_args() if not args.token: print("ERROR: --token 또는 env DOCSRV_TOKEN 필요", file=sys.stderr) return 2 if not args.base_url and not (args.baseline_url and args.candidate_url): print( "ERROR: --base-url 또는 (--baseline-url + --candidate-url) 둘 중 하나 필요", file=sys.stderr, ) return 2 # 발주건 단위 baseline 모드 (Phase 0 / plan: merry-yawning-owl) run_order_mode = args.queries_order is not None # Legacy 경로 실행 조건: order-only 실행이 아닐 때 (= --queries-order + --output-order만 단독으로 # 준 경우는 skip). --output / --baseline-url / --candidate-url 중 하나라도 있으면 legacy도 실행. run_legacy_mode = ( not run_order_mode or args.output is not None or args.baseline_url is not None or args.candidate_url is not None ) if run_order_mode: if args.order_groups is None: print("ERROR: --queries-order 사용 시 --order-groups 필수", file=sys.stderr) return 2 if not args.base_url: print("ERROR: --queries-order 모드는 --base-url만 지원 (A/B 미지원)", file=sys.stderr) return 2 if not run_legacy_mode and not run_order_mode: print("ERROR: 실행할 평가 경로가 없음", file=sys.stderr) return 2 queries = load_queries(args.queries) if run_legacy_mode else [] if run_legacy_mode: print(f"Loaded {len(queries)} queries from {args.queries}") print(f"Mode: {args.mode}", end="") if args.fusion: print(f" / fusion: {args.fusion}", end="") if args.rerank: print(f" / rerank: {args.rerank}", end="") print() all_results: list[QueryResult] = [] if run_legacy_mode: if args.base_url: print(f"\n>>> evaluating: {args.base_url}") results = asyncio.run( evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend) ) print_summary("single", results, eval_version=args.eval_version) all_results.extend(results) else: print(f"\n>>> baseline: {args.baseline_url}") baseline_results = asyncio.run( evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend) ) baseline_summary = print_summary("baseline", baseline_results, eval_version=args.eval_version) print(f"\n>>> candidate: {args.candidate_url}") candidate_results = asyncio.run( evaluate( queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend ) ) candidate_summary = print_summary("candidate", candidate_results, eval_version=args.eval_version) # 델타 print("\n=== Δ (candidate - baseline) ===") for k in ( "recall_at_10", "mrr_at_10", "ndcg_at_10", "top3_hit_rate", "latency_p50", "latency_p95", ): delta = candidate_summary[k] - baseline_summary[k] sign = "+" if delta >= 0 else "" print(f" {k:<16}: {sign}{delta:.3f}") all_results.extend(baseline_results) all_results.extend(candidate_results) if args.output: write_csv(all_results, args.output) # 발주건 단위 baseline (Phase 0) if run_order_mode: order_queries = load_order_queries(args.queries_order) order_groups = load_order_groups(args.order_groups) print( f"\nLoaded {len(order_queries)} order queries from {args.queries_order}" f" / {len(order_groups)} groups from {args.order_groups}" ) order_results = asyncio.run( evaluate_orders( order_queries, order_groups, args.base_url, args.token, mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, debug=args.debug, ) ) print_order_summary(order_results) if args.output_order: write_order_csv(order_results, args.output_order) elif not args.output: print( "\nNOTE: --output-order 미지정 — CSV 저장 skip. 결과는 stdout 요약만.", file=sys.stderr, ) return 0 if __name__ == "__main__": sys.exit(main())