feat(presegment): G2 PR-2 — presegment 워커 + 큐 배선 + range-clamp (deterministic ToC)

extract 前 presegment 스테이지: 전 문서 진입, 非PDF/단일은 무변 통과, '명확한 번들' PDF만
ToC(level-1) deterministic 분할. LLM 폴백은 PR-3.
- presegment_worker: 보수적 게이트(pages>=60·자식>=5p·연속/단조/전범위·2<=N<=50) + 멱등
  (lineage segmented_from 존재 시 수렴) + 자식=부모파일 공유(Option A)+range
- queue_consumer: BATCH_SIZE/MAIN_QUEUE_STAGES/_load_workers + presegment->extract 전이,
  parent(번들원본)는 억제(자식이 직접 extract enqueue)
- ingest(documents.py upload·file_watcher): 첫 stage extract->presegment
- extract_worker/marker_worker: bundle_page_start/end 시 해당 범위만 추출/변환
  (NULL=일반문서 byte-identical 무회귀 — 검수 확인)

코드 검수 완료(무회귀·full_path 스코프·NOT NULL 커버·py_compile). **미배포** —
실제 번들 PDF 처리 검증 후 배포(PR-3 LLM 폴백과 함께).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-18 16:55:27 +09:00
parent d75fb7adaa
commit c3d5c33813
6 changed files with 486 additions and 25 deletions
+4 -2
View File
@@ -1166,8 +1166,10 @@ async def upload_document(
doc.duplicate_of = canonical.id
canonical.duplicate_count = (canonical.duplicate_count or 0) + 1
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리
await enqueue_stage(session, doc.id, "extract")
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리.
# G2 (PR-G2-2): 첫 stage = presegment (extract 前). 非PDF/단일문서는 presegment 가
# 변경 없이 통과시켜 extract 로 흐르고, 번들 PDF 만 자식 분할된다 (worker-side gating).
await enqueue_stage(session, doc.id, "presegment")
await session.commit()
except Exception:
# DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback.
+68 -7
View File
@@ -67,21 +67,45 @@ def _postprocess_ocr(text: str) -> str:
return text.strip()
def _extract_pdf_pymupdf(file_path: Path) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리"""
def _extract_pdf_pymupdf(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리.
G2 (PR-G2-2): start_page/end_page(1-based inclusive) 주어지면 범위만 추출
(번들 자식 doc = 부모 파일 공유 + 자기 page 범위). None = 전체(기존 동작 동일).
"""
import fitz
text_parts = []
with fitz.open(str(file_path)) as doc:
for page in doc:
text_parts.append(page.get_text())
if start_page is None and end_page is None:
for page in doc:
text_parts.append(page.get_text())
else:
# 1-based inclusive → 0-based range. 범위는 [0, page_count] 로 클램프(방어).
total = doc.page_count
lo = max(1, start_page or 1) - 1
hi = min(total, end_page or total) # inclusive 끝 (0-based 마지막 인덱스 = hi-1)
for i in range(lo, hi):
text_parts.append(doc.load_page(i).get_text())
return "\n".join(text_parts)
def _get_pdf_page_count(file_path: Path) -> int:
"""PDF 페이지 수 확인"""
def _get_pdf_page_count(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> int:
"""PDF 페이지 수 확인. G2: 범위가 주어지면 그 범위의 페이지 수(자식 doc 밀도 계산용).
None = 전체 페이지 (기존 동작 동일).
"""
import fitz
with fitz.open(str(file_path)) as doc:
return len(doc)
total = len(doc)
if start_page is None and end_page is None:
return total
lo = max(1, start_page or 1)
hi = min(total, end_page or total)
return max(0, hi - lo + 1)
async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None:
@@ -310,6 +334,43 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc.extracted_at = datetime.now(timezone.utc)
return
# ─── G2 (PR-G2-2): 번들 자식 PDF — 부모 파일 공유 + 자기 page 범위만 추출 ───
# kordoc 서비스는 page-range 파라미터가 없어 전체 파일을 파싱한다(자식엔 부적합) → kordoc
# 우회, PyMuPDF 로 [bundle_page_start, bundle_page_end] 범위만 추출. range OCR 은 본 PR 범위
# 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도
# 그대로 보존하고 사유를 남긴다.
if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None:
if not full_path.exists():
raise FileNotFoundError(f"파일 없음: {full_path}")
start, end = doc.bundle_page_start, doc.bundle_page_end
try:
pymupdf_text = _extract_pdf_pymupdf(full_path, start, end)
page_count = _get_pdf_page_count(full_path, start, end)
except Exception as e:
logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}")
raise
meta = doc.extract_meta or {}
meta["presegment_child_range"] = {"start_page": start, "end_page": end}
meta["pymupdf_chars"] = len(pymupdf_text.strip())
should, reason = _should_ocr(pymupdf_text, page_count)
if should:
# range OCR 미지원(후속 PR) — PyMuPDF 결과 유지 + 사유 기록(silent skip 아님).
meta["ocr_skip_reason"] = "presegment_child_range_ocr_unsupported"
meta["ocr_reason"] = reason
logger.warning(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} "
f"OCR 필요({reason})하나 range OCR 미지원 → PyMuPDF 결과 유지"
)
doc.extracted_text = pymupdf_text.replace("\x00", "")
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = PYMUPDF_VERSION if pymupdf_text.strip() else None
doc.extract_meta = meta
logger.info(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} ({len(pymupdf_text)}자)"
)
return
# ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ───
if fmt in KORDOC_FORMATS:
container_path = f"/documents/{doc.file_path}"
+7 -3
View File
@@ -118,16 +118,18 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None,
if expected_category == "library":
# 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락.
# frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만.
# G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할.
if ext in LIBRARY_DOC_EXTS:
return ("library", False, "extract")
return ("library", False, "presegment")
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None) # audio/video 잘못 들어오면 skip
return (None, False, None) # 기타 알 수 없는 확장자 skip
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
# G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할.
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None)
return (None, False, "extract")
return (None, False, "presegment")
# ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ──────────────────────────────────
@@ -226,7 +228,9 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
# G2 (PR-G2-2): 모든 문서가 presegment 로 진입(단일 entry-point). HTML(非PDF)은 presegment 가
# 변경 없이 통과시켜 extract 로 흐른다 (worker-side gating).
await enqueue_stage(session, doc.id, "presegment")
return (1, 0)
+45 -9
View File
@@ -207,7 +207,21 @@ async def process(document_id: int, session: AsyncSession) -> None:
return
# ---- (4) page_count gauge + 분기 (LargeDoc split) ----
page_count = _get_page_count(container_path)
# G2 (PR-G2-2): 번들 자식 doc 은 부모 파일 공유 + 자기 page 범위([bundle_page_start, end],
# 1-based inclusive)만 변환해야 한다. page_offset = 절대 시작페이지(부모 파일 기준), page_count =
# 자식 범위의 페이지 수. cols 가 NULL(일반 doc)이면 page_offset=1 + 전체 page_count = 기존 동작 동일.
file_page_count = _get_page_count(container_path)
is_child = doc.bundle_page_start is not None and doc.bundle_page_end is not None
if is_child:
page_offset = doc.bundle_page_start
if file_page_count is not None:
child_end = min(doc.bundle_page_end, file_page_count)
page_count = max(0, child_end - doc.bundle_page_start + 1)
else:
page_count = doc.bundle_page_end - doc.bundle_page_start + 1
else:
page_offset = 1
page_count = file_page_count
# >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님.
if page_count is not None and page_count > MAX_SPLIT_PAGES:
@@ -226,20 +240,35 @@ async def process(document_id: int, session: AsyncSession) -> None:
# ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ----
if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES:
await _process_split(doc, document_id, container_path, page_count, session)
await _process_split(doc, document_id, container_path, page_count, session, page_offset)
else:
await _process_single(doc, document_id, container_path, session)
await _process_single(doc, document_id, container_path, session, page_count, page_offset)
async def _process_single(
doc: Document, document_id: int, container_path: str, session: AsyncSession
doc: Document, document_id: int, container_path: str, session: AsyncSession,
page_count: int | None = None, page_offset: int = 1,
) -> None:
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로)."""
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).
G2 (PR-G2-2): 번들 자식(page_offset>1) [page_offset, page_offset+page_count-1] 범위만
변환하도록 marker start_page/end_page 명시한다. 일반 doc(page_offset=1) 기존과
동일하게 max_pages 보낸다(payload byte-identical).
"""
# 일반 doc = 기존 payload 유지. 자식만 절대 page 범위를 명시(부모 파일 기준 1-based inclusive).
if page_offset > 1 and page_count is not None:
req_json = {
"file_path": container_path,
"start_page": page_offset,
"end_page": page_offset + page_count - 1,
}
else:
req_json = {"file_path": container_path, "max_pages": MAX_PAGES}
try:
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
resp = await client.post(
MARKER_ENDPOINT,
json={"file_path": container_path, "max_pages": MAX_PAGES},
json=req_json,
)
resp.raise_for_status()
data = resp.json()
@@ -513,6 +542,7 @@ async def _process_split(
container_path: str,
page_count: int,
session: AsyncSession,
page_offset: int = 1,
) -> None:
"""대형 PDF page-range 분할 변환.
@@ -523,6 +553,10 @@ async def _process_split(
invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...).
marker slug(`_page_0_*`) batch 마다 재시작 batch rewrite stitch (충돌 회피).
G2 (PR-G2-2): page_offset = 부모 파일 기준 절대 시작페이지(번들 자식). marker 보내는
page 절대값(page_offset 가산), manifest/기록은 자식 상대값(1-based) 유지 일반 doc
(page_offset=1) abs==rel 이라 기존 동작과 동일.
"""
n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES
succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md}
@@ -534,15 +568,17 @@ async def _process_split(
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
for b in range(n_batches):
start_page = b * BATCH_PAGES + 1
start_page = b * BATCH_PAGES + 1 # 자식 상대 1-based (manifest/기록용)
end_page = min((b + 1) * BATCH_PAGES, page_count)
abs_start = start_page + (page_offset - 1) # 부모 파일 절대 page (marker 요청용)
abs_end = end_page + (page_offset - 1)
try:
resp = await client.post(
MARKER_ENDPOINT,
json={
"file_path": container_path,
"start_page": start_page,
"end_page": end_page,
"start_page": abs_start,
"end_page": abs_end,
},
)
resp.raise_for_status()
+339
View File
@@ -0,0 +1,339 @@
"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2).
문서가 presegment stage 진입한다(worker-side gating):
- 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit enqueue_next_stage extract 흘림.
- PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 자식 분할, 나머지는 단일문서로 extract.
PR **deterministic ** (LLM fallback = 후속 PR). 판정이 애매하면 보수적으로 분할하지 않고
단일문서로 둔다(bias to NOT splitting). 분할 = '확실한 번들' :
- page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES
AND 단조 증가·비중첩 AND [1, page_count] 범위 커버 AND 2 <= N <= MAX_CHILDREN.
분할 Option A(파일 물리분할 없음): 자식은 부모 file_path 그대로 공유하고
bundle_page_start/end(1-based inclusive) 자기 page 범위만 가리킨다. 부모-자식 관계 자체는
document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent') 파일 홀더라
자체 extract/embed enqueue_next_stage presegmentextract 전이가 'parent' 억제된다
(queue_consumer.enqueue_next_stage 참조). 자식의 extract 워커가 직접 enqueue 한다.
멱등: 재실행 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고
수렴( 자식이 extract 활성/완료 상태인지만 보장)한다.
plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation)
"""
import hashlib
import os
import unicodedata
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.document_lineage import DocumentLineage
from models.queue import enqueue_stage
logger = setup_logger("presegment_worker")
# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ───
# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호.
MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60"))
# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지).
MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5"))
# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부.
MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50"))
# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix.
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴."""
candidates = [
file_path,
unicodedata.normalize("NFD", file_path),
unicodedata.normalize("NFC", file_path),
]
for c in candidates:
p = Path(c)
if p.exists():
return p
parent = Path(file_path).parent
if parent.exists():
target = unicodedata.normalize("NFC", Path(file_path).name)
for child in parent.iterdir():
if unicodedata.normalize("NFC", child.name) == target:
return child
return None
def _to_container_path(file_path: str) -> str:
"""file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일)."""
if file_path.startswith("/"):
return file_path
return f"{CONTAINER_PATH_PREFIX}/{file_path}"
def _is_pdf(doc: Document) -> bool:
"""PDF 판정 — file_format=pdf 또는 .pdf 확장자."""
fmt = (doc.file_format or "").lower()
if fmt == "pdf":
return True
if doc.file_path:
return Path(doc.file_path).suffix.lower() == ".pdf"
return False
def _level1_segments(toc: list, page_count: int) -> list[dict]:
"""get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성.
toc 항목 = [level, title, page] (page 1-based). level==1 채택.
end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count.
동일 page 에서 시작하는 level-1 여럿이면 정렬 인접 항목으로 경계 계산되며,
경우 0-페이지 segment 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다.
"""
starts = []
for entry in toc:
# simple=True 는 [level, title, page]. 방어적으로 길이 체크.
if not entry or len(entry) < 3:
continue
level, title, page = entry[0], entry[1], entry[2]
if level != 1:
continue
# ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨.
starts.append((int(page), (title or "").strip()))
# ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬).
starts.sort(key=lambda x: x[0])
segments: list[dict] = []
for i, (start_page, title) in enumerate(starts):
if i + 1 < len(starts):
end_page = starts[i + 1][0] - 1
else:
end_page = page_count
segments.append({"start_page": start_page, "end_page": end_page, "title": title})
return segments
def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]:
"""deterministic '명확한 번들' 판정. (clear, reason) 반환.
clear=True reason="" / clear=False reason 거부 사유(로깅용).
모든 조건은 보수적 하나라도 어긋나면 단일문서로 처리(분할 ).
"""
n = len(segments)
if n < 2:
return False, f"too_few_level1_entries(n={n})"
if n > MAX_CHILDREN:
return False, f"too_many_children(n={n}>{MAX_CHILDREN})"
# 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버.
if segments[0]["start_page"] != 1:
return False, f"first_start_not_1(start={segments[0]['start_page']})"
if segments[-1]["end_page"] != page_count:
return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})"
prev_end = 0
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
# 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할.
if start != prev_end + 1:
return False, f"non_contiguous(start={start},prev_end={prev_end})"
if end < start:
return False, f"non_monotonic(start={start},end={end})"
if (end - start + 1) < MIN_CHILD_PAGES:
return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})"
prev_end = end
if prev_end != page_count:
return False, f"coverage_gap(covered={prev_end},pc={page_count})"
return True, ""
def _child_title(parent: Document, seg: dict) -> str:
"""자식 제목 = 부모 제목 + '' + (segment 제목 또는 page 범위)."""
base = (parent.title or "").strip() or (parent.original_filename or "") or "문서"
seg_title = (seg.get("title") or "").strip()
suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}"
return f"{base}{suffix}"
def _child_file_hash(parent_hash: str, start: int, end: int) -> str:
"""자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등.
부모 file_hash NULL 수는 없으나(NOT NULL) 방어적으로 문자열 처리.
"""
return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest()
async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None:
"""자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로).
이미 extracted_text 채워졌거나 활성 행이 있으면 enqueue_stage no-op/skip.
"""
child = await session.get(Document, child_id)
if child is None:
return
# 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip).
if child.extracted_at is not None and child.extracted_text is not None:
return
await enqueue_stage(session, child_id, "extract")
async def process(document_id: int, session: AsyncSession) -> None:
"""presegment stage 워커 진입점. queue_consumer 가 호출.
문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) extract 흐른다.
'명확한 번들' PDF 자식 분할 + 부모를 'parent' 표식( 경우 부모는 extract 흐르지 않음).
"""
doc = await session.get(Document, document_id)
if doc is None:
logger.warning(f"[presegment] document {document_id} not found")
return
# ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ───
if not _is_pdf(doc):
logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract")
return
# ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ───
if not doc.file_path:
logger.info(f"[presegment] id={document_id} no file_path → extract")
return
# ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ───
# (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.)
if doc.presegment_role in ("child", "parent"):
logger.info(
f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip"
)
return
# ─── (2) 파일 열기 + page_count ───
raw = str(Path(settings.nas_mount_path) / doc.file_path)
source = _resolve_path(raw)
if source is None:
# 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안.
# presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다.
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
return
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
try:
with fitz.open(str(source)) as pdf:
page_count = pdf.page_count
toc = pdf.get_toc(simple=True) or []
except Exception as exc:
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
logger.warning(
f"[presegment] id={document_id} fitz open/toc failed "
f"({type(exc).__name__}: {exc}) → extract"
)
return
# ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ───
if page_count < MIN_BUNDLE_PAGES:
logger.info(
f"[presegment] id={document_id} single doc "
f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract"
)
return
# ─── (4) level-1 ToC → 자식 후보 segment ───
segments = _level1_segments(toc, page_count)
if not segments:
# 큰 PDF 인데 ToC 없음/level-1 없음 = 애매(LLM fallback 대상, 후속 PR).
# 이 PR 은 기본 = 단일문서로 처리하고 사유를 남긴다.
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason=no_level1_toc pages={page_count} → single doc(extract)"
)
return
clear, reason = _is_clear_bundle(segments, page_count)
if not clear:
# 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매 → 단일문서(분할 안 함).
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)"
)
return
# ─── (5) 명확한 번들 — 멱등 체크: 이미 자식이 있으면 수렴만 ───
existing_children = (
await session.execute(
select(DocumentLineage.derived_document_id).where(
DocumentLineage.source_document_id == doc.id,
DocumentLineage.relation_type == "segmented_from",
)
)
).scalars().all()
if existing_children:
# 부모 표식이 누락된 경우 보정(이전 부분실패 복구).
if doc.presegment_role != "parent":
doc.presegment_role = "parent"
for child_id in existing_children:
await _ensure_child_extract(session, child_id)
await session.commit()
logger.info(
f"[presegment] id={document_id} children already exist "
f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
)
return
# ─── (6) 자식 N개 생성 + lineage + extract enqueue ───
n = len(segments)
created_ids: list[int] = []
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
child = Document(
# Option A: 부모 파일 그대로 공유(물리 분할 없음). 자식은 bundle_page_start/end 로 슬라이스.
file_path=doc.file_path,
file_hash=_child_file_hash(doc.file_hash, start, end),
file_format=doc.file_format,
file_size=doc.file_size,
file_type=doc.file_type,
import_source=doc.import_source,
original_filename=doc.original_filename,
source_channel=doc.source_channel,
category=doc.category,
data_origin=doc.data_origin,
doc_purpose=doc.doc_purpose,
# 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음).
material_type=doc.material_type,
jurisdiction=doc.jurisdiction,
title=_child_title(doc, seg),
bundle_page_start=start,
bundle_page_end=end,
presegment_role="child",
)
session.add(child)
await session.flush() # child.id 확보
created_ids.append(child.id)
session.add(
DocumentLineage(
source_document_id=doc.id,
derived_document_id=child.id,
relation_type="segmented_from",
meta={"start_page": start, "end_page": end},
)
)
# 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음).
await enqueue_stage(session, child.id, "extract")
# 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제.
doc.presegment_role = "parent"
await session.commit()
logger.info(
f"[presegment] id={document_id} SPLIT into {n} children "
f"(pages={page_count}) child_ids={created_ids}"
)
+23 -4
View File
@@ -31,9 +31,9 @@ _hold_logged = False
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
BATCH_SIZE = {"presegment": 3, "extract": 5, "classify": 3, "summarize": 3, "embed": 10,
"chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1,
"markdown": 1, "fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -46,7 +46,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize",
"presegment", "extract", "classify", "summarize",
"preview", "stt", "thumbnail", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
@@ -165,6 +165,10 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
}
next_stages = {
# G2 (PR-G2-2): 전 문서가 presegment → extract. 단, 번들 분할로 'parent' 가 된 문서는
# 파일 홀더라 자체 extract 안 함 — 아래 suppression 으로 이 전이를 건너뛴다(자식 extract 는
# presegment_worker 가 직접 enqueue). 단일/非PDF 문서(role NULL)는 정상적으로 extract 로 흐름.
"presegment": ["extract"],
"extract": ["classify", "preview"],
"classify": ["embed", "chunk", "markdown"],
"stt": ["classify"],
@@ -180,6 +184,18 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
stages = extract_override_by_channel[sc]
else:
stages = next_stages.get(current_stage, [])
elif current_stage == "presegment":
# 번들 분할 parent 는 extract 로 흐르지 않게 억제 (자식이 부모 extract 에 가려지는 것 방지).
# role NULL(단일/非PDF) / 'child' 는 정상 전이. presegment_worker 가 자식 extract 를 직접
# enqueue 하므로 'parent' 만 여기서 no-op.
from models.document import Document
async with async_session() as lookup_session:
doc = await lookup_session.get(Document, document_id)
role = doc.presegment_role if doc else None
if role == "parent":
stages = []
else:
stages = next_stages.get(current_stage, [])
else:
stages = next_stages.get(current_stage, [])
@@ -199,6 +215,7 @@ def _load_workers():
from workers.deep_summary_worker import process as deep_summary_process
from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process
from workers.presegment_worker import process as presegment_process
from workers.preview_worker import process as preview_process
from workers.stt_worker import process as stt_process
from workers.summarize_worker import process as summarize_process
@@ -207,6 +224,8 @@ def _load_workers():
from workers.fulltext_worker import process as fulltext_process
return {
# G2 (PR-G2-2): extract 前 번들 PDF → N 자식 분할 (deterministic ToC). 非PDF/단일은 통과.
"presegment": presegment_process,
"extract": extract_process,
"classify": classify_process,
"summarize": summarize_process,