#!/usr/bin/env python3
"""Phase E bundle B — ask/analyze evaluation runner.

Reads evals/ask_analyze_v1.jsonl, calls the /ask (GET) and
/{doc_id}/analyze (POST) endpoints for every item, and writes the raw
measurements to a JSONL file. Aggregation (E.5) happens in a separate
script; this runner stores raw values only.

Every request carries an ``X-Source`` header (default ``eval``) so the
server can record it in DB ask_events / analyze_events under source='eval'.
According to the warning emitted by ``main``, the server only accepts that
claim when an ``X-Eval-Token`` is sent as well; otherwise it falls back to
source='document_server'.
"""

from __future__ import annotations

import argparse
import asyncio
import json
import os
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import httpx

# ─────────────────────────────────────────────────────────
# Result JSONL schema (order is fixed — E.5 aggregation relies on it)
# ─────────────────────────────────────────────────────────
RESULT_FIELDS: list[str] = [
    "id",
    "type",
    "category",
    "golden",
    "doc_id",
    "query",
    "response",
    "latency_ms",
    "error",
    "error_code",
    "answer_length",
    "citation_count",
    "completeness",
    "refused",
    "critical_keywords_hit",
    "layers_count",
    "expected_layers_hit",
    "truncated",
    "cached",
    "prompt_version",
    "model_name",
]  # 21 fields, order locked

# ─────────────────────────────────────────────────────────
# Normalization (used to compute critical_keywords_hit)
# ─────────────────────────────────────────────────────────
_NORMALIZE_REMOVE = re.compile(r"[\s·()\[\]{}.,;:!?\"'`]")


def normalize(text: str) -> str:
    """Remove whitespace, middle dots, brackets and punctuation, then lowercase.

    No morphological analysis is performed.

    Example: "유해·위험요인 (risk)" -> "유해위험요인risk"
    """
    return _NORMALIZE_REMOVE.sub("", text).lower()


def keywords_hit(answer: str, keywords: list[str]) -> dict[str, bool]:
    """Map each keyword to whether it occurs in *answer* after normalization.

    Matching is a plain substring test between the normalized keyword and the
    normalized answer. A keyword that normalizes to the empty string (e.g.
    "" or "()") is reported as a miss: the empty string is a substring of
    every answer, including empty/failed ones, so counting it as a hit would
    inflate the metric.
    """
    norm_answer = normalize(answer or "")
    hits: dict[str, bool] = {}
    for kw in keywords:
        norm_kw = normalize(kw)
        hits[kw] = bool(norm_kw) and norm_kw in norm_answer
    return hits


# ─────────────────────────────────────────────────────────
# HTTP status -> analyze error_code (approximation, not identical to the DB)
# ─────────────────────────────────────────────────────────
def analyze_error_code_from_status(status: int) -> str | None:
    """Approximate the /analyze error_code from an HTTP status.

    This is a heuristic; the authoritative value is in DB analyze_events.
    Returns None for statuses below 400 (including 0, i.e. no response).
    """
    if status == 404:
        return "not_found_or_no_text"  # 404: document missing or it has no text
    if status == 504:
        return "timeout"
    if status == 502:
        return "llm"
    if status == 422:
        return "parse_or_missing_summary"
    if status >= 500:
        return "server_error"
    if status >= 400:
        return f"http_{status}"
    return None


# ─────────────────────────────────────────────────────────
# API calls (with retry)
# ─────────────────────────────────────────────────────────
@dataclass
class ApiResult:
    """Outcome of a single HTTP call."""

    status: int  # HTTP status code; 0 when no response was received
    body: Any  # decoded JSON body, or None if the body was not valid JSON
    latency_ms: float  # wall time until the response (or failure) arrived
    error: str | None = None  # "<ExceptionType>: <message>" on transport failure


def _build_headers(
    token: str,
    source: str,
    eval_token: str | None,
    eval_case_id: str | None,
) -> dict[str, str]:
    """Build the auth/source headers shared by /ask and /analyze calls."""
    headers = {
        "Authorization": f"Bearer {token}",
        "X-Source": source,
    }
    if eval_token:
        headers["X-Eval-Token"] = eval_token
    if eval_case_id:
        headers["X-Eval-Case-Id"] = eval_case_id
    return headers


async def _timed_request(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    *,
    headers: dict[str, str],
    timeout: float,
    params: dict[str, str] | None = None,
) -> ApiResult:
    """Send one request and return its status, JSON body and latency.

    Never raises for transport problems: any exception while sending is
    recorded in ``ApiResult.error`` with ``status=0`` so the eval run can
    continue with the next item.
    """
    start = time.perf_counter()
    try:
        resp = await client.request(
            method, url, headers=headers, params=params, timeout=timeout
        )
    except Exception as exc:  # deliberate boundary: record the failure and move on
        latency_ms = (time.perf_counter() - start) * 1000
        return ApiResult(
            status=0,
            body=None,
            latency_ms=latency_ms,
            error=f"{type(exc).__name__}: {exc}",
        )
    # Latency covers the request/response only, not JSON decoding.
    latency_ms = (time.perf_counter() - start) * 1000
    try:
        body = resp.json()
    except ValueError:  # includes json.JSONDecodeError / UnicodeDecodeError
        body = None
    return ApiResult(status=resp.status_code, body=body, latency_ms=latency_ms)


async def call_ask(
    client: httpx.AsyncClient,
    base_url: str,
    token: str,
    source: str,
    query: str,
    eval_token: str | None = None,
    eval_case_id: str | None = None,
) -> ApiResult:
    """GET ``{base_url}/api/search/ask?q=<query>`` with a 60 s client timeout."""
    url = f"{base_url.rstrip('/')}/api/search/ask"
    return await _timed_request(
        client,
        "GET",
        url,
        headers=_build_headers(token, source, eval_token, eval_case_id),
        params={"q": query},
        timeout=60.0,
    )


async def call_analyze(
    client: httpx.AsyncClient,
    base_url: str,
    token: str,
    source: str,
    doc_id: int,
    eval_token: str | None = None,
    eval_case_id: str | None = None,
) -> ApiResult:
    """POST ``{base_url}/api/documents/{doc_id}/analyze`` with a 130 s timeout."""
    url = f"{base_url.rstrip('/')}/api/documents/{doc_id}/analyze"
    # The server's internal analyze timeout is 60 s (ANALYZE_TIMEOUT_S);
    # the client allows 130 s to leave headroom.
    return await _timed_request(
        client,
        "POST",
        url,
        headers=_build_headers(token, source, eval_token, eval_case_id),
        timeout=130.0,
    )


async def call_with_retry(
    fn,
    *args,
    retries: int = 1,
    retry_delay: float = 5.0,
) -> ApiResult:
    """Call ``fn(*args)``, retrying transport errors and 5xx responses.

    Up to ``retries`` extra attempts are made, sleeping ``retry_delay``
    seconds before each one. 2xx/3xx and 4xx results are returned
    immediately (4xx is never retried). The last result is returned once
    retries are exhausted.
    """
    attempt = 0
    while True:
        result: ApiResult = await fn(*args)
        # Success (2xx/3xx) or client error (4xx): return as-is.
        if result.error is None and 200 <= result.status < 400:
            return result
        if result.status and 400 <= result.status < 500:
            return result
        # Transport error (status 0) or 5xx: retry if attempts remain.
        if attempt >= retries:
            return result
        attempt += 1
        await asyncio.sleep(retry_delay)


# ─────────────────────────────────────────────────────────
# Per-item processing -> RESULT_FIELDS dict
# ─────────────────────────────────────────────────────────
def _error_message(api: ApiResult) -> str | None:
    """Return the transport error, ``http_<status>`` for non-2xx, else None."""
    if api.error is not None:
        return api.error
    if api.status and not (200 <= api.status < 300):
        return f"http_{api.status}"
    return None


def build_row_ask(item: dict, api: ApiResult) -> dict:
    """Turn an /ask eval item and its API result into a result row."""
    body = api.body if isinstance(api.body, dict) else {}
    answer = body.get("ai_answer")
    citations = body.get("citations")
    completeness = body.get("completeness")
    refused = body.get("refused")
    # Keyword matching needs text; a missing or non-string answer matches nothing.
    answer_text = answer if isinstance(answer, str) else ""

    row = {
        "id": item["id"],
        "type": "ask",
        "category": item.get("category"),
        "golden": item.get("golden", False),
        "doc_id": None,
        "query": item["query"],
        "response": answer,
        "latency_ms": round(api.latency_ms, 1),
        "error": _error_message(api),
        # /ask has no separate error_code; the DB judges via refused/completeness.
        "error_code": None,
        "answer_length": len(answer) if isinstance(answer, str) else None,
        "citation_count": len(citations) if isinstance(citations, list) else None,
        "completeness": completeness,
        "refused": refused,
        "critical_keywords_hit": keywords_hit(
            answer_text, item.get("critical_keywords", []) or []
        ),
        "layers_count": None,
        "expected_layers_hit": None,
        "truncated": None,
        "cached": None,
        # prompt_version / model_name are not exposed by the API; aggregation
        # joins them from DB ask_events (filtered by X-Source=eval).
        "prompt_version": None,
        "model_name": None,
    }
    return row


def build_row_analyze(item: dict, api: ApiResult) -> dict:
    """Turn an /analyze eval item and its API result into a result row."""
    body = api.body if isinstance(api.body, dict) else {}
    layers = body.get("layers")
    truncated = body.get("truncated")
    cached = body.get("cached")

    layer_dicts = (
        [la for la in layers if isinstance(la, dict)] if isinstance(layers, list) else []
    )
    # Layer names returned by the server (entries without a "layer" are ignored).
    layers_returned = [la["layer"] for la in layer_dicts if la.get("layer")]
    # Full response text = non-empty layer contents joined (used for answer_length).
    response_text = "\n\n".join(
        la["content"]
        for la in layer_dicts
        if isinstance(la.get("content"), str) and la["content"]
    )

    expected_layers = item.get("expected_layers") or []
    layers_hit = {el: (el in layers_returned) for el in expected_layers}

    error_code = analyze_error_code_from_status(api.status) if api.status else None

    row = {
        "id": item["id"],
        "type": "analyze",
        "category": item.get("category"),
        "golden": item.get("golden", False),
        "doc_id": item.get("doc_id"),
        "query": item["query"],
        "response": response_text or None,
        "latency_ms": round(api.latency_ms, 1),
        "error": _error_message(api),
        "error_code": error_code,
        "answer_length": len(response_text) if response_text else None,
        "citation_count": None,
        "completeness": None,
        "refused": None,
        "critical_keywords_hit": None,
        "layers_count": len(layers_returned),
        "expected_layers_hit": layers_hit,
        "truncated": truncated,
        "cached": cached,
        "prompt_version": None,
        "model_name": None,
    }
    return row


# ─────────────────────────────────────────────────────────
# Filtering
# ─────────────────────────────────────────────────────────
def load_items(
    path: Path,
    only_golden: bool,
    only_type: str | None,
    start_from: str | None,
    limit: int | None,
) -> list[dict]:
    """Load eval items from a JSONL file, applying the CLI filters.

    Filters are applied in order: golden flag, type, then ``start_from``
    (items are skipped until one whose ``id`` equals ``start_from`` is seen
    among the already-filtered items), then ``limit``. Blank lines are
    ignored. A ``limit`` of 0 or less yields an empty list.
    """
    items: list[dict] = []
    if limit is not None and limit <= 0:
        # The limit is checked after each append below, so a non-positive
        # limit must be handled up front or one item would slip through.
        return items

    started = start_from is None
    with path.open(encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line:
                continue
            d = json.loads(line)
            if only_golden and not d.get("golden"):
                continue
            if only_type and d.get("type") != only_type:
                continue
            if not started:
                if d.get("id") != start_from:
                    continue
                started = True
            items.append(d)
            if limit is not None and len(items) >= limit:
                break
    return items


# ─────────────────────────────────────────────────────────
# Main loop
# ─────────────────────────────────────────────────────────
def _placeholder_row(item: dict, iid: Any, itype: Any, icat: Any, error: str) -> dict:
    """Build a result row for an item that was not sent (no HTTP call made)."""
    return {
        **dict.fromkeys(RESULT_FIELDS),
        "id": iid,
        "type": itype,
        "category": icat,
        "golden": item.get("golden", False),
        "query": item.get("query"),
        "error": error,
        "latency_ms": 0.0,
    }


async def run_eval(
    items: list[dict],
    base_url: str,
    token: str,
    source: str,
    output_path: Path,
    min_interval: float,
    retries: int,
    retry_delay: float,
    eval_token: str | None = None,
    append: bool = False,
) -> None:
    """Run every item sequentially and write one JSONL row per item.

    Args:
        items: eval items as returned by ``load_items``.
        base_url, token, source, eval_token: forwarded to the API calls.
        output_path: result JSONL path (parent directories are created).
        min_interval: minimum seconds between the starts of consecutive items.
        retries, retry_delay: forwarded to ``call_with_retry``.
        append: append to ``output_path`` instead of truncating it; used when
            resuming with ``--start-from`` so earlier rows are kept.

    Rows are flushed after every item, so an interrupted run keeps its
    completed rows. Progress goes to stdout, the final summary to stderr.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    total = len(items)
    errors = 0
    mode = "a" if append else "w"

    async with httpx.AsyncClient() as client:
        with output_path.open(mode, encoding="utf-8") as out:
            # None until the first item so it never waits; perf_counter's
            # reference point is undefined, so 0.0 is not a safe baseline.
            last_request_ts: float | None = None
            for idx, item in enumerate(items, start=1):
                # Enforce min_interval between consecutive item starts.
                if last_request_ts is not None:
                    gap = time.perf_counter() - last_request_ts
                    if gap < min_interval:
                        await asyncio.sleep(min_interval - gap)

                iid = item.get("id", "?")
                itype = item.get("type", "?")
                icat = item.get("category", "?")
                last_request_ts = time.perf_counter()

                if itype == "ask":
                    api = await call_with_retry(
                        call_ask,
                        client,
                        base_url,
                        token,
                        source,
                        item["query"],
                        eval_token,
                        iid,
                        retries=retries,
                        retry_delay=retry_delay,
                    )
                    row = build_row_ask(item, api)
                elif itype == "analyze":
                    doc_id = item.get("doc_id")
                    if doc_id is None:
                        row = _placeholder_row(item, iid, "analyze", icat, "doc_id_missing")
                    else:
                        api = await call_with_retry(
                            call_analyze,
                            client,
                            base_url,
                            token,
                            source,
                            doc_id,
                            eval_token,
                            iid,
                            retries=retries,
                            retry_delay=retry_delay,
                        )
                        row = build_row_analyze(item, api)
                else:
                    row = _placeholder_row(item, iid, itype, icat, f"unknown_type:{itype}")

                if row.get("error"):
                    errors += 1

                # Serialize in RESULT_FIELDS order.
                ordered = {k: row.get(k) for k in RESULT_FIELDS}
                out.write(json.dumps(ordered, ensure_ascii=False) + "\n")
                out.flush()

                latency = row.get("latency_ms") or 0
                err_mark = " ERR" if row.get("error") else ""
                print(
                    f"[{idx}/{total}] {iid} ({itype}/{icat}) "
                    f"{latency:.0f}ms{err_mark}",
                    flush=True,
                )

    print(f"\nDone. total={total}, errors={errors}", file=sys.stderr)


# ─────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────
def main() -> int:
    """Parse CLI arguments, load the eval set and run it.

    Returns the process exit code: 0 after a completed run, 2 on invalid
    input (missing token, missing eval file, or no items after filtering).
    """
    parser = argparse.ArgumentParser(description="ask/analyze 평가 runner (Phase E 묶음 B)")
    parser.add_argument("--eval-file", type=Path, required=True, help="평가셋 JSONL 경로")
    parser.add_argument("--base-url", type=str, required=True, help="Document Server base URL")
    parser.add_argument(
        "--token",
        type=str,
        default=os.environ.get("DOCSRV_TOKEN"),
        help="Bearer 토큰 (env DOCSRV_TOKEN)",
    )
    parser.add_argument("--source", type=str, default="eval", help="X-Source 헤더 값 (default: eval)")
    parser.add_argument(
        "--eval-token",
        type=str,
        default=os.environ.get("EVAL_RUNNER_TOKEN"),
        help="X-Eval-Token 헤더 값 (env EVAL_RUNNER_TOKEN 자동 fallback). "
        "미지정 시 서버가 eval claim 거부 → source='document_server' 로 강등.",
    )
    parser.add_argument("--concurrency", type=int, default=1, help="동시 요청 수 (현재 1 고정)")
    parser.add_argument("--min-interval", type=float, default=0.3, help="요청 간 최소 간격(초)")
    parser.add_argument("--retries", type=int, default=1, help="timeout/5xx 재시도 횟수")
    parser.add_argument("--retry-delay", type=float, default=5.0, help="재시도 delay(초)")
    parser.add_argument("--output", type=Path, required=True, help="결과 JSONL 출력 경로")
    parser.add_argument(
        "--start-from",
        type=str,
        default=None,
        help="이 ID 부터 실행 (resume — 기존 --output 파일에 이어쓰기)",
    )
    parser.add_argument("--only-golden", action="store_true", help="golden: true 만 필터")
    parser.add_argument(
        "--only-type",
        type=str,
        default=None,
        choices=["ask", "analyze"],
        help="type 필터 (ask 또는 analyze)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="처리 아이템 수 상한 (smoke test 용)",
    )
    args = parser.parse_args()

    if not args.token:
        print("ERROR: --token 또는 env DOCSRV_TOKEN 필요", file=sys.stderr)
        return 2
    if args.concurrency != 1:
        print("NOTE: concurrency != 1 은 현재 미지원. 1로 동작합니다.", file=sys.stderr)
    if not args.eval_file.is_file():
        print(f"ERROR: 평가셋 파일 없음: {args.eval_file}", file=sys.stderr)
        return 2

    items = load_items(
        args.eval_file,
        only_golden=args.only_golden,
        only_type=args.only_type,
        start_from=args.start_from,
        limit=args.limit,
    )
    if not items:
        print("ERROR: 필터 조건에 맞는 아이템 없음", file=sys.stderr)
        return 2

    print(
        f"Loaded {len(items)} items from {args.eval_file} "
        f"(golden={args.only_golden}, type={args.only_type or 'all'}, start_from={args.start_from or '-'})",
        file=sys.stderr,
    )

    # Resuming must not truncate the rows written by the interrupted run.
    resume = args.start_from is not None
    if resume:
        print(f"NOTE: --start-from 지정 → {args.output} 에 이어쓰기(append)", file=sys.stderr)

    if args.source == "eval" and not args.eval_token:
        print(
            "WARNING: --source=eval 인데 --eval-token 미지정. "
            "서버가 X-Source=eval 을 거부하고 source='document_server' 로 강등합니다.",
            file=sys.stderr,
        )

    asyncio.run(
        run_eval(
            items,
            args.base_url,
            args.token,
            args.source,
            args.output,
            args.min_interval,
            args.retries,
            args.retry_delay,
            eval_token=args.eval_token,
            append=resume,
        )
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())