From 76e723cdb171c87e930b5ea2cc6ab045c4f2cd22 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 12:41:47 +0900
Subject: [PATCH] feat(search): Phase 1.3 TEI reranker integration (code
 skeleton)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Data-flow principle: fusion is doc-level / the reranker is chunk-level — never
mix the two.

New/changed:
- ai/client.py: add rerank() method (TEI POST /rerank API)
- services/search/rerank_service.py:
  - rerank_chunks() — asyncio.Semaphore(2) + 5 s soft timeout + RRF fallback
  - _make_snippet/_extract_window — 200–400 tokens centered on title + query
    (falls back to the first 800 chars when no keyword matches)
  - apply_diversity() — max_per_doc=2, unlimited when top score >= 0.90
  - warmup_reranker() — 10 retries at 3 s intervals (waits for TEI model load)
  - MAX_RERANK_INPUT=200, MAX_CHUNKS_PER_DOC=2 hard caps
- services/search_telemetry.py: compute_confidence_reranked() — sigmoid-score
  thresholds
- api/search.py:
  - ?rerank=true|false parameter (default true, hybrid mode only)
  - flow: fused_docs(limit*5) → recover from chunks_by_doc → rerank_chunks →
    apply_diversity
  - text-only matched docs are wrapped as chunks themselves (fallback)
  - with rerank active, confidence is based on the reranker score
- tests/search_eval/run_eval.py: --rerank true|false flag

Deferred until GPU rollout:
- add the TEI container (docker-compose.yml) — separate task
- update config.yaml rerank.endpoint — directly on the GPU host (not committed)
- after re-indexing completes: build + warmup + eval-set measurement
---
 app/ai/client.py                      |  19 +++
 app/api/search.py                     |  50 ++++++-
 app/services/search/rerank_service.py | 196 +++++++++++++++++++++++++-
 app/services/search_telemetry.py      |  27 ++++
 tests/search_eval/run_eval.py         |  21 ++-
 5 files changed, 306 insertions(+), 7 deletions(-)

diff --git a/app/ai/client.py b/app/ai/client.py
index d1ad612..601dad9 100644
--- a/app/ai/client.py
+++ b/app/ai/client.py
@@ -85,6 +85,25 @@ class AIClient:
         # TODO: implement the Qwen2.5-VL-7B vision-model call
         raise NotImplementedError("OCR is implemented in Phase 1")
 
+    async def rerank(self, query: str, texts: list[str]) -> list[dict]:
+        """Call TEI bge-reranker-v2-m3 (Phase 1.3).
+
+        TEI POST /rerank API:
+            request:  {"query": str, "texts": [str, ...]}
+            response: [{"index": int, "score": float}, ...] (sorted)
+
+        The timeout comes from self.ai.rerank.timeout (config.yaml).
+        The caller (rerank_service) wraps this in asyncio.Semaphore + try/except.
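+
+        Illustrative exchange (hypothetical values; the shapes follow the
+        request/response contract above):
+
+            await client.rerank("quarterly budget", ["chunk A ...", "chunk B ..."])
+            # → [{"index": 1, "score": 0.93}, {"index": 0, "score": 0.12}]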
+ """ + timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0 + response = await self._http.post( + self.ai.rerank.endpoint, + json={"query": query, "texts": texts}, + timeout=timeout, + ) + response.raise_for_status() + return response.json() + async def _call_chat(self, model_config, prompt: str) -> str: """OpenAI 호환 API 호출 + 자동 폴백""" try: diff --git a/app/api/search.py b/app/api/search.py index 918d2aa..c0e041d 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -16,10 +16,17 @@ from core.database import get_session from core.utils import setup_logger from models.user import User from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores +from services.search.rerank_service import ( + MAX_CHUNKS_PER_DOC, + MAX_RERANK_INPUT, + apply_diversity, + rerank_chunks, +) from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector from services.search_telemetry import ( compute_confidence, compute_confidence_hybrid, + compute_confidence_reranked, record_search_event, ) @@ -104,6 +111,10 @@ async def search( pattern="^(legacy|rrf|rrf_boost)$", description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)", ), + rerank: bool = Query( + True, + description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)", + ), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), ): """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)""" @@ -145,13 +156,44 @@ async def search( t2 = time.perf_counter() strategy = get_strategy(fusion) - results = strategy.fuse(text_results, vector_results, q, limit) + # fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용) + fusion_limit = max(limit * 5, 100) if rerank else limit + fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit) timing["fusion_ms"] = (time.perf_counter() - t2) * 1000 notes.append(f"fusion={strategy.name}") notes.append( f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} " f"unique_docs={len(chunks_by_doc)}" ) + + if rerank: + # Phase 1.3: reranker — chunk 기준 입력 + # fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수 + t3 = time.perf_counter() + rerank_input: list[SearchResult] = [] + for doc in fused_docs: + chunks = chunks_by_doc.get(doc.id, []) + if chunks: + # doc당 max 2 chunk (latency/VRAM 보호) + rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC]) + else: + # text-only 매치 doc → doc 자체를 chunk처럼 wrap + rerank_input.append(doc) + if len(rerank_input) >= MAX_RERANK_INPUT: + break + rerank_input = rerank_input[:MAX_RERANK_INPUT] + notes.append(f"rerank input={len(rerank_input)}") + + reranked = await rerank_chunks(q, rerank_input, limit * 3) + timing["rerank_ms"] = (time.perf_counter() - t3) * 1000 + + # diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited) + t4 = time.perf_counter() + results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit] + timing["diversity_ms"] = (time.perf_counter() - t4) * 1000 + else: + # rerank 비활성: fused_docs를 그대로 (limit 적용) + results = fused_docs[:limit] else: results = text_results @@ -162,8 +204,12 @@ async def search( timing["total_ms"] = (time.perf_counter() - t_total) * 1000 # confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음) + # rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용 if mode == "hybrid": - confidence_signal = compute_confidence_hybrid(text_results, vector_results) + if rerank and "rerank_ms" in timing: + confidence_signal = compute_confidence_reranked(results) + else: + confidence_signal = 
 """
+
+from __future__ import annotations
+
+import asyncio
+import re
+from typing import TYPE_CHECKING
+
+import httpx
+
+from ai.client import AIClient
+from core.utils import setup_logger
+
+if TYPE_CHECKING:
+    from api.search import SearchResult
+
+logger = setup_logger("rerank")
+
+# Limit concurrent rerank calls (prevents GPU saturation)
+RERANK_SEMAPHORE = asyncio.Semaphore(2)
+
+# Rerank input size caps (latency / VRAM hard caps)
+MAX_RERANK_INPUT = 200
+MAX_CHUNKS_PER_DOC = 2
+
+# Soft timeout (seconds)
+RERANK_TIMEOUT = 5.0
+
+
+def _extract_window(text: str, query: str, target_chars: int = 800) -> str:
+    """Extract a ±target_chars/2 window centered on the query-keyword position.
+
+    Fallback: when no keyword matches, return the first target_chars characters.
+    Without this, the reranker scores only unrelated text and quality collapses.
+    """
+    keywords = [k for k in re.split(r"\s+", query) if len(k) >= 2]
+    best_pos = -1
+    for kw in keywords:
+        pos = text.lower().find(kw.lower())
+        if pos >= 0:
+            best_pos = pos
+            break
+
+    if best_pos < 0:
+        # Fallback: first target_chars characters
+        return text[:target_chars]
+
+    half = target_chars // 2
+    start = max(0, best_pos - half)
+    end = min(len(text), start + target_chars)
+    return text[start:end]
+
+
+def _make_snippet(c: "SearchResult", query: str, max_chars: int = 1500) -> str:
+    """Reranker input snippet — title plus a query-centered body window.
+
+    Enforces item 3 of feedback_search_phase1_implementation.md:
+    snippets of 200–400 tokens (800–1500 chars); never the full document.
+    """
+    title = c.title or ""
+    text = c.snippet or ""
+
+    # snippet is the first 200 chars of the chunk text or of the doc text;
+    # if a longer chunk text is needed, the caller fills it in separately
+    if len(text) > max_chars:
+        text = _extract_window(text, query, target_chars=max_chars - 100)
+
+    return f"{title}\n\n{text}"
+
+
+def _wrap_doc_as_chunk(doc: "SearchResult") -> "SearchResult":
+    """Convert a text-only matched doc (absent from chunks_by_doc) to ChunkResult form.
+
+    For the case where the doc itself must enter the Phase 1.3 reranker input.
+    The snippet is the first 200 chars of documents.extracted_text (already
+    filled into SearchResult.snippet). chunk_id etc. remain None.
+    """
+    return doc
+
+
+async def rerank_chunks(
+    query: str,
+    candidates: list["SearchResult"],
+    limit: int,
+) -> list["SearchResult"]:
+    """Re-rank the RRF candidates with bge-reranker.
+
+    Args:
+        query: user query
+        candidates: chunk-level SearchResult list (already recovered from chunks_by_doc)
+        limit: number of results to return
+
+    Returns:
+        Reranked SearchResult list (score field updated with the rerank
+        score); see the illustrative mapping below.
+
+    Fallback (timeout/HTTPError): return candidates[:limit] in RRF order.
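+
+    Illustrative mapping (hypothetical scores):
+
+        candidates = [c0, c1, c2]
+        # TEI response: [{"index": 2, "score": 0.91}, {"index": 0, "score": 0.42}]
+        # → returns [c2 (score=0.91), c0 (score=0.42)],
+        #   each with match_reason suffixed "+rerank"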
+ """ + if not candidates: + return [] + + # input 크기 제한 (latency/VRAM hard cap) + if len(candidates) > MAX_RERANK_INPUT: + logger.warning( + f"rerank input {len(candidates)} > MAX {MAX_RERANK_INPUT}, 자름" + ) + candidates = candidates[:MAX_RERANK_INPUT] + + snippets = [_make_snippet(c, query) for c in candidates] + client = AIClient() + + try: + async with asyncio.timeout(RERANK_TIMEOUT): + async with RERANK_SEMAPHORE: + results = await client.rerank(query, snippets) + # results: [{"index": int, "score": float}, ...] (이미 정렬됨) + reranked: list["SearchResult"] = [] + for r in results: + idx = r.get("index") + sc = r.get("score") + if idx is None or sc is None or idx >= len(candidates): + continue + chunk = candidates[idx] + chunk.score = float(sc) + chunk.match_reason = (chunk.match_reason or "") + "+rerank" + reranked.append(chunk) + return reranked[:limit] + except (asyncio.TimeoutError, httpx.HTTPError) as e: + logger.warning(f"rerank failed → RRF fallback: {type(e).__name__}: {e}") + return candidates[:limit] + except Exception as e: + logger.warning(f"rerank unexpected error → RRF fallback: {type(e).__name__}: {e}") + return candidates[:limit] + finally: + await client.close() + + +async def warmup_reranker() -> bool: + """TEI 부팅 후 모델 로딩 완료 대기 (10회 retry). + + TEI는 health 200을 빠르게 반환하지만 첫 모델 로딩(10~30초) 전에는 + rerank 요청이 실패하거나 매우 느림. FastAPI startup 또는 첫 요청 전 호출. + """ + client = AIClient() + try: + for attempt in range(10): + try: + await client.rerank("warmup", ["dummy text for model load"]) + logger.info(f"reranker warmup OK (attempt {attempt + 1})") + return True + except Exception as e: + logger.info(f"reranker warmup retry {attempt + 1}: {e}") + await asyncio.sleep(3) + logger.error("reranker warmup failed after 10 attempts") + return False + finally: + await client.close() + + +def apply_diversity( + results: list["SearchResult"], + max_per_doc: int = MAX_CHUNKS_PER_DOC, + top_score_threshold: float = 0.90, +) -> list["SearchResult"]: + """chunk-level 결과를 doc 기준으로 압축 (max_per_doc). + + 조건부 완화: 가장 상위 결과 score가 threshold 이상이면 unlimited + (high confidence relevance > diversity). + """ + if not results: + return [] + + # 가장 상위 score가 threshold 이상이면 diversity 제약 해제 + top_score = results[0].score if results else 0.0 + if top_score >= top_score_threshold: + return results + + seen: dict[int, int] = {} + out: list["SearchResult"] = [] + for r in results: + doc_id = r.id + if seen.get(doc_id, 0) >= max_per_doc: + continue + out.append(r) + seen[doc_id] = seen.get(doc_id, 0) + 1 + return out diff --git a/app/services/search_telemetry.py b/app/services/search_telemetry.py index eb2dbb5..2bb82d1 100644 --- a/app/services/search_telemetry.py +++ b/app/services/search_telemetry.py @@ -149,6 +149,33 @@ def _cosine_to_confidence(cosine: float) -> float: return 0.10 +def compute_confidence_reranked(reranked_results: list[Any]) -> float: + """Phase 1.3 reranker score 기반 confidence. + + bge-reranker-v2-m3는 sigmoid score (0~1 범위)를 반환. + rerank 활성 시 fusion score보다 reranker score가 가장 신뢰할 수 있는 신호. 
+    """
+    if not reranked_results:
+        return 0.0
+    top_score = float(getattr(reranked_results[0], "score", 0.0) or 0.0)
+    if top_score >= 0.95:
+        return 0.95
+    if top_score >= 0.80:
+        return 0.80
+    if top_score >= 0.60:
+        return 0.65
+    if top_score >= 0.40:
+        return 0.50
+    return 0.35
+
+
 def compute_confidence_hybrid(
     text_results: list[Any],
     vector_results: list[Any],
diff --git a/tests/search_eval/run_eval.py b/tests/search_eval/run_eval.py
index ac3fa99..a7ac06f 100644
--- a/tests/search_eval/run_eval.py
+++ b/tests/search_eval/run_eval.py
@@ -133,6 +133,7 @@ async def call_search(
     mode: str = "hybrid",
     limit: int = 20,
     fusion: str | None = None,
+    rerank: str | None = None,
 ) -> tuple[list[int], float]:
     """Call the search API → (doc_ids, latency_ms)."""
     url = f"{base_url.rstrip('/')}/api/search/"
@@ -140,6 +141,8 @@
     params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit}
     if fusion:
         params["fusion"] = fusion
+    if rerank is not None:
+        params["rerank"] = rerank
 
     import time
 
@@ -165,6 +168,7 @@ async def evaluate(
     label: str,
     mode: str = "hybrid",
     fusion: str | None = None,
+    rerank: str | None = None,
 ) -> list[QueryResult]:
     """Evaluate the full query set."""
     results: list[QueryResult] = []
@@ -173,7 +177,7 @@
     for q in queries:
         try:
             returned_ids, latency_ms = await call_search(
-                client, base_url, token, q.query, mode=mode, fusion=fusion
+                client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank
             )
             results.append(
                 QueryResult(
@@ -404,6 +408,13 @@ def main() -> int:
         choices=["legacy", "rrf", "rrf_boost"],
         help="Fusion strategy for hybrid mode (Phase 0.5+; server default when omitted)",
    )
+    parser.add_argument(
+        "--rerank",
+        type=str,
+        default=None,
+        choices=["true", "false"],
+        help="Enable bge-reranker-v2-m3 (Phase 1.3+; server default=true when omitted)",
+    )
     parser.add_argument(
         "--token",
         type=str,
@@ -434,6 +445,8 @@
     print(f"Mode: {args.mode}", end="")
     if args.fusion:
         print(f" / fusion: {args.fusion}", end="")
+    if args.rerank:
+        print(f" / rerank: {args.rerank}", end="")
     print()
 
     all_results: list[QueryResult] = []
@@ -441,21 +454,21 @@
     if args.base_url:
         print(f"\n>>> evaluating: {args.base_url}")
         results = asyncio.run(
-            evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion)
+            evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
         )
         print_summary("single", results)
         all_results.extend(results)
     else:
         print(f"\n>>> baseline: {args.baseline_url}")
         baseline_results = asyncio.run(
-            evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion)
+            evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
         )
         baseline_summary = print_summary("baseline", baseline_results)
 
         print(f"\n>>> candidate: {args.candidate_url}")
         candidate_results = asyncio.run(
             evaluate(
-                queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion
+                queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank
             )
         )
         candidate_summary = print_summary("candidate", candidate_results)