Files
hyungi daf6a0ade9 feat(documents): S1 dedup·office-md·storage scaffold (B/C/D/E)
plan ds-s1-backend-1 잔여 구현 (A·C-1 은 16b0fe1):
- B 중복검사: services/dedup.py (OFF-list law_monitor 공용) + 업로드 채움(B-1)
  + GET /documents/duplicates(B-2) + post-upload near-dup 비동기(B-3)
  + backfill_dedup.py(B-4) + 야간 dedup_reconcile 잡(03:30 KST 멱등 재계산)
- C MD-first: marker_worker office/hwp 분기 _process_office(C-2) + md_status
  상태머신 postcondition success|failed(C-5) + backfill_nonpdf_markdown.py(C-4)
  + requirements markitdown
- D 스토리지: services/storage ABC+Range 계약 / LocalBackend / NasApiBackend 503
  (D-1) + /file resolver 경유, 로컬 동작 불변(D-2)
- E 운영: pre-change pg_dump + rollback_287.sql + apply runbook(E-3) + 테스트(E-1)

비파괴 불변식 유지(기존 응답 shape 무변경, md_status success→completed read-time 매핑).
어드버서리얼 리뷰 확정 1건(soft-delete canonical 승격 시 stale duplicate_of) → B-1
승격 정규화 + 야간 재계산으로 정합.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 03:05:30 +00:00

240 lines
9.7 KiB
Python

"""중복검사(dedup) 공용 로직 — plan ds-s1-backend-1 B 그룹.
세 소비처가 공유:
- B-1 업로드 채움 (api/documents.upload_document) → find_canonical_for_hash
- B-2 GET /documents/duplicates → DEDUP_OFF_CHANNELS (그룹 SQL 은 라우터에)
- B-4 backfill (scripts/backfill_dedup.py) → DEDUP_OFF_CHANNELS / canonical = min(id)
- B-3 near_duplicate → find_near_duplicates
OFF-whitelist (DEDUP_OFF_CHANNELS):
law_monitor = 법령 개정본을 의도적으로 별 행으로 보존(개정일 추적). file_hash 가 같아도
collapse 하면 개정 이력이 사라지므로 dedup 비참여. (P0-2 실측: dup 18그룹/36행 중
law_monitor 17그룹 = 의도된 개정 보존, manual 1그룹 = 진짜 content dedup.)
file_hash 는 이미 채널별 키를 인코딩(note=본문SHA / devonagent=URL / news=article_id)하므로
채널별 키 분기는 두지 않고 단일 OFF-list 만 데이터로 둔다(P0-2 결정).
near_duplicate (B-3):
title trigram 후보 → 후보에만 doc-level embedding 코사인 rerank. 전수 28.9k 임베딩 스캔 회피.
저장된 embedding read-only(검색실험 Soft Lock: 재생성 금지). 임계·결과는 전부 non-gating 기록값
(trigram-first recall gap = 본문동일·제목상이 near-dup 은 놓침 → phase2 ivfflat 회수 대상).
영속화는 보류(on-the-fly) — S1 은 helper + 호출부 로깅까지. duplicate_of 영속화는 exact(file_hash)만.
"""
from __future__ import annotations
import logging
from sqlalchemy import bindparam, or_, select, text
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
# file_hash dedup 제외 채널 (단일 OFF-whitelist). B-1/B-2/B-4 공용.
DEDUP_OFF_CHANNELS: tuple[str, ...] = ("law_monitor",)
# near_duplicate 파라미터 — 전부 기록값·non-gating (phase2 ivfflat 가 recall gap 회수).
NEAR_DUP_TRGM_THRESHOLD = 0.30 # pg_trgm title 후보 컷 (느슨 — 후보 생성용)
NEAR_DUP_COSINE_THRESHOLD = 0.95 # 후보 embedding 코사인 near-dup 판정 컷 (≈0.95~0.97)
NEAR_DUP_MAX_CANDIDATES = 50 # trigram 후보 상한 — 전수 임베딩 스캔 회피
async def find_canonical_for_hash(
session: AsyncSession, file_hash: str, *, exclude_id: int | None = None
):
"""주어진 file_hash 의 canonical 문서(가장 오래된 = min id)를 반환. 없으면 None.
OFF-whitelist 채널(law_monitor)은 canonical 후보에서 제외 → 업로드가 법령 개정본에
링크되지 않는다. exclude_id = 방금 INSERT 한 신규 행 자신 제외(B-1).
"""
from models.document import Document # 지연 import (순환 회피)
stmt = (
select(Document)
.where(
Document.file_hash == file_hash,
Document.deleted_at.is_(None),
or_(
Document.source_channel.is_(None),
Document.source_channel.notin_(DEDUP_OFF_CHANNELS),
),
)
.order_by(Document.id.asc())
)
if exclude_id is not None:
stmt = stmt.where(Document.id != exclude_id)
return (await session.execute(stmt)).scalars().first()
# B-2 /documents/duplicates 의 file_hash 그룹 SQL. 라우터가 직접 execute (Pydantic 응답은 라우터에).
# reason='content_hash' = file_hash exact 그룹(idx_documents_hash 재사용, 신규 인덱스/테이블 불요).
# canonical_id = min(id), members = id 오름차순 배열, n = 그룹 크기.
DUPLICATE_GROUPS_SQL = text(
"""
SELECT file_hash,
min(id) AS canonical_id,
array_agg(id ORDER BY id) AS members,
count(*) AS n
FROM documents
WHERE deleted_at IS NULL
AND file_hash IS NOT NULL
AND (source_channel IS NULL OR source_channel NOT IN :off_channels)
GROUP BY file_hash
HAVING count(*) > 1
ORDER BY min(id)
"""
).bindparams(bindparam("off_channels", expanding=True))
async def reconcile_dedup(
session: AsyncSession, *, apply: bool = True, chunk_size: int = 500, sample_size: int = 40
) -> dict:
"""file_hash exact 그룹의 duplicate_of/duplicate_count 를 재계산해 정합화 (B-4 코어).
멱등 — 목표값과 다른 행만 UPDATE. 야간 잡(workers.dedup_reconcile)과 backfill 스크립트가
공유한다. 문서는 soft-delete only(FK ON DELETE SET NULL 미발화) → 비정규화 dedup 컬럼이
삭제 시 드리프트(멤버의 stale 포인터·canonical overcount)하므로 절대 재계산이 정합 보장.
반환 = {groups, docs, changes, applied, sample}. sample = 적용될/된 변경 미리보기(최대 sample_size).
canonical = 그룹 최古(min id): duplicate_of=NULL, duplicate_count=group_size-1. 멤버: duplicate_of=canonical, count=0.
"""
groups = (
await session.execute(
DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)}
)
).all()
desired: dict[int, tuple[int | None, int]] = {}
for g in groups:
members = list(g.members)
canonical = g.canonical_id
desired[canonical] = (None, len(members) - 1)
for m in members:
if m != canonical:
desired[m] = (canonical, 0)
if not desired:
return {"groups": 0, "docs": 0, "changes": 0, "applied": 0, "sample": []}
ids = list(desired.keys())
current: dict[int, tuple[int | None, int]] = {}
for i in range(0, len(ids), 1000):
batch = ids[i : i + 1000]
rows = (
await session.execute(
text(
"SELECT id, duplicate_of, duplicate_count "
"FROM documents WHERE id = ANY(:ids)"
).bindparams(ids=batch)
)
).all()
for r in rows:
current[r.id] = (r.duplicate_of, int(r.duplicate_count or 0))
changes = [
(i, dof, dcnt)
for i, (dof, dcnt) in desired.items()
if current.get(i) != (dof, dcnt)
]
sample = [
{"id": i, "duplicate_of": dof, "duplicate_count": dcnt}
for (i, dof, dcnt) in changes[:sample_size]
]
applied = 0
if apply and changes:
for i in range(0, len(changes), chunk_size):
for did, dof, dcnt in changes[i : i + chunk_size]:
await session.execute(
text(
"UPDATE documents SET duplicate_of = :dof, duplicate_count = :dcnt "
"WHERE id = :id"
).bindparams(dof=dof, dcnt=dcnt, id=did)
)
await session.commit()
applied += len(changes[i : i + chunk_size])
return {
"groups": len(groups),
"docs": len(ids),
"changes": len(changes),
"applied": applied,
"sample": sample,
}
async def find_near_duplicates(
session: AsyncSession,
doc_id: int,
*,
cosine_threshold: float = NEAR_DUP_COSINE_THRESHOLD,
trgm_threshold: float = NEAR_DUP_TRGM_THRESHOLD,
max_candidates: int = NEAR_DUP_MAX_CANDIDATES,
) -> list[dict]:
"""anchor doc 의 near-duplicate 후보를 trigram→embedding 2단계로 찾는다(read-only).
반환 = [{doc_id, title, title_sim?, cosine}] (cosine 내림차순). embedding 미생성 시
(업로드 직후 흔함) trigram 후보만 cosine=None 으로 반환(non-gating 기록). 어떤 행도
수정/삭제하지 않으며 저장된 embedding 만 읽는다(Soft Lock 준수).
"""
anchor = (
await session.execute(
text(
"SELECT id, title, (embedding IS NOT NULL) AS has_emb "
"FROM documents WHERE id = :id AND deleted_at IS NULL"
).bindparams(id=doc_id)
)
).first()
if anchor is None or not anchor.title:
return []
# (1) title trigram 후보. similarity() 컷으로 후보를 max_candidates 로 줄여 전수 임베딩
# 스캔을 회피한다. (index-accelerated `%` 연산자 경로는 후보 생성이 병목이 될 때의
# phase2 최적화 — 짧은 title 28.9k seq 평가는 비동기 post-upload 에서 충분히 저렴.)
cand_rows = (
await session.execute(
text(
"""
SELECT id, title, similarity(title, :t) AS title_sim
FROM documents
WHERE id <> :id
AND deleted_at IS NULL
AND title IS NOT NULL
AND similarity(title, :t) >= :trgm
ORDER BY similarity(title, :t) DESC
LIMIT :lim
"""
).bindparams(id=doc_id, t=anchor.title, trgm=trgm_threshold, lim=max_candidates)
)
).all()
if not cand_rows:
return []
if not anchor.has_emb:
# 임베딩 미생성 — 후보만 기록(cosine rerank 는 embed stage 완료 후). non-gating.
return [
{"doc_id": r.id, "title": r.title, "title_sim": float(r.title_sim), "cosine": None}
for r in cand_rows
]
# (2) 후보에만 doc-level embedding 코사인 rerank. 저장값 read-only.
cand_ids = [r.id for r in cand_rows]
rer = (
await session.execute(
text(
"""
SELECT c.id, c.title,
(1 - (c.embedding <=> (SELECT embedding FROM documents WHERE id = :id))) AS cosine
FROM documents c
WHERE c.id = ANY(:ids) AND c.embedding IS NOT NULL
"""
).bindparams(id=doc_id, ids=cand_ids)
)
).all()
out = [
{"doc_id": r.id, "title": r.title, "cosine": float(r.cosine)}
for r in rer
if r.cosine is not None and float(r.cosine) >= cosine_threshold
]
out.sort(key=lambda x: x["cosine"], reverse=True)
return out