Files
hyungi_document_server/app/services/study/explanation_enqueue.py
Hyungi Ahn e8da53490c feat(study): Phase 4-A wrong/unsure AI 풀이 prefetch batch
PR-3 의 결과 화면 [AI 해설 보기] 실시간 호출이 클릭 시 8~30초 대기. 풀이 직후
백그라운드 batch 로 미리 생성해 캐시 hit. 환각 가드는 PR-3 보다 강화 — envelope
JSON {answer_choice, explanation_md, confidence} + answer_choice == correct_choice
검증 + evidence 의무.

processing_queue 가 documents.id FK 라 study_questions 에 직접 재사용 불가 →
별도 study_question_jobs 테이블 + 별도 consumer.

Backend:
- migrations/231 — study_question_jobs CREATE TABLE (13컬럼, kind 권장값
  'explanation' / 'session_summary' 예약, status pending/processing/completed/
  failed/skipped, max_attempts=2)
- migrations/232 — partial unique idx (qid, kind) WHERE status IN
  (pending, processing) — active 행 중복 차단, terminal 이력 누적 허용
- models/study_question_job — ORM + enqueue_study_question_job() 헬퍼
  (on_conflict_do_nothing 멱등)
- prompts/study_explanation_envelope.txt — envelope 형식 프롬프트
  (answer_choice 1~4 강제, confidence high/medium/low)
- workers/study_explanation_worker — terminal status 분기:
  · evidence 둘 다 빈 리스트 → job/question 모두 skipped (LLM 호출 X)
  · answer_choice != correct_choice → guard_fail / failed (재시도 X)
  · timeout/parse → 재시도 후보 (max_attempts=2)
  · catch-all except → unknown 명시 + retryable 분기
  · question.ai_explanation_status='ready' 이미 박혀있으면 즉시 completed
  · confidence 는 job.payload 에 보존 (운영 분석)
- workers/study_queue_consumer — APScheduler 1분 주기, BATCH_SIZE=1, MLX gate
  Semaphore(1) 공유. STALE_MINUTES=10 자체 복구
- main.py — scheduler.add_job(consume_study_queue, ..., id='study_queue_consumer')
- services/study/explanation_enqueue — finalize + GET fallback 공유 헬퍼:
  filter_needs_explanation (study_questions status + 최신 job error_code 필터,
  guard_fail/evidence_missing 인 마지막 job 은 자동 재enqueue 제외) +
  enqueue_explanation_for_qids (max_count cap)
- session_finalize — 끝에서 wrong/unsure qid prefetch enqueue (best-effort,
  실패해도 finalize 자체 안 깨짐)
- api/study_topics get_quiz_session — done 세션에서 backfill enqueue (max=30,
  non-blocking, debug 로그)

대상 조건: ai_explanation_status IN ('none', 'failed') OR ai_explanation IS NULL.
stale / skipped / pending / ready 는 자동 enqueue 대상 X. stale 재생성은 PR-3
명시 [다시 생성] 또는 후속 Phase 에서.

Plan: ~/.claude/plans/nifty-sparking-spindle.md (Phase 4-A)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 11:42:08 +09:00

120 lines
4.0 KiB
Python

"""Phase 4-A 풀이 prefetch enqueue 헬퍼 — finalize_session + 결과 GET fallback 공유.
대상 조건:
- ai_explanation_status IN ('none', 'failed') OR ai_explanation IS NULL
- skipped / stale 은 자동 enqueue 대상 X (각각 자료 추가 / 명시 [다시 생성] 트리거)
- 같은 (qid, kind='explanation') 의 최신 study_question_jobs.error_code 가
guard_fail 또는 evidence_missing 이면 제외 (자동 재시도 금지 사유)
Plan: ~/.claude/plans/nifty-sparking-spindle.md
"""
from __future__ import annotations
import logging
from typing import Iterable
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from models.study_question import StudyQuestion
from models.study_question_job import StudyQuestionJob, enqueue_study_question_job
logger = logging.getLogger(__name__)
KIND_EXPLANATION = "explanation"
NO_AUTO_RETRY_ERROR_CODES = ("guard_fail", "evidence_missing")
async def filter_needs_explanation(
session: AsyncSession, qids: Iterable[int]
) -> list[int]:
"""주어진 qid 중 자동 enqueue 대상만 추려서 반환.
1차 SQL: study_questions 자체 조건 (status / ai_explanation null)
2차 Python: 각 후보의 최신 study_question_jobs error_code 가 자동 재시도 금지 사유면 제외
"""
qids = list(qids)
if not qids:
return []
rows = (
await session.execute(
select(StudyQuestion.id).where(
StudyQuestion.id.in_(qids),
StudyQuestion.deleted_at.is_(None),
or_(
StudyQuestion.ai_explanation_status.in_(("none", "failed")),
StudyQuestion.ai_explanation.is_(None),
),
# skipped / stale 은 자동 enqueue 대상 X
StudyQuestion.ai_explanation_status.notin_(("skipped", "stale", "ready", "pending")),
)
)
).scalars().all()
candidate_ids = list(rows)
if not candidate_ids:
return []
# 각 후보의 최신 job (id DESC) 의 error_code — Python 에서 첫 row 만 채택
job_rows = (
await session.execute(
select(StudyQuestionJob.study_question_id, StudyQuestionJob.error_code)
.where(
StudyQuestionJob.study_question_id.in_(candidate_ids),
StudyQuestionJob.kind == KIND_EXPLANATION,
)
.order_by(
StudyQuestionJob.study_question_id.asc(),
StudyQuestionJob.id.desc(),
)
)
).all()
latest_error_by_qid: dict[int, str | None] = {}
for r in job_rows:
# 같은 qid 의 첫 등장 row = 최신 (id DESC 정렬)
if r.study_question_id not in latest_error_by_qid:
latest_error_by_qid[r.study_question_id] = r.error_code
return [
qid for qid in candidate_ids
if latest_error_by_qid.get(qid) not in NO_AUTO_RETRY_ERROR_CODES
]
async def enqueue_explanation_for_qids(
session: AsyncSession,
*,
user_id: int,
qids: Iterable[int],
max_count: int | None = None,
) -> dict[str, int]:
"""주어진 qid 묶음에 대해 enqueue 수행. caller 가 commit 책임.
max_count: 한 호출에 enqueue 할 최대 행 수 (GET fallback 30 cap).
Returns: {'enqueue_count': N, 'skipped_count': M, 'candidate_count': K}
"""
candidates = await filter_needs_explanation(session, qids)
if max_count is not None:
candidates = candidates[:max_count]
enqueue_count = 0
skipped_count = 0
for qid in candidates:
ok = await enqueue_study_question_job(
session,
study_question_id=qid,
user_id=user_id,
kind=KIND_EXPLANATION,
)
if ok:
enqueue_count += 1
else:
# 이미 active 행 있어 on_conflict_do_nothing — 정상.
skipped_count += 1
return {
"enqueue_count": enqueue_count,
"skipped_count": skipped_count,
"candidate_count": len(candidates),
}