diff --git a/app/api/documents.py b/app/api/documents.py index 2d5ddde..b7ffbcb 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -1166,8 +1166,10 @@ async def upload_document( doc.duplicate_of = canonical.id canonical.duplicate_count = (canonical.duplicate_count or 0) + 1 - # document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리 - await enqueue_stage(session, doc.id, "extract") + # document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리. + # G2 (PR-G2-2): 첫 stage = presegment (extract 前). 非PDF/단일문서는 presegment 가 + # 변경 없이 통과시켜 extract 로 흐르고, 번들 PDF 만 자식 분할된다 (worker-side gating). + await enqueue_stage(session, doc.id, "presegment") await session.commit() except Exception: # DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback. diff --git a/app/workers/extract_worker.py b/app/workers/extract_worker.py index cc162b1..b671e6c 100644 --- a/app/workers/extract_worker.py +++ b/app/workers/extract_worker.py @@ -67,21 +67,45 @@ def _postprocess_ocr(text: str) -> str: return text.strip() -def _extract_pdf_pymupdf(file_path: Path) -> str: - """PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리""" +def _extract_pdf_pymupdf( + file_path: Path, start_page: int | None = None, end_page: int | None = None +) -> str: + """PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리. + + G2 (PR-G2-2): start_page/end_page(1-based inclusive) 가 주어지면 그 범위만 추출 + (번들 자식 doc = 부모 파일 공유 + 자기 page 범위). 둘 다 None = 전체(기존 동작 동일). + """ import fitz text_parts = [] with fitz.open(str(file_path)) as doc: - for page in doc: - text_parts.append(page.get_text()) + if start_page is None and end_page is None: + for page in doc: + text_parts.append(page.get_text()) + else: + # 1-based inclusive → 0-based range. 범위는 [0, page_count] 로 클램프(방어). + total = doc.page_count + lo = max(1, start_page or 1) - 1 + hi = min(total, end_page or total) # inclusive 끝 (0-based 마지막 인덱스 = hi-1) + for i in range(lo, hi): + text_parts.append(doc.load_page(i).get_text()) return "\n".join(text_parts) -def _get_pdf_page_count(file_path: Path) -> int: - """PDF 페이지 수 확인""" +def _get_pdf_page_count( + file_path: Path, start_page: int | None = None, end_page: int | None = None +) -> int: + """PDF 페이지 수 확인. G2: 범위가 주어지면 그 범위의 페이지 수(자식 doc 밀도 계산용). + + 둘 다 None = 전체 페이지 수(기존 동작 동일). + """ import fitz with fitz.open(str(file_path)) as doc: - return len(doc) + total = len(doc) + if start_page is None and end_page is None: + return total + lo = max(1, start_page or 1) + hi = min(total, end_page or total) + return max(0, hi - lo + 1) async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None: @@ -310,6 +334,43 @@ async def process(document_id: int, session: AsyncSession) -> None: doc.extracted_at = datetime.now(timezone.utc) return + # ─── G2 (PR-G2-2): 번들 자식 PDF — 부모 파일 공유 + 자기 page 범위만 추출 ─── + # kordoc 서비스는 page-range 파라미터가 없어 전체 파일을 파싱한다(자식엔 부적합) → kordoc + # 우회, PyMuPDF 로 [bundle_page_start, bundle_page_end] 범위만 추출. range OCR 은 본 PR 범위 + # 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도 + # 그대로 보존하고 사유를 남긴다. + if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None: + if not full_path.exists(): + raise FileNotFoundError(f"파일 없음: {full_path}") + start, end = doc.bundle_page_start, doc.bundle_page_end + try: + pymupdf_text = _extract_pdf_pymupdf(full_path, start, end) + page_count = _get_pdf_page_count(full_path, start, end) + except Exception as e: + logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}") + raise + + meta = doc.extract_meta or {} + meta["presegment_child_range"] = {"start_page": start, "end_page": end} + meta["pymupdf_chars"] = len(pymupdf_text.strip()) + should, reason = _should_ocr(pymupdf_text, page_count) + if should: + # range OCR 미지원(후속 PR) — PyMuPDF 결과 유지 + 사유 기록(silent skip 아님). + meta["ocr_skip_reason"] = "presegment_child_range_ocr_unsupported" + meta["ocr_reason"] = reason + logger.warning( + f"[pymupdf:child] {doc.file_path} pages={start}-{end} " + f"OCR 필요({reason})하나 range OCR 미지원 → PyMuPDF 결과 유지" + ) + doc.extracted_text = pymupdf_text.replace("\x00", "") + doc.extracted_at = datetime.now(timezone.utc) + doc.extractor_version = PYMUPDF_VERSION if pymupdf_text.strip() else None + doc.extract_meta = meta + logger.info( + f"[pymupdf:child] {doc.file_path} pages={start}-{end} ({len(pymupdf_text)}자)" + ) + return + # ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ─── if fmt in KORDOC_FORMATS: container_path = f"/documents/{doc.file_path}" diff --git a/app/workers/file_watcher.py b/app/workers/file_watcher.py index 99c7694..8abba74 100644 --- a/app/workers/file_watcher.py +++ b/app/workers/file_watcher.py @@ -118,16 +118,18 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, if expected_category == "library": # 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락. # frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만. + # G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할. if ext in LIBRARY_DOC_EXTS: - return ("library", False, "extract") + return ("library", False, "presegment") if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) # audio/video 잘못 들어오면 skip return (None, False, None) # 기타 알 수 없는 확장자 skip # Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip. + # G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할. if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) - return (None, False, "extract") + return (None, False, "presegment") # ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ────────────────────────────────── @@ -226,7 +228,9 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int ) session.add(doc) await session.flush() - await enqueue_stage(session, doc.id, "extract") + # G2 (PR-G2-2): 모든 문서가 presegment 로 진입(단일 entry-point). HTML(非PDF)은 presegment 가 + # 변경 없이 통과시켜 extract 로 흐른다 (worker-side gating). + await enqueue_stage(session, doc.id, "presegment") return (1, 0) diff --git a/app/workers/marker_worker.py b/app/workers/marker_worker.py index 6f1055f..44c39d2 100644 --- a/app/workers/marker_worker.py +++ b/app/workers/marker_worker.py @@ -207,7 +207,21 @@ async def process(document_id: int, session: AsyncSession) -> None: return # ---- (4) page_count gauge + 분기 (LargeDoc split) ---- - page_count = _get_page_count(container_path) + # G2 (PR-G2-2): 번들 자식 doc 은 부모 파일 공유 + 자기 page 범위([bundle_page_start, end], + # 1-based inclusive)만 변환해야 한다. page_offset = 절대 시작페이지(부모 파일 기준), page_count = + # 자식 범위의 페이지 수. cols 가 NULL(일반 doc)이면 page_offset=1 + 전체 page_count = 기존 동작 동일. + file_page_count = _get_page_count(container_path) + is_child = doc.bundle_page_start is not None and doc.bundle_page_end is not None + if is_child: + page_offset = doc.bundle_page_start + if file_page_count is not None: + child_end = min(doc.bundle_page_end, file_page_count) + page_count = max(0, child_end - doc.bundle_page_start + 1) + else: + page_count = doc.bundle_page_end - doc.bundle_page_start + 1 + else: + page_offset = 1 + page_count = file_page_count # >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님. if page_count is not None and page_count > MAX_SPLIT_PAGES: @@ -226,20 +240,35 @@ async def process(document_id: int, session: AsyncSession) -> None: # ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ---- if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES: - await _process_split(doc, document_id, container_path, page_count, session) + await _process_split(doc, document_id, container_path, page_count, session, page_offset) else: - await _process_single(doc, document_id, container_path, session) + await _process_single(doc, document_id, container_path, session, page_count, page_offset) async def _process_single( - doc: Document, document_id: int, container_path: str, session: AsyncSession + doc: Document, document_id: int, container_path: str, session: AsyncSession, + page_count: int | None = None, page_offset: int = 1, ) -> None: - """소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).""" + """소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로). + + G2 (PR-G2-2): 번들 자식(page_offset>1)은 [page_offset, page_offset+page_count-1] 범위만 + 변환하도록 marker 에 start_page/end_page 를 명시한다. 일반 doc(page_offset=1)은 기존과 + 동일하게 max_pages 만 보낸다(payload byte-identical). + """ + # 일반 doc = 기존 payload 유지. 자식만 절대 page 범위를 명시(부모 파일 기준 1-based inclusive). + if page_offset > 1 and page_count is not None: + req_json = { + "file_path": container_path, + "start_page": page_offset, + "end_page": page_offset + page_count - 1, + } + else: + req_json = {"file_path": container_path, "max_pages": MAX_PAGES} try: async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client: resp = await client.post( MARKER_ENDPOINT, - json={"file_path": container_path, "max_pages": MAX_PAGES}, + json=req_json, ) resp.raise_for_status() data = resp.json() @@ -513,6 +542,7 @@ async def _process_split( container_path: str, page_count: int, session: AsyncSession, + page_offset: int = 1, ) -> None: """대형 PDF page-range 분할 변환. @@ -523,6 +553,10 @@ async def _process_split( invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...). marker slug(`_page_0_*`) 는 batch 마다 재시작 → batch 별 rewrite 후 stitch (충돌 회피). + + G2 (PR-G2-2): page_offset = 부모 파일 기준 절대 시작페이지(번들 자식). marker 에 보내는 + page 는 절대값(page_offset 가산), manifest/기록은 자식 상대값(1-based) 유지 — 일반 doc + (page_offset=1)은 abs==rel 이라 기존 동작과 동일. """ n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md} @@ -534,15 +568,17 @@ async def _process_split( async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client: for b in range(n_batches): - start_page = b * BATCH_PAGES + 1 + start_page = b * BATCH_PAGES + 1 # 자식 상대 1-based (manifest/기록용) end_page = min((b + 1) * BATCH_PAGES, page_count) + abs_start = start_page + (page_offset - 1) # 부모 파일 절대 page (marker 요청용) + abs_end = end_page + (page_offset - 1) try: resp = await client.post( MARKER_ENDPOINT, json={ "file_path": container_path, - "start_page": start_page, - "end_page": end_page, + "start_page": abs_start, + "end_page": abs_end, }, ) resp.raise_for_status() diff --git a/app/workers/presegment_worker.py b/app/workers/presegment_worker.py new file mode 100644 index 0000000..b3700e0 --- /dev/null +++ b/app/workers/presegment_worker.py @@ -0,0 +1,339 @@ +"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2). + +전 문서가 presegment stage 로 진입한다(worker-side gating): + - 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit → enqueue_next_stage 가 extract 로 흘림. + - PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 만 자식 분할, 나머지는 단일문서로 extract. + +이 PR 은 **deterministic 만** (LLM fallback = 후속 PR). 판정이 애매하면 보수적으로 분할하지 않고 +단일문서로 둔다(bias to NOT splitting). 분할 = '확실한 번들' 만: + - page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES + AND 단조 증가·비중첩 AND [1, page_count] 전 범위 커버 AND 2 <= N <= MAX_CHILDREN. + +분할 시 Option A(파일 물리분할 없음): 자식은 부모 file_path 를 그대로 공유하고 +bundle_page_start/end(1-based inclusive) 로 자기 page 범위만 가리킨다. 부모-자식 관계 자체는 +document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')는 파일 홀더라 +자체 extract/embed 안 함 — enqueue_next_stage 의 presegment→extract 전이가 'parent' 면 억제된다 +(queue_consumer.enqueue_next_stage 참조). 자식의 extract 는 이 워커가 직접 enqueue 한다. + +멱등: 재실행 시 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고 +수렴(각 자식이 extract 활성/완료 상태인지만 보장)한다. + +plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation) +""" + +import hashlib +import os +import unicodedata +from pathlib import Path + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.config import settings +from core.utils import setup_logger +from models.document import Document +from models.document_lineage import DocumentLineage +from models.queue import enqueue_stage + +logger = setup_logger("presegment_worker") + +# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ─── +# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호. +MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60")) +# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지). +MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5")) +# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부. +MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50")) + +# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix. +CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents") + + +def _resolve_path(file_path: str) -> Path | None: + """NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴.""" + candidates = [ + file_path, + unicodedata.normalize("NFD", file_path), + unicodedata.normalize("NFC", file_path), + ] + for c in candidates: + p = Path(c) + if p.exists(): + return p + parent = Path(file_path).parent + if parent.exists(): + target = unicodedata.normalize("NFC", Path(file_path).name) + for child in parent.iterdir(): + if unicodedata.normalize("NFC", child.name) == target: + return child + return None + + +def _to_container_path(file_path: str) -> str: + """file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일).""" + if file_path.startswith("/"): + return file_path + return f"{CONTAINER_PATH_PREFIX}/{file_path}" + + +def _is_pdf(doc: Document) -> bool: + """PDF 판정 — file_format=pdf 또는 .pdf 확장자.""" + fmt = (doc.file_format or "").lower() + if fmt == "pdf": + return True + if doc.file_path: + return Path(doc.file_path).suffix.lower() == ".pdf" + return False + + +def _level1_segments(toc: list, page_count: int) -> list[dict]: + """get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성. + + toc 항목 = [level, title, page] (page 는 1-based). level==1 만 채택. + end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count. + 동일 page 에서 시작하는 level-1 이 여럿이면 정렬 후 인접 항목으로 경계 계산되며, + 그 경우 0-페이지 segment 가 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다. + """ + starts = [] + for entry in toc: + # simple=True 는 [level, title, page]. 방어적으로 길이 체크. + if not entry or len(entry) < 3: + continue + level, title, page = entry[0], entry[1], entry[2] + if level != 1: + continue + # ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨. + starts.append((int(page), (title or "").strip())) + + # ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬). + starts.sort(key=lambda x: x[0]) + + segments: list[dict] = [] + for i, (start_page, title) in enumerate(starts): + if i + 1 < len(starts): + end_page = starts[i + 1][0] - 1 + else: + end_page = page_count + segments.append({"start_page": start_page, "end_page": end_page, "title": title}) + return segments + + +def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]: + """deterministic '명확한 번들' 판정. (clear, reason) 반환. + + clear=True 면 reason="" / clear=False 면 reason 은 거부 사유(로깅용). + 모든 조건은 보수적 — 하나라도 어긋나면 단일문서로 처리(분할 안 함). + """ + n = len(segments) + if n < 2: + return False, f"too_few_level1_entries(n={n})" + if n > MAX_CHILDREN: + return False, f"too_many_children(n={n}>{MAX_CHILDREN})" + + # 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버. + if segments[0]["start_page"] != 1: + return False, f"first_start_not_1(start={segments[0]['start_page']})" + if segments[-1]["end_page"] != page_count: + return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})" + + prev_end = 0 + for seg in segments: + start, end = seg["start_page"], seg["end_page"] + # 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할. + if start != prev_end + 1: + return False, f"non_contiguous(start={start},prev_end={prev_end})" + if end < start: + return False, f"non_monotonic(start={start},end={end})" + if (end - start + 1) < MIN_CHILD_PAGES: + return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})" + prev_end = end + + if prev_end != page_count: + return False, f"coverage_gap(covered={prev_end},pc={page_count})" + + return True, "" + + +def _child_title(parent: Document, seg: dict) -> str: + """자식 제목 = 부모 제목 + ' — ' + (segment 제목 또는 page 범위).""" + base = (parent.title or "").strip() or (parent.original_filename or "") or "문서" + seg_title = (seg.get("title") or "").strip() + suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}" + return f"{base} — {suffix}" + + +def _child_file_hash(parent_hash: str, start: int, end: int) -> str: + """자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등. + + 부모 file_hash 가 NULL 일 수는 없으나(NOT NULL) 방어적으로 빈 문자열 처리. + """ + return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest() + + +async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None: + """자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로). + + 이미 extracted_text 가 채워졌거나 활성 큐 행이 있으면 enqueue_stage 가 no-op/skip. + """ + child = await session.get(Document, child_id) + if child is None: + return + # 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip). + if child.extracted_at is not None and child.extracted_text is not None: + return + await enqueue_stage(session, child_id, "extract") + + +async def process(document_id: int, session: AsyncSession) -> None: + """presegment stage 워커 진입점. queue_consumer 가 호출. + + 전 문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) → extract 로 흐른다. + '명확한 번들' PDF 만 자식 분할 + 부모를 'parent' 로 표식(이 경우 부모는 extract 로 흐르지 않음). + """ + doc = await session.get(Document, document_id) + if doc is None: + logger.warning(f"[presegment] document {document_id} not found") + return + + # ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ─── + if not _is_pdf(doc): + logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract") + return + + # ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ─── + if not doc.file_path: + logger.info(f"[presegment] id={document_id} no file_path → extract") + return + + # ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ─── + # (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.) + if doc.presegment_role in ("child", "parent"): + logger.info( + f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip" + ) + return + + # ─── (2) 파일 열기 + page_count ─── + raw = str(Path(settings.nas_mount_path) / doc.file_path) + source = _resolve_path(raw) + if source is None: + # 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안. + # presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다. + logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract") + return + + import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존 + + try: + with fitz.open(str(source)) as pdf: + page_count = pdf.page_count + toc = pdf.get_toc(simple=True) or [] + except Exception as exc: + # PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화). + logger.warning( + f"[presegment] id={document_id} fitz open/toc failed " + f"({type(exc).__name__}: {exc}) → extract" + ) + return + + # ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ─── + if page_count < MIN_BUNDLE_PAGES: + logger.info( + f"[presegment] id={document_id} single doc " + f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract" + ) + return + + # ─── (4) level-1 ToC → 자식 후보 segment ─── + segments = _level1_segments(toc, page_count) + + if not segments: + # 큰 PDF 인데 ToC 없음/level-1 없음 = 애매(LLM fallback 대상, 후속 PR). + # 이 PR 은 기본 = 단일문서로 처리하고 사유를 남긴다. + logger.info( + f"[presegment] presegment_ambiguous id={document_id} " + f"reason=no_level1_toc pages={page_count} → single doc(extract)" + ) + return + + clear, reason = _is_clear_bundle(segments, page_count) + if not clear: + # 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매 → 단일문서(분할 안 함). + logger.info( + f"[presegment] presegment_ambiguous id={document_id} " + f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)" + ) + return + + # ─── (5) 명확한 번들 — 멱등 체크: 이미 자식이 있으면 수렴만 ─── + existing_children = ( + await session.execute( + select(DocumentLineage.derived_document_id).where( + DocumentLineage.source_document_id == doc.id, + DocumentLineage.relation_type == "segmented_from", + ) + ) + ).scalars().all() + + if existing_children: + # 부모 표식이 누락된 경우 보정(이전 부분실패 복구). + if doc.presegment_role != "parent": + doc.presegment_role = "parent" + for child_id in existing_children: + await _ensure_child_extract(session, child_id) + await session.commit() + logger.info( + f"[presegment] id={document_id} children already exist " + f"(n={len(existing_children)}) → converge(ensure extract), no re-create" + ) + return + + # ─── (6) 자식 N개 생성 + lineage + extract enqueue ─── + n = len(segments) + created_ids: list[int] = [] + for seg in segments: + start, end = seg["start_page"], seg["end_page"] + child = Document( + # Option A: 부모 파일 그대로 공유(물리 분할 없음). 자식은 bundle_page_start/end 로 슬라이스. + file_path=doc.file_path, + file_hash=_child_file_hash(doc.file_hash, start, end), + file_format=doc.file_format, + file_size=doc.file_size, + file_type=doc.file_type, + import_source=doc.import_source, + original_filename=doc.original_filename, + source_channel=doc.source_channel, + category=doc.category, + data_origin=doc.data_origin, + doc_purpose=doc.doc_purpose, + # 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음). + material_type=doc.material_type, + jurisdiction=doc.jurisdiction, + title=_child_title(doc, seg), + bundle_page_start=start, + bundle_page_end=end, + presegment_role="child", + ) + session.add(child) + await session.flush() # child.id 확보 + created_ids.append(child.id) + + session.add( + DocumentLineage( + source_document_id=doc.id, + derived_document_id=child.id, + relation_type="segmented_from", + meta={"start_page": start, "end_page": end}, + ) + ) + # 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음). + await enqueue_stage(session, child.id, "extract") + + # 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제. + doc.presegment_role = "parent" + await session.commit() + + logger.info( + f"[presegment] id={document_id} SPLIT into {n} children " + f"(pages={page_count}) child_ids={created_ids}" + ) diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index c7c2ebb..a72acd2 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -31,9 +31,9 @@ _hold_logged = False # embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이 # LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음. # 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일). -BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10, - "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1, - "fulltext": 3} +BATCH_SIZE = {"presegment": 3, "extract": 5, "classify": 3, "summarize": 3, "embed": 10, + "chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, + "markdown": 1, "fulltext": 3} STALE_THRESHOLD_MINUTES = 10 # markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다. # marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분 @@ -46,7 +46,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120" # (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험). # STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). MAIN_QUEUE_STAGES = [ - "extract", "classify", "summarize", + "presegment", "extract", "classify", "summarize", "preview", "stt", "thumbnail", "fulltext", ] MARKDOWN_QUEUE_STAGES = ["markdown"] @@ -165,6 +165,10 @@ async def enqueue_next_stage(document_id: int, current_stage: str): } next_stages = { + # G2 (PR-G2-2): 전 문서가 presegment → extract. 단, 번들 분할로 'parent' 가 된 문서는 + # 파일 홀더라 자체 extract 안 함 — 아래 suppression 으로 이 전이를 건너뛴다(자식 extract 는 + # presegment_worker 가 직접 enqueue). 단일/非PDF 문서(role NULL)는 정상적으로 extract 로 흐름. + "presegment": ["extract"], "extract": ["classify", "preview"], "classify": ["embed", "chunk", "markdown"], "stt": ["classify"], @@ -180,6 +184,18 @@ async def enqueue_next_stage(document_id: int, current_stage: str): stages = extract_override_by_channel[sc] else: stages = next_stages.get(current_stage, []) + elif current_stage == "presegment": + # 번들 분할 parent 는 extract 로 흐르지 않게 억제 (자식이 부모 extract 에 가려지는 것 방지). + # role NULL(단일/非PDF) / 'child' 는 정상 전이. presegment_worker 가 자식 extract 를 직접 + # enqueue 하므로 'parent' 만 여기서 no-op. + from models.document import Document + async with async_session() as lookup_session: + doc = await lookup_session.get(Document, document_id) + role = doc.presegment_role if doc else None + if role == "parent": + stages = [] + else: + stages = next_stages.get(current_stage, []) else: stages = next_stages.get(current_stage, []) @@ -199,6 +215,7 @@ def _load_workers(): from workers.deep_summary_worker import process as deep_summary_process from workers.embed_worker import process as embed_process from workers.extract_worker import process as extract_process + from workers.presegment_worker import process as presegment_process from workers.preview_worker import process as preview_process from workers.stt_worker import process as stt_process from workers.summarize_worker import process as summarize_process @@ -207,6 +224,8 @@ def _load_workers(): from workers.fulltext_worker import process as fulltext_process return { + # G2 (PR-G2-2): extract 前 번들 PDF → N 자식 분할 (deterministic ToC). 非PDF/단일은 통과. + "presegment": presegment_process, "extract": extract_process, "classify": classify_process, "summarize": summarize_process,