"""파일 감시 워커 — Inbox 디렉토리 스캔, 새 파일/변경 파일 자동 등록""" from pathlib import Path from sqlalchemy import select from core.config import settings from core.database import async_session from core.utils import file_hash, setup_logger from models.document import Document from models.queue import ProcessingQueue logger = setup_logger("file_watcher") # 무시할 파일 SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"} SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload"} def should_skip(path: Path) -> bool: if path.name in SKIP_NAMES or path.name.startswith("._"): return True if path.suffix.lower() in SKIP_EXTENSIONS: return True # .derived/ 및 .preview/ 디렉토리 내 파일 제외 if ".derived" in path.parts or ".preview" in path.parts: return True return False async def watch_inbox(): """Inbox 디렉토리를 스캔하여 새/변경 파일을 DB에 등록""" inbox_path = Path(settings.nas_mount_path) / "PKM" / "Inbox" if not inbox_path.exists(): return files = [f for f in inbox_path.rglob("*") if f.is_file() and not should_skip(f)] if not files: return new_count = 0 changed_count = 0 async with async_session() as session: for file_path in files: rel_path = str(file_path.relative_to(Path(settings.nas_mount_path))) fhash = file_hash(file_path) # DB에서 기존 문서 확인 result = await session.execute( select(Document).where(Document.file_path == rel_path) ) existing = result.scalar_one_or_none() if existing is None: # 새 파일 → 등록 ext = file_path.suffix.lstrip(".").lower() or "unknown" doc = Document( file_path=rel_path, file_hash=fhash, file_format=ext, file_size=file_path.stat().st_size, file_type="immutable", title=file_path.stem, source_channel="drive_sync", ) session.add(doc) await session.flush() session.add(ProcessingQueue( document_id=doc.id, stage="extract", status="pending", )) new_count += 1 elif existing.file_hash != fhash: # 해시 변경 → 재가공 existing.file_hash = fhash existing.file_size = file_path.stat().st_size # 기존 pending/processing 큐 항목이 없으면 extract부터 재시작 queue_check = await session.execute( select(ProcessingQueue).where( ProcessingQueue.document_id == existing.id, ProcessingQueue.status.in_(["pending", "processing"]), ) ) if not queue_check.scalar_one_or_none(): session.add(ProcessingQueue( document_id=existing.id, stage="extract", status="pending", )) changed_count += 1 await session.commit() if new_count or changed_count: logger.info(f"[Inbox] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")