Files
hyungi_document_server/app/api/library.py
T
Hyungi Ahn 964d4ffc67 feat(library): 자료실 분류 체계 독립 관리 Phase 1
library_categories 테이블 추가로 빈 카테고리 생성 가능.
CRUD API (생성/leaf rename/leaf delete) + 트리 머지 엔드포인트.
사이드바 트리에 컨텍스트 메뉴 (추가/이름변경/삭제).
LibraryPathEditor를 카테고리 기반 flat selector로 전환.
미분류는 시스템 분류로 보호 (삭제/이름변경 불가).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 10:01:53 +09:00

335 lines
10 KiB
Python

"""자료실 분류 체계 CRUD API — /api/library"""
from datetime import datetime
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.library import LIBRARY_PREFIX, MAX_DEPTH, normalize_library_path
from models.category import LibraryCategory
from models.document import Document
from models.user import User
router = APIRouter()
# ─── 스키마 ───
class CategoryCreate(BaseModel):
    """Request body for creating a category (POST /categories)."""

    # Requested category path. The create endpoint passes it through
    # normalize_library_path() and then splits the result on "/".
    path: str
class CategoryRename(BaseModel):
    """Request body for renaming a category (PATCH /categories)."""

    # Current path of the category to rename; the rename endpoint looks it
    # up by exact match against LibraryCategory.path.
    path: str
    # Replacement for the last path segment. The rename endpoint strips
    # surrounding whitespace and rejects empty names or names over 30 chars.
    new_name: str
class CategoryResponse(BaseModel):
    """Serialized category row returned by the category endpoints."""

    id: int
    # Full "/"-separated path of the category.
    path: str
    # Last segment of ``path``.
    name: str
    # Path of the parent category; None for a top-level category.
    parent_path: str | None
    # Number of path segments (the create endpoint stores 1 for top level).
    depth: int
    # System categories are refused by the rename and delete endpoints.
    is_system: bool
    # NOTE(review): presumably maintained by the LibraryCategory model or
    # the database; nothing in this module sets them — confirm.
    created_at: datetime
    updated_at: datetime
    # Lets model_validate() read fields from object attributes; this module
    # calls it with LibraryCategory instances.
    model_config = {"from_attributes": True}
class CategoryTreeNode(BaseModel):
    """One node of the merged category/tag tree returned by GET /tree."""

    # Last path segment, used as the node label.
    name: str
    # Full "/"-separated path from the root down to this node.
    path: str
    # Number of distinct non-deleted documents tagged with this path or with
    # any path below it.
    count: int
    # True when a stored category row exists for ``path``; False when the
    # node exists only because some document tag mentions the path.
    is_category: bool
    # Copied from the category row; False for tag-only nodes.
    is_system: bool
    # True when ``children`` is non-empty.
    has_children: bool
    # Child nodes (forward reference to this same model).
    children: list["CategoryTreeNode"]
# ─── 엔드포인트 ───
@router.get("/categories", response_model=list[CategoryResponse])
async def list_categories(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return every stored category as a flat list ordered by path.

    ``user`` is not read in the body; it is declared so the
    ``get_current_user`` dependency runs for this route.
    """
    ordered_query = select(LibraryCategory).order_by(LibraryCategory.path)
    rows = (await session.execute(ordered_query)).scalars().all()
    return list(map(CategoryResponse.model_validate, rows))
@router.post("/categories", response_model=CategoryResponse, status_code=201)
async def create_category(
    body: CategoryCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create a category, adding any missing ancestor categories as well.

    The requested path is normalized first. Every proper prefix of the
    normalized path that has no category row yet is inserted in the same
    transaction as the requested category.

    Raises:
        HTTPException 400: the path fails normalization or has more than
            MAX_DEPTH segments.
        HTTPException 409: a category with the normalized path already exists.
    """
    try:
        canonical = normalize_library_path(body.path)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    parts = canonical.split("/")
    total_depth = len(parts)
    if total_depth > MAX_DEPTH:
        raise HTTPException(status_code=400, detail=f"최대 {MAX_DEPTH}단계까지 가능")

    # Refuse to create a path that is already stored.
    same_path = select(LibraryCategory).where(LibraryCategory.path == canonical)
    if (await session.execute(same_path)).scalar_one_or_none():
        raise HTTPException(status_code=409, detail="이미 존재하는 분류 경로")

    # Walk the proper prefixes from shortest to longest and add missing ones.
    for level in range(1, total_depth):
        prefix = parts[:level]
        prefix_path = "/".join(prefix)
        id_lookup = select(LibraryCategory.id).where(
            LibraryCategory.path == prefix_path
        )
        if (await session.execute(id_lookup)).scalar_one_or_none():
            continue
        missing_ancestor = LibraryCategory(
            path=prefix_path,
            name=prefix[-1],
            parent_path="/".join(prefix[:-1]) or None,
            depth=level,
        )
        session.add(missing_ancestor)

    # The requested category itself.
    category = LibraryCategory(
        path=canonical,
        name=parts[-1],
        parent_path="/".join(parts[:-1]) or None,
        depth=total_depth,
    )
    session.add(category)
    await session.commit()
    # Reload so database-populated columns are available for the response.
    await session.refresh(category)
    return CategoryResponse.model_validate(category)
@router.patch("/categories", response_model=CategoryResponse)
async def rename_category(
    body: CategoryRename,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Rename a leaf category (identified by path) and retag its documents.

    Only the last path segment changes. Every document whose ``user_tags``
    contains the old library tag has that tag replaced with the new one, in
    the same transaction as the category row update.

    Raises:
        HTTPException 404: no category has ``body.path``.
        HTTPException 422: the category is a system category, or it has
            child categories.
        HTTPException 400: the new name is empty, longer than 30 characters,
            or contains "/".
        HTTPException 409: another category already uses the resulting path.
    """
    # Function-scope import so this block does not depend on the module's
    # top-level import list.
    import json

    # Look up the category.
    result = await session.execute(
        select(LibraryCategory).where(LibraryCategory.path == body.path)
    )
    category = result.scalar_one_or_none()
    if not category:
        raise HTTPException(status_code=404, detail="분류를 찾을 수 없습니다")

    # System categories are protected.
    if category.is_system:
        raise HTTPException(status_code=422, detail="시스템 분류는 변경할 수 없습니다")

    # Leaf check: renaming a parent would leave its children's paths stale.
    children = await session.execute(
        select(func.count()).where(
            LibraryCategory.parent_path == category.path
        )
    )
    if children.scalar() > 0:
        raise HTTPException(
            status_code=422, detail="하위 분류가 있어 이름을 변경할 수 없습니다"
        )

    # Validate the new name.
    new_name = body.new_name.strip()
    if not new_name:
        raise HTTPException(status_code=400, detail="빈 이름")
    if len(new_name) > 30:
        raise HTTPException(status_code=400, detail="이름은 30자 이하")
    # "/" is the path separator. Accepting it would store a multi-segment
    # path while parent_path and depth stay unchanged, corrupting the tree.
    if "/" in new_name:
        raise HTTPException(status_code=400, detail="이름에 '/'를 사용할 수 없습니다")

    # Compute the new path under the same parent.
    new_path = (
        f"{category.parent_path}/{new_name}" if category.parent_path else new_name
    )

    # Duplicate check.
    dup = await session.execute(
        select(LibraryCategory.id).where(LibraryCategory.path == new_path)
    )
    if dup.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="같은 이름의 분류가 이미 존재합니다")

    old_tag = f"{LIBRARY_PREFIX}{category.path}"
    new_tag = f"{LIBRARY_PREFIX}{new_path}"

    # Rewrite the tag on every document that carries it. The containment
    # operand is serialized with json.dumps so a path containing '"' or '\'
    # still yields valid JSON; ensure_ascii=False keeps non-ASCII names
    # unescaped, as the previous hand-built literal did.
    await session.execute(
        sql_text("""
            UPDATE documents
            SET user_tags = COALESCE((
                SELECT jsonb_agg(
                    CASE WHEN elem = :old_tag THEN :new_tag ELSE elem END
                )
                FROM jsonb_array_elements_text(
                    COALESCE(user_tags, '[]'::jsonb)
                ) AS elem
            ), '[]'::jsonb)
            WHERE user_tags @> :old_tag_jsonb
        """).bindparams(
            old_tag=old_tag,
            new_tag=new_tag,
            old_tag_jsonb=json.dumps([old_tag], ensure_ascii=False),
        )
    )

    # Update the category row: only path and name change, parent_path stays.
    category.path = new_path
    category.name = new_name
    await session.commit()
    await session.refresh(category)
    return CategoryResponse.model_validate(category)
@router.delete("/categories", status_code=204)
async def delete_category(
    path: str = Query(..., description="삭제할 카테고리 경로"),
    user: Annotated[User, Depends(get_current_user)] = None,
    session: Annotated[AsyncSession, Depends(get_session)] = None,
):
    """Delete a leaf category that no live document is filed under.

    Raises:
        HTTPException 404: no category has the given path.
        HTTPException 422: the category is a system category, has child
            categories, or at least one non-deleted document carries its
            library tag.
    """
    by_path = select(LibraryCategory).where(LibraryCategory.path == path)
    category = (await session.execute(by_path)).scalar_one_or_none()
    if not category:
        raise HTTPException(status_code=404, detail="분류를 찾을 수 없습니다")
    if category.is_system:
        raise HTTPException(status_code=422, detail="시스템 분류는 삭제할 수 없습니다")

    # Leaf check: count categories whose parent is this one.
    child_count_query = select(func.count()).where(
        LibraryCategory.parent_path == category.path
    )
    child_total = (await session.execute(child_count_query)).scalar()
    if child_total > 0:
        raise HTTPException(
            status_code=422, detail="하위 분류가 있어 삭제할 수 없습니다"
        )

    # Count non-deleted documents whose user_tags contain this category's tag.
    tagged_docs_query = sql_text("""
        SELECT COUNT(*) FROM documents
        WHERE deleted_at IS NULL
        AND EXISTS (
            SELECT 1 FROM jsonb_array_elements_text(
                COALESCE(user_tags, '[]'::jsonb)
            ) AS t
            WHERE t = :tag
        )
    """).bindparams(tag=f"{LIBRARY_PREFIX}{category.path}")
    tagged_total = (await session.execute(tagged_docs_query)).scalar()
    if tagged_total > 0:
        raise HTTPException(
            status_code=422,
            detail="이 분류에 속한 문서가 있어 삭제할 수 없습니다. 문서를 먼저 이동하세요.",
        )

    await session.delete(category)
    await session.commit()
@router.get("/tree", response_model=list[CategoryTreeNode])
async def get_library_tree(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return the library tree: stored categories merged with tag counts.

    Nodes come from two sources: rows of the category table, and library
    tags (strings starting with LIBRARY_PREFIX) found in ``user_tags`` of
    non-deleted documents. A node's ``count`` is the number of distinct
    documents tagged with that path or with any path below it. Siblings
    are sorted by name.
    """
    # 1. Stored categories, keyed by path.
    cat_result = await session.execute(
        select(LibraryCategory).order_by(LibraryCategory.path)
    )
    cat_map: dict[str, LibraryCategory] = {
        c.path: c for c in cat_result.scalars().all()
    }

    # 2. Collect, per path, the ids of documents tagged at or below it.
    doc_result = await session.execute(
        select(Document.id, Document.user_tags).where(
            Document.deleted_at == None,  # noqa: E711
            Document.user_tags != None,  # noqa: E711
        )
    )
    path_docs: dict[str, set[int]] = {}
    for doc_id, tags in doc_result:
        if not tags:
            continue
        for tag in tags:
            if not isinstance(tag, str) or not tag.startswith(LIBRARY_PREFIX):
                continue
            path = tag[len(LIBRARY_PREFIX):]
            if not path:
                # A tag equal to the bare prefix names no category; keeping
                # it would add a nameless root node to the tree.
                continue
            parts = path.split("/")
            # Credit the document to the tagged path and every ancestor.
            # Sets make repeated credits from sibling tags harmless.
            for i in range(1, len(parts) + 1):
                path_docs.setdefault("/".join(parts[:i]), set()).add(doc_id)

    # 3. Union of paths known from either source.
    all_paths = set(cat_map) | set(path_docs)

    # 4. Nested dict: segment name -> dict of child segments. Level dicts
    #    hold segment names only, so no key is reserved and names such as
    #    "_archive" are kept rather than filtered out.
    root: dict = {}
    for p in all_paths:
        level = root
        for part in p.split("/"):
            level = level.setdefault(part, {})

    def build_tree(level: dict, prefix: str = "") -> list[CategoryTreeNode]:
        """Convert one nesting level into name-sorted CategoryTreeNode objects."""
        nodes = []
        for name in sorted(level):
            path = f"{prefix}/{name}" if prefix else name
            children = build_tree(level[name], path)
            cat = cat_map.get(path)
            nodes.append(CategoryTreeNode(
                name=name,
                path=path,
                count=len(path_docs.get(path, ())),
                is_category=path in cat_map,
                is_system=cat.is_system if cat else False,
                has_children=bool(children),
                children=children,
            ))
        return nodes

    return build_tree(root)