Files
hyungi_document_server/app/api/search.py
Hyungi Ahn f005922483 feat(search): Phase 0.3 검색 실패 자동 로깅
검색 실패 케이스를 자동 수집해 gold dataset 시드로 활용.
wiggly-weaving-puppy 플랜 Phase 0.3 산출물.

자동 수집 트리거 (3가지):
- result_count == 0           → no_result
- confidence < 0.5            → low_confidence
- 60초 내 동일 사용자 재쿼리   → user_reformulated (이전 쿼리 기록)

confidence는 Phase 0.3 휴리스틱 (top score + match_reason).
Phase 2 QueryAnalyzer 도입 후 LLM 기반으로 교체 예정.

구현:
- migrations/015_search_failure_logs.sql: 테이블 + 3개 인덱스
- app/models/search_failure.py: ORM
- app/services/search_telemetry.py: confidence 계산 + recent 트래커 + INSERT
- app/api/search.py: BackgroundTasks로 dispatch (응답 latency 영향 X)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 08:29:12 +09:00

172 lines
6.6 KiB
Python

"""하이브리드 검색 API — FTS + ILIKE + 벡터 (필드별 가중치)"""
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.auth import get_current_user
from core.database import get_session
from models.user import User
from services.search_telemetry import record_search_event
router = APIRouter()
class SearchResult(BaseModel):
id: int
title: str | None
ai_domain: str | None
ai_summary: str | None
file_format: str
score: float
snippet: str | None
match_reason: str | None = None
class SearchResponse(BaseModel):
results: list[SearchResult]
total: int
query: str
mode: str
@router.get("/", response_model=SearchResponse)
async def search(
q: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
limit: int = Query(20, ge=1, le=100),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합"""
if mode == "vector":
results = await _search_vector(session, q, limit)
else:
results = await _search_text(session, q, limit)
# hybrid: 벡터 결과도 합산
if mode == "hybrid":
vector_results = await _search_vector(session, q, limit)
results = _merge_results(results, vector_results, limit)
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
background_tasks.add_task(record_search_event, q, user.id, results, mode)
return SearchResponse(
results=results,
total=len(results),
query=q,
mode=mode,
)
async def _search_text(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
"""FTS + ILIKE — 필드별 가중치 적용"""
result = await session.execute(
text("""
SELECT id, title, ai_domain, ai_summary, file_format,
left(extracted_text, 200) AS snippet,
(
-- title 매칭 (가중치 최고)
CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 3.0 ELSE 0 END
-- ai_tags 매칭 (가중치 높음)
+ CASE WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 2.5 ELSE 0 END
-- user_note 매칭 (가중치 높음)
+ CASE WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 2.0 ELSE 0 END
-- ai_summary 매칭 (가중치 중상)
+ CASE WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 1.5 ELSE 0 END
-- extracted_text 매칭 (가중치 중간)
+ CASE WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 1.0 ELSE 0 END
-- FTS 점수 (보너스)
+ coalesce(ts_rank(
to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')),
plainto_tsquery('simple', :q)
), 0) * 2.0
) AS score,
-- match reason
CASE
WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 'title'
WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 'tags'
WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 'note'
WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 'summary'
WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 'content'
ELSE 'fts'
END AS match_reason
FROM documents
WHERE deleted_at IS NULL
AND (coalesce(title, '') ILIKE '%%' || :q || '%%'
OR coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%'
OR coalesce(user_note, '') ILIKE '%%' || :q || '%%'
OR coalesce(ai_summary, '') ILIKE '%%' || :q || '%%'
OR coalesce(extracted_text, '') ILIKE '%%' || :q || '%%'
OR to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, ''))
@@ plainto_tsquery('simple', :q))
ORDER BY score DESC
LIMIT :limit
"""),
{"q": query, "limit": limit},
)
return [SearchResult(**row._mapping) for row in result]
async def _search_vector(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
"""벡터 유사도 검색 (코사인 거리)"""
try:
client = AIClient()
query_embedding = await client.embed(query)
await client.close()
except Exception:
return []
result = await session.execute(
text("""
SELECT id, title, ai_domain, ai_summary, file_format,
(1 - (embedding <=> cast(:embedding AS vector))) AS score,
left(extracted_text, 200) AS snippet,
'vector' AS match_reason
FROM documents
WHERE embedding IS NOT NULL AND deleted_at IS NULL
ORDER BY embedding <=> cast(:embedding AS vector)
LIMIT :limit
"""),
{"embedding": str(query_embedding), "limit": limit},
)
return [SearchResult(**row._mapping) for row in result]
def _merge_results(
text_results: list[SearchResult],
vector_results: list[SearchResult],
limit: int,
) -> list[SearchResult]:
"""텍스트 + 벡터 결과 합산 (중복 제거, 점수 합산)"""
merged: dict[int, SearchResult] = {}
for r in text_results:
merged[r.id] = r
for r in vector_results:
if r.id in merged:
# 이미 텍스트로 잡힌 문서 — 벡터 점수 가산
existing = merged[r.id]
merged[r.id] = SearchResult(
id=existing.id,
title=existing.title,
ai_domain=existing.ai_domain,
ai_summary=existing.ai_summary,
file_format=existing.file_format,
score=existing.score + r.score * 0.5,
snippet=existing.snippet,
match_reason=f"{existing.match_reason}+vector",
)
elif r.score > 0.3: # 벡터 유사도 최소 threshold
merged[r.id] = r
results = sorted(merged.values(), key=lambda x: x.score, reverse=True)
return results[:limit]