diff --git a/app/workers/marker_worker.py b/app/workers/marker_worker.py index 60cb273..cc2c70d 100644 --- a/app/workers/marker_worker.py +++ b/app/workers/marker_worker.py @@ -394,7 +394,11 @@ async def _process_office( partial arm 은 PDF split 전용 — office 는 이진이라 여기 없음. 'completed' 는 A-3 직렬화 전용(워커 미사용). quality 는 content-type-aware: office=scored(_compute_quality). 동기 변환은 to_thread 로 event loop 비차단. """ - from workers.office_md import OfficeMdError, convert_office_to_md + from workers.office_md import ( + OfficeMdError, + convert_hwp_to_md_and_images, + convert_office_to_md, + ) suffix = Path(container_path).suffix.lower() if suffix == ".hwp": @@ -403,9 +407,16 @@ async def _process_office( engine = "libreoffice_hwp" # HWPX 는 pyhwp 미지원 → LibreOffice 폴백 else: engine = "markitdown" + + hwp_images: list[dict[str, Any]] = [] try: - # 동기 subprocess(LibreOffice)/markitdown — 스레드로 빼서 이벤트 루프 비차단. - md_content = await asyncio.to_thread(convert_office_to_md, container_path) + # 동기 subprocess/markitdown — 스레드로 빼서 이벤트 루프 비차단. + if suffix == ".hwp": + md_content, hwp_images = await asyncio.to_thread( + convert_hwp_to_md_and_images, container_path + ) + else: + md_content = await asyncio.to_thread(convert_office_to_md, container_path) except OfficeMdError as exc: logger.warning(f"[marker] office md 변환 실패 id={document_id} engine={engine}: {exc}") await _fail(session, document_id, f"office_md: {str(exc)[:990]}", engine=engine) @@ -415,8 +426,49 @@ async def _process_office( await _fail(session, document_id, f"office_md_unexpected: {str(exc)[:980]}", engine=engine) return + # ---- 이미지 NAS persist (.hwp 전용) ---- + # hwp5html 은 bindata raster 를 추출하나 본문 xhtml 에 앵커가 없어(orphan, --css/--html + # 동일) 인라인 위치 복원 불가 → marker(PDF) 의 _persist_images_to_nas 로 NAS 영속 후 md 말미 + # 갤러리로 부착(docimg: ref = 뷰어 해석). OLE 수식/도형은 앵커도 raster 도 아니라 제외. + # docx/xlsx/pptx/hwpx 는 이미지 미처리(기존 동작 유지). + saved_images: list[dict[str, Any]] = [] + orphan_paths: list[str] = [] + if suffix == ".hwp" and MARKDOWN_IMAGE_PERSIST: + if hwp_images: + images_resp = [ + { + "bytes_b64": base64.b64encode(im["data"]).decode("ascii"), + "format": im.get("format") or "png", + "slug": "", + "width": None, + "height": None, + } + for im in hwp_images + ] + try: + saved_images = _persist_images_to_nas(document_id, images_resp) + except OSError as exc: + # NAS 일시 끊김 등 — transient. queue retry 로 복구. + logger.warning( + f"[marker] hwp image persist NAS write failed id={document_id}: " + f"{type(exc).__name__}: {exc}" + ) + raise + if saved_images: + gallery = "\n\n## 첨부 이미지\n\n" + "\n\n".join( + f"![](docimg:{img['image_key']})" for img in saved_images + ) + md_content = md_content + gallery + # 재변환 시 현재 saved_images 기준으로 과거 document_images row/NAS 파일 정리. + orphan_paths = await _sync_document_images( + session, document_id, saved_images, {"engine": engine} + ) + # 성공 — 계약상 md_content 는 비공백(빈출력은 raise). quality scored. quality = _compute_quality(md_content, doc.extracted_text or "", {"page_count": None}) + if saved_images: + quality.setdefault("warnings", []).append(f"hwp_images_appended:{len(saved_images)}") + await session.execute( update(Document).where(Document.id == document_id).values( md_content=md_content, @@ -434,7 +486,21 @@ async def _process_office( ) ) await session.commit() - logger.info(f"[marker] office success id={document_id} engine={engine} len={len(md_content)}") + + # commit 후 고아 NAS 파일 unlink (best-effort, 실패해도 DB 정합 유지). + for orphan_path in orphan_paths: + try: + Path(orphan_path).unlink(missing_ok=True) + except Exception as exc: + logger.warning( + f"[marker] orphan image unlink failed id={document_id} path={orphan_path}: " + f"{type(exc).__name__}: {exc}" + ) + + logger.info( + f"[marker] office success id={document_id} engine={engine} " + f"len={len(md_content)} images={len(saved_images)}" + ) async def _process_split( diff --git a/app/workers/office_md.py b/app/workers/office_md.py index 4345955..4308111 100644 --- a/app/workers/office_md.py +++ b/app/workers/office_md.py @@ -40,6 +40,10 @@ _SOFFICE_BIN = os.environ.get("LIBREOFFICE_BIN", "libreoffice") # pyhwp 콘솔 스크립트(pip install pyhwp 시 PATH 등록). HWP5 binary(.hwp) 전용. _HWP5HTML_BIN = os.environ.get("HWP5HTML_BIN", "hwp5html") +# hwp5html 이 bindata/ 로 추출하는 첨부물 중 NAS 영속 대상 raster 확장자. +# (OLE 수식/도형은 index.xhtml 에 앵커가 없어 위치 복원 불가 → 영속 제외.) +_RASTER_EXTS = {"jpg", "jpeg", "png", "gif", "bmp"} + class OfficeMdError(Exception): """office/hwp → md 변환 실패 신호. 호출부는 md_status='failed' 로 라우팅.""" @@ -82,12 +86,16 @@ def _via_markitdown(path: Path) -> str: return getattr(result, "text_content", "") or "" -def _via_pyhwp_html(path: Path, *, timeout: int) -> str: - """HWP5 binary(.hwp) → pyhwp hwp5html → markdownify. +def _run_hwp5html(path: Path, *, timeout: int) -> tuple[str, list[dict]]: + """HWP5 binary(.hwp) → (markdown, raster_images). hwp5html 1회 실행 = md + 이미지 동시 추출. LibreOffice 번들 libhwplo 필터가 실제 한컴 HWP5 파일을 못 읽어(rc=0 + 'source file could not be loaded') 전건 실패 → 순수 Python HWP5 전용 변환기 pyhwp(CLI hwp5html)로 교체. `_via_libreoffice_html` 와 동일한 실패 계약(rc≠0 또는 출력 부재 → OfficeMdError raise). + + raster_images = [{'data': bytes, 'format': 'jpeg'|'png'|...}] — bindata/ 의 래스터만. + hwp5html 은 이미지를 본문 xhtml 에 로 앵커하지 않으므로(bindata orphan, --css/--html 동일) + 인라인 위치는 복원 불가 → 호출부가 NAS 영속 후 말미 갤러리로 부착한다. """ try: from markdownify import markdownify # 기존 의존성 @@ -96,7 +104,7 @@ def _via_pyhwp_html(path: Path, *, timeout: int) -> str: with tempfile.TemporaryDirectory(prefix="office_md_hwp_") as tmp: outdir = Path(tmp) - # hwp5html --output /index.xhtml + styles.css + # hwp5html --output /index.xhtml + styles.css + bindata/ cmd = [_HWP5HTML_BIN, "--output", str(outdir), str(path)] try: proc = subprocess.run( @@ -122,7 +130,45 @@ def _via_pyhwp_html(path: Path, *, timeout: int) -> str: # _MIN_BODY_CHARS(16) 빈출력 게이트를 무력화(빈 변환의 false-success) → markdownify 전에 제거. html = re.sub(r"^\s*<\?xml[^>]*\?>\s*", "", html) # 표 보존 위해 markdownify 가 table 을 GFM 으로 — heading_style ATX (libreoffice 경로와 동일). - return markdownify(html, heading_style="ATX", strip=["span", "font"]) + md = markdownify(html, heading_style="ATX", strip=["span", "font"]) + + images: list[dict] = [] + bindata = outdir / "bindata" + if bindata.is_dir(): + for f in sorted(bindata.iterdir()): + ext = f.suffix.lower().lstrip(".") + if ext in _RASTER_EXTS: + images.append({ + "data": f.read_bytes(), + "format": "jpeg" if ext == "jpg" else ext, + }) + return md, images + + +def _via_pyhwp_html(path: Path, *, timeout: int) -> str: + """HWP5 binary(.hwp) → markdown (이미지 제외). convert_office_to_md 단일 텍스트 경로용.""" + md, _images = _run_hwp5html(path, timeout=timeout) + return md + + +def convert_hwp_to_md_and_images( + path: str | Path, *, timeout: int = 90 +) -> tuple[str, list[dict]]: + """HWP5(.hwp) → (markdown, raster_images). marker_worker 이미지 영속 경로 전용. + + 실패/빈출력 계약은 convert_office_to_md 와 동일(OfficeMdError raise / 빈 md 절대 반환 금지). + raster_images 원소 = {'data': bytes, 'format': str}; 비어있을 수 있음(이미지 없는 문서). + """ + p = Path(path) + if p.suffix.lower() != ".hwp": + raise OfficeMdError(f"convert_hwp_to_md_and_images: .hwp 전용, got {p.suffix!r}") + if not p.exists(): + raise OfficeMdError(f"file not found: {p}") + md, images = _run_hwp5html(p, timeout=timeout) + md = (md or "").strip() + if len(md) < _MIN_BODY_CHARS: + raise OfficeMdError(f"empty/too-short conversion ({len(md)} chars) for {p.name}") + return md, images def _via_libreoffice_html(path: Path, *, timeout: int) -> str: diff --git a/scripts/backfill_hwp_library.py b/scripts/backfill_hwp_library.py new file mode 100644 index 0000000..e038127 --- /dev/null +++ b/scripts/backfill_hwp_library.py @@ -0,0 +1,110 @@ +"""HWP(library) 백필 — 지정 PKM 폴더의 .hwp 를 content-hash dedup 후 일회성 ingest. + +산업안전기사 등 외부 학습자료(category='library')를 코퍼스에 편입한다. file_watcher 의 +PKM 트랙 로직을 재사용하되 dedup 을 file_path 가 아닌 **file_hash** 기준으로 해서 +(a) Inbox 중복 (b) `_1`/`카피본` 사본을 1건으로 수렴시킨다(file_watcher 는 path dedup 이라 +동일내용 다른경로를 중복 ingest 함). 이후 파이프라인: + extract(텍스트) → classify → embed/chunk(검색) → markdown(.hwp=pyhwp hwp5html + raster NAS 영속) + +실행 (GPU 서버): + # dry-run (기본) — 무엇이 ingest/skip 될지만 출력 + docker exec hyungi_document_server-fastapi-1 \ + python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering + + # 실제 ingest + docker exec hyungi_document_server-fastapi-1 \ + python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering --commit +""" + +import argparse +import asyncio +import sys +from pathlib import Path + +from sqlalchemy import select + +from core.config import settings +from core.database import async_session +from core.utils import file_hash +from models.document import Document +from models.queue import enqueue_stage + + +async def run(subdir: str, commit: bool) -> int: + nas_root = Path(settings.nas_mount_path) + scan_root = nas_root / "PKM" / subdir + if not scan_root.exists(): + print(f"[backfill] scan_root 부재: {scan_root}", file=sys.stderr) + return 2 + + files = sorted( + p for p in scan_root.rglob("*") if p.is_file() and p.suffix.lower() == ".hwp" + ) + print(f"[backfill] {scan_root} 하위 .hwp {len(files)}개 발견 / mode={'COMMIT' if commit else 'DRY-RUN'}") + + ingested = skipped_existing = skipped_batchdup = 0 + seen_hashes: set[str] = set() + + async with async_session() as session: + for fp in files: + rel_path = str(fp.relative_to(nas_root)) + fhash = file_hash(fp) + + if fhash in seen_hashes: + print(f" SKIP(batch-dup) {rel_path}") + skipped_batchdup += 1 + continue + seen_hashes.add(fhash) + + # content-hash dedup (path 무관) — Inbox 중복 + _1/카피본 사본 흡수 + existing = ( + await session.execute( + select(Document.id).where(Document.file_hash == fhash).limit(1) + ) + ).first() + if existing: + print(f" SKIP(exists id={existing[0]}) {rel_path}") + skipped_existing += 1 + continue + + ingested += 1 + if not commit: + print(f" INGEST(dry) {rel_path}") + continue + + doc = Document( + file_path=rel_path, + file_hash=fhash, + file_format="hwp", + file_size=fp.stat().st_size, + file_type="immutable", + title=fp.stem, + source_channel="drive_sync", + category="library", + needs_conversion=False, + ) + session.add(doc) + await session.flush() + await enqueue_stage(session, doc.id, "extract") + print(f" INGEST id={doc.id} {rel_path}") + + if commit: + await session.commit() + + print( + f"[backfill] done — ingest={ingested} " + f"skip_existing={skipped_existing} skip_batchdup={skipped_batchdup}" + ) + return 0 + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--subdir", default="Knowledge/Engineering", help="PKM 하위 스캔 폴더") + ap.add_argument("--commit", action="store_true", help="실제 ingest (없으면 dry-run)") + args = ap.parse_args() + return asyncio.run(run(args.subdir, args.commit)) + + +if __name__ == "__main__": + sys.exit(main())