"""중복검사(dedup) 공용 로직 — plan ds-s1-backend-1 B 그룹. 세 소비처가 공유: - B-1 업로드 채움 (api/documents.upload_document) → find_canonical_for_hash - B-2 GET /documents/duplicates → DEDUP_OFF_CHANNELS (그룹 SQL 은 라우터에) - B-4 backfill (scripts/backfill_dedup.py) → DEDUP_OFF_CHANNELS / canonical = min(id) - B-3 near_duplicate → find_near_duplicates OFF-whitelist (DEDUP_OFF_CHANNELS): law_monitor = 법령 개정본을 의도적으로 별 행으로 보존(개정일 추적). file_hash 가 같아도 collapse 하면 개정 이력이 사라지므로 dedup 비참여. (P0-2 실측: dup 18그룹/36행 중 law_monitor 17그룹 = 의도된 개정 보존, manual 1그룹 = 진짜 content dedup.) file_hash 는 이미 채널별 키를 인코딩(note=본문SHA / devonagent=URL / news=article_id)하므로 채널별 키 분기는 두지 않고 단일 OFF-list 만 데이터로 둔다(P0-2 결정). near_duplicate (B-3): title trigram 후보 → 후보에만 doc-level embedding 코사인 rerank. 전수 28.9k 임베딩 스캔 회피. 저장된 embedding read-only(검색실험 Soft Lock: 재생성 금지). 임계·결과는 전부 non-gating 기록값 (trigram-first recall gap = 본문동일·제목상이 near-dup 은 놓침 → phase2 ivfflat 회수 대상). 영속화는 보류(on-the-fly) — S1 은 helper + 호출부 로깅까지. duplicate_of 영속화는 exact(file_hash)만. """ from __future__ import annotations import logging from sqlalchemy import bindparam, or_, select, text from sqlalchemy.ext.asyncio import AsyncSession logger = logging.getLogger(__name__) # file_hash dedup 제외 채널 (단일 OFF-whitelist). B-1/B-2/B-4 공용. DEDUP_OFF_CHANNELS: tuple[str, ...] = ("law_monitor",) # near_duplicate 파라미터 — 전부 기록값·non-gating (phase2 ivfflat 가 recall gap 회수). NEAR_DUP_TRGM_THRESHOLD = 0.30 # pg_trgm title 후보 컷 (느슨 — 후보 생성용) NEAR_DUP_COSINE_THRESHOLD = 0.95 # 후보 embedding 코사인 near-dup 판정 컷 (≈0.95~0.97) NEAR_DUP_MAX_CANDIDATES = 50 # trigram 후보 상한 — 전수 임베딩 스캔 회피 async def find_canonical_for_hash( session: AsyncSession, file_hash: str, *, exclude_id: int | None = None ): """주어진 file_hash 의 canonical 문서(가장 오래된 = min id)를 반환. 없으면 None. OFF-whitelist 채널(law_monitor)은 canonical 후보에서 제외 → 업로드가 법령 개정본에 링크되지 않는다. exclude_id = 방금 INSERT 한 신규 행 자신 제외(B-1). """ from models.document import Document # 지연 import (순환 회피) stmt = ( select(Document) .where( Document.file_hash == file_hash, Document.deleted_at.is_(None), or_( Document.source_channel.is_(None), Document.source_channel.notin_(DEDUP_OFF_CHANNELS), ), ) .order_by(Document.id.asc()) ) if exclude_id is not None: stmt = stmt.where(Document.id != exclude_id) return (await session.execute(stmt)).scalars().first() # B-2 /documents/duplicates 의 file_hash 그룹 SQL. 라우터가 직접 execute (Pydantic 응답은 라우터에). # reason='content_hash' = file_hash exact 그룹(idx_documents_hash 재사용, 신규 인덱스/테이블 불요). # canonical_id = min(id), members = id 오름차순 배열, n = 그룹 크기. DUPLICATE_GROUPS_SQL = text( """ SELECT file_hash, min(id) AS canonical_id, array_agg(id ORDER BY id) AS members, count(*) AS n FROM documents WHERE deleted_at IS NULL AND file_hash IS NOT NULL AND (source_channel IS NULL OR source_channel NOT IN :off_channels) GROUP BY file_hash HAVING count(*) > 1 ORDER BY min(id) """ ).bindparams(bindparam("off_channels", expanding=True)) async def reconcile_dedup( session: AsyncSession, *, apply: bool = True, chunk_size: int = 500, sample_size: int = 40 ) -> dict: """file_hash exact 그룹의 duplicate_of/duplicate_count 를 재계산해 정합화 (B-4 코어). 멱등 — 목표값과 다른 행만 UPDATE. 야간 잡(workers.dedup_reconcile)과 backfill 스크립트가 공유한다. 문서는 soft-delete only(FK ON DELETE SET NULL 미발화) → 비정규화 dedup 컬럼이 삭제 시 드리프트(멤버의 stale 포인터·canonical overcount)하므로 절대 재계산이 정합 보장. 반환 = {groups, docs, changes, applied, sample}. sample = 적용될/된 변경 미리보기(최대 sample_size). canonical = 그룹 최古(min id): duplicate_of=NULL, duplicate_count=group_size-1. 멤버: duplicate_of=canonical, count=0. """ groups = ( await session.execute( DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)} ) ).all() desired: dict[int, tuple[int | None, int]] = {} for g in groups: members = list(g.members) canonical = g.canonical_id desired[canonical] = (None, len(members) - 1) for m in members: if m != canonical: desired[m] = (canonical, 0) if not desired: return {"groups": 0, "docs": 0, "changes": 0, "applied": 0, "sample": []} ids = list(desired.keys()) current: dict[int, tuple[int | None, int]] = {} for i in range(0, len(ids), 1000): batch = ids[i : i + 1000] rows = ( await session.execute( text( "SELECT id, duplicate_of, duplicate_count " "FROM documents WHERE id = ANY(:ids)" ).bindparams(ids=batch) ) ).all() for r in rows: current[r.id] = (r.duplicate_of, int(r.duplicate_count or 0)) changes = [ (i, dof, dcnt) for i, (dof, dcnt) in desired.items() if current.get(i) != (dof, dcnt) ] sample = [ {"id": i, "duplicate_of": dof, "duplicate_count": dcnt} for (i, dof, dcnt) in changes[:sample_size] ] applied = 0 if apply and changes: for i in range(0, len(changes), chunk_size): for did, dof, dcnt in changes[i : i + chunk_size]: await session.execute( text( "UPDATE documents SET duplicate_of = :dof, duplicate_count = :dcnt " "WHERE id = :id" ).bindparams(dof=dof, dcnt=dcnt, id=did) ) await session.commit() applied += len(changes[i : i + chunk_size]) return { "groups": len(groups), "docs": len(ids), "changes": len(changes), "applied": applied, "sample": sample, } async def find_near_duplicates( session: AsyncSession, doc_id: int, *, cosine_threshold: float = NEAR_DUP_COSINE_THRESHOLD, trgm_threshold: float = NEAR_DUP_TRGM_THRESHOLD, max_candidates: int = NEAR_DUP_MAX_CANDIDATES, ) -> list[dict]: """anchor doc 의 near-duplicate 후보를 trigram→embedding 2단계로 찾는다(read-only). 반환 = [{doc_id, title, title_sim?, cosine}] (cosine 내림차순). embedding 미생성 시 (업로드 직후 흔함) trigram 후보만 cosine=None 으로 반환(non-gating 기록). 어떤 행도 수정/삭제하지 않으며 저장된 embedding 만 읽는다(Soft Lock 준수). """ anchor = ( await session.execute( text( "SELECT id, title, (embedding IS NOT NULL) AS has_emb " "FROM documents WHERE id = :id AND deleted_at IS NULL" ).bindparams(id=doc_id) ) ).first() if anchor is None or not anchor.title: return [] # (1) title trigram 후보. similarity() 컷으로 후보를 max_candidates 로 줄여 전수 임베딩 # 스캔을 회피한다. (index-accelerated `%` 연산자 경로는 후보 생성이 병목이 될 때의 # phase2 최적화 — 짧은 title 28.9k seq 평가는 비동기 post-upload 에서 충분히 저렴.) cand_rows = ( await session.execute( text( """ SELECT id, title, similarity(title, :t) AS title_sim FROM documents WHERE id <> :id AND deleted_at IS NULL AND title IS NOT NULL AND similarity(title, :t) >= :trgm ORDER BY similarity(title, :t) DESC LIMIT :lim """ ).bindparams(id=doc_id, t=anchor.title, trgm=trgm_threshold, lim=max_candidates) ) ).all() if not cand_rows: return [] if not anchor.has_emb: # 임베딩 미생성 — 후보만 기록(cosine rerank 는 embed stage 완료 후). non-gating. return [ {"doc_id": r.id, "title": r.title, "title_sim": float(r.title_sim), "cosine": None} for r in cand_rows ] # (2) 후보에만 doc-level embedding 코사인 rerank. 저장값 read-only. cand_ids = [r.id for r in cand_rows] rer = ( await session.execute( text( """ SELECT c.id, c.title, (1 - (c.embedding <=> (SELECT embedding FROM documents WHERE id = :id))) AS cosine FROM documents c WHERE c.id = ANY(:ids) AND c.embedding IS NOT NULL """ ).bindparams(id=doc_id, ids=cand_ids) ) ).all() out = [ {"doc_id": r.id, "title": r.title, "cosine": float(r.cosine)} for r in rer if r.cosine is not None and float(r.cosine) >= cosine_threshold ] out.sort(key=lambda x: x["cosine"], reverse=True) return out