feat(documents): S1-ADD dedup·원본명 3컬럼 + md_status success→completed 매핑 (A) + office→md PoC (C-1)

plan ds-s1-backend-1 (r5 수렴). 코드만 스테이징 — migration 미적용(restart 보류, E-2 Soft Lock 예외창).

A (앱 v1 디코딩 비파괴 최소선):
- A-1 migrations/287_documents_dedup_fields.sql: original_filename TEXT / duplicate_of BIGINT FK ON DELETE SET NULL
  / duplicate_count INTEGER NOT NULL DEFAULT 0. 단일 statement·PG16 fast-path·BEGIN/COMMIT 금지. backfill 미포함(B-4).
- A-2 app/models/document.py: 1계층 블록에 3 mapped_column (+ ForeignKey import). md_* 는 기존.
- A-3 app/api/documents.py: DocumentResponse 3필드(duplicate_count=0 non-opt) + DocumentDetailResponse
  field_validator(success→completed, mode=before) — read-time DB→API 단방향, write(ORM) 미적용.
- A-4 tests/test_s1_dedup_shape.py: success→completed 동작 + 비-success 통과 + 3필드 디폴트/roundtrip
  + ds-app contract fixture 디코드(skip-if-absent). py_compile OK. ★ backend 절반 — 전체 비파괴는 S3 render 테스트와 AND.

C-1 PoC (워커 미연결 — C-2 에서 marker_worker 분기 연결):
- app/workers/office_md.py: OOXML=markitdown(신규 dep, lazy) / hwp·hwpx=LibreOffice headless→HTML→markdownify(기존 dep).
  실패·빈출력·타임아웃·dep부재 → OfficeMdError raise (success+빈md 금지 = C-5 postcondition 의 변환기 계약).
- scripts/poc_office_md.py: 표 fidelity 측정 하니스. E-1 = prod LibreOffice 버전핀 안전컨텍스트 실행(hwpx 필터 버전 의존).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-04 22:09:12 +09:00
parent 701113738f
commit 16b0fe1743
6 changed files with 426 additions and 2 deletions
+15 -1
View File
@@ -22,7 +22,7 @@ from fastapi import (
status,
)
from fastapi.responses import FileResponse
from pydantic import BaseModel
from pydantic import BaseModel, field_validator
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.requests import ClientDisconnect
@@ -113,6 +113,10 @@ class DocumentResponse(BaseModel):
# 회독 추적 (자료실 등) — 현재 사용자 기준. 다른 endpoint 응답에선 0/None.
read_count: int = 0
last_read_at: datetime | None = None
# S1-ADD (migration 287): 원본 파일명 + 중복검사. 앱은 옵셔널 디코딩, 없으면 폴백.
original_filename: str | None = None # 다운로드 라벨용. 없으면 file_path basename 폴백(앱 측).
duplicate_of: int | None = None # canonical doc id (자기 자신이 canonical 이면 None).
duplicate_count: int = 0 # 본인 제외 동일 판정 사본 수 (canonical 행 기준).
class Config:
from_attributes = True
@@ -140,6 +144,16 @@ class DocumentDetailResponse(DocumentResponse):
md_extraction_engine_version: str | None = None
md_generated_at: datetime | None = None
@field_validator("md_status", mode="before")
@classmethod
def _db_success_to_completed(cls, v: str | None) -> str | None:
"""DB CHECK enum 은 'success'; 계약/fixture·앱 MD-first 렌더 트리거는 'completed'.
read-time(DB→API) 단방향 매핑만 — write 경로(ORM)는 이 모델을 거치지 않아 미적용.
pending/processing/partial/failed/skipped 는 양쪽 동일하므로 'success' 만 매핑한다.
(불변식: md_status ∈ {success,partial} ⟹ md_content 非공백 = 워커 postcondition, C-5.)
"""
return "completed" if v == "success" else v
class AcceptSuggestionRequest(BaseModel):
"""§1 accept-suggestion 요청 body — stale payload / doc 수정 검출."""
+14 -1
View File
@@ -3,7 +3,7 @@
from datetime import datetime
from pgvector.sqlalchemy import Vector
from sqlalchemy import BigInteger, Boolean, DateTime, Enum, Integer, String, Text
from sqlalchemy import BigInteger, Boolean, DateTime, Enum, ForeignKey, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
@@ -28,6 +28,19 @@ class Document(Base):
)
import_source: Mapped[str | None] = mapped_column(Text)
# 1계층: 원본명 + 중복검사 (S1-ADD, migration 287)
# original_filename = 업로드 원본 파일명(다운로드 라벨용). file_path 는 충돌 시 _N 리네임됨.
# cf. original_format(ODF 변환용) / original_path·original_hash(007 legacy dead) 와 의미 구분.
# duplicate_of = canonical doc id (자기 자신이 canonical 이면 NULL). FK ON DELETE SET NULL.
# duplicate_count = canonical 행에 담는 '본인 제외 동일 판정 사본 수' (group_size-1). 업로드/backfill 가 갱신.
original_filename: Mapped[str | None] = mapped_column(Text)
duplicate_of: Mapped[int | None] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="SET NULL")
)
duplicate_count: Mapped[int] = mapped_column(
Integer, nullable=False, default=0, server_default="0"
)
# 2계층: 텍스트 추출
extracted_text: Mapped[str | None] = mapped_column(Text)
extracted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+134
View File
@@ -0,0 +1,134 @@
"""office/hwp → Markdown 하이브리드 변환기 (plan ds-s1-backend-1, C-1 PoC).
★ PoC 상태 — marker_worker 에 아직 연결하지 않음(그건 C-2). 본 모듈은 변환 *계약*과
PoC 하니스(scripts/poc_office_md.py)가 호출하는 순수 함수만 제공한다.
전략 (하이브리드):
- OOXML(.docx/.xlsx/.pptx) → markitdown ← 신규 의존성(pip install markitdown). lazy import.
- .hwp/.hwpx → LibreOffice(headless) → HTML → markdownify ← markdownify 기존 의존성.
(LibreOffice 가 hwp import 필터 보유. .hwpx 는 .hwp 와 다른 필터·버전 의존 → E-1: prod LibreOffice
버전핀 안전컨텍스트에서 PoC 실행. 표 fidelity 가 진짜 리스크 — 하니스가 측정.)
실패 계약 (C-5 postcondition 의 backend 절반):
변환 실패·빈 출력·타임아웃·의존성 부재 → OfficeMdError 를 raise 한다.
**success + 빈 md 를 절대 반환하지 않는다** — 호출부(C-2 marker_worker)가 이를 잡아
md_status='failed'(¬success·¬skipped) 로 라우팅한다. 불변식: md_status ∈ {success,partial} ⟹ md_content 非공백.
"""
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
OOXML_FORMATS = {".docx", ".xlsx", ".pptx"}
HWP_FORMATS = {".hwp", ".hwpx"}
SUPPORTED = OOXML_FORMATS | HWP_FORMATS
# 빈 출력 판정 임계 — 공백 제거 후 이 미만이면 '실패(빈 변환)'로 본다.
_MIN_BODY_CHARS = 16
_SOFFICE_BIN = os.environ.get("LIBREOFFICE_BIN", "soffice")
class OfficeMdError(Exception):
"""office/hwp → md 변환 실패 신호. 호출부는 md_status='failed' 로 라우팅."""
def convert_office_to_md(path: str | Path, *, timeout: int = 90) -> str:
"""office/hwp 파일을 Markdown 문자열로 변환. 실패/빈출력 시 OfficeMdError raise."""
p = Path(path)
suffix = p.suffix.lower()
if suffix not in SUPPORTED:
raise OfficeMdError(f"unsupported suffix for office_md: {suffix!r}")
if not p.exists():
raise OfficeMdError(f"file not found: {p}")
if suffix in OOXML_FORMATS:
md = _via_markitdown(p)
else: # .hwp / .hwpx
md = _via_libreoffice_html(p, timeout=timeout)
md = (md or "").strip()
if len(md) < _MIN_BODY_CHARS:
raise OfficeMdError(f"empty/too-short conversion ({len(md)} chars) for {p.name}")
return md
def _via_markitdown(path: Path) -> str:
try:
from markitdown import MarkItDown # lazy — 신규 의존성
except ImportError as e: # noqa: BLE001
raise OfficeMdError(
"markitdown 미설치 (OOXML 변환에 필요) — `pip install markitdown`. "
"C-1 PoC 는 prod worker 이미지/버전핀 컨텍스트에서 실행(E-1)."
) from e
try:
result = MarkItDown().convert(str(path))
except Exception as e: # noqa: BLE001 — 어떤 변환 예외든 failed 로 라우팅
raise OfficeMdError(f"markitdown 변환 실패: {path.name}: {e}") from e
return getattr(result, "text_content", "") or ""
def _via_libreoffice_html(path: Path, *, timeout: int) -> str:
"""LibreOffice headless 로 HTML 변환 후 markdownify. hwp/hwpx 용."""
try:
from markdownify import markdownify # 기존 의존성
except ImportError as e: # noqa: BLE001
raise OfficeMdError("markdownify 미설치(기존 의존성이어야 함)") from e
with tempfile.TemporaryDirectory(prefix="office_md_") as tmp:
tmpdir = Path(tmp)
# soffice 동시 실행 시 user profile 락 충돌 회피 — 호출별 격리 프로필.
profile = tmpdir / "lo_profile"
cmd = [
_SOFFICE_BIN,
"--headless",
"--nologo",
"--nofirststartwizard",
f"-env:UserInstallation=file://{profile}",
"--convert-to",
"html",
"--outdir",
str(tmpdir),
str(path),
]
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout, check=False
)
except FileNotFoundError as e:
raise OfficeMdError(
f"LibreOffice 바이너리 부재({_SOFFICE_BIN}) — LIBREOFFICE_BIN 설정 또는 설치 필요"
) from e
except subprocess.TimeoutExpired as e:
raise OfficeMdError(f"LibreOffice 변환 타임아웃({timeout}s): {path.name}") from e
html_path = tmpdir / f"{path.stem}.html"
if proc.returncode != 0 or not html_path.exists():
raise OfficeMdError(
f"LibreOffice html 변환 실패: {path.name} (rc={proc.returncode}): "
f"{(proc.stderr or proc.stdout or '').strip()[:300]}"
)
html = html_path.read_text(encoding="utf-8", errors="replace")
# 표 보존 위해 markdownify 가 table 을 GFM 으로 — heading_style ATX.
return markdownify(html, heading_style="ATX", strip=["span", "font"])
def table_fidelity(md: str) -> dict:
"""E-1 표 fidelity 의 crude 지표 — GFM 표 행/구분행 카운트 (정밀 평가 아님, 회귀 신호)."""
lines = md.splitlines()
pipe_rows = sum(1 for ln in lines if ln.strip().startswith("|") and ln.strip().endswith("|"))
sep_rows = sum(
1 for ln in lines
if ln.strip().startswith("|") and set(ln.strip()) <= set("|-: ")
)
return {
"chars": len(md),
"lines": len(lines),
"table_pipe_rows": pipe_rows,
"table_separator_rows": sep_rows, # 표 개수의 근사
"has_heading": any(ln.lstrip().startswith("#") for ln in lines),
}
+18
View File
@@ -0,0 +1,18 @@
-- 287_documents_dedup_fields.sql
-- S1-ADD (plan ds-s1-backend-1, A-1): 원본 파일명 + 중복검사 메타 3컬럼.
-- 계약: ds-app contract/CONTRACT.md [S1-ADD] — original_filename / duplicate_of / duplicate_count.
--
-- asyncpg exec_driver_sql 단일 statement 제약 — ALTER TABLE 다중 ADD COLUMN 절은 단일 statement 라 허용.
-- BEGIN/COMMIT 금지. PG 16: ADD COLUMN ... DEFAULT <constant> 는 fast path (table rewrite 없음).
-- duplicate_of self-FK 는 신규 all-NULL 컬럼이라 검증 스캔 trivial (NOT VALID 불요).
-- ON DELETE SET NULL: 원본(canonical) hard delete 허용 (RESTRICT=삭제 차단 / CASCADE=사본 연쇄삭제 위험 회피).
-- 기존 dup 그룹(law_monitor 제외)의 duplicate_of/duplicate_count backfill 은 B-4 별 배치 스크립트.
-- 28,941행 대량 UPDATE 를 startup migration(단일 트랜잭션)에 넣지 않는다.
--
-- original_filename 은 original_format(ODF 변환용)·original_path/original_hash(migration 007 legacy dead,
-- app 코드 미참조 — P0-1 grep 0건) 와 의미가 다르다: 업로드 시점 원본 파일명(다운로드 라벨용).
ALTER TABLE documents
ADD COLUMN IF NOT EXISTS original_filename TEXT,
ADD COLUMN IF NOT EXISTS duplicate_of BIGINT REFERENCES documents(id) ON DELETE SET NULL,
ADD COLUMN IF NOT EXISTS duplicate_count INTEGER NOT NULL DEFAULT 0;
+77
View File
@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""C-1 PoC 하니스 — office/hwp → md 변환 품질(특히 표 fidelity) 측정.
plan ds-s1-backend-1 C-1/E-1:
- hwp/hwpx 결과는 LibreOffice 버전 의존 **prod extract_worker 동일 버전(버전핀 안전컨텍스트)** 에서 실행해야
신호가 transfer . live worker job 태우는 아님(점유 0).
- OOXML markitdown(신규 dep): `pip install markitdown`.
- 샘플은 trivial 말고 **대표 복잡본**(법령·KGS 중심 .hwp/.hwpx, 병합셀/다중시트 xlsx).
사용:
python scripts/poc_office_md.py <file_or_dir> [<file_or_dir> ...]
# 예: 현 코퍼스 백필 후보(doc/docx/xls/xlsx/hwp) 샘플 디렉토리
python scripts/poc_office_md.py ~/poc_samples/
파일: 변환 성공 char/ 행수/heading 지표 + 본문 미리보기.
실패(OfficeMdError) FAILED 출력 이것이 C-5 md_status='failed' 라우팅할 케이스(설계대로).
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
# app/ 를 path 에 (모듈 import 용).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from workers.office_md import SUPPORTED, OfficeMdError, convert_office_to_md, table_fidelity # noqa: E402
def _iter_targets(args: list[str]):
for a in args:
p = Path(a).expanduser()
if p.is_dir():
for child in sorted(p.rglob("*")):
if child.is_file() and child.suffix.lower() in SUPPORTED:
yield child
elif p.is_file():
yield p
else:
print(f" (skip, 경로 없음: {p})")
def main(argv: list[str]) -> int:
if not argv:
print(__doc__)
return 2
targets = list(_iter_targets(argv))
if not targets:
print("변환 대상(.docx/.xlsx/.pptx/.hwp/.hwpx) 없음.")
return 1
ok = fail = 0
for path in targets:
print(f"\n=== {path.name} ({path.suffix.lower()}) ===")
try:
md = convert_office_to_md(path)
except OfficeMdError as e:
fail += 1
print(f" FAILED → (C-5 가 md_status='failed' 라우팅) : {e}")
continue
ok += 1
fid = table_fidelity(md)
print(f" OK chars={fid['chars']} lines={fid['lines']} "
f"table_rows={fid['table_pipe_rows']} (sep≈표수 {fid['table_separator_rows']}) "
f"heading={fid['has_heading']}")
preview = "\n".join(f" | {ln}" for ln in md.splitlines()[:12])
print(preview)
print(f"\n--- 합계: OK {ok} / FAILED {fail} / 총 {len(targets)} ---")
print("표 fidelity 가 낮으면(table_rows 0 등) 해당 포맷은 변환기/필터 재검토 — "
"OOXML↔markitdown, hwp/hwpx↔LibreOffice 경계를 데이터로 확정(C-1).")
return 0 if fail == 0 else 1
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))
+168
View File
@@ -0,0 +1,168 @@
"""S1-ADD (plan ds-s1-backend-1) A-4 — call-shape regression + md_status 매핑 동작 검증.
검증 대상 (값이 아니라 *동작*):
1. DB md_status='success' 응답 'completed' 단방향 매핑 (P0-3 silent-fallback 함정 가드의 backend 절반).
- partial/pending/failed/skipped/None 그대로 통과 ('success' 매핑).
2. [S1-ADD] 3필드(original_filename / duplicate_of / duplicate_count) 디코드 + 기본값(duplicate_count=0).
3. (있으면) ds-app contract fixtures 응답 모델로 디코드 계약 shape 비파괴.
주의 테스트는 backend 직렬화 절반만 커버한다.
앱이 'completed' 실제 md-first 렌더 분기로 태우는지(¬extracted_text) S3 fixture-render 테스트가 책임진다
(A 그룹 close = backend green AND S3 render green, owner 명기 plan A-4).
실행 환경: app/ 의존성 설치된 컨텍스트(devsbx/GPU). 순수 단위(DB 불요).
"""
from __future__ import annotations
import json
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
import pytest
# logs/ 가 운영 daemon(root) 소유일 때 import-time FileHandler PermissionError 방어 (test 한정).
_orig_file_handler = logging.FileHandler
def _safe_file_handler(filename, *args, **kwargs): # type: ignore[no-untyped-def]
try:
return _orig_file_handler(filename, *args, **kwargs)
except PermissionError:
return logging.NullHandler()
logging.FileHandler = _safe_file_handler # type: ignore[assignment]
# api.documents import 가 SQLAlchemy engine init 를 트리거 — dummy DATABASE_URL (실제 connect X).
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost:5432/test")
# tests/ → 프로젝트 루트 → app/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from api.documents import ( # noqa: E402
DocumentDetailResponse,
DocumentListResponse,
DocumentResponse,
)
_NOW = datetime(2026, 6, 4, 8, 0, 0, tzinfo=timezone.utc)
def _base_detail(**overrides) -> dict:
"""DocumentDetailResponse 가 요구하는 전 필드(필수 포함) 완비 dict. overrides 로 일부 교체."""
d = {
"id": 4912,
"file_path": "Engineering/ASME/x.pdf",
"file_format": "pdf",
"file_size": 1338920,
"file_type": "document",
"title": "x",
"ai_domain": "Engineering",
"ai_sub_group": "압력용기",
"ai_tags": ["ASME"],
"ai_summary": "요약",
"document_type": "standard",
"importance": "high",
"ai_confidence": 0.9,
"user_note": None,
"user_tags": None,
"pinned": True,
"ask_includable": True,
"derived_path": None,
"original_format": "pdf",
"conversion_status": "completed",
"is_read": True,
"review_status": "approved",
"edit_url": None,
"preview_status": "ready",
"source_channel": "upload",
"data_origin": "external",
"doc_purpose": "reference",
"extracted_at": _NOW,
"ai_processed_at": _NOW,
"embedded_at": _NOW,
"created_at": _NOW,
"updated_at": _NOW,
# detail 전용
"extracted_text": "원문 폴백 텍스트",
"md_content": "# 제목\n본문",
"md_frontmatter": {},
"md_status": "success",
"md_extraction_engine": "marker",
"md_generated_at": _NOW,
}
d.update(overrides)
return d
# ── 1. ★ md_status 단방향 매핑 (success → completed) ──────────────────────────
def test_db_success_serializes_as_completed():
m = DocumentDetailResponse.model_validate(_base_detail(md_status="success"))
assert m.md_status == "completed", "DB 'success' 는 응답에서 'completed' 로 매핑돼야 함(MD-first 렌더 트리거)"
# model_dump(직렬화) 까지 확인 — 앱이 받는 실제 값.
assert m.model_dump()["md_status"] == "completed"
@pytest.mark.parametrize("raw", ["pending", "processing", "partial", "failed", "skipped", None])
def test_non_success_statuses_pass_through(raw):
m = DocumentDetailResponse.model_validate(_base_detail(md_status=raw))
assert m.md_status == raw, f"'{raw}' 는 매핑 대상 아님 — 그대로 통과해야 함"
def test_mapping_is_read_only_not_a_write_path():
# 이 모델은 응답 직렬화 전용 — write(ORM) 경로가 'completed' 를 DB 로 되쓰지 않는지의 1차 방어선.
# 'completed' 입력이 들어와도(예: fixture) 그대로 'completed' (재매핑 없음, 멱등).
m = DocumentDetailResponse.model_validate(_base_detail(md_status="completed"))
assert m.md_status == "completed"
# ── 2. [S1-ADD] 3필드 디코드 + 기본값 ────────────────────────────────────────
def test_s1add_fields_default_on_list_response():
# DocumentResponse(리스트 행)에도 3필드 존재 — 미제공 시 기본값.
base = {k: v for k, v in _base_detail().items()
if k not in {"extracted_text", "md_content", "md_frontmatter", "md_status",
"md_extraction_engine", "md_generated_at"}}
m = DocumentResponse.model_validate(base)
assert m.duplicate_count == 0
assert m.duplicate_of is None
assert m.original_filename is None
def test_s1add_fields_roundtrip_values():
m = DocumentDetailResponse.model_validate(
_base_detail(original_filename="보고서.docx", duplicate_of=4912, duplicate_count=2)
)
assert m.original_filename == "보고서.docx"
assert m.duplicate_of == 4912
assert m.duplicate_count == 2
# ── 3. ds-app contract fixtures 디코드 (있으면) ──────────────────────────────
_FIXDIR = Path(os.path.expanduser("~/Documents/code/ds-app/contract/fixtures"))
@pytest.mark.skipif(not _FIXDIR.exists(), reason="ds-app contract fixtures 미존재(독립 repo) — 디코드 회귀 skip")
@pytest.mark.parametrize("fname", ["document_detail.json", "document_detail_pending_md.json"])
def test_contract_detail_fixture_decodes(fname):
payload = json.loads((_FIXDIR / fname).read_text())
m = DocumentDetailResponse.model_validate(payload)
# fixture 의 md_status 는 이미 API 어휘('completed'/'pending') — 매핑 멱등.
assert m.md_status == payload["md_status"]
# [S1-ADD] 필드가 fixture 에 있으면 디코드 일치.
if "duplicate_count" in payload:
assert m.duplicate_count == payload["duplicate_count"]
@pytest.mark.skipif(not _FIXDIR.exists(), reason="ds-app contract fixtures 미존재")
def test_contract_list_fixture_decodes():
payload = json.loads((_FIXDIR / "documents_list.json").read_text())
m = DocumentListResponse.model_validate(payload)
assert m.total == payload["total"]
assert len(m.items) == len(payload["items"])