"""학습 워크스페이스 문제은행 + 복습모드 API. PR-2 가드레일: - study_topic 컨테이너에 자산 타입별 1:N (문제) 추가. polymorphic 단일 테이블 영구 금지. - subject/scope 강한 enum 미사용 (자유 텍스트, 자동완성은 후속 PR). - 문제 삭제는 soft delete only. attempts 는 RESTRICT FK 로 DB 레벨 보호. - correct_choice 변경 시 기존 attempts 재계산 안 함 (기록 = 시점 사실). - 복습 출제 시 정답·해설 응답 비공개 → 답 제출 시점에만 노출. - wrong_only=true 정의 = 가장 최근 attempt 가 오답인 문제. "한 번이라도 틀린 적 있는" 아님. """ import asyncio import logging import random from datetime import datetime, timezone from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field from sqlalchemy import and_, case, func, select, text as sql_text, update from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient from core.auth import get_current_user from core.database import get_session from models.study_question import StudyQuestion, StudyQuestionAttempt from models.study_topic import StudyTopic from models.user import User from services.search.llm_gate import get_mlx_gate from services.study.explanation_rag import ( EvidenceItem, gather_explanation_context, render_evidence_block, ) logger = logging.getLogger(__name__) router = APIRouter() # ─── Helpers ─── def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic: """study_topics.py 와 동일한 패턴 — 정보 누설 방지로 mismatch 도 404.""" if topic is None or topic.user_id != user.id or topic.deleted_at is not None: raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다") return topic def _verify_question_ownership(q: StudyQuestion | None, user: User) -> StudyQuestion: if q is None or q.user_id != user.id or q.deleted_at is not None: raise HTTPException(status_code=404, detail="문제를 찾을 수 없습니다") return q # ─── Pydantic 스키마 ─── class StudyQuestionCreate(BaseModel): question_text: str = Field(min_length=1) choice_1: str = Field(min_length=1) choice_2: str = Field(min_length=1) choice_3: str = Field(min_length=1) choice_4: str = Field(min_length=1) correct_choice: int = Field(ge=1, le=4) subject: str | None = Field(default=None, max_length=120) scope: str | None = Field(default=None, max_length=200) exam_name: str | None = Field(default=None, max_length=120) exam_round: str | None = Field(default=None, max_length=120) explanation: str | None = None source_note: str | None = None is_active: bool = True class StudyQuestionUpdate(BaseModel): question_text: str | None = Field(default=None, min_length=1) choice_1: str | None = Field(default=None, min_length=1) choice_2: str | None = Field(default=None, min_length=1) choice_3: str | None = Field(default=None, min_length=1) choice_4: str | None = Field(default=None, min_length=1) correct_choice: int | None = Field(default=None, ge=1, le=4) subject: str | None = Field(default=None, max_length=120) scope: str | None = Field(default=None, max_length=200) exam_name: str | None = Field(default=None, max_length=120) exam_round: str | None = Field(default=None, max_length=120) explanation: str | None = None source_note: str | None = None is_active: bool | None = None class QuestionAttemptStats(BaseModel): attempt_count: int correct_count: int wrong_count: int class StudyQuestionResponse(BaseModel): """편집·관리용 — 정답·해설 모두 노출.""" id: int study_topic_id: int question_text: str choice_1: str choice_2: str choice_3: str choice_4: str correct_choice: int subject: str | None scope: str | None exam_name: str | None exam_round: str | None explanation: str | None source_note: str | None is_active: bool # PR-3: AI 풀이 상태 (편집 화면에서 사용). 본문은 별도 GET /ai-explanation 으로 ai_explanation_status: str = "none" ai_explanation_generated_at: datetime | None = None ai_explanation_model: str | None = None created_at: datetime updated_at: datetime stats: QuestionAttemptStats class StudyQuestionSummary(BaseModel): """통합 뷰·목록용 — 본문 truncate, 정답 비공개.""" id: int question_text: str # 앞 80자 truncate (서버 측에서 처리) subject: str | None scope: str | None exam_name: str | None exam_round: str | None is_active: bool attempt_count: int last_correct: bool | None created_at: datetime class StudyQuestionListResponse(BaseModel): items: list[StudyQuestionSummary] total: int page: int page_size: int class ReviewChoice(BaseModel): number: int text: str class ReviewQuestionItem(BaseModel): """복습 출제 응답 — 정답·해설 비공개.""" id: int question_text: str choices: list[ReviewChoice] subject: str | None scope: str | None stats: QuestionAttemptStats class ReviewQuestionListResponse(BaseModel): items: list[ReviewQuestionItem] total: int target_per_subject: int subject_distribution: dict[str, int] # {"연소공학": 20, ...} class AttemptCreate(BaseModel): selected_choice: int = Field(ge=1, le=4) class AttemptResponse(BaseModel): is_correct: bool selected_choice: int correct_choice: int explanation: str | None stats: QuestionAttemptStats # ─── PR-3: AI 풀이 생성 ─── class AIExplanationCreate(BaseModel): """수동 트리거. regenerate=true 면 캐시 무시하고 새 본문 생성.""" regenerate: bool = False class AIExplanationEvidence(BaseModel): source_type: str # 'document' | 'question' source_id: int title: str snippet: str class AIExplanationResponse(BaseModel): ai_explanation: str | None ai_explanation_status: str # ready | stale | failed | none ai_explanation_generated_at: datetime | None ai_explanation_model: str | None evidence: list[AIExplanationEvidence] = [] from_cache: bool = False is_stale: bool = False can_regenerate: bool = True # ─── 통계 헬퍼 ─── async def _attempt_stats( session: AsyncSession, user_id: int, question_id: int ) -> QuestionAttemptStats: """단일 문제의 누적 attempt 통계 (사용자 한정).""" row = ( await session.execute( select( func.count().label("total"), func.coalesce( func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)), 0, ).label("correct"), ).where( StudyQuestionAttempt.user_id == user_id, StudyQuestionAttempt.study_question_id == question_id, ) ) ).one() total = int(row.total or 0) correct = int(row.correct or 0) return QuestionAttemptStats( attempt_count=total, correct_count=correct, wrong_count=total - correct ) def _truncate(text: str, n: int = 80) -> str: return text if len(text) <= n else text[:n].rstrip() + "…" # ─── 토픽 단위 엔드포인트 ─── @router.get( "/study-topics/{topic_id}/questions", response_model=StudyQuestionListResponse, ) async def list_questions_in_topic( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], subject: str | None = Query(None), scope: str | None = Query(None), page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), ): """토픽 내 문제 목록. soft-deleted 제외. summary 응답 (정답 비공개).""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) base = select(StudyQuestion).where( StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.deleted_at.is_(None), ) if subject is not None: base = base.where(StudyQuestion.subject == subject) if scope is not None: base = base.where(StudyQuestion.scope == scope) total = ( await session.execute(select(func.count()).select_from(base.subquery())) ).scalar() or 0 rows = ( await session.execute( base.order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc()) .offset((page - 1) * page_size) .limit(page_size) ) ).scalars().all() # 한 번의 쿼리로 attempt 통계 + 마지막 정/오답 끌어오기 (N+1 회피) qids = [q.id for q in rows] stats_map: dict[int, tuple[int, int]] = {} last_correct_map: dict[int, bool] = {} if qids: stats_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, func.count().label("total"), func.coalesce( func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)), 0, ).label("correct"), ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .group_by(StudyQuestionAttempt.study_question_id) ) ).all() for r in stats_rows: stats_map[r.study_question_id] = (int(r.total), int(r.correct)) latest_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.is_correct, StudyQuestionAttempt.answered_at, ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .order_by( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.answered_at.desc(), ) .distinct(StudyQuestionAttempt.study_question_id) ) ).all() for r in latest_rows: last_correct_map[r.study_question_id] = bool(r.is_correct) items = [ StudyQuestionSummary( id=q.id, question_text=_truncate(q.question_text, 80), subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, is_active=q.is_active, attempt_count=stats_map.get(q.id, (0, 0))[0], last_correct=last_correct_map.get(q.id), created_at=q.created_at, ) for q in rows ] return StudyQuestionListResponse(items=items, total=total, page=page, page_size=page_size) @router.post( "/study-topics/{topic_id}/questions", response_model=StudyQuestionResponse, status_code=201, ) async def create_question_in_topic( topic_id: int, body: StudyQuestionCreate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) q = StudyQuestion( user_id=user.id, study_topic_id=topic_id, question_text=body.question_text, choice_1=body.choice_1, choice_2=body.choice_2, choice_3=body.choice_3, choice_4=body.choice_4, correct_choice=body.correct_choice, subject=body.subject, scope=body.scope, exam_name=body.exam_name, exam_round=body.exam_round, explanation=body.explanation, source_note=body.source_note, is_active=body.is_active, ) session.add(q) await session.flush() await session.commit() stats = QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0) return StudyQuestionResponse( id=q.id, study_topic_id=q.study_topic_id, question_text=q.question_text, choice_1=q.choice_1, choice_2=q.choice_2, choice_3=q.choice_3, choice_4=q.choice_4, correct_choice=q.correct_choice, subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, explanation=q.explanation, source_note=q.source_note, is_active=q.is_active, ai_explanation_status=q.ai_explanation_status, ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, created_at=q.created_at, updated_at=q.updated_at, stats=stats, ) # ─── 복습 출제 ─── @router.get( "/study-topics/{topic_id}/review/questions", response_model=ReviewQuestionListResponse, ) async def review_questions_for_topic( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], subject: str | None = Query(None, description="단일 과목 집중. 미지정 시 과목별 균등 추출"), scope: str | None = Query(None), target_per_subject: int = Query(20, ge=1, le=100), wrong_only: bool = Query( False, description="가장 최근 attempt 가 오답인 문제만 (latest-wrong, ever_wrong 아님)", ), ): """복습 출제. default = 과목별 target_per_subject 무작위 균등 추출. 응답에서 정답·해설은 비공개 — 답 제출(POST /api/study-questions/{id}/attempt) 시점에만 노출. """ topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) # 후보 질문 base — soft delete 제외 + is_active base_filter = and_( StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.deleted_at.is_(None), StudyQuestion.is_active.is_(True), ) # wrong_only=true: latest attempt 가 오답인 question_id 만 후보로 좁힘. # DISTINCT ON (study_question_id) ORDER BY answered_at DESC 패턴. wrong_qids: set[int] | None = None if wrong_only: latest = ( await session.execute( select( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.is_correct, ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_topic_id == topic_id, ) .order_by( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.answered_at.desc(), ) .distinct(StudyQuestionAttempt.study_question_id) ) ).all() wrong_qids = {r.study_question_id for r in latest if r.is_correct is False} if not wrong_qids: return ReviewQuestionListResponse( items=[], total=0, target_per_subject=target_per_subject, subject_distribution={} ) # 과목 그룹 결정 if subject is not None: # 단일 과목 집중 subjects: list[str | None] = [subject] else: subj_query = ( select(StudyQuestion.subject) .where(base_filter) .distinct() ) if scope is not None: subj_query = subj_query.where(StudyQuestion.scope == scope) if wrong_qids is not None: subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids)) subjects = [r[0] for r in (await session.execute(subj_query)).all()] if not subjects: return ReviewQuestionListResponse( items=[], total=0, target_per_subject=target_per_subject, subject_distribution={} ) # 각 subject 별로 ORDER BY random() LIMIT target_per_subject selected: list[StudyQuestion] = [] distribution: dict[str, int] = {} for subj in subjects: sub_q = select(StudyQuestion).where(base_filter) if subj is None: sub_q = sub_q.where(StudyQuestion.subject.is_(None)) else: sub_q = sub_q.where(StudyQuestion.subject == subj) if scope is not None: sub_q = sub_q.where(StudyQuestion.scope == scope) if wrong_qids is not None: sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids)) sub_q = sub_q.order_by(func.random()).limit(target_per_subject) rows = (await session.execute(sub_q)).scalars().all() if rows: selected.extend(rows) distribution[subj or ""] = len(rows) # 그룹별 추출 후 전체 셔플 (입력 순서 단조 회피) random.shuffle(selected) # attempt 통계 batch (N+1 회피) qids = [q.id for q in selected] stats_map: dict[int, QuestionAttemptStats] = {} if qids: stats_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, func.count().label("total"), func.coalesce( func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)), 0, ).label("correct"), ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .group_by(StudyQuestionAttempt.study_question_id) ) ).all() for r in stats_rows: t = int(r.total) c = int(r.correct) stats_map[r.study_question_id] = QuestionAttemptStats( attempt_count=t, correct_count=c, wrong_count=t - c ) items = [ ReviewQuestionItem( id=q.id, question_text=q.question_text, choices=[ ReviewChoice(number=1, text=q.choice_1), ReviewChoice(number=2, text=q.choice_2), ReviewChoice(number=3, text=q.choice_3), ReviewChoice(number=4, text=q.choice_4), ], subject=q.subject, scope=q.scope, stats=stats_map.get( q.id, QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0) ), ) for q in selected ] return ReviewQuestionListResponse( items=items, total=len(items), target_per_subject=target_per_subject, subject_distribution=distribution, ) # ─── 단건 엔드포인트 ─── @router.get("/study-questions/{question_id}", response_model=StudyQuestionResponse) async def get_question( question_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) stats = await _attempt_stats(session, user.id, question_id) return StudyQuestionResponse( id=q.id, study_topic_id=q.study_topic_id, question_text=q.question_text, choice_1=q.choice_1, choice_2=q.choice_2, choice_3=q.choice_3, choice_4=q.choice_4, correct_choice=q.correct_choice, subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, explanation=q.explanation, source_note=q.source_note, is_active=q.is_active, ai_explanation_status=q.ai_explanation_status, ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, created_at=q.created_at, updated_at=q.updated_at, stats=stats, ) @router.patch("/study-questions/{question_id}", response_model=StudyQuestionResponse) async def update_question( question_id: int, body: StudyQuestionUpdate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """부분 업데이트. correct_choice 변경 시 기존 attempts.is_correct 재계산 안 함 (시점 사실 보존). 프론트 편집 페이지에 안내 배너 노출.""" q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) fields_set = body.model_fields_set SIMPLE_FIELDS = { "question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice", "subject", "scope", "exam_name", "exam_round", "explanation", "source_note", "is_active", } for fname in SIMPLE_FIELDS & fields_set: setattr(q, fname, getattr(body, fname)) # PR-3: 문제 핵심 필드 변경 시 AI 해설 stale 전이 (본문은 보존, UI 배지로 안내). # ready 상태에서만 stale 로 전이 — pending/failed/none/stale 은 변경 안 함. AI_STALE_TRIGGER = { "question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice", } if AI_STALE_TRIGGER & fields_set and q.ai_explanation_status == "ready": q.ai_explanation_status = "stale" # PR-4: 임베딩 stale 전이. 본문(question_text/choice_*)이 바뀌었을 때만 재계산. # correct_choice 변경은 의미 검색에 영향 없으므로 재계산 안 함. EMBED_STALE_TRIGGER = { "question_text", "choice_1", "choice_2", "choice_3", "choice_4", } if EMBED_STALE_TRIGGER & fields_set and q.embedding_status == "ready": q.embedding_status = "stale" q.updated_at = datetime.now(timezone.utc) await session.commit() stats = await _attempt_stats(session, user.id, question_id) return StudyQuestionResponse( id=q.id, study_topic_id=q.study_topic_id, question_text=q.question_text, choice_1=q.choice_1, choice_2=q.choice_2, choice_3=q.choice_3, choice_4=q.choice_4, correct_choice=q.correct_choice, subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, explanation=q.explanation, source_note=q.source_note, is_active=q.is_active, ai_explanation_status=q.ai_explanation_status, ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, created_at=q.created_at, updated_at=q.updated_at, stats=stats, ) @router.delete("/study-questions/{question_id}", status_code=204) async def soft_delete_question( question_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """soft delete only. attempts 는 RESTRICT FK 로 보호되어 영구 보존. hard delete 호출 경로 자체가 없음 — DB 레벨에서도 attempts 가 있으면 거부.""" q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) q.deleted_at = datetime.now(timezone.utc) q.updated_at = q.deleted_at await session.commit() # ─── attempt ─── @router.post("/study-questions/{question_id}/attempt", response_model=AttemptResponse) async def submit_attempt( question_id: int, body: AttemptCreate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """답 제출. is_correct 판정 + attempt 1행 insert + 누적 통계 + 정답·해설 노출.""" q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) is_correct = body.selected_choice == q.correct_choice attempt = StudyQuestionAttempt( user_id=user.id, study_question_id=q.id, study_topic_id=q.study_topic_id, selected_choice=body.selected_choice, correct_choice=q.correct_choice, is_correct=is_correct, ) session.add(attempt) await session.commit() stats = await _attempt_stats(session, user.id, question_id) return AttemptResponse( is_correct=is_correct, selected_choice=body.selected_choice, correct_choice=q.correct_choice, explanation=q.explanation, stats=stats, ) # ─── PR-3: AI 풀이 생성 엔드포인트 ─── # MLX 호출 timeout (초). MLX gate + 26B 추론 평균 ~10s, 안전 마진. LLM_TIMEOUT_S = 30.0 # 프롬프트 템플릿 lazy load _PROMPT_PATH = "study_question_explanation.txt" _prompt_cache: str | None = None def _load_explanation_prompt() -> str: global _prompt_cache if _prompt_cache is None: from pathlib import Path prompts_dir = Path(__file__).resolve().parent.parent / "prompts" _prompt_cache = (prompts_dir / _PROMPT_PATH).read_text(encoding="utf-8") return _prompt_cache def _render_prompt(q: StudyQuestion, doc_block: str, question_block: str) -> str: template = _load_explanation_prompt() return ( template .replace("{question_text}", q.question_text) .replace("{choice_1}", q.choice_1) .replace("{choice_2}", q.choice_2) .replace("{choice_3}", q.choice_3) .replace("{choice_4}", q.choice_4) .replace("{correct_choice}", str(q.correct_choice)) .replace("{documents_evidence_block}", doc_block) .replace("{questions_evidence_block}", question_block) ) def _make_cache_response(q: StudyQuestion, is_stale: bool) -> AIExplanationResponse: """캐시 본문만 반환 (evidence 재계산 안 함). status/메타는 q row 그대로.""" return AIExplanationResponse( ai_explanation=q.ai_explanation, ai_explanation_status=q.ai_explanation_status, ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, evidence=[], from_cache=True, is_stale=is_stale, can_regenerate=True, ) @router.post( "/study-questions/{question_id}/ai-explanation", response_model=AIExplanationResponse, ) async def generate_ai_explanation( question_id: int, body: AIExplanationCreate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """수동 트리거 AI 풀이 생성. RAG = 매핑 자료 + 같은 토픽 다른 문제 (자기 자신 제외). primary 모델 단독 (Mac mini MLX gemma-4-26b). MLX gate Semaphore(1) 경유. - regenerate=false 이고 status=ready → 캐시 즉시 반환 (MLX 호출 없음) - regenerate=false 이고 status=stale → 캐시 본문 반환, is_stale=true 플래그 - status=pending → 409 (이미 다른 호출 진행 중, race-safe 조건부 UPDATE 로 보장) - 그 외 (none/failed/regenerate=true) → 새로 생성 실패 시 status='failed', 직전 본문은 보존. """ q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) # 1) 캐시 단축 분기 (regenerate=false) if not body.regenerate: if q.ai_explanation_status == "ready": return _make_cache_response(q, is_stale=False) if q.ai_explanation_status == "stale": return _make_cache_response(q, is_stale=True) if q.ai_explanation_status == "pending": raise HTTPException( status_code=409, detail={"status": "pending", "detail": "이미 생성 중입니다"}, ) # status in {none, failed} → 신규 생성 else: # regenerate=true 라도 pending 은 차단 (동시 호출 방어) if q.ai_explanation_status == "pending": raise HTTPException( status_code=409, detail={"status": "pending", "detail": "이미 생성 중입니다"}, ) # 2) Race-safe pending 전이 (조건부 UPDATE — 동시 호출 차단) lock_result = await session.execute( update(StudyQuestion) .where( StudyQuestion.id == question_id, StudyQuestion.user_id == user.id, StudyQuestion.ai_explanation_status != "pending", ) .values(ai_explanation_status="pending", updated_at=datetime.now(timezone.utc)) .returning(StudyQuestion.id) ) if lock_result.scalar_one_or_none() is None: # 다른 호출이 먼저 pending 박음 raise HTTPException( status_code=409, detail={"status": "pending", "detail": "이미 생성 중입니다"}, ) await session.commit() await session.refresh(q) # 3) RAG 근거 수집 try: ctx = await gather_explanation_context(session, user.id, q) except Exception as e: logger.warning("study_explanation_rag_failed: %s: %s", type(e).__name__, e) # RAG 실패 시 빈 evidence 로 진행 (프롬프트가 "자료 근거 부족" 분기 처리) from services.study.explanation_rag import ExplanationContext ctx = ExplanationContext(documents=[], questions=[]) # 4) 프롬프트 조립 + MLX gate 안에서 primary 호출 + timeout doc_block = render_evidence_block(ctx.documents) q_block = render_evidence_block(ctx.questions) prompt = _render_prompt(q, doc_block, q_block) ai_client = AIClient() raw_text: str | None = None error_message: str | None = None try: async with get_mlx_gate(): async with asyncio.timeout(LLM_TIMEOUT_S): raw_text = await ai_client.call_primary(prompt) except asyncio.TimeoutError: error_message = f"MLX timeout ({LLM_TIMEOUT_S}s)" logger.warning("study_explanation_mlx_timeout qid=%s", question_id) except Exception as e: error_message = f"{type(e).__name__}: {e}" logger.exception("study_explanation_mlx_failed qid=%s", question_id) finally: await ai_client.close() # 5) 결과 저장 (성공/실패 분기) — q 객체 fresh refresh q = await session.get(StudyQuestion, question_id) if raw_text is None or not raw_text.strip(): # 실패 — 직전 본문 보존, status='failed' q.ai_explanation_status = "failed" q.updated_at = datetime.now(timezone.utc) await session.commit() return AIExplanationResponse( ai_explanation=q.ai_explanation, ai_explanation_status="failed", ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, evidence=[e.to_dict() for e in ctx.all], from_cache=False, is_stale=False, can_regenerate=True, ) # 성공 — Qwen think 태그 등 제거 from ai.client import strip_thinking cleaned = strip_thinking(raw_text).strip() q.ai_explanation = cleaned q.ai_explanation_status = "ready" q.ai_explanation_generated_at = datetime.now(timezone.utc) primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary" q.ai_explanation_model = f"mlx:{primary_name}" q.updated_at = q.ai_explanation_generated_at await session.commit() return AIExplanationResponse( ai_explanation=q.ai_explanation, ai_explanation_status="ready", ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, evidence=[e.to_dict() for e in ctx.all], from_cache=False, is_stale=False, can_regenerate=True, )