From 7aaabe2c7592c2ee618596d394facb7a73781b0a Mon Sep 17 00:00:00 2001 From: hyungi Date: Sun, 24 May 2026 07:39:49 +0000 Subject: [PATCH] feat(search): split markdown processing for large PDFs (>threshold) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-DocSrv-LargeDoc-Split-Markdown-1 commit 3 (plan brisk-paging-quokka.md). - page_count gauge 분기: 소형(<=120p)=_process_single 통째 1-shot / 대형(>120p)=_process_split - MAX_PAGES=200 hard skip 제거 → 대형은 BATCH_PAGES=40 page-range 윈도우 순차 변환 - 각 batch /convert start_page/end_page(1-based) 호출 + slug 충돌 회피 batch별 ref rewrite + stitch - _persist_images_to_nas seq_offset → batch 간 image_key(img_NNN) 연속 - md_status success/partial/failed (전부/일부/전무) + failed batch manifest JSON - 대형 md_content = head+manifest (LARGE_DOC_MD_CONTENT_HEAD_CHARS=50000), canonical=document_chunks(commit 4) - MARKER_MAX_SPLIT_PAGES=5000 초과 = skipped_too_large 안전상태 검증: G1 소형회귀 doc6675 동일(success,6292,14)/single경로 / G2 doc5180 453p→12batch success manifest+207img(img_001~207 연속) / G4 stuck0 restart0 각batch<300s. 섹션 chunk적재(G3)=commit 4. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/workers/marker_worker.py | 246 +++++++++++++++++++++++++++++++++-- 1 file changed, 236 insertions(+), 10 deletions(-) diff --git a/app/workers/marker_worker.py b/app/workers/marker_worker.py index c7ff1eb..d8a0ef9 100644 --- a/app/workers/marker_worker.py +++ b/app/workers/marker_worker.py @@ -3,9 +3,12 @@ 플로우: classify_worker 완료 → enqueue 'markdown' stage (또는 reprocess 스크립트가 force=True 로 enqueue) → marker_worker.process() - → doc_type / 확장자 / page_count 가드 → marker-service POST /convert + → doc_type / 확장자 / page_count gauge + → 소형(≤SPLIT_THRESHOLD_PAGES) = _process_single 통째 1-shot /convert + 대형(>SPLIT_THRESHOLD_PAGES) = _process_split page-range BATCH_PAGES 윈도우 분할 → 응답 이미지 NAS persist + document_images UPSERT + md_content ref 정규화 - → md_content 저장 또는 doc-level failed (404/422) 또는 transient raise (5xx → queue retry) + → md_content 저장(대형=head+manifest) / md_status success·partial·failed / doc-level failed(404/422) + / transient raise(소형 5xx → queue retry) 이미지 저장 위치: NAS `/documents/extracted_images/{document_id}/{image_key}.{ext}` md_content ref 형식: `![alt](docimg:img_001)` — image_key 가 sequence 기반 결정적 → idempotent. @@ -14,6 +17,7 @@ plan: ~/.claude/plans/piped-humming-crystal.md """ import base64 import hashlib +import json import logging import os import re @@ -34,7 +38,16 @@ logger = logging.getLogger(__name__) MARKER_ENDPOINT = "http://marker-service:3300/convert" MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도 -MAX_PAGES = 200 # 페이지 hard limit +MAX_PAGES = 200 # 소형 1-shot 경로 /convert max_pages 안전장치 + +# LargeDoc split (PR-DocSrv-LargeDoc-Split-Markdown-1, plan brisk-paging-quokka.md): +# >SPLIT_THRESHOLD_PAGES PDF 는 통째 skip 대신 BATCH_PAGES 윈도우로 page-range 분할 변환. +# 각 batch /convert 호출 < MARKER_TIMEOUT 보장 + queue 폭주 회피. 권위 검색본(섹션 chunks)은 +# commit 4 에서 document_chunks(source_type=marker_section) 에 적재 (본 commit 3 미포함). +SPLIT_THRESHOLD_PAGES = int(os.getenv("MARKER_SPLIT_THRESHOLD_PAGES", "120")) +BATCH_PAGES = int(os.getenv("MARKER_BATCH_PAGES", "40")) +MAX_SPLIT_PAGES = int(os.getenv("MARKER_MAX_SPLIT_PAGES", "5000")) # 초과 = skipped_too_large 안전상태 +LARGE_DOC_MD_CONTENT_HEAD_CHARS = int(os.getenv("LARGE_DOC_MD_CONTENT_HEAD_CHARS", "50000")) # Phase 1B.5: 이미지 NAS persist 토글. rollback 시 false → 응답 images 무시 + md_content # rewrite skip → placeholder card 폴백 자연 유지. 환경변수 미설정 = 기본 활성화. @@ -155,13 +168,15 @@ async def process(document_id: int, session: AsyncSession) -> None: ) return - # ---- (4) MAX_PAGES guard ---- + # ---- (4) page_count gauge + 분기 (LargeDoc split) ---- page_count = _get_page_count(container_path) - if page_count is not None and page_count > MAX_PAGES: - logger.info(f"markdown_skip_too_many_pages id={document_id} pages={page_count}") + + # >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님. + if page_count is not None and page_count > MAX_SPLIT_PAGES: + logger.info(f"markdown_skip_too_large id={document_id} pages={page_count}") await _set_skipped( session, document_id, - f"skipped: page_count={page_count} exceeds MAX_PAGES={MAX_PAGES}", + f"skipped_too_large: page_count={page_count} exceeds MAX_SPLIT_PAGES={MAX_SPLIT_PAGES}", ) return @@ -171,7 +186,17 @@ async def process(document_id: int, session: AsyncSession) -> None: ) await session.commit() - # ---- (6) Marker 호출 ---- + # ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ---- + if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES: + await _process_split(doc, document_id, container_path, page_count, session) + else: + await _process_single(doc, document_id, container_path, session) + + +async def _process_single( + doc: Document, document_id: int, container_path: str, session: AsyncSession +) -> None: + """소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).""" try: async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client: resp = await client.post( @@ -273,6 +298,205 @@ async def process(document_id: int, session: AsyncSession) -> None: ) +async def _process_split( + doc: Document, + document_id: int, + container_path: str, + page_count: int, + session: AsyncSession, +) -> None: + """대형 PDF page-range 분할 변환. + + BATCH_PAGES 윈도우로 순차 /convert (각 호출 < MARKER_TIMEOUT). batch 단위로 + 이미지 persist + ref rewrite 후 stitch. md_content = head+manifest (full blob 미저장, + canonical 검색본 = document_chunks, commit 4). 결과: 전부 성공=success / 일부=partial / + 전부 실패=failed. + + invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...). + marker slug(`_page_0_*`) 는 batch 마다 재시작 → batch 별 rewrite 후 stitch (충돌 회피). + """ + n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES + succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md} + failed: list[dict[str, Any]] = [] # {start_page, end_page, error} + all_saved: list[dict[str, Any]] = [] + engine: str | None = None + engine_version: str | None = None + images_truncated = False + + async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client: + for b in range(n_batches): + start_page = b * BATCH_PAGES + 1 + end_page = min((b + 1) * BATCH_PAGES, page_count) + try: + resp = await client.post( + MARKER_ENDPOINT, + json={ + "file_path": container_path, + "start_page": start_page, + "end_page": end_page, + }, + ) + resp.raise_for_status() + data = resp.json() + except httpx.HTTPStatusError as exc: + # 404 = 파일 자체 부재 → 전체 doc fatal (batch 의미 없음). + if exc.response.status_code == 404: + await _fail( + session, document_id, + f"file_not_found during split: {container_path}", + ) + return + msg = _http_error_message(exc) + logger.warning( + f"[marker] split batch failed id={document_id} " + f"pages={start_page}-{end_page} status={exc.response.status_code}: {msg[:200]}" + ) + failed.append({"start_page": start_page, "end_page": end_page, "error": msg[:500]}) + continue + except Exception as exc: + # transient(5xx/transport/timeout) 포함 — batch 실패로 기록(성공 batch 보존). + # 복구는 re-enqueue(force_reprocess)로 전체 재시도. + logger.warning( + f"[marker] split batch error id={document_id} " + f"pages={start_page}-{end_page} kind={type(exc).__name__}: {exc}" + ) + failed.append({ + "start_page": start_page, "end_page": end_page, + "error": f"{type(exc).__name__}: {exc}"[:500], + }) + continue + + engine = data.get("engine") or engine + engine_version = data.get("engine_version") or engine_version + if data.get("images_truncated"): + images_truncated = True + + md_raw = data["md_content"] + images_resp = data.get("images") if MARKDOWN_IMAGE_PERSIST else None + saved_batch: list[dict[str, Any]] = [] + if images_resp: + try: + saved_batch = _persist_images_to_nas( + document_id, images_resp, seq_offset=len(all_saved) + ) + except OSError as exc: + logger.warning( + f"[marker] split image persist failed id={document_id} " + f"pages={start_page}-{end_page}: {type(exc).__name__}: {exc}" + ) + failed.append({ + "start_page": start_page, "end_page": end_page, + "error": f"image_persist: {type(exc).__name__}: {exc}"[:500], + }) + continue + + slug_to_key = {img["source_slug"]: img["image_key"] for img in saved_batch} + md_batch = _rewrite_image_refs(md_raw, slug_to_key) + all_saved.extend(saved_batch) + succeeded.append({"start_page": start_page, "end_page": end_page, "md": md_batch}) + + manifest = _split_manifest(page_count, n_batches, succeeded, failed) + + # ---- 결과 판정 ---- + if not succeeded: + await _fail(session, document_id, json.dumps(manifest, ensure_ascii=False)[:4000]) + logger.warning(f"[marker] split failed id={document_id} all {n_batches} batches failed") + return + + md_status = "success" if not failed else "partial" + stitched = "\n\n".join(b["md"] for b in succeeded) + md_content = _build_large_md_content(stitched[:LARGE_DOC_MD_CONTENT_HEAD_CHARS], manifest) + + quality = _compute_quality(stitched, doc.extracted_text or "", {"page_count": page_count}) + quality.setdefault("metrics", {})["split"] = manifest + if images_truncated: + quality.setdefault("warnings", []).append("images_truncated") + + orphan_paths = await _sync_document_images( + session, document_id, all_saved, + {"engine": engine or "marker", "engine_version": engine_version}, + ) + + await session.execute( + update(Document).where(Document.id == document_id).values( + md_content=md_content, + md_status=md_status, + md_extraction_engine=engine or "marker", + md_extraction_engine_version=engine_version, + md_extraction_quality=quality, + md_content_hash=hashlib.sha256(md_content.encode("utf-8")).hexdigest(), + md_source_hash=doc.file_hash, + md_generated_at=_now(), + md_extraction_error=( + None if md_status == "success" + else json.dumps(manifest, ensure_ascii=False)[:4000] + ), + md_frontmatter=doc.md_frontmatter or {}, + md_format_version="1.0", + content_origin="extracted", + ) + ) + await session.commit() + + for orphan_path in orphan_paths: + try: + Path(orphan_path).unlink(missing_ok=True) + except Exception as exc: + logger.warning( + f"[marker] orphan image unlink failed id={document_id} path={orphan_path}: " + f"{type(exc).__name__}: {exc}" + ) + + logger.info( + f"[marker] split done id={document_id} status={md_status} pages={page_count} " + f"batches={n_batches} ok={len(succeeded)} failed={len(failed)} " + f"images={len(all_saved)} md_head_len={len(md_content)}" + ) + + +def _http_error_message(exc: httpx.HTTPStatusError) -> str: + """marker-service 에러 응답에서 code: message 추출 (JSON 실패 시 raw text).""" + try: + detail = exc.response.json().get("detail", {}) + return f'{detail.get("code", "unknown")}: {detail.get("message", exc.response.text)}' + except Exception: + return exc.response.text + + +def _split_manifest( + total_pages: int, n_batches: int, + succeeded: list[dict[str, Any]], failed: list[dict[str, Any]], +) -> dict[str, Any]: + """split 결과 manifest — md_extraction_error/quality.metrics.split 공통.""" + return { + "mode": "split", + "total_pages": total_pages, + "batch_pages": BATCH_PAGES, + "batch_count": n_batches, + "succeeded_batches": len(succeeded), + "failed_batches": [ + {"start_page": f["start_page"], "end_page": f["end_page"], "error": f["error"]} + for f in failed + ], + "canonical_storage": "document_chunks(source_type=marker_section)", + } + + +def _build_large_md_content(head: str, manifest: dict[str, Any]) -> str: + """대형 split 문서의 md_content = manifest 헤더 + head preview (full blob 미저장).""" + failed_n = len(manifest["failed_batches"]) + badge = "partial" if failed_n else "success" + header = ( + f"\n" + f"> 📄 **대형 문서 분할 변환** — 총 {manifest['total_pages']}p · " + f"{manifest['batch_count']} batch (batch_pages={manifest['batch_pages']}) · 상태 **{badge}**\n" + f"> 권위 검색본 = `document_chunks` (source_type=marker_section). 아래는 head preview.\n\n" + ) + return header + head + + async def _read_force_reprocess(session: AsyncSession, document_id: int) -> bool: """현재 markdown stage queue 행의 payload.force_reprocess 조회. 없으면 False.""" row = await session.scalar( @@ -291,18 +515,20 @@ async def _read_force_reprocess(session: AsyncSession, document_id: int) -> bool def _persist_images_to_nas( - document_id: int, images_resp: list[dict[str, Any]] + document_id: int, images_resp: list[dict[str, Any]], seq_offset: int = 0 ) -> list[dict[str, Any]]: """marker 응답 이미지 list 를 NAS 에 저장하고 메타 dict 리스트 반환. image_key 는 sequence 기반 결정적 (`img_001` → `img_NNN`, marker 출력 순서 = 안정적). 같은 doc 재변환 시 같은 key 가 같은 path 에 overwrite → idempotent. + + seq_offset: split 변환에서 batch 간 key 연속성 유지용 (batch1 0개 → batch2 offset=N). """ img_root = EXTRACTED_IMAGES_ROOT / str(document_id) img_root.mkdir(parents=True, exist_ok=True) saved: list[dict[str, Any]] = [] - for seq, img in enumerate(images_resp, start=1): + for seq, img in enumerate(images_resp, start=seq_offset + 1): try: raw_bytes = base64.b64decode(img["bytes_b64"]) except Exception as exc: