Files
hyungi_document_server/scripts/run_eval_ask.py
Hyungi Ahn c9f766512d feat(eval): run_eval_ask runner 에 X-Eval-Token/X-Eval-Case-Id 전파 추가
배경: Phase 3.5 fix2 로 서버 /ask 는 X-Source=eval 을 받아들이려면
X-Eval-Token 이 EVAL_RUNNER_TOKEN 과 일치해야 함. runner 에 해당 헤더
주입 경로가 없어 eval 호출이 전부 source='document_server' 로 강등됐음.

변경:
- call_ask / call_analyze: eval_token, eval_case_id 인자 추가. 조건부 헤더 주입
- run_eval: eval_token 파라미터 추가
- CLI: --eval-token 플래그 추가 (env EVAL_RUNNER_TOKEN 자동 fallback)
- main(): --source=eval + --eval-token 미지정 조합에 warning 출력
- eval_case_id 는 item id 자동 전달 → ask_events.eval_case_id join 키로 활용

E.6 재측정의 source='eval' 정확 기록 선결 조건.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 09:12:24 +09:00

470 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
"""Phase E 묶음 B — ask/analyze 평가 runner.
evals/ask_analyze_v1.jsonl 을 읽어 /ask (GET) + /{doc_id}/analyze (POST)
엔드포인트에 호출하고 원시 측정값을 JSONL 로 기록한다.
집계(E.5)는 별도 스크립트에서 수행. 이 runner 는 "raw 값만" 저장한다.
X-Source: eval 헤더를 박아 DB ask_events/analyze_events 에 source='eval' 로 분리 기록됨.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import httpx
# ─────────────────────────────────────────────────────────
# Result JSONL schema. Column order is part of the output contract:
# the E.5 aggregation step reads rows assuming exactly this order.
# ─────────────────────────────────────────────────────────
RESULT_FIELDS: list[str] = [
    # identity / input
    "id",
    "type",
    "category",
    "golden",
    "doc_id",
    "query",
    # raw outcome of the call
    "response",
    "latency_ms",
    "error",
    "error_code",
    # answer size (filled for both ask and analyze rows)
    "answer_length",
    # ask-only measurements (None on analyze rows)
    "citation_count",
    "completeness",
    "refused",
    "critical_keywords_hit",
    # analyze-only measurements (None on ask rows)
    "layers_count",
    "expected_layers_hit",
    "truncated",
    "cached",
    # not exposed by the API; always None in this runner's output
    "prompt_version",
    "model_name",
]  # 21 fields; order is locked
# ─────────────────────────────────────────────────────────
# Normalization (used to compute critical_keywords_hit)
# ─────────────────────────────────────────────────────────
_NORMALIZE_REMOVE = re.compile(r"[\s·()\[\]{}.,;:!?\"'`]")


def normalize(text: str) -> str:
    """Strip whitespace, middle dots, brackets and punctuation, then lowercase.

    This is a purely character-level cleanup (no morphological analysis), so
    keyword matching ignores spacing and punctuation differences.

    Example: "유해·위험요인 (risk)" -> "유해위험요인risk"
    """
    return _NORMALIZE_REMOVE.sub("", text).lower()


def keywords_hit(answer: str, keywords: list[str]) -> dict[str, bool]:
    """Report, per keyword, whether it occurs in *answer* after normalization.

    Both the answer and each keyword go through :func:`normalize` and are
    compared by substring containment. A keyword that normalizes to an empty
    string (e.g. "" or " · ") is reported as a miss: the empty string is a
    substring of every answer, so treating it as a hit would inflate the
    metric for every row.

    Args:
        answer: Answer text; None/empty is treated as "".
        keywords: Keywords to look for.

    Returns:
        Dict keyed by the original (un-normalized) keyword -> hit flag.
    """
    norm_answer = normalize(answer or "")
    hits: dict[str, bool] = {}
    for kw in keywords:
        norm_kw = normalize(kw)
        # Empty normalized keyword would trivially match; count it as a miss.
        hits[kw] = bool(norm_kw) and norm_kw in norm_answer
    return hits
# ─────────────────────────────────────────────────────────
# HTTP status -> analyze error_code estimate (an approximation; it may not
# exactly match what the server records in the DB)
# ─────────────────────────────────────────────────────────
def analyze_error_code_from_status(status: int) -> str | None:
    """Map an /analyze HTTP status to an approximate error_code label.

    The authoritative value lives in the DB ``analyze_events`` table; this is
    only a client-side estimate. A few statuses have dedicated labels; any
    other 5xx becomes "server_error", any other 4xx becomes "http_<status>",
    and anything below 400 (including 0) yields None.
    """
    dedicated = {
        404: "not_found_or_no_text",  # document missing, or it has no text
        422: "parse_or_missing_summary",
        502: "llm",
        504: "timeout",
    }
    label = dedicated.get(status)
    if label is not None:
        return label
    if status >= 500:
        return "server_error"
    return f"http_{status}" if status >= 400 else None
# ─────────────────────────────────────────────────────────
# API calls (with retries)
# ─────────────────────────────────────────────────────────
@dataclass
class ApiResult:
    """Outcome of a single HTTP call made by this runner.

    The call helpers in this file set ``status`` to 0 when the request
    raised instead of producing a response; ``error`` then holds
    "<ExceptionType>: <message>".
    """

    # HTTP status code, or 0 when no response was received
    status: int
    # decoded JSON body, or None when the body could not be parsed as JSON
    body: Any
    # elapsed wall-clock time of the request, in milliseconds
    latency_ms: float
    # only populated when the request itself raised
    error: str | None = None
async def call_ask(
    client: httpx.AsyncClient, base_url: str, token: str, source: str,
    query: str,
    eval_token: str | None = None, eval_case_id: str | None = None,
) -> ApiResult:
    """GET ``/api/search/ask?q=<query>`` and capture status, JSON body and latency.

    Headers sent:
      * ``Authorization: Bearer <token>``
      * ``X-Source: <source>``
      * ``X-Eval-Token`` only when ``eval_token`` is non-empty.
      * ``X-Eval-Case-Id`` only when ``eval_case_id`` is non-empty after
        ``str()`` conversion. httpx accepts only str/bytes header values, so
        a non-string case id (e.g. a numeric ``id`` from the eval JSONL) is
        converted here rather than making every request fail.

    Request failures never raise: any exception during the request yields
    ``ApiResult(status=0, body=None, error="<Type>: <msg>")``. A response
    body that is not valid JSON is recorded as ``None``.
    """
    url = f"{base_url.rstrip('/')}/api/search/ask"
    headers = {
        "Authorization": f"Bearer {token}",
        "X-Source": source,
    }
    if eval_token:
        headers["X-Eval-Token"] = eval_token
    case_id = "" if eval_case_id is None else str(eval_case_id)
    if case_id:
        headers["X-Eval-Case-Id"] = case_id
    params = {"q": query}
    start = time.perf_counter()
    try:
        resp = await client.get(url, headers=headers, params=params, timeout=60.0)
        # Latency covers the HTTP round trip only, not JSON decoding.
        latency_ms = (time.perf_counter() - start) * 1000
        try:
            body = resp.json()
        except Exception:
            # Best-effort decode: a non-JSON body is kept as None.
            body = None
        return ApiResult(status=resp.status_code, body=body, latency_ms=latency_ms)
    except Exception as exc:
        # Transport-level failure (timeout, connection error, ...): record it
        # as status 0 so call_with_retry can decide whether to retry.
        latency_ms = (time.perf_counter() - start) * 1000
        return ApiResult(status=0, body=None, latency_ms=latency_ms, error=f"{type(exc).__name__}: {exc}")
async def call_analyze(
    client: httpx.AsyncClient, base_url: str, token: str, source: str,
    doc_id: int,
    eval_token: str | None = None, eval_case_id: str | None = None,
) -> ApiResult:
    """POST ``/api/documents/{doc_id}/analyze`` and capture status, JSON body and latency.

    Headers sent:
      * ``Authorization: Bearer <token>``
      * ``X-Source: <source>``
      * ``X-Eval-Token`` only when ``eval_token`` is non-empty.
      * ``X-Eval-Case-Id`` only when ``eval_case_id`` is non-empty after
        ``str()`` conversion. httpx accepts only str/bytes header values, so
        a non-string case id (e.g. a numeric ``id`` from the eval JSONL) is
        converted here rather than making every request fail.

    Request failures never raise: any exception during the request yields
    ``ApiResult(status=0, body=None, error="<Type>: <msg>")``. A response
    body that is not valid JSON is recorded as ``None``.
    """
    url = f"{base_url.rstrip('/')}/api/documents/{doc_id}/analyze"
    headers = {
        "Authorization": f"Bearer {token}",
        "X-Source": source,
    }
    if eval_token:
        headers["X-Eval-Token"] = eval_token
    case_id = "" if eval_case_id is None else str(eval_case_id)
    if case_id:
        headers["X-Eval-Case-Id"] = case_id
    start = time.perf_counter()
    try:
        # Per the original note, the server applies its own 60 s analyze
        # timeout (ANALYZE_TIMEOUT_S); the client allows 130 s for headroom.
        resp = await client.post(url, headers=headers, timeout=130.0)
        # Latency covers the HTTP round trip only, not JSON decoding.
        latency_ms = (time.perf_counter() - start) * 1000
        try:
            body = resp.json()
        except Exception:
            # Best-effort decode: a non-JSON body is kept as None.
            body = None
        return ApiResult(status=resp.status_code, body=body, latency_ms=latency_ms)
    except Exception as exc:
        # Transport-level failure (timeout, connection error, ...): record it
        # as status 0 so call_with_retry can decide whether to retry.
        latency_ms = (time.perf_counter() - start) * 1000
        return ApiResult(status=0, body=None, latency_ms=latency_ms, error=f"{type(exc).__name__}: {exc}")
async def call_with_retry(
    fn, *args, retries: int = 1, retry_delay: float = 5.0,
) -> ApiResult:
    """Await ``fn(*args)``, retrying transport failures and 5xx responses.

    A result is final, and returned immediately, when it is either a success
    (no ``error`` and status in [200, 400)) or a client error (non-zero
    status in [400, 500)). Any other result, such as status 0 from a
    transport failure or a 5xx, is retried up to ``retries`` more times.
    Each retry is preceded by ``retry_delay`` seconds of sleep. When retries
    are exhausted, the last result is returned as-is.
    """

    def is_final(res) -> bool:
        succeeded = res.error is None and 200 <= res.status < 400
        client_error = bool(res.status) and 400 <= res.status < 500
        return succeeded or client_error

    remaining = retries
    while True:
        outcome = await fn(*args)
        if is_final(outcome) or remaining <= 0:
            return outcome
        remaining -= 1
        await asyncio.sleep(retry_delay)
# ─────────────────────────────────────────────────────────
# Per-item processing -> dict keyed by RESULT_FIELDS
# ─────────────────────────────────────────────────────────
def build_row_ask(item: dict, api: ApiResult) -> dict:
    """Build the result row for one /ask eval item.

    ``ai_answer``, ``citations``, ``completeness`` and ``refused`` are read
    from the JSON body when it is a dict; otherwise they are None.
    ``error`` is the transport error if there was one. Otherwise it is
    "http_<status>" for a non-2xx response, or None. Analyze-only
    columns are None.
    """
    raw = api.body or {}
    payload = raw if isinstance(raw, dict) else {}
    answer = payload.get("ai_answer")
    citations = payload.get("citations")

    error_msg = api.error
    if error_msg is None and api.status and (api.status < 200 or api.status >= 300):
        error_msg = f"http_{api.status}"

    keywords = item.get("critical_keywords") or []
    return {
        "id": item["id"],
        "type": "ask",
        "category": item.get("category"),
        "golden": item.get("golden", False),
        "doc_id": None,
        "query": item["query"],
        "response": answer,
        "latency_ms": round(api.latency_ms, 1),
        "error": error_msg,
        # /ask has no separate error_code; refused/completeness carry that signal.
        "error_code": None,
        "answer_length": len(answer) if isinstance(answer, str) else None,
        "citation_count": len(citations) if isinstance(citations, list) else None,
        "completeness": payload.get("completeness"),
        "refused": payload.get("refused"),
        "critical_keywords_hit": keywords_hit(answer or "", keywords),
        "layers_count": None,
        "expected_layers_hit": None,
        "truncated": None,
        "cached": None,
        # Not exposed in the API response; per the original note, these are
        # filled at aggregation time by joining DB ask_events (X-Source=eval).
        "prompt_version": None,
        "model_name": None,
    }
def build_row_analyze(item: dict, api: ApiResult) -> dict:
    """Build the result row for one /analyze eval item.

    Reads ``layers``, ``truncated`` and ``cached`` from the JSON body when it
    is a dict. From the dict entries of ``layers``:
      * the truthy ``layer`` names form the returned-layer list, which is used
        for ``layers_count`` and ``expected_layers_hit``;
      * the non-empty ``content`` strings, joined by blank lines, form
        ``response``, and their total length is ``answer_length``.
    ``error_code`` is the status-based estimate from
    ``analyze_error_code_from_status`` when a status is present.
    Ask-only columns are None.
    """
    raw = api.body or {}
    payload = raw if isinstance(raw, dict) else {}
    layers = payload.get("layers")

    layer_dicts = [la for la in layers if isinstance(la, dict)] if isinstance(layers, list) else []
    returned_names = [la["layer"] for la in layer_dicts if la.get("layer")]
    contents = (la.get("content") or "" for la in layer_dicts)
    response_text = "\n\n".join(chunk for chunk in contents if chunk)

    expected = item.get("expected_layers") or []
    expected_hit = {name: name in returned_names for name in expected}

    error_msg = api.error
    if error_msg is None and api.status and (api.status < 200 or api.status >= 300):
        error_msg = f"http_{api.status}"
    error_code = analyze_error_code_from_status(api.status) if api.status else None

    return {
        "id": item["id"],
        "type": "analyze",
        "category": item.get("category"),
        "golden": item.get("golden", False),
        "doc_id": item.get("doc_id"),
        "query": item["query"],
        "response": response_text or None,
        "latency_ms": round(api.latency_ms, 1),
        "error": error_msg,
        "error_code": error_code,
        "answer_length": len(response_text) if response_text else None,
        "citation_count": None,
        "completeness": None,
        "refused": None,
        "critical_keywords_hit": None,
        "layers_count": len(returned_names),
        "expected_layers_hit": expected_hit,
        "truncated": payload.get("truncated"),
        "cached": payload.get("cached"),
        "prompt_version": None,
        "model_name": None,
    }
# ─────────────────────────────────────────────────────────
# Filtering
# ─────────────────────────────────────────────────────────
def load_items(
    path: Path,
    only_golden: bool,
    only_type: str | None,
    start_from: str | None,
    limit: int | None,
) -> list[dict]:
    """Read eval items from a JSONL file, applying the CLI filters in order.

    Blank lines are skipped. Filters are applied in this order:

    1. ``only_golden``: keep items whose ``golden`` is truthy.
    2. ``only_type``: keep items whose ``type`` equals it exactly.
    3. ``start_from`` (resume): drop items until the first remaining item
       whose ``id`` equals ``start_from``. The id is compared as a string,
       so a numeric id in the file still matches the CLI value. That item
       and everything after it are kept. If no remaining item matches, the
       result is empty.

    At most ``limit`` items are returned. A ``limit`` of 0 or less returns
    an empty list; previously it still returned one item, because the limit
    was only checked after appending.

    Raises:
        json.JSONDecodeError: a non-blank line is not valid JSON.
    """
    if limit is not None and limit <= 0:
        return []
    items: list[dict] = []
    started = start_from is None
    with path.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            d = json.loads(line)
            if only_golden and not d.get("golden"):
                continue
            if only_type and d.get("type") != only_type:
                continue
            if not started:
                item_id = d.get("id")
                # CLI values are strings; ids in the file may be numbers.
                if item_id is None or str(item_id) != start_from:
                    continue
                started = True
            items.append(d)
            if limit is not None and len(items) >= limit:
                break
    return items
# ─────────────────────────────────────────────────────────
# Main loop
# ─────────────────────────────────────────────────────────
async def run_eval(
    items: list[dict], base_url: str, token: str, source: str,
    output_path: Path, min_interval: float, retries: int, retry_delay: float,
    eval_token: str | None = None,
) -> None:
    """Run every eval item sequentially and stream result rows to JSONL.

    Items are processed one at a time, with at least ``min_interval`` seconds
    between item start times. "ask" items go to ``call_ask`` and "analyze"
    items to ``call_analyze``, both through ``call_with_retry``. The item's
    ``id`` is forwarded as the eval case id. An analyze item without
    ``doc_id``, or an item of unknown type, produces a placeholder row with
    an error and is not sent.

    Each row is written in RESULT_FIELDS order and flushed immediately, so a
    partial file survives an interrupted run. A progress line per item goes
    to stdout, and a final summary goes to stderr.
    """

    def placeholder_row(item: dict, iid: object, itype: object, icat: object, error: str) -> dict:
        # Row for an item that never reached the server: everything None
        # except identity columns, the error, and zero latency.
        return {
            **dict.fromkeys(RESULT_FIELDS),
            "id": iid, "type": itype, "category": icat,
            "golden": item.get("golden", False),
            "query": item.get("query"),
            "error": error, "latency_ms": 0.0,
        }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    total = len(items)
    error_count = 0
    async with httpx.AsyncClient() as client:
        with output_path.open("w", encoding="utf-8") as out:
            prev_start = 0.0
            for position, item in enumerate(items, start=1):
                # Throttle: keep min_interval seconds between item start times.
                wait = min_interval - (time.perf_counter() - prev_start)
                if wait > 0:
                    await asyncio.sleep(wait)
                iid = item.get("id", "?")
                itype = item.get("type", "?")
                icat = item.get("category", "?")
                prev_start = time.perf_counter()

                if itype == "ask":
                    api = await call_with_retry(
                        call_ask, client, base_url, token, source, item["query"],
                        eval_token, iid,
                        retries=retries, retry_delay=retry_delay,
                    )
                    row = build_row_ask(item, api)
                elif itype != "analyze":
                    row = placeholder_row(item, iid, itype, icat, f"unknown_type:{itype}")
                elif item.get("doc_id") is None:
                    row = placeholder_row(item, iid, "analyze", icat, "doc_id_missing")
                else:
                    api = await call_with_retry(
                        call_analyze, client, base_url, token, source, item.get("doc_id"),
                        eval_token, iid,
                        retries=retries, retry_delay=retry_delay,
                    )
                    row = build_row_analyze(item, api)

                failed = bool(row.get("error"))
                if failed:
                    error_count += 1
                # Serialize in the locked RESULT_FIELDS order.
                ordered = {field: row.get(field) for field in RESULT_FIELDS}
                out.write(json.dumps(ordered, ensure_ascii=False) + "\n")
                out.flush()

                latency = row.get("latency_ms") or 0
                suffix = " ERR" if failed else ""
                print(f"[{position}/{total}] {iid} ({itype}/{icat}) {latency:.0f}ms{suffix}", flush=True)
    print(f"\nDone. total={total}, errors={error_count}", file=sys.stderr)
# ─────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────
def main() -> int:
    """Parse CLI arguments, load the eval items and run the evaluation.

    Returns the process exit code:
      * 0 when the run completes;
      * 2 on invalid input: no bearer token, eval file does not exist,
        ``--limit`` below 1, or no items left after filtering.
    Input problems get a clean message and exit code 2 instead of a
    traceback from deeper in the run.
    """
    parser = argparse.ArgumentParser(description="ask/analyze 평가 runner (Phase E 묶음 B)")
    parser.add_argument("--eval-file", type=Path, required=True, help="평가셋 JSONL 경로")
    parser.add_argument("--base-url", type=str, required=True, help="Document Server base URL")
    parser.add_argument(
        "--token", type=str, default=os.environ.get("DOCSRV_TOKEN"),
        help="Bearer 토큰 (env DOCSRV_TOKEN)",
    )
    parser.add_argument("--source", type=str, default="eval", help="X-Source 헤더 값 (default: eval)")
    parser.add_argument(
        "--eval-token", type=str, default=os.environ.get("EVAL_RUNNER_TOKEN"),
        help="X-Eval-Token 헤더 값 (env EVAL_RUNNER_TOKEN 자동 fallback). "
        "미지정 시 서버가 eval claim 거부 → source='document_server' 로 강등.",
    )
    parser.add_argument("--concurrency", type=int, default=1, help="동시 요청 수 (현재 1 고정)")
    parser.add_argument("--min-interval", type=float, default=0.3, help="요청 간 최소 간격(초)")
    parser.add_argument("--retries", type=int, default=1, help="timeout/5xx 재시도 횟수")
    parser.add_argument("--retry-delay", type=float, default=5.0, help="재시도 delay(초)")
    parser.add_argument("--output", type=Path, required=True, help="결과 JSONL 출력 경로")
    parser.add_argument("--start-from", type=str, default=None, help="이 ID 부터 실행 (resume)")
    parser.add_argument("--only-golden", action="store_true", help="golden: true 만 필터")
    parser.add_argument(
        "--only-type", type=str, default=None, choices=["ask", "analyze"],
        help="type 필터 (ask 또는 analyze)",
    )
    parser.add_argument(
        "--limit", type=int, default=None,
        help="처리 아이템 수 상한 (smoke test 용)",
    )
    args = parser.parse_args()
    if not args.token:
        print("ERROR: --token 또는 env DOCSRV_TOKEN 필요", file=sys.stderr)
        return 2
    if not args.eval_file.is_file():
        # Fail fast with the same exit code as other input errors instead of
        # a FileNotFoundError traceback from load_items.
        print(f"ERROR: 평가셋 파일 없음: {args.eval_file}", file=sys.stderr)
        return 2
    if args.limit is not None and args.limit < 1:
        print("ERROR: --limit 은 1 이상이어야 함", file=sys.stderr)
        return 2
    if args.concurrency != 1:
        # The runner is strictly sequential; the flag is accepted but ignored.
        print("NOTE: concurrency != 1 은 현재 미지원. 1로 동작합니다.", file=sys.stderr)
    items = load_items(
        args.eval_file,
        only_golden=args.only_golden,
        only_type=args.only_type,
        start_from=args.start_from,
        limit=args.limit,
    )
    if not items:
        print("ERROR: 필터 조건에 맞는 아이템 없음", file=sys.stderr)
        return 2
    print(
        f"Loaded {len(items)} items from {args.eval_file} "
        f"(golden={args.only_golden}, type={args.only_type or 'all'}, start_from={args.start_from or '-'})",
        file=sys.stderr,
    )
    if args.source == "eval" and not args.eval_token:
        # Without the token the server will not accept X-Source=eval.
        print(
            "WARNING: --source=eval 인데 --eval-token 미지정. "
            "서버가 X-Source=eval 을 거부하고 source='document_server' 로 강등합니다.",
            file=sys.stderr,
        )
    asyncio.run(
        run_eval(
            items, args.base_url, args.token, args.source,
            args.output, args.min_interval, args.retries, args.retry_delay,
            eval_token=args.eval_token,
        )
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())