Files
hyungi_document_server/scripts/phase1d_pilot.py
T
Hyungi Ahn 0362f52130 fix(scripts): Phase 1D enqueue 가 existing_success 재처리하지 않도록 필터
Round 2 sample 에 existing_success 5건 (anchor doc 4809 + calibration 4)
이 포함되었지만, cmd_enqueue 가 sample_source 무시하고 30건 전부 enqueue
하던 버그. 결과:
  - existing 5건 marker 재처리 (~25분 marker 시간 낭비)
  - 동일 quality output 으로 md_content overwrite → baseline 유실
  - anchor (doc 4809) 의 "before" 상태가 사라져 후속 라운드 비교 anchor 손상

Fix:
  - default = sample_source == "controlled_backfill" 만 enqueue (25건)
  - --include-existing flag 추가 (후속 Marker 튜닝 라운드에서 anchor 재처리
    필요 시 사용)
  - print 로 mode 명시 + 제외된 ids 표시

야간 단발 sweep (23:00 KST) 예약 실행 전 fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 16:27:31 +09:00

790 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase 1D — marker markdown 품질 pilot (one-shot admin script).
* 영구 worker 경로 아님. 30건 한정 sample 로 baseline 품질 측정.
* Phase 2 전체 백필 결정은 1D 결과 보고 후행.
* 1B.5 (이미지 추출 / _meta 보존) 는 별도 PR — 본 스크립트 영역 아님.
Stratification (Round 2 refined, plan: ~/.claude/plans/stratified-mingling-otter.md):
4 축: doc_type × file_size_band × text_density_band × handwritten_hint
+ sample_source ∈ {existing_success, controlled_backfill}
- existing_success 5건 (anchor 1 + calibration 4)
- controlled_backfill 25건 (handwritten 3 / scan_likely 2~3 / mixed 5 / born_digital 12 / large 2)
+ forced_include: doc 4809 (Note_240805_용접교육 필기) — known bad handwritten anchor.
document_type ∈ SKIP_DOC_TYPES 제외 (marker_worker 룰 미러).
Subcommands:
select stratified 30건 dry-run + CSV+JSON 저장
enqueue select 결과를 markdown 큐에 enqueue (uq_queue_active 위반 회피)
report md_status 분포·실패사유·quality 메트릭·UI 검수 URL 출력
eval_template pilot_1d_eval.csv 스켈레톤 출력 (사용자가 rubric 5축 점수 채움)
실행 (GPU 서버):
docker compose exec fastapi python /app/scripts/phase1d_pilot.py select \
--csv /app/evals/markdown/pilot_1d_sample.csv
docker compose exec fastapi python /app/scripts/phase1d_pilot.py enqueue --yes
docker compose exec fastapi python /app/scripts/phase1d_pilot.py report
docker compose exec fastapi python /app/scripts/phase1d_pilot.py eval_template \
--csv /app/evals/markdown/pilot_1d_eval.csv
"""
import argparse
import asyncio
import csv
import json
import os
import random
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
# fastapi 컨테이너는 WORKDIR=/app 에 코드를 펼쳐놓음 (app/ 디렉토리 없음).
# /app/scripts/../app 이 아니라 /app 자체가 sys.path 에 있어야 `from models...` import 가능.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
# Mirror of marker_worker's SKIP rule — the intent is a single source of truth to avoid drift.
# Must be kept in sync by hand whenever the worker-side rule changes.
SKIP_DOC_TYPES = {
    "발주서", "세금계산서", "명세표",
    "Invoice", "Purchase_Order", "Estimate", "Statement",
}
# file_size bucket (page_count proxy). A PDF averages roughly 10~50KB per page.
# Each entry is (label, inclusive lower bound in bytes, exclusive upper bound in bytes).
SIZE_BUCKETS = [
    ("small", 0, 500 * 1024),  # < 500KB
    ("medium", 500 * 1024, 5 * 1024 * 1024),  # 500KB ~ 5MB
    ("large", 5 * 1024 * 1024, 10**12),  # > 5MB
]
# file_size_band axis of the 4-axis stratification — Round 2 plan.
FILE_SIZE_BAND_THRESHOLDS = [
    ("S", 0, 1 * 1024 * 1024),  # < 1MB
    ("M", 1 * 1024 * 1024, 10 * 1024 * 1024),  # 1~10MB
    ("L", 10 * 1024 * 1024, 10**12),  # > 10MB
]
# text_density (chars per KB of file) — a single clean proxy for born-digital vs scan.
# Both extremes were verified: 0.17 (handwritten doc 4809) ↔ 94 (born-digital doc 3759).
TEXT_DENSITY_BANDS = [
    ("scan-likely", 0.0, 5.0),
    ("mixed", 5.0, 50.0),
    ("born-digital", 50.0, float("inf")),
]
# Case-insensitive substring match on Korean/English handwriting and scan keywords.
HANDWRITTEN_HINT_REGEX = re.compile(r"필기|노트|handwritten|scan|스캔|note", re.IGNORECASE)
# Forced include — a known bad anchor found during the user's visual inspection.
# The same document is re-converted in later tuning rounds to judge whether 1D results improved.
FORCED_INCLUDES: dict[int, str] = {
    4809: "known_bad_handwritten_anchor",
}
# Reproducibility seed — meant to make a regenerated sample CSV come out identical.
SAMPLE_SEED = 20260502
PILOT_TARGET = 30
EXISTING_SUCCESS_TARGET = 5
CONTROLLED_BACKFILL_TARGET = PILOT_TARGET - EXISTING_SUCCESS_TARGET  # 25
# Default output locations shared by the subcommands.
DEFAULT_OUT = Path("/tmp/phase1d_pilot.json")
DEFAULT_CSV = Path("/tmp/phase1d_pilot.csv")
DEFAULT_EVAL_CSV = Path("/tmp/phase1d_eval.csv")
def _bucket(file_size: int | None) -> str:
    """Map a byte size onto the legacy three-way bucket used by cmd_report.

    Returns "unknown" when the size is missing and "outlier" when no
    SIZE_BUCKETS range contains it.
    """
    if file_size is None:
        return "unknown"
    hits = (label for label, lower, upper in SIZE_BUCKETS if lower <= file_size < upper)
    return next(hits, "outlier")
def _file_size_band(file_size: int | None) -> str:
    """Map a byte size onto the Round 2 refined band: S / M / L.

    Returns "unknown" when the size is missing; any size that falls outside
    every FILE_SIZE_BAND_THRESHOLDS range is reported as "L".
    """
    if file_size is None:
        return "unknown"
    hits = (label for label, lower, upper in FILE_SIZE_BAND_THRESHOLDS if lower <= file_size < upper)
    return next(hits, "L")
def _text_density(text_len: int, file_size: int | None) -> float | None:
"""chars per KB of file. file_size==0/None 이면 None."""
if not file_size or file_size <= 0:
return None
return text_len / (file_size / 1024.0)
def _text_density_band(density: float | None) -> str:
    """Label a chars-per-KB density using TEXT_DENSITY_BANDS.

    Returns "unknown" for a missing density or one that no band contains.
    """
    if density is None:
        return "unknown"
    hits = (label for label, lower, upper in TEXT_DENSITY_BANDS if lower <= density < upper)
    return next(hits, "unknown")
def _handwritten_hint(title: str | None, file_path: str | None) -> str:
    """Return 'hi' when the title or file path matches HANDWRITTEN_HINT_REGEX, else 'lo'."""
    present = [part for part in (title, file_path) if part]
    haystack = " ".join(present)
    if HANDWRITTEN_HINT_REGEX.search(haystack):
        return "hi"
    return "lo"
def _scan_likely(text_len: int, file_size: int | None, density: float | None) -> bool:
"""text_density < 5 또는 extracted_text 부재 → 스캔 가능성 높음."""
if text_len == 0:
return True
if density is not None and density < 5.0:
return True
return False
def _script_mix(extracted_text: str | None, sample_chars: int = 10000) -> str:
"""첫 N자에서 Hangul/CJK/Hiragana/Katakana/Latin 비율로 라벨링.
한 script ≥ 0.7 → '<script>_dominant'
두 script 각 ≥ 0.1 → 'mixed_<a>_<b>'
그 외 → 'unknown'.
mojibake/OCR 노이즈가 심하면 비율이 이상하게 나오는데, 그것도 신호.
"""
if not extracted_text:
return "unknown"
sample = extracted_text[:sample_chars]
counts = {"hangul": 0, "cjk": 0, "kana": 0, "latin": 0, "other": 0}
total = 0
for ch in sample:
cp = ord(ch)
if ch.isspace():
continue
total += 1
if 0xAC00 <= cp <= 0xD7A3 or 0x1100 <= cp <= 0x11FF or 0x3130 <= cp <= 0x318F:
counts["hangul"] += 1
elif 0x4E00 <= cp <= 0x9FFF or 0x3400 <= cp <= 0x4DBF:
counts["cjk"] += 1
elif 0x3040 <= cp <= 0x30FF:
counts["kana"] += 1
elif (0x0041 <= cp <= 0x005A) or (0x0061 <= cp <= 0x007A) or (0x00C0 <= cp <= 0x024F):
counts["latin"] += 1
else:
counts["other"] += 1
if total == 0:
return "unknown"
ratios = {k: v / total for k, v in counts.items() if k != "other"}
primary = sorted(ratios.items(), key=lambda x: -x[1])
if primary[0][1] >= 0.7:
return f"{primary[0][0]}_dominant"
significant = [name for name, r in primary if r >= 0.1]
if len(significant) >= 2:
return "mixed_" + "_".join(sorted(significant[:2]))
return "unknown"
def _page_count_estimate(md_extraction_quality: dict | None) -> int | None:
"""existing_success 의 marker quality.metrics.source_page_count 가 있으면 사용.
controlled_backfill 은 marker 가 변환 시 PyMuPDF 로 채울 예정 → NULL.
사후 평가 해석용 보조값."""
if not md_extraction_quality or not isinstance(md_extraction_quality, dict):
return None
metrics = md_extraction_quality.get("metrics")
if not isinstance(metrics, dict):
return None
pc = metrics.get("source_page_count")
if isinstance(pc, int):
return pc
return None
# Sample CSV column order — same as the plan's "Sample CSV columns" section.
# cmd_select writes exactly these keys (in this order) for every sampled document.
CSV_COLUMNS = [
    "doc_id", "title", "sample_source", "forced_include_reason", "bucket_label",
    "doc_type", "file_size", "file_size_band",
    "text_len", "text_density", "text_density_band",
    "handwritten_hint", "scan_likely", "script_mix",
    "page_count_estimate",
]
def _build_engine() -> "AsyncEngine":
    """Create an async SQLAlchemy engine from the DATABASE_URL environment variable.

    Raises:
        KeyError: If DATABASE_URL is not set.
    """
    return create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True)
# ─── select ───
def _enrich_row(r) -> dict:
    """Turn a Document row into a sample dict with the computed stratification proxies.

    Keys prefixed with an underscore carry raw row values for the JSON
    payload; they are not CSV columns.
    """
    raw_text = r.extracted_text
    size_bytes = r.file_size
    char_count = len(raw_text or "")
    chars_per_kb = _text_density(char_count, size_bytes)
    rounded_density = None if chars_per_kb is None else round(chars_per_kb, 3)

    sample: dict = {}
    sample["doc_id"] = r.id
    sample["title"] = r.title
    sample["doc_type"] = r.document_type
    sample["file_size"] = size_bytes or 0
    sample["file_size_band"] = _file_size_band(size_bytes)
    sample["text_len"] = char_count
    sample["text_density"] = rounded_density
    sample["text_density_band"] = _text_density_band(chars_per_kb)
    sample["handwritten_hint"] = _handwritten_hint(r.title, r.file_path)
    sample["scan_likely"] = _scan_likely(char_count, size_bytes, chars_per_kb)
    sample["script_mix"] = _script_mix(raw_text)
    sample["page_count_estimate"] = _page_count_estimate(r.md_extraction_quality)
    sample["_file_path"] = r.file_path
    sample["_ai_domain"] = r.ai_domain
    return sample
def _allocate_controlled_backfill(candidates: list[dict], rng: random.Random) -> list[dict]:
    """Pick the controlled_backfill share of the sample (up to 25 docs).

    Stratified over the 4 axes with deliberate over-sampling. Budget from the
    plan's "Sample budget" section:
        handwritten 3 / scan_likely 2~3 / mixed 5 / born_digital 12 / large 2

    Args:
        candidates: Enriched sample dicts; each chosen dict is mutated in place
            to receive a "bucket_label".
        rng: Random source. The sequence of shuffle calls below determines the
            result, so statement order matters for reproducibility.

    Returns:
        The selected dicts in selection order, capped at
        CONTROLLED_BACKFILL_TARGET.
    """
    selected: list[dict] = []
    used: set[int] = set()

    def take(pool: list[dict], n: int, label: str) -> None:
        # Shuffle the not-yet-used members of `pool` and claim up to `n` of them under `label`.
        avail = [c for c in pool if c["doc_id"] not in used]
        rng.shuffle(avail)
        for c in avail[:n]:
            c["bucket_label"] = label
            selected.append(c)
            used.add(c["doc_id"])

    # 1. handwritten_hint=hi (take up to 3; population share 1.1% → sample share 10%)
    take([c for c in candidates if c["handwritten_hint"] == "hi"], 3, "handwritten")
    # 2. scan-likely (2~3 docs once the handwritten picks are de-duplicated)
    take([c for c in candidates if c["text_density_band"] == "scan-likely"], 3, "scan_likely")
    # 3. mixed (5 docs)
    take([c for c in candidates if c["text_density_band"] == "mixed"], 5, "mixed")
    # 4. born-digital × varied doc_type (12 docs). doc_type distribution guide:
    # study_note 3 / Academic_Paper 3 / Reference 2 / Note 1 / (Manual+Standard+Specification) 2 / NULL 1
    born_digital = [c for c in candidates if c["text_density_band"] == "born-digital" and c["doc_id"] not in used]
    by_type: dict[str | None, list[dict]] = defaultdict(list)
    for c in born_digital:
        by_type[c["doc_type"]].append(c)
    # Shuffle each per-type pool once so the quota slices below are random picks.
    for pool in by_type.values():
        rng.shuffle(pool)
    target_quota = [
        ("study_note", 3),
        ("Academic_Paper", 3),
        ("Reference", 2),
        ("Note", 1),
        ("Manual", 1),
        ("Standard", 1),
        (None, 1),  # NULL
    ]
    born_added = 0
    for dt, n in target_quota:
        for c in by_type.get(dt, [])[:n]:
            if c["doc_id"] in used:
                continue
            c["bucket_label"] = "born_digital"
            selected.append(c)
            used.add(c["doc_id"])
            born_added += 1
            if born_added >= 12:
                break
        # Propagate the inner break: stop walking quotas once 12 are collected.
        if born_added >= 12:
            break
    # If fewer than 12 were found, top up from the remaining born-digital docs (any doc_type).
    if born_added < 12:
        leftover = [c for c in born_digital if c["doc_id"] not in used]
        rng.shuffle(leftover)
        for c in leftover[: 12 - born_added]:
            c["bucket_label"] = "born_digital"
            selected.append(c)
            used.add(c["doc_id"])
    # 5. file_size band=L (>10MB) — supplement with docs not in the 4 buckets above (target 2)
    large_pool = [c for c in candidates if c["file_size_band"] == "L" and c["doc_id"] not in used]
    take(large_pool, 2, "large")
    # Target is 30 - existing_success(5) = 25. If short, fill from the remaining pending candidates.
    if len(selected) < CONTROLLED_BACKFILL_TARGET:
        leftover = [c for c in candidates if c["doc_id"] not in used]
        rng.shuffle(leftover)
        for c in leftover[: CONTROLLED_BACKFILL_TARGET - len(selected)]:
            c["bucket_label"] = "filler"
            selected.append(c)
            used.add(c["doc_id"])
    return selected[:CONTROLLED_BACKFILL_TARGET]
def _allocate_existing_success(rows: list, rng: random.Random) -> list[dict]:
    """Pick the existing_success share: forced anchor(s) plus calibration docs.

    Five documents in total = forced anchor 1 + calibration 4, with the
    calibration picks balanced across text_density bands. Chosen dicts get
    a "bucket_label" (and anchors a "forced_include_reason") set in place.
    """
    candidates = [_enrich_row(row) for row in rows]
    lookup = {cand["doc_id"]: cand for cand in candidates}
    picked: list[dict] = []
    taken: set[int] = set()

    def claim(cand: dict, label: str) -> None:
        # Record one pick under the given bucket label.
        cand["bucket_label"] = label
        picked.append(cand)
        taken.add(cand["doc_id"])

    # Forced includes come first so they can never be crowded out.
    for fid, reason in FORCED_INCLUDES.items():
        anchor = lookup.get(fid)
        if anchor is None:
            print(f"[warn] forced_include doc_id={fid} 가 existing_success 후보에 없음 — skip")
            continue
        claim(anchor, "existing_anchor")
        anchor["forced_include_reason"] = reason

    # Calibration picks — balanced across the text_density bands.
    remaining = [cand for cand in candidates if cand["doc_id"] not in taken]
    band_quotas: list[tuple[str, int]] = [("scan-likely", 1), ("mixed", 1), ("born-digital", 2)]
    for band, quota in band_quotas:
        in_band = [
            cand for cand in remaining
            if cand["text_density_band"] == band and cand["doc_id"] not in taken
        ]
        rng.shuffle(in_band)
        for cand in in_band[:quota]:
            claim(cand, "existing_calibration")

    # Still short of the target: top up from whatever is left, regardless of band.
    shortfall = EXISTING_SUCCESS_TARGET - len(picked)
    if shortfall > 0:
        spare = [cand for cand in remaining if cand["doc_id"] not in taken]
        rng.shuffle(spare)
        for cand in spare[:shortfall]:
            claim(cand, "existing_calibration")

    return picked[:EXISTING_SUCCESS_TARGET]
def _print_distribution(samples: list[dict]) -> None:
    """Print one table row per selected sample, then per-axis value counts."""

    def table_order(sample: dict):
        # Group rows by source, then bucket, then id.
        return (sample["sample_source"], sample["bucket_label"], sample["doc_id"])

    print(f"\n선정 {len(samples)}건 (목표 {PILOT_TARGET}):\n")
    print(f"{'ID':>6} {'KB':>8} {'src':<22} {'bucket':<22} {'doctype':<22} {'density':>8} title")
    print("-" * 160)
    for s in sorted(samples, key=table_order):
        density = s["text_density"]
        density_s = "-" if density is None else f"{density:.2f}"
        size_kb = s["file_size"] // 1024
        source = s["sample_source"][:22]
        bucket = s["bucket_label"][:22]
        doc_type = (s["doc_type"] or "-")[:22]
        title = (s["title"] or "-")[:60]
        print(
            f"{s['doc_id']:>6} {size_kb:>8} "
            f"{source:<22} "
            f"{bucket:<22} "
            f"{doc_type:<22} "
            f"{density_s:>8} "
            f"{title}"
        )

    print("\n분포:")
    axes = (
        "sample_source", "bucket_label", "doc_type", "file_size_band",
        "text_density_band", "handwritten_hint", "script_mix",
    )
    for axis in axes:
        # most_common() is a stable descending-by-count ordering.
        counts = Counter(s[axis] for s in samples)
        line = ", ".join(f"{value}={count}" for value, count in counts.most_common())
        print(f" {axis}: {line}")
async def cmd_select(out_path: Path, csv_path: Path | None) -> None:
    """Select the stratified pilot sample and persist it (dry run — nothing is enqueued).

    Queries two candidate pools, allocates the sample with a seeded RNG,
    prints the distribution, then writes a review CSV (optional) and the
    JSON payload consumed by the enqueue / report / eval_template commands.

    Args:
        out_path: Destination of the JSON payload.
        csv_path: Destination of the review CSV, or None to skip it.
    """
    from models.document import Document  # type: ignore

    columns = (
        Document.id, Document.title, Document.ai_domain, Document.document_type,
        Document.file_size, Document.file_path, Document.file_format,
        Document.extracted_text, Document.md_extraction_quality,
    )

    engine = _build_engine()
    try:
        Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        async with Session() as session:
            # A. existing_success — PDFs whose markdown conversion already succeeded
            #    (anchor + calibration).
            es_rows = (
                await session.execute(
                    select(*columns).where(
                        Document.deleted_at.is_(None),
                        Document.file_format == "pdf",
                        Document.md_status == "success",
                    )
                )
            ).all()
            # B. controlled_backfill — pending PDF documents; SKIP_DOC_TYPES filtered below.
            cb_rows = (
                await session.execute(
                    select(*columns).where(
                        Document.deleted_at.is_(None),
                        Document.file_format == "pdf",
                        Document.md_status == "pending",
                        Document.category == "document",
                    )
                )
            ).all()
    finally:
        # Release the connection pool even when a query fails.
        await engine.dispose()

    cb_candidates_raw = [r for r in cb_rows if r.document_type not in SKIP_DOC_TYPES]
    print(
        f"existing_success PDFs: {len(es_rows)}건 / "
        f"controlled_backfill 후보 (SKIP 제외): {len(cb_candidates_raw)}건 "
        f"(전체 pending PDF document: {len(cb_rows)}건)"
    )

    # One seeded RNG drives both allocations, so the call order here is part of
    # what makes the sample reproducible.
    rng = random.Random(SAMPLE_SEED)
    existing_samples = _allocate_existing_success(es_rows, rng)
    for s in existing_samples:
        s["sample_source"] = "existing_success"
        s.setdefault("forced_include_reason", "")
    cb_candidates = [_enrich_row(r) for r in cb_candidates_raw]
    backfill_samples = _allocate_controlled_backfill(cb_candidates, rng)
    for s in backfill_samples:
        s["sample_source"] = "controlled_backfill"
        s.setdefault("forced_include_reason", "")
    samples = existing_samples + backfill_samples
    _print_distribution(samples)

    # CSV output (for user review and committing).
    if csv_path is not None:
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS, extrasaction="ignore")
            writer.writeheader()
            for s in samples:
                row = {col: s.get(col) for col in CSV_COLUMNS}
                # bool → str for CSV readability.
                row["scan_likely"] = "true" if s.get("scan_likely") else "false"
                writer.writerow(row)
        print(f"\nCSV 저장: {csv_path}")

    # JSON output — schema kept stable for cmd_enqueue / cmd_report.
    payload = {
        "target": PILOT_TARGET,
        "seed": SAMPLE_SEED,
        "ids": [s["doc_id"] for s in samples],
        "items": [
            {
                "id": s["doc_id"],
                "title": s["title"],
                "ai_domain": s.get("_ai_domain"),
                "document_type": s["doc_type"],
                "file_size": s["file_size"],
                "file_path": s.get("_file_path"),
                "sample_source": s["sample_source"],
                "bucket_label": s["bucket_label"],
                "forced_include_reason": s.get("forced_include_reason", ""),
            }
            for s in samples
        ],
    }
    # Create the target directory just as the CSV branch does.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
    print(f"JSON 저장: {out_path}")
# ─── enqueue ───
async def cmd_enqueue(in_path: Path, yes: bool, include_existing: bool = False) -> None:
    """Enqueue the selected sample into the markdown queue.

    Round 2 policy: existing_success items (anchor + calibration) are excluded
    by default. They keep their current md_content as the evaluation anchor;
    reprocessing them wastes marker time and overwrites the baseline.
    Only controlled_backfill items are converted. Pass ``include_existing``
    when a later tuning round needs the anchors reprocessed.

    Before enqueueing, a timestamped copy of the input JSON and a snapshot of
    the md_status distribution are written next to ``in_path``.

    Args:
        in_path: JSON payload produced by the select command.
        yes: Skip the interactive confirmation prompt.
        include_existing: Also enqueue the existing_success items.
    """
    from datetime import datetime, timezone
    from sqlalchemy import func

    raw_payload = in_path.read_text()
    payload = json.loads(raw_payload)
    items: list[dict] = payload.get("items", [])

    if include_existing:
        target_items = items
        # Report the real count rather than assuming a full 30-item sample.
        print(f"[mode] include_existing=True — sample {len(target_items)}건 전부 enqueue")
    else:
        target_items = [it for it in items if it.get("sample_source") == "controlled_backfill"]
        excluded = [it["id"] for it in items if it.get("sample_source") != "controlled_backfill"]
        print(
            f"[mode] controlled_backfill 만 enqueue ({len(target_items)}건). "
            f"existing_success 제외 ({len(excluded)}건): {excluded}"
        )
    ids: list[int] = [it["id"] for it in target_items]
    if not ids:
        print("[abort] enqueue 대상 없음.")
        return

    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # (1) Preserve the original input — timestamped copy against reruns/overwrites.
    backup_path = in_path.with_name(f"{in_path.stem}_pre_enqueue_{ts}.json")
    backup_path.write_text(raw_payload)
    print(f"[backup] {backup_path}")
    # (2) Target document ids, on one line.
    print(f"[targets] {len(ids)}건: {ids}")

    from models.document import Document  # type: ignore
    from models.queue import enqueue_stage  # type: ignore

    engine = _build_engine()
    try:
        Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

        # (3) Snapshot of the md_status distribution before anything is enqueued.
        async with Session() as session:
            snap_rows = (
                await session.execute(
                    select(Document.md_status, func.count())
                    .where(Document.deleted_at.is_(None))
                    .group_by(Document.md_status)
                )
            ).all()
        snapshot = {
            "timestamp_utc": ts,
            "scope": "documents WHERE deleted_at IS NULL",
            "md_status_distribution": {str(s): int(c) for s, c in snap_rows},
            "sample_ids": ids,
        }
        snap_path = in_path.with_name(f"phase1d_md_status_pre_{ts}.json")
        snap_path.write_text(json.dumps(snapshot, ensure_ascii=False, indent=2))
        print(f"[snapshot] {snap_path}")
        print(f" {snapshot['md_status_distribution']}")

        if not yes:
            confirm = input(f"\n{len(ids)}건 markdown 큐에 enqueue 합니다. 진행? [y/N] ")
            if confirm.strip().lower() not in ("y", "yes"):
                print("취소됨.")
                return

        enqueued: list[int] = []
        skipped: list[int] = []
        async with Session() as session:
            for doc_id in ids:
                # A falsy result is reported as "already active" below.
                ok = await enqueue_stage(session, doc_id, "markdown")
                (enqueued if ok else skipped).append(doc_id)
            await session.commit()

        print(f"\nenqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}")
        if skipped:
            # Only the first 20 ids are shown; flag it when the list was cut.
            print(f" skipped ids: {skipped[:20]}{' ...' if len(skipped) > 20 else ''}")
    finally:
        # Dispose on every exit path: success, declined prompt, or exception.
        await engine.dispose()
# ─── report ───
def _stat(vals: list[float]) -> str:
if not vals:
return "-"
s = sorted(vals)
n = len(s)
p50 = s[n // 2]
p90 = s[min(n - 1, int(n * 0.9))]
return f"min={s[0]:.2f} p50={p50:.2f} p90={p90:.2f} max={s[-1]:.2f}"
# Inline-math heuristic: a single `$...$` span of 1-160 characters containing no `$`
# or newline, whose delimiters are not adjacent to another `$` (so `$$` blocks are not matched).
KATEX_INLINE_RE = re.compile(r"(?<!\$)\$[^$\n]{1,160}\$(?!\$)")
async def cmd_report(in_path: Path) -> None:
    """Print an aggregate report for the sampled documents.

    Sections: md_status distribution, processing time of successful
    conversions, top failure reasons, quality metrics, success rate per
    file_size bucket, and a list of UI URLs for manual inspection.

    Args:
        in_path: JSON payload produced by the select command; its "ids"
            list defines which documents are reported.
    """
    from models.document import Document  # type: ignore
    from models.queue import ProcessingQueue  # type: ignore

    payload = json.loads(in_path.read_text())
    ids: list[int] = payload["ids"]

    engine = _build_engine()
    try:
        Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        async with Session() as session:
            rows = (
                await session.execute(
                    select(
                        Document.id, Document.title, Document.ai_domain, Document.file_size,
                        Document.md_status, Document.md_extraction_error,
                        Document.md_extraction_quality, Document.md_content,
                        Document.md_generated_at, Document.created_at,
                    ).where(Document.id.in_(ids))
                )
            ).all()
            q_rows = (
                await session.execute(
                    select(
                        ProcessingQueue.document_id,
                        ProcessingQueue.status,
                        ProcessingQueue.attempts,
                        ProcessingQueue.started_at,
                        ProcessingQueue.completed_at,
                    ).where(
                        ProcessingQueue.document_id.in_(ids),
                        ProcessingQueue.stage == "markdown",
                    )
                )
            ).all()
    finally:
        # Everything below works on the fetched rows only, so the pool can be
        # released right away — including when a query raises.
        await engine.dispose()

    # If a document has several markdown queue rows, the last one returned wins
    # (the query applies no ordering).
    elapsed: dict[int, float] = {}
    queue_status: dict[int, str] = {}
    queue_attempts: dict[int, int] = {}
    for q in q_rows:
        queue_status[q.document_id] = q.status
        queue_attempts[q.document_id] = q.attempts
        if q.started_at and q.completed_at:
            elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds()

    by_status = Counter(r.md_status for r in rows)
    print(f"\n## md_status 분포 (sample {len(rows)}건)")
    for s, c in sorted(by_status.items(), key=lambda x: -x[1]):
        # str() first: a None md_status cannot take the '<12' format spec.
        print(f" {str(s):<12} {c}")
    in_queue = [i for i in ids if queue_status.get(i) in {"pending", "processing"}]
    if in_queue:
        print(f"\n 처리 대기/중 (큐 active): {len(in_queue)}")

    success_rows = [r for r in rows if r.md_status == "success"]
    success_ids = {r.id for r in success_rows}
    es = [seconds for doc_id, seconds in elapsed.items() if doc_id in success_ids]
    if es:
        print(f"\n## 평균 처리 시간 (success {len(es)}건)")
        print(f" avg={sum(es)/len(es):.1f}s {_stat(es)}")
    else:
        print("\n## 처리 시간: 측정 가능한 success 없음")

    fail_reasons: Counter = Counter()
    for r in rows:
        if r.md_status == "failed":
            fail_reasons[(r.md_extraction_error or "unknown")[:140]] += 1
    if fail_reasons:
        print("\n## 실패 사유 top 5")
        for reason, c in fail_reasons.most_common(5):
            print(f" {c:>3} {reason}")

    text_ratios, heading_jumps, table_rows, image_counts = [], [], [], []
    warning_kinds: Counter = Counter()
    katex_candidates, table_with_rows = [], []
    has_headings = 0
    for r in success_rows:
        q = r.md_extraction_quality or {}
        m = q.get("metrics", {}) or {}
        if m.get("text_length_ratio") is not None:
            text_ratios.append(m["text_length_ratio"])
        if m.get("heading_jump_count") is not None:
            heading_jumps.append(m["heading_jump_count"])
        if m.get("markdown_table_row_count") is not None:
            table_rows.append(m["markdown_table_row_count"])
            if m["markdown_table_row_count"] > 0:
                table_with_rows.append(r.id)
        if m.get("markdown_image_count") is not None:
            image_counts.append(m["markdown_image_count"])
        if (m.get("markdown_heading_count") or 0) > 0:
            has_headings += 1
        for w in (q.get("warnings") or []):
            warning_kinds[w] += 1
        c = r.md_content or ""
        if "$$" in c or KATEX_INLINE_RE.search(c):
            katex_candidates.append(r.id)

    if success_rows:
        print(f"\n## quality 메트릭 (success {len(success_rows)}건)")
        print(f" text_length_ratio {_stat(text_ratios)}")
        print(f" heading_jump_count {_stat(heading_jumps)}")
        print(f" markdown_table_row_count {_stat(table_rows)} (rows>0: {len(table_with_rows)}건)")
        print(f" markdown_image_count {_stat(image_counts)} (현재 server.py 가 _images 버림 → 0 정상)")
        print(f" heading anchor 후보 (markdown_heading_count > 0): {has_headings}")
        print(f" KaTeX 후보 ($-수식 매칭): {len(katex_candidates)}건 ids={katex_candidates[:10]}")
        if warning_kinds:
            print("\n warnings (kind):")
            for w, c in warning_kinds.most_common():
                print(f" {c:>3} {w}")

    print("\n## file_size bucket 별 success 비율")
    # bucket -> [success count, total count]
    bucket_stat: dict[str, list[int]] = defaultdict(lambda: [0, 0])
    for r in rows:
        b = _bucket(r.file_size)
        bucket_stat[b][1] += 1
        if r.md_status == "success":
            bucket_stat[b][0] += 1
    for b in ("small", "medium", "large", "unknown", "outlier"):
        s, t = bucket_stat[b]
        if t == 0:
            continue
        rate = s / t * 100
        print(f" {b:<8} {s}/{t} ({rate:.0f}%)")

    print("\n## 사용자 검수 후보 (UI URL — 표 깨짐/heading anchor/KaTeX 직접 확인):")
    katex_ids = set(katex_candidates)
    for r in sorted(rows, key=lambda x: (x.md_status or "", -(x.file_size or 0))):
        # Same-width marker so the URLs stay aligned; '*' flags a KaTeX candidate.
        marker = "*" if r.id in katex_ids else " "
        print(f" {marker} https://document.hyungi.net/documents/{r.id} [{r.md_status}] {(r.title or '-')[:60]}")
# Column order of the evaluation CSV skeleton written by cmd_eval_template.
EVAL_TEMPLATE_COLUMNS = [
    "doc_id", "title", "sample_source", "bucket_label",
    # Rubric: 5 axes scored 1~5 (filled in by the user). See the plan's "Quality evaluation rubric" section.
    "text_accuracy", "structure", "noise_rate", "multi_script", "completeness",
    "overall_pass",  # boolean (true/false) — gut call on "usable for search/reference?"
    "notes",  # free-form remarks
]
def cmd_eval_template(in_path: Path, csv_out: Path) -> None:
    """Write an empty evaluation CSV skeleton from the select-result JSON.

    Identity columns are copied from each item; every rubric column is left
    blank for the user to fill in.
    """
    items = json.loads(in_path.read_text()).get("items", [])
    blank_columns = (
        "text_accuracy", "structure", "noise_rate", "multi_script",
        "completeness", "overall_pass", "notes",
    )

    csv_out.parent.mkdir(parents=True, exist_ok=True)
    with csv_out.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=EVAL_TEMPLATE_COLUMNS, extrasaction="ignore")
        writer.writeheader()
        for it in items:
            record = {
                "doc_id": it["id"],
                "title": it.get("title", ""),
                "sample_source": it.get("sample_source", ""),
                "bucket_label": it.get("bucket_label", ""),
            }
            record.update(dict.fromkeys(blank_columns, ""))
            writer.writerow(record)

    for message in (
        f"eval template 저장: {csv_out} ({len(items)} rows)",
        "rubric: 1~5점 (text_accuracy / structure / noise_rate / multi_script / completeness)",
        " overall_pass = true/false ('검색/참고에 쓸 만한가' 직관 판단)",
        "평가 가이드: evals/markdown/README.md 참조",
    ):
        print(message)
def main() -> None:
    """Parse the command line and run the requested subcommand."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    commands = parser.add_subparsers(dest="cmd", required=True)

    select_parser = commands.add_parser("select", help="stratified 30건 dry-run + CSV+JSON 저장")
    select_parser.add_argument("--out", type=Path, default=DEFAULT_OUT, help="JSON (cmd_enqueue/report 호환용)")
    select_parser.add_argument("--csv", type=Path, default=DEFAULT_CSV, help="CSV (사용자 review + commit 대상)")

    enqueue_parser = commands.add_parser("enqueue", help="markdown 큐 enqueue")
    enqueue_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
    enqueue_parser.add_argument("--yes", action="store_true")
    enqueue_parser.add_argument(
        "--include-existing",
        action="store_true",
        help="existing_success (anchor + calibration) 도 재처리. default 는 controlled_backfill 만.",
    )

    report_parser = commands.add_parser("report", help="결과 집계")
    report_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)

    eval_parser = commands.add_parser("eval_template", help="평가 CSV 스켈레톤 출력 (사용자가 rubric 점수 채움)")
    eval_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
    eval_parser.add_argument("--csv", type=Path, default=DEFAULT_EVAL_CSV)

    args = parser.parse_args()
    command = args.cmd

    # eval_template is the only synchronous command; the rest are coroutines.
    if command == "eval_template":
        cmd_eval_template(args.in_path, args.csv)
        return
    if command == "select":
        job = cmd_select(args.out, args.csv)
    elif command == "enqueue":
        job = cmd_enqueue(args.in_path, args.yes, args.include_existing)
    elif command == "report":
        job = cmd_report(args.in_path)
    else:
        return
    asyncio.run(job)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()