1e2c004dd4
plan: ~/.claude/plans/luminous-sprouting-hamster.md §3
스키마:
- migrations/147_audio_segments_table.sql: audio_segments (STT 타임스탬프
세그먼트)
- migrations/148_audio_segments_idx.sql: (document_id, start_s) idx
- migrations/149_document_media_cols.sql: documents.thumbnail_path +
needs_conversion
- migrations/150_queue_stage_stt.sql: process_stage += 'stt'
- migrations/151_queue_stage_thumbnail.sql: process_stage += 'thumbnail'
- app/models/audio_segment.py, document.py (thumbnail_path/needs_conversion)
서비스:
- services/stt/{Dockerfile, requirements.txt, server.py} — faster-whisper
large-v3 GPU 컨테이너. /transcribe (filePath/langs/beamSize) +
/health + /ready (cuda device_count + model_loaded). NFC/NFD 경로
resolver (OCR 교훈).
- docker-compose.yml: stt-service 추가 (GPU 1 예약, :3300, NAS ro mount,
stt_models volume, start_period 300s), fastapi env 에 STT_ENDPOINT.
파이프라인 (의존 §1 category):
- app/workers/stt_worker.py 신규: stage='stt' pickup → STT_ENDPOINT 호출 →
extracted_text + audio_segments 저장. Timeout 30분.
- app/workers/thumbnail_worker.py 신규: ffmpeg 50% 지점 1장 →
PKM/Videos/.thumbs/{id}.jpg + thumbnail_path 세팅.
needs_conversion=true 는 skip.
- app/workers/file_watcher.py 확장: PKM/{Inbox, Recordings, Videos}
스캔. 확장자→category, audio→stage=stt, video .mp4/.webm→
stage=thumbnail, video .mov/.mkv/.avi→needs_conversion=true + stage
없음. settings.roon_library_path prefix skip.
- app/workers/queue_consumer.py 확장: stt + thumbnail workers 등록,
BATCH_SIZE(stt=1, thumbnail=3), next_stages 에 stt→[classify] 추가
(audio 는 extract 건너뜀).
- app/Dockerfile: ffmpeg 추가 (썸네일 subprocess 용).
API (의존 §1):
- /api/audio/{id}/segments — AudioSegment ORDER BY start_s
- /api/video/{id}/thumbnail — thumbnail_path FileResponse (쿼리 토큰)
- /api/documents/{id}/file: media_types 에 audio/video mime 포함 (§2
커밋에 이미 포함). Starlette FileResponse 가 Range 자동.
- upload_document: .mov/.mkv/.avi 웹 업로드 거부 (error_code
unsupported_codec). NAS 드롭은 file_watcher 가 quarantine 수용.
프론트:
- AudioPlayer.svelte: HTML5 audio + 전사 세그먼트 sticky 패널 + 줄
클릭 seek. activeIdx 하이라이트.
- VideoPlayer.svelte: HTML5 video direct play + needs_conversion 안내
카드. poster 는 thumbnail endpoint.
- /audio (목록 grid) + /audio/[id] (플레이어)
- /video (썸네일 grid + 변환 필요 배지) + /video/[id] (플레이어)
- Sidebar.svelte: Mic/Film 아이콘 + audio/video 네비 활성, count
배지 (§2 /stats/category-counts 재사용).
설정:
- app/core/config.py: stt_endpoint + roon_library_path.
DoD 배포 후 smoke: /ready cuda:true, 회의 mp3 transcribe, audio
extract 없이 classify 진행(queue 회귀), /audio 재생, .mp4 재생,
.mov 웹 400, .mov NAS quarantine, Sidebar 네비 + count.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
159 lines
6.4 KiB
Python
159 lines
6.4 KiB
Python
"""파일 감시 워커 — Inbox/Recordings/Videos 스캔, 새/변경 파일 자동 등록.
|
|
|
|
§3 확장:
|
|
- 스캔 대상: PKM/Inbox (문서) + PKM/Recordings (오디오) + PKM/Videos (비디오)
|
|
- 확장자 → category 매핑 (audio/video)
|
|
- video 채널 정책: 웹 업로드는 upload 엔드포인트에서 mov/mkv/avi 거부.
|
|
NAS 드롭은 여기서 quarantine import (category='video', needs_conversion=true, stage 없음).
|
|
- Roon 음원 경로(prefix match) skip — settings.roon_library_path
|
|
- 파이프 분기: audio → stage='stt', video direct-play → stage='thumbnail',
|
|
video quarantine → stage 없음 (처리 안 함, UI 에서 재생 불가 안내)
|
|
"""
|
|
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import select
|
|
|
|
from core.config import settings
|
|
from core.database import async_session
|
|
from core.utils import file_hash, setup_logger
|
|
from models.document import Document
|
|
from models.queue import enqueue_stage
|
|
|
|
logger = setup_logger("file_watcher")
|
|
|
|
# 무시할 파일
|
|
SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"}
|
|
SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload"}
|
|
|
|
# §3 확장자 매핑
|
|
AUDIO_EXTS = {".mp3", ".m4a", ".opus", ".wav", ".flac", ".ogg"}
|
|
VIDEO_DIRECT_EXTS = {".mp4", ".webm"} # 브라우저 direct play
|
|
VIDEO_QUARANTINE_EXTS = {".mov", ".mkv", ".avi"} # 변환 필요, 보관만
|
|
|
|
# 스캔 대상: (하위경로, 예상 category) — None 은 문서함(카테고리 미지정)
|
|
SCAN_TARGETS: list[tuple[str, str | None]] = [
|
|
("Inbox", None),
|
|
("Recordings", "audio"),
|
|
("Videos", "video"),
|
|
]
|
|
|
|
|
|
def should_skip(path: Path) -> bool:
|
|
if path.name in SKIP_NAMES or path.name.startswith("._"):
|
|
return True
|
|
if path.suffix.lower() in SKIP_EXTENSIONS:
|
|
return True
|
|
# .derived / .preview / .thumbs 는 파생물 디렉토리
|
|
if ".derived" in path.parts or ".preview" in path.parts or ".thumbs" in path.parts:
|
|
return True
|
|
# Roon 라이브러리 skip (설정된 경우만)
|
|
roon = settings.roon_library_path
|
|
if roon and str(path).startswith(roon):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, bool, str | None]:
|
|
"""확장자 기반으로 (category, needs_conversion, next_stage) 결정.
|
|
|
|
- Inbox 드롭: expected_category=None — 문서 확장자면 기존 'extract' 파이프,
|
|
audio/video 확장자면 혼란 방지로 skip (사용자가 Recordings/Videos 로 넣도록 유도)
|
|
- Recordings 드롭: audio 확장자만 수락. 그 외는 skip (log)
|
|
- Videos 드롭: direct-play → category+thumbnail, quarantine → category만 (needs_conversion=true)
|
|
"""
|
|
ext = path.suffix.lower()
|
|
|
|
if expected_category == "audio":
|
|
if ext in AUDIO_EXTS:
|
|
return ("audio", False, "stt")
|
|
return (None, False, None) # audio 폴더에 엉뚱한 포맷 → skip
|
|
|
|
if expected_category == "video":
|
|
if ext in VIDEO_DIRECT_EXTS:
|
|
return ("video", False, "thumbnail")
|
|
if ext in VIDEO_QUARANTINE_EXTS:
|
|
# quarantine — category 설정하되 stage 안 걸어둠 (재생 불가 안내만)
|
|
return ("video", True, None)
|
|
return (None, False, None) # 기타 → skip
|
|
|
|
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
|
|
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
|
|
return (None, False, None)
|
|
return (None, False, "extract")
|
|
|
|
|
|
async def watch_inbox():
|
|
"""PKM 하위 디렉토리를 스캔하여 새/변경 파일을 DB 등록 + 파이프 투입."""
|
|
pkm_root = Path(settings.nas_mount_path) / "PKM"
|
|
if not pkm_root.exists():
|
|
return
|
|
|
|
new_count = 0
|
|
changed_count = 0
|
|
|
|
async with async_session() as session:
|
|
for sub, expected_category in SCAN_TARGETS:
|
|
scan_root = pkm_root / sub
|
|
if not scan_root.exists():
|
|
continue
|
|
|
|
for file_path in scan_root.rglob("*"):
|
|
if not file_path.is_file() or should_skip(file_path):
|
|
continue
|
|
|
|
category, needs_conversion, next_stage = _route_media(
|
|
file_path, expected_category
|
|
)
|
|
|
|
# audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에
|
|
# audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip
|
|
if category is None and next_stage is None:
|
|
continue
|
|
|
|
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
|
|
fhash = file_hash(file_path)
|
|
|
|
result = await session.execute(
|
|
select(Document).where(Document.file_path == rel_path)
|
|
)
|
|
existing = result.scalar_one_or_none()
|
|
|
|
if existing is None:
|
|
ext = file_path.suffix.lstrip(".").lower() or "unknown"
|
|
doc = Document(
|
|
file_path=rel_path,
|
|
file_hash=fhash,
|
|
file_format=ext,
|
|
file_size=file_path.stat().st_size,
|
|
file_type="immutable",
|
|
title=file_path.stem,
|
|
source_channel="drive_sync",
|
|
category=category,
|
|
needs_conversion=needs_conversion,
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
if next_stage:
|
|
await enqueue_stage(session, doc.id, next_stage)
|
|
new_count += 1
|
|
|
|
elif existing.file_hash != fhash:
|
|
existing.file_hash = fhash
|
|
existing.file_size = file_path.stat().st_size
|
|
# 기존 문서에 category/quarantine flag 가 비어있으면 보정
|
|
if existing.category is None and category is not None:
|
|
existing.category = category
|
|
if needs_conversion and not getattr(existing, "needs_conversion", False):
|
|
existing.needs_conversion = True
|
|
|
|
if next_stage:
|
|
await enqueue_stage(session, existing.id, next_stage)
|
|
changed_count += 1
|
|
|
|
await session.commit()
|
|
|
|
if new_count or changed_count:
|
|
logger.info(f"[Inbox+§3] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")
|