"""검색 파이프라인 오케스트레이션 (Phase 3.1). `/api/search/` 와 `/api/search/ask` 가 공유하는 단일 진실 소스. ## 순수성 규칙 (영구) `run_search()`는 wrapper(endpoint)에서 side effect를 최대한 분리한다: - ❌ **금지**: `BackgroundTasks` 파라미터, `logger.info(...)` 직접 호출, `record_search_event()` 호출, `SearchResponse`/`AskResponse` 직렬화 - ✅ **허용**: `trigger_background_analysis()` (analyzer cache miss 시 fire-and-forget task — retrieval 전략의 일부, 자가 완결됨) - ✅ **허용**: retrieval / fusion / rerank / diversity / display 정규화 / confidence 계산 같은 내부 서비스 호출 반환값은 `PipelineResult` 하나. wrapper가 그 안에서 필요한 필드를 꺼내 logger / telemetry / 응답 직렬화를 수행한다. ## Phase 2 호환 본 모듈은 기존 `app/api/search.py::search()` 함수 본문을 lift-and-shift 한 것이다. 변수명 / notes 문자열 / timing 키 / logger 포맷 은 wrapper 쪽에서 완전히 동일하게 재구성된다. refactor 전후 `/search?debug=true` 응답은 byte-level 에 가깝게 일치해야 한다. """ from __future__ import annotations import time from dataclasses import dataclass, field from typing import TYPE_CHECKING, Literal from sqlalchemy.ext.asyncio import AsyncSession from . import query_analyzer from .fusion_service import ( DEFAULT_FUSION, apply_soft_filter_boost, get_strategy, normalize_display_scores, ) from .rerank_service import ( MAX_CHUNKS_PER_DOC, MAX_RERANK_INPUT, apply_diversity, rerank_chunks, ) from .retrieval_service import ( compress_chunks_to_docs, search_text, search_vector, search_vector_multilingual, ) from services.search_telemetry import ( compute_confidence, compute_confidence_hybrid, compute_confidence_reranked, ) if TYPE_CHECKING: from api.search import SearchResult # ─── Phase 2.1: analyzer_confidence 3단계 게이트 ────────── # search.py 에서 이동. search.py 의 /search wrapper 는 이 상수들을 # 노출할 필요 없으므로 파이프라인 모듈에만 둔다. ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성 ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge def _analyzer_tier(confidence: float) -> str: """analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용.""" if confidence < ANALYZER_TIER_IGNORE: return "ignore" if confidence < ANALYZER_TIER_ORIGINAL: return "original_fallback" if confidence < ANALYZER_TIER_MERGE: return "merge" return "analyzed" # ─── 반환 타입 ───────────────────────────────────────────── @dataclass(slots=True) class PipelineResult: """run_search() 반환 — wrapper 가 필요한 모든 state 를 담는다.""" # ── 최종 결과 (API 노출) ── results: "list[SearchResult]" mode: str confidence_signal: float # ── 중간 단계 (evidence 입력 + debug) ── text_results: "list[SearchResult]" vector_results: "list[SearchResult]" # doc 압축 후 raw_chunks: "list[SearchResult]" # chunk 원본 (rerank/evidence용) chunks_by_doc: "dict[int, list[SearchResult]]" # ── 쿼리 분석 메타 ── query_analysis: dict | None analyzer_cache_hit: bool analyzer_confidence: float # 항상 float (None 금지) analyzer_tier: str # ── 관측 ── timing_ms: dict[str, float] = field(default_factory=dict) notes: list[str] = field(default_factory=list) # ─── 메인 파이프라인 ─────────────────────────────────────── async def run_search( session: AsyncSession, q: str, *, mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid", limit: int = 20, fusion: str = DEFAULT_FUSION, rerank: bool = True, analyze: bool = False, ) -> PipelineResult: """검색 파이프라인 실행. retrieval → fusion → rerank → diversity → display 정규화 → confidence 계산 까지 수행하고 `PipelineResult` 를 반환한다. logging / BackgroundTasks / 응답 직렬화는 절대 수행하지 않는다 (wrapper 책임). Args: session: AsyncSession (caller 가 관리) q: 사용자 쿼리 원문 mode: fts | trgm | vector | hybrid limit: 최종 결과 수 (hybrid 에서는 fusion 입력 후보 수는 이보다 넓음) fusion: legacy | rrf | rrf_boost rerank: bge-reranker-v2-m3 활성화 (hybrid 전용) analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter) Returns: PipelineResult """ # 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언) from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영 timing: dict[str, float] = {} notes: list[str] = [] text_results: list["SearchResult"] = [] vector_results: list["SearchResult"] = [] # doc-level (압축 후, fusion 입력) raw_chunks: list["SearchResult"] = [] # chunk-level (raw, Phase 1.3 reranker용) chunks_by_doc: dict[int, list["SearchResult"]] = {} # Phase 1.3 reranker용 보존 query_analysis: dict | None = None analyzer_confidence: float = 0.0 analyzer_tier: str = "disabled" t_total = time.perf_counter() # Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지. # - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부) # - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget) # 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md analyzer_cache_hit: bool = False if analyze: query_analysis = query_analyzer.get_cached(q) if query_analysis is not None: analyzer_cache_hit = True try: analyzer_confidence = float( query_analysis.get("analyzer_confidence", 0.0) or 0.0 ) except (TypeError, ValueError): analyzer_confidence = 0.0 analyzer_tier = _analyzer_tier(analyzer_confidence) notes.append( f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}" ) else: # cache miss → background analyzer 트리거 (retrieval 차단 X) triggered = query_analyzer.trigger_background_analysis(q) analyzer_tier = "cache_miss" notes.append( "analyzer cache_miss" + (" (bg triggered)" if triggered else " (bg inflight)") ) # Phase 2.2: multilingual vector search 활성 조건 (보수적) # - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰) # - normalized_queries 2개 이상 (lang 다양성 있음) # - domain_hint == "news" 또는 language_scope == "global" # ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko # -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용. # news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인). use_multilingual: bool = False normalized_queries: list[dict] = [] if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis: domain_hint = query_analysis.get("domain_hint", "mixed") language_scope = query_analysis.get("language_scope", "limited") is_multilingual_candidate = ( domain_hint == "news" or language_scope == "global" ) if is_multilingual_candidate: raw_nq = query_analysis.get("normalized_queries") or [] if isinstance(raw_nq, list) and len(raw_nq) >= 2: normalized_queries = [ nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text") ] if len(normalized_queries) >= 2: use_multilingual = True notes.append( f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}" f" hint={domain_hint}/{language_scope}" ) if mode == "vector": t0 = time.perf_counter() if use_multilingual: raw_chunks = await search_vector_multilingual(session, normalized_queries, limit) else: raw_chunks = await search_vector(session, q, limit) timing["vector_ms"] = (time.perf_counter() - t0) * 1000 if not raw_chunks: notes.append("vector_search_returned_empty (AI client error or no embeddings)") # vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지) vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) results = vector_results else: t0 = time.perf_counter() text_results = await search_text(session, q, limit) timing["text_ms"] = (time.perf_counter() - t0) * 1000 if mode == "hybrid": t1 = time.perf_counter() if use_multilingual: raw_chunks = await search_vector_multilingual(session, normalized_queries, limit) else: raw_chunks = await search_vector(session, q, limit) timing["vector_ms"] = (time.perf_counter() - t1) * 1000 # chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존) t1b = time.perf_counter() vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) timing["compress_ms"] = (time.perf_counter() - t1b) * 1000 if not vector_results: notes.append("vector_search_returned_empty — text-only fallback") t2 = time.perf_counter() strategy = get_strategy(fusion) # fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용) fusion_limit = max(limit * 5, 100) if rerank else limit fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit) timing["fusion_ms"] = (time.perf_counter() - t2) * 1000 notes.append(f"fusion={strategy.name}") notes.append( f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} " f"unique_docs={len(chunks_by_doc)}" ) # Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만) # analyzer_confidence < 0.5 (tier=ignore)는 비활성. if ( analyzer_cache_hit and analyzer_tier != "ignore" and query_analysis ): soft_filters = query_analysis.get("soft_filters") or {} if soft_filters: boosted = apply_soft_filter_boost(fused_docs, soft_filters) if boosted > 0: notes.append(f"soft_filter_boost applied={boosted}") if rerank: # Phase 1.3: reranker — chunk 기준 입력 # fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수 t3 = time.perf_counter() rerank_input: list["SearchResult"] = [] for doc in fused_docs: chunks = chunks_by_doc.get(doc.id, []) if chunks: # doc당 max 2 chunk (latency/VRAM 보호) rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC]) else: # text-only 매치 doc → doc 자체를 chunk처럼 wrap rerank_input.append(doc) if len(rerank_input) >= MAX_RERANK_INPUT: break rerank_input = rerank_input[:MAX_RERANK_INPUT] notes.append(f"rerank input={len(rerank_input)}") reranked = await rerank_chunks(q, rerank_input, limit * 3) timing["rerank_ms"] = (time.perf_counter() - t3) * 1000 # diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited) t4 = time.perf_counter() results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit] timing["diversity_ms"] = (time.perf_counter() - t4) * 1000 else: # rerank 비활성: fused_docs를 그대로 (limit 적용) results = fused_docs[:limit] else: results = text_results # display score 정규화 — 프론트엔드는 score*100을 % 표시. # fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐. # Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존). normalize_display_scores(results) timing["total_ms"] = (time.perf_counter() - t_total) * 1000 # confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음) # rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용 if mode == "hybrid": if rerank and "rerank_ms" in timing: confidence_signal = compute_confidence_reranked(results) else: confidence_signal = compute_confidence_hybrid(text_results, vector_results) elif mode == "vector": confidence_signal = compute_confidence(vector_results, "vector") else: confidence_signal = compute_confidence(text_results, mode) return PipelineResult( results=results, mode=mode, confidence_signal=confidence_signal, text_results=text_results, vector_results=vector_results, raw_chunks=raw_chunks, chunks_by_doc=chunks_by_doc, query_analysis=query_analysis, analyzer_cache_hit=analyzer_cache_hit, analyzer_confidence=analyzer_confidence, analyzer_tier=analyzer_tier, timing_ms=timing, notes=notes, )