"""Phase 4-A 풀이 prefetch enqueue 헬퍼 — finalize_session + 결과 GET fallback 공유. 대상 조건: - ai_explanation_status IN ('none', 'failed') OR ai_explanation IS NULL - skipped / stale 은 자동 enqueue 대상 X (각각 자료 추가 / 명시 [다시 생성] 트리거) - 같은 (qid, kind='explanation') 의 최신 study_question_jobs.error_code 가 guard_fail 또는 evidence_missing 이면 제외 (자동 재시도 금지 사유) Plan: ~/.claude/plans/nifty-sparking-spindle.md """ from __future__ import annotations import logging from typing import Iterable from sqlalchemy import or_, select from sqlalchemy.ext.asyncio import AsyncSession from models.study_question import StudyQuestion from models.study_question_job import StudyQuestionJob, enqueue_study_question_job logger = logging.getLogger(__name__) KIND_EXPLANATION = "explanation" NO_AUTO_RETRY_ERROR_CODES = ("guard_fail", "evidence_missing") async def filter_needs_explanation( session: AsyncSession, qids: Iterable[int] ) -> list[int]: """주어진 qid 중 자동 enqueue 대상만 추려서 반환. 1차 SQL: study_questions 자체 조건 (status / ai_explanation null) 2차 Python: 각 후보의 최신 study_question_jobs error_code 가 자동 재시도 금지 사유면 제외 """ qids = list(qids) if not qids: return [] rows = ( await session.execute( select(StudyQuestion.id).where( StudyQuestion.id.in_(qids), StudyQuestion.deleted_at.is_(None), or_( StudyQuestion.ai_explanation_status.in_(("none", "failed")), StudyQuestion.ai_explanation.is_(None), ), # skipped / stale 은 자동 enqueue 대상 X StudyQuestion.ai_explanation_status.notin_(("skipped", "stale", "ready", "pending")), ) ) ).scalars().all() candidate_ids = list(rows) if not candidate_ids: return [] # 각 후보의 최신 job (id DESC) 의 error_code — Python 에서 첫 row 만 채택 job_rows = ( await session.execute( select(StudyQuestionJob.study_question_id, StudyQuestionJob.error_code) .where( StudyQuestionJob.study_question_id.in_(candidate_ids), StudyQuestionJob.kind == KIND_EXPLANATION, ) .order_by( StudyQuestionJob.study_question_id.asc(), StudyQuestionJob.id.desc(), ) ) ).all() latest_error_by_qid: dict[int, str | None] = {} for r in job_rows: # 같은 qid 의 첫 등장 row = 최신 (id DESC 정렬) if r.study_question_id not in latest_error_by_qid: latest_error_by_qid[r.study_question_id] = r.error_code return [ qid for qid in candidate_ids if latest_error_by_qid.get(qid) not in NO_AUTO_RETRY_ERROR_CODES ] async def enqueue_explanation_for_qids( session: AsyncSession, *, user_id: int, qids: Iterable[int], max_count: int | None = None, ) -> dict[str, int]: """주어진 qid 묶음에 대해 enqueue 수행. caller 가 commit 책임. max_count: 한 호출에 enqueue 할 최대 행 수 (GET fallback 30 cap). Returns: {'enqueue_count': N, 'skipped_count': M, 'candidate_count': K} """ candidates = await filter_needs_explanation(session, qids) if max_count is not None: candidates = candidates[:max_count] enqueue_count = 0 skipped_count = 0 for qid in candidates: ok = await enqueue_study_question_job( session, study_question_id=qid, user_id=user_id, kind=KIND_EXPLANATION, ) if ok: enqueue_count += 1 else: # 이미 active 행 있어 on_conflict_do_nothing — 정상. skipped_count += 1 return { "enqueue_count": enqueue_count, "skipped_count": skipped_count, "candidate_count": len(candidates), }