e8da53490c
PR-3 의 결과 화면 [AI 해설 보기] 실시간 호출이 클릭 시 8~30초 대기. 풀이 직후
백그라운드 batch 로 미리 생성해 캐시 hit. 환각 가드는 PR-3 보다 강화 — envelope
JSON {answer_choice, explanation_md, confidence} + answer_choice == correct_choice
검증 + evidence 의무.
processing_queue 가 documents.id FK 라 study_questions 에 직접 재사용 불가 →
별도 study_question_jobs 테이블 + 별도 consumer.
Backend:
- migrations/231 — study_question_jobs CREATE TABLE (13컬럼, kind 권장값
'explanation' / 'session_summary' 예약, status pending/processing/completed/
failed/skipped, max_attempts=2)
- migrations/232 — partial unique idx (qid, kind) WHERE status IN
(pending, processing) — active 행 중복 차단, terminal 이력 누적 허용
- models/study_question_job — ORM + enqueue_study_question_job() 헬퍼
(on_conflict_do_nothing 멱등)
- prompts/study_explanation_envelope.txt — envelope 형식 프롬프트
(answer_choice 1~4 강제, confidence high/medium/low)
- workers/study_explanation_worker — terminal status 분기:
· evidence 둘 다 빈 리스트 → job/question 모두 skipped (LLM 호출 X)
· answer_choice != correct_choice → guard_fail / failed (재시도 X)
· timeout/parse → 재시도 후보 (max_attempts=2)
· catch-all except → unknown 명시 + retryable 분기
· question.ai_explanation_status='ready' 이미 박혀있으면 즉시 completed
· confidence 는 job.payload 에 보존 (운영 분석)
- workers/study_queue_consumer — APScheduler 1분 주기, BATCH_SIZE=1, MLX gate
Semaphore(1) 공유. STALE_MINUTES=10 자체 복구
- main.py — scheduler.add_job(consume_study_queue, ..., id='study_queue_consumer')
- services/study/explanation_enqueue — finalize + GET fallback 공유 헬퍼:
filter_needs_explanation (study_questions status + 최신 job error_code 필터,
guard_fail/evidence_missing 인 마지막 job 은 자동 재enqueue 제외) +
enqueue_explanation_for_qids (max_count cap)
- session_finalize — 끝에서 wrong/unsure qid prefetch enqueue (best-effort,
실패해도 finalize 자체 안 깨짐)
- api/study_topics get_quiz_session — done 세션에서 backfill enqueue (max=30,
non-blocking, debug 로그)
대상 조건: ai_explanation_status IN ('none', 'failed') OR ai_explanation IS NULL.
stale / skipped / pending / ready 는 자동 enqueue 대상 X. stale 재생성은 PR-3
명시 [다시 생성] 또는 후속 Phase 에서.
Plan: ~/.claude/plans/nifty-sparking-spindle.md (Phase 4-A)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
88 lines
3.5 KiB
Python
88 lines
3.5 KiB
Python
"""study_question_jobs ORM (Phase 4-A) — study 도메인 전용 비동기 작업 큐.
|
|
|
|
processing_queue 가 documents.id FK 라 study_questions 에 직접 재사용 불가.
|
|
별도 테이블 + 별도 consumer (study_queue_consumer.py).
|
|
|
|
kind 권장값:
|
|
- 'explanation' (Phase 4-A): wrong/unsure 문제의 AI 풀이 prefetch
|
|
- 'session_summary' (Phase 4-B 예약): 세션 단위 종합 분석. session_summary 는 question
|
|
단위에 얹기 어색해 Phase 4-B 구현 시 study_quiz_session_jobs 별도 분리 검토.
|
|
|
|
terminal status (completed/failed/skipped) 는 completed_at 항상 기록.
|
|
failed 재시도는 기존 row 를 pending 으로 되살리지 않고 새 row 생성 — 이력 누적.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, String, Text, text
|
|
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import Mapped, mapped_column
|
|
|
|
from core.database import Base
|
|
|
|
|
|
class StudyQuestionJob(Base):
|
|
__tablename__ = "study_question_jobs"
|
|
|
|
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
|
study_question_id: Mapped[int] = mapped_column(
|
|
BigInteger, ForeignKey("study_questions.id", ondelete="CASCADE"), nullable=False
|
|
)
|
|
user_id: Mapped[int] = mapped_column(
|
|
BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
|
|
)
|
|
kind: Mapped[str] = mapped_column(String(40), nullable=False)
|
|
status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
|
|
attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
|
|
max_attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=2)
|
|
error_code: Mapped[str | None] = mapped_column(String(40))
|
|
error_message: Mapped[str | None] = mapped_column(Text)
|
|
payload: Mapped[dict | None] = mapped_column(JSONB)
|
|
created_at: Mapped[datetime] = mapped_column(
|
|
DateTime(timezone=True), default=datetime.now, nullable=False
|
|
)
|
|
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
|
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
|
|
|
# active partial unique idx 는 migration 232 가 관리.
|
|
|
|
|
|
async def enqueue_study_question_job(
|
|
session: AsyncSession,
|
|
*,
|
|
study_question_id: int,
|
|
user_id: int,
|
|
kind: str,
|
|
payload: dict[str, Any] | None = None,
|
|
) -> bool:
|
|
"""study_question_jobs 에 행 추가 (DB 레벨 중복 방어).
|
|
|
|
같은 (study_question_id, kind) 에 활성 행 (pending/processing) 이 이미 있으면
|
|
아무것도 하지 않고 False 반환. terminal 이력은 별도 row 로 누적되므로 이번 호출이
|
|
failed/skipped/completed row 와 무관하게 새 active 행을 만들 수 있다.
|
|
|
|
Returns: True = 새 enqueue 발생, False = 중복으로 건너뜀.
|
|
"""
|
|
values: dict[str, Any] = {
|
|
"study_question_id": study_question_id,
|
|
"user_id": user_id,
|
|
"kind": kind,
|
|
"status": "pending",
|
|
}
|
|
if payload is not None:
|
|
values["payload"] = payload
|
|
stmt = (
|
|
pg_insert(StudyQuestionJob)
|
|
.values(**values)
|
|
.on_conflict_do_nothing(
|
|
index_elements=["study_question_id", "kind"],
|
|
index_where=text("status IN ('pending', 'processing')"),
|
|
)
|
|
)
|
|
result = await session.execute(stmt)
|
|
return result.rowcount > 0
|