Compare commits

..

8 Commits

Author SHA1 Message Date
hyungi 33ee81bf1d feat(presegment): G2 PR-3 — LLM 경계 폴백 (flag-gated, 기본 OFF, scaffold-first)
ToC 없는/게이트 미달 대형 PDF(>=60p)에 한해 off-card Qwen(맥북, call_deep_or_defer,
StageDeferred-safe) 경계 제안 → 동일 검증게이트(_is_clear_bundle) 통과 시에만 deterministic 과
공유하는 _create_children 로 분할. is_bundle=false/파싱·검증 실패=단일문서(오늘과 동일)+로깅.
- env PRESEGMENT_LLM_FALLBACK 기본 false → 배포 동작 무변(LLM 미호출, 검증=unit test)
- 자식생성 _create_children 공유 헬퍼로 리팩터(deterministic+LLM 단일 경로, 동작 동일)
- SegmentationOutput Pydantic + parse_json_response(house 패턴) + per-page heading 샘플(본문 미전송)
- prompt app/prompts/presegment_boundaries.txt + tests/test_presegment_llm.py(14, fitz/DB/LLM mock)
no direct HTTP·no silent fallback. 활성=flag ON + 실 router fixture 검증 후.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:53:28 +09:00
hyungi e011bdb741 Merge pull request 'Feat/presegment' (#48) from feat/presegment into main
Reviewed-on: #48
2026-06-18 17:36:32 +09:00
hyungi 051ecfda7d Merge pull request 'Feat/mineru extraction' (#47) from feat/mineru-extraction into main
Reviewed-on: #47
2026-06-18 17:36:23 +09:00
hyungi 2eda8d3bdd feat(presegment): G2 인제스트 재활성 — 후보 A e2e 검증 PASS
합성 번들 e2e PASS(자식 3개 합성 file_path·range, uq 위반 0 + 자식 extract range-clamp 1110자
range_ok) 후 인제스트 presegment 재활성(documents.py upload + file_watcher 3곳). 非PDF/단일=통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:22:01 +09:00
hyungi 8930803a11 feat(presegment): G2 후보 A — 자식 합성 file_path + bundle_source_path 실파일 해석
uq_documents_file_path 충돌 해소: 자식 file_path = unique 합성값 '{부모}#p{s}-{e}'
(UNIQUE 통과), 실파일은 bundle_source_path() 로 부모경로 복원(접미사 strip, 결정적).
- presegment_worker: bundle_source_path() 헬퍼 + 자식 합성 file_path
- extract_worker 자식분기: bundle_source_path + NFC/NFD resolve 로 실파일 range 추출
- marker_worker: container_path = bundle_source_path(file_path) (일반 doc 무변)
인제스트는 아직 extract(검증 후 재활성). 일반 doc = bundle_source_path no-op = 무회귀.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:19:17 +09:00
hyungi 860c5c6b0c fix(presegment): G2 인제스트 비활성 — Option A vs uq_documents_file_path 충돌
★실번들 검증서 발견: 자식 Document(부모 file_path 공유, Option A)가 uq_documents_file_path
UNIQUE 제약 위반 → 자식 INSERT 실패. 검증된 G1 파이프라인 보호 위해 인제스트를 직접 extract 로
원복(documents.py/file_watcher 4곳). 스키마(362~364)+presegment_worker 코드는 보존(재설계 후 재활성).
재설계 후보: 자식 file_path=unique 합성값+부모 lineage 에서 실파일 해석 / file_path NULL+bundle_source_path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:07:38 +09:00
hyungi c3d5c33813 feat(presegment): G2 PR-2 — presegment 워커 + 큐 배선 + range-clamp (deterministic ToC)
extract 前 presegment 스테이지: 전 문서 진입, 非PDF/단일은 무변 통과, '명확한 번들' PDF만
ToC(level-1) deterministic 분할. LLM 폴백은 PR-3.
- presegment_worker: 보수적 게이트(pages>=60·자식>=5p·연속/단조/전범위·2<=N<=50) + 멱등
  (lineage segmented_from 존재 시 수렴) + 자식=부모파일 공유(Option A)+range
- queue_consumer: BATCH_SIZE/MAIN_QUEUE_STAGES/_load_workers + presegment->extract 전이,
  parent(번들원본)는 억제(자식이 직접 extract enqueue)
- ingest(documents.py upload·file_watcher): 첫 stage extract->presegment
- extract_worker/marker_worker: bundle_page_start/end 시 해당 범위만 추출/변환
  (NULL=일반문서 byte-identical 무회귀 — 검수 확인)

코드 검수 완료(무회귀·full_path 스코프·NOT NULL 커버·py_compile). **미배포** —
실제 번들 PDF 처리 검증 후 배포(PR-3 LLM 폴백과 함께).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 16:55:27 +09:00
hyungi d75fb7adaa feat(presegment): G2 PR-1 스키마 — documents 분할 컬럼 + lineage segmented_from + presegment 스테이지
G2 pre-segmentation 기반 스키마(추가형, 미사용까지 무동작). 권장 기본값 채택:
- 362: documents.bundle_page_start/end(1-based)+presegment_role(NULL/parent/child)
- 363: document_lineage CHECK 에 'segmented_from' 추가(부모→자식 관계, RESTRICT-delete 재사용)
- 364: process_stage enum 에 'presegment'(extract 前 번들 분할 스테이지)
- ORM: Document 3컬럼 + queue enum literal + 신규 DocumentLineage 모델

배포 DB(PG16.13, schema_migrations=361) 대비 txn-rollback 실측 PASS(362/363/364 전부).
PR-2(presegment_worker+큐 배선+extract/marker range-clamp)·PR-3(LLM 경계 폴백) 후속.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 16:43:38 +09:00
14 changed files with 1223 additions and 27 deletions
+4 -2
View File
@@ -1166,8 +1166,10 @@ async def upload_document(
doc.duplicate_of = canonical.id
canonical.duplicate_count = (canonical.duplicate_count or 0) + 1
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리
await enqueue_stage(session, doc.id, "extract")
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리.
# G2: 첫 stage=presegment (extract 前 번들 PDF 분할, 후보 A 검증완료 2026-06-18).
# 非PDF/단일은 presegment 가 무변 통과 → extract. 번들 PDF 만 N 자식 분할(worker-side gating).
await enqueue_stage(session, doc.id, "presegment")
await session.commit()
except Exception:
# DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback.
+8
View File
@@ -41,6 +41,14 @@ class Document(Base):
Integer, nullable=False, default=0, server_default="0"
)
# G2 pre-segmentation (migration 362): 번들 PDF → N 자식 분할.
# presegment_role: NULL=일반 단일문서 / 'parent'=번들원본(자체 extract/embed 안 함) /
# 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 1-based inclusive 범위).
# 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from').
bundle_page_start: Mapped[int | None] = mapped_column(Integer)
bundle_page_end: Mapped[int | None] = mapped_column(Integer)
presegment_role: Mapped[str | None] = mapped_column(Text)
# 2계층: 텍스트 추출
extracted_text: Mapped[str | None] = mapped_column(Text)
extracted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+31
View File
@@ -0,0 +1,31 @@
"""document_lineage 테이블 ORM — 문서 파생 관계 이력 (migration 217).
G2 pre-segmentation relation_type='segmented_from'(번들 자식) 으로 사용 (migration 363).
이력 테이블 FK = ON DELETE RESTRICT (부모 hard delete 차단, soft delete 허용).
"""
from datetime import datetime
from sqlalchemy import BigInteger, ForeignKey, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.types import TIMESTAMP
from core.database import Base
class DocumentLineage(Base):
__tablename__ = "document_lineage"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
source_document_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
)
derived_document_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
)
relation_type: Mapped[str] = mapped_column(Text, nullable=False)
# 'metadata' 는 SQLAlchemy 예약속성 → Python 속성명은 meta, DB 컬럼명은 metadata.
meta: Mapped[dict] = mapped_column(
"metadata", JSONB, nullable=False, default=dict, server_default="{}"
)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
+2 -1
View File
@@ -46,9 +46,10 @@ class ProcessingQueue(Base):
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# 'presegment' (G2): migration 364 — extract 前 번들 PDF → N 자식 분할.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview",
"presegment", "extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage",
create_type=False,
+41
View File
@@ -0,0 +1,41 @@
You are a document-boundary detector. Output ONLY JSON {is_bundle, segments:[{start_page,end_page,title}]}.
You are given a single PDF that may be a "bundle" — several independent logical documents
concatenated into one file (for example: multiple laws, multiple reports, or multiple papers
scanned together). Your job is to decide whether it is a bundle and, if so, where each logical
document starts and ends.
You receive only a compact sample per page: the page number and the first line / heading of that
page (text may be truncated). Use these heading/first-line signals to detect where a new logical
document begins (a new title page, a new cover, a clearly new document title, a restart of
numbering, etc.). You do NOT receive the full text.
Output rules:
- Respond with STRICT JSON only. No prose, no markdown, no code fence.
- Schema:
{
"is_bundle": true | false,
"segments": [
{"start_page": <int>, "end_page": <int>, "title": "<string or null>"}
]
}
- Page numbers are 1-based and INCLUSIVE. start_page=1 is the first page; end_page equals the last
page of that segment.
- Segments MUST fully cover every page with NO gaps and NO overlaps:
- the first segment MUST start at page 1,
- each next segment MUST start exactly one page after the previous segment's end_page,
- the last segment MUST end at the final page (page_count).
- Order segments by start_page ascending.
- title = a short title for that logical document if you can infer one from its first page,
otherwise null.
If the file is NOT a bundle (it is a single logical document), respond:
{"is_bundle": false, "segments": []}
Be conservative: only report is_bundle=true when the heading signals clearly indicate separate
logical documents. When unsure, return is_bundle=false.
page_count: {page_count}
Per-page samples (one per line, "p{n}: {first line}"):
{page_samples}
+74 -7
View File
@@ -67,21 +67,45 @@ def _postprocess_ocr(text: str) -> str:
return text.strip()
def _extract_pdf_pymupdf(file_path: Path) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리"""
def _extract_pdf_pymupdf(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리.
G2 (PR-G2-2): start_page/end_page(1-based inclusive) 주어지면 범위만 추출
(번들 자식 doc = 부모 파일 공유 + 자기 page 범위). None = 전체(기존 동작 동일).
"""
import fitz
text_parts = []
with fitz.open(str(file_path)) as doc:
for page in doc:
text_parts.append(page.get_text())
if start_page is None and end_page is None:
for page in doc:
text_parts.append(page.get_text())
else:
# 1-based inclusive → 0-based range. 범위는 [0, page_count] 로 클램프(방어).
total = doc.page_count
lo = max(1, start_page or 1) - 1
hi = min(total, end_page or total) # inclusive 끝 (0-based 마지막 인덱스 = hi-1)
for i in range(lo, hi):
text_parts.append(doc.load_page(i).get_text())
return "\n".join(text_parts)
def _get_pdf_page_count(file_path: Path) -> int:
"""PDF 페이지 수 확인"""
def _get_pdf_page_count(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> int:
"""PDF 페이지 수 확인. G2: 범위가 주어지면 그 범위의 페이지 수(자식 doc 밀도 계산용).
None = 전체 페이지 (기존 동작 동일).
"""
import fitz
with fitz.open(str(file_path)) as doc:
return len(doc)
total = len(doc)
if start_page is None and end_page is None:
return total
lo = max(1, start_page or 1)
hi = min(total, end_page or total)
return max(0, hi - lo + 1)
async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None:
@@ -310,6 +334,49 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc.extracted_at = datetime.now(timezone.utc)
return
# ─── G2 (PR-G2-2): 번들 자식 PDF — 부모 파일 공유 + 자기 page 범위만 추출 ───
# kordoc 서비스는 page-range 파라미터가 없어 전체 파일을 파싱한다(자식엔 부적합) → kordoc
# 우회, PyMuPDF 로 [bundle_page_start, bundle_page_end] 범위만 추출. range OCR 은 본 PR 범위
# 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도
# 그대로 보존하고 사유를 남긴다.
if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None:
# 후보 A: 자식 file_path 는 합성값(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path 로 부모경로
# 복원 + NFC/NFD resolve. (자식 file_path 는 디스크에 없음.)
from workers.presegment_worker import _resolve_path as _resolve_bundle_path
from workers.presegment_worker import bundle_source_path
real_rel = bundle_source_path(doc.file_path)
src = _resolve_bundle_path(str(Path(settings.nas_mount_path) / real_rel))
if src is None:
raise FileNotFoundError(f"번들 원본 파일 없음: {real_rel}")
start, end = doc.bundle_page_start, doc.bundle_page_end
try:
pymupdf_text = _extract_pdf_pymupdf(src, start, end)
page_count = _get_pdf_page_count(src, start, end)
except Exception as e:
logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}")
raise
meta = doc.extract_meta or {}
meta["presegment_child_range"] = {"start_page": start, "end_page": end}
meta["pymupdf_chars"] = len(pymupdf_text.strip())
should, reason = _should_ocr(pymupdf_text, page_count)
if should:
# range OCR 미지원(후속 PR) — PyMuPDF 결과 유지 + 사유 기록(silent skip 아님).
meta["ocr_skip_reason"] = "presegment_child_range_ocr_unsupported"
meta["ocr_reason"] = reason
logger.warning(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} "
f"OCR 필요({reason})하나 range OCR 미지원 → PyMuPDF 결과 유지"
)
doc.extracted_text = pymupdf_text.replace("\x00", "")
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = PYMUPDF_VERSION if pymupdf_text.strip() else None
doc.extract_meta = meta
logger.info(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} ({len(pymupdf_text)}자)"
)
return
# ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ───
if fmt in KORDOC_FORMATS:
container_path = f"/documents/{doc.file_path}"
+6 -3
View File
@@ -118,16 +118,18 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None,
if expected_category == "library":
# 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락.
# frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만.
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
if ext in LIBRARY_DOC_EXTS:
return ("library", False, "extract")
return ("library", False, "presegment")
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None) # audio/video 잘못 들어오면 skip
return (None, False, None) # 기타 알 수 없는 확장자 skip
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None)
return (None, False, "extract")
return (None, False, "presegment")
# ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ──────────────────────────────────
@@ -226,7 +228,8 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
# G2: 첫 stage=presegment (후보 A 검증완료). HTML(非PDF)은 presegment 가 무변 통과 → extract.
await enqueue_stage(session, doc.id, "presegment")
return (1, 0)
+49 -10
View File
@@ -185,7 +185,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
await _fail(session, document_id, "no file_path")
return
container_path = _to_marker_path(doc.file_path)
# 후보 A: 자식(bundle cols)은 합성 file_path(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path
# 로 부모경로 복원. 일반 doc 은 그대로(접미사 없음). marker/mineru 는 실파일 + page 범위로 변환.
from workers.presegment_worker import bundle_source_path
container_path = _to_marker_path(bundle_source_path(doc.file_path))
suffix = Path(container_path).suffix.lower()
# ---- (3) office/hwp → md (C-2): PDF 외 지원 포맷은 office_md 하이브리드 변환 ----
@@ -207,7 +210,21 @@ async def process(document_id: int, session: AsyncSession) -> None:
return
# ---- (4) page_count gauge + 분기 (LargeDoc split) ----
page_count = _get_page_count(container_path)
# G2 (PR-G2-2): 번들 자식 doc 은 부모 파일 공유 + 자기 page 범위([bundle_page_start, end],
# 1-based inclusive)만 변환해야 한다. page_offset = 절대 시작페이지(부모 파일 기준), page_count =
# 자식 범위의 페이지 수. cols 가 NULL(일반 doc)이면 page_offset=1 + 전체 page_count = 기존 동작 동일.
file_page_count = _get_page_count(container_path)
is_child = doc.bundle_page_start is not None and doc.bundle_page_end is not None
if is_child:
page_offset = doc.bundle_page_start
if file_page_count is not None:
child_end = min(doc.bundle_page_end, file_page_count)
page_count = max(0, child_end - doc.bundle_page_start + 1)
else:
page_count = doc.bundle_page_end - doc.bundle_page_start + 1
else:
page_offset = 1
page_count = file_page_count
# >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님.
if page_count is not None and page_count > MAX_SPLIT_PAGES:
@@ -226,20 +243,35 @@ async def process(document_id: int, session: AsyncSession) -> None:
# ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ----
if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES:
await _process_split(doc, document_id, container_path, page_count, session)
await _process_split(doc, document_id, container_path, page_count, session, page_offset)
else:
await _process_single(doc, document_id, container_path, session)
await _process_single(doc, document_id, container_path, session, page_count, page_offset)
async def _process_single(
doc: Document, document_id: int, container_path: str, session: AsyncSession
doc: Document, document_id: int, container_path: str, session: AsyncSession,
page_count: int | None = None, page_offset: int = 1,
) -> None:
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로)."""
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).
G2 (PR-G2-2): 번들 자식(page_offset>1) [page_offset, page_offset+page_count-1] 범위만
변환하도록 marker start_page/end_page 명시한다. 일반 doc(page_offset=1) 기존과
동일하게 max_pages 보낸다(payload byte-identical).
"""
# 일반 doc = 기존 payload 유지. 자식만 절대 page 범위를 명시(부모 파일 기준 1-based inclusive).
if page_offset > 1 and page_count is not None:
req_json = {
"file_path": container_path,
"start_page": page_offset,
"end_page": page_offset + page_count - 1,
}
else:
req_json = {"file_path": container_path, "max_pages": MAX_PAGES}
try:
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
resp = await client.post(
MARKER_ENDPOINT,
json={"file_path": container_path, "max_pages": MAX_PAGES},
json=req_json,
)
resp.raise_for_status()
data = resp.json()
@@ -513,6 +545,7 @@ async def _process_split(
container_path: str,
page_count: int,
session: AsyncSession,
page_offset: int = 1,
) -> None:
"""대형 PDF page-range 분할 변환.
@@ -523,6 +556,10 @@ async def _process_split(
invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...).
marker slug(`_page_0_*`) batch 마다 재시작 batch rewrite stitch (충돌 회피).
G2 (PR-G2-2): page_offset = 부모 파일 기준 절대 시작페이지(번들 자식). marker 보내는
page 절대값(page_offset 가산), manifest/기록은 자식 상대값(1-based) 유지 일반 doc
(page_offset=1) abs==rel 이라 기존 동작과 동일.
"""
n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES
succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md}
@@ -534,15 +571,17 @@ async def _process_split(
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
for b in range(n_batches):
start_page = b * BATCH_PAGES + 1
start_page = b * BATCH_PAGES + 1 # 자식 상대 1-based (manifest/기록용)
end_page = min((b + 1) * BATCH_PAGES, page_count)
abs_start = start_page + (page_offset - 1) # 부모 파일 절대 page (marker 요청용)
abs_end = end_page + (page_offset - 1)
try:
resp = await client.post(
MARKER_ENDPOINT,
json={
"file_path": container_path,
"start_page": start_page,
"end_page": end_page,
"start_page": abs_start,
"end_page": abs_end,
},
)
resp.raise_for_status()
+562
View File
@@ -0,0 +1,562 @@
"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2).
문서가 presegment stage 진입한다(worker-side gating):
- 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit enqueue_next_stage extract 흘림.
- PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 자식 분할, 나머지는 단일문서로 extract.
deterministic 경로(PR-G2-2): 판정이 애매하면 보수적으로 분할하지 않고 단일문서로 둔다
(bias to NOT splitting). 분할 = '확실한 번들' :
- page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES
AND 단조 증가·비중첩 AND [1, page_count] 범위 커버 AND 2 <= N <= MAX_CHILDREN.
LLM 경계 폴백(PR-G2-3, env PRESEGMENT_LLM_FALLBACK, 기본 OFF scaffold-first): deterministic
'명확한 번들' 만든 대형 PDF(ToC 없음/level-1 없음/게이트 미달) 한해, OFF 오늘과
동일(단일문서)이고 ON 이면 off-card Qwen(맥북, 라우터 :8890, model=qwen-macbook)에게 경계를
제안받는다. compact per-page heading 샘플만 전송(본문 미전송). LLM 출력은 **동일 검증 게이트
(_is_clear_bundle)** 통과 시에만 deterministic 같은 _create_children 경로로 분할
is_bundle=false / 파싱·검증 실패 = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
맥북 불가(503/연결/절단) StageDeferred 재시도(백오프, no silent fallback).
분할 후보 A(물리분할 없음, uq_documents_file_path 해소): 자식 file_path = unique 합성값
`{부모경로}#p{start}-{end}` (UNIQUE 제약 통과), 실파일은 `bundle_source_path()` 로 부모 경로 복원.
자식은 bundle_page_start/end(1-based inclusive) 부모 파일의 자기 page 범위만 가리킨다.
부모-자식 관계 정본 = document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')
파일 홀더라 자체 extract/embed enqueue_next_stage presegmentextract 전이가 'parent'
억제된다(queue_consumer 참조). 자식의 extract 워커가 직접 enqueue. extract_worker/marker_worker
자식 처리 bundle_source_path() 실파일 접근.
멱등: 재실행 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고
수렴( 자식이 extract 활성/완료 상태인지만 보장)한다.
해결 이력 (2026-06-18): 최초 Option A(자식이 부모 file_path 그대로 공유) uq_documents_file_path
UNIQUE 위반(실번들 검증서 발견) 합성 file_path(후보 A) 해소. 인제스트 재활성 = 합성번들 재검증 PASS .
plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation)
"""
import hashlib
import os
import re
import unicodedata
from pathlib import Path
from pydantic import BaseModel, ValidationError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, call_deep_or_defer, parse_json_response
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.document_lineage import DocumentLineage
from models.queue import enqueue_stage
logger = setup_logger("presegment_worker")
# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ───
# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호.
MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60"))
# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지).
MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5"))
# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부.
MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50"))
# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix.
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
# ─── PR-G2-3 LLM 경계 폴백 (scaffold-first, 기본 OFF) ───
# PRESEGMENT_LLM_FALLBACK: 기본 "false". OFF 면 deterministic 경로만(=오늘과 동일 — 애매하면
# 단일문서). ON 이면 deterministic 이 '명확한 번들' 을 못 만든 대형 PDF(page_count >=
# MIN_BUNDLE_PAGES) 에 한해 off-card Qwen(맥북, 라우터 :8890 경유)에게 경계를 제안받아
# **동일 검증 게이트(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 자식 생성 경로로 분할.
# 검증 실패/파싱 실패/is_bundle=false = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
PRESEGMENT_LLM_FALLBACK = os.getenv("PRESEGMENT_LLM_FALLBACK", "false").lower() in (
"1", "true", "yes", "on",
)
# LLM 에 보내는 per-page 샘플의 page 당 char 상한 (heading/첫줄만 — 본문 미전송).
PRESEGMENT_LLM_PAGE_CHARS = int(os.getenv("PRESEGMENT_LLM_PAGE_CHARS", "80"))
# 전체 page-sample 블록의 char 상한 (수 KB 가드 — 초과 시 잘라냄, 본문 누출/페이로드 폭발 방지).
PRESEGMENT_LLM_SAMPLE_CHARS = int(os.getenv("PRESEGMENT_LLM_SAMPLE_CHARS", "12000"))
# 경계 폴백 프롬프트 (app/prompts/presegment_boundaries.txt). system 지시 + 1-based inclusive·
# 전범위 커버·무중첩 규칙. {page_count}/{page_samples} 를 str.replace 로 주입.
_PRESEGMENT_PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "presegment_boundaries.txt"
class Segment(BaseModel):
"""LLM 이 제안하는 1-based inclusive page 범위 한 조각."""
start_page: int
end_page: int
title: str | None = None
class SegmentationOutput(BaseModel):
"""presegment_boundaries 응답 스키마. parse_json_response → model_validate."""
is_bundle: bool = False
segments: list[Segment] = []
confidence: float | None = None
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴."""
candidates = [
file_path,
unicodedata.normalize("NFD", file_path),
unicodedata.normalize("NFC", file_path),
]
for c in candidates:
p = Path(c)
if p.exists():
return p
parent = Path(file_path).parent
if parent.exists():
target = unicodedata.normalize("NFC", Path(file_path).name)
for child in parent.iterdir():
if unicodedata.normalize("NFC", child.name) == target:
return child
return None
def _to_container_path(file_path: str) -> str:
"""file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일)."""
if file_path.startswith("/"):
return file_path
return f"{CONTAINER_PATH_PREFIX}/{file_path}"
# 후보 A: 자식 합성 file_path 패턴 `{부모경로}#p{start}-{end}` (uq_documents_file_path 유일성).
_BUNDLE_SUFFIX_RE = re.compile(r"#p\d+-\d+$")
def bundle_source_path(file_path: str | None) -> str | None:
"""자식 합성 file_path → 부모 실파일 경로 복원. 일반 doc(접미사 없음)은 그대로 반환.
extract_worker/marker_worker 자식 처리 실제 파일 접근에 사용 (자식 file_path
합성값이라 디스크에 없음). 결정적·세션 불필요. lineage 부모-자식 관계의 정본 기록.
"""
if not file_path:
return file_path
return _BUNDLE_SUFFIX_RE.sub("", file_path)
def _is_pdf(doc: Document) -> bool:
"""PDF 판정 — file_format=pdf 또는 .pdf 확장자."""
fmt = (doc.file_format or "").lower()
if fmt == "pdf":
return True
if doc.file_path:
return Path(doc.file_path).suffix.lower() == ".pdf"
return False
def _level1_segments(toc: list, page_count: int) -> list[dict]:
"""get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성.
toc 항목 = [level, title, page] (page 1-based). level==1 채택.
end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count.
동일 page 에서 시작하는 level-1 여럿이면 정렬 인접 항목으로 경계 계산되며,
경우 0-페이지 segment 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다.
"""
starts = []
for entry in toc:
# simple=True 는 [level, title, page]. 방어적으로 길이 체크.
if not entry or len(entry) < 3:
continue
level, title, page = entry[0], entry[1], entry[2]
if level != 1:
continue
# ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨.
starts.append((int(page), (title or "").strip()))
# ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬).
starts.sort(key=lambda x: x[0])
segments: list[dict] = []
for i, (start_page, title) in enumerate(starts):
if i + 1 < len(starts):
end_page = starts[i + 1][0] - 1
else:
end_page = page_count
segments.append({"start_page": start_page, "end_page": end_page, "title": title})
return segments
def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]:
"""deterministic '명확한 번들' 판정. (clear, reason) 반환.
clear=True reason="" / clear=False reason 거부 사유(로깅용).
모든 조건은 보수적 하나라도 어긋나면 단일문서로 처리(분할 ).
"""
n = len(segments)
if n < 2:
return False, f"too_few_level1_entries(n={n})"
if n > MAX_CHILDREN:
return False, f"too_many_children(n={n}>{MAX_CHILDREN})"
# 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버.
if segments[0]["start_page"] != 1:
return False, f"first_start_not_1(start={segments[0]['start_page']})"
if segments[-1]["end_page"] != page_count:
return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})"
prev_end = 0
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
# 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할.
if start != prev_end + 1:
return False, f"non_contiguous(start={start},prev_end={prev_end})"
if end < start:
return False, f"non_monotonic(start={start},end={end})"
if (end - start + 1) < MIN_CHILD_PAGES:
return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})"
prev_end = end
if prev_end != page_count:
return False, f"coverage_gap(covered={prev_end},pc={page_count})"
return True, ""
def _child_title(parent: Document, seg: dict) -> str:
"""자식 제목 = 부모 제목 + '' + (segment 제목 또는 page 범위)."""
base = (parent.title or "").strip() or (parent.original_filename or "") or "문서"
seg_title = (seg.get("title") or "").strip()
suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}"
return f"{base}{suffix}"
def _child_file_hash(parent_hash: str, start: int, end: int) -> str:
"""자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등.
부모 file_hash NULL 수는 없으나(NOT NULL) 방어적으로 문자열 처리.
"""
return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest()
async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None:
"""자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로).
이미 extracted_text 채워졌거나 활성 행이 있으면 enqueue_stage no-op/skip.
"""
child = await session.get(Document, child_id)
if child is None:
return
# 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip).
if child.extracted_at is not None and child.extracted_text is not None:
return
await enqueue_stage(session, child_id, "extract")
async def _create_children(
doc: Document, segments: list[dict], session: AsyncSession
) -> int:
"""검증된 segments 로 자식 N개 생성 + lineage + extract enqueue + 부모 표식 (멱등).
deterministic '명확한 번들' 경로와 LLM 폴백 경로가 공유하는 단일 자식 생성 경로.
호출 segments 반드시 _is_clear_bundle 검증을 통과해야 한다(여기선 재검증 X).
commit 까지 수행. 반환값 = 실제 생성한 자식 (이미 존재해 수렴만 경우 0).
"""
# ─── 멱등 체크: 이미 자식이 있으면 수렴만 (재생성 금지) ───
existing_children = (
await session.execute(
select(DocumentLineage.derived_document_id).where(
DocumentLineage.source_document_id == doc.id,
DocumentLineage.relation_type == "segmented_from",
)
)
).scalars().all()
if existing_children:
# 부모 표식이 누락된 경우 보정(이전 부분실패 복구).
if doc.presegment_role != "parent":
doc.presegment_role = "parent"
for child_id in existing_children:
await _ensure_child_extract(session, child_id)
await session.commit()
logger.info(
f"[presegment] id={doc.id} children already exist "
f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
)
return 0
# ─── 자식 N개 생성 + lineage + extract enqueue ───
created_ids: list[int] = []
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
child = Document(
# 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path
# 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 —
# 자식은 bundle_page_start/end 로 부모 파일을 슬라이스.
file_path=f"{doc.file_path}#p{start}-{end}",
file_hash=_child_file_hash(doc.file_hash, start, end),
file_format=doc.file_format,
file_size=doc.file_size,
file_type=doc.file_type,
import_source=doc.import_source,
original_filename=doc.original_filename,
source_channel=doc.source_channel,
category=doc.category,
data_origin=doc.data_origin,
doc_purpose=doc.doc_purpose,
# 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음).
material_type=doc.material_type,
jurisdiction=doc.jurisdiction,
title=_child_title(doc, seg),
bundle_page_start=start,
bundle_page_end=end,
presegment_role="child",
)
session.add(child)
await session.flush() # child.id 확보
created_ids.append(child.id)
session.add(
DocumentLineage(
source_document_id=doc.id,
derived_document_id=child.id,
relation_type="segmented_from",
meta={"start_page": start, "end_page": end},
)
)
# 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음).
await enqueue_stage(session, child.id, "extract")
# 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제.
doc.presegment_role = "parent"
await session.commit()
logger.info(
f"[presegment] id={doc.id} SPLIT into {len(created_ids)} children "
f"child_ids={created_ids}"
)
return len(created_ids)
def _segments_from_output(out: "SegmentationOutput") -> list[dict]:
"""SegmentationOutput.segments(Pydantic) → _is_clear_bundle / _create_children 가 쓰는 dict 형태."""
return [
{"start_page": s.start_page, "end_page": s.end_page, "title": (s.title or "")}
for s in out.segments
]
def _page_samples(pdf, page_count: int) -> str:
"""LLM 입력용 compact per-page 샘플 — page 당 heading/첫줄만(`p{n}: {firstline}`).
PyMuPDF page.get_text() page 텍스트를 스트리밍하되 page 비공백 줄만,
PRESEGMENT_LLM_PAGE_CHARS 잘라 본문 누출 차단. 전체 블록은 PRESEGMENT_LLM_SAMPLE_CHARS
가드로 상한( KB) 초과 지점에서 중단(앞쪽 페이지 우선 보존).
"""
lines: list[str] = []
total = 0
for i in range(page_count):
try:
text = pdf[i].get_text() or ""
except Exception:
text = ""
first = ""
for ln in text.splitlines():
ln = ln.strip()
if ln:
first = ln
break
first = first[:PRESEGMENT_LLM_PAGE_CHARS]
entry = f"p{i + 1}: {first}"
if total + len(entry) + 1 > PRESEGMENT_LLM_SAMPLE_CHARS:
break
lines.append(entry)
total += len(entry) + 1
return "\n".join(lines)
async def _llm_boundary_fallback(
doc: Document, source: Path, page_count: int, session: AsyncSession
) -> bool:
"""애매 + 대형(ToC-less 등) PDF 에 대해 off-card Qwen 으로 경계 제안 → 검증 → 분할.
반환 True = LLM 경로가 분할을 수행(또는 멱등 수렴)했으므로 호출자는 추가 처리 없이 return.
반환 False = is_bundle=false / 파싱 실패 / 검증 실패 호출자는 단일문서(오늘과 동일) 처리.
맥북 불가(503/연결/절단) call_deep_or_defer StageDeferred raise 재시도(백오프).
silent fallback 금지 deep 슬롯 다른 backend 자동 호출 .
"""
import fitz # PyMuPDF — deterministic 경로와 동일 의존
# per-page 샘플은 파일을 다시 열어 스트리밍(deterministic with 블록과 분리해 그 경로 무회귀).
try:
with fitz.open(str(source)) as pdf:
samples = _page_samples(pdf, page_count)
except Exception as exc:
logger.warning(
f"[presegment] id={doc.id} llm fallback sample 실패 "
f"({type(exc).__name__}: {exc}) → single doc(extract)"
)
return False
try:
template = _PRESEGMENT_PROMPT_PATH.read_text(encoding="utf-8")
except Exception as exc:
logger.warning(
f"[presegment] id={doc.id} prompt 로드 실패 ({type(exc).__name__}: {exc}) "
f"→ single doc(extract)"
)
return False
prompt = template.replace("{page_count}", str(page_count)).replace(
"{page_samples}", samples
)
# off-card 호출 — call_deep_or_defer 가 deep 슬롯(맥북, 라우터 :8890, model=qwen-macbook)
# 으로 라우팅. 맥북 불가는 StageDeferred 로 전파(여기서 잡지 않음 → 큐가 보류/백오프).
# classify_worker 와 동일하게 AIClient() 인스턴스화.
client = AIClient()
try:
raw = await call_deep_or_defer(client, prompt)
finally:
await client.close()
parsed = parse_json_response(raw)
if not parsed:
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason=parse_failed raw={raw[:160]!r} → single doc(extract)"
)
return False
try:
out = SegmentationOutput.model_validate(parsed)
except (ValidationError, ValueError, TypeError) as exc:
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason=schema_invalid({type(exc).__name__}) → single doc(extract)"
)
return False
if not out.is_bundle:
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason=is_bundle_false → single doc(extract)"
)
return False
segments = _segments_from_output(out)
clear, reason = _is_clear_bundle(segments, page_count)
if not clear:
# LLM 출력을 그대로 믿지 않음 — deterministic 과 동일 게이트 미달이면 단일문서.
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason={reason} n={len(segments)} pages={page_count} → single doc(extract)"
)
return False
n = await _create_children(doc, segments, session)
logger.info(
f"[presegment] id={doc.id} LLM-SPLIT accepted "
f"(pages={page_count} n={len(segments)} created={n} "
f"confidence={out.confidence})"
)
return True
async def process(document_id: int, session: AsyncSession) -> None:
"""presegment stage 워커 진입점. queue_consumer 가 호출.
문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) extract 흐른다.
'명확한 번들' PDF 자식 분할 + 부모를 'parent' 표식( 경우 부모는 extract 흐르지 않음).
"""
doc = await session.get(Document, document_id)
if doc is None:
logger.warning(f"[presegment] document {document_id} not found")
return
# ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ───
if not _is_pdf(doc):
logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract")
return
# ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ───
if not doc.file_path:
logger.info(f"[presegment] id={document_id} no file_path → extract")
return
# ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ───
# (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.)
if doc.presegment_role in ("child", "parent"):
logger.info(
f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip"
)
return
# ─── (2) 파일 열기 + page_count ───
raw = str(Path(settings.nas_mount_path) / doc.file_path)
source = _resolve_path(raw)
if source is None:
# 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안.
# presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다.
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
return
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
try:
with fitz.open(str(source)) as pdf:
page_count = pdf.page_count
toc = pdf.get_toc(simple=True) or []
except Exception as exc:
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
logger.warning(
f"[presegment] id={document_id} fitz open/toc failed "
f"({type(exc).__name__}: {exc}) → extract"
)
return
# ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ───
if page_count < MIN_BUNDLE_PAGES:
logger.info(
f"[presegment] id={document_id} single doc "
f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract"
)
return
# ─── (4) level-1 ToC → 자식 후보 segment ───
segments = _level1_segments(toc, page_count)
if not segments:
# 큰 PDF 인데 ToC 없음/level-1 없음 = 애매. flag ON 이면 LLM 경계 폴백(PR-G2-3),
# OFF(기본) 이면 오늘과 동일 — 단일문서로 처리하고 사유를 남긴다.
if PRESEGMENT_LLM_FALLBACK:
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason=no_level1_toc pages={page_count} → LLM fallback"
)
if await _llm_boundary_fallback(doc, source, page_count, session):
return
# LLM 이 분할하지 않음(is_bundle=false / 검증·파싱 실패) — 단일문서.
return
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason=no_level1_toc pages={page_count} → single doc(extract)"
)
return
clear, reason = _is_clear_bundle(segments, page_count)
if not clear:
# 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매. flag ON 이면 LLM 경계 폴백,
# OFF(기본) 이면 오늘과 동일 — 단일문서(분할 안 함).
if PRESEGMENT_LLM_FALLBACK:
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason={reason} pages={page_count} level1={len(segments)} → LLM fallback"
)
if await _llm_boundary_fallback(doc, source, page_count, session):
return
return
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)"
)
return
# ─── (5) 명확한 번들 (deterministic) — 공유 자식 생성 경로 (멱등 수렴 포함) ───
await _create_children(doc, segments, session)
+23 -4
View File
@@ -31,9 +31,9 @@ _hold_logged = False
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
BATCH_SIZE = {"presegment": 3, "extract": 5, "classify": 3, "summarize": 3, "embed": 10,
"chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1,
"markdown": 1, "fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -46,7 +46,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize",
"presegment", "extract", "classify", "summarize",
"preview", "stt", "thumbnail", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
@@ -165,6 +165,10 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
}
next_stages = {
# G2 (PR-G2-2): 전 문서가 presegment → extract. 단, 번들 분할로 'parent' 가 된 문서는
# 파일 홀더라 자체 extract 안 함 — 아래 suppression 으로 이 전이를 건너뛴다(자식 extract 는
# presegment_worker 가 직접 enqueue). 단일/非PDF 문서(role NULL)는 정상적으로 extract 로 흐름.
"presegment": ["extract"],
"extract": ["classify", "preview"],
"classify": ["embed", "chunk", "markdown"],
"stt": ["classify"],
@@ -180,6 +184,18 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
stages = extract_override_by_channel[sc]
else:
stages = next_stages.get(current_stage, [])
elif current_stage == "presegment":
# 번들 분할 parent 는 extract 로 흐르지 않게 억제 (자식이 부모 extract 에 가려지는 것 방지).
# role NULL(단일/非PDF) / 'child' 는 정상 전이. presegment_worker 가 자식 extract 를 직접
# enqueue 하므로 'parent' 만 여기서 no-op.
from models.document import Document
async with async_session() as lookup_session:
doc = await lookup_session.get(Document, document_id)
role = doc.presegment_role if doc else None
if role == "parent":
stages = []
else:
stages = next_stages.get(current_stage, [])
else:
stages = next_stages.get(current_stage, [])
@@ -199,6 +215,7 @@ def _load_workers():
from workers.deep_summary_worker import process as deep_summary_process
from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process
from workers.presegment_worker import process as presegment_process
from workers.preview_worker import process as preview_process
from workers.stt_worker import process as stt_process
from workers.summarize_worker import process as summarize_process
@@ -207,6 +224,8 @@ def _load_workers():
from workers.fulltext_worker import process as fulltext_process
return {
# G2 (PR-G2-2): extract 前 번들 PDF → N 자식 분할 (deterministic ToC). 非PDF/단일은 통과.
"presegment": presegment_process,
"extract": extract_process,
"classify": classify_process,
"summarize": summarize_process,
@@ -0,0 +1,10 @@
-- 362: G2 pre-segmentation — 번들 PDF(여러 논리문서 한 파일) → N 자식 문서 분할.
-- 자식 doc 의 원본 내 page 범위(1-based inclusive) + 분할 역할 표식.
-- 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from', migration 363).
-- presegment_role: NULL=일반 단일문서(대다수) / 'parent'=번들원본(자체 extract/embed 안 함) /
-- 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 범위로 슬라이스).
-- 단일 ALTER(다중 절) = 1 statement (asyncpg 멀티스테이트먼트 제약 준수).
ALTER TABLE documents
ADD COLUMN IF NOT EXISTS bundle_page_start INTEGER,
ADD COLUMN IF NOT EXISTS bundle_page_end INTEGER,
ADD COLUMN IF NOT EXISTS presegment_role TEXT;
@@ -0,0 +1,8 @@
-- 363: G2 — document_lineage.relation_type 에 'segmented_from'(번들 → 자식) 추가.
-- 217 의 column-level CHECK(PG 자동명 document_lineage_relation_type_check, 배포 DB 실측 확인)
-- 를 교체. DROP + ADD 를 단일 ALTER 의 두 절로 = 1 statement.
-- 멱등: DROP ... IF EXISTS 라 재실행 안전(이미 교체됐으면 새 제약 DROP 후 동일 재생성).
ALTER TABLE document_lineage
DROP CONSTRAINT IF EXISTS document_lineage_relation_type_check,
ADD CONSTRAINT document_lineage_relation_type_check
CHECK (relation_type IN ('cited','summarized_from','generated_from','revised_from','segmented_from'));
@@ -0,0 +1,5 @@
-- 364: G2 — process_stage 큐 스테이지 enum 에 'presegment' 추가 (extract 前 번들 분할 단계).
-- PG16: ALTER TYPE ADD VALUE 는 트랜잭션 내 실행 가능(값 추가만, 同 트랜잭션 내 사용은 안 함 —
-- 사용은 후속 마이그/런타임). IF NOT EXISTS = 재실행 멱등.
-- (이 한 줄 단독 파일 — 1 statement.)
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'presegment';
+400
View File
@@ -0,0 +1,400 @@
"""PR-G2-3 — presegment LLM 경계 폴백 단위 테스트.
scaffold-first 안전성 박제:
(a) parse_json_response + SegmentationOutput 대표 fixture(ToC-less 120p 3 segments) 검증
(b) 검증 게이트(_is_clear_bundle) 정상 응답 수락 / 비정상(중첩·gap·tiny child·N>MAX) 거부
(c) flag OFF(기본) LLM 절대 호출 (call_deep count==0), flag ON 호출됨(positive control)
DB·PyMuPDF 불요(unit) AsyncSession 최소 fake, fitz sys.modules 주입 fake.
라이브 LLM 호출 없음(call_deep fixture 반환 monkeypatch). worker-process 레벨 E2E( PDF
번들 분할, 보류 백오프 DB 기록) GPU 라이브 게이트에서 별도 실측.
[[feedback_external_api_fixture_first]] / [[feedback_scaffold_first_for_external_cost_pr]]
"""
from __future__ import annotations
import json
import sys
import types
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "app"))
from ai.client import parse_json_response # noqa: E402
import workers.presegment_worker as pw # noqa: E402
from workers.presegment_worker import ( # noqa: E402
SegmentationOutput,
_is_clear_bundle,
_segments_from_output,
)
# ─── 대표 fixture: ToC-less 120p 번들 → 3 segments (1-based inclusive, 전범위·무중첩) ───
GOOD_LLM_JSON = json.dumps(
{
"is_bundle": True,
"segments": [
{"start_page": 1, "end_page": 40, "title": "문서 A"},
{"start_page": 41, "end_page": 85, "title": "문서 B"},
{"start_page": 86, "end_page": 120, "title": "문서 C"},
],
"confidence": 0.82,
},
ensure_ascii=False,
)
PAGE_COUNT = 120
# ─── (a) parse_json_response + SegmentationOutput 검증 ──────────────────────
def test_parse_and_validate_good_fixture():
parsed = parse_json_response(GOOD_LLM_JSON)
assert parsed is not None
out = SegmentationOutput.model_validate(parsed)
assert out.is_bundle is True
assert len(out.segments) == 3
assert out.segments[0].start_page == 1
assert out.segments[-1].end_page == PAGE_COUNT
assert out.confidence == pytest.approx(0.82)
def test_parse_tolerates_think_and_fence():
"""house parse_json_response 가 <think> + ```json fence 를 벗겨낸다."""
wrapped = f"<think>분석중...</think>\n```json\n{GOOD_LLM_JSON}\n```"
parsed = parse_json_response(wrapped)
out = SegmentationOutput.model_validate(parsed)
assert out.is_bundle is True and len(out.segments) == 3
# ─── (b) 검증 게이트 accept / reject ────────────────────────────────────────
def _segments(*spans):
return [{"start_page": s, "end_page": e, "title": ""} for (s, e) in spans]
def test_gate_accepts_good():
out = SegmentationOutput.model_validate(parse_json_response(GOOD_LLM_JSON))
segs = _segments_from_output(out)
clear, reason = _is_clear_bundle(segs, PAGE_COUNT)
assert clear is True, reason
assert reason == ""
def test_gate_rejects_overlap():
# 41 이어야 할 두번째 start 가 40 으로 중첩
clear, reason = _is_clear_bundle(_segments((1, 40), (40, 85), (86, 120)), PAGE_COUNT)
assert clear is False
assert "non_contiguous" in reason
def test_gate_rejects_gap():
# 40 다음이 42 로 시작 → 41 빈틈 (non_contiguous 로 검출)
clear, reason = _is_clear_bundle(_segments((1, 40), (42, 85), (86, 120)), PAGE_COUNT)
assert clear is False
assert "non_contiguous" in reason
def test_gate_rejects_tiny_child():
# 두번째 자식 41..43 = 3p < MIN_CHILD_PAGES(5)
clear, reason = _is_clear_bundle(_segments((1, 40), (41, 43), (44, 120)), PAGE_COUNT)
assert clear is False
assert "child_too_small" in reason
def test_gate_rejects_coverage_not_full():
# 마지막이 page_count 에 못 미침
clear, reason = _is_clear_bundle(_segments((1, 40), (41, 85), (86, 110)), PAGE_COUNT)
assert clear is False
assert "last_end_not_page_count" in reason
def test_gate_rejects_too_many_children():
# N > MAX_CHILDREN — 각 자식 MIN_CHILD_PAGES 만족시키되 개수만 초과
n = pw.MAX_CHILDREN + 1
pc = n * pw.MIN_CHILD_PAGES
spans = [
(i * pw.MIN_CHILD_PAGES + 1, (i + 1) * pw.MIN_CHILD_PAGES) for i in range(n)
]
clear, reason = _is_clear_bundle(_segments(*spans), pc)
assert clear is False
assert "too_many_children" in reason
def test_gate_rejects_single_segment():
clear, reason = _is_clear_bundle(_segments((1, 120)), PAGE_COUNT)
assert clear is False
assert "too_few_level1_entries" in reason
# ─── 공통 fake (DB / PyMuPDF) ──────────────────────────────────────────────
class _FakeDoc:
"""presegment 가 읽는 Document 필드만 가진 최소 stand-in."""
def __init__(self, doc_id=1):
self.id = doc_id
self.file_path = "PKM/bundle.pdf"
self.file_hash = "deadbeef"
self.file_format = "pdf"
self.file_size = 123
self.file_type = "document"
self.import_source = "upload"
self.original_filename = "bundle.pdf"
self.source_channel = None
self.category = None
self.data_origin = None
self.doc_purpose = None
self.material_type = None
self.jurisdiction = None
self.title = "번들"
self.presegment_role = None
self.bundle_page_start = None
self.bundle_page_end = None
self.extracted_at = None
self.extracted_text = None
class _ScalarResult:
def __init__(self, rows):
self._rows = rows
def scalars(self):
return self
def all(self):
return list(self._rows)
class _FakeSession:
"""_create_children / process 가 쓰는 AsyncSession 표면만 구현.
execute() = 기존 자식 lineage 조회 결과( 분할). add/flush child.id 부여.
get() = document_id 미리 등록한 doc, child_id 생성된 child.
"""
def __init__(self, doc):
self._docs = {doc.id: doc}
self.added = []
self.commits = 0
self.enqueued = [] # enqueue_stage monkeypatch 가 채움
self._next_id = 1000
async def get(self, _model, oid):
return self._docs.get(oid)
async def execute(self, _stmt):
# _create_children 의 기존 자식 조회 → 항상 빈(첫 분할). enqueue_stage 는 monkeypatch.
return _ScalarResult([])
def add(self, obj):
self.added.append(obj)
# child Document 에 id 부여 (flush 대용 — _FakeDoc/실 Document 모두 setattr 가능)
if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"):
self._next_id += 1
obj.id = self._next_id
self._docs[obj.id] = obj
async def flush(self):
for obj in self.added:
if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"):
self._next_id += 1
obj.id = self._next_id
self._docs[obj.id] = obj
async def commit(self):
self.commits += 1
def _install_fake_fitz(monkeypatch, *, page_count=PAGE_COUNT, toc=None, first_lines=None):
"""sys.modules['fitz'] 에 fake 주입 — worker 의 `import fitz` 가 이걸 받게 한다."""
toc = toc or []
class _FakePage:
def __init__(self, idx):
self._idx = idx
def get_text(self):
if first_lines and self._idx < len(first_lines):
return first_lines[self._idx]
return f"page {self._idx + 1} body text"
class _FakePdf:
def __init__(self):
self.page_count = page_count
def get_toc(self, simple=True):
return list(toc)
def __getitem__(self, idx):
return _FakePage(idx)
def __enter__(self):
return self
def __exit__(self, *exc):
return False
fake = types.ModuleType("fitz")
fake.open = lambda *_a, **_k: _FakePdf()
monkeypatch.setitem(sys.modules, "fitz", fake)
return fake
class _SpyClient:
"""AIClient stand-in — call_deep 호출 횟수 카운트 + 지정 응답 반환."""
calls = 0
response = GOOD_LLM_JSON
def __init__(self):
type(self).calls += 1 # 인스턴스화 자체는 비용 아님 — 호출 카운트는 call_deep 기준
async def call_deep(self, prompt, system=None):
type(self)._deep_calls += 1
return type(self).response
async def close(self):
pass
@pytest.fixture(autouse=True)
def _reset_spy():
_SpyClient.calls = 0
_SpyClient._deep_calls = 0
_SpyClient.response = GOOD_LLM_JSON
yield
# ─── (b) _llm_boundary_fallback 수락/거부 (mocked LLM) ──────────────────────
@pytest.mark.asyncio
async def test_fallback_accepts_good_and_creates_children(monkeypatch):
"""정상 LLM 응답 → 게이트 통과 → _create_children 가 3 자식 + parent 표식."""
_install_fake_fitz(monkeypatch)
monkeypatch.setattr(pw, "AIClient", _SpyClient)
# enqueue_stage 는 DB 의존 — no-op 으로 대체 (호출 인자만 기록)
enq = []
async def _fake_enqueue(session, doc_id, stage, **kw):
enq.append((doc_id, stage))
return True
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
doc = _FakeDoc()
session = _FakeSession(doc)
ok = await pw._llm_boundary_fallback(doc, Path("/tmp/bundle.pdf"), PAGE_COUNT, session)
assert ok is True
assert _SpyClient._deep_calls == 1
# 자식 3개 생성 + parent 표식 + lineage 3 + commit
children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"]
assert len(children) == 3
assert doc.presegment_role == "parent"
assert sum(1 for o in session.added if o.__class__.__name__ == "DocumentLineage") == 3
assert {s for (_id, s) in enq} == {"extract"}
@pytest.mark.asyncio
async def test_fallback_rejects_bad_segments(monkeypatch):
"""LLM 이 중첩 경계 반환 → 게이트 거부 → False + 자식 0 (단일문서)."""
_install_fake_fitz(monkeypatch)
bad = json.dumps({
"is_bundle": True,
"segments": [
{"start_page": 1, "end_page": 40},
{"start_page": 40, "end_page": 85}, # 중첩
{"start_page": 86, "end_page": 120},
],
})
_SpyClient.response = bad
monkeypatch.setattr(pw, "AIClient", _SpyClient)
async def _fake_enqueue(*a, **k):
return True
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
doc = _FakeDoc()
session = _FakeSession(doc)
ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session)
assert ok is False
assert _SpyClient._deep_calls == 1
assert [o for o in session.added if getattr(o, "presegment_role", None) == "child"] == []
assert doc.presegment_role is None
@pytest.mark.asyncio
async def test_fallback_rejects_is_bundle_false(monkeypatch):
"""is_bundle=false → 호출은 했으나 분할 안 함(False, 자식 0)."""
_install_fake_fitz(monkeypatch)
_SpyClient.response = json.dumps({"is_bundle": False, "segments": []})
monkeypatch.setattr(pw, "AIClient", _SpyClient)
async def _fake_enqueue(*a, **k):
return True
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
doc = _FakeDoc()
session = _FakeSession(doc)
ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session)
assert ok is False
assert _SpyClient._deep_calls == 1
assert doc.presegment_role is None
# ─── (c) flag gating — OFF=호출 0 (deployed default 무변), ON=호출됨 ───────────
@pytest.mark.asyncio
async def test_flag_off_never_calls_llm(monkeypatch):
"""PRESEGMENT_LLM_FALLBACK=False(기본) → 큰 ToC-less PDF 도 LLM 미호출 = 오늘과 동일."""
monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", False)
_install_fake_fitz(monkeypatch, page_count=120, toc=[]) # 대형 + level-1 ToC 없음 = 애매
monkeypatch.setattr(pw, "AIClient", _SpyClient)
monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf"))
async def _fake_enqueue(*a, **k):
return True
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
doc = _FakeDoc()
session = _FakeSession(doc)
await pw.process(doc.id, session)
assert _SpyClient._deep_calls == 0 # ★ LLM 절대 호출 안 됨
assert doc.presegment_role is None # 단일문서 (분할 안 함)
assert session.commits == 0
@pytest.mark.asyncio
async def test_flag_on_calls_llm_and_splits(monkeypatch):
"""positive control — flag ON 이면 같은 입력에 LLM 호출 + 게이트 통과 시 분할."""
monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", True)
_install_fake_fitz(monkeypatch, page_count=120, toc=[])
_SpyClient.response = GOOD_LLM_JSON
monkeypatch.setattr(pw, "AIClient", _SpyClient)
monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf"))
async def _fake_enqueue(*a, **k):
return True
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
doc = _FakeDoc()
session = _FakeSession(doc)
await pw.process(doc.id, session)
assert _SpyClient._deep_calls == 1 # LLM 호출됨
assert doc.presegment_role == "parent" # 분할 수행
children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"]
assert len(children) == 3