"""Phase 1B.5 ImgAuth — 기존 marker success 문서 targeted 재변환. 목적: Phase 1B 배포 (2026-05-01) ~ 1B.5 직전 사이에 marker_worker 가 만든 success 문서들은 md_content 안에 깨진 ref (`![](_page_0_Picture_3.jpeg)` 등) 만 있고 NAS 파일 부재. 1B.5 deploy 후 이 문서들을 force_reprocess=true 로 다시 큐에 넣어 이미지 persist + md_content ref 정규화 (`docimg:img_NNN`) 을 적용한다. 사용자 못박은 절차 (plan: piped-humming-crystal.md Step 6): 1. 1B.5 merge/deploy 전에는 기존 success 문서 절대 건드리지 않음 2. 1B.5 deploy + 신규 업로드 1~2건 + sample 5건 검증 후에만 28건 진행 3. anchor/baseline 보존: pre-snapshot CSV + 재변환 전후 quality metric 비교값 4. Phase 2 cron 와 분리 (별 단계, 야간 시간 단발 실행) 실행: # 1) 후보 + pre-snapshot 출력 (CSV stdout) docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py --dry-run # 2) sample 모드 (지정한 doc_id 만 enqueue) docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py \\ --apply --only 4809,5127,5180,5183 # 3) 전체 28건 enqueue (sample 검증 통과 후) docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py --apply # 4) snapshot CSV 파일 저장 docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py \\ --dry-run --snapshot-csv /app/logs/marker_pre_imgauth_snapshot_2026MMDD.csv 배포 후 검증: - 모든 후보가 md_status='success' 유지 - document_images row 분포 (예상: ~20건 이미지 보유, ~8건 없음) - md_content 안 `docimg:` ref 수 == document_images row 수 per doc - anchor doc 4809 quality 비교 (heading_count / table_row_count / text_length_ratio ±5%) """ import argparse import asyncio import csv import json import os import sys from io import StringIO sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine CANDIDATES_SQL = """ SELECT id, md_status, md_content_hash, md_extraction_engine, md_extraction_engine_version, md_extraction_quality, md_generated_at, file_format, file_path, title FROM documents WHERE md_status = 'success' AND md_extraction_engine = 'marker' ORDER BY id """ def _serialize_row(row) -> dict: quality = row.md_extraction_quality return { "id": row.id, "md_status": row.md_status, "md_content_hash": row.md_content_hash, "md_extraction_engine": row.md_extraction_engine, "md_extraction_engine_version": row.md_extraction_engine_version, "md_extraction_quality": json.dumps(quality, ensure_ascii=False) if quality else "", "md_generated_at": row.md_generated_at.isoformat() if row.md_generated_at else "", "file_format": row.file_format, "file_path": row.file_path, "title": row.title or "", } async def run(*, apply: bool, only_ids: set[int] | None, snapshot_csv: str | None) -> int: database_url = os.getenv( "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm", ) engine = create_async_engine(database_url) session_factory = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) try: async with session_factory() as session: rows = (await session.execute(text(CANDIDATES_SQL))).all() if only_ids: rows = [r for r in rows if r.id in only_ids] print(f"=== marker success 후보 = {len(rows)}건 ===") if not rows: print("후보 없음 — 종료.") return 0 # pre-snapshot CSV 출력 buf = StringIO() writer = csv.DictWriter( buf, fieldnames=[ "id", "md_status", "md_content_hash", "md_extraction_engine", "md_extraction_engine_version", "md_extraction_quality", "md_generated_at", "file_format", "file_path", "title", ], ) writer.writeheader() for row in rows: writer.writerow(_serialize_row(row)) csv_text = buf.getvalue() if snapshot_csv: with open(snapshot_csv, "w", encoding="utf-8") as f: f.write(csv_text) print(f"[snapshot] {snapshot_csv} 에 {len(rows)}행 기록") else: print("\n=== Pre-snapshot CSV ===") print(csv_text) if not apply: print(f"\n[dry-run] {len(rows)}건 영향. --apply 로 실제 enqueue.") return 0 # enqueue — UNIQUE(document_id, stage) WHERE status IN ('pending', 'processing') # 가 있으므로 활성 markdown 행이 없는 doc 만 통과. 충돌 시 silent skip. # CAST(:payload AS jsonb) — `::jsonb` postfix 캐스트는 SQLAlchemy text() # 의 named-param prefix `:` 와 충돌해 syntax error 발생. ENQUEUE_SQL = text(""" INSERT INTO processing_queue (document_id, stage, status, payload) VALUES (:doc_id, 'markdown', 'pending', CAST(:payload AS jsonb)) ON CONFLICT DO NOTHING """) payload = json.dumps({ "force_reprocess": True, "reason": "phase_1b5_imgauth_targeted_reprocess", }) inserted = 0 for row in rows: result = await session.execute( ENQUEUE_SQL, {"doc_id": row.id, "payload": payload} ) if result.rowcount > 0: inserted += 1 await session.commit() print(f"\n[apply] enqueue 완료 — {inserted}/{len(rows)} 건 신규 markdown 큐 추가") print(" (skip = 이미 활성 markdown 큐 행이 있는 문서)") return 0 finally: await engine.dispose() def _parse_only_ids(arg: str | None) -> set[int] | None: if not arg: return None out: set[int] = set() for part in arg.split(","): part = part.strip() if part: out.add(int(part)) return out or None def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--apply", action="store_true", help="실제 enqueue (기본 dry-run)") parser.add_argument("--dry-run", action="store_true", help="명시적 dry-run (default 동등)") parser.add_argument( "--only", type=str, default=None, help="쉼표 구분 doc_id 화이트리스트 (sample 검증용, 예: 4809,5127,5180)", ) parser.add_argument( "--snapshot-csv", type=str, default=None, help="pre-snapshot 을 stdout 대신 이 경로의 CSV 파일로 저장", ) args = parser.parse_args() if args.apply and args.dry_run: parser.error("--apply 와 --dry-run 동시 지정 불가") only_ids = _parse_only_ids(args.only) return asyncio.run(run( apply=args.apply, only_ids=only_ids, snapshot_csv=args.snapshot_csv, )) if __name__ == "__main__": sys.exit(main())