feat(canonical): Phase 1B marker-service + marker_worker for PDF→markdown (222)

신규 컨테이너 marker-service (port 3300, Marker 1.10.2 + surya 0.17.1 + HF
cache volume). marker_worker 가 markdown stage 큐 소비:
  classify_worker → enqueue 'markdown' (leaf, embed/chunk 와 독립)
  → SKIP_DOC_TYPES (발주서/세금계산서/명세표) 스킵
  → 확장자 != .pdf 스킵 (Phase 1B = PDF only)
  → page_count > 200 스킵
  → marker-service POST /convert
  → 422/404 = doc-level failed, 5xx = queue retry

안정성 장치:
- migration 222: ALTER TYPE process_stage ADD VALUE markdown (단일 statement)
- md_extraction_quality JSONB dict 직접 저장
- skip 시 md_content/hash NULL 클리어
- /ready Response.status_code + warmup_error 가시화
- HF cache volume (build-time download 0)
- file_path 는 NAS 상대경로 → /documents prefix prepend

성공 기준: 파이프라인 안정성. markdown 품질은 Phase 1D pilot.

Pre-flight (2026-05-01):
- marker-pdf 1.10.2 stable
- file_path 9503건 NAS 상대경로
- DOCUMENT_TYPES 한국어 7종 → SKIP alias 보강
- queue retry max_attempts=3 + reset_stale_items 확인
- main 220/221 study_q_related 선점 → 222 rebump

Plan: ~/.claude/plans/plan-idempotent-sundae.md (Round 5 approved)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-05-01 00:06:23 +00:00
parent 2dd0f655bc
commit e50869cbda
8 changed files with 468 additions and 2 deletions
+16
View File
@@ -139,6 +139,22 @@ class Document(Base):
facet_year: Mapped[int | None] = mapped_column(Integer)
facet_doctype: Mapped[str | None] = mapped_column(Text)
# === Phase 1A canonical Markdown layer columns (migrations 211~219) ===
# plan: ~/.claude/plans/plan-idempotent-sundae.md
md_content: Mapped[str | None] = mapped_column(Text)
md_frontmatter: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
md_format_version: Mapped[str] = mapped_column(Text, nullable=False, default='1.0')
md_status: Mapped[str] = mapped_column(Text, nullable=False, default='pending')
md_extraction_engine: Mapped[str | None] = mapped_column(Text)
md_extraction_engine_version: Mapped[str | None] = mapped_column(Text)
md_extraction_quality: Mapped[dict | None] = mapped_column(JSONB)
md_extraction_error: Mapped[str | None] = mapped_column(Text)
md_content_hash: Mapped[str | None] = mapped_column(Text)
md_source_hash: Mapped[str | None] = mapped_column(Text)
md_generated_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
content_origin: Mapped[str] = mapped_column(Text, nullable=False, default='extracted')
md_draft_status: Mapped[str | None] = mapped_column(Text)
# 타임스탬프
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
+239
View File
@@ -0,0 +1,239 @@
"""marker_worker — markdown stage 소비. Phase 1B Round 5.
플로우:
classify_worker 완료 → enqueue 'markdown' stage
→ marker_worker.process()
→ doc_type / 확장자 / page_count 가드 → marker-service POST /convert
→ md_content 저장 또는 doc-level failed (404/422) 또는 transient raise (5xx → queue retry)
plan: ~/.claude/plans/plan-idempotent-sundae.md
"""
import logging
import os
import re
from pathlib import Path
from typing import Any
import fitz # PyMuPDF
import httpx
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession
from models.document import Document
logger = logging.getLogger(__name__)
MARKER_ENDPOINT = "http://marker-service:3300/convert"
MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도
MAX_PAGES = 200 # 페이지 hard limit
# Phase 1B = PDF only. DOCX 등은 후속 Phase.
SUPPORTED_EXTENSIONS = {".pdf"}
# config.yaml document_types 의 한국어 label 직접 사용 (Pre-flight 결과).
# Round 0 사용자 의도 = 표 중심 발주/계산/명세 도메인.
SKIP_DOC_TYPES = {
# 한국어 (실제 config label)
"발주서", "세금계산서", "명세표",
# 영문 placeholder (현재 config 미포함, 미래 신규 분류 자동 적용)
"Invoice", "Purchase_Order", "Estimate", "Statement",
}
# documents.file_path 는 NAS 상대경로 (예: 'news/SCMP/abc').
# fastapi 와 marker-service 모두 NAS 를 /documents 에 ro 마운트.
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
def _to_marker_path(file_path: str) -> str:
"""file_path 를 컨테이너 내부 절대경로로 변환."""
if file_path.startswith("/"):
return file_path # 이미 절대경로 (legacy 가능성, 그대로)
return f"{CONTAINER_PATH_PREFIX}/{file_path}"
def _get_page_count(file_path: str) -> int | None:
"""PyMuPDF 로 페이지 수 사전 검사. 실패 시 None (가드 통과)."""
try:
with fitz.open(file_path) as doc:
return doc.page_count
except Exception as exc:
logger.warning(f"[marker] page count read failed path={file_path}: {exc}")
return None
async def process(document_id: int, session: AsyncSession) -> None:
"""markdown stage 워커 진입점. queue_consumer 가 호출."""
doc = await session.get(Document, document_id)
if doc is None:
logger.warning(f"[marker] document {document_id} not found")
return
# ---- (1) doc_type skip ----
if doc.document_type in SKIP_DOC_TYPES:
logger.info(
f"markdown_skip_table_heavy_doctype id={document_id} doc_type={doc.document_type}"
)
await _set_skipped(session, document_id, f"skipped: doc_type={doc.document_type}")
return
# ---- (2) file_path validation ----
if not doc.file_path:
await _fail(session, document_id, "no file_path")
return
container_path = _to_marker_path(doc.file_path)
# ---- (3) PDF only ----
suffix = Path(container_path).suffix.lower()
if suffix not in SUPPORTED_EXTENSIONS:
logger.info(f"markdown_skip_unsupported_extension id={document_id} ext={suffix}")
await _set_skipped(
session, document_id,
f"skipped: unsupported extension={suffix} in Phase 1B",
)
return
# ---- (4) MAX_PAGES guard ----
page_count = _get_page_count(container_path)
if page_count is not None and page_count > MAX_PAGES:
logger.info(f"markdown_skip_too_many_pages id={document_id} pages={page_count}")
await _set_skipped(
session, document_id,
f"skipped: page_count={page_count} exceeds MAX_PAGES={MAX_PAGES}",
)
return
# ---- (5) processing 표시 ----
await session.execute(
update(Document).where(Document.id == document_id).values(md_status="processing")
)
await session.commit()
# ---- (6) Marker 호출 ----
try:
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
resp = await client.post(
MARKER_ENDPOINT,
json={"file_path": container_path, "max_pages": MAX_PAGES},
)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPStatusError as exc:
# 404 (file_not_found) + 422 (conversion_failed) = non-retryable failed.
# 5xx 만 transient → re-raise → queue retry.
if exc.response.status_code in {404, 422}:
try:
detail = exc.response.json().get("detail", {})
code = detail.get("code", "unknown")
message = detail.get("message", exc.response.text)
except Exception:
code, message = "unknown", exc.response.text
logger.warning(
f"[marker] non-retryable {exc.response.status_code} {code} id={document_id}: {message[:200]}"
)
await _fail(session, document_id, f"{code}: {message[:1000]}")
return
logger.exception(f"[marker] http error id={document_id}: {exc}")
raise
except (httpx.ConnectError, httpx.TimeoutException) as exc:
logger.warning(f"[marker] transient error id={document_id}: {exc}")
raise
except Exception as exc:
logger.exception(f"[marker] unexpected error id={document_id}: {exc}")
await _fail(session, document_id, str(exc)[:1000])
return
# ---- (7) success ----
md_content = data["md_content"]
quality = _compute_quality(md_content, doc.extracted_text or "", data["raw_metrics"])
await session.execute(
update(Document).where(Document.id == document_id).values(
md_content=md_content,
md_status="success",
md_extraction_engine=data["engine"],
md_extraction_engine_version=data["engine_version"],
md_extraction_quality=quality,
md_content_hash=data["md_content_hash"],
md_source_hash=doc.file_hash,
md_generated_at=_now(),
md_extraction_error=None,
md_frontmatter=doc.md_frontmatter or {},
md_format_version="1.0",
content_origin="extracted",
)
)
await session.commit()
logger.info(
f"[marker] success id={document_id} len={len(md_content)} elapsed_ms={data['elapsed_ms']}"
)
def _compute_quality(md: str, raw_text: str, raw_metrics: dict[str, Any]) -> dict[str, Any]:
"""1B 휴리스틱 quality. 임계 판정 미적용 (Phase 1D 후행)."""
heading_lines = re.findall(r"^(#{1,6})\s", md, flags=re.MULTILINE)
table_rows = re.findall(r"^\|.*\|\s*$", md, flags=re.MULTILINE)
image_refs = re.findall(r"!\[[^\]]*\]\([^)]+\)", md)
image_with_alt = re.findall(r"!\[[^\]]+\]\([^)]+\)", md)
metrics = {
"source_page_count": raw_metrics.get("page_count"),
"markdown_heading_count": len(heading_lines),
"heading_jump_count": _count_heading_jumps(heading_lines),
"markdown_table_row_count": len(table_rows),
"markdown_image_count": len(image_refs),
"image_alt_text_ratio": (len(image_with_alt) / len(image_refs)) if image_refs else None,
"text_length_ratio": (len(md) / len(raw_text)) if raw_text else None,
}
warnings = []
if metrics["heading_jump_count"] > 0:
warnings.append("heading_hierarchy_jump")
if metrics["image_alt_text_ratio"] is not None and metrics["image_alt_text_ratio"] < 0.5:
warnings.append("low_image_alt_text_ratio")
return {
"score": None, # Phase 1D 에서 가중합 결정
"metrics": metrics,
"warnings": warnings,
}
def _count_heading_jumps(headings: list[str]) -> int:
levels = [len(h) for h in headings]
return sum(1 for i in range(1, len(levels)) if levels[i] - levels[i - 1] > 1)
async def _set_skipped(session: AsyncSession, document_id: int, reason: str) -> None:
"""skip 시 md_content/hash NULL 클리어 (이전 success 잔존 방지)."""
await session.execute(
update(Document).where(Document.id == document_id).values(
md_status="skipped",
md_content=None,
md_content_hash=None,
md_extraction_engine="marker",
md_extraction_error=reason,
md_generated_at=_now(),
content_origin="extracted",
)
)
await session.commit()
async def _fail(session: AsyncSession, document_id: int, error: str) -> None:
"""doc-level failed (재시도 무의미)."""
await session.execute(
update(Document).where(Document.id == document_id).values(
md_status="failed",
md_content=None,
md_content_hash=None,
md_extraction_error=error,
md_extraction_engine="marker",
md_generated_at=_now(),
content_origin="extracted",
)
)
await session.commit()
def _now():
from datetime import datetime, timezone
return datetime.now(timezone.utc)
+5 -2
View File
@@ -16,7 +16,7 @@ logger = setup_logger("queue_consumer")
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1}
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1}
STALE_THRESHOLD_MINUTES = 10
@@ -106,7 +106,7 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
"""
next_stages = {
"extract": ["classify", "preview"],
"classify": ["embed", "chunk"],
"classify": ["embed", "chunk", "markdown"],
"stt": ["classify"],
}
stages = next_stages.get(current_stage, [])
@@ -130,6 +130,7 @@ async def consume_queue():
from workers.stt_worker import process as stt_process
from workers.summarize_worker import process as summarize_process
from workers.thumbnail_worker import process as thumbnail_process
from workers.marker_worker import process as marker_process
workers = {
"extract": extract_process,
@@ -143,6 +144,8 @@ async def consume_queue():
# PR-B B-1: classify 가 에스컬레이션 판단 시 enqueue → 26B 가 detail_summary 작성.
# next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립).
"deep_summary": deep_summary_process,
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
"markdown": marker_process,
}
try:
+33
View File
@@ -54,6 +54,34 @@ services:
start_period: 180s
restart: unless-stopped
# Phase 1B (2026-05-01): PDF → markdown 변환. ocr-service 와 별도 컨테이너 (deps 충돌 회피).
marker-service:
build: ./services/marker
ports:
- "127.0.0.1:3300:3300"
expose:
- "3300"
environment:
- HF_HOME=/models/huggingface
- TORCH_HOME=/models/torch
volumes:
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
- marker_models:/models
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3300/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 300s
restart: unless-stopped
stt-service:
# 2026-04-24: STT 가 Mac mini (faster-whisper, 192.168.1.122:8804 / 100.76.254.116:8804)
# 로 이전됨. GPU 에서 컨테이너는 더 이상 기동하지 않는다. 복원이 필요하면
@@ -153,12 +181,16 @@ services:
condition: service_healthy
kordoc-service:
condition: service_healthy
marker-service:
condition: service_healthy
env_file:
- credentials.env
environment:
- DATABASE_URL=postgresql+asyncpg://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
- KORDOC_ENDPOINT=http://kordoc-service:3100
- OCR_ENDPOINT=http://ocr-service:3200
- MARKER_ENDPOINT=http://marker-service:3300
- MARKER_CONTAINER_PATH_PREFIX=/documents
# 2026-04-24 STT Mac mini 이전: 기본값 100.76.254.116:8804 (Tailscale), 필요 시
# MAC_MINI_HOST env 로 192.168.1.122 등 LAN IP 주입.
- STT_ENDPOINT=http://${MAC_MINI_HOST:-100.76.254.116}:8804
@@ -191,3 +223,4 @@ volumes:
reranker_cache:
ocr_models:
stt_models:
marker_models:
@@ -0,0 +1,8 @@
-- 222_processing_queue_stage_markdown.sql
-- Phase 1B: process_stage enum 에 'markdown' 추가.
-- plan: ~/.claude/plans/plan-idempotent-sundae.md (Phase 1B Round 5)
--
-- marker_worker 가 소비할 stage. classify_worker 완료 시 enqueue.
-- 단일 statement (asyncpg exec_driver_sql 제약, feedback_migration_runner_single_statement).
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'markdown';
+22
View File
@@ -0,0 +1,22 @@
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
libgl1 libglib2.0-0 curl \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir \
--extra-index-url https://download.pytorch.org/whl/cu126 \
-r requirements.txt
# 모델 미다운로드 (HF cache volume → 첫 호출/warmup 시 적재).
COPY server.py .
EXPOSE 3300
HEALTHCHECK --start-period=300s --interval=30s --timeout=10s --retries=3 \
CMD curl -f http://localhost:3300/ready || exit 1
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3300"]
+9
View File
@@ -0,0 +1,9 @@
torch==2.11.0+cu126
torchvision==0.26.0+cu126
transformers==4.57.6
surya-ocr==0.17.1
marker-pdf==1.10.2
pymupdf>=1.24.0,<2.0.0
fastapi>=0.110.0,<1.0.0
uvicorn[standard]>=0.27.0,<1.0.0
pillow>=10.0.0,<12.0.0
+136
View File
@@ -0,0 +1,136 @@
"""marker-service — POST /convert: PDF → markdown (텍스트만, 이미지 제외).
Phase 1B Round 5 — /ready 정확한 status code, warmup 실패 가시화, 변환 실패 = 422.
plan: ~/.claude/plans/plan-idempotent-sundae.md
"""
import hashlib
import logging
import os
import threading
import time
from pathlib import Path
from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel
from marker.converters.pdf import PdfConverter
from marker.models import create_model_dict
from marker.output import text_from_rendered
import marker as marker_module
logger = logging.getLogger(__name__)
app = FastAPI()
os.environ.setdefault("HF_HOME", "/models/huggingface")
os.environ.setdefault("TORCH_HOME", "/models/torch")
_models = None
_converter = None
_engine_version = getattr(marker_module, "__version__", "unknown")
_warmup_done = False
_warmup_error: str | None = None
_warmup_lock = threading.Lock()
def _ensure_warmup() -> None:
"""첫 /convert 또는 startup hook 시 모델 로드. HF cache volume 활용."""
global _models, _converter, _warmup_done, _warmup_error
if _warmup_done:
return
with _warmup_lock:
if _warmup_done:
return
try:
logger.info("[marker-service] warmup start")
_models = create_model_dict()
_converter = PdfConverter(artifact_dict=_models)
_warmup_done = True
_warmup_error = None
logger.info(f"[marker-service] warmup done engine_version={_engine_version}")
except Exception as exc:
_warmup_error = f"{type(exc).__name__}: {exc}"
logger.exception("[marker-service] warmup failed")
raise
@app.on_event("startup")
async def startup():
"""startup hook — async warmup 백그라운드. /ready 가 완료 여부 노출."""
import asyncio
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
class ConvertRequest(BaseModel):
file_path: str
max_pages: int | None = None
class ConvertResponse(BaseModel):
md_content: str
md_content_hash: str
engine: str
engine_version: str
elapsed_ms: int
raw_metrics: dict
@app.get("/ready")
async def ready(response: Response):
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출."""
if _warmup_error:
response.status_code = 503
return {
"status": "warmup_failed",
"engine": "marker",
"engine_version": _engine_version,
"error": _warmup_error,
}
if not _warmup_done:
response.status_code = 503
return {
"status": "warming_up",
"engine": "marker",
"engine_version": _engine_version,
}
return {
"status": "ready",
"engine": "marker",
"engine_version": _engine_version,
}
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
_ensure_warmup()
p = Path(req.file_path)
if not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
start = time.monotonic()
try:
rendered = _converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, _images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
return ConvertResponse(
md_content=md_text,
md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
engine="marker",
engine_version=_engine_version,
elapsed_ms=elapsed_ms,
raw_metrics={
"page_count": getattr(rendered, "page_count", None),
"image_count_extracted": len(_images) if _images else 0,
},
)