Files
hyungi 63457e6afc feat(publish): S-1 pub_topics 발행 — projection+저작훅+백필 (study→viewer)
주제(study_topic) 메타를 발행 레이어에 실어 viewer 가 주제/회차 단위 퀴즈를
구성하게 한다(현재 topic 이름 미발행이라 불가). plan study-viewer-port S-1.
- publish_projection: KIND_TOPIC + project_topic(topic_id·name·exam_round_size).
  회차는 미발행 = viewer 가 pub_content(study_question) 의 exam_name/exam_round 로
  파생(추가 발행 불요). topic_id = project_question.topic_id 와 동일 DS 식별자라
  viewer 문항→주제 상관 키(pub_id 는 opaque 라 상관 키 아님).
- publish_enqueue: enqueue_topic_publish + backfill_publish_topics(bounded page,
  deleted_at IS NULL). 멱등 = 워커 (payload_hash, deleted) 디둡.
- study_topics 저작훅(전부 study_publish_enabled 게이트): create(flush→enqueue→
  commit) / update(재투영, payload 무변경은 디둡이 rev 안 올림=churn 0) /
  delete(tombstone, raw DELETE 금지·워커 경유).
- scripts/backfill_publish_topics.py: 기존 주제 1회 outbox 적재(overflow 가드).

워커·/published/feed 는 kind-generic(무변경, 실측). flag on 환경 배포 시 주제 발행
시작 → S-3 viewer 수용(generic upsert·kind-filtered read) 선행 전제, 게이트 PASS 됨.
백필 실행·배포순서 cutover 는 deploy 게이트(소프트락)라 본 슬라이스 미포함.
py_compile PASS · project_topic payload 단위검증.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-25 13:48:08 +09:00

2178 lines
80 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""학습 워크스페이스(study_topic) API — 필기 세션 + 자료(library document) 묶음 컨테이너
핵심 개념:
- study_topic 은 단순 폴더/태그가 아니라 학습 자산 1차 컨테이너 (학습 워크스페이스).
- 향후 단어장/오디오/문제 세트 같은 자산이 같은 컨테이너 아래로 들어올 수 있도록
응답 구조를 sections + stats 형태로 설계 (현재는 sessions / documents 두 키만).
- documents.category(자료실 UI 축) 와 직교한 별도 분류 축. 자료실 facet/카테고리는 미터치.
- StudySession.certification/subject/topic 은 보존 — 본 컨테이너 와 직교한 세부 메타.
설계 제약:
- study_type 은 강한 enum 미사용. VALID_STUDY_TYPES 는 단순 권장값 (DB/Pydantic 강제 안 함).
- soft delete (deleted_at). active 행끼리만 (user_id, name) partial unique.
- 권한: 모든 쿼리에 user_id 강제. 다른 사용자 자료를 묶지 못함.
- 문서 매핑은 N:M (study_topic_documents). 세션 매핑은 1:N (study_sessions.study_topic_id).
- polymorphic 단일 study_topic_items 테이블은 만들지 않는다 (영구 금지).
"""
import asyncio
import logging
import random as _random
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path as _Path
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, case, delete, func, or_ as sql_or, select, text as sql_text, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, strip_thinking
from eid.ai import EidAIClient
from eid.compose import compose
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.library import LIBRARY_PREFIX, normalize_library_path
from models.document import Document
from models.study_session import StudySession
from models.study_topic import StudyTopic, StudyTopicDocument
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_question_image import StudyQuestionImage
from models.study_quiz_session import StudyQuizSession
from models.study_topic_subject_note import StudyTopicSubjectNote
from models.eid_study_weakness import EidStudyWeakness
from models.eid_review_set_draft import EidReviewSetDraft
from models.user import User
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.study.publish_enqueue import enqueue_publish, enqueue_topic_publish
from services.study.publish_projection import KIND_TOPIC
from services.study.subject_note_rag import (
SubjectNoteContext,
gather_subject_note_context,
render_evidence_block,
)
from services.study.weakness_compute import format_habit_block, format_weakness_block
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── 권장값 (강제 enum 아님, UI 안내용) ───
RECOMMENDED_STUDY_TYPES: set[str] = {
"certification", "language", "school", "work", "general",
}
# ─── Pydantic 스키마 ───
class StudyTopicCreate(BaseModel):
name: str = Field(min_length=1, max_length=120)
description: str | None = None
color: str | None = Field(default=None, max_length=20)
study_type: str | None = Field(default=None, max_length=40)
sort_order: int = 0
# PR-6: 시험 메타
exam_round_size: int | None = Field(default=None, ge=1, le=300)
exam_subjects: list[str] = []
class StudyTopicUpdate(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=120)
description: str | None = None
color: str | None = Field(default=None, max_length=20)
study_type: str | None = Field(default=None, max_length=40)
sort_order: int | None = None
# PR-6: 시험 메타
exam_round_size: int | None = Field(default=None, ge=1, le=300)
exam_subjects: list[str] | None = None
# 공부 암기노트: 공부중 토글 (true=focused_at=now, false=clear)
focused: bool | None = None
class StudyTopicResponse(BaseModel):
"""주제 목록 응답 — 집계 카운트 포함."""
id: int
name: str
description: str | None
color: str | None
study_type: str | None
sort_order: int
session_count: int = 0
document_count: int = 0
question_count: int = 0
# PR-6: 시험 메타
exam_round_size: int | None = None
exam_subjects: list[str] = []
# 공부 암기노트: 공부중 태그 상태
focused: bool = False
created_at: datetime
updated_at: datetime
class StudyTopicListResponse(BaseModel):
items: list[StudyTopicResponse]
total: int
class StudyTopicSessionSummary(BaseModel):
"""상세 뷰의 세션 카드 페이로드 — 통합 뷰 렌더용 최소 필드."""
id: int
study_type: str
certification: str | None
language_code: str | None
learning_level: str | None
subject: str | None
topic: str | None
mode: str
repetition_count: int
review_state: str | None
created_at: datetime
updated_at: datetime
class StudyTopicDocumentSummary(BaseModel):
"""상세 뷰의 자료 카드 페이로드.
library_paths: 통합뷰 자료 섹션의 카테고리 트리 그룹핑용. user_tags 의
`@library/<path>` 태그에서 prefix 제거한 path 들. 한 자료가 여러 카테고리에
속할 수 있으나(다중 분류), 트리 그룹핑 시 첫 번째 path 만 사용. 빈 리스트면
"분류 없음" 그룹.
"""
id: int
title: str | None
file_format: str
file_type: str
category: str | None
ai_domain: str | None
importance: str | None
sort_order: int
linked_at: datetime
library_paths: list[str] = []
class StudyTopicQuestionSummary(BaseModel):
"""상세 뷰의 문제 카드 페이로드 — 정답·해설 비공개, 본문 80자 truncate."""
id: int
question_text: str
subject: str | None
scope: str | None
exam_name: str | None
exam_round: str | None
exam_question_number: int | None # PR-11: 회차별 그룹 안 정렬 키
is_active: bool
attempt_count: int
last_correct: bool | None
created_at: datetime
class StudyTopicSections(BaseModel):
"""확장 친화 dict — 향후 audio_assets / vocab_decks 키 추가 가능."""
sessions: list[StudyTopicSessionSummary]
documents: list[StudyTopicDocumentSummary]
questions: list[StudyTopicQuestionSummary] = []
class StudyTopicStats(BaseModel):
"""자산 카운트 — 0 으로 미리 노출해서 후속 PR 에서 필드만 채움.
question_count = PR-2 단일 문제 수. question_set_count = 후속 PR 의 회차/모의고사 묶음 수 (둘은 의미 다름).
"""
session_count: int
document_count: int
question_count: int = 0
audio_count: int = 0
vocab_count: int = 0
question_set_count: int = 0
class StudyTopicMeta(BaseModel):
id: int
name: str
description: str | None
color: str | None
study_type: str | None
sort_order: int
# PR-6: 시험 메타
exam_round_size: int | None = None
exam_subjects: list[str] = []
# 공부 암기노트: 공부중 태그 상태
focused: bool = False
created_at: datetime
updated_at: datetime
class StudyTopicDetailResponse(BaseModel):
topic: StudyTopicMeta
sections: StudyTopicSections
stats: StudyTopicStats
class StudyTopicDocumentLinkRequest(BaseModel):
document_ids: list[int] = Field(min_length=1, max_length=100)
class StudyTopicDocumentLinkResponse(BaseModel):
linked: list[int]
skipped_existing: list[int]
skipped_not_found: list[int]
class StudyTopicDocumentByPathRequest(BaseModel):
"""카테고리 트리 노드 일괄 추가용. path prefix 매칭으로 하위 자료 모두 매핑.
include_subtree=True 면 path 와 그 하위 카테고리 전체, False 면 path 정확 일치만.
"""
path: str = Field(min_length=1, max_length=500)
include_subtree: bool = True
class StudyTopicDocumentByPathResponse(BaseModel):
linked_count: int
skipped_existing_count: int
total_in_path: int
path: str
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
"""소유자 검증 + soft-deleted 행 차단. mismatch 도 404 (정보 누설 방지)."""
if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
return topic
def _meta_from_topic(t: StudyTopic) -> StudyTopicMeta:
return StudyTopicMeta(
id=t.id,
name=t.name,
description=t.description,
color=t.color,
study_type=t.study_type,
sort_order=t.sort_order,
exam_round_size=t.exam_round_size,
exam_subjects=t.exam_subjects or [],
created_at=t.created_at,
updated_at=t.updated_at,
)
# ─── 엔드포인트 ───
@router.get("/", response_model=StudyTopicListResponse)
async def list_study_topics(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""사용자의 active 주제 목록. 세션 수·자료 수 집계 포함."""
# active 주제 + 세션 수 (LEFT JOIN study_sessions)
sess_count_sub = (
select(
StudySession.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(StudySession.user_id == user.id)
.where(StudySession.study_topic_id.is_not(None))
.group_by(StudySession.study_topic_id)
.subquery()
)
doc_count_sub = (
select(
StudyTopicDocument.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(StudyTopicDocument.user_id == user.id)
.group_by(StudyTopicDocument.study_topic_id)
.subquery()
)
# PR-2 — 활성 문제 카운트 (soft-deleted 제외)
from models.study_question import StudyQuestion as _SQ
q_count_sub = (
select(
_SQ.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(_SQ.user_id == user.id, _SQ.deleted_at.is_(None))
.group_by(_SQ.study_topic_id)
.subquery()
)
base = (
select(
StudyTopic,
func.coalesce(sess_count_sub.c.c, 0).label("session_count"),
func.coalesce(doc_count_sub.c.c, 0).label("document_count"),
func.coalesce(q_count_sub.c.c, 0).label("question_count"),
)
.outerjoin(sess_count_sub, sess_count_sub.c.topic_id == StudyTopic.id)
.outerjoin(doc_count_sub, doc_count_sub.c.topic_id == StudyTopic.id)
.outerjoin(q_count_sub, q_count_sub.c.topic_id == StudyTopic.id)
.where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None))
.order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
)
total_query = select(func.count()).select_from(
select(StudyTopic.id)
.where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None))
.subquery()
)
total = (await session.execute(total_query)).scalar() or 0
rows = (await session.execute(base.offset(offset).limit(limit))).all()
items = [
StudyTopicResponse(
id=t.id,
name=t.name,
description=t.description,
color=t.color,
study_type=t.study_type,
sort_order=t.sort_order,
session_count=int(sc),
document_count=int(dc),
question_count=int(qc),
exam_round_size=t.exam_round_size,
exam_subjects=t.exam_subjects or [],
created_at=t.created_at,
updated_at=t.updated_at,
)
for t, sc, dc, qc in rows
]
return StudyTopicListResponse(items=items, total=total)
class ExamRoundProgress(BaseModel):
exam_round: str
question_count: int
max_question_number: int # 0 = exam_question_number 가 모두 NULL
next_question_number: int | None # 도달 시 None
is_complete: bool
class ExamRoundsResponse(BaseModel):
exam_round_size: int | None # 토픽 메타. NULL = 미설정
items: list[ExamRoundProgress]
@router.get("/{topic_id}/exam-rounds", response_model=ExamRoundsResponse)
async def list_exam_rounds(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""토픽 안 회차별 진행률 집계. 진행률 표시 + 재개 UX 용.
is_complete = (exam_round_size IS NOT NULL AND question_count >= exam_round_size).
next_question_number = max + 1 (도달 시 NULL).
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
from models.study_question import StudyQuestion as _SQ
rows = (
await session.execute(
select(
_SQ.exam_round,
func.count().label("question_count"),
func.coalesce(func.max(_SQ.exam_question_number), 0).label("max_qnum"),
)
.where(
_SQ.user_id == user.id,
_SQ.study_topic_id == topic_id,
_SQ.deleted_at.is_(None),
_SQ.exam_round.is_not(None),
)
.group_by(_SQ.exam_round)
.order_by(_SQ.exam_round.desc())
)
).all()
size = topic.exam_round_size
items: list[ExamRoundProgress] = []
for r in rows:
cnt = int(r.question_count)
mx = int(r.max_qnum)
is_complete = bool(size is not None and cnt >= size)
# 도달했으면 next None, 아니면 max+1 (max=0 이면 1번부터)
next_n = None if is_complete else (mx + 1 if mx > 0 else 1)
items.append(ExamRoundProgress(
exam_round=r.exam_round,
question_count=cnt,
max_question_number=mx,
next_question_number=next_n,
is_complete=is_complete,
))
return ExamRoundsResponse(exam_round_size=size, items=items)
@router.get("/by-document/{document_id}", response_model=list[StudyTopicMeta])
async def list_topics_for_document(
document_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""이 자료가 매핑된 active 주제 목록.
/study/sources 카드의 "이 자료가 속한 주제" 배지·컨텍스트 메뉴용.
"""
rows = (
await session.execute(
select(StudyTopic)
.join(StudyTopicDocument, StudyTopicDocument.study_topic_id == StudyTopic.id)
.where(
StudyTopicDocument.document_id == document_id,
StudyTopicDocument.user_id == user.id,
StudyTopic.deleted_at.is_(None),
)
.order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
)
).scalars().all()
return [_meta_from_topic(t) for t in rows]
@router.post("/", response_model=StudyTopicResponse, status_code=201)
async def create_study_topic(
body: StudyTopicCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""주제 생성. 같은 이름의 active 주제가 이미 있으면 409.
soft-deleted 동명 주제는 partial unique index 에서 빠지므로 신규 생성 가능.
"""
topic = StudyTopic(
user_id=user.id,
name=body.name.strip(),
description=body.description,
color=body.color,
study_type=body.study_type,
sort_order=body.sort_order,
exam_round_size=body.exam_round_size,
exam_subjects=body.exam_subjects or [],
)
session.add(topic)
try:
await session.flush()
# 발행 outbox 적재(같은 tx, flag off 면 no-op) — 신규 주제 발행. S-1.
if settings.study_publish_enabled:
await enqueue_topic_publish(session, topic)
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다")
return StudyTopicResponse(
id=topic.id,
name=topic.name,
description=topic.description,
color=topic.color,
study_type=topic.study_type,
sort_order=topic.sort_order,
session_count=0,
document_count=0,
question_count=0,
exam_round_size=topic.exam_round_size,
exam_subjects=topic.exam_subjects or [],
created_at=topic.created_at,
updated_at=topic.updated_at,
)
@router.get("/{topic_id}", response_model=StudyTopicDetailResponse)
async def get_study_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""통합 뷰: 주제 메타 + 세션 목록 + 자료 목록 + stats.
응답 형태는 후속 PR(오디오/단어장/문제세트 추가) 에서 sections / stats 키만 늘리면 되도록 dict 구조.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
# 세션 목록 — 최근순
sess_rows = (
await session.execute(
select(StudySession)
.where(
StudySession.user_id == user.id,
StudySession.study_topic_id == topic_id,
)
.order_by(StudySession.created_at.desc(), StudySession.id.desc())
)
).scalars().all()
sessions_payload = [
StudyTopicSessionSummary(
id=s.id,
study_type=s.study_type,
certification=s.certification,
language_code=s.language_code,
learning_level=s.learning_level,
subject=s.subject,
topic=s.topic,
mode=s.mode,
repetition_count=s.repetition_count,
review_state=s.review_state,
created_at=s.created_at,
updated_at=s.updated_at,
)
for s in sess_rows
]
# 자료 목록 — 매핑 sort_order → created_at desc
doc_rows = (
await session.execute(
select(Document, StudyTopicDocument)
.join(
StudyTopicDocument,
and_(
StudyTopicDocument.document_id == Document.id,
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.user_id == user.id,
),
)
.where(Document.deleted_at.is_(None))
.order_by(
StudyTopicDocument.sort_order.asc(),
StudyTopicDocument.created_at.desc(),
)
)
).all()
def _extract_library_paths(tags: list | None) -> list[str]:
if not tags:
return []
out: list[str] = []
for t in tags:
if isinstance(t, str) and t.startswith(LIBRARY_PREFIX):
out.append(t[len(LIBRARY_PREFIX):])
return out
documents_payload = [
StudyTopicDocumentSummary(
id=d.id,
title=d.title,
file_format=d.file_format,
file_type=str(d.file_type) if d.file_type is not None else "immutable",
category=str(d.category) if d.category is not None else None,
ai_domain=d.ai_domain,
importance=d.importance,
sort_order=link.sort_order,
linked_at=link.created_at,
library_paths=_extract_library_paths(d.user_tags),
)
for d, link in doc_rows
]
# 문제 목록 (PR-2) — 본문 truncate, 정답·해설 비공개. attempt 통계 batch 로 끌어옴.
from models.study_question import StudyQuestion, StudyQuestionAttempt
q_rows = (
await session.execute(
select(StudyQuestion)
.where(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
)
.order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc())
)
).scalars().all()
qids = [q.id for q in q_rows]
q_attempt_count: dict[int, int] = {}
q_last_correct: dict[int, bool] = {}
if qids:
from sqlalchemy import case as _case
cnt_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
for r in cnt_rows:
q_attempt_count[r.study_question_id] = int(r.total)
latest_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
for r in latest_rows:
q_last_correct[r.study_question_id] = bool(r.is_correct)
def _truncate_q(text: str, n: int = 80) -> str:
return text if len(text) <= n else text[:n].rstrip() + ""
questions_payload = [
StudyTopicQuestionSummary(
id=q.id,
question_text=_truncate_q(q.question_text, 80),
subject=q.subject,
scope=q.scope,
exam_name=q.exam_name,
exam_round=q.exam_round,
exam_question_number=q.exam_question_number,
is_active=q.is_active,
attempt_count=q_attempt_count.get(q.id, 0),
last_correct=q_last_correct.get(q.id),
created_at=q.created_at,
)
for q in q_rows
]
return StudyTopicDetailResponse(
topic=_meta_from_topic(topic),
sections=StudyTopicSections(
sessions=sessions_payload,
documents=documents_payload,
questions=questions_payload,
),
stats=StudyTopicStats(
session_count=len(sessions_payload),
document_count=len(documents_payload),
question_count=len(questions_payload),
# 후속 PR 에서 채움
audio_count=0,
vocab_count=0,
question_set_count=0,
),
)
@router.patch("/{topic_id}", response_model=StudyTopicResponse)
async def update_study_topic(
topic_id: int,
body: StudyTopicUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
fields_set = body.model_fields_set
if "name" in fields_set and body.name is not None:
topic.name = body.name.strip()
for fname in {"description", "color", "study_type", "sort_order"} & fields_set:
setattr(topic, fname, getattr(body, fname))
# PR-6: 시험 메타. exam_subjects 가 None 이면 (PATCH 미명시) 기존 유지, 빈 배열은 명시적 클리어
if "exam_round_size" in fields_set:
topic.exam_round_size = body.exam_round_size
if "exam_subjects" in fields_set and body.exam_subjects is not None:
topic.exam_subjects = body.exam_subjects
# 공부 암기노트: 공부중 태그 토글 (focused_at IS NOT NULL = reminder/세션 대상)
if "focused" in fields_set:
topic.focused_at = datetime.now(timezone.utc) if body.focused else None
topic.updated_at = datetime.now(timezone.utc)
# 발행 재투영(같은 tx) — 주제 메타 갱신 반영. payload(name·exam_round_size) 무변경(focused 등)
# 은 워커 (payload_hash, deleted) 디둡이 rev 안 올리고 흡수 = churn 없음. S-1.
if settings.study_publish_enabled:
await enqueue_topic_publish(session, topic)
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다")
# 응답용 카운트 별도 조회
sc = (await session.execute(
select(func.count())
.select_from(StudySession)
.where(StudySession.user_id == user.id, StudySession.study_topic_id == topic.id)
)).scalar() or 0
dc = (await session.execute(
select(func.count())
.select_from(StudyTopicDocument)
.where(StudyTopicDocument.study_topic_id == topic.id)
)).scalar() or 0
from models.study_question import StudyQuestion as _SQ2
qc = (await session.execute(
select(func.count())
.select_from(_SQ2)
.where(
_SQ2.user_id == user.id,
_SQ2.study_topic_id == topic.id,
_SQ2.deleted_at.is_(None),
)
)).scalar() or 0
return StudyTopicResponse(
id=topic.id,
name=topic.name,
description=topic.description,
color=topic.color,
study_type=topic.study_type,
sort_order=topic.sort_order,
session_count=int(sc),
document_count=int(dc),
question_count=int(qc),
exam_round_size=topic.exam_round_size,
exam_subjects=topic.exam_subjects or [],
focused=topic.focused_at is not None,
created_at=topic.created_at,
updated_at=topic.updated_at,
)
@router.delete("/{topic_id}", status_code=204)
async def delete_study_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""주제 soft delete. 세션의 study_topic_id 는 SET NULL, 문서 매핑은 cascade 제거.
같은 이름 재생성을 위해 partial unique index 가 deleted_at IS NULL 인 행만 본다.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
# 세션 FK SET NULL — 명시적으로 (DB ON DELETE SET NULL 은 hard delete 시에만 동작)
await session.execute(
update(StudySession)
.where(
StudySession.user_id == user.id,
StudySession.study_topic_id == topic_id,
)
.values(study_topic_id=None)
)
# 문서 매핑 명시 제거 (soft delete 라 FK CASCADE 안 탐)
await session.execute(
delete(StudyTopicDocument).where(
StudyTopicDocument.study_topic_id == topic_id,
)
)
topic.deleted_at = datetime.now(timezone.utc)
# 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트(raw DELETE 금지·워커 경유). S-1.
if settings.study_publish_enabled:
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=None, deleted=True)
await session.commit()
# ─── 자료(문서) 매핑 ───
@router.post(
"/{topic_id}/documents",
response_model=StudyTopicDocumentLinkResponse,
status_code=201,
)
async def link_documents_to_topic(
topic_id: int,
body: StudyTopicDocumentLinkRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""자료 N개를 주제에 일괄 매핑. 이미 연결된 건 skipped_existing 으로 보고."""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
requested = list(dict.fromkeys(body.document_ids)) # 중복 제거 + 순서 보존
# 존재 + 미삭제 documents 만 통과 (single-user 시스템: documents.user_id 부재 가능)
valid_rows = (
await session.execute(
select(Document.id).where(
Document.id.in_(requested),
Document.deleted_at.is_(None),
)
)
).scalars().all()
valid_set = set(valid_rows)
not_found = [d for d in requested if d not in valid_set]
# 이미 매핑된 것
existing_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id.in_(valid_set),
)
)
).scalars().all()
existing_set = set(existing_rows)
to_link = [d for d in requested if d in valid_set and d not in existing_set]
# 기존 max sort_order 다음부터
max_sort = (
await session.execute(
select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1))
.where(StudyTopicDocument.study_topic_id == topic_id)
)
).scalar() or -1
for i, doc_id in enumerate(to_link, start=1):
session.add(StudyTopicDocument(
study_topic_id=topic_id,
document_id=doc_id,
user_id=user.id,
sort_order=int(max_sort) + i,
))
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다")
return StudyTopicDocumentLinkResponse(
linked=to_link,
skipped_existing=sorted(existing_set),
skipped_not_found=not_found,
)
@router.post(
"/{topic_id}/documents/by-path",
response_model=StudyTopicDocumentByPathResponse,
status_code=201,
)
async def link_documents_by_library_path(
topic_id: int,
body: StudyTopicDocumentByPathRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""자료실 카테고리 path 기준으로 하위 자료를 일괄 매핑.
/api/documents/library?path=... 와 동일한 user_tags(@library/...) prefix 매칭 사용.
100건 limit 우회 — 한 카테고리에 자료가 많아도 한 번에 처리.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
try:
normalized = normalize_library_path(body.path)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
exact_tag = f"{LIBRARY_PREFIX}{normalized}"
prefix_tag = f"{LIBRARY_PREFIX}{normalized}/%"
# @library/<path> 정확 일치 또는 (subtree 옵션 시) 그 하위까지 prefix 매칭
if body.include_subtree:
tag_filter = sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact OR t LIKE :prefix
)
""").bindparams(exact=exact_tag, prefix=prefix_tag)
else:
tag_filter = sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact
)
""").bindparams(exact=exact_tag)
candidate_rows = (
await session.execute(
select(Document.id).where(
Document.deleted_at.is_(None),
Document.category == "library",
tag_filter,
)
)
).scalars().all()
candidate_set = set(candidate_rows)
total_in_path = len(candidate_set)
if total_in_path == 0:
return StudyTopicDocumentByPathResponse(
linked_count=0,
skipped_existing_count=0,
total_in_path=0,
path=normalized,
)
existing_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id.in_(candidate_set),
)
)
).scalars().all()
existing_set = set(existing_rows)
to_link = [d for d in candidate_rows if d not in existing_set]
max_sort = (
await session.execute(
select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1))
.where(StudyTopicDocument.study_topic_id == topic_id)
)
).scalar() or -1
for i, doc_id in enumerate(to_link, start=1):
session.add(StudyTopicDocument(
study_topic_id=topic_id,
document_id=doc_id,
user_id=user.id,
sort_order=int(max_sort) + i,
))
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다")
return StudyTopicDocumentByPathResponse(
linked_count=len(to_link),
skipped_existing_count=len(existing_set),
total_in_path=total_in_path,
path=normalized,
)
@router.delete("/{topic_id}/documents/{document_id}", status_code=204)
async def unlink_document_from_topic(
topic_id: int,
document_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
result = await session.execute(
delete(StudyTopicDocument).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id == document_id,
StudyTopicDocument.user_id == user.id,
)
)
await session.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="매핑을 찾을 수 없습니다")
# ─── 세션 매핑 ───
@router.post("/{topic_id}/sessions/{session_id}", status_code=204)
async def attach_session_to_topic(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""기존 세션을 주제에 연결. 다른 주제에 이미 연결돼 있으면 새 주제로 갱신 (1:N)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
sess = await session.get(StudySession, session_id)
if sess is None or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
sess.study_topic_id = topic_id
sess.updated_at = datetime.now(timezone.utc)
await session.commit()
@router.delete("/{topic_id}/sessions/{session_id}", status_code=204)
async def detach_session_from_topic(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""세션 분리. 현재 study_topic_id 가 topic_id 가 아니면 404."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
sess = await session.get(StudySession, session_id)
if sess is None or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
if sess.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="해당 주제에 연결된 세션이 아닙니다")
sess.study_topic_id = None
sess.updated_at = datetime.now(timezone.utc)
await session.commit()
# ─── PR-9: 분야 설명 (study_topic_subject_notes) ───
SUBJECT_NOTE_TIMEOUT_S = settings.llm_call_timeout_s
_SUBJECT_NOTE_PROMPT_PATH = "study_subject_note.txt"
_subject_note_prompt_cache: str | None = None
def _load_subject_note_prompt() -> str:
global _subject_note_prompt_cache
if _subject_note_prompt_cache is None:
prompts_dir = _Path(__file__).resolve().parent.parent / "prompts"
_subject_note_prompt_cache = (prompts_dir / _SUBJECT_NOTE_PROMPT_PATH).read_text(encoding="utf-8")
return _subject_note_prompt_cache
def _render_subject_note_prompt(subject: str, scope: str, doc_block: str, q_block: str) -> str:
template = _load_subject_note_prompt()
return (
template
.replace("{subject}", subject)
.replace("{scope}", scope or "(미지정)")
.replace("{documents_evidence_block}", doc_block)
.replace("{questions_evidence_block}", q_block)
)
class SubjectNoteRequest(BaseModel):
subject: str = Field(min_length=1, max_length=120)
scope: str = Field(default="", max_length=200)
regenerate: bool = False
class SubjectNoteEvidence(BaseModel):
source_type: str
source_id: int
title: str
snippet: str
class SubjectNoteResponse(BaseModel):
subject: str
scope: str
content: str | None
status: str # ready | failed | none | stale | pending
generated_at: datetime | None
model: str | None
evidence: list[SubjectNoteEvidence] = []
from_cache: bool = False
can_regenerate: bool = True
def _note_cache_response(note: StudyTopicSubjectNote) -> SubjectNoteResponse:
return SubjectNoteResponse(
subject=note.subject,
scope=note.scope,
content=note.content,
status=note.status,
generated_at=note.generated_at,
model=note.model,
evidence=[],
from_cache=True,
can_regenerate=True,
)
@router.get("/{topic_id}/subject-notes", response_model=SubjectNoteResponse)
async def get_subject_note(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
subject: str = Query(..., min_length=1, max_length=120),
scope: str = Query("", max_length=200),
):
"""캐시 조회. 없으면 status='none' + content=null 응답."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
note = (
await session.execute(
select(StudyTopicSubjectNote).where(
StudyTopicSubjectNote.user_id == user.id,
StudyTopicSubjectNote.study_topic_id == topic_id,
StudyTopicSubjectNote.subject == subject,
StudyTopicSubjectNote.scope == scope,
)
)
).scalar_one_or_none()
if note is None:
return SubjectNoteResponse(
subject=subject, scope=scope, content=None, status="none",
generated_at=None, model=None, evidence=[], from_cache=True, can_regenerate=True,
)
return _note_cache_response(note)
@router.post("/{topic_id}/subject-notes/generate", response_model=SubjectNoteResponse)
async def generate_subject_note(
topic_id: int,
body: SubjectNoteRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""분야 설명 AI 생성 + 캐시. PR-3 race-safe pending 패턴 동일.
regenerate=false + status=ready → 캐시 반환.
pending → 409.
그 외 → 새 생성. 실패 시 status='failed', 직전 본문 보존.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
# upsert: 기존 행 있으면 사용, 없으면 신규
note = (
await session.execute(
select(StudyTopicSubjectNote).where(
StudyTopicSubjectNote.user_id == user.id,
StudyTopicSubjectNote.study_topic_id == topic_id,
StudyTopicSubjectNote.subject == body.subject,
StudyTopicSubjectNote.scope == body.scope,
)
)
).scalar_one_or_none()
if note is None:
note = StudyTopicSubjectNote(
user_id=user.id,
study_topic_id=topic_id,
subject=body.subject,
scope=body.scope,
status="none",
)
session.add(note)
await session.flush()
await session.commit()
else:
# 캐시 단축
if not body.regenerate:
if note.status == "ready":
return _note_cache_response(note)
if note.status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# none/failed/stale → 새로 생성
else:
if note.status == "pending":
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
# race-safe pending 전이
lock = await session.execute(
update(StudyTopicSubjectNote)
.where(
StudyTopicSubjectNote.id == note.id,
StudyTopicSubjectNote.status != "pending",
)
.values(status="pending", updated_at=datetime.now(timezone.utc))
.returning(StudyTopicSubjectNote.id)
)
if lock.scalar_one_or_none() is None:
raise HTTPException(
status_code=409,
detail={"status": "pending", "detail": "이미 생성 중입니다"},
)
await session.commit()
# RAG
try:
ctx = await gather_subject_note_context(session, user.id, topic_id, body.subject, body.scope)
except Exception as e:
logger.warning("subject_note_rag_failed: %s: %s", type(e).__name__, e)
ctx = SubjectNoteContext(documents=[], questions=[])
doc_block = render_evidence_block(ctx.documents)
q_block = render_evidence_block(ctx.questions)
prompt = _render_subject_note_prompt(body.subject, body.scope, doc_block, q_block)
ai_client = EidAIClient() # 이드 표면 = egress 코드층 박탈(call_primary only, W4-1)
raw_text: str | None = None
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(SUBJECT_NOTE_TIMEOUT_S):
# 이드 substrate(persona+rules)=system / 렌더 템플릿(지시+evidence)=user (W2-2)
raw_text = await ai_client.call_primary(
prompt, system=compose("study_subject_note", task="")
)
except asyncio.TimeoutError:
logger.warning("subject_note_mlx_timeout topic=%s subject=%s", topic_id, body.subject)
except Exception:
logger.exception("subject_note_mlx_failed topic=%s subject=%s", topic_id, body.subject)
finally:
await ai_client.close()
note = await session.get(StudyTopicSubjectNote, note.id)
if not raw_text or not raw_text.strip():
note.status = "failed"
note.updated_at = datetime.now(timezone.utc)
await session.commit()
return SubjectNoteResponse(
subject=note.subject, scope=note.scope, content=note.content,
status="failed", generated_at=note.generated_at, model=note.model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False, can_regenerate=True,
)
cleaned = strip_thinking(raw_text).strip()
note.content = cleaned
note.status = "ready"
note.generated_at = datetime.now(timezone.utc)
primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary"
note.model = f"mlx:{primary_name}"
note.updated_at = note.generated_at
await session.commit()
return SubjectNoteResponse(
subject=note.subject, scope=note.scope, content=note.content,
status="ready", generated_at=note.generated_at, model=note.model,
evidence=[e.to_dict() for e in ctx.all],
from_cache=False, can_regenerate=True,
)
# ─── 이드 W3-2: 학습 약점 진단 (study_diagnosis surface) ───
#
# 워커(study_weakness)가 산출한 최신 eid_study_weakness 스냅샷을 '학습 진단 코치'(study overlay)
# 로 번역. 약점/태도 '판정'은 코드 derived(스냅샷) — LLM 은 스냅샷 블록 값만 인용(환각 약점 차단).
# compose("study_diagnosis") = persona+rules+study overlay(+{placeholder}) → 표면이 블록 substitute.
DIAGNOSIS_TIMEOUT_S = settings.llm_call_timeout_s
class StudyDiagnosisResponse(BaseModel):
status: str # ready | none
content: str | None = None
model: str | None = None
generated_at: datetime | None = None
snapshot_at: datetime | None = None
review_set_draft_id: int | None = None
@router.post("/diagnosis/generate", response_model=StudyDiagnosisResponse)
async def generate_study_diagnosis(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""누적 학습 약점/태도 진단(학습 진단 코치). 최신 약점 스냅샷을 코치 언어로 번역만.
워커 미가동(스냅샷 부재)이면 status='none''아직 진단 데이터 없음' 명시(빈약속/추측 회피).
"""
snap = (
await session.execute(
select(EidStudyWeakness)
.where(EidStudyWeakness.user_id == user.id, EidStudyWeakness.status == "active")
.order_by(EidStudyWeakness.created_at.desc())
.limit(1)
)
).scalar_one_or_none()
if snap is None:
return StudyDiagnosisResponse(status="none")
draft = (
await session.execute(
select(EidReviewSetDraft)
.where(
EidReviewSetDraft.user_id == user.id,
EidReviewSetDraft.source_weakness_id == snap.id, # 이 스냅샷이 산출한 draft만(W3 review #5)
)
.order_by(EidReviewSetDraft.created_at.desc())
.limit(1)
)
).scalar_one_or_none()
weakness_block = format_weakness_block(
snap.weaknesses or [], shallow_overall=snap.is_shallow_sample
)
if draft is not None and draft.question_ids:
weakness_block += (
f"\n《권장 복습세트 초안》 set #{draft.id} · {len(draft.question_ids)}문항 "
f"(reason={draft.reason}) — 사용자 1클릭 확인 후에만 실제 편성. 자율 편성 금지."
)
habit_block = format_habit_block(snap.habit_signals or {})
# compose 는 study overlay(placeholder 포함)를 system 에 넣음 → 표면이 placeholder 를 실데이터로 치환.
composed = compose("study_diagnosis", task="")
# fail-closed: overlay degrade(placeholder 부재)면 스냅샷 없이 LLM 돌릴 때 약점 날조 위험 →
# 진단 생략(status='none'). weakness·habit 두 placeholder 다 확인(W3 review #4).
if "{weakness_snapshot_block}" not in composed or "{habit_signal_block}" not in composed:
logger.error(
"study_diagnosis: study overlay degraded — placeholder 부재, 진단 생략(fail-closed) user=%s",
user.id,
)
return StudyDiagnosisResponse(status="none")
system = (
composed
.replace("{weakness_snapshot_block}", weakness_block)
.replace("{habit_signal_block}", habit_block)
)
prompt = (
"누적 학습 이력을 근거로 내 약점 토픽과 학습 태도를 진단해줘. "
"위 《약점 스냅샷》·《태도 신호》 블록에 있는 값만 인용하고, 블록에 없는 토픽·수치·약점명은 "
"만들지 마라. 약점 Top-N + 각 구체 근거 + (있으면) 권장 복습세트 초안을 제시하고, "
"각 토픽의 tier 가 정한 강도를 넘기지 마라(라벨=방향, tier=긴급도)."
)
ai_client = EidAIClient() # 이드 표면 = egress 코드층 박탈(call_primary only, W4-1)
raw_text: str | None = None
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(DIAGNOSIS_TIMEOUT_S):
raw_text = await ai_client.call_primary(prompt, system=system)
except asyncio.TimeoutError:
logger.warning("study_diagnosis_mlx_timeout user=%s", user.id)
except Exception:
logger.exception("study_diagnosis_mlx_failed user=%s", user.id)
finally:
await ai_client.close()
if not raw_text or not raw_text.strip():
raise HTTPException(status_code=503, detail="진단 생성 실패 (LLM)")
primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary"
return StudyDiagnosisResponse(
status="ready",
content=strip_thinking(raw_text).strip(),
model=f"mlx:{primary_name}",
generated_at=datetime.now(timezone.utc),
snapshot_at=snap.source_generated_at,
review_set_draft_id=draft.id if draft else None,
)
# ─── PR-10: 문제풀이 세션 (quiz_session) lifecycle ───
#
# 한 토픽당 in_progress 1개. 출제 시 session 행 생성 + question_ids 스냅샷.
# 풀이 중 attempt 마다 cursor + count 증가 (study_questions.submit_attempt 가 같은 트랜잭션에서).
# 마지막 문제 풀이 후 status='done' + finished_at 박힘.
class QuizMode(str, Enum):
"""PR-12-B: 출제 모드. 1차는 random 만. PR-12-C 에서 frequent_focus / wrong_variants 추가 예약."""
random = "random"
class QuizSessionStartRequest(BaseModel):
target_per_subject: int = Field(default=20, ge=1, le=100)
subject: str | None = Field(default=None, max_length=120)
scope: str | None = Field(default=None, max_length=200)
wrong_only: bool = False
abandon_existing: bool = False # true 면 기존 in_progress 세션을 abandoned 로 마감 후 새로
quiz_mode: QuizMode = QuizMode.random
# Phase 1-E: bucket + stage 비율 기반 선별. stage 설정 시 기존 _select_questions_for_topic
# 우회하고 select_questions_for_quiz 사용. size 가 명시되면 size 만큼 출제 (subject 무관).
stage: str | None = Field(default=None, pattern="^(intro|learning|pre_exam)$")
size: int | None = Field(default=None, ge=1, le=200)
# Phase 2-E: 복습함에서 명시 선택된 문제로 세션. 우선순위 stage > question_ids > 기존 경로.
# 출제 순서는 클라이언트가 보낸 그대로 보존 (사용자 인지 순서). 한도 200, 중복 제거됨.
question_ids: list[int] | None = Field(default=None)
class QuizSessionSummary(BaseModel):
"""진행 카드 + 결과 카드 공통 (가벼운 메타). 문제 본문은 상세 endpoint 에서."""
id: int
status: str
cursor: int
total: int
correct_count: int
wrong_count: int
unsure_count: int
target_per_subject: int
subject_filter: str | None
wrong_only: bool
quiz_mode: str # PR-12-B: 세션 메타. 향후 모드별 정답률 통계용.
subject_distribution: dict
created_at: datetime
updated_at: datetime
finished_at: datetime | None
# 결과 카드 헤더 "미확인 N건" 용 — done 세션에서만 의미.
unreviewed_wrong_unsure_count: int = 0
# Phase 2-B: finalize 시점 스냅샷 (DB 영속화 — 세션 종료 후 변하지 않음).
newly_correct_count: int = 0
relapsed_count: int = 0
recovered_count: int = 0
chronic_remaining_count: int = 0
# Phase 2-B: 동적 (지금 시점) 할 일 카운트. 결과 화면에서 다른 세션이 끼어도 fresh.
pending_review_count: int = 0
chronic_count: int = 0
regressed_count: int = 0
class QuizSessionListResponse(BaseModel):
active: QuizSessionSummary | None
recent_done: list[QuizSessionSummary]
async def _build_session_summary(
s: StudyQuizSession,
session: AsyncSession,
*,
include_progress_counts: bool = False,
) -> QuizSessionSummary:
"""status='done' 일 때 미확인 카운트 + (옵션) 지금 시점 progress 기반 할 일 카운트 계산.
include_progress_counts=True 면 결과 화면 헤더용 동적 카운트 3종 (pending_review/chronic/regressed)
SQL 3회 추가. 목록 endpoint 에서는 호출당 비용을 피하려고 default False.
"""
unreviewed = 0
if s.status == "done":
row = (
await session.execute(
select(func.count())
.select_from(StudyQuestionAttempt)
.where(
StudyQuestionAttempt.quiz_session_id == s.id,
StudyQuestionAttempt.outcome.in_(("wrong", "unsure")),
StudyQuestionAttempt.reviewed_at.is_(None),
)
)
).scalar() or 0
unreviewed = int(row)
pending_review_count = 0
chronic_count = 0
regressed_count = 0
if include_progress_counts:
from models.study_question_progress import StudyQuestionProgress
pr_row = (
await session.execute(
select(func.count())
.select_from(StudyQuestionProgress)
.where(
StudyQuestionProgress.user_id == s.user_id,
StudyQuestionProgress.study_topic_id == s.study_topic_id,
StudyQuestionProgress.last_outcome.in_(("wrong", "unsure")),
sql_or(
StudyQuestionProgress.last_reviewed_at.is_(None),
and_(
StudyQuestionProgress.last_reviewed_at.is_not(None),
StudyQuestionProgress.last_attempted_at.is_not(None),
StudyQuestionProgress.last_reviewed_at
< StudyQuestionProgress.last_attempted_at,
),
),
)
)
).scalar() or 0
pending_review_count = int(pr_row)
ch_row = (
await session.execute(
select(func.count())
.select_from(StudyQuestionProgress)
.where(
StudyQuestionProgress.user_id == s.user_id,
StudyQuestionProgress.study_topic_id == s.study_topic_id,
StudyQuestionProgress.pattern_state == "chronic_wrong",
)
)
).scalar() or 0
chronic_count = int(ch_row)
rg_row = (
await session.execute(
select(func.count())
.select_from(StudyQuestionProgress)
.where(
StudyQuestionProgress.user_id == s.user_id,
StudyQuestionProgress.study_topic_id == s.study_topic_id,
StudyQuestionProgress.pattern_state == "regressed",
)
)
).scalar() or 0
regressed_count = int(rg_row)
return QuizSessionSummary(
id=s.id,
status=s.status,
cursor=s.cursor,
total=len(s.question_ids or []),
correct_count=s.correct_count,
wrong_count=s.wrong_count,
unsure_count=s.unsure_count,
target_per_subject=s.target_per_subject,
subject_filter=s.subject_filter,
wrong_only=s.wrong_only,
quiz_mode=s.quiz_mode,
subject_distribution=s.subject_distribution or {},
created_at=s.created_at,
updated_at=s.updated_at,
finished_at=s.finished_at,
unreviewed_wrong_unsure_count=unreviewed,
newly_correct_count=s.newly_correct_count,
relapsed_count=s.relapsed_count,
recovered_count=s.recovered_count,
chronic_remaining_count=s.chronic_remaining_count,
pending_review_count=pending_review_count,
chronic_count=chronic_count,
regressed_count=regressed_count,
)
async def _select_questions_for_topic(
session: AsyncSession,
user: User,
topic_id: int,
*,
subject: str | None,
scope: str | None,
target_per_subject: int,
wrong_only: bool,
apply_spacing: bool = False,
) -> tuple[list[int], dict[str, int]]:
"""과목별 균등 추출 + (옵션) PR-12-B type spacing.
apply_spacing=True 면 subject bucket 별로 buffer × SPACING_BUFFER_RATIO 만큼
뽑은 뒤 apply_type_spacing 으로 같은 유형 과밀 방지. 회차 무관.
"""
from services.study.related_types import (
SPACING_BUFFER_RATIO,
apply_type_spacing,
make_spacing_candidates_from_rows,
)
base_filter = and_(
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
StudyQuestion.is_active.is_(True),
)
wrong_qids: set[int] | None = None
if wrong_only:
latest = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.is_correct,
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_topic_id == topic_id,
)
.order_by(
StudyQuestionAttempt.study_question_id,
StudyQuestionAttempt.answered_at.desc(),
)
.distinct(StudyQuestionAttempt.study_question_id)
)
).all()
wrong_qids = {r.study_question_id for r in latest if r.is_correct is False}
if not wrong_qids:
return [], {}
if subject is not None:
subjects: list[str | None] = [subject]
else:
subj_query = (
select(StudyQuestion.subject)
.where(base_filter)
.distinct()
)
if scope is not None:
subj_query = subj_query.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
subj_query = subj_query.where(StudyQuestion.id.in_(wrong_qids))
subjects = [r[0] for r in (await session.execute(subj_query)).all()]
if not subjects:
return [], {}
selected_ids: list[int] = []
distribution: dict[str, int] = {}
for subj in subjects:
if apply_spacing:
# spacing buffer × ratio. subject bucket 안에서만 cosine 비교.
buffer_limit = max(target_per_subject, int(target_per_subject * SPACING_BUFFER_RATIO))
sub_q = select(
StudyQuestion.id,
StudyQuestion.embedding,
StudyQuestion.embedding_status,
).where(base_filter)
else:
sub_q = select(StudyQuestion.id).where(base_filter)
buffer_limit = target_per_subject
if subj is None:
sub_q = sub_q.where(StudyQuestion.subject.is_(None))
else:
sub_q = sub_q.where(StudyQuestion.subject == subj)
if scope is not None:
sub_q = sub_q.where(StudyQuestion.scope == scope)
if wrong_qids is not None:
sub_q = sub_q.where(StudyQuestion.id.in_(wrong_qids))
sub_q = sub_q.order_by(func.random()).limit(buffer_limit)
rows = (await session.execute(sub_q)).all()
if apply_spacing:
# rows 가 _SpacingCandidate 의 row protocol 만족 (id/embedding/embedding_status).
spacing_cands = make_spacing_candidates_from_rows(rows)
spacing_res = await apply_type_spacing(
spacing_cands,
target=target_per_subject,
subject_label=subj or "",
)
picked = spacing_res.selected_ids
else:
picked = [r[0] for r in rows[:target_per_subject]]
if picked:
selected_ids.extend(picked)
distribution[subj or ""] = len(picked)
_random.shuffle(selected_ids)
return selected_ids, distribution
@router.get("/{topic_id}/quiz-sessions", response_model=QuizSessionListResponse)
async def list_quiz_sessions(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
limit: int = Query(20, ge=1, le=100),
):
"""진행 중 + 최근 완료 N건. 통합뷰 진행/결과 카드용."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
active_row = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "in_progress",
)
)
).scalar_one_or_none()
active_summary = await _build_session_summary(active_row, session) if active_row else None
done_rows = (
await session.execute(
select(StudyQuizSession)
.where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "done",
)
.order_by(StudyQuizSession.finished_at.desc().nulls_last())
.limit(limit)
)
).scalars().all()
done = [await _build_session_summary(s, session) for s in done_rows]
return QuizSessionListResponse(active=active_summary, recent_done=done)
@router.post("/{topic_id}/quiz-sessions", response_model=QuizSessionSummary, status_code=201)
async def start_quiz_session(
topic_id: int,
body: QuizSessionStartRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""새 세션 시작. 기존 in_progress 가 있으면:
- body.abandon_existing=false → 기존 세션 그대로 반환 (이어풀기).
- body.abandon_existing=true → 기존을 abandoned 로 마감 후 새로 출제.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
existing = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "in_progress",
)
)
).scalar_one_or_none()
if existing is not None:
if not body.abandon_existing:
# 이어풀기 — 기존 그대로 반환.
return await _build_session_summary(existing, session)
existing.status = "abandoned"
existing.updated_at = datetime.now(timezone.utc)
await session.flush()
# Phase 1-E: stage 명시 시 bucket + 비율 기반 선별 (단일 풀이 진입점 vision).
# Phase 2-E: question_ids 명시 시 복습함 multi-select 경로 — 검증만 + 순서 보존.
# stage 미명시 시 기존 subject bucket + spacing (PR-12-B 호환).
if body.stage is not None:
from services.study.quiz_selection import select_questions_for_quiz
size = body.size or 100
try:
qids, distribution = await select_questions_for_quiz(
session,
user_id=user.id,
study_topic_id=topic_id,
stage=body.stage,
size=size,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not qids:
raise HTTPException(status_code=400, detail="출제 가능한 문제가 없습니다")
elif body.question_ids is not None:
# 중복 제거 (순서 보존)
seen: set[int] = set()
wanted: list[int] = []
for qid in body.question_ids:
if qid not in seen:
seen.add(qid)
wanted.append(qid)
if not wanted:
raise HTTPException(status_code=400, detail="문제를 1개 이상 선택해주세요")
if len(wanted) > 200:
raise HTTPException(status_code=400, detail="한 세션에 최대 200문제까지 선택 가능합니다")
# 유효성 검증 — user × topic 소속 + 미삭제. soft-deleted 또는 다른 user/topic 의 qid 는 제거.
valid_rows = (
await session.execute(
select(StudyQuestion.id, StudyQuestion.subject)
.where(
StudyQuestion.id.in_(wanted),
StudyQuestion.user_id == user.id,
StudyQuestion.study_topic_id == topic_id,
StudyQuestion.deleted_at.is_(None),
)
)
).all()
valid_set = {r.id for r in valid_rows}
qids = [qid for qid in wanted if qid in valid_set]
if not qids:
raise HTTPException(status_code=400, detail="선택한 문제가 이 주제에 존재하지 않습니다")
# subject distribution 계산 (결과 카드 통계용)
subject_by_id = {r.id: (r.subject or "(미분류)") for r in valid_rows}
distribution: dict[str, int] = {}
for qid in qids:
sub = subject_by_id.get(qid, "(미분류)")
distribution[sub] = distribution.get(sub, 0) + 1
else:
# 기존 PR-12-B 경로: subject bucket + type spacing.
apply_spacing = body.quiz_mode == QuizMode.random
qids, distribution = await _select_questions_for_topic(
session,
user,
topic_id,
subject=body.subject,
scope=body.scope,
target_per_subject=body.target_per_subject,
wrong_only=body.wrong_only,
apply_spacing=apply_spacing,
)
if not qids:
raise HTTPException(status_code=400, detail="출제 가능한 문제가 없습니다")
new_session = StudyQuizSession(
user_id=user.id,
study_topic_id=topic_id,
target_per_subject=body.target_per_subject,
subject_filter=body.subject,
wrong_only=body.wrong_only,
quiz_mode=body.quiz_mode.value,
question_ids=qids,
subject_distribution=distribution,
status="in_progress",
cursor=0,
)
session.add(new_session)
try:
await session.commit()
except IntegrityError:
# partial unique idx 충돌 — 동시에 다른 호출이 in_progress 만든 경우.
await session.rollback()
existing = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.user_id == user.id,
StudyQuizSession.study_topic_id == topic_id,
StudyQuizSession.status == "in_progress",
)
)
).scalar_one_or_none()
if existing is None:
raise HTTPException(status_code=409, detail="quiz_session 충돌 — 다시 시도하세요")
return await _build_session_summary(existing, session)
return await _build_session_summary(new_session, session)
class QuizSessionAttemptItem(BaseModel):
"""결과 카드 expand 시 카드별 메타 (attempt + 학습완료 상태)."""
attempt_id: int
question_id: int
selected_choice: int | None
correct_choice: int
outcome: str
answered_at: datetime
reviewed_at: datetime | None
class QuizSessionAnalysisOut(BaseModel):
"""Phase 4-B v1: 결과 화면 헤더 카드용. summary_md 박혔으면 본문 표시,
없으면 job_status / job_error_code 보고 placeholder 분기."""
summary_md: str | None
confidence: str | None
generated_at: datetime | None
is_stale: bool
job_status: str | None # 최신 job — 'pending'/'processing'/'completed'/'failed'/'skipped'
job_error_code: str | None # 최신 job 의 error_code (실패/skip 사유 노출용)
class QuizSessionDetailResponse(BaseModel):
summary: QuizSessionSummary
questions: list[dict] # ReviewQuestionItem 호환 shape
attempts: list[QuizSessionAttemptItem] # 풀이된 것만 (cursor 까지)
ai_session_analysis: QuizSessionAnalysisOut | None = None
def _attempt_stats_dict_default() -> dict:
return {"attempt_count": 0, "correct_count": 0, "wrong_count": 0}
@router.get("/{topic_id}/quiz-sessions/{session_id}", response_model=QuizSessionDetailResponse)
async def get_quiz_session(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""풀이 화면 / 결과 화면 공용. questions 는 출제 순서. attempts 는 cursor 까지 풀이된 것만."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
qs = await session.get(StudyQuizSession, session_id)
if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
qids = list(qs.question_ids or [])
questions_payload: list[dict] = []
if qids:
rows = (
await session.execute(
select(StudyQuestion).where(StudyQuestion.id.in_(qids))
)
).scalars().all()
by_id = {q.id: q for q in rows}
# 이미지 batch — 기존 study_questions._images_for_questions_batch 재사용 (sort_order 기준).
from api.study_questions import _images_for_questions_batch
images_typed = await _images_for_questions_batch(session, qids)
images_map: dict[int, list[dict]] = {
qid: [it.model_dump() for it in items] for qid, items in images_typed.items()
}
# attempt count batch (per question)
stat_rows = (
await session.execute(
select(
StudyQuestionAttempt.study_question_id,
func.count().label("total"),
func.coalesce(
func.sum(case((StudyQuestionAttempt.is_correct.is_(True), 1), else_=0)),
0,
).label("correct"),
)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.study_question_id.in_(qids),
)
.group_by(StudyQuestionAttempt.study_question_id)
)
).all()
stats_map: dict[int, dict] = {}
for r in stat_rows:
t = int(r.total)
c = int(r.correct)
stats_map[r.study_question_id] = {
"attempt_count": t, "correct_count": c, "wrong_count": t - c
}
# 출제 순서 그대로 nest
for qid in qids:
q = by_id.get(qid)
if q is None:
continue
questions_payload.append({
"id": q.id,
"question_text": q.question_text,
"choices": [
{"number": 1, "text": q.choice_1},
{"number": 2, "text": q.choice_2},
{"number": 3, "text": q.choice_3},
{"number": 4, "text": q.choice_4},
],
"subject": q.subject,
"scope": q.scope,
"exam_round": q.exam_round,
"explanation": q.explanation,
"stats": stats_map.get(q.id, _attempt_stats_dict_default()),
"images": images_map.get(q.id, []),
# done 세션 결과 화면용 — 정답 노출. in_progress 세션은 클라이언트가 cursor 까지만 표시.
"correct_choice": q.correct_choice if qs.status == "done" else None,
# Phase 4-A: 결과 헤더의 AI 풀이 진척 indicator 용. wrong/unsure 와 결합해 카운트.
"ai_explanation_status": q.ai_explanation_status,
})
# 이 세션의 attempts (cursor 까지). 같은 question 에 여러 attempts 있을 수 있지만,
# 이 세션 내에서는 정상 흐름상 question 당 1개. quiz_session_id 기준으로 가져옴.
attempt_rows = (
await session.execute(
select(StudyQuestionAttempt)
.where(
StudyQuestionAttempt.user_id == user.id,
StudyQuestionAttempt.quiz_session_id == qs.id,
)
.order_by(StudyQuestionAttempt.answered_at.asc())
)
).scalars().all()
attempts_payload = [
QuizSessionAttemptItem(
attempt_id=a.id,
question_id=a.study_question_id,
selected_choice=a.selected_choice,
correct_choice=a.correct_choice,
outcome=a.outcome,
answered_at=a.answered_at,
reviewed_at=a.reviewed_at,
)
for a in attempt_rows
]
summary = await _build_session_summary(qs, session, include_progress_counts=True)
# Phase 4-A: 결과 화면 GET fallback — finalize enqueue 누락 또는 worker 처리 전
# 사용자가 결과 들어온 경우 같은 wrong/unsure qid 를 idempotent backfill enqueue.
# 한 요청당 30 cap + non-blocking + debug 로그. 실패해도 GET 200 유지.
if qs.status == "done":
try:
from services.study.explanation_enqueue import enqueue_explanation_for_qids
wrong_unsure_qids = [
a.study_question_id for a in attempt_rows
if a.outcome in ("wrong", "unsure")
]
if wrong_unsure_qids:
res = await enqueue_explanation_for_qids(
session,
user_id=user.id,
qids=wrong_unsure_qids,
max_count=30, # GET fallback cap
)
await session.commit()
logger.debug(
"phase4a_get_backfill session=%s candidates=%s enqueued=%s skipped=%s",
qs.id, res["candidate_count"], res["enqueue_count"], res["skipped_count"],
)
except Exception as e:
logger.warning(
"phase4a_get_backfill_failed session=%s: %s: %s",
qs.id, type(e).__name__, e,
)
# Phase 4-B v1: ai_session_analysis 응답 (결과 캐시 LEFT JOIN + 최신 job)
ai_session_analysis: QuizSessionAnalysisOut | None = None
if qs.status == "done":
try:
from models.study_quiz_session_analysis import StudyQuizSessionAnalysis
from models.study_quiz_session_job import StudyQuizSessionJob
an_row = (
await session.execute(
select(StudyQuizSessionAnalysis).where(
StudyQuizSessionAnalysis.study_quiz_session_id == qs.id
)
)
).scalar_one_or_none()
latest_job = (
await session.execute(
select(StudyQuizSessionJob)
.where(StudyQuizSessionJob.study_quiz_session_id == qs.id)
.order_by(StudyQuizSessionJob.id.desc())
.limit(1)
)
).scalar_one_or_none()
if an_row is not None or latest_job is not None:
ai_session_analysis = QuizSessionAnalysisOut(
summary_md=an_row.summary_md if an_row else None,
confidence=an_row.confidence if an_row else None,
generated_at=an_row.generated_at if an_row else None,
is_stale=bool(an_row.is_stale) if an_row else False,
job_status=latest_job.status if latest_job else None,
job_error_code=latest_job.error_code if latest_job else None,
)
# GET fallback enqueue — 기존 analysis 또는 active job 없으면만 시도.
# best-effort: 실패가 GET 응답 깨지 않게 try/except.
try:
if an_row is None and (latest_job is None or latest_job.status not in ("pending", "processing")):
from services.study.session_analysis_enqueue import enqueue_session_analysis_auto
res = await enqueue_session_analysis_auto(
session, user_id=user.id, study_quiz_session_id=qs.id,
)
await session.commit()
logger.debug(
"phase4b_get_backfill session=%s enqueued=%s",
qs.id, res["enqueued"],
)
except Exception as e:
logger.warning(
"phase4b_get_backfill_failed session=%s: %s: %s",
qs.id, type(e).__name__, e,
)
except Exception as e:
logger.warning(
"phase4b_get_analysis_failed session=%s: %s: %s",
qs.id, type(e).__name__, e,
)
return QuizSessionDetailResponse(
summary=summary,
questions=questions_payload,
attempts=attempts_payload,
ai_session_analysis=ai_session_analysis,
)
class QuizSessionPatchRequest(BaseModel):
"""abandon → status='abandoned'. 다른 필드 패치는 현재 안 받음."""
action: str = Field(..., pattern="^(abandon)$")
@router.patch("/{topic_id}/quiz-sessions/{session_id}", response_model=QuizSessionSummary)
async def patch_quiz_session(
topic_id: int,
session_id: int,
body: QuizSessionPatchRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""현재 abandon 만 지원 — 진행 중 세션 포기 (다시 시작 시 사용)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
qs = await session.get(StudyQuizSession, session_id)
if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
if body.action == "abandon":
if qs.status != "in_progress":
raise HTTPException(status_code=409, detail=f"이미 {qs.status} 상태입니다")
qs.status = "abandoned"
qs.updated_at = datetime.now(timezone.utc)
await session.commit()
return await _build_session_summary(qs, session)
# ─── Phase 4-B v1: 세션 분석 재생성 ───
class RegenerateSummaryResponse(BaseModel):
enqueued: bool
reason: str | None = None # insufficient_attempts / already_active / not_done / not_found / race_lost
@router.post(
"/{topic_id}/quiz-sessions/{session_id}/regenerate-summary",
response_model=RegenerateSummaryResponse,
)
async def regenerate_session_summary(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""사용자 [재생성] 버튼 — wrong/unsure < 5 즉시 차단 + active job 차단 + is_stale 처리.
Plan ~/.claude/plans/nifty-sparking-spindle.md (Phase 4-B v1) 의 Manual 트리거 정책 그대로.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
qs = await session.get(StudyQuizSession, session_id)
if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
from services.study.session_analysis_enqueue import request_session_analysis_regenerate
res = await request_session_analysis_regenerate(
session, user_id=user.id, study_quiz_session_id=session_id,
)
await session.commit()
return RegenerateSummaryResponse(enqueued=res["enqueued"], reason=res.get("reason"))
# ─── PR-12-A: 반복 출제 / 유사 유형 배치 카운트 ───
class RelatedTypesBulkRequest(BaseModel):
question_ids: list[int] = Field(..., min_length=1, max_length=200)
class RelatedTypesBulkItem(BaseModel):
repeat_related_count: int
repeat_round_count: int
repeat_grade: str | None = None
similar_related_count: int
similar_round_count: int
class RelatedTypesBulkResponse(BaseModel):
# 입력 question_ids 전체 보존 — 권한 없음/임베딩 미준비/회차 미지정도 (0,0,0,0).
items: dict[int, RelatedTypesBulkItem]
@router.post("/{topic_id}/related-types-bulk", response_model=RelatedTypesBulkResponse)
async def related_types_bulk(
topic_id: int,
body: RelatedTypesBulkRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카드별 배지(round_count) 표시용 배치 카운트.
비교 대상 = 같은 topic 안 모든 ready 문제 (입력 qid 끼리만 비교 X).
응답 dict 는 입력 qid 전체 보존 — 누락 X.
"""
from services.study.related_types import classify_related_bulk
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
counts = await classify_related_bulk(
session,
user_id=user.id,
study_topic_id=topic_id,
question_ids=body.question_ids,
)
return RelatedTypesBulkResponse(
items={
qid: RelatedTypesBulkItem(
repeat_related_count=c.repeat_related_count,
repeat_round_count=c.repeat_round_count,
repeat_grade=c.repeat_grade,
similar_related_count=c.similar_related_count,
similar_round_count=c.similar_round_count,
)
for qid, c in counts.items()
}
)