feat(documents): S1 dedup·office-md·storage scaffold (B/C/D/E)

plan ds-s1-backend-1 잔여 구현 (A·C-1 은 16b0fe1):
- B 중복검사: services/dedup.py (OFF-list law_monitor 공용) + 업로드 채움(B-1)
  + GET /documents/duplicates(B-2) + post-upload near-dup 비동기(B-3)
  + backfill_dedup.py(B-4) + 야간 dedup_reconcile 잡(03:30 KST 멱등 재계산)
- C MD-first: marker_worker office/hwp 분기 _process_office(C-2) + md_status
  상태머신 postcondition success|failed(C-5) + backfill_nonpdf_markdown.py(C-4)
  + requirements markitdown
- D 스토리지: services/storage ABC+Range 계약 / LocalBackend / NasApiBackend 503
  (D-1) + /file resolver 경유, 로컬 동작 불변(D-2)
- E 운영: pre-change pg_dump + rollback_287.sql + apply runbook(E-3) + 테스트(E-1)

비파괴 불변식 유지(기존 응답 shape 무변경, md_status success→completed read-time 매핑).
어드버서리얼 리뷰 확정 1건(soft-delete canonical 승격 시 stale duplicate_of) → B-1
승격 정규화 + 야간 재계산으로 정합.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-05 07:13:21 +09:00
parent 68e2d7ea04
commit daf6a0ade9
17 changed files with 1157 additions and 16 deletions
+165 -9
View File
@@ -21,7 +21,7 @@ from fastapi import (
UploadFile,
status,
)
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, field_validator
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -30,12 +30,19 @@ from starlette.requests import ClientDisconnect
from ai.client import AIClient, _load_prompt, parse_json_response
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.database import async_session, get_session
from core.utils import file_hash
from models.document import Document
from models.document_image import DocumentImage
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
from services.dedup import (
DUPLICATE_GROUPS_SQL,
DEDUP_OFF_CHANNELS,
find_canonical_for_hash,
find_near_duplicates,
)
from services.storage import StorageNotConfigured, get_storage_backend
from services.document_telemetry import record_analyze_event, sanitize_source
from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model
from services.search.llm_gate import Priority, acquire_mlx_gate
@@ -62,6 +69,53 @@ def _upload_error(status_code: int, error_code: str, message: str) -> HTTPExcept
)
async def _near_dup_scan_bg(doc_id: int) -> None:
"""B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort.
업로드 직후엔 doc.embedding 이 아직 없을 수 있어(embed stage 미완) trigram 후보만
기록되는 경우가 많다 — non-gating. 어떤 예외도 업로드 결과(201)에 영향 주지 않는다.
영속화는 보류(on-the-fly) — 현재는 로깅까지. /duplicates 의 near-dup 노출은 phase2.
"""
try:
async with async_session() as bg_session:
findings = await find_near_duplicates(bg_session, doc_id)
if findings:
top = findings[0]
logger.info(
"[dedup] near_dup_scan doc=%s candidates=%d top=%s(cosine=%s)",
doc_id, len(findings), top["doc_id"], top.get("cosine"),
)
except Exception:
logger.warning("[dedup] near_dup_scan failed doc=%s", doc_id, exc_info=True)
def _parse_byte_range(range_header: str | None, size: int) -> tuple[int | None, int | None]:
"""HTTP Range 헤더(`bytes=start-end`) 파싱 → (start, end) inclusive. 없거나 무효면 (None, None).
D-2 원격 백엔드 Range pass-through 용 (local 은 FileResponse 가 자동 처리). suffix 형식
(`bytes=-N`) 도 지원. 다중 range 는 첫 구간만.
"""
if not range_header or not range_header.startswith("bytes=") or size <= 0:
return None, None
spec = range_header[len("bytes="):].split(",")[0].strip()
if "-" not in spec:
return None, None
lo, hi = spec.split("-", 1)
try:
if lo == "": # suffix range: 마지막 N 바이트
n = int(hi)
if n <= 0:
return None, None
return max(0, size - n), size - 1
start = int(lo)
end = int(hi) if hi else size - 1
except ValueError:
return None, None
if start > end or start >= size:
return None, None
return start, min(end, size - 1)
# ─── 스키마 ───
@@ -543,6 +597,53 @@ async def list_documents(
)
# ─── 중복검사 (dedup) — B-2 ───
# ★ 고정 path 라우트(/duplicates)는 동적 /{doc_id} 라우트보다 *위*에 등록해야 매칭 충돌이 없다.
class DuplicateGroup(BaseModel):
canonical_id: int
members: list[int]
reason: str
detail: str | None = None
class DuplicatesResponse(BaseModel):
groups: list[DuplicateGroup]
total_groups: int
total_duplicate_docs: int
@router.get("/duplicates", response_model=DuplicatesResponse)
async def list_duplicates(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""content_hash(= file_hash exact) 중복 그룹 목록.
OFF-whitelist(law_monitor) 제외 + deleted 제외. idx_documents_hash 재사용(신규 인덱스/테이블 불요).
near_duplicate(유사도 기반) 그룹은 영속화 보류 → S1 은 exact 그룹만 노출(계약 shape 동일,
detail 문구만 'file_hash' 기준). 응답 shape = ds-app contract `documents_duplicates.json`.
"""
rows = (
await session.execute(DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)})
).all()
groups = [
DuplicateGroup(
canonical_id=r.canonical_id,
members=list(r.members),
reason="content_hash",
detail="동일 file_hash (원본 바이트 SHA-256 일치)",
)
for r in rows
]
return DuplicatesResponse(
groups=groups,
total_groups=len(groups),
# 사본 수 = 그룹별 (멤버수-1) 합 (canonical 제외) — fixture total_duplicate_docs 정의와 동일.
total_duplicate_docs=sum(len(g.members) - 1 for g in groups),
)
@router.get("/{doc_id}", response_model=DocumentDetailResponse)
async def get_document(
doc_id: int,
@@ -701,6 +802,7 @@ async def get_document_file(
session: Annotated[AsyncSession, Depends(get_session)],
token: str | None = Query(None, description="Bearer token (iframe용)"),
download: bool = Query(False, description="true면 attachment (브라우저 다운로드)"),
range_header: str | None = Header(None, alias="Range"),
user: User | None = Depends(lambda: None),
):
"""문서 원본 파일 서빙 (Bearer 헤더 또는 ?token= 쿼리 파라미터)"""
@@ -723,9 +825,10 @@ async def get_document_file(
if not doc.file_path:
raise HTTPException(status_code=404, detail="파일이 없는 문서입니다 (메모)")
file_path = Path(settings.nas_mount_path) / doc.file_path
if not file_path.exists():
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
# D-2: 물리 경로 해석을 storage 백엔드로 단일화. local=FileResponse(Range 자동) /
# 원격=ABC.stream(range). /file URL·바디 shape 불변(non-breaking). 현재 활성 백엔드는
# LocalBackend only 라 동작 변경 0.
backend = get_storage_backend()
# 미디어 타입 매핑
# HTML5 <audio>/<video> 직접 재생을 위해 audio/video mime 포함. Starlette
@@ -746,7 +849,7 @@ async def get_document_file(
# 비디오 — direct play 호환 (§3 최소판)
".mp4": "video/mp4", ".webm": "video/webm",
}
suffix = file_path.suffix.lower()
suffix = Path(doc.file_path).suffix.lower()
media_type = media_types.get(suffix, "application/octet-stream")
# Content-Disposition: download=true면 attachment (한글 filename* 호환)
@@ -758,10 +861,40 @@ async def get_document_file(
else:
disposition = "inline"
return FileResponse(
path=str(file_path),
# 로컬 백엔드: 기존과 동일하게 FileResponse (Range 자동 처리).
if backend.is_local:
local = backend.local_path(doc.file_path)
if local is None or not Path(local).exists():
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
return FileResponse(
path=str(local),
media_type=media_type,
headers={"Content-Disposition": disposition},
)
# 원격 백엔드: D-1 ABC 의 Range pass-through. 미프로비전 백엔드는 stat() 가
# StorageNotConfigured → 503 (silent fallback 금지). 현재 LocalBackend only 라 미도달.
try:
st = await backend.stat(doc.file_path)
except StorageNotConfigured as exc:
raise HTTPException(status_code=503, detail=str(exc))
if not st.exists:
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
start, end = _parse_byte_range(range_header, st.size)
headers = {"Content-Disposition": disposition, "Accept-Ranges": "bytes"}
if start is None:
headers["Content-Length"] = str(st.size)
status_code = 200
else:
headers["Content-Range"] = f"bytes {start}-{end}/{st.size}"
headers["Content-Length"] = str(end - start + 1)
status_code = 206
return StreamingResponse(
backend.stream(doc.file_path, start=start, end=end),
status_code=status_code,
media_type=media_type,
headers={"Content-Disposition": disposition},
headers=headers,
)
@@ -822,6 +955,7 @@ async def get_document_image_raw(
async def upload_document(
request: Request,
file: UploadFile,
background_tasks: BackgroundTasks,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
doc_purpose: str | None = Form(None, description="business | knowledge"),
@@ -973,6 +1107,9 @@ async def upload_document(
file_size=written,
file_type="immutable",
title=target.stem,
# B-1: 업로드 원본 파일명(다운로드 라벨용). file_path 는 충돌 시 _N 리네임되므로
# 원본명을 별도 보존. safe_name = Path(file.filename).name (경로 이탈 제거된 basename).
original_filename=safe_name,
source_channel="manual",
doc_purpose=doc_purpose,
user_tags=[library_tag] if library_tag else [],
@@ -983,6 +1120,22 @@ async def upload_document(
)
session.add(doc)
await session.flush()
# B-1: file_hash exact 중복 채움 (OFF-whitelist=law_monitor 제외). 거부(409) 아님 —
# 허용 + duplicate_of 링크 + canonical duplicate_count++ (법령 의도적 중복 보존 정책).
# 홈랩 저동시성이라 동시 동일-hash 업로드 TOCTOU 는 멱등/B-4 backfill 로 수습(락 불요).
canonical = await find_canonical_for_hash(session, fhash, exclude_id=doc.id)
if canonical is not None:
# 원래 canonical 이 soft-delete(deleted_at) 되어 former member 가 승격되면, 그 survivor 의
# stale duplicate_of 를 비워 'member 이자 counter' 모순을 막는다(B-4 불변식 유지). 문서는
# soft-delete only 라 FK ON DELETE SET NULL 이 발화하지 않아 잔여가 남기 때문(리뷰 발견).
# (삭제된 canonical 을 가리키는 다른 sibling 멤버의 잔여 포인터·overcount 는 야간
# dedup_reconcile 잡(B-4, 03:30 KST 멱등 절대 재계산)이 정리.)
if canonical.duplicate_of is not None:
canonical.duplicate_of = None
doc.duplicate_of = canonical.id
canonical.duplicate_count = (canonical.duplicate_count or 0) + 1
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리
await enqueue_stage(session, doc.id, "extract")
await session.commit()
@@ -992,6 +1145,9 @@ async def upload_document(
target.unlink(missing_ok=True)
raise
# B-3: near_duplicate 스캔은 post-upload 비동기 — 201 응답을 막지 않는다(non-gating 기록).
background_tasks.add_task(_near_dup_scan_bg, doc.id)
return DocumentResponse.model_validate(doc)
+4
View File
@@ -48,6 +48,7 @@ async def lifespan(app: FastAPI):
from services.search.query_analyzer import prewarm_analyzer
from workers.briefing_worker import run as morning_briefing_run
from workers.daily_digest import run as daily_digest_run
from workers.dedup_reconcile import run as dedup_reconcile_run
from workers.digest_worker import run as global_digest_run
from workers.file_watcher import watch_inbox
from workers.law_monitor import run as law_monitor_run
@@ -120,6 +121,9 @@ async def lifespan(app: FastAPI):
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
scheduler.start()
# Phase 2.1 (async 구조): QueryAnalyzer prewarm.
+3
View File
@@ -21,3 +21,6 @@ pymupdf>=1.24.0
trafilatura>=1.12.0
readability-lxml>=0.8.1
markdownify>=0.13.1
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1). hwp 는 LibreOffice+markdownify 경로.
# 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정.
markitdown[docx,xlsx,pptx]>=0.1.0
+239
View File
@@ -0,0 +1,239 @@
"""중복검사(dedup) 공용 로직 — plan ds-s1-backend-1 B 그룹.
소비처가 공유:
- B-1 업로드 채움 (api/documents.upload_document) find_canonical_for_hash
- B-2 GET /documents/duplicates DEDUP_OFF_CHANNELS (그룹 SQL 라우터에)
- B-4 backfill (scripts/backfill_dedup.py) DEDUP_OFF_CHANNELS / canonical = min(id)
- B-3 near_duplicate find_near_duplicates
OFF-whitelist (DEDUP_OFF_CHANNELS):
law_monitor = 법령 개정본을 의도적으로 행으로 보존(개정일 추적). file_hash 같아도
collapse 하면 개정 이력이 사라지므로 dedup 비참여. (P0-2 실측: dup 18그룹/36
law_monitor 17그룹 = 의도된 개정 보존, manual 1그룹 = 진짜 content dedup.)
file_hash 이미 채널별 키를 인코딩(note=본문SHA / devonagent=URL / news=article_id)하므로
채널별 분기는 두지 않고 단일 OFF-list 데이터로 둔다(P0-2 결정).
near_duplicate (B-3):
title trigram 후보 후보에만 doc-level embedding 코사인 rerank. 전수 28.9k 임베딩 스캔 회피.
저장된 embedding read-only(검색실험 Soft Lock: 재생성 금지). 임계·결과는 전부 non-gating 기록값
(trigram-first recall gap = 본문동일·제목상이 near-dup 놓침 phase2 ivfflat 회수 대상).
영속화는 보류(on-the-fly) S1 helper + 호출부 로깅까지. duplicate_of 영속화는 exact(file_hash).
"""
from __future__ import annotations
import logging
from sqlalchemy import bindparam, or_, select, text
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
# file_hash dedup 제외 채널 (단일 OFF-whitelist). B-1/B-2/B-4 공용.
DEDUP_OFF_CHANNELS: tuple[str, ...] = ("law_monitor",)
# near_duplicate 파라미터 — 전부 기록값·non-gating (phase2 ivfflat 가 recall gap 회수).
NEAR_DUP_TRGM_THRESHOLD = 0.30 # pg_trgm title 후보 컷 (느슨 — 후보 생성용)
NEAR_DUP_COSINE_THRESHOLD = 0.95 # 후보 embedding 코사인 near-dup 판정 컷 (≈0.95~0.97)
NEAR_DUP_MAX_CANDIDATES = 50 # trigram 후보 상한 — 전수 임베딩 스캔 회피
async def find_canonical_for_hash(
session: AsyncSession, file_hash: str, *, exclude_id: int | None = None
):
"""주어진 file_hash 의 canonical 문서(가장 오래된 = min id)를 반환. 없으면 None.
OFF-whitelist 채널(law_monitor) canonical 후보에서 제외 업로드가 법령 개정본에
링크되지 않는다. exclude_id = 방금 INSERT 신규 자신 제외(B-1).
"""
from models.document import Document # 지연 import (순환 회피)
stmt = (
select(Document)
.where(
Document.file_hash == file_hash,
Document.deleted_at.is_(None),
or_(
Document.source_channel.is_(None),
Document.source_channel.notin_(DEDUP_OFF_CHANNELS),
),
)
.order_by(Document.id.asc())
)
if exclude_id is not None:
stmt = stmt.where(Document.id != exclude_id)
return (await session.execute(stmt)).scalars().first()
# B-2 /documents/duplicates 의 file_hash 그룹 SQL. 라우터가 직접 execute (Pydantic 응답은 라우터에).
# reason='content_hash' = file_hash exact 그룹(idx_documents_hash 재사용, 신규 인덱스/테이블 불요).
# canonical_id = min(id), members = id 오름차순 배열, n = 그룹 크기.
DUPLICATE_GROUPS_SQL = text(
"""
SELECT file_hash,
min(id) AS canonical_id,
array_agg(id ORDER BY id) AS members,
count(*) AS n
FROM documents
WHERE deleted_at IS NULL
AND file_hash IS NOT NULL
AND (source_channel IS NULL OR source_channel NOT IN :off_channels)
GROUP BY file_hash
HAVING count(*) > 1
ORDER BY min(id)
"""
).bindparams(bindparam("off_channels", expanding=True))
async def reconcile_dedup(
session: AsyncSession, *, apply: bool = True, chunk_size: int = 500, sample_size: int = 40
) -> dict:
"""file_hash exact 그룹의 duplicate_of/duplicate_count 를 재계산해 정합화 (B-4 코어).
멱등 목표값과 다른 행만 UPDATE. 야간 (workers.dedup_reconcile) backfill 스크립트가
공유한다. 문서는 soft-delete only(FK ON DELETE SET NULL 미발화) 비정규화 dedup 컬럼이
삭제 드리프트(멤버의 stale 포인터·canonical overcount)하므로 절대 재계산이 정합 보장.
반환 = {groups, docs, changes, applied, sample}. sample = 적용될/ 변경 미리보기(최대 sample_size).
canonical = 그룹 최古(min id): duplicate_of=NULL, duplicate_count=group_size-1. 멤버: duplicate_of=canonical, count=0.
"""
groups = (
await session.execute(
DUPLICATE_GROUPS_SQL, {"off_channels": list(DEDUP_OFF_CHANNELS)}
)
).all()
desired: dict[int, tuple[int | None, int]] = {}
for g in groups:
members = list(g.members)
canonical = g.canonical_id
desired[canonical] = (None, len(members) - 1)
for m in members:
if m != canonical:
desired[m] = (canonical, 0)
if not desired:
return {"groups": 0, "docs": 0, "changes": 0, "applied": 0, "sample": []}
ids = list(desired.keys())
current: dict[int, tuple[int | None, int]] = {}
for i in range(0, len(ids), 1000):
batch = ids[i : i + 1000]
rows = (
await session.execute(
text(
"SELECT id, duplicate_of, duplicate_count "
"FROM documents WHERE id = ANY(:ids)"
).bindparams(ids=batch)
)
).all()
for r in rows:
current[r.id] = (r.duplicate_of, int(r.duplicate_count or 0))
changes = [
(i, dof, dcnt)
for i, (dof, dcnt) in desired.items()
if current.get(i) != (dof, dcnt)
]
sample = [
{"id": i, "duplicate_of": dof, "duplicate_count": dcnt}
for (i, dof, dcnt) in changes[:sample_size]
]
applied = 0
if apply and changes:
for i in range(0, len(changes), chunk_size):
for did, dof, dcnt in changes[i : i + chunk_size]:
await session.execute(
text(
"UPDATE documents SET duplicate_of = :dof, duplicate_count = :dcnt "
"WHERE id = :id"
).bindparams(dof=dof, dcnt=dcnt, id=did)
)
await session.commit()
applied += len(changes[i : i + chunk_size])
return {
"groups": len(groups),
"docs": len(ids),
"changes": len(changes),
"applied": applied,
"sample": sample,
}
async def find_near_duplicates(
session: AsyncSession,
doc_id: int,
*,
cosine_threshold: float = NEAR_DUP_COSINE_THRESHOLD,
trgm_threshold: float = NEAR_DUP_TRGM_THRESHOLD,
max_candidates: int = NEAR_DUP_MAX_CANDIDATES,
) -> list[dict]:
"""anchor doc 의 near-duplicate 후보를 trigram→embedding 2단계로 찾는다(read-only).
반환 = [{doc_id, title, title_sim?, cosine}] (cosine 내림차순). embedding 미생성
(업로드 직후 흔함) trigram 후보만 cosine=None 으로 반환(non-gating 기록). 어떤 행도
수정/삭제하지 않으며 저장된 embedding 읽는다(Soft Lock 준수).
"""
anchor = (
await session.execute(
text(
"SELECT id, title, (embedding IS NOT NULL) AS has_emb "
"FROM documents WHERE id = :id AND deleted_at IS NULL"
).bindparams(id=doc_id)
)
).first()
if anchor is None or not anchor.title:
return []
# (1) title trigram 후보. similarity() 컷으로 후보를 max_candidates 로 줄여 전수 임베딩
# 스캔을 회피한다. (index-accelerated `%` 연산자 경로는 후보 생성이 병목이 될 때의
# phase2 최적화 — 짧은 title 28.9k seq 평가는 비동기 post-upload 에서 충분히 저렴.)
cand_rows = (
await session.execute(
text(
"""
SELECT id, title, similarity(title, :t) AS title_sim
FROM documents
WHERE id <> :id
AND deleted_at IS NULL
AND title IS NOT NULL
AND similarity(title, :t) >= :trgm
ORDER BY similarity(title, :t) DESC
LIMIT :lim
"""
).bindparams(id=doc_id, t=anchor.title, trgm=trgm_threshold, lim=max_candidates)
)
).all()
if not cand_rows:
return []
if not anchor.has_emb:
# 임베딩 미생성 — 후보만 기록(cosine rerank 는 embed stage 완료 후). non-gating.
return [
{"doc_id": r.id, "title": r.title, "title_sim": float(r.title_sim), "cosine": None}
for r in cand_rows
]
# (2) 후보에만 doc-level embedding 코사인 rerank. 저장값 read-only.
cand_ids = [r.id for r in cand_rows]
rer = (
await session.execute(
text(
"""
SELECT c.id, c.title,
(1 - (c.embedding <=> (SELECT embedding FROM documents WHERE id = :id))) AS cosine
FROM documents c
WHERE c.id = ANY(:ids) AND c.embedding IS NOT NULL
"""
).bindparams(id=doc_id, ids=cand_ids)
)
).all()
out = [
{"doc_id": r.id, "title": r.title, "cosine": float(r.cosine)}
for r in rer
if r.cosine is not None and float(r.cosine) >= cosine_threshold
]
out.sort(key=lambda x: x["cosine"], reverse=True)
return out
+39
View File
@@ -0,0 +1,39 @@
"""스토리지 계층 추상화 패키지 (plan ds-s1-backend-1 D 그룹, scaffold-first).
활성 백엔드 선택 = get_storage_backend():
- env DS_STORAGE_BACKEND (기본 'local') 결정 config.yaml storage 섹션 편집 없이도
동작(검색실험 Soft Lock 동안 config 불가침). 활성(외부 백엔드) D-3.
- 'local' LocalBackend(settings.nas_mount_path) : 현행 NAS NFS, /file 동작 불변.
- 'nas_api'/'nas' NasApiBackend(env DS_NAS_API_BASE_URL) : 미프로비전 503(silent fallback X).
"""
from __future__ import annotations
import os
from functools import lru_cache
from core.config import settings
from .base import StatResult, StorageBackend, StorageNotConfigured
from .local import LocalBackend
from .nas_api import NasApiBackend
__all__ = [
"StorageBackend",
"StorageNotConfigured",
"StatResult",
"LocalBackend",
"NasApiBackend",
"get_storage_backend",
]
@lru_cache(maxsize=1)
def get_storage_backend() -> StorageBackend:
"""활성 스토리지 백엔드 1개 반환 (프로세스 단위 캐시)."""
backend = os.getenv("DS_STORAGE_BACKEND", "local").lower()
if backend == "local":
return LocalBackend(settings.nas_mount_path)
if backend in ("nas_api", "nas"):
return NasApiBackend(os.getenv("DS_NAS_API_BASE_URL"))
raise StorageNotConfigured(f"unknown DS_STORAGE_BACKEND={backend!r}")
+50
View File
@@ -0,0 +1,50 @@
"""스토리지 백엔드 추상 인터페이스 — plan ds-s1-backend-1 D-1.
ABC 첫날부터 Range(offset/length) stream 계약을 포함한다 D-2 원격 streaming
Range pass-through afterthought 아니라 인터페이스 의무가 되도록.
is_local=True 백엔드는 로컬 파일시스템 경로를 노출 호출부가 Starlette FileResponse
(Range 자동 처리) 그대로 쓴다. 원격 백엔드는 stream()/stat() Range 구현한다.
"""
from __future__ import annotations
import os
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from dataclasses import dataclass
class StorageNotConfigured(RuntimeError):
"""활성화되지 않은(미프로비전) 백엔드 호출 — 503 으로 표면화. silent fallback 금지."""
@dataclass
class StatResult:
exists: bool
size: int
class StorageBackend(ABC):
"""원본 파일 접근 추상 인터페이스."""
# 로컬 파일시스템 경로를 노출하는가 (FileResponse 직결 가능 여부).
is_local: bool = False
@abstractmethod
def local_path(self, rel_path: str) -> os.PathLike[str] | None:
"""is_local=True 면 물리 경로 반환(FileResponse 용). 원격 백엔드는 None."""
@abstractmethod
async def stat(self, rel_path: str) -> StatResult:
"""크기/존재 여부. 미구성 백엔드는 StorageNotConfigured raise."""
@abstractmethod
def stream(
self, rel_path: str, *, start: int | None = None, end: int | None = None
) -> AsyncIterator[bytes]:
"""[start, end] 바이트 범위(inclusive)를 async 청크로 yield (Range pass-through).
start/end None 이면 전체. 미구성 백엔드는 StorageNotConfigured raise.
"""
raise NotImplementedError
+50
View File
@@ -0,0 +1,50 @@
"""LocalBackend — 현행 NAS NFS(volume4) 마운트. /file 동작 불변 (plan D-1)."""
from __future__ import annotations
import os
from collections.abc import AsyncIterator
from pathlib import Path
from .base import StatResult, StorageBackend
_STREAM_CHUNK = 256 * 1024
class LocalBackend(StorageBackend):
"""루트(=settings.nas_mount_path) 하위 상대경로를 로컬 파일시스템으로 해석."""
is_local = True
def __init__(self, root: str) -> None:
self._root = Path(root)
def local_path(self, rel_path: str) -> os.PathLike[str]:
return self._root / rel_path
async def stat(self, rel_path: str) -> StatResult:
p = self._root / rel_path
if not p.exists():
return StatResult(exists=False, size=0)
return StatResult(exists=True, size=p.stat().st_size)
async def stream(
self, rel_path: str, *, start: int | None = None, end: int | None = None
) -> AsyncIterator[bytes]:
"""로컬 파일을 청크 stream (Range 지원). /file 의 로컬 경로는 FileResponse 가
Range 자동 처리하므로 메서드는 인터페이스 대칭/원격 동등성을 위한 구현."""
p = self._root / rel_path
with p.open("rb") as f:
if start:
f.seek(start)
remaining = None if end is None else (end - (start or 0) + 1)
while True:
to_read = _STREAM_CHUNK if remaining is None else min(_STREAM_CHUNK, remaining)
if to_read <= 0:
break
data = f.read(to_read)
if not data:
break
yield data
if remaining is not None:
remaining -= len(data)
+33
View File
@@ -0,0 +1,33 @@
"""NasApiBackend — 외부 스토리지(맥미니4TB / NAS Docker API) stub (plan D-1).
미프로비전 = 503. silent fallback 금지(다른 백엔드로 자동 우회 X). 프로비전
D-3 에서 활성화. infra_inventory.md 갱신(Update Rule) 선행이다.
"""
from __future__ import annotations
import os
from collections.abc import AsyncIterator
from .base import StatResult, StorageBackend, StorageNotConfigured
_MSG = "NasApiBackend 미구성 — 외부 스토리지 프로비전 후 활성(D-3). silent fallback 없음."
class NasApiBackend(StorageBackend):
is_local = False
def __init__(self, base_url: str | None = None) -> None:
self._base_url = base_url
def local_path(self, rel_path: str) -> os.PathLike[str] | None:
return None
async def stat(self, rel_path: str) -> StatResult:
raise StorageNotConfigured(_MSG)
async def stream(
self, rel_path: str, *, start: int | None = None, end: int | None = None
) -> AsyncIterator[bytes]:
raise StorageNotConfigured(_MSG)
yield b"" # 도달 불가 — async generator 형태 유지용(호출부 `async for` 계약 일치).
+32
View File
@@ -0,0 +1,32 @@
"""야간 dedup 컬럼 재계산 잡 (plan ds-s1-backend-1 B-4 '야간 배치').
duplicate_of / duplicate_count 비정규화 캐시다. 문서는 soft-delete only(deleted_at)
FK ON DELETE SET NULL 발화하지 않아, canonical/멤버를 soft-delete 하면 잔여 드리프트가
생긴다(멤버의 stale 포인터·canonical overcount). B-1 업로드 채움은 신규 행만 다루므로,
야간 절대 재계산이 전체 정합을 보장한다. 멱등 드리프트 없으면 no-op(로그만).
응답 계약(DocumentResponse.duplicate_count/duplicate_of) (S3) 읽으므로 정합이 중요.
"""
import logging
from core.database import async_session
from services.dedup import reconcile_dedup
logger = logging.getLogger("dedup_reconcile")
async def run() -> None:
try:
async with async_session() as session:
r = await reconcile_dedup(session, apply=True)
if r["changes"]:
logger.info(
"[dedup_reconcile] groups=%s docs=%s changes=%s applied=%s",
r["groups"], r["docs"], r["changes"], r["applied"],
)
else:
logger.info(
"[dedup_reconcile] no drift (groups=%s docs=%s)", r["groups"], r["docs"]
)
except Exception:
logger.exception("[dedup_reconcile] failed")
+72 -6
View File
@@ -17,6 +17,7 @@ md_content ref 형식: `![alt](docimg:img_001)` — image_key 가 sequence 기
plan: ~/.claude/plans/piped-humming-crystal.md
"""
import asyncio
import base64
import hashlib
import json
@@ -68,9 +69,13 @@ _FORMAT_TO_MIME = {
"gif": "image/gif",
}
# Phase 1B = PDF only. DOCX 등은 후속 Phase.
# Phase 1B = PDF only (marker-service). office/hwp 는 C-2 에서 office_md 하이브리드로 분기.
SUPPORTED_EXTENSIONS = {".pdf"}
# C-2: office/hwp → md (OOXML=markitdown / hwp=LibreOffice). 변환기가 지원하는 suffix 집합.
# 레거시 바이너리(.doc/.xls/.ppt)는 markitdown 미지원 → 여기 없음(=PDF-only 게이트에서 skip).
from workers.office_md import SUPPORTED as OFFICE_MD_SUPPORTED # noqa: E402
# config.yaml document_types 의 한국어 label 직접 사용 (Pre-flight 결과).
# Round 0 사용자 의도 = 표 중심 발주/계산/명세 도메인.
SKIP_DOC_TYPES = {
@@ -177,9 +182,18 @@ async def process(document_id: int, session: AsyncSession) -> None:
return
container_path = _to_marker_path(doc.file_path)
# ---- (3) PDF only ----
suffix = Path(container_path).suffix.lower()
# ---- (3) office/hwp → md (C-2): PDF 외 지원 포맷은 office_md 하이브리드 변환 ----
if suffix in OFFICE_MD_SUPPORTED:
await session.execute(
update(Document).where(Document.id == document_id).values(md_status="processing")
)
await session.commit()
await _process_office(doc, document_id, container_path, session)
return
# ---- (3.5) PDF only (그 외 확장자 = skip) ----
if suffix not in SUPPORTED_EXTENSIONS:
logger.info(f"markdown_skip_unsupported_extension id={document_id} ext={suffix}")
await _set_skipped(
@@ -368,6 +382,56 @@ async def _process_markdown_passthrough(
)
async def _process_office(
doc: Document, document_id: int, container_path: str, session: AsyncSession
) -> None:
"""office/hwp → md (C-2). C-5 상태머신 postcondition 의 office arm.
office_md.convert_office_to_md 이진 계약: 성공=비공백 md 반환 / 실패·빈출력·타임아웃·
의존성부재=OfficeMdError raise. 따라서:
- 성공 md_status='success' (+ 비공백 md). 불변식 md_status {success,partial} md 非공백 유지.
- 실패/예외 _fail (md_status='failed', ¬success·¬skipped). silent 'success+빈md' 절대 없음.
partial arm PDF split 전용 office 이진이라 여기 없음. 'completed' A-3 직렬화 전용(워커 미사용).
quality content-type-aware: office=scored(_compute_quality). 동기 변환은 to_thread event loop 비차단.
"""
from workers.office_md import OfficeMdError, convert_office_to_md
is_hwp = Path(container_path).suffix.lower() in (".hwp", ".hwpx")
engine = "libreoffice_hwp" if is_hwp else "markitdown"
try:
# 동기 subprocess(LibreOffice)/markitdown — 스레드로 빼서 이벤트 루프 비차단.
md_content = await asyncio.to_thread(convert_office_to_md, container_path)
except OfficeMdError as exc:
logger.warning(f"[marker] office md 변환 실패 id={document_id} engine={engine}: {exc}")
await _fail(session, document_id, f"office_md: {str(exc)[:990]}", engine=engine)
return
except Exception as exc: # 예기치 못한 예외도 failed (success+빈md 절대 금지)
logger.exception(f"[marker] office md unexpected error id={document_id}: {exc}")
await _fail(session, document_id, f"office_md_unexpected: {str(exc)[:980]}", engine=engine)
return
# 성공 — 계약상 md_content 는 비공백(빈출력은 raise). quality scored.
quality = _compute_quality(md_content, doc.extracted_text or "", {"page_count": None})
await session.execute(
update(Document).where(Document.id == document_id).values(
md_content=md_content,
md_status="success",
md_extraction_engine=engine,
md_extraction_engine_version=None,
md_extraction_quality=quality,
md_content_hash=hashlib.sha256(md_content.encode("utf-8")).hexdigest(),
md_source_hash=doc.file_hash,
md_generated_at=_now(),
md_extraction_error=None,
md_frontmatter=doc.md_frontmatter or {},
md_format_version="1.0",
content_origin="extracted",
)
)
await session.commit()
logger.info(f"[marker] office success id={document_id} engine={engine} len={len(md_content)}")
async def _process_split(
doc: Document,
document_id: int,
@@ -779,15 +843,17 @@ async def _set_skipped(session: AsyncSession, document_id: int, reason: str) ->
await session.commit()
async def _fail(session: AsyncSession, document_id: int, error: str) -> None:
"""doc-level failed (재시도 무의미)."""
async def _fail(
session: AsyncSession, document_id: int, error: str, *, engine: str = "marker"
) -> None:
"""doc-level failed (재시도 무의미). engine = 실패한 변환 엔진(office=markitdown/libreoffice_hwp)."""
await session.execute(
update(Document).where(Document.id == document_id).values(
md_status="failed",
md_content=None,
md_content_hash=None,
md_extraction_error=error,
md_extraction_engine="marker",
md_extraction_engine=engine,
md_generated_at=_now(),
content_origin="extracted",
)
+3 -1
View File
@@ -30,7 +30,9 @@ SUPPORTED = OOXML_FORMATS | HWP_FORMATS
# 빈 출력 판정 임계 — 공백 제거 후 이 미만이면 '실패(빈 변환)'로 본다.
_MIN_BODY_CHARS = 16
_SOFFICE_BIN = os.environ.get("LIBREOFFICE_BIN", "soffice")
# extract_worker.py 가 이미 `libreoffice` 바이너리로 office 텍스트 추출에 성공(컨테이너 검증된
# 이름) → 기본값 정합. soffice 만 있는 환경은 LIBREOFFICE_BIN 으로 override.
_SOFFICE_BIN = os.environ.get("LIBREOFFICE_BIN", "libreoffice")
class OfficeMdError(Exception):
+87
View File
@@ -0,0 +1,87 @@
# S1 데이터·백엔드 트랙 적용 runbook (plan ds-s1-backend-1)
> 코드는 `feat/s1-dedup-fields` 브랜치에 완성. 이 문서는 **prod(GPU) 적용 게이트** 절차.
> ⚠ 적용은 사용자 명시 go 필요 — 본 runbook 은 자동 실행되지 않는다.
## 0. 사전 조건 (게이트)
- [ ] **검색실험 Soft Lock 확인**`~/.claude/.search-experiment-active` 부재여야 함.
현재(2026-06-05) 부재 = 비활성. migration 287 은 startup 자동적용 → `docker compose up`
이 restart 를 유발하므로, 실험 활성 시엔 예외창 합의 후에만.
- [ ] **불가침 면 (검색실험 유효성)**: embedding 모델 / 벡터 인덱스(ivfflat/partial) /
retrieval config / config.yaml 의 ai·model 섹션 **미접촉**. 본 트랙 변경면은
dedup 컬럼 + office_md + storage scaffold(env) 뿐.
## 1. migration 번호
- 287(dedup 3컬럼) **단일** 클레임. P0-4=(C) 무변경이라 신규 migration 미추가.
- S2/S3 트랙이 같은 287 을 발행하지 않도록 조율(startup 카오스 방지).
## 2. restart 셋 (한 번에 배치)
| 서비스 | 변경 | 재시작 사유 |
|---|---|---|
| `fastapi` | A(287 dedup) + B(dedup API) + D(storage scaffold) | startup migration 자동적용 + 코드 |
| `marker_worker`(fastapi 내 스케줄러) | C(office_md 분기) + **markitdown 신규 pip dep** | rebuild 필요 |
> markitdown 은 신규 의존성 → `docker compose build` 필수(force-recreate 만으론 image 미갱신,
> feedback_docker_compose_build_vs_force_recreate). office 변환(OOXML)에만 필요.
## 3. 적용 순서 (inventory → config → deploy → verify)
```bash
ssh gpu && cd ~/Documents/code/hyungi_Document_Server
# (1) pre-A-1 안전망 — DB 덤프 (repo 밖)
bash scripts/s1_pre_change_backup.sh pre-a1
# (2) 코드 가져오기 + 빌드(markitdown dep 반영) + 적용
git fetch && git checkout feat/s1-dedup-fields # 또는 main 머지 후 main
docker compose build fastapi # markitdown 설치 (requirements 에 추가 필요)
docker compose up -d fastapi # startup 에서 migration 287 자동적용
# (3) migration 287 적용 확인
docker compose exec -T postgres psql -U pkm -d pkm -c \
"SELECT version,name FROM schema_migrations WHERE version=287;"
docker compose exec -T postgres psql -U pkm -d pkm -c \
"\d documents" | grep -E 'original_filename|duplicate_of|duplicate_count'
```
> **requirements**: office OOXML 변환에 `markitdown` 추가 필요(`requirements.txt`/pyproject).
> markdownify·LibreOffice 는 기존. 빌드 전 dep 추가 PR 필수(없으면 OOXML 변환이 OfficeMdError→failed,
> hwp/PDF/passthrough 는 정상).
## 4. backfill (코드 적용·검증 후, 야간 비중첩창)
> dedup 컬럼 정합은 **야간 잡 `dedup_reconcile`(03:30 KST, main.py)** 이 매일 멱등 재계산한다
> (soft-delete 잔여 드리프트 자동 정리). 아래 `backfill_dedup.py` 수동 실행은 적용 직후 1회
> 초기 채움/즉시 확인용 — 이후엔 야간 잡이 유지.
```bash
# (4a) dedup backfill (초기 1회) — 먼저 dry-run 으로 정확한 UPDATE set 확인
bash scripts/s1_pre_change_backup.sh pre-b4
docker compose exec fastapi python /app/scripts/backfill_dedup.py --dry-run
docker compose exec fastapi python /app/scripts/backfill_dedup.py --apply
# (4b) office/hwp pending markdown 백필 — C-2 라이브 ingestion 과 비중첩 야간창
docker compose exec fastapi python /app/scripts/backfill_nonpdf_markdown.py --dry-run
docker compose exec fastapi python /app/scripts/backfill_nonpdf_markdown.py --apply --limit 20 # sample 먼저
docker compose exec fastapi python /app/scripts/backfill_nonpdf_markdown.py --apply # 전체
```
## 5. verify (smoke)
```bash
# /duplicates shape
curl -s -H "Authorization: Bearer $TOK" https://document.hyungi.net/api/documents/duplicates | jq '{total_groups,total_duplicate_docs, g0:.groups[0]}'
# office 변환 결과 (sample doc)
docker compose exec -T postgres psql -U pkm -d pkm -c \
"SELECT md_status,md_extraction_engine,length(md_content) FROM documents WHERE id=<office_doc_id>;"
# md_status success→completed 직렬화 (앱 계약)
curl -s -H "Authorization: Bearer $TOK" https://document.hyungi.net/api/documents/<id> | jq '.md_status'
```
## 6. 롤백
- 컬럼만 빠른 롤백: `scripts/rollback_287.sql` (수동, schema_migrations 287 행도 삭제).
- 전체 복원: `scripts/s1_pre_change_backup.sh` 가 출력한 `.sql.gz` → psql 복원.
+90
View File
@@ -0,0 +1,90 @@
"""기존 file_hash 중복 그룹 backfill — plan ds-s1-backend-1 B-4.
목적:
A-1 migration 287 추가된 duplicate_of / duplicate_count *기존* 중복 그룹에 채운다.
migration(단일 트랜잭션) 분리한 배치(database.py:29-30 정책 대량 UPDATE
startup migration 넣지 않는다). 업로드 시점 채움(B-1) 신규 행만 다루므로 과거는 스크립트.
판정:
- file_hash exact 그룹(OFF-whitelist=law_monitor 제외, deleted 제외, count>1).
near_duplicate 영속화 보류(on-the-fly) 여기서 다루지 않는다.
- canonical = 그룹 최古(min id). canonical.duplicate_of=NULL, duplicate_count=group_size-1.
- -canonical 멤버 = duplicate_of=canonical, duplicate_count=0.
안전:
- 멱등 이미 목표값인 행은 UPDATE (재실행 안전). --dry-run 적용될 정확한 set 미리보기.
- --chunk(기본 500)/txn 청크 커밋 28,941 단일 트랜잭션 lock 회피.
실행:
docker compose exec fastapi python /app/scripts/backfill_dedup.py --dry-run
docker compose exec fastapi python /app/scripts/backfill_dedup.py --apply
# 변경 전 안전망은 E-3 pre-B-4 pg_dump (별 단계).
"""
import argparse
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from services.dedup import reconcile_dedup # 코어 재계산 (야간 잡과 공유)
async def run(*, apply: bool, chunk_size: int) -> int:
database_url = os.getenv(
"DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
)
engine = create_async_engine(database_url)
session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_factory() as session:
result = await reconcile_dedup(session, apply=apply, chunk_size=chunk_size)
print(f"=== dedup 그룹 {result['groups']}개 · 관련 문서 {result['docs']}건 ===")
if result["groups"] == 0:
print("dedup 그룹 없음(OFF-whitelist 제외 후 count>1 없음) — 종료.")
return 0
already = result["docs"] - result["changes"]
print(f"변경 필요 {result['changes']}건 / 이미 목표값 {already}건 (멱등)")
if result["changes"] == 0:
print("모두 목표값 — 적용할 변경 없음.")
return 0
# 적용될/된 정확한 UPDATE set 미리보기 (상위 40건)
print("\n=== UPDATE set (id → duplicate_of / duplicate_count) ===")
for s in result["sample"]:
role = "canonical" if s["duplicate_of"] is None else f"dup→{s['duplicate_of']}"
print(
f" id={s['id']:>7} duplicate_of={s['duplicate_of']} "
f"duplicate_count={s['duplicate_count']} [{role}]"
)
if result["changes"] > len(result["sample"]):
print(f" ... 외 {result['changes'] - len(result['sample'])}")
if not apply:
print(f"\n[dry-run] {result['changes']}건 변경 예정. --apply 로 실제 적용.")
else:
print(f"\n[apply] 완료 — {result['applied']}건 갱신.")
return 0
finally:
await engine.dispose()
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--apply", action="store_true", help="실제 적용 (기본 dry-run)")
parser.add_argument("--dry-run", action="store_true", help="명시적 dry-run (default 동등)")
parser.add_argument("--chunk", type=int, default=500, help="txn 당 UPDATE 행 수 (기본 500)")
args = parser.parse_args()
if args.apply and args.dry_run:
parser.error("--apply 와 --dry-run 동시 지정 불가")
return asyncio.run(run(apply=args.apply, chunk_size=args.chunk))
if __name__ == "__main__":
sys.exit(main())
+146
View File
@@ -0,0 +1,146 @@
"""과거 office/hwp pending 문서 markdown stage 백필 — plan ds-s1-backend-1 C-4.
신규 ingest classifymarkdown 전이(queue_consumer.py:142) 자동 도달하므로 스크립트는
*과거* office/hwp 행만 다룬다. C-2 office_md 변환을 붙이기 전까지 markdown stage 에서
skip 행들을 다시 큐에 넣어 md_content 생성한다.
대상 (WHERE):
- file_format IN (office_md 지원 실값: docx, xlsx, pptx, hwp, hwpx)
제외 = file_format. INCLUDE 필터가 article(file_format='article') 구조적으로 배제
P0-3 가드(md 없는 article completed 도달 금지, correctness-critical). source_channel 불필요.
레거시 바이너리(.doc/.xls/.ppt) markitdown 미지원 기본 목록 제외(넣어도 marker skip).
- md_status = 'pending' (이미 success/failed/skipped 건드리지 않음)
- extracted_text IS NOT NULL (폴백 존재 모집단)
C-5 failed-postcondition 상속: 변환 실패는 md_status='failed' 시끄럽게 남는다(앱이
'변환 실패' 표시). extracted_text NULL office(폴백 없음) 배제 실패 시끄러운
집합이라 phase2 재검토(C-4 배제 honest).
스케줄:
C-2 라이브 office ingestion 백필 비중첩 markdown 워커는 BATCH=1 직렬이라
야간 단발로 돌려 라이브 office 업로드 stall 회피(plan C-2 reflection).
실행:
docker compose exec fastapi python /app/scripts/backfill_nonpdf_markdown.py --dry-run
docker compose exec fastapi python /app/scripts/backfill_nonpdf_markdown.py --apply
docker compose exec fastapi python /app/scripts/backfill_nonpdf_markdown.py --apply --limit 50
"""
import argparse
import asyncio
import json
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
# office_md 가 실제 변환하는 file_format(확장자 소문자, 점 없음). 단일 source.
DEFAULT_FORMATS = ("docx", "xlsx", "pptx", "hwp", "hwpx")
CANDIDATES_SQL = text(
"""
SELECT id, file_format, title, file_path
FROM documents
WHERE deleted_at IS NULL
AND md_status = 'pending'
AND extracted_text IS NOT NULL
AND file_format IN :formats
ORDER BY id
"""
).bindparams(bindparam("formats", expanding=True))
# 활성 markdown 큐 행이 없는 doc 만 통과 (UNIQUE 부분 인덱스). 충돌 = silent skip.
ENQUEUE_SQL = text(
"""
INSERT INTO processing_queue (document_id, stage, status, payload)
VALUES (:doc_id, 'markdown', 'pending', CAST(:payload AS jsonb))
ON CONFLICT DO NOTHING
"""
)
def _chunks(seq, size):
for i in range(0, len(seq), size):
yield seq[i : i + size]
async def run(*, apply: bool, formats: tuple[str, ...], limit: int | None, chunk_size: int) -> int:
database_url = os.getenv(
"DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
)
engine = create_async_engine(database_url)
session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_factory() as session:
rows = (
await session.execute(CANDIDATES_SQL, {"formats": list(formats)})
).all()
if limit:
rows = rows[:limit]
print(f"=== office/hwp pending 후보 = {len(rows)}건 (formats={','.join(formats)}) ===")
if not rows:
print("후보 없음 — 종료.")
return 0
by_fmt: dict[str, int] = {}
for r in rows:
by_fmt[r.file_format] = by_fmt.get(r.file_format, 0) + 1
print("포맷별:", ", ".join(f"{k}={v}" for k, v in sorted(by_fmt.items())))
for r in rows[:20]:
print(f" id={r.id:>7} {r.file_format:<5} {(r.title or '')[:70]}")
if len(rows) > 20:
print(f" ... 외 {len(rows) - 20}")
if not apply:
print(f"\n[dry-run] {len(rows)}건 markdown 큐 enqueue 예정. --apply 로 실제 적용.")
print(" (적용 전 C-2 라이브 office ingestion 과 비중첩 야간창 확인.)")
return 0
payload = json.dumps(
{"force_reprocess": True, "reason": "c4_nonpdf_markdown_backfill"}
)
inserted = 0
processed = 0
for batch in _chunks(rows, chunk_size):
for r in batch:
result = await session.execute(
ENQUEUE_SQL, {"doc_id": r.id, "payload": payload}
)
if result.rowcount > 0:
inserted += 1
await session.commit()
processed += len(batch)
print(f"[apply] {processed}/{len(rows)} 처리 (enqueue 누적 {inserted})")
print(f"\n[apply] 완료 — {inserted}/{len(rows)} 신규 markdown 큐 추가.")
print(" (skip = 이미 활성 markdown 큐 행이 있는 문서)")
return 0
finally:
await engine.dispose()
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--apply", action="store_true", help="실제 enqueue (기본 dry-run)")
parser.add_argument("--dry-run", action="store_true", help="명시적 dry-run (default 동등)")
parser.add_argument(
"--formats", type=str, default=",".join(DEFAULT_FORMATS),
help=f"쉼표 구분 file_format (기본 {','.join(DEFAULT_FORMATS)})",
)
parser.add_argument("--limit", type=int, default=None, help="후보 상한(샘플 검증용)")
parser.add_argument("--chunk", type=int, default=200, help="enqueue txn 청크 크기")
args = parser.parse_args()
if args.apply and args.dry_run:
parser.error("--apply 와 --dry-run 동시 지정 불가")
formats = tuple(f.strip().lower() for f in args.formats.split(",") if f.strip())
return asyncio.run(
run(apply=args.apply, formats=formats, limit=args.limit, chunk_size=args.chunk)
)
if __name__ == "__main__":
sys.exit(main())
+18
View File
@@ -0,0 +1,18 @@
-- rollback_287.sql — plan ds-s1-backend-1 E-3. migration 287(dedup 3컬럼) 되돌림.
--
-- ★ migrations/ 밖에 둔다 — init_db() 자동 스캔(NNN_*.sql) 대상이 아니므로 자동 적용되지 않는다.
-- 수동 실행 전용:
-- docker compose cp scripts/rollback_287.sql postgres:/tmp/rollback_287.sql
-- docker compose exec -T postgres psql -U pkm -d pkm -f /tmp/rollback_287.sql
-- (또는) docker compose exec -T postgres psql -U pkm -d pkm < scripts/rollback_287.sql
--
-- 주의: original_filename / duplicate_of / duplicate_count 데이터 영구 삭제(B-1 채움·B-4 backfill 결과 포함).
-- schema_migrations 의 287 행도 함께 제거해야 재적용(다음 startup)이 가능하다.
-- 전체 복원이 필요하면 E-3 pre-change pg_dump 를 쓴다(이 스크립트는 '컬럼만 빠른 롤백').
ALTER TABLE documents
DROP COLUMN IF EXISTS duplicate_of,
DROP COLUMN IF EXISTS duplicate_count,
DROP COLUMN IF EXISTS original_filename;
DELETE FROM schema_migrations WHERE version = 287;
+30
View File
@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# pre-change pg_dump — plan ds-s1-backend-1 E-3.
# A-1(migration 287) / B-4 backfill 적용 *전* 안전망. repo cp -p 가 아니라 진짜 DB 덤프.
#
# 사용 (GPU 서버, repo 루트에서):
# bash scripts/s1_pre_change_backup.sh # pre-A-1
# bash scripts/s1_pre_change_backup.sh pre-b4 # pre-B-4 (라벨만 다름)
#
# 백업 위치 = repo 밖 (feedback_backup_outside_repo): $HOME/.local/share/ds-s1-backups/
set -euo pipefail
LABEL="${1:-pre-a1}"
DATE="$(date +%Y%m%d-%H%M%S)"
BACKUP_DIR="${BACKUP_DIR:-$HOME/.local/share/ds-s1-backups}"
mkdir -p "$BACKUP_DIR"
OUT="$BACKUP_DIR/pkm-${LABEL}-${DATE}.sql.gz"
echo "[s1-backup] pg_dump pkm → $OUT"
# 단일 pkm DB 덤프(pg_dumpall 아님). gzip 은 redirect(파일명 추측 함정 회피).
docker compose exec -T postgres pg_dump -U pkm -d pkm | gzip > "$OUT"
echo "[s1-backup] done: $(du -h "$OUT" | cut -f1)"
echo -n "[s1-backup] gzip 무결성: "
gzip -t "$OUT" && echo "OK"
echo
echo "[s1-backup] 롤백 옵션:"
echo " (a) 287 컬럼만 되돌림(빠름): scripts/rollback_287.sql 수동 실행"
echo " (b) 전체 복원: gunzip -c '$OUT' | docker compose exec -T postgres psql -U pkm -d pkm"
echo "[s1-backup] 보존 7일 권장. (DR-grade 검증은 ephemeral restore — D5 트랙, 본 안전망 범위 밖.)"
+96
View File
@@ -0,0 +1,96 @@
"""S1-ADD (plan ds-s1-backend-1) B-2 /duplicates shape + D-2 Range 파서 + dedup 상수 단위 검증.
순수 단위(DB 불요). 실행 환경 = app/ 의존성 설치 컨텍스트(devsbx/GPU) 기존
test_s1_dedup_shape.py 동일 부트스트랩. DB 타는 검증(find_canonical/near_dup/엔드포인트)
GPU read-only/통합 매트릭스(E-1)에서.
"""
from __future__ import annotations
import json
import logging
import os
import sys
from pathlib import Path
import pytest
# logs/ 가 운영 daemon 소유일 때 import-time FileHandler PermissionError 방어 (test 한정).
_orig_file_handler = logging.FileHandler
def _safe_file_handler(filename, *args, **kwargs): # type: ignore[no-untyped-def]
try:
return _orig_file_handler(filename, *args, **kwargs)
except PermissionError:
return logging.NullHandler()
logging.FileHandler = _safe_file_handler # type: ignore[assignment]
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost:5432/test")
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from api.documents import ( # noqa: E402
DuplicateGroup,
DuplicatesResponse,
_parse_byte_range,
)
from services.dedup import DEDUP_OFF_CHANNELS # noqa: E402
_FIXDIR = Path(os.path.expanduser("~/Documents/code/ds-app/contract/fixtures"))
# ── 1. /duplicates 응답 shape = contract fixture ───────────────────────────────
def test_duplicates_response_shape_matches_total_formula():
# 엔드포인트 정의: total_duplicate_docs = Σ(멤버수-1). fixture 와 동일해야 함.
groups = [
DuplicateGroup(canonical_id=4912, members=[4912, 4977], reason="content_hash"),
DuplicateGroup(canonical_id=5120, members=[5120, 5121, 5260], reason="content_hash"),
]
total_dup = sum(len(g.members) - 1 for g in groups)
resp = DuplicatesResponse(
groups=groups, total_groups=len(groups), total_duplicate_docs=total_dup
)
assert resp.total_groups == 2
assert resp.total_duplicate_docs == 3 # (2-1)+(3-1)
@pytest.mark.skipif(not _FIXDIR.exists(), reason="ds-app contract fixtures 미존재")
def test_duplicates_contract_fixture_decodes():
payload = json.loads((_FIXDIR / "documents_duplicates.json").read_text())
m = DuplicatesResponse.model_validate(payload)
assert m.total_groups == payload["total_groups"]
assert m.total_duplicate_docs == payload["total_duplicate_docs"]
# Σ(멤버수-1) 정의가 fixture total 과 일치(계약 self-consistency).
assert sum(len(g.members) - 1 for g in m.groups) == payload["total_duplicate_docs"]
assert m.groups[0].canonical_id == payload["groups"][0]["canonical_id"]
# ── 2. D-2 Range 파서 (원격 백엔드 pass-through; local 은 FileResponse 자동) ──────
@pytest.mark.parametrize(
"header,size,expected",
[
(None, 1000, (None, None)),
("", 1000, (None, None)),
("bytes=0-99", 1000, (0, 99)),
("bytes=100-", 1000, (100, 999)), # 끝까지
("bytes=-200", 1000, (800, 999)), # suffix: 마지막 200
("bytes=0-99999", 1000, (0, 999)), # end clamp
("bytes=2000-3000", 1000, (None, None)), # start >= size → 무효(전체)
("bytes=abc-def", 1000, (None, None)), # 파싱 실패
("bytes=50-10", 1000, (None, None)), # start>end
("bytes=0-99", 0, (None, None)), # 빈 파일
],
)
def test_parse_byte_range(header, size, expected):
assert _parse_byte_range(header, size) == expected
# ── 3. dedup OFF-whitelist 단일 source ─────────────────────────────────────────
def test_dedup_off_channels_is_law_monitor_only():
# P0-2 결정: 단일 OFF-list = law_monitor (법령 개정본 보존). 확장은 의도적 결정으로만.
assert DEDUP_OFF_CHANNELS == ("law_monitor",)