"""study_question_jobs ORM (Phase 4-A) — study 도메인 전용 비동기 작업 큐. processing_queue 가 documents.id FK 라 study_questions 에 직접 재사용 불가. 별도 테이블 + 별도 consumer (study_queue_consumer.py). kind 권장값: - 'explanation' (Phase 4-A): wrong/unsure 문제의 AI 풀이 prefetch - 'session_summary' (Phase 4-B 예약): 세션 단위 종합 분석. session_summary 는 question 단위에 얹기 어색해 Phase 4-B 구현 시 study_quiz_session_jobs 별도 분리 검토. terminal status (completed/failed/skipped) 는 completed_at 항상 기록. failed 재시도는 기존 row 를 pending 으로 되살리지 않고 새 row 생성 — 이력 누적. """ from __future__ import annotations from datetime import datetime from typing import Any from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, String, Text, text from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column from core.database import Base class StudyQuestionJob(Base): __tablename__ = "study_question_jobs" id: Mapped[int] = mapped_column(BigInteger, primary_key=True) study_question_id: Mapped[int] = mapped_column( BigInteger, ForeignKey("study_questions.id", ondelete="CASCADE"), nullable=False ) user_id: Mapped[int] = mapped_column( BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False ) kind: Mapped[str] = mapped_column(String(40), nullable=False) status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending") attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0) max_attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=2) error_code: Mapped[str | None] = mapped_column(String(40)) error_message: Mapped[str | None] = mapped_column(Text) payload: Mapped[dict | None] = mapped_column(JSONB) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now, nullable=False ) started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) # active partial unique idx 는 migration 232 가 관리. async def enqueue_study_question_job( session: AsyncSession, *, study_question_id: int, user_id: int, kind: str, payload: dict[str, Any] | None = None, ) -> bool: """study_question_jobs 에 행 추가 (DB 레벨 중복 방어). 같은 (study_question_id, kind) 에 활성 행 (pending/processing) 이 이미 있으면 아무것도 하지 않고 False 반환. terminal 이력은 별도 row 로 누적되므로 이번 호출이 failed/skipped/completed row 와 무관하게 새 active 행을 만들 수 있다. Returns: True = 새 enqueue 발생, False = 중복으로 건너뜀. """ values: dict[str, Any] = { "study_question_id": study_question_id, "user_id": user_id, "kind": kind, "status": "pending", } if payload is not None: values["payload"] = payload stmt = ( pg_insert(StudyQuestionJob) .values(**values) .on_conflict_do_nothing( index_elements=["study_question_id", "kind"], index_where=text("status IN ('pending', 'processing')"), ) ) result = await session.execute(stmt) return result.rowcount > 0