feat(eval): E.6 runner + 평가셋 main 복원 (from feat/eval-infra)

selective checkout (not cherry-pick):
- scripts/run_eval_ask.py (RESULT_FIELDS 21 고정, X-Source:eval 헤더)
- evals/ask_analyze_v1.jsonl (300 case = ask 220 + analyze 80)

E.3/E.6 측정 진입점. feat/eval-infra 의 원본은 유지.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-17 09:10:18 +09:00
parent 3971cf08d2
commit c82d52e73f
2 changed files with 743 additions and 0 deletions
+443
View File
@@ -0,0 +1,443 @@
#!/usr/bin/env python3
"""Phase E 묶음 B — ask/analyze 평가 runner.
evals/ask_analyze_v1.jsonl 을 읽어 /ask (GET) + /{doc_id}/analyze (POST)
엔드포인트에 호출하고 원시 측정값을 JSONL 로 기록한다.
집계(E.5)는 별도 스크립트에서 수행. 이 runner 는 "raw 값만" 저장한다.
X-Source: eval 헤더를 박아 DB ask_events/analyze_events 에 source='eval' 로 분리 기록됨.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import httpx
# ─────────────────────────────────────────────────────────
# Result JSONL schema (order is fixed — the E.5 aggregation works against this schema)
# ─────────────────────────────────────────────────────────
# Every output row is re-serialized in exactly this key order by run_eval.
# Fields that do not apply to a given item type are written as null.
RESULT_FIELDS: list[str] = [
    "id", "type", "category", "golden", "doc_id",
    "query", "response", "latency_ms", "error", "error_code",
    "answer_length", "citation_count", "completeness", "refused",
    "critical_keywords_hit",
    "layers_count", "expected_layers_hit", "truncated", "cached",
    "prompt_version", "model_name",
]  # 21 fields, order locked
# ─────────────────────────────────────────────────────────
# Normalize (used to compute critical_keywords_hit)
# ─────────────────────────────────────────────────────────
# Characters dropped before comparison: whitespace, the middle dot, brackets
# and common punctuation / quote characters.
_NORMALIZE_REMOVE = re.compile(r"[\s·()\[\]{}.,;:!?\"'`]")


def normalize(text: str) -> str:
    """Return *text* lower-cased with whitespace, middle dots, brackets and punctuation removed.

    No morphological analysis is performed; this is a purely character-level
    clean-up.

    Example: "유해·위험요인 (risk)" -> "유해위험요인risk"
    """
    # Splitting on every removable character and re-joining the remaining
    # pieces deletes exactly the characters the pattern matches.
    kept_pieces = _NORMALIZE_REMOVE.split(text)
    return "".join(kept_pieces).lower()
def keywords_hit(answer: str, keywords: list[str]) -> dict[str, bool]:
    """Report, per keyword, whether it occurs in *answer* after normalization.

    Both the answer and each keyword are passed through ``normalize`` and then
    compared by plain substring containment.

    Args:
        answer: Answer text to search; ``None`` or ``""`` is treated as empty.
        keywords: Keywords to look for. The original (un-normalized) keyword
            strings are used as the keys of the result.

    Returns:
        Mapping of each keyword to ``True`` when its normalized form is a
        substring of the normalized answer, else ``False``.
    """
    norm_answer = normalize(answer or "")
    hits: dict[str, bool] = {}
    for kw in keywords:
        norm_kw = normalize(kw)
        # A keyword made only of removable characters (e.g. "·" or "()")
        # normalizes to "", and `"" in s` is always True. Treat that as a miss
        # so such a keyword cannot register as a hit in every answer.
        hits[kw] = bool(norm_kw) and norm_kw in norm_answer
    return hits
# ─────────────────────────────────────────────────────────
# HTTP status → analyze error_code 추정 (DB 와 정확히 동일하진 않음, 근사)
# ─────────────────────────────────────────────────────────
def analyze_error_code_from_status(status: int) -> str | None:
"""/analyze HTTP status → error_code 근사. 정확 값은 DB analyze_events 참조."""
if status == 404:
return "not_found_or_no_text" # 404 는 문서 미존재 또는 텍스트 없음
if status == 504:
return "timeout"
if status == 502:
return "llm"
if status == 422:
return "parse_or_missing_summary"
if status >= 500:
return "server_error"
if status >= 400:
return f"http_{status}"
return None
# ─────────────────────────────────────────────────────────
# API 호출 (재시도 포함)
# ─────────────────────────────────────────────────────────
@dataclass
class ApiResult:
status: int
body: Any
latency_ms: float
error: str | None = None
async def call_ask(
    client: httpx.AsyncClient, base_url: str, token: str, source: str,
    query: str,
) -> ApiResult:
    """Send one GET request to ``/api/search/ask`` and time it.

    Args:
        client: HTTP client used to send the request.
        base_url: Server base URL; trailing slashes are stripped.
        token: Bearer token placed in the ``Authorization`` header.
        source: Value sent in the ``X-Source`` header.
        query: Question text, sent as the ``q`` query parameter.

    Returns:
        An ``ApiResult``. If the request raises, the result has ``status=0``
        and ``error`` set to ``"<ExceptionType>: <message>"``. If the response
        body is not decodable as JSON, ``body`` is ``None``.
    """
    endpoint = base_url.rstrip("/") + "/api/search/ask"
    request_headers = {"Authorization": f"Bearer {token}", "X-Source": source}
    started_at = time.perf_counter()

    try:
        resp = await client.get(
            endpoint, headers=request_headers, params={"q": query}, timeout=60.0,
        )
    except Exception as exc:
        # Transport-level failure: record it instead of aborting the run.
        elapsed_ms = (time.perf_counter() - started_at) * 1000
        return ApiResult(
            status=0, body=None, latency_ms=elapsed_ms,
            error=f"{type(exc).__name__}: {exc}",
        )

    elapsed_ms = (time.perf_counter() - started_at) * 1000
    try:
        payload = resp.json()
    except Exception:
        # Non-JSON body (e.g. an HTML error page): keep the status, drop the body.
        payload = None
    return ApiResult(status=resp.status_code, body=payload, latency_ms=elapsed_ms)
async def call_analyze(
    client: httpx.AsyncClient, base_url: str, token: str, source: str,
    doc_id: int,
) -> ApiResult:
    """Send one body-less POST to ``/api/documents/{doc_id}/analyze`` and time it.

    Args:
        client: HTTP client used to send the request.
        base_url: Server base URL; trailing slashes are stripped.
        token: Bearer token placed in the ``Authorization`` header.
        source: Value sent in the ``X-Source`` header.
        doc_id: Identifier of the document to analyze (interpolated into the URL).

    Returns:
        An ``ApiResult``. If the request raises, the result has ``status=0``
        and ``error`` set to ``"<ExceptionType>: <message>"``. If the response
        body is not decodable as JSON, ``body`` is ``None``.
    """
    endpoint = base_url.rstrip("/") + f"/api/documents/{doc_id}/analyze"
    request_headers = {"Authorization": f"Bearer {token}", "X-Source": source}
    started_at = time.perf_counter()

    try:
        # Per the original author's note the server-side analyze timeout is 60s
        # (ANALYZE_TIMEOUT_S); the client leaves headroom and waits up to 130s.
        resp = await client.post(endpoint, headers=request_headers, timeout=130.0)
    except Exception as exc:
        # Transport-level failure: record it instead of aborting the run.
        elapsed_ms = (time.perf_counter() - started_at) * 1000
        return ApiResult(
            status=0, body=None, latency_ms=elapsed_ms,
            error=f"{type(exc).__name__}: {exc}",
        )

    elapsed_ms = (time.perf_counter() - started_at) * 1000
    try:
        payload = resp.json()
    except Exception:
        # Non-JSON body: keep the status, drop the body.
        payload = None
    return ApiResult(status=resp.status_code, body=payload, latency_ms=elapsed_ms)
async def call_with_retry(
    fn, *args, retries: int = 1, retry_delay: float = 5.0,
) -> ApiResult:
    """Await ``fn(*args)``, retrying network errors and non-4xx failures.

    Args:
        fn: Async callable returning an object with ``status`` and ``error``.
        *args: Positional arguments forwarded to ``fn`` on every attempt.
        retries: Maximum number of additional attempts after the first one.
        retry_delay: Seconds to sleep before each additional attempt.

    Returns:
        The first result that is a success (no error, status 200-399) or a
        client error (status 400-499); otherwise the result of the last attempt.
    """
    retries_left = retries
    while True:
        outcome: ApiResult = await fn(*args)
        status = outcome.status
        succeeded = outcome.error is None and 200 <= status < 400
        client_error = bool(status) and 400 <= status < 500
        # 4xx responses are final: repeating the same request would not help.
        if succeeded or client_error or retries_left <= 0:
            return outcome
        retries_left -= 1
        await asyncio.sleep(retry_delay)
# ─────────────────────────────────────────────────────────
# Per-item processing -> build a RESULT_FIELDS dict
# ─────────────────────────────────────────────────────────
def build_row_ask(item: dict, api: ApiResult) -> dict:
    """Build the result row for one ``ask`` item from its API outcome.

    Args:
        item: Eval-set record; ``id`` and ``query`` are required, while
            ``category``, ``golden`` and ``critical_keywords`` are optional.
        api: Outcome of the ask call.

    Returns:
        A dict holding every RESULT_FIELDS key. Analyze-only fields are ``None``.
    """
    # Any body that is not a dict (None, list, ...) yields no response fields.
    payload = api.body if isinstance(api.body, dict) else {}
    answer = payload.get("ai_answer")
    citations = payload.get("citations")

    # An error is either a transport failure or a non-2xx HTTP status.
    if api.error is not None:
        error_msg = api.error
    elif api.status and not 200 <= api.status < 300:
        error_msg = f"http_{api.status}"
    else:
        error_msg = None

    expected_keywords = item.get("critical_keywords") or []

    return {
        "id": item["id"],
        "type": "ask",
        "category": item.get("category"),
        "golden": item.get("golden", False),
        "doc_id": None,
        "query": item["query"],
        "response": answer,
        "latency_ms": round(api.latency_ms, 1),
        "error": error_msg,
        # ask has no separate error_code; refused/completeness carry that signal.
        "error_code": None,
        "answer_length": len(answer) if isinstance(answer, str) else None,
        "citation_count": len(citations) if isinstance(citations, list) else None,
        "completeness": payload.get("completeness"),
        "refused": payload.get("refused"),
        "critical_keywords_hit": keywords_hit(answer or "", expected_keywords),
        "layers_count": None,
        "expected_layers_hit": None,
        "truncated": None,
        "cached": None,
        # prompt_version / model_name are not exposed by the API response.
        # Per the original note they are joined in from DB ask_events during
        # aggregation (filtered on X-Source=eval).
        "prompt_version": None,
        "model_name": None,
    }
def build_row_analyze(item: dict, api: ApiResult) -> dict:
    """Build the result row for one ``analyze`` item from its API outcome.

    Args:
        item: Eval-set record; ``id`` and ``query`` are required, while
            ``category``, ``golden``, ``doc_id`` and ``expected_layers`` are
            optional.
        api: Outcome of the analyze call.

    Returns:
        A dict holding every RESULT_FIELDS key. Ask-only fields are ``None``.
    """
    # Any body that is not a dict (None, list, ...) yields no response fields.
    payload = api.body if isinstance(api.body, dict) else {}
    layers = payload.get("layers")
    layer_dicts = (
        [la for la in layers if isinstance(la, dict)]
        if isinstance(layers, list)
        else []
    )

    # Layer names present in the response (entries without a "layer" value are skipped).
    layers_returned: list[str] = [la["layer"] for la in layer_dicts if la.get("layer")]

    # The response text is all non-empty layer contents joined by blank lines;
    # it is what answer_length is measured on.
    contents = [la.get("content") for la in layer_dicts]
    response_text = "\n\n".join(text for text in contents if text)

    expected_layers = item.get("expected_layers") or []
    layers_hit = {name: name in layers_returned for name in expected_layers}

    # An error is either a transport failure or a non-2xx HTTP status.
    if api.error is not None:
        error_msg = api.error
    elif api.status and not 200 <= api.status < 300:
        error_msg = f"http_{api.status}"
    else:
        error_msg = None

    # Status 0 means no HTTP response was received, so there is no code to map.
    error_code = analyze_error_code_from_status(api.status) if api.status else None

    return {
        "id": item["id"],
        "type": "analyze",
        "category": item.get("category"),
        "golden": item.get("golden", False),
        "doc_id": item.get("doc_id"),
        "query": item["query"],
        "response": response_text or None,
        "latency_ms": round(api.latency_ms, 1),
        "error": error_msg,
        "error_code": error_code,
        "answer_length": len(response_text) or None,
        "citation_count": None,
        "completeness": None,
        "refused": None,
        "critical_keywords_hit": None,
        "layers_count": len(layers_returned),
        "expected_layers_hit": layers_hit,
        "truncated": payload.get("truncated"),
        "cached": payload.get("cached"),
        "prompt_version": None,
        "model_name": None,
    }
# ─────────────────────────────────────────────────────────
# 필터링
# ─────────────────────────────────────────────────────────
def load_items(
path: Path,
only_golden: bool,
only_type: str | None,
start_from: str | None,
limit: int | None,
) -> list[dict]:
items: list[dict] = []
started = start_from is None
with path.open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
d = json.loads(line)
if only_golden and not d.get("golden"):
continue
if only_type and d.get("type") != only_type:
continue
if not started:
if d.get("id") == start_from:
started = True
else:
continue
items.append(d)
if limit is not None and len(items) >= limit:
break
return items
# ─────────────────────────────────────────────────────────
# Main loop
# ─────────────────────────────────────────────────────────
async def run_eval(
    items: list[dict], base_url: str, token: str, source: str,
    output_path: Path, min_interval: float, retries: int, retry_delay: float,
) -> None:
    """Run every item sequentially and write one JSONL result row per item.

    The output file is opened in write mode, so an existing file at
    ``output_path`` is replaced. Each row is flushed as soon as it is written.
    Per-item progress goes to stdout and the final summary to stderr.

    Args:
        items: Eval-set records to process, in order.
        base_url: Server base URL.
        token: Bearer token for the API.
        source: Value sent in the ``X-Source`` header.
        output_path: Result JSONL path; parent directories are created.
        min_interval: Minimum seconds between the starts of consecutive items.
        retries: Retry count passed to ``call_with_retry``.
        retry_delay: Retry delay (seconds) passed to ``call_with_retry``.
    """

    def placeholder_row(entry: dict, reason: str) -> dict:
        # Row for an item that is never sent to the server.
        blank = dict.fromkeys(RESULT_FIELDS)
        blank.update({
            "id": entry.get("id", "?"),
            "type": entry.get("type", "?"),
            "category": entry.get("category", "?"),
            "golden": entry.get("golden", False),
            "query": entry.get("query"),
            "error": reason,
            "latency_ms": 0.0,
        })
        return blank

    output_path.parent.mkdir(parents=True, exist_ok=True)
    total = len(items)
    errors = 0

    async with httpx.AsyncClient() as client:
        with output_path.open("w", encoding="utf-8") as out:
            previous_start = 0.0
            for idx, item in enumerate(items, start=1):
                # Enforce the minimum spacing between item starts.
                remaining_wait = min_interval - (time.perf_counter() - previous_start)
                if remaining_wait > 0:
                    await asyncio.sleep(remaining_wait)

                iid = item.get("id", "?")
                itype = item.get("type", "?")
                icat = item.get("category", "?")
                previous_start = time.perf_counter()

                if itype == "ask":
                    api = await call_with_retry(
                        call_ask, client, base_url, token, source, item["query"],
                        retries=retries, retry_delay=retry_delay,
                    )
                    row = build_row_ask(item, api)
                elif itype != "analyze":
                    row = placeholder_row(item, f"unknown_type:{itype}")
                elif item.get("doc_id") is None:
                    row = placeholder_row(item, "doc_id_missing")
                else:
                    api = await call_with_retry(
                        call_analyze, client, base_url, token, source, item.get("doc_id"),
                        retries=retries, retry_delay=retry_delay,
                    )
                    row = build_row_analyze(item, api)

                failed = bool(row.get("error"))
                if failed:
                    errors += 1

                # Serialize in RESULT_FIELDS order.
                ordered = {field: row.get(field) for field in RESULT_FIELDS}
                out.write(json.dumps(ordered, ensure_ascii=False) + "\n")
                out.flush()

                latency = row.get("latency_ms") or 0
                err_mark = " ERR" if failed else ""
                print(
                    f"[{idx}/{total}] {iid} ({itype}/{icat}) "
                    f"{latency:.0f}ms{err_mark}",
                    flush=True,
                )

    print(f"\nDone. total={total}, errors={errors}", file=sys.stderr)
# ─────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the runner."""
    parser = argparse.ArgumentParser(description="ask/analyze 평가 runner (Phase E 묶음 B)")
    add = parser.add_argument
    add("--eval-file", type=Path, required=True, help="평가셋 JSONL 경로")
    add("--base-url", type=str, required=True, help="Document Server base URL")
    add(
        "--token", type=str, default=os.environ.get("DOCSRV_TOKEN"),
        help="Bearer 토큰 (env DOCSRV_TOKEN)",
    )
    add("--source", type=str, default="eval", help="X-Source 헤더 값 (default: eval)")
    add("--concurrency", type=int, default=1, help="동시 요청 수 (현재 1 고정)")
    add("--min-interval", type=float, default=0.3, help="요청 간 최소 간격(초)")
    add("--retries", type=int, default=1, help="timeout/5xx 재시도 횟수")
    add("--retry-delay", type=float, default=5.0, help="재시도 delay(초)")
    add("--output", type=Path, required=True, help="결과 JSONL 출력 경로")
    add("--start-from", type=str, default=None, help="이 ID 부터 실행 (resume)")
    add("--only-golden", action="store_true", help="golden: true 만 필터")
    add(
        "--only-type", type=str, default=None, choices=["ask", "analyze"],
        help="type 필터 (ask 또는 analyze)",
    )
    add(
        "--limit", type=int, default=None,
        help="처리 아이템 수 상한 (smoke test 용)",
    )
    return parser


def main() -> int:
    """Parse CLI arguments, load the eval set and run the evaluation.

    Returns:
        Process exit code: 0 after a completed run, 2 when the token is
        missing or no item matches the filters.
    """
    args = _build_parser().parse_args()

    if not args.token:
        print("ERROR: --token 또는 env DOCSRV_TOKEN 필요", file=sys.stderr)
        return 2

    # Requests are always issued one at a time; any other value is only reported.
    if args.concurrency != 1:
        print("NOTE: concurrency != 1 은 현재 미지원. 1로 동작합니다.", file=sys.stderr)

    items = load_items(
        args.eval_file,
        only_golden=args.only_golden,
        only_type=args.only_type,
        start_from=args.start_from,
        limit=args.limit,
    )
    if not items:
        print("ERROR: 필터 조건에 맞는 아이템 없음", file=sys.stderr)
        return 2

    print(
        f"Loaded {len(items)} items from {args.eval_file} "
        f"(golden={args.only_golden}, type={args.only_type or 'all'}, start_from={args.start_from or '-'})",
        file=sys.stderr,
    )

    evaluation = run_eval(
        items, args.base_url, args.token, args.source,
        args.output, args.min_interval, args.retries, args.retry_delay,
    )
    asyncio.run(evaluation)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())