Files
hyungi_document_server/app/api/study_topics.py
T
Hyungi Ahn fc8aea1649 feat(study): 반복 출제 라벨 등급 + cosine 임계값 0.85 조정
- round_count 별 등급 매핑 (단골/잘 나오는 반복 출제/반복 출제/신출/빈출)
  - ≥7 단골, 5–6 잘 나오는 반복 출제, 3–4 반복 출제,
    2 + max(연도)≥2024 신출, 2 + 모두 옛 빈출
- SIMILAR_THRESHOLD 0.88 → 0.85 (5-source 분포 측정 결과 자연 갭 위치 반영)
- API 응답 + 프론트 3곳 (보기/통합뷰/결과 카드) 라벨 일괄 통일

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 08:50:39 +09:00

1763 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""학습 워크스페이스(study_topic) API — 필기 세션 + 자료(library document) 묶음 컨테이너
핵심 개념:
- study_topic 은 단순 폴더/태그가 아니라 학습 자산 1차 컨테이너 (학습 워크스페이스).
- 향후 단어장/오디오/문제 세트 같은 자산이 같은 컨테이너 아래로 들어올 수 있도록
응답 구조를 sections + stats 형태로 설계 (현재는 sessions / documents 두 키만).
- documents.category(자료실 UI 축) 와 직교한 별도 분류 축. 자료실 facet/카테고리는 미터치.
- StudySession.certification/subject/topic 은 보존 — 본 컨테이너 와 직교한 세부 메타.
설계 제약:
- study_type 은 강한 enum 미사용. VALID_STUDY_TYPES 는 단순 권장값 (DB/Pydantic 강제 안 함).
- soft delete (deleted_at). active 행끼리만 (user_id, name) partial unique.
- 권한: 모든 쿼리에 user_id 강제. 다른 사용자 자료를 묶지 못함.
- 문서 매핑은 N:M (study_topic_documents). 세션 매핑은 1:N (study_sessions.study_topic_id).
- polymorphic 단일 study_topic_items 테이블은 만들지 않는다 (영구 금지).
"""
import asyncio
import logging
import random as _random
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path as _Path
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, case, delete, func, select, text as sql_text, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, strip_thinking
from core.auth import get_current_user
from core.database import get_session
from core.library import LIBRARY_PREFIX, normalize_library_path
from models.document import Document
from models.study_session import StudySession
from models.study_topic import StudyTopic, StudyTopicDocument
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_question_image import StudyQuestionImage
from models.study_quiz_session import StudyQuizSession
from models.study_topic_subject_note import StudyTopicSubjectNote
from models.user import User
from services.search.llm_gate import get_mlx_gate
from services.study.subject_note_rag import (
SubjectNoteContext,
gather_subject_note_context,
render_evidence_block,
)
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── 권장값 (강제 enum 아님, UI 안내용) ───
RECOMMENDED_STUDY_TYPES: set[str] = {
"certification", "language", "school", "work", "general",
}
# ─── Pydantic 스키마 ───
class StudyTopicCreate(BaseModel):
name: str = Field(min_length=1, max_length=120)
description: str | None = None
color: str | None = Field(default=None, max_length=20)
study_type: str | None = Field(default=None, max_length=40)
sort_order: int = 0
# PR-6: 시험 메타
exam_round_size: int | None = Field(default=None, ge=1, le=300)
exam_subjects: list[str] = []
class StudyTopicUpdate(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=120)
description: str | None = None
color: str | None = Field(default=None, max_length=20)
study_type: str | None = Field(default=None, max_length=40)
sort_order: int | None = None
# PR-6: 시험 메타
exam_round_size: int | None = Field(default=None, ge=1, le=300)
exam_subjects: list[str] | None = None
class StudyTopicResponse(BaseModel):
"""주제 목록 응답 — 집계 카운트 포함."""
id: int
name: str
description: str | None
color: str | None
study_type: str | None
sort_order: int
session_count: int = 0
document_count: int = 0
question_count: int = 0
# PR-6: 시험 메타
exam_round_size: int | None = None
exam_subjects: list[str] = []
created_at: datetime
updated_at: datetime
class StudyTopicListResponse(BaseModel):
items: list[StudyTopicResponse]
total: int
class StudyTopicSessionSummary(BaseModel):
"""상세 뷰의 세션 카드 페이로드 — 통합 뷰 렌더용 최소 필드."""
id: int
study_type: str
certification: str | None
language_code: str | None
learning_level: str | None
subject: str | None
topic: str | None
mode: str
repetition_count: int
review_state: str | None
created_at: datetime
updated_at: datetime
class StudyTopicDocumentSummary(BaseModel):
"""상세 뷰의 자료 카드 페이로드.
library_paths: 통합뷰 자료 섹션의 카테고리 트리 그룹핑용. user_tags 의
`@library/<path>` 태그에서 prefix 제거한 path 들. 한 자료가 여러 카테고리에
속할 수 있으나(다중 분류), 트리 그룹핑 시 첫 번째 path 만 사용. 빈 리스트면
"분류 없음" 그룹.
"""
id: int
title: str | None
file_format: str
file_type: str
category: str | None
ai_domain: str | None
importance: str | None
sort_order: int
linked_at: datetime
library_paths: list[str] = []
class StudyTopicQuestionSummary(BaseModel):
"""상세 뷰의 문제 카드 페이로드 — 정답·해설 비공개, 본문 80자 truncate."""
id: int
question_text: str
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
exam_question_number: int | None # PR-11: 회차별 그룹 안 정렬 키
is_active: bool
attempt_count: int
last_correct: bool | None
created_at: datetime
class StudyTopicSections(BaseModel):
"""확장 친화 dict — 향후 audio_assets / vocab_decks 키 추가 가능."""
sessions: list[StudyTopicSessionSummary]
documents: list[StudyTopicDocumentSummary]
questions: list[StudyTopicQuestionSummary] = []
class StudyTopicStats(BaseModel):
"""자산 카운트 — 0 으로 미리 노출해서 후속 PR 에서 필드만 채움.
question_count = PR-2 단일 문제 수. question_set_count = 후속 PR 의 회차/모의고사 묶음 수 (둘은 의미 다름).
"""
session_count: int
document_count: int
question_count: int = 0
audio_count: int = 0
vocab_count: int = 0
question_set_count: int = 0
class StudyTopicMeta(BaseModel):
id: int
name: str
description: str | None
color: str | None
study_type: str | None
sort_order: int
# PR-6: 시험 메타
exam_round_size: int | None = None
exam_subjects: list[str] = []
created_at: datetime
updated_at: datetime
class StudyTopicDetailResponse(BaseModel):
topic: StudyTopicMeta
sections: StudyTopicSections
stats: StudyTopicStats
class StudyTopicDocumentLinkRequest(BaseModel):
document_ids: list[int] = Field(min_length=1, max_length=100)
class StudyTopicDocumentLinkResponse(BaseModel):
linked: list[int]
skipped_existing: list[int]
skipped_not_found: list[int]
class StudyTopicDocumentByPathRequest(BaseModel):
"""카테고리 트리 노드 일괄 추가용. path prefix 매칭으로 하위 자료 모두 매핑.
include_subtree=True 면 path 와 그 하위 카테고리 전체, False 면 path 정확 일치만.
"""
path: str = Field(min_length=1, max_length=500)
include_subtree: bool = True
class StudyTopicDocumentByPathResponse(BaseModel):
linked_count: int
skipped_existing_count: int
total_in_path: int
path: str
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
"""소유자 검증 + soft-deleted 행 차단. mismatch 도 404 (정보 누설 방지)."""
if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
return topic
def _meta_from_topic(t: StudyTopic) -> StudyTopicMeta:
return StudyTopicMeta(
id=t.id,
name=t.name,
description=t.description,
color=t.color,
study_type=t.study_type,
sort_order=t.sort_order,
exam_round_size=t.exam_round_size,
exam_subjects=t.exam_subjects or [],
created_at=t.created_at,
updated_at=t.updated_at,
)
# ─── 엔드포인트 ───
@router.get("/", response_model=StudyTopicListResponse)
async def list_study_topics(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""사용자의 active 주제 목록. 세션 수·자료 수 집계 포함."""
# active 주제 + 세션 수 (LEFT JOIN study_sessions)
sess_count_sub = (
select(
StudySession.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(StudySession.user_id == user.id)
.where(StudySession.study_topic_id.is_not(None))
.group_by(StudySession.study_topic_id)
.subquery()
)
doc_count_sub = (
select(
StudyTopicDocument.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(StudyTopicDocument.user_id == user.id)
.group_by(StudyTopicDocument.study_topic_id)
.subquery()
)
# PR-2 — 활성 문제 카운트 (soft-deleted 제외)
from models.study_question import StudyQuestion as _SQ
q_count_sub = (
select(
_SQ.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(_SQ.user_id == user.id, _SQ.deleted_at.is_(None))
.group_by(_SQ.study_topic_id)
.subquery()
)
base = (
select(
StudyTopic,
func.coalesce(sess_count_sub.c.c, 0).label("session_count"),
func.coalesce(doc_count_sub.c.c, 0).label("document_count"),
func.coalesce(q_count_sub.c.c, 0).label("question_count"),
)
.outerjoin(sess_count_sub, sess_count_sub.c.topic_id == StudyTopic.id)
.outerjoin(doc_count_sub, doc_count_sub.c.topic_id == StudyTopic.id)
.outerjoin(q_count_sub, q_count_sub.c.topic_id == StudyTopic.id)
.where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None))
.order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
)
total_query = select(func.count()).select_from(
select(StudyTopic.id)
.where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None))
.subquery()
)
total = (await session.execute(total_query)).scalar() or 0
rows = (await session.execute(base.offset(offset).limit(limit))).all()
items = [
StudyTopicResponse(
id=t.id,
name=t.name,
description=t.description,
color=t.color,
study_type=t.study_type,
sort_order=t.sort_order,
session_count=int(sc),
document_count=int(dc),
question_count=int(qc),
exam_round_size=t.exam_round_size,
exam_subjects=t.exam_subjects or [],
created_at=t.created_at,
updated_at=t.updated_at,
)
for t, sc, dc, qc in rows
]
return StudyTopicListResponse(items=items, total=total)
class ExamRoundProgress(BaseModel):
exam_round: str
question_count: int
max_question_number: int # 0 = exam_question_number 가 모두 NULL
next_question_number: int | None # 도달 시 None
is_complete: bool
class ExamRoundsResponse(BaseModel):
exam_round_size: int | None # 토픽 메타. NULL = 미설정
items: list[ExamRoundProgress]
@router.get("/{topic_id}/exam-rounds", response_model=ExamRoundsResponse)
async def list_exam_rounds(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""토픽 안 회차별 진행률 집계. 진행률 표시 + 재개 UX 용.
is_complete = (exam_round_size IS NOT NULL AND question_count >= exam_round_size).
next_question_number = max + 1 (도달 시 NULL).
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
from models.study_question import StudyQuestion as _SQ
rows = (
await session.execute(
select(
_SQ.exam_round,
func.count().label("question_count"),
func.coalesce(func.max(_SQ.exam_question_number), 0).label("max_qnum"),
)
.where(
_SQ.user_id == user.id,
_SQ.study_topic_id == topic_id,
_SQ.deleted_at.is_(None),
_SQ.exam_round.is_not(None),
)
.group_by(_SQ.exam_round)
.order_by(_SQ.exam_round.desc())
)
).all()
size = topic.exam_round_size
items: list[ExamRoundProgress] = []
for r in rows:
cnt = int(r.question_count)
mx = int(r.max_qnum)
is_complete = bool(size is not None and cnt >= size)
# 도달했으면 next None, 아니면 max+1 (max=0 이면 1번부터)
next_n = None if is_complete else (mx + 1 if mx > 0 else 1)
items.append(ExamRoundProgress(
exam_round=r.exam_round,
question_count=cnt,
max_question_number=mx,
next_question_number=next_n,
is_complete=is_complete,
))
return ExamRoundsResponse(exam_round_size=size, items=items)
@router.get("/by-document/{document_id}", response_model=list[StudyTopicMeta])
async def list_topics_for_document(
document_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""이 자료가 매핑된 active 주제 목록.
/study/sources 카드의 "이 자료가 속한 주제" 배지·컨텍스트 메뉴용.
"""
rows = (
await session.execute(
select(StudyTopic)
.join(StudyTopicDocument, StudyTopicDocument.study_topic_id == StudyTopic.id)
.where(
StudyTopicDocument.document_id == document_id,
StudyTopicDocument.user_id == user.id,
StudyTopic.deleted_at.is_(None),
)
.order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
)
).scalars().all()
return [_meta_from_topic(t) for t in rows]
@router.post("/", response_model=StudyTopicResponse, status_code=201)
async def create_study_topic(
body: StudyTopicCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""주제 생성. 같은 이름의 active 주제가 이미 있으면 409.
soft-deleted 동명 주제는 partial unique index 에서 빠지므로 신규 생성 가능.
"""
topic = StudyTopic(
user_id=user.id,
name=body.name.strip(),
description=body.description,
color=body.color,
study_type=body.study_type,
sort_order=body.sort_order,
exam_round_size=body.exam_round_size,
exam_subjects=body.exam_subjects or [],
)
session.add(topic)
try:
await session.flush()
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다")
return StudyTopicResponse(
id=topic.id,
name=topic.name,
description=topic.description,
color=topic.color,
study_type=topic.study_type,
sort_order=topic.sort_order,
session_count=0,
document_count=0,
question_count=0,
exam_round_size=topic.exam_round_size,
exam_subjects=topic.exam_subjects or [],
created_at=topic.created_at,
updated_at=topic.updated_at,
)
@router.get("/{topic_id}", response_model=StudyTopicDetailResponse)
async def get_study_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""통합 뷰: 주제 메타 + 세션 목록 + 자료 목록 + stats.
응답 형태는 후속 PR(오디오/단어장/문제세트 추가) 에서 sections / stats 키만 늘리면 되도록 dict 구조.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
# 세션 목록 — 최근순
sess_rows = (
await session.execute(
select(StudySession)
.where(
StudySession.user_id == user.id,
StudySession.study_topic_id == topic_id,
)
.order_by(StudySession.created_at.desc(), StudySession.id.desc())
)
).scalars().all()
sessions_payload = [
StudyTopicSessionSummary(
id=s.id,
study_type=s.study_type,
certification=s.certification,
language_code=s.language_code,
learning_level=s.learning_level,
subject=s.subject,
topic=s.topic,
mode=s.mode,
repetition_count=s.repetition_count,
review_state=s.review_state,
created_at=s.created_at,
updated_at=s.updated_at,
)
for s in sess_rows
]
# 자료 목록 — 매핑 sort_order → created_at desc
doc_rows = (
await session.execute(
select(Document, StudyTopicDocument)
.join(
StudyTopicDocument,
and_(
StudyTopicDocument.document_id == Document.id,
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.user_id == user.id,
),
)
.where(Document.deleted_at.is_(None))
.order_by(
StudyTopicDocument.sort_order.asc(),
StudyTopicDocument.created_at.desc(),
)
)
).all()
def _extract_library_paths(tags: list | None) -> list[str]:
if not tags:
return []
out: list[str] = []
for t in tags:
if isinstance(t, str) and t.startswith(LIBRARY_PREFIX):
out.append(t[len(LIBRARY_PREFIX):])
return out
documents_payload = [
StudyTopicDocumentSummary(
id=d.id,
title=d.title,
file_format=d.file_format,
file_type=str(d.file_type) if d.file_type is not None else "immutable",
category=str(d.category) if d.category is not None else None,
ai_domain=d.ai_domain,
importance=d.importance,
sort_order=link.sort_order,
linked_at=link.created_at,
library_paths=_extract_library_paths(d.user_tags),
)
for d, link in doc_rows
]
# 문제 목록 (PR-2) — 본문 truncate, 정답·해설 비공개. attempt 통계 batch 로 끌어옴.
from models.study_question import StudyQuestion, StudyQuestionAttempt
q_rows = (
await session.execute(
select(StudyQuestion)
.where(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
)
.order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc())
)
).scalars().all()
qids = [q.id for q in q_rows]
q_attempt_count: dict[int, int] = {}
q_last_correct: dict[int, bool] = {}
if qids:
from sqlalchemy import case as _case
cnt_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in cnt_rows:
q_attempt_count[r.study_question_id] = int(r.total)
latest_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
for r in latest_rows:
q_last_correct[r.study_question_id] = bool(r.is_correct)
def _truncate_q(text: str, n: int = 80) -> str:
return text if len(text) <= n else text[:n].rstrip() + ""
questions_payload = [
StudyTopicQuestionSummary(
id=q.id,
question_text=_truncate_q(q.question_text, 80),
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
exam_question_number=q.exam_question_number,
is_active=q.is_active,
attempt_count=q_attempt_count.get(q.id, 0),
last_correct=q_last_correct.get(q.id),
created_at=q.created_at,
)
for q in q_rows
]
return StudyTopicDetailResponse(
topic=_meta_from_topic(topic),
sections=StudyTopicSections(
sessions=sessions_payload,
documents=documents_payload,
questions=questions_payload,
),
stats=StudyTopicStats(
session_count=len(sessions_payload),
document_count=len(documents_payload),
question_count=len(questions_payload),
# 후속 PR 에서 채움
audio_count=0,
vocab_count=0,
question_set_count=0,
),
)
@router.patch("/{topic_id}", response_model=StudyTopicResponse)
async def update_study_topic(
topic_id: int,
body: StudyTopicUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
fields_set = body.model_fields_set
if "name" in fields_set and body.name is not None:
topic.name = body.name.strip()
for fname in {"description", "color", "study_type", "sort_order"} & fields_set:
setattr(topic, fname, getattr(body, fname))
# PR-6: 시험 메타. exam_subjects 가 None 이면 (PATCH 미명시) 기존 유지, 빈 배열은 명시적 클리어
if "exam_round_size" in fields_set:
topic.exam_round_size = body.exam_round_size
if "exam_subjects" in fields_set and body.exam_subjects is not None:
topic.exam_subjects = body.exam_subjects
topic.updated_at = datetime.now(timezone.utc)
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다")
# 응답용 카운트 별도 조회
sc = (await session.execute(
select(func.count())
.select_from(StudySession)
.where(StudySession.user_id == user.id, StudySession.study_topic_id == topic.id)
)).scalar() or 0
dc = (await session.execute(
select(func.count())
.select_from(StudyTopicDocument)
.where(StudyTopicDocument.study_topic_id == topic.id)
)).scalar() or 0
from models.study_question import StudyQuestion as _SQ2
qc = (await session.execute(
select(func.count())
.select_from(_SQ2)
.where(
_SQ2.user_id == user.id,
_SQ2.study_topic_id == topic.id,
_SQ2.deleted_at.is_(None),
)
)).scalar() or 0
return StudyTopicResponse(
id=topic.id,
name=topic.name,
description=topic.description,
color=topic.color,
study_type=topic.study_type,
sort_order=topic.sort_order,
session_count=int(sc),
document_count=int(dc),
question_count=int(qc),
exam_round_size=topic.exam_round_size,
exam_subjects=topic.exam_subjects or [],
created_at=topic.created_at,
updated_at=topic.updated_at,
)
@router.delete("/{topic_id}", status_code=204)
async def delete_study_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""주제 soft delete. 세션의 study_topic_id 는 SET NULL, 문서 매핑은 cascade 제거.
같은 이름 재생성을 위해 partial unique index 가 deleted_at IS NULL 인 행만 본다.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
# 세션 FK SET NULL — 명시적으로 (DB ON DELETE SET NULL 은 hard delete 시에만 동작)
await session.execute(
update(StudySession)
.where(
StudySession.user_id == user.id,
StudySession.study_topic_id == topic_id,
)
.values(study_topic_id=None)
)
# 문서 매핑 명시 제거 (soft delete 라 FK CASCADE 안 탐)
await session.execute(
delete(StudyTopicDocument).where(
StudyTopicDocument.study_topic_id == topic_id,
)
)
topic.deleted_at = datetime.now(timezone.utc)
await session.commit()
# ─── 자료(문서) 매핑 ───
@router.post(
"/{topic_id}/documents",
response_model=StudyTopicDocumentLinkResponse,
status_code=201,
)
async def link_documents_to_topic(
topic_id: int,
body: StudyTopicDocumentLinkRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""자료 N개를 주제에 일괄 매핑. 이미 연결된 건 skipped_existing 으로 보고."""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
requested = list(dict.fromkeys(body.document_ids)) # 중복 제거 + 순서 보존
# 존재 + 미삭제 documents 만 통과 (single-user 시스템: documents.user_id 부재 가능)
valid_rows = (
await session.execute(
select(Document.id).where(
Document.id.in_(requested),
Document.deleted_at.is_(None),
)
)
).scalars().all()
valid_set = set(valid_rows)
not_found = [d for d in requested if d not in valid_set]
# 이미 매핑된 것
existing_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id.in_(valid_set),
)
)
).scalars().all()
existing_set = set(existing_rows)
to_link = [d for d in requested if d in valid_set and d not in existing_set]
# 기존 max sort_order 다음부터
max_sort = (
await session.execute(
select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1))
.where(StudyTopicDocument.study_topic_id == topic_id)
)
).scalar() or -1
for i, doc_id in enumerate(to_link, start=1):
session.add(StudyTopicDocument(
study_topic_id=topic_id,
document_id=doc_id,
user_id=user.id,
sort_order=int(max_sort) + i,
))
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다")
return StudyTopicDocumentLinkResponse(
linked=to_link,
skipped_existing=sorted(existing_set),
skipped_not_found=not_found,
)
@router.post(
"/{topic_id}/documents/by-path",
response_model=StudyTopicDocumentByPathResponse,
status_code=201,
)
async def link_documents_by_library_path(
topic_id: int,
body: StudyTopicDocumentByPathRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""자료실 카테고리 path 기준으로 하위 자료를 일괄 매핑.
/api/documents/library?path=... 와 동일한 user_tags(@library/...) prefix 매칭 사용.
100건 limit 우회 — 한 카테고리에 자료가 많아도 한 번에 처리.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
try:
normalized = normalize_library_path(body.path)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
exact_tag = f"{LIBRARY_PREFIX}{normalized}"
prefix_tag = f"{LIBRARY_PREFIX}{normalized}/%"
# @library/<path> 정확 일치 또는 (subtree 옵션 시) 그 하위까지 prefix 매칭
if body.include_subtree:
tag_filter = sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact OR t LIKE :prefix
)
""").bindparams(exact=exact_tag, prefix=prefix_tag)
else:
tag_filter = sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact
)
""").bindparams(exact=exact_tag)
candidate_rows = (
await session.execute(
select(Document.id).where(
Document.deleted_at.is_(None),
Document.category == "library",
tag_filter,
)
)
).scalars().all()
candidate_set = set(candidate_rows)
total_in_path = len(candidate_set)
if total_in_path == 0:
return StudyTopicDocumentByPathResponse(
linked_count=0,
skipped_existing_count=0,
total_in_path=0,
path=normalized,
)
existing_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id.in_(candidate_set),
)
)
).scalars().all()
existing_set = set(existing_rows)
to_link = [d for d in candidate_rows if d not in existing_set]
max_sort = (
await session.execute(
select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1))
.where(StudyTopicDocument.study_topic_id == topic_id)
)
).scalar() or -1
for i, doc_id in enumerate(to_link, start=1):
session.add(StudyTopicDocument(
study_topic_id=topic_id,
document_id=doc_id,
user_id=user.id,
sort_order=int(max_sort) + i,
))
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다")
return StudyTopicDocumentByPathResponse(
linked_count=len(to_link),
skipped_existing_count=len(existing_set),
total_in_path=total_in_path,
path=normalized,
)
@router.delete("/{topic_id}/documents/{document_id}", status_code=204)
async def unlink_document_from_topic(
topic_id: int,
document_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
result = await session.execute(
delete(StudyTopicDocument).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id == document_id,
StudyTopicDocument.user_id == user.id,
)
)
await session.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="매핑을 찾을 수 없습니다")
# ─── 세션 매핑 ───
@router.post("/{topic_id}/sessions/{session_id}", status_code=204)
async def attach_session_to_topic(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""기존 세션을 주제에 연결. 다른 주제에 이미 연결돼 있으면 새 주제로 갱신 (1:N)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
sess = await session.get(StudySession, session_id)
if sess is None or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
sess.study_topic_id = topic_id
sess.updated_at = datetime.now(timezone.utc)
await session.commit()
@router.delete("/{topic_id}/sessions/{session_id}", status_code=204)
async def detach_session_from_topic(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""세션 분리. 현재 study_topic_id 가 topic_id 가 아니면 404."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
sess = await session.get(StudySession, session_id)
if sess is None or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
if sess.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="해당 주제에 연결된 세션이 아닙니다")
sess.study_topic_id = None
sess.updated_at = datetime.now(timezone.utc)
await session.commit()
# ─── PR-9: 분야 설명 (study_topic_subject_notes) ───
SUBJECT_NOTE_TIMEOUT_S = 30.0
_SUBJECT_NOTE_PROMPT_PATH = "study_subject_note.txt"
_subject_note_prompt_cache: str | None = None
def _load_subject_note_prompt() -> str:
global _subject_note_prompt_cache
if _subject_note_prompt_cache is None:
prompts_dir = _Path(__file__).resolve().parent.parent / "prompts"
_subject_note_prompt_cache = (prompts_dir / _SUBJECT_NOTE_PROMPT_PATH).read_text(encoding="utf-8")
return _subject_note_prompt_cache
def _render_subject_note_prompt(subject: str, scope: str, doc_block: str, q_block: str) -> str:
template = _load_subject_note_prompt()
return (
template
.replace("{subject}", subject)
.replace("{scope}", scope or "(미지정)")
.replace("{documents_evidence_block}", doc_block)
.replace("{questions_evidence_block}", q_block)
)
class SubjectNoteRequest(BaseModel):
subject: str = Field(min_length=1, max_length=120)
scope: str = Field(default="", max_length=200)
regenerate: bool = False
class SubjectNoteEvidence(BaseModel):
source_type: str
source_id: int
title: str
snippet: str
class SubjectNoteResponse(BaseModel):
subject: str
scope: str
content: str | None
status: str # ready | failed | none | stale | pending
generated_at: datetime | None
model: str | None
evidence: list[SubjectNoteEvidence] = []
from_cache: bool = False
can_regenerate: bool = True
def _note_cache_response(note: StudyTopicSubjectNote) -> SubjectNoteResponse:
return SubjectNoteResponse(
subject=note.subject,
scope=note.scope,
content=note.content,
status=note.status,
generated_at=note.generated_at,
model=note.model,
evidence=[],
from_cache=True,
can_regenerate=True,
)
@router.get("/{topic_id}/subject-notes", response_model=SubjectNoteResponse)
async def get_subject_note(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str = Query(..., min_length=1, max_length=120),
scope: str = Query("", max_length=200),
):
"""캐시 조회. 없으면 status='none' + content=null 응답."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
note = (
await session.execute(
select(StudyTopicSubjectNote).where(
StudyTopicSubjectNote.user_id == user.id,
StudyTopicSubjectNote.study_topic_id == topic_id,
StudyTopicSubjectNote.subject == subject,
StudyTopicSubjectNote.scope == scope,
)
)
).scalar_one_or_none()
if note is None:
return SubjectNoteResponse(
subject=subject, scope=scope, content=None, status="none",
generated_at=None, model=None, evidence=[], from_cache=True, can_regenerate=True,
)
return _note_cache_response(note)
@router.post("/{topic_id}/subject-notes/generate", response_model=SubjectNoteResponse)
async def generate_subject_note(
topic_id: int,
body: SubjectNoteRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""분야 설명 AI 생성 + 캐시. PR-3 race-safe pending 패턴 동일.
regenerate=false + status=ready → 캐시 반환.
pending → 409.
그 외 → 새 생성. 실패 시 status='failed', 직전 본문 보존.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
# upsert: 기존 행 있으면 사용, 없으면 신규
note = (
await session.execute(
select(StudyTopicSubjectNote).where(
StudyTopicSubjectNote.user_id == user.id,
StudyTopicSubjectNote.study_topic_id == topic_id,
StudyTopicSubjectNote.subject == body.subject,
StudyTopicSubjectNote.scope == body.scope,
)
)
).scalar_one_or_none()
if note is None:
note = StudyTopicSubjectNote(
user_id=user.id,
study_topic_id=topic_id,
subject=body.subject,
scope=body.scope,
status="none",
)
session.add(note)
await session.flush()
await session.commit()
else:
# 캐시 단축
if not body.regenerate:
if note.status == "ready":
return _note_cache_response(note)
if note.status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# none/failed/stale → 새로 생성
else:
if note.status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# race-safe pending 전이
lock = await session.execute(
update(StudyTopicSubjectNote)
.where(
StudyTopicSubjectNote.id == note.id,
StudyTopicSubjectNote.status != "pending",
)
.values(status="pending", updated_at=datetime.now(timezone.utc))
.returning(StudyTopicSubjectNote.id)
)
if lock.scalar_one_or_none() is None:
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
await session.commit()
# RAG
try:
ctx = await gather_subject_note_context(session, user.id, topic_id, body.subject, body.scope)
except Exception as e:
logger.warning("subject_note_rag_failed: %s: %s", type(e).__name__, e)
ctx = SubjectNoteContext(documents=[], questions=[])
doc_block = render_evidence_block(ctx.documents)
q_block = render_evidence_block(ctx.questions)
prompt = _render_subject_note_prompt(body.subject, body.scope, doc_block, q_block)
ai_client = AIClient()
raw_text: str | None = None
try:
async with get_mlx_gate():
async with asyncio.timeout(SUBJECT_NOTE_TIMEOUT_S):
raw_text = await ai_client.call_primary(prompt)
except asyncio.TimeoutError:
logger.warning("subject_note_mlx_timeout topic=%s subject=%s", topic_id, body.subject)
except Exception:
logger.exception("subject_note_mlx_failed topic=%s subject=%s", topic_id, body.subject)
finally:
await ai_client.close()
note = await session.get(StudyTopicSubjectNote, note.id)
if not raw_text or not raw_text.strip():
note.status = "failed"
note.updated_at = datetime.now(timezone.utc)
await session.commit()
return SubjectNoteResponse(
subject=note.subject, scope=note.scope, content=note.content,
status="failed", generated_at=note.generated_at, model=note.model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False, can_regenerate=True,
)
cleaned = strip_thinking(raw_text).strip()
note.content = cleaned
note.status = "ready"
note.generated_at = datetime.now(timezone.utc)
primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary"
note.model = f"mlx:{primary_name}"
note.updated_at = note.generated_at
await session.commit()
return SubjectNoteResponse(
subject=note.subject, scope=note.scope, content=note.content,
status="ready", generated_at=note.generated_at, model=note.model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False, can_regenerate=True,
)
# ─── PR-10: 문제풀이 세션 (quiz_session) lifecycle ───
#
# 한 토픽당 in_progress 1개. 출제 시 session 행 생성 + question_ids 스냅샷.
# 풀이 중 attempt 마다 cursor + count 증가 (study_questions.submit_attempt 가 같은 트랜잭션에서).
# 마지막 문제 풀이 후 status='done' + finished_at 박힘.
class QuizMode(str, Enum):
"""PR-12-B: 출제 모드. 1차는 random 만. PR-12-C 에서 frequent_focus / wrong_variants 추가 예약."""
random = "random"
class QuizSessionStartRequest(BaseModel):
target_per_subject: int = Field(default=20, ge=1, le=100)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
wrong_only: bool = False
abandon_existing: bool = False # true 면 기존 in_progress 세션을 abandoned 로 마감 후 새로
quiz_mode: QuizMode = QuizMode.random
class QuizSessionSummary(BaseModel):
"""진행 카드 + 결과 카드 공통 (가벼운 메타). 문제 본문은 상세 endpoint 에서."""
id: int
status: str
cursor: int
total: int
correct_count: int
wrong_count: int
unsure_count: int
target_per_subject: int
subject_filter: str | None
wrong_only: bool
quiz_mode: str # PR-12-B: 세션 메타. 향후 모드별 정답률 통계용.
subject_distribution: dict
created_at: datetime
updated_at: datetime
finished_at: datetime | None
# 결과 카드 헤더 "미확인 N건" 용 — done 세션에서만 의미.
unreviewed_wrong_unsure_count: int = 0
class QuizSessionListResponse(BaseModel):
active: QuizSessionSummary | None
recent_done: list[QuizSessionSummary]
async def _build_session_summary(
s: StudyQuizSession,
session: AsyncSession,
) -> QuizSessionSummary:
"""status='done' 일 때 미확인 카운트(reviewed_at NULL + outcome IN wrong/unsure) 계산."""
unreviewed = 0
if s.status == "done":
row = (
await session.execute(
select(func.count())
.select_from(StudyQuestionAttempt)
.where(
StudyQuestionAttempt.quiz_session_id == s.id,
StudyQuestionAttempt.outcome.in_(("wrong", "unsure")),
StudyQuestionAttempt.reviewed_at.is_(None),
)
)
).scalar() or 0
unreviewed = int(row)
return QuizSessionSummary(
id=s.id,
status=s.status,
cursor=s.cursor,
total=len(s.question_ids or []),
correct_count=s.correct_count,
wrong_count=s.wrong_count,
unsure_count=s.unsure_count,
target_per_subject=s.target_per_subject,
subject_filter=s.subject_filter,
wrong_only=s.wrong_only,
quiz_mode=s.quiz_mode,
subject_distribution=s.subject_distribution or {},
created_at=s.created_at,
updated_at=s.updated_at,
finished_at=s.finished_at,
unreviewed_wrong_unsure_count=unreviewed,
)
async def _select_questions_for_topic(
session: AsyncSession,
user: User,
topic_id: int,
*,
subject: str | None,
scope: str | None,
target_per_subject: int,
wrong_only: bool,
apply_spacing: bool = False,
) -> tuple[list[int], dict[str, int]]:
"""과목별 균등 추출 + (옵션) PR-12-B type spacing.
apply_spacing=True 면 subject bucket 별로 buffer × SPACING_BUFFER_RATIO 만큼
뽑은 뒤 apply_type_spacing 으로 같은 유형 과밀 방지. 회차 무관.
"""
from services.study.related_types import (
SPACING_BUFFER_RATIO,
apply_type_spacing,
make_spacing_candidates_from_rows,
)
base_filter = and_(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
StudyQuestion.is_active.is_(True),
)
wrong_qids: set[int] | None = None
if wrong_only:
latest = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_topic_id == topic_id,
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
wrong_qids = {r.study_question_id for r in latest if r.is_correct is False}
if not wrong_qids:
return [], {}
if subject is not None:
subjects: list[str | None] = [subject]
else:
subj_query = (
select(StudyQuestion.subject)
.where(base_filter)
.distinct()
)
if scope is not None:
subj_query = subj_query.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids))
subjects = [r[0] for r in (await session.execute(subj_query)).all()]
if not subjects:
return [], {}
selected_ids: list[int] = []
distribution: dict[str, int] = {}
for subj in subjects:
if apply_spacing:
# spacing buffer × ratio. subject bucket 안에서만 cosine 비교.
buffer_limit = max(target_per_subject, int(target_per_subject * SPACING_BUFFER_RATIO))
sub_q = select(
StudyQuestion.id,
StudyQuestion.embedding,
StudyQuestion.embedding_status,
).where(base_filter)
else:
sub_q = select(StudyQuestion.id).where(base_filter)
buffer_limit = target_per_subject
if subj is None:
sub_q = sub_q.where(StudyQuestion.subject.is_(None))
else:
sub_q = sub_q.where(StudyQuestion.subject == subj)
if scope is not None:
sub_q = sub_q.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids))
sub_q = sub_q.order_by(func.random()).limit(buffer_limit)
rows = (await session.execute(sub_q)).all()
if apply_spacing:
# rows 가 _SpacingCandidate 의 row protocol 만족 (id/embedding/embedding_status).
spacing_cands = make_spacing_candidates_from_rows(rows)
spacing_res = await apply_type_spacing(
spacing_cands,
target=target_per_subject,
subject_label=subj or "",
)
picked = spacing_res.selected_ids
else:
picked = [r[0] for r in rows[:target_per_subject]]
if picked:
selected_ids.extend(picked)
distribution[subj or ""] = len(picked)
_random.shuffle(selected_ids)
return selected_ids, distribution
@router.get("/{topic_id}/quiz-sessions", response_model=QuizSessionListResponse)
async def list_quiz_sessions(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
limit: int = Query(20, ge=1, le=100),
):
"""진행 중 + 최근 완료 N건. 통합뷰 진행/결과 카드용."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
active_row = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "in_progress",
)
)
).scalar_one_or_none()
active_summary = await _build_session_summary(active_row, session) if active_row else None
done_rows = (
await session.execute(
select(StudyQuizSession)
.where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "done",
)
.order_by(StudyQuizSession.finished_at.desc().nulls_last())
.limit(limit)
)
).scalars().all()
done = [await _build_session_summary(s, session) for s in done_rows]
return QuizSessionListResponse(active=active_summary, recent_done=done)
@router.post("/{topic_id}/quiz-sessions", response_model=QuizSessionSummary, status_code=201)
async def start_quiz_session(
topic_id: int,
body: QuizSessionStartRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""새 세션 시작. 기존 in_progress 가 있으면:
- body.abandon_existing=false → 기존 세션 그대로 반환 (이어풀기).
- body.abandon_existing=true → 기존을 abandoned 로 마감 후 새로 출제.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
existing = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "in_progress",
)
)
).scalar_one_or_none()
if existing is not None:
if not body.abandon_existing:
# 이어풀기 — 기존 그대로 반환.
return await _build_session_summary(existing, session)
existing.status = "abandoned"
existing.updated_at = datetime.now(timezone.utc)
await session.flush()
# 신규 출제. PR-12-B: random 모드 → subject bucket 단위 type spacing 적용.
apply_spacing = body.quiz_mode == QuizMode.random
qids, distribution = await _select_questions_for_topic(
session,
user,
topic_id,
subject=body.subject,
scope=body.scope,
target_per_subject=body.target_per_subject,
wrong_only=body.wrong_only,
apply_spacing=apply_spacing,
)
if not qids:
raise HTTPException(status_code=400, detail="출제 가능한 문제가 없습니다")
new_session = StudyQuizSession(
user_id=user.id,
study_topic_id=topic_id,
target_per_subject=body.target_per_subject,
subject_filter=body.subject,
wrong_only=body.wrong_only,
quiz_mode=body.quiz_mode.value,
question_ids=qids,
subject_distribution=distribution,
status="in_progress",
cursor=0,
)
session.add(new_session)
try:
await session.commit()
except IntegrityError:
# partial unique idx 충돌 — 동시에 다른 호출이 in_progress 만든 경우.
await session.rollback()
existing = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "in_progress",
)
)
).scalar_one_or_none()
if existing is None:
raise HTTPException(status_code=409, detail="quiz_session 충돌 — 다시 시도하세요")
return await _build_session_summary(existing, session)
return await _build_session_summary(new_session, session)
class QuizSessionAttemptItem(BaseModel):
"""결과 카드 expand 시 카드별 메타 (attempt + 학습완료 상태)."""
attempt_id: int
question_id: int
selected_choice: int | None
correct_choice: int
outcome: str
answered_at: datetime
reviewed_at: datetime | None
class QuizSessionDetailResponse(BaseModel):
summary: QuizSessionSummary
questions: list[dict] # ReviewQuestionItem 호환 shape
attempts: list[QuizSessionAttemptItem] # 풀이된 것만 (cursor 까지)
def _attempt_stats_dict_default() -> dict:
return {"attempt_count": 0, "correct_count": 0, "wrong_count": 0}
@router.get("/{topic_id}/quiz-sessions/{session_id}", response_model=QuizSessionDetailResponse)
async def get_quiz_session(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""풀이 화면 / 결과 화면 공용. questions 는 출제 순서. attempts 는 cursor 까지 풀이된 것만."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
qs = await session.get(StudyQuizSession, session_id)
if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
qids = list(qs.question_ids or [])
questions_payload: list[dict] = []
if qids:
rows = (
await session.execute(
select(StudyQuestion).where(StudyQuestion.id.in_(qids))
)
).scalars().all()
by_id = {q.id: q for q in rows}
# 이미지 batch — 기존 study_questions._images_for_questions_batch 재사용 (sort_order 기준).
from api.study_questions import _images_for_questions_batch
images_typed = await _images_for_questions_batch(session, qids)
images_map: dict[int, list[dict]] = {
qid: [it.model_dump() for it in items] for qid, items in images_typed.items()
}
# attempt count batch (per question)
stat_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
stats_map: dict[int, dict] = {}
for r in stat_rows:
t = int(r.total)
c = int(r.correct)
stats_map[r.study_question_id] = {
"attempt_count": t, "correct_count": c, "wrong_count": t - c
}
# 출제 순서 그대로 nest
for qid in qids:
q = by_id.get(qid)
if q is None:
continue
questions_payload.append({
"id": q.id,
"question_text": q.question_text,
"choices": [
{"number": 1, "text": q.choice_1},
{"number": 2, "text": q.choice_2},
{"number": 3, "text": q.choice_3},
{"number": 4, "text": q.choice_4},
],
"subject": q.subject,
"scope": q.scope,
"exam_round": q.exam_round,
"explanation": q.explanation,
"stats": stats_map.get(q.id, _attempt_stats_dict_default()),
"images": images_map.get(q.id, []),
# done 세션 결과 화면용 — 정답 노출. in_progress 세션은 클라이언트가 cursor 까지만 표시.
"correct_choice": q.correct_choice if qs.status == "done" else None,
})
# 이 세션의 attempts (cursor 까지). 같은 question 에 여러 attempts 있을 수 있지만,
# 이 세션 내에서는 정상 흐름상 question 당 1개. quiz_session_id 기준으로 가져옴.
attempt_rows = (
await session.execute(
select(StudyQuestionAttempt)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.quiz_session_id == qs.id,
)
.order_by(StudyQuestionAttempt.answered_at.asc())
)
).scalars().all()
attempts_payload = [
QuizSessionAttemptItem(
attempt_id=a.id,
question_id=a.study_question_id,
selected_choice=a.selected_choice,
correct_choice=a.correct_choice,
outcome=a.outcome,
answered_at=a.answered_at,
reviewed_at=a.reviewed_at,
)
for a in attempt_rows
]
summary = await _build_session_summary(qs, session)
return QuizSessionDetailResponse(
summary=summary,
questions=questions_payload,
attempts=attempts_payload,
)
class QuizSessionPatchRequest(BaseModel):
"""abandon → status='abandoned'. 다른 필드 패치는 현재 안 받음."""
action: str = Field(..., pattern="^(abandon)$")
@router.patch("/{topic_id}/quiz-sessions/{session_id}", response_model=QuizSessionSummary)
async def patch_quiz_session(
topic_id: int,
session_id: int,
body: QuizSessionPatchRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""현재 abandon 만 지원 — 진행 중 세션 포기 (다시 시작 시 사용)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
qs = await session.get(StudyQuizSession, session_id)
if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
if body.action == "abandon":
if qs.status != "in_progress":
raise HTTPException(status_code=409, detail=f"이미 {qs.status} 상태입니다")
qs.status = "abandoned"
qs.updated_at = datetime.now(timezone.utc)
await session.commit()
return await _build_session_summary(qs, session)
# ─── PR-12-A: 반복 출제 / 유사 유형 배치 카운트 ───
class RelatedTypesBulkRequest(BaseModel):
question_ids: list[int] = Field(..., min_length=1, max_length=200)
class RelatedTypesBulkItem(BaseModel):
repeat_related_count: int
repeat_round_count: int
repeat_grade: str | None = None
similar_related_count: int
similar_round_count: int
class RelatedTypesBulkResponse(BaseModel):
# 입력 question_ids 전체 보존 — 권한 없음/임베딩 미준비/회차 미지정도 (0,0,0,0).
items: dict[int, RelatedTypesBulkItem]
@router.post("/{topic_id}/related-types-bulk", response_model=RelatedTypesBulkResponse)
async def related_types_bulk(
topic_id: int,
body: RelatedTypesBulkRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카드별 배지(round_count) 표시용 배치 카운트.
비교 대상 = 같은 topic 안 모든 ready 문제 (입력 qid 끼리만 비교 X).
응답 dict 는 입력 qid 전체 보존 — 누락 X.
"""
from services.study.related_types import classify_related_bulk
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
counts = await classify_related_bulk(
session,
user_id=user.id,
study_topic_id=topic_id,
question_ids=body.question_ids,
)
return RelatedTypesBulkResponse(
items={
qid: RelatedTypesBulkItem(
repeat_related_count=c.repeat_related_count,
repeat_round_count=c.repeat_round_count,
repeat_grade=c.repeat_grade,
similar_related_count=c.similar_related_count,
similar_round_count=c.similar_round_count,
)
for qid, c in counts.items()
}
)