1. [critical] config.yaml → settings 객체에서 taxonomy 로드 (import crash 방지) 2. [high] ODF 변환: file_path 유지, derived_path 별도 필드 (무한 중복 방지) 3. [high] 법령 분할: 첫 장 이전 조문을 "서문"으로 보존 4. [high] Inbox: review_status 필드 분리 (pending/approved/rejected) 5. [high] 삭제: soft-delete (deleted_at) + worker 방어 + active_documents 뷰 - 모든 조회에 deleted_at IS NULL 일관 적용 - queue_consumer: row 없으면 gracefully skip Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
168 lines
6.7 KiB
Python
168 lines
6.7 KiB
Python
"""텍스트 추출 워커 — kordoc / LibreOffice / 직접 읽기"""
|
|
|
|
import subprocess
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.config import settings
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
|
|
logger = setup_logger("extract_worker")
|
|
|
|
# kordoc으로 파싱 가능한 포맷
|
|
KORDOC_FORMATS = {"hwp", "hwpx", "pdf"}
|
|
# 직접 읽기 가능한 텍스트 포맷
|
|
TEXT_FORMATS = {"md", "txt", "csv", "json", "xml", "html"}
|
|
# LibreOffice로 텍스트 추출 가능한 포맷
|
|
OFFICE_FORMATS = {"xlsx", "xls", "docx", "doc", "pptx", "ppt", "odt", "ods", "odp", "odoc", "osheet"}
|
|
# OCR 필요 이미지 포맷 (Phase 2)
|
|
IMAGE_FORMATS = {"jpg", "jpeg", "png", "tiff", "tif", "bmp", "gif"}
|
|
|
|
EXTRACTOR_VERSION = "kordoc@1.7"
|
|
|
|
|
|
async def process(document_id: int, session: AsyncSession) -> None:
|
|
"""문서 텍스트 추출"""
|
|
doc = await session.get(Document, document_id)
|
|
if not doc:
|
|
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
|
|
|
|
fmt = doc.file_format.lower()
|
|
full_path = Path(settings.nas_mount_path) / doc.file_path
|
|
|
|
# 텍스트 파일 — 직접 읽기
|
|
if fmt in TEXT_FORMATS:
|
|
if not full_path.exists():
|
|
raise FileNotFoundError(f"파일 없음: {full_path}")
|
|
text = full_path.read_text(encoding="utf-8", errors="replace")
|
|
doc.extracted_text = text
|
|
doc.extracted_at = datetime.now(timezone.utc)
|
|
doc.extractor_version = "direct_read"
|
|
logger.info(f"[텍스트] {doc.file_path} ({len(text)}자)")
|
|
return
|
|
|
|
# 이미지 — 스킵 (Phase 2 OCR)
|
|
if fmt in IMAGE_FORMATS:
|
|
doc.extracted_text = ""
|
|
doc.extracted_at = datetime.now(timezone.utc)
|
|
doc.extractor_version = "skip_image"
|
|
logger.info(f"[이미지] {doc.file_path} — OCR 미구현, 스킵")
|
|
return
|
|
|
|
# kordoc 파싱 (HWP/HWPX/PDF)
|
|
if fmt in KORDOC_FORMATS:
|
|
# 컨테이너 내부 경로: /documents/{file_path}
|
|
container_path = f"/documents/{doc.file_path}"
|
|
async with httpx.AsyncClient(timeout=60) as client:
|
|
resp = await client.post(
|
|
f"{settings.kordoc_endpoint}/parse",
|
|
json={"filePath": container_path},
|
|
)
|
|
|
|
if resp.status_code == 404:
|
|
raise FileNotFoundError(f"kordoc: 파일 없음 — {container_path}")
|
|
if resp.status_code == 422:
|
|
raise ValueError(f"kordoc: 파싱 실패 — {resp.json().get('error', 'unknown')}")
|
|
resp.raise_for_status()
|
|
|
|
data = resp.json()
|
|
doc.extracted_text = data.get("markdown", "")
|
|
doc.extracted_at = datetime.now(timezone.utc)
|
|
doc.extractor_version = EXTRACTOR_VERSION
|
|
logger.info(f"[kordoc] {doc.file_path} ({len(doc.extracted_text)}자)")
|
|
return
|
|
|
|
# 오피스 포맷 — LibreOffice 텍스트 변환
|
|
if fmt in OFFICE_FORMATS:
|
|
if not full_path.exists():
|
|
raise FileNotFoundError(f"파일 없음: {full_path}")
|
|
|
|
import shutil
|
|
tmp_dir = Path("/tmp/extract_work")
|
|
tmp_dir.mkdir(exist_ok=True)
|
|
|
|
# 한글 파일명 문제 방지 — 영문 임시 파일로 복사
|
|
tmp_input = tmp_dir / f"input_{document_id}.{fmt}"
|
|
shutil.copy2(str(full_path), str(tmp_input))
|
|
|
|
# 스프레드시트는 csv, 나머지는 txt
|
|
CALC_FORMATS = {"xlsx", "xls", "ods", "osheet"}
|
|
if fmt in CALC_FORMATS:
|
|
convert_to = "csv:Text - txt - csv (StarCalc):44,34,76,1"
|
|
out_ext = "csv"
|
|
else:
|
|
convert_to = "txt:Text"
|
|
out_ext = "txt"
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
["libreoffice", "--headless", "--convert-to", convert_to, "--outdir", str(tmp_dir), str(tmp_input)],
|
|
capture_output=True, text=True, timeout=60,
|
|
)
|
|
out_file = tmp_dir / f"input_{document_id}.{out_ext}"
|
|
if out_file.exists():
|
|
text = out_file.read_text(encoding="utf-8", errors="replace")
|
|
doc.extracted_text = text[:15000]
|
|
doc.extracted_at = datetime.now(timezone.utc)
|
|
doc.extractor_version = "libreoffice"
|
|
out_file.unlink()
|
|
logger.info(f"[LibreOffice] {doc.file_path} ({len(text)}자)")
|
|
else:
|
|
raise RuntimeError(f"LibreOffice 변환 실패: {result.stderr[:300]}")
|
|
except subprocess.TimeoutExpired:
|
|
raise RuntimeError(f"LibreOffice 텍스트 추출 timeout (60s)")
|
|
finally:
|
|
tmp_input.unlink(missing_ok=True)
|
|
|
|
# ─── ODF 변환 (편집용) ───
|
|
CONVERT_MAP = {
|
|
'xlsx': 'ods', 'xls': 'ods',
|
|
'docx': 'odt', 'doc': 'odt',
|
|
'pptx': 'odp', 'ppt': 'odp',
|
|
}
|
|
target_fmt = CONVERT_MAP.get(fmt)
|
|
if target_fmt:
|
|
try:
|
|
# .derived 디렉토리에 변환 (file_path는 원본 유지!)
|
|
derived_dir = full_path.parent / ".derived"
|
|
derived_dir.mkdir(exist_ok=True)
|
|
tmp_input2 = tmp_dir / f"convert_{document_id}.{fmt}"
|
|
shutil.copy2(str(full_path), str(tmp_input2))
|
|
|
|
conv_result = subprocess.run(
|
|
["libreoffice", "--headless", "--convert-to", target_fmt, "--outdir", str(tmp_dir), str(tmp_input2)],
|
|
capture_output=True, text=True, timeout=60,
|
|
)
|
|
tmp_input2.unlink(missing_ok=True)
|
|
|
|
conv_file = tmp_dir / f"convert_{document_id}.{target_fmt}"
|
|
if conv_file.exists():
|
|
final_path = derived_dir / f"{document_id}.{target_fmt}"
|
|
shutil.move(str(conv_file), str(final_path))
|
|
|
|
nas_root = Path(settings.nas_mount_path)
|
|
doc.derived_path = str(final_path.relative_to(nas_root))
|
|
doc.original_format = doc.file_format
|
|
doc.conversion_status = "done"
|
|
logger.info(f"[ODF변환] {doc.file_path} → derived: {doc.derived_path}")
|
|
else:
|
|
doc.conversion_status = "failed"
|
|
logger.warning(f"[ODF변환] 실패: {conv_result.stderr[:200]}")
|
|
except Exception as e:
|
|
doc.conversion_status = "failed"
|
|
logger.error(f"[ODF변환] {doc.file_path} 에러: {e}")
|
|
else:
|
|
doc.conversion_status = "none"
|
|
|
|
return
|
|
|
|
# 미지원 포맷
|
|
doc.extracted_text = ""
|
|
doc.extracted_at = datetime.now(timezone.utc)
|
|
doc.extractor_version = f"unsupported_{fmt}"
|
|
logger.warning(f"[미지원] {doc.file_path} (format={fmt})")
|