Files
hyungi_document_server/scripts/calibrate_ask.py
T
Hyungi Ahn 3a3cd832f6 fix(scripts): calibrate_ask.py --since/--until datetime 파싱
asyncpg 이 TIMESTAMPTZ 파라미터에 문자열 대신 datetime 객체를 요구
(DataError: invalid input, expected datetime instance, got str).
argparse type=datetime.fromisoformat 로 CLI 단계에서 파싱.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 08:22:01 +09:00

750 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase 3.5 calibration CLI — ask_events 집계 + markdown report 생성.
사용법:
# Docker 컨테이너 내부 (권장 — DATABASE_URL 자동 주입)
docker compose exec fastapi python /app/scripts/calibrate_ask.py \\
--source eval --prompt-version search_synthesis.v1-400char \\
--run-label baseline_v1 --output reports/calibration_baseline_v1.md
# 로컬 (DATABASE_URL 환경변수 필요)
python scripts/calibrate_ask.py --inspect-shape
옵션:
--source eval / ui_search / ui_detail / document_server / ... (미지정=전체)
--prompt-version search_synthesis.v1-400char 등
--since / --until ISO8601, created_at 범위
--eval-split tuning(200) / confirm(100) / all (id 해시 기반 deterministic)
--run-label report 제목/파일명 라벨
--output .md 경로 (기본 reports/calibration.md). --format json 이면 .json 도 생성
--format md (사람용) | json (compare 용 baseline)
--compare-against 비교 대상 .json baseline 경로 (Δ 컬럼 출력)
--sample-limit FP candidate CSV 행수 (기본 30, 케이스별 분배)
--fp-artifacts FP CSV 경로 (기본 artifacts/fp_candidates_{run_label}.csv)
--inspect-shape defense_layers JSON sample 5건 출력 후 abort (Q0)
--threshold-overrides config/threshold_candidate.yaml — Step 0 feasibility 미해결, 미구현
--dry-run DB 미접속, tests/calibrate_fixtures/sample_ask_events.json 로드
읽기 전용 — INSERT/UPDATE/DELETE/ALTER 0건. SELECT 만.
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import hashlib
import json
import os
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
# Put the app/ directory that sits one level above this file's directory on
# sys.path (the original comment notes this follows the seed_admin.py pattern).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
# ─── paths / defaults ──────────────────────────────────────
# Two directory levels above this file (the parent of the directory holding this script).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Eval golden set (JSONL) read by load_eval_golden() when --source eval is used.
EVAL_GOLDEN_PATH = PROJECT_ROOT / "evals" / "ask_analyze_v1.jsonl"
# Default value of --output.
DEFAULT_REPORT = PROJECT_ROOT / "reports" / "calibration.md"
# Directory for the FP-candidate CSV when --fp-artifacts is not given.
ARTIFACTS_DIR = PROJECT_ROOT / "artifacts"
# Pre-computed sections loaded by dry_run_sections() for --dry-run, when the file exists.
DRY_RUN_FIXTURE = PROJECT_ROOT / "tests" / "calibrate_fixtures" / "sample_ask_events.json"
# Eval split ratio (deterministic, based on a hash of the case id).
TUNING_RATIO = 0.667  # 200 / 300
# ─── argparse ────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
    """Parse ``sys.argv`` into the options that drive a calibration run.

    Two rules are applied after plain parsing:

    * ``--threshold-overrides`` is declared but not implemented; supplying it
      ends the process through ``SystemExit`` with an explanatory message.
    * If ``--run-label`` is missing, a ``calibration_<YYYYmmdd_HHMMSS>`` label
      is generated from the current local time.

    ``--since`` / ``--until`` are converted by ``datetime.fromisoformat`` while
    parsing, so later code receives ``datetime`` objects (or ``None``), never
    raw strings.

    Returns:
        The populated namespace; ``run_label`` is always set on return.

    Raises:
        SystemExit: on argparse errors, or when ``--threshold-overrides`` is given.
    """
    parser = argparse.ArgumentParser(description="Phase 3.5 ask_events calibration report")
    add = parser.add_argument

    # Row filters shared by every query.
    add("--source", default=None,
        help="ask_events.source 필터 (eval / ui_search / ui_detail / 미지정=전체)")
    add("--prompt-version", default=None,
        help="ask_events.prompt_version 필터 (예: search_synthesis.v1-400char)")
    add("--since", default=None, type=datetime.fromisoformat,
        help="ISO8601, created_at >= since")
    add("--until", default=None, type=datetime.fromisoformat,
        help="ISO8601, created_at < until")
    add("--eval-split", choices=["tuning", "confirm", "all"], default="all",
        help="source='eval' 일 때 holdout split")

    # Output / comparison options.
    add("--run-label", default=None, help="report 제목/파일명 라벨")
    add("--output", default=str(DEFAULT_REPORT), help="md 출력 경로")
    add("--format", choices=["md", "json"], default="md",
        help="md 만 생성 또는 md+json 둘 다 (--format json 시)")
    add("--compare-against", default=None, help="비교 대상 .json baseline 경로")
    add("--sample-limit", type=int, default=30, help="FP candidate CSV 총 행수")
    add("--fp-artifacts", default=None, help="FP CSV 경로")

    # Modes.
    add("--inspect-shape", action="store_true",
        help="defense_layers JSON sample 5건 출력 후 abort")
    add("--threshold-overrides", default=None,
        help="config/threshold_candidate.yaml — Step 0 feasibility 미해결로 v2 미구현")
    add("--dry-run", action="store_true",
        help="DB 미접속, fixtures 로 출력 검증")

    options = parser.parse_args()

    if options.threshold_overrides:
        # Accepted on the command line only so that it can be rejected loudly.
        raise SystemExit(
            "--threshold-overrides 는 v2 미구현. Step 0 feasibility 통과 후 SQL "
            "reclassification 추가 예정. 1차는 baseline/candidate 를 코드 분기 run "
            "(코드 일시 수정 → eval replay 2회) 으로 측정."
        )

    if not options.run_label:
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        options.run_label = f"calibration_{stamp}"
    return options
# ─── 공통 WHERE 조립 ──────────────────────────────────────
def build_filters(args: argparse.Namespace) -> tuple[str, dict[str, Any]]:
    """Assemble the shared WHERE fragment and its bind parameters.

    Up to four predicates are produced, always in this order: ``source``,
    ``prompt_version``, ``since``, ``until``. An option whose value is ``None``
    contributes neither a predicate nor a parameter (the original note explains
    this avoids asyncpg failing to infer the type of a ``None`` parameter).

    Returns:
        ``(where_sql, params)``. ``where_sql`` is the predicates joined with
        ``" AND "``, or the literal ``"TRUE"`` when no filter is active.
    """
    # (option / bind name, SQL predicate), listed in output order.
    predicates = (
        ("source", "source = :source"),
        ("prompt_version", "prompt_version = :prompt_version"),
        ("since", "created_at >= CAST(:since AS TIMESTAMPTZ)"),
        ("until", "created_at < CAST(:until AS TIMESTAMPTZ)"),
    )

    active: list[str] = []
    bind: dict[str, Any] = {}
    for name, predicate in predicates:
        value = getattr(args, name)
        if value is None:
            continue
        active.append(predicate)
        bind[name] = value

    if not active:
        return "TRUE", bind
    return " AND ".join(active), bind
# ─── eval split (id 해시) ────────────────────────────────
def split_by_id_hash(case_id: str, ratio: float = TUNING_RATIO) -> str:
    """Deterministically assign an eval case to the 'tuning' or 'confirm' split.

    The first 32 bits of ``sha256(case_id)`` are scaled to a bucket in the
    half-open interval ``[0, 1)``. Buckets strictly below ``ratio`` are
    'tuning'; everything else is 'confirm'.

    Args:
        case_id: Eval case identifier; hashed as its default (UTF-8) encoding.
        ratio: Fraction of the id space assigned to 'tuning'.

    Returns:
        ``"tuning"`` or ``"confirm"``.
    """
    digest = hashlib.sha256(case_id.encode()).digest()
    # Scale by 2**32 rather than 2**32 - 1: the largest 4-byte prefix must map
    # strictly below 1.0, otherwise the bucket range is the closed [0, 1] and
    # ratio=1.0 would not send every id to 'tuning'.
    bucket = int.from_bytes(digest[:4], "big") / 2**32
    return "tuning" if bucket < ratio else "confirm"
def load_eval_golden(path: Path) -> dict[str, dict[str, Any]]:
    """Load the eval golden set from a JSONL file, keyed by case id.

    Each usable line is one JSON object; the original notes describe the shape
    as ``{id, type, category, query, expected_behavior, critical_keywords, ...}``.
    Loading is best-effort. The following lines are skipped silently:

    * blank lines,
    * lines that are not valid JSON,
    * valid JSON that is not an object (list, string, number, ...),
    * objects without a truthy ``id``.

    When an id occurs more than once, the last occurrence wins.

    Args:
        path: Location of the JSONL file.

    Returns:
        ``{id: case_dict}``; an empty dict when ``path`` does not exist.
    """
    if not path.exists():
        return {}

    cases: dict[str, dict[str, Any]] = {}
    with path.open("r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            # Keep the try body to the one call that can raise the decode error.
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line such as `[1, 2]` or `"text"` decodes fine but has no
            # .get(); treat it like any other malformed line instead of crashing.
            if not isinstance(obj, dict):
                continue
            cid = obj.get("id")
            if cid:
                cases[cid] = obj
    return cases
def filter_eval_split(cases: dict[str, dict], split: str) -> set[str]:
    """Return the case ids that belong to ``split``.

    ``split == "all"`` keeps every id; any other value keeps only the ids
    whose ``split_by_id_hash`` result equals ``split``.
    """
    selected: set[str] = set()
    for case_id in cases:
        # Short-circuit on "all" so no hashing happens in that mode.
        if split == "all" or split_by_id_hash(case_id) == split:
            selected.add(case_id)
    return selected
# ─── DB fetchers (Q0~Q8) ─────────────────────────────────
async def fetch_shape_inspect(session: AsyncSession) -> list[dict]:
    """Q0: fetch the five newest rows whose ``defense_layers`` is not NULL.

    Intended for printing to stdout so the JSON shape can be checked by eye.
    Each returned dict has the keys ``id``, ``defense_layers`` and ``created_at``.
    """
    query = text("""
        SELECT id, defense_layers, created_at
        FROM ask_events
        WHERE defense_layers IS NOT NULL
        ORDER BY created_at DESC
        LIMIT 5
    """)
    result = await session.execute(query)
    return [dict(row) for row in result.mappings().all()]
async def fetch_total_rows(session: AsyncSession, where: str, params: dict) -> int:
    """Count the ``ask_events`` rows matching the shared filter.

    ``where`` is spliced into the SQL text verbatim and ``params`` supplies its
    bind values; both are expected to come from ``build_filters``.
    """
    count_query = text(f"SELECT COUNT(*) AS n FROM ask_events WHERE {where}")
    result = await session.execute(count_query, params)
    return result.scalar_one()
async def fetch_regate_distribution(session, where, params) -> list[dict]:
    """Q1: distribution of ``defense_layers->>'re_gate'`` over the filtered rows.

    Returns one dict per tier with keys ``tier`` (``'(null)'`` when the JSON
    key is absent or null), ``n`` (row count) and ``pct`` (share of all
    filtered rows, rounded to two decimals), ordered by ``n`` descending.
    """
    query = text(f"""
        SELECT
            COALESCE(defense_layers->>'re_gate', '(null)') AS tier,
            COUNT(*) AS n,
            ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
        FROM ask_events
        WHERE {where}
        GROUP BY 1
        ORDER BY n DESC
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_score_histogram(session, where, params) -> list[dict]:
    """Q2: histogram of ``max_rerank_score`` per outcome bucket.

    Outcome bucket: ``'refused'`` when ``refused`` is true, otherwise
    ``'full'`` / ``'partial'`` by ``completeness``, and ``'insufficient'`` for
    everything else (including NULL completeness).

    Score bin: ``WIDTH_BUCKET`` over ``[0.0, 1.0)`` in 10 equal bins; a NULL
    score is binned as 0.0. NOTE: under PostgreSQL ``width_bucket`` semantics
    a score of exactly 1.0 or above lands in bin 11 and a negative score in
    bin 0. ``avg_score`` is the plain ``AVG`` of the raw column, so NULL
    scores do not contribute to it.

    Returns dicts with keys ``bucket``, ``bin``, ``n``, ``avg_score``, ordered
    by bucket then bin.
    """
    query = text(f"""
        SELECT
            CASE WHEN refused THEN 'refused'
                 WHEN completeness = 'full' THEN 'full'
                 WHEN completeness = 'partial' THEN 'partial'
                 ELSE 'insufficient' END AS bucket,
            WIDTH_BUCKET(COALESCE(max_rerank_score, 0.0), 0.0, 1.0, 10) AS bin,
            COUNT(*) AS n,
            ROUND(AVG(max_rerank_score)::numeric, 3) AS avg_score
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2
        ORDER BY 1, 2
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_classifier_confusion(session, where, params) -> list[dict]:
    """Q3: row counts per (classifier_verdict, completeness, refused) combination.

    NULL verdict / completeness values are reported as ``'(null)'``. Returns
    dicts with keys ``verdict``, ``completeness``, ``refused``, ``n``, ordered
    by ``n`` descending.
    """
    query = text(f"""
        SELECT
            COALESCE(classifier_verdict, '(null)') AS verdict,
            COALESCE(completeness, '(null)') AS completeness,
            refused,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3
        ORDER BY n DESC
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_verifier_distribution(session, where, params) -> list[dict]:
    """Q4: distribution of verifier results stored under ``defense_layers->'verifier'``.

    Missing values are defaulted in SQL: ``status`` becomes ``'n/a'``, the
    ``medium_count`` / ``strong_count`` fields are cast to int and default to
    0, and a NULL ``completeness`` becomes ``'(null)'``.

    Returns dicts with keys ``status``, ``medium_count``, ``strong_count``,
    ``completeness``, ``n``, ordered by the four grouping columns.
    """
    query = text(f"""
        SELECT
            COALESCE(defense_layers->'verifier'->>'status', 'n/a') AS status,
            COALESCE((defense_layers->'verifier'->>'medium_count')::int, 0) AS medium_count,
            COALESCE((defense_layers->'verifier'->>'strong_count')::int, 0) AS strong_count,
            COALESCE(completeness, '(null)') AS completeness,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3, 4
        ORDER BY 1, 2, 3, 4
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_flag_frequencies(session, where, params) -> list[dict]:
    """Q5: most frequent hallucination flag types, strong and weak combined.

    Flags are read from ``defense_layers->'grounding'->'strong'`` and
    ``->'weak'``; the flag type is the text before the first ``:``. The two
    per-strength aggregates are concatenated with ``UNION ALL`` inside a
    subquery so the ordering and limit apply to the combined result.

    Returns at most 40 dicts with keys ``flag_type``, ``strength``
    (``'strong'`` / ``'weak'``) and ``n``, ordered by ``n`` descending.
    """
    query = text(f"""
        SELECT * FROM (
            SELECT split_part(flag, ':', 1) AS flag_type, 'strong' AS strength, COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'strong') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
            UNION ALL
            SELECT split_part(flag, ':', 1) AS flag_type, 'weak' AS strength, COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'weak') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
        ) u
        ORDER BY n DESC
        LIMIT 40
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
async def fetch_fabricated_strong_rate(session, where, params) -> dict[str, float]:
    """Share of filtered rows carrying a strong ``fabricated_number`` flag (B1 check).

    A row counts as a hit when at least one element of
    ``defense_layers->'grounding'->'strong'`` starts with
    ``fabricated_number:``. The rate is hits divided by all filtered rows,
    i.e. a per-row rate rather than a raw flag count.

    Returns:
        ``{"total": int, "fabricated_strong_hit": int, "rate": float}`` with
        ``rate`` rounded to four decimals, and 0.0 when there are no rows.
    """
    query = text(f"""
        SELECT
            COUNT(*) AS total,
            SUM(CASE WHEN EXISTS (
                SELECT 1 FROM jsonb_array_elements_text(defense_layers->'grounding'->'strong') f
                WHERE f LIKE 'fabricated_number:%%'
            ) THEN 1 ELSE 0 END) AS hit
        FROM ask_events
        WHERE {where}
    """)
    result = await session.execute(query, params)
    counts = result.mappings().one()

    # SUM over zero rows yields NULL, hence the `or 0` fallbacks.
    total_rows = int(counts["total"] or 0)
    hits = int(counts["hit"] or 0)
    if total_rows > 0:
        rate = hits / total_rows
    else:
        rate = 0.0
    return {"total": total_rows, "fabricated_strong_hit": hits, "rate": round(rate, 4)}
async def fetch_eval_join_with_split(
    session, where, params, eval_cases: dict[str, dict], split_filter: set[str] | None,
) -> dict[str, Any]:
    """Q6: join the latest eval rows to the golden set and group expected vs actual.

    Only the newest row per ``COALESCE(eval_case_id, query)`` with
    ``source = 'eval'`` is considered. A row is resolved to a golden case by
    ``eval_case_id`` when present; otherwise by its query text after
    normalisation (whitespace runs collapsed, trimmed, lower-cased).

    Args:
        session: Async DB session used to run the query.
        where: Shared WHERE fragment (see ``build_filters``).
        params: Bind parameters for ``where``.
        eval_cases: Golden cases keyed by id.
        split_filter: When not ``None``, resolved rows whose case id is outside
            this set are dropped from the grouping. They are not counted as
            join failures.

    Returns:
        A dict with:
        - ``mismatch_groups``: ``[{expected, actual, n, sample_queries}]`` for
          every (expected, actual) pair seen, agreeing pairs included, largest
          group first; ``sample_queries`` holds up to three golden queries.
        - ``eval_case_id_present`` / ``eval_case_id_null``: rows with / without
          an ``eval_case_id``.
        - ``join_failed_count``: rows that could not be resolved to a golden
          case (unknown id, or no id and no query match).
        - ``matched_total``: rows that were resolved and passed ``split_filter``.
    """
    import re as _re

    query = text(f"""
        WITH ranked AS (
            SELECT
                id, eval_case_id, query, completeness, refused,
                ROW_NUMBER() OVER (PARTITION BY COALESCE(eval_case_id, query)
                                   ORDER BY created_at DESC) AS rn
            FROM ask_events
            WHERE {where} AND source = 'eval'
        )
        SELECT id, eval_case_id, query, completeness, refused
        FROM ranked WHERE rn = 1
    """)
    result = await session.execute(query, params)
    latest_rows = [dict(record) for record in result.mappings()]

    def norm(q: str | None) -> str:
        # Collapse whitespace runs, trim, lower-case; empty / None becomes "".
        return _re.sub(r"\s+", " ", q).strip().lower() if q else ""

    # Fallback index: normalised golden query -> case id.
    query_to_case_id = {
        norm(golden.get("query")): golden_id
        for golden_id, golden in eval_cases.items()
        if golden.get("query")
    }

    rows_with_id = 0
    rows_without_id = 0
    unresolved = 0
    matched = 0
    # (expected_behavior, actual outcome) -> golden queries, in match order.
    grouped: dict[tuple[str, str], list[str]] = {}

    for row in latest_rows:
        case_id = row.get("eval_case_id")
        if case_id:
            rows_with_id += 1
        else:
            rows_without_id += 1
            case_id = query_to_case_id.get(norm(row.get("query")))

        golden_case = eval_cases.get(case_id) if case_id else None
        if not golden_case:
            unresolved += 1
            continue
        if split_filter is not None and case_id not in split_filter:
            continue

        # A refusal overrides whatever completeness was recorded.
        if row.get("refused"):
            actual = "refused"
        else:
            actual = row.get("completeness") or "(null)"
        expected = golden_case.get("expected_behavior", "(unknown)")
        grouped.setdefault((expected, actual), []).append(golden_case.get("query", ""))
        matched += 1

    # Largest group first; the stable sort keeps first-seen order among ties.
    ordered = sorted(grouped.items(), key=lambda item: len(item[1]), reverse=True)
    mismatch_groups = [
        {
            "expected": expected,
            "actual": actual,
            "n": len(queries),
            "sample_queries": queries[:3],
        }
        for (expected, actual), queries in ordered
    ]

    return {
        "mismatch_groups": mismatch_groups,
        "eval_case_id_present": rows_with_id,
        "eval_case_id_null": rows_without_id,
        "join_failed_count": unresolved,
        "matched_total": matched,
    }
async def fetch_fp_candidates(session, where, params, limit: int) -> list[dict]:
    """Q7: sample false-positive candidates from three cases, newest first.

    Cases (tagged in the ``candidate_reason`` column):
    - A ``refused_high_rerank``: refused although ``max_rerank_score >= 0.35``.
    - B ``insufficient_classifier_sufficient``: completeness ``insufficient``
      while the classifier verdict was ``sufficient``.
    - C ``partial_only_fabricated_number``: completeness ``partial`` whose only
      strong grounding flag is a ``fabricated_number`` one.

    Each case is capped at ``max(1, limit // 3)`` rows so the three stay
    balanced; the result is the concatenation A, B, C.
    """
    rows_per_case = max(1, limit // 3)
    query = text(f"""
        WITH base AS (
            SELECT
                id, query, completeness, refused, classifier_verdict,
                max_rerank_score, aggregate_score,
                defense_layers->'grounding'->'strong' AS g_strong,
                defense_layers->'verifier'->>'medium_count' AS v_medium,
                defense_layers->>'re_gate' AS re_gate,
                answer_length, prompt_version, source, eval_case_id, created_at
            FROM ask_events WHERE {where}
        ),
        case_a AS (
            SELECT *, 'refused_high_rerank' AS candidate_reason
            FROM base
            WHERE refused = true AND COALESCE(max_rerank_score, 0.0) >= 0.35
            ORDER BY created_at DESC LIMIT :per_case
        ),
        case_b AS (
            SELECT *, 'insufficient_classifier_sufficient' AS candidate_reason
            FROM base
            WHERE completeness = 'insufficient' AND classifier_verdict = 'sufficient'
            ORDER BY created_at DESC LIMIT :per_case
        ),
        case_c AS (
            SELECT *, 'partial_only_fabricated_number' AS candidate_reason
            FROM base
            WHERE completeness = 'partial'
              AND jsonb_array_length(COALESCE(g_strong, '[]'::jsonb)) = 1
              AND (g_strong->>0) LIKE 'fabricated_number:%%'
            ORDER BY created_at DESC LIMIT :per_case
        )
        SELECT * FROM case_a
        UNION ALL SELECT * FROM case_b
        UNION ALL SELECT * FROM case_c
    """)
    # Copy so the caller's shared params dict is not mutated.
    bound = dict(params)
    bound["per_case"] = rows_per_case
    result = await session.execute(query, bound)
    return [dict(row) for row in result.mappings()]
async def fetch_answer_length_distribution(session, where, params) -> list[dict]:
    """Q8: ``answer_length`` quartiles and mean per outcome bucket.

    The bucket is ``'refused'`` for refused rows, otherwise the completeness
    value (``'(null)'`` when missing). Rows with a NULL ``answer_length`` are
    excluded. Returns dicts with keys ``bucket``, ``p25``, ``p50``, ``p75``,
    ``avg`` (cast to int) and ``n``, ordered by bucket.
    """
    query = text(f"""
        SELECT
            CASE WHEN refused THEN 'refused' ELSE COALESCE(completeness, '(null)') END AS bucket,
            PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY answer_length) AS p25,
            PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY answer_length) AS p50,
            PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY answer_length) AS p75,
            AVG(answer_length)::int AS avg,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where} AND answer_length IS NOT NULL
        GROUP BY 1
        ORDER BY 1
    """)
    result = await session.execute(query, params)
    return [dict(row) for row in result.mappings()]
# ─── rendering ───────────────────────────────────────────
def _md_table(headers: list[str], rows: list[list[Any]]) -> str:
if not rows:
return "_(empty)_\n"
lines = ["| " + " | ".join(headers) + " |",
"|" + "|".join(["---"] * len(headers)) + "|"]
for row in rows:
lines.append("| " + " | ".join(str(v) for v in row) + " |")
return "\n".join(lines) + "\n"
def render_markdown(sections: dict[str, Any], args: argparse.Namespace,
                    delta: dict[str, Any] | None = None) -> str:
    """Build the markdown calibration report from the collected sections.

    Layout: title and filter summary, an optional section 0 (raw
    ``defense_layers`` samples, only if ``sections`` has ``shape_sample``),
    sections 1-5 (re-gate tiers, score histogram, classifier confusion,
    verifier distribution, hallucination flags incl. the fabricated_number
    rate), section 6 (eval join, only if ``sections`` has ``eval``),
    section 7 (FP candidates), section 8 (answer length) and section 9
    (delta as JSON, only if ``delta`` is truthy).

    Args:
        sections: Query results keyed by section name. ``total_rows``,
            ``regate``, ``score_hist``, ``classifier``, ``verifier``,
            ``flags``, ``fabricated_rate``, ``fp_candidates`` and
            ``answer_length`` are required.
        args: Parsed CLI options; used for the title and the filter summary.
        delta: Optional comparison against a baseline.

    Returns:
        The full report as one string.
    """

    def pick(records: list[dict], *keys: str) -> list[list[Any]]:
        # One table row per record, cells taken in the order of ``keys``.
        return [[record[key] for key in keys] for record in records]

    filter_line = (
        f"Filter: source={args.source} prompt_version={args.prompt_version} "
        f"since={args.since} until={args.until} eval_split={args.eval_split}\n"
    )
    doc: list[str] = [
        f"# Calibration Report — {args.run_label}\n",
        filter_line,
        f"Total rows: **{sections['total_rows']}**\n",
    ]

    # 0. Raw defense_layers samples (only present when the caller supplied them).
    if "shape_sample" in sections:
        doc.append("## 0. defense_layers shape sample (latest 5)\n")
        for sample in sections["shape_sample"]:
            doc.append(f"- id={sample['id']} created_at={sample['created_at']}\n")
            layers_json = json.dumps(sample["defense_layers"], ensure_ascii=False, indent=2)
            doc.extend((
                " ```json\n",
                # Indent continuation lines so the block stays inside the list item.
                " " + layers_json.replace("\n", "\n ") + "\n",
                " ```\n",
            ))

    # 1. Re-gate tiers.
    doc.append("## 1. Re-gate tier 분포\n")
    doc.append(_md_table(
        ["tier", "n", "pct"],
        [[tier["tier"], tier["n"], f"{tier['pct']}%"] for tier in sections["regate"]],
    ))

    # 2. Score histogram.
    doc.append("## 2. max_rerank_score 히스토그램 (bucket × bin 0~10)\n")
    doc.append(_md_table(
        ["bucket", "bin", "n", "avg_score"],
        pick(sections["score_hist"], "bucket", "bin", "n", "avg_score"),
    ))

    # 3. Classifier confusion matrix.
    doc.append("## 3. Classifier 혼동행렬 (verdict × completeness × refused)\n")
    doc.append(_md_table(
        ["verdict", "completeness", "refused", "n"],
        pick(sections["classifier"], "verdict", "completeness", "refused", "n"),
    ))

    # 4. Verifier severity.
    doc.append("## 4. Verifier severity 분포\n")
    doc.append(_md_table(
        ["status", "medium_count", "strong_count", "completeness", "n"],
        pick(sections["verifier"],
             "status", "medium_count", "strong_count", "completeness", "n"),
    ))

    # 5. Hallucination flags: combined, strong-only and weak-only tables.
    all_flags = sections["flags"]
    strong_flags = [flag for flag in all_flags if flag["strength"] == "strong"]
    weak_flags = [flag for flag in all_flags if flag["strength"] == "weak"]
    doc.append("## 5. Hallucination flags top-K\n")
    doc.append("### 5.1 전체 top-20\n")
    doc.append(_md_table(["flag_type", "strength", "n"],
                         pick(all_flags[:20], "flag_type", "strength", "n")))
    doc.append("### 5.2 strong only top-10\n")
    doc.append(_md_table(["flag_type", "n"], pick(strong_flags[:10], "flag_type", "n")))
    doc.append("### 5.3 weak only top-10\n")
    doc.append(_md_table(["flag_type", "n"], pick(weak_flags[:10], "flag_type", "n")))

    # 5.4 fabricated_number strong rate (tracked for B1).
    fabricated = sections["fabricated_rate"]
    doc.extend((
        "### 5.4 fabricated_number strong rate (B1 추적용)\n",
        f"- total rows: {fabricated['total']}\n",
        f"- fabricated_strong hit: {fabricated['fabricated_strong_hit']}\n",
        f"- **rate: {fabricated['rate'] * 100:.2f}%**\n",
    ))

    # 6. Eval golden join (only present for eval runs).
    if "eval" in sections:
        eval_info = sections["eval"]
        doc.extend((
            "## 6. Eval golden mismatch (eval_case_id 기반)\n",
            f"- eval_case_id present: {eval_info['eval_case_id_present']}\n",
            f"- eval_case_id null (fallback): {eval_info['eval_case_id_null']}\n",
            f"- join_failed_count: **{eval_info['join_failed_count']}**\n",
            f"- matched total: {eval_info['matched_total']}\n\n",
        ))
        doc.append(_md_table(
            ["expected", "actual", "n", "sample"],
            [[group["expected"], group["actual"], group["n"],
              " | ".join(group["sample_queries"])[:120]]
             for group in eval_info["mismatch_groups"]],
        ))

    # 7. FP candidates (query text truncated to 60 characters).
    candidates = sections["fp_candidates"]
    doc.append(f"## 7. FP candidate sample (n={len(candidates)}, case A/B/C 분리)\n")
    doc.append(f"전체 CSV: `{sections.get('fp_csv_path', '(미생성)')}`\n\n")
    doc.append(_md_table(
        ["case", "id", "completeness", "refused", "verdict", "max_score", "re_gate", "query"],
        [[cand["candidate_reason"], cand["id"], cand["completeness"], cand["refused"],
          cand["classifier_verdict"], cand["max_rerank_score"], cand["re_gate"],
          (cand["query"] or "")[:60]]
         for cand in candidates],
    ))

    # 8. Answer length distribution.
    doc.append("## 8. answer_length 분포 (bucket × percentile)\n")
    doc.append(_md_table(
        ["bucket", "p25", "p50", "p75", "avg", "n"],
        pick(sections["answer_length"], "bucket", "p25", "p50", "p75", "avg", "n"),
    ))

    # 9. Delta against the baseline, dumped as JSON.
    if delta:
        doc.extend((
            "## 9. Delta vs baseline\n",
            "```json\n",
            json.dumps(delta, ensure_ascii=False, indent=2, default=str),
            "\n```\n",
        ))

    return "".join(doc)
def render_json(sections: dict[str, Any]) -> str:
    """Serialise ``sections`` as indented JSON for use as a comparison baseline.

    Non-ASCII text is written as-is, and values the JSON encoder cannot handle
    natively are converted with ``str()``.
    """
    dump_options = {"ensure_ascii": False, "indent": 2, "default": str}
    return json.dumps(sections, **dump_options)
def compute_delta(current: dict[str, Any], baseline: dict[str, Any]) -> dict[str, Any]:
    """Summarise how the current run differs from a stored baseline.

    Only three things are compared: the total row count, the re-gate tier
    percentages and the fabricated_number strong rate.

    Both inputs are section dicts; ``baseline`` normally comes from a JSON
    file written earlier, where numeric ``pct`` values may have been
    stringified, hence the ``float()`` conversion. Missing or ``null``
    ``regate`` / ``fabricated_rate`` / ``rate`` entries are treated as empty
    or 0.0 rather than raising.

    Returns:
        A dict with, in order:
        - ``total_rows``: ``{current, baseline, diff}``.
        - ``regate_pct_diff_pp``: ``{tier: current_pct - baseline_pct}`` in
          percentage points, rounded to two decimals. A tier missing on one
          side counts as 0.0 there. Keys are sorted so the rendered report is
          identical from run to run.
        - ``fabricated_strong_rate``: ``{current, baseline, diff_pp,
          rel_change_pct}``; ``rel_change_pct`` is ``None`` when the baseline
          rate is not positive.
    """

    def regate_pct(sections: dict[str, Any]) -> dict[str, float]:
        return {row["tier"]: float(row["pct"]) for row in (sections.get("regate") or [])}

    def fabricated_rate(sections: dict[str, Any]) -> float:
        return (sections.get("fabricated_rate") or {}).get("rate") or 0.0

    cur_total = current.get("total_rows")
    base_total = baseline.get("total_rows")

    base_regate = regate_pct(baseline)
    cur_regate = regate_pct(current)
    # Iterating a bare set of strings varies with hash randomisation; sorting
    # pins the key order of the emitted JSON.
    tiers = sorted(set(base_regate) | set(cur_regate), key=str)

    cur_fr = fabricated_rate(current)
    base_fr = fabricated_rate(baseline)

    return {
        "total_rows": {
            "current": cur_total,
            "baseline": base_total,
            "diff": (cur_total or 0) - (base_total or 0),
        },
        "regate_pct_diff_pp": {
            tier: round(cur_regate.get(tier, 0.0) - base_regate.get(tier, 0.0), 2)
            for tier in tiers
        },
        "fabricated_strong_rate": {
            "current": cur_fr,
            "baseline": base_fr,
            "diff_pp": round((cur_fr - base_fr) * 100, 2),
            "rel_change_pct": (round((cur_fr - base_fr) / base_fr * 100, 2)
                               if base_fr > 0 else None),
        },
    }
# ─── FP CSV dump ──────────────────────────────────────────
def dump_fp_csv(rows: list[dict], path: Path) -> None:
    """Write the FP candidate rows to ``path`` as CSV for manual labelling.

    The parent directory is created if needed. With no rows, an empty file
    (without a header) is written. Otherwise the file has a fixed column
    order followed by a blank ``is_true_fp`` column for the reviewer to fill
    in. Keys of a row that are not in the column list are dropped, and list /
    dict values are serialised as JSON text.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        path.write_text("", encoding="utf-8")
        return

    # Stable column order (the original comment attributes it to the plan spec).
    data_columns = (
        "id", "candidate_reason", "query", "completeness", "refused",
        "classifier_verdict", "max_rerank_score", "aggregate_score",
        "g_strong", "v_medium", "re_gate", "answer_length",
        "prompt_version", "source", "eval_case_id", "created_at",
    )
    label_column = "is_true_fp"  # left blank for manual annotation

    def as_cell(value: Any) -> Any:
        # JSONB-style containers are written as a JSON string.
        if isinstance(value, (list, dict)):
            return json.dumps(value, ensure_ascii=False)
        return value

    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=[*data_columns, label_column])
        writer.writeheader()
        for candidate in rows:
            record = {name: as_cell(candidate.get(name)) for name in data_columns}
            record[label_column] = ""
            writer.writerow(record)
# ─── dry-run (DB 없이 fixture 로드) ───────────────────────
def dry_run_sections() -> dict[str, Any]:
    """Return report sections for ``--dry-run`` without touching the database.

    If the fixture file at ``DRY_RUN_FIXTURE`` exists, its JSON content is
    returned as-is. Otherwise a minimal built-in sample is used: three rows,
    two re-gate tiers, and every other section empty.
    """
    if DRY_RUN_FIXTURE.exists():
        return json.loads(DRY_RUN_FIXTURE.read_text(encoding="utf-8"))

    # Minimal inline fixture.
    regate_sample = [
        {"tier": "clean", "n": 2, "pct": 66.67},
        {"tier": "refuse(grounding_2+strong)", "n": 1, "pct": 33.33},
    ]
    return {
        "total_rows": 3,
        "regate": regate_sample,
        "score_hist": [],
        "classifier": [],
        "verifier": [],
        "flags": [],
        "fabricated_rate": {"total": 3, "fabricated_strong_hit": 0, "rate": 0.0},
        "fp_candidates": [],
        "answer_length": [],
    }
# ─── main ─────────────────────────────────────────────────
async def run(args: argparse.Namespace) -> None:
    """Execute one calibration run according to the parsed CLI options.

    Modes:
    - ``--dry-run``: render from ``dry_run_sections()``; no database access
      and no FP CSV.
    - ``--inspect-shape``: print the newest ``defense_layers`` samples as JSON
      to stdout and stop.
    - otherwise: run every aggregate query under the shared filter, add the
      eval join when ``--source eval`` is used, write the FP candidate CSV and
      emit the report.

    The database URL comes from the ``DATABASE_URL`` environment variable,
    with a local default. The engine is disposed in a ``finally`` block, after
    the session has been closed, so its connections are released on every
    exit path, including a failing query.
    """
    if args.dry_run:
        sections = dry_run_sections()
        sections.setdefault("fp_csv_path", "(dry-run, CSV skipped)")
        _emit(args, sections)
        return

    # Connect to the database.
    database_url = os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
    )
    engine = create_async_engine(database_url, echo=False)
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with session_factory() as session:
            if args.inspect_shape:
                sample = await fetch_shape_inspect(session)
                print(json.dumps(
                    [{"id": s["id"], "created_at": str(s["created_at"]),
                      "defense_layers": s["defense_layers"]} for s in sample],
                    ensure_ascii=False, indent=2, default=str,
                ))
                return

            where, params = build_filters(args)
            total = await fetch_total_rows(session, where, params)
            if total == 0:
                # Not fatal: the report is still produced, with empty tables.
                print(f"WARNING: 필터 조건에 매칭되는 ask_events 행 0건. "
                      f"source={args.source} prompt_version={args.prompt_version} "
                      f"since={args.since} until={args.until}")

            sections: dict[str, Any] = {"total_rows": total}
            sections["regate"] = await fetch_regate_distribution(session, where, params)
            sections["score_hist"] = await fetch_score_histogram(session, where, params)
            sections["classifier"] = await fetch_classifier_confusion(session, where, params)
            sections["verifier"] = await fetch_verifier_distribution(session, where, params)
            sections["flags"] = await fetch_flag_frequencies(session, where, params)
            sections["fabricated_rate"] = await fetch_fabricated_strong_rate(
                session, where, params)
            sections["fp_candidates"] = await fetch_fp_candidates(
                session, where, params, args.sample_limit)
            sections["answer_length"] = await fetch_answer_length_distribution(
                session, where, params)

            # Eval-only section.
            if args.source == "eval":
                cases = load_eval_golden(EVAL_GOLDEN_PATH)
                split_filter = (filter_eval_split(cases, args.eval_split)
                                if args.eval_split != "all" else None)
                sections["eval"] = await fetch_eval_join_with_split(
                    session, where, params, cases, split_filter)
    finally:
        # Runs after the session context has exited, for returns and errors alike.
        await engine.dispose()

    # Dump the FP candidates to CSV.
    fp_csv = (Path(args.fp_artifacts) if args.fp_artifacts else
              ARTIFACTS_DIR / f"fp_candidates_{args.run_label}.csv")
    dump_fp_csv(sections["fp_candidates"], fp_csv)
    sections["fp_csv_path"] = str(fp_csv)
    _emit(args, sections)
def _emit(args: argparse.Namespace, sections: dict[str, Any]) -> None:
    """Render the report and write it to disk, handling ``--compare-against``.

    When a baseline path is given and exists, its JSON is loaded and a delta
    is computed for the report; a missing baseline only prints a warning. The
    markdown file is always written to ``args.output`` (parent directories are
    created). With ``--format json`` the sections are also written as JSON
    next to it, using the same path with a ``.json`` suffix.
    """
    delta = None
    compare_against = args.compare_against
    if compare_against:
        baseline_path = Path(compare_against)
        if not baseline_path.exists():
            print(f"WARNING: compare-against baseline not found: {baseline_path}")
        else:
            baseline = json.loads(baseline_path.read_text(encoding="utf-8"))
            delta = compute_delta(sections, baseline)

    report_md = render_markdown(sections, args, delta)
    report_path = Path(args.output)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(report_md, encoding="utf-8")
    print(f"✓ markdown report: {report_path}")

    if args.format != "json":
        return
    json_path = report_path.with_suffix(".json")
    json_path.write_text(render_json(sections), encoding="utf-8")
    print(f"✓ json baseline: {json_path}")
def main() -> None:
    """CLI entry point: parse the arguments, then drive ``run`` on a new event loop."""
    asyncio.run(run(parse_args()))


# Only execute when invoked as a script, not when imported.
if __name__ == "__main__":
    main()