feat(search): doc-level atomic corpus replace + isolation test (Hier-Decomp-1 c5)

replace_doc_corpus(dry_run): G5 precond(doc-local embed 100% + parent 무결성 + leaf>0) 검증 후
단일 트랜잭션 atomic 교체(legacy in_corpus=false / hier leaf in_corpus=true,
predicate=is_leaf AND embedding NOT NULL, node_type 미사용). 물리삭제 없음. rollback_doc_corpus 역토글.
precond 미충족 시 변경 0(legacy 유지).

tests/hier_decomp/test_corpus_isolation.py: in_corpus=false leaf 가 corpus_chunks 누출 0 단언
(부분 ivfflat + 뷰 이중 choke point 회귀 가드).

c5: dry-run 3 pilot precond_ok(5140 158L→271leaf / 5186 381→199 / 5225 18→164), 격리 테스트 PASS.
실제 replace 는 c6(1-doc-first).

plan: hierarchical-decomposition-tiered-nesting-marmot.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-05-24 13:14:36 +00:00
parent fa82bd495b
commit a7b16b63db
2 changed files with 123 additions and 0 deletions
+72
View File
@@ -0,0 +1,72 @@
"""doc 단위 atomic 코퍼스 교체 (PR-DocSrv-Hierarchical-Decomposition-1 c5/c6).
legacy 윈도우 청크 → hier_section leaf 청크로 검색 코퍼스 교체(in_corpus 토글).
- 물리 삭제 없음(in_corpus 플래그만). 부분 ivfflat 이 자동 반영.
- G5 precondition(doc-local): hier leaf>0 + 모든 leaf embedding 보유(doc-local 100%) + parent 무결성(dangling 0).
- 단일 트랜잭션 atomic. 실패/precond 미충족 → 변경 0(legacy 유지).
- rollback: in_corpus 역토글(아래 rollback_doc_corpus).
"""
from __future__ import annotations
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
# Chunker version tag bound to the :cv parameter of the activation UPDATE in
# replace_doc_corpus; only hier_section rows carrying this tag are switched on.
CHUNKER_VERSION = "hier-rule-v1"
async def precheck(session: AsyncSession, doc_id: int) -> dict:
    """Evaluate the doc-local precondition for swapping one document's corpus.

    A single aggregate query over ``document_chunks`` for ``doc_id`` collects:

    * ``hier_leaves`` -- hier_section leaf rows of the current CHUNKER_VERSION,
    * ``hier_leaves_embedded`` -- those leaves that also have an embedding,
    * ``legacy_active`` -- legacy rows currently flagged ``in_corpus``,
    * ``dangling_parent`` -- hier_section rows whose ``parent_id`` does not
      point at a hier_section row of the same document.

    The leaf counts are restricted to CHUNKER_VERSION because the activation
    UPDATE in replace_doc_corpus uses that same restriction; counting leaves
    of other versions would let the precondition pass on rows that are never
    activated. The parent-integrity count deliberately stays version-agnostic.

    Args:
        session: open async session; the function only reads.
        doc_id: id of the document to inspect.

    Returns:
        A dict with the counters above plus ``doc_local_embed_100``,
        ``precond_ok`` and ``reason`` (``None`` when the precondition holds,
        otherwise ``"no_hier_leaves"``, ``"embed_incomplete"`` or
        ``"dangling_parent"``, checked in that order).
    """
    row = (await session.execute(text("""
        SELECT
          count(*) FILTER (WHERE source_type='hier_section' AND chunker_version=:cv
                           AND is_leaf) AS hier_leaves,
          count(*) FILTER (WHERE source_type='hier_section' AND chunker_version=:cv
                           AND is_leaf AND embedding IS NOT NULL) AS hier_leaves_emb,
          count(*) FILTER (WHERE source_type='legacy' AND in_corpus) AS legacy_active,
          count(*) FILTER (WHERE source_type='hier_section' AND parent_id IS NOT NULL
            AND parent_id NOT IN (SELECT id FROM document_chunks WHERE doc_id=:d AND source_type='hier_section')) AS dangling
        FROM document_chunks WHERE doc_id=:d"""), {"d": doc_id, "cv": CHUNKER_VERSION})).one()
    leaves, leaves_emb = row.hier_leaves, row.hier_leaves_emb
    # "doc-local 100%": at least one leaf exists and every leaf is embedded.
    doc_local_100 = leaves > 0 and leaves_emb == leaves
    ok = doc_local_100 and row.dangling == 0

    # Report the first failing condition only; None means everything passed.
    if ok:
        reason = None
    elif leaves == 0:
        reason = "no_hier_leaves"
    elif not doc_local_100:
        reason = "embed_incomplete"
    else:
        reason = "dangling_parent"

    return {
        "doc_id": doc_id,
        "hier_leaves": leaves,
        "hier_leaves_embedded": leaves_emb,
        "doc_local_embed_100": doc_local_100,
        "legacy_active": row.legacy_active,
        "dangling_parent": row.dangling,
        "precond_ok": ok,
        "reason": reason,
    }
async def replace_doc_corpus(session: AsyncSession, doc_id: int, *, dry_run: bool = True) -> dict:
    """Swap one document's search corpus from legacy chunks to hier leaves.

    Runs precheck first. When the precondition fails nothing is modified and
    the precheck dict is returned with ``action="aborted"``. With
    ``dry_run=True`` (the default) nothing is modified either; the dict gains
    ``action="dry_run"`` and the ``would_*`` counters.

    Otherwise, inside one transaction, active legacy rows are flagged
    ``in_corpus=false`` and embedded hier_section leaves of CHUNKER_VERSION
    are flagged ``in_corpus=true``. No rows are deleted. Before committing,
    the number of hier leaves that are actually active is re-counted; if it is
    zero the transaction is rolled back so the document never ends up with an
    empty corpus.

    Args:
        session: async session; committed or rolled back by this function on
            the non-dry-run path.
        doc_id: id of the document to switch.
        dry_run: when true, only report what would change.

    Returns:
        The precheck dict extended with ``dry_run``, ``action`` and, depending
        on the path, ``would_deactivate_legacy`` / ``would_activate_hier_leaves``
        or ``legacy_deactivated`` / ``hier_activated``. A post-update abort
        sets ``action="aborted"``, ``precond_ok=False`` and
        ``reason="no_activatable_hier_leaves"``.

    Raises:
        Any exception raised by the session; the transaction is rolled back
        before the exception propagates.
    """
    pc = await precheck(session, doc_id)
    pc["dry_run"] = dry_run
    if not pc["precond_ok"]:
        pc["action"] = "aborted"
        return pc
    if dry_run:
        pc["action"] = "dry_run"
        pc["would_deactivate_legacy"] = pc["legacy_active"]
        pc["would_activate_hier_leaves"] = pc["hier_leaves"]
        return pc

    params = {"d": doc_id, "cv": CHUNKER_VERSION}
    try:
        # Atomic swap: both flag flips happen in the same transaction.
        deact = (await session.execute(text(
            "UPDATE document_chunks SET in_corpus=false WHERE doc_id=:d AND source_type='legacy' AND in_corpus=true"),
            {"d": doc_id})).rowcount
        act = (await session.execute(text(
            "UPDATE document_chunks SET in_corpus=true WHERE doc_id=:d AND source_type='hier_section'"
            " AND chunker_version=:cv AND is_leaf=true AND embedding IS NOT NULL AND in_corpus=false"),
            params)).rowcount
        # Count active leaves instead of trusting `act`: on a repeated run the
        # leaves are already active, so act == 0 is legitimate there.
        active = await session.scalar(text(
            "SELECT count(*) FROM document_chunks WHERE doc_id=:d AND source_type='hier_section'"
            " AND chunker_version=:cv AND is_leaf=true AND embedding IS NOT NULL AND in_corpus=true"),
            params)
        if not active:
            # Committing now would leave the document without any corpus rows.
            await session.rollback()
            pc.update({
                "action": "aborted",
                "precond_ok": False,
                "reason": "no_activatable_hier_leaves",
            })
            return pc
        await session.commit()
    except Exception:
        # Keep the "failure means zero changes" guarantee explicit.
        await session.rollback()
        raise

    pc.update({"action": "replaced", "legacy_deactivated": deact, "hier_activated": act})
    return pc
async def rollback_doc_corpus(session: AsyncSession, doc_id: int) -> dict:
    """Reverse a corpus swap: re-enable legacy chunks, disable hier chunks.

    Both flag flips run in one transaction that is committed at the end. If
    either statement fails, the transaction is rolled back before the
    exception propagates so no half-applied toggle stays pending.

    Args:
        session: async session; committed (or rolled back on error) here.
        doc_id: id of the document to restore.

    Returns:
        A dict with ``doc_id``, ``action="rolled_back"`` and the affected row
        counts ``legacy_reactivated`` and ``hier_deactivated``.

    Raises:
        Any exception raised by the session, after the rollback.
    """
    try:
        reactivated = (await session.execute(text(
            "UPDATE document_chunks SET in_corpus=true WHERE doc_id=:d AND source_type='legacy' AND in_corpus=false"),
            {"d": doc_id})).rowcount
        deactivated = (await session.execute(text(
            "UPDATE document_chunks SET in_corpus=false WHERE doc_id=:d AND source_type='hier_section' AND in_corpus=true"),
            {"d": doc_id})).rowcount
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    return {
        "doc_id": doc_id,
        "action": "rolled_back",
        "legacy_reactivated": reactivated,
        "hier_deactivated": deactivated,
    }
@@ -0,0 +1,51 @@
"""Hier-Decomp-1 코퍼스 격리 회귀 테스트 (committed).
in_corpus=false hier leaf 가 corpus_chunks 검색에 누출되지 않음을 단언.
부분 ivfflat(WHERE in_corpus=true) + corpus_chunks 뷰 choke point 이중 보장 검증.
실행: docker exec -w /app fastapi python tests/hier_decomp/test_corpus_isolation.py
"""
import asyncio, sys
sys.path.insert(0, "/app")
from sqlalchemy import text
from core.database import async_session
from ai.client import AIClient
# Sentinel text for the synthetic chunk: main() embeds it, stores it as the
# chunk text, and reuses the same embedding as the query vector.
MARKER = "ZQXJ7F3A 격리회귀 sentinel do-not-match-anything unique probe phrase"
async def main():
    """Check that an ``in_corpus=false`` hier leaf never shows up in corpus_chunks.

    Steps:
      1. Embed MARKER and insert a synthetic hier_section leaf with
         ``in_corpus=false`` under the most recent non-deleted document.
      2. Run a nearest-neighbour query (top 5) with that same embedding
         against ``corpus_chunks`` and against raw ``document_chunks``.
      3. Also look the synthetic id up in ``corpus_chunks`` directly, so the
         result does not depend on a distance-ordered probe alone.
      4. Assert that the synthetic row is absent from ``corpus_chunks``.

    The synthetic row is deleted in ``finally`` whether or not the check passed.

    Returns:
        0 on success; an AssertionError (or RuntimeError when no document is
        available) propagates on failure.
    """
    cid = None
    try:
        emb = await AIClient().embed(MARKER)
        assert emb and len(emb) == 1024
        emb_str = "[" + ",".join(repr(float(x)) for x in emb) + "]"
        async with async_session() as s:
            doc_id = await s.scalar(text("SELECT id FROM documents WHERE deleted_at IS NULL ORDER BY id DESC LIMIT 1"))
            if doc_id is None:
                # Fail with a clear message instead of an opaque INSERT error.
                raise RuntimeError("no non-deleted document found to attach the synthetic chunk to")
            cid = await s.scalar(text("""
                INSERT INTO document_chunks
                (doc_id, chunk_index, chunk_type, domain_category, text, embedding,
                source_type, chunker_version, is_leaf, in_corpus, level, node_type)
                VALUES (:d, 999777, 'section_md', 'general', :t, cast(cast(:e AS text) AS vector),
                'hier_section', 'hier-rule-v1', true, false, 4, 'leaf') RETURNING id"""),
                {"d": doc_id, "t": MARKER, "e": emb_str})
            await s.commit()
            corpus = [r.cid for r in (await s.execute(text(
                "SELECT c.id cid FROM corpus_chunks c WHERE c.embedding IS NOT NULL "
                "ORDER BY c.embedding <=> cast(:e AS vector) LIMIT 5"), {"e": emb_str})).all()]
            raw = [r.cid for r in (await s.execute(text(
                "SELECT c.id cid FROM document_chunks c WHERE c.embedding IS NOT NULL "
                "ORDER BY c.embedding <=> cast(:e AS vector) LIMIT 5"), {"e": emb_str})).all()]
            # Deterministic membership check on the view, independent of the
            # distance-ordered top-5 probe above.
            in_view = await s.scalar(text(
                "SELECT count(*) FROM corpus_chunks c WHERE c.id = :i"), {"i": cid})
        leaked = cid in corpus or bool(in_view)
        in_raw = cid in raw
        print(f"corpus_chunks top5={corpus}")
        print(f"document_chunks top5={raw}")
        assert not leaked, f"LEAK: synthetic {cid} in corpus_chunks!"
        print(f"PASS — in_corpus=false leaf {cid} corpus 누출 0 (raw 검출={in_raw})")
        return 0
    finally:
        # Compare against None so a falsy-but-valid id would still be cleaned up.
        if cid is not None:
            async with async_session() as s:
                await s.execute(text("DELETE FROM document_chunks WHERE id=:i"), {"i": cid})
                await s.commit()
# Run only when executed as a script. The file name matches test_*.py, so an
# unguarded call would hit the database and raise SystemExit as soon as a
# test collector or any other module imported this file.
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))