diff --git a/scripts/phase1d_pilot.py b/scripts/phase1d_pilot.py new file mode 100644 index 0000000..c476e1c --- /dev/null +++ b/scripts/phase1d_pilot.py @@ -0,0 +1,369 @@ +"""Phase 1D — marker markdown 품질 pilot (one-shot admin script). + +* 영구 worker 경로 아님. 30건 한정 sample 로 baseline 품질 측정. +* Phase 2 전체 백필 결정은 1D 결과 보고 후행. +* 1B.5 (이미지 추출 / _meta 보존) 는 별도 PR — 본 스크립트 영역 아님. + +Stratification: + ai_domain × file_size_bucket (page_count 는 documents 컬럼 없음 → file_size proxy) + 보조: 각 cell 안에서 file_size 작은/큰 mix. + document_type ∈ SKIP_DOC_TYPES 제외 (marker_worker 의 SKIP 룰과 동일). + +Subcommands: + select stratified 30건 dry-run + JSON 저장 + enqueue select 결과를 markdown 큐에 enqueue (uq_queue_active 위반 회피) + report md_status 분포·실패사유·quality 메트릭·UI 검수 URL 출력 + +실행 (GPU 서버): + docker compose exec fastapi python /app/scripts/phase1d_pilot.py select + docker compose exec fastapi python /app/scripts/phase1d_pilot.py enqueue --yes + docker compose exec fastapi python /app/scripts/phase1d_pilot.py report +""" + +import argparse +import asyncio +import json +import os +import re +import sys +from collections import Counter, defaultdict +from pathlib import Path + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +# marker_worker 의 SKIP 룰 미러 — drift 회피 위해 한 곳만 진실. 변경 시 동기 필요. +SKIP_DOC_TYPES = { + "발주서", "세금계산서", "명세표", + "Invoice", "Purchase_Order", "Estimate", "Statement", +} + +# file_size bucket (page_count proxy). PDF 평균 1페이지 ~10~50KB. +SIZE_BUCKETS = [ + ("small", 0, 500 * 1024), # < 500KB + ("medium", 500 * 1024, 5 * 1024 * 1024), # 500KB ~ 5MB + ("large", 5 * 1024 * 1024, 10**12), # > 5MB +] + +PILOT_TARGET = 30 +DEFAULT_OUT = Path("/tmp/phase1d_pilot.json") + + +def _bucket(file_size: int | None) -> str: + if file_size is None: + return "unknown" + for name, lo, hi in SIZE_BUCKETS: + if lo <= file_size < hi: + return name + return "outlier" + + +def _build_engine() -> "AsyncEngine": + db_url = os.environ["DATABASE_URL"] + return create_async_engine(db_url, pool_pre_ping=True) + + +# ─── select ─── + +async def cmd_select(out_path: Path) -> None: + engine = _build_engine() + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + from models.document import Document # type: ignore + + async with Session() as session: + rows = ( + await session.execute( + select( + Document.id, + Document.title, + Document.ai_domain, + Document.document_type, + Document.file_size, + Document.file_path, + Document.file_format, + Document.category, + Document.md_status, + ).where( + Document.deleted_at.is_(None), + Document.file_format == "pdf", + Document.md_status == "pending", + Document.category == "document", + ) + ) + ).all() + + candidates = [ + r for r in rows + if not (r.document_type and r.document_type in SKIP_DOC_TYPES) + ] + print(f"후보 (필터 후): {len(candidates)}건 (전체 pending PDF document: {len(rows)}건)") + + grouped: dict[tuple[str, str], list] = defaultdict(list) + for r in candidates: + domain = r.ai_domain or "unknown" + grouped[(domain, _bucket(r.file_size))].append(r) + + cells = sorted(grouped.keys()) + base_per_cell = max(1, PILOT_TARGET // max(1, len(cells))) + + sample: list = [] + leftover_cells: list[tuple[tuple[str, str], list]] = [] + for cell in cells: + items = sorted(grouped[cell], key=lambda x: (x.file_size or 0, x.id)) + take = min(base_per_cell, len(items)) + if take >= 2: + half = take // 2 + sample.extend(items[:half]) # 작은 쪽 + sample.extend(items[-(take - half):]) # 큰 쪽 + else: + sample.extend(items[:take]) + if len(items) > take: + leftover_cells.append((cell, items[take:])) + + leftover_cells.sort(key=lambda x: -len(x[1])) + li = 0 + while len(sample) < PILOT_TARGET and li < len(leftover_cells): + _, items = leftover_cells[li] + if items: + sample.append(items.pop(0)) + else: + li += 1 + sample = sample[:PILOT_TARGET] + + print(f"\n선정 {len(sample)}건 (목표 {PILOT_TARGET}):\n") + print(f"{'ID':>6} {'KB':>8} {'domain':<22} {'doctype':<22} title") + print("-" * 130) + for r in sample: + print( + f"{r.id:>6} {((r.file_size or 0) // 1024):>8} " + f"{(r.ai_domain or '-')[:22]:<22} " + f"{(r.document_type or '-')[:22]:<22} " + f"{(r.title or '-')[:60]}" + ) + + cell_counts: Counter = Counter() + for r in sample: + cell_counts[((r.ai_domain or "unknown"), _bucket(r.file_size))] += 1 + print("\n분포 (ai_domain × file_size_bucket):") + for (d, b), c in sorted(cell_counts.items()): + print(f" {d:<22} × {b:<8} : {c}") + + payload = { + "target": PILOT_TARGET, + "ids": [r.id for r in sample], + "items": [ + { + "id": r.id, + "title": r.title, + "ai_domain": r.ai_domain, + "document_type": r.document_type, + "file_size": r.file_size, + "file_path": r.file_path, + } + for r in sample + ], + } + out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2)) + print(f"\n저장: {out_path}") + await engine.dispose() + + +# ─── enqueue ─── + +async def cmd_enqueue(in_path: Path, yes: bool) -> None: + payload = json.loads(in_path.read_text()) + ids = payload["ids"] + + if not yes: + confirm = input(f"\n{len(ids)}건 markdown 큐에 enqueue 합니다. 진행? [y/N] ") + if confirm.strip().lower() not in ("y", "yes"): + print("취소됨.") + return + + engine = _build_engine() + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + from models.queue import enqueue_stage # type: ignore + + enqueued, skipped = [], [] + async with Session() as session: + for doc_id in ids: + ok = await enqueue_stage(session, doc_id, "markdown") + (enqueued if ok else skipped).append(doc_id) + await session.commit() + + print(f"enqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}") + if skipped: + print(f" skipped ids: {skipped[:20]}{' …' if len(skipped) > 20 else ''}") + await engine.dispose() + + +# ─── report ─── + +def _stat(vals: list[float]) -> str: + if not vals: + return "-" + s = sorted(vals) + n = len(s) + p50 = s[n // 2] + p90 = s[min(n - 1, int(n * 0.9))] + return f"min={s[0]:.2f} p50={p50:.2f} p90={p90:.2f} max={s[-1]:.2f}" + + +KATEX_INLINE_RE = re.compile(r"(? None: + payload = json.loads(in_path.read_text()) + ids: list[int] = payload["ids"] + + engine = _build_engine() + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + from models.document import Document # type: ignore + from models.queue import ProcessingQueue # type: ignore + + async with Session() as session: + rows = ( + await session.execute( + select( + Document.id, Document.title, Document.ai_domain, Document.file_size, + Document.md_status, Document.md_extraction_error, + Document.md_extraction_quality, Document.md_content, + Document.md_generated_at, Document.created_at, + ).where(Document.id.in_(ids)) + ) + ).all() + + q_rows = ( + await session.execute( + select( + ProcessingQueue.document_id, + ProcessingQueue.status, + ProcessingQueue.attempts, + ProcessingQueue.started_at, + ProcessingQueue.completed_at, + ).where( + ProcessingQueue.document_id.in_(ids), + ProcessingQueue.stage == "markdown", + ) + ) + ).all() + + elapsed: dict[int, float] = {} + queue_status: dict[int, str] = {} + queue_attempts: dict[int, int] = {} + for q in q_rows: + queue_status[q.document_id] = q.status + queue_attempts[q.document_id] = q.attempts + if q.started_at and q.completed_at: + elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds() + + by_status = Counter(r.md_status for r in rows) + print(f"\n## md_status 분포 (sample {len(rows)}건)") + for s, c in sorted(by_status.items(), key=lambda x: -x[1]): + print(f" {s:<12} {c}") + in_queue = [i for i in ids if queue_status.get(i) in {"pending", "processing"}] + if in_queue: + print(f"\n 처리 대기/중 (큐 active): {len(in_queue)}건") + + es = [v for k, v in elapsed.items() + if next((r for r in rows if r.id == k and r.md_status == "success"), None)] + if es: + print(f"\n## 평균 처리 시간 (success {len(es)}건)") + print(f" avg={sum(es)/len(es):.1f}s {_stat(es)}") + else: + print("\n## 처리 시간: 측정 가능한 success 없음") + + fail_reasons: Counter = Counter() + for r in rows: + if r.md_status == "failed": + fail_reasons[(r.md_extraction_error or "unknown")[:140]] += 1 + if fail_reasons: + print("\n## 실패 사유 top 5") + for reason, c in fail_reasons.most_common(5): + print(f" {c:>3} {reason}") + + text_ratios, heading_jumps, table_rows, image_counts = [], [], [], [] + warning_kinds: Counter = Counter() + katex_candidates, table_with_rows = [], [] + has_headings = 0 + success_rows = [r for r in rows if r.md_status == "success"] + for r in success_rows: + q = r.md_extraction_quality or {} + m = q.get("metrics", {}) or {} + if m.get("text_length_ratio") is not None: text_ratios.append(m["text_length_ratio"]) + if m.get("heading_jump_count") is not None: heading_jumps.append(m["heading_jump_count"]) + if m.get("markdown_table_row_count") is not None: + table_rows.append(m["markdown_table_row_count"]) + if m["markdown_table_row_count"] > 0: + table_with_rows.append(r.id) + if m.get("markdown_image_count") is not None: image_counts.append(m["markdown_image_count"]) + if (m.get("markdown_heading_count") or 0) > 0: + has_headings += 1 + for w in (q.get("warnings") or []): + warning_kinds[w] += 1 + c = r.md_content or "" + if "$$" in c or KATEX_INLINE_RE.search(c): + katex_candidates.append(r.id) + + if success_rows: + print(f"\n## quality 메트릭 (success {len(success_rows)}건)") + print(f" text_length_ratio {_stat(text_ratios)}") + print(f" heading_jump_count {_stat(heading_jumps)}") + print(f" markdown_table_row_count {_stat(table_rows)} (rows>0: {len(table_with_rows)}건)") + print(f" markdown_image_count {_stat(image_counts)} (현재 server.py 가 _images 버림 → 0 정상)") + print(f" heading anchor 후보 (markdown_heading_count > 0): {has_headings}건") + print(f" KaTeX 후보 ($-수식 매칭): {len(katex_candidates)}건 ids={katex_candidates[:10]}") + if warning_kinds: + print("\n warnings (kind):") + for w, c in warning_kinds.most_common(): + print(f" {c:>3} {w}") + + print("\n## file_size bucket 별 success 비율") + bucket_stat: dict[str, list[int]] = defaultdict(lambda: [0, 0]) + for r in rows: + b = _bucket(r.file_size) + bucket_stat[b][1] += 1 + if r.md_status == "success": + bucket_stat[b][0] += 1 + for b in ("small", "medium", "large", "unknown", "outlier"): + s, t = bucket_stat[b] + if t == 0: + continue + rate = (s / t * 100) if t else 0 + print(f" {b:<8} {s}/{t} ({rate:.0f}%)") + + print("\n## 사용자 검수 후보 (UI URL — 표 깨짐/heading anchor/KaTeX 직접 확인):") + for r in sorted(rows, key=lambda x: (x.md_status or "", -(x.file_size or 0))): + marker = "★" if r.id in katex_candidates else " " + print(f" {marker} https://document.hyungi.net/documents/{r.id} [{r.md_status}] {(r.title or '-')[:60]}") + + await engine.dispose() + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + sub = parser.add_subparsers(dest="cmd", required=True) + p_sel = sub.add_parser("select", help="stratified 30건 dry-run") + p_sel.add_argument("--out", type=Path, default=DEFAULT_OUT) + p_enq = sub.add_parser("enqueue", help="markdown 큐 enqueue") + p_enq.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT) + p_enq.add_argument("--yes", action="store_true") + p_rep = sub.add_parser("report", help="결과 집계") + p_rep.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT) + + args = parser.parse_args() + if args.cmd == "select": + asyncio.run(cmd_select(args.out)) + elif args.cmd == "enqueue": + asyncio.run(cmd_enqueue(args.in_path, args.yes)) + elif args.cmd == "report": + asyncio.run(cmd_report(args.in_path)) + + +if __name__ == "__main__": + main()