Files
hyungi_document_server/app/workers/file_watcher.py
T
Hyungi Ahn 8f25d396df feat(upload): §4-독립 — error_code 체계 + .uploading orphan cleanup + 진행률/abort UX
plan: ~/.claude/plans/luminous-sprouting-hamster.md §4 (1GB/stt/dashboard 외 독립 항목)

backend:
- _upload_error(status, code, msg) 헬퍼 정의 (§3 가 호출만 추가했던 누락 수정).
  detail = {error_code, message} — 프론트가 error_code 로 분기.
- upload_document 의 모든 HTTPException 을 _upload_error 로 전환:
  body_too_large / invalid_input / empty_file / unsupported_codec / internal
- ClientDisconnect → 499 network_abort + 임시파일 정리.
  asyncio.TimeoutError → 408 upload_timeout.
- 쓰기 중 .uploading 임시명 → 완료 후 staging.replace(target) atomic rename.
  → 프로세스 크래시 잔존물은 cleanup_orphan_uploads 가 수거.
- file_watcher SKIP_EXTENSIONS 에 .uploading 추가 (오해 픽업 방지).

cleanup scheduler:
- workers/upload_cleanup.py 신규. 10분 주기로 Inbox 하위 *.uploading 중
  mtime > orphan_max_age_sec(3600) 인 파일 삭제.
- 최근 3회 (≈30분) 누적 삭제 수가 cleanup_warn_threshold(10) 이상이면
  WARNING 로그. in-memory deque (재시작 시 리셋) — 집요한 이슈만 잡는 목적.
- core/config.py UploadConfig 에 두 임계치 필드 (defaults — config.yaml override 무관).

frontend:
- api.ts: ApiError 에 optional errorCode/errorMessage 필드 (detail string 유지로
  기존 5+ 소비자 호환). parseDetail() 가 {error_code, message} 객체 응답을 풀어
  정규화. uploadFile(path, formData, {signal, onProgress}) XHR 헬퍼 신규
  (fetch() 가 upload progress 미지원이라 XHR). 401 refresh 1회 정책 동일.
- UploadDropzone.svelte 재작성: 진행률 바, 파일별/전체 abort 버튼, 페이지 이탈
  beforeunload 경고, errorCode 별 토스트 메시지 분기 (7 코드 — body_too_large /
  upload_timeout / network_abort / empty_file / invalid_input / unsupported_codec /
  internal). 컴포넌트 unmount 시 진행 중 업로드 abort.

보류:
- max_bytes 1GB 상향 + Caddyfile 1100MB (별도 결정으로 100MB 유지)
- /dashboard 카테고리 카드 (별도 plan)
- docs/categories.md (§1-3 정의 안착 후)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 06:57:02 +09:00

159 lines
6.4 KiB
Python

"""파일 감시 워커 — Inbox/Recordings/Videos 스캔, 새/변경 파일 자동 등록.
§3 확장:
- 스캔 대상: PKM/Inbox (문서) + PKM/Recordings (오디오) + PKM/Videos (비디오)
- 확장자 → category 매핑 (audio/video)
- video 채널 정책: 웹 업로드는 upload 엔드포인트에서 mov/mkv/avi 거부.
NAS 드롭은 여기서 quarantine import (category='video', needs_conversion=true, stage 없음).
- Roon 음원 경로(prefix match) skip — settings.roon_library_path
- 파이프 분기: audio → stage='stt', video direct-play → stage='thumbnail',
video quarantine → stage 없음 (처리 안 함, UI 에서 재생 불가 안내)
"""
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash, setup_logger
from models.document import Document
from models.queue import enqueue_stage
logger = setup_logger("file_watcher")
# 무시할 파일
SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"}
SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload", ".uploading"}
# §3 확장자 매핑
AUDIO_EXTS = {".mp3", ".m4a", ".opus", ".wav", ".flac", ".ogg"}
VIDEO_DIRECT_EXTS = {".mp4", ".webm"} # 브라우저 direct play
VIDEO_QUARANTINE_EXTS = {".mov", ".mkv", ".avi"} # 변환 필요, 보관만
# 스캔 대상: (하위경로, 예상 category) — None 은 문서함(카테고리 미지정)
SCAN_TARGETS: list[tuple[str, str | None]] = [
("Inbox", None),
("Recordings", "audio"),
("Videos", "video"),
]
def should_skip(path: Path) -> bool:
if path.name in SKIP_NAMES or path.name.startswith("._"):
return True
if path.suffix.lower() in SKIP_EXTENSIONS:
return True
# .derived / .preview / .thumbs 는 파생물 디렉토리
if ".derived" in path.parts or ".preview" in path.parts or ".thumbs" in path.parts:
return True
# Roon 라이브러리 skip (설정된 경우만)
roon = settings.roon_library_path
if roon and str(path).startswith(roon):
return True
return False
def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, bool, str | None]:
"""확장자 기반으로 (category, needs_conversion, next_stage) 결정.
- Inbox 드롭: expected_category=None — 문서 확장자면 기존 'extract' 파이프,
audio/video 확장자면 혼란 방지로 skip (사용자가 Recordings/Videos 로 넣도록 유도)
- Recordings 드롭: audio 확장자만 수락. 그 외는 skip (log)
- Videos 드롭: direct-play → category+thumbnail, quarantine → category만 (needs_conversion=true)
"""
ext = path.suffix.lower()
if expected_category == "audio":
if ext in AUDIO_EXTS:
return ("audio", False, "stt")
return (None, False, None) # audio 폴더에 엉뚱한 포맷 → skip
if expected_category == "video":
if ext in VIDEO_DIRECT_EXTS:
return ("video", False, "thumbnail")
if ext in VIDEO_QUARANTINE_EXTS:
# quarantine — category 설정하되 stage 안 걸어둠 (재생 불가 안내만)
return ("video", True, None)
return (None, False, None) # 기타 → skip
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None)
return (None, False, "extract")
async def watch_inbox():
"""PKM 하위 디렉토리를 스캔하여 새/변경 파일을 DB 등록 + 파이프 투입."""
pkm_root = Path(settings.nas_mount_path) / "PKM"
if not pkm_root.exists():
return
new_count = 0
changed_count = 0
async with async_session() as session:
for sub, expected_category in SCAN_TARGETS:
scan_root = pkm_root / sub
if not scan_root.exists():
continue
for file_path in scan_root.rglob("*"):
if not file_path.is_file() or should_skip(file_path):
continue
category, needs_conversion, next_stage = _route_media(
file_path, expected_category
)
# audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에
# audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip
if category is None and next_stage is None:
continue
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
fhash = file_hash(file_path)
result = await session.execute(
select(Document).where(Document.file_path == rel_path)
)
existing = result.scalar_one_or_none()
if existing is None:
ext = file_path.suffix.lstrip(".").lower() or "unknown"
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format=ext,
file_size=file_path.stat().st_size,
file_type="immutable",
title=file_path.stem,
source_channel="drive_sync",
category=category,
needs_conversion=needs_conversion,
)
session.add(doc)
await session.flush()
if next_stage:
await enqueue_stage(session, doc.id, next_stage)
new_count += 1
elif existing.file_hash != fhash:
existing.file_hash = fhash
existing.file_size = file_path.stat().st_size
# 기존 문서에 category/quarantine flag 가 비어있으면 보정
if existing.category is None and category is not None:
existing.category = category
if needs_conversion and not getattr(existing, "needs_conversion", False):
existing.needs_conversion = True
if next_stage:
await enqueue_stage(session, existing.id, next_stage)
changed_count += 1
await session.commit()
if new_count or changed_count:
logger.info(f"[Inbox+§3] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")