hyungi_document_server/app/workers/stt_worker.py
Hyungi Ahn 1e2c004dd4 feat(media): §3 audio STT + video playback infrastructure
plan: ~/.claude/plans/luminous-sprouting-hamster.md §3

Schema:
- migrations/147_audio_segments_table.sql: audio_segments (timestamped STT
  segments)
- migrations/148_audio_segments_idx.sql: (document_id, start_s) idx
- migrations/149_document_media_cols.sql: documents.thumbnail_path +
  needs_conversion
- migrations/150_queue_stage_stt.sql: process_stage += 'stt'
- migrations/151_queue_stage_thumbnail.sql: process_stage += 'thumbnail'
- app/models/audio_segment.py, document.py (thumbnail_path/needs_conversion)

Services:
- services/stt/{Dockerfile, requirements.txt, server.py}: faster-whisper
  large-v3 GPU container. /transcribe (filePath/langs/beamSize) +
  /health + /ready (cuda device_count + model_loaded). NFC/NFD path
  resolver (lesson learned from OCR).
- docker-compose.yml: adds stt-service (1 GPU reserved, :3300, NAS ro mount,
  stt_models volume, start_period 300s) and STT_ENDPOINT in the fastapi env.
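
The NFC/NFD path resolver mentioned above could look roughly like the sketch below. It is not the actual services/stt/server.py code: the function name `resolve_path` is hypothetical, and the idea is only that a file name written in one Unicode normalization form (for example by macOS) may need to be looked up in the other form on the NAS.

```python
import unicodedata
from pathlib import Path
from typing import Optional


def resolve_path(raw: str) -> Optional[Path]:
    """Return the first existing variant of `raw`: as given, NFC, then NFD.

    Hypothetical helper; the real resolver in server.py may differ.
    """
    candidates = [
        raw,
        unicodedata.normalize("NFC", raw),
        unicodedata.normalize("NFD", raw),
    ]
    seen = set()
    for cand in candidates:
        if cand in seen:
            continue  # skip duplicates (pure-ASCII paths normalize to themselves)
        seen.add(cand)
        path = Path(cand)
        if path.exists():
            return path
    return None
```

Returning None rather than raising lets the caller decide whether a missing file is an HTTP 404 or a retryable condition.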

Pipeline (depends on §1 category):
- app/workers/stt_worker.py (new): picks up stage='stt' → calls STT_ENDPOINT →
  stores extracted_text + audio_segments. Timeout 30 min.
- app/workers/thumbnail_worker.py (new): ffmpeg grabs one frame at the 50% mark →
  PKM/Videos/.thumbs/{id}.jpg + sets thumbnail_path.
  Skipped when needs_conversion=true.
- app/workers/file_watcher.py (extended): scans PKM/{Inbox, Recordings, Videos}.
  Extension→category; audio→stage=stt; video .mp4/.webm→
  stage=thumbnail; video .mov/.mkv/.avi→needs_conversion=true with no
  stage. Paths under the settings.roon_library_path prefix are skipped.
- app/workers/queue_consumer.py (extended): registers the stt + thumbnail workers,
  BATCH_SIZE(stt=1, thumbnail=3), adds stt→[classify] to next_stages
  (audio skips extract).
- app/Dockerfile: adds ffmpeg (for the thumbnail subprocess).
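
The extension routing described for file_watcher.py can be sketched as a small lookup. This is illustrative only: `Route`, `route_for`, and the constant names are hypothetical, and the audio extension set is an assumption (only .mp3 is named in this commit message).

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Optional

AUDIO_EXTS = {".mp3", ".m4a", ".wav"}      # assumption: only .mp3 is named in the commit
VIDEO_DIRECT_EXTS = {".mp4", ".webm"}      # playable as-is, so a thumbnail is generated
VIDEO_CONVERT_EXTS = {".mov", ".mkv", ".avi"}  # flagged needs_conversion, no stage queued


@dataclass(frozen=True)
class Route:
    category: str
    stage: Optional[str]       # None means nothing is enqueued
    needs_conversion: bool


def route_for(path: str) -> Optional[Route]:
    """Map a file path to its category and first queue stage by extension."""
    ext = PurePosixPath(path).suffix.lower()
    if ext in AUDIO_EXTS:
        return Route("audio", "stt", False)
    if ext in VIDEO_DIRECT_EXTS:
        return Route("video", "thumbnail", False)
    if ext in VIDEO_CONVERT_EXTS:
        return Route("video", None, True)
    return None  # not a media file; other handling is outside this sketch
```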

API (depends on §1):
- /api/audio/{id}/segments: AudioSegment ORDER BY start_s
- /api/video/{id}/thumbnail: thumbnail_path FileResponse (query token)
- /api/documents/{id}/file: media_types includes audio/video MIME types
  (already in the §2 commit). Starlette FileResponse handles Range automatically.
- upload_document: rejects .mov/.mkv/.avi web uploads (error_code
  unsupported_codec). NAS drops are accepted by file_watcher into quarantine.

Frontend:
- AudioPlayer.svelte: HTML5 audio + sticky transcript-segment panel + click a
  line to seek. activeIdx highlight.
- VideoPlayer.svelte: HTML5 video direct play + needs_conversion notice
  card. The poster comes from the thumbnail endpoint.
- /audio (list grid) + /audio/[id] (player)
- /video (thumbnail grid + "needs conversion" badge) + /video/[id] (player)
- Sidebar.svelte: Mic/Film icons + audio/video nav enabled, count
  badges (reuses §2 /stats/category-counts).
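
The activeIdx highlight amounts to finding which segment contains the current playback time. The component itself is Svelte; the Python sketch below only illustrates one possible lookup, assuming segments arrive sorted by start_s (as the /segments endpoint orders them). The name `active_index` is hypothetical.

```python
from bisect import bisect_right
from typing import Sequence


def active_index(starts: Sequence[float], ends: Sequence[float], t: float) -> int:
    """Return the index of the segment containing time `t`, or -1 if none.

    `starts` must be sorted ascending; `ends[i]` is the end of segment i.
    """
    i = bisect_right(starts, t) - 1  # last segment whose start is <= t
    if i >= 0 and t < ends[i]:
        return i
    return -1  # before the first segment, or in a gap between segments
```

Binary search keeps the lookup cheap even when it runs on every timeupdate event of a long recording.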

Settings:
- app/core/config.py: stt_endpoint + roon_library_path.

DoD, post-deploy smoke: /ready cuda:true, transcribe a meeting mp3, audio
proceeds to classify without extract (queue regression), /audio playback, .mp4
playback, .mov web upload returns 400, .mov NAS drop quarantined, Sidebar nav + count.
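
The "audio proceeds to classify without extract" check can be reasoned about with a toy stage graph. Only the stt→[classify] edge comes from this commit; the other edges and the names `NEXT_STAGES` and `stages_from` are assumptions for illustration, not the real queue_consumer table.

```python
from typing import Dict, List

NEXT_STAGES: Dict[str, List[str]] = {
    "extract": ["classify"],  # assumption: existing document path
    "stt": ["classify"],      # from this commit: audio enters at stt
    "classify": ["embed"],    # assumption: simplified from embed/chunk
}


def stages_from(start: str) -> List[str]:
    """List every stage reachable from `start`, in visiting order."""
    visited: List[str] = []
    queue = [start]
    while queue:
        stage = queue.pop(0)
        if stage in visited:
            continue
        visited.append(stage)
        queue.extend(NEXT_STAGES.get(stage, []))
    return visited
```

Under these assumed edges, `stages_from("stt")` never visits extract, which is the property the smoke test is checking.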

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 06:47:36 +09:00

90 lines
3.6 KiB
Python

"""STT transcription worker: calls services/stt (faster-whisper) and stores audio_segments.

queue_consumer picks up pending queue rows with stage='stt' and calls process() below.
services/stt exposes /transcribe {filePath, langs?, beamSize?} → {text, segments, language,
language_probability, duration}. On success:
- Document.extracted_text = text (reuses the existing classify/embed pipeline)
- Document.extractor_version = "faster-whisper@large-v3" (records the model name)
- Document.extracted_at = now()
- audio_segments bulk INSERT (existing segments are deleted and re-inserted, to support
  re-transcription)

Audio pipeline: file_watcher registers category='audio' + stage='stt' →
stt → classify → embed/chunk (extract is skipped). Handled via next_stages in
queue_consumer.
"""
from datetime import datetime, timezone
from pathlib import Path

import httpx
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession

from core.config import settings
from core.utils import setup_logger
from models.audio_segment import AudioSegment
from models.document import Document

logger = setup_logger("stt_worker")

# /transcribe is long-running (a 30-minute recording takes roughly several minutes),
# so the read timeout is generous; the connect timeout stays short.
STT_TIMEOUT = httpx.Timeout(connect=10.0, read=1800.0, write=60.0, pool=10.0)


async def process(document_id: int, session: AsyncSession) -> None:
    """Transcribe an audio document: call STT_ENDPOINT, then store text and segments."""
    doc = await session.get(Document, document_id)
    if not doc:
        logger.error(f"[stt] document_id={document_id} not found")
        return
    if not doc.file_path:
        logger.warning(f"[stt] id={document_id} has no file_path, skipping")
        return

    # Make the path absolute under the NAS mount
    # (the services/stt container bind-mounts the same path).
    container_path = str(Path(settings.nas_mount_path) / doc.file_path)

    try:
        async with httpx.AsyncClient(timeout=STT_TIMEOUT) as client:
            resp = await client.post(
                f"{settings.stt_endpoint}/transcribe",
                json={"filePath": container_path},
            )
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPError as e:
        logger.error(f"[stt] id={document_id} request failed: {e}")
        raise

    # Treat the response as a failure only when it reports an error and has no text.
    if "error" in data and not data.get("text"):
        logger.error(f"[stt] id={document_id} service error: {data['error']}")
        raise RuntimeError(f"stt error: {data['error']}")

    text = (data.get("text") or "").strip()
    segments = data.get("segments") or []

    # Delete existing audio_segments (supports re-transcription), then insert the new ones.
    await session.execute(delete(AudioSegment).where(AudioSegment.document_id == document_id))
    for seg in segments:
        session.add(AudioSegment(
            document_id=document_id,
            start_s=float(seg["start"]),
            end_s=float(seg["end"]),
            text=str(seg["text"]),
        ))

    doc.extracted_text = text
    doc.extracted_at = datetime.now(timezone.utc)

    # Record the model name, as the module docstring specifies (the previous code
    # stored the detected language here instead). /transcribe does not return the
    # model today, so fall back to the deployed large-v3; the "model" key is an
    # assumption for when services/stt can swap models and reports it in the response.
    model_name = data.get("model") or "large-v3"
    doc.extractor_version = f"faster-whisper@{model_name}"

    logger.info(
        f"[stt] id={document_id} segments={len(segments)} chars={len(text)} "
        f"lang={data.get('language')} dur={data.get('duration')}s"
    )
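
The segment loop in process() trusts every entry in the /transcribe response to carry start, end, and text. A more defensive variant could normalize the payload first, as in the sketch below. `parse_segments` is a hypothetical helper, not part of this file, and dropping malformed rows silently is one design choice among several (logging or failing the job are others).

```python
from typing import Any, Dict, List, Tuple


def parse_segments(data: Dict[str, Any]) -> List[Tuple[float, float, str]]:
    """Extract (start_s, end_s, text) rows from a /transcribe-style payload.

    Malformed, empty, or inverted segments are dropped; rows are sorted by start.
    """
    rows: List[Tuple[float, float, str]] = []
    for seg in data.get("segments") or []:
        try:
            start, end = float(seg["start"]), float(seg["end"])
        except (KeyError, TypeError, ValueError):
            continue  # missing or non-numeric timestamps
        text = str(seg.get("text", "")).strip()
        if end < start or not text:
            continue  # inverted range or empty text
        rows.append((start, end, text))
    rows.sort(key=lambda row: row[0])
    return rows
```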