7d2e678ea1
기존 `await file.read()` 는 임의 크기의 파일을 메모리에 전부 적재한 뒤 저장해서
디스크 고갈 / OOM 공격 벡터였음. Caddy/home-caddy 프록시 한도에만 의존했고
FastAPI 측 policy enforcement 는 전무했음. 이 커밋으로 서버가 authoritative
하게 한도를 강제 집행함.
변경:
- `Request` DI 추가 → Content-Length 사전 차단 (max_bytes * slack_ratio 초과 시 413)
- `await file.read()` → 청크 루프 스트리밍 (stream_chunk_bytes 단위)
- 누적 size > max_bytes 시 스트리밍 중 413 (Content-Length 위조 방어)
- 0바이트 파일 → 400 reject (정책: 유의미한 문서 ingest 대상 아님)
- 파일 저장 완료 + close 이후에만 file_hash 및 DB 레코드 생성
- Document 레코드와 processing_queue 는 단일 트랜잭션으로 묶고,
  DB 예외 시 session rollback + partial file unlink 로 원자적 정리
- 예외 시 `except Exception` 으로 cleanup (BaseException 계열은 의도적으로 패스)
설정 값: config.yaml `upload.{max_bytes, content_length_slack_ratio, stream_chunk_bytes}`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
983 lines
34 KiB
Python
983 lines
34 KiB
Python
"""문서 CRUD API"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import shutil
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Annotated, Literal
|
|
from urllib.parse import quote
|
|
|
|
from fastapi import (
|
|
APIRouter,
|
|
BackgroundTasks,
|
|
Depends,
|
|
Form,
|
|
Header,
|
|
HTTPException,
|
|
Query,
|
|
Request,
|
|
UploadFile,
|
|
status,
|
|
)
|
|
from fastapi.responses import FileResponse
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from ai.client import AIClient, _load_prompt, parse_json_response
|
|
from core.auth import get_current_user
|
|
from core.config import settings
|
|
from core.database import get_session
|
|
from core.utils import file_hash
|
|
from models.document import Document
|
|
from models.queue import ProcessingQueue, enqueue_stage
|
|
from models.user import User
|
|
from services.document_telemetry import record_analyze_event, sanitize_source
|
|
from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model
|
|
from services.search.llm_gate import get_mlx_gate
|
|
|
|
# Router for the document endpoints; every handler below registers on it via
# its @router.<method>(...) decorator.
router = APIRouter()
|
|
|
|
|
|
# ─── 스키마 ───
|
|
|
|
|
|
class DocumentResponse(BaseModel):
    """API representation of a ``Document`` row.

    Instances are built straight from ORM objects with
    ``DocumentResponse.model_validate(doc)``, which requires ``from_attributes``.
    """

    id: int
    file_path: str | None  # None for notes (memos), which have no physical file
    file_format: str
    file_size: int | None
    file_type: str
    title: str | None
    ai_domain: str | None
    ai_sub_group: str | None
    ai_tags: list | None
    ai_summary: str | None
    document_type: str | None
    importance: str | None
    ai_confidence: float | None
    user_note: str | None
    user_tags: list | None
    pinned: bool | None
    ask_includable: bool | None
    derived_path: str | None
    original_format: str | None
    conversion_status: str | None
    is_read: bool | None
    review_status: str | None
    edit_url: str | None
    preview_status: str | None
    source_channel: str | None
    data_origin: str | None
    doc_purpose: str | None
    facet_company: str | None = None
    facet_topic: str | None = None
    facet_year: int | None = None
    facet_doctype: str | None = None
    extracted_at: datetime | None
    ai_processed_at: datetime | None
    embedded_at: datetime | None
    created_at: datetime
    updated_at: datetime

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2 (the module already uses the v2 API ``model_validate``). A plain dict
    # is accepted here because ``ConfigDict`` is a TypedDict, so no new import.
    model_config = {"from_attributes": True}
|
|
|
|
|
|
class DocumentListResponse(BaseModel):
    """One page of documents plus the total number of matches."""

    items: list[DocumentResponse]  # documents on the requested page
    total: int  # rows matching the filters, counted before offset/limit
    page: int  # 1-based page number echoed from the request
    page_size: int  # requested page size echoed from the request
|
|
|
|
|
|
class DocumentUpdate(BaseModel):
    """Partial-update body for ``PATCH /{doc_id}``.

    Every field defaults to None. The handler applies
    ``model_dump(exclude_unset=True)``, so only the fields the client actually
    sent are written; sending an explicit null sets that column to None.
    """

    title: str | None = None
    ai_domain: str | None = None
    ai_sub_group: str | None = None
    ai_tags: list | None = None
    user_tags: list | None = None  # validated/normalized by the PATCH handler
    user_note: str | None = None
    is_read: bool | None = None
    edit_url: str | None = None
    source_channel: str | None = None
    data_origin: str | None = None
    doc_purpose: str | None = None  # checked against "business" / "knowledge" by the PATCH handler
    pinned: bool | None = None
    facet_company: str | None = None
    facet_topic: str | None = None
    facet_year: int | None = None
    facet_doctype: str | None = None
|
|
|
|
|
|
# ─── 스키마 (트리) ───
|
|
|
|
|
|
class TreeNode(BaseModel):
    """Sidebar tree node: segment name, full slash-joined path, count, children.

    NOTE(review): the /tree and /library-tree handlers in this module build
    plain dicts with these same keys and never reference this model; confirm
    whether it is used elsewhere before relying on it.
    """

    name: str  # last path segment
    path: str  # full path from the root, segments joined with "/"
    count: int
    children: list["TreeNode"]
|
|
|
|
|
|
# ─── 엔드포인트 ───
|
|
|
|
|
|
@router.get("/tree")
async def get_document_tree(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return the sidebar domain tree parsed from slash-separated ``ai_domain`` values.

    Each node's ``count`` is the number of live (not soft-deleted) documents
    whose domain equals the node path or lies beneath it. NULL, empty, and
    exactly-'News' domains are excluded. Siblings are sorted by name.
    """
    from sqlalchemy import text as sql_text

    rows = await session.execute(
        sql_text("""
            SELECT ai_domain, COUNT(*)
            FROM documents
            WHERE ai_domain IS NOT NULL AND ai_domain != '' AND ai_domain != 'News'
              AND deleted_at IS NULL
            GROUP BY ai_domain
            ORDER BY ai_domain
        """)
    )

    # Trie of {segment: {"_count": int, "_children": {...}}}. Every ancestor of a
    # domain path accumulates that path's document count.
    trie: dict = {}
    for domain_path, doc_count in rows:
        level = trie
        for segment in domain_path.split("/"):
            entry = level.setdefault(segment, {"_count": 0, "_children": {}})
            entry["_count"] += doc_count
            level = entry["_children"]

    def to_nodes(level: dict, parent_path: str = "") -> list[dict]:
        # Depth-first conversion of one trie level into response dicts.
        out: list[dict] = []
        for segment in sorted(level):
            entry = level[segment]
            node_path = segment if not parent_path else f"{parent_path}/{segment}"
            out.append({
                "name": segment,
                "path": node_path,
                "count": entry["_count"],
                "children": to_nodes(entry["_children"], node_path),
            })
        return out

    return to_nodes(trie)
|
|
|
|
|
|
@router.get("/library-tree")
async def get_library_tree(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return the library folder tree built from ``@library/...`` entries in ``user_tags``.

    ``count`` is the number of distinct live documents tagged at or below the
    node, so a document filed in two subfolders of the same parent counts once
    for that parent. Non-string tags and tags without the library prefix are
    ignored. Segments whose name starts with "_" are omitted from the output
    together with their subtrees.
    NOTE(review): at that level the keys can only be user folder names, so this
    filter hides real folders such as "_archive"; confirm that is intended.
    """
    from core.library import LIBRARY_PREFIX

    rows = await session.execute(
        select(Document.id, Document.user_tags).where(
            Document.deleted_at == None,  # noqa: E711
            Document.user_tags != None,  # noqa: E711
        )
    )

    trie: dict = {}
    for doc_id, tags in rows:
        for tag in tags or ():
            if not (isinstance(tag, str) and tag.startswith(LIBRARY_PREFIX)):
                continue
            level = trie
            for segment in tag[len(LIBRARY_PREFIX):].split("/"):
                entry = level.setdefault(segment, {"_docs": set(), "_children": {}})
                # "_docs" is a set, so a document reaching this node through
                # several of its tags is still counted once.
                entry["_docs"].add(doc_id)
                level = entry["_children"]

    def to_nodes(level: dict, parent_path: str = "") -> list[dict]:
        # Depth-first conversion of one trie level into response dicts.
        out: list[dict] = []
        for segment in sorted(level):
            if segment.startswith("_"):
                continue
            entry = level[segment]
            node_path = segment if not parent_path else f"{parent_path}/{segment}"
            out.append({
                "name": segment,
                "path": node_path,
                "count": len(entry["_docs"]),
                "children": to_nodes(entry["_children"], node_path),
            })
        return out

    return to_nodes(trie)
|
|
|
|
|
|
def _escape_like_pattern(value: str) -> str:
    """Escape LIKE/ILIKE metacharacters so *value* is matched literally.

    Backslash is PostgreSQL's default LIKE escape character, so no ESCAPE
    clause is needed (this module already relies on PostgreSQL, e.g. via
    ``jsonb_array_elements_text``).
    """
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


@router.get("/library", response_model=DocumentListResponse)
async def list_library_documents(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    path: str | None = None,
    q: str | None = None,
    sort: str = Query("updated_desc"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    facet_company: str | None = None,
    facet_topic: str | None = None,
    facet_year: int | None = None,
    facet_doctype: str | None = None,
):
    """List live documents filed in the library (``@library/...`` user tags).

    - ``path``: normalized with the same rules as PATCH; matches the folder
      itself and every folder below it. Without ``path``, any document that has
      at least one library tag is listed.
    - ``q``: case-insensitive substring match on the title. ``%`` and ``_`` are
      matched literally, not as wildcards.
    - ``facet_*``: exact-match filters.
    - ``sort``: ``updated_desc`` (default, also used for unknown values),
      ``title_asc`` or ``created_desc``. Ties are broken by id so that
      pagination is stable.

    Raises:
        HTTPException: 400 if ``path`` fails normalization.
    """
    from sqlalchemy import text as sql_text

    from core.library import LIBRARY_PREFIX, normalize_library_path

    # Normalize the path query (same semantics as PATCH).
    if path:
        try:
            path = normalize_library_path(path)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

    query = select(Document).where(
        Document.deleted_at == None,  # noqa: E711
    )

    escaped_prefix = _escape_like_pattern(LIBRARY_PREFIX)
    if path:
        # Exact folder tag, or any tag below it. The path is escaped so "_"/"%"
        # in folder names do not match unrelated folders.
        query = query.where(
            sql_text("""
                EXISTS (
                    SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
                    WHERE t = :exact OR t LIKE :prefix
                )
            """).bindparams(
                exact=f"{LIBRARY_PREFIX}{path}",
                prefix=f"{escaped_prefix}{_escape_like_pattern(path)}/%",
            )
        )
    else:
        # Any library tag at all; uses LIBRARY_PREFIX instead of a hard-coded literal.
        query = query.where(
            sql_text("""
                EXISTS (
                    SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
                    WHERE t LIKE :prefix
                )
            """).bindparams(prefix=f"{escaped_prefix}%")
        )

    if q:
        query = query.where(Document.title.ilike(f"%{_escape_like_pattern(q)}%"))

    # Facet filters
    if facet_company:
        query = query.where(Document.facet_company == facet_company)
    if facet_topic:
        query = query.where(Document.facet_topic == facet_topic)
    if facet_year:
        query = query.where(Document.facet_year == facet_year)
    if facet_doctype:
        query = query.where(Document.facet_doctype == facet_doctype)

    # Total before pagination
    count_query = select(func.count()).select_from(query.subquery())
    total = (await session.execute(count_query)).scalar()

    # Ordering + pagination
    sort_map = {
        "updated_desc": Document.updated_at.desc(),
        "title_asc": Document.title.asc(),
        "created_desc": Document.created_at.desc(),
    }
    primary_order = sort_map.get(sort, Document.updated_at.desc())
    # id tie-breaker: without it, rows sharing the sort key can repeat or be
    # skipped across pages.
    query = query.order_by(primary_order, Document.id.desc())
    query = query.offset((page - 1) * page_size).limit(page_size)
    result = await session.execute(query)
    items = result.scalars().all()

    return DocumentListResponse(
        items=[DocumentResponse.model_validate(doc) for doc in items],
        total=total,
        page=page,
        page_size=page_size,
    )
|
|
|
|
|
|
@router.get("/", response_model=DocumentListResponse)
async def list_documents(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=500),
    domain: str | None = None,
    sub_group: str | None = None,
    source: str | None = None,
    format: str | None = None,
    review_status: str | None = Query(None, description="pending | approved | rejected"),
):
    """List live documents, newest first, excluding news items and notes (memos).

    Optional filters, combined with AND:
    - ``domain``: the domain and all of its sub-domains. "A" matches "A" and
      "A/...", but not "AB". ``%`` and ``_`` in the value are matched literally.
    - ``sub_group``: exact ``ai_sub_group`` match (previously accepted but ignored).
    - ``source``, ``format``, ``review_status``: exact matches.
    Ties on ``created_at`` are broken by id so that pagination is stable.
    """
    from sqlalchemy import or_

    # NOTE(review): `source_channel != "news"` is NULL-unknown in SQL, so rows
    # with a NULL source_channel are excluded as well; confirm that is intended.
    query = select(Document).where(
        Document.deleted_at == None,  # noqa: E711
        Document.source_channel != "news",
        Document.file_type != "note",
    )

    if domain:
        base = domain.rstrip("/")
        if base:
            # Clicking a node (e.g. Industrial_Safety) includes everything below it.
            # autoescape keeps "_" / "%" literal in the LIKE pattern.
            query = query.where(
                or_(
                    Document.ai_domain == base,
                    Document.ai_domain.startswith(f"{base}/", autoescape=True),
                )
            )
        else:
            # A value made only of "/" has no hierarchy; keep plain prefix matching.
            query = query.where(Document.ai_domain.startswith(domain, autoescape=True))
    if sub_group:
        query = query.where(Document.ai_sub_group == sub_group)
    if source:
        query = query.where(Document.source_channel == source)
    if format:
        query = query.where(Document.file_format == format)
    if review_status:
        query = query.where(Document.review_status == review_status)

    # Total before pagination
    count_query = select(func.count()).select_from(query.subquery())
    total = (await session.execute(count_query)).scalar()

    # Pagination (id tie-breaker keeps page boundaries deterministic)
    query = query.order_by(Document.created_at.desc(), Document.id.desc())
    query = query.offset((page - 1) * page_size).limit(page_size)
    result = await session.execute(query)
    items = result.scalars().all()

    return DocumentListResponse(
        items=[DocumentResponse.model_validate(doc) for doc in items],
        total=total,
        page=page,
        page_size=page_size,
    )
|
|
|
|
|
|
@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return a single document.

    Raises:
        HTTPException: 404 when the id does not exist or the document is soft-deleted.
    """
    found = await session.get(Document, doc_id)
    is_gone = not found or found.deleted_at is not None
    if is_gone:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
    return DocumentResponse.model_validate(found)
|
|
|
|
|
|
@router.get("/{doc_id}/file")
async def get_document_file(
    doc_id: int,
    session: Annotated[AsyncSession, Depends(get_session)],
    token: str | None = Query(None, description="Bearer token (iframe용)"),
    download: bool = Query(False, description="true면 attachment (브라우저 다운로드)"),
    user: User | None = Depends(lambda: None),
):
    """Serve a document's original file.

    Authentication uses only the ``?token=`` query parameter (so the URL can be
    embedded in an iframe). An Authorization header is not consulted, and a
    request without a token gets 401. ``user`` is a placeholder dependency that
    always resolves to None; it is kept for signature compatibility.

    ``download=true`` sends the file as an attachment named after the document
    title. Otherwise it is served inline. HTML/SVG responses carry a CSP
    sandbox so that uploaded markup cannot run script on the app origin.

    Raises:
        HTTPException: 401 (missing/invalid token), 404 (document missing or
            soft-deleted, note without a file, or file absent on disk).
    """
    from core.auth import decode_token

    # Query-parameter token check
    if token:
        payload = decode_token(token)
        if not payload or payload.get("type") != "access":
            raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
    else:
        raise HTTPException(status_code=401, detail="토큰이 필요합니다")

    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are hidden here too, matching GET /{doc_id}.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    # Notes (memos) have no physical file.
    if not doc.file_path:
        raise HTTPException(status_code=404, detail="파일이 없는 문서입니다 (메모)")

    file_path = Path(settings.nas_mount_path) / doc.file_path
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")

    # Media type by extension
    media_types = {
        ".pdf": "application/pdf",
        ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
        ".png": "image/png", ".gif": "image/gif",
        ".bmp": "image/bmp", ".tiff": "image/tiff",
        ".svg": "image/svg+xml",
        ".txt": "text/plain", ".md": "text/plain",
        ".html": "text/html", ".csv": "text/csv",
        ".json": "application/json", ".xml": "application/xml",
    }
    suffix = file_path.suffix.lower()
    media_type = media_types.get(suffix, "application/octet-stream")

    # Content-Disposition: attachment when download=true (RFC 6266 filename* for non-ASCII)
    if download:
        raw_title = doc.title or f"document-{doc_id}"
        filename = f"{raw_title}{suffix}"
        # ASCII fallback: non-ASCII becomes "?", and quotes, backslashes and
        # control characters (CR/LF would break the header line) become "_".
        ascii_fallback = "".join(
            "_" if ch in '"\\' or not ch.isprintable() else ch
            for ch in filename.encode("ascii", "replace").decode()
        )
        # RFC 5987 ext-value: percent-encode everything outside the unreserved set, "/" included.
        utf8_encoded = quote(filename, safe="")
        disposition = f'attachment; filename="{ascii_fallback}"; filename*=UTF-8\'\'{utf8_encoded}'
    else:
        disposition = "inline"

    headers = {
        "Content-Disposition": disposition,
        "X-Content-Type-Options": "nosniff",
    }
    if suffix in (".html", ".svg"):
        # User-uploaded markup rendered on this origin could execute script.
        # A CSP sandbox gives it an opaque origin with scripts disabled.
        headers["Content-Security-Policy"] = "sandbox"

    return FileResponse(
        path=str(file_path),
        media_type=media_type,
        headers=headers,
    )
|
|
|
|
|
|
@router.post("/", response_model=DocumentResponse, status_code=201)
async def upload_document(
    request: Request,
    file: UploadFile,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    doc_purpose: str | None = Form(None, description="business | knowledge"),
    library_path: str | None = Form(None, description="자료실 경로 (자동 @library/ 태깅)"),
    facet_company: str | None = Form(None),
    facet_topic: str | None = Form(None),
    facet_year: int | None = Form(None),
    facet_doctype: str | None = Form(None),
):
    """Upload a file: save it to the Inbox, create the Document row, enqueue extraction.

    Size limit: ``settings.upload.max_bytes`` is authoritative.

    - A Content-Length above ``max_bytes * content_length_slack_ratio`` (headroom
      for multipart overhead) is rejected with 413 before anything is written.
      NOTE(review): FastAPI parses the multipart form before this handler runs,
      so this check limits what reaches the Inbox, not bytes received; confirm
      the proxy limit still guards the transport.
    - The running byte count while streaming to disk is checked against
      ``max_bytes`` (defends against a forged Content-Length) → 413.
    - Empty files → 400.
    - The target filename is claimed with exclusive create, so concurrent uploads
      of the same name never overwrite, or later delete, each other's file.
    - The hash, the Document row and the extract queue entry are produced only
      after the file is fully written and closed. The row and queue entry share
      one transaction.
    - On any ``Exception`` after the file is created, the session is rolled back
      (DB phase) and the file is removed. BaseException subclasses
      (KeyboardInterrupt, SystemExit, ...) are deliberately not intercepted.

    Raises:
        HTTPException: 400 for invalid form values, filenames or empty files;
            413 for oversized uploads.
    """
    from core.library import DEFAULT_LIBRARY_PATH, LIBRARY_PREFIX, normalize_library_path

    max_bytes = settings.upload.max_bytes
    slack_ratio = settings.upload.content_length_slack_ratio
    chunk_size = settings.upload.stream_chunk_bytes

    # ── Pre-checks (before any file I/O) ──

    # Early Content-Length rejection (slack covers multipart overhead).
    content_length_header = request.headers.get("content-length")
    if content_length_header:
        try:
            declared_length = int(content_length_header)
        except ValueError:
            declared_length = None  # malformed header: the streaming check still enforces max_bytes
        if declared_length is not None and declared_length > int(max_bytes * slack_ratio):
            raise HTTPException(status_code=413, detail="파일이 너무 큽니다")

    # doc_purpose validation
    if doc_purpose is not None:
        doc_purpose = doc_purpose.strip().lower()
        if doc_purpose == "":
            doc_purpose = None
        elif doc_purpose not in ("business", "knowledge"):
            raise HTTPException(status_code=400, detail="doc_purpose는 business 또는 knowledge만 가능")

    # library_path validation + normalization
    library_tag = None
    if library_path:
        try:
            normalized = normalize_library_path(library_path)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=f"잘못된 자료실 경로: {e}") from e
        library_tag = f"{LIBRARY_PREFIX}{normalized}"

    # Library upload without a path → file under the default (unclassified) folder.
    if doc_purpose == "business" and not library_tag:
        library_tag = f"{LIBRARY_PREFIX}{DEFAULT_LIBRARY_PATH}"

    if not file.filename:
        raise HTTPException(status_code=400, detail="파일명이 필요합니다")

    # Keep only the final path component (blocks directory traversal).
    safe_name = Path(file.filename).name
    if not safe_name or safe_name.startswith("."):
        raise HTTPException(status_code=400, detail="유효하지 않은 파일명")

    # ── Target path ──

    inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
    inbox_dir.mkdir(parents=True, exist_ok=True)
    inbox_root = inbox_dir.resolve()
    target = (inbox_dir / safe_name).resolve()

    # Must stay inside the Inbox. Path-aware check: a string prefix test would
    # also accept siblings such as ".../Inbox2".
    if not target.is_relative_to(inbox_root):
        raise HTTPException(status_code=400, detail="잘못된 파일 경로")

    # Claim a unique name atomically ("xb" = O_CREAT|O_EXCL). An exists() check
    # followed by open("wb") would let two concurrent uploads pick the same name.
    stem, suffix = target.stem, target.suffix
    attempt = 0
    while True:
        try:
            out = target.open("xb")
            break
        except FileExistsError:
            attempt += 1
            target = inbox_root / f"{stem}_{attempt}{suffix}"

    # ── Streamed write + running size check ──

    written = 0
    try:
        with out:
            while chunk := await file.read(chunk_size):
                written += len(chunk)
                if written > max_bytes:
                    raise HTTPException(status_code=413, detail="파일이 너무 큽니다")
                # Blocking write offloaded so a slow NAS mount does not stall the event loop.
                await asyncio.to_thread(out.write, chunk)
        # Leaving the `with` block flushed and closed the file.
        if written == 0:
            raise HTTPException(status_code=400, detail="빈 파일은 업로드할 수 없습니다")
    except Exception:
        # Remove the partial file. BaseException subclasses are intentionally not caught.
        target.unlink(missing_ok=True)
        raise

    # ── Only after the file is complete: hash + DB rows ──

    try:
        # `target` is resolved, so the mount root is resolved too; otherwise a
        # symlinked mount path would make relative_to() raise and orphan the file.
        rel_path = str(target.relative_to(Path(settings.nas_mount_path).resolve()))
        fhash = await asyncio.to_thread(file_hash, target)
        ext = target.suffix.lstrip(".").lower() or "unknown"

        doc = Document(
            file_path=rel_path,
            file_hash=fhash,
            file_format=ext,
            file_size=written,
            file_type="immutable",
            title=target.stem,
            source_channel="manual",
            doc_purpose=doc_purpose,
            user_tags=[library_tag] if library_tag else [],
            facet_company=facet_company or None,
            facet_topic=facet_topic or None,
            facet_year=facet_year,
            facet_doctype=facet_doctype or None,
        )
        session.add(doc)
        await session.flush()  # assigns doc.id for the queue row
        # Document + processing_queue committed together in one transaction.
        await enqueue_stage(session, doc.id, "extract")
        await session.commit()
    except Exception:
        # Roll back explicitly rather than relying on the session dependency's
        # teardown. The file is a separate resource and is removed by hand.
        try:
            await session.rollback()
        except Exception:
            logger.warning("document.upload rollback_failed path=%s", target, exc_info=True)
        target.unlink(missing_ok=True)
        raise

    return DocumentResponse.model_validate(doc)
|
|
|
|
|
|
@router.patch("/{doc_id}", response_model=DocumentResponse)
async def update_document(
    doc_id: int,
    body: DocumentUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Apply a partial metadata update (manual override) to a live document.

    Only fields present in the request body are written, and ``updated_at`` is
    bumped. ``user_tags`` are normalized by ``validate_user_tags`` (library
    paths, types, duplicates). ``doc_purpose`` is normalized the same way as on
    upload (trimmed, lower-cased, blank → None) and must then be ``business``
    or ``knowledge``.

    Raises:
        HTTPException: 404 if the document is missing or soft-deleted;
            400 for invalid ``user_tags`` or ``doc_purpose``.
    """
    from core.library import validate_user_tags

    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are not editable, matching GET /{doc_id}.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    update_data = body.model_dump(exclude_unset=True)

    # user_tags: normalize @library/ paths and check types/duplicates.
    if update_data.get("user_tags") is not None:
        try:
            update_data["user_tags"] = validate_user_tags(update_data["user_tags"])
        except (TypeError, ValueError) as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

    # doc_purpose: same normalization as the upload endpoint.
    if "doc_purpose" in update_data:
        purpose = update_data["doc_purpose"]
        if purpose is not None:
            purpose = purpose.strip().lower() or None
        if purpose is not None and purpose not in ("business", "knowledge"):
            raise HTTPException(status_code=400, detail="doc_purpose는 business 또는 knowledge만 가능")
        update_data["doc_purpose"] = purpose

    for field, value in update_data.items():
        setattr(doc, field, value)
    doc.updated_at = datetime.now(timezone.utc)
    await session.commit()

    return DocumentResponse.model_validate(doc)
|
|
|
|
|
|
@router.put("/{doc_id}/content")
async def save_document_content(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    body: dict | None = None,
):
    """Overwrite the Markdown/text source file and refresh size, hash and extracted_text.

    The new text comes from ``body["content"]``; a missing body or key means
    empty content. ``extracted_text`` keeps at most the first 15,000 characters.

    Raises:
        HTTPException: 404 if the document is missing or soft-deleted; 400 if the
            format is not md/txt, the document has no file (a note), or
            ``content`` is not a string.
    """
    doc = await session.get(Document, doc_id)
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    if doc.file_format not in ("md", "txt"):
        raise HTTPException(status_code=400, detail="편집 가능한 포맷이 아닙니다 (md, txt만 가능)")

    # Notes (memos) are edited through PATCH /api/memos/{id}.
    if not doc.file_path:
        raise HTTPException(status_code=400, detail="파일이 없는 문서입니다. 메모는 /api/memos 사용")

    content = body.get("content", "") if body else ""
    # A non-string (null, number, list, ...) would otherwise fail in write_text with a 500.
    if not isinstance(content, str):
        raise HTTPException(status_code=400, detail="content는 문자열이어야 합니다")

    file_path = Path(settings.nas_mount_path) / doc.file_path

    def _write_and_hash():
        # Blocking filesystem work; runs in a worker thread.
        file_path.write_text(content, encoding="utf-8")
        return file_hash(file_path)

    new_hash = await asyncio.to_thread(_write_and_hash)

    # Metadata refresh
    doc.file_size = len(content.encode("utf-8"))
    doc.file_hash = new_hash
    doc.extracted_text = content[:15000]
    doc.updated_at = datetime.now(timezone.utc)
    await session.commit()

    return DocumentResponse.model_validate(doc)
|
|
|
|
|
|
@router.get("/{doc_id}/preview")
async def get_document_preview(
    doc_id: int,
    session: Annotated[AsyncSession, Depends(get_session)],
    token: str | None = Query(None, description="Bearer token (iframe용)"),
    download: bool = Query(False, description="true면 attachment (PDF 다운로드)"),
):
    """Serve the cached PDF preview ``PKM/.preview/<doc_id>.pdf``.

    Authenticated only through the ``?token=`` query parameter (iframe-friendly).
    ``download=true`` sends the preview as an attachment named after the
    document title.

    Raises:
        HTTPException: 401 (missing/invalid token), 404 (document missing or
            soft-deleted, or the preview has not been generated yet).
    """
    from core.auth import decode_token

    if not token:
        raise HTTPException(status_code=401, detail="토큰이 필요합니다")
    payload = decode_token(token)
    if not payload or payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="유효하지 않은 토큰")

    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are hidden here too, matching GET /{doc_id}.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    preview_path = Path(settings.nas_mount_path) / "PKM" / ".preview" / f"{doc_id}.pdf"
    if not preview_path.exists():
        raise HTTPException(status_code=404, detail="미리보기가 아직 생성되지 않았습니다")

    if download:
        raw_title = doc.title or f"document-{doc_id}"
        filename = f"{raw_title}.pdf"
        # ASCII fallback: non-ASCII becomes "?", and quotes, backslashes and
        # control characters (CR/LF would break the header line) become "_".
        ascii_fallback = "".join(
            "_" if ch in '"\\' or not ch.isprintable() else ch
            for ch in filename.encode("ascii", "replace").decode()
        )
        # RFC 5987 ext-value: percent-encode everything outside the unreserved set, "/" included.
        utf8_encoded = quote(filename, safe="")
        disposition = f'attachment; filename="{ascii_fallback}"; filename*=UTF-8\'\'{utf8_encoded}'
    else:
        disposition = "inline"

    return FileResponse(
        path=str(preview_path),
        media_type="application/pdf",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
|
|
@router.delete("/{doc_id}")
async def delete_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    delete_file: bool = Query(False, description="NAS 파일도 함께 삭제"),
):
    """Soft-delete a document by stamping ``deleted_at``.

    The physical file is never removed here; it is left for a later cleanup
    job. ``delete_file`` therefore has no effect and is kept only so existing
    clients that send it keep working. Repeated calls are idempotent: the first
    deletion timestamp is preserved instead of being pushed forward.

    Raises:
        HTTPException: 404 if no document has this id.
    """
    doc = await session.get(Document, doc_id)
    if not doc:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    if doc.deleted_at is None:
        doc.deleted_at = datetime.now(timezone.utc)
        await session.commit()

    return {"message": f"문서 {doc_id} soft-delete 완료"}
|
|
|
|
|
|
# Module logger, used by the content/analyze endpoints below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@router.get("/{doc_id}/content")
async def get_document_content(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return a document's extracted text plus basic metadata (for service callers).

    ``content`` is capped at 15,000 characters. ``content_length`` is the full
    text length and ``truncated`` says whether the cap applied. Each read is
    logged with the caller's username.

    Raises:
        HTTPException: 404 if the document is missing or soft-deleted.
    """
    doc = await session.get(Document, doc_id)
    # Soft-deleted documents are hidden here too, matching GET /{doc_id}.
    if not doc or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    max_chars = 15000
    raw_text = doc.extracted_text or ""
    content = raw_text[:max_chars]
    truncated = len(raw_text) > max_chars
    logger.info("document.content read doc_id=%s user=%s", doc_id, getattr(user, "username", "unknown"))

    return {
        "id": doc.id,
        "title": doc.title,
        "domain": doc.ai_domain,
        "sub_group": doc.ai_sub_group,
        "document_type": doc.document_type,
        "ai_summary": doc.ai_summary,
        "ai_tags": doc.ai_tags,
        "content": content,
        "content_length": len(raw_text),
        "truncated": truncated,
    }
|
|
|
|
|
|
# ─── Phase D.5: 문서 분석 (/{doc_id}/analyze) ───
|
|
|
|
# Prompt template for POST /{doc_id}/analyze. It falls back to "" when
# prompts/document_analyze.txt is missing; the endpoint then answers 500
# ("분석 프롬프트 미설치").
ANALYZE_PROMPT = (
    _load_prompt("document_analyze.txt")
    if (Path(__file__).parent.parent / "prompts" / "document_analyze.txt").exists()
    else ""
)

ANALYZE_TEXT_LIMIT = 12000  # max chars of extracted_text sent to the LLM (lowered from 15000 after frequent timeouts)
ANALYZE_TIMEOUT_S = 60  # LLM call timeout (s). Measured 7–45 s with 15,000-char input + 4 output layers; includes margin
ANALYZE_CACHE_TTL_S = 1800  # 30 minutes
ANALYZE_CACHE_MAXSIZE = 100  # max cached analyses; the oldest-inserted entry is evicted first
ANALYZE_LAYER_MIN_CHARS = 50  # layers shorter than this are treated as filler and dropped
# Lower-cased "not applicable" phrases; layers matching them (see _is_skip_content) are dropped.
_ANALYZE_LAYER_SKIP_MARKERS = (
    "해당 없음", "정보 없음", "n/a", "na",
    "없음", "없습니다", "not applicable",
)

# In-process result cache: key -> (response, time.time() when stored).
# Eviction is by insertion order (FIFO), not LRU: lookups do not refresh an
# entry's position (pattern borrowed from synthesis_service).
_analyze_cache: dict[str, tuple["AnalyzeResponse", float]] = {}
|
|
|
|
|
|
class AnalysisLayer(BaseModel):
    """One section of a document analysis."""

    layer: Literal["evidence", "explanation", "examples", "summary"]  # section kind
    title: str  # display title (model-provided, otherwise a default label)
    content: str  # section body text
|
|
|
|
|
|
class AnalyzeResponse(BaseModel):
    """Response of POST /{doc_id}/analyze."""

    id: int  # document id
    title: str | None  # document title
    layers: list[AnalysisLayer]  # surviving layers; always includes a summary
    elapsed_ms: float  # handler wall time for this request (cache hits included)
    truncated: bool  # True if the input text was cut at ANALYZE_TEXT_LIMIT
    cached: bool  # True if served from the in-process cache
|
|
|
|
|
|
def _analyze_cache_key(doc_id: int, updated_at: datetime | None, created_at: datetime) -> str:
|
|
"""캐시 키 = doc_id + updated_at (없으면 created_at)"""
|
|
ts = updated_at or created_at
|
|
return f"{doc_id}:{ts.isoformat()}"
|
|
|
|
|
|
def _analyze_cache_get(key: str) -> "AnalyzeResponse | None":
    """Return the cached response for *key*, or None when absent or expired.

    An entry older than ``ANALYZE_CACHE_TTL_S`` seconds is removed from the
    cache on the lookup that finds it expired.
    """
    try:
        result, stored_at = _analyze_cache[key]
    except KeyError:
        return None
    age_s = time.time() - stored_at
    if age_s <= ANALYZE_CACHE_TTL_S:
        return result
    _analyze_cache.pop(key, None)
    return None
|
|
|
|
|
|
def _analyze_cache_set(key: str, result: "AnalyzeResponse") -> None:
    """Store *result* under *key*, stamped with the current wall-clock time.

    Adding a new key to a full cache first evicts the oldest-inserted entry
    (FIFO). Overwriting an existing key never evicts, and the key keeps its
    original insertion position.
    """
    is_new_key = key not in _analyze_cache
    if is_new_key and len(_analyze_cache) >= ANALYZE_CACHE_MAXSIZE and _analyze_cache:
        del _analyze_cache[next(iter(_analyze_cache))]
    _analyze_cache[key] = (result, time.time())
|
|
|
|
|
|
def _is_skip_content(content: str) -> bool:
|
|
"""'해당 없음' 계열 문구 판정 (억지 채움 제거용)."""
|
|
stripped = content.strip().lower()
|
|
if not stripped:
|
|
return True
|
|
for marker in _ANALYZE_LAYER_SKIP_MARKERS:
|
|
if stripped == marker or stripped.startswith(marker):
|
|
return True
|
|
return False
|
|
|
|
|
|
# Strong references to in-flight detached telemetry tasks: the event loop keeps
# only weak references to tasks, so an unreferenced task could vanish mid-run.
_analyze_event_tasks: set[asyncio.Task] = set()


def _record_analyze_event_detached(**event) -> None:
    """Schedule ``record_analyze_event`` outside the HTTP response lifecycle.

    FastAPI runs ``BackgroundTasks`` only after a response that the endpoint
    *returns*. When the endpoint raises (e.g. ``HTTPException``), the queued
    tasks are discarded, so error exits use this helper instead. Starlette's
    ``BackgroundTask`` awaits async callables and runs sync ones in a
    threadpool, the same way ``BackgroundTasks.add_task`` would.
    """
    from starlette.background import BackgroundTask

    task = asyncio.get_running_loop().create_task(BackgroundTask(record_analyze_event, **event)())
    _analyze_event_tasks.add(task)

    def _on_done(finished: asyncio.Task) -> None:
        _analyze_event_tasks.discard(finished)
        if not finished.cancelled() and finished.exception() is not None:
            logger.warning(
                "document.analyze telemetry_failed doc_id=%s err=%s",
                event.get("doc_id"),
                type(finished.exception()).__name__,
            )

    task.add_done_callback(_on_done)


@router.post("/{doc_id}/analyze", response_model=AnalyzeResponse)
async def analyze_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    x_source: Annotated[str | None, Header(alias="X-Source")] = None,
) -> AnalyzeResponse:
    """Structured LLM analysis of a document's extracted text (primary model, e.g. Gemma 4).

    Produces up to four layers (evidence / explanation / examples / summary).
    Layers the model marks as not applicable, or that are too short, are
    dropped, and a summary layer is mandatory. Results are cached in-process
    and keyed by doc id + updated_at (or created_at).

    Telemetry (Phase E.2): every exit (success, cache hit or error) records one
    analyze event. Returned responses queue it on ``background_tasks``. Error
    exits schedule it directly, because FastAPI drops background tasks when
    the endpoint raises. The ``X-Source`` header identifies the caller
    (document_server / synology_chat / ui_search / ui_detail / eval).

    Raises:
        HTTPException: 404 (document missing/soft-deleted, or no extracted text),
            500 (prompt file missing), 504 (LLM timeout), 502 (LLM error),
            422 (unparseable output or no summary layer).
    """
    t_start = time.perf_counter()
    source = sanitize_source(x_source)
    # Telemetry state, read in the `finally` block below.
    truncated_flag = False
    cached_flag = False
    layers_returned: list[str] = []
    error_code: str | None = None
    responded = False  # set just before a response object is returned

    try:
        # 1. Load the document (soft-deleted ones are hidden, matching GET /{doc_id}).
        doc = await session.get(Document, doc_id)
        if not doc or doc.deleted_at is not None:
            error_code = "not_found"
            raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

        # 2. Input text
        raw_text = doc.extracted_text or ""
        if not raw_text.strip():
            error_code = "no_text"
            raise HTTPException(status_code=404, detail="텍스트 추출 미완료")

        truncated_flag = len(raw_text) > ANALYZE_TEXT_LIMIT
        doc_text = raw_text[:ANALYZE_TEXT_LIMIT]

        # 3. Cache lookup (key: doc_id + updated_at/created_at)
        cache_key = _analyze_cache_key(doc_id, doc.updated_at, doc.created_at)
        cached = _analyze_cache_get(cache_key)
        if cached is not None:
            logger.info("document.analyze cache_hit doc_id=%s user=%s", doc_id, getattr(user, "username", "?"))
            cached_flag = True
            layers_returned = [la.layer for la in cached.layers]
            truncated_flag = cached.truncated
            response = AnalyzeResponse(
                id=cached.id,
                title=cached.title,
                layers=cached.layers,
                elapsed_ms=(time.perf_counter() - t_start) * 1000,
                truncated=cached.truncated,
                cached=True,
            )
            responded = True
            return response

        # 4. Prompt
        if not ANALYZE_PROMPT:
            error_code = "llm"  # configuration problems are reported under "llm" too
            raise HTTPException(status_code=500, detail="분석 프롬프트 미설치")
        prompt = ANALYZE_PROMPT.replace("{document_title}", doc.title or "").replace(
            "{document_text}", doc_text
        )

        # 5. LLM call (timeout applies inside the MLX gate)
        ai_client = AIClient()
        raw: str | None = None
        try:
            async with get_mlx_gate():
                async with asyncio.timeout(ANALYZE_TIMEOUT_S):
                    raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
        except asyncio.TimeoutError:
            logger.warning("document.analyze timeout doc_id=%s", doc_id)
            error_code = "timeout"
            raise HTTPException(status_code=504, detail="분석 시간이 초과되었습니다")
        except Exception as exc:
            logger.warning("document.analyze llm_error doc_id=%s err=%s", doc_id, type(exc).__name__)
            error_code = "llm"
            raise HTTPException(status_code=502, detail="AI 서버 일시 오류")
        finally:
            try:
                await ai_client.close()
            except Exception:
                pass  # best-effort close; must not mask the outcome above

        # 6. JSON parsing
        parsed = parse_json_response(raw or "")
        if not isinstance(parsed, dict):
            logger.warning("document.analyze parse_failed doc_id=%s raw_preview=%s", doc_id, (raw or "")[:200])
            error_code = "parse"
            raise HTTPException(status_code=422, detail="분석 결과 파싱 실패")

        # 7. Layer validation + filler removal
        raw_layers = parsed.get("layers") or []
        if not isinstance(raw_layers, list):
            error_code = "parse"
            raise HTTPException(status_code=422, detail="분석 결과 형식 오류")

        layer_titles = {
            "evidence": "근거",
            "explanation": "해설",
            "examples": "사례",
            "summary": "요약",
        }
        valid_layers: list[AnalysisLayer] = []
        seen_layers: set[str] = set()
        for item in raw_layers:
            if not isinstance(item, dict):
                continue
            layer_type = item.get("layer")
            content = (item.get("content") or "").strip()
            if layer_type not in layer_titles:
                continue
            if layer_type in seen_layers:
                continue  # first occurrence of each layer kind wins
            if len(content) < ANALYZE_LAYER_MIN_CHARS:
                continue
            if _is_skip_content(content):
                continue
            valid_layers.append(
                AnalysisLayer(
                    layer=layer_type,  # type: ignore[arg-type]
                    title=item.get("title") or layer_titles[layer_type],
                    content=content,
                )
            )
            seen_layers.add(layer_type)

        if not valid_layers or "summary" not in seen_layers:
            logger.warning("document.analyze missing_summary doc_id=%s layers=%s", doc_id, seen_layers)
            error_code = "missing_summary"
            raise HTTPException(status_code=422, detail="분석 결과에 요약이 없습니다")

        # 8. Response + cache store
        elapsed_ms = (time.perf_counter() - t_start) * 1000
        result = AnalyzeResponse(
            id=doc.id,
            title=doc.title,
            layers=valid_layers,
            elapsed_ms=elapsed_ms,
            truncated=truncated_flag,
            cached=False,
        )
        _analyze_cache_set(cache_key, result)
        layers_returned = [la.layer for la in valid_layers]

        logger.info(
            "document.analyze ok doc_id=%s user=%s layers=%d elapsed_ms=%.0f",
            doc_id,
            getattr(user, "username", "?"),
            len(valid_layers),
            elapsed_ms,
        )
        responded = True
        return result
    finally:
        # Phase E.2: one analyze_events row for every exit (success / cache / error).
        latency_ms = int((time.perf_counter() - t_start) * 1000)
        event = {
            "doc_id": doc_id,
            "user_id": getattr(user, "id", None),
            "mode": "quick",
            "text_limit": ANALYZE_TEXT_LIMIT,
            "truncated": truncated_flag,
            "layers_returned": layers_returned,
            "cached": cached_flag,
            "latency_ms": latency_ms,
            "model_name": resolve_primary_model(),
            "prompt_version": ANALYZE_PROMPT_VERSION,
            "error_code": error_code,
            "source": source,
        }
        if responded:
            background_tasks.add_task(record_analyze_event, **event)
        else:
            # Raised exits: FastAPI would drop background_tasks, so schedule directly.
            _record_analyze_event_detached(**event)
|