Files
hyungi_document_server/app/services/search_telemetry.py
Hyungi Ahn 09883d0358 feat(ask): Phase 3.5 A0 — ask_events source/eval_case_id + eval auth boundary
- migrations 138~142: source TEXT DEFAULT 'document_server' + eval_case_id TEXT
  추가, 인덱스 2개, backfill, 1주 관찰 후 NOT NULL (140 적용 분리)
- app/models/ask_event.py: source / eval_case_id ORM 필드 (138~141 단계 nullable)
- app/services/search_telemetry.py: record_ask_event 시그니처에 source / eval_case_id
- app/core/config.py: settings.eval_runner_token + EVAL_RUNNER_TOKEN env 로드
- app/api/search.py:
  - X-Source / X-Eval-Case-Id / X-Eval-Token 헤더 수신
  - _resolve_eval_identity(): hmac.compare_digest 로 token 검증, 실패 시 source
    'document_server' 강등 + warning log + eval_case_id=None
  - 두 record_ask_event 호출에 검증된 source/eval_case_id 전달
- credentials.env.example: EVAL_RUNNER_TOKEN= (empty default = 모든 eval claim 거부)
- tests/test_ask_eval_auth.py: 9 케이스 — token 없음/틀림/일치, env 미설정,
  case_id only, non-eval source forces case_id None

trust boundary: 일반 client 의 X-Source=eval / X-Eval-Case-Id 시도는 무시되어
calibration telemetry 오염 불가. eval runner 만 EVAL_RUNNER_TOKEN 으로 인증.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 08:11:06 +09:00

381 lines
14 KiB
Python

"""검색 실패 자동 로깅 (Phase 0.3)
목적: gold dataset 시드 수집. 평가셋 확장의 재료.
자동 수집 트리거:
1) result_count == 0 → no_result
2) confidence < THRESHOLD → low_confidence
3) 60초 내 동일 사용자 재쿼리 → user_reformulated (이전 쿼리 기록)
confidence는 Phase 0.3 시점엔 휴리스틱(top score + match_reason 기반).
Phase 2 QueryAnalyzer 도입 후 LLM 기반 confidence로 교체될 예정.
⚠ 단일 fastapi 워커 가정: recent_searches 트래커는 in-memory dict.
멀티 워커로 확장 시 user_reformulated 신호가 일부 손실되지만 정확성에는 영향 없음.
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any
from sqlalchemy.exc import SQLAlchemyError
from core.database import async_session
from models.ask_event import AskEvent
from models.search_failure import SearchFailureLog
logger = logging.getLogger("search_telemetry")
# ─── 튜닝 파라미터 ─────────────────────────────────────
LOW_CONFIDENCE_THRESHOLD = 0.5
REFORMULATION_WINDOW_SEC = 60.0
TRACKER_MAX_USERS = 1000 # 인메모리 트래커 상한 (LRU-ish 정리)
# ─── 인메모리 최근 쿼리 트래커 ─────────────────────────
@dataclass
class _RecentSearch:
query: str
normalized: str
ts: float # monotonic seconds
_recent: dict[int, _RecentSearch] = {}
_recent_lock = asyncio.Lock()
def _normalize(query: str) -> str:
return " ".join(query.lower().strip().split())
async def _record_and_get_prior(
user_id: int, query: str
) -> _RecentSearch | None:
"""현재 쿼리를 트래커에 기록하고, 60초 이내 직전 쿼리(있으면)를 반환."""
now = time.monotonic()
normalized = _normalize(query)
async with _recent_lock:
prior = _recent.get(user_id)
# 60초 초과한 prior는 무효
if prior and (now - prior.ts) > REFORMULATION_WINDOW_SEC:
prior = None
_recent[user_id] = _RecentSearch(query=query, normalized=normalized, ts=now)
# 단순 상한 정리 (oldest 절반 제거)
if len(_recent) > TRACKER_MAX_USERS:
stale = sorted(_recent.items(), key=lambda kv: kv[1].ts)[: TRACKER_MAX_USERS // 2]
for uid, _ in stale:
_recent.pop(uid, None)
return prior
# ─── confidence 휴리스틱 ─────────────────────────────────
def compute_confidence(results: list[Any], mode: str) -> float:
"""검색 결과로부터 confidence(0..1)를 휴리스틱으로 산정.
Phase 0.3 임시 구현. Phase 2에서 QueryAnalyzer 결과 + reranker score로 교체.
score 의미 정리 (search.py 기준):
- mode=vector → score = 코사인 유사도 [0..1]
- mode=fts/trgm/hybrid에서 텍스트 매치 → score = 가중치 합산 (unbounded)
가중치: title=3.0 / tags=2.5 / note=2.0 / summary=1.5 / content=1.0 / fts bonus≈2.0
- mode=hybrid에서 텍스트 0건 → 벡터 결과만, score는 코사인 그대로
- mode=hybrid 텍스트+벡터 동시 매치 → score = 텍스트가중치 + 0.5*코사인,
match_reason = "<텍스트reason>+vector"
핵심: match_reason이 정확히 'vector'(=문자열 "vector")면 텍스트 매치 0건인 vector-only.
이 경우 score는 raw 코사인이므로 amplify 금지.
"""
if not results:
return 0.0
top = results[0]
top_score = float(getattr(top, "score", 0.0) or 0.0)
reason = (getattr(top, "match_reason", "") or "").lower()
if mode == "vector":
# 코사인 유사도 그대로
return _cosine_to_confidence(top_score)
# hybrid에서 텍스트+벡터 합성 매치는 reason에 "+vector" 접미. 신뢰 가산.
has_vector_boost = "+vector" in reason
boost = 0.10 if has_vector_boost else 0.0
# text / hybrid: 강한 텍스트 매치 우선 판정.
# 임계값은 search.py의 가중치 합산 분포(텍스트base + FTS bonus + 0.5*cosine)를 반영.
if "title" in reason and top_score >= 3.5:
return min(1.0, 0.95 + boost)
if any(k in reason for k in ("tags", "note")) and top_score >= 2.5:
return min(1.0, 0.85 + boost)
if "summary" in reason and top_score >= 2.0:
return min(1.0, 0.75 + boost)
if "content" in reason and top_score >= 1.5:
return min(1.0, 0.65 + boost)
if "fts" in reason and top_score >= 1.0:
return min(1.0, 0.55 + boost)
# vector-only hit (텍스트 0건 → 코사인 raw, amplify 금지)
if reason == "vector":
return _cosine_to_confidence(top_score)
# 그 외(약한 매치 또는 알 수 없는 reason)
return 0.3
def _cosine_to_confidence(cosine: float) -> float:
"""bge-m3 임베딩 코사인 유사도 → confidence 환산.
bge-m3는 무관한 텍스트도 보통 0.3~0.5 정도 코사인을 만든다.
따라서 0.5는 "약하게 닮음", 0.7+는 "꽤 관련", 0.85+는 "매우 관련"으로 본다.
"""
if cosine >= 0.85:
return 0.95
if cosine >= 0.75:
return 0.80
if cosine >= 0.65:
return 0.65
if cosine >= 0.55:
return 0.50 # threshold 경계
if cosine >= 0.45:
return 0.35
if cosine >= 0.35:
return 0.20
return 0.10
def compute_confidence_reranked(reranked_results: list[Any]) -> float:
"""Phase 1.3 reranker score 기반 confidence.
bge-reranker-v2-m3는 sigmoid score (0~1 범위)를 반환.
rerank 활성 시 fusion score보다 reranker score가 가장 신뢰할 수 있는 신호.
임계값(초안, 실측 후 조정 가능):
>= 0.95 → high
>= 0.80 → med-high
>= 0.60 → med
>= 0.40 → low-med
else → low
"""
if not reranked_results:
return 0.0
top_score = float(getattr(reranked_results[0], "score", 0.0) or 0.0)
if top_score >= 0.95:
return 0.95
if top_score >= 0.80:
return 0.80
if top_score >= 0.60:
return 0.65
if top_score >= 0.40:
return 0.50
return 0.35
def compute_confidence_hybrid(
text_results: list[Any],
vector_results: list[Any],
) -> float:
"""hybrid 모드 confidence — fusion 적용 *전*의 raw text/vector 결과로 계산.
Phase 0.5에서 RRF 도입 후 fused score는 절대값 의미가 사라지므로,
원본 retrieval 신호의 더 강한 쪽을 confidence로 채택.
"""
text_conf = compute_confidence(text_results, "fts") if text_results else 0.0
vector_conf = (
compute_confidence(vector_results, "vector") if vector_results else 0.0
)
return max(text_conf, vector_conf)
# ─── 로깅 진입점 ─────────────────────────────────────────
async def _insert_log(
query: str,
user_id: int | None,
result_count: int,
confidence: float | None,
failure_reason: str,
context: dict[str, Any] | None,
) -> None:
"""단독 세션으로 INSERT (background task에서 호출되므로 request 세션 사용 불가)."""
try:
async with async_session() as session:
row = SearchFailureLog(
query=query,
user_id=user_id,
result_count=result_count,
confidence=confidence,
failure_reason=failure_reason,
context=context,
)
session.add(row)
await session.commit()
except SQLAlchemyError as exc:
# 로깅 실패가 검색 자체를 깨뜨리지 않도록 흡수
logger.warning(f"failure log insert failed: {exc}")
def _build_context(
results: list[Any],
mode: str,
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
ctx: dict[str, Any] = {
"mode": mode,
"result_count": len(results),
"top_score": float(results[0].score) if results else None,
"top_match_reason": (results[0].match_reason if results else None),
"returned_ids": [r.id for r in results[:10]],
}
if extra:
ctx.update(extra)
return ctx
async def record_search_event(
query: str,
user_id: int | None,
results: list[Any],
mode: str,
confidence: float | None = None,
analyzer_confidence: float | None = None,
) -> None:
"""검색 응답 직후 호출. 실패 트리거에 해당하면 로그 INSERT.
background task에서 await로 호출. request 세션과 분리.
user_id가 None이면 reformulation 추적 + 로깅 모두 스킵 (시스템 호출 등).
confidence 파라미터:
- None이면 results 기준으로 자체 계산 (legacy 호출용).
- 명시적으로 전달되면 그 값 사용 (Phase 0.5+: fusion 적용 전 raw 신호 기준).
analyzer_confidence (Phase 2.1):
- QueryAnalyzer의 쿼리 분석 신뢰도 (result confidence와 다른 축).
- `result.confidence` 가 낮더라도 `analyzer_confidence` 가 높으면
"retrieval failure" (corpus에 정답 없음)로 해석 가능.
- 반대로 analyzer_confidence < 0.5 이면 "query understanding failure" 해석.
- Phase 2.1에서는 context에만 기록 (failure_reason 분류는 Phase 2.2+에서).
"""
if user_id is None:
return
if confidence is None:
confidence = compute_confidence(results, mode)
result_count = len(results)
extra_ctx: dict[str, Any] = {"confidence": confidence}
if analyzer_confidence is not None:
extra_ctx["analyzer_confidence"] = float(analyzer_confidence)
base_ctx = _build_context(results, mode, extra=extra_ctx)
# ── 1) reformulation 체크 (이전 쿼리가 있으면 그걸 로깅) ──
prior = await _record_and_get_prior(user_id, query)
if prior and prior.normalized != _normalize(query):
await _insert_log(
query=prior.query,
user_id=user_id,
result_count=-1, # prior의 result_count는 알 수 없음(요청 세션 끝남)
confidence=None,
failure_reason="user_reformulated",
context={"reformulated_to": query, "elapsed_sec": time.monotonic() - prior.ts},
)
# ── 2) 현재 쿼리에 대한 실패 트리거 ──
if result_count == 0:
await _insert_log(
query=query,
user_id=user_id,
result_count=0,
confidence=0.0,
failure_reason="no_result",
context=base_ctx,
)
return
if confidence < LOW_CONFIDENCE_THRESHOLD:
await _insert_log(
query=query,
user_id=user_id,
result_count=result_count,
confidence=confidence,
failure_reason="low_confidence",
context=base_ctx,
)
# ─── /ask 전용 telemetry (Phase 3.5b) ─────────────────────
async def record_ask_event(
query: str,
user_id: int | None,
completeness: str | None,
synthesis_status: str | None,
confidence: str | None,
refused: bool,
classifier_verdict: str | None,
max_rerank_score: float,
aggregate_score: float,
hallucination_flags: list[str],
evidence_count: int,
citation_count: int,
defense_layers: dict[str, Any],
total_ms: int,
# Phase E.1: 측정 필드 확장
answer_length: int | None = None,
covered_aspects: list[str] | None = None,
missing_aspects: list[str] | None = None,
model_name: str | None = None,
prompt_version: str | None = None,
# Phase 3.5 calibration: source 분리 + golden join
source: str | None = None,
eval_case_id: str | None = None,
) -> None:
"""ask_events INSERT. background task에서 호출 — 에러 삼킴.
Phase E.1 확장 필드(키워드 전달 권장):
- answer_length: len(ai_answer or "") — 400→600자 효과 측정 핵심
- covered_aspects / missing_aspects: classifier 결과 그대로
- model_name: resolve_primary_model() 또는 호출사이트 명시
- prompt_version: ASK_PROMPT_VERSION 상수
Phase 3.5 calibration:
- source: sanitize_source(X-Source 헤더) — eval/ui_search/ui_detail/...
- eval_case_id: X-Eval-Case-Id 헤더 (eval 호출만 채움)
"""
try:
async with async_session() as session:
row = AskEvent(
query=query,
user_id=user_id,
completeness=completeness,
synthesis_status=synthesis_status,
confidence=confidence,
refused=refused,
classifier_verdict=classifier_verdict,
max_rerank_score=max_rerank_score,
aggregate_score=aggregate_score,
hallucination_flags=hallucination_flags,
evidence_count=evidence_count,
citation_count=citation_count,
defense_layers=defense_layers,
total_ms=total_ms,
answer_length=answer_length,
covered_aspects=covered_aspects,
missing_aspects=missing_aspects,
model_name=model_name,
prompt_version=prompt_version,
source=source,
eval_case_id=eval_case_id,
)
session.add(row)
await session.commit()
except SQLAlchemyError as exc:
logger.warning(f"ask_event insert failed: {exc}")