Files
hyungi_document_server/app/workers/extract_worker.py
Hyungi Ahn 24142ea605 fix: Codex 리뷰 5건 수정 (critical 1 + high 4)
1. [critical] config.yaml → settings 객체에서 taxonomy 로드 (import crash 방지)
2. [high] ODF 변환: file_path 유지, derived_path 별도 필드 (무한 중복 방지)
3. [high] 법령 분할: 첫 장 이전 조문을 "서문"으로 보존
4. [high] Inbox: review_status 필드 분리 (pending/approved/rejected)
5. [high] 삭제: soft-delete (deleted_at) + worker 방어 + active_documents 뷰
   - 모든 조회에 deleted_at IS NULL 일관 적용
   - queue_consumer: row 없으면 gracefully skip

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 07:15:13 +09:00

168 lines
6.7 KiB
Python

"""텍스트 추출 워커 — kordoc / LibreOffice / 직접 읽기"""
import subprocess
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.utils import setup_logger
from models.document import Document
logger = setup_logger("extract_worker")
# kordoc으로 파싱 가능한 포맷
KORDOC_FORMATS = {"hwp", "hwpx", "pdf"}
# 직접 읽기 가능한 텍스트 포맷
TEXT_FORMATS = {"md", "txt", "csv", "json", "xml", "html"}
# LibreOffice로 텍스트 추출 가능한 포맷
OFFICE_FORMATS = {"xlsx", "xls", "docx", "doc", "pptx", "ppt", "odt", "ods", "odp", "odoc", "osheet"}
# OCR 필요 이미지 포맷 (Phase 2)
IMAGE_FORMATS = {"jpg", "jpeg", "png", "tiff", "tif", "bmp", "gif"}
EXTRACTOR_VERSION = "kordoc@1.7"
async def process(document_id: int, session: AsyncSession) -> None:
"""문서 텍스트 추출"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
fmt = doc.file_format.lower()
full_path = Path(settings.nas_mount_path) / doc.file_path
# 텍스트 파일 — 직접 읽기
if fmt in TEXT_FORMATS:
if not full_path.exists():
raise FileNotFoundError(f"파일 없음: {full_path}")
text = full_path.read_text(encoding="utf-8", errors="replace")
doc.extracted_text = text
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = "direct_read"
logger.info(f"[텍스트] {doc.file_path} ({len(text)}자)")
return
# 이미지 — 스킵 (Phase 2 OCR)
if fmt in IMAGE_FORMATS:
doc.extracted_text = ""
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = "skip_image"
logger.info(f"[이미지] {doc.file_path} — OCR 미구현, 스킵")
return
# kordoc 파싱 (HWP/HWPX/PDF)
if fmt in KORDOC_FORMATS:
# 컨테이너 내부 경로: /documents/{file_path}
container_path = f"/documents/{doc.file_path}"
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(
f"{settings.kordoc_endpoint}/parse",
json={"filePath": container_path},
)
if resp.status_code == 404:
raise FileNotFoundError(f"kordoc: 파일 없음 — {container_path}")
if resp.status_code == 422:
raise ValueError(f"kordoc: 파싱 실패 — {resp.json().get('error', 'unknown')}")
resp.raise_for_status()
data = resp.json()
doc.extracted_text = data.get("markdown", "")
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = EXTRACTOR_VERSION
logger.info(f"[kordoc] {doc.file_path} ({len(doc.extracted_text)}자)")
return
# 오피스 포맷 — LibreOffice 텍스트 변환
if fmt in OFFICE_FORMATS:
if not full_path.exists():
raise FileNotFoundError(f"파일 없음: {full_path}")
import shutil
tmp_dir = Path("/tmp/extract_work")
tmp_dir.mkdir(exist_ok=True)
# 한글 파일명 문제 방지 — 영문 임시 파일로 복사
tmp_input = tmp_dir / f"input_{document_id}.{fmt}"
shutil.copy2(str(full_path), str(tmp_input))
# 스프레드시트는 csv, 나머지는 txt
CALC_FORMATS = {"xlsx", "xls", "ods", "osheet"}
if fmt in CALC_FORMATS:
convert_to = "csv:Text - txt - csv (StarCalc):44,34,76,1"
out_ext = "csv"
else:
convert_to = "txt:Text"
out_ext = "txt"
try:
result = subprocess.run(
["libreoffice", "--headless", "--convert-to", convert_to, "--outdir", str(tmp_dir), str(tmp_input)],
capture_output=True, text=True, timeout=60,
)
out_file = tmp_dir / f"input_{document_id}.{out_ext}"
if out_file.exists():
text = out_file.read_text(encoding="utf-8", errors="replace")
doc.extracted_text = text[:15000]
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = "libreoffice"
out_file.unlink()
logger.info(f"[LibreOffice] {doc.file_path} ({len(text)}자)")
else:
raise RuntimeError(f"LibreOffice 변환 실패: {result.stderr[:300]}")
except subprocess.TimeoutExpired:
raise RuntimeError(f"LibreOffice 텍스트 추출 timeout (60s)")
finally:
tmp_input.unlink(missing_ok=True)
# ─── ODF 변환 (편집용) ───
CONVERT_MAP = {
'xlsx': 'ods', 'xls': 'ods',
'docx': 'odt', 'doc': 'odt',
'pptx': 'odp', 'ppt': 'odp',
}
target_fmt = CONVERT_MAP.get(fmt)
if target_fmt:
try:
# .derived 디렉토리에 변환 (file_path는 원본 유지!)
derived_dir = full_path.parent / ".derived"
derived_dir.mkdir(exist_ok=True)
tmp_input2 = tmp_dir / f"convert_{document_id}.{fmt}"
shutil.copy2(str(full_path), str(tmp_input2))
conv_result = subprocess.run(
["libreoffice", "--headless", "--convert-to", target_fmt, "--outdir", str(tmp_dir), str(tmp_input2)],
capture_output=True, text=True, timeout=60,
)
tmp_input2.unlink(missing_ok=True)
conv_file = tmp_dir / f"convert_{document_id}.{target_fmt}"
if conv_file.exists():
final_path = derived_dir / f"{document_id}.{target_fmt}"
shutil.move(str(conv_file), str(final_path))
nas_root = Path(settings.nas_mount_path)
doc.derived_path = str(final_path.relative_to(nas_root))
doc.original_format = doc.file_format
doc.conversion_status = "done"
logger.info(f"[ODF변환] {doc.file_path} → derived: {doc.derived_path}")
else:
doc.conversion_status = "failed"
logger.warning(f"[ODF변환] 실패: {conv_result.stderr[:200]}")
except Exception as e:
doc.conversion_status = "failed"
logger.error(f"[ODF변환] {doc.file_path} 에러: {e}")
else:
doc.conversion_status = "none"
return
# 미지원 포맷
doc.extracted_text = ""
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = f"unsupported_{fmt}"
logger.warning(f"[미지원] {doc.file_path} (format={fmt})")