cec464ae2d
stt: - services/stt/server.py: lazy → eager preload in FastAPI lifespan. STT_PRELOAD=0 으로 lazy 강제 가능 (개발/테스트). preload 실패해도 프로세스는 살아 있고 /ready false 로 남아 healthcheck 가 unhealthy 처리. - docker-compose.yml: healthcheck /health → /ready. /health 는 단순 liveness 라 모델 미적재 상태도 healthy 로 잡혀 운영 신호 부적합. queue ORM: - app/models/queue.py: process_stage enum 에 'stt'/'thumbnail' 추가 + create_type=False (migration 150/151 가 DB enum 확장 담당). 이게 없으면 stt_worker INSERT 시 SQLAlchemy 가 enum value 를 거부. dashboard 강화 (§4 선제, §3 신규 stage 까지 자동 커버): - app/api/dashboard.py: category_counts + library_pending_suggestions + queue_lag (stage 별 pending/processing/failed + oldest_pending_age_sec). - frontend/src/lib/stores/system.ts: QueueLag 타입 + DashboardSummary 확장. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
63 lines
2.4 KiB
Python
63 lines
2.4 KiB
Python
"""processing_queue 테이블 ORM (비동기 가공 큐)"""
|
|
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import Mapped, mapped_column
|
|
|
|
from core.database import Base
|
|
|
|
|
|
class ProcessingQueue(Base):
|
|
__tablename__ = "processing_queue"
|
|
|
|
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
|
document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
|
|
stage: Mapped[str] = mapped_column(
|
|
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
|
|
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
|
|
Enum(
|
|
"extract", "classify", "summarize", "embed", "chunk", "preview",
|
|
"stt", "thumbnail",
|
|
name="process_stage",
|
|
create_type=False,
|
|
),
|
|
nullable=False,
|
|
)
|
|
status: Mapped[str] = mapped_column(
|
|
Enum("pending", "processing", "completed", "failed", name="process_status"),
|
|
default="pending"
|
|
)
|
|
attempts: Mapped[int] = mapped_column(SmallInteger, default=0)
|
|
max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3)
|
|
error_message: Mapped[str | None] = mapped_column(Text)
|
|
created_at: Mapped[datetime] = mapped_column(
|
|
DateTime(timezone=True), default=datetime.now
|
|
)
|
|
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
|
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
|
|
|
# DB 제약은 partial unique index uq_queue_active로 관리 (migration 117)
|
|
|
|
|
|
async def enqueue_stage(
|
|
session: AsyncSession, document_id: int, stage: str, *, status: str = "pending",
|
|
) -> bool:
|
|
"""ProcessingQueue에 행 추가 (DB 레벨 중복 방어).
|
|
|
|
같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면
|
|
아무것도 하지 않고 False 반환.
|
|
"""
|
|
stmt = (
|
|
pg_insert(ProcessingQueue)
|
|
.values(document_id=document_id, stage=stage, status=status)
|
|
.on_conflict_do_nothing(
|
|
index_elements=["document_id", "stage"],
|
|
index_where=text("status IN ('pending', 'processing')"),
|
|
)
|
|
)
|
|
result = await session.execute(stmt)
|
|
return result.rowcount > 0
|