feat(search): corpus_variant + exact_knn measurement dispatch (replace-diagnose c4+c5)

PR-DocSrv-Hier-Replace-Diagnose-1 c4+c5. hier vs prehier(legacy) go/no-go 비파괴 측정 hook.
- 측정 뷰 3종 (hier_measure_views.sql, additive/droppable): corpus_chunks_prehier
  (legacy+null-source 375 포함) / hier_sim_raw / hier_sim_clean (childless-tiny<30 제외,
  all-tiny doc 은 legacy fallback 정합).
- retrieval_service: _resolve_corpus_variant + CORPUS_VARIANT_MAP + _VALID_CHUNKS_TABLE
  3 뷰 추가 + exact_knn(SET LOCAL enable_indexscan/bitmapscan=off, eval 전용).
  chunk leg 만 영향 (doc-level + fts/trgm = documents 무관). baseline/None path 회귀 0.
- search_pipeline.run_search + search.py: corpus_variant/exact_knn 전달, unknown→400,
  embedding_backend cand 와 동시 사용 금지(400).
- run_eval: --corpus-variant + --exact-knn flag.
- tests/test_corpus_variant.py 22 PASS (resolver/map/allowlist + SQL injection 거부).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-05-25 05:37:15 +00:00
parent e860baa179
commit 100aaa3b0c
6 changed files with 249 additions and 11 deletions
+31 -1
View File
@@ -187,6 +187,24 @@ async def search(
"opt-in 실험 reference 만 유지 — docs/phase_2q_apply_opt_in.md 의 closed status 참조."
),
),
corpus_variant: str | None = Query(
None,
pattern=r"^(prehier|hier_sim_raw|hier_sim_clean)$",
description=(
"⚠️ EVAL ONLY (Hier-Replace-Diagnose-1). chunk leg 를 측정 뷰로 교체 — "
"prehier(legacy baseline) | hier_sim_raw | hier_sim_clean(childless-tiny 제외). "
"doc-level + fts/trgm 는 documents 테이블 = 변종 무관. 미지정 = production corpus_chunks. "
"embedding_backend cand 와 동시 사용 불가 (400)."
),
),
exact_knn: bool = Query(
False,
description=(
"⚠️ EVAL ONLY (Hier-Replace-Diagnose-1). vector leg 에 SET LOCAL enable_indexscan/"
"bitmapscan=off → ivfflat 근사 제거(exact seqscan). prehier vs hier_sim 의 index 변수 "
"분리용. production 검색에는 사용 금지 (latency 큼)."
),
),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)"""
try:
@@ -203,10 +221,22 @@ async def search(
snapshot_chunk_id_max=snapshot_chunk_id_max,
reranker_backend=reranker_backend,
rewrite_backend=rewrite_backend,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
)
except ValueError as e:
# _resolve_backend / _resolve_reranker / _resolve_rewrite_backend 가 unknown slug 시 ValueError → HTTP 400
# _resolve_backend / _resolve_reranker / _resolve_rewrite_backend / _resolve_corpus_variant unknown slug → HTTP 400
msg = str(e)
if msg.startswith("unknown_corpus_variant") or msg.startswith("corpus_variant_incompatible"):
return JSONResponse(
status_code=400,
content={
"error_reason": msg.split(":")[0].split(" ")[0],
"corpus_variant_requested": corpus_variant,
"allowed": ["prehier", "hier_sim_raw", "hier_sim_clean"],
"detail": msg,
},
)
if msg.startswith("unknown_rewrite_backend"):
return JSONResponse(
status_code=400,
+72 -7
View File
@@ -69,7 +69,37 @@ CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
# corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point).
# baseline retrieval 은 이 뷰만 본다 → in_corpus=false(비활성 hier leaf 등) 자동 제외.
_VALID_CHUNKS_TABLE = re.compile(r"^(document_chunks|corpus_chunks|document_chunks_cand_[a-z0-9_]+)$")
# corpus_chunks_{prehier,hier_sim_raw,hier_sim_clean} = Hier-Replace-Diagnose-1 측정 전용 뷰.
_VALID_CHUNKS_TABLE = re.compile(
r"^(document_chunks|corpus_chunks|corpus_chunks_(?:prehier|hier_sim_raw|hier_sim_clean)"
r"|document_chunks_cand_[a-z0-9_]+)$"
)
# Hier-Replace-Diagnose-1: maps a corpus_variant slug to the chunks view it selects
# (baseline embedding path only).
# Only the vector chunk leg is affected; doc-level and fts/trgm retrieval read the
# documents table, so they are independent of the variant.
CORPUS_VARIANT_MAP: dict[str, str] = {
"prehier": "corpus_chunks_prehier",
"hier_sim_raw": "corpus_chunks_hier_sim_raw",
"hier_sim_clean": "corpus_chunks_hier_sim_clean",
}
def _resolve_corpus_variant(slug: str | None) -> str | None:
    """Translate a ``corpus_variant`` slug into the name of its measurement view.

    Args:
        slug: A key of ``CORPUS_VARIANT_MAP``, or ``None`` when no variant
            was requested.

    Returns:
        The view name mapped to ``slug``, or ``None`` when ``slug`` is
        ``None`` (the caller then keeps its default chunks table).

    Raises:
        ValueError: ``slug`` is not a key of ``CORPUS_VARIANT_MAP``. The
            message starts with ``unknown_corpus_variant``, which the HTTP
            layer turns into a 400 response.
    """
    if slug is None:
        return None
    # Every mapped value is a non-empty string, so a None result can only
    # mean the slug is missing from the map.
    view_name = CORPUS_VARIANT_MAP.get(slug)
    if view_name is None:
        raise ValueError(f"unknown_corpus_variant: {slug!r}")
    return view_name
async def _apply_exact_knn(session: AsyncSession) -> None:
    """Eval-only: switch off index and bitmap scans for the current transaction.

    Issues two ``SET LOCAL`` statements on ``session``. ``SET LOCAL`` lasts
    only until the end of the enclosing transaction, so nothing persists on
    the connection afterwards. Per the author's note, the purpose is to take
    the ivfflat approximation out of the comparison between the indexed
    prehier corpus and the unindexed hier_sim views, so that only the
    chunking strategy differs. The production path does not call this.

    Args:
        session: The session whose current transaction receives the planner
            overrides.
    """
    planner_overrides = (
        "SET LOCAL enable_indexscan = off",
        "SET LOCAL enable_bitmapscan = off",
    )
    # Statements are sent one at a time, in the order listed.
    for statement in planner_overrides:
        await session.execute(text(statement))
def _resolve_backend(slug: str | None) -> dict[str, str] | None:
@@ -248,6 +278,8 @@ async def search_vector(
embedding_backend: str | None = None,
snapshot_doc_id_max: int | None = None,
snapshot_chunk_id_max: int | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
) -> list["SearchResult"]:
"""Hybrid 벡터 검색 — doc + chunks 동시 retrieval (Phase 1.2-G).
@@ -257,6 +289,12 @@ async def search_vector(
embedding_backend=cand_<slug> → CANDIDATE_BACKEND_MAP 에서 페어 resolve.
cand 테이블 자체가 snapshot 범위로 INSERT → snapshot filter 무시 (dispatch log 만 박제).
Hier-Replace-Diagnose-1 (baseline embedding path 한정, eval 전용):
corpus_variant=prehier|hier_sim_raw|hier_sim_clean → chunk leg 만 측정 뷰로 교체
(doc-level + fts/trgm 는 documents = 변종 무관). embedding_backend cand 와 동시 X.
exact_knn=True → vector leg 에 SET LOCAL enable_indexscan/bitmapscan=off
(ivfflat 근사 제거 = 청킹 전략만 분리). production path 절대 미적용.
데이터 흐름:
1. query embedding 1번 (baseline=bge-m3 cache / cand=TEI endpoint no-cache)
2. asyncio.gather 로 두 SQL 동시 호출:
@@ -265,12 +303,15 @@ async def search_vector(
3. _merge_doc_and_chunk_vectors 가중치 + dedup (chunk 1.2 / doc 1.0).
"""
cfg = _resolve_backend(embedding_backend)
variant_table = _resolve_corpus_variant(corpus_variant)
if variant_table is not None and cfg is not None:
raise ValueError("corpus_variant_incompatible_with_embedding_backend")
if cfg is None:
docs_table = "documents"
# Hier-Decomp-1 c2: baseline chunk 검색은 corpus_chunks 뷰(in_corpus=true) 경유.
# 현재는 모든 청크 in_corpus=true 라 document_chunks 와 동일 결과(rewire=no-op).
chunks_table = "corpus_chunks"
# Hier-Replace-Diagnose-1: corpus_variant 지정 시 측정 뷰로 교체 (chunk leg 한정).
chunks_table = variant_table or "corpus_chunks"
client = AIClient()
try:
query_embedding = await _get_query_embedding(client, query)
@@ -285,12 +326,15 @@ async def search_vector(
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
logger.info(
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s snapshot_chunk_id_max=%s",
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
"snapshot_chunk_id_max=%s corpus_variant=%s exact_knn=%s",
embedding_backend or "baseline",
docs_table,
chunks_table,
snapshot_doc_id_max,
snapshot_chunk_id_max,
corpus_variant or "none",
exact_knn,
)
if query_embedding is None:
@@ -306,6 +350,7 @@ async def search_vector(
s, embedding_str, limit * 4,
docs_table=docs_table,
snapshot_doc_id_max=snapshot_doc_id_max,
exact_knn=exact_knn,
)
async def _chunks_call() -> list["SearchResult"]:
@@ -314,6 +359,7 @@ async def search_vector(
s, embedding_str, limit * 4,
chunks_table=chunks_table,
snapshot_chunk_id_max=snapshot_chunk_id_max,
exact_knn=exact_knn,
)
doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call())
@@ -328,6 +374,7 @@ async def _search_vector_docs(
*,
docs_table: str = "documents",
snapshot_doc_id_max: int | None = None,
exact_knn: bool = False,
) -> list["SearchResult"]:
"""documents (또는 documents_cand_<slug>).embedding 직접 검색.
@@ -342,6 +389,9 @@ async def _search_vector_docs(
if not _VALID_DOCS_TABLE.match(docs_table):
raise RuntimeError(f"invalid_docs_table: {docs_table!r}")
if exact_knn:
await _apply_exact_knn(session)
params: dict[str, Any] = {"embedding": embedding_str, "limit": limit}
if docs_table == "documents":
@@ -385,6 +435,7 @@ async def _search_vector_chunks(
*,
chunks_table: str = "document_chunks",
snapshot_chunk_id_max: int | None = None,
exact_knn: bool = False,
) -> list["SearchResult"]:
"""document_chunks (또는 document_chunks_cand_<slug>).embedding window partition.
@@ -398,11 +449,15 @@ async def _search_vector_chunks(
if not _VALID_CHUNKS_TABLE.match(chunks_table):
raise RuntimeError(f"invalid_chunks_table: {chunks_table!r}")
if exact_knn:
await _apply_exact_knn(session)
inner_k = max(limit * 5, 500)
params: dict[str, Any] = {"embedding": embedding_str, "inner_k": inner_k, "limit": limit}
snapshot_clause = ""
if chunks_table in ("document_chunks", "corpus_chunks") and snapshot_chunk_id_max is not None:
if (chunks_table in ("document_chunks", "corpus_chunks")
or chunks_table in CORPUS_VARIANT_MAP.values()) and snapshot_chunk_id_max is not None:
snapshot_clause = " AND c.id <= :snapshot_chunk_id_max"
params["snapshot_chunk_id_max"] = snapshot_chunk_id_max
@@ -481,6 +536,8 @@ async def search_vector_multilingual(
embedding_backend: str | None = None,
snapshot_doc_id_max: int | None = None,
snapshot_chunk_id_max: int | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
) -> list["SearchResult"]:
"""Phase 2.2 — 다국어 normalized_queries 배열로 vector retrieval.
@@ -537,15 +594,21 @@ async def search_vector_multilingual(
# 2. multilingual dispatcher resolve (모든 lang query 가 동일 backend 사용)
cfg = _resolve_backend(embedding_backend)
variant_table = _resolve_corpus_variant(corpus_variant)
if variant_table is not None and cfg is not None:
raise ValueError("corpus_variant_incompatible_with_embedding_backend")
docs_table = cfg["docs_table"] if cfg else "documents"
chunks_table = cfg["chunks_table"] if cfg else "document_chunks"
chunks_table = cfg["chunks_table"] if cfg else (variant_table or "document_chunks")
logger.info(
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s snapshot_chunk_id_max=%s multilingual=true",
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
"snapshot_chunk_id_max=%s corpus_variant=%s exact_knn=%s multilingual=true",
embedding_backend or "baseline",
docs_table,
chunks_table,
snapshot_doc_id_max,
snapshot_chunk_id_max,
corpus_variant or "none",
exact_knn,
)
# 3. 각 embedding에 대해 doc + chunks 병렬 retrieval
@@ -558,6 +621,7 @@ async def search_vector_multilingual(
s, embedding_str, limit * 4,
docs_table=docs_table,
snapshot_doc_id_max=snapshot_doc_id_max,
exact_knn=exact_knn,
)
async def _chunks() -> list["SearchResult"]:
@@ -566,6 +630,7 @@ async def search_vector_multilingual(
s, embedding_str, limit * 4,
chunks_table=chunks_table,
snapshot_chunk_id_max=snapshot_chunk_id_max,
exact_knn=exact_knn,
)
doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
+10
View File
@@ -146,6 +146,8 @@ async def run_search(
snapshot_chunk_id_max: int | None = None,
reranker_backend: str | None = None,
rewrite_backend: str | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
) -> PipelineResult:
"""검색 파이프라인 실행.
@@ -262,6 +264,8 @@ async def run_search(
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
)
else:
raw_chunks = await search_vector(
@@ -269,6 +273,8 @@ async def run_search(
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
@@ -289,6 +295,8 @@ async def run_search(
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
)
else:
raw_chunks = await search_vector(
@@ -296,6 +304,8 @@ async def run_search(
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
+37
View File
@@ -0,0 +1,37 @@
-- PR-DocSrv-Hier-Replace-Diagnose-1 c4: 측정 전용 view (additive, droppable, in_corpus 무관)
-- prehier = pre-hier baseline (legacy + null-source). hier_sim_* = post-replace 시뮬(doc 단위 fallback).
-- clean = childless-tiny(<30자) leaf 제외 (A1 held-out 발견). kept-leaf = is_leaf AND (len>=30 OR has child).
DROP VIEW IF EXISTS corpus_chunks_prehier;
DROP VIEW IF EXISTS corpus_chunks_hier_sim_raw;
DROP VIEW IF EXISTS corpus_chunks_hier_sim_clean;
-- corpus_chunks_prehier: measurement view over document_chunks.
-- Keeps every row whose source_type is not 'hier_section' (IS DISTINCT FROM also
-- keeps rows whose source_type is NULL) and whose embedding is present.
CREATE VIEW corpus_chunks_prehier AS
SELECT * FROM document_chunks
WHERE source_type IS DISTINCT FROM 'hier_section' AND embedding IS NOT NULL;
-- corpus_chunks_hier_sim_raw: measurement view over document_chunks (embedded rows only).
-- A row is kept when it is either
--   (a) a hier_section leaf, or
--   (b) a non-hier_section row (NULL source_type included) whose doc_id has no
--       embedded hier_section leaf at all, i.e. a per-document fallback.
CREATE VIEW corpus_chunks_hier_sim_raw AS
SELECT * FROM document_chunks dc
WHERE dc.embedding IS NOT NULL AND (
(dc.source_type = 'hier_section' AND dc.is_leaf = true)
OR (dc.source_type IS DISTINCT FROM 'hier_section'
AND NOT EXISTS (SELECT 1 FROM document_chunks h
WHERE h.doc_id = dc.doc_id AND h.source_type = 'hier_section'
AND h.is_leaf = true AND h.embedding IS NOT NULL))
);
-- corpus_chunks_hier_sim_clean: same shape as corpus_chunks_hier_sim_raw, but a
-- hier_section leaf only counts as "kept" when its trimmed text is at least 30
-- characters long OR some row references it through parent_id. The fallback
-- branch uses the same "kept" predicate, so a document whose hier leaves are all
-- excluded falls back to its non-hier_section rows.
CREATE VIEW corpus_chunks_hier_sim_clean AS
SELECT * FROM document_chunks dc
WHERE dc.embedding IS NOT NULL AND (
-- kept hier leaf: is_leaf AND NOT childless-tiny
(dc.source_type = 'hier_section' AND dc.is_leaf = true
AND (length(trim(dc.text)) >= 30
OR EXISTS (SELECT 1 FROM document_chunks ch WHERE ch.parent_id = dc.id)))
-- legacy fallback: only when the doc has no kept (clean) hier leaf at all
OR (dc.source_type IS DISTINCT FROM 'hier_section'
AND NOT EXISTS (SELECT 1 FROM document_chunks h
WHERE h.doc_id = dc.doc_id AND h.source_type = 'hier_section'
AND h.is_leaf = true AND h.embedding IS NOT NULL
AND (length(trim(h.text)) >= 30
OR EXISTS (SELECT 1 FROM document_chunks ch2 WHERE ch2.parent_id = h.id))))
);
+25 -3
View File
@@ -243,6 +243,8 @@ async def call_search(
snapshot_chunk_id_max: int | None = None,
reranker_backend: str | None = None,
rewrite_backend: str | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
) -> tuple[list[int], float]:
"""검색 API 호출 → (doc_ids, latency_ms)."""
url = f"{base_url.rstrip('/')}/api/search/"
@@ -264,6 +266,10 @@ async def call_search(
params["reranker_backend"] = reranker_backend
if rewrite_backend is not None:
params["rewrite_backend"] = rewrite_backend
if corpus_variant is not None:
params["corpus_variant"] = corpus_variant
if exact_knn:
params["exact_knn"] = "true"
import time
@@ -296,6 +302,8 @@ async def evaluate(
snapshot_chunk_id_max: int | None = None,
reranker_backend: str | None = None,
rewrite_backend: str | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
) -> list[QueryResult]:
"""전체 쿼리셋 평가."""
results: list[QueryResult] = []
@@ -310,6 +318,8 @@ async def evaluate(
snapshot_chunk_id_max=snapshot_chunk_id_max,
reranker_backend=reranker_backend,
rewrite_backend=rewrite_backend,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
)
dedup_count = count_dedup(returned_ids, 10)
if dedup_count > 0:
@@ -1392,6 +1402,18 @@ def main() -> int:
default=None,
help="Phase 2Q Diagnose query rewrite dispatcher slug (baseline | cand_multi_query_macmini | cand_multi_query_macbook). 미지정 = single-query path. Phase 1B scaffold = variants 박제만, retrieval 합성은 Phase 2.",
)
parser.add_argument(
"--corpus-variant",
type=str,
default=None,
choices=["prehier", "hier_sim_raw", "hier_sim_clean"],
help="Hier-Replace-Diagnose-1: chunk leg 측정 뷰 (prehier=legacy baseline | hier_sim_raw | hier_sim_clean). 미지정 = production corpus_chunks.",
)
parser.add_argument(
"--exact-knn",
action="store_true",
help="Hier-Replace-Diagnose-1: vector leg exact KNN (ivfflat 근사 제거). prehier vs hier_sim 공정 비교용. eval 전용.",
)
args = parser.parse_args()
@@ -1445,21 +1467,21 @@ def main() -> int:
if args.base_url:
print(f"\n>>> evaluating: {args.base_url}")
results = asyncio.run(
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend, rewrite_backend=args.rewrite_backend)
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend, rewrite_backend=args.rewrite_backend, corpus_variant=args.corpus_variant, exact_knn=args.exact_knn)
)
print_summary("single", results, eval_version=args.eval_version)
all_results.extend(results)
else:
print(f"\n>>> baseline: {args.baseline_url}")
baseline_results = asyncio.run(
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend, rewrite_backend=args.rewrite_backend)
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend, rewrite_backend=args.rewrite_backend, corpus_variant=args.corpus_variant, exact_knn=args.exact_knn)
)
baseline_summary = print_summary("baseline", baseline_results, eval_version=args.eval_version)
print(f"\n>>> candidate: {args.candidate_url}")
candidate_results = asyncio.run(
evaluate(
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend, rewrite_backend=args.rewrite_backend
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze, embedding_backend=args.embedding_backend, snapshot_doc_id_max=args.snapshot_doc_id_max, snapshot_chunk_id_max=args.snapshot_chunk_id_max, reranker_backend=args.reranker_backend, rewrite_backend=args.rewrite_backend, corpus_variant=args.corpus_variant, exact_knn=args.exact_knn
)
)
candidate_summary = print_summary("candidate", candidate_results, eval_version=args.eval_version)
+74
View File
@@ -0,0 +1,74 @@
"""Hier-Replace-Diagnose-1 c5 — corpus_variant dispatcher 단위 테스트.
가드:
1. _resolve_corpus_variant slug→view, unknown→ValueError, None→None
2. CORPUS_VARIANT_MAP 3 slug 1:1
3. _VALID_CHUNKS_TABLE 측정 3 허용 + junk/injection 거부
"""
from __future__ import annotations
import logging
import os
import sys
import pytest
# logs/llm_gate.log is owned by root, so building its FileHandler at import time can
# raise PermissionError. Wrap logging.FileHandler so that a path which cannot be
# opened for append yields a NullHandler instead (same pattern as test_query_rewriter).
_orig = logging.FileHandler
logging.FileHandler = lambda f, *a, **k: (_orig(f, *a, **k) if _try(f) else logging.NullHandler()) # type: ignore
def _try(f):
    """Report whether ``f`` can be opened for appending.

    Opening in append mode creates the file when it does not exist yet.
    Any exception raised while opening or closing is swallowed and reported
    as ``False`` (deliberate best-effort probe).
    """
    try:
        with open(f, "a"):
            pass
    except Exception:
        return False
    return True
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost:5432/test")
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from services.search import retrieval_service as rs
def test_resolve_valid_slugs():
    """Each supported slug resolves to its dedicated measurement view."""
    expected_views = {
        "prehier": "corpus_chunks_prehier",
        "hier_sim_raw": "corpus_chunks_hier_sim_raw",
        "hier_sim_clean": "corpus_chunks_hier_sim_clean",
    }
    for slug, view_name in expected_views.items():
        assert rs._resolve_corpus_variant(slug) == view_name
def test_resolve_none():
    """A missing slug resolves to ``None`` rather than to a view name."""
    resolved = rs._resolve_corpus_variant(None)
    assert resolved is None
@pytest.mark.parametrize(
    "bad",
    [
        "",
        "hier",
        "corpus_chunks",
        "prehier; DROP TABLE",
        "hier_sim",
        "HIER_SIM_CLEAN",
    ],
)
def test_resolve_unknown_raises(bad):
    """Slugs outside the map (empty, partial, wrong case, injection) raise ValueError."""
    with pytest.raises(ValueError, match="unknown_corpus_variant"):
        rs._resolve_corpus_variant(bad)
def test_variant_map_keys():
    """The variant map exposes exactly the three supported slugs."""
    expected_slugs = {"prehier", "hier_sim_raw", "hier_sim_clean"}
    assert set(rs.CORPUS_VARIANT_MAP) == expected_slugs
@pytest.mark.parametrize(
    "view",
    [
        "corpus_chunks_prehier",
        "corpus_chunks_hier_sim_raw",
        "corpus_chunks_hier_sim_clean",
        "document_chunks",
        "corpus_chunks",
        "document_chunks_cand_me5",
    ],
)
def test_valid_chunks_table_allows(view):
    """The allowlist regex accepts the three measurement views and the existing tables."""
    # A successful match returns a Match object, which is always truthy.
    assert rs._VALID_CHUNKS_TABLE.match(view) is not None
@pytest.mark.parametrize(
    "bad",
    [
        "corpus_chunks_prehier; DROP TABLE x",
        "corpus_chunks_hier_sim",
        "documents",
        "corpus_chunks_evil",
        "'; DELETE--",
        "corpus_chunks_hier_sim_cleanX",
    ],
)
def test_valid_chunks_table_rejects(bad):
    """The allowlist regex refuses junk, near-miss names and injection attempts."""
    assert rs._VALID_CHUNKS_TABLE.match(bad) is None
def test_all_mapped_views_pass_allowlist():
    """Every view the resolver can return must pass the SQL interpolation gate."""
    rejected = [
        view_name
        for view_name in rs.CORPUS_VARIANT_MAP.values()
        if not rs._VALID_CHUNKS_TABLE.match(view_name)
    ]
    assert not rejected