feat(search): split markdown processing for large PDFs (>threshold)

PR-DocSrv-LargeDoc-Split-Markdown-1 commit 3 (plan brisk-paging-quokka.md).

- page_count gauge 분기: 소형(<=120p)=_process_single 통째 1-shot / 대형(>120p)=_process_split
- MAX_PAGES=200 hard skip 제거 → 대형은 BATCH_PAGES=40 page-range 윈도우 순차 변환
- 각 batch /convert start_page/end_page(1-based) 호출 + slug 충돌 회피 batch별 ref rewrite + stitch
- _persist_images_to_nas seq_offset → batch 간 image_key(img_NNN) 연속
- md_status success/partial/failed (전부/일부/전무) + failed batch manifest JSON
- 대형 md_content = head+manifest (LARGE_DOC_MD_CONTENT_HEAD_CHARS=50000), canonical=document_chunks(commit 4)
- MARKER_MAX_SPLIT_PAGES=5000 초과 = skipped_too_large 안전상태

검증: G1 소형회귀 doc6675 동일(success,6292,14)/single경로 / G2 doc5180 453p→12batch success
manifest+207img(img_001~207 연속) / G4 stuck0 restart0 각batch<300s. 섹션 chunk적재(G3)=commit 4.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-05-24 07:39:49 +00:00
parent 2528996dee
commit 7aaabe2c75
+236 -10
View File
@@ -3,9 +3,12 @@
플로우:
classify_worker 완료 → enqueue 'markdown' stage (또는 reprocess 스크립트가 force=True 로 enqueue)
→ marker_worker.process()
→ doc_type / 확장자 / page_count 가드 → marker-service POST /convert
→ doc_type / 확장자 / page_count gauge
→ 소형(≤SPLIT_THRESHOLD_PAGES) = _process_single 통째 1-shot /convert
대형(>SPLIT_THRESHOLD_PAGES) = _process_split page-range BATCH_PAGES 윈도우 분할
→ 응답 이미지 NAS persist + document_images UPSERT + md_content ref 정규화
→ md_content 저장 또는 doc-level failed (404/422) 또는 transient raise (5xx → queue retry)
→ md_content 저장(대형=head+manifest) / md_status success·partial·failed / doc-level failed(404/422)
/ transient raise(소형 5xx → queue retry)
이미지 저장 위치: NAS `/documents/extracted_images/{document_id}/{image_key}.{ext}`
md_content ref 형식: `![alt](docimg:img_001)` — image_key 가 sequence 기반 결정적 → idempotent.
@@ -14,6 +17,7 @@ plan: ~/.claude/plans/piped-humming-crystal.md
"""
import base64
import hashlib
import json
import logging
import os
import re
@@ -34,7 +38,16 @@ logger = logging.getLogger(__name__)
MARKER_ENDPOINT = "http://marker-service:3300/convert"
MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도
MAX_PAGES = 200 # 페이지 hard limit
MAX_PAGES = 200 # 소형 1-shot 경로 /convert max_pages 안전장치
# LargeDoc split (PR-DocSrv-LargeDoc-Split-Markdown-1, plan brisk-paging-quokka.md):
# >SPLIT_THRESHOLD_PAGES PDF 는 통째 skip 대신 BATCH_PAGES 윈도우로 page-range 분할 변환.
# 각 batch /convert 호출 < MARKER_TIMEOUT 보장 + queue 폭주 회피. 권위 검색본(섹션 chunks)은
# commit 4 에서 document_chunks(source_type=marker_section) 에 적재 (본 commit 3 미포함).
SPLIT_THRESHOLD_PAGES = int(os.getenv("MARKER_SPLIT_THRESHOLD_PAGES", "120"))
BATCH_PAGES = int(os.getenv("MARKER_BATCH_PAGES", "40"))
MAX_SPLIT_PAGES = int(os.getenv("MARKER_MAX_SPLIT_PAGES", "5000")) # 초과 = skipped_too_large 안전상태
LARGE_DOC_MD_CONTENT_HEAD_CHARS = int(os.getenv("LARGE_DOC_MD_CONTENT_HEAD_CHARS", "50000"))
# Phase 1B.5: 이미지 NAS persist 토글. rollback 시 false → 응답 images 무시 + md_content
# rewrite skip → placeholder card 폴백 자연 유지. 환경변수 미설정 = 기본 활성화.
@@ -155,13 +168,15 @@ async def process(document_id: int, session: AsyncSession) -> None:
)
return
# ---- (4) MAX_PAGES guard ----
# ---- (4) page_count gauge + 분기 (LargeDoc split) ----
page_count = _get_page_count(container_path)
if page_count is not None and page_count > MAX_PAGES:
logger.info(f"markdown_skip_too_many_pages id={document_id} pages={page_count}")
# >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님.
if page_count is not None and page_count > MAX_SPLIT_PAGES:
logger.info(f"markdown_skip_too_large id={document_id} pages={page_count}")
await _set_skipped(
session, document_id,
f"skipped: page_count={page_count} exceeds MAX_PAGES={MAX_PAGES}",
f"skipped_too_large: page_count={page_count} exceeds MAX_SPLIT_PAGES={MAX_SPLIT_PAGES}",
)
return
@@ -171,7 +186,17 @@ async def process(document_id: int, session: AsyncSession) -> None:
)
await session.commit()
# ---- (6) Marker 호출 ----
# ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ----
if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES:
await _process_split(doc, document_id, container_path, page_count, session)
else:
await _process_single(doc, document_id, container_path, session)
async def _process_single(
doc: Document, document_id: int, container_path: str, session: AsyncSession
) -> None:
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로)."""
try:
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
resp = await client.post(
@@ -273,6 +298,205 @@ async def process(document_id: int, session: AsyncSession) -> None:
)
async def _process_split(
doc: Document,
document_id: int,
container_path: str,
page_count: int,
session: AsyncSession,
) -> None:
"""대형 PDF page-range 분할 변환.
BATCH_PAGES 윈도우로 순차 /convert (각 호출 < MARKER_TIMEOUT). batch 단위로
이미지 persist + ref rewrite 후 stitch. md_content = head+manifest (full blob 미저장,
canonical 검색본 = document_chunks, commit 4). 결과: 전부 성공=success / 일부=partial /
전부 실패=failed.
invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...).
marker slug(`_page_0_*`) 는 batch 마다 재시작 → batch 별 rewrite 후 stitch (충돌 회피).
"""
n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES
succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md}
failed: list[dict[str, Any]] = [] # {start_page, end_page, error}
all_saved: list[dict[str, Any]] = []
engine: str | None = None
engine_version: str | None = None
images_truncated = False
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
for b in range(n_batches):
start_page = b * BATCH_PAGES + 1
end_page = min((b + 1) * BATCH_PAGES, page_count)
try:
resp = await client.post(
MARKER_ENDPOINT,
json={
"file_path": container_path,
"start_page": start_page,
"end_page": end_page,
},
)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPStatusError as exc:
# 404 = 파일 자체 부재 → 전체 doc fatal (batch 의미 없음).
if exc.response.status_code == 404:
await _fail(
session, document_id,
f"file_not_found during split: {container_path}",
)
return
msg = _http_error_message(exc)
logger.warning(
f"[marker] split batch failed id={document_id} "
f"pages={start_page}-{end_page} status={exc.response.status_code}: {msg[:200]}"
)
failed.append({"start_page": start_page, "end_page": end_page, "error": msg[:500]})
continue
except Exception as exc:
# transient(5xx/transport/timeout) 포함 — batch 실패로 기록(성공 batch 보존).
# 복구는 re-enqueue(force_reprocess)로 전체 재시도.
logger.warning(
f"[marker] split batch error id={document_id} "
f"pages={start_page}-{end_page} kind={type(exc).__name__}: {exc}"
)
failed.append({
"start_page": start_page, "end_page": end_page,
"error": f"{type(exc).__name__}: {exc}"[:500],
})
continue
engine = data.get("engine") or engine
engine_version = data.get("engine_version") or engine_version
if data.get("images_truncated"):
images_truncated = True
md_raw = data["md_content"]
images_resp = data.get("images") if MARKDOWN_IMAGE_PERSIST else None
saved_batch: list[dict[str, Any]] = []
if images_resp:
try:
saved_batch = _persist_images_to_nas(
document_id, images_resp, seq_offset=len(all_saved)
)
except OSError as exc:
logger.warning(
f"[marker] split image persist failed id={document_id} "
f"pages={start_page}-{end_page}: {type(exc).__name__}: {exc}"
)
failed.append({
"start_page": start_page, "end_page": end_page,
"error": f"image_persist: {type(exc).__name__}: {exc}"[:500],
})
continue
slug_to_key = {img["source_slug"]: img["image_key"] for img in saved_batch}
md_batch = _rewrite_image_refs(md_raw, slug_to_key)
all_saved.extend(saved_batch)
succeeded.append({"start_page": start_page, "end_page": end_page, "md": md_batch})
manifest = _split_manifest(page_count, n_batches, succeeded, failed)
# ---- 결과 판정 ----
if not succeeded:
await _fail(session, document_id, json.dumps(manifest, ensure_ascii=False)[:4000])
logger.warning(f"[marker] split failed id={document_id} all {n_batches} batches failed")
return
md_status = "success" if not failed else "partial"
stitched = "\n\n".join(b["md"] for b in succeeded)
md_content = _build_large_md_content(stitched[:LARGE_DOC_MD_CONTENT_HEAD_CHARS], manifest)
quality = _compute_quality(stitched, doc.extracted_text or "", {"page_count": page_count})
quality.setdefault("metrics", {})["split"] = manifest
if images_truncated:
quality.setdefault("warnings", []).append("images_truncated")
orphan_paths = await _sync_document_images(
session, document_id, all_saved,
{"engine": engine or "marker", "engine_version": engine_version},
)
await session.execute(
update(Document).where(Document.id == document_id).values(
md_content=md_content,
md_status=md_status,
md_extraction_engine=engine or "marker",
md_extraction_engine_version=engine_version,
md_extraction_quality=quality,
md_content_hash=hashlib.sha256(md_content.encode("utf-8")).hexdigest(),
md_source_hash=doc.file_hash,
md_generated_at=_now(),
md_extraction_error=(
None if md_status == "success"
else json.dumps(manifest, ensure_ascii=False)[:4000]
),
md_frontmatter=doc.md_frontmatter or {},
md_format_version="1.0",
content_origin="extracted",
)
)
await session.commit()
for orphan_path in orphan_paths:
try:
Path(orphan_path).unlink(missing_ok=True)
except Exception as exc:
logger.warning(
f"[marker] orphan image unlink failed id={document_id} path={orphan_path}: "
f"{type(exc).__name__}: {exc}"
)
logger.info(
f"[marker] split done id={document_id} status={md_status} pages={page_count} "
f"batches={n_batches} ok={len(succeeded)} failed={len(failed)} "
f"images={len(all_saved)} md_head_len={len(md_content)}"
)
def _http_error_message(exc: httpx.HTTPStatusError) -> str:
"""marker-service 에러 응답에서 code: message 추출 (JSON 실패 시 raw text)."""
try:
detail = exc.response.json().get("detail", {})
return f'{detail.get("code", "unknown")}: {detail.get("message", exc.response.text)}'
except Exception:
return exc.response.text
def _split_manifest(
total_pages: int, n_batches: int,
succeeded: list[dict[str, Any]], failed: list[dict[str, Any]],
) -> dict[str, Any]:
"""split 결과 manifest — md_extraction_error/quality.metrics.split 공통."""
return {
"mode": "split",
"total_pages": total_pages,
"batch_pages": BATCH_PAGES,
"batch_count": n_batches,
"succeeded_batches": len(succeeded),
"failed_batches": [
{"start_page": f["start_page"], "end_page": f["end_page"], "error": f["error"]}
for f in failed
],
"canonical_storage": "document_chunks(source_type=marker_section)",
}
def _build_large_md_content(head: str, manifest: dict[str, Any]) -> str:
"""대형 split 문서의 md_content = manifest 헤더 + head preview (full blob 미저장)."""
failed_n = len(manifest["failed_batches"])
badge = "partial" if failed_n else "success"
header = (
f"<!-- LARGE_DOC_SPLIT mode=split total_pages={manifest['total_pages']} "
f"batch_pages={manifest['batch_pages']} batches={manifest['batch_count']} "
f"failed={failed_n} -->\n"
f"> 📄 **대형 문서 분할 변환** — 총 {manifest['total_pages']}p · "
f"{manifest['batch_count']} batch (batch_pages={manifest['batch_pages']}) · 상태 **{badge}**\n"
f"> 권위 검색본 = `document_chunks` (source_type=marker_section). 아래는 head preview.\n\n"
)
return header + head
async def _read_force_reprocess(session: AsyncSession, document_id: int) -> bool:
"""현재 markdown stage queue 행의 payload.force_reprocess 조회. 없으면 False."""
row = await session.scalar(
@@ -291,18 +515,20 @@ async def _read_force_reprocess(session: AsyncSession, document_id: int) -> bool
def _persist_images_to_nas(
document_id: int, images_resp: list[dict[str, Any]]
document_id: int, images_resp: list[dict[str, Any]], seq_offset: int = 0
) -> list[dict[str, Any]]:
"""marker 응답 이미지 list 를 NAS 에 저장하고 메타 dict 리스트 반환.
image_key 는 sequence 기반 결정적 (`img_001` → `img_NNN`, marker 출력 순서 = 안정적).
같은 doc 재변환 시 같은 key 가 같은 path 에 overwrite → idempotent.
seq_offset: split 변환에서 batch 간 key 연속성 유지용 (batch1 0개 → batch2 offset=N).
"""
img_root = EXTRACTED_IMAGES_ROOT / str(document_id)
img_root.mkdir(parents=True, exist_ok=True)
saved: list[dict[str, Any]] = []
for seq, img in enumerate(images_resp, start=1):
for seq, img in enumerate(images_resp, start=seq_offset + 1):
try:
raw_bytes = base64.b64decode(img["bytes_b64"])
except Exception as exc: