From a7b16b63db5cbe88b644dd1224a14229efd13039 Mon Sep 17 00:00:00 2001 From: hyungi Date: Sun, 24 May 2026 13:14:36 +0000 Subject: [PATCH] feat(search): doc-level atomic corpus replace + isolation test (Hier-Decomp-1 c5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit replace_doc_corpus(dry_run): G5 precond(doc-local embed 100% + parent 무결성 + leaf>0) 검증 후 단일 트랜잭션 atomic 교체(legacy in_corpus=false / hier leaf in_corpus=true, predicate=is_leaf AND embedding NOT NULL, node_type 미사용). 물리삭제 없음. rollback_doc_corpus 역토글. precond 미충족 시 변경 0(legacy 유지). tests/hier_decomp/test_corpus_isolation.py: in_corpus=false leaf 가 corpus_chunks 누출 0 단언 (부분 ivfflat + 뷰 이중 choke point 회귀 가드). c5: dry-run 3 pilot precond_ok(5140 158L→271leaf / 5186 381→199 / 5225 18→164), 격리 테스트 PASS. 실제 replace 는 c6(1-doc-first). plan: hierarchical-decomposition-tiered-nesting-marmot.md Co-Authored-By: Claude Opus 4.7 (1M context) --- app/services/hier_decomp/replace.py | 72 ++++++++++++++++++++++ tests/hier_decomp/test_corpus_isolation.py | 51 +++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 app/services/hier_decomp/replace.py create mode 100644 tests/hier_decomp/test_corpus_isolation.py diff --git a/app/services/hier_decomp/replace.py b/app/services/hier_decomp/replace.py new file mode 100644 index 0000000..f3c6c78 --- /dev/null +++ b/app/services/hier_decomp/replace.py @@ -0,0 +1,72 @@ +"""doc 단위 atomic 코퍼스 교체 (PR-DocSrv-Hierarchical-Decomposition-1 c5/c6). + +legacy 윈도우 청크 → hier_section leaf 청크로 검색 코퍼스 교체(in_corpus 토글). +- 물리 삭제 없음(in_corpus 플래그만). 부분 ivfflat 이 자동 반영. +- G5 precondition(doc-local): hier leaf>0 + 모든 leaf embedding 보유(doc-local 100%) + parent 무결성(dangling 0). +- 단일 트랜잭션 atomic. 실패/precond 미충족 → 변경 0(legacy 유지). +- rollback: in_corpus 역토글(아래 rollback_doc_corpus). 
from __future__ import annotations
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Only hier_section chunks produced by this chunker version are eligible for
# activation; precheck() and replace_doc_corpus() must agree on this filter.
CHUNKER_VERSION = "hier-rule-v1"


async def precheck(session: AsyncSession, doc_id: int) -> dict:
    """Evaluate the doc-local precondition for swapping a document's corpus.

    Runs one aggregate query over ``document_chunks`` for ``doc_id`` and
    reports:

    * ``hier_leaves`` / ``hier_leaves_embedded`` - hier_section leaf chunks of
      ``CHUNKER_VERSION`` and how many of them have a non-NULL embedding.
    * ``legacy_active`` - legacy chunks currently flagged ``in_corpus``.
    * ``dangling_parent`` - hier_section chunks whose ``parent_id`` does not
      reference another hier_section chunk of the same document.

    ``precond_ok`` is true only when there is at least one leaf, every leaf is
    embedded, and no parent reference dangles. Otherwise ``reason`` is one of
    ``"no_hier_leaves"``, ``"embed_incomplete"`` or ``"dangling_parent"``.

    The leaf counts are restricted to ``CHUNKER_VERSION`` because that is the
    exact set replace_doc_corpus() activates; counting other versions could let
    the precondition pass while nothing is activated.
    """
    query = text("""
        SELECT
          count(*) FILTER (WHERE source_type='hier_section' AND chunker_version=:cv
                           AND is_leaf) AS hier_leaves,
          count(*) FILTER (WHERE source_type='hier_section' AND chunker_version=:cv
                           AND is_leaf AND embedding IS NOT NULL) AS hier_leaves_emb,
          count(*) FILTER (WHERE source_type='legacy' AND in_corpus) AS legacy_active,
          count(*) FILTER (WHERE source_type='hier_section' AND parent_id IS NOT NULL
            AND parent_id NOT IN (SELECT id FROM document_chunks
                                  WHERE doc_id=:d AND source_type='hier_section')) AS dangling
        FROM document_chunks WHERE doc_id=:d""")
    row = (await session.execute(query, {"d": doc_id, "cv": CHUNKER_VERSION})).one()

    leaves, leaves_emb = row.hier_leaves, row.hier_leaves_emb
    doc_local_100 = leaves > 0 and leaves_emb == leaves
    ok = doc_local_100 and row.dangling == 0

    if ok:
        reason = None
    elif leaves == 0:
        reason = "no_hier_leaves"
    elif not doc_local_100:
        reason = "embed_incomplete"
    else:
        reason = "dangling_parent"

    return {
        "doc_id": doc_id,
        "hier_leaves": leaves,
        "hier_leaves_embedded": leaves_emb,
        "doc_local_embed_100": doc_local_100,
        "legacy_active": row.legacy_active,
        "dangling_parent": row.dangling,
        "precond_ok": ok,
        "reason": reason,
    }


async def replace_doc_corpus(session: AsyncSession, doc_id: int, *, dry_run: bool = True) -> dict:
    """Switch one document's search corpus from legacy chunks to hier leaves.

    Returns the precheck() report extended with ``dry_run`` and ``action``:

    * ``"aborted"`` - precondition failed; nothing is modified.
    * ``"dry_run"`` - precondition passed and ``dry_run`` is true; nothing is
      modified, ``would_deactivate_legacy`` / ``would_activate_hier_leaves``
      carry the expected row counts.
    * ``"replaced"`` - both UPDATEs ran and were committed;
      ``legacy_deactivated`` / ``hier_activated`` carry the affected row counts.

    No rows are deleted; only the ``in_corpus`` flag is toggled.

    Raises:
        Any exception raised by the UPDATEs or the commit, re-raised after the
        session has been rolled back so no partial toggle stays pending.
    """
    pc = await precheck(session, doc_id)
    pc["dry_run"] = dry_run
    if not pc["precond_ok"]:
        pc["action"] = "aborted"
        return pc
    if dry_run:
        pc["action"] = "dry_run"
        pc["would_deactivate_legacy"] = pc["legacy_active"]
        pc["would_activate_hier_leaves"] = pc["hier_leaves"]
        return pc

    # Both toggles are issued on the same session and committed together.
    try:
        deact = (await session.execute(
            text("UPDATE document_chunks SET in_corpus=false"
                 " WHERE doc_id=:d AND source_type='legacy' AND in_corpus=true"),
            {"d": doc_id},
        )).rowcount
        act = (await session.execute(
            text("UPDATE document_chunks SET in_corpus=true"
                 " WHERE doc_id=:d AND source_type='hier_section'"
                 " AND chunker_version=:cv AND is_leaf=true"
                 " AND embedding IS NOT NULL AND in_corpus=false"),
            {"d": doc_id, "cv": CHUNKER_VERSION},
        )).rowcount
        await session.commit()
    except Exception:
        # Discard a half-applied toggle so a later commit on this session
        # cannot persist "legacy off, hier not on".
        await session.rollback()
        raise

    pc.update({"action": "replaced", "legacy_deactivated": deact, "hier_activated": act})
    return pc


async def rollback_doc_corpus(session: AsyncSession, doc_id: int) -> dict:
    """Reverse replace_doc_corpus(): legacy chunks back on, hier chunks off.

    Re-flags every legacy chunk of ``doc_id`` as ``in_corpus`` and clears the
    flag on every hier_section chunk (any chunker version), then commits.

    Returns a dict with ``doc_id``, ``action="rolled_back"`` and the affected
    row counts ``legacy_reactivated`` / ``hier_deactivated``.

    Raises:
        Any exception raised by the UPDATEs or the commit, re-raised after the
        session has been rolled back.
    """
    try:
        act = (await session.execute(
            text("UPDATE document_chunks SET in_corpus=true"
                 " WHERE doc_id=:d AND source_type='legacy' AND in_corpus=false"),
            {"d": doc_id},
        )).rowcount
        deact = (await session.execute(
            text("UPDATE document_chunks SET in_corpus=false"
                 " WHERE doc_id=:d AND source_type='hier_section' AND in_corpus=true"),
            {"d": doc_id},
        )).rowcount
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    return {"doc_id": doc_id, "action": "rolled_back", "legacy_reactivated": act, "hier_deactivated": deact}
# tests/hier_decomp/test_corpus_isolation.py
# Hier-Decomp-1 corpus-isolation regression test: asserts that a hier leaf with
# in_corpus=false never shows up when searching through corpus_chunks.
# Run: docker exec -w /app fastapi python tests/hier_decomp/test_corpus_isolation.py
import asyncio
import sys

sys.path.insert(0, "/app")

from sqlalchemy import text
from core.database import async_session
from ai.client import AIClient

MARKER = "ZQXJ7F3A 격리회귀 sentinel do-not-match-anything unique probe phrase"


async def main():
    """Insert a synthetic in_corpus=false leaf and verify it does not leak.

    Steps:
      1. Embed ``MARKER`` and insert it as a hier_section leaf with
         ``in_corpus=false`` on the most recent non-deleted document.
      2. Query the 5 nearest neighbours of that same embedding through
         ``corpus_chunks`` and through the raw ``document_chunks`` table.
      3. Additionally look the row up by id in ``corpus_chunks``; a
         nearest-neighbour query alone could miss a leaked row.
      4. Fail if the synthetic chunk is reachable through ``corpus_chunks``.

    The synthetic row is always deleted in ``finally``.

    Returns:
        0 on success (used as the process exit status).
    """
    cid = None
    try:
        emb = await AIClient().embed(MARKER)
        assert emb and len(emb) == 1024
        emb_str = "[" + ",".join(repr(float(x)) for x in emb) + "]"

        async with async_session() as s:
            doc_id = await s.scalar(text(
                "SELECT id FROM documents WHERE deleted_at IS NULL ORDER BY id DESC LIMIT 1"))
            # Without a host document the INSERT below would fail obscurely.
            assert doc_id is not None, "no non-deleted document available for the probe"

            cid = await s.scalar(text("""
                INSERT INTO document_chunks
                  (doc_id, chunk_index, chunk_type, domain_category, text, embedding,
                   source_type, chunker_version, is_leaf, in_corpus, level, node_type)
                VALUES (:d, 999777, 'section_md', 'general', :t, cast(cast(:e AS text) AS vector),
                        'hier_section', 'hier-rule-v1', true, false, 4, 'leaf') RETURNING id"""),
                {"d": doc_id, "t": MARKER, "e": emb_str})
            await s.commit()

            corpus_rows = (await s.execute(text(
                "SELECT c.id cid FROM corpus_chunks c WHERE c.embedding IS NOT NULL "
                "ORDER BY c.embedding <=> cast(:e AS vector) LIMIT 5"), {"e": emb_str})).all()
            corpus = [r.cid for r in corpus_rows]

            raw_rows = (await s.execute(text(
                "SELECT c.id cid FROM document_chunks c WHERE c.embedding IS NOT NULL "
                "ORDER BY c.embedding <=> cast(:e AS vector) LIMIT 5"), {"e": emb_str})).all()
            raw = [r.cid for r in raw_rows]

            # Exact membership check, independent of nearest-neighbour ranking.
            in_view = await s.scalar(text(
                "SELECT count(*) FROM corpus_chunks WHERE id=:i"), {"i": cid})

        leaked = cid in corpus or bool(in_view)
        in_raw = cid in raw
        print(f"corpus_chunks top5={corpus}")
        print(f"document_chunks top5={raw}")
        assert not leaked, f"LEAK: synthetic {cid} in corpus_chunks!"
        print(f"PASS — in_corpus=false leaf {cid} corpus 누출 0 (raw 검출={in_raw})")
        return 0
    finally:
        # Always remove the synthetic row, even when an assertion failed.
        if cid is not None:
            async with async_session() as s:
                await s.execute(text("DELETE FROM document_chunks WHERE id=:i"), {"i": cid})
                await s.commit()


# Guarded so that importing this module (e.g. pytest collecting test_*.py)
# neither touches the database nor raises SystemExit.
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))