0362f52130
Round 2 sample 에 existing_success 5건 (anchor doc 4809 + calibration 4)
이 포함되었지만, cmd_enqueue 가 sample_source 무시하고 30건 전부 enqueue
하던 버그. 결과:
- existing 5건 marker 재처리 (~25분 marker 시간 낭비)
- 동일 quality output 으로 md_content overwrite → baseline 유실
- anchor (doc 4809) 의 "before" 상태가 사라져 후속 라운드 비교 anchor 손상
Fix:
- default = sample_source == "controlled_backfill" 만 enqueue (25건)
- --include-existing flag 추가 (후속 Marker 튜닝 라운드에서 anchor 재처리
필요 시 사용)
- print 로 mode 명시 + 제외된 ids 표시
야간 단발 sweep (23:00 KST) 예약 실행 전 fix.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
790 lines
31 KiB
Python
790 lines
31 KiB
Python
"""Phase 1D — marker markdown 품질 pilot (one-shot admin script).
|
||
|
||
* 영구 worker 경로 아님. 30건 한정 sample 로 baseline 품질 측정.
|
||
* Phase 2 전체 백필 결정은 1D 결과 보고 후행.
|
||
* 1B.5 (이미지 추출 / _meta 보존) 는 별도 PR — 본 스크립트 영역 아님.
|
||
|
||
Stratification (Round 2 refined, plan: ~/.claude/plans/stratified-mingling-otter.md):
|
||
4 축: doc_type × file_size_band × text_density_band × handwritten_hint
|
||
+ sample_source ∈ {existing_success, controlled_backfill}
|
||
- existing_success 5건 (anchor 1 + calibration 4)
|
||
- controlled_backfill 25건 (handwritten 3 / scan_likely 2~3 / mixed 5 / born_digital 12 / large 2)
|
||
+ forced_include: doc 4809 (Note_240805_용접교육 필기) — known bad handwritten anchor.
|
||
document_type ∈ SKIP_DOC_TYPES 제외 (marker_worker 룰 미러).
|
||
|
||
Subcommands:
|
||
select stratified 30건 dry-run + CSV+JSON 저장
|
||
enqueue select 결과를 markdown 큐에 enqueue (uq_queue_active 위반 회피)
|
||
report md_status 분포·실패사유·quality 메트릭·UI 검수 URL 출력
|
||
eval_template pilot_1d_eval.csv 스켈레톤 출력 (사용자가 rubric 5축 점수 채움)
|
||
|
||
실행 (GPU 서버):
|
||
docker compose exec fastapi python /app/scripts/phase1d_pilot.py select \
|
||
--csv /app/evals/markdown/pilot_1d_sample.csv
|
||
docker compose exec fastapi python /app/scripts/phase1d_pilot.py enqueue --yes
|
||
docker compose exec fastapi python /app/scripts/phase1d_pilot.py report
|
||
docker compose exec fastapi python /app/scripts/phase1d_pilot.py eval_template \
|
||
--csv /app/evals/markdown/pilot_1d_eval.csv
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import csv
|
||
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import sys
|
||
from collections import Counter, defaultdict
|
||
from pathlib import Path
|
||
|
||
# The fastapi container unpacks the code directly into WORKDIR=/app (there is
# no app/ subdirectory). So /app itself — not /app/scripts/../app — has to be
# on sys.path for the `from models...` imports below to resolve.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||
|
||
# Mirror of marker_worker's SKIP rule. To avoid drift there should be a single
# source of truth; this copy must be kept in sync whenever the rule changes.
SKIP_DOC_TYPES = {
    "발주서", "세금계산서", "명세표",
    "Invoice", "Purchase_Order", "Estimate", "Statement",
}

# file_size bucket (proxy for page count). A PDF page averages roughly 10~50KB.
SIZE_BUCKETS = [
    ("small", 0, 500 * 1024),  # < 500KB
    ("medium", 500 * 1024, 5 * 1024 * 1024),  # 500KB ~ 5MB
    ("large", 5 * 1024 * 1024, 10**12),  # > 5MB
]

# file_size_band axis of the 4-axis stratification — Round 2 plan.
FILE_SIZE_BAND_THRESHOLDS = [
    ("S", 0, 1 * 1024 * 1024),  # < 1MB
    ("M", 1 * 1024 * 1024, 10 * 1024 * 1024),  # 1~10MB
    ("L", 10 * 1024 * 1024, 10**12),  # > 10MB
]

# text_density (chars per KB of file) — the single clean proxy used to tell
# born-digital PDFs from scans.
# Both extremes were checked: 0.17 (handwritten doc 4809) vs 94 (born-digital doc 3759).
TEXT_DENSITY_BANDS = [
    ("scan-likely", 0.0, 5.0),
    ("mixed", 5.0, 50.0),
    ("born-digital", 50.0, float("inf")),
]

# Case-insensitive keywords searched in title / file path by _handwritten_hint.
HANDWRITTEN_HINT_REGEX = re.compile(r"필기|노트|handwritten|scan|스캔|note", re.IGNORECASE)

# Forced include — a known bad anchor found during the user's visual inspection.
# In later tuning rounds the same document is re-converted, based on the 1D
# results, to judge whether the output improved.
FORCED_INCLUDES: dict[int, str] = {
    4809: "known_bad_handwritten_anchor",
}

# Reproducibility seed — a sample CSV, once generated, comes out the same again.
SAMPLE_SEED = 20260502

# Sample budget: 30 documents in total, split between the two sample sources.
PILOT_TARGET = 30
EXISTING_SUCCESS_TARGET = 5
CONTROLLED_BACKFILL_TARGET = PILOT_TARGET - EXISTING_SUCCESS_TARGET  # 25

# Default file locations used by the CLI when no path option is given.
DEFAULT_OUT = Path("/tmp/phase1d_pilot.json")
DEFAULT_CSV = Path("/tmp/phase1d_pilot.csv")
DEFAULT_EVAL_CSV = Path("/tmp/phase1d_eval.csv")
|
||
|
||
|
||
def _bucket(file_size: int | None) -> str:
    """Map a byte size onto the legacy three-way bucket used by ``cmd_report``.

    Returns ``"unknown"`` when the size is missing, the name of the first
    ``SIZE_BUCKETS`` entry whose half-open range ``[lo, hi)`` contains the
    size, and ``"outlier"`` when no range matches.
    """
    if file_size is None:
        return "unknown"
    matching = (
        label for label, lower, upper in SIZE_BUCKETS if lower <= file_size < upper
    )
    return next(matching, "outlier")
|
||
|
||
|
||
def _file_size_band(file_size: int | None) -> str:
    """Return the Round 2 size band (``S`` / ``M`` / ``L``) for a byte size.

    A missing size yields ``"unknown"``. Otherwise the first entry of
    ``FILE_SIZE_BAND_THRESHOLDS`` whose half-open range ``[lo, hi)`` contains
    the size wins; a size matching no range falls back to ``"L"``.
    """
    if file_size is None:
        return "unknown"
    matching = (
        label
        for label, lower, upper in FILE_SIZE_BAND_THRESHOLDS
        if lower <= file_size < upper
    )
    return next(matching, "L")
|
||
|
||
|
||
def _text_density(text_len: int, file_size: int | None) -> float | None:
|
||
"""chars per KB of file. file_size==0/None 이면 None."""
|
||
if not file_size or file_size <= 0:
|
||
return None
|
||
return text_len / (file_size / 1024.0)
|
||
|
||
|
||
def _text_density_band(density: float | None) -> str:
    """Name the ``TEXT_DENSITY_BANDS`` entry whose ``[lo, hi)`` range holds ``density``.

    Returns ``"unknown"`` when the density is ``None`` or matches no band.
    """
    if density is None:
        return "unknown"
    matching = (
        label for label, lower, upper in TEXT_DENSITY_BANDS if lower <= density < upper
    )
    return next(matching, "unknown")
|
||
|
||
|
||
def _handwritten_hint(title: str | None, file_path: str | None) -> str:
    """Return ``'hi'`` when title or path matches ``HANDWRITTEN_HINT_REGEX``, else ``'lo'``.

    Empty or missing parts are dropped; the remaining ones are joined with a
    single space before the regex search.
    """
    present_parts = [part for part in (title, file_path) if part]
    haystack = " ".join(present_parts)
    if HANDWRITTEN_HINT_REGEX.search(haystack):
        return "hi"
    return "lo"
|
||
|
||
|
||
def _scan_likely(text_len: int, file_size: int | None, density: float | None) -> bool:
|
||
"""text_density < 5 또는 extracted_text 부재 → 스캔 가능성 높음."""
|
||
if text_len == 0:
|
||
return True
|
||
if density is not None and density < 5.0:
|
||
return True
|
||
return False
|
||
|
||
|
||
def _script_mix(extracted_text: str | None, sample_chars: int = 10000) -> str:
|
||
"""첫 N자에서 Hangul/CJK/Hiragana/Katakana/Latin 비율로 라벨링.
|
||
한 script ≥ 0.7 → '<script>_dominant'
|
||
두 script 각 ≥ 0.1 → 'mixed_<a>_<b>'
|
||
그 외 → 'unknown'.
|
||
mojibake/OCR 노이즈가 심하면 비율이 이상하게 나오는데, 그것도 신호.
|
||
"""
|
||
if not extracted_text:
|
||
return "unknown"
|
||
sample = extracted_text[:sample_chars]
|
||
counts = {"hangul": 0, "cjk": 0, "kana": 0, "latin": 0, "other": 0}
|
||
total = 0
|
||
for ch in sample:
|
||
cp = ord(ch)
|
||
if ch.isspace():
|
||
continue
|
||
total += 1
|
||
if 0xAC00 <= cp <= 0xD7A3 or 0x1100 <= cp <= 0x11FF or 0x3130 <= cp <= 0x318F:
|
||
counts["hangul"] += 1
|
||
elif 0x4E00 <= cp <= 0x9FFF or 0x3400 <= cp <= 0x4DBF:
|
||
counts["cjk"] += 1
|
||
elif 0x3040 <= cp <= 0x30FF:
|
||
counts["kana"] += 1
|
||
elif (0x0041 <= cp <= 0x005A) or (0x0061 <= cp <= 0x007A) or (0x00C0 <= cp <= 0x024F):
|
||
counts["latin"] += 1
|
||
else:
|
||
counts["other"] += 1
|
||
if total == 0:
|
||
return "unknown"
|
||
ratios = {k: v / total for k, v in counts.items() if k != "other"}
|
||
primary = sorted(ratios.items(), key=lambda x: -x[1])
|
||
if primary[0][1] >= 0.7:
|
||
return f"{primary[0][0]}_dominant"
|
||
significant = [name for name, r in primary if r >= 0.1]
|
||
if len(significant) >= 2:
|
||
return "mixed_" + "_".join(sorted(significant[:2]))
|
||
return "unknown"
|
||
|
||
|
||
def _page_count_estimate(md_extraction_quality: dict | None) -> int | None:
|
||
"""existing_success 의 marker quality.metrics.source_page_count 가 있으면 사용.
|
||
controlled_backfill 은 marker 가 변환 시 PyMuPDF 로 채울 예정 → NULL.
|
||
사후 평가 해석용 보조값."""
|
||
if not md_extraction_quality or not isinstance(md_extraction_quality, dict):
|
||
return None
|
||
metrics = md_extraction_quality.get("metrics")
|
||
if not isinstance(metrics, dict):
|
||
return None
|
||
pc = metrics.get("source_page_count")
|
||
if isinstance(pc, int):
|
||
return pc
|
||
return None
|
||
|
||
|
||
# Column order of the sample CSV — identical to the plan's "Sample CSV 컬럼" section.
CSV_COLUMNS = [
    "doc_id", "title", "sample_source", "forced_include_reason", "bucket_label",
    "doc_type", "file_size", "file_size_band",
    "text_len", "text_density", "text_density_band",
    "handwritten_hint", "scan_likely", "script_mix",
    "page_count_estimate",
]
|
||
|
||
|
||
def _build_engine() -> "AsyncEngine":
    """Create the async SQLAlchemy engine for the URL in ``DATABASE_URL``.

    Raises ``KeyError`` when the environment variable is not set. The engine
    is created with ``pool_pre_ping=True``.
    """
    return create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True)
|
||
|
||
|
||
# ─── select ───
|
||
|
||
def _enrich_row(r) -> dict:
    """Turn a Document result row into a sample dict with the computed proxies.

    Reads ``id``, ``title``, ``document_type``, ``file_size``, ``file_path``,
    ``extracted_text``, ``md_extraction_quality`` and ``ai_domain`` from ``r``.
    Keys starting with an underscore are carried along for the JSON payload
    and are not CSV columns.
    """
    raw_text = r.extracted_text
    raw_size = r.file_size
    char_count = len(raw_text or "")
    chars_per_kb = _text_density(char_count, raw_size)
    rounded_density = None if chars_per_kb is None else round(chars_per_kb, 3)

    sample = {
        "doc_id": r.id,
        "title": r.title,
        "doc_type": r.document_type,
        # A missing size is stored as 0 so later integer maths never sees None.
        "file_size": raw_size or 0,
        "file_size_band": _file_size_band(raw_size),
        "text_len": char_count,
        "text_density": rounded_density,
        "text_density_band": _text_density_band(chars_per_kb),
        "handwritten_hint": _handwritten_hint(r.title, r.file_path),
        "scan_likely": _scan_likely(char_count, raw_size, chars_per_kb),
        "script_mix": _script_mix(raw_text),
        "page_count_estimate": _page_count_estimate(r.md_extraction_quality),
    }
    sample["_file_path"] = r.file_path
    sample["_ai_domain"] = r.ai_domain
    return sample
|
||
|
||
|
||
def _allocate_controlled_backfill(candidates: list[dict], rng: random.Random) -> list[dict]:
    """Pick the controlled_backfill samples (target 25): stratified, with deliberate over-sampling.

    Budget from the plan's "Sample budget" section:
      handwritten 3 / scan_likely 2~3 / mixed 5 / born_digital 12 / large 2

    Every chosen candidate dict is mutated in place to carry a ``bucket_label``.
    The outcome depends on the order in which ``rng`` is consumed, so the
    statement order below has to stay as it is for a seeded run to reproduce
    the same sample.

    Args:
        candidates: enriched sample dicts (see ``_enrich_row``).
        rng: random source used for every shuffle.

    Returns:
        At most ``CONTROLLED_BACKFILL_TARGET`` dicts, in selection order.
    """
    selected: list[dict] = []
    used: set[int] = set()

    def take(pool: list[dict], n: int, label: str) -> None:
        # Shuffle the not-yet-used part of `pool` and claim up to `n` entries under `label`.
        avail = [c for c in pool if c["doc_id"] not in used]
        rng.shuffle(avail)
        for c in avail[:n]:
            c["bucket_label"] = label
            selected.append(c)
            used.add(c["doc_id"])

    # 1. handwritten_hint=hi (take all 3; 1.1% of the population → 10% of the sample)
    take([c for c in candidates if c["handwritten_hint"] == "hi"], 3, "handwritten")

    # 2. scan-likely (2~3 after de-duplicating against the handwritten picks)
    take([c for c in candidates if c["text_density_band"] == "scan-likely"], 3, "scan_likely")

    # 3. mixed (5)
    take([c for c in candidates if c["text_density_band"] == "mixed"], 5, "mixed")

    # 4. born-digital spread across doc_type (12). doc_type distribution guide:
    #    study_note 3 / Academic_Paper 3 / Reference 2 / Note 1 / (Manual+Standard+Specification) 2 / NULL 1
    born_digital = [c for c in candidates if c["text_density_band"] == "born-digital" and c["doc_id"] not in used]
    by_type: dict[str | None, list[dict]] = defaultdict(list)
    for c in born_digital:
        by_type[c["doc_type"]].append(c)
    for pool in by_type.values():
        rng.shuffle(pool)

    target_quota = [
        ("study_note", 3),
        ("Academic_Paper", 3),
        ("Reference", 2),
        ("Note", 1),
        ("Manual", 1),
        ("Standard", 1),
        (None, 1),  # NULL
    ]
    born_added = 0
    for dt, n in target_quota:
        for c in by_type.get(dt, [])[:n]:
            if c["doc_id"] in used:
                continue
            c["bucket_label"] = "born_digital"
            selected.append(c)
            used.add(c["doc_id"])
            born_added += 1
            if born_added >= 12:
                break
        if born_added >= 12:
            break
    # If fewer than 12 were found, top up with remaining born-digital docs (any doc_type).
    if born_added < 12:
        leftover = [c for c in born_digital if c["doc_id"] not in used]
        rng.shuffle(leftover)
        for c in leftover[: 12 - born_added]:
            c["bucket_label"] = "born_digital"
            selected.append(c)
            used.add(c["doc_id"])

    # 5. file_size band=L (>10MB) — supplement with docs not in the 4 buckets above (target 2)
    large_pool = [c for c in candidates if c["file_size_band"] == "L" and c["doc_id"] not in used]
    take(large_pool, 2, "large")

    # Target is 30 - existing_success(5) = 25. If short, fill from the remaining candidates.
    if len(selected) < CONTROLLED_BACKFILL_TARGET:
        leftover = [c for c in candidates if c["doc_id"] not in used]
        rng.shuffle(leftover)
        for c in leftover[: CONTROLLED_BACKFILL_TARGET - len(selected)]:
            c["bucket_label"] = "filler"
            selected.append(c)
            used.add(c["doc_id"])
    return selected[:CONTROLLED_BACKFILL_TARGET]
|
||
|
||
|
||
def _allocate_existing_success(rows: list, rng: random.Random) -> list[dict]:
    """Pick the existing_success samples: forced anchors plus calibration docs.

    Forced ids from ``FORCED_INCLUDES`` go first (a warning is printed for any
    that is not among ``rows``). Calibration picks then follow a text-density
    quota of scan-likely 1 / mixed 1 / born-digital 2, and any shortfall up to
    ``EXISTING_SUCCESS_TARGET`` is filled from the remaining rows. The order of
    ``rng.shuffle`` calls is part of the reproducible result.
    """
    candidates = [_enrich_row(row) for row in rows]
    candidate_by_id = {cand["doc_id"]: cand for cand in candidates}
    picked: list[dict] = []
    taken: set[int] = set()

    def claim(cand: dict, label: str) -> None:
        cand["bucket_label"] = label
        picked.append(cand)
        taken.add(cand["doc_id"])

    # Forced includes first, so the quota picks below can never displace them.
    for forced_id, reason in FORCED_INCLUDES.items():
        if forced_id not in candidate_by_id:
            print(f"[warn] forced_include doc_id={forced_id} 가 existing_success 후보에 없음 — skip")
            continue
        anchor = candidate_by_id[forced_id]
        claim(anchor, "existing_anchor")
        anchor["forced_include_reason"] = reason

    # Calibration picks, balanced over the text-density bands.
    not_forced = [cand for cand in candidates if cand["doc_id"] not in taken]
    band_quota = (("scan-likely", 1), ("mixed", 1), ("born-digital", 2))
    for band, quota in band_quota:
        band_pool = [
            cand
            for cand in not_forced
            if cand["text_density_band"] == band and cand["doc_id"] not in taken
        ]
        rng.shuffle(band_pool)
        for cand in band_pool[:quota]:
            claim(cand, "existing_calibration")

    shortfall = EXISTING_SUCCESS_TARGET - len(picked)
    if shortfall > 0:
        spare = [cand for cand in not_forced if cand["doc_id"] not in taken]
        rng.shuffle(spare)
        for cand in spare[:shortfall]:
            claim(cand, "existing_calibration")

    return picked[:EXISTING_SUCCESS_TARGET]
|
||
|
||
|
||
def _print_distribution(samples: list[dict]) -> None:
    """Print the selected samples as a table, followed by per-axis value counts.

    Rows are ordered by (sample_source, bucket_label, doc_id). Each axis line
    lists its values from most to least frequent.
    """
    print(f"\n선정 {len(samples)}건 (목표 {PILOT_TARGET}):\n")
    print(f"{'ID':>6} {'KB':>8} {'src':<22} {'bucket':<22} {'doctype':<22} {'density':>8} title")
    print("-" * 160)

    def row_order(sample: dict) -> tuple:
        return (sample["sample_source"], sample["bucket_label"], sample["doc_id"])

    for sample in sorted(samples, key=row_order):
        density = sample["text_density"]
        density_text = "-" if density is None else f"{density:.2f}"
        cells = [
            f"{sample['doc_id']:>6}",
            f"{(sample['file_size'] // 1024):>8}",
            f"{sample['sample_source'][:22]:<22}",
            f"{sample['bucket_label'][:22]:<22}",
            f"{(sample['doc_type'] or '-')[:22]:<22}",
            f"{density_text:>8}",
            f"{(sample['title'] or '-')[:60]}",
        ]
        print(" ".join(cells))

    print("\n분포:")
    axes = (
        "sample_source", "bucket_label", "doc_type", "file_size_band",
        "text_density_band", "handwritten_hint", "script_mix",
    )
    for axis in axes:
        tally = Counter(sample[axis] for sample in samples)
        # most_common() keeps first-seen order among equal counts.
        summary = ", ".join(f"{value}={count}" for value, count in tally.most_common())
        print(f" {axis}: {summary}")
|
||
|
||
|
||
async def cmd_select(out_path: Path, csv_path: Path | None) -> None:
    """Select the stratified pilot sample and save it as CSV and JSON.

    Two candidate sets are read from the database:

    * existing_success — non-deleted PDFs with ``md_status == "success"``
      (anchor + calibration).
    * controlled_backfill — non-deleted PDFs with ``md_status == "pending"``
      and ``category == "document"``, minus ``SKIP_DOC_TYPES``.

    Both allocators share one ``random.Random(SAMPLE_SEED)``; existing_success
    is allocated first, so the order of the two calls is part of the
    reproducible result.

    Args:
        out_path: JSON file consumed by ``cmd_enqueue`` / ``cmd_report`` /
            ``cmd_eval_template``.
        csv_path: optional CSV for human review; skipped when ``None``.
    """
    from models.document import Document  # type: ignore

    # Both queries read the same columns and share the "live PDF" filters.
    columns = (
        Document.id, Document.title, Document.ai_domain, Document.document_type,
        Document.file_size, Document.file_path, Document.file_format,
        Document.extracted_text, Document.md_extraction_quality,
    )
    live_pdf = (Document.deleted_at.is_(None), Document.file_format == "pdf")

    engine = _build_engine()
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with Session() as session:
            # A. existing_success — PDFs already converted successfully (anchor + calibration).
            es_rows = (
                await session.execute(
                    select(*columns).where(*live_pdf, Document.md_status == "success")
                )
            ).all()

            # B. controlled_backfill — pending PDF documents; SKIP_DOC_TYPES removed below.
            cb_rows = (
                await session.execute(
                    select(*columns).where(
                        *live_pdf,
                        Document.md_status == "pending",
                        Document.category == "document",
                    )
                )
            ).all()
    finally:
        # Rows are fully fetched by now; release the pool even if a query raised.
        await engine.dispose()

    cb_candidates_raw = [
        r for r in cb_rows if not (r.document_type and r.document_type in SKIP_DOC_TYPES)
    ]
    print(
        f"existing_success PDFs: {len(es_rows)}건 / "
        f"controlled_backfill 후보 (SKIP 제외): {len(cb_candidates_raw)}건 "
        f"(전체 pending PDF document: {len(cb_rows)}건)"
    )

    rng = random.Random(SAMPLE_SEED)

    existing_samples = _allocate_existing_success(es_rows, rng)
    for s in existing_samples:
        s["sample_source"] = "existing_success"
        s.setdefault("forced_include_reason", "")

    cb_candidates = [_enrich_row(r) for r in cb_candidates_raw]
    backfill_samples = _allocate_controlled_backfill(cb_candidates, rng)
    for s in backfill_samples:
        s["sample_source"] = "controlled_backfill"
        s.setdefault("forced_include_reason", "")

    samples = existing_samples + backfill_samples

    _print_distribution(samples)

    # CSV output (for user review and commit).
    if csv_path is not None:
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS, extrasaction="ignore")
            writer.writeheader()
            for s in samples:
                row = {col: s.get(col) for col in CSV_COLUMNS}
                # bool → str for CSV readability.
                row["scan_likely"] = "true" if s.get("scan_likely") else "false"
                writer.writerow(row)
        print(f"\nCSV 저장: {csv_path}")

    # JSON output — schema kept compatible with cmd_enqueue / cmd_report.
    payload = {
        "target": PILOT_TARGET,
        "seed": SAMPLE_SEED,
        "ids": [s["doc_id"] for s in samples],
        "items": [
            {
                "id": s["doc_id"],
                "title": s["title"],
                "ai_domain": s.get("_ai_domain"),
                "document_type": s["doc_type"],
                "file_size": s["file_size"],
                "file_path": s.get("_file_path"),
                "sample_source": s["sample_source"],
                "bucket_label": s["bucket_label"],
                "forced_include_reason": s.get("forced_include_reason", ""),
            }
            for s in samples
        ],
    }
    # Create the parent directory as the CSV branch does, so a fresh --out
    # location does not fail after all the selection work is done.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
    print(f"JSON 저장: {out_path}")
|
||
|
||
|
||
# ─── enqueue ───
|
||
|
||
async def cmd_enqueue(in_path: Path, yes: bool, include_existing: bool = False) -> None:
    """Enqueue the selected sample into the ``markdown`` queue stage.

    Round 2 policy: existing_success items (anchor + calibration) are left out
    by default. Their current md_content is the evaluation baseline, and
    re-processing them wastes marker time and overwrites that baseline with
    equivalent output. Only controlled_backfill items are converted. Pass
    ``include_existing=True`` when a later round (after Marker tuning) needs
    the anchors re-processed.

    Before enqueueing, a timestamped copy of the input JSON and a snapshot of
    the md_status distribution are written next to ``in_path``.

    Args:
        in_path: JSON produced by ``cmd_select``.
        yes: skip the interactive confirmation prompt.
        include_existing: also enqueue items whose ``sample_source`` is not
            ``controlled_backfill``.
    """
    from datetime import datetime, timezone

    from sqlalchemy import func

    raw_payload = in_path.read_text()
    payload = json.loads(raw_payload)
    items: list[dict] = payload.get("items", [])

    if include_existing:
        target_items = items
        ids: list[int] = [it["id"] for it in target_items]
        # Report the real count instead of a hard-coded 30; the file may differ.
        print(f"[mode] include_existing=True — sample {len(ids)}건 전부 enqueue")
    else:
        target_items = [it for it in items if it.get("sample_source") == "controlled_backfill"]
        ids = [it["id"] for it in target_items]
        excluded = [it["id"] for it in items if it.get("sample_source") != "controlled_backfill"]
        print(
            f"[mode] controlled_backfill 만 enqueue ({len(ids)}건). "
            f"existing_success 제외 ({len(excluded)}건): {excluded}"
        )

    if not ids:
        print("[abort] enqueue 대상 없음.")
        return

    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    # (1) Keep the original input JSON — timestamped copy against re-runs / overwrites.
    backup_path = in_path.with_name(f"{in_path.stem}_pre_enqueue_{ts}.json")
    backup_path.write_text(raw_payload)
    print(f"[backup] {backup_path}")

    # (2) List of document ids to enqueue (single line).
    print(f"[targets] {len(ids)}건: {ids}")

    from models.document import Document  # type: ignore
    from models.queue import enqueue_stage  # type: ignore

    engine = _build_engine()
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        # (3) Snapshot of the md_status distribution before anything is enqueued.
        async with Session() as session:
            snap_rows = (
                await session.execute(
                    select(Document.md_status, func.count())
                    .where(Document.deleted_at.is_(None))
                    .group_by(Document.md_status)
                )
            ).all()
        snapshot = {
            "timestamp_utc": ts,
            "scope": "documents WHERE deleted_at IS NULL",
            "md_status_distribution": {str(s): int(c) for s, c in snap_rows},
            "sample_ids": ids,
        }
        snap_path = in_path.with_name(f"phase1d_md_status_pre_{ts}.json")
        snap_path.write_text(json.dumps(snapshot, ensure_ascii=False, indent=2))
        print(f"[snapshot] {snap_path}")
        print(f" {snapshot['md_status_distribution']}")

        if not yes:
            confirm = input(f"\n{len(ids)}건 markdown 큐에 enqueue 합니다. 진행? [y/N] ")
            if confirm.strip().lower() not in ("y", "yes"):
                print("취소됨.")
                return

        enqueued, skipped = [], []
        async with Session() as session:
            for doc_id in ids:
                ok = await enqueue_stage(session, doc_id, "markdown")
                (enqueued if ok else skipped).append(doc_id)
            # One commit for the whole batch.
            await session.commit()

        print(f"\nenqueued: {len(enqueued)}, skipped (이미 active): {len(skipped)}")
        if skipped:
            print(f" skipped ids: {skipped[:20]}{' …' if len(skipped) > 20 else ''}")
    finally:
        # Release the pool on every exit path: success, cancel, or exception.
        await engine.dispose()
|
||
|
||
|
||
# ─── report ───
|
||
|
||
def _stat(vals: list[float]) -> str:
|
||
if not vals:
|
||
return "-"
|
||
s = sorted(vals)
|
||
n = len(s)
|
||
p50 = s[n // 2]
|
||
p90 = s[min(n - 1, int(n * 0.9))]
|
||
return f"min={s[0]:.2f} p50={p50:.2f} p90={p90:.2f} max={s[-1]:.2f}"
|
||
|
||
|
||
# Inline `$...$` math candidate: a single `$` that is not next to another `$`,
# enclosing 1-160 characters that are neither `$` nor a newline.
KATEX_INLINE_RE = re.compile(r"(?<!\$)\$[^$\n]{1,160}\$(?!\$)")
|
||
|
||
|
||
async def cmd_report(in_path: Path) -> None:
    """Print the pilot report for the sample ids stored in ``in_path``.

    Sections: md_status distribution, processing time of successful
    conversions (queue ``completed_at - started_at``), top failure reasons,
    quality metrics taken from ``md_extraction_quality``, success rate per
    file-size bucket, and a list of UI URLs for manual review (documents whose
    markdown contains a ``$`` math candidate are starred).

    Args:
        in_path: JSON produced by ``cmd_select``; its ``ids`` list is required.
    """
    payload = json.loads(in_path.read_text())
    ids: list[int] = payload["ids"]

    from models.document import Document  # type: ignore
    from models.queue import ProcessingQueue  # type: ignore

    engine = _build_engine()
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with Session() as session:
            rows = (
                await session.execute(
                    select(
                        Document.id, Document.title, Document.ai_domain, Document.file_size,
                        Document.md_status, Document.md_extraction_error,
                        Document.md_extraction_quality, Document.md_content,
                        Document.md_generated_at, Document.created_at,
                    ).where(Document.id.in_(ids))
                )
            ).all()

            q_rows = (
                await session.execute(
                    select(
                        ProcessingQueue.document_id,
                        ProcessingQueue.status,
                        ProcessingQueue.attempts,
                        ProcessingQueue.started_at,
                        ProcessingQueue.completed_at,
                    ).where(
                        ProcessingQueue.document_id.in_(ids),
                        ProcessingQueue.stage == "markdown",
                    )
                )
            ).all()
    finally:
        # Everything is fetched; release the pool even if a query raised.
        await engine.dispose()

    elapsed: dict[int, float] = {}
    queue_status: dict[int, str] = {}
    queue_attempts: dict[int, int] = {}
    for q in q_rows:
        # NOTE(review): if a document has several markdown queue rows, the last
        # row returned wins here — confirm whether that can happen.
        queue_status[q.document_id] = q.status
        queue_attempts[q.document_id] = q.attempts
        if q.started_at and q.completed_at:
            elapsed[q.document_id] = (q.completed_at - q.started_at).total_seconds()

    by_status = Counter(r.md_status for r in rows)
    print(f"\n## md_status 분포 (sample {len(rows)}건)")
    for s, c in sorted(by_status.items(), key=lambda x: -x[1]):
        # str(): a NULL md_status is None, which rejects the "<12" format spec.
        print(f" {str(s):<12} {c}")
    in_queue = [i for i in ids if queue_status.get(i) in {"pending", "processing"}]
    if in_queue:
        print(f"\n 처리 대기/중 (큐 active): {len(in_queue)}건")

    # Set lookup instead of rescanning `rows` for every elapsed entry.
    success_ids = {r.id for r in rows if r.md_status == "success"}
    es = [secs for doc_id, secs in elapsed.items() if doc_id in success_ids]
    if es:
        print(f"\n## 평균 처리 시간 (success {len(es)}건)")
        print(f" avg={sum(es)/len(es):.1f}s {_stat(es)}")
    else:
        print("\n## 처리 시간: 측정 가능한 success 없음")

    fail_reasons: Counter = Counter()
    for r in rows:
        if r.md_status == "failed":
            # Truncate so near-identical long error messages group together.
            fail_reasons[(r.md_extraction_error or "unknown")[:140]] += 1
    if fail_reasons:
        print("\n## 실패 사유 top 5")
        for reason, c in fail_reasons.most_common(5):
            print(f" {c:>3} {reason}")

    text_ratios, heading_jumps, table_rows, image_counts = [], [], [], []
    warning_kinds: Counter = Counter()
    katex_candidates, table_with_rows = [], []
    has_headings = 0
    success_rows = [r for r in rows if r.md_status == "success"]
    for r in success_rows:
        q = r.md_extraction_quality or {}
        m = q.get("metrics", {}) or {}
        if m.get("text_length_ratio") is not None:
            text_ratios.append(m["text_length_ratio"])
        if m.get("heading_jump_count") is not None:
            heading_jumps.append(m["heading_jump_count"])
        if m.get("markdown_table_row_count") is not None:
            table_rows.append(m["markdown_table_row_count"])
            if m["markdown_table_row_count"] > 0:
                table_with_rows.append(r.id)
        if m.get("markdown_image_count") is not None:
            image_counts.append(m["markdown_image_count"])
        if (m.get("markdown_heading_count") or 0) > 0:
            has_headings += 1
        for w in (q.get("warnings") or []):
            warning_kinds[w] += 1
        c = r.md_content or ""
        if "$$" in c or KATEX_INLINE_RE.search(c):
            katex_candidates.append(r.id)

    if success_rows:
        print(f"\n## quality 메트릭 (success {len(success_rows)}건)")
        print(f" text_length_ratio {_stat(text_ratios)}")
        print(f" heading_jump_count {_stat(heading_jumps)}")
        print(f" markdown_table_row_count {_stat(table_rows)} (rows>0: {len(table_with_rows)}건)")
        print(f" markdown_image_count {_stat(image_counts)} (현재 server.py 가 _images 버림 → 0 정상)")
        print(f" heading anchor 후보 (markdown_heading_count > 0): {has_headings}건")
        print(f" KaTeX 후보 ($-수식 매칭): {len(katex_candidates)}건 ids={katex_candidates[:10]}")
        if warning_kinds:
            print("\n warnings (kind):")
            for w, c in warning_kinds.most_common():
                print(f" {c:>3} {w}")

    print("\n## file_size bucket 별 success 비율")
    # bucket name -> [success count, total count]
    bucket_stat: dict[str, list[int]] = defaultdict(lambda: [0, 0])
    for r in rows:
        b = _bucket(r.file_size)
        bucket_stat[b][1] += 1
        if r.md_status == "success":
            bucket_stat[b][0] += 1
    for b in ("small", "medium", "large", "unknown", "outlier"):
        s, t = bucket_stat[b]
        if t == 0:
            continue
        rate = s / t * 100
        print(f" {b:<8} {s}/{t} ({rate:.0f}%)")

    print("\n## 사용자 검수 후보 (UI URL — 표 깨짐/heading anchor/KaTeX 직접 확인):")
    katex_ids = set(katex_candidates)
    for r in sorted(rows, key=lambda x: (x.md_status or "", -(x.file_size or 0))):
        marker = "★" if r.id in katex_ids else " "
        print(f" {marker} https://document.hyungi.net/documents/{r.id} [{r.md_status}] {(r.title or '-')[:60]}")
|
||
|
||
|
||
# Columns of the evaluation CSV skeleton written by cmd_eval_template.
EVAL_TEMPLATE_COLUMNS = [
    "doc_id", "title", "sample_source", "bucket_label",
    # Five rubric axes, scored 1~5 by the user. See the plan's "Quality evaluation rubric" section.
    "text_accuracy", "structure", "noise_rate", "multi_script", "completeness",
    "overall_pass",  # boolean (true/false) — gut call on "is it usable for search/reference"
    "notes",  # free-form text
]
|
||
|
||
|
||
def cmd_eval_template(in_path: Path, csv_out: Path) -> None:
    """Write an empty evaluation CSV skeleton from the select-result JSON.

    One row per item carries ``doc_id``, ``title``, ``sample_source`` and
    ``bucket_label``; the rubric, ``overall_pass`` and ``notes`` columns are
    left blank for the user to fill in.
    """
    entries = json.loads(in_path.read_text()).get("items", [])
    blank_columns = (
        "text_accuracy", "structure", "noise_rate", "multi_script",
        "completeness", "overall_pass", "notes",
    )

    csv_out.parent.mkdir(parents=True, exist_ok=True)
    with csv_out.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=EVAL_TEMPLATE_COLUMNS, extrasaction="ignore")
        writer.writeheader()
        for entry in entries:
            record = {
                "doc_id": entry["id"],
                "title": entry.get("title", ""),
                "sample_source": entry.get("sample_source", ""),
                "bucket_label": entry.get("bucket_label", ""),
            }
            record.update(dict.fromkeys(blank_columns, ""))
            writer.writerow(record)

    print(f"eval template 저장: {csv_out} ({len(entries)} rows)")
    print("rubric: 1~5점 (text_accuracy / structure / noise_rate / multi_script / completeness)")
    print(" overall_pass = true/false ('검색/참고에 쓸 만한가' 직관 판단)")
    print("평가 가이드: evals/markdown/README.md 참조")
|
||
|
||
|
||
def main() -> None:
    """Parse the command line and run the chosen subcommand.

    ``select``, ``enqueue`` and ``report`` are coroutines run through
    ``asyncio.run``; ``eval_template`` is a plain synchronous call.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="cmd", required=True)

    select_parser = subparsers.add_parser("select", help="stratified 30건 dry-run + CSV+JSON 저장")
    select_parser.add_argument(
        "--out", type=Path, default=DEFAULT_OUT, help="JSON (cmd_enqueue/report 호환용)"
    )
    select_parser.add_argument(
        "--csv", type=Path, default=DEFAULT_CSV, help="CSV (사용자 review + commit 대상)"
    )

    enqueue_parser = subparsers.add_parser("enqueue", help="markdown 큐 enqueue")
    enqueue_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
    enqueue_parser.add_argument("--yes", action="store_true")
    enqueue_parser.add_argument(
        "--include-existing",
        action="store_true",
        help="existing_success (anchor + calibration) 도 재처리. default 는 controlled_backfill 만.",
    )

    report_parser = subparsers.add_parser("report", help="결과 집계")
    report_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)

    eval_parser = subparsers.add_parser(
        "eval_template", help="평가 CSV 스켈레톤 출력 (사용자가 rubric 점수 채움)"
    )
    eval_parser.add_argument("--in", dest="in_path", type=Path, default=DEFAULT_OUT)
    eval_parser.add_argument("--csv", type=Path, default=DEFAULT_EVAL_CSV)

    args = parser.parse_args()

    # The only synchronous subcommand is handled first.
    if args.cmd == "eval_template":
        cmd_eval_template(args.in_path, args.csv)
        return

    # Lambdas defer coroutine creation until the matching command is known.
    async_commands = {
        "select": lambda: cmd_select(args.out, args.csv),
        "enqueue": lambda: cmd_enqueue(args.in_path, args.yes, args.include_existing),
        "report": lambda: cmd_report(args.in_path),
    }
    make_coroutine = async_commands.get(args.cmd)
    if make_coroutine is not None:
        asyncio.run(make_coroutine())
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|