feat(search): Phase 2Q Diagnose Phase 2 — multi-query retrieval fusion

phase-2q-query-rewrite-diagnose.md v6 plan §5.5 + §7 Phase 2.
Phase 1B 3e6866b (scaffold + dispatcher) 위 retrieval 합성 wire-up.

신규:
- search_pipeline._rrf_fuse_variants() — N variant ranked list RRF 합성.
  fusion_service.RRFOnly 알고리즘 동일 (k=60), 첫 등장 variant representative 보존.
- search_pipeline.search_with_rewrite() — variant N 별 retrieval+fusion 후
  unified RRF (cap 60) → reranker 1회 (query=원본 q) → diversity+freshness+display.
  · per-variant K = 50//3 = 16 (PHASE2Q_PRODUCTION_TOPK//N, A1 채택)
  · variant 별 retrieval asyncio.gather 병렬
  · chunks_by_doc merge (variant 무관 unified reranker input)
  · production fusion_service.get_strategy() + rerank_chunks() 재사용
- 상수: PHASE2Q_PRODUCTION_TOPK=50, PHASE2Q_UNIFIED_CAP=60, PHASE2Q_RRF_K=60.

수정:
- search_pipeline.run_search() — rewrite_backend param 추가. hybrid + cand_<slug> 시
  search_with_rewrite() 위임. baseline/None 시 기존 single-query path 그대로 (invariant).
- app/api/search.py — Phase 1B scaffold discard call 제거. run_search 에 rewrite_backend
  전달. ValueError → 400 (unknown_rewrite_backend 우선 분기) / RuntimeError → 503
  (rewrite_llm_unavailable).
- tests/test_query_rewriter.py — Phase 2 test 9개 추가:
  · _rrf_fuse_variants 6 (single / overlap accumulation / representative / cap limit /
    empty / rank position)
  · search_pipeline import + run_search rewrite_backend default=None signature 1
  · PHASE2Q_* constants 1
  · DATABASE_URL dummy 주입 (api.search import → SQLAlchemy engine init 회피)

30/30 unit test PASS (Phase 1B 21 + Phase 2 9).

baseline 회귀 0 invariant:
- run_search(rewrite_backend=None) → 기존 path 100% 그대로 (분기 first line guard)
- run_search(rewrite_backend=baseline) → 동일
- mode != hybrid → multi-query path 비활성 (text-only/vector-only/trgm 영향 0)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-05-23 22:41:50 +00:00
parent 3e6866b4ae
commit ecd2350c15
3 changed files with 369 additions and 28 deletions
+26 -27
View File
@@ -178,35 +178,10 @@ async def search(
rewrite_backend: str | None = Query(
None,
pattern=r"^(baseline|cand_[a-z0-9_]+)$",
description="Phase 2Q Diagnose query rewrite dispatcher (slug-based, no silent fallback). baseline|cand_multi_query_macmini|cand_multi_query_macbook. 미지정/baseline = single-query path. Phase 1B scaffold = variants 박제만, retrieval 합성은 Phase 2.",
description="Phase 2Q Diagnose query rewrite dispatcher (slug-based, no silent fallback). baseline|cand_multi_query_macmini|cand_multi_query_macbook. 미지정/baseline = single-query path. Phase 2 = variant N 별 retrieval+fusion → unified RRF → reranker 1회.",
),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)"""
# Phase 2Q Diagnose scaffold (plan v6 Phase 1):
# slug 명시 시 LLM rewrite 호출 → variants 박제 (logger). retrieval path 영향 0
# (results 미사용, baseline single-query path 유지). Phase 2 에서 search_with_rewrite() 합성.
if rewrite_backend not in (None, "baseline"):
try:
await query_rewriter.rewrite(q, rewrite_backend)
except ValueError:
return JSONResponse(
status_code=400,
content={
"error_reason": "unknown_rewrite_backend",
"backend_requested": rewrite_backend,
"allowed": query_rewriter.allowed_slugs(),
},
)
except RuntimeError as e:
return JSONResponse(
status_code=503,
content={
"error_reason": "rewrite_llm_unavailable",
"backend_requested": rewrite_backend,
"detail": str(e),
},
)
try:
pr = await run_search(
session,
@@ -220,10 +195,21 @@ async def search(
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
reranker_backend=reranker_backend,
rewrite_backend=rewrite_backend,
)
except ValueError as e:
# _resolve_backend / _resolve_reranker 가 unknown slug 시 ValueError → HTTP 400
# _resolve_backend / _resolve_reranker / _resolve_rewrite_backend 가 unknown slug 시 ValueError → HTTP 400
msg = str(e)
if msg.startswith("unknown_rewrite_backend"):
return JSONResponse(
status_code=400,
content={
"error_reason": "unknown_rewrite_backend",
"backend_requested": rewrite_backend,
"allowed": query_rewriter.allowed_slugs(),
"detail": msg,
},
)
if msg.startswith("unknown_reranker_backend"):
return JSONResponse(
status_code=400,
@@ -243,6 +229,19 @@ async def search(
"detail": msg,
},
)
except RuntimeError as e:
# query_rewriter.rewrite() 실패 (LLM unavailable / parse fail) → HTTP 503
msg = str(e)
if msg.startswith("rewrite_llm_unavailable"):
return JSONResponse(
status_code=503,
content={
"error_reason": "rewrite_llm_unavailable",
"backend_requested": rewrite_backend,
"detail": msg,
},
)
raise
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items())
+230 -1
View File
@@ -25,13 +25,14 @@ byte-level 에 가깝게 일치해야 한다.
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from sqlalchemy.ext.asyncio import AsyncSession
from . import query_analyzer
from . import query_analyzer, query_rewriter
from .fusion_service import (
DEFAULT_FUSION,
apply_soft_filter_boost,
@@ -68,6 +69,13 @@ ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
# ─── Phase 2Q multi-query 합성 상수 (plan v6 §5.5 박제) ──
# per-variant top-K = PRODUCTION_TOPK // N (50 // 3 = 16, A1 채택).
# reranker batch ≤ 60 cap → latency 회귀 0.
PHASE2Q_PRODUCTION_TOPK = 50
PHASE2Q_UNIFIED_CAP = 60 # variant 합성 후 reranker 입력 후보 doc cap
PHASE2Q_RRF_K = 60 # production fusion_service.RRFOnly.K 와 동일
def _analyzer_tier(confidence: float) -> str:
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
@@ -125,6 +133,7 @@ async def run_search(
snapshot_doc_id_max: int | None = None,
snapshot_chunk_id_max: int | None = None,
reranker_backend: str | None = None,
rewrite_backend: str | None = None,
) -> PipelineResult:
"""검색 파이프라인 실행.
@@ -140,6 +149,9 @@ async def run_search(
fusion: legacy | rrf | rrf_boost
rerank: bge-reranker-v2-m3 활성화 (hybrid 전용)
analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter)
rewrite_backend: Phase 2Q multi-query rewrite dispatcher slug. None/baseline =
single-query path (기존 동작). hybrid + cand_<slug> 시 search_with_rewrite()
로 위임 — variant N retrieval → per-variant fusion → unified RRF → reranker 1회.
Returns:
PipelineResult
@@ -147,6 +159,21 @@ async def run_search(
# 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언)
from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영
# Phase 2Q dispatch — rewrite_backend 활성 + hybrid 만 multi-query path.
# 기타 mode 또는 baseline/None 은 기존 single-query 경로 그대로.
if rewrite_backend not in (None, "baseline") and mode == "hybrid":
return await search_with_rewrite(
session, q,
limit=limit,
fusion=fusion,
rerank=rerank,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
reranker_backend=reranker_backend,
rewrite_backend=rewrite_backend,
)
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list["SearchResult"] = []
@@ -369,3 +396,205 @@ async def run_search(
timing_ms=timing,
notes=notes,
)
# ─── Phase 2Q multi-query retrieval 합성 ──────────────────
def _rrf_fuse_variants(
variant_lists: "list[list[SearchResult]]",
k: int,
limit: int,
) -> "list[SearchResult]":
"""N variant 의 ranked list 를 RRF 합성. fusion_service.RRFOnly 알고리즘 동일.
각 doc_id 의 RRF_score = Σ 1/(k + rank_i) over variant lists.
같은 doc_id 가 여러 variant 에서 등장하면 점수 누적. 첫 등장 variant 의
SearchResult 를 representative 로 보존 (snippet/match_reason 등 메타).
"""
from api.search import SearchResult # 순환 import 회피
scores: dict[int, float] = {}
representative: dict[int, "SearchResult"] = {}
for variant_list in variant_lists:
for rank, doc in enumerate(variant_list, start=1):
doc_id = doc.id
scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
if doc_id not in representative:
representative[doc_id] = doc
fused: list["SearchResult"] = []
for doc_id, rrf_score in sorted(scores.items(), key=lambda x: x[1], reverse=True):
doc = representative[doc_id]
fused.append(SearchResult(
id=doc.id,
title=doc.title,
ai_domain=doc.ai_domain,
ai_summary=doc.ai_summary,
file_format=doc.file_format,
score=rrf_score,
snippet=doc.snippet,
match_reason=f"{doc.match_reason}+multi_query_rrf",
))
return fused[:limit]
async def search_with_rewrite(
session: AsyncSession,
q: str,
*,
limit: int,
fusion: str,
rerank: bool,
embedding_backend: str | None,
snapshot_doc_id_max: int | None,
snapshot_chunk_id_max: int | None,
reranker_backend: str | None,
rewrite_backend: str,
) -> PipelineResult:
"""Phase 2Q multi-query retrieval 합성 path (plan v6 §5.5).
흐름:
1. query_rewriter.rewrite(q, slug) → variants (N=3, prompt v1 invariant)
2. variant 별 search_text + search_vector (asyncio.gather, per-variant K=16)
3. variant 별 strategy.fuse(text, vector) — production fusion 재사용
4. N variant 의 fused list → _rrf_fuse_variants (k=60, cap 60)
5. reranker 1회 (variant 무관 unified candidate set) — query = 원본 q
6. diversity + freshness + display 정규화 (run_search 동일 마무리)
LLM call 실패 / parse fail → query_rewriter.rewrite 가 RuntimeError 전파.
unknown slug → ValueError. caller(search.py) 가 HTTP 503/400 으로 translate.
mode 는 hybrid 가정 (run_search 의 분기 조건). rerank=False 시 unified_docs 그대로.
"""
from api.search import SearchResult # noqa: F401
timing: dict[str, float] = {}
notes: list[str] = []
t_total = time.perf_counter()
# 1) variants — LLM call (실패 시 caller 가 503 translate)
t_rw = time.perf_counter()
variants = await query_rewriter.rewrite(q, rewrite_backend)
timing["rewrite_ms"] = (time.perf_counter() - t_rw) * 1000
if not variants:
# 방어 — query_rewriter.rewrite 는 backend != baseline 시 list 또는 raise.
# None 이 도달하면 명시적 503 신호.
raise RuntimeError(f"rewrite_llm_unavailable:{rewrite_backend}:empty_variants")
per_variant_k = max(1, PHASE2Q_PRODUCTION_TOPK // len(variants))
notes.append(
f"rewrite={rewrite_backend} n_variants={len(variants)} "
f"per_variant_k={per_variant_k}"
)
# 2) variant 별 retrieval (text + vector) — asyncio.gather 병렬
t_var = time.perf_counter()
async def _variant_retrieve(
v: str,
) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]":
text = await search_text(session, v, per_variant_k)
raw_chunks = await search_vector(
session, v, per_variant_k,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
)
vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k)
return text, vector, chunks_by_doc
variant_outputs = await asyncio.gather(
*[_variant_retrieve(v) for v in variants]
)
timing["variant_retrieve_ms"] = (time.perf_counter() - t_var) * 1000
# 3) variant 별 fusion (production fusion 재사용)
t_fuse = time.perf_counter()
strategy = get_strategy(fusion)
per_variant_fused: list[list["SearchResult"]] = []
merged_chunks_by_doc: dict[int, list["SearchResult"]] = {}
for v, (text, vector, cbd) in zip(variants, variant_outputs):
fused = strategy.fuse(text, vector, v, per_variant_k)
per_variant_fused.append(fused)
for doc_id, chunks in cbd.items():
merged_chunks_by_doc.setdefault(doc_id, []).extend(chunks)
timing["variant_fusion_ms"] = (time.perf_counter() - t_fuse) * 1000
notes.append(f"fusion={strategy.name}")
# 4) variant 간 RRF 합성 — unified candidate set (cap 60)
t_rrf = time.perf_counter()
unified_docs = _rrf_fuse_variants(
per_variant_fused,
k=PHASE2Q_RRF_K,
limit=PHASE2Q_UNIFIED_CAP,
)
timing["unified_rrf_ms"] = (time.perf_counter() - t_rrf) * 1000
notes.append(
f"unified docs={len(unified_docs)} cap={PHASE2Q_UNIFIED_CAP}"
)
# 5) reranker 1회 (variant 무관, query = 원본 q)
if rerank:
t_re = time.perf_counter()
rerank_input: list["SearchResult"] = []
for doc in unified_docs:
chunks = merged_chunks_by_doc.get(doc.id, [])
if chunks:
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(
q, rerank_input, limit * 3,
reranker_backend=reranker_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
)
timing["rerank_ms"] = (time.perf_counter() - t_re) * 1000
t_div = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t_div) * 1000
else:
results = unified_docs[:limit]
# 6) freshness + display 정규화 (run_search 동일 마무리)
t_fr = time.perf_counter()
results = await apply_freshness_decay(results, session)
timing["freshness_ms"] = (time.perf_counter() - t_fr) * 1000
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence — rerank 활성 시 reranker score 우선.
# multi-query 시 text/vector 개별 신호 의미 약함 → unified 결과 사용.
if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence(results, "vector")
# text_results / vector_results 는 원본 variant (index 0, prompt v1 invariant=원본 verbatim) 만 노출
text_v0, vector_v0, _ = variant_outputs[0]
return PipelineResult(
results=results,
mode="hybrid",
confidence_signal=confidence_signal,
text_results=text_v0,
vector_results=vector_v0,
raw_chunks=[], # variant 별 raw chunks 합치는 의미 약함 — debug 노출 X
chunks_by_doc=merged_chunks_by_doc,
query_analysis=None,
analyzer_cache_hit=False,
analyzer_confidence=0.0,
analyzer_tier="disabled",
timing_ms=timing,
notes=notes,
)
+113
View File
@@ -30,6 +30,10 @@ def _safe_file_handler(filename, *args, **kwargs): # type: ignore
logging.FileHandler = _safe_file_handler # type: ignore[assignment]
# Phase 2 test (search_pipeline import) 는 api.search → SQLAlchemy engine init 트리거.
# DATABASE_URL 미설정 시 ArgumentError 로 collection 실패. dummy URL 주입 (실제 connect X).
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost:5432/test")
# tests/ → 프로젝트 루트 → app/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
@@ -210,3 +214,112 @@ def test_constants_match_plan_v6():
assert query_rewriter.LLM_REWRITE_TIMEOUT_MS == 15000
assert query_rewriter.CACHE_TTL == 86400
assert query_rewriter.CACHE_MAXSIZE == 1000
# ─── 6. Phase 2 — _rrf_fuse_variants 합성 알고리즘 ────────
def _mk_search_result(doc_id: int, score: float = 1.0, match_reason: str = "test"):
"""SearchResult 인스턴스 (api.search 의 BaseModel). file_format 은 str 필수."""
from api.search import SearchResult
return SearchResult(
id=doc_id, title=f"doc-{doc_id}", ai_domain=None,
ai_summary=None, file_format="pdf",
score=score, snippet=None, match_reason=match_reason,
)
def test_rrf_fuse_variants_single_variant_preserves_order():
from services.search.search_pipeline import _rrf_fuse_variants
docs = [_mk_search_result(i) for i in (10, 20, 30)]
out = _rrf_fuse_variants([docs], k=60, limit=10)
assert [r.id for r in out] == [10, 20, 30]
# RRF score = 1/(60+1) > 1/(60+2) > 1/(60+3)
assert out[0].score > out[1].score > out[2].score
assert "multi_query_rrf" in out[0].match_reason
def test_rrf_fuse_variants_accumulates_overlapping_doc_ids():
"""같은 doc_id 가 여러 variant 에서 top rank 면 점수 누적 → 상위."""
from services.search.search_pipeline import _rrf_fuse_variants
v1 = [_mk_search_result(i) for i in (10, 20, 30)]
v2 = [_mk_search_result(i) for i in (40, 10, 50)] # 10 이 두 variant 모두 등장
out = _rrf_fuse_variants([v1, v2], k=60, limit=10)
# 10 = 1/61 + 1/62 (rank 1 + rank 2). 다른 doc 은 1 variant 만 → 단일 RRF score.
ids = [r.id for r in out]
assert ids[0] == 10 # 누적 점수 최상위
# 40 (1/61) vs 20 (1/62) — variant 1 에서 rank 1 인 40 이 단일 등장 doc 중 최상위
assert ids[1] == 40
assert set(ids) == {10, 20, 30, 40, 50}
def test_rrf_fuse_variants_first_variant_representative():
"""같은 doc_id 가 여러 variant 에 있으면 첫 등장 variant 의 SearchResult 보존."""
from services.search.search_pipeline import _rrf_fuse_variants
v1 = [_mk_search_result(10, match_reason="from_v1")]
v2 = [_mk_search_result(10, match_reason="from_v2")]
out = _rrf_fuse_variants([v1, v2], k=60, limit=10)
assert len(out) == 1
assert out[0].id == 10
assert "from_v1" in out[0].match_reason # 첫 등장 보존
assert "multi_query_rrf" in out[0].match_reason
def test_rrf_fuse_variants_respects_limit_cap():
from services.search.search_pipeline import _rrf_fuse_variants
v1 = [_mk_search_result(i) for i in range(100, 130)] # 30 docs
v2 = [_mk_search_result(i) for i in range(200, 230)] # 30 docs, 모두 unique
out = _rrf_fuse_variants([v1, v2], k=60, limit=5)
assert len(out) == 5
def test_rrf_fuse_variants_empty_lists_returns_empty():
from services.search.search_pipeline import _rrf_fuse_variants
assert _rrf_fuse_variants([], k=60, limit=10) == []
assert _rrf_fuse_variants([[], [], []], k=60, limit=10) == []
def test_rrf_fuse_variants_rank_position_matters():
"""variant 가 길어져도 RRF 공식이 rank 만 사용."""
from services.search.search_pipeline import _rrf_fuse_variants
v1 = [_mk_search_result(10)] # rank 1
v2 = [_mk_search_result(99), _mk_search_result(10)] # 10 이 rank 2
out = _rrf_fuse_variants([v1, v2], k=60, limit=10)
# 10 = 1/61 + 1/62, 99 = 1/61. 둘 다 등장 doc 중 10 점수 높음.
assert out[0].id == 10
assert out[1].id == 99
# ─── 7. Phase 2 — search_pipeline import + run_search signature ───
def test_search_pipeline_imports_query_rewriter():
"""search_pipeline 이 query_rewriter 를 import 하는지 (dispatch 분기 활성)."""
from services.search import search_pipeline
assert hasattr(search_pipeline, "query_rewriter")
assert hasattr(search_pipeline, "search_with_rewrite")
assert hasattr(search_pipeline, "_rrf_fuse_variants")
def test_run_search_has_rewrite_backend_param():
"""run_search signature 에 rewrite_backend 가 추가됐는지."""
import inspect
from services.search.search_pipeline import run_search
sig = inspect.signature(run_search)
assert "rewrite_backend" in sig.parameters
# default = None (baseline 회귀 0 invariant)
assert sig.parameters["rewrite_backend"].default is None
def test_phase2q_constants():
"""plan v6 §5.5 박제값."""
from services.search.search_pipeline import (
PHASE2Q_PRODUCTION_TOPK,
PHASE2Q_RRF_K,
PHASE2Q_UNIFIED_CAP,
)
assert PHASE2Q_PRODUCTION_TOPK == 50
assert PHASE2Q_RRF_K == 60
assert PHASE2Q_UNIFIED_CAP == 60
# per-variant K = 50 // 3 = 16 (A1 채택)
assert PHASE2Q_PRODUCTION_TOPK // EXPECTED_N_VARIANTS == 16