feat(study): Phase 4-A wrong/unsure AI 풀이 prefetch batch

PR-3 의 결과 화면 [AI 해설 보기] 실시간 호출이 클릭 시 8~30초 대기. 풀이 직후
백그라운드 batch 로 미리 생성해 캐시 hit. 환각 가드는 PR-3 보다 강화 — envelope
JSON {answer_choice, explanation_md, confidence} + answer_choice == correct_choice
검증 + evidence 의무.

processing_queue 가 documents.id FK 라 study_questions 에 직접 재사용 불가 →
별도 study_question_jobs 테이블 + 별도 consumer.

Backend:
- migrations/231 — study_question_jobs CREATE TABLE (13컬럼, kind 권장값
  'explanation' / 'session_summary' 예약, status pending/processing/completed/
  failed/skipped, max_attempts=2)
- migrations/232 — partial unique idx (qid, kind) WHERE status IN
  (pending, processing) — active 행 중복 차단, terminal 이력 누적 허용
- models/study_question_job — ORM + enqueue_study_question_job() 헬퍼
  (on_conflict_do_nothing 멱등)
- prompts/study_explanation_envelope.txt — envelope 형식 프롬프트
  (answer_choice 1~4 강제, confidence high/medium/low)
- workers/study_explanation_worker — terminal status 분기:
  · evidence 둘 다 빈 리스트 → job/question 모두 skipped (LLM 호출 X)
  · answer_choice != correct_choice → guard_fail / failed (재시도 X)
  · timeout/parse → 재시도 후보 (max_attempts=2)
  · catch-all except → unknown 명시 + retryable 분기
  · question.ai_explanation_status='ready' 이미 박혀있으면 즉시 completed
  · confidence 는 job.payload 에 보존 (운영 분석)
- workers/study_queue_consumer — APScheduler 1분 주기, BATCH_SIZE=1, MLX gate
  Semaphore(1) 공유. STALE_MINUTES=10 자체 복구
- main.py — scheduler.add_job(consume_study_queue, ..., id='study_queue_consumer')
- services/study/explanation_enqueue — finalize + GET fallback 공유 헬퍼:
  filter_needs_explanation (study_questions status + 최신 job error_code 필터,
  guard_fail/evidence_missing 인 마지막 job 은 자동 재enqueue 제외) +
  enqueue_explanation_for_qids (max_count cap)
- session_finalize — 끝에서 wrong/unsure qid prefetch enqueue (best-effort,
  실패해도 finalize 자체 안 깨짐)
- api/study_topics get_quiz_session — done 세션에서 backfill enqueue (max=30,
  non-blocking, debug 로그)

대상 조건: ai_explanation_status IN ('none', 'failed') OR ai_explanation IS NULL.
stale / skipped / pending / ready 는 자동 enqueue 대상 X. stale 재생성은 PR-3
명시 [다시 생성] 또는 후속 Phase 에서.

Plan: ~/.claude/plans/nifty-sparking-spindle.md (Phase 4-A)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-05-01 11:42:08 +09:00
parent 711b81f8f0
commit e8da53490c
10 changed files with 660 additions and 0 deletions
+29
View File
@@ -1805,6 +1805,35 @@ async def get_quiz_session(
]
summary = await _build_session_summary(qs, session, include_progress_counts=True)
# Phase 4-A: 결과 화면 GET fallback — finalize enqueue 누락 또는 worker 처리 전
# 사용자가 결과 들어온 경우 같은 wrong/unsure qid 를 idempotent backfill enqueue.
# 한 요청당 30 cap + non-blocking + debug 로그. 실패해도 GET 200 유지.
if qs.status == "done":
try:
from services.study.explanation_enqueue import enqueue_explanation_for_qids
wrong_unsure_qids = [
a.study_question_id for a in attempt_rows
if a.outcome in ("wrong", "unsure")
]
if wrong_unsure_qids:
res = await enqueue_explanation_for_qids(
session,
user_id=user.id,
qids=wrong_unsure_qids,
max_count=30, # GET fallback cap
)
await session.commit()
logger.debug(
"phase4a_get_backfill session=%s candidates=%s enqueued=%s skipped=%s",
qs.id, res["candidate_count"], res["enqueue_count"], res["skipped_count"],
)
except Exception as e:
logger.warning(
"phase4a_get_backfill_failed session=%s: %s: %s",
qs.id, type(e).__name__, e,
)
return QuizSessionDetailResponse(
summary=summary,
questions=questions_payload,
+4
View File
@@ -44,6 +44,7 @@ async def lifespan(app: FastAPI):
from workers.mailplus_archive import run as mailplus_run
from workers.news_collector import run as news_collector_run
from workers.queue_consumer import consume_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_question_embed_worker import (
refresh_stale_related as study_q_related_refresh,
run as study_q_embed_run,
@@ -75,6 +76,9 @@ async def lifespan(app: FastAPI):
# PR-12-A 후속: related-types 캐시 stale 행 재계산. 임베딩 워커와 분리한 별도 cron.
# 새 문제 ready / 같은 토픽 invalidation / 임계값 변경 시 NULL 마킹된 행을 batch=20 처리.
scheduler.add_job(study_q_related_refresh, "interval", minutes=1, id="study_q_related_refresh")
# Phase 4-A: study_question_jobs 처리 — wrong/unsure AI 풀이 prefetch.
# MLX gate 직렬화 + BATCH_SIZE=1 로 GPU 부하 통제. STALE_MINUTES=10 자체 복구.
scheduler.add_job(consume_study_queue, "interval", minutes=1, id="study_queue_consumer")
# PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue.
# safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화.
scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill")
+87
View File
@@ -0,0 +1,87 @@
"""study_question_jobs ORM (Phase 4-A) — study 도메인 전용 비동기 작업 큐.
processing_queue documents.id FK study_questions 직접 재사용 불가.
별도 테이블 + 별도 consumer (study_queue_consumer.py).
kind 권장값:
- 'explanation' (Phase 4-A): wrong/unsure 문제의 AI 풀이 prefetch
- 'session_summary' (Phase 4-B 예약): 세션 단위 종합 분석. session_summary question
단위에 얹기 어색해 Phase 4-B 구현 study_quiz_session_jobs 별도 분리 검토.
terminal status (completed/failed/skipped) completed_at 항상 기록.
failed 재시도는 기존 row pending 으로 되살리지 않고 row 생성 이력 누적.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, String, Text, text
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class StudyQuestionJob(Base):
__tablename__ = "study_question_jobs"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
study_question_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("study_questions.id", ondelete="CASCADE"), nullable=False
)
user_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
kind: Mapped[str] = mapped_column(String(40), nullable=False)
status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
max_attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=2)
error_code: Mapped[str | None] = mapped_column(String(40))
error_message: Mapped[str | None] = mapped_column(Text)
payload: Mapped[dict | None] = mapped_column(JSONB)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# active partial unique idx 는 migration 232 가 관리.
async def enqueue_study_question_job(
session: AsyncSession,
*,
study_question_id: int,
user_id: int,
kind: str,
payload: dict[str, Any] | None = None,
) -> bool:
"""study_question_jobs 에 행 추가 (DB 레벨 중복 방어).
같은 (study_question_id, kind) 활성 (pending/processing) 이미 있으면
아무것도 하지 않고 False 반환. terminal 이력은 별도 row 누적되므로 이번 호출이
failed/skipped/completed row 무관하게 active 행을 만들 있다.
Returns: True = enqueue 발생, False = 중복으로 건너뜀.
"""
values: dict[str, Any] = {
"study_question_id": study_question_id,
"user_id": user_id,
"kind": kind,
"status": "pending",
}
if payload is not None:
values["payload"] = payload
stmt = (
pg_insert(StudyQuestionJob)
.values(**values)
.on_conflict_do_nothing(
index_elements=["study_question_id", "kind"],
index_where=text("status IN ('pending', 'processing')"),
)
)
result = await session.execute(stmt)
return result.rowcount > 0
@@ -0,0 +1,42 @@
당신은 한국 기사시험(가스기사·산업안전기사 등) 필기 학습 보조 AI 입니다.
4지선다 객관식 문제를 분석하고 정답 풀이를 작성합니다.
【문제】
{question_text}
【보기】
1. {choice_1}
2. {choice_2}
3. {choice_3}
4. {choice_4}
【사용자가 입력한 정답】
{correct_choice}번
【참고 자료 — 우선순위 순서】
▼ 자료 (1순위: 자료실 매핑 문서)
{documents_evidence_block}
▼ 같은 주제의 다른 문제 (2순위: 보조 근거)
{questions_evidence_block}
【지침】
1. 자료를 1순위 근거로 사용. 다른 문제는 보조 근거로만.
2. 자료 인용은 [자료: 제목] 형태. 문제 인용은 [관련: Q<id>] 형태.
3. 정답이 왜 맞는지 핵심 개념 → 오답 보기가 왜 틀렸는지 짧게 → 정리 순서.
4. 자료에 직접 근거가 없으면 "자료 근거 부족" 으로 명시하고, 일반 상식 풀이는 별도 단락에 표시.
5. **할루시네이션 방지 (절대 규칙)**:
- 자료 근거가 부족하면 법령명·조항·수치·기준값을 새로 만들어내지 않는다.
- 근거 없는 수치(예: "0.5 MPa", "10 mg/L")·공식·표준 번호(예: "KS B 6750")·통계는 작성하지 않는다.
- 자료에서 확인되지 않는 내용은 "자료에서 확인되지 않음" 이라고 명시한다.
- "보통 ~이다", "일반적으로 ~이다" 같은 모호한 단정도 자료 근거가 없으면 사용하지 않는다.
6. confidence 는 풀이 근거 강도에 따라 high/medium/low 중 하나로 선택:
- high: 자료에 직접 근거가 있고 정답이 명확
- medium: 자료가 부분 근거이고 일반 지식 보강 필요
- low: 자료 근거 부족하여 추론에 의존
7. answer_choice 는 풀이의 결론에 해당하는 정답 번호 (1~4 정수). 사용자 입력 정답과 자료 근거가 충돌하면 자료 근거를 따르고 explanation_md 에 충돌을 명시.
8. explanation_md 는 한국어 200~400자. 마크다운(굵게·리스트) 사용 가능.
【출력 형식 — 반드시 아래 JSON 만 출력. 메타 설명·인사·코드 펜스 없이 raw JSON 한 객체.】
{{"answer_choice": <1|2|3|4>, "explanation_md": "<풀이 본문 마크다운>", "confidence": "<high|medium|low>"}}
+119
View File
@@ -0,0 +1,119 @@
"""Phase 4-A 풀이 prefetch enqueue 헬퍼 — finalize_session + 결과 GET fallback 공유.
대상 조건:
- ai_explanation_status IN ('none', 'failed') OR ai_explanation IS NULL
- skipped / stale 자동 enqueue 대상 X (각각 자료 추가 / 명시 [다시 생성] 트리거)
- 같은 (qid, kind='explanation') 최신 study_question_jobs.error_code
guard_fail 또는 evidence_missing 이면 제외 (자동 재시도 금지 사유)
Plan: ~/.claude/plans/nifty-sparking-spindle.md
"""
from __future__ import annotations
import logging
from typing import Iterable
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from models.study_question import StudyQuestion
from models.study_question_job import StudyQuestionJob, enqueue_study_question_job
logger = logging.getLogger(__name__)
KIND_EXPLANATION = "explanation"
NO_AUTO_RETRY_ERROR_CODES = ("guard_fail", "evidence_missing")
async def filter_needs_explanation(
session: AsyncSession, qids: Iterable[int]
) -> list[int]:
"""주어진 qid 중 자동 enqueue 대상만 추려서 반환.
1 SQL: study_questions 자체 조건 (status / ai_explanation null)
2 Python: 후보의 최신 study_question_jobs error_code 자동 재시도 금지 사유면 제외
"""
qids = list(qids)
if not qids:
return []
rows = (
await session.execute(
select(StudyQuestion.id).where(
StudyQuestion.id.in_(qids),
StudyQuestion.deleted_at.is_(None),
or_(
StudyQuestion.ai_explanation_status.in_(("none", "failed")),
StudyQuestion.ai_explanation.is_(None),
),
# skipped / stale 은 자동 enqueue 대상 X
StudyQuestion.ai_explanation_status.notin_(("skipped", "stale", "ready", "pending")),
)
)
).scalars().all()
candidate_ids = list(rows)
if not candidate_ids:
return []
# 각 후보의 최신 job (id DESC) 의 error_code — Python 에서 첫 row 만 채택
job_rows = (
await session.execute(
select(StudyQuestionJob.study_question_id, StudyQuestionJob.error_code)
.where(
StudyQuestionJob.study_question_id.in_(candidate_ids),
StudyQuestionJob.kind == KIND_EXPLANATION,
)
.order_by(
StudyQuestionJob.study_question_id.asc(),
StudyQuestionJob.id.desc(),
)
)
).all()
latest_error_by_qid: dict[int, str | None] = {}
for r in job_rows:
# 같은 qid 의 첫 등장 row = 최신 (id DESC 정렬)
if r.study_question_id not in latest_error_by_qid:
latest_error_by_qid[r.study_question_id] = r.error_code
return [
qid for qid in candidate_ids
if latest_error_by_qid.get(qid) not in NO_AUTO_RETRY_ERROR_CODES
]
async def enqueue_explanation_for_qids(
session: AsyncSession,
*,
user_id: int,
qids: Iterable[int],
max_count: int | None = None,
) -> dict[str, int]:
"""주어진 qid 묶음에 대해 enqueue 수행. caller 가 commit 책임.
max_count: 호출에 enqueue 최대 (GET fallback 30 cap).
Returns: {'enqueue_count': N, 'skipped_count': M, 'candidate_count': K}
"""
candidates = await filter_needs_explanation(session, qids)
if max_count is not None:
candidates = candidates[:max_count]
enqueue_count = 0
skipped_count = 0
for qid in candidates:
ok = await enqueue_study_question_job(
session,
study_question_id=qid,
user_id=user_id,
kind=KIND_EXPLANATION,
)
if ok:
enqueue_count += 1
else:
# 이미 active 행 있어 on_conflict_do_nothing — 정상.
skipped_count += 1
return {
"enqueue_count": enqueue_count,
"skipped_count": skipped_count,
"candidate_count": len(candidates),
}
+24
View File
@@ -213,6 +213,30 @@ async def finalize_session(
qs.recovered_count = recovered_count
qs.chronic_remaining_count = chronic_remaining
# 6. Phase 4-A: 이 세션의 wrong/unsure qid AI 풀이 prefetch enqueue (best-effort).
# 실패가 finalize 자체를 깨뜨리지 않도록 try/except 로 격리. 응답 카운트와 무관.
try:
from services.study.explanation_enqueue import enqueue_explanation_for_qids
wrong_unsure_qids = [
qid for qid, a in last_per_qid.items()
if a.outcome in ("wrong", "unsure")
]
if wrong_unsure_qids:
res = await enqueue_explanation_for_qids(
session, user_id=user_id, qids=wrong_unsure_qids,
)
import logging
logging.getLogger(__name__).info(
"phase4a_finalize_enqueue session=%s candidates=%s enqueued=%s skipped=%s",
quiz_session_id, res["candidate_count"], res["enqueue_count"], res["skipped_count"],
)
except Exception as e:
import logging
logging.getLogger(__name__).warning(
"phase4a_finalize_enqueue_failed session=%s: %s: %s",
quiz_session_id, type(e).__name__, e,
)
return SessionSummary(
correct=correct,
wrong=wrong,
+215
View File
@@ -0,0 +1,215 @@
"""Phase 4-A 풀이 prefetch worker — wrong/unsure 문제의 AI 풀이를 batch 로 미리 생성.
Plan: ~/.claude/plans/nifty-sparking-spindle.md
study_question_jobs (kind='explanation') row 1건을 받아 처리:
1. RAG 근거 수집 (PR-3 explanation_rag.py 재사용)
2. evidence 비어있으면 LLM 호출 X status='skipped'
3. MLX primary 호출 (gate Semaphore(1) 공유) envelope JSON
4. 환각 가드 answer_choice == question.correct_choice 검증
5. 통과 study_questions.ai_explanation 캐시 박기
terminal status (completed/failed/skipped) completed_at 항상 기록.
재시도 정책 guard_fail/evidence_missing final, (llm_timeout/parse_fail/unknown)
attempts < max_attempts pending 으로 복귀.
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from models.study_question import StudyQuestion
from models.study_question_job import StudyQuestionJob
from services.search.llm_gate import get_mlx_gate
from services.study.explanation_rag import (
gather_explanation_context,
render_evidence_block,
)
logger = logging.getLogger(__name__)
# PR-3 LLM_TIMEOUT_S 와 동일 안전 마진 (26B 평균 ~10s, gate 직렬화 고려)
LLM_TIMEOUT_S = 30.0
_ENVELOPE_PROMPT_FILE = "study_explanation_envelope.txt"
_envelope_template_cache: str | None = None
def _load_envelope_prompt() -> str:
global _envelope_template_cache
if _envelope_template_cache is None:
prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
_envelope_template_cache = (
prompts_dir / _ENVELOPE_PROMPT_FILE
).read_text(encoding="utf-8")
return _envelope_template_cache
def _render_envelope_prompt(q: StudyQuestion, doc_block: str, q_block: str) -> str:
return (
_load_envelope_prompt()
.replace("{question_text}", q.question_text or "")
.replace("{choice_1}", q.choice_1 or "")
.replace("{choice_2}", q.choice_2 or "")
.replace("{choice_3}", q.choice_3 or "")
.replace("{choice_4}", q.choice_4 or "")
.replace("{correct_choice}", str(q.correct_choice))
.replace("{documents_evidence_block}", doc_block)
.replace("{questions_evidence_block}", q_block)
)
async def run_explanation_job(session: AsyncSession, job: StudyQuestionJob) -> None:
"""Phase 4-A: study_question_jobs row 1건 처리. caller 가 commit 책임.
job.status 호출 'pending' 가정. 종료 completed/failed/skipped/pending(재시도)
하나.
"""
now = lambda: datetime.now(timezone.utc) # noqa: E731
# attempt + processing 단정
job.attempts += 1
job.status = "processing"
job.started_at = now()
await session.flush()
try:
question = await session.get(StudyQuestion, job.study_question_id)
if question is None or question.deleted_at is not None:
# 삭제된 문제 — job 도 skipped 로 종결.
job.error_code = "evidence_missing"
job.error_message = "question deleted or missing"
job.status = "skipped"
job.completed_at = now()
return
# race-safe — PR-3 실시간 호출이 이미 ready 박았으면 즉시 종결.
if question.ai_explanation_status == "ready":
job.status = "completed"
job.completed_at = now()
return
# 1. RAG 근거 수집
ctx = await gather_explanation_context(session, job.user_id, question)
if not ctx.documents and not ctx.questions:
# evidence 없음 — LLM 호출 X. job/question 둘 다 skipped 통일.
job.error_code = "evidence_missing"
job.error_message = "no document/question evidence in this topic"
job.status = "skipped"
job.completed_at = now()
question.ai_explanation_status = "skipped"
question.updated_at = now()
return
# 2. 프롬프트 + MLX primary
doc_block = render_evidence_block(ctx.documents)
q_block = render_evidence_block(ctx.questions)
prompt = _render_envelope_prompt(question, doc_block, q_block)
ai_client = AIClient()
try:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_S):
raw_text = await ai_client.call_primary(prompt)
primary_name = (
ai_client.ai.primary.model
if hasattr(ai_client.ai, "primary") and hasattr(ai_client.ai.primary, "model")
else "primary"
)
finally:
await ai_client.close()
if not raw_text or not raw_text.strip():
# 빈 응답도 timeout 류로 처리 — 재시도 후보.
job.error_code = "llm_timeout"
job.error_message = "empty response from primary"
return
# 3. envelope 파싱
envelope = parse_json_response(raw_text)
if envelope is None or not isinstance(envelope, dict):
job.error_code = "parse_fail"
job.error_message = "envelope JSON parse failed"
return
answer_choice = envelope.get("answer_choice")
explanation_md = envelope.get("explanation_md") or ""
confidence = envelope.get("confidence")
if not isinstance(answer_choice, int) or answer_choice not in (1, 2, 3, 4):
job.error_code = "parse_fail"
job.error_message = f"invalid answer_choice: {answer_choice!r}"
return
if not explanation_md.strip():
job.error_code = "parse_fail"
job.error_message = "empty explanation_md"
return
# 4. 환각 가드 — 정답 번호 일치
if answer_choice != question.correct_choice:
job.error_code = "guard_fail"
job.error_message = (
f"answer_choice={answer_choice} != correct_choice={question.correct_choice}"
)
job.status = "failed"
job.completed_at = now()
question.ai_explanation_status = "failed"
question.updated_at = now()
return
# 5. 성공 — confidence 는 1차 통과 (Phase 4-B 임계 결정).
# 운영 분석 자산으로 payload 에 confidence 보존.
job_payload = dict(job.payload or {})
job_payload["confidence"] = confidence
job.payload = job_payload
question.ai_explanation = explanation_md
question.ai_explanation_status = "ready"
question.ai_explanation_generated_at = now()
question.ai_explanation_model = f"mlx:{primary_name}"
question.updated_at = question.ai_explanation_generated_at
job.status = "completed"
job.completed_at = now()
return
except (asyncio.TimeoutError, httpx.HTTPError) as e:
job.error_code = "llm_timeout"
job.error_message = f"{type(e).__name__}: {e}"
logger.warning(
"study_explanation_job_timeout job_id=%s qid=%s: %s",
job.id, job.study_question_id, e,
)
except (json.JSONDecodeError, ValueError) as e:
job.error_code = "parse_fail"
job.error_message = f"{type(e).__name__}: {e}"
logger.warning(
"study_explanation_job_parse_fail job_id=%s qid=%s: %s",
job.id, job.study_question_id, e,
)
except Exception as e:
# 예상 못한 예외 — error_code 미세팅 시 finally 가 None 을 retryable 로 보면 무한 루프.
# 명시적으로 'unknown' 박아 재시도 정책 안에 들어가게.
job.error_code = "unknown"
job.error_message = f"{type(e).__name__}: {e}"
logger.exception(
"study_explanation_job_unknown_fail job_id=%s qid=%s",
job.id, job.study_question_id,
)
finally:
# 재시도 분기 — guard_fail/evidence_missing 은 위 try 에서 이미 단정 종결.
# 여기 도달 케이스는 llm_timeout / parse_fail / unknown.
if job.status == "processing":
retryable = job.error_code in ("llm_timeout", "parse_fail", "unknown")
if retryable and job.attempts < job.max_attempts:
job.status = "pending" # 다음 cycle 재시도
else:
job.status = "failed"
job.completed_at = now()
+108
View File
@@ -0,0 +1,108 @@
"""Phase 4-A study_question_jobs consumer — APScheduler 1분 간격.
study 도메인 전용 (processing_queue 분리). BATCH_SIZE=1 MLX 26B
deep_summary 같은 Semaphore(1) 게이트를 공유하므로 GPU 부하 통제.
stage worker dispatch:
- kind='explanation': study_explanation_worker.run_explanation_job (Phase 4-A)
- kind='session_summary': Phase 4-B 예약, 1 미구현 pending 보면 즉시 skipped 처리
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.database import async_session
from core.utils import setup_logger
from models.study_question_job import StudyQuestionJob
from workers.study_explanation_worker import run_explanation_job
logger = setup_logger("study_queue_consumer")
# 한 사이클에 한 row 씩 — MLX 게이트 직렬화 + GPU 부하 예측 가능.
BATCH_SIZE = 1
# processing 상태로 STALE_MINUTES 이상 방치된 job 은 worker 가 죽었다고 보고 pending 복구.
STALE_MINUTES = 10
async def reset_stale_study_jobs() -> None:
"""processing 으로 STALE_MINUTES 이상 방치된 job 을 pending 으로 복구.
partial unique idx (qid, kind) WHERE status IN ('pending','processing') 이라
같은 qid 다른 pending 동시에 있을 일이 거의 없음 (enqueue on_conflict_do_nothing).
그래도 안전하게 update 충돌 IntegrityError 단건 단위 catch.
"""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
try:
async with async_session() as session:
stmt = (
update(StudyQuestionJob)
.where(
StudyQuestionJob.status == "processing",
StudyQuestionJob.started_at.is_not(None),
StudyQuestionJob.started_at < cutoff,
)
.values(status="pending", started_at=None)
)
result = await session.execute(stmt)
await session.commit()
n = result.rowcount or 0
if n > 0:
logger.warning("study_jobs_stale_reset count=%s", n)
except SQLAlchemyError as e:
logger.exception("study_jobs_stale_reset_failed: %s", e)
async def consume_study_queue() -> None:
"""APScheduler 진입점. pending job BATCH_SIZE 만큼 처리."""
await reset_stale_study_jobs()
async with async_session() as session:
rows = (
await session.execute(
select(StudyQuestionJob)
.where(StudyQuestionJob.status == "pending")
.order_by(StudyQuestionJob.id.asc())
.limit(BATCH_SIZE)
)
).scalars().all()
for job_row in rows:
# 각 job 마다 독립 세션 — 한 job 실패가 다른 job 에 전염 X.
async with async_session() as s:
try:
# 같은 row 를 fresh load (lock 회피, 세션 격리)
job = await s.get(StudyQuestionJob, job_row.id)
if job is None or job.status != "pending":
continue # 다른 cycle 에서 이미 처리
if job.kind == "explanation":
await run_explanation_job(s, job)
elif job.kind == "session_summary":
# Phase 4-B 미구현 — 즉시 skipped 처리 (lost in queue 방지)
job.status = "skipped"
job.error_code = "unknown"
job.error_message = "session_summary not implemented in Phase 4-A"
job.completed_at = datetime.now(timezone.utc)
logger.info("study_job_unsupported_kind id=%s kind=%s", job.id, job.kind)
else:
job.status = "skipped"
job.error_code = "unknown"
job.error_message = f"unknown kind: {job.kind!r}"
job.completed_at = datetime.now(timezone.utc)
logger.warning("study_job_unknown_kind id=%s kind=%s", job.id, job.kind)
await s.commit()
logger.info(
"study_job_processed id=%s qid=%s kind=%s status=%s error_code=%s attempts=%s",
job.id, job.study_question_id, job.kind, job.status, job.error_code,
job.attempts,
)
except Exception as e:
# worker 안에서 잡지 못한 catastrophic 실패 — 세션 rollback 후 다음 cycle 재처리.
# study_explanation_worker 가 catch-all except 를 가지므로 여기 도달은 드묾.
await s.rollback()
logger.exception("study_job_outer_failed job_id=%s: %s", job_row.id, e)
+24
View File
@@ -0,0 +1,24 @@
-- 231_study_question_jobs.sql
-- Phase 4-A: study question 도메인 전용 비동기 작업 큐.
-- processing_queue 가 documents.id FK 라 study_questions 에 직접 재사용 불가.
--
-- 라이프사이클:
-- pending → processing → completed | failed | skipped
-- terminal status (completed/failed/skipped) 는 completed_at 항상 기록.
-- failed 재시도는 기존 row 를 pending 으로 되살리지 않고 새 row 생성 — 이력 누적.
CREATE TABLE study_question_jobs (
id BIGSERIAL PRIMARY KEY,
study_question_id BIGINT NOT NULL REFERENCES study_questions(id) ON DELETE CASCADE,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
kind VARCHAR(40) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
attempts SMALLINT NOT NULL DEFAULT 0,
max_attempts SMALLINT NOT NULL DEFAULT 2,
error_code VARCHAR(40),
error_message TEXT,
payload JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ
);
@@ -0,0 +1,8 @@
-- 232_study_q_jobs_active_uq.sql
-- (study_question_id, kind) 활성 행 중복 차단.
-- terminal status (completed/failed/skipped) 는 누적 이력이라 unique 대상 X.
-- 같은 qid 가 failed 후 재enqueue 될 때 새 row 가 들어가는 것을 허용.
CREATE UNIQUE INDEX uq_study_q_jobs_active
ON study_question_jobs (study_question_id, kind)
WHERE status IN ('pending', 'processing');