From 25ee10ac345db37feb502e0a36a5f5649b077958 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Sat, 2 May 2026 23:44:59 +0000 Subject: [PATCH] =?UTF-8?q?feat(scripts):=20Phase=202=20markdown=20backfil?= =?UTF-8?q?l=20=E2=80=94=20script=20+=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - scripts/phase2_backfill.py: 5 subcommands - inventory: pending PDFs dry-run CSV with skip forecast - select-canary: stratified 40 sample (seed 20260503) - enqueue: one-shot from sample CSV (--no-dry-run gate) - nightly-enqueue: cron-friendly with disable flag / marker /ready / active-queue threshold (oldest_age stuck guard) / DB pool guards - post-report: final state CSV + 1D baseline comparison MD - evals/markdown/README.md: Phase 2 section appended - plan: ~/.claude/plans/iridescent-gathering-clover.md - depends on Phase 1B handwritten skip 7d0fca2 (marker_worker side guard) --- evals/markdown/README.md | 111 ++++++ scripts/phase2_backfill.py | 790 +++++++++++++++++++++++++++++++++++++ 2 files changed, 901 insertions(+) create mode 100644 scripts/phase2_backfill.py diff --git a/evals/markdown/README.md b/evals/markdown/README.md index a9ef9c2..47eb305 100644 --- a/evals/markdown/README.md +++ b/evals/markdown/README.md @@ -117,3 +117,114 @@ docker compose exec fastapi python /app/scripts/phase1d_pilot.py select \ ``` **enqueue 의 `--yes` 또는 `--no-dry-run` 류 실행은 별도 사용자 승인 + 야간 단발 sweep 윈도우 (23:00~03:00 KST) 안에서만**. 30건 backfill = marker-service BATCH_SIZE=1 × 평균 5분/건 ≈ 2.5h. + +--- + +# Phase 2 — Full Backfill (legacy pending PDFs) + +> Plan: `~/.claude/plans/iridescent-gathering-clover.md` +> Script: `scripts/phase2_backfill.py` (subcommands: inventory / select-canary / enqueue / nightly-enqueue / post-report) + +## 목적 + +1D pilot 결과 = engineering go signal. legacy pending PDF (1D 후 잔여 ~237건) 을 marker_worker 로 변환해 `md_status='success'` 누적. **신규 업로드 우선권 보존, 야간 저부하 sweep, DB state 기반 idempotent checkpoint**. + +진행 로드맵: **2-A dry-run inventory → 2-B canary 40건 → 2-C nightly sweep ~4-5 nights → 2-D post-report**. + +## 파일 + +| 파일 | 역할 | 갱신 시점 | +|---|---|---| +| `phase2_inventory.csv` | pending PDFs dry-run inventory + skip forecast | 2-A 종료 (commit, 1회) | +| `phase2_canary_sample.csv` | stratified 40건 canary sample (시드 `20260503`) | 2-B(a) (commit) | +| `phase2_canary_result.md` | canary 결과 요약 + 1D 비교 + GO/HALT 결정 근거 | 2-B 종료 (commit) | +| `phase2_nightly_log.tsv` | 야간 sweep 한 줄/일 (date / enqueued / active_queue_at_start / active_queue_oldest_age_min / pending_pool_remaining / abort_reason / marker_ready) | append 매 sweep, 주 1회 commit | +| `phase2_post_report.csv` | Phase 2 sweep 처리된 doc 별 final state + quality | 2-D (commit) | +| `phase2_post_report.md` | 처리 분포 + 1D baseline 비교 + skip/failed/outlier 목록 | 2-D (commit) | + +## Subcommand 사용법 + +### inventory (read-only, dry-run) +```bash +docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py inventory \ + --output /app/evals/markdown/phase2_inventory.csv +``` +- pending PDFs 전체에 대해 doc_id / file_size / text_density / doc_type / forecast_skip_reason 적재. +- forecast_skip_reason ∈ {unsupported_extension / doctype_skip / handwritten_hint / over_max_pages_estimated / none}. 'none' = 변환 시도 후보. +- handwritten_hint = title/path 에 `필기|손글씨|handwritten|handwriting` 매칭 (marker_worker 의 7d0fca2 룰 미러). +- over_max_pages_estimated = file_size > 25MB proxy. 실 page_count 는 marker_worker 가 PyMuPDF 로 결정. + +### select-canary (재현성 시드) +```bash +docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py select-canary \ + --inventory /app/evals/markdown/phase2_inventory.csv \ + --output /app/evals/markdown/phase2_canary_sample.csv \ + --seed 20260503 +``` +- 40건 buckets: large 6 / scan_likely 2 / study_note 10 / Academic_Paper 8 / Reference 6 / {Standard,Manual,Specification} 4 / {Note,Report,Memo,NULL} 4 +- inventory 의 `forecast_skip_reason='none'` 만 선정 후보. +- 시드 고정 → 재실행 시 동일 sample. + +### enqueue (one-shot, 사용자 승인 게이트) +```bash +# dry-run (default) +docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py enqueue \ + --csv /app/evals/markdown/phase2_canary_sample.csv + +# actual (사용자 승인 후) +docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py enqueue \ + --csv /app/evals/markdown/phase2_canary_sample.csv --no-dry-run +``` +- marker-service `/ready` 사전 검증. +- `enqueue_stage` idempotent — 중복 호출 안전. + +### nightly-enqueue (cron / manual) +```bash +docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py nightly-enqueue \ + --limit 50 --max-active-queue 5 \ + --log-tsv /app/evals/markdown/phase2_nightly_log.tsv +``` +- 가드 순서: disable flag (`/tmp/phase2_disable`) → marker /ready → active_queue ≤ threshold → DB pool 비어있지 않음 → enqueue. +- 매 sweep log_tsv 한 줄. abort_reason ∈ {disable_flag / marker_unhealthy / active_queue_threshold / pool_empty / empty}. +- pool_empty = Phase 2 자연 완료 신호 (cron 제거 hard gate trigger). + +### post-report +```bash +docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py post-report \ + --output-csv /app/evals/markdown/phase2_post_report.csv \ + --output-md /app/evals/markdown/phase2_post_report.md \ + --phase2-start 2026-05-03T00:00:00Z +``` +- `--phase2-start` ISO timestamp 이후 `md_generated_at` 만 집계 (Phase 2 코드 push 시점 권장). +- 1D baseline (success 92% / elapsed_p50 34s / text_length_ratio_p50 1.15) 와 비교. +- outlier 후보: elapsed_ms > 300s, text_length_ratio < 0.5 또는 > 10, 신규 warning 종류. + +## 의사결정 게이트 + +### 2-B canary GO/HALT +- success ≥ 36/40 (90%) AND failed ≤ 2 AND skipped ≤ 6 → **2-C 진입 GO** +- 위 미충족 → HALT, 사용자 보고 후 재검토 + +### 2-C nightly abort +- 1 night 안에 failed > 5 → script 가 disable flag 자동 생성 +- marker-service `/ready` 실패 → 그 sweep 건너뜀, 다음 sweep 재시도 +- active_queue_oldest_age_min > 60 (stuck 임계) → log 에 [warn], 사용자 morning check 으로 판단 + +### 2-D 종료 hard gate (Phase 2 closed 선언 직전) +- (cron 모드) `crontab -l | grep phase2_backfill` 결과 비어 있어야 함 +- `~/.phase2_disable` 파일 정리 됨 +- pending PDF (`md_status='pending'`, `file_format='pdf'`) ≤ 5 +- processing_queue markdown active = 0 + +## 1D 와의 차이 + +| 항목 | 1D | Phase 2 | +|---|---|---| +| 목적 | failure mode 진단 | 풀 변환 | +| 대상 | 30건 stratified | 237건 잔여 | +| sample_source | existing_success + controlled_backfill | controlled_backfill only | +| 처리 모드 | one-shot cron (1회) | nightly cron (~4-5 nights) | +| 평가 | 사용자 5축 rubric | marker 자가 metrics + 1D baseline 비교 | +| anchor 보존 | doc 4809 forced_include | (재처리 안 함) | +| handwritten | over-sample (3건) | marker_worker 자동 skip 신뢰 | + diff --git a/scripts/phase2_backfill.py b/scripts/phase2_backfill.py new file mode 100644 index 0000000..d2f89a8 --- /dev/null +++ b/scripts/phase2_backfill.py @@ -0,0 +1,790 @@ +"""Phase 2 — markdown canonical layer full backfill. + +* legacy pending PDF (Phase 1B/1C/1D 후 잔여) 을 야간 sweep + canary 단계로 변환. +* 1D pilot (25건 controlled_backfill, 23 success / 2 skipped / 0 failed) 가 engineering go signal. +* handwritten 자동 skip (commit 7d0fca2) 와 marker_worker SKIP_DOC_TYPES / MAX_PAGES guard 모두 운영 시점 가드. 본 스크립트는 inventory · sample · enqueue 만 담당. + +Subcommands: + inventory pending PDFs 의 dry-run inventory CSV 작성 (skip forecast 포함) + select-canary inventory 에서 stratified 40건 canary sample CSV 작성 (재현성 시드) + enqueue sample CSV 의 doc_id 들을 markdown 큐에 enqueue (one-shot, --no-dry-run 필요) + nightly-enqueue 야간 sweep — disable flag / marker ready / active-queue threshold / DB pool 가드 후 limit 만큼 enqueue + log_tsv + post-report 최종 결과 CSV + markdown 요약 (1D baseline 비교 포함) + +실행 (GPU 서버): + docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py inventory \ + --output /app/evals/markdown/phase2_inventory.csv + + docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py select-canary \ + --inventory /app/evals/markdown/phase2_inventory.csv \ + --output /app/evals/markdown/phase2_canary_sample.csv \ + --seed 20260503 + + docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py enqueue \ + --csv /app/evals/markdown/phase2_canary_sample.csv --no-dry-run + + # cron (nightly): + docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py nightly-enqueue \ + --limit 50 --max-active-queue 5 \ + --log-tsv /app/evals/markdown/phase2_nightly_log.tsv + + docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py post-report \ + --output-csv /app/evals/markdown/phase2_post_report.csv \ + --output-md /app/evals/markdown/phase2_post_report.md +""" + +import argparse +import asyncio +import csv +import json +import os +import random +import re +import sys +import urllib.error +import urllib.request +from collections import Counter, defaultdict +from datetime import datetime, timezone +from pathlib import Path + +# fastapi 컨테이너 WORKDIR=/app — `from models...` import 가능하게 path 추가. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from sqlalchemy import func, select, text +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +# marker_worker 룰 미러 (1D pilot 패턴 따름 — drift 회피 위해 한 곳). +SKIP_DOC_TYPES = { + "발주서", "세금계산서", "명세표", + "Invoice", "Purchase_Order", "Estimate", "Statement", +} + +# handwritten 자동 skip 키워드 (commit 7d0fca2 의 marker_worker 정의와 동일). +HANDWRITTEN_REGEX = re.compile(r"필기|손글씨|handwritten|handwriting", re.IGNORECASE) + +# Inventory 의 page_count proxy — file_size 가 25MB 초과면 MAX_PAGES=200 초과 가능성 높음. +# 실제 marker_worker 가 PyMuPDF 로 page_count 확인 후 결정. 본 forecast 는 plan budget 추정용. +MAX_PAGES_FILESIZE_PROXY = 25 * 1024 * 1024 + +# stuck 판단 임계 — 평균 변환 시간 ~48s (1D 실측). 30분이면 의심, 60분이면 stuck likely. +STUCK_THRESHOLD_MIN = 60.0 + +# Disable flag — 사용자 또는 자동 abort 가드가 생성. cron 매 사이클 시작 시 체크. +DISABLE_FLAG_PATH = Path("/tmp/phase2_disable") # 컨테이너 내부 경로 (호스트 mount 별도) + +# marker-service ready endpoint (Phase 1B 명세). +MARKER_READY_URL = "http://marker-service:3300/ready" + + +# ─── helpers ──────────────────────────────────────────────────────────── + + +def _build_engine(): + db_url = os.environ["DATABASE_URL"] + return create_async_engine(db_url, pool_pre_ping=True) + + +def _file_size_band(file_size: int | None) -> str: + if file_size is None: + return "unknown" + if file_size < 1 * 1024 * 1024: + return "S" + if file_size < 10 * 1024 * 1024: + return "M" + return "L" + + +def _text_density(text_len: int, file_size: int | None) -> float | None: + if not file_size or file_size <= 0: + return None + return text_len / (file_size / 1024.0) + + +def _text_density_band(density: float | None) -> str: + if density is None: + return "unknown" + if density < 5.0: + return "scan-likely" + if density < 50.0: + return "mixed" + return "born-digital" + + +def _forecast_skip_reason(file_format: str | None, doc_type: str | None, + title: str | None, file_path: str | None, + file_size: int | None) -> str: + if file_format and file_format.lower() != "pdf": + return "unsupported_extension" + if doc_type and doc_type in SKIP_DOC_TYPES: + return "doctype_skip" + blob = " ".join(filter(None, [title or "", file_path or ""])) + if HANDWRITTEN_REGEX.search(blob): + return "handwritten_hint" + if file_size and file_size > MAX_PAGES_FILESIZE_PROXY: + return "over_max_pages_estimated" + return "none" + + +def _percentile(values: list[float], p: float) -> float | None: + if not values: + return None + s = sorted(values) + idx = max(0, min(len(s) - 1, int(len(s) * p))) + return s[idx] + + +def _check_marker_ready() -> bool: + try: + with urllib.request.urlopen(MARKER_READY_URL, timeout=5) as resp: + data = json.load(resp) + return data.get("status") == "ready" + except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError, OSError): + return False + + +# ─── inventory ────────────────────────────────────────────────────────── + +INVENTORY_COLUMNS = [ + "doc_id", "title", "file_path", "file_size", "file_size_band", + "text_len", "text_density", "text_density_band", + "doc_type", "forecast_skip_reason", "created_at", +] + + +async def cmd_inventory(output: Path) -> None: + engine = _build_engine() + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + from models.document import Document # type: ignore + + async with Session() as session: + rows = ( + await session.execute( + select( + Document.id, Document.title, Document.file_path, Document.file_size, + Document.file_format, Document.document_type, + Document.extracted_text, Document.created_at, + ).where( + Document.deleted_at.is_(None), + Document.file_format == "pdf", + Document.md_status == "pending", + ) + .order_by(Document.created_at.asc()) + ) + ).all() + + enriched = [] + for r in rows: + text_len = len(r.extracted_text or "") + density = _text_density(text_len, r.file_size) + enriched.append({ + "doc_id": r.id, + "title": (r.title or ""), + "file_path": (r.file_path or ""), + "file_size": r.file_size or 0, + "file_size_band": _file_size_band(r.file_size), + "text_len": text_len, + "text_density": round(density, 3) if density is not None else "", + "text_density_band": _text_density_band(density), + "doc_type": r.document_type or "", + "forecast_skip_reason": _forecast_skip_reason( + r.file_format, r.document_type, r.title, r.file_path, r.file_size + ), + "created_at": r.created_at.isoformat() if r.created_at else "", + }) + + output.parent.mkdir(parents=True, exist_ok=True) + with output.open("w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=INVENTORY_COLUMNS, extrasaction="ignore") + writer.writeheader() + for row in enriched: + writer.writerow(row) + + skip_dist = Counter(r["forecast_skip_reason"] for r in enriched) + band_dist = Counter(r["file_size_band"] for r in enriched) + density_dist = Counter(r["text_density_band"] for r in enriched) + type_dist = Counter(r["doc_type"] or "(NULL)" for r in enriched) + + print(f"\n## Phase 2 inventory") + print(f" total pending PDFs: {len(enriched)}") + print(f" output: {output}") + print(f"\n forecast_skip_reason:") + for k, v in skip_dist.most_common(): + print(f" {k:<30} {v:>4}") + print(f"\n file_size_band: ", dict(band_dist)) + print(f" text_density_band: ", dict(density_dist)) + print(f"\n doc_type top 10:") + for k, v in type_dist.most_common(10): + print(f" {k:<25} {v:>4}") + + convert_target = sum(1 for r in enriched if r["forecast_skip_reason"] == "none") + print(f"\n 변환 시도 후보 (forecast_skip_reason='none'): {convert_target}건") + print(f" 즉시 skip 예상: {len(enriched) - convert_target}건") + + await engine.dispose() + + +# ─── select-canary ────────────────────────────────────────────────────── + +CANARY_COLUMNS = [ + "doc_id", "title", "file_size", "file_size_band", + "text_density", "text_density_band", "doc_type", "bucket_label", +] + + +def _select_canary_buckets(candidates: list[dict], rng: random.Random) -> list[dict]: + """40건 stratified — plan §"Sample budget" 표 참조. + + large 6 / scan_likely 2 / study_note 10 / Academic_Paper 8 / Reference 6 / + {Standard,Manual,Specification} 4 / {Note,Report,Memo,NULL} 4 = 40 + """ + selected: list[dict] = [] + used: set[int] = set() + + def take(pool_filter, n: int, label: str) -> None: + avail = [c for c in candidates if c["doc_id"] not in used and pool_filter(c)] + rng.shuffle(avail) + for c in avail[:n]: + picked = dict(c) + picked["bucket_label"] = label + selected.append(picked) + used.add(c["doc_id"]) + + take(lambda c: c["file_size_band"] == "L", 6, "large") + take(lambda c: c["text_density_band"] == "scan-likely", 2, "scan_likely") + take(lambda c: c["doc_type"] == "study_note" and c["text_density_band"] == "born-digital", 10, "study_note") + take(lambda c: c["doc_type"] == "Academic_Paper" and c["text_density_band"] == "born-digital", 8, "Academic_Paper") + take(lambda c: c["doc_type"] == "Reference" and c["text_density_band"] == "born-digital", 6, "Reference") + take(lambda c: c["doc_type"] in {"Standard", "Manual", "Specification"}, 4, "tech_doc") + take(lambda c: (c["doc_type"] in {"Note", "Report", "Memo"} or not c["doc_type"]), 4, "minor_doc") + + if len(selected) < 40: + leftover = [c for c in candidates if c["doc_id"] not in used] + rng.shuffle(leftover) + for c in leftover[: 40 - len(selected)]: + picked = dict(c) + picked["bucket_label"] = "filler" + selected.append(picked) + used.add(c["doc_id"]) + + return selected[:40] + + +def cmd_select_canary(inventory: Path, output: Path, seed: int) -> None: + if not inventory.is_file(): + print(f"[error] inventory CSV 없음: {inventory}", file=sys.stderr) + sys.exit(1) + + candidates: list[dict] = [] + with inventory.open(encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + if row["forecast_skip_reason"] != "none": + continue + candidates.append({ + "doc_id": int(row["doc_id"]), + "title": row["title"], + "file_size": int(row["file_size"]) if row["file_size"] else 0, + "file_size_band": row["file_size_band"], + "text_density": row["text_density"], + "text_density_band": row["text_density_band"], + "doc_type": row["doc_type"], + }) + + rng = random.Random(seed) + selected = _select_canary_buckets(candidates, rng) + + output.parent.mkdir(parents=True, exist_ok=True) + with output.open("w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=CANARY_COLUMNS, extrasaction="ignore") + writer.writeheader() + for s in selected: + writer.writerow({col: s.get(col, "") for col in CANARY_COLUMNS}) + + bucket_dist = Counter(s["bucket_label"] for s in selected) + print(f"\n## Canary sample (seed={seed})") + print(f" selected: {len(selected)} / 40") + print(f" output: {output}") + print(f"\n bucket_label:") + for k, v in bucket_dist.most_common(): + print(f" {k:<20} {v:>3}") + print(f"\n doc_id list:") + for s in sorted(selected, key=lambda x: (x["bucket_label"], x["doc_id"])): + print(f" {s['doc_id']:>5} [{s['bucket_label']:<14}] {(s['title'] or '-')[:60]}") + + +# ─── enqueue (one-shot from CSV) ───────────────────────────────────────── + + +async def cmd_enqueue(csv_path: Path, dry_run: bool) -> None: + if not csv_path.is_file(): + print(f"[error] sample CSV 없음: {csv_path}", file=sys.stderr) + sys.exit(1) + + ids: list[int] = [] + with csv_path.open(encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + ids.append(int(row["doc_id"])) + + if not ids: + print("[abort] enqueue 대상 없음.") + return + + print(f"[targets] {len(ids)} doc_ids: {ids[:10]}{' …' if len(ids) > 10 else ''}") + + if dry_run: + print("[dry-run] --no-dry-run 으로 다시 실행하면 실제 enqueue.") + return + + if not _check_marker_ready(): + print("[abort] marker-service /ready 가 ready 가 아님. enqueue 중단.") + sys.exit(2) + + engine = _build_engine() + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + from models.queue import enqueue_stage # type: ignore + + enqueued, skipped = [], [] + async with Session() as session: + for doc_id in ids: + ok = await enqueue_stage(session, doc_id, "markdown") + (enqueued if ok else skipped).append(doc_id) + await session.commit() + + print(f"\nenqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}") + if skipped: + print(f" skipped ids: {skipped[:20]}{' …' if len(skipped) > 20 else ''}") + await engine.dispose() + + +# ─── nightly-enqueue (cron-friendly) ───────────────────────────────────── + +NIGHTLY_TSV_COLUMNS = [ + "date", "enqueued", "active_queue_at_start", "active_queue_oldest_age_min", + "pending_pool_remaining", "abort_reason", "marker_ready", +] + + +def _append_tsv(log_tsv: Path, row: dict) -> None: + log_tsv.parent.mkdir(parents=True, exist_ok=True) + new_file = not log_tsv.exists() or log_tsv.stat().st_size == 0 + with log_tsv.open("a", encoding="utf-8") as f: + if new_file: + f.write("\t".join(NIGHTLY_TSV_COLUMNS) + "\n") + f.write("\t".join(str(row.get(col, "")) for col in NIGHTLY_TSV_COLUMNS) + "\n") + + +async def cmd_nightly_enqueue(limit: int, max_active_queue: int, log_tsv: Path, + dry_run: bool) -> None: + today = datetime.now().strftime("%Y-%m-%d") + base_row = { + "date": today, + "enqueued": 0, + "active_queue_at_start": "", + "active_queue_oldest_age_min": "", + "pending_pool_remaining": "", + "abort_reason": "", + "marker_ready": "", + } + + # 1. disable flag + if DISABLE_FLAG_PATH.exists(): + base_row["abort_reason"] = "disable_flag" + _append_tsv(log_tsv, base_row) + print(f"[abort] disable flag 존재: {DISABLE_FLAG_PATH}") + return + + # 2. marker ready + marker_ready = _check_marker_ready() + base_row["marker_ready"] = 1 if marker_ready else 0 + if not marker_ready: + base_row["abort_reason"] = "marker_unhealthy" + _append_tsv(log_tsv, base_row) + print("[abort] marker-service /ready 가 ready 아님") + return + + engine = _build_engine() + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + from models.document import Document # type: ignore + from models.queue import ProcessingQueue, enqueue_stage # type: ignore + + # 3. active queue check + oldest age + async with Session() as session: + active_stats = (await session.execute(text(""" + SELECT COUNT(*) AS active_count, + COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(COALESCE(started_at, created_at))))/60, 0) AS oldest_age_min + FROM processing_queue + WHERE stage='markdown' AND status IN ('pending','processing') + """))).first() + active_count = int(active_stats[0]) + oldest_age = float(active_stats[1]) + + base_row["active_queue_at_start"] = active_count + base_row["active_queue_oldest_age_min"] = round(oldest_age, 1) + + if active_count > max_active_queue: + base_row["abort_reason"] = "active_queue_threshold" + _append_tsv(log_tsv, base_row) + print(f"[skip] active queue {active_count} > threshold {max_active_queue} (oldest {oldest_age:.1f}min)") + if oldest_age > STUCK_THRESHOLD_MIN: + print(f"[warn] oldest active row age {oldest_age:.1f}min > {STUCK_THRESHOLD_MIN}min — possible stuck") + await engine.dispose() + return + + # 4. pending pool query + async with Session() as session: + pool_rows = ( + await session.execute(text(f""" + SELECT id FROM documents + WHERE deleted_at IS NULL + AND file_format='pdf' + AND md_status='pending' + AND id NOT IN ( + SELECT document_id FROM processing_queue + WHERE stage='markdown' AND status IN ('pending','processing') + ) + ORDER BY created_at ASC + LIMIT {int(limit)} + """)) + ).all() + pool_ids = [r[0] for r in pool_rows] + + if not pool_ids: + base_row["abort_reason"] = "pool_empty" + base_row["pending_pool_remaining"] = 0 + _append_tsv(log_tsv, base_row) + print("[done] pending pool empty — Phase 2 backfill 자연 완료 신호.") + await engine.dispose() + return + + if dry_run: + print(f"[dry-run] would enqueue {len(pool_ids)} ids: {pool_ids[:10]}{' …' if len(pool_ids) > 10 else ''}") + await engine.dispose() + return + + # 5. enqueue + enqueued, skipped = [], [] + async with Session() as session: + for doc_id in pool_ids: + ok = await enqueue_stage(session, doc_id, "markdown") + (enqueued if ok else skipped).append(doc_id) + await session.commit() + + # 6. pending pool remaining (post-enqueue) + async with Session() as session: + remaining_count = ( + await session.execute(text(""" + SELECT COUNT(*) FROM documents d + WHERE d.deleted_at IS NULL + AND d.file_format='pdf' + AND d.md_status='pending' + AND d.id NOT IN ( + SELECT document_id FROM processing_queue + WHERE stage='markdown' AND status IN ('pending','processing') + ) + """)) + ).scalar() + + base_row["enqueued"] = len(enqueued) + base_row["pending_pool_remaining"] = int(remaining_count or 0) + _append_tsv(log_tsv, base_row) + + print(f"\n[ok] enqueued={len(enqueued)} skipped={len(skipped)} remaining_pool={remaining_count}") + await engine.dispose() + + +# ─── post-report ──────────────────────────────────────────────────────── + +POST_REPORT_COLUMNS = [ + "doc_id", "title", "final_md_status", "md_extraction_engine", "md_extraction_engine_version", + "elapsed_ms_estimate", "text_length_ratio", "markdown_heading_count", + "markdown_table_row_count", "markdown_image_count", "warnings", "phase2_processed_at", +] + +# 1D pilot baseline (project_markdown_canonical_layer.md, Phase 1D 결과 섹션). +BASELINE_1D = { + "success_rate": 0.92, # 23/25 + "skipped_rate": 0.08, + "failed_rate": 0.0, + "elapsed_p50_ms": 34000, + "elapsed_p90_ms": 112000, + "text_length_ratio_p50": 1.15, + "warnings_heading_jump_pct": 0.86, # 24/28 + "warnings_low_image_alt_pct": 0.89, # 25/28 +} + + +async def cmd_post_report(output_csv: Path, output_md: Path, + phase2_start: str | None) -> None: + """Phase 2 sweep 결과 집계 + 1D baseline 비교. + + phase2_start: ISO timestamp (예: '2026-05-03T00:00:00Z'). 이후 처리된 doc 만 집계. + None 이면 sample CSV 들에서 doc_id union 으로 한정 (별도 인자 미지원 — 기본 = NULL = 전 history). + 실용적으로는 phase2_start 사용 권장 (Phase 2 코드 push 후 시각). + """ + engine = _build_engine() + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + from models.document import Document # type: ignore + from models.queue import ProcessingQueue # type: ignore + + async with Session() as session: + cond = [Document.deleted_at.is_(None), Document.file_format == "pdf"] + if phase2_start: + cond.append(Document.md_generated_at >= phase2_start) + else: + cond.append(Document.md_generated_at.is_not(None)) + + rows = ( + await session.execute( + select( + Document.id, Document.title, Document.md_status, + Document.md_extraction_engine, Document.md_extraction_engine_version, + Document.md_extraction_quality, Document.md_extraction_error, + Document.md_generated_at, + ).where(*cond) + ) + ).all() + + ids = [r.id for r in rows] + if ids: + q_rows = ( + await session.execute( + select( + ProcessingQueue.document_id, + ProcessingQueue.started_at, + ProcessingQueue.completed_at, + ).where( + ProcessingQueue.document_id.in_(ids), + ProcessingQueue.stage == "markdown", + ) + ) + ).all() + else: + q_rows = [] + + elapsed: dict[int, float] = {} + for q in q_rows: + if q.started_at and q.completed_at: + elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds() * 1000 + + # CSV + enriched_rows = [] + for r in rows: + q = r.md_extraction_quality or {} + m = q.get("metrics") if isinstance(q, dict) else {} + m = m or {} + warnings = q.get("warnings") if isinstance(q, dict) else [] + enriched_rows.append({ + "doc_id": r.id, + "title": (r.title or "")[:120], + "final_md_status": r.md_status, + "md_extraction_engine": r.md_extraction_engine or "", + "md_extraction_engine_version": r.md_extraction_engine_version or "", + "elapsed_ms_estimate": int(elapsed.get(r.id, 0)) if r.id in elapsed else "", + "text_length_ratio": m.get("text_length_ratio", "") if m else "", + "markdown_heading_count": m.get("markdown_heading_count", "") if m else "", + "markdown_table_row_count": m.get("markdown_table_row_count", "") if m else "", + "markdown_image_count": m.get("markdown_image_count", "") if m else "", + "warnings": ",".join(warnings) if isinstance(warnings, list) else "", + "phase2_processed_at": r.md_generated_at.isoformat() if r.md_generated_at else "", + }) + + output_csv.parent.mkdir(parents=True, exist_ok=True) + with output_csv.open("w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=POST_REPORT_COLUMNS, extrasaction="ignore") + writer.writeheader() + for row in enriched_rows: + writer.writerow(row) + + # Markdown summary + total = len(rows) + by_status = Counter(r.md_status for r in rows) + success_n = by_status.get("success", 0) + skipped_n = by_status.get("skipped", 0) + failed_n = by_status.get("failed", 0) + + success_rows = [r for r in rows if r.md_status == "success"] + text_ratios = [] + heading_jump_count = 0 + low_image_alt_count = 0 + new_warnings: Counter = Counter() + KNOWN_WARNINGS = {"heading_hierarchy_jump", "low_image_alt_text_ratio"} + for r in success_rows: + q = r.md_extraction_quality or {} + m = q.get("metrics") if isinstance(q, dict) else {} + m = m or {} + if isinstance(m.get("text_length_ratio"), (int, float)): + text_ratios.append(float(m["text_length_ratio"])) + warnings = q.get("warnings") if isinstance(q, dict) else [] + if isinstance(warnings, list): + if "heading_hierarchy_jump" in warnings: + heading_jump_count += 1 + if "low_image_alt_text_ratio" in warnings: + low_image_alt_count += 1 + for w in warnings: + if w not in KNOWN_WARNINGS: + new_warnings[w] += 1 + + elapsed_succ = [elapsed[r.id] for r in success_rows if r.id in elapsed] + elapsed_p50 = _percentile(elapsed_succ, 0.5) + elapsed_p90 = _percentile(elapsed_succ, 0.9) + text_ratio_p50 = _percentile(text_ratios, 0.5) + + skip_reasons = Counter() + for r in rows: + if r.md_status == "skipped": + reason = (r.md_extraction_error or "unknown").split(":", 1)[0] + skip_reasons[reason.strip()] += 1 + + failed_rows = [r for r in rows if r.md_status == "failed"] + + outliers = [] + for r in success_rows: + e = elapsed.get(r.id) + if e and e > 300_000: + outliers.append((r.id, f"elapsed_ms={int(e)}")) + q = r.md_extraction_quality or {} + m = q.get("metrics") if isinstance(q, dict) else {} + m = m or {} + ratio = m.get("text_length_ratio") + if isinstance(ratio, (int, float)) and (ratio < 0.5 or ratio > 10): + outliers.append((r.id, f"text_length_ratio={ratio:.2f}")) + + def pct(n: int, d: int) -> str: + return f"{n/d*100:.0f}%" if d else "-" + + def delta_pp(actual_n: int, actual_d: int, baseline: float) -> str: + if not actual_d: + return "-" + return f"{(actual_n/actual_d - baseline)*100:+.1f}pp" + + def fmt_num(v) -> str: + if v is None: + return "-" + if isinstance(v, float): + return f"{v:.2f}" + return str(v) + + scope_line = "Scope: file_format='pdf' AND deleted_at IS NULL" + if phase2_start: + scope_line += f" AND md_generated_at >= {phase2_start}" + else: + scope_line += " AND md_generated_at IS NOT NULL" + + md_lines = [ + "# Phase 2 Post-report", + "", + f"Generated: {datetime.now(timezone.utc).isoformat()}", + scope_line, + "", + f"## 처리 분포 (총 {total}건)", + "| status | count | rate |", + "|---|---|---|", + f"| success | {success_n} | {pct(success_n, total)} |", + f"| skipped | {skipped_n} | {pct(skipped_n, total)} |", + f"| failed | {failed_n} | {pct(failed_n, total)} |", + "", + "## vs 1D baseline 비교", + "| 메트릭 | 1D | Phase 2 | delta |", + "|---|---|---|---|", + f"| success rate | 92% | {pct(success_n, total)} | {delta_pp(success_n, total, BASELINE_1D['success_rate'])} |", + f"| skipped rate | 8% | {pct(skipped_n, total)} | {delta_pp(skipped_n, total, BASELINE_1D['skipped_rate'])} |", + f"| failed rate | 0% | {pct(failed_n, total)} | {delta_pp(failed_n, total, BASELINE_1D['failed_rate'])} |", + f"| elapsed_ms p50 | 34000 | {fmt_num(int(elapsed_p50)) if elapsed_p50 else '-'} | - |", + f"| elapsed_ms p90 | 112000 | {fmt_num(int(elapsed_p90)) if elapsed_p90 else '-'} | - |", + f"| text_length_ratio p50 | 1.15 | {fmt_num(text_ratio_p50)} | - |", + f"| warnings: heading_hierarchy_jump | 86% | {pct(heading_jump_count, success_n)} | - |", + f"| warnings: low_image_alt_text_ratio | 89% | {pct(low_image_alt_count, success_n)} | - |", + "", + "## skip reason 분포", + ] + for reason, c in skip_reasons.most_common(): + md_lines.append(f"- `{reason}`: {c}건") + if not skip_reasons: + md_lines.append("- (no skipped)") + md_lines += [ + f"", + f"## failed 케이스 ({len(failed_rows)}건)", + ] + for r in failed_rows: + md_lines.append(f"- doc {r.id} `{(r.title or '-')[:60]}`: {r.md_extraction_error or '-'}") + if not failed_rows: + md_lines.append("- (no failed)") + md_lines += [ + f"", + f"## outlier ({len(outliers)}건)", + ] + for doc_id, note in outliers[:30]: + md_lines.append(f"- doc {doc_id}: {note}") + if not outliers: + md_lines.append("- (no outliers)") + if new_warnings: + md_lines += [f"", f"## 신규 warning 종류 (1D 미관측)"] + for w, c in new_warnings.most_common(): + md_lines.append(f"- `{w}`: {c}건") + + output_md.parent.mkdir(parents=True, exist_ok=True) + output_md.write_text("\n".join(md_lines) + "\n", encoding="utf-8") + + print(f"\n## Phase 2 post-report") + print(f" CSV: {output_csv}") + print(f" MD: {output_md}") + print(f" scope: {total} rows (success {success_n} / skipped {skipped_n} / failed {failed_n})") + + await engine.dispose() + + +# ─── main ─────────────────────────────────────────────────────────────── + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Phase 2 markdown canonical layer full backfill", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + sub = parser.add_subparsers(dest="cmd", required=True) + + p_inv = sub.add_parser("inventory", help="pending PDFs dry-run inventory CSV") + p_inv.add_argument("--output", type=Path, required=True) + + p_can = sub.add_parser("select-canary", help="stratified 40 canary sample CSV") + p_can.add_argument("--inventory", type=Path, required=True) + p_can.add_argument("--output", type=Path, required=True) + p_can.add_argument("--seed", type=int, default=20260503) + + p_enq = sub.add_parser("enqueue", help="enqueue from sample CSV (one-shot)") + p_enq.add_argument("--csv", type=Path, required=True) + g = p_enq.add_mutually_exclusive_group() + g.add_argument("--dry-run", dest="dry_run", action="store_true", default=True) + g.add_argument("--no-dry-run", dest="dry_run", action="store_false") + + p_nig = sub.add_parser("nightly-enqueue", help="nightly cron sweep") + p_nig.add_argument("--limit", type=int, default=50) + p_nig.add_argument("--max-active-queue", type=int, default=5) + p_nig.add_argument("--log-tsv", type=Path, required=True) + p_nig.add_argument("--dry-run", action="store_true", default=False) + + p_rep = sub.add_parser("post-report", help="final results CSV + markdown summary") + p_rep.add_argument("--output-csv", type=Path, required=True) + p_rep.add_argument("--output-md", type=Path, required=True) + p_rep.add_argument("--phase2-start", type=str, default=None, + help="ISO timestamp; only docs with md_generated_at >= this are counted") + + args = parser.parse_args() + if args.cmd == "inventory": + asyncio.run(cmd_inventory(args.output)) + elif args.cmd == "select-canary": + cmd_select_canary(args.inventory, args.output, args.seed) + elif args.cmd == "enqueue": + asyncio.run(cmd_enqueue(args.csv, args.dry_run)) + elif args.cmd == "nightly-enqueue": + asyncio.run(cmd_nightly_enqueue(args.limit, args.max_active_queue, args.log_tsv, args.dry_run)) + elif args.cmd == "post-report": + asyncio.run(cmd_post_report(args.output_csv, args.output_md, args.phase2_start)) + + +if __name__ == "__main__": + main()