Files
hyungi_document_server/app/workers/file_watcher.py
Hyungi Ahn 1b5fa95a9f feat: 오피스 → ODF 변환 + 원본/편집본 분리 아키텍처
- original_path/format/hash + conversion_status 필드 추가 (migration 007)
- extract_worker: 텍스트 추출 후 xlsx→ods, docx→odt 등 ODF 변환
  - 변환본은 .derived/{doc_id}.ods 에 저장
  - 원본 메타 보존 (original_path/format/hash)
- file_watcher: .derived/ .preview/ 디렉토리 제외
- DocumentViewer: ODF 포맷이면 편집 버튼 자동 표시
  - edit_url 있으면 "편집", 없으면 "Synology Drive에서 열기"

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 13:11:43 +09:00

101 lines
3.4 KiB
Python

"""파일 감시 워커 — Inbox 디렉토리 스캔, 새 파일/변경 파일 자동 등록"""
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash, setup_logger
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("file_watcher")
# 무시할 파일
SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"}
SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload"}
def should_skip(path: Path) -> bool:
if path.name in SKIP_NAMES or path.name.startswith("._"):
return True
if path.suffix.lower() in SKIP_EXTENSIONS:
return True
# .derived/ 및 .preview/ 디렉토리 내 파일 제외
if ".derived" in path.parts or ".preview" in path.parts:
return True
return False
async def watch_inbox():
"""Inbox 디렉토리를 스캔하여 새/변경 파일을 DB에 등록"""
inbox_path = Path(settings.nas_mount_path) / "PKM" / "Inbox"
if not inbox_path.exists():
return
files = [f for f in inbox_path.rglob("*") if f.is_file() and not should_skip(f)]
if not files:
return
new_count = 0
changed_count = 0
async with async_session() as session:
for file_path in files:
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
fhash = file_hash(file_path)
# DB에서 기존 문서 확인
result = await session.execute(
select(Document).where(Document.file_path == rel_path)
)
existing = result.scalar_one_or_none()
if existing is None:
# 새 파일 → 등록
ext = file_path.suffix.lstrip(".").lower() or "unknown"
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format=ext,
file_size=file_path.stat().st_size,
file_type="immutable",
title=file_path.stem,
source_channel="drive_sync",
)
session.add(doc)
await session.flush()
session.add(ProcessingQueue(
document_id=doc.id,
stage="extract",
status="pending",
))
new_count += 1
elif existing.file_hash != fhash:
# 해시 변경 → 재가공
existing.file_hash = fhash
existing.file_size = file_path.stat().st_size
# 기존 pending/processing 큐 항목이 없으면 extract부터 재시작
queue_check = await session.execute(
select(ProcessingQueue).where(
ProcessingQueue.document_id == existing.id,
ProcessingQueue.status.in_(["pending", "processing"]),
)
)
if not queue_check.scalar_one_or_none():
session.add(ProcessingQueue(
document_id=existing.id,
stage="extract",
status="pending",
))
changed_count += 1
await session.commit()
if new_count or changed_count:
logger.info(f"[Inbox] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")