Files
hyungi_document_server/app/api/study_questions.py
T
Hyungi Ahn 4b7156061e feat(study): 문제은행 + 복습모드 (study_questions)
study_topic 워크스페이스에 4지선다 문제은행 자산 트랙 추가. 기사시험 필기
대비 시나리오 — 빠른 반복 입력 + 과목별 균등 추출 복습 + 정오답 누적.

데이터 모델 (migrations 186~190):
- study_questions: study_topic 1:N, soft delete, is_active 토글, correct_choice
  SMALLINT CHECK 1~4
- study_question_attempts: 답 제출 1행 누적. study_question_id FK는 ON DELETE
  RESTRICT (이력 보존 원칙 — hard delete 실수로 풀이 기록 소실 차단)

설계 원칙:
- 문제 삭제는 API 에서 soft delete only. attempts FK RESTRICT 로 DB 레벨도 보호
- correct_choice 변경 시 기존 attempts.is_correct 재계산 안 함 (시점 사실 보존)
- 복습 default = 과목별 target_per_subject(20) 무작위 균등 추출. 한 과목이
  부족하면 가용한 만큼만
- wrong_only=true 정의 = 가장 최근 attempt 가 오답인 문제 (latest-wrong, ever-wrong 아님)
- 출제 응답에서 정답·해설 비공개. 답 제출 시점에만 노출
- subject/scope 강한 enum 미사용 (자유 텍스트, 자동완성은 후속)

API: /api/study-topics/{id}/questions, /review/questions, /api/study-questions/{id},
/attempt. 통합뷰(/study-topics/{id}) 응답에 sections.questions / stats.question_count
추가. 기존 question_set_count 는 후속 PR(회차/모의고사 묶음)용으로 보존.

프론트: /study/topics/[id]에 문제 섹션 + "새 문제"/"복습 시작" 진입.
/questions/new (저장 후 계속 입력 + sessionStorage persistent),
/questions/[qid]/edit (정답 변경 시 attempts 재계산 안 됨 안내 배너),
/review (시작 옵션 → 풀이 → 마지막 요약).

후속 PR 예정: 오답노트/취약 과목 리포트, AI 해설/클러스터링, spaced
repetition, 이미지 OCR 입력, CSV import, study_question_sets 묶음.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 08:00:37 +09:00

651 lines
22 KiB
Python

"""학습 워크스페이스 문제은행 + 복습모드 API.
PR-2 가드레일:
- study_topic 컨테이너에 자산 타입별 1:N (문제) 추가. polymorphic 단일 테이블 영구 금지.
- subject/scope 강한 enum 미사용 (자유 텍스트, 자동완성은 후속 PR).
- 문제 삭제는 soft delete only. attempts 는 RESTRICT FK 로 DB 레벨 보호.
- correct_choice 변경 시 기존 attempts 재계산 안 함 (기록 = 시점 사실).
- 복습 출제 시 정답·해설 응답 비공개 → 답 제출 시점에만 노출.
- wrong_only=true 정의 = 가장 최근 attempt 가 오답인 문제. "한 번이라도 틀린 적 있는" 아님.
"""
import logging
import random
from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, case, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_topic import StudyTopic
from models.user import User
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
"""study_topics.py 와 동일한 패턴 — 정보 누설 방지로 mismatch 도 404."""
if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
return topic
def _verify_question_ownership(q: StudyQuestion | None, user: User) -> StudyQuestion:
if q is None or q.user_id != user.id or q.deleted_at is not None:
raise HTTPException(status_code=404, detail="문제를 찾을 수 없습니다")
return q
# ─── Pydantic 스키마 ───
class StudyQuestionCreate(BaseModel):
question_text: str = Field(min_length=1)
choice_1: str = Field(min_length=1)
choice_2: str = Field(min_length=1)
choice_3: str = Field(min_length=1)
choice_4: str = Field(min_length=1)
correct_choice: int = Field(ge=1, le=4)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
exam_name: str | None = Field(default=None, max_length=120)
exam_round: str | None = Field(default=None, max_length=120)
explanation: str | None = None
source_note: str | None = None
is_active: bool = True
class StudyQuestionUpdate(BaseModel):
question_text: str | None = Field(default=None, min_length=1)
choice_1: str | None = Field(default=None, min_length=1)
choice_2: str | None = Field(default=None, min_length=1)
choice_3: str | None = Field(default=None, min_length=1)
choice_4: str | None = Field(default=None, min_length=1)
correct_choice: int | None = Field(default=None, ge=1, le=4)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
exam_name: str | None = Field(default=None, max_length=120)
exam_round: str | None = Field(default=None, max_length=120)
explanation: str | None = None
source_note: str | None = None
is_active: bool | None = None
class QuestionAttemptStats(BaseModel):
attempt_count: int
correct_count: int
wrong_count: int
class StudyQuestionResponse(BaseModel):
"""편집·관리용 — 정답·해설 모두 노출."""
id: int
study_topic_id: int
question_text: str
choice_1: str
choice_2: str
choice_3: str
choice_4: str
correct_choice: int
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
explanation: str | None
source_note: str | None
is_active: bool
created_at: datetime
updated_at: datetime
stats: QuestionAttemptStats
class StudyQuestionSummary(BaseModel):
"""통합 뷰·목록용 — 본문 truncate, 정답 비공개."""
id: int
question_text: str # 앞 80자 truncate (서버 측에서 처리)
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
is_active: bool
attempt_count: int
last_correct: bool | None
created_at: datetime
class StudyQuestionListResponse(BaseModel):
items: list[StudyQuestionSummary]
total: int
page: int
page_size: int
class ReviewChoice(BaseModel):
number: int
text: str
class ReviewQuestionItem(BaseModel):
"""복습 출제 응답 — 정답·해설 비공개."""
id: int
question_text: str
choices: list[ReviewChoice]
subject: str | None
scope: str | None
stats: QuestionAttemptStats
class ReviewQuestionListResponse(BaseModel):
items: list[ReviewQuestionItem]
total: int
target_per_subject: int
subject_distribution: dict[str, int] # {"연소공학": 20, ...}
class AttemptCreate(BaseModel):
selected_choice: int = Field(ge=1, le=4)
class AttemptResponse(BaseModel):
is_correct: bool
selected_choice: int
correct_choice: int
explanation: str | None
stats: QuestionAttemptStats
# ─── 통계 헬퍼 ───
async def _attempt_stats(
session: AsyncSession, user_id: int, question_id: int
) -> QuestionAttemptStats:
"""단일 문제의 누적 attempt 통계 (사용자 한정)."""
row = (
await session.execute(
select(
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
).where(
StudyQuestionAttempt.user_id == user_id,
StudyQuestionAttempt.study_question_id == question_id,
)
)
).one()
total = int(row.total or 0)
correct = int(row.correct or 0)
return QuestionAttemptStats(
attempt_count=total, correct_count=correct, wrong_count=total - correct
)
def _truncate(text: str, n: int = 80) -> str:
return text if len(text) <= n else text[:n].rstrip() + ""
# ─── 토픽 단위 엔드포인트 ───
@router.get(
"/study-topics/{topic_id}/questions",
response_model=StudyQuestionListResponse,
)
async def list_questions_in_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str | None = Query(None),
scope: str | None = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
):
"""토픽 내 문제 목록. soft-deleted 제외. summary 응답 (정답 비공개)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
base = select(StudyQuestion).where(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
)
if subject is not None:
base = base.where(StudyQuestion.subject == subject)
if scope is not None:
base = base.where(StudyQuestion.scope == scope)
total = (
await session.execute(select(func.count()).select_from(base.subquery()))
).scalar() or 0
rows = (
await session.execute(
base.order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc())
.offset((page - 1) * page_size)
.limit(page_size)
)
).scalars().all()
# 한 번의 쿼리로 attempt 통계 + 마지막 정/오답 끌어오기 (N+1 회피)
qids = [q.id for q in rows]
stats_map: dict[int, tuple[int, int]] = {}
last_correct_map: dict[int, bool] = {}
if qids:
stats_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in stats_rows:
stats_map[r.study_question_id] = (int(r.total), int(r.correct))
latest_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
StudyQuestionAttempt.answered_at,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
for r in latest_rows:
last_correct_map[r.study_question_id] = bool(r.is_correct)
items = [
StudyQuestionSummary(
id=q.id,
question_text=_truncate(q.question_text, 80),
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
is_active=q.is_active,
attempt_count=stats_map.get(q.id, (0, 0))[0],
last_correct=last_correct_map.get(q.id),
created_at=q.created_at,
)
for q in rows
]
return StudyQuestionListResponse(items=items, total=total, page=page, page_size=page_size)
@router.post(
"/study-topics/{topic_id}/questions",
response_model=StudyQuestionResponse,
status_code=201,
)
async def create_question_in_topic(
topic_id: int,
body: StudyQuestionCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
q = StudyQuestion(
user_id=user.id,
study_topic_id=topic_id,
question_text=body.question_text,
choice_1=body.choice_1,
choice_2=body.choice_2,
choice_3=body.choice_3,
choice_4=body.choice_4,
correct_choice=body.correct_choice,
subject=body.subject,
scope=body.scope,
exam_name=body.exam_name,
exam_round=body.exam_round,
explanation=body.explanation,
source_note=body.source_note,
is_active=body.is_active,
)
session.add(q)
await session.flush()
await session.commit()
stats = QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
# ─── 복습 출제 ───
@router.get(
"/study-topics/{topic_id}/review/questions",
response_model=ReviewQuestionListResponse,
)
async def review_questions_for_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str | None = Query(None, description="단일 과목 집중. 미지정 시 과목별 균등 추출"),
scope: str | None = Query(None),
target_per_subject: int = Query(20, ge=1, le=100),
wrong_only: bool = Query(
False,
description="가장 최근 attempt 가 오답인 문제만 (latest-wrong, ever_wrong 아님)",
),
):
"""복습 출제. default = 과목별 target_per_subject 무작위 균등 추출.
응답에서 정답·해설은 비공개 — 답 제출(POST /api/study-questions/{id}/attempt) 시점에만 노출.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
# 후보 질문 base — soft delete 제외 + is_active
base_filter = and_(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
StudyQuestion.is_active.is_(True),
)
# wrong_only=true: latest attempt 가 오답인 question_id 만 후보로 좁힘.
# DISTINCT ON (study_question_id) ORDER BY answered_at DESC 패턴.
wrong_qids: set[int] | None = None
if wrong_only:
latest = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_topic_id == topic_id,
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
wrong_qids = {r.study_question_id for r in latest if r.is_correct is False}
if not wrong_qids:
return ReviewQuestionListResponse(
items=[], total=0, target_per_subject=target_per_subject, subject_distribution={}
)
# 과목 그룹 결정
if subject is not None:
# 단일 과목 집중
subjects: list[str | None] = [subject]
else:
subj_query = (
select(StudyQuestion.subject)
.where(base_filter)
.distinct()
)
if scope is not None:
subj_query = subj_query.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids))
subjects = [r[0] for r in (await session.execute(subj_query)).all()]
if not subjects:
return ReviewQuestionListResponse(
items=[], total=0, target_per_subject=target_per_subject, subject_distribution={}
)
# 각 subject 별로 ORDER BY random() LIMIT target_per_subject
selected: list[StudyQuestion] = []
distribution: dict[str, int] = {}
for subj in subjects:
sub_q = select(StudyQuestion).where(base_filter)
if subj is None:
sub_q = sub_q.where(StudyQuestion.subject.is_(None))
else:
sub_q = sub_q.where(StudyQuestion.subject == subj)
if scope is not None:
sub_q = sub_q.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids))
sub_q = sub_q.order_by(func.random()).limit(target_per_subject)
rows = (await session.execute(sub_q)).scalars().all()
if rows:
selected.extend(rows)
distribution[subj or ""] = len(rows)
# 그룹별 추출 후 전체 셔플 (입력 순서 단조 회피)
random.shuffle(selected)
# attempt 통계 batch (N+1 회피)
qids = [q.id for q in selected]
stats_map: dict[int, QuestionAttemptStats] = {}
if qids:
stats_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in stats_rows:
t = int(r.total)
c = int(r.correct)
stats_map[r.study_question_id] = QuestionAttemptStats(
attempt_count=t, correct_count=c, wrong_count=t - c
)
items = [
ReviewQuestionItem(
id=q.id,
question_text=q.question_text,
choices=[
ReviewChoice(number=1, text=q.choice_1),
ReviewChoice(number=2, text=q.choice_2),
ReviewChoice(number=3, text=q.choice_3),
ReviewChoice(number=4, text=q.choice_4),
],
subject=q.subject,
scope=q.scope,
stats=stats_map.get(
q.id, QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
),
)
for q in selected
]
return ReviewQuestionListResponse(
items=items,
total=len(items),
target_per_subject=target_per_subject,
subject_distribution=distribution,
)
# ─── 단건 엔드포인트 ───
@router.get("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def get_question(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
stats = await _attempt_stats(session, user.id, question_id)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
@router.patch("/study-questions/{question_id}", response_model=StudyQuestionResponse)
async def update_question(
question_id: int,
body: StudyQuestionUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""부분 업데이트. correct_choice 변경 시 기존 attempts.is_correct 재계산 안 함 (시점 사실 보존).
프론트 편집 페이지에 안내 배너 노출."""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
fields_set = body.model_fields_set
SIMPLE_FIELDS = {
"question_text", "choice_1", "choice_2", "choice_3", "choice_4",
"correct_choice", "subject", "scope", "exam_name", "exam_round",
"explanation", "source_note", "is_active",
}
for fname in SIMPLE_FIELDS & fields_set:
setattr(q, fname, getattr(body, fname))
q.updated_at = datetime.now(timezone.utc)
await session.commit()
stats = await _attempt_stats(session, user.id, question_id)
return StudyQuestionResponse(
id=q.id,
study_topic_id=q.study_topic_id,
question_text=q.question_text,
choice_1=q.choice_1,
choice_2=q.choice_2,
choice_3=q.choice_3,
choice_4=q.choice_4,
correct_choice=q.correct_choice,
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
explanation=q.explanation,
source_note=q.source_note,
is_active=q.is_active,
created_at=q.created_at,
updated_at=q.updated_at,
stats=stats,
)
@router.delete("/study-questions/{question_id}", status_code=204)
async def soft_delete_question(
question_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""soft delete only. attempts 는 RESTRICT FK 로 보호되어 영구 보존.
hard delete 호출 경로 자체가 없음 — DB 레벨에서도 attempts 가 있으면 거부."""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
q.deleted_at = datetime.now(timezone.utc)
q.updated_at = q.deleted_at
await session.commit()
# ─── attempt ───
@router.post("/study-questions/{question_id}/attempt", response_model=AttemptResponse)
async def submit_attempt(
question_id: int,
body: AttemptCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""답 제출. is_correct 판정 + attempt 1행 insert + 누적 통계 + 정답·해설 노출."""
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
is_correct = body.selected_choice == q.correct_choice
attempt = StudyQuestionAttempt(
user_id=user.id,
study_question_id=q.id,
study_topic_id=q.study_topic_id,
selected_choice=body.selected_choice,
correct_choice=q.correct_choice,
is_correct=is_correct,
)
session.add(attempt)
await session.commit()
stats = await _attempt_stats(session, user.id, question_id)
return AttemptResponse(
is_correct=is_correct,
selected_choice=body.selected_choice,
correct_choice=q.correct_choice,
explanation=q.explanation,
stats=stats,
)