From 23bb5ac9c95423a75849febbdb9941aa74df2d19 Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 18 Jun 2026 17:52:27 +0900 Subject: [PATCH] =?UTF-8?q?feat(presegment):=20G2=20PR-3=20=E2=80=94=20LLM?= =?UTF-8?q?=20=EA=B2=BD=EA=B3=84=20=ED=8F=B4=EB=B0=B1=20(flag-gated,=20?= =?UTF-8?q?=EA=B8=B0=EB=B3=B8=20OFF,=20scaffold-first)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ToC 없는/게이트 미달 대형 PDF(>=60p)에 한해 off-card Qwen(맥북, call_deep_or_defer, StageDeferred-safe) 경계 제안 → 동일 검증게이트(_is_clear_bundle) 통과 시에만 deterministic 과 공유하는 _create_children 로 분할. is_bundle=false/파싱·검증 실패=단일문서(오늘과 동일)+로깅. - env PRESEGMENT_LLM_FALLBACK 기본 false → 배포 동작 무변(LLM 미호출, 검증=unit test) - 자식생성 _create_children 공유 헬퍼로 리팩터(deterministic+LLM 단일 경로, 동작 동일) - SegmentationOutput Pydantic + parse_json_response(house 패턴) + per-page heading 샘플(본문 미전송) - prompt app/prompts/presegment_boundaries.txt + tests/test_presegment_llm.py(14, fitz/DB/LLM mock) no direct HTTP·no silent fallback. 활성=flag ON + 실 router fixture 검증 후. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/prompts/presegment_boundaries.txt | 41 +++ app/workers/presegment_worker.py | 358 ++++++++++++++++++----- tests/test_presegment_llm.py | 400 ++++++++++++++++++++++++++ 3 files changed, 720 insertions(+), 79 deletions(-) create mode 100644 app/prompts/presegment_boundaries.txt create mode 100644 tests/test_presegment_llm.py diff --git a/app/prompts/presegment_boundaries.txt b/app/prompts/presegment_boundaries.txt new file mode 100644 index 0000000..1731217 --- /dev/null +++ b/app/prompts/presegment_boundaries.txt @@ -0,0 +1,41 @@ +You are a document-boundary detector. Output ONLY JSON {is_bundle, segments:[{start_page,end_page,title}]}. + +You are given a single PDF that may be a "bundle" — several independent logical documents +concatenated into one file (for example: multiple laws, multiple reports, or multiple papers +scanned together). Your job is to decide whether it is a bundle and, if so, where each logical +document starts and ends. + +You receive only a compact sample per page: the page number and the first line / heading of that +page (text may be truncated). Use these heading/first-line signals to detect where a new logical +document begins (a new title page, a new cover, a clearly new document title, a restart of +numbering, etc.). You do NOT receive the full text. + +Output rules: +- Respond with STRICT JSON only. No prose, no markdown, no code fence. +- Schema: + { + "is_bundle": true | false, + "segments": [ + {"start_page": , "end_page": , "title": ""} + ] + } +- Page numbers are 1-based and INCLUSIVE. start_page=1 is the first page; end_page equals the last + page of that segment. +- Segments MUST fully cover every page with NO gaps and NO overlaps: + - the first segment MUST start at page 1, + - each next segment MUST start exactly one page after the previous segment's end_page, + - the last segment MUST end at the final page (page_count). +- Order segments by start_page ascending. +- title = a short title for that logical document if you can infer one from its first page, + otherwise null. + +If the file is NOT a bundle (it is a single logical document), respond: + {"is_bundle": false, "segments": []} + +Be conservative: only report is_bundle=true when the heading signals clearly indicate separate +logical documents. When unsure, return is_bundle=false. + +page_count: {page_count} + +Per-page samples (one per line, "p{n}: {first line}"): +{page_samples} diff --git a/app/workers/presegment_worker.py b/app/workers/presegment_worker.py index 4fc6fc7..75bc96d 100644 --- a/app/workers/presegment_worker.py +++ b/app/workers/presegment_worker.py @@ -4,11 +4,19 @@ - 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit → enqueue_next_stage 가 extract 로 흘림. - PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 만 자식 분할, 나머지는 단일문서로 extract. -이 PR 은 **deterministic 만** (LLM fallback = 후속 PR). 판정이 애매하면 보수적으로 분할하지 않고 -단일문서로 둔다(bias to NOT splitting). 분할 = '확실한 번들' 만: +deterministic 경로(PR-G2-2): 판정이 애매하면 보수적으로 분할하지 않고 단일문서로 둔다 +(bias to NOT splitting). 분할 = '확실한 번들' 만: - page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES AND 단조 증가·비중첩 AND [1, page_count] 전 범위 커버 AND 2 <= N <= MAX_CHILDREN. +LLM 경계 폴백(PR-G2-3, env PRESEGMENT_LLM_FALLBACK, 기본 OFF — scaffold-first): deterministic +이 '명확한 번들' 을 못 만든 대형 PDF(ToC 없음/level-1 없음/게이트 미달)에 한해, OFF 면 오늘과 +동일(단일문서)이고 ON 이면 off-card Qwen(맥북, 라우터 :8890, model=qwen-macbook)에게 경계를 +제안받는다. compact per-page heading 샘플만 전송(본문 미전송). LLM 출력은 **동일 검증 게이트 +(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 _create_children 경로로 분할 — +is_bundle=false / 파싱·검증 실패 = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅. +맥북 불가(503/연결/절단)는 StageDeferred 로 큐 재시도(백오프, no silent fallback). + 분할 시 ★후보 A(물리분할 없음, uq_documents_file_path 해소): 자식 file_path = unique 합성값 `{부모경로}#p{start}-{end}` (UNIQUE 제약 통과), 실파일은 `bundle_source_path()` 로 부모 경로 복원. 자식은 bundle_page_start/end(1-based inclusive) 로 부모 파일의 자기 page 범위만 가리킨다. @@ -32,9 +40,11 @@ import re import unicodedata from pathlib import Path +from pydantic import BaseModel, ValidationError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from ai.client import AIClient, call_deep_or_defer, parse_json_response from core.config import settings from core.utils import setup_logger from models.document import Document @@ -54,6 +64,40 @@ MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50")) # marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix. CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents") +# ─── PR-G2-3 LLM 경계 폴백 (scaffold-first, 기본 OFF) ─── +# PRESEGMENT_LLM_FALLBACK: 기본 "false". OFF 면 deterministic 경로만(=오늘과 동일 — 애매하면 +# 단일문서). ON 이면 deterministic 이 '명확한 번들' 을 못 만든 대형 PDF(page_count >= +# MIN_BUNDLE_PAGES) 에 한해 off-card Qwen(맥북, 라우터 :8890 경유)에게 경계를 제안받아 +# **동일 검증 게이트(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 자식 생성 경로로 분할. +# 검증 실패/파싱 실패/is_bundle=false = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅. +PRESEGMENT_LLM_FALLBACK = os.getenv("PRESEGMENT_LLM_FALLBACK", "false").lower() in ( + "1", "true", "yes", "on", +) +# LLM 에 보내는 per-page 샘플의 page 당 char 상한 (heading/첫줄만 — 본문 미전송). +PRESEGMENT_LLM_PAGE_CHARS = int(os.getenv("PRESEGMENT_LLM_PAGE_CHARS", "80")) +# 전체 page-sample 블록의 char 상한 (수 KB 가드 — 초과 시 잘라냄, 본문 누출/페이로드 폭발 방지). +PRESEGMENT_LLM_SAMPLE_CHARS = int(os.getenv("PRESEGMENT_LLM_SAMPLE_CHARS", "12000")) + +# 경계 폴백 프롬프트 (app/prompts/presegment_boundaries.txt). system 지시 + 1-based inclusive· +# 전범위 커버·무중첩 규칙. {page_count}/{page_samples} 를 str.replace 로 주입. +_PRESEGMENT_PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "presegment_boundaries.txt" + + +class Segment(BaseModel): + """LLM 이 제안하는 1-based inclusive page 범위 한 조각.""" + + start_page: int + end_page: int + title: str | None = None + + +class SegmentationOutput(BaseModel): + """presegment_boundaries 응답 스키마. parse_json_response → model_validate.""" + + is_bundle: bool = False + segments: list[Segment] = [] + confidence: float | None = None + def _resolve_path(file_path: str) -> Path | None: """NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴.""" @@ -205,6 +249,216 @@ async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None: await enqueue_stage(session, child_id, "extract") +async def _create_children( + doc: Document, segments: list[dict], session: AsyncSession +) -> int: + """검증된 segments 로 자식 N개 생성 + lineage + extract enqueue + 부모 표식 (멱등). + + deterministic '명확한 번들' 경로와 LLM 폴백 경로가 공유하는 단일 자식 생성 경로. + 호출 전 segments 는 반드시 _is_clear_bundle 검증을 통과해야 한다(여기선 재검증 X). + commit 까지 수행. 반환값 = 실제 생성한 자식 수(이미 존재해 수렴만 한 경우 0). + """ + # ─── 멱등 체크: 이미 자식이 있으면 수렴만 (재생성 금지) ─── + existing_children = ( + await session.execute( + select(DocumentLineage.derived_document_id).where( + DocumentLineage.source_document_id == doc.id, + DocumentLineage.relation_type == "segmented_from", + ) + ) + ).scalars().all() + + if existing_children: + # 부모 표식이 누락된 경우 보정(이전 부분실패 복구). + if doc.presegment_role != "parent": + doc.presegment_role = "parent" + for child_id in existing_children: + await _ensure_child_extract(session, child_id) + await session.commit() + logger.info( + f"[presegment] id={doc.id} children already exist " + f"(n={len(existing_children)}) → converge(ensure extract), no re-create" + ) + return 0 + + # ─── 자식 N개 생성 + lineage + extract enqueue ─── + created_ids: list[int] = [] + for seg in segments: + start, end = seg["start_page"], seg["end_page"] + child = Document( + # 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path + # 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 — + # 자식은 bundle_page_start/end 로 부모 파일을 슬라이스. + file_path=f"{doc.file_path}#p{start}-{end}", + file_hash=_child_file_hash(doc.file_hash, start, end), + file_format=doc.file_format, + file_size=doc.file_size, + file_type=doc.file_type, + import_source=doc.import_source, + original_filename=doc.original_filename, + source_channel=doc.source_channel, + category=doc.category, + data_origin=doc.data_origin, + doc_purpose=doc.doc_purpose, + # 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음). + material_type=doc.material_type, + jurisdiction=doc.jurisdiction, + title=_child_title(doc, seg), + bundle_page_start=start, + bundle_page_end=end, + presegment_role="child", + ) + session.add(child) + await session.flush() # child.id 확보 + created_ids.append(child.id) + + session.add( + DocumentLineage( + source_document_id=doc.id, + derived_document_id=child.id, + relation_type="segmented_from", + meta={"start_page": start, "end_page": end}, + ) + ) + # 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음). + await enqueue_stage(session, child.id, "extract") + + # 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제. + doc.presegment_role = "parent" + await session.commit() + + logger.info( + f"[presegment] id={doc.id} SPLIT into {len(created_ids)} children " + f"child_ids={created_ids}" + ) + return len(created_ids) + + +def _segments_from_output(out: "SegmentationOutput") -> list[dict]: + """SegmentationOutput.segments(Pydantic) → _is_clear_bundle / _create_children 가 쓰는 dict 형태.""" + return [ + {"start_page": s.start_page, "end_page": s.end_page, "title": (s.title or "")} + for s in out.segments + ] + + +def _page_samples(pdf, page_count: int) -> str: + """LLM 입력용 compact per-page 샘플 — page 당 heading/첫줄만(`p{n}: {firstline}`). + + PyMuPDF page.get_text() 로 page 별 텍스트를 스트리밍하되 page 당 첫 비공백 줄만, + PRESEGMENT_LLM_PAGE_CHARS 로 잘라 본문 누출 차단. 전체 블록은 PRESEGMENT_LLM_SAMPLE_CHARS + 가드로 상한(수 KB) — 초과 시 그 지점에서 중단(앞쪽 페이지 우선 보존). + """ + lines: list[str] = [] + total = 0 + for i in range(page_count): + try: + text = pdf[i].get_text() or "" + except Exception: + text = "" + first = "" + for ln in text.splitlines(): + ln = ln.strip() + if ln: + first = ln + break + first = first[:PRESEGMENT_LLM_PAGE_CHARS] + entry = f"p{i + 1}: {first}" + if total + len(entry) + 1 > PRESEGMENT_LLM_SAMPLE_CHARS: + break + lines.append(entry) + total += len(entry) + 1 + return "\n".join(lines) + + +async def _llm_boundary_fallback( + doc: Document, source: Path, page_count: int, session: AsyncSession +) -> bool: + """애매 + 대형(ToC-less 등) PDF 에 대해 off-card Qwen 으로 경계 제안 → 검증 → 분할. + + 반환 True = LLM 경로가 분할을 수행(또는 멱등 수렴)했으므로 호출자는 추가 처리 없이 return. + 반환 False = is_bundle=false / 파싱 실패 / 검증 실패 → 호출자는 단일문서(오늘과 동일) 처리. + 맥북 불가(503/연결/절단)는 call_deep_or_defer 가 StageDeferred 로 raise → 큐 재시도(백오프). + silent fallback 금지 — deep 슬롯 외 다른 backend 자동 호출 안 함. + """ + import fitz # PyMuPDF — deterministic 경로와 동일 의존 + + # per-page 샘플은 파일을 다시 열어 스트리밍(deterministic with 블록과 분리해 그 경로 무회귀). + try: + with fitz.open(str(source)) as pdf: + samples = _page_samples(pdf, page_count) + except Exception as exc: + logger.warning( + f"[presegment] id={doc.id} llm fallback sample 실패 " + f"({type(exc).__name__}: {exc}) → single doc(extract)" + ) + return False + + try: + template = _PRESEGMENT_PROMPT_PATH.read_text(encoding="utf-8") + except Exception as exc: + logger.warning( + f"[presegment] id={doc.id} prompt 로드 실패 ({type(exc).__name__}: {exc}) " + f"→ single doc(extract)" + ) + return False + + prompt = template.replace("{page_count}", str(page_count)).replace( + "{page_samples}", samples + ) + + # off-card 호출 — call_deep_or_defer 가 deep 슬롯(맥북, 라우터 :8890, model=qwen-macbook) + # 으로 라우팅. 맥북 불가는 StageDeferred 로 전파(여기서 잡지 않음 → 큐가 보류/백오프). + # classify_worker 와 동일하게 AIClient() 인스턴스화. + client = AIClient() + try: + raw = await call_deep_or_defer(client, prompt) + finally: + await client.close() + + parsed = parse_json_response(raw) + if not parsed: + logger.info( + f"[presegment] presegment_llm_rejected id={doc.id} " + f"reason=parse_failed raw={raw[:160]!r} → single doc(extract)" + ) + return False + + try: + out = SegmentationOutput.model_validate(parsed) + except (ValidationError, ValueError, TypeError) as exc: + logger.info( + f"[presegment] presegment_llm_rejected id={doc.id} " + f"reason=schema_invalid({type(exc).__name__}) → single doc(extract)" + ) + return False + + if not out.is_bundle: + logger.info( + f"[presegment] presegment_llm_rejected id={doc.id} " + f"reason=is_bundle_false → single doc(extract)" + ) + return False + + segments = _segments_from_output(out) + clear, reason = _is_clear_bundle(segments, page_count) + if not clear: + # LLM 출력을 그대로 믿지 않음 — deterministic 과 동일 게이트 미달이면 단일문서. + logger.info( + f"[presegment] presegment_llm_rejected id={doc.id} " + f"reason={reason} n={len(segments)} pages={page_count} → single doc(extract)" + ) + return False + + n = await _create_children(doc, segments, session) + logger.info( + f"[presegment] id={doc.id} LLM-SPLIT accepted " + f"(pages={page_count} n={len(segments)} created={n} " + f"confidence={out.confidence})" + ) + return True + + async def process(document_id: int, session: AsyncSession) -> None: """presegment stage 워커 진입점. queue_consumer 가 호출. @@ -269,8 +523,17 @@ async def process(document_id: int, session: AsyncSession) -> None: segments = _level1_segments(toc, page_count) if not segments: - # 큰 PDF 인데 ToC 없음/level-1 없음 = 애매(LLM fallback 대상, 후속 PR). - # 이 PR 은 기본 = 단일문서로 처리하고 사유를 남긴다. + # 큰 PDF 인데 ToC 없음/level-1 없음 = 애매. flag ON 이면 LLM 경계 폴백(PR-G2-3), + # OFF(기본) 이면 오늘과 동일 — 단일문서로 처리하고 사유를 남긴다. + if PRESEGMENT_LLM_FALLBACK: + logger.info( + f"[presegment] presegment_ambiguous id={document_id} " + f"reason=no_level1_toc pages={page_count} → LLM fallback" + ) + if await _llm_boundary_fallback(doc, source, page_count, session): + return + # LLM 이 분할하지 않음(is_bundle=false / 검증·파싱 실패) — 단일문서. + return logger.info( f"[presegment] presegment_ambiguous id={document_id} " f"reason=no_level1_toc pages={page_count} → single doc(extract)" @@ -279,84 +542,21 @@ async def process(document_id: int, session: AsyncSession) -> None: clear, reason = _is_clear_bundle(segments, page_count) if not clear: - # 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매 → 단일문서(분할 안 함). + # 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매. flag ON 이면 LLM 경계 폴백, + # OFF(기본) 이면 오늘과 동일 — 단일문서(분할 안 함). + if PRESEGMENT_LLM_FALLBACK: + logger.info( + f"[presegment] presegment_ambiguous id={document_id} " + f"reason={reason} pages={page_count} level1={len(segments)} → LLM fallback" + ) + if await _llm_boundary_fallback(doc, source, page_count, session): + return + return logger.info( f"[presegment] presegment_ambiguous id={document_id} " f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)" ) return - # ─── (5) 명확한 번들 — 멱등 체크: 이미 자식이 있으면 수렴만 ─── - existing_children = ( - await session.execute( - select(DocumentLineage.derived_document_id).where( - DocumentLineage.source_document_id == doc.id, - DocumentLineage.relation_type == "segmented_from", - ) - ) - ).scalars().all() - - if existing_children: - # 부모 표식이 누락된 경우 보정(이전 부분실패 복구). - if doc.presegment_role != "parent": - doc.presegment_role = "parent" - for child_id in existing_children: - await _ensure_child_extract(session, child_id) - await session.commit() - logger.info( - f"[presegment] id={document_id} children already exist " - f"(n={len(existing_children)}) → converge(ensure extract), no re-create" - ) - return - - # ─── (6) 자식 N개 생성 + lineage + extract enqueue ─── - n = len(segments) - created_ids: list[int] = [] - for seg in segments: - start, end = seg["start_page"], seg["end_page"] - child = Document( - # 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path - # 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 — - # 자식은 bundle_page_start/end 로 부모 파일을 슬라이스. - file_path=f"{doc.file_path}#p{start}-{end}", - file_hash=_child_file_hash(doc.file_hash, start, end), - file_format=doc.file_format, - file_size=doc.file_size, - file_type=doc.file_type, - import_source=doc.import_source, - original_filename=doc.original_filename, - source_channel=doc.source_channel, - category=doc.category, - data_origin=doc.data_origin, - doc_purpose=doc.doc_purpose, - # 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음). - material_type=doc.material_type, - jurisdiction=doc.jurisdiction, - title=_child_title(doc, seg), - bundle_page_start=start, - bundle_page_end=end, - presegment_role="child", - ) - session.add(child) - await session.flush() # child.id 확보 - created_ids.append(child.id) - - session.add( - DocumentLineage( - source_document_id=doc.id, - derived_document_id=child.id, - relation_type="segmented_from", - meta={"start_page": start, "end_page": end}, - ) - ) - # 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음). - await enqueue_stage(session, child.id, "extract") - - # 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제. - doc.presegment_role = "parent" - await session.commit() - - logger.info( - f"[presegment] id={document_id} SPLIT into {n} children " - f"(pages={page_count}) child_ids={created_ids}" - ) + # ─── (5) 명확한 번들 (deterministic) — 공유 자식 생성 경로 (멱등 수렴 포함) ─── + await _create_children(doc, segments, session) diff --git a/tests/test_presegment_llm.py b/tests/test_presegment_llm.py new file mode 100644 index 0000000..e6a93cb --- /dev/null +++ b/tests/test_presegment_llm.py @@ -0,0 +1,400 @@ +"""PR-G2-3 — presegment LLM 경계 폴백 단위 테스트. + +scaffold-first 안전성 박제: + (a) parse_json_response + SegmentationOutput 가 대표 fixture(ToC-less 120p → 3 segments) 검증 + (b) 검증 게이트(_is_clear_bundle)가 정상 응답 수락 / 비정상(중첩·gap·tiny child·N>MAX) 거부 + (c) flag OFF(기본) → LLM 절대 호출 안 함(call_deep count==0), flag ON → 호출됨(positive control) + +DB·PyMuPDF 불요(unit) — AsyncSession 은 최소 fake, fitz 는 sys.modules 주입 fake. +라이브 LLM 호출 없음(call_deep 는 fixture 반환 monkeypatch). worker-process 레벨 E2E(실 PDF +번들 분할, 보류 백오프 DB 기록)는 GPU 라이브 게이트에서 별도 실측. +[[feedback_external_api_fixture_first]] / [[feedback_scaffold_first_for_external_cost_pr]] +""" + +from __future__ import annotations + +import json +import sys +import types +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent / "app")) + +from ai.client import parse_json_response # noqa: E402 +import workers.presegment_worker as pw # noqa: E402 +from workers.presegment_worker import ( # noqa: E402 + SegmentationOutput, + _is_clear_bundle, + _segments_from_output, +) + +# ─── 대표 fixture: ToC-less 120p 번들 → 3 segments (1-based inclusive, 전범위·무중첩) ─── +GOOD_LLM_JSON = json.dumps( + { + "is_bundle": True, + "segments": [ + {"start_page": 1, "end_page": 40, "title": "문서 A"}, + {"start_page": 41, "end_page": 85, "title": "문서 B"}, + {"start_page": 86, "end_page": 120, "title": "문서 C"}, + ], + "confidence": 0.82, + }, + ensure_ascii=False, +) + +PAGE_COUNT = 120 + + +# ─── (a) parse_json_response + SegmentationOutput 검증 ────────────────────── + + +def test_parse_and_validate_good_fixture(): + parsed = parse_json_response(GOOD_LLM_JSON) + assert parsed is not None + out = SegmentationOutput.model_validate(parsed) + assert out.is_bundle is True + assert len(out.segments) == 3 + assert out.segments[0].start_page == 1 + assert out.segments[-1].end_page == PAGE_COUNT + assert out.confidence == pytest.approx(0.82) + + +def test_parse_tolerates_think_and_fence(): + """house parse_json_response 가 + ```json fence 를 벗겨낸다.""" + wrapped = f"분석중...\n```json\n{GOOD_LLM_JSON}\n```" + parsed = parse_json_response(wrapped) + out = SegmentationOutput.model_validate(parsed) + assert out.is_bundle is True and len(out.segments) == 3 + + +# ─── (b) 검증 게이트 accept / reject ──────────────────────────────────────── + + +def _segments(*spans): + return [{"start_page": s, "end_page": e, "title": ""} for (s, e) in spans] + + +def test_gate_accepts_good(): + out = SegmentationOutput.model_validate(parse_json_response(GOOD_LLM_JSON)) + segs = _segments_from_output(out) + clear, reason = _is_clear_bundle(segs, PAGE_COUNT) + assert clear is True, reason + assert reason == "" + + +def test_gate_rejects_overlap(): + # 41 이어야 할 두번째 start 가 40 으로 중첩 + clear, reason = _is_clear_bundle(_segments((1, 40), (40, 85), (86, 120)), PAGE_COUNT) + assert clear is False + assert "non_contiguous" in reason + + +def test_gate_rejects_gap(): + # 40 다음이 42 로 시작 → 41 빈틈 (non_contiguous 로 검출) + clear, reason = _is_clear_bundle(_segments((1, 40), (42, 85), (86, 120)), PAGE_COUNT) + assert clear is False + assert "non_contiguous" in reason + + +def test_gate_rejects_tiny_child(): + # 두번째 자식 41..43 = 3p < MIN_CHILD_PAGES(5) + clear, reason = _is_clear_bundle(_segments((1, 40), (41, 43), (44, 120)), PAGE_COUNT) + assert clear is False + assert "child_too_small" in reason + + +def test_gate_rejects_coverage_not_full(): + # 마지막이 page_count 에 못 미침 + clear, reason = _is_clear_bundle(_segments((1, 40), (41, 85), (86, 110)), PAGE_COUNT) + assert clear is False + assert "last_end_not_page_count" in reason + + +def test_gate_rejects_too_many_children(): + # N > MAX_CHILDREN — 각 자식 MIN_CHILD_PAGES 만족시키되 개수만 초과 + n = pw.MAX_CHILDREN + 1 + pc = n * pw.MIN_CHILD_PAGES + spans = [ + (i * pw.MIN_CHILD_PAGES + 1, (i + 1) * pw.MIN_CHILD_PAGES) for i in range(n) + ] + clear, reason = _is_clear_bundle(_segments(*spans), pc) + assert clear is False + assert "too_many_children" in reason + + +def test_gate_rejects_single_segment(): + clear, reason = _is_clear_bundle(_segments((1, 120)), PAGE_COUNT) + assert clear is False + assert "too_few_level1_entries" in reason + + +# ─── 공통 fake (DB / PyMuPDF) ────────────────────────────────────────────── + + +class _FakeDoc: + """presegment 가 읽는 Document 필드만 가진 최소 stand-in.""" + + def __init__(self, doc_id=1): + self.id = doc_id + self.file_path = "PKM/bundle.pdf" + self.file_hash = "deadbeef" + self.file_format = "pdf" + self.file_size = 123 + self.file_type = "document" + self.import_source = "upload" + self.original_filename = "bundle.pdf" + self.source_channel = None + self.category = None + self.data_origin = None + self.doc_purpose = None + self.material_type = None + self.jurisdiction = None + self.title = "번들" + self.presegment_role = None + self.bundle_page_start = None + self.bundle_page_end = None + self.extracted_at = None + self.extracted_text = None + + +class _ScalarResult: + def __init__(self, rows): + self._rows = rows + + def scalars(self): + return self + + def all(self): + return list(self._rows) + + +class _FakeSession: + """_create_children / process 가 쓰는 AsyncSession 표면만 구현. + + execute() = 기존 자식 lineage 조회 → 빈 결과(첫 분할). add/flush 로 child.id 부여. + get() = document_id → 미리 등록한 doc, child_id → 생성된 child. + """ + + def __init__(self, doc): + self._docs = {doc.id: doc} + self.added = [] + self.commits = 0 + self.enqueued = [] # enqueue_stage monkeypatch 가 채움 + self._next_id = 1000 + + async def get(self, _model, oid): + return self._docs.get(oid) + + async def execute(self, _stmt): + # _create_children 의 기존 자식 조회 → 항상 빈(첫 분할). enqueue_stage 는 monkeypatch. + return _ScalarResult([]) + + def add(self, obj): + self.added.append(obj) + # child Document 에 id 부여 (flush 대용 — _FakeDoc/실 Document 모두 setattr 가능) + if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"): + self._next_id += 1 + obj.id = self._next_id + self._docs[obj.id] = obj + + async def flush(self): + for obj in self.added: + if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"): + self._next_id += 1 + obj.id = self._next_id + self._docs[obj.id] = obj + + async def commit(self): + self.commits += 1 + + +def _install_fake_fitz(monkeypatch, *, page_count=PAGE_COUNT, toc=None, first_lines=None): + """sys.modules['fitz'] 에 fake 주입 — worker 의 `import fitz` 가 이걸 받게 한다.""" + toc = toc or [] + + class _FakePage: + def __init__(self, idx): + self._idx = idx + + def get_text(self): + if first_lines and self._idx < len(first_lines): + return first_lines[self._idx] + return f"page {self._idx + 1} body text" + + class _FakePdf: + def __init__(self): + self.page_count = page_count + + def get_toc(self, simple=True): + return list(toc) + + def __getitem__(self, idx): + return _FakePage(idx) + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + fake = types.ModuleType("fitz") + fake.open = lambda *_a, **_k: _FakePdf() + monkeypatch.setitem(sys.modules, "fitz", fake) + return fake + + +class _SpyClient: + """AIClient stand-in — call_deep 호출 횟수 카운트 + 지정 응답 반환.""" + + calls = 0 + response = GOOD_LLM_JSON + + def __init__(self): + type(self).calls += 1 # 인스턴스화 자체는 비용 아님 — 호출 카운트는 call_deep 기준 + + async def call_deep(self, prompt, system=None): + type(self)._deep_calls += 1 + return type(self).response + + async def close(self): + pass + + +@pytest.fixture(autouse=True) +def _reset_spy(): + _SpyClient.calls = 0 + _SpyClient._deep_calls = 0 + _SpyClient.response = GOOD_LLM_JSON + yield + + +# ─── (b) _llm_boundary_fallback 수락/거부 (mocked LLM) ────────────────────── + + +@pytest.mark.asyncio +async def test_fallback_accepts_good_and_creates_children(monkeypatch): + """정상 LLM 응답 → 게이트 통과 → _create_children 가 3 자식 + parent 표식.""" + _install_fake_fitz(monkeypatch) + monkeypatch.setattr(pw, "AIClient", _SpyClient) + # enqueue_stage 는 DB 의존 — no-op 으로 대체 (호출 인자만 기록) + enq = [] + + async def _fake_enqueue(session, doc_id, stage, **kw): + enq.append((doc_id, stage)) + return True + + monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue) + + doc = _FakeDoc() + session = _FakeSession(doc) + ok = await pw._llm_boundary_fallback(doc, Path("/tmp/bundle.pdf"), PAGE_COUNT, session) + + assert ok is True + assert _SpyClient._deep_calls == 1 + # 자식 3개 생성 + parent 표식 + lineage 3 + commit + children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"] + assert len(children) == 3 + assert doc.presegment_role == "parent" + assert sum(1 for o in session.added if o.__class__.__name__ == "DocumentLineage") == 3 + assert {s for (_id, s) in enq} == {"extract"} + + +@pytest.mark.asyncio +async def test_fallback_rejects_bad_segments(monkeypatch): + """LLM 이 중첩 경계 반환 → 게이트 거부 → False + 자식 0 (단일문서).""" + _install_fake_fitz(monkeypatch) + bad = json.dumps({ + "is_bundle": True, + "segments": [ + {"start_page": 1, "end_page": 40}, + {"start_page": 40, "end_page": 85}, # 중첩 + {"start_page": 86, "end_page": 120}, + ], + }) + _SpyClient.response = bad + monkeypatch.setattr(pw, "AIClient", _SpyClient) + + async def _fake_enqueue(*a, **k): + return True + + monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue) + + doc = _FakeDoc() + session = _FakeSession(doc) + ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session) + + assert ok is False + assert _SpyClient._deep_calls == 1 + assert [o for o in session.added if getattr(o, "presegment_role", None) == "child"] == [] + assert doc.presegment_role is None + + +@pytest.mark.asyncio +async def test_fallback_rejects_is_bundle_false(monkeypatch): + """is_bundle=false → 호출은 했으나 분할 안 함(False, 자식 0).""" + _install_fake_fitz(monkeypatch) + _SpyClient.response = json.dumps({"is_bundle": False, "segments": []}) + monkeypatch.setattr(pw, "AIClient", _SpyClient) + + async def _fake_enqueue(*a, **k): + return True + + monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue) + + doc = _FakeDoc() + session = _FakeSession(doc) + ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session) + assert ok is False + assert _SpyClient._deep_calls == 1 + assert doc.presegment_role is None + + +# ─── (c) flag gating — OFF=호출 0 (deployed default 무변), ON=호출됨 ─────────── + + +@pytest.mark.asyncio +async def test_flag_off_never_calls_llm(monkeypatch): + """PRESEGMENT_LLM_FALLBACK=False(기본) → 큰 ToC-less PDF 도 LLM 미호출 = 오늘과 동일.""" + monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", False) + _install_fake_fitz(monkeypatch, page_count=120, toc=[]) # 대형 + level-1 ToC 없음 = 애매 + monkeypatch.setattr(pw, "AIClient", _SpyClient) + monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf")) + + async def _fake_enqueue(*a, **k): + return True + + monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue) + + doc = _FakeDoc() + session = _FakeSession(doc) + await pw.process(doc.id, session) + + assert _SpyClient._deep_calls == 0 # ★ LLM 절대 호출 안 됨 + assert doc.presegment_role is None # 단일문서 (분할 안 함) + assert session.commits == 0 + + +@pytest.mark.asyncio +async def test_flag_on_calls_llm_and_splits(monkeypatch): + """positive control — flag ON 이면 같은 입력에 LLM 호출 + 게이트 통과 시 분할.""" + monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", True) + _install_fake_fitz(monkeypatch, page_count=120, toc=[]) + _SpyClient.response = GOOD_LLM_JSON + monkeypatch.setattr(pw, "AIClient", _SpyClient) + monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf")) + + async def _fake_enqueue(*a, **k): + return True + + monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue) + + doc = _FakeDoc() + session = _FakeSession(doc) + await pw.process(doc.id, session) + + assert _SpyClient._deep_calls == 1 # LLM 호출됨 + assert doc.presegment_role == "parent" # 분할 수행 + children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"] + assert len(children) == 3