hyungi_document_server/app/workers/queue_consumer.py
Hyungi Ahn 1e2c004dd4 feat(media): §3 audio STT + video playback infrastructure
plan: ~/.claude/plans/luminous-sprouting-hamster.md §3

Schema:
- migrations/147_audio_segments_table.sql: audio_segments (STT timestamp
  segments)
- migrations/148_audio_segments_idx.sql: (document_id, start_s) idx
- migrations/149_document_media_cols.sql: documents.thumbnail_path +
  needs_conversion
- migrations/150_queue_stage_stt.sql: process_stage += 'stt'
- migrations/151_queue_stage_thumbnail.sql: process_stage += 'thumbnail'
- app/models/audio_segment.py, document.py (thumbnail_path/needs_conversion)

Services:
- services/stt/{Dockerfile, requirements.txt, server.py}: faster-whisper
  large-v3 GPU container. /transcribe (filePath/langs/beamSize) +
  /health + /ready (cuda device_count + model_loaded). NFC/NFD path
  resolver (lesson learned from OCR).
- docker-compose.yml: add stt-service (1 GPU reserved, :3300, NAS ro mount,
  stt_models volume, start_period 300s); add STT_ENDPOINT to the fastapi env.

Pipeline (depends on §1 category):
- app/workers/stt_worker.py (new): picks up stage='stt' → calls STT_ENDPOINT →
  stores extracted_text + audio_segments. Timeout 30 min.
- app/workers/thumbnail_worker.py (new): ffmpeg grabs one frame at the 50%
  mark → PKM/Videos/.thumbs/{id}.jpg + sets thumbnail_path.
  Skips documents with needs_conversion=true.
- app/workers/file_watcher.py (extended): scans PKM/{Inbox, Recordings,
  Videos}. Extension→category, audio→stage=stt, video .mp4/.webm→
  stage=thumbnail, video .mov/.mkv/.avi→needs_conversion=true with no
  stage. Skips paths under the settings.roon_library_path prefix.
- app/workers/queue_consumer.py (extended): registers the stt + thumbnail
  workers, BATCH_SIZE(stt=1, thumbnail=3), adds stt→[classify] to
  next_stages (audio skips extract).
- app/Dockerfile: add ffmpeg (for the thumbnail subprocess).
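
The file_watcher routing rules listed above can be sketched as a pure
function. This is a minimal illustration, not the actual file_watcher.py
code: route_media, Route, and the audio extension set are hypothetical
(the commit message names only mp3 for audio), while the two video
extension groups come from the list above.

```python
from pathlib import PurePosixPath
from typing import NamedTuple, Optional

# Assumption: the audio extensions are illustrative; only mp3 appears in the commit message.
AUDIO_EXTS = {".mp3", ".m4a", ".wav"}
# Taken from the commit message: directly playable vs. needs conversion.
DIRECT_PLAY_VIDEO_EXTS = {".mp4", ".webm"}
NEEDS_CONVERSION_VIDEO_EXTS = {".mov", ".mkv", ".avi"}


class Route(NamedTuple):
    category: str
    stage: Optional[str]      # first queue stage, or None when nothing is enqueued
    needs_conversion: bool


def route_media(path: str, roon_library_path: str = "") -> Optional[Route]:
    """Return the routing for a media path, or None if it is skipped or not media."""
    # Paths under the Roon library prefix are skipped entirely.
    if roon_library_path and path.startswith(roon_library_path):
        return None
    ext = PurePosixPath(path).suffix.lower()
    if ext in AUDIO_EXTS:
        return Route("audio", "stt", False)
    if ext in DIRECT_PLAY_VIDEO_EXTS:
        return Route("video", "thumbnail", False)
    if ext in NEEDS_CONVERSION_VIDEO_EXTS:
        # Stored but not queued: no stage until the file is converted.
        return Route("video", None, True)
    return None
```

Keeping the decision in one side-effect-free function makes the
.mov/.mkv/.avi quarantine case easy to unit test separately from the
directory scan.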

API (depends on §1):
- /api/audio/{id}/segments: AudioSegment ORDER BY start_s
- /api/video/{id}/thumbnail: thumbnail_path FileResponse (query token)
- /api/documents/{id}/file: media_types includes audio/video mime (already
  included in the §2 commit). Starlette FileResponse handles Range
  automatically.
- upload_document: rejects .mov/.mkv/.avi web uploads (error_code
  unsupported_codec). For NAS drops, file_watcher accepts them as quarantined.

Frontend:
- AudioPlayer.svelte: HTML5 audio + sticky transcript-segment panel + click
  a line to seek. activeIdx highlight.
- VideoPlayer.svelte: HTML5 video direct play + needs_conversion notice
  card. poster uses the thumbnail endpoint.
- /audio (list grid) + /audio/[id] (player)
- /video (thumbnail grid + "needs conversion" badge) + /video/[id] (player)
- Sidebar.svelte: Mic/Film icons + audio/video nav enabled, count
  badges (reuses §2 /stats/category-counts).

Settings:
- app/core/config.py: stt_endpoint + roon_library_path.

DoD post-deploy smoke: /ready cuda:true, transcribe a meeting mp3, audio
proceeds to classify without extract (queue regression), /audio playback,
.mp4 playback, .mov web upload returns 400, .mov NAS drop quarantined,
Sidebar nav + count.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 06:47:36 +09:00

229 lines
9.7 KiB
Python

"""Processing queue consumer: called by APScheduler at 1-minute intervals."""
from datetime import datetime, timedelta, timezone

from sqlalchemy import select, update, delete, exists
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import aliased

from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, enqueue_stage

logger = setup_logger("queue_consumer")

# Batch size per stage.
# stt occupies the single GPU exclusively and a recording can be a 30-minute meeting, so batch 1.
# thumbnail is a lightweight ffmpeg subprocess.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
              "preview": 2, "stt": 1, "thumbnail": 3}

STALE_THRESHOLD_MINUTES = 10


async def reset_stale_items():
    """Recover items that have been left in the processing state for too long.

    1) If a pending row already exists for the same (document_id, stage),
       the stale processing row is a duplicate, so delete it.
    2) Only stale processing rows with no pending row are reset to pending.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES)
    processing_row = aliased(ProcessingQueue)
    pending_row = aliased(ProcessingQueue)

    try:
        async with async_session() as session:
            # Step A: delete stale processing rows that already have a pending duplicate
            delete_stmt = (
                delete(ProcessingQueue)
                .where(
                    ProcessingQueue.id.in_(
                        select(processing_row.id).where(
                            processing_row.status == "processing",
                            processing_row.started_at.is_not(None),
                            processing_row.started_at < cutoff,
                            exists(
                                select(1).where(
                                    pending_row.document_id == processing_row.document_id,
                                    pending_row.stage == processing_row.stage,
                                    pending_row.status == "pending",
                                )
                            ),
                        )
                    )
                )
            )
            delete_result = await session.execute(delete_stmt)

            # Step B: reset to pending only the stale processing rows with no pending row
            recoverable_ids = select(processing_row.id).where(
                processing_row.status == "processing",
                processing_row.started_at.is_not(None),
                processing_row.started_at < cutoff,
                ~exists(
                    select(1).where(
                        pending_row.document_id == processing_row.document_id,
                        pending_row.stage == processing_row.stage,
                        pending_row.status == "pending",
                    )
                ),
            )
            update_stmt = (
                update(ProcessingQueue)
                .where(ProcessingQueue.id.in_(recoverable_ids))
                .values(status="pending", started_at=None)
            )
            update_result = await session.execute(update_stmt)
            await session.commit()

            deleted = delete_result.rowcount or 0
            recovered = update_result.rowcount or 0
            if deleted > 0:
                logger.warning(
                    "deleted %s stale processing rows that already had pending duplicates",
                    deleted,
                )
            if recovered > 0:
                logger.warning(
                    "recovered %s stale processing rows back to pending",
                    recovered,
                )
    except IntegrityError:
        logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle")
    except SQLAlchemyError:
        logger.exception("reset_stale_items failed with database error; skipping this cycle")
    except Exception:
        logger.exception("reset_stale_items failed unexpectedly; skipping this cycle")


async def enqueue_next_stage(document_id: int, current_stage: str):
    """Register the next stage(s) as pending once the current stage completes.

    Added in §3:
      stt → [classify] (audio skips extract; stt fills extracted_text)
      thumbnail → [] (video is a leaf: no classify/embed)
    """
    next_stages = {
        "extract": ["classify", "preview"],
        "classify": ["embed", "chunk"],
        "stt": ["classify"],
    }
    stages = next_stages.get(current_stage, [])
    if not stages:
        return

    async with async_session() as session:
        for next_stage in stages:
            await enqueue_stage(session, document_id, next_stage)
        await session.commit()


async def consume_queue():
    """Fetch pending items from the queue and run the worker for each stage."""
    from workers.classify_worker import process as classify_process
    from workers.chunk_worker import process as chunk_process
    from workers.embed_worker import process as embed_process
    from workers.extract_worker import process as extract_process
    from workers.preview_worker import process as preview_process
    from workers.stt_worker import process as stt_process
    from workers.summarize_worker import process as summarize_process
    from workers.thumbnail_worker import process as thumbnail_process

    workers = {
        "extract": extract_process,
        "classify": classify_process,
        "summarize": summarize_process,
        "embed": embed_process,
        "chunk": chunk_process,
        "preview": preview_process,
        "stt": stt_process,
        "thumbnail": thumbnail_process,
    }

    try:
        await reset_stale_items()
    except Exception:
        logger.exception("stale reset failed, but continuing queue consumption")

    for stage, worker_fn in workers.items():
        batch_size = BATCH_SIZE.get(stage, 3)

        # Fetch pending items for this stage, oldest first
        async with async_session() as session:
            result = await session.execute(
                select(ProcessingQueue.id, ProcessingQueue.document_id)
                .where(
                    ProcessingQueue.stage == stage,
                    ProcessingQueue.status == "pending",
                )
                .order_by(ProcessingQueue.created_at)
                .limit(batch_size)
            )
            pending_items = result.all()

        # Process each item in its own session
        for queue_id, document_id in pending_items:
            # Mark the item as processing
            async with async_session() as session:
                item = await session.get(ProcessingQueue, queue_id)
                if not item or item.status != "pending":
                    continue
                item.status = "processing"
                item.started_at = datetime.now(timezone.utc)
                item.attempts += 1
                await session.commit()

            # Run the worker (independent session)
            try:
                # Notes already have extracted_text, so skip extract/preview
                if stage in ("extract", "preview"):
                    from models.document import Document
                    async with async_session() as check_session:
                        doc = await check_session.get(Document, document_id)
                        if doc and doc.file_type == "note":
                            async with async_session() as skip_session:
                                item = await skip_session.get(ProcessingQueue, queue_id)
                                if item:
                                    item.status = "completed"
                                    item.completed_at = datetime.now(timezone.utc)
                                    await skip_session.commit()
                            await enqueue_next_stage(document_id, stage)
                            logger.info(f"[{stage}] document_id={document_id} skip (note)")
                            continue

                async with async_session() as worker_session:
                    await worker_fn(document_id, worker_session)
                    await worker_session.commit()

                # Mark the item as completed
                async with async_session() as session:
                    item = await session.get(ProcessingQueue, queue_id)
                    if not item:
                        logger.warning(f"[{stage}] queue_id={queue_id} not found (deleted?), skip")
                        continue
                    item.status = "completed"
                    item.completed_at = datetime.now(timezone.utc)
                    await session.commit()
                await enqueue_next_stage(document_id, stage)
                logger.info(f"[{stage}] document_id={document_id} done")

            except Exception as e:
                # Failure handling
                async with async_session() as session:
                    item = await session.get(ProcessingQueue, queue_id)
                    if not item:
                        logger.warning(f"[{stage}] queue_id={queue_id} not found (deleted?), skip")
                        continue
                    # Guard against empty messages: fall back str → repr → class name
                    err_text = str(e) or repr(e) or type(e).__name__
                    item.error_message = err_text[:500]
                    if item.attempts >= item.max_attempts:
                        item.status = "failed"
                        logger.error(f"[{stage}] document_id={document_id} permanently failed: {e}")
                    else:
                        item.status = "pending"
                        item.started_at = None
                        logger.warning(f"[{stage}] document_id={document_id} will retry ({item.attempts}/{item.max_attempts}): {e}")
                    await session.commit()
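
The stage transitions in enqueue_next_stage can be traced offline without
a database. The sketch below is not part of queue_consumer.py: trace_stages
is a hypothetical helper, and NEXT_STAGES is a copy of the next_stages
mapping from the file above. It shows that audio, which starts at stt,
reaches classify without going through extract, and that thumbnail is a
leaf.

```python
from collections import deque

# Copied from next_stages in enqueue_next_stage; stages not listed have no successors.
NEXT_STAGES = {
    "extract": ["classify", "preview"],
    "classify": ["embed", "chunk"],
    "stt": ["classify"],
}


def trace_stages(first_stage: str) -> list[str]:
    """Return every stage reachable from first_stage, in breadth-first order."""
    order: list[str] = []
    seen: set[str] = set()
    todo = deque([first_stage])
    while todo:
        stage = todo.popleft()
        if stage in seen:
            continue
        seen.add(stage)
        order.append(stage)
        todo.extend(NEXT_STAGES.get(stage, []))
    return order


if __name__ == "__main__":
    for start in ("extract", "stt", "thumbnail"):
        print(start, "->", trace_stages(start))
```

A check like this could serve as the "queue regression" item from the
smoke list: trace_stages("stt") should never contain "extract".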