diff --git a/app/api/study_topics.py b/app/api/study_topics.py index a4da1be..ace0b85 100644 --- a/app/api/study_topics.py +++ b/app/api/study_topics.py @@ -1805,6 +1805,35 @@ async def get_quiz_session( ] summary = await _build_session_summary(qs, session, include_progress_counts=True) + + # Phase 4-A: 결과 화면 GET fallback — finalize enqueue 누락 또는 worker 처리 전 + # 사용자가 결과 들어온 경우 같은 wrong/unsure qid 를 idempotent backfill enqueue. + # 한 요청당 30 cap + non-blocking + debug 로그. 실패해도 GET 200 유지. + if qs.status == "done": + try: + from services.study.explanation_enqueue import enqueue_explanation_for_qids + wrong_unsure_qids = [ + a.study_question_id for a in attempt_rows + if a.outcome in ("wrong", "unsure") + ] + if wrong_unsure_qids: + res = await enqueue_explanation_for_qids( + session, + user_id=user.id, + qids=wrong_unsure_qids, + max_count=30, # GET fallback cap + ) + await session.commit() + logger.debug( + "phase4a_get_backfill session=%s candidates=%s enqueued=%s skipped=%s", + qs.id, res["candidate_count"], res["enqueue_count"], res["skipped_count"], + ) + except Exception as e: + logger.warning( + "phase4a_get_backfill_failed session=%s: %s: %s", + qs.id, type(e).__name__, e, + ) + return QuizSessionDetailResponse( summary=summary, questions=questions_payload, diff --git a/app/main.py b/app/main.py index cdca5f7..3cec1ce 100644 --- a/app/main.py +++ b/app/main.py @@ -44,6 +44,7 @@ async def lifespan(app: FastAPI): from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run from workers.queue_consumer import consume_queue + from workers.study_queue_consumer import consume_study_queue from workers.study_question_embed_worker import ( refresh_stale_related as study_q_related_refresh, run as study_q_embed_run, @@ -75,6 +76,9 @@ async def lifespan(app: FastAPI): # PR-12-A 후속: related-types 캐시 stale 행 재계산. 임베딩 워커와 분리한 별도 cron. # 새 문제 ready / 같은 토픽 invalidation / 임계값 변경 시 NULL 마킹된 행을 batch=20 처리. 
scheduler.add_job(study_q_related_refresh, "interval", minutes=1, id="study_q_related_refresh") + # Phase 4-A: study_question_jobs 처리 — wrong/unsure AI 풀이 prefetch. + # MLX gate 직렬화 + BATCH_SIZE=1 로 GPU 부하 통제. STALE_MINUTES=10 자체 복구. + scheduler.add_job(consume_study_queue, "interval", minutes=1, id="study_queue_consumer") # PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue. # safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화. scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill") diff --git a/app/models/study_question_job.py b/app/models/study_question_job.py new file mode 100644 index 0000000..e4a321c --- /dev/null +++ b/app/models/study_question_job.py @@ -0,0 +1,87 @@ +"""study_question_jobs ORM (Phase 4-A) — study 도메인 전용 비동기 작업 큐. + +processing_queue 가 documents.id FK 라 study_questions 에 직접 재사용 불가. +별도 테이블 + 별도 consumer (study_queue_consumer.py). + +kind 권장값: + - 'explanation' (Phase 4-A): wrong/unsure 문제의 AI 풀이 prefetch + - 'session_summary' (Phase 4-B 예약): 세션 단위 종합 분석. session_summary 는 question + 단위에 얹기 어색해 Phase 4-B 구현 시 study_quiz_session_jobs 별도 분리 검토. + +terminal status (completed/failed/skipped) 는 completed_at 항상 기록. +failed 재시도는 기존 row 를 pending 으로 되살리지 않고 새 row 생성 — 이력 누적. 
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any
+
+from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, String, Text, text
+from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import Mapped, mapped_column
+
+from core.database import Base
+
+
+class StudyQuestionJob(Base):
+    __tablename__ = "study_question_jobs"
+    # One row per queued unit of async work for a study question (table created by migration 231).
+    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
+    study_question_id: Mapped[int] = mapped_column(
+        BigInteger, ForeignKey("study_questions.id", ondelete="CASCADE"), nullable=False
+    )
+    user_id: Mapped[int] = mapped_column(
+        BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
+    )
+    kind: Mapped[str] = mapped_column(String(40), nullable=False)  # job type, e.g. "explanation"
+    status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
+    attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
+    max_attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=2)
+    error_code: Mapped[str | None] = mapped_column(String(40))
+    error_message: Mapped[str | None] = mapped_column(Text)
+    payload: Mapped[dict | None] = mapped_column(JSONB)
+    created_at: Mapped[datetime] = mapped_column(
+        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
+    )  # tz-aware UTC: a naive datetime.now() on a timezone=True column depends on the host's local zone
+    started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+    completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+
+    # The partial unique index over active rows is managed by migration 232.
+
+
+async def enqueue_study_question_job(
+    session: AsyncSession,
+    *,
+    study_question_id: int,
+    user_id: int,
+    kind: str,
+    payload: dict[str, Any] | None = None,
+) -> bool:
+    """Insert one pending row into study_question_jobs, deduplicated at the DB level.
+
+    If an active row (pending/processing) already exists for the same
+    (study_question_id, kind), nothing is inserted and False is returned.
+    Terminal rows are kept as separate history, so this call can create a new
+    active row regardless of earlier failed/skipped/completed rows.
+
+    Returns: True when a new row was enqueued, False when skipped as a duplicate.
+    """
+    values: dict[str, Any] = {
+        "study_question_id": study_question_id,
+        "user_id": user_id,
+        "kind": kind,
+        "status": "pending",
+    }
+    if payload is not None:
+        values["payload"] = payload
+    stmt = (
+        pg_insert(StudyQuestionJob)
+        .values(**values)
+        .on_conflict_do_nothing(
+            index_elements=["study_question_id", "kind"],
+            index_where=text("status IN ('pending', 'processing')"),  # same predicate as the partial unique index
+        )
+    )
+    result = await session.execute(stmt)
+    return result.rowcount > 0
diff --git a/app/prompts/study_explanation_envelope.txt b/app/prompts/study_explanation_envelope.txt new file mode 100644 index 0000000..e694c57 --- /dev/null +++ b/app/prompts/study_explanation_envelope.txt @@ -0,0 +1,42 @@ +당신은 한국 기사시험(가스기사·산업안전기사 등) 필기 학습 보조 AI 입니다. +4지선다 객관식 문제를 분석하고 정답 풀이를 작성합니다. + +【문제】 +{question_text} + +【보기】 +1. {choice_1} +2. {choice_2} +3. {choice_3} +4. {choice_4} + +【사용자가 입력한 정답】 +{correct_choice}번 + +【참고 자료 — 우선순위 순서】 + +▼ 자료 (1순위: 자료실 매핑 문서) +{documents_evidence_block} + +▼ 같은 주제의 다른 문제 (2순위: 보조 근거) +{questions_evidence_block} + +【지침】 +1. 자료를 1순위 근거로 사용. 다른 문제는 보조 근거로만. +2. 자료 인용은 [자료: 제목] 형태. 문제 인용은 [관련: Q] 형태. +3. 정답이 왜 맞는지 핵심 개념 → 오답 보기가 왜 틀렸는지 짧게 → 정리 순서. +4. 자료에 직접 근거가 없으면 "자료 근거 부족" 으로 명시하고, 일반 상식 풀이는 별도 단락에 표시. +5. **할루시네이션 방지 (절대 규칙)**: + - 자료 근거가 부족하면 법령명·조항·수치·기준값을 새로 만들어내지 않는다. + - 근거 없는 수치(예: "0.5 MPa", "10 mg/L")·공식·표준 번호(예: "KS B 6750")·통계는 작성하지 않는다. + - 자료에서 확인되지 않는 내용은 "자료에서 확인되지 않음" 이라고 명시한다. + - "보통 ~이다", "일반적으로 ~이다" 같은 모호한 단정도 자료 근거가 없으면 사용하지 않는다. +6. confidence 는 풀이 근거 강도에 따라 high/medium/low 중 하나로 선택: + - high: 자료에 직접 근거가 있고 정답이 명확 + - medium: 자료가 부분 근거이고 일반 지식 보강 필요 + - low: 자료 근거 부족하여 추론에 의존 +7. answer_choice 는 풀이의 결론에 해당하는 정답 번호 (1~4 정수). 사용자 입력 정답과 자료 근거가 충돌하면 자료 근거를 따르고 explanation_md 에 충돌을 명시. +8. explanation_md 는 한국어 200~400자. 마크다운(굵게·리스트) 사용 가능. + +【출력 형식 — 반드시 아래 JSON 만 출력. 
메타 설명·인사·코드 펜스 없이 raw JSON 한 객체.】 +{{"answer_choice": <1|2|3|4>, "explanation_md": "<풀이 본문 마크다운>", "confidence": ""}}
diff --git a/app/services/study/explanation_enqueue.py b/app/services/study/explanation_enqueue.py
new file mode 100644
index 0000000..5e1d551
--- /dev/null
+++ b/app/services/study/explanation_enqueue.py
@@ -0,0 +1,119 @@
+"""Phase 4-A explanation-prefetch enqueue helpers — shared by finalize_session and the result-GET fallback.
+
+Eligibility:
+  - ai_explanation_status IN ('none', 'failed') OR ai_explanation IS NULL
+  - skipped / stale are never auto-enqueued (re-triggered by adding material / an explicit [regenerate], respectively)
+  - excluded when the latest study_question_jobs.error_code for the same (qid, kind='explanation')
+    is guard_fail or evidence_missing (reasons that forbid automatic retry)
+
+Plan: ~/.claude/plans/nifty-sparking-spindle.md
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Iterable
+
+from sqlalchemy import or_, select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from models.study_question import StudyQuestion
+from models.study_question_job import StudyQuestionJob, enqueue_study_question_job
+
+logger = logging.getLogger(__name__)
+
+KIND_EXPLANATION = "explanation"
+NO_AUTO_RETRY_ERROR_CODES = ("guard_fail", "evidence_missing")
+
+
+async def filter_needs_explanation(
+    session: AsyncSession, qids: Iterable[int]
+) -> list[int]:
+    """Return only the qids that qualify for automatic enqueue.
+
+    Step 1 (SQL): conditions on study_questions itself (status / ai_explanation null).
+    Step 2 (Python): drop candidates whose latest study_question_jobs error_code forbids auto-retry.
+    """
+    qids = list(qids)
+    if not qids:
+        return []
+
+    rows = (
+        await session.execute(
+            select(StudyQuestion.id).where(
+                StudyQuestion.id.in_(qids),
+                StudyQuestion.deleted_at.is_(None),
+                or_(
+                    StudyQuestion.ai_explanation_status.in_(("none", "failed")),
+                    StudyQuestion.ai_explanation.is_(None),
+                ),
+                # skipped / stale are never auto-enqueued; ready / pending rows are excluded here as well.
+                StudyQuestion.ai_explanation_status.notin_(("skipped", "stale", "ready", "pending")),
+            )
+        )
+    ).scalars().all()
+    candidate_ids = list(rows)
+    if not candidate_ids:
+        return []
+
+    # error_code of each candidate's latest job (id DESC) — only the first row per qid is kept, in Python.
+    job_rows = (
+        await session.execute(
+            select(StudyQuestionJob.study_question_id, StudyQuestionJob.error_code)
+            .where(
+                StudyQuestionJob.study_question_id.in_(candidate_ids),
+                StudyQuestionJob.kind == KIND_EXPLANATION,
+            )
+            .order_by(
+                StudyQuestionJob.study_question_id.asc(),
+                StudyQuestionJob.id.desc(),
+            )
+        )
+    ).all()
+    latest_error_by_qid: dict[int, str | None] = {}
+    for r in job_rows:
+        # The first row seen for a qid is its latest one (sorted by id DESC).
+        if r.study_question_id not in latest_error_by_qid:
+            latest_error_by_qid[r.study_question_id] = r.error_code
+
+    return [
+        qid for qid in candidate_ids
+        if latest_error_by_qid.get(qid) not in NO_AUTO_RETRY_ERROR_CODES
+    ]
+
+
+async def enqueue_explanation_for_qids(
+    session: AsyncSession,
+    *,
+    user_id: int,
+    qids: Iterable[int],
+    max_count: int | None = None,
+) -> dict[str, int]:
+    """Enqueue explanation jobs for the given qids; the caller is responsible for commit.
+
+    max_count: per-call cap — the filtered candidate list is cut to this length before enqueueing.
+    Returns: {'enqueue_count': N, 'skipped_count': M, 'candidate_count': K}
+    """
+    candidates = await filter_needs_explanation(session, qids)
+    if max_count is not None:
+        candidates = candidates[:max_count]
+
+    enqueue_count = 0
+    skipped_count = 0
+    for qid in candidates:
+        ok = await enqueue_study_question_job(
+            session,
+            study_question_id=qid,
+            user_id=user_id,
+            kind=KIND_EXPLANATION,
+        )
+        if ok:
+            enqueue_count += 1
+        else:
+            # An active row already exists, so on_conflict_do_nothing skipped the insert — expected.
+            skipped_count += 1
+    return {
+        "enqueue_count": enqueue_count,
+        "skipped_count": skipped_count,
+        "candidate_count": len(candidates),
+    }
diff --git a/app/services/study/session_finalize.py b/app/services/study/session_finalize.py index 4852e5f..1d9b5b8 100644 --- a/app/services/study/session_finalize.py +++ b/app/services/study/session_finalize.py @@ -213,6 +213,30 @@ async def finalize_session( qs.recovered_count = recovered_count qs.chronic_remaining_count = chronic_remaining + # 6. Phase 4-A: 이 세션의 wrong/unsure qid AI 풀이 prefetch enqueue (best-effort). + # 실패가 finalize 자체를 깨뜨리지 않도록 try/except 로 격리. 응답 카운트와 무관. 
+ try: + from services.study.explanation_enqueue import enqueue_explanation_for_qids + wrong_unsure_qids = [ + qid for qid, a in last_per_qid.items() + if a.outcome in ("wrong", "unsure") + ] + if wrong_unsure_qids: + res = await enqueue_explanation_for_qids( + session, user_id=user_id, qids=wrong_unsure_qids, + ) + import logging + logging.getLogger(__name__).info( + "phase4a_finalize_enqueue session=%s candidates=%s enqueued=%s skipped=%s", + quiz_session_id, res["candidate_count"], res["enqueue_count"], res["skipped_count"], + ) + except Exception as e: + import logging + logging.getLogger(__name__).warning( + "phase4a_finalize_enqueue_failed session=%s: %s: %s", + quiz_session_id, type(e).__name__, e, + ) + return SessionSummary( correct=correct, wrong=wrong, diff --git a/app/workers/study_explanation_worker.py b/app/workers/study_explanation_worker.py new file mode 100644 index 0000000..a8f20e7 --- /dev/null +++ b/app/workers/study_explanation_worker.py @@ -0,0 +1,215 @@ +"""Phase 4-A 풀이 prefetch worker — wrong/unsure 문제의 AI 풀이를 batch 로 미리 생성. + +Plan: ~/.claude/plans/nifty-sparking-spindle.md +study_question_jobs (kind='explanation') row 1건을 받아 처리: + 1. RAG 근거 수집 (PR-3 의 explanation_rag.py 재사용) + 2. evidence 둘 다 비어있으면 LLM 호출 X → status='skipped' + 3. MLX primary 호출 (gate Semaphore(1) 공유) → envelope JSON + 4. 환각 가드 — answer_choice == question.correct_choice 검증 + 5. 통과 시 study_questions.ai_explanation 캐시 박기 + +terminal status (completed/failed/skipped) 는 completed_at 항상 기록. +재시도 정책 — guard_fail/evidence_missing 은 final, 그 외 (llm_timeout/parse_fail/unknown) +는 attempts < max_attempts 면 pending 으로 복귀. 
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+from datetime import datetime, timezone
+from pathlib import Path
+
+import httpx
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from ai.client import AIClient, parse_json_response
+from models.study_question import StudyQuestion
+from models.study_question_job import StudyQuestionJob
+from services.search.llm_gate import get_mlx_gate
+from services.study.explanation_rag import (
+    gather_explanation_context,
+    render_evidence_block,
+)
+
+logger = logging.getLogger(__name__)
+
+# Same safety margin as PR-3's LLM_TIMEOUT_S (26B model averages ~10s; allows for gate serialization).
+LLM_TIMEOUT_S = 30.0
+
+_ENVELOPE_PROMPT_FILE = "study_explanation_envelope.txt"
+_envelope_template_cache: str | None = None
+
+
+def _load_envelope_prompt() -> str:  # reads ../prompts/<file> once, then serves the module-level cache
+    global _envelope_template_cache
+    if _envelope_template_cache is None:
+        prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
+        _envelope_template_cache = (
+            prompts_dir / _ENVELOPE_PROMPT_FILE
+        ).read_text(encoding="utf-8")
+    return _envelope_template_cache
+
+
+def _render_envelope_prompt(q: StudyQuestion, doc_block: str, q_block: str) -> str:
+    """Fill the template in one str.format pass: values are never re-scanned, "{{"/"}}" become literal braces."""
+    return _load_envelope_prompt().format(
+        question_text=q.question_text or "",
+        choice_1=q.choice_1 or "",
+        choice_2=q.choice_2 or "",
+        choice_3=q.choice_3 or "",
+        choice_4=q.choice_4 or "",
+        correct_choice=str(q.correct_choice),
+        documents_evidence_block=doc_block,
+        questions_evidence_block=q_block,
+    )
+
+
+async def run_explanation_job(session: AsyncSession, job: StudyQuestionJob) -> None:
+    """Process a single study_question_jobs row (Phase 4-A); the caller commits.
+
+    job.status is assumed to be 'pending' on entry. On exit it is one of
+    completed / failed / skipped / pending (retry scheduled).
+    """
+    now = lambda: datetime.now(timezone.utc)  # noqa: E731
+
+    # Count the attempt and mark the row as processing.
+    job.attempts += 1
+    job.status = "processing"
+    job.started_at = now()
+    await session.flush()
+
+    try:
+        question = await session.get(StudyQuestion, job.study_question_id)
+        if question is None or question.deleted_at is not None:
+            # Question was deleted — close the job as skipped too.
+            job.error_code = "evidence_missing"
+            job.error_message = "question deleted or missing"
+            job.status = "skipped"
+            job.completed_at = now()
+            return
+
+        # Race-safe: if the PR-3 real-time path already stored a ready explanation, finish right away.
+        if question.ai_explanation_status == "ready":
+            job.status = "completed"
+            job.completed_at = now()
+            return
+
+        # 1. Gather RAG evidence.
+        ctx = await gather_explanation_context(session, job.user_id, question)
+        if not ctx.documents and not ctx.questions:
+            # No evidence — skip the LLM call; job and question are both marked skipped.
+            job.error_code = "evidence_missing"
+            job.error_message = "no document/question evidence in this topic"
+            job.status = "skipped"
+            job.completed_at = now()
+            question.ai_explanation_status = "skipped"
+            question.updated_at = now()
+            return
+
+        # 2. Build the prompt and call the MLX primary model.
+        doc_block = render_evidence_block(ctx.documents)
+        q_block = render_evidence_block(ctx.questions)
+        prompt = _render_envelope_prompt(question, doc_block, q_block)
+
+        ai_client = AIClient()
+        try:
+            async with get_mlx_gate():
+                async with asyncio.timeout(LLM_TIMEOUT_S):
+                    raw_text = await ai_client.call_primary(prompt)
+            primary_name = (
+                ai_client.ai.primary.model
+                if hasattr(ai_client.ai, "primary") and hasattr(ai_client.ai.primary, "model")
+                else "primary"
+            )
+        finally:
+            await ai_client.close()
+
+        if not raw_text or not raw_text.strip():
+            # Treat an empty response like a timeout — eligible for retry.
+            job.error_code = "llm_timeout"
+            job.error_message = "empty response from primary"
+            return
+
+        # 3. Parse the envelope.
+        envelope = parse_json_response(raw_text)
+        if envelope is None or not isinstance(envelope, dict):
+            job.error_code = "parse_fail"
+            job.error_message = "envelope JSON parse failed"
+            return
+
+        answer_choice = envelope.get("answer_choice")
+        explanation_md = envelope.get("explanation_md") or ""
+        confidence = envelope.get("confidence")
+
+        if type(answer_choice) is not int or answer_choice not in (1, 2, 3, 4):  # exact int: JSON true == 1
+            job.error_code = "parse_fail"
+            job.error_message = f"invalid answer_choice: {answer_choice!r}"
+            return
+        if not isinstance(explanation_md, str) or not explanation_md.strip():  # non-string body is a parse failure
+            job.error_code = "parse_fail"
+            job.error_message = "empty explanation_md"
+            return
+
+        # 4. Hallucination guard — the answer number must match the stored correct choice.
+        if answer_choice != question.correct_choice:
+            job.error_code = "guard_fail"
+            job.error_message = (
+                f"answer_choice={answer_choice} != correct_choice={question.correct_choice}"
+            )
+            job.status = "failed"
+            job.completed_at = now()
+            question.ai_explanation_status = "failed"
+            question.updated_at = now()
+            return
+
+        # 5. Success — confidence is not gated yet (threshold to be decided in Phase 4-B).
+        # Keep confidence in the payload for operational analysis.
+        job_payload = dict(job.payload or {})
+        job_payload["confidence"] = confidence
+        job.payload = job_payload
+
+        question.ai_explanation = explanation_md
+        question.ai_explanation_status = "ready"
+        question.ai_explanation_generated_at = now()
+        question.ai_explanation_model = f"mlx:{primary_name}"
+        question.updated_at = question.ai_explanation_generated_at
+
+        job.status = "completed"
+        job.completed_at = now()
+        return
+
+    except (asyncio.TimeoutError, httpx.HTTPError) as e:
+        job.error_code = "llm_timeout"
+        job.error_message = f"{type(e).__name__}: {e}"
+        logger.warning(
+            "study_explanation_job_timeout job_id=%s qid=%s: %s",
+            job.id, job.study_question_id, e,
+        )
+    except (json.JSONDecodeError, ValueError) as e:
+        job.error_code = "parse_fail"
+        job.error_message = f"{type(e).__name__}: {e}"
+        logger.warning(
+            "study_explanation_job_parse_fail job_id=%s qid=%s: %s",
+            job.id, job.study_question_id, e,
+        )
+    except Exception as e:
+        # Unexpected exception — an unset error_code would risk an endless loop if finally treated None as retryable.
+        # Set 'unknown' explicitly so the job falls under the retry policy.
+        job.error_code = "unknown"
+        job.error_message = f"{type(e).__name__}: {e}"
+        logger.exception(
+            "study_explanation_job_unknown_fail job_id=%s qid=%s",
+            job.id, job.study_question_id,
+        )
+    finally:
+        # Retry branch — guard_fail / evidence_missing were already closed as terminal inside the try.
+        # Only llm_timeout / parse_fail / unknown reach this point still in 'processing'.
+        if job.status == "processing":
+            retryable = job.error_code in ("llm_timeout", "parse_fail", "unknown")
+            if retryable and job.attempts < job.max_attempts:
+                job.status = "pending"  # retried on the next cycle
+            else:
+                job.status = "failed"
+                job.completed_at = now()
diff --git a/app/workers/study_queue_consumer.py b/app/workers/study_queue_consumer.py new file mode 100644 index 0000000..6b565e7 --- /dev/null +++ b/app/workers/study_queue_consumer.py @@ -0,0 +1,108 @@ +"""Phase 4-A study_question_jobs consumer — APScheduler 1분 간격. + +study 도메인 전용 큐 (processing_queue 와 분리). 
BATCH_SIZE=1 — MLX 26B 가
+deep_summary 와 같은 Semaphore(1) 게이트를 공유하므로 GPU 부하 통제.
+
+stage 별 worker dispatch:
+  - kind='explanation': study_explanation_worker.run_explanation_job (Phase 4-A)
+  - kind='session_summary': Phase 4-B 예약, 1차 미구현 — pending 만 보면 즉시 skipped 처리
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+from sqlalchemy import select, update
+from sqlalchemy.exc import SQLAlchemyError
+
+from core.database import async_session
+from core.utils import setup_logger
+from models.study_question_job import StudyQuestionJob
+from workers.study_explanation_worker import run_explanation_job
+
+logger = setup_logger("study_queue_consumer")
+
+# One row per cycle — keeps MLX gate use serialized and GPU load predictable.
+BATCH_SIZE = 1
+# A job left in processing for STALE_MINUTES or longer is treated as orphaned by a dead worker and reset to pending.
+STALE_MINUTES = 10
+
+
+async def reset_stale_study_jobs() -> None:
+    """Reset jobs stuck in processing for STALE_MINUTES or longer back to pending.
+
+    The partial unique index covers (qid, kind) WHERE status IN ('pending','processing'),
+    so moving a row from processing to pending cannot collide with another active row of the same qid.
+    Runs as one bulk UPDATE; any SQLAlchemyError (IntegrityError included) is logged, not raised.
+    """
+    cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
+    try:
+        async with async_session() as session:
+            stmt = (
+                update(StudyQuestionJob)
+                .where(
+                    StudyQuestionJob.status == "processing",
+                    StudyQuestionJob.started_at.is_not(None),
+                    StudyQuestionJob.started_at < cutoff,
+                )
+                .values(status="pending", started_at=None)
+            )
+            result = await session.execute(stmt)
+            await session.commit()
+            n = result.rowcount or 0
+            if n > 0:
+                logger.warning("study_jobs_stale_reset count=%s", n)
+    except SQLAlchemyError as e:
+        logger.exception("study_jobs_stale_reset_failed: %s", e)
+
+
+async def consume_study_queue() -> None:
+    """APScheduler entry point: reset stale jobs, then process up to BATCH_SIZE pending jobs."""
+    await reset_stale_study_jobs()
+
+    async with async_session() as session:
+        rows = (
+            await session.execute(
+                select(StudyQuestionJob)
+                .where(StudyQuestionJob.status == "pending")
+                .order_by(StudyQuestionJob.id.asc())
+                .limit(BATCH_SIZE)
+            )
+        ).scalars().all()
+
+    for job_row in rows:
+        # A dedicated session per job — one job's failure cannot spill into another.
+        async with async_session() as s:
+            try:
+                # Reload the row in this session (no lock taken, session isolation).
+                job = await s.get(StudyQuestionJob, job_row.id)
+                if job is None or job.status != "pending":
+                    continue  # already handled by another cycle
+
+                if job.kind == "explanation":
+                    await run_explanation_job(s, job)
+                elif job.kind == "session_summary":
+                    # Not implemented until Phase 4-B — mark skipped at once so the row is not stranded in the queue.
+                    job.status = "skipped"
+                    job.error_code = "unknown"
+                    job.error_message = "session_summary not implemented in Phase 4-A"
+                    job.completed_at = datetime.now(timezone.utc)
+                    logger.info("study_job_unsupported_kind id=%s kind=%s", job.id, job.kind)
+                else:
+                    job.status = "skipped"
+                    job.error_code = "unknown"
+                    job.error_message = f"unknown kind: {job.kind!r}"
+                    job.completed_at = datetime.now(timezone.utc)
+                    logger.warning("study_job_unknown_kind id=%s kind=%s", job.id, job.kind)
+                # Snapshot the log fields first: a commit may expire ORM attributes (no lazy reload under asyncio).
+                logged = (job.id, job.study_question_id, job.kind,
+                          job.status, job.error_code, job.attempts)
+                await s.commit()
+                logger.info(
+                    "study_job_processed id=%s qid=%s kind=%s status=%s error_code=%s attempts=%s",
+                    *logged,
+                )
+            except Exception as e:
+                # Failure the worker did not absorb (rare: it has a catch-all) — roll back; a still-pending row is picked up next cycle.
+                await s.rollback()
+                logger.exception("study_job_outer_failed job_id=%s: %s", job_row.id, e)
diff --git a/migrations/231_study_question_jobs.sql b/migrations/231_study_question_jobs.sql new file mode 100644 index 0000000..f3db476 --- /dev/null +++ b/migrations/231_study_question_jobs.sql @@ -0,0 +1,24 @@ +-- 231_study_question_jobs.sql +-- Phase 4-A: study question 도메인 전용 비동기 작업 큐. 
+-- processing_queue has an FK to documents.id, so it cannot be reused directly for study_questions.
+--
+-- Lifecycle:
+--   pending → processing → completed | failed | skipped   (processing → pending on an in-budget retry or a stale reset)
+-- Terminal statuses (completed/failed/skipped) always record completed_at.
+-- A retry after failed does not revive the old row to pending; a new row is created, so history accumulates.
+
+CREATE TABLE study_question_jobs (
+    id BIGSERIAL PRIMARY KEY,
+    study_question_id BIGINT NOT NULL REFERENCES study_questions(id) ON DELETE CASCADE,
+    user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+    kind VARCHAR(40) NOT NULL,
+    status VARCHAR(20) NOT NULL DEFAULT 'pending',
+    attempts SMALLINT NOT NULL DEFAULT 0,
+    max_attempts SMALLINT NOT NULL DEFAULT 2,
+    error_code VARCHAR(40),
+    error_message TEXT,
+    payload JSONB,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+    started_at TIMESTAMPTZ,
+    completed_at TIMESTAMPTZ
+);
diff --git a/migrations/232_study_q_jobs_active_uq.sql b/migrations/232_study_q_jobs_active_uq.sql
new file mode 100644
index 0000000..9e28a91
--- /dev/null
+++ b/migrations/232_study_q_jobs_active_uq.sql
@@ -0,0 +1,8 @@
+-- 232_study_q_jobs_active_uq.sql
+-- Blocks duplicate active rows per (study_question_id, kind).
+-- Terminal statuses (completed/failed/skipped) are accumulated history and are not covered by the unique index.
+-- This lets a new row be inserted when the same qid is re-enqueued after a failure.
+
+CREATE UNIQUE INDEX uq_study_q_jobs_active
+    ON study_question_jobs (study_question_id, kind)
+    WHERE status IN ('pending', 'processing');