From 06c2c35955373f6927d0268c04620f3b41cd70fd Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Fri, 17 Apr 2026 08:00:59 +0900 Subject: [PATCH] =?UTF-8?q?feat(scripts):=20Phase=203.5=20=E2=80=94=20cali?= =?UTF-8?q?brate=5Fask.py=20CLI=20(Q0~Q8=20+=20render=20+=20FP=20CSV)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scripts/calibrate_ask.py — ask_events 집계 + markdown report 영구 도구. 기능: - argparse: --source / --prompt-version / --since / --until / --eval-split (tuning|confirm|all, id 해시 기반 deterministic split) / --run-label / --output / --format md|json / --compare-against / --sample-limit / --fp-artifacts / --inspect-shape / --dry-run - 9개 fetcher (모두 read-only SELECT): - Q0 defense_layers shape inspect - Q1 re-gate tier 분포 - Q2 max_rerank_score 히스토그램 (bucket × bin) - Q3 classifier 혼동행렬 - Q4 verifier severity 분포 (cast + COALESCE NULL safe) - Q5 hallucination_flags top-K (UNION ALL outer wrap, strong/weak 컬럼 유지) - Q6 eval golden mismatch (eval_case_id 기반 join + query string fallback) - Q7 FP candidate (case A/B/C 분리 + candidate_reason 컬럼 + LIMIT/3 분배) - Q8 answer_length p25/p50/p75 분포 (E.3 v1↔v2 비교 축) - markdown render + json baseline + delta compare (compare-against) - FP CSV dump (artifacts/fp_candidates_{run_label}.csv) + is_true_fp 공란 - dry-run: tests/calibrate_fixtures/sample_ask_events.json 로 출력 검증 - --threshold-overrides: Step 0 feasibility 통과 후 v2 (현재 stub raise) read-only 강제: INSERT/UPDATE/DELETE/ALTER/DROP/TRUNCATE 0건. tests/calibrate_fixtures/sample_ask_events.json: dry-run snapshot fixture. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/calibrate_ask.py | 745 ++++++++++++++++++ .../calibrate_fixtures/sample_ask_events.json | 63 ++ 2 files changed, 808 insertions(+) create mode 100644 scripts/calibrate_ask.py create mode 100644 tests/calibrate_fixtures/sample_ask_events.json diff --git a/scripts/calibrate_ask.py b/scripts/calibrate_ask.py new file mode 100644 index 0000000..3ecdb13 --- /dev/null +++ b/scripts/calibrate_ask.py @@ -0,0 +1,745 @@ +"""Phase 3.5 calibration CLI — ask_events 집계 + markdown report 생성. + +사용법: + # Docker 컨테이너 내부 (권장 — DATABASE_URL 자동 주입) + docker compose exec fastapi python /app/scripts/calibrate_ask.py \\ + --source eval --prompt-version search_synthesis.v1-400char \\ + --run-label baseline_v1 --output reports/calibration_baseline_v1.md + + # 로컬 (DATABASE_URL 환경변수 필요) + python scripts/calibrate_ask.py --inspect-shape + +옵션: + --source eval / ui_search / ui_detail / document_server / ... (미지정=전체) + --prompt-version search_synthesis.v1-400char 등 + --since / --until ISO8601, created_at 범위 + --eval-split tuning(200) / confirm(100) / all (id 해시 기반 deterministic) + --run-label report 제목/파일명 라벨 + --output .md 경로 (기본 reports/calibration.md). --format json 이면 .json 도 생성 + --format md (사람용) | json (compare 용 baseline) + --compare-against 비교 대상 .json baseline 경로 (Δ 컬럼 출력) + --sample-limit FP candidate CSV 행수 (기본 30, 케이스별 분배) + --fp-artifacts FP CSV 경로 (기본 artifacts/fp_candidates_{run_label}.csv) + --inspect-shape defense_layers JSON sample 5건 출력 후 abort (Q0) + --threshold-overrides config/threshold_candidate.yaml — Step 0 feasibility 미해결, 미구현 + --dry-run DB 미접속, tests/calibrate_fixtures/sample_ask_events.json 로드 + +읽기 전용 — INSERT/UPDATE/DELETE/ALTER 0건. SELECT 만. 
+""" + +from __future__ import annotations + +import argparse +import asyncio +import csv +import hashlib +import json +import os +import sys +from dataclasses import asdict, dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any + +# 프로젝트 루트의 app/ 디렉토리를 경로에 추가 (seed_admin.py 패턴) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession + +# ─── 경로 / 기본값 ───────────────────────────────────────── + +PROJECT_ROOT = Path(__file__).resolve().parent.parent +EVAL_GOLDEN_PATH = PROJECT_ROOT / "evals" / "ask_analyze_v1.jsonl" +DEFAULT_REPORT = PROJECT_ROOT / "reports" / "calibration.md" +ARTIFACTS_DIR = PROJECT_ROOT / "artifacts" +DRY_RUN_FIXTURE = PROJECT_ROOT / "tests" / "calibrate_fixtures" / "sample_ask_events.json" + +# eval split 비율 (id 해시 기반 deterministic) +TUNING_RATIO = 0.667 # 200 / 300 + + +# ─── argparse ──────────────────────────────────────────── + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description="Phase 3.5 ask_events calibration report") + p.add_argument("--source", default=None, + help="ask_events.source 필터 (eval / ui_search / ui_detail / 미지정=전체)") + p.add_argument("--prompt-version", default=None, + help="ask_events.prompt_version 필터 (예: search_synthesis.v1-400char)") + p.add_argument("--since", default=None, help="ISO8601, created_at >= since") + p.add_argument("--until", default=None, help="ISO8601, created_at < until") + p.add_argument("--eval-split", choices=["tuning", "confirm", "all"], default="all", + help="source='eval' 일 때 holdout split") + p.add_argument("--run-label", default=None, help="report 제목/파일명 라벨") + p.add_argument("--output", default=str(DEFAULT_REPORT), help="md 출력 경로") + p.add_argument("--format", choices=["md", "json"], default="md", + help="md 만 생성 또는 md+json 둘 다 (--format json 시)") + p.add_argument("--compare-against", 
def _parse_iso_timestamp(value: str | None) -> datetime | str | None:
    """Convert an ISO 8601 string to ``datetime``; pass anything else through.

    ``None`` stays ``None`` so the ``IS NULL`` guard in the WHERE clause keeps
    disabling the filter. A string that ``datetime.fromisoformat`` rejects is
    returned unchanged, leaving the final parsing attempt to the database.
    A value without a UTC offset stays naive — how it is interpreted is then
    up to the driver / session time zone.
    """
    if value is None:
        return None
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        return value


def build_filters(args: argparse.Namespace) -> tuple[str, dict[str, Any]]:
    """Build the shared WHERE clause and its bind parameters.

    Four optional filters are supported: ``source``, ``prompt_version``,
    ``since`` (inclusive) and ``until`` (exclusive). A filter whose value is
    ``None`` is neutralised by the ``:name IS NULL OR ...`` pattern, so the
    same SQL text is used regardless of which options were given.

    Args:
        args: Parsed CLI namespace; reads ``source``, ``prompt_version``,
            ``since`` and ``until``.

    Returns:
        ``(where_sql, params)`` — the clauses joined with ``AND`` and the
        dict of bind values keyed by parameter name.
    """
    # CAST(... AS timestamptz) is used instead of the ``::timestamptz``
    # shorthand: inside sqlalchemy.text() a named bind immediately followed by
    # ``::`` is ambiguous for the ``:name`` parser, while CAST is plain SQL
    # with the same meaning.
    clauses = [
        "(:source IS NULL OR source = :source)",
        "(:prompt_version IS NULL OR prompt_version = :prompt_version)",
        "(:since IS NULL OR created_at >= CAST(:since AS timestamptz))",
        "(:until IS NULL OR created_at < CAST(:until AS timestamptz))",
    ]
    # NOTE(review): drivers that bind parameters natively (asyncpg is this
    # script's default DATABASE_URL driver) are expected to want datetime
    # objects for a timestamptz parameter, hence the parsing here — confirm
    # against the deployed driver.
    params: dict[str, Any] = {
        "source": args.source,
        "prompt_version": args.prompt_version,
        "since": _parse_iso_timestamp(args.since),
        "until": _parse_iso_timestamp(args.until),
    }
    return " AND ".join(clauses), params
def load_eval_golden(path: Path) -> dict[str, dict[str, Any]]:
    """Load the eval golden set from a JSONL file into ``{id: case_dict}``.

    Each non-blank line is parsed as one JSON document. Lines that are not
    valid JSON, that are valid JSON but not an object, or whose object has no
    truthy ``id`` are skipped, so one bad line cannot abort the whole load.
    When the same ``id`` appears more than once, the last occurrence wins.

    Args:
        path: Location of the JSONL file.

    Returns:
        Mapping from case id to the parsed case object; empty when the file
        does not exist.
    """
    if not path.exists():
        return {}

    cases: dict[str, dict[str, Any]] = {}
    with path.open("r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            # Keep the try body limited to the one call that can raise.
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A list / string / number line is valid JSON but has no .get().
            if not isinstance(obj, dict):
                continue
            cid = obj.get("id")
            if cid:
                cases[cid] = obj
    return cases
async def fetch_score_histogram(session, where, params) -> list[dict]:
    """Q2: histogram of max_rerank_score per outcome bucket.

    Rows are grouped by outcome bucket (``refused`` first, then the
    ``completeness`` value, anything else as ``insufficient``) and by a
    ``WIDTH_BUCKET`` bin of 10 equal-width bins over 0.0..1.0. Per PostgreSQL
    semantics the bin is 1..10 inside the range, 0 below it and 11 at or
    above the upper bound. A NULL score is binned as 0.0, while ``avg_score``
    averages the raw column and therefore ignores NULLs.

    Returns one dict per (bucket, bin) with keys bucket, bin, n, avg_score.
    """
    statement = text(f"""
        SELECT
            CASE WHEN refused THEN 'refused'
                 WHEN completeness = 'full' THEN 'full'
                 WHEN completeness = 'partial' THEN 'partial'
                 ELSE 'insufficient' END AS bucket,
            WIDTH_BUCKET(COALESCE(max_rerank_score, 0.0), 0.0, 1.0, 10) AS bin,
            COUNT(*) AS n,
            ROUND(AVG(max_rerank_score)::numeric, 3) AS avg_score
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2
        ORDER BY 1, 2
    """)
    result = await session.execute(statement, params)
    histogram: list[dict] = []
    for mapping in result.mappings():
        histogram.append(dict(mapping))
    return histogram
async def fetch_fabricated_strong_rate(session, where, params) -> dict[str, float]:
    """B1 check: share of rows carrying a strong ``fabricated_number`` flag.

    The numerator counts rows (not flags): a row is a hit when at least one
    element of ``defense_layers->'grounding'->'strong'`` starts with
    ``fabricated_number:``. The denominator is every row matching ``where``.

    Returns a dict with ``total``, ``fabricated_strong_hit`` and ``rate``
    (hit / total rounded to 4 decimals, 0.0 when there are no rows).
    """
    statement = text(f"""
        SELECT
            COUNT(*) AS total,
            SUM(CASE WHEN EXISTS (
                SELECT 1 FROM jsonb_array_elements_text(defense_layers->'grounding'->'strong') f
                WHERE f LIKE 'fabricated_number:%%'
            ) THEN 1 ELSE 0 END) AS hit
        FROM ask_events
        WHERE {where}
    """)
    result = await session.execute(statement, params)
    counts = result.mappings().one()

    # SUM over zero rows yields NULL, hence the ``or 0`` fallbacks.
    row_count = int(counts["total"] or 0)
    hit_count = int(counts["hit"] or 0)

    if row_count > 0:
        share = hit_count / row_count
    else:
        share = 0.0

    return {
        "total": row_count,
        "fabricated_strong_hit": hit_count,
        "rate": round(share, 4),
    }
+ + 출력: + - mismatch_groups: [{expected, actual, n, sample_queries}] + - eval_case_id_present: int + - eval_case_id_null: int + - join_failed_count: int (id 도 없고 query normalize 도 매칭 안 된 행) + """ + sql = text(f""" + WITH ranked AS ( + SELECT + id, eval_case_id, query, completeness, refused, + ROW_NUMBER() OVER (PARTITION BY COALESCE(eval_case_id, query) + ORDER BY created_at DESC) AS rn + FROM ask_events + WHERE {where} AND source = 'eval' + ) + SELECT id, eval_case_id, query, completeness, refused + FROM ranked WHERE rn = 1 + """) + rows = [dict(r) for r in (await session.execute(sql, params)).mappings()] + + # query string normalize 헬퍼 (lower + trim + 공백 단일화) + import re as _re + def norm(q: str | None) -> str: + if not q: + return "" + return _re.sub(r"\s+", " ", q).strip().lower() + + norm_to_id = {norm(c.get("query")): cid for cid, c in eval_cases.items() + if c.get("query")} + + eval_case_id_present = 0 + eval_case_id_null = 0 + join_failed_count = 0 + matched_pairs: list[tuple[str, dict, str, bool]] = [] # (cid, case, actual_completeness, actual_refused) + + for row in rows: + cid = row.get("eval_case_id") + if cid: + eval_case_id_present += 1 + case = eval_cases.get(cid) + if not case: + join_failed_count += 1 + continue + else: + eval_case_id_null += 1 + cid = norm_to_id.get(norm(row.get("query"))) + if not cid: + join_failed_count += 1 + continue + case = eval_cases.get(cid) + if not case: + join_failed_count += 1 + continue + if split_filter is not None and cid not in split_filter: + continue + actual_completeness = row.get("completeness") or ("refused" if row.get("refused") else "(null)") + matched_pairs.append((cid, case, actual_completeness, bool(row.get("refused")))) + + # group by (expected_behavior, actual) + groups: dict[tuple[str, str], list[str]] = {} + for cid, case, actual, refused in matched_pairs: + expected = case.get("expected_behavior", "(unknown)") + # eval JSONL 의 expected_behavior 가 'answered'/'refused'/...; actual 도 정규화 + actual_norm 
= "refused" if refused else (actual or "(null)") + key = (expected, actual_norm) + groups.setdefault(key, []).append(case.get("query", "")) + + mismatch_groups = [] + for (exp, act), queries in sorted(groups.items(), key=lambda x: -len(x[1])): + mismatch_groups.append({ + "expected": exp, + "actual": act, + "n": len(queries), + "sample_queries": queries[:3], + }) + + return { + "mismatch_groups": mismatch_groups, + "eval_case_id_present": eval_case_id_present, + "eval_case_id_null": eval_case_id_null, + "join_failed_count": join_failed_count, + "matched_total": len(matched_pairs), + } + + +async def fetch_fp_candidates(session, where, params, limit: int) -> list[dict]: + """Q7: 3개 case (A/B/C) UNION ALL + candidate_reason 컬럼. + + 각 case 별 limit/3 분배 (case 간 양 균형). + """ + per_case = max(1, limit // 3) + sql = text(f""" + WITH base AS ( + SELECT + id, query, completeness, refused, classifier_verdict, + max_rerank_score, aggregate_score, + defense_layers->'grounding'->'strong' AS g_strong, + defense_layers->'verifier'->>'medium_count' AS v_medium, + defense_layers->>'re_gate' AS re_gate, + answer_length, prompt_version, source, eval_case_id, created_at + FROM ask_events WHERE {where} + ), + case_a AS ( + SELECT *, 'refused_high_rerank' AS candidate_reason + FROM base + WHERE refused = true AND COALESCE(max_rerank_score, 0.0) >= 0.35 + ORDER BY created_at DESC LIMIT :per_case + ), + case_b AS ( + SELECT *, 'insufficient_classifier_sufficient' AS candidate_reason + FROM base + WHERE completeness = 'insufficient' AND classifier_verdict = 'sufficient' + ORDER BY created_at DESC LIMIT :per_case + ), + case_c AS ( + SELECT *, 'partial_only_fabricated_number' AS candidate_reason + FROM base + WHERE completeness = 'partial' + AND jsonb_array_length(COALESCE(g_strong, '[]'::jsonb)) = 1 + AND (g_strong->>0) LIKE 'fabricated_number:%%' + ORDER BY created_at DESC LIMIT :per_case + ) + SELECT * FROM case_a + UNION ALL SELECT * FROM case_b + UNION ALL SELECT * FROM case_c + """) + 
def _md_table(headers: list[str], rows: list[list[Any]]) -> str:
    """Render ``headers`` and ``rows`` as a pipe-style Markdown table.

    Every cell is converted with ``str()``. A literal pipe inside a cell is
    backslash-escaped and line breaks are replaced by a space, because either
    would otherwise split the cell into extra columns or rows (free-text
    values such as user queries can contain both).

    Args:
        headers: Column titles.
        rows: One list of cell values per table row.

    Returns:
        The table text ending in a newline, or the placeholder
        ``_(empty)_`` line when ``rows`` is empty.
    """
    if not rows:
        return "_(empty)_\n"

    def cell(value: Any) -> str:
        rendered = str(value)
        # Normalise all newline flavours before escaping the column separator.
        rendered = rendered.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
        return rendered.replace("|", "\\|")

    lines = [
        "| " + " | ".join(cell(h) for h in headers) + " |",
        "|" + "|".join(["---"] * len(headers)) + "|",
    ]
    lines.extend("| " + " | ".join(cell(v) for v in row) + " |" for row in rows)
    return "\n".join(lines) + "\n"
defense_layers shape sample (latest 5)\n") + for s in sections["shape_sample"]: + out.append(f"- id={s['id']} created_at={s['created_at']}\n") + out.append(" ```json\n") + out.append(" " + json.dumps(s["defense_layers"], ensure_ascii=False, indent=2).replace("\n", "\n ") + "\n") + out.append(" ```\n") + + # 1. re-gate + out.append("## 1. Re-gate tier 분포\n") + out.append(_md_table(["tier", "n", "pct"], + [[r["tier"], r["n"], f"{r['pct']}%"] for r in sections["regate"]])) + + # 2. score histogram + out.append("## 2. max_rerank_score 히스토그램 (bucket × bin 0~10)\n") + out.append(_md_table(["bucket", "bin", "n", "avg_score"], + [[r["bucket"], r["bin"], r["n"], r["avg_score"]] for r in sections["score_hist"]])) + + # 3. classifier confusion + out.append("## 3. Classifier 혼동행렬 (verdict × completeness × refused)\n") + out.append(_md_table(["verdict", "completeness", "refused", "n"], + [[r["verdict"], r["completeness"], r["refused"], r["n"]] for r in sections["classifier"]])) + + # 4. verifier + out.append("## 4. Verifier severity 분포\n") + out.append(_md_table(["status", "medium_count", "strong_count", "completeness", "n"], + [[r["status"], r["medium_count"], r["strong_count"], r["completeness"], r["n"]] + for r in sections["verifier"]])) + + # 5. flags — 3개 표 (전체 / strong / weak) + flags = sections["flags"] + flags_strong = [f for f in flags if f["strength"] == "strong"] + flags_weak = [f for f in flags if f["strength"] == "weak"] + out.append("## 5. 
Hallucination flags top-K\n") + out.append("### 5.1 전체 top-20\n") + out.append(_md_table(["flag_type", "strength", "n"], + [[r["flag_type"], r["strength"], r["n"]] for r in flags[:20]])) + out.append("### 5.2 strong only top-10\n") + out.append(_md_table(["flag_type", "n"], + [[r["flag_type"], r["n"]] for r in flags_strong[:10]])) + out.append("### 5.3 weak only top-10\n") + out.append(_md_table(["flag_type", "n"], + [[r["flag_type"], r["n"]] for r in flags_weak[:10]])) + + # B1 감시 — fabricated_number strong rate + fab = sections["fabricated_rate"] + out.append("### 5.4 fabricated_number strong rate (B1 추적용)\n") + out.append(f"- total rows: {fab['total']}\n") + out.append(f"- fabricated_strong hit: {fab['fabricated_strong_hit']}\n") + out.append(f"- **rate: {fab['rate'] * 100:.2f}%**\n") + + # 6. eval mismatch (eval 일 때만) + if "eval" in sections: + ev = sections["eval"] + out.append("## 6. Eval golden mismatch (eval_case_id 기반)\n") + out.append(f"- eval_case_id present: {ev['eval_case_id_present']}\n") + out.append(f"- eval_case_id null (fallback): {ev['eval_case_id_null']}\n") + out.append(f"- join_failed_count: **{ev['join_failed_count']}**\n") + out.append(f"- matched total: {ev['matched_total']}\n\n") + out.append(_md_table(["expected", "actual", "n", "sample"], + [[g["expected"], g["actual"], g["n"], " | ".join(g["sample_queries"])[:120]] + for g in ev["mismatch_groups"]])) + + # 7. FP candidates + fps = sections["fp_candidates"] + out.append(f"## 7. FP candidate sample (n={len(fps)}, case A/B/C 분리)\n") + out.append(f"전체 CSV: `{sections.get('fp_csv_path', '(미생성)')}`\n\n") + out.append(_md_table( + ["case", "id", "completeness", "refused", "verdict", "max_score", "re_gate", "query"], + [[r["candidate_reason"], r["id"], r["completeness"], r["refused"], + r["classifier_verdict"], r["max_rerank_score"], r["re_gate"], + (r["query"] or "")[:60]] for r in fps])) + + # 8. answer_length + out.append("## 8. 
def compute_delta(current: dict[str, Any], baseline: dict[str, Any]) -> dict[str, Any]:
    """Compute a coarse delta between the current sections and a baseline.

    Three things are compared: ``total_rows``, the per-tier re-gate
    percentage (difference in percentage points) and the fabricated_number
    strong rate (difference in percentage points plus relative change).
    Finer-grained comparison is left for a later extension.

    Missing sections and JSON ``null`` values (a section that is ``None`` or a
    ``pct`` / ``rate`` that is ``None``) are treated as empty / 0 so that an
    older or hand-edited baseline file does not abort the report.

    Args:
        current: Sections of the current run.
        baseline: Sections loaded from the ``--compare-against`` JSON file.

    Returns:
        Dict with keys ``total_rows``, ``regate_pct_diff_pp`` and
        ``fabricated_strong_rate``. ``rel_change_pct`` is ``None`` when the
        baseline rate is not positive.
    """

    def regate_pct(sections: dict[str, Any]) -> dict[str, float]:
        # pct may arrive as a number or a numeric string (json default=str).
        return {
            row["tier"]: float(row["pct"]) if row.get("pct") is not None else 0.0
            for row in (sections.get("regate") or [])
        }

    def fabricated_rate(sections: dict[str, Any]) -> float:
        rate = (sections.get("fabricated_rate") or {}).get("rate")
        return float(rate) if rate is not None else 0.0

    cur_total = current.get("total_rows")
    base_total = baseline.get("total_rows")
    cur_regate = regate_pct(current)
    base_regate = regate_pct(baseline)
    cur_fr = fabricated_rate(current)
    base_fr = fabricated_rate(baseline)

    return {
        "total_rows": {
            "current": cur_total,
            "baseline": base_total,
            "diff": (cur_total or 0) - (base_total or 0),
        },
        # Sorted so the rendered delta is stable from run to run.
        "regate_pct_diff_pp": {
            tier: round(cur_regate.get(tier, 0.0) - base_regate.get(tier, 0.0), 2)
            for tier in sorted(set(base_regate) | set(cur_regate))
        },
        "fabricated_strong_rate": {
            "current": cur_fr,
            "baseline": base_fr,
            "diff_pp": round((cur_fr - base_fr) * 100, 2),
            "rel_change_pct": (
                round((cur_fr - base_fr) / base_fr * 100, 2) if base_fr > 0 else None
            ),
        },
    }
async def run(args: argparse.Namespace) -> None:
    """Collect all report sections and emit the report.

    Three modes, selected by ``args``:

    * ``--dry-run``: no database access; sections come from
      ``dry_run_sections()`` and no FP CSV is written.
    * ``--inspect-shape``: prints the latest ``defense_layers`` samples as
      JSON to stdout and returns without producing a report.
    * default: runs every fetcher with the shared WHERE filter, adds the eval
      join section when ``--source eval`` is given, dumps the FP candidate
      CSV and renders the report through ``_emit``.

    The engine is disposed in a ``finally`` block, after the session context
    has exited, so connections are released even when a query raises.
    """
    if args.dry_run:
        sections = dry_run_sections()
        sections.setdefault("fp_csv_path", "(dry-run, CSV skipped)")
        _emit(args, sections)
        return

    # Database connection
    database_url = os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
    )
    engine = create_async_engine(database_url, echo=False)
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with session_factory() as session:
            if args.inspect_shape:
                sample = await fetch_shape_inspect(session)
                print(json.dumps(
                    [{"id": s["id"], "created_at": str(s["created_at"]),
                      "defense_layers": s["defense_layers"]} for s in sample],
                    ensure_ascii=False, indent=2, default=str,
                ))
                # Leaving here closes the session first, then the finally
                # clause disposes the engine.
                return

            where, params = build_filters(args)
            total = await fetch_total_rows(session, where, params)
            if total == 0:
                print(f"WARNING: 필터 조건에 매칭되는 ask_events 행 0건. "
                      f"source={args.source} prompt_version={args.prompt_version} "
                      f"since={args.since} until={args.until}")

            sections: dict[str, Any] = {"total_rows": total}
            sections["regate"] = await fetch_regate_distribution(session, where, params)
            sections["score_hist"] = await fetch_score_histogram(session, where, params)
            sections["classifier"] = await fetch_classifier_confusion(session, where, params)
            sections["verifier"] = await fetch_verifier_distribution(session, where, params)
            sections["flags"] = await fetch_flag_frequencies(session, where, params)
            sections["fabricated_rate"] = await fetch_fabricated_strong_rate(session, where, params)
            sections["fp_candidates"] = await fetch_fp_candidates(
                session, where, params, args.sample_limit)
            sections["answer_length"] = await fetch_answer_length_distribution(
                session, where, params)

            # Eval-only section: join against the golden set, optionally
            # restricted to one holdout split.
            if args.source == "eval":
                cases = load_eval_golden(EVAL_GOLDEN_PATH)
                split_filter = (filter_eval_split(cases, args.eval_split)
                                if args.eval_split != "all" else None)
                sections["eval"] = await fetch_eval_join_with_split(
                    session, where, params, cases, split_filter)
    finally:
        await engine.dispose()

    # FP CSV dump
    fp_csv = (Path(args.fp_artifacts) if args.fp_artifacts else
              ARTIFACTS_DIR / f"fp_candidates_{args.run_label}.csv")
    dump_fp_csv(sections["fp_candidates"], fp_csv)
    sections["fp_csv_path"] = str(fp_csv)

    _emit(args, sections)
def main() -> None:
    """CLI entry point: parse the arguments and run the async pipeline."""
    cli_args = parse_args()
    pipeline = run(cli_args)
    asyncio.run(pipeline)
0, "strong_count": 0, "completeness": "full", "n": 5}, + {"status": "ok", "medium_count": 1, "strong_count": 0, "completeness": "partial", "n": 2}, + {"status": "ok", "medium_count": 3, "strong_count": 0, "completeness": "partial", "n": 1}, + {"status": "skipped", "medium_count": 0, "strong_count": 0, "completeness": "insufficient", "n": 2} + ], + "flags": [ + {"flag_type": "fabricated_number", "strength": "strong", "n": 2}, + {"flag_type": "uncited_claim", "strength": "weak", "n": 4}, + {"flag_type": "low_overlap", "strength": "weak", "n": 3}, + {"flag_type": "intent_misalignment", "strength": "strong", "n": 1} + ], + "fabricated_rate": { + "total": 10, + "fabricated_strong_hit": 2, + "rate": 0.2 + }, + "fp_candidates": [ + { + "id": 101, + "candidate_reason": "refused_high_rerank", + "query": "샘플 질의 1", + "completeness": "insufficient", + "refused": true, + "classifier_verdict": "insufficient", + "max_rerank_score": 0.42, + "aggregate_score": 1.05, + "g_strong": [], + "v_medium": "0", + "re_gate": "refuse(score_gate)", + "answer_length": 0, + "prompt_version": "search_synthesis.v1-400char", + "source": "eval", + "eval_case_id": "ask_def_001", + "created_at": "2026-04-17T08:00:00+00:00" + } + ], + "answer_length": [ + {"bucket": "full", "p25": 280, "p50": 350, "p75": 395, "avg": 340, "n": 5}, + {"bucket": "partial", "p25": 200, "p50": 260, "p75": 320, "avg": 255, "n": 3}, + {"bucket": "refused", "p25": 0, "p50": 0, "p75": 0, "avg": 0, "n": 2} + ] +}