feat(search): Phase 1.3 TEI reranker 통합 (코드 골격)

데이터 흐름 원칙: fusion=doc 기준 / reranker=chunk 기준 — 절대 섞지 말 것.

신규/수정:
- ai/client.py: rerank() 메서드 추가 (TEI POST /rerank API)
- services/search/rerank_service.py:
    - rerank_chunks() — asyncio.Semaphore(2) + 5s soft timeout + RRF fallback
    - _make_snippet/_extract_window — title + query 중심 200~400 토큰
      (keyword 매치 없으면 첫 800자 fallback)
    - apply_diversity() — max_per_doc=2, top score>=0.90 unlimited
    - warmup_reranker() — 10회 retry + 3초 간격 (TEI 모델 로딩 대기)
    - MAX_RERANK_INPUT=200, MAX_CHUNKS_PER_DOC=2 hard cap
- services/search_telemetry.py: compute_confidence_reranked() — sigmoid score 임계값
- api/search.py:
    - ?rerank=true|false 파라미터 (기본 true, hybrid 모드만)
    - 흐름: fused_docs(limit*5) → chunks_by_doc 회수 → rerank_chunks → apply_diversity
    - text-only 매치 doc은 doc 자체를 chunk처럼 wrap (fallback)
    - rerank 활성 시 confidence는 reranker score 기반
- tests/search_eval/run_eval.py: --rerank true|false 플래그

GPU 적용 보류:
- TEI 컨테이너 추가 (docker-compose.yml) — 별도 작업
- config.yaml rerank.endpoint 갱신 — GPU 직접 (commit 없음)
- 재인덱싱 완료 후 build + warmup + 평가셋 측정
This commit is contained in:
Hyungi Ahn
2026-04-08 12:41:47 +09:00
parent b80116243f
commit 76e723cdb1
5 changed files with 306 additions and 7 deletions

View File

@@ -85,6 +85,25 @@ class AIClient:
# TODO: Qwen2.5-VL-7B 비전 모델 호출 구현 # TODO: Qwen2.5-VL-7B 비전 모델 호출 구현
raise NotImplementedError("OCR는 Phase 1에서 구현") raise NotImplementedError("OCR는 Phase 1에서 구현")
async def rerank(self, query: str, texts: list[str]) -> list[dict]:
"""TEI bge-reranker-v2-m3 호출 (Phase 1.3).
TEI POST /rerank API:
request: {"query": str, "texts": [str, ...]}
response: [{"index": int, "score": float}, ...] (정렬됨)
timeout은 self.ai.rerank.timeout (config.yaml).
호출자(rerank_service)가 asyncio.Semaphore + try/except로 감쌈.
"""
timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0
response = await self._http.post(
self.ai.rerank.endpoint,
json={"query": query, "texts": texts},
timeout=timeout,
)
response.raise_for_status()
return response.json()
async def _call_chat(self, model_config, prompt: str) -> str: async def _call_chat(self, model_config, prompt: str) -> str:
"""OpenAI 호환 API 호출 + 자동 폴백""" """OpenAI 호환 API 호출 + 자동 폴백"""
try: try:

View File

@@ -16,10 +16,17 @@ from core.database import get_session
from core.utils import setup_logger from core.utils import setup_logger
from models.user import User from models.user import User
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
from services.search.rerank_service import (
MAX_CHUNKS_PER_DOC,
MAX_RERANK_INPUT,
apply_diversity,
rerank_chunks,
)
from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
from services.search_telemetry import ( from services.search_telemetry import (
compute_confidence, compute_confidence,
compute_confidence_hybrid, compute_confidence_hybrid,
compute_confidence_reranked,
record_search_event, record_search_event,
) )
@@ -104,6 +111,10 @@ async def search(
pattern="^(legacy|rrf|rrf_boost)$", pattern="^(legacy|rrf|rrf_boost)$",
description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)", description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
), ),
rerank: bool = Query(
True,
description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
),
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
): ):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)""" """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)"""
@@ -145,13 +156,44 @@ async def search(
t2 = time.perf_counter() t2 = time.perf_counter()
strategy = get_strategy(fusion) strategy = get_strategy(fusion)
results = strategy.fuse(text_results, vector_results, q, limit) # fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
fusion_limit = max(limit * 5, 100) if rerank else limit
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000 timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}") notes.append(f"fusion={strategy.name}")
notes.append( notes.append(
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} " f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
f"unique_docs={len(chunks_by_doc)}" f"unique_docs={len(chunks_by_doc)}"
) )
if rerank:
# Phase 1.3: reranker — chunk 기준 입력
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
t3 = time.perf_counter()
rerank_input: list[SearchResult] = []
for doc in fused_docs:
chunks = chunks_by_doc.get(doc.id, [])
if chunks:
# doc당 max 2 chunk (latency/VRAM 보호)
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(q, rerank_input, limit * 3)
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
t4 = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
else:
# rerank 비활성: fused_docs를 그대로 (limit 적용)
results = fused_docs[:limit]
else: else:
results = text_results results = text_results
@@ -162,8 +204,12 @@ async def search(
timing["total_ms"] = (time.perf_counter() - t_total) * 1000 timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음) # confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
if mode == "hybrid": if mode == "hybrid":
confidence_signal = compute_confidence_hybrid(text_results, vector_results) if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector": elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector") confidence_signal = compute_confidence(vector_results, "vector")
else: else:

View File

@@ -1,5 +1,199 @@
"""Reranker 서비스 — bge-reranker-v2-m3 통합 (Phase 1.3). """Reranker 서비스 — bge-reranker-v2-m3 통합 (Phase 1.3).
TEI 컨테이너 호출 + asyncio.Semaphore(2) + soft timeout fallback. TEI 컨테이너 호출 + asyncio.Semaphore(2) + soft timeout fallback.
구현은 Phase 1.3에서 채움.
데이터 흐름 원칙:
- fusion = doc 기준 / reranker = chunk 기준 — 절대 섞지 말 것
- raw chunks를 끝까지 보존, fusion은 압축본만 사용
- reranker는 chunks_by_doc dict에서 raw chunks 회수해서 chunk 단위로 호출
- diversity는 reranker 직후 마지막 단계에서만 적용
snippet 생성:
- 200~400 토큰(800~1500자) 기준
- query keyword 위치 중심 ±target_chars/2 윈도우
- keyword 매치 없으면 첫 target_chars 문자 fallback (성능 손실 방지)
""" """
from __future__ import annotations
import asyncio
import re
from typing import TYPE_CHECKING
import httpx
from ai.client import AIClient
from core.utils import setup_logger
if TYPE_CHECKING:
from api.search import SearchResult
logger = setup_logger("rerank")
# 동시 rerank 호출 제한 (GPU saturation 방지)
RERANK_SEMAPHORE = asyncio.Semaphore(2)
# rerank input 크기 제한 (latency / VRAM hard cap)
MAX_RERANK_INPUT = 200
MAX_CHUNKS_PER_DOC = 2
# Soft timeout (초)
RERANK_TIMEOUT = 5.0
def _extract_window(text: str, query: str, target_chars: int = 800) -> str:
"""query keyword 위치 중심으로 ±target_chars/2 윈도우 추출.
fallback: keyword 매치 없으면 첫 target_chars 문자 그대로.
이게 없으면 reranker가 무관한 텍스트만 보고 점수 매겨 성능 급락.
"""
keywords = [k for k in re.split(r"\s+", query) if len(k) >= 2]
best_pos = -1
for kw in keywords:
pos = text.lower().find(kw.lower())
if pos >= 0:
best_pos = pos
break
if best_pos < 0:
# Fallback: 첫 target_chars 문자
return text[:target_chars]
half = target_chars // 2
start = max(0, best_pos - half)
end = min(len(text), start + target_chars)
return text[start:end]
def _make_snippet(c: "SearchResult", query: str, max_chars: int = 1500) -> str:
"""Reranker input snippet — title + query 중심 본문 윈도우.
feedback_search_phase1_implementation.md 3번 항목 강제:
snippet 200~400 토큰(800~1500자), full document 절대 안 됨.
"""
title = c.title or ""
text = c.snippet or ""
# snippet은 chunk text 앞 200자 또는 doc text 앞 200자
# 더 긴 chunk text가 필요하면 호출자가 따로 채워서 넘김
if len(text) > max_chars:
text = _extract_window(text, query, target_chars=max_chars - 100)
return f"{title}\n\n{text}"
def _wrap_doc_as_chunk(doc: "SearchResult") -> "SearchResult":
"""text-only 매치 doc(chunks_by_doc에 없는 doc)을 ChunkResult 형태로 변환.
Phase 1.3 reranker 입력에 doc 자체가 들어가야 하는 경우.
snippet은 documents.extracted_text 앞 200자 (이미 SearchResult.snippet에 채워짐).
chunk_id 등은 None 그대로.
"""
return doc
async def rerank_chunks(
query: str,
candidates: list["SearchResult"],
limit: int,
) -> list["SearchResult"]:
"""RRF 결과 candidates를 bge-reranker로 재정렬.
Args:
query: 사용자 쿼리
candidates: chunk-level SearchResult 리스트 (이미 chunks_by_doc에서 회수)
limit: 반환할 결과 수
Returns:
reranked SearchResult 리스트 (rerank score로 score 필드 업데이트)
Fallback (timeout/HTTPError): RRF 순서 그대로 candidates[:limit] 반환.
"""
if not candidates:
return []
# input 크기 제한 (latency/VRAM hard cap)
if len(candidates) > MAX_RERANK_INPUT:
logger.warning(
f"rerank input {len(candidates)} > MAX {MAX_RERANK_INPUT}, 자름"
)
candidates = candidates[:MAX_RERANK_INPUT]
snippets = [_make_snippet(c, query) for c in candidates]
client = AIClient()
try:
async with asyncio.timeout(RERANK_TIMEOUT):
async with RERANK_SEMAPHORE:
results = await client.rerank(query, snippets)
# results: [{"index": int, "score": float}, ...] (이미 정렬됨)
reranked: list["SearchResult"] = []
for r in results:
idx = r.get("index")
sc = r.get("score")
if idx is None or sc is None or idx >= len(candidates):
continue
chunk = candidates[idx]
chunk.score = float(sc)
chunk.match_reason = (chunk.match_reason or "") + "+rerank"
reranked.append(chunk)
return reranked[:limit]
except (asyncio.TimeoutError, httpx.HTTPError) as e:
logger.warning(f"rerank failed → RRF fallback: {type(e).__name__}: {e}")
return candidates[:limit]
except Exception as e:
logger.warning(f"rerank unexpected error → RRF fallback: {type(e).__name__}: {e}")
return candidates[:limit]
finally:
await client.close()
async def warmup_reranker() -> bool:
"""TEI 부팅 후 모델 로딩 완료 대기 (10회 retry).
TEI는 health 200을 빠르게 반환하지만 첫 모델 로딩(10~30초) 전에는
rerank 요청이 실패하거나 매우 느림. FastAPI startup 또는 첫 요청 전 호출.
"""
client = AIClient()
try:
for attempt in range(10):
try:
await client.rerank("warmup", ["dummy text for model load"])
logger.info(f"reranker warmup OK (attempt {attempt + 1})")
return True
except Exception as e:
logger.info(f"reranker warmup retry {attempt + 1}: {e}")
await asyncio.sleep(3)
logger.error("reranker warmup failed after 10 attempts")
return False
finally:
await client.close()
def apply_diversity(
results: list["SearchResult"],
max_per_doc: int = MAX_CHUNKS_PER_DOC,
top_score_threshold: float = 0.90,
) -> list["SearchResult"]:
"""chunk-level 결과를 doc 기준으로 압축 (max_per_doc).
조건부 완화: 가장 상위 결과 score가 threshold 이상이면 unlimited
(high confidence relevance > diversity).
"""
if not results:
return []
# 가장 상위 score가 threshold 이상이면 diversity 제약 해제
top_score = results[0].score if results else 0.0
if top_score >= top_score_threshold:
return results
seen: dict[int, int] = {}
out: list["SearchResult"] = []
for r in results:
doc_id = r.id
if seen.get(doc_id, 0) >= max_per_doc:
continue
out.append(r)
seen[doc_id] = seen.get(doc_id, 0) + 1
return out

View File

@@ -149,6 +149,33 @@ def _cosine_to_confidence(cosine: float) -> float:
return 0.10 return 0.10
def compute_confidence_reranked(reranked_results: list[Any]) -> float:
"""Phase 1.3 reranker score 기반 confidence.
bge-reranker-v2-m3는 sigmoid score (0~1 범위)를 반환.
rerank 활성 시 fusion score보다 reranker score가 가장 신뢰할 수 있는 신호.
임계값(초안, 실측 후 조정 가능):
>= 0.95 → high
>= 0.80 → med-high
>= 0.60 → med
>= 0.40 → low-med
else → low
"""
if not reranked_results:
return 0.0
top_score = float(getattr(reranked_results[0], "score", 0.0) or 0.0)
if top_score >= 0.95:
return 0.95
if top_score >= 0.80:
return 0.80
if top_score >= 0.60:
return 0.65
if top_score >= 0.40:
return 0.50
return 0.35
def compute_confidence_hybrid( def compute_confidence_hybrid(
text_results: list[Any], text_results: list[Any],
vector_results: list[Any], vector_results: list[Any],

View File

@@ -133,6 +133,7 @@ async def call_search(
mode: str = "hybrid", mode: str = "hybrid",
limit: int = 20, limit: int = 20,
fusion: str | None = None, fusion: str | None = None,
rerank: str | None = None,
) -> tuple[list[int], float]: ) -> tuple[list[int], float]:
"""검색 API 호출 → (doc_ids, latency_ms).""" """검색 API 호출 → (doc_ids, latency_ms)."""
url = f"{base_url.rstrip('/')}/api/search/" url = f"{base_url.rstrip('/')}/api/search/"
@@ -140,6 +141,8 @@ async def call_search(
params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit} params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit}
if fusion: if fusion:
params["fusion"] = fusion params["fusion"] = fusion
if rerank is not None:
params["rerank"] = rerank
import time import time
@@ -165,6 +168,7 @@ async def evaluate(
label: str, label: str,
mode: str = "hybrid", mode: str = "hybrid",
fusion: str | None = None, fusion: str | None = None,
rerank: str | None = None,
) -> list[QueryResult]: ) -> list[QueryResult]:
"""전체 쿼리셋 평가.""" """전체 쿼리셋 평가."""
results: list[QueryResult] = [] results: list[QueryResult] = []
@@ -173,7 +177,7 @@ async def evaluate(
for q in queries: for q in queries:
try: try:
returned_ids, latency_ms = await call_search( returned_ids, latency_ms = await call_search(
client, base_url, token, q.query, mode=mode, fusion=fusion client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank
) )
results.append( results.append(
QueryResult( QueryResult(
@@ -404,6 +408,13 @@ def main() -> int:
choices=["legacy", "rrf", "rrf_boost"], choices=["legacy", "rrf", "rrf_boost"],
help="hybrid 모드 fusion 전략 (Phase 0.5+, 미지정 시 서버 기본값)", help="hybrid 모드 fusion 전략 (Phase 0.5+, 미지정 시 서버 기본값)",
) )
parser.add_argument(
"--rerank",
type=str,
default=None,
choices=["true", "false"],
help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)",
)
parser.add_argument( parser.add_argument(
"--token", "--token",
type=str, type=str,
@@ -434,6 +445,8 @@ def main() -> int:
print(f"Mode: {args.mode}", end="") print(f"Mode: {args.mode}", end="")
if args.fusion: if args.fusion:
print(f" / fusion: {args.fusion}", end="") print(f" / fusion: {args.fusion}", end="")
if args.rerank:
print(f" / rerank: {args.rerank}", end="")
print() print()
all_results: list[QueryResult] = [] all_results: list[QueryResult] = []
@@ -441,21 +454,21 @@ def main() -> int:
if args.base_url: if args.base_url:
print(f"\n>>> evaluating: {args.base_url}") print(f"\n>>> evaluating: {args.base_url}")
results = asyncio.run( results = asyncio.run(
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion) evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
) )
print_summary("single", results) print_summary("single", results)
all_results.extend(results) all_results.extend(results)
else: else:
print(f"\n>>> baseline: {args.baseline_url}") print(f"\n>>> baseline: {args.baseline_url}")
baseline_results = asyncio.run( baseline_results = asyncio.run(
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion) evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
) )
baseline_summary = print_summary("baseline", baseline_results) baseline_summary = print_summary("baseline", baseline_results)
print(f"\n>>> candidate: {args.candidate_url}") print(f"\n>>> candidate: {args.candidate_url}")
candidate_results = asyncio.run( candidate_results = asyncio.run(
evaluate( evaluate(
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank
) )
) )
candidate_summary = print_summary("candidate", candidate_results) candidate_summary = print_summary("candidate", candidate_results)