daf6a0ade9
plan ds-s1-backend-1 잔여 구현 (A·C-1 은 16b0fe1):
- B 중복검사: services/dedup.py (OFF-list law_monitor 공용) + 업로드 채움(B-1)
+ GET /documents/duplicates(B-2) + post-upload near-dup 비동기(B-3)
+ backfill_dedup.py(B-4) + 야간 dedup_reconcile 잡(03:30 KST 멱등 재계산)
- C MD-first: marker_worker office/hwp 분기 _process_office(C-2) + md_status
상태머신 postcondition success|failed(C-5) + backfill_nonpdf_markdown.py(C-4)
+ requirements markitdown
- D 스토리지: services/storage ABC+Range 계약 / LocalBackend / NasApiBackend 503
(D-1) + /file resolver 경유, 로컬 동작 불변(D-2)
- E 운영: pre-change pg_dump + rollback_287.sql + apply runbook(E-3) + 테스트(E-1)
비파괴 불변식 유지(기존 응답 shape 무변경, md_status success→completed read-time 매핑).
어드버서리얼 리뷰 확정 1건(soft-delete canonical 승격 시 stale duplicate_of) → B-1
승격 정규화 + 야간 재계산으로 정합.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
91 lines
4.0 KiB
Python
91 lines
4.0 KiB
Python
"""기존 file_hash 중복 그룹 backfill — plan ds-s1-backend-1 B-4.
|
|
|
|
목적:
|
|
A-1 migration 287 로 추가된 duplicate_of / duplicate_count 를 *기존* 중복 그룹에 채운다.
|
|
migration(단일 트랜잭션)과 분리한 별 배치(database.py:29-30 정책 — 대량 UPDATE 를
|
|
startup migration 에 넣지 않는다). 업로드 시점 채움(B-1)은 신규 행만 다루므로 과거는 이 스크립트.
|
|
|
|
판정:
|
|
- file_hash exact 그룹(OFF-whitelist=law_monitor 제외, deleted 제외, count>1).
|
|
near_duplicate 는 영속화 보류(on-the-fly) — 여기서 다루지 않는다.
|
|
- canonical = 그룹 최古(min id). canonical.duplicate_of=NULL, duplicate_count=group_size-1.
|
|
- 비-canonical 멤버 = duplicate_of=canonical, duplicate_count=0.
|
|
|
|
안전:
|
|
- 멱등 — 이미 목표값인 행은 UPDATE 안 함(재실행 안전). --dry-run 이 적용될 정확한 set 미리보기.
|
|
- --chunk(기본 500)행/txn 청크 커밋 — 28,941행 단일 트랜잭션 lock 회피.
|
|
|
|
실행:
|
|
docker compose exec fastapi python /app/scripts/backfill_dedup.py --dry-run
|
|
docker compose exec fastapi python /app/scripts/backfill_dedup.py --apply
|
|
# 변경 전 안전망은 E-3 pre-B-4 pg_dump (별 단계).
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
|
|
from services.dedup import reconcile_dedup # 코어 재계산 (야간 잡과 공유)
|
|
|
|
|
|
async def run(*, apply: bool, chunk_size: int) -> int:
|
|
database_url = os.getenv(
|
|
"DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
|
|
)
|
|
engine = create_async_engine(database_url)
|
|
session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
|
|
|
try:
|
|
async with session_factory() as session:
|
|
result = await reconcile_dedup(session, apply=apply, chunk_size=chunk_size)
|
|
|
|
print(f"=== dedup 그룹 {result['groups']}개 · 관련 문서 {result['docs']}건 ===")
|
|
if result["groups"] == 0:
|
|
print("dedup 그룹 없음(OFF-whitelist 제외 후 count>1 없음) — 종료.")
|
|
return 0
|
|
|
|
already = result["docs"] - result["changes"]
|
|
print(f"변경 필요 {result['changes']}건 / 이미 목표값 {already}건 (멱등)")
|
|
if result["changes"] == 0:
|
|
print("모두 목표값 — 적용할 변경 없음.")
|
|
return 0
|
|
|
|
# 적용될/된 정확한 UPDATE set 미리보기 (상위 40건)
|
|
print("\n=== UPDATE set (id → duplicate_of / duplicate_count) ===")
|
|
for s in result["sample"]:
|
|
role = "canonical" if s["duplicate_of"] is None else f"dup→{s['duplicate_of']}"
|
|
print(
|
|
f" id={s['id']:>7} duplicate_of={s['duplicate_of']} "
|
|
f"duplicate_count={s['duplicate_count']} [{role}]"
|
|
)
|
|
if result["changes"] > len(result["sample"]):
|
|
print(f" ... 외 {result['changes'] - len(result['sample'])}건")
|
|
|
|
if not apply:
|
|
print(f"\n[dry-run] {result['changes']}건 변경 예정. --apply 로 실제 적용.")
|
|
else:
|
|
print(f"\n[apply] 완료 — {result['applied']}건 갱신.")
|
|
return 0
|
|
finally:
|
|
await engine.dispose()
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--apply", action="store_true", help="실제 적용 (기본 dry-run)")
|
|
parser.add_argument("--dry-run", action="store_true", help="명시적 dry-run (default 동등)")
|
|
parser.add_argument("--chunk", type=int, default=500, help="txn 당 UPDATE 행 수 (기본 500)")
|
|
args = parser.parse_args()
|
|
if args.apply and args.dry_run:
|
|
parser.error("--apply 와 --dry-run 동시 지정 불가")
|
|
return asyncio.run(run(apply=args.apply, chunk_size=args.chunk))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|