From e50869cbdaea27925dac336523e983cd0e8ba6cb Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Fri, 1 May 2026 00:06:23 +0000 Subject: [PATCH] =?UTF-8?q?feat(canonical):=20Phase=201B=20marker-service?= =?UTF-8?q?=20+=20marker=5Fworker=20for=20PDF=E2=86=92markdown=20(222)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 신규 컨테이너 marker-service (port 3300, Marker 1.10.2 + surya 0.17.1 + HF cache volume). marker_worker 가 markdown stage 큐 소비: classify_worker → enqueue 'markdown' (leaf, embed/chunk 와 독립) → SKIP_DOC_TYPES (발주서/세금계산서/명세표) 스킵 → 확장자 != .pdf 스킵 (Phase 1B = PDF only) → page_count > 200 스킵 → marker-service POST /convert → 422/404 = doc-level failed, 5xx = queue retry 안정성 장치: - migration 222: ALTER TYPE process_stage ADD VALUE markdown (단일 statement) - md_extraction_quality JSONB dict 직접 저장 - skip 시 md_content/hash NULL 클리어 - /ready Response.status_code + warmup_error 가시화 - HF cache volume (build-time download 0) - file_path 는 NAS 상대경로 → /documents prefix prepend 성공 기준: 파이프라인 안정성. markdown 품질은 Phase 1D pilot. 
Pre-flight (2026-05-01): - marker-pdf 1.10.2 stable - file_path 9503건 NAS 상대경로 - DOCUMENT_TYPES 한국어 7종 → SKIP alias 보강 - queue retry max_attempts=3 + reset_stale_items 확인 - main 220/221 study_q_related 선점 → 222 rebump Plan: ~/.claude/plans/plan-idempotent-sundae.md (Round 5 approved) Co-Authored-By: Claude Opus 4.7 (1M context) --- app/models/document.py | 16 ++ app/workers/marker_worker.py | 239 ++++++++++++++++++ app/workers/queue_consumer.py | 7 +- docker-compose.yml | 33 +++ .../222_processing_queue_stage_markdown.sql | 8 + services/marker/Dockerfile | 22 ++ services/marker/requirements.txt | 9 + services/marker/server.py | 136 ++++++++++ 8 files changed, 468 insertions(+), 2 deletions(-) create mode 100644 app/workers/marker_worker.py create mode 100644 migrations/222_processing_queue_stage_markdown.sql create mode 100644 services/marker/Dockerfile create mode 100644 services/marker/requirements.txt create mode 100644 services/marker/server.py diff --git a/app/models/document.py b/app/models/document.py index fcfaa84..7de81ee 100644 --- a/app/models/document.py +++ b/app/models/document.py @@ -139,6 +139,22 @@ class Document(Base): facet_year: Mapped[int | None] = mapped_column(Integer) facet_doctype: Mapped[str | None] = mapped_column(Text) + # === Phase 1A canonical Markdown layer columns (migrations 211~219) === + # plan: ~/.claude/plans/plan-idempotent-sundae.md + md_content: Mapped[str | None] = mapped_column(Text) + md_frontmatter: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict) + md_format_version: Mapped[str] = mapped_column(Text, nullable=False, default='1.0') + md_status: Mapped[str] = mapped_column(Text, nullable=False, default='pending') + md_extraction_engine: Mapped[str | None] = mapped_column(Text) + md_extraction_engine_version: Mapped[str | None] = mapped_column(Text) + md_extraction_quality: Mapped[dict | None] = mapped_column(JSONB) + md_extraction_error: Mapped[str | None] = mapped_column(Text) + md_content_hash: 
Mapped[str | None] = mapped_column(Text) + md_source_hash: Mapped[str | None] = mapped_column(Text) + md_generated_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + content_origin: Mapped[str] = mapped_column(Text, nullable=False, default='extracted') + md_draft_status: Mapped[str | None] = mapped_column(Text) + # 타임스탬프 created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now diff --git a/app/workers/marker_worker.py b/app/workers/marker_worker.py new file mode 100644 index 0000000..904ae10 --- /dev/null +++ b/app/workers/marker_worker.py @@ -0,0 +1,239 @@ +"""marker_worker — markdown stage 소비. Phase 1B Round 5. + +플로우: + classify_worker 완료 → enqueue 'markdown' stage + → marker_worker.process() + → doc_type / 확장자 / page_count 가드 → marker-service POST /convert + → md_content 저장 또는 doc-level failed (404/422) 또는 transient raise (5xx → queue retry) + +plan: ~/.claude/plans/plan-idempotent-sundae.md +""" +import logging +import os +import re +from pathlib import Path +from typing import Any + +import fitz # PyMuPDF +import httpx +from sqlalchemy import update +from sqlalchemy.ext.asyncio import AsyncSession + +from models.document import Document + +logger = logging.getLogger(__name__) + +MARKER_ENDPOINT = "http://marker-service:3300/convert" +MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도 +MAX_PAGES = 200 # 페이지 hard limit + +# Phase 1B = PDF only. DOCX 등은 후속 Phase. +SUPPORTED_EXTENSIONS = {".pdf"} + +# config.yaml document_types 의 한국어 label 직접 사용 (Pre-flight 결과). +# Round 0 사용자 의도 = 표 중심 발주/계산/명세 도메인. +SKIP_DOC_TYPES = { + # 한국어 (실제 config label) + "발주서", "세금계산서", "명세표", + # 영문 placeholder (현재 config 미포함, 미래 신규 분류 자동 적용) + "Invoice", "Purchase_Order", "Estimate", "Statement", +} + +# documents.file_path 는 NAS 상대경로 (예: 'news/SCMP/abc'). +# fastapi 와 marker-service 모두 NAS 를 /documents 에 ro 마운트. 
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")

# HTTP statuses that marker-service uses for document-level failures
# (404 = file_not_found, 422 = conversion_failed). Retrying cannot help.
_NON_RETRYABLE_STATUSES = frozenset({404, 422})


def _to_marker_path(file_path: str) -> str:
    """Translate a stored ``file_path`` into an absolute in-container path.

    Absolute inputs are returned untouched (possible legacy rows). Relative
    inputs are joined onto ``CONTAINER_PATH_PREFIX``; a trailing slash on the
    prefix is stripped so the result never contains ``//``.
    """
    if file_path.startswith("/"):
        return file_path
    return f"{CONTAINER_PATH_PREFIX.rstrip('/')}/{file_path}"


def _marker_convert_url() -> str:
    """Return the URL of marker-service's ``/convert`` endpoint.

    The ``MARKER_ENDPOINT`` environment variable (docker-compose passes the
    service base URL to the app container) takes precedence; ``/convert`` is
    appended when missing. Without the variable the module-level
    ``MARKER_ENDPOINT`` constant is used unchanged.
    """
    base = os.environ.get("MARKER_ENDPOINT")
    if not base:
        return MARKER_ENDPOINT
    base = base.rstrip("/")
    return base if base.endswith("/convert") else f"{base}/convert"


def _get_page_count(file_path: str) -> int | None:
    """Read the PDF page count with PyMuPDF before calling the service.

    Returns ``None`` when the file cannot be opened, which lets the
    MAX_PAGES guard pass and leaves the final verdict to marker-service.
    """
    try:
        with fitz.open(file_path) as doc:
            return doc.page_count
    except Exception as exc:
        logger.warning("[marker] page count read failed path=%s: %s", file_path, exc)
        return None


def _parse_error_detail(response: Any) -> tuple[str, str]:
    """Extract ``(code, message)`` from a marker-service error response.

    The service sends ``{"detail": {"code": ..., "message": ...}}``. Any other
    shape (non-JSON body, string/list ``detail``) falls back to
    ``("unknown", response.text)``. Both values are coerced to ``str`` so the
    caller can slice them safely.
    """
    try:
        detail = response.json().get("detail", {})
        code = detail.get("code", "unknown")
        message = detail.get("message", response.text)
    except Exception:
        code, message = "unknown", response.text
    return str(code), str(message)


async def process(document_id: int, session: AsyncSession) -> None:
    """Entry point of the ``markdown`` stage; invoked by queue_consumer.

    Flow: doc_type / extension / page-count guards -> POST to marker-service
    -> store the markdown, or mark the document failed / skipped.

    Raises:
        httpx.HTTPStatusError: for HTTP errors other than 404/422, so the
            queue retries the item.
        httpx.TransportError: for connection, timeout, read and protocol
            errors (e.g. the service restarting mid-request); also retried.
    """
    doc = await session.get(Document, document_id)
    if doc is None:
        logger.warning("[marker] document %s not found", document_id)
        return

    # ---- (1) doc_type skip ----
    if doc.document_type in SKIP_DOC_TYPES:
        logger.info(
            "markdown_skip_table_heavy_doctype id=%s doc_type=%s",
            document_id, doc.document_type,
        )
        await _set_skipped(session, document_id, f"skipped: doc_type={doc.document_type}")
        return

    # ---- (2) file_path validation ----
    if not doc.file_path:
        await _fail(session, document_id, "no file_path")
        return

    container_path = _to_marker_path(doc.file_path)

    # ---- (3) PDF only ----
    suffix = Path(container_path).suffix.lower()
    if suffix not in SUPPORTED_EXTENSIONS:
        logger.info("markdown_skip_unsupported_extension id=%s ext=%s", document_id, suffix)
        await _set_skipped(
            session, document_id,
            f"skipped: unsupported extension={suffix} in Phase 1B",
        )
        return

    # ---- (4) MAX_PAGES guard ----
    page_count = _get_page_count(container_path)
    if page_count is not None and page_count > MAX_PAGES:
        logger.info("markdown_skip_too_many_pages id=%s pages=%s", document_id, page_count)
        await _set_skipped(
            session, document_id,
            f"skipped: page_count={page_count} exceeds MAX_PAGES={MAX_PAGES}",
        )
        return

    # ---- (5) mark as processing ----
    await session.execute(
        update(Document).where(Document.id == document_id).values(md_status="processing")
    )
    await session.commit()

    # ---- (6) call marker-service ----
    try:
        async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
            resp = await client.post(
                _marker_convert_url(),
                json={"file_path": container_path, "max_pages": MAX_PAGES},
            )
        resp.raise_for_status()
        data = resp.json()
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status in _NON_RETRYABLE_STATUSES:
            code, message = _parse_error_detail(exc.response)
            logger.warning(
                "[marker] non-retryable %s %s id=%s: %s",
                status, code, document_id, message[:200],
            )
            await _fail(session, document_id, f"{code}: {message[:1000]}")
            return
        # Everything else (5xx, 503 while warming up, ...) is transient.
        logger.exception("[marker] http error id=%s: %s", document_id, exc)
        raise
    except httpx.TransportError as exc:
        # TransportError is the common base of connect/timeout/read/protocol
        # errors. All of them mean "the service was unreachable or died", not
        # "this document is bad", so let the queue retry.
        logger.warning("[marker] transient error id=%s: %s", document_id, exc)
        raise
    except Exception as exc:
        logger.exception("[marker] unexpected error id=%s: %s", document_id, exc)
        await _fail(session, document_id, str(exc)[:1000])
        return

    # ---- (7) success ----
    md_content = data["md_content"]
    quality = _compute_quality(
        md_content, doc.extracted_text or "", data.get("raw_metrics") or {}
    )

    await session.execute(
        update(Document).where(Document.id == document_id).values(
            md_content=md_content,
            md_status="success",
            md_extraction_engine=data["engine"],
            md_extraction_engine_version=data["engine_version"],
            md_extraction_quality=quality,
            md_content_hash=data["md_content_hash"],
            md_source_hash=doc.file_hash,
            md_generated_at=_now(),
            md_extraction_error=None,
            md_frontmatter=doc.md_frontmatter or {},
            md_format_version="1.0",
            content_origin="extracted",
        )
    )
    await session.commit()
    # .get(): a missing optional field must not raise after the commit,
    # otherwise an already-stored result would be re-queued.
    logger.info(
        "[marker] success id=%s len=%s elapsed_ms=%s",
        document_id, len(md_content), data.get("elapsed_ms"),
    )


def _compute_quality(md: str, raw_text: str, raw_metrics: dict[str, Any]) -> dict[str, Any]:
    """Compute heuristic quality metrics for the generated markdown.

    No threshold verdict is applied here: ``score`` is always ``None`` and
    only raw counts, ratios and warnings are returned.

    Args:
        md: Markdown produced by marker-service.
        raw_text: Previously extracted plain text; empty disables the ratio.
        raw_metrics: ``raw_metrics`` from the service response.
    """
    heading_marks = re.findall(r"^(#{1,6})\s", md, flags=re.MULTILINE)
    table_rows = re.findall(r"^\|.*\|\s*$", md, flags=re.MULTILINE)
    image_refs = re.findall(r"!\[[^\]]*\]\([^)]+\)", md)
    image_with_alt = re.findall(r"!\[[^\]]+\]\([^)]+\)", md)

    alt_ratio = (len(image_with_alt) / len(image_refs)) if image_refs else None
    jumps = _count_heading_jumps(heading_marks)

    metrics = {
        "source_page_count": raw_metrics.get("page_count"),
        "markdown_heading_count": len(heading_marks),
        "heading_jump_count": jumps,
        "markdown_table_row_count": len(table_rows),
        "markdown_image_count": len(image_refs),
        "image_alt_text_ratio": alt_ratio,
        "text_length_ratio": (len(md) / len(raw_text)) if raw_text else None,
    }

    warnings = []
    if jumps > 0:
        warnings.append("heading_hierarchy_jump")
    if alt_ratio is not None and alt_ratio < 0.5:
        warnings.append("low_image_alt_text_ratio")

    return {
        "score": None,  # weighting is decided in a later phase
        "metrics": metrics,
        "warnings": warnings,
    }


def _count_heading_jumps(headings: list[str]) -> int:
    """Count consecutive headings whose level deepens by more than one.

    ``headings`` holds the ``#`` runs (e.g. ``["#", "###"]`` is one jump).
    """
    levels = [len(h) for h in headings]
    return sum(1 for prev, cur in zip(levels, levels[1:]) if cur - prev > 1)


async def _mark_terminal(
    session: AsyncSession, document_id: int, status: str, message: str
) -> None:
    """Write a terminal ``md_status`` and clear any stale markdown.

    ``md_content`` and ``md_content_hash`` are reset to NULL so the output of
    an earlier successful run cannot linger next to the new status.
    """
    await session.execute(
        update(Document).where(Document.id == document_id).values(
            md_status=status,
            md_content=None,
            md_content_hash=None,
            md_extraction_engine="marker",
            md_extraction_error=message,
            md_generated_at=_now(),
            content_origin="extracted",
        )
    )
    await session.commit()


async def _set_skipped(session: AsyncSession, document_id: int, reason: str) -> None:
    """Mark the document as skipped, storing ``reason`` in md_extraction_error."""
    await _mark_terminal(session, document_id, "skipped", reason)


async def _fail(session: AsyncSession, document_id: int, error: str) -> None:
    """Mark the document as failed at document level (retrying is pointless)."""
    await _mark_terminal(session, document_id, "failed", error)


def _now():
    """Return the current time as a timezone-aware UTC ``datetime``."""
    from datetime import datetime, timezone
    return datetime.now(timezone.utc)
"deep_summary": deep_summary_process, + # Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립). + "markdown": marker_process, } try: diff --git a/docker-compose.yml b/docker-compose.yml index 5d66999..d719ce0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,6 +54,34 @@ services: start_period: 180s restart: unless-stopped + # Phase 1B (2026-05-01): PDF → markdown 변환. ocr-service 와 별도 컨테이너 (deps 충돌 회피). + marker-service: + build: ./services/marker + ports: + - "127.0.0.1:3300:3300" + expose: + - "3300" + environment: + - HF_HOME=/models/huggingface + - TORCH_HOME=/models/torch + volumes: + - ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro + - marker_models:/models + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3300/ready"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 300s + restart: unless-stopped + stt-service: # 2026-04-24: STT 가 Mac mini (faster-whisper, 192.168.1.122:8804 / 100.76.254.116:8804) # 로 이전됨. GPU 에서 컨테이너는 더 이상 기동하지 않는다. 복원이 필요하면 @@ -153,12 +181,16 @@ services: condition: service_healthy kordoc-service: condition: service_healthy + marker-service: + condition: service_healthy env_file: - credentials.env environment: - DATABASE_URL=postgresql+asyncpg://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm - KORDOC_ENDPOINT=http://kordoc-service:3100 - OCR_ENDPOINT=http://ocr-service:3200 + - MARKER_ENDPOINT=http://marker-service:3300 + - MARKER_CONTAINER_PATH_PREFIX=/documents # 2026-04-24 STT Mac mini 이전: 기본값 100.76.254.116:8804 (Tailscale), 필요 시 # MAC_MINI_HOST env 로 192.168.1.122 등 LAN IP 주입. 
- STT_ENDPOINT=http://${MAC_MINI_HOST:-100.76.254.116}:8804 @@ -191,3 +223,4 @@ volumes: reranker_cache: ocr_models: stt_models: + marker_models: diff --git a/migrations/222_processing_queue_stage_markdown.sql b/migrations/222_processing_queue_stage_markdown.sql new file mode 100644 index 0000000..b453c92 --- /dev/null +++ b/migrations/222_processing_queue_stage_markdown.sql @@ -0,0 +1,8 @@ +-- 222_processing_queue_stage_markdown.sql +-- Phase 1B: process_stage enum 에 'markdown' 추가. +-- plan: ~/.claude/plans/plan-idempotent-sundae.md (Phase 1B Round 5) +-- +-- marker_worker 가 소비할 stage. classify_worker 완료 시 enqueue. +-- 단일 statement (asyncpg exec_driver_sql 제약, feedback_migration_runner_single_statement). + +ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'markdown'; diff --git a/services/marker/Dockerfile b/services/marker/Dockerfile new file mode 100644 index 0000000..33ddfa4 --- /dev/null +++ b/services/marker/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + libgl1 libglib2.0-0 curl \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir \ + --extra-index-url https://download.pytorch.org/whl/cu126 \ + -r requirements.txt + +# 모델 미다운로드 (HF cache volume → 첫 호출/warmup 시 적재). + +COPY server.py . 
"""marker-service — POST /convert: PDF -> markdown (text only, images excluded).

Phase 1B Round 5: /ready returns an accurate status code, warmup failures are
visible, and a failed conversion is reported as 422.
plan: ~/.claude/plans/plan-idempotent-sundae.md
"""
import asyncio
import hashlib
import logging
import os
import threading
import time
from pathlib import Path

# Set the cache locations before the ML stack is imported, so libraries that
# read these variables at import time see them. setdefault keeps any value
# already provided by the container environment.
os.environ.setdefault("HF_HOME", "/models/huggingface")
os.environ.setdefault("TORCH_HOME", "/models/torch")

from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel

from marker.converters.pdf import PdfConverter
from marker.models import create_model_dict
from marker.output import text_from_rendered
import marker as marker_module

logger = logging.getLogger(__name__)
app = FastAPI()

_models = None
_converter = None
_engine_version = getattr(marker_module, "__version__", "unknown")
_warmup_done = False
_warmup_error: str | None = None
_warmup_lock = threading.Lock()
# Conversions run in worker threads; this lock keeps them strictly one at a
# time, as they were when the handler blocked the event loop.
_convert_lock = threading.Lock()
# Strong reference to the startup warmup task: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_warmup_task: asyncio.Task | None = None


def _ensure_warmup() -> None:
    """Load the models and build the converter once.

    Called from the startup hook and from every /convert request. The flag is
    checked before and after taking the lock so concurrent callers load the
    models at most once. On failure the error is stored in ``_warmup_error``
    (surfaced by /ready) and re-raised; a later call tries again.
    """
    global _models, _converter, _warmup_done, _warmup_error
    if _warmup_done:
        return
    with _warmup_lock:
        if _warmup_done:
            return
        try:
            logger.info("[marker-service] warmup start")
            _models = create_model_dict()
            _converter = PdfConverter(artifact_dict=_models)
            _warmup_done = True
            _warmup_error = None
            logger.info(f"[marker-service] warmup done engine_version={_engine_version}")
        except Exception as exc:
            _warmup_error = f"{type(exc).__name__}: {exc}"
            logger.exception("[marker-service] warmup failed")
            raise


def _warmup_in_background() -> None:
    """Run warmup for the startup task without leaking its exception.

    ``_ensure_warmup`` has already logged the failure and recorded it in
    ``_warmup_error``; re-raising inside a fire-and-forget task would only
    produce a "Task exception was never retrieved" warning.
    """
    try:
        _ensure_warmup()
    except Exception:
        pass  # deliberate: failure is reported through /ready


@app.on_event("startup")
async def startup():
    """Start the model warmup in a background thread; /ready reports progress."""
    global _warmup_task
    _warmup_task = asyncio.create_task(asyncio.to_thread(_warmup_in_background))


class ConvertRequest(BaseModel):
    """Request body of POST /convert."""

    # Absolute path of the PDF inside this container.
    file_path: str
    # NOTE(review): accepted but not enforced by this service; the caller is
    # expected to pre-check the page count. Confirm whether a server-side
    # limit is wanted.
    max_pages: int | None = None


class ConvertResponse(BaseModel):
    """Response body of POST /convert."""

    md_content: str
    md_content_hash: str  # SHA-256 hex digest of md_content (UTF-8)
    engine: str
    engine_version: str
    elapsed_ms: int
    raw_metrics: dict


@app.get("/ready")
async def ready(response: Response):
    """Readiness probe: 200 once models are loaded, 503 while warming up or failed."""
    if _warmup_error:
        response.status_code = 503
        return {
            "status": "warmup_failed",
            "engine": "marker",
            "engine_version": _engine_version,
            "error": _warmup_error,
        }
    if not _warmup_done:
        response.status_code = 503
        return {
            "status": "warming_up",
            "engine": "marker",
            "engine_version": _engine_version,
        }
    return {
        "status": "ready",
        "engine": "marker",
        "engine_version": _engine_version,
    }


def _convert_sync(path: Path):
    """Convert one PDF; blocking, meant to run in a worker thread.

    Returns ``(rendered, md_text, images, elapsed_ms)``. ``elapsed_ms`` covers
    conversion plus text extraction, not time spent waiting for the lock.
    """
    with _convert_lock:
        start = time.monotonic()
        rendered = _converter(str(path))
        md_text, _ext, images = text_from_rendered(rendered)
        elapsed_ms = int((time.monotonic() - start) * 1000)
    return rendered, md_text, images, elapsed_ms


def _page_count(rendered) -> int | None:
    """Best-effort page count of a rendered document, or ``None``.

    NOTE(review): presumably marker exposes per-page entries under
    ``rendered.metadata["page_stats"]`` — confirm against the installed
    version. If neither source exists the result is ``None``, as before.
    """
    count = getattr(rendered, "page_count", None)
    if count is not None:
        return count
    metadata = getattr(rendered, "metadata", None)
    page_stats = metadata.get("page_stats") if isinstance(metadata, dict) else None
    return len(page_stats) if isinstance(page_stats, list) else None


@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
    """Convert the PDF at ``req.file_path`` to markdown.

    Warmup and conversion run in worker threads so the event loop stays free
    to answer /ready (the container healthcheck) during long conversions.

    Raises:
        HTTPException 404: ``file_not_found`` — the path is not a file.
        HTTPException 422: ``conversion_failed`` — marker raised while
            converting or extracting text.
        A warmup failure propagates as an unhandled error (HTTP 500).
    """
    await asyncio.to_thread(_ensure_warmup)

    p = Path(req.file_path)
    if not p.is_file():
        raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})

    try:
        rendered, md_text, images, elapsed_ms = await asyncio.to_thread(_convert_sync, p)
    except Exception as exc:
        logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
        raise HTTPException(
            status_code=422,
            detail={
                "code": "conversion_failed",
                "message": f"{type(exc).__name__}: {exc}",
            },
        ) from exc

    return ConvertResponse(
        md_content=md_text,
        md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
        engine="marker",
        engine_version=_engine_version,
        elapsed_ms=elapsed_ms,
        raw_metrics={
            "page_count": _page_count(rendered),
            "image_count_extracted": len(images) if images else 0,
        },
    )