3b753f18d6
PR-2Q-Search-Result-Dedup. Final cleanup of the measurement chain. Plan inline.
Root cause: when top_score ≥ 0.90, apply_diversity takes an unlimited path (diversity constraint lifted)
→ N chunks of the same doc end up in results → doc.id is duplicated in returned_ids → every graded
metric is inflated. Multi-query reranker scores frequently reach 0.90+, so many cases are affected.
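The following sketch shows why a duplicated doc id inflates a graded metric. It computes graded NDCG@k over `returned_ids` and assumes the common exponential gain `2**grade - 1` with a `log2(rank + 1)` discount. The eval harness's actual formula is not part of this commit, and `graded_ndcg` is a hypothetical helper.

```python
import math


def graded_ndcg(returned_ids: list[int], grades: dict[int, int], k: int = 10) -> float:
    """Hypothetical graded NDCG@k: gain = 2**grade - 1, discount = log2(rank + 1)."""

    def dcg(gains: list[int]) -> float:
        # ranks are 1-based, so position i (0-based) is discounted by log2(i + 2)
        return sum(g / math.log2(i + 2) for i, g in enumerate(gains[:k]))

    # Every occurrence of a doc id is scored again; nothing is deduplicated here.
    gains = [2 ** grades.get(doc_id, 0) - 1 for doc_id in returned_ids]
    ideal = sorted((2 ** g - 1 for g in grades.values()), reverse=True)
    idcg = dcg(ideal)
    return dcg(gains) / idcg if idcg > 0 else 0.0


grades = {1: 3, 2: 1}   # doc 1 highly relevant, doc 2 marginally relevant
with_dups = [1, 1, 2]   # two chunks of doc 1 survived the unlimited diversity path
deduped = [1, 2]        # first-only dedup by doc id

print(graded_ndcg(with_dups, grades))  # above 1.0: doc 1's gain is counted twice
print(graded_ndcg(deduped, grades))    # 1.0
```

An NDCG above 1.0 is the extreme case. More commonly, a repeated relevant doc fills a slot that a different doc should have taken, and the metric drifts upward without anything looking obviously wrong.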
Changes (no effect on the baseline path; multi-query-only invariant):
- app/services/search/search_pipeline.py:
  · new helper _dedup_results_by_doc_id() (keeps the first occurrence of each doc.id, so the top score is preserved)
  · search_with_rewrite() rerank path: force apply_diversity(top_score_threshold=2.0),
    then apply _dedup_results_by_doc_id
  · the rerank=False path also applies _dedup_results_by_doc_id(unified_docs)
- tests/test_query_rewriter.py: 4 new tests (55/55 PASS)
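The failure mode and the two-layer fix can be modeled in a few lines. `apply_diversity` lives in `rerank_service` and is not shown in this commit. `toy_apply_diversity` below is therefore a hypothetical stand-in, built only from the behavior described above: a per-doc cap that is lifted once the top score reaches the threshold. The sketch also assumes reranker scores lie in [0, 1], which is why a threshold of 2.0 can never trigger the unlimited path.

```python
from dataclasses import dataclass


@dataclass
class Hit:
    id: int        # doc id
    chunk_id: int
    score: float   # reranker score, assumed to lie in [0, 1]


def toy_apply_diversity(hits: list[Hit], max_per_doc: int = 2,
                        top_score_threshold: float = 0.90) -> list[Hit]:
    """Hypothetical model: cap chunks per doc unless the top score reaches the threshold."""
    ranked = sorted(hits, key=lambda h: h.score, reverse=True)
    if ranked and ranked[0].score >= top_score_threshold:
        return ranked  # "unlimited" path: the per-doc cap is skipped entirely
    per_doc: dict[int, int] = {}
    out: list[Hit] = []
    for h in ranked:
        if per_doc.get(h.id, 0) < max_per_doc:
            per_doc[h.id] = per_doc.get(h.id, 0) + 1
            out.append(h)
    return out


def dedup_by_doc_id(hits: list[Hit]) -> list[Hit]:
    """First occurrence per doc id wins (the top-scored chunk, given a score-sorted input)."""
    seen: set[int] = set()
    out: list[Hit] = []
    for h in hits:
        if h.id not in seen:
            seen.add(h.id)
            out.append(h)
    return out


reranked = [Hit(7, 70, 0.95), Hit(7, 71, 0.93), Hit(7, 72, 0.91), Hit(9, 90, 0.40)]

before = toy_apply_diversity(reranked)  # top score 0.95 >= 0.90 -> unlimited path
after = dedup_by_doc_id(toy_apply_diversity(reranked, top_score_threshold=2.0))

print([h.id for h in before])  # [7, 7, 7, 9]
print([h.id for h in after])   # [7, 9]
```

Raising the threshold alone still allows up to `max_per_doc` chunks per doc. The explicit dedup step is what guarantees one entry per doc id.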
🎯 Actual measurements (all dedup layers applied, 51 cases, gemma):
cold: NDCG 0.663 / Recall t≥2 0.729 / Recall t≥3 0.761 / p50 3692ms / p95 9992ms
warm: NDCG 0.659 / Recall t≥2 0.721 / Recall t≥3 0.739 / p50 1588ms / p95 3514ms
baseline (rewrite_backend=null): NDCG 0.644 / Recall t≥2 0.699 / Recall t≥3 0.761 / p50 378ms
Dedup audit: gemma 0/51 ✓ clean (fix works; duplicated cases went from 42/51 under Eval-Dedup to 0/51)
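The dedup audit counts the cases whose returned id list contains a repeated doc id. The sketch below assumes the per-case results are available as a mapping from case id to the ordered `returned_ids`. The real audit script and its input format are not part of this commit, so `count_dup_cases` is hypothetical.

```python
def count_dup_cases(returned: dict[str, list[int]]) -> int:
    """Number of eval cases whose returned_ids contain at least one repeated doc id."""
    return sum(1 for ids in returned.values() if len(ids) != len(set(ids)))


cases = {
    "q01": [12, 12, 40],  # duplicate doc id -> flagged
    "q02": [5, 8, 13],
    "q03": [],
}
print(f"{count_dup_cases(cases)}/{len(cases)}")  # 1/3
```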
Δ vs baseline (true multi-query effect):
NDCG +0.019 (cold) / +0.015 (warm): below the noise level
Recall t≥2 +0.030 (cold) / +0.022 (warm): small improvement
Recall t≥3 0.000 (cold) / -0.022 (warm): parity to slight regression
latency p50 +876% (cold) / +320% (warm): major cost
category: english/standards/mixed slightly ahead / exam/korean slightly regressed
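The deltas above follow directly from the cold, warm and baseline lines. This small check recomputes them from the reported numbers. Latency change is taken relative to the baseline p50 of 378 ms.

```python
baseline = {"ndcg": 0.644, "recall_t2": 0.699, "recall_t3": 0.761, "p50_ms": 378}
runs = {
    "cold": {"ndcg": 0.663, "recall_t2": 0.729, "recall_t3": 0.761, "p50_ms": 3692},
    "warm": {"ndcg": 0.659, "recall_t2": 0.721, "recall_t3": 0.739, "p50_ms": 1588},
}


def deltas(run: dict[str, float]) -> dict[str, float]:
    """Absolute deltas for the quality metrics, relative % change for p50 latency."""
    out = {m: round(run[m] - baseline[m], 3) for m in ("ndcg", "recall_t2", "recall_t3")}
    out["p50_pct"] = round((run["p50_ms"] / baseline["p50_ms"] - 1) * 100, 1)
    return out


for name, run in runs.items():
    print(name, deltas(run))
```

The cold p50 change works out to about +876.7%, which the summary above reports as +876%.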
Measurement chain correction history (NDCG):
Phase 3 (a41adb6) 0.927: inflated by duplicate chunk_ids
Rerank-Fix (b734fc5) 0.876: residual duplicate doc_ids
Eval-Dedup (3553573) 0.641: dedup in the eval layer only
Result-Dedup (this PR) 0.663: dedup in both production and eval ← correct value
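User decision needed (3 paths, recorded in the JSON):
(a) rollback: the marginal gain does not justify the latency cost
(b) keep opt-in + start PR-2Q-Cache-Prewarm (expose only the warm path)
(c) decide again after the 1-week observation window ends (2026-05-31), keeping the current state until then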
Artifacts:
reports/v0_2_phase2q_result_dedup_gemma_{cold,warm}_2026-05-24.csv
tests/search_eval/baselines/v0_2_phase2q_result_dedup_2026-05-24.json (summary + user decision options)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
683 lines
29 KiB
Python
"""Search pipeline orchestration (Phase 3.1).

Single source of truth shared by `/api/search/` and `/api/search/ask`.

## Purity rules (permanent)

`run_search()` pushes side effects out to the wrapper (endpoint) wherever possible:
- ❌ **Forbidden**: a `BackgroundTasks` parameter, direct `logger.info(...)` calls,
  `record_search_event()` calls, `SearchResponse`/`AskResponse` serialization
- ✅ **Allowed**: `trigger_background_analysis()` (fire-and-forget task on an
  analyzer cache miss; part of the retrieval strategy and self-contained)
- ✅ **Allowed**: internal service calls such as retrieval / fusion / rerank /
  diversity / display normalization / confidence computation

The return value is a single `PipelineResult`. The wrapper extracts the fields it
needs and performs logging / telemetry / response serialization.

## Phase 2 compatibility

This module is a lift-and-shift of the original `app/api/search.py::search()`
function body. Variable names / notes strings / timing keys / logger format are
reconstructed identically on the wrapper side. Before and after the refactor,
`/search?debug=true` responses must match at nearly the byte level.
"""

from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal

from sqlalchemy.ext.asyncio import AsyncSession

from . import query_analyzer, query_rewriter
from .fusion_service import (
    DEFAULT_FUSION,
    apply_soft_filter_boost,
    get_strategy,
    normalize_display_scores,
)
from .freshness_decay import apply_freshness_decay
from .rerank_service import (
    MAX_CHUNKS_PER_DOC,
    MAX_RERANK_INPUT,
    apply_diversity,
    rerank_chunks,
)
from .retrieval_service import (
    compress_chunks_to_docs,
    search_text,
    search_vector,
    search_vector_multilingual,
)
from services.search_telemetry import (
    compute_confidence,
    compute_confidence_hybrid,
    compute_confidence_reranked,
)

if TYPE_CHECKING:
    from api.search import SearchResult


# ─── Phase 2.1: 3-tier analyzer_confidence gate ──────────
# Moved from search.py. The /search wrapper in search.py does not need to
# expose these constants, so they live only in the pipeline module.
ANALYZER_TIER_IGNORE = 0.5  # < 0.5 → ignore the analyzer entirely, soft_filter disabled
ANALYZER_TIER_ORIGINAL = 0.7  # < 0.7 → fall back to the original query
ANALYZER_TIER_MERGE = 0.85  # < 0.85 → merge original + analyzed

# ─── Phase 2Q multi-query composition constants (recorded in plan v6 §5.5) ──
# per-variant top-K = PRODUCTION_TOPK // N (50 // 3 = 16, option A1 adopted).
# Reranker batch capped at ≤ 60 → no latency regression.
PHASE2Q_PRODUCTION_TOPK = 50
PHASE2Q_UNIFIED_CAP = 60  # cap on candidate docs fed to the reranker after variant composition
PHASE2Q_RRF_K = 60  # same as production fusion_service.RRFOnly.K

# PR-2Q-Rerank-Payload-Fix (Apply prereq). Cap on reranker input candidate chunks
# for the multi-query path, separate from MAX_RERANK_INPUT=200 on the baseline
# path (run_search).
# Diagnosis history (2026-05-24):
#   1) cap 60 + no dedup = many 413s + NDCG 0.927 (Phase 3 baseline)
#   2) cap 30 + chunks_per_doc=1 + dedup = zero 413s + NDCG 0.666 (-0.261, catastrophic)
#   3) cap 60 + chunks_per_doc=2 + dedup + TEI MAX_BATCH_TOKENS 8192→16384 = NDCG expected
#      to recover (user decision = this path). Keeps doc diversity, lets the reranker see
#      each doc's 2 best chunks, and stays safely within the 16384 payload limit.
# The baseline MAX_RERANK_INPUT=200 / MAX_CHUNKS_PER_DOC=2 are unaffected (multi-query-only cap).
PHASE2Q_RERANK_INPUT_CAP = 60
PHASE2Q_CHUNKS_PER_DOC = 2


def _analyzer_tier(confidence: float) -> str:
    """Map analyzer_confidence to a tier string. Used for the actual branching in Phase 2.2/2.3."""
    if confidence < ANALYZER_TIER_IGNORE:
        return "ignore"
    if confidence < ANALYZER_TIER_ORIGINAL:
        return "original_fallback"
    if confidence < ANALYZER_TIER_MERGE:
        return "merge"
    return "analyzed"


# ─── Return type ─────────────────────────────────────────────


@dataclass(slots=True)
class PipelineResult:
    """Return value of run_search(); holds all the state the wrapper needs."""

    # ── Final results (exposed via the API) ──
    results: "list[SearchResult]"
    mode: str
    confidence_signal: float

    # ── Intermediate stages (evidence input + debug) ──
    text_results: "list[SearchResult]"
    vector_results: "list[SearchResult]"  # after doc compression
    raw_chunks: "list[SearchResult]"  # raw chunks (for rerank/evidence)
    chunks_by_doc: "dict[int, list[SearchResult]]"

    # ── Query analysis metadata ──
    query_analysis: dict | None
    analyzer_cache_hit: bool
    analyzer_confidence: float  # always a float (never None)
    analyzer_tier: str

    # ── Observability ──
    timing_ms: dict[str, float] = field(default_factory=dict)
    notes: list[str] = field(default_factory=list)


# ─── Main pipeline ───────────────────────────────────────


async def run_search(
    session: AsyncSession,
    q: str,
    *,
    mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid",
    limit: int = 20,
    fusion: str = DEFAULT_FUSION,
    rerank: bool = True,
    analyze: bool = False,
    embedding_backend: str | None = None,
    snapshot_doc_id_max: int | None = None,
    snapshot_chunk_id_max: int | None = None,
    reranker_backend: str | None = None,
    rewrite_backend: str | None = None,
) -> PipelineResult:
    """Run the search pipeline.

    Performs retrieval → fusion → rerank → diversity → display normalization →
    confidence computation and returns a `PipelineResult`. Never performs logging,
    BackgroundTasks, or response serialization (those are the wrapper's job).

    Args:
        session: AsyncSession (managed by the caller)
        q: raw user query
        mode: fts | trgm | vector | hybrid
        limit: number of final results (in hybrid, the fusion input candidate pool is wider)
        fusion: legacy | rrf | rrf_boost
        rerank: enable bge-reranker-v2-m3 (hybrid only)
        analyze: enable QueryAnalyzer (conditional multilingual / soft filter on cache hit)
        rewrite_backend: Phase 2Q multi-query rewrite dispatcher slug. None/baseline =
            single-query path (existing behavior). With hybrid + cand_<slug>, delegates to
            search_with_rewrite(): retrieval for N variants → per-variant fusion →
            unified RRF → a single reranker call.

    Returns:
        PipelineResult
    """
    # Local import to avoid a circular import (SearchResult is declared inline in api.search)
    from api.search import SearchResult  # noqa: F401 (runtime counterpart of the TYPE_CHECKING import)

    # Phase 2Q dispatch: only an enabled rewrite_backend + hybrid takes the multi-query path.
    # Other modes, or baseline/None, stay on the existing single-query path.
    if rewrite_backend not in (None, "baseline") and mode == "hybrid":
        return await search_with_rewrite(
            session, q,
            limit=limit,
            fusion=fusion,
            rerank=rerank,
            embedding_backend=embedding_backend,
            snapshot_doc_id_max=snapshot_doc_id_max,
            snapshot_chunk_id_max=snapshot_chunk_id_max,
            reranker_backend=reranker_backend,
            rewrite_backend=rewrite_backend,
        )

    timing: dict[str, float] = {}
    notes: list[str] = []
    text_results: list["SearchResult"] = []
    vector_results: list["SearchResult"] = []  # doc-level (after compression, fusion input)
    raw_chunks: list["SearchResult"] = []  # chunk-level (raw, for the Phase 1.3 reranker)
    chunks_by_doc: dict[int, list["SearchResult"]] = {}  # preserved for the Phase 1.3 reranker
    query_analysis: dict | None = None
    analyzer_confidence: float = 0.0
    analyzer_tier: str = "disabled"

    t_total = time.perf_counter()

    # Phase 2.1 (async design): never call QueryAnalyzer synchronously.
    # - cache hit → use query_analysis (conditional in the Phase 2.2/2.3 pipeline)
    # - cache miss → keep the existing path + trigger a background task (fire-and-forget)
    # Decided from measurements (gemma-4 takes 10 s+). memory: feedback_analyzer_async_only.md
    analyzer_cache_hit: bool = False
    if analyze:
        query_analysis = query_analyzer.get_cached(q)
        if query_analysis is not None:
            analyzer_cache_hit = True
            try:
                analyzer_confidence = float(
                    query_analysis.get("analyzer_confidence", 0.0) or 0.0
                )
            except (TypeError, ValueError):
                analyzer_confidence = 0.0
            analyzer_tier = _analyzer_tier(analyzer_confidence)
            notes.append(
                f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
            )
        else:
            # cache miss → trigger the background analyzer (does not block retrieval)
            triggered = query_analyzer.trigger_background_analysis(q)
            analyzer_tier = "cache_miss"
            notes.append(
                "analyzer cache_miss"
                + (" (bg triggered)" if triggered else " (bg inflight)")
            )

    # Phase 2.2: conditions for enabling multilingual vector search (conservative)
    # - cache hit + analyzer_tier == "analyzed" (≥0.85, high confidence)
    # - at least 2 normalized_queries (language diversity present)
    # - domain_hint == "news" or language_scope == "global"
    # ↑ First measurement: in the document domain, multilingual degraded natural_language_ko
    #   by 0.10. English translations act as noise when searching Korean statutes.
    #   Multilingual is enabled only for news / global (news_crosslingual +0.10 improvement confirmed).
    use_multilingual: bool = False
    normalized_queries: list[dict] = []
    if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
        domain_hint = query_analysis.get("domain_hint", "mixed")
        language_scope = query_analysis.get("language_scope", "limited")
        is_multilingual_candidate = (
            domain_hint == "news" or language_scope == "global"
        )
        if is_multilingual_candidate:
            raw_nq = query_analysis.get("normalized_queries") or []
            if isinstance(raw_nq, list) and len(raw_nq) >= 2:
                normalized_queries = [
                    nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
                ]
                if len(normalized_queries) >= 2:
                    use_multilingual = True
                    notes.append(
                        f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}"
                        f" hint={domain_hint}/{language_scope}"
                    )

    if mode == "vector":
        t0 = time.perf_counter()
        if use_multilingual:
            raw_chunks = await search_vector_multilingual(
                session, normalized_queries, limit,
                embedding_backend=embedding_backend,
                snapshot_doc_id_max=snapshot_doc_id_max,
                snapshot_chunk_id_max=snapshot_chunk_id_max,
            )
        else:
            raw_chunks = await search_vector(
                session, q, limit,
                embedding_backend=embedding_backend,
                snapshot_doc_id_max=snapshot_doc_id_max,
                snapshot_chunk_id_max=snapshot_chunk_id_max,
            )
        timing["vector_ms"] = (time.perf_counter() - t0) * 1000
        if not raw_chunks:
            notes.append("vector_search_returned_empty (AI client error or no embeddings)")
        # Vector-only mode also compresses to docs for diversity (avoids duplicate chunks)
        vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
        results = vector_results
    else:
        t0 = time.perf_counter()
        text_results = await search_text(session, q, limit)
        timing["text_ms"] = (time.perf_counter() - t0) * 1000

        if mode == "hybrid":
            t1 = time.perf_counter()
            if use_multilingual:
                raw_chunks = await search_vector_multilingual(
                    session, normalized_queries, limit,
                    embedding_backend=embedding_backend,
                    snapshot_doc_id_max=snapshot_doc_id_max,
                    snapshot_chunk_id_max=snapshot_chunk_id_max,
                )
            else:
                raw_chunks = await search_vector(
                    session, q, limit,
                    embedding_backend=embedding_backend,
                    snapshot_doc_id_max=snapshot_doc_id_max,
                    snapshot_chunk_id_max=snapshot_chunk_id_max,
                )
            timing["vector_ms"] = (time.perf_counter() - t1) * 1000

            # chunk-level → doc-level compression (raw chunks are preserved in chunks_by_doc)
            t1b = time.perf_counter()
            vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
            timing["compress_ms"] = (time.perf_counter() - t1b) * 1000

            if not vector_results:
                notes.append("vector_search_returned_empty — text-only fallback")

            t2 = time.perf_counter()
            strategy = get_strategy(fusion)
            # Fusion is doc-based; fetch a wider pool (rerank candidates)
            fusion_limit = max(limit * 5, 100) if rerank else limit
            fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
            timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
            notes.append(f"fusion={strategy.name}")
            notes.append(
                f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
                f"unique_docs={len(chunks_by_doc)}"
            )

            # Phase 2.3: soft_filter boost (only on cache hit + tier != ignore)
            # analyzer_confidence < 0.5 (tier=ignore) is disabled.
            if (
                analyzer_cache_hit
                and analyzer_tier != "ignore"
                and query_analysis
            ):
                soft_filters = query_analysis.get("soft_filters") or {}
                if soft_filters:
                    boosted = apply_soft_filter_boost(fused_docs, soft_filters)
                    if boosted > 0:
                        notes.append(f"soft_filter_boost applied={boosted}")

            if rerank:
                # Phase 1.3: the reranker takes chunk-level input.
                # Recover raw chunks from chunks_by_doc using the fused doc_ids.
                t3 = time.perf_counter()
                rerank_input: list["SearchResult"] = []
                for doc in fused_docs:
                    chunks = chunks_by_doc.get(doc.id, [])
                    if chunks:
                        # at most MAX_CHUNKS_PER_DOC (2) chunks per doc (protects latency/VRAM)
                        rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
                    else:
                        # text-only matched doc → wrap the doc itself as a chunk
                        rerank_input.append(doc)
                    if len(rerank_input) >= MAX_RERANK_INPUT:
                        break
                rerank_input = rerank_input[:MAX_RERANK_INPUT]
                notes.append(f"rerank input={len(rerank_input)}")

                reranked = await rerank_chunks(
                    q, rerank_input, limit * 3,
                    reranker_backend=reranker_backend,
                    snapshot_doc_id_max=snapshot_doc_id_max,
                    snapshot_chunk_id_max=snapshot_chunk_id_max,
                )
                timing["rerank_ms"] = (time.perf_counter() - t3) * 1000

                # diversity (chunk → doc compression, max_per_doc=2, unlimited when top score > 0.90)
                t4 = time.perf_counter()
                results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
                timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
            else:
                # rerank disabled: use fused_docs as is (limit applied)
                results = fused_docs[:limit]
        else:
            results = text_results

    # PR-RAG-Time-1: freshness decay (after the reranker, right before display normalization).
    # Applied only to news/law_monitor. Floor 0.7. See freshness_decay.py for the guards.
    t_fr = time.perf_counter()
    results = await apply_freshness_decay(results, session)
    timing["freshness_ms"] = (time.perf_counter() - t_fr) * 1000

    # Display score normalization: the frontend shows score*100 as a percentage.
    # Exposing internal fusion scores as is (RRF lies in the 0.01-0.05 range) breaks the display.
    # Phase 3.1: the rerank_score field is not touched here (raw value preserved).
    normalize_display_scores(results)

    timing["total_ms"] = (time.perf_counter() - t_total) * 1000

    # Confidence is computed from raw pre-fusion signals (since Phase 0.5, fused scores have no absolute meaning).
    # When rerank is enabled, the reranker score is the most reliable signal, so it takes priority.
    if mode == "hybrid":
        if rerank and "rerank_ms" in timing:
            confidence_signal = compute_confidence_reranked(results)
        else:
            confidence_signal = compute_confidence_hybrid(text_results, vector_results)
    elif mode == "vector":
        confidence_signal = compute_confidence(vector_results, "vector")
    else:
        confidence_signal = compute_confidence(text_results, mode)

    return PipelineResult(
        results=results,
        mode=mode,
        confidence_signal=confidence_signal,
        text_results=text_results,
        vector_results=vector_results,
        raw_chunks=raw_chunks,
        chunks_by_doc=chunks_by_doc,
        query_analysis=query_analysis,
        analyzer_cache_hit=analyzer_cache_hit,
        analyzer_confidence=analyzer_confidence,
        analyzer_tier=analyzer_tier,
        timing_ms=timing,
        notes=notes,
    )


# ─── Phase 2Q multi-query retrieval composition ──────────────────


def _rrf_fuse_variants(
    variant_lists: "list[list[SearchResult]]",
    k: int,
    limit: int,
) -> "list[SearchResult]":
    """Fuse the ranked lists of N variants with RRF. Same algorithm as fusion_service.RRFOnly.

    For each doc_id, RRF_score = Σ 1/(k + rank_i) over the variant lists.
    When the same doc_id appears in several variants, its scores accumulate. The
    SearchResult from the first variant it appears in is kept as the representative
    (snippet, match_reason and other metadata).
    """
    from api.search import SearchResult  # avoid a circular import

    scores: dict[int, float] = {}
    representative: dict[int, "SearchResult"] = {}

    for variant_list in variant_lists:
        for rank, doc in enumerate(variant_list, start=1):
            doc_id = doc.id
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
            if doc_id not in representative:
                representative[doc_id] = doc

    fused: list["SearchResult"] = []
    for doc_id, rrf_score in sorted(scores.items(), key=lambda x: x[1], reverse=True):
        doc = representative[doc_id]
        fused.append(SearchResult(
            id=doc.id,
            title=doc.title,
            ai_domain=doc.ai_domain,
            ai_summary=doc.ai_summary,
            file_format=doc.file_format,
            score=rrf_score,
            snippet=doc.snippet,
            match_reason=f"{doc.match_reason}+multi_query_rrf",
        ))
    return fused[:limit]


def _dedup_results_by_doc_id(results: "list[SearchResult]") -> "list[SearchResult]":
    """Deduplicate API response results by doc.id, keeping only the first occurrence.

    PR-2Q-Search-Result-Dedup. In the multi-query path, apply_diversity lifts the
    diversity constraint on the reranker output when top_score ≥ 0.90 (unlimited
    path) → N chunks of the same doc end up in results → doc.id is duplicated in
    returned_ids. This was the direct cause of the graded NDCG inflation.

    In the baseline (single-query) path the reranker naturally spreads scores across
    docs, so the dedup audit is clean (0/51). In multi-query, the variants concentrate
    retrieval on the correct chunks of the same doc → unified RRF, then reranker
    scoring → many scores ≥ 0.90 → unlimited path → duplicates.

    This helper keeps the first occurrence, which preserves the top score as long as
    the input is sorted by score in descending order.
    [[feedback_graded_ndcg_dedup_invariant]] + final cleanup of the measurement chain
    (Phase 3 0.927 → Rerank-Fix 0.876 → eval-dedup 0.641).
    """
    seen: set[int] = set()
    out: list["SearchResult"] = []
    for r in results:
        if r.id in seen:
            continue
        seen.add(r.id)
        out.append(r)
    return out


def _dedup_chunks_by_id(chunks: "list[SearchResult]") -> "list[SearchResult]":
    """Deduplicate by chunk_id; doc-level results (chunk_id None) are deduped by doc.id, first only.

    PR-2Q-Rerank-Payload-Fix (Apply prereq). In the multi-query path,
    merged_chunks_by_doc accumulates the same chunk once per variant. When a
    SearchResult with the same chunk_id shows up in several variants, only the first
    occurrence is kept (variant 0 = the original query verbatim takes precedence).
    The accumulated duplicates inflated the reranker payload → 413 → RRF fallback trigger.

    SearchResult.id = doc_id (api/search.py:54), SearchResult.chunk_id = optional
    chunk identifier (line 63). Chunk-level results are deduped by cid, doc-level
    results (cid=None) by id.
    """
    seen_chunk_ids: set[int] = set()
    seen_doc_ids_without_chunk: set[int] = set()
    result: list["SearchResult"] = []
    for c in chunks:
        cid = getattr(c, "chunk_id", None)
        if cid is not None:
            if cid in seen_chunk_ids:
                continue
            seen_chunk_ids.add(cid)
        else:
            if c.id in seen_doc_ids_without_chunk:
                continue
            seen_doc_ids_without_chunk.add(c.id)
        result.append(c)
    return result


async def search_with_rewrite(
    session: AsyncSession,
    q: str,
    *,
    limit: int,
    fusion: str,
    rerank: bool,
    embedding_backend: str | None,
    snapshot_doc_id_max: int | None,
    snapshot_chunk_id_max: int | None,
    reranker_backend: str | None,
    rewrite_backend: str,
) -> PipelineResult:
    """Phase 2Q multi-query retrieval composition path (plan v6 §5.5).

    Flow:
      1. query_rewriter.rewrite(q, slug) → variants (N=3, prompt v1 invariant)
      2. per-variant search_text + search_vector (asyncio.gather, per-variant K=16)
      3. per-variant strategy.fuse(text, vector), reusing the production fusion
      4. fused lists of the N variants → _rrf_fuse_variants (k=60, cap 60)
      5. a single reranker call over the unified, variant-agnostic candidate set; query = original q
      6. diversity + doc.id dedup + freshness + display normalization (same finish as run_search)

    LLM call failure / parse failure → query_rewriter.rewrite propagates RuntimeError.
    Unknown slug → ValueError. The caller (search.py) translates these into HTTP 503/400.

    Assumes mode is hybrid (the branch condition in run_search). With rerank=False,
    unified_docs is used as is, after doc.id dedup.
    """
    from api.search import SearchResult  # noqa: F401

    timing: dict[str, float] = {}
    notes: list[str] = []
    t_total = time.perf_counter()

    # 1) variants: LLM call (on failure the caller translates it into a 503)
    t_rw = time.perf_counter()
    variants = await query_rewriter.rewrite(q, rewrite_backend)
    timing["rewrite_ms"] = (time.perf_counter() - t_rw) * 1000
    if not variants:
        # Defensive: for backend != baseline, query_rewriter.rewrite returns a list or raises.
        # If None or an empty list gets here, raise an explicit 503 signal.
        raise RuntimeError(f"rewrite_llm_unavailable:{rewrite_backend}:empty_variants")

    per_variant_k = max(1, PHASE2Q_PRODUCTION_TOPK // len(variants))
    notes.append(
        f"rewrite={rewrite_backend} n_variants={len(variants)} "
        f"per_variant_k={per_variant_k}"
    )

    # 2) per-variant retrieval (text + vector), run in parallel with asyncio.gather
    t_var = time.perf_counter()

    async def _variant_retrieve(
        v: str,
    ) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]":
        text = await search_text(session, v, per_variant_k)
        raw_chunks = await search_vector(
            session, v, per_variant_k,
            embedding_backend=embedding_backend,
            snapshot_doc_id_max=snapshot_doc_id_max,
            snapshot_chunk_id_max=snapshot_chunk_id_max,
        )
        vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k)
        return text, vector, chunks_by_doc

    # Note: every variant task shares the same AsyncSession. SQLAlchemy documents that an
    # AsyncSession should not be shared across concurrent tasks (e.g. asyncio.gather).
    variant_outputs = await asyncio.gather(
        *[_variant_retrieve(v) for v in variants]
    )
    timing["variant_retrieve_ms"] = (time.perf_counter() - t_var) * 1000

    # 3) per-variant fusion (reuses the production fusion)
    t_fuse = time.perf_counter()
    strategy = get_strategy(fusion)
    per_variant_fused: list[list["SearchResult"]] = []
    merged_chunks_by_doc: dict[int, list["SearchResult"]] = {}
    for v, (text, vector, cbd) in zip(variants, variant_outputs):
        fused = strategy.fuse(text, vector, v, per_variant_k)
        per_variant_fused.append(fused)
        for doc_id, chunks in cbd.items():
            merged_chunks_by_doc.setdefault(doc_id, []).extend(chunks)
    # PR-2Q-Rerank-Payload-Fix: prevent reranker 413s caused by the same chunk accumulating
    # once per variant. Dedup by chunk_id (doc.id when chunk_id is None), keeping the
    # first variant's occurrence.
    for doc_id in list(merged_chunks_by_doc.keys()):
        merged_chunks_by_doc[doc_id] = _dedup_chunks_by_id(merged_chunks_by_doc[doc_id])
    timing["variant_fusion_ms"] = (time.perf_counter() - t_fuse) * 1000
    notes.append(f"fusion={strategy.name}")

    # 4) RRF across variants: unified candidate set (cap 60)
    t_rrf = time.perf_counter()
    unified_docs = _rrf_fuse_variants(
        per_variant_fused,
        k=PHASE2Q_RRF_K,
        limit=PHASE2Q_UNIFIED_CAP,
    )
    timing["unified_rrf_ms"] = (time.perf_counter() - t_rrf) * 1000
    notes.append(
        f"unified docs={len(unified_docs)} cap={PHASE2Q_UNIFIED_CAP}"
    )

    # 5) a single reranker call (variant-agnostic, query = original q)
    if rerank:
        t_re = time.perf_counter()
        rerank_input: list["SearchResult"] = []
        # PR-2Q-Rerank-Payload-Fix: separate from the baseline path's MAX_RERANK_INPUT=200,
        # the multi-query path uses its own cap (PHASE2Q_RERANK_INPUT_CAP=60) and at most
        # PHASE2Q_CHUNKS_PER_DOC=2 chunks per doc, so that the summed chunk tokens stay within
        # the TEI MAX_BATCH_TOKENS=16384 payload limit. Chunks were already deduped per doc
        # in step 3. This is independent of the baseline MAX_CHUNKS_PER_DOC=2.
        for doc in unified_docs:
            chunks = merged_chunks_by_doc.get(doc.id, [])
            if chunks:
                rerank_input.extend(chunks[:PHASE2Q_CHUNKS_PER_DOC])
            else:
                rerank_input.append(doc)
            if len(rerank_input) >= PHASE2Q_RERANK_INPUT_CAP:
                break
        rerank_input = rerank_input[:PHASE2Q_RERANK_INPUT_CAP]
        notes.append(f"rerank input={len(rerank_input)} cap={PHASE2Q_RERANK_INPUT_CAP}")

        reranked = await rerank_chunks(
            q, rerank_input, limit * 3,
            reranker_backend=reranker_backend,
            snapshot_doc_id_max=snapshot_doc_id_max,
            snapshot_chunk_id_max=snapshot_chunk_id_max,
        )
        timing["rerank_ms"] = (time.perf_counter() - t_re) * 1000

        t_div = time.perf_counter()
        # PR-2Q-Search-Result-Dedup:
        #   (a) force top_score_threshold=2.0 to bypass apply_diversity's unlimited path
        #       (the cause of same-doc chunks piling up in the common top_score ≥ 0.90 case).
        #   (b) _dedup_results_by_doc_id: even after apply_diversity, max_per_doc is 2, so two
        #       chunks of the same doc can remain. Dedup by doc.id, keeping the first (top score).
        #   The baseline (run_search) path is unchanged; this is a multi-query-only invariant.
        diversified = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC,
                                      top_score_threshold=2.0)
        results = _dedup_results_by_doc_id(diversified)[:limit]
        timing["diversity_ms"] = (time.perf_counter() - t_div) * 1000
    else:
        results = _dedup_results_by_doc_id(unified_docs)[:limit]

    # 6) freshness + display normalization (same finish as run_search)
    t_fr = time.perf_counter()
    results = await apply_freshness_decay(results, session)
    timing["freshness_ms"] = (time.perf_counter() - t_fr) * 1000

    normalize_display_scores(results)

    timing["total_ms"] = (time.perf_counter() - t_total) * 1000

    # confidence: the reranker score takes priority when rerank is enabled.
    # In multi-query, the individual text/vector signals carry little meaning, so use the unified results.
    if rerank and "rerank_ms" in timing:
        confidence_signal = compute_confidence_reranked(results)
    else:
        confidence_signal = compute_confidence(results, "vector")

    # text_results / vector_results expose only the original variant
    # (index 0; prompt v1 invariant = original query verbatim)
    text_v0, vector_v0, _ = variant_outputs[0]

    return PipelineResult(
        results=results,
        mode="hybrid",
        confidence_signal=confidence_signal,
        text_results=text_v0,
        vector_results=vector_v0,
        raw_chunks=[],  # merging per-variant raw chunks carries little meaning; not exposed in debug
        chunks_by_doc=merged_chunks_by_doc,
        query_analysis=None,
        analyzer_cache_hit=False,
        analyzer_confidence=0.0,
        analyzer_tier="disabled",
        timing_ms=timing,
        notes=notes,
    )
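`_rrf_fuse_variants` adds `1/(k + rank)` for each doc across the variant lists. The sketch below reproduces only that scoring rule on plain doc ids, without the `SearchResult` metadata, and uses the same `k=60` as `PHASE2Q_RRF_K`. This makes it easy to see how a doc that several variants agree on rises to the top. `rrf_fuse_ids` is a hypothetical helper, not part of the module.

```python
def rrf_fuse_ids(variant_lists: list[list[int]], k: int = 60,
                 limit: int = 60) -> list[tuple[int, float]]:
    """Reciprocal rank fusion over N ranked id lists; returns (doc_id, score) pairs, best first."""
    scores: dict[int, float] = {}
    for ranked in variant_lists:
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # sorted() stays stable with reverse=True, so ties keep first-seen order
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]


variants = [
    [10, 20, 30],  # variant 0 (original query)
    [20, 10, 40],  # variant 1
    [20, 50],      # variant 2
]
for doc_id, score in rrf_fuse_ids(variants):
    print(doc_id, round(score, 5))
```

Doc 20 wins because all three variants retrieve it. Docs 30 and 40 tie at `1/63` and keep their first-seen order.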