feat(scripts): Phase 2 markdown backfill — script + README

- scripts/phase2_backfill.py: 5 subcommands
  - inventory: pending PDFs dry-run CSV with skip forecast
  - select-canary: stratified 40 sample (seed 20260503)
  - enqueue: one-shot from sample CSV (--no-dry-run gate)
  - nightly-enqueue: cron-friendly with disable flag / marker /ready /
    active-queue threshold (oldest_age stuck guard) / DB pool guards
  - post-report: final state CSV + 1D baseline comparison MD
- evals/markdown/README.md: Phase 2 section appended
- plan: ~/.claude/plans/iridescent-gathering-clover.md
- depends on Phase 1B handwritten skip 7d0fca2 (marker_worker side guard)
This commit is contained in:
2026-05-02 23:44:59 +00:00
parent 5185501bbd
commit c81ce8d366
2 changed files with 901 additions and 0 deletions
+111
View File
@@ -117,3 +117,114 @@ docker compose exec fastapi python /app/scripts/phase1d_pilot.py select \
```
**enqueue 의 `--yes` 또는 `--no-dry-run` 류 실행은 별도 사용자 승인 + 야간 단발 sweep 윈도우 (23:00~03:00 KST) 안에서만**. 30건 backfill = marker-service BATCH_SIZE=1 × 평균 5분/건 ≈ 2.5h.
---
# Phase 2 — Full Backfill (legacy pending PDFs)
> Plan: `~/.claude/plans/iridescent-gathering-clover.md`
> Script: `scripts/phase2_backfill.py` (subcommands: inventory / select-canary / enqueue / nightly-enqueue / post-report)
## 목적
1D pilot 결과 = engineering go signal. legacy pending PDF (1D 후 잔여 ~237건) 을 marker_worker 로 변환해 `md_status='success'` 누적. **신규 업로드 우선권 보존, 야간 저부하 sweep, DB state 기반 idempotent checkpoint**.
진행 로드맵: **2-A dry-run inventory → 2-B canary 40건 → 2-C nightly sweep ~4-5 nights → 2-D post-report**.
## 파일
| 파일 | 역할 | 갱신 시점 |
|---|---|---|
| `phase2_inventory.csv` | pending PDFs dry-run inventory + skip forecast | 2-A 종료 (commit, 1회) |
| `phase2_canary_sample.csv` | stratified 40건 canary sample (시드 `20260503`) | 2-B(a) (commit) |
| `phase2_canary_result.md` | canary 결과 요약 + 1D 비교 + GO/HALT 결정 근거 | 2-B 종료 (commit) |
| `phase2_nightly_log.tsv` | 야간 sweep 한 줄/일 (date / enqueued / active_queue_at_start / active_queue_oldest_age_min / pending_pool_remaining / abort_reason / marker_ready) | append 매 sweep, 주 1회 commit |
| `phase2_post_report.csv` | Phase 2 sweep 처리된 doc 별 final state + quality | 2-D (commit) |
| `phase2_post_report.md` | 처리 분포 + 1D baseline 비교 + skip/failed/outlier 목록 | 2-D (commit) |
## Subcommand 사용법
### inventory (read-only, dry-run)
```bash
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py inventory \
--output /app/evals/markdown/phase2_inventory.csv
```
- pending PDFs 전체에 대해 doc_id / file_size / text_density / doc_type / forecast_skip_reason 적재.
- forecast_skip_reason ∈ {unsupported_extension / doctype_skip / handwritten_hint / over_max_pages_estimated / none}. 'none' = 변환 시도 후보.
- handwritten_hint = title/path 에 `필기|손글씨|handwritten|handwriting` 매칭 (marker_worker 의 7d0fca2 룰 미러).
- over_max_pages_estimated = file_size > 25MB proxy. 실 page_count 는 marker_worker 가 PyMuPDF 로 결정.
### select-canary (재현성 시드)
```bash
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py select-canary \
--inventory /app/evals/markdown/phase2_inventory.csv \
--output /app/evals/markdown/phase2_canary_sample.csv \
--seed 20260503
```
- 40건 buckets: large 6 / scan_likely 2 / study_note 10 / Academic_Paper 8 / Reference 6 / {Standard,Manual,Specification} 4 / {Note,Report,Memo,NULL} 4
- inventory 의 `forecast_skip_reason='none'` 만 선정 후보.
- 시드 고정 → 재실행 시 동일 sample.
### enqueue (one-shot, 사용자 승인 게이트)
```bash
# dry-run (default)
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py enqueue \
--csv /app/evals/markdown/phase2_canary_sample.csv
# actual (사용자 승인 후)
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py enqueue \
--csv /app/evals/markdown/phase2_canary_sample.csv --no-dry-run
```
- marker-service `/ready` 사전 검증.
- `enqueue_stage` idempotent — 중복 호출 안전.
### nightly-enqueue (cron / manual)
```bash
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py nightly-enqueue \
--limit 50 --max-active-queue 5 \
--log-tsv /app/evals/markdown/phase2_nightly_log.tsv
```
- 가드 순서: disable flag (`/tmp/phase2_disable`) → marker /ready → active_queue ≤ threshold → DB pool 비어있지 않음 → enqueue.
- 매 sweep log_tsv 한 줄. abort_reason ∈ {disable_flag / marker_unhealthy / active_queue_threshold / pool_empty / empty}.
- pool_empty = Phase 2 자연 완료 신호 (cron 제거 hard gate trigger).
### post-report
```bash
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py post-report \
--output-csv /app/evals/markdown/phase2_post_report.csv \
--output-md /app/evals/markdown/phase2_post_report.md \
--phase2-start 2026-05-03T00:00:00Z
```
- `--phase2-start` ISO timestamp 이후 `md_generated_at` 만 집계 (Phase 2 코드 push 시점 권장).
- 1D baseline (success 92% / elapsed_p50 34s / text_length_ratio_p50 1.15) 와 비교.
- outlier 후보: elapsed_ms > 300s, text_length_ratio < 0.5 또는 > 10, 신규 warning 종류.
## 의사결정 게이트
### 2-B canary GO/HALT
- success ≥ 36/40 (90%) AND failed ≤ 2 AND skipped ≤ 6 → **2-C 진입 GO**
- 위 미충족 → HALT, 사용자 보고 후 재검토
### 2-C nightly abort
- 1 night 안에 failed > 5 → script 가 disable flag 자동 생성
- marker-service `/ready` 실패 → 그 sweep 건너뜀, 다음 sweep 재시도
- active_queue_oldest_age_min > 60 (stuck 임계) → log 에 [warn], 사용자 morning check 으로 판단
### 2-D 종료 hard gate (Phase 2 closed 선언 직전)
- (cron 모드) `crontab -l | grep phase2_backfill` 결과 비어 있어야 함
- `~/.phase2_disable` 파일 정리 됨
- pending PDF (`md_status='pending'`, `file_format='pdf'`) ≤ 5
- processing_queue markdown active = 0
## 1D 와의 차이
| 항목 | 1D | Phase 2 |
|---|---|---|
| 목적 | failure mode 진단 | 풀 변환 |
| 대상 | 30건 stratified | 237건 잔여 |
| sample_source | existing_success + controlled_backfill | controlled_backfill only |
| 처리 모드 | one-shot cron (1회) | nightly cron (~4-5 nights) |
| 평가 | 사용자 5축 rubric | marker 자가 metrics + 1D baseline 비교 |
| anchor 보존 | doc 4809 forced_include | (재처리 안 함) |
| handwritten | over-sample (3건) | marker_worker 자동 skip 신뢰 |
+790
View File
@@ -0,0 +1,790 @@
"""Phase 2 — markdown canonical layer full backfill.
* legacy pending PDF (Phase 1B/1C/1D 후 잔여) 을 야간 sweep + canary 단계로 변환.
* 1D pilot (25건 controlled_backfill, 23 success / 2 skipped / 0 failed) 가 engineering go signal.
* handwritten 자동 skip (commit 7d0fca2) 와 marker_worker SKIP_DOC_TYPES / MAX_PAGES guard 모두 운영 시점 가드. 본 스크립트는 inventory · sample · enqueue 만 담당.
Subcommands:
inventory pending PDFs 의 dry-run inventory CSV 작성 (skip forecast 포함)
select-canary inventory 에서 stratified 40건 canary sample CSV 작성 (재현성 시드)
enqueue sample CSV 의 doc_id 들을 markdown 큐에 enqueue (one-shot, --no-dry-run 필요)
nightly-enqueue 야간 sweep — disable flag / marker ready / active-queue threshold / DB pool 가드 후 limit 만큼 enqueue + log_tsv
post-report 최종 결과 CSV + markdown 요약 (1D baseline 비교 포함)
실행 (GPU 서버):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py inventory \
--output /app/evals/markdown/phase2_inventory.csv
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py select-canary \
--inventory /app/evals/markdown/phase2_inventory.csv \
--output /app/evals/markdown/phase2_canary_sample.csv \
--seed 20260503
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py enqueue \
--csv /app/evals/markdown/phase2_canary_sample.csv --no-dry-run
# cron (nightly):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py nightly-enqueue \
--limit 50 --max-active-queue 5 \
--log-tsv /app/evals/markdown/phase2_nightly_log.tsv
docker exec hyungi_document_server-fastapi-1 python /app/scripts/phase2_backfill.py post-report \
--output-csv /app/evals/markdown/phase2_post_report.csv \
--output-md /app/evals/markdown/phase2_post_report.md
"""
import argparse
import asyncio
import csv
import json
import os
import random
import re
import sys
import urllib.error
import urllib.request
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
# fastapi 컨테이너 WORKDIR=/app — `from models...` import 가능하게 path 추가.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
# marker_worker 룰 미러 (1D pilot 패턴 따름 — drift 회피 위해 한 곳).
SKIP_DOC_TYPES = {
"발주서", "세금계산서", "명세표",
"Invoice", "Purchase_Order", "Estimate", "Statement",
}
# handwritten 자동 skip 키워드 (commit 7d0fca2 의 marker_worker 정의와 동일).
HANDWRITTEN_REGEX = re.compile(r"필기|손글씨|handwritten|handwriting", re.IGNORECASE)
# Inventory 의 page_count proxy — file_size 가 25MB 초과면 MAX_PAGES=200 초과 가능성 높음.
# 실제 marker_worker 가 PyMuPDF 로 page_count 확인 후 결정. 본 forecast 는 plan budget 추정용.
MAX_PAGES_FILESIZE_PROXY = 25 * 1024 * 1024
# stuck 판단 임계 — 평균 변환 시간 ~48s (1D 실측). 30분이면 의심, 60분이면 stuck likely.
STUCK_THRESHOLD_MIN = 60.0
# Disable flag — 사용자 또는 자동 abort 가드가 생성. cron 매 사이클 시작 시 체크.
DISABLE_FLAG_PATH = Path("/tmp/phase2_disable") # 컨테이너 내부 경로 (호스트 mount 별도)
# marker-service ready endpoint (Phase 1B 명세).
MARKER_READY_URL = "http://marker-service:3300/ready"
# ─── helpers ────────────────────────────────────────────────────────────
def _build_engine():
db_url = os.environ["DATABASE_URL"]
return create_async_engine(db_url, pool_pre_ping=True)
def _file_size_band(file_size: int | None) -> str:
if file_size is None:
return "unknown"
if file_size < 1 * 1024 * 1024:
return "S"
if file_size < 10 * 1024 * 1024:
return "M"
return "L"
def _text_density(text_len: int, file_size: int | None) -> float | None:
if not file_size or file_size <= 0:
return None
return text_len / (file_size / 1024.0)
def _text_density_band(density: float | None) -> str:
if density is None:
return "unknown"
if density < 5.0:
return "scan-likely"
if density < 50.0:
return "mixed"
return "born-digital"
def _forecast_skip_reason(file_format: str | None, doc_type: str | None,
title: str | None, file_path: str | None,
file_size: int | None) -> str:
if file_format and file_format.lower() != "pdf":
return "unsupported_extension"
if doc_type and doc_type in SKIP_DOC_TYPES:
return "doctype_skip"
blob = " ".join(filter(None, [title or "", file_path or ""]))
if HANDWRITTEN_REGEX.search(blob):
return "handwritten_hint"
if file_size and file_size > MAX_PAGES_FILESIZE_PROXY:
return "over_max_pages_estimated"
return "none"
def _percentile(values: list[float], p: float) -> float | None:
if not values:
return None
s = sorted(values)
idx = max(0, min(len(s) - 1, int(len(s) * p)))
return s[idx]
def _check_marker_ready() -> bool:
try:
with urllib.request.urlopen(MARKER_READY_URL, timeout=5) as resp:
data = json.load(resp)
return data.get("status") == "ready"
except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError, OSError):
return False
# ─── inventory ──────────────────────────────────────────────────────────
INVENTORY_COLUMNS = [
"doc_id", "title", "file_path", "file_size", "file_size_band",
"text_len", "text_density", "text_density_band",
"doc_type", "forecast_skip_reason", "created_at",
]
async def cmd_inventory(output: Path) -> None:
engine = _build_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
from models.document import Document # type: ignore
async with Session() as session:
rows = (
await session.execute(
select(
Document.id, Document.title, Document.file_path, Document.file_size,
Document.file_format, Document.document_type,
Document.extracted_text, Document.created_at,
).where(
Document.deleted_at.is_(None),
Document.file_format == "pdf",
Document.md_status == "pending",
)
.order_by(Document.created_at.asc())
)
).all()
enriched = []
for r in rows:
text_len = len(r.extracted_text or "")
density = _text_density(text_len, r.file_size)
enriched.append({
"doc_id": r.id,
"title": (r.title or ""),
"file_path": (r.file_path or ""),
"file_size": r.file_size or 0,
"file_size_band": _file_size_band(r.file_size),
"text_len": text_len,
"text_density": round(density, 3) if density is not None else "",
"text_density_band": _text_density_band(density),
"doc_type": r.document_type or "",
"forecast_skip_reason": _forecast_skip_reason(
r.file_format, r.document_type, r.title, r.file_path, r.file_size
),
"created_at": r.created_at.isoformat() if r.created_at else "",
})
output.parent.mkdir(parents=True, exist_ok=True)
with output.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=INVENTORY_COLUMNS, extrasaction="ignore")
writer.writeheader()
for row in enriched:
writer.writerow(row)
skip_dist = Counter(r["forecast_skip_reason"] for r in enriched)
band_dist = Counter(r["file_size_band"] for r in enriched)
density_dist = Counter(r["text_density_band"] for r in enriched)
type_dist = Counter(r["doc_type"] or "(NULL)" for r in enriched)
print(f"\n## Phase 2 inventory")
print(f" total pending PDFs: {len(enriched)}")
print(f" output: {output}")
print(f"\n forecast_skip_reason:")
for k, v in skip_dist.most_common():
print(f" {k:<30} {v:>4}")
print(f"\n file_size_band: ", dict(band_dist))
print(f" text_density_band: ", dict(density_dist))
print(f"\n doc_type top 10:")
for k, v in type_dist.most_common(10):
print(f" {k:<25} {v:>4}")
convert_target = sum(1 for r in enriched if r["forecast_skip_reason"] == "none")
print(f"\n 변환 시도 후보 (forecast_skip_reason='none'): {convert_target}")
print(f" 즉시 skip 예상: {len(enriched) - convert_target}")
await engine.dispose()
# ─── select-canary ──────────────────────────────────────────────────────
CANARY_COLUMNS = [
"doc_id", "title", "file_size", "file_size_band",
"text_density", "text_density_band", "doc_type", "bucket_label",
]
def _select_canary_buckets(candidates: list[dict], rng: random.Random) -> list[dict]:
"""40건 stratified — plan §"Sample budget" 표 참조.
large 6 / scan_likely 2 / study_note 10 / Academic_Paper 8 / Reference 6 /
{Standard,Manual,Specification} 4 / {Note,Report,Memo,NULL} 4 = 40
"""
selected: list[dict] = []
used: set[int] = set()
def take(pool_filter, n: int, label: str) -> None:
avail = [c for c in candidates if c["doc_id"] not in used and pool_filter(c)]
rng.shuffle(avail)
for c in avail[:n]:
picked = dict(c)
picked["bucket_label"] = label
selected.append(picked)
used.add(c["doc_id"])
take(lambda c: c["file_size_band"] == "L", 6, "large")
take(lambda c: c["text_density_band"] == "scan-likely", 2, "scan_likely")
take(lambda c: c["doc_type"] == "study_note" and c["text_density_band"] == "born-digital", 10, "study_note")
take(lambda c: c["doc_type"] == "Academic_Paper" and c["text_density_band"] == "born-digital", 8, "Academic_Paper")
take(lambda c: c["doc_type"] == "Reference" and c["text_density_band"] == "born-digital", 6, "Reference")
take(lambda c: c["doc_type"] in {"Standard", "Manual", "Specification"}, 4, "tech_doc")
take(lambda c: (c["doc_type"] in {"Note", "Report", "Memo"} or not c["doc_type"]), 4, "minor_doc")
if len(selected) < 40:
leftover = [c for c in candidates if c["doc_id"] not in used]
rng.shuffle(leftover)
for c in leftover[: 40 - len(selected)]:
picked = dict(c)
picked["bucket_label"] = "filler"
selected.append(picked)
used.add(c["doc_id"])
return selected[:40]
def cmd_select_canary(inventory: Path, output: Path, seed: int) -> None:
if not inventory.is_file():
print(f"[error] inventory CSV 없음: {inventory}", file=sys.stderr)
sys.exit(1)
candidates: list[dict] = []
with inventory.open(encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
if row["forecast_skip_reason"] != "none":
continue
candidates.append({
"doc_id": int(row["doc_id"]),
"title": row["title"],
"file_size": int(row["file_size"]) if row["file_size"] else 0,
"file_size_band": row["file_size_band"],
"text_density": row["text_density"],
"text_density_band": row["text_density_band"],
"doc_type": row["doc_type"],
})
rng = random.Random(seed)
selected = _select_canary_buckets(candidates, rng)
output.parent.mkdir(parents=True, exist_ok=True)
with output.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=CANARY_COLUMNS, extrasaction="ignore")
writer.writeheader()
for s in selected:
writer.writerow({col: s.get(col, "") for col in CANARY_COLUMNS})
bucket_dist = Counter(s["bucket_label"] for s in selected)
print(f"\n## Canary sample (seed={seed})")
print(f" selected: {len(selected)} / 40")
print(f" output: {output}")
print(f"\n bucket_label:")
for k, v in bucket_dist.most_common():
print(f" {k:<20} {v:>3}")
print(f"\n doc_id list:")
for s in sorted(selected, key=lambda x: (x["bucket_label"], x["doc_id"])):
print(f" {s['doc_id']:>5} [{s['bucket_label']:<14}] {(s['title'] or '-')[:60]}")
# ─── enqueue (one-shot from CSV) ─────────────────────────────────────────
async def cmd_enqueue(csv_path: Path, dry_run: bool) -> None:
if not csv_path.is_file():
print(f"[error] sample CSV 없음: {csv_path}", file=sys.stderr)
sys.exit(1)
ids: list[int] = []
with csv_path.open(encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
ids.append(int(row["doc_id"]))
if not ids:
print("[abort] enqueue 대상 없음.")
return
print(f"[targets] {len(ids)} doc_ids: {ids[:10]}{'' if len(ids) > 10 else ''}")
if dry_run:
print("[dry-run] --no-dry-run 으로 다시 실행하면 실제 enqueue.")
return
if not _check_marker_ready():
print("[abort] marker-service /ready 가 ready 가 아님. enqueue 중단.")
sys.exit(2)
engine = _build_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
from models.queue import enqueue_stage # type: ignore
enqueued, skipped = [], []
async with Session() as session:
for doc_id in ids:
ok = await enqueue_stage(session, doc_id, "markdown")
(enqueued if ok else skipped).append(doc_id)
await session.commit()
print(f"\nenqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}")
if skipped:
print(f" skipped ids: {skipped[:20]}{'' if len(skipped) > 20 else ''}")
await engine.dispose()
# ─── nightly-enqueue (cron-friendly) ─────────────────────────────────────
NIGHTLY_TSV_COLUMNS = [
"date", "enqueued", "active_queue_at_start", "active_queue_oldest_age_min",
"pending_pool_remaining", "abort_reason", "marker_ready",
]
def _append_tsv(log_tsv: Path, row: dict) -> None:
log_tsv.parent.mkdir(parents=True, exist_ok=True)
new_file = not log_tsv.exists() or log_tsv.stat().st_size == 0
with log_tsv.open("a", encoding="utf-8") as f:
if new_file:
f.write("\t".join(NIGHTLY_TSV_COLUMNS) + "\n")
f.write("\t".join(str(row.get(col, "")) for col in NIGHTLY_TSV_COLUMNS) + "\n")
async def cmd_nightly_enqueue(limit: int, max_active_queue: int, log_tsv: Path,
dry_run: bool) -> None:
today = datetime.now().strftime("%Y-%m-%d")
base_row = {
"date": today,
"enqueued": 0,
"active_queue_at_start": "",
"active_queue_oldest_age_min": "",
"pending_pool_remaining": "",
"abort_reason": "",
"marker_ready": "",
}
# 1. disable flag
if DISABLE_FLAG_PATH.exists():
base_row["abort_reason"] = "disable_flag"
_append_tsv(log_tsv, base_row)
print(f"[abort] disable flag 존재: {DISABLE_FLAG_PATH}")
return
# 2. marker ready
marker_ready = _check_marker_ready()
base_row["marker_ready"] = 1 if marker_ready else 0
if not marker_ready:
base_row["abort_reason"] = "marker_unhealthy"
_append_tsv(log_tsv, base_row)
print("[abort] marker-service /ready 가 ready 아님")
return
engine = _build_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
from models.document import Document # type: ignore
from models.queue import ProcessingQueue, enqueue_stage # type: ignore
# 3. active queue check + oldest age
async with Session() as session:
active_stats = (await session.execute(text("""
SELECT COUNT(*) AS active_count,
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(COALESCE(started_at, created_at))))/60, 0) AS oldest_age_min
FROM processing_queue
WHERE stage='markdown' AND status IN ('pending','processing')
"""))).first()
active_count = int(active_stats[0])
oldest_age = float(active_stats[1])
base_row["active_queue_at_start"] = active_count
base_row["active_queue_oldest_age_min"] = round(oldest_age, 1)
if active_count > max_active_queue:
base_row["abort_reason"] = "active_queue_threshold"
_append_tsv(log_tsv, base_row)
print(f"[skip] active queue {active_count} > threshold {max_active_queue} (oldest {oldest_age:.1f}min)")
if oldest_age > STUCK_THRESHOLD_MIN:
print(f"[warn] oldest active row age {oldest_age:.1f}min > {STUCK_THRESHOLD_MIN}min — possible stuck")
await engine.dispose()
return
# 4. pending pool query
async with Session() as session:
pool_rows = (
await session.execute(text(f"""
SELECT id FROM documents
WHERE deleted_at IS NULL
AND file_format='pdf'
AND md_status='pending'
AND id NOT IN (
SELECT document_id FROM processing_queue
WHERE stage='markdown' AND status IN ('pending','processing')
)
ORDER BY created_at ASC
LIMIT {int(limit)}
"""))
).all()
pool_ids = [r[0] for r in pool_rows]
if not pool_ids:
base_row["abort_reason"] = "pool_empty"
base_row["pending_pool_remaining"] = 0
_append_tsv(log_tsv, base_row)
print("[done] pending pool empty — Phase 2 backfill 자연 완료 신호.")
await engine.dispose()
return
if dry_run:
print(f"[dry-run] would enqueue {len(pool_ids)} ids: {pool_ids[:10]}{'' if len(pool_ids) > 10 else ''}")
await engine.dispose()
return
# 5. enqueue
enqueued, skipped = [], []
async with Session() as session:
for doc_id in pool_ids:
ok = await enqueue_stage(session, doc_id, "markdown")
(enqueued if ok else skipped).append(doc_id)
await session.commit()
# 6. pending pool remaining (post-enqueue)
async with Session() as session:
remaining_count = (
await session.execute(text("""
SELECT COUNT(*) FROM documents d
WHERE d.deleted_at IS NULL
AND d.file_format='pdf'
AND d.md_status='pending'
AND d.id NOT IN (
SELECT document_id FROM processing_queue
WHERE stage='markdown' AND status IN ('pending','processing')
)
"""))
).scalar()
base_row["enqueued"] = len(enqueued)
base_row["pending_pool_remaining"] = int(remaining_count or 0)
_append_tsv(log_tsv, base_row)
print(f"\n[ok] enqueued={len(enqueued)} skipped={len(skipped)} remaining_pool={remaining_count}")
await engine.dispose()
# ─── post-report ────────────────────────────────────────────────────────
POST_REPORT_COLUMNS = [
"doc_id", "title", "final_md_status", "md_extraction_engine", "md_extraction_engine_version",
"elapsed_ms_estimate", "text_length_ratio", "markdown_heading_count",
"markdown_table_row_count", "markdown_image_count", "warnings", "phase2_processed_at",
]
# 1D pilot baseline (project_markdown_canonical_layer.md, Phase 1D 결과 섹션).
BASELINE_1D = {
"success_rate": 0.92, # 23/25
"skipped_rate": 0.08,
"failed_rate": 0.0,
"elapsed_p50_ms": 34000,
"elapsed_p90_ms": 112000,
"text_length_ratio_p50": 1.15,
"warnings_heading_jump_pct": 0.86, # 24/28
"warnings_low_image_alt_pct": 0.89, # 25/28
}
async def cmd_post_report(output_csv: Path, output_md: Path,
phase2_start: str | None) -> None:
"""Phase 2 sweep 결과 집계 + 1D baseline 비교.
phase2_start: ISO timestamp (예: '2026-05-03T00:00:00Z'). 이후 처리된 doc 만 집계.
None 이면 sample CSV 들에서 doc_id union 으로 한정 (별도 인자 미지원 — 기본 = NULL = 전 history).
실용적으로는 phase2_start 사용 권장 (Phase 2 코드 push 후 시각).
"""
engine = _build_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
from models.document import Document # type: ignore
from models.queue import ProcessingQueue # type: ignore
async with Session() as session:
cond = [Document.deleted_at.is_(None), Document.file_format == "pdf"]
if phase2_start:
cond.append(Document.md_generated_at >= phase2_start)
else:
cond.append(Document.md_generated_at.is_not(None))
rows = (
await session.execute(
select(
Document.id, Document.title, Document.md_status,
Document.md_extraction_engine, Document.md_extraction_engine_version,
Document.md_extraction_quality, Document.md_extraction_error,
Document.md_generated_at,
).where(*cond)
)
).all()
ids = [r.id for r in rows]
if ids:
q_rows = (
await session.execute(
select(
ProcessingQueue.document_id,
ProcessingQueue.started_at,
ProcessingQueue.completed_at,
).where(
ProcessingQueue.document_id.in_(ids),
ProcessingQueue.stage == "markdown",
)
)
).all()
else:
q_rows = []
elapsed: dict[int, float] = {}
for q in q_rows:
if q.started_at and q.completed_at:
elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds() * 1000
# CSV
enriched_rows = []
for r in rows:
q = r.md_extraction_quality or {}
m = q.get("metrics") if isinstance(q, dict) else {}
m = m or {}
warnings = q.get("warnings") if isinstance(q, dict) else []
enriched_rows.append({
"doc_id": r.id,
"title": (r.title or "")[:120],
"final_md_status": r.md_status,
"md_extraction_engine": r.md_extraction_engine or "",
"md_extraction_engine_version": r.md_extraction_engine_version or "",
"elapsed_ms_estimate": int(elapsed.get(r.id, 0)) if r.id in elapsed else "",
"text_length_ratio": m.get("text_length_ratio", "") if m else "",
"markdown_heading_count": m.get("markdown_heading_count", "") if m else "",
"markdown_table_row_count": m.get("markdown_table_row_count", "") if m else "",
"markdown_image_count": m.get("markdown_image_count", "") if m else "",
"warnings": ",".join(warnings) if isinstance(warnings, list) else "",
"phase2_processed_at": r.md_generated_at.isoformat() if r.md_generated_at else "",
})
output_csv.parent.mkdir(parents=True, exist_ok=True)
with output_csv.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=POST_REPORT_COLUMNS, extrasaction="ignore")
writer.writeheader()
for row in enriched_rows:
writer.writerow(row)
# Markdown summary
total = len(rows)
by_status = Counter(r.md_status for r in rows)
success_n = by_status.get("success", 0)
skipped_n = by_status.get("skipped", 0)
failed_n = by_status.get("failed", 0)
success_rows = [r for r in rows if r.md_status == "success"]
text_ratios = []
heading_jump_count = 0
low_image_alt_count = 0
new_warnings: Counter = Counter()
KNOWN_WARNINGS = {"heading_hierarchy_jump", "low_image_alt_text_ratio"}
for r in success_rows:
q = r.md_extraction_quality or {}
m = q.get("metrics") if isinstance(q, dict) else {}
m = m or {}
if isinstance(m.get("text_length_ratio"), (int, float)):
text_ratios.append(float(m["text_length_ratio"]))
warnings = q.get("warnings") if isinstance(q, dict) else []
if isinstance(warnings, list):
if "heading_hierarchy_jump" in warnings:
heading_jump_count += 1
if "low_image_alt_text_ratio" in warnings:
low_image_alt_count += 1
for w in warnings:
if w not in KNOWN_WARNINGS:
new_warnings[w] += 1
elapsed_succ = [elapsed[r.id] for r in success_rows if r.id in elapsed]
elapsed_p50 = _percentile(elapsed_succ, 0.5)
elapsed_p90 = _percentile(elapsed_succ, 0.9)
text_ratio_p50 = _percentile(text_ratios, 0.5)
skip_reasons = Counter()
for r in rows:
if r.md_status == "skipped":
reason = (r.md_extraction_error or "unknown").split(":", 1)[0]
skip_reasons[reason.strip()] += 1
failed_rows = [r for r in rows if r.md_status == "failed"]
outliers = []
for r in success_rows:
e = elapsed.get(r.id)
if e and e > 300_000:
outliers.append((r.id, f"elapsed_ms={int(e)}"))
q = r.md_extraction_quality or {}
m = q.get("metrics") if isinstance(q, dict) else {}
m = m or {}
ratio = m.get("text_length_ratio")
if isinstance(ratio, (int, float)) and (ratio < 0.5 or ratio > 10):
outliers.append((r.id, f"text_length_ratio={ratio:.2f}"))
def pct(n: int, d: int) -> str:
return f"{n/d*100:.0f}%" if d else "-"
def delta_pp(actual_n: int, actual_d: int, baseline: float) -> str:
if not actual_d:
return "-"
return f"{(actual_n/actual_d - baseline)*100:+.1f}pp"
def fmt_num(v) -> str:
if v is None:
return "-"
if isinstance(v, float):
return f"{v:.2f}"
return str(v)
scope_line = "Scope: file_format='pdf' AND deleted_at IS NULL"
if phase2_start:
scope_line += f" AND md_generated_at >= {phase2_start}"
else:
scope_line += " AND md_generated_at IS NOT NULL"
md_lines = [
"# Phase 2 Post-report",
"",
f"Generated: {datetime.now(timezone.utc).isoformat()}",
scope_line,
"",
f"## 처리 분포 (총 {total}건)",
"| status | count | rate |",
"|---|---|---|",
f"| success | {success_n} | {pct(success_n, total)} |",
f"| skipped | {skipped_n} | {pct(skipped_n, total)} |",
f"| failed | {failed_n} | {pct(failed_n, total)} |",
"",
"## vs 1D baseline 비교",
"| 메트릭 | 1D | Phase 2 | delta |",
"|---|---|---|---|",
f"| success rate | 92% | {pct(success_n, total)} | {delta_pp(success_n, total, BASELINE_1D['success_rate'])} |",
f"| skipped rate | 8% | {pct(skipped_n, total)} | {delta_pp(skipped_n, total, BASELINE_1D['skipped_rate'])} |",
f"| failed rate | 0% | {pct(failed_n, total)} | {delta_pp(failed_n, total, BASELINE_1D['failed_rate'])} |",
f"| elapsed_ms p50 | 34000 | {fmt_num(int(elapsed_p50)) if elapsed_p50 else '-'} | - |",
f"| elapsed_ms p90 | 112000 | {fmt_num(int(elapsed_p90)) if elapsed_p90 else '-'} | - |",
f"| text_length_ratio p50 | 1.15 | {fmt_num(text_ratio_p50)} | - |",
f"| warnings: heading_hierarchy_jump | 86% | {pct(heading_jump_count, success_n)} | - |",
f"| warnings: low_image_alt_text_ratio | 89% | {pct(low_image_alt_count, success_n)} | - |",
"",
"## skip reason 분포",
]
for reason, c in skip_reasons.most_common():
md_lines.append(f"- `{reason}`: {c}")
if not skip_reasons:
md_lines.append("- (no skipped)")
md_lines += [
f"",
f"## failed 케이스 ({len(failed_rows)}건)",
]
for r in failed_rows:
md_lines.append(f"- doc {r.id} `{(r.title or '-')[:60]}`: {r.md_extraction_error or '-'}")
if not failed_rows:
md_lines.append("- (no failed)")
md_lines += [
f"",
f"## outlier ({len(outliers)}건)",
]
for doc_id, note in outliers[:30]:
md_lines.append(f"- doc {doc_id}: {note}")
if not outliers:
md_lines.append("- (no outliers)")
if new_warnings:
md_lines += [f"", f"## 신규 warning 종류 (1D 미관측)"]
for w, c in new_warnings.most_common():
md_lines.append(f"- `{w}`: {c}")
output_md.parent.mkdir(parents=True, exist_ok=True)
output_md.write_text("\n".join(md_lines) + "\n", encoding="utf-8")
print(f"\n## Phase 2 post-report")
print(f" CSV: {output_csv}")
print(f" MD: {output_md}")
print(f" scope: {total} rows (success {success_n} / skipped {skipped_n} / failed {failed_n})")
await engine.dispose()
# ─── main ───────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Phase 2 markdown canonical layer full backfill",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
sub = parser.add_subparsers(dest="cmd", required=True)
p_inv = sub.add_parser("inventory", help="pending PDFs dry-run inventory CSV")
p_inv.add_argument("--output", type=Path, required=True)
p_can = sub.add_parser("select-canary", help="stratified 40 canary sample CSV")
p_can.add_argument("--inventory", type=Path, required=True)
p_can.add_argument("--output", type=Path, required=True)
p_can.add_argument("--seed", type=int, default=20260503)
p_enq = sub.add_parser("enqueue", help="enqueue from sample CSV (one-shot)")
p_enq.add_argument("--csv", type=Path, required=True)
g = p_enq.add_mutually_exclusive_group()
g.add_argument("--dry-run", dest="dry_run", action="store_true", default=True)
g.add_argument("--no-dry-run", dest="dry_run", action="store_false")
p_nig = sub.add_parser("nightly-enqueue", help="nightly cron sweep")
p_nig.add_argument("--limit", type=int, default=50)
p_nig.add_argument("--max-active-queue", type=int, default=5)
p_nig.add_argument("--log-tsv", type=Path, required=True)
p_nig.add_argument("--dry-run", action="store_true", default=False)
p_rep = sub.add_parser("post-report", help="final results CSV + markdown summary")
p_rep.add_argument("--output-csv", type=Path, required=True)
p_rep.add_argument("--output-md", type=Path, required=True)
p_rep.add_argument("--phase2-start", type=str, default=None,
help="ISO timestamp; only docs with md_generated_at >= this are counted")
args = parser.parse_args()
if args.cmd == "inventory":
asyncio.run(cmd_inventory(args.output))
elif args.cmd == "select-canary":
cmd_select_canary(args.inventory, args.output, args.seed)
elif args.cmd == "enqueue":
asyncio.run(cmd_enqueue(args.csv, args.dry_run))
elif args.cmd == "nightly-enqueue":
asyncio.run(cmd_nightly_enqueue(args.limit, args.max_active_queue, args.log_tsv, args.dry_run))
elif args.cmd == "post-report":
asyncio.run(cmd_post_report(args.output_csv, args.output_md, args.phase2_start))
if __name__ == "__main__":
main()