"""학습 워크스페이스 문제은행 + 복습모드 API. PR-2 가드레일: - study_topic 컨테이너에 자산 타입별 1:N (문제) 추가. polymorphic 단일 테이블 영구 금지. - subject/scope 강한 enum 미사용 (자유 텍스트, 자동완성은 후속 PR). - 문제 삭제는 soft delete only. attempts 는 RESTRICT FK 로 DB 레벨 보호. - correct_choice 변경 시 기존 attempts 재계산 안 함 (기록 = 시점 사실). - 복습 출제 시 정답·해설 응답 비공개 → 답 제출 시점에만 노출. - wrong_only=true 정의 = 가장 최근 attempt 가 오답인 문제. "한 번이라도 틀린 적 있는" 아님. """ import asyncio import logging import random from datetime import datetime, timezone from typing import Annotated from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile from fastapi.responses import FileResponse from pydantic import BaseModel, Field from sqlalchemy import and_, case, func, select, text as sql_text, update from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient from core.auth import get_current_user from core.config import settings from core.database import get_session from models.study_question import StudyQuestion, StudyQuestionAttempt from models.study_question_image import StudyQuestionImage from models.study_quiz_session import StudyQuizSession from models.study_topic import StudyTopic from models.user import User from services.search.llm_gate import get_mlx_gate from services.study.explanation_rag import ( EvidenceItem, gather_explanation_context, render_evidence_block, ) logger = logging.getLogger(__name__) router = APIRouter() # ─── Helpers ─── def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic: """study_topics.py 와 동일한 패턴 — 정보 누설 방지로 mismatch 도 404.""" if topic is None or topic.user_id != user.id or topic.deleted_at is not None: raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다") return topic def _verify_question_ownership(q: StudyQuestion | None, user: User) -> StudyQuestion: if q is None or q.user_id != user.id or q.deleted_at is not None: raise HTTPException(status_code=404, detail="문제를 찾을 수 없습니다") return q # ─── Pydantic 스키마 ─── class StudyQuestionCreate(BaseModel): 
question_text: str = Field(min_length=1) choice_1: str = Field(min_length=1) choice_2: str = Field(min_length=1) choice_3: str = Field(min_length=1) choice_4: str = Field(min_length=1) correct_choice: int = Field(ge=1, le=4) subject: str | None = Field(default=None, max_length=120) scope: str | None = Field(default=None, max_length=200) exam_name: str | None = Field(default=None, max_length=120) exam_round: str | None = Field(default=None, max_length=120) # PR-6: 회차 안 문항 번호. 미명시 + exam_round 명시 시 서버가 max+1 자동 채움. exam_question_number: int | None = Field(default=None, ge=1) explanation: str | None = None source_note: str | None = None is_active: bool = True class StudyQuestionUpdate(BaseModel): question_text: str | None = Field(default=None, min_length=1) choice_1: str | None = Field(default=None, min_length=1) choice_2: str | None = Field(default=None, min_length=1) choice_3: str | None = Field(default=None, min_length=1) choice_4: str | None = Field(default=None, min_length=1) correct_choice: int | None = Field(default=None, ge=1, le=4) subject: str | None = Field(default=None, max_length=120) scope: str | None = Field(default=None, max_length=200) exam_name: str | None = Field(default=None, max_length=120) exam_round: str | None = Field(default=None, max_length=120) exam_question_number: int | None = Field(default=None, ge=1) explanation: str | None = None source_note: str | None = None is_active: bool | None = None class QuestionAttemptStats(BaseModel): attempt_count: int correct_count: int wrong_count: int class StudyQuestionImageItem(BaseModel): """문제 첨부 이미지 메타. 
raw bytes 는 별도 GET endpoint.""" id: int sort_order: int mime_type: str file_size: int served_url: str # 클라이언트 표시용 — /api/study-questions/{qid}/images/{img_id}/raw class StudyQuestionResponse(BaseModel): """편집·관리용 — 정답·해설 모두 노출.""" id: int study_topic_id: int question_text: str choice_1: str choice_2: str choice_3: str choice_4: str correct_choice: int subject: str | None scope: str | None exam_name: str | None exam_round: str | None explanation: str | None source_note: str | None is_active: bool # PR-6: 회차 안 문항 번호 exam_question_number: int | None = None # PR-3: AI 풀이 상태 (편집 화면에서 사용). 본문은 별도 GET /ai-explanation 으로 ai_explanation_status: str = "none" ai_explanation_generated_at: datetime | None = None ai_explanation_model: str | None = None # PR-8: 첨부 이미지 images: list[StudyQuestionImageItem] = [] created_at: datetime updated_at: datetime stats: QuestionAttemptStats class StudyQuestionSummary(BaseModel): """통합 뷰·목록용 — 본문 truncate, 정답 비공개.""" id: int question_text: str # 앞 80자 truncate (서버 측에서 처리) subject: str | None scope: str | None exam_name: str | None exam_round: str | None exam_question_number: int | None = None # PR-11/12 보기 페이지 prev/next 용 is_active: bool has_images: bool = False attempt_count: int last_correct: bool | None created_at: datetime class StudyQuestionListResponse(BaseModel): items: list[StudyQuestionSummary] total: int page: int page_size: int class ReviewChoice(BaseModel): number: int text: str class ReviewQuestionItem(BaseModel): """복습 출제 응답 — 정답·해설 비공개.""" id: int question_text: str choices: list[ReviewChoice] subject: str | None scope: str | None stats: QuestionAttemptStats images: list[StudyQuestionImageItem] = [] class ReviewQuestionListResponse(BaseModel): items: list[ReviewQuestionItem] total: int target_per_subject: int subject_distribution: dict[str, int] # {"연소공학": 20, ...} class AttemptCreate(BaseModel): """PR-9: selected_choice (1~4) 또는 is_unsure 둘 중 하나 필수. is_unsure=true 면 selected_choice 무시 + outcome='unsure' 로 박힘. 
PR-10: quiz_session_id 전달 시 세션의 cursor + count 도 함께 갱신. """ selected_choice: int | None = Field(default=None, ge=1, le=4) is_unsure: bool = False quiz_session_id: int | None = None class AttemptResponse(BaseModel): id: int # PR-10: 학습완료 토글 시 attempt 단위로 PATCH 하므로 id 노출 필요. is_correct: bool selected_choice: int | None correct_choice: int outcome: str # PR-9: correct | wrong | unsure explanation: str | None stats: QuestionAttemptStats quiz_session_id: int | None = None quiz_session_status: str | None = None # in_progress | done (마지막 문제면 done 으로 박힘) quiz_session_cursor: int | None = None reviewed_at: datetime | None = None # ─── PR-5: 비슷한 문제 검색 (embedding cosine) ─── class SimilarQuestionItem(BaseModel): id: int study_topic_id: int question_text: str # 80자 truncate subject: str | None scope: str | None exam_name: str | None exam_round: str | None similarity: float # 1 - cosine_distance, 1.0=동일, 0.0=무관 attempt_count: int last_correct: bool | None class SimilarQuestionsResponse(BaseModel): items: list[SimilarQuestionItem] source_status: str # 현재 문제의 embedding_status. 'ready' 가 아니면 빈 결과. source_id: int # ─── PR-3: AI 풀이 생성 ─── class AIExplanationCreate(BaseModel): """수동 트리거. 
regenerate=true 면 캐시 무시하고 새 본문 생성.""" regenerate: bool = False class AIExplanationEvidence(BaseModel): source_type: str # 'document' | 'question' source_id: int title: str snippet: str class AIExplanationResponse(BaseModel): ai_explanation: str | None ai_explanation_status: str # ready | stale | failed | none ai_explanation_generated_at: datetime | None ai_explanation_model: str | None evidence: list[AIExplanationEvidence] = [] from_cache: bool = False is_stale: bool = False can_regenerate: bool = True # ─── 통계 헬퍼 ─── # PR-8: 이미지 저장 root + 헬퍼 from pathlib import Path as _Path STUDY_IMG_ROOT = _Path(settings.nas_mount_path) / "study_question_images" ALLOWED_IMAGE_MIME = {"image/png", "image/jpeg", "image/webp", "image/gif"} MAX_IMAGE_BYTES = 10 * 1024 * 1024 # 10MB / 파일 def _image_url(qid: int, img_id: int) -> str: return f"/api/study-questions/{qid}/images/{img_id}/raw" def _image_to_item(img: StudyQuestionImage) -> StudyQuestionImageItem: return StudyQuestionImageItem( id=img.id, sort_order=img.sort_order, mime_type=img.mime_type, file_size=img.file_size, served_url=_image_url(img.study_question_id, img.id), ) async def _images_for_question( session: AsyncSession, question_id: int ) -> list[StudyQuestionImageItem]: rows = ( await session.execute( select(StudyQuestionImage) .where(StudyQuestionImage.study_question_id == question_id) .order_by(StudyQuestionImage.sort_order.asc(), StudyQuestionImage.id.asc()) ) ).scalars().all() return [_image_to_item(r) for r in rows] async def _images_for_questions_batch( session: AsyncSession, question_ids: list[int] ) -> dict[int, list[StudyQuestionImageItem]]: """N+1 회피 — 여러 question 의 이미지 한 번에 조회.""" if not question_ids: return {} rows = ( await session.execute( select(StudyQuestionImage) .where(StudyQuestionImage.study_question_id.in_(question_ids)) .order_by( StudyQuestionImage.study_question_id, StudyQuestionImage.sort_order.asc(), StudyQuestionImage.id.asc(), ) ) ).scalars().all() out: dict[int, 
list[StudyQuestionImageItem]] = {qid: [] for qid in question_ids} for r in rows: out.setdefault(r.study_question_id, []).append(_image_to_item(r)) return out async def _attempt_stats( session: AsyncSession, user_id: int, question_id: int ) -> QuestionAttemptStats: """단일 문제의 누적 attempt 통계 (사용자 한정).""" row = ( await session.execute( select( func.count().label("total"), func.coalesce( func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)), 0, ).label("correct"), ).where( StudyQuestionAttempt.user_id == user_id, StudyQuestionAttempt.study_question_id == question_id, ) ) ).one() total = int(row.total or 0) correct = int(row.correct or 0) return QuestionAttemptStats( attempt_count=total, correct_count=correct, wrong_count=total - correct ) def _truncate(text: str, n: int = 80) -> str: return text if len(text) <= n else text[:n].rstrip() + "…" # ─── 토픽 단위 엔드포인트 ─── @router.get( "/study-topics/{topic_id}/questions", response_model=StudyQuestionListResponse, ) async def list_questions_in_topic( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], subject: str | None = Query(None), scope: str | None = Query(None), exam_round: str | None = Query(None, description="회차 필터 — 보기 페이지 prev/next 같은 단일 회차 한정 조회"), page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), ): """토픽 내 문제 목록. soft-deleted 제외. 
summary 응답 (정답 비공개).""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) base = select(StudyQuestion).where( StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.deleted_at.is_(None), ) if subject is not None: base = base.where(StudyQuestion.subject == subject) if scope is not None: base = base.where(StudyQuestion.scope == scope) if exam_round is not None: base = base.where(StudyQuestion.exam_round == exam_round) total = ( await session.execute(select(func.count()).select_from(base.subquery())) ).scalar() or 0 # 정렬: exam_round 필터 시 회차 안 prev/next 안정화 (exam_question_number asc NULLS LAST, # created_at asc fallback). 미필터 시 기존 created_at desc 유지. if exam_round is not None: order_clause = ( StudyQuestion.exam_question_number.asc().nulls_last(), StudyQuestion.created_at.asc(), StudyQuestion.id.asc(), ) else: order_clause = (StudyQuestion.created_at.desc(), StudyQuestion.id.desc()) rows = ( await session.execute( base.order_by(*order_clause) .offset((page - 1) * page_size) .limit(page_size) ) ).scalars().all() # 한 번의 쿼리로 attempt 통계 + 마지막 정/오답 끌어오기 (N+1 회피) qids = [q.id for q in rows] stats_map: dict[int, tuple[int, int]] = {} last_correct_map: dict[int, bool] = {} if qids: stats_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, func.count().label("total"), func.coalesce( func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)), 0, ).label("correct"), ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .group_by(StudyQuestionAttempt.study_question_id) ) ).all() for r in stats_rows: stats_map[r.study_question_id] = (int(r.total), int(r.correct)) latest_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.is_correct, StudyQuestionAttempt.answered_at, ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) 
.order_by( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.answered_at.desc(), ) .distinct(StudyQuestionAttempt.study_question_id) ) ).all() for r in latest_rows: last_correct_map[r.study_question_id] = bool(r.is_correct) # PR-8: has_images batch has_image_set: set[int] = set() if qids: img_rows = ( await session.execute( select(StudyQuestionImage.study_question_id) .where(StudyQuestionImage.study_question_id.in_(qids)) .distinct() ) ).scalars().all() has_image_set = set(img_rows) items = [ StudyQuestionSummary( id=q.id, question_text=_truncate(q.question_text, 80), subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, exam_question_number=q.exam_question_number, is_active=q.is_active, attempt_count=stats_map.get(q.id, (0, 0))[0], last_correct=last_correct_map.get(q.id), has_images=q.id in has_image_set, created_at=q.created_at, ) for q in rows ] return StudyQuestionListResponse(items=items, total=total, page=page, page_size=page_size) @router.post( "/study-topics/{topic_id}/questions", response_model=StudyQuestionResponse, status_code=201, ) async def create_question_in_topic( topic_id: int, body: StudyQuestionCreate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) # PR-6: exam_question_number 미명시 + exam_round 명시 시 서버가 max+1 자동 채움. # 개인 사용 환경 race 부담 적어 단순 SELECT max 사용. 동시 입력 빈번해지면 향후 # SELECT FOR UPDATE 또는 토픽 단위 advisory lock 으로 강화 검토. 
qnum = body.exam_question_number if qnum is None and body.exam_round: max_row = await session.execute( select(func.coalesce(func.max(StudyQuestion.exam_question_number), 0)) .where( StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.exam_round == body.exam_round, StudyQuestion.deleted_at.is_(None), ) ) qnum = int(max_row.scalar() or 0) + 1 q = StudyQuestion( user_id=user.id, study_topic_id=topic_id, question_text=body.question_text, choice_1=body.choice_1, choice_2=body.choice_2, choice_3=body.choice_3, choice_4=body.choice_4, correct_choice=body.correct_choice, subject=body.subject, scope=body.scope, exam_name=body.exam_name, exam_round=body.exam_round, exam_question_number=qnum, explanation=body.explanation, source_note=body.source_note, is_active=body.is_active, ) session.add(q) await session.flush() await session.commit() stats = QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0) return StudyQuestionResponse( id=q.id, study_topic_id=q.study_topic_id, question_text=q.question_text, choice_1=q.choice_1, choice_2=q.choice_2, choice_3=q.choice_3, choice_4=q.choice_4, correct_choice=q.correct_choice, subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, explanation=q.explanation, source_note=q.source_note, is_active=q.is_active, exam_question_number=q.exam_question_number, ai_explanation_status=q.ai_explanation_status, ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, images=await _images_for_question(session, q.id), created_at=q.created_at, updated_at=q.updated_at, stats=stats, ) # ─── 복습 출제 ─── @router.get( "/study-topics/{topic_id}/review/questions", response_model=ReviewQuestionListResponse, ) async def review_questions_for_topic( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], subject: str | None = Query(None, description="단일 과목 집중. 
미지정 시 과목별 균등 추출"), scope: str | None = Query(None), target_per_subject: int = Query(20, ge=1, le=100), wrong_only: bool = Query( False, description="가장 최근 attempt 가 오답인 문제만 (latest-wrong, ever_wrong 아님)", ), ): """복습 출제. default = 과목별 target_per_subject 무작위 균등 추출. 응답에서 정답·해설은 비공개 — 답 제출(POST /api/study-questions/{id}/attempt) 시점에만 노출. """ topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) # 후보 질문 base — soft delete 제외 + is_active base_filter = and_( StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.deleted_at.is_(None), StudyQuestion.is_active.is_(True), ) # wrong_only=true: latest attempt 가 오답인 question_id 만 후보로 좁힘. # DISTINCT ON (study_question_id) ORDER BY answered_at DESC 패턴. wrong_qids: set[int] | None = None if wrong_only: latest = ( await session.execute( select( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.is_correct, ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_topic_id == topic_id, ) .order_by( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.answered_at.desc(), ) .distinct(StudyQuestionAttempt.study_question_id) ) ).all() wrong_qids = {r.study_question_id for r in latest if r.is_correct is False} if not wrong_qids: return ReviewQuestionListResponse( items=[], total=0, target_per_subject=target_per_subject, subject_distribution={} ) # 과목 그룹 결정 if subject is not None: # 단일 과목 집중 subjects: list[str | None] = [subject] else: subj_query = ( select(StudyQuestion.subject) .where(base_filter) .distinct() ) if scope is not None: subj_query = subj_query.where(StudyQuestion.scope == scope) if wrong_qids is not None: subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids)) subjects = [r[0] for r in (await session.execute(subj_query)).all()] if not subjects: return ReviewQuestionListResponse( items=[], total=0, target_per_subject=target_per_subject, subject_distribution={} ) # 각 subject 별로 ORDER BY random() LIMIT 
target_per_subject selected: list[StudyQuestion] = [] distribution: dict[str, int] = {} for subj in subjects: sub_q = select(StudyQuestion).where(base_filter) if subj is None: sub_q = sub_q.where(StudyQuestion.subject.is_(None)) else: sub_q = sub_q.where(StudyQuestion.subject == subj) if scope is not None: sub_q = sub_q.where(StudyQuestion.scope == scope) if wrong_qids is not None: sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids)) sub_q = sub_q.order_by(func.random()).limit(target_per_subject) rows = (await session.execute(sub_q)).scalars().all() if rows: selected.extend(rows) distribution[subj or ""] = len(rows) # 그룹별 추출 후 전체 셔플 (입력 순서 단조 회피) random.shuffle(selected) # attempt 통계 batch (N+1 회피) qids = [q.id for q in selected] stats_map: dict[int, QuestionAttemptStats] = {} if qids: stats_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, func.count().label("total"), func.coalesce( func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)), 0, ).label("correct"), ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .group_by(StudyQuestionAttempt.study_question_id) ) ).all() for r in stats_rows: t = int(r.total) c = int(r.correct) stats_map[r.study_question_id] = QuestionAttemptStats( attempt_count=t, correct_count=c, wrong_count=t - c ) # PR-8: 이미지 batch images_map = await _images_for_questions_batch(session, qids) items = [ ReviewQuestionItem( id=q.id, question_text=q.question_text, choices=[ ReviewChoice(number=1, text=q.choice_1), ReviewChoice(number=2, text=q.choice_2), ReviewChoice(number=3, text=q.choice_3), ReviewChoice(number=4, text=q.choice_4), ], subject=q.subject, scope=q.scope, stats=stats_map.get( q.id, QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0) ), images=images_map.get(q.id, []), ) for q in selected ] return ReviewQuestionListResponse( items=items, total=len(items), target_per_subject=target_per_subject, 
subject_distribution=distribution, ) # ─── 단건 엔드포인트 ─── @router.get("/study-questions/{question_id}", response_model=StudyQuestionResponse) async def get_question( question_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) stats = await _attempt_stats(session, user.id, question_id) return StudyQuestionResponse( id=q.id, study_topic_id=q.study_topic_id, question_text=q.question_text, choice_1=q.choice_1, choice_2=q.choice_2, choice_3=q.choice_3, choice_4=q.choice_4, correct_choice=q.correct_choice, subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, explanation=q.explanation, source_note=q.source_note, is_active=q.is_active, exam_question_number=q.exam_question_number, ai_explanation_status=q.ai_explanation_status, ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, images=await _images_for_question(session, q.id), created_at=q.created_at, updated_at=q.updated_at, stats=stats, ) @router.patch("/study-questions/{question_id}", response_model=StudyQuestionResponse) async def update_question( question_id: int, body: StudyQuestionUpdate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """부분 업데이트. correct_choice 변경 시 기존 attempts.is_correct 재계산 안 함 (시점 사실 보존). 프론트 편집 페이지에 안내 배너 노출.""" q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) fields_set = body.model_fields_set SIMPLE_FIELDS = { "question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice", "subject", "scope", "exam_name", "exam_round", "exam_question_number", "explanation", "source_note", "is_active", } for fname in SIMPLE_FIELDS & fields_set: setattr(q, fname, getattr(body, fname)) # PR-3: 문제 핵심 필드 변경 시 AI 해설 stale 전이 (본문은 보존, UI 배지로 안내). 
# ready 상태에서만 stale 로 전이 — pending/failed/none/stale 은 변경 안 함. AI_STALE_TRIGGER = { "question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice", } if AI_STALE_TRIGGER & fields_set and q.ai_explanation_status == "ready": q.ai_explanation_status = "stale" # PR-4: 임베딩 stale 전이. 본문(question_text/choice_*)이 바뀌었을 때만 재계산. # correct_choice 변경은 의미 검색에 영향 없으므로 재계산 안 함. EMBED_STALE_TRIGGER = { "question_text", "choice_1", "choice_2", "choice_3", "choice_4", } if EMBED_STALE_TRIGGER & fields_set and q.embedding_status == "ready": q.embedding_status = "stale" # PR-12-A 후속: related 캐시 stale 전이. # 임베딩 본문 변경 (위 EMBED_STALE_TRIGGER) + exam_round 변경 시. # exam_round 는 related-types 의 회차 필터 조건이라 본인 round 바뀌면 candidate 재선정 필요. RELATED_STALE_TRIGGER = EMBED_STALE_TRIGGER | {"exam_round"} if RELATED_STALE_TRIGGER & fields_set and q.related_computed_at is not None: q.related_computed_at = None q.updated_at = datetime.now(timezone.utc) await session.commit() stats = await _attempt_stats(session, user.id, question_id) return StudyQuestionResponse( id=q.id, study_topic_id=q.study_topic_id, question_text=q.question_text, choice_1=q.choice_1, choice_2=q.choice_2, choice_3=q.choice_3, choice_4=q.choice_4, correct_choice=q.correct_choice, subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, explanation=q.explanation, source_note=q.source_note, is_active=q.is_active, exam_question_number=q.exam_question_number, ai_explanation_status=q.ai_explanation_status, ai_explanation_generated_at=q.ai_explanation_generated_at, ai_explanation_model=q.ai_explanation_model, images=await _images_for_question(session, q.id), created_at=q.created_at, updated_at=q.updated_at, stats=stats, ) @router.delete("/study-questions/{question_id}", status_code=204) async def soft_delete_question( question_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """soft delete only. 
attempts 는 RESTRICT FK 로 보호되어 영구 보존. hard delete 호출 경로 자체가 없음 — DB 레벨에서도 attempts 가 있으면 거부. PR-12-A 후속: 같은 토픽의 다른 문제 related 캐시에 이 qid 가 candidate 로 남아있을 수 있음. 같은 토픽 ready 행들의 related_computed_at 을 NULL 마킹 → cron 재계산. """ q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) q.deleted_at = datetime.now(timezone.utc) q.updated_at = q.deleted_at await session.execute( update(StudyQuestion) .where( StudyQuestion.study_topic_id == q.study_topic_id, StudyQuestion.id != q.id, StudyQuestion.embedding_status == "ready", StudyQuestion.deleted_at.is_(None), StudyQuestion.related_computed_at.is_not(None), ) .values(related_computed_at=None) ) await session.commit() # ─── attempt ─── @router.post("/study-questions/{question_id}/attempt", response_model=AttemptResponse) async def submit_attempt( question_id: int, body: AttemptCreate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """답 제출. PR-9: is_unsure=true 면 outcome='unsure' + selected_choice=NULL. 그 외엔 selected_choice 와 correct_choice 비교 → outcome='correct'/'wrong'. PR-10: quiz_session_id 전달 시 같은 트랜잭션에서 세션 cursor + count 도 갱신. cursor 가 question_ids 끝까지 도달하면 status='done', finished_at 박힘. """ q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) if body.is_unsure: selected = None is_correct = False outcome = "unsure" elif body.selected_choice is None: raise HTTPException( status_code=422, detail="selected_choice (1~4) 또는 is_unsure=true 가 필요합니다", ) else: selected = body.selected_choice is_correct = selected == q.correct_choice outcome = "correct" if is_correct else "wrong" # PR-10: 세션 연동. 기본은 None. 
quiz_session: StudyQuizSession | None = None if body.quiz_session_id is not None: quiz_session = await session.get(StudyQuizSession, body.quiz_session_id) if quiz_session is None or quiz_session.user_id != user.id: raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다") if quiz_session.study_topic_id != q.study_topic_id: raise HTTPException( status_code=400, detail="quiz_session 의 토픽과 문제의 토픽이 다릅니다", ) if quiz_session.status != "in_progress": raise HTTPException( status_code=409, detail=f"quiz_session 상태가 {quiz_session.status} — 이미 종료된 세션입니다", ) # 현재 cursor 위치의 문제와 일치해야만 진행 (out-of-order 제출 차단). qids = quiz_session.question_ids or [] if quiz_session.cursor >= len(qids): raise HTTPException(status_code=409, detail="이미 모든 문제를 풀었습니다") if qids[quiz_session.cursor] != q.id: raise HTTPException( status_code=409, detail="현재 cursor 위치의 문제와 question_id 가 일치하지 않습니다 (이중 제출/순서 어긋남)", ) attempt = StudyQuestionAttempt( user_id=user.id, study_question_id=q.id, study_topic_id=q.study_topic_id, selected_choice=selected, correct_choice=q.correct_choice, is_correct=is_correct, outcome=outcome, quiz_session_id=quiz_session.id if quiz_session else None, ) session.add(attempt) if quiz_session is not None: quiz_session.cursor = quiz_session.cursor + 1 if outcome == "correct": quiz_session.correct_count += 1 elif outcome == "wrong": quiz_session.wrong_count += 1 elif outcome == "unsure": quiz_session.unsure_count += 1 if quiz_session.cursor >= len(quiz_session.question_ids or []): quiz_session.status = "done" quiz_session.finished_at = datetime.now(timezone.utc) quiz_session.updated_at = datetime.now(timezone.utc) await session.commit() await session.refresh(attempt) stats = await _attempt_stats(session, user.id, question_id) return AttemptResponse( id=attempt.id, is_correct=is_correct, selected_choice=selected, correct_choice=q.correct_choice, outcome=outcome, explanation=q.explanation, stats=stats, quiz_session_id=quiz_session.id if quiz_session else None, 
quiz_session_status=quiz_session.status if quiz_session else None, quiz_session_cursor=quiz_session.cursor if quiz_session else None, reviewed_at=attempt.reviewed_at, ) # ─── PR-10: 학습완료 토글 ─── class AttemptReviewRequest(BaseModel): reviewed: bool class AttemptReviewResponse(BaseModel): id: int reviewed_at: datetime | None @router.patch( "/study-question-attempts/{attempt_id}/review-mark", response_model=AttemptReviewResponse, ) async def mark_attempt_reviewed( attempt_id: int, body: AttemptReviewRequest, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """결과 카드에서 [학습완료] 토글. reviewed=true 면 timestamp, false 면 NULL.""" attempt = await session.get(StudyQuestionAttempt, attempt_id) if attempt is None or attempt.user_id != user.id: raise HTTPException(status_code=404, detail="attempt 를 찾을 수 없습니다") attempt.reviewed_at = datetime.now(timezone.utc) if body.reviewed else None await session.commit() return AttemptReviewResponse(id=attempt.id, reviewed_at=attempt.reviewed_at) # ─── PR-5: 비슷한 문제 검색 (embedding cosine) ─── @router.get( "/study-questions/{question_id}/similar", response_model=SimilarQuestionsResponse, ) async def list_similar_questions( question_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], limit: int = Query(5, ge=1, le=20), topic_only: bool = Query(True, description="True 면 같은 study_topic 안에서만"), ): """현재 문제의 embedding 기준 cosine 유사도 top-K. 26B 호출 없이 vector search 만. 제외: - 자기 자신 (id != current) - soft-deleted - embedding_status != 'ready' (대기 중·실패·미생성) source_status='ready' 가 아니면 items 빈 배열 반환 (UI 에서 안내). """ src = await session.get(StudyQuestion, question_id) src = _verify_question_ownership(src, user) if src.embedding_status != "ready" or src.embedding is None: return SimilarQuestionsResponse( items=[], source_status=src.embedding_status, source_id=question_id ) # cosine 거리 = embedding <=> ref. 유사도 = 1 - distance. 
distance_expr = StudyQuestion.embedding.cosine_distance(src.embedding) base = ( select(StudyQuestion, distance_expr.label("distance")) .where( StudyQuestion.user_id == user.id, StudyQuestion.id != question_id, StudyQuestion.deleted_at.is_(None), StudyQuestion.embedding_status == "ready", StudyQuestion.embedding.is_not(None), ) .order_by(distance_expr.asc()) .limit(limit) ) if topic_only: base = base.where(StudyQuestion.study_topic_id == src.study_topic_id) rows = (await session.execute(base)).all() similar_qs = [(r[0], float(r.distance)) for r in rows] # attempt_count + last_correct batch (N+1 회피) qids = [q.id for q, _ in similar_qs] attempt_count_map: dict[int, int] = {} last_correct_map: dict[int, bool] = {} if qids: cnt_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, func.count().label("total"), ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .group_by(StudyQuestionAttempt.study_question_id) ) ).all() for r in cnt_rows: attempt_count_map[r.study_question_id] = int(r.total) latest_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.is_correct, ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .order_by( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.answered_at.desc(), ) .distinct(StudyQuestionAttempt.study_question_id) ) ).all() for r in latest_rows: last_correct_map[r.study_question_id] = bool(r.is_correct) def _truncate(text: str, n: int = 80) -> str: return text if len(text) <= n else text[:n].rstrip() + "…" items = [ SimilarQuestionItem( id=q.id, study_topic_id=q.study_topic_id, question_text=_truncate(q.question_text, 80), subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, similarity=round(1.0 - dist, 4), attempt_count=attempt_count_map.get(q.id, 0), last_correct=last_correct_map.get(q.id), ) for q, dist in similar_qs ] return 
SimilarQuestionsResponse(items=items, source_status="ready", source_id=question_id)


# ─── PR-12-A: repeated questions / similar-type classification ───


class RelatedQuestionItem(BaseModel):
    """A single related-question entry in the related-types response."""

    id: int
    study_topic_id: int
    question_text: str
    subject: str | None
    scope: str | None
    exam_round: str | None
    exam_question_number: int | None
    similarity: float


class RelatedTypesResponse(BaseModel):
    """Response body of the related-types endpoint: two candidate lists plus counts."""

    source_id: int
    source_status: str  # ready | pending | failed | stale | none
    source_exam_round: str | None  # when null/empty both lists are empty and counts are 0
    # Other users, the same exam round and the question itself are all
    # excluded by the backend.
    repeat_questions: list[RelatedQuestionItem]  # similarity >= 0.95
    similar_questions: list[RelatedQuestionItem]  # 0.88 <= similarity < 0.95
    # Counts — round_count is the primary display metric (repetition across rounds).
    repeat_related_count: int
    repeat_round_count: int
    # Grade label (listed values: 단골/잘 나오는 반복 출제/반복 출제/신출/빈출);
    # null when round_count < 2.
    repeat_grade: str | None = None
    similar_related_count: int
    similar_round_count: int


@router.get(
    "/study-questions/{question_id}/related-types",
    response_model=RelatedTypesResponse,
)
async def list_related_types(
    question_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return repeated (🔥) and similar-type (🧩) questions separately.

    The exam-round condition is enforced by the backend.

    Learning meaning:
    - repeated = nearly the same form shows up again in another exam round
      (value: memorisation / pattern fixing)
    - similar type = the same concept or solving pattern shows up in another
      round (value: concept review)

    Uses the shared service function, so the classification logic is shared
    with the bulk endpoint (avoids drift).

    PR-12-A follow-up: the study_questions.related_* cache columns are read
    first. On a cache hit (computed_at IS NOT NULL and matching threshold
    version) the HNSW search is skipped. On a miss the result is computed
    immediately and stored (safety net for when the cron has not caught up).

    Raises:
        HTTPException: 404 when the question is missing, soft-deleted or owned
            by another user.
    """
    # Local import of the shared classification service.
    from services.study.related_types import (
        THRESHOLD_VERSION,
        classify_related_for_question,
        deserialize_candidates,
        serialize_candidates,
    )

    src = await session.get(StudyQuestion, question_id)
    src = _verify_question_ownership(src, user)

    # Normalise a blank / whitespace-only round to None for the response.
    src_round_or_none = (src.exam_round or "").strip() or None

    # Embedding not ready → empty response.
    if src.embedding_status != "ready" or src.embedding is None:
        return RelatedTypesResponse(
            source_id=question_id,
            source_status=src.embedding_status,
            source_exam_round=src_round_or_none,
            repeat_questions=[],
            similar_questions=[],
            repeat_related_count=0,
            repeat_round_count=0,
            similar_related_count=0,
            similar_round_count=0,
        )

    def _candidates_to_items(cands):
        """Convert service-level candidates into response items."""
        return [
            RelatedQuestionItem(
                id=c.id,
                study_topic_id=c.study_topic_id,
                question_text=c.question_text,
                subject=c.subject,
                scope=c.scope,
                exam_round=c.exam_round,
                exam_question_number=c.exam_question_number,
                similarity=round(c.similarity, 4),
            )
            for c in cands
        ]

    # Cache hit condition: computed_at present + threshold fingerprint matches.
    if (
        src.related_computed_at is not None
        and src.related_threshold_version == THRESHOLD_VERSION
    ):
        repeat_cands = deserialize_candidates(src.related_repeat, src.study_topic_id)
        similar_cands = deserialize_candidates(src.related_similar, src.study_topic_id)
        return RelatedTypesResponse(
            source_id=question_id,
            source_status="ready",
            source_exam_round=src_round_or_none,
            repeat_questions=_candidates_to_items(repeat_cands),
            similar_questions=_candidates_to_items(similar_cands),
            repeat_related_count=len(repeat_cands),
            repeat_round_count=src.related_repeat_round_count or 0,
            repeat_grade=src.related_repeat_grade,
            similar_related_count=len(similar_cands),
            similar_round_count=src.related_similar_round_count or 0,
        )

    # Cache miss → compute now and persist (do not wait for the next cron
    # tick, avoids returning an empty response).
    cls = await classify_related_for_question(session, user_id=user.id, source=src)
    src.related_repeat = serialize_candidates(cls.repeat)
    src.related_similar = serialize_candidates(cls.similar)
    src.related_repeat_round_count = cls.repeat_round_count
    src.related_similar_round_count = cls.similar_round_count
    src.related_repeat_grade = cls.repeat_grade
    src.related_computed_at = datetime.now(timezone.utc)
    src.related_threshold_version = THRESHOLD_VERSION
    await session.commit()

    return RelatedTypesResponse(
        source_id=question_id,
        source_status="ready",
        source_exam_round=src_round_or_none,
        repeat_questions=_candidates_to_items(cls.repeat),
        similar_questions=_candidates_to_items(cls.similar),
        repeat_related_count=cls.repeat_related_count,
        repeat_round_count=cls.repeat_round_count,
        repeat_grade=cls.repeat_grade,
        similar_related_count=cls.similar_related_count,
        similar_round_count=cls.similar_round_count,
    )


# ─── PR-8: image upload / fetch / delete ───


@router.post(
    "/study-questions/{question_id}/images",
    response_model=StudyQuestionImageItem,
    status_code=201,
)
async def upload_question_image(
    question_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    file: UploadFile = File(...),
):
    """Upload an image attached to a question.

    One file = one row; call repeatedly to attach several images.

    Path: /documents/study_question_images/{topic_id}/{qid}/{img_id}.{ext}
    Auth: only questions of the same user; other users get 404.
    Limits: PNG/JPEG/WEBP/GIF, 10MB per file.

    Raises:
        HTTPException: 404 (question not found / not owned), 415 (MIME type
            not allowed), 413 (file too large), 400 (empty file).
    """
    q = await session.get(StudyQuestion, question_id)
    q = _verify_question_ownership(q, user)

    # MIME validation
    mime = (file.content_type or "").lower()
    if mime not in ALLOWED_IMAGE_MIME:
        raise HTTPException(
            status_code=415,
            detail=f"지원하지 않는 이미지 형식: {mime} (PNG/JPEG/WEBP/GIF 만 허용)",
        )

    # Read the file and validate its size
    raw = await file.read()
    if len(raw) > MAX_IMAGE_BYTES:
        raise HTTPException(
            status_code=413,
            detail=f"이미지 크기 초과 ({len(raw)} > {MAX_IMAGE_BYTES} 바이트, 10MB 제한)",
        )
    if len(raw) == 0:
        raise HTTPException(status_code=400, detail="빈 파일")

    # Pick the file extension from the MIME type
    ext_map = {
        "image/png": "png",
        "image/jpeg": "jpg",
        "image/webp": "webp",
        "image/gif": "gif",
    }
    ext = ext_map[mime]

    # Create the DB row first so its id can be used in the file name.
    img = StudyQuestionImage(
        user_id=user.id,
        study_question_id=q.id,
        file_path="",  # placeholder, filled in once the id is known
        file_size=len(raw),
        mime_type=mime,
        sort_order=0,
    )
    session.add(img)
    await session.flush()  # obtain img.id

    # Store the bytes on the file system
    target_dir = STUDY_IMG_ROOT / str(q.study_topic_id) / str(q.id)
    target_dir.mkdir(parents=True, exist_ok=True)
    target_path = target_dir / f"{img.id}.{ext}"
    target_path.write_bytes(raw)
    img.file_path = str(target_path)

    try:
        # sort_order = max+1 among the other images of the same question.
        # An explicit None check is required: an existing max of 0 is falsy,
        # so `or -1` would hand out sort_order 0 a second time.
        prev_max = (
            await session.execute(
                select(func.max(StudyQuestionImage.sort_order))
                .where(StudyQuestionImage.study_question_id == q.id)
                .where(StudyQuestionImage.id != img.id)
            )
        ).scalar()
        img.sort_order = 0 if prev_max is None else int(prev_max) + 1

        await session.commit()
    except Exception:
        # The row was never committed, so do not leave an orphan file behind.
        try:
            target_path.unlink(missing_ok=True)
        except OSError as cleanup_err:
            logger.warning(
                "study_q_image_orphan_cleanup_failed path=%s: %s",
                target_path,
                cleanup_err,
            )
        raise

    return _image_to_item(img)


@router.get("/study-questions/{question_id}/images/{image_id}/raw")
async def get_question_image_raw(
    question_id: int,
    image_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Serve the raw image bytes; only attachments of the caller's own question.

    Raises:
        HTTPException: 404 when the image row is missing, owned by another
            user or attached to a different question; 410 when the row exists
            but the file is gone from disk.
    """
    img = await session.get(StudyQuestionImage, image_id)
    if img is None or img.user_id != user.id or img.study_question_id != question_id:
        raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다")
    file_path = _Path(img.file_path)
    if not file_path.is_file():
        raise HTTPException(status_code=410, detail="파일이 사라졌습니다")
    return FileResponse(str(file_path), media_type=img.mime_type)


@router.delete("/study-questions/{question_id}/images/{image_id}", status_code=204)
async def delete_question_image(
    question_id: int,
    image_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Delete an image attachment: remove the file (best effort) and the DB row.

    Raises:
        HTTPException: 404 when the image row is missing, owned by another
            user or attached to a different question.
    """
    img = await session.get(StudyQuestionImage, image_id)
    if img is None or img.user_id != user.id or img.study_question_id != question_id:
        raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다")

    # File-system removal is deliberately best effort: the DB row is cleaned
    # up even when unlinking fails.
    try:
        file_path = _Path(img.file_path)
        if file_path.is_file():
            file_path.unlink()
    except Exception as e:
        logger.warning("study_q_image_unlink_failed id=%s: %s", image_id, e)

    await session.delete(img)
    await session.commit()


# ─── PR-3: AI explanation generation endpoint ───

# MLX call timeout (seconds). MLX gate + 26B inference averages ~10s; safety margin.
LLM_TIMEOUT_S = 30.0

# Prompt template, loaded lazily on first use.
_PROMPT_PATH = "study_question_explanation.txt"
_prompt_cache: str | None = None


def _load_explanation_prompt() -> str:
    """Return the explanation prompt template, reading it from disk only once.

    The template is read from ``<parent of this file's directory>/prompts``
    and memoised in the module-level ``_prompt_cache``.
    """
    global _prompt_cache
    if _prompt_cache is None:
        from pathlib import Path

        prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
        _prompt_cache = (prompts_dir / _PROMPT_PATH).read_text(encoding="utf-8")
    return _prompt_cache


def _render_prompt(q: StudyQuestion, doc_block: str, question_block: str) -> str:
    """Fill the prompt template with the question fields and evidence blocks.

    Substitution happens in a single pass over the template, so placeholder
    look-alikes (e.g. a literal ``{choice_1}``) inside the question text or the
    evidence blocks are left untouched instead of being substituted again.
    Placeholders not listed below stay in the output verbatim.

    Args:
        q: question whose text, choices and correct choice are inserted.
        doc_block: rendered document evidence.
        question_block: rendered evidence from other questions.

    Returns:
        The fully rendered prompt text.
    """
    import re

    values = {
        "question_text": q.question_text,
        "choice_1": q.choice_1,
        "choice_2": q.choice_2,
        "choice_3": q.choice_3,
        "choice_4": q.choice_4,
        "correct_choice": str(q.correct_choice),
        "documents_evidence_block": doc_block,
        "questions_evidence_block": question_block,
    }
    return re.sub(
        r"\{(\w+)\}",
        lambda m: values.get(m.group(1), m.group(0)),
        _load_explanation_prompt(),
    )


def _make_cache_response(q: StudyQuestion, is_stale: bool) -> AIExplanationResponse:
    """Return only the cached explanation (evidence is not recomputed).

    Status and metadata are taken from the question row as-is.
    """
    return AIExplanationResponse(
        ai_explanation=q.ai_explanation,
        ai_explanation_status=q.ai_explanation_status,
        ai_explanation_generated_at=q.ai_explanation_generated_at,
        ai_explanation_model=q.ai_explanation_model,
        evidence=[],
        from_cache=True,
        is_stale=is_stale,
        can_regenerate=True,
    )


def _pending_conflict() -> HTTPException:
    """Build the 409 raised while a generation for the question is in flight."""
    return HTTPException(
        status_code=409,
        detail={"status": "pending", "detail": "이미 생성 중입니다"},
    )


@router.post(
    "/study-questions/{question_id}/ai-explanation",
    response_model=AIExplanationResponse,
)
async def generate_ai_explanation(
    question_id: int,
    body: AIExplanationCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Generate an AI explanation on manual trigger.

    RAG = mapped material + other questions of the same topic (the question
    itself excluded). Primary model only (Mac mini MLX gemma-4-26b), routed
    through the MLX gate Semaphore(1).

    - regenerate=false and status=ready → cached body returned immediately
      (no MLX call)
    - regenerate=false and status=stale → cached body returned with
      is_stale=true
    - status=pending → 409 (another call is in progress; guaranteed by a
      race-safe conditional UPDATE)
    - otherwise (none / failed / regenerate=true) → generate anew

    On failure the status becomes 'failed' and the previous body is kept.
    Failures while assembling the prompt or creating the client count as
    generation failures too, so the row never stays stuck in 'pending'.

    Raises:
        HTTPException: 404 when the question is missing / not owned, 409 when
            a generation is already pending.
    """
    q = await session.get(StudyQuestion, question_id)
    q = _verify_question_ownership(q, user)

    # 1) A pending generation blocks every caller, with or without regenerate.
    if q.ai_explanation_status == "pending":
        raise _pending_conflict()

    # Cache shortcut (regenerate=false). Status none/failed falls through.
    if not body.regenerate:
        if q.ai_explanation_status == "ready":
            return _make_cache_response(q, is_stale=False)
        if q.ai_explanation_status == "stale":
            return _make_cache_response(q, is_stale=True)

    # 2) Race-safe transition to pending (conditional UPDATE blocks
    #    concurrent calls).
    lock_result = await session.execute(
        update(StudyQuestion)
        .where(
            StudyQuestion.id == question_id,
            StudyQuestion.user_id == user.id,
            StudyQuestion.ai_explanation_status != "pending",
        )
        .values(ai_explanation_status="pending", updated_at=datetime.now(timezone.utc))
        .returning(StudyQuestion.id)
    )
    if lock_result.scalar_one_or_none() is None:
        # Another call set pending first.
        raise _pending_conflict()
    await session.commit()
    await session.refresh(q)

    # 3) Collect RAG evidence
    try:
        ctx = await gather_explanation_context(session, user.id, q)
    except Exception as e:
        logger.warning("study_explanation_rag_failed: %s: %s", type(e).__name__, e)
        # On RAG failure continue with empty evidence (the prompt handles the
        # "insufficient material" branch).
        from services.study.explanation_rag import ExplanationContext

        ctx = ExplanationContext(documents=[], questions=[])

    # 4) Assemble the prompt and call the primary model inside the MLX gate
    #    with a timeout. Everything that can fail after the pending lock lives
    #    inside this try so the status always ends up ready or failed.
    # NOTE: asyncio.CancelledError is a BaseException and is not handled here;
    # a cancelled request still leaves the row in 'pending'.
    ai_client: AIClient | None = None
    raw_text: str | None = None
    try:
        doc_block = render_evidence_block(ctx.documents)
        q_block = render_evidence_block(ctx.questions)
        prompt = _render_prompt(q, doc_block, q_block)
        ai_client = AIClient()
        async with get_mlx_gate():
            async with asyncio.timeout(LLM_TIMEOUT_S):
                raw_text = await ai_client.call_primary(prompt)
    except asyncio.TimeoutError:
        logger.warning("study_explanation_mlx_timeout qid=%s", question_id)
    except Exception:
        logger.exception("study_explanation_mlx_failed qid=%s", question_id)
    finally:
        if ai_client is not None:
            await ai_client.close()

    # Strip Qwen think tags etc.; output that is empty afterwards counts as
    # a failure rather than being stored as a "ready" explanation.
    cleaned = ""
    if raw_text is not None and raw_text.strip():
        from ai.client import strip_thinking

        cleaned = strip_thinking(raw_text).strip()

    # 5) Persist the outcome (success / failure branch)
    q = await session.get(StudyQuestion, question_id)
    evidence = [e.to_dict() for e in ctx.all]

    if not cleaned:
        # Failure — keep the previous body, status='failed'.
        q.ai_explanation_status = "failed"
        q.updated_at = datetime.now(timezone.utc)
        await session.commit()
        return AIExplanationResponse(
            ai_explanation=q.ai_explanation,
            ai_explanation_status="failed",
            ai_explanation_generated_at=q.ai_explanation_generated_at,
            ai_explanation_model=q.ai_explanation_model,
            evidence=evidence,
            from_cache=False,
            is_stale=False,
            can_regenerate=True,
        )

    # Success
    q.ai_explanation = cleaned
    q.ai_explanation_status = "ready"
    q.ai_explanation_generated_at = datetime.now(timezone.utc)
    primary_name = getattr(ai_client.ai.primary, "model", "primary")
    q.ai_explanation_model = f"mlx:{primary_name}"
    q.updated_at = q.ai_explanation_generated_at
    await session.commit()

    return AIExplanationResponse(
        ai_explanation=q.ai_explanation,
        ai_explanation_status="ready",
        ai_explanation_generated_at=q.ai_explanation_generated_at,
        ai_explanation_model=q.ai_explanation_model,
        evidence=evidence,
        from_cache=False,
        is_stale=False,
        can_regenerate=True,
    )