"""파일 감시 워커 — Inbox/Recordings/Videos 스캔, 새/변경 파일 자동 등록. §3 확장: - 스캔 대상: PKM/Inbox (문서) + PKM/Recordings (오디오) + PKM/Videos (비디오) - 확장자 → category 매핑 (audio/video) - video 채널 정책: 웹 업로드는 upload 엔드포인트에서 mov/mkv/avi 거부. NAS 드롭은 여기서 quarantine import (category='video', needs_conversion=true, stage 없음). - Roon 음원 경로(prefix match) skip — settings.roon_library_path - 파이프 분기: audio → stage='stt', video direct-play → stage='thumbnail', video quarantine → stage 없음 (처리 안 함, UI 에서 재생 불가 안내) """ from pathlib import Path from sqlalchemy import select from core.config import settings from core.database import async_session from core.utils import file_hash, setup_logger from models.document import Document from models.queue import enqueue_stage logger = setup_logger("file_watcher") # 무시할 파일 SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"} SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload", ".uploading"} # §3 확장자 매핑 AUDIO_EXTS = {".mp3", ".m4a", ".opus", ".wav", ".flac", ".ogg"} VIDEO_DIRECT_EXTS = {".mp4", ".webm"} # 브라우저 direct play VIDEO_QUARANTINE_EXTS = {".mov", ".mkv", ".avi"} # 변환 필요, 보관만 # library (외부 작성 학습 자료) 폴더 — md/pdf/docx 등 문서 확장자만 수락 LIBRARY_DOC_EXTS = {".md", ".pdf", ".docx", ".doc", ".txt", ".rtf", ".html", ".odt"} # 스캔 대상: (하위경로, 예상 category) — None 은 문서함(카테고리 미지정) SCAN_TARGETS: list[tuple[str, str | None]] = [ ("Inbox", None), ("Recordings", "audio"), ("Videos", "video"), ] def should_skip(path: Path) -> bool: if path.name in SKIP_NAMES or path.name.startswith("._"): return True if path.suffix.lower() in SKIP_EXTENSIONS: return True # .derived / .preview / .thumbs 는 파생물 디렉토리 if ".derived" in path.parts or ".preview" in path.parts or ".thumbs" in path.parts: return True # Roon 라이브러리 skip (설정된 경우만) roon = settings.roon_library_path if roon and str(path).startswith(roon): return True return False def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, bool, str | None]: """확장자 기반으로 (category, needs_conversion, next_stage) 결정. - Inbox 드롭: expected_category=None — 문서 확장자면 기존 'extract' 파이프, audio/video 확장자면 혼란 방지로 skip (사용자가 Recordings/Videos 로 넣도록 유도) - Recordings 드롭: audio 확장자만 수락. 그 외는 skip (log) - Videos 드롭: direct-play → category+thumbnail, quarantine → category만 (needs_conversion=true) """ ext = path.suffix.lower() if expected_category == "audio": if ext in AUDIO_EXTS: return ("audio", False, "stt") return (None, False, None) # audio 폴더에 엉뚱한 포맷 → skip if expected_category == "video": if ext in VIDEO_DIRECT_EXTS: return ("video", False, "thumbnail") if ext in VIDEO_QUARANTINE_EXTS: # quarantine — category 설정하되 stage 안 걸어둠 (재생 불가 안내만) return ("video", True, None) return (None, False, None) # 기타 → skip if expected_category == "library": # 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락. # frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만. if ext in LIBRARY_DOC_EXTS: return ("library", False, "extract") if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) # audio/video 잘못 들어오면 skip return (None, False, None) # 기타 알 수 없는 확장자 skip # Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip. if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS: return (None, False, None) return (None, False, "extract") async def watch_inbox(): """PKM 하위 디렉토리를 스캔하여 새/변경 파일을 DB 등록 + 파이프 투입.""" pkm_root = Path(settings.nas_mount_path) / "PKM" if not pkm_root.exists(): return new_count = 0 changed_count = 0 # 동적 스캔 대상 합성: 기본 (Inbox/Recordings/Videos) + env 로 확장된 library 경로 # settings.additional_watch_targets 는 PKM 상대 경로 리스트 (예: "Knowledge/Industrial_Safety/가스기사/KGS_Code") targets = list(SCAN_TARGETS) for extra_path in settings.additional_watch_targets: targets.append((extra_path, "library")) async with async_session() as session: for sub, expected_category in targets: scan_root = pkm_root / sub if not scan_root.exists(): continue for file_path in scan_root.rglob("*"): if not file_path.is_file() or should_skip(file_path): continue category, needs_conversion, next_stage = _route_media( file_path, expected_category ) # audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에 # audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip if category is None and next_stage is None: continue rel_path = str(file_path.relative_to(Path(settings.nas_mount_path))) fhash = file_hash(file_path) result = await session.execute( select(Document).where(Document.file_path == rel_path) ) existing = result.scalar_one_or_none() if existing is None: ext = file_path.suffix.lstrip(".").lower() or "unknown" doc = Document( file_path=rel_path, file_hash=fhash, file_format=ext, file_size=file_path.stat().st_size, file_type="immutable", title=file_path.stem, source_channel="drive_sync", category=category, needs_conversion=needs_conversion, ) session.add(doc) await session.flush() if next_stage: await enqueue_stage(session, doc.id, next_stage) new_count += 1 elif existing.file_hash != fhash: existing.file_hash = fhash existing.file_size = file_path.stat().st_size # 기존 문서에 category/quarantine flag 가 비어있으면 보정 if existing.category is None and category is not None: existing.category = category if needs_conversion and not getattr(existing, "needs_conversion", False): existing.needs_conversion = True if next_stage: await enqueue_stage(session, existing.id, next_stage) changed_count += 1 await session.commit() if new_count or changed_count: logger.info(f"[Inbox+§3] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")