Files
hyungi_document_server/app/api/documents.py
T
Hyungi Ahn ba19c6fb79 feat(library): Phase 2A facet 탐색 기반 — 컬럼 + API + 필터
documents 테이블에 facet_company/topic/year/doctype 4개 축 추가.
facet_values 사전 테이블 + CRUD API.
facet-counts 집계 API (교차 필터링 지원).
문서 목록 API에 facet 필터 파라미터 추가.
DocumentResponse/DocumentUpdate 스키마에 facet 필드 포함.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 10:09:25 +09:00

631 lines
21 KiB
Python

"""문서 CRUD API"""
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated
from urllib.parse import quote
from fastapi import APIRouter, Depends, Form, HTTPException, Query, UploadFile, status
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.utils import file_hash
from models.document import Document
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
router = APIRouter()
# ─── 스키마 ───
class DocumentResponse(BaseModel):
    """Serialized view of a document returned by the document endpoints.

    Instances are built from ORM ``Document`` objects with
    ``DocumentResponse.model_validate(doc)``, which requires attribute-based
    population (``from_attributes``).
    """

    # Pydantic v2 configuration. Replaces the deprecated nested ``class Config``
    # while keeping the same behavior: populate fields from object attributes.
    model_config = {"from_attributes": True}

    # --- file identity / storage ---
    id: int
    file_path: str | None  # may be None; the file endpoints treat that as "no physical file"
    file_format: str
    file_size: int | None
    file_type: str
    title: str | None

    # --- AI-derived classification ---
    ai_domain: str | None  # "/"-separated path; parsed into a tree by the /tree endpoint
    ai_sub_group: str | None
    ai_tags: list | None
    ai_summary: str | None
    document_type: str | None
    importance: str | None
    ai_confidence: float | None

    # --- user-managed metadata ---
    user_note: str | None
    user_tags: list | None  # library membership is encoded as "@library/..." tags
    pinned: bool | None
    ask_includable: bool | None

    # --- conversion / preview state ---
    derived_path: str | None
    original_format: str | None
    conversion_status: str | None
    is_read: bool | None
    review_status: str | None
    edit_url: str | None
    preview_status: str | None

    # --- provenance ---
    source_channel: str | None
    data_origin: str | None
    doc_purpose: str | None  # the upload/patch endpoints accept "business" or "knowledge"

    # --- facet axes (optional, default to None) ---
    facet_company: str | None = None
    facet_topic: str | None = None
    facet_year: int | None = None
    facet_doctype: str | None = None

    # --- pipeline and row timestamps ---
    extracted_at: datetime | None
    ai_processed_at: datetime | None
    embedded_at: datetime | None
    created_at: datetime
    updated_at: datetime
class DocumentListResponse(BaseModel):
    """One page of documents plus the paging information used to fetch it."""

    items: list[DocumentResponse]  # documents on the requested page
    total: int  # number of rows matching the filters, before paging
    page: int  # 1-based page number echoed from the request
    page_size: int  # page size echoed from the request
class DocumentUpdate(BaseModel):
    """Request body for PATCH on a document: every field is optional.

    The PATCH handler dumps this model with ``exclude_unset=True``, so only
    the fields the client actually sent are written to the document.
    """

    # General metadata
    title: str | None = None
    ai_domain: str | None = None
    ai_sub_group: str | None = None
    ai_tags: list | None = None
    user_tags: list | None = None
    user_note: str | None = None
    is_read: bool | None = None
    edit_url: str | None = None
    source_channel: str | None = None
    data_origin: str | None = None
    doc_purpose: str | None = None
    pinned: bool | None = None

    # Facet axes
    facet_company: str | None = None
    facet_topic: str | None = None
    facet_year: int | None = None
    facet_doctype: str | None = None
# ─── 스키마 (트리) ───
class TreeNode(BaseModel):
    """Recursive node shape for the tree endpoints (name, full path, count, children)."""

    name: str  # last path segment
    path: str  # "/"-joined path from the root down to this node
    count: int  # number of documents under this node
    children: list["TreeNode"]  # forward reference: nodes nest recursively
# ─── 엔드포인트 ───
@router.get("/tree")
async def get_document_tree(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return the ``ai_domain`` hierarchy as a nested tree (sidebar data).

    Each distinct ``ai_domain`` value is split on "/" and folded into a tree.
    A node's ``count`` is the sum of the document counts of every domain path
    passing through it. Rows with a NULL/empty domain, the 'News' domain and
    soft-deleted rows are excluded by the query.

    Returns:
        A list of ``{"name", "path", "count", "children"}`` dicts, sorted by
        name at every level.
    """
    from sqlalchemy import text as sql_text

    rows = await session.execute(
        sql_text("""
            SELECT ai_domain, COUNT(*)
            FROM documents
            WHERE ai_domain IS NOT NULL AND ai_domain != '' AND ai_domain != 'News'
            AND deleted_at IS NULL
            GROUP BY ai_domain
            ORDER BY ai_domain
        """)
    )

    # Fold every "a/b/c" path into nested {"_count", "_children"} records.
    tree: dict = {}
    for domain_path, doc_count in rows:
        level = tree
        for segment in domain_path.split("/"):
            entry = level.setdefault(segment, {"_count": 0, "_children": {}})
            entry["_count"] += doc_count
            level = entry["_children"]

    def build_tree(d: dict, prefix: str = "") -> list[dict]:
        """Convert the intermediate dict into the response list, depth-first."""
        nodes = []
        for name in sorted(d):
            data = d[name]
            path = f"{prefix}/{name}" if prefix else name
            nodes.append(
                {
                    "name": name,
                    "path": path,
                    "count": data["_count"],
                    "children": build_tree(data["_children"], path),
                }
            )
        return nodes

    return build_tree(tree)
@router.get("/library-tree")
async def get_library_tree(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return the library folder tree derived from ``user_tags``.

    Every string tag starting with ``LIBRARY_PREFIX`` is treated as a
    "/"-separated folder path. A node's ``count`` is the number of distinct
    documents that carry a tag at or below that folder.

    Returns:
        A list of ``{"name", "path", "count", "children"}`` dicts, sorted by
        name at every level.
    """
    from core.library import LIBRARY_PREFIX

    rows = await session.execute(
        select(Document.id, Document.user_tags).where(
            Document.deleted_at == None,  # noqa: E711
            Document.user_tags != None,  # noqa: E711
        )
    )

    tree: dict = {}
    prefix_len = len(LIBRARY_PREFIX)
    for doc_id, tags in rows:
        if not tags:
            continue
        for tag in tags:
            if not (isinstance(tag, str) and tag.startswith(LIBRARY_PREFIX)):
                continue
            level = tree
            for segment in tag[prefix_len:].split("/"):
                entry = level.setdefault(segment, {"_docs": set(), "_children": {}})
                # A set keeps the count unique per document even when several
                # of its tags share this ancestor folder.
                entry["_docs"].add(doc_id)
                level = entry["_children"]

    def build_library_tree(d: dict, prefix: str = "") -> list[dict]:
        """Convert the intermediate dict into the response list, depth-first."""
        nodes = []
        for name in sorted(d):
            # Folder names beginning with "_" are left out of the output.
            if name.startswith("_"):
                continue
            data = d[name]
            path = f"{prefix}/{name}" if prefix else name
            nodes.append(
                {
                    "name": name,
                    "path": path,
                    "count": len(data["_docs"]),
                    "children": build_library_tree(data["_children"], path),
                }
            )
        return nodes

    return build_library_tree(tree)
def _escape_like(value: str, escape_char: str = "!") -> str:
    """Escape LIKE/ILIKE wildcards in *value* so it is matched literally.

    The escape character itself is escaped first, then ``%`` and ``_``. The
    result must be used in a pattern declared with ``ESCAPE '<escape_char>'``.
    """
    for special in (escape_char, "%", "_"):
        value = value.replace(special, escape_char + special)
    return value


@router.get("/library", response_model=DocumentListResponse)
async def list_library_documents(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    path: str | None = None,
    q: str | None = None,
    sort: str = Query("updated_desc"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    facet_company: str | None = None,
    facet_topic: str | None = None,
    facet_year: int | None = None,
    facet_doctype: str | None = None,
):
    """List library documents (folder prefix match, title search, facets, sort).

    Args:
        path: Library folder. Matches documents tagged with exactly this
            folder or any folder below it. When omitted, every document with
            at least one ``@library/`` tag is listed.
        q: Case-insensitive substring to look for in the title. ``%`` and
            ``_`` are matched literally.
        sort: One of ``updated_desc``, ``title_asc``, ``created_desc``;
            unknown values fall back to ``updated_desc``.
        page: 1-based page number.
        page_size: Rows per page (1-100).
        facet_company, facet_topic, facet_year, facet_doctype: Optional
            equality filters on the facet columns.

    Raises:
        HTTPException: 400 when ``path`` is rejected by
            ``normalize_library_path``.
    """
    from sqlalchemy import text as sql_text
    from core.library import LIBRARY_PREFIX, normalize_library_path

    # Normalize the path query parameter before using it.
    if path:
        try:
            path = normalize_library_path(path)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

    query = select(Document).where(
        Document.deleted_at == None,  # noqa: E711
    )

    if path:
        exact = f"{LIBRARY_PREFIX}{path}"
        # Escape wildcards so a folder such as "a_b" does not also match "axb".
        prefix = f"{_escape_like(exact)}/%"
        query = query.where(
            sql_text("""
                EXISTS (
                    SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
                    WHERE t = :exact OR t LIKE :prefix ESCAPE '!'
                )
            """).bindparams(exact=exact, prefix=prefix)
        )
    else:
        query = query.where(
            sql_text("""
                EXISTS (
                    SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
                    WHERE t LIKE '@library/%'
                )
            """)
        )

    if q:
        # User text is matched literally; only the surrounding % are wildcards.
        query = query.where(Document.title.ilike(f"%{_escape_like(q)}%", escape="!"))

    # Facet filters
    if facet_company:
        query = query.where(Document.facet_company == facet_company)
    if facet_topic:
        query = query.where(Document.facet_topic == facet_topic)
    if facet_year is not None:  # an explicit 0 is still a filter value
        query = query.where(Document.facet_year == facet_year)
    if facet_doctype:
        query = query.where(Document.facet_doctype == facet_doctype)

    # Total row count before paging
    count_query = select(func.count()).select_from(query.subquery())
    total = (await session.execute(count_query)).scalar()

    # Sorting; the id tiebreaker keeps page boundaries stable when the primary
    # sort key has duplicates.
    sort_map = {
        "updated_desc": Document.updated_at.desc(),
        "title_asc": Document.title.asc(),
        "created_desc": Document.created_at.desc(),
    }
    primary_order = sort_map.get(sort, Document.updated_at.desc())
    query = query.order_by(primary_order, Document.id.desc())
    query = query.offset((page - 1) * page_size).limit(page_size)

    result = await session.execute(query)
    items = result.scalars().all()

    return DocumentListResponse(
        items=[DocumentResponse.model_validate(doc) for doc in items],
        total=total,
        page=page,
        page_size=page_size,
    )
@router.get("/", response_model=DocumentListResponse)
async def list_documents(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=500),
    domain: str | None = None,
    sub_group: str | None = None,
    source: str | None = None,
    format: str | None = None,
    review_status: str | None = Query(None, description="pending | approved | rejected"),
):
    """List documents with pagination and filters (news and notes excluded).

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-500).
        domain: ``ai_domain`` prefix; selecting a parent domain includes all
            of its sub-paths. ``%`` and ``_`` are matched literally.
        sub_group: Exact ``ai_sub_group`` value.
        source: Exact ``source_channel`` value.
        format: Exact ``file_format`` value.
        review_status: Exact ``review_status`` value.

    Returns:
        A page of documents ordered by creation time, newest first.
    """
    query = select(Document).where(
        Document.deleted_at == None,  # noqa: E711
        Document.source_channel != "news",
        Document.file_type != "note",
    )

    if domain:
        # Prefix match: selecting e.g. Industrial_Safety includes everything
        # below it. autoescape stops "_" in the name acting as a wildcard.
        query = query.where(Document.ai_domain.startswith(domain, autoescape=True))
    if sub_group:
        # This parameter was previously accepted but silently ignored.
        query = query.where(Document.ai_sub_group == sub_group)
    if source:
        query = query.where(Document.source_channel == source)
    if format:
        query = query.where(Document.file_format == format)
    if review_status:
        query = query.where(Document.review_status == review_status)

    # Total row count before paging
    count_query = select(func.count()).select_from(query.subquery())
    total = (await session.execute(count_query)).scalar()

    # Pagination; the id tiebreaker keeps page boundaries stable for rows that
    # share a created_at value.
    query = query.order_by(Document.created_at.desc(), Document.id.desc())
    query = query.offset((page - 1) * page_size).limit(page_size)

    result = await session.execute(query)
    items = result.scalars().all()

    return DocumentListResponse(
        items=[DocumentResponse.model_validate(doc) for doc in items],
        total=total,
        page=page,
        page_size=page_size,
    )
@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Fetch a single document by id.

    Raises:
        HTTPException: 404 when the id is unknown or the document has been
            soft-deleted.
    """
    found = await session.get(Document, doc_id)
    is_visible = bool(found) and found.deleted_at is None
    if not is_visible:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
    return DocumentResponse.model_validate(found)
@router.get("/{doc_id}/file")
async def get_document_file(
    doc_id: int,
    session: Annotated[AsyncSession, Depends(get_session)],
    token: str | None = Query(None, description="Bearer token (iframe용)"),
    download: bool = Query(False, description="true면 attachment (브라우저 다운로드)"),
    user: User | None = Depends(lambda: None),
):
    """Serve a document's original file, authenticated by the ``?token=`` query.

    Only the query-parameter token is checked here (so the URL can be used in
    an iframe). No Authorization-header fallback is implemented, and the
    ``user`` dependency always resolves to None and is not used.

    Args:
        doc_id: Document id.
        token: Access token; must decode to a payload with ``type == "access"``.
        download: When true, respond with ``Content-Disposition: attachment``
            and a filename derived from the title; otherwise ``inline``.

    Raises:
        HTTPException: 401 when the token is missing or invalid; 404 when the
            document is unknown or soft-deleted, has no file path, or the file
            does not exist on disk.
    """
    from core.auth import decode_token

    # Validate the query-parameter token.
    if not token:
        raise HTTPException(status_code=401, detail="토큰이 필요합니다")
    payload = decode_token(token)
    if not payload or payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="유효하지 않은 토큰")

    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are treated as missing, consistent with the
    # single-document GET endpoint.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    # Notes (memos) have no physical file.
    if not doc.file_path:
        raise HTTPException(status_code=404, detail="파일이 없는 문서입니다 (메모)")

    file_path = Path(settings.nas_mount_path) / doc.file_path
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")

    # Media type lookup by file extension
    media_types = {
        ".pdf": "application/pdf",
        ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
        ".png": "image/png", ".gif": "image/gif",
        ".bmp": "image/bmp", ".tiff": "image/tiff",
        ".svg": "image/svg+xml",
        ".txt": "text/plain", ".md": "text/plain",
        ".html": "text/html", ".csv": "text/csv",
        ".json": "application/json", ".xml": "application/xml",
    }
    suffix = file_path.suffix.lower()
    media_type = media_types.get(suffix, "application/octet-stream")

    # Content-Disposition: attachment with both an ASCII fallback name and an
    # RFC 5987 ``filename*`` so non-ASCII (e.g. Korean) titles survive.
    if download:
        raw_title = doc.title or f"document-{doc_id}"
        full_name = f"{raw_title}{suffix}"
        ascii_name = full_name.encode("ascii", "replace").decode()
        # Quotes, backslashes and control characters would break the quoted
        # ``filename="..."`` parameter, so neutralize them.
        ascii_name = "".join(
            ch if ch.isprintable() and ch not in '"\\' else "_" for ch in ascii_name
        )
        utf8_encoded = quote(full_name)
        disposition = f"attachment; filename=\"{ascii_name}\"; filename*=UTF-8''{utf8_encoded}"
    else:
        disposition = "inline"

    return FileResponse(
        path=str(file_path),
        media_type=media_type,
        headers={"Content-Disposition": disposition},
    )
@router.post("/", response_model=DocumentResponse, status_code=201)
async def upload_document(
    file: UploadFile,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    doc_purpose: str | None = Form(None, description="business | knowledge"),
    library_path: str | None = Form(None, description="자료실 경로 (자동 @library/ 태깅)"),
):
    """Upload a file: save it to the Inbox, register it in the DB, enqueue it.

    Args:
        file: The uploaded file; its base name is used as the stored name.
        doc_purpose: Optional "business" or "knowledge" (case-insensitive,
            surrounding whitespace ignored; an empty string means unset).
        library_path: Optional library folder; when given, the document is
            tagged with ``LIBRARY_PREFIX`` + the normalized path.

    Returns:
        The created document (HTTP 201).

    Raises:
        HTTPException: 400 for an invalid ``doc_purpose``, an invalid
            ``library_path``, a missing or hidden/empty file name, or a
            target path that escapes the Inbox directory.
    """
    from core.library import DEFAULT_LIBRARY_PATH, LIBRARY_PREFIX, normalize_library_path

    # Validate doc_purpose
    if doc_purpose is not None:
        doc_purpose = doc_purpose.strip().lower()
        if doc_purpose == "":
            doc_purpose = None
        elif doc_purpose not in ("business", "knowledge"):
            raise HTTPException(status_code=400, detail="doc_purpose는 business 또는 knowledge만 가능")

    # Validate and normalize library_path
    library_tag = None
    if library_path:
        try:
            normalized = normalize_library_path(library_path)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=f"잘못된 자료실 경로: {e}") from e
        library_tag = f"{LIBRARY_PREFIX}{normalized}"

    # A "business" upload without an explicit folder gets the default folder tag.
    if doc_purpose == "business" and not library_tag:
        library_tag = f"{LIBRARY_PREFIX}{DEFAULT_LIBRARY_PATH}"

    if not file.filename:
        raise HTTPException(status_code=400, detail="파일명이 필요합니다")

    # Keep only the base name so a client-supplied path cannot escape the Inbox.
    safe_name = Path(file.filename).name
    if not safe_name or safe_name.startswith("."):
        raise HTTPException(status_code=400, detail="유효하지 않은 파일명")

    # Resolve the NAS root once so that the resolved target below can always be
    # expressed relative to it, even if the configured mount path contains
    # symlinks or relative components.
    nas_root = Path(settings.nas_mount_path).resolve()
    inbox_dir = nas_root / "PKM" / "Inbox"
    inbox_dir.mkdir(parents=True, exist_ok=True)
    inbox_root = inbox_dir.resolve()
    target = (inbox_root / safe_name).resolve()

    # The target must stay inside the Inbox. A path-aware check avoids the
    # false positives of a plain string-prefix comparison.
    if not target.is_relative_to(inbox_root):
        raise HTTPException(status_code=400, detail="잘못된 파일 경로")

    # Avoid overwriting an existing file: append _1, _2, ... to the stem.
    counter = 1
    stem, suffix = target.stem, target.suffix
    while target.exists():
        target = inbox_root / f"{stem}_{counter}{suffix}"
        counter += 1

    content = await file.read()
    target.write_bytes(content)

    # Stored path is relative to the NAS root.
    rel_path = str(target.relative_to(nas_root))
    fhash = file_hash(target)
    ext = target.suffix.lstrip(".").lower() or "unknown"

    # Register in the DB
    doc = Document(
        file_path=rel_path,
        file_hash=fhash,
        file_format=ext,
        file_size=len(content),
        file_type="immutable",
        title=target.stem,
        source_channel="manual",
        doc_purpose=doc_purpose,
        user_tags=[library_tag] if library_tag else [],
    )
    session.add(doc)
    await session.flush()  # assigns doc.id, which the queue entry needs

    # Enqueue the first processing stage
    await enqueue_stage(session, doc.id, "extract")
    await session.commit()

    return DocumentResponse.model_validate(doc)
@router.patch("/{doc_id}", response_model=DocumentResponse)
async def update_document(
    doc_id: int,
    body: DocumentUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Update document metadata (manual override).

    Only the fields present in the request body are written; ``updated_at``
    is always refreshed.

    Raises:
        HTTPException: 404 when the document is unknown or soft-deleted;
            400 when ``user_tags`` fails validation or ``doc_purpose`` is
            neither "business", "knowledge" nor null.
    """
    from core.library import validate_user_tags

    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are treated as missing, consistent with the
    # single-document GET endpoint.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    update_data = body.model_dump(exclude_unset=True)

    # Validate/normalize user_tags through the library helper.
    if update_data.get("user_tags") is not None:
        try:
            update_data["user_tags"] = validate_user_tags(update_data["user_tags"])
        except (TypeError, ValueError) as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

    # Validate doc_purpose
    if "doc_purpose" in update_data:
        val = update_data["doc_purpose"]
        if val is not None and val not in ("business", "knowledge"):
            raise HTTPException(status_code=400, detail="doc_purpose는 business 또는 knowledge만 가능")

    for field, value in update_data.items():
        setattr(doc, field, value)

    doc.updated_at = datetime.now(timezone.utc)
    await session.commit()

    return DocumentResponse.model_validate(doc)
@router.put("/{doc_id}/content")
async def save_document_content(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    body: dict | None = None,
):
    """Overwrite a Markdown/text document's file and refresh its metadata.

    The request body is a JSON object whose ``content`` key holds the new
    text; a missing body or key is treated as empty content. After writing
    the file, ``file_size``, ``file_hash``, ``extracted_text`` (first 15000
    characters) and ``updated_at`` are updated.

    Raises:
        HTTPException: 404 when the document is unknown or soft-deleted;
            400 when the format is not md/txt, the document has no file
            path, or ``content`` is not a string.
    """
    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are treated as missing, consistent with the
    # single-document GET endpoint.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    if doc.file_format not in ("md", "txt"):
        raise HTTPException(status_code=400, detail="편집 가능한 포맷이 아닙니다 (md, txt만 가능)")

    # Notes (memos) have no file; they are edited through /api/memos instead.
    if not doc.file_path:
        raise HTTPException(status_code=400, detail="파일이 없는 문서입니다. 메모는 /api/memos 사용")

    content = body.get("content", "") if body else ""
    # Reject non-text payloads up front instead of failing inside write_text.
    if not isinstance(content, str):
        raise HTTPException(status_code=400, detail="content는 문자열이어야 합니다")

    file_path = Path(settings.nas_mount_path) / doc.file_path
    file_path.write_text(content, encoding="utf-8")

    # Refresh metadata derived from the file contents.
    doc.file_size = len(content.encode("utf-8"))
    doc.file_hash = file_hash(file_path)
    doc.extracted_text = content[:15000]
    doc.updated_at = datetime.now(timezone.utc)
    await session.commit()

    return DocumentResponse.model_validate(doc)
@router.get("/{doc_id}/preview")
async def get_document_preview(
    doc_id: int,
    session: Annotated[AsyncSession, Depends(get_session)],
    token: str | None = Query(None, description="Bearer token (iframe용)"),
    download: bool = Query(False, description="true면 attachment (PDF 다운로드)"),
):
    """Serve the cached PDF preview of a document.

    Authentication uses the ``?token=`` query parameter only. The preview is
    read from ``PKM/.preview/<doc_id>.pdf`` under the NAS mount path.

    Args:
        doc_id: Document id.
        token: Access token; must decode to a payload with ``type == "access"``.
        download: When true, respond with ``Content-Disposition: attachment``
            and a ``<title>.pdf`` filename; otherwise ``inline``.

    Raises:
        HTTPException: 401 when the token is missing or invalid; 404 when the
            document is unknown or soft-deleted, or the preview file does not
            exist yet.
    """
    from core.auth import decode_token

    if not token:
        raise HTTPException(status_code=401, detail="토큰이 필요합니다")
    payload = decode_token(token)
    if not payload or payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="유효하지 않은 토큰")

    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are treated as missing, consistent with the
    # single-document GET endpoint.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    preview_path = Path(settings.nas_mount_path) / "PKM" / ".preview" / f"{doc_id}.pdf"
    if not preview_path.exists():
        raise HTTPException(status_code=404, detail="미리보기가 아직 생성되지 않았습니다")

    if download:
        raw_title = doc.title or f"document-{doc_id}"
        full_name = f"{raw_title}.pdf"
        ascii_name = full_name.encode("ascii", "replace").decode()
        # Quotes, backslashes and control characters would break the quoted
        # ``filename="..."`` parameter, so neutralize them.
        ascii_name = "".join(
            ch if ch.isprintable() and ch not in '"\\' else "_" for ch in ascii_name
        )
        utf8_encoded = quote(full_name)
        disposition = f"attachment; filename=\"{ascii_name}\"; filename*=UTF-8''{utf8_encoded}"
    else:
        disposition = "inline"

    return FileResponse(
        path=str(preview_path),
        media_type="application/pdf",
        headers={"Content-Disposition": disposition},
    )
@router.delete("/{doc_id}")
async def delete_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    delete_file: bool = Query(False, description="NAS 파일도 함께 삭제"),
):
    """Soft-delete a document by stamping its ``deleted_at`` column.

    The row and the physical file are both kept. ``delete_file`` is accepted
    but not acted on in this handler; physical files are meant to be removed
    later by a cleanup job.

    Raises:
        HTTPException: 404 when no document has this id.
    """
    target = await session.get(Document, doc_id)
    if not target:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    # Soft delete only: mark the row, leave the file on disk.
    deleted_stamp = datetime.now(timezone.utc)
    target.deleted_at = deleted_stamp
    await session.commit()

    return {"message": f"문서 {doc_id} soft-delete 완료"}