Files
hyungi_document_server/app/api/search.py
Hyungi Ahn 473e7e2e6d feat(search): Phase 0.4 debug 응답 옵션 + timing 로그
?debug=true로 호출 시 단계별 candidates + timing을 응답에 포함.
디버그 옵션과 별개로 모든 검색에 timing 라인을 구조화 로그로 출력
(사용자 feedback: 운영 관찰엔 debug 응답만으론 부족).

신규 응답 필드 (debug=true 시):
- timing_ms: text_ms / vector_ms / merge_ms / total_ms
- text_candidates / vector_candidates / fused_candidates (top 20)
- confidence (telemetry와 동일 휴리스틱)
- notes (예: vector 검색 실패 시 fallback 표시)
- query_analysis / reranker_scores: Phase 1/2용 placeholder

기본 응답(debug=false)은 변화 없음 (results, total, query, mode).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 08:41:33 +09:00

253 lines
9.5 KiB
Python

"""하이브리드 검색 API — FTS + ILIKE + 벡터 (필드별 가중치)"""
import logging
import time
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.auth import get_current_user
from core.database import get_session
from models.user import User
from services.search_telemetry import compute_confidence, record_search_event
logger = logging.getLogger("search")
router = APIRouter()
class SearchResult(BaseModel):
    """Single search hit returned to the client."""

    id: int  # document primary key
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    # combined relevance score: weighted ILIKE/FTS score and/or vector similarity,
    # summed in _merge_results for hybrid mode
    score: float
    snippet: str | None  # first 200 chars of extracted_text (see the SQL below)
    # which field produced the match: 'title'/'tags'/'note'/'summary'/'content'/'fts'/'vector',
    # with '+vector' appended when a text hit is fused with a vector hit
    match_reason: str | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
class DebugCandidate(BaseModel):
    """Per-stage candidate (exposed only in the debug=true response)."""

    id: int  # document primary key
    rank: int  # 1-based position within its stage's candidate list
    score: float  # stage-local score (text weight sum, cosine similarity, or fused sum)
    match_reason: str | None = None
class SearchDebug(BaseModel):
    """Phase 0.4 debug payload: per-stage candidates, timings, confidence."""

    # wall-clock stage timings: text_ms / vector_ms / merge_ms / total_ms
    timing_ms: dict[str, float]
    # None when the corresponding stage never ran for the requested mode
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    fused_candidates: list[DebugCandidate] | None = None
    # same heuristic the telemetry pipeline uses (compute_confidence)
    confidence: float
    # e.g. fallback markers when vector search returns empty; the mutable
    # default is safe here — Pydantic copies field defaults per instance
    notes: list[str] = []
    # placeholders to be populated once Phase 1/2 land
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None
class SearchResponse(BaseModel):
    """Search endpoint response; `debug` stays None unless ?debug=true."""

    results: list[SearchResult]
    total: int  # == len(results)
    query: str  # echo of the requested query string
    mode: str  # echo of the requested mode (fts/trgm/vector/hybrid)
    debug: SearchDebug | None = None
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Project the top *n* results into debug candidates with 1-based ranks."""
    candidates: list[DebugCandidate] = []
    for position, row in enumerate(rows[:n], start=1):
        candidates.append(
            DebugCandidate(
                id=row.id,
                rank=position,
                score=row.score,
                match_reason=row.match_reason,
            )
        )
    return candidates
@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
    """Document search — FTS + ILIKE + vector, combined per *mode*.

    Modes: fts/trgm run text search only, vector runs embedding search only,
    hybrid runs both and fuses them via _merge_results. With debug=true the
    response additionally carries per-stage candidates, timings and a
    confidence score; the default response shape is unchanged.
    """
    timing: dict[str, float] = {}
    notes: list[str] = []
    text_results: list[SearchResult] = []
    vector_results: list[SearchResult] = []
    t_total = time.perf_counter()
    if mode == "vector":
        t0 = time.perf_counter()
        vector_results = await _search_vector(session, q, limit)
        timing["vector_ms"] = (time.perf_counter() - t0) * 1000
        if not vector_results:
            # _search_vector returns [] both on AI-client failure and on no matches
            notes.append("vector_search_returned_empty (AI client error or no embeddings)")
        results = vector_results
    else:
        t0 = time.perf_counter()
        text_results = await _search_text(session, q, limit)
        timing["text_ms"] = (time.perf_counter() - t0) * 1000
        if mode == "hybrid":
            t1 = time.perf_counter()
            vector_results = await _search_vector(session, q, limit)
            timing["vector_ms"] = (time.perf_counter() - t1) * 1000
            if not vector_results:
                # hybrid degrades gracefully to text-only when vector search is empty
                notes.append("vector_search_returned_empty — text-only fallback")
            t2 = time.perf_counter()
            results = _merge_results(text_results, vector_results, limit)
            timing["merge_ms"] = (time.perf_counter() - t2) * 1000
        else:
            # fts / trgm: text stage only
            results = text_results
    timing["total_ms"] = (time.perf_counter() - t_total) * 1000
    # User feedback: always emit per-stage timings as a structured log line,
    # independent of the debug response option (debug alone is not enough
    # for production observation).
    timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
    logger.info(
        "search query=%r mode=%s results=%d %s",
        q[:80], mode, len(results), timing_str,
    )
    # Phase 0.3: automatic failure logging — background task, so it does not
    # add to response latency
    background_tasks.add_task(record_search_event, q, user.id, results, mode)
    debug_obj: SearchDebug | None = None
    if debug:
        debug_obj = SearchDebug(
            timing_ms=timing,
            # a stage list is None only for modes where that stage never ran;
            # an empty list means the stage ran and found nothing
            text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
            vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
            fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
            confidence=compute_confidence(results, mode),
            notes=notes,
        )
    return SearchResponse(
        results=results,
        total=len(results),
        query=q,
        mode=mode,
        debug=debug_obj,
    )
async def _search_text(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """FTS + ILIKE text search with per-field weighting.

    Score = weighted substring hits (title 3.0 > tags 2.5 > note 2.0 >
    summary 1.5 > content 1.0) plus a 2x ts_rank FTS bonus; match_reason
    reports the highest-weight field that matched, or 'fts' when only the
    tsvector matched. Soft-deleted rows (deleted_at set) are excluded.
    """
    result = await session.execute(
        text("""
        SELECT id, title, ai_domain, ai_summary, file_format,
               left(extracted_text, 200) AS snippet,
               (
                 -- title 매칭 (가중치 최고)
                 CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 3.0 ELSE 0 END
                 -- ai_tags 매칭 (가중치 높음)
                 + CASE WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 2.5 ELSE 0 END
                 -- user_note 매칭 (가중치 높음)
                 + CASE WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 2.0 ELSE 0 END
                 -- ai_summary 매칭 (가중치 중상)
                 + CASE WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 1.5 ELSE 0 END
                 -- extracted_text 매칭 (가중치 중간)
                 + CASE WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 1.0 ELSE 0 END
                 -- FTS 점수 (보너스)
                 + coalesce(ts_rank(
                     to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')),
                     plainto_tsquery('simple', :q)
                   ), 0) * 2.0
               ) AS score,
               -- match reason
               CASE
                 WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 'title'
                 WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 'tags'
                 WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 'note'
                 WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 'summary'
                 WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 'content'
                 ELSE 'fts'
               END AS match_reason
        FROM documents
        WHERE deleted_at IS NULL
          AND (coalesce(title, '') ILIKE '%%' || :q || '%%'
               OR coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%'
               OR coalesce(user_note, '') ILIKE '%%' || :q || '%%'
               OR coalesce(ai_summary, '') ILIKE '%%' || :q || '%%'
               OR coalesce(extracted_text, '') ILIKE '%%' || :q || '%%'
               OR to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, ''))
                  @@ plainto_tsquery('simple', :q))
        ORDER BY score DESC
        LIMIT :limit
        """),
        {"q": query, "limit": limit},
    )
    # column aliases in the SELECT map 1:1 onto SearchResult fields
    return [SearchResult(**row._mapping) for row in result]
async def _search_vector(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Vector similarity search (cosine distance over pgvector embeddings).

    Embeds *query* via the AI client, then ranks documents by cosine
    distance. Returns [] when the AI client fails so callers can fall back
    to text-only results; the failure is logged instead of silently dropped.
    """
    try:
        client = AIClient()
        try:
            query_embedding = await client.embed(query)
        finally:
            # BUGFIX: previously close() was skipped when embed() raised,
            # leaking the client's connection; always release it.
            await client.close()
    except Exception:
        # Best-effort by design (callers treat [] as "vector stage empty"),
        # but log it — notes in the debug response can't explain *why*.
        logger.warning("vector search unavailable, returning empty result", exc_info=True)
        return []
    result = await session.execute(
        text("""
        SELECT id, title, ai_domain, ai_summary, file_format,
               (1 - (embedding <=> cast(:embedding AS vector))) AS score,
               left(extracted_text, 200) AS snippet,
               'vector' AS match_reason
        FROM documents
        WHERE embedding IS NOT NULL AND deleted_at IS NULL
        ORDER BY embedding <=> cast(:embedding AS vector)
        LIMIT :limit
        """),
        {"embedding": str(query_embedding), "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]
def _merge_results(
    text_results: list[SearchResult],
    vector_results: list[SearchResult],
    limit: int,
    *,
    vector_threshold: float = 0.3,
) -> list[SearchResult]:
    """Fuse text + vector results (dedupe by id, score summation).

    Documents found by both stages keep their text fields and gain the
    vector score at half weight; vector-only documents must clear
    *vector_threshold* to enter the fused set.

    Args:
        text_results: weighted FTS/ILIKE hits (may be empty on vector fallback).
        vector_results: cosine-similarity hits.
        limit: maximum number of fused results returned.
        vector_threshold: minimum similarity for vector-only candidates
            (generalized from the previously hard-coded 0.3; default unchanged).
    """
    merged: dict[int, SearchResult] = {r.id: r for r in text_results}
    for r in vector_results:
        existing = merged.get(r.id)
        if existing is not None:
            # Document already matched by text — add the vector score at half weight.
            # Guard against a None reason, which would otherwise render "None+vector".
            reason = f"{existing.match_reason}+vector" if existing.match_reason else "vector"
            merged[r.id] = SearchResult(
                id=existing.id,
                title=existing.title,
                ai_domain=existing.ai_domain,
                ai_summary=existing.ai_summary,
                file_format=existing.file_format,
                score=existing.score + r.score * 0.5,
                snippet=existing.snippet,
                match_reason=reason,
            )
        elif r.score > vector_threshold:
            merged[r.id] = r
    ranked = sorted(merged.values(), key=lambda item: item.score, reverse=True)
    return ranked[:limit]