Compare commits


1 Commits

Author SHA1 Message Date
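hyungi 23bb5ac9c9 feat(presegment): G2 PR-3: LLM boundary fallback (flag-gated, default OFF, scaffold-first)
Only for large PDFs (>=60p) that have no ToC or fall short of the gate, the off-card Qwen (MacBook, call_deep_or_defer,
StageDeferred-safe) proposes boundaries → the file is split only when the proposal passes the same validation gate (_is_clear_bundle),
through the _create_children helper shared with the deterministic path. is_bundle=false / parse or validation failure = single document (same as today) + logging.
- env PRESEGMENT_LLM_FALLBACK defaults to false → deployed behavior unchanged (LLM never called; verification = unit tests)
- child creation refactored into the shared helper _create_children (single path for deterministic + LLM, same behavior)
- SegmentationOutput Pydantic + parse_json_response (house pattern) + per-page heading samples (body text not sent)
- prompt app/prompts/presegment_boundaries.txt + tests/test_presegment_llm.py (14 tests; fitz/DB/LLM mocked)
No direct HTTP, no silent fallback. Activation = flag ON, after verification against a real router fixture.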

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:52:27 +09:00
3 changed files with 720 additions and 79 deletions
+41
@@ -0,0 +1,41 @@
You are a document-boundary detector. Output ONLY JSON {is_bundle, segments:[{start_page,end_page,title}]}.
You are given a single PDF that may be a "bundle" — several independent logical documents
concatenated into one file (for example: multiple laws, multiple reports, or multiple papers
scanned together). Your job is to decide whether it is a bundle and, if so, where each logical
document starts and ends.
You receive only a compact sample per page: the page number and the first line / heading of that
page (text may be truncated). Use these heading/first-line signals to detect where a new logical
document begins (a new title page, a new cover, a clearly new document title, a restart of
numbering, etc.). You do NOT receive the full text.
Output rules:
- Respond with STRICT JSON only. No prose, no markdown, no code fence.
- Schema:
  {
    "is_bundle": true | false,
    "segments": [
      {"start_page": <int>, "end_page": <int>, "title": "<string or null>"}
    ]
  }
- Page numbers are 1-based and INCLUSIVE. start_page=1 is the first page; end_page equals the last
page of that segment.
- Segments MUST fully cover every page with NO gaps and NO overlaps:
- the first segment MUST start at page 1,
- each next segment MUST start exactly one page after the previous segment's end_page,
- the last segment MUST end at the final page (page_count).
- Order segments by start_page ascending.
- title = a short title for that logical document if you can infer one from its first page,
otherwise null.
If the file is NOT a bundle (it is a single logical document), respond:
{"is_bundle": false, "segments": []}
Be conservative: only report is_bundle=true when the heading signals clearly indicate separate
logical documents. When unsure, return is_bundle=false.
page_count: {page_count}
Per-page samples (one per line, "p{n}: {first line}"):
{page_samples}
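The coverage rules stated in the prompt (1-based inclusive pages, first segment starts at page 1, each segment starts exactly one page after the previous end, last segment ends at page_count) can be checked mechanically. The following is a minimal sketch of such a check, not the worker's `_is_clear_bundle` gate, whose body is not shown in this diff; `check_coverage` and its reason strings are hypothetical names used only for illustration.

```python
def check_coverage(segments: list[dict], page_count: int) -> tuple[bool, str]:
    """Return (ok, reason) for the prompt's coverage rules; reason is "" when ok.

    Hypothetical helper: it only mirrors the rules written in the prompt.
    """
    if not segments:
        return False, "no_segments"
    if segments[0]["start_page"] != 1:
        return False, "first_start_not_1"
    prev_end = 0
    for i, seg in enumerate(segments):
        start, end = seg["start_page"], seg["end_page"]
        if end < start:
            return False, f"inverted_range_at_{i}"
        # A start below prev_end + 1 is an overlap, above it is a gap.
        if start != prev_end + 1:
            return False, f"gap_or_overlap_at_{i}"
        prev_end = end
    if prev_end != page_count:
        return False, "last_end_not_page_count"
    return True, ""
```

A check of this shape treats the model output as untrusted input: a response can be valid JSON and still violate the page arithmetic.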
+279 -79
@@ -4,11 +4,19 @@
- Non-PDF (file_format != pdf · suffix != .pdf) = immediate fast-exit → enqueue_next_stage passes it on to extract.
- PDF = deterministic analysis of the PyMuPDF ToC (level-1). Only a 'clear bundle' is split into children; everything else goes to extract as a single document.
Deterministic path (PR-G2-2): when the verdict is ambiguous, conservatively do not split and keep a single document
(bias to NOT splitting). Split = 'certain bundles' only:
- page_count >= MIN_BUNDLE_PAGES AND level-1 ToC entries >= 2 AND every child >= MIN_CHILD_PAGES
AND monotonically increasing, non-overlapping AND full coverage of [1, page_count] AND 2 <= N <= MAX_CHILDREN.
LLM boundary fallback (PR-G2-3, env PRESEGMENT_LLM_FALLBACK, default OFF, scaffold-first): applies only to large
PDFs for which the deterministic path could not produce a 'clear bundle' (no ToC / no level-1 / gate not met). When OFF,
behavior is the same as today (single document); when ON, the off-card Qwen (MacBook, router :8890, model=qwen-macbook)
is asked to propose boundaries. Only compact per-page heading samples are sent (no body text). The LLM output is split
through the same _create_children path as the deterministic case only when it passes the **same validation gate
(_is_clear_bundle)**; is_bundle=false / parse or validation failure = single document (same as today) + presegment_llm_rejected log.
MacBook unavailable (503 / connection / truncation) → StageDeferred, retried by the queue (backoff, no silent fallback).
On split, ★Candidate A (no physical split, resolves uq_documents_file_path): child file_path = unique synthetic value
`{parent path}#p{start}-{end}` (passes the UNIQUE constraint); the real file is recovered as the parent path via `bundle_source_path()`.
Each child points only at its own page range of the parent file through bundle_page_start/end (1-based inclusive).
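To make the synthetic child path concrete, here is a small sketch of building a `{parent path}#p{start}-{end}` value and recovering the parent path from it. The body of `bundle_source_path()` does not appear in this diff, so `make_child_path` and `split_child_path` are hypothetical stand-ins, and the suffix regex is an assumption derived only from the format string in the docstring.

```python
from __future__ import annotations

import re

# Assumed shape of the synthetic suffix: "#p{start}-{end}" at the very end of the path.
_BUNDLE_SUFFIX = re.compile(r"#p(\d+)-(\d+)$")


def make_child_path(parent_path: str, start: int, end: int) -> str:
    """Build a unique per-child path without creating a physical file."""
    return f"{parent_path}#p{start}-{end}"


def split_child_path(file_path: str) -> tuple[str, tuple[int, int] | None]:
    """Return (real file path, page range); the range is None for non-bundle paths."""
    m = _BUNDLE_SUFFIX.search(file_path)
    if m is None:
        return file_path, None
    return file_path[: m.start()], (int(m.group(1)), int(m.group(2)))
```

Because the page range is part of the path, two children of the same parent never collide on a UNIQUE file_path column, while every child still resolves to the one physical file.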
@@ -32,9 +40,11 @@ import re
import unicodedata
from pathlib import Path
from pydantic import BaseModel, ValidationError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, call_deep_or_defer, parse_json_response
from core.config import settings
from core.utils import setup_logger
from models.document import Document
@@ -54,6 +64,40 @@ MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50"))
# Same as marker_worker._to_marker_path: NAS relative path → container absolute path prefix.
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
# ─── PR-G2-3 LLM boundary fallback (scaffold-first, default OFF) ───
# PRESEGMENT_LLM_FALLBACK: default "false". When OFF, only the deterministic path runs (same as today:
# ambiguous = single document). When ON, and only for large PDFs (page_count >= MIN_BUNDLE_PAGES) for which
# the deterministic path could not produce a 'clear bundle', the off-card Qwen (MacBook, via router :8890)
# proposes boundaries, and the file is split through the same child-creation path as the deterministic case
# only when the proposal passes the **same validation gate (_is_clear_bundle)**.
# Validation failure / parse failure / is_bundle=false = single document (same as today) + presegment_llm_rejected log.
PRESEGMENT_LLM_FALLBACK = os.getenv("PRESEGMENT_LLM_FALLBACK", "false").lower() in (
    "1", "true", "yes", "on",
)
# Per-page char cap for the samples sent to the LLM (heading / first line only; no body text).
PRESEGMENT_LLM_PAGE_CHARS = int(os.getenv("PRESEGMENT_LLM_PAGE_CHARS", "80"))
# Char cap for the whole page-sample block (a few-KB guard: cut off when exceeded, to prevent body leakage and payload blow-up).
PRESEGMENT_LLM_SAMPLE_CHARS = int(os.getenv("PRESEGMENT_LLM_SAMPLE_CHARS", "12000"))
# Boundary fallback prompt (app/prompts/presegment_boundaries.txt): system instruction plus the 1-based inclusive,
# full-coverage, no-overlap rules. {page_count}/{page_samples} are injected with str.replace.
_PRESEGMENT_PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "presegment_boundaries.txt"
class Segment(BaseModel):
    """One 1-based inclusive page range proposed by the LLM."""
    start_page: int
    end_page: int
    title: str | None = None


class SegmentationOutput(BaseModel):
    """Response schema for presegment_boundaries. parse_json_response → model_validate."""
    is_bundle: bool = False
    segments: list[Segment] = []
    confidence: float | None = None
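The feature flag above is read once at import time with a truthy-string comparison. As a minimal sketch of that parsing rule in isolation, the helper below takes the environment as a parameter so it can be exercised without touching `os.environ`; `env_flag` and its `environ` parameter are hypothetical and not part of the worker.

```python
import os

# Same truthy spellings as the PRESEGMENT_LLM_FALLBACK check above.
_TRUTHY = ("1", "true", "yes", "on")


def env_flag(name: str, default: str = "false", environ=os.environ) -> bool:
    """Return True only when the variable is set to one of the truthy spellings.

    Comparison is case-insensitive; whitespace is NOT stripped, matching the
    expression in the diff, so " true" counts as false.
    """
    return environ.get(name, default).lower() in _TRUTHY
```

Defaulting to "false" means an unset or misspelled variable leaves the deployment on the deterministic path.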
def _resolve_path(file_path: str) -> Path | None:
    """Absorbs the NFC (DB) vs NFD (NFS) difference in Korean paths. Same pattern as thumbnail_worker._resolve_path."""
@@ -205,6 +249,216 @@ async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None:
    await enqueue_stage(session, child_id, "extract")
async def _create_children(
    doc: Document, segments: list[dict], session: AsyncSession
) -> int:
    """Create N children from validated segments + lineage + extract enqueue + parent marker (idempotent).
    Single child-creation path shared by the deterministic 'clear bundle' path and the LLM fallback path.
    segments must have passed _is_clear_bundle validation before the call (not re-validated here).
    Performs the commit. Return value = number of children actually created (0 when children already existed and it only converged).
    """
    # ─── Idempotency check: if children already exist, only converge (never re-create) ───
    existing_children = (
        await session.execute(
            select(DocumentLineage.derived_document_id).where(
                DocumentLineage.source_document_id == doc.id,
                DocumentLineage.relation_type == "segmented_from",
            )
        )
    ).scalars().all()
    if existing_children:
        # Repair a missing parent marker (recovery from an earlier partial failure).
        if doc.presegment_role != "parent":
            doc.presegment_role = "parent"
        for child_id in existing_children:
            await _ensure_child_extract(session, child_id)
        await session.commit()
        logger.info(
            f"[presegment] id={doc.id} children already exist "
            f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
        )
        return 0
    # ─── Create N children + lineage + extract enqueue ───
    created_ids: list[int] = []
    for seg in segments:
        start, end = seg["start_page"], seg["end_page"]
        child = Document(
            # Candidate A: child file_path = unique synthetic value `{parent path}#p{s}-{e}` (avoids a
            # uq_documents_file_path collision). The real file is recovered via bundle_source_path() (parent path).
            # No physical split: the child slices the parent file with bundle_page_start/end.
            file_path=f"{doc.file_path}#p{start}-{end}",
            file_hash=_child_file_hash(doc.file_hash, start, end),
            file_format=doc.file_format,
            file_size=doc.file_size,
            file_type=doc.file_type,
            import_source=doc.import_source,
            original_filename=doc.original_filename,
            source_channel=doc.source_channel,
            category=doc.category,
            data_origin=doc.data_origin,
            doc_purpose=doc.doc_purpose,
            # The safety-library axes are inherited from the parent (splitting does not change material type or jurisdiction).
            material_type=doc.material_type,
            jurisdiction=doc.jurisdiction,
            title=_child_title(doc, seg),
            bundle_page_start=start,
            bundle_page_end=end,
            presegment_role="child",
        )
        session.add(child)
        await session.flush()  # obtain child.id
        created_ids.append(child.id)
        session.add(
            DocumentLineage(
                source_document_id=doc.id,
                derived_document_id=child.id,
                relation_type="segmented_from",
                meta={"start_page": start, "end_page": end},
            )
        )
        # The worker enqueues the child's extract directly (the parent has role 'parent', so it does not flow to extract).
        await enqueue_stage(session, child.id, "extract")
    # Parent = file holder. enqueue_next_stage suppresses the presegment→extract transition when the role is 'parent'.
    doc.presegment_role = "parent"
    await session.commit()
    logger.info(
        f"[presegment] id={doc.id} SPLIT into {len(created_ids)} children "
        f"child_ids={created_ids}"
    )
    return len(created_ids)
def _segments_from_output(out: "SegmentationOutput") -> list[dict]:
    """SegmentationOutput.segments (Pydantic) → the dict form used by _is_clear_bundle / _create_children."""
    return [
        {"start_page": s.start_page, "end_page": s.end_page, "title": (s.title or "")}
        for s in out.segments
    ]
def _page_samples(pdf, page_count: int) -> str:
    """Compact per-page samples for LLM input: heading / first line only per page (`p{n}: {firstline}`).
    Streams per-page text with PyMuPDF page.get_text() but keeps only the first non-blank line of each page,
    cut to PRESEGMENT_LLM_PAGE_CHARS to block body leakage. The whole block is capped by the
    PRESEGMENT_LLM_SAMPLE_CHARS guard (a few KB): when exceeded, stop at that point (earlier pages are kept first).
    """
    lines: list[str] = []
    total = 0
    for i in range(page_count):
        try:
            text = pdf[i].get_text() or ""
        except Exception:
            text = ""
        first = ""
        for ln in text.splitlines():
            ln = ln.strip()
            if ln:
                first = ln
                break
        first = first[:PRESEGMENT_LLM_PAGE_CHARS]
        entry = f"p{i + 1}: {first}"
        if total + len(entry) + 1 > PRESEGMENT_LLM_SAMPLE_CHARS:
            break
        lines.append(entry)
        total += len(entry) + 1
    return "\n".join(lines)
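The two caps in `_page_samples` are easier to see without PyMuPDF in the way. The sketch below applies the same first-non-blank-line and budget logic to a plain list of page texts; `build_page_samples` and its parameter names are hypothetical, and the defaults simply repeat the 80 and 12000 values from the diff.

```python
def build_page_samples(page_texts, page_chars=80, sample_chars=12000):
    """Return "p{n}: {first line}" entries, one per page, within both caps."""
    lines = []
    total = 0
    for n, text in enumerate(page_texts, start=1):
        # First non-blank line of the page, or "" for an empty page.
        first = next((ln.strip() for ln in text.splitlines() if ln.strip()), "")
        entry = f"p{n}: {first[:page_chars]}"
        # +1 accounts for the newline that joins entries.
        if total + len(entry) + 1 > sample_chars:
            break  # budget exhausted: earlier pages win, later pages are dropped
        lines.append(entry)
        total += len(entry) + 1
    return "\n".join(lines)
```

One consequence worth noting: when the total budget is hit, the model sees fewer sample lines than `page_count` pages, so any proposal it makes for the unseen tail is a guess that the validation gate still has to accept or reject.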
async def _llm_boundary_fallback(
    doc: Document, source: Path, page_count: int, session: AsyncSession
) -> bool:
    """For an ambiguous, large (ToC-less, etc.) PDF: boundary proposal from the off-card Qwen → validation → split.
    Returns True = the LLM path performed the split (or converged idempotently), so the caller returns with no further handling.
    Returns False = is_bundle=false / parse failure / validation failure → the caller treats it as a single document (same as today).
    MacBook unavailable (503 / connection / truncation): call_deep_or_defer raises StageDeferred → queue retry (backoff).
    No silent fallback: no backend other than the deep slot is called automatically.
    """
    import fitz  # PyMuPDF, same dependency as the deterministic path
    # Per-page samples are streamed by reopening the file (kept separate from the deterministic `with` block so that path cannot regress).
    try:
        with fitz.open(str(source)) as pdf:
            samples = _page_samples(pdf, page_count)
    except Exception as exc:
        logger.warning(
            f"[presegment] id={doc.id} llm fallback sample failed "
            f"({type(exc).__name__}: {exc}) → single doc(extract)"
        )
        return False
    try:
        template = _PRESEGMENT_PROMPT_PATH.read_text(encoding="utf-8")
    except Exception as exc:
        logger.warning(
            f"[presegment] id={doc.id} prompt load failed ({type(exc).__name__}: {exc}) "
            f"→ single doc(extract)"
        )
        return False
    prompt = template.replace("{page_count}", str(page_count)).replace(
        "{page_samples}", samples
    )
    # Off-card call: call_deep_or_defer routes to the deep slot (MacBook, router :8890, model=qwen-macbook).
    # MacBook unavailability propagates as StageDeferred (not caught here → the queue defers with backoff).
    # AIClient() is instantiated the same way as in classify_worker.
    client = AIClient()
    try:
        raw = await call_deep_or_defer(client, prompt)
    finally:
        await client.close()
    parsed = parse_json_response(raw)
    if not parsed:
        logger.info(
            f"[presegment] presegment_llm_rejected id={doc.id} "
            f"reason=parse_failed raw={raw[:160]!r} → single doc(extract)"
        )
        return False
    try:
        out = SegmentationOutput.model_validate(parsed)
    except (ValidationError, ValueError, TypeError) as exc:
        logger.info(
            f"[presegment] presegment_llm_rejected id={doc.id} "
            f"reason=schema_invalid({type(exc).__name__}) → single doc(extract)"
        )
        return False
    if not out.is_bundle:
        logger.info(
            f"[presegment] presegment_llm_rejected id={doc.id} "
            f"reason=is_bundle_false → single doc(extract)"
        )
        return False
    segments = _segments_from_output(out)
    clear, reason = _is_clear_bundle(segments, page_count)
    if not clear:
        # The LLM output is not trusted as-is: if it falls short of the same gate as the deterministic path, keep a single document.
        logger.info(
            f"[presegment] presegment_llm_rejected id={doc.id} "
            f"reason={reason} n={len(segments)} pages={page_count} → single doc(extract)"
        )
        return False
    n = await _create_children(doc, segments, session)
    logger.info(
        f"[presegment] id={doc.id} LLM-SPLIT accepted "
        f"(pages={page_count} n={len(segments)} created={n} "
        f"confidence={out.confidence})"
    )
    return True
async def process(document_id: int, session: AsyncSession) -> None:
    """Entry point of the presegment stage worker. Called by queue_consumer.
@@ -269,8 +523,17 @@ async def process(document_id: int, session: AsyncSession) -> None:
    segments = _level1_segments(toc, page_count)
    if not segments:
        # Large PDF but no ToC / no level-1 entries = ambiguous. With the flag ON, use the LLM boundary fallback (PR-G2-3);
        # with it OFF (default), same as today: treat as a single document and log the reason.
        if PRESEGMENT_LLM_FALLBACK:
            logger.info(
                f"[presegment] presegment_ambiguous id={document_id} "
                f"reason=no_level1_toc pages={page_count} → LLM fallback"
            )
            if await _llm_boundary_fallback(doc, source, page_count, session):
                return
            # The LLM did not split (is_bundle=false / validation or parse failure): single document.
            return
        logger.info(
            f"[presegment] presegment_ambiguous id={document_id} "
            f"reason=no_level1_toc pages={page_count} → single doc(extract)"
@@ -279,84 +542,21 @@ async def process(document_id: int, session: AsyncSession) -> None:
    clear, reason = _is_clear_bundle(segments, page_count)
    if not clear:
        # Large PDF with a ToC that falls short of the 'clear bundle' criteria = ambiguous. With the flag ON, use the LLM boundary fallback;
        # with it OFF (default), same as today: single document (no split).
        if PRESEGMENT_LLM_FALLBACK:
            logger.info(
                f"[presegment] presegment_ambiguous id={document_id} "
                f"reason={reason} pages={page_count} level1={len(segments)} → LLM fallback"
            )
            if await _llm_boundary_fallback(doc, source, page_count, session):
                return
            return
        logger.info(
            f"[presegment] presegment_ambiguous id={document_id} "
            f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)"
        )
        return
    # ─── (5) Clear bundle (deterministic): shared child-creation path (includes idempotent convergence) ───
    await _create_children(doc, segments, session)
    # ─── (5) Clear bundle, idempotency check: if children already exist, only converge ───
    existing_children = (
await session.execute(
select(DocumentLineage.derived_document_id).where(
DocumentLineage.source_document_id == doc.id,
DocumentLineage.relation_type == "segmented_from",
)
)
).scalars().all()
if existing_children:
# Repair a missing parent marker (recovery from an earlier partial failure).
if doc.presegment_role != "parent":
doc.presegment_role = "parent"
for child_id in existing_children:
await _ensure_child_extract(session, child_id)
await session.commit()
logger.info(
f"[presegment] id={document_id} children already exist "
f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
)
return
# ─── (6) Create N children + lineage + extract enqueue ───
n = len(segments)
created_ids: list[int] = []
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
child = Document(
# Candidate A: child file_path = unique synthetic value `{parent path}#p{s}-{e}` (avoids a
# uq_documents_file_path collision). The real file is recovered via bundle_source_path() (parent path).
# No physical split: the child slices the parent file with bundle_page_start/end.
file_path=f"{doc.file_path}#p{start}-{end}",
file_hash=_child_file_hash(doc.file_hash, start, end),
file_format=doc.file_format,
file_size=doc.file_size,
file_type=doc.file_type,
import_source=doc.import_source,
original_filename=doc.original_filename,
source_channel=doc.source_channel,
category=doc.category,
data_origin=doc.data_origin,
doc_purpose=doc.doc_purpose,
# The safety-library axes are inherited from the parent (splitting does not change material type or jurisdiction).
material_type=doc.material_type,
jurisdiction=doc.jurisdiction,
title=_child_title(doc, seg),
bundle_page_start=start,
bundle_page_end=end,
presegment_role="child",
)
session.add(child)
await session.flush() # obtain child.id
created_ids.append(child.id)
session.add(
DocumentLineage(
source_document_id=doc.id,
derived_document_id=child.id,
relation_type="segmented_from",
meta={"start_page": start, "end_page": end},
)
)
# The worker enqueues the child's extract directly (the parent has role 'parent', so it does not flow to extract).
await enqueue_stage(session, child.id, "extract")
# Parent = file holder. enqueue_next_stage suppresses the presegment→extract transition when the role is 'parent'.
doc.presegment_role = "parent"
await session.commit()
logger.info(
f"[presegment] id={document_id} SPLIT into {n} children "
f"(pages={page_count}) child_ids={created_ids}"
)
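The routing that `process` implements after this change can be summarized as a small decision function. This is a sketch under stated assumptions rather than the worker's code: `decide_split`, `gate` and `propose_with_llm` are hypothetical names, and the real path also handles deferral, logging and database writes that are left out here.

```python
from typing import Callable, Optional


def decide_split(
    deterministic_segments: Optional[list],
    gate: Callable[[list], bool],
    llm_enabled: bool,
    propose_with_llm: Callable[[], Optional[list]],
) -> tuple:
    """Return (route, segments); route is 'deterministic', 'llm', or 'single'."""
    # 1) Deterministic ToC segments that pass the gate are split directly.
    if deterministic_segments and gate(deterministic_segments):
        return "deterministic", deterministic_segments
    # 2) Flag OFF: the LLM is never consulted, the file stays a single document.
    if not llm_enabled:
        return "single", None
    # 3) Flag ON: the LLM proposal must pass the very same gate.
    proposed = propose_with_llm()
    if proposed and gate(proposed):
        return "llm", proposed
    return "single", None
```

The point of the shape is that both split routes end at one gate, so enabling the flag can add splits but cannot relax the acceptance criteria.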
+400
@@ -0,0 +1,400 @@
"""PR-G2-3: unit tests for the presegment LLM boundary fallback.
Pins down scaffold-first safety:
(a) parse_json_response + SegmentationOutput validate the representative fixture (ToC-less 120p → 3 segments)
(b) the validation gate (_is_clear_bundle) accepts a well-formed response and rejects malformed ones (overlap, gap, tiny child, N>MAX)
(c) flag OFF (default) → the LLM is never called (call_deep count==0); flag ON → it is called (positive control)
No DB or PyMuPDF needed (unit): AsyncSession is a minimal fake, fitz is a fake injected through sys.modules.
No live LLM calls (call_deep is monkeypatched to return the fixture). Worker-process-level E2E (splitting a real PDF
bundle, DB records of deferral backoff) is measured separately at the GPU live gate.
[[feedback_external_api_fixture_first]] / [[feedback_scaffold_first_for_external_cost_pr]]
"""
from __future__ import annotations
import json
import sys
import types
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "app"))
from ai.client import parse_json_response # noqa: E402
import workers.presegment_worker as pw # noqa: E402
from workers.presegment_worker import ( # noqa: E402
SegmentationOutput,
_is_clear_bundle,
_segments_from_output,
)
# ─── Representative fixture: ToC-less 120p bundle → 3 segments (1-based inclusive, full coverage, no overlap) ───
GOOD_LLM_JSON = json.dumps(
    {
        "is_bundle": True,
        "segments": [
            {"start_page": 1, "end_page": 40, "title": "문서 A"},
            {"start_page": 41, "end_page": 85, "title": "문서 B"},
            {"start_page": 86, "end_page": 120, "title": "문서 C"},
        ],
        "confidence": 0.82,
    },
    ensure_ascii=False,
)
PAGE_COUNT = 120
# ─── (a) parse_json_response + SegmentationOutput validation ──────────────────────
def test_parse_and_validate_good_fixture():
    parsed = parse_json_response(GOOD_LLM_JSON)
    assert parsed is not None
    out = SegmentationOutput.model_validate(parsed)
    assert out.is_bundle is True
    assert len(out.segments) == 3
    assert out.segments[0].start_page == 1
    assert out.segments[-1].end_page == PAGE_COUNT
    assert out.confidence == pytest.approx(0.82)


def test_parse_tolerates_think_and_fence():
    """The house parse_json_response strips <think> and the ```json fence."""
    wrapped = f"<think>analyzing...</think>\n```json\n{GOOD_LLM_JSON}\n```"
    parsed = parse_json_response(wrapped)
    out = SegmentationOutput.model_validate(parsed)
    assert out.is_bundle is True and len(out.segments) == 3
# ─── (b) validation gate accept / reject ────────────────────────────────────────
def _segments(*spans):
    return [{"start_page": s, "end_page": e, "title": ""} for (s, e) in spans]


def test_gate_accepts_good():
    out = SegmentationOutput.model_validate(parse_json_response(GOOD_LLM_JSON))
    segs = _segments_from_output(out)
    clear, reason = _is_clear_bundle(segs, PAGE_COUNT)
    assert clear is True, reason
    assert reason == ""


def test_gate_rejects_overlap():
    # The second start should be 41 but is 40, so it overlaps
    clear, reason = _is_clear_bundle(_segments((1, 40), (40, 85), (86, 120)), PAGE_COUNT)
    assert clear is False
    assert "non_contiguous" in reason


def test_gate_rejects_gap():
    # 40 is followed by a start at 42 → page 41 is a gap (detected as non_contiguous)
    clear, reason = _is_clear_bundle(_segments((1, 40), (42, 85), (86, 120)), PAGE_COUNT)
    assert clear is False
    assert "non_contiguous" in reason


def test_gate_rejects_tiny_child():
    # Second child 41..43 = 3p < MIN_CHILD_PAGES(5)
    clear, reason = _is_clear_bundle(_segments((1, 40), (41, 43), (44, 120)), PAGE_COUNT)
    assert clear is False
    assert "child_too_small" in reason


def test_gate_rejects_coverage_not_full():
    # The last segment stops short of page_count
    clear, reason = _is_clear_bundle(_segments((1, 40), (41, 85), (86, 110)), PAGE_COUNT)
    assert clear is False
    assert "last_end_not_page_count" in reason


def test_gate_rejects_too_many_children():
    # N > MAX_CHILDREN: every child satisfies MIN_CHILD_PAGES, only the count is exceeded
    n = pw.MAX_CHILDREN + 1
    pc = n * pw.MIN_CHILD_PAGES
    spans = [
        (i * pw.MIN_CHILD_PAGES + 1, (i + 1) * pw.MIN_CHILD_PAGES) for i in range(n)
    ]
    clear, reason = _is_clear_bundle(_segments(*spans), pc)
    assert clear is False
    assert "too_many_children" in reason


def test_gate_rejects_single_segment():
    clear, reason = _is_clear_bundle(_segments((1, 120)), PAGE_COUNT)
    assert clear is False
    assert "too_few_level1_entries" in reason
# ─── Shared fakes (DB / PyMuPDF) ──────────────────────────────────────────────
class _FakeDoc:
    """Minimal stand-in with only the Document fields that presegment reads."""
    def __init__(self, doc_id=1):
        self.id = doc_id
        self.file_path = "PKM/bundle.pdf"
        self.file_hash = "deadbeef"
        self.file_format = "pdf"
        self.file_size = 123
        self.file_type = "document"
        self.import_source = "upload"
        self.original_filename = "bundle.pdf"
        self.source_channel = None
        self.category = None
        self.data_origin = None
        self.doc_purpose = None
        self.material_type = None
        self.jurisdiction = None
        self.title = "번들"
        self.presegment_role = None
        self.bundle_page_start = None
        self.bundle_page_end = None
        self.extracted_at = None
        self.extracted_text = None


class _ScalarResult:
    def __init__(self, rows):
        self._rows = rows

    def scalars(self):
        return self

    def all(self):
        return list(self._rows)


class _FakeSession:
    """Implements only the AsyncSession surface used by _create_children / process.
    execute() = lookup of existing child lineage → empty result (first split). add/flush assign child.id.
    get() = document_id → the pre-registered doc, child_id → the created child.
    """
    def __init__(self, doc):
        self._docs = {doc.id: doc}
        self.added = []
        self.commits = 0
        self.enqueued = []  # filled by the enqueue_stage monkeypatch
        self._next_id = 1000

    async def get(self, _model, oid):
        return self._docs.get(oid)

    async def execute(self, _stmt):
        # Existing-children lookup in _create_children → always empty (first split). enqueue_stage is monkeypatched.
        return _ScalarResult([])

    def add(self, obj):
        self.added.append(obj)
        # Assign an id to the child Document (stands in for flush; setattr works on both _FakeDoc and a real Document)
        if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"):
            self._next_id += 1
            obj.id = self._next_id
            self._docs[obj.id] = obj

    async def flush(self):
        for obj in self.added:
            if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"):
                self._next_id += 1
                obj.id = self._next_id
                self._docs[obj.id] = obj

    async def commit(self):
        self.commits += 1
def _install_fake_fitz(monkeypatch, *, page_count=PAGE_COUNT, toc=None, first_lines=None):
    """Inject a fake into sys.modules['fitz'] so that the worker's `import fitz` picks it up."""
    toc = toc or []

    class _FakePage:
        def __init__(self, idx):
            self._idx = idx

        def get_text(self):
            if first_lines and self._idx < len(first_lines):
                return first_lines[self._idx]
            return f"page {self._idx + 1} body text"

    class _FakePdf:
        def __init__(self):
            self.page_count = page_count

        def get_toc(self, simple=True):
            return list(toc)

        def __getitem__(self, idx):
            return _FakePage(idx)

        def __enter__(self):
            return self

        def __exit__(self, *exc):
            return False

    fake = types.ModuleType("fitz")
    fake.open = lambda *_a, **_k: _FakePdf()
    monkeypatch.setitem(sys.modules, "fitz", fake)
    return fake
class _SpyClient:
    """AIClient stand-in: counts call_deep invocations and returns the configured response."""
    calls = 0
    _deep_calls = 0  # defined on the class so the spy also works without the autouse reset fixture
    response = GOOD_LLM_JSON

    def __init__(self):
        type(self).calls += 1  # instantiation itself is not a cost; LLM calls are counted in call_deep

    async def call_deep(self, prompt, system=None):
        type(self)._deep_calls += 1
        return type(self).response

    async def close(self):
        pass


@pytest.fixture(autouse=True)
def _reset_spy():
    _SpyClient.calls = 0
    _SpyClient._deep_calls = 0
    _SpyClient.response = GOOD_LLM_JSON
    yield
# ─── (b) _llm_boundary_fallback accept / reject (mocked LLM) ──────────────────────
@pytest.mark.asyncio
async def test_fallback_accepts_good_and_creates_children(monkeypatch):
    """Well-formed LLM response → passes the gate → _create_children makes 3 children + parent marker."""
    _install_fake_fitz(monkeypatch)
    monkeypatch.setattr(pw, "AIClient", _SpyClient)
    # enqueue_stage depends on the DB: replaced with a no-op that only records its arguments
    enq = []

    async def _fake_enqueue(session, doc_id, stage, **kw):
        enq.append((doc_id, stage))
        return True

    monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
    doc = _FakeDoc()
    session = _FakeSession(doc)
    ok = await pw._llm_boundary_fallback(doc, Path("/tmp/bundle.pdf"), PAGE_COUNT, session)
    assert ok is True
    assert _SpyClient._deep_calls == 1
    # 3 children created + parent marker + 3 lineage rows + commit
    children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"]
    assert len(children) == 3
    assert doc.presegment_role == "parent"
    assert sum(1 for o in session.added if o.__class__.__name__ == "DocumentLineage") == 3
    assert {s for (_id, s) in enq} == {"extract"}


@pytest.mark.asyncio
async def test_fallback_rejects_bad_segments(monkeypatch):
    """LLM returns overlapping boundaries → gate rejects → False + 0 children (single document)."""
    _install_fake_fitz(monkeypatch)
    bad = json.dumps({
        "is_bundle": True,
        "segments": [
            {"start_page": 1, "end_page": 40},
            {"start_page": 40, "end_page": 85},  # overlap
            {"start_page": 86, "end_page": 120},
        ],
    })
    _SpyClient.response = bad
    monkeypatch.setattr(pw, "AIClient", _SpyClient)

    async def _fake_enqueue(*a, **k):
        return True

    monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
    doc = _FakeDoc()
    session = _FakeSession(doc)
    ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session)
    assert ok is False
    assert _SpyClient._deep_calls == 1
    assert [o for o in session.added if getattr(o, "presegment_role", None) == "child"] == []
    assert doc.presegment_role is None


@pytest.mark.asyncio
async def test_fallback_rejects_is_bundle_false(monkeypatch):
    """is_bundle=false → the call was made but nothing is split (False, 0 children)."""
    _install_fake_fitz(monkeypatch)
    _SpyClient.response = json.dumps({"is_bundle": False, "segments": []})
    monkeypatch.setattr(pw, "AIClient", _SpyClient)

    async def _fake_enqueue(*a, **k):
        return True

    monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
    doc = _FakeDoc()
    session = _FakeSession(doc)
    ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session)
    assert ok is False
    assert _SpyClient._deep_calls == 1
    assert doc.presegment_role is None
# ─── (c) flag gating: OFF = 0 calls (deployed default unchanged), ON = called ───────────
@pytest.mark.asyncio
async def test_flag_off_never_calls_llm(monkeypatch):
    """PRESEGMENT_LLM_FALLBACK=False (default) → even a large ToC-less PDF never calls the LLM = same as today."""
    monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", False)
    _install_fake_fitz(monkeypatch, page_count=120, toc=[])  # large + no level-1 ToC = ambiguous
    monkeypatch.setattr(pw, "AIClient", _SpyClient)
    monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf"))

    async def _fake_enqueue(*a, **k):
        return True

    monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
    doc = _FakeDoc()
    session = _FakeSession(doc)
    await pw.process(doc.id, session)
    assert _SpyClient._deep_calls == 0  # ★ the LLM is never called
    assert doc.presegment_role is None  # single document (no split)
    assert session.commits == 0


@pytest.mark.asyncio
async def test_flag_on_calls_llm_and_splits(monkeypatch):
    """Positive control: with the flag ON, the same input calls the LLM and splits when the gate passes."""
    monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", True)
    _install_fake_fitz(monkeypatch, page_count=120, toc=[])
    _SpyClient.response = GOOD_LLM_JSON
    monkeypatch.setattr(pw, "AIClient", _SpyClient)
    monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf"))

    async def _fake_enqueue(*a, **k):
        return True

    monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
    doc = _FakeDoc()
    session = _FakeSession(doc)
    await pw.process(doc.id, session)
    assert _SpyClient._deep_calls == 1  # the LLM was called
    assert doc.presegment_role == "parent"  # split performed
    children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"]
    assert len(children) == 3
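The tests above rely on the house `parse_json_response` tolerating a `<think>` preamble and a json code fence around the payload. Its implementation is not part of this diff, so the following is only a sketch of how such tolerant parsing could work; `extract_json` is a hypothetical stand-in, and the real helper may differ in both behavior and return type.

```python
import json
import re

# Three backticks, built programmatically so this example can itself sit inside a fence.
_TICKS = "`" * 3
_THINK = re.compile(r"<think>.*?</think>", re.DOTALL)
_FENCE = re.compile(_TICKS + r"(?:json)?\s*(.*?)\s*" + _TICKS, re.DOTALL)


def extract_json(raw: str):
    """Strip a <think> block and an optional code fence, then parse JSON.

    Returns the parsed value, or None when the remainder is not valid JSON.
    """
    text = _THINK.sub("", raw).strip()
    m = _FENCE.search(text)
    if m:
        text = m.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None
```

Returning None instead of raising keeps the caller's rejection path simple: a falsy result maps straight to the parse_failed branch and the document stays unsplit.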