Merge pull request 'Feat/presegment' (#48) from feat/presegment into main

Reviewed-on: #48
This commit was merged in pull request #48.
This commit is contained in:
2026-06-18 17:36:32 +09:00
12 changed files with 582 additions and 27 deletions
+4 -2
View File
@@ -1166,8 +1166,10 @@ async def upload_document(
doc.duplicate_of = canonical.id
canonical.duplicate_count = (canonical.duplicate_count or 0) + 1
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리
await enqueue_stage(session, doc.id, "extract")
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리.
# G2: 첫 stage=presegment (extract 前 번들 PDF 분할, 후보 A 검증완료 2026-06-18).
# 非PDF/단일은 presegment 가 무변 통과 → extract. 번들 PDF 만 N 자식 분할(worker-side gating).
await enqueue_stage(session, doc.id, "presegment")
await session.commit()
except Exception:
# DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback.
+8
View File
@@ -41,6 +41,14 @@ class Document(Base):
Integer, nullable=False, default=0, server_default="0"
)
# G2 pre-segmentation (migration 362): 번들 PDF → N 자식 분할.
# presegment_role: NULL=일반 단일문서 / 'parent'=번들원본(자체 extract/embed 안 함) /
# 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 1-based inclusive 범위).
# 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from').
bundle_page_start: Mapped[int | None] = mapped_column(Integer)
bundle_page_end: Mapped[int | None] = mapped_column(Integer)
presegment_role: Mapped[str | None] = mapped_column(Text)
# 2계층: 텍스트 추출
extracted_text: Mapped[str | None] = mapped_column(Text)
extracted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+31
View File
@@ -0,0 +1,31 @@
"""document_lineage 테이블 ORM — 문서 파생 관계 이력 (migration 217).
G2 pre-segmentation 이 relation_type='segmented_from'(번들 → 자식) 으로 사용 (migration 363).
이력 테이블 FK = ON DELETE RESTRICT (부모 hard delete 차단, soft delete 만 허용).
"""
from datetime import datetime
from sqlalchemy import BigInteger, ForeignKey, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.types import TIMESTAMP
from core.database import Base
class DocumentLineage(Base):
__tablename__ = "document_lineage"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
source_document_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
)
derived_document_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
)
relation_type: Mapped[str] = mapped_column(Text, nullable=False)
# 'metadata' 는 SQLAlchemy 예약속성 → Python 속성명은 meta, DB 컬럼명은 metadata.
meta: Mapped[dict] = mapped_column(
"metadata", JSONB, nullable=False, default=dict, server_default="{}"
)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
+2 -1
View File
@@ -46,9 +46,10 @@ class ProcessingQueue(Base):
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# 'presegment' (G2): migration 364 — extract 前 번들 PDF → N 자식 분할.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview",
"presegment", "extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage",
create_type=False,
+74 -7
View File
@@ -67,21 +67,45 @@ def _postprocess_ocr(text: str) -> str:
return text.strip()
def _extract_pdf_pymupdf(file_path: Path) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리"""
def _extract_pdf_pymupdf(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리.
G2 (PR-G2-2): start_page/end_page(1-based inclusive) 가 주어지면 그 범위만 추출
(번들 자식 doc = 부모 파일 공유 + 자기 page 범위). 둘 다 None = 전체(기존 동작 동일).
"""
import fitz
text_parts = []
with fitz.open(str(file_path)) as doc:
for page in doc:
text_parts.append(page.get_text())
if start_page is None and end_page is None:
for page in doc:
text_parts.append(page.get_text())
else:
# 1-based inclusive → 0-based range. 범위는 [0, page_count] 로 클램프(방어).
total = doc.page_count
lo = max(1, start_page or 1) - 1
hi = min(total, end_page or total) # inclusive 끝 (0-based 마지막 인덱스 = hi-1)
for i in range(lo, hi):
text_parts.append(doc.load_page(i).get_text())
return "\n".join(text_parts)
def _get_pdf_page_count(file_path: Path) -> int:
"""PDF 페이지 수 확인"""
def _get_pdf_page_count(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> int:
"""PDF 페이지 수 확인. G2: 범위가 주어지면 그 범위의 페이지 수(자식 doc 밀도 계산용).
둘 다 None = 전체 페이지 수(기존 동작 동일).
"""
import fitz
with fitz.open(str(file_path)) as doc:
return len(doc)
total = len(doc)
if start_page is None and end_page is None:
return total
lo = max(1, start_page or 1)
hi = min(total, end_page or total)
return max(0, hi - lo + 1)
async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None:
@@ -310,6 +334,49 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc.extracted_at = datetime.now(timezone.utc)
return
# ─── G2 (PR-G2-2): 번들 자식 PDF — 부모 파일 공유 + 자기 page 범위만 추출 ───
# kordoc 서비스는 page-range 파라미터가 없어 전체 파일을 파싱한다(자식엔 부적합) → kordoc
# 우회, PyMuPDF 로 [bundle_page_start, bundle_page_end] 범위만 추출. range OCR 은 본 PR 범위
# 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도
# 그대로 보존하고 사유를 남긴다.
if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None:
# 후보 A: 자식 file_path 는 합성값(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path 로 부모경로
# 복원 + NFC/NFD resolve. (자식 file_path 는 디스크에 없음.)
from workers.presegment_worker import _resolve_path as _resolve_bundle_path
from workers.presegment_worker import bundle_source_path
real_rel = bundle_source_path(doc.file_path)
src = _resolve_bundle_path(str(Path(settings.nas_mount_path) / real_rel))
if src is None:
raise FileNotFoundError(f"번들 원본 파일 없음: {real_rel}")
start, end = doc.bundle_page_start, doc.bundle_page_end
try:
pymupdf_text = _extract_pdf_pymupdf(src, start, end)
page_count = _get_pdf_page_count(src, start, end)
except Exception as e:
logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}")
raise
meta = doc.extract_meta or {}
meta["presegment_child_range"] = {"start_page": start, "end_page": end}
meta["pymupdf_chars"] = len(pymupdf_text.strip())
should, reason = _should_ocr(pymupdf_text, page_count)
if should:
# range OCR 미지원(후속 PR) — PyMuPDF 결과 유지 + 사유 기록(silent skip 아님).
meta["ocr_skip_reason"] = "presegment_child_range_ocr_unsupported"
meta["ocr_reason"] = reason
logger.warning(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} "
f"OCR 필요({reason})하나 range OCR 미지원 → PyMuPDF 결과 유지"
)
doc.extracted_text = pymupdf_text.replace("\x00", "")
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = PYMUPDF_VERSION if pymupdf_text.strip() else None
doc.extract_meta = meta
logger.info(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} ({len(pymupdf_text)}자)"
)
return
# ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ───
if fmt in KORDOC_FORMATS:
container_path = f"/documents/{doc.file_path}"
+6 -3
View File
@@ -118,16 +118,18 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None,
if expected_category == "library":
# 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락.
# frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만.
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
if ext in LIBRARY_DOC_EXTS:
return ("library", False, "extract")
return ("library", False, "presegment")
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None) # audio/video 잘못 들어오면 skip
return (None, False, None) # 기타 알 수 없는 확장자 skip
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None)
return (None, False, "extract")
return (None, False, "presegment")
# ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ──────────────────────────────────
@@ -226,7 +228,8 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
# G2: 첫 stage=presegment (후보 A 검증완료). HTML(非PDF)은 presegment 가 무변 통과 → extract.
await enqueue_stage(session, doc.id, "presegment")
return (1, 0)
+49 -10
View File
@@ -185,7 +185,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
await _fail(session, document_id, "no file_path")
return
container_path = _to_marker_path(doc.file_path)
# 후보 A: 자식(bundle cols)은 합성 file_path(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path
# 로 부모경로 복원. 일반 doc 은 그대로(접미사 없음). marker/mineru 는 실파일 + page 범위로 변환.
from workers.presegment_worker import bundle_source_path
container_path = _to_marker_path(bundle_source_path(doc.file_path))
suffix = Path(container_path).suffix.lower()
# ---- (3) office/hwp → md (C-2): PDF 외 지원 포맷은 office_md 하이브리드 변환 ----
@@ -207,7 +210,21 @@ async def process(document_id: int, session: AsyncSession) -> None:
return
# ---- (4) page_count gauge + 분기 (LargeDoc split) ----
page_count = _get_page_count(container_path)
# G2 (PR-G2-2): 번들 자식 doc 은 부모 파일 공유 + 자기 page 범위([bundle_page_start, end],
# 1-based inclusive)만 변환해야 한다. page_offset = 절대 시작페이지(부모 파일 기준), page_count =
# 자식 범위의 페이지 수. cols 가 NULL(일반 doc)이면 page_offset=1 + 전체 page_count = 기존 동작 동일.
file_page_count = _get_page_count(container_path)
is_child = doc.bundle_page_start is not None and doc.bundle_page_end is not None
if is_child:
page_offset = doc.bundle_page_start
if file_page_count is not None:
child_end = min(doc.bundle_page_end, file_page_count)
page_count = max(0, child_end - doc.bundle_page_start + 1)
else:
page_count = doc.bundle_page_end - doc.bundle_page_start + 1
else:
page_offset = 1
page_count = file_page_count
# >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님.
if page_count is not None and page_count > MAX_SPLIT_PAGES:
@@ -226,20 +243,35 @@ async def process(document_id: int, session: AsyncSession) -> None:
# ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ----
if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES:
await _process_split(doc, document_id, container_path, page_count, session)
await _process_split(doc, document_id, container_path, page_count, session, page_offset)
else:
await _process_single(doc, document_id, container_path, session)
await _process_single(doc, document_id, container_path, session, page_count, page_offset)
async def _process_single(
doc: Document, document_id: int, container_path: str, session: AsyncSession
doc: Document, document_id: int, container_path: str, session: AsyncSession,
page_count: int | None = None, page_offset: int = 1,
) -> None:
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로)."""
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).
G2 (PR-G2-2): 번들 자식(page_offset>1) [page_offset, page_offset+page_count-1] 범위만
변환하도록 marker start_page/end_page 명시한다. 일반 doc(page_offset=1) 기존과
동일하게 max_pages 보낸다(payload byte-identical).
"""
# 일반 doc = 기존 payload 유지. 자식만 절대 page 범위를 명시(부모 파일 기준 1-based inclusive).
if page_offset > 1 and page_count is not None:
req_json = {
"file_path": container_path,
"start_page": page_offset,
"end_page": page_offset + page_count - 1,
}
else:
req_json = {"file_path": container_path, "max_pages": MAX_PAGES}
try:
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
resp = await client.post(
MARKER_ENDPOINT,
json={"file_path": container_path, "max_pages": MAX_PAGES},
json=req_json,
)
resp.raise_for_status()
data = resp.json()
@@ -513,6 +545,7 @@ async def _process_split(
container_path: str,
page_count: int,
session: AsyncSession,
page_offset: int = 1,
) -> None:
"""대형 PDF page-range 분할 변환.
@@ -523,6 +556,10 @@ async def _process_split(
invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...).
marker slug(`_page_0_*`) batch 마다 재시작 batch rewrite stitch (충돌 회피).
G2 (PR-G2-2): page_offset = 부모 파일 기준 절대 시작페이지(번들 자식). marker 보내는
page 절대값(page_offset 가산), manifest/기록은 자식 상대값(1-based) 유지 일반 doc
(page_offset=1) abs==rel 이라 기존 동작과 동일.
"""
n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES
succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md}
@@ -534,15 +571,17 @@ async def _process_split(
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
for b in range(n_batches):
start_page = b * BATCH_PAGES + 1
start_page = b * BATCH_PAGES + 1 # 자식 상대 1-based (manifest/기록용)
end_page = min((b + 1) * BATCH_PAGES, page_count)
abs_start = start_page + (page_offset - 1) # 부모 파일 절대 page (marker 요청용)
abs_end = end_page + (page_offset - 1)
try:
resp = await client.post(
MARKER_ENDPOINT,
json={
"file_path": container_path,
"start_page": start_page,
"end_page": end_page,
"start_page": abs_start,
"end_page": abs_end,
},
)
resp.raise_for_status()
+362
View File
@@ -0,0 +1,362 @@
"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2).
문서가 presegment stage 진입한다(worker-side gating):
- 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit enqueue_next_stage extract 흘림.
- PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 자식 분할, 나머지는 단일문서로 extract.
PR **deterministic ** (LLM fallback = 후속 PR). 판정이 애매하면 보수적으로 분할하지 않고
단일문서로 둔다(bias to NOT splitting). 분할 = '확실한 번들' :
- page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES
AND 단조 증가·비중첩 AND [1, page_count] 범위 커버 AND 2 <= N <= MAX_CHILDREN.
분할 후보 A(물리분할 없음, uq_documents_file_path 해소): 자식 file_path = unique 합성값
`{부모경로}#p{start}-{end}` (UNIQUE 제약 통과), 실파일은 `bundle_source_path()` 로 부모 경로 복원.
자식은 bundle_page_start/end(1-based inclusive) 부모 파일의 자기 page 범위만 가리킨다.
부모-자식 관계 정본 = document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')
파일 홀더라 자체 extract/embed enqueue_next_stage presegmentextract 전이가 'parent'
억제된다(queue_consumer 참조). 자식의 extract 워커가 직접 enqueue. extract_worker/marker_worker
자식 처리 bundle_source_path() 실파일 접근.
멱등: 재실행 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고
수렴( 자식이 extract 활성/완료 상태인지만 보장)한다.
해결 이력 (2026-06-18): 최초 Option A(자식이 부모 file_path 그대로 공유) uq_documents_file_path
UNIQUE 위반(실번들 검증서 발견) 합성 file_path(후보 A) 해소. 인제스트 재활성 = 합성번들 재검증 PASS .
plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation)
"""
import hashlib
import os
import re
import unicodedata
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.document_lineage import DocumentLineage
from models.queue import enqueue_stage
logger = setup_logger("presegment_worker")
# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ───
# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호.
MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60"))
# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지).
MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5"))
# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부.
MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50"))
# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix.
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴."""
candidates = [
file_path,
unicodedata.normalize("NFD", file_path),
unicodedata.normalize("NFC", file_path),
]
for c in candidates:
p = Path(c)
if p.exists():
return p
parent = Path(file_path).parent
if parent.exists():
target = unicodedata.normalize("NFC", Path(file_path).name)
for child in parent.iterdir():
if unicodedata.normalize("NFC", child.name) == target:
return child
return None
def _to_container_path(file_path: str) -> str:
"""file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일)."""
if file_path.startswith("/"):
return file_path
return f"{CONTAINER_PATH_PREFIX}/{file_path}"
# 후보 A: 자식 합성 file_path 패턴 `{부모경로}#p{start}-{end}` (uq_documents_file_path 유일성).
_BUNDLE_SUFFIX_RE = re.compile(r"#p\d+-\d+$")
def bundle_source_path(file_path: str | None) -> str | None:
"""자식 합성 file_path → 부모 실파일 경로 복원. 일반 doc(접미사 없음)은 그대로 반환.
extract_worker/marker_worker 자식 처리 실제 파일 접근에 사용 (자식 file_path
합성값이라 디스크에 없음). 결정적·세션 불필요. lineage 부모-자식 관계의 정본 기록.
"""
if not file_path:
return file_path
return _BUNDLE_SUFFIX_RE.sub("", file_path)
def _is_pdf(doc: Document) -> bool:
"""PDF 판정 — file_format=pdf 또는 .pdf 확장자."""
fmt = (doc.file_format or "").lower()
if fmt == "pdf":
return True
if doc.file_path:
return Path(doc.file_path).suffix.lower() == ".pdf"
return False
def _level1_segments(toc: list, page_count: int) -> list[dict]:
"""get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성.
toc 항목 = [level, title, page] (page 1-based). level==1 채택.
end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count.
동일 page 에서 시작하는 level-1 여럿이면 정렬 인접 항목으로 경계 계산되며,
경우 0-페이지 segment 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다.
"""
starts = []
for entry in toc:
# simple=True 는 [level, title, page]. 방어적으로 길이 체크.
if not entry or len(entry) < 3:
continue
level, title, page = entry[0], entry[1], entry[2]
if level != 1:
continue
# ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨.
starts.append((int(page), (title or "").strip()))
# ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬).
starts.sort(key=lambda x: x[0])
segments: list[dict] = []
for i, (start_page, title) in enumerate(starts):
if i + 1 < len(starts):
end_page = starts[i + 1][0] - 1
else:
end_page = page_count
segments.append({"start_page": start_page, "end_page": end_page, "title": title})
return segments
def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]:
"""deterministic '명확한 번들' 판정. (clear, reason) 반환.
clear=True reason="" / clear=False reason 거부 사유(로깅용).
모든 조건은 보수적 하나라도 어긋나면 단일문서로 처리(분할 ).
"""
n = len(segments)
if n < 2:
return False, f"too_few_level1_entries(n={n})"
if n > MAX_CHILDREN:
return False, f"too_many_children(n={n}>{MAX_CHILDREN})"
# 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버.
if segments[0]["start_page"] != 1:
return False, f"first_start_not_1(start={segments[0]['start_page']})"
if segments[-1]["end_page"] != page_count:
return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})"
prev_end = 0
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
# 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할.
if start != prev_end + 1:
return False, f"non_contiguous(start={start},prev_end={prev_end})"
if end < start:
return False, f"non_monotonic(start={start},end={end})"
if (end - start + 1) < MIN_CHILD_PAGES:
return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})"
prev_end = end
if prev_end != page_count:
return False, f"coverage_gap(covered={prev_end},pc={page_count})"
return True, ""
def _child_title(parent: Document, seg: dict) -> str:
"""자식 제목 = 부모 제목 + '' + (segment 제목 또는 page 범위)."""
base = (parent.title or "").strip() or (parent.original_filename or "") or "문서"
seg_title = (seg.get("title") or "").strip()
suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}"
return f"{base}{suffix}"
def _child_file_hash(parent_hash: str, start: int, end: int) -> str:
"""자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등.
부모 file_hash NULL 수는 없으나(NOT NULL) 방어적으로 문자열 처리.
"""
return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest()
async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None:
"""자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로).
이미 extracted_text 채워졌거나 활성 행이 있으면 enqueue_stage no-op/skip.
"""
child = await session.get(Document, child_id)
if child is None:
return
# 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip).
if child.extracted_at is not None and child.extracted_text is not None:
return
await enqueue_stage(session, child_id, "extract")
async def process(document_id: int, session: AsyncSession) -> None:
"""presegment stage 워커 진입점. queue_consumer 가 호출.
문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) extract 흐른다.
'명확한 번들' PDF 자식 분할 + 부모를 'parent' 표식( 경우 부모는 extract 흐르지 않음).
"""
doc = await session.get(Document, document_id)
if doc is None:
logger.warning(f"[presegment] document {document_id} not found")
return
# ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ───
if not _is_pdf(doc):
logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract")
return
# ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ───
if not doc.file_path:
logger.info(f"[presegment] id={document_id} no file_path → extract")
return
# ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ───
# (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.)
if doc.presegment_role in ("child", "parent"):
logger.info(
f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip"
)
return
# ─── (2) 파일 열기 + page_count ───
raw = str(Path(settings.nas_mount_path) / doc.file_path)
source = _resolve_path(raw)
if source is None:
# 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안.
# presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다.
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
return
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
try:
with fitz.open(str(source)) as pdf:
page_count = pdf.page_count
toc = pdf.get_toc(simple=True) or []
except Exception as exc:
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
logger.warning(
f"[presegment] id={document_id} fitz open/toc failed "
f"({type(exc).__name__}: {exc}) → extract"
)
return
# ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ───
if page_count < MIN_BUNDLE_PAGES:
logger.info(
f"[presegment] id={document_id} single doc "
f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract"
)
return
# ─── (4) level-1 ToC → 자식 후보 segment ───
segments = _level1_segments(toc, page_count)
if not segments:
# 큰 PDF 인데 ToC 없음/level-1 없음 = 애매(LLM fallback 대상, 후속 PR).
# 이 PR 은 기본 = 단일문서로 처리하고 사유를 남긴다.
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason=no_level1_toc pages={page_count} → single doc(extract)"
)
return
clear, reason = _is_clear_bundle(segments, page_count)
if not clear:
# 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매 → 단일문서(분할 안 함).
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)"
)
return
# ─── (5) 명확한 번들 — 멱등 체크: 이미 자식이 있으면 수렴만 ───
existing_children = (
await session.execute(
select(DocumentLineage.derived_document_id).where(
DocumentLineage.source_document_id == doc.id,
DocumentLineage.relation_type == "segmented_from",
)
)
).scalars().all()
if existing_children:
# 부모 표식이 누락된 경우 보정(이전 부분실패 복구).
if doc.presegment_role != "parent":
doc.presegment_role = "parent"
for child_id in existing_children:
await _ensure_child_extract(session, child_id)
await session.commit()
logger.info(
f"[presegment] id={document_id} children already exist "
f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
)
return
# ─── (6) 자식 N개 생성 + lineage + extract enqueue ───
n = len(segments)
created_ids: list[int] = []
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
child = Document(
# 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path
# 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 —
# 자식은 bundle_page_start/end 로 부모 파일을 슬라이스.
file_path=f"{doc.file_path}#p{start}-{end}",
file_hash=_child_file_hash(doc.file_hash, start, end),
file_format=doc.file_format,
file_size=doc.file_size,
file_type=doc.file_type,
import_source=doc.import_source,
original_filename=doc.original_filename,
source_channel=doc.source_channel,
category=doc.category,
data_origin=doc.data_origin,
doc_purpose=doc.doc_purpose,
# 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음).
material_type=doc.material_type,
jurisdiction=doc.jurisdiction,
title=_child_title(doc, seg),
bundle_page_start=start,
bundle_page_end=end,
presegment_role="child",
)
session.add(child)
await session.flush() # child.id 확보
created_ids.append(child.id)
session.add(
DocumentLineage(
source_document_id=doc.id,
derived_document_id=child.id,
relation_type="segmented_from",
meta={"start_page": start, "end_page": end},
)
)
# 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음).
await enqueue_stage(session, child.id, "extract")
# 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제.
doc.presegment_role = "parent"
await session.commit()
logger.info(
f"[presegment] id={document_id} SPLIT into {n} children "
f"(pages={page_count}) child_ids={created_ids}"
)
+23 -4
View File
@@ -31,9 +31,9 @@ _hold_logged = False
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
BATCH_SIZE = {"presegment": 3, "extract": 5, "classify": 3, "summarize": 3, "embed": 10,
"chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1,
"markdown": 1, "fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -46,7 +46,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize",
"presegment", "extract", "classify", "summarize",
"preview", "stt", "thumbnail", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
@@ -165,6 +165,10 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
}
next_stages = {
# G2 (PR-G2-2): 전 문서가 presegment → extract. 단, 번들 분할로 'parent' 가 된 문서는
# 파일 홀더라 자체 extract 안 함 — 아래 suppression 으로 이 전이를 건너뛴다(자식 extract 는
# presegment_worker 가 직접 enqueue). 단일/非PDF 문서(role NULL)는 정상적으로 extract 로 흐름.
"presegment": ["extract"],
"extract": ["classify", "preview"],
"classify": ["embed", "chunk", "markdown"],
"stt": ["classify"],
@@ -180,6 +184,18 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
stages = extract_override_by_channel[sc]
else:
stages = next_stages.get(current_stage, [])
elif current_stage == "presegment":
# 번들 분할 parent 는 extract 로 흐르지 않게 억제 (자식이 부모 extract 에 가려지는 것 방지).
# role NULL(단일/非PDF) / 'child' 는 정상 전이. presegment_worker 가 자식 extract 를 직접
# enqueue 하므로 'parent' 만 여기서 no-op.
from models.document import Document
async with async_session() as lookup_session:
doc = await lookup_session.get(Document, document_id)
role = doc.presegment_role if doc else None
if role == "parent":
stages = []
else:
stages = next_stages.get(current_stage, [])
else:
stages = next_stages.get(current_stage, [])
@@ -199,6 +215,7 @@ def _load_workers():
from workers.deep_summary_worker import process as deep_summary_process
from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process
from workers.presegment_worker import process as presegment_process
from workers.preview_worker import process as preview_process
from workers.stt_worker import process as stt_process
from workers.summarize_worker import process as summarize_process
@@ -207,6 +224,8 @@ def _load_workers():
from workers.fulltext_worker import process as fulltext_process
return {
# G2 (PR-G2-2): extract 前 번들 PDF → N 자식 분할 (deterministic ToC). 非PDF/단일은 통과.
"presegment": presegment_process,
"extract": extract_process,
"classify": classify_process,
"summarize": summarize_process,
@@ -0,0 +1,10 @@
-- 362: G2 pre-segmentation — 번들 PDF(여러 논리문서 한 파일) → N 자식 문서 분할.
-- 자식 doc 의 원본 내 page 범위(1-based inclusive) + 분할 역할 표식.
-- 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from', migration 363).
-- presegment_role: NULL=일반 단일문서(대다수) / 'parent'=번들원본(자체 extract/embed 안 함) /
-- 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 범위로 슬라이스).
-- 단일 ALTER(다중 절) = 1 statement (asyncpg 멀티스테이트먼트 제약 준수).
ALTER TABLE documents
ADD COLUMN IF NOT EXISTS bundle_page_start INTEGER,
ADD COLUMN IF NOT EXISTS bundle_page_end INTEGER,
ADD COLUMN IF NOT EXISTS presegment_role TEXT;
@@ -0,0 +1,8 @@
-- 363: G2 — document_lineage.relation_type 에 'segmented_from'(번들 → 자식) 추가.
-- 217 의 column-level CHECK(PG 자동명 document_lineage_relation_type_check, 배포 DB 실측 확인)
-- 를 교체. DROP + ADD 를 단일 ALTER 의 두 절로 = 1 statement.
-- 멱등: DROP ... IF EXISTS 라 재실행 안전(이미 교체됐으면 새 제약 DROP 후 동일 재생성).
ALTER TABLE document_lineage
DROP CONSTRAINT IF EXISTS document_lineage_relation_type_check,
ADD CONSTRAINT document_lineage_relation_type_check
CHECK (relation_type IN ('cited','summarized_from','generated_from','revised_from','segmented_from'));
@@ -0,0 +1,5 @@
-- 364: G2 — process_stage 큐 스테이지 enum 에 'presegment' 추가 (extract 前 번들 분할 단계).
-- PG16: ALTER TYPE ADD VALUE 는 트랜잭션 내 실행 가능(값 추가만, 同 트랜잭션 내 사용은 안 함 —
-- 사용은 후속 마이그/런타임). IF NOT EXISTS = 재실행 멱등.
-- (이 한 줄 단독 파일 — 1 statement.)
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'presegment';