Files
hyungi_document_server/app/api/study_topics.py
T
Hyungi Ahn 62afc571c0 feat(study): 카테고리 트리에서 자료 일괄 추가
자료 추가 모달이 1건씩 검색·체크박스만 지원해서 같은 카테고리에 자료가
많을 때 비효율적. /api/library/tree 의 카테고리 구조를 모달 좌측에 띄우고,
노드 옆 아이콘 한 번으로 그 path 하위 자료 전체를 한 번에 매핑.

백엔드: POST /api/study-topics/{id}/documents/by-path 추가. user_tags
@library/<path> prefix 매칭(documents.py 의 list_library_documents 와
동일한 EXISTS 쿼리)으로 100건 limit 우회. 응답은 linked_count /
skipped_existing_count / total_in_path 카운트만 노출.

프론트: 모달을 max-w-4xl + grid(트리/자료) 레이아웃으로 개편. 트리 노드
클릭 = 우측 자료 목록 path 필터링, 노드 옆 FolderPlus 버튼 = 즉시 일괄
추가. 검색·체크박스·전체선택은 그대로. 모바일은 트리가 상단 max-h-40vh
영역으로 stack.
2026-04-28 07:29:59 +09:00

748 lines
24 KiB
Python

"""학습 워크스페이스(study_topic) API — 필기 세션 + 자료(library document) 묶음 컨테이너
핵심 개념:
- study_topic 은 단순 폴더/태그가 아니라 학습 자산 1차 컨테이너 (학습 워크스페이스).
- 향후 단어장/오디오/문제 세트 같은 자산이 같은 컨테이너 아래로 들어올 수 있도록
응답 구조를 sections + stats 형태로 설계 (현재는 sessions / documents 두 키만).
- documents.category(자료실 UI 축) 와 직교한 별도 분류 축. 자료실 facet/카테고리는 미터치.
- StudySession.certification/subject/topic 은 보존 — 본 컨테이너 와 직교한 세부 메타.
설계 제약:
- study_type 은 강한 enum 미사용. VALID_STUDY_TYPES 는 단순 권장값 (DB/Pydantic 강제 안 함).
- soft delete (deleted_at). active 행끼리만 (user_id, name) partial unique.
- 권한: 모든 쿼리에 user_id 강제. 다른 사용자 자료를 묶지 못함.
- 문서 매핑은 N:M (study_topic_documents). 세션 매핑은 1:N (study_sessions.study_topic_id).
- polymorphic 단일 study_topic_items 테이블은 만들지 않는다 (영구 금지).
"""
import logging
from datetime import datetime, timezone
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, delete, func, select, text as sql_text, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.library import LIBRARY_PREFIX, normalize_library_path
from models.document import Document
from models.study_session import StudySession
from models.study_topic import StudyTopic, StudyTopicDocument
from models.user import User
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── 권장값 (강제 enum 아님, UI 안내용) ───
RECOMMENDED_STUDY_TYPES: set[str] = {
"certification", "language", "school", "work", "general",
}
# ─── Pydantic 스키마 ───
class StudyTopicCreate(BaseModel):
name: str = Field(min_length=1, max_length=120)
description: str | None = None
color: str | None = Field(default=None, max_length=20)
study_type: str | None = Field(default=None, max_length=40)
sort_order: int = 0
class StudyTopicUpdate(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=120)
description: str | None = None
color: str | None = Field(default=None, max_length=20)
study_type: str | None = Field(default=None, max_length=40)
sort_order: int | None = None
class StudyTopicResponse(BaseModel):
"""주제 목록 응답 — 집계 카운트 포함."""
id: int
name: str
description: str | None
color: str | None
study_type: str | None
sort_order: int
session_count: int = 0
document_count: int = 0
created_at: datetime
updated_at: datetime
class StudyTopicListResponse(BaseModel):
items: list[StudyTopicResponse]
total: int
class StudyTopicSessionSummary(BaseModel):
"""상세 뷰의 세션 카드 페이로드 — 통합 뷰 렌더용 최소 필드."""
id: int
study_type: str
certification: str | None
language_code: str | None
learning_level: str | None
subject: str | None
topic: str | None
mode: str
repetition_count: int
review_state: str | None
created_at: datetime
updated_at: datetime
class StudyTopicDocumentSummary(BaseModel):
"""상세 뷰의 자료 카드 페이로드."""
id: int
title: str | None
file_format: str
file_type: str
category: str | None
ai_domain: str | None
importance: str | None
sort_order: int
linked_at: datetime
class StudyTopicSections(BaseModel):
"""확장 친화 dict — 향후 audio_assets / vocab_decks / question_sets 키 추가 가능."""
sessions: list[StudyTopicSessionSummary]
documents: list[StudyTopicDocumentSummary]
class StudyTopicStats(BaseModel):
"""자산 카운트 — 0 으로 미리 노출해서 후속 PR 에서 필드만 채움."""
session_count: int
document_count: int
audio_count: int = 0
vocab_count: int = 0
question_set_count: int = 0
class StudyTopicMeta(BaseModel):
id: int
name: str
description: str | None
color: str | None
study_type: str | None
sort_order: int
created_at: datetime
updated_at: datetime
class StudyTopicDetailResponse(BaseModel):
topic: StudyTopicMeta
sections: StudyTopicSections
stats: StudyTopicStats
class StudyTopicDocumentLinkRequest(BaseModel):
document_ids: list[int] = Field(min_length=1, max_length=100)
class StudyTopicDocumentLinkResponse(BaseModel):
linked: list[int]
skipped_existing: list[int]
skipped_not_found: list[int]
class StudyTopicDocumentByPathRequest(BaseModel):
"""카테고리 트리 노드 일괄 추가용. path prefix 매칭으로 하위 자료 모두 매핑.
include_subtree=True 면 path 와 그 하위 카테고리 전체, False 면 path 정확 일치만.
"""
path: str = Field(min_length=1, max_length=500)
include_subtree: bool = True
class StudyTopicDocumentByPathResponse(BaseModel):
linked_count: int
skipped_existing_count: int
total_in_path: int
path: str
# ─── Helpers ───
def _verify_topic_ownership(topic: StudyTopic | None, user: User) -> StudyTopic:
"""소유자 검증 + soft-deleted 행 차단. mismatch 도 404 (정보 누설 방지)."""
if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")
return topic
def _meta_from_topic(t: StudyTopic) -> StudyTopicMeta:
return StudyTopicMeta(
id=t.id,
name=t.name,
description=t.description,
color=t.color,
study_type=t.study_type,
sort_order=t.sort_order,
created_at=t.created_at,
updated_at=t.updated_at,
)
# ─── 엔드포인트 ───
@router.get("/", response_model=StudyTopicListResponse)
async def list_study_topics(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""사용자의 active 주제 목록. 세션 수·자료 수 집계 포함."""
# active 주제 + 세션 수 (LEFT JOIN study_sessions)
sess_count_sub = (
select(
StudySession.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(StudySession.user_id == user.id)
.where(StudySession.study_topic_id.is_not(None))
.group_by(StudySession.study_topic_id)
.subquery()
)
doc_count_sub = (
select(
StudyTopicDocument.study_topic_id.label("topic_id"),
func.count().label("c"),
)
.where(StudyTopicDocument.user_id == user.id)
.group_by(StudyTopicDocument.study_topic_id)
.subquery()
)
base = (
select(
StudyTopic,
func.coalesce(sess_count_sub.c.c, 0).label("session_count"),
func.coalesce(doc_count_sub.c.c, 0).label("document_count"),
)
.outerjoin(sess_count_sub, sess_count_sub.c.topic_id == StudyTopic.id)
.outerjoin(doc_count_sub, doc_count_sub.c.topic_id == StudyTopic.id)
.where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None))
.order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
)
total_query = select(func.count()).select_from(
select(StudyTopic.id)
.where(StudyTopic.user_id == user.id, StudyTopic.deleted_at.is_(None))
.subquery()
)
total = (await session.execute(total_query)).scalar() or 0
rows = (await session.execute(base.offset(offset).limit(limit))).all()
items = [
StudyTopicResponse(
id=t.id,
name=t.name,
description=t.description,
color=t.color,
study_type=t.study_type,
sort_order=t.sort_order,
session_count=int(sc),
document_count=int(dc),
created_at=t.created_at,
updated_at=t.updated_at,
)
for t, sc, dc in rows
]
return StudyTopicListResponse(items=items, total=total)
@router.get("/by-document/{document_id}", response_model=list[StudyTopicMeta])
async def list_topics_for_document(
document_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""이 자료가 매핑된 active 주제 목록.
/study/sources 카드의 "이 자료가 속한 주제" 배지·컨텍스트 메뉴용.
"""
rows = (
await session.execute(
select(StudyTopic)
.join(StudyTopicDocument, StudyTopicDocument.study_topic_id == StudyTopic.id)
.where(
StudyTopicDocument.document_id == document_id,
StudyTopicDocument.user_id == user.id,
StudyTopic.deleted_at.is_(None),
)
.order_by(StudyTopic.sort_order.asc(), StudyTopic.id.desc())
)
).scalars().all()
return [_meta_from_topic(t) for t in rows]
@router.post("/", response_model=StudyTopicResponse, status_code=201)
async def create_study_topic(
body: StudyTopicCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""주제 생성. 같은 이름의 active 주제가 이미 있으면 409.
soft-deleted 동명 주제는 partial unique index 에서 빠지므로 신규 생성 가능.
"""
topic = StudyTopic(
user_id=user.id,
name=body.name.strip(),
description=body.description,
color=body.color,
study_type=body.study_type,
sort_order=body.sort_order,
)
session.add(topic)
try:
await session.flush()
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다")
return StudyTopicResponse(
id=topic.id,
name=topic.name,
description=topic.description,
color=topic.color,
study_type=topic.study_type,
sort_order=topic.sort_order,
session_count=0,
document_count=0,
created_at=topic.created_at,
updated_at=topic.updated_at,
)
@router.get("/{topic_id}", response_model=StudyTopicDetailResponse)
async def get_study_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""통합 뷰: 주제 메타 + 세션 목록 + 자료 목록 + stats.
응답 형태는 후속 PR(오디오/단어장/문제세트 추가) 에서 sections / stats 키만 늘리면 되도록 dict 구조.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
# 세션 목록 — 최근순
sess_rows = (
await session.execute(
select(StudySession)
.where(
StudySession.user_id == user.id,
StudySession.study_topic_id == topic_id,
)
.order_by(StudySession.created_at.desc(), StudySession.id.desc())
)
).scalars().all()
sessions_payload = [
StudyTopicSessionSummary(
id=s.id,
study_type=s.study_type,
certification=s.certification,
language_code=s.language_code,
learning_level=s.learning_level,
subject=s.subject,
topic=s.topic,
mode=s.mode,
repetition_count=s.repetition_count,
review_state=s.review_state,
created_at=s.created_at,
updated_at=s.updated_at,
)
for s in sess_rows
]
# 자료 목록 — 매핑 sort_order → created_at desc
doc_rows = (
await session.execute(
select(Document, StudyTopicDocument)
.join(
StudyTopicDocument,
and_(
StudyTopicDocument.document_id == Document.id,
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.user_id == user.id,
),
)
.where(Document.deleted_at.is_(None))
.order_by(
StudyTopicDocument.sort_order.asc(),
StudyTopicDocument.created_at.desc(),
)
)
).all()
documents_payload = [
StudyTopicDocumentSummary(
id=d.id,
title=d.title,
file_format=d.file_format,
file_type=str(d.file_type) if d.file_type is not None else "immutable",
category=str(d.category) if d.category is not None else None,
ai_domain=d.ai_domain,
importance=d.importance,
sort_order=link.sort_order,
linked_at=link.created_at,
)
for d, link in doc_rows
]
return StudyTopicDetailResponse(
topic=_meta_from_topic(topic),
sections=StudyTopicSections(
sessions=sessions_payload,
documents=documents_payload,
),
stats=StudyTopicStats(
session_count=len(sessions_payload),
document_count=len(documents_payload),
# 후속 PR 에서 채움
audio_count=0,
vocab_count=0,
question_set_count=0,
),
)
@router.patch("/{topic_id}", response_model=StudyTopicResponse)
async def update_study_topic(
topic_id: int,
body: StudyTopicUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
fields_set = body.model_fields_set
if "name" in fields_set and body.name is not None:
topic.name = body.name.strip()
for fname in {"description", "color", "study_type", "sort_order"} & fields_set:
setattr(topic, fname, getattr(body, fname))
topic.updated_at = datetime.now(timezone.utc)
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="이미 같은 이름의 학습 주제가 있습니다")
# 응답용 카운트 별도 조회
sc = (await session.execute(
select(func.count())
.select_from(StudySession)
.where(StudySession.user_id == user.id, StudySession.study_topic_id == topic.id)
)).scalar() or 0
dc = (await session.execute(
select(func.count())
.select_from(StudyTopicDocument)
.where(StudyTopicDocument.study_topic_id == topic.id)
)).scalar() or 0
return StudyTopicResponse(
id=topic.id,
name=topic.name,
description=topic.description,
color=topic.color,
study_type=topic.study_type,
sort_order=topic.sort_order,
session_count=int(sc),
document_count=int(dc),
created_at=topic.created_at,
updated_at=topic.updated_at,
)
@router.delete("/{topic_id}", status_code=204)
async def delete_study_topic(
topic_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""주제 soft delete. 세션의 study_topic_id 는 SET NULL, 문서 매핑은 cascade 제거.
같은 이름 재생성을 위해 partial unique index 가 deleted_at IS NULL 인 행만 본다.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
# 세션 FK SET NULL — 명시적으로 (DB ON DELETE SET NULL 은 hard delete 시에만 동작)
await session.execute(
update(StudySession)
.where(
StudySession.user_id == user.id,
StudySession.study_topic_id == topic_id,
)
.values(study_topic_id=None)
)
# 문서 매핑 명시 제거 (soft delete 라 FK CASCADE 안 탐)
await session.execute(
delete(StudyTopicDocument).where(
StudyTopicDocument.study_topic_id == topic_id,
)
)
topic.deleted_at = datetime.now(timezone.utc)
await session.commit()
# ─── 자료(문서) 매핑 ───
@router.post(
"/{topic_id}/documents",
response_model=StudyTopicDocumentLinkResponse,
status_code=201,
)
async def link_documents_to_topic(
topic_id: int,
body: StudyTopicDocumentLinkRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""자료 N개를 주제에 일괄 매핑. 이미 연결된 건 skipped_existing 으로 보고."""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
requested = list(dict.fromkeys(body.document_ids)) # 중복 제거 + 순서 보존
# 존재 + 미삭제 documents 만 통과 (single-user 시스템: documents.user_id 부재 가능)
valid_rows = (
await session.execute(
select(Document.id).where(
Document.id.in_(requested),
Document.deleted_at.is_(None),
)
)
).scalars().all()
valid_set = set(valid_rows)
not_found = [d for d in requested if d not in valid_set]
# 이미 매핑된 것
existing_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id.in_(valid_set),
)
)
).scalars().all()
existing_set = set(existing_rows)
to_link = [d for d in requested if d in valid_set and d not in existing_set]
# 기존 max sort_order 다음부터
max_sort = (
await session.execute(
select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1))
.where(StudyTopicDocument.study_topic_id == topic_id)
)
).scalar() or -1
for i, doc_id in enumerate(to_link, start=1):
session.add(StudyTopicDocument(
study_topic_id=topic_id,
document_id=doc_id,
user_id=user.id,
sort_order=int(max_sort) + i,
))
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다")
return StudyTopicDocumentLinkResponse(
linked=to_link,
skipped_existing=sorted(existing_set),
skipped_not_found=not_found,
)
@router.post(
"/{topic_id}/documents/by-path",
response_model=StudyTopicDocumentByPathResponse,
status_code=201,
)
async def link_documents_by_library_path(
topic_id: int,
body: StudyTopicDocumentByPathRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""자료실 카테고리 path 기준으로 하위 자료를 일괄 매핑.
/api/documents/library?path=... 와 동일한 user_tags(@library/...) prefix 매칭 사용.
100건 limit 우회 — 한 카테고리에 자료가 많아도 한 번에 처리.
"""
topic = await session.get(StudyTopic, topic_id)
topic = _verify_topic_ownership(topic, user)
try:
normalized = normalize_library_path(body.path)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
exact_tag = f"{LIBRARY_PREFIX}{normalized}"
prefix_tag = f"{LIBRARY_PREFIX}{normalized}/%"
# @library/<path> 정확 일치 또는 (subtree 옵션 시) 그 하위까지 prefix 매칭
if body.include_subtree:
tag_filter = sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact OR t LIKE :prefix
)
""").bindparams(exact=exact_tag, prefix=prefix_tag)
else:
tag_filter = sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact
)
""").bindparams(exact=exact_tag)
candidate_rows = (
await session.execute(
select(Document.id).where(
Document.deleted_at.is_(None),
Document.category == "library",
tag_filter,
)
)
).scalars().all()
candidate_set = set(candidate_rows)
total_in_path = len(candidate_set)
if total_in_path == 0:
return StudyTopicDocumentByPathResponse(
linked_count=0,
skipped_existing_count=0,
total_in_path=0,
path=normalized,
)
existing_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id.in_(candidate_set),
)
)
).scalars().all()
existing_set = set(existing_rows)
to_link = [d for d in candidate_rows if d not in existing_set]
max_sort = (
await session.execute(
select(func.coalesce(func.max(StudyTopicDocument.sort_order), -1))
.where(StudyTopicDocument.study_topic_id == topic_id)
)
).scalar() or -1
for i, doc_id in enumerate(to_link, start=1):
session.add(StudyTopicDocument(
study_topic_id=topic_id,
document_id=doc_id,
user_id=user.id,
sort_order=int(max_sort) + i,
))
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(status_code=409, detail="자료 매핑 중 충돌이 발생했습니다")
return StudyTopicDocumentByPathResponse(
linked_count=len(to_link),
skipped_existing_count=len(existing_set),
total_in_path=total_in_path,
path=normalized,
)
@router.delete("/{topic_id}/documents/{document_id}", status_code=204)
async def unlink_document_from_topic(
topic_id: int,
document_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
result = await session.execute(
delete(StudyTopicDocument).where(
StudyTopicDocument.study_topic_id == topic_id,
StudyTopicDocument.document_id == document_id,
StudyTopicDocument.user_id == user.id,
)
)
await session.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="매핑을 찾을 수 없습니다")
# ─── 세션 매핑 ───
@router.post("/{topic_id}/sessions/{session_id}", status_code=204)
async def attach_session_to_topic(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""기존 세션을 주제에 연결. 다른 주제에 이미 연결돼 있으면 새 주제로 갱신 (1:N)."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
sess = await session.get(StudySession, session_id)
if sess is None or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
sess.study_topic_id = topic_id
sess.updated_at = datetime.now(timezone.utc)
await session.commit()
@router.delete("/{topic_id}/sessions/{session_id}", status_code=204)
async def detach_session_from_topic(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""세션 분리. 현재 study_topic_id 가 topic_id 가 아니면 404."""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
sess = await session.get(StudySession, session_id)
if sess is None or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
if sess.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="해당 주제에 연결된 세션이 아닙니다")
sess.study_topic_id = None
sess.updated_at = datetime.now(timezone.utc)
await session.commit()