"""Study session API — Phase 1 MVP (certification + language generalization).

iPad handwriting-copy sessions, mobile memorization notes and mobile quizzes
all share the same ``study_sessions`` data. This module covers Phase 1 only:
iPad copy sessions plus the generalized DB/API.

Key points:
- ``study_type`` branches on 'certification' | 'language'. The ``metadata``
  jsonb column holds free-form, domain-specific metadata.
- There is no single ``*_document_id`` column. Every media link goes through
  ``study_session_assets``.
- ``documents`` rows are never deleted here; only asset links are removed.
- Ownership: ``study_sessions.user_id == current_user.id`` is mandatory.
  ``documents`` has no user column because the system is single-user today.
  For future multi-user support it is checked softly via
  ``getattr(doc, 'user_id', None)``: compared when present, skipped otherwise.
- 409 on duplicates: UNIQUE(study_session_id, document_id, asset_type, role).

Fields unused until Phase 2~4 (review_state / quiz / ocr_text / ai_summary /
prompt, ...) exist only in the schema, with no automatic logic. They will be
activated in separate PRs.
"""

import asyncio
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated, Any

from fastapi import (
    APIRouter,
    Depends,
    Form,
    HTTPException,
    Query,
    Request,
    UploadFile,
)
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import and_, delete, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from starlette.requests import ClientDisconnect

from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.utils import file_hash
from models.document import Document
from models.queue import enqueue_stage
from models.study_session import StudySession, StudySessionAsset
from models.user import User

logger = logging.getLogger(__name__)

router = APIRouter()


# ─── Enum validation constants ───

VALID_STUDY_TYPES: set[str] = {"certification", "language"}
VALID_MODES: set[str] = {
    "copy",
    "trace",
    "blank-repeat",
    "dictation",
    "shadowing",
    # quiz / flashcard become active in Phase 2~4; the schema already accepts them.
    "quiz",
    "flashcard",
}
VALID_ASSET_TYPES: set[str] = {
    "source_scan",
    "handwriting_png",
    "audio",
    "video",
    "transcript",
    "reference",
}
VALID_ROLES: set[str | None] = {
    None,
    "prompt",
    "answer",
    "pronunciation",
    "lecture",
    "listening_source",
    "shadowing_source",
    "reference",
}
VALID_REVIEW_STATES: set[str | None] = {
    None,
    "new",
    "learning",
    "weak",
    "mastered",
}
VALID_ORDERS: set[str] = {"created_at", "next_review_at", "last_quiz_at"}


# ─── Helpers ───


def _upload_error(status_code: int, error_code: str, message: str) -> HTTPException:
    """Build an upload-failure HTTPException with an ``{error_code, message}`` detail.

    Mirrors the error shape used by documents.py uploads.
    """
    return HTTPException(
        status_code=status_code,
        detail={"error_code": error_code, "message": message},
    )


def _verify_session_ownership(
    sess: StudySession | None, user: User
) -> StudySession:
    """Return ``sess`` if it exists and belongs to ``user``.

    Raises:
        HTTPException(404): missing session or owner mismatch. A mismatch also
            yields 404 so other users' session ids are not revealed.
    """
    if sess is None or sess.user_id != user.id:
        raise HTTPException(status_code=404, detail="학습 세션을 찾을 수 없습니다")
    return sess


def _verify_document_ownership(doc: Document | None, user: User) -> Document:
    """Return ``doc`` if it exists, is not soft-deleted and is accessible to ``user``.

    documents has no user_id column today (single-user system). For future
    multi-user support the owner is read with ``getattr``: compared when
    present, skipped otherwise.

    Raises:
        HTTPException(404): missing / soft-deleted document or owner mismatch.
    """
    if doc is None or getattr(doc, "deleted_at", None) is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
    doc_user_id = getattr(doc, "user_id", None)
    if doc_user_id is not None and doc_user_id != user.id:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
    return doc


async def _verify_topic_ownership(
    session: AsyncSession, topic_id: int, user: User
) -> None:
    """Ensure study topic ``topic_id`` exists, belongs to ``user`` and is not soft-deleted.

    Used by create/update to block attaching a session to another user's topic.

    Raises:
        HTTPException(404): the topic is missing, foreign, or soft-deleted.
    """
    # Kept as a local import like the original endpoints did
    # (presumably to avoid an import cycle — verify before hoisting).
    from models.study_topic import StudyTopic

    topic = await session.get(StudyTopic, topic_id)
    if topic is None or topic.user_id != user.id or topic.deleted_at is not None:
        raise HTTPException(status_code=404, detail="학습 주제를 찾을 수 없습니다")


def _asset_already_linked() -> HTTPException:
    """409 response for a duplicate (session, document, asset_type, role) link."""
    return HTTPException(
        status_code=409,
        detail={
            "error_code": "asset_already_linked",
            "message": "해당 문서가 이미 같은 asset_type/role 로 연결되어 있습니다",
        },
    )


# ─── Pydantic Schemas ───


class StudySessionAssetCreate(BaseModel):
    """Request body for linking an existing document to a session."""

    document_id: int
    asset_type: str
    role: str | None = None
    sort_order: int = 0


class StudySessionAssetResponse(BaseModel):
    """Serialized study_session_assets row (built from ORM attributes)."""

    model_config = ConfigDict(from_attributes=True)

    id: int
    document_id: int
    asset_type: str
    role: str | None
    sort_order: int
    created_at: datetime


class StudySessionCreate(BaseModel):
    """Request body for creating a study session."""

    study_type: str = "certification"
    certification: str | None = None
    language_code: str | None = None
    learning_level: str | None = None
    subject: str | None = None
    topic: str | None = None
    source_text: str | None = None
    source_page: int | None = None
    mode: str = "copy"
    prompt_question: str | None = None
    expected_answer: str | None = None
    metadata: dict[str, Any] | None = None
    target_count: int | None = None
    canvas_width: int | None = None
    canvas_height: int | None = None
    strokes_json: dict[str, Any] | None = None
    # Study workspace grouping. Unset means "uncategorized".
    study_topic_id: int | None = None


class StudySessionUpdate(BaseModel):
    """PATCH partial update — only explicitly sent fields are applied."""

    certification: str | None = None
    language_code: str | None = None
    learning_level: str | None = None
    subject: str | None = None
    topic: str | None = None
    source_text: str | None = None
    source_page: int | None = None
    mode: str | None = None
    prompt_question: str | None = None
    expected_answer: str | None = None
    metadata: dict[str, Any] | None = None
    target_count: int | None = None
    repetition_count: int | None = None
    canvas_width: int | None = None
    canvas_height: int | None = None
    strokes_json: dict[str, Any] | None = None
    ocr_text: str | None = None
    user_corrected_text: str | None = None
    review_state: str | None = None
    next_review_at: datetime | None = None
    # Topic reassignment (an explicit null detaches the session from its topic).
    study_topic_id: int | None = None


class StudySessionResponse(BaseModel):
    """Full study session payload including linked assets."""

    id: int
    user_id: int
    study_type: str
    certification: str | None
    language_code: str | None
    learning_level: str | None
    subject: str | None
    topic: str | None
    source_text: str | None
    source_page: int | None
    mode: str
    prompt_question: str | None
    expected_answer: str | None
    metadata: dict[str, Any] | None = Field(default=None)
    target_count: int | None
    repetition_count: int
    canvas_width: int | None
    canvas_height: int | None
    schema_version: int
    strokes_json: dict[str, Any] | None
    ocr_text: str | None
    user_corrected_text: str | None
    ai_summary: str | None
    review_state: str | None
    next_review_at: datetime | None
    last_quiz_at: datetime | None
    correct_count: int
    incorrect_count: int
    study_topic_id: int | None = None
    assets: list[StudySessionAssetResponse]
    created_at: datetime
    updated_at: datetime


class StudySessionListResponse(BaseModel):
    """Paginated list of study sessions."""

    items: list[StudySessionResponse]
    total: int
    limit: int
    offset: int


def _to_session_response(sess: StudySession) -> StudySessionResponse:
    """Convert an ORM StudySession (with ``assets`` already loaded) to the response model.

    The ORM attribute ``metadata_json`` is exposed as ``metadata`` in the API.
    """
    return StudySessionResponse(
        id=sess.id,
        user_id=sess.user_id,
        study_type=sess.study_type,
        certification=sess.certification,
        language_code=sess.language_code,
        learning_level=sess.learning_level,
        subject=sess.subject,
        topic=sess.topic,
        source_text=sess.source_text,
        source_page=sess.source_page,
        mode=sess.mode,
        prompt_question=sess.prompt_question,
        expected_answer=sess.expected_answer,
        metadata=sess.metadata_json,
        target_count=sess.target_count,
        repetition_count=sess.repetition_count,
        canvas_width=sess.canvas_width,
        canvas_height=sess.canvas_height,
        schema_version=sess.schema_version,
        strokes_json=sess.strokes_json,
        ocr_text=sess.ocr_text,
        user_corrected_text=sess.user_corrected_text,
        ai_summary=sess.ai_summary,
        review_state=sess.review_state,
        next_review_at=sess.next_review_at,
        last_quiz_at=sess.last_quiz_at,
        correct_count=sess.correct_count,
        incorrect_count=sess.incorrect_count,
        study_topic_id=sess.study_topic_id,
        assets=[
            StudySessionAssetResponse.model_validate(a) for a in (sess.assets or [])
        ],
        created_at=sess.created_at,
        updated_at=sess.updated_at,
    )


def _validate_create_payload(body: StudySessionCreate) -> None:
    """Reject unknown ``study_type`` / ``mode`` values with 422."""
    if body.study_type not in VALID_STUDY_TYPES:
        raise HTTPException(
            status_code=422,
            detail=f"study_type 은 {sorted(VALID_STUDY_TYPES)} 중 하나여야 합니다",
        )
    if body.mode not in VALID_MODES:
        raise HTTPException(
            status_code=422,
            detail=f"mode 는 {sorted(VALID_MODES)} 중 하나여야 합니다",
        )


# ─── Endpoints ───


@router.post("/", response_model=StudySessionResponse, status_code=201)
async def create_study_session(
    body: StudySessionCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create a new study session.

    Certification example:
        study_type='certification', certification='산업안전기사',
        subject='산업안전보건법', topic='안전보건관리책임자의 직무', mode='copy'

    Language example:
        study_type='language', language_code='ja', learning_level='JLPT N3',
        subject='漢字', topic='安全', source_text='安全',
        metadata={'reading':'あんぜん','meaning':'안전','unit_type':'kanji'}
    """
    _validate_create_payload(body)

    # If a study_topic_id is given, block mapping onto another user's topic.
    if body.study_topic_id is not None:
        await _verify_topic_ownership(session, body.study_topic_id, user)

    sess = StudySession(
        user_id=user.id,
        study_type=body.study_type,
        certification=body.certification,
        language_code=body.language_code,
        learning_level=body.learning_level,
        subject=body.subject,
        topic=body.topic,
        source_text=body.source_text,
        source_page=body.source_page,
        mode=body.mode,
        prompt_question=body.prompt_question,
        expected_answer=body.expected_answer,
        metadata_json=body.metadata,
        target_count=body.target_count,
        canvas_width=body.canvas_width,
        canvas_height=body.canvas_height,
        strokes_json=body.strokes_json,
        study_topic_id=body.study_topic_id,
    )
    session.add(sess)
    await session.commit()  # commit() flushes the pending INSERT itself
    # A new session has no assets, but load the relationship explicitly so
    # building the response does not trigger an async lazy load.
    await session.refresh(sess, attribute_names=["assets"])
    return _to_session_response(sess)


@router.get("/", response_model=StudySessionListResponse)
async def list_study_sessions(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    study_type: str | None = Query(None),
    certification: str | None = Query(None),
    language_code: str | None = Query(None),
    learning_level: str | None = Query(None),
    subject: str | None = Query(None),
    topic: str | None = Query(None),
    review_state: str | None = Query(None),
    document_id: int | None = Query(None, description="이 문서가 연결된 세션만"),
    asset_type: str | None = Query(None, description="이 asset_type 보유 세션만"),
    mode: str | None = Query(None),
    due_before: datetime | None = Query(None, description="next_review_at <= due_before"),
    study_topic_id: int | None = Query(None, description="학습 워크스페이스(주제) id"),
    order: str = Query("created_at"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """List the current user's study sessions.

    Every filter is accepted from Phase 1 onward, ahead of Phase 3/4 activation.
    ``total`` counts all matches before ``limit``/``offset`` are applied.
    """
    if study_type is not None and study_type not in VALID_STUDY_TYPES:
        raise HTTPException(status_code=422, detail="study_type 값이 올바르지 않습니다")
    if review_state is not None and review_state not in VALID_REVIEW_STATES:
        raise HTTPException(status_code=422, detail="review_state 값이 올바르지 않습니다")
    if asset_type is not None and asset_type not in VALID_ASSET_TYPES:
        raise HTTPException(status_code=422, detail="asset_type 값이 올바르지 않습니다")
    if mode is not None and mode not in VALID_MODES:
        raise HTTPException(status_code=422, detail="mode 값이 올바르지 않습니다")
    if order not in VALID_ORDERS:
        raise HTTPException(status_code=422, detail="order 값이 올바르지 않습니다")

    base = select(StudySession).where(StudySession.user_id == user.id)
    if study_type is not None:
        base = base.where(StudySession.study_type == study_type)
    if certification is not None:
        base = base.where(StudySession.certification == certification)
    if language_code is not None:
        base = base.where(StudySession.language_code == language_code)
    if learning_level is not None:
        base = base.where(StudySession.learning_level == learning_level)
    if subject is not None:
        base = base.where(StudySession.subject == subject)
    if topic is not None:
        base = base.where(StudySession.topic == topic)
    if review_state is not None:
        base = base.where(StudySession.review_state == review_state)
    if mode is not None:
        base = base.where(StudySession.mode == mode)
    if due_before is not None:
        base = base.where(StudySession.next_review_at <= due_before)
    if study_topic_id is not None:
        base = base.where(StudySession.study_topic_id == study_topic_id)

    # Asset filters use a correlated EXISTS subquery so a session with several
    # matching assets is still returned (and counted) once.
    if document_id is not None or asset_type is not None:
        asset_conditions = [StudySessionAsset.study_session_id == StudySession.id]
        if document_id is not None:
            asset_conditions.append(StudySessionAsset.document_id == document_id)
        if asset_type is not None:
            asset_conditions.append(StudySessionAsset.asset_type == asset_type)
        base = base.where(
            select(StudySessionAsset.id).where(and_(*asset_conditions)).exists()
        )

    count_query = select(func.count()).select_from(base.subquery())
    total = (await session.execute(count_query)).scalar() or 0

    if order == "next_review_at":
        ordered = base.order_by(
            StudySession.next_review_at.asc().nullslast(), StudySession.id.desc()
        )
    elif order == "last_quiz_at":
        ordered = base.order_by(
            StudySession.last_quiz_at.desc().nullslast(), StudySession.id.desc()
        )
    else:
        ordered = base.order_by(StudySession.created_at.desc(), StudySession.id.desc())

    ordered = (
        ordered.options(selectinload(StudySession.assets))
        .offset(offset)
        .limit(limit)
    )
    rows = (await session.execute(ordered)).scalars().all()

    return StudySessionListResponse(
        items=[_to_session_response(s) for s in rows],
        total=total,
        limit=limit,
        offset=offset,
    )


@router.get("/groups")
async def get_study_groups(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Per-domain group counts.

    Intended for the Phase 3 mobile card menu; the endpoint is provided from Phase 1.

    Response: {by_type: {certification: {...}, language: {...}}}
    - certification groups: certification → subject → topic, with
      session/weak/due counts rolled up at every level.
    - language groups: (language_code, learning_level) → subject → topic, with
      per-topic counts plus has_audio / has_video flags.
    """
    # Capture "now" once so both queries agree on what counts as due.
    now = datetime.now(timezone.utc)

    def _stat_columns():
        """Fresh aggregate columns: total, weak, and due (next_review_at <= now)."""
        return (
            func.count().label("session_count"),
            func.count().filter(StudySession.review_state == "weak").label("weak_count"),
            func.count()
            .filter(
                and_(
                    StudySession.next_review_at.is_not(None),
                    StudySession.next_review_at <= now,
                )
            )
            .label("due_count"),
        )

    # Certification groups: certification → subject → topic
    cert_query = (
        select(
            StudySession.certification,
            StudySession.subject,
            StudySession.topic,
            *_stat_columns(),
        )
        .where(
            StudySession.user_id == user.id,
            StudySession.study_type == "certification",
        )
        .group_by(StudySession.certification, StudySession.subject, StudySession.topic)
    )
    cert_rows = (await session.execute(cert_query)).all()

    # Language groups: language_code → learning_level → subject → topic
    lang_query = (
        select(
            StudySession.language_code,
            StudySession.learning_level,
            StudySession.subject,
            StudySession.topic,
            *_stat_columns(),
        )
        .where(
            StudySession.user_id == user.id,
            StudySession.study_type == "language",
        )
        .group_by(
            StudySession.language_code,
            StudySession.learning_level,
            StudySession.subject,
            StudySession.topic,
        )
    )
    lang_rows = (await session.execute(lang_query)).all()

    # has_audio / has_video for language groups — counted separately via a join on assets.
    media_query = (
        select(
            StudySession.language_code,
            StudySession.learning_level,
            StudySession.subject,
            StudySession.topic,
            StudySessionAsset.asset_type,
            func.count().label("c"),
        )
        .join(StudySessionAsset, StudySessionAsset.study_session_id == StudySession.id)
        .where(
            StudySession.user_id == user.id,
            StudySession.study_type == "language",
            StudySessionAsset.asset_type.in_(["audio", "video"]),
        )
        .group_by(
            StudySession.language_code,
            StudySession.learning_level,
            StudySession.subject,
            StudySession.topic,
            StudySessionAsset.asset_type,
        )
    )
    media_rows = (await session.execute(media_query)).all()

    media_map: dict[tuple, dict[str, int]] = {}
    for r in media_rows:
        key = (r.language_code, r.learning_level, r.subject, r.topic)
        media_map.setdefault(key, {"audio": 0, "video": 0})[r.asset_type] = r.c

    # Build the certification tree.
    cert_groups: dict[str | None, dict[str | None, dict[str | None, dict]]] = {}
    for r in cert_rows:
        cert_groups.setdefault(r.certification, {}).setdefault(r.subject, {})[r.topic] = {
            "session_count": r.session_count,
            "weak_count": r.weak_count,
            "due_count": r.due_count,
        }

    cert_out = []
    for cert_name, subjects in cert_groups.items():
        subj_list = []
        sess_total = weak_total = due_total = 0
        for subj_name, topics in subjects.items():
            topic_list = []
            s_count = w_count = d_count = 0
            for topic_name, stats in topics.items():
                topic_list.append({
                    "topic": topic_name,
                    "session_count": stats["session_count"],
                    "weak_count": stats["weak_count"],
                    "due_count": stats["due_count"],
                })
                s_count += stats["session_count"]
                w_count += stats["weak_count"]
                d_count += stats["due_count"]
            subj_list.append({
                "subject": subj_name,
                "topics": topic_list,
                "session_count": s_count,
                "weak_count": w_count,
                "due_count": d_count,
            })
            sess_total += s_count
            weak_total += w_count
            due_total += d_count
        cert_out.append({
            "certification": cert_name,
            "subjects": subj_list,
            "session_count": sess_total,
            "weak_count": weak_total,
            "due_count": due_total,
        })

    # Build the language tree.
    lang_groups: dict[
        str | None, dict[str | None, dict[str | None, dict[str | None, dict]]]
    ] = {}
    for r in lang_rows:
        media = media_map.get(
            (r.language_code, r.learning_level, r.subject, r.topic),
            {"audio": 0, "video": 0},
        )
        (
            lang_groups
            .setdefault(r.language_code, {})
            .setdefault(r.learning_level, {})
            .setdefault(r.subject, {})
        )[r.topic] = {
            "session_count": r.session_count,
            "weak_count": r.weak_count,
            "due_count": r.due_count,
            "has_audio": media["audio"] > 0,
            "has_video": media["video"] > 0,
        }

    lang_out = []
    for lang_code, levels in lang_groups.items():
        for level_name, subjects in levels.items():
            subj_list = []
            for subj_name, topics in subjects.items():
                topic_list = []
                for topic_name, stats in topics.items():
                    topic_list.append({
                        "topic": topic_name,
                        "session_count": stats["session_count"],
                        "weak_count": stats["weak_count"],
                        "due_count": stats["due_count"],
                        "has_audio": stats["has_audio"],
                        "has_video": stats["has_video"],
                    })
                subj_list.append({"subject": subj_name, "topics": topic_list})
            lang_out.append({
                "language_code": lang_code,
                "learning_level": level_name,
                "subjects": subj_list,
            })

    return {
        "by_type": {
            "certification": {"groups": cert_out},
            "language": {"groups": lang_out},
        }
    }


@router.get("/{session_id}", response_model=StudySessionResponse)
async def get_study_session(
    session_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Fetch one study session (with assets) owned by the current user; 404 otherwise."""
    sess = await session.get(
        StudySession, session_id, options=[selectinload(StudySession.assets)]
    )
    sess = _verify_session_ownership(sess, user)
    return _to_session_response(sess)


# PATCH fields copied verbatim from the request (no value validation needed).
_SIMPLE_PATCH_FIELDS: frozenset[str] = frozenset({
    "certification",
    "language_code",
    "learning_level",
    "subject",
    "topic",
    "source_text",
    "source_page",
    "prompt_question",
    "expected_answer",
    "target_count",
    "repetition_count",
    "canvas_width",
    "canvas_height",
    "strokes_json",
    "ocr_text",
    "user_corrected_text",
    "next_review_at",
    "study_topic_id",
})


@router.patch("/{session_id}", response_model=StudySessionResponse)
async def update_study_session(
    session_id: int,
    body: StudySessionUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Partially update a study session.

    Only fields present in the request (``model_fields_set``) are applied.
    Omitted fields are untouched; an explicit null clears a nullable column.

    Raises:
        HTTPException(404): the session is missing or foreign, or
            ``study_topic_id`` is not a live topic owned by the user.
        HTTPException(422): invalid ``mode`` / ``review_state``, or
            ``repetition_count`` explicitly set to null.
    """
    sess = await session.get(
        StudySession, session_id, options=[selectinload(StudySession.assets)]
    )
    sess = _verify_session_ownership(sess, user)

    fields_set = body.model_fields_set

    # Validate everything first so a rejected request never mutates the ORM object.
    if "mode" in fields_set and body.mode not in VALID_MODES:
        raise HTTPException(status_code=422, detail="mode 값이 올바르지 않습니다")
    if "review_state" in fields_set and body.review_state not in VALID_REVIEW_STATES:
        raise HTTPException(status_code=422, detail="review_state 값이 올바르지 않습니다")
    if "repetition_count" in fields_set and body.repetition_count is None:
        # The response schema declares repetition_count as a plain int, so a
        # stored null could not be serialized back — reject it up front.
        raise HTTPException(
            status_code=422, detail="repetition_count 는 null 일 수 없습니다"
        )
    # Ownership check when reassigning to a topic (null = detach, always allowed).
    if "study_topic_id" in fields_set and body.study_topic_id is not None:
        await _verify_topic_ownership(session, body.study_topic_id, user)

    if "mode" in fields_set:
        sess.mode = body.mode
    if "review_state" in fields_set:
        sess.review_state = body.review_state
    for fname in _SIMPLE_PATCH_FIELDS & fields_set:
        setattr(sess, fname, getattr(body, fname))
    if "metadata" in fields_set:
        sess.metadata_json = body.metadata

    sess.updated_at = datetime.now(timezone.utc)
    await session.commit()
    await session.refresh(sess, attribute_names=["assets"])
    return _to_session_response(sess)


@router.delete("/{session_id}", status_code=204)
async def delete_study_session(
    session_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Delete a study session.

    Per the original design notes, linked assets are removed by the DB
    (ON DELETE CASCADE — defined outside this module, verify in the model/migration).
    Document rows themselves are kept; only the asset rows disappear.
    """
    sess = await session.get(StudySession, session_id)
    sess = _verify_session_ownership(sess, user)
    await session.delete(sess)
    await session.commit()


# ─── Asset endpoints ───


@router.post(
    "/{session_id}/assets",
    response_model=StudySessionAssetResponse,
    status_code=201,
)
async def link_study_asset(
    session_id: int,
    body: StudySessionAssetCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Link an existing document to the study session as an asset.

    Raises:
        HTTPException(422): unknown ``asset_type`` or ``role``.
        HTTPException(404): session or document not found / not accessible.
        HTTPException(409): the same (session, document, asset_type, role)
            combination is already linked.
    """
    if body.asset_type not in VALID_ASSET_TYPES:
        raise HTTPException(
            status_code=422,
            detail=f"asset_type 은 {sorted(VALID_ASSET_TYPES)} 중 하나여야 합니다",
        )
    if body.role not in VALID_ROLES:
        raise HTTPException(
            status_code=422,
            detail=f"role 은 {sorted(r for r in VALID_ROLES if r is not None)} 중 하나 또는 NULL 이어야 합니다",
        )

    sess = await session.get(StudySession, session_id)
    sess = _verify_session_ownership(sess, user)

    doc = await session.get(Document, body.document_id)
    _verify_document_ownership(doc, user)

    # Duplicate detection: an explicit pre-SELECT plus the DB UNIQUE constraint
    # (caught below as IntegrityError) to cover concurrent inserts.
    # SQL compares NULL with "=" as unknown, so a NULL role needs IS NULL.
    role_clause = (
        StudySessionAsset.role.is_(None)
        if body.role is None
        else StudySessionAsset.role == body.role
    )
    # LIMIT 1 + first(): under a standard UNIQUE constraint NULLs are distinct,
    # so duplicate role=NULL rows may exist (DB-dependent — verify). A
    # scalar_one_or_none() would then raise MultipleResultsFound instead of 409.
    existing = await session.execute(
        select(StudySessionAsset.id)
        .where(
            StudySessionAsset.study_session_id == session_id,
            StudySessionAsset.document_id == body.document_id,
            StudySessionAsset.asset_type == body.asset_type,
            role_clause,
        )
        .limit(1)
    )
    if existing.first() is not None:
        raise _asset_already_linked()

    asset = StudySessionAsset(
        study_session_id=session_id,
        document_id=body.document_id,
        asset_type=body.asset_type,
        role=body.role,
        sort_order=body.sort_order,
    )
    session.add(asset)
    try:
        await session.commit()
    except IntegrityError:
        await session.rollback()
        # UNIQUE violation — most likely lost a race with the pre-SELECT above.
        # Respond with the same 409 payload.
        raise _asset_already_linked()
    await session.refresh(asset)
    return StudySessionAssetResponse.model_validate(asset)


@router.delete(
    "/{session_id}/assets/{asset_id}", status_code=204
)
async def unlink_study_asset(
    session_id: int,
    asset_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Remove an asset link. The document row itself is kept.

    Raises:
        HTTPException(404): session not owned, or the asset does not belong to it.
    """
    sess = await session.get(StudySession, session_id)
    sess = _verify_session_ownership(sess, user)

    asset = await session.get(StudySessionAsset, asset_id)
    if asset is None or asset.study_session_id != session_id:
        raise HTTPException(status_code=404, detail="asset 을 찾을 수 없습니다")

    await session.delete(asset)
    await session.commit()


# ─── Snapshot (PNG upload) ───


@router.post("/{session_id}/snapshot", response_model=StudySessionAssetResponse, status_code=201)
async def upload_handwriting_snapshot(
    session_id: int,
    request: Request,
    file: UploadFile,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    sort_order: int = Form(0),
):
    """Upload a canvas PNG, register it in documents and link it as a handwriting_png asset.

    Borrows the staging-file + atomic rename and error_code patterns of
    documents.py ``upload_document``, restricted to PNG. One session may
    accumulate several snapshots: the UNIQUE constraint is per
    (session, document, type, role), and each upload creates a new
    document_id, so links never collide.

    Flow:
        1. Validate the file name and extension, and pre-check Content-Length.
        2. Stream to ``<name>.uploading`` under ``<nas>/PKM/Inbox``, enforcing max size.
        3. Atomically rename the staging file to its final name.
        4. Insert Document + enqueue 'extract' + insert StudySessionAsset in one commit.
           On failure the final file is removed so no orphan stays on disk.
    """
    sess = await session.get(StudySession, session_id)
    sess = _verify_session_ownership(sess, user)

    if not file.filename:
        raise _upload_error(400, "invalid_input", "파일명이 필요합니다")
    # Keep only the final path component so client-supplied directories are ignored.
    safe_name = Path(file.filename).name
    if not safe_name or safe_name.startswith("."):
        raise _upload_error(400, "invalid_input", "유효하지 않은 파일명")
    if Path(safe_name).suffix.lower() != ".png":
        raise _upload_error(
            400, "invalid_input", "snapshot 은 PNG 파일만 지원합니다",
        )

    max_bytes = settings.upload.max_bytes
    slack_ratio = settings.upload.content_length_slack_ratio
    chunk_size = settings.upload.stream_chunk_bytes

    # Early rejection from Content-Length. The header covers the whole multipart
    # body (not only the file), hence the slack ratio. A malformed header is
    # ignored; the streaming check below still enforces max_bytes.
    cl_header = request.headers.get("content-length")
    if cl_header:
        try:
            declared_length: int | None = int(cl_header)
        except ValueError:
            declared_length = None
        if declared_length is not None and declared_length > int(max_bytes * slack_ratio):
            raise _upload_error(413, "body_too_large", "파일이 너무 큽니다")

    # Decide the NAS Inbox path and avoid name collisions.
    inbox_rel = Path("PKM") / "Inbox"
    inbox_dir = Path(settings.nas_mount_path) / inbox_rel
    inbox_dir.mkdir(parents=True, exist_ok=True)
    inbox_root = inbox_dir.resolve()

    target = (inbox_root / safe_name).resolve()
    # The final file must sit directly in the Inbox (e.g. not escape via a symlink).
    if target.parent != inbox_root:
        raise _upload_error(400, "invalid_input", "잘못된 파일 경로")

    counter = 1
    stem, suffix = target.stem, target.suffix
    staging = target.with_name(target.name + ".uploading")
    # Also skip names whose staging file exists (another upload in progress).
    while target.exists() or staging.exists():
        target = inbox_root / f"{stem}_{counter}{suffix}"
        staging = target.with_name(target.name + ".uploading")
        counter += 1

    # Streamed write with cumulative size enforcement.
    written = 0
    try:
        with staging.open("wb") as out:
            while chunk := await file.read(chunk_size):
                written += len(chunk)
                if written > max_bytes:
                    raise _upload_error(413, "body_too_large", "파일이 너무 큽니다")
                out.write(chunk)
        if written == 0:
            raise _upload_error(400, "empty_file", "빈 파일은 업로드할 수 없습니다")
    except ClientDisconnect:
        staging.unlink(missing_ok=True)
        logger.info("snapshot aborted by client: %s (written=%d)", safe_name, written)
        raise _upload_error(499, "network_abort", "업로드가 취소되었습니다")
    except asyncio.TimeoutError:
        staging.unlink(missing_ok=True)
        logger.warning("snapshot timeout: %s (written=%d)", safe_name, written)
        raise _upload_error(408, "upload_timeout", "업로드 시간 초과")
    except HTTPException:
        staging.unlink(missing_ok=True)
        raise
    except Exception:
        staging.unlink(missing_ok=True)
        logger.exception("snapshot internal error: %s (written=%d)", safe_name, written)
        raise _upload_error(500, "internal", "업로드 처리 중 오류가 발생했습니다")

    # Atomic rename to the final path.
    try:
        staging.replace(target)
    except OSError:
        staging.unlink(missing_ok=True)
        logger.exception("snapshot rename failed: %s -> %s", staging, target)
        raise _upload_error(500, "internal", "파일 저장 후 정리 중 오류가 발생했습니다")

    # Stored path is relative to the NAS mount. Build it from the known Inbox
    # prefix rather than resolved-vs-unresolved relative_to(), which raised
    # ValueError when nas_mount_path was relative or contained a symlink.
    rel_path = str(inbox_rel / target.name)

    # Derive user_tags from the session's domain metadata.
    domain_tag = sess.certification or sess.language_code or "general"
    user_tags = ["handwriting", domain_tag]
    if sess.subject:
        user_tags.append(sess.subject)
    title = f"필기 — {sess.topic or sess.subject or 'study session'} #{session_id}"

    # Hashing + Document + ProcessingQueue('extract') + StudySessionAsset.
    # Everything that can fail before the commit lands is guarded, so the file
    # on disk is removed instead of being orphaned.
    try:
        fhash = file_hash(target)
        doc = Document(
            file_path=rel_path,
            file_hash=fhash,
            file_format="png",
            file_size=written,
            file_type="immutable",
            title=title,
            user_tags=user_tags,
        )
        session.add(doc)
        await session.flush()  # assigns doc.id for the queue entry and the asset link
        await enqueue_stage(session, doc.id, "extract")

        asset = StudySessionAsset(
            study_session_id=session_id,
            document_id=doc.id,
            asset_type="handwriting_png",
            role="answer",
            sort_order=sort_order,
        )
        session.add(asset)
        await session.commit()
    except Exception:
        # The file is a separate resource from the DB transaction — remove it
        # first, then roll back the uncommitted DB work explicitly.
        target.unlink(missing_ok=True)
        await session.rollback()
        raise

    # After a successful commit the Document references the file, so a refresh
    # failure here must not delete it.
    await session.refresh(asset)
    return StudySessionAssetResponse.model_validate(asset)