Files
hyungi_document_server/app/api/documents.py
hyungi 63be005c6f fix(security): 보안 위생 5건 — library admin 게이트·edit_url SSRF·보안헤더·8080 바인드·하드코딩 비번 제거
M3 library.py: categories POST/PATCH/DELETE + facets POST 를 get_current_user→require_admin
(공유 분류 CRUD 를 17주체→admin 한정, news/digest 패턴 정합).
M1 documents.py: update_document PATCH 에 edit_url validate_feed_url 가드 — 내부/메타데이터 주소
후속 fetch(fulltext_worker) latent SSRF 차단(API 레이어 무방비 해소, news.py 동형).
Caddyfile: 보안 헤더(nosniff·X-Frame SAMEORIGIN·Referrer-Policy·-Server). HSTS 는 edge 소관.
compose: caddy 8080:80 0.0.0.0→127.0.0.1 (LAN 우회 차단, 실 ingress=home-caddy→caddy:80 도커망).
scripts: 하드코딩 죽은 DB 비번 → os.environ (1차 감사 누락분, .env 한정 점검이 놓침).

별도(DB): test-% 계정 12개 비활성화 (공유풀 주체 17→5, 랜덤해시라 비번노출 아님·위생).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-20 05:48:02 +00:00

1804 lines
72 KiB
Python

"""문서 CRUD API"""
import asyncio
import logging
import shutil
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated, Literal
from urllib.parse import quote
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
Form,
Header,
HTTPException,
Query,
Request,
UploadFile,
status,
)
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, field_validator
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.requests import ClientDisconnect
from ai.client import AIClient, _load_prompt, parse_json_response
from core.auth import get_current_user
from core.config import settings
from core.database import async_session, get_session
from core.utils import file_hash
from models.document import Document
from models.document_image import DocumentImage
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
from services.dedup import (
DUPLICATE_GROUPS_SQL,
DEDUP_OFF_CHANNELS,
find_canonical_for_hash,
find_near_duplicates,
)
from services.storage import StorageNotConfigured, get_storage_backend
from services.document_telemetry import record_analyze_event, sanitize_source
from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model
from services.search.llm_gate import Priority, acquire_mlx_gate
router = APIRouter()
logger = logging.getLogger(__name__)
def _upload_error(status_code: int, error_code: str, message: str) -> HTTPException:
"""업로드 실패 응답. detail 은 객체 — 프론트가 error_code 로 분기.
error_code 종류:
body_too_large — Content-Length 또는 스트리밍 누적이 max_bytes 초과 (413)
upload_timeout — 서버 read timeout (408)
network_abort — 클라이언트 abort / 연결 끊김 (499)
empty_file — 0바이트 (400)
invalid_input — 파일명/경로/필드 검증 실패 (400)
unsupported_codec — 웹 업로드에서 direct-play 불가 비디오 (400, §3 video)
internal — 그 외 알 수 없는 에러 (500)
"""
return HTTPException(
status_code=status_code,
detail={"error_code": error_code, "message": message},
)
async def get_live_document(session: AsyncSession, doc_id: int) -> Document:
"""soft-delete(deleted_at) 가드 포함 문서 조회 — 없거나 삭제됐으면 404 (R7).
조회/수정 경로는 deleted_at 을 일관 가드하나 파일/콘텐츠 서빙 엔드포인트가 누락 →
삭제 문서의 원본/preview/전문이 doc_id(+유효 토큰)만으로 노출되던 비대칭. '경로마다
deleted_at 기억'에 의존하지 않게 헬퍼로 구조 강제(추가될 서빙 경로도 자동 보호).
"""
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
return doc
async def _near_dup_scan_bg(doc_id: int) -> None:
"""B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort.
업로드 직후엔 doc.embedding 이 아직 없을 수 있어(embed stage 미완) trigram 후보만
기록되는 경우가 많다 — non-gating. 어떤 예외도 업로드 결과(201)에 영향 주지 않는다.
영속화는 보류(on-the-fly) — 현재는 로깅까지. /duplicates 의 near-dup 노출은 phase2.
"""
try:
async with async_session() as bg_session:
findings = await find_near_duplicates(bg_session, doc_id)
if findings:
top = findings[0]
logger.info(
"[dedup] near_dup_scan doc=%s candidates=%d top=%s(cosine=%s)",
doc_id, len(findings), top["doc_id"], top.get("cosine"),
)
except Exception:
logger.warning("[dedup] near_dup_scan failed doc=%s", doc_id, exc_info=True)
def _parse_byte_range(range_header: str | None, size: int) -> tuple[int | None, int | None]:
"""HTTP Range 헤더(`bytes=start-end`) 파싱 → (start, end) inclusive. 없거나 무효면 (None, None).
D-2 원격 백엔드 Range pass-through 용 (local 은 FileResponse 가 자동 처리). suffix 형식
(`bytes=-N`) 도 지원. 다중 range 는 첫 구간만.
"""
if not range_header or not range_header.startswith("bytes=") or size <= 0:
return None, None
spec = range_header[len("bytes="):].split(",")[0].strip()
if "-" not in spec:
return None, None
lo, hi = spec.split("-", 1)
try:
if lo == "": # suffix range: 마지막 N 바이트
n = int(hi)
if n <= 0:
return None, None
return max(0, size - n), size - 1
start = int(lo)
end = int(hi) if hi else size - 1
except ValueError:
return None, None
if start > end or start >= size:
return None, None
return start, min(end, size - 1)
# ─── 스키마 ───
class DocumentResponse(BaseModel):
id: int
file_path: str | None
file_format: str
file_size: int | None
file_type: str
title: str | None
ai_domain: str | None
ai_sub_group: str | None
ai_tags: list | None
ai_summary: str | None
document_type: str | None
importance: str | None
ai_confidence: float | None
user_note: str | None
user_tags: list | None
pinned: bool | None
ask_includable: bool | None
derived_path: str | None
original_format: str | None
conversion_status: str | None
is_read: bool | None
review_status: str | None
edit_url: str | None
preview_status: str | None
source_channel: str | None
data_origin: str | None
doc_purpose: str | None
facet_company: str | None = None
facet_topic: str | None = None
facet_year: int | None = None
facet_doctype: str | None = None
category: str | None = None
ai_suggestion: dict | None = None
# PR-B B-1: summary_triage (4B) / summary_deep (26B) 분할 산출
ai_tldr: str | None = None
ai_bullets: list | None = None
ai_detail_summary: str | None = None
ai_inconsistencies: list | None = None
ai_analysis_tier: str | None = None # 'triage' | 'deep' | null
extracted_at: datetime | None
ai_processed_at: datetime | None
embedded_at: datetime | None
created_at: datetime
updated_at: datetime
# 회독 추적 (자료실 등) — 현재 사용자 기준. 다른 endpoint 응답에선 0/None.
read_count: int = 0
last_read_at: datetime | None = None
# S1-ADD (migration 287): 원본 파일명 + 중복검사. 앱은 옵셔널 디코딩, 없으면 폴백.
original_filename: str | None = None # 다운로드 라벨용. 없으면 file_path basename 폴백(앱 측).
duplicate_of: int | None = None # canonical doc id (자기 자신이 canonical 이면 None).
duplicate_count: int = 0 # 본인 제외 동일 판정 사본 수 (canonical 행 기준).
class Config:
from_attributes = True
class DocumentListResponse(BaseModel):
items: list[DocumentResponse]
total: int
page: int
page_size: int
class DocumentDetailResponse(DocumentResponse):
"""단건 조회 전용 — 본문(extracted_text)·canonical markdown 동봉.
리스트 응답은 페이로드 비대화 회피로 DocumentResponse 만 사용.
"""
extracted_text: str | None = None
md_content: str | None = None
md_frontmatter: dict | None = None
md_status: str | None = None
md_extraction_quality: dict | None = None
md_extraction_error: str | None = None
md_extraction_engine: str | None = None
md_extraction_engine_version: str | None = None
md_generated_at: datetime | None = None
@field_validator("md_status", mode="before")
@classmethod
def _db_success_to_completed(cls, v: str | None) -> str | None:
"""DB CHECK enum 은 'success'; 계약/fixture·앱 MD-first 렌더 트리거는 'completed'.
read-time(DB→API) 단방향 매핑만 — write 경로(ORM)는 이 모델을 거치지 않아 미적용.
pending/processing/partial/failed/skipped 는 양쪽 동일하므로 'success' 만 매핑한다.
(불변식: md_status ∈ {success,partial} ⟹ md_content 非공백 = 워커 postcondition, C-5.)
"""
return "completed" if v == "success" else v
class AcceptSuggestionRequest(BaseModel):
"""§1 accept-suggestion 요청 body — stale payload / doc 수정 검출.
jurisdiction: 안전 자료실 A-2 — material_type 제안 승인 시 사용자가 지정하는 관할.
law 승인은 필수 (기본값 없음 — KR 자동 부여 시 외국 자료가 KR 법령으로 오염되는
경로를 차단, plan A-2 계약).
"""
expected_source_updated_at: datetime
jurisdiction: str | None = None
class DocumentUpdate(BaseModel):
title: str | None = None
ai_domain: str | None = None
ai_sub_group: str | None = None
ai_tags: list | None = None
user_tags: list | None = None
user_note: str | None = None
is_read: bool | None = None
edit_url: str | None = None
source_channel: str | None = None
data_origin: str | None = None
doc_purpose: str | None = None
pinned: bool | None = None
facet_company: str | None = None
facet_topic: str | None = None
facet_year: int | None = None
facet_doctype: str | None = None
# ─── 스키마 (트리) ───
class TreeNode(BaseModel):
name: str
path: str
count: int
children: list["TreeNode"]
# ─── 엔드포인트 ───
@router.get("/tree")
async def get_document_tree(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""도메인 트리 (3단계 경로 파싱, 사이드바용)"""
from sqlalchemy import text as sql_text
result = await session.execute(
sql_text("""
SELECT ai_domain, COUNT(*)
FROM documents
WHERE ai_domain IS NOT NULL AND ai_domain != '' AND ai_domain != 'News'
AND deleted_at IS NULL
-- 문서함(list) 기본 제외와 동일하게 맞춤: 뉴스/법령 채널·메모는 문서함에 안 뜨므로
-- 트리 카운트도 제외해야 "트리 N건인데 클릭하면 0건" 불일치가 안 생긴다.
AND source_channel != 'news'
AND source_channel != 'law_monitor'
AND file_type != 'note'
GROUP BY ai_domain
ORDER BY ai_domain
""")
)
# 경로를 트리로 파싱
root: dict = {}
for domain_path, count in result:
parts = domain_path.split("/")
node = root
for part in parts:
if part not in node:
node[part] = {"_count": 0, "_children": {}}
node[part]["_count"] += count
node = node[part]["_children"]
def build_tree(d: dict, prefix: str = "") -> list[dict]:
nodes = []
for name, data in sorted(d.items()):
path = f"{prefix}/{name}" if prefix else name
children = build_tree(data["_children"], path)
nodes.append({
"name": name,
"path": path,
"count": data["_count"],
"children": children,
})
return nodes
return build_tree(root)
@router.get("/library-tree")
async def get_library_tree(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""자료실 트리 (user_tags @library/ 경로 기반, unique doc count)"""
from core.library import LIBRARY_PREFIX
result = await session.execute(
select(Document.id, Document.user_tags).where(
Document.deleted_at == None, # noqa: E711
Document.user_tags != None, # noqa: E711
)
)
root: dict = {}
for doc_id, tags in result:
if not tags:
continue
seen_ancestors: set[str] = set()
for tag in tags:
if not isinstance(tag, str) or not tag.startswith(LIBRARY_PREFIX):
continue
path = tag[len(LIBRARY_PREFIX):]
parts = path.split("/")
node = root
for i, part in enumerate(parts):
if part not in node:
node[part] = {"_docs": set(), "_children": {}}
ancestor_key = "/".join(parts[: i + 1])
if ancestor_key not in seen_ancestors:
node[part]["_docs"].add(doc_id)
seen_ancestors.add(ancestor_key)
node = node[part]["_children"]
def build_library_tree(d: dict, prefix: str = "") -> list[dict]:
nodes = []
for name, data in sorted(d.items()):
if name.startswith("_"):
continue
path = f"{prefix}/{name}" if prefix else name
children = build_library_tree(data["_children"], path)
nodes.append({
"name": name,
"path": path,
"count": len(data["_docs"]),
"children": children,
})
return nodes
return build_library_tree(root)
@router.get("/library", response_model=DocumentListResponse)
async def list_library_documents(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
path: str | None = None,
q: str | None = None,
sort: str = Query("updated_desc"),
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
facet_company: str | None = None,
facet_topic: str | None = None,
facet_year: int | None = None,
facet_doctype: str | None = None,
unread: bool = Query(False, description="true: 현재 사용자 회독 0건만"),
):
"""자료실 문서 목록 (category='library' 기반, prefix match, facet 필터, 정렬).
`unread=true` 시 현재 사용자가 1번도 회독 안 한 documents 만.
"""
from sqlalchemy import text as sql_text
from core.library import LIBRARY_PREFIX, normalize_library_path
from models.document_read import DocumentRead
if path:
try:
path = normalize_library_path(path)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
query = select(Document).where(
Document.deleted_at == None, # noqa: E711
Document.category == "library",
)
# 안 본 자료만 — 현재 사용자의 read 가 없는 documents
if unread:
read_subq = (
select(DocumentRead.document_id)
.where(DocumentRead.user_id == user.id)
.scalar_subquery()
)
query = query.where(Document.id.notin_(read_subq))
if path:
exact = f"{LIBRARY_PREFIX}{path}"
prefix = f"{LIBRARY_PREFIX}{path}/%"
query = query.where(
sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact OR t LIKE :prefix
)
""").bindparams(exact=exact, prefix=prefix)
)
if q:
query = query.where(Document.title.ilike(f"%{q}%"))
# facet 필터
if facet_company:
query = query.where(Document.facet_company == facet_company)
if facet_topic:
query = query.where(Document.facet_topic == facet_topic)
if facet_year:
query = query.where(Document.facet_year == facet_year)
if facet_doctype:
query = query.where(Document.facet_doctype == facet_doctype)
# 전체 건수
count_query = select(func.count()).select_from(query.subquery())
total = (await session.execute(count_query)).scalar()
# 정렬
sort_map = {
"updated_desc": Document.updated_at.desc(),
"title_asc": Document.title.asc(),
"created_desc": Document.created_at.desc(),
}
query = query.order_by(sort_map.get(sort, Document.updated_at.desc()))
query = query.offset((page - 1) * page_size).limit(page_size)
result = await session.execute(query)
items = result.scalars().all()
# 회독 통계 한 번에 fetch (현재 페이지 N건 한정 — N+1 회피).
# DocumentRead 는 함수 상단에서 이미 import.
read_map: dict[int, tuple[int, datetime | None]] = {}
if items:
doc_ids = [d.id for d in items]
rs = await session.execute(
select(
DocumentRead.document_id,
func.count(DocumentRead.id),
func.max(DocumentRead.read_at),
)
.where(
DocumentRead.user_id == user.id,
DocumentRead.document_id.in_(doc_ids),
)
.group_by(DocumentRead.document_id)
)
for did, cnt, last in rs:
read_map[did] = (int(cnt or 0), last)
def _to_resp(doc):
resp = DocumentResponse.model_validate(doc)
cnt, last = read_map.get(doc.id, (0, None))
resp.read_count = cnt
resp.last_read_at = last
return resp
return DocumentListResponse(
items=[_to_resp(doc) for doc in items],
total=total,
page=page,
page_size=page_size,
)
# ─── Section 2: 카테고리 집계 (Sidebar / Dashboard) ───
#
# documents.category (§1 에서 추가) 가 1차 진입점. 이 엔드포인트는 Sidebar 배지 및
# /dashboard 카테고리 카드 용. ai_suggestion.proposed_category='library' 인
# 승인 대기 건수는 /library 의 pending 배지로 별도 표시.
@router.get("/stats/category-counts")
async def get_category_counts(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카테고리별 문서 건수 + 승인 대기 (library 제안) 건수.
Response:
{
"counts": { "document": 640, "library": 12, "news": 311, ... },
"library_pending_suggestions": 17
}
- 전제: §1 의 documents.category enum + ai_suggestion JSONB 가 이미 적용됨
- category IS NULL 인 문서는 counts 에서 제외 (§1 백필 전 드문 상태)
"""
from sqlalchemy import text as sql_text
count_rows = await session.execute(
sql_text("""
SELECT category::text AS category, COUNT(*) AS cnt
FROM documents
WHERE deleted_at IS NULL
AND category IS NOT NULL
GROUP BY category
""")
)
counts: dict[str, int] = {row.category: row.cnt for row in count_rows}
pending_scalar = (
await session.execute(
sql_text("""
SELECT COUNT(*)
FROM documents
WHERE deleted_at IS NULL
AND ai_suggestion IS NOT NULL
AND ai_suggestion->>'proposed_category' = 'library'
""")
)
).scalar()
return {
"counts": counts,
"library_pending_suggestions": int(pending_scalar or 0),
}
@router.get("/", response_model=DocumentListResponse)
async def list_documents(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=500),
domain: str | None = None,
sub_group: str | None = None,
source: str | None = None,
format: str | None = None,
review_status: str | None = Query(None, description="pending | approved | rejected"),
category: str | None = Query(None, description="doc_category enum — 지정 시 기본 news/memo 제외 해제"),
has_suggestion: bool | None = Query(None, description="true: ai_suggestion IS NOT NULL"),
proposed_category: str | None = Query(None, description="ai_suggestion.proposed_category 필터"),
material_type: str | None = Query(None, description="안전 자료실 C-1: 자료유형. 지정 시 기본 exclude 해제"),
jurisdiction: str | None = Query(None, description="안전 자료실 C-1: 관할 (KR/US/...)"),
):
"""문서 목록 조회 (페이지네이션 + 필터).
기본은 뉴스/메모 제외. `category` 지정 시 해당 카테고리만 반환 (기본 제외 해제).
§2 승인 UI 용: `has_suggestion=true&proposed_category=library` 조합.
"""
query = select(Document).where(
Document.deleted_at == None, # noqa: E711
)
if category:
# 명시적 카테고리 필터 — 기본 exclude 해제
query = query.where(Document.category == category)
elif material_type:
# 안전 자료실 C-1: material_type 지정 = 기본 exclude(news·law_monitor·note) 해제.
# 안전 코퍼스 본체(KOSHA 사례·CSB·법령 등)가 전부 note/crawl 채널이라 exclude 면 빈 화면.
query = query.where(Document.material_type == material_type)
else:
# 기본 목록: 뉴스/메모/법령 제외 (문서함 용도)
query = query.where(
Document.source_channel != "news",
Document.source_channel != "law_monitor",
Document.file_type != "note",
)
if jurisdiction:
query = query.where(Document.jurisdiction == jurisdiction)
if has_suggestion is True:
query = query.where(Document.ai_suggestion.isnot(None))
elif has_suggestion is False:
query = query.where(Document.ai_suggestion.is_(None))
if proposed_category:
# ai_suggestion JSONB 의 proposed_category 값 매칭
query = query.where(
Document.ai_suggestion["proposed_category"].astext == proposed_category
)
if domain:
# prefix 매칭: Industrial_Safety 클릭 시 하위 전부 포함
query = query.where(Document.ai_domain.startswith(domain))
if source:
query = query.where(Document.source_channel == source)
if format:
query = query.where(Document.file_format == format)
if review_status:
query = query.where(Document.review_status == review_status)
# 전체 건수
count_query = select(func.count()).select_from(query.subquery())
total = (await session.execute(count_query)).scalar()
# 페이지네이션
query = query.order_by(Document.created_at.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
result = await session.execute(query)
items = result.scalars().all()
return DocumentListResponse(
items=[DocumentResponse.model_validate(doc) for doc in items],
total=total,
page=page,
page_size=page_size,
)
# ─── 중복검사 (dedup) — B-2 ───
# ★ 고정 path 라우트(/duplicates)는 동적 /{doc_id} 라우트보다 *위*에 등록해야 매칭 충돌이 없다.
class DuplicateGroup(BaseModel):
canonical_id: int
members: list[int]
reason: str
detail: str | None = None
class DuplicatesResponse(BaseModel):
groups: list[DuplicateGroup]
total_groups: int
total_duplicate_docs: int
@router.get("/duplicates", response_model=DuplicatesResponse)
async def list_duplicates(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""content_hash(= file_hash exact) 중복 그룹 목록.
OFF-whitelist(law_monitor) 제외 + deleted 제외. idx_documents_hash 재사용(신규 인덱스/테이블 불요).
near_duplicate(유사도 기반) 그룹은 영속화 보류 → S1 은 exact 그룹만 노출(계약 shape 동일,
detail 문구만 'file_hash' 기준). 응답 shape = ds-app contract `documents_duplicates.json`.
"""
rows = (
await session.execute(DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)})
).all()
groups = [
DuplicateGroup(
canonical_id=r.canonical_id,
members=list(r.members),
reason="content_hash",
detail="동일 file_hash (원본 바이트 SHA-256 일치)",
)
for r in rows
]
return DuplicatesResponse(
groups=groups,
total_groups=len(groups),
# 사본 수 = 그룹별 (멤버수-1) 합 (canonical 제외) — fixture total_duplicate_docs 정의와 동일.
total_duplicate_docs=sum(len(g.members) - 1 for g in groups),
)
class ClauseHit(BaseModel):
doc_id: int
doc_title: str
section_title: str | None = None
char_start: int | None = None
chunk_id: int
node_type: str | None = None
class ClauseLookupResponse(BaseModel):
label: str
hits: list[ClauseHit]
# NOTE: '/{doc_id}' (int path param) 라우트보다 먼저 선언해야 '/clause-lookup' 이 doc_id 로
# 잘못 매칭되지 않는다 (FastAPI 선언 순서 매칭). 이동 금지.
@router.get("/clause-lookup", response_model=ClauseLookupResponse)
async def clause_lookup(
label: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""절 식별자(예: UG-79)로 크로스-doc 절 위치 조회 — 'UG-79 보여줘' 진입점 (U-1).
절(node_type=clause/clause_split)은 in_corpus=false(검색 비활성)라 의미검색으론 못 찾으므로,
라벨 prefix 정확매칭으로 (doc, char_start) 를 직접 해소해 읽기뷰 점프를 가능케 한다.
대부분 1건; 부록(A-/E-/F-) 등 doc 간 공유 라벨만 다중 반환(에디션 선택). /sections 와 동일하게
document_chunks 직접 조회 — corpus_chunks 우회는 retrieval 아닌 정확지목이므로 의도적 예외.
"""
from sqlalchemy import text as sql_text
lab = (label or "").strip()
if not lab:
return ClauseLookupResponse(label=label, hits=[])
rows = (
await session.execute(
sql_text(
"""
SELECT c.doc_id, d.title AS doc_title, c.section_title, c.char_start, c.node_type,
-- 점프 타깃 = outline(/sections: is_leaf 또는 %_split)에 있는 chunk 여야 딥링크 동작.
-- 자신이 그러면 자신, 아니면(컨테이너 절: 자식 heading 보유·is_leaf=false) 문서순서상
-- 자신 이후 첫 딥링크 가능 chunk(=그 절 내용 시작)로 해소. 그래도 없으면 자신(폴백).
COALESCE(
CASE WHEN c.is_leaf = true OR c.node_type LIKE '%\\_split' ESCAPE '\\' THEN c.id END,
(SELECT ch.id FROM document_chunks ch
WHERE ch.doc_id = c.doc_id AND ch.source_type = 'hier_section'
AND ch.chunk_index >= c.chunk_index
AND (ch.is_leaf = true OR ch.node_type LIKE '%\\_split' ESCAPE '\\')
ORDER BY ch.chunk_index LIMIT 1),
c.id
) AS chunk_id
FROM document_chunks c
JOIN documents d ON d.id = c.doc_id
WHERE c.node_type IN ('clause', 'clause_split')
AND (c.section_title ILIKE :lab_sp OR c.section_title ILIKE :lab_eq)
AND d.deleted_at IS NULL
ORDER BY c.doc_id, c.char_start NULLS LAST
LIMIT 50
"""
).bindparams(lab_sp=lab + " %", lab_eq=lab)
)
).mappings().all()
return ClauseLookupResponse(label=lab, hits=[ClauseHit(**dict(r)) for r in rows])
@router.get("/{doc_id}", response_model=DocumentDetailResponse)
async def get_document(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""문서 단건 조회. 본문(extracted_text)·canonical markdown 동봉."""
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
return DocumentDetailResponse.model_validate(doc)
# ─── 절(hier section) 목차 + 요약 (PR-DocSrv-Hier-Section-UI-1) ───
class SectionItem(BaseModel):
chunk_id: int
section_title: str | None = None # raw 마크다운 포함 — 정제는 프런트(headingPath.ts)
heading_path: str | None = None # raw
level: int | None = None
node_type: str | None = None # window | chapter_split | clause_split | section_split | null
is_leaf: bool
parent_id: int | None = None # 트리 부모 chunk_id. window child 의 parent_id = 그 split-parent.
# 프런트 collapseWindows 가 비인접 window 를 split-parent 에 흡수할 때 사용.
char_start: int | None = None # md_content 내 heading offset(UTF-16). jump-target 만 값, 그 외 None (Path B)
text: str | None = None # 절 본문 = 청크 원문. 대형 split 문서는 md_content 가 앞 5만 자만 보존
# (marker LARGE_DOC_MD_CONTENT_HEAD_CHARS)이고 char_start 도 NULL 이라
# md_content 슬라이스로는 본문이 비므로, 청크 text 를 직접 렌더한다.
section_type: str | None = None
summary: str | None = None # status='summarized' 인 분석행에만, 그 외 None
confidence: float | None = None
class DocumentSectionsResponse(BaseModel):
doc_id: int
sections: list[SectionItem]
@router.get("/{doc_id}/sections", response_model=DocumentSectionsResponse)
async def get_document_sections(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""문서의 hier 절(leaf) 목차 + 절-레벨 요약(chunk_section_analysis).
⚠ 뷰 우회 — 의도적 예외 (변경 금지):
retrieval 경로(retrieval_service / *_rag)는 in_corpus=false 누출 방지를 위해
반드시 corpus_chunks 뷰만 본다. 그러나 이 endpoint 는 retrieval 이 아니라
"문서 전체 leaf 목차 표시"라서 in_corpus=false(검색 비활성) 절도 보여야 하므로
document_chunks 를 직접 조회한다. corpus_chunks 로 바꾸면 비활성 절이 목차에서
사라지는 회귀가 생기니 절대 바꾸지 말 것. (Hier-Decomp 코퍼스 격리 규율의 명시적 예외)
DISTINCT ON (c.id) + ORDER BY a.created_at/a.id DESC: chunk 당 최신 분석 1행만
(prompt_version 다중 시 중복 JOIN 방지). 절 없는 문서(legacy/news)는 sections=[].
"""
from sqlalchemy import text as sql_text
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
rows = (
await session.execute(
sql_text(
"""
SELECT chunk_id, section_title, heading_path, level, node_type, is_leaf, parent_id, char_start,
text, section_type, summary, confidence
FROM (
SELECT DISTINCT ON (c.id)
c.id AS chunk_id, c.chunk_index, c.section_title, c.heading_path,
c.level, c.node_type, c.is_leaf, c.parent_id, c.char_start, c.text,
a.section_type,
CASE WHEN a.status = 'summarized' THEN a.summary ELSE NULL END AS summary,
a.confidence
FROM document_chunks c
LEFT JOIN chunk_section_analysis a
ON a.chunk_id = c.id AND a.status = 'summarized'
WHERE c.doc_id = :doc_id
AND c.source_type = 'hier_section'
AND (c.is_leaf = true OR c.node_type LIKE '%\\_split' ESCAPE '\\')
ORDER BY c.id, a.created_at DESC, a.id DESC
) t
ORDER BY t.chunk_index
"""
).bindparams(doc_id=doc_id)
)
).mappings().all()
return DocumentSectionsResponse(
doc_id=doc_id,
sections=[SectionItem(**dict(r)) for r in rows],
)
# ─── 자료실 인접 자료 (이전/다음) ───
# 학습 흐름: 한 자료 다 읽으면 같은 챕터의 다음 자료로 자연스럽게 이동.
# library_path (정확 일치 + 하위 prefix) 안에서 title 오름차순 기준.
class NeighborItem(BaseModel):
id: int
title: str | None
class LibraryNeighborsResponse(BaseModel):
prev: NeighborItem | None
next: NeighborItem | None
path: str | None # 같은 path 내에서 계산된 결과
@router.get("/{doc_id}/library-neighbors", response_model=LibraryNeighborsResponse)
async def get_library_neighbors(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""현재 자료의 같은 library_path 안에서 이전/다음 자료. title_asc 정렬 기준.
library_path 추출: user_tags 의 첫 번째 `@library/...` 태그.
"""
from core.library import LIBRARY_PREFIX
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None or doc.category != "library":
raise HTTPException(status_code=404, detail="자료실 자료가 아닙니다")
# 첫 번째 library 태그를 path 로
path: str | None = None
for t in (doc.user_tags or []):
if isinstance(t, str) and t.startswith(LIBRARY_PREFIX):
path = t[len(LIBRARY_PREFIX):]
break
if not path:
return LibraryNeighborsResponse(prev=None, next=None, path=None)
# 같은 path (정확히) 의 자료들 — title 오름차순.
# user_tags 는 JSONB. 다른 endpoint 와 일관되게 EXISTS + jsonb_array_elements_text 사용.
from sqlalchemy import text as sql_text
exact_tag = f"{LIBRARY_PREFIX}{path}"
res = await session.execute(
select(Document.id, Document.title)
.where(
Document.deleted_at == None, # noqa: E711
Document.category == "library",
sql_text("""
EXISTS (
SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t
WHERE t = :exact
)
""").bindparams(exact=exact_tag),
)
.order_by(Document.title.asc().nullslast(), Document.id.asc())
)
rows = list(res)
idx = next((i for i, r in enumerate(rows) if r.id == doc_id), -1)
prev_n = NeighborItem(id=rows[idx - 1].id, title=rows[idx - 1].title) if idx > 0 else None
next_n = NeighborItem(id=rows[idx + 1].id, title=rows[idx + 1].title) if 0 <= idx < len(rows) - 1 else None
return LibraryNeighborsResponse(prev=prev_n, next=next_n, path=path)
@router.get("/{doc_id}/file")
async def get_document_file(
doc_id: int,
session: Annotated[AsyncSession, Depends(get_session)],
token: str | None = Query(None, description="Bearer token (iframe용)"),
download: bool = Query(False, description="true면 attachment (브라우저 다운로드)"),
range_header: str | None = Header(None, alias="Range"),
user: User | None = Depends(lambda: None),
):
"""문서 원본 파일 서빙 (Bearer 헤더 또는 ?token= 쿼리 파라미터)"""
from core.auth import decode_token
# 쿼리 파라미터 토큰 검증
if token:
payload = decode_token(token)
if not payload or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
else:
# 일반 Bearer 헤더 인증 시도
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await get_live_document(session, doc_id)
# note(메모)는 물리 파일이 없음
if not doc.file_path:
raise HTTPException(status_code=404, detail="파일이 없는 문서입니다 (메모)")
# D-2: 물리 경로 해석을 storage 백엔드로 단일화. local=FileResponse(Range 자동) /
# 원격=ABC.stream(range). /file URL·바디 shape 불변(non-breaking). 현재 활성 백엔드는
# LocalBackend only 라 동작 변경 0.
backend = get_storage_backend()
# 미디어 타입 매핑
# HTML5 <audio>/<video> 직접 재생을 위해 audio/video mime 포함. Starlette
# FileResponse 가 Range 헤더 자동 처리 → 영상 스트리밍 OK (§3).
media_types = {
".pdf": "application/pdf",
".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".png": "image/png", ".gif": "image/gif",
".bmp": "image/bmp", ".tiff": "image/tiff",
".svg": "image/svg+xml",
".txt": "text/plain", ".md": "text/plain",
".html": "text/html", ".csv": "text/csv",
".json": "application/json", ".xml": "application/xml",
# 오디오
".mp3": "audio/mpeg", ".m4a": "audio/mp4",
".opus": "audio/ogg", ".ogg": "audio/ogg",
".wav": "audio/wav", ".flac": "audio/flac",
# 비디오 — direct play 호환 (§3 최소판)
".mp4": "video/mp4", ".webm": "video/webm",
}
suffix = Path(doc.file_path).suffix.lower()
media_type = media_types.get(suffix, "application/octet-stream")
# Content-Disposition: download=true면 attachment (한글 filename* 호환)
if download:
raw_title = doc.title or f"document-{doc_id}"
ascii_fallback = raw_title.encode("ascii", "replace").decode()
utf8_encoded = quote(f"{raw_title}{suffix}")
disposition = f'attachment; filename="{ascii_fallback}{suffix}"; filename*=UTF-8\'\'{utf8_encoded}'
else:
disposition = "inline"
# 로컬 백엔드: 기존과 동일하게 FileResponse (Range 자동 처리).
if backend.is_local:
local = backend.local_path(doc.file_path)
if local is None or not Path(local).exists():
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
return FileResponse(
path=str(local),
media_type=media_type,
headers={"Content-Disposition": disposition},
)
# 원격 백엔드: D-1 ABC 의 Range pass-through. 미프로비전 백엔드는 stat() 가
# StorageNotConfigured → 503 (silent fallback 금지). 현재 LocalBackend only 라 미도달.
try:
st = await backend.stat(doc.file_path)
except StorageNotConfigured as exc:
raise HTTPException(status_code=503, detail=str(exc))
if not st.exists:
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
start, end = _parse_byte_range(range_header, st.size)
headers = {"Content-Disposition": disposition, "Accept-Ranges": "bytes"}
if start is None:
headers["Content-Length"] = str(st.size)
status_code = 200
else:
headers["Content-Range"] = f"bytes {start}-{end}/{st.size}"
headers["Content-Length"] = str(end - start + 1)
status_code = 206
return StreamingResponse(
backend.stream(doc.file_path, start=start, end=end),
status_code=status_code,
media_type=media_type,
headers=headers,
)
@router.get("/{doc_id}/images/{image_key}/raw")
async def get_document_image_raw(
doc_id: int,
image_key: str,
session: Annotated[AsyncSession, Depends(get_session)],
token: str | None = Query(None, description="Bearer token (img 태그용)"),
):
"""marker 추출 이미지 raw bytes (Phase 1B.5).
md_content 안의 `![alt](docimg:img_NNN)` ref 를 frontend selector 가 이 라우트로 변환.
인증된 사용자만 응답 (단일 사용자 환경, ownership 컬럼 없음).
인증: `<img src=>` 는 Authorization header 를 못 보내므로 `?token=` 쿼리 파라미터
로 access token 을 전달 — 기존 `/{doc_id}/file?token=` 엔드포인트 (iframe 용) 와
동일 패턴.
"""
from core.auth import decode_token
if not token:
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
payload = decode_token(token)
if not payload or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
# 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단 + soft-delete 가드)
doc = await get_live_document(session, doc_id)
img = await session.scalar(
select(DocumentImage).where(
DocumentImage.document_id == doc_id,
DocumentImage.image_key == image_key,
)
)
if img is None:
raise HTTPException(status_code=404, detail="이미지를 찾을 수 없습니다")
file_path = Path(img.file_path)
if not file_path.is_file():
raise HTTPException(status_code=410, detail="파일이 사라졌습니다")
return FileResponse(
str(file_path),
media_type=img.mime_type,
headers={
# 인증 라우트라 CDN/공용 cache 금지. 단일 사용자라 private + 1h 충분.
"Cache-Control": "private, max-age=3600",
"ETag": f'"{img.content_hash}"',
},
)
@router.post("/", response_model=DocumentResponse, status_code=201)
async def upload_document(
request: Request,
file: UploadFile,
background_tasks: BackgroundTasks,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
doc_purpose: str | None = Form(None, description="business | knowledge"),
library_path: str | None = Form(None, description="자료실 경로 (자동 @library/ 태깅)"),
facet_company: str | None = Form(None),
facet_topic: str | None = Form(None),
facet_year: int | None = Form(None),
facet_doctype: str | None = Form(None),
):
"""파일 업로드 → Inbox 저장 + DB 등록 + 처리 큐 등록.
Size 한도: `settings.upload.max_bytes` (authoritative).
- Content-Length 사전 차단 (slack_ratio 여유) → 413 body_too_large
- 스트리밍 누적 검사 (Content-Length 위조 방어) → 413 body_too_large
- 0바이트 파일은 400 empty_file reject
- 쓰기 중에는 `<name>.uploading` 임시명 → 완료 후 atomic rename.
→ 프로세스 크래시 시 잔존물은 cleanup_orphan_uploads 스케줄러가 수거.
- 클라이언트 abort (`ClientDisconnect`) → 499 network_abort + 임시파일 정리
- 파일 저장 완료 후에만 DB 레코드 생성 (고아 레코드 방지)
- 에러 응답 detail 은 `{error_code, message}` 객체 — 프론트가 코드별 분기.
"""
from core.library import DEFAULT_LIBRARY_PATH, LIBRARY_PREFIX, normalize_library_path
max_bytes = settings.upload.max_bytes
slack_ratio = settings.upload.content_length_slack_ratio
chunk_size = settings.upload.stream_chunk_bytes
# ── 사전 검증 (스트리밍 IO 시작 전) ──
# Content-Length 사전 차단 (multipart 오버헤드 여유 반영)
content_length_header = request.headers.get("content-length")
if content_length_header:
try:
cl = int(content_length_header)
if cl > int(max_bytes * slack_ratio):
raise _upload_error(413, "body_too_large", "파일이 너무 큽니다")
except ValueError:
pass # 잘못된 헤더는 스트리밍 단계에서 max_bytes 로 차단
# doc_purpose 검증
if doc_purpose is not None:
doc_purpose = doc_purpose.strip().lower()
if doc_purpose == "":
doc_purpose = None
elif doc_purpose not in ("business", "knowledge"):
raise _upload_error(400, "invalid_input", "doc_purpose는 business 또는 knowledge만 가능")
# library_path 검증 + 정규화
library_tag = None
if library_path:
try:
normalized = normalize_library_path(library_path)
library_tag = f"{LIBRARY_PREFIX}{normalized}"
except ValueError as e:
raise _upload_error(400, "invalid_input", f"잘못된 자료실 경로: {e}")
# 자료실 업로드인데 경로 미지정 → 미분류 자동 태깅
if doc_purpose == "business" and not library_tag:
library_tag = f"{LIBRARY_PREFIX}{DEFAULT_LIBRARY_PATH}"
if not file.filename:
raise _upload_error(400, "invalid_input", "파일명이 필요합니다")
# 파일명 정규화 (경로 이탈 방지)
safe_name = Path(file.filename).name
if not safe_name or safe_name.startswith("."):
raise _upload_error(400, "invalid_input", "유효하지 않은 파일명")
# §3: 웹 업로드는 direct-play 불가 비디오 거부 (NAS 드롭은 file_watcher 가
# quarantine 으로 수용). UploadDropzone 이 error_code='unsupported_codec' 로
# 배너 분기.
VIDEO_QUARANTINE_EXTS = {".mov", ".mkv", ".avi"}
if Path(safe_name).suffix.lower() in VIDEO_QUARANTINE_EXTS:
raise _upload_error(
status_code=400,
error_code="unsupported_codec",
message="브라우저에서 직접 재생 불가한 포맷입니다. mp4 (H.264/AAC) 또는 webm (VP9) 으로 변환 후 다시 올리세요.",
)
# ── 대상 경로 결정 ──
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
inbox_dir.mkdir(parents=True, exist_ok=True)
target = (inbox_dir / safe_name).resolve()
# Inbox 하위 경로 검증
if not str(target).startswith(str(inbox_dir.resolve())):
raise _upload_error(400, "invalid_input", "잘못된 파일 경로")
# 중복 파일명 처리 — 최종 target 도, 임시 .uploading 파일도 모두 충돌 회피
counter = 1
stem, suffix = target.stem, target.suffix
staging = target.with_name(target.name + ".uploading")
while target.exists() or staging.exists():
target = inbox_dir.resolve() / f"{stem}_{counter}{suffix}"
staging = target.with_name(target.name + ".uploading")
counter += 1
# ── 스트리밍 저장 + 누적 size 검사 (`.uploading` 임시명) ──
written = 0
try:
with staging.open("wb") as f:
while chunk := await file.read(chunk_size):
written += len(chunk)
if written > max_bytes:
raise _upload_error(413, "body_too_large", "파일이 너무 큽니다")
f.write(chunk)
if written == 0:
raise _upload_error(400, "empty_file", "빈 파일은 업로드할 수 없습니다")
except ClientDisconnect:
staging.unlink(missing_ok=True)
logger.info("upload aborted by client: %s (written=%d)", safe_name, written)
# 499 = nginx 관용 (Client Closed Request). 응답 도달 가능성 낮지만 일관 형식 유지.
raise _upload_error(499, "network_abort", "업로드가 취소되었습니다")
except asyncio.TimeoutError:
staging.unlink(missing_ok=True)
logger.warning("upload timeout: %s (written=%d)", safe_name, written)
raise _upload_error(408, "upload_timeout", "업로드 시간 초과")
except HTTPException:
# _upload_error 가 만든 예외는 그대로 통과 + 임시 파일 정리
staging.unlink(missing_ok=True)
raise
except Exception:
staging.unlink(missing_ok=True)
logger.exception("upload internal error: %s (written=%d)", safe_name, written)
raise _upload_error(500, "internal", "업로드 처리 중 오류가 발생했습니다")
# ── 파일 저장 완료: atomic rename → 최종 경로 ──
try:
staging.replace(target)
except OSError:
staging.unlink(missing_ok=True)
logger.exception("upload rename failed: %s -> %s", staging, target)
raise _upload_error(500, "internal", "파일 저장 후 정리 중 오류가 발생했습니다")
# ── 파일 저장 완료 후에만 hash + DB 레코드 ──
rel_path = str(target.relative_to(Path(settings.nas_mount_path)))
fhash = file_hash(target)
ext = target.suffix.lstrip(".").lower() or "unknown"
try:
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format=ext,
file_size=written,
file_type="immutable",
title=target.stem,
# B-1: 업로드 원본 파일명(다운로드 라벨용). file_path 는 충돌 시 _N 리네임되므로
# 원본명을 별도 보존. safe_name = Path(file.filename).name (경로 이탈 제거된 basename).
original_filename=safe_name,
source_channel="manual",
doc_purpose=doc_purpose,
user_tags=[library_tag] if library_tag else [],
facet_company=facet_company or None,
facet_topic=facet_topic or None,
facet_year=facet_year,
facet_doctype=facet_doctype or None,
)
session.add(doc)
await session.flush()
# B-1: file_hash exact 중복 채움 (OFF-whitelist=law_monitor 제외). 거부(409) 아님 —
# 허용 + duplicate_of 링크 + canonical duplicate_count++ (법령 의도적 중복 보존 정책).
# 홈랩 저동시성이라 동시 동일-hash 업로드 TOCTOU 는 멱등/B-4 backfill 로 수습(락 불요).
canonical = await find_canonical_for_hash(session, fhash, exclude_id=doc.id)
if canonical is not None:
# 원래 canonical 이 soft-delete(deleted_at) 되어 former member 가 승격되면, 그 survivor 의
# stale duplicate_of 를 비워 'member 이자 counter' 모순을 막는다(B-4 불변식 유지). 문서는
# soft-delete only 라 FK ON DELETE SET NULL 이 발화하지 않아 잔여가 남기 때문(리뷰 발견).
# (삭제된 canonical 을 가리키는 다른 sibling 멤버의 잔여 포인터·overcount 는 야간
# dedup_reconcile 잡(B-4, 03:30 KST 멱등 절대 재계산)이 정리.)
if canonical.duplicate_of is not None:
canonical.duplicate_of = None
doc.duplicate_of = canonical.id
canonical.duplicate_count = (canonical.duplicate_count or 0) + 1
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리.
# G2: 첫 stage=presegment (extract 前 번들 PDF 분할, 후보 A 검증완료 2026-06-18).
# 非PDF/단일은 presegment 가 무변 통과 → extract. 번들 PDF 만 N 자식 분할(worker-side gating).
await enqueue_stage(session, doc.id, "presegment")
await session.commit()
except Exception:
# DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback.
# 파일시스템 자원은 DB 와 분리된 자원이므로 명시적 unlink.
target.unlink(missing_ok=True)
raise
# B-3: near_duplicate 스캔은 post-upload 비동기 — 201 응답을 막지 않는다(non-gating 기록).
background_tasks.add_task(_near_dup_scan_bg, doc.id)
return DocumentResponse.model_validate(doc)
@router.patch("/{doc_id}", response_model=DocumentResponse)
async def update_document(
doc_id: int,
body: DocumentUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""문서 메타데이터 수정 (수동 오버라이드)"""
from core.library import validate_user_tags
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
update_data = body.model_dump(exclude_unset=True)
# user_tags 검증: @library/ 경로 정규화 + 타입/중복 체크
if "user_tags" in update_data and update_data["user_tags"] is not None:
try:
update_data["user_tags"] = validate_user_tags(update_data["user_tags"])
except (TypeError, ValueError) as e:
raise HTTPException(status_code=400, detail=str(e))
# doc_purpose 검증
if "doc_purpose" in update_data:
val = update_data["doc_purpose"]
if val is not None and val not in ("business", "knowledge"):
raise HTTPException(status_code=400, detail="doc_purpose는 business 또는 knowledge만 가능")
# edit_url SSRF 가드 (2026-06-20 M1): 내부/메타데이터 주소 후속 fetch 차단 (news.py 동형 검증)
if update_data.get("edit_url"):
from core.url_validator import validate_feed_url
try:
await asyncio.to_thread(validate_feed_url, update_data["edit_url"])
except Exception as e:
raise HTTPException(status_code=400, detail=f"edit_url 검증 실패: {e}")
for field, value in update_data.items():
setattr(doc, field, value)
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
return DocumentResponse.model_validate(doc)
@router.post("/{doc_id}/accept-suggestion", response_model=DocumentResponse)
async def accept_suggestion(
doc_id: int,
body: AcceptSuggestionRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""§1 AI suggestion 승인 — category / user_tags 전이 (idempotent + stale 검사).
- 200 (no-op): ai_suggestion 이 이미 NULL — 이전 승인/반려 후 중복 호출로 간주
- 200 (applied): payload 적용 + ai_suggestion 을 NULL 로 clear
- 409 Conflict: ai_suggestion.source_updated_at != expected
→ 새 classify 결과가 payload 를 덮어쓴 race 감지 (payload 교체)
Note: 원 plan 은 documents.updated_at != expected 로 "문서 전체 수정" 도 감지하려
했으나, SQLAlchemy onupdate 가 모든 워커 commit(classify/embed/...)에서 updated_at
을 bump 하므로 이 검사는 항상 fail → accept 영구 불가. payload 교체 검사 하나만으로
core race 는 막히고, 사용자 직접 편집 감지는 별도 user_updated_at 컬럼이 들어와야
의미 있다. 일단 payload key 검사만 유지.
"""
from sqlalchemy import select as sa_select
from core.library import validate_user_tags
# FOR UPDATE 로 동시 승인 race 방지
result = await session.execute(
sa_select(Document).where(Document.id == doc_id).with_for_update()
)
doc = result.scalar_one_or_none()
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
if doc.ai_suggestion is None:
# idempotent no-op — 이미 처리됨 (2번째 POST / 반려 후 POST)
return DocumentResponse.model_validate(doc)
expected = body.expected_source_updated_at
# Stale 검사: payload 교체 감지 (새 classify 결과가 덮어쓴 경우)
raw_src = doc.ai_suggestion.get("source_updated_at")
suggestion_src = None
if isinstance(raw_src, str):
try:
suggestion_src = datetime.fromisoformat(raw_src)
except ValueError:
suggestion_src = None
if suggestion_src is None or suggestion_src != expected:
raise HTTPException(
status_code=409,
detail="제안 payload 가 교체되었습니다. 목록을 새로고침하세요.",
)
# payload 적용
proposed_category = doc.ai_suggestion.get("proposed_category")
proposed_path = doc.ai_suggestion.get("proposed_path")
# 안전 자료실 A-2 — material_type 제안 (classify 의 document_type 결정적 매핑)
proposed_material = doc.ai_suggestion.get("proposed_material_type")
if not proposed_category and not proposed_material:
raise HTTPException(
status_code=422,
detail="proposed_category/proposed_material_type 둘 다 누락된 suggestion",
)
if proposed_category:
doc.category = proposed_category
if proposed_material:
_MATERIAL_TYPES = {"law", "paper", "book", "incident", "manual", "standard", "guide"}
_JURISDICTIONS = {"KR", "US", "EU", "JP", "GB", "INT"}
if proposed_material not in _MATERIAL_TYPES:
raise HTTPException(
status_code=422, detail=f"허용 밖 material_type: {proposed_material}"
)
jur = body.jurisdiction or doc.ai_suggestion.get("proposed_jurisdiction")
if jur is not None and jur not in _JURISDICTIONS:
raise HTTPException(status_code=422, detail=f"허용 밖 jurisdiction: {jur}")
# law = 국가 필수 입력, 기본값 없음 (plan A-2 — KR 자동 부여 시 외국 법령 오염.
# DB CHECK(chk_documents_law_jurisdiction) 도 거부하지만 422 로 명시 안내).
if proposed_material == "law" and not jur:
raise HTTPException(
status_code=422,
detail="법령(law) 승인은 jurisdiction 필수 — body.jurisdiction 으로 국가를 지정하세요 (기본값 없음)",
)
doc.material_type = proposed_material
doc.jurisdiction = jur
# 미러 동기화 1문 — jurisdiction 부여/정정 시 청크 country 동반 UPDATE
# (leg 간 국가 불일치 방지, plan A-2 계약. 단일 지점 = 본 승인 경로).
if jur:
from sqlalchemy import update as sa_update
from models.chunk import DocumentChunk
await session.execute(
sa_update(DocumentChunk)
.where(DocumentChunk.doc_id == doc.id)
.values(country=jur)
)
# user_tags append (중복 방지, normalize + dedup 통과)
if proposed_path:
current_tags = list(doc.user_tags or [])
if proposed_path not in current_tags:
current_tags.append(proposed_path)
try:
doc.user_tags = validate_user_tags(current_tags)
except (TypeError, ValueError) as e:
raise HTTPException(
status_code=422, detail=f"proposed_path 태그 검증 실패: {e}"
)
doc.ai_suggestion = None
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(doc)
return DocumentResponse.model_validate(doc)
@router.delete("/{doc_id}/suggestion", status_code=204)
async def delete_suggestion(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""§1 AI suggestion 반려 — idempotent, stale 검사 없음.
철학: 승인은 보호, 반려는 단순. 사용자의 "이 제안 버려라" 최종 의사결정이므로
payload 가 바뀌어도 "버린다" 의도는 동일하게 유효.
"""
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
if doc.ai_suggestion is not None:
doc.ai_suggestion = None
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
@router.put("/{doc_id}/content")
async def save_document_content(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
body: dict = None,
):
"""Markdown 원본 파일 저장 + extracted_text 갱신"""
# soft-delete 문서엔 쓰기 차단 (R7 — 삭제 문서 resurrect / NAS 재기록 방지)
doc = await get_live_document(session, doc_id)
if doc.file_format not in ("md", "txt"):
raise HTTPException(status_code=400, detail="편집 가능한 포맷이 아닙니다 (md, txt만 가능)")
# note(메모)는 /api/memos/{id} PATCH로 수정
if not doc.file_path:
raise HTTPException(status_code=400, detail="파일이 없는 문서입니다. 메모는 /api/memos 사용")
content = body.get("content", "") if body else ""
file_path = Path(settings.nas_mount_path) / doc.file_path
file_path.write_text(content, encoding="utf-8")
# 메타 갱신
doc.file_size = len(content.encode("utf-8"))
doc.file_hash = file_hash(file_path)
doc.extracted_text = content[:15000]
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
return DocumentResponse.model_validate(doc)
@router.get("/{doc_id}/preview")
async def get_document_preview(
doc_id: int,
session: Annotated[AsyncSession, Depends(get_session)],
token: str | None = Query(None, description="Bearer token (iframe용)"),
download: bool = Query(False, description="true면 attachment (PDF 다운로드)"),
):
"""PDF 미리보기 캐시 서빙"""
from core.auth import decode_token
if token:
payload = decode_token(token)
if not payload or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
else:
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await get_live_document(session, doc_id)
preview_path = Path(settings.nas_mount_path) / "PKM" / ".preview" / f"{doc_id}.pdf"
if not preview_path.exists():
raise HTTPException(status_code=404, detail="미리보기가 아직 생성되지 않았습니다")
if download:
raw_title = doc.title or f"document-{doc_id}"
ascii_fallback = raw_title.encode("ascii", "replace").decode()
utf8_encoded = quote(f"{raw_title}.pdf")
disposition = f'attachment; filename="{ascii_fallback}.pdf"; filename*=UTF-8\'\'{utf8_encoded}'
else:
disposition = "inline"
return FileResponse(
path=str(preview_path),
media_type="application/pdf",
headers={"Content-Disposition": disposition},
)
@router.delete("/{doc_id}")
async def delete_document(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
delete_file: bool = Query(False, description="NAS 원본도 삭제 (grace 후 retention sweep 이 물리삭제)"),
):
"""문서 삭제. 기본: soft-delete(숨김, 파일 보존). delete_file=true: purge 예약 (R7)."""
doc = await get_live_document(session, doc_id)
# soft-delete(숨김). delete_file=true 면 purge_requested_at 마커를 추가로 set —
# retention sweep cron(document_purge_sweep)이 grace(30일) 경과 후 NAS 원본 물리삭제
# + audit-log. ★일반 숨김(delete_file=false)은 파일 보존 = undelete 가능. sweep 는
# deleted_at 이 아니라 purge_requested_at 기준이라 단순 숨김이 영구삭제되지 않는다.
now = datetime.now(timezone.utc)
doc.deleted_at = now
if delete_file:
doc.purge_requested_at = now
await session.commit()
if delete_file:
return {"message": f"문서 {doc_id} 삭제 — NAS 원본은 30일 후 정리 예약"}
return {"message": f"문서 {doc_id} soft-delete 완료 (파일 보존)"}
@router.get("/{doc_id}/content")
async def get_document_content(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""문서 전문 텍스트 반환 (서비스 호출용)."""
doc = await get_live_document(session, doc_id)
raw_text = doc.extracted_text or ""
content = raw_text[:15000]
truncated = len(raw_text) > 15000
logger.info("document.content read doc_id=%s user=%s", doc_id, getattr(user, "username", "unknown"))
return {
"id": doc.id,
"title": doc.title,
"domain": doc.ai_domain,
"sub_group": doc.ai_sub_group,
"document_type": doc.document_type,
"ai_summary": doc.ai_summary,
"ai_tags": doc.ai_tags,
"content": content,
"content_length": len(raw_text),
"truncated": truncated,
}
# ─── Phase D.5: 문서 분석 (/{doc_id}/analyze) ───
ANALYZE_PROMPT = (
_load_prompt("document_analyze.txt")
if (Path(__file__).parent.parent / "prompts" / "document_analyze.txt").exists()
else ""
)
ANALYZE_TEXT_LIMIT = 12000 # chars (15000 → 12000, 실측 timeout 빈발)
ANALYZE_TIMEOUT_S = settings.llm_call_timeout_s # 2026-06-20 config 단일소스 (구 60s=빠른 Gemma)
ANALYZE_CACHE_TTL_S = 1800 # 30분
ANALYZE_CACHE_MAXSIZE = 100
ANALYZE_LAYER_MIN_CHARS = 50 # 이 미만이면 억지 채움으로 보고 제거
_ANALYZE_LAYER_SKIP_MARKERS = (
"해당 없음", "정보 없음", "n/a", "na",
"없음", "없습니다", "not applicable",
)
# 인메모리 LRU (FIFO, synthesis_service 패턴 참조)
_analyze_cache: dict[str, tuple["AnalyzeResponse", float]] = {}
class AnalysisLayer(BaseModel):
layer: Literal["evidence", "explanation", "examples", "summary"]
title: str
content: str
class AnalyzeResponse(BaseModel):
id: int
title: str | None
layers: list[AnalysisLayer]
elapsed_ms: float
truncated: bool
cached: bool
def _analyze_cache_key(doc_id: int, updated_at: datetime | None, created_at: datetime) -> str:
"""캐시 키 = doc_id + updated_at (없으면 created_at)"""
ts = updated_at or created_at
return f"{doc_id}:{ts.isoformat()}"
def _analyze_cache_get(key: str) -> "AnalyzeResponse | None":
entry = _analyze_cache.get(key)
if entry is None:
return None
result, stored_at = entry
if time.time() - stored_at > ANALYZE_CACHE_TTL_S:
_analyze_cache.pop(key, None)
return None
return result
def _analyze_cache_set(key: str, result: "AnalyzeResponse") -> None:
if len(_analyze_cache) >= ANALYZE_CACHE_MAXSIZE and key not in _analyze_cache:
try:
oldest = next(iter(_analyze_cache))
_analyze_cache.pop(oldest, None)
except StopIteration:
pass
_analyze_cache[key] = (result, time.time())
def _is_skip_content(content: str) -> bool:
"""'해당 없음' 계열 문구 판정 (억지 채움 제거용)."""
stripped = content.strip().lower()
if not stripped:
return True
for marker in _ANALYZE_LAYER_SKIP_MARKERS:
if stripped == marker or stripped.startswith(marker):
return True
return False
@router.post("/{doc_id}/analyze", response_model=AnalyzeResponse)
async def analyze_document(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
x_source: Annotated[str | None, Header(alias="X-Source")] = None,
) -> AnalyzeResponse:
"""문서 전문을 Gemma 4로 구조화 분석. 층(근거/해설/사례/요약) 중 해당 없는 것은 생략.
Phase E.2: analyze_events 로깅. try/finally로 성공/에러 모두 background insert.
X-Source 헤더로 호출자 식별 (document_server / synology_chat / ui_search / ui_detail / eval).
"""
t_start = time.perf_counter()
source = sanitize_source(x_source)
# telemetry 변수 (try/finally에서 참조)
truncated_flag = False
cached_flag = False
layers_returned: list[str] = []
error_code: str | None = None
try:
# 1. 문서 조회
doc = await session.get(Document, doc_id)
if not doc:
error_code = "not_found"
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# 2. 텍스트 확보
raw_text = doc.extracted_text or ""
if not raw_text.strip():
error_code = "no_text"
raise HTTPException(status_code=404, detail="텍스트 추출 미완료")
truncated_flag = len(raw_text) > ANALYZE_TEXT_LIMIT
doc_text = raw_text[:ANALYZE_TEXT_LIMIT]
# 3. 캐시 확인 (키: doc_id + updated_at/created_at)
cache_key = _analyze_cache_key(doc_id, doc.updated_at, doc.created_at)
cached = _analyze_cache_get(cache_key)
if cached is not None:
logger.info("document.analyze cache_hit doc_id=%s user=%s", doc_id, getattr(user, "username", "?"))
cached_flag = True
layers_returned = [la.layer for la in cached.layers]
truncated_flag = cached.truncated
return AnalyzeResponse(
id=cached.id,
title=cached.title,
layers=cached.layers,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
truncated=cached.truncated,
cached=True,
)
# 4. 프롬프트 구성
if not ANALYZE_PROMPT:
error_code = "llm" # 설정 오류도 llm 범주
raise HTTPException(status_code=500, detail="분석 프롬프트 미설치")
prompt = ANALYZE_PROMPT.replace("{document_title}", doc.title or "").replace(
"{document_text}", doc_text
)
# 5. LLM 호출 (MLX gate + timeout 안쪽)
ai_client = AIClient()
raw: str | None = None
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(ANALYZE_TIMEOUT_S):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
except asyncio.TimeoutError:
logger.warning("document.analyze timeout doc_id=%s", doc_id)
error_code = "timeout"
raise HTTPException(status_code=504, detail="분석 시간이 초과되었습니다")
except Exception as exc:
logger.warning("document.analyze llm_error doc_id=%s err=%s", doc_id, type(exc).__name__)
error_code = "llm"
raise HTTPException(status_code=502, detail="AI 서버 일시 오류")
finally:
try:
await ai_client.close()
except Exception:
pass
# 6. JSON 파싱
parsed = parse_json_response(raw or "")
if not isinstance(parsed, dict):
logger.warning("document.analyze parse_failed doc_id=%s raw_preview=%s", doc_id, (raw or "")[:200])
error_code = "parse"
raise HTTPException(status_code=422, detail="분석 결과 파싱 실패")
# 7. 층 검증 + 억지 채움 제거
raw_layers = parsed.get("layers") or []
if not isinstance(raw_layers, list):
error_code = "parse"
raise HTTPException(status_code=422, detail="분석 결과 형식 오류")
layer_titles = {
"evidence": "근거",
"explanation": "해설",
"examples": "사례",
"summary": "요약",
}
valid_layers: list[AnalysisLayer] = []
seen_layers: set[str] = set()
for item in raw_layers:
if not isinstance(item, dict):
continue
layer_type = item.get("layer")
content = (item.get("content") or "").strip()
if layer_type not in layer_titles:
continue
if layer_type in seen_layers:
continue
if len(content) < ANALYZE_LAYER_MIN_CHARS:
continue
if _is_skip_content(content):
continue
valid_layers.append(
AnalysisLayer(
layer=layer_type, # type: ignore[arg-type]
title=item.get("title") or layer_titles[layer_type],
content=content,
)
)
seen_layers.add(layer_type)
if not valid_layers or "summary" not in seen_layers:
logger.warning("document.analyze missing_summary doc_id=%s layers=%s", doc_id, seen_layers)
error_code = "missing_summary"
raise HTTPException(status_code=422, detail="분석 결과에 요약이 없습니다")
# 8. 응답 + 캐시 저장
elapsed_ms = (time.perf_counter() - t_start) * 1000
result = AnalyzeResponse(
id=doc.id,
title=doc.title,
layers=valid_layers,
elapsed_ms=elapsed_ms,
truncated=truncated_flag,
cached=False,
)
_analyze_cache_set(cache_key, result)
layers_returned = [la.layer for la in valid_layers]
logger.info(
"document.analyze ok doc_id=%s user=%s layers=%d elapsed_ms=%.0f",
doc_id,
getattr(user, "username", "?"),
len(valid_layers),
elapsed_ms,
)
return result
finally:
# Phase E.2: 모든 exit (성공/cache/에러) 에서 analyze_events INSERT
latency_ms = int((time.perf_counter() - t_start) * 1000)
background_tasks.add_task(
record_analyze_event,
doc_id=doc_id,
user_id=getattr(user, "id", None),
mode="quick",
text_limit=ANALYZE_TEXT_LIMIT,
truncated=truncated_flag,
layers_returned=layers_returned,
cached=cached_flag,
latency_ms=latency_ms,
model_name=resolve_primary_model(),
prompt_version=ANALYZE_PROMPT_VERSION,
error_code=error_code,
source=source,
)