Files
document-server/backend/src/api/routes/memo_trees.py
Hyungi Ahn f95f67364a feat: 소설 분기 시스템 및 트리 메모장 구현
🌟 주요 기능:
- 트리 구조 메모장 시스템
- 소설 분기 관리 (정사 경로 설정)
- 중앙 배치 트리 다이어그램
- 정사 경로 목차 뷰
- 인라인 편집 기능

📚 백엔드:
- MemoTree, MemoNode 모델 추가
- 정사 경로 자동 순서 관리
- 분기점에서 하나만 선택 가능한 로직
- RESTful API 엔드포인트

🎨 프론트엔드:
- memo-tree.html: 트리 다이어그램 에디터
- story-view.html: 정사 경로 목차 뷰
- SVG 연결선으로 시각적 트리 표현
- Alpine.js 기반 반응형 UI
- Monaco Editor 통합

 특별 기능:
- 정사 경로 황금색 배지 표시
- 확대/축소 및 패닝 지원
- 드래그 앤 드롭 준비
- 내보내기 및 인쇄 기능
- 인라인 편집 모달
2025-08-25 10:25:10 +09:00

701 lines
24 KiB
Python

"""
트리 구조 메모장 API 라우터
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func, and_, or_
from sqlalchemy.orm import selectinload
from typing import List, Optional
from uuid import UUID
from ...core.database import get_db
from ...models.user import User
from ...models.memo_tree import MemoTree, MemoNode, MemoNodeVersion, MemoTreeShare
from ...schemas.memo_tree import (
MemoTreeCreate, MemoTreeUpdate, MemoTreeResponse, MemoTreeWithNodes,
MemoNodeCreate, MemoNodeUpdate, MemoNodeResponse, MemoNodeMove,
MemoTreeStats, MemoSearchRequest, MemoSearchResult
)
from ..dependencies import get_current_active_user
router = APIRouter(prefix="/memo-trees", tags=["memo-trees"])
# ============================================================================
# 메모 트리 관리
# ============================================================================
@router.get("/", response_model=List[MemoTreeResponse])
async def get_user_memo_trees(
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
include_archived: bool = Query(False, description="보관된 트리 포함 여부")
):
"""사용자의 메모 트리 목록 조회"""
try:
query = select(MemoTree).where(MemoTree.user_id == current_user.id)
if not include_archived:
query = query.where(MemoTree.is_archived == False)
query = query.order_by(MemoTree.updated_at.desc())
result = await db.execute(query)
trees = result.scalars().all()
# 각 트리의 노드 개수 계산
tree_responses = []
for tree in trees:
node_count_result = await db.execute(
select(func.count(MemoNode.id)).where(MemoNode.tree_id == tree.id)
)
node_count = node_count_result.scalar() or 0
tree_dict = {
"id": str(tree.id),
"user_id": str(tree.user_id),
"title": tree.title,
"description": tree.description,
"tree_type": tree.tree_type,
"template_data": tree.template_data,
"settings": tree.settings,
"created_at": tree.created_at,
"updated_at": tree.updated_at,
"is_public": tree.is_public,
"is_archived": tree.is_archived,
"node_count": node_count
}
tree_responses.append(MemoTreeResponse(**tree_dict))
return tree_responses
except Exception as e:
print(f"ERROR in get_user_memo_trees: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get memo trees: {str(e)}"
)
@router.post("/", response_model=MemoTreeResponse)
async def create_memo_tree(
tree_data: MemoTreeCreate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""새 메모 트리 생성"""
try:
new_tree = MemoTree(
user_id=current_user.id,
title=tree_data.title,
description=tree_data.description,
tree_type=tree_data.tree_type,
template_data=tree_data.template_data or {},
settings=tree_data.settings or {},
is_public=tree_data.is_public
)
db.add(new_tree)
await db.commit()
await db.refresh(new_tree)
tree_dict = {
"id": str(new_tree.id),
"user_id": str(new_tree.user_id),
"title": new_tree.title,
"description": new_tree.description,
"tree_type": new_tree.tree_type,
"template_data": new_tree.template_data,
"settings": new_tree.settings,
"created_at": new_tree.created_at,
"updated_at": new_tree.updated_at,
"is_public": new_tree.is_public,
"is_archived": new_tree.is_archived,
"node_count": 0
}
return MemoTreeResponse(**tree_dict)
except Exception as e:
await db.rollback()
print(f"ERROR in create_memo_tree: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create memo tree: {str(e)}"
)
@router.get("/{tree_id}", response_model=MemoTreeResponse)
async def get_memo_tree(
tree_id: UUID,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""메모 트리 상세 조회"""
try:
result = await db.execute(
select(MemoTree).where(
and_(
MemoTree.id == tree_id,
or_(
MemoTree.user_id == current_user.id,
MemoTree.is_public == True
)
)
)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo tree not found"
)
# 노드 개수 계산
node_count_result = await db.execute(
select(func.count(MemoNode.id)).where(MemoNode.tree_id == tree.id)
)
node_count = node_count_result.scalar() or 0
tree_dict = {
"id": str(tree.id),
"user_id": str(tree.user_id),
"title": tree.title,
"description": tree.description,
"tree_type": tree.tree_type,
"template_data": tree.template_data,
"settings": tree.settings,
"created_at": tree.created_at,
"updated_at": tree.updated_at,
"is_public": tree.is_public,
"is_archived": tree.is_archived,
"node_count": node_count
}
return MemoTreeResponse(**tree_dict)
except HTTPException:
raise
except Exception as e:
print(f"ERROR in get_memo_tree: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get memo tree: {str(e)}"
)
@router.put("/{tree_id}", response_model=MemoTreeResponse)
async def update_memo_tree(
tree_id: UUID,
tree_data: MemoTreeUpdate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""메모 트리 업데이트"""
try:
result = await db.execute(
select(MemoTree).where(
and_(
MemoTree.id == tree_id,
MemoTree.user_id == current_user.id
)
)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo tree not found"
)
# 업데이트할 필드들 적용
update_data = tree_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(tree, field, value)
await db.commit()
await db.refresh(tree)
# 노드 개수 계산
node_count_result = await db.execute(
select(func.count(MemoNode.id)).where(MemoNode.tree_id == tree.id)
)
node_count = node_count_result.scalar() or 0
tree_dict = {
"id": str(tree.id),
"user_id": str(tree.user_id),
"title": tree.title,
"description": tree.description,
"tree_type": tree.tree_type,
"template_data": tree.template_data,
"settings": tree.settings,
"created_at": tree.created_at,
"updated_at": tree.updated_at,
"is_public": tree.is_public,
"is_archived": tree.is_archived,
"node_count": node_count
}
return MemoTreeResponse(**tree_dict)
except HTTPException:
raise
except Exception as e:
await db.rollback()
print(f"ERROR in update_memo_tree: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to update memo tree: {str(e)}"
)
@router.delete("/{tree_id}")
async def delete_memo_tree(
tree_id: UUID,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""메모 트리 삭제"""
try:
result = await db.execute(
select(MemoTree).where(
and_(
MemoTree.id == tree_id,
MemoTree.user_id == current_user.id
)
)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo tree not found"
)
# 트리 삭제 (CASCADE로 관련 노드들도 자동 삭제됨)
await db.delete(tree)
await db.commit()
return {"message": "Memo tree deleted successfully"}
except HTTPException:
raise
except Exception as e:
await db.rollback()
print(f"ERROR in delete_memo_tree: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete memo tree: {str(e)}"
)
# ============================================================================
# 메모 노드 관리
# ============================================================================
@router.get("/{tree_id}/nodes", response_model=List[MemoNodeResponse])
async def get_memo_tree_nodes(
tree_id: UUID,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""메모 트리의 모든 노드 조회"""
try:
# 트리 접근 권한 확인
tree_result = await db.execute(
select(MemoTree).where(
and_(
MemoTree.id == tree_id,
or_(
MemoTree.user_id == current_user.id,
MemoTree.is_public == True
)
)
)
)
tree = tree_result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo tree not found"
)
# 노드들 조회
result = await db.execute(
select(MemoNode)
.where(MemoNode.tree_id == tree_id)
.order_by(MemoNode.path, MemoNode.sort_order)
)
nodes = result.scalars().all()
# 각 노드의 자식 개수 계산
node_responses = []
for node in nodes:
children_count_result = await db.execute(
select(func.count(MemoNode.id)).where(MemoNode.parent_id == node.id)
)
children_count = children_count_result.scalar() or 0
node_dict = {
"id": str(node.id),
"tree_id": str(node.tree_id),
"parent_id": str(node.parent_id) if node.parent_id else None,
"user_id": str(node.user_id),
"title": node.title,
"content": node.content,
"node_type": node.node_type,
"sort_order": node.sort_order,
"depth_level": node.depth_level,
"path": node.path,
"tags": node.tags or [],
"node_metadata": node.node_metadata or {},
"status": node.status,
"word_count": node.word_count,
"is_canonical": node.is_canonical,
"canonical_order": node.canonical_order,
"story_path": node.story_path,
"created_at": node.created_at,
"updated_at": node.updated_at,
"children_count": children_count
}
node_responses.append(MemoNodeResponse(**node_dict))
return node_responses
except HTTPException:
raise
except Exception as e:
print(f"ERROR in get_memo_tree_nodes: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get memo tree nodes: {str(e)}"
)
@router.post("/{tree_id}/nodes", response_model=MemoNodeResponse)
async def create_memo_node(
tree_id: UUID,
node_data: MemoNodeCreate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""새 메모 노드 생성"""
try:
# 트리 접근 권한 확인
tree_result = await db.execute(
select(MemoTree).where(
and_(
MemoTree.id == tree_id,
MemoTree.user_id == current_user.id
)
)
)
tree = tree_result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo tree not found"
)
# 부모 노드 확인 (있다면)
if node_data.parent_id:
parent_result = await db.execute(
select(MemoNode).where(
and_(
MemoNode.id == UUID(node_data.parent_id),
MemoNode.tree_id == tree_id
)
)
)
parent_node = parent_result.scalar_one_or_none()
if not parent_node:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Parent node not found"
)
# 단어 수 계산
word_count = 0
if node_data.content:
word_count = len(node_data.content.replace('\n', ' ').split())
new_node = MemoNode(
tree_id=tree_id,
parent_id=UUID(node_data.parent_id) if node_data.parent_id else None,
user_id=current_user.id,
title=node_data.title,
content=node_data.content,
node_type=node_data.node_type,
sort_order=node_data.sort_order,
tags=node_data.tags or [],
node_metadata=node_data.node_metadata or {},
status=node_data.status,
word_count=word_count,
is_canonical=node_data.is_canonical or False
)
db.add(new_node)
await db.commit()
await db.refresh(new_node)
node_dict = {
"id": str(new_node.id),
"tree_id": str(new_node.tree_id),
"parent_id": str(new_node.parent_id) if new_node.parent_id else None,
"user_id": str(new_node.user_id),
"title": new_node.title,
"content": new_node.content,
"node_type": new_node.node_type,
"sort_order": new_node.sort_order,
"depth_level": new_node.depth_level,
"path": new_node.path,
"tags": new_node.tags or [],
"node_metadata": new_node.node_metadata or {},
"status": new_node.status,
"word_count": new_node.word_count,
"is_canonical": new_node.is_canonical,
"canonical_order": new_node.canonical_order,
"story_path": new_node.story_path,
"created_at": new_node.created_at,
"updated_at": new_node.updated_at,
"children_count": 0
}
return MemoNodeResponse(**node_dict)
except HTTPException:
raise
except Exception as e:
await db.rollback()
print(f"ERROR in create_memo_node: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create memo node: {str(e)}"
)
@router.get("/nodes/{node_id}", response_model=MemoNodeResponse)
async def get_memo_node(
node_id: UUID,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""메모 노드 상세 조회"""
try:
result = await db.execute(
select(MemoNode)
.options(selectinload(MemoNode.tree))
.where(MemoNode.id == node_id)
)
node = result.scalar_one_or_none()
if not node:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo node not found"
)
# 접근 권한 확인
if node.tree.user_id != current_user.id and not node.tree.is_public:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to access this node"
)
# 자식 개수 계산
children_count_result = await db.execute(
select(func.count(MemoNode.id)).where(MemoNode.parent_id == node.id)
)
children_count = children_count_result.scalar() or 0
node_dict = {
"id": str(node.id),
"tree_id": str(node.tree_id),
"parent_id": str(node.parent_id) if node.parent_id else None,
"user_id": str(node.user_id),
"title": node.title,
"content": node.content,
"node_type": node.node_type,
"sort_order": node.sort_order,
"depth_level": node.depth_level,
"path": node.path,
"tags": node.tags or [],
"node_metadata": node.node_metadata or {},
"status": node.status,
"word_count": node.word_count,
"is_canonical": node.is_canonical,
"canonical_order": node.canonical_order,
"story_path": node.story_path,
"created_at": node.created_at,
"updated_at": node.updated_at,
"children_count": children_count
}
return MemoNodeResponse(**node_dict)
except HTTPException:
raise
except Exception as e:
print(f"ERROR in get_memo_node: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get memo node: {str(e)}"
)
@router.put("/nodes/{node_id}", response_model=MemoNodeResponse)
async def update_memo_node(
node_id: UUID,
node_data: MemoNodeUpdate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""메모 노드 업데이트"""
try:
result = await db.execute(
select(MemoNode)
.options(selectinload(MemoNode.tree))
.where(MemoNode.id == node_id)
)
node = result.scalar_one_or_none()
if not node:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo node not found"
)
# 접근 권한 확인 (소유자만 수정 가능)
if node.tree.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to update this node"
)
# 업데이트할 필드들 적용
update_data = node_data.dict(exclude_unset=True)
for field, value in update_data.items():
if field == "parent_id" and value:
# 부모 노드 유효성 검사
parent_result = await db.execute(
select(MemoNode).where(
and_(
MemoNode.id == UUID(value),
MemoNode.tree_id == node.tree_id
)
)
)
parent_node = parent_result.scalar_one_or_none()
if not parent_node:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Parent node not found"
)
setattr(node, field, UUID(value))
elif field == "parent_id" and value is None:
setattr(node, field, None)
else:
setattr(node, field, value)
# 내용이 업데이트되면 단어 수 재계산
if "content" in update_data:
word_count = 0
if node.content:
word_count = len(node.content.replace('\n', ' ').split())
node.word_count = word_count
await db.commit()
await db.refresh(node)
# 자식 개수 계산
children_count_result = await db.execute(
select(func.count(MemoNode.id)).where(MemoNode.parent_id == node.id)
)
children_count = children_count_result.scalar() or 0
node_dict = {
"id": str(node.id),
"tree_id": str(node.tree_id),
"parent_id": str(node.parent_id) if node.parent_id else None,
"user_id": str(node.user_id),
"title": node.title,
"content": node.content,
"node_type": node.node_type,
"sort_order": node.sort_order,
"depth_level": node.depth_level,
"path": node.path,
"tags": node.tags or [],
"node_metadata": node.node_metadata or {},
"status": node.status,
"word_count": node.word_count,
"is_canonical": node.is_canonical,
"canonical_order": node.canonical_order,
"story_path": node.story_path,
"created_at": node.created_at,
"updated_at": node.updated_at,
"children_count": children_count
}
return MemoNodeResponse(**node_dict)
except HTTPException:
raise
except Exception as e:
await db.rollback()
print(f"ERROR in update_memo_node: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to update memo node: {str(e)}"
)
@router.delete("/nodes/{node_id}")
async def delete_memo_node(
node_id: UUID,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""메모 노드 삭제"""
try:
result = await db.execute(
select(MemoNode)
.options(selectinload(MemoNode.tree))
.where(MemoNode.id == node_id)
)
node = result.scalar_one_or_none()
if not node:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Memo node not found"
)
# 접근 권한 확인 (소유자만 삭제 가능)
if node.tree.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this node"
)
# 노드 삭제 (CASCADE로 자식 노드들도 자동 삭제됨)
await db.delete(node)
await db.commit()
return {"message": "Memo node deleted successfully"}
except HTTPException:
raise
except Exception as e:
await db.rollback()
print(f"ERROR in delete_memo_node: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete memo node: {str(e)}"
)