From d26b1150d8d7eb96d30fcd68e810ccac37c53dc0 Mon Sep 17 00:00:00 2001 From: hyungi Date: Mon, 29 Jun 2026 13:18:24 +0900 Subject: [PATCH] =?UTF-8?q?fix(workers):=20presegment/csb=20=EC=9D=B4?= =?UTF-8?q?=EB=B2=A4=ED=8A=B8=EB=A3=A8=ED=94=84=20blocking=20I/O=20to=5Fth?= =?UTF-8?q?read=20=EC=98=A4=ED=94=84=EB=A1=9C=EB=93=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - presegment_worker: fitz open/get_toc(동기 blocking, live 스테이지)를 to_thread 로 — 거대/손상 PDF 파싱이 같은 루프의 1분 consumer + FastAPI 요청을 수백 ms~초 정지시키던 것 해소. - csb_collector: 50MB PDF write_bytes + read_bytes(해시)를 to_thread 로 (R5 동형). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/workers/csb_collector.py | 7 +++++-- app/workers/presegment_worker.py | 12 +++++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/app/workers/csb_collector.py b/app/workers/csb_collector.py index b1dd86f..2087722 100644 --- a/app/workers/csb_collector.py +++ b/app/workers/csb_collector.py @@ -140,7 +140,8 @@ async def _download_pdf(url: str, dest: Path) -> int: if len(resp.content) > _MAX_PDF_BYTES: raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}") dest.parent.mkdir(parents=True, exist_ok=True) - dest.write_bytes(resp.content) + # 최대 50MB PDF write 는 동기 blocking — 이벤트루프 점유 회피 to_thread (R5 동형). + await asyncio.to_thread(dest.write_bytes, resp.content) return len(resp.content) @@ -190,9 +191,11 @@ async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool: dest = Path(settings.nas_mount_path) / rel_path size = await _download_pdf(pdf_url, dest) + # 50MB PDF read + sha256 는 동기 blocking(I/O+CPU) — 이벤트루프 점유 회피 to_thread (R5 동형). + file_hash = await asyncio.to_thread(lambda: hashlib.sha256(dest.read_bytes()).hexdigest()) doc = Document( file_path=rel_path, - file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(), + file_hash=file_hash, file_format="pdf", file_size=size, file_type="immutable", diff --git a/app/workers/presegment_worker.py b/app/workers/presegment_worker.py index 75bc96d..334a037 100644 --- a/app/workers/presegment_worker.py +++ b/app/workers/presegment_worker.py @@ -497,12 +497,18 @@ async def process(document_id: int, session: AsyncSession) -> None: logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract") return + import asyncio + import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존 + def _read_toc(path: str): + # fitz open/get_toc 는 동기 blocking — live 스테이지라 이벤트루프(같은 루프의 1분 consumer + + # FastAPI 요청) 점유 회피 위해 to_thread 오프로드(거대/손상 PDF 파싱 수백 ms~초). + with fitz.open(path) as pdf: + return pdf.page_count, (pdf.get_toc(simple=True) or []) + try: - with fitz.open(str(source)) as pdf: - page_count = pdf.page_count - toc = pdf.get_toc(simple=True) or [] + page_count, toc = await asyncio.to_thread(_read_toc, str(source)) except Exception as exc: # PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화). logger.warning(