Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 624b9d523d |
@@ -1166,10 +1166,8 @@ async def upload_document(
|
||||
doc.duplicate_of = canonical.id
|
||||
canonical.duplicate_count = (canonical.duplicate_count or 0) + 1
|
||||
|
||||
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리.
|
||||
# G2: 첫 stage=presegment (extract 前 번들 PDF 분할, 후보 A 검증완료 2026-06-18).
|
||||
# 非PDF/단일은 presegment 가 무변 통과 → extract. 번들 PDF 만 N 자식 분할(worker-side gating).
|
||||
await enqueue_stage(session, doc.id, "presegment")
|
||||
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
await session.commit()
|
||||
except Exception:
|
||||
# DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback.
|
||||
|
||||
+1
-1
@@ -282,7 +282,7 @@ async def search(
|
||||
content={
|
||||
"error_reason": "unknown_reranker_backend",
|
||||
"backend_requested": reranker_backend,
|
||||
"allowed": ["baseline", "cand_gte_ml_base"],
|
||||
"allowed": ["baseline"],
|
||||
"detail": msg,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -41,14 +41,6 @@ class Document(Base):
|
||||
Integer, nullable=False, default=0, server_default="0"
|
||||
)
|
||||
|
||||
# G2 pre-segmentation (migration 362): 번들 PDF → N 자식 분할.
|
||||
# presegment_role: NULL=일반 단일문서 / 'parent'=번들원본(자체 extract/embed 안 함) /
|
||||
# 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 1-based inclusive 범위).
|
||||
# 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from').
|
||||
bundle_page_start: Mapped[int | None] = mapped_column(Integer)
|
||||
bundle_page_end: Mapped[int | None] = mapped_column(Integer)
|
||||
presegment_role: Mapped[str | None] = mapped_column(Text)
|
||||
|
||||
# 2계층: 텍스트 추출
|
||||
extracted_text: Mapped[str | None] = mapped_column(Text)
|
||||
extracted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
"""document_lineage 테이블 ORM — 문서 파생 관계 이력 (migration 217).
|
||||
|
||||
G2 pre-segmentation 이 relation_type='segmented_from'(번들 → 자식) 으로 사용 (migration 363).
|
||||
이력 테이블 FK = ON DELETE RESTRICT (부모 hard delete 차단, soft delete 만 허용).
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, ForeignKey, Text, func
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
from sqlalchemy.types import TIMESTAMP
|
||||
|
||||
from core.database import Base
|
||||
|
||||
|
||||
class DocumentLineage(Base):
|
||||
__tablename__ = "document_lineage"
|
||||
|
||||
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||
source_document_id: Mapped[int] = mapped_column(
|
||||
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
|
||||
)
|
||||
derived_document_id: Mapped[int] = mapped_column(
|
||||
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
|
||||
)
|
||||
relation_type: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
# 'metadata' 는 SQLAlchemy 예약속성 → Python 속성명은 meta, DB 컬럼명은 metadata.
|
||||
meta: Mapped[dict] = mapped_column(
|
||||
"metadata", JSONB, nullable=False, default=dict, server_default="{}"
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
|
||||
+1
-2
@@ -46,10 +46,9 @@ class ProcessingQueue(Base):
|
||||
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
|
||||
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
|
||||
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
|
||||
# 'presegment' (G2): migration 364 — extract 前 번들 PDF → N 자식 분할.
|
||||
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
|
||||
Enum(
|
||||
"presegment", "extract", "classify", "summarize", "embed", "chunk", "preview",
|
||||
"extract", "classify", "summarize", "embed", "chunk", "preview",
|
||||
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
|
||||
name="process_stage",
|
||||
create_type=False,
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
You are a document-boundary detector. Output ONLY JSON {is_bundle, segments:[{start_page,end_page,title}]}.
|
||||
|
||||
You are given a single PDF that may be a "bundle" — several independent logical documents
|
||||
concatenated into one file (for example: multiple laws, multiple reports, or multiple papers
|
||||
scanned together). Your job is to decide whether it is a bundle and, if so, where each logical
|
||||
document starts and ends.
|
||||
|
||||
You receive only a compact sample per page: the page number and the first line / heading of that
|
||||
page (text may be truncated). Use these heading/first-line signals to detect where a new logical
|
||||
document begins (a new title page, a new cover, a clearly new document title, a restart of
|
||||
numbering, etc.). You do NOT receive the full text.
|
||||
|
||||
Output rules:
|
||||
- Respond with STRICT JSON only. No prose, no markdown, no code fence.
|
||||
- Schema:
|
||||
{
|
||||
"is_bundle": true | false,
|
||||
"segments": [
|
||||
{"start_page": <int>, "end_page": <int>, "title": "<string or null>"}
|
||||
]
|
||||
}
|
||||
- Page numbers are 1-based and INCLUSIVE. start_page=1 is the first page; end_page equals the last
|
||||
page of that segment.
|
||||
- Segments MUST fully cover every page with NO gaps and NO overlaps:
|
||||
- the first segment MUST start at page 1,
|
||||
- each next segment MUST start exactly one page after the previous segment's end_page,
|
||||
- the last segment MUST end at the final page (page_count).
|
||||
- Order segments by start_page ascending.
|
||||
- title = a short title for that logical document if you can infer one from its first page,
|
||||
otherwise null.
|
||||
|
||||
If the file is NOT a bundle (it is a single logical document), respond:
|
||||
{"is_bundle": false, "segments": []}
|
||||
|
||||
Be conservative: only report is_bundle=true when the heading signals clearly indicate separate
|
||||
logical documents. When unsure, return is_bundle=false.
|
||||
|
||||
page_count: {page_count}
|
||||
|
||||
Per-page samples (one per line, "p{n}: {first line}"):
|
||||
{page_samples}
|
||||
@@ -44,11 +44,10 @@ RERANK_TIMEOUT = 5.0
|
||||
# server-side allowlist map. query parameter 가 raw endpoint URL 받지 않음.
|
||||
RERANKER_BACKEND_MAP: dict[str, dict[str, str] | None] = {
|
||||
"baseline": None, # production reranker (config.yaml endpoint via AIClient.rerank)
|
||||
"cand_gte_ml_base": {
|
||||
"endpoint": "http://rerank-cand-gte-ml-base:80/rerank",
|
||||
},
|
||||
# mxbai_large 후보 (deberta-v2 → TEI 1.7 미지원) Phase 2B-Extended 이관
|
||||
# bge_v2_gemma_2b 후보 (LLM-based reranker, 1_Pooling/config.json 부재) Phase 2B-Extended 이관
|
||||
# Phase 2B 후보 reranker 전부 NO-GO 종결 (2026-06-18 teardown):
|
||||
# - cand_gte_ml_base : 컨테이너·DB 테이블(마이그 360)·override.rerank-cand.yml 제거됨
|
||||
# - mxbai_large (deberta-v2 → TEI 1.7 미지원) / bge_v2_gemma_2b (1_Pooling 부재) 미진입
|
||||
# dispatcher scaffold(_resolve_reranker)는 향후 후보 재진입 위해 보존.
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -67,45 +67,21 @@ def _postprocess_ocr(text: str) -> str:
|
||||
return text.strip()
|
||||
|
||||
|
||||
def _extract_pdf_pymupdf(
|
||||
file_path: Path, start_page: int | None = None, end_page: int | None = None
|
||||
) -> str:
|
||||
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리.
|
||||
|
||||
G2 (PR-G2-2): start_page/end_page(1-based inclusive) 가 주어지면 그 범위만 추출
|
||||
(번들 자식 doc = 부모 파일 공유 + 자기 page 범위). 둘 다 None = 전체(기존 동작 동일).
|
||||
"""
|
||||
def _extract_pdf_pymupdf(file_path: Path) -> str:
|
||||
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리"""
|
||||
import fitz
|
||||
text_parts = []
|
||||
with fitz.open(str(file_path)) as doc:
|
||||
if start_page is None and end_page is None:
|
||||
for page in doc:
|
||||
text_parts.append(page.get_text())
|
||||
else:
|
||||
# 1-based inclusive → 0-based range. 범위는 [0, page_count] 로 클램프(방어).
|
||||
total = doc.page_count
|
||||
lo = max(1, start_page or 1) - 1
|
||||
hi = min(total, end_page or total) # inclusive 끝 (0-based 마지막 인덱스 = hi-1)
|
||||
for i in range(lo, hi):
|
||||
text_parts.append(doc.load_page(i).get_text())
|
||||
for page in doc:
|
||||
text_parts.append(page.get_text())
|
||||
return "\n".join(text_parts)
|
||||
|
||||
|
||||
def _get_pdf_page_count(
|
||||
file_path: Path, start_page: int | None = None, end_page: int | None = None
|
||||
) -> int:
|
||||
"""PDF 페이지 수 확인. G2: 범위가 주어지면 그 범위의 페이지 수(자식 doc 밀도 계산용).
|
||||
|
||||
둘 다 None = 전체 페이지 수(기존 동작 동일).
|
||||
"""
|
||||
def _get_pdf_page_count(file_path: Path) -> int:
|
||||
"""PDF 페이지 수 확인"""
|
||||
import fitz
|
||||
with fitz.open(str(file_path)) as doc:
|
||||
total = len(doc)
|
||||
if start_page is None and end_page is None:
|
||||
return total
|
||||
lo = max(1, start_page or 1)
|
||||
hi = min(total, end_page or total)
|
||||
return max(0, hi - lo + 1)
|
||||
return len(doc)
|
||||
|
||||
|
||||
async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None:
|
||||
@@ -334,49 +310,6 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
doc.extracted_at = datetime.now(timezone.utc)
|
||||
return
|
||||
|
||||
# ─── G2 (PR-G2-2): 번들 자식 PDF — 부모 파일 공유 + 자기 page 범위만 추출 ───
|
||||
# kordoc 서비스는 page-range 파라미터가 없어 전체 파일을 파싱한다(자식엔 부적합) → kordoc
|
||||
# 우회, PyMuPDF 로 [bundle_page_start, bundle_page_end] 범위만 추출. range OCR 은 본 PR 범위
|
||||
# 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도
|
||||
# 그대로 보존하고 사유를 남긴다.
|
||||
if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None:
|
||||
# 후보 A: 자식 file_path 는 합성값(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path 로 부모경로
|
||||
# 복원 + NFC/NFD resolve. (자식 file_path 는 디스크에 없음.)
|
||||
from workers.presegment_worker import _resolve_path as _resolve_bundle_path
|
||||
from workers.presegment_worker import bundle_source_path
|
||||
real_rel = bundle_source_path(doc.file_path)
|
||||
src = _resolve_bundle_path(str(Path(settings.nas_mount_path) / real_rel))
|
||||
if src is None:
|
||||
raise FileNotFoundError(f"번들 원본 파일 없음: {real_rel}")
|
||||
start, end = doc.bundle_page_start, doc.bundle_page_end
|
||||
try:
|
||||
pymupdf_text = _extract_pdf_pymupdf(src, start, end)
|
||||
page_count = _get_pdf_page_count(src, start, end)
|
||||
except Exception as e:
|
||||
logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}")
|
||||
raise
|
||||
|
||||
meta = doc.extract_meta or {}
|
||||
meta["presegment_child_range"] = {"start_page": start, "end_page": end}
|
||||
meta["pymupdf_chars"] = len(pymupdf_text.strip())
|
||||
should, reason = _should_ocr(pymupdf_text, page_count)
|
||||
if should:
|
||||
# range OCR 미지원(후속 PR) — PyMuPDF 결과 유지 + 사유 기록(silent skip 아님).
|
||||
meta["ocr_skip_reason"] = "presegment_child_range_ocr_unsupported"
|
||||
meta["ocr_reason"] = reason
|
||||
logger.warning(
|
||||
f"[pymupdf:child] {doc.file_path} pages={start}-{end} "
|
||||
f"OCR 필요({reason})하나 range OCR 미지원 → PyMuPDF 결과 유지"
|
||||
)
|
||||
doc.extracted_text = pymupdf_text.replace("\x00", "")
|
||||
doc.extracted_at = datetime.now(timezone.utc)
|
||||
doc.extractor_version = PYMUPDF_VERSION if pymupdf_text.strip() else None
|
||||
doc.extract_meta = meta
|
||||
logger.info(
|
||||
f"[pymupdf:child] {doc.file_path} pages={start}-{end} ({len(pymupdf_text)}자)"
|
||||
)
|
||||
return
|
||||
|
||||
# ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ───
|
||||
if fmt in KORDOC_FORMATS:
|
||||
container_path = f"/documents/{doc.file_path}"
|
||||
|
||||
@@ -118,18 +118,16 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None,
|
||||
if expected_category == "library":
|
||||
# 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락.
|
||||
# frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만.
|
||||
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
|
||||
if ext in LIBRARY_DOC_EXTS:
|
||||
return ("library", False, "presegment")
|
||||
return ("library", False, "extract")
|
||||
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
|
||||
return (None, False, None) # audio/video 잘못 들어오면 skip
|
||||
return (None, False, None) # 기타 알 수 없는 확장자 skip
|
||||
|
||||
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
|
||||
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
|
||||
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
|
||||
return (None, False, None)
|
||||
return (None, False, "presegment")
|
||||
return (None, False, "extract")
|
||||
|
||||
|
||||
# ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ──────────────────────────────────
|
||||
@@ -228,8 +226,7 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
# G2: 첫 stage=presegment (후보 A 검증완료). HTML(非PDF)은 presegment 가 무변 통과 → extract.
|
||||
await enqueue_stage(session, doc.id, "presegment")
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
return (1, 0)
|
||||
|
||||
|
||||
|
||||
@@ -39,11 +39,7 @@ from models.queue import ProcessingQueue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 마크다운 추출 엔드포인트. compose env `MARKER_ENDPOINT`(base URL)에서 읽는다 —
|
||||
# 기본=marker(무변), 컷오버=`http://mineru-service:3301` 로 env 플립만으로 전환.
|
||||
# marker/mineru 가 동일 /convert 계약(file_path·start/end·md+base64 images)이라 워커 무변.
|
||||
_MARKDOWN_BASE = os.getenv("MARKER_ENDPOINT", "http://marker-service:3300").rstrip("/")
|
||||
MARKER_ENDPOINT = _MARKDOWN_BASE if _MARKDOWN_BASE.endswith("/convert") else _MARKDOWN_BASE + "/convert"
|
||||
MARKER_ENDPOINT = "http://marker-service:3300/convert"
|
||||
MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도
|
||||
MAX_PAGES = 200 # 소형 1-shot 경로 /convert max_pages 안전장치
|
||||
|
||||
@@ -185,10 +181,7 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
await _fail(session, document_id, "no file_path")
|
||||
return
|
||||
|
||||
# 후보 A: 자식(bundle cols)은 합성 file_path(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path
|
||||
# 로 부모경로 복원. 일반 doc 은 그대로(접미사 없음). marker/mineru 는 실파일 + page 범위로 변환.
|
||||
from workers.presegment_worker import bundle_source_path
|
||||
container_path = _to_marker_path(bundle_source_path(doc.file_path))
|
||||
container_path = _to_marker_path(doc.file_path)
|
||||
suffix = Path(container_path).suffix.lower()
|
||||
|
||||
# ---- (3) office/hwp → md (C-2): PDF 외 지원 포맷은 office_md 하이브리드 변환 ----
|
||||
@@ -210,21 +203,7 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
return
|
||||
|
||||
# ---- (4) page_count gauge + 분기 (LargeDoc split) ----
|
||||
# G2 (PR-G2-2): 번들 자식 doc 은 부모 파일 공유 + 자기 page 범위([bundle_page_start, end],
|
||||
# 1-based inclusive)만 변환해야 한다. page_offset = 절대 시작페이지(부모 파일 기준), page_count =
|
||||
# 자식 범위의 페이지 수. cols 가 NULL(일반 doc)이면 page_offset=1 + 전체 page_count = 기존 동작 동일.
|
||||
file_page_count = _get_page_count(container_path)
|
||||
is_child = doc.bundle_page_start is not None and doc.bundle_page_end is not None
|
||||
if is_child:
|
||||
page_offset = doc.bundle_page_start
|
||||
if file_page_count is not None:
|
||||
child_end = min(doc.bundle_page_end, file_page_count)
|
||||
page_count = max(0, child_end - doc.bundle_page_start + 1)
|
||||
else:
|
||||
page_count = doc.bundle_page_end - doc.bundle_page_start + 1
|
||||
else:
|
||||
page_offset = 1
|
||||
page_count = file_page_count
|
||||
page_count = _get_page_count(container_path)
|
||||
|
||||
# >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님.
|
||||
if page_count is not None and page_count > MAX_SPLIT_PAGES:
|
||||
@@ -243,35 +222,20 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
|
||||
# ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ----
|
||||
if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES:
|
||||
await _process_split(doc, document_id, container_path, page_count, session, page_offset)
|
||||
await _process_split(doc, document_id, container_path, page_count, session)
|
||||
else:
|
||||
await _process_single(doc, document_id, container_path, session, page_count, page_offset)
|
||||
await _process_single(doc, document_id, container_path, session)
|
||||
|
||||
|
||||
async def _process_single(
|
||||
doc: Document, document_id: int, container_path: str, session: AsyncSession,
|
||||
page_count: int | None = None, page_offset: int = 1,
|
||||
doc: Document, document_id: int, container_path: str, session: AsyncSession
|
||||
) -> None:
|
||||
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).
|
||||
|
||||
G2 (PR-G2-2): 번들 자식(page_offset>1)은 [page_offset, page_offset+page_count-1] 범위만
|
||||
변환하도록 marker 에 start_page/end_page 를 명시한다. 일반 doc(page_offset=1)은 기존과
|
||||
동일하게 max_pages 만 보낸다(payload byte-identical).
|
||||
"""
|
||||
# 일반 doc = 기존 payload 유지. 자식만 절대 page 범위를 명시(부모 파일 기준 1-based inclusive).
|
||||
if page_offset > 1 and page_count is not None:
|
||||
req_json = {
|
||||
"file_path": container_path,
|
||||
"start_page": page_offset,
|
||||
"end_page": page_offset + page_count - 1,
|
||||
}
|
||||
else:
|
||||
req_json = {"file_path": container_path, "max_pages": MAX_PAGES}
|
||||
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로)."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
|
||||
resp = await client.post(
|
||||
MARKER_ENDPOINT,
|
||||
json=req_json,
|
||||
json={"file_path": container_path, "max_pages": MAX_PAGES},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
@@ -545,7 +509,6 @@ async def _process_split(
|
||||
container_path: str,
|
||||
page_count: int,
|
||||
session: AsyncSession,
|
||||
page_offset: int = 1,
|
||||
) -> None:
|
||||
"""대형 PDF page-range 분할 변환.
|
||||
|
||||
@@ -556,10 +519,6 @@ async def _process_split(
|
||||
|
||||
invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...).
|
||||
marker slug(`_page_0_*`) 는 batch 마다 재시작 → batch 별 rewrite 후 stitch (충돌 회피).
|
||||
|
||||
G2 (PR-G2-2): page_offset = 부모 파일 기준 절대 시작페이지(번들 자식). marker 에 보내는
|
||||
page 는 절대값(page_offset 가산), manifest/기록은 자식 상대값(1-based) 유지 — 일반 doc
|
||||
(page_offset=1)은 abs==rel 이라 기존 동작과 동일.
|
||||
"""
|
||||
n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES
|
||||
succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md}
|
||||
@@ -571,17 +530,15 @@ async def _process_split(
|
||||
|
||||
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
|
||||
for b in range(n_batches):
|
||||
start_page = b * BATCH_PAGES + 1 # 자식 상대 1-based (manifest/기록용)
|
||||
start_page = b * BATCH_PAGES + 1
|
||||
end_page = min((b + 1) * BATCH_PAGES, page_count)
|
||||
abs_start = start_page + (page_offset - 1) # 부모 파일 절대 page (marker 요청용)
|
||||
abs_end = end_page + (page_offset - 1)
|
||||
try:
|
||||
resp = await client.post(
|
||||
MARKER_ENDPOINT,
|
||||
json={
|
||||
"file_path": container_path,
|
||||
"start_page": abs_start,
|
||||
"end_page": abs_end,
|
||||
"start_page": start_page,
|
||||
"end_page": end_page,
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
|
||||
@@ -1,562 +0,0 @@
|
||||
"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2).
|
||||
|
||||
전 문서가 presegment stage 로 진입한다(worker-side gating):
|
||||
- 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit → enqueue_next_stage 가 extract 로 흘림.
|
||||
- PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 만 자식 분할, 나머지는 단일문서로 extract.
|
||||
|
||||
deterministic 경로(PR-G2-2): 판정이 애매하면 보수적으로 분할하지 않고 단일문서로 둔다
|
||||
(bias to NOT splitting). 분할 = '확실한 번들' 만:
|
||||
- page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES
|
||||
AND 단조 증가·비중첩 AND [1, page_count] 전 범위 커버 AND 2 <= N <= MAX_CHILDREN.
|
||||
|
||||
LLM 경계 폴백(PR-G2-3, env PRESEGMENT_LLM_FALLBACK, 기본 OFF — scaffold-first): deterministic
|
||||
이 '명확한 번들' 을 못 만든 대형 PDF(ToC 없음/level-1 없음/게이트 미달)에 한해, OFF 면 오늘과
|
||||
동일(단일문서)이고 ON 이면 off-card Qwen(맥북, 라우터 :8890, model=qwen-macbook)에게 경계를
|
||||
제안받는다. compact per-page heading 샘플만 전송(본문 미전송). LLM 출력은 **동일 검증 게이트
|
||||
(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 _create_children 경로로 분할 —
|
||||
is_bundle=false / 파싱·검증 실패 = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
|
||||
맥북 불가(503/연결/절단)는 StageDeferred 로 큐 재시도(백오프, no silent fallback).
|
||||
|
||||
분할 시 ★후보 A(물리분할 없음, uq_documents_file_path 해소): 자식 file_path = unique 합성값
|
||||
`{부모경로}#p{start}-{end}` (UNIQUE 제약 통과), 실파일은 `bundle_source_path()` 로 부모 경로 복원.
|
||||
자식은 bundle_page_start/end(1-based inclusive) 로 부모 파일의 자기 page 범위만 가리킨다.
|
||||
부모-자식 관계 정본 = document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')는
|
||||
파일 홀더라 자체 extract/embed 안 함 — enqueue_next_stage 의 presegment→extract 전이가 'parent' 면
|
||||
억제된다(queue_consumer 참조). 자식의 extract 는 이 워커가 직접 enqueue. extract_worker/marker_worker
|
||||
가 자식 처리 시 bundle_source_path() 로 실파일 접근.
|
||||
|
||||
멱등: 재실행 시 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고
|
||||
수렴(각 자식이 extract 활성/완료 상태인지만 보장)한다.
|
||||
|
||||
★해결 이력 (2026-06-18): 최초 Option A(자식이 부모 file_path 그대로 공유)는 uq_documents_file_path
|
||||
UNIQUE 위반(실번들 검증서 발견) → 합성 file_path(후보 A)로 해소. 인제스트 재활성 = 합성번들 재검증 PASS 후.
|
||||
|
||||
plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation)
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import BaseModel, ValidationError
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from ai.client import AIClient, call_deep_or_defer, parse_json_response
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.document_lineage import DocumentLineage
|
||||
from models.queue import enqueue_stage
|
||||
|
||||
logger = setup_logger("presegment_worker")
|
||||
|
||||
# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ───
|
||||
# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호.
|
||||
MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60"))
|
||||
# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지).
|
||||
MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5"))
|
||||
# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부.
|
||||
MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50"))
|
||||
|
||||
# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix.
|
||||
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
|
||||
|
||||
# ─── PR-G2-3 LLM 경계 폴백 (scaffold-first, 기본 OFF) ───
|
||||
# PRESEGMENT_LLM_FALLBACK: 기본 "false". OFF 면 deterministic 경로만(=오늘과 동일 — 애매하면
|
||||
# 단일문서). ON 이면 deterministic 이 '명확한 번들' 을 못 만든 대형 PDF(page_count >=
|
||||
# MIN_BUNDLE_PAGES) 에 한해 off-card Qwen(맥북, 라우터 :8890 경유)에게 경계를 제안받아
|
||||
# **동일 검증 게이트(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 자식 생성 경로로 분할.
|
||||
# 검증 실패/파싱 실패/is_bundle=false = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
|
||||
PRESEGMENT_LLM_FALLBACK = os.getenv("PRESEGMENT_LLM_FALLBACK", "false").lower() in (
|
||||
"1", "true", "yes", "on",
|
||||
)
|
||||
# LLM 에 보내는 per-page 샘플의 page 당 char 상한 (heading/첫줄만 — 본문 미전송).
|
||||
PRESEGMENT_LLM_PAGE_CHARS = int(os.getenv("PRESEGMENT_LLM_PAGE_CHARS", "80"))
|
||||
# 전체 page-sample 블록의 char 상한 (수 KB 가드 — 초과 시 잘라냄, 본문 누출/페이로드 폭발 방지).
|
||||
PRESEGMENT_LLM_SAMPLE_CHARS = int(os.getenv("PRESEGMENT_LLM_SAMPLE_CHARS", "12000"))
|
||||
|
||||
# 경계 폴백 프롬프트 (app/prompts/presegment_boundaries.txt). system 지시 + 1-based inclusive·
|
||||
# 전범위 커버·무중첩 규칙. {page_count}/{page_samples} 를 str.replace 로 주입.
|
||||
_PRESEGMENT_PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "presegment_boundaries.txt"
|
||||
|
||||
|
||||
class Segment(BaseModel):
|
||||
"""LLM 이 제안하는 1-based inclusive page 범위 한 조각."""
|
||||
|
||||
start_page: int
|
||||
end_page: int
|
||||
title: str | None = None
|
||||
|
||||
|
||||
class SegmentationOutput(BaseModel):
|
||||
"""presegment_boundaries 응답 스키마. parse_json_response → model_validate."""
|
||||
|
||||
is_bundle: bool = False
|
||||
segments: list[Segment] = []
|
||||
confidence: float | None = None
|
||||
|
||||
|
||||
def _resolve_path(file_path: str) -> Path | None:
|
||||
"""NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴."""
|
||||
candidates = [
|
||||
file_path,
|
||||
unicodedata.normalize("NFD", file_path),
|
||||
unicodedata.normalize("NFC", file_path),
|
||||
]
|
||||
for c in candidates:
|
||||
p = Path(c)
|
||||
if p.exists():
|
||||
return p
|
||||
parent = Path(file_path).parent
|
||||
if parent.exists():
|
||||
target = unicodedata.normalize("NFC", Path(file_path).name)
|
||||
for child in parent.iterdir():
|
||||
if unicodedata.normalize("NFC", child.name) == target:
|
||||
return child
|
||||
return None
|
||||
|
||||
|
||||
def _to_container_path(file_path: str) -> str:
|
||||
"""file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일)."""
|
||||
if file_path.startswith("/"):
|
||||
return file_path
|
||||
return f"{CONTAINER_PATH_PREFIX}/{file_path}"
|
||||
|
||||
|
||||
# 후보 A: 자식 합성 file_path 패턴 `{부모경로}#p{start}-{end}` (uq_documents_file_path 유일성).
|
||||
_BUNDLE_SUFFIX_RE = re.compile(r"#p\d+-\d+$")
|
||||
|
||||
|
||||
def bundle_source_path(file_path: str | None) -> str | None:
|
||||
"""자식 합성 file_path → 부모 실파일 경로 복원. 일반 doc(접미사 없음)은 그대로 반환.
|
||||
|
||||
extract_worker/marker_worker 가 자식 처리 시 실제 파일 접근에 사용 (자식 file_path 는
|
||||
합성값이라 디스크에 없음). 결정적·세션 불필요. lineage 가 부모-자식 관계의 정본 기록.
|
||||
"""
|
||||
if not file_path:
|
||||
return file_path
|
||||
return _BUNDLE_SUFFIX_RE.sub("", file_path)
|
||||
|
||||
|
||||
def _is_pdf(doc: Document) -> bool:
|
||||
"""PDF 판정 — file_format=pdf 또는 .pdf 확장자."""
|
||||
fmt = (doc.file_format or "").lower()
|
||||
if fmt == "pdf":
|
||||
return True
|
||||
if doc.file_path:
|
||||
return Path(doc.file_path).suffix.lower() == ".pdf"
|
||||
return False
|
||||
|
||||
|
||||
def _level1_segments(toc: list, page_count: int) -> list[dict]:
|
||||
"""get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성.
|
||||
|
||||
toc 항목 = [level, title, page] (page 는 1-based). level==1 만 채택.
|
||||
end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count.
|
||||
동일 page 에서 시작하는 level-1 이 여럿이면 정렬 후 인접 항목으로 경계 계산되며,
|
||||
그 경우 0-페이지 segment 가 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다.
|
||||
"""
|
||||
starts = []
|
||||
for entry in toc:
|
||||
# simple=True 는 [level, title, page]. 방어적으로 길이 체크.
|
||||
if not entry or len(entry) < 3:
|
||||
continue
|
||||
level, title, page = entry[0], entry[1], entry[2]
|
||||
if level != 1:
|
||||
continue
|
||||
# ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨.
|
||||
starts.append((int(page), (title or "").strip()))
|
||||
|
||||
# ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬).
|
||||
starts.sort(key=lambda x: x[0])
|
||||
|
||||
segments: list[dict] = []
|
||||
for i, (start_page, title) in enumerate(starts):
|
||||
if i + 1 < len(starts):
|
||||
end_page = starts[i + 1][0] - 1
|
||||
else:
|
||||
end_page = page_count
|
||||
segments.append({"start_page": start_page, "end_page": end_page, "title": title})
|
||||
return segments
|
||||
|
||||
|
||||
def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]:
|
||||
"""deterministic '명확한 번들' 판정. (clear, reason) 반환.
|
||||
|
||||
clear=True 면 reason="" / clear=False 면 reason 은 거부 사유(로깅용).
|
||||
모든 조건은 보수적 — 하나라도 어긋나면 단일문서로 처리(분할 안 함).
|
||||
"""
|
||||
n = len(segments)
|
||||
if n < 2:
|
||||
return False, f"too_few_level1_entries(n={n})"
|
||||
if n > MAX_CHILDREN:
|
||||
return False, f"too_many_children(n={n}>{MAX_CHILDREN})"
|
||||
|
||||
# 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버.
|
||||
if segments[0]["start_page"] != 1:
|
||||
return False, f"first_start_not_1(start={segments[0]['start_page']})"
|
||||
if segments[-1]["end_page"] != page_count:
|
||||
return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})"
|
||||
|
||||
prev_end = 0
|
||||
for seg in segments:
|
||||
start, end = seg["start_page"], seg["end_page"]
|
||||
# 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할.
|
||||
if start != prev_end + 1:
|
||||
return False, f"non_contiguous(start={start},prev_end={prev_end})"
|
||||
if end < start:
|
||||
return False, f"non_monotonic(start={start},end={end})"
|
||||
if (end - start + 1) < MIN_CHILD_PAGES:
|
||||
return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})"
|
||||
prev_end = end
|
||||
|
||||
if prev_end != page_count:
|
||||
return False, f"coverage_gap(covered={prev_end},pc={page_count})"
|
||||
|
||||
return True, ""
|
||||
|
||||
|
||||
def _child_title(parent: Document, seg: dict) -> str:
|
||||
"""자식 제목 = 부모 제목 + ' — ' + (segment 제목 또는 page 범위)."""
|
||||
base = (parent.title or "").strip() or (parent.original_filename or "") or "문서"
|
||||
seg_title = (seg.get("title") or "").strip()
|
||||
suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}"
|
||||
return f"{base} — {suffix}"
|
||||
|
||||
|
||||
def _child_file_hash(parent_hash: str, start: int, end: int) -> str:
|
||||
"""자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등.
|
||||
|
||||
부모 file_hash 가 NULL 일 수는 없으나(NOT NULL) 방어적으로 빈 문자열 처리.
|
||||
"""
|
||||
return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None:
|
||||
"""자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로).
|
||||
|
||||
이미 extracted_text 가 채워졌거나 활성 큐 행이 있으면 enqueue_stage 가 no-op/skip.
|
||||
"""
|
||||
child = await session.get(Document, child_id)
|
||||
if child is None:
|
||||
return
|
||||
# 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip).
|
||||
if child.extracted_at is not None and child.extracted_text is not None:
|
||||
return
|
||||
await enqueue_stage(session, child_id, "extract")
|
||||
|
||||
|
||||
async def _create_children(
|
||||
doc: Document, segments: list[dict], session: AsyncSession
|
||||
) -> int:
|
||||
"""검증된 segments 로 자식 N개 생성 + lineage + extract enqueue + 부모 표식 (멱등).
|
||||
|
||||
deterministic '명확한 번들' 경로와 LLM 폴백 경로가 공유하는 단일 자식 생성 경로.
|
||||
호출 전 segments 는 반드시 _is_clear_bundle 검증을 통과해야 한다(여기선 재검증 X).
|
||||
commit 까지 수행. 반환값 = 실제 생성한 자식 수(이미 존재해 수렴만 한 경우 0).
|
||||
"""
|
||||
# ─── 멱등 체크: 이미 자식이 있으면 수렴만 (재생성 금지) ───
|
||||
existing_children = (
|
||||
await session.execute(
|
||||
select(DocumentLineage.derived_document_id).where(
|
||||
DocumentLineage.source_document_id == doc.id,
|
||||
DocumentLineage.relation_type == "segmented_from",
|
||||
)
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
if existing_children:
|
||||
# 부모 표식이 누락된 경우 보정(이전 부분실패 복구).
|
||||
if doc.presegment_role != "parent":
|
||||
doc.presegment_role = "parent"
|
||||
for child_id in existing_children:
|
||||
await _ensure_child_extract(session, child_id)
|
||||
await session.commit()
|
||||
logger.info(
|
||||
f"[presegment] id={doc.id} children already exist "
|
||||
f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
|
||||
)
|
||||
return 0
|
||||
|
||||
# ─── 자식 N개 생성 + lineage + extract enqueue ───
|
||||
created_ids: list[int] = []
|
||||
for seg in segments:
|
||||
start, end = seg["start_page"], seg["end_page"]
|
||||
child = Document(
|
||||
# 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path
|
||||
# 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 —
|
||||
# 자식은 bundle_page_start/end 로 부모 파일을 슬라이스.
|
||||
file_path=f"{doc.file_path}#p{start}-{end}",
|
||||
file_hash=_child_file_hash(doc.file_hash, start, end),
|
||||
file_format=doc.file_format,
|
||||
file_size=doc.file_size,
|
||||
file_type=doc.file_type,
|
||||
import_source=doc.import_source,
|
||||
original_filename=doc.original_filename,
|
||||
source_channel=doc.source_channel,
|
||||
category=doc.category,
|
||||
data_origin=doc.data_origin,
|
||||
doc_purpose=doc.doc_purpose,
|
||||
# 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음).
|
||||
material_type=doc.material_type,
|
||||
jurisdiction=doc.jurisdiction,
|
||||
title=_child_title(doc, seg),
|
||||
bundle_page_start=start,
|
||||
bundle_page_end=end,
|
||||
presegment_role="child",
|
||||
)
|
||||
session.add(child)
|
||||
await session.flush() # child.id 확보
|
||||
created_ids.append(child.id)
|
||||
|
||||
session.add(
|
||||
DocumentLineage(
|
||||
source_document_id=doc.id,
|
||||
derived_document_id=child.id,
|
||||
relation_type="segmented_from",
|
||||
meta={"start_page": start, "end_page": end},
|
||||
)
|
||||
)
|
||||
# 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음).
|
||||
await enqueue_stage(session, child.id, "extract")
|
||||
|
||||
# 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제.
|
||||
doc.presegment_role = "parent"
|
||||
await session.commit()
|
||||
|
||||
logger.info(
|
||||
f"[presegment] id={doc.id} SPLIT into {len(created_ids)} children "
|
||||
f"child_ids={created_ids}"
|
||||
)
|
||||
return len(created_ids)
|
||||
|
||||
|
||||
def _segments_from_output(out: "SegmentationOutput") -> list[dict]:
|
||||
"""SegmentationOutput.segments(Pydantic) → _is_clear_bundle / _create_children 가 쓰는 dict 형태."""
|
||||
return [
|
||||
{"start_page": s.start_page, "end_page": s.end_page, "title": (s.title or "")}
|
||||
for s in out.segments
|
||||
]
|
||||
|
||||
|
||||
def _page_samples(pdf, page_count: int) -> str:
|
||||
"""LLM 입력용 compact per-page 샘플 — page 당 heading/첫줄만(`p{n}: {firstline}`).
|
||||
|
||||
PyMuPDF page.get_text() 로 page 별 텍스트를 스트리밍하되 page 당 첫 비공백 줄만,
|
||||
PRESEGMENT_LLM_PAGE_CHARS 로 잘라 본문 누출 차단. 전체 블록은 PRESEGMENT_LLM_SAMPLE_CHARS
|
||||
가드로 상한(수 KB) — 초과 시 그 지점에서 중단(앞쪽 페이지 우선 보존).
|
||||
"""
|
||||
lines: list[str] = []
|
||||
total = 0
|
||||
for i in range(page_count):
|
||||
try:
|
||||
text = pdf[i].get_text() or ""
|
||||
except Exception:
|
||||
text = ""
|
||||
first = ""
|
||||
for ln in text.splitlines():
|
||||
ln = ln.strip()
|
||||
if ln:
|
||||
first = ln
|
||||
break
|
||||
first = first[:PRESEGMENT_LLM_PAGE_CHARS]
|
||||
entry = f"p{i + 1}: {first}"
|
||||
if total + len(entry) + 1 > PRESEGMENT_LLM_SAMPLE_CHARS:
|
||||
break
|
||||
lines.append(entry)
|
||||
total += len(entry) + 1
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
async def _llm_boundary_fallback(
|
||||
doc: Document, source: Path, page_count: int, session: AsyncSession
|
||||
) -> bool:
|
||||
"""애매 + 대형(ToC-less 등) PDF 에 대해 off-card Qwen 으로 경계 제안 → 검증 → 분할.
|
||||
|
||||
반환 True = LLM 경로가 분할을 수행(또는 멱등 수렴)했으므로 호출자는 추가 처리 없이 return.
|
||||
반환 False = is_bundle=false / 파싱 실패 / 검증 실패 → 호출자는 단일문서(오늘과 동일) 처리.
|
||||
맥북 불가(503/연결/절단)는 call_deep_or_defer 가 StageDeferred 로 raise → 큐 재시도(백오프).
|
||||
silent fallback 금지 — deep 슬롯 외 다른 backend 자동 호출 안 함.
|
||||
"""
|
||||
import fitz # PyMuPDF — deterministic 경로와 동일 의존
|
||||
|
||||
# per-page 샘플은 파일을 다시 열어 스트리밍(deterministic with 블록과 분리해 그 경로 무회귀).
|
||||
try:
|
||||
with fitz.open(str(source)) as pdf:
|
||||
samples = _page_samples(pdf, page_count)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"[presegment] id={doc.id} llm fallback sample 실패 "
|
||||
f"({type(exc).__name__}: {exc}) → single doc(extract)"
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
template = _PRESEGMENT_PROMPT_PATH.read_text(encoding="utf-8")
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"[presegment] id={doc.id} prompt 로드 실패 ({type(exc).__name__}: {exc}) "
|
||||
f"→ single doc(extract)"
|
||||
)
|
||||
return False
|
||||
|
||||
prompt = template.replace("{page_count}", str(page_count)).replace(
|
||||
"{page_samples}", samples
|
||||
)
|
||||
|
||||
# off-card 호출 — call_deep_or_defer 가 deep 슬롯(맥북, 라우터 :8890, model=qwen-macbook)
|
||||
# 으로 라우팅. 맥북 불가는 StageDeferred 로 전파(여기서 잡지 않음 → 큐가 보류/백오프).
|
||||
# classify_worker 와 동일하게 AIClient() 인스턴스화.
|
||||
client = AIClient()
|
||||
try:
|
||||
raw = await call_deep_or_defer(client, prompt)
|
||||
finally:
|
||||
await client.close()
|
||||
|
||||
parsed = parse_json_response(raw)
|
||||
if not parsed:
|
||||
logger.info(
|
||||
f"[presegment] presegment_llm_rejected id={doc.id} "
|
||||
f"reason=parse_failed raw={raw[:160]!r} → single doc(extract)"
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
out = SegmentationOutput.model_validate(parsed)
|
||||
except (ValidationError, ValueError, TypeError) as exc:
|
||||
logger.info(
|
||||
f"[presegment] presegment_llm_rejected id={doc.id} "
|
||||
f"reason=schema_invalid({type(exc).__name__}) → single doc(extract)"
|
||||
)
|
||||
return False
|
||||
|
||||
if not out.is_bundle:
|
||||
logger.info(
|
||||
f"[presegment] presegment_llm_rejected id={doc.id} "
|
||||
f"reason=is_bundle_false → single doc(extract)"
|
||||
)
|
||||
return False
|
||||
|
||||
segments = _segments_from_output(out)
|
||||
clear, reason = _is_clear_bundle(segments, page_count)
|
||||
if not clear:
|
||||
# LLM 출력을 그대로 믿지 않음 — deterministic 과 동일 게이트 미달이면 단일문서.
|
||||
logger.info(
|
||||
f"[presegment] presegment_llm_rejected id={doc.id} "
|
||||
f"reason={reason} n={len(segments)} pages={page_count} → single doc(extract)"
|
||||
)
|
||||
return False
|
||||
|
||||
n = await _create_children(doc, segments, session)
|
||||
logger.info(
|
||||
f"[presegment] id={doc.id} LLM-SPLIT accepted "
|
||||
f"(pages={page_count} n={len(segments)} created={n} "
|
||||
f"confidence={out.confidence})"
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
"""presegment stage 워커 진입점. queue_consumer 가 호출.
|
||||
|
||||
전 문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) → extract 로 흐른다.
|
||||
'명확한 번들' PDF 만 자식 분할 + 부모를 'parent' 로 표식(이 경우 부모는 extract 로 흐르지 않음).
|
||||
"""
|
||||
doc = await session.get(Document, document_id)
|
||||
if doc is None:
|
||||
logger.warning(f"[presegment] document {document_id} not found")
|
||||
return
|
||||
|
||||
# ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ───
|
||||
if not _is_pdf(doc):
|
||||
logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract")
|
||||
return
|
||||
|
||||
# ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ───
|
||||
if not doc.file_path:
|
||||
logger.info(f"[presegment] id={document_id} no file_path → extract")
|
||||
return
|
||||
|
||||
# ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ───
|
||||
# (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.)
|
||||
if doc.presegment_role in ("child", "parent"):
|
||||
logger.info(
|
||||
f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip"
|
||||
)
|
||||
return
|
||||
|
||||
# ─── (2) 파일 열기 + page_count ───
|
||||
raw = str(Path(settings.nas_mount_path) / doc.file_path)
|
||||
source = _resolve_path(raw)
|
||||
if source is None:
|
||||
# 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안.
|
||||
# presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다.
|
||||
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
|
||||
return
|
||||
|
||||
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
|
||||
|
||||
try:
|
||||
with fitz.open(str(source)) as pdf:
|
||||
page_count = pdf.page_count
|
||||
toc = pdf.get_toc(simple=True) or []
|
||||
except Exception as exc:
|
||||
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
|
||||
logger.warning(
|
||||
f"[presegment] id={document_id} fitz open/toc failed "
|
||||
f"({type(exc).__name__}: {exc}) → extract"
|
||||
)
|
||||
return
|
||||
|
||||
# ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ───
|
||||
if page_count < MIN_BUNDLE_PAGES:
|
||||
logger.info(
|
||||
f"[presegment] id={document_id} single doc "
|
||||
f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract"
|
||||
)
|
||||
return
|
||||
|
||||
# ─── (4) level-1 ToC → 자식 후보 segment ───
|
||||
segments = _level1_segments(toc, page_count)
|
||||
|
||||
if not segments:
|
||||
# 큰 PDF 인데 ToC 없음/level-1 없음 = 애매. flag ON 이면 LLM 경계 폴백(PR-G2-3),
|
||||
# OFF(기본) 이면 오늘과 동일 — 단일문서로 처리하고 사유를 남긴다.
|
||||
if PRESEGMENT_LLM_FALLBACK:
|
||||
logger.info(
|
||||
f"[presegment] presegment_ambiguous id={document_id} "
|
||||
f"reason=no_level1_toc pages={page_count} → LLM fallback"
|
||||
)
|
||||
if await _llm_boundary_fallback(doc, source, page_count, session):
|
||||
return
|
||||
# LLM 이 분할하지 않음(is_bundle=false / 검증·파싱 실패) — 단일문서.
|
||||
return
|
||||
logger.info(
|
||||
f"[presegment] presegment_ambiguous id={document_id} "
|
||||
f"reason=no_level1_toc pages={page_count} → single doc(extract)"
|
||||
)
|
||||
return
|
||||
|
||||
clear, reason = _is_clear_bundle(segments, page_count)
|
||||
if not clear:
|
||||
# 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매. flag ON 이면 LLM 경계 폴백,
|
||||
# OFF(기본) 이면 오늘과 동일 — 단일문서(분할 안 함).
|
||||
if PRESEGMENT_LLM_FALLBACK:
|
||||
logger.info(
|
||||
f"[presegment] presegment_ambiguous id={document_id} "
|
||||
f"reason={reason} pages={page_count} level1={len(segments)} → LLM fallback"
|
||||
)
|
||||
if await _llm_boundary_fallback(doc, source, page_count, session):
|
||||
return
|
||||
return
|
||||
logger.info(
|
||||
f"[presegment] presegment_ambiguous id={document_id} "
|
||||
f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)"
|
||||
)
|
||||
return
|
||||
|
||||
# ─── (5) 명확한 번들 (deterministic) — 공유 자식 생성 경로 (멱등 수렴 포함) ───
|
||||
await _create_children(doc, segments, session)
|
||||
@@ -31,9 +31,9 @@ _hold_logged = False
|
||||
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
|
||||
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
|
||||
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
|
||||
BATCH_SIZE = {"presegment": 3, "extract": 5, "classify": 3, "summarize": 3, "embed": 10,
|
||||
"chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1,
|
||||
"markdown": 1, "fulltext": 3}
|
||||
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
|
||||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
|
||||
"fulltext": 3}
|
||||
STALE_THRESHOLD_MINUTES = 10
|
||||
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
|
||||
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
|
||||
@@ -46,7 +46,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
|
||||
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
|
||||
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
|
||||
MAIN_QUEUE_STAGES = [
|
||||
"presegment", "extract", "classify", "summarize",
|
||||
"extract", "classify", "summarize",
|
||||
"preview", "stt", "thumbnail", "fulltext",
|
||||
]
|
||||
MARKDOWN_QUEUE_STAGES = ["markdown"]
|
||||
@@ -165,10 +165,6 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
|
||||
}
|
||||
|
||||
next_stages = {
|
||||
# G2 (PR-G2-2): 전 문서가 presegment → extract. 단, 번들 분할로 'parent' 가 된 문서는
|
||||
# 파일 홀더라 자체 extract 안 함 — 아래 suppression 으로 이 전이를 건너뛴다(자식 extract 는
|
||||
# presegment_worker 가 직접 enqueue). 단일/非PDF 문서(role NULL)는 정상적으로 extract 로 흐름.
|
||||
"presegment": ["extract"],
|
||||
"extract": ["classify", "preview"],
|
||||
"classify": ["embed", "chunk", "markdown"],
|
||||
"stt": ["classify"],
|
||||
@@ -184,18 +180,6 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
|
||||
stages = extract_override_by_channel[sc]
|
||||
else:
|
||||
stages = next_stages.get(current_stage, [])
|
||||
elif current_stage == "presegment":
|
||||
# 번들 분할 parent 는 extract 로 흐르지 않게 억제 (자식이 부모 extract 에 가려지는 것 방지).
|
||||
# role NULL(단일/非PDF) / 'child' 는 정상 전이. presegment_worker 가 자식 extract 를 직접
|
||||
# enqueue 하므로 'parent' 만 여기서 no-op.
|
||||
from models.document import Document
|
||||
async with async_session() as lookup_session:
|
||||
doc = await lookup_session.get(Document, document_id)
|
||||
role = doc.presegment_role if doc else None
|
||||
if role == "parent":
|
||||
stages = []
|
||||
else:
|
||||
stages = next_stages.get(current_stage, [])
|
||||
else:
|
||||
stages = next_stages.get(current_stage, [])
|
||||
|
||||
@@ -215,7 +199,6 @@ def _load_workers():
|
||||
from workers.deep_summary_worker import process as deep_summary_process
|
||||
from workers.embed_worker import process as embed_process
|
||||
from workers.extract_worker import process as extract_process
|
||||
from workers.presegment_worker import process as presegment_process
|
||||
from workers.preview_worker import process as preview_process
|
||||
from workers.stt_worker import process as stt_process
|
||||
from workers.summarize_worker import process as summarize_process
|
||||
@@ -224,8 +207,6 @@ def _load_workers():
|
||||
from workers.fulltext_worker import process as fulltext_process
|
||||
|
||||
return {
|
||||
# G2 (PR-G2-2): extract 前 번들 PDF → N 자식 분할 (deterministic ToC). 非PDF/단일은 통과.
|
||||
"presegment": presegment_process,
|
||||
"extract": extract_process,
|
||||
"classify": classify_process,
|
||||
"summarize": summarize_process,
|
||||
|
||||
@@ -1,135 +0,0 @@
|
||||
# Phase 2A — Embedding candidate compose override (Diagnose only)
|
||||
#
|
||||
# Profile-isolated: `--profile embed-cand` 명시 opt-in. default up 시 미기동.
|
||||
# production fastapi/postgres/reranker 에 영향 0.
|
||||
# 본 PR 종료 시 별 chore (PR-2A-Chunks-Cand-Cleanup-1) 에서 제거.
|
||||
#
|
||||
# 후보 상태 (2026-05-23):
|
||||
# - me5_large_inst : ✅ smoke PASS (dim 1024)
|
||||
# - bge_mgemma2 : ❌ Phase 2A-Extended 별 PR 이관 (9B FP16 → VRAM OOM risk + 다운로드 cost)
|
||||
# - me5_ko : ❌ 폐기 (401 Unauthorized, gated/모델명 부정확)
|
||||
# - snowflake_l_v2 : 신규 추가 (Snowflake/snowflake-arctic-embed-l-v2.0, 2024-12, multilingual 강화)
|
||||
#
|
||||
# 사용:
|
||||
# docker compose -f docker-compose.yml -f docker-compose.override.cand.yml \
|
||||
# --profile embed-cand up -d embedding-cand-me5-inst
|
||||
#
|
||||
# 호출 (DS network 내부):
|
||||
# http://embedding-cand-me5-inst:80/embed
|
||||
# http://embedding-cand-snowflake-l-v2:80/embed
|
||||
|
||||
services:
|
||||
embedding-cand-me5-inst:
|
||||
image: ghcr.io/huggingface/text-embeddings-inference:1.7
|
||||
restart: unless-stopped
|
||||
container_name: hyungi_document_server-embedding-cand-me5-inst-1
|
||||
expose:
|
||||
- "80"
|
||||
environment:
|
||||
- MODEL_ID=intfloat/multilingual-e5-large-instruct
|
||||
- MAX_BATCH_TOKENS=8192
|
||||
- MAX_CONCURRENT_REQUESTS=4
|
||||
volumes:
|
||||
- embedding_cand_me5_inst_cache:/data
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 60s
|
||||
profiles: ["embed-cand"]
|
||||
|
||||
embedding-cand-snowflake-l-v2:
|
||||
image: ghcr.io/huggingface/text-embeddings-inference:1.7
|
||||
restart: unless-stopped
|
||||
container_name: hyungi_document_server-embedding-cand-snowflake-l-v2-1
|
||||
expose:
|
||||
- "80"
|
||||
environment:
|
||||
- MODEL_ID=Snowflake/snowflake-arctic-embed-l-v2.0
|
||||
- MAX_BATCH_TOKENS=8192
|
||||
- MAX_CONCURRENT_REQUESTS=4
|
||||
volumes:
|
||||
- embedding_cand_snowflake_l_v2_cache:/data
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 60s
|
||||
profiles: ["embed-cand"]
|
||||
|
||||
# ===== 비활성 후보 (Phase 2A-Extended 별 PR 이관 또는 폐기) =====
|
||||
# 진단 박제만 보존. 본 PR scope 외.
|
||||
|
||||
embedding-cand-bge-mgemma2:
|
||||
image: ghcr.io/huggingface/text-embeddings-inference:1.7
|
||||
container_name: hyungi_document_server-embedding-cand-bge-mgemma2-1
|
||||
expose:
|
||||
- "80"
|
||||
environment:
|
||||
- MODEL_ID=BAAI/bge-multilingual-gemma2
|
||||
- MAX_BATCH_TOKENS=8192
|
||||
- MAX_CONCURRENT_REQUESTS=4
|
||||
volumes:
|
||||
- embedding_cand_bge_mgemma2_cache:/data
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 300s
|
||||
profiles: ["embed-cand-extended"] # 본 PR 미사용. extended 별 profile.
|
||||
|
||||
embedding-cand-me5-ko:
|
||||
image: ghcr.io/huggingface/text-embeddings-inference:1.7
|
||||
container_name: hyungi_document_server-embedding-cand-me5-ko-1
|
||||
expose:
|
||||
- "80"
|
||||
environment:
|
||||
- MODEL_ID=dragonkue/multilingual-e5-large-ko
|
||||
- MAX_BATCH_TOKENS=8192
|
||||
- MAX_CONCURRENT_REQUESTS=4
|
||||
volumes:
|
||||
- embedding_cand_me5_ko_cache:/data
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 60s
|
||||
profiles: ["embed-cand-disabled"] # 401 fail. 사용 X.
|
||||
|
||||
volumes:
|
||||
embedding_cand_me5_inst_cache:
|
||||
embedding_cand_snowflake_l_v2_cache:
|
||||
embedding_cand_bge_mgemma2_cache:
|
||||
embedding_cand_me5_ko_cache:
|
||||
@@ -1,101 +0,0 @@
|
||||
# Phase 2B — Reranker candidate compose override (Diagnose only)
|
||||
#
|
||||
# Profile-isolated: `--profile rerank-cand` 명시 opt-in. default up 시 미기동.
|
||||
# production fastapi/postgres/reranker(bge-reranker-v2-m3) 에 영향 0.
|
||||
# 본 PR 종료 후 별 chore (PR-2B-Rerank-Cand-Cleanup-1) 에서 제거.
|
||||
#
|
||||
# 후보 상태 (2026-05-23):
|
||||
# - gte_ml_base : Apache 2.0, 305M, smoke 대기
|
||||
# - mxbai_large : Apache 2.0, ~435M, safetensors 부재 — TEI smoke risk
|
||||
# - bge_v2_gemma_2b : Gemma 라이센스, 2.5B FP16 ~5GB, smoke 대기
|
||||
#
|
||||
# 사용:
|
||||
# docker compose -f docker-compose.yml -f docker-compose.override.rerank-cand.yml \
|
||||
# --profile rerank-cand up -d rerank-cand-gte-ml-base
|
||||
|
||||
services:
|
||||
rerank-cand-gte-ml-base:
|
||||
image: ghcr.io/huggingface/text-embeddings-inference:1.7
|
||||
restart: unless-stopped
|
||||
container_name: hyungi_document_server-rerank-cand-gte-ml-base-1
|
||||
expose:
|
||||
- "80"
|
||||
environment:
|
||||
- MODEL_ID=Alibaba-NLP/gte-multilingual-reranker-base
|
||||
- MAX_BATCH_TOKENS=8192
|
||||
- MAX_CONCURRENT_REQUESTS=4
|
||||
volumes:
|
||||
- rerank_cand_gte_ml_base_cache:/data
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 60s
|
||||
profiles: ["rerank-cand"]
|
||||
|
||||
rerank-cand-mxbai-large:
|
||||
image: ghcr.io/huggingface/text-embeddings-inference:1.7
|
||||
restart: unless-stopped
|
||||
container_name: hyungi_document_server-rerank-cand-mxbai-large-1
|
||||
expose:
|
||||
- "80"
|
||||
environment:
|
||||
- MODEL_ID=mixedbread-ai/mxbai-rerank-large-v1
|
||||
- MAX_BATCH_TOKENS=8192
|
||||
- MAX_CONCURRENT_REQUESTS=4
|
||||
volumes:
|
||||
- rerank_cand_mxbai_large_cache:/data
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 60s
|
||||
profiles: ["rerank-cand"]
|
||||
|
||||
rerank-cand-bge-v2-gemma-2b:
|
||||
image: ghcr.io/huggingface/text-embeddings-inference:1.7
|
||||
restart: unless-stopped
|
||||
container_name: hyungi_document_server-rerank-cand-bge-v2-gemma-2b-1
|
||||
expose:
|
||||
- "80"
|
||||
environment:
|
||||
- MODEL_ID=BAAI/bge-reranker-v2-gemma
|
||||
- MAX_BATCH_TOKENS=8192
|
||||
- MAX_CONCURRENT_REQUESTS=2
|
||||
volumes:
|
||||
- rerank_cand_bge_v2_gemma_2b_cache:/data
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 120s
|
||||
profiles: ["rerank-cand"]
|
||||
|
||||
volumes:
|
||||
rerank_cand_gte_ml_base_cache:
|
||||
rerank_cand_mxbai_large_cache:
|
||||
rerank_cand_bge_v2_gemma_2b_cache:
|
||||
+18
-23
@@ -54,27 +54,24 @@ services:
|
||||
start_period: 180s
|
||||
restart: unless-stopped
|
||||
|
||||
# MinerU 2.5 VLM PDF→markdown 추출 — ★ marker-service 대체(컷오버 2026-06-18, A/B 8/8 PASS).
|
||||
# 단일카드 markdown VRAM ~10GB(marker)→~5.9GB 고정. fastapi 가 MARKER_ENDPOINT 로 호출.
|
||||
# 동기 do_parse 버그 회피 위해 server.py 는 async aio_do_parse 사용. 포트 3301.
|
||||
mineru-service:
|
||||
build: ./services/mineru
|
||||
# Phase 1B (2026-05-01): PDF → markdown 변환. ocr-service 와 별도 컨테이너 (deps 충돌 회피).
|
||||
marker-service:
|
||||
build: ./services/marker
|
||||
ports:
|
||||
- "127.0.0.1:3301:3301"
|
||||
- "127.0.0.1:3300:3300"
|
||||
expose:
|
||||
- "3301"
|
||||
- "3300"
|
||||
environment:
|
||||
# vlm-engine = 순수 VLM 단일모델. 기본 hybrid-engine 은 다중모델 로드 = OOM(반드시 명시).
|
||||
- MINERU_BACKEND=vlm-engine
|
||||
- MINERU_LANG=${MINERU_LANG:-korean}
|
||||
# 공유 16GB 카드 공존: 절대 VRAM 캡(GB, 공유카드 robust) + vLLM 분율 캡 병용.
|
||||
- MINERU_VIRTUAL_VRAM_SIZE=${MINERU_VIRTUAL_VRAM_SIZE:-6}
|
||||
- MINERU_GPU_MEMORY_UTILIZATION=${MINERU_GPU_MEMORY_UTILIZATION:-0.40}
|
||||
- MINERU_PRELOAD=${MINERU_PRELOAD:-1}
|
||||
- HF_HOME=/models/huggingface
|
||||
- TORCH_HOME=/models/torch
|
||||
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
|
||||
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
|
||||
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
|
||||
- MARKER_PRELOAD=0
|
||||
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
|
||||
volumes:
|
||||
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
|
||||
- mineru_models:/root/.cache
|
||||
ipc: host # vLLM 공유메모리 — 공식 run 의 --ipc=host 대응.
|
||||
- marker_models:/models
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
@@ -83,11 +80,11 @@ services:
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:3301/ready"]
|
||||
test: ["CMD", "curl", "-f", "http://localhost:3300/ready"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 900s # VLM 모델 lazy 다운로드(~2.4GB)+엔진 로드 여유.
|
||||
start_period: 300s
|
||||
restart: unless-stopped
|
||||
|
||||
stt-service:
|
||||
@@ -187,8 +184,7 @@ services:
|
||||
condition: service_healthy
|
||||
kordoc-service:
|
||||
condition: service_healthy
|
||||
# 마크다운 엔진 = mineru-service (marker-service 제거 2026-06-18, 롤백=git history).
|
||||
mineru-service:
|
||||
marker-service:
|
||||
condition: service_healthy
|
||||
env_file:
|
||||
- credentials.env
|
||||
@@ -196,8 +192,7 @@ services:
|
||||
- DATABASE_URL=postgresql+asyncpg://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
|
||||
- KORDOC_ENDPOINT=http://kordoc-service:3100
|
||||
- OCR_ENDPOINT=http://ocr-service:3200
|
||||
# ★ 컷오버 2026-06-18: marker-service:3300 → mineru-service:3301 (동일 /convert 계약).
|
||||
- MARKER_ENDPOINT=http://mineru-service:3301
|
||||
- MARKER_ENDPOINT=http://marker-service:3300
|
||||
- MARKER_CONTAINER_PATH_PREFIX=/documents
|
||||
# 2026-05-08 (D9 Track B revised): GPU stt-service 정식 승격, 내부 DNS 사용.
|
||||
- STT_ENDPOINT=http://stt-service:3300
|
||||
@@ -275,4 +270,4 @@ volumes:
|
||||
reranker_cache:
|
||||
ocr_models:
|
||||
stt_models:
|
||||
mineru_models:
|
||||
marker_models:
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
-- 362: G2 pre-segmentation — 번들 PDF(여러 논리문서 한 파일) → N 자식 문서 분할.
|
||||
-- 자식 doc 의 원본 내 page 범위(1-based inclusive) + 분할 역할 표식.
|
||||
-- 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from', migration 363).
|
||||
-- presegment_role: NULL=일반 단일문서(대다수) / 'parent'=번들원본(자체 extract/embed 안 함) /
|
||||
-- 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 범위로 슬라이스).
|
||||
-- 단일 ALTER(다중 절) = 1 statement (asyncpg 멀티스테이트먼트 제약 준수).
|
||||
ALTER TABLE documents
|
||||
ADD COLUMN IF NOT EXISTS bundle_page_start INTEGER,
|
||||
ADD COLUMN IF NOT EXISTS bundle_page_end INTEGER,
|
||||
ADD COLUMN IF NOT EXISTS presegment_role TEXT;
|
||||
@@ -1,8 +0,0 @@
|
||||
-- 363: G2 — document_lineage.relation_type 에 'segmented_from'(번들 → 자식) 추가.
|
||||
-- 217 의 column-level CHECK(PG 자동명 document_lineage_relation_type_check, 배포 DB 실측 확인)
|
||||
-- 를 교체. DROP + ADD 를 단일 ALTER 의 두 절로 = 1 statement.
|
||||
-- 멱등: DROP ... IF EXISTS 라 재실행 안전(이미 교체됐으면 새 제약 DROP 후 동일 재생성).
|
||||
ALTER TABLE document_lineage
|
||||
DROP CONSTRAINT IF EXISTS document_lineage_relation_type_check,
|
||||
ADD CONSTRAINT document_lineage_relation_type_check
|
||||
CHECK (relation_type IN ('cited','summarized_from','generated_from','revised_from','segmented_from'));
|
||||
@@ -1,5 +0,0 @@
|
||||
-- 364: G2 — process_stage 큐 스테이지 enum 에 'presegment' 추가 (extract 前 번들 분할 단계).
|
||||
-- PG16: ALTER TYPE ADD VALUE 는 트랜잭션 내 실행 가능(값 추가만, 同 트랜잭션 내 사용은 안 함 —
|
||||
-- 사용은 후속 마이그/런타임). IF NOT EXISTS = 재실행 멱등.
|
||||
-- (이 한 줄 단독 파일 — 1 statement.)
|
||||
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'presegment';
|
||||
@@ -0,0 +1,22 @@
|
||||
FROM python:3.12-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
libgl1 libglib2.0-0 curl \
|
||||
&& apt-get clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir \
|
||||
--extra-index-url https://download.pytorch.org/whl/cu126 \
|
||||
-r requirements.txt
|
||||
|
||||
# 모델 미다운로드 (HF cache volume → 첫 호출/warmup 시 적재).
|
||||
|
||||
COPY server.py .
|
||||
|
||||
EXPOSE 3300
|
||||
HEALTHCHECK --start-period=300s --interval=30s --timeout=10s --retries=3 \
|
||||
CMD curl -f http://localhost:3300/ready || exit 1
|
||||
|
||||
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3300"]
|
||||
@@ -0,0 +1,9 @@
|
||||
torch==2.11.0+cu126
|
||||
torchvision==0.26.0+cu126
|
||||
transformers==4.57.6
|
||||
surya-ocr==0.17.1
|
||||
marker-pdf==1.10.2
|
||||
pymupdf>=1.24.0,<2.0.0
|
||||
fastapi>=0.110.0,<1.0.0
|
||||
uvicorn[standard]>=0.27.0,<1.0.0
|
||||
pillow>=10.0.0,<12.0.0
|
||||
@@ -0,0 +1,325 @@
|
||||
"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64.
|
||||
|
||||
Phase 1B (2026-05-01) — 텍스트만 응답, 이미지 폐기.
|
||||
Phase 1B.5 — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
|
||||
없는 stateless 변환기 유지 (fastapi 가 NAS persist 담당).
|
||||
D-1 (plan crawl-24x7-1, 2026-06-10) — idle-unload 운영 전환:
|
||||
MARKER_PRELOAD=0 : startup warmup 끔 (첫 /convert 시 lazy load)
|
||||
MARKER_IDLE_UNLOAD_MINUTES : N분 유휴 시 모델 해제 (0=비활성, 기존 동작)
|
||||
/ready 는 idle(미적재)에서도 200 — fastapi 의 depends_on service_healthy 가
|
||||
lazy 모드에서 영구 미기동으로 굳는 것 방지. 503 은 warmup_failed 한정.
|
||||
|
||||
plan: ~/.claude/plans/piped-humming-crystal.md
|
||||
"""
|
||||
import base64
|
||||
import gc
|
||||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI, HTTPException, Response
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from marker.converters.pdf import PdfConverter
|
||||
from marker.models import create_model_dict
|
||||
from marker.output import text_from_rendered
|
||||
import marker as marker_module
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
app = FastAPI()
|
||||
|
||||
os.environ.setdefault("HF_HOME", "/models/huggingface")
|
||||
os.environ.setdefault("TORCH_HOME", "/models/torch")
|
||||
|
||||
_models = None
|
||||
_converter = None
|
||||
try:
|
||||
import importlib.metadata
|
||||
_engine_version = importlib.metadata.version("marker-pdf")
|
||||
except Exception:
|
||||
_engine_version = "unknown"
|
||||
_warmup_done = False
|
||||
_warmup_error: str | None = None
|
||||
_warmup_lock = threading.Lock()
|
||||
|
||||
# D-1 idle-unload 상태 — 전이는 전부 _warmup_lock 아래
|
||||
_PRELOAD = os.getenv("MARKER_PRELOAD", "1") != "0"
|
||||
_IDLE_UNLOAD_MINUTES = int(os.getenv("MARKER_IDLE_UNLOAD_MINUTES", "0"))
|
||||
_inflight = 0
|
||||
_last_used = time.monotonic()
|
||||
|
||||
# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시
|
||||
# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답.
|
||||
MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200"))
|
||||
# per-image 최대 raw bytes (base64 전). 그래픽이 많은 풀페이지 스캔 회피.
|
||||
MAX_BYTES_PER_IMAGE = int(os.getenv("MARKER_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
|
||||
|
||||
|
||||
def _ensure_warmup() -> None:
|
||||
"""첫 /convert 또는 startup hook 시 모델 로드. HF cache volume 활용."""
|
||||
global _models, _converter, _warmup_done, _warmup_error
|
||||
if _warmup_done:
|
||||
return
|
||||
with _warmup_lock:
|
||||
if _warmup_done:
|
||||
return
|
||||
try:
|
||||
logger.info("[marker-service] warmup start")
|
||||
_models = create_model_dict()
|
||||
_converter = PdfConverter(artifact_dict=_models)
|
||||
_warmup_done = True
|
||||
_warmup_error = None
|
||||
logger.info(f"[marker-service] warmup done engine_version={_engine_version}")
|
||||
except Exception as exc:
|
||||
_warmup_error = f"{type(exc).__name__}: {exc}"
|
||||
logger.exception("[marker-service] warmup failed")
|
||||
raise
|
||||
|
||||
|
||||
def _acquire_models():
|
||||
"""warmup 보장 + inflight 진입을 원자적으로 — ensure 직후 reaper 가 해제하는 경합 차단."""
|
||||
global _inflight
|
||||
while True:
|
||||
_ensure_warmup()
|
||||
with _warmup_lock:
|
||||
if _warmup_done:
|
||||
_inflight += 1
|
||||
return
|
||||
# ensure 와 lock 재진입 사이에 unload 가 끼어든 희귀 경합 — 재시도
|
||||
|
||||
|
||||
def _release_models():
|
||||
global _inflight, _last_used
|
||||
with _warmup_lock:
|
||||
_inflight -= 1
|
||||
_last_used = time.monotonic()
|
||||
|
||||
|
||||
def _maybe_unload() -> None:
|
||||
"""유휴 시 모델 해제. 변환 중(inflight>0)이면 절대 해제하지 않는다.
|
||||
|
||||
split 변환의 배치 사이 간격은 초 단위 — N>=1분 임계면 배치 사이 해제 없음.
|
||||
"""
|
||||
global _models, _converter, _warmup_done
|
||||
with _warmup_lock:
|
||||
if not _warmup_done or _inflight > 0:
|
||||
return
|
||||
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
|
||||
return
|
||||
_models = None
|
||||
_converter = None
|
||||
_warmup_done = False
|
||||
gc.collect()
|
||||
try:
|
||||
import torch
|
||||
torch.cuda.empty_cache()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info(f"[marker-service] idle-unload: 모델 해제 (유휴 {_IDLE_UNLOAD_MINUTES}분 초과)")
|
||||
|
||||
|
||||
async def _idle_reaper():
|
||||
import asyncio
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
try:
|
||||
_maybe_unload()
|
||||
except Exception:
|
||||
logger.exception("[marker-service] idle reaper 오류")
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup():
|
||||
"""startup hook — warmup 은 MARKER_PRELOAD 게이트 (D-1: lazy 기본 전환은 compose 가)."""
|
||||
import asyncio
|
||||
if _PRELOAD:
|
||||
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
|
||||
if _IDLE_UNLOAD_MINUTES > 0:
|
||||
asyncio.create_task(_idle_reaper())
|
||||
logger.info(f"[marker-service] idle-unload 활성: {_IDLE_UNLOAD_MINUTES}분")
|
||||
|
||||
|
||||
class ConvertRequest(BaseModel):
|
||||
file_path: str
|
||||
max_pages: int | None = None
|
||||
# page range (1-based inclusive) — LargeDoc split 변환용. marker 내부 0-based 변환은
|
||||
# convert() 에 격리 (page numbering invariant: DB/API=1-based, marker=0-based).
|
||||
start_page: int | None = None
|
||||
end_page: int | None = None
|
||||
|
||||
|
||||
class ConvertImage(BaseModel):
|
||||
"""marker 추출 이미지 1건. fastapi 가 NAS 에 쓰고 docimg:img_NNN 으로 ref 정규화."""
|
||||
slug: str # marker 원본 slug (예: '_page_0_Picture_3.jpeg')
|
||||
format: str # 'png' | 'jpeg' | 'webp' | 'gif'
|
||||
width: int | None = None
|
||||
height: int | None = None
|
||||
bytes_b64: str # base64-encoded raw bytes
|
||||
|
||||
|
||||
class ConvertResponse(BaseModel):
|
||||
md_content: str
|
||||
md_content_hash: str
|
||||
engine: str
|
||||
engine_version: str
|
||||
elapsed_ms: int
|
||||
raw_metrics: dict
|
||||
images: list[ConvertImage] = Field(default_factory=list)
|
||||
images_truncated: bool = False
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
def health():
|
||||
return {"status": "ok", "service": "marker-service"}
|
||||
|
||||
|
||||
@app.get("/ready")
|
||||
async def ready(response: Response):
|
||||
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출.
|
||||
|
||||
D-1: idle(미적재) = 200. 503 은 warmup_failed 한정 — lazy 모드에서 fastapi
|
||||
depends_on service_healthy 가 영구 미기동으로 굳지 않게. 배포 검증에서
|
||||
'status=ready' 단언하던 runbook 은 강제 warm 호출(/convert 1건)로 대체.
|
||||
"""
|
||||
if _warmup_error:
|
||||
response.status_code = 503
|
||||
return {
|
||||
"status": "warmup_failed",
|
||||
"engine": "marker",
|
||||
"engine_version": _engine_version,
|
||||
"error": _warmup_error,
|
||||
}
|
||||
if not _warmup_done:
|
||||
return {
|
||||
"status": "warming_up" if _PRELOAD else "idle",
|
||||
"engine": "marker",
|
||||
"engine_version": _engine_version,
|
||||
"models_loaded": False,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
}
|
||||
return {
|
||||
"status": "ready",
|
||||
"engine": "marker",
|
||||
"engine_version": _engine_version,
|
||||
"models_loaded": True,
|
||||
"inflight": _inflight,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
}
|
||||
|
||||
|
||||
@app.post("/convert", response_model=ConvertResponse)
|
||||
async def convert(req: ConvertRequest):
|
||||
p = Path(req.file_path)
|
||||
if not p.is_file():
|
||||
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
|
||||
if req.start_page is not None and req.end_page is not None:
|
||||
if req.start_page < 1 or req.end_page < req.start_page:
|
||||
raise HTTPException(
|
||||
422,
|
||||
detail={
|
||||
"code": "bad_page_range",
|
||||
"message": f"start_page={req.start_page} end_page={req.end_page}",
|
||||
},
|
||||
)
|
||||
|
||||
# D-1: warmup 보장 + inflight 진입 원자화 — 변환 중 reaper 해제 차단. 해제는 finally.
|
||||
_acquire_models()
|
||||
try:
|
||||
start = time.monotonic()
|
||||
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
|
||||
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
|
||||
converter = _converter
|
||||
if req.start_page is not None and req.end_page is not None:
|
||||
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
|
||||
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
|
||||
try:
|
||||
rendered = converter(str(p))
|
||||
except Exception as exc:
|
||||
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
|
||||
raise HTTPException(
|
||||
status_code=422,
|
||||
detail={
|
||||
"code": "conversion_failed",
|
||||
"message": f"{type(exc).__name__}: {exc}",
|
||||
},
|
||||
) from exc
|
||||
|
||||
md_text, _meta, raw_images = text_from_rendered(rendered)
|
||||
elapsed_ms = int((time.monotonic() - start) * 1000)
|
||||
finally:
|
||||
_release_models()
|
||||
|
||||
images_payload, truncated = _serialize_images(raw_images, str(p))
|
||||
|
||||
return ConvertResponse(
|
||||
md_content=md_text,
|
||||
md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
|
||||
engine="marker",
|
||||
engine_version=_engine_version,
|
||||
elapsed_ms=elapsed_ms,
|
||||
raw_metrics={
|
||||
"page_count": getattr(rendered, "page_count", None),
|
||||
"image_count_extracted": len(raw_images) if raw_images else 0,
|
||||
"image_count_returned": len(images_payload),
|
||||
},
|
||||
images=images_payload,
|
||||
images_truncated=truncated,
|
||||
)
|
||||
|
||||
|
||||
def _serialize_images(raw_images, src_path: str) -> tuple[list[ConvertImage], bool]:
|
||||
"""marker 의 `_images` (dict[slug, PIL.Image]) → base64 ConvertImage 리스트.
|
||||
|
||||
가드:
|
||||
- MAX_IMAGES_PER_DOC 초과 시 head 만 반환 + truncated=True
|
||||
- per-image 직렬화 실패 시 해당 이미지만 skip + warn (전체 fail 안 함)
|
||||
- per-image 결과 byte 크기가 MAX_BYTES_PER_IMAGE 초과 시 skip + warn
|
||||
"""
|
||||
if not raw_images:
|
||||
return [], False
|
||||
|
||||
items = list(raw_images.items())
|
||||
truncated = len(items) > MAX_IMAGES_PER_DOC
|
||||
if truncated:
|
||||
logger.warning(
|
||||
f"[marker-service] images truncated path={src_path} "
|
||||
f"total={len(items)} cap={MAX_IMAGES_PER_DOC}"
|
||||
)
|
||||
items = items[:MAX_IMAGES_PER_DOC]
|
||||
|
||||
out: list[ConvertImage] = []
|
||||
for slug, pil_img in items:
|
||||
try:
|
||||
fmt_raw = (pil_img.format or "PNG").upper()
|
||||
# WebP/GIF 도 marker 가 emit 가능하지만 본 1B.5 기준은 PNG/JPEG 우선.
|
||||
# 알 수 없는 포맷이면 PNG 로 강제 (lossless re-encode).
|
||||
fmt = fmt_raw if fmt_raw in {"PNG", "JPEG", "WEBP", "GIF"} else "PNG"
|
||||
buf = io.BytesIO()
|
||||
pil_img.save(buf, format=fmt)
|
||||
raw_bytes = buf.getvalue()
|
||||
if len(raw_bytes) > MAX_BYTES_PER_IMAGE:
|
||||
logger.warning(
|
||||
f"[marker-service] image too large skipped path={src_path} "
|
||||
f"slug={slug} bytes={len(raw_bytes)} cap={MAX_BYTES_PER_IMAGE}"
|
||||
)
|
||||
continue
|
||||
out.append(
|
||||
ConvertImage(
|
||||
slug=slug,
|
||||
format=fmt.lower(),
|
||||
width=pil_img.width,
|
||||
height=pil_img.height,
|
||||
bytes_b64=base64.b64encode(raw_bytes).decode("ascii"),
|
||||
)
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"[marker-service] image serialize failed path={src_path} "
|
||||
f"slug={slug}: {type(exc).__name__}: {exc}"
|
||||
)
|
||||
continue
|
||||
return out, truncated
|
||||
@@ -1,45 +0,0 @@
|
||||
# mineru-service — MinerU 2.5 VLM 기반 PDF→markdown 추출기. marker-service 대체.
|
||||
# 단일카드(RTX 4070 Ti S 16GB→PRO 4000 24GB) markdown VRAM ~10GB(marker)→~5GB(MinerU VLM).
|
||||
#
|
||||
# 공식 opendatalab/MinerU global Dockerfile 기반:
|
||||
# FROM vllm/vllm-openai:v0.21.0 (CUDA 13.0). GPU 호스트 드라이버 595.71.05 / CUDA 13.2 가
|
||||
# 13.0 런타임 지원 → cu129 폴백 불필요. vLLM 은 base 이미지가 제공하므로 mineru 는 [core] 만.
|
||||
#
|
||||
# 모델은 이미지에 굽지 않고 런타임 warmup 시 HF cache 볼륨으로 lazy 다운로드 (marker/ocr 선례 =
|
||||
# 서버 .cache 볼륨). 이미지 슬림 유지 + server.py 반복 빌드 빠름 + 모델 볼륨 영속.
|
||||
FROM vllm/vllm-openai:v0.21.0
|
||||
|
||||
# base 이미지의 ENTRYPOINT(vLLM OpenAI 서버)를 제거 — 우리는 uvicorn 으로 자체 FastAPI 기동.
|
||||
ENTRYPOINT []
|
||||
|
||||
# opencv(libgl) + CJK 폰트(레이아웃/렌더 안전) + curl(healthcheck). 공식 Dockerfile 동일.
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
fonts-noto-core fonts-noto-cjk fontconfig libgl1 curl \
|
||||
&& fc-cache -fv \
|
||||
&& apt-get clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# mineru[core] — 공식 설치 라인. vLLM(vlm-engine 백엔드)은 base 가 이미 제공.
|
||||
RUN python3 -m pip install -U 'mineru[core]>=3.2.1' --break-system-packages \
|
||||
&& python3 -m pip cache purge
|
||||
|
||||
# 서비스 wrapper 의존성. base(vllm-openai)+mineru 가 fastapi/uvicorn/pillow 를 이미 제공 →
|
||||
# pymupdf 만 추가(나머지 명시 핀은 base 의 pillow 12.x 를 불필요하게 다운그레이드해서 제거).
|
||||
RUN python3 -m pip install --no-cache-dir --break-system-packages \
|
||||
'pymupdf>=1.24.0,<2.0.0'
|
||||
|
||||
# MINERU_MODEL_SOURCE=huggingface = warmup 시 lazy 다운로드 (HF cache 볼륨에 영속).
|
||||
# PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True = 단편화 완화(연구 권고, 거대 입력 OOM 완충).
|
||||
ENV MINERU_MODEL_SOURCE=huggingface \
|
||||
HF_HOME=/root/.cache/huggingface \
|
||||
PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
|
||||
|
||||
WORKDIR /app
|
||||
# server.py = 무거운 pip 레이어 뒤에 COPY → 반복 빌드 시 캐시 적중(빠른 재빌드).
|
||||
COPY server.py /app/server.py
|
||||
|
||||
EXPOSE 3301
|
||||
# VLM 모델 lazy 다운로드(~2.4GB)+엔진 로드 여유로 start-period 길게.
|
||||
HEALTHCHECK --start-period=900s --interval=30s --timeout=10s --retries=3 \
|
||||
CMD curl -f http://localhost:3301/ready || exit 1
|
||||
|
||||
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3301"]
|
||||
@@ -1,315 +0,0 @@
|
||||
"""mineru-service — POST /convert: PDF → markdown + 추출 이미지 base64.
|
||||
|
||||
marker-service 대체(MinerU 2.5 VLM). **marker 의 /convert 계약을 그대로 복제**해서
|
||||
marker_worker 가 엔드포인트만 바꾸면 되도록 한다(요청/응답 동일 shape):
|
||||
|
||||
요청: {file_path, max_pages?, start_page?, end_page?} (page = 1-based inclusive)
|
||||
응답: {md_content, md_content_hash, engine, engine_version, elapsed_ms,
|
||||
raw_metrics, images:[{slug, format, width, height, bytes_b64}], images_truncated}
|
||||
|
||||
설계 노트:
|
||||
- **page range 는 PyMuPDF 로 직접 슬라이스**해서 MinerU 에 넘긴다(start_page..end_page →
|
||||
0-based [a,b] 페이지만 담은 새 PDF bytes). MinerU 의 `end_page_id=0 falsy 무시` 버그 회피.
|
||||
40p 윈도우 분할은 marker_worker 가 그대로 담당. (검증: fitz 슬라이스 렌더 = 원본과 동일 품질.)
|
||||
- **★ 반드시 async 엔진(`aio_do_parse`) 사용.** 동기 `do_parse`(vllm-engine sync)는 본 모델
|
||||
(MinerU2.5-Pro-2605-1.2B)에서 layout 토큰 malformed → 빈 md 산출(실측 G1-2). async
|
||||
(`aio_do_parse` = vllm-async-engine, mineru CLI 가 쓰는 정상 경로) = 정상 출력.
|
||||
- **이미지 = stateless**: marker 처럼 NAS write 안 함. MinerU 가 md 에 박는 ``
|
||||
href 를 그대로 slug 으로 반환 → fastapi(marker_worker)의 `_rewrite_image_refs` 가 basename
|
||||
매칭으로 `docimg:img_NNN` 정규화 + NAS persist. (계약 무변)
|
||||
- **VRAM 캡**: `MINERU_GPU_MEMORY_UTILIZATION`(vLLM 분율, 0.40→~6GB 실측). compose 의
|
||||
`MINERU_VIRTUAL_VRAM_SIZE` 도 무해(실측 정상)하나 출력엔 무관 — 캡은 분율로 충분.
|
||||
backend=`vlm-engine`(기본 hybrid-engine 은 다중모델 로드 OOM, 반드시 명시).
|
||||
|
||||
엔진은 첫 변환(또는 startup warmup) 시 1회 로드 — MinerU ModelSingleton 캐시. 단일 GPU 라
|
||||
변환은 _engine_lock 으로 직렬화.
|
||||
"""
|
||||
import asyncio
|
||||
import base64
|
||||
import hashlib
|
||||
import inspect
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
|
||||
import fitz # PyMuPDF — page 슬라이스 + 페이지수
|
||||
from fastapi import FastAPI, HTTPException, Response
|
||||
from PIL import Image
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
logger = logging.getLogger("mineru-service")
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
app = FastAPI()
|
||||
|
||||
try:
|
||||
import importlib.metadata
|
||||
_engine_version = importlib.metadata.version("mineru")
|
||||
except Exception:
|
||||
_engine_version = "unknown"
|
||||
|
||||
# ---- 설정 (compose env 로 override) -----------------------------------------
|
||||
MINERU_BACKEND = os.getenv("MINERU_BACKEND", "vlm-engine")
|
||||
MINERU_LANG = os.getenv("MINERU_LANG", "korean")
|
||||
GPU_MEM_UTIL = float(os.getenv("MINERU_GPU_MEMORY_UTILIZATION", "0.40"))
|
||||
|
||||
MAX_IMAGES_PER_DOC = int(os.getenv("MINERU_MAX_IMAGES_PER_DOC", "200"))
|
||||
MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
|
||||
MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치
|
||||
|
||||
_PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0"
|
||||
|
||||
# ---- 엔진 상태 ---------------------------------------------------------------
|
||||
_warmup_done = False
|
||||
_warmup_error: str | None = None
|
||||
# 단일 GPU async 엔진 — warmup + convert 직렬화(엔진 1개, 임시디렉토리/싱글톤 경합 차단).
|
||||
_engine_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]:
|
||||
"""슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.**
|
||||
|
||||
호출자(_ensure_warmup / convert)가 _engine_lock 을 잡은 상태로 호출한다.
|
||||
이미지 dict: {slug, format, width, height, raw_bytes}. slug = md href 그대로.
|
||||
"""
|
||||
import glob
|
||||
import tempfile
|
||||
|
||||
from mineru.cli.common import aio_do_parse
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="mineru_") as td:
|
||||
candidate = {
|
||||
"output_dir": td,
|
||||
"pdf_file_names": ["doc"],
|
||||
"pdf_bytes_list": [pdf_bytes],
|
||||
"p_lang_list": [lang],
|
||||
"backend": MINERU_BACKEND,
|
||||
"formula_enable": True,
|
||||
"table_enable": True,
|
||||
"f_dump_md": True,
|
||||
"f_dump_content_list": True,
|
||||
"f_dump_middle_json": False,
|
||||
"f_dump_model_output": False,
|
||||
"f_dump_orig_pdf": False,
|
||||
"f_draw_layout_bbox": False,
|
||||
"f_draw_span_bbox": False,
|
||||
"gpu_memory_utilization": GPU_MEM_UTIL,
|
||||
}
|
||||
sig = inspect.signature(aio_do_parse)
|
||||
has_var_kw = any(
|
||||
p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
|
||||
)
|
||||
kwargs = candidate if has_var_kw else {
|
||||
k: v for k, v in candidate.items() if k in sig.parameters
|
||||
}
|
||||
await aio_do_parse(**kwargs)
|
||||
|
||||
md_files = sorted(glob.glob(f"{td}/**/*.md", recursive=True))
|
||||
if not md_files:
|
||||
raise RuntimeError("mineru produced no markdown output")
|
||||
md_path = Path(md_files[0])
|
||||
md_text = md_path.read_text(encoding="utf-8", errors="replace")
|
||||
|
||||
images: list[dict] = []
|
||||
img_dir = md_path.parent / "images"
|
||||
if img_dir.is_dir():
|
||||
for img_file in sorted(img_dir.iterdir()):
|
||||
if not img_file.is_file():
|
||||
continue
|
||||
raw = img_file.read_bytes()
|
||||
slug = f"images/{img_file.name}" # md href 와 정확히 일치
|
||||
w = h = None
|
||||
try:
|
||||
with Image.open(io.BytesIO(raw)) as im:
|
||||
w, h = im.width, im.height
|
||||
fmt = (im.format or "JPEG").lower()
|
||||
except Exception:
|
||||
fmt = img_file.suffix.lstrip(".").lower() or "jpeg"
|
||||
images.append(
|
||||
{"slug": slug, "format": fmt, "width": w, "height": h, "raw_bytes": raw}
|
||||
)
|
||||
return md_text, images
|
||||
|
||||
|
||||
async def _ensure_warmup() -> None:
|
||||
"""첫 /convert 또는 startup hook 시 1-page 합성 PDF 로 엔진+모델 적재."""
|
||||
global _warmup_done, _warmup_error
|
||||
if _warmup_done:
|
||||
return
|
||||
async with _engine_lock:
|
||||
if _warmup_done:
|
||||
return
|
||||
try:
|
||||
logger.info("[mineru-service] warmup start (async engine load + model fetch)")
|
||||
doc = fitz.open()
|
||||
page = doc.new_page()
|
||||
page.insert_text((72, 72), "MinerU warmup.")
|
||||
warmup_bytes = doc.tobytes()
|
||||
doc.close()
|
||||
await _run_mineru(warmup_bytes, MINERU_LANG)
|
||||
_warmup_done = True
|
||||
_warmup_error = None
|
||||
logger.info(f"[mineru-service] warmup done engine_version={_engine_version}")
|
||||
except Exception as exc:
|
||||
_warmup_error = f"{type(exc).__name__}: {exc}"
|
||||
logger.exception("[mineru-service] warmup failed")
|
||||
raise
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup():
|
||||
if _PRELOAD:
|
||||
asyncio.create_task(_ensure_warmup())
|
||||
|
||||
|
||||
# ---- 계약 모델 (marker 와 동일 shape) ----------------------------------------
|
||||
class ConvertRequest(BaseModel):
|
||||
file_path: str
|
||||
max_pages: int | None = None
|
||||
start_page: int | None = None # 1-based inclusive
|
||||
end_page: int | None = None # 1-based inclusive
|
||||
|
||||
|
||||
class ConvertImage(BaseModel):
|
||||
slug: str
|
||||
format: str
|
||||
width: int | None = None
|
||||
height: int | None = None
|
||||
bytes_b64: str
|
||||
|
||||
|
||||
class ConvertResponse(BaseModel):
|
||||
md_content: str
|
||||
md_content_hash: str
|
||||
engine: str
|
||||
engine_version: str
|
||||
elapsed_ms: int
|
||||
raw_metrics: dict
|
||||
images: list[ConvertImage] = Field(default_factory=list)
|
||||
images_truncated: bool = False
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
def health():
|
||||
return {"status": "ok", "service": "mineru-service"}
|
||||
|
||||
|
||||
@app.get("/ready")
|
||||
async def ready(response: Response):
|
||||
"""marker /ready 의미 복제: warmup_failed 만 503, idle/warming=200(depends_on 굳음 방지)."""
|
||||
if _warmup_error:
|
||||
response.status_code = 503
|
||||
return {"status": "warmup_failed", "engine": "mineru",
|
||||
"engine_version": _engine_version, "error": _warmup_error}
|
||||
if not _warmup_done:
|
||||
return {"status": "warming_up" if _PRELOAD else "idle", "engine": "mineru",
|
||||
"engine_version": _engine_version, "models_loaded": False}
|
||||
return {"status": "ready", "engine": "mineru",
|
||||
"engine_version": _engine_version, "models_loaded": True}
|
||||
|
||||
|
||||
def _resolve_path(file_path: str) -> Path | None:
|
||||
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. ocr/server.py 와 동일 패턴
|
||||
(필수 — 한글명 파일은 NFS=NFD 저장이라 DB 의 NFC 경로로는 is_file=False)."""
|
||||
for c in (file_path,
|
||||
unicodedata.normalize("NFD", file_path),
|
||||
unicodedata.normalize("NFC", file_path)):
|
||||
p = Path(c)
|
||||
if p.exists():
|
||||
return p
|
||||
parent = Path(file_path).parent
|
||||
if parent.exists():
|
||||
target = unicodedata.normalize("NFC", Path(file_path).name)
|
||||
for child in parent.iterdir():
|
||||
if unicodedata.normalize("NFC", child.name) == target:
|
||||
return child
|
||||
return None
|
||||
|
||||
|
||||
def _slice_pdf(src_path: Path, start_page: int | None, end_page: int | None,
|
||||
max_pages: int | None) -> tuple[bytes, int]:
|
||||
"""요청 page 범위(1-based inclusive)만 담은 새 PDF bytes + 변환 페이지수 반환."""
|
||||
with fitz.open(src_path) as src:
|
||||
n = src.page_count
|
||||
if start_page is not None and end_page is not None:
|
||||
a = max(0, start_page - 1)
|
||||
b = min(n - 1, end_page - 1)
|
||||
else:
|
||||
a = 0
|
||||
cap = max_pages if max_pages is not None else MAX_PAGES_HARD
|
||||
b = min(n - 1, cap - 1)
|
||||
if b < a:
|
||||
raise HTTPException(422, detail={"code": "bad_page_range",
|
||||
"message": f"a={a} b={b} n={n}"})
|
||||
out = fitz.open()
|
||||
out.insert_pdf(src, from_page=a, to_page=b)
|
||||
pdf_bytes = out.tobytes()
|
||||
out.close()
|
||||
return pdf_bytes, (b - a + 1)
|
||||
|
||||
|
||||
def _serialize_images(images: list[dict], src_path: str) -> tuple[list[ConvertImage], bool]:
|
||||
"""이미지 dict 리스트 → base64 ConvertImage 리스트 (marker 가드 동일)."""
|
||||
truncated = len(images) > MAX_IMAGES_PER_DOC
|
||||
if truncated:
|
||||
logger.warning(f"[mineru-service] images truncated path={src_path} "
|
||||
f"total={len(images)} cap={MAX_IMAGES_PER_DOC}")
|
||||
images = images[:MAX_IMAGES_PER_DOC]
|
||||
out: list[ConvertImage] = []
|
||||
for img in images:
|
||||
raw = img["raw_bytes"]
|
||||
if len(raw) > MAX_BYTES_PER_IMAGE:
|
||||
logger.warning(f"[mineru-service] image too large skipped path={src_path} "
|
||||
f"slug={img['slug']} bytes={len(raw)} cap={MAX_BYTES_PER_IMAGE}")
|
||||
continue
|
||||
out.append(ConvertImage(
|
||||
slug=img["slug"], format=img["format"],
|
||||
width=img.get("width"), height=img.get("height"),
|
||||
bytes_b64=base64.b64encode(raw).decode("ascii"),
|
||||
))
|
||||
return out, truncated
|
||||
|
||||
|
||||
@app.post("/convert", response_model=ConvertResponse)
|
||||
async def convert(req: ConvertRequest):
|
||||
p = _resolve_path(req.file_path)
|
||||
if p is None or not p.is_file():
|
||||
raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path})
|
||||
if req.start_page is not None and req.end_page is not None:
|
||||
if req.start_page < 1 or req.end_page < req.start_page:
|
||||
raise HTTPException(422, detail={"code": "bad_page_range",
|
||||
"message": f"start_page={req.start_page} end_page={req.end_page}"})
|
||||
|
||||
pdf_bytes, page_count = _slice_pdf(p, req.start_page, req.end_page, req.max_pages)
|
||||
|
||||
await _ensure_warmup() # 엔진 로드 보장(내부에서 _engine_lock 잡았다 놓음)
|
||||
async with _engine_lock: # 실제 변환 직렬화(단일 GPU)
|
||||
start = time.monotonic()
|
||||
try:
|
||||
md_text, raw_images = await _run_mineru(pdf_bytes, MINERU_LANG)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.exception(f"[mineru-service] conversion failed path={p}: {exc}")
|
||||
raise HTTPException(422, detail={"code": "conversion_failed",
|
||||
"message": f"{type(exc).__name__}: {exc}"}) from exc
|
||||
elapsed_ms = int((time.monotonic() - start) * 1000)
|
||||
|
||||
images_payload, truncated = _serialize_images(raw_images, str(p))
|
||||
|
||||
return ConvertResponse(
|
||||
md_content=md_text,
|
||||
md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
|
||||
engine="mineru",
|
||||
engine_version=_engine_version,
|
||||
elapsed_ms=elapsed_ms,
|
||||
raw_metrics={
|
||||
"page_count": page_count,
|
||||
"image_count_extracted": len(raw_images),
|
||||
"image_count_returned": len(images_payload),
|
||||
},
|
||||
images=images_payload,
|
||||
images_truncated=truncated,
|
||||
)
|
||||
@@ -1394,7 +1394,7 @@ def main() -> int:
|
||||
"--reranker-backend",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Phase 2B Diagnose reranker dispatcher slug (baseline | cand_gte_ml_base). 미지정 = production.",
|
||||
help="Phase 2B Diagnose reranker dispatcher slug (baseline). 후보 cand_gte_ml_base = NO-GO 종결·teardown(2026-06-18). 미지정 = production.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--rewrite-backend",
|
||||
|
||||
@@ -1,400 +0,0 @@
|
||||
"""PR-G2-3 — presegment LLM 경계 폴백 단위 테스트.
|
||||
|
||||
scaffold-first 안전성 박제:
|
||||
(a) parse_json_response + SegmentationOutput 가 대표 fixture(ToC-less 120p → 3 segments) 검증
|
||||
(b) 검증 게이트(_is_clear_bundle)가 정상 응답 수락 / 비정상(중첩·gap·tiny child·N>MAX) 거부
|
||||
(c) flag OFF(기본) → LLM 절대 호출 안 함(call_deep count==0), flag ON → 호출됨(positive control)
|
||||
|
||||
DB·PyMuPDF 불요(unit) — AsyncSession 은 최소 fake, fitz 는 sys.modules 주입 fake.
|
||||
라이브 LLM 호출 없음(call_deep 는 fixture 반환 monkeypatch). worker-process 레벨 E2E(실 PDF
|
||||
번들 분할, 보류 백오프 DB 기록)는 GPU 라이브 게이트에서 별도 실측.
|
||||
[[feedback_external_api_fixture_first]] / [[feedback_scaffold_first_for_external_cost_pr]]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "app"))
|
||||
|
||||
from ai.client import parse_json_response # noqa: E402
|
||||
import workers.presegment_worker as pw # noqa: E402
|
||||
from workers.presegment_worker import ( # noqa: E402
|
||||
SegmentationOutput,
|
||||
_is_clear_bundle,
|
||||
_segments_from_output,
|
||||
)
|
||||
|
||||
# ─── 대표 fixture: ToC-less 120p 번들 → 3 segments (1-based inclusive, 전범위·무중첩) ───
|
||||
GOOD_LLM_JSON = json.dumps(
|
||||
{
|
||||
"is_bundle": True,
|
||||
"segments": [
|
||||
{"start_page": 1, "end_page": 40, "title": "문서 A"},
|
||||
{"start_page": 41, "end_page": 85, "title": "문서 B"},
|
||||
{"start_page": 86, "end_page": 120, "title": "문서 C"},
|
||||
],
|
||||
"confidence": 0.82,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
PAGE_COUNT = 120
|
||||
|
||||
|
||||
# ─── (a) parse_json_response + SegmentationOutput 검증 ──────────────────────
|
||||
|
||||
|
||||
def test_parse_and_validate_good_fixture():
|
||||
parsed = parse_json_response(GOOD_LLM_JSON)
|
||||
assert parsed is not None
|
||||
out = SegmentationOutput.model_validate(parsed)
|
||||
assert out.is_bundle is True
|
||||
assert len(out.segments) == 3
|
||||
assert out.segments[0].start_page == 1
|
||||
assert out.segments[-1].end_page == PAGE_COUNT
|
||||
assert out.confidence == pytest.approx(0.82)
|
||||
|
||||
|
||||
def test_parse_tolerates_think_and_fence():
|
||||
"""house parse_json_response 가 <think> + ```json fence 를 벗겨낸다."""
|
||||
wrapped = f"<think>분석중...</think>\n```json\n{GOOD_LLM_JSON}\n```"
|
||||
parsed = parse_json_response(wrapped)
|
||||
out = SegmentationOutput.model_validate(parsed)
|
||||
assert out.is_bundle is True and len(out.segments) == 3
|
||||
|
||||
|
||||
# ─── (b) 검증 게이트 accept / reject ────────────────────────────────────────
|
||||
|
||||
|
||||
def _segments(*spans):
|
||||
return [{"start_page": s, "end_page": e, "title": ""} for (s, e) in spans]
|
||||
|
||||
|
||||
def test_gate_accepts_good():
|
||||
out = SegmentationOutput.model_validate(parse_json_response(GOOD_LLM_JSON))
|
||||
segs = _segments_from_output(out)
|
||||
clear, reason = _is_clear_bundle(segs, PAGE_COUNT)
|
||||
assert clear is True, reason
|
||||
assert reason == ""
|
||||
|
||||
|
||||
def test_gate_rejects_overlap():
|
||||
# 41 이어야 할 두번째 start 가 40 으로 중첩
|
||||
clear, reason = _is_clear_bundle(_segments((1, 40), (40, 85), (86, 120)), PAGE_COUNT)
|
||||
assert clear is False
|
||||
assert "non_contiguous" in reason
|
||||
|
||||
|
||||
def test_gate_rejects_gap():
|
||||
# 40 다음이 42 로 시작 → 41 빈틈 (non_contiguous 로 검출)
|
||||
clear, reason = _is_clear_bundle(_segments((1, 40), (42, 85), (86, 120)), PAGE_COUNT)
|
||||
assert clear is False
|
||||
assert "non_contiguous" in reason
|
||||
|
||||
|
||||
def test_gate_rejects_tiny_child():
|
||||
# 두번째 자식 41..43 = 3p < MIN_CHILD_PAGES(5)
|
||||
clear, reason = _is_clear_bundle(_segments((1, 40), (41, 43), (44, 120)), PAGE_COUNT)
|
||||
assert clear is False
|
||||
assert "child_too_small" in reason
|
||||
|
||||
|
||||
def test_gate_rejects_coverage_not_full():
|
||||
# 마지막이 page_count 에 못 미침
|
||||
clear, reason = _is_clear_bundle(_segments((1, 40), (41, 85), (86, 110)), PAGE_COUNT)
|
||||
assert clear is False
|
||||
assert "last_end_not_page_count" in reason
|
||||
|
||||
|
||||
def test_gate_rejects_too_many_children():
|
||||
# N > MAX_CHILDREN — 각 자식 MIN_CHILD_PAGES 만족시키되 개수만 초과
|
||||
n = pw.MAX_CHILDREN + 1
|
||||
pc = n * pw.MIN_CHILD_PAGES
|
||||
spans = [
|
||||
(i * pw.MIN_CHILD_PAGES + 1, (i + 1) * pw.MIN_CHILD_PAGES) for i in range(n)
|
||||
]
|
||||
clear, reason = _is_clear_bundle(_segments(*spans), pc)
|
||||
assert clear is False
|
||||
assert "too_many_children" in reason
|
||||
|
||||
|
||||
def test_gate_rejects_single_segment():
|
||||
clear, reason = _is_clear_bundle(_segments((1, 120)), PAGE_COUNT)
|
||||
assert clear is False
|
||||
assert "too_few_level1_entries" in reason
|
||||
|
||||
|
||||
# ─── 공통 fake (DB / PyMuPDF) ──────────────────────────────────────────────
|
||||
|
||||
|
||||
class _FakeDoc:
|
||||
"""presegment 가 읽는 Document 필드만 가진 최소 stand-in."""
|
||||
|
||||
def __init__(self, doc_id=1):
|
||||
self.id = doc_id
|
||||
self.file_path = "PKM/bundle.pdf"
|
||||
self.file_hash = "deadbeef"
|
||||
self.file_format = "pdf"
|
||||
self.file_size = 123
|
||||
self.file_type = "document"
|
||||
self.import_source = "upload"
|
||||
self.original_filename = "bundle.pdf"
|
||||
self.source_channel = None
|
||||
self.category = None
|
||||
self.data_origin = None
|
||||
self.doc_purpose = None
|
||||
self.material_type = None
|
||||
self.jurisdiction = None
|
||||
self.title = "번들"
|
||||
self.presegment_role = None
|
||||
self.bundle_page_start = None
|
||||
self.bundle_page_end = None
|
||||
self.extracted_at = None
|
||||
self.extracted_text = None
|
||||
|
||||
|
||||
class _ScalarResult:
|
||||
def __init__(self, rows):
|
||||
self._rows = rows
|
||||
|
||||
def scalars(self):
|
||||
return self
|
||||
|
||||
def all(self):
|
||||
return list(self._rows)
|
||||
|
||||
|
||||
class _FakeSession:
|
||||
"""_create_children / process 가 쓰는 AsyncSession 표면만 구현.
|
||||
|
||||
execute() = 기존 자식 lineage 조회 → 빈 결과(첫 분할). add/flush 로 child.id 부여.
|
||||
get() = document_id → 미리 등록한 doc, child_id → 생성된 child.
|
||||
"""
|
||||
|
||||
def __init__(self, doc):
|
||||
self._docs = {doc.id: doc}
|
||||
self.added = []
|
||||
self.commits = 0
|
||||
self.enqueued = [] # enqueue_stage monkeypatch 가 채움
|
||||
self._next_id = 1000
|
||||
|
||||
async def get(self, _model, oid):
|
||||
return self._docs.get(oid)
|
||||
|
||||
async def execute(self, _stmt):
|
||||
# _create_children 의 기존 자식 조회 → 항상 빈(첫 분할). enqueue_stage 는 monkeypatch.
|
||||
return _ScalarResult([])
|
||||
|
||||
def add(self, obj):
|
||||
self.added.append(obj)
|
||||
# child Document 에 id 부여 (flush 대용 — _FakeDoc/실 Document 모두 setattr 가능)
|
||||
if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"):
|
||||
self._next_id += 1
|
||||
obj.id = self._next_id
|
||||
self._docs[obj.id] = obj
|
||||
|
||||
async def flush(self):
|
||||
for obj in self.added:
|
||||
if getattr(obj, "id", None) is None and hasattr(obj, "presegment_role"):
|
||||
self._next_id += 1
|
||||
obj.id = self._next_id
|
||||
self._docs[obj.id] = obj
|
||||
|
||||
async def commit(self):
|
||||
self.commits += 1
|
||||
|
||||
|
||||
def _install_fake_fitz(monkeypatch, *, page_count=PAGE_COUNT, toc=None, first_lines=None):
|
||||
"""sys.modules['fitz'] 에 fake 주입 — worker 의 `import fitz` 가 이걸 받게 한다."""
|
||||
toc = toc or []
|
||||
|
||||
class _FakePage:
|
||||
def __init__(self, idx):
|
||||
self._idx = idx
|
||||
|
||||
def get_text(self):
|
||||
if first_lines and self._idx < len(first_lines):
|
||||
return first_lines[self._idx]
|
||||
return f"page {self._idx + 1} body text"
|
||||
|
||||
class _FakePdf:
|
||||
def __init__(self):
|
||||
self.page_count = page_count
|
||||
|
||||
def get_toc(self, simple=True):
|
||||
return list(toc)
|
||||
|
||||
def __getitem__(self, idx):
|
||||
return _FakePage(idx)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc):
|
||||
return False
|
||||
|
||||
fake = types.ModuleType("fitz")
|
||||
fake.open = lambda *_a, **_k: _FakePdf()
|
||||
monkeypatch.setitem(sys.modules, "fitz", fake)
|
||||
return fake
|
||||
|
||||
|
||||
class _SpyClient:
|
||||
"""AIClient stand-in — call_deep 호출 횟수 카운트 + 지정 응답 반환."""
|
||||
|
||||
calls = 0
|
||||
response = GOOD_LLM_JSON
|
||||
|
||||
def __init__(self):
|
||||
type(self).calls += 1 # 인스턴스화 자체는 비용 아님 — 호출 카운트는 call_deep 기준
|
||||
|
||||
async def call_deep(self, prompt, system=None):
|
||||
type(self)._deep_calls += 1
|
||||
return type(self).response
|
||||
|
||||
async def close(self):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_spy():
|
||||
_SpyClient.calls = 0
|
||||
_SpyClient._deep_calls = 0
|
||||
_SpyClient.response = GOOD_LLM_JSON
|
||||
yield
|
||||
|
||||
|
||||
# ─── (b) _llm_boundary_fallback 수락/거부 (mocked LLM) ──────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_accepts_good_and_creates_children(monkeypatch):
|
||||
"""정상 LLM 응답 → 게이트 통과 → _create_children 가 3 자식 + parent 표식."""
|
||||
_install_fake_fitz(monkeypatch)
|
||||
monkeypatch.setattr(pw, "AIClient", _SpyClient)
|
||||
# enqueue_stage 는 DB 의존 — no-op 으로 대체 (호출 인자만 기록)
|
||||
enq = []
|
||||
|
||||
async def _fake_enqueue(session, doc_id, stage, **kw):
|
||||
enq.append((doc_id, stage))
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
|
||||
|
||||
doc = _FakeDoc()
|
||||
session = _FakeSession(doc)
|
||||
ok = await pw._llm_boundary_fallback(doc, Path("/tmp/bundle.pdf"), PAGE_COUNT, session)
|
||||
|
||||
assert ok is True
|
||||
assert _SpyClient._deep_calls == 1
|
||||
# 자식 3개 생성 + parent 표식 + lineage 3 + commit
|
||||
children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"]
|
||||
assert len(children) == 3
|
||||
assert doc.presegment_role == "parent"
|
||||
assert sum(1 for o in session.added if o.__class__.__name__ == "DocumentLineage") == 3
|
||||
assert {s for (_id, s) in enq} == {"extract"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_rejects_bad_segments(monkeypatch):
|
||||
"""LLM 이 중첩 경계 반환 → 게이트 거부 → False + 자식 0 (단일문서)."""
|
||||
_install_fake_fitz(monkeypatch)
|
||||
bad = json.dumps({
|
||||
"is_bundle": True,
|
||||
"segments": [
|
||||
{"start_page": 1, "end_page": 40},
|
||||
{"start_page": 40, "end_page": 85}, # 중첩
|
||||
{"start_page": 86, "end_page": 120},
|
||||
],
|
||||
})
|
||||
_SpyClient.response = bad
|
||||
monkeypatch.setattr(pw, "AIClient", _SpyClient)
|
||||
|
||||
async def _fake_enqueue(*a, **k):
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
|
||||
|
||||
doc = _FakeDoc()
|
||||
session = _FakeSession(doc)
|
||||
ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session)
|
||||
|
||||
assert ok is False
|
||||
assert _SpyClient._deep_calls == 1
|
||||
assert [o for o in session.added if getattr(o, "presegment_role", None) == "child"] == []
|
||||
assert doc.presegment_role is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_rejects_is_bundle_false(monkeypatch):
|
||||
"""is_bundle=false → 호출은 했으나 분할 안 함(False, 자식 0)."""
|
||||
_install_fake_fitz(monkeypatch)
|
||||
_SpyClient.response = json.dumps({"is_bundle": False, "segments": []})
|
||||
monkeypatch.setattr(pw, "AIClient", _SpyClient)
|
||||
|
||||
async def _fake_enqueue(*a, **k):
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
|
||||
|
||||
doc = _FakeDoc()
|
||||
session = _FakeSession(doc)
|
||||
ok = await pw._llm_boundary_fallback(doc, Path("/tmp/b.pdf"), PAGE_COUNT, session)
|
||||
assert ok is False
|
||||
assert _SpyClient._deep_calls == 1
|
||||
assert doc.presegment_role is None
|
||||
|
||||
|
||||
# ─── (c) flag gating — OFF=호출 0 (deployed default 무변), ON=호출됨 ───────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_flag_off_never_calls_llm(monkeypatch):
|
||||
"""PRESEGMENT_LLM_FALLBACK=False(기본) → 큰 ToC-less PDF 도 LLM 미호출 = 오늘과 동일."""
|
||||
monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", False)
|
||||
_install_fake_fitz(monkeypatch, page_count=120, toc=[]) # 대형 + level-1 ToC 없음 = 애매
|
||||
monkeypatch.setattr(pw, "AIClient", _SpyClient)
|
||||
monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf"))
|
||||
|
||||
async def _fake_enqueue(*a, **k):
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
|
||||
|
||||
doc = _FakeDoc()
|
||||
session = _FakeSession(doc)
|
||||
await pw.process(doc.id, session)
|
||||
|
||||
assert _SpyClient._deep_calls == 0 # ★ LLM 절대 호출 안 됨
|
||||
assert doc.presegment_role is None # 단일문서 (분할 안 함)
|
||||
assert session.commits == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_flag_on_calls_llm_and_splits(monkeypatch):
|
||||
"""positive control — flag ON 이면 같은 입력에 LLM 호출 + 게이트 통과 시 분할."""
|
||||
monkeypatch.setattr(pw, "PRESEGMENT_LLM_FALLBACK", True)
|
||||
_install_fake_fitz(monkeypatch, page_count=120, toc=[])
|
||||
_SpyClient.response = GOOD_LLM_JSON
|
||||
monkeypatch.setattr(pw, "AIClient", _SpyClient)
|
||||
monkeypatch.setattr(pw, "_resolve_path", lambda raw: Path("/tmp/bundle.pdf"))
|
||||
|
||||
async def _fake_enqueue(*a, **k):
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(pw, "enqueue_stage", _fake_enqueue)
|
||||
|
||||
doc = _FakeDoc()
|
||||
session = _FakeSession(doc)
|
||||
await pw.process(doc.id, session)
|
||||
|
||||
assert _SpyClient._deep_calls == 1 # LLM 호출됨
|
||||
assert doc.presegment_role == "parent" # 분할 수행
|
||||
children = [o for o in session.added if getattr(o, "presegment_role", None) == "child"]
|
||||
assert len(children) == 3
|
||||
Reference in New Issue
Block a user