diff --git a/app/api/documents.py b/app/api/documents.py index 920bb7d..a59e10d 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -33,6 +33,7 @@ from core.config import settings from core.database import get_session from core.utils import file_hash from models.document import Document +from models.document_image import DocumentImage from models.queue import ProcessingQueue, enqueue_stage from models.user import User from services.document_telemetry import record_analyze_event, sanitize_source @@ -670,6 +671,47 @@ async def get_document_file( ) +@router.get("/{doc_id}/images/{image_key}/raw") +async def get_document_image_raw( + doc_id: int, + image_key: str, + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """marker 추출 이미지 raw bytes (Phase 1B.5). + + md_content 안의 `![alt](docimg:img_NNN)` ref 를 frontend selector 가 이 라우트로 변환. + 인증된 사용자만 응답 (단일 사용자 환경, ownership 컬럼 없음 — get_current_user 게이트 충분). + """ + # 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단) + doc = await session.get(Document, doc_id) + if doc is None: + raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다") + + img = await session.scalar( + select(DocumentImage).where( + DocumentImage.document_id == doc_id, + DocumentImage.image_key == image_key, + ) + ) + if img is None: + raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다") + + file_path = Path(img.file_path) + if not file_path.is_file(): + raise HTTPException(status_code=410, detail="파일이 사라졌습니다") + + return FileResponse( + str(file_path), + media_type=img.mime_type, + headers={ + # 인증 라우트라 CDN/공용 cache 금지. 단일 사용자라 private + 1h 충분. + "Cache-Control": "private, max-age=3600", + "ETag": f'"{img.content_hash}"', + }, + ) + + @router.post("/", response_model=DocumentResponse, status_code=201) async def upload_document( request: Request, diff --git a/app/models/document_image.py b/app/models/document_image.py new file mode 100644 index 0000000..d5ae63a --- /dev/null +++ b/app/models/document_image.py @@ -0,0 +1,42 @@ +"""document_images ORM (Phase 1B.5) — marker 추출 이미지 메타. + +저장: NAS `/documents/extracted_images/{document_id}/{image_key}.{ext}` +표시: GET /api/documents/{doc_id}/images/{image_key}/raw (인증 필요) + +md_content 의 ref 는 `![alt](docimg:img_001)` 형식 — image_key 가 sequence 기반 결정적이라 +재변환 시 idempotent. +""" + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from core.database import Base + + +class DocumentImage(Base): + __tablename__ = "document_images" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + document_id: Mapped[int] = mapped_column( + BigInteger, ForeignKey("documents.id", ondelete="CASCADE"), nullable=False + ) + image_key: Mapped[str] = mapped_column(String(32), nullable=False) + relative_path: Mapped[str] = mapped_column(Text, nullable=False) + file_path: Mapped[str] = mapped_column(Text, nullable=False) + mime_type: Mapped[str] = mapped_column(Text, nullable=False) + file_size: Mapped[int] = mapped_column(BigInteger, nullable=False) + content_hash: Mapped[str] = mapped_column(String(64), nullable=False) + width: Mapped[int | None] = mapped_column(Integer) + height: Mapped[int | None] = mapped_column(Integer) + page_index: Mapped[int | None] = mapped_column(Integer) + alt_text: Mapped[str | None] = mapped_column(Text) + source_slug: Mapped[str | None] = mapped_column(Text) + extraction_engine: Mapped[str] = mapped_column( + String(32), nullable=False, default="marker" + ) + extraction_engine_version: Mapped[str | None] = mapped_column(String(32)) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) diff --git a/app/workers/marker_worker.py b/app/workers/marker_worker.py index d653866..c7ff1eb 100644 --- a/app/workers/marker_worker.py +++ b/app/workers/marker_worker.py @@ -1,13 +1,19 @@ -"""marker_worker — markdown stage 소비. Phase 1B Round 5. +"""marker_worker — markdown stage 소비. Phase 1B + Phase 1B.5 (ImgAuth). 플로우: - classify_worker 완료 → enqueue 'markdown' stage + classify_worker 완료 → enqueue 'markdown' stage (또는 reprocess 스크립트가 force=True 로 enqueue) → marker_worker.process() → doc_type / 확장자 / page_count 가드 → marker-service POST /convert + → 응답 이미지 NAS persist + document_images UPSERT + md_content ref 정규화 → md_content 저장 또는 doc-level failed (404/422) 또는 transient raise (5xx → queue retry) -plan: ~/.claude/plans/plan-idempotent-sundae.md +이미지 저장 위치: NAS `/documents/extracted_images/{document_id}/{image_key}.{ext}` +md_content ref 형식: `![alt](docimg:img_001)` — image_key 가 sequence 기반 결정적 → idempotent. + +plan: ~/.claude/plans/piped-humming-crystal.md """ +import base64 +import hashlib import logging import os import re @@ -16,10 +22,13 @@ from typing import Any import fitz # PyMuPDF import httpx -from sqlalchemy import update +from sqlalchemy import delete, desc, select, update +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from models.document import Document +from models.document_image import DocumentImage +from models.queue import ProcessingQueue logger = logging.getLogger(__name__) @@ -27,6 +36,22 @@ MARKER_ENDPOINT = "http://marker-service:3300/convert" MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도 MAX_PAGES = 200 # 페이지 hard limit +# Phase 1B.5: 이미지 NAS persist 토글. rollback 시 false → 응답 images 무시 + md_content +# rewrite skip → placeholder card 폴백 자연 유지. 환경변수 미설정 = 기본 활성화. +MARKDOWN_IMAGE_PERSIST = os.getenv("MARKDOWN_IMAGE_PERSIST", "true").lower() in ("1", "true", "yes") +EXTRACTED_IMAGES_ROOT = Path("/documents/extracted_images") + +# md_content image ref 정규식. alt + href 캡처. 외부 URL 은 보존 (slug 매칭 안 되는 경우). +_IMAGE_REF_RE = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)") + +_FORMAT_TO_MIME = { + "png": "image/png", + "jpeg": "image/jpeg", + "jpg": "image/jpeg", + "webp": "image/webp", + "gif": "image/gif", +} + # Phase 1B = PDF only. DOCX 등은 후속 Phase. SUPPORTED_EXTENSIONS = {".pdf"} @@ -87,6 +112,11 @@ async def process(document_id: int, session: AsyncSession) -> None: logger.warning(f"[marker] document {document_id} not found") return + # ---- (0) force_reprocess flag — reprocess 스크립트가 queue payload 로 전달 ---- + force_reprocess = await _read_force_reprocess(session, document_id) + if force_reprocess: + logger.info(f"markdown_force_reprocess id={document_id}") + # ---- (1) doc_type skip ---- if doc.document_type in SKIP_DOC_TYPES: logger.info( @@ -180,9 +210,33 @@ async def process(document_id: int, session: AsyncSession) -> None: await _fail(session, document_id, str(exc)[:1000]) return - # ---- (7) success ---- - md_content = data["md_content"] + # ---- (7) image persist + md_content rewrite (Phase 1B.5) ---- + md_content_raw = data["md_content"] + images_resp = data.get("images") if MARKDOWN_IMAGE_PERSIST else None + + saved_images: list[dict[str, Any]] = [] + if images_resp: + try: + saved_images = _persist_images_to_nas(document_id, images_resp) + except OSError as exc: + # NAS 일시 끊김 등 — transient. queue retry 로 복구. + logger.warning( + f"[marker] image persist NAS write failed id={document_id}: " + f"{type(exc).__name__}: {exc}" + ) + raise + + # md_content 안의 ref 를 stable internal scheme `docimg:img_NNN` 으로 정규화. + slug_to_key = {img["source_slug"]: img["image_key"] for img in saved_images} + md_content = _rewrite_image_refs(md_content_raw, slug_to_key) + + # quality 메트릭은 정규화 *후* md_content 기준 (실제 저장본). image_count 도 정확. quality = _compute_quality(md_content, doc.extracted_text or "", data["raw_metrics"]) + if data.get("images_truncated"): + quality.setdefault("warnings", []).append("images_truncated") + + # ---- (8) DB 트랜잭션 — documents UPDATE + document_images UPSERT + 고아 row DELETE ---- + orphan_paths = await _sync_document_images(session, document_id, saved_images, data) await session.execute( update(Document).where(Document.id == document_id).values( @@ -191,7 +245,7 @@ async def process(document_id: int, session: AsyncSession) -> None: md_extraction_engine=data["engine"], md_extraction_engine_version=data["engine_version"], md_extraction_quality=quality, - md_content_hash=data["md_content_hash"], + md_content_hash=hashlib.sha256(md_content.encode("utf-8")).hexdigest(), md_source_hash=doc.file_hash, md_generated_at=_now(), md_extraction_error=None, @@ -201,11 +255,184 @@ async def process(document_id: int, session: AsyncSession) -> None: ) ) await session.commit() + + # ---- (9) commit 후 고아 NAS 파일 unlink (best-effort, 실패해도 DB 정합 유지) ---- + for orphan_path in orphan_paths: + try: + Path(orphan_path).unlink(missing_ok=True) + except Exception as exc: + logger.warning( + f"[marker] orphan image unlink failed id={document_id} path={orphan_path}: " + f"{type(exc).__name__}: {exc}" + ) + logger.info( - f"[marker] success id={document_id} len={len(md_content)} elapsed_ms={data['elapsed_ms']}" + f"[marker] success id={document_id} len={len(md_content)} " + f"images={len(saved_images)} orphans_removed={len(orphan_paths)} " + f"elapsed_ms={data['elapsed_ms']}" ) +async def _read_force_reprocess(session: AsyncSession, document_id: int) -> bool: + """현재 markdown stage queue 행의 payload.force_reprocess 조회. 없으면 False.""" + row = await session.scalar( + select(ProcessingQueue) + .where( + ProcessingQueue.document_id == document_id, + ProcessingQueue.stage == "markdown", + ProcessingQueue.status == "processing", + ) + .order_by(desc(ProcessingQueue.id)) + .limit(1) + ) + if not row or not row.payload: + return False + return bool(row.payload.get("force_reprocess")) + + +def _persist_images_to_nas( + document_id: int, images_resp: list[dict[str, Any]] +) -> list[dict[str, Any]]: + """marker 응답 이미지 list 를 NAS 에 저장하고 메타 dict 리스트 반환. + + image_key 는 sequence 기반 결정적 (`img_001` → `img_NNN`, marker 출력 순서 = 안정적). + 같은 doc 재변환 시 같은 key 가 같은 path 에 overwrite → idempotent. + """ + img_root = EXTRACTED_IMAGES_ROOT / str(document_id) + img_root.mkdir(parents=True, exist_ok=True) + + saved: list[dict[str, Any]] = [] + for seq, img in enumerate(images_resp, start=1): + try: + raw_bytes = base64.b64decode(img["bytes_b64"]) + except Exception as exc: + logger.warning( + f"[marker] image base64 decode failed id={document_id} " + f"seq={seq} slug={img.get('slug')}: {exc}" + ) + continue + + fmt = (img.get("format") or "png").lower() + ext = "jpeg" if fmt == "jpg" else fmt + image_key = f"img_{seq:03d}" + filename = f"{image_key}.{ext}" + rel_path = f"extracted_images/{document_id}/{filename}" + abs_path = img_root / filename + + # NAS write — 실패 시 OSError raise (transient retry). + abs_path.write_bytes(raw_bytes) + + saved.append({ + "image_key": image_key, + "source_slug": img.get("slug") or "", + "relative_path": rel_path, + "file_path": str(abs_path), + "mime_type": _FORMAT_TO_MIME.get(ext, "application/octet-stream"), + "file_size": len(raw_bytes), + "content_hash": hashlib.sha256(raw_bytes).hexdigest(), + "width": img.get("width"), + "height": img.get("height"), + }) + return saved + + +def _rewrite_image_refs(md_text: str, slug_to_key: dict[str, str]) -> str: + """md_content 안의 `![alt](slug)` ref 를 `![alt](docimg:image_key)` 로 정규화. + + - slug_to_key 에 없는 href 는 원본 유지 (외부 URL / 경로 변형 등) + - alt 그대로 보존 + - 매칭 = href 가 (a) 정확히 slug 와 같음 OR (b) basename 이 slug 와 같음 + (marker 가 `_page_0_Picture_3.jpeg` 또는 `subdir/_page_0_Picture_3.jpeg` 어느 쪽도 emit 가능) + """ + if not slug_to_key: + return md_text + + def _replace(match: re.Match) -> str: + alt, href = match.group(1), match.group(2) + # 정확 매치 우선 + if href in slug_to_key: + return f"![{alt}](docimg:{slug_to_key[href]})" + # basename 매치 fallback + basename = href.rsplit("/", 1)[-1] + if basename in slug_to_key: + return f"![{alt}](docimg:{slug_to_key[basename]})" + return match.group(0) + + return _IMAGE_REF_RE.sub(_replace, md_text) + + +async def _sync_document_images( + session: AsyncSession, + document_id: int, + saved_images: list[dict[str, Any]], + response_data: dict[str, Any], +) -> list[str]: + """document_images 동기화 — 신규 keys UPSERT + 고아 row DELETE. + + 반환: commit 후 unlink 해야 할 NAS 파일 경로 리스트. + """ + # 기존 row 조회 (file_path 보존 — 고아 파일 unlink 용) + existing = (await session.execute( + select(DocumentImage.image_key, DocumentImage.file_path) + .where(DocumentImage.document_id == document_id) + )).all() + existing_map = {key: path for key, path in existing} + + new_keys = {img["image_key"] for img in saved_images} + orphan_keys = set(existing_map.keys()) - new_keys + orphan_paths = [existing_map[k] for k in orphan_keys] + + # UPSERT — image_key 가 같으면 file_path/hash/dimensions 등을 갱신. + for img in saved_images: + stmt = ( + pg_insert(DocumentImage) + .values( + document_id=document_id, + image_key=img["image_key"], + relative_path=img["relative_path"], + file_path=img["file_path"], + mime_type=img["mime_type"], + file_size=img["file_size"], + content_hash=img["content_hash"], + width=img.get("width"), + height=img.get("height"), + page_index=img.get("page_index"), + alt_text=img.get("alt_text"), + source_slug=img.get("source_slug"), + extraction_engine=response_data.get("engine") or "marker", + extraction_engine_version=response_data.get("engine_version"), + ) + .on_conflict_do_update( + index_elements=["document_id", "image_key"], + set_={ + "relative_path": img["relative_path"], + "file_path": img["file_path"], + "mime_type": img["mime_type"], + "file_size": img["file_size"], + "content_hash": img["content_hash"], + "width": img.get("width"), + "height": img.get("height"), + "page_index": img.get("page_index"), + "alt_text": img.get("alt_text"), + "source_slug": img.get("source_slug"), + "extraction_engine": response_data.get("engine") or "marker", + "extraction_engine_version": response_data.get("engine_version"), + }, + ) + ) + await session.execute(stmt) + + if orphan_keys: + await session.execute( + delete(DocumentImage).where( + DocumentImage.document_id == document_id, + DocumentImage.image_key.in_(orphan_keys), + ) + ) + + return orphan_paths + + def _compute_quality(md: str, raw_text: str, raw_metrics: dict[str, Any]) -> dict[str, Any]: """1B 휴리스틱 quality. 임계 판정 미적용 (Phase 1D 후행).""" heading_lines = re.findall(r"^(#{1,6})\s", md, flags=re.MULTILINE) diff --git a/frontend/src/app.css b/frontend/src/app.css index 0a8d43e..027d085 100644 --- a/frontend/src/app.css +++ b/frontend/src/app.css @@ -200,5 +200,15 @@ body { } .markdown-doc .md-image-placeholder-icon { font-style: normal; opacity: 0.7; } +/* Phase 1B.5: ImgAuth selector 가 placeholder figure 안에 삽입하는 실제 . + selector 가 figure 의 자식만 교체하므로 figure 의 margin 은 유지 — 별도 wrapper 불필요. */ +.markdown-doc .md-image-placeholder > .md-image { + display: block; + max-width: 100%; + height: auto; + border-radius: 6px; + /* placeholder card 의 dashed 테두리는 swap 시 children 만 교체되어 자연 제거됨. */ +} + /* Phase 1C: frontmatter 박스 — 본문 위 메타 표시 */ .md-frontmatter dt { font-weight: 500; } diff --git a/frontend/src/lib/components/MarkdownDoc.svelte b/frontend/src/lib/components/MarkdownDoc.svelte index 4df2af3..01df7c2 100644 --- a/frontend/src/lib/components/MarkdownDoc.svelte +++ b/frontend/src/lib/components/MarkdownDoc.svelte @@ -19,6 +19,8 @@ import MarkdownStatusBadge from '$lib/components/MarkdownStatusBadge.svelte'; type Props = { + /** Phase 1B.5: docimg:img_NNN ref 를 /api/documents/{id}/images/{key}/raw 로 변환할 때 필요. */ + documentId?: number | null; mdContent?: string | null; mdFrontmatter?: Record | null; extractedText?: string | null; @@ -31,6 +33,7 @@ }; let { + documentId = null, mdContent = null, mdFrontmatter = null, extractedText = null, @@ -93,6 +96,40 @@ h.insertBefore(a, h.firstChild); } }); + + // Phase 1B.5: docimg:img_NNN placeholder → 실제 swap. + // - data-md-image-internal="1" 인 figure 만 대상 (외부 URL placeholder 는 그대로 유지) + // - documentId 미전달 시 swap 안 함 (prop 누락 / list view 등 안전) + // - fetch 실패 시 placeholder 유지 (onerror 로 figure 복원) + $effect(() => { + void renderedHtml; + if (!containerRef || documentId == null) return; + const placeholders = containerRef.querySelectorAll( + 'figure.md-image-placeholder[data-md-image-internal="1"]', + ); + for (const ph of placeholders) { + if (ph.dataset.mdImageSwapped === '1') continue; + const ref = ph.getAttribute('data-md-image-src'); + if (!ref) continue; + const key = ref.split(':', 2)[1]; + if (!key) continue; + const alt = ph.getAttribute('data-md-image-alt') ?? ''; + + const img = document.createElement('img'); + img.src = `/api/documents/${documentId}/images/${encodeURIComponent(key)}/raw`; + img.alt = alt; + img.loading = 'lazy'; + img.className = 'md-image'; + img.onerror = () => { + // fetch 실패 → placeholder 복원 (img 만 제거, figure 는 그대로) + try { img.remove(); } catch {} + delete ph.dataset.mdImageSwapped; + }; + // figure 안 내용을 img 로 교체 (figure 자체는 보존 → 다음 effect 재실행 시 idempotent 마커 검사 가능) + ph.replaceChildren(img); + ph.dataset.mdImageSwapped = '1'; + } + });
diff --git a/frontend/src/lib/utils/docMarkdown.ts b/frontend/src/lib/utils/docMarkdown.ts index 39b213c..6db85a4 100644 --- a/frontend/src/lib/utils/docMarkdown.ts +++ b/frontend/src/lib/utils/docMarkdown.ts @@ -1,12 +1,14 @@ /** - * 문서 본문 markdown 렌더 (Phase 1C — MarkdownDoc 컴포넌트 전용). + * 문서 본문 markdown 렌더 (Phase 1C + Phase 1B.5 — MarkdownDoc 컴포넌트 전용). * * mathMarkdown.ts (study 의 문제·해설용) 와 별도 인스턴스를 둬서 study 측 동작에 영향 없음. * * 차이점: * - GFM heading id (anchor 용 id 자동 부여, prefix=doc-) - * - 이미지는 placeholder card 로 렌더 (1B.5 ImgAuth wiring 전까지 깨진 아이콘 노출 방지). - * 원본 src 는 data-md-image-src 에 escape 되어 보존됨 — 1B.5 에서 selector 로 복원. + * - 이미지는 placeholder card 로 렌더. `data-md-image-internal="1"` 인 경우 (href 가 + * `docimg:img_NNN`) MarkdownDoc 의 selector 가 mount 후 실제 로 교체. + * `data-md-image-internal="0"` (외부 URL) 은 placeholder 유지 — 외부 이미지 자동 + * fetch 회피 (privacy + dependency). * * KaTeX / DOMPurify 정책은 mathMarkdown.ts 의 정책과 동일. */ @@ -46,12 +48,15 @@ docMarked.use({ image(token: any): string { const rawHref = (token?.href ?? '') as string; const rawAlt = (token?.text ?? '') as string; + const isInternal = rawHref.startsWith('docimg:'); const basename = rawHref.split('/').pop() ?? rawHref; const labelSrc = rawAlt || basename || '이미지'; const safeHref = escAttr(rawHref); + const safeAlt = escAttr(rawAlt); const safeLabel = escText(`[이미지: ${labelSrc} — 아직 표시되지 않음]`); + const internalFlag = isInternal ? '1' : '0'; return ( - `
` + + `
` + `
` + `` + `${safeLabel}` + @@ -65,8 +70,18 @@ docMarked.use({ const SANITIZE_OPTS = { USE_PROFILES: { html: true }, // KaTeX (style + aria-hidden), heading anchor (id), 이미지 마킹 (data-md-img, - // data-md-image-src — 1B.5 ImgAuth selector 용), figure caption (figure/figcaption) - ADD_ATTR: ['style', 'aria-hidden', 'id', 'data-md-img', 'data-md-image-src', 'loading'], + // data-md-image-src + data-md-image-internal + data-md-image-alt — 1B.5 ImgAuth + // selector 용), figure caption (figure/figcaption) + ADD_ATTR: [ + 'style', + 'aria-hidden', + 'id', + 'data-md-img', + 'data-md-image-src', + 'data-md-image-internal', + 'data-md-image-alt', + 'loading', + ], ADD_TAGS: ['figure', 'figcaption'], FORBID_TAGS: ['script', 'iframe', 'object', 'embed', 'link', 'meta'], FORBID_ATTR: ['onerror', 'onclick', 'onload', 'onmouseover', 'onfocus'], diff --git a/frontend/src/routes/documents/[id]/+page.svelte b/frontend/src/routes/documents/[id]/+page.svelte index 3b950e3..bd39baa 100644 --- a/frontend/src/routes/documents/[id]/+page.svelte +++ b/frontend/src/routes/documents/[id]/+page.svelte @@ -249,6 +249,7 @@ {#if viewerType === 'markdown' || viewerType === 'hwp-markdown'} {#if pdfViewMode === 'markdown' && canShowMarkdown} {#if doc.md_content || doc.extracted_text} dict: + quality = row.md_extraction_quality + return { + "id": row.id, + "md_status": row.md_status, + "md_content_hash": row.md_content_hash, + "md_extraction_engine": row.md_extraction_engine, + "md_extraction_engine_version": row.md_extraction_engine_version, + "md_extraction_quality": json.dumps(quality, ensure_ascii=False) if quality else "", + "md_generated_at": row.md_generated_at.isoformat() if row.md_generated_at else "", + "file_format": row.file_format, + "file_path": row.file_path, + "title": row.title or "", + } + + +async def run(*, apply: bool, only_ids: set[int] | None, snapshot_csv: str | None) -> int: + database_url = os.getenv( + "DATABASE_URL", + "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm", + ) + + engine = create_async_engine(database_url) + session_factory = async_sessionmaker( + engine, class_=AsyncSession, expire_on_commit=False + ) + + try: + async with session_factory() as session: + rows = (await session.execute(text(CANDIDATES_SQL))).all() + if only_ids: + rows = [r for r in rows if r.id in only_ids] + + print(f"=== marker success 후보 = {len(rows)}건 ===") + if not rows: + print("후보 없음 — 종료.") + return 0 + + # pre-snapshot CSV 출력 + buf = StringIO() + writer = csv.DictWriter( + buf, + fieldnames=[ + "id", "md_status", "md_content_hash", "md_extraction_engine", + "md_extraction_engine_version", "md_extraction_quality", + "md_generated_at", "file_format", "file_path", "title", + ], + ) + writer.writeheader() + for row in rows: + writer.writerow(_serialize_row(row)) + csv_text = buf.getvalue() + + if snapshot_csv: + with open(snapshot_csv, "w", encoding="utf-8") as f: + f.write(csv_text) + print(f"[snapshot] {snapshot_csv} 에 {len(rows)}행 기록") + else: + print("\n=== Pre-snapshot CSV ===") + print(csv_text) + + if not apply: + print(f"\n[dry-run] {len(rows)}건 영향. --apply 로 실제 enqueue.") + return 0 + + # enqueue — UNIQUE(document_id, stage) WHERE status IN ('pending', 'processing') + # 가 있으므로 활성 markdown 행이 없는 doc 만 통과. 충돌 시 silent skip. + ENQUEUE_SQL = text(""" + INSERT INTO processing_queue (document_id, stage, status, payload) + VALUES (:doc_id, 'markdown', 'pending', :payload::jsonb) + ON CONFLICT DO NOTHING + """) + + payload = json.dumps({ + "force_reprocess": True, + "reason": "phase_1b5_imgauth_targeted_reprocess", + }) + + inserted = 0 + for row in rows: + result = await session.execute( + ENQUEUE_SQL, {"doc_id": row.id, "payload": payload} + ) + if result.rowcount > 0: + inserted += 1 + + await session.commit() + print(f"\n[apply] enqueue 완료 — {inserted}/{len(rows)} 건 신규 markdown 큐 추가") + print(" (skip = 이미 활성 markdown 큐 행이 있는 문서)") + return 0 + finally: + await engine.dispose() + + +def _parse_only_ids(arg: str | None) -> set[int] | None: + if not arg: + return None + out: set[int] = set() + for part in arg.split(","): + part = part.strip() + if part: + out.add(int(part)) + return out or None + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--apply", action="store_true", help="실제 enqueue (기본 dry-run)") + parser.add_argument("--dry-run", action="store_true", help="명시적 dry-run (default 동등)") + parser.add_argument( + "--only", type=str, default=None, + help="쉼표 구분 doc_id 화이트리스트 (sample 검증용, 예: 4809,5127,5180)", + ) + parser.add_argument( + "--snapshot-csv", type=str, default=None, + help="pre-snapshot 을 stdout 대신 이 경로의 CSV 파일로 저장", + ) + args = parser.parse_args() + + if args.apply and args.dry_run: + parser.error("--apply 와 --dry-run 동시 지정 불가") + + only_ids = _parse_only_ids(args.only) + return asyncio.run(run( + apply=args.apply, + only_ids=only_ids, + snapshot_csv=args.snapshot_csv, + )) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/services/marker/server.py b/services/marker/server.py index 0d566f0..39e35f9 100644 --- a/services/marker/server.py +++ b/services/marker/server.py @@ -1,9 +1,14 @@ -"""marker-service — POST /convert: PDF → markdown (텍스트만, 이미지 제외). +"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64. -Phase 1B Round 5 — /ready 정확한 status code, warmup 실패 가시화, 변환 실패 = 422. -plan: ~/.claude/plans/plan-idempotent-sundae.md +Phase 1B (2026-05-01) — 텍스트만 응답, 이미지 폐기. +Phase 1B.5 (본 변경) — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이 + 없는 stateless 변환기 유지 (fastapi 가 NAS persist 담당). + +plan: ~/.claude/plans/piped-humming-crystal.md """ +import base64 import hashlib +import io import logging import os import threading @@ -11,7 +16,7 @@ import time from pathlib import Path from fastapi import FastAPI, HTTPException, Response -from pydantic import BaseModel +from pydantic import BaseModel, Field from marker.converters.pdf import PdfConverter from marker.models import create_model_dict @@ -35,6 +40,12 @@ _warmup_done = False _warmup_error: str | None = None _warmup_lock = threading.Lock() +# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시 +# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답. +MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200")) +# per-image 최대 raw bytes (base64 전). 그래픽이 많은 풀페이지 스캔 회피. +MAX_BYTES_PER_IMAGE = int(os.getenv("MARKER_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024))) + def _ensure_warmup() -> None: """첫 /convert 또는 startup hook 시 모델 로드. HF cache volume 활용.""" @@ -69,6 +80,15 @@ class ConvertRequest(BaseModel): max_pages: int | None = None +class ConvertImage(BaseModel): + """marker 추출 이미지 1건. fastapi 가 NAS 에 쓰고 docimg:img_NNN 으로 ref 정규화.""" + slug: str # marker 원본 slug (예: '_page_0_Picture_3.jpeg') + format: str # 'png' | 'jpeg' | 'webp' | 'gif' + width: int | None = None + height: int | None = None + bytes_b64: str # base64-encoded raw bytes + + class ConvertResponse(BaseModel): md_content: str md_content_hash: str @@ -76,6 +96,8 @@ class ConvertResponse(BaseModel): engine_version: str elapsed_ms: int raw_metrics: dict + images: list[ConvertImage] = Field(default_factory=list) + images_truncated: bool = False @app.get("/ready") @@ -124,9 +146,11 @@ async def convert(req: ConvertRequest): }, ) from exc - md_text, _meta, _images = text_from_rendered(rendered) + md_text, _meta, raw_images = text_from_rendered(rendered) elapsed_ms = int((time.monotonic() - start) * 1000) + images_payload, truncated = _serialize_images(raw_images, str(p)) + return ConvertResponse( md_content=md_text, md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(), @@ -135,6 +159,63 @@ async def convert(req: ConvertRequest): elapsed_ms=elapsed_ms, raw_metrics={ "page_count": getattr(rendered, "page_count", None), - "image_count_extracted": len(_images) if _images else 0, + "image_count_extracted": len(raw_images) if raw_images else 0, + "image_count_returned": len(images_payload), }, + images=images_payload, + images_truncated=truncated, ) + + +def _serialize_images(raw_images, src_path: str) -> tuple[list[ConvertImage], bool]: + """marker 의 `_images` (dict[slug, PIL.Image]) → base64 ConvertImage 리스트. + + 가드: + - MAX_IMAGES_PER_DOC 초과 시 head 만 반환 + truncated=True + - per-image 직렬화 실패 시 해당 이미지만 skip + warn (전체 fail 안 함) + - per-image 결과 byte 크기가 MAX_BYTES_PER_IMAGE 초과 시 skip + warn + """ + if not raw_images: + return [], False + + items = list(raw_images.items()) + truncated = len(items) > MAX_IMAGES_PER_DOC + if truncated: + logger.warning( + f"[marker-service] images truncated path={src_path} " + f"total={len(items)} cap={MAX_IMAGES_PER_DOC}" + ) + items = items[:MAX_IMAGES_PER_DOC] + + out: list[ConvertImage] = [] + for slug, pil_img in items: + try: + fmt_raw = (pil_img.format or "PNG").upper() + # WebP/GIF 도 marker 가 emit 가능하지만 본 1B.5 기준은 PNG/JPEG 우선. + # 알 수 없는 포맷이면 PNG 로 강제 (lossless re-encode). + fmt = fmt_raw if fmt_raw in {"PNG", "JPEG", "WEBP", "GIF"} else "PNG" + buf = io.BytesIO() + pil_img.save(buf, format=fmt) + raw_bytes = buf.getvalue() + if len(raw_bytes) > MAX_BYTES_PER_IMAGE: + logger.warning( + f"[marker-service] image too large skipped path={src_path} " + f"slug={slug} bytes={len(raw_bytes)} cap={MAX_BYTES_PER_IMAGE}" + ) + continue + out.append( + ConvertImage( + slug=slug, + format=fmt.lower(), + width=pil_img.width, + height=pil_img.height, + bytes_b64=base64.b64encode(raw_bytes).decode("ascii"), + ) + ) + except Exception as exc: + logger.warning( + f"[marker-service] image serialize failed path={src_path} " + f"slug={slug}: {type(exc).__name__}: {exc}" + ) + continue + return out, truncated diff --git a/tests/test_marker_image_persist.py b/tests/test_marker_image_persist.py new file mode 100644 index 0000000..a30fc51 --- /dev/null +++ b/tests/test_marker_image_persist.py @@ -0,0 +1,169 @@ +"""Phase 1B.5 ImgAuth — marker_worker 의 순수 헬퍼 단위 테스트. + +DB / NAS / marker-service 실접속이 필요한 통합 테스트는 별 파일 (배포 후 실행). +본 파일은 image-bytes mocking 만으로 검증 가능한 부분 (rewrite 로직 + persist 매핑). + +plan: ~/.claude/plans/piped-humming-crystal.md +""" + +from __future__ import annotations + +import base64 +import os +import sys + +import pytest + +# tests/ → 프로젝트 루트 → app/ +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from workers.marker_worker import ( + _persist_images_to_nas, + _rewrite_image_refs, +) + + +# ─── _rewrite_image_refs ─── + + +def test_rewrite_exact_slug_match(): + md = "본문\n\n![도식 1](_page_0_Picture_3.jpeg)\n\n뒤" + out = _rewrite_image_refs(md, {"_page_0_Picture_3.jpeg": "img_001"}) + assert "![도식 1](docimg:img_001)" in out + assert "_page_0_Picture_3.jpeg" not in out + + +def test_rewrite_basename_match_with_subdir_href(): + md = "![](sub/_page_2_Figure_1.png)" + out = _rewrite_image_refs(md, {"_page_2_Figure_1.png": "img_007"}) + assert out == "![](docimg:img_007)" + + +def test_rewrite_preserves_external_urls(): + md = "외부 ![logo](https://example.com/x.png) 와 내부 ![](slug.png)" + out = _rewrite_image_refs(md, {"slug.png": "img_002"}) + # 외부 URL 는 그대로, 내부 slug 만 docimg 로 치환. + assert "https://example.com/x.png" in out + assert "(docimg:img_002)" in out + + +def test_rewrite_preserves_alt_text(): + md = "![긴 한국어 alt 설명 with $math$](slug.jpeg)" + out = _rewrite_image_refs(md, {"slug.jpeg": "img_001"}) + assert out == "![긴 한국어 alt 설명 with $math$](docimg:img_001)" + + +def test_rewrite_no_slug_map_is_noop(): + md = "![](slug.png)" + assert _rewrite_image_refs(md, {}) == md + + +def test_rewrite_unknown_slug_kept(): + md = "![](unknown_slug.png)" + out = _rewrite_image_refs(md, {"other.png": "img_001"}) + assert out == md + + +def test_rewrite_idempotent_on_already_normalized(): + """이미 docimg:img_NNN 인 ref 는 slug 매칭 실패 → 변경 없음 (재변환 idempotent).""" + md = "![alt](docimg:img_001)" + out = _rewrite_image_refs(md, {"_page_0.jpeg": "img_001"}) + assert out == md + + +def test_rewrite_multiple_images(): + md = "![a](s1.png) text ![b](s2.png) ![c](s3.jpg)" + out = _rewrite_image_refs(md, { + "s1.png": "img_001", + "s2.png": "img_002", + "s3.jpg": "img_003", + }) + assert "(docimg:img_001)" in out + assert "(docimg:img_002)" in out + assert "(docimg:img_003)" in out + + +# ─── _persist_images_to_nas ─── + + +def _make_png_bytes() -> bytes: + """1x1 transparent PNG (signature + IHDR + IDAT + IEND).""" + return bytes.fromhex( + "89504e470d0a1a0a" # signature + "0000000d49484452" # IHDR len + type + "00000001000000010806000000" # 1x1 RGBA + "1f15c4890000000d4944415478" + "9c626001000000ffff03000006" + "00057ce4ec5d0000000049454e44ae426082" + ) + + +def test_persist_sequential_image_keys(tmp_path, monkeypatch): + # NAS root 를 tmp_path 로 redirect + monkeypatch.setattr( + "workers.marker_worker.EXTRACTED_IMAGES_ROOT", + tmp_path / "extracted_images", + ) + + payload = [ + {"slug": "_page_0.png", "format": "png", + "bytes_b64": base64.b64encode(_make_png_bytes()).decode("ascii")}, + {"slug": "_page_1.png", "format": "png", + "bytes_b64": base64.b64encode(_make_png_bytes()).decode("ascii")}, + {"slug": "_page_2.png", "format": "png", + "bytes_b64": base64.b64encode(_make_png_bytes()).decode("ascii")}, + ] + saved = _persist_images_to_nas(document_id=999, images_resp=payload) + + assert [s["image_key"] for s in saved] == ["img_001", "img_002", "img_003"] + assert all(s["mime_type"] == "image/png" for s in saved) + assert all(s["file_size"] > 0 for s in saved) + assert all(s["source_slug"].startswith("_page_") for s in saved) + # NAS 파일 실재 확인 + for s in saved: + from pathlib import Path + assert Path(s["file_path"]).is_file() + + +def test_persist_idempotent_on_rerun(tmp_path, monkeypatch): + """같은 doc_id 두번 persist → 같은 image_key 같은 path 에 overwrite.""" + monkeypatch.setattr( + "workers.marker_worker.EXTRACTED_IMAGES_ROOT", + tmp_path / "extracted_images", + ) + raw = _make_png_bytes() + payload = [{"slug": "_page_0.png", "format": "png", + "bytes_b64": base64.b64encode(raw).decode("ascii")}] + + s1 = _persist_images_to_nas(document_id=42, images_resp=payload) + s2 = _persist_images_to_nas(document_id=42, images_resp=payload) + assert s1[0]["image_key"] == s2[0]["image_key"] == "img_001" + assert s1[0]["file_path"] == s2[0]["file_path"] + assert s1[0]["content_hash"] == s2[0]["content_hash"] + + +def test_persist_skips_invalid_base64(tmp_path, monkeypatch): + """깨진 base64 는 skip — 다른 이미지 처리는 계속.""" + monkeypatch.setattr( + "workers.marker_worker.EXTRACTED_IMAGES_ROOT", + tmp_path / "extracted_images", + ) + raw = _make_png_bytes() + payload = [ + {"slug": "_page_0.png", "format": "png", "bytes_b64": "@@@invalid@@@"}, + {"slug": "_page_1.png", "format": "png", + "bytes_b64": base64.b64encode(raw).decode("ascii")}, + ] + saved = _persist_images_to_nas(document_id=7, images_resp=payload) + # 첫 번째 invalid skip, 두 번째만 저장. seq 는 그대로 진행 → img_002 가 됨. + assert len(saved) == 1 + assert saved[0]["image_key"] == "img_002" + assert saved[0]["source_slug"] == "_page_1.png" + + +def test_persist_empty_images_returns_empty(tmp_path, monkeypatch): + monkeypatch.setattr( + "workers.marker_worker.EXTRACTED_IMAGES_ROOT", + tmp_path / "extracted_images", + ) + assert _persist_images_to_nas(document_id=1, images_resp=[]) == []