Files
hyungi_document_server/scripts/phase1d_pilot.py
T
Hyungi Ahn 3e831a2dc7 fix(canonical): Phase 1D script sys.path — /app/scripts/.. 가 PYTHONPATH 루트
fastapi 컨테이너는 WORKDIR=/app, 코드가 직접 풀려있고 app/ 디렉토리 없음.
backfill_category.py 의 ../app 패턴은 컨테이너 안에서 /app/app (없음)
가 되어 ModuleNotFoundError. 스크립트 자기 디렉토리의 .. 를 sys.path 에
넣어 /app 루트 노출.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 09:50:23 +09:00

372 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase 1D — marker markdown 품질 pilot (one-shot admin script).
* 영구 worker 경로 아님. 30건 한정 sample 로 baseline 품질 측정.
* Phase 2 전체 백필 결정은 1D 결과 보고 후행.
* 1B.5 (이미지 추출 / _meta 보존) 는 별도 PR — 본 스크립트 영역 아님.
Stratification:
ai_domain × file_size_bucket (page_count 는 documents 컬럼 없음 → file_size proxy)
보조: 각 cell 안에서 file_size 작은/큰 mix.
document_type ∈ SKIP_DOC_TYPES 제외 (marker_worker 의 SKIP 룰과 동일).
Subcommands:
select stratified 30건 dry-run + JSON 저장
enqueue select 결과를 markdown 큐에 enqueue (uq_queue_active 위반 회피)
report md_status 분포·실패사유·quality 메트릭·UI 검수 URL 출력
실행 (GPU 서버):
docker compose exec fastapi python /app/scripts/phase1d_pilot.py select
docker compose exec fastapi python /app/scripts/phase1d_pilot.py enqueue --yes
docker compose exec fastapi python /app/scripts/phase1d_pilot.py report
"""
import argparse
import asyncio
import json
import os
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
# The fastapi container unpacks the code directly under WORKDIR=/app (there is
# no app/ subdirectory). So /app itself -- not /app/scripts/../app -- must be on
# sys.path for imports such as `from models...` to resolve. Inserting this
# script's parent directory (/app/scripts/..) at the front achieves that.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
# marker_worker 의 SKIP 룰 미러 — drift 회피 위해 한 곳만 진실. 변경 시 동기 필요.
SKIP_DOC_TYPES = {
"발주서", "세금계산서", "명세표",
"Invoice", "Purchase_Order", "Estimate", "Statement",
}
# file_size bucket (page_count proxy). PDF 평균 1페이지 ~10~50KB.
SIZE_BUCKETS = [
("small", 0, 500 * 1024), # < 500KB
("medium", 500 * 1024, 5 * 1024 * 1024), # 500KB ~ 5MB
("large", 5 * 1024 * 1024, 10**12), # > 5MB
]
PILOT_TARGET = 30
DEFAULT_OUT = Path("/tmp/phase1d_pilot.json")
def _bucket(file_size: int | None) -> str:
if file_size is None:
return "unknown"
for name, lo, hi in SIZE_BUCKETS:
if lo <= file_size < hi:
return name
return "outlier"
def _build_engine() -> "AsyncEngine":
db_url = os.environ["DATABASE_URL"]
return create_async_engine(db_url, pool_pre_ping=True)
# ─── select ───
def _stratified_sample(candidates: list, target: int) -> list:
    """Pick up to *target* rows stratified by (ai_domain, file_size bucket).

    Every cell first gets the same base quota (``target // #cells``, at
    least 1), split between its smallest and its largest files. Free slots
    are then topped up from the unused middle items. The cell with the most
    leftovers is drained first, smallest remaining file first. Each row
    appears at most once.
    """
    grouped: dict[tuple[str, str], list] = defaultdict(list)
    for r in candidates:
        grouped[(r.ai_domain or "unknown", _bucket(r.file_size))].append(r)

    cells = sorted(grouped)
    base_per_cell = max(1, target // max(1, len(cells)))
    sample: list = []
    leftover_cells: list[tuple[tuple[str, str], list]] = []
    for cell in cells:
        items = sorted(grouped[cell], key=lambda x: (x.file_size or 0, x.id))
        take = min(base_per_cell, len(items))
        if take >= 2:
            half = take // 2
            large_start = len(items) - (take - half)
            sample.extend(items[:half])  # smaller files
            sample.extend(items[large_start:])  # larger files
            # Only the untouched middle may be used for top-up. items[take:]
            # would re-offer the largest files taken just above and put
            # duplicate IDs into the pilot.
            rest = items[half:large_start]
        else:
            sample.extend(items[:take])
            rest = items[take:]
        if rest:
            leftover_cells.append((cell, rest))

    leftover_cells.sort(key=lambda x: -len(x[1]))
    for _, rest in leftover_cells:
        room = target - len(sample)
        if room <= 0:
            break
        sample.extend(rest[:room])
    # With more cells than `target`, the base quota alone overshoots.
    return sample[:target]


def _print_sample(sample: list, target: int) -> None:
    """Print the chosen rows and their (ai_domain x size bucket) distribution."""
    print(f"\n선정 {len(sample)}건 (목표 {target}):\n")
    print(f"{'ID':>6} {'KB':>8} {'domain':<22} {'doctype':<22} title")
    print("-" * 130)
    for r in sample:
        print(
            f"{r.id:>6} {((r.file_size or 0) // 1024):>8} "
            f"{(r.ai_domain or '-')[:22]:<22} "
            f"{(r.document_type or '-')[:22]:<22} "
            f"{(r.title or '-')[:60]}"
        )
    cell_counts: Counter = Counter(
        (r.ai_domain or "unknown", _bucket(r.file_size)) for r in sample
    )
    print("\n분포 (ai_domain × file_size_bucket):")
    for (d, b), c in sorted(cell_counts.items()):
        print(f" {d:<22} × {b:<8} : {c}")


async def cmd_select(out_path: Path) -> None:
    """Choose the stratified pilot sample and save it as JSON (dry run).

    Reads the DB only. Candidates are non-deleted PDF documents with
    ``md_status == "pending"`` and ``category == "document"``, minus
    SKIP_DOC_TYPES. The chosen IDs, plus a few descriptive fields, are
    written to *out_path* for the ``enqueue`` and ``report`` subcommands.
    """
    from models.document import Document  # type: ignore

    engine = _build_engine()
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with session_factory() as session:
            rows = (
                await session.execute(
                    select(
                        Document.id,
                        Document.title,
                        Document.ai_domain,
                        Document.document_type,
                        Document.file_size,
                        Document.file_path,
                        Document.file_format,
                        Document.category,
                        Document.md_status,
                    ).where(
                        Document.deleted_at.is_(None),
                        Document.file_format == "pdf",
                        Document.md_status == "pending",
                        Document.category == "document",
                    )
                )
            ).all()
    finally:
        # Dispose even if the query fails so pooled connections are released.
        await engine.dispose()

    # A None/"" document_type is never in SKIP_DOC_TYPES, so such rows are kept.
    candidates = [r for r in rows if r.document_type not in SKIP_DOC_TYPES]
    print(f"후보 (필터 후): {len(candidates)}건 (전체 pending PDF document: {len(rows)}건)")

    sample = _stratified_sample(candidates, PILOT_TARGET)
    _print_sample(sample, PILOT_TARGET)

    payload = {
        "target": PILOT_TARGET,
        "ids": [r.id for r in sample],
        "items": [
            {
                "id": r.id,
                "title": r.title,
                "ai_domain": r.ai_domain,
                "document_type": r.document_type,
                "file_size": r.file_size,
                "file_path": r.file_path,
            }
            for r in sample
        ],
    }
    # Explicit UTF-8: ensure_ascii=False keeps non-ASCII titles raw, so the
    # locale's default encoding must not decide how they are written.
    out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"\n저장: {out_path}")
# ─── enqueue ───
async def cmd_enqueue(in_path: Path, yes: bool) -> None:
    """Enqueue the saved pilot documents on the ``markdown`` queue stage.

    Args:
        in_path: JSON file written by the ``select`` subcommand; its ``ids``
            list is used.
        yes: skip the interactive y/N confirmation.
    """
    payload = json.loads(in_path.read_text(encoding="utf-8"))
    # Drop duplicate IDs a sample file may contain, keeping first-seen order.
    ids = list(dict.fromkeys(payload["ids"]))
    if not yes:
        confirm = input(f"\n{len(ids)}건 markdown 큐에 enqueue 합니다. 진행? [y/N] ")
        if confirm.strip().lower() not in ("y", "yes"):
            print("취소됨.")
            return

    from models.queue import enqueue_stage  # type: ignore

    engine = _build_engine()
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    enqueued, skipped = [], []
    try:
        async with session_factory() as session:
            for doc_id in ids:
                # A falsy result is reported as "already active". Presumably
                # enqueue_stage declines when an active row exists
                # (uq_queue_active) -- confirm against models.queue.
                ok = await enqueue_stage(session, doc_id, "markdown")
                (enqueued if ok else skipped).append(doc_id)
            # Single commit: all enqueues land together or not at all.
            await session.commit()
    finally:
        await engine.dispose()

    print(f"enqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}")
    if skipped:
        # Only the first 20 IDs are shown, so say how many were cut.
        more = f" ... (+{len(skipped) - 20})" if len(skipped) > 20 else ""
        print(f" skipped ids: {skipped[:20]}{more}")
# ─── report ───
def _stat(vals: list[float]) -> str:
if not vals:
return "-"
s = sorted(vals)
n = len(s)
p50 = s[n // 2]
p90 = s[min(n - 1, int(n * 0.9))]
return f"min={s[0]:.2f} p50={p50:.2f} p90={p90:.2f} max={s[-1]:.2f}"
# Heuristic for inline KaTeX/LaTeX math: a single-`$`-delimited span of 1-160
# characters containing no `$` and no newline. The lookbehind/lookahead stop a
# delimiter from being half of a `$$` pair, so display math is not matched by
# this pattern (cmd_report checks for "$$" separately). Plain currency text
# such as "$5 and $10" also matches, so hits are only review candidates.
KATEX_INLINE_RE = re.compile(r"(?<!\$)\$[^$\n]{1,160}\$(?!\$)")
_ACTIVE_QUEUE_STATUSES = frozenset({"pending", "processing"})


def _index_markdown_queue(q_rows) -> tuple[dict[int, str], dict[int, float]]:
    """Collapse markdown queue rows into per-document status and elapsed seconds."""
    queue_status: dict[int, str] = {}
    elapsed: dict[int, float] = {}
    last_completed: dict = {}
    for q in q_rows:
        doc_id = q.document_id
        # A document can own several markdown rows (retries / earlier runs),
        # and the query has no ORDER BY, so a finished row must not mask an
        # active one.
        if queue_status.get(doc_id) not in _ACTIVE_QUEUE_STATUSES:
            queue_status[doc_id] = q.status
        if q.started_at and q.completed_at:
            # Keep the timing of the most recently completed run.
            prev = last_completed.get(doc_id)
            if prev is None or q.completed_at > prev:
                last_completed[doc_id] = q.completed_at
                elapsed[doc_id] = (q.completed_at - q.started_at).total_seconds()
    return queue_status, elapsed


def _print_quality_metrics(success_rows: list) -> list[int]:
    """Print md_extraction_quality stats for successful rows; return KaTeX candidate IDs."""
    text_ratios, heading_jumps, table_rows, image_counts = [], [], [], []
    warning_kinds: Counter = Counter()
    katex_candidates, table_with_rows = [], []
    has_headings = 0
    for r in success_rows:
        q = r.md_extraction_quality or {}
        m = q.get("metrics", {}) or {}
        if m.get("text_length_ratio") is not None:
            text_ratios.append(m["text_length_ratio"])
        if m.get("heading_jump_count") is not None:
            heading_jumps.append(m["heading_jump_count"])
        if m.get("markdown_table_row_count") is not None:
            table_rows.append(m["markdown_table_row_count"])
            if m["markdown_table_row_count"] > 0:
                table_with_rows.append(r.id)
        if m.get("markdown_image_count") is not None:
            image_counts.append(m["markdown_image_count"])
        if (m.get("markdown_heading_count") or 0) > 0:
            has_headings += 1
        for w in (q.get("warnings") or []):
            warning_kinds[w] += 1
        c = r.md_content or ""
        if "$$" in c or KATEX_INLINE_RE.search(c):
            katex_candidates.append(r.id)

    if success_rows:
        print(f"\n## quality 메트릭 (success {len(success_rows)}건)")
        print(f" text_length_ratio {_stat(text_ratios)}")
        print(f" heading_jump_count {_stat(heading_jumps)}")
        print(f" markdown_table_row_count {_stat(table_rows)} (rows>0: {len(table_with_rows)}건)")
        print(f" markdown_image_count {_stat(image_counts)} (현재 server.py 가 _images 버림 → 0 정상)")
        print(f" heading anchor 후보 (markdown_heading_count > 0): {has_headings}")
        print(f" KaTeX 후보 ($-수식 매칭): {len(katex_candidates)}건 ids={katex_candidates[:10]}")
        if warning_kinds:
            print("\n warnings (kind):")
            for w, c in warning_kinds.most_common():
                print(f" {c:>3} {w}")
    return katex_candidates


def _print_bucket_success_rates(rows) -> None:
    """Print the success ratio per file_size bucket (empty buckets omitted)."""
    print("\n## file_size bucket 별 success 비율")
    bucket_stat: dict[str, list[int]] = defaultdict(lambda: [0, 0])  # [success, total]
    for r in rows:
        stat = bucket_stat[_bucket(r.file_size)]
        stat[1] += 1
        if r.md_status == "success":
            stat[0] += 1
    for b in ("small", "medium", "large", "unknown", "outlier"):
        s, t = bucket_stat.get(b, (0, 0))
        if t == 0:
            continue
        print(f" {b:<8} {s}/{t} ({s / t * 100:.0f}%)")


async def cmd_report(in_path: Path) -> None:
    """Print markdown-extraction results for the saved pilot sample.

    Sections printed:
      * md_status distribution
      * active queue count
      * processing time of successful runs
      * top failure reasons
      * quality metrics
      * per-size-bucket success rate
      * UI URLs for manual review, with KaTeX candidates marked ``*``

    Args:
        in_path: JSON file written by the ``select`` subcommand.
    """
    payload = json.loads(in_path.read_text(encoding="utf-8"))
    ids: list[int] = payload["ids"]

    from models.document import Document  # type: ignore
    from models.queue import ProcessingQueue  # type: ignore

    engine = _build_engine()
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with session_factory() as session:
            rows = (
                await session.execute(
                    select(
                        Document.id, Document.title, Document.ai_domain, Document.file_size,
                        Document.md_status, Document.md_extraction_error,
                        Document.md_extraction_quality, Document.md_content,
                        Document.md_generated_at, Document.created_at,
                    ).where(Document.id.in_(ids))
                )
            ).all()
            q_rows = (
                await session.execute(
                    select(
                        ProcessingQueue.document_id,
                        ProcessingQueue.status,
                        ProcessingQueue.started_at,
                        ProcessingQueue.completed_at,
                    ).where(
                        ProcessingQueue.document_id.in_(ids),
                        ProcessingQueue.stage == "markdown",
                    )
                )
            ).all()
    finally:
        await engine.dispose()

    queue_status, elapsed = _index_markdown_queue(q_rows)

    by_status = Counter(r.md_status for r in rows)
    print(f"\n## md_status 분포 (sample {len(rows)}건)")
    for s, c in sorted(by_status.items(), key=lambda x: -x[1]):
        # str(): a NULL md_status would make format(None, "<12") raise TypeError.
        print(f" {str(s):<12} {c}")
    in_queue = [i for i in ids if queue_status.get(i) in _ACTIVE_QUEUE_STATUSES]
    if in_queue:
        print(f"\n 처리 대기/중 (큐 active): {len(in_queue)}")

    success_ids = {r.id for r in rows if r.md_status == "success"}
    es = [sec for doc_id, sec in elapsed.items() if doc_id in success_ids]
    if es:
        print(f"\n## 평균 처리 시간 (success {len(es)}건)")
        print(f" avg={sum(es)/len(es):.1f}s {_stat(es)}")
    else:
        print("\n## 처리 시간: 측정 가능한 success 없음")

    fail_reasons = Counter(
        (r.md_extraction_error or "unknown")[:140] for r in rows if r.md_status == "failed"
    )
    if fail_reasons:
        print("\n## 실패 사유 top 5")
        for reason, c in fail_reasons.most_common(5):
            print(f" {c:>3} {reason}")

    success_rows = [r for r in rows if r.md_status == "success"]
    katex_ids = set(_print_quality_metrics(success_rows))
    _print_bucket_success_rates(rows)

    print("\n## 사용자 검수 후보 (UI URL — 표 깨짐/heading anchor/KaTeX 직접 확인, * = KaTeX 후보):")
    for r in sorted(rows, key=lambda x: (x.md_status or "", -(x.file_size or 0))):
        marker = "*" if r.id in katex_ids else " "
        print(f" {marker} https://document.hyungi.net/documents/{r.id} [{r.md_status}] {(r.title or '-')[:60]}")
def main() -> None:
    """Parse the command line and run the chosen subcommand.

    The subcommands are ``select``, ``enqueue`` and ``report``. Each one is
    run via ``asyncio.run``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="cmd", required=True)

    select_parser = subparsers.add_parser("select", help="stratified 30건 dry-run")
    select_parser.add_argument("--out", type=Path, default=DEFAULT_OUT)

    enqueue_parser = subparsers.add_parser("enqueue", help="markdown 큐 enqueue")
    enqueue_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
    enqueue_parser.add_argument("--yes", action="store_true")

    report_parser = subparsers.add_parser("report", help="결과 집계")
    report_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)

    args = parser.parse_args()
    # Only the selected handler is called, so only one coroutine is created.
    handlers = {
        "select": lambda a: cmd_select(a.out),
        "enqueue": lambda a: cmd_enqueue(a.in_path, a.yes),
        "report": lambda a: cmd_report(a.in_path),
    }
    asyncio.run(handlers[args.cmd](args))


if __name__ == "__main__":
    main()