Files
hyungi_document_server/app/workers/extract_worker.py
T
Hyungi Ahn 088966bf78 feat(extract): OCR 트리거 규칙 + extract_meta JSONB
스캔 PDF/이미지 자동 OCR 트리거 + 결과 품질 검증 + 1회 제한.

- extract_meta JSONB 컬럼 추가 (migration 134)
  ocr_attempted, ocr_reason, ocr_skip_reason, ocr_terminal, ocr_chars
- PDF OCR 트리거: total_chars < 300 또는 avg < 80 && total < 3000
- 이미지 자동 OCR: jpg/png/tiff/webp 등
- 품질 차등: 이미지 50자, PDF 200자 또는 페이지당 30자
- 상한: pages > 200 또는 file_size > 150MB → 스킵
- OCR 1회 제한: extract_meta.ocr_attempted로 재시도 방지
- extractor_version은 도구명만 (surya_ocr/pymupdf/kordoc)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 15:04:13 +09:00

366 lines
14 KiB
Python

"""텍스트 추출 워커 — kordoc / PyMuPDF / Surya OCR / LibreOffice / 직접 읽기"""
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.utils import setup_logger
from models.document import Document
logger = setup_logger("extract_worker")
# Formats routed to the kordoc parsing service.
KORDOC_FORMATS = {
    "hwp",
    "hwpx",
    "pdf",
}

# Plain-text formats that are read straight from disk.
TEXT_FORMATS = {
    "md",
    "txt",
    "csv",
    "json",
    "xml",
    "html",
}

# Office formats whose text is extracted through LibreOffice.
OFFICE_FORMATS = {
    "xlsx",
    "xls",
    "docx",
    "doc",
    "pptx",
    "ppt",
    "odt",
    "ods",
    "odp",
    "odoc",
    "osheet",
}

# Image formats that are sent to OCR.
IMAGE_FORMATS = {
    "jpg",
    "jpeg",
    "png",
    "tiff",
    "tif",
    "bmp",
    "gif",
    "webp",
}

# Labels stored in Document.extractor_version.
# NOTE(review): this label carries a version suffix while the other extractor
# labels used in this file ("pymupdf", "surya_ocr", "libreoffice") are bare
# tool names — confirm this is intended.
EXTRACTOR_VERSION = "kordoc@1.7"
PYMUPDF_VERSION = "pymupdf"
# ─── OCR 판정 함수 ───
def _should_ocr(text: str, page_count: int) -> tuple[bool, str]:
"""텍스트 추출 결과로 OCR 필요 여부 판정 — 2단계"""
total = len(text.strip())
if total < 300:
return True, "no_text_layer"
avg = total / max(page_count, 1)
if avg < 80 and total < 3000:
return True, "low_text_density"
return False, ""
def _ocr_skip_reason(file_size: int, page_count: int) -> str | None:
"""OCR 상한 체크"""
if page_count > 200:
return "page_limit"
if file_size > 150 * 1024 * 1024:
return "size_limit"
return None
def _ocr_quality_ok(text: str, page_count: int, is_image: bool) -> bool:
"""OCR 결과 품질 검증 — 유형별 차등"""
chars = len(text.strip())
if is_image:
return chars >= 50
if page_count > 0:
return chars >= 200 or (chars / max(page_count, 1)) >= 30
return chars >= 200
def _postprocess_ocr(text: str) -> str:
"""OCR 후처리 — NUL 제거 + 과도한 공백 정리"""
text = text.replace("\x00", "")
text = re.sub(r'\s{3,}', '\n', text)
return text.strip()
def _extract_pdf_pymupdf(file_path: Path) -> str:
    """PyMuPDF fallback: return the text of every page joined with newlines.

    Pages are visited one at a time by iterating the opened document.
    """
    import fitz

    with fitz.open(str(file_path)) as pdf:
        return "\n".join(page.get_text() for page in pdf)
def _get_pdf_page_count(file_path: Path) -> int:
    """Return the number of pages in the PDF at *file_path* (via PyMuPDF)."""
    import fitz

    with fitz.open(str(file_path)) as pdf:
        n_pages = len(pdf)
    return n_pages
async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None:
    """Call the OCR service and return the recognized text.

    The path sent to the service is the file's path relative to
    ``settings.nas_mount_path``, re-rooted under ``/documents``.

    Args:
        file_path: Absolute path of the file below the NAS mount.
        is_image: ``True`` for a single image (fixed 60 s timeout). Otherwise
            the timeout is 3 s per page, clamped to the range 120-600 s.
        max_pages: Page limit forwarded to the service as ``maxPages``.

    Returns:
        The ``text`` field of the JSON response on HTTP 200, or ``None`` when
        the request fails or the service answers with any other status.

    Raises:
        ValueError: If *file_path* is not located under the NAS mount
            (raised by ``Path.relative_to`` before any request is made).
    """
    container_path = f"/documents/{file_path.relative_to(Path(settings.nas_mount_path))}"
    timeout = 60 if is_image else min(600, max(120, max_pages * 3))
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(
                f"{settings.ocr_endpoint}/ocr",
                json={"filePath": container_path, "langs": ["ko", "en"], "maxPages": max_pages},
            )
            if resp.status_code == 200:
                data = resp.json()
                return data.get("text", "")
            # Previously a non-200 answer was dropped silently, which made a
            # failing OCR service indistinguishable from "no text found".
            logger.error(f"[ocr] OCR 서비스 응답 오류 (HTTP {resp.status_code}): {container_path}")
    except Exception as e:
        # Best effort: callers treat None as "OCR produced nothing".
        logger.error(f"[ocr] OCR 서비스 호출 실패: {e}")
    return None
# ─── 메인 처리 ───
def _extract_text_file(doc: "Document", full_path: Path) -> None:
    """Read a plain-text file directly (UTF-8, undecodable bytes replaced)."""
    if not full_path.exists():
        raise FileNotFoundError(f"파일 없음: {full_path}")
    text = full_path.read_text(encoding="utf-8", errors="replace")
    doc.extracted_text = text.replace("\x00", "")
    doc.extracted_at = datetime.now(timezone.utc)
    doc.extractor_version = "direct_read"
    logger.info(f"[텍스트] {doc.file_path} ({len(text)}자)")


async def _extract_image_ocr(doc: "Document", full_path: Path) -> None:
    """Run OCR on an image file, at most once per document."""
    # Work on a copy: mutating the loaded dict in place and re-assigning the
    # same object is not detected as a change by a plain SQLAlchemy JSON/JSONB
    # column (unless the column uses MutableDict, which is not visible here),
    # so flags such as "ocr_attempted" could silently fail to persist.
    meta = dict(doc.extract_meta or {})

    # OCR is limited to a single attempt.
    if meta.get("ocr_attempted"):
        meta["ocr_skip_reason"] = "already_attempted"
        doc.extract_meta = meta
        logger.info(f"[이미지] {doc.file_path} — OCR 이미 시도됨, 스킵")
        return

    # Upper-limit check (an image counts as one page).
    skip = _ocr_skip_reason(doc.file_size or 0, 1)
    if skip:
        doc.extracted_text = ""
        doc.extractor_version = None
        doc.extract_meta = {**meta, "ocr_skip_reason": skip, "ocr_terminal": True}
        doc.extracted_at = datetime.now(timezone.utc)
        logger.warning(f"[이미지] {doc.file_path} — OCR 스킵 ({skip})")
        return

    ocr_text = await _call_ocr(full_path, is_image=True)
    meta["ocr_attempted"] = True
    meta["ocr_reason"] = "image_file"

    if ocr_text and _ocr_quality_ok(ocr_text, 1, is_image=True):
        doc.extracted_text = _postprocess_ocr(ocr_text)
        doc.extractor_version = "surya_ocr"
        meta["ocr_chars"] = len(doc.extracted_text)
        logger.info(f"[surya_ocr] {doc.file_path} ({len(doc.extracted_text)}자)")
    else:
        doc.extracted_text = ""
        doc.extractor_version = None
        meta["ocr_quality_ok"] = False
        meta["ocr_terminal"] = True
        logger.warning(f"[이미지] {doc.file_path} — OCR 결과 품질 미달")

    doc.extract_meta = meta
    doc.extracted_at = datetime.now(timezone.utc)


async def _extract_with_kordoc(doc: "Document") -> bool:
    """Parse the document through the kordoc service.

    Returns ``True`` when kordoc produced non-empty text (already stored on
    *doc*), ``False`` when the caller should fall back. A 404 from kordoc is
    raised as ``FileNotFoundError``; every other failure is logged and
    reported as ``False``.
    """
    container_path = f"/documents/{doc.file_path}"
    # 60 s base plus 60 s per full 10 MiB of file size, clamped to 60-300 s.
    kordoc_timeout = min(300, max(60, (doc.file_size or 0) // (10 * 1024 * 1024) * 60 + 60))
    try:
        async with httpx.AsyncClient(timeout=kordoc_timeout) as client:
            resp = await client.post(
                f"{settings.kordoc_endpoint}/parse",
                json={"filePath": container_path},
            )
            if resp.status_code == 404:
                raise FileNotFoundError(f"kordoc: 파일 없음 — {container_path}")
            if resp.status_code == 200:
                data = resp.json()
                text = data.get("markdown", "").replace("\x00", "")
                if text:
                    doc.extracted_text = text
                    doc.extracted_at = datetime.now(timezone.utc)
                    doc.extractor_version = EXTRACTOR_VERSION
                    logger.info(f"[kordoc] {doc.file_path} ({len(text)}자)")
                    return True
            else:
                # Unexpected statuses used to fall through without any trace.
                logger.warning(f"[kordoc] {doc.file_path} 실패 (HTTP {resp.status_code}), fallback 시도")
    except FileNotFoundError:
        raise
    except Exception as e:
        logger.warning(f"[kordoc] {doc.file_path} 실패 ({e.__class__.__name__}), fallback 시도")
    return False


async def _extract_pdf_fallback(doc: "Document", full_path: Path) -> None:
    """PyMuPDF fallback for PDFs, escalating to OCR when the text is too sparse."""
    try:
        pymupdf_text = _extract_pdf_pymupdf(full_path)
        page_count = _get_pdf_page_count(full_path)
    except Exception as e:
        logger.error(f"[pymupdf] {doc.file_path} 실패: {e}")
        pymupdf_text = ""
        page_count = 0

    # Copy before mutating so the JSONB change is detected on flush (see the
    # note in _extract_image_ocr); otherwise "ocr_attempted" may not persist
    # and the one-attempt limit would not hold.
    meta = dict(doc.extract_meta or {})
    meta["pymupdf_chars"] = len(pymupdf_text.strip())

    # What is stored whenever the PyMuPDF text is kept.
    fallback_text = pymupdf_text.replace("\x00", "")
    fallback_version = PYMUPDF_VERSION if pymupdf_text.strip() else None

    should, reason = _should_ocr(pymupdf_text, page_count)
    if not should:
        # PyMuPDF text is sufficient; no OCR needed.
        doc.extracted_text = fallback_text
        doc.extracted_at = datetime.now(timezone.utc)
        doc.extractor_version = PYMUPDF_VERSION
        doc.extract_meta = meta
        logger.info(f"[pymupdf] {doc.file_path} ({len(pymupdf_text)}자)")
        return

    # OCR is limited to a single attempt.
    if meta.get("ocr_attempted"):
        doc.extracted_text = fallback_text
        doc.extracted_at = datetime.now(timezone.utc)
        doc.extractor_version = fallback_version
        meta["ocr_skip_reason"] = "already_attempted"
        doc.extract_meta = meta
        logger.info(f"[pdf] {doc.file_path} — OCR 이미 시도됨, PyMuPDF 결과 유지")
        return

    # Upper-limit check.
    skip = _ocr_skip_reason(doc.file_size or 0, page_count)
    if skip:
        doc.extracted_text = fallback_text
        doc.extracted_at = datetime.now(timezone.utc)
        doc.extractor_version = fallback_version
        doc.extract_meta = {**meta, "ocr_skip_reason": skip, "ocr_terminal": not pymupdf_text.strip()}
        logger.warning(f"[pdf] {doc.file_path} — OCR 스킵 ({skip}), PyMuPDF 결과 유지")
        return

    meta["ocr_attempted"] = True
    meta["ocr_reason"] = reason
    logger.info(f"[pdf] {doc.file_path} — OCR 시도 (reason={reason}, pages={page_count})")
    # NOTE(review): when PyMuPDF failed above, page_count is 0 and maxPages=0
    # is sent to the OCR service — confirm how the service interprets that.
    ocr_text = await _call_ocr(full_path, is_image=False, max_pages=min(page_count, 200))

    if ocr_text and _ocr_quality_ok(ocr_text, page_count, is_image=False):
        doc.extracted_text = _postprocess_ocr(ocr_text)
        doc.extractor_version = "surya_ocr"
        meta["ocr_chars"] = len(doc.extracted_text)
        logger.info(f"[surya_ocr] {doc.file_path} ({len(doc.extracted_text)}자)")
    else:
        # OCR failed or was too poor: keep whatever PyMuPDF produced.
        doc.extracted_text = fallback_text
        doc.extractor_version = fallback_version
        meta["ocr_quality_ok"] = False
        if not pymupdf_text.strip():
            meta["ocr_terminal"] = True
        logger.warning(f"[pdf] {doc.file_path} — OCR 결과 품질 미달, PyMuPDF 결과 유지")

    doc.extract_meta = meta
    doc.extracted_at = datetime.now(timezone.utc)


def _convert_to_odf(doc: "Document", document_id: int, fmt: str, full_path: Path, tmp_dir: Path) -> None:
    """Best-effort conversion of MS Office files to ODF for editing.

    Never raises for conversion problems; the outcome is recorded in
    ``doc.conversion_status`` ("done", "failed" or "none").
    """
    import shutil

    convert_map = {
        'xlsx': 'ods', 'xls': 'ods',
        'docx': 'odt', 'doc': 'odt',
        'pptx': 'odp', 'ppt': 'odp',
    }
    target_fmt = convert_map.get(fmt)
    if not target_fmt:
        doc.conversion_status = "none"
        return

    tmp_input = tmp_dir / f"convert_{document_id}.{fmt}"
    conv_file = tmp_dir / f"convert_{document_id}.{target_fmt}"
    try:
        derived_dir = full_path.parent / ".derived"
        derived_dir.mkdir(exist_ok=True)
        shutil.copy2(str(full_path), str(tmp_input))
        conv_result = subprocess.run(
            ["libreoffice", "--headless", "--convert-to", target_fmt, "--outdir", str(tmp_dir), str(tmp_input)],
            capture_output=True, text=True, timeout=60,
        )
        if conv_file.exists():
            final_path = derived_dir / f"{document_id}.{target_fmt}"
            shutil.move(str(conv_file), str(final_path))
            nas_root = Path(settings.nas_mount_path)
            doc.derived_path = str(final_path.relative_to(nas_root))
            doc.original_format = doc.file_format
            doc.conversion_status = "done"
            logger.info(f"[ODF변환] {doc.file_path} → derived: {doc.derived_path}")
        else:
            doc.conversion_status = "failed"
            logger.warning(f"[ODF변환] 실패: {conv_result.stderr[:200]}")
    except Exception as e:
        doc.conversion_status = "failed"
        logger.error(f"[ODF변환] {doc.file_path} 에러: {e}")
    finally:
        # Clean up on every path (timeouts and errors used to leave the temp
        # copies behind). Cleanup itself must stay best-effort.
        for leftover in (tmp_input, conv_file):
            try:
                leftover.unlink(missing_ok=True)
            except OSError as e:
                logger.warning(f"[ODF변환] 임시 파일 삭제 실패: {e}")


def _extract_office(doc: "Document", document_id: int, fmt: str, full_path: Path) -> None:
    """Extract text from an office document via LibreOffice, then try the ODF conversion.

    NOTE(review): ``subprocess.run`` blocks for up to 60 s per call while this
    runs inside the async ``process`` coroutine — consider off-loading it if
    the worker shares its event loop with other tasks.
    """
    import shutil

    if not full_path.exists():
        raise FileNotFoundError(f"파일 없음: {full_path}")

    tmp_dir = Path("/tmp/extract_work")
    tmp_dir.mkdir(exist_ok=True)
    tmp_input = tmp_dir / f"input_{document_id}.{fmt}"
    shutil.copy2(str(full_path), str(tmp_input))

    calc_formats = {"xlsx", "xls", "ods", "osheet"}
    if fmt in calc_formats:
        convert_to = "csv:Text - txt - csv (StarCalc):44,34,76,1"
        out_ext = "csv"
    else:
        convert_to = "txt:Text"
        out_ext = "txt"
    out_file = tmp_dir / f"input_{document_id}.{out_ext}"

    try:
        result = subprocess.run(
            ["libreoffice", "--headless", "--convert-to", convert_to, "--outdir", str(tmp_dir), str(tmp_input)],
            capture_output=True, text=True, timeout=60,
        )
        if not out_file.exists():
            raise RuntimeError(f"LibreOffice 변환 실패: {result.stderr[:300]}")
        text = out_file.read_text(encoding="utf-8", errors="replace")
        # Design principle: extract stores the full text; classify/summarize
        # slice it with their own limits.
        doc.extracted_text = text.replace("\x00", "")
        doc.extracted_at = datetime.now(timezone.utc)
        doc.extractor_version = "libreoffice"
        logger.info(f"[LibreOffice] {doc.file_path} ({len(text)}자)")
    except subprocess.TimeoutExpired as e:
        raise RuntimeError("LibreOffice 텍스트 추출 timeout (60s)") from e
    finally:
        tmp_input.unlink(missing_ok=True)
        # Also drop the converted output on error paths so it cannot linger.
        out_file.unlink(missing_ok=True)

    _convert_to_odf(doc, document_id, fmt, full_path, tmp_dir)


async def process(document_id: int, session: AsyncSession) -> None:
    """Extract text for one document and store the result on the Document row.

    The extractor is chosen by ``doc.file_format`` (lower-cased):

    * text formats   -> read directly from disk
    * image formats  -> OCR (one attempt per document)
    * hwp/hwpx/pdf   -> kordoc; PDFs fall back to PyMuPDF and then OCR
    * office formats -> LibreOffice text export plus a best-effort ODF conversion
    * anything else  -> stored as empty text with ``unsupported_<fmt>``

    Only attributes of the loaded document are modified; this function does
    not commit the session itself.

    Raises:
        ValueError: Unknown document id, or kordoc (and, for PDFs, the
            PyMuPDF fallback) could not produce a result.
        FileNotFoundError: The source file is missing (locally or per kordoc).
        RuntimeError: LibreOffice text extraction failed or timed out.
    """
    doc = await session.get(Document, document_id)
    if not doc:
        raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")

    fmt = doc.file_format.lower()
    full_path = Path(settings.nas_mount_path) / doc.file_path

    if fmt in TEXT_FORMATS:
        _extract_text_file(doc, full_path)
        return

    if fmt in IMAGE_FORMATS:
        await _extract_image_ocr(doc, full_path)
        return

    if fmt in KORDOC_FORMATS:
        if await _extract_with_kordoc(doc):
            return
        if fmt == "pdf" and full_path.exists():
            await _extract_pdf_fallback(doc, full_path)
            return
        # HWP/HWPX are kordoc-only; there is no fallback for them.
        if fmt != "pdf":
            raise ValueError("kordoc 파싱 실패 (HWP/HWPX는 fallback 없음)")
        raise ValueError("PDF 텍스트 추출 실패 — kordoc + PyMuPDF 모두 실패")

    if fmt in OFFICE_FORMATS:
        _extract_office(doc, document_id, fmt, full_path)
        return

    # Unsupported format: record an empty result so the document is not retried blindly.
    doc.extracted_text = ""
    doc.extracted_at = datetime.now(timezone.utc)
    doc.extractor_version = f"unsupported_{fmt}"
    logger.warning(f"[미지원] {doc.file_path} (format={fmt})")