feat(scripts): Phase 3.5 — calibrate_ask.py CLI (Q0~Q8 + render + FP CSV)
scripts/calibrate_ask.py — ask_events 집계 + markdown report 영구 도구.
기능:
- argparse: --source / --prompt-version / --since / --until / --eval-split
(tuning|confirm|all, id 해시 기반 deterministic split) / --run-label /
--output / --format md|json / --compare-against / --sample-limit /
--fp-artifacts / --inspect-shape / --dry-run
- 9개 fetcher (모두 read-only SELECT):
- Q0 defense_layers shape inspect
- Q1 re-gate tier 분포
- Q2 max_rerank_score 히스토그램 (bucket × bin)
- Q3 classifier 혼동행렬
- Q4 verifier severity 분포 (cast + COALESCE NULL safe)
- Q5 hallucination_flags top-K (UNION ALL outer wrap, strong/weak 컬럼 유지)
- Q6 eval golden mismatch (eval_case_id 기반 join + query string fallback)
- Q7 FP candidate (case A/B/C 분리 + candidate_reason 컬럼 + LIMIT/3 분배)
- Q8 answer_length p25/p50/p75 분포 (E.3 v1↔v2 비교 축)
- markdown render + json baseline + delta compare (compare-against)
- FP CSV dump (artifacts/fp_candidates_{run_label}.csv) + is_true_fp 공란
- dry-run: tests/calibrate_fixtures/sample_ask_events.json 로 출력 검증
- --threshold-overrides: Step 0 feasibility 통과 후 v2 (현재 stub raise)
read-only 강제: INSERT/UPDATE/DELETE/ALTER/DROP/TRUNCATE 0건.
tests/calibrate_fixtures/sample_ask_events.json: dry-run snapshot fixture.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,745 @@
|
||||
"""Phase 3.5 calibration CLI — ask_events 집계 + markdown report 생성.
|
||||
|
||||
사용법:
|
||||
# Docker 컨테이너 내부 (권장 — DATABASE_URL 자동 주입)
|
||||
docker compose exec fastapi python /app/scripts/calibrate_ask.py \\
|
||||
--source eval --prompt-version search_synthesis.v1-400char \\
|
||||
--run-label baseline_v1 --output reports/calibration_baseline_v1.md
|
||||
|
||||
# 로컬 (DATABASE_URL 환경변수 필요)
|
||||
python scripts/calibrate_ask.py --inspect-shape
|
||||
|
||||
옵션:
|
||||
--source eval / ui_search / ui_detail / document_server / ... (미지정=전체)
|
||||
--prompt-version search_synthesis.v1-400char 등
|
||||
--since / --until ISO8601, created_at 범위
|
||||
--eval-split tuning(200) / confirm(100) / all (id 해시 기반 deterministic)
|
||||
--run-label report 제목/파일명 라벨
|
||||
--output .md 경로 (기본 reports/calibration.md). --format json 이면 .json 도 생성
|
||||
--format md (사람용) | json (compare 용 baseline)
|
||||
--compare-against 비교 대상 .json baseline 경로 (Δ 컬럼 출력)
|
||||
--sample-limit FP candidate CSV 행수 (기본 30, 케이스별 분배)
|
||||
--fp-artifacts FP CSV 경로 (기본 artifacts/fp_candidates_{run_label}.csv)
|
||||
--inspect-shape defense_layers JSON sample 5건 출력 후 abort (Q0)
|
||||
--threshold-overrides config/threshold_candidate.yaml — Step 0 feasibility 미해결, 미구현
|
||||
--dry-run DB 미접속, tests/calibrate_fixtures/sample_ask_events.json 로드
|
||||
|
||||
읽기 전용 — INSERT/UPDATE/DELETE/ALTER 0건. SELECT 만.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import csv
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
# Add the project's app/ directory (a sibling of this script's directory) to the
# import path (same pattern as seed_admin.py).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
|
||||
|
||||
# ─── Paths / defaults ─────────────────────────────────────

# Repository root: two levels up from this script file.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Eval golden set (JSONL) used for the Q6 join.
EVAL_GOLDEN_PATH = PROJECT_ROOT / "evals" / "ask_analyze_v1.jsonl"
# Default markdown report location (default of --output).
DEFAULT_REPORT = PROJECT_ROOT / "reports" / "calibration.md"
# Directory that receives the FP candidate CSV when --fp-artifacts is not given.
ARTIFACTS_DIR = PROJECT_ROOT / "artifacts"
# Snapshot loaded by --dry-run instead of querying the database.
DRY_RUN_FIXTURE = PROJECT_ROOT / "tests" / "calibrate_fixtures" / "sample_ask_events.json"

# Eval split ratio (deterministic, derived from a hash of the case id).
TUNING_RATIO = 0.667  # 200 / 300
|
||||
|
||||
|
||||
# ─── argparse ────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the calibration CLI options.

    Args:
        argv: Explicit argument list. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, so existing ``parse_args()`` callers are
            unaffected; passing a list allows the parser to be driven
            programmatically.

    Returns:
        The parsed namespace. ``run_label`` is always populated: when the
        option is omitted it is set to ``calibration_<YYYYmmdd_HHMMSS>`` using
        the local clock.

    Raises:
        SystemExit: When ``--threshold-overrides`` is supplied (the feature is
            a stub), or when argparse itself rejects the arguments.
    """
    p = argparse.ArgumentParser(description="Phase 3.5 ask_events calibration report")
    p.add_argument("--source", default=None,
                   help="ask_events.source 필터 (eval / ui_search / ui_detail / 미지정=전체)")
    p.add_argument("--prompt-version", default=None,
                   help="ask_events.prompt_version 필터 (예: search_synthesis.v1-400char)")
    p.add_argument("--since", default=None, help="ISO8601, created_at >= since")
    p.add_argument("--until", default=None, help="ISO8601, created_at < until")
    p.add_argument("--eval-split", choices=["tuning", "confirm", "all"], default="all",
                   help="source='eval' 일 때 holdout split")
    p.add_argument("--run-label", default=None, help="report 제목/파일명 라벨")
    p.add_argument("--output", default=str(DEFAULT_REPORT), help="md 출력 경로")
    p.add_argument("--format", choices=["md", "json"], default="md",
                   help="md 만 생성 또는 md+json 둘 다 (--format json 시)")
    p.add_argument("--compare-against", default=None, help="비교 대상 .json baseline 경로")
    p.add_argument("--sample-limit", type=int, default=30, help="FP candidate CSV 총 행수")
    p.add_argument("--fp-artifacts", default=None, help="FP CSV 경로")
    p.add_argument("--inspect-shape", action="store_true",
                   help="defense_layers JSON sample 5건 출력 후 abort")
    p.add_argument("--threshold-overrides", default=None,
                   help="config/threshold_candidate.yaml — Step 0 feasibility 미해결로 v2 미구현")
    p.add_argument("--dry-run", action="store_true",
                   help="DB 미접속, fixtures 로 출력 검증")
    args = p.parse_args(argv)

    # The override file is accepted by the parser only so that using it fails
    # loudly with an explanation instead of being silently ignored.
    if args.threshold_overrides:
        raise SystemExit(
            "--threshold-overrides 는 v2 미구현. Step 0 feasibility 통과 후 SQL "
            "reclassification 추가 예정. 1차는 baseline/candidate 를 코드 분기 run "
            "(코드 일시 수정 → eval replay 2회) 으로 측정."
        )

    # Fall back to a timestamped label so report/CSV file names stay unique.
    if not args.run_label:
        args.run_label = f"calibration_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    return args
|
||||
|
||||
|
||||
# ─── 공통 WHERE 조립 ──────────────────────────────────────
|
||||
|
||||
|
||||
def build_filters(args: argparse.Namespace) -> tuple[str, dict[str, Any]]:
    """Build the shared WHERE fragment and its bind parameters.

    Four optional conditions are produced: source, prompt_version, since and
    until. Each one has the shape ``(<param> IS NULL OR <column> <op> <param>)``
    so that an option left at ``None`` disables its own condition without any
    dynamic SQL assembly.

    The timestamp bounds are written with explicit ``CAST(... AS ...)`` rather
    than PostgreSQL's ``::`` shorthand: a ``::`` placed directly after a
    ``:name`` placeholder collides with the colon-based bind syntax of
    SQLAlchemy ``text()``. The inner ``CAST(... AS text)`` keeps the parameter
    itself a plain string, so the ISO8601 value from the command line is
    parsed by the database.
    NOTE(review): the text cast is presumably also what type-inferring drivers
    such as asyncpg need in order to accept a ``str`` here — confirm against
    the driver in use.

    Args:
        args: Parsed CLI namespace providing ``source``, ``prompt_version``,
            ``since`` and ``until`` (each may be ``None``).

    Returns:
        ``(where_sql, params)`` where ``where_sql`` is the AND-joined fragment
        (without the ``WHERE`` keyword) and ``params`` maps the four bind names
        to the raw option values.
    """
    clauses = [
        "(:source IS NULL OR source = :source)",
        "(:prompt_version IS NULL OR prompt_version = :prompt_version)",
        "(CAST(:since AS text) IS NULL"
        " OR created_at >= CAST(CAST(:since AS text) AS timestamptz))",
        "(CAST(:until AS text) IS NULL"
        " OR created_at < CAST(CAST(:until AS text) AS timestamptz))",
    ]
    params: dict[str, Any] = {
        "source": args.source,
        "prompt_version": args.prompt_version,
        "since": args.since,
        "until": args.until,
    }
    return " AND ".join(clauses), params
|
||||
|
||||
|
||||
# ─── eval split (id 해시) ────────────────────────────────
|
||||
|
||||
|
||||
def split_by_id_hash(case_id: str, ratio: float = TUNING_RATIO) -> str:
    """Assign a case id to 'tuning' or 'confirm' deterministically.

    The first 4 bytes of ``sha256(case_id)`` are read as a big-endian unsigned
    integer and divided by ``0xFFFFFFFF``, giving a position in [0, 1]
    (the upper bound is reachable because the divisor is the maximum value).

    Args:
        case_id: Eval case identifier; encoded with the default str encoding.
        ratio: Share of the unit interval assigned to 'tuning'.

    Returns:
        'tuning' when the position is strictly below ``ratio``, otherwise
        'confirm'.
    """
    digest = hashlib.sha256(case_id.encode()).digest()
    position = int.from_bytes(digest[:4], "big") / 0xFFFFFFFF
    if position < ratio:
        return "tuning"
    return "confirm"
|
||||
|
||||
|
||||
def load_eval_golden(path: Path) -> dict[str, dict[str, Any]]:
    """Load the eval golden JSONL file into ``{id: case_dict}``.

    Each usable line is a JSON object; the visible consumers read its ``id``,
    ``query`` and ``expected_behavior`` keys.

    Lines are skipped (best effort, never raising) when they are blank, are
    not valid JSON, decode to something other than a JSON object, or carry no
    truthy ``id``. When the same id appears more than once the last line wins.

    Args:
        path: Location of the JSONL file.

    Returns:
        Mapping of case id to the decoded case object; empty when the file
        does not exist.
    """
    if not path.exists():
        return {}

    cases: dict[str, dict[str, Any]] = {}
    with path.open("r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            # Only the decode step is guarded, so unrelated errors are not masked.
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Valid JSON that is not an object (list, string, number, ...) has
            # no .get(); treat it like a malformed line instead of crashing.
            if not isinstance(obj, dict):
                continue
            cid = obj.get("id")
            if cid:
                cases[cid] = obj
    return cases
|
||||
|
||||
|
||||
def filter_eval_split(cases: dict[str, dict], split: str) -> set[str]:
    """Return the case ids that belong to ``split``.

    ``split == "all"`` selects every id; any other value keeps only the ids
    whose ``split_by_id_hash`` result equals ``split``.
    """
    if split == "all":
        return set(cases.keys())

    selected: set[str] = set()
    for case_id in cases:
        if split_by_id_hash(case_id) == split:
            selected.add(case_id)
    return selected
|
||||
|
||||
|
||||
# ─── DB fetchers (Q0~Q8) ─────────────────────────────────
|
||||
|
||||
|
||||
async def fetch_shape_inspect(session: AsyncSession) -> list[dict]:
    """Q0: fetch the 5 most recent rows that have a defense_layers value.

    Intended for printing to stdout so the JSON shape can be checked by eye.

    Returns:
        One dict per row with the keys ``id``, ``defense_layers`` and
        ``created_at``, newest first.
    """
    statement = text("""
        SELECT id, defense_layers, created_at
        FROM ask_events
        WHERE defense_layers IS NOT NULL
        ORDER BY created_at DESC
        LIMIT 5
    """)
    result = await session.execute(statement)
    return [dict(record) for record in result.mappings().all()]
|
||||
|
||||
|
||||
async def fetch_total_rows(session: AsyncSession, where: str, params: dict) -> int:
    """Count the ask_events rows matching the shared filter.

    Args:
        session: Open async session used to run the query.
        where: WHERE fragment (without the keyword) interpolated into the SQL.
        params: Bind parameters referenced by ``where``.
    """
    count_sql = text(f"SELECT COUNT(*) AS n FROM ask_events WHERE {where}")
    result = await session.execute(count_sql, params)
    return result.scalar_one()
|
||||
|
||||
|
||||
async def fetch_regate_distribution(session, where, params) -> list[dict]:
    """Q1: distribution of ``defense_layers->>'re_gate'`` values.

    Rows without a re_gate value are reported under the tier ``'(null)'``.

    Returns:
        ``[{tier, n, pct}]`` ordered by ``n`` descending, where ``pct`` is the
        tier's share of all matching rows, rounded to 2 decimals.
    """
    statement = text(f"""
        SELECT
            COALESCE(defense_layers->>'re_gate', '(null)') AS tier,
            COUNT(*) AS n,
            ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
        FROM ask_events
        WHERE {where}
        GROUP BY 1
        ORDER BY n DESC
    """)
    result = await session.execute(statement, params)
    return [dict(record) for record in result.mappings()]
|
||||
|
||||
|
||||
async def fetch_score_histogram(session, where, params) -> list[dict]:
    """Q2: max_rerank_score histogram per outcome bucket.

    The bucket is 'refused' when ``refused`` is true, otherwise 'full' or
    'partial' by ``completeness``, and 'insufficient' for everything else.
    Scores are binned with ``WIDTH_BUCKET(score, 0.0, 1.0, 10)``; a NULL score
    is binned as 0.0, while ``avg_score`` averages the raw column.

    Returns:
        ``[{bucket, bin, n, avg_score}]`` ordered by bucket, then bin.
    """
    statement = text(f"""
        SELECT
            CASE WHEN refused THEN 'refused'
                 WHEN completeness = 'full' THEN 'full'
                 WHEN completeness = 'partial' THEN 'partial'
                 ELSE 'insufficient' END AS bucket,
            WIDTH_BUCKET(COALESCE(max_rerank_score, 0.0), 0.0, 1.0, 10) AS bin,
            COUNT(*) AS n,
            ROUND(AVG(max_rerank_score)::numeric, 3) AS avg_score
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2
        ORDER BY 1, 2
    """)
    result = await session.execute(statement, params)
    return [dict(record) for record in result.mappings()]
|
||||
|
||||
|
||||
async def fetch_classifier_confusion(session, where, params) -> list[dict]:
    """Q3: counts per (classifier_verdict, completeness, refused) combination.

    NULL verdict / completeness values are reported as ``'(null)'``.

    Returns:
        ``[{verdict, completeness, refused, n}]`` ordered by ``n`` descending.
    """
    statement = text(f"""
        SELECT
            COALESCE(classifier_verdict, '(null)') AS verdict,
            COALESCE(completeness, '(null)') AS completeness,
            refused,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3
        ORDER BY n DESC
    """)
    result = await session.execute(statement, params)
    return [dict(record) for record in result.mappings()]
|
||||
|
||||
|
||||
async def fetch_verifier_distribution(session, where, params) -> list[dict]:
    """Q4: verifier severity distribution.

    Reads ``status``, ``medium_count`` and ``strong_count`` from
    ``defense_layers->'verifier'``. The counts are cast to int and default to
    0 when absent; a missing status is reported as ``'n/a'`` and a NULL
    completeness as ``'(null)'``.

    Returns:
        ``[{status, medium_count, strong_count, completeness, n}]`` ordered by
        those four grouping columns.
    """
    statement = text(f"""
        SELECT
            COALESCE(defense_layers->'verifier'->>'status', 'n/a') AS status,
            COALESCE((defense_layers->'verifier'->>'medium_count')::int, 0) AS medium_count,
            COALESCE((defense_layers->'verifier'->>'strong_count')::int, 0) AS strong_count,
            COALESCE(completeness, '(null)') AS completeness,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3, 4
        ORDER BY 1, 2, 3, 4
    """)
    result = await session.execute(statement, params)
    return [dict(record) for record in result.mappings()]
|
||||
|
||||
|
||||
async def fetch_flag_frequencies(session, where, params) -> list[dict]:
    """Q5: most frequent hallucination flag types, strong and weak combined.

    Flags are read from ``defense_layers->'grounding'->'strong'`` and
    ``...->'weak'``; the flag type is the text before the first ``:``. The two
    per-strength aggregates are concatenated with UNION ALL inside a subquery
    so that ordering and the limit apply to the combined result.

    Returns:
        ``[{flag_type, strength, n}]`` ordered by ``n`` descending, at most 40
        rows.
    """
    statement = text(f"""
        SELECT * FROM (
            SELECT split_part(flag, ':', 1) AS flag_type, 'strong' AS strength, COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'strong') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
            UNION ALL
            SELECT split_part(flag, ':', 1) AS flag_type, 'weak' AS strength, COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'weak') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
        ) u
        ORDER BY n DESC
        LIMIT 40
    """)
    result = await session.execute(statement, params)
    return [dict(record) for record in result.mappings()]
|
||||
|
||||
|
||||
async def fetch_fabricated_strong_rate(session, where, params) -> dict[str, float]:
    """Row-level rate of strong ``fabricated_number`` flags (B1 tracking).

    This is a rate, not a raw flag count: a row is a hit when at least one
    element of ``defense_layers->'grounding'->'strong'`` starts with
    ``fabricated_number:``, and the denominator is every matching row.

    Returns:
        ``{"total", "fabricated_strong_hit", "rate"}`` with ``rate`` rounded to
        4 decimals, or 0.0 when no rows match.
    """
    statement = text(f"""
        SELECT
            COUNT(*) AS total,
            SUM(CASE WHEN EXISTS (
                SELECT 1 FROM jsonb_array_elements_text(defense_layers->'grounding'->'strong') f
                WHERE f LIKE 'fabricated_number:%%'
            ) THEN 1 ELSE 0 END) AS hit
        FROM ask_events
        WHERE {where}
    """)
    result = await session.execute(statement, params)
    counts = result.mappings().one()

    # SUM over zero rows is NULL, hence the `or 0` fallbacks.
    total_rows = int(counts["total"] or 0)
    hits = int(counts["hit"] or 0)
    if total_rows > 0:
        rate = hits / total_rows
    else:
        rate = 0.0
    return {"total": total_rows, "fabricated_strong_hit": hits, "rate": round(rate, 4)}
|
||||
|
||||
|
||||
async def fetch_eval_join_with_split(
    session, where, params, eval_cases: dict[str, dict], split_filter: set[str] | None,
) -> dict[str, Any]:
    """Q6: join eval rows to the golden set by eval_case_id, falling back to the query text.

    Only rows with ``source = 'eval'`` are considered, and only the most
    recent row per ``COALESCE(eval_case_id, query)`` is kept.

    Args:
        session: Open async session used to run the query.
        where: Shared WHERE fragment interpolated into the SQL.
        params: Bind parameters referenced by ``where``.
        eval_cases: Golden cases keyed by id; ``query`` and
            ``expected_behavior`` are read from each case.
        split_filter: When not ``None``, matched rows whose case id is not in
            this set are dropped from the grouping.

    Returns:
        A dict with:
        - mismatch_groups: ``[{expected, actual, n, sample_queries}]``, largest
          group first. Every (expected, actual) pair is listed, including
          pairs where the two values agree.
        - eval_case_id_present: rows that carried an eval_case_id.
        - eval_case_id_null: rows without one (query-text fallback attempted).
        - join_failed_count: rows that could not be matched to a golden case
          by id or by normalized query.
        - matched_total: rows matched and kept after the split filter.

        The three counters are incremented before ``split_filter`` is applied,
        so they describe all deduplicated eval rows, not just the chosen split.
    """
    sql = text(f"""
        WITH ranked AS (
            SELECT
                id, eval_case_id, query, completeness, refused,
                ROW_NUMBER() OVER (PARTITION BY COALESCE(eval_case_id, query)
                                   ORDER BY created_at DESC) AS rn
            FROM ask_events
            WHERE {where} AND source = 'eval'
        )
        SELECT id, eval_case_id, query, completeness, refused
        FROM ranked WHERE rn = 1
    """)
    rows = [dict(r) for r in (await session.execute(sql, params)).mappings()]

    # Query-string normalization helper (collapse whitespace + trim + lowercase)
    import re as _re
    def norm(q: str | None) -> str:
        if not q:
            return ""
        return _re.sub(r"\s+", " ", q).strip().lower()

    # Reverse index for the fallback path: normalized golden query -> case id.
    norm_to_id = {norm(c.get("query")): cid for cid, c in eval_cases.items()
                  if c.get("query")}

    eval_case_id_present = 0
    eval_case_id_null = 0
    join_failed_count = 0
    matched_pairs: list[tuple[str, dict, str, bool]] = []  # (cid, case, actual_completeness, actual_refused)

    for row in rows:
        cid = row.get("eval_case_id")
        if cid:
            # Primary path: the row names its golden case directly.
            eval_case_id_present += 1
            case = eval_cases.get(cid)
            if not case:
                join_failed_count += 1
                continue
        else:
            # Fallback path: look the case up by normalized query text.
            eval_case_id_null += 1
            cid = norm_to_id.get(norm(row.get("query")))
            if not cid:
                join_failed_count += 1
                continue
            case = eval_cases.get(cid)
            if not case:
                join_failed_count += 1
                continue
        # Rows outside the requested split are dropped silently (not a join failure).
        if split_filter is not None and cid not in split_filter:
            continue
        actual_completeness = row.get("completeness") or ("refused" if row.get("refused") else "(null)")
        matched_pairs.append((cid, case, actual_completeness, bool(row.get("refused"))))

    # group by (expected_behavior, actual)
    groups: dict[tuple[str, str], list[str]] = {}
    for cid, case, actual, refused in matched_pairs:
        expected = case.get("expected_behavior", "(unknown)")
        # expected_behavior in the eval JSONL is 'answered'/'refused'/...; normalize actual too.
        # A refused row is always reported as 'refused', whatever its completeness.
        actual_norm = "refused" if refused else (actual or "(null)")
        key = (expected, actual_norm)
        groups.setdefault(key, []).append(case.get("query", ""))

    mismatch_groups = []
    for (exp, act), queries in sorted(groups.items(), key=lambda x: -len(x[1])):
        mismatch_groups.append({
            "expected": exp,
            "actual": act,
            "n": len(queries),
            "sample_queries": queries[:3],
        })

    return {
        "mismatch_groups": mismatch_groups,
        "eval_case_id_present": eval_case_id_present,
        "eval_case_id_null": eval_case_id_null,
        "join_failed_count": join_failed_count,
        "matched_total": len(matched_pairs),
    }
|
||||
|
||||
|
||||
async def fetch_fp_candidates(session, where, params, limit: int) -> list[dict]:
    """Q7: false-positive candidates from three cases, tagged with candidate_reason.

    The cases, concatenated with UNION ALL, are:
    - A ``refused_high_rerank``: refused rows with max_rerank_score >= 0.35.
    - B ``insufficient_classifier_sufficient``: completeness 'insufficient'
      while the classifier verdict was 'sufficient'.
    - C ``partial_only_fabricated_number``: completeness 'partial' whose only
      strong grounding flag is a ``fabricated_number:`` flag.

    Each case takes its newest ``max(1, limit // 3)`` rows so the cases stay
    balanced.

    Args:
        session: Open async session used to run the query.
        where: Shared WHERE fragment interpolated into the SQL.
        params: Bind parameters referenced by ``where`` (not mutated).
        limit: Requested total number of candidate rows.
    """
    rows_per_case = max(1, limit // 3)
    statement = text(f"""
        WITH base AS (
            SELECT
                id, query, completeness, refused, classifier_verdict,
                max_rerank_score, aggregate_score,
                defense_layers->'grounding'->'strong' AS g_strong,
                defense_layers->'verifier'->>'medium_count' AS v_medium,
                defense_layers->>'re_gate' AS re_gate,
                answer_length, prompt_version, source, eval_case_id, created_at
            FROM ask_events WHERE {where}
        ),
        case_a AS (
            SELECT *, 'refused_high_rerank' AS candidate_reason
            FROM base
            WHERE refused = true AND COALESCE(max_rerank_score, 0.0) >= 0.35
            ORDER BY created_at DESC LIMIT :per_case
        ),
        case_b AS (
            SELECT *, 'insufficient_classifier_sufficient' AS candidate_reason
            FROM base
            WHERE completeness = 'insufficient' AND classifier_verdict = 'sufficient'
            ORDER BY created_at DESC LIMIT :per_case
        ),
        case_c AS (
            SELECT *, 'partial_only_fabricated_number' AS candidate_reason
            FROM base
            WHERE completeness = 'partial'
              AND jsonb_array_length(COALESCE(g_strong, '[]'::jsonb)) = 1
              AND (g_strong->>0) LIKE 'fabricated_number:%%'
            ORDER BY created_at DESC LIMIT :per_case
        )
        SELECT * FROM case_a
        UNION ALL SELECT * FROM case_b
        UNION ALL SELECT * FROM case_c
    """)
    # Extend a copy so the caller's shared parameter dict is left untouched.
    bound = dict(params)
    bound["per_case"] = rows_per_case
    result = await session.execute(statement, bound)
    return [dict(record) for record in result.mappings()]
|
||||
|
||||
|
||||
async def fetch_answer_length_distribution(session, where, params) -> list[dict]:
    """Q8: answer_length percentiles per outcome bucket.

    The bucket is 'refused' for refused rows, otherwise the completeness value
    (``'(null)'`` when missing). Rows with a NULL answer_length are excluded.

    Returns:
        ``[{bucket, p25, p50, p75, avg, n}]`` ordered by bucket.
    """
    statement = text(f"""
        SELECT
            CASE WHEN refused THEN 'refused' ELSE COALESCE(completeness, '(null)') END AS bucket,
            PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY answer_length) AS p25,
            PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY answer_length) AS p50,
            PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY answer_length) AS p75,
            AVG(answer_length)::int AS avg,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where} AND answer_length IS NOT NULL
        GROUP BY 1
        ORDER BY 1
    """)
    result = await session.execute(statement, params)
    return [dict(record) for record in result.mappings()]
|
||||
|
||||
|
||||
# ─── rendering ───────────────────────────────────────────
|
||||
|
||||
|
||||
def _md_table(headers: list[str], rows: list[list[Any]]) -> str:
|
||||
if not rows:
|
||||
return "_(empty)_\n"
|
||||
lines = ["| " + " | ".join(headers) + " |",
|
||||
"|" + "|".join(["---"] * len(headers)) + "|"]
|
||||
for row in rows:
|
||||
lines.append("| " + " | ".join(str(v) for v in row) + " |")
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def render_markdown(sections: dict[str, Any], args: argparse.Namespace,
                    delta: dict[str, Any] | None = None) -> str:
    """Assemble the markdown calibration report.

    Args:
        sections: Collected query results. ``total_rows``, ``regate``,
            ``score_hist``, ``classifier``, ``verifier``, ``flags``,
            ``fabricated_rate``, ``fp_candidates`` and ``answer_length`` are
            required; ``shape_sample``, ``eval`` and ``fp_csv_path`` are
            optional.
        args: Parsed CLI namespace; the run label and filter options are
            echoed in the report header.
        delta: Optional baseline comparison, printed as a JSON block in a
            final section when truthy.

    Returns:
        The complete report as a single string.
    """
    parts: list[str] = []
    emit = parts.append

    # Header: title, active filters, row count.
    emit(f"# Calibration Report — {args.run_label}\n")
    emit(f"Filter: source={args.source} prompt_version={args.prompt_version} "
         f"since={args.since} until={args.until} eval_split={args.eval_split}\n")
    emit(f"Total rows: **{sections['total_rows']}**\n")

    # 0. shape sample (only when the caller supplied one)
    if "shape_sample" in sections:
        emit("## 0. defense_layers shape sample (latest 5)\n")
        for sample in sections["shape_sample"]:
            emit(f"- id={sample['id']} created_at={sample['created_at']}\n")
            emit(" ```json\n")
            dumped = json.dumps(sample["defense_layers"], ensure_ascii=False, indent=2)
            emit(" " + dumped.replace("\n", "\n ") + "\n")
            emit(" ```\n")

    # 1. re-gate tiers
    emit("## 1. Re-gate tier 분포\n")
    regate_rows = [[r["tier"], r["n"], f"{r['pct']}%"] for r in sections["regate"]]
    emit(_md_table(["tier", "n", "pct"], regate_rows))

    # 2. score histogram
    emit("## 2. max_rerank_score 히스토그램 (bucket × bin 0~10)\n")
    hist_rows = [[r["bucket"], r["bin"], r["n"], r["avg_score"]]
                 for r in sections["score_hist"]]
    emit(_md_table(["bucket", "bin", "n", "avg_score"], hist_rows))

    # 3. classifier confusion matrix
    emit("## 3. Classifier 혼동행렬 (verdict × completeness × refused)\n")
    confusion_rows = [[r["verdict"], r["completeness"], r["refused"], r["n"]]
                      for r in sections["classifier"]]
    emit(_md_table(["verdict", "completeness", "refused", "n"], confusion_rows))

    # 4. verifier severity
    emit("## 4. Verifier severity 분포\n")
    verifier_rows = [[r["status"], r["medium_count"], r["strong_count"],
                      r["completeness"], r["n"]]
                     for r in sections["verifier"]]
    emit(_md_table(["status", "medium_count", "strong_count", "completeness", "n"],
                   verifier_rows))

    # 5. flags: three tables (combined / strong only / weak only)
    all_flags = sections["flags"]
    strong_flags = [f for f in all_flags if f["strength"] == "strong"]
    weak_flags = [f for f in all_flags if f["strength"] == "weak"]
    emit("## 5. Hallucination flags top-K\n")
    emit("### 5.1 전체 top-20\n")
    emit(_md_table(["flag_type", "strength", "n"],
                   [[r["flag_type"], r["strength"], r["n"]] for r in all_flags[:20]]))
    emit("### 5.2 strong only top-10\n")
    emit(_md_table(["flag_type", "n"],
                   [[r["flag_type"], r["n"]] for r in strong_flags[:10]]))
    emit("### 5.3 weak only top-10\n")
    emit(_md_table(["flag_type", "n"],
                   [[r["flag_type"], r["n"]] for r in weak_flags[:10]]))

    # B1 watch: row-level rate of strong fabricated_number flags
    fabricated = sections["fabricated_rate"]
    emit("### 5.4 fabricated_number strong rate (B1 추적용)\n")
    emit(f"- total rows: {fabricated['total']}\n")
    emit(f"- fabricated_strong hit: {fabricated['fabricated_strong_hit']}\n")
    emit(f"- **rate: {fabricated['rate'] * 100:.2f}%**\n")

    # 6. eval golden join (present only for eval runs)
    if "eval" in sections:
        eval_info = sections["eval"]
        emit("## 6. Eval golden mismatch (eval_case_id 기반)\n")
        emit(f"- eval_case_id present: {eval_info['eval_case_id_present']}\n")
        emit(f"- eval_case_id null (fallback): {eval_info['eval_case_id_null']}\n")
        emit(f"- join_failed_count: **{eval_info['join_failed_count']}**\n")
        emit(f"- matched total: {eval_info['matched_total']}\n\n")
        group_rows = [[g["expected"], g["actual"], g["n"],
                       " | ".join(g["sample_queries"])[:120]]
                      for g in eval_info["mismatch_groups"]]
        emit(_md_table(["expected", "actual", "n", "sample"], group_rows))

    # 7. FP candidates (query text truncated to 60 characters)
    candidates = sections["fp_candidates"]
    emit(f"## 7. FP candidate sample (n={len(candidates)}, case A/B/C 분리)\n")
    emit(f"전체 CSV: `{sections.get('fp_csv_path', '(미생성)')}`\n\n")
    candidate_rows = [[r["candidate_reason"], r["id"], r["completeness"], r["refused"],
                       r["classifier_verdict"], r["max_rerank_score"], r["re_gate"],
                       (r["query"] or "")[:60]]
                      for r in candidates]
    emit(_md_table(
        ["case", "id", "completeness", "refused", "verdict", "max_score", "re_gate", "query"],
        candidate_rows))

    # 8. answer length percentiles
    emit("## 8. answer_length 분포 (bucket × percentile)\n")
    length_rows = [[r["bucket"], r["p25"], r["p50"], r["p75"], r["avg"], r["n"]]
                   for r in sections["answer_length"]]
    emit(_md_table(["bucket", "p25", "p50", "p75", "avg", "n"], length_rows))

    # 9. delta vs baseline
    if delta:
        emit("## 9. Delta vs baseline\n")
        emit("```json\n")
        emit(json.dumps(delta, ensure_ascii=False, indent=2, default=str))
        emit("\n```\n")

    return "".join(parts)
|
||||
|
||||
|
||||
def render_json(sections: dict[str, Any]) -> str:
    """Serialize the sections as indented JSON (the compare baseline format).

    Non-ASCII text is written as-is, and values json cannot encode natively
    are converted with ``str``.
    """
    dump_options = {"ensure_ascii": False, "indent": 2, "default": str}
    return json.dumps(sections, **dump_options)
|
||||
|
||||
|
||||
def compute_delta(current: dict[str, Any], baseline: dict[str, Any]) -> dict[str, Any]:
    """Compute a small delta between the current run and a baseline run.

    Three things are compared: ``total_rows``, the per-tier re-gate percentage
    and the fabricated_number strong rate. Finer-grained comparison is left for
    a later extension.

    Missing or ``None`` entries are treated as empty / zero so that a partial
    baseline file never raises. A tier present on only one side counts as 0.0
    on the other. Tier keys are emitted in sorted order so the rendered delta
    is identical from run to run.

    Args:
        current: Sections of the current run.
        baseline: Sections loaded from a previously written JSON baseline
            (``pct`` may be a number or a numeric string there).

    Returns:
        ``{"total_rows", "regate_pct_diff_pp", "fabricated_strong_rate"}``.
        ``rel_change_pct`` is ``None`` when the baseline rate is not positive.
    """

    def pct_by_tier(sections: dict[str, Any]) -> dict[str, float]:
        # `or []` covers both a missing key and an explicit null in the JSON.
        return {r["tier"]: float(r["pct"]) for r in (sections.get("regate") or [])}

    def fabricated_rate(sections: dict[str, Any]) -> float:
        return (sections.get("fabricated_rate") or {}).get("rate") or 0.0

    cur_total = current.get("total_rows")
    base_total = baseline.get("total_rows")
    delta: dict[str, Any] = {
        "total_rows": {
            "current": cur_total,
            "baseline": base_total,
            "diff": (cur_total or 0) - (base_total or 0),
        },
    }

    # Per-tier percentage-point difference.
    base_regate = pct_by_tier(baseline)
    cur_regate = pct_by_tier(current)
    delta["regate_pct_diff_pp"] = {
        tier: round(cur_regate.get(tier, 0.0) - base_regate.get(tier, 0.0), 2)
        for tier in sorted(set(base_regate) | set(cur_regate))
    }

    # Fabricated rate: absolute (percentage points) and relative change.
    cur_fr = fabricated_rate(current)
    base_fr = fabricated_rate(baseline)
    delta["fabricated_strong_rate"] = {
        "current": cur_fr,
        "baseline": base_fr,
        "diff_pp": round((cur_fr - base_fr) * 100, 2),
        "rel_change_pct": (round((cur_fr - base_fr) / base_fr * 100, 2)
                           if base_fr > 0 else None),
    }
    return delta
|
||||
|
||||
|
||||
# ─── FP CSV dump ──────────────────────────────────────────
|
||||
|
||||
|
||||
def dump_fp_csv(rows: list[dict], path: Path) -> None:
    """Write the FP candidate rows to a CSV file for manual labelling.

    The parent directory is created when needed. With no rows the file is
    written empty (no header). Otherwise a fixed column order is used, list and
    dict values are stored as JSON text, keys outside the column list are
    ignored, and a trailing ``is_true_fp`` column is left blank for the
    reviewer to fill in.

    Args:
        rows: Candidate rows as dicts.
        path: Destination CSV path.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        path.write_text("", encoding="utf-8")
        return

    # Stable column order (as specified in the plan).
    data_columns = [
        "id", "candidate_reason", "query", "completeness", "refused",
        "classifier_verdict", "max_rerank_score", "aggregate_score",
        "g_strong", "v_medium", "re_gate", "answer_length",
        "prompt_version", "source", "eval_case_id", "created_at",
    ]
    # Blank column filled in by hand by the reviewer.
    fieldnames = data_columns + ["is_true_fp"]

    def serialise(value):
        # JSONB-derived lists / dicts are stored as JSON text.
        if isinstance(value, (list, dict)):
            return json.dumps(value, ensure_ascii=False)
        return value

    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        for record in rows:
            line = {name: serialise(record.get(name)) for name in data_columns}
            line["is_true_fp"] = ""
            writer.writerow(line)
|
||||
|
||||
|
||||
# ─── dry-run (DB 없이 fixture 로드) ───────────────────────
|
||||
|
||||
|
||||
def dry_run_sections() -> dict[str, Any]:
    """Return report sections for --dry-run without touching the database.

    The JSON fixture at ``DRY_RUN_FIXTURE`` is loaded when it exists;
    otherwise a minimal inline fixture with 3 rows and mostly empty sections
    is returned.
    """
    if DRY_RUN_FIXTURE.exists():
        return json.loads(DRY_RUN_FIXTURE.read_text(encoding="utf-8"))

    # Minimal inline fixture.
    regate_rows = [
        {"tier": "clean", "n": 2, "pct": 66.67},
        {"tier": "refuse(grounding_2+strong)", "n": 1, "pct": 33.33},
    ]
    return {
        "total_rows": 3,
        "regate": regate_rows,
        "score_hist": [],
        "classifier": [],
        "verifier": [],
        "flags": [],
        "fabricated_rate": {"total": 3, "fabricated_strong_hit": 0, "rate": 0.0},
        "fp_candidates": [],
        "answer_length": [],
    }
|
||||
|
||||
|
||||
# ─── main ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def run(args: argparse.Namespace) -> None:
    """Execute one calibration run: query, dump the FP CSV, emit the report.

    Modes:
    - ``--dry-run``: sections come from ``dry_run_sections``; no database
      connection is made and no CSV is written.
    - ``--inspect-shape``: prints the Q0 defense_layers samples as JSON to
      stdout and returns without producing a report.
    - otherwise: runs the Q1-Q8 fetchers (plus the Q6 eval join when
      ``--source eval``), writes the FP candidate CSV, then renders the report.

    The engine is disposed in a ``finally`` block so it is released on every
    exit path, including a failing query.

    Args:
        args: Parsed CLI namespace from ``parse_args``.
    """
    if args.dry_run:
        sections = dry_run_sections()
        sections.setdefault("fp_csv_path", "(dry-run, CSV skipped)")
        _emit(args, sections)
        return

    # Database connection (DATABASE_URL, with a local development default).
    database_url = os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
    )
    engine = create_async_engine(database_url, echo=False)
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with session_factory() as session:
            if args.inspect_shape:
                sample = await fetch_shape_inspect(session)
                print(json.dumps(
                    [{"id": s["id"], "created_at": str(s["created_at"]),
                      "defense_layers": s["defense_layers"]} for s in sample],
                    ensure_ascii=False, indent=2, default=str,
                ))
                return

            where, params = build_filters(args)
            total = await fetch_total_rows(session, where, params)
            if total == 0:
                # Warn but continue: an empty report is still produced.
                print(f"WARNING: 필터 조건에 매칭되는 ask_events 행 0건. "
                      f"source={args.source} prompt_version={args.prompt_version} "
                      f"since={args.since} until={args.until}")

            sections: dict[str, Any] = {"total_rows": total}
            sections["regate"] = await fetch_regate_distribution(session, where, params)
            sections["score_hist"] = await fetch_score_histogram(session, where, params)
            sections["classifier"] = await fetch_classifier_confusion(session, where, params)
            sections["verifier"] = await fetch_verifier_distribution(session, where, params)
            sections["flags"] = await fetch_flag_frequencies(session, where, params)
            sections["fabricated_rate"] = await fetch_fabricated_strong_rate(session, where, params)
            sections["fp_candidates"] = await fetch_fp_candidates(
                session, where, params, args.sample_limit)
            sections["answer_length"] = await fetch_answer_length_distribution(
                session, where, params)

            # Eval-only section: join against the golden set, optionally per split.
            if args.source == "eval":
                cases = load_eval_golden(EVAL_GOLDEN_PATH)
                split_filter = (filter_eval_split(cases, args.eval_split)
                                if args.eval_split != "all" else None)
                sections["eval"] = await fetch_eval_join_with_split(
                    session, where, params, cases, split_filter)
    finally:
        # Runs after the session context has exited, on success and on error.
        await engine.dispose()

    # FP CSV dump
    fp_csv = (Path(args.fp_artifacts) if args.fp_artifacts else
              ARTIFACTS_DIR / f"fp_candidates_{args.run_label}.csv")
    dump_fp_csv(sections["fp_candidates"], fp_csv)
    sections["fp_csv_path"] = str(fp_csv)

    _emit(args, sections)
|
||||
|
||||
|
||||
def _emit(args: argparse.Namespace, sections: dict[str, Any]) -> None:
    """Render the report and write it to disk, handling --compare-against.

    When a baseline path is given and exists, a delta against it is included
    in the report; a missing baseline only produces a warning. The markdown
    file is always written to ``args.output``; with ``--format json`` the raw
    sections are additionally written next to it with a ``.json`` suffix.
    """
    delta = None
    if args.compare_against:
        baseline_path = Path(args.compare_against)
        if not baseline_path.exists():
            print(f"WARNING: compare-against baseline not found: {baseline_path}")
        else:
            baseline_sections = json.loads(baseline_path.read_text(encoding="utf-8"))
            delta = compute_delta(sections, baseline_sections)

    markdown = render_markdown(sections, args, delta)
    report_path = Path(args.output)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(markdown, encoding="utf-8")
    print(f"✓ markdown report: {report_path}")

    if args.format != "json":
        return
    baseline_out = report_path.with_suffix(".json")
    baseline_out.write_text(render_json(sections), encoding="utf-8")
    print(f"✓ json baseline: {baseline_out}")
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse the arguments and run the async workflow to completion."""
    args = parse_args()
    asyncio.run(run(args))
|
||||
|
||||
|
||||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
@@ -0,0 +1,63 @@
|
||||
{
|
||||
"total_rows": 10,
|
||||
"regate": [
|
||||
{"tier": "clean", "n": 5, "pct": 50.0},
|
||||
{"tier": "partial(strong_or_negation)", "n": 3, "pct": 30.0},
|
||||
{"tier": "refuse(grounding_2+strong)", "n": 1, "pct": 10.0},
|
||||
{"tier": "conf_low(medium_x3)", "n": 1, "pct": 10.0}
|
||||
],
|
||||
"score_hist": [
|
||||
{"bucket": "full", "bin": 9, "n": 4, "avg_score": 0.87},
|
||||
{"bucket": "full", "bin": 8, "n": 1, "avg_score": 0.78},
|
||||
{"bucket": "partial", "bin": 5, "n": 3, "avg_score": 0.51},
|
||||
{"bucket": "refused", "bin": 2, "n": 1, "avg_score": 0.18},
|
||||
{"bucket": "insufficient", "bin": 1, "n": 1, "avg_score": 0.08}
|
||||
],
|
||||
"classifier": [
|
||||
{"verdict": "sufficient", "completeness": "full", "refused": false, "n": 5},
|
||||
{"verdict": "sufficient", "completeness": "partial", "refused": false, "n": 3},
|
||||
{"verdict": "insufficient", "completeness": "insufficient", "refused": true, "n": 2}
|
||||
],
|
||||
"verifier": [
|
||||
{"status": "ok", "medium_count": 0, "strong_count": 0, "completeness": "full", "n": 5},
|
||||
{"status": "ok", "medium_count": 1, "strong_count": 0, "completeness": "partial", "n": 2},
|
||||
{"status": "ok", "medium_count": 3, "strong_count": 0, "completeness": "partial", "n": 1},
|
||||
{"status": "skipped", "medium_count": 0, "strong_count": 0, "completeness": "insufficient", "n": 2}
|
||||
],
|
||||
"flags": [
|
||||
{"flag_type": "fabricated_number", "strength": "strong", "n": 2},
|
||||
{"flag_type": "uncited_claim", "strength": "weak", "n": 4},
|
||||
{"flag_type": "low_overlap", "strength": "weak", "n": 3},
|
||||
{"flag_type": "intent_misalignment", "strength": "strong", "n": 1}
|
||||
],
|
||||
"fabricated_rate": {
|
||||
"total": 10,
|
||||
"fabricated_strong_hit": 2,
|
||||
"rate": 0.2
|
||||
},
|
||||
"fp_candidates": [
|
||||
{
|
||||
"id": 101,
|
||||
"candidate_reason": "refused_high_rerank",
|
||||
"query": "샘플 질의 1",
|
||||
"completeness": "insufficient",
|
||||
"refused": true,
|
||||
"classifier_verdict": "insufficient",
|
||||
"max_rerank_score": 0.42,
|
||||
"aggregate_score": 1.05,
|
||||
"g_strong": [],
|
||||
"v_medium": "0",
|
||||
"re_gate": "refuse(score_gate)",
|
||||
"answer_length": 0,
|
||||
"prompt_version": "search_synthesis.v1-400char",
|
||||
"source": "eval",
|
||||
"eval_case_id": "ask_def_001",
|
||||
"created_at": "2026-04-17T08:00:00+00:00"
|
||||
}
|
||||
],
|
||||
"answer_length": [
|
||||
{"bucket": "full", "p25": 280, "p50": 350, "p75": 395, "avg": 340, "n": 5},
|
||||
{"bucket": "partial", "p25": 200, "p50": 260, "p75": 320, "avg": 255, "n": 3},
|
||||
{"bucket": "refused", "p25": 0, "p50": 0, "p75": 0, "avg": 0, "n": 2}
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user