Files
hyungi_document_server/services/mineru/server.py
T
hyungi bb929f88d0 feat(extraction): MinerU 2.5 VLM 추출 서비스 + 워커 엔드포인트 env화
marker-service(Surya, ~10GB) 대체 후보. MinerU2.5-Pro-2605-1.2B VLM(vllm-async-engine,
~5.9GB 고정). marker /convert 계약 복제(file_path·start/end·md+base64 images) → 워커는
MARKER_ENDPOINT env 플립만으로 전환. 단일카드(16GB) 검색스택 공존, 40p 윈도우 무변.

- services/mineru: Dockerfile(vllm/vllm-openai:v0.21.0 + mineru[core]) + async server.py
  (NFC/NFD 한글경로 resolver, PyMuPDF page 슬라이스, gpu_memory_utilization 캡)
- docker-compose: mineru-service profile-gated(기본 미기동=marker 무영향) + mineru_models vol
- marker_worker: MARKER_ENDPOINT 하드코딩 → env(기본 marker, 무변)

격리 PoC A/B 8/8 게이트 PASS (한국어/표/수식LaTeX/heading/figure/40p VRAM).
컷오버(env 플립+marker 제거)는 별 단계(읽기뷰 회귀 0 게이트).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 15:58:55 +09:00

316 lines
13 KiB
Python

"""mineru-service — POST /convert: PDF → markdown + 추출 이미지 base64.
marker-service 대체(MinerU 2.5 VLM). **marker 의 /convert 계약을 그대로 복제**해서
marker_worker 가 엔드포인트만 바꾸면 되도록 한다(요청/응답 동일 shape):
요청: {file_path, max_pages?, start_page?, end_page?} (page = 1-based inclusive)
응답: {md_content, md_content_hash, engine, engine_version, elapsed_ms,
raw_metrics, images:[{slug, format, width, height, bytes_b64}], images_truncated}
설계 노트:
- **page range 는 PyMuPDF 로 직접 슬라이스**해서 MinerU 에 넘긴다(start_page..end_page →
0-based [a,b] 페이지만 담은 새 PDF bytes). MinerU 의 `end_page_id=0 falsy 무시` 버그 회피.
40p 윈도우 분할은 marker_worker 가 그대로 담당. (검증: fitz 슬라이스 렌더 = 원본과 동일 품질.)
- **★ 반드시 async 엔진(`aio_do_parse`) 사용.** 동기 `do_parse`(vllm-engine sync)는 본 모델
(MinerU2.5-Pro-2605-1.2B)에서 layout 토큰 malformed → 빈 md 산출(실측 G1-2). async
(`aio_do_parse` = vllm-async-engine, mineru CLI 가 쓰는 정상 경로) = 정상 출력.
- **이미지 = stateless**: marker 처럼 NAS write 안 함. MinerU 가 md 에 박는 `![](images/<sha>.jpg)`
href 를 그대로 slug 으로 반환 → fastapi(marker_worker)의 `_rewrite_image_refs` 가 basename
매칭으로 `docimg:img_NNN` 정규화 + NAS persist. (계약 무변)
- **VRAM 캡**: `MINERU_GPU_MEMORY_UTILIZATION`(vLLM 분율, 0.40→~6GB 실측). compose 의
`MINERU_VIRTUAL_VRAM_SIZE` 도 무해(실측 정상)하나 출력엔 무관 — 캡은 분율로 충분.
backend=`vlm-engine`(기본 hybrid-engine 은 다중모델 로드 OOM, 반드시 명시).
엔진은 첫 변환(또는 startup warmup) 시 1회 로드 — MinerU ModelSingleton 캐시. 단일 GPU 라
변환은 _engine_lock 으로 직렬화.
"""
import asyncio
import base64
import hashlib
import inspect
import io
import logging
import os
import time
import unicodedata
from pathlib import Path
import fitz # PyMuPDF — page 슬라이스 + 페이지수
from fastapi import FastAPI, HTTPException, Response
from PIL import Image
from pydantic import BaseModel, Field
logger = logging.getLogger("mineru-service")
logging.basicConfig(level=logging.INFO)
app = FastAPI()
try:
import importlib.metadata
_engine_version = importlib.metadata.version("mineru")
except Exception:
_engine_version = "unknown"
# ---- 설정 (compose env 로 override) -----------------------------------------
MINERU_BACKEND = os.getenv("MINERU_BACKEND", "vlm-engine")
MINERU_LANG = os.getenv("MINERU_LANG", "korean")
GPU_MEM_UTIL = float(os.getenv("MINERU_GPU_MEMORY_UTILIZATION", "0.40"))
MAX_IMAGES_PER_DOC = int(os.getenv("MINERU_MAX_IMAGES_PER_DOC", "200"))
MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치
_PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0"
# ---- 엔진 상태 ---------------------------------------------------------------
_warmup_done = False
_warmup_error: str | None = None
# 단일 GPU async 엔진 — warmup + convert 직렬화(엔진 1개, 임시디렉토리/싱글톤 경합 차단).
_engine_lock = asyncio.Lock()
async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]:
"""슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.**
호출자(_ensure_warmup / convert)가 _engine_lock 을 잡은 상태로 호출한다.
이미지 dict: {slug, format, width, height, raw_bytes}. slug = md href 그대로.
"""
import glob
import tempfile
from mineru.cli.common import aio_do_parse
with tempfile.TemporaryDirectory(prefix="mineru_") as td:
candidate = {
"output_dir": td,
"pdf_file_names": ["doc"],
"pdf_bytes_list": [pdf_bytes],
"p_lang_list": [lang],
"backend": MINERU_BACKEND,
"formula_enable": True,
"table_enable": True,
"f_dump_md": True,
"f_dump_content_list": True,
"f_dump_middle_json": False,
"f_dump_model_output": False,
"f_dump_orig_pdf": False,
"f_draw_layout_bbox": False,
"f_draw_span_bbox": False,
"gpu_memory_utilization": GPU_MEM_UTIL,
}
sig = inspect.signature(aio_do_parse)
has_var_kw = any(
p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
)
kwargs = candidate if has_var_kw else {
k: v for k, v in candidate.items() if k in sig.parameters
}
await aio_do_parse(**kwargs)
md_files = sorted(glob.glob(f"{td}/**/*.md", recursive=True))
if not md_files:
raise RuntimeError("mineru produced no markdown output")
md_path = Path(md_files[0])
md_text = md_path.read_text(encoding="utf-8", errors="replace")
images: list[dict] = []
img_dir = md_path.parent / "images"
if img_dir.is_dir():
for img_file in sorted(img_dir.iterdir()):
if not img_file.is_file():
continue
raw = img_file.read_bytes()
slug = f"images/{img_file.name}" # md href 와 정확히 일치
w = h = None
try:
with Image.open(io.BytesIO(raw)) as im:
w, h = im.width, im.height
fmt = (im.format or "JPEG").lower()
except Exception:
fmt = img_file.suffix.lstrip(".").lower() or "jpeg"
images.append(
{"slug": slug, "format": fmt, "width": w, "height": h, "raw_bytes": raw}
)
return md_text, images
async def _ensure_warmup() -> None:
"""첫 /convert 또는 startup hook 시 1-page 합성 PDF 로 엔진+모델 적재."""
global _warmup_done, _warmup_error
if _warmup_done:
return
async with _engine_lock:
if _warmup_done:
return
try:
logger.info("[mineru-service] warmup start (async engine load + model fetch)")
doc = fitz.open()
page = doc.new_page()
page.insert_text((72, 72), "MinerU warmup.")
warmup_bytes = doc.tobytes()
doc.close()
await _run_mineru(warmup_bytes, MINERU_LANG)
_warmup_done = True
_warmup_error = None
logger.info(f"[mineru-service] warmup done engine_version={_engine_version}")
except Exception as exc:
_warmup_error = f"{type(exc).__name__}: {exc}"
logger.exception("[mineru-service] warmup failed")
raise
@app.on_event("startup")
async def startup():
if _PRELOAD:
asyncio.create_task(_ensure_warmup())
# ---- 계약 모델 (marker 와 동일 shape) ----------------------------------------
class ConvertRequest(BaseModel):
file_path: str
max_pages: int | None = None
start_page: int | None = None # 1-based inclusive
end_page: int | None = None # 1-based inclusive
class ConvertImage(BaseModel):
slug: str
format: str
width: int | None = None
height: int | None = None
bytes_b64: str
class ConvertResponse(BaseModel):
md_content: str
md_content_hash: str
engine: str
engine_version: str
elapsed_ms: int
raw_metrics: dict
images: list[ConvertImage] = Field(default_factory=list)
images_truncated: bool = False
@app.get("/health")
def health():
return {"status": "ok", "service": "mineru-service"}
@app.get("/ready")
async def ready(response: Response):
"""marker /ready 의미 복제: warmup_failed 만 503, idle/warming=200(depends_on 굳음 방지)."""
if _warmup_error:
response.status_code = 503
return {"status": "warmup_failed", "engine": "mineru",
"engine_version": _engine_version, "error": _warmup_error}
if not _warmup_done:
return {"status": "warming_up" if _PRELOAD else "idle", "engine": "mineru",
"engine_version": _engine_version, "models_loaded": False}
return {"status": "ready", "engine": "mineru",
"engine_version": _engine_version, "models_loaded": True}
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. ocr/server.py 와 동일 패턴
(필수 — 한글명 파일은 NFS=NFD 저장이라 DB 의 NFC 경로로는 is_file=False)."""
for c in (file_path,
unicodedata.normalize("NFD", file_path),
unicodedata.normalize("NFC", file_path)):
p = Path(c)
if p.exists():
return p
parent = Path(file_path).parent
if parent.exists():
target = unicodedata.normalize("NFC", Path(file_path).name)
for child in parent.iterdir():
if unicodedata.normalize("NFC", child.name) == target:
return child
return None
def _slice_pdf(src_path: Path, start_page: int | None, end_page: int | None,
max_pages: int | None) -> tuple[bytes, int]:
"""요청 page 범위(1-based inclusive)만 담은 새 PDF bytes + 변환 페이지수 반환."""
with fitz.open(src_path) as src:
n = src.page_count
if start_page is not None and end_page is not None:
a = max(0, start_page - 1)
b = min(n - 1, end_page - 1)
else:
a = 0
cap = max_pages if max_pages is not None else MAX_PAGES_HARD
b = min(n - 1, cap - 1)
if b < a:
raise HTTPException(422, detail={"code": "bad_page_range",
"message": f"a={a} b={b} n={n}"})
out = fitz.open()
out.insert_pdf(src, from_page=a, to_page=b)
pdf_bytes = out.tobytes()
out.close()
return pdf_bytes, (b - a + 1)
def _serialize_images(images: list[dict], src_path: str) -> tuple[list[ConvertImage], bool]:
"""이미지 dict 리스트 → base64 ConvertImage 리스트 (marker 가드 동일)."""
truncated = len(images) > MAX_IMAGES_PER_DOC
if truncated:
logger.warning(f"[mineru-service] images truncated path={src_path} "
f"total={len(images)} cap={MAX_IMAGES_PER_DOC}")
images = images[:MAX_IMAGES_PER_DOC]
out: list[ConvertImage] = []
for img in images:
raw = img["raw_bytes"]
if len(raw) > MAX_BYTES_PER_IMAGE:
logger.warning(f"[mineru-service] image too large skipped path={src_path} "
f"slug={img['slug']} bytes={len(raw)} cap={MAX_BYTES_PER_IMAGE}")
continue
out.append(ConvertImage(
slug=img["slug"], format=img["format"],
width=img.get("width"), height=img.get("height"),
bytes_b64=base64.b64encode(raw).decode("ascii"),
))
return out, truncated
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
p = _resolve_path(req.file_path)
if p is None or not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path})
if req.start_page is not None and req.end_page is not None:
if req.start_page < 1 or req.end_page < req.start_page:
raise HTTPException(422, detail={"code": "bad_page_range",
"message": f"start_page={req.start_page} end_page={req.end_page}"})
pdf_bytes, page_count = _slice_pdf(p, req.start_page, req.end_page, req.max_pages)
await _ensure_warmup() # 엔진 로드 보장(내부에서 _engine_lock 잡았다 놓음)
async with _engine_lock: # 실제 변환 직렬화(단일 GPU)
start = time.monotonic()
try:
md_text, raw_images = await _run_mineru(pdf_bytes, MINERU_LANG)
except HTTPException:
raise
except Exception as exc:
logger.exception(f"[mineru-service] conversion failed path={p}: {exc}")
raise HTTPException(422, detail={"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}"}) from exc
elapsed_ms = int((time.monotonic() - start) * 1000)
images_payload, truncated = _serialize_images(raw_images, str(p))
return ConvertResponse(
md_content=md_text,
md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
engine="mineru",
engine_version=_engine_version,
elapsed_ms=elapsed_ms,
raw_metrics={
"page_count": page_count,
"image_count_extracted": len(raw_images),
"image_count_returned": len(images_payload),
},
images=images_payload,
images_truncated=truncated,
)