Files
hyungi_document_server/app/services/search_telemetry.py
Hyungi Ahn f005922483 feat(search): Phase 0.3 검색 실패 자동 로깅
검색 실패 케이스를 자동 수집해 gold dataset 시드로 활용.
wiggly-weaving-puppy 플랜 Phase 0.3 산출물.

자동 수집 트리거 (3가지):
- result_count == 0           → no_result
- confidence < 0.5            → low_confidence
- 60초 내 동일 사용자 재쿼리   → user_reformulated (이전 쿼리 기록)

confidence는 Phase 0.3 휴리스틱 (top score + match_reason).
Phase 2 QueryAnalyzer 도입 후 LLM 기반으로 교체 예정.

구현:
- migrations/015_search_failure_logs.sql: 테이블 + 3개 인덱스
- app/models/search_failure.py: ORM
- app/services/search_telemetry.py: confidence 계산 + recent 트래커 + INSERT
- app/api/search.py: BackgroundTasks로 dispatch (응답 latency 영향 X)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 08:29:12 +09:00

218 lines
7.3 KiB
Python

"""검색 실패 자동 로깅 (Phase 0.3)
목적: gold dataset 시드 수집. 평가셋 확장의 재료.
자동 수집 트리거:
1) result_count == 0 → no_result
2) confidence < THRESHOLD → low_confidence
3) 60초 내 동일 사용자 재쿼리 → user_reformulated (이전 쿼리 기록)
confidence는 Phase 0.3 시점엔 휴리스틱(top score + match_reason 기반).
Phase 2 QueryAnalyzer 도입 후 LLM 기반 confidence로 교체될 예정.
⚠ 단일 fastapi 워커 가정: recent_searches 트래커는 in-memory dict.
멀티 워커로 확장 시 user_reformulated 신호가 일부 손실되지만 정확성에는 영향 없음.
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any
from sqlalchemy.exc import SQLAlchemyError
from core.database import async_session
from models.search_failure import SearchFailureLog
logger = logging.getLogger("search_telemetry")
# ─── 튜닝 파라미터 ─────────────────────────────────────
LOW_CONFIDENCE_THRESHOLD = 0.5
REFORMULATION_WINDOW_SEC = 60.0
TRACKER_MAX_USERS = 1000 # 인메모리 트래커 상한 (LRU-ish 정리)
# ─── 인메모리 최근 쿼리 트래커 ─────────────────────────
@dataclass
class _RecentSearch:
query: str
normalized: str
ts: float # monotonic seconds
_recent: dict[int, _RecentSearch] = {}
_recent_lock = asyncio.Lock()
def _normalize(query: str) -> str:
return " ".join(query.lower().strip().split())
async def _record_and_get_prior(
user_id: int, query: str
) -> _RecentSearch | None:
"""현재 쿼리를 트래커에 기록하고, 60초 이내 직전 쿼리(있으면)를 반환."""
now = time.monotonic()
normalized = _normalize(query)
async with _recent_lock:
prior = _recent.get(user_id)
# 60초 초과한 prior는 무효
if prior and (now - prior.ts) > REFORMULATION_WINDOW_SEC:
prior = None
_recent[user_id] = _RecentSearch(query=query, normalized=normalized, ts=now)
# 단순 상한 정리 (oldest 절반 제거)
if len(_recent) > TRACKER_MAX_USERS:
stale = sorted(_recent.items(), key=lambda kv: kv[1].ts)[: TRACKER_MAX_USERS // 2]
for uid, _ in stale:
_recent.pop(uid, None)
return prior
# ─── confidence 휴리스틱 ─────────────────────────────────
def compute_confidence(results: list[Any], mode: str) -> float:
"""검색 결과로부터 confidence(0..1)를 휴리스틱으로 산정.
Phase 0.3 임시 구현. Phase 2에서 QueryAnalyzer 결과 + reranker score로 교체.
하이브리드/텍스트 모드는 score가 가중치 합산이라 unbounded → match_reason과 결합.
벡터 모드는 score가 코사인 유사도(0..1)라 그대로 사용.
"""
if not results:
return 0.0
top = results[0]
top_score = float(getattr(top, "score", 0.0) or 0.0)
reason = (getattr(top, "match_reason", "") or "").lower()
if mode == "vector":
# 코사인 유사도 그대로
return max(0.0, min(1.0, top_score))
# text / hybrid: match_reason 강도 + score를 함께 본다
# search.py의 가중치: title=3.0, tags=2.5, note=2.0, summary=1.5, content=1.0, fts bonus=2.0
# vector boost(hybrid 합산)는 +0.5*cosine
if "title" in reason and top_score >= 4.0:
return 0.95
if any(k in reason for k in ("tags", "note")) and top_score >= 3.0:
return 0.85
if "summary" in reason and top_score >= 2.5:
return 0.75
if "content" in reason and top_score >= 2.0:
return 0.65
if "fts" in reason and top_score >= 1.0:
return 0.55
if "vector" in reason:
# vector-only hit (텍스트 매칭 실패) → 코사인 유사도 환산
# hybrid 합산 시 vector 단독 점수는 score * 0.5로 들어옴
cosine = top_score / 0.5 if top_score < 1.0 else top_score
return max(0.2, min(0.6, cosine * 0.7))
# 약한 매치
return 0.3
# ─── 로깅 진입점 ─────────────────────────────────────────
async def _insert_log(
query: str,
user_id: int | None,
result_count: int,
confidence: float | None,
failure_reason: str,
context: dict[str, Any] | None,
) -> None:
"""단독 세션으로 INSERT (background task에서 호출되므로 request 세션 사용 불가)."""
try:
async with async_session() as session:
row = SearchFailureLog(
query=query,
user_id=user_id,
result_count=result_count,
confidence=confidence,
failure_reason=failure_reason,
context=context,
)
session.add(row)
await session.commit()
except SQLAlchemyError as exc:
# 로깅 실패가 검색 자체를 깨뜨리지 않도록 흡수
logger.warning(f"failure log insert failed: {exc}")
def _build_context(
results: list[Any],
mode: str,
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
ctx: dict[str, Any] = {
"mode": mode,
"result_count": len(results),
"top_score": float(results[0].score) if results else None,
"top_match_reason": (results[0].match_reason if results else None),
"returned_ids": [r.id for r in results[:10]],
}
if extra:
ctx.update(extra)
return ctx
async def record_search_event(
query: str,
user_id: int | None,
results: list[Any],
mode: str,
) -> None:
"""검색 응답 직후 호출. 실패 트리거에 해당하면 로그 INSERT.
background task에서 await로 호출. request 세션과 분리.
user_id가 None이면 reformulation 추적 + 로깅 모두 스킵 (시스템 호출 등).
"""
if user_id is None:
return
confidence = compute_confidence(results, mode)
result_count = len(results)
base_ctx = _build_context(results, mode, extra={"confidence": confidence})
# ── 1) reformulation 체크 (이전 쿼리가 있으면 그걸 로깅) ──
prior = await _record_and_get_prior(user_id, query)
if prior and prior.normalized != _normalize(query):
await _insert_log(
query=prior.query,
user_id=user_id,
result_count=-1, # prior의 result_count는 알 수 없음(요청 세션 끝남)
confidence=None,
failure_reason="user_reformulated",
context={"reformulated_to": query, "elapsed_sec": time.monotonic() - prior.ts},
)
# ── 2) 현재 쿼리에 대한 실패 트리거 ──
if result_count == 0:
await _insert_log(
query=query,
user_id=user_id,
result_count=0,
confidence=0.0,
failure_reason="no_result",
context=base_ctx,
)
return
if confidence < LOW_CONFIDENCE_THRESHOLD:
await _insert_log(
query=query,
user_id=user_id,
result_count=result_count,
confidence=confidence,
failure_reason="low_confidence",
context=base_ctx,
)