"""하이브리드 검색 API — FTS + ILIKE + 벡터 (필드별 가중치)""" import time from typing import Annotated from fastapi import APIRouter, BackgroundTasks, Depends, Query from pydantic import BaseModel from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient from core.auth import get_current_user from core.database import get_session from core.utils import setup_logger from models.user import User from services.search_telemetry import compute_confidence, record_search_event # logs/search.log + stdout 동시 출력 (Phase 0.4) logger = setup_logger("search") router = APIRouter() class SearchResult(BaseModel): id: int title: str | None ai_domain: str | None ai_summary: str | None file_format: str score: float snippet: str | None match_reason: str | None = None # ─── Phase 0.4: 디버그 응답 스키마 ───────────────────────── class DebugCandidate(BaseModel): """단계별 후보 (debug=true 응답에서만 노출).""" id: int rank: int score: float match_reason: str | None = None class SearchDebug(BaseModel): timing_ms: dict[str, float] text_candidates: list[DebugCandidate] | None = None vector_candidates: list[DebugCandidate] | None = None fused_candidates: list[DebugCandidate] | None = None confidence: float notes: list[str] = [] # Phase 1/2 도입 후 채워질 placeholder query_analysis: dict | None = None reranker_scores: list[DebugCandidate] | None = None class SearchResponse(BaseModel): results: list[SearchResult] total: int query: str mode: str debug: SearchDebug | None = None def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]: return [ DebugCandidate( id=r.id, rank=i + 1, score=r.score, match_reason=r.match_reason ) for i, r in enumerate(rows[:n]) ] @router.get("/", response_model=SearchResponse) async def search( q: str, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], background_tasks: BackgroundTasks, mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"), limit: int = Query(20, ge=1, le=100), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), ): """문서 검색 — FTS + ILIKE + 벡터 결합""" timing: dict[str, float] = {} notes: list[str] = [] text_results: list[SearchResult] = [] vector_results: list[SearchResult] = [] t_total = time.perf_counter() if mode == "vector": t0 = time.perf_counter() vector_results = await _search_vector(session, q, limit) timing["vector_ms"] = (time.perf_counter() - t0) * 1000 if not vector_results: notes.append("vector_search_returned_empty (AI client error or no embeddings)") results = vector_results else: t0 = time.perf_counter() text_results = await _search_text(session, q, limit) timing["text_ms"] = (time.perf_counter() - t0) * 1000 if mode == "hybrid": t1 = time.perf_counter() vector_results = await _search_vector(session, q, limit) timing["vector_ms"] = (time.perf_counter() - t1) * 1000 if not vector_results: notes.append("vector_search_returned_empty — text-only fallback") t2 = time.perf_counter() results = _merge_results(text_results, vector_results, limit) timing["merge_ms"] = (time.perf_counter() - t2) * 1000 else: results = text_results timing["total_ms"] = (time.perf_counter() - t_total) * 1000 # 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다 timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items()) logger.info( "search query=%r mode=%s results=%d %s", q[:80], mode, len(results), timing_str, ) # Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task) background_tasks.add_task(record_search_event, q, user.id, results, mode) debug_obj: SearchDebug | None = None if debug: debug_obj = SearchDebug( timing_ms=timing, text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None, vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None, fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None, confidence=compute_confidence(results, mode), notes=notes, ) return SearchResponse( results=results, total=len(results), query=q, mode=mode, debug=debug_obj, ) async def _search_text(session: AsyncSession, query: str, limit: int) -> list[SearchResult]: """FTS + ILIKE — 필드별 가중치 적용""" result = await session.execute( text(""" SELECT id, title, ai_domain, ai_summary, file_format, left(extracted_text, 200) AS snippet, ( -- title 매칭 (가중치 최고) CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 3.0 ELSE 0 END -- ai_tags 매칭 (가중치 높음) + CASE WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 2.5 ELSE 0 END -- user_note 매칭 (가중치 높음) + CASE WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 2.0 ELSE 0 END -- ai_summary 매칭 (가중치 중상) + CASE WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 1.5 ELSE 0 END -- extracted_text 매칭 (가중치 중간) + CASE WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 1.0 ELSE 0 END -- FTS 점수 (보너스) + coalesce(ts_rank( to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')), plainto_tsquery('simple', :q) ), 0) * 2.0 ) AS score, -- match reason CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 'title' WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 'tags' WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 'note' WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 'summary' WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 'content' ELSE 'fts' END AS match_reason FROM documents WHERE deleted_at IS NULL AND (coalesce(title, '') ILIKE '%%' || :q || '%%' OR coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' OR coalesce(user_note, '') ILIKE '%%' || :q || '%%' OR coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' OR coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' OR to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')) @@ plainto_tsquery('simple', :q)) ORDER BY score DESC LIMIT :limit """), {"q": query, "limit": limit}, ) return [SearchResult(**row._mapping) for row in result] async def _search_vector(session: AsyncSession, query: str, limit: int) -> list[SearchResult]: """벡터 유사도 검색 (코사인 거리)""" try: client = AIClient() query_embedding = await client.embed(query) await client.close() except Exception: return [] result = await session.execute( text(""" SELECT id, title, ai_domain, ai_summary, file_format, (1 - (embedding <=> cast(:embedding AS vector))) AS score, left(extracted_text, 200) AS snippet, 'vector' AS match_reason FROM documents WHERE embedding IS NOT NULL AND deleted_at IS NULL ORDER BY embedding <=> cast(:embedding AS vector) LIMIT :limit """), {"embedding": str(query_embedding), "limit": limit}, ) return [SearchResult(**row._mapping) for row in result] def _merge_results( text_results: list[SearchResult], vector_results: list[SearchResult], limit: int, ) -> list[SearchResult]: """텍스트 + 벡터 결과 합산 (중복 제거, 점수 합산)""" merged: dict[int, SearchResult] = {} for r in text_results: merged[r.id] = r for r in vector_results: if r.id in merged: # 이미 텍스트로 잡힌 문서 — 벡터 점수 가산 existing = merged[r.id] merged[r.id] = SearchResult( id=existing.id, title=existing.title, ai_domain=existing.ai_domain, ai_summary=existing.ai_summary, file_format=existing.file_format, score=existing.score + r.score * 0.5, snippet=existing.snippet, match_reason=f"{existing.match_reason}+vector", ) elif r.score > 0.3: # 벡터 유사도 최소 threshold merged[r.id] = r results = sorted(merged.values(), key=lambda x: x.score, reverse=True) return results[:limit]