"""Document management API router.

Exposes endpoints for listing, uploading, fetching and deleting documents,
and for listing and creating tags.
"""
import contextlib
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import aiofiles
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form
from pydantic import BaseModel
from sqlalchemy import select, delete, and_, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from src.core.database import get_db
from src.core.config import settings
from src.models.user import User
from src.models.document import Document, Tag
from src.api.dependencies import get_current_active_user, get_current_admin_user


class DocumentResponse(BaseModel):
    """Document payload returned by the document endpoints."""

    id: str
    title: str
    description: Optional[str]
    html_path: str
    pdf_path: Optional[str]
    thumbnail_path: Optional[str]
    file_size: Optional[int]
    page_count: Optional[int]
    language: str
    is_public: bool
    is_processed: bool
    created_at: datetime
    updated_at: Optional[datetime]
    document_date: Optional[datetime]
    uploader_name: Optional[str]
    tags: List[str] = []

    class Config:
        from_attributes = True


class TagResponse(BaseModel):
    """Tag payload returned by the tag endpoints."""

    id: str
    name: str
    color: str
    description: Optional[str]
    document_count: int = 0

    class Config:
        from_attributes = True


class CreateTagRequest(BaseModel):
    """Request body for creating a tag."""

    name: str
    color: str = "#3B82F6"
    description: Optional[str] = None


router = APIRouter()


def _visible_to(user: User):
    """Return the SQL condition matching documents that *user* may see."""
    # `== True` is intentional: it builds a SQL expression, not a Python bool.
    return or_(
        Document.is_public == True,  # noqa: E712
        Document.uploaded_by == user.id,
    )


def _display_name(user: Optional[User]) -> Optional[str]:
    """Return the user's full name, falling back to the e-mail address."""
    if user is None:
        return None
    return user.full_name or user.email


def _document_response(
    document: Document,
    uploader_name: Optional[str],
    tag_names: List[str],
) -> DocumentResponse:
    """Build a ``DocumentResponse`` from a document row.

    The response is assembled field by field rather than validated straight
    from the ORM object: the ``tags`` relationship holds ``Tag`` objects
    while the schema declares ``List[str]``, and ``uploader_name`` is
    supplied by the caller.
    """
    return DocumentResponse(
        id=str(document.id),
        title=document.title,
        description=document.description,
        html_path=document.html_path,
        pdf_path=document.pdf_path,
        thumbnail_path=document.thumbnail_path,
        file_size=document.file_size,
        page_count=document.page_count,
        language=document.language,
        is_public=document.is_public,
        is_processed=document.is_processed,
        created_at=document.created_at,
        updated_at=document.updated_at,
        document_date=document.document_date,
        uploader_name=uploader_name,
        tags=tag_names,
    )


def _tag_response(tag: Tag, document_count: int = 0) -> TagResponse:
    """Build a ``TagResponse`` from a tag row and its document count."""
    return TagResponse(
        id=str(tag.id),
        name=tag.name,
        color=tag.color,
        description=tag.description,
        document_count=document_count,
    )


def _remove_files(*paths: Optional[str]) -> None:
    """Delete each given file, ignoring empty paths and missing files."""
    for path in paths:
        if not path:
            continue
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)


@router.get("/", response_model=List[DocumentResponse])
async def list_documents(
    skip: int = 0,
    limit: int = 50,
    tag: Optional[str] = None,
    search: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """List documents, newest first.

    Non-admin users only see public documents and documents they uploaded.
    ``tag`` restricts the result to documents carrying a tag with that name;
    ``search`` does a case-insensitive substring match on title and
    description. ``skip`` and ``limit`` page through the result.
    """
    query = select(Document).options(
        selectinload(Document.uploader),
        selectinload(Document.tags)
    )

    # Permission filter.
    if not current_user.is_admin:
        query = query.where(_visible_to(current_user))

    # Tag filter.
    if tag:
        query = query.join(Document.tags).where(Tag.name == tag)

    # Free-text filter.
    if search:
        pattern = f"%{search}%"
        query = query.where(
            or_(
                Document.title.ilike(pattern),
                Document.description.ilike(pattern)
            )
        )

    query = query.order_by(Document.created_at.desc()).offset(skip).limit(limit)

    result = await db.execute(query)
    documents = result.scalars().all()

    return [
        _document_response(
            doc,
            _display_name(doc.uploader),
            [doc_tag.name for doc_tag in doc.tags],
        )
        for doc in documents
    ]


@router.post("/", response_model=DocumentResponse)
async def upload_document(
    title: str = Form(...),
    description: Optional[str] = Form(None),
    document_date: Optional[str] = Form(None),
    is_public: bool = Form(False),
    tags: Optional[str] = Form(None),  # comma-separated tag names
    html_file: UploadFile = File(...),
    pdf_file: Optional[UploadFile] = File(None),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Upload a document (HTML, plus an optional original PDF).

    The files are stored under ``settings.UPLOAD_DIR/documents`` using a
    freshly generated UUID as the base name. Tags named in ``tags`` are
    looked up by name and created when missing.

    Raises:
        HTTPException: 400 when a file has the wrong extension or
            ``document_date`` is not an ISO 8601 value; 500 when storing the
            files or the database rows fails (written files are removed).
    """
    # Validate file extensions; a missing filename is treated as invalid.
    if not (html_file.filename or "").lower().endswith(('.html', '.htm')):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only HTML files are allowed for the main document"
        )

    if pdf_file and not (pdf_file.filename or "").lower().endswith('.pdf'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only PDF files are allowed for the original document"
        )

    # Reject a malformed date before anything is written to disk.
    parsed_date = None
    if document_date:
        try:
            parsed_date = datetime.fromisoformat(document_date)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="document_date must be an ISO 8601 date or datetime"
            ) from exc

    # Read everything needed from the user object up front: ORM attributes
    # may be expired by the commit below and cannot be lazily reloaded
    # through an AsyncSession.
    uploader_id = current_user.id
    uploader_name = _display_name(current_user)

    # De-duplicate tag names (keeping order) so a tag is attached only once.
    tag_names: List[str] = []
    if tags:
        tag_names = list(
            dict.fromkeys(name.strip() for name in tags.split(',') if name.strip())
        )

    # Unique storage paths.
    doc_id = str(uuid.uuid4())
    documents_dir = os.path.join(settings.UPLOAD_DIR, "documents")
    html_path = os.path.join(documents_dir, f"{doc_id}.html")
    pdf_path = os.path.join(documents_dir, f"{doc_id}.pdf") if pdf_file else None

    try:
        os.makedirs(documents_dir, exist_ok=True)

        # Store the HTML file; keep the bytes so the size can be recorded
        # without re-reading the (already exhausted) upload stream.
        html_bytes = await html_file.read()
        async with aiofiles.open(html_path, 'wb') as out:
            await out.write(html_bytes)

        # Store the PDF file, if any.
        if pdf_file and pdf_path:
            pdf_bytes = await pdf_file.read()
            async with aiofiles.open(pdf_path, 'wb') as out:
                await out.write(pdf_bytes)

        # Find or create each tag.
        tag_objects = []
        for tag_name in tag_names:
            result = await db.execute(select(Tag).where(Tag.name == tag_name))
            tag = result.scalar_one_or_none()
            if tag is None:
                tag = Tag(name=tag_name, created_by=uploader_id)
                db.add(tag)
                await db.flush()
            tag_objects.append(tag)

        # The tag collection is passed to the constructor: touching
        # `document.tags` after a flush would need a lazy load, which an
        # AsyncSession cannot perform implicitly.
        document = Document(
            id=doc_id,
            title=title,
            description=description,
            html_path=html_path,
            pdf_path=pdf_path,
            file_size=len(html_bytes),
            uploaded_by=uploader_id,
            original_filename=html_file.filename,
            is_public=is_public,
            document_date=parsed_date,
            tags=tag_objects,
        )
        db.add(document)
        await db.commit()

    except Exception as e:
        # Undo the partial work: pending DB changes and written files.
        await db.rollback()
        _remove_files(html_path, pdf_path)

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to upload document: {str(e)}"
        ) from e

    # Outside the try block on purpose: the row is committed at this point,
    # so a late failure must not delete the stored files.
    await db.refresh(document)
    return _document_response(document, uploader_name, tag_names)


@router.get("/{document_id}", response_model=DocumentResponse)
async def get_document(
    document_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Return a single document.

    Raises:
        HTTPException: 404 when no such document exists; 403 when the
            document is private and the caller is neither its uploader nor
            an admin.
    """
    result = await db.execute(
        select(Document)
        .options(selectinload(Document.uploader), selectinload(Document.tags))
        .where(Document.id == document_id)
    )
    document = result.scalar_one_or_none()

    if not document:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # Permission check.
    if not document.is_public and document.uploaded_by != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    return _document_response(
        document,
        _display_name(document.uploader),
        [doc_tag.name for doc_tag in document.tags],
    )


@router.delete("/{document_id}")
async def delete_document(
    document_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Delete a document row and its stored files.

    Only the uploader or an admin may delete a document.

    Raises:
        HTTPException: 404 when no such document exists; 403 when the caller
            is not allowed to delete it.
    """
    result = await db.execute(select(Document).where(Document.id == document_id))
    document = result.scalar_one_or_none()

    if not document:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # Permission check (uploader or admin only).
    if document.uploaded_by != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    # Read the paths before the commit, while the row is still loaded.
    stored_files = (document.html_path, document.pdf_path, document.thumbnail_path)

    # Remove the row first: if this fails the files are still intact, so the
    # database never points at files that no longer exist.
    await db.execute(delete(Document).where(Document.id == document_id))
    await db.commit()

    _remove_files(*stored_files)

    return {"message": "Document deleted successfully"}


@router.get("/tags/", response_model=List[TagResponse])
async def list_tags(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """List all tags by name, each with the number of documents the caller may see."""
    result = await db.execute(select(Tag).order_by(Tag.name))
    tags = result.scalars().all()

    # One grouped COUNT for all tags instead of loading every document of
    # every tag just to count it.
    count_query = (
        select(Tag.id, func.count(Document.id))
        .select_from(Document)
        .join(Document.tags)
        .group_by(Tag.id)
    )
    if not current_user.is_admin:
        count_query = count_query.where(_visible_to(current_user))

    count_rows = (await db.execute(count_query)).all()
    counts = {tag_id: total for tag_id, total in count_rows}

    # Tags without any visible document are absent from `counts`.
    return [_tag_response(tag, counts.get(tag.id, 0)) for tag in tags]


@router.post("/tags/", response_model=TagResponse)
async def create_tag(
    tag_data: CreateTagRequest,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Create a tag.

    Raises:
        HTTPException: 400 when a tag with the same name already exists.
    """
    # Duplicate check.
    result = await db.execute(select(Tag).where(Tag.name == tag_data.name))
    existing_tag = result.scalar_one_or_none()

    if existing_tag:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Tag already exists"
        )

    tag = Tag(
        name=tag_data.name,
        color=tag_data.color,
        description=tag_data.description,
        created_by=current_user.id
    )

    db.add(tag)
    await db.commit()
    await db.refresh(tag)

    # A freshly created tag has no documents yet.
    return _tag_response(tag, 0)