Files
hyungi_document_server/scripts/marker_reprocess_existing_success.py
T
Hyungi Ahn 68fa86ea52 feat(markdown): persist extracted images with auth routes
Markdown Canonical Phase 1B.5 — marker 가 추출하던 이미지를 NAS 에 영구 저장하고
DB 메타 + 인증 라우트 + 프론트 swap 까지 wiring.

핵심 변경:
- marker-service /convert 응답에 base64 image 리스트 포함 (stateless 유지, NAS write 권한 X)
- marker_worker 가 NAS `/documents/extracted_images/{doc_id}/` 에 persist + UPSERT +
  고아 row DELETE + md_content ref 를 `docimg:img_NNN` stable scheme 으로 정규화
- /api/documents/{id}/images/{key}/raw 인증 라우트 (Cache-Control private + ETag = content_hash)
- frontend MarkdownDoc 가 placeholder card 안의 docimg ref 를 실제 <img> 로 swap

원칙:
- 이미지 binary = NAS, metadata = Postgres (학습 섹션 패턴 동일)
- image_key sequence 기반 결정적 → 재변환 idempotent
- MARKDOWN_IMAGE_PERSIST=false env 로 rollback 가능 (placeholder card 폴백 자연 유지)

기존 28건 marker success 문서는 본 PR 에서 건드리지 않음 — deploy + 신규 업로드 1건 +
sample 5건 검증 후 scripts/marker_reprocess_existing_success.py 로 targeted reprocess.

plan: ~/.claude/plans/piped-humming-crystal.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 14:05:41 +09:00

194 lines
7.1 KiB
Python

"""Phase 1B.5 ImgAuth — 기존 marker success 문서 targeted 재변환.
목적:
Phase 1B 배포 (2026-05-01) ~ 1B.5 직전 사이에 marker_worker 가 만든 success 문서들은
md_content 안에 깨진 ref (`![](_page_0_Picture_3.jpeg)` 등) 만 있고 NAS 파일 부재.
1B.5 deploy 후 이 문서들을 force_reprocess=true 로 다시 큐에 넣어 이미지 persist +
md_content ref 정규화 (`docimg:img_NNN`) 을 적용한다.
사용자 못박은 절차 (plan: piped-humming-crystal.md Step 6):
1. 1B.5 merge/deploy 전에는 기존 success 문서 절대 건드리지 않음
2. 1B.5 deploy + 신규 업로드 1~2건 + sample 5건 검증 후에만 28건 진행
3. anchor/baseline 보존: pre-snapshot CSV + 재변환 전후 quality metric 비교값
4. Phase 2 cron 와 분리 (별 단계, 야간 시간 단발 실행)
실행:
# 1) 후보 + pre-snapshot 출력 (CSV stdout)
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py --dry-run
# 2) sample 모드 (지정한 doc_id 만 enqueue)
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py \\
--apply --only 4809,5127,5180,5183
# 3) 전체 28건 enqueue (sample 검증 통과 후)
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py --apply
# 4) snapshot CSV 파일 저장
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py \\
--dry-run --snapshot-csv /app/logs/marker_pre_imgauth_snapshot_2026MMDD.csv
배포 후 검증:
- 모든 후보가 md_status='success' 유지
- document_images row 분포 (예상: ~20건 이미지 보유, ~8건 없음)
- md_content 안 `docimg:` ref 수 == document_images row 수 per doc
- anchor doc 4809 quality 비교 (heading_count / table_row_count / text_length_ratio ±5%)
"""
import argparse
import asyncio
import csv
import json
import os
import sys
from io import StringIO
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
CANDIDATES_SQL = """
SELECT id, md_status, md_content_hash, md_extraction_engine, md_extraction_engine_version,
md_extraction_quality, md_generated_at,
file_format, file_path, title
FROM documents
WHERE md_status = 'success'
AND md_extraction_engine = 'marker'
ORDER BY id
"""
def _serialize_row(row) -> dict:
quality = row.md_extraction_quality
return {
"id": row.id,
"md_status": row.md_status,
"md_content_hash": row.md_content_hash,
"md_extraction_engine": row.md_extraction_engine,
"md_extraction_engine_version": row.md_extraction_engine_version,
"md_extraction_quality": json.dumps(quality, ensure_ascii=False) if quality else "",
"md_generated_at": row.md_generated_at.isoformat() if row.md_generated_at else "",
"file_format": row.file_format,
"file_path": row.file_path,
"title": row.title or "",
}
async def run(*, apply: bool, only_ids: set[int] | None, snapshot_csv: str | None) -> int:
database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://pkm:pkm@localhost:5432/pkm",
)
engine = create_async_engine(database_url)
session_factory = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
try:
async with session_factory() as session:
rows = (await session.execute(text(CANDIDATES_SQL))).all()
if only_ids:
rows = [r for r in rows if r.id in only_ids]
print(f"=== marker success 후보 = {len(rows)}건 ===")
if not rows:
print("후보 없음 — 종료.")
return 0
# pre-snapshot CSV 출력
buf = StringIO()
writer = csv.DictWriter(
buf,
fieldnames=[
"id", "md_status", "md_content_hash", "md_extraction_engine",
"md_extraction_engine_version", "md_extraction_quality",
"md_generated_at", "file_format", "file_path", "title",
],
)
writer.writeheader()
for row in rows:
writer.writerow(_serialize_row(row))
csv_text = buf.getvalue()
if snapshot_csv:
with open(snapshot_csv, "w", encoding="utf-8") as f:
f.write(csv_text)
print(f"[snapshot] {snapshot_csv}{len(rows)}행 기록")
else:
print("\n=== Pre-snapshot CSV ===")
print(csv_text)
if not apply:
print(f"\n[dry-run] {len(rows)}건 영향. --apply 로 실제 enqueue.")
return 0
# enqueue — UNIQUE(document_id, stage) WHERE status IN ('pending', 'processing')
# 가 있으므로 활성 markdown 행이 없는 doc 만 통과. 충돌 시 silent skip.
ENQUEUE_SQL = text("""
INSERT INTO processing_queue (document_id, stage, status, payload)
VALUES (:doc_id, 'markdown', 'pending', :payload::jsonb)
ON CONFLICT DO NOTHING
""")
payload = json.dumps({
"force_reprocess": True,
"reason": "phase_1b5_imgauth_targeted_reprocess",
})
inserted = 0
for row in rows:
result = await session.execute(
ENQUEUE_SQL, {"doc_id": row.id, "payload": payload}
)
if result.rowcount > 0:
inserted += 1
await session.commit()
print(f"\n[apply] enqueue 완료 — {inserted}/{len(rows)} 건 신규 markdown 큐 추가")
print(" (skip = 이미 활성 markdown 큐 행이 있는 문서)")
return 0
finally:
await engine.dispose()
def _parse_only_ids(arg: str | None) -> set[int] | None:
if not arg:
return None
out: set[int] = set()
for part in arg.split(","):
part = part.strip()
if part:
out.add(int(part))
return out or None
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--apply", action="store_true", help="실제 enqueue (기본 dry-run)")
parser.add_argument("--dry-run", action="store_true", help="명시적 dry-run (default 동등)")
parser.add_argument(
"--only", type=str, default=None,
help="쉼표 구분 doc_id 화이트리스트 (sample 검증용, 예: 4809,5127,5180)",
)
parser.add_argument(
"--snapshot-csv", type=str, default=None,
help="pre-snapshot 을 stdout 대신 이 경로의 CSV 파일로 저장",
)
args = parser.parse_args()
if args.apply and args.dry_run:
parser.error("--apply 와 --dry-run 동시 지정 불가")
only_ids = _parse_only_ids(args.only)
return asyncio.run(run(
apply=args.apply,
only_ids=only_ids,
snapshot_csv=args.snapshot_csv,
))
if __name__ == "__main__":
sys.exit(main())