From daf6a0ade9de7babee6b6003bf3e567401a72a6d Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 5 Jun 2026 07:13:21 +0900 Subject: [PATCH] =?UTF-8?q?feat(documents):=20S1=20dedup=C2=B7office-md?= =?UTF-8?q?=C2=B7storage=20scaffold=20(B/C/D/E)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit plan ds-s1-backend-1 잔여 구현 (A·C-1 은 16b0fe1): - B 중복검사: services/dedup.py (OFF-list law_monitor 공용) + 업로드 채움(B-1) + GET /documents/duplicates(B-2) + post-upload near-dup 비동기(B-3) + backfill_dedup.py(B-4) + 야간 dedup_reconcile 잡(03:30 KST 멱등 재계산) - C MD-first: marker_worker office/hwp 분기 _process_office(C-2) + md_status 상태머신 postcondition success|failed(C-5) + backfill_nonpdf_markdown.py(C-4) + requirements markitdown - D 스토리지: services/storage ABC+Range 계약 / LocalBackend / NasApiBackend 503 (D-1) + /file resolver 경유, 로컬 동작 불변(D-2) - E 운영: pre-change pg_dump + rollback_287.sql + apply runbook(E-3) + 테스트(E-1) 비파괴 불변식 유지(기존 응답 shape 무변경, md_status success→completed read-time 매핑). 어드버서리얼 리뷰 확정 1건(soft-delete canonical 승격 시 stale duplicate_of) → B-1 승격 정규화 + 야간 재계산으로 정합. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/api/documents.py | 174 ++++++++++++++++++-- app/main.py | 4 + app/requirements.txt | 3 + app/services/dedup.py | 239 ++++++++++++++++++++++++++++ app/services/storage/__init__.py | 39 +++++ app/services/storage/base.py | 50 ++++++ app/services/storage/local.py | 50 ++++++ app/services/storage/nas_api.py | 33 ++++ app/workers/dedup_reconcile.py | 32 ++++ app/workers/marker_worker.py | 78 ++++++++- app/workers/office_md.py | 4 +- docs/ops/s1-apply-runbook.md | 87 ++++++++++ scripts/backfill_dedup.py | 90 +++++++++++ scripts/backfill_nonpdf_markdown.py | 146 +++++++++++++++++ scripts/rollback_287.sql | 18 +++ scripts/s1_pre_change_backup.sh | 30 ++++ tests/test_s1_dedup_b2.py | 96 +++++++++++ 17 files changed, 1157 insertions(+), 16 deletions(-) create mode 100644 app/services/dedup.py create mode 100644 app/services/storage/__init__.py create mode 100644 app/services/storage/base.py create mode 100644 app/services/storage/local.py create mode 100644 app/services/storage/nas_api.py create mode 100644 app/workers/dedup_reconcile.py create mode 100644 docs/ops/s1-apply-runbook.md create mode 100644 scripts/backfill_dedup.py create mode 100644 scripts/backfill_nonpdf_markdown.py create mode 100644 scripts/rollback_287.sql create mode 100644 scripts/s1_pre_change_backup.sh create mode 100644 tests/test_s1_dedup_b2.py diff --git a/app/api/documents.py b/app/api/documents.py index 2fc295f..ab46978 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -21,7 +21,7 @@ from fastapi import ( UploadFile, status, ) -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, StreamingResponse from pydantic import BaseModel, field_validator from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -30,12 +30,19 @@ from starlette.requests import ClientDisconnect from ai.client import AIClient, _load_prompt, parse_json_response from core.auth import get_current_user from core.config import settings -from core.database import get_session +from core.database import async_session, get_session from core.utils import file_hash from models.document import Document from models.document_image import DocumentImage from models.queue import ProcessingQueue, enqueue_stage from models.user import User +from services.dedup import ( + DUPLICATE_GROUPS_SQL, + DEDUP_OFF_CHANNELS, + find_canonical_for_hash, + find_near_duplicates, +) +from services.storage import StorageNotConfigured, get_storage_backend from services.document_telemetry import record_analyze_event, sanitize_source from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model from services.search.llm_gate import Priority, acquire_mlx_gate @@ -62,6 +69,53 @@ def _upload_error(status_code: int, error_code: str, message: str) -> HTTPExcept ) +async def _near_dup_scan_bg(doc_id: int) -> None: + """B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort. + + 업로드 직후엔 doc.embedding 이 아직 없을 수 있어(embed stage 미완) trigram 후보만 + 기록되는 경우가 많다 — non-gating. 어떤 예외도 업로드 결과(201)에 영향 주지 않는다. + 영속화는 보류(on-the-fly) — 현재는 로깅까지. /duplicates 의 near-dup 노출은 phase2. + """ + try: + async with async_session() as bg_session: + findings = await find_near_duplicates(bg_session, doc_id) + if findings: + top = findings[0] + logger.info( + "[dedup] near_dup_scan doc=%s candidates=%d top=%s(cosine=%s)", + doc_id, len(findings), top["doc_id"], top.get("cosine"), + ) + except Exception: + logger.warning("[dedup] near_dup_scan failed doc=%s", doc_id, exc_info=True) + + +def _parse_byte_range(range_header: str | None, size: int) -> tuple[int | None, int | None]: + """HTTP Range 헤더(`bytes=start-end`) 파싱 → (start, end) inclusive. 없거나 무효면 (None, None). + + D-2 원격 백엔드 Range pass-through 용 (local 은 FileResponse 가 자동 처리). suffix 형식 + (`bytes=-N`) 도 지원. 다중 range 는 첫 구간만. + """ + if not range_header or not range_header.startswith("bytes=") or size <= 0: + return None, None + spec = range_header[len("bytes="):].split(",")[0].strip() + if "-" not in spec: + return None, None + lo, hi = spec.split("-", 1) + try: + if lo == "": # suffix range: 마지막 N 바이트 + n = int(hi) + if n <= 0: + return None, None + return max(0, size - n), size - 1 + start = int(lo) + end = int(hi) if hi else size - 1 + except ValueError: + return None, None + if start > end or start >= size: + return None, None + return start, min(end, size - 1) + + # ─── 스키마 ─── @@ -543,6 +597,53 @@ async def list_documents( ) +# ─── 중복검사 (dedup) — B-2 ─── +# ★ 고정 path 라우트(/duplicates)는 동적 /{doc_id} 라우트보다 *위*에 등록해야 매칭 충돌이 없다. +class DuplicateGroup(BaseModel): + canonical_id: int + members: list[int] + reason: str + detail: str | None = None + + +class DuplicatesResponse(BaseModel): + groups: list[DuplicateGroup] + total_groups: int + total_duplicate_docs: int + + +@router.get("/duplicates", response_model=DuplicatesResponse) +async def list_duplicates( + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """content_hash(= file_hash exact) 중복 그룹 목록. + + OFF-whitelist(law_monitor) 제외 + deleted 제외. idx_documents_hash 재사용(신규 인덱스/테이블 불요). + near_duplicate(유사도 기반) 그룹은 영속화 보류 → S1 은 exact 그룹만 노출(계약 shape 동일, + detail 문구만 'file_hash' 기준). 응답 shape = ds-app contract `documents_duplicates.json`. + """ + rows = ( + await session.execute(DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)}) + ).all() + + groups = [ + DuplicateGroup( + canonical_id=r.canonical_id, + members=list(r.members), + reason="content_hash", + detail="동일 file_hash (원본 바이트 SHA-256 일치)", + ) + for r in rows + ] + return DuplicatesResponse( + groups=groups, + total_groups=len(groups), + # 사본 수 = 그룹별 (멤버수-1) 합 (canonical 제외) — fixture total_duplicate_docs 정의와 동일. + total_duplicate_docs=sum(len(g.members) - 1 for g in groups), + ) + + @router.get("/{doc_id}", response_model=DocumentDetailResponse) async def get_document( doc_id: int, @@ -701,6 +802,7 @@ async def get_document_file( session: Annotated[AsyncSession, Depends(get_session)], token: str | None = Query(None, description="Bearer token (iframe용)"), download: bool = Query(False, description="true면 attachment (브라우저 다운로드)"), + range_header: str | None = Header(None, alias="Range"), user: User | None = Depends(lambda: None), ): """문서 원본 파일 서빙 (Bearer 헤더 또는 ?token= 쿼리 파라미터)""" @@ -723,9 +825,10 @@ async def get_document_file( if not doc.file_path: raise HTTPException(status_code=404, detail="파일이 없는 문서입니다 (메모)") - file_path = Path(settings.nas_mount_path) / doc.file_path - if not file_path.exists(): - raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다") + # D-2: 물리 경로 해석을 storage 백엔드로 단일화. local=FileResponse(Range 자동) / + # 원격=ABC.stream(range). /file URL·바디 shape 불변(non-breaking). 현재 활성 백엔드는 + # LocalBackend only 라 동작 변경 0. + backend = get_storage_backend() # 미디어 타입 매핑 # HTML5