feat(markdown): hwp raster 이미지 NAS 영속 + library backfill 스크립트

pyhwp(hwp5html) 가 bindata/ 로 추출하는 raster 이미지를 NAS 에 영속한다. 기존엔
변환 tempdir 와 함께 폐기돼 경고 없이 silent 유실(도식·수식)이었다(적대 리뷰 MEDIUM).

- office_md.py: _run_hwp5html 으로 hwp5html 1회 실행 → (markdown, raster_images).
  convert_hwp_to_md_and_images() 신규 = marker_worker 이미지 경로용. hwp5html 은 이미지를
  본문 xhtml 에 <img> 앵커하지 않아(--css/--html 동일) 인라인 위치 복원 불가 → 호출부가
  말미 갤러리로 부착. OLE 수식/도형은 앵커도 raster 도 아니라 영속 제외.
- marker_worker._process_office: .hwp raster 를 marker(PDF)의 _persist_images_to_nas 로
  NAS 영속 + document_images UPSERT(_sync_document_images, 재변환 orphan 정리) + md 말미
  ## 첨부 이미지 docimg: 갤러리 + quality.warnings hwp_images_appended. docx/xlsx/pptx/
  hwpx 는 이미지 미처리(기존 동작 유지).
- scripts/backfill_hwp_library.py: 지정 PKM 폴더 .hwp 를 content-hash dedup(Inbox 중복 +
  _1/카피본 사본 흡수) 후 category=library 일회성 ingest.

검증(E2E): Knowledge/Engineering 18개 → dedup 후 신규 5개(산업안전기사 3~7과목) ingest,
5/5 success. 제4과목 raster 3장 → NAS extracted_images/35778/img_001~003.jpeg 실재 +
document_images 3 row(engine=pyhwp) + md 갤러리 docimg ref. 이미지 없는 문서는 갤러리
미생성. 텍스트/표 경로 회귀 0(기존 4건 재변환 success).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-09 05:10:45 +00:00
parent d0994a1bce
commit 55216271a6
3 changed files with 230 additions and 8 deletions
+70 -4
View File
@@ -394,7 +394,11 @@ async def _process_office(
partial arm 은 PDF split 전용 — office 는 이진이라 여기 없음. 'completed' 는 A-3 직렬화 전용(워커 미사용).
quality 는 content-type-aware: office=scored(_compute_quality). 동기 변환은 to_thread 로 event loop 비차단.
"""
from workers.office_md import OfficeMdError, convert_office_to_md
from workers.office_md import (
OfficeMdError,
convert_hwp_to_md_and_images,
convert_office_to_md,
)
suffix = Path(container_path).suffix.lower()
if suffix == ".hwp":
@@ -403,9 +407,16 @@ async def _process_office(
engine = "libreoffice_hwp" # HWPX 는 pyhwp 미지원 → LibreOffice 폴백
else:
engine = "markitdown"
hwp_images: list[dict[str, Any]] = []
try:
# 동기 subprocess(LibreOffice)/markitdown — 스레드로 빼서 이벤트 루프 비차단.
md_content = await asyncio.to_thread(convert_office_to_md, container_path)
# 동기 subprocess/markitdown — 스레드로 빼서 이벤트 루프 비차단.
if suffix == ".hwp":
md_content, hwp_images = await asyncio.to_thread(
convert_hwp_to_md_and_images, container_path
)
else:
md_content = await asyncio.to_thread(convert_office_to_md, container_path)
except OfficeMdError as exc:
logger.warning(f"[marker] office md 변환 실패 id={document_id} engine={engine}: {exc}")
await _fail(session, document_id, f"office_md: {str(exc)[:990]}", engine=engine)
@@ -415,8 +426,49 @@ async def _process_office(
await _fail(session, document_id, f"office_md_unexpected: {str(exc)[:980]}", engine=engine)
return
# ---- 이미지 NAS persist (.hwp 전용) ----
# hwp5html 은 bindata raster 를 추출하나 본문 xhtml 에 <img> 앵커가 없어(orphan, --css/--html
# 동일) 인라인 위치 복원 불가 → marker(PDF) 의 _persist_images_to_nas 로 NAS 영속 후 md 말미
# 갤러리로 부착(docimg: ref = 뷰어 해석). OLE 수식/도형은 앵커도 raster 도 아니라 제외.
# docx/xlsx/pptx/hwpx 는 이미지 미처리(기존 동작 유지).
saved_images: list[dict[str, Any]] = []
orphan_paths: list[str] = []
if suffix == ".hwp" and MARKDOWN_IMAGE_PERSIST:
if hwp_images:
images_resp = [
{
"bytes_b64": base64.b64encode(im["data"]).decode("ascii"),
"format": im.get("format") or "png",
"slug": "",
"width": None,
"height": None,
}
for im in hwp_images
]
try:
saved_images = _persist_images_to_nas(document_id, images_resp)
except OSError as exc:
# NAS 일시 끊김 등 — transient. queue retry 로 복구.
logger.warning(
f"[marker] hwp image persist NAS write failed id={document_id}: "
f"{type(exc).__name__}: {exc}"
)
raise
if saved_images:
gallery = "\n\n## 첨부 이미지\n\n" + "\n\n".join(
f"![](docimg:{img['image_key']})" for img in saved_images
)
md_content = md_content + gallery
# 재변환 시 현재 saved_images 기준으로 과거 document_images row/NAS 파일 정리.
orphan_paths = await _sync_document_images(
session, document_id, saved_images, {"engine": engine}
)
# 성공 — 계약상 md_content 는 비공백(빈출력은 raise). quality scored.
quality = _compute_quality(md_content, doc.extracted_text or "", {"page_count": None})
if saved_images:
quality.setdefault("warnings", []).append(f"hwp_images_appended:{len(saved_images)}")
await session.execute(
update(Document).where(Document.id == document_id).values(
md_content=md_content,
@@ -434,7 +486,21 @@ async def _process_office(
)
)
await session.commit()
logger.info(f"[marker] office success id={document_id} engine={engine} len={len(md_content)}")
# commit 후 고아 NAS 파일 unlink (best-effort, 실패해도 DB 정합 유지).
for orphan_path in orphan_paths:
try:
Path(orphan_path).unlink(missing_ok=True)
except Exception as exc:
logger.warning(
f"[marker] orphan image unlink failed id={document_id} path={orphan_path}: "
f"{type(exc).__name__}: {exc}"
)
logger.info(
f"[marker] office success id={document_id} engine={engine} "
f"len={len(md_content)} images={len(saved_images)}"
)
async def _process_split(
+50 -4
View File
@@ -40,6 +40,10 @@ _SOFFICE_BIN = os.environ.get("LIBREOFFICE_BIN", "libreoffice")
# pyhwp 콘솔 스크립트(pip install pyhwp 시 PATH 등록). HWP5 binary(.hwp) 전용.
_HWP5HTML_BIN = os.environ.get("HWP5HTML_BIN", "hwp5html")
# hwp5html 이 bindata/ 로 추출하는 첨부물 중 NAS 영속 대상 raster 확장자.
# (OLE 수식/도형은 index.xhtml 에 앵커가 없어 위치 복원 불가 → 영속 제외.)
_RASTER_EXTS = {"jpg", "jpeg", "png", "gif", "bmp"}
class OfficeMdError(Exception):
"""office/hwp → md 변환 실패 신호. 호출부는 md_status='failed' 로 라우팅."""
@@ -82,12 +86,16 @@ def _via_markitdown(path: Path) -> str:
return getattr(result, "text_content", "") or ""
def _via_pyhwp_html(path: Path, *, timeout: int) -> str:
"""HWP5 binary(.hwp) → pyhwp hwp5html → markdownify.
def _run_hwp5html(path: Path, *, timeout: int) -> tuple[str, list[dict]]:
"""HWP5 binary(.hwp) → (markdown, raster_images). hwp5html 1회 실행 = md + 이미지 동시 추출.
LibreOffice 번들 libhwplo 필터가 실제 한컴 HWP5 파일을 못 읽어(rc=0 + 'source file could
not be loaded') 전건 실패 → 순수 Python HWP5 전용 변환기 pyhwp(CLI hwp5html)로 교체.
`_via_libreoffice_html` 와 동일한 실패 계약(rc≠0 또는 출력 부재 → OfficeMdError raise).
raster_images = [{'data': bytes, 'format': 'jpeg'|'png'|...}] — bindata/ 의 래스터만.
hwp5html 은 이미지를 본문 xhtml 에 <img> 로 앵커하지 않으므로(bindata orphan, --css/--html 동일)
인라인 위치는 복원 불가 → 호출부가 NAS 영속 후 말미 갤러리로 부착한다.
"""
try:
from markdownify import markdownify # 기존 의존성
@@ -96,7 +104,7 @@ def _via_pyhwp_html(path: Path, *, timeout: int) -> str:
with tempfile.TemporaryDirectory(prefix="office_md_hwp_") as tmp:
outdir = Path(tmp)
# hwp5html --output <dir> <file.hwp> → <dir>/index.xhtml + styles.css
# hwp5html --output <dir> <file.hwp> → <dir>/index.xhtml + styles.css + bindata/
cmd = [_HWP5HTML_BIN, "--output", str(outdir), str(path)]
try:
proc = subprocess.run(
@@ -122,7 +130,45 @@ def _via_pyhwp_html(path: Path, *, timeout: int) -> str:
# _MIN_BODY_CHARS(16) 빈출력 게이트를 무력화(빈 변환의 false-success) → markdownify 전에 제거.
html = re.sub(r"^\s*<\?xml[^>]*\?>\s*", "", html)
# 표 보존 위해 markdownify 가 table 을 GFM 으로 — heading_style ATX (libreoffice 경로와 동일).
return markdownify(html, heading_style="ATX", strip=["span", "font"])
md = markdownify(html, heading_style="ATX", strip=["span", "font"])
images: list[dict] = []
bindata = outdir / "bindata"
if bindata.is_dir():
for f in sorted(bindata.iterdir()):
ext = f.suffix.lower().lstrip(".")
if ext in _RASTER_EXTS:
images.append({
"data": f.read_bytes(),
"format": "jpeg" if ext == "jpg" else ext,
})
return md, images
def _via_pyhwp_html(path: Path, *, timeout: int) -> str:
"""HWP5 binary(.hwp) → markdown (이미지 제외). convert_office_to_md 단일 텍스트 경로용."""
md, _images = _run_hwp5html(path, timeout=timeout)
return md
def convert_hwp_to_md_and_images(
path: str | Path, *, timeout: int = 90
) -> tuple[str, list[dict]]:
"""HWP5(.hwp) → (markdown, raster_images). marker_worker 이미지 영속 경로 전용.
실패/빈출력 계약은 convert_office_to_md 와 동일(OfficeMdError raise / 빈 md 절대 반환 금지).
raster_images 원소 = {'data': bytes, 'format': str}; 비어있을 수 있음(이미지 없는 문서).
"""
p = Path(path)
if p.suffix.lower() != ".hwp":
raise OfficeMdError(f"convert_hwp_to_md_and_images: .hwp 전용, got {p.suffix!r}")
if not p.exists():
raise OfficeMdError(f"file not found: {p}")
md, images = _run_hwp5html(p, timeout=timeout)
md = (md or "").strip()
if len(md) < _MIN_BODY_CHARS:
raise OfficeMdError(f"empty/too-short conversion ({len(md)} chars) for {p.name}")
return md, images
def _via_libreoffice_html(path: Path, *, timeout: int) -> str:
+110
View File
@@ -0,0 +1,110 @@
"""HWP(library) 백필 — 지정 PKM 폴더의 .hwp 를 content-hash dedup 후 일회성 ingest.
산업안전기사 외부 학습자료(category='library') 코퍼스에 편입한다. file_watcher
PKM 트랙 로직을 재사용하되 dedup file_path 아닌 **file_hash** 기준으로 해서
(a) Inbox 중복 (b) `_1`/`카피본` 사본을 1건으로 수렴시킨다(file_watcher path dedup 이라
동일내용 다른경로를 중복 ingest ). 이후 파이프라인:
extract(텍스트) classify embed/chunk(검색) markdown(.hwp=pyhwp hwp5html + raster NAS 영속)
실행 (GPU 서버):
# dry-run (기본) — 무엇이 ingest/skip 될지만 출력
docker exec hyungi_document_server-fastapi-1 \
python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering
# 실제 ingest
docker exec hyungi_document_server-fastapi-1 \
python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering --commit
"""
import argparse
import asyncio
import sys
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash
from models.document import Document
from models.queue import enqueue_stage
async def run(subdir: str, commit: bool) -> int:
nas_root = Path(settings.nas_mount_path)
scan_root = nas_root / "PKM" / subdir
if not scan_root.exists():
print(f"[backfill] scan_root 부재: {scan_root}", file=sys.stderr)
return 2
files = sorted(
p for p in scan_root.rglob("*") if p.is_file() and p.suffix.lower() == ".hwp"
)
print(f"[backfill] {scan_root} 하위 .hwp {len(files)}개 발견 / mode={'COMMIT' if commit else 'DRY-RUN'}")
ingested = skipped_existing = skipped_batchdup = 0
seen_hashes: set[str] = set()
async with async_session() as session:
for fp in files:
rel_path = str(fp.relative_to(nas_root))
fhash = file_hash(fp)
if fhash in seen_hashes:
print(f" SKIP(batch-dup) {rel_path}")
skipped_batchdup += 1
continue
seen_hashes.add(fhash)
# content-hash dedup (path 무관) — Inbox 중복 + _1/카피본 사본 흡수
existing = (
await session.execute(
select(Document.id).where(Document.file_hash == fhash).limit(1)
)
).first()
if existing:
print(f" SKIP(exists id={existing[0]}) {rel_path}")
skipped_existing += 1
continue
ingested += 1
if not commit:
print(f" INGEST(dry) {rel_path}")
continue
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format="hwp",
file_size=fp.stat().st_size,
file_type="immutable",
title=fp.stem,
source_channel="drive_sync",
category="library",
needs_conversion=False,
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
print(f" INGEST id={doc.id} {rel_path}")
if commit:
await session.commit()
print(
f"[backfill] done — ingest={ingested} "
f"skip_existing={skipped_existing} skip_batchdup={skipped_batchdup}"
)
return 0
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--subdir", default="Knowledge/Engineering", help="PKM 하위 스캔 폴더")
ap.add_argument("--commit", action="store_true", help="실제 ingest (없으면 dry-run)")
args = ap.parse_args()
return asyncio.run(run(args.subdir, args.commit))
if __name__ == "__main__":
sys.exit(main())