33ee81bf1d
ToC 없는/게이트 미달 대형 PDF(>=60p)에 한해 off-card Qwen(맥북, call_deep_or_defer, StageDeferred-safe) 경계 제안 → 동일 검증게이트(_is_clear_bundle) 통과 시에만 deterministic 과 공유하는 _create_children 로 분할. is_bundle=false/파싱·검증 실패=단일문서(오늘과 동일)+로깅. - env PRESEGMENT_LLM_FALLBACK 기본 false → 배포 동작 무변(LLM 미호출, 검증=unit test) - 자식생성 _create_children 공유 헬퍼로 리팩터(deterministic+LLM 단일 경로, 동작 동일) - SegmentationOutput Pydantic + parse_json_response(house 패턴) + per-page heading 샘플(본문 미전송) - prompt app/prompts/presegment_boundaries.txt + tests/test_presegment_llm.py(14, fitz/DB/LLM mock) no direct HTTP·no silent fallback. 활성=flag ON + 실 router fixture 검증 후. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
563 lines
25 KiB
Python
563 lines
25 KiB
Python
"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2).
|
|
|
|
전 문서가 presegment stage 로 진입한다(worker-side gating):
|
|
- 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit → enqueue_next_stage 가 extract 로 흘림.
|
|
- PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 만 자식 분할, 나머지는 단일문서로 extract.
|
|
|
|
deterministic 경로(PR-G2-2): 판정이 애매하면 보수적으로 분할하지 않고 단일문서로 둔다
|
|
(bias to NOT splitting). 분할 = '확실한 번들' 만:
|
|
- page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES
|
|
AND 단조 증가·비중첩 AND [1, page_count] 전 범위 커버 AND 2 <= N <= MAX_CHILDREN.
|
|
|
|
LLM 경계 폴백(PR-G2-3, env PRESEGMENT_LLM_FALLBACK, 기본 OFF — scaffold-first): deterministic
|
|
이 '명확한 번들' 을 못 만든 대형 PDF(ToC 없음/level-1 없음/게이트 미달)에 한해, OFF 면 오늘과
|
|
동일(단일문서)이고 ON 이면 off-card Qwen(맥북, 라우터 :8890, model=qwen-macbook)에게 경계를
|
|
제안받는다. compact per-page heading 샘플만 전송(본문 미전송). LLM 출력은 **동일 검증 게이트
|
|
(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 _create_children 경로로 분할 —
|
|
is_bundle=false / 파싱·검증 실패 = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
|
|
맥북 불가(503/연결/절단)는 StageDeferred 로 큐 재시도(백오프, no silent fallback).
|
|
|
|
분할 시 ★후보 A(물리분할 없음, uq_documents_file_path 해소): 자식 file_path = unique 합성값
|
|
`{부모경로}#p{start}-{end}` (UNIQUE 제약 통과), 실파일은 `bundle_source_path()` 로 부모 경로 복원.
|
|
자식은 bundle_page_start/end(1-based inclusive) 로 부모 파일의 자기 page 범위만 가리킨다.
|
|
부모-자식 관계 정본 = document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')는
|
|
파일 홀더라 자체 extract/embed 안 함 — enqueue_next_stage 의 presegment→extract 전이가 'parent' 면
|
|
억제된다(queue_consumer 참조). 자식의 extract 는 이 워커가 직접 enqueue. extract_worker/marker_worker
|
|
가 자식 처리 시 bundle_source_path() 로 실파일 접근.
|
|
|
|
멱등: 재실행 시 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고
|
|
수렴(각 자식이 extract 활성/완료 상태인지만 보장)한다.
|
|
|
|
★해결 이력 (2026-06-18): 최초 Option A(자식이 부모 file_path 그대로 공유)는 uq_documents_file_path
|
|
UNIQUE 위반(실번들 검증서 발견) → 합성 file_path(후보 A)로 해소. 인제스트 재활성 = 합성번들 재검증 PASS 후.
|
|
|
|
plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation)
|
|
"""
|
|
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import unicodedata
|
|
from pathlib import Path
|
|
|
|
from pydantic import BaseModel, ValidationError
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from ai.client import AIClient, call_deep_or_defer, parse_json_response
|
|
from core.config import settings
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.document_lineage import DocumentLineage
|
|
from models.queue import enqueue_stage
|
|
|
|
logger = setup_logger("presegment_worker")
|
|
|
|
# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ───
|
|
# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호.
|
|
MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60"))
|
|
# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지).
|
|
MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5"))
|
|
# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부.
|
|
MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50"))
|
|
|
|
# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix.
|
|
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
|
|
|
|
# ─── PR-G2-3 LLM 경계 폴백 (scaffold-first, 기본 OFF) ───
|
|
# PRESEGMENT_LLM_FALLBACK: 기본 "false". OFF 면 deterministic 경로만(=오늘과 동일 — 애매하면
|
|
# 단일문서). ON 이면 deterministic 이 '명확한 번들' 을 못 만든 대형 PDF(page_count >=
|
|
# MIN_BUNDLE_PAGES) 에 한해 off-card Qwen(맥북, 라우터 :8890 경유)에게 경계를 제안받아
|
|
# **동일 검증 게이트(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 자식 생성 경로로 분할.
|
|
# 검증 실패/파싱 실패/is_bundle=false = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
|
|
PRESEGMENT_LLM_FALLBACK = os.getenv("PRESEGMENT_LLM_FALLBACK", "false").lower() in (
|
|
"1", "true", "yes", "on",
|
|
)
|
|
# LLM 에 보내는 per-page 샘플의 page 당 char 상한 (heading/첫줄만 — 본문 미전송).
|
|
PRESEGMENT_LLM_PAGE_CHARS = int(os.getenv("PRESEGMENT_LLM_PAGE_CHARS", "80"))
|
|
# 전체 page-sample 블록의 char 상한 (수 KB 가드 — 초과 시 잘라냄, 본문 누출/페이로드 폭발 방지).
|
|
PRESEGMENT_LLM_SAMPLE_CHARS = int(os.getenv("PRESEGMENT_LLM_SAMPLE_CHARS", "12000"))
|
|
|
|
# 경계 폴백 프롬프트 (app/prompts/presegment_boundaries.txt). system 지시 + 1-based inclusive·
|
|
# 전범위 커버·무중첩 규칙. {page_count}/{page_samples} 를 str.replace 로 주입.
|
|
_PRESEGMENT_PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "presegment_boundaries.txt"
|
|
|
|
|
|
class Segment(BaseModel):
|
|
"""LLM 이 제안하는 1-based inclusive page 범위 한 조각."""
|
|
|
|
start_page: int
|
|
end_page: int
|
|
title: str | None = None
|
|
|
|
|
|
class SegmentationOutput(BaseModel):
|
|
"""presegment_boundaries 응답 스키마. parse_json_response → model_validate."""
|
|
|
|
is_bundle: bool = False
|
|
segments: list[Segment] = []
|
|
confidence: float | None = None
|
|
|
|
|
|
def _resolve_path(file_path: str) -> Path | None:
|
|
"""NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴."""
|
|
candidates = [
|
|
file_path,
|
|
unicodedata.normalize("NFD", file_path),
|
|
unicodedata.normalize("NFC", file_path),
|
|
]
|
|
for c in candidates:
|
|
p = Path(c)
|
|
if p.exists():
|
|
return p
|
|
parent = Path(file_path).parent
|
|
if parent.exists():
|
|
target = unicodedata.normalize("NFC", Path(file_path).name)
|
|
for child in parent.iterdir():
|
|
if unicodedata.normalize("NFC", child.name) == target:
|
|
return child
|
|
return None
|
|
|
|
|
|
def _to_container_path(file_path: str) -> str:
|
|
"""file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일)."""
|
|
if file_path.startswith("/"):
|
|
return file_path
|
|
return f"{CONTAINER_PATH_PREFIX}/{file_path}"
|
|
|
|
|
|
# 후보 A: 자식 합성 file_path 패턴 `{부모경로}#p{start}-{end}` (uq_documents_file_path 유일성).
|
|
_BUNDLE_SUFFIX_RE = re.compile(r"#p\d+-\d+$")
|
|
|
|
|
|
def bundle_source_path(file_path: str | None) -> str | None:
|
|
"""자식 합성 file_path → 부모 실파일 경로 복원. 일반 doc(접미사 없음)은 그대로 반환.
|
|
|
|
extract_worker/marker_worker 가 자식 처리 시 실제 파일 접근에 사용 (자식 file_path 는
|
|
합성값이라 디스크에 없음). 결정적·세션 불필요. lineage 가 부모-자식 관계의 정본 기록.
|
|
"""
|
|
if not file_path:
|
|
return file_path
|
|
return _BUNDLE_SUFFIX_RE.sub("", file_path)
|
|
|
|
|
|
def _is_pdf(doc: Document) -> bool:
|
|
"""PDF 판정 — file_format=pdf 또는 .pdf 확장자."""
|
|
fmt = (doc.file_format or "").lower()
|
|
if fmt == "pdf":
|
|
return True
|
|
if doc.file_path:
|
|
return Path(doc.file_path).suffix.lower() == ".pdf"
|
|
return False
|
|
|
|
|
|
def _level1_segments(toc: list, page_count: int) -> list[dict]:
|
|
"""get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성.
|
|
|
|
toc 항목 = [level, title, page] (page 는 1-based). level==1 만 채택.
|
|
end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count.
|
|
동일 page 에서 시작하는 level-1 이 여럿이면 정렬 후 인접 항목으로 경계 계산되며,
|
|
그 경우 0-페이지 segment 가 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다.
|
|
"""
|
|
starts = []
|
|
for entry in toc:
|
|
# simple=True 는 [level, title, page]. 방어적으로 길이 체크.
|
|
if not entry or len(entry) < 3:
|
|
continue
|
|
level, title, page = entry[0], entry[1], entry[2]
|
|
if level != 1:
|
|
continue
|
|
# ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨.
|
|
starts.append((int(page), (title or "").strip()))
|
|
|
|
# ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬).
|
|
starts.sort(key=lambda x: x[0])
|
|
|
|
segments: list[dict] = []
|
|
for i, (start_page, title) in enumerate(starts):
|
|
if i + 1 < len(starts):
|
|
end_page = starts[i + 1][0] - 1
|
|
else:
|
|
end_page = page_count
|
|
segments.append({"start_page": start_page, "end_page": end_page, "title": title})
|
|
return segments
|
|
|
|
|
|
def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]:
|
|
"""deterministic '명확한 번들' 판정. (clear, reason) 반환.
|
|
|
|
clear=True 면 reason="" / clear=False 면 reason 은 거부 사유(로깅용).
|
|
모든 조건은 보수적 — 하나라도 어긋나면 단일문서로 처리(분할 안 함).
|
|
"""
|
|
n = len(segments)
|
|
if n < 2:
|
|
return False, f"too_few_level1_entries(n={n})"
|
|
if n > MAX_CHILDREN:
|
|
return False, f"too_many_children(n={n}>{MAX_CHILDREN})"
|
|
|
|
# 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버.
|
|
if segments[0]["start_page"] != 1:
|
|
return False, f"first_start_not_1(start={segments[0]['start_page']})"
|
|
if segments[-1]["end_page"] != page_count:
|
|
return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})"
|
|
|
|
prev_end = 0
|
|
for seg in segments:
|
|
start, end = seg["start_page"], seg["end_page"]
|
|
# 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할.
|
|
if start != prev_end + 1:
|
|
return False, f"non_contiguous(start={start},prev_end={prev_end})"
|
|
if end < start:
|
|
return False, f"non_monotonic(start={start},end={end})"
|
|
if (end - start + 1) < MIN_CHILD_PAGES:
|
|
return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})"
|
|
prev_end = end
|
|
|
|
if prev_end != page_count:
|
|
return False, f"coverage_gap(covered={prev_end},pc={page_count})"
|
|
|
|
return True, ""
|
|
|
|
|
|
def _child_title(parent: Document, seg: dict) -> str:
|
|
"""자식 제목 = 부모 제목 + ' — ' + (segment 제목 또는 page 범위)."""
|
|
base = (parent.title or "").strip() or (parent.original_filename or "") or "문서"
|
|
seg_title = (seg.get("title") or "").strip()
|
|
suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}"
|
|
return f"{base} — {suffix}"
|
|
|
|
|
|
def _child_file_hash(parent_hash: str, start: int, end: int) -> str:
|
|
"""자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등.
|
|
|
|
부모 file_hash 가 NULL 일 수는 없으나(NOT NULL) 방어적으로 빈 문자열 처리.
|
|
"""
|
|
return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest()
|
|
|
|
|
|
async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None:
|
|
"""자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로).
|
|
|
|
이미 extracted_text 가 채워졌거나 활성 큐 행이 있으면 enqueue_stage 가 no-op/skip.
|
|
"""
|
|
child = await session.get(Document, child_id)
|
|
if child is None:
|
|
return
|
|
# 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip).
|
|
if child.extracted_at is not None and child.extracted_text is not None:
|
|
return
|
|
await enqueue_stage(session, child_id, "extract")
|
|
|
|
|
|
async def _create_children(
|
|
doc: Document, segments: list[dict], session: AsyncSession
|
|
) -> int:
|
|
"""검증된 segments 로 자식 N개 생성 + lineage + extract enqueue + 부모 표식 (멱등).
|
|
|
|
deterministic '명확한 번들' 경로와 LLM 폴백 경로가 공유하는 단일 자식 생성 경로.
|
|
호출 전 segments 는 반드시 _is_clear_bundle 검증을 통과해야 한다(여기선 재검증 X).
|
|
commit 까지 수행. 반환값 = 실제 생성한 자식 수(이미 존재해 수렴만 한 경우 0).
|
|
"""
|
|
# ─── 멱등 체크: 이미 자식이 있으면 수렴만 (재생성 금지) ───
|
|
existing_children = (
|
|
await session.execute(
|
|
select(DocumentLineage.derived_document_id).where(
|
|
DocumentLineage.source_document_id == doc.id,
|
|
DocumentLineage.relation_type == "segmented_from",
|
|
)
|
|
)
|
|
).scalars().all()
|
|
|
|
if existing_children:
|
|
# 부모 표식이 누락된 경우 보정(이전 부분실패 복구).
|
|
if doc.presegment_role != "parent":
|
|
doc.presegment_role = "parent"
|
|
for child_id in existing_children:
|
|
await _ensure_child_extract(session, child_id)
|
|
await session.commit()
|
|
logger.info(
|
|
f"[presegment] id={doc.id} children already exist "
|
|
f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
|
|
)
|
|
return 0
|
|
|
|
# ─── 자식 N개 생성 + lineage + extract enqueue ───
|
|
created_ids: list[int] = []
|
|
for seg in segments:
|
|
start, end = seg["start_page"], seg["end_page"]
|
|
child = Document(
|
|
# 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path
|
|
# 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 —
|
|
# 자식은 bundle_page_start/end 로 부모 파일을 슬라이스.
|
|
file_path=f"{doc.file_path}#p{start}-{end}",
|
|
file_hash=_child_file_hash(doc.file_hash, start, end),
|
|
file_format=doc.file_format,
|
|
file_size=doc.file_size,
|
|
file_type=doc.file_type,
|
|
import_source=doc.import_source,
|
|
original_filename=doc.original_filename,
|
|
source_channel=doc.source_channel,
|
|
category=doc.category,
|
|
data_origin=doc.data_origin,
|
|
doc_purpose=doc.doc_purpose,
|
|
# 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음).
|
|
material_type=doc.material_type,
|
|
jurisdiction=doc.jurisdiction,
|
|
title=_child_title(doc, seg),
|
|
bundle_page_start=start,
|
|
bundle_page_end=end,
|
|
presegment_role="child",
|
|
)
|
|
session.add(child)
|
|
await session.flush() # child.id 확보
|
|
created_ids.append(child.id)
|
|
|
|
session.add(
|
|
DocumentLineage(
|
|
source_document_id=doc.id,
|
|
derived_document_id=child.id,
|
|
relation_type="segmented_from",
|
|
meta={"start_page": start, "end_page": end},
|
|
)
|
|
)
|
|
# 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음).
|
|
await enqueue_stage(session, child.id, "extract")
|
|
|
|
# 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제.
|
|
doc.presegment_role = "parent"
|
|
await session.commit()
|
|
|
|
logger.info(
|
|
f"[presegment] id={doc.id} SPLIT into {len(created_ids)} children "
|
|
f"child_ids={created_ids}"
|
|
)
|
|
return len(created_ids)
|
|
|
|
|
|
def _segments_from_output(out: "SegmentationOutput") -> list[dict]:
|
|
"""SegmentationOutput.segments(Pydantic) → _is_clear_bundle / _create_children 가 쓰는 dict 형태."""
|
|
return [
|
|
{"start_page": s.start_page, "end_page": s.end_page, "title": (s.title or "")}
|
|
for s in out.segments
|
|
]
|
|
|
|
|
|
def _page_samples(pdf, page_count: int) -> str:
|
|
"""LLM 입력용 compact per-page 샘플 — page 당 heading/첫줄만(`p{n}: {firstline}`).
|
|
|
|
PyMuPDF page.get_text() 로 page 별 텍스트를 스트리밍하되 page 당 첫 비공백 줄만,
|
|
PRESEGMENT_LLM_PAGE_CHARS 로 잘라 본문 누출 차단. 전체 블록은 PRESEGMENT_LLM_SAMPLE_CHARS
|
|
가드로 상한(수 KB) — 초과 시 그 지점에서 중단(앞쪽 페이지 우선 보존).
|
|
"""
|
|
lines: list[str] = []
|
|
total = 0
|
|
for i in range(page_count):
|
|
try:
|
|
text = pdf[i].get_text() or ""
|
|
except Exception:
|
|
text = ""
|
|
first = ""
|
|
for ln in text.splitlines():
|
|
ln = ln.strip()
|
|
if ln:
|
|
first = ln
|
|
break
|
|
first = first[:PRESEGMENT_LLM_PAGE_CHARS]
|
|
entry = f"p{i + 1}: {first}"
|
|
if total + len(entry) + 1 > PRESEGMENT_LLM_SAMPLE_CHARS:
|
|
break
|
|
lines.append(entry)
|
|
total += len(entry) + 1
|
|
return "\n".join(lines)
|
|
|
|
|
|
async def _llm_boundary_fallback(
|
|
doc: Document, source: Path, page_count: int, session: AsyncSession
|
|
) -> bool:
|
|
"""애매 + 대형(ToC-less 등) PDF 에 대해 off-card Qwen 으로 경계 제안 → 검증 → 분할.
|
|
|
|
반환 True = LLM 경로가 분할을 수행(또는 멱등 수렴)했으므로 호출자는 추가 처리 없이 return.
|
|
반환 False = is_bundle=false / 파싱 실패 / 검증 실패 → 호출자는 단일문서(오늘과 동일) 처리.
|
|
맥북 불가(503/연결/절단)는 call_deep_or_defer 가 StageDeferred 로 raise → 큐 재시도(백오프).
|
|
silent fallback 금지 — deep 슬롯 외 다른 backend 자동 호출 안 함.
|
|
"""
|
|
import fitz # PyMuPDF — deterministic 경로와 동일 의존
|
|
|
|
# per-page 샘플은 파일을 다시 열어 스트리밍(deterministic with 블록과 분리해 그 경로 무회귀).
|
|
try:
|
|
with fitz.open(str(source)) as pdf:
|
|
samples = _page_samples(pdf, page_count)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
f"[presegment] id={doc.id} llm fallback sample 실패 "
|
|
f"({type(exc).__name__}: {exc}) → single doc(extract)"
|
|
)
|
|
return False
|
|
|
|
try:
|
|
template = _PRESEGMENT_PROMPT_PATH.read_text(encoding="utf-8")
|
|
except Exception as exc:
|
|
logger.warning(
|
|
f"[presegment] id={doc.id} prompt 로드 실패 ({type(exc).__name__}: {exc}) "
|
|
f"→ single doc(extract)"
|
|
)
|
|
return False
|
|
|
|
prompt = template.replace("{page_count}", str(page_count)).replace(
|
|
"{page_samples}", samples
|
|
)
|
|
|
|
# off-card 호출 — call_deep_or_defer 가 deep 슬롯(맥북, 라우터 :8890, model=qwen-macbook)
|
|
# 으로 라우팅. 맥북 불가는 StageDeferred 로 전파(여기서 잡지 않음 → 큐가 보류/백오프).
|
|
# classify_worker 와 동일하게 AIClient() 인스턴스화.
|
|
client = AIClient()
|
|
try:
|
|
raw = await call_deep_or_defer(client, prompt)
|
|
finally:
|
|
await client.close()
|
|
|
|
parsed = parse_json_response(raw)
|
|
if not parsed:
|
|
logger.info(
|
|
f"[presegment] presegment_llm_rejected id={doc.id} "
|
|
f"reason=parse_failed raw={raw[:160]!r} → single doc(extract)"
|
|
)
|
|
return False
|
|
|
|
try:
|
|
out = SegmentationOutput.model_validate(parsed)
|
|
except (ValidationError, ValueError, TypeError) as exc:
|
|
logger.info(
|
|
f"[presegment] presegment_llm_rejected id={doc.id} "
|
|
f"reason=schema_invalid({type(exc).__name__}) → single doc(extract)"
|
|
)
|
|
return False
|
|
|
|
if not out.is_bundle:
|
|
logger.info(
|
|
f"[presegment] presegment_llm_rejected id={doc.id} "
|
|
f"reason=is_bundle_false → single doc(extract)"
|
|
)
|
|
return False
|
|
|
|
segments = _segments_from_output(out)
|
|
clear, reason = _is_clear_bundle(segments, page_count)
|
|
if not clear:
|
|
# LLM 출력을 그대로 믿지 않음 — deterministic 과 동일 게이트 미달이면 단일문서.
|
|
logger.info(
|
|
f"[presegment] presegment_llm_rejected id={doc.id} "
|
|
f"reason={reason} n={len(segments)} pages={page_count} → single doc(extract)"
|
|
)
|
|
return False
|
|
|
|
n = await _create_children(doc, segments, session)
|
|
logger.info(
|
|
f"[presegment] id={doc.id} LLM-SPLIT accepted "
|
|
f"(pages={page_count} n={len(segments)} created={n} "
|
|
f"confidence={out.confidence})"
|
|
)
|
|
return True
|
|
|
|
|
|
async def process(document_id: int, session: AsyncSession) -> None:
|
|
"""presegment stage 워커 진입점. queue_consumer 가 호출.
|
|
|
|
전 문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) → extract 로 흐른다.
|
|
'명확한 번들' PDF 만 자식 분할 + 부모를 'parent' 로 표식(이 경우 부모는 extract 로 흐르지 않음).
|
|
"""
|
|
doc = await session.get(Document, document_id)
|
|
if doc is None:
|
|
logger.warning(f"[presegment] document {document_id} not found")
|
|
return
|
|
|
|
# ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ───
|
|
if not _is_pdf(doc):
|
|
logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract")
|
|
return
|
|
|
|
# ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ───
|
|
if not doc.file_path:
|
|
logger.info(f"[presegment] id={document_id} no file_path → extract")
|
|
return
|
|
|
|
# ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ───
|
|
# (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.)
|
|
if doc.presegment_role in ("child", "parent"):
|
|
logger.info(
|
|
f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip"
|
|
)
|
|
return
|
|
|
|
# ─── (2) 파일 열기 + page_count ───
|
|
raw = str(Path(settings.nas_mount_path) / doc.file_path)
|
|
source = _resolve_path(raw)
|
|
if source is None:
|
|
# 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안.
|
|
# presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다.
|
|
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
|
|
return
|
|
|
|
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
|
|
|
|
try:
|
|
with fitz.open(str(source)) as pdf:
|
|
page_count = pdf.page_count
|
|
toc = pdf.get_toc(simple=True) or []
|
|
except Exception as exc:
|
|
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
|
|
logger.warning(
|
|
f"[presegment] id={document_id} fitz open/toc failed "
|
|
f"({type(exc).__name__}: {exc}) → extract"
|
|
)
|
|
return
|
|
|
|
# ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ───
|
|
if page_count < MIN_BUNDLE_PAGES:
|
|
logger.info(
|
|
f"[presegment] id={document_id} single doc "
|
|
f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract"
|
|
)
|
|
return
|
|
|
|
# ─── (4) level-1 ToC → 자식 후보 segment ───
|
|
segments = _level1_segments(toc, page_count)
|
|
|
|
if not segments:
|
|
# 큰 PDF 인데 ToC 없음/level-1 없음 = 애매. flag ON 이면 LLM 경계 폴백(PR-G2-3),
|
|
# OFF(기본) 이면 오늘과 동일 — 단일문서로 처리하고 사유를 남긴다.
|
|
if PRESEGMENT_LLM_FALLBACK:
|
|
logger.info(
|
|
f"[presegment] presegment_ambiguous id={document_id} "
|
|
f"reason=no_level1_toc pages={page_count} → LLM fallback"
|
|
)
|
|
if await _llm_boundary_fallback(doc, source, page_count, session):
|
|
return
|
|
# LLM 이 분할하지 않음(is_bundle=false / 검증·파싱 실패) — 단일문서.
|
|
return
|
|
logger.info(
|
|
f"[presegment] presegment_ambiguous id={document_id} "
|
|
f"reason=no_level1_toc pages={page_count} → single doc(extract)"
|
|
)
|
|
return
|
|
|
|
clear, reason = _is_clear_bundle(segments, page_count)
|
|
if not clear:
|
|
# 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매. flag ON 이면 LLM 경계 폴백,
|
|
# OFF(기본) 이면 오늘과 동일 — 단일문서(분할 안 함).
|
|
if PRESEGMENT_LLM_FALLBACK:
|
|
logger.info(
|
|
f"[presegment] presegment_ambiguous id={document_id} "
|
|
f"reason={reason} pages={page_count} level1={len(segments)} → LLM fallback"
|
|
)
|
|
if await _llm_boundary_fallback(doc, source, page_count, session):
|
|
return
|
|
return
|
|
logger.info(
|
|
f"[presegment] presegment_ambiguous id={document_id} "
|
|
f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)"
|
|
)
|
|
return
|
|
|
|
# ─── (5) 명확한 번들 (deterministic) — 공유 자식 생성 경로 (멱등 수렴 포함) ───
|
|
await _create_children(doc, segments, session)
|