06c2c35955
scripts/calibrate_ask.py — ask_events 집계 + markdown report 영구 도구.
기능:
- argparse: --source / --prompt-version / --since / --until / --eval-split
(tuning|confirm|all, id 해시 기반 deterministic split) / --run-label /
--output / --format md|json / --compare-against / --sample-limit /
--fp-artifacts / --inspect-shape / --dry-run
- 9개 fetcher (모두 read-only SELECT):
- Q0 defense_layers shape inspect
- Q1 re-gate tier 분포
- Q2 max_rerank_score 히스토그램 (bucket × bin)
- Q3 classifier 혼동행렬
- Q4 verifier severity 분포 (cast + COALESCE NULL safe)
- Q5 hallucination_flags top-K (UNION ALL outer wrap, strong/weak 컬럼 유지)
- Q6 eval golden mismatch (eval_case_id 기반 join + query string fallback)
- Q7 FP candidate (case A/B/C 분리 + candidate_reason 컬럼 + LIMIT/3 분배)
- Q8 answer_length p25/p50/p75 분포 (E.3 v1↔v2 비교 축)
- markdown render + json baseline + delta compare (compare-against)
- FP CSV dump (artifacts/fp_candidates_{run_label}.csv) + is_true_fp 공란
- dry-run: tests/calibrate_fixtures/sample_ask_events.json 로 출력 검증
- --threshold-overrides: Step 0 feasibility 통과 후 v2 (현재 stub raise)
read-only 강제: INSERT/UPDATE/DELETE/ALTER/DROP/TRUNCATE 0건.
tests/calibrate_fixtures/sample_ask_events.json: dry-run snapshot fixture.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
746 lines
30 KiB
Python
746 lines
30 KiB
Python
"""Phase 3.5 calibration CLI — ask_events 집계 + markdown report 생성.
|
||
|
||
사용법:
|
||
# Docker 컨테이너 내부 (권장 — DATABASE_URL 자동 주입)
|
||
docker compose exec fastapi python /app/scripts/calibrate_ask.py \\
|
||
--source eval --prompt-version search_synthesis.v1-400char \\
|
||
--run-label baseline_v1 --output reports/calibration_baseline_v1.md
|
||
|
||
# 로컬 (DATABASE_URL 환경변수 필요)
|
||
python scripts/calibrate_ask.py --inspect-shape
|
||
|
||
옵션:
|
||
--source eval / ui_search / ui_detail / document_server / ... (미지정=전체)
|
||
--prompt-version search_synthesis.v1-400char 등
|
||
--since / --until ISO8601, created_at 범위
|
||
--eval-split tuning(200) / confirm(100) / all (id 해시 기반 deterministic)
|
||
--run-label report 제목/파일명 라벨
|
||
--output .md 경로 (기본 reports/calibration.md). --format json 이면 .json 도 생성
|
||
--format md (사람용) | json (compare 용 baseline)
|
||
--compare-against 비교 대상 .json baseline 경로 (Δ 컬럼 출력)
|
||
--sample-limit FP candidate CSV 행수 (기본 30, 케이스별 분배)
|
||
--fp-artifacts FP CSV 경로 (기본 artifacts/fp_candidates_{run_label}.csv)
|
||
--inspect-shape defense_layers JSON sample 5건 출력 후 abort (Q0)
|
||
--threshold-overrides config/threshold_candidate.yaml — Step 0 feasibility 미해결, 미구현
|
||
--dry-run DB 미접속, tests/calibrate_fixtures/sample_ask_events.json 로드
|
||
|
||
읽기 전용 — INSERT/UPDATE/DELETE/ALTER 0건. SELECT 만.
|
||
"""
|
||
|
||
from __future__ import annotations

import argparse
import asyncio
import csv
import hashlib
import json
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any

# Add the project root's app/ directory to the import path (seed_admin.py pattern).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))

from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
|
||
|
||
# ─── 경로 / 기본값 ─────────────────────────────────────────
|
||
|
||
# Repository root: the directory one level above the one containing this script.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Golden eval cases (JSON Lines) used for the eval join.
EVAL_GOLDEN_PATH = PROJECT_ROOT.joinpath("evals", "ask_analyze_v1.jsonl")
# Default markdown report location when --output is not given.
DEFAULT_REPORT = PROJECT_ROOT.joinpath("reports", "calibration.md")
# Directory that receives the FP candidate CSV dumps.
ARTIFACTS_DIR = PROJECT_ROOT.joinpath("artifacts")
# Snapshot fixture loaded by --dry-run instead of querying the database.
DRY_RUN_FIXTURE = PROJECT_ROOT.joinpath(
    "tests", "calibrate_fixtures", "sample_ask_events.json"
)

# Eval split ratio (deterministic, id-hash based).
TUNING_RATIO = 0.667  # 200 / 300
|
||
|
||
|
||
# ─── argparse ────────────────────────────────────────────
|
||
|
||
|
||
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the calibration CLI options.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point relies
            on; passing a list allows programmatic use.

    Returns:
        The parsed namespace. ``run_label`` is always set: when the option is
        omitted a ``calibration_<YYYYmmdd_HHMMSS>`` label is generated from the
        current local time.

    Raises:
        SystemExit: when ``--threshold-overrides`` is supplied (not implemented
            yet), in addition to argparse's own exits on invalid input.
    """
    parser = argparse.ArgumentParser(
        description="Phase 3.5 ask_events calibration report"
    )
    parser.add_argument(
        "--source", default=None,
        help="ask_events.source 필터 (eval / ui_search / ui_detail / 미지정=전체)",
    )
    parser.add_argument(
        "--prompt-version", default=None,
        help="ask_events.prompt_version 필터 (예: search_synthesis.v1-400char)",
    )
    parser.add_argument("--since", default=None, help="ISO8601, created_at >= since")
    parser.add_argument("--until", default=None, help="ISO8601, created_at < until")
    parser.add_argument(
        "--eval-split", choices=["tuning", "confirm", "all"], default="all",
        help="source='eval' 일 때 holdout split",
    )
    parser.add_argument("--run-label", default=None, help="report 제목/파일명 라벨")
    parser.add_argument("--output", default=str(DEFAULT_REPORT), help="md 출력 경로")
    parser.add_argument(
        "--format", choices=["md", "json"], default="md",
        help="md 만 생성 또는 md+json 둘 다 (--format json 시)",
    )
    parser.add_argument(
        "--compare-against", default=None, help="비교 대상 .json baseline 경로"
    )
    parser.add_argument(
        "--sample-limit", type=int, default=30, help="FP candidate CSV 총 행수"
    )
    parser.add_argument("--fp-artifacts", default=None, help="FP CSV 경로")
    parser.add_argument(
        "--inspect-shape", action="store_true",
        help="defense_layers JSON sample 5건 출력 후 abort",
    )
    parser.add_argument(
        "--threshold-overrides", default=None,
        help="config/threshold_candidate.yaml — Step 0 feasibility 미해결로 v2 미구현",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="DB 미접속, fixtures 로 출력 검증",
    )

    args = parser.parse_args(argv)

    # The option is accepted only so that using it fails loudly with guidance.
    if args.threshold_overrides:
        raise SystemExit(
            "--threshold-overrides 는 v2 미구현. Step 0 feasibility 통과 후 SQL "
            "reclassification 추가 예정. 1차는 baseline/candidate 를 코드 분기 run "
            "(코드 일시 수정 → eval replay 2회) 으로 측정."
        )

    if not args.run_label:
        args.run_label = f"calibration_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    return args
|
||
|
||
|
||
# ─── 공통 WHERE 조립 ──────────────────────────────────────
|
||
|
||
|
||
def build_filters(args: argparse.Namespace) -> tuple[str, dict[str, Any]]:
|
||
"""공통 WHERE 절 SQL + 바인딩 파라미터.
|
||
|
||
조건 4가지: source, prompt_version, since, until.
|
||
None 인 항목은 IS NULL 로 무력화 (SQL CASE 회피, 단순 OR 패턴).
|
||
"""
|
||
clauses = [
|
||
"(:source IS NULL OR source = :source)",
|
||
"(:prompt_version IS NULL OR prompt_version = :prompt_version)",
|
||
"(:since IS NULL OR created_at >= :since::timestamptz)",
|
||
"(:until IS NULL OR created_at < :until::timestamptz)",
|
||
]
|
||
params: dict[str, Any] = {
|
||
"source": args.source,
|
||
"prompt_version": args.prompt_version,
|
||
"since": args.since,
|
||
"until": args.until,
|
||
}
|
||
return " AND ".join(clauses), params
|
||
|
||
|
||
# ─── eval split (id 해시) ────────────────────────────────
|
||
|
||
|
||
def split_by_id_hash(case_id: str, ratio: float = TUNING_RATIO) -> str:
    """Assign an eval case to a split deterministically from its id.

    The first 32 bits of ``sha256(case_id)`` are mapped to a float in
    ``[0, 1)``; values below ``ratio`` go to ``'tuning'``, the rest to
    ``'confirm'``.

    Args:
        case_id: Eval case identifier (encoded as UTF-8 before hashing).
        ratio: Share of the hash space assigned to ``'tuning'``.

    Returns:
        ``'tuning'`` or ``'confirm'``.
    """
    digest = hashlib.sha256(case_id.encode()).digest()
    # Divide by 2**32 (not 0xFFFFFFFF) so the bucket is strictly below 1.0,
    # matching the documented half-open interval; with ratio=1.0 every id is
    # therefore 'tuning'.
    bucket = int.from_bytes(digest[:4], "big") / 2**32
    return "tuning" if bucket < ratio else "confirm"
|
||
|
||
|
||
def load_eval_golden(path: Path) -> dict[str, dict[str, Any]]:
    """Load the golden eval cases from a JSON Lines file into ``{id: case}``.

    Loading is best-effort: blank lines, lines that are not valid JSON, lines
    whose JSON value is not an object, and objects without a truthy ``id`` are
    skipped. When two lines share an id, the later one wins.

    Args:
        path: Location of the ``.jsonl`` file (read as UTF-8).

    Returns:
        Mapping from case id to the full case dict; empty when the file does
        not exist.
    """
    if not path.exists():
        return {}

    cases: dict[str, dict[str, Any]] = {}
    with path.open("r", encoding="utf-8") as fh:
        for raw_line in fh:
            stripped = raw_line.strip()
            if not stripped:
                continue
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError:
                # Deliberate best-effort: a malformed line must not abort the run.
                continue
            # Valid JSON that is not an object (list, number, ...) has no .get().
            if not isinstance(record, dict):
                continue
            case_id = record.get("id")
            if case_id:
                cases[case_id] = record
    return cases
|
||
|
||
|
||
def filter_eval_split(cases: dict[str, dict], split: str) -> set[str]:
    """Return the case ids that belong to ``split``.

    ``split='all'`` selects every id; any other value keeps only the ids whose
    hash-based split (see ``split_by_id_hash``) equals ``split``.
    """
    if split != "all":
        return {case_id for case_id in cases if split_by_id_hash(case_id) == split}
    return set(cases)
|
||
|
||
|
||
# ─── DB fetchers (Q0~Q8) ─────────────────────────────────
|
||
|
||
|
||
async def fetch_shape_inspect(session: AsyncSession) -> list[dict]:
    """Q0: fetch the five most recent rows that have a non-NULL defense_layers.

    Intended for printing to stdout so the JSON shape can be checked by eye.
    No CLI filters are applied.
    """
    query = text("""
        SELECT id, defense_layers, created_at
        FROM ask_events
        WHERE defense_layers IS NOT NULL
        ORDER BY created_at DESC
        LIMIT 5
    """)
    result = await session.execute(query)
    return [dict(mapping) for mapping in result.mappings().all()]
|
||
|
||
|
||
async def fetch_total_rows(session: AsyncSession, where: str, params: dict) -> int:
    """Count the ask_events rows matching the shared filter clause."""
    count_query = text(f"SELECT COUNT(*) AS n FROM ask_events WHERE {where}")
    result = await session.execute(count_query, params)
    return result.scalar_one()
|
||
|
||
|
||
async def fetch_regate_distribution(session, where, params) -> list[dict]:
    """Q1: distribution of ``defense_layers->>'re_gate'`` over the filtered rows.

    Returns one dict per tier with keys ``tier`` (missing values shown as
    ``'(null)'``), ``n`` and ``pct`` (share of all filtered rows, rounded to
    two decimals), ordered by ``n`` descending.
    """
    query = text(f"""
        SELECT
            COALESCE(defense_layers->>'re_gate', '(null)') AS tier,
            COUNT(*) AS n,
            ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
        FROM ask_events
        WHERE {where}
        GROUP BY 1
        ORDER BY n DESC
    """)
    result = await session.execute(query, params)
    return [dict(mapping) for mapping in result.mappings()]
|
||
|
||
|
||
async def fetch_score_histogram(session, where, params) -> list[dict]:
    """Q2: histogram of ``max_rerank_score`` per outcome bucket.

    The bucket is ``'refused'`` when ``refused`` is true, otherwise ``'full'``
    or ``'partial'`` by ``completeness``, and ``'insufficient'`` for anything
    else. Bins come from ``WIDTH_BUCKET(score, 0.0, 1.0, 10)`` with NULL scores
    treated as 0.0 for binning only (``avg_score`` still averages the raw
    column).

    Returns dicts with keys ``bucket``, ``bin``, ``n`` and ``avg_score``,
    ordered by bucket then bin.
    """
    query = text(f"""
        SELECT
            CASE WHEN refused THEN 'refused'
                 WHEN completeness = 'full' THEN 'full'
                 WHEN completeness = 'partial' THEN 'partial'
                 ELSE 'insufficient' END AS bucket,
            WIDTH_BUCKET(COALESCE(max_rerank_score, 0.0), 0.0, 1.0, 10) AS bin,
            COUNT(*) AS n,
            ROUND(AVG(max_rerank_score)::numeric, 3) AS avg_score
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2
        ORDER BY 1, 2
    """)
    result = await session.execute(query, params)
    return [dict(mapping) for mapping in result.mappings()]
|
||
|
||
|
||
async def fetch_classifier_confusion(session, where, params) -> list[dict]:
    """Q3: row counts grouped by classifier_verdict x completeness x refused.

    NULL verdict / completeness values are reported as ``'(null)'``. Returns
    dicts with keys ``verdict``, ``completeness``, ``refused`` and ``n``,
    ordered by ``n`` descending.
    """
    query = text(f"""
        SELECT
            COALESCE(classifier_verdict, '(null)') AS verdict,
            COALESCE(completeness, '(null)') AS completeness,
            refused,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3
        ORDER BY n DESC
    """)
    result = await session.execute(query, params)
    return [dict(mapping) for mapping in result.mappings()]
|
||
|
||
|
||
async def fetch_verifier_distribution(session, where, params) -> list[dict]:
    """Q4: verifier severity distribution.

    Reads ``status``, ``medium_count`` and ``strong_count`` from
    ``defense_layers->'verifier'``; the counts are cast to int and missing
    values fall back to ``'n/a'`` / ``0`` via COALESCE. Rows are grouped
    together with ``completeness`` (``'(null)'`` when missing).

    Returns dicts with keys ``status``, ``medium_count``, ``strong_count``,
    ``completeness`` and ``n``, ordered by those four grouping columns.
    """
    query = text(f"""
        SELECT
            COALESCE(defense_layers->'verifier'->>'status', 'n/a') AS status,
            COALESCE((defense_layers->'verifier'->>'medium_count')::int, 0) AS medium_count,
            COALESCE((defense_layers->'verifier'->>'strong_count')::int, 0) AS strong_count,
            COALESCE(completeness, '(null)') AS completeness,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where}
        GROUP BY 1, 2, 3, 4
        ORDER BY 1, 2, 3, 4
    """)
    result = await session.execute(query, params)
    return [dict(mapping) for mapping in result.mappings()]
|
||
|
||
|
||
async def fetch_flag_frequencies(session, where, params) -> list[dict]:
    """Q5: most frequent hallucination flag types, strong and weak combined.

    Each element of ``defense_layers->'grounding'->'strong'`` and ``->'weak'``
    is reduced to the part before the first ``:`` and counted. The two
    per-strength aggregates are UNION ALL-ed inside a subquery so the final
    ORDER BY / LIMIT apply to the combined set.

    Returns up to 40 dicts ``{flag_type, strength, n}``, ordered by ``n``
    descending.
    """
    query = text(f"""
        SELECT * FROM (
            SELECT split_part(flag, ':', 1) AS flag_type, 'strong' AS strength, COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'strong') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
            UNION ALL
            SELECT split_part(flag, ':', 1) AS flag_type, 'weak' AS strength, COUNT(*) AS n
            FROM ask_events,
                 jsonb_array_elements_text(defense_layers->'grounding'->'weak') AS flag
            WHERE {where}
            GROUP BY split_part(flag, ':', 1)
        ) u
        ORDER BY n DESC
        LIMIT 40
    """)
    result = await session.execute(query, params)
    return [dict(mapping) for mapping in result.mappings()]
|
||
|
||
|
||
async def fetch_fabricated_strong_rate(session, where, params) -> dict[str, float]:
    """Row-level rate of strong ``fabricated_number`` flags (B1 check).

    The rate is (rows whose strong grounding list contains at least one
    ``fabricated_number:...`` entry) / (all filtered rows) — a per-row rate,
    not a raw flag count.

    Returns:
        ``{"total", "fabricated_strong_hit", "rate"}`` with ``rate`` rounded to
        four decimals and ``0.0`` when no rows match.
    """
    query = text(f"""
        SELECT
            COUNT(*) AS total,
            SUM(CASE WHEN EXISTS (
                SELECT 1 FROM jsonb_array_elements_text(defense_layers->'grounding'->'strong') f
                WHERE f LIKE 'fabricated_number:%%'
            ) THEN 1 ELSE 0 END) AS hit
        FROM ask_events
        WHERE {where}
    """)
    result = await session.execute(query, params)
    counts = result.mappings().one()
    # SUM over zero rows yields NULL, hence the `or 0` fallbacks.
    total = int(counts["total"] or 0)
    hits = int(counts["hit"] or 0)
    if total > 0:
        rate = hits / total
    else:
        rate = 0.0
    return {"total": total, "fabricated_strong_hit": hits, "rate": round(rate, 4)}
|
||
|
||
|
||
# Runs of whitespace collapsed to a single space when normalising queries.
_WHITESPACE_RUN = re.compile(r"\s+")


def _normalize_query(query: str | None) -> str:
    """Normalise a query for fallback matching: collapse whitespace, trim, lowercase."""
    if not query:
        return ""
    return _WHITESPACE_RUN.sub(" ", query).strip().lower()


def _summarize_eval_rows(
    rows: list[dict],
    eval_cases: dict[str, dict],
    split_filter: set[str] | None,
) -> dict[str, Any]:
    """Join fetched eval rows to golden cases and group them by (expected, actual)."""
    # Fallback index: normalised golden query -> case id (later cases win on ties).
    query_to_id = {
        _normalize_query(case.get("query")): case_id
        for case_id, case in eval_cases.items()
        if case.get("query")
    }

    id_present = 0
    id_null = 0
    join_failed = 0
    matched_total = 0
    groups: dict[tuple[str, str], list[str]] = {}

    for row in rows:
        case_id = row.get("eval_case_id")
        if case_id:
            id_present += 1
        else:
            id_null += 1
            case_id = query_to_id.get(_normalize_query(row.get("query")))

        case = eval_cases.get(case_id) if case_id else None
        if not case:
            # Neither the stored id nor the normalised query matched a golden case.
            join_failed += 1
            continue
        if split_filter is not None and case_id not in split_filter:
            continue

        matched_total += 1
        # A refused row is reported as 'refused' regardless of its completeness.
        if row.get("refused"):
            actual = "refused"
        else:
            actual = row.get("completeness") or "(null)"
        expected = case.get("expected_behavior", "(unknown)")
        groups.setdefault((expected, actual), []).append(case.get("query", ""))

    # Largest groups first; sorted() is stable, so ties keep first-seen order.
    ordered = sorted(groups.items(), key=lambda item: -len(item[1]))
    mismatch_groups = [
        {
            "expected": expected,
            "actual": actual,
            "n": len(queries),
            "sample_queries": queries[:3],
        }
        for (expected, actual), queries in ordered
    ]

    return {
        "mismatch_groups": mismatch_groups,
        "eval_case_id_present": id_present,
        "eval_case_id_null": id_null,
        "join_failed_count": join_failed,
        "matched_total": matched_total,
    }


async def fetch_eval_join_with_split(
    session, where, params, eval_cases: dict[str, dict], split_filter: set[str] | None,
) -> dict[str, Any]:
    """Q6: join eval rows to golden cases by eval_case_id, with a query-string fallback.

    Only rows with ``source = 'eval'`` are considered, and only the most recent
    row per ``COALESCE(eval_case_id, query)`` is kept. Rows without an
    ``eval_case_id`` are matched to a golden case by normalised query text.

    Args:
        session: Async DB session used to run the SELECT.
        where: Shared filter clause (without the ``WHERE`` keyword).
        params: Bind parameters for ``where``.
        eval_cases: Golden cases keyed by id.
        split_filter: When not ``None``, matched rows whose case id is outside
            this set are dropped from the grouping (they are still included in
            the present/null counters).

    Returns:
        Dict with:
          - ``mismatch_groups``: ``[{expected, actual, n, sample_queries}]`` —
            every (expected, actual) combination observed, largest first, with
            up to three sample queries each
          - ``eval_case_id_present`` / ``eval_case_id_null``: row counts with /
            without a stored ``eval_case_id``
          - ``join_failed_count``: rows that matched no golden case
          - ``matched_total``: rows that were grouped
    """
    query = text(f"""
        WITH ranked AS (
            SELECT
                id, eval_case_id, query, completeness, refused,
                ROW_NUMBER() OVER (PARTITION BY COALESCE(eval_case_id, query)
                                   ORDER BY created_at DESC) AS rn
            FROM ask_events
            WHERE {where} AND source = 'eval'
        )
        SELECT id, eval_case_id, query, completeness, refused
        FROM ranked WHERE rn = 1
    """)
    result = await session.execute(query, params)
    rows = [dict(mapping) for mapping in result.mappings()]
    return _summarize_eval_rows(rows, eval_cases, split_filter)
|
||
|
||
|
||
async def fetch_fp_candidates(session, where, params, limit: int) -> list[dict]:
    """Q7: sample false-positive candidates from three cases (A/B/C).

    Each case contributes its most recent ``max(1, limit // 3)`` rows so the
    cases stay balanced, and tags them with a ``candidate_reason`` column:

      - A ``refused_high_rerank``: refused with ``max_rerank_score >= 0.35``
      - B ``insufficient_classifier_sufficient``: completeness
        ``'insufficient'`` while the classifier verdict is ``'sufficient'``
      - C ``partial_only_fabricated_number``: completeness ``'partial'`` whose
        strong grounding list has exactly one entry, a ``fabricated_number:...``

    Returns:
        The rows of case A, then B, then C, as dicts.
    """
    rows_per_case = max(1, limit // 3)
    query = text(f"""
        WITH base AS (
            SELECT
                id, query, completeness, refused, classifier_verdict,
                max_rerank_score, aggregate_score,
                defense_layers->'grounding'->'strong' AS g_strong,
                defense_layers->'verifier'->>'medium_count' AS v_medium,
                defense_layers->>'re_gate' AS re_gate,
                answer_length, prompt_version, source, eval_case_id, created_at
            FROM ask_events WHERE {where}
        ),
        case_a AS (
            SELECT *, 'refused_high_rerank' AS candidate_reason
            FROM base
            WHERE refused = true AND COALESCE(max_rerank_score, 0.0) >= 0.35
            ORDER BY created_at DESC LIMIT :per_case
        ),
        case_b AS (
            SELECT *, 'insufficient_classifier_sufficient' AS candidate_reason
            FROM base
            WHERE completeness = 'insufficient' AND classifier_verdict = 'sufficient'
            ORDER BY created_at DESC LIMIT :per_case
        ),
        case_c AS (
            SELECT *, 'partial_only_fabricated_number' AS candidate_reason
            FROM base
            WHERE completeness = 'partial'
              AND jsonb_array_length(COALESCE(g_strong, '[]'::jsonb)) = 1
              AND (g_strong->>0) LIKE 'fabricated_number:%%'
            ORDER BY created_at DESC LIMIT :per_case
        )
        SELECT * FROM case_a
        UNION ALL SELECT * FROM case_b
        UNION ALL SELECT * FROM case_c
    """)
    # Copy so the caller's shared filter params are not mutated.
    bound = dict(params)
    bound["per_case"] = rows_per_case
    result = await session.execute(query, bound)
    return [dict(mapping) for mapping in result.mappings()]
|
||
|
||
|
||
async def fetch_answer_length_distribution(session, where, params) -> list[dict]:
    """Q8: answer_length percentiles per outcome bucket.

    The bucket is ``'refused'`` for refused rows, otherwise the completeness
    value (``'(null)'`` when missing). Rows with a NULL ``answer_length`` are
    excluded.

    Returns dicts with keys ``bucket``, ``p25``, ``p50``, ``p75``, ``avg``
    (cast to int) and ``n``, ordered by bucket.
    """
    query = text(f"""
        SELECT
            CASE WHEN refused THEN 'refused' ELSE COALESCE(completeness, '(null)') END AS bucket,
            PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY answer_length) AS p25,
            PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY answer_length) AS p50,
            PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY answer_length) AS p75,
            AVG(answer_length)::int AS avg,
            COUNT(*) AS n
        FROM ask_events
        WHERE {where} AND answer_length IS NOT NULL
        GROUP BY 1
        ORDER BY 1
    """)
    result = await session.execute(query, params)
    return [dict(mapping) for mapping in result.mappings()]
|
||
|
||
|
||
# ─── rendering ───────────────────────────────────────────
|
||
|
||
|
||
def _md_table(headers: list[str], rows: list[list[Any]]) -> str:
|
||
if not rows:
|
||
return "_(empty)_\n"
|
||
lines = ["| " + " | ".join(headers) + " |",
|
||
"|" + "|".join(["---"] * len(headers)) + "|"]
|
||
for row in rows:
|
||
lines.append("| " + " | ".join(str(v) for v in row) + " |")
|
||
return "\n".join(lines) + "\n"
|
||
|
||
|
||
def render_markdown(sections: dict[str, Any], args: argparse.Namespace,
                    delta: dict[str, Any] | None = None) -> str:
    """Render the collected sections as a markdown report.

    Args:
        sections: Aggregated results. ``total_rows``, ``regate``,
            ``score_hist``, ``classifier``, ``verifier``, ``flags``,
            ``fabricated_rate``, ``fp_candidates`` and ``answer_length`` are
            required; ``shape_sample``, ``eval`` and ``fp_csv_path`` are
            optional.
        args: Parsed CLI namespace (label and filter values for the header).
        delta: Optional baseline comparison, appended as a JSON code block.

    Returns:
        The complete markdown document.
    """

    def pick(records, *columns):
        # Project each record onto the listed keys, in column order.
        return [[record[col] for col in columns] for record in records]

    chunks: list[str] = [
        f"# Calibration Report — {args.run_label}\n",
        f"Filter: source={args.source} prompt_version={args.prompt_version} "
        f"since={args.since} until={args.until} eval_split={args.eval_split}\n",
        f"Total rows: **{sections['total_rows']}**\n",
    ]

    # 0. shape inspect (only present when a shape sample was collected)
    if "shape_sample" in sections:
        chunks.append("## 0. defense_layers shape sample (latest 5)\n")
        for sample in sections["shape_sample"]:
            chunks.extend([
                f"- id={sample['id']} created_at={sample['created_at']}\n",
                " ```json\n",
                " " + json.dumps(
                    sample["defense_layers"], ensure_ascii=False, indent=2
                ).replace("\n", "\n ") + "\n",
                " ```\n",
            ])

    # 1. re-gate
    chunks.append("## 1. Re-gate tier 분포\n")
    regate_rows = [
        [row["tier"], row["n"], f"{row['pct']}%"] for row in sections["regate"]
    ]
    chunks.append(_md_table(["tier", "n", "pct"], regate_rows))

    # 2. score histogram
    chunks.append("## 2. max_rerank_score 히스토그램 (bucket × bin 0~10)\n")
    hist_cols = ["bucket", "bin", "n", "avg_score"]
    chunks.append(_md_table(hist_cols, pick(sections["score_hist"], *hist_cols)))

    # 3. classifier confusion
    chunks.append("## 3. Classifier 혼동행렬 (verdict × completeness × refused)\n")
    confusion_cols = ["verdict", "completeness", "refused", "n"]
    chunks.append(
        _md_table(confusion_cols, pick(sections["classifier"], *confusion_cols))
    )

    # 4. verifier
    chunks.append("## 4. Verifier severity 분포\n")
    verifier_cols = ["status", "medium_count", "strong_count", "completeness", "n"]
    chunks.append(
        _md_table(verifier_cols, pick(sections["verifier"], *verifier_cols))
    )

    # 5. flags — three tables (all / strong / weak)
    all_flags = sections["flags"]
    strong_flags = [flag for flag in all_flags if flag["strength"] == "strong"]
    weak_flags = [flag for flag in all_flags if flag["strength"] == "weak"]
    chunks.append("## 5. Hallucination flags top-K\n")
    chunks.append("### 5.1 전체 top-20\n")
    chunks.append(_md_table(["flag_type", "strength", "n"],
                            pick(all_flags[:20], "flag_type", "strength", "n")))
    chunks.append("### 5.2 strong only top-10\n")
    chunks.append(_md_table(["flag_type", "n"],
                            pick(strong_flags[:10], "flag_type", "n")))
    chunks.append("### 5.3 weak only top-10\n")
    chunks.append(_md_table(["flag_type", "n"],
                            pick(weak_flags[:10], "flag_type", "n")))

    # B1 watch — fabricated_number strong rate
    fabricated = sections["fabricated_rate"]
    chunks.extend([
        "### 5.4 fabricated_number strong rate (B1 추적용)\n",
        f"- total rows: {fabricated['total']}\n",
        f"- fabricated_strong hit: {fabricated['fabricated_strong_hit']}\n",
        f"- **rate: {fabricated['rate'] * 100:.2f}%**\n",
    ])

    # 6. eval mismatch (only when the eval section was collected)
    if "eval" in sections:
        eval_info = sections["eval"]
        chunks.extend([
            "## 6. Eval golden mismatch (eval_case_id 기반)\n",
            f"- eval_case_id present: {eval_info['eval_case_id_present']}\n",
            f"- eval_case_id null (fallback): {eval_info['eval_case_id_null']}\n",
            f"- join_failed_count: **{eval_info['join_failed_count']}**\n",
            f"- matched total: {eval_info['matched_total']}\n\n",
        ])
        mismatch_rows = [
            [group["expected"], group["actual"], group["n"],
             " | ".join(group["sample_queries"])[:120]]
            for group in eval_info["mismatch_groups"]
        ]
        chunks.append(_md_table(["expected", "actual", "n", "sample"], mismatch_rows))

    # 7. FP candidates
    candidates = sections["fp_candidates"]
    chunks.append(
        f"## 7. FP candidate sample (n={len(candidates)}, case A/B/C 분리)\n"
    )
    chunks.append(f"전체 CSV: `{sections.get('fp_csv_path', '(미생성)')}`\n\n")
    candidate_rows = [
        [cand["candidate_reason"], cand["id"], cand["completeness"], cand["refused"],
         cand["classifier_verdict"], cand["max_rerank_score"], cand["re_gate"],
         (cand["query"] or "")[:60]]
        for cand in candidates
    ]
    chunks.append(_md_table(
        ["case", "id", "completeness", "refused", "verdict", "max_score", "re_gate", "query"],
        candidate_rows,
    ))

    # 8. answer_length
    chunks.append("## 8. answer_length 분포 (bucket × percentile)\n")
    length_cols = ["bucket", "p25", "p50", "p75", "avg", "n"]
    chunks.append(
        _md_table(length_cols, pick(sections["answer_length"], *length_cols))
    )

    # 9. delta vs baseline
    if delta:
        chunks.extend([
            "## 9. Delta vs baseline\n",
            "```json\n",
            json.dumps(delta, ensure_ascii=False, indent=2, default=str),
            "\n```\n",
        ])

    return "".join(chunks)
|
||
|
||
|
||
def render_json(sections: dict[str, Any]) -> str:
    """Serialise the sections as indented JSON for use as a comparison baseline.

    Non-ASCII text is kept as-is, and values ``json`` cannot encode natively
    are written via ``str()``.
    """
    serialized = json.dumps(sections, default=str, indent=2, ensure_ascii=False)
    return serialized
|
||
|
||
|
||
def compute_delta(current: dict[str, Any], baseline: dict[str, Any]) -> dict[str, Any]:
    """Compute a simple delta between the current sections and a baseline.

    Covers three things only: ``total_rows``, per-tier re-gate percentage, and
    the fabricated_number strong rate. Missing or ``None`` entries on either
    side are treated as empty / zero instead of raising.

    Args:
        current: Sections of the current run.
        baseline: Sections loaded from a previously saved JSON baseline
            (percentages may arrive as strings there).

    Returns:
        Dict with:
          - ``total_rows``: ``{current, baseline, diff}``
          - ``regate_pct_diff_pp``: ``{tier: current_pct - baseline_pct}`` in
            percentage points, keys sorted by tier name; a tier absent on one
            side counts as 0.0 there
          - ``fabricated_strong_rate``: ``{current, baseline, diff_pp,
            rel_change_pct}`` where ``rel_change_pct`` is ``None`` when the
            baseline rate is not positive
    """

    def tier_pcts(report: dict[str, Any]) -> dict[str, float]:
        return {
            row["tier"]: float(row["pct"] or 0.0)
            for row in (report.get("regate") or [])
        }

    def fabricated_rate(report: dict[str, Any]) -> float:
        return float((report.get("fabricated_rate") or {}).get("rate") or 0.0)

    cur_total = current.get("total_rows")
    base_total = baseline.get("total_rows")
    delta: dict[str, Any] = {
        "total_rows": {
            "current": cur_total,
            "baseline": base_total,
            "diff": (cur_total or 0) - (base_total or 0),
        }
    }

    # Per-tier percentage-point difference. Tiers are sorted so the emitted
    # report is identical from run to run (set iteration order is not stable
    # for strings across interpreter runs).
    base_regate = tier_pcts(baseline)
    cur_regate = tier_pcts(current)
    delta["regate_pct_diff_pp"] = {
        tier: round(cur_regate.get(tier, 0.0) - base_regate.get(tier, 0.0), 2)
        for tier in sorted(set(base_regate) | set(cur_regate))
    }

    # Fabricated rate: absolute change in percentage points plus relative change.
    cur_rate = fabricated_rate(current)
    base_rate = fabricated_rate(baseline)
    delta["fabricated_strong_rate"] = {
        "current": cur_rate,
        "baseline": base_rate,
        "diff_pp": round((cur_rate - base_rate) * 100, 2),
        "rel_change_pct": (
            round((cur_rate - base_rate) / base_rate * 100, 2)
            if base_rate > 0 else None
        ),
    }
    return delta
|
||
|
||
|
||
# ─── FP CSV dump ──────────────────────────────────────────
|
||
|
||
|
||
def dump_fp_csv(rows: list[dict], path: Path) -> None:
    """Write the FP candidate rows to a CSV file for manual labelling.

    The column order is fixed, and a trailing ``is_true_fp`` column is always
    written empty so a reviewer can fill it in by hand. The header row is
    written even when ``rows`` is empty, so the file is always a valid CSV
    with the expected columns. Parent directories are created as needed.

    Args:
        rows: Candidate rows; keys outside the fixed column set are ignored
            and missing keys produce empty cells.
        path: Destination file (overwritten, UTF-8).
    """
    # Stable column order (per the plan spec).
    columns = [
        "id", "candidate_reason", "query", "completeness", "refused",
        "classifier_verdict", "max_rerank_score", "aggregate_score",
        "g_strong", "v_medium", "re_gate", "answer_length",
        "prompt_version", "source", "eval_case_id", "created_at",
        "is_true_fp",  # left blank for the reviewer to fill in
    ]

    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=columns)
        writer.writeheader()
        for row in rows:
            record: dict[str, Any] = {}
            for col in columns:
                value = row.get(col)
                # JSONB-derived lists / dicts are stored as JSON text in one cell.
                if isinstance(value, (list, dict)):
                    value = json.dumps(value, ensure_ascii=False, default=str)
                record[col] = value
            record["is_true_fp"] = ""
            writer.writerow(record)
|
||
|
||
|
||
# ─── dry-run (DB 없이 fixture 로드) ───────────────────────
|
||
|
||
|
||
def dry_run_sections() -> dict[str, Any]:
    """Return report sections without touching the database (``--dry-run``).

    Loads the JSON snapshot at ``DRY_RUN_FIXTURE`` when it exists; otherwise
    falls back to a minimal inline fixture that has every required section.
    """
    if DRY_RUN_FIXTURE.exists():
        return json.loads(DRY_RUN_FIXTURE.read_text(encoding="utf-8"))

    # Minimal inline fixture.
    regate_rows = [
        {"tier": "clean", "n": 2, "pct": 66.67},
        {"tier": "refuse(grounding_2+strong)", "n": 1, "pct": 33.33},
    ]
    fallback: dict[str, Any] = {
        "total_rows": 3,
        "regate": regate_rows,
        "score_hist": [],
        "classifier": [],
        "verifier": [],
        "flags": [],
        "fabricated_rate": {"total": 3, "fabricated_strong_hit": 0, "rate": 0.0},
        "fp_candidates": [],
        "answer_length": [],
    }
    return fallback
|
||
|
||
|
||
# ─── main ─────────────────────────────────────────────────
|
||
|
||
|
||
async def run(args: argparse.Namespace) -> None:
    """Execute one calibration run: collect sections, dump the FP CSV, emit reports.

    Modes:
      - ``--dry-run``: load fixture sections and emit the report; no DB access
        and no CSV is written.
      - ``--inspect-shape``: print recent ``defense_layers`` samples as JSON to
        stdout and return without producing a report.
      - otherwise: run every aggregate query with the shared filter, add the
        eval join when ``--source eval``, write the FP CSV and emit the report.

    The database URL comes from ``DATABASE_URL`` (with a local default). The
    engine is disposed in a ``finally`` block, after the session has closed,
    so connections are released even when a query raises.
    """
    if args.dry_run:
        sections = dry_run_sections()
        sections.setdefault("fp_csv_path", "(dry-run, CSV skipped)")
        _emit(args, sections)
        return

    # DB connection
    database_url = os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
    )
    engine = create_async_engine(database_url, echo=False)
    session_factory = async_sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )

    try:
        async with session_factory() as session:
            if args.inspect_shape:
                sample = await fetch_shape_inspect(session)
                print(json.dumps(
                    [{"id": s["id"], "created_at": str(s["created_at"]),
                      "defense_layers": s["defense_layers"]} for s in sample],
                    ensure_ascii=False, indent=2, default=str,
                ))
                return

            where, params = build_filters(args)
            total = await fetch_total_rows(session, where, params)
            if total == 0:
                # Not fatal: the report is still produced, just empty.
                print(f"WARNING: 필터 조건에 매칭되는 ask_events 행 0건. "
                      f"source={args.source} prompt_version={args.prompt_version} "
                      f"since={args.since} until={args.until}")

            sections: dict[str, Any] = {"total_rows": total}
            sections["regate"] = await fetch_regate_distribution(session, where, params)
            sections["score_hist"] = await fetch_score_histogram(session, where, params)
            sections["classifier"] = await fetch_classifier_confusion(session, where, params)
            sections["verifier"] = await fetch_verifier_distribution(session, where, params)
            sections["flags"] = await fetch_flag_frequencies(session, where, params)
            sections["fabricated_rate"] = await fetch_fabricated_strong_rate(
                session, where, params)
            sections["fp_candidates"] = await fetch_fp_candidates(
                session, where, params, args.sample_limit)
            sections["answer_length"] = await fetch_answer_length_distribution(
                session, where, params)

            # Eval only: join against the golden cases.
            if args.source == "eval":
                cases = load_eval_golden(EVAL_GOLDEN_PATH)
                split_filter = (filter_eval_split(cases, args.eval_split)
                                if args.eval_split != "all" else None)
                sections["eval"] = await fetch_eval_join_with_split(
                    session, where, params, cases, split_filter)
    finally:
        await engine.dispose()

    # FP CSV dump
    fp_csv = (Path(args.fp_artifacts) if args.fp_artifacts else
              ARTIFACTS_DIR / f"fp_candidates_{args.run_label}.csv")
    dump_fp_csv(sections["fp_candidates"], fp_csv)
    sections["fp_csv_path"] = str(fp_csv)

    _emit(args, sections)
|
||
|
||
|
||
def _emit(args: argparse.Namespace, sections: dict[str, Any]) -> None:
    """Render the report and write the output file(s).

    When ``--compare-against`` points at an existing JSON baseline, a delta is
    computed and included in the markdown; a missing baseline only prints a
    warning. The markdown is always written to ``args.output``; with
    ``--format json`` the raw sections are also written next to it with a
    ``.json`` suffix.
    """
    baseline_delta = None
    compare_target = args.compare_against
    if compare_target:
        baseline_path = Path(compare_target)
        if not baseline_path.exists():
            print(f"WARNING: compare-against baseline not found: {baseline_path}")
        else:
            baseline_sections = json.loads(baseline_path.read_text(encoding="utf-8"))
            baseline_delta = compute_delta(sections, baseline_sections)

    markdown = render_markdown(sections, args, baseline_delta)
    report_path = Path(args.output)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(markdown, encoding="utf-8")
    print(f"✓ markdown report: {report_path}")

    if args.format != "json":
        return
    baseline_out = report_path.with_suffix(".json")
    baseline_out.write_text(render_json(sections), encoding="utf-8")
    print(f"✓ json baseline: {baseline_out}")
|
||
|
||
|
||
def main() -> None:
    """Script entry point: parse the CLI options and run the async workflow."""
    asyncio.run(run(parse_args()))
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|