"""문서 CRUD API""" import asyncio import logging import shutil import time from datetime import datetime, timezone from pathlib import Path from typing import Annotated, Literal from urllib.parse import quote from fastapi import ( APIRouter, BackgroundTasks, Depends, Form, Header, HTTPException, Query, Request, UploadFile, status, ) from fastapi.responses import FileResponse, StreamingResponse from pydantic import BaseModel, field_validator from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from starlette.requests import ClientDisconnect from ai.client import AIClient, _load_prompt, parse_json_response from core.auth import get_current_user from core.config import settings from core.database import async_session, get_session from core.utils import file_hash from models.document import Document from models.document_image import DocumentImage from models.queue import ProcessingQueue, enqueue_stage from models.user import User from services.dedup import ( DUPLICATE_GROUPS_SQL, DEDUP_OFF_CHANNELS, find_canonical_for_hash, find_near_duplicates, ) from services.storage import StorageNotConfigured, get_storage_backend from services.document_telemetry import record_analyze_event, sanitize_source from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model from services.search.llm_gate import Priority, acquire_mlx_gate router = APIRouter() logger = logging.getLogger(__name__) def _upload_error(status_code: int, error_code: str, message: str) -> HTTPException: """업로드 실패 응답. detail 은 객체 — 프론트가 error_code 로 분기. error_code 종류: body_too_large — Content-Length 또는 스트리밍 누적이 max_bytes 초과 (413) upload_timeout — 서버 read timeout (408) network_abort — 클라이언트 abort / 연결 끊김 (499) empty_file — 0바이트 (400) invalid_input — 파일명/경로/필드 검증 실패 (400) unsupported_codec — 웹 업로드에서 direct-play 불가 비디오 (400, §3 video) internal — 그 외 알 수 없는 에러 (500) """ return HTTPException( status_code=status_code, detail={"error_code": error_code, "message": message}, ) async def _near_dup_scan_bg(doc_id: int) -> None: """B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort. 업로드 직후엔 doc.embedding 이 아직 없을 수 있어(embed stage 미완) trigram 후보만 기록되는 경우가 많다 — non-gating. 어떤 예외도 업로드 결과(201)에 영향 주지 않는다. 영속화는 보류(on-the-fly) — 현재는 로깅까지. /duplicates 의 near-dup 노출은 phase2. """ try: async with async_session() as bg_session: findings = await find_near_duplicates(bg_session, doc_id) if findings: top = findings[0] logger.info( "[dedup] near_dup_scan doc=%s candidates=%d top=%s(cosine=%s)", doc_id, len(findings), top["doc_id"], top.get("cosine"), ) except Exception: logger.warning("[dedup] near_dup_scan failed doc=%s", doc_id, exc_info=True) def _parse_byte_range(range_header: str | None, size: int) -> tuple[int | None, int | None]: """HTTP Range 헤더(`bytes=start-end`) 파싱 → (start, end) inclusive. 없거나 무효면 (None, None). D-2 원격 백엔드 Range pass-through 용 (local 은 FileResponse 가 자동 처리). suffix 형식 (`bytes=-N`) 도 지원. 다중 range 는 첫 구간만. """ if not range_header or not range_header.startswith("bytes=") or size <= 0: return None, None spec = range_header[len("bytes="):].split(",")[0].strip() if "-" not in spec: return None, None lo, hi = spec.split("-", 1) try: if lo == "": # suffix range: 마지막 N 바이트 n = int(hi) if n <= 0: return None, None return max(0, size - n), size - 1 start = int(lo) end = int(hi) if hi else size - 1 except ValueError: return None, None if start > end or start >= size: return None, None return start, min(end, size - 1) # ─── 스키마 ─── class DocumentResponse(BaseModel): id: int file_path: str | None file_format: str file_size: int | None file_type: str title: str | None ai_domain: str | None ai_sub_group: str | None ai_tags: list | None ai_summary: str | None document_type: str | None importance: str | None ai_confidence: float | None user_note: str | None user_tags: list | None pinned: bool | None ask_includable: bool | None derived_path: str | None original_format: str | None conversion_status: str | None is_read: bool | None review_status: str | None edit_url: str | None preview_status: str | None source_channel: str | None data_origin: str | None doc_purpose: str | None facet_company: str | None = None facet_topic: str | None = None facet_year: int | None = None facet_doctype: str | None = None category: str | None = None ai_suggestion: dict | None = None # PR-B B-1: summary_triage (4B) / summary_deep (26B) 분할 산출 ai_tldr: str | None = None ai_bullets: list | None = None ai_detail_summary: str | None = None ai_inconsistencies: list | None = None ai_analysis_tier: str | None = None # 'triage' | 'deep' | null extracted_at: datetime | None ai_processed_at: datetime | None embedded_at: datetime | None created_at: datetime updated_at: datetime # 회독 추적 (자료실 등) — 현재 사용자 기준. 다른 endpoint 응답에선 0/None. read_count: int = 0 last_read_at: datetime | None = None # S1-ADD (migration 287): 원본 파일명 + 중복검사. 앱은 옵셔널 디코딩, 없으면 폴백. original_filename: str | None = None # 다운로드 라벨용. 없으면 file_path basename 폴백(앱 측). duplicate_of: int | None = None # canonical doc id (자기 자신이 canonical 이면 None). duplicate_count: int = 0 # 본인 제외 동일 판정 사본 수 (canonical 행 기준). class Config: from_attributes = True class DocumentListResponse(BaseModel): items: list[DocumentResponse] total: int page: int page_size: int class DocumentDetailResponse(DocumentResponse): """단건 조회 전용 — 본문(extracted_text)·canonical markdown 동봉. 리스트 응답은 페이로드 비대화 회피로 DocumentResponse 만 사용. """ extracted_text: str | None = None md_content: str | None = None md_frontmatter: dict | None = None md_status: str | None = None md_extraction_quality: dict | None = None md_extraction_error: str | None = None md_extraction_engine: str | None = None md_extraction_engine_version: str | None = None md_generated_at: datetime | None = None @field_validator("md_status", mode="before") @classmethod def _db_success_to_completed(cls, v: str | None) -> str | None: """DB CHECK enum 은 'success'; 계약/fixture·앱 MD-first 렌더 트리거는 'completed'. read-time(DB→API) 단방향 매핑만 — write 경로(ORM)는 이 모델을 거치지 않아 미적용. pending/processing/partial/failed/skipped 는 양쪽 동일하므로 'success' 만 매핑한다. (불변식: md_status ∈ {success,partial} ⟹ md_content 非공백 = 워커 postcondition, C-5.) """ return "completed" if v == "success" else v class AcceptSuggestionRequest(BaseModel): """§1 accept-suggestion 요청 body — stale payload / doc 수정 검출. jurisdiction: 안전 자료실 A-2 — material_type 제안 승인 시 사용자가 지정하는 관할. law 승인은 필수 (기본값 없음 — KR 자동 부여 시 외국 자료가 KR 법령으로 오염되는 경로를 차단, plan A-2 계약). """ expected_source_updated_at: datetime jurisdiction: str | None = None class DocumentUpdate(BaseModel): title: str | None = None ai_domain: str | None = None ai_sub_group: str | None = None ai_tags: list | None = None user_tags: list | None = None user_note: str | None = None is_read: bool | None = None edit_url: str | None = None source_channel: str | None = None data_origin: str | None = None doc_purpose: str | None = None pinned: bool | None = None facet_company: str | None = None facet_topic: str | None = None facet_year: int | None = None facet_doctype: str | None = None # ─── 스키마 (트리) ─── class TreeNode(BaseModel): name: str path: str count: int children: list["TreeNode"] # ─── 엔드포인트 ─── @router.get("/tree") async def get_document_tree( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """도메인 트리 (3단계 경로 파싱, 사이드바용)""" from sqlalchemy import text as sql_text result = await session.execute( sql_text(""" SELECT ai_domain, COUNT(*) FROM documents WHERE ai_domain IS NOT NULL AND ai_domain != '' AND ai_domain != 'News' AND deleted_at IS NULL -- 문서함(list) 기본 제외와 동일하게 맞춤: 뉴스/법령 채널·메모는 문서함에 안 뜨므로 -- 트리 카운트도 제외해야 "트리 N건인데 클릭하면 0건" 불일치가 안 생긴다. AND source_channel != 'news' AND source_channel != 'law_monitor' AND file_type != 'note' GROUP BY ai_domain ORDER BY ai_domain """) ) # 경로를 트리로 파싱 root: dict = {} for domain_path, count in result: parts = domain_path.split("/") node = root for part in parts: if part not in node: node[part] = {"_count": 0, "_children": {}} node[part]["_count"] += count node = node[part]["_children"] def build_tree(d: dict, prefix: str = "") -> list[dict]: nodes = [] for name, data in sorted(d.items()): path = f"{prefix}/{name}" if prefix else name children = build_tree(data["_children"], path) nodes.append({ "name": name, "path": path, "count": data["_count"], "children": children, }) return nodes return build_tree(root) @router.get("/library-tree") async def get_library_tree( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """자료실 트리 (user_tags @library/ 경로 기반, unique doc count)""" from core.library import LIBRARY_PREFIX result = await session.execute( select(Document.id, Document.user_tags).where( Document.deleted_at == None, # noqa: E711 Document.user_tags != None, # noqa: E711 ) ) root: dict = {} for doc_id, tags in result: if not tags: continue seen_ancestors: set[str] = set() for tag in tags: if not isinstance(tag, str) or not tag.startswith(LIBRARY_PREFIX): continue path = tag[len(LIBRARY_PREFIX):] parts = path.split("/") node = root for i, part in enumerate(parts): if part not in node: node[part] = {"_docs": set(), "_children": {}} ancestor_key = "/".join(parts[: i + 1]) if ancestor_key not in seen_ancestors: node[part]["_docs"].add(doc_id) seen_ancestors.add(ancestor_key) node = node[part]["_children"] def build_library_tree(d: dict, prefix: str = "") -> list[dict]: nodes = [] for name, data in sorted(d.items()): if name.startswith("_"): continue path = f"{prefix}/{name}" if prefix else name children = build_library_tree(data["_children"], path) nodes.append({ "name": name, "path": path, "count": len(data["_docs"]), "children": children, }) return nodes return build_library_tree(root) @router.get("/library", response_model=DocumentListResponse) async def list_library_documents( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], path: str | None = None, q: str | None = None, sort: str = Query("updated_desc"), page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), facet_company: str | None = None, facet_topic: str | None = None, facet_year: int | None = None, facet_doctype: str | None = None, unread: bool = Query(False, description="true: 현재 사용자 회독 0건만"), ): """자료실 문서 목록 (category='library' 기반, prefix match, facet 필터, 정렬). `unread=true` 시 현재 사용자가 1번도 회독 안 한 documents 만. """ from sqlalchemy import text as sql_text from core.library import LIBRARY_PREFIX, normalize_library_path from models.document_read import DocumentRead if path: try: path = normalize_library_path(path) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) query = select(Document).where( Document.deleted_at == None, # noqa: E711 Document.category == "library", ) # 안 본 자료만 — 현재 사용자의 read 가 없는 documents if unread: read_subq = ( select(DocumentRead.document_id) .where(DocumentRead.user_id == user.id) .scalar_subquery() ) query = query.where(Document.id.notin_(read_subq)) if path: exact = f"{LIBRARY_PREFIX}{path}" prefix = f"{LIBRARY_PREFIX}{path}/%" query = query.where( sql_text(""" EXISTS ( SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t WHERE t = :exact OR t LIKE :prefix ) """).bindparams(exact=exact, prefix=prefix) ) if q: query = query.where(Document.title.ilike(f"%{q}%")) # facet 필터 if facet_company: query = query.where(Document.facet_company == facet_company) if facet_topic: query = query.where(Document.facet_topic == facet_topic) if facet_year: query = query.where(Document.facet_year == facet_year) if facet_doctype: query = query.where(Document.facet_doctype == facet_doctype) # 전체 건수 count_query = select(func.count()).select_from(query.subquery()) total = (await session.execute(count_query)).scalar() # 정렬 sort_map = { "updated_desc": Document.updated_at.desc(), "title_asc": Document.title.asc(), "created_desc": Document.created_at.desc(), } query = query.order_by(sort_map.get(sort, Document.updated_at.desc())) query = query.offset((page - 1) * page_size).limit(page_size) result = await session.execute(query) items = result.scalars().all() # 회독 통계 한 번에 fetch (현재 페이지 N건 한정 — N+1 회피). # DocumentRead 는 함수 상단에서 이미 import. read_map: dict[int, tuple[int, datetime | None]] = {} if items: doc_ids = [d.id for d in items] rs = await session.execute( select( DocumentRead.document_id, func.count(DocumentRead.id), func.max(DocumentRead.read_at), ) .where( DocumentRead.user_id == user.id, DocumentRead.document_id.in_(doc_ids), ) .group_by(DocumentRead.document_id) ) for did, cnt, last in rs: read_map[did] = (int(cnt or 0), last) def _to_resp(doc): resp = DocumentResponse.model_validate(doc) cnt, last = read_map.get(doc.id, (0, None)) resp.read_count = cnt resp.last_read_at = last return resp return DocumentListResponse( items=[_to_resp(doc) for doc in items], total=total, page=page, page_size=page_size, ) # ─── Section 2: 카테고리 집계 (Sidebar / Dashboard) ─── # # documents.category (§1 에서 추가) 가 1차 진입점. 이 엔드포인트는 Sidebar 배지 및 # /dashboard 카테고리 카드 용. ai_suggestion.proposed_category='library' 인 # 승인 대기 건수는 /library 의 pending 배지로 별도 표시. @router.get("/stats/category-counts") async def get_category_counts( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """카테고리별 문서 건수 + 승인 대기 (library 제안) 건수. Response: { "counts": { "document": 640, "library": 12, "news": 311, ... }, "library_pending_suggestions": 17 } - 전제: §1 의 documents.category enum + ai_suggestion JSONB 가 이미 적용됨 - category IS NULL 인 문서는 counts 에서 제외 (§1 백필 전 드문 상태) """ from sqlalchemy import text as sql_text count_rows = await session.execute( sql_text(""" SELECT category::text AS category, COUNT(*) AS cnt FROM documents WHERE deleted_at IS NULL AND category IS NOT NULL GROUP BY category """) ) counts: dict[str, int] = {row.category: row.cnt for row in count_rows} pending_scalar = ( await session.execute( sql_text(""" SELECT COUNT(*) FROM documents WHERE deleted_at IS NULL AND ai_suggestion IS NOT NULL AND ai_suggestion->>'proposed_category' = 'library' """) ) ).scalar() return { "counts": counts, "library_pending_suggestions": int(pending_scalar or 0), } @router.get("/", response_model=DocumentListResponse) async def list_documents( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=500), domain: str | None = None, sub_group: str | None = None, source: str | None = None, format: str | None = None, review_status: str | None = Query(None, description="pending | approved | rejected"), category: str | None = Query(None, description="doc_category enum — 지정 시 기본 news/memo 제외 해제"), has_suggestion: bool | None = Query(None, description="true: ai_suggestion IS NOT NULL"), proposed_category: str | None = Query(None, description="ai_suggestion.proposed_category 필터"), material_type: str | None = Query(None, description="안전 자료실 C-1: 자료유형. 지정 시 기본 exclude 해제"), jurisdiction: str | None = Query(None, description="안전 자료실 C-1: 관할 (KR/US/...)"), ): """문서 목록 조회 (페이지네이션 + 필터). 기본은 뉴스/메모 제외. `category` 지정 시 해당 카테고리만 반환 (기본 제외 해제). §2 승인 UI 용: `has_suggestion=true&proposed_category=library` 조합. """ query = select(Document).where( Document.deleted_at == None, # noqa: E711 ) if category: # 명시적 카테고리 필터 — 기본 exclude 해제 query = query.where(Document.category == category) elif material_type: # 안전 자료실 C-1: material_type 지정 = 기본 exclude(news·law_monitor·note) 해제. # 안전 코퍼스 본체(KOSHA 사례·CSB·법령 등)가 전부 note/crawl 채널이라 exclude 면 빈 화면. query = query.where(Document.material_type == material_type) else: # 기본 목록: 뉴스/메모/법령 제외 (문서함 용도) query = query.where( Document.source_channel != "news", Document.source_channel != "law_monitor", Document.file_type != "note", ) if jurisdiction: query = query.where(Document.jurisdiction == jurisdiction) if has_suggestion is True: query = query.where(Document.ai_suggestion.isnot(None)) elif has_suggestion is False: query = query.where(Document.ai_suggestion.is_(None)) if proposed_category: # ai_suggestion JSONB 의 proposed_category 값 매칭 query = query.where( Document.ai_suggestion["proposed_category"].astext == proposed_category ) if domain: # prefix 매칭: Industrial_Safety 클릭 시 하위 전부 포함 query = query.where(Document.ai_domain.startswith(domain)) if source: query = query.where(Document.source_channel == source) if format: query = query.where(Document.file_format == format) if review_status: query = query.where(Document.review_status == review_status) # 전체 건수 count_query = select(func.count()).select_from(query.subquery()) total = (await session.execute(count_query)).scalar() # 페이지네이션 query = query.order_by(Document.created_at.desc()) query = query.offset((page - 1) * page_size).limit(page_size) result = await session.execute(query) items = result.scalars().all() return DocumentListResponse( items=[DocumentResponse.model_validate(doc) for doc in items], total=total, page=page, page_size=page_size, ) # ─── 중복검사 (dedup) — B-2 ─── # ★ 고정 path 라우트(/duplicates)는 동적 /{doc_id} 라우트보다 *위*에 등록해야 매칭 충돌이 없다. class DuplicateGroup(BaseModel): canonical_id: int members: list[int] reason: str detail: str | None = None class DuplicatesResponse(BaseModel): groups: list[DuplicateGroup] total_groups: int total_duplicate_docs: int @router.get("/duplicates", response_model=DuplicatesResponse) async def list_duplicates( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """content_hash(= file_hash exact) 중복 그룹 목록. OFF-whitelist(law_monitor) 제외 + deleted 제외. idx_documents_hash 재사용(신규 인덱스/테이블 불요). near_duplicate(유사도 기반) 그룹은 영속화 보류 → S1 은 exact 그룹만 노출(계약 shape 동일, detail 문구만 'file_hash' 기준). 응답 shape = ds-app contract `documents_duplicates.json`. """ rows = ( await session.execute(DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)}) ).all() groups = [ DuplicateGroup( canonical_id=r.canonical_id, members=list(r.members), reason="content_hash", detail="동일 file_hash (원본 바이트 SHA-256 일치)", ) for r in rows ] return DuplicatesResponse( groups=groups, total_groups=len(groups), # 사본 수 = 그룹별 (멤버수-1) 합 (canonical 제외) — fixture total_duplicate_docs 정의와 동일. total_duplicate_docs=sum(len(g.members) - 1 for g in groups), ) @router.get("/{doc_id}", response_model=DocumentDetailResponse) async def get_document( doc_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """문서 단건 조회. 본문(extracted_text)·canonical markdown 동봉.""" doc = await session.get(Document, doc_id) if not doc or doc.deleted_at is not None: raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다") return DocumentDetailResponse.model_validate(doc) # ─── 절(hier section) 목차 + 요약 (PR-DocSrv-Hier-Section-UI-1) ─── class SectionItem(BaseModel): chunk_id: int section_title: str | None = None # raw 마크다운 포함 — 정제는 프런트(headingPath.ts) heading_path: str | None = None # raw level: int | None = None node_type: str | None = None # window | chapter_split | clause_split | section_split | null is_leaf: bool char_start: int | None = None # md_content 내 heading offset(UTF-16). jump-target 만 값, 그 외 None (Path B) section_type: str | None = None summary: str | None = None # status='summarized' 인 분석행에만, 그 외 None confidence: float | None = None class DocumentSectionsResponse(BaseModel): doc_id: int sections: list[SectionItem] @router.get("/{doc_id}/sections", response_model=DocumentSectionsResponse) async def get_document_sections( doc_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """문서의 hier 절(leaf) 목차 + 절-레벨 요약(chunk_section_analysis). ⚠ 뷰 우회 — 의도적 예외 (변경 금지): retrieval 경로(retrieval_service / *_rag)는 in_corpus=false 누출 방지를 위해 반드시 corpus_chunks 뷰만 본다. 그러나 이 endpoint 는 retrieval 이 아니라 "문서 전체 leaf 목차 표시"라서 in_corpus=false(검색 비활성) 절도 보여야 하므로 document_chunks 를 직접 조회한다. corpus_chunks 로 바꾸면 비활성 절이 목차에서 사라지는 회귀가 생기니 절대 바꾸지 말 것. (Hier-Decomp 코퍼스 격리 규율의 명시적 예외) DISTINCT ON (c.id) + ORDER BY a.created_at/a.id DESC: chunk 당 최신 분석 1행만 (prompt_version 다중 시 중복 JOIN 방지). 절 없는 문서(legacy/news)는 sections=[]. """ from sqlalchemy import text as sql_text doc = await session.get(Document, doc_id) if not doc or doc.deleted_at is not None: raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다") rows = ( await session.execute( sql_text( """ SELECT chunk_id, section_title, heading_path, level, node_type, is_leaf, char_start, section_type, summary, confidence FROM ( SELECT DISTINCT ON (c.id) c.id AS chunk_id, c.chunk_index, c.section_title, c.heading_path, c.level, c.node_type, c.is_leaf, c.char_start, a.section_type, CASE WHEN a.status = 'summarized' THEN a.summary ELSE NULL END AS summary, a.confidence FROM document_chunks c LEFT JOIN chunk_section_analysis a ON a.chunk_id = c.id AND a.status = 'summarized' WHERE c.doc_id = :doc_id AND c.source_type = 'hier_section' AND (c.is_leaf = true OR c.node_type LIKE '%\\_split' ESCAPE '\\') ORDER BY c.id, a.created_at DESC, a.id DESC ) t ORDER BY t.chunk_index """ ).bindparams(doc_id=doc_id) ) ).mappings().all() return DocumentSectionsResponse( doc_id=doc_id, sections=[SectionItem(**dict(r)) for r in rows], ) # ─── 자료실 인접 자료 (이전/다음) ─── # 학습 흐름: 한 자료 다 읽으면 같은 챕터의 다음 자료로 자연스럽게 이동. # library_path (정확 일치 + 하위 prefix) 안에서 title 오름차순 기준. class NeighborItem(BaseModel): id: int title: str | None class LibraryNeighborsResponse(BaseModel): prev: NeighborItem | None next: NeighborItem | None path: str | None # 같은 path 내에서 계산된 결과 @router.get("/{doc_id}/library-neighbors", response_model=LibraryNeighborsResponse) async def get_library_neighbors( doc_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """현재 자료의 같은 library_path 안에서 이전/다음 자료. title_asc 정렬 기준. library_path 추출: user_tags 의 첫 번째 `@library/...` 태그. """ from core.library import LIBRARY_PREFIX doc = await session.get(Document, doc_id) if not doc or doc.deleted_at is not None or doc.category != "library": raise HTTPException(status_code=404, detail="자료실 자료가 아닙니다") # 첫 번째 library 태그를 path 로 path: str | None = None for t in (doc.user_tags or []): if isinstance(t, str) and t.startswith(LIBRARY_PREFIX): path = t[len(LIBRARY_PREFIX):] break if not path: return LibraryNeighborsResponse(prev=None, next=None, path=None) # 같은 path (정확히) 의 자료들 — title 오름차순. # user_tags 는 JSONB. 다른 endpoint 와 일관되게 EXISTS + jsonb_array_elements_text 사용. from sqlalchemy import text as sql_text exact_tag = f"{LIBRARY_PREFIX}{path}" res = await session.execute( select(Document.id, Document.title) .where( Document.deleted_at == None, # noqa: E711 Document.category == "library", sql_text(""" EXISTS ( SELECT 1 FROM jsonb_array_elements_text(documents.user_tags) AS t WHERE t = :exact ) """).bindparams(exact=exact_tag), ) .order_by(Document.title.asc().nullslast(), Document.id.asc()) ) rows = list(res) idx = next((i for i, r in enumerate(rows) if r.id == doc_id), -1) prev_n = NeighborItem(id=rows[idx - 1].id, title=rows[idx - 1].title) if idx > 0 else None next_n = NeighborItem(id=rows[idx + 1].id, title=rows[idx + 1].title) if 0 <= idx < len(rows) - 1 else None return LibraryNeighborsResponse(prev=prev_n, next=next_n, path=path) @router.get("/{doc_id}/file") async def get_document_file( doc_id: int, session: Annotated[AsyncSession, Depends(get_session)], token: str | None = Query(None, description="Bearer token (iframe용)"), download: bool = Query(False, description="true면 attachment (브라우저 다운로드)"), range_header: str | None = Header(None, alias="Range"), user: User | None = Depends(lambda: None), ): """문서 원본 파일 서빙 (Bearer 헤더 또는 ?token= 쿼리 파라미터)""" from core.auth import decode_token # 쿼리 파라미터 토큰 검증 if token: payload = decode_token(token) if not payload or payload.get("type") != "access": raise HTTPException(status_code=401, detail="유효하지 않은 토큰") else: # 일반 Bearer 헤더 인증 시도 raise HTTPException(status_code=401, detail="토큰이 필요합니다") doc = await session.get(Document, doc_id) if not doc: raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다") # note(메모)는 물리 파일이 없음 if not doc.file_path: raise HTTPException(status_code=404, detail="파일이 없는 문서입니다 (메모)") # D-2: 물리 경로 해석을 storage 백엔드로 단일화. local=FileResponse(Range 자동) / # 원격=ABC.stream(range). /file URL·바디 shape 불변(non-breaking). 현재 활성 백엔드는 # LocalBackend only 라 동작 변경 0. backend = get_storage_backend() # 미디어 타입 매핑 # HTML5