"""Hybrid search API — orchestrator (Phase 1.1: thin endpoint).

The actual retrieval / fusion / rerank logic lives in the services/search/*
modules. This file is only responsible for mode branching, response
serialization, debug-response assembly and BackgroundTask dispatch.
"""

import math
import time
from typing import Annotated, Any

from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search import query_analyzer
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
from services.search.rerank_service import (
    MAX_CHUNKS_PER_DOC,
    MAX_RERANK_INPUT,
    apply_diversity,
    rerank_chunks,
)
from services.search.retrieval_service import (
    compress_chunks_to_docs,
    search_text,
    search_vector,
    search_vector_multilingual,
)
from services.search_telemetry import (
    compute_confidence,
    compute_confidence_hybrid,
    compute_confidence_reranked,
    record_search_event,
)

# Phase 2.1: three-tier gate on analyzer_confidence (value tuning follows the plan)
ANALYZER_TIER_IGNORE = 0.5  # < 0.5 → ignore the analyzer entirely, soft_filter disabled
ANALYZER_TIER_ORIGINAL = 0.7  # < 0.7 → fall back to the original query
ANALYZER_TIER_MERGE = 0.85  # < 0.85 → merge original + analyzed


def _analyzer_tier(confidence: float) -> str:
    """Map an analyzer confidence to the tier name used for branching.

    Args:
        confidence: analyzer confidence value.

    Returns:
        One of "ignore", "original_fallback", "merge" or "analyzed".

    Non-finite values (NaN / ±inf) are not valid confidences and are mapped
    to "ignore". NaN in particular compares False against every threshold,
    so without this guard it would fall through to the highest-trust tier.
    """
    if not math.isfinite(confidence):
        return "ignore"
    if confidence < ANALYZER_TIER_IGNORE:
        return "ignore"
    if confidence < ANALYZER_TIER_ORIGINAL:
        return "original_fallback"
    if confidence < ANALYZER_TIER_MERGE:
        return "merge"
    return "analyzed"


def _coerce_confidence(raw: Any) -> float:
    """Convert a cached analyzer_confidence value to a finite float.

    Missing/falsy, unparsable, overflowing or non-finite values all become
    0.0 so that logging, telemetry and tier selection never see NaN/inf.
    """
    try:
        value = float(raw or 0.0)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    return value if math.isfinite(value) else 0.0


# Logs to logs/search.log and stdout at the same time (Phase 0.4)
logger = setup_logger("search")

router = APIRouter()


class SearchResult(BaseModel):
    """A single search result row.

    Phase 1.2-C: chunk metadata fields were added when chunk-level vector
    retrieval was introduced.
    Text search results are doc-level (chunk_id etc. are None).
    Vector search results are chunk-level (chunk_id etc. are populated).
    """

    id: int  # doc_id (shared by text and vector results)
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float
    snippet: str | None
    match_reason: str | None = None
    # Phase 1.2-C: chunk metadata (populated for vector search)
    chunk_id: int | None = None
    chunk_index: int | None = None
    section_title: str | None = None


# ─── Phase 0.4: debug response schema ─────────────────────────


class DebugCandidate(BaseModel):
    """Per-stage candidate (only exposed in debug=true responses)."""

    id: int
    rank: int
    score: float
    match_reason: str | None = None


class SearchDebug(BaseModel):
    """Debug payload attached to the response when debug=true."""

    timing_ms: dict[str, float]
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    fused_candidates: list[DebugCandidate] | None = None
    confidence: float
    notes: list[str] = []
    # Placeholders to be filled once Phase 1/2 land
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None


class SearchResponse(BaseModel):
    """Top-level response body of the search endpoint."""

    results: list[SearchResult]
    total: int
    query: str
    mode: str
    debug: SearchDebug | None = None


def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Convert the first ``n`` rows into 1-based ranked debug candidates."""
    return [
        DebugCandidate(
            id=r.id, rank=i + 1, score=r.score, match_reason=r.match_reason
        )
        for i, r in enumerate(rows[:n])
    ]


async def _search_vector_chunks(
    session: AsyncSession,
    q: str,
    limit: int,
    use_multilingual: bool,
    normalized_queries: list[dict],
) -> list[SearchResult]:
    """Run the multilingual or the single-query vector search and return its chunks."""
    if use_multilingual:
        return await search_vector_multilingual(session, normalized_queries, limit)
    return await search_vector(session, q, limit)


def _collect_rerank_input(
    fused_docs: list[SearchResult],
    chunks_by_doc: dict[int, list[SearchResult]],
) -> list[SearchResult]:
    """Build the reranker input from fused docs, capped at MAX_RERANK_INPUT items."""
    rerank_input: list[SearchResult] = []
    for doc in fused_docs:
        chunks = chunks_by_doc.get(doc.id, [])
        if chunks:
            # At most MAX_CHUNKS_PER_DOC chunks per doc (latency/VRAM protection)
            rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
        else:
            # Doc matched by text only → wrap the doc itself as if it were a chunk
            rerank_input.append(doc)
        if len(rerank_input) >= MAX_RERANK_INPUT:
            break
    # The last extend may overshoot the cap, so trim once more.
    return rerank_input[:MAX_RERANK_INPUT]


@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    fusion: str = Query(
        DEFAULT_FUSION,
        pattern="^(legacy|rrf|rrf_boost)$",
        description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
    ),
    rerank: bool = Query(
        True,
        description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
    ),
    analyze: bool = Query(
        False,
        description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
    ),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
    """Document search — combines FTS + ILIKE + vector (Phase 0.5: RRF fusion).

    Args:
        q: raw user query.
        user: authenticated user; only ``user.id`` is used, for telemetry.
        session: async DB session passed to the retrieval services.
        background_tasks: used to record the search event after the response.
        mode: "fts" / "trgm" (text only), "vector" (vector only) or "hybrid".
        limit: maximum number of results returned.
        fusion: fusion strategy name, only used in hybrid mode.
        rerank: enable reranking, only used in hybrid mode.
        analyze: consult the QueryAnalyzer cache (never calls it synchronously).
        debug: attach per-stage candidates, timing and notes to the response.

    Returns:
        SearchResponse with the results and, when ``debug`` is set, a
        SearchDebug payload.
    """
    timing: dict[str, float] = {}
    notes: list[str] = []
    text_results: list[SearchResult] = []
    vector_results: list[SearchResult] = []  # doc-level (after compression, fusion input)
    raw_chunks: list[SearchResult] = []  # chunk-level (raw, for the Phase 1.3 reranker)
    chunks_by_doc: dict[int, list[SearchResult]] = {}  # kept for the Phase 1.3 reranker
    query_analysis: dict | None = None
    analyzer_confidence: float = 0.0
    analyzer_tier: str = "disabled"

    t_total = time.perf_counter()

    # Phase 2.1 (async design): QueryAnalyzer must never be called synchronously.
    # - cache hit  → use query_analysis (conditionally, for the Phase 2.2/2.3 pipeline)
    # - cache miss → keep the existing path + trigger a background task (fire-and-forget)
    # Decision based on measurements (gemma-4 took 10s+).
    # memory: feedback_analyzer_async_only.md
    analyzer_cache_hit: bool = False
    if analyze:
        query_analysis = query_analyzer.get_cached(q)
        if query_analysis is not None:
            analyzer_cache_hit = True
            # Unparsable or non-finite confidences are treated as 0.0 ("ignore" tier).
            analyzer_confidence = _coerce_confidence(
                query_analysis.get("analyzer_confidence", 0.0)
            )
            analyzer_tier = _analyzer_tier(analyzer_confidence)
            notes.append(
                f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
            )
        else:
            # cache miss → trigger the background analyzer (does not block retrieval)
            triggered = query_analyzer.trigger_background_analysis(q)
            analyzer_tier = "cache_miss"
            notes.append(
                "analyzer cache_miss"
                + (" (bg triggered)" if triggered else " (bg inflight)")
            )

    # Phase 2.2: conditions for enabling multilingual vector search
    # - cache hit + analyzer_tier == "analyzed" (high confidence, ≥ 0.85)
    # - at least two usable normalized_queries (language diversity present)
    # Every other case keeps using the single-query search_vector (no regression).
    use_multilingual: bool = False
    normalized_queries: list[dict] = []
    if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
        raw_nq = query_analysis.get("normalized_queries") or []
        if isinstance(raw_nq, list) and len(raw_nq) >= 2:
            # Keep only dict entries that carry a non-empty "text".
            normalized_queries = [
                nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
            ]
            if len(normalized_queries) >= 2:
                use_multilingual = True
                notes.append(f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}")

    if mode == "vector":
        t0 = time.perf_counter()
        raw_chunks = await _search_vector_chunks(
            session, q, limit, use_multilingual, normalized_queries
        )
        timing["vector_ms"] = (time.perf_counter() - t0) * 1000
        if not raw_chunks:
            notes.append("vector_search_returned_empty (AI client error or no embeddings)")
        # Vector-only mode also compresses to docs for diversity (avoids duplicate chunks)
        vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
        results = vector_results
    else:
        t0 = time.perf_counter()
        text_results = await search_text(session, q, limit)
        timing["text_ms"] = (time.perf_counter() - t0) * 1000

        if mode == "hybrid":
            t1 = time.perf_counter()
            raw_chunks = await _search_vector_chunks(
                session, q, limit, use_multilingual, normalized_queries
            )
            timing["vector_ms"] = (time.perf_counter() - t1) * 1000

            # chunk-level → doc-level compression (raw chunks are kept in chunks_by_doc)
            t1b = time.perf_counter()
            vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
            timing["compress_ms"] = (time.perf_counter() - t1b) * 1000

            if not vector_results:
                notes.append("vector_search_returned_empty — text-only fallback")

            t2 = time.perf_counter()
            strategy = get_strategy(fusion)
            # Fusion works on docs — fetch a wider set when it feeds the reranker
            fusion_limit = max(limit * 5, 100) if rerank else limit
            fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
            timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
            notes.append(f"fusion={strategy.name}")
            notes.append(
                f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
                f"unique_docs={len(chunks_by_doc)}"
            )

            if rerank:
                # Phase 1.3: reranker — chunk-level input.
                # Raw chunks are recovered from chunks_by_doc by fused doc_id.
                t3 = time.perf_counter()
                rerank_input = _collect_rerank_input(fused_docs, chunks_by_doc)
                notes.append(f"rerank input={len(rerank_input)}")

                reranked = await rerank_chunks(q, rerank_input, limit * 3)
                timing["rerank_ms"] = (time.perf_counter() - t3) * 1000

                # Diversity pass (chunk → doc compression, capped per doc by
                # MAX_CHUNKS_PER_DOC; original note: top score > 0.90 is unlimited)
                t4 = time.perf_counter()
                results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
                timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
            else:
                # Rerank disabled: use fused_docs as-is (limit applied)
                results = fused_docs[:limit]
        else:
            results = text_results

    # Normalize display scores — the frontend shows score*100 as a percentage.
    # Exposing the internal fusion score directly (RRF is in the 0.01~0.05 range)
    # would break that display.
    normalize_display_scores(results)

    timing["total_ms"] = (time.perf_counter() - t_total) * 1000

    # Confidence is computed from raw pre-fusion signals (since Phase 0.5 the
    # fused score has no absolute meaning).
    # When rerank ran, the reranker score is the most reliable signal → preferred.
    if mode == "hybrid":
        if rerank and "rerank_ms" in timing:
            confidence_signal = compute_confidence_reranked(results)
        else:
            confidence_signal = compute_confidence_hybrid(text_results, vector_results)
    elif mode == "vector":
        confidence_signal = compute_confidence(vector_results, "vector")
    else:
        confidence_signal = compute_confidence(text_results, mode)

    # Per user feedback: per-stage timing is always logged, independent of the
    # debug response.
    timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
    fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
    analyzer_str = (
        f" analyzer=hit={analyzer_cache_hit}/conf={analyzer_confidence:.2f}/tier={analyzer_tier}"
        if analyze
        else ""
    )
    logger.info(
        "search query=%r mode=%s%s%s results=%d conf=%.2f %s",
        q[:80],
        mode,
        fusion_str,
        analyzer_str,
        len(results),
        confidence_signal,
        timing_str,
    )

    # Phase 0.3: automatic failure logging (background task — no impact on
    # response latency).
    # Phase 2.1: analyzer_confidence is only passed when analyze=true
    # (otherwise None, for backward compatibility).
    background_tasks.add_task(
        record_search_event,
        q,
        user.id,
        results,
        mode,
        confidence_signal,
        analyzer_confidence if analyze else None,
    )

    debug_obj: SearchDebug | None = None
    if debug:
        # NOTE(review): fused_candidates is built from the final `results`
        # (post-rerank/diversity, display-normalized), not from the raw fusion
        # output, and reranker_scores is never populated — confirm whether that
        # is intended.
        debug_obj = SearchDebug(
            timing_ms=timing,
            text_candidates=_to_debug_candidates(text_results)
            if text_results or mode != "vector"
            else None,
            vector_candidates=_to_debug_candidates(vector_results)
            if vector_results or mode in ("vector", "hybrid")
            else None,
            fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
            confidence=confidence_signal,
            notes=notes,
            query_analysis=query_analysis,
        )

    return SearchResponse(
        results=results,
        total=len(results),
        query=q,
        mode=mode,
        debug=debug_obj,
    )