624b9d523d
- RERANKER_BACKEND_MAP 에서 cand_gte_ml_base 슬러그 제거 (컨테이너·DB 테이블 마이그360·override 이미 종료) - docker-compose.override.cand.yml / override.rerank-cand.yml 삭제 - search.py allowlist · run_eval.py help 정합 - dispatcher scaffold(_resolve_reranker)는 보존 (후보 재진입 대비) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1538 lines
59 KiB
Python
1538 lines
59 KiB
Python
#!/usr/bin/env python3
|
|
"""Document Server 검색 평가 스크립트 (Phase 0.2)
|
|
|
|
queries.yaml을 읽어 /api/search 엔드포인트에 호출하고
|
|
Recall@10, MRR@10, NDCG@10, Top3 hit-rate, Latency p50/p95를 계산한다.
|
|
|
|
A/B 비교 모드: --baseline-url, --candidate-url 를 각각 지정하면
|
|
두 엔드포인트에 동일 쿼리셋을 던지고 결과를 비교한다.
|
|
|
|
발주건 단위 baseline 모드 (Phase 0 / plan: merry-yawning-owl):
|
|
--queries-order + --order-groups + --output-order 로 xlsx/PDF 구조화 추출
|
|
gap 측정용 Tier 1A/1B/2 지표를 계산한다. 기존 --queries 경로와 CSV
|
|
스키마는 변경되지 않는다 (출력 소비자 보호).
|
|
|
|
사용 예:
|
|
|
|
# 단일 평가
|
|
export DOCSRV_TOKEN="eyJ..."
|
|
python tests/search_eval/run_eval.py \
|
|
--base-url https://docs.hyungi.net \
|
|
--output reports/baseline_2026-04-07.csv
|
|
|
|
# A/B 비교 (같은 토큰)
|
|
python tests/search_eval/run_eval.py \
|
|
--baseline-url https://docs.hyungi.net \
|
|
--candidate-url http://localhost:8000 \
|
|
--output reports/phase1_vs_baseline.csv
|
|
|
|
# 발주건 단위 baseline
|
|
python tests/search_eval/run_eval.py \
|
|
--base-url http://localhost:8000 \
|
|
--queries-order tests/search_eval/queries_order_baseline.yaml \
|
|
--order-groups tests/search_eval/order_groups.yaml \
|
|
--output-order reports/baseline_order_unit_2026-04-20.csv
|
|
|
|
토큰은 env DOCSRV_TOKEN 또는 --token 플래그로 전달.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import csv
|
|
import math
|
|
import os
|
|
import statistics
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
import yaml
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 데이터 구조
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
@dataclass
class Query:
    """One labelled evaluation query, as built by ``load_queries`` from the queries YAML."""

    id: str
    query: str
    category: str
    intent: str
    domain_hint: str
    # Document ids treated as correct by the binary metrics (recall / MRR / NDCG).
    relevant_ids: list[int]
    # Ids checked by ``top3_hit``; an empty list means the top-3 check is skipped.
    top3_ids: list[int] = field(default_factory=list)
    notes: str = ""
    # v0.2 schema additions (Phase 1 — graded relevance baseline)
    legacy_category: str = ""
    language: str = "ko"
    ocr_derived: bool = False
    # Maps document id -> relevance grade, consumed by the graded metrics.
    graded_relevance: dict[int, int] = field(default_factory=dict)
    failure_expected: bool = False
|
|
|
|
|
|
@dataclass
class QueryResult:
    """Per-query outcome of one evaluation run: returned ids, latency and metric values."""

    query: Query
    label: str  # "baseline" or "candidate"
    returned_ids: list[int]
    latency_ms: float
    recall_at_10: float
    mrr_at_10: float
    ndcg_at_10: float
    top3_hit: bool
    # v0.2 graded scores (Phase 1)
    graded_ndcg_at_10: float = 0.0
    graded_recall_at_10_t2: float = 0.0
    graded_recall_at_10_t3: float = 0.0
    # PR-Eval-GradedNDCG-Dedup: number of duplicate documents found in returned[:k];
    # recorded as an audit trail for detecting score inflation.
    dedup_count: int = 0
    # Stringified exception when the query failed; None on success.
    error: str | None = None
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 평가 지표
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def _dedup_returned_ids(returned: list[int], k: int) -> tuple[list[int], int]:
    """Drop repeated ids from ``returned[:k]`` while keeping first-seen order.

    PR-Eval-GradedNDCG-Dedup ([[feedback_graded_ndcg_dedup_invariant]]). Both the
    binary and the graded NDCG assume the top-N holds unique documents; when the
    retrieval path emits the same document twice, actual DCG can exceed ideal DCG
    and NDCG breaks its ``<= 1.0`` invariant.

    Returns:
        ``(deduped_top_k, dedup_count)`` where ``dedup_count`` is the number of
        duplicate entries removed from the top-k window.
    """
    window = returned[:k]
    # dict preserves insertion order, so fromkeys() is an order-stable dedup.
    unique_in_order = list(dict.fromkeys(window))
    return unique_in_order, len(window) - len(unique_in_order)
|
|
|
|
|
|
def count_dedup(returned: list[int], k: int = 10) -> int:
    """Return how many entries of ``returned[:k]`` repeat an earlier id (audit use)."""
    window = returned[:k]
    # Every entry beyond the first occurrence of an id counts as one duplicate.
    return len(window) - len(set(window))
|
|
|
|
|
|
def recall_at_k(returned: list[int], relevant: list[int], k: int = 10) -> float:
    """Return the fraction of ``relevant`` ids that appear in ``returned[:k]``.

    When ``relevant`` is empty the query is expected to return nothing: the
    score is 1.0 for an empty result list and 0.0 as soon as anything came back.
    """
    if relevant:
        window = set(returned[:k])
        found = [doc_id for doc_id in relevant if doc_id in window]
        return len(found) / len(relevant)
    # No labelled answers: an empty response is the only correct one.
    return 0.0 if returned else 1.0
|
|
|
|
|
|
def mrr_at_k(returned: list[int], relevant: list[int], k: int = 10) -> float:
    """Return the reciprocal rank of the first relevant id within ``returned[:k]``.

    The result is 0.0 when ``relevant`` is empty or no relevant id is in the top-k.
    """
    if not relevant:
        return 0.0
    targets = frozenset(relevant)
    first_hit_rank = next(
        (rank for rank, doc_id in enumerate(returned[:k], start=1) if doc_id in targets),
        None,
    )
    return 0.0 if first_hit_rank is None else 1.0 / first_hit_rank
|
|
|
|
|
|
def ndcg_at_k(returned: list[int], relevant: list[int], k: int = 10) -> float:
    """Return binary-relevance NDCG@k (gain 1 for a relevant id, 0 otherwise).

    Weights such as ``top3_ids`` are ignored by this v0.1 metric.

    PR-Eval-GradedNDCG-Dedup: ``returned[:k]`` is de-duplicated (first occurrence
    wins, same semantics as ``_dedup_returned_ids``) before scoring so a repeated
    document cannot inflate DCG. The ideal DCG is likewise computed from the
    number of *unique* relevant ids; counting a duplicated label twice would
    inflate IDCG and make a perfect ranking score below 1.0.

    Returns 0.0 when ``relevant`` is empty or the ideal DCG is zero.
    """
    if not relevant:
        return 0.0
    relevant_set = set(relevant)
    # dict.fromkeys keeps first-seen order, giving an order-stable dedup.
    ranked = dict.fromkeys(returned[:k])
    dcg = sum(
        1.0 / math.log2(rank + 1)
        for rank, doc_id in enumerate(ranked, start=1)
        if doc_id in relevant_set
    )
    # Ideal ranking: every unique relevant id packed into positions 1..min(n, k).
    ideal_hits = min(len(relevant_set), k)
    idcg = sum(1.0 / math.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    return dcg / idcg if idcg > 0 else 0.0
|
|
|
|
|
|
def graded_ndcg_at_k(returned: list[int], grades: dict[int, int], k: int = 10) -> float:
    """Return graded NDCG@k (v0.2 metric); ``grades`` maps doc id to a grade.

    Gain is ``2 ** grade - 1`` (grade 0 -> 0, grade 3 -> 7); ids missing from
    ``grades`` count as grade 0. The ideal DCG ranks the labelled grades in
    descending order and keeps the top ``k``.

    PR-Eval-GradedNDCG-Dedup ([[feedback_graded_ndcg_dedup_invariant]]):
    ``returned[:k]`` is de-duplicated (first occurrence wins) before scoring so a
    repeated document cannot push the score above 1.0.

    Returns 0.0 when ``grades`` is empty or the ideal DCG is zero.
    """
    if not grades:
        return 0.0

    def discounted_gain(grade: int, rank: int) -> float:
        return (2 ** grade - 1) / math.log2(rank + 1)

    ranked = dict.fromkeys(returned[:k])  # order-preserving dedup of the top-k
    actual = sum(
        discounted_gain(grades[doc_id], rank)
        for rank, doc_id in enumerate(ranked, start=1)
        if grades.get(doc_id, 0) > 0
    )
    best_grades = sorted(grades.values(), reverse=True)[:k]
    ideal = sum(
        discounted_gain(grade, rank)
        for rank, grade in enumerate(best_grades, start=1)
        if grade > 0
    )
    if ideal > 0:
        return actual / ideal
    return 0.0
|
|
|
|
|
|
def graded_recall_at_k(
    returned: list[int],
    grades: dict[int, int],
    threshold: int = 2,
    k: int = 10,
) -> float:
    """Return recall@k counting only ids graded ``>= threshold`` as relevant (v0.2).

    ``threshold=2`` accepts grades 2 and 3; ``threshold=3`` accepts grade 3 only.
    When no id reaches the threshold, the score is 1.0 for an empty result list
    and 0.0 otherwise.
    """
    qualifying = {doc_id for doc_id, grade in grades.items() if grade >= threshold}
    if qualifying:
        window = set(returned[:k])
        return len(qualifying & window) / len(qualifying)
    return 0.0 if returned else 1.0
|
|
|
|
|
|
def top3_hit(returned: list[int], top3_ids: list[int]) -> bool:
    """Return True when at least one of ``top3_ids`` is among the first three results.

    An empty ``top3_ids`` means the check is not applicable and yields True.
    """
    if not top3_ids:
        return True
    leading = set(returned[:3])
    return not leading.isdisjoint(top3_ids)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# API 호출
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
async def call_search(
    client: httpx.AsyncClient,
    base_url: str,
    token: str,
    query: str,
    mode: str = "hybrid",
    limit: int = 20,
    fusion: str | None = None,
    rerank: str | None = None,
    analyze: str | None = None,
    embedding_backend: str | None = None,
    snapshot_doc_id_max: int | None = None,
    snapshot_chunk_id_max: int | None = None,
    reranker_backend: str | None = None,
    rewrite_backend: str | None = None,
    corpus_variant: str | None = None,
    exact_knn: bool = False,
) -> tuple[list[int], float]:
    """Call ``GET {base_url}/api/search/`` and return ``(doc_ids, latency_ms)``.

    ``q``, ``mode`` and ``limit`` are always sent. ``fusion`` is sent only when
    truthy, the remaining optional arguments only when they are not None, and
    ``exact_knn`` only when True (as the string ``"true"``). Latency is measured
    around the HTTP request only. A non-success status raises through
    ``response.raise_for_status()``.
    """
    import time

    params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit}
    if fusion:
        params["fusion"] = fusion
    optional = {
        "rerank": rerank,
        "analyze": analyze,
        "embedding_backend": embedding_backend,
        "snapshot_doc_id_max": snapshot_doc_id_max,
        "snapshot_chunk_id_max": snapshot_chunk_id_max,
        "reranker_backend": reranker_backend,
        "rewrite_backend": rewrite_backend,
        "corpus_variant": corpus_variant,
    }
    params.update({name: value for name, value in optional.items() if value is not None})
    if exact_knn:
        params["exact_knn"] = "true"

    started = time.perf_counter()
    response = await client.get(
        f"{base_url.rstrip('/')}/api/search/",
        headers={"Authorization": f"Bearer {token}"},
        params=params,
        timeout=30.0,
    )
    latency_ms = (time.perf_counter() - started) * 1000

    response.raise_for_status()
    payload = response.json()
    return [hit["id"] for hit in payload.get("results", [])], latency_ms
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 평가 실행
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
async def evaluate(
    queries: list[Query],
    base_url: str,
    token: str,
    label: str,
    mode: str = "hybrid",
    fusion: str | None = None,
    rerank: str | None = None,
    analyze: str | None = None,
    embedding_backend: str | None = None,
    snapshot_doc_id_max: int | None = None,
    snapshot_chunk_id_max: int | None = None,
    reranker_backend: str | None = None,
    rewrite_backend: str | None = None,
    corpus_variant: str | None = None,
    exact_knn: bool = False,
) -> list[QueryResult]:
    """Run every query against one endpoint and score it.

    Queries are issued sequentially over a single ``httpx.AsyncClient``. Any
    exception raised while calling the API or computing metrics for a query is
    recorded on that query's ``QueryResult.error`` (with zeroed metrics and
    ``top3_hit=False``) instead of aborting the run. A warning is written to
    stderr when the top-10 of a response contains duplicate documents.
    """
    search_options = {
        "mode": mode,
        "fusion": fusion,
        "rerank": rerank,
        "analyze": analyze,
        "embedding_backend": embedding_backend,
        "snapshot_doc_id_max": snapshot_doc_id_max,
        "snapshot_chunk_id_max": snapshot_chunk_id_max,
        "reranker_backend": reranker_backend,
        "rewrite_backend": rewrite_backend,
        "corpus_variant": corpus_variant,
        "exact_knn": exact_knn,
    }
    collected: list[QueryResult] = []

    async with httpx.AsyncClient() as client:
        for q in queries:
            try:
                returned_ids, latency_ms = await call_search(
                    client, base_url, token, q.query, **search_options
                )
                dedup_count = count_dedup(returned_ids, 10)
                if dedup_count > 0:
                    print(
                        f" [dedup] {q.id}: top-10 에 중복 doc {dedup_count}개 (inflation 회피)",
                        file=sys.stderr,
                    )
                outcome = QueryResult(
                    query=q,
                    label=label,
                    returned_ids=returned_ids,
                    latency_ms=latency_ms,
                    recall_at_10=recall_at_k(returned_ids, q.relevant_ids, 10),
                    mrr_at_10=mrr_at_k(returned_ids, q.relevant_ids, 10),
                    ndcg_at_10=ndcg_at_k(returned_ids, q.relevant_ids, 10),
                    top3_hit=top3_hit(returned_ids, q.top3_ids),
                    graded_ndcg_at_10=graded_ndcg_at_k(returned_ids, q.graded_relevance, 10),
                    graded_recall_at_10_t2=graded_recall_at_k(
                        returned_ids, q.graded_relevance, threshold=2, k=10
                    ),
                    graded_recall_at_10_t3=graded_recall_at_k(
                        returned_ids, q.graded_relevance, threshold=3, k=10
                    ),
                    dedup_count=dedup_count,
                )
            except Exception as exc:
                # Per-query boundary: keep the run going and surface the failure
                # through the result's ``error`` field.
                outcome = QueryResult(
                    query=q,
                    label=label,
                    returned_ids=[],
                    latency_ms=0.0,
                    recall_at_10=0.0,
                    mrr_at_10=0.0,
                    ndcg_at_10=0.0,
                    top3_hit=False,
                    error=str(exc),
                )
            collected.append(outcome)
    return collected
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 결과 집계 / 출력
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def percentile(values: list[float], p: float) -> float:
    """Return the ``p`` quantile of ``values`` using linear interpolation.

    ``p`` is a fraction (0.5 for the median, 0.95 for p95). The input is sorted
    internally and an empty list yields 0.0.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    last = len(ordered) - 1
    position = last * p          # fractional index into the sorted data
    lower = int(position)
    upper = min(lower + 1, last)
    if lower == upper:
        return ordered[lower]
    weight = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * weight
|
|
|
|
|
|
def print_summary(
    label: str,
    results: list[QueryResult],
    eval_version: str = "both",
) -> dict[str, Any]:
    """Print overall and per-slice metrics for one run and return them as a dict.

    eval_version:
        v0.1 — print binary scores only (regression check against old numbers)
        v0.2 — print graded scores plus the language / ocr_derived breakdowns
        both — print both (default)

    Queries without ``relevant_ids`` are failure cases: they are excluded from
    the recall / MRR / NDCG averages and reported separately as "empty result
    expected". Graded averages cover only queries that have ``graded_relevance``.
    Latency percentiles ignore results whose latency is not positive. Returns an
    empty dict, printing nothing, when ``results`` is empty.
    """
    n = len(results)
    if n == 0:
        return {}

    def mean_or_zero(values: list[float]) -> float:
        return statistics.mean(values) if values else 0.0

    def graded_ndcg_mean(items: list[QueryResult]) -> float:
        return mean_or_zero(
            [r.graded_ndcg_at_10 for r in items if r.query.graded_relevance]
        )

    def group_by(items: list[QueryResult], key) -> dict[Any, list[QueryResult]]:
        buckets: dict[Any, list[QueryResult]] = {}
        for r in items:
            buckets.setdefault(key(r), []).append(r)
        return buckets

    def report_slices(title: str, buckets: dict[Any, list[QueryResult]], render) -> dict[str, dict[str, Any]]:
        # Shared printer for the v0.2 language / ocr_derived breakdowns.
        summary: dict[str, dict[str, Any]] = {}
        print(title)
        for key, items in sorted(buckets.items()):
            name = render(key)
            slice_recall = statistics.mean([r.recall_at_10 for r in items])
            slice_gndcg = graded_ndcg_mean(items)
            summary[name] = {
                "n": len(items),
                "recall_at_10": slice_recall,
                "graded_ndcg_at_10": slice_gndcg,
            }
            print(
                f" {name:<10} n={len(items):>2} recall={slice_recall:.2f} gndcg={slice_gndcg:.2f}"
            )
        return summary

    show_v01 = eval_version in ("v0.1", "both")
    show_v02 = eval_version in ("v0.2", "both")

    # Failure cases (relevant_ids == []) stay out of the averaged binary scores.
    scored: list[QueryResult] = []
    failure_cases: list[QueryResult] = []
    for r in results:
        (scored if r.query.relevant_ids else failure_cases).append(r)
    graded_scored = [r for r in results if r.query.graded_relevance]

    # v0.1 binary scores
    avg_recall = mean_or_zero([r.recall_at_10 for r in scored])
    avg_mrr = mean_or_zero([r.mrr_at_10 for r in scored])
    avg_ndcg = mean_or_zero([r.ndcg_at_10 for r in scored])
    top3_rate = sum(1 for r in scored if r.top3_hit) / len(scored) if scored else 0.0

    # v0.2 graded scores
    avg_gndcg = mean_or_zero([r.graded_ndcg_at_10 for r in graded_scored])
    avg_grecall_t2 = mean_or_zero([r.graded_recall_at_10_t2 for r in graded_scored])
    avg_grecall_t3 = mean_or_zero([r.graded_recall_at_10_t3 for r in graded_scored])

    latencies = [r.latency_ms for r in results if r.latency_ms > 0]
    p50 = percentile(latencies, 0.50)
    p95 = percentile(latencies, 0.95)

    # A failure case is "correct" when the search returned nothing.
    failure_correct = sum(1 for r in failure_cases if not r.returned_ids)
    failure_precision = failure_correct / len(failure_cases) if failure_cases else 0.0

    print(
        f"\n=== {label} (n={n}, scored={len(scored)}, graded={len(graded_scored)}) ==="
    )
    if show_v01:
        print(" -- v0.1 binary --")
        print(f" Recall@10 : {avg_recall:.3f}")
        print(f" MRR@10 : {avg_mrr:.3f}")
        print(f" NDCG@10 : {avg_ndcg:.3f}")
        print(f" Top-3 hit : {top3_rate:.3f}")
    if show_v02:
        print(" -- v0.2 graded --")
        print(f" NDCG@10 (graded) : {avg_gndcg:.3f}")
        print(f" Recall@10 (grade>=2) : {avg_grecall_t2:.3f}")
        print(f" Recall@10 (grade>=3) : {avg_grecall_t3:.3f}")
    print(f" Latency p50: {p50:.0f} ms")
    print(f" Latency p95: {p95:.0f} ms")
    if failure_cases:
        print(
            f" Failure-case precision: {failure_correct}/{len(failure_cases)}"
            f" ({failure_precision:.2f}) — empty result expected"
        )

    # PR-Eval-GradedNDCG-Dedup: dedup audit stats (inflation detection).
    dedup_cases = [r for r in results if r.dedup_count > 0]
    dedup_total = sum(r.dedup_count for r in dedup_cases)
    if dedup_cases:
        dedup_verdict = "⚠️ inflation 의심 — retrieval path 검증"
    else:
        dedup_verdict = "✓ 정상 (top-N unique doc invariant)"
    print(
        f" Dedup audit: {len(dedup_cases)}/{len(results)} cases with dedup applied"
        f" (totaling {dedup_total} chunks). "
        + dedup_verdict
    )

    # Per category (always printed; the graded column only with v0.2 output).
    by_cat_map: dict[str, dict[str, Any]] = {}
    print(" by category:")
    for cat, items in sorted(group_by(scored, lambda r: r.query.category).items()):
        cat_recall = statistics.mean([r.recall_at_10 for r in items])
        cat_ndcg = statistics.mean([r.ndcg_at_10 for r in items])
        cat_gndcg = graded_ndcg_mean(items)
        by_cat_map[cat] = {
            "n": len(items),
            "recall_at_10": cat_recall,
            "ndcg_at_10": cat_ndcg,
            "graded_ndcg_at_10": cat_gndcg,
        }
        if show_v02:
            print(
                f" {cat:<22} n={len(items):>2} recall={cat_recall:.2f} ndcg={cat_ndcg:.2f} gndcg={cat_gndcg:.2f}"
            )
        else:
            print(
                f" {cat:<22} n={len(items):>2} recall={cat_recall:.2f} ndcg={cat_ndcg:.2f}"
            )

    by_lang_map: dict[str, dict[str, Any]] = {}
    by_ocr_map: dict[str, dict[str, Any]] = {}
    if show_v02:
        # v0.2: per language
        by_lang = group_by(scored, lambda r: r.query.language)
        if by_lang:
            by_lang_map = report_slices(" by language:", by_lang, lambda lang: lang)
        # v0.2: per ocr_derived flag, shown only when an OCR-derived query exists
        by_ocr = group_by(scored, lambda r: r.query.ocr_derived)
        if any(by_ocr):
            by_ocr_map = report_slices(
                " by ocr_derived:", by_ocr, lambda flag: str(flag).lower()
            )

    # Queries whose evaluation raised
    errors = [r for r in results if r.error]
    if errors:
        print(f" ERRORS ({len(errors)}):")
        for r in errors:
            print(f" [{r.query.id}] {r.error}")

    return {
        "n": n,
        "n_scored": len(scored),
        "n_graded": len(graded_scored),
        "recall_at_10": avg_recall,
        "mrr_at_10": avg_mrr,
        "ndcg_at_10": avg_ndcg,
        "top3_hit_rate": top3_rate,
        "graded_ndcg_at_10": avg_gndcg,
        "graded_recall_at_10_t2": avg_grecall_t2,
        "graded_recall_at_10_t3": avg_grecall_t3,
        "latency_p50": p50,
        "latency_p95": p95,
        "failure_precision": failure_precision,
        "by_category": by_cat_map,
        "by_language": by_lang_map,
        "by_ocr_derived": by_ocr_map,
    }
|
|
|
|
|
|
def write_csv(results: list[QueryResult], output_path: Path) -> None:
    """Write one CSV row per query result to ``output_path`` (UTF-8, with header).

    Missing parent directories are created. List-valued fields are joined with
    ``;`` (graded relevance as ``doc_id:grade`` pairs sorted by doc id), booleans
    are written as ``1``/``0`` and only the first ten returned ids are kept.
    Prints the output path when done.
    """
    header = (
        "label",
        "id",
        "category",
        "legacy_category",
        "intent",
        "domain_hint",
        "language",
        "ocr_derived",
        "failure_expected",
        "query",
        "relevant_ids",
        "graded_relevance",
        "returned_ids_top10",
        "latency_ms",
        "recall_at_10",
        "mrr_at_10",
        "ndcg_at_10",
        "top3_hit",
        "graded_ndcg_at_10",
        "graded_recall_at_10_t2",
        "graded_recall_at_10_t3",
        "dedup_count",
        "error",
    )

    def flag(value: bool) -> str:
        return "1" if value else "0"

    def joined(values) -> str:
        return ";".join(map(str, values))

    def to_row(r: QueryResult) -> list[str]:
        q = r.query
        graded_pairs = [f"{did}:{g}" for did, g in sorted(q.graded_relevance.items())]
        return [
            r.label,
            q.id,
            q.category,
            q.legacy_category,
            q.intent,
            q.domain_hint,
            q.language,
            flag(q.ocr_derived),
            flag(q.failure_expected),
            q.query,
            joined(q.relevant_ids),
            ";".join(graded_pairs),
            joined(r.returned_ids[:10]),
            f"{r.latency_ms:.1f}",
            f"{r.recall_at_10:.3f}",
            f"{r.mrr_at_10:.3f}",
            f"{r.ndcg_at_10:.3f}",
            flag(r.top3_hit),
            f"{r.graded_ndcg_at_10:.3f}",
            f"{r.graded_recall_at_10_t2:.3f}",
            f"{r.graded_recall_at_10_t3:.3f}",
            str(r.dedup_count),
            r.error or "",
        ]

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(list(header))
        for r in results:
            writer.writerow(to_row(r))
    print(f"\nCSV written: {output_path}")
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 로딩
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def load_queries(yaml_path: Path) -> list[Query]:
    """Parse the legacy queries YAML at ``yaml_path`` into ``Query`` objects.

    The file must contain a top-level ``queries`` list whose entries provide
    ``id``, ``query``, ``category``, ``intent`` and ``domain_hint``; the other
    keys are optional. v0.1 files without ``graded_relevance`` get grades
    synthesized from the binary labels: ids listed in ``top3_ids`` become grade
    3 and the remaining ``relevant_ids`` grade 2. ``failure_expected`` defaults
    to True when ``relevant_ids`` is empty.
    """
    with yaml_path.open(encoding="utf-8") as f:
        data = yaml.safe_load(f)

    def build(raw: dict[str, Any]) -> Query:
        relevant_ids = raw.get("relevant_ids", []) or []
        top3_ids = raw.get("top3_ids", []) or []
        graded = {
            int(doc_id): int(grade)
            for doc_id, grade in (raw.get("graded_relevance", {}) or {}).items()
        }
        if not graded and relevant_ids:
            # v0.1 fallback: derive grades from the binary labels.
            top3 = set(top3_ids)
            graded = {int(rid): (3 if int(rid) in top3 else 2) for rid in relevant_ids}
        return Query(
            id=raw["id"],
            query=raw["query"],
            category=raw["category"],
            intent=raw["intent"],
            domain_hint=raw["domain_hint"],
            relevant_ids=relevant_ids,
            top3_ids=top3_ids,
            notes=raw.get("notes", "") or "",
            # v0.2 columns (graceful defaults for v0.1 yaml)
            legacy_category=raw.get("legacy_category", raw.get("category", "")) or "",
            language=raw.get("language", "ko") or "ko",
            ocr_derived=bool(raw.get("ocr_derived", False)),
            graded_relevance=graded,
            failure_expected=bool(raw.get("failure_expected", not relevant_ids)),
        )

    return [build(raw) for raw in data["queries"]]
|
|
|
|
|
|
# ═════════════════════════════════════════════════════════════════
|
|
# 발주건 단위 baseline (Phase 0 / plan: merry-yawning-owl)
|
|
# ─────────────────────────────────────────────────────────────────
|
|
# 아래 섹션은 "구조화 추출 gap 측정" 전용 코드 경로. 기존 legacy
|
|
# 쿼리 평가는 위 섹션 그대로 — 스키마/값 불변.
|
|
# ═════════════════════════════════════════════════════════════════
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 발주건 데이터 모델
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
@dataclass
class OrderGroupDoc:
    """A document that belongs to an order group, tagged with its role in that group."""

    doc_id: int
    role: str  # order_xlsx | order_pdf | calc_pdf
|
|
|
|
|
|
@dataclass
class OrderGroup:
    """An order (purchase) group: an id, a description and its member documents."""

    order_group_id: str
    description: str
    docs: list[OrderGroupDoc]

    def role_of(self, doc_id: int) -> str | None:
        """Return the role of the first member whose id is ``doc_id``, else None."""
        return next(
            (member.role for member in self.docs if member.doc_id == doc_id),
            None,
        )

    def roles_set(self) -> set[str]:
        """Return the distinct roles present among the group's documents."""
        return set(member.role for member in self.docs)
|
|
|
|
|
|
@dataclass
class ExpectedLocation:
    """Where the answer to an order query is expected to be found."""

    doc_id: int
    role: str
    location_type: str  # sheet_range | page | document_only
    location_value: str | None
    # Roles of primary locations are what cross_format_link_success treats as "primary".
    is_primary: bool
|
|
|
|
|
|
@dataclass
class OrderQuery:
    """One order-unit evaluation query tied to an order group."""

    id: str
    query: str
    category: str  # A | B | C | D
    order_group_id: str  # looked up in the groups mapping by evaluate_orders
    intent: str
    expected_locations: list[ExpectedLocation]
    notes: str = ""
|
|
|
|
|
|
@dataclass
class OrderQueryResult:
    """Per-query outcome of the order-unit evaluation (Tier 1A / 1B / 2 indicators)."""

    query: OrderQuery
    returned_results: list[dict]  # full result dicts as returned by the search API
    latency_ms: float
    doc_match_top5: bool
    cross_format_eligible: bool
    cross_format_link_success_top10: bool
    cross_format_link_success_top5: bool
    range_citation_available: bool
    page_citation_available: bool
    matched_location_value: str | None
    manual_refind_flag: bool
    chunk_idx_stddev_top10: float | None
    # Error message when the query could not be evaluated; None on success.
    error: str | None = None
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 발주건 지표 (Tier 1A / 1B / 2)
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def _collect_expected_doc_ids(locs: list[ExpectedLocation]) -> set[int]:
    """Return the distinct ``doc_id`` values referenced by ``locs``."""
    return set(location.doc_id for location in locs)
|
|
|
|
|
|
def doc_match_at_k(returned_ids: list[int], expected_doc_ids: set[int], k: int = 5) -> bool:
    """Return True when any expected doc id is in ``returned_ids[:k]`` (Tier 1A guardrail).

    An empty ``expected_doc_ids`` always yields False.
    """
    if not expected_doc_ids:
        return False
    window = set(returned_ids[:k])
    return not window.isdisjoint(expected_doc_ids)
|
|
|
|
|
|
def cross_format_link_success(
    returned_ids: list[int],
    expected_locations: list[ExpectedLocation],
    group: OrderGroup,
    k: int,
) -> tuple[bool, bool]:
    """Official Tier 1A indicator; returns ``(success, eligible)``.

    Eligible: the order group holds documents of at least two distinct roles,
    i.e. measuring a cross-format link is meaningful for it.

    Success requires all of:
      1. a group document whose role is a primary expected role is in the top-k;
      2. a group document of some other role is in the top-k;
      3. both belong to the same order group (guaranteed here because only
         ``group.docs`` is consulted).

    Merely checking for "two doc ids with different roles" produces false
    positives and must not be used. When no expected location is marked
    primary, the group is still eligible but success cannot be judged, so the
    result is ``(False, True)``.
    """
    group_roles = group.roles_set()
    if len(group_roles) < 2:
        return False, False  # ineligible

    primary_roles = {loc.role for loc in expected_locations if loc.is_primary}
    if not primary_roles:
        return False, True

    top_k = set(returned_ids[:k])
    # Roles of the group's own documents that made it into the top-k.
    roles_in_top_k = {member.role for member in group.docs if member.doc_id in top_k}
    has_primary = bool(roles_in_top_k & primary_roles)
    has_other = bool(roles_in_top_k & (group_roles - primary_roles))
    return (has_primary and has_other), True
|
|
|
|
|
|
def range_citation_available(returned_results: list[dict]) -> bool:
    """Tier 2: return True when any result has a non-empty ``sheet_name`` or ``cell_range``.

    The current API (`app/api/search.py`) does not expose these fields, so the
    baseline value is False (0%).
    """
    return any(
        bool(hit.get("sheet_name") or hit.get("cell_range"))
        for hit in returned_results
    )
|
|
|
|
|
|
def page_citation_available(returned_results: list[dict]) -> bool:
    """Tier 2: return True when any result carries a ``page`` that is neither None nor "".

    ``chunk.page`` is currently always null, so the baseline value is False (0%).
    """
    pages = (hit.get("page") for hit in returned_results)
    return any(page is not None and page != "" for page in pages)
|
|
|
|
|
|
def _tokenize_query(q: str) -> list[str]:
    """Lower-case ``q``, split on whitespace and keep tokens of length >= 2."""
    words = q.lower().split()
    return list(filter(lambda word: len(word) >= 2, words))
|
|
|
|
|
|
def manual_refind_flag_v0(
    returned_results: list[dict],
    query_text: str,
    score_threshold: float = 0.5,
) -> bool:
    """Tier 1B v0 heuristic: flag queries the user would likely have to re-search.

    The flag is raised when there are no results, or when the top result scores
    below ``score_threshold`` AND none of the query's tokens (whitespace-split,
    lower-cased, length >= 2) occurs in its title or snippet. A missing or null
    score counts as 0.0; a query with no usable tokens is never flagged on the
    token criterion.

    Caution: this is v0. The 0.5 threshold is a provisional value; if search
    score calibration changes, comparisons across baselines may shift, so do not
    treat it as an absolute. Reports must pair it with a manual cross-check.
    """
    if not returned_results:
        return True

    best = returned_results[0]
    score = best.get("score", 0.0)
    if score is None:
        score = 0.0
    if score >= score_threshold:
        return False

    title = (best.get("title") or "").lower()
    snippet = (best.get("snippet") or "").lower()
    haystack = f"{title} {snippet}"
    keywords = [word for word in query_text.lower().split() if len(word) >= 2]
    if not keywords:
        return False
    return all(word not in haystack for word in keywords)
|
|
|
|
|
|
def _chunk_idx_stddev_top10(returned_results: list[dict]) -> float | None:
    """Sample standard deviation of ``chunk_index`` over the top-10 results.

    Lower values mean the hits cluster in one section. Observational only.
    Non-integer or missing indexes are ignored; fewer than two usable values
    yield None.
    """
    candidates = (hit.get("chunk_index") for hit in returned_results[:10])
    int_indexes = [idx for idx in candidates if isinstance(idx, int)]
    return statistics.stdev(int_indexes) if len(int_indexes) >= 2 else None
|
|
|
|
|
|
def _matched_location_value(
    returned_results: list[dict],
    expected_locations: list[ExpectedLocation],
) -> str | None:
    """Tier 2 matched location; always None for the baseline.

    The search API does not expose location fields yet, so neither argument is
    inspected. Once Phase 1A/1B lands, this is the place to compare
    ``cell_range`` / ``page`` from the results against the expected locations.
    """
    matched: str | None = None  # nothing to match against in the current API
    return matched
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 발주건 API 호출 (full result dict 반환)
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
async def call_search_full(
    client: httpx.AsyncClient,
    base_url: str,
    token: str,
    query: str,
    mode: str = "hybrid",
    limit: int = 20,
    fusion: str | None = None,
    rerank: str | None = None,
    analyze: str | None = None,
    debug: bool = False,
    embedding_backend: str | None = None,
    snapshot_doc_id_max: int | None = None,
    snapshot_chunk_id_max: int | None = None,
    reranker_backend: str | None = None,
    rewrite_backend: str | None = None,
    corpus_variant: str | None = None,
    exact_knn: bool = False,
) -> tuple[list[dict], float]:
    """Call ``GET {base_url}/api/search/`` and return ``(results, latency_ms)``.

    Same request logic as ``call_search``, except that the full result dicts are
    returned instead of only their ids, and ``debug=True`` adds ``debug=true``.
    ``corpus_variant`` and ``exact_knn`` are accepted here too so both helpers
    expose the same search options; their defaults leave the request unchanged.

    ``fusion`` is sent only when truthy, the other optional arguments only when
    they are not None, and the boolean switches only when True. Latency is
    measured around the HTTP request only. A non-success status raises through
    ``response.raise_for_status()``.
    """
    import time

    params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit}
    if fusion:
        params["fusion"] = fusion
    if rerank is not None:
        params["rerank"] = rerank
    if analyze is not None:
        params["analyze"] = analyze
    if debug:
        params["debug"] = "true"
    optional = {
        "embedding_backend": embedding_backend,
        "snapshot_doc_id_max": snapshot_doc_id_max,
        "snapshot_chunk_id_max": snapshot_chunk_id_max,
        "reranker_backend": reranker_backend,
        "rewrite_backend": rewrite_backend,
        "corpus_variant": corpus_variant,
    }
    params.update({name: value for name, value in optional.items() if value is not None})
    if exact_knn:
        params["exact_knn"] = "true"

    url = f"{base_url.rstrip('/')}/api/search/"
    headers = {"Authorization": f"Bearer {token}"}

    start = time.perf_counter()
    response = await client.get(url, headers=headers, params=params, timeout=30.0)
    latency_ms = (time.perf_counter() - start) * 1000
    response.raise_for_status()
    data = response.json()
    return data.get("results", []), latency_ms
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 발주건 평가 실행
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
async def evaluate_orders(
    queries: list[OrderQuery],
    groups: dict[str, OrderGroup],
    base_url: str,
    token: str,
    mode: str = "hybrid",
    fusion: str | None = None,
    rerank: str | None = None,
    analyze: str | None = None,
    debug: bool = False,
) -> list[OrderQueryResult]:
    """Evaluate the order-unit query set against one endpoint.

    Queries run sequentially over a single ``httpx.AsyncClient``. A query whose
    ``order_group_id`` is not in ``groups`` is recorded as a failure without
    calling the API. Any exception raised while calling the API or computing the
    indicators is likewise recorded on the result's ``error`` field rather than
    aborting the run. Failed results have every indicator False / None except
    ``manual_refind_flag``, which is True.
    """

    def failed(q: OrderQuery, message: str) -> OrderQueryResult:
        # Single definition of the "could not evaluate" row used by both failure paths.
        return OrderQueryResult(
            query=q,
            returned_results=[],
            latency_ms=0.0,
            doc_match_top5=False,
            cross_format_eligible=False,
            cross_format_link_success_top10=False,
            cross_format_link_success_top5=False,
            range_citation_available=False,
            page_citation_available=False,
            matched_location_value=None,
            manual_refind_flag=True,
            chunk_idx_stddev_top10=None,
            error=message,
        )

    collected: list[OrderQueryResult] = []
    async with httpx.AsyncClient() as client:
        for q in queries:
            group = groups.get(q.order_group_id)
            if group is None:
                collected.append(failed(q, f"unknown order_group_id={q.order_group_id}"))
                continue
            try:
                returned, latency_ms = await call_search_full(
                    client, base_url, token, q.query,
                    mode=mode, fusion=fusion, rerank=rerank, analyze=analyze, debug=debug,
                )
                returned_ids = [hit["id"] for hit in returned]
                expected_ids = _collect_expected_doc_ids(q.expected_locations)

                cf10, eligible10 = cross_format_link_success(returned_ids, q.expected_locations, group, 10)
                cf5, _eligible5 = cross_format_link_success(returned_ids, q.expected_locations, group, 5)

                outcome = OrderQueryResult(
                    query=q,
                    returned_results=returned,
                    latency_ms=latency_ms,
                    doc_match_top5=doc_match_at_k(returned_ids, expected_ids, 5),
                    cross_format_eligible=eligible10,
                    cross_format_link_success_top10=cf10 if eligible10 else False,
                    cross_format_link_success_top5=cf5 if eligible10 else False,
                    range_citation_available=range_citation_available(returned),
                    page_citation_available=page_citation_available(returned),
                    matched_location_value=_matched_location_value(returned, q.expected_locations),
                    manual_refind_flag=manual_refind_flag_v0(returned, q.query),
                    chunk_idx_stddev_top10=_chunk_idx_stddev_top10(returned),
                )
            except Exception as exc:
                # Per-query boundary: keep evaluating the remaining queries.
                outcome = failed(q, str(exc))
            collected.append(outcome)
    return collected
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 발주건 결과 집계 / 출력
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def print_order_summary(results: list[OrderQueryResult]) -> dict[str, Any]:
    """Print the order-unit Tier 1A / 1B / 2 summary and return the aggregates.

    Every rate is printed next to its absolute count. An empty ``results``
    list prints nothing and yields an empty dict.

    Returns:
        A dict holding ``n``, one ``(hits, denominator)`` tuple per metric
        (the cross-format metrics use the number of eligible queries as the
        denominator) and the ``latency_p50`` / ``latency_p95`` values.
    """
    n = len(results)
    if not n:
        return {}

    def tally(rows, attr):
        """Count the rows whose attribute ``attr`` is truthy."""
        return sum(1 for row in rows if getattr(row, attr))

    # Tier 1A: document match, plus cross-format linking over eligible queries only.
    doc_hits = tally(results, "doc_match_top5")
    eligible = [row for row in results if row.cross_format_eligible]
    n_eligible = len(eligible)
    cf10_hits = tally(eligible, "cross_format_link_success_top10")
    cf5_hits = tally(eligible, "cross_format_link_success_top5")

    # Tier 1B: heuristic re-find flag and mean chunk-index spread (None values skipped).
    refind_hits = tally(results, "manual_refind_flag")
    spreads = [
        row.chunk_idx_stddev_top10
        for row in results
        if row.chunk_idx_stddev_top10 is not None
    ]
    mean_spread = statistics.mean(spreads) if spreads else None

    # Tier 2: citation availability.
    range_hits = tally(results, "range_citation_available")
    page_hits = tally(results, "page_citation_available")

    # Latency: rows with a non-positive latency are left out of the percentiles.
    timings = [row.latency_ms for row in results if row.latency_ms > 0]
    p50 = percentile(timings, 0.50)
    p95 = percentile(timings, 0.95)

    print(f"\n=== Order-unit baseline (n={n}) ===")
    print(" Tier 1A (gate 후보 / guardrail):")
    print(
        f" top_5_document_match_rate : {doc_hits}/{n} ({doc_hits / n:.1%}) — Guardrail, 비악화 강제"
    )
    if not eligible:
        print(" cross_format_link : no eligible queries (group roles<2)")
    else:
        print(
            f" cross_format_link top-10 : {cf10_hits}/{n_eligible} ({cf10_hits / n_eligible:.1%}) [공식 gate 후보]"
        )
        print(
            f" cross_format_link top-5 : {cf5_hits}/{n_eligible} ({cf5_hits / n_eligible:.1%}) [보조 관찰]"
        )

    print(" Tier 1B (관찰용):")
    print(
        f" manual_refind_flag (v0) : {refind_hits}/{n} ({refind_hits / n:.1%}) — heuristic, 수동 교차검증 필수"
    )
    if mean_spread is not None:
        print(f" chunk_idx_stddev_top10 (mean) : {mean_spread:.2f}")

    print(" Tier 2 (auto-eval 기준, 현재 시스템 baseline = 0):")
    print(f" range_citation_available : {range_hits}/{n} ({range_hits / n:.1%})")
    print(f" page_citation_available : {page_hits}/{n} ({page_hits / n:.1%})")

    print(f" Latency p50 / p95 : {p50:.0f} / {p95:.0f} ms")

    # Roll-up per query category.
    per_category: dict[str, list[OrderQueryResult]] = {}
    for row in results:
        key = row.query.category
        if key not in per_category:
            per_category[key] = []
        per_category[key].append(row)
    print(" by category (A/B/C/D):")
    for key in sorted(per_category):
        members = per_category[key]
        size = len(members)
        member_eligible = [row for row in members if row.cross_format_eligible]
        if member_eligible:
            cf10_in_cat = tally(member_eligible, "cross_format_link_success_top10")
            cf_text = f"cf10 {cf10_in_cat}/{len(member_eligible)}"
        else:
            cf_text = "cf10 n/a"
        print(
            f" {key} n={size:>2} doc_match {tally(members, 'doc_match_top5')}/{size} {cf_text}"
        )

    # Roll-up per order group.
    per_group: dict[str, list[OrderQueryResult]] = {}
    for row in results:
        key = row.query.order_group_id
        if key not in per_group:
            per_group[key] = []
        per_group[key].append(row)
    print(" by order_group:")
    for key in sorted(per_group):
        members = per_group[key]
        size = len(members)
        print(f" {key} n={size:>2} doc_match {tally(members, 'doc_match_top5')}/{size}")

    # Failed queries, listed individually.
    failed = [row for row in results if row.error]
    if failed:
        print(f" ERRORS ({len(failed)}):")
        for row in failed:
            print(f" [{row.query.id}] {row.error}")

    return {
        "n": n,
        "doc_match_top5": (doc_hits, n),
        "cross_format_link_top10": (cf10_hits, n_eligible),
        "cross_format_link_top5": (cf5_hits, n_eligible),
        "manual_refind_flag": (refind_hits, n),
        "range_citation_available": (range_hits, n),
        "page_citation_available": (page_hits, n),
        "latency_p50": p50,
        "latency_p95": p95,
    }
|
|
|
|
|
|
def write_order_csv(results: list[OrderQueryResult], output_path: Path) -> None:
    """Write the order-unit baseline results to their own CSV file.

    The parent directory of ``output_path`` is created when missing. One
    header row is written, then one row per result. Boolean metrics are
    stored as "1"/"0" and multi-valued fields are joined with ";". This
    writer defines its own column list, separate from the legacy CSV.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    def flag(value):
        """Render a truthy/falsy value as "1"/"0"."""
        return "1" if value else "0"

    header = [
        "id",
        "query",
        "category_abcd",
        "order_group_id",
        "intent",
        "expected_doc_ids",
        "expected_roles",
        "expected_location_type",
        "expected_location_value",
        "returned_ids_top10",
        "latency_ms",
        "doc_match_top5",
        "cross_format_eligible",
        "cross_format_link_success_top10",
        "cross_format_link_success_top5",
        "range_citation_available",
        "page_citation_available",
        "matched_location_value",
        "manual_refind_flag",
        "chunk_idx_stddev_top10",
        "notes",
        "error",
    ]

    with output_path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for res in results:
            top_ids = [hit["id"] for hit in res.returned_results[:10]]
            q = res.query
            locs = q.expected_locations
            # Representative location: the first primary one, otherwise the
            # first expected location, otherwise none at all.
            rep = next((loc for loc in locs if loc.is_primary), None)
            if rep is None and locs:
                rep = locs[0]
            spread = res.chunk_idx_stddev_top10
            row = [
                q.id,
                q.query,
                q.category,
                q.order_group_id,
                q.intent,
                ";".join(str(loc.doc_id) for loc in locs),
                ";".join(loc.role for loc in locs),
                rep.location_type if rep else "",
                rep.location_value if rep and rep.location_value else "",
                ";".join(str(doc_id) for doc_id in top_ids),
                f"{res.latency_ms:.1f}",
                flag(res.doc_match_top5),
                flag(res.cross_format_eligible),
                flag(res.cross_format_link_success_top10),
                flag(res.cross_format_link_success_top5),
                flag(res.range_citation_available),
                flag(res.page_citation_available),
                res.matched_location_value or "",
                flag(res.manual_refind_flag),
                "" if spread is None else f"{spread:.2f}",
                q.notes,
                res.error or "",
            ]
            writer.writerow(row)
    print(f"\nOrder baseline CSV written: {output_path}")
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# 발주건 YAML 로딩
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def load_order_groups(yaml_path: Path) -> dict[str, OrderGroup]:
    """Load the order-group mapping YAML into a dict keyed by ``order_group_id``.

    Reads a top-level ``groups`` list. Each entry needs ``order_group_id``,
    may carry a ``description`` (a missing or null one becomes ""), and may
    carry a ``docs`` list whose items need ``doc_id`` (coerced to int) and
    ``role``.

    An empty file, or a ``groups`` / ``docs`` key that is present but null,
    is treated as "nothing to load" rather than raising.

    Raises:
        KeyError: an entry lacks a required key.
        ValueError, TypeError: a ``doc_id`` cannot be converted to int.
    """
    with yaml_path.open(encoding="utf-8") as f:
        # safe_load returns None for an empty document; normalise to a dict.
        data = yaml.safe_load(f) or {}

    groups: dict[str, OrderGroup] = {}
    # `or []` also covers a key that exists with a null value.
    for g in data.get("groups") or []:
        docs = [
            OrderGroupDoc(doc_id=int(d["doc_id"]), role=d["role"])
            for d in g.get("docs") or []
        ]
        group_id = g["order_group_id"]
        # A repeated order_group_id replaces the earlier entry (last one wins).
        groups[group_id] = OrderGroup(
            order_group_id=group_id,
            description=g.get("description", "") or "",
            docs=docs,
        )
    return groups
|
|
|
|
|
|
def load_order_queries(yaml_path: Path) -> list[OrderQuery]:
    """Load the order-unit query YAML into a list of ``OrderQuery`` objects.

    Reads a top-level ``questions`` list. Each entry needs ``id``, ``query``,
    ``category`` and ``order_group_id``. ``intent`` and ``notes`` default to
    "". Each item of ``expected_locations`` needs ``doc_id`` (coerced to
    int), ``role`` and ``location_type``. ``location_value`` is optional and
    ``is_primary`` defaults to False.

    An empty file, or a ``questions`` / ``expected_locations`` key that is
    present but null, is treated as "nothing to load" rather than raising.

    Raises:
        KeyError: an entry lacks a required key.
        ValueError, TypeError: a ``doc_id`` cannot be converted to int.
    """
    with yaml_path.open(encoding="utf-8") as f:
        # safe_load returns None for an empty document; normalise to a dict.
        data = yaml.safe_load(f) or {}

    queries: list[OrderQuery] = []
    # `or []` also covers a key that exists with a null value.
    for q in data.get("questions") or []:
        locs = [
            ExpectedLocation(
                doc_id=int(loc["doc_id"]),
                role=loc["role"],
                location_type=loc["location_type"],
                location_value=loc.get("location_value"),
                is_primary=bool(loc.get("is_primary", False)),
            )
            for loc in q.get("expected_locations") or []
        ]
        queries.append(
            OrderQuery(
                id=q["id"],
                query=q["query"],
                category=q["category"],
                order_group_id=q["order_group_id"],
                intent=q.get("intent", "") or "",
                expected_locations=locs,
                notes=q.get("notes", "") or "",
            )
        )
    return queries
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────
|
|
# CLI
|
|
# ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def main() -> int:
    """Parse CLI arguments, run the requested evaluation path(s), return an exit code.

    Two paths exist and both may run in one invocation:

    * legacy: ``--queries`` is evaluated against ``--base-url`` or, when that
      is absent, against ``--baseline-url`` and ``--candidate-url`` followed
      by a delta printout. Raw rows go to ``--output`` when it is given.
    * order-unit: enabled by ``--queries-order``; requires ``--order-groups``
      and ``--base-url``. Rows go to ``--output-order`` when it is given.

    Returns:
        ``2`` for an argument error (message on stderr), ``0`` otherwise.
    """
    parser = argparse.ArgumentParser(description="Document Server 검색 평가")
    parser.add_argument(
        "--queries",
        type=Path,
        default=Path(__file__).parent / "queries.yaml",
        help="평가셋 YAML 경로",
    )
    parser.add_argument(
        "--base-url",
        type=str,
        default=None,
        help="단일 평가용 URL (예: https://docs.hyungi.net)",
    )
    parser.add_argument(
        "--baseline-url",
        type=str,
        default=None,
        help="A/B 비교용 baseline URL",
    )
    parser.add_argument(
        "--candidate-url",
        type=str,
        default=None,
        help="A/B 비교용 candidate URL",
    )
    parser.add_argument(
        "--mode",
        type=str,
        default="hybrid",
        choices=["fts", "trgm", "vector", "hybrid"],
        help="검색 mode 파라미터",
    )
    parser.add_argument(
        "--fusion",
        type=str,
        default=None,
        choices=["legacy", "rrf", "rrf_boost"],
        help="hybrid 모드 fusion 전략 (Phase 0.5+, 미지정 시 서버 기본값)",
    )
    parser.add_argument(
        "--rerank",
        type=str,
        default=None,
        choices=["true", "false"],
        help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)",
    )
    parser.add_argument(
        "--analyze",
        type=str,
        default=None,
        choices=["true", "false"],
        help="QueryAnalyzer 활성화 (Phase 2.1+, cache hit 시 multilingual 적용)",
    )
    parser.add_argument(
        "--token",
        type=str,
        default=os.environ.get("DOCSRV_TOKEN"),
        help="Bearer 토큰 (env DOCSRV_TOKEN)",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="CSV 출력 경로 (지정하면 raw 결과 저장)",
    )
    # Order-unit baseline (Phase 0 / plan: merry-yawning-owl)
    parser.add_argument(
        "--queries-order",
        type=Path,
        default=None,
        help="발주건 쿼리 YAML (queries_order_baseline.yaml)",
    )
    parser.add_argument(
        "--order-groups",
        type=Path,
        default=None,
        help="발주건 그룹 매핑 YAML (order_groups.yaml)",
    )
    parser.add_argument(
        "--output-order",
        type=Path,
        default=None,
        help="발주건 baseline 전용 CSV 출력 경로 (legacy --output과 분리)",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="검색 API debug=true 요청 (발주건 모드에서 응답 검증용)",
    )
    parser.add_argument(
        "--eval-version",
        type=str,
        default="both",
        choices=["v0.1", "v0.2", "both"],
        help="점수 출력 모드 (Phase 1, default both). v0.1=binary only / v0.2=graded only / both=둘 다",
    )
    parser.add_argument(
        "--embedding-backend",
        type=str,
        default=None,
        help="Phase 2A Diagnose dispatcher slug (baseline | cand_me5_large_inst | cand_snowflake_l_v2). 미지정 = production.",
    )
    parser.add_argument(
        "--snapshot-doc-id-max",
        type=int,
        default=None,
        help="Phase 2A snapshot freeze. documents.id <= 값 filter. baseline rebaseline 도 동일 적용.",
    )
    parser.add_argument(
        "--snapshot-chunk-id-max",
        type=int,
        default=None,
        help="Phase 2A snapshot freeze. document_chunks.id <= 값 filter. baseline rebaseline 도 동일 적용.",
    )
    parser.add_argument(
        "--reranker-backend",
        type=str,
        default=None,
        help="Phase 2B Diagnose reranker dispatcher slug (baseline). 후보 cand_gte_ml_base = NO-GO 종결·teardown(2026-06-18). 미지정 = production.",
    )
    parser.add_argument(
        "--rewrite-backend",
        type=str,
        default=None,
        help="Phase 2Q Diagnose query rewrite dispatcher slug (baseline | cand_multi_query_macmini | cand_multi_query_macbook). 미지정 = single-query path. Phase 1B scaffold = variants 박제만, retrieval 합성은 Phase 2.",
    )
    parser.add_argument(
        "--corpus-variant",
        type=str,
        default=None,
        choices=["prehier", "hier_sim_raw", "hier_sim_clean"],
        help="Hier-Replace-Diagnose-1: chunk leg 측정 뷰 (prehier=legacy baseline | hier_sim_raw | hier_sim_clean). 미지정 = production corpus_chunks.",
    )
    parser.add_argument(
        "--exact-knn",
        action="store_true",
        help="Hier-Replace-Diagnose-1: vector leg exact KNN (ivfflat 근사 제거). prehier vs hier_sim 공정 비교용. eval 전용.",
    )

    args = parser.parse_args()

    if not args.token:
        print("ERROR: --token 또는 env DOCSRV_TOKEN 필요", file=sys.stderr)
        return 2

    if not args.base_url and not (args.baseline_url and args.candidate_url):
        print(
            "ERROR: --base-url 또는 (--baseline-url + --candidate-url) 둘 중 하나 필요",
            file=sys.stderr,
        )
        return 2

    # Order-unit baseline mode (Phase 0 / plan: merry-yawning-owl).
    run_order_mode = args.queries_order is not None
    # The legacy path runs unless this is an order-only invocation. Any of
    # --output / --baseline-url / --candidate-url re-enables it next to the
    # order path. Because run_legacy_mode is always True when run_order_mode is
    # False, at least one path always runs and no "nothing to run" guard is needed.
    run_legacy_mode = (
        not run_order_mode
        or args.output is not None
        or args.baseline_url is not None
        or args.candidate_url is not None
    )

    if run_order_mode:
        if args.order_groups is None:
            print("ERROR: --queries-order 사용 시 --order-groups 필수", file=sys.stderr)
            return 2
        if not args.base_url:
            print("ERROR: --queries-order 모드는 --base-url만 지원 (A/B 미지원)", file=sys.stderr)
            return 2

    if run_legacy_mode:
        queries = load_queries(args.queries)
        print(f"Loaded {len(queries)} queries from {args.queries}")
        banner = f"Mode: {args.mode}"
        if args.fusion:
            banner += f" / fusion: {args.fusion}"
        if args.rerank:
            banner += f" / rerank: {args.rerank}"
        print(banner)

        # One shared keyword set for every evaluate() call, so the single,
        # baseline and candidate runs cannot drift apart.
        eval_kwargs = dict(
            mode=args.mode,
            fusion=args.fusion,
            rerank=args.rerank,
            analyze=args.analyze,
            embedding_backend=args.embedding_backend,
            snapshot_doc_id_max=args.snapshot_doc_id_max,
            snapshot_chunk_id_max=args.snapshot_chunk_id_max,
            reranker_backend=args.reranker_backend,
            rewrite_backend=args.rewrite_backend,
            corpus_variant=args.corpus_variant,
            exact_knn=args.exact_knn,
        )

        all_results: list[QueryResult] = []

        if args.base_url:
            # --base-url takes precedence over the A/B pair when both are given.
            print(f"\n>>> evaluating: {args.base_url}")
            results = asyncio.run(
                evaluate(queries, args.base_url, args.token, "single", **eval_kwargs)
            )
            print_summary("single", results, eval_version=args.eval_version)
            all_results.extend(results)
        else:
            print(f"\n>>> baseline: {args.baseline_url}")
            baseline_results = asyncio.run(
                evaluate(queries, args.baseline_url, args.token, "baseline", **eval_kwargs)
            )
            baseline_summary = print_summary(
                "baseline", baseline_results, eval_version=args.eval_version
            )

            print(f"\n>>> candidate: {args.candidate_url}")
            candidate_results = asyncio.run(
                evaluate(queries, args.candidate_url, args.token, "candidate", **eval_kwargs)
            )
            candidate_summary = print_summary(
                "candidate", candidate_results, eval_version=args.eval_version
            )

            # Deltas
            print("\n=== Δ (candidate - baseline) ===")
            for k in (
                "recall_at_10",
                "mrr_at_10",
                "ndcg_at_10",
                "top3_hit_rate",
                "latency_p50",
                "latency_p95",
            ):
                delta = candidate_summary[k] - baseline_summary[k]
                sign = "+" if delta >= 0 else ""
                print(f" {k:<16}: {sign}{delta:.3f}")

            all_results.extend(baseline_results)
            all_results.extend(candidate_results)

        if args.output:
            write_csv(all_results, args.output)

    # Order-unit baseline (Phase 0)
    if run_order_mode:
        order_queries = load_order_queries(args.queries_order)
        order_groups = load_order_groups(args.order_groups)
        print(
            f"\nLoaded {len(order_queries)} order queries from {args.queries_order}"
            f" / {len(order_groups)} groups from {args.order_groups}"
        )
        order_results = asyncio.run(
            evaluate_orders(
                order_queries,
                order_groups,
                args.base_url,
                args.token,
                mode=args.mode,
                fusion=args.fusion,
                rerank=args.rerank,
                analyze=args.analyze,
                debug=args.debug,
            )
        )
        print_order_summary(order_results)
        if args.output_order:
            write_order_csv(order_results, args.output_order)
        elif not args.output:
            print(
                "\nNOTE: --output-order 미지정 — CSV 저장 skip. 결과는 stdout 요약만.",
                file=sys.stderr,
            )

    return 0
|
|
|
|
|
|
# Script entry point: run the CLI only when executed directly (not on import)
# and pass main()'s integer return value to sys.exit as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|