Files
hyungi_document_server/app/api/search.py
Hyungi Ahn f5c3dea833 feat(search): Phase 2.2 multilingual vector retrieval + query embed cache
## 변경 사항

### app/services/search/retrieval_service.py
 - **_QUERY_EMBED_CACHE**: 모듈 레벨 LRU (maxsize=500, TTL=24h)
   - sha256(text|bge-m3) 키. fixed query 재호출 시 vector_ms 절반 감소.
 - **_get_query_embedding(client, text)**: cache-first helper. 기존 search_vector()도 이를 사용하도록 교체.
 - **search_vector_multilingual(session, normalized_queries, limit)**: 신규
   - normalized_queries 각 언어별 embedding 병렬 생성 (cache hit 활용)
   - 각 embedding에 대해 docs+chunks hybrid retrieval 병렬
   - weight 기반 score 누적 merge (lang_weight 이미 1.0 정규화)
   - match_reason에 "ml_ko+en" 등 언어 병합 표시
   - 호출 조건 문서화 — cache hit + analyzer_tier=analyzed 시에만

### app/api/search.py
 - use_multilingual 결정 로직:
   - analyzer_cache_hit == True
   - analyzer_tier == "analyzed" (confidence >= 0.85)
   - len(normalized_queries) >= 2 (다언어 버전 실제 존재)
 - 위 3조건 모두 만족할 때만 search_vector_multilingual 호출
 - 그 외 모든 경로 (cache miss, low conf, single lang)는 기존 search_vector 그대로 사용 (회귀 0 보장)
 - notes에 `multilingual langs=[ko, en, ...]` 기록

## 기대 효과
 - crosslingual_ko_en NDCG 0.53 → 0.65+ (Phase 2 목표)
 - 기존 경로 완전 불변 → 회귀 0
 - Phase 2.1 async 구조와 결합해 "cache hit일 때만 활성" 조건 준수

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:59:20 +09:00

340 lines
14 KiB
Python

"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint).
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리.
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당.
"""
import math
import time
from typing import Annotated

from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search import query_analyzer
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
from services.search.rerank_service import (
    MAX_CHUNKS_PER_DOC,
    MAX_RERANK_INPUT,
    apply_diversity,
    rerank_chunks,
)
from services.search.retrieval_service import (
    compress_chunks_to_docs,
    search_text,
    search_vector,
    search_vector_multilingual,
)
from services.search_telemetry import (
    compute_confidence,
    compute_confidence_hybrid,
    compute_confidence_reranked,
    record_search_event,
)

# Phase 2.1: analyzer_confidence 3단계 게이트 (값 조정은 plan 기준)
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
def _analyzer_tier(confidence: float) -> str:
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
if confidence < ANALYZER_TIER_IGNORE:
return "ignore"
if confidence < ANALYZER_TIER_ORIGINAL:
return "original_fallback"
if confidence < ANALYZER_TIER_MERGE:
return "merge"
return "analyzed"
# Output goes to logs/search.log and stdout at the same time (Phase 0.4)
logger = setup_logger("search")
# Router that the search endpoint in this module registers itself on.
router = APIRouter()


class SearchResult(BaseModel):
    """A single search result row.

    Phase 1.2-C: chunk metadata fields were added when chunk-level vector
    retrieval was introduced.
    Text search results leave chunk_id and the other chunk fields as None
    (doc-level).
    Vector search results populate chunk_id and the other chunk fields
    (chunk-level).
    """

    id: int  # doc_id (common to text and vector results)
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float
    snippet: str | None
    match_reason: str | None = None
    # Phase 1.2-C: chunk metadata (populated for vector search results)
    chunk_id: int | None = None
    chunk_index: int | None = None
    section_title: str | None = None


# ─── Phase 0.4: debug response schema ─────────────────────────
class DebugCandidate(BaseModel):
    """A per-stage candidate (exposed only in debug=true responses)."""

    id: int
    rank: int  # 1-based position, as assigned by _to_debug_candidates
    score: float
    match_reason: str | None = None


class SearchDebug(BaseModel):
    """Debug payload attached to the search response when debug=true."""

    timing_ms: dict[str, float]  # elapsed milliseconds keyed by stage name (e.g. "vector_ms")
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    fused_candidates: list[DebugCandidate] | None = None
    confidence: float
    notes: list[str] = []
    # Placeholders to be filled in once Phase 1/2 is introduced
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None


class SearchResponse(BaseModel):
    """Response body returned by the search endpoint."""

    results: list[SearchResult]
    total: int  # the search endpoint sets this to len(results)
    query: str
    mode: str
    debug: SearchDebug | None = None  # populated only when debug=true is requested


def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Convert the leading ``n`` rows into debug candidates ranked from 1.

    Args:
        rows: Result rows in the order they should be ranked.
        n: Upper bound on how many rows are converted (slice semantics).

    Returns:
        One ``DebugCandidate`` per converted row, carrying the row's id,
        score and match_reason plus its 1-based position as ``rank``.
    """
    candidates: list[DebugCandidate] = []
    for position, row in enumerate(rows[:n], start=1):
        candidate = DebugCandidate(
            id=row.id,
            rank=position,
            score=row.score,
            match_reason=row.match_reason,
        )
        candidates.append(candidate)
    return candidates


def _select_multilingual_queries(query_analysis: dict | None) -> list[dict]:
    """Return the normalized queries usable for multilingual retrieval.

    An empty list means "do not use multilingual retrieval": the analysis is
    missing/empty, ``normalized_queries`` is not a list of at least two
    entries, or fewer than two entries are dicts with a non-empty ``text``.
    """
    if not query_analysis:
        return []
    raw_nq = query_analysis.get("normalized_queries") or []
    if not isinstance(raw_nq, list) or len(raw_nq) < 2:
        return []
    usable = [nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")]
    return usable if len(usable) >= 2 else []


async def _retrieve_chunks(
    session: AsyncSession,
    q: str,
    limit: int,
    multilingual_queries: list[dict],
) -> list[SearchResult]:
    """Run vector retrieval: multilingual when queries are given, else single-query."""
    if multilingual_queries:
        return await search_vector_multilingual(session, multilingual_queries, limit)
    return await search_vector(session, q, limit)


def _build_rerank_input(
    fused_docs: list[SearchResult],
    chunks_by_doc: dict[int, list[SearchResult]],
) -> list[SearchResult]:
    """Collect reranker input in fused order, capped at MAX_RERANK_INPUT."""
    rerank_input: list[SearchResult] = []
    for doc in fused_docs:
        chunks = chunks_by_doc.get(doc.id, [])
        if chunks:
            # At most MAX_CHUNKS_PER_DOC chunks per doc (latency/VRAM protection)
            rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
        else:
            # Doc matched by text only → pass the doc itself as if it were a chunk
            rerank_input.append(doc)
        if len(rerank_input) >= MAX_RERANK_INPUT:
            break
    # extend() may overshoot the cap by one chunk; trim to the exact limit.
    return rerank_input[:MAX_RERANK_INPUT]


@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    fusion: str = Query(
        DEFAULT_FUSION,
        pattern="^(legacy|rrf|rrf_boost)$",
        description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
    ),
    rerank: bool = Query(
        True,
        description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
    ),
    # NOTE(review): this description predates Phase 2.2 — with analyze=true a
    # high-confidence cache hit now switches vector retrieval to the
    # multilingual path. The string is left untouched because it is part of
    # the published API description.
    analyze: bool = Query(
        False,
        description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
    ),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
    """Document search — FTS + ILIKE + vector combined (Phase 0.5: RRF fusion).

    Dispatches on ``mode``:

    * ``vector``: vector retrieval, then chunk → doc compression.
    * ``hybrid``: text search, vector retrieval, fusion, and (when ``rerank``
      is true) reranking followed by a diversity pass.
    * ``fts`` / ``trgm``: text search results only.

    Per-stage timings are always logged; a telemetry event is recorded via a
    background task; stage candidates, timings and notes are returned in the
    response only when ``debug`` is true.
    """
    timing: dict[str, float] = {}
    notes: list[str] = []
    text_results: list[SearchResult] = []
    vector_results: list[SearchResult] = []  # doc-level (after compression, fusion input)
    raw_chunks: list[SearchResult] = []  # chunk-level (raw, for the Phase 1.3 reranker)
    chunks_by_doc: dict[int, list[SearchResult]] = {}  # kept for the Phase 1.3 reranker
    query_analysis: dict | None = None
    analyzer_confidence: float = 0.0
    analyzer_tier: str = "disabled"
    t_total = time.perf_counter()

    # Phase 2.1 (async structure): QueryAnalyzer must never be called synchronously.
    # - cache hit  → use query_analysis (conditionally, for the Phase 2.2/2.3 pipeline)
    # - cache miss → keep the existing path + trigger a background task (fire-and-forget)
    # Decision based on measurements (gemma-4 took 10s+). memory: feedback_analyzer_async_only.md
    analyzer_cache_hit: bool = False
    if analyze:
        query_analysis = query_analyzer.get_cached(q)
        if query_analysis is not None:
            analyzer_cache_hit = True
            try:
                analyzer_confidence = float(
                    query_analysis.get("analyzer_confidence", 0.0) or 0.0
                )
            except (TypeError, ValueError):
                analyzer_confidence = 0.0
            # NaN/inf survive float(); NaN in particular compares False against
            # every tier threshold and would be treated as top-tier confidence.
            if not math.isfinite(analyzer_confidence):
                analyzer_confidence = 0.0
            analyzer_tier = _analyzer_tier(analyzer_confidence)
            notes.append(
                f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
            )
        else:
            # cache miss → trigger the background analyzer (retrieval is not blocked)
            triggered = query_analyzer.trigger_background_analysis(q)
            analyzer_tier = "cache_miss"
            notes.append(
                "analyzer cache_miss"
                + (" (bg triggered)" if triggered else " (bg inflight)")
            )

    # Phase 2.2: conditions for enabling multilingual vector search
    # - cache hit + analyzer_tier == "analyzed" (high confidence, ≥0.85)
    # - at least two usable normalized_queries (actual language diversity)
    # - the mode actually runs vector retrieval (so the note below is never
    #   emitted for fts/trgm requests, where it would be misleading)
    # Every other case keeps using the single-query search_vector unchanged.
    normalized_queries: list[dict] = []
    if (
        mode in ("vector", "hybrid")
        and analyzer_cache_hit
        and analyzer_tier == "analyzed"
    ):
        normalized_queries = _select_multilingual_queries(query_analysis)
    if normalized_queries:
        notes.append(f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}")

    if mode == "vector":
        t0 = time.perf_counter()
        raw_chunks = await _retrieve_chunks(session, q, limit, normalized_queries)
        timing["vector_ms"] = (time.perf_counter() - t0) * 1000
        if not raw_chunks:
            notes.append("vector_search_returned_empty (AI client error or no embeddings)")
        # Vector-only mode also compresses to docs for diversity (avoids duplicate chunks)
        vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
        results = vector_results
    else:
        t0 = time.perf_counter()
        text_results = await search_text(session, q, limit)
        timing["text_ms"] = (time.perf_counter() - t0) * 1000

        if mode == "hybrid":
            t1 = time.perf_counter()
            raw_chunks = await _retrieve_chunks(session, q, limit, normalized_queries)
            timing["vector_ms"] = (time.perf_counter() - t1) * 1000

            # chunk-level → doc-level compression (raw chunks are kept in chunks_by_doc)
            t1b = time.perf_counter()
            vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
            timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
            if not vector_results:
                notes.append("vector_search_returned_empty — text-only fallback")

            t2 = time.perf_counter()
            strategy = get_strategy(fusion)
            # Fusion works per doc — fetch a wider set when reranking (rerank candidates)
            fusion_limit = max(limit * 5, 100) if rerank else limit
            fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
            timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
            notes.append(f"fusion={strategy.name}")
            notes.append(
                f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
                f"unique_docs={len(chunks_by_doc)}"
            )

            if rerank:
                # Phase 1.3: reranker — chunk-level input.
                # Raw chunks are recovered from chunks_by_doc by fused doc_id.
                t3 = time.perf_counter()
                rerank_input = _build_rerank_input(fused_docs, chunks_by_doc)
                notes.append(f"rerank input={len(rerank_input)}")
                reranked = await rerank_chunks(q, rerank_input, limit * 3)
                timing["rerank_ms"] = (time.perf_counter() - t3) * 1000

                # Diversity pass over the reranked chunks, capped per doc
                t4 = time.perf_counter()
                results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
                timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
            else:
                # Rerank disabled: use fused_docs as-is (limit applied)
                results = fused_docs[:limit]
        else:
            results = text_results

    # Display score normalisation — the frontend shows score*100 as a percentage.
    # Exposing internal fusion scores (RRF is in the 0.01~0.05 range) would break the display.
    normalize_display_scores(results)
    timing["total_ms"] = (time.perf_counter() - t_total) * 1000

    # Confidence is meant to come from raw pre-fusion signals (since Phase 0.5
    # the fused score has no absolute meaning). With rerank active, the
    # reranker score is the most trustworthy signal → used first.
    # NOTE(review): normalize_display_scores(results) runs before this point and
    # in vector/text modes `results` is the same list as vector_results /
    # text_results — if normalisation mutates rows in place, confidence is
    # computed from normalised scores. Confirm this is intended.
    if mode == "hybrid":
        if rerank and "rerank_ms" in timing:
            confidence_signal = compute_confidence_reranked(results)
        else:
            confidence_signal = compute_confidence_hybrid(text_results, vector_results)
    elif mode == "vector":
        confidence_signal = compute_confidence(vector_results, "vector")
    else:
        confidence_signal = compute_confidence(text_results, mode)

    # User feedback: every stage timing is always logged, independent of the debug response
    timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
    fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
    analyzer_str = (
        f" analyzer=hit={analyzer_cache_hit}/conf={analyzer_confidence:.2f}/tier={analyzer_tier}"
        if analyze
        else ""
    )
    logger.info(
        "search query=%r mode=%s%s%s results=%d conf=%.2f %s",
        q[:80], mode, fusion_str, analyzer_str, len(results), confidence_signal, timing_str,
    )

    # Phase 0.3: automatic failure logging (background task — no impact on response latency)
    # Phase 2.1: analyzer_confidence is passed only when analyze=true (otherwise None, as before)
    background_tasks.add_task(
        record_search_event,
        q,
        user.id,
        results,
        mode,
        confidence_signal,
        analyzer_confidence if analyze else None,
    )

    debug_obj: SearchDebug | None = None
    if debug:
        debug_obj = SearchDebug(
            timing_ms=timing,
            text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
            vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
            fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
            confidence=confidence_signal,
            notes=notes,
            query_analysis=query_analysis,
        )

    return SearchResponse(
        results=results,
        total=len(results),
        query=q,
        mode=mode,
        debug=debug_obj,
    )