"""mineru-service — POST /convert: PDF → markdown + 추출 이미지 base64. marker-service 대체(MinerU 2.5 VLM). **marker 의 /convert 계약을 그대로 복제**해서 marker_worker 가 엔드포인트만 바꾸면 되도록 한다(요청/응답 동일 shape): 요청: {file_path, max_pages?, start_page?, end_page?} (page = 1-based inclusive) 응답: {md_content, md_content_hash, engine, engine_version, elapsed_ms, raw_metrics, images:[{slug, format, width, height, bytes_b64}], images_truncated} 설계 노트: - **page range 는 PyMuPDF 로 직접 슬라이스**해서 MinerU 에 넘긴다(start_page..end_page → 0-based [a,b] 페이지만 담은 새 PDF bytes). MinerU 의 `end_page_id=0 falsy 무시` 버그 회피. 40p 윈도우 분할은 marker_worker 가 그대로 담당. (검증: fitz 슬라이스 렌더 = 원본과 동일 품질.) - **★ 반드시 async 엔진(`aio_do_parse`) 사용.** 동기 `do_parse`(vllm-engine sync)는 본 모델 (MinerU2.5-Pro-2605-1.2B)에서 layout 토큰 malformed → 빈 md 산출(실측 G1-2). async (`aio_do_parse` = vllm-async-engine, mineru CLI 가 쓰는 정상 경로) = 정상 출력. - **이미지 = stateless**: marker 처럼 NAS write 안 함. MinerU 가 md 에 박는 `![](images/.jpg)` href 를 그대로 slug 으로 반환 → fastapi(marker_worker)의 `_rewrite_image_refs` 가 basename 매칭으로 `docimg:img_NNN` 정규화 + NAS persist. (계약 무변) - **VRAM 캡**: `MINERU_GPU_MEMORY_UTILIZATION`(vLLM 분율, 0.40→~6GB 실측). compose 의 `MINERU_VIRTUAL_VRAM_SIZE` 도 무해(실측 정상)하나 출력엔 무관 — 캡은 분율로 충분. backend=`vlm-engine`(기본 hybrid-engine 은 다중모델 로드 OOM, 반드시 명시). 엔진은 첫 변환(또는 startup warmup) 시 1회 로드 — MinerU ModelSingleton 캐시. 단일 GPU 라 변환은 _engine_lock 으로 직렬화. """ import asyncio import base64 import hashlib import inspect import io import logging import os import time import unicodedata from pathlib import Path import fitz # PyMuPDF — page 슬라이스 + 페이지수 from fastapi import FastAPI, HTTPException, Response from PIL import Image from pydantic import BaseModel, Field logger = logging.getLogger("mineru-service") logging.basicConfig(level=logging.INFO) app = FastAPI() try: import importlib.metadata _engine_version = importlib.metadata.version("mineru") except Exception: _engine_version = "unknown" # ---- 설정 (compose env 로 override) ----------------------------------------- MINERU_BACKEND = os.getenv("MINERU_BACKEND", "vlm-engine") MINERU_LANG = os.getenv("MINERU_LANG", "korean") GPU_MEM_UTIL = float(os.getenv("MINERU_GPU_MEMORY_UTILIZATION", "0.40")) MAX_IMAGES_PER_DOC = int(os.getenv("MINERU_MAX_IMAGES_PER_DOC", "200")) MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024))) MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치 # self-timeout — 변환/워밍이 vLLM 행으로 _engine_lock 을 영구 점유해 서비스가 wedge 되는 것을 차단. # (클라이언트 marker_worker 는 300s 로 포기하나 서버측 inflight 는 자동 취소 안 됨 → 서버 자체 상한 필요.) PARSE_TIMEOUT_S = float(os.getenv("MINERU_PARSE_TIMEOUT_S", "600")) WARMUP_TIMEOUT_S = float(os.getenv("MINERU_WARMUP_TIMEOUT_S", "1200")) # 최초 모델 다운로드(~2.4GB) 여유 _PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0" # ---- 엔진 상태 --------------------------------------------------------------- _warmup_done = False _warmup_error: str | None = None # 단일 GPU async 엔진 — warmup + convert 직렬화(엔진 1개, 임시디렉토리/싱글톤 경합 차단). _engine_lock = asyncio.Lock() def _is_engine_fatal(exc: BaseException) -> bool: """OOM/CUDA 류 = 엔진 상태 오염 가능 → 재워밍 강제 대상(타임아웃은 호출측에서 별도 판정).""" s = f"{type(exc).__name__} {exc}".lower() return any( k in s for k in ("out of memory", "oom", "cuda", "cublas", "device-side", "illegal memory") ) async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]: """슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.** 호출자(_ensure_warmup / convert)가 _engine_lock 을 잡은 상태로 호출한다. 이미지 dict: {slug, format, width, height, raw_bytes}. slug = md href 그대로. """ import glob import tempfile from mineru.cli.common import aio_do_parse with tempfile.TemporaryDirectory(prefix="mineru_") as td: candidate = { "output_dir": td, "pdf_file_names": ["doc"], "pdf_bytes_list": [pdf_bytes], "p_lang_list": [lang], "backend": MINERU_BACKEND, "formula_enable": True, "table_enable": True, "f_dump_md": True, "f_dump_content_list": True, "f_dump_middle_json": False, "f_dump_model_output": False, "f_dump_orig_pdf": False, "f_draw_layout_bbox": False, "f_draw_span_bbox": False, "gpu_memory_utilization": GPU_MEM_UTIL, } sig = inspect.signature(aio_do_parse) has_var_kw = any( p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values() ) kwargs = candidate if has_var_kw else { k: v for k, v in candidate.items() if k in sig.parameters } await aio_do_parse(**kwargs) md_files = sorted(glob.glob(f"{td}/**/*.md", recursive=True)) if not md_files: raise RuntimeError("mineru produced no markdown output") md_path = Path(md_files[0]) md_text = md_path.read_text(encoding="utf-8", errors="replace") images: list[dict] = [] img_dir = md_path.parent / "images" if img_dir.is_dir(): for img_file in sorted(img_dir.iterdir()): if not img_file.is_file(): continue raw = img_file.read_bytes() slug = f"images/{img_file.name}" # md href 와 정확히 일치 w = h = None try: with Image.open(io.BytesIO(raw)) as im: w, h = im.width, im.height fmt = (im.format or "JPEG").lower() except Exception: fmt = img_file.suffix.lstrip(".").lower() or "jpeg" images.append( {"slug": slug, "format": fmt, "width": w, "height": h, "raw_bytes": raw} ) return md_text, images async def _ensure_warmup() -> None: """첫 /convert 또는 startup hook 시 1-page 합성 PDF 로 엔진+모델 적재.""" global _warmup_done, _warmup_error if _warmup_done: return async with _engine_lock: if _warmup_done: return try: logger.info("[mineru-service] warmup start (async engine load + model fetch)") doc = fitz.open() page = doc.new_page() page.insert_text((72, 72), "MinerU warmup.") warmup_bytes = doc.tobytes() doc.close() await asyncio.wait_for(_run_mineru(warmup_bytes, MINERU_LANG), timeout=WARMUP_TIMEOUT_S) _warmup_done = True _warmup_error = None logger.info(f"[mineru-service] warmup done engine_version={_engine_version}") except Exception as exc: _warmup_error = f"{type(exc).__name__}: {exc}" logger.exception("[mineru-service] warmup failed") raise @app.on_event("startup") async def startup(): if _PRELOAD: asyncio.create_task(_ensure_warmup()) # ---- 계약 모델 (marker 와 동일 shape) ---------------------------------------- class ConvertRequest(BaseModel): file_path: str max_pages: int | None = None start_page: int | None = None # 1-based inclusive end_page: int | None = None # 1-based inclusive class ConvertImage(BaseModel): slug: str format: str width: int | None = None height: int | None = None bytes_b64: str class ConvertResponse(BaseModel): md_content: str md_content_hash: str engine: str engine_version: str elapsed_ms: int raw_metrics: dict images: list[ConvertImage] = Field(default_factory=list) images_truncated: bool = False @app.get("/health") def health(): return {"status": "ok", "service": "mineru-service"} @app.get("/ready") async def ready(response: Response): """marker /ready 의미 복제: warmup_failed 만 503, idle/warming=200(depends_on 굳음 방지).""" if _warmup_error: response.status_code = 503 return {"status": "warmup_failed", "engine": "mineru", "engine_version": _engine_version, "error": _warmup_error} if not _warmup_done: return {"status": "warming_up" if _PRELOAD else "idle", "engine": "mineru", "engine_version": _engine_version, "models_loaded": False} return {"status": "ready", "engine": "mineru", "engine_version": _engine_version, "models_loaded": True} def _resolve_path(file_path: str) -> Path | None: """NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. ocr/server.py 와 동일 패턴 (필수 — 한글명 파일은 NFS=NFD 저장이라 DB 의 NFC 경로로는 is_file=False).""" for c in (file_path, unicodedata.normalize("NFD", file_path), unicodedata.normalize("NFC", file_path)): p = Path(c) if p.exists(): return p parent = Path(file_path).parent if parent.exists(): target = unicodedata.normalize("NFC", Path(file_path).name) for child in parent.iterdir(): if unicodedata.normalize("NFC", child.name) == target: return child return None def _slice_pdf(src_path: Path, start_page: int | None, end_page: int | None, max_pages: int | None) -> tuple[bytes, int]: """요청 page 범위(1-based inclusive)만 담은 새 PDF bytes + 변환 페이지수 반환.""" with fitz.open(src_path) as src: n = src.page_count if start_page is not None and end_page is not None: a = max(0, start_page - 1) b = min(n - 1, end_page - 1) else: a = 0 cap = max_pages if max_pages is not None else MAX_PAGES_HARD b = min(n - 1, cap - 1) if b < a: raise HTTPException(422, detail={"code": "bad_page_range", "message": f"a={a} b={b} n={n}"}) out = fitz.open() out.insert_pdf(src, from_page=a, to_page=b) pdf_bytes = out.tobytes() out.close() return pdf_bytes, (b - a + 1) def _serialize_images(images: list[dict], src_path: str) -> tuple[list[ConvertImage], bool]: """이미지 dict 리스트 → base64 ConvertImage 리스트 (marker 가드 동일).""" truncated = len(images) > MAX_IMAGES_PER_DOC if truncated: logger.warning(f"[mineru-service] images truncated path={src_path} " f"total={len(images)} cap={MAX_IMAGES_PER_DOC}") images = images[:MAX_IMAGES_PER_DOC] out: list[ConvertImage] = [] for img in images: raw = img["raw_bytes"] if len(raw) > MAX_BYTES_PER_IMAGE: logger.warning(f"[mineru-service] image too large skipped path={src_path} " f"slug={img['slug']} bytes={len(raw)} cap={MAX_BYTES_PER_IMAGE}") continue out.append(ConvertImage( slug=img["slug"], format=img["format"], width=img.get("width"), height=img.get("height"), bytes_b64=base64.b64encode(raw).decode("ascii"), )) return out, truncated @app.post("/convert", response_model=ConvertResponse) async def convert(req: ConvertRequest): global _warmup_done p = _resolve_path(req.file_path) if p is None or not p.is_file(): raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path}) if req.start_page is not None and req.end_page is not None: if req.start_page < 1 or req.end_page < req.start_page: raise HTTPException(422, detail={"code": "bad_page_range", "message": f"start_page={req.start_page} end_page={req.end_page}"}) pdf_bytes, page_count = _slice_pdf(p, req.start_page, req.end_page, req.max_pages) await _ensure_warmup() # 엔진 로드 보장(내부에서 _engine_lock 잡았다 놓음) async with _engine_lock: # 실제 변환 직렬화(단일 GPU) start = time.monotonic() try: md_text, raw_images = await asyncio.wait_for( _run_mineru(pdf_bytes, MINERU_LANG), timeout=PARSE_TIMEOUT_S ) except HTTPException: raise except Exception as exc: # 타임아웃(엔진 행) 또는 OOM/CUDA 류면 엔진 오염 가능 → 다음 요청이 재워밍하도록 리셋. # 재워밍까지 실패하면 _ensure_warmup 이 _warmup_error 설정 → /ready 503 → healthcheck # 재시작으로 escalate(영구 degradation 차단). 일시 OOM 이면 재워밍 성공 후 정상화. if isinstance(exc, (asyncio.TimeoutError, TimeoutError)) or _is_engine_fatal(exc): _warmup_done = False logger.error("[mineru-service] engine reset (timeout/fatal) path=%s: %s", p, exc) logger.exception(f"[mineru-service] conversion failed path={p}: {exc}") raise HTTPException(422, detail={"code": "conversion_failed", "message": f"{type(exc).__name__}: {exc}"}) from exc elapsed_ms = int((time.monotonic() - start) * 1000) images_payload, truncated = _serialize_images(raw_images, str(p)) return ConvertResponse( md_content=md_text, md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(), engine="mineru", engine_version=_engine_version, elapsed_ms=elapsed_ms, raw_metrics={ "page_count": page_count, "image_count_extracted": len(raw_images), "image_count_returned": len(images_payload), }, images=images_payload, images_truncated=truncated, )