"""텍스트 추출 워커 — kordoc / PyMuPDF / Surya OCR / LibreOffice / 직접 읽기""" import re import subprocess from datetime import datetime, timezone from pathlib import Path import httpx from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings from core.utils import setup_logger from models.document import Document logger = setup_logger("extract_worker") # kordoc으로 파싱 가능한 포맷 KORDOC_FORMATS = {"hwp", "hwpx", "pdf"} # 직접 읽기 가능한 텍스트 포맷 TEXT_FORMATS = {"md", "txt", "csv", "json", "xml", "html"} # LibreOffice로 텍스트 추출 가능한 포맷 OFFICE_FORMATS = {"xlsx", "xls", "docx", "doc", "pptx", "ppt", "odt", "ods", "odp", "odoc", "osheet"} # OCR 대상 이미지 포맷 IMAGE_FORMATS = {"jpg", "jpeg", "png", "tiff", "tif", "bmp", "gif", "webp"} EXTRACTOR_VERSION = "kordoc@1.7" PYMUPDF_VERSION = "pymupdf" # ─── OCR 판정 함수 ─── def _should_ocr(text: str, page_count: int) -> tuple[bool, str]: """텍스트 추출 결과로 OCR 필요 여부 판정 — 2단계""" total = len(text.strip()) if total < 300: return True, "no_text_layer" avg = total / max(page_count, 1) if avg < 80 and total < 3000: return True, "low_text_density" return False, "" def _ocr_skip_reason(file_size: int, page_count: int) -> str | None: """OCR 상한 체크""" if page_count > 200: return "page_limit" if file_size > 150 * 1024 * 1024: return "size_limit" return None def _ocr_quality_ok(text: str, page_count: int, is_image: bool) -> bool: """OCR 결과 품질 검증 — 유형별 차등""" chars = len(text.strip()) if is_image: return chars >= 50 if page_count > 0: return chars >= 200 or (chars / max(page_count, 1)) >= 30 return chars >= 200 def _postprocess_ocr(text: str) -> str: """OCR 후처리 — NUL 제거 + 과도한 공백 정리""" text = text.replace("\x00", "") text = re.sub(r'\s{3,}', '\n', text) return text.strip() def _extract_pdf_pymupdf(file_path: Path) -> str: """PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리""" import fitz text_parts = [] with fitz.open(str(file_path)) as doc: for page in doc: text_parts.append(page.get_text()) return "\n".join(text_parts) def _get_pdf_page_count(file_path: Path) -> int: """PDF 페이지 수 확인""" import fitz with fitz.open(str(file_path)) as doc: return len(doc) async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None: """OCR 서비스 호출 — 타임아웃 페이지 수 비례""" container_path = f"/documents/{file_path.relative_to(Path(settings.nas_mount_path))}" timeout = 60 if is_image else min(600, max(120, max_pages * 3)) try: async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.post( f"{settings.ocr_endpoint}/ocr", json={"filePath": container_path, "langs": ["ko", "en"], "maxPages": max_pages}, ) if resp.status_code == 200: data = resp.json() return data.get("text", "") except Exception as e: logger.error(f"[ocr] OCR 서비스 호출 실패: {e}") return None # ─── 메인 처리 ─── async def process(document_id: int, session: AsyncSession) -> None: """문서 텍스트 추출""" doc = await session.get(Document, document_id) if not doc: raise ValueError(f"문서 ID {document_id}를 찾을 수 없음") fmt = doc.file_format.lower() full_path = Path(settings.nas_mount_path) / doc.file_path # ─── 텍스트 파일 — 직접 읽기 ─── if fmt in TEXT_FORMATS: if not full_path.exists(): raise FileNotFoundError(f"파일 없음: {full_path}") text = full_path.read_text(encoding="utf-8", errors="replace") doc.extracted_text = text.replace("\x00", "") doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = "direct_read" logger.info(f"[텍스트] {doc.file_path} ({len(text)}자)") return # ─── 이미지 — OCR ─── if fmt in IMAGE_FORMATS: meta = doc.extract_meta or {} # OCR 1회 제한 if meta.get("ocr_attempted"): meta["ocr_skip_reason"] = "already_attempted" doc.extract_meta = meta logger.info(f"[이미지] {doc.file_path} — OCR 이미 시도됨, 스킵") return # 상한 체크 skip = _ocr_skip_reason(doc.file_size or 0, 1) if skip: doc.extracted_text = "" doc.extractor_version = None doc.extract_meta = {**meta, "ocr_skip_reason": skip, "ocr_terminal": True} doc.extracted_at = datetime.now(timezone.utc) logger.warning(f"[이미지] {doc.file_path} — OCR 스킵 ({skip})") return # OCR 서비스 호출 ocr_text = await _call_ocr(full_path, is_image=True) meta["ocr_attempted"] = True meta["ocr_reason"] = "image_file" if ocr_text and _ocr_quality_ok(ocr_text, 1, is_image=True): doc.extracted_text = _postprocess_ocr(ocr_text) doc.extractor_version = "surya_ocr" meta["ocr_chars"] = len(doc.extracted_text) logger.info(f"[surya_ocr] {doc.file_path} ({len(doc.extracted_text)}자)") else: doc.extracted_text = "" doc.extractor_version = None meta["ocr_quality_ok"] = False meta["ocr_terminal"] = True logger.warning(f"[이미지] {doc.file_path} — OCR 결과 품질 미달") doc.extract_meta = meta doc.extracted_at = datetime.now(timezone.utc) return # ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ─── if fmt in KORDOC_FORMATS: container_path = f"/documents/{doc.file_path}" kordoc_timeout = min(300, max(60, (doc.file_size or 0) // (10 * 1024 * 1024) * 60 + 60)) kordoc_ok = False try: async with httpx.AsyncClient(timeout=kordoc_timeout) as client: resp = await client.post( f"{settings.kordoc_endpoint}/parse", json={"filePath": container_path}, ) if resp.status_code == 404: raise FileNotFoundError(f"kordoc: 파일 없음 — {container_path}") if resp.status_code == 200: data = resp.json() text = data.get("markdown", "").replace("\x00", "") if text: doc.extracted_text = text doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = EXTRACTOR_VERSION logger.info(f"[kordoc] {doc.file_path} ({len(text)}자)") kordoc_ok = True except FileNotFoundError: raise except Exception as e: logger.warning(f"[kordoc] {doc.file_path} 실패 ({e.__class__.__name__}), fallback 시도") if kordoc_ok: return # ─── PyMuPDF fallback (PDF만) ─── if fmt == "pdf" and full_path.exists(): try: pymupdf_text = _extract_pdf_pymupdf(full_path) page_count = _get_pdf_page_count(full_path) except Exception as e: logger.error(f"[pymupdf] {doc.file_path} 실패: {e}") pymupdf_text = "" page_count = 0 meta = doc.extract_meta or {} meta["pymupdf_chars"] = len(pymupdf_text.strip()) # PyMuPDF 텍스트 충분 여부 판정 should, reason = _should_ocr(pymupdf_text, page_count) if not should: # PyMuPDF 텍스트 충분 → OCR 불필요 doc.extracted_text = pymupdf_text.replace("\x00", "") doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = PYMUPDF_VERSION doc.extract_meta = meta logger.info(f"[pymupdf] {doc.file_path} ({len(pymupdf_text)}자)") return # ─── OCR 필요 ─── # OCR 1회 제한 if meta.get("ocr_attempted"): doc.extracted_text = pymupdf_text.replace("\x00", "") or "" doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = "pymupdf" if pymupdf_text.strip() else None meta["ocr_skip_reason"] = "already_attempted" doc.extract_meta = meta logger.info(f"[pdf] {doc.file_path} — OCR 이미 시도됨, PyMuPDF 결과 유지") return # 상한 체크 skip = _ocr_skip_reason(doc.file_size or 0, page_count) if skip: doc.extracted_text = pymupdf_text.replace("\x00", "") or "" doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = "pymupdf" if pymupdf_text.strip() else None doc.extract_meta = {**meta, "ocr_skip_reason": skip, "ocr_terminal": not pymupdf_text.strip()} logger.warning(f"[pdf] {doc.file_path} — OCR 스킵 ({skip}), PyMuPDF 결과 유지") return # OCR 서비스 호출 meta["ocr_attempted"] = True meta["ocr_reason"] = reason logger.info(f"[pdf] {doc.file_path} — OCR 시도 (reason={reason}, pages={page_count})") ocr_text = await _call_ocr(full_path, is_image=False, max_pages=min(page_count, 200)) if ocr_text and _ocr_quality_ok(ocr_text, page_count, is_image=False): doc.extracted_text = _postprocess_ocr(ocr_text) doc.extractor_version = "surya_ocr" meta["ocr_chars"] = len(doc.extracted_text) logger.info(f"[surya_ocr] {doc.file_path} ({len(doc.extracted_text)}자)") else: # OCR 실패 → PyMuPDF 텍스트라도 보존 doc.extracted_text = pymupdf_text.replace("\x00", "") or "" doc.extractor_version = "pymupdf" if pymupdf_text.strip() else None meta["ocr_quality_ok"] = False if not pymupdf_text.strip(): meta["ocr_terminal"] = True logger.warning(f"[pdf] {doc.file_path} — OCR 결과 품질 미달, PyMuPDF 결과 유지") doc.extract_meta = meta doc.extracted_at = datetime.now(timezone.utc) return # HWP/HWPX는 kordoc 전용 — fallback 없음 if fmt != "pdf": raise ValueError(f"kordoc 파싱 실패 (HWP/HWPX는 fallback 없음)") raise ValueError(f"PDF 텍스트 추출 실패 — kordoc + PyMuPDF 모두 실패") # ─── 오피스 포맷 — LibreOffice 텍스트 변환 ─── if fmt in OFFICE_FORMATS: if not full_path.exists(): raise FileNotFoundError(f"파일 없음: {full_path}") import shutil tmp_dir = Path("/tmp/extract_work") tmp_dir.mkdir(exist_ok=True) tmp_input = tmp_dir / f"input_{document_id}.{fmt}" shutil.copy2(str(full_path), str(tmp_input)) CALC_FORMATS = {"xlsx", "xls", "ods", "osheet"} if fmt in CALC_FORMATS: convert_to = "csv:Text - txt - csv (StarCalc):44,34,76,1" out_ext = "csv" else: convert_to = "txt:Text" out_ext = "txt" try: result = subprocess.run( ["libreoffice", "--headless", "--convert-to", convert_to, "--outdir", str(tmp_dir), str(tmp_input)], capture_output=True, text=True, timeout=60, ) out_file = tmp_dir / f"input_{document_id}.{out_ext}" if out_file.exists(): text = out_file.read_text(encoding="utf-8", errors="replace") # 설계 원칙: extract는 전체 텍스트 저장. classify/summarize가 자체 상한으로 slice. doc.extracted_text = text.replace("\x00", "") doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = "libreoffice" out_file.unlink() logger.info(f"[LibreOffice] {doc.file_path} ({len(text)}자)") else: raise RuntimeError(f"LibreOffice 변환 실패: {result.stderr[:300]}") except subprocess.TimeoutExpired: raise RuntimeError(f"LibreOffice 텍스트 추출 timeout (60s)") finally: tmp_input.unlink(missing_ok=True) # ─── ODF 변환 (편집용) ─── CONVERT_MAP = { 'xlsx': 'ods', 'xls': 'ods', 'docx': 'odt', 'doc': 'odt', 'pptx': 'odp', 'ppt': 'odp', } target_fmt = CONVERT_MAP.get(fmt) if target_fmt: try: derived_dir = full_path.parent / ".derived" derived_dir.mkdir(exist_ok=True) tmp_input2 = tmp_dir / f"convert_{document_id}.{fmt}" shutil.copy2(str(full_path), str(tmp_input2)) conv_result = subprocess.run( ["libreoffice", "--headless", "--convert-to", target_fmt, "--outdir", str(tmp_dir), str(tmp_input2)], capture_output=True, text=True, timeout=60, ) tmp_input2.unlink(missing_ok=True) conv_file = tmp_dir / f"convert_{document_id}.{target_fmt}" if conv_file.exists(): final_path = derived_dir / f"{document_id}.{target_fmt}" shutil.move(str(conv_file), str(final_path)) nas_root = Path(settings.nas_mount_path) doc.derived_path = str(final_path.relative_to(nas_root)) doc.original_format = doc.file_format doc.conversion_status = "done" logger.info(f"[ODF변환] {doc.file_path} → derived: {doc.derived_path}") else: doc.conversion_status = "failed" logger.warning(f"[ODF변환] 실패: {conv_result.stderr[:200]}") except Exception as e: doc.conversion_status = "failed" logger.error(f"[ODF변환] {doc.file_path} 에러: {e}") else: doc.conversion_status = "none" return # ─── 미지원 포맷 ─── doc.extracted_text = "" doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = f"unsupported_{fmt}" logger.warning(f"[미지원] {doc.file_path} (format={fmt})")