fix(workers): presegment/csb 이벤트루프 blocking I/O to_thread 오프로드
- presegment_worker: fitz open/get_toc(동기 blocking, live 스테이지)를 to_thread 로 — 거대/손상 PDF 파싱이 같은 루프의 1분 consumer + FastAPI 요청을 수백 ms~초 정지시키던 것 해소. - csb_collector: 50MB PDF write_bytes + read_bytes(해시)를 to_thread 로 (R5 동형). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -140,7 +140,8 @@ async def _download_pdf(url: str, dest: Path) -> int:
|
||||
if len(resp.content) > _MAX_PDF_BYTES:
|
||||
raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}")
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
dest.write_bytes(resp.content)
|
||||
# 최대 50MB PDF write 는 동기 blocking — 이벤트루프 점유 회피 to_thread (R5 동형).
|
||||
await asyncio.to_thread(dest.write_bytes, resp.content)
|
||||
return len(resp.content)
|
||||
|
||||
|
||||
@@ -190,9 +191,11 @@ async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool:
|
||||
|
||||
dest = Path(settings.nas_mount_path) / rel_path
|
||||
size = await _download_pdf(pdf_url, dest)
|
||||
# 50MB PDF read + sha256 는 동기 blocking(I/O+CPU) — 이벤트루프 점유 회피 to_thread (R5 동형).
|
||||
file_hash = await asyncio.to_thread(lambda: hashlib.sha256(dest.read_bytes()).hexdigest())
|
||||
doc = Document(
|
||||
file_path=rel_path,
|
||||
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
|
||||
file_hash=file_hash,
|
||||
file_format="pdf",
|
||||
file_size=size,
|
||||
file_type="immutable",
|
||||
|
||||
@@ -497,12 +497,18 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
|
||||
return
|
||||
|
||||
import asyncio
|
||||
|
||||
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
|
||||
|
||||
def _read_toc(path: str):
|
||||
# fitz open/get_toc 는 동기 blocking — live 스테이지라 이벤트루프(같은 루프의 1분 consumer +
|
||||
# FastAPI 요청) 점유 회피 위해 to_thread 오프로드(거대/손상 PDF 파싱 수백 ms~초).
|
||||
with fitz.open(path) as pdf:
|
||||
return pdf.page_count, (pdf.get_toc(simple=True) or [])
|
||||
|
||||
try:
|
||||
with fitz.open(str(source)) as pdf:
|
||||
page_count = pdf.page_count
|
||||
toc = pdf.get_toc(simple=True) or []
|
||||
page_count, toc = await asyncio.to_thread(_read_toc, str(source))
|
||||
except Exception as exc:
|
||||
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
|
||||
logger.warning(
|
||||
|
||||
Reference in New Issue
Block a user