From d75fb7adaa5d1cccc1abc52448a41f992e3b7939 Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 18 Jun 2026 16:43:38 +0900 Subject: [PATCH 1/5] =?UTF-8?q?feat(presegment):=20G2=20PR-1=20=EC=8A=A4?= =?UTF-8?q?=ED=82=A4=EB=A7=88=20=E2=80=94=20documents=20=EB=B6=84=ED=95=A0?= =?UTF-8?q?=20=EC=BB=AC=EB=9F=BC=20+=20lineage=20segmented=5Ffrom=20+=20pr?= =?UTF-8?q?esegment=20=EC=8A=A4=ED=85=8C=EC=9D=B4=EC=A7=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit G2 pre-segmentation 기반 스키마(추가형, 미사용까지 무동작). 권장 기본값 채택: - 362: documents.bundle_page_start/end(1-based)+presegment_role(NULL/parent/child) - 363: document_lineage CHECK 에 'segmented_from' 추가(부모→자식 관계, RESTRICT-delete 재사용) - 364: process_stage enum 에 'presegment'(extract 前 번들 분할 스테이지) - ORM: Document 3컬럼 + queue enum literal + 신규 DocumentLineage 모델 배포 DB(PG16.13, schema_migrations=361) 대비 txn-rollback 실측 PASS(362/363/364 전부). PR-2(presegment_worker+큐 배선+extract/marker range-clamp)·PR-3(LLM 경계 폴백) 후속. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/models/document.py | 8 +++++ app/models/document_lineage.py | 31 +++++++++++++++++++ app/models/queue.py | 3 +- migrations/362_documents_presegment_cols.sql | 10 ++++++ .../363_document_lineage_segmented_from.sql | 8 +++++ migrations/364_process_stage_presegment.sql | 5 +++ 6 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 app/models/document_lineage.py create mode 100644 migrations/362_documents_presegment_cols.sql create mode 100644 migrations/363_document_lineage_segmented_from.sql create mode 100644 migrations/364_process_stage_presegment.sql diff --git a/app/models/document.py b/app/models/document.py index 8436da8..14a2d60 100644 --- a/app/models/document.py +++ b/app/models/document.py @@ -41,6 +41,14 @@ class Document(Base): Integer, nullable=False, default=0, server_default="0" ) + # G2 pre-segmentation (migration 362): 번들 PDF → N 자식 분할. + # presegment_role: NULL=일반 단일문서 / 'parent'=번들원본(자체 extract/embed 안 함) / + # 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 1-based inclusive 범위). + # 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from'). + bundle_page_start: Mapped[int | None] = mapped_column(Integer) + bundle_page_end: Mapped[int | None] = mapped_column(Integer) + presegment_role: Mapped[str | None] = mapped_column(Text) + # 2계층: 텍스트 추출 extracted_text: Mapped[str | None] = mapped_column(Text) extracted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) diff --git a/app/models/document_lineage.py b/app/models/document_lineage.py new file mode 100644 index 0000000..4cb29fc --- /dev/null +++ b/app/models/document_lineage.py @@ -0,0 +1,31 @@ +"""document_lineage 테이블 ORM — 문서 파생 관계 이력 (migration 217). + +G2 pre-segmentation 이 relation_type='segmented_from'(번들 → 자식) 으로 사용 (migration 363). +이력 테이블 FK = ON DELETE RESTRICT (부모 hard delete 차단, soft delete 만 허용). +""" +from datetime import datetime + +from sqlalchemy import BigInteger, ForeignKey, Text, func +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.types import TIMESTAMP + +from core.database import Base + + +class DocumentLineage(Base): + __tablename__ = "document_lineage" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + source_document_id: Mapped[int] = mapped_column( + BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False + ) + derived_document_id: Mapped[int] = mapped_column( + BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False + ) + relation_type: Mapped[str] = mapped_column(Text, nullable=False) + # 'metadata' 는 SQLAlchemy 예약속성 → Python 속성명은 meta, DB 컬럼명은 metadata. + meta: Mapped[dict] = mapped_column( + "metadata", JSONB, nullable=False, default=dict, server_default="{}" + ) + created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now()) diff --git a/app/models/queue.py b/app/models/queue.py index f750b1a..c0ed78e 100644 --- a/app/models/queue.py +++ b/app/models/queue.py @@ -46,9 +46,10 @@ class ProcessingQueue(Base): # 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue. # 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue. # 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격. + # 'presegment' (G2): migration 364 — extract 前 번들 PDF → N 자식 분할. # DB enum 변경은 마이그레이션이 처리하므로 create_type=False. Enum( - "extract", "classify", "summarize", "embed", "chunk", "preview", + "presegment", "extract", "classify", "summarize", "embed", "chunk", "preview", "stt", "thumbnail", "deep_summary", "markdown", "fulltext", name="process_stage", create_type=False, diff --git a/migrations/362_documents_presegment_cols.sql b/migrations/362_documents_presegment_cols.sql new file mode 100644 index 0000000..b1008fd --- /dev/null +++ b/migrations/362_documents_presegment_cols.sql @@ -0,0 +1,10 @@ +-- 362: G2 pre-segmentation — 번들 PDF(여러 논리문서 한 파일) → N 자식 문서 분할. +-- 자식 doc 의 원본 내 page 범위(1-based inclusive) + 분할 역할 표식. +-- 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from', migration 363). +-- presegment_role: NULL=일반 단일문서(대다수) / 'parent'=번들원본(자체 extract/embed 안 함) / +-- 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 범위로 슬라이스). +-- 단일 ALTER(다중 절) = 1 statement (asyncpg 멀티스테이트먼트 제약 준수). +ALTER TABLE documents + ADD COLUMN IF NOT EXISTS bundle_page_start INTEGER, + ADD COLUMN IF NOT EXISTS bundle_page_end INTEGER, + ADD COLUMN IF NOT EXISTS presegment_role TEXT; diff --git a/migrations/363_document_lineage_segmented_from.sql b/migrations/363_document_lineage_segmented_from.sql new file mode 100644 index 0000000..e13faf5 --- /dev/null +++ b/migrations/363_document_lineage_segmented_from.sql @@ -0,0 +1,8 @@ +-- 363: G2 — document_lineage.relation_type 에 'segmented_from'(번들 → 자식) 추가. +-- 217 의 column-level CHECK(PG 자동명 document_lineage_relation_type_check, 배포 DB 실측 확인) +-- 를 교체. DROP + ADD 를 단일 ALTER 의 두 절로 = 1 statement. +-- 멱등: DROP ... IF EXISTS 라 재실행 안전(이미 교체됐으면 새 제약 DROP 후 동일 재생성). +ALTER TABLE document_lineage + DROP CONSTRAINT IF EXISTS document_lineage_relation_type_check, + ADD CONSTRAINT document_lineage_relation_type_check + CHECK (relation_type IN ('cited','summarized_from','generated_from','revised_from','segmented_from')); diff --git a/migrations/364_process_stage_presegment.sql b/migrations/364_process_stage_presegment.sql new file mode 100644 index 0000000..218d1e7 --- /dev/null +++ b/migrations/364_process_stage_presegment.sql @@ -0,0 +1,5 @@ +-- 364: G2 — process_stage 큐 스테이지 enum 에 'presegment' 추가 (extract 前 번들 분할 단계). +-- PG16: ALTER TYPE ADD VALUE 는 트랜잭션 내 실행 가능(값 추가만, 同 트랜잭션 내 사용은 안 함 — +-- 사용은 후속 마이그/런타임). IF NOT EXISTS = 재실행 멱등. +-- (이 한 줄 단독 파일 — 1 statement.) +ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'presegment'; From c3d5c338133066d86f1d63fd5bbc2814812511bc Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 18 Jun 2026 16:55:27 +0900 Subject: [PATCH 2/5] =?UTF-8?q?feat(presegment):=20G2=20PR-2=20=E2=80=94?= =?UTF-8?q?=20presegment=20=EC=9B=8C=EC=BB=A4=20+=20=ED=81=90=20=EB=B0=B0?= =?UTF-8?q?=EC=84=A0=20+=20range-clamp=20(deterministic=20ToC)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit extract 前 presegment 스테이지: 전 문서 진입, 非PDF/단일은 무변 통과, '명확한 번들' PDF만 ToC(level-1) deterministic 분할. LLM 폴백은 PR-3. - presegment_worker: 보수적 게이트(pages>=60·자식>=5p·연속/단조/전범위·2<=N<=50) + 멱등 (lineage segmented_from 존재 시 수렴) + 자식=부모파일 공유(Option A)+range - queue_consumer: BATCH_SIZE/MAIN_QUEUE_STAGES/_load_workers + presegment->extract 전이, parent(번들원본)는 억제(자식이 직접 extract enqueue) - ingest(documents.py upload·file_watcher): 첫 stage extract->presegment - extract_worker/marker_worker: bundle_page_start/end 시 해당 범위만 추출/변환 (NULL=일반문서 byte-identical 무회귀 — 검수 확인) 코드 검수 완료(무회귀·full_path 스코프·NOT NULL 커버·py_compile). **미배포** — 실제 번들 PDF 처리 검증 후 배포(PR-3 LLM 폴백과 함께). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/api/documents.py | 6 +- app/workers/extract_worker.py | 75 ++++++- app/workers/file_watcher.py | 10 +- app/workers/marker_worker.py | 54 ++++- app/workers/presegment_worker.py | 339 +++++++++++++++++++++++++++++++ app/workers/queue_consumer.py | 27 ++- 6 files changed, 486 insertions(+), 25 deletions(-) create mode 100644 app/workers/presegment_worker.py diff --git a/app/api/documents.py b/app/api/documents.py index 2d5ddde..b7ffbcb 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -1166,8 +1166,10 @@ async def upload_document( doc.duplicate_of = canonical.id canonical.duplicate_count = (canonical.duplicate_count or 0) + 1 - # document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리 - await enqueue_stage(session, doc.id, "extract") + # document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리. + # G2 (PR-G2-2): 첫 stage = presegment (extract 前). 非PDF/단일문서는 presegment 가 + # 변경 없이 통과시켜 extract 로 흐르고, 번들 PDF 만 자식 분할된다 (worker-side gating). + await enqueue_stage(session, doc.id, "presegment") await session.commit() except Exception: # DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback. diff --git a/app/workers/extract_worker.py b/app/workers/extract_worker.py index cc162b1..b671e6c 100644 --- a/app/workers/extract_worker.py +++ b/app/workers/extract_worker.py @@ -67,21 +67,45 @@ def _postprocess_ocr(text: str) -> str: return text.strip() -def _extract_pdf_pymupdf(file_path: Path) -> str: - """PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리""" +def _extract_pdf_pymupdf( + file_path: Path, start_page: int | None = None, end_page: int | None = None +) -> str: + """PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리. + + G2 (PR-G2-2): start_page/end_page(1-based inclusive) 가 주어지면 그 범위만 추출 + (번들 자식 doc = 부모 파일 공유 + 자기 page 범위). 둘 다 None = 전체(기존 동작 동일). + """ import fitz text_parts = [] with fitz.open(str(file_path)) as doc: - for page in doc: - text_parts.append(page.get_text()) + if start_page is None and end_page is None: + for page in doc: + text_parts.append(page.get_text()) + else: + # 1-based inclusive → 0-based range. 범위는 [0, page_count] 로 클램프(방어). + total = doc.page_count + lo = max(1, start_page or 1) - 1 + hi = min(total, end_page or total) # inclusive 끝 (0-based 마지막 인덱스 = hi-1) + for i in range(lo, hi): + text_parts.append(doc.load_page(i).get_text()) return "\n".join(text_parts) -def _get_pdf_page_count(file_path: Path) -> int: - """PDF 페이지 수 확인""" +def _get_pdf_page_count( + file_path: Path, start_page: int | None = None, end_page: int | None = None +) -> int: + """PDF 페이지 수 확인. G2: 범위가 주어지면 그 범위의 페이지 수(자식 doc 밀도 계산용). + + 둘 다 None = 전체 페이지 수(기존 동작 동일). + """ import fitz with fitz.open(str(file_path)) as doc: - return len(doc) + total = len(doc) + if start_page is None and end_page is None: + return total + lo = max(1, start_page or 1) + hi = min(total, end_page or total) + return max(0, hi - lo + 1) async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None: @@ -310,6 +334,43 @@ async def process(document_id: int, session: AsyncSession) -> None: doc.extracted_at = datetime.now(timezone.utc) return + # ─── G2 (PR-G2-2): 번들 자식 PDF — 부모 파일 공유 + 자기 page 범위만 추출 ─── + # kordoc 서비스는 page-range 파라미터가 없어 전체 파일을 파싱한다(자식엔 부적합) → kordoc + # 우회, PyMuPDF 로 [bundle_page_start, bundle_page_end] 범위만 추출. range OCR 은 본 PR 범위 + # 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도 + # 그대로 보존하고 사유를 남긴다. + if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None: + if not full_path.exists(): + raise FileNotFoundError(f"파일 없음: {full_path}") + start, end = doc.bundle_page_start, doc.bundle_page_end + try: + pymupdf_text = _extract_pdf_pymupdf(full_path, start, end) + page_count = _get_pdf_page_count(full_path, start, end) + except Exception as e: + logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}") + raise + + meta = doc.extract_meta or {} + meta["presegment_child_range"] = {"start_page": start, "end_page": end} + meta["pymupdf_chars"] = len(pymupdf_text.strip()) + should, reason = _should_ocr(pymupdf_text, page_count) + if should: + # range OCR 미지원(후속 PR) — PyMuPDF 결과 유지 + 사유 기록(silent skip 아님). + meta["ocr_skip_reason"] = "presegment_child_range_ocr_unsupported" + meta["ocr_reason"] = reason + logger.warning( + f"[pymupdf:child] {doc.file_path} pages={start}-{end} " + f"OCR 필요({reason})하나 range OCR 미지원 → PyMuPDF 결과 유지" + ) + doc.extracted_text = pymupdf_text.replace("\x00", "") + doc.extracted_at = datetime.now(timezone.utc) + doc.extractor_version = PYMUPDF_VERSION if pymupdf_text.strip() else None + doc.extract_meta = meta + logger.info( + f"[pymupdf:child] {doc.file_path} pages={start}-{end} ({len(pymupdf_text)}자)" + ) + return + # ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ─── if fmt in KORDOC_FORMATS: container_path = f"/documents/{doc.file_path}" diff --git a/app/workers/file_watcher.py b/app/workers/file_watcher.py index 99c7694..8abba74 100644 --- a/app/workers/file_watcher.py +++ b/app/workers/file_watcher.py @@ -118,16 +118,18 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, if expected_category == "library": # 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락. # frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만. + # G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할. if ext in LIBRARY_DOC_EXTS: - return ("library", False, "extract") + return ("library", False, "presegment") if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) # audio/video 잘못 들어오면 skip return (None, False, None) # 기타 알 수 없는 확장자 skip # Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip. + # G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할. if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) - return (None, False, "extract") + return (None, False, "presegment") # ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ────────────────────────────────── @@ -226,7 +228,9 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int ) session.add(doc) await session.flush() - await enqueue_stage(session, doc.id, "extract") + # G2 (PR-G2-2): 모든 문서가 presegment 로 진입(단일 entry-point). HTML(非PDF)은 presegment 가 + # 변경 없이 통과시켜 extract 로 흐른다 (worker-side gating). + await enqueue_stage(session, doc.id, "presegment") return (1, 0) diff --git a/app/workers/marker_worker.py b/app/workers/marker_worker.py index 6f1055f..44c39d2 100644 --- a/app/workers/marker_worker.py +++ b/app/workers/marker_worker.py @@ -207,7 +207,21 @@ async def process(document_id: int, session: AsyncSession) -> None: return # ---- (4) page_count gauge + 분기 (LargeDoc split) ---- - page_count = _get_page_count(container_path) + # G2 (PR-G2-2): 번들 자식 doc 은 부모 파일 공유 + 자기 page 범위([bundle_page_start, end], + # 1-based inclusive)만 변환해야 한다. page_offset = 절대 시작페이지(부모 파일 기준), page_count = + # 자식 범위의 페이지 수. cols 가 NULL(일반 doc)이면 page_offset=1 + 전체 page_count = 기존 동작 동일. + file_page_count = _get_page_count(container_path) + is_child = doc.bundle_page_start is not None and doc.bundle_page_end is not None + if is_child: + page_offset = doc.bundle_page_start + if file_page_count is not None: + child_end = min(doc.bundle_page_end, file_page_count) + page_count = max(0, child_end - doc.bundle_page_start + 1) + else: + page_count = doc.bundle_page_end - doc.bundle_page_start + 1 + else: + page_offset = 1 + page_count = file_page_count # >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님. if page_count is not None and page_count > MAX_SPLIT_PAGES: @@ -226,20 +240,35 @@ async def process(document_id: int, session: AsyncSession) -> None: # ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ---- if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES: - await _process_split(doc, document_id, container_path, page_count, session) + await _process_split(doc, document_id, container_path, page_count, session, page_offset) else: - await _process_single(doc, document_id, container_path, session) + await _process_single(doc, document_id, container_path, session, page_count, page_offset) async def _process_single( - doc: Document, document_id: int, container_path: str, session: AsyncSession + doc: Document, document_id: int, container_path: str, session: AsyncSession, + page_count: int | None = None, page_offset: int = 1, ) -> None: - """소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).""" + """소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로). + + G2 (PR-G2-2): 번들 자식(page_offset>1)은 [page_offset, page_offset+page_count-1] 범위만 + 변환하도록 marker 에 start_page/end_page 를 명시한다. 일반 doc(page_offset=1)은 기존과 + 동일하게 max_pages 만 보낸다(payload byte-identical). + """ + # 일반 doc = 기존 payload 유지. 자식만 절대 page 범위를 명시(부모 파일 기준 1-based inclusive). + if page_offset > 1 and page_count is not None: + req_json = { + "file_path": container_path, + "start_page": page_offset, + "end_page": page_offset + page_count - 1, + } + else: + req_json = {"file_path": container_path, "max_pages": MAX_PAGES} try: async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client: resp = await client.post( MARKER_ENDPOINT, - json={"file_path": container_path, "max_pages": MAX_PAGES}, + json=req_json, ) resp.raise_for_status() data = resp.json() @@ -513,6 +542,7 @@ async def _process_split( container_path: str, page_count: int, session: AsyncSession, + page_offset: int = 1, ) -> None: """대형 PDF page-range 분할 변환. @@ -523,6 +553,10 @@ async def _process_split( invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...). marker slug(`_page_0_*`) 는 batch 마다 재시작 → batch 별 rewrite 후 stitch (충돌 회피). + + G2 (PR-G2-2): page_offset = 부모 파일 기준 절대 시작페이지(번들 자식). marker 에 보내는 + page 는 절대값(page_offset 가산), manifest/기록은 자식 상대값(1-based) 유지 — 일반 doc + (page_offset=1)은 abs==rel 이라 기존 동작과 동일. """ n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md} @@ -534,15 +568,17 @@ async def _process_split( async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client: for b in range(n_batches): - start_page = b * BATCH_PAGES + 1 + start_page = b * BATCH_PAGES + 1 # 자식 상대 1-based (manifest/기록용) end_page = min((b + 1) * BATCH_PAGES, page_count) + abs_start = start_page + (page_offset - 1) # 부모 파일 절대 page (marker 요청용) + abs_end = end_page + (page_offset - 1) try: resp = await client.post( MARKER_ENDPOINT, json={ "file_path": container_path, - "start_page": start_page, - "end_page": end_page, + "start_page": abs_start, + "end_page": abs_end, }, ) resp.raise_for_status() diff --git a/app/workers/presegment_worker.py b/app/workers/presegment_worker.py new file mode 100644 index 0000000..b3700e0 --- /dev/null +++ b/app/workers/presegment_worker.py @@ -0,0 +1,339 @@ +"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2). + +전 문서가 presegment stage 로 진입한다(worker-side gating): + - 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit → enqueue_next_stage 가 extract 로 흘림. + - PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 만 자식 분할, 나머지는 단일문서로 extract. + +이 PR 은 **deterministic 만** (LLM fallback = 후속 PR). 판정이 애매하면 보수적으로 분할하지 않고 +단일문서로 둔다(bias to NOT splitting). 분할 = '확실한 번들' 만: + - page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES + AND 단조 증가·비중첩 AND [1, page_count] 전 범위 커버 AND 2 <= N <= MAX_CHILDREN. + +분할 시 Option A(파일 물리분할 없음): 자식은 부모 file_path 를 그대로 공유하고 +bundle_page_start/end(1-based inclusive) 로 자기 page 범위만 가리킨다. 부모-자식 관계 자체는 +document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')는 파일 홀더라 +자체 extract/embed 안 함 — enqueue_next_stage 의 presegment→extract 전이가 'parent' 면 억제된다 +(queue_consumer.enqueue_next_stage 참조). 자식의 extract 는 이 워커가 직접 enqueue 한다. + +멱등: 재실행 시 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고 +수렴(각 자식이 extract 활성/완료 상태인지만 보장)한다. + +plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation) +""" + +import hashlib +import os +import unicodedata +from pathlib import Path + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.config import settings +from core.utils import setup_logger +from models.document import Document +from models.document_lineage import DocumentLineage +from models.queue import enqueue_stage + +logger = setup_logger("presegment_worker") + +# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ─── +# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호. +MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60")) +# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지). +MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5")) +# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부. +MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50")) + +# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix. +CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents") + + +def _resolve_path(file_path: str) -> Path | None: + """NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴.""" + candidates = [ + file_path, + unicodedata.normalize("NFD", file_path), + unicodedata.normalize("NFC", file_path), + ] + for c in candidates: + p = Path(c) + if p.exists(): + return p + parent = Path(file_path).parent + if parent.exists(): + target = unicodedata.normalize("NFC", Path(file_path).name) + for child in parent.iterdir(): + if unicodedata.normalize("NFC", child.name) == target: + return child + return None + + +def _to_container_path(file_path: str) -> str: + """file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일).""" + if file_path.startswith("/"): + return file_path + return f"{CONTAINER_PATH_PREFIX}/{file_path}" + + +def _is_pdf(doc: Document) -> bool: + """PDF 판정 — file_format=pdf 또는 .pdf 확장자.""" + fmt = (doc.file_format or "").lower() + if fmt == "pdf": + return True + if doc.file_path: + return Path(doc.file_path).suffix.lower() == ".pdf" + return False + + +def _level1_segments(toc: list, page_count: int) -> list[dict]: + """get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성. + + toc 항목 = [level, title, page] (page 는 1-based). level==1 만 채택. + end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count. + 동일 page 에서 시작하는 level-1 이 여럿이면 정렬 후 인접 항목으로 경계 계산되며, + 그 경우 0-페이지 segment 가 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다. + """ + starts = [] + for entry in toc: + # simple=True 는 [level, title, page]. 방어적으로 길이 체크. + if not entry or len(entry) < 3: + continue + level, title, page = entry[0], entry[1], entry[2] + if level != 1: + continue + # ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨. + starts.append((int(page), (title or "").strip())) + + # ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬). + starts.sort(key=lambda x: x[0]) + + segments: list[dict] = [] + for i, (start_page, title) in enumerate(starts): + if i + 1 < len(starts): + end_page = starts[i + 1][0] - 1 + else: + end_page = page_count + segments.append({"start_page": start_page, "end_page": end_page, "title": title}) + return segments + + +def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]: + """deterministic '명확한 번들' 판정. (clear, reason) 반환. + + clear=True 면 reason="" / clear=False 면 reason 은 거부 사유(로깅용). + 모든 조건은 보수적 — 하나라도 어긋나면 단일문서로 처리(분할 안 함). + """ + n = len(segments) + if n < 2: + return False, f"too_few_level1_entries(n={n})" + if n > MAX_CHILDREN: + return False, f"too_many_children(n={n}>{MAX_CHILDREN})" + + # 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버. + if segments[0]["start_page"] != 1: + return False, f"first_start_not_1(start={segments[0]['start_page']})" + if segments[-1]["end_page"] != page_count: + return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})" + + prev_end = 0 + for seg in segments: + start, end = seg["start_page"], seg["end_page"] + # 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할. + if start != prev_end + 1: + return False, f"non_contiguous(start={start},prev_end={prev_end})" + if end < start: + return False, f"non_monotonic(start={start},end={end})" + if (end - start + 1) < MIN_CHILD_PAGES: + return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})" + prev_end = end + + if prev_end != page_count: + return False, f"coverage_gap(covered={prev_end},pc={page_count})" + + return True, "" + + +def _child_title(parent: Document, seg: dict) -> str: + """자식 제목 = 부모 제목 + ' — ' + (segment 제목 또는 page 범위).""" + base = (parent.title or "").strip() or (parent.original_filename or "") or "문서" + seg_title = (seg.get("title") or "").strip() + suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}" + return f"{base} — {suffix}" + + +def _child_file_hash(parent_hash: str, start: int, end: int) -> str: + """자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등. + + 부모 file_hash 가 NULL 일 수는 없으나(NOT NULL) 방어적으로 빈 문자열 처리. + """ + return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest() + + +async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None: + """자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로). + + 이미 extracted_text 가 채워졌거나 활성 큐 행이 있으면 enqueue_stage 가 no-op/skip. + """ + child = await session.get(Document, child_id) + if child is None: + return + # 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip). + if child.extracted_at is not None and child.extracted_text is not None: + return + await enqueue_stage(session, child_id, "extract") + + +async def process(document_id: int, session: AsyncSession) -> None: + """presegment stage 워커 진입점. queue_consumer 가 호출. + + 전 문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) → extract 로 흐른다. + '명확한 번들' PDF 만 자식 분할 + 부모를 'parent' 로 표식(이 경우 부모는 extract 로 흐르지 않음). + """ + doc = await session.get(Document, document_id) + if doc is None: + logger.warning(f"[presegment] document {document_id} not found") + return + + # ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ─── + if not _is_pdf(doc): + logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract") + return + + # ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ─── + if not doc.file_path: + logger.info(f"[presegment] id={document_id} no file_path → extract") + return + + # ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ─── + # (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.) + if doc.presegment_role in ("child", "parent"): + logger.info( + f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip" + ) + return + + # ─── (2) 파일 열기 + page_count ─── + raw = str(Path(settings.nas_mount_path) / doc.file_path) + source = _resolve_path(raw) + if source is None: + # 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안. + # presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다. + logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract") + return + + import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존 + + try: + with fitz.open(str(source)) as pdf: + page_count = pdf.page_count + toc = pdf.get_toc(simple=True) or [] + except Exception as exc: + # PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화). + logger.warning( + f"[presegment] id={document_id} fitz open/toc failed " + f"({type(exc).__name__}: {exc}) → extract" + ) + return + + # ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ─── + if page_count < MIN_BUNDLE_PAGES: + logger.info( + f"[presegment] id={document_id} single doc " + f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract" + ) + return + + # ─── (4) level-1 ToC → 자식 후보 segment ─── + segments = _level1_segments(toc, page_count) + + if not segments: + # 큰 PDF 인데 ToC 없음/level-1 없음 = 애매(LLM fallback 대상, 후속 PR). + # 이 PR 은 기본 = 단일문서로 처리하고 사유를 남긴다. + logger.info( + f"[presegment] presegment_ambiguous id={document_id} " + f"reason=no_level1_toc pages={page_count} → single doc(extract)" + ) + return + + clear, reason = _is_clear_bundle(segments, page_count) + if not clear: + # 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매 → 단일문서(분할 안 함). + logger.info( + f"[presegment] presegment_ambiguous id={document_id} " + f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)" + ) + return + + # ─── (5) 명확한 번들 — 멱등 체크: 이미 자식이 있으면 수렴만 ─── + existing_children = ( + await session.execute( + select(DocumentLineage.derived_document_id).where( + DocumentLineage.source_document_id == doc.id, + DocumentLineage.relation_type == "segmented_from", + ) + ) + ).scalars().all() + + if existing_children: + # 부모 표식이 누락된 경우 보정(이전 부분실패 복구). + if doc.presegment_role != "parent": + doc.presegment_role = "parent" + for child_id in existing_children: + await _ensure_child_extract(session, child_id) + await session.commit() + logger.info( + f"[presegment] id={document_id} children already exist " + f"(n={len(existing_children)}) → converge(ensure extract), no re-create" + ) + return + + # ─── (6) 자식 N개 생성 + lineage + extract enqueue ─── + n = len(segments) + created_ids: list[int] = [] + for seg in segments: + start, end = seg["start_page"], seg["end_page"] + child = Document( + # Option A: 부모 파일 그대로 공유(물리 분할 없음). 자식은 bundle_page_start/end 로 슬라이스. + file_path=doc.file_path, + file_hash=_child_file_hash(doc.file_hash, start, end), + file_format=doc.file_format, + file_size=doc.file_size, + file_type=doc.file_type, + import_source=doc.import_source, + original_filename=doc.original_filename, + source_channel=doc.source_channel, + category=doc.category, + data_origin=doc.data_origin, + doc_purpose=doc.doc_purpose, + # 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음). + material_type=doc.material_type, + jurisdiction=doc.jurisdiction, + title=_child_title(doc, seg), + bundle_page_start=start, + bundle_page_end=end, + presegment_role="child", + ) + session.add(child) + await session.flush() # child.id 확보 + created_ids.append(child.id) + + session.add( + DocumentLineage( + source_document_id=doc.id, + derived_document_id=child.id, + relation_type="segmented_from", + meta={"start_page": start, "end_page": end}, + ) + ) + # 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음). + await enqueue_stage(session, child.id, "extract") + + # 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제. + doc.presegment_role = "parent" + await session.commit() + + logger.info( + f"[presegment] id={document_id} SPLIT into {n} children " + f"(pages={page_count}) child_ids={created_ids}" + ) diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index c7c2ebb..a72acd2 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -31,9 +31,9 @@ _hold_logged = False # embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이 # LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음. # 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일). -BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10, - "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1, - "fulltext": 3} +BATCH_SIZE = {"presegment": 3, "extract": 5, "classify": 3, "summarize": 3, "embed": 10, + "chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, + "markdown": 1, "fulltext": 3} STALE_THRESHOLD_MINUTES = 10 # markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다. # marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분 @@ -46,7 +46,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120" # (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험). # STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). MAIN_QUEUE_STAGES = [ - "extract", "classify", "summarize", + "presegment", "extract", "classify", "summarize", "preview", "stt", "thumbnail", "fulltext", ] MARKDOWN_QUEUE_STAGES = ["markdown"] @@ -165,6 +165,10 @@ async def enqueue_next_stage(document_id: int, current_stage: str): } next_stages = { + # G2 (PR-G2-2): 전 문서가 presegment → extract. 단, 번들 분할로 'parent' 가 된 문서는 + # 파일 홀더라 자체 extract 안 함 — 아래 suppression 으로 이 전이를 건너뛴다(자식 extract 는 + # presegment_worker 가 직접 enqueue). 단일/非PDF 문서(role NULL)는 정상적으로 extract 로 흐름. + "presegment": ["extract"], "extract": ["classify", "preview"], "classify": ["embed", "chunk", "markdown"], "stt": ["classify"], @@ -180,6 +184,18 @@ async def enqueue_next_stage(document_id: int, current_stage: str): stages = extract_override_by_channel[sc] else: stages = next_stages.get(current_stage, []) + elif current_stage == "presegment": + # 번들 분할 parent 는 extract 로 흐르지 않게 억제 (자식이 부모 extract 에 가려지는 것 방지). + # role NULL(단일/非PDF) / 'child' 는 정상 전이. presegment_worker 가 자식 extract 를 직접 + # enqueue 하므로 'parent' 만 여기서 no-op. + from models.document import Document + async with async_session() as lookup_session: + doc = await lookup_session.get(Document, document_id) + role = doc.presegment_role if doc else None + if role == "parent": + stages = [] + else: + stages = next_stages.get(current_stage, []) else: stages = next_stages.get(current_stage, []) @@ -199,6 +215,7 @@ def _load_workers(): from workers.deep_summary_worker import process as deep_summary_process from workers.embed_worker import process as embed_process from workers.extract_worker import process as extract_process + from workers.presegment_worker import process as presegment_process from workers.preview_worker import process as preview_process from workers.stt_worker import process as stt_process from workers.summarize_worker import process as summarize_process @@ -207,6 +224,8 @@ def _load_workers(): from workers.fulltext_worker import process as fulltext_process return { + # G2 (PR-G2-2): extract 前 번들 PDF → N 자식 분할 (deterministic ToC). 非PDF/단일은 통과. + "presegment": presegment_process, "extract": extract_process, "classify": classify_process, "summarize": summarize_process, From 860c5c6b0c7a55597011e4860a61e22336d276ca Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 18 Jun 2026 17:07:38 +0900 Subject: [PATCH 3/5] =?UTF-8?q?fix(presegment):=20G2=20=EC=9D=B8=EC=A0=9C?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=20=EB=B9=84=ED=99=9C=EC=84=B1=20=E2=80=94=20?= =?UTF-8?q?Option=20A=20vs=20uq=5Fdocuments=5Ffile=5Fpath=20=EC=B6=A9?= =?UTF-8?q?=EB=8F=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ★실번들 검증서 발견: 자식 Document(부모 file_path 공유, Option A)가 uq_documents_file_path UNIQUE 제약 위반 → 자식 INSERT 실패. 검증된 G1 파이프라인 보호 위해 인제스트를 직접 extract 로 원복(documents.py/file_watcher 4곳). 스키마(362~364)+presegment_worker 코드는 보존(재설계 후 재활성). 재설계 후보: 자식 file_path=unique 합성값+부모 lineage 에서 실파일 해석 / file_path NULL+bundle_source_path. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/api/documents.py | 6 +++--- app/workers/file_watcher.py | 13 ++++++------- app/workers/presegment_worker.py | 6 ++++++ 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/app/api/documents.py b/app/api/documents.py index b7ffbcb..3dec397 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -1167,9 +1167,9 @@ async def upload_document( canonical.duplicate_count = (canonical.duplicate_count or 0) + 1 # document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리. - # G2 (PR-G2-2): 첫 stage = presegment (extract 前). 非PDF/단일문서는 presegment 가 - # 변경 없이 통과시켜 extract 로 흐르고, 번들 PDF 만 자식 분할된다 (worker-side gating). - await enqueue_stage(session, doc.id, "presegment") + # ★ G2 presegment 인제스트 비활성 (2026-06-18): Option A(자식이 부모 file_path 공유)가 + # uq_documents_file_path UNIQUE 제약과 충돌 — 자식파일 전략 재설계 후 재활성. 현재=직접 extract. + await enqueue_stage(session, doc.id, "extract") await session.commit() except Exception: # DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback. diff --git a/app/workers/file_watcher.py b/app/workers/file_watcher.py index 8abba74..6c7c9d9 100644 --- a/app/workers/file_watcher.py +++ b/app/workers/file_watcher.py @@ -118,18 +118,18 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, if expected_category == "library": # 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락. # frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만. - # G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할. + # ★ G2 presegment 인제스트 비활성(2026-06-18, uq_documents_file_path 충돌) → 직접 extract. if ext in LIBRARY_DOC_EXTS: - return ("library", False, "presegment") + return ("library", False, "extract") if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) # audio/video 잘못 들어오면 skip return (None, False, None) # 기타 알 수 없는 확장자 skip # Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip. - # G2 (PR-G2-2): 문서 첫 stage = presegment (extract 前). 非PDF/단일은 통과, 번들 PDF 만 분할. + # ★ G2 presegment 인제스트 비활성(2026-06-18, uq_documents_file_path 충돌) → 직접 extract. if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) - return (None, False, "presegment") + return (None, False, "extract") # ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ────────────────────────────────── @@ -228,9 +228,8 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int ) session.add(doc) await session.flush() - # G2 (PR-G2-2): 모든 문서가 presegment 로 진입(단일 entry-point). HTML(非PDF)은 presegment 가 - # 변경 없이 통과시켜 extract 로 흐른다 (worker-side gating). - await enqueue_stage(session, doc.id, "presegment") + # ★ G2 presegment 인제스트 비활성(2026-06-18, uq_documents_file_path 충돌) → 직접 extract. + await enqueue_stage(session, doc.id, "extract") return (1, 0) diff --git a/app/workers/presegment_worker.py b/app/workers/presegment_worker.py index b3700e0..869c1f9 100644 --- a/app/workers/presegment_worker.py +++ b/app/workers/presegment_worker.py @@ -18,6 +18,12 @@ document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent 멱등: 재실행 시 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고 수렴(각 자식이 extract 활성/완료 상태인지만 보장)한다. +★★ BLOCKER (2026-06-18 실측): Option A(자식이 부모 file_path 공유)는 `uq_documents_file_path` +UNIQUE 제약과 충돌 → 자식 INSERT UniqueViolation 으로 실패한다. 따라서 **현재 인제스트는 presegment +를 enqueue 하지 않음**(documents.py/file_watcher.py 가 직접 extract). 본 워커는 재설계 전까지 미사용. +재설계 후보: 자식 file_path=unique 합성값(`{parent}#p{s}-{e}`)+실파일은 부모(lineage source)에서 해석 +/ 또는 file_path NULL + 별도 bundle_source_path 컬럼. 결정 후 인제스트 재활성. + plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation) """ From 8930803a112ac3e8f1eaec0c144609b56d25f6a1 Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 18 Jun 2026 17:19:17 +0900 Subject: [PATCH 4/5] =?UTF-8?q?feat(presegment):=20G2=20=ED=9B=84=EB=B3=B4?= =?UTF-8?q?=20A=20=E2=80=94=20=EC=9E=90=EC=8B=9D=20=ED=95=A9=EC=84=B1=20fi?= =?UTF-8?q?le=5Fpath=20+=20bundle=5Fsource=5Fpath=20=EC=8B=A4=ED=8C=8C?= =?UTF-8?q?=EC=9D=BC=20=ED=95=B4=EC=84=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit uq_documents_file_path 충돌 해소: 자식 file_path = unique 합성값 '{부모}#p{s}-{e}' (UNIQUE 통과), 실파일은 bundle_source_path() 로 부모경로 복원(접미사 strip, 결정적). - presegment_worker: bundle_source_path() 헬퍼 + 자식 합성 file_path - extract_worker 자식분기: bundle_source_path + NFC/NFD resolve 로 실파일 range 추출 - marker_worker: container_path = bundle_source_path(file_path) (일반 doc 무변) 인제스트는 아직 extract(검증 후 재활성). 일반 doc = bundle_source_path no-op = 무회귀. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/workers/extract_worker.py | 14 +++++++---- app/workers/marker_worker.py | 5 +++- app/workers/presegment_worker.py | 41 ++++++++++++++++++++++---------- 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/app/workers/extract_worker.py b/app/workers/extract_worker.py index b671e6c..3a5339b 100644 --- a/app/workers/extract_worker.py +++ b/app/workers/extract_worker.py @@ -340,12 +340,18 @@ async def process(document_id: int, session: AsyncSession) -> None: # 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도 # 그대로 보존하고 사유를 남긴다. if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None: - if not full_path.exists(): - raise FileNotFoundError(f"파일 없음: {full_path}") + # 후보 A: 자식 file_path 는 합성값(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path 로 부모경로 + # 복원 + NFC/NFD resolve. (자식 file_path 는 디스크에 없음.) + from workers.presegment_worker import _resolve_path as _resolve_bundle_path + from workers.presegment_worker import bundle_source_path + real_rel = bundle_source_path(doc.file_path) + src = _resolve_bundle_path(str(Path(settings.nas_mount_path) / real_rel)) + if src is None: + raise FileNotFoundError(f"번들 원본 파일 없음: {real_rel}") start, end = doc.bundle_page_start, doc.bundle_page_end try: - pymupdf_text = _extract_pdf_pymupdf(full_path, start, end) - page_count = _get_pdf_page_count(full_path, start, end) + pymupdf_text = _extract_pdf_pymupdf(src, start, end) + page_count = _get_pdf_page_count(src, start, end) except Exception as e: logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}") raise diff --git a/app/workers/marker_worker.py b/app/workers/marker_worker.py index 44c39d2..8b43434 100644 --- a/app/workers/marker_worker.py +++ b/app/workers/marker_worker.py @@ -185,7 +185,10 @@ async def process(document_id: int, session: AsyncSession) -> None: await _fail(session, document_id, "no file_path") return - container_path = _to_marker_path(doc.file_path) + # 후보 A: 자식(bundle cols)은 합성 file_path(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path + # 로 부모경로 복원. 일반 doc 은 그대로(접미사 없음). marker/mineru 는 실파일 + page 범위로 변환. + from workers.presegment_worker import bundle_source_path + container_path = _to_marker_path(bundle_source_path(doc.file_path)) suffix = Path(container_path).suffix.lower() # ---- (3) office/hwp → md (C-2): PDF 외 지원 포맷은 office_md 하이브리드 변환 ---- diff --git a/app/workers/presegment_worker.py b/app/workers/presegment_worker.py index 869c1f9..4fc6fc7 100644 --- a/app/workers/presegment_worker.py +++ b/app/workers/presegment_worker.py @@ -9,26 +9,26 @@ - page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES AND 단조 증가·비중첩 AND [1, page_count] 전 범위 커버 AND 2 <= N <= MAX_CHILDREN. -분할 시 Option A(파일 물리분할 없음): 자식은 부모 file_path 를 그대로 공유하고 -bundle_page_start/end(1-based inclusive) 로 자기 page 범위만 가리킨다. 부모-자식 관계 자체는 -document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')는 파일 홀더라 -자체 extract/embed 안 함 — enqueue_next_stage 의 presegment→extract 전이가 'parent' 면 억제된다 -(queue_consumer.enqueue_next_stage 참조). 자식의 extract 는 이 워커가 직접 enqueue 한다. +분할 시 ★후보 A(물리분할 없음, uq_documents_file_path 해소): 자식 file_path = unique 합성값 +`{부모경로}#p{start}-{end}` (UNIQUE 제약 통과), 실파일은 `bundle_source_path()` 로 부모 경로 복원. +자식은 bundle_page_start/end(1-based inclusive) 로 부모 파일의 자기 page 범위만 가리킨다. +부모-자식 관계 정본 = document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')는 +파일 홀더라 자체 extract/embed 안 함 — enqueue_next_stage 의 presegment→extract 전이가 'parent' 면 +억제된다(queue_consumer 참조). 자식의 extract 는 이 워커가 직접 enqueue. extract_worker/marker_worker +가 자식 처리 시 bundle_source_path() 로 실파일 접근. 멱등: 재실행 시 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고 수렴(각 자식이 extract 활성/완료 상태인지만 보장)한다. -★★ BLOCKER (2026-06-18 실측): Option A(자식이 부모 file_path 공유)는 `uq_documents_file_path` -UNIQUE 제약과 충돌 → 자식 INSERT UniqueViolation 으로 실패한다. 따라서 **현재 인제스트는 presegment -를 enqueue 하지 않음**(documents.py/file_watcher.py 가 직접 extract). 본 워커는 재설계 전까지 미사용. -재설계 후보: 자식 file_path=unique 합성값(`{parent}#p{s}-{e}`)+실파일은 부모(lineage source)에서 해석 -/ 또는 file_path NULL + 별도 bundle_source_path 컬럼. 결정 후 인제스트 재활성. +★해결 이력 (2026-06-18): 최초 Option A(자식이 부모 file_path 그대로 공유)는 uq_documents_file_path +UNIQUE 위반(실번들 검증서 발견) → 합성 file_path(후보 A)로 해소. 인제스트 재활성 = 합성번들 재검증 PASS 후. plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation) """ import hashlib import os +import re import unicodedata from pathlib import Path @@ -82,6 +82,21 @@ def _to_container_path(file_path: str) -> str: return f"{CONTAINER_PATH_PREFIX}/{file_path}" +# 후보 A: 자식 합성 file_path 패턴 `{부모경로}#p{start}-{end}` (uq_documents_file_path 유일성). +_BUNDLE_SUFFIX_RE = re.compile(r"#p\d+-\d+$") + + +def bundle_source_path(file_path: str | None) -> str | None: + """자식 합성 file_path → 부모 실파일 경로 복원. 일반 doc(접미사 없음)은 그대로 반환. + + extract_worker/marker_worker 가 자식 처리 시 실제 파일 접근에 사용 (자식 file_path 는 + 합성값이라 디스크에 없음). 결정적·세션 불필요. lineage 가 부모-자식 관계의 정본 기록. + """ + if not file_path: + return file_path + return _BUNDLE_SUFFIX_RE.sub("", file_path) + + def _is_pdf(doc: Document) -> bool: """PDF 판정 — file_format=pdf 또는 .pdf 확장자.""" fmt = (doc.file_format or "").lower() @@ -300,8 +315,10 @@ async def process(document_id: int, session: AsyncSession) -> None: for seg in segments: start, end = seg["start_page"], seg["end_page"] child = Document( - # Option A: 부모 파일 그대로 공유(물리 분할 없음). 자식은 bundle_page_start/end 로 슬라이스. - file_path=doc.file_path, + # 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path + # 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 — + # 자식은 bundle_page_start/end 로 부모 파일을 슬라이스. + file_path=f"{doc.file_path}#p{start}-{end}", file_hash=_child_file_hash(doc.file_hash, start, end), file_format=doc.file_format, file_size=doc.file_size, From 2eda8d3bdd8186169fafeb4c3cab2a91c134227a Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 18 Jun 2026 17:22:01 +0900 Subject: [PATCH 5/5] =?UTF-8?q?feat(presegment):=20G2=20=EC=9D=B8=EC=A0=9C?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=20=EC=9E=AC=ED=99=9C=EC=84=B1=20=E2=80=94=20?= =?UTF-8?q?=ED=9B=84=EB=B3=B4=20A=20e2e=20=EA=B2=80=EC=A6=9D=20PASS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 합성 번들 e2e PASS(자식 3개 합성 file_path·range, uq 위반 0 + 자식 extract range-clamp 1110자 range_ok) 후 인제스트 presegment 재활성(documents.py upload + file_watcher 3곳). 非PDF/단일=통과. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/api/documents.py | 6 +++--- app/workers/file_watcher.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/app/api/documents.py b/app/api/documents.py index 3dec397..f9fc615 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -1167,9 +1167,9 @@ async def upload_document( canonical.duplicate_count = (canonical.duplicate_count or 0) + 1 # document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리. - # ★ G2 presegment 인제스트 비활성 (2026-06-18): Option A(자식이 부모 file_path 공유)가 - # uq_documents_file_path UNIQUE 제약과 충돌 — 자식파일 전략 재설계 후 재활성. 현재=직접 extract. - await enqueue_stage(session, doc.id, "extract") + # G2: 첫 stage=presegment (extract 前 번들 PDF 분할, 후보 A 검증완료 2026-06-18). + # 非PDF/단일은 presegment 가 무변 통과 → extract. 번들 PDF 만 N 자식 분할(worker-side gating). + await enqueue_stage(session, doc.id, "presegment") await session.commit() except Exception: # DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback. diff --git a/app/workers/file_watcher.py b/app/workers/file_watcher.py index 6c7c9d9..a96e9df 100644 --- a/app/workers/file_watcher.py +++ b/app/workers/file_watcher.py @@ -118,18 +118,18 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, if expected_category == "library": # 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락. # frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만. - # ★ G2 presegment 인제스트 비활성(2026-06-18, uq_documents_file_path 충돌) → 직접 extract. + # G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할. if ext in LIBRARY_DOC_EXTS: - return ("library", False, "extract") + return ("library", False, "presegment") if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) # audio/video 잘못 들어오면 skip return (None, False, None) # 기타 알 수 없는 확장자 skip # Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip. - # ★ G2 presegment 인제스트 비활성(2026-06-18, uq_documents_file_path 충돌) → 직접 extract. + # G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할. if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) - return (None, False, "extract") + return (None, False, "presegment") # ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ────────────────────────────────── @@ -228,8 +228,8 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int ) session.add(doc) await session.flush() - # ★ G2 presegment 인제스트 비활성(2026-06-18, uq_documents_file_path 충돌) → 직접 extract. - await enqueue_stage(session, doc.id, "extract") + # G2: 첫 stage=presegment (후보 A 검증완료). HTML(非PDF)은 presegment 가 무변 통과 → extract. + await enqueue_stage(session, doc.id, "presegment") return (1, 0)