Files
hyungi_document_server/app/api/study_questions.py
T
Hyungi Ahn 8803e6a0fd feat(study): 시험·회차·문항 관리 (PR-6)
기사시험 회차별 100문제 채워가기 시나리오. 문제 입력 페이지를 단순 폼에서
"회차 진행률 추적·재개" 도구로 보강.

데이터 모델 (migrations 195~197):
- study_topics: exam_round_size INT CHECK 1~300 (회차당 문항 수, NULL=미설정)
  + exam_subjects JSONB DEFAULT '[]' (과목 리스트, 입력 페이지 드롭다운 옵션)
- study_questions: exam_question_number SMALLINT CHECK >0 (회차 안 문항 번호)
- partial idx (study_topic_id, exam_round, exam_question_number) WHERE
  deleted_at IS NULL AND exam_round IS NOT NULL — 회차별 max+count 고속화

백엔드:
- POST /questions: exam_round 명시 + exam_question_number 미명시 시 서버가
  같은 토픽·회차의 max+1 자동 채움
- 신규 GET /api/study-topics/{id}/exam-rounds: 회차별 진행률 집계
  {exam_round_size, items: [{exam_round, question_count, max_question_number,
   next_question_number, is_complete}]}
- StudyTopic Create/Update/Response/Meta 에 exam_round_size·exam_subjects
- StudyQuestion Create/Update/Response 에 exam_question_number
- exam_question_number 변경은 embedding stale 트리거에서 제외 (의미 영향 없음)

프론트:
- 토픽 생성/편집 모달: "시험 정보" 섹션 (회차당 문항 수 + 과목 리스트
  +추가/제거 칩)
- /study/topics/[id]/exam-rounds 신규 페이지: 회차 카드 + 진행 바 +
  [N번부터 이어서] 버튼 + [새 회차 시작] 모달
- 통합뷰 문제 섹션 헤더에 [회차 보기] 진입점
- /questions/new 페이지 전면 개편:
  - 시험명 = topic.name 자동 prefill
  - 과목 드롭다운 (topic.exam_subjects + 기존 distinct, "직접 입력" 토글)
  - 회차 드롭다운 (기존 distinct + "새 회차")
  - 문항 번호 자동 (회차 선택 시 next_question_number, 새 회차 = 1)
  - 진행률 바 (현재/exam_round_size)
  - 출처/메모 자동 합성 "회차 N번" (수정 가능)
  - "저장 후 계속 입력" → 본문/보기/정답 reset, 회차 유지, 문항 +1
  - 회차 변경 감지 시 문항 번호 1로 reset
  - exam_round_size 도달 시 회차 강조 + "저장 후 계속 입력" 비활성
- query string ?exam_round=&start_qnum= 지원 (회차 목록에서 재개 진입)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 09:31:06 +09:00

1059 lines
37 KiB
Python

"""학습 워크스페이스 문제은행 + 복습모드 API.
PR-2 가드레일:
- study_topic 컨테이너에 자산 타입별 1:N (문제) 추가. polymorphic 단일 테이블 영구 금지.
- subject/scope 강한 enum 미사용 (자유 텍스트, 자동완성은 후속 PR).
- 문제 삭제는 soft delete only. attempts 는 RESTRICT FK 로 DB 레벨 보호.
- correct_choice 변경 시 기존 attempts 재계산 안 함 (기록 = 시점 사실).
- 복습 출제 시 정답·해설 응답 비공개 → 답 제출 시점에만 노출.
- wrong_only=true 정의 = 가장 최근 attempt 가 오답인 문제. "한 번이라도 틀린 적 있는" 아님.
"""
import asyncio
import logging
import random
from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, case, func, select, text as sql_text, update
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.auth import get_current_user
from core.database import get_session
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_topic import StudyTopic
from models.user import User
from services.search.llm_gate import get_mlx_gate
from services.study.explanation_rag import (
EvidenceItem,
gather_explanation_context,
render_evidence_block,
)
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
"""study_topics.py 와 동일한 패턴 — 정보 누설 방지로 mismatch 도 404."""
if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
return topic
def _verify_question_ownership(q: StudyQuestion | None, user: User) -> StudyQuestion:
if q is None or q.user_id != user.id or q.deleted_at is not None:
raise HTTPException(status_code=404, detail="문제를 찾을 수 없습니다")
return q
# ─── Pydantic 스키마 ───
class StudyQuestionCreate(BaseModel):
question_text: str = Field(min_length=1)
choice_1: str = Field(min_length=1)
choice_2: str = Field(min_length=1)
choice_3: str = Field(min_length=1)
choice_4: str = Field(min_length=1)
correct_choice: int = Field(ge=1, le=4)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
exam_name: str | None = Field(default=None, max_length=120)
exam_round: str | None = Field(default=None, max_length=120)
# PR-6: 회차 안 문항 번호. 미명시 + exam_round 명시 시 서버가 max+1 자동 채움.
exam_question_number: int | None = Field(default=None, ge=1)
explanation: str | None = None
source_note: str | None = None
is_active: bool = True
class StudyQuestionUpdate(BaseModel):
question_text: str | None = Field(default=None, min_length=1)
choice_1: str | None = Field(default=None, min_length=1)
choice_2: str | None = Field(default=None, min_length=1)
choice_3: str | None = Field(default=None, min_length=1)
choice_4: str | None = Field(default=None, min_length=1)
correct_choice: int | None = Field(default=None, ge=1, le=4)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
exam_name: str | None = Field(default=None, max_length=120)
exam_round: str | None = Field(default=None, max_length=120)
exam_question_number: int | None = Field(default=None, ge=1)
explanation: str | None = None
source_note: str | None = None
is_active: bool | None = None
class QuestionAttemptStats(BaseModel):
attempt_count: int
correct_count: int
wrong_count: int
class StudyQuestionResponse(BaseModel):
"""편집·관리용 — 정답·해설 모두 노출."""
id: int
study_topic_id: int
question_text: str
choice_1: str
choice_2: str
choice_3: str
choice_4: str
correct_choice: int
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
explanation: str | None
source_note: str | None
is_active: bool
# PR-6: 회차 안 문항 번호
exam_question_number: int | None = None
# PR-3: AI 풀이 상태 (편집 화면에서 사용). 본문은 별도 GET /ai-explanation 으로
ai_explanation_status: str = "none"
ai_explanation_generated_at: datetime | None = None
ai_explanation_model: str | None = None
created_at: datetime
updated_at: datetime
stats: QuestionAttemptStats
class StudyQuestionSummary(BaseModel):
"""통합 뷰·목록용 — 본문 truncate, 정답 비공개."""
id: int
question_text: str # 앞 80자 truncate (서버 측에서 처리)
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
is_active: bool
attempt_count: int
last_correct: bool | None
created_at: datetime
class StudyQuestionListResponse(BaseModel):
items: list[StudyQuestionSummary]
total: int
page: int
page_size: int
class ReviewChoice(BaseModel):
number: int
text: str
class ReviewQuestionItem(BaseModel):
"""복습 출제 응답 — 정답·해설 비공개."""
id: int
question_text: str
choices: list[ReviewChoice]
subject: str | None
scope: str | None
stats: QuestionAttemptStats
class ReviewQuestionListResponse(BaseModel):
items: list[ReviewQuestionItem]
total: int
target_per_subject: int
subject_distribution: dict[str, int] # {"연소공학": 20, ...}
class AttemptCreate(BaseModel):
selected_choice: int = Field(ge=1, le=4)
class AttemptResponse(BaseModel):
is_correct: bool
selected_choice: int
correct_choice: int
explanation: str | None
stats: QuestionAttemptStats
# ─── PR-5: 비슷한 문제 검색 (embedding cosine) ───
class SimilarQuestionItem(BaseModel):
id: int
study_topic_id: int
question_text: str # 80자 truncate
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
similarity: float # 1 - cosine_distance, 1.0=동일, 0.0=무관
attempt_count: int
last_correct: bool | None
class SimilarQuestionsResponse(BaseModel):
items: list[SimilarQuestionItem]
source_status: str # 현재 문제의 embedding_status. 'ready' 가 아니면 빈 결과.
source_id: int
# ─── PR-3: AI 풀이 생성 ───
class AIExplanationCreate(BaseModel):
"""수동 트리거. regenerate=true 면 캐시 무시하고 새 본문 생성."""
regenerate: bool = False
class AIExplanationEvidence(BaseModel):
source_type: str # 'document' | 'question'
source_id: int
title: str
snippet: str
class AIExplanationResponse(BaseModel):
ai_explanation: str | None
ai_explanation_status: str # ready | stale | failed | none
ai_explanation_generated_at: datetime | None
ai_explanation_model: str | None
evidence: list[AIExplanationEvidence] = []
from_cache: bool = False
is_stale: bool = False
can_regenerate: bool = True
# ─── 통계 헬퍼 ───
async def _attempt_stats(
session: AsyncSession, user_id: int, question_id: int
) -> QuestionAttemptStats:
"""단일 문제의 누적 attempt 통계 (사용자 한정)."""
row = (
await session.execute(
select(
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
).where(
StudyQuestionAttempt.user_id == user_id,
StudyQuestionAttempt.study_question_id == question_id,
)
)
).one()
total = int(row.total or 0)
correct = int(row.correct or 0)
return QuestionAttemptStats(
attempt_count=total, correct_count=correct, wrong_count=total - correct
)
def _truncate(text: str, n: int = 80) -> str:
return text if len(text) <= n else text[:n].rstrip() + ""
# ─── 토픽 단위 엔드포인트 ───
@router.get(
"/study-topics/{topic_id}/questions",
response_model=StudyQuestionListResponse,
)
async def list_questions_in_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str | None = Query(None),
scope: str | None = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
):
"""토픽 내 문제 목록. soft-deleted 제외. summary 응답 (정답 비공개)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
base = select(StudyQuestion).where(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
)
if subject is not None:
base = base.where(StudyQuestion.subject == subject)
if scope is not None:
base = base.where(StudyQuestion.scope == scope)
total = (
await session.execute(select(func.count()).select_from(base.subquery()))
).scalar() or 0
rows = (
await session.execute(
base.order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc())
.offset((page - 1) * page_size)
.limit(page_size)
)
).scalars().all()
# 한 번의 쿼리로 attempt 통계 + 마지막 정/오답 끌어오기 (N+1 회피)
qids = [q.id for q in rows]
stats_map: dict[int, tuple[int, int]] = {}
last_correct_map: dict[int, bool] = {}
if qids:
stats_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in stats_rows:
stats_map[r.study_question_id] = (int(r.total), int(r.correct))
latest_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
StudyQuestionAttempt.answered_at,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
for r in latest_rows:
last_correct_map[r.study_question_id] = bool(r.is_correct)
items = [
StudyQuestionSummary(
id=q.id,
question_text=_truncate(q.question_text, 80),
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
is_active=q.is_active,
attempt_count=stats_map.get(q.id, (0, 0))[0],
last_correct=last_correct_map.get(q.id),
created_at=q.created_at,
)
for q in rows
]
return StudyQuestionListResponse(items=items, total=total, page=page, page_size=page_size)
@router.post(
"/study-topics/{topic_id}/questions",
response_model=StudyQuestionResponse,
status_code=201,
)
async def create_question_in_topic(
topic_id: int,
body: StudyQuestionCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
# PR-6: exam_question_number 미명시 + exam_round 명시 시 서버가 max+1 자동 채움.
# 개인 사용 환경 race 부담 적어 단순 SELECT max 사용. 동시 입력 빈번해지면 향후
# SELECT FOR UPDATE 또는 토픽 단위 advisory lock 으로 강화 검토.
qnum = body.exam_question_number
if qnum is None and body.exam_round:
max_row = await session.execute(
select(func.coalesce(func.max(StudyQuestion.exam_question_number), 0))
.where(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.exam_round == body.exam_round,
StudyQuestion.deleted_at.is_(None),
)
)
qnum = int(max_row.scalar() or 0) + 1
q = StudyQuestion(
user_id=user.id,
study_topic_id=topic_id,
question_text=body.question_text,
choice_1=body.choice_1,
choice_2=body.choice_2,
choice_3=body.choice_3,
choice_4=body.choice_4,
correct_choice=body.correct_choice,
subject=body.subject,
scope=body.scope,
exam_name=body.exam_name,
exam_round=body.exam_round,
exam_question_number=qnum,
explanation=body.explanation,
source_note=body.source_note,
is_active=body.is_active,
)
session.add(q)
await session.flush()
await session.commit()
stats = QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
exam_question_number=q.exam_question_number,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
# ─── 복습 출제 ───
@router.get(
"/study-topics/{topic_id}/review/questions",
response_model=ReviewQuestionListResponse,
)
async def review_questions_for_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str | None = Query(None, description="단일 과목 집중. 미지정 시 과목별 균등 추출"),
scope: str | None = Query(None),
target_per_subject: int = Query(20, ge=1, le=100),
wrong_only: bool = Query(
False,
description="가장 최근 attempt 가 오답인 문제만 (latest-wrong, ever_wrong 아님)",
),
):
"""복습 출제. default = 과목별 target_per_subject 무작위 균등 추출.
응답에서 정답·해설은 비공개 — 답 제출(POST /api/study-questions/{id}/attempt) 시점에만 노출.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
# 후보 질문 base — soft delete 제외 + is_active
base_filter = and_(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
StudyQuestion.is_active.is_(True),
)
# wrong_only=true: latest attempt 가 오답인 question_id 만 후보로 좁힘.
# DISTINCT ON (study_question_id) ORDER BY answered_at DESC 패턴.
wrong_qids: set[int] | None = None
if wrong_only:
latest = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_topic_id == topic_id,
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
wrong_qids = {r.study_question_id for r in latest if r.is_correct is False}
if not wrong_qids:
return ReviewQuestionListResponse(
items=[], total=0, target_per_subject=target_per_subject, subject_distribution={}
)
# 과목 그룹 결정
if subject is not None:
# 단일 과목 집중
subjects: list[str | None] = [subject]
else:
subj_query = (
select(StudyQuestion.subject)
.where(base_filter)
.distinct()
)
if scope is not None:
subj_query = subj_query.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids))
subjects = [r[0] for r in (await session.execute(subj_query)).all()]
if not subjects:
return ReviewQuestionListResponse(
items=[], total=0, target_per_subject=target_per_subject, subject_distribution={}
)
# 각 subject 별로 ORDER BY random() LIMIT target_per_subject
selected: list[StudyQuestion] = []
distribution: dict[str, int] = {}
for subj in subjects:
sub_q = select(StudyQuestion).where(base_filter)
if subj is None:
sub_q = sub_q.where(StudyQuestion.subject.is_(None))
else:
sub_q = sub_q.where(StudyQuestion.subject == subj)
if scope is not None:
sub_q = sub_q.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids))
sub_q = sub_q.order_by(func.random()).limit(target_per_subject)
rows = (await session.execute(sub_q)).scalars().all()
if rows:
selected.extend(rows)
distribution[subj or ""] = len(rows)
# 그룹별 추출 후 전체 셔플 (입력 순서 단조 회피)
random.shuffle(selected)
# attempt 통계 batch (N+1 회피)
qids = [q.id for q in selected]
stats_map: dict[int, QuestionAttemptStats] = {}
if qids:
stats_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in stats_rows:
t = int(r.total)
c = int(r.correct)
stats_map[r.study_question_id] = QuestionAttemptStats(
attempt_count=t, correct_count=c, wrong_count=t - c
)
items = [
ReviewQuestionItem(
id=q.id,
question_text=q.question_text,
choices=[
ReviewChoice(number=1, text=q.choice_1),
ReviewChoice(number=2, text=q.choice_2),
ReviewChoice(number=3, text=q.choice_3),
ReviewChoice(number=4, text=q.choice_4),
],
subject=q.subject,
scope=q.scope,
stats=stats_map.get(
q.id, QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
),
)
for q in selected
]
return ReviewQuestionListResponse(
items=items,
total=len(items),
target_per_subject=target_per_subject,
subject_distribution=distribution,
)
# ─── 단건 엔드포인트 ───
@router.get("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def get_question(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
stats = await _attempt_stats(session, user.id, question_id)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
exam_question_number=q.exam_question_number,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
@router.patch("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def update_question(
question_id: int,
body: StudyQuestionUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""부분 업데이트. correct_choice 변경 시 기존 attempts.is_correct 재계산 안 함 (시점 사실 보존).
프론트 편집 페이지에 안내 배너 노출."""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
fields_set = body.model_fields_set
SIMPLE_FIELDS = {
"question_text", "choice_1", "choice_2", "choice_3", "choice_4",
"correct_choice", "subject", "scope", "exam_name", "exam_round",
"exam_question_number",
"explanation", "source_note", "is_active",
}
for fname in SIMPLE_FIELDS & fields_set:
setattr(q, fname, getattr(body, fname))
# PR-3: 문제 핵심 필드 변경 시 AI 해설 stale 전이 (본문은 보존, UI 배지로 안내).
# ready 상태에서만 stale 로 전이 — pending/failed/none/stale 은 변경 안 함.
AI_STALE_TRIGGER = {
"question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice",
}
if AI_STALE_TRIGGER & fields_set and q.ai_explanation_status == "ready":
q.ai_explanation_status = "stale"
# PR-4: 임베딩 stale 전이. 본문(question_text/choice_*)이 바뀌었을 때만 재계산.
# correct_choice 변경은 의미 검색에 영향 없으므로 재계산 안 함.
EMBED_STALE_TRIGGER = {
"question_text", "choice_1", "choice_2", "choice_3", "choice_4",
}
if EMBED_STALE_TRIGGER & fields_set and q.embedding_status == "ready":
q.embedding_status = "stale"
q.updated_at = datetime.now(timezone.utc)
await session.commit()
stats = await _attempt_stats(session, user.id, question_id)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
exam_question_number=q.exam_question_number,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
@router.delete("/study-questions/{question_id}", status_code=204)
async def soft_delete_question(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""soft delete only. attempts 는 RESTRICT FK 로 보호되어 영구 보존.
hard delete 호출 경로 자체가 없음 — DB 레벨에서도 attempts 가 있으면 거부."""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
q.deleted_at = datetime.now(timezone.utc)
q.updated_at = q.deleted_at
await session.commit()
# ─── attempt ───
@router.post("/study-questions/{question_id}/attempt", response_model=AttemptResponse)
async def submit_attempt(
question_id: int,
body: AttemptCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""답 제출. is_correct 판정 + attempt 1행 insert + 누적 통계 + 정답·해설 노출."""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
is_correct = body.selected_choice == q.correct_choice
attempt = StudyQuestionAttempt(
user_id=user.id,
study_question_id=q.id,
study_topic_id=q.study_topic_id,
selected_choice=body.selected_choice,
correct_choice=q.correct_choice,
is_correct=is_correct,
)
session.add(attempt)
await session.commit()
stats = await _attempt_stats(session, user.id, question_id)
return AttemptResponse(
is_correct=is_correct,
selected_choice=body.selected_choice,
correct_choice=q.correct_choice,
explanation=q.explanation,
stats=stats,
)
# ─── PR-5: 비슷한 문제 검색 (embedding cosine) ───
@router.get(
"/study-questions/{question_id}/similar",
response_model=SimilarQuestionsResponse,
)
async def list_similar_questions(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
limit: int = Query(5, ge=1, le=20),
topic_only: bool = Query(True, description="True 면 같은 study_topic 안에서만"),
):
"""현재 문제의 embedding 기준 cosine 유사도 top-K. 26B 호출 없이 vector search 만.
제외:
- 자기 자신 (id != current)
- soft-deleted
- embedding_status != 'ready' (대기 중·실패·미생성)
source_status='ready' 가 아니면 items 빈 배열 반환 (UI 에서 안내).
"""
src = await session.get(StudyQuestion, question_id)
src = _verify_question_ownership(src, user)
if src.embedding_status != "ready" or src.embedding is None:
return SimilarQuestionsResponse(
items=[], source_status=src.embedding_status, source_id=question_id
)
# cosine 거리 = embedding <=> ref. 유사도 = 1 - distance.
distance_expr = StudyQuestion.embedding.cosine_distance(src.embedding)
base = (
select(StudyQuestion, distance_expr.label("distance"))
.where(
StudyQuestion.user_id == user.id,
StudyQuestion.id != question_id,
StudyQuestion.deleted_at.is_(None),
StudyQuestion.embedding_status == "ready",
StudyQuestion.embedding.is_not(None),
)
.order_by(distance_expr.asc())
.limit(limit)
)
if topic_only:
base = base.where(StudyQuestion.study_topic_id == src.study_topic_id)
rows = (await session.execute(base)).all()
similar_qs = [(r[0], float(r.distance)) for r in rows]
# attempt_count + last_correct batch (N+1 회피)
qids = [q.id for q, _ in similar_qs]
attempt_count_map: dict[int, int] = {}
last_correct_map: dict[int, bool] = {}
if qids:
cnt_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in cnt_rows:
attempt_count_map[r.study_question_id] = int(r.total)
latest_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
for r in latest_rows:
last_correct_map[r.study_question_id] = bool(r.is_correct)
def _truncate(text: str, n: int = 80) -> str:
return text if len(text) <= n else text[:n].rstrip() + ""
items = [
SimilarQuestionItem(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=_truncate(q.question_text, 80),
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
similarity=round(1.0 - dist, 4),
attempt_count=attempt_count_map.get(q.id, 0),
last_correct=last_correct_map.get(q.id),
)
for q, dist in similar_qs
]
return SimilarQuestionsResponse(items=items, source_status="ready", source_id=question_id)
# ─── PR-3: AI 풀이 생성 엔드포인트 ───
# MLX 호출 timeout (초). MLX gate + 26B 추론 평균 ~10s, 안전 마진.
LLM_TIMEOUT_S = 30.0
# 프롬프트 템플릿 lazy load
_PROMPT_PATH = "study_question_explanation.txt"
_prompt_cache: str | None = None
def _load_explanation_prompt() -> str:
global _prompt_cache
if _prompt_cache is None:
from pathlib import Path
prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
_prompt_cache = (prompts_dir / _PROMPT_PATH).read_text(encoding="utf-8")
return _prompt_cache
def _render_prompt(q: StudyQuestion, doc_block: str, question_block: str) -> str:
template = _load_explanation_prompt()
return (
template
.replace("{question_text}", q.question_text)
.replace("{choice_1}", q.choice_1)
.replace("{choice_2}", q.choice_2)
.replace("{choice_3}", q.choice_3)
.replace("{choice_4}", q.choice_4)
.replace("{correct_choice}", str(q.correct_choice))
.replace("{documents_evidence_block}", doc_block)
.replace("{questions_evidence_block}", question_block)
)
def _make_cache_response(q: StudyQuestion, is_stale: bool) -> AIExplanationResponse:
"""캐시 본문만 반환 (evidence 재계산 안 함). status/메타는 q row 그대로."""
return AIExplanationResponse(
ai_explanation=q.ai_explanation,
ai_explanation_status=q.ai_explanation_status,
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
evidence=[],
from_cache=True,
is_stale=is_stale,
can_regenerate=True,
)
@router.post(
"/study-questions/{question_id}/ai-explanation",
response_model=AIExplanationResponse,
)
async def generate_ai_explanation(
question_id: int,
body: AIExplanationCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""수동 트리거 AI 풀이 생성. RAG = 매핑 자료 + 같은 토픽 다른 문제 (자기 자신 제외).
primary 모델 단독 (Mac mini MLX gemma-4-26b). MLX gate Semaphore(1) 경유.
- regenerate=false 이고 status=ready → 캐시 즉시 반환 (MLX 호출 없음)
- regenerate=false 이고 status=stale → 캐시 본문 반환, is_stale=true 플래그
- status=pending → 409 (이미 다른 호출 진행 중, race-safe 조건부 UPDATE 로 보장)
- 그 외 (none/failed/regenerate=true) → 새로 생성
실패 시 status='failed', 직전 본문은 보존.
"""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
# 1) 캐시 단축 분기 (regenerate=false)
if not body.regenerate:
if q.ai_explanation_status == "ready":
return _make_cache_response(q, is_stale=False)
if q.ai_explanation_status == "stale":
return _make_cache_response(q, is_stale=True)
if q.ai_explanation_status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# status in {none, failed} → 신규 생성
else:
# regenerate=true 라도 pending 은 차단 (동시 호출 방어)
if q.ai_explanation_status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# 2) Race-safe pending 전이 (조건부 UPDATE — 동시 호출 차단)
lock_result = await session.execute(
update(StudyQuestion)
.where(
StudyQuestion.id == question_id,
StudyQuestion.user_id == user.id,
StudyQuestion.ai_explanation_status != "pending",
)
.values(ai_explanation_status="pending", updated_at=datetime.now(timezone.utc))
.returning(StudyQuestion.id)
)
if lock_result.scalar_one_or_none() is None:
# 다른 호출이 먼저 pending 박음
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
await session.commit()
await session.refresh(q)
# 3) RAG 근거 수집
try:
ctx = await gather_explanation_context(session, user.id, q)
except Exception as e:
logger.warning("study_explanation_rag_failed: %s: %s", type(e).__name__, e)
# RAG 실패 시 빈 evidence 로 진행 (프롬프트가 "자료 근거 부족" 분기 처리)
from services.study.explanation_rag import ExplanationContext
ctx = ExplanationContext(documents=[], questions=[])
# 4) 프롬프트 조립 + MLX gate 안에서 primary 호출 + timeout
doc_block = render_evidence_block(ctx.documents)
q_block = render_evidence_block(ctx.questions)
prompt = _render_prompt(q, doc_block, q_block)
ai_client = AIClient()
raw_text: str | None = None
error_message: str | None = None
try:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_S):
raw_text = await ai_client.call_primary(prompt)
except asyncio.TimeoutError:
error_message = f"MLX timeout ({LLM_TIMEOUT_S}s)"
logger.warning("study_explanation_mlx_timeout qid=%s", question_id)
except Exception as e:
error_message = f"{type(e).__name__}: {e}"
logger.exception("study_explanation_mlx_failed qid=%s", question_id)
finally:
await ai_client.close()
# 5) 결과 저장 (성공/실패 분기) — q 객체 fresh refresh
q = await session.get(StudyQuestion, question_id)
if raw_text is None or not raw_text.strip():
# 실패 — 직전 본문 보존, status='failed'
q.ai_explanation_status = "failed"
q.updated_at = datetime.now(timezone.utc)
await session.commit()
return AIExplanationResponse(
ai_explanation=q.ai_explanation,
ai_explanation_status="failed",
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False,
is_stale=False,
can_regenerate=True,
)
# 성공 — Qwen think 태그 등 제거
from ai.client import strip_thinking
cleaned = strip_thinking(raw_text).strip()
q.ai_explanation = cleaned
q.ai_explanation_status = "ready"
q.ai_explanation_generated_at = datetime.now(timezone.utc)
primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary"
q.ai_explanation_model = f"mlx:{primary_name}"
q.updated_at = q.ai_explanation_generated_at
await session.commit()
return AIExplanationResponse(
ai_explanation=q.ai_explanation,
ai_explanation_status="ready",
ai_explanation_generated_at=q.ai_explanation_generated_at,
ai_explanation_model=q.ai_explanation_model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False,
is_stale=False,
can_regenerate=True,
)