기존 weighted-sum merge를 Reciprocal Rank Fusion으로 교체. 정확 키워드 매치에서 RRF가 평탄화되는 문제는 boost로 보완. 신규 모듈 app/services/search_fusion.py: - FusionStrategy ABC - LegacyWeightedSum : 기존 _merge_results 동작 (A/B 비교용) - RRFOnly : 순수 RRF, k=60 - RRFWithBoost : RRF + title/tags/법령조문/high-text-score boost (default) - normalize_display_scores: SearchResult.score를 [0..1] 랭크 기반 정규화 (프론트엔드가 score*100을 % 표시하므로 RRF 원본 점수 노출 시 표시 깨짐) search.py: - ?fusion=legacy|rrf|rrf_boost 파라미터 (default rrf_boost) - _merge_results 제거 (LegacyWeightedSum에 흡수) - pre-fusion confidence: hybrid는 raw text/vector 신호로 계산 (fused score는 fusion 전략마다 스케일이 달라 일관 비교 불가) - timing에 fusion_ms 추가 - debug notes에 fusion 전략 표시 telemetry: - compute_confidence_hybrid(text_results, vector_results) 헬퍼 - record_search_event에 confidence override 파라미터 run_eval.py: - --fusion CLI 옵션, call_search 쿼리 파라미터에 전달 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
251 lines
9.7 KiB
Python
251 lines
9.7 KiB
Python
"""하이브리드 검색 API — FTS + ILIKE + 벡터 (필드별 가중치)"""
|
|
|
|
import time
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from ai.client import AIClient
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from core.utils import setup_logger
|
|
from models.user import User
|
|
from services.search_fusion import DEFAULT_FUSION, get_strategy, normalize_display_scores
|
|
from services.search_telemetry import (
|
|
compute_confidence,
|
|
compute_confidence_hybrid,
|
|
record_search_event,
|
|
)
|
|
|
|
# Mirror log output to logs/search.log and stdout (Phase 0.4)
logger = setup_logger("search")

router = APIRouter()
|
|
|
|
|
|
class SearchResult(BaseModel):
    """One search hit as returned to the client.

    Internal retrieval scores are rewritten in place by
    ``normalize_display_scores`` before the response is built, because the
    frontend renders ``score * 100`` as a percentage.
    """

    id: int
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    # Retrieval score during fusion; rank-normalized to [0..1] for display.
    score: float
    # First 200 chars of extracted_text (left(extracted_text, 200) in SQL).
    snippet: str | None
    # Highest-priority matching field: 'title'/'tags'/'note'/'summary'/
    # 'content'/'fts' from text search, or 'vector' from vector search.
    match_reason: str | None = None
|
|
|
|
|
|
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
|
|
|
|
|
|
class DebugCandidate(BaseModel):
    """Per-stage candidate (exposed only in debug=true responses)."""
    id: int
    # 1-based position within the stage's candidate list.
    rank: int
    score: float
    match_reason: str | None = None
|
|
|
|
|
|
class SearchDebug(BaseModel):
    """Stage-by-stage diagnostics attached to the response when debug=true."""
    # Wall-clock milliseconds per stage: text_ms / vector_ms / fusion_ms / total_ms.
    timing_ms: dict[str, float]
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    # Post-fusion candidates; populated only in hybrid mode.
    fused_candidates: list[DebugCandidate] | None = None
    confidence: float
    notes: list[str] = []
    # Placeholders to be filled once Phase 1/2 land.
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None
|
|
|
|
|
|
class SearchResponse(BaseModel):
    """Top-level search response envelope."""
    results: list[SearchResult]
    # Count of returned results (not a total-matches estimate).
    total: int
    query: str
    mode: str
    # Present only when the request was made with debug=true.
    debug: SearchDebug | None = None
|
|
|
|
|
|
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Project the top-n results into DebugCandidate entries with 1-based ranks."""
    candidates: list[DebugCandidate] = []
    for rank, row in enumerate(rows[:n], start=1):
        candidates.append(
            DebugCandidate(
                id=row.id,
                rank=rank,
                score=row.score,
                match_reason=row.match_reason,
            )
        )
    return candidates
|
|
|
|
|
|
@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    fusion: str = Query(
        DEFAULT_FUSION,
        pattern="^(legacy|rrf|rrf_boost)$",
        description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
    ),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
    """Document search — FTS + ILIKE + vector combined (Phase 0.5: RRF fusion).

    Modes: fts/trgm → text only; vector → embeddings only; hybrid → both,
    merged by the selected fusion strategy. Vector failures degrade to
    text-only (empty vector list) rather than erroring the request.
    """
    timing: dict[str, float] = {}
    notes: list[str] = []
    text_results: list[SearchResult] = []
    vector_results: list[SearchResult] = []

    t_total = time.perf_counter()

    if mode == "vector":
        t0 = time.perf_counter()
        vector_results = await _search_vector(session, q, limit)
        timing["vector_ms"] = (time.perf_counter() - t0) * 1000
        if not vector_results:
            notes.append("vector_search_returned_empty (AI client error or no embeddings)")
        results = vector_results
    else:
        t0 = time.perf_counter()
        text_results = await _search_text(session, q, limit)
        timing["text_ms"] = (time.perf_counter() - t0) * 1000

        if mode == "hybrid":
            t1 = time.perf_counter()
            vector_results = await _search_vector(session, q, limit)
            timing["vector_ms"] = (time.perf_counter() - t1) * 1000
            if not vector_results:
                notes.append("vector_search_returned_empty — text-only fallback")

            t2 = time.perf_counter()
            strategy = get_strategy(fusion)
            results = strategy.fuse(text_results, vector_results, q, limit)
            timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
            notes.append(f"fusion={strategy.name}")
        else:
            results = text_results

    # Confidence is computed from the RAW retrieval signals, and therefore
    # MUST run before normalize_display_scores() rewrites SearchResult.score
    # in place. (Bug fix: normalization previously ran first, so in
    # fts/trgm/vector modes — where `results` IS the raw result list — and in
    # hybrid whenever the fusion strategy reuses input objects, confidence was
    # computed on rank-normalized display scores, defeating the stated
    # "pre-fusion raw signal" intent.)
    if mode == "hybrid":
        confidence_signal = compute_confidence_hybrid(text_results, vector_results)
    elif mode == "vector":
        confidence_signal = compute_confidence(vector_results, "vector")
    else:
        confidence_signal = compute_confidence(text_results, mode)

    # Display-score normalization — the frontend shows score*100 as a percent,
    # and raw fusion scores (RRF sits around 0.01–0.05) would render garbage.
    # Mutates the SearchResult objects in place.
    normalize_display_scores(results)

    timing["total_ms"] = (time.perf_counter() - t_total) * 1000

    # Per user feedback: always log per-stage timing, independent of debug mode.
    timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
    fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
    logger.info(
        "search query=%r mode=%s%s results=%d conf=%.2f %s",
        q[:80], mode, fusion_str, len(results), confidence_signal, timing_str,
    )

    # Phase 0.3: automatic failure logging (background task — no added latency).
    background_tasks.add_task(
        record_search_event, q, user.id, results, mode, confidence_signal
    )

    debug_obj: SearchDebug | None = None
    if debug:
        debug_obj = SearchDebug(
            timing_ms=timing,
            text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
            vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
            fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
            confidence=confidence_signal,
            notes=notes,
        )

    return SearchResponse(
        results=results,
        total=len(results),
        query=q,
        mode=mode,
        debug=debug_obj,
    )
|
|
|
|
|
|
async def _search_text(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """FTS + ILIKE text search with per-field weighting.

    Score is the sum of fixed weights for substring (ILIKE) hits per field —
    title 3.0 > ai_tags 2.5 > user_note 2.0 > ai_summary 1.5 >
    extracted_text 1.0 — plus ts_rank over title+body ('simple' config) * 2.0
    as an FTS bonus. match_reason reports only the highest-priority matching
    field, falling back to 'fts' when only the tsquery matched.

    NOTE(review): the '%%' escapes become a literal '%' under pyformat drivers
    (psycopg) but stay '%%' under asyncpg; either way the resulting ILIKE
    pattern still wildcard-matches, but confirm against the configured driver.
    """
    result = await session.execute(
        text("""
        SELECT id, title, ai_domain, ai_summary, file_format,
               left(extracted_text, 200) AS snippet,
               (
                   -- title 매칭 (가중치 최고)
                   CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 3.0 ELSE 0 END
                   -- ai_tags 매칭 (가중치 높음)
                   + CASE WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 2.5 ELSE 0 END
                   -- user_note 매칭 (가중치 높음)
                   + CASE WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 2.0 ELSE 0 END
                   -- ai_summary 매칭 (가중치 중상)
                   + CASE WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 1.5 ELSE 0 END
                   -- extracted_text 매칭 (가중치 중간)
                   + CASE WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 1.0 ELSE 0 END
                   -- FTS 점수 (보너스)
                   + coalesce(ts_rank(
                       to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')),
                       plainto_tsquery('simple', :q)
                   ), 0) * 2.0
               ) AS score,
               -- match reason
               CASE
                   WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 'title'
                   WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 'tags'
                   WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 'note'
                   WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 'summary'
                   WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 'content'
                   ELSE 'fts'
               END AS match_reason
        FROM documents
        WHERE deleted_at IS NULL
          AND (coalesce(title, '') ILIKE '%%' || :q || '%%'
               OR coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%'
               OR coalesce(user_note, '') ILIKE '%%' || :q || '%%'
               OR coalesce(ai_summary, '') ILIKE '%%' || :q || '%%'
               OR coalesce(extracted_text, '') ILIKE '%%' || :q || '%%'
               OR to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, ''))
                  @@ plainto_tsquery('simple', :q))
        ORDER BY score DESC
        LIMIT :limit
        """),
        {"q": query, "limit": limit},
    )
    # Column aliases map 1:1 onto SearchResult fields.
    return [SearchResult(**row._mapping) for row in result]
|
|
|
|
|
|
async def _search_vector(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Vector similarity search (cosine distance over pgvector embeddings).

    Returns [] when the embedding call fails — callers treat that as
    "vector unavailable" and fall back to text-only results. The failure is
    now logged instead of silently swallowed.
    """
    client: AIClient | None = None
    try:
        client = AIClient()
        query_embedding = await client.embed(query)
    except Exception:
        # Best-effort by design: empty list triggers the text-only fallback
        # upstream. Log it so AI-client outages are visible in logs instead of
        # silently degrading search quality.
        logger.exception("vector search: embedding request failed — returning []")
        return []
    finally:
        # Bug fix: close() was previously skipped when embed() raised, leaking
        # the client's underlying connection on every failed request.
        if client is not None:
            try:
                await client.close()
            except Exception:
                logger.warning("vector search: AIClient.close() failed", exc_info=True)

    result = await session.execute(
        text("""
        SELECT id, title, ai_domain, ai_summary, file_format,
               (1 - (embedding <=> cast(:embedding AS vector))) AS score,
               left(extracted_text, 200) AS snippet,
               'vector' AS match_reason
        FROM documents
        WHERE embedding IS NOT NULL AND deleted_at IS NULL
        ORDER BY embedding <=> cast(:embedding AS vector)
        LIMIT :limit
        """),
        {"embedding": str(query_embedding), "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]
|
|
|
|
|