hyungi_document_server/app/models/queue.py
Hyungi Ahn cec464ae2d fix(media): §3 ship-readiness — stt preload + healthcheck + queue enum + dashboard queue_lag
stt:
- services/stt/server.py: lazy → eager preload in FastAPI lifespan.
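  Setting STT_PRELOAD=0 forces lazy loading (dev/test). If preload fails,
  the process stays alive and /ready remains false, so the healthcheck
  marks the container unhealthy.
- docker-compose.yml: healthcheck /health → /ready. /health is a plain
  liveness check, so it reports healthy even when the model is not loaded,
  which makes it unsuitable as an operational signal.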
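
The preload behaviour described above can be sketched without any framework. This is a minimal illustration, not the code in services/stt/server.py: `ModelState`, `should_preload`, `preload`, and the `loader` argument are hypothetical names, and the assumption that an unset STT_PRELOAD means "preload" is mine. In FastAPI the equivalent async context manager would take the app as its argument and be passed as `FastAPI(lifespan=...)`.

```python
import asyncio
import os
from contextlib import asynccontextmanager


class ModelState:
    """Holds the loaded model and the flag a /ready handler would report."""

    def __init__(self) -> None:
        self.model = None
        self.ready = False
        self.error = None


def should_preload(env=os.environ) -> bool:
    # Assumption: STT_PRELOAD=0 forces lazy loading; unset or any other value preloads.
    return env.get("STT_PRELOAD", "1") != "0"


async def preload(state: ModelState, loader) -> None:
    """Try to load the model; on failure stay alive but not ready."""
    try:
        state.model = await loader()
        state.ready = True
    except Exception as exc:  # deliberate: a failed preload must not kill the process
        state.error = repr(exc)
        state.ready = False


@asynccontextmanager
async def lifespan(state: ModelState, loader, env=os.environ):
    # Startup phase: eager preload unless explicitly disabled.
    if should_preload(env):
        await preload(state, loader)
    yield state


async def demo():
    async def good_loader():
        return object()  # stand-in for a real model

    async def bad_loader():
        raise RuntimeError("weights missing")

    async with lifespan(ModelState(), good_loader, {"STT_PRELOAD": "1"}) as ok:
        pass
    async with lifespan(ModelState(), bad_loader, {}) as failed:
        pass
    async with lifespan(ModelState(), good_loader, {"STT_PRELOAD": "0"}) as lazy:
        pass
    return ok.ready, failed.ready, lazy.ready


print(asyncio.run(demo()))
```

A /ready handler built on this state would return success only when `state.ready` is true, while /health would answer as long as the process is running. That difference is why the compose healthcheck points at /ready.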

queue ORM:
- app/models/queue.py: add 'stt'/'thumbnail' to the process_stage enum +
  create_type=False (migrations 150/151 handle the DB enum extension).
  Without this, SQLAlchemy rejects the enum value when stt_worker INSERTs.

dashboard enhancements (§4 done ahead of schedule; automatically covers the new §3 stages):
- app/api/dashboard.py: category_counts + library_pending_suggestions +
  queue_lag (per-stage pending/processing/failed + oldest_pending_age_sec).
- frontend/src/lib/stores/system.ts: QueueLag type + DashboardSummary extension.
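
As a rough illustration of the queue_lag shape listed above, the following sketch aggregates queue rows in plain Python. It is not the query in app/api/dashboard.py: the `queue_lag` function, the `(stage, status, created_at)` row tuple, and the use of `created_at` to measure pending age are assumptions made for the example. Grouping by whatever stage values appear is one way new stages such as 'stt' and 'thumbnail' would be covered without further changes.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def queue_lag(rows, now):
    """Summarize (stage, status, created_at) rows into per-stage lag figures."""
    lag = defaultdict(
        lambda: {"pending": 0, "processing": 0, "failed": 0, "oldest_pending_age_sec": None}
    )
    for stage, status, created_at in rows:
        if status not in ("pending", "processing", "failed"):
            continue  # completed rows do not contribute to lag
        entry = lag[stage]
        entry[status] += 1
        if status == "pending":
            age = (now - created_at).total_seconds()
            oldest = entry["oldest_pending_age_sec"]
            if oldest is None or age > oldest:
                entry["oldest_pending_age_sec"] = age
    return dict(lag)


now = datetime(2026, 4, 24, 7, 0, tzinfo=timezone.utc)
rows = [
    ("stt", "pending", now - timedelta(seconds=90)),
    ("stt", "pending", now - timedelta(seconds=30)),
    ("stt", "failed", now - timedelta(seconds=300)),
    ("thumbnail", "processing", now - timedelta(seconds=5)),
    ("embed", "completed", now - timedelta(seconds=1)),
]
summary = queue_lag(rows, now)
print(summary)
```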

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 07:04:52 +09:00

63 lines
2.4 KiB
Python

"""ORM for the processing_queue table (asynchronous processing queue)."""
from datetime import datetime

from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column

from core.database import Base


class ProcessingQueue(Base):
    __tablename__ = "processing_queue"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
    stage: Mapped[str] = mapped_column(
        # 'stt' (audio): migration 150 / 'thumbnail' (video): enqueued by queue_consumer.
        # Migrations handle changes to the DB enum, hence create_type=False.
        Enum(
            "extract", "classify", "summarize", "embed", "chunk", "preview",
            "stt", "thumbnail",
            name="process_stage",
            create_type=False,
        ),
        nullable=False,
    )
    status: Mapped[str] = mapped_column(
        Enum("pending", "processing", "completed", "failed", name="process_status"),
        default="pending",
    )
    attempts: Mapped[int] = mapped_column(SmallInteger, default=0)
    max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3)
    error_message: Mapped[str | None] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=datetime.now
    )
    started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))

    # The DB constraint is enforced by the partial unique index uq_queue_active (migration 117).


async def enqueue_stage(
    session: AsyncSession, document_id: int, stage: str, *, status: str = "pending",
) -> bool:
    """Add a row to ProcessingQueue (duplicates are blocked at the DB level).

    If an active row (pending/processing) already exists for the same
    (document_id, stage), do nothing and return False.
    """
    stmt = (
        pg_insert(ProcessingQueue)
        .values(document_id=document_id, stage=stage, status=status)
        # The conflict target must match the partial unique index uq_queue_active.
        .on_conflict_do_nothing(
            index_elements=["document_id", "stage"],
            index_where=text("status IN ('pending', 'processing')"),
        )
    )
    result = await session.execute(stmt)
    # rowcount is 0 when the insert was skipped because of the conflict.
    return result.rowcount > 0
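
The deduplication in enqueue_stage relies on a partial unique index plus an insert that does nothing on conflict. The sketch below reproduces that behaviour with the standard-library sqlite3 module so it runs without a Postgres instance. It is an analogue, not the project's schema: the table is reduced to four columns, the index definition is an assumption modelled on the name uq_queue_active, and SQLite 3.24 or newer is assumed for the ON CONFLICT syntax.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE processing_queue (
        id INTEGER PRIMARY KEY,
        document_id INTEGER NOT NULL,
        stage TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending'
    );
    -- Uniqueness applies only to active rows; finished rows may repeat.
    CREATE UNIQUE INDEX uq_queue_active
        ON processing_queue (document_id, stage)
        WHERE status IN ('pending', 'processing');
    """
)


def enqueue_stage(conn, document_id, stage, status="pending"):
    """Insert a queue row; return False if an active row already exists."""
    cur = conn.execute(
        """
        INSERT INTO processing_queue (document_id, stage, status)
        VALUES (?, ?, ?)
        ON CONFLICT (document_id, stage)
            WHERE status IN ('pending', 'processing')
        DO NOTHING
        """,
        (document_id, stage, status),
    )
    return cur.rowcount > 0


first = enqueue_stage(conn, 1, "stt")        # new active row
dup = enqueue_stage(conn, 1, "stt")          # blocked by the partial index
other = enqueue_stage(conn, 1, "thumbnail")  # different stage, allowed

# Once the row leaves the active states, the same stage can be queued again.
conn.execute(
    "UPDATE processing_queue SET status = 'completed' "
    "WHERE document_id = 1 AND stage = 'stt'"
)
again = enqueue_stage(conn, 1, "stt")

print(first, dup, other, again)
```

Because the index covers only pending and processing rows, completed or failed rows do not block a later re-enqueue of the same (document_id, stage) pair.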