데이터 흐름 원칙: fusion=doc 기준 / reranker=chunk 기준 — 절대 섞지 말 것.
신규/수정:
- ai/client.py: rerank() 메서드 추가 (TEI POST /rerank API)
- services/search/rerank_service.py:
- rerank_chunks() — asyncio.Semaphore(2) + 5s soft timeout + RRF fallback
- _make_snippet/_extract_window — title + query 중심 200~400 토큰
(keyword 매치 없으면 첫 800자 fallback)
- apply_diversity() — max_per_doc=2, top score>=0.90 unlimited
- warmup_reranker() — 10회 retry + 3초 간격 (TEI 모델 로딩 대기)
- MAX_RERANK_INPUT=200, MAX_CHUNKS_PER_DOC=2 hard cap
- services/search_telemetry.py: compute_confidence_reranked() — sigmoid score 임계값
- api/search.py:
- ?rerank=true|false 파라미터 (기본 true, hybrid 모드만)
- 흐름: fused_docs(limit*5) → chunks_by_doc 회수 → rerank_chunks → apply_diversity
- text-only 매치 doc은 doc 자체를 chunk처럼 wrap (fallback)
- rerank 활성 시 confidence는 reranker score 기반
- tests/search_eval/run_eval.py: --rerank true|false 플래그
GPU 적용 보류:
- TEI 컨테이너 추가 (docker-compose.yml) — 별도 작업
- config.yaml rerank.endpoint 갱신 — GPU 직접 (commit 없음)
- 재인덱싱 완료 후 build + warmup + 평가셋 측정
249 lines
9.7 KiB
Python
"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint).
|
|
|
|
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리.
|
|
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당.
|
|
"""
|
|
|
|
import time
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from core.utils import setup_logger
|
|
from models.user import User
|
|
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
|
|
from services.search.rerank_service import (
|
|
MAX_CHUNKS_PER_DOC,
|
|
MAX_RERANK_INPUT,
|
|
apply_diversity,
|
|
rerank_chunks,
|
|
)
|
|
from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
|
|
from services.search_telemetry import (
|
|
compute_confidence,
|
|
compute_confidence_hybrid,
|
|
compute_confidence_reranked,
|
|
record_search_event,
|
|
)
|
|
|
|
# Dual output: logs/search.log file + stdout (Phase 0.4)
logger = setup_logger("search")

# Mounted by the app under the search prefix; all routes below attach here.
router = APIRouter()
|
|
|
|
|
|
class SearchResult(BaseModel):
    """A single search-result row.

    Phase 1.2-C: chunk-level vector retrieval introduced the chunk metadata
    fields below.
    Text-search rows leave chunk_id etc. as None (doc-level);
    vector-search rows have them populated (chunk-level).
    """

    id: int  # doc_id (shared by text and vector paths)
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float  # display-normalized before serialization (frontend shows score*100 as %)
    snippet: str | None
    match_reason: str | None = None
    # Phase 1.2-C: chunk metadata (populated only for vector hits)
    chunk_id: int | None = None
    chunk_index: int | None = None
    section_title: str | None = None
|
|
|
|
|
|
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
|
|
|
|
|
|
class DebugCandidate(BaseModel):
    """Per-stage candidate row (exposed only in debug=true responses)."""

    id: int  # doc_id of the candidate
    rank: int  # 1-based position within that stage's candidate list
    score: float  # stage-internal score (not display-normalized)
    match_reason: str | None = None
|
|
|
|
|
|
class SearchDebug(BaseModel):
    """Per-stage diagnostics attached to the response when ?debug=true."""

    timing_ms: dict[str, float]  # stage name -> elapsed milliseconds
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    fused_candidates: list[DebugCandidate] | None = None
    confidence: float
    # NOTE(review): mutable default — Pydantic copies field defaults per
    # instance so this is safe here; Field(default_factory=list) would be
    # more explicit. Confirm against the project's Pydantic version.
    notes: list[str] = []
    # Placeholders to be filled in once Phase 1/2 land
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None
|
|
|
|
|
|
class SearchResponse(BaseModel):
    """Top-level search API response."""

    results: list[SearchResult]
    total: int  # equals len(results) — not a grand total across pages
    query: str
    mode: str  # echoes the requested mode (fts|trgm|vector|hybrid)
    debug: SearchDebug | None = None  # populated only when ?debug=true
|
|
|
|
|
|
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Convert the top *n* result rows into debug candidates with 1-based ranks."""
    candidates: list[DebugCandidate] = []
    for position, row in enumerate(rows[:n], start=1):
        candidates.append(
            DebugCandidate(
                id=row.id,
                rank=position,
                score=row.score,
                match_reason=row.match_reason,
            )
        )
    return candidates
|
|
|
|
|
|
@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    fusion: str = Query(
        DEFAULT_FUSION,
        pattern="^(legacy|rrf|rrf_boost)$",
        description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
    ),
    rerank: bool = Query(
        True,
        description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
    ),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
) -> SearchResponse:
    """Document search — FTS + ILIKE + vector combined (Phase 0.5: RRF fusion).

    Thin orchestrator (Phase 1.1): mode branching, response serialization,
    debug-payload assembly and BackgroundTask dispatch only. The actual
    retrieval / fusion / rerank logic lives in services/search/*.

    Data-flow invariant: fusion operates on docs, the reranker operates on
    chunks — raw chunks are preserved in ``chunks_by_doc`` so the reranker
    can recover them after doc-level fusion.

    Args:
        q: raw query string.
        user: authenticated caller (only ``user.id`` is used, for telemetry).
        session: per-request async DB session.
        background_tasks: FastAPI queue for post-response event logging.
        mode: fts | trgm | vector | hybrid.
        limit: maximum rows returned (1..100).
        fusion: hybrid-mode fusion strategy name.
        rerank: enable the chunk reranker (effective in hybrid mode only).
        debug: include per-stage candidates + timings in the response.

    Returns:
        SearchResponse with at most ``limit`` results.
    """
    timing: dict[str, float] = {}
    notes: list[str] = []
    text_results: list[SearchResult] = []
    vector_results: list[SearchResult] = []  # doc-level (post-compression; fusion input)
    raw_chunks: list[SearchResult] = []  # chunk-level (raw; for the Phase 1.3 reranker)
    chunks_by_doc: dict[int, list[SearchResult]] = {}  # preserved for the Phase 1.3 reranker

    t_total = time.perf_counter()

    if mode == "vector":
        t0 = time.perf_counter()
        raw_chunks = await search_vector(session, q, limit)
        timing["vector_ms"] = (time.perf_counter() - t0) * 1000
        if not raw_chunks:
            notes.append("vector_search_returned_empty (AI client error or no embeddings)")
        # Vector-only mode also compresses chunks to docs for diversity
        # (prevents one document's chunks from flooding the results).
        vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
        results = vector_results
    else:
        t0 = time.perf_counter()
        text_results = await search_text(session, q, limit)
        timing["text_ms"] = (time.perf_counter() - t0) * 1000

        if mode == "hybrid":
            t1 = time.perf_counter()
            raw_chunks = await search_vector(session, q, limit)
            timing["vector_ms"] = (time.perf_counter() - t1) * 1000

            # chunk-level -> doc-level compression (raw chunks kept in chunks_by_doc)
            t1b = time.perf_counter()
            vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
            timing["compress_ms"] = (time.perf_counter() - t1b) * 1000

            if not vector_results:
                notes.append("vector_search_returned_empty — text-only fallback")

            t2 = time.perf_counter()
            strategy = get_strategy(fusion)
            # Fusion is doc-based — fetch a wider pool when reranking
            # (the extra docs become reranker candidates).
            fusion_limit = max(limit * 5, 100) if rerank else limit
            fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
            timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
            notes.append(f"fusion={strategy.name}")
            notes.append(
                f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
                f"unique_docs={len(chunks_by_doc)}"
            )

            if rerank:
                # Phase 1.3: reranker input is chunk-based —
                # recover raw chunks from chunks_by_doc via the fused doc_ids.
                t3 = time.perf_counter()
                rerank_input: list[SearchResult] = []
                for doc in fused_docs:
                    chunks = chunks_by_doc.get(doc.id, [])
                    if chunks:
                        # At most MAX_CHUNKS_PER_DOC chunks per doc (latency/VRAM guard)
                        rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
                    else:
                        # Text-only matched doc -> wrap the doc itself as a pseudo-chunk
                        rerank_input.append(doc)
                    if len(rerank_input) >= MAX_RERANK_INPUT:
                        break
                # extend() above may overshoot the cap by a chunk or two — trim hard.
                rerank_input = rerank_input[:MAX_RERANK_INPUT]
                notes.append(f"rerank input={len(rerank_input)}")

                reranked = await rerank_chunks(q, rerank_input, limit * 3)
                timing["rerank_ms"] = (time.perf_counter() - t3) * 1000

                # Diversity pass (chunk -> doc compression; per rerank_service:
                # max_per_doc cap, with an exemption for very high scores).
                t4 = time.perf_counter()
                results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
                timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
            else:
                # Rerank disabled: use fused_docs as-is (limit applied)
                results = fused_docs[:limit]
        else:
            results = text_results

    # Display-score normalization — the frontend renders score*100 as a %.
    # Raw fusion scores (RRF is roughly in the 0.01–0.05 range) would render broken.
    normalize_display_scores(results)

    timing["total_ms"] = (time.perf_counter() - t_total) * 1000

    # Confidence is computed from pre-fusion raw signals (since Phase 0.5 the
    # fused score has no absolute meaning). When rerank ran, the reranker
    # score is the most trustworthy signal — prefer it. The "rerank_ms" check
    # distinguishes "rerank requested AND actually executed" from a fallback path.
    if mode == "hybrid":
        if rerank and "rerank_ms" in timing:
            confidence_signal = compute_confidence_reranked(results)
        else:
            confidence_signal = compute_confidence_hybrid(text_results, vector_results)
    elif mode == "vector":
        confidence_signal = compute_confidence(vector_results, "vector")
    else:
        confidence_signal = compute_confidence(text_results, mode)

    # Per user feedback: always log all stage timings, independent of the debug response
    timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
    fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
    logger.info(
        "search query=%r mode=%s%s results=%d conf=%.2f %s",
        q[:80], mode, fusion_str, len(results), confidence_signal, timing_str,
    )

    # Phase 0.3: automatic failure logging (background task — no response-latency impact)
    background_tasks.add_task(
        record_search_event, q, user.id, results, mode, confidence_signal
    )

    debug_obj: SearchDebug | None = None
    if debug:
        debug_obj = SearchDebug(
            timing_ms=timing,
            # Non-vector modes always get a list (possibly empty); vector mode gets None.
            text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
            vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
            # NOTE(review): in hybrid+rerank this reflects the post-rerank/diversity
            # results, not the raw fusion output — confirm that is intended.
            fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
            confidence=confidence_signal,
            notes=notes,
        )

    return SearchResponse(
        results=results,
        total=len(results),
        query=q,
        mode=mode,
        debug=debug_obj,
    )
|