Files
hyungi_document_server/app/api/study_topics.py
T
Hyungi Ahn d968b2d901 feat(study): 문제풀이 모드 개편 + 결과 분류 + 분야 설명 (PR-9)
- 라벨 "복습 시작" → "문제풀이"
- attempts.outcome 컬럼 + selected_choice nullable (correct/wrong/unsure)
- 풀이 중 정답·해설·AI·비슷한 문제 모두 비노출, 답 클릭 시 자동 진행
- "모르겠음" 5번째 옵션 추가
- 결과 화면 = 정답/틀린/모르겠음 3 카테고리 탭, 카드 클릭 expand
  - 틀린 → PR-3 AI 해설 (RAG)
  - 모르겠음 → 분야(subject+scope) 설명 AI 즉석 생성 + 캐시 (PR-9 신규)
- 분야 설명 RAG: 매핑 documents 청크 + 같은 분야 다른 문제·해설 → bge-reranker
- 마이그레이션 200~205 (single-statement, asyncpg 호환)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:58:35 +09:00

1213 lines
41 KiB
Python

"""학습 워크스페이스(study_topic) API — 필기 세션 + 자료(library document) 묶음 컨테이너
핵심 개념:
- study_topic 은 단순 폴더/태그가 아니라 학습 자산 1차 컨테이너 (학습 워크스페이스).
- 향후 단어장/오디오/문제 세트 같은 자산이 같은 컨테이너 아래로 들어올 수 있도록
응답 구조를 sections + stats 형태로 설계 (현재 sections 는 sessions / documents / questions 세 키).
- documents.category(자료실 UI 축) 와 직교한 별도 분류 축. 자료실 facet/카테고리는 미터치.
- StudySession.certification/subject/topic 은 보존 — 본 컨테이너 와 직교한 세부 메타.
설계 제약:
- study_type 은 강한 enum 미사용. RECOMMENDED_STUDY_TYPES 는 단순 권장값 (DB/Pydantic 강제 안 함).
- soft delete (deleted_at). active 행끼리만 (user_id, name) partial unique.
- 권한: 모든 쿼리에 user_id 강제. 다른 사용자 자료를 묶지 못함.
- 문서 매핑은 N:M (study_topic_documents). 세션 매핑은 1:N (study_sessions.study_topic_id).
- polymorphic 단일 study_topic_items 테이블은 만들지 않는다 (영구 금지).
"""
import asyncio
import logging
from datetime import datetime, timezone
from pathlib import Path as _Path
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, delete, func, select, text as sql_text, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, strip_thinking
from core.auth import get_current_user
from core.database import get_session
from core.library import LIBRARY_PREFIX, normalize_library_path
from models.document import Document
from models.study_session import StudySession
from models.study_topic import StudyTopic, StudyTopicDocument
from models.study_topic_subject_note import StudyTopicSubjectNote
from models.user import User
from services.search.llm_gate import get_mlx_gate
from services.study.subject_note_rag import (
SubjectNoteContext,
gather_subject_note_context,
render_evidence_block,
)
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── Suggested study_type values (UI guidance only; not an enforced enum) ───
RECOMMENDED_STUDY_TYPES: set[str] = {
    "certification",
    "language",
    "school",
    "work",
    "general",
}
# ─── Pydantic 스키마 ───
class StudyTopicCreate(BaseModel):
    """Request body for creating a study topic."""

    # Required display name, 1-120 characters.
    name: str = Field(min_length=1, max_length=120)
    description: str | None = None
    color: str | None = Field(default=None, max_length=20)
    # Free-form string (max 40 chars); not validated against RECOMMENDED_STUDY_TYPES here.
    study_type: str | None = Field(default=None, max_length=40)
    sort_order: int = 0
    # PR-6: exam metadata
    exam_round_size: int | None = Field(default=None, ge=1, le=300)
    exam_subjects: list[str] = []
class StudyTopicUpdate(BaseModel):
    """Request body for partially updating a study topic (PATCH).

    Every field is optional; update_study_topic consults `model_fields_set`
    to decide which of them the client actually sent.
    """

    name: str | None = Field(default=None, min_length=1, max_length=120)
    description: str | None = None
    color: str | None = Field(default=None, max_length=20)
    study_type: str | None = Field(default=None, max_length=40)
    sort_order: int | None = None
    # PR-6: exam metadata
    exam_round_size: int | None = Field(default=None, ge=1, le=300)
    exam_subjects: list[str] | None = None
class StudyTopicResponse(BaseModel):
    """Topic list response item — includes aggregate counts."""

    id: int
    name: str
    description: str | None
    color: str | None
    study_type: str | None
    sort_order: int
    # Aggregate counts; default to 0 when not supplied.
    session_count: int = 0
    document_count: int = 0
    question_count: int = 0
    # PR-6: exam metadata
    exam_round_size: int | None = None
    exam_subjects: list[str] = []
    created_at: datetime
    updated_at: datetime
class StudyTopicListResponse(BaseModel):
    """One page of topics plus the total number of matching topics."""

    # Topics on the requested page.
    items: list[StudyTopicResponse]
    # Count reported alongside the page (list_study_topics fills it with the
    # number of the user's active topics, ignoring limit/offset).
    total: int
class StudyTopicSessionSummary(BaseModel):
    """Session card payload for the detail view — minimal fields for rendering the unified view."""

    id: int
    study_type: str
    certification: str | None
    language_code: str | None
    learning_level: str | None
    subject: str | None
    topic: str | None
    mode: str
    repetition_count: int
    review_state: str | None
    created_at: datetime
    updated_at: datetime
class StudyTopicDocumentSummary(BaseModel):
    """Material card payload for the detail view.

    library_paths: used to group the unified view's materials section by
    category tree. These are the paths obtained by stripping the prefix from
    the `@library/<path>` tags in user_tags. A material may belong to several
    categories (multi-classification), but only the first path is used for
    tree grouping. An empty list means the "uncategorized" group.
    """

    id: int
    title: str | None
    file_format: str
    file_type: str
    category: str | None
    ai_domain: str | None
    importance: str | None
    # sort_order and linked_at come from the topic↔document mapping row,
    # not from the document itself (see get_study_topic).
    sort_order: int
    linked_at: datetime
    library_paths: list[str] = []
class StudyTopicQuestionSummary(BaseModel):
    """Question card payload for the detail view — answer and explanation are not exposed; body truncated to 80 characters."""

    id: int
    question_text: str
    subject: str | None
    scope: str | None
    exam_name: str | None
    exam_round: str | None
    is_active: bool
    # Number of attempts recorded for this question by the user.
    attempt_count: int
    # Correctness of the most recent attempt; None when never attempted.
    last_correct: bool | None
    created_at: datetime
class StudyTopicSections(BaseModel):
    """Extension-friendly dict — keys such as audio_assets / vocab_decks can be added later."""

    sessions: list[StudyTopicSessionSummary]
    documents: list[StudyTopicDocumentSummary]
    questions: list[StudyTopicQuestionSummary] = []
class StudyTopicStats(BaseModel):
    """Asset counts — exposed as 0 up front so follow-up PRs only need to fill in the fields.

    question_count = number of individual questions (PR-2).
    question_set_count = number of round / mock-exam bundles from a follow-up
    PR (the two mean different things).
    """

    session_count: int
    document_count: int
    question_count: int = 0
    audio_count: int = 0
    vocab_count: int = 0
    question_set_count: int = 0
class StudyTopicMeta(BaseModel):
    """Topic metadata without aggregate counts (see _meta_from_topic)."""

    id: int
    name: str
    description: str | None
    color: str | None
    study_type: str | None
    sort_order: int
    # PR-6: exam metadata
    exam_round_size: int | None = None
    exam_subjects: list[str] = []
    created_at: datetime
    updated_at: datetime
class StudyTopicDetailResponse(BaseModel):
    """Unified detail view: topic metadata, per-asset sections and their counts."""

    topic: StudyTopicMeta
    sections: StudyTopicSections
    stats: StudyTopicStats
class StudyTopicDocumentLinkRequest(BaseModel):
    """Request body for linking documents to a topic by id."""

    # Between 1 and 100 document ids per request.
    document_ids: list[int] = Field(min_length=1, max_length=100)
class StudyTopicDocumentLinkResponse(BaseModel):
    """Outcome of a bulk link request, partitioned by what happened to each id."""

    # Ids that were newly linked by this request.
    linked: list[int]
    # Ids that were already linked to the topic.
    skipped_existing: list[int]
    # Ids with no matching non-deleted document.
    skipped_not_found: list[int]
class StudyTopicDocumentByPathRequest(BaseModel):
    """Bulk-add request for a category tree node; maps the materials under a path via prefix matching.

    include_subtree=True covers the path and every category beneath it;
    False matches the exact path only.
    """

    path: str = Field(min_length=1, max_length=500)
    include_subtree: bool = True
class StudyTopicDocumentByPathResponse(BaseModel):
    """Outcome of a by-path bulk link, reported as counts rather than id lists."""

    # Documents newly linked by this request.
    linked_count: int
    # Matching documents that were already linked to the topic.
    skipped_existing_count: int
    # All documents that matched the path filter.
    total_in_path: int
    # The normalized form of the requested path.
    path: str
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
    """Return `topic` if it exists, belongs to `user` and is not soft-deleted.

    Raises:
        HTTPException: 404 in every other case. An ownership mismatch is also
            reported as 404 so the response does not reveal that the topic exists.
    """
    accessible = (
        topic is not None
        and topic.user_id == user.id
        and topic.deleted_at is None
    )
    if accessible:
        return topic
    raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
def _meta_from_topic(t: StudyTopic) -> StudyTopicMeta:
    """Build the count-free metadata payload from a StudyTopic row."""
    copied_as_is = (
        "id",
        "name",
        "description",
        "color",
        "study_type",
        "sort_order",
        "exam_round_size",
        "created_at",
        "updated_at",
    )
    payload = {attr: getattr(t, attr) for attr in copied_as_is}
    # A falsy exam_subjects (e.g. None) is exposed as an empty list.
    payload["exam_subjects"] = t.exam_subjects or []
    return StudyTopicMeta(**payload)
# ─── 엔드포인트 ───
@router.get("/", response_model=StudyTopicListResponse)
async def list_study_topics(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """List the user's active topics with session, document and question counts.

    `total` is the number of the user's active topics regardless of
    limit/offset; `items` is the requested page ordered by sort_order
    ascending, then id descending.
    """
    from models.study_question import StudyQuestion as _SQ

    def _count_per_topic(topic_col, *criteria):
        # Grouped subquery yielding one (topic_id, c) row per topic id.
        return (
            select(topic_col.label("topic_id"), func.count().label("c"))
            .where(*criteria)
            .group_by(topic_col)
            .subquery()
        )

    session_counts = _count_per_topic(
        StudySession.study_topic_id,
        StudySession.user_id == user.id,
        StudySession.study_topic_id.is_not(None),
    )
    document_counts = _count_per_topic(
        StudyTopicDocument.study_topic_id,
        StudyTopicDocument.user_id == user.id,
    )
    # PR-2 — question count excludes soft-deleted questions.
    question_counts = _count_per_topic(
        _SQ.study_topic_id,
        _SQ.user_id == user.id,
        _SQ.deleted_at.is_(None),
    )

    owned_and_active = (
        StudyTopic.user_id == user.id,
        StudyTopic.deleted_at.is_(None),
    )
    # LEFT JOINs keep topics that have no sessions/documents/questions;
    # COALESCE turns their missing counts into 0.
    page_stmt = (
        select(
            StudyTopic,
            func.coalesce(session_counts.c.c, 0).label("session_count"),
            func.coalesce(document_counts.c.c, 0).label("document_count"),
            func.coalesce(question_counts.c.c, 0).label("question_count"),
        )
        .outerjoin(session_counts, session_counts.c.topic_id == StudyTopic.id)
        .outerjoin(document_counts, document_counts.c.topic_id == StudyTopic.id)
        .outerjoin(question_counts, question_counts.c.topic_id == StudyTopic.id)
        .where(*owned_and_active)
        .order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
        .offset(offset)
        .limit(limit)
    )
    count_stmt = select(func.count()).select_from(
        select(StudyTopic.id).where(*owned_and_active).subquery()
    )

    total = (await session.execute(count_stmt)).scalar() or 0
    page = (await session.execute(page_stmt)).all()

    items = []
    for topic, n_sessions, n_documents, n_questions in page:
        items.append(
            StudyTopicResponse(
                id=topic.id,
                name=topic.name,
                description=topic.description,
                color=topic.color,
                study_type=topic.study_type,
                sort_order=topic.sort_order,
                session_count=int(n_sessions),
                document_count=int(n_documents),
                question_count=int(n_questions),
                exam_round_size=topic.exam_round_size,
                exam_subjects=topic.exam_subjects or [],
                created_at=topic.created_at,
                updated_at=topic.updated_at,
            )
        )
    return StudyTopicListResponse(items=items, total=total)
class ExamRoundProgress(BaseModel):
    """Progress of a single exam round within a topic."""

    exam_round: str
    question_count: int
    max_question_number: int  # 0 = every exam_question_number is NULL
    next_question_number: int | None  # None once the round is complete
    is_complete: bool
class ExamRoundsResponse(BaseModel):
    """Per-round progress list together with the topic's configured round size."""

    exam_round_size: int | None  # topic metadata; None = not configured
    items: list[ExamRoundProgress]
@router.get("/{topic_id}/exam-rounds", response_model=ExamRoundsResponse)
async def list_exam_rounds(
    topic_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Aggregate per-round progress inside a topic (progress display + resume UX).

    is_complete = exam_round_size is set AND question_count >= exam_round_size.
    next_question_number = highest question number + 1, or None once complete.
    """
    topic = _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)
    from models.study_question import StudyQuestion as _SQ

    per_round_stmt = (
        select(
            _SQ.exam_round,
            func.count().label("question_count"),
            # Rounds whose question numbers are all NULL report 0.
            func.coalesce(func.max(_SQ.exam_question_number), 0).label("max_qnum"),
        )
        .where(
            _SQ.user_id == user.id,
            _SQ.study_topic_id == topic_id,
            _SQ.deleted_at.is_(None),
            _SQ.exam_round.is_not(None),
        )
        .group_by(_SQ.exam_round)
        .order_by(_SQ.exam_round.desc())
    )
    per_round = (await session.execute(per_round_stmt)).all()

    round_size = topic.exam_round_size
    progress: list[ExamRoundProgress] = []
    for row in per_round:
        count = int(row.question_count)
        highest = int(row.max_qnum)
        done = round_size is not None and count >= round_size
        if done:
            upcoming = None
        else:
            # Numbering resumes after the highest number seen, starting at 1.
            upcoming = max(highest, 0) + 1
        progress.append(
            ExamRoundProgress(
                exam_round=row.exam_round,
                question_count=count,
                max_question_number=highest,
                next_question_number=upcoming,
                is_complete=done,
            )
        )
    return ExamRoundsResponse(exam_round_size=round_size, items=progress)
@router.get("/by-document/{document_id}", response_model=list[StudyTopicMeta])
async def list_topics_for_document(
    document_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """List the active topics this document is linked to.

    Backs the "topics this material belongs to" badge / context menu on
    /study/sources cards.
    """
    stmt = (
        select(StudyTopic)
        .join(StudyTopicDocument, StudyTopicDocument.study_topic_id == StudyTopic.id)
        .where(
            StudyTopicDocument.document_id == document_id,
            StudyTopicDocument.user_id == user.id,
            StudyTopic.deleted_at.is_(None),
        )
        .order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
    )
    result = await session.execute(stmt)
    linked_topics = result.scalars().all()
    return list(map(_meta_from_topic, linked_topics))
@router.post("/", response_model=StudyTopicResponse, status_code=201)
async def create_study_topic(
    body: StudyTopicCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create a topic.

    A soft-deleted topic with the same name does not block creation, because
    the partial unique index only covers active rows.

    Raises:
        HTTPException: 422 when the name is empty after trimming whitespace;
            409 when an active topic with the same name already exists.
    """
    # min_length=1 is checked before trimming, so a whitespace-only name would
    # otherwise be stored as an empty string.
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="학습 주제 이름은 비워둘 수 없습니다")

    topic = StudyTopic(
        user_id=user.id,
        name=name,
        description=body.description,
        color=body.color,
        study_type=body.study_type,
        sort_order=body.sort_order,
        exam_round_size=body.exam_round_size,
        exam_subjects=body.exam_subjects or [],
    )
    session.add(topic)
    try:
        # commit() flushes pending rows itself, so no separate flush is needed.
        await session.commit()
    except IntegrityError as exc:
        await session.rollback()
        raise HTTPException(
            status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다"
        ) from exc
    # A freshly created topic has no linked assets yet.
    return StudyTopicResponse(
        id=topic.id,
        name=topic.name,
        description=topic.description,
        color=topic.color,
        study_type=topic.study_type,
        sort_order=topic.sort_order,
        session_count=0,
        document_count=0,
        question_count=0,
        exam_round_size=topic.exam_round_size,
        exam_subjects=topic.exam_subjects or [],
        created_at=topic.created_at,
        updated_at=topic.updated_at,
    )
@router.get("/{topic_id}", response_model=StudyTopicDetailResponse)
async def get_study_topic(
    topic_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Unified view: topic metadata + sessions + materials + questions + stats.

    The response is shaped so that follow-up PRs (audio / vocabulary /
    question sets) only need to add keys under sections / stats.

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned by
            another user.
    """
    topic = await session.get(StudyTopic, topic_id)
    topic = _verify_topic_ownership(topic, user)

    # Sessions — newest first.
    sess_rows = (
        await session.execute(
            select(StudySession)
            .where(
                StudySession.user_id == user.id,
                StudySession.study_topic_id == topic_id,
            )
            .order_by(StudySession.created_at.desc(), StudySession.id.desc())
        )
    ).scalars().all()
    sessions_payload = [
        StudyTopicSessionSummary(
            id=s.id,
            study_type=s.study_type,
            certification=s.certification,
            language_code=s.language_code,
            learning_level=s.learning_level,
            subject=s.subject,
            topic=s.topic,
            mode=s.mode,
            repetition_count=s.repetition_count,
            review_state=s.review_state,
            created_at=s.created_at,
            updated_at=s.updated_at,
        )
        for s in sess_rows
    ]

    # Materials — ordered by the mapping's sort_order, then newest link first.
    # Soft-deleted documents are left out.
    doc_rows = (
        await session.execute(
            select(Document, StudyTopicDocument)
            .join(
                StudyTopicDocument,
                and_(
                    StudyTopicDocument.document_id == Document.id,
                    StudyTopicDocument.study_topic_id == topic_id,
                    StudyTopicDocument.user_id == user.id,
                ),
            )
            .where(Document.deleted_at.is_(None))
            .order_by(
                StudyTopicDocument.sort_order.asc(),
                StudyTopicDocument.created_at.desc(),
            )
        )
    ).all()

    def _extract_library_paths(tags: list | None) -> list[str]:
        # Keep only string tags carrying LIBRARY_PREFIX, with the prefix removed.
        if not tags:
            return []
        return [
            t[len(LIBRARY_PREFIX):]
            for t in tags
            if isinstance(t, str) and t.startswith(LIBRARY_PREFIX)
        ]

    documents_payload = [
        StudyTopicDocumentSummary(
            id=d.id,
            title=d.title,
            file_format=d.file_format,
            file_type=str(d.file_type) if d.file_type is not None else "immutable",
            category=str(d.category) if d.category is not None else None,
            ai_domain=d.ai_domain,
            importance=d.importance,
            sort_order=link.sort_order,
            linked_at=link.created_at,
            library_paths=_extract_library_paths(d.user_tags),
        )
        for d, link in doc_rows
    ]

    # Questions (PR-2) — body truncated, answer/explanation not exposed.
    # Attempt statistics are fetched in two batch queries.
    from models.study_question import StudyQuestion, StudyQuestionAttempt

    q_rows = (
        await session.execute(
            select(StudyQuestion)
            .where(
                StudyQuestion.user_id == user.id,
                StudyQuestion.study_topic_id == topic_id,
                StudyQuestion.deleted_at.is_(None),
            )
            .order_by(StudyQuestion.created_at.desc(), StudyQuestion.id.desc())
        )
    ).scalars().all()
    qids = [q.id for q in q_rows]
    q_attempt_count: dict[int, int] = {}
    q_last_correct: dict[int, bool] = {}
    if qids:
        cnt_rows = (
            await session.execute(
                select(
                    StudyQuestionAttempt.study_question_id,
                    func.count().label("total"),
                )
                .where(
                    StudyQuestionAttempt.user_id == user.id,
                    StudyQuestionAttempt.study_question_id.in_(qids),
                )
                .group_by(StudyQuestionAttempt.study_question_id)
            )
        ).all()
        for r in cnt_rows:
            q_attempt_count[r.study_question_id] = int(r.total)

        # DISTINCT ON question id, ordered by answered_at descending, keeps
        # only the most recent attempt of each question.
        latest_rows = (
            await session.execute(
                select(
                    StudyQuestionAttempt.study_question_id,
                    StudyQuestionAttempt.is_correct,
                )
                .where(
                    StudyQuestionAttempt.user_id == user.id,
                    StudyQuestionAttempt.study_question_id.in_(qids),
                )
                .order_by(
                    StudyQuestionAttempt.study_question_id,
                    StudyQuestionAttempt.answered_at.desc(),
                )
                .distinct(StudyQuestionAttempt.study_question_id)
            )
        ).all()
        for r in latest_rows:
            q_last_correct[r.study_question_id] = bool(r.is_correct)

    def _truncate_q(text: str, n: int = 80) -> str:
        # The original appended an empty string here, so a shortened body was
        # indistinguishable from a complete one; mark truncation explicitly.
        return text if len(text) <= n else text[:n].rstrip() + "…"

    questions_payload = [
        StudyTopicQuestionSummary(
            id=q.id,
            question_text=_truncate_q(q.question_text, 80),
            subject=q.subject,
            scope=q.scope,
            exam_name=q.exam_name,
            exam_round=q.exam_round,
            is_active=q.is_active,
            attempt_count=q_attempt_count.get(q.id, 0),
            last_correct=q_last_correct.get(q.id),
            created_at=q.created_at,
        )
        for q in q_rows
    ]

    return StudyTopicDetailResponse(
        topic=_meta_from_topic(topic),
        sections=StudyTopicSections(
            sessions=sessions_payload,
            documents=documents_payload,
            questions=questions_payload,
        ),
        stats=StudyTopicStats(
            session_count=len(sessions_payload),
            document_count=len(documents_payload),
            question_count=len(questions_payload),
            # Filled in by follow-up PRs.
            audio_count=0,
            vocab_count=0,
            question_set_count=0,
        ),
    )
@router.patch("/{topic_id}", response_model=StudyTopicResponse)
async def update_study_topic(
    topic_id: int,
    body: StudyTopicUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Partially update a topic; only fields present in the request are applied.

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned by
            another user; 422 when the new name is empty after trimming;
            409 when another active topic already uses the new name.
    """
    topic = await session.get(StudyTopic, topic_id)
    topic = _verify_topic_ownership(topic, user)
    fields_set = body.model_fields_set

    if "name" in fields_set and body.name is not None:
        # min_length=1 is checked before trimming, so reject whitespace-only
        # names here, before any attribute has been modified.
        new_name = body.name.strip()
        if not new_name:
            raise HTTPException(status_code=422, detail="학습 주제 이름은 비워둘 수 없습니다")
        topic.name = new_name
    # These fields may be explicitly set to null by the client.
    for fname in {"description", "color", "study_type", "sort_order"} & fields_set:
        setattr(topic, fname, getattr(body, fname))
    # PR-6: exam metadata. A null exam_subjects keeps the existing value;
    # an empty list is an explicit clear.
    if "exam_round_size" in fields_set:
        topic.exam_round_size = body.exam_round_size
    if "exam_subjects" in fields_set and body.exam_subjects is not None:
        topic.exam_subjects = body.exam_subjects
    topic.updated_at = datetime.now(timezone.utc)

    try:
        await session.commit()
    except IntegrityError as exc:
        await session.rollback()
        raise HTTPException(
            status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다"
        ) from exc

    # Counts for the response are queried separately.
    sc = (await session.execute(
        select(func.count())
        .select_from(StudySession)
        .where(StudySession.user_id == user.id, StudySession.study_topic_id == topic.id)
    )).scalar() or 0
    # Filter by user_id as well, matching the list and detail queries.
    dc = (await session.execute(
        select(func.count())
        .select_from(StudyTopicDocument)
        .where(
            StudyTopicDocument.user_id == user.id,
            StudyTopicDocument.study_topic_id == topic.id,
        )
    )).scalar() or 0
    from models.study_question import StudyQuestion as _SQ2
    qc = (await session.execute(
        select(func.count())
        .select_from(_SQ2)
        .where(
            _SQ2.user_id == user.id,
            _SQ2.study_topic_id == topic.id,
            _SQ2.deleted_at.is_(None),
        )
    )).scalar() or 0

    return StudyTopicResponse(
        id=topic.id,
        name=topic.name,
        description=topic.description,
        color=topic.color,
        study_type=topic.study_type,
        sort_order=topic.sort_order,
        session_count=int(sc),
        document_count=int(dc),
        question_count=int(qc),
        exam_round_size=topic.exam_round_size,
        exam_subjects=topic.exam_subjects or [],
        created_at=topic.created_at,
        updated_at=topic.updated_at,
    )
@router.delete("/{topic_id}", status_code=204)
async def delete_study_topic(
    topic_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Soft-delete a topic, detaching its sessions and removing its document links.

    The partial unique index only looks at rows with deleted_at IS NULL, so
    the name can be reused afterwards.
    """
    topic = _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    # The DB-level ON DELETE SET NULL only fires on a hard delete, so the
    # sessions are detached explicitly.
    detach_sessions = (
        update(StudySession)
        .where(
            StudySession.user_id == user.id,
            StudySession.study_topic_id == topic_id,
        )
        .values(study_topic_id=None)
    )
    # Likewise the FK CASCADE is not triggered by a soft delete, so the
    # document links are removed explicitly.
    drop_links = delete(StudyTopicDocument).where(
        StudyTopicDocument.study_topic_id == topic_id,
    )

    await session.execute(detach_sessions)
    await session.execute(drop_links)
    topic.deleted_at = datetime.now(timezone.utc)
    await session.commit()
# ─── 자료(문서) 매핑 ───
@router.post(
    "/{topic_id}/documents",
    response_model=StudyTopicDocumentLinkResponse,
    status_code=201,
)
async def link_documents_to_topic(
    topic_id: int,
    body: StudyTopicDocumentLinkRequest,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Link N documents to a topic in one request.

    Documents already linked are reported in skipped_existing; ids with no
    non-deleted document are reported in skipped_not_found. New links are
    appended after the topic's current highest sort_order.

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned by
            another user; 409 when the insert hits an integrity conflict.
    """
    topic = await session.get(StudyTopic, topic_id)
    topic = _verify_topic_ownership(topic, user)
    requested = list(dict.fromkeys(body.document_ids))  # de-duplicate, keep order

    # Only existing, non-deleted documents pass (single-user system:
    # documents may have no user_id column).
    valid_rows = (
        await session.execute(
            select(Document.id).where(
                Document.id.in_(requested),
                Document.deleted_at.is_(None),
            )
        )
    ).scalars().all()
    valid_set = set(valid_rows)
    not_found = [d for d in requested if d not in valid_set]

    # Documents already linked to this topic.
    existing_rows = (
        await session.execute(
            select(StudyTopicDocument.document_id).where(
                StudyTopicDocument.study_topic_id == topic_id,
                StudyTopicDocument.document_id.in_(valid_set),
            )
        )
    ).scalars().all()
    existing_set = set(existing_rows)
    to_link = [d for d in requested if d in valid_set and d not in existing_set]

    # Continue after the current highest sort_order. The None check matters:
    # `max or -1` would treat a legitimate maximum of 0 as "no rows" and hand
    # out sort_order 0 a second time.
    current_max = (
        await session.execute(
            select(func.max(StudyTopicDocument.sort_order))
            .where(StudyTopicDocument.study_topic_id == topic_id)
        )
    ).scalar()
    next_sort = 0 if current_max is None else int(current_max) + 1

    for position, doc_id in enumerate(to_link):
        session.add(StudyTopicDocument(
            study_topic_id=topic_id,
            document_id=doc_id,
            user_id=user.id,
            sort_order=next_sort + position,
        ))
    try:
        await session.commit()
    except IntegrityError as exc:
        await session.rollback()
        raise HTTPException(
            status_code=409, detail="자료 매핑 중 충돌이 발생했습니다"
        ) from exc

    return StudyTopicDocumentLinkResponse(
        linked=to_link,
        skipped_existing=sorted(existing_set),
        skipped_not_found=not_found,
    )
@router.post(
    "/{topic_id}/documents/by-path",
    response_model=StudyTopicDocumentByPathResponse,
    status_code=201,
)
async def link_documents_by_library_path(
    topic_id: int,
    body: StudyTopicDocumentByPathRequest,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Link every library document under a category path to the topic.

    Matches on the `@library/...` entries in user_tags: the exact path and,
    when include_subtree is set, everything beneath it. Not subject to the
    100-id limit of the id-based endpoint, so a large category is handled in
    one request.

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned by
            another user; 400 when the path cannot be normalized; 409 when the
            insert hits an integrity conflict.
    """
    topic = await session.get(StudyTopic, topic_id)
    topic = _verify_topic_ownership(topic, user)
    try:
        normalized = normalize_library_path(body.path)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    exact_tag = f"{LIBRARY_PREFIX}{normalized}"
    # `%` and `_` inside the path would otherwise act as LIKE wildcards
    # (e.g. "a_b" also matching "axb/..."). Backslash is PostgreSQL's default
    # LIKE escape character, so escaping the bound value is sufficient.
    like_safe = (
        exact_tag.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    prefix_tag = f"{like_safe}/%"

    # Exact `@library/<path>` match, or (with the subtree option) a prefix
    # match that also covers everything beneath it.
    if body.include_subtree:
        tag_filter = sql_text("""
            EXISTS (
                SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
                WHERE t = :exact OR t LIKE :prefix
            )
        """).bindparams(exact=exact_tag, prefix=prefix_tag)
    else:
        tag_filter = sql_text("""
            EXISTS (
                SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
                WHERE t = :exact
            )
        """).bindparams(exact=exact_tag)

    candidate_rows = (
        await session.execute(
            select(Document.id).where(
                Document.deleted_at.is_(None),
                Document.category == "library",
                tag_filter,
            )
        )
    ).scalars().all()
    candidate_set = set(candidate_rows)
    total_in_path = len(candidate_set)
    if total_in_path == 0:
        return StudyTopicDocumentByPathResponse(
            linked_count=0,
            skipped_existing_count=0,
            total_in_path=0,
            path=normalized,
        )

    existing_rows = (
        await session.execute(
            select(StudyTopicDocument.document_id).where(
                StudyTopicDocument.study_topic_id == topic_id,
                StudyTopicDocument.document_id.in_(candidate_set),
            )
        )
    ).scalars().all()
    existing_set = set(existing_rows)
    to_link = [d for d in candidate_rows if d not in existing_set]

    # Continue after the current highest sort_order. The None check matters:
    # `max or -1` would treat a legitimate maximum of 0 as "no rows" and hand
    # out sort_order 0 a second time.
    current_max = (
        await session.execute(
            select(func.max(StudyTopicDocument.sort_order))
            .where(StudyTopicDocument.study_topic_id == topic_id)
        )
    ).scalar()
    next_sort = 0 if current_max is None else int(current_max) + 1

    for position, doc_id in enumerate(to_link):
        session.add(StudyTopicDocument(
            study_topic_id=topic_id,
            document_id=doc_id,
            user_id=user.id,
            sort_order=next_sort + position,
        ))
    try:
        await session.commit()
    except IntegrityError as exc:
        await session.rollback()
        raise HTTPException(
            status_code=409, detail="자료 매핑 중 충돌이 발생했습니다"
        ) from exc

    return StudyTopicDocumentByPathResponse(
        linked_count=len(to_link),
        skipped_existing_count=len(existing_set),
        total_in_path=total_in_path,
        path=normalized,
    )
@router.delete("/{topic_id}/documents/{document_id}", status_code=204)
async def unlink_document_from_topic(
    topic_id: int,
    document_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Remove one document link from a topic; 404 when no such link exists."""
    _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    remove_link = delete(StudyTopicDocument).where(
        StudyTopicDocument.study_topic_id == topic_id,
        StudyTopicDocument.document_id == document_id,
        StudyTopicDocument.user_id == user.id,
    )
    outcome = await session.execute(remove_link)
    await session.commit()
    # Zero affected rows means there was nothing to unlink.
    if outcome.rowcount == 0:
        raise HTTPException(status_code=404, detail="매핑을 찾을 수 없습니다")
# ─── 세션 매핑 ───
@router.post("/{topic_id}/sessions/{session_id}", status_code=204)
async def attach_session_to_topic(
    topic_id: int,
    session_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Attach an existing session to a topic.

    A session already attached to another topic is simply moved to this one
    (a session belongs to at most one topic).
    """
    _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    study_session = await session.get(StudySession, session_id)
    owned = study_session is not None and study_session.user_id == user.id
    if not owned:
        raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")

    study_session.study_topic_id = topic_id
    study_session.updated_at = datetime.now(timezone.utc)
    await session.commit()
@router.delete("/{topic_id}/sessions/{session_id}", status_code=204)
async def detach_session_from_topic(
    topic_id: int,
    session_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Detach a session from a topic; 404 when it is not attached to this topic."""
    _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    study_session = await session.get(StudySession, session_id)
    owned = study_session is not None and study_session.user_id == user.id
    if not owned:
        raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
    # Only a session currently attached to this very topic can be detached.
    if study_session.study_topic_id != topic_id:
        raise HTTPException(status_code=404, detail="해당 주제에 연결된 세션이 아닙니다")

    study_session.study_topic_id = None
    study_session.updated_at = datetime.now(timezone.utc)
    await session.commit()
# ─── PR-9: 분야 설명 (study_topic_subject_notes) ───
SUBJECT_NOTE_TIMEOUT_S = 30.0
_SUBJECT_NOTE_PROMPT_PATH = "study_subject_note.txt"
_subject_note_prompt_cache: str | None = None
def _load_subject_note_prompt() -> str:
global _subject_note_prompt_cache
if _subject_note_prompt_cache is None:
prompts_dir = _Path(__file__).resolve().parent.parent / "prompts"
_subject_note_prompt_cache = (prompts_dir / _SUBJECT_NOTE_PROMPT_PATH).read_text(encoding="utf-8")
return _subject_note_prompt_cache
def _render_subject_note_prompt(subject: str, scope: str, doc_block: str, q_block: str) -> str:
template = _load_subject_note_prompt()
return (
template
.replace("{subject}", subject)
.replace("{scope}", scope or "(미지정)")
.replace("{documents_evidence_block}", doc_block)
.replace("{questions_evidence_block}", q_block)
)
class SubjectNoteRequest(BaseModel):
    """Request body for generating a subject note."""

    subject: str = Field(min_length=1, max_length=120)
    # Empty string means no scope was given.
    scope: str = Field(default="", max_length=200)
    # When True, a note that is already "ready" is generated again instead of
    # being returned from the cache (see generate_subject_note).
    regenerate: bool = False
class SubjectNoteEvidence(BaseModel):
    """One evidence item returned with a generated subject note."""

    # NOTE(review): populated from the RAG context items' to_dict() output in
    # generate_subject_note; the accepted source_type values are defined by
    # that service, not in this module.
    source_type: str
    source_id: int
    title: str
    snippet: str
class SubjectNoteResponse(BaseModel):
    """Subject note payload, used for both cache lookups and fresh generations."""

    subject: str
    scope: str
    # None when no note text has been stored yet.
    content: str | None
    status: str  # ready | failed | none | stale | pending
    generated_at: datetime | None
    model: str | None
    # Responses built from a stored note carry no evidence (empty list).
    evidence: list[SubjectNoteEvidence] = []
    from_cache: bool = False
    can_regenerate: bool = True
def _note_cache_response(note: StudyTopicSubjectNote) -> SubjectNoteResponse:
    """Wrap a stored note as a cache-hit response (no evidence attached)."""
    stored = {
        attr: getattr(note, attr)
        for attr in ("subject", "scope", "content", "status", "generated_at", "model")
    }
    return SubjectNoteResponse(
        **stored,
        evidence=[],
        from_cache=True,
        can_regenerate=True,
    )
@router.get("/{topic_id}/subject-notes", response_model=SubjectNoteResponse)
async def get_subject_note(
    topic_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    subject: str = Query(..., min_length=1, max_length=120),
    scope: str = Query("", max_length=200),
):
    """Look up the cached note; responds with status='none' and content=null when absent."""
    _verify_topic_ownership(await session.get(StudyTopic, topic_id), user)

    lookup = select(StudyTopicSubjectNote).where(
        StudyTopicSubjectNote.user_id == user.id,
        StudyTopicSubjectNote.study_topic_id == topic_id,
        StudyTopicSubjectNote.subject == subject,
        StudyTopicSubjectNote.scope == scope,
    )
    stored = (await session.execute(lookup)).scalar_one_or_none()
    if stored is not None:
        return _note_cache_response(stored)

    # Nothing stored for this (subject, scope): echo the key back with an
    # empty payload.
    return SubjectNoteResponse(
        subject=subject,
        scope=scope,
        content=None,
        status="none",
        generated_at=None,
        model=None,
        evidence=[],
        from_cache=True,
        can_regenerate=True,
    )
@router.post("/{topic_id}/subject-notes/generate", response_model=SubjectNoteResponse)
async def generate_subject_note(
    topic_id: int,
    body: SubjectNoteRequest,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Generate a subject note with the AI model and cache it.

    Uses the same race-safe "pending" pattern as PR-3:
    - regenerate=false and status=ready → the cached note is returned.
    - status=pending → 409.
    - otherwise → a new generation. On failure the status becomes 'failed'
      and the previously stored content is kept.

    Raises:
        HTTPException: 404 when the topic is missing, soft-deleted or owned by
            another user; 409 when a generation for this note is already
            in progress.
    """
    topic = await session.get(StudyTopic, topic_id)
    _verify_topic_ownership(topic, user)

    # Upsert: reuse the existing row, otherwise create one.
    note = (
        await session.execute(
            select(StudyTopicSubjectNote).where(
                StudyTopicSubjectNote.user_id == user.id,
                StudyTopicSubjectNote.study_topic_id == topic_id,
                StudyTopicSubjectNote.subject == body.subject,
                StudyTopicSubjectNote.scope == body.scope,
            )
        )
    ).scalar_one_or_none()
    if note is None:
        note = StudyTopicSubjectNote(
            user_id=user.id,
            study_topic_id=topic_id,
            subject=body.subject,
            scope=body.scope,
            status="none",
        )
        session.add(note)
        await session.commit()
    else:
        # A generation in flight blocks both plain and regenerate requests.
        if note.status == "pending":
            raise HTTPException(
                status_code=409,
                detail={"status": "pending", "detail": "이미 생성 중입니다"},
            )
        # Cache short-circuit; none/failed/stale fall through to a new generation.
        if note.status == "ready" and not body.regenerate:
            return _note_cache_response(note)

    # Keep the primary key in a local: a rollback below expires the ORM object.
    note_id = note.id

    # Race-safe transition to pending: only one concurrent request can flip
    # a non-pending row, the others get no row back.
    lock = await session.execute(
        update(StudyTopicSubjectNote)
        .where(
            StudyTopicSubjectNote.id == note_id,
            StudyTopicSubjectNote.status != "pending",
        )
        .values(status="pending", updated_at=datetime.now(timezone.utc))
        .returning(StudyTopicSubjectNote.id)
    )
    if lock.scalar_one_or_none() is None:
        raise HTTPException(
            status_code=409,
            detail={"status": "pending", "detail": "이미 생성 중입니다"},
        )
    await session.commit()

    # RAG — best effort: on failure the note is generated without evidence.
    try:
        ctx = await gather_subject_note_context(session, user.id, topic_id, body.subject, body.scope)
    except Exception as e:
        logger.warning("subject_note_rag_failed: %s: %s", type(e).__name__, e)
        # A failed statement leaves the transaction unusable; reset it so the
        # status update below can still be written.
        await session.rollback()
        ctx = SubjectNoteContext(documents=[], questions=[])

    # Prompt rendering and client construction happen inside the try block so
    # that any failure here ends in status='failed' rather than leaving the
    # note stuck in 'pending' (which would answer 409 forever).
    raw_text: str | None = None
    ai_client = None
    try:
        doc_block = render_evidence_block(ctx.documents)
        q_block = render_evidence_block(ctx.questions)
        prompt = _render_subject_note_prompt(body.subject, body.scope, doc_block, q_block)
        ai_client = AIClient()
        async with get_mlx_gate():
            async with asyncio.timeout(SUBJECT_NOTE_TIMEOUT_S):
                raw_text = await ai_client.call_primary(prompt)
    except asyncio.TimeoutError:
        logger.warning("subject_note_mlx_timeout topic=%s subject=%s", topic_id, body.subject)
    except Exception:
        logger.exception("subject_note_mlx_failed topic=%s subject=%s", topic_id, body.subject)
    finally:
        if ai_client is not None:
            await ai_client.close()

    note = await session.get(StudyTopicSubjectNote, note_id)
    evidence = [e.to_dict() for e in ctx.all]
    # Judge emptiness after strip_thinking, so a reply with nothing left once
    # cleaned is not cached as a "ready" note with empty content.
    cleaned = strip_thinking(raw_text).strip() if raw_text else ""
    now = datetime.now(timezone.utc)

    if not cleaned:
        # Keep the previous content/model/generated_at; only the status changes.
        note.status = "failed"
        note.updated_at = now
        await session.commit()
        return SubjectNoteResponse(
            subject=note.subject, scope=note.scope, content=note.content,
            status="failed", generated_at=note.generated_at, model=note.model,
            evidence=evidence,
            from_cache=False, can_regenerate=True,
        )

    note.content = cleaned
    note.status = "ready"
    note.generated_at = now
    primary_name = getattr(ai_client.ai.primary, "model", "primary")
    note.model = f"mlx:{primary_name}"
    note.updated_at = now
    await session.commit()
    return SubjectNoteResponse(
        subject=note.subject, scope=note.scope, content=note.content,
        status="ready", generated_at=note.generated_at, model=note.model,
        evidence=evidence,
        from_cache=False, can_regenerate=True,
    )