"""Phase 2 — full backfill of the markdown canonical layer.

* Converts the legacy pending PDFs still left after Phase 1B/1C/1D, through a
  nightly sweep plus a canary stage.
* The 1D pilot (25 docs via controlled_backfill: 23 success / 2 skipped /
  0 failed) is the engineering go signal.
* The automatic handwritten skip (commit 7d0fca2) and marker_worker's
  SKIP_DOC_TYPES / MAX_PAGES guards are all enforced at processing time.
  This script is only responsible for inventory, sampling and enqueueing.

Subcommands:
    inventory        Write a dry-run inventory CSV of pending PDFs (including a
                     skip forecast).
    select-canary    Write a stratified 40-doc canary sample CSV from the
                     inventory (reproducible via --seed).
    enqueue          Enqueue the doc_ids from a sample CSV into the markdown
                     queue (one-shot; requires --no-dry-run).
    nightly-enqueue  Nightly sweep. Runs the disable-flag, marker-ready,
                     active-queue-threshold and pending-pool checks, then
                     enqueues up to --limit docs and appends a row to --log-tsv.
    post-report      Final result CSV + markdown summary (including a comparison
                     with the 1D baseline).

Usage (GPU server):
    docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py inventory \
        --output /app/evals/markdown/phase2_inventory.csv

    docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py select-canary \
        --inventory /app/evals/markdown/phase2_inventory.csv \
        --output /app/evals/markdown/phase2_canary_sample.csv \
        --seed 20260503

    docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py enqueue \
        --csv /app/evals/markdown/phase2_canary_sample.csv --no-dry-run

    # cron (nightly):
    docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py nightly-enqueue \
        --limit 50 --max-active-queue 5 \
        --log-tsv /app/evals/markdown/phase2_nightly_log.tsv

    docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py post-report \
        --output-csv /app/evals/markdown/phase2_post_report.csv \
        --output-md /app/evals/markdown/phase2_post_report.md
"""

import argparse
import asyncio
import csv
import json
import os
import random
import re
import sys
import urllib.error
import urllib.request
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path

# The fastapi container's WORKDIR is /app. Put the script's parent directory on
# sys.path so that `from models...` imports resolve.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# Mirror of marker_worker's skip rules. This follows the 1D pilot pattern and
# keeps the list in one place to avoid drift.
SKIP_DOC_TYPES = {
    "발주서", "세금계산서", "명세표",
    "Invoice", "Purchase_Order", "Estimate", "Statement",
}

# Keywords for the automatic handwritten skip (same definition as marker_worker
# in commit 7d0fca2). Matched case-insensitively.
HANDWRITTEN_REGEX = re.compile(r"필기|손글씨|handwritten|handwriting", re.IGNORECASE)

# Page-count proxy for the inventory: a file larger than 25 MiB is likely to
# exceed MAX_PAGES=200. marker_worker makes the real decision from the PyMuPDF
# page count; this forecast only feeds the plan's budget estimate.
MAX_PAGES_FILESIZE_PROXY = 25 * 1024 * 1024

# Stuck threshold in minutes. Mean conversion time is ~48s (measured in 1D);
# 30 minutes is suspicious and 60 minutes means a row is likely stuck.
STUCK_THRESHOLD_MIN = 60.0

# Disable flag, created by the operator or by an automatic abort guard. Checked
# at the start of every cron cycle.
DISABLE_FLAG_PATH = Path("/tmp/phase2_disable")  # path inside the container (the host mount is separate)

# marker-service readiness endpoint (Phase 1B spec).
MARKER_READY_URL = "http://marker-service:3300/ready" # ─── helpers ──────────────────────────────────────────────────────────── def _build_engine(): db_url = os.environ["DATABASE_URL"] return create_async_engine(db_url, pool_pre_ping=True) def _file_size_band(file_size: int | None) -> str: if file_size is None: return "unknown" if file_size < 1 * 1024 * 1024: return "S" if file_size < 10 * 1024 * 1024: return "M" return "L" def _text_density(text_len: int, file_size: int | None) -> float | None: if not file_size or file_size <= 0: return None return text_len / (file_size / 1024.0) def _text_density_band(density: float | None) -> str: if density is None: return "unknown" if density < 5.0: return "scan-likely" if density < 50.0: return "mixed" return "born-digital" def _forecast_skip_reason(file_format: str | None, doc_type: str | None, title: str | None, file_path: str | None, file_size: int | None) -> str: if file_format and file_format.lower() != "pdf": return "unsupported_extension" if doc_type and doc_type in SKIP_DOC_TYPES: return "doctype_skip" blob = " ".join(filter(None, [title or "", file_path or ""])) if HANDWRITTEN_REGEX.search(blob): return "handwritten_hint" if file_size and file_size > MAX_PAGES_FILESIZE_PROXY: return "over_max_pages_estimated" return "none" def _percentile(values: list[float], p: float) -> float | None: if not values: return None s = sorted(values) idx = max(0, min(len(s) - 1, int(len(s) * p))) return s[idx] def _check_marker_ready() -> bool: try: with urllib.request.urlopen(MARKER_READY_URL, timeout=5) as resp: data = json.load(resp) return data.get("status") == "ready" except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError, OSError): return False # ─── inventory ────────────────────────────────────────────────────────── INVENTORY_COLUMNS = [ "doc_id", "title", "file_path", "file_size", "file_size_band", "text_len", "text_density", "text_density_band", "doc_type", "forecast_skip_reason", "created_at", ] async 
def cmd_inventory(output: Path) -> None: engine = _build_engine() Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) from models.document import Document # type: ignore async with Session() as session: rows = ( await session.execute( select( Document.id, Document.title, Document.file_path, Document.file_size, Document.file_format, Document.document_type, Document.extracted_text, Document.created_at, ).where( Document.deleted_at.is_(None), Document.file_format == "pdf", Document.md_status == "pending", ) .order_by(Document.created_at.asc()) ) ).all() enriched = [] for r in rows: text_len = len(r.extracted_text or "") density = _text_density(text_len, r.file_size) enriched.append({ "doc_id": r.id, "title": (r.title or ""), "file_path": (r.file_path or ""), "file_size": r.file_size or 0, "file_size_band": _file_size_band(r.file_size), "text_len": text_len, "text_density": round(density, 3) if density is not None else "", "text_density_band": _text_density_band(density), "doc_type": r.document_type or "", "forecast_skip_reason": _forecast_skip_reason( r.file_format, r.document_type, r.title, r.file_path, r.file_size ), "created_at": r.created_at.isoformat() if r.created_at else "", }) output.parent.mkdir(parents=True, exist_ok=True) with output.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=INVENTORY_COLUMNS, extrasaction="ignore") writer.writeheader() for row in enriched: writer.writerow(row) skip_dist = Counter(r["forecast_skip_reason"] for r in enriched) band_dist = Counter(r["file_size_band"] for r in enriched) density_dist = Counter(r["text_density_band"] for r in enriched) type_dist = Counter(r["doc_type"] or "(NULL)" for r in enriched) print(f"\n## Phase 2 inventory") print(f" total pending PDFs: {len(enriched)}") print(f" output: {output}") print(f"\n forecast_skip_reason:") for k, v in skip_dist.most_common(): print(f" {k:<30} {v:>4}") print(f"\n file_size_band: ", dict(band_dist)) print(f" 
text_density_band: ", dict(density_dist)) print(f"\n doc_type top 10:") for k, v in type_dist.most_common(10): print(f" {k:<25} {v:>4}") convert_target = sum(1 for r in enriched if r["forecast_skip_reason"] == "none") print(f"\n 변환 시도 후보 (forecast_skip_reason='none'): {convert_target}건") print(f" 즉시 skip 예상: {len(enriched) - convert_target}건") await engine.dispose() # ─── select-canary ────────────────────────────────────────────────────── CANARY_COLUMNS = [ "doc_id", "title", "file_size", "file_size_band", "text_density", "text_density_band", "doc_type", "bucket_label", ] def _select_canary_buckets(candidates: list[dict], rng: random.Random) -> list[dict]: """40건 stratified — plan §"Sample budget" 표 참조. large 6 / scan_likely 2 / study_note 10 / Academic_Paper 8 / Reference 6 / {Standard,Manual,Specification} 4 / {Note,Report,Memo,NULL} 4 = 40 """ selected: list[dict] = [] used: set[int] = set() def take(pool_filter, n: int, label: str) -> None: avail = [c for c in candidates if c["doc_id"] not in used and pool_filter(c)] rng.shuffle(avail) for c in avail[:n]: picked = dict(c) picked["bucket_label"] = label selected.append(picked) used.add(c["doc_id"]) take(lambda c: c["file_size_band"] == "L", 6, "large") take(lambda c: c["text_density_band"] == "scan-likely", 2, "scan_likely") take(lambda c: c["doc_type"] == "study_note" and c["text_density_band"] == "born-digital", 10, "study_note") take(lambda c: c["doc_type"] == "Academic_Paper" and c["text_density_band"] == "born-digital", 8, "Academic_Paper") take(lambda c: c["doc_type"] == "Reference" and c["text_density_band"] == "born-digital", 6, "Reference") take(lambda c: c["doc_type"] in {"Standard", "Manual", "Specification"}, 4, "tech_doc") take(lambda c: (c["doc_type"] in {"Note", "Report", "Memo"} or not c["doc_type"]), 4, "minor_doc") if len(selected) < 40: leftover = [c for c in candidates if c["doc_id"] not in used] rng.shuffle(leftover) for c in leftover[: 40 - len(selected)]: picked = dict(c) 
picked["bucket_label"] = "filler" selected.append(picked) used.add(c["doc_id"]) return selected[:40] def cmd_select_canary(inventory: Path, output: Path, seed: int) -> None: if not inventory.is_file(): print(f"[error] inventory CSV 없음: {inventory}", file=sys.stderr) sys.exit(1) candidates: list[dict] = [] with inventory.open(encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: if row["forecast_skip_reason"] != "none": continue candidates.append({ "doc_id": int(row["doc_id"]), "title": row["title"], "file_size": int(row["file_size"]) if row["file_size"] else 0, "file_size_band": row["file_size_band"], "text_density": row["text_density"], "text_density_band": row["text_density_band"], "doc_type": row["doc_type"], }) rng = random.Random(seed) selected = _select_canary_buckets(candidates, rng) output.parent.mkdir(parents=True, exist_ok=True) with output.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=CANARY_COLUMNS, extrasaction="ignore") writer.writeheader() for s in selected: writer.writerow({col: s.get(col, "") for col in CANARY_COLUMNS}) bucket_dist = Counter(s["bucket_label"] for s in selected) print(f"\n## Canary sample (seed={seed})") print(f" selected: {len(selected)} / 40") print(f" output: {output}") print(f"\n bucket_label:") for k, v in bucket_dist.most_common(): print(f" {k:<20} {v:>3}") print(f"\n doc_id list:") for s in sorted(selected, key=lambda x: (x["bucket_label"], x["doc_id"])): print(f" {s['doc_id']:>5} [{s['bucket_label']:<14}] {(s['title'] or '-')[:60]}") # ─── enqueue (one-shot from CSV) ───────────────────────────────────────── async def cmd_enqueue(csv_path: Path, dry_run: bool) -> None: if not csv_path.is_file(): print(f"[error] sample CSV 없음: {csv_path}", file=sys.stderr) sys.exit(1) ids: list[int] = [] with csv_path.open(encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: ids.append(int(row["doc_id"])) if not ids: print("[abort] enqueue 대상 없음.") return print(f"[targets] 
{len(ids)} doc_ids: {ids[:10]}{' …' if len(ids) > 10 else ''}") if dry_run: print("[dry-run] --no-dry-run 으로 다시 실행하면 실제 enqueue.") return if not _check_marker_ready(): print("[abort] marker-service /ready 가 ready 가 아님. enqueue 중단.") sys.exit(2) engine = _build_engine() Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) from models.queue import enqueue_stage # type: ignore enqueued, skipped = [], [] async with Session() as session: for doc_id in ids: ok = await enqueue_stage(session, doc_id, "markdown") (enqueued if ok else skipped).append(doc_id) await session.commit() print(f"\nenqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}") if skipped: print(f" skipped ids: {skipped[:20]}{' …' if len(skipped) > 20 else ''}") await engine.dispose() # ─── nightly-enqueue (cron-friendly) ───────────────────────────────────── NIGHTLY_TSV_COLUMNS = [ "date", "enqueued", "active_queue_at_start", "active_queue_oldest_age_min", "pending_pool_remaining", "abort_reason", "marker_ready", ] def _append_tsv(log_tsv: Path, row: dict) -> None: log_tsv.parent.mkdir(parents=True, exist_ok=True) new_file = not log_tsv.exists() or log_tsv.stat().st_size == 0 with log_tsv.open("a", encoding="utf-8") as f: if new_file: f.write("\t".join(NIGHTLY_TSV_COLUMNS) + "\n") f.write("\t".join(str(row.get(col, "")) for col in NIGHTLY_TSV_COLUMNS) + "\n") async def cmd_nightly_enqueue(limit: int, max_active_queue: int, log_tsv: Path, dry_run: bool) -> None: today = datetime.now().strftime("%Y-%m-%d") base_row = { "date": today, "enqueued": 0, "active_queue_at_start": "", "active_queue_oldest_age_min": "", "pending_pool_remaining": "", "abort_reason": "", "marker_ready": "", } # 1. disable flag if DISABLE_FLAG_PATH.exists(): base_row["abort_reason"] = "disable_flag" _append_tsv(log_tsv, base_row) print(f"[abort] disable flag 존재: {DISABLE_FLAG_PATH}") return # 2. 
marker ready marker_ready = _check_marker_ready() base_row["marker_ready"] = 1 if marker_ready else 0 if not marker_ready: base_row["abort_reason"] = "marker_unhealthy" _append_tsv(log_tsv, base_row) print("[abort] marker-service /ready 가 ready 아님") return engine = _build_engine() Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) from models.document import Document # type: ignore from models.queue import ProcessingQueue, enqueue_stage # type: ignore # 3. active queue check + oldest age async with Session() as session: active_stats = (await session.execute(text(""" SELECT COUNT(*) AS active_count, COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(COALESCE(started_at, created_at))))/60, 0) AS oldest_age_min FROM processing_queue WHERE stage='markdown' AND status IN ('pending','processing') """))).first() active_count = int(active_stats[0]) oldest_age = float(active_stats[1]) base_row["active_queue_at_start"] = active_count base_row["active_queue_oldest_age_min"] = round(oldest_age, 1) if active_count > max_active_queue: base_row["abort_reason"] = "active_queue_threshold" _append_tsv(log_tsv, base_row) print(f"[skip] active queue {active_count} > threshold {max_active_queue} (oldest {oldest_age:.1f}min)") if oldest_age > STUCK_THRESHOLD_MIN: print(f"[warn] oldest active row age {oldest_age:.1f}min > {STUCK_THRESHOLD_MIN}min — possible stuck") await engine.dispose() return # 4. 
pending pool query async with Session() as session: pool_rows = ( await session.execute(text(f""" SELECT id FROM documents WHERE deleted_at IS NULL AND file_format='pdf' AND md_status='pending' AND id NOT IN ( SELECT document_id FROM processing_queue WHERE stage='markdown' AND status IN ('pending','processing') ) ORDER BY created_at ASC LIMIT {int(limit)} """)) ).all() pool_ids = [r[0] for r in pool_rows] if not pool_ids: base_row["abort_reason"] = "pool_empty" base_row["pending_pool_remaining"] = 0 _append_tsv(log_tsv, base_row) print("[done] pending pool empty — Phase 2 backfill 자연 완료 신호.") await engine.dispose() return if dry_run: print(f"[dry-run] would enqueue {len(pool_ids)} ids: {pool_ids[:10]}{' …' if len(pool_ids) > 10 else ''}") await engine.dispose() return # 5. enqueue enqueued, skipped = [], [] async with Session() as session: for doc_id in pool_ids: ok = await enqueue_stage(session, doc_id, "markdown") (enqueued if ok else skipped).append(doc_id) await session.commit() # 6. 
pending pool remaining (post-enqueue) async with Session() as session: remaining_count = ( await session.execute(text(""" SELECT COUNT(*) FROM documents d WHERE d.deleted_at IS NULL AND d.file_format='pdf' AND d.md_status='pending' AND d.id NOT IN ( SELECT document_id FROM processing_queue WHERE stage='markdown' AND status IN ('pending','processing') ) """)) ).scalar() base_row["enqueued"] = len(enqueued) base_row["pending_pool_remaining"] = int(remaining_count or 0) _append_tsv(log_tsv, base_row) print(f"\n[ok] enqueued={len(enqueued)} skipped={len(skipped)} remaining_pool={remaining_count}") await engine.dispose() # ─── post-report ──────────────────────────────────────────────────────── POST_REPORT_COLUMNS = [ "doc_id", "title", "final_md_status", "md_extraction_engine", "md_extraction_engine_version", "elapsed_ms_estimate", "text_length_ratio", "markdown_heading_count", "markdown_table_row_count", "markdown_image_count", "warnings", "phase2_processed_at", ] # 1D pilot baseline (project_markdown_canonical_layer.md, Phase 1D 결과 섹션). BASELINE_1D = { "success_rate": 0.92, # 23/25 "skipped_rate": 0.08, "failed_rate": 0.0, "elapsed_p50_ms": 34000, "elapsed_p90_ms": 112000, "text_length_ratio_p50": 1.15, "warnings_heading_jump_pct": 0.86, # 24/28 "warnings_low_image_alt_pct": 0.89, # 25/28 } async def cmd_post_report(output_csv: Path, output_md: Path, phase2_start: str | None) -> None: """Phase 2 sweep 결과 집계 + 1D baseline 비교. phase2_start: ISO timestamp (예: '2026-05-03T00:00:00Z'). 이후 처리된 doc 만 집계. None 이면 sample CSV 들에서 doc_id union 으로 한정 (별도 인자 미지원 — 기본 = NULL = 전 history). 실용적으로는 phase2_start 사용 권장 (Phase 2 코드 push 후 시각). 
""" engine = _build_engine() Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) from models.document import Document # type: ignore from models.queue import ProcessingQueue # type: ignore async with Session() as session: cond = [Document.deleted_at.is_(None), Document.file_format == "pdf"] if phase2_start: cond.append(Document.md_generated_at >= phase2_start) else: cond.append(Document.md_generated_at.is_not(None)) rows = ( await session.execute( select( Document.id, Document.title, Document.md_status, Document.md_extraction_engine, Document.md_extraction_engine_version, Document.md_extraction_quality, Document.md_extraction_error, Document.md_generated_at, ).where(*cond) ) ).all() ids = [r.id for r in rows] if ids: q_rows = ( await session.execute( select( ProcessingQueue.document_id, ProcessingQueue.started_at, ProcessingQueue.completed_at, ).where( ProcessingQueue.document_id.in_(ids), ProcessingQueue.stage == "markdown", ) ) ).all() else: q_rows = [] elapsed: dict[int, float] = {} for q in q_rows: if q.started_at and q.completed_at: elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds() * 1000 # CSV enriched_rows = [] for r in rows: q = r.md_extraction_quality or {} m = q.get("metrics") if isinstance(q, dict) else {} m = m or {} warnings = q.get("warnings") if isinstance(q, dict) else [] enriched_rows.append({ "doc_id": r.id, "title": (r.title or "")[:120], "final_md_status": r.md_status, "md_extraction_engine": r.md_extraction_engine or "", "md_extraction_engine_version": r.md_extraction_engine_version or "", "elapsed_ms_estimate": int(elapsed.get(r.id, 0)) if r.id in elapsed else "", "text_length_ratio": m.get("text_length_ratio", "") if m else "", "markdown_heading_count": m.get("markdown_heading_count", "") if m else "", "markdown_table_row_count": m.get("markdown_table_row_count", "") if m else "", "markdown_image_count": m.get("markdown_image_count", "") if m else "", "warnings": ",".join(warnings) if 
isinstance(warnings, list) else "", "phase2_processed_at": r.md_generated_at.isoformat() if r.md_generated_at else "", }) output_csv.parent.mkdir(parents=True, exist_ok=True) with output_csv.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=POST_REPORT_COLUMNS, extrasaction="ignore") writer.writeheader() for row in enriched_rows: writer.writerow(row) # Markdown summary total = len(rows) by_status = Counter(r.md_status for r in rows) success_n = by_status.get("success", 0) skipped_n = by_status.get("skipped", 0) failed_n = by_status.get("failed", 0) success_rows = [r for r in rows if r.md_status == "success"] text_ratios = [] heading_jump_count = 0 low_image_alt_count = 0 new_warnings: Counter = Counter() KNOWN_WARNINGS = {"heading_hierarchy_jump", "low_image_alt_text_ratio"} for r in success_rows: q = r.md_extraction_quality or {} m = q.get("metrics") if isinstance(q, dict) else {} m = m or {} if isinstance(m.get("text_length_ratio"), (int, float)): text_ratios.append(float(m["text_length_ratio"])) warnings = q.get("warnings") if isinstance(q, dict) else [] if isinstance(warnings, list): if "heading_hierarchy_jump" in warnings: heading_jump_count += 1 if "low_image_alt_text_ratio" in warnings: low_image_alt_count += 1 for w in warnings: if w not in KNOWN_WARNINGS: new_warnings[w] += 1 elapsed_succ = [elapsed[r.id] for r in success_rows if r.id in elapsed] elapsed_p50 = _percentile(elapsed_succ, 0.5) elapsed_p90 = _percentile(elapsed_succ, 0.9) text_ratio_p50 = _percentile(text_ratios, 0.5) skip_reasons = Counter() for r in rows: if r.md_status == "skipped": reason = (r.md_extraction_error or "unknown").split(":", 1)[0] skip_reasons[reason.strip()] += 1 failed_rows = [r for r in rows if r.md_status == "failed"] outliers = [] for r in success_rows: e = elapsed.get(r.id) if e and e > 300_000: outliers.append((r.id, f"elapsed_ms={int(e)}")) q = r.md_extraction_quality or {} m = q.get("metrics") if isinstance(q, dict) else {} m = m or 
{} ratio = m.get("text_length_ratio") if isinstance(ratio, (int, float)) and (ratio < 0.5 or ratio > 10): outliers.append((r.id, f"text_length_ratio={ratio:.2f}")) def pct(n: int, d: int) -> str: return f"{n/d*100:.0f}%" if d else "-" def delta_pp(actual_n: int, actual_d: int, baseline: float) -> str: if not actual_d: return "-" return f"{(actual_n/actual_d - baseline)*100:+.1f}pp" def fmt_num(v) -> str: if v is None: return "-" if isinstance(v, float): return f"{v:.2f}" return str(v) scope_line = "Scope: file_format='pdf' AND deleted_at IS NULL" if phase2_start: scope_line += f" AND md_generated_at >= {phase2_start}" else: scope_line += " AND md_generated_at IS NOT NULL" md_lines = [ "# Phase 2 Post-report", "", f"Generated: {datetime.now(timezone.utc).isoformat()}", scope_line, "", f"## 처리 분포 (총 {total}건)", "| status | count | rate |", "|---|---|---|", f"| success | {success_n} | {pct(success_n, total)} |", f"| skipped | {skipped_n} | {pct(skipped_n, total)} |", f"| failed | {failed_n} | {pct(failed_n, total)} |", "", "## vs 1D baseline 비교", "| 메트릭 | 1D | Phase 2 | delta |", "|---|---|---|---|", f"| success rate | 92% | {pct(success_n, total)} | {delta_pp(success_n, total, BASELINE_1D['success_rate'])} |", f"| skipped rate | 8% | {pct(skipped_n, total)} | {delta_pp(skipped_n, total, BASELINE_1D['skipped_rate'])} |", f"| failed rate | 0% | {pct(failed_n, total)} | {delta_pp(failed_n, total, BASELINE_1D['failed_rate'])} |", f"| elapsed_ms p50 | 34000 | {fmt_num(int(elapsed_p50)) if elapsed_p50 else '-'} | - |", f"| elapsed_ms p90 | 112000 | {fmt_num(int(elapsed_p90)) if elapsed_p90 else '-'} | - |", f"| text_length_ratio p50 | 1.15 | {fmt_num(text_ratio_p50)} | - |", f"| warnings: heading_hierarchy_jump | 86% | {pct(heading_jump_count, success_n)} | - |", f"| warnings: low_image_alt_text_ratio | 89% | {pct(low_image_alt_count, success_n)} | - |", "", "## skip reason 분포", ] for reason, c in skip_reasons.most_common(): md_lines.append(f"- `{reason}`: {c}건") if not 
skip_reasons: md_lines.append("- (no skipped)") md_lines += [ f"", f"## failed 케이스 ({len(failed_rows)}건)", ] for r in failed_rows: md_lines.append(f"- doc {r.id} `{(r.title or '-')[:60]}`: {r.md_extraction_error or '-'}") if not failed_rows: md_lines.append("- (no failed)") md_lines += [ f"", f"## outlier ({len(outliers)}건)", ] for doc_id, note in outliers[:30]: md_lines.append(f"- doc {doc_id}: {note}") if not outliers: md_lines.append("- (no outliers)") if new_warnings: md_lines += [f"", f"## 신규 warning 종류 (1D 미관측)"] for w, c in new_warnings.most_common(): md_lines.append(f"- `{w}`: {c}건") output_md.parent.mkdir(parents=True, exist_ok=True) output_md.write_text("\n".join(md_lines) + "\n", encoding="utf-8") print(f"\n## Phase 2 post-report") print(f" CSV: {output_csv}") print(f" MD: {output_md}") print(f" scope: {total} rows (success {success_n} / skipped {skipped_n} / failed {failed_n})") await engine.dispose() # ─── main ─────────────────────────────────────────────────────────────── def main() -> None: parser = argparse.ArgumentParser( description="Phase 2 markdown canonical layer full backfill", formatter_class=argparse.RawDescriptionHelpFormatter, ) sub = parser.add_subparsers(dest="cmd", required=True) p_inv = sub.add_parser("inventory", help="pending PDFs dry-run inventory CSV") p_inv.add_argument("--output", type=Path, required=True) p_can = sub.add_parser("select-canary", help="stratified 40 canary sample CSV") p_can.add_argument("--inventory", type=Path, required=True) p_can.add_argument("--output", type=Path, required=True) p_can.add_argument("--seed", type=int, default=20260503) p_enq = sub.add_parser("enqueue", help="enqueue from sample CSV (one-shot)") p_enq.add_argument("--csv", type=Path, required=True) g = p_enq.add_mutually_exclusive_group() g.add_argument("--dry-run", dest="dry_run", action="store_true", default=True) g.add_argument("--no-dry-run", dest="dry_run", action="store_false") p_nig = sub.add_parser("nightly-enqueue", help="nightly cron 
sweep") p_nig.add_argument("--limit", type=int, default=50) p_nig.add_argument("--max-active-queue", type=int, default=5) p_nig.add_argument("--log-tsv", type=Path, required=True) p_nig.add_argument("--dry-run", action="store_true", default=False) p_rep = sub.add_parser("post-report", help="final results CSV + markdown summary") p_rep.add_argument("--output-csv", type=Path, required=True) p_rep.add_argument("--output-md", type=Path, required=True) p_rep.add_argument("--phase2-start", type=str, default=None, help="ISO timestamp; only docs with md_generated_at >= this are counted") args = parser.parse_args() if args.cmd == "inventory": asyncio.run(cmd_inventory(args.output)) elif args.cmd == "select-canary": cmd_select_canary(args.inventory, args.output, args.seed) elif args.cmd == "enqueue": asyncio.run(cmd_enqueue(args.csv, args.dry_run)) elif args.cmd == "nightly-enqueue": asyncio.run(cmd_nightly_enqueue(args.limit, args.max_active_queue, args.log_tsv, args.dry_run)) elif args.cmd == "post-report": asyncio.run(cmd_post_report(args.output_csv, args.output_md, args.phase2_start)) if __name__ == "__main__": main()