Files
hyungi_document_server/app/api/study_questions.py
T
Hyungi Ahn 7f4d64c6df feat(study): 문제풀이 세션 + 결과 카드 + 학습완료 체크 (PR-10)
- study_quiz_sessions 테이블 (한 토픽 in_progress 1개 partial unique)
- study_question_attempts 에 quiz_session_id + reviewed_at 컬럼
- 풀이 진행률 서버 단일 진실 (cursor) — 나갔다 와도 이어풀기 가능
- 통합뷰: 진행 중 카드(이어풀기) + 최근 완료 결과 카드(미확인 N건 배지)
- 신규 /quiz-sessions/[sid] 결과 페이지 (3 카테고리 + AI 해설 + 분야 설명 + 학습완료 토글)
- /review 페이지는 풀이만, 마지막 문제 풀이 후 결과 페이지로 redirect
- 마이그레이션 206~209 (single-statement, asyncpg 호환)
- API: POST/GET/PATCH /study-topics/{tid}/quiz-sessions(/{sid}),
       PATCH /study-question-attempts/{aid}/review-mark
- AttemptCreate.quiz_session_id 추가 — submit_attempt 가 같은 트랜잭션에서
  세션 cursor + count 증가, 마지막이면 status='done' + finished_at

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 16:49:21 +09:00

1384 lines
49 KiB
Python

"""학습 워크스페이스 문제은행 + 복습모드 API.
PR-2 가드레일:
- study_topic 컨테이너에 자산 타입별 1:N (문제) 추가. polymorphic 단일 테이블 영구 금지.
- subject/scope 강한 enum 미사용 (자유 텍스트, 자동완성은 후속 PR).
- 문제 삭제는 soft delete only. attempts 는 RESTRICT FK 로 DB 레벨 보호.
- correct_choice 변경 시 기존 attempts 재계산 안 함 (기록 = 시점 사실).
- 복습 출제 시 정답·해설 응답 비공개 → 답 제출 시점에만 노출.
- wrong_only=true 정의 = 가장 최근 attempt 가 오답인 문제. "한 번이라도 틀린 적 있는" 아님.
"""
import asyncio
import logging
import random
from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
from sqlalchemy import and_, case, func, select, text as sql_text, update
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_question_image import StudyQuestionImage
from models.study_quiz_session import StudyQuizSession
from models.study_topic import StudyTopic
from models.user import User
from services.search.llm_gate import get_mlx_gate
from services.study.explanation_rag import (
EvidenceItem,
gather_explanation_context,
render_evidence_block,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which every endpoint in this module is registered.
router = APIRouter()
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
    """Return ``topic`` when it exists, belongs to ``user`` and is not soft-deleted.

    Same pattern as study_topics.py: an ownership mismatch is reported with
    the same 404 as a missing topic, to avoid leaking information.

    Raises:
        HTTPException: 404 for a missing, foreign or soft-deleted topic.
    """
    accessible = (
        topic is not None
        and topic.user_id == user.id
        and topic.deleted_at is None
    )
    if not accessible:
        raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
    return topic
def _verify_question_ownership(q: StudyQuestion | None, user: User) -> StudyQuestion:
    """Return ``q`` when it exists, belongs to ``user`` and is not soft-deleted.

    Raises:
        HTTPException: 404 for a missing, foreign or soft-deleted question.
    """
    accessible = (
        q is not None
        and q.user_id == user.id
        and q.deleted_at is None
    )
    if not accessible:
        raise HTTPException(status_code=404, detail="문제를 찾을 수 없습니다")
    return q
# ─── Pydantic 스키마 ───
class StudyQuestionCreate(BaseModel):
    """Request body for creating a question inside a study topic.

    The question text, the four choices and the correct choice (1-4) are
    required; the remaining metadata is optional free text.
    """

    question_text: str = Field(..., min_length=1)
    choice_1: str = Field(..., min_length=1)
    choice_2: str = Field(..., min_length=1)
    choice_3: str = Field(..., min_length=1)
    choice_4: str = Field(..., min_length=1)
    correct_choice: int = Field(..., ge=1, le=4)
    subject: str | None = Field(None, max_length=120)
    scope: str | None = Field(None, max_length=200)
    exam_name: str | None = Field(None, max_length=120)
    exam_round: str | None = Field(None, max_length=120)
    # PR-6: question number within the exam round. When omitted while
    # exam_round is given, the server fills in max+1 automatically.
    exam_question_number: int | None = Field(None, ge=1)
    explanation: str | None = None
    source_note: str | None = None
    is_active: bool = True
class StudyQuestionUpdate(BaseModel):
    """Request body for a partial question update.

    Every field is optional; the update endpoint applies only the fields
    that were actually present in the request.
    """

    question_text: str | None = Field(None, min_length=1)
    choice_1: str | None = Field(None, min_length=1)
    choice_2: str | None = Field(None, min_length=1)
    choice_3: str | None = Field(None, min_length=1)
    choice_4: str | None = Field(None, min_length=1)
    correct_choice: int | None = Field(None, ge=1, le=4)
    subject: str | None = Field(None, max_length=120)
    scope: str | None = Field(None, max_length=200)
    exam_name: str | None = Field(None, max_length=120)
    exam_round: str | None = Field(None, max_length=120)
    exam_question_number: int | None = Field(None, ge=1)
    explanation: str | None = None
    source_note: str | None = None
    is_active: bool | None = None
class QuestionAttemptStats(BaseModel):
    """Cumulative attempt counters for one question.

    The helpers in this module fill ``wrong_count`` as
    ``attempt_count - correct_count``.
    """

    attempt_count: int
    correct_count: int
    wrong_count: int
class StudyQuestionImageItem(BaseModel):
    """Metadata of an image attached to a question. Raw bytes come from a separate GET endpoint."""

    id: int
    sort_order: int
    mime_type: str
    file_size: int
    served_url: str  # for client display — /api/study-questions/{qid}/images/{img_id}/raw
class StudyQuestionResponse(BaseModel):
    """For editing/management — exposes both the correct answer and the explanation."""

    id: int
    study_topic_id: int
    question_text: str
    choice_1: str
    choice_2: str
    choice_3: str
    choice_4: str
    correct_choice: int
    subject: str | None
    scope: str | None
    exam_name: str | None
    exam_round: str | None
    explanation: str | None
    source_note: str | None
    is_active: bool
    # PR-6: question number within the exam round
    exam_question_number: int | None = None
    # PR-3: AI explanation status (used by the edit screen). The explanation
    # body itself is fetched through a separate GET /ai-explanation.
    ai_explanation_status: str = "none"
    ai_explanation_generated_at: datetime | None = None
    ai_explanation_model: str | None = None
    # PR-8: attached images
    images: list[StudyQuestionImageItem] = []
    created_at: datetime
    updated_at: datetime
    stats: QuestionAttemptStats
class StudyQuestionSummary(BaseModel):
    """For the unified view and lists — question text truncated, correct answer not exposed."""

    id: int
    question_text: str  # truncated to the first 80 characters (done on the server)
    subject: str | None
    scope: str | None
    exam_name: str | None
    exam_round: str | None
    is_active: bool
    has_images: bool = False
    attempt_count: int
    # Result of the most recent attempt; None when there is no attempt yet.
    last_correct: bool | None
    created_at: datetime
class StudyQuestionListResponse(BaseModel):
    """One page of question summaries plus the pagination parameters used."""

    items: list[StudyQuestionSummary]
    total: int  # number of questions matching the filters, across all pages
    page: int
    page_size: int
class ReviewChoice(BaseModel):
    """One answer choice of a review question: its number and its text."""

    number: int
    text: str
class ReviewQuestionItem(BaseModel):
    """Review question payload — correct answer and explanation are not exposed."""

    id: int
    question_text: str
    choices: list[ReviewChoice]
    subject: str | None
    scope: str | None
    stats: QuestionAttemptStats
    images: list[StudyQuestionImageItem] = []
class ReviewQuestionListResponse(BaseModel):
    """Set of questions selected for a review run."""

    items: list[ReviewQuestionItem]
    total: int
    target_per_subject: int
    subject_distribution: dict[str, int]  # subject -> number of selected questions
class AttemptCreate(BaseModel):
    """Answer submission body.

    PR-9: either ``selected_choice`` (1-4) or ``is_unsure`` is required.
    With ``is_unsure=true`` the selected choice is ignored and the attempt is
    recorded with outcome='unsure'.
    PR-10: when ``quiz_session_id`` is given, the session's cursor and
    counters are updated together with the attempt.
    """

    selected_choice: int | None = Field(None, ge=1, le=4)
    is_unsure: bool = False
    quiz_session_id: int | None = None
class AttemptResponse(BaseModel):
    """Result of an answer submission; this is where the correct answer and explanation are revealed."""

    id: int  # PR-10: exposed because the "reviewed" toggle PATCHes a single attempt by id.
    is_correct: bool
    selected_choice: int | None
    correct_choice: int
    outcome: str  # PR-9: correct | wrong | unsure
    explanation: str | None
    stats: QuestionAttemptStats
    quiz_session_id: int | None = None
    quiz_session_status: str | None = None  # in_progress | done (set to done on the last question)
    quiz_session_cursor: int | None = None
    reviewed_at: datetime | None = None
# ─── PR-5: 비슷한 문제 검색 (embedding cosine) ───
class SimilarQuestionItem(BaseModel):
    """One entry of the similar-question search result."""

    id: int
    study_topic_id: int
    question_text: str  # truncated to 80 characters
    subject: str | None
    scope: str | None
    exam_name: str | None
    exam_round: str | None
    similarity: float  # 1 - cosine_distance; 1.0 = identical, 0.0 = unrelated
    attempt_count: int
    # Result of the most recent attempt; None when there is no attempt yet.
    last_correct: bool | None
class SimilarQuestionsResponse(BaseModel):
    """Similar-question search result for one source question."""

    items: list[SimilarQuestionItem]
    source_status: str  # embedding_status of the source question; items is empty unless it is 'ready'
    source_id: int
# ─── PR-3: AI 풀이 생성 ───
class AIExplanationCreate(BaseModel):
    """Manual trigger. With regenerate=true the cache is ignored and a new explanation is generated."""

    regenerate: bool = False
class AIExplanationEvidence(BaseModel):
    """One evidence item returned alongside an AI explanation."""

    source_type: str  # 'document' | 'question'
    source_id: int
    title: str
    snippet: str
class AIExplanationResponse(BaseModel):
    """AI explanation text together with its status metadata and evidence list."""

    ai_explanation: str | None
    ai_explanation_status: str  # ready | stale | failed | none
    ai_explanation_generated_at: datetime | None
    ai_explanation_model: str | None
    evidence: list[AIExplanationEvidence] = []
    from_cache: bool = False
    is_stale: bool = False
    can_regenerate: bool = True
# ─── 통계 헬퍼 ───
# PR-8: image storage root + helpers
from pathlib import Path as _Path
# Directory below the configured NAS mount path where question images are written.
STUDY_IMG_ROOT = _Path(settings.nas_mount_path) / "study_question_images"
# MIME types accepted by the image upload endpoint.
ALLOWED_IMAGE_MIME = {"image/png", "image/jpeg", "image/webp", "image/gif"}
MAX_IMAGE_BYTES = 10 * 1024 * 1024  # 10 MB per file
def _image_url(qid: int, img_id: int) -> str:
return f"/api/study-questions/{qid}/images/{img_id}/raw"
def _image_to_item(img: StudyQuestionImage) -> StudyQuestionImageItem:
    """Convert an image row into its API metadata item, including the raw-bytes URL."""
    raw_url = _image_url(img.study_question_id, img.id)
    return StudyQuestionImageItem(
        id=img.id,
        sort_order=img.sort_order,
        mime_type=img.mime_type,
        file_size=img.file_size,
        served_url=raw_url,
    )
async def _images_for_question(
    session: AsyncSession, question_id: int
) -> list[StudyQuestionImageItem]:
    """Return the image items of one question, ordered by ``sort_order`` and then ``id``."""
    stmt = (
        select(StudyQuestionImage)
        .where(StudyQuestionImage.study_question_id == question_id)
        .order_by(StudyQuestionImage.sort_order.asc(), StudyQuestionImage.id.asc())
    )
    result = await session.execute(stmt)
    return [_image_to_item(image) for image in result.scalars().all()]
async def _images_for_questions_batch(
    session: AsyncSession, question_ids: list[int]
) -> dict[int, list[StudyQuestionImageItem]]:
    """Fetch the images of several questions with a single query (avoids N+1).

    Returns a dict keyed by question id. Every requested id is present, with
    an empty list when the question has no image. An empty ``question_ids``
    yields an empty dict without touching the database.
    """
    if not question_ids:
        return {}
    stmt = (
        select(StudyQuestionImage)
        .where(StudyQuestionImage.study_question_id.in_(question_ids))
        .order_by(
            StudyQuestionImage.study_question_id,
            StudyQuestionImage.sort_order.asc(),
            StudyQuestionImage.id.asc(),
        )
    )
    result = await session.execute(stmt)
    grouped: dict[int, list[StudyQuestionImageItem]] = {qid: [] for qid in question_ids}
    for image in result.scalars().all():
        bucket = grouped.setdefault(image.study_question_id, [])
        bucket.append(_image_to_item(image))
    return grouped
async def _attempt_stats(
    session: AsyncSession, user_id: int, question_id: int
) -> QuestionAttemptStats:
    """Return the cumulative attempt statistics of one question for one user.

    ``wrong_count`` is every attempt that is not flagged correct
    (``attempt_count - correct_count``).
    """
    correct_sum = func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0))
    stmt = select(
        func.count().label("total"),
        func.coalesce(correct_sum, 0).label("correct"),
    ).where(
        StudyQuestionAttempt.user_id == user_id,
        StudyQuestionAttempt.study_question_id == question_id,
    )
    counts = (await session.execute(stmt)).one()
    attempts = int(counts.total or 0)
    right = int(counts.correct or 0)
    return QuestionAttemptStats(
        attempt_count=attempts,
        correct_count=right,
        wrong_count=attempts - right,
    )
def _truncate(text: str, n: int = 80) -> str:
return text if len(text) <= n else text[:n].rstrip() + ""
# ─── 토픽 단위 엔드포인트 ───
@router.get(
    "/study-topics/{topic_id}/questions",
    response_model=StudyQuestionListResponse,
)
async def list_questions_in_topic(
    topic_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    subject: str | None = Query(None),
    scope: str | None = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
):
    """List the questions of a topic as paginated summaries.

    Soft-deleted questions are excluded and the summary does not contain the
    correct answer. ``subject`` and ``scope`` are optional exact-match
    filters. Results are ordered newest first.

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned
            by another user.
    """
    _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    filtered = select(StudyQuestion).where(
        StudyQuestion.user_id == user.id,
        StudyQuestion.study_topic_id == topic_id,
        StudyQuestion.deleted_at.is_(None),
    )
    for column, wanted in ((StudyQuestion.subject, subject), (StudyQuestion.scope, scope)):
        if wanted is not None:
            filtered = filtered.where(column == wanted)

    count_stmt = select(func.count()).select_from(filtered.subquery())
    total = (await session.execute(count_stmt)).scalar() or 0

    page_stmt = (
        filtered.order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    questions = (await session.execute(page_stmt)).scalars().all()
    question_ids = [question.id for question in questions]

    # Attempt totals, latest result and image presence are each fetched with
    # one query for the whole page (avoids N+1).
    totals_by_qid: dict[int, tuple[int, int]] = {}
    latest_by_qid: dict[int, bool] = {}
    qids_with_images: set[int] = set()
    if question_ids:
        own_attempts = (
            StudyQuestionAttempt.user_id == user.id,
            StudyQuestionAttempt.study_question_id.in_(question_ids),
        )

        correct_sum = func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0))
        totals_stmt = (
            select(
                StudyQuestionAttempt.study_question_id,
                func.count().label("total"),
                func.coalesce(correct_sum, 0).label("correct"),
            )
            .where(*own_attempts)
            .group_by(StudyQuestionAttempt.study_question_id)
        )
        totals_rows = (await session.execute(totals_stmt)).all()
        for row in totals_rows:
            totals_by_qid[row.study_question_id] = (int(row.total), int(row.correct))

        # DISTINCT ON (study_question_id) with answered_at DESC keeps only the
        # most recent attempt of each question.
        latest_stmt = (
            select(
                StudyQuestionAttempt.study_question_id,
                StudyQuestionAttempt.is_correct,
                StudyQuestionAttempt.answered_at,
            )
            .where(*own_attempts)
            .order_by(
                StudyQuestionAttempt.study_question_id,
                StudyQuestionAttempt.answered_at.desc(),
            )
            .distinct(StudyQuestionAttempt.study_question_id)
        )
        latest_rows = (await session.execute(latest_stmt)).all()
        for row in latest_rows:
            latest_by_qid[row.study_question_id] = bool(row.is_correct)

        # PR-8: which of the questions on this page have at least one image
        image_stmt = (
            select(StudyQuestionImage.study_question_id)
            .where(StudyQuestionImage.study_question_id.in_(question_ids))
            .distinct()
        )
        qids_with_images = set((await session.execute(image_stmt)).scalars().all())

    summaries = []
    for question in questions:
        attempt_total, _correct_total = totals_by_qid.get(question.id, (0, 0))
        summaries.append(
            StudyQuestionSummary(
                id=question.id,
                question_text=_truncate(question.question_text, 80),
                subject=question.subject,
                scope=question.scope,
                exam_name=question.exam_name,
                exam_round=question.exam_round,
                is_active=question.is_active,
                attempt_count=attempt_total,
                last_correct=latest_by_qid.get(question.id),
                has_images=question.id in qids_with_images,
                created_at=question.created_at,
            )
        )
    return StudyQuestionListResponse(
        items=summaries, total=total, page=page, page_size=page_size
    )
@router.post(
    "/study-topics/{topic_id}/questions",
    response_model=StudyQuestionResponse,
    status_code=201,
)
async def create_question_in_topic(
    topic_id: int,
    body: StudyQuestionCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create a question in a topic and return it in the full (editing) form.

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned
            by another user.
    """
    _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    # PR-6: when exam_question_number is omitted but exam_round is given, the
    # server fills in max+1 for that round.
    # A plain SELECT max is used because this is a single-user environment
    # with little race pressure. If concurrent input becomes frequent,
    # consider SELECT FOR UPDATE or a per-topic advisory lock.
    number_in_round = body.exam_question_number
    if number_in_round is None and body.exam_round:
        max_stmt = select(
            func.coalesce(func.max(StudyQuestion.exam_question_number), 0)
        ).where(
            StudyQuestion.user_id == user.id,
            StudyQuestion.study_topic_id == topic_id,
            StudyQuestion.exam_round == body.exam_round,
            StudyQuestion.deleted_at.is_(None),
        )
        current_max = (await session.execute(max_stmt)).scalar()
        number_in_round = int(current_max or 0) + 1

    q = StudyQuestion(
        user_id=user.id,
        study_topic_id=topic_id,
        question_text=body.question_text,
        choice_1=body.choice_1,
        choice_2=body.choice_2,
        choice_3=body.choice_3,
        choice_4=body.choice_4,
        correct_choice=body.correct_choice,
        subject=body.subject,
        scope=body.scope,
        exam_name=body.exam_name,
        exam_round=body.exam_round,
        exam_question_number=number_in_round,
        explanation=body.explanation,
        source_note=body.source_note,
        is_active=body.is_active,
    )
    session.add(q)
    await session.flush()
    await session.commit()

    # A brand-new question has no attempts yet.
    empty_stats = QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
    attached = await _images_for_question(session, q.id)
    return StudyQuestionResponse(
        id=q.id,
        study_topic_id=q.study_topic_id,
        question_text=q.question_text,
        choice_1=q.choice_1,
        choice_2=q.choice_2,
        choice_3=q.choice_3,
        choice_4=q.choice_4,
        correct_choice=q.correct_choice,
        subject=q.subject,
        scope=q.scope,
        exam_name=q.exam_name,
        exam_round=q.exam_round,
        explanation=q.explanation,
        source_note=q.source_note,
        is_active=q.is_active,
        exam_question_number=q.exam_question_number,
        ai_explanation_status=q.ai_explanation_status,
        ai_explanation_generated_at=q.ai_explanation_generated_at,
        ai_explanation_model=q.ai_explanation_model,
        images=attached,
        created_at=q.created_at,
        updated_at=q.updated_at,
        stats=empty_stats,
    )
# ─── 복습 출제 ───
@router.get(
    "/study-topics/{topic_id}/review/questions",
    response_model=ReviewQuestionListResponse,
)
async def review_questions_for_topic(
    topic_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    subject: str | None = Query(None, description="단일 과목 집중. 미지정 시 과목별 균등 추출"),
    scope: str | None = Query(None),
    target_per_subject: int = Query(20, ge=1, le=100),
    wrong_only: bool = Query(
        False,
        description="가장 최근 attempt 가 오답인 문제만 (latest-wrong, ever_wrong 아님)",
    ),
):
    """Select questions for a review run.

    By default up to ``target_per_subject`` random questions are drawn from
    every subject of the topic. The correct answer and the explanation are
    not part of the response — they are revealed only when an answer is
    submitted (POST /api/study-questions/{id}/attempt).

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned
            by another user.
    """
    _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    def _nothing_to_review() -> ReviewQuestionListResponse:
        return ReviewQuestionListResponse(
            items=[], total=0, target_per_subject=target_per_subject, subject_distribution={}
        )

    # Candidate questions: not soft-deleted and active.
    candidate_filter = and_(
        StudyQuestion.user_id == user.id,
        StudyQuestion.study_topic_id == topic_id,
        StudyQuestion.deleted_at.is_(None),
        StudyQuestion.is_active.is_(True),
    )

    # wrong_only=true: narrow the candidates to questions whose latest attempt
    # is not correct, using DISTINCT ON (study_question_id) ordered by
    # answered_at DESC.
    latest_wrong_ids: set[int] | None = None
    if wrong_only:
        latest_stmt = (
            select(
                StudyQuestionAttempt.study_question_id,
                StudyQuestionAttempt.is_correct,
            )
            .where(
                StudyQuestionAttempt.user_id == user.id,
                StudyQuestionAttempt.study_topic_id == topic_id,
            )
            .order_by(
                StudyQuestionAttempt.study_question_id,
                StudyQuestionAttempt.answered_at.desc(),
            )
            .distinct(StudyQuestionAttempt.study_question_id)
        )
        latest_rows = (await session.execute(latest_stmt)).all()
        latest_wrong_ids = {
            row.study_question_id for row in latest_rows if row.is_correct is False
        }
        if not latest_wrong_ids:
            return _nothing_to_review()

    def _narrow(stmt):
        # Optional scope / latest-wrong restrictions shared by every query below.
        if scope is not None:
            stmt = stmt.where(StudyQuestion.scope == scope)
        if latest_wrong_ids is not None:
            stmt = stmt.where(StudyQuestion.id.in_(latest_wrong_ids))
        return stmt

    # Decide which subject groups to draw from.
    if subject is not None:
        # Focus on a single subject.
        subject_groups: list[str | None] = [subject]
    else:
        subjects_stmt = _narrow(
            select(StudyQuestion.subject).where(candidate_filter).distinct()
        )
        subject_groups = [row[0] for row in (await session.execute(subjects_stmt)).all()]
        if not subject_groups:
            return _nothing_to_review()

    # Per subject: ORDER BY random() LIMIT target_per_subject.
    picked: list[StudyQuestion] = []
    per_subject: dict[str, int] = {}
    for group in subject_groups:
        if group is None:
            subject_cond = StudyQuestion.subject.is_(None)
        else:
            subject_cond = StudyQuestion.subject == group
        group_stmt = _narrow(select(StudyQuestion).where(candidate_filter).where(subject_cond))
        group_stmt = group_stmt.order_by(func.random()).limit(target_per_subject)
        group_rows = (await session.execute(group_stmt)).scalars().all()
        if group_rows:
            picked.extend(group_rows)
            per_subject[group or ""] = len(group_rows)

    # Shuffle across groups so the output is not grouped by subject.
    random.shuffle(picked)

    # Attempt statistics for all picked questions in one query (avoids N+1).
    picked_ids = [question.id for question in picked]
    stats_by_qid: dict[int, QuestionAttemptStats] = {}
    if picked_ids:
        correct_sum = func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0))
        stats_stmt = (
            select(
                StudyQuestionAttempt.study_question_id,
                func.count().label("total"),
                func.coalesce(correct_sum, 0).label("correct"),
            )
            .where(
                StudyQuestionAttempt.user_id == user.id,
                StudyQuestionAttempt.study_question_id.in_(picked_ids),
            )
            .group_by(StudyQuestionAttempt.study_question_id)
        )
        for row in (await session.execute(stats_stmt)).all():
            attempts = int(row.total)
            right = int(row.correct)
            stats_by_qid[row.study_question_id] = QuestionAttemptStats(
                attempt_count=attempts, correct_count=right, wrong_count=attempts - right
            )

    # PR-8: images for all picked questions in one query
    images_by_qid = await _images_for_questions_batch(session, picked_ids)

    review_items = []
    for question in picked:
        numbered_choices = [
            ReviewChoice(number=1, text=question.choice_1),
            ReviewChoice(number=2, text=question.choice_2),
            ReviewChoice(number=3, text=question.choice_3),
            ReviewChoice(number=4, text=question.choice_4),
        ]
        review_items.append(
            ReviewQuestionItem(
                id=question.id,
                question_text=question.question_text,
                choices=numbered_choices,
                subject=question.subject,
                scope=question.scope,
                stats=stats_by_qid.get(
                    question.id,
                    QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0),
                ),
                images=images_by_qid.get(question.id, []),
            )
        )
    return ReviewQuestionListResponse(
        items=review_items,
        total=len(review_items),
        target_per_subject=target_per_subject,
        subject_distribution=per_subject,
    )
# ─── 단건 엔드포인트 ───
@router.get("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def get_question(
    question_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return one question in the full (editing) form, with attempt stats and images.

    Raises:
        HTTPException: 404 when the question is missing, soft-deleted or
            owned by another user.
    """
    q = _verify_question_ownership(await session.get(StudyQuestion, question_id), user)
    attempt_stats = await _attempt_stats(session, user.id, question_id)
    attached = await _images_for_question(session, q.id)
    return StudyQuestionResponse(
        id=q.id,
        study_topic_id=q.study_topic_id,
        question_text=q.question_text,
        choice_1=q.choice_1,
        choice_2=q.choice_2,
        choice_3=q.choice_3,
        choice_4=q.choice_4,
        correct_choice=q.correct_choice,
        subject=q.subject,
        scope=q.scope,
        exam_name=q.exam_name,
        exam_round=q.exam_round,
        explanation=q.explanation,
        source_note=q.source_note,
        is_active=q.is_active,
        exam_question_number=q.exam_question_number,
        ai_explanation_status=q.ai_explanation_status,
        ai_explanation_generated_at=q.ai_explanation_generated_at,
        ai_explanation_model=q.ai_explanation_model,
        images=attached,
        created_at=q.created_at,
        updated_at=q.updated_at,
        stats=attempt_stats,
    )
@router.patch("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def update_question(
    question_id: int,
    body: StudyQuestionUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Partially update a question.

    Only fields present in the request body are applied. Changing
    ``correct_choice`` does not recompute ``is_correct`` of existing attempts
    (an attempt records the fact at the time it was made); the frontend edit
    page shows a notice about this.

    Raises:
        HTTPException: 404 when the question is missing, soft-deleted or
            owned by another user; 422 when a required field is explicitly
            set to null.
    """
    q = await session.get(StudyQuestion, question_id)
    q = _verify_question_ownership(q, user)
    fields_set = body.model_fields_set

    # These fields are required on create and non-optional in the response
    # model. The update schema types them as optional only so they can be
    # omitted; an explicit null must be rejected here, otherwise it is written
    # to the row and surfaces later as a server error.
    NON_NULL_FIELDS = (
        "question_text", "choice_1", "choice_2", "choice_3", "choice_4",
        "correct_choice", "is_active",
    )
    nulled = [
        name for name in NON_NULL_FIELDS
        if name in fields_set and getattr(body, name) is None
    ]
    if nulled:
        raise HTTPException(
            status_code=422,
            detail=f"null 로 설정할 수 없는 필드입니다: {', '.join(nulled)}",
        )

    SIMPLE_FIELDS = {
        "question_text", "choice_1", "choice_2", "choice_3", "choice_4",
        "correct_choice", "subject", "scope", "exam_name", "exam_round",
        "exam_question_number",
        "explanation", "source_note", "is_active",
    }
    # Track the fields whose value really changes, so that re-sending an
    # unchanged value does not invalidate the AI explanation or the embedding.
    changed: set[str] = set()
    for fname in SIMPLE_FIELDS & fields_set:
        new_value = getattr(body, fname)
        if getattr(q, fname) != new_value:
            changed.add(fname)
            setattr(q, fname, new_value)

    # PR-3: when a core field changes, the AI explanation becomes stale (the
    # text is kept; the UI shows a badge). Only ready -> stale is performed;
    # pending/failed/none/stale are left untouched.
    AI_STALE_TRIGGER = {
        "question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice",
    }
    if AI_STALE_TRIGGER & changed and q.ai_explanation_status == "ready":
        q.ai_explanation_status = "stale"

    # PR-4: embedding stale transition, only when the text
    # (question_text/choice_*) has changed. A correct_choice change does not
    # affect semantic search, so it does not trigger a recomputation.
    EMBED_STALE_TRIGGER = {
        "question_text", "choice_1", "choice_2", "choice_3", "choice_4",
    }
    if EMBED_STALE_TRIGGER & changed and q.embedding_status == "ready":
        q.embedding_status = "stale"

    q.updated_at = datetime.now(timezone.utc)
    await session.commit()

    stats = await _attempt_stats(session, user.id, question_id)
    return StudyQuestionResponse(
        id=q.id,
        study_topic_id=q.study_topic_id,
        question_text=q.question_text,
        choice_1=q.choice_1,
        choice_2=q.choice_2,
        choice_3=q.choice_3,
        choice_4=q.choice_4,
        correct_choice=q.correct_choice,
        subject=q.subject,
        scope=q.scope,
        exam_name=q.exam_name,
        exam_round=q.exam_round,
        explanation=q.explanation,
        source_note=q.source_note,
        is_active=q.is_active,
        exam_question_number=q.exam_question_number,
        ai_explanation_status=q.ai_explanation_status,
        ai_explanation_generated_at=q.ai_explanation_generated_at,
        ai_explanation_model=q.ai_explanation_model,
        images=await _images_for_question(session, q.id),
        created_at=q.created_at,
        updated_at=q.updated_at,
        stats=stats,
    )
@router.delete("/study-questions/{question_id}", status_code=204)
async def soft_delete_question(
    question_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Soft-delete a question by stamping ``deleted_at``.

    Soft delete only: attempts are protected by a RESTRICT FK and are kept
    permanently. There is no hard-delete code path at all — the database
    would refuse it as well while attempts exist.

    Raises:
        HTTPException: 404 when the question is missing, already
            soft-deleted or owned by another user.
    """
    q = _verify_question_ownership(await session.get(StudyQuestion, question_id), user)
    deleted_at = datetime.now(timezone.utc)
    q.deleted_at = deleted_at
    q.updated_at = deleted_at
    await session.commit()
# ─── attempt ───
@router.post("/study-questions/{question_id}/attempt", response_model=AttemptResponse)
async def submit_attempt(
    question_id: int,
    body: AttemptCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Submit an answer to a question.

    PR-9: with ``is_unsure=true`` the attempt is stored with outcome='unsure'
    and ``selected_choice`` NULL. Otherwise ``selected_choice`` is compared
    with the question's ``correct_choice``, giving 'correct' or 'wrong'.
    PR-10: when ``quiz_session_id`` is given, the session cursor and counters
    are updated in the same transaction. Once the cursor reaches the end of
    ``question_ids`` the session gets status='done' and ``finished_at``.

    Raises:
        HTTPException: 404 for an inaccessible question or quiz session;
            422 when neither a choice nor ``is_unsure`` is supplied; 400 when
            the session belongs to another topic; 409 when the session is
            not in progress, is already exhausted, or the question is not
            the one at the current cursor.
    """
    q = _verify_question_ownership(await session.get(StudyQuestion, question_id), user)

    if not body.is_unsure and body.selected_choice is None:
        raise HTTPException(
            status_code=422,
            detail="selected_choice (1~4) 또는 is_unsure=true 가 필요합니다",
        )
    if body.is_unsure:
        selected, is_correct, outcome = None, False, "unsure"
    else:
        selected = body.selected_choice
        is_correct = selected == q.correct_choice
        outcome = "correct" if is_correct else "wrong"

    # PR-10: optional quiz-session linkage. Stays None without a session id.
    quiz_session: StudyQuizSession | None = None
    if body.quiz_session_id is not None:
        quiz_session = await session.get(StudyQuizSession, body.quiz_session_id)
        if quiz_session is None or quiz_session.user_id != user.id:
            raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
        if quiz_session.study_topic_id != q.study_topic_id:
            raise HTTPException(
                status_code=400,
                detail="quiz_session 의 토픽과 문제의 토픽이 다릅니다",
            )
        if quiz_session.status != "in_progress":
            raise HTTPException(
                status_code=409,
                detail=f"quiz_session 상태가 {quiz_session.status} — 이미 종료된 세션입니다",
            )
        # Only the question at the current cursor position may be answered
        # (blocks out-of-order submissions).
        ordered_ids = quiz_session.question_ids or []
        position = quiz_session.cursor
        if position >= len(ordered_ids):
            raise HTTPException(status_code=409, detail="이미 모든 문제를 풀었습니다")
        if ordered_ids[position] != q.id:
            raise HTTPException(
                status_code=409,
                detail="현재 cursor 위치의 문제와 question_id 가 일치하지 않습니다 (이중 제출/순서 어긋남)",
            )

    linked_session_id = None if quiz_session is None else quiz_session.id
    attempt = StudyQuestionAttempt(
        user_id=user.id,
        study_question_id=q.id,
        study_topic_id=q.study_topic_id,
        selected_choice=selected,
        correct_choice=q.correct_choice,
        is_correct=is_correct,
        outcome=outcome,
        quiz_session_id=linked_session_id,
    )
    session.add(attempt)

    if quiz_session is not None:
        quiz_session.cursor += 1
        # outcome is always one of the three keys below.
        counter_attr = {
            "correct": "correct_count",
            "wrong": "wrong_count",
            "unsure": "unsure_count",
        }[outcome]
        setattr(quiz_session, counter_attr, getattr(quiz_session, counter_attr) + 1)
        if quiz_session.cursor >= len(quiz_session.question_ids or []):
            quiz_session.status = "done"
            quiz_session.finished_at = datetime.now(timezone.utc)
        quiz_session.updated_at = datetime.now(timezone.utc)

    await session.commit()
    await session.refresh(attempt)

    stats = await _attempt_stats(session, user.id, question_id)
    in_session = quiz_session is not None
    return AttemptResponse(
        id=attempt.id,
        is_correct=is_correct,
        selected_choice=selected,
        correct_choice=q.correct_choice,
        outcome=outcome,
        explanation=q.explanation,
        stats=stats,
        quiz_session_id=quiz_session.id if in_session else None,
        quiz_session_status=quiz_session.status if in_session else None,
        quiz_session_cursor=quiz_session.cursor if in_session else None,
        reviewed_at=attempt.reviewed_at,
    )
# ─── PR-10: 학습완료 토글 ───
class AttemptReviewRequest(BaseModel):
    """Body of the "reviewed" toggle: the desired reviewed state of an attempt."""

    reviewed: bool
class AttemptReviewResponse(BaseModel):
    """Result of the "reviewed" toggle."""

    id: int
    reviewed_at: datetime | None  # None when the attempt is not marked as reviewed
@router.patch(
    "/study-question-attempts/{attempt_id}/review-mark",
    response_model=AttemptReviewResponse,
)
async def mark_attempt_reviewed(
    attempt_id: int,
    body: AttemptReviewRequest,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Toggle the [reviewed] mark of an attempt from the result card.

    ``reviewed=true`` stores the current UTC timestamp, ``reviewed=false``
    resets it to NULL.

    Raises:
        HTTPException: 404 when the attempt is missing or owned by another
            user.
    """
    attempt = await session.get(StudyQuestionAttempt, attempt_id)
    owned = attempt is not None and attempt.user_id == user.id
    if not owned:
        raise HTTPException(status_code=404, detail="attempt 를 찾을 수 없습니다")

    if body.reviewed:
        attempt.reviewed_at = datetime.now(timezone.utc)
    else:
        attempt.reviewed_at = None
    await session.commit()
    return AttemptReviewResponse(id=attempt.id, reviewed_at=attempt.reviewed_at)
# ─── PR-5: 비슷한 문제 검색 (embedding cosine) ───
@router.get(
    "/study-questions/{question_id}/similar",
    response_model=SimilarQuestionsResponse,
)
async def list_similar_questions(
    question_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    limit: int = Query(5, ge=1, le=20),
    topic_only: bool = Query(True, description="True 면 같은 study_topic 안에서만"),
):
    """Return the top-K questions most similar to this one by embedding cosine similarity.

    Vector search only, no 26B model call.

    Excluded from the result:
    - the question itself
    - soft-deleted questions
    - questions whose embedding_status is not 'ready' (pending/failed/not generated)

    When the source question has no ready embedding, ``items`` is empty and
    ``source_status`` carries its embedding_status so the UI can explain why.

    Raises:
        HTTPException: 404 when the question is missing, soft-deleted or
            owned by another user.
    """
    src = await session.get(StudyQuestion, question_id)
    src = _verify_question_ownership(src, user)
    if src.embedding_status != "ready" or src.embedding is None:
        return SimilarQuestionsResponse(
            items=[], source_status=src.embedding_status, source_id=question_id
        )

    # cosine distance = embedding <=> ref; similarity = 1 - distance.
    distance_expr = StudyQuestion.embedding.cosine_distance(src.embedding)
    base = (
        select(StudyQuestion, distance_expr.label("distance"))
        .where(
            StudyQuestion.user_id == user.id,
            StudyQuestion.id != question_id,
            StudyQuestion.deleted_at.is_(None),
            StudyQuestion.embedding_status == "ready",
            StudyQuestion.embedding.is_not(None),
        )
        .order_by(distance_expr.asc())
        .limit(limit)
    )
    if topic_only:
        base = base.where(StudyQuestion.study_topic_id == src.study_topic_id)
    rows = (await session.execute(base)).all()
    similar_qs = [(r[0], float(r.distance)) for r in rows]

    # attempt_count + last_correct in two batch queries (avoids N+1)
    qids = [q.id for q, _ in similar_qs]
    attempt_count_map: dict[int, int] = {}
    last_correct_map: dict[int, bool] = {}
    if qids:
        cnt_rows = (
            await session.execute(
                select(
                    StudyQuestionAttempt.study_question_id,
                    func.count().label("total"),
                )
                .where(
                    StudyQuestionAttempt.user_id == user.id,
                    StudyQuestionAttempt.study_question_id.in_(qids),
                )
                .group_by(StudyQuestionAttempt.study_question_id)
            )
        ).all()
        for r in cnt_rows:
            attempt_count_map[r.study_question_id] = int(r.total)

        # DISTINCT ON (study_question_id) with answered_at DESC keeps only the
        # most recent attempt of each question.
        latest_rows = (
            await session.execute(
                select(
                    StudyQuestionAttempt.study_question_id,
                    StudyQuestionAttempt.is_correct,
                )
                .where(
                    StudyQuestionAttempt.user_id == user.id,
                    StudyQuestionAttempt.study_question_id.in_(qids),
                )
                .order_by(
                    StudyQuestionAttempt.study_question_id,
                    StudyQuestionAttempt.answered_at.desc(),
                )
                .distinct(StudyQuestionAttempt.study_question_id)
            )
        ).all()
        for r in latest_rows:
            last_correct_map[r.study_question_id] = bool(r.is_correct)

    # The module-level _truncate helper is used here instead of a local copy,
    # so list views and similar-question results are shortened the same way.
    items = [
        SimilarQuestionItem(
            id=q.id,
            study_topic_id=q.study_topic_id,
            question_text=_truncate(q.question_text, 80),
            subject=q.subject,
            scope=q.scope,
            exam_name=q.exam_name,
            exam_round=q.exam_round,
            similarity=round(1.0 - dist, 4),
            attempt_count=attempt_count_map.get(q.id, 0),
            last_correct=last_correct_map.get(q.id),
        )
        for q, dist in similar_qs
    ]
    return SimilarQuestionsResponse(items=items, source_status="ready", source_id=question_id)
# ─── PR-8: 이미지 업로드/조회/삭제 ───
@router.post(
    "/study-questions/{question_id}/images",
    response_model=StudyQuestionImageItem,
    status_code=201,
)
async def upload_question_image(
    question_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    file: UploadFile = File(...),
):
    """Upload one image attachment for a question.

    One file = one row; call repeatedly to attach several images.
    Path: {STUDY_IMG_ROOT}/{topic_id}/{qid}/{img_id}.{ext}
    Auth: only questions of the same user; anyone else gets 404.
    Limits: PNG/JPEG/WEBP/GIF, 10MB per file.

    Raises:
        HTTPException: 404 for an inaccessible question, 415 for an
            unsupported MIME type, 413 for an oversized file, 400 for an
            empty file.
    """
    q = await session.get(StudyQuestion, question_id)
    q = _verify_question_ownership(q, user)

    # MIME validation
    mime = (file.content_type or "").lower()
    if mime not in ALLOWED_IMAGE_MIME:
        raise HTTPException(
            status_code=415,
            detail=f"지원하지 않는 이미지 형식: {mime} (PNG/JPEG/WEBP/GIF 만 허용)",
        )

    # Read the file and validate its size
    raw = await file.read()
    if len(raw) > MAX_IMAGE_BYTES:
        raise HTTPException(
            status_code=413,
            detail=f"이미지 크기 초과 ({len(raw)} > {MAX_IMAGE_BYTES} 바이트, 10MB 제한)",
        )
    if len(raw) == 0:
        raise HTTPException(status_code=400, detail="빈 파일")

    # File extension from the MIME type
    ext_map = {
        "image/png": "png",
        "image/jpeg": "jpg",
        "image/webp": "webp",
        "image/gif": "gif",
    }
    ext = ext_map[mime]

    # Create the DB row first to obtain its id (used as the file name).
    img = StudyQuestionImage(
        user_id=user.id,
        study_question_id=q.id,
        file_path="",  # placeholder, filled in once the id is known
        file_size=len(raw),
        mime_type=mime,
        sort_order=0,
    )
    session.add(img)
    await session.flush()  # assigns img.id

    # sort_order = max+1 among the other images of this question.
    # COALESCE already maps "no other image" to -1. The result must not be
    # passed through `or -1`: a legitimate max of 0 is falsy and would be
    # turned into -1, giving every image after the first sort_order 0.
    max_sort = (
        await session.execute(
            select(func.coalesce(func.max(StudyQuestionImage.sort_order), -1))
            .where(StudyQuestionImage.study_question_id == q.id)
            .where(StudyQuestionImage.id != img.id)
        )
    ).scalar()
    img.sort_order = (int(max_sort) if max_sort is not None else -1) + 1

    # Store the bytes on the file system
    target_dir = STUDY_IMG_ROOT / str(q.study_topic_id) / str(q.id)
    target_dir.mkdir(parents=True, exist_ok=True)
    target_path = target_dir / f"{img.id}.{ext}"
    target_path.write_bytes(raw)
    img.file_path = str(target_path)

    try:
        await session.commit()
    except Exception:
        # The row will not be persisted; remove the file so it is not left
        # behind without a DB row, then let the original error propagate.
        try:
            target_path.unlink(missing_ok=True)
        except OSError as e:
            logger.warning("study_q_image_orphan_cleanup_failed path=%s: %s", target_path, e)
        raise
    return _image_to_item(img)
@router.get("/study-questions/{question_id}/images/{image_id}/raw")
async def get_question_image_raw(
    question_id: int,
    image_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Serve the stored file of one image attached to a question.

    Responds 404 unless the image row exists, belongs to the current user
    and is attached to ``question_id``; responds 410 when the row exists but
    the file is no longer on disk.
    """
    record = await session.get(StudyQuestionImage, image_id)
    accessible = (
        record is not None
        and record.user_id == user.id
        and record.study_question_id == question_id
    )
    if not accessible:
        raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다")

    stored = _Path(record.file_path)
    if stored.is_file():
        return FileResponse(str(stored), media_type=record.mime_type)
    raise HTTPException(status_code=410, detail="파일이 사라졌습니다")
@router.delete("/study-questions/{question_id}/images/{image_id}", status_code=204)
async def delete_question_image(
    question_id: int,
    image_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Delete one image attachment: the file on disk (best effort) and its row.

    Responds 404 unless the image row exists, belongs to the current user
    and is attached to ``question_id``.
    """
    record = await session.get(StudyQuestionImage, image_id)
    accessible = (
        record is not None
        and record.user_id == user.id
        and record.study_question_id == question_id
    )
    if not accessible:
        raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다")

    # Removing the file is best effort: a failure is logged and the row is
    # still deleted below.
    try:
        stored = _Path(record.file_path)
        if stored.is_file():
            stored.unlink()
    except Exception as exc:
        logger.warning("study_q_image_unlink_failed id=%s: %s", image_id, exc)

    await session.delete(record)
    await session.commit()
# ─── PR-3: AI explanation generation endpoint ───
# Timeout (seconds) for the MLX call. MLX gate + 26B inference averages ~10s; the rest is safety margin.
LLM_TIMEOUT_S = 30.0
# Prompt template, loaded lazily (file name + module-level cache of its contents)
_PROMPT_PATH = "study_question_explanation.txt"
_prompt_cache: str | None = None
def _load_explanation_prompt() -> str:
    """Return the explanation prompt template, reading it from disk only once.

    The file ``_PROMPT_PATH`` is looked up in the ``prompts`` directory two
    levels above this module and its text is kept in ``_prompt_cache``.
    """
    global _prompt_cache
    if _prompt_cache is not None:
        return _prompt_cache

    from pathlib import Path

    template_file = Path(__file__).resolve().parents[1] / "prompts" / _PROMPT_PATH
    _prompt_cache = template_file.read_text(encoding="utf-8")
    return _prompt_cache
def _render_prompt(q: StudyQuestion, doc_block: str, question_block: str) -> str:
    """Fill the explanation prompt template with one question and its evidence.

    Args:
        q: Question whose text, four choices and correct choice are inserted.
        doc_block: Rendered evidence text substituted for
            ``{documents_evidence_block}``.
        question_block: Rendered evidence text substituted for
            ``{questions_evidence_block}``.

    Returns:
        The template with every known placeholder replaced.
    """
    import re

    values = {
        "{question_text}": q.question_text,
        "{choice_1}": q.choice_1,
        "{choice_2}": q.choice_2,
        "{choice_3}": q.choice_3,
        "{choice_4}": q.choice_4,
        "{correct_choice}": str(q.correct_choice),
        "{documents_evidence_block}": doc_block,
        "{questions_evidence_block}": question_block,
    }
    # Substitute in a single pass over the template. Chained str.replace()
    # calls would re-scan text that was already inserted, so a question or
    # evidence block that happens to contain e.g. "{choice_1}" would be
    # expanded a second time.
    placeholder = re.compile("|".join(re.escape(token) for token in values))
    return placeholder.sub(
        lambda match: values[match.group(0)],
        _load_explanation_prompt(),
    )
def _make_cache_response(q: StudyQuestion, is_stale: bool) -> AIExplanationResponse:
    """Build a response from the explanation already stored on ``q``.

    Evidence is not recomputed (always an empty list); the explanation text,
    status and metadata are copied from the row unchanged.
    """
    cached_fields = {
        "ai_explanation": q.ai_explanation,
        "ai_explanation_status": q.ai_explanation_status,
        "ai_explanation_generated_at": q.ai_explanation_generated_at,
        "ai_explanation_model": q.ai_explanation_model,
        "evidence": [],
        "from_cache": True,
        "is_stale": is_stale,
        "can_regenerate": True,
    }
    return AIExplanationResponse(**cached_fields)
@router.post(
    "/study-questions/{question_id}/ai-explanation",
    response_model=AIExplanationResponse,
)
async def generate_ai_explanation(
    question_id: int,
    body: AIExplanationCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Generate, or return the cached, AI explanation for a question.

    Behaviour by current ``ai_explanation_status``:
    - ``pending`` -> 409, regardless of ``regenerate`` (another call is
      running; a conditional UPDATE makes this race-safe).
    - ``regenerate=false`` and ``ready`` -> cached text, no model call.
    - ``regenerate=false`` and ``stale`` -> cached text with ``is_stale=true``.
    - anything else, or ``regenerate=true`` -> a new explanation is generated
      with the primary model, inside the MLX gate, bounded by
      ``LLM_TIMEOUT_S``.

    When generation fails (including prompt assembly errors, timeouts and
    empty model output) the status becomes ``failed`` and the previously
    stored explanation text is left untouched.

    Raises:
        HTTPException: 409 when a generation is already pending. Ownership
            is enforced by ``_verify_question_ownership``.
    """
    q = await session.get(StudyQuestion, question_id)
    q = _verify_question_ownership(q, user)

    # 1) Early exits: pending always blocks; ready/stale are served from the
    #    stored text unless the caller asked for regeneration.
    current_status = q.ai_explanation_status
    if current_status == "pending":
        raise HTTPException(
            status_code=409,
            detail={"status": "pending", "detail": "이미 생성 중입니다"},
        )
    if not body.regenerate and current_status in ("ready", "stale"):
        return _make_cache_response(q, is_stale=(current_status == "stale"))

    # 2) Race-safe transition to pending: the conditional UPDATE matches only
    #    when no other call has already set the status to pending.
    lock_result = await session.execute(
        update(StudyQuestion)
        .where(
            StudyQuestion.id == question_id,
            StudyQuestion.user_id == user.id,
            StudyQuestion.ai_explanation_status != "pending",
        )
        .values(ai_explanation_status="pending", updated_at=datetime.now(timezone.utc))
        .returning(StudyQuestion.id)
    )
    if lock_result.scalar_one_or_none() is None:
        # Another call set pending first.
        raise HTTPException(
            status_code=409,
            detail={"status": "pending", "detail": "이미 생성 중입니다"},
        )
    await session.commit()
    await session.refresh(q)

    # 3) Collect RAG evidence. On failure continue with empty evidence.
    try:
        ctx = await gather_explanation_context(session, user.id, q)
    except Exception as e:
        logger.warning("study_explanation_rag_failed: %s: %s", type(e).__name__, e)
        from services.study.explanation_rag import ExplanationContext

        ctx = ExplanationContext(documents=[], questions=[])

    # 4) Assemble the prompt and call the primary model inside the MLX gate.
    #    Prompt assembly and client construction live inside the try block so
    #    that any error here ends in status='failed' below instead of leaving
    #    the row stuck in 'pending' (which would 409 every later call).
    ai_client: AIClient | None = None
    raw_text: str | None = None
    try:
        prompt = _render_prompt(
            q,
            render_evidence_block(ctx.documents),
            render_evidence_block(ctx.questions),
        )
        ai_client = AIClient()
        async with get_mlx_gate():
            async with asyncio.timeout(LLM_TIMEOUT_S):
                raw_text = await ai_client.call_primary(prompt)
    except asyncio.TimeoutError:
        logger.warning("study_explanation_mlx_timeout qid=%s", question_id)
    except Exception:
        logger.exception("study_explanation_mlx_failed qid=%s", question_id)
    finally:
        if ai_client is not None:
            await ai_client.close()

    # 5) Persist the outcome.
    from ai.client import strip_thinking

    # Clean before deciding success, so output that is empty once the
    # thinking markup is stripped is treated as a failure rather than being
    # stored as an empty 'ready' explanation.
    cleaned = strip_thinking(raw_text).strip() if raw_text else ""

    q = await session.get(StudyQuestion, question_id)
    if not cleaned:
        # Failure: keep the previous explanation text, only flip the status.
        q.ai_explanation_status = "failed"
        q.updated_at = datetime.now(timezone.utc)
        await session.commit()
        return AIExplanationResponse(
            ai_explanation=q.ai_explanation,
            ai_explanation_status="failed",
            ai_explanation_generated_at=q.ai_explanation_generated_at,
            ai_explanation_model=q.ai_explanation_model,
            evidence=[e.to_dict() for e in ctx.all],
            from_cache=False,
            is_stale=False,
            can_regenerate=True,
        )

    # Success: store the cleaned text together with its generation metadata.
    q.ai_explanation = cleaned
    q.ai_explanation_status = "ready"
    q.ai_explanation_generated_at = datetime.now(timezone.utc)
    primary_name = getattr(ai_client.ai.primary, "model", "primary")
    q.ai_explanation_model = f"mlx:{primary_name}"
    q.updated_at = q.ai_explanation_generated_at
    await session.commit()
    return AIExplanationResponse(
        ai_explanation=q.ai_explanation,
        ai_explanation_status="ready",
        ai_explanation_generated_at=q.ai_explanation_generated_at,
        ai_explanation_model=q.ai_explanation_model,
        evidence=[e.to_dict() for e in ctx.all],
        from_cache=False,
        is_stale=False,
        can_regenerate=True,
    )