Files
hyungi_document_server/app/api/search.py
Hyungi Ahn a4eb71d368 feat(search): Phase 1.1a 모듈 분리 — services/search/ 디렉토리
검색 로직을 services/search/* 모듈로 분리. trigram 도입은 Phase 1.2 인덱스와 함께.

신규:
- services/search/{__init__,retrieval_service,rerank_service,query_analyzer,evidence_service,synthesis_service}.py
- retrieval_service는 search_text/search_vector 이전 (ILIKE 동작 그대로)
- 나머지는 Phase 1.3/2/3 placeholder

이동:
- services/search_fusion.py → services/search/fusion_service.py (R100)

수정:
- api/search.py — thin orchestrator로 축소 (251줄 → 178줄)

동작 변경 없음 — 구조만 분리. 회귀 검증 후 Phase 1.2 진입.
2026-04-07 13:46:04 +09:00

178 lines
6.2 KiB
Python

"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint).
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리.
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당.
"""
import time
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
from services.search.retrieval_service import search_text, search_vector
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
record_search_event,
)
# logs/search.log + stdout 동시 출력 (Phase 0.4)
logger = setup_logger("search")
router = APIRouter()
class SearchResult(BaseModel):
id: int
title: str | None
ai_domain: str | None
ai_summary: str | None
file_format: str
score: float
snippet: str | None
match_reason: str | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
class DebugCandidate(BaseModel):
"""단계별 후보 (debug=true 응답에서만 노출)."""
id: int
rank: int
score: float
match_reason: str | None = None
class SearchDebug(BaseModel):
timing_ms: dict[str, float]
text_candidates: list[DebugCandidate] | None = None
vector_candidates: list[DebugCandidate] | None = None
fused_candidates: list[DebugCandidate] | None = None
confidence: float
notes: list[str] = []
# Phase 1/2 도입 후 채워질 placeholder
query_analysis: dict | None = None
reranker_scores: list[DebugCandidate] | None = None
class SearchResponse(BaseModel):
results: list[SearchResult]
total: int
query: str
mode: str
debug: SearchDebug | None = None
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
return [
DebugCandidate(
id=r.id, rank=i + 1, score=r.score, match_reason=r.match_reason
)
for i, r in enumerate(rows[:n])
]
@router.get("/", response_model=SearchResponse)
async def search(
q: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
limit: int = Query(20, ge=1, le=100),
fusion: str = Query(
DEFAULT_FUSION,
pattern="^(legacy|rrf|rrf_boost)$",
description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
),
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)"""
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list[SearchResult] = []
vector_results: list[SearchResult] = []
t_total = time.perf_counter()
if mode == "vector":
t0 = time.perf_counter()
vector_results = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not vector_results:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
vector_results = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
results = strategy.fuse(text_results, vector_results, q, limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
if mode == "hybrid":
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
logger.info(
"search query=%r mode=%s%s results=%d conf=%.2f %s",
q[:80], mode, fusion_str, len(results), confidence_signal, timing_str,
)
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
background_tasks.add_task(
record_search_event, q, user.id, results, mode, confidence_signal
)
debug_obj: SearchDebug | None = None
if debug:
debug_obj = SearchDebug(
timing_ms=timing,
text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
confidence=confidence_signal,
notes=notes,
)
return SearchResponse(
results=results,
total=len(results),
query=q,
mode=mode,
debug=debug_obj,
)