Files
hyungi_document_server/app/api/library.py
Hyungi Ahn 49d8f68986 feat(library): 자료실 회독 카운트 추적 (PR-A backend)
자료실 자료를 사용자가 명시적으로 "1회독 완료" 클릭 시 +1 누적.
detail 진입 자동 카운트 . append-only 로그.

데이터:
- migrations 174~176: document_reads 테이블 + 인덱스 2개 (단일 statement 분할)

ORM:
- app/models/document_read.py: DocumentRead (user_id, document_id, read_at)

API (app/api/document_reads.py, /api/documents prefix):
- POST   /api/documents/{id}/read       — 회독 +1
- GET    /api/documents/{id}/read-stats — {read_count, last_read_at}
- DELETE /api/documents/{id}/read/last  — 현재 사용자의 그 문서 마지막 1건만
  · ownership: WHERE user_id=current_user.id AND document_id=:doc_id
  · documents 에 user_id 부재 (single-user). multi-user 전환 시 ownership
    check 추가 필요 — 코드 주석 명시.

응답 확장:
- DocumentResponse: read_count(default 0), last_read_at(default None)
- /api/documents/library: 페이지 N건 한정 LEFT JOIN 으로 read 통계 매핑 (N+1 회피)
- /api/library/tree CategoryTreeNode: unread_count 추가
  · 기존 path_docs 가 ancestor 누적 구조라 그대로 활용 — 하위 경로 합산 자동

규칙 (사용자 명시 — 변경 금지):
  · 같은 날 여러 번 클릭 → 각각 별개 회독
  · 실수 클릭 취소 = DELETE /read/last
  · documents 에 read_count 컬럼 추가 , 로그 기반 COUNT(*) 만

plan: ~/.claude/plans/scalable-chasing-stonebraker.md
브랜치: feature/library-reads (손글씨 트랙과 분리)
2026-04-27 12:08:36 +09:00

545 lines
18 KiB
Python

"""자료실 분류 체계 CRUD API — /api/library"""
from datetime import datetime
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.library import LIBRARY_PREFIX, MAX_DEPTH, normalize_library_path
from models.category import LibraryCategory
from models.document import Document
from models.facet_value import FacetValue
from models.user import User
FACET_TYPES = ("company", "topic", "doctype") # year는 사전 불필요
router = APIRouter()
# ─── 스키마 ───
class CategoryCreate(BaseModel):
path: str
class CategoryRename(BaseModel):
path: str
new_name: str
class CategoryResponse(BaseModel):
id: int
path: str
name: str
parent_path: str | None
depth: int
is_system: bool
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
class CategoryTreeNode(BaseModel):
name: str
path: str
count: int
# 현재 사용자 기준, 해당 경로 (하위 경로 포함) 의 안 본 자료 수.
# 0 이면 모두 1+회독.
unread_count: int = 0
is_category: bool
is_system: bool
has_children: bool
children: list["CategoryTreeNode"]
# ─── 엔드포인트 ───
@router.get("/categories", response_model=list[CategoryResponse])
async def list_categories(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""전체 카테고리 flat 목록 (path 순)"""
result = await session.execute(
select(LibraryCategory).order_by(LibraryCategory.path)
)
return [CategoryResponse.model_validate(c) for c in result.scalars().all()]
@router.post("/categories", response_model=CategoryResponse, status_code=201)
async def create_category(
body: CategoryCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카테고리 생성 (조상 자동 생성 포함)"""
try:
normalized = normalize_library_path(body.path)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
segments = normalized.split("/")
if len(segments) > MAX_DEPTH:
raise HTTPException(status_code=400, detail=f"최대 {MAX_DEPTH}단계까지 가능")
# 중복 검사
existing = await session.execute(
select(LibraryCategory).where(LibraryCategory.path == normalized)
)
if existing.scalar_one_or_none():
raise HTTPException(status_code=409, detail="이미 존재하는 분류 경로")
# 조상 자동 생성
for i in range(1, len(segments)):
ancestor_path = "/".join(segments[:i])
ancestor_name = segments[i - 1]
ancestor_parent = "/".join(segments[: i - 1]) or None
exists = await session.execute(
select(LibraryCategory.id).where(
LibraryCategory.path == ancestor_path
)
)
if not exists.scalar_one_or_none():
session.add(LibraryCategory(
path=ancestor_path,
name=ancestor_name,
parent_path=ancestor_parent,
depth=i,
))
# 본 카테고리 생성
category = LibraryCategory(
path=normalized,
name=segments[-1],
parent_path="/".join(segments[:-1]) or None,
depth=len(segments),
)
session.add(category)
await session.commit()
await session.refresh(category)
return CategoryResponse.model_validate(category)
@router.patch("/categories", response_model=CategoryResponse)
async def rename_category(
body: CategoryRename,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카테고리 이름 변경 (leaf only, path 기반 식별)"""
# 카테고리 조회
result = await session.execute(
select(LibraryCategory).where(LibraryCategory.path == body.path)
)
category = result.scalar_one_or_none()
if not category:
raise HTTPException(status_code=404, detail="분류를 찾을 수 없습니다")
# 시스템 분류 보호
if category.is_system:
raise HTTPException(status_code=422, detail="시스템 분류는 변경할 수 없습니다")
# leaf 검사
children = await session.execute(
select(func.count()).where(
LibraryCategory.parent_path == category.path
)
)
if children.scalar() > 0:
raise HTTPException(
status_code=422, detail="하위 분류가 있어 이름을 변경할 수 없습니다"
)
# new_name 검증
new_name = body.new_name.strip()
if not new_name:
raise HTTPException(status_code=400, detail="빈 이름")
if len(new_name) > 30:
raise HTTPException(status_code=400, detail="이름은 30자 이하")
# 새 path 계산
new_path = (
f"{category.parent_path}/{new_name}" if category.parent_path else new_name
)
# 중복 검사
dup = await session.execute(
select(LibraryCategory.id).where(LibraryCategory.path == new_path)
)
if dup.scalar_one_or_none():
raise HTTPException(status_code=409, detail="같은 이름의 분류가 이미 존재합니다")
old_tag = f"{LIBRARY_PREFIX}{category.path}"
new_tag = f"{LIBRARY_PREFIX}{new_path}"
# 문서 태그 갱신
await session.execute(
sql_text("""
UPDATE documents
SET user_tags = COALESCE((
SELECT jsonb_agg(
CASE WHEN elem = :old_tag THEN :new_tag ELSE elem END
)
FROM jsonb_array_elements_text(
COALESCE(user_tags, '[]'::jsonb)
) AS elem
), '[]'::jsonb)
WHERE user_tags @> :old_tag_jsonb
""").bindparams(
old_tag=old_tag,
new_tag=new_tag,
old_tag_jsonb=f'["{old_tag}"]',
)
)
# 카테고리 row 갱신 (path, name만. parent_path 유지)
category.path = new_path
category.name = new_name
await session.commit()
await session.refresh(category)
return CategoryResponse.model_validate(category)
@router.delete("/categories", status_code=204)
async def delete_category(
path: str = Query(..., description="삭제할 카테고리 경로"),
user: Annotated[User, Depends(get_current_user)] = None,
session: Annotated[AsyncSession, Depends(get_session)] = None,
):
"""카테고리 삭제 (leaf only, 문서 없는 경우만)"""
result = await session.execute(
select(LibraryCategory).where(LibraryCategory.path == path)
)
category = result.scalar_one_or_none()
if not category:
raise HTTPException(status_code=404, detail="분류를 찾을 수 없습니다")
if category.is_system:
raise HTTPException(status_code=422, detail="시스템 분류는 삭제할 수 없습니다")
# leaf 검사
children = await session.execute(
select(func.count()).where(
LibraryCategory.parent_path == category.path
)
)
if children.scalar() > 0:
raise HTTPException(
status_code=422, detail="하위 분류가 있어 삭제할 수 없습니다"
)
# 문서 연결 검사
tag = f"{LIBRARY_PREFIX}{category.path}"
doc_count = await session.execute(
sql_text("""
SELECT COUNT(*) FROM documents
WHERE deleted_at IS NULL
AND EXISTS (
SELECT 1 FROM jsonb_array_elements_text(
COALESCE(user_tags, '[]'::jsonb)
) AS t
WHERE t = :tag
)
""").bindparams(tag=tag)
)
if doc_count.scalar() > 0:
raise HTTPException(
status_code=422,
detail="이 분류에 속한 문서가 있어 삭제할 수 없습니다. 문서를 먼저 이동하세요.",
)
await session.delete(category)
await session.commit()
@router.get("/tree", response_model=list[CategoryTreeNode])
async def get_library_tree(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카테고리 저장소 + 문서 태그 count 머지 트리"""
# 1. 카테고리 전체 fetch
cat_result = await session.execute(
select(LibraryCategory).order_by(LibraryCategory.path)
)
categories = cat_result.scalars().all()
# path → category 매핑
cat_map: dict[str, LibraryCategory] = {c.path: c for c in categories}
# 2. 문서 태그에서 doc count 집계
doc_result = await session.execute(
select(Document.id, Document.user_tags).where(
Document.deleted_at == None, # noqa: E711
Document.user_tags != None, # noqa: E711
)
)
# path → set of doc_ids
path_docs: dict[str, set[int]] = {}
for doc_id, tags in doc_result:
if not tags:
continue
seen_ancestors: set[str] = set()
for tag in tags:
if not isinstance(tag, str) or not tag.startswith(LIBRARY_PREFIX):
continue
path = tag[len(LIBRARY_PREFIX):]
parts = path.split("/")
for i in range(1, len(parts) + 1):
ancestor = "/".join(parts[:i])
if ancestor not in seen_ancestors:
path_docs.setdefault(ancestor, set()).add(doc_id)
seen_ancestors.add(ancestor)
# 2.5 현재 사용자가 1+회독 한 doc_id 집합 (안 본 자료 = 전체 - 읽음)
from models.document_read import DocumentRead
read_result = await session.execute(
select(DocumentRead.document_id)
.where(DocumentRead.user_id == user.id)
.group_by(DocumentRead.document_id)
)
read_doc_ids: set[int] = {r[0] for r in read_result}
# 3. 모든 path 합산 (카테고리 + 태그)
all_paths = set(cat_map.keys()) | set(path_docs.keys())
# 4. 트리 구축
root: dict = {}
for p in sorted(all_paths):
parts = p.split("/")
node = root
for i, part in enumerate(parts):
if part not in node:
node[part] = {"_children": {}}
node = node[part]["_children"] if i < len(parts) - 1 else node[part]
def build_tree(d: dict, prefix: str = "") -> list[dict]:
nodes = []
for name, data in sorted(d.items()):
if name.startswith("_"):
continue
path = f"{prefix}/{name}" if prefix else name
children_dict = data.get("_children", {})
children = build_tree(children_dict, path)
cat = cat_map.get(path)
# path_docs[path] 는 이미 본 노드의 자손 doc 까지 누적되어 있음 (위 ancestor 누적 로직).
# 따라서 unread_count 도 하위 경로 전체 합산 (bottom-up 별도 계산 불필요).
docs_at_path = path_docs.get(path, set())
unread = len(docs_at_path - read_doc_ids)
nodes.append(CategoryTreeNode(
name=name,
path=path,
count=len(docs_at_path),
unread_count=unread,
is_category=path in cat_map,
is_system=cat.is_system if cat else False,
has_children=len(children) > 0,
children=children,
))
return nodes
return build_tree(root)
# ─── Facet API (Phase 2) ───
class FacetValueResponse(BaseModel):
facet_type: str
value: str
model_config = {"from_attributes": True}
class FacetCountItem(BaseModel):
value: str
count: int
class FacetCountsResponse(BaseModel):
company: list[FacetCountItem]
topic: list[FacetCountItem]
year: list[FacetCountItem]
doctype: list[FacetCountItem]
@router.get("/facets", response_model=dict[str, list[str]])
async def get_facet_values(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""facet 축별 허용값 사전 (year는 실제 데이터 기반)"""
result: dict[str, list[str]] = {}
for ft in FACET_TYPES:
rows = await session.execute(
select(FacetValue.value)
.where(FacetValue.facet_type == ft)
.order_by(FacetValue.value)
)
result[ft] = [r[0] for r in rows]
# year는 사전 없이 실제 문서 값에서 추출
year_rows = await session.execute(
select(Document.facet_year)
.where(
Document.deleted_at == None, # noqa: E711
Document.facet_year != None, # noqa: E711
)
.distinct()
.order_by(Document.facet_year.desc())
)
result["year"] = [str(r[0]) for r in year_rows]
return result
@router.post("/facets", response_model=FacetValueResponse, status_code=201)
async def add_facet_value(
body: FacetValueResponse,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""facet 사전에 새 값 추가"""
if body.facet_type not in FACET_TYPES:
raise HTTPException(status_code=400, detail=f"허용 facet: {', '.join(FACET_TYPES)}")
value = body.value.strip()
if not value:
raise HTTPException(status_code=400, detail="빈 값")
existing = await session.execute(
select(FacetValue).where(
FacetValue.facet_type == body.facet_type,
FacetValue.value == value,
)
)
if existing.scalar_one_or_none():
raise HTTPException(status_code=409, detail="이미 존재하는 값")
fv = FacetValue(facet_type=body.facet_type, value=value)
session.add(fv)
await session.commit()
return FacetValueResponse(facet_type=body.facet_type, value=value)
@router.get("/facet-counts", response_model=FacetCountsResponse)
async def get_facet_counts(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
library_path: str | None = None,
facet_company: str | None = None,
facet_topic: str | None = None,
facet_year: int | None = None,
facet_doctype: str | None = None,
q: str | None = None,
):
"""현재 필터 기준 facet별 집계 count"""
def base_query():
query = select(Document).where(
Document.deleted_at == None, # noqa: E711
Document.doc_purpose == "business",
)
if library_path:
exact = f"{LIBRARY_PREFIX}{library_path}"
prefix = f"{LIBRARY_PREFIX}{library_path}/%"
query = query.where(
sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(
COALESCE(documents.user_tags, '[]'::jsonb)
) AS t
WHERE t = :exact OR t LIKE :prefix
)
""").bindparams(exact=exact, prefix=prefix)
)
if q:
query = query.where(Document.title.ilike(f"%{q}%"))
return query
result = FacetCountsResponse(company=[], topic=[], year=[], doctype=[])
# company counts (다른 facet 필터 적용, 자기 자신 제외)
q_company = base_query()
if facet_topic:
q_company = q_company.where(Document.facet_topic == facet_topic)
if facet_year:
q_company = q_company.where(Document.facet_year == facet_year)
if facet_doctype:
q_company = q_company.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_company, func.count())
.where(Document.facet_company != None) # noqa: E711
.where(Document.id.in_(q_company.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_company)
.order_by(func.count().desc())
)
result.company = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
# topic counts
q_topic = base_query()
if facet_company:
q_topic = q_topic.where(Document.facet_company == facet_company)
if facet_year:
q_topic = q_topic.where(Document.facet_year == facet_year)
if facet_doctype:
q_topic = q_topic.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_topic, func.count())
.where(Document.facet_topic != None) # noqa: E711
.where(Document.id.in_(q_topic.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_topic)
.order_by(func.count().desc())
)
result.topic = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
# year counts
q_year = base_query()
if facet_company:
q_year = q_year.where(Document.facet_company == facet_company)
if facet_topic:
q_year = q_year.where(Document.facet_topic == facet_topic)
if facet_doctype:
q_year = q_year.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_year, func.count())
.where(Document.facet_year != None) # noqa: E711
.where(Document.id.in_(q_year.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_year)
.order_by(Document.facet_year.desc())
)
result.year = [FacetCountItem(value=str(r[0]), count=r[1]) for r in rows]
# doctype counts
q_doctype = base_query()
if facet_company:
q_doctype = q_doctype.where(Document.facet_company == facet_company)
if facet_topic:
q_doctype = q_doctype.where(Document.facet_topic == facet_topic)
if facet_year:
q_doctype = q_doctype.where(Document.facet_year == facet_year)
rows = await session.execute(
select(Document.facet_doctype, func.count())
.where(Document.facet_doctype != None) # noqa: E711
.where(Document.id.in_(q_doctype.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_doctype)
.order_by(func.count().desc())
)
result.doctype = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
return result