hyungi_document_server/app/api/study_questions.py
Hyungi Ahn e5982ebde4 feat(study): Phase 1 learning-loop data layer: progress cache + finalize + review API
Data layer for the vision (solve → check → study → review → weighting for the next solve).

Data model (migrations 222~225):
- study_question_progress table: current-state cache per user × topic × question
  - last attempt: last_outcome, last_attempted_at, last_attempt_id
  - review status: last_reviewed_at
  - review queue: due_at, review_stage
  - pattern classification (derived): pattern_state, pattern_updated_at, pattern_window_attempts
- 3 partial indexes (due / topic_pattern / pending_review) for fast per-tab lookups

Pattern classification (services/study/learning_pattern.py):
- 7 classes: unattempted/unsure/chronic_wrong/regressed/recovered/stable/unstable
- window = the 3 most recent attempts + whether any earlier correct/wrong exists
- priority chronic_wrong > regressed > recovered (conservative learning)
- guard: a single wrong alone does not yield regressed (a prior correct is required)
- stable starts at 3 consecutive correct
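
The rules above can be sketched as a small pure function. This is only an illustration, not the contents of services/study/learning_pattern.py: the function name `classify_pattern`, the threshold for chronic_wrong (two or more wrong answers inside the window), the "latest outcome wins" rule for unsure, and the placement of stable relative to recovered are all assumptions made for the example.

```python
from typing import Sequence


def classify_pattern(outcomes: Sequence[str]) -> str:
    """Classify one question's attempt history (ordered oldest -> newest).

    Each outcome is "correct", "wrong" or "unsure". The thresholds below are
    assumptions for illustration, not the project's actual rules.
    """
    if not outcomes:
        return "unattempted"

    window = list(outcomes[-3:])    # the 3 most recent attempts
    earlier = list(outcomes[:-1])   # everything before the latest attempt
    latest = window[-1]

    if latest == "unsure":          # assumption: the latest unsure wins
        return "unsure"

    wrong_in_window = sum(1 for o in window if o == "wrong")

    # Priority order from the commit message: chronic_wrong > regressed > recovered.
    if wrong_in_window >= 2:        # assumption: 2+ wrong in the window
        return "chronic_wrong"
    if latest == "wrong" and "correct" in earlier:
        # Guard: a single wrong without any earlier correct is not "regressed".
        return "regressed"
    if len(window) == 3 and all(o == "correct" for o in window):
        return "stable"             # 3 consecutive correct answers
    if latest == "correct" and "wrong" in earlier:
        return "recovered"
    return "unstable"
```

With these assumptions a lone wrong answer falls through to unstable, which is one way to honor the guard, and a history of wrong, wrong, correct stays chronic_wrong rather than recovered because of the priority order.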

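Session-end aggregation (services/study/session_finalize.py):
- attempts stay append-only and the originals are preserved; only progress is upserted
- the finalize hook fires automatically right after the last attempt
- finalize updates only last_* + pattern_state; questions that have not entered the review queue keep due_at NULL
- for questions that already have a due_at, finalize updates the stage (correct → +1 / wrong → reset)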
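
A minimal sketch of the stage rule in the last two bullets, assuming a hypothetical interval table and a hypothetical `ReviewState` holder. The commit message only states that the stage moves (+1 on correct, reset on wrong) and that questions without a due_at are left alone; recomputing due_at from the new stage, capping the stage at the last interval, and treating unsure like wrong are assumptions of this example.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical spacing per review stage, in days (not taken from the source).
REVIEW_INTERVALS_DAYS = (1, 3, 7, 14, 30)


@dataclass(frozen=True)
class ReviewState:
    due_at: Optional[datetime]
    review_stage: int


def advance_review_state(state: ReviewState, outcome: str, now: datetime) -> ReviewState:
    """Return the review state after a finalized attempt."""
    if state.due_at is None:
        # Not in the review queue yet: finalize leaves due_at NULL.
        return state
    if outcome == "correct":
        stage = min(state.review_stage + 1, len(REVIEW_INTERVALS_DAYS) - 1)
    else:
        # wrong resets the stage; unsure is handled the same way here (assumption).
        stage = 0
    next_due = now + timedelta(days=REVIEW_INTERVALS_DAYS[stage])
    return ReviewState(due_at=next_due, review_stage=stage)


if __name__ == "__main__":
    now = datetime(2026, 5, 1, tzinfo=timezone.utc)
    print(advance_review_state(ReviewState(now, 1), "correct", now))
```

Keeping the rule a pure function of (state, outcome, now) makes it easy to run inside an upsert without reading the append-only attempts again.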

API (api/study_question_progress.py):
- POST /study-topics/{tid}/questions/{qid}/review-complete
  → sets last_reviewed_at + assigns the first due_at (only when wrong/unsure)
- GET /study-topics/{tid}/review-queue?tab=due_today|pending_review|chronic|regressed|mastered
  → paginated lookup across 5 tabs
  → pending_review also includes rows where last_reviewed_at < last_attempted_at (marked wrong again after an earlier review-complete)
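
The pending_review condition can be illustrated with a small predicate over the progress columns. The function name `is_pending_review` is hypothetical, and restricting the tab to rows whose last outcome is wrong or unsure is an assumption drawn from the review-complete bullet above; the real query in api/study_question_progress.py may differ.

```python
from datetime import datetime, timezone
from typing import Optional


def is_pending_review(
    last_outcome: Optional[str],
    last_attempted_at: Optional[datetime],
    last_reviewed_at: Optional[datetime],
) -> bool:
    """True when a progress row should appear in the pending_review tab."""
    if last_outcome not in ("wrong", "unsure") or last_attempted_at is None:
        return False  # assumption: only wrong/unsure rows need a review
    if last_reviewed_at is None:
        return True   # never marked review-complete
    # Reviewed once, then attempted again afterwards with a wrong/unsure result.
    return last_reviewed_at < last_attempted_at


if __name__ == "__main__":
    earlier = datetime(2026, 4, 30, tzinfo=timezone.utc)
    later = datetime(2026, 5, 1, tzinfo=timezone.utc)
    print(is_pending_review("wrong", later, earlier))
```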

Phase 1-E (question selection algorithm) follows in a later commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 09:28:46 +09:00

1583 lines
58 KiB
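Python

"""Study workspace question bank + review mode API.
PR-2 guardrails:
- Add 1:N assets per type (questions) to the study_topic container. A polymorphic single table is permanently forbidden.
- No strong enums for subject/scope (free text; autocomplete comes in a later PR).
- Questions are soft delete only. attempts are protected at the DB level by a RESTRICT FK.
- Changing correct_choice does not recompute existing attempts (a record = the fact at that point in time).
- Review question responses hide the answer and explanation; they are exposed only when an answer is submitted.
- wrong_only=true means questions whose most recent attempt was wrong, not "wrong at least once".
"""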
import asyncio
import logging
import random
from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
from sqlalchemy import and_, case, func, select, text as sql_text, update
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_question_image import StudyQuestionImage
from models.study_quiz_session import StudyQuizSession
from models.study_topic import StudyTopic
from models.user import User
from services.search.llm_gate import get_mlx_gate
from services.study.explanation_rag import (
EvidenceItem,
gather_explanation_context,
render_evidence_block,
)
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
    """Same pattern as study_topics.py: an ownership mismatch also returns 404 to avoid leaking information."""
    if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
        raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
    return topic


def _verify_question_ownership(q: StudyQuestion | None, user: User) -> StudyQuestion:
    if q is None or q.user_id != user.id or q.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문제를 찾을 수 없습니다")
    return q
# ─── Pydantic schemas ───
class StudyQuestionCreate(BaseModel):
question_text: str = Field(min_length=1)
choice_1: str = Field(min_length=1)
choice_2: str = Field(min_length=1)
choice_3: str = Field(min_length=1)
choice_4: str = Field(min_length=1)
correct_choice: int = Field(ge=1, le=4)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
exam_name: str | None = Field(default=None, max_length=120)
exam_round: str | None = Field(default=None, max_length=120)
# PR-6: question number within an exam round. If omitted while exam_round is given, the server auto-fills max+1.
exam_question_number: int | None = Field(default=None, ge=1)
explanation: str | None = None
source_note: str | None = None
is_active: bool = True
class StudyQuestionUpdate(BaseModel):
question_text: str | None = Field(default=None, min_length=1)
choice_1: str | None = Field(default=None, min_length=1)
choice_2: str | None = Field(default=None, min_length=1)
choice_3: str | None = Field(default=None, min_length=1)
choice_4: str | None = Field(default=None, min_length=1)
correct_choice: int | None = Field(default=None, ge=1, le=4)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
exam_name: str | None = Field(default=None, max_length=120)
exam_round: str | None = Field(default=None, max_length=120)
exam_question_number: int | None = Field(default=None, ge=1)
explanation: str | None = None
source_note: str | None = None
is_active: bool | None = None
class QuestionAttemptStats(BaseModel):
attempt_count: int
correct_count: int
wrong_count: int
class StudyQuestionImageItem(BaseModel):
"""Metadata for an image attached to a question. Raw bytes are served by a separate GET endpoint."""
id: int
sort_order: int
mime_type: str
file_size: int
served_url: str # for client display: /api/study-questions/{qid}/images/{img_id}/raw
class StudyQuestionResponse(BaseModel):
"""For editing and management: exposes both the answer and the explanation."""
id: int
study_topic_id: int
question_text: str
choice_1: str
choice_2: str
choice_3: str
choice_4: str
correct_choice: int
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
explanation: str | None
source_note: str | None
is_active: bool
# PR-6: question number within an exam round
exam_question_number: int | None = None
# PR-3: AI explanation status (used on the edit screen). The body is fetched via a separate GET /ai-explanation
ai_explanation_status: str = "none"
ai_explanation_generated_at: datetime | None = None
ai_explanation_model: str | None = None
# PR-8: attached images
images: list[StudyQuestionImageItem] = []
created_at: datetime
updated_at: datetime
stats: QuestionAttemptStats
class StudyQuestionSummary(BaseModel):
"""For the combined view and lists: body truncated, answer hidden."""
id: int
question_text: str # truncated to the first 80 characters (done server-side)
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
exam_question_number: int | None = None # PR-11/12: for prev/next on the view page
is_active: bool
has_images: bool = False
attempt_count: int
last_correct: bool | None
created_at: datetime
class StudyQuestionListResponse(BaseModel):
items: list[StudyQuestionSummary]
total: int
page: int
page_size: int
class ReviewChoice(BaseModel):
number: int
text: str
class ReviewQuestionItem(BaseModel):
"""Review question response: answer and explanation hidden."""
id: int
question_text: str
choices: list[ReviewChoice]
subject: str | None
scope: str | None
stats: QuestionAttemptStats
images: list[StudyQuestionImageItem] = []
class ReviewQuestionListResponse(BaseModel):
items: list[ReviewQuestionItem]
total: int
target_per_subject: int
subject_distribution: dict[str, int] # e.g. {"Combustion Engineering": 20, ...}
class AttemptCreate(BaseModel):
"""PR-9: either selected_choice (1~4) or is_unsure is required.
If is_unsure=true, selected_choice is ignored and outcome is stored as 'unsure'.
PR-10: when quiz_session_id is passed, the session's cursor + count are updated as well.
"""
selected_choice: int | None = Field(default=None, ge=1, le=4)
is_unsure: bool = False
quiz_session_id: int | None = None
class AttemptResponse(BaseModel):
id: int # PR-10: the study-complete toggle PATCHes per attempt, so the id must be exposed.
is_correct: bool
selected_choice: int | None
correct_choice: int
outcome: str # PR-9: correct | wrong | unsure
explanation: str | None
stats: QuestionAttemptStats
quiz_session_id: int | None = None
quiz_session_status: str | None = None # in_progress | done (set to done on the last question)
quiz_session_cursor: int | None = None
reviewed_at: datetime | None = None
# ─── PR-5: similar question search (embedding cosine) ───
class SimilarQuestionItem(BaseModel):
id: int
study_topic_id: int
question_text: str # truncated to 80 characters
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
similarity: float # 1 - cosine_distance; 1.0 = identical, 0.0 = unrelated
attempt_count: int
last_correct: bool | None
class SimilarQuestionsResponse(BaseModel):
items: list[SimilarQuestionItem]
source_status: str # embedding_status of the current question. Empty result unless 'ready'.
source_id: int
# ─── PR-3: AI explanation generation ───
class AIExplanationCreate(BaseModel):
"""Manual trigger. If regenerate=true, ignore the cache and generate a new body."""
regenerate: bool = False
class AIExplanationEvidence(BaseModel):
source_type: str # 'document' | 'question'
source_id: int
title: str
snippet: str
class AIExplanationResponse(BaseModel):
ai_explanation: str | None
ai_explanation_status: str # ready | stale | failed | none
ai_explanation_generated_at: datetime | None
ai_explanation_model: str | None
evidence: list[AIExplanationEvidence] = []
from_cache: bool = False
is_stale: bool = False
can_regenerate: bool = True
# ─── Statistics helpers ───
# PR-8: image storage root + helpers
from pathlib import Path as _Path

STUDY_IMG_ROOT = _Path(settings.nas_mount_path) / "study_question_images"
ALLOWED_IMAGE_MIME = {"image/png", "image/jpeg", "image/webp", "image/gif"}
MAX_IMAGE_BYTES = 10 * 1024 * 1024  # 10 MB per file


def _image_url(qid: int, img_id: int) -> str:
    return f"/api/study-questions/{qid}/images/{img_id}/raw"


def _image_to_item(img: StudyQuestionImage) -> StudyQuestionImageItem:
    return StudyQuestionImageItem(
        id=img.id,
        sort_order=img.sort_order,
        mime_type=img.mime_type,
        file_size=img.file_size,
        served_url=_image_url(img.study_question_id, img.id),
    )
async def _images_for_question(
    session: AsyncSession, question_id: int
) -> list[StudyQuestionImageItem]:
    rows = (
        await session.execute(
            select(StudyQuestionImage)
            .where(StudyQuestionImage.study_question_id == question_id)
            .order_by(StudyQuestionImage.sort_order.asc(), StudyQuestionImage.id.asc())
        )
    ).scalars().all()
    return [_image_to_item(r) for r in rows]


async def _images_for_questions_batch(
    session: AsyncSession, question_ids: list[int]
) -> dict[int, list[StudyQuestionImageItem]]:
    """Avoids N+1: fetch the images of several questions in one query."""
    if not question_ids:
        return {}
    rows = (
        await session.execute(
            select(StudyQuestionImage)
            .where(StudyQuestionImage.study_question_id.in_(question_ids))
            .order_by(
                StudyQuestionImage.study_question_id,
                StudyQuestionImage.sort_order.asc(),
                StudyQuestionImage.id.asc(),
            )
        )
    ).scalars().all()
    # Pre-seed every requested id so questions without images map to an empty list.
    out: dict[int, list[StudyQuestionImageItem]] = {qid: [] for qid in question_ids}
    for r in rows:
        out.setdefault(r.study_question_id, []).append(_image_to_item(r))
    return out
async def _attempt_stats(
    session: AsyncSession, user_id: int, question_id: int
) -> QuestionAttemptStats:
    """Cumulative attempt statistics for a single question (scoped to the user)."""
    row = (
        await session.execute(
            select(
                func.count().label("total"),
                func.coalesce(
                    func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
                    0,
                ).label("correct"),
            ).where(
                StudyQuestionAttempt.user_id == user_id,
                StudyQuestionAttempt.study_question_id == question_id,
            )
        )
    ).one()
    total = int(row.total or 0)
    correct = int(row.correct or 0)
    return QuestionAttemptStats(
        attempt_count=total, correct_count=correct, wrong_count=total - correct
    )
def _truncate(text: str, n: int = 80) -> str:
    # Append an ellipsis when the text is cut; the suffix character was lost in the source.
    return text if len(text) <= n else text[:n].rstrip() + "…"
# ─── Topic-level endpoints ───
@router.get(
"/study-topics/{topic_id}/questions",
response_model=StudyQuestionListResponse,
)
async def list_questions_in_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str | None = Query(None),
scope: str | None = Query(None),
exam_round: str | None = Query(None, description="회차 필터 — 보기 페이지 prev/next 같은 단일 회차 한정 조회"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
):
"""List the questions in a topic. Excludes soft-deleted rows. Summary response (answer hidden)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
base = select(StudyQuestion).where(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
)
if subject is not None:
base = base.where(StudyQuestion.subject == subject)
if scope is not None:
base = base.where(StudyQuestion.scope == scope)
if exam_round is not None:
base = base.where(StudyQuestion.exam_round == exam_round)
total = (
await session.execute(select(func.count()).select_from(base.subquery()))
).scalar() or 0
# Ordering: when filtering by exam_round, stabilize prev/next within the round (exam_question_number asc NULLS LAST,
# created_at asc as fallback). Without the filter, keep the existing created_at desc.
if exam_round is not None:
order_clause = (
StudyQuestion.exam_question_number.asc().nulls_last(),
StudyQuestion.created_at.asc(),
StudyQuestion.id.asc(),
)
else:
order_clause = (StudyQuestion.created_at.desc(), StudyQuestion.id.desc())
rows = (
await session.execute(
base.order_by(*order_clause)
.offset((page - 1) * page_size)
.limit(page_size)
)
).scalars().all()
# Fetch attempt stats + the latest correct/wrong result in batched queries (avoids N+1)
qids = [q.id for q in rows]
stats_map: dict[int, tuple[int, int]] = {}
last_correct_map: dict[int, bool] = {}
if qids:
stats_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in stats_rows:
stats_map[r.study_question_id] = (int(r.total), int(r.correct))
latest_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
StudyQuestionAttempt.answered_at,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
for r in latest_rows:
last_correct_map[r.study_question_id] = bool(r.is_correct)
# PR-8: has_images batch
has_image_set: set[int] = set()
if qids:
img_rows = (
await session.execute(
select(StudyQuestionImage.study_question_id)
.where(StudyQuestionImage.study_question_id.in_(qids))
.distinct()
)
).scalars().all()
has_image_set = set(img_rows)
items = [
StudyQuestionSummary(
id=q.id,
question_text=_truncate(q.question_text, 80),
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
exam_question_number=q.exam_question_number,
is_active=q.is_active,
attempt_count=stats_map.get(q.id, (0, 0))[0],
last_correct=last_correct_map.get(q.id),
has_images=q.id in has_image_set,
created_at=q.created_at,
)
for q in rows
]
return StudyQuestionListResponse(items=items, total=total, page=page, page_size=page_size)
@router.post(
"/study-topics/{topic_id}/questions",
response_model=StudyQuestionResponse,
status_code=201,
)
async def create_question_in_topic(
topic_id: int,
body: StudyQuestionCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
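# PR-6: if exam_question_number is omitted while exam_round is given, the server auto-fills max+1.
# In a personal-use environment the race risk is low, so a plain SELECT max is used. If concurrent input
# becomes frequent, consider hardening with SELECT FOR UPDATE or a topic-level advisory lock.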
qnum = body.exam_question_number
if qnum is None and body.exam_round:
max_row = await session.execute(
select(func.coalesce(func.max(StudyQuestion.exam_question_number), 0))
.where(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.exam_round == body.exam_round,
StudyQuestion.deleted_at.is_(None),
)
)
qnum = int(max_row.scalar() or 0) + 1
q = StudyQuestion(
user_id=user.id,
study_topic_id=topic_id,
question_text=body.question_text,
choice_1=body.choice_1,
choice_2=body.choice_2,
choice_3=body.choice_3,
choice_4=body.choice_4,
correct_choice=body.correct_choice,
subject=body.subject,
scope=body.scope,
exam_name=body.exam_name,
exam_round=body.exam_round,
exam_question_number=qnum,
explanation=body.explanation,
source_note=body.source_note,
is_active=body.is_active,
)
session.add(q)
await session.flush()
await session.commit()
stats = QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
exam_question_number=q.exam_question_number,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
images=await _images_for_question(session, q.id),
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
# ─── Review question selection ───
@router.get(
"/study-topics/{topic_id}/review/questions",
response_model=ReviewQuestionListResponse,
)
async def review_questions_for_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str | None = Query(None, description="단일 과목 집중. 미지정 시 과목별 균등 추출"),
scope: str | None = Query(None),
target_per_subject: int = Query(20, ge=1, le=100),
wrong_only: bool = Query(
False,
description="가장 최근 attempt 가 오답인 문제만 (latest-wrong, ever_wrong 아님)",
),
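):
"""Review question selection. Default = uniform random sampling of target_per_subject questions per subject.
The answer and explanation are hidden in the response; they are exposed only when an answer is submitted (POST /api/study-questions/{id}/attempt).
"""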
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
# Base filter for candidate questions: exclude soft-deleted rows + require is_active
base_filter = and_(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
StudyQuestion.is_active.is_(True),
)
# wrong_only=true: narrow the candidates to question_ids whose latest attempt was wrong.
# Uses the DISTINCT ON (study_question_id) ORDER BY answered_at DESC pattern.
wrong_qids: set[int] | None = None
if wrong_only:
latest = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_topic_id == topic_id,
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
wrong_qids = {r.study_question_id for r in latest if r.is_correct is False}
if not wrong_qids:
return ReviewQuestionListResponse(
items=[], total=0, target_per_subject=target_per_subject, subject_distribution={}
)
# Determine the subject groups
if subject is not None:
# Focus on a single subject
subjects: list[str | None] = [subject]
else:
subj_query = (
select(StudyQuestion.subject)
.where(base_filter)
.distinct()
)
if scope is not None:
subj_query = subj_query.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids))
subjects = [r[0] for r in (await session.execute(subj_query)).all()]
if not subjects:
return ReviewQuestionListResponse(
items=[], total=0, target_per_subject=target_per_subject, subject_distribution={}
)
# For each subject: ORDER BY random() LIMIT target_per_subject
selected: list[StudyQuestion] = []
distribution: dict[str, int] = {}
for subj in subjects:
sub_q = select(StudyQuestion).where(base_filter)
if subj is None:
sub_q = sub_q.where(StudyQuestion.subject.is_(None))
else:
sub_q = sub_q.where(StudyQuestion.subject == subj)
if scope is not None:
sub_q = sub_q.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids))
sub_q = sub_q.order_by(func.random()).limit(target_per_subject)
rows = (await session.execute(sub_q)).scalars().all()
if rows:
selected.extend(rows)
distribution[subj or ""] = len(rows)
# Shuffle everything after per-group sampling (avoids questions arriving grouped in input order)
random.shuffle(selected)
# Batch attempt stats (avoids N+1)
qids = [q.id for q in selected]
stats_map: dict[int, QuestionAttemptStats] = {}
if qids:
stats_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in stats_rows:
t = int(r.total)
c = int(r.correct)
stats_map[r.study_question_id] = QuestionAttemptStats(
attempt_count=t, correct_count=c, wrong_count=t - c
)
# PR-8: image batch
images_map = await _images_for_questions_batch(session, qids)
items = [
ReviewQuestionItem(
id=q.id,
question_text=q.question_text,
choices=[
ReviewChoice(number=1, text=q.choice_1),
ReviewChoice(number=2, text=q.choice_2),
ReviewChoice(number=3, text=q.choice_3),
ReviewChoice(number=4, text=q.choice_4),
],
subject=q.subject,
scope=q.scope,
stats=stats_map.get(
q.id, QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
),
images=images_map.get(q.id, []),
)
for q in selected
]
return ReviewQuestionListResponse(
items=items,
total=len(items),
target_per_subject=target_per_subject,
subject_distribution=distribution,
)
# ─── Single-item endpoints ───
@router.get("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def get_question(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
stats = await _attempt_stats(session, user.id, question_id)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
exam_question_number=q.exam_question_number,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
images=await _images_for_question(session, q.id),
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
@router.patch("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def update_question(
question_id: int,
body: StudyQuestionUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""Partial update. Changing correct_choice does not recompute existing attempts.is_correct (preserves the fact at that point in time).
The frontend edit page shows a notice banner."""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
fields_set = body.model_fields_set
SIMPLE_FIELDS = {
"question_text", "choice_1", "choice_2", "choice_3", "choice_4",
"correct_choice", "subject", "scope", "exam_name", "exam_round",
"exam_question_number",
"explanation", "source_note", "is_active",
}
for fname in SIMPLE_FIELDS & fields_set:
setattr(q, fname, getattr(body, fname))
# PR-3: changing a core question field moves the AI explanation to stale (the body is kept; a UI badge informs the user).
# Only transition to stale from ready; pending/failed/none/stale are left unchanged.
AI_STALE_TRIGGER = {
"question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice",
}
if AI_STALE_TRIGGER & fields_set and q.ai_explanation_status == "ready":
q.ai_explanation_status = "stale"
# PR-4: embedding stale transition. Recompute only when the body (question_text/choice_*) changed.
# Changing correct_choice does not affect semantic search, so no recompute.
EMBED_STALE_TRIGGER = {
"question_text", "choice_1", "choice_2", "choice_3", "choice_4",
}
if EMBED_STALE_TRIGGER & fields_set and q.embedding_status == "ready":
q.embedding_status = "stale"
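# PR-12-A follow-up: related-cache stale transition.
# Triggered by an embedding body change (EMBED_STALE_TRIGGER above) or an exam_round change.
# exam_round is a filter condition for related-types, so candidates must be reselected when this question's round changes.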
RELATED_STALE_TRIGGER = EMBED_STALE_TRIGGER | {"exam_round"}
if RELATED_STALE_TRIGGER & fields_set and q.related_computed_at is not None:
q.related_computed_at = None
q.updated_at = datetime.now(timezone.utc)
await session.commit()
stats = await _attempt_stats(session, user.id, question_id)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
exam_question_number=q.exam_question_number,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
images=await _images_for_question(session, q.id),
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
@router.delete("/study-questions/{question_id}", status_code=204)
async def soft_delete_question(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
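):
"""Soft delete only. attempts are protected by a RESTRICT FK and preserved permanently.
There is no hard-delete call path at all; the DB also rejects a delete when attempts exist.
PR-12-A follow-up: this qid may remain as a candidate in the related cache of other questions
in the same topic. Set related_computed_at to NULL on the topic's ready rows so the cron job recomputes them.
"""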
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
q.deleted_at = datetime.now(timezone.utc)
q.updated_at = q.deleted_at
await session.execute(
update(StudyQuestion)
.where(
StudyQuestion.study_topic_id == q.study_topic_id,
StudyQuestion.id != q.id,
StudyQuestion.embedding_status == "ready",
StudyQuestion.deleted_at.is_(None),
StudyQuestion.related_computed_at.is_not(None),
)
.values(related_computed_at=None)
)
await session.commit()
# ─── attempt ───
@router.post("/study-questions/{question_id}/attempt", response_model=AttemptResponse)
async def submit_attempt(
question_id: int,
body: AttemptCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""Submit an answer. PR-9: if is_unsure=true, outcome='unsure' + selected_choice=NULL.
Otherwise compare selected_choice with correct_choice → outcome='correct'/'wrong'.
PR-10: when quiz_session_id is passed, the session cursor + count are updated in the same transaction.
When the cursor reaches the end of question_ids, status='done' and finished_at is set.
"""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
if body.is_unsure:
selected = None
is_correct = False
outcome = "unsure"
elif body.selected_choice is None:
raise HTTPException(
status_code=422,
detail="selected_choice (1~4) 또는 is_unsure=true 가 필요합니다",
)
else:
selected = body.selected_choice
is_correct = selected == q.correct_choice
outcome = "correct" if is_correct else "wrong"
# PR-10: session linkage. Defaults to None.
quiz_session: StudyQuizSession | None = None
if body.quiz_session_id is not None:
quiz_session = await session.get(StudyQuizSession, body.quiz_session_id)
if quiz_session is None or quiz_session.user_id != user.id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
if quiz_session.study_topic_id != q.study_topic_id:
raise HTTPException(
status_code=400,
detail="quiz_session 의 토픽과 문제의 토픽이 다릅니다",
)
if quiz_session.status != "in_progress":
raise HTTPException(
status_code=409,
detail=f"quiz_session 상태가 {quiz_session.status} — 이미 종료된 세션입니다",
)
# Proceed only if the question matches the one at the current cursor (blocks out-of-order submissions).
qids = quiz_session.question_ids or []
if quiz_session.cursor >= len(qids):
raise HTTPException(status_code=409, detail="이미 모든 문제를 풀었습니다")
if qids[quiz_session.cursor] != q.id:
raise HTTPException(
status_code=409,
detail="현재 cursor 위치의 문제와 question_id 가 일치하지 않습니다 (이중 제출/순서 어긋남)",
)
attempt = StudyQuestionAttempt(
user_id=user.id,
study_question_id=q.id,
study_topic_id=q.study_topic_id,
selected_choice=selected,
correct_choice=q.correct_choice,
is_correct=is_correct,
outcome=outcome,
quiz_session_id=quiz_session.id if quiz_session else None,
)
session.add(attempt)
session_just_finished = False
if quiz_session is not None:
quiz_session.cursor = quiz_session.cursor + 1
if outcome == "correct":
quiz_session.correct_count += 1
elif outcome == "wrong":
quiz_session.wrong_count += 1
elif outcome == "unsure":
quiz_session.unsure_count += 1
if quiz_session.cursor >= len(quiz_session.question_ids or []):
quiz_session.status = "done"
quiz_session.finished_at = datetime.now(timezone.utc)
session_just_finished = True
quiz_session.updated_at = datetime.now(timezone.utc)
# First commit: attempt + quiz_session.status (on failure not even the attempt is stored; the user retries)
await session.commit()
await session.refresh(attempt)
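# Phase 1: on session end, upsert progress + update the pattern + (if present) update the review stage.
# Separate transaction, so a finalize failure cannot threaten the stored attempt. Idempotent (staleness can be
# diagnosed on the next review/queue call). Starts with the first commit's attempts already visible.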
if session_just_finished and quiz_session is not None:
from services.study.session_finalize import finalize_session
try:
await finalize_session(
session,
user_id=user.id,
study_topic_id=q.study_topic_id,
quiz_session_id=quiz_session.id,
)
await session.commit()
except Exception:
logger.exception(
"finalize_session_failed quiz_session_id=%s", quiz_session.id
)
await session.rollback()
stats = await _attempt_stats(session, user.id, question_id)
return AttemptResponse(
id=attempt.id,
is_correct=is_correct,
selected_choice=selected,
correct_choice=q.correct_choice,
outcome=outcome,
explanation=q.explanation,
stats=stats,
quiz_session_id=quiz_session.id if quiz_session else None,
quiz_session_status=quiz_session.status if quiz_session else None,
quiz_session_cursor=quiz_session.cursor if quiz_session else None,
reviewed_at=attempt.reviewed_at,
)
# ─── PR-10: study-complete toggle ───
class AttemptReviewRequest(BaseModel):
    reviewed: bool


class AttemptReviewResponse(BaseModel):
    id: int
    reviewed_at: datetime | None


@router.patch(
    "/study-question-attempts/{attempt_id}/review-mark",
    response_model=AttemptReviewResponse,
)
async def mark_attempt_reviewed(
    attempt_id: int,
    body: AttemptReviewRequest,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """[Study complete] toggle on the result card. reviewed=true sets the timestamp; false sets NULL."""
    attempt = await session.get(StudyQuestionAttempt, attempt_id)
    if attempt is None or attempt.user_id != user.id:
        raise HTTPException(status_code=404, detail="attempt 를 찾을 수 없습니다")
    attempt.reviewed_at = datetime.now(timezone.utc) if body.reviewed else None
    await session.commit()
    return AttemptReviewResponse(id=attempt.id, reviewed_at=attempt.reviewed_at)
# ─── PR-5: similar question search (embedding cosine) ───
@router.get(
"/study-questions/{question_id}/similar",
response_model=SimilarQuestionsResponse,
)
async def list_similar_questions(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
limit: int = Query(5, ge=1, le=20),
topic_only: bool = Query(True, description="True 면 같은 study_topic 안에서만"),
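):
"""Top-K by cosine similarity against the current question's embedding. Vector search only, without any 26B call.
Excluded:
- the question itself (id != current)
- soft-deleted
- embedding_status != 'ready' (pending, failed, or not generated)
If source_status is not 'ready', an empty items array is returned (the UI shows a notice).
"""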
src = await session.get(StudyQuestion, question_id)
src = _verify_question_ownership(src, user)
if src.embedding_status != "ready" or src.embedding is None:
return SimilarQuestionsResponse(
items=[], source_status=src.embedding_status, source_id=question_id
)
# cosine distance = embedding <=> ref. similarity = 1 - distance.
distance_expr = StudyQuestion.embedding.cosine_distance(src.embedding)
base = (
select(StudyQuestion, distance_expr.label("distance"))
.where(
StudyQuestion.user_id == user.id,
StudyQuestion.id != question_id,
StudyQuestion.deleted_at.is_(None),
StudyQuestion.embedding_status == "ready",
StudyQuestion.embedding.is_not(None),
)
.order_by(distance_expr.asc())
.limit(limit)
)
if topic_only:
base = base.where(StudyQuestion.study_topic_id == src.study_topic_id)
rows = (await session.execute(base)).all()
similar_qs = [(r[0], float(r.distance)) for r in rows]
# Batch-load attempt_count and last_correct (avoids N+1 queries)
qids = [q.id for q, _ in similar_qs]
attempt_count_map: dict[int, int] = {}
last_correct_map: dict[int, bool] = {}
if qids:
cnt_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in cnt_rows:
attempt_count_map[r.study_question_id] = int(r.total)
latest_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
for r in latest_rows:
last_correct_map[r.study_question_id] = bool(r.is_correct)
def _truncate(text: str, n: int = 80) -> str:
return text if len(text) <= n else text[:n].rstrip() + "…"
items = [
SimilarQuestionItem(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=_truncate(q.question_text, 80),
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
similarity=round(1.0 - dist, 4),
attempt_count=attempt_count_map.get(q.id, 0),
last_correct=last_correct_map.get(q.id),
)
for q, dist in similar_qs
]
return SimilarQuestionsResponse(items=items, source_status="ready", source_id=question_id)
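# Illustrative sketch, not used by the endpoint: how the `similarity` field relates to the
# `<=>` cosine distance ordered on above (similarity = 1 - distance). The helper name
# `_example_cosine_similarity` is hypothetical, and the math runs in plain Python here,
# whereas the query above leaves it to the database.
import math


def _example_cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity of two equal-length, non-zero vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)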
# ─── PR-12-A: repeated-question / similar-type classification ───
class RelatedQuestionItem(BaseModel):
id: int
study_topic_id: int
question_text: str
subject: str | None
scope: str | None
exam_round: str | None
exam_question_number: int | None
similarity: float
class RelatedTypesResponse(BaseModel):
source_id: int
source_status: str # ready | pending | failed | stale | none
source_exam_round: str | None # if null/empty, both lists are empty and all counts are 0
# Other users' questions, same-round questions, and the question itself are all excluded by the backend
repeat_questions: list[RelatedQuestionItem] # similarity >= 0.95
similar_questions: list[RelatedQuestionItem] # 0.88 <= similarity < 0.95
# Counts: round_count is the primary display metric (repetition across rounds).
repeat_related_count: int
repeat_round_count: int
repeat_grade: str | None = None # 단골/잘 나오는 반복 출제/반복 출제/신출/빈출 (null when round_count < 2)
similar_related_count: int
similar_round_count: int
@router.get(
"/study-questions/{question_id}/related-types",
response_model=RelatedTypesResponse,
)
async def list_related_types(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
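):
"""Separate repeated questions (🔥) from similar types (🧩). The exam-round condition is enforced by the backend.
Learning meaning:
- Repeated question = nearly the same form reappears in a different round (value: memorization / pattern fixing)
- Similar type = the same concept or solution pattern appears in a different round (value: concept review)
Uses the shared service function, so the classification logic is shared with the bulk endpoint (avoids drift).
PR-12-A follow-up: read the study_questions.related_* column cache first. On a cache hit (computed_at IS NOT NULL
+ matching threshold version) the HNSW search is skipped. On a miss, compute immediately and store the cache (safety net for when cron has not caught up).
"""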
from datetime import datetime, timezone
from services.study.related_types import (
THRESHOLD_VERSION,
classify_related_for_question,
deserialize_candidates,
serialize_candidates,
)
src = await session.get(StudyQuestion, question_id)
src = _verify_question_ownership(src, user)
src_round = (src.exam_round or "").strip()
src_round_or_none = src_round if src_round else None
# Embedding not ready → empty response.
if src.embedding_status != "ready" or src.embedding is None:
return RelatedTypesResponse(
source_id=question_id,
source_status=src.embedding_status,
source_exam_round=src_round_or_none,
repeat_questions=[],
similar_questions=[],
repeat_related_count=0,
repeat_round_count=0,
similar_related_count=0,
similar_round_count=0,
)
def _candidates_to_items(cands):
return [
RelatedQuestionItem(
id=c.id,
study_topic_id=c.study_topic_id,
question_text=c.question_text,
subject=c.subject,
scope=c.scope,
exam_round=c.exam_round,
exam_question_number=c.exam_question_number,
similarity=round(c.similarity, 4),
)
for c in cands
]
# Cache-hit condition: computed_at is set and the threshold fingerprint matches
if (
src.related_computed_at is not None
and src.related_threshold_version == THRESHOLD_VERSION
):
repeat_cands = deserialize_candidates(src.related_repeat, src.study_topic_id)
similar_cands = deserialize_candidates(src.related_similar, src.study_topic_id)
return RelatedTypesResponse(
source_id=question_id,
source_status="ready",
source_exam_round=src_round_or_none,
repeat_questions=_candidates_to_items(repeat_cands),
similar_questions=_candidates_to_items(similar_cands),
repeat_related_count=len(repeat_cands),
repeat_round_count=src.related_repeat_round_count or 0,
repeat_grade=src.related_repeat_grade,
similar_related_count=len(similar_cands),
similar_round_count=src.related_similar_round_count or 0,
)
# Cache miss → compute immediately and store (do not wait for the next cron tick; avoids an empty response)
cls = await classify_related_for_question(session, user_id=user.id, source=src)
src.related_repeat = serialize_candidates(cls.repeat)
src.related_similar = serialize_candidates(cls.similar)
src.related_repeat_round_count = cls.repeat_round_count
src.related_similar_round_count = cls.similar_round_count
src.related_repeat_grade = cls.repeat_grade
src.related_computed_at = datetime.now(timezone.utc)
src.related_threshold_version = THRESHOLD_VERSION
await session.commit()
return RelatedTypesResponse(
source_id=question_id,
source_status="ready",
source_exam_round=src_round_or_none,
repeat_questions=_candidates_to_items(cls.repeat),
similar_questions=_candidates_to_items(cls.similar),
repeat_related_count=cls.repeat_related_count,
repeat_round_count=cls.repeat_round_count,
repeat_grade=cls.repeat_grade,
similar_related_count=cls.similar_related_count,
similar_round_count=cls.similar_round_count,
)
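# Illustrative sketch of the similarity bands documented on RelatedTypesResponse
# (repeat: similarity >= 0.95, similar: 0.88 <= similarity < 0.95). The actual classification
# lives in services.study.related_types, which is not shown here, so the constant and function
# names below are hypothetical and only mirror the thresholds stated in the comments above.
_EXAMPLE_REPEAT_MIN = 0.95
_EXAMPLE_SIMILAR_MIN = 0.88


def _example_bucket(similarity: float) -> str | None:
    """Return "repeat", "similar", or None for a similarity below both bands."""
    if similarity >= _EXAMPLE_REPEAT_MIN:
        return "repeat"
    if similarity >= _EXAMPLE_SIMILAR_MIN:
        return "similar"
    return None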
# ─── PR-8: image upload / fetch / delete ───
@router.post(
"/study-questions/{question_id}/images",
response_model=StudyQuestionImageItem,
status_code=201,
)
async def upload_question_image(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
file: UploadFile = File(...),
):
"""Upload an image attached to a question. One file = one row; call repeatedly to add several images.
Path: /documents/study_question_images/{topic_id}/{qid}/{img_id}.{ext}
Auth: only questions owned by the same user; other users get 404.
Limits: PNG/JPEG/WEBP/GIF, 10MB per file.
"""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
# MIME validation
mime = (file.content_type or "").lower()
if mime not in ALLOWED_IMAGE_MIME:
raise HTTPException(
status_code=415,
detail=f"지원하지 않는 이미지 형식: {mime} (PNG/JPEG/WEBP/GIF 만 허용)",
)
# Read the file and validate its size
raw = await file.read()
if len(raw) > MAX_IMAGE_BYTES:
raise HTTPException(
status_code=413,
detail=f"이미지 크기 초과 ({len(raw)} > {MAX_IMAGE_BYTES} 바이트, 10MB 제한)",
)
if len(raw) == 0:
raise HTTPException(status_code=400, detail="빈 파일")
# Determine the extension
ext_map = {
"image/png": "png",
"image/jpeg": "jpg",
"image/webp": "webp",
"image/gif": "gif",
}
ext = ext_map[mime]
# Create the DB row first to obtain its id (used in the file name)
img = StudyQuestionImage(
user_id=user.id,
study_question_id=q.id,
file_path="", # empty for now; filled in once the id is known
file_size=len(raw),
mime_type=mime,
sort_order=0,
)
session.add(img)
await session.flush() # obtain img.id
# Save to the file system
target_dir = STUDY_IMG_ROOT / str(q.study_topic_id) / str(q.id)
target_dir.mkdir(parents=True, exist_ok=True)
target_path = target_dir / f"{img.id}.{ext}"
target_path.write_bytes(raw)
img.file_path = str(target_path)
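# sort_order = max + 1 among the same question's other images
max_sort = (
await session.execute(
select(func.coalesce(func.max(StudyQuestionImage.sort_order), -1))
.where(StudyQuestionImage.study_question_id == q.id)
.where(StudyQuestionImage.id != img.id)
)
).scalar()
# COALESCE already maps "no other images" to -1. A trailing `or -1` would also turn a legitimate max of 0 into -1.
img.sort_order = (int(max_sort) if max_sort is not None else -1) + 1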
await session.commit()
return _image_to_item(img)
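# Illustrative sketch of the upload checks above as a pure function, which is easier to
# unit-test than the endpoint. _EXAMPLE_ALLOWED_MIME and _EXAMPLE_MAX_BYTES are assumed stand-ins
# for ALLOWED_IMAGE_MIME and MAX_IMAGE_BYTES (defined elsewhere in this module and not shown
# here); the function name and return shape are hypothetical.
_EXAMPLE_ALLOWED_MIME = {
    "image/png": "png",
    "image/jpeg": "jpg",
    "image/webp": "webp",
    "image/gif": "gif",
}
_EXAMPLE_MAX_BYTES = 10 * 1024 * 1024  # assumed to match the 10MB limit in the docstring


def _example_validate_image(content_type: str | None, raw: bytes) -> tuple[int, str | None]:
    """Return (http_status, extension). Status 200 means accepted; otherwise extension is None."""
    mime = (content_type or "").lower()
    if mime not in _EXAMPLE_ALLOWED_MIME:
        return 415, None  # unsupported media type
    if len(raw) > _EXAMPLE_MAX_BYTES:
        return 413, None  # payload too large
    if len(raw) == 0:
        return 400, None  # empty file
    return 200, _EXAMPLE_ALLOWED_MIME[mime]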
@router.get("/study-questions/{question_id}/images/{image_id}/raw")
async def get_question_image_raw(
question_id: int,
image_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""Raw image bytes. Served only for attachments on questions owned by the same user."""
img = await session.get(StudyQuestionImage, image_id)
if img is None or img.user_id != user.id or img.study_question_id != question_id:
raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다")
file_path = _Path(img.file_path)
if not file_path.is_file():
raise HTTPException(status_code=410, detail="파일이 사라졌습니다")
return FileResponse(str(file_path), media_type=img.mime_type)
@router.delete("/study-questions/{question_id}/images/{image_id}", status_code=204)
async def delete_question_image(
question_id: int,
image_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
img = await session.get(StudyQuestionImage, image_id)
if img is None or img.user_id != user.id or img.study_question_id != question_id:
raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다")
# Delete from the file system (the DB row is cleaned up even if this fails)
try:
file_path = _Path(img.file_path)
if file_path.is_file():
file_path.unlink()
except Exception as e:
logger.warning("study_q_image_unlink_failed id=%s: %s", image_id, e)
await session.delete(img)
await session.commit()
# ─── PR-3: AI explanation generation endpoint ───
# MLX call timeout (seconds). MLX gate + 26B inference averages ~10s; this leaves a safety margin.
LLM_TIMEOUT_S = 30.0
# Lazy-load the prompt template
_PROMPT_PATH = "study_question_explanation.txt"
_prompt_cache: str | None = None
def _load_explanation_prompt() -> str:
global _prompt_cache
if _prompt_cache is None:
from pathlib import Path
prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
_prompt_cache = (prompts_dir / _PROMPT_PATH).read_text(encoding="utf-8")
return _prompt_cache
def _render_prompt(q: StudyQuestion, doc_block: str, question_block: str) -> str:
template = _load_explanation_prompt()
return (
template
.replace("{question_text}", q.question_text)
.replace("{choice_1}", q.choice_1)
.replace("{choice_2}", q.choice_2)
.replace("{choice_3}", q.choice_3)
.replace("{choice_4}", q.choice_4)
.replace("{correct_choice}", str(q.correct_choice))
.replace("{documents_evidence_block}", doc_block)
.replace("{questions_evidence_block}", question_block)
)
def _make_cache_response(q: StudyQuestion, is_stale: bool) -> AIExplanationResponse:
"""Return only the cached body (evidence is not recomputed). Status and metadata come straight from the q row."""
return AIExplanationResponse(
ai_explanation=q.ai_explanation,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
evidence=[],
from_cache=True,
is_stale=is_stale,
can_regenerate=True,
)
@router.post(
"/study-questions/{question_id}/ai-explanation",
response_model=AIExplanationResponse,
)
async def generate_ai_explanation(
question_id: int,
body: AIExplanationCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
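):
"""Manually triggered AI explanation generation. RAG = mapped materials + other questions in the same topic (excluding the question itself).
Primary model only (Mac mini MLX gemma-4-26b), via the MLX gate Semaphore(1).
- regenerate=false and status=ready → return the cache immediately (no MLX call)
- regenerate=false and status=stale → return the cached body with the is_stale=true flag
- status=pending → 409 (another call is already in progress; guaranteed by a race-safe conditional UPDATE)
- otherwise (none/failed/regenerate=true) → generate anew
On failure, status='failed' and the previous body is preserved.
"""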
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
# 1) Cache short-circuit branch (regenerate=false)
if not body.regenerate:
if q.ai_explanation_status == "ready":
return _make_cache_response(q, is_stale=False)
if q.ai_explanation_status == "stale":
return _make_cache_response(q, is_stale=True)
if q.ai_explanation_status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# status in {none, failed} → generate anew
else:
# Even with regenerate=true, block while pending (guards against concurrent calls)
if q.ai_explanation_status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# 2) Race-safe transition to pending (a conditional UPDATE blocks concurrent calls)
lock_result = await session.execute(
update(StudyQuestion)
.where(
StudyQuestion.id == question_id,
StudyQuestion.user_id == user.id,
StudyQuestion.ai_explanation_status != "pending",
)
.values(ai_explanation_status="pending", updated_at=datetime.now(timezone.utc))
.returning(StudyQuestion.id)
)
if lock_result.scalar_one_or_none() is None:
# Another call set pending first
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
await session.commit()
await session.refresh(q)
# 3) Collect RAG evidence
try:
ctx = await gather_explanation_context(session, user.id, q)
except Exception as e:
logger.warning("study_explanation_rag_failed: %s: %s", type(e).__name__, e)
# If RAG fails, proceed with empty evidence (the prompt handles the "insufficient source evidence" branch)
from services.study.explanation_rag import ExplanationContext
ctx = ExplanationContext(documents=[], questions=[])
# 4) Assemble the prompt, then call the primary model inside the MLX gate with a timeout
doc_block = render_evidence_block(ctx.documents)
q_block = render_evidence_block(ctx.questions)
prompt = _render_prompt(q, doc_block, q_block)
ai_client = AIClient()
raw_text: str | None = None
error_message: str | None = None
try:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_S):
raw_text = await ai_client.call_primary(prompt)
except asyncio.TimeoutError:
error_message = f"MLX timeout ({LLM_TIMEOUT_S}s)"
logger.warning("study_explanation_mlx_timeout qid=%s", question_id)
except Exception as e:
error_message = f"{type(e).__name__}: {e}"
logger.exception("study_explanation_mlx_failed qid=%s", question_id)
finally:
await ai_client.close()
# 5) Save the result (success/failure branches); re-fetch the q object
q = await session.get(StudyQuestion, question_id)
if raw_text is None or not raw_text.strip():
# Failure: keep the previous body, status='failed'
q.ai_explanation_status = "failed"
q.updated_at = datetime.now(timezone.utc)
await session.commit()
return AIExplanationResponse(
ai_explanation=q.ai_explanation,
ai_explanation_status="failed",
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False,
is_stale=False,
can_regenerate=True,
)
# Success: strip Qwen think tags and similar
from ai.client import strip_thinking
cleaned = strip_thinking(raw_text).strip()
q.ai_explanation = cleaned
q.ai_explanation_status = "ready"
q.ai_explanation_generated_at = datetime.now(timezone.utc)
primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary"
q.ai_explanation_model = f"mlx:{primary_name}"
q.updated_at = q.ai_explanation_generated_at
await session.commit()
return AIExplanationResponse(
ai_explanation=q.ai_explanation,
ai_explanation_status="ready",
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False,
is_stale=False,
can_regenerate=True,
)
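# Illustrative sketch of the race-safe "pending" transition from step 2 above, reduced to the
# standard-library sqlite3 module so it runs on its own. The table name `questions`, the column
# `status`, and the helper name are hypothetical; the endpoint itself uses a SQLAlchemy
# UPDATE ... RETURNING against StudyQuestion. The idea is the same: the WHERE clause only matches
# while the row is not already pending, so exactly one concurrent caller wins the transition.
import sqlite3


def _example_try_mark_pending(conn: sqlite3.Connection, question_id: int) -> bool:
    """Return True if this call moved the row to 'pending', False if it was already pending or missing."""
    cur = conn.execute(
        "UPDATE questions SET status = 'pending' WHERE id = ? AND status != 'pending'",
        (question_id,),
    )
    conn.commit()
    # rowcount is 1 only when the conditional UPDATE actually changed the row.
    return cur.rowcount == 1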