Files
hyungi_document_server/app/api/study_sessions.py
T
Hyungi Ahn 63ed4d81e5 feat(study): study_topics 학습 워크스페이스 컨테이너 도입
필기 세션과 자료(library document)를 한 학습 주제(예: 가스기사) 아래로 묶는
1차 컨테이너. 향후 단어장/오디오/문제세트 등 학습 자산이 같은 묶음으로 들어올 수
있도록 응답 구조(sections + stats)를 dict 기반으로 설계.

데이터 모델 (migrations 179~185):
- study_topics: user_id × name partial unique (active 행만), soft delete
- study_sessions.study_topic_id: 1:N nullable FK (ON DELETE SET NULL)
- study_topic_documents: 자료 N:M 매핑 (user_id 반정규화로 권한 격리)

설계 원칙:
- documents.category(자료실 UI 축)와 직교 → 자료실 facet/카테고리 미터치
- StudySession.certification/subject/topic 보존 (세부 메타로 계속 사용)
- study_type은 느슨한 분류 (강한 enum 미사용, jlpt_n3 등 확장 여지)
- polymorphic study_topic_items 영구 금지 → 자산 타입별 조인 테이블 추가 방식

API: /api/study-topics CRUD + /by-document/{id} + 자료/세션 매핑 엔드포인트.
프론트: /study/topics 목록 + /study/topics/[id] 통합 뷰(필기·자료 두 트랙) +
        write 폼에 워크스페이스 드롭다운 + study hub 진입 카드.

후속 PR-2 어학 UX, PR-3 오디오 자산, PR-4 AI retrieval scope.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 07:06:37 +09:00

928 lines
34 KiB
Python

"""학습 세션 API — Phase 1 MVP (자격증 + 어학 일반화)
iPad 손글씨 필사 / 모바일 암기노트 / 모바일 퀴즈 가 같은 study_sessions 데이터를
공유. 본 모듈은 Phase 1 = iPad 필사 세션 + DB/API 일반화 까지만 다룬다.
핵심:
- study_type 'certification' | 'language' 분기. metadata jsonb 가 도메인별 자유 메타.
- 단일 *_document_id 컬럼 ❌. 모든 미디어 연결은 study_session_assets 로 통일.
- documents 본체는 절대 삭제하지 않음 (assets 연결만 해제).
- ownership 검증: study_sessions.user_id == current_user.id (필수).
documents 는 single-user 시스템이라 컬럼 부재 — 미래 multi-user 대비
`getattr(doc, 'user_id', None)` 로 부드럽게 검증 (값 있으면 비교, 없으면 통과).
- 409 중복: UNIQUE(study_session_id, document_id, asset_type, role) 위반.
Phase 2~4 미사용 필드 (review_state / quiz / ocr_text / ai_summary / prompt 등) 는
스키마에만 존재, 자동 로직 없음. 별도 PR 에서 활성.
"""
import asyncio
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated, Any

from fastapi import (
    APIRouter,
    Depends,
    Form,
    HTTPException,
    Query,
    Request,
    UploadFile,
)
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import and_, delete, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from starlette.requests import ClientDisconnect

from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.utils import file_hash
from models.document import Document
from models.queue import enqueue_stage
from models.study_session import StudySession, StudySessionAsset
from models.user import User
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which every endpoint defined in this module is registered.
router = APIRouter()
# ─── Enum 검증 상수 ───
VALID_STUDY_TYPES: set[str] = {"certification", "language"}
VALID_MODES: set[str] = {
"copy", "trace", "blank-repeat",
"dictation", "shadowing",
"quiz", "flashcard", # Phase 2~4 활성, schema 만 수용
}
VALID_ASSET_TYPES: set[str] = {
"source_scan", "handwriting_png", "audio", "video", "transcript", "reference",
}
VALID_ROLES: set[str | None] = {
None,
"prompt", "answer", "pronunciation", "lecture",
"listening_source", "shadowing_source", "reference",
}
VALID_REVIEW_STATES: set[str | None] = {
None, "new", "learning", "weak", "mastered",
}
VALID_ORDERS: set[str] = {"created_at", "next_review_at", "last_quiz_at"}
# ─── Helpers ───
def _upload_error(status_code: int, error_code: str, message: str) -> HTTPException:
    """Build (without raising) the HTTPException used for upload failures.

    The exception detail is a dict with ``error_code`` and ``message`` keys;
    per the original note this mirrors the pattern used in documents.py.
    """
    payload = {"error_code": error_code, "message": message}
    return HTTPException(status_code=status_code, detail=payload)
def _verify_session_ownership(
    sess: StudySession | None, user: User
) -> StudySession:
    """Return ``sess`` when it exists and belongs to ``user``; raise 404 otherwise.

    An ownership mismatch is deliberately reported as 404 as well, so the
    response does not reveal that the session exists.
    """
    owned = sess is not None and sess.user_id == user.id
    if not owned:
        raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
    return sess
def _verify_document_ownership(doc: Document | None, user: User) -> Document:
    """Return ``doc`` when it is usable by ``user``; raise 404 otherwise.

    A document is rejected when it is missing, when it carries a non-None
    ``deleted_at``, or when it has a ``user_id`` that differs from the caller.
    Both attributes are read with ``getattr`` because, per the original note,
    documents currently have no ``user_id`` column (single-user system); a
    document without an owner therefore passes the check.
    """
    missing = doc is None or getattr(doc, "deleted_at", None) is not None
    if not missing:
        owner_id = getattr(doc, "user_id", None)
        if owner_id is None or owner_id == user.id:
            return doc
    raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# ─── Pydantic Schemas ───
class StudySessionAssetCreate(BaseModel):
    """Request body for linking an existing document to a study session as an asset."""

    # Id of the document to link.
    document_id: int
    # Free-form here; the link endpoint checks it against VALID_ASSET_TYPES.
    asset_type: str
    # Optional; the link endpoint checks it against VALID_ROLES (None allowed).
    role: str | None = None
    sort_order: int = 0
class StudySessionAssetResponse(BaseModel):
    """Response shape for one asset link between a study session and a document.

    Instances are built from ORM rows via ``model_validate``, which requires
    ``from_attributes`` to be enabled.
    """

    # Pydantic v2 configuration; replaces the deprecated class-based ``Config``.
    model_config = ConfigDict(from_attributes=True)

    id: int
    document_id: int
    asset_type: str
    role: str | None
    sort_order: int
    created_at: datetime
class StudySessionCreate(BaseModel):
    """Request body for creating a study session.

    ``study_type`` and ``mode`` are plain strings here; the create endpoint
    validates them against VALID_STUDY_TYPES and VALID_MODES.
    """

    study_type: str = "certification"
    certification: str | None = None
    language_code: str | None = None
    learning_level: str | None = None
    subject: str | None = None
    topic: str | None = None
    source_text: str | None = None
    source_page: int | None = None
    mode: str = "copy"
    prompt_question: str | None = None
    expected_answer: str | None = None
    # Stored on the ORM row as ``metadata_json``.
    metadata: dict[str, Any] | None = None
    target_count: int | None = None
    canvas_width: int | None = None
    canvas_height: int | None = None
    strokes_json: dict[str, Any] | None = None
    # Study workspace (topic) this session belongs to; unclassified when omitted.
    study_topic_id: int | None = None
class StudySessionUpdate(BaseModel):
    """PATCH partial update — only fields that were explicitly set are applied."""

    certification: str | None = None
    language_code: str | None = None
    learning_level: str | None = None
    subject: str | None = None
    topic: str | None = None
    source_text: str | None = None
    source_page: int | None = None
    mode: str | None = None
    prompt_question: str | None = None
    expected_answer: str | None = None
    # Stored on the ORM row as ``metadata_json``.
    metadata: dict[str, Any] | None = None
    target_count: int | None = None
    repetition_count: int | None = None
    canvas_width: int | None = None
    canvas_height: int | None = None
    strokes_json: dict[str, Any] | None = None
    ocr_text: str | None = None
    user_corrected_text: str | None = None
    review_state: str | None = None
    next_review_at: datetime | None = None
    # Reassign the topic (may also be set to NULL to detach the session).
    study_topic_id: int | None = None
class StudySessionResponse(BaseModel):
    """Full response shape for one study session, including its linked assets."""

    id: int
    user_id: int
    study_type: str
    certification: str | None
    language_code: str | None
    learning_level: str | None
    subject: str | None
    topic: str | None
    source_text: str | None
    source_page: int | None
    mode: str
    prompt_question: str | None
    expected_answer: str | None
    # Filled from the ORM attribute ``metadata_json``.
    metadata: dict[str, Any] | None = Field(default=None)
    target_count: int | None
    repetition_count: int
    canvas_width: int | None
    canvas_height: int | None
    schema_version: int
    strokes_json: dict[str, Any] | None
    ocr_text: str | None
    user_corrected_text: str | None
    ai_summary: str | None
    review_state: str | None
    next_review_at: datetime | None
    last_quiz_at: datetime | None
    correct_count: int
    incorrect_count: int
    study_topic_id: int | None = None
    assets: list[StudySessionAssetResponse]
    created_at: datetime
    updated_at: datetime
class StudySessionListResponse(BaseModel):
    """One page of study sessions plus the paging values used to produce it."""

    items: list[StudySessionResponse]
    # Number of sessions matching the filters, ignoring limit/offset.
    total: int
    limit: int
    offset: int
def _to_session_response(sess: StudySession) -> StudySessionResponse:
    """Convert a StudySession ORM row into its API response model.

    ``sess.assets`` must already be loaded; a falsy value is treated as an
    empty asset list.
    """
    # Attributes whose name is the same on the ORM row and on the response.
    same_name_fields = (
        "id",
        "user_id",
        "study_type",
        "certification",
        "language_code",
        "learning_level",
        "subject",
        "topic",
        "source_text",
        "source_page",
        "mode",
        "prompt_question",
        "expected_answer",
        "target_count",
        "repetition_count",
        "canvas_width",
        "canvas_height",
        "schema_version",
        "strokes_json",
        "ocr_text",
        "user_corrected_text",
        "ai_summary",
        "review_state",
        "next_review_at",
        "last_quiz_at",
        "correct_count",
        "incorrect_count",
        "study_topic_id",
        "created_at",
        "updated_at",
    )
    payload: dict[str, Any] = {
        field_name: getattr(sess, field_name) for field_name in same_name_fields
    }
    # The ORM attribute is ``metadata_json``; the response field is ``metadata``.
    payload["metadata"] = sess.metadata_json
    linked_assets = sess.assets or []
    payload["assets"] = [
        StudySessionAssetResponse.model_validate(link) for link in linked_assets
    ]
    return StudySessionResponse(**payload)
def _validate_create_payload(body: StudySessionCreate) -> None:
    """Raise HTTP 422 when ``study_type`` or ``mode`` is not an allowed value.

    ``study_type`` is checked first, so it wins when both are invalid.
    """
    problem: str | None = None
    if body.study_type not in VALID_STUDY_TYPES:
        problem = f"study_type 은 {sorted(VALID_STUDY_TYPES)} 중 하나여야 합니다"
    elif body.mode not in VALID_MODES:
        problem = f"mode 는 {sorted(VALID_MODES)} 중 하나여야 합니다"
    if problem is not None:
        raise HTTPException(status_code=422, detail=problem)
# ─── 엔드포인트 ───
@router.post("/", response_model=StudySessionResponse, status_code=201)
async def create_study_session(
    body: StudySessionCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create a new study session owned by the current user.

    Certification example: study_type='certification', certification='산업안전기사',
        subject='산업안전보건법', topic='안전보건관리책임자의 직무', mode='copy'
    Language example: study_type='language', language_code='ja', learning_level='JLPT N3',
        subject='漢字', topic='安全', source_text='安全',
        metadata={'reading':'あんぜん','meaning':'안전','unit_type':'kanji'}

    Raises:
        HTTPException 422: ``study_type`` or ``mode`` is not an allowed value.
        HTTPException 404: ``study_topic_id`` is given but the topic is missing,
            soft-deleted, or owned by another user.
    """
    _validate_create_payload(body)

    # When a topic is requested, make sure it is the caller's own active topic
    # (blocks mapping a session onto another user's topic).
    requested_topic_id = body.study_topic_id
    if requested_topic_id is not None:
        from models.study_topic import StudyTopic as _Topic

        found_topic = await session.get(_Topic, requested_topic_id)
        topic_usable = (
            found_topic is not None
            and found_topic.user_id == user.id
            and found_topic.deleted_at is None
        )
        if not topic_usable:
            raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")

    column_values = {
        "user_id": user.id,
        "study_type": body.study_type,
        "certification": body.certification,
        "language_code": body.language_code,
        "learning_level": body.learning_level,
        "subject": body.subject,
        "topic": body.topic,
        "source_text": body.source_text,
        "source_page": body.source_page,
        "mode": body.mode,
        "prompt_question": body.prompt_question,
        "expected_answer": body.expected_answer,
        "metadata_json": body.metadata,
        "target_count": body.target_count,
        "canvas_width": body.canvas_width,
        "canvas_height": body.canvas_height,
        "strokes_json": body.strokes_json,
        "study_topic_id": requested_topic_id,
    }
    created = StudySession(**column_values)
    session.add(created)
    await session.flush()
    await session.commit()
    # A new session has no assets, but the relationship is refreshed explicitly
    # so the response builder does not trigger a lazy load on the async session.
    await session.refresh(created, attribute_names=["assets"])
    return _to_session_response(created)
@router.get("/", response_model=StudySessionListResponse)
async def list_study_sessions(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    study_type: str | None = Query(None),
    certification: str | None = Query(None),
    language_code: str | None = Query(None),
    learning_level: str | None = Query(None),
    subject: str | None = Query(None),
    topic: str | None = Query(None),
    review_state: str | None = Query(None),
    document_id: int | None = Query(None, description="이 문서가 연결된 세션만"),
    asset_type: str | None = Query(None, description="이 asset_type 보유 세션만"),
    mode: str | None = Query(None),
    due_before: datetime | None = Query(None, description="next_review_at <= due_before"),
    study_topic_id: int | None = Query(None, description="학습 워크스페이스(주제) id"),
    order: str = Query("created_at"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """List the current user's study sessions with optional filters and paging.

    Every filter is accepted from Phase 1 on (in preparation for Phase 3/4).
    ``total`` in the response counts all matching rows, ignoring limit/offset.

    Raises:
        HTTPException 422: ``study_type``, ``review_state``, ``asset_type``,
            ``mode`` or ``order`` is not an allowed value.
    """
    # Optional enum-like parameters: only validated when supplied.
    optional_enum_checks = (
        (study_type, VALID_STUDY_TYPES, "study_type 값이 올바르지 않습니다"),
        (review_state, VALID_REVIEW_STATES, "review_state 값이 올바르지 않습니다"),
        (asset_type, VALID_ASSET_TYPES, "asset_type 값이 올바르지 않습니다"),
        (mode, VALID_MODES, "mode 값이 올바르지 않습니다"),
    )
    for supplied, allowed, message in optional_enum_checks:
        if supplied is not None and supplied not in allowed:
            raise HTTPException(status_code=422, detail=message)
    if order not in VALID_ORDERS:
        raise HTTPException(status_code=422, detail="order 값이 올바르지 않습니다")

    stmt = select(StudySession).where(StudySession.user_id == user.id)

    # Plain equality filters, applied only for parameters that were supplied.
    equality_filters = (
        (StudySession.study_type, study_type),
        (StudySession.certification, certification),
        (StudySession.language_code, language_code),
        (StudySession.learning_level, learning_level),
        (StudySession.subject, subject),
        (StudySession.topic, topic),
        (StudySession.review_state, review_state),
        (StudySession.mode, mode),
    )
    for column, wanted in equality_filters:
        if wanted is not None:
            stmt = stmt.where(column == wanted)
    if due_before is not None:
        stmt = stmt.where(StudySession.next_review_at <= due_before)
    if study_topic_id is not None:
        stmt = stmt.where(StudySession.study_topic_id == study_topic_id)

    # Asset filters are expressed as an EXISTS subquery on the asset table.
    if document_id is not None or asset_type is not None:
        asset_match = [StudySessionAsset.study_session_id == StudySession.id]
        if document_id is not None:
            asset_match.append(StudySessionAsset.document_id == document_id)
        if asset_type is not None:
            asset_match.append(StudySessionAsset.asset_type == asset_type)
        has_matching_asset = (
            select(StudySessionAsset.id).where(and_(*asset_match)).exists()
        )
        stmt = stmt.where(has_matching_asset)

    count_stmt = select(func.count()).select_from(stmt.subquery())
    total = (await session.execute(count_stmt)).scalar() or 0

    # Primary sort key; the session id (descending) is always the tie-breaker.
    if order == "next_review_at":
        primary_sort = StudySession.next_review_at.asc().nullslast()
    elif order == "last_quiz_at":
        primary_sort = StudySession.last_quiz_at.desc().nullslast()
    else:
        primary_sort = StudySession.created_at.desc()

    page_stmt = (
        stmt.order_by(primary_sort, StudySession.id.desc())
        .options(selectinload(StudySession.assets))
        .offset(offset)
        .limit(limit)
    )
    page_rows = (await session.execute(page_stmt)).scalars().all()

    return StudySessionListResponse(
        items=[_to_session_response(row) for row in page_rows],
        total=total,
        limit=limit,
        offset=offset,
    )
@router.get("/groups")
async def get_study_groups(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return per-domain group counts for the current user's study sessions.

    Provided from Phase 1 on, in preparation for the Phase 3 mobile card menu.

    Response: ``{"by_type": {"certification": {"groups": [...]},
    "language": {"groups": [...]}}}``.

    - certification groups nest certification -> subject -> topic, with
      session/weak/due counts at every level.
    - language groups are one entry per (language_code, learning_level), each
      nesting subject -> topic; topics carry the counts plus has_audio/has_video.
    """

    def stat_columns():
        # Fresh aggregate columns for one query. "due" means next_review_at is
        # set and not later than the current UTC time.
        return (
            func.count().label("session_count"),
            func.count().filter(StudySession.review_state == "weak").label("weak_count"),
            func.count()
            .filter(
                and_(
                    StudySession.next_review_at.is_not(None),
                    StudySession.next_review_at <= datetime.now(timezone.utc),
                )
            )
            .label("due_count"),
        )

    def owned_sessions_of(kind: str):
        # WHERE conditions restricting a query to the caller's sessions of one type.
        return (StudySession.user_id == user.id, StudySession.study_type == kind)

    # certification groups: certification -> subject -> topic
    cert_keys = (StudySession.certification, StudySession.subject, StudySession.topic)
    cert_stmt = (
        select(*cert_keys, *stat_columns())
        .where(*owned_sessions_of("certification"))
        .group_by(*cert_keys)
    )
    cert_rows = (await session.execute(cert_stmt)).all()

    # language groups: language_code -> learning_level -> subject -> topic
    lang_keys = (
        StudySession.language_code,
        StudySession.learning_level,
        StudySession.subject,
        StudySession.topic,
    )
    lang_stmt = (
        select(*lang_keys, *stat_columns())
        .where(*owned_sessions_of("language"))
        .group_by(*lang_keys)
    )
    lang_rows = (await session.execute(lang_stmt)).all()

    # Audio/video asset counts per language group (join with assets), used
    # only to derive the has_audio / has_video flags.
    media_stmt = (
        select(*lang_keys, StudySessionAsset.asset_type, func.count().label("c"))
        .join(StudySessionAsset, StudySessionAsset.study_session_id == StudySession.id)
        .where(
            *owned_sessions_of("language"),
            StudySessionAsset.asset_type.in_(["audio", "video"]),
        )
        .group_by(*lang_keys, StudySessionAsset.asset_type)
    )
    media_rows = (await session.execute(media_stmt)).all()

    media_counts: dict[tuple, dict[str, int]] = {}
    for row in media_rows:
        group_key = (row.language_code, row.learning_level, row.subject, row.topic)
        per_type = media_counts.setdefault(group_key, {"audio": 0, "video": 0})
        per_type[row.asset_type] = row.c

    # Build the certification tree (dict insertion order follows row order).
    cert_tree: dict[str | None, dict[str | None, dict[str | None, dict]]] = {}
    for row in cert_rows:
        topics_of_subject = cert_tree.setdefault(row.certification, {}).setdefault(
            row.subject, {}
        )
        topics_of_subject[row.topic] = {
            "session_count": row.session_count,
            "weak_count": row.weak_count,
            "due_count": row.due_count,
        }

    cert_out = []
    for cert_name, subjects in cert_tree.items():
        subject_entries = []
        for subj_name, topics in subjects.items():
            topic_entries = [
                {"topic": topic_name, **stats} for topic_name, stats in topics.items()
            ]
            # Subject totals are the sums over its topics.
            subject_entries.append({
                "subject": subj_name,
                "topics": topic_entries,
                "session_count": sum(t["session_count"] for t in topic_entries),
                "weak_count": sum(t["weak_count"] for t in topic_entries),
                "due_count": sum(t["due_count"] for t in topic_entries),
            })
        # Certification totals are the sums over its subjects.
        cert_out.append({
            "certification": cert_name,
            "subjects": subject_entries,
            "session_count": sum(s["session_count"] for s in subject_entries),
            "weak_count": sum(s["weak_count"] for s in subject_entries),
            "due_count": sum(s["due_count"] for s in subject_entries),
        })

    # Build the language tree.
    lang_tree: dict[
        str | None, dict[str | None, dict[str | None, dict[str | None, dict]]]
    ] = {}
    for row in lang_rows:
        media = media_counts.get(
            (row.language_code, row.learning_level, row.subject, row.topic),
            {"audio": 0, "video": 0},
        )
        topics_of_subject = (
            lang_tree.setdefault(row.language_code, {})
            .setdefault(row.learning_level, {})
            .setdefault(row.subject, {})
        )
        topics_of_subject[row.topic] = {
            "session_count": row.session_count,
            "weak_count": row.weak_count,
            "due_count": row.due_count,
            "has_audio": media["audio"] > 0,
            "has_video": media["video"] > 0,
        }

    # One output entry per (language_code, learning_level) pair.
    lang_out = [
        {
            "language_code": lang_code,
            "learning_level": level_name,
            "subjects": [
                {
                    "subject": subj_name,
                    "topics": [
                        {"topic": topic_name, **stats}
                        for topic_name, stats in topics.items()
                    ],
                }
                for subj_name, topics in subjects.items()
            ],
        }
        for lang_code, levels in lang_tree.items()
        for level_name, subjects in levels.items()
    ]

    return {
        "by_type": {
            "certification": {"groups": cert_out},
            "language": {"groups": lang_out},
        }
    }
@router.get("/{session_id}", response_model=StudySessionResponse)
async def get_study_session(
    session_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return one study session, with its assets, if it belongs to the caller.

    Raises:
        HTTPException 404: the session does not exist or is owned by another user.
    """
    eager_assets = [selectinload(StudySession.assets)]
    found = await session.get(StudySession, session_id, options=eager_assets)
    return _to_session_response(_verify_session_ownership(found, user))
@router.patch("/{session_id}", response_model=StudySessionResponse)
async def update_study_session(
    session_id: int,
    body: StudySessionUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Partially update a study session owned by the current user.

    Only fields explicitly present in the request body are applied, so a field
    sent as null is written as NULL while an omitted field is left untouched.
    All validation runs before the row is modified.

    Raises:
        HTTPException 404: the session is missing or owned by another user, or
            ``study_topic_id`` points to a topic that is missing, soft-deleted,
            or owned by another user.
        HTTPException 422: ``mode`` or ``review_state`` is not an allowed
            value, or ``repetition_count`` is sent as null.
    """
    sess = await session.get(
        StudySession, session_id, options=[selectinload(StudySession.assets)]
    )
    sess = _verify_session_ownership(sess, user)

    # Only fields that were explicitly set in the request are considered.
    fields_set = body.model_fields_set

    # ── Validate everything first, so a rejected request never touches the row ──
    if "mode" in fields_set and body.mode not in VALID_MODES:
        raise HTTPException(status_code=422, detail="mode 값이 올바르지 않습니다")
    if "review_state" in fields_set and body.review_state not in VALID_REVIEW_STATES:
        raise HTTPException(status_code=422, detail="review_state 값이 올바르지 않습니다")
    # The response model declares repetition_count as a plain int, so an
    # explicit null can never be returned; reject it instead of storing it.
    if "repetition_count" in fields_set and body.repetition_count is None:
        raise HTTPException(status_code=422, detail="repetition_count 값이 올바르지 않습니다")
    # Reassigning to a topic requires that the topic is the caller's own active
    # topic; detaching (null) needs no check.
    if "study_topic_id" in fields_set and body.study_topic_id is not None:
        from models.study_topic import StudyTopic as _Topic

        topic = await session.get(_Topic, body.study_topic_id)
        if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
            raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")

    # ── Apply ──
    if "mode" in fields_set:
        sess.mode = body.mode
    if "review_state" in fields_set:
        sess.review_state = body.review_state

    # Fields copied as-is (same name on the request body and the ORM row).
    simple_fields = (
        "certification", "language_code", "learning_level", "subject", "topic",
        "source_text", "source_page", "prompt_question", "expected_answer",
        "target_count", "repetition_count",
        "canvas_width", "canvas_height", "strokes_json",
        "ocr_text", "user_corrected_text", "next_review_at",
        "study_topic_id",
    )
    for fname in simple_fields:
        if fname in fields_set:
            setattr(sess, fname, getattr(body, fname))

    # The request field ``metadata`` maps to the ORM attribute ``metadata_json``.
    if "metadata" in fields_set:
        sess.metadata_json = body.metadata

    sess.updated_at = datetime.now(timezone.utc)
    await session.commit()
    await session.refresh(sess, attribute_names=["assets"])
    return _to_session_response(sess)
@router.delete("/{session_id}", status_code=204)
async def delete_study_session(
    session_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Delete a study session owned by the current user.

    Per the original note, linked asset rows are removed along with it through
    the database's ON DELETE CASCADE; the documents themselves are kept — only
    the asset rows disappear.

    Raises:
        HTTPException 404: the session does not exist or is owned by another user.
    """
    doomed = _verify_session_ownership(
        await session.get(StudySession, session_id), user
    )
    await session.delete(doomed)
    await session.commit()
# ─── Assets 엔드포인트 ───
@router.post(
    "/{session_id}/assets",
    response_model=StudySessionAssetResponse,
    status_code=201,
)
async def link_study_asset(
    session_id: int,
    body: StudySessionAssetCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Link an existing document to a study session as an asset.

    Raises:
        HTTPException 422: ``asset_type`` or ``role`` is not an allowed value.
        HTTPException 404: the session or the document is missing or not
            accessible to the caller.
        HTTPException 409: the same (session, document, asset_type, role)
            combination already exists.
    """
    if body.asset_type not in VALID_ASSET_TYPES:
        raise HTTPException(
            status_code=422,
            detail=f"asset_type 은 {sorted(VALID_ASSET_TYPES)} 중 하나여야 합니다",
        )
    if body.role not in VALID_ROLES:
        raise HTTPException(
            status_code=422,
            detail=f"role 은 {sorted(r for r in VALID_ROLES if r is not None)} 중 하나 또는 NULL 이어야 합니다",
        )

    _verify_session_ownership(await session.get(StudySession, session_id), user)
    _verify_document_ownership(await session.get(Document, body.document_id), user)

    def already_linked() -> HTTPException:
        # Same 409 payload for both the pre-check and the constraint violation.
        return HTTPException(
            status_code=409,
            detail={
                "error_code": "asset_already_linked",
                "message": "해당 문서가 이미 같은 asset_type/role 로 연결되어 있습니다",
            },
        )

    # Duplicate detection uses both a preliminary SELECT and the DB UNIQUE
    # constraint, so a concurrent insert is still answered with 409.
    if body.role is None:
        role_clause = StudySessionAsset.role.is_(body.role)
    else:
        role_clause = StudySessionAsset.role == body.role
    duplicate_stmt = select(StudySessionAsset).where(
        StudySessionAsset.study_session_id == session_id,
        StudySessionAsset.document_id == body.document_id,
        StudySessionAsset.asset_type == body.asset_type,
        role_clause,
    )
    duplicate = (await session.execute(duplicate_stmt)).scalar_one_or_none()
    if duplicate is not None:
        raise already_linked()

    link = StudySessionAsset(
        study_session_id=session_id,
        document_id=body.document_id,
        asset_type=body.asset_type,
        role=body.role,
        sort_order=body.sort_order,
    )
    session.add(link)
    try:
        await session.commit()
    except IntegrityError:
        await session.rollback()
        # UNIQUE violation — probably raced with the SELECT above; respond
        # with the same message.
        raise already_linked()
    await session.refresh(link)
    return StudySessionAssetResponse.model_validate(link)
@router.delete(
    "/{session_id}/assets/{asset_id}", status_code=204
)
async def unlink_study_asset(
    session_id: int,
    asset_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Remove one asset link from a study session; the document itself is kept.

    Raises:
        HTTPException 404: the session is missing or owned by another user, or
            the asset does not exist or belongs to a different session.
    """
    _verify_session_ownership(await session.get(StudySession, session_id), user)

    link = await session.get(StudySessionAsset, asset_id)
    belongs_to_session = link is not None and link.study_session_id == session_id
    if not belongs_to_session:
        raise HTTPException(status_code=404, detail="asset 을 찾을 수 없습니다")

    await session.delete(link)
    await session.commit()
# ─── Snapshot (PNG 업로드) ───
@router.post("/{session_id}/snapshot", response_model=StudySessionAssetResponse, status_code=201)
async def upload_handwriting_snapshot(
    session_id: int,
    request: Request,
    file: UploadFile,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    sort_order: int = Form(0),
):
    """Upload a canvas PNG, register it as a document and link it to the session.

    The file is streamed to ``<name>.uploading`` in the NAS inbox, renamed to
    its final name, and then a Document row, an 'extract' queue entry and a
    ``handwriting_png`` asset (role ``answer``) are committed together. Per the
    original note this borrows the atomic-rename + error_code pattern of
    documents.py upload_document.

    Several snapshots may accumulate on one session: each upload creates a new
    document id, so the (session, document, type, role) uniqueness never clashes.

    File-system cleanup:
        - the staging file is removed on every failure path of the write step;
        - the final file is removed when anything fails up to and including the
          commit, but is kept once the commit has succeeded.

    Raises:
        HTTPException 404: the session is missing or owned by another user.
        HTTPException 400: missing/invalid file name, non-PNG extension, a path
            escaping the inbox, or an empty file.
        HTTPException 413: the declared or streamed size exceeds the limit.
        HTTPException 499 / 408: client disconnect / timeout while reading.
        HTTPException 500: unexpected error while storing or renaming the file.
    """
    sess = await session.get(StudySession, session_id)
    sess = _verify_session_ownership(sess, user)

    if not file.filename:
        raise _upload_error(400, "invalid_input", "파일명이 필요합니다")
    # Keep only the final path component; reject empty names and dotfiles.
    safe_name = Path(file.filename).name
    if not safe_name or safe_name.startswith("."):
        raise _upload_error(400, "invalid_input", "유효하지 않은 파일명")
    ext = Path(safe_name).suffix.lower()
    if ext != ".png":
        raise _upload_error(
            400, "invalid_input", "snapshot 은 PNG 파일만 지원합니다",
        )

    max_bytes = settings.upload.max_bytes
    slack_ratio = settings.upload.content_length_slack_ratio
    chunk_size = settings.upload.stream_chunk_bytes

    # Early rejection based on Content-Length. A malformed header is ignored;
    # the streamed size check below still applies.
    cl_header = request.headers.get("content-length")
    if cl_header:
        try:
            declared_length = int(cl_header)
        except ValueError:
            declared_length = None
        if declared_length is not None and declared_length > int(max_bytes * slack_ratio):
            raise _upload_error(413, "body_too_large", "파일이 너무 큽니다")

    # Resolve the NAS inbox and pick a name that collides with neither an
    # existing file nor an in-progress upload.
    nas_root = Path(settings.nas_mount_path).resolve()
    inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
    inbox_dir.mkdir(parents=True, exist_ok=True)
    inbox_root = inbox_dir.resolve()
    target = (inbox_dir / safe_name).resolve()
    # Component-wise containment check (a string prefix test would also accept
    # sibling directories whose name merely starts with the inbox name).
    if not target.is_relative_to(inbox_root):
        raise _upload_error(400, "invalid_input", "잘못된 파일 경로")
    counter = 1
    stem, suffix = target.stem, target.suffix
    staging = target.with_name(target.name + ".uploading")
    while target.exists() or staging.exists():
        target = inbox_root / f"{stem}_{counter}{suffix}"
        staging = target.with_name(target.name + ".uploading")
        counter += 1

    # Stream to the staging file while enforcing the cumulative size limit.
    written = 0
    stored = False
    try:
        with staging.open("wb") as f:
            while chunk := await file.read(chunk_size):
                written += len(chunk)
                if written > max_bytes:
                    raise _upload_error(413, "body_too_large", "파일이 너무 큽니다")
                f.write(chunk)
        if written == 0:
            raise _upload_error(400, "empty_file", "빈 파일은 업로드할 수 없습니다")
        stored = True
    except ClientDisconnect as exc:
        logger.info("snapshot aborted by client: %s (written=%d)", safe_name, written)
        raise _upload_error(499, "network_abort", "업로드가 취소되었습니다") from exc
    except asyncio.TimeoutError as exc:
        logger.warning("snapshot timeout: %s (written=%d)", safe_name, written)
        raise _upload_error(408, "upload_timeout", "업로드 시간 초과") from exc
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("snapshot internal error: %s (written=%d)", safe_name, written)
        raise _upload_error(500, "internal", "업로드 처리 중 오류가 발생했습니다") from exc
    finally:
        # Single cleanup point: covers every handler above as well as exits
        # that are not Exception subclasses (e.g. task cancellation).
        if not stored:
            staging.unlink(missing_ok=True)

    # Atomic rename to the final path.
    try:
        staging.replace(target)
    except OSError as exc:
        staging.unlink(missing_ok=True)
        logger.exception("snapshot rename failed: %s -> %s", staging, target)
        raise _upload_error(500, "internal", "파일 저장 후 정리 중 오류가 발생했습니다") from exc

    # Compose user_tags and the title from the study session's metadata.
    domain_tag = sess.certification or sess.language_code or "general"
    user_tags = ["handwriting", domain_tag]
    if sess.subject:
        user_tags.append(sess.subject)
    title = f"필기 — {sess.topic or sess.subject or 'study session'} #{session_id}"

    # Document + 'extract' queue entry + StudySessionAsset in one transaction.
    # Path/hash computation lives inside the same guard so that any failure
    # before the commit removes the stored file instead of orphaning it.
    try:
        # ``target`` is resolved, so it is made relative to the resolved mount
        # path; this also works when the configured path is a symlink or relative.
        rel_path = str(target.relative_to(nas_root))
        fhash = file_hash(target)
        doc = Document(
            file_path=rel_path,
            file_hash=fhash,
            file_format="png",
            file_size=written,
            file_type="immutable",
            title=title,
            user_tags=user_tags,
        )
        session.add(doc)
        await session.flush()
        await enqueue_stage(session, doc.id, "extract")
        asset = StudySessionAsset(
            study_session_id=session_id,
            document_id=doc.id,
            asset_type="handwriting_png",
            role="answer",
            sort_order=sort_order,
        )
        session.add(asset)
        await session.commit()
    except Exception:
        # The file is a separate resource from the DB transaction, so it is
        # unlinked explicitly when the rows were not committed.
        target.unlink(missing_ok=True)
        raise

    # Deliberately outside the guard above: once the rows are committed, the
    # file must stay on disk even if reloading the asset fails.
    await session.refresh(asset)
    return StudySessionAssetResponse.model_validate(asset)