diff --git a/app/core/config.py b/app/core/config.py index 3b2765d..49ba5b8 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -48,6 +48,9 @@ class Settings(BaseModel): # kordoc kordoc_endpoint: str = "http://kordoc-service:3100" + # OCR (Surya) + ocr_endpoint: str = "http://ocr-service:3200" + # 분류 체계 taxonomy: dict = {} document_types: list[str] = [] @@ -60,6 +63,7 @@ def load_settings() -> Settings: jwt_secret = os.getenv("JWT_SECRET", "") totp_secret = os.getenv("TOTP_SECRET", "") kordoc_endpoint = os.getenv("KORDOC_ENDPOINT", "http://kordoc-service:3100") + ocr_endpoint = os.getenv("OCR_ENDPOINT", "http://ocr-service:3200") # config.yaml — Docker 컨테이너 내부(/app/config.yaml) 또는 프로젝트 루트 config_path = Path("/app/config.yaml") @@ -110,6 +114,7 @@ def load_settings() -> Settings: jwt_secret=jwt_secret, totp_secret=totp_secret, kordoc_endpoint=kordoc_endpoint, + ocr_endpoint=ocr_endpoint, taxonomy=taxonomy, document_types=document_types, ) diff --git a/app/models/document.py b/app/models/document.py index cc7159f..005d31f 100644 --- a/app/models/document.py +++ b/app/models/document.py @@ -33,6 +33,9 @@ class Document(Base): extracted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) extractor_version: Mapped[str | None] = mapped_column(String(50)) + # 2계층: 추출 메타 (OCR 판정/실행) + extract_meta: Mapped[dict | None] = mapped_column(JSONB, default=dict) + # 2계층: AI 가공 ai_summary: Mapped[str | None] = mapped_column(Text) ai_tags: Mapped[dict | None] = mapped_column(JSONB, default=[]) diff --git a/app/workers/extract_worker.py b/app/workers/extract_worker.py index 0d13607..8ebbb7e 100644 --- a/app/workers/extract_worker.py +++ b/app/workers/extract_worker.py @@ -1,5 +1,6 @@ -"""텍스트 추출 워커 — kordoc / LibreOffice / 직접 읽기""" +"""텍스트 추출 워커 — kordoc / PyMuPDF / Surya OCR / LibreOffice / 직접 읽기""" +import re import subprocess from datetime import datetime, timezone from pathlib import Path @@ -19,17 +20,55 @@ KORDOC_FORMATS = {"hwp", "hwpx", "pdf"} TEXT_FORMATS = {"md", "txt", "csv", "json", "xml", "html"} # LibreOffice로 텍스트 추출 가능한 포맷 OFFICE_FORMATS = {"xlsx", "xls", "docx", "doc", "pptx", "ppt", "odt", "ods", "odp", "odoc", "osheet"} -# OCR 필요 이미지 포맷 (Phase 2) -IMAGE_FORMATS = {"jpg", "jpeg", "png", "tiff", "tif", "bmp", "gif"} +# OCR 대상 이미지 포맷 +IMAGE_FORMATS = {"jpg", "jpeg", "png", "tiff", "tif", "bmp", "gif", "webp"} EXTRACTOR_VERSION = "kordoc@1.7" PYMUPDF_VERSION = "pymupdf" +# ─── OCR 판정 함수 ─── + +def _should_ocr(text: str, page_count: int) -> tuple[bool, str]: + """텍스트 추출 결과로 OCR 필요 여부 판정 — 2단계""" + total = len(text.strip()) + if total < 300: + return True, "no_text_layer" + avg = total / max(page_count, 1) + if avg < 80 and total < 3000: + return True, "low_text_density" + return False, "" + + +def _ocr_skip_reason(file_size: int, page_count: int) -> str | None: + """OCR 상한 체크""" + if page_count > 200: + return "page_limit" + if file_size > 150 * 1024 * 1024: + return "size_limit" + return None + + +def _ocr_quality_ok(text: str, page_count: int, is_image: bool) -> bool: + """OCR 결과 품질 검증 — 유형별 차등""" + chars = len(text.strip()) + if is_image: + return chars >= 50 + if page_count > 0: + return chars >= 200 or (chars / max(page_count, 1)) >= 30 + return chars >= 200 + + +def _postprocess_ocr(text: str) -> str: + """OCR 후처리 — NUL 제거 + 과도한 공백 정리""" + text = text.replace("\x00", "") + text = re.sub(r'\s{3,}', '\n', text) + return text.strip() + + def _extract_pdf_pymupdf(file_path: Path) -> str: """PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리""" import fitz - text_parts = [] with fitz.open(str(file_path)) as doc: for page in doc: @@ -37,6 +76,33 @@ def _extract_pdf_pymupdf(file_path: Path) -> str: return "\n".join(text_parts) +def _get_pdf_page_count(file_path: Path) -> int: + """PDF 페이지 수 확인""" + import fitz + with fitz.open(str(file_path)) as doc: + return len(doc) + + +async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None: + """OCR 서비스 호출 — 타임아웃 페이지 수 비례""" + container_path = f"/documents/{file_path.relative_to(Path(settings.nas_mount_path))}" + timeout = 60 if is_image else min(600, max(120, max_pages * 3)) + try: + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.post( + f"{settings.ocr_endpoint}/ocr", + json={"filePath": container_path, "langs": ["ko", "en"], "maxPages": max_pages}, + ) + if resp.status_code == 200: + data = resp.json() + return data.get("text", "") + except Exception as e: + logger.error(f"[ocr] OCR 서비스 호출 실패: {e}") + return None + + +# ─── 메인 처리 ─── + async def process(document_id: int, session: AsyncSession) -> None: """문서 텍스트 추출""" doc = await session.get(Document, document_id) @@ -46,29 +112,60 @@ async def process(document_id: int, session: AsyncSession) -> None: fmt = doc.file_format.lower() full_path = Path(settings.nas_mount_path) / doc.file_path - # 텍스트 파일 — 직접 읽기 + # ─── 텍스트 파일 — 직접 읽기 ─── if fmt in TEXT_FORMATS: if not full_path.exists(): raise FileNotFoundError(f"파일 없음: {full_path}") text = full_path.read_text(encoding="utf-8", errors="replace") - # NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지) doc.extracted_text = text.replace("\x00", "") doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = "direct_read" logger.info(f"[텍스트] {doc.file_path} ({len(text)}자)") return - # 이미지 — 스킵 (Phase 2 OCR) + # ─── 이미지 — OCR ─── if fmt in IMAGE_FORMATS: - doc.extracted_text = "" + meta = doc.extract_meta or {} + + # OCR 1회 제한 + if meta.get("ocr_attempted"): + meta["ocr_skip_reason"] = "already_attempted" + doc.extract_meta = meta + logger.info(f"[이미지] {doc.file_path} — OCR 이미 시도됨, 스킵") + return + + # 상한 체크 + skip = _ocr_skip_reason(doc.file_size or 0, 1) + if skip: + doc.extracted_text = "" + doc.extractor_version = None + doc.extract_meta = {**meta, "ocr_skip_reason": skip, "ocr_terminal": True} + doc.extracted_at = datetime.now(timezone.utc) + logger.warning(f"[이미지] {doc.file_path} — OCR 스킵 ({skip})") + return + + # OCR 서비스 호출 + ocr_text = await _call_ocr(full_path, is_image=True) + meta["ocr_attempted"] = True + meta["ocr_reason"] = "image_file" + + if ocr_text and _ocr_quality_ok(ocr_text, 1, is_image=True): + doc.extracted_text = _postprocess_ocr(ocr_text) + doc.extractor_version = "surya_ocr" + meta["ocr_chars"] = len(doc.extracted_text) + logger.info(f"[surya_ocr] {doc.file_path} ({len(doc.extracted_text)}자)") + else: + doc.extracted_text = "" + doc.extractor_version = None + meta["ocr_quality_ok"] = False + meta["ocr_terminal"] = True + logger.warning(f"[이미지] {doc.file_path} — OCR 결과 품질 미달") + + doc.extract_meta = meta doc.extracted_at = datetime.now(timezone.utc) - doc.extractor_version = "skip_image" - logger.info(f"[이미지] {doc.file_path} — OCR 미구현, 스킵") return - # kordoc 파싱 (HWP/HWPX/PDF) - # OOM 방지는 docker-compose.yml의 kordoc-service mem_limit으로 통제. - # kordoc 실패 시 PDF는 PyMuPDF fallback으로 텍스트 추출. + # ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ─── if fmt in KORDOC_FORMATS: container_path = f"/documents/{doc.file_path}" kordoc_timeout = min(300, max(60, (doc.file_size or 0) // (10 * 1024 * 1024) * 60 + 60)) @@ -84,9 +181,6 @@ async def process(document_id: int, session: AsyncSession) -> None: if resp.status_code == 404: raise FileNotFoundError(f"kordoc: 파일 없음 — {container_path}") - if resp.status_code not in (200, 422): - resp.raise_for_status() - if resp.status_code == 200: data = resp.json() text = data.get("markdown", "").replace("\x00", "") @@ -100,33 +194,89 @@ async def process(document_id: int, session: AsyncSession) -> None: except FileNotFoundError: raise except Exception as e: - logger.warning(f"[kordoc] {doc.file_path} 실패 ({e.__class__.__name__}), PyMuPDF fallback 시도") + logger.warning(f"[kordoc] {doc.file_path} 실패 ({e.__class__.__name__}), fallback 시도") if kordoc_ok: return - # PyMuPDF fallback — PDF만 가능 (HWP/HWPX는 kordoc 전용) + # ─── PyMuPDF fallback (PDF만) ─── if fmt == "pdf" and full_path.exists(): try: - text = _extract_pdf_pymupdf(full_path) - text = text.replace("\x00", "") - if text.strip(): - doc.extracted_text = text - doc.extracted_at = datetime.now(timezone.utc) - doc.extractor_version = PYMUPDF_VERSION - logger.info(f"[pymupdf] {doc.file_path} ({len(text)}자)") - return - else: - logger.warning(f"[pymupdf] {doc.file_path} — 텍스트 레이어 없음 (스캔 전용 PDF)") + pymupdf_text = _extract_pdf_pymupdf(full_path) + page_count = _get_pdf_page_count(full_path) except Exception as e: logger.error(f"[pymupdf] {doc.file_path} 실패: {e}") + pymupdf_text = "" + page_count = 0 - # kordoc + PyMuPDF 모두 실패 시 + meta = doc.extract_meta or {} + meta["pymupdf_chars"] = len(pymupdf_text.strip()) + + # PyMuPDF 텍스트 충분 여부 판정 + should, reason = _should_ocr(pymupdf_text, page_count) + + if not should: + # PyMuPDF 텍스트 충분 → OCR 불필요 + doc.extracted_text = pymupdf_text.replace("\x00", "") + doc.extracted_at = datetime.now(timezone.utc) + doc.extractor_version = PYMUPDF_VERSION + doc.extract_meta = meta + logger.info(f"[pymupdf] {doc.file_path} ({len(pymupdf_text)}자)") + return + + # ─── OCR 필요 ─── + + # OCR 1회 제한 + if meta.get("ocr_attempted"): + doc.extracted_text = pymupdf_text.replace("\x00", "") or "" + doc.extracted_at = datetime.now(timezone.utc) + doc.extractor_version = "pymupdf" if pymupdf_text.strip() else None + meta["ocr_skip_reason"] = "already_attempted" + doc.extract_meta = meta + logger.info(f"[pdf] {doc.file_path} — OCR 이미 시도됨, PyMuPDF 결과 유지") + return + + # 상한 체크 + skip = _ocr_skip_reason(doc.file_size or 0, page_count) + if skip: + doc.extracted_text = pymupdf_text.replace("\x00", "") or "" + doc.extracted_at = datetime.now(timezone.utc) + doc.extractor_version = "pymupdf" if pymupdf_text.strip() else None + doc.extract_meta = {**meta, "ocr_skip_reason": skip, "ocr_terminal": not pymupdf_text.strip()} + logger.warning(f"[pdf] {doc.file_path} — OCR 스킵 ({skip}), PyMuPDF 결과 유지") + return + + # OCR 서비스 호출 + meta["ocr_attempted"] = True + meta["ocr_reason"] = reason + logger.info(f"[pdf] {doc.file_path} — OCR 시도 (reason={reason}, pages={page_count})") + + ocr_text = await _call_ocr(full_path, is_image=False, max_pages=min(page_count, 200)) + + if ocr_text and _ocr_quality_ok(ocr_text, page_count, is_image=False): + doc.extracted_text = _postprocess_ocr(ocr_text) + doc.extractor_version = "surya_ocr" + meta["ocr_chars"] = len(doc.extracted_text) + logger.info(f"[surya_ocr] {doc.file_path} ({len(doc.extracted_text)}자)") + else: + # OCR 실패 → PyMuPDF 텍스트라도 보존 + doc.extracted_text = pymupdf_text.replace("\x00", "") or "" + doc.extractor_version = "pymupdf" if pymupdf_text.strip() else None + meta["ocr_quality_ok"] = False + if not pymupdf_text.strip(): + meta["ocr_terminal"] = True + logger.warning(f"[pdf] {doc.file_path} — OCR 결과 품질 미달, PyMuPDF 결과 유지") + + doc.extract_meta = meta + doc.extracted_at = datetime.now(timezone.utc) + return + + # HWP/HWPX는 kordoc 전용 — fallback 없음 if fmt != "pdf": raise ValueError(f"kordoc 파싱 실패 (HWP/HWPX는 fallback 없음)") raise ValueError(f"PDF 텍스트 추출 실패 — kordoc + PyMuPDF 모두 실패") - # 오피스 포맷 — LibreOffice 텍스트 변환 + # ─── 오피스 포맷 — LibreOffice 텍스트 변환 ─── if fmt in OFFICE_FORMATS: if not full_path.exists(): raise FileNotFoundError(f"파일 없음: {full_path}") @@ -135,11 +285,9 @@ async def process(document_id: int, session: AsyncSession) -> None: tmp_dir = Path("/tmp/extract_work") tmp_dir.mkdir(exist_ok=True) - # 한글 파일명 문제 방지 — 영문 임시 파일로 복사 tmp_input = tmp_dir / f"input_{document_id}.{fmt}" shutil.copy2(str(full_path), str(tmp_input)) - # 스프레드시트는 csv, 나머지는 txt CALC_FORMATS = {"xlsx", "xls", "ods", "osheet"} if fmt in CALC_FORMATS: convert_to = "csv:Text - txt - csv (StarCalc):44,34,76,1" @@ -156,7 +304,6 @@ async def process(document_id: int, session: AsyncSession) -> None: out_file = tmp_dir / f"input_{document_id}.{out_ext}" if out_file.exists(): text = out_file.read_text(encoding="utf-8", errors="replace") - # NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지) # 설계 원칙: extract는 전체 텍스트 저장. classify/summarize가 자체 상한으로 slice. doc.extracted_text = text.replace("\x00", "") doc.extracted_at = datetime.now(timezone.utc) @@ -179,7 +326,6 @@ async def process(document_id: int, session: AsyncSession) -> None: target_fmt = CONVERT_MAP.get(fmt) if target_fmt: try: - # .derived 디렉토리에 변환 (file_path는 원본 유지!) derived_dir = full_path.parent / ".derived" derived_dir.mkdir(exist_ok=True) tmp_input2 = tmp_dir / f"convert_{document_id}.{fmt}" @@ -212,7 +358,7 @@ async def process(document_id: int, session: AsyncSession) -> None: return - # 미지원 포맷 + # ─── 미지원 포맷 ─── doc.extracted_text = "" doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = f"unsupported_{fmt}" diff --git a/migrations/134_extract_meta.sql b/migrations/134_extract_meta.sql new file mode 100644 index 0000000..e53f21c --- /dev/null +++ b/migrations/134_extract_meta.sql @@ -0,0 +1,2 @@ +ALTER TABLE documents ADD COLUMN IF NOT EXISTS extract_meta JSONB DEFAULT '{}'; +COMMENT ON COLUMN documents.extract_meta IS 'OCR 판정/실행 메타데이터: ocr_attempted, ocr_reason, ocr_skip_reason, ocr_chars, pymupdf_chars, ocr_terminal';