"""processing_queue 테이블 ORM (비동기 가공 큐)""" from datetime import datetime from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, func, or_, text from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.types import TIMESTAMP from core.database import Base class StageDeferred(Exception): """워커가 '지금은 처리 불가 — 자료 손상 없이 보류' 를 선언하는 신호 (ds-macbook-offload-1). 맥북(M5 Max) deep 슬롯 경로 전용: 503(upstream_cold/editor_busy/warming) · 연결 실패 · 생성 중 절단(read-timeout, 맥북 sleep) 시 raise. queue_consumer/queue_drain 이 attempts 를 소모하지 않고 pending 복귀 + payload.deferred_until 백오프를 기록한다. 결과 쓰기는 호출 완주 + 파싱 성공 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0 (sleep-안전 불변식). """ def __init__(self, reason: str, retry_after_minutes: int = 30): super().__init__(reason) self.retry_after_minutes = retry_after_minutes def not_deferred_condition(): """보류 백오프(payload.deferred_until, ISO 문자열) 가 미래인 행을 claim 에서 제외. payload 없음 / 키 없음 = 통과. queue_consumer 와 queue_drain 의 claim 이 공유한다. """ deferred = ProcessingQueue.payload["deferred_until"].astext return or_( deferred.is_(None), deferred.cast(TIMESTAMP(timezone=True)) <= func.now(), ) class ProcessingQueue(Base): __tablename__ = "processing_queue" id: Mapped[int] = mapped_column(BigInteger, primary_key=True) document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False) stage: Mapped[str] = mapped_column( # 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue. # 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue. # 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격. # DB enum 변경은 마이그레이션이 처리하므로 create_type=False. Enum( "extract", "classify", "summarize", "embed", "chunk", "preview", "stt", "thumbnail", "deep_summary", "markdown", "fulltext", name="process_stage", create_type=False, ), nullable=False, ) status: Mapped[str] = mapped_column( Enum("pending", "processing", "completed", "failed", name="process_status"), default="pending" ) attempts: Mapped[int] = mapped_column(SmallInteger, default=0) max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3) error_message: Mapped[str | None] = mapped_column(Text) # B-1: deep_summary stage 가 EscalationEnvelope 를 payload 로 싣는다. 다른 stage 는 NULL. payload: Mapped[dict | None] = mapped_column(JSONB) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now ) started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) # DB 제약은 partial unique index uq_queue_active로 관리 (migration 117) async def enqueue_stage( session: AsyncSession, document_id: int, stage: str, *, status: str = "pending", payload: dict | None = None, ) -> bool: """ProcessingQueue에 행 추가 (DB 레벨 중복 방어). 같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면 아무것도 하지 않고 False 반환. B-1: payload 옵션으로 deep_summary 에 EscalationEnvelope JSON 을 실을 수 있다. 같은 문서 deep_summary 가 재제안될 경우 on_conflict_do_nothing 으로 기존 payload 유지 (최초 envelope 가 원본). 이후 재처리 시 재분석은 새 classify 가 트리거. """ values: dict = {"document_id": document_id, "stage": stage, "status": status} if payload is not None: values["payload"] = payload stmt = ( pg_insert(ProcessingQueue) .values(**values) .on_conflict_do_nothing( index_elements=["document_id", "stage"], index_where=text("status IN ('pending', 'processing')"), ) ) result = await session.execute(stmt) return result.rowcount > 0