hyungi_document_server/scripts/phase1d_pilot.py
Hyungi Ahn b09687d41d feat(scripts): Phase 1D Round 2 — controlled backfill stratification
Augments the existing phase1d_pilot.py (a simple ai_domain × file_size
3-bucket scheme) with the 4 axes, the sample_source split, and the
forced_include from the plan ~/.claude/plans/stratified-mingling-otter.md.

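Limits of Round 1 (ai_domain × file_size 3-bucket):
  It reflects only the natural distribution of pending PDFs, so the known
  weak spots (handwriting / scans / mixed Korean-Chinese-Japanese OCR)
  never enter the sample. In the 1C visual check, doc 4809
  (Note_240805_용접교육 필기) showed exactly that pattern; left to natural
  selection, the same case risks being missed again in the next round.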

Round 2 design:
  - 4-axis stratification: doc_type × file_size_band × text_density_band
    × handwritten_hint
  - sample_source ∈ {existing_success(5), controlled_backfill(25)}
  - forced_include doc 4809: the known bad anchor. After the next tuning
    pass, or once an alternative is introduced, the same document can be
    reconverted and compared 1:1.
  - text_density = LENGTH(extracted_text) / (file_size / 1024) chars/KB,
    the cleanest single proxy. Validated at both extremes: 0.17
    (handwritten 4809) ↔ 94 (born-digital 3759).
  - script_mix proxy: Hangul/CJK/Hiragana/Katakana/Latin Unicode block
    ratio → korean_dominant / mixed_korean_cjk / mixed_korean_latin /
    cjk_dominant / latin_dominant / unknown.
  - page_count_estimate: existing_success uses
    md_extraction_quality.metrics.source_page_count. controlled_backfill
    is NULL (marker re-reads the file with PyMuPDF anyway).
  - Seed fixed at SAMPLE_SEED=20260502 for reproducibility.
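
To make the text_density proxy concrete, here is a minimal sketch of the density and band computation. The thresholds are copied from TEXT_DENSITY_BANDS in the script; the function names and the two input pairs are hypothetical, chosen only so the results land near the two extremes quoted above, and are not the real sizes of docs 4809 or 3759.

```python
# Band thresholds in chars per KB, copied from TEXT_DENSITY_BANDS in the script.
BANDS = [
    ("scan-likely", 0.0, 5.0),
    ("mixed", 5.0, 50.0),
    ("born-digital", 50.0, float("inf")),
]


def text_density(text_len, file_size_bytes):
    """Characters of extracted text per KB of file; None when the size is unknown or zero."""
    if not file_size_bytes or file_size_bytes <= 0:
        return None
    return text_len / (file_size_bytes / 1024.0)


def density_band(density):
    """Map a density to its band name using half-open [lo, hi) intervals."""
    if density is None:
        return "unknown"
    for name, lo, hi in BANDS:
        if lo <= density < hi:
            return name
    return "unknown"


# Hypothetical inputs (not taken from the real corpus).
examples = {
    "scan_like": (850, 5 * 1024 * 1024),         # 850 chars in a 5 MiB file
    "born_digital_like": (94_000, 1024 * 1024),  # 94,000 chars in a 1 MiB file
}
for label, (n_chars, size) in examples.items():
    d = text_density(n_chars, size)
    print(label, round(d, 2), density_band(d))
```

Because the intervals are half-open, a density of exactly 5.0 falls in "mixed" and exactly 50.0 in "born-digital".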

Sample distribution (measured 2026-05-02):
  bucket_label: born_digital=12, mixed=5, existing_calibration=4,
                handwritten=3, scan_likely=3, large=2, existing_anchor=1
  doc_type: Academic_Paper=7, study_note=6, Standard=5, Note=4,
            Reference=3, Manual=3, Drawing=1, Report=1
  file_size_band: M=14, S=12, L=4
  text_density_band: born-digital=15, scan-likely=9, mixed=6
  handwritten_hint: lo=26, hi=4 (about 13x over-sampled relative to the 1.1% population share)
  forced anchor doc 4809 = density 0.17 (the document from the user's visual check)
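
The case for over-sampling can be checked with a little arithmetic. The sketch below assumes simple random sampling from a large population, so draws are treated as independent, and it uses the rounded 1.1% share quoted above. With that rounded figure the over-sample factor comes out near 12, which is in line with the approximate 13x quoted above if the unrounded share is slightly lower.

```python
population_share = 0.011  # handwritten_hint=hi share of the population (rounded figure from above)
sample_size = 30
hi_in_sample = 4          # handwritten_hint=hi count in the Round 2 sample

# Expected number of hi documents if the 30 were drawn uniformly at random.
expected_hits = sample_size * population_share

# Probability that a uniform draw of 30 contains at least one hi document
# (independence approximation, reasonable when the population is large).
p_at_least_one = 1 - (1 - population_share) ** sample_size

# How much the stratified sample over-represents the hi stratum.
oversample_factor = (hi_in_sample / sample_size) / population_share

print(f"expected hi docs under uniform sampling: {expected_hits:.2f}")
print(f"P(at least one hi doc): {p_at_least_one:.3f}")
print(f"over-sample factor: {oversample_factor:.1f}x")
```

Under these assumptions a uniform draw would most likely contain no handwritten-hint document at all, which is the gap the controlled backfill is meant to close.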

New subcommand:
  eval_template: writes the pilot_1d_eval.csv skeleton (5 rubric axes
  scored 1~5 + overall_pass + notes). The user fills in the scores while
  toggling between the MarkdownDoc and PDF views.
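
As an illustration of what the skeleton looks like, here is a minimal sketch that writes one row to an in-memory buffer. The column names are the ones in EVAL_TEMPLATE_COLUMNS in the script; the helper name write_eval_skeleton and the use of io.StringIO are assumptions made for this example, not the script's own code path (the script writes to a file).

```python
import csv
import io

# Column order copied from EVAL_TEMPLATE_COLUMNS in the script.
EVAL_COLUMNS = [
    "doc_id", "title", "sample_source", "bucket_label",
    "text_accuracy", "structure", "noise_rate", "multi_script", "completeness",
    "overall_pass", "notes",
]


def write_eval_skeleton(items, out):
    """Write a header plus one row per item; rubric columns are left empty via restval."""
    writer = csv.DictWriter(out, fieldnames=EVAL_COLUMNS, restval="", extrasaction="ignore")
    writer.writeheader()
    for it in items:
        writer.writerow({
            "doc_id": it["id"],
            "title": it.get("title", ""),
            "sample_source": it.get("sample_source", ""),
            "bucket_label": it.get("bucket_label", ""),
        })


# One item shaped like an entry of the select JSON's "items" list.
items = [{
    "id": 4809,
    "title": "Note_240805_용접교육 필기",
    "sample_source": "existing_success",
    "bucket_label": "existing_anchor",
}]
buf = io.StringIO()
write_eval_skeleton(items, buf)
print(buf.getvalue())
```

Each data row carries the four identifying columns followed by seven empty fields for the reviewer to fill in.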

The existing cmd_enqueue (snapshot/backup/dedup) and cmd_report (quality
metrics) are kept as they were.

Deliverables:
  scripts/phase1d_pilot.py: 4 axes + sample_source + forced_include +
    eval_template subcommand. CSV+JSON dual output.
  evals/markdown/README.md: rubric + decision matrix + workflow guide.
  evals/markdown/pilot_1d_sample.csv: 30 rows × 15 cols (the seeded
    result, kept for reproducibility).
  evals/markdown/pilot_1d_eval.csv: empty skeleton (filled in after the
    user's evaluation).

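Execution boundary:
  Steps 1~3 (selection / template / dry-run) = completed by this PR.
  Step 4 (--yes enqueue, actually feeding the 30 docs into the markdown
  queue) = run separately, after the user approves the timing and within
  the one-off nightly sweep window (23:00~03:00 KST).
  marker-service BATCH_SIZE=1; 30 docs at an average of 5 min/doc ≈ 2.5h.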

Verify:
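  Ran select in the fastapi container on the GPU server → the 30-doc
  sample CSV was generated. Confirmed the eval_template subcommand works.
  Confirmed that the enqueue dry-run prints the 30 doc_ids + snapshot and
  then takes the user-cancel branch.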

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 16:15:09 +09:00

764 lines
30 KiB
Python
"""Phase 1D: marker markdown quality pilot (one-shot admin script).

* Not a permanent worker path. Measures baseline quality on a sample capped at 30 docs.
* The Phase 2 full-backfill decision follows once the 1D results are in.
* 1B.5 (image extraction / _meta preservation) is a separate PR and out of scope for this script.

Stratification (Round 2 refined, plan: ~/.claude/plans/stratified-mingling-otter.md):
  4 axes: doc_type × file_size_band × text_density_band × handwritten_hint
  + sample_source ∈ {existing_success, controlled_backfill}
    - existing_success: 5 docs (anchor 1 + calibration 4)
    - controlled_backfill: 25 docs (handwritten 3 / scan_likely 2~3 / mixed 5 / born_digital 12 / large 2)
  + forced_include: doc 4809 (Note_240805_용접교육 필기), the known bad handwritten anchor.
  Documents with document_type ∈ SKIP_DOC_TYPES are excluded (mirrors the marker_worker rule).

Subcommands:
  select         stratified 30-doc dry-run + save CSV and JSON
  enqueue        enqueue the select result into the markdown queue (avoids uq_queue_active violations)
  report         print md_status distribution, failure reasons, quality metrics, and UI review URLs
  eval_template  write the pilot_1d_eval.csv skeleton (the user fills in the 5-axis rubric scores)

Usage (GPU server):
  docker compose exec fastapi python /app/scripts/phase1d_pilot.py select \
      --csv /app/evals/markdown/pilot_1d_sample.csv
  docker compose exec fastapi python /app/scripts/phase1d_pilot.py enqueue --yes
  docker compose exec fastapi python /app/scripts/phase1d_pilot.py report
  docker compose exec fastapi python /app/scripts/phase1d_pilot.py eval_template \
      --csv /app/evals/markdown/pilot_1d_eval.csv
"""

import argparse
import asyncio
import csv
import json
import os
import random
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path

# The fastapi container unpacks the code directly into WORKDIR=/app (there is no app/ directory).
# So /app itself, not /app/scripts/../app, must be on sys.path for `from models...` imports to work.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# Mirror of marker_worker's SKIP rule. To avoid drift there should be one source of truth;
# if the rule changes there, this copy must be synced.
SKIP_DOC_TYPES = {
    "발주서", "세금계산서", "명세표",
    "Invoice", "Purchase_Order", "Estimate", "Statement",
}

# file_size bucket (page_count proxy). A PDF averages roughly 10~50KB per page.
SIZE_BUCKETS = [
    ("small", 0, 500 * 1024),                  # < 500KB
    ("medium", 500 * 1024, 5 * 1024 * 1024),   # 500KB ~ 5MB
    ("large", 5 * 1024 * 1024, 10**12),        # > 5MB
]

# file_size_band for the 4-axis stratification (Round 2 plan).
FILE_SIZE_BAND_THRESHOLDS = [
    ("S", 0, 1 * 1024 * 1024),                  # < 1MB
    ("M", 1 * 1024 * 1024, 10 * 1024 * 1024),   # 1~10MB
    ("L", 10 * 1024 * 1024, 10**12),            # > 10MB
]

# text_density (chars per KB of file): the cleanest single proxy for born-digital vs scan.
# Validated at both extremes: 0.17 (handwritten 4809) ↔ 94 (born-digital 3759).
TEXT_DENSITY_BANDS = [
    ("scan-likely", 0.0, 5.0),
    ("mixed", 5.0, 50.0),
    ("born-digital", 50.0, float("inf")),
]

HANDWRITTEN_HINT_REGEX = re.compile(r"필기|노트|handwritten|scan|스캔|note", re.IGNORECASE)

# Forced include: the known bad anchor found during the user's visual check.
# When tuning the next round based on the 1D results, the same document is
# reconverted to judge whether quality improved.
FORCED_INCLUDES: dict[int, str] = {
    4809: "known_bad_handwritten_anchor",
}

# Reproducibility seed: a sample CSV, once generated, should come out the same on a rerun.
SAMPLE_SEED = 20260502

PILOT_TARGET = 30
EXISTING_SUCCESS_TARGET = 5
CONTROLLED_BACKFILL_TARGET = PILOT_TARGET - EXISTING_SUCCESS_TARGET  # 25

DEFAULT_OUT = Path("/tmp/phase1d_pilot.json")
DEFAULT_CSV = Path("/tmp/phase1d_pilot.csv")
DEFAULT_EVAL_CSV = Path("/tmp/phase1d_eval.csv")


def _bucket(file_size: int | None) -> str:
    """Legacy 3-bucket, kept for compatibility with cmd_report's file_size buckets."""
    if file_size is None:
        return "unknown"
    for name, lo, hi in SIZE_BUCKETS:
        if lo <= file_size < hi:
            return name
    return "outlier"


def _file_size_band(file_size: int | None) -> str:
    """Round 2 refined band: S / M / L."""
    if file_size is None:
        return "unknown"
    for name, lo, hi in FILE_SIZE_BAND_THRESHOLDS:
        if lo <= file_size < hi:
            return name
    return "L"


def _text_density(text_len: int, file_size: int | None) -> float | None:
    """Chars per KB of file. Returns None when file_size is 0 or None."""
    if not file_size or file_size <= 0:
        return None
    return text_len / (file_size / 1024.0)


def _text_density_band(density: float | None) -> str:
    if density is None:
        return "unknown"
    for name, lo, hi in TEXT_DENSITY_BANDS:
        if lo <= density < hi:
            return name
    return "unknown"


def _handwritten_hint(title: str | None, file_path: str | None) -> str:
    """Return 'hi' if title or file_path matches HANDWRITTEN_HINT_REGEX, else 'lo'."""
    blob = " ".join(filter(None, [title or "", file_path or ""]))
    return "hi" if HANDWRITTEN_HINT_REGEX.search(blob) else "lo"


def _scan_likely(text_len: int, file_size: int | None, density: float | None) -> bool:
    """text_density < 5 or no extracted_text → likely a scan."""
    if text_len == 0:
        return True
    if density is not None and density < 5.0:
        return True
    return False


def _script_mix(extracted_text: str | None, sample_chars: int = 10000) -> str:
    """Label the first N chars by their Hangul/CJK/Kana/Latin ratio.

    One script ≥ 0.7          → '<script>_dominant'
    Two scripts each ≥ 0.1    → 'mixed_<a>_<b>'
    Otherwise                 → 'unknown'.

    Heavy mojibake or OCR noise skews the ratios, and that is itself a signal.
    """
    if not extracted_text:
        return "unknown"
    sample = extracted_text[:sample_chars]
    counts = {"hangul": 0, "cjk": 0, "kana": 0, "latin": 0, "other": 0}
    total = 0
    for ch in sample:
        cp = ord(ch)
        if ch.isspace():
            continue
        total += 1
        # Hangul syllables, Hangul Jamo, Hangul Compatibility Jamo
        if 0xAC00 <= cp <= 0xD7A3 or 0x1100 <= cp <= 0x11FF or 0x3130 <= cp <= 0x318F:
            counts["hangul"] += 1
        # CJK Unified Ideographs and Extension A
        elif 0x4E00 <= cp <= 0x9FFF or 0x3400 <= cp <= 0x4DBF:
            counts["cjk"] += 1
        # Hiragana and Katakana
        elif 0x3040 <= cp <= 0x30FF:
            counts["kana"] += 1
        # ASCII letters plus Latin-1 Supplement through Latin Extended-B
        elif (0x0041 <= cp <= 0x005A) or (0x0061 <= cp <= 0x007A) or (0x00C0 <= cp <= 0x024F):
            counts["latin"] += 1
        else:
            counts["other"] += 1
    if total == 0:
        return "unknown"
    # 'other' still counts toward the denominator, so digits and punctuation dilute the ratios.
    ratios = {k: v / total for k, v in counts.items() if k != "other"}
    primary = sorted(ratios.items(), key=lambda x: -x[1])
    if primary[0][1] >= 0.7:
        return f"{primary[0][0]}_dominant"
    significant = [name for name, r in primary if r >= 0.1]
    if len(significant) >= 2:
        return "mixed_" + "_".join(sorted(significant[:2]))
    return "unknown"


def _page_count_estimate(md_extraction_quality: dict | None) -> int | None:
    """Use marker's quality.metrics.source_page_count when an existing_success doc has it.

    For controlled_backfill, marker fills it in via PyMuPDF at conversion time, so it is NULL here.
    Auxiliary value for interpreting the evaluation afterwards.
    """
    if not md_extraction_quality or not isinstance(md_extraction_quality, dict):
        return None
    metrics = md_extraction_quality.get("metrics")
    if not isinstance(metrics, dict):
        return None
    pc = metrics.get("source_page_count")
    if isinstance(pc, int):
        return pc
    return None


# Sample CSV column order, same as the plan's "Sample CSV columns" section.
CSV_COLUMNS = [
    "doc_id", "title", "sample_source", "forced_include_reason", "bucket_label",
    "doc_type", "file_size", "file_size_band",
    "text_len", "text_density", "text_density_band",
    "handwritten_hint", "scan_likely", "script_mix",
    "page_count_estimate",
]


def _build_engine() -> "AsyncEngine":
    db_url = os.environ["DATABASE_URL"]
    return create_async_engine(db_url, pool_pre_ping=True)


# ─── select ───

def _enrich_row(r) -> dict:
    """Document row → sample dict (compute proxies)."""
    text_len = len(r.extracted_text or "")
    density = _text_density(text_len, r.file_size)
    return {
        "doc_id": r.id,
        "title": r.title,
        "doc_type": r.document_type,
        "file_size": r.file_size or 0,
        "file_size_band": _file_size_band(r.file_size),
        "text_len": text_len,
        "text_density": round(density, 3) if density is not None else None,
        "text_density_band": _text_density_band(density),
        "handwritten_hint": _handwritten_hint(r.title, r.file_path),
        "scan_likely": _scan_likely(text_len, r.file_size, density),
        "script_mix": _script_mix(r.extracted_text),
        "page_count_estimate": _page_count_estimate(r.md_extraction_quality),
        "_file_path": r.file_path,
        "_ai_domain": r.ai_domain,
    }


def _allocate_controlled_backfill(candidates: list[dict], rng: random.Random) -> list[dict]:
    """controlled_backfill, 25 docs: 4-axis stratified with deliberate over-sampling.

    Plan, "Sample budget" section:
      handwritten 3 / scan_likely 2~3 / mixed 5 / born_digital 12 / large 2
    """
    selected: list[dict] = []
    used: set[int] = set()

    def take(pool: list[dict], n: int, label: str) -> None:
        avail = [c for c in pool if c["doc_id"] not in used]
        rng.shuffle(avail)
        for c in avail[:n]:
            c["bucket_label"] = label
            selected.append(c)
            used.add(c["doc_id"])

    # 1. handwritten_hint=hi (take 3; population 1.1% → sample 10%)
    take([c for c in candidates if c["handwritten_hint"] == "hi"], 3, "handwritten")

    # 2. scan-likely (2~3 docs after dedupe against handwritten)
    take([c for c in candidates if c["text_density_band"] == "scan-likely"], 3, "scan_likely")

    # 3. mixed (5 docs)
    take([c for c in candidates if c["text_density_band"] == "mixed"], 5, "mixed")

    # 4. born-digital × varied doc_type (12 docs). doc_type distribution guide:
    #    study_note 3 / Academic_Paper 3 / Reference 2 / Note 1 / (Manual+Standard+Specification) 2 / NULL 1
    born_digital = [
        c for c in candidates
        if c["text_density_band"] == "born-digital" and c["doc_id"] not in used
    ]
    by_type: dict[str | None, list[dict]] = defaultdict(list)
    for c in born_digital:
        by_type[c["doc_type"]].append(c)
    for pool in by_type.values():
        rng.shuffle(pool)
    target_quota = [
        ("study_note", 3),
        ("Academic_Paper", 3),
        ("Reference", 2),
        ("Note", 1),
        ("Manual", 1),
        ("Standard", 1),
        (None, 1),  # NULL
    ]
    born_added = 0
    for dt, n in target_quota:
        for c in by_type.get(dt, [])[:n]:
            if c["doc_id"] in used:
                continue
            c["bucket_label"] = "born_digital"
            selected.append(c)
            used.add(c["doc_id"])
            born_added += 1
            if born_added >= 12:
                break
        if born_added >= 12:
            break
    # If short of 12, fill with the remaining born-digital docs (any doc_type).
    if born_added < 12:
        leftover = [c for c in born_digital if c["doc_id"] not in used]
        rng.shuffle(leftover)
        for c in leftover[: 12 - born_added]:
            c["bucket_label"] = "born_digital"
            selected.append(c)
            used.add(c["doc_id"])

    # 5. file_size band=L (>10MB): top up with docs not already in the 4 buckets above (target 2).
    large_pool = [c for c in candidates if c["file_size_band"] == "L" and c["doc_id"] not in used]
    take(large_pool, 2, "large")

    # Target is 30 - existing_success(5) = 25. If short, top up from the general pending pool.
    if len(selected) < CONTROLLED_BACKFILL_TARGET:
        leftover = [c for c in candidates if c["doc_id"] not in used]
        rng.shuffle(leftover)
        for c in leftover[: CONTROLLED_BACKFILL_TARGET - len(selected)]:
            c["bucket_label"] = "filler"
            selected.append(c)
            used.add(c["doc_id"])

    return selected[:CONTROLLED_BACKFILL_TARGET]


def _allocate_existing_success(rows: list, rng: random.Random) -> list[dict]:
    """existing_success, 5 docs = forced anchor 1 + calibration 4 (balanced across text_density bands)."""
    enriched = [_enrich_row(r) for r in rows]
    by_id = {c["doc_id"]: c for c in enriched}
    selected: list[dict] = []
    used: set[int] = set()

    # forced_include
    for fid, reason in FORCED_INCLUDES.items():
        if fid in by_id:
            c = by_id[fid]
            c["bucket_label"] = "existing_anchor"
            c["forced_include_reason"] = reason
            selected.append(c)
            used.add(fid)
        else:
            print(f"[warn] forced_include doc_id={fid} is not among the existing_success candidates, skipping")

    # calibration, 4 docs: balanced across text_density bands
    remaining = [c for c in enriched if c["doc_id"] not in used]
    quotas: list[tuple[str, int]] = [("scan-likely", 1), ("mixed", 1), ("born-digital", 2)]
    for band, n in quotas:
        pool = [c for c in remaining if c["text_density_band"] == band and c["doc_id"] not in used]
        rng.shuffle(pool)
        for c in pool[:n]:
            c["bucket_label"] = "existing_calibration"
            selected.append(c)
            used.add(c["doc_id"])

    # If a band had too few candidates, fill the rest from whatever remains.
    if len(selected) < EXISTING_SUCCESS_TARGET:
        leftover = [c for c in remaining if c["doc_id"] not in used]
        rng.shuffle(leftover)
        for c in leftover[: EXISTING_SUCCESS_TARGET - len(selected)]:
            c["bucket_label"] = "existing_calibration"
            selected.append(c)
            used.add(c["doc_id"])

    return selected[:EXISTING_SUCCESS_TARGET]


def _print_distribution(samples: list[dict]) -> None:
    print(f"\nSelected {len(samples)} docs (target {PILOT_TARGET}):\n")
    print(f"{'ID':>6} {'KB':>8} {'src':<22} {'bucket':<22} {'doctype':<22} {'density':>8} title")
    print("-" * 160)
    for s in sorted(samples, key=lambda x: (x["sample_source"], x["bucket_label"], x["doc_id"])):
        d = s["text_density"]
        density_s = f"{d:.2f}" if d is not None else "-"
        print(
            f"{s['doc_id']:>6} {(s['file_size'] // 1024):>8} "
            f"{s['sample_source'][:22]:<22} "
            f"{s['bucket_label'][:22]:<22} "
            f"{(s['doc_type'] or '-')[:22]:<22} "
            f"{density_s:>8} "
            f"{(s['title'] or '-')[:60]}"
        )

    by_axis = lambda key: Counter(s[key] for s in samples)
    print("\nDistribution:")
    for axis in ("sample_source", "bucket_label", "doc_type", "file_size_band",
                 "text_density_band", "handwritten_hint", "script_mix"):
        c = by_axis(axis)
        line = ", ".join(f"{k}={v}" for k, v in sorted(c.items(), key=lambda x: -x[1]))
        print(f"  {axis}: {line}")


async def cmd_select(out_path: Path, csv_path: Path | None) -> None:
    engine = _build_engine()
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    from models.document import Document  # type: ignore

    async with Session() as session:
        # A. existing_success: PDFs that marker_worker already converted successfully (anchor + calibration).
        es_rows = (
            await session.execute(
                select(
                    Document.id, Document.title, Document.ai_domain, Document.document_type,
                    Document.file_size, Document.file_path, Document.file_format,
                    Document.extracted_text, Document.md_extraction_quality,
                ).where(
                    Document.deleted_at.is_(None),
                    Document.file_format == "pdf",
                    Document.md_status == "success",
                )
            )
        ).all()

        # B. controlled_backfill: pending PDFs in the "document" category, SKIP_DOC_TYPES excluded.
        cb_rows = (
            await session.execute(
                select(
                    Document.id, Document.title, Document.ai_domain, Document.document_type,
                    Document.file_size, Document.file_path, Document.file_format,
                    Document.extracted_text, Document.md_extraction_quality,
                ).where(
                    Document.deleted_at.is_(None),
                    Document.file_format == "pdf",
                    Document.md_status == "pending",
                    Document.category == "document",
                )
            )
        ).all()

    cb_candidates_raw = [
        r for r in cb_rows if not (r.document_type and r.document_type in SKIP_DOC_TYPES)
    ]
    print(
        f"existing_success PDFs: {len(es_rows)} / "
        f"controlled_backfill candidates (SKIP excluded): {len(cb_candidates_raw)} "
        f"(all pending PDF documents: {len(cb_rows)})"
    )

    rng = random.Random(SAMPLE_SEED)
    existing_samples = _allocate_existing_success(es_rows, rng)
    for s in existing_samples:
        s["sample_source"] = "existing_success"
        s.setdefault("forced_include_reason", "")

    cb_candidates = [_enrich_row(r) for r in cb_candidates_raw]
    backfill_samples = _allocate_controlled_backfill(cb_candidates, rng)
    for s in backfill_samples:
        s["sample_source"] = "controlled_backfill"
        s.setdefault("forced_include_reason", "")

    samples = existing_samples + backfill_samples
    _print_distribution(samples)

    # Save the CSV (for user review and for committing).
    if csv_path is not None:
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS, extrasaction="ignore")
            writer.writeheader()
            for s in samples:
                row = {col: s.get(col) for col in CSV_COLUMNS}
                # bool → str (CSV readability)
                row["scan_likely"] = "true" if s.get("scan_likely") else "false"
                writer.writerow(row)
        print(f"\nCSV saved: {csv_path}")

    # Save the JSON for cmd_enqueue / cmd_report compatibility (existing schema kept).
    payload = {
        "target": PILOT_TARGET,
        "seed": SAMPLE_SEED,
        "ids": [s["doc_id"] for s in samples],
        "items": [
            {
                "id": s["doc_id"],
                "title": s["title"],
                "ai_domain": s.get("_ai_domain"),
                "document_type": s["doc_type"],
                "file_size": s["file_size"],
                "file_path": s.get("_file_path"),
                "sample_source": s["sample_source"],
                "bucket_label": s["bucket_label"],
                "forced_include_reason": s.get("forced_include_reason", ""),
            }
            for s in samples
        ],
    }
    out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
    print(f"JSON saved: {out_path}")
    await engine.dispose()


# ─── enqueue ───

async def cmd_enqueue(in_path: Path, yes: bool) -> None:
    from datetime import datetime, timezone
    from sqlalchemy import func

    raw_payload = in_path.read_text()
    payload = json.loads(raw_payload)
    ids: list[int] = payload["ids"]
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    # (1) Preserve the original /tmp/phase1d_pilot.json as a timestamped copy,
    #     in case of a rerun or overwrite.
    backup_path = in_path.with_name(f"{in_path.stem}_pre_enqueue_{ts}.json")
    backup_path.write_text(raw_payload)
    print(f"[backup] {backup_path}")

    # (2) List of document_ids to enqueue (one line).
    print(f"[targets] {len(ids)} docs: {ids}")

    engine = _build_engine()
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    from models.document import Document  # type: ignore
    from models.queue import enqueue_stage  # type: ignore

    # (3) Snapshot of the md_status distribution before running.
    async with Session() as session:
        snap_rows = (
            await session.execute(
                select(Document.md_status, func.count())
                .where(Document.deleted_at.is_(None))
                .group_by(Document.md_status)
            )
        ).all()
    snapshot = {
        "timestamp_utc": ts,
        "scope": "documents WHERE deleted_at IS NULL",
        "md_status_distribution": {str(s): int(c) for s, c in snap_rows},
        "sample_ids": ids,
    }
    snap_path = in_path.with_name(f"phase1d_md_status_pre_{ts}.json")
    snap_path.write_text(json.dumps(snapshot, ensure_ascii=False, indent=2))
    print(f"[snapshot] {snap_path}")
    print(f"  {snapshot['md_status_distribution']}")

    if not yes:
        confirm = input(f"\nAbout to enqueue {len(ids)} docs into the markdown queue. Proceed? [y/N] ")
        if confirm.strip().lower() not in ("y", "yes"):
            print("Cancelled.")
            await engine.dispose()
            return

    enqueued, skipped = [], []
    async with Session() as session:
        for doc_id in ids:
            ok = await enqueue_stage(session, doc_id, "markdown")
            (enqueued if ok else skipped).append(doc_id)
        await session.commit()

    print(f"\nenqueued: {len(enqueued)}, skipped (already active): {len(skipped)}")
    if skipped:
        print(f"  skipped ids: {skipped[:20]}{'' if len(skipped) > 20 else ''}")
    await engine.dispose()


# ─── report ───

def _stat(vals: list[float]) -> str:
    if not vals:
        return "-"
    s = sorted(vals)
    n = len(s)
    p50 = s[n // 2]
    p90 = s[min(n - 1, int(n * 0.9))]
    return f"min={s[0]:.2f} p50={p50:.2f} p90={p90:.2f} max={s[-1]:.2f}"


KATEX_INLINE_RE = re.compile(r"(?<!\$)\$[^$\n]{1,160}\$(?!\$)")


async def cmd_report(in_path: Path) -> None:
    payload = json.loads(in_path.read_text())
    ids: list[int] = payload["ids"]

    engine = _build_engine()
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    from models.document import Document  # type: ignore
    from models.queue import ProcessingQueue  # type: ignore

    async with Session() as session:
        rows = (
            await session.execute(
                select(
                    Document.id, Document.title, Document.ai_domain, Document.file_size,
                    Document.md_status, Document.md_extraction_error,
                    Document.md_extraction_quality, Document.md_content,
                    Document.md_generated_at, Document.created_at,
                ).where(Document.id.in_(ids))
            )
        ).all()
        q_rows = (
            await session.execute(
                select(
                    ProcessingQueue.document_id,
                    ProcessingQueue.status,
                    ProcessingQueue.attempts,
                    ProcessingQueue.started_at,
                    ProcessingQueue.completed_at,
                ).where(
                    ProcessingQueue.document_id.in_(ids),
                    ProcessingQueue.stage == "markdown",
                )
            )
        ).all()

    # Per-document queue state and elapsed processing time.
    elapsed: dict[int, float] = {}
    queue_status: dict[int, str] = {}
    queue_attempts: dict[int, int] = {}
    for q in q_rows:
        queue_status[q.document_id] = q.status
        queue_attempts[q.document_id] = q.attempts
        if q.started_at and q.completed_at:
            elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds()

    by_status = Counter(r.md_status for r in rows)
    print(f"\n## md_status distribution (sample of {len(rows)})")
    for s, c in sorted(by_status.items(), key=lambda x: -x[1]):
        print(f"  {s:<12} {c}")
    in_queue = [i for i in ids if queue_status.get(i) in {"pending", "processing"}]
    if in_queue:
        print(f"\n  waiting/processing (queue active): {len(in_queue)}")

    # Elapsed times restricted to documents that ended in success.
    es = [v for k, v in elapsed.items()
          if next((r for r in rows if r.id == k and r.md_status == "success"), None)]
    if es:
        print(f"\n## Average processing time (success: {len(es)})")
        print(f"  avg={sum(es)/len(es):.1f}s {_stat(es)}")
    else:
        print("\n## Processing time: no measurable success rows")

    fail_reasons: Counter = Counter()
    for r in rows:
        if r.md_status == "failed":
            fail_reasons[(r.md_extraction_error or "unknown")[:140]] += 1
    if fail_reasons:
        print("\n## Failure reasons, top 5")
        for reason, c in fail_reasons.most_common(5):
            print(f"  {c:>3} {reason}")

    text_ratios, heading_jumps, table_rows, image_counts = [], [], [], []
    warning_kinds: Counter = Counter()
    katex_candidates, table_with_rows = [], []
    has_headings = 0
    success_rows = [r for r in rows if r.md_status == "success"]
    for r in success_rows:
        q = r.md_extraction_quality or {}
        m = q.get("metrics", {}) or {}
        if m.get("text_length_ratio") is not None:
            text_ratios.append(m["text_length_ratio"])
        if m.get("heading_jump_count") is not None:
            heading_jumps.append(m["heading_jump_count"])
        if m.get("markdown_table_row_count") is not None:
            table_rows.append(m["markdown_table_row_count"])
            if m["markdown_table_row_count"] > 0:
                table_with_rows.append(r.id)
        if m.get("markdown_image_count") is not None:
            image_counts.append(m["markdown_image_count"])
        if (m.get("markdown_heading_count") or 0) > 0:
            has_headings += 1
        for w in (q.get("warnings") or []):
            warning_kinds[w] += 1
        c = r.md_content or ""
        if "$$" in c or KATEX_INLINE_RE.search(c):
            katex_candidates.append(r.id)

    if success_rows:
        print(f"\n## Quality metrics (success: {len(success_rows)})")
        print(f"  text_length_ratio {_stat(text_ratios)}")
        print(f"  heading_jump_count {_stat(heading_jumps)}")
        print(f"  markdown_table_row_count {_stat(table_rows)} (rows>0: {len(table_with_rows)})")
        print(f"  markdown_image_count {_stat(image_counts)} (server.py currently drops _images → 0 is expected)")
        print(f"  heading anchor candidates (markdown_heading_count > 0): {has_headings}")
        print(f"  KaTeX candidates ($-formula match): {len(katex_candidates)} ids={katex_candidates[:10]}")
        if warning_kinds:
            print("\n  warnings (kind):")
            for w, c in warning_kinds.most_common():
                print(f"  {c:>3} {w}")

    print("\n## Success rate by file_size bucket")
    bucket_stat: dict[str, list[int]] = defaultdict(lambda: [0, 0])
    for r in rows:
        b = _bucket(r.file_size)
        bucket_stat[b][1] += 1
        if r.md_status == "success":
            bucket_stat[b][0] += 1
    for b in ("small", "medium", "large", "unknown", "outlier"):
        s, t = bucket_stat[b]
        if t == 0:
            continue
        rate = (s / t * 100) if t else 0
        print(f"  {b:<8} {s}/{t} ({rate:.0f}%)")

    print("\n## User review candidates (UI URLs: check broken tables / heading anchors / KaTeX directly):")
    for r in sorted(rows, key=lambda x: (x.md_status or "", -(x.file_size or 0))):
        marker = "" if r.id in katex_candidates else " "
        print(f"  {marker} https://document.hyungi.net/documents/{r.id} [{r.md_status}] {(r.title or '-')[:60]}")
    await engine.dispose()


EVAL_TEMPLATE_COLUMNS = [
    "doc_id", "title", "sample_source", "bucket_label",
    # 5 rubric axes, scored 1~5 by the user. See the plan's "Quality evaluation rubric" section.
    "text_accuracy", "structure", "noise_rate", "multi_script", "completeness",
    "overall_pass",  # boolean (true/false): gut call on "is this usable for search/reference?"
    "notes",         # free text
]


def cmd_eval_template(in_path: Path, csv_out: Path) -> None:
    """Read the select result JSON and write an empty evaluation CSV skeleton for the user to score."""
    payload = json.loads(in_path.read_text())
    items = payload.get("items", [])
    csv_out.parent.mkdir(parents=True, exist_ok=True)
    with csv_out.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=EVAL_TEMPLATE_COLUMNS, extrasaction="ignore")
        writer.writeheader()
        for it in items:
            writer.writerow({
                "doc_id": it["id"],
                "title": it.get("title", ""),
                "sample_source": it.get("sample_source", ""),
                "bucket_label": it.get("bucket_label", ""),
                "text_accuracy": "",
                "structure": "",
                "noise_rate": "",
                "multi_script": "",
                "completeness": "",
                "overall_pass": "",
                "notes": "",
            })
    print(f"eval template saved: {csv_out} ({len(items)} rows)")
    print("rubric: scores 1~5 (text_accuracy / structure / noise_rate / multi_script / completeness)")
    print("  overall_pass = true/false (gut call on 'is this usable for search/reference?')")
    print("Evaluation guide: see evals/markdown/README.md")


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_sel = sub.add_parser("select", help="stratified 30-doc dry-run + save CSV and JSON")
    p_sel.add_argument("--out", type=Path, default=DEFAULT_OUT, help="JSON (for cmd_enqueue/report compatibility)")
    p_sel.add_argument("--csv", type=Path, default=DEFAULT_CSV, help="CSV (for user review and committing)")

    p_enq = sub.add_parser("enqueue", help="enqueue into the markdown queue")
    p_enq.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
    p_enq.add_argument("--yes", action="store_true")

    p_rep = sub.add_parser("report", help="aggregate results")
    p_rep.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)

    p_evt = sub.add_parser("eval_template", help="write the evaluation CSV skeleton (the user fills in rubric scores)")
    p_evt.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
    p_evt.add_argument("--csv", type=Path, default=DEFAULT_EVAL_CSV)

    args = parser.parse_args()
    if args.cmd == "select":
        asyncio.run(cmd_select(args.out, args.csv))
    elif args.cmd == "enqueue":
        asyncio.run(cmd_enqueue(args.in_path, args.yes))
    elif args.cmd == "report":
        asyncio.run(cmd_report(args.in_path))
    elif args.cmd == "eval_template":
        cmd_eval_template(args.in_path, args.csv)


if __name__ == "__main__":
    main()
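
The selection's reproducibility rests on a seeded random.Random combined with a shuffle-then-slice pattern that skips already used ids. The sketch below isolates that pattern. The standalone take and draw helpers and the document ids are hypothetical stand-ins for this example; only the seed value and the overall shape come from the script.

```python
import random

SAMPLE_SEED = 20260502  # seed value used by the script


def take(pool, n, used, rng):
    """Pick up to n not-yet-used ids from pool, in an order determined by rng."""
    avail = [x for x in pool if x not in used]
    rng.shuffle(avail)
    picked = avail[:n]
    used.update(picked)
    return picked


def draw(seed):
    """Run two overlapping strata through take() with a fresh seeded generator."""
    rng = random.Random(seed)
    used = set()
    handwritten = take([101, 102, 103, 104], 3, used, rng)      # hypothetical ids
    scan_like = take([103, 104, 105, 106, 107], 3, used, rng)   # overlaps with the pool above
    return handwritten, scan_like


first = draw(SAMPLE_SEED)
second = draw(SAMPLE_SEED)
print(first)
print(first == second)  # same seed and same input order give the same picks
```

One caveat worth keeping in mind: the same seed reproduces the same picks only when the candidate list arrives in the same order. The queries in cmd_select carry no ORDER BY, so a rerun against a database that returns rows in a different order could select different documents. This is presumably why the generated sample CSV is committed alongside the script.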