Files
hyungi_document_server/app/api/search.py
Hyungi Ahn 161ff18a31 feat(search): Phase 0.5 RRF fusion + 강한 신호 boost
기존 weighted-sum merge를 Reciprocal Rank Fusion으로 교체.
정확 키워드 매치에서 RRF가 평탄화되는 문제는 boost로 보완.

신규 모듈 app/services/search_fusion.py:
- FusionStrategy ABC
- LegacyWeightedSum  : 기존 _merge_results 동작 (A/B 비교용)
- RRFOnly            : 순수 RRF, k=60
- RRFWithBoost       : RRF + title/tags/법령조문/high-text-score boost (default)
- normalize_display_scores: SearchResult.score를 [0..1] 랭크 기반 정규화
  (프론트엔드가 score*100을 % 표시하므로 RRF 원본 점수 노출 시 표시 깨짐)

search.py:
- ?fusion=legacy|rrf|rrf_boost 파라미터 (default rrf_boost)
- _merge_results 제거 (LegacyWeightedSum에 흡수)
- pre-fusion confidence: hybrid는 raw text/vector 신호로 계산
  (fused score는 fusion 전략마다 스케일이 달라 일관 비교 불가)
- timing에 fusion_ms 추가
- debug notes에 fusion 전략 표시

telemetry:
- compute_confidence_hybrid(text_results, vector_results) 헬퍼
- record_search_event에 confidence override 파라미터

run_eval.py:
- --fusion CLI 옵션, call_search 쿼리 파라미터에 전달

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 08:58:33 +09:00

251 lines
9.7 KiB
Python

"""하이브리드 검색 API — FTS + ILIKE + 벡터 (필드별 가중치)"""
import time
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search_fusion import DEFAULT_FUSION, get_strategy, normalize_display_scores
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
record_search_event,
)
# logs/search.log + stdout 동시 출력 (Phase 0.4)
logger = setup_logger("search")
router = APIRouter()
class SearchResult(BaseModel):
id: int
title: str | None
ai_domain: str | None
ai_summary: str | None
file_format: str
score: float
snippet: str | None
match_reason: str | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
class DebugCandidate(BaseModel):
"""단계별 후보 (debug=true 응답에서만 노출)."""
id: int
rank: int
score: float
match_reason: str | None = None
class SearchDebug(BaseModel):
timing_ms: dict[str, float]
text_candidates: list[DebugCandidate] | None = None
vector_candidates: list[DebugCandidate] | None = None
fused_candidates: list[DebugCandidate] | None = None
confidence: float
notes: list[str] = []
# Phase 1/2 도입 후 채워질 placeholder
query_analysis: dict | None = None
reranker_scores: list[DebugCandidate] | None = None
class SearchResponse(BaseModel):
results: list[SearchResult]
total: int
query: str
mode: str
debug: SearchDebug | None = None
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
return [
DebugCandidate(
id=r.id, rank=i + 1, score=r.score, match_reason=r.match_reason
)
for i, r in enumerate(rows[:n])
]
@router.get("/", response_model=SearchResponse)
async def search(
q: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
limit: int = Query(20, ge=1, le=100),
fusion: str = Query(
DEFAULT_FUSION,
pattern="^(legacy|rrf|rrf_boost)$",
description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
),
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)"""
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list[SearchResult] = []
vector_results: list[SearchResult] = []
t_total = time.perf_counter()
if mode == "vector":
t0 = time.perf_counter()
vector_results = await _search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not vector_results:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
results = vector_results
else:
t0 = time.perf_counter()
text_results = await _search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
vector_results = await _search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
results = strategy.fuse(text_results, vector_results, q, limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
if mode == "hybrid":
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
logger.info(
"search query=%r mode=%s%s results=%d conf=%.2f %s",
q[:80], mode, fusion_str, len(results), confidence_signal, timing_str,
)
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
background_tasks.add_task(
record_search_event, q, user.id, results, mode, confidence_signal
)
debug_obj: SearchDebug | None = None
if debug:
debug_obj = SearchDebug(
timing_ms=timing,
text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
confidence=confidence_signal,
notes=notes,
)
return SearchResponse(
results=results,
total=len(results),
query=q,
mode=mode,
debug=debug_obj,
)
async def _search_text(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
"""FTS + ILIKE — 필드별 가중치 적용"""
result = await session.execute(
text("""
SELECT id, title, ai_domain, ai_summary, file_format,
left(extracted_text, 200) AS snippet,
(
-- title 매칭 (가중치 최고)
CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 3.0 ELSE 0 END
-- ai_tags 매칭 (가중치 높음)
+ CASE WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 2.5 ELSE 0 END
-- user_note 매칭 (가중치 높음)
+ CASE WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 2.0 ELSE 0 END
-- ai_summary 매칭 (가중치 중상)
+ CASE WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 1.5 ELSE 0 END
-- extracted_text 매칭 (가중치 중간)
+ CASE WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 1.0 ELSE 0 END
-- FTS 점수 (보너스)
+ coalesce(ts_rank(
to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')),
plainto_tsquery('simple', :q)
), 0) * 2.0
) AS score,
-- match reason
CASE
WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 'title'
WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 'tags'
WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 'note'
WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 'summary'
WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 'content'
ELSE 'fts'
END AS match_reason
FROM documents
WHERE deleted_at IS NULL
AND (coalesce(title, '') ILIKE '%%' || :q || '%%'
OR coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%'
OR coalesce(user_note, '') ILIKE '%%' || :q || '%%'
OR coalesce(ai_summary, '') ILIKE '%%' || :q || '%%'
OR coalesce(extracted_text, '') ILIKE '%%' || :q || '%%'
OR to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, ''))
@@ plainto_tsquery('simple', :q))
ORDER BY score DESC
LIMIT :limit
"""),
{"q": query, "limit": limit},
)
return [SearchResult(**row._mapping) for row in result]
async def _search_vector(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
"""벡터 유사도 검색 (코사인 거리)"""
try:
client = AIClient()
query_embedding = await client.embed(query)
await client.close()
except Exception:
return []
result = await session.execute(
text("""
SELECT id, title, ai_domain, ai_summary, file_format,
(1 - (embedding <=> cast(:embedding AS vector))) AS score,
left(extracted_text, 200) AS snippet,
'vector' AS match_reason
FROM documents
WHERE embedding IS NOT NULL AND deleted_at IS NULL
ORDER BY embedding <=> cast(:embedding AS vector)
LIMIT :limit
"""),
{"embedding": str(query_embedding), "limit": limit},
)
return [SearchResult(**row._mapping) for row in result]