ops(canonical): Phase 1D marker pilot one-shot script (select/enqueue/report)

30건 한정 stratified pilot. baseline markdown 품질 측정 후 Phase 2 전체
백필 결정. 영구 worker 경로 아님.

대상 WHERE:
  deleted_at IS NULL
  AND file_format='pdf'
  AND md_status='pending'
  AND category='document'
  AND document_type NOT IN SKIP_DOC_TYPES (marker_worker 와 일관)

Stratification:
  ai_domain × file_size_bucket (small<500KB / medium<5MB / large)
  documents 에 page_count 컬럼 부재 (marker_worker 가 PyMuPDF 로 동적
  측정) → file_size 를 길이 proxy 로 사용.
  cell 안에서 file_size 작은/큰 mix 로 짧은/긴 문서 차이 관찰.

Subcommands:
  select  — 30건 dry-run + JSON 저장 (/tmp/phase1d_pilot.json)
  enqueue — markdown 큐 enqueue (uq_queue_active 충돌 시 skip)
  report  — md_status / 평균 elapsed / 실패 top5 / heading anchor 후보 /
           KaTeX 후보 / file_size bucket 별 success 비율 / UI 검수 URL

리포트 메모:
  markdown_image_count 는 현재 server.py 가 _images 버림 → 0 정상.
  Phase 1B.5 에서 _images 출력 시 자동 활성.

실행:
  docker compose exec fastapi python /app/scripts/phase1d_pilot.py select
  docker compose exec fastapi python /app/scripts/phase1d_pilot.py enqueue --yes
  docker compose exec fastapi python /app/scripts/phase1d_pilot.py report

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-05-01 09:49:07 +09:00
parent d3bf963a66
commit f98cf2e505
+369
View File
@@ -0,0 +1,369 @@
"""Phase 1D — marker markdown 품질 pilot (one-shot admin script).
* 영구 worker 경로 아님. 30건 한정 sample 로 baseline 품질 측정.
* Phase 2 전체 백필 결정은 1D 결과 보고 후행.
* 1B.5 (이미지 추출 / _meta 보존) 는 별도 PR — 본 스크립트 영역 아님.
Stratification:
ai_domain × file_size_bucket (page_count 는 documents 컬럼 없음 → file_size proxy)
보조: 각 cell 안에서 file_size 작은/큰 mix.
document_type ∈ SKIP_DOC_TYPES 제외 (marker_worker 의 SKIP 룰과 동일).
Subcommands:
select stratified 30건 dry-run + JSON 저장
enqueue select 결과를 markdown 큐에 enqueue (uq_queue_active 위반 회피)
report md_status 분포·실패사유·quality 메트릭·UI 검수 URL 출력
실행 (GPU 서버):
docker compose exec fastapi python /app/scripts/phase1d_pilot.py select
docker compose exec fastapi python /app/scripts/phase1d_pilot.py enqueue --yes
docker compose exec fastapi python /app/scripts/phase1d_pilot.py report
"""
import argparse
import asyncio
import json
import os
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
# marker_worker 의 SKIP 룰 미러 — drift 회피 위해 한 곳만 진실. 변경 시 동기 필요.
SKIP_DOC_TYPES = {
"발주서", "세금계산서", "명세표",
"Invoice", "Purchase_Order", "Estimate", "Statement",
}
# file_size bucket (page_count proxy). PDF 평균 1페이지 ~10~50KB.
SIZE_BUCKETS = [
("small", 0, 500 * 1024), # < 500KB
("medium", 500 * 1024, 5 * 1024 * 1024), # 500KB ~ 5MB
("large", 5 * 1024 * 1024, 10**12), # > 5MB
]
PILOT_TARGET = 30
DEFAULT_OUT = Path("/tmp/phase1d_pilot.json")
def _bucket(file_size: int | None) -> str:
if file_size is None:
return "unknown"
for name, lo, hi in SIZE_BUCKETS:
if lo <= file_size < hi:
return name
return "outlier"
def _build_engine() -> "AsyncEngine":
db_url = os.environ["DATABASE_URL"]
return create_async_engine(db_url, pool_pre_ping=True)
# ─── select ───
async def cmd_select(out_path: Path) -> None:
engine = _build_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
from models.document import Document # type: ignore
async with Session() as session:
rows = (
await session.execute(
select(
Document.id,
Document.title,
Document.ai_domain,
Document.document_type,
Document.file_size,
Document.file_path,
Document.file_format,
Document.category,
Document.md_status,
).where(
Document.deleted_at.is_(None),
Document.file_format == "pdf",
Document.md_status == "pending",
Document.category == "document",
)
)
).all()
candidates = [
r for r in rows
if not (r.document_type and r.document_type in SKIP_DOC_TYPES)
]
print(f"후보 (필터 후): {len(candidates)}건 (전체 pending PDF document: {len(rows)}건)")
grouped: dict[tuple[str, str], list] = defaultdict(list)
for r in candidates:
domain = r.ai_domain or "unknown"
grouped[(domain, _bucket(r.file_size))].append(r)
cells = sorted(grouped.keys())
base_per_cell = max(1, PILOT_TARGET // max(1, len(cells)))
sample: list = []
leftover_cells: list[tuple[tuple[str, str], list]] = []
for cell in cells:
items = sorted(grouped[cell], key=lambda x: (x.file_size or 0, x.id))
take = min(base_per_cell, len(items))
if take >= 2:
half = take // 2
sample.extend(items[:half]) # 작은 쪽
sample.extend(items[-(take - half):]) # 큰 쪽
else:
sample.extend(items[:take])
if len(items) > take:
leftover_cells.append((cell, items[take:]))
leftover_cells.sort(key=lambda x: -len(x[1]))
li = 0
while len(sample) < PILOT_TARGET and li < len(leftover_cells):
_, items = leftover_cells[li]
if items:
sample.append(items.pop(0))
else:
li += 1
sample = sample[:PILOT_TARGET]
print(f"\n선정 {len(sample)}건 (목표 {PILOT_TARGET}):\n")
print(f"{'ID':>6} {'KB':>8} {'domain':<22} {'doctype':<22} title")
print("-" * 130)
for r in sample:
print(
f"{r.id:>6} {((r.file_size or 0) // 1024):>8} "
f"{(r.ai_domain or '-')[:22]:<22} "
f"{(r.document_type or '-')[:22]:<22} "
f"{(r.title or '-')[:60]}"
)
cell_counts: Counter = Counter()
for r in sample:
cell_counts[((r.ai_domain or "unknown"), _bucket(r.file_size))] += 1
print("\n분포 (ai_domain × file_size_bucket):")
for (d, b), c in sorted(cell_counts.items()):
print(f" {d:<22} × {b:<8} : {c}")
payload = {
"target": PILOT_TARGET,
"ids": [r.id for r in sample],
"items": [
{
"id": r.id,
"title": r.title,
"ai_domain": r.ai_domain,
"document_type": r.document_type,
"file_size": r.file_size,
"file_path": r.file_path,
}
for r in sample
],
}
out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
print(f"\n저장: {out_path}")
await engine.dispose()
# ─── enqueue ───
async def cmd_enqueue(in_path: Path, yes: bool) -> None:
payload = json.loads(in_path.read_text())
ids = payload["ids"]
if not yes:
confirm = input(f"\n{len(ids)}건 markdown 큐에 enqueue 합니다. 진행? [y/N] ")
if confirm.strip().lower() not in ("y", "yes"):
print("취소됨.")
return
engine = _build_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
from models.queue import enqueue_stage # type: ignore
enqueued, skipped = [], []
async with Session() as session:
for doc_id in ids:
ok = await enqueue_stage(session, doc_id, "markdown")
(enqueued if ok else skipped).append(doc_id)
await session.commit()
print(f"enqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}")
if skipped:
print(f" skipped ids: {skipped[:20]}{'' if len(skipped) > 20 else ''}")
await engine.dispose()
# ─── report ───
def _stat(vals: list[float]) -> str:
if not vals:
return "-"
s = sorted(vals)
n = len(s)
p50 = s[n // 2]
p90 = s[min(n - 1, int(n * 0.9))]
return f"min={s[0]:.2f} p50={p50:.2f} p90={p90:.2f} max={s[-1]:.2f}"
KATEX_INLINE_RE = re.compile(r"(?<!\$)\$[^$\n]{1,160}\$(?!\$)")
async def cmd_report(in_path: Path) -> None:
payload = json.loads(in_path.read_text())
ids: list[int] = payload["ids"]
engine = _build_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
from models.document import Document # type: ignore
from models.queue import ProcessingQueue # type: ignore
async with Session() as session:
rows = (
await session.execute(
select(
Document.id, Document.title, Document.ai_domain, Document.file_size,
Document.md_status, Document.md_extraction_error,
Document.md_extraction_quality, Document.md_content,
Document.md_generated_at, Document.created_at,
).where(Document.id.in_(ids))
)
).all()
q_rows = (
await session.execute(
select(
ProcessingQueue.document_id,
ProcessingQueue.status,
ProcessingQueue.attempts,
ProcessingQueue.started_at,
ProcessingQueue.completed_at,
).where(
ProcessingQueue.document_id.in_(ids),
ProcessingQueue.stage == "markdown",
)
)
).all()
elapsed: dict[int, float] = {}
queue_status: dict[int, str] = {}
queue_attempts: dict[int, int] = {}
for q in q_rows:
queue_status[q.document_id] = q.status
queue_attempts[q.document_id] = q.attempts
if q.started_at and q.completed_at:
elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds()
by_status = Counter(r.md_status for r in rows)
print(f"\n## md_status 분포 (sample {len(rows)}건)")
for s, c in sorted(by_status.items(), key=lambda x: -x[1]):
print(f" {s:<12} {c}")
in_queue = [i for i in ids if queue_status.get(i) in {"pending", "processing"}]
if in_queue:
print(f"\n 처리 대기/중 (큐 active): {len(in_queue)}")
es = [v for k, v in elapsed.items()
if next((r for r in rows if r.id == k and r.md_status == "success"), None)]
if es:
print(f"\n## 평균 처리 시간 (success {len(es)}건)")
print(f" avg={sum(es)/len(es):.1f}s {_stat(es)}")
else:
print("\n## 처리 시간: 측정 가능한 success 없음")
fail_reasons: Counter = Counter()
for r in rows:
if r.md_status == "failed":
fail_reasons[(r.md_extraction_error or "unknown")[:140]] += 1
if fail_reasons:
print("\n## 실패 사유 top 5")
for reason, c in fail_reasons.most_common(5):
print(f" {c:>3} {reason}")
text_ratios, heading_jumps, table_rows, image_counts = [], [], [], []
warning_kinds: Counter = Counter()
katex_candidates, table_with_rows = [], []
has_headings = 0
success_rows = [r for r in rows if r.md_status == "success"]
for r in success_rows:
q = r.md_extraction_quality or {}
m = q.get("metrics", {}) or {}
if m.get("text_length_ratio") is not None: text_ratios.append(m["text_length_ratio"])
if m.get("heading_jump_count") is not None: heading_jumps.append(m["heading_jump_count"])
if m.get("markdown_table_row_count") is not None:
table_rows.append(m["markdown_table_row_count"])
if m["markdown_table_row_count"] > 0:
table_with_rows.append(r.id)
if m.get("markdown_image_count") is not None: image_counts.append(m["markdown_image_count"])
if (m.get("markdown_heading_count") or 0) > 0:
has_headings += 1
for w in (q.get("warnings") or []):
warning_kinds[w] += 1
c = r.md_content or ""
if "$$" in c or KATEX_INLINE_RE.search(c):
katex_candidates.append(r.id)
if success_rows:
print(f"\n## quality 메트릭 (success {len(success_rows)}건)")
print(f" text_length_ratio {_stat(text_ratios)}")
print(f" heading_jump_count {_stat(heading_jumps)}")
print(f" markdown_table_row_count {_stat(table_rows)} (rows>0: {len(table_with_rows)}건)")
print(f" markdown_image_count {_stat(image_counts)} (현재 server.py 가 _images 버림 → 0 정상)")
print(f" heading anchor 후보 (markdown_heading_count > 0): {has_headings}")
print(f" KaTeX 후보 ($-수식 매칭): {len(katex_candidates)}건 ids={katex_candidates[:10]}")
if warning_kinds:
print("\n warnings (kind):")
for w, c in warning_kinds.most_common():
print(f" {c:>3} {w}")
print("\n## file_size bucket 별 success 비율")
bucket_stat: dict[str, list[int]] = defaultdict(lambda: [0, 0])
for r in rows:
b = _bucket(r.file_size)
bucket_stat[b][1] += 1
if r.md_status == "success":
bucket_stat[b][0] += 1
for b in ("small", "medium", "large", "unknown", "outlier"):
s, t = bucket_stat[b]
if t == 0:
continue
rate = (s / t * 100) if t else 0
print(f" {b:<8} {s}/{t} ({rate:.0f}%)")
print("\n## 사용자 검수 후보 (UI URL — 표 깨짐/heading anchor/KaTeX 직접 확인):")
for r in sorted(rows, key=lambda x: (x.md_status or "", -(x.file_size or 0))):
marker = "" if r.id in katex_candidates else " "
print(f" {marker} https://document.hyungi.net/documents/{r.id} [{r.md_status}] {(r.title or '-')[:60]}")
await engine.dispose()
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
sub = parser.add_subparsers(dest="cmd", required=True)
p_sel = sub.add_parser("select", help="stratified 30건 dry-run")
p_sel.add_argument("--out", type=Path, default=DEFAULT_OUT)
p_enq = sub.add_parser("enqueue", help="markdown 큐 enqueue")
p_enq.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
p_enq.add_argument("--yes", action="store_true")
p_rep = sub.add_parser("report", help="결과 집계")
p_rep.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
args = parser.parse_args()
if args.cmd == "select":
asyncio.run(cmd_select(args.out))
elif args.cmd == "enqueue":
asyncio.run(cmd_enqueue(args.in_path, args.yes))
elif args.cmd == "report":
asyncio.run(cmd_report(args.in_path))
if __name__ == "__main__":
main()