fix(mineru): 변환/워밍 self-timeout + OOM·행 시 엔진 재워밍 escalate
aio_do_parse 에 자체 타임아웃이 없어 vLLM 행 시 _engine_lock 을 영구 점유 → markdown 변환 전체 마비(컨테이너 재시작 전까지). 클라이언트(marker_worker)는 300s 로 포기하나 서버측 inflight 는 자동 취소 안 됨. - _run_mineru 를 asyncio.wait_for(convert 600s / warmup 1200s)로 감싸 lock 점유 상한. - 타임아웃·OOM/CUDA 류 실패 시 _warmup_done 리셋 → 다음 요청 재워밍. 재워밍도 실패하면 _warmup_error → /ready 503 → healthcheck 재시작으로 escalate(영구 degradation 차단). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -59,6 +59,11 @@ MAX_IMAGES_PER_DOC = int(os.getenv("MINERU_MAX_IMAGES_PER_DOC", "200"))
|
|||||||
MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
|
MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
|
||||||
MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치
|
MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치
|
||||||
|
|
||||||
|
# self-timeout — 변환/워밍이 vLLM 행으로 _engine_lock 을 영구 점유해 서비스가 wedge 되는 것을 차단.
|
||||||
|
# (클라이언트 marker_worker 는 300s 로 포기하나 서버측 inflight 는 자동 취소 안 됨 → 서버 자체 상한 필요.)
|
||||||
|
PARSE_TIMEOUT_S = float(os.getenv("MINERU_PARSE_TIMEOUT_S", "600"))
|
||||||
|
WARMUP_TIMEOUT_S = float(os.getenv("MINERU_WARMUP_TIMEOUT_S", "1200")) # 최초 모델 다운로드(~2.4GB) 여유
|
||||||
|
|
||||||
_PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0"
|
_PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0"
|
||||||
|
|
||||||
# ---- 엔진 상태 ---------------------------------------------------------------
|
# ---- 엔진 상태 ---------------------------------------------------------------
|
||||||
@@ -68,6 +73,15 @@ _warmup_error: str | None = None
|
|||||||
_engine_lock = asyncio.Lock()
|
_engine_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _is_engine_fatal(exc: BaseException) -> bool:
|
||||||
|
"""OOM/CUDA 류 = 엔진 상태 오염 가능 → 재워밍 강제 대상(타임아웃은 호출측에서 별도 판정)."""
|
||||||
|
s = f"{type(exc).__name__} {exc}".lower()
|
||||||
|
return any(
|
||||||
|
k in s
|
||||||
|
for k in ("out of memory", "oom", "cuda", "cublas", "device-side", "illegal memory")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]:
|
async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]:
|
||||||
"""슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.**
|
"""슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.**
|
||||||
|
|
||||||
@@ -148,7 +162,7 @@ async def _ensure_warmup() -> None:
|
|||||||
page.insert_text((72, 72), "MinerU warmup.")
|
page.insert_text((72, 72), "MinerU warmup.")
|
||||||
warmup_bytes = doc.tobytes()
|
warmup_bytes = doc.tobytes()
|
||||||
doc.close()
|
doc.close()
|
||||||
await _run_mineru(warmup_bytes, MINERU_LANG)
|
await asyncio.wait_for(_run_mineru(warmup_bytes, MINERU_LANG), timeout=WARMUP_TIMEOUT_S)
|
||||||
_warmup_done = True
|
_warmup_done = True
|
||||||
_warmup_error = None
|
_warmup_error = None
|
||||||
logger.info(f"[mineru-service] warmup done engine_version={_engine_version}")
|
logger.info(f"[mineru-service] warmup done engine_version={_engine_version}")
|
||||||
@@ -274,6 +288,7 @@ def _serialize_images(images: list[dict], src_path: str) -> tuple[list[ConvertIm
|
|||||||
|
|
||||||
@app.post("/convert", response_model=ConvertResponse)
|
@app.post("/convert", response_model=ConvertResponse)
|
||||||
async def convert(req: ConvertRequest):
|
async def convert(req: ConvertRequest):
|
||||||
|
global _warmup_done
|
||||||
p = _resolve_path(req.file_path)
|
p = _resolve_path(req.file_path)
|
||||||
if p is None or not p.is_file():
|
if p is None or not p.is_file():
|
||||||
raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path})
|
raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path})
|
||||||
@@ -288,10 +303,18 @@ async def convert(req: ConvertRequest):
|
|||||||
async with _engine_lock: # 실제 변환 직렬화(단일 GPU)
|
async with _engine_lock: # 실제 변환 직렬화(단일 GPU)
|
||||||
start = time.monotonic()
|
start = time.monotonic()
|
||||||
try:
|
try:
|
||||||
md_text, raw_images = await _run_mineru(pdf_bytes, MINERU_LANG)
|
md_text, raw_images = await asyncio.wait_for(
|
||||||
|
_run_mineru(pdf_bytes, MINERU_LANG), timeout=PARSE_TIMEOUT_S
|
||||||
|
)
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
raise
|
raise
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
# 타임아웃(엔진 행) 또는 OOM/CUDA 류면 엔진 오염 가능 → 다음 요청이 재워밍하도록 리셋.
|
||||||
|
# 재워밍까지 실패하면 _ensure_warmup 이 _warmup_error 설정 → /ready 503 → healthcheck
|
||||||
|
# 재시작으로 escalate(영구 degradation 차단). 일시 OOM 이면 재워밍 성공 후 정상화.
|
||||||
|
if isinstance(exc, (asyncio.TimeoutError, TimeoutError)) or _is_engine_fatal(exc):
|
||||||
|
_warmup_done = False
|
||||||
|
logger.error("[mineru-service] engine reset (timeout/fatal) path=%s: %s", p, exc)
|
||||||
logger.exception(f"[mineru-service] conversion failed path={p}: {exc}")
|
logger.exception(f"[mineru-service] conversion failed path={p}: {exc}")
|
||||||
raise HTTPException(422, detail={"code": "conversion_failed",
|
raise HTTPException(422, detail={"code": "conversion_failed",
|
||||||
"message": f"{type(exc).__name__}: {exc}"}) from exc
|
"message": f"{type(exc).__name__}: {exc}"}) from exc
|
||||||
|
|||||||
Reference in New Issue
Block a user