diff --git a/app/api/documents.py b/app/api/documents.py index 2fc295f..ab46978 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -21,7 +21,7 @@ from fastapi import ( UploadFile, status, ) -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, StreamingResponse from pydantic import BaseModel, field_validator from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -30,12 +30,19 @@ from starlette.requests import ClientDisconnect from ai.client import AIClient, _load_prompt, parse_json_response from core.auth import get_current_user from core.config import settings -from core.database import get_session +from core.database import async_session, get_session from core.utils import file_hash from models.document import Document from models.document_image import DocumentImage from models.queue import ProcessingQueue, enqueue_stage from models.user import User +from services.dedup import ( + DUPLICATE_GROUPS_SQL, + DEDUP_OFF_CHANNELS, + find_canonical_for_hash, + find_near_duplicates, +) +from services.storage import StorageNotConfigured, get_storage_backend from services.document_telemetry import record_analyze_event, sanitize_source from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model from services.search.llm_gate import Priority, acquire_mlx_gate @@ -62,6 +69,53 @@ def _upload_error(status_code: int, error_code: str, message: str) -> HTTPExcept ) +async def _near_dup_scan_bg(doc_id: int) -> None: + """B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort. + + 업로드 직후엔 doc.embedding 이 아직 없을 수 있어(embed stage 미완) trigram 후보만 + 기록되는 경우가 많다 — non-gating. 어떤 예외도 업로드 결과(201)에 영향 주지 않는다. + 영속화는 보류(on-the-fly) — 현재는 로깅까지. /duplicates 의 near-dup 노출은 phase2. + """ + try: + async with async_session() as bg_session: + findings = await find_near_duplicates(bg_session, doc_id) + if findings: + top = findings[0] + logger.info( + "[dedup] near_dup_scan doc=%s candidates=%d top=%s(cosine=%s)", + doc_id, len(findings), top["doc_id"], top.get("cosine"), + ) + except Exception: + logger.warning("[dedup] near_dup_scan failed doc=%s", doc_id, exc_info=True) + + +def _parse_byte_range(range_header: str | None, size: int) -> tuple[int | None, int | None]: + """HTTP Range 헤더(`bytes=start-end`) 파싱 → (start, end) inclusive. 없거나 무효면 (None, None). + + D-2 원격 백엔드 Range pass-through 용 (local 은 FileResponse 가 자동 처리). suffix 형식 + (`bytes=-N`) 도 지원. 다중 range 는 첫 구간만. + """ + if not range_header or not range_header.startswith("bytes=") or size <= 0: + return None, None + spec = range_header[len("bytes="):].split(",")[0].strip() + if "-" not in spec: + return None, None + lo, hi = spec.split("-", 1) + try: + if lo == "": # suffix range: 마지막 N 바이트 + n = int(hi) + if n <= 0: + return None, None + return max(0, size - n), size - 1 + start = int(lo) + end = int(hi) if hi else size - 1 + except ValueError: + return None, None + if start > end or start >= size: + return None, None + return start, min(end, size - 1) + + # ─── 스키마 ─── @@ -543,6 +597,53 @@ async def list_documents( ) +# ─── 중복검사 (dedup) — B-2 ─── +# ★ 고정 path 라우트(/duplicates)는 동적 /{doc_id} 라우트보다 *위*에 등록해야 매칭 충돌이 없다. +class DuplicateGroup(BaseModel): + canonical_id: int + members: list[int] + reason: str + detail: str | None = None + + +class DuplicatesResponse(BaseModel): + groups: list[DuplicateGroup] + total_groups: int + total_duplicate_docs: int + + +@router.get("/duplicates", response_model=DuplicatesResponse) +async def list_duplicates( + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """content_hash(= file_hash exact) 중복 그룹 목록. + + OFF-whitelist(law_monitor) 제외 + deleted 제외. idx_documents_hash 재사용(신규 인덱스/테이블 불요). + near_duplicate(유사도 기반) 그룹은 영속화 보류 → S1 은 exact 그룹만 노출(계약 shape 동일, + detail 문구만 'file_hash' 기준). 응답 shape = ds-app contract `documents_duplicates.json`. + """ + rows = ( + await session.execute(DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)}) + ).all() + + groups = [ + DuplicateGroup( + canonical_id=r.canonical_id, + members=list(r.members), + reason="content_hash", + detail="동일 file_hash (원본 바이트 SHA-256 일치)", + ) + for r in rows + ] + return DuplicatesResponse( + groups=groups, + total_groups=len(groups), + # 사본 수 = 그룹별 (멤버수-1) 합 (canonical 제외) — fixture total_duplicate_docs 정의와 동일. + total_duplicate_docs=sum(len(g.members) - 1 for g in groups), + ) + + @router.get("/{doc_id}", response_model=DocumentDetailResponse) async def get_document( doc_id: int, @@ -701,6 +802,7 @@ async def get_document_file( session: Annotated[AsyncSession, Depends(get_session)], token: str | None = Query(None, description="Bearer token (iframe용)"), download: bool = Query(False, description="true면 attachment (브라우저 다운로드)"), + range_header: str | None = Header(None, alias="Range"), user: User | None = Depends(lambda: None), ): """문서 원본 파일 서빙 (Bearer 헤더 또는 ?token= 쿼리 파라미터)""" @@ -723,9 +825,10 @@ async def get_document_file( if not doc.file_path: raise HTTPException(status_code=404, detail="파일이 없는 문서입니다 (메모)") - file_path = Path(settings.nas_mount_path) / doc.file_path - if not file_path.exists(): - raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다") + # D-2: 물리 경로 해석을 storage 백엔드로 단일화. local=FileResponse(Range 자동) / + # 원격=ABC.stream(range). /file URL·바디 shape 불변(non-breaking). 현재 활성 백엔드는 + # LocalBackend only 라 동작 변경 0. + backend = get_storage_backend() # 미디어 타입 매핑 # HTML5