"""Phase 3.5 calibration CLI — aggregate ask_events and generate a markdown report.

Usage:
    # Inside the Docker container (recommended — DATABASE_URL is injected automatically)
    docker compose exec fastapi python /app/scripts/calibrate_ask.py \\
        --source eval --prompt-version search_synthesis.v1-400char \\
        --run-label baseline_v1 --output reports/calibration_baseline_v1.md

    # Local (set DATABASE_URL; otherwise the built-in localhost default is used)
    python scripts/calibrate_ask.py --inspect-shape

Options:
    --source               eval / ui_search / ui_detail / document_server / ... (unset = all)
    --prompt-version       e.g. search_synthesis.v1-400char
    --since / --until      ISO8601, created_at range (since inclusive, until exclusive)
    --eval-split           tuning(200) / confirm(100) / all (deterministic, id-hash based);
                           only applied to the eval golden join (section 6)
    --run-label            label used in the report title / file names
    --output               .md path (default reports/calibration.md);
                           with --format json a sibling .json is written as well
    --format               md (human readable) | json (baseline for later comparison)
    --compare-against      path of a baseline .json to diff against
    --sample-limit         total FP candidate CSV rows (default 30, split across cases)
    --fp-artifacts         FP CSV path (default artifacts/fp_candidates_{run_label}.csv)
    --inspect-shape        print 5 defense_layers JSON samples and stop (Q0)
    --threshold-overrides  config/threshold_candidate.yaml — Step 0 feasibility is
                           unresolved, so this is not implemented
    --dry-run              no DB access; loads tests/calibrate_fixtures/sample_ask_events.json

Read-only: issues SELECT statements only (no INSERT/UPDATE/DELETE/ALTER).
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import hashlib
import json
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any

# Add the project root's app/ directory to the import path (seed_admin.py pattern).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))

from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession

# ─── paths / defaults ─────────────────────────────────────
PROJECT_ROOT = Path(__file__).resolve().parent.parent
EVAL_GOLDEN_PATH = PROJECT_ROOT / "evals" / "ask_analyze_v1.jsonl"
DEFAULT_REPORT = PROJECT_ROOT / "reports" / "calibration.md"
ARTIFACTS_DIR = PROJECT_ROOT / "artifacts"
DRY_RUN_FIXTURE = PROJECT_ROOT / "tests" / "calibrate_fixtures" / "sample_ask_events.json"

# Eval split ratio (deterministic, based on a hash of the case id).
TUNING_RATIO = 0.667  # 200 / 300

# Collapses any run of whitespace when normalizing query strings.
_WHITESPACE_RE = re.compile(r"\s+")


# ─── argparse ────────────────────────────────────────────
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list to parse; ``None`` (the default) reads ``sys.argv[1:]``.

    Returns:
        The parsed namespace. ``run_label`` is always populated: when omitted it is
        set to ``calibration_<YYYYmmdd_HHMMSS>`` using the current local time.

    Raises:
        SystemExit: On invalid arguments, or when ``--threshold-overrides`` is given
            (the feature is not implemented).
    """
    p = argparse.ArgumentParser(description="Phase 3.5 ask_events calibration report")
    p.add_argument("--source", default=None,
                   help="ask_events.source 필터 (eval / ui_search / ui_detail / 미지정=전체)")
    p.add_argument("--prompt-version", default=None,
                   help="ask_events.prompt_version 필터 (예: search_synthesis.v1-400char)")
    p.add_argument("--since", default=None, type=datetime.fromisoformat,
                   help="ISO8601, created_at >= since")
    p.add_argument("--until", default=None, type=datetime.fromisoformat,
                   help="ISO8601, created_at < until")
    p.add_argument("--eval-split", choices=["tuning", "confirm", "all"], default="all",
                   help="source='eval' 일 때 holdout split")
    p.add_argument("--run-label", default=None, help="report 제목/파일명 라벨")
    p.add_argument("--output", default=str(DEFAULT_REPORT), help="md 출력 경로")
    p.add_argument("--format", choices=["md", "json"], default="md",
                   help="md 만 생성 또는 md+json 둘 다 (--format json 시)")
    p.add_argument("--compare-against", default=None,
                   help="비교 대상 .json baseline 경로")
    p.add_argument("--sample-limit", type=int, default=30,
                   help="FP candidate CSV 총 행수")
    p.add_argument("--fp-artifacts", default=None, help="FP CSV 경로")
    p.add_argument("--inspect-shape", action="store_true",
                   help="defense_layers JSON sample 5건 출력 후 abort")
    p.add_argument("--threshold-overrides", default=None,
                   help="config/threshold_candidate.yaml — Step 0 feasibility 미해결로 v2 미구현")
    p.add_argument("--dry-run", action="store_true",
                   help="DB 미접속, fixtures 로 출력 검증")
    args = p.parse_args(argv)

    if args.threshold_overrides:
        raise SystemExit(
            "--threshold-overrides 는 v2 미구현. Step 0 feasibility 통과 후 SQL "
            "reclassification 추가 예정. 1차는 baseline/candidate 를 코드 분기 run "
            "(코드 일시 수정 → eval replay 2회) 으로 측정."
        )
    if not args.run_label:
        args.run_label = f"calibration_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    return args


# ─── shared WHERE assembly ───────────────────────────────
def build_filters(args: argparse.Namespace) -> tuple[str, dict[str, Any]]:
    """Build the shared WHERE clause and its bind parameters.

    Four optional conditions: source, prompt_version, since, until. Options that are
    ``None`` are left out of the clause entirely rather than bound as NULL (the
    original note: asyncpg fails to infer the type of a ``None`` parameter).

    Returns:
        ``(where_sql, params)``. ``where_sql`` is ``"TRUE"`` when no filter is set.
        Only fixed clause templates go into the SQL text; values are always bound.
    """
    clauses: list[str] = []
    params: dict[str, Any] = {}
    if args.source is not None:
        clauses.append("source = :source")
        params["source"] = args.source
    if args.prompt_version is not None:
        clauses.append("prompt_version = :prompt_version")
        params["prompt_version"] = args.prompt_version
    if args.since is not None:
        clauses.append("created_at >= CAST(:since AS TIMESTAMPTZ)")
        params["since"] = args.since
    if args.until is not None:
        clauses.append("created_at < CAST(:until AS TIMESTAMPTZ)")
        params["until"] = args.until
    return (" AND ".join(clauses) if clauses else "TRUE"), params


# ─── eval split (id hash) ────────────────────────────────
def split_by_id_hash(case_id: str, ratio: float = TUNING_RATIO) -> str:
    """Deterministically assign a case id to the 'tuning' or 'confirm' split.

    The first 32 bits of ``sha256(case_id)`` are scaled by ``0xFFFFFFFF``, giving a
    value in the closed interval [0, 1] (1.0 only for an all-ones prefix).
    ``bucket < ratio`` → 'tuning', otherwise 'confirm'.

    The divisor is intentionally left as-is so that split membership stays identical
    to reports produced earlier.
    """
    h = hashlib.sha256(case_id.encode()).digest()
    bucket = int.from_bytes(h[:4], "big") / 0xFFFFFFFF
    return "tuning" if bucket < ratio else "confirm"


def load_eval_golden(path: Path) -> dict[str, dict[str, Any]]:
    """Load the eval golden set (JSONL) into ``{id: case_dict}``.

    Each case is expected to look like
    ``{id, type, category, query, expected_behavior, critical_keywords, ...}``.

    Best-effort parsing: blank lines, lines that are not valid JSON, lines whose JSON
    value is not an object, and objects without a truthy ``id`` are skipped. A missing
    file yields an empty dict. When an id repeats, the last occurrence wins.
    """
    if not path.exists():
        return {}
    cases: dict[str, dict[str, Any]] = {}
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A valid JSON scalar/array has no .get(); treat it as a malformed line.
            if not isinstance(obj, dict):
                continue
            cid = obj.get("id")
            if cid:
                cases[cid] = obj
    return cases


def filter_eval_split(cases: dict[str, dict], split: str) -> set[str]:
    """Return the case ids belonging to ``split`` ('all' returns every id)."""
    if split == "all":
        return set(cases.keys())
    return {cid for cid in cases if split_by_id_hash(cid) == split}


# ─── DB fetchers (Q0~Q8) ─────────────────────────────────
# NOTE: the ``where`` fragment interpolated into the f-string SQL below comes from
# build_filters(), which only emits fixed clause templates; values are bound params.
async def fetch_shape_inspect(session: AsyncSession) -> list[dict]:
    """Q0: fetch the 5 most recent non-null defense_layers rows for shape inspection."""
    sql = text("""
        SELECT id, defense_layers, created_at
        FROM ask_events
        WHERE defense_layers IS NOT NULL
        ORDER BY created_at DESC
        LIMIT 5
    """)
    rows = (await session.execute(sql)).mappings().all()
    return [dict(r) for r in rows]


async def fetch_total_rows(session: AsyncSession, where: str, params: dict) -> int:
    """Count ask_events rows matching the shared filter."""
    sql = text(f"SELECT COUNT(*) AS n FROM ask_events WHERE {where}")
    return (await session.execute(sql, params)).scalar_one()


async def fetch_regate_distribution(session, where, params) -> list[dict]:
    """Q1: distribution of ``defense_layers->>'re_gate'`` as ``[{tier, n, pct}]``."""
    sql = text(f"""
        SELECT COALESCE(defense_layers->>'re_gate', '(null)') AS tier,
               COUNT(*) AS n,
               ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
        FROM ask_events
        WHERE {where}
        GROUP BY 1
        ORDER BY n DESC
    """)
    return [dict(r) for r in (await session.execute(sql, params)).mappings()]


async def fetch_score_histogram(session, where, params) -> list[dict]:
    """Q2: max_rerank_score histogram per outcome bucket.

    Rows are ``{bucket, bin, n, avg_score}``. NULL scores are binned as 0.0, while
    ``avg_score`` is SQL ``AVG`` over the raw column (which ignores NULLs).
    """
    sql = text(f"""
        SELECT CASE
                   WHEN refused THEN 'refused'
                   WHEN completeness = 'full' THEN 'full'
                   WHEN completeness = 'partial' THEN 'partial'
                   ELSE 'insufficient'
               END AS bucket,
               WIDTH_BUCKET(COALESCE(max_rerank_score, 0.0), 0.0, 1.0, 10) AS bin,
               COUNT(*) AS n,
               ROUND(AVG(max_rerank_score)::numeric, 3) AS avg_score
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2
        ORDER BY 1, 2
    """)
    return [dict(r) for r in (await session.execute(sql, params)).mappings()]


async def fetch_classifier_confusion(session, where, params) -> list[dict]:
    """Q3: counts per classifier_verdict × completeness × refused."""
    sql = text(f"""
        SELECT COALESCE(classifier_verdict, '(null)') AS verdict,
               COALESCE(completeness, '(null)') AS completeness,
               refused,
               COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3
        ORDER BY n DESC
    """)
    return [dict(r) for r in (await session.execute(sql, params)).mappings()]


async def fetch_verifier_distribution(session, where, params) -> list[dict]:
    """Q4: verifier severity distribution (counts cast to int, NULLs coalesced)."""
    sql = text(f"""
        SELECT COALESCE(defense_layers->'verifier'->>'status', 'n/a') AS status,
               COALESCE((defense_layers->'verifier'->>'medium_count')::int, 0) AS medium_count,
               COALESCE((defense_layers->'verifier'->>'strong_count')::int, 0) AS strong_count,
               COALESCE(completeness, '(null)') AS completeness,
               COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3, 4
        ORDER BY 1, 2, 3, 4
    """)
    return [dict(r) for r in (await session.execute(sql, params)).mappings()]


async def fetch_flag_frequencies(session, where, params) -> list[dict]:
    """Q5: top hallucination flags (strong and weak lists combined via UNION ALL).

    The flag type is the part of each flag string before the first ':'.

    Returns:
        ``[{flag_type, strength, n}]`` ordered by ``n`` descending, at most 40 rows.
    """
    sql = text(f"""
        SELECT * FROM (
            SELECT split_part(flag, ':', 1) AS flag_type,
                   'strong' AS strength,
                   COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'strong') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
            UNION ALL
            SELECT split_part(flag, ':', 1) AS flag_type,
                   'weak' AS strength,
                   COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'weak') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
        ) u
        ORDER BY n DESC
        LIMIT 40
    """)
    return [dict(r) for r in (await session.execute(sql, params)).mappings()]


async def fetch_fabricated_strong_rate(session, where, params) -> dict[str, Any]:
    """B1 check: rate of rows with a strong ``fabricated_number`` flag (not a raw count).

    rate = (rows where ``fabricated_number:*`` appears at least once in the strong
    grounding list) / (all filtered ask_events rows).

    Returns:
        ``{"total": int, "fabricated_strong_hit": int, "rate": float}`` with the rate
        rounded to 4 decimals; the rate is 0.0 when there are no rows.
    """
    sql = text(f"""
        SELECT COUNT(*) AS total,
               SUM(CASE WHEN EXISTS (
                       SELECT 1
                       FROM jsonb_array_elements_text(defense_layers->'grounding'->'strong') f
                       WHERE f LIKE 'fabricated_number:%%'
                   ) THEN 1 ELSE 0 END) AS hit
        FROM ask_events
        WHERE {where}
    """)
    row = (await session.execute(sql, params)).mappings().one()
    total = int(row["total"] or 0)
    hit = int(row["hit"] or 0)  # SUM over zero rows is NULL
    rate = (hit / total) if total > 0 else 0.0
    return {"total": total, "fabricated_strong_hit": hit, "rate": round(rate, 4)}


def _normalize_query(q: str | None) -> str:
    """Normalize a query for fallback matching: collapse whitespace, trim, lowercase."""
    if not q:
        return ""
    return _WHITESPACE_RE.sub(" ", q).strip().lower()


def _summarize_eval_join(
    rows: list[dict],
    eval_cases: dict[str, dict],
    split_filter: set[str] | None,
) -> dict[str, Any]:
    """Join fetched eval rows to golden cases and group them by (expected, actual).

    Pure-Python half of Q6 (no DB access). A row is matched by ``eval_case_id`` when
    present, otherwise by its normalized query string. Rows that cannot be matched
    count toward ``join_failed_count``; rows matched to a case outside
    ``split_filter`` are dropped without being counted as failures.
    """
    # When two golden cases normalize to the same query, the later one wins.
    norm_to_id = {
        _normalize_query(c.get("query")): cid
        for cid, c in eval_cases.items()
        if c.get("query")
    }

    eval_case_id_present = 0
    eval_case_id_null = 0
    join_failed_count = 0
    matched_total = 0
    groups: dict[tuple[str, str], list[str]] = {}

    for row in rows:
        cid = row.get("eval_case_id")
        if cid:
            eval_case_id_present += 1
        else:
            eval_case_id_null += 1
            cid = norm_to_id.get(_normalize_query(row.get("query")))
        case = eval_cases.get(cid) if cid else None
        if not case:
            join_failed_count += 1
            continue
        if split_filter is not None and cid not in split_filter:
            continue

        matched_total += 1
        expected = case.get("expected_behavior", "(unknown)")
        # A refusal takes precedence over whatever completeness was recorded.
        actual = "refused" if row.get("refused") else (row.get("completeness") or "(null)")
        # `or ""` keeps a null golden query from breaking the later string join.
        groups.setdefault((expected, actual), []).append(case.get("query") or "")

    mismatch_groups = [
        {"expected": exp, "actual": act, "n": len(queries), "sample_queries": queries[:3]}
        for (exp, act), queries in sorted(groups.items(), key=lambda kv: -len(kv[1]))
    ]
    return {
        "mismatch_groups": mismatch_groups,
        "eval_case_id_present": eval_case_id_present,
        "eval_case_id_null": eval_case_id_null,
        "join_failed_count": join_failed_count,
        "matched_total": matched_total,
    }


async def fetch_eval_join_with_split(
    session,
    where,
    params,
    eval_cases: dict[str, dict],
    split_filter: set[str] | None,
) -> dict[str, Any]:
    """Q6: join eval rows to the golden set by eval_case_id, with query-string fallback.

    Only the most recent row per ``COALESCE(eval_case_id, query)`` is considered, and
    only rows with ``source = 'eval'``.

    Returns a dict with:
        - mismatch_groups: ``[{expected, actual, n, sample_queries}]``, largest first
        - eval_case_id_present: rows carrying an eval_case_id
        - eval_case_id_null: rows without one (fallback path)
        - join_failed_count: rows matched neither by id nor by normalized query
        - matched_total: rows matched and kept after the split filter
    """
    sql = text(f"""
        WITH ranked AS (
            SELECT id, eval_case_id, query, completeness, refused,
                   ROW_NUMBER() OVER (
                       PARTITION BY COALESCE(eval_case_id, query)
                       ORDER BY created_at DESC
                   ) AS rn
            FROM ask_events
            WHERE {where} AND source = 'eval'
        )
        SELECT id, eval_case_id, query, completeness, refused
        FROM ranked
        WHERE rn = 1
    """)
    rows = [dict(r) for r in (await session.execute(sql, params)).mappings()]
    return _summarize_eval_join(rows, eval_cases, split_filter)


async def fetch_fp_candidates(session, where, params, limit: int) -> list[dict]:
    """Q7: false-positive candidates from three cases (A/B/C) combined via UNION ALL.

    Each row carries a ``candidate_reason`` column. Every case gets ``limit // 3`` rows
    (at least 1) so the cases stay balanced:
        A. refused although max_rerank_score >= 0.35
        B. completeness 'insufficient' although the classifier said 'sufficient'
        C. completeness 'partial' with exactly one strong flag, a fabricated_number
    """
    per_case = max(1, limit // 3)
    sql = text(f"""
        WITH base AS (
            SELECT id, query, completeness, refused, classifier_verdict,
                   max_rerank_score, aggregate_score,
                   defense_layers->'grounding'->'strong' AS g_strong,
                   defense_layers->'verifier'->>'medium_count' AS v_medium,
                   defense_layers->>'re_gate' AS re_gate,
                   answer_length, prompt_version, source, eval_case_id, created_at
            FROM ask_events
            WHERE {where}
        ),
        case_a AS (
            SELECT *, 'refused_high_rerank' AS candidate_reason
            FROM base
            WHERE refused = true AND COALESCE(max_rerank_score, 0.0) >= 0.35
            ORDER BY created_at DESC
            LIMIT :per_case
        ),
        case_b AS (
            SELECT *, 'insufficient_classifier_sufficient' AS candidate_reason
            FROM base
            WHERE completeness = 'insufficient' AND classifier_verdict = 'sufficient'
            ORDER BY created_at DESC
            LIMIT :per_case
        ),
        case_c AS (
            SELECT *, 'partial_only_fabricated_number' AS candidate_reason
            FROM base
            WHERE completeness = 'partial'
              AND jsonb_array_length(COALESCE(g_strong, '[]'::jsonb)) = 1
              AND (g_strong->>0) LIKE 'fabricated_number:%%'
            ORDER BY created_at DESC
            LIMIT :per_case
        )
        SELECT * FROM case_a
        UNION ALL
        SELECT * FROM case_b
        UNION ALL
        SELECT * FROM case_c
    """)
    params2 = {**params, "per_case": per_case}
    return [dict(r) for r in (await session.execute(sql, params2)).mappings()]


async def fetch_answer_length_distribution(session, where, params) -> list[dict]:
    """Q8: answer_length p25/p50/p75/avg per outcome bucket (NULL lengths excluded)."""
    sql = text(f"""
        SELECT CASE WHEN refused THEN 'refused'
                    ELSE COALESCE(completeness, '(null)')
               END AS bucket,
               PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY answer_length) AS p25,
               PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY answer_length) AS p50,
               PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY answer_length) AS p75,
               AVG(answer_length)::int AS avg,
               COUNT(*) AS n
        FROM ask_events
        WHERE {where} AND answer_length IS NOT NULL
        GROUP BY 1
        ORDER BY 1
    """)
    return [dict(r) for r in (await session.execute(sql, params)).mappings()]


# ─── rendering ───────────────────────────────────────────
def _md_cell(value: Any) -> str:
    """Render one table cell: escape pipes and flatten line breaks.

    A raw '|' would start a new column and a raw newline would end the table row,
    so free-text values (user queries, joined samples) must be sanitized.
    """
    cell = str(value)
    cell = cell.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
    return cell.replace("|", "\\|")


def _md_table(headers: list[str], rows: list[list[Any]]) -> str:
    """Render a pipe table, or the ``_(empty)_`` placeholder when there are no rows."""
    if not rows:
        return "_(empty)_\n"
    lines = [
        "| " + " | ".join(headers) + " |",
        "|" + "|".join(["---"] * len(headers)) + "|",
    ]
    for row in rows:
        lines.append("| " + " | ".join(_md_cell(v) for v in row) + " |")
    return "\n".join(lines) + "\n"


def render_markdown(sections: dict[str, Any], args: argparse.Namespace,
                    delta: dict[str, Any] | None = None) -> str:
    """Render the full calibration report as markdown.

    Args:
        sections: Aggregated results. ``total_rows``, ``regate``, ``score_hist``,
            ``classifier``, ``verifier``, ``flags``, ``fabricated_rate``,
            ``fp_candidates`` and ``answer_length`` are required; ``shape_sample``,
            ``eval`` and ``fp_csv_path`` are optional.
        args: Parsed CLI options (used for the title and the filter summary line).
        delta: Optional output of ``compute_delta``; rendered as section 9 when truthy.

    Raises:
        KeyError: If a required section is missing.
    """
    label = args.run_label
    out: list[str] = [f"# Calibration Report — {label}\n"]
    out.append(f"Filter: source={args.source} prompt_version={args.prompt_version} "
               f"since={args.since} until={args.until} eval_split={args.eval_split}\n")
    out.append(f"Total rows: **{sections['total_rows']}**\n")

    # 0. shape inspect (only when the caller supplied samples)
    if "shape_sample" in sections:
        out.append("## 0. defense_layers shape sample (latest 5)\n")
        for s in sections["shape_sample"]:
            out.append(f"- id={s['id']} created_at={s['created_at']}\n")
            # Indent the fenced block so it stays inside the list item.
            out.append("  ```json\n")
            out.append("  " + json.dumps(s["defense_layers"], ensure_ascii=False,
                                         indent=2).replace("\n", "\n  ") + "\n")
            out.append("  ```\n")

    # 1. re-gate
    out.append("## 1. Re-gate tier 분포\n")
    out.append(_md_table(
        ["tier", "n", "pct"],
        [[r["tier"], r["n"], f"{r['pct']}%"] for r in sections["regate"]]))

    # 2. score histogram
    out.append("## 2. max_rerank_score 히스토그램 (bucket × bin 0~10)\n")
    out.append(_md_table(
        ["bucket", "bin", "n", "avg_score"],
        [[r["bucket"], r["bin"], r["n"], r["avg_score"]] for r in sections["score_hist"]]))

    # 3. classifier confusion
    out.append("## 3. Classifier 혼동행렬 (verdict × completeness × refused)\n")
    out.append(_md_table(
        ["verdict", "completeness", "refused", "n"],
        [[r["verdict"], r["completeness"], r["refused"], r["n"]]
         for r in sections["classifier"]]))

    # 4. verifier
    out.append("## 4. Verifier severity 분포\n")
    out.append(_md_table(
        ["status", "medium_count", "strong_count", "completeness", "n"],
        [[r["status"], r["medium_count"], r["strong_count"], r["completeness"], r["n"]]
         for r in sections["verifier"]]))

    # 5. flags — three tables (all / strong / weak)
    flags = sections["flags"]
    flags_strong = [f for f in flags if f["strength"] == "strong"]
    flags_weak = [f for f in flags if f["strength"] == "weak"]
    out.append("## 5. Hallucination flags top-K\n")
    out.append("### 5.1 전체 top-20\n")
    out.append(_md_table(
        ["flag_type", "strength", "n"],
        [[r["flag_type"], r["strength"], r["n"]] for r in flags[:20]]))
    out.append("### 5.2 strong only top-10\n")
    out.append(_md_table(
        ["flag_type", "n"],
        [[r["flag_type"], r["n"]] for r in flags_strong[:10]]))
    out.append("### 5.3 weak only top-10\n")
    out.append(_md_table(
        ["flag_type", "n"],
        [[r["flag_type"], r["n"]] for r in flags_weak[:10]]))

    # B1 monitoring — fabricated_number strong rate
    fab = sections["fabricated_rate"]
    out.append("### 5.4 fabricated_number strong rate (B1 추적용)\n")
    out.append(f"- total rows: {fab['total']}\n")
    out.append(f"- fabricated_strong hit: {fab['fabricated_strong_hit']}\n")
    out.append(f"- **rate: {fab['rate'] * 100:.2f}%**\n")

    # 6. eval mismatch (only present for eval runs)
    if "eval" in sections:
        ev = sections["eval"]
        out.append("## 6. Eval golden mismatch (eval_case_id 기반)\n")
        out.append(f"- eval_case_id present: {ev['eval_case_id_present']}\n")
        out.append(f"- eval_case_id null (fallback): {ev['eval_case_id_null']}\n")
        out.append(f"- join_failed_count: **{ev['join_failed_count']}**\n")
        out.append(f"- matched total: {ev['matched_total']}\n\n")
        # The " | " separator is escaped by _md_table, so it stays inside one cell.
        out.append(_md_table(
            ["expected", "actual", "n", "sample"],
            [[g["expected"], g["actual"], g["n"], " | ".join(g["sample_queries"])[:120]]
             for g in ev["mismatch_groups"]]))

    # 7. FP candidates
    fps = sections["fp_candidates"]
    out.append(f"## 7. FP candidate sample (n={len(fps)}, case A/B/C 분리)\n")
    out.append(f"전체 CSV: `{sections.get('fp_csv_path', '(미생성)')}`\n\n")
    out.append(_md_table(
        ["case", "id", "completeness", "refused", "verdict", "max_score", "re_gate", "query"],
        [[r["candidate_reason"], r["id"], r["completeness"], r["refused"],
          r["classifier_verdict"], r["max_rerank_score"], r["re_gate"],
          (r["query"] or "")[:60]]
         for r in fps]))

    # 8. answer_length
    out.append("## 8. answer_length 분포 (bucket × percentile)\n")
    out.append(_md_table(
        ["bucket", "p25", "p50", "p75", "avg", "n"],
        [[r["bucket"], r["p25"], r["p50"], r["p75"], r["avg"], r["n"]]
         for r in sections["answer_length"]]))

    # 9. delta vs baseline
    if delta:
        out.append("## 9. Delta vs baseline\n")
        out.append("```json\n")
        out.append(json.dumps(delta, ensure_ascii=False, indent=2, default=str))
        out.append("\n```\n")

    return "".join(out)


def render_json(sections: dict[str, Any]) -> str:
    """Serialize ``sections`` as JSON; non-JSON values are stringified via ``str``."""
    return json.dumps(sections, ensure_ascii=False, indent=2, default=str)


def compute_delta(current: dict[str, Any], baseline: dict[str, Any]) -> dict[str, Any]:
    """Compute a simple delta: total_rows, re-gate pct per tier, fabricated rate.

    Finer-grained comparisons are left for a future extension.

    ``pct`` values are passed through ``float()`` because a baseline written by
    ``render_json`` may hold them as strings. A tier missing on one side counts as
    0.0. ``rel_change_pct`` is ``None`` when the baseline rate is 0.
    """
    delta: dict[str, Any] = {}
    delta["total_rows"] = {
        "current": current.get("total_rows"),
        "baseline": baseline.get("total_rows"),
        "diff": (current.get("total_rows") or 0) - (baseline.get("total_rows") or 0),
    }

    # Percentage-point difference per re-gate tier.
    base_regate = {r["tier"]: float(r["pct"]) for r in baseline.get("regate", [])}
    cur_regate = {r["tier"]: float(r["pct"]) for r in current.get("regate", [])}
    delta["regate_pct_diff_pp"] = {
        tier: round(cur_regate.get(tier, 0.0) - base_regate.get(tier, 0.0), 2)
        for tier in set(base_regate) | set(cur_regate)
    }

    # Fabricated-number strong rate delta.
    cur_fr = current.get("fabricated_rate", {}).get("rate", 0.0)
    base_fr = baseline.get("fabricated_rate", {}).get("rate", 0.0)
    delta["fabricated_strong_rate"] = {
        "current": cur_fr,
        "baseline": base_fr,
        "diff_pp": round((cur_fr - base_fr) * 100, 2),
        "rel_change_pct": (round((cur_fr - base_fr) / base_fr * 100, 2)
                           if base_fr > 0 else None),
    }
    return delta


# ─── FP CSV dump ──────────────────────────────────────────
def dump_fp_csv(rows: list[dict], path: Path) -> None:
    """Write FP candidate rows to ``path`` as CSV, creating parent directories.

    With no rows an empty file is written (no header). List/dict values (JSONB
    columns) are serialized as JSON strings. The trailing ``is_true_fp`` column is
    always left blank for manual labelling.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        path.write_text("", encoding="utf-8")
        return

    # Stable column order (per the plan spec).
    cols = [
        "id", "candidate_reason", "query", "completeness", "refused",
        "classifier_verdict", "max_rerank_score", "aggregate_score",
        "g_strong", "v_medium", "re_gate", "answer_length",
        "prompt_version", "source", "eval_case_id", "created_at",
        "is_true_fp",  # blank column for the user to fill in by hand
    ]
    with path.open("w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=cols)
        w.writeheader()
        for r in rows:
            row_out = {c: r.get(c) for c in cols if c != "is_true_fp"}
            row_out["is_true_fp"] = ""
            # JSONB / dict values become JSON strings.
            for k, v in list(row_out.items()):
                if isinstance(v, (list, dict)):
                    row_out[k] = json.dumps(v, ensure_ascii=False)
            w.writerow(row_out)


# ─── dry-run (load a fixture instead of hitting the DB) ───
def dry_run_sections() -> dict[str, Any]:
    """Return report sections for ``--dry-run``.

    Loads ``DRY_RUN_FIXTURE`` when it exists; otherwise falls back to a minimal
    inline fixture that contains every section ``render_markdown`` requires.
    """
    if not DRY_RUN_FIXTURE.exists():
        # Minimal inline fixture.
        return {
            "total_rows": 3,
            "regate": [
                {"tier": "clean", "n": 2, "pct": 66.67},
                {"tier": "refuse(grounding_2+strong)", "n": 1, "pct": 33.33},
            ],
            "score_hist": [],
            "classifier": [],
            "verifier": [],
            "flags": [],
            "fabricated_rate": {"total": 3, "fabricated_strong_hit": 0, "rate": 0.0},
            "fp_candidates": [],
            "answer_length": [],
        }
    return json.loads(DRY_RUN_FIXTURE.read_text(encoding="utf-8"))


# ─── main ─────────────────────────────────────────────────
async def run(args: argparse.Namespace) -> None:
    """Execute the CLI: collect every section, dump the FP CSV, emit the report.

    ``--dry-run`` skips the database entirely. ``--inspect-shape`` prints the
    defense_layers samples as JSON and returns without writing any file.
    """
    if args.dry_run:
        sections = dry_run_sections()
        sections.setdefault("fp_csv_path", "(dry-run, CSV skipped)")
        _emit(args, sections)
        return

    # DB connection
    database_url = os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
    )
    engine = create_async_engine(database_url, echo=False)
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    # try/finally so the connection pool is released even when a query fails, and
    # only after the session context has been closed.
    try:
        async with session_factory() as session:
            if args.inspect_shape:
                sample = await fetch_shape_inspect(session)
                print(json.dumps(
                    [{"id": s["id"], "created_at": str(s["created_at"]),
                      "defense_layers": s["defense_layers"]} for s in sample],
                    ensure_ascii=False, indent=2, default=str,
                ))
                return

            where, params = build_filters(args)
            total = await fetch_total_rows(session, where, params)
            if total == 0:
                print(f"WARNING: 필터 조건에 매칭되는 ask_events 행 0건. "
                      f"source={args.source} prompt_version={args.prompt_version} "
                      f"since={args.since} until={args.until}")

            sections: dict[str, Any] = {"total_rows": total}
            sections["regate"] = await fetch_regate_distribution(session, where, params)
            sections["score_hist"] = await fetch_score_histogram(session, where, params)
            sections["classifier"] = await fetch_classifier_confusion(session, where, params)
            sections["verifier"] = await fetch_verifier_distribution(session, where, params)
            sections["flags"] = await fetch_flag_frequencies(session, where, params)
            sections["fabricated_rate"] = await fetch_fabricated_strong_rate(
                session, where, params)
            sections["fp_candidates"] = await fetch_fp_candidates(
                session, where, params, args.sample_limit)
            sections["answer_length"] = await fetch_answer_length_distribution(
                session, where, params)

            # eval only
            if args.source == "eval":
                cases = load_eval_golden(EVAL_GOLDEN_PATH)
                if not cases:
                    # Without golden cases every eval row ends up as a join failure.
                    print(f"WARNING: eval golden set missing or empty: {EVAL_GOLDEN_PATH}")
                split_filter = (filter_eval_split(cases, args.eval_split)
                                if args.eval_split != "all" else None)
                sections["eval"] = await fetch_eval_join_with_split(
                    session, where, params, cases, split_filter)
    finally:
        await engine.dispose()

    # FP CSV dump
    fp_csv = (Path(args.fp_artifacts) if args.fp_artifacts
              else ARTIFACTS_DIR / f"fp_candidates_{args.run_label}.csv")
    dump_fp_csv(sections["fp_candidates"], fp_csv)
    sections["fp_csv_path"] = str(fp_csv)

    _emit(args, sections)


def _emit(args: argparse.Namespace, sections: dict[str, Any]) -> None:
    """Render the report and write it to disk, handling ``--compare-against``.

    The markdown file is always written to ``args.output``. With ``--format json`` the
    sections are also written next to it with a ``.json`` suffix. A missing baseline
    file only produces a warning; the report is then written without a delta.
    """
    delta = None
    if args.compare_against:
        baseline_path = Path(args.compare_against)
        if baseline_path.exists():
            baseline = json.loads(baseline_path.read_text(encoding="utf-8"))
            delta = compute_delta(sections, baseline)
        else:
            print(f"WARNING: compare-against baseline not found: {baseline_path}")

    md = render_markdown(sections, args, delta)
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(md, encoding="utf-8")
    print(f"✓ markdown report: {out_path}")

    if args.format == "json":
        json_path = out_path.with_suffix(".json")
        json_path.write_text(render_json(sections), encoding="utf-8")
        print(f"✓ json baseline: {json_path}")


def main() -> None:
    """Script entry point: parse CLI arguments and run the async pipeline."""
    args = parse_args()
    asyncio.run(run(args))


if __name__ == "__main__":
    main()