"""학습 워크스페이스(study_topic) API — 필기 세션 + 자료(library document) 묶음 컨테이너 핵심 개념: - study_topic 은 단순 폴더/태그가 아니라 학습 자산 1차 컨테이너 (학습 워크스페이스). - 향후 단어장/오디오/문제 세트 같은 자산이 같은 컨테이너 아래로 들어올 수 있도록 응답 구조를 sections + stats 형태로 설계 (현재는 sessions / documents 두 키만). - documents.category(자료실 UI 축) 와 직교한 별도 분류 축. 자료실 facet/카테고리는 미터치. - StudySession.certification/subject/topic 은 보존 — 본 컨테이너 와 직교한 세부 메타. 설계 제약: - study_type 은 강한 enum 미사용. VALID_STUDY_TYPES 는 단순 권장값 (DB/Pydantic 강제 안 함). - soft delete (deleted_at). active 행끼리만 (user_id, name) partial unique. - 권한: 모든 쿼리에 user_id 강제. 다른 사용자 자료를 묶지 못함. - 문서 매핑은 N:M (study_topic_documents). 세션 매핑은 1:N (study_sessions.study_topic_id). - polymorphic 단일 study_topic_items 테이블은 만들지 않는다 (영구 금지). """ import asyncio import logging import random as _random from datetime import datetime, timezone from enum import Enum from pathlib import Path as _Path from typing import Annotated, Any from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field from sqlalchemy import and_, case, delete, func, or_ as sql_or, select, text as sql_text, update from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient, strip_thinking from eid.ai import EidAIClient from eid.compose import compose from core.auth import get_current_user from core.database import get_session from core.library import LIBRARY_PREFIX, normalize_library_path from models.document import Document from models.study_session import StudySession from models.study_topic import StudyTopic, StudyTopicDocument from models.study_question import StudyQuestion, StudyQuestionAttempt from models.study_question_image import StudyQuestionImage from models.study_quiz_session import StudyQuizSession from models.study_topic_subject_note import StudyTopicSubjectNote from models.eid_study_weakness import EidStudyWeakness from models.eid_review_set_draft import EidReviewSetDraft from models.user import User from services.search.llm_gate import Priority, acquire_mlx_gate from services.study.subject_note_rag import ( SubjectNoteContext, gather_subject_note_context, render_evidence_block, ) from services.study.weakness_compute import format_habit_block, format_weakness_block logger = logging.getLogger(__name__) router = APIRouter() # ─── 권장값 (강제 enum 아님, UI 안내용) ─── RECOMMENDED_STUDY_TYPES: set[str] = { "certification", "language", "school", "work", "general", } # ─── Pydantic 스키마 ─── class StudyTopicCreate(BaseModel): name: str = Field(min_length=1, max_length=120) description: str | None = None color: str | None = Field(default=None, max_length=20) study_type: str | None = Field(default=None, max_length=40) sort_order: int = 0 # PR-6: 시험 메타 exam_round_size: int | None = Field(default=None, ge=1, le=300) exam_subjects: list[str] = [] class StudyTopicUpdate(BaseModel): name: str | None = Field(default=None, min_length=1, max_length=120) description: str | None = None color: str | None = Field(default=None, max_length=20) study_type: str | None = Field(default=None, max_length=40) sort_order: int | None = None # PR-6: 시험 메타 exam_round_size: int | None = Field(default=None, ge=1, le=300) exam_subjects: list[str] | None = None # 공부 암기노트: 공부중 토글 (true=focused_at=now, false=clear) focused: bool | None = None class StudyTopicResponse(BaseModel): """주제 목록 응답 — 집계 카운트 포함.""" id: int name: str description: str | None color: str | None study_type: str | None sort_order: int session_count: int = 0 document_count: int = 0 question_count: int = 0 # PR-6: 시험 메타 exam_round_size: int | None = None exam_subjects: list[str] = [] # 공부 암기노트: 공부중 태그 상태 focused: bool = False created_at: datetime updated_at: datetime class StudyTopicListResponse(BaseModel): items: list[StudyTopicResponse] total: int class StudyTopicSessionSummary(BaseModel): """상세 뷰의 세션 카드 페이로드 — 통합 뷰 렌더용 최소 필드.""" id: int study_type: str certification: str | None language_code: str | None learning_level: str | None subject: str | None topic: str | None mode: str repetition_count: int review_state: str | None created_at: datetime updated_at: datetime class StudyTopicDocumentSummary(BaseModel): """상세 뷰의 자료 카드 페이로드. library_paths: 통합뷰 자료 섹션의 카테고리 트리 그룹핑용. user_tags 의 `@library/` 태그에서 prefix 제거한 path 들. 한 자료가 여러 카테고리에 속할 수 있으나(다중 분류), 트리 그룹핑 시 첫 번째 path 만 사용. 빈 리스트면 "분류 없음" 그룹. """ id: int title: str | None file_format: str file_type: str category: str | None ai_domain: str | None importance: str | None sort_order: int linked_at: datetime library_paths: list[str] = [] class StudyTopicQuestionSummary(BaseModel): """상세 뷰의 문제 카드 페이로드 — 정답·해설 비공개, 본문 80자 truncate.""" id: int question_text: str subject: str | None scope: str | None exam_name: str | None exam_round: str | None exam_question_number: int | None # PR-11: 회차별 그룹 안 정렬 키 is_active: bool attempt_count: int last_correct: bool | None created_at: datetime class StudyTopicSections(BaseModel): """확장 친화 dict — 향후 audio_assets / vocab_decks 키 추가 가능.""" sessions: list[StudyTopicSessionSummary] documents: list[StudyTopicDocumentSummary] questions: list[StudyTopicQuestionSummary] = [] class StudyTopicStats(BaseModel): """자산 카운트 — 0 으로 미리 노출해서 후속 PR 에서 필드만 채움. question_count = PR-2 단일 문제 수. question_set_count = 후속 PR 의 회차/모의고사 묶음 수 (둘은 의미 다름). """ session_count: int document_count: int question_count: int = 0 audio_count: int = 0 vocab_count: int = 0 question_set_count: int = 0 class StudyTopicMeta(BaseModel): id: int name: str description: str | None color: str | None study_type: str | None sort_order: int # PR-6: 시험 메타 exam_round_size: int | None = None exam_subjects: list[str] = [] # 공부 암기노트: 공부중 태그 상태 focused: bool = False created_at: datetime updated_at: datetime class StudyTopicDetailResponse(BaseModel): topic: StudyTopicMeta sections: StudyTopicSections stats: StudyTopicStats class StudyTopicDocumentLinkRequest(BaseModel): document_ids: list[int] = Field(min_length=1, max_length=100) class StudyTopicDocumentLinkResponse(BaseModel): linked: list[int] skipped_existing: list[int] skipped_not_found: list[int] class StudyTopicDocumentByPathRequest(BaseModel): """카테고리 트리 노드 일괄 추가용. path prefix 매칭으로 하위 자료 모두 매핑. include_subtree=True 면 path 와 그 하위 카테고리 전체, False 면 path 정확 일치만. """ path: str = Field(min_length=1, max_length=500) include_subtree: bool = True class StudyTopicDocumentByPathResponse(BaseModel): linked_count: int skipped_existing_count: int total_in_path: int path: str # ─── Helpers ─── def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic: """소유자 검증 + soft-deleted 행 차단. mismatch 도 404 (정보 누설 방지).""" if topic is None or topic.user_id != user.id or topic.deleted_at is not None: raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다") return topic def _meta_from_topic(t: StudyTopic) -> StudyTopicMeta: return StudyTopicMeta( id=t.id, name=t.name, description=t.description, color=t.color, study_type=t.study_type, sort_order=t.sort_order, exam_round_size=t.exam_round_size, exam_subjects=t.exam_subjects or [], created_at=t.created_at, updated_at=t.updated_at, ) # ─── 엔드포인트 ─── @router.get("/", response_model=StudyTopicListResponse) async def list_study_topics( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], limit: int = Query(100, ge=1, le=500), offset: int = Query(0, ge=0), ): """사용자의 active 주제 목록. 세션 수·자료 수 집계 포함.""" # active 주제 + 세션 수 (LEFT JOIN study_sessions) sess_count_sub = ( select( StudySession.study_topic_id.label("topic_id"), func.count().label("c"), ) .where(StudySession.user_id == user.id) .where(StudySession.study_topic_id.is_not(None)) .group_by(StudySession.study_topic_id) .subquery() ) doc_count_sub = ( select( StudyTopicDocument.study_topic_id.label("topic_id"), func.count().label("c"), ) .where(StudyTopicDocument.user_id == user.id) .group_by(StudyTopicDocument.study_topic_id) .subquery() ) # PR-2 — 활성 문제 카운트 (soft-deleted 제외) from models.study_question import StudyQuestion as _SQ q_count_sub = ( select( _SQ.study_topic_id.label("topic_id"), func.count().label("c"), ) .where(_SQ.user_id == user.id, _SQ.deleted_at.is_(None)) .group_by(_SQ.study_topic_id) .subquery() ) base = ( select( StudyTopic, func.coalesce(sess_count_sub.c.c, 0).label("session_count"), func.coalesce(doc_count_sub.c.c, 0).label("document_count"), func.coalesce(q_count_sub.c.c, 0).label("question_count"), ) .outerjoin(sess_count_sub, sess_count_sub.c.topic_id == StudyTopic.id) .outerjoin(doc_count_sub, doc_count_sub.c.topic_id == StudyTopic.id) .outerjoin(q_count_sub, q_count_sub.c.topic_id == StudyTopic.id) .where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None)) .order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc()) ) total_query = select(func.count()).select_from( select(StudyTopic.id) .where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None)) .subquery() ) total = (await session.execute(total_query)).scalar() or 0 rows = (await session.execute(base.offset(offset).limit(limit))).all() items = [ StudyTopicResponse( id=t.id, name=t.name, description=t.description, color=t.color, study_type=t.study_type, sort_order=t.sort_order, session_count=int(sc), document_count=int(dc), question_count=int(qc), exam_round_size=t.exam_round_size, exam_subjects=t.exam_subjects or [], created_at=t.created_at, updated_at=t.updated_at, ) for t, sc, dc, qc in rows ] return StudyTopicListResponse(items=items, total=total) class ExamRoundProgress(BaseModel): exam_round: str question_count: int max_question_number: int # 0 = exam_question_number 가 모두 NULL next_question_number: int | None # 도달 시 None is_complete: bool class ExamRoundsResponse(BaseModel): exam_round_size: int | None # 토픽 메타. NULL = 미설정 items: list[ExamRoundProgress] @router.get("/{topic_id}/exam-rounds", response_model=ExamRoundsResponse) async def list_exam_rounds( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """토픽 안 회차별 진행률 집계. 진행률 표시 + 재개 UX 용. is_complete = (exam_round_size IS NOT NULL AND question_count >= exam_round_size). next_question_number = max + 1 (도달 시 NULL). """ topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) from models.study_question import StudyQuestion as _SQ rows = ( await session.execute( select( _SQ.exam_round, func.count().label("question_count"), func.coalesce(func.max(_SQ.exam_question_number), 0).label("max_qnum"), ) .where( _SQ.user_id == user.id, _SQ.study_topic_id == topic_id, _SQ.deleted_at.is_(None), _SQ.exam_round.is_not(None), ) .group_by(_SQ.exam_round) .order_by(_SQ.exam_round.desc()) ) ).all() size = topic.exam_round_size items: list[ExamRoundProgress] = [] for r in rows: cnt = int(r.question_count) mx = int(r.max_qnum) is_complete = bool(size is not None and cnt >= size) # 도달했으면 next None, 아니면 max+1 (max=0 이면 1번부터) next_n = None if is_complete else (mx + 1 if mx > 0 else 1) items.append(ExamRoundProgress( exam_round=r.exam_round, question_count=cnt, max_question_number=mx, next_question_number=next_n, is_complete=is_complete, )) return ExamRoundsResponse(exam_round_size=size, items=items) @router.get("/by-document/{document_id}", response_model=list[StudyTopicMeta]) async def list_topics_for_document( document_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """이 자료가 매핑된 active 주제 목록. /study/sources 카드의 "이 자료가 속한 주제" 배지·컨텍스트 메뉴용. """ rows = ( await session.execute( select(StudyTopic) .join(StudyTopicDocument, StudyTopicDocument.study_topic_id == StudyTopic.id) .where( StudyTopicDocument.document_id == document_id, StudyTopicDocument.user_id == user.id, StudyTopic.deleted_at.is_(None), ) .order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc()) ) ).scalars().all() return [_meta_from_topic(t) for t in rows] @router.post("/", response_model=StudyTopicResponse, status_code=201) async def create_study_topic( body: StudyTopicCreate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """주제 생성. 같은 이름의 active 주제가 이미 있으면 409. soft-deleted 동명 주제는 partial unique index 에서 빠지므로 신규 생성 가능. """ topic = StudyTopic( user_id=user.id, name=body.name.strip(), description=body.description, color=body.color, study_type=body.study_type, sort_order=body.sort_order, exam_round_size=body.exam_round_size, exam_subjects=body.exam_subjects or [], ) session.add(topic) try: await session.flush() await session.commit() except IntegrityError: await session.rollback() raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다") return StudyTopicResponse( id=topic.id, name=topic.name, description=topic.description, color=topic.color, study_type=topic.study_type, sort_order=topic.sort_order, session_count=0, document_count=0, question_count=0, exam_round_size=topic.exam_round_size, exam_subjects=topic.exam_subjects or [], created_at=topic.created_at, updated_at=topic.updated_at, ) @router.get("/{topic_id}", response_model=StudyTopicDetailResponse) async def get_study_topic( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """통합 뷰: 주제 메타 + 세션 목록 + 자료 목록 + stats. 응답 형태는 후속 PR(오디오/단어장/문제세트 추가) 에서 sections / stats 키만 늘리면 되도록 dict 구조. """ topic = await session.get(StudyTopic, topic_id) topic = _verify_topic_ownership(topic, user) # 세션 목록 — 최근순 sess_rows = ( await session.execute( select(StudySession) .where( StudySession.user_id == user.id, StudySession.study_topic_id == topic_id, ) .order_by(StudySession.created_at.desc(), StudySession.id.desc()) ) ).scalars().all() sessions_payload = [ StudyTopicSessionSummary( id=s.id, study_type=s.study_type, certification=s.certification, language_code=s.language_code, learning_level=s.learning_level, subject=s.subject, topic=s.topic, mode=s.mode, repetition_count=s.repetition_count, review_state=s.review_state, created_at=s.created_at, updated_at=s.updated_at, ) for s in sess_rows ] # 자료 목록 — 매핑 sort_order → created_at desc doc_rows = ( await session.execute( select(Document, StudyTopicDocument) .join( StudyTopicDocument, and_( StudyTopicDocument.document_id == Document.id, StudyTopicDocument.study_topic_id == topic_id, StudyTopicDocument.user_id == user.id, ), ) .where(Document.deleted_at.is_(None)) .order_by( StudyTopicDocument.sort_order.asc(), StudyTopicDocument.created_at.desc(), ) ) ).all() def _extract_library_paths(tags: list | None) -> list[str]: if not tags: return [] out: list[str] = [] for t in tags: if isinstance(t, str) and t.startswith(LIBRARY_PREFIX): out.append(t[len(LIBRARY_PREFIX):]) return out documents_payload = [ StudyTopicDocumentSummary( id=d.id, title=d.title, file_format=d.file_format, file_type=str(d.file_type) if d.file_type is not None else "immutable", category=str(d.category) if d.category is not None else None, ai_domain=d.ai_domain, importance=d.importance, sort_order=link.sort_order, linked_at=link.created_at, library_paths=_extract_library_paths(d.user_tags), ) for d, link in doc_rows ] # 문제 목록 (PR-2) — 본문 truncate, 정답·해설 비공개. attempt 통계 batch 로 끌어옴. from models.study_question import StudyQuestion, StudyQuestionAttempt q_rows = ( await session.execute( select(StudyQuestion) .where( StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.deleted_at.is_(None), ) .order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc()) ) ).scalars().all() qids = [q.id for q in q_rows] q_attempt_count: dict[int, int] = {} q_last_correct: dict[int, bool] = {} if qids: from sqlalchemy import case as _case cnt_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, func.count().label("total"), ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .group_by(StudyQuestionAttempt.study_question_id) ) ).all() for r in cnt_rows: q_attempt_count[r.study_question_id] = int(r.total) latest_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.is_correct, ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .order_by( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.answered_at.desc(), ) .distinct(StudyQuestionAttempt.study_question_id) ) ).all() for r in latest_rows: q_last_correct[r.study_question_id] = bool(r.is_correct) def _truncate_q(text: str, n: int = 80) -> str: return text if len(text) <= n else text[:n].rstrip() + "…" questions_payload = [ StudyTopicQuestionSummary( id=q.id, question_text=_truncate_q(q.question_text, 80), subject=q.subject, scope=q.scope, exam_name=q.exam_name, exam_round=q.exam_round, exam_question_number=q.exam_question_number, is_active=q.is_active, attempt_count=q_attempt_count.get(q.id, 0), last_correct=q_last_correct.get(q.id), created_at=q.created_at, ) for q in q_rows ] return StudyTopicDetailResponse( topic=_meta_from_topic(topic), sections=StudyTopicSections( sessions=sessions_payload, documents=documents_payload, questions=questions_payload, ), stats=StudyTopicStats( session_count=len(sessions_payload), document_count=len(documents_payload), question_count=len(questions_payload), # 후속 PR 에서 채움 audio_count=0, vocab_count=0, question_set_count=0, ), ) @router.patch("/{topic_id}", response_model=StudyTopicResponse) async def update_study_topic( topic_id: int, body: StudyTopicUpdate, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): topic = await session.get(StudyTopic, topic_id) topic = _verify_topic_ownership(topic, user) fields_set = body.model_fields_set if "name" in fields_set and body.name is not None: topic.name = body.name.strip() for fname in {"description", "color", "study_type", "sort_order"} & fields_set: setattr(topic, fname, getattr(body, fname)) # PR-6: 시험 메타. exam_subjects 가 None 이면 (PATCH 미명시) 기존 유지, 빈 배열은 명시적 클리어 if "exam_round_size" in fields_set: topic.exam_round_size = body.exam_round_size if "exam_subjects" in fields_set and body.exam_subjects is not None: topic.exam_subjects = body.exam_subjects # 공부 암기노트: 공부중 태그 토글 (focused_at IS NOT NULL = reminder/세션 대상) if "focused" in fields_set: topic.focused_at = datetime.now(timezone.utc) if body.focused else None topic.updated_at = datetime.now(timezone.utc) try: await session.commit() except IntegrityError: await session.rollback() raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다") # 응답용 카운트 별도 조회 sc = (await session.execute( select(func.count()) .select_from(StudySession) .where(StudySession.user_id == user.id, StudySession.study_topic_id == topic.id) )).scalar() or 0 dc = (await session.execute( select(func.count()) .select_from(StudyTopicDocument) .where(StudyTopicDocument.study_topic_id == topic.id) )).scalar() or 0 from models.study_question import StudyQuestion as _SQ2 qc = (await session.execute( select(func.count()) .select_from(_SQ2) .where( _SQ2.user_id == user.id, _SQ2.study_topic_id == topic.id, _SQ2.deleted_at.is_(None), ) )).scalar() or 0 return StudyTopicResponse( id=topic.id, name=topic.name, description=topic.description, color=topic.color, study_type=topic.study_type, sort_order=topic.sort_order, session_count=int(sc), document_count=int(dc), question_count=int(qc), exam_round_size=topic.exam_round_size, exam_subjects=topic.exam_subjects or [], focused=topic.focused_at is not None, created_at=topic.created_at, updated_at=topic.updated_at, ) @router.delete("/{topic_id}", status_code=204) async def delete_study_topic( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """주제 soft delete. 세션의 study_topic_id 는 SET NULL, 문서 매핑은 cascade 제거. 같은 이름 재생성을 위해 partial unique index 가 deleted_at IS NULL 인 행만 본다. """ topic = await session.get(StudyTopic, topic_id) topic = _verify_topic_ownership(topic, user) # 세션 FK SET NULL — 명시적으로 (DB ON DELETE SET NULL 은 hard delete 시에만 동작) await session.execute( update(StudySession) .where( StudySession.user_id == user.id, StudySession.study_topic_id == topic_id, ) .values(study_topic_id=None) ) # 문서 매핑 명시 제거 (soft delete 라 FK CASCADE 안 탐) await session.execute( delete(StudyTopicDocument).where( StudyTopicDocument.study_topic_id == topic_id, ) ) topic.deleted_at = datetime.now(timezone.utc) await session.commit() # ─── 자료(문서) 매핑 ─── @router.post( "/{topic_id}/documents", response_model=StudyTopicDocumentLinkResponse, status_code=201, ) async def link_documents_to_topic( topic_id: int, body: StudyTopicDocumentLinkRequest, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """자료 N개를 주제에 일괄 매핑. 이미 연결된 건 skipped_existing 으로 보고.""" topic = await session.get(StudyTopic, topic_id) topic = _verify_topic_ownership(topic, user) requested = list(dict.fromkeys(body.document_ids)) # 중복 제거 + 순서 보존 # 존재 + 미삭제 documents 만 통과 (single-user 시스템: documents.user_id 부재 가능) valid_rows = ( await session.execute( select(Document.id).where( Document.id.in_(requested), Document.deleted_at.is_(None), ) ) ).scalars().all() valid_set = set(valid_rows) not_found = [d for d in requested if d not in valid_set] # 이미 매핑된 것 existing_rows = ( await session.execute( select(StudyTopicDocument.document_id).where( StudyTopicDocument.study_topic_id == topic_id, StudyTopicDocument.document_id.in_(valid_set), ) ) ).scalars().all() existing_set = set(existing_rows) to_link = [d for d in requested if d in valid_set and d not in existing_set] # 기존 max sort_order 다음부터 max_sort = ( await session.execute( select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1)) .where(StudyTopicDocument.study_topic_id == topic_id) ) ).scalar() or -1 for i, doc_id in enumerate(to_link, start=1): session.add(StudyTopicDocument( study_topic_id=topic_id, document_id=doc_id, user_id=user.id, sort_order=int(max_sort) + i, )) try: await session.commit() except IntegrityError: await session.rollback() raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다") return StudyTopicDocumentLinkResponse( linked=to_link, skipped_existing=sorted(existing_set), skipped_not_found=not_found, ) @router.post( "/{topic_id}/documents/by-path", response_model=StudyTopicDocumentByPathResponse, status_code=201, ) async def link_documents_by_library_path( topic_id: int, body: StudyTopicDocumentByPathRequest, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """자료실 카테고리 path 기준으로 하위 자료를 일괄 매핑. /api/documents/library?path=... 와 동일한 user_tags(@library/...) prefix 매칭 사용. 100건 limit 우회 — 한 카테고리에 자료가 많아도 한 번에 처리. """ topic = await session.get(StudyTopic, topic_id) topic = _verify_topic_ownership(topic, user) try: normalized = normalize_library_path(body.path) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) exact_tag = f"{LIBRARY_PREFIX}{normalized}" prefix_tag = f"{LIBRARY_PREFIX}{normalized}/%" # @library/ 정확 일치 또는 (subtree 옵션 시) 그 하위까지 prefix 매칭 if body.include_subtree: tag_filter = sql_text(""" EXISTS ( SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t WHERE t = :exact OR t LIKE :prefix ) """).bindparams(exact=exact_tag, prefix=prefix_tag) else: tag_filter = sql_text(""" EXISTS ( SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t WHERE t = :exact ) """).bindparams(exact=exact_tag) candidate_rows = ( await session.execute( select(Document.id).where( Document.deleted_at.is_(None), Document.category == "library", tag_filter, ) ) ).scalars().all() candidate_set = set(candidate_rows) total_in_path = len(candidate_set) if total_in_path == 0: return StudyTopicDocumentByPathResponse( linked_count=0, skipped_existing_count=0, total_in_path=0, path=normalized, ) existing_rows = ( await session.execute( select(StudyTopicDocument.document_id).where( StudyTopicDocument.study_topic_id == topic_id, StudyTopicDocument.document_id.in_(candidate_set), ) ) ).scalars().all() existing_set = set(existing_rows) to_link = [d for d in candidate_rows if d not in existing_set] max_sort = ( await session.execute( select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1)) .where(StudyTopicDocument.study_topic_id == topic_id) ) ).scalar() or -1 for i, doc_id in enumerate(to_link, start=1): session.add(StudyTopicDocument( study_topic_id=topic_id, document_id=doc_id, user_id=user.id, sort_order=int(max_sort) + i, )) try: await session.commit() except IntegrityError: await session.rollback() raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다") return StudyTopicDocumentByPathResponse( linked_count=len(to_link), skipped_existing_count=len(existing_set), total_in_path=total_in_path, path=normalized, ) @router.delete("/{topic_id}/documents/{document_id}", status_code=204) async def unlink_document_from_topic( topic_id: int, document_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) result = await session.execute( delete(StudyTopicDocument).where( StudyTopicDocument.study_topic_id == topic_id, StudyTopicDocument.document_id == document_id, StudyTopicDocument.user_id == user.id, ) ) await session.commit() if result.rowcount == 0: raise HTTPException(status_code=404, detail="매핑을 찾을 수 없습니다") # ─── 세션 매핑 ─── @router.post("/{topic_id}/sessions/{session_id}", status_code=204) async def attach_session_to_topic( topic_id: int, session_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """기존 세션을 주제에 연결. 다른 주제에 이미 연결돼 있으면 새 주제로 갱신 (1:N).""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) sess = await session.get(StudySession, session_id) if sess is None or sess.user_id != user.id: raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다") sess.study_topic_id = topic_id sess.updated_at = datetime.now(timezone.utc) await session.commit() @router.delete("/{topic_id}/sessions/{session_id}", status_code=204) async def detach_session_from_topic( topic_id: int, session_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """세션 분리. 현재 study_topic_id 가 topic_id 가 아니면 404.""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) sess = await session.get(StudySession, session_id) if sess is None or sess.user_id != user.id: raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다") if sess.study_topic_id != topic_id: raise HTTPException(status_code=404, detail="해당 주제에 연결된 세션이 아닙니다") sess.study_topic_id = None sess.updated_at = datetime.now(timezone.utc) await session.commit() # ─── PR-9: 분야 설명 (study_topic_subject_notes) ─── SUBJECT_NOTE_TIMEOUT_S = 30.0 _SUBJECT_NOTE_PROMPT_PATH = "study_subject_note.txt" _subject_note_prompt_cache: str | None = None def _load_subject_note_prompt() -> str: global _subject_note_prompt_cache if _subject_note_prompt_cache is None: prompts_dir = _Path(__file__).resolve().parent.parent / "prompts" _subject_note_prompt_cache = (prompts_dir / _SUBJECT_NOTE_PROMPT_PATH).read_text(encoding="utf-8") return _subject_note_prompt_cache def _render_subject_note_prompt(subject: str, scope: str, doc_block: str, q_block: str) -> str: template = _load_subject_note_prompt() return ( template .replace("{subject}", subject) .replace("{scope}", scope or "(미지정)") .replace("{documents_evidence_block}", doc_block) .replace("{questions_evidence_block}", q_block) ) class SubjectNoteRequest(BaseModel): subject: str = Field(min_length=1, max_length=120) scope: str = Field(default="", max_length=200) regenerate: bool = False class SubjectNoteEvidence(BaseModel): source_type: str source_id: int title: str snippet: str class SubjectNoteResponse(BaseModel): subject: str scope: str content: str | None status: str # ready | failed | none | stale | pending generated_at: datetime | None model: str | None evidence: list[SubjectNoteEvidence] = [] from_cache: bool = False can_regenerate: bool = True def _note_cache_response(note: StudyTopicSubjectNote) -> SubjectNoteResponse: return SubjectNoteResponse( subject=note.subject, scope=note.scope, content=note.content, status=note.status, generated_at=note.generated_at, model=note.model, evidence=[], from_cache=True, can_regenerate=True, ) @router.get("/{topic_id}/subject-notes", response_model=SubjectNoteResponse) async def get_subject_note( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], subject: str = Query(..., min_length=1, max_length=120), scope: str = Query("", max_length=200), ): """캐시 조회. 없으면 status='none' + content=null 응답.""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) note = ( await session.execute( select(StudyTopicSubjectNote).where( StudyTopicSubjectNote.user_id == user.id, StudyTopicSubjectNote.study_topic_id == topic_id, StudyTopicSubjectNote.subject == subject, StudyTopicSubjectNote.scope == scope, ) ) ).scalar_one_or_none() if note is None: return SubjectNoteResponse( subject=subject, scope=scope, content=None, status="none", generated_at=None, model=None, evidence=[], from_cache=True, can_regenerate=True, ) return _note_cache_response(note) @router.post("/{topic_id}/subject-notes/generate", response_model=SubjectNoteResponse) async def generate_subject_note( topic_id: int, body: SubjectNoteRequest, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """분야 설명 AI 생성 + 캐시. PR-3 race-safe pending 패턴 동일. regenerate=false + status=ready → 캐시 반환. pending → 409. 그 외 → 새 생성. 실패 시 status='failed', 직전 본문 보존. """ topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) # upsert: 기존 행 있으면 사용, 없으면 신규 note = ( await session.execute( select(StudyTopicSubjectNote).where( StudyTopicSubjectNote.user_id == user.id, StudyTopicSubjectNote.study_topic_id == topic_id, StudyTopicSubjectNote.subject == body.subject, StudyTopicSubjectNote.scope == body.scope, ) ) ).scalar_one_or_none() if note is None: note = StudyTopicSubjectNote( user_id=user.id, study_topic_id=topic_id, subject=body.subject, scope=body.scope, status="none", ) session.add(note) await session.flush() await session.commit() else: # 캐시 단축 if not body.regenerate: if note.status == "ready": return _note_cache_response(note) if note.status == "pending": raise HTTPException( status_code=409, detail={"status": "pending", "detail": "이미 생성 중입니다"}, ) # none/failed/stale → 새로 생성 else: if note.status == "pending": raise HTTPException( status_code=409, detail={"status": "pending", "detail": "이미 생성 중입니다"}, ) # race-safe pending 전이 lock = await session.execute( update(StudyTopicSubjectNote) .where( StudyTopicSubjectNote.id == note.id, StudyTopicSubjectNote.status != "pending", ) .values(status="pending", updated_at=datetime.now(timezone.utc)) .returning(StudyTopicSubjectNote.id) ) if lock.scalar_one_or_none() is None: raise HTTPException( status_code=409, detail={"status": "pending", "detail": "이미 생성 중입니다"}, ) await session.commit() # RAG try: ctx = await gather_subject_note_context(session, user.id, topic_id, body.subject, body.scope) except Exception as e: logger.warning("subject_note_rag_failed: %s: %s", type(e).__name__, e) ctx = SubjectNoteContext(documents=[], questions=[]) doc_block = render_evidence_block(ctx.documents) q_block = render_evidence_block(ctx.questions) prompt = _render_subject_note_prompt(body.subject, body.scope, doc_block, q_block) ai_client = EidAIClient() # 이드 표면 = egress 코드층 박탈(call_primary only, W4-1) raw_text: str | None = None try: async with acquire_mlx_gate(Priority.FOREGROUND): async with asyncio.timeout(SUBJECT_NOTE_TIMEOUT_S): # 이드 substrate(persona+rules)=system / 렌더 템플릿(지시+evidence)=user (W2-2) raw_text = await ai_client.call_primary( prompt, system=compose("study_subject_note", task="") ) except asyncio.TimeoutError: logger.warning("subject_note_mlx_timeout topic=%s subject=%s", topic_id, body.subject) except Exception: logger.exception("subject_note_mlx_failed topic=%s subject=%s", topic_id, body.subject) finally: await ai_client.close() note = await session.get(StudyTopicSubjectNote, note.id) if not raw_text or not raw_text.strip(): note.status = "failed" note.updated_at = datetime.now(timezone.utc) await session.commit() return SubjectNoteResponse( subject=note.subject, scope=note.scope, content=note.content, status="failed", generated_at=note.generated_at, model=note.model, evidence=[e.to_dict() for e in ctx.all], from_cache=False, can_regenerate=True, ) cleaned = strip_thinking(raw_text).strip() note.content = cleaned note.status = "ready" note.generated_at = datetime.now(timezone.utc) primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary" note.model = f"mlx:{primary_name}" note.updated_at = note.generated_at await session.commit() return SubjectNoteResponse( subject=note.subject, scope=note.scope, content=note.content, status="ready", generated_at=note.generated_at, model=note.model, evidence=[e.to_dict() for e in ctx.all], from_cache=False, can_regenerate=True, ) # ─── 이드 W3-2: 학습 약점 진단 (study_diagnosis surface) ─── # # 워커(study_weakness)가 산출한 최신 eid_study_weakness 스냅샷을 '학습 진단 코치'(study overlay) # 로 번역. 약점/태도 '판정'은 코드 derived(스냅샷) — LLM 은 스냅샷 블록 값만 인용(환각 약점 차단). # compose("study_diagnosis") = persona+rules+study overlay(+{placeholder}) → 표면이 블록 substitute. DIAGNOSIS_TIMEOUT_S = 40.0 class StudyDiagnosisResponse(BaseModel): status: str # ready | none content: str | None = None model: str | None = None generated_at: datetime | None = None snapshot_at: datetime | None = None review_set_draft_id: int | None = None @router.post("/diagnosis/generate", response_model=StudyDiagnosisResponse) async def generate_study_diagnosis( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """누적 학습 약점/태도 진단(학습 진단 코치). 최신 약점 스냅샷을 코치 언어로 번역만. 워커 미가동(스냅샷 부재)이면 status='none' — '아직 진단 데이터 없음' 명시(빈약속/추측 회피). """ snap = ( await session.execute( select(EidStudyWeakness) .where(EidStudyWeakness.user_id == user.id, EidStudyWeakness.status == "active") .order_by(EidStudyWeakness.created_at.desc()) .limit(1) ) ).scalar_one_or_none() if snap is None: return StudyDiagnosisResponse(status="none") draft = ( await session.execute( select(EidReviewSetDraft) .where( EidReviewSetDraft.user_id == user.id, EidReviewSetDraft.source_weakness_id == snap.id, # 이 스냅샷이 산출한 draft만(W3 review #5) ) .order_by(EidReviewSetDraft.created_at.desc()) .limit(1) ) ).scalar_one_or_none() weakness_block = format_weakness_block( snap.weaknesses or [], shallow_overall=snap.is_shallow_sample ) if draft is not None and draft.question_ids: weakness_block += ( f"\n《권장 복습세트 초안》 set #{draft.id} · {len(draft.question_ids)}문항 " f"(reason={draft.reason}) — 사용자 1클릭 확인 후에만 실제 편성. 자율 편성 금지." ) habit_block = format_habit_block(snap.habit_signals or {}) # compose 는 study overlay(placeholder 포함)를 system 에 넣음 → 표면이 placeholder 를 실데이터로 치환. composed = compose("study_diagnosis", task="") # fail-closed: overlay degrade(placeholder 부재)면 스냅샷 없이 LLM 돌릴 때 약점 날조 위험 → # 진단 생략(status='none'). weakness·habit 두 placeholder 다 확인(W3 review #4). if "{weakness_snapshot_block}" not in composed or "{habit_signal_block}" not in composed: logger.error( "study_diagnosis: study overlay degraded — placeholder 부재, 진단 생략(fail-closed) user=%s", user.id, ) return StudyDiagnosisResponse(status="none") system = ( composed .replace("{weakness_snapshot_block}", weakness_block) .replace("{habit_signal_block}", habit_block) ) prompt = ( "누적 학습 이력을 근거로 내 약점 토픽과 학습 태도를 진단해줘. " "위 《약점 스냅샷》·《태도 신호》 블록에 있는 값만 인용하고, 블록에 없는 토픽·수치·약점명은 " "만들지 마라. 약점 Top-N + 각 구체 근거 + (있으면) 권장 복습세트 초안을 제시하고, " "각 토픽의 tier 가 정한 강도를 넘기지 마라(라벨=방향, tier=긴급도)." ) ai_client = EidAIClient() # 이드 표면 = egress 코드층 박탈(call_primary only, W4-1) raw_text: str | None = None try: async with acquire_mlx_gate(Priority.FOREGROUND): async with asyncio.timeout(DIAGNOSIS_TIMEOUT_S): raw_text = await ai_client.call_primary(prompt, system=system) except asyncio.TimeoutError: logger.warning("study_diagnosis_mlx_timeout user=%s", user.id) except Exception: logger.exception("study_diagnosis_mlx_failed user=%s", user.id) finally: await ai_client.close() if not raw_text or not raw_text.strip(): raise HTTPException(status_code=503, detail="진단 생성 실패 (LLM)") primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary" return StudyDiagnosisResponse( status="ready", content=strip_thinking(raw_text).strip(), model=f"mlx:{primary_name}", generated_at=datetime.now(timezone.utc), snapshot_at=snap.source_generated_at, review_set_draft_id=draft.id if draft else None, ) # ─── PR-10: 문제풀이 세션 (quiz_session) lifecycle ─── # # 한 토픽당 in_progress 1개. 출제 시 session 행 생성 + question_ids 스냅샷. # 풀이 중 attempt 마다 cursor + count 증가 (study_questions.submit_attempt 가 같은 트랜잭션에서). # 마지막 문제 풀이 후 status='done' + finished_at 박힘. class QuizMode(str, Enum): """PR-12-B: 출제 모드. 1차는 random 만. PR-12-C 에서 frequent_focus / wrong_variants 추가 예약.""" random = "random" class QuizSessionStartRequest(BaseModel): target_per_subject: int = Field(default=20, ge=1, le=100) subject: str | None = Field(default=None, max_length=120) scope: str | None = Field(default=None, max_length=200) wrong_only: bool = False abandon_existing: bool = False # true 면 기존 in_progress 세션을 abandoned 로 마감 후 새로 quiz_mode: QuizMode = QuizMode.random # Phase 1-E: bucket + stage 비율 기반 선별. stage 설정 시 기존 _select_questions_for_topic # 우회하고 select_questions_for_quiz 사용. size 가 명시되면 size 만큼 출제 (subject 무관). stage: str | None = Field(default=None, pattern="^(intro|learning|pre_exam)$") size: int | None = Field(default=None, ge=1, le=200) # Phase 2-E: 복습함에서 명시 선택된 문제로 세션. 우선순위 stage > question_ids > 기존 경로. # 출제 순서는 클라이언트가 보낸 그대로 보존 (사용자 인지 순서). 한도 200, 중복 제거됨. question_ids: list[int] | None = Field(default=None) class QuizSessionSummary(BaseModel): """진행 카드 + 결과 카드 공통 (가벼운 메타). 문제 본문은 상세 endpoint 에서.""" id: int status: str cursor: int total: int correct_count: int wrong_count: int unsure_count: int target_per_subject: int subject_filter: str | None wrong_only: bool quiz_mode: str # PR-12-B: 세션 메타. 향후 모드별 정답률 통계용. subject_distribution: dict created_at: datetime updated_at: datetime finished_at: datetime | None # 결과 카드 헤더 "미확인 N건" 용 — done 세션에서만 의미. unreviewed_wrong_unsure_count: int = 0 # Phase 2-B: finalize 시점 스냅샷 (DB 영속화 — 세션 종료 후 변하지 않음). newly_correct_count: int = 0 relapsed_count: int = 0 recovered_count: int = 0 chronic_remaining_count: int = 0 # Phase 2-B: 동적 (지금 시점) 할 일 카운트. 결과 화면에서 다른 세션이 끼어도 fresh. pending_review_count: int = 0 chronic_count: int = 0 regressed_count: int = 0 class QuizSessionListResponse(BaseModel): active: QuizSessionSummary | None recent_done: list[QuizSessionSummary] async def _build_session_summary( s: StudyQuizSession, session: AsyncSession, *, include_progress_counts: bool = False, ) -> QuizSessionSummary: """status='done' 일 때 미확인 카운트 + (옵션) 지금 시점 progress 기반 할 일 카운트 계산. include_progress_counts=True 면 결과 화면 헤더용 동적 카운트 3종 (pending_review/chronic/regressed) SQL 3회 추가. 목록 endpoint 에서는 호출당 비용을 피하려고 default False. """ unreviewed = 0 if s.status == "done": row = ( await session.execute( select(func.count()) .select_from(StudyQuestionAttempt) .where( StudyQuestionAttempt.quiz_session_id == s.id, StudyQuestionAttempt.outcome.in_(("wrong", "unsure")), StudyQuestionAttempt.reviewed_at.is_(None), ) ) ).scalar() or 0 unreviewed = int(row) pending_review_count = 0 chronic_count = 0 regressed_count = 0 if include_progress_counts: from models.study_question_progress import StudyQuestionProgress pr_row = ( await session.execute( select(func.count()) .select_from(StudyQuestionProgress) .where( StudyQuestionProgress.user_id == s.user_id, StudyQuestionProgress.study_topic_id == s.study_topic_id, StudyQuestionProgress.last_outcome.in_(("wrong", "unsure")), sql_or( StudyQuestionProgress.last_reviewed_at.is_(None), and_( StudyQuestionProgress.last_reviewed_at.is_not(None), StudyQuestionProgress.last_attempted_at.is_not(None), StudyQuestionProgress.last_reviewed_at < StudyQuestionProgress.last_attempted_at, ), ), ) ) ).scalar() or 0 pending_review_count = int(pr_row) ch_row = ( await session.execute( select(func.count()) .select_from(StudyQuestionProgress) .where( StudyQuestionProgress.user_id == s.user_id, StudyQuestionProgress.study_topic_id == s.study_topic_id, StudyQuestionProgress.pattern_state == "chronic_wrong", ) ) ).scalar() or 0 chronic_count = int(ch_row) rg_row = ( await session.execute( select(func.count()) .select_from(StudyQuestionProgress) .where( StudyQuestionProgress.user_id == s.user_id, StudyQuestionProgress.study_topic_id == s.study_topic_id, StudyQuestionProgress.pattern_state == "regressed", ) ) ).scalar() or 0 regressed_count = int(rg_row) return QuizSessionSummary( id=s.id, status=s.status, cursor=s.cursor, total=len(s.question_ids or []), correct_count=s.correct_count, wrong_count=s.wrong_count, unsure_count=s.unsure_count, target_per_subject=s.target_per_subject, subject_filter=s.subject_filter, wrong_only=s.wrong_only, quiz_mode=s.quiz_mode, subject_distribution=s.subject_distribution or {}, created_at=s.created_at, updated_at=s.updated_at, finished_at=s.finished_at, unreviewed_wrong_unsure_count=unreviewed, newly_correct_count=s.newly_correct_count, relapsed_count=s.relapsed_count, recovered_count=s.recovered_count, chronic_remaining_count=s.chronic_remaining_count, pending_review_count=pending_review_count, chronic_count=chronic_count, regressed_count=regressed_count, ) async def _select_questions_for_topic( session: AsyncSession, user: User, topic_id: int, *, subject: str | None, scope: str | None, target_per_subject: int, wrong_only: bool, apply_spacing: bool = False, ) -> tuple[list[int], dict[str, int]]: """과목별 균등 추출 + (옵션) PR-12-B type spacing. apply_spacing=True 면 subject bucket 별로 buffer × SPACING_BUFFER_RATIO 만큼 뽑은 뒤 apply_type_spacing 으로 같은 유형 과밀 방지. 회차 무관. """ from services.study.related_types import ( SPACING_BUFFER_RATIO, apply_type_spacing, make_spacing_candidates_from_rows, ) base_filter = and_( StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.deleted_at.is_(None), StudyQuestion.is_active.is_(True), ) wrong_qids: set[int] | None = None if wrong_only: latest = ( await session.execute( select( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.is_correct, ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_topic_id == topic_id, ) .order_by( StudyQuestionAttempt.study_question_id, StudyQuestionAttempt.answered_at.desc(), ) .distinct(StudyQuestionAttempt.study_question_id) ) ).all() wrong_qids = {r.study_question_id for r in latest if r.is_correct is False} if not wrong_qids: return [], {} if subject is not None: subjects: list[str | None] = [subject] else: subj_query = ( select(StudyQuestion.subject) .where(base_filter) .distinct() ) if scope is not None: subj_query = subj_query.where(StudyQuestion.scope == scope) if wrong_qids is not None: subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids)) subjects = [r[0] for r in (await session.execute(subj_query)).all()] if not subjects: return [], {} selected_ids: list[int] = [] distribution: dict[str, int] = {} for subj in subjects: if apply_spacing: # spacing buffer × ratio. subject bucket 안에서만 cosine 비교. buffer_limit = max(target_per_subject, int(target_per_subject * SPACING_BUFFER_RATIO)) sub_q = select( StudyQuestion.id, StudyQuestion.embedding, StudyQuestion.embedding_status, ).where(base_filter) else: sub_q = select(StudyQuestion.id).where(base_filter) buffer_limit = target_per_subject if subj is None: sub_q = sub_q.where(StudyQuestion.subject.is_(None)) else: sub_q = sub_q.where(StudyQuestion.subject == subj) if scope is not None: sub_q = sub_q.where(StudyQuestion.scope == scope) if wrong_qids is not None: sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids)) sub_q = sub_q.order_by(func.random()).limit(buffer_limit) rows = (await session.execute(sub_q)).all() if apply_spacing: # rows 가 _SpacingCandidate 의 row protocol 만족 (id/embedding/embedding_status). spacing_cands = make_spacing_candidates_from_rows(rows) spacing_res = await apply_type_spacing( spacing_cands, target=target_per_subject, subject_label=subj or "", ) picked = spacing_res.selected_ids else: picked = [r[0] for r in rows[:target_per_subject]] if picked: selected_ids.extend(picked) distribution[subj or ""] = len(picked) _random.shuffle(selected_ids) return selected_ids, distribution @router.get("/{topic_id}/quiz-sessions", response_model=QuizSessionListResponse) async def list_quiz_sessions( topic_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], limit: int = Query(20, ge=1, le=100), ): """진행 중 + 최근 완료 N건. 통합뷰 진행/결과 카드용.""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) active_row = ( await session.execute( select(StudyQuizSession).where( StudyQuizSession.user_id == user.id, StudyQuizSession.study_topic_id == topic_id, StudyQuizSession.status == "in_progress", ) ) ).scalar_one_or_none() active_summary = await _build_session_summary(active_row, session) if active_row else None done_rows = ( await session.execute( select(StudyQuizSession) .where( StudyQuizSession.user_id == user.id, StudyQuizSession.study_topic_id == topic_id, StudyQuizSession.status == "done", ) .order_by(StudyQuizSession.finished_at.desc().nulls_last()) .limit(limit) ) ).scalars().all() done = [await _build_session_summary(s, session) for s in done_rows] return QuizSessionListResponse(active=active_summary, recent_done=done) @router.post("/{topic_id}/quiz-sessions", response_model=QuizSessionSummary, status_code=201) async def start_quiz_session( topic_id: int, body: QuizSessionStartRequest, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """새 세션 시작. 기존 in_progress 가 있으면: - body.abandon_existing=false → 기존 세션 그대로 반환 (이어풀기). - body.abandon_existing=true → 기존을 abandoned 로 마감 후 새로 출제. """ topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) existing = ( await session.execute( select(StudyQuizSession).where( StudyQuizSession.user_id == user.id, StudyQuizSession.study_topic_id == topic_id, StudyQuizSession.status == "in_progress", ) ) ).scalar_one_or_none() if existing is not None: if not body.abandon_existing: # 이어풀기 — 기존 그대로 반환. return await _build_session_summary(existing, session) existing.status = "abandoned" existing.updated_at = datetime.now(timezone.utc) await session.flush() # Phase 1-E: stage 명시 시 bucket + 비율 기반 선별 (단일 풀이 진입점 vision). # Phase 2-E: question_ids 명시 시 복습함 multi-select 경로 — 검증만 + 순서 보존. # stage 미명시 시 기존 subject bucket + spacing (PR-12-B 호환). if body.stage is not None: from services.study.quiz_selection import select_questions_for_quiz size = body.size or 100 try: qids, distribution = await select_questions_for_quiz( session, user_id=user.id, study_topic_id=topic_id, stage=body.stage, size=size, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) if not qids: raise HTTPException(status_code=400, detail="출제 가능한 문제가 없습니다") elif body.question_ids is not None: # 중복 제거 (순서 보존) seen: set[int] = set() wanted: list[int] = [] for qid in body.question_ids: if qid not in seen: seen.add(qid) wanted.append(qid) if not wanted: raise HTTPException(status_code=400, detail="문제를 1개 이상 선택해주세요") if len(wanted) > 200: raise HTTPException(status_code=400, detail="한 세션에 최대 200문제까지 선택 가능합니다") # 유효성 검증 — user × topic 소속 + 미삭제. soft-deleted 또는 다른 user/topic 의 qid 는 제거. valid_rows = ( await session.execute( select(StudyQuestion.id, StudyQuestion.subject) .where( StudyQuestion.id.in_(wanted), StudyQuestion.user_id == user.id, StudyQuestion.study_topic_id == topic_id, StudyQuestion.deleted_at.is_(None), ) ) ).all() valid_set = {r.id for r in valid_rows} qids = [qid for qid in wanted if qid in valid_set] if not qids: raise HTTPException(status_code=400, detail="선택한 문제가 이 주제에 존재하지 않습니다") # subject distribution 계산 (결과 카드 통계용) subject_by_id = {r.id: (r.subject or "(미분류)") for r in valid_rows} distribution: dict[str, int] = {} for qid in qids: sub = subject_by_id.get(qid, "(미분류)") distribution[sub] = distribution.get(sub, 0) + 1 else: # 기존 PR-12-B 경로: subject bucket + type spacing. apply_spacing = body.quiz_mode == QuizMode.random qids, distribution = await _select_questions_for_topic( session, user, topic_id, subject=body.subject, scope=body.scope, target_per_subject=body.target_per_subject, wrong_only=body.wrong_only, apply_spacing=apply_spacing, ) if not qids: raise HTTPException(status_code=400, detail="출제 가능한 문제가 없습니다") new_session = StudyQuizSession( user_id=user.id, study_topic_id=topic_id, target_per_subject=body.target_per_subject, subject_filter=body.subject, wrong_only=body.wrong_only, quiz_mode=body.quiz_mode.value, question_ids=qids, subject_distribution=distribution, status="in_progress", cursor=0, ) session.add(new_session) try: await session.commit() except IntegrityError: # partial unique idx 충돌 — 동시에 다른 호출이 in_progress 만든 경우. await session.rollback() existing = ( await session.execute( select(StudyQuizSession).where( StudyQuizSession.user_id == user.id, StudyQuizSession.study_topic_id == topic_id, StudyQuizSession.status == "in_progress", ) ) ).scalar_one_or_none() if existing is None: raise HTTPException(status_code=409, detail="quiz_session 충돌 — 다시 시도하세요") return await _build_session_summary(existing, session) return await _build_session_summary(new_session, session) class QuizSessionAttemptItem(BaseModel): """결과 카드 expand 시 카드별 메타 (attempt + 학습완료 상태).""" attempt_id: int question_id: int selected_choice: int | None correct_choice: int outcome: str answered_at: datetime reviewed_at: datetime | None class QuizSessionAnalysisOut(BaseModel): """Phase 4-B v1: 결과 화면 헤더 카드용. summary_md 박혔으면 본문 표시, 없으면 job_status / job_error_code 보고 placeholder 분기.""" summary_md: str | None confidence: str | None generated_at: datetime | None is_stale: bool job_status: str | None # 최신 job — 'pending'/'processing'/'completed'/'failed'/'skipped' job_error_code: str | None # 최신 job 의 error_code (실패/skip 사유 노출용) class QuizSessionDetailResponse(BaseModel): summary: QuizSessionSummary questions: list[dict] # ReviewQuestionItem 호환 shape attempts: list[QuizSessionAttemptItem] # 풀이된 것만 (cursor 까지) ai_session_analysis: QuizSessionAnalysisOut | None = None def _attempt_stats_dict_default() -> dict: return {"attempt_count": 0, "correct_count": 0, "wrong_count": 0} @router.get("/{topic_id}/quiz-sessions/{session_id}", response_model=QuizSessionDetailResponse) async def get_quiz_session( topic_id: int, session_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """풀이 화면 / 결과 화면 공용. questions 는 출제 순서. attempts 는 cursor 까지 풀이된 것만.""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) qs = await session.get(StudyQuizSession, session_id) if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id: raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다") qids = list(qs.question_ids or []) questions_payload: list[dict] = [] if qids: rows = ( await session.execute( select(StudyQuestion).where(StudyQuestion.id.in_(qids)) ) ).scalars().all() by_id = {q.id: q for q in rows} # 이미지 batch — 기존 study_questions._images_for_questions_batch 재사용 (sort_order 기준). from api.study_questions import _images_for_questions_batch images_typed = await _images_for_questions_batch(session, qids) images_map: dict[int, list[dict]] = { qid: [it.model_dump() for it in items] for qid, items in images_typed.items() } # attempt count batch (per question) stat_rows = ( await session.execute( select( StudyQuestionAttempt.study_question_id, func.count().label("total"), func.coalesce( func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)), 0, ).label("correct"), ) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.study_question_id.in_(qids), ) .group_by(StudyQuestionAttempt.study_question_id) ) ).all() stats_map: dict[int, dict] = {} for r in stat_rows: t = int(r.total) c = int(r.correct) stats_map[r.study_question_id] = { "attempt_count": t, "correct_count": c, "wrong_count": t - c } # 출제 순서 그대로 nest for qid in qids: q = by_id.get(qid) if q is None: continue questions_payload.append({ "id": q.id, "question_text": q.question_text, "choices": [ {"number": 1, "text": q.choice_1}, {"number": 2, "text": q.choice_2}, {"number": 3, "text": q.choice_3}, {"number": 4, "text": q.choice_4}, ], "subject": q.subject, "scope": q.scope, "exam_round": q.exam_round, "explanation": q.explanation, "stats": stats_map.get(q.id, _attempt_stats_dict_default()), "images": images_map.get(q.id, []), # done 세션 결과 화면용 — 정답 노출. in_progress 세션은 클라이언트가 cursor 까지만 표시. "correct_choice": q.correct_choice if qs.status == "done" else None, # Phase 4-A: 결과 헤더의 AI 풀이 진척 indicator 용. wrong/unsure 와 결합해 카운트. "ai_explanation_status": q.ai_explanation_status, }) # 이 세션의 attempts (cursor 까지). 같은 question 에 여러 attempts 있을 수 있지만, # 이 세션 내에서는 정상 흐름상 question 당 1개. quiz_session_id 기준으로 가져옴. attempt_rows = ( await session.execute( select(StudyQuestionAttempt) .where( StudyQuestionAttempt.user_id == user.id, StudyQuestionAttempt.quiz_session_id == qs.id, ) .order_by(StudyQuestionAttempt.answered_at.asc()) ) ).scalars().all() attempts_payload = [ QuizSessionAttemptItem( attempt_id=a.id, question_id=a.study_question_id, selected_choice=a.selected_choice, correct_choice=a.correct_choice, outcome=a.outcome, answered_at=a.answered_at, reviewed_at=a.reviewed_at, ) for a in attempt_rows ] summary = await _build_session_summary(qs, session, include_progress_counts=True) # Phase 4-A: 결과 화면 GET fallback — finalize enqueue 누락 또는 worker 처리 전 # 사용자가 결과 들어온 경우 같은 wrong/unsure qid 를 idempotent backfill enqueue. # 한 요청당 30 cap + non-blocking + debug 로그. 실패해도 GET 200 유지. if qs.status == "done": try: from services.study.explanation_enqueue import enqueue_explanation_for_qids wrong_unsure_qids = [ a.study_question_id for a in attempt_rows if a.outcome in ("wrong", "unsure") ] if wrong_unsure_qids: res = await enqueue_explanation_for_qids( session, user_id=user.id, qids=wrong_unsure_qids, max_count=30, # GET fallback cap ) await session.commit() logger.debug( "phase4a_get_backfill session=%s candidates=%s enqueued=%s skipped=%s", qs.id, res["candidate_count"], res["enqueue_count"], res["skipped_count"], ) except Exception as e: logger.warning( "phase4a_get_backfill_failed session=%s: %s: %s", qs.id, type(e).__name__, e, ) # Phase 4-B v1: ai_session_analysis 응답 (결과 캐시 LEFT JOIN + 최신 job) ai_session_analysis: QuizSessionAnalysisOut | None = None if qs.status == "done": try: from models.study_quiz_session_analysis import StudyQuizSessionAnalysis from models.study_quiz_session_job import StudyQuizSessionJob an_row = ( await session.execute( select(StudyQuizSessionAnalysis).where( StudyQuizSessionAnalysis.study_quiz_session_id == qs.id ) ) ).scalar_one_or_none() latest_job = ( await session.execute( select(StudyQuizSessionJob) .where(StudyQuizSessionJob.study_quiz_session_id == qs.id) .order_by(StudyQuizSessionJob.id.desc()) .limit(1) ) ).scalar_one_or_none() if an_row is not None or latest_job is not None: ai_session_analysis = QuizSessionAnalysisOut( summary_md=an_row.summary_md if an_row else None, confidence=an_row.confidence if an_row else None, generated_at=an_row.generated_at if an_row else None, is_stale=bool(an_row.is_stale) if an_row else False, job_status=latest_job.status if latest_job else None, job_error_code=latest_job.error_code if latest_job else None, ) # GET fallback enqueue — 기존 analysis 또는 active job 없으면만 시도. # best-effort: 실패가 GET 응답 깨지 않게 try/except. try: if an_row is None and (latest_job is None or latest_job.status not in ("pending", "processing")): from services.study.session_analysis_enqueue import enqueue_session_analysis_auto res = await enqueue_session_analysis_auto( session, user_id=user.id, study_quiz_session_id=qs.id, ) await session.commit() logger.debug( "phase4b_get_backfill session=%s enqueued=%s", qs.id, res["enqueued"], ) except Exception as e: logger.warning( "phase4b_get_backfill_failed session=%s: %s: %s", qs.id, type(e).__name__, e, ) except Exception as e: logger.warning( "phase4b_get_analysis_failed session=%s: %s: %s", qs.id, type(e).__name__, e, ) return QuizSessionDetailResponse( summary=summary, questions=questions_payload, attempts=attempts_payload, ai_session_analysis=ai_session_analysis, ) class QuizSessionPatchRequest(BaseModel): """abandon → status='abandoned'. 다른 필드 패치는 현재 안 받음.""" action: str = Field(..., pattern="^(abandon)$") @router.patch("/{topic_id}/quiz-sessions/{session_id}", response_model=QuizSessionSummary) async def patch_quiz_session( topic_id: int, session_id: int, body: QuizSessionPatchRequest, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """현재 abandon 만 지원 — 진행 중 세션 포기 (다시 시작 시 사용).""" topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) qs = await session.get(StudyQuizSession, session_id) if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id: raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다") if body.action == "abandon": if qs.status != "in_progress": raise HTTPException(status_code=409, detail=f"이미 {qs.status} 상태입니다") qs.status = "abandoned" qs.updated_at = datetime.now(timezone.utc) await session.commit() return await _build_session_summary(qs, session) # ─── Phase 4-B v1: 세션 분석 재생성 ─── class RegenerateSummaryResponse(BaseModel): enqueued: bool reason: str | None = None # insufficient_attempts / already_active / not_done / not_found / race_lost @router.post( "/{topic_id}/quiz-sessions/{session_id}/regenerate-summary", response_model=RegenerateSummaryResponse, ) async def regenerate_session_summary( topic_id: int, session_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """사용자 [재생성] 버튼 — wrong/unsure < 5 즉시 차단 + active job 차단 + is_stale 처리. Plan ~/.claude/plans/nifty-sparking-spindle.md (Phase 4-B v1) 의 Manual 트리거 정책 그대로. """ topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) qs = await session.get(StudyQuizSession, session_id) if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id: raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다") from services.study.session_analysis_enqueue import request_session_analysis_regenerate res = await request_session_analysis_regenerate( session, user_id=user.id, study_quiz_session_id=session_id, ) await session.commit() return RegenerateSummaryResponse(enqueued=res["enqueued"], reason=res.get("reason")) # ─── PR-12-A: 반복 출제 / 유사 유형 배치 카운트 ─── class RelatedTypesBulkRequest(BaseModel): question_ids: list[int] = Field(..., min_length=1, max_length=200) class RelatedTypesBulkItem(BaseModel): repeat_related_count: int repeat_round_count: int repeat_grade: str | None = None similar_related_count: int similar_round_count: int class RelatedTypesBulkResponse(BaseModel): # 입력 question_ids 전체 보존 — 권한 없음/임베딩 미준비/회차 미지정도 (0,0,0,0). items: dict[int, RelatedTypesBulkItem] @router.post("/{topic_id}/related-types-bulk", response_model=RelatedTypesBulkResponse) async def related_types_bulk( topic_id: int, body: RelatedTypesBulkRequest, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """카드별 배지(round_count) 표시용 배치 카운트. 비교 대상 = 같은 topic 안 모든 ready 문제 (입력 qid 끼리만 비교 X). 응답 dict 는 입력 qid 전체 보존 — 누락 X. """ from services.study.related_types import classify_related_bulk topic = await session.get(StudyTopic, topic_id) _verify_topic_ownership(topic, user) counts = await classify_related_bulk( session, user_id=user.id, study_topic_id=topic_id, question_ids=body.question_ids, ) return RelatedTypesBulkResponse( items={ qid: RelatedTypesBulkItem( repeat_related_count=c.repeat_related_count, repeat_round_count=c.repeat_round_count, repeat_grade=c.repeat_grade, similar_related_count=c.similar_related_count, similar_round_count=c.similar_round_count, ) for qid, c in counts.items() } )