7d882352b8
aio_do_parse 에 자체 타임아웃이 없어 vLLM 행 시 _engine_lock 을 영구 점유 → markdown 변환 전체 마비(컨테이너 재시작 전까지). 클라이언트(marker_worker)는 300s 로 포기하나 서버측 inflight 는 자동 취소 안 됨. - _run_mineru 를 asyncio.wait_for(convert 600s / warmup 1200s)로 감싸 lock 점유 상한. - 타임아웃·OOM/CUDA 류 실패 시 _warmup_done 리셋 → 다음 요청 재워밍. 재워밍도 실패하면 _warmup_error → /ready 503 → healthcheck 재시작으로 escalate(영구 degradation 차단). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
339 lines
14 KiB
Python
339 lines
14 KiB
Python
"""mineru-service — POST /convert: PDF → markdown + 추출 이미지 base64.
|
|
|
|
marker-service 대체(MinerU 2.5 VLM). **marker 의 /convert 계약을 그대로 복제**해서
|
|
marker_worker 가 엔드포인트만 바꾸면 되도록 한다(요청/응답 동일 shape):
|
|
|
|
요청: {file_path, max_pages?, start_page?, end_page?} (page = 1-based inclusive)
|
|
응답: {md_content, md_content_hash, engine, engine_version, elapsed_ms,
|
|
raw_metrics, images:[{slug, format, width, height, bytes_b64}], images_truncated}
|
|
|
|
설계 노트:
|
|
- **page range 는 PyMuPDF 로 직접 슬라이스**해서 MinerU 에 넘긴다(start_page..end_page →
|
|
0-based [a,b] 페이지만 담은 새 PDF bytes). MinerU 의 `end_page_id=0 falsy 무시` 버그 회피.
|
|
40p 윈도우 분할은 marker_worker 가 그대로 담당. (검증: fitz 슬라이스 렌더 = 원본과 동일 품질.)
|
|
- **★ 반드시 async 엔진(`aio_do_parse`) 사용.** 동기 `do_parse`(vllm-engine sync)는 본 모델
|
|
(MinerU2.5-Pro-2605-1.2B)에서 layout 토큰 malformed → 빈 md 산출(실측 G1-2). async
|
|
(`aio_do_parse` = vllm-async-engine, mineru CLI 가 쓰는 정상 경로) = 정상 출력.
|
|
- **이미지 = stateless**: marker 처럼 NAS write 안 함. MinerU 가 md 에 박는 ``
|
|
href 를 그대로 slug 으로 반환 → fastapi(marker_worker)의 `_rewrite_image_refs` 가 basename
|
|
매칭으로 `docimg:img_NNN` 정규화 + NAS persist. (계약 무변)
|
|
- **VRAM 캡**: `MINERU_GPU_MEMORY_UTILIZATION`(vLLM 분율, 0.40→~6GB 실측). compose 의
|
|
`MINERU_VIRTUAL_VRAM_SIZE` 도 무해(실측 정상)하나 출력엔 무관 — 캡은 분율로 충분.
|
|
backend=`vlm-engine`(기본 hybrid-engine 은 다중모델 로드 OOM, 반드시 명시).
|
|
|
|
엔진은 첫 변환(또는 startup warmup) 시 1회 로드 — MinerU ModelSingleton 캐시. 단일 GPU 라
|
|
변환은 _engine_lock 으로 직렬화.
|
|
"""
|
|
import asyncio
|
|
import base64
|
|
import hashlib
|
|
import inspect
|
|
import io
|
|
import logging
|
|
import os
|
|
import time
|
|
import unicodedata
|
|
from pathlib import Path
|
|
|
|
import fitz # PyMuPDF — page 슬라이스 + 페이지수
|
|
from fastapi import FastAPI, HTTPException, Response
|
|
from PIL import Image
|
|
from pydantic import BaseModel, Field
|
|
|
|
logger = logging.getLogger("mineru-service")
|
|
logging.basicConfig(level=logging.INFO)
|
|
app = FastAPI()
|
|
|
|
try:
|
|
import importlib.metadata
|
|
_engine_version = importlib.metadata.version("mineru")
|
|
except Exception:
|
|
_engine_version = "unknown"
|
|
|
|
# ---- 설정 (compose env 로 override) -----------------------------------------
|
|
MINERU_BACKEND = os.getenv("MINERU_BACKEND", "vlm-engine")
|
|
MINERU_LANG = os.getenv("MINERU_LANG", "korean")
|
|
GPU_MEM_UTIL = float(os.getenv("MINERU_GPU_MEMORY_UTILIZATION", "0.40"))
|
|
|
|
MAX_IMAGES_PER_DOC = int(os.getenv("MINERU_MAX_IMAGES_PER_DOC", "200"))
|
|
MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
|
|
MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치
|
|
|
|
# self-timeout — 변환/워밍이 vLLM 행으로 _engine_lock 을 영구 점유해 서비스가 wedge 되는 것을 차단.
|
|
# (클라이언트 marker_worker 는 300s 로 포기하나 서버측 inflight 는 자동 취소 안 됨 → 서버 자체 상한 필요.)
|
|
PARSE_TIMEOUT_S = float(os.getenv("MINERU_PARSE_TIMEOUT_S", "600"))
|
|
WARMUP_TIMEOUT_S = float(os.getenv("MINERU_WARMUP_TIMEOUT_S", "1200")) # 최초 모델 다운로드(~2.4GB) 여유
|
|
|
|
_PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0"
|
|
|
|
# ---- 엔진 상태 ---------------------------------------------------------------
|
|
_warmup_done = False
|
|
_warmup_error: str | None = None
|
|
# 단일 GPU async 엔진 — warmup + convert 직렬화(엔진 1개, 임시디렉토리/싱글톤 경합 차단).
|
|
_engine_lock = asyncio.Lock()
|
|
|
|
|
|
def _is_engine_fatal(exc: BaseException) -> bool:
|
|
"""OOM/CUDA 류 = 엔진 상태 오염 가능 → 재워밍 강제 대상(타임아웃은 호출측에서 별도 판정)."""
|
|
s = f"{type(exc).__name__} {exc}".lower()
|
|
return any(
|
|
k in s
|
|
for k in ("out of memory", "oom", "cuda", "cublas", "device-side", "illegal memory")
|
|
)
|
|
|
|
|
|
async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]:
|
|
"""슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.**
|
|
|
|
호출자(_ensure_warmup / convert)가 _engine_lock 을 잡은 상태로 호출한다.
|
|
이미지 dict: {slug, format, width, height, raw_bytes}. slug = md href 그대로.
|
|
"""
|
|
import glob
|
|
import tempfile
|
|
|
|
from mineru.cli.common import aio_do_parse
|
|
|
|
with tempfile.TemporaryDirectory(prefix="mineru_") as td:
|
|
candidate = {
|
|
"output_dir": td,
|
|
"pdf_file_names": ["doc"],
|
|
"pdf_bytes_list": [pdf_bytes],
|
|
"p_lang_list": [lang],
|
|
"backend": MINERU_BACKEND,
|
|
"formula_enable": True,
|
|
"table_enable": True,
|
|
"f_dump_md": True,
|
|
"f_dump_content_list": True,
|
|
"f_dump_middle_json": False,
|
|
"f_dump_model_output": False,
|
|
"f_dump_orig_pdf": False,
|
|
"f_draw_layout_bbox": False,
|
|
"f_draw_span_bbox": False,
|
|
"gpu_memory_utilization": GPU_MEM_UTIL,
|
|
}
|
|
sig = inspect.signature(aio_do_parse)
|
|
has_var_kw = any(
|
|
p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
|
|
)
|
|
kwargs = candidate if has_var_kw else {
|
|
k: v for k, v in candidate.items() if k in sig.parameters
|
|
}
|
|
await aio_do_parse(**kwargs)
|
|
|
|
md_files = sorted(glob.glob(f"{td}/**/*.md", recursive=True))
|
|
if not md_files:
|
|
raise RuntimeError("mineru produced no markdown output")
|
|
md_path = Path(md_files[0])
|
|
md_text = md_path.read_text(encoding="utf-8", errors="replace")
|
|
|
|
images: list[dict] = []
|
|
img_dir = md_path.parent / "images"
|
|
if img_dir.is_dir():
|
|
for img_file in sorted(img_dir.iterdir()):
|
|
if not img_file.is_file():
|
|
continue
|
|
raw = img_file.read_bytes()
|
|
slug = f"images/{img_file.name}" # md href 와 정확히 일치
|
|
w = h = None
|
|
try:
|
|
with Image.open(io.BytesIO(raw)) as im:
|
|
w, h = im.width, im.height
|
|
fmt = (im.format or "JPEG").lower()
|
|
except Exception:
|
|
fmt = img_file.suffix.lstrip(".").lower() or "jpeg"
|
|
images.append(
|
|
{"slug": slug, "format": fmt, "width": w, "height": h, "raw_bytes": raw}
|
|
)
|
|
return md_text, images
|
|
|
|
|
|
async def _ensure_warmup() -> None:
|
|
"""첫 /convert 또는 startup hook 시 1-page 합성 PDF 로 엔진+모델 적재."""
|
|
global _warmup_done, _warmup_error
|
|
if _warmup_done:
|
|
return
|
|
async with _engine_lock:
|
|
if _warmup_done:
|
|
return
|
|
try:
|
|
logger.info("[mineru-service] warmup start (async engine load + model fetch)")
|
|
doc = fitz.open()
|
|
page = doc.new_page()
|
|
page.insert_text((72, 72), "MinerU warmup.")
|
|
warmup_bytes = doc.tobytes()
|
|
doc.close()
|
|
await asyncio.wait_for(_run_mineru(warmup_bytes, MINERU_LANG), timeout=WARMUP_TIMEOUT_S)
|
|
_warmup_done = True
|
|
_warmup_error = None
|
|
logger.info(f"[mineru-service] warmup done engine_version={_engine_version}")
|
|
except Exception as exc:
|
|
_warmup_error = f"{type(exc).__name__}: {exc}"
|
|
logger.exception("[mineru-service] warmup failed")
|
|
raise
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup():
|
|
if _PRELOAD:
|
|
asyncio.create_task(_ensure_warmup())
|
|
|
|
|
|
# ---- 계약 모델 (marker 와 동일 shape) ----------------------------------------
|
|
class ConvertRequest(BaseModel):
|
|
file_path: str
|
|
max_pages: int | None = None
|
|
start_page: int | None = None # 1-based inclusive
|
|
end_page: int | None = None # 1-based inclusive
|
|
|
|
|
|
class ConvertImage(BaseModel):
|
|
slug: str
|
|
format: str
|
|
width: int | None = None
|
|
height: int | None = None
|
|
bytes_b64: str
|
|
|
|
|
|
class ConvertResponse(BaseModel):
|
|
md_content: str
|
|
md_content_hash: str
|
|
engine: str
|
|
engine_version: str
|
|
elapsed_ms: int
|
|
raw_metrics: dict
|
|
images: list[ConvertImage] = Field(default_factory=list)
|
|
images_truncated: bool = False
|
|
|
|
|
|
@app.get("/health")
|
|
def health():
|
|
return {"status": "ok", "service": "mineru-service"}
|
|
|
|
|
|
@app.get("/ready")
|
|
async def ready(response: Response):
|
|
"""marker /ready 의미 복제: warmup_failed 만 503, idle/warming=200(depends_on 굳음 방지)."""
|
|
if _warmup_error:
|
|
response.status_code = 503
|
|
return {"status": "warmup_failed", "engine": "mineru",
|
|
"engine_version": _engine_version, "error": _warmup_error}
|
|
if not _warmup_done:
|
|
return {"status": "warming_up" if _PRELOAD else "idle", "engine": "mineru",
|
|
"engine_version": _engine_version, "models_loaded": False}
|
|
return {"status": "ready", "engine": "mineru",
|
|
"engine_version": _engine_version, "models_loaded": True}
|
|
|
|
|
|
def _resolve_path(file_path: str) -> Path | None:
|
|
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. ocr/server.py 와 동일 패턴
|
|
(필수 — 한글명 파일은 NFS=NFD 저장이라 DB 의 NFC 경로로는 is_file=False)."""
|
|
for c in (file_path,
|
|
unicodedata.normalize("NFD", file_path),
|
|
unicodedata.normalize("NFC", file_path)):
|
|
p = Path(c)
|
|
if p.exists():
|
|
return p
|
|
parent = Path(file_path).parent
|
|
if parent.exists():
|
|
target = unicodedata.normalize("NFC", Path(file_path).name)
|
|
for child in parent.iterdir():
|
|
if unicodedata.normalize("NFC", child.name) == target:
|
|
return child
|
|
return None
|
|
|
|
|
|
def _slice_pdf(src_path: Path, start_page: int | None, end_page: int | None,
|
|
max_pages: int | None) -> tuple[bytes, int]:
|
|
"""요청 page 범위(1-based inclusive)만 담은 새 PDF bytes + 변환 페이지수 반환."""
|
|
with fitz.open(src_path) as src:
|
|
n = src.page_count
|
|
if start_page is not None and end_page is not None:
|
|
a = max(0, start_page - 1)
|
|
b = min(n - 1, end_page - 1)
|
|
else:
|
|
a = 0
|
|
cap = max_pages if max_pages is not None else MAX_PAGES_HARD
|
|
b = min(n - 1, cap - 1)
|
|
if b < a:
|
|
raise HTTPException(422, detail={"code": "bad_page_range",
|
|
"message": f"a={a} b={b} n={n}"})
|
|
out = fitz.open()
|
|
out.insert_pdf(src, from_page=a, to_page=b)
|
|
pdf_bytes = out.tobytes()
|
|
out.close()
|
|
return pdf_bytes, (b - a + 1)
|
|
|
|
|
|
def _serialize_images(images: list[dict], src_path: str) -> tuple[list[ConvertImage], bool]:
|
|
"""이미지 dict 리스트 → base64 ConvertImage 리스트 (marker 가드 동일)."""
|
|
truncated = len(images) > MAX_IMAGES_PER_DOC
|
|
if truncated:
|
|
logger.warning(f"[mineru-service] images truncated path={src_path} "
|
|
f"total={len(images)} cap={MAX_IMAGES_PER_DOC}")
|
|
images = images[:MAX_IMAGES_PER_DOC]
|
|
out: list[ConvertImage] = []
|
|
for img in images:
|
|
raw = img["raw_bytes"]
|
|
if len(raw) > MAX_BYTES_PER_IMAGE:
|
|
logger.warning(f"[mineru-service] image too large skipped path={src_path} "
|
|
f"slug={img['slug']} bytes={len(raw)} cap={MAX_BYTES_PER_IMAGE}")
|
|
continue
|
|
out.append(ConvertImage(
|
|
slug=img["slug"], format=img["format"],
|
|
width=img.get("width"), height=img.get("height"),
|
|
bytes_b64=base64.b64encode(raw).decode("ascii"),
|
|
))
|
|
return out, truncated
|
|
|
|
|
|
@app.post("/convert", response_model=ConvertResponse)
|
|
async def convert(req: ConvertRequest):
|
|
global _warmup_done
|
|
p = _resolve_path(req.file_path)
|
|
if p is None or not p.is_file():
|
|
raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path})
|
|
if req.start_page is not None and req.end_page is not None:
|
|
if req.start_page < 1 or req.end_page < req.start_page:
|
|
raise HTTPException(422, detail={"code": "bad_page_range",
|
|
"message": f"start_page={req.start_page} end_page={req.end_page}"})
|
|
|
|
pdf_bytes, page_count = _slice_pdf(p, req.start_page, req.end_page, req.max_pages)
|
|
|
|
await _ensure_warmup() # 엔진 로드 보장(내부에서 _engine_lock 잡았다 놓음)
|
|
async with _engine_lock: # 실제 변환 직렬화(단일 GPU)
|
|
start = time.monotonic()
|
|
try:
|
|
md_text, raw_images = await asyncio.wait_for(
|
|
_run_mineru(pdf_bytes, MINERU_LANG), timeout=PARSE_TIMEOUT_S
|
|
)
|
|
except HTTPException:
|
|
raise
|
|
except Exception as exc:
|
|
# 타임아웃(엔진 행) 또는 OOM/CUDA 류면 엔진 오염 가능 → 다음 요청이 재워밍하도록 리셋.
|
|
# 재워밍까지 실패하면 _ensure_warmup 이 _warmup_error 설정 → /ready 503 → healthcheck
|
|
# 재시작으로 escalate(영구 degradation 차단). 일시 OOM 이면 재워밍 성공 후 정상화.
|
|
if isinstance(exc, (asyncio.TimeoutError, TimeoutError)) or _is_engine_fatal(exc):
|
|
_warmup_done = False
|
|
logger.error("[mineru-service] engine reset (timeout/fatal) path=%s: %s", p, exc)
|
|
logger.exception(f"[mineru-service] conversion failed path={p}: {exc}")
|
|
raise HTTPException(422, detail={"code": "conversion_failed",
|
|
"message": f"{type(exc).__name__}: {exc}"}) from exc
|
|
elapsed_ms = int((time.monotonic() - start) * 1000)
|
|
|
|
images_payload, truncated = _serialize_images(raw_images, str(p))
|
|
|
|
return ConvertResponse(
|
|
md_content=md_text,
|
|
md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
|
|
engine="mineru",
|
|
engine_version=_engine_version,
|
|
elapsed_ms=elapsed_ms,
|
|
raw_metrics={
|
|
"page_count": page_count,
|
|
"image_count_extracted": len(raw_images),
|
|
"image_count_returned": len(images_payload),
|
|
},
|
|
images=images_payload,
|
|
images_truncated=truncated,
|
|
)
|