Files
hyungi b73a5cc601 feat(infra): 2노드 이관 P1-4 — rerank 프로토콜 스위치(tei|llamacpp)·OCR/STT 명시 게이트·413 재홈
- AIModelConfig.protocol 판별자 신설(기본 tei = 무회귀), llamacpp = /v1/rerank
  요청·응답 스키마 정규화(ai/rerank_protocol.py 순수함수 + 단위테스트 4)
- OCR_ENABLED/STT_ENABLED 명시 게이트 — GPU CUDA 서비스(Surya/faster-whisper)
  폐기 대응, silent 아님(경고 로그 + extract_meta 터미널 기록)
- DS Caddyfile request_body 100MB — 413 정책을 edge(home-caddy)에서 내부로 재홈
  (DSM 리버스 프록시 전환 대비, upload.max_bytes 정합)
- SSE X-Accel-Buffering는 기점검 결과 기구현(eid_chat)이라 무변경

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 13:11:06 +09:00

98 lines
4.0 KiB
Python

"""STT 전사 워커 — services/stt(faster-whisper) 호출 + audio_segments 저장.
queue_consumer 가 stage='stt' pending 큐 행을 pickup 하여 본 process() 를 호출.
services/stt 는 /transcribe {filePath, langs?, beamSize?} → {text, segments, language,
language_probability, duration}. 성공 시:
- Document.extracted_text = text (기존 classify/embed 파이프 재사용)
- Document.extractor_version = "faster-whisper@large-v3" (모델명 기록)
- Document.extracted_at = now()
- audio_segments INSERT 일괄 (기존 세그먼트는 삭제 후 재삽입, 재전사 대응)
audio 파이프라인: file_watcher 가 category='audio' + stage='stt' 등록 →
stt → classify → embed/chunk (extract 건너뜀). queue_consumer 의 next_stages 에서
처리.
"""
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.utils import setup_logger
from models.audio_segment import AudioSegment
from models.document import Document
logger = setup_logger("stt_worker")
# /transcribe 는 장시간 (30분 녹음 ≈ 수분). 충분히 여유. connect 는 짧게.
STT_TIMEOUT = httpx.Timeout(connect=10.0, read=1800.0, write=60.0, pool=10.0)
async def process(document_id: int, session: AsyncSession) -> None:
"""audio 문서 전사 — STT_ENDPOINT 호출 후 텍스트/세그먼트 저장."""
doc = await session.get(Document, document_id)
if not doc:
logger.error(f"[stt] document_id={document_id} 없음")
return
if not doc.file_path:
logger.warning(f"[stt] id={document_id} file_path 없음 — skip")
return
if not settings.stt_enabled:
# 2노드 이관(2026-07-02): GPU stt-service 폐기 — 명시 비활성. silent 금지:
# 경고 로그 + extract_meta 터미널 기록 (재시도 안 함, 상태 가시).
doc.extract_meta = {**(doc.extract_meta or {}), "stt_skip_reason": "disabled", "stt_terminal": True}
await session.commit()
logger.warning(f"[stt] id={document_id} STT_ENABLED=false — 터미널 skip (전사 없음)")
return
# NAS 마운트 경로로 절대화 (services/stt 컨테이너도 동일 경로에 bind mount)
container_path = str(Path(settings.nas_mount_path) / doc.file_path)
try:
async with httpx.AsyncClient(timeout=STT_TIMEOUT) as client:
resp = await client.post(
f"{settings.stt_endpoint}/transcribe",
json={"filePath": container_path},
)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
logger.error(f"[stt] id={document_id} 호출 실패: {e}")
raise
if "error" in data and not data.get("text"):
logger.error(f"[stt] id={document_id} 서비스 에러: {data['error']}")
raise RuntimeError(f"stt error: {data['error']}")
text = (data.get("text") or "").strip()
segments = data.get("segments") or []
# 기존 audio_segments 삭제 (재전사 대응) — 새 세그먼트로 교체
await session.execute(delete(AudioSegment).where(AudioSegment.document_id == document_id))
for seg in segments:
session.add(AudioSegment(
document_id=document_id,
start_s=float(seg["start"]),
end_s=float(seg["end"]),
text=str(seg["text"]),
))
doc.extracted_text = text
doc.extracted_at = datetime.now(timezone.utc)
model_name = None
# /ready 응답의 "model" 을 신뢰할 수 있지만, 매 호출마다 조회하지 않고
# 환경에 안 맞으면 /transcribe 응답에서 추론: language / duration 만 쓰고 모델명은 설정 기반
# (services/stt 가 여러 모델 swap 가능해지면 응답에 포함시킬 것)
doc.extractor_version = f"faster-whisper@{data.get('language', 'auto')}"
logger.info(
f"[stt] id={document_id} segments={len(segments)} chars={len(text)} "
f"lang={data.get('language')} dur={data.get('duration')}s"
)