Files
document-server/backend/src/api/routes/documents.py
Hyungi Ahn 8d7f4c04bb feat: PDF 매칭 필터링 및 서적 정보 UI 개선
- 서적 편집 페이지에서 PDF 매칭 드롭다운이 현재 서적의 PDF만 표시하도록 수정
- PDF 관리 페이지에 서적 정보 표시 UI 추가
- 타입 안전한 비교로 book_id 필터링 개선
- PDF 통계 카드에 서적별 분류 추가
- 필터 기능에 '서적 포함' 옵션 추가
- 디버깅 로그 추가로 문제 추적 개선

주요 변경사항:
- book-editor.js: String() 타입 변환으로 안전한 book_id 비교
- pdf-manager.html/js: 서적 정보 배지 및 통계 카드 추가
- book-documents.js: HTML 문서 필터링 로직 개선
2025-08-26 15:32:46 +09:00

935 lines
33 KiB
Python

"""
문서 관리 API 라우터
"""
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, and_, or_, update
from sqlalchemy.orm import selectinload
from typing import List, Optional
import os
import uuid
from uuid import UUID
import aiofiles
from pathlib import Path
from ...core.database import get_db
from ...core.config import settings
from ...models.user import User
from ...models.document import Document, Tag
from ...models.book import Book
from ..dependencies import get_current_active_user, get_current_admin_user
from pydantic import BaseModel
from datetime import datetime
class DocumentResponse(BaseModel):
    """Serialized view of a document as returned by the document endpoints.

    All identifier fields are strings; the route handlers in this module
    convert the model's ID values with ``str()`` before populating them.
    """

    id: str
    title: str
    description: Optional[str]
    html_path: Optional[str]  # may be None when only a PDF was uploaded
    pdf_path: Optional[str]
    thumbnail_path: Optional[str]
    file_size: Optional[int]
    page_count: Optional[int]
    language: str
    is_public: bool
    is_processed: bool
    created_at: datetime
    updated_at: Optional[datetime]
    document_date: Optional[datetime]
    uploader_name: Optional[str]
    tags: List[str] = []

    # Book information
    book_id: Optional[str] = None
    book_title: Optional[str] = None
    book_author: Optional[str] = None

    # Category (sub-classification) information
    category_id: Optional[str] = None
    category_name: Optional[str] = None
    sort_order: int = 0

    # PDF matching information
    matched_pdf_id: Optional[str] = None

    # Pydantic v2 configuration; replaces the deprecated class-based
    # ``Config`` while keeping ``from_attributes`` enabled.
    model_config = {"from_attributes": True}
class TagResponse(BaseModel):
    """Serialized view of a tag, including how many documents carry it."""

    id: str
    name: str
    color: str
    description: Optional[str]
    # Filled in by the route handlers; defaults to 0 when not supplied.
    document_count: int = 0

    # Pydantic v2 configuration; replaces the deprecated class-based
    # ``Config`` while keeping ``from_attributes`` enabled.
    model_config = {"from_attributes": True}
class CreateTagRequest(BaseModel):
    """Request body accepted by the tag-creation endpoint."""

    # Tag name; the only field the client must send.
    name: str
    # Used when the client does not send a color.
    color: str = "#3B82F6"
    description: Optional[str] = None
# Router on which the document and tag endpoints in this module are registered.
router = APIRouter()
@router.get("/", response_model=List[DocumentResponse])
async def list_documents(
    skip: int = 0,
    limit: int = 50,
    tag: Optional[str] = None,
    search: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """List the documents visible to the caller, newest first.

    Administrators get every document; other users get public documents
    plus the ones they uploaded themselves.

    Args:
        skip: Number of rows to skip (applied as OFFSET).
        limit: Maximum number of rows to return (applied as LIMIT).
        tag: When given, only documents carrying a tag with this exact name.
        search: When given, case-insensitive substring match against the
            title or the description.

    Returns:
        A list of ``DocumentResponse`` objects including uploader, tag,
        book, category and PDF-matching information.
    """
    stmt = select(Document).options(
        selectinload(Document.uploader),
        selectinload(Document.tags),
        selectinload(Document.book),
        selectinload(Document.category),
    )

    # Visibility filter: non-admins only see public or self-uploaded documents.
    if not current_user.is_admin:
        can_see = or_(
            Document.is_public == True,
            Document.uploaded_by == current_user.id,
        )
        stmt = stmt.where(can_see)

    # Tag filter
    if tag:
        stmt = stmt.join(Document.tags).where(Tag.name == tag)

    # Free-text filter
    if search:
        stmt = stmt.where(
            or_(
                Document.title.ilike(f"%{search}%"),
                Document.description.ilike(f"%{search}%"),
            )
        )

    stmt = stmt.order_by(Document.created_at.desc()).offset(skip).limit(limit)
    documents = (await db.execute(stmt)).scalars().all()

    # Convert ORM rows into response objects.
    payload = []
    for doc in documents:
        uploader = doc.uploader
        book = doc.book
        category = doc.category
        payload.append(
            DocumentResponse(
                id=str(doc.id),
                title=doc.title,
                description=doc.description,
                html_path=doc.html_path,  # None for PDF-only uploads
                pdf_path=doc.pdf_path,
                thumbnail_path=doc.thumbnail_path,
                file_size=doc.file_size,
                page_count=doc.page_count,
                language=doc.language,
                is_public=doc.is_public,
                is_processed=doc.is_processed,
                created_at=doc.created_at,
                updated_at=doc.updated_at,
                document_date=doc.document_date,
                uploader_name=uploader.full_name or uploader.email,
                tags=[t.name for t in doc.tags],
                book_id=str(book.id) if book else None,
                book_title=book.title if book else None,
                book_author=book.author if book else None,
                category_id=str(category.id) if category else None,
                category_name=category.name if category else None,
                sort_order=doc.sort_order,
                matched_pdf_id=str(doc.matched_pdf_id) if doc.matched_pdf_id else None,
            )
        )
    return payload
@router.get("/hierarchy/structured", response_model=dict)
async def get_documents_by_hierarchy(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the visible documents grouped as book > category > document.

    The result has two keys: ``"books"`` maps a book ID to its title,
    author, a ``"categories"`` mapping (category ID -> name + documents) and
    an ``"uncategorized_documents"`` list; ``"uncategorized"`` lists the
    documents that belong to no book. Non-admin callers only get public
    documents and their own uploads.
    """
    stmt = select(Document).options(
        selectinload(Document.uploader),
        selectinload(Document.tags),
        selectinload(Document.book),
        selectinload(Document.category),
    )

    # Visibility filter
    if not current_user.is_admin:
        stmt = stmt.where(
            or_(
                Document.is_public == True,
                Document.uploaded_by == current_user.id,
            )
        )

    # Order by book, then category, then the document's own ordering.
    stmt = stmt.order_by(
        Document.book_id.nulls_last(),      # documents with a book first
        Document.category_id.nulls_last(),  # documents with a category first
        Document.sort_order,
        Document.created_at.desc(),
    )

    documents = (await db.execute(stmt)).scalars().all()

    books = {}
    without_book = []

    for doc in documents:
        book = doc.book
        category = doc.category
        uploader = doc.uploader

        entry = {
            "id": str(doc.id),
            "title": doc.title,
            "description": doc.description,
            "created_at": doc.created_at.isoformat(),
            "uploader_name": uploader.full_name or uploader.email,
            "tags": [t.name for t in doc.tags],
            "sort_order": doc.sort_order,
            "book_id": str(book.id) if book else None,
            "book_title": book.title if book else None,
            "category_id": str(category.id) if category else None,
            "category_name": category.name if category else None,
        }

        # Document that belongs to no book.
        if not book:
            without_book.append(entry)
            continue

        book_key = str(book.id)
        if book_key not in books:
            books[book_key] = {
                "id": book_key,
                "title": book.title,
                "author": book.author,
                "categories": {},
                "uncategorized_documents": [],
            }
        book_node = books[book_key]

        # Document with a book but no category.
        if not category:
            book_node["uncategorized_documents"].append(entry)
            continue

        category_key = str(category.id)
        if category_key not in book_node["categories"]:
            book_node["categories"][category_key] = {
                "id": category_key,
                "name": category.name,
                "documents": [],
            }
        book_node["categories"][category_key]["documents"].append(entry)

    return {
        "books": books,
        "uncategorized": without_book,
    }
@router.post("/", response_model=DocumentResponse)
async def upload_document(
    title: str = Form(...),
    description: Optional[str] = Form(None),
    document_date: Optional[str] = Form(None),
    language: Optional[str] = Form("ko"),
    is_public: bool = Form(False),
    tags: Optional[List[str]] = Form(None),  # list of tag names
    book_id: Optional[str] = Form(None),  # optional book ID
    html_file: UploadFile = File(...),
    pdf_file: Optional[UploadFile] = File(None),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Store an uploaded document file and create its database record.

    The main upload arrives in ``html_file`` and may be either an HTML file
    (``.html``/``.htm``) or a PDF; ``pdf_file`` is an optional companion PDF.
    Files are written below ``settings.UPLOAD_DIR`` (``documents/`` for HTML,
    ``pdfs/`` for PDFs) using a freshly generated UUID as base name.

    Args:
        title: Document title.
        description: Optional description.
        document_date: Optional ISO-8601 date/datetime string.
        language: Language code stored on the document.
        is_public: Value stored in the document's ``is_public`` flag.
        tags: Tag names. Each form value may itself be a comma-separated
            list; names are stripped, blanks dropped and duplicates removed.
        book_id: Optional book UUID; ignored when malformed or unknown.
        html_file: The main file (HTML or PDF).
        pdf_file: Optional companion PDF.

    Returns:
        The created document as a ``DocumentResponse``.

    Raises:
        HTTPException: 400 for a disallowed file type or an unparsable
            ``document_date``; 500 when saving fails (files written so far
            are removed and the session is rolled back).
    """
    # Validate file types (HTML or PDF for the main file). A missing
    # filename is treated like a disallowed extension.
    main_name = (html_file.filename or "").lower()
    is_pdf_file = main_name.endswith('.pdf')
    is_html_file = main_name.endswith(('.html', '.htm'))

    if not (is_html_file or is_pdf_file):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only HTML and PDF files are allowed"
        )

    if pdf_file and not (pdf_file.filename or "").lower().endswith('.pdf'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only PDF files are allowed for the original document"
        )

    # Reject a malformed date before anything is written to disk, so it is
    # reported as a client error rather than a 500.
    parsed_document_date = None
    if document_date:
        try:
            parsed_document_date = datetime.fromisoformat(document_date)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid document_date format (expected ISO 8601)"
            )

    # ``tags`` is a list of form values; each value may hold several
    # comma-separated names. dict.fromkeys de-duplicates while keeping order.
    tag_names = list(dict.fromkeys(
        name.strip()
        for entry in (tags or [])
        for name in entry.split(',')
        if name.strip()
    ))

    # Unique base name for the stored files.
    doc_id = str(uuid.uuid4())

    # Main file: PDFs and HTML files live in separate folders.
    if is_pdf_file:
        pdf_dir = os.path.join(settings.UPLOAD_DIR, "pdfs")
        os.makedirs(pdf_dir, exist_ok=True)
        main_path = os.path.join(pdf_dir, f"{doc_id}.pdf")
        html_path = None  # PDF-only upload has no HTML file
        pdf_path = main_path
    else:
        html_dir = os.path.join(settings.UPLOAD_DIR, "documents")
        os.makedirs(html_dir, exist_ok=True)
        main_path = os.path.join(html_dir, f"{doc_id}.html")
        html_path = main_path
        pdf_path = None

    # Companion PDF uploaded alongside the main file.
    additional_pdf_path = None
    if pdf_file:
        pdf_dir = os.path.join(settings.UPLOAD_DIR, "pdfs")
        os.makedirs(pdf_dir, exist_ok=True)
        additional_pdf_path = os.path.join(pdf_dir, f"{doc_id}_additional.pdf")

    try:
        # Save the main file (HTML or PDF).
        async with aiofiles.open(main_path, 'wb') as f:
            content = await html_file.read()
            await f.write(content)

        # Save the companion PDF, if any.
        if additional_pdf_path:
            async with aiofiles.open(additional_pdf_path, 'wb') as f:
                additional_content = await pdf_file.read()
                await f.write(additional_content)
            # NOTE(review): when the main file is itself a PDF, the companion
            # file is written but not referenced by the document record.
            if is_html_file:
                pdf_path = additional_pdf_path

        # Resolve the book, if one was given. A malformed or unknown ID is
        # ignored on purpose; database errors are no longer swallowed here.
        validated_book_id = None
        book_title = None
        book_author = None
        if book_id:
            try:
                book_uuid = UUID(book_id)
            except ValueError:
                book_uuid = None
            if book_uuid is not None:
                book_result = await db.execute(select(Book).where(Book.id == book_uuid))
                book = book_result.scalar_one_or_none()
                if book:
                    validated_book_id = book_uuid
                    # Read now: ORM attributes may be expired after commit.
                    book_title = book.title
                    book_author = book.author

        # Find or create the tags before the document is added to the
        # session, so the collection can be assigned on the still-transient
        # object instead of being accessed (and lazily loaded) after a flush.
        tag_objects = []
        for tag_name in tag_names:
            result = await db.execute(select(Tag).where(Tag.name == tag_name))
            tag = result.scalar_one_or_none()
            if not tag:
                tag = Tag(
                    name=tag_name,
                    created_by=current_user.id
                )
                db.add(tag)
            tag_objects.append(tag)

        # Create the document record.
        document = Document(
            id=doc_id,
            title=title,
            description=description,
            html_path=html_path,
            pdf_path=pdf_path,
            language=language,
            file_size=len(content),  # size of the main uploaded file
            uploaded_by=current_user.id,
            original_filename=html_file.filename,
            is_public=is_public,
            document_date=parsed_document_date,
            book_id=validated_book_id
        )
        document.tags = tag_objects
        db.add(document)

        await db.commit()

        # Reload the document together with its tags for the response.
        result = await db.execute(
            select(Document)
            .options(selectinload(Document.tags))
            .where(Document.id == document.id)
        )
        document_with_tags = result.scalar_one()

        return DocumentResponse(
            id=str(document_with_tags.id),
            title=document_with_tags.title,
            description=document_with_tags.description,
            html_path=document_with_tags.html_path,
            pdf_path=document_with_tags.pdf_path,
            thumbnail_path=document_with_tags.thumbnail_path,
            file_size=document_with_tags.file_size,
            page_count=document_with_tags.page_count,
            language=document_with_tags.language,
            is_public=document_with_tags.is_public,
            is_processed=document_with_tags.is_processed,
            created_at=document_with_tags.created_at,
            updated_at=document_with_tags.updated_at,
            document_date=document_with_tags.document_date,
            uploader_name=current_user.full_name or current_user.email,
            tags=[tag.name for tag in document_with_tags.tags],
            book_id=str(validated_book_id) if validated_book_id else None,
            book_title=book_title,
            book_author=book_author,
            matched_pdf_id=str(document_with_tags.matched_pdf_id) if document_with_tags.matched_pdf_id else None
        )

    except Exception as e:
        # Remove whatever was written to disk ...
        if os.path.exists(main_path):
            os.remove(main_path)
        if additional_pdf_path and os.path.exists(additional_pdf_path):
            os.remove(additional_pdf_path)
        # ... and discard any partially applied database changes.
        await db.rollback()

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to upload document: {str(e)}"
        ) from e
@router.get("/{document_id}", response_model=DocumentResponse)
async def get_document(
    document_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the details of a single document.

    Args:
        document_id: UUID of the document, as a string.

    Returns:
        The document as a ``DocumentResponse``, including the same book and
        category information that the list endpoint returns.

    Raises:
        HTTPException: 400 if ``document_id`` is not a valid UUID, 404 if no
            such document exists, 403 if the document is private and the
            caller is neither its uploader nor an administrator.
    """
    # Validate the ID up front, as the other per-document endpoints do,
    # instead of passing an arbitrary string to the database.
    try:
        doc_uuid = UUID(document_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid document ID format")

    result = await db.execute(
        select(Document)
        .options(
            selectinload(Document.uploader),
            selectinload(Document.tags),
            selectinload(Document.book),
            selectinload(Document.category),
        )
        .where(Document.id == doc_uuid)
    )
    document = result.scalar_one_or_none()

    if not document:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # Permission check
    if not document.is_public and document.uploaded_by != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    return DocumentResponse(
        id=str(document.id),
        title=document.title,
        description=document.description,
        html_path=document.html_path,
        pdf_path=document.pdf_path,
        thumbnail_path=document.thumbnail_path,
        file_size=document.file_size,
        page_count=document.page_count,
        language=document.language,
        is_public=document.is_public,
        is_processed=document.is_processed,
        created_at=document.created_at,
        updated_at=document.updated_at,
        document_date=document.document_date,
        uploader_name=document.uploader.full_name or document.uploader.email,
        tags=[tag.name for tag in document.tags],
        book_id=str(document.book.id) if document.book else None,
        book_title=document.book.title if document.book else None,
        book_author=document.book.author if document.book else None,
        category_id=str(document.category.id) if document.category else None,
        category_name=document.category.name if document.category else None,
        sort_order=document.sort_order,
        matched_pdf_id=str(document.matched_pdf_id) if document.matched_pdf_id else None
    )
@router.get("/{document_id}/content")
async def get_document_content(
    document_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the stored HTML content of a document.

    Args:
        document_id: UUID of the document, as a string.

    Returns:
        An ``HTMLResponse`` with the file content decoded as UTF-8.

    Raises:
        HTTPException: 400 for a malformed ID; 404 if the document does not
            exist, has no HTML file (PDF-only upload) or the file is missing
            on disk; 403 if the caller may not see the document; 500 if the
            file cannot be read.
    """
    from fastapi.responses import HTMLResponse

    try:
        doc_uuid = UUID(document_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid document ID format")

    # Look up the document.
    query = select(Document).where(Document.id == doc_uuid)
    result = await db.execute(query)
    document = result.scalar_one_or_none()

    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    # Permission check
    if not current_user.is_admin and not document.is_public and document.uploaded_by != current_user.id:
        raise HTTPException(status_code=403, detail="Access denied")

    # html_path holds the path exactly as it was stored at upload time. It is
    # None for PDF-only documents, which must yield a 404 rather than a
    # TypeError from os.path.exists(None).
    file_path = document.html_path
    if not file_path or not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="Document file not found")

    # Read through aiofiles so the event loop is not blocked by file I/O.
    try:
        async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
            content = await f.read()
    except (OSError, UnicodeDecodeError) as e:
        raise HTTPException(status_code=500, detail=f"Error reading document: {str(e)}")

    return HTMLResponse(content=content)
class UpdateDocumentRequest(BaseModel):
    """Request body for updating a document.

    Every field is optional and defaults to ``None``, so a client can send
    only the fields it wants to change.
    """

    title: Optional[str] = None
    description: Optional[str] = None
    sort_order: Optional[int] = None
    # ID of the document to use as the matched PDF, as a string.
    matched_pdf_id: Optional[str] = None
    is_public: Optional[bool] = None
    # Tag names.
    tags: Optional[List[str]] = None
@router.put("/{document_id}", response_model=DocumentResponse)
async def update_document(
    document_id: str,
    update_data: UpdateDocumentRequest,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Update the editable fields of a document.

    Only fields explicitly present in the request body are applied.

    * ``matched_pdf_id``: a falsy value clears the match; a valid UUID of an
      existing document sets it; a malformed or unknown ID is ignored.
    * ``tags``: replaces the document's tags; names are stripped, blanks
      dropped, duplicates removed and unknown tags created.
    * ``title`` / ``sort_order`` / ``is_public``: an explicit ``null`` is
      ignored, because ``DocumentResponse`` declares them as non-optional.

    Raises:
        HTTPException: 400 for a malformed document ID, 404 if the document
            does not exist, 403 if the caller is neither an administrator
            nor the uploader.
    """
    try:
        doc_uuid = UUID(document_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid document ID format")

    # Load the document together with the relationships the response needs.
    result = await db.execute(
        select(Document)
        .options(selectinload(Document.tags), selectinload(Document.uploader), selectinload(Document.book))
        .where(Document.id == doc_uuid)
    )
    document = result.scalar_one_or_none()

    if not document:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # Permission check (administrator or owner of the document).
    if not current_user.is_admin and document.uploaded_by != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions to update this document"
        )

    # Fields the response model requires to be non-null.
    required_fields = {"title", "sort_order", "is_public"}

    update_fields = update_data.model_dump(exclude_unset=True)

    for field, value in update_fields.items():
        if field == "matched_pdf_id":
            if not value:
                # Clear the match.
                document.matched_pdf_id = None
                continue
            try:
                pdf_uuid = UUID(value)
            except ValueError:
                # Malformed ID: ignored on purpose.
                continue
            # Only accept the ID if such a document actually exists.
            pdf_result = await db.execute(select(Document).where(Document.id == pdf_uuid))
            if pdf_result.scalar_one_or_none():
                document.matched_pdf_id = pdf_uuid

        elif field == "tags":
            if value is None:
                continue
            # Drop the current tag links, then attach the requested ones.
            document.tags.clear()

            # De-duplicate (keeping order) so the same tag is never attached
            # twice within one request.
            wanted_names = list(dict.fromkeys(
                name.strip() for name in value if name.strip()
            ))
            for tag_name in wanted_names:
                # Find the tag or create it.
                tag_result = await db.execute(select(Tag).where(Tag.name == tag_name))
                tag = tag_result.scalar_one_or_none()

                if not tag:
                    tag = Tag(
                        name=tag_name,
                        created_by=current_user.id
                    )
                    db.add(tag)
                    await db.flush()

                document.tags.append(tag)

        elif value is None and field in required_fields:
            # An explicit null for a required field is treated as "unchanged".
            continue

        else:
            # Plain field update.
            setattr(document, field, value)

    # Stamp the modification time (naive UTC, as before).
    document.updated_at = datetime.utcnow()

    await db.commit()
    await db.refresh(document)

    return DocumentResponse(
        id=str(document.id),
        title=document.title,
        description=document.description,
        html_path=document.html_path,
        pdf_path=document.pdf_path,
        thumbnail_path=document.thumbnail_path,
        file_size=document.file_size,
        page_count=document.page_count,
        language=document.language,
        is_public=document.is_public,
        is_processed=document.is_processed,
        created_at=document.created_at,
        updated_at=document.updated_at,
        document_date=document.document_date,
        uploader_name=document.uploader.full_name or document.uploader.email,
        tags=[tag.name for tag in document.tags],
        book_id=str(document.book.id) if document.book else None,
        book_title=document.book.title if document.book else None,
        book_author=document.book.author if document.book else None,
        sort_order=document.sort_order,
        matched_pdf_id=str(document.matched_pdf_id) if document.matched_pdf_id else None
    )
@router.get("/{document_id}/download")
async def download_document(
    document_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Send a document's file as a download (PDF preferred, else HTML).

    The download name is the stored ``original_filename`` when available,
    otherwise ``<title><extension>``. Because the served file may be the
    PDF while ``original_filename`` is the name of the HTML upload, the
    extension is corrected to match the file that is actually sent.

    Raises:
        HTTPException: 400 for a malformed ID, 404 if the document or its
            file does not exist, 403 if the caller may not see the document.
    """
    from fastapi.responses import FileResponse

    try:
        doc_uuid = UUID(document_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid document ID format")

    # Look up the document.
    query = select(Document).where(Document.id == doc_uuid)
    result = await db.execute(query)
    document = result.scalar_one_or_none()

    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    # Permission check
    if not current_user.is_admin and not document.is_public and document.uploaded_by != current_user.id:
        raise HTTPException(status_code=403, detail="Access denied")

    # Pick the file to send: the PDF if there is one, otherwise the HTML.
    serving_pdf = bool(document.pdf_path)
    file_path = document.pdf_path if serving_pdf else document.html_path

    if not file_path or not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="Document file not found")

    extension = '.pdf' if serving_pdf else '.html'
    filename = document.original_filename
    if not filename:
        filename = f"{document.title}{extension}"
    else:
        # Keep the stored name, but make its extension agree with the file
        # being served (e.g. "report.html" -> "report.pdf").
        stem, current_ext = os.path.splitext(filename)
        accepted = ('.pdf',) if serving_pdf else ('.html', '.htm')
        if current_ext.lower() not in accepted:
            filename = f"{stem}{extension}"

    return FileResponse(
        path=file_path,
        filename=filename,
        media_type='application/octet-stream'
    )
@router.get("/{document_id}/navigation")
async def get_document_navigation(
    document_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Return previous/next navigation information for a document.

    When the document belongs to a book, its neighbours are taken from the
    book's HTML documents that the caller may see, ordered by ``sort_order``
    (NULLs last) and then by creation time. ``book_info`` is filled from the
    book record if it exists.

    Returns:
        A dict with the keys ``current``, ``previous``, ``next`` and
        ``book_info``; the last three are ``None`` when not applicable.

    Raises:
        HTTPException: 400 for a malformed ID, 404 if the document does not
            exist, 403 if the caller may not see it.
    """
    # Validate the ID up front, as the other per-document endpoints do.
    try:
        doc_uuid = UUID(document_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid document ID format")

    # Load the current document.
    result = await db.execute(select(Document).where(Document.id == doc_uuid))
    current_doc = result.scalar_one_or_none()

    if not current_doc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # Permission check
    if not current_doc.is_public and current_doc.uploaded_by != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )

    navigation_info = {
        "current": {
            "id": str(current_doc.id),
            "title": current_doc.title,
            "sort_order": current_doc.sort_order
        },
        "previous": None,
        "next": None,
        "book_info": None
    }

    # Without a book there is nothing to navigate.
    if not current_doc.book_id:
        return navigation_info

    # HTML documents of the same book (PDF-only documents are excluded).
    conditions = [
        Document.book_id == current_doc.book_id,
        Document.html_path.isnot(None),
    ]
    # The admin check is decided in Python, so only non-admins get the
    # visibility filter; no Python boolean is embedded in the SQL expression.
    if not current_user.is_admin:
        conditions.append(
            or_(
                Document.is_public == True,
                Document.uploaded_by == current_user.id,
            )
        )

    book_docs_result = await db.execute(
        select(Document)
        .where(and_(*conditions))
        .order_by(Document.sort_order.asc().nulls_last(), Document.created_at.asc())
    )
    book_docs = book_docs_result.scalars().all()

    # Position of the current document within the book, if it is listed.
    current_index = next(
        (i for i, doc in enumerate(book_docs) if doc.id == current_doc.id),
        None,
    )

    if current_index is not None:
        if current_index > 0:
            prev_doc = book_docs[current_index - 1]
            navigation_info["previous"] = {
                "id": str(prev_doc.id),
                "title": prev_doc.title,
                "sort_order": prev_doc.sort_order
            }

        if current_index < len(book_docs) - 1:
            next_doc = book_docs[current_index + 1]
            navigation_info["next"] = {
                "id": str(next_doc.id),
                "title": next_doc.title,
                "sort_order": next_doc.sort_order
            }

    # Book information (Book is imported at module level).
    book_result = await db.execute(select(Book).where(Book.id == current_doc.book_id))
    book = book_result.scalar_one_or_none()
    if book:
        navigation_info["book_info"] = {
            "id": str(book.id),
            "title": book.title,
            "author": book.author
        }

    return navigation_info
@router.delete("/{document_id}")
async def delete_document(
    document_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Delete a document, its dependent rows and its files (admins only).

    Database rows are removed first, inside one transaction; the files on
    disk are removed only after the commit succeeded, so a failed deletion
    never leaves a document record pointing at files that are gone.

    Returns:
        ``{"message": "Document deleted successfully"}``.

    Raises:
        HTTPException: 403 if the caller is not an administrator, 400 for a
            malformed ID, 404 if the document does not exist, 500 if the
            database deletion fails (the transaction is rolled back).
    """
    import logging

    # Imported inside the function, as in the original code.
    from ...models.highlight import Highlight
    from ...models.note import Note
    from ...models.bookmark import Bookmark

    logger = logging.getLogger(__name__)

    # Check the permission first so non-admin callers cannot probe which
    # document IDs exist.
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can delete documents"
        )

    try:
        doc_uuid = UUID(document_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid document ID format")

    result = await db.execute(select(Document).where(Document.id == doc_uuid))
    document = result.scalar_one_or_none()

    if not document:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # Remember the file paths now; they are needed after the row is gone.
    file_paths = [
        path
        for path in (document.html_path, document.pdf_path, document.thumbnail_path)
        if path
    ]

    try:
        logger.debug("Starting deletion of document %s", document_id)

        # 0. Clear matched_pdf_id on every document that references this one.
        await db.execute(
            update(Document)
            .where(Document.matched_pdf_id == doc_uuid)
            .values(matched_pdf_id=None)
        )

        # 1. Collect the IDs of the document's highlights.
        highlight_ids_result = await db.execute(
            select(Highlight.id).where(Highlight.document_id == doc_uuid)
        )
        highlight_ids = [row[0] for row in highlight_ids_result.fetchall()]
        logger.debug("Found %d highlights to delete", len(highlight_ids))

        # 2. Delete the notes attached to those highlights in one statement.
        #    synchronize_session=False: the session is committed right after
        #    and none of these rows are used again in this request.
        if highlight_ids:
            note_result = await db.execute(
                delete(Note)
                .where(Note.highlight_id.in_(highlight_ids))
                .execution_options(synchronize_session=False)
            )
            logger.debug("Deleted %s notes by highlight_id", note_result.rowcount)

        # 3. Delete notes linked directly through document_id, if any.
        direct_note_result = await db.execute(delete(Note).where(Note.document_id == doc_uuid))
        logger.debug("Deleted %s notes by document_id", direct_note_result.rowcount)

        # 4. Delete bookmarks.
        bookmark_result = await db.execute(delete(Bookmark).where(Bookmark.document_id == doc_uuid))
        logger.debug("Deleted %s bookmarks", bookmark_result.rowcount)

        # 5. Delete highlights (their notes are gone by now).
        highlight_result = await db.execute(delete(Highlight).where(Highlight.document_id == doc_uuid))
        logger.debug("Deleted %s highlights", highlight_result.rowcount)

        # 6. Delete the document itself.
        # NOTE(review): this is a bulk DELETE, so ORM-level cascades do not
        # run; presumably the document-tag association rows are removed by a
        # database-level ON DELETE rule - confirm.
        doc_result = await db.execute(delete(Document).where(Document.id == doc_uuid))
        logger.debug("Deleted %s documents", doc_result.rowcount)

        # 7. Commit.
        await db.commit()
        logger.debug("Successfully deleted document %s", document_id)

    except Exception as e:
        logger.exception("Failed to delete document %s", document_id)
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete document: {str(e)}"
        ) from e

    # The rows are gone; now remove the files. A file that cannot be removed
    # is logged but does not turn the successful deletion into an error.
    for path in file_paths:
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError:
            logger.warning("Could not remove file %s of deleted document %s", path, document_id)

    return {"message": "Document deleted successfully"}
@router.get("/tags/", response_model=List[TagResponse])
async def list_tags(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """List all tags, ordered by name, with their document counts.

    For non-admin callers the count only includes public documents and
    documents they uploaded themselves. Tags without any countable document
    are listed with a count of 0.
    """
    from sqlalchemy import func

    result = await db.execute(select(Tag).order_by(Tag.name))
    tags = result.scalars().all()

    # Count the documents of all tags in one grouped query instead of
    # loading every tag's documents separately.
    count_query = (
        select(Tag.id, func.count(Document.id))
        .select_from(Document)
        .join(Document.tags)
        .group_by(Tag.id)
    )
    if not current_user.is_admin:
        count_query = count_query.where(
            or_(
                Document.is_public == True,
                Document.uploaded_by == current_user.id
            )
        )
    count_rows = await db.execute(count_query)
    counts = {tag_id: count for tag_id, count in count_rows.all()}

    return [
        TagResponse(
            id=str(tag.id),
            name=tag.name,
            color=tag.color,
            description=tag.description,
            document_count=counts.get(tag.id, 0)
        )
        for tag in tags
    ]
@router.post("/tags/", response_model=TagResponse)
async def create_tag(
    tag_data: CreateTagRequest,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Create a new tag.

    The name is stripped of surrounding whitespace before it is checked and
    stored, matching how the document upload and update endpoints look tags
    up by their stripped names.

    Returns:
        The created tag as a ``TagResponse`` with a document count of 0.

    Raises:
        HTTPException: 400 if the name is blank or a tag with that name
            already exists.
    """
    tag_name = tag_data.name.strip()
    if not tag_name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Tag name must not be empty"
        )

    # Reject duplicates.
    result = await db.execute(select(Tag).where(Tag.name == tag_name))
    existing_tag = result.scalar_one_or_none()

    if existing_tag:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Tag already exists"
        )

    # Create the tag.
    tag = Tag(
        name=tag_name,
        color=tag_data.color,
        description=tag_data.description,
        created_by=current_user.id
    )

    db.add(tag)
    await db.commit()
    await db.refresh(tag)

    return TagResponse(
        id=str(tag.id),
        name=tag.name,
        color=tag.color,
        description=tag.description,
        document_count=0
    )