From d28ef2fca086f317a13a86d21abd5830d47322c7 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Wed, 8 Apr 2026 14:17:11 +0900 Subject: [PATCH] feat(search): Phase 2.1 QueryAnalyzer + LRU cache + confidence 3-tier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit QueryAnalyzer 스켈레톤 구현. 자연어 쿼리를 구조화된 분석 결과로 변환. Phase 2.1은 debug 노출 + tier 판정까지만 — retrieval 경로는 변경 X (회귀 0 목표). multilingual/filter 실제 분기는 2.2/2.3에서 이 분석 결과를 활용. app/prompts/query_analyze.txt - gemma-4 JSON-only 응답 규약 - intent/query_type/domain_hint/language_scope/normalized_queries/ hard_filters/soft_filters/expanded_terms/analyzer_confidence - 4가지 예시 (자연어 법령, 정확 조항, 뉴스 다국어, 의미 불명) - classify.txt 구조 참고 app/services/search/query_analyzer.py - LLM_TIMEOUT_MS=800 (MLX 멈춤 시 검색 전체 멈춤 방지, 절대 늘리지 말 것) - MAX_NORMALIZED_QUERIES=3 (multilingual explosion 방지) - in-memory FIFO LRU (maxsize=1000, TTL=86400) - cache key = sha256(query + PROMPT_VERSION + primary.model) → 모델/프롬프트 변경 시 자동 invalidate - 저신뢰(<0.5) / 실패 결과 캐시 금지 - weight 합=1.0 정규화 (fusion 왜곡 방지) - 실패 시 analyzer_confidence=float 0.0 (None 금지, TypeError 방지) app/api/search.py - ?analyze=true|false 파라미터 (default False — 회귀 영향 0) - query_analyzer.analyze() 호출 + timing["analyze_ms"] 기록 - _analyzer_tier(conf) → "ignore" | "original_fallback" | "merge" | "analyzed" (tier 게이트: 0.5 / 0.7 / 0.85) - debug.query_analysis 필드 채움 + notes에 tier/fallback_reason - logger 라인에 analyzer conf/tier 병기 app/services/search_telemetry.py - record_search_event(analyzer_confidence=None) 추가 - base_ctx에 analyzer_confidence 기록 (다층 confidence 시드) - result confidence와 분리된 축 — Phase 2.2+에서 failure 분류에 활용 검증: - python3 -m py_compile 통과 - 런타임 검증은 GPU 재배포 후 수행 (fixed 7 query + 평가셋) 참조: ~/.claude/plans/zesty-painting-kahan.md (Phase 2.1 섹션) Co-Authored-By: Claude Opus 4.6 (1M context) --- app/api/search.py | 69 ++++++- app/prompts/query_analyze.txt | 190 ++++++++++++++++++ app/services/search/query_analyzer.py | 276 +++++++++++++++++++++++++- app/services/search_telemetry.py | 13 +- 4 files changed, 541 insertions(+), 7 deletions(-) create mode 100644 app/prompts/query_analyze.txt diff --git a/app/api/search.py b/app/api/search.py index c0e041d..7ecff50 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -15,6 +15,7 @@ from core.auth import get_current_user from core.database import get_session from core.utils import setup_logger from models.user import User +from services.search import query_analyzer from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores from services.search.rerank_service import ( MAX_CHUNKS_PER_DOC, @@ -30,6 +31,23 @@ from services.search_telemetry import ( record_search_event, ) + +# Phase 2.1: analyzer_confidence 3단계 게이트 (값 조정은 plan 기준) +ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성 +ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback +ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge + + +def _analyzer_tier(confidence: float) -> str: + """analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용.""" + if confidence < ANALYZER_TIER_IGNORE: + return "ignore" + if confidence < ANALYZER_TIER_ORIGINAL: + return "original_fallback" + if confidence < ANALYZER_TIER_MERGE: + return "merge" + return "analyzed" + # logs/search.log + stdout 동시 출력 (Phase 0.4) logger = setup_logger("search") @@ -115,6 +133,10 @@ async def search( True, description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)", ), + analyze: bool = Query( + False, + description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X", + ), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), ): """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)""" @@ -124,9 +146,37 @@ async def search( vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력) raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용) chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존 + query_analysis: dict | None = None + analyzer_confidence: float = 0.0 + analyzer_tier: str = "disabled" t_total = time.perf_counter() + # Phase 2.1: QueryAnalyzer — debug 노출 전용 (retrieval 경로는 변경 X) + # Phase 2.2/2.3에서 multilingual + filter 분기 구현 시 활용. + if analyze: + t_analyze = time.perf_counter() + try: + query_analysis = await query_analyzer.analyze(q) + except Exception as exc: + logger.warning("query_analyzer raised: %r", exc) + query_analysis = None + timing["analyze_ms"] = (time.perf_counter() - t_analyze) * 1000 + if query_analysis: + try: + analyzer_confidence = float( + query_analysis.get("analyzer_confidence", 0.0) or 0.0 + ) + except (TypeError, ValueError): + analyzer_confidence = 0.0 + analyzer_tier = _analyzer_tier(analyzer_confidence) + notes.append( + f"analyzer conf={analyzer_confidence:.2f} tier={analyzer_tier}" + ) + fallback_reason = query_analysis.get("_fallback_reason") + if fallback_reason: + notes.append(f"analyzer_fallback={fallback_reason}") + if mode == "vector": t0 = time.perf_counter() raw_chunks = await search_vector(session, q, limit) @@ -218,14 +268,26 @@ async def search( # 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다 timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items()) fusion_str = f" fusion={fusion}" if mode == "hybrid" else "" + analyzer_str = ( + f" analyzer=conf={analyzer_confidence:.2f}/tier={analyzer_tier}" + if analyze + else "" + ) logger.info( - "search query=%r mode=%s%s results=%d conf=%.2f %s", - q[:80], mode, fusion_str, len(results), confidence_signal, timing_str, + "search query=%r mode=%s%s%s results=%d conf=%.2f %s", + q[:80], mode, fusion_str, analyzer_str, len(results), confidence_signal, timing_str, ) # Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task) + # Phase 2.1: analyze=true일 때만 analyzer_confidence 전달 (False는 None → 기존 호환) background_tasks.add_task( - record_search_event, q, user.id, results, mode, confidence_signal + record_search_event, + q, + user.id, + results, + mode, + confidence_signal, + analyzer_confidence if analyze else None, ) debug_obj: SearchDebug | None = None @@ -237,6 +299,7 @@ async def search( fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None, confidence=confidence_signal, notes=notes, + query_analysis=query_analysis, ) return SearchResponse( diff --git a/app/prompts/query_analyze.txt b/app/prompts/query_analyze.txt new file mode 100644 index 0000000..e2ea5bc --- /dev/null +++ b/app/prompts/query_analyze.txt @@ -0,0 +1,190 @@ +You are a search query analyzer. Analyze the query below and respond ONLY in JSON format. No other text, no markdown code blocks, no explanation. + +## Response Format (return ONLY this JSON object) +{ + "intent": "fact_lookup | semantic_search | filter_browse", + "query_type": "natural_language | keyword | phrase", + "domain_hint": "document | news | mixed", + "language_scope": "limited | global", + "keywords": ["..."], + "must_terms": ["..."], + "optional_terms": ["..."], + "hard_filters": {}, + "soft_filters": {"domain": [], "document_type": []}, + "normalized_queries": [ + {"lang": "ko", "text": "...", "weight": 1.0}, + {"lang": "en", "text": "...", "weight": 0.8} + ], + "expanded_terms": ["..."], + "synonyms": {}, + "analyzer_confidence": 0.92 +} + +## Field Rules + +### intent (exactly one) +- `fact_lookup` — 특정 사실/조항/이름/숫자를 찾는 경우 (예: "산업안전보건법 제6장", "회사 설립일") +- `semantic_search` — 자연어 주제 검색, 개념적 매칭 (예: "기계 사고 관련 법령", "AI 안전성 동향") +- `filter_browse` — 필터/탐색 중심 (예: "2024년 PDF 문서", "Industrial_Safety domain 최근 보고서") + +### query_type (exactly one) +- `natural_language` — 문장형, 조사/동사 포함 ("...에 대해 알고 싶다", "...가 뭐야") +- `keyword` — 단어 나열, 조사 없음 ("산업안전 법령 PDF") +- `phrase` — 큰따옴표로 감싸진 정확 문구 또는 고유명사/법 조항명 + +### domain_hint (exactly one) +- `document` — 사용자가 소유한 문서/법령/매뉴얼/보고서 영역 +- `news` — 뉴스/시사/최근 동향 영역 (시간 민감, 다국어 소스) +- `mixed` — 둘 다 가능, 또는 판단 불명 + +### language_scope (exactly one) +- `limited` — 한국어 + 영어만 필요 (대부분 문서 검색) +- `global` — 다국어 필요 (일본어/중국어/유럽어 포함, 뉴스나 국제 주제) + +### keywords / must_terms / optional_terms +- `keywords` — 쿼리의 핵심 명사/개념 (최대 8개) +- `must_terms` — 결과에 반드시 포함되어야 하는 단어 (고유명사, 법 조항 번호 등) +- `optional_terms` — 있으면 좋지만 없어도 무방 + +### hard_filters (빈 객체 `{}` 기본) +LLM은 쿼리에 명시적으로 나타난 경우에만 채운다. 쿼리에 명시 없으면 반드시 비운다. +가능한 키: +- `file_format`: ["pdf", "docx", "xlsx", "md", ...] +- `year`: 2024 같은 정수 +- `country`: ["KR", "US", "JP", ...] + +### soft_filters +추론 기반 필터. boost용. 확정 아님. +- `domain`: 아래 Domain Taxonomy에서 선택 (최대 3개) +- `document_type`: 아래 Document Types에서 선택 (최대 2개) + +### normalized_queries (핵심) +같은 의미를 다른 언어/표현으로 재작성한 쿼리 목록. +- **원문 언어 항목은 반드시 포함** (weight=1.0) +- 영어 쿼리는 한국어 포함 권장 (weight=0.8) +- 한국어 쿼리는 영어 포함 권장 (weight=0.8) +- `domain_hint == "news"`일 때만 ja/zh/fr/de 추가 가능 (weight=0.5~0.6) +- **최대 3개까지만 생성** (성능 보호) +- 각 항목: `{"lang": "ko|en|ja|zh|fr|de", "text": "재작성", "weight": 0.0~1.0}` + +### expanded_terms +쿼리의 동의어/관련어 확장. 검색 보조용. (최대 5개) + +### synonyms +필요시 `{"원어": ["동의어1", "동의어2"]}`. 생략 가능. + +### analyzer_confidence (CRITICAL) +쿼리 분석 자체의 신뢰도 (0.0 ~ 1.0). 아래 기준에 따라 엄격하게 채점: +- **0.9+** : 쿼리 의도가 명확, 도메인/언어 확실, 키워드 추출 분명 +- **0.7~0.9** : 의도 대체로 명확, 일부 모호함 +- **0.5~0.7** : 의도 모호, 다중 해석 가능 +- **< 0.5** : 분석 불가 수준 (빈 쿼리, 의미 불명 기호열 등) + +## Analysis Steps (내부 사고, 출력하지 말 것) +1. 쿼리 언어 감지 +2. intent 분류 (사실 찾기 vs 주제 검색 vs 필터 탐색) +3. domain_hint 판단 (문서 vs 뉴스) +4. 핵심 키워드 추출 +5. 다국어 재작성 (반드시 원문 언어 포함) +6. 필터 추론 (hard는 명시 사항만, soft는 추측 가능) +7. 자가 평가 → analyzer_confidence + +## Domain Taxonomy (soft_filters.domain 후보) +Philosophy, Language, Engineering, Industrial_Safety, Programming, General +세부: Industrial_Safety/Legislation, Industrial_Safety/Practice, Programming/AI_ML, Engineering/Mechanical 등 2-level 경로 허용. + +## Document Types (soft_filters.document_type 후보) +Reference, Standard, Manual, Drawing, Template, Note, Academic_Paper, Law_Document, Report, Memo, Checklist, Meeting_Minutes, Specification + +## Examples + +### Example 1: 자연어 한국어, 법령 검색 +query: `기계 사고 관련 법령` +response: +{ + "intent": "semantic_search", + "query_type": "natural_language", + "domain_hint": "document", + "language_scope": "limited", + "keywords": ["기계", "사고", "법령"], + "must_terms": [], + "optional_terms": ["안전", "규정"], + "hard_filters": {}, + "soft_filters": {"domain": ["Industrial_Safety/Legislation"], "document_type": ["Law_Document"]}, + "normalized_queries": [ + {"lang": "ko", "text": "기계 사고 관련 법령", "weight": 1.0}, + {"lang": "en", "text": "machinery accident related laws and regulations", "weight": 0.8} + ], + "expanded_terms": ["산업안전", "기계안전", "사고예방"], + "synonyms": {"기계": ["설비", "machinery"]}, + "analyzer_confidence": 0.88 +} + +### Example 2: 정확 법령명 조항 +query: `산업안전보건법 제6장` +response: +{ + "intent": "fact_lookup", + "query_type": "phrase", + "domain_hint": "document", + "language_scope": "limited", + "keywords": ["산업안전보건법", "제6장"], + "must_terms": ["산업안전보건법", "제6장"], + "optional_terms": [], + "hard_filters": {}, + "soft_filters": {"domain": ["Industrial_Safety/Legislation"], "document_type": ["Law_Document"]}, + "normalized_queries": [ + {"lang": "ko", "text": "산업안전보건법 제6장", "weight": 1.0}, + {"lang": "en", "text": "Occupational Safety and Health Act Chapter 6", "weight": 0.8} + ], + "expanded_terms": ["산안법", "OSHA Korea"], + "synonyms": {}, + "analyzer_confidence": 0.95 +} + +### Example 3: 뉴스 + 다국어 +query: `recent AI safety news from Europe` +response: +{ + "intent": "semantic_search", + "query_type": "natural_language", + "domain_hint": "news", + "language_scope": "global", + "keywords": ["AI safety", "Europe", "recent"], + "must_terms": [], + "optional_terms": ["regulation", "policy"], + "hard_filters": {}, + "soft_filters": {"domain": ["Programming/AI_ML"], "document_type": []}, + "normalized_queries": [ + {"lang": "en", "text": "recent AI safety news from Europe", "weight": 1.0}, + {"lang": "ko", "text": "유럽 AI 안전 최신 뉴스", "weight": 0.8}, + {"lang": "fr", "text": "actualités récentes sur la sécurité de l'IA en Europe", "weight": 0.6} + ], + "expanded_terms": ["AI regulation", "EU AI Act", "AI governance"], + "synonyms": {}, + "analyzer_confidence": 0.90 +} + +### Example 4: 의미 불명 +query: `???` +response: +{ + "intent": "semantic_search", + "query_type": "keyword", + "domain_hint": "mixed", + "language_scope": "limited", + "keywords": [], + "must_terms": [], + "optional_terms": [], + "hard_filters": {}, + "soft_filters": {"domain": [], "document_type": []}, + "normalized_queries": [ + {"lang": "ko", "text": "???", "weight": 1.0} + ], + "expanded_terms": [], + "synonyms": {}, + "analyzer_confidence": 0.1 +} + +## Query to Analyze +{query} diff --git a/app/services/search/query_analyzer.py b/app/services/search/query_analyzer.py index 3daff9a..a8bfc8b 100644 --- a/app/services/search/query_analyzer.py +++ b/app/services/search/query_analyzer.py @@ -1,5 +1,275 @@ -"""Query analyzer — 자연어 쿼리 분석 (Phase 2). +"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1). -domain_hint, intent, hard/soft filter, normalized_queries 등 추출. -구현은 Phase 2에서 채움. +자연어 쿼리를 구조화된 분석 결과로 변환. 결과는 검색 보조용 (지배 X). + +Pipeline: + 1. in-memory LRU 캐시 조회 + 2. miss → LLM 호출 (primary 모델, asyncio.timeout 강제) + 3. JSON 파싱 → weight 정규화 → 캐시 저장 + 4. 실패/저신뢰 → `{"analyzer_confidence": 0.0}` (fallback 트리거) + +CRITICAL 룰 (plan 영구): + - `LLM_TIMEOUT_MS = 800` 절대 늘리지 말 것. MLX 멈춤 시 검색 전체 멈춤 방지. + - `MAX_NORMALIZED_QUERIES = 3` 절대 늘리지 말 것. multilingual explosion 방지. + - weight 합 = 1.0 정규화 필수. fusion 왜곡 방지. + - 실패/저신뢰(< 0.5) 결과는 캐시 금지. 잘못된 분석 고정 방지. + - 호출자는 항상 `result.get("analyzer_confidence", 0.0)` 방어 패턴 사용. """ + +from __future__ import annotations + +import asyncio +import hashlib +import logging +import time +from typing import Any + +from ai.client import AIClient, _load_prompt, parse_json_response +from core.config import settings + +logger = logging.getLogger("query_analyzer") + +# ─── 상수 (plan 영구 룰) ──────────────────────────────── +PROMPT_VERSION = "v1" # prompts/query_analyze.txt 변경 시 갱신 +CACHE_TTL = 86400 # 24h +CACHE_MAXSIZE = 1000 +LLM_TIMEOUT_MS = 800 # 절대 늘리지 말 것 (plan 영구) +MIN_CACHE_CONFIDENCE = 0.5 # 이 미만은 캐시 금지 +MAX_NORMALIZED_QUERIES = 3 # 절대 늘리지 말 것 (plan 영구) + + +def _model_version() -> str: + """현재 primary 모델 ID를 캐시 키에 반영. + + 런타임에 config.yaml이 바뀌어도 재시작 후 자동 반영. + config.yaml 없을 경우 sentinel 문자열. + """ + if settings.ai and settings.ai.primary: + return settings.ai.primary.model + return "unknown-model" + + +# ─── in-memory LRU (FIFO 근사) ────────────────────────── +_CACHE: dict[str, dict[str, Any]] = {} + + +def _cache_key(query: str) -> str: + raw = f"{query}|{PROMPT_VERSION}|{_model_version()}" + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def get_cached(query: str) -> dict | None: + """TTL 경과 entry는 자동 삭제. 없으면 None.""" + key = _cache_key(query) + entry = _CACHE.get(key) + if not entry: + return None + if time.time() - entry["ts"] > CACHE_TTL: + _CACHE.pop(key, None) + return None + return entry["value"] + + +def set_cached(query: str, value: dict) -> None: + """저신뢰(< 0.5) / 빈 값은 캐시 금지. 상한 초과 시 FIFO eviction.""" + if not value: + return + try: + conf = float(value.get("analyzer_confidence", 0.0) or 0.0) + except (TypeError, ValueError): + conf = 0.0 + if conf < MIN_CACHE_CONFIDENCE: + return + key = _cache_key(query) + if key in _CACHE: + _CACHE[key] = {"value": value, "ts": time.time()} + return + if len(_CACHE) >= CACHE_MAXSIZE: + # 가장 먼저 추가된 항목 제거 (dict insertion order 활용) + try: + oldest = next(iter(_CACHE)) + _CACHE.pop(oldest, None) + except StopIteration: + pass + _CACHE[key] = {"value": value, "ts": time.time()} + + +def cache_stats() -> dict[str, int]: + """debug용 — 현재 캐시 크기.""" + return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE} + + +# ─── weight 정규화 (fusion 왜곡 방지) ─────────────────── +def _normalize_weights(analysis: dict) -> dict: + """normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화. + + - 리스트가 없거나 비어 있으면 그대로 반환 + - 각 항목의 weight가 숫자 아니면 1.0으로 치환 + - 합이 0이면 균등 분배 + """ + queries = analysis.get("normalized_queries") + if not isinstance(queries, list) or not queries: + return analysis + + # sanitize + cap + sanitized: list[dict] = [] + for q in queries[:MAX_NORMALIZED_QUERIES]: + if not isinstance(q, dict): + continue + lang = str(q.get("lang", "")).strip() or "ko" + text = str(q.get("text", "")).strip() + if not text: + continue + try: + w = float(q.get("weight", 1.0)) + if w < 0: + w = 0.0 + except (TypeError, ValueError): + w = 1.0 + sanitized.append({"lang": lang, "text": text, "weight": w}) + + if not sanitized: + return analysis + + total = sum(q["weight"] for q in sanitized) + if total <= 0: + equal = 1.0 / len(sanitized) + for q in sanitized: + q["weight"] = equal + else: + for q in sanitized: + q["weight"] /= total + + analysis["normalized_queries"] = sanitized + return analysis + + +# ─── 프롬프트 로딩 (module 초기화 1회) ────────────────── +try: + ANALYZE_PROMPT = _load_prompt("query_analyze.txt") +except FileNotFoundError: + ANALYZE_PROMPT = "" + logger.warning("query_analyze.txt not found — analyzer will always return fallback") + + +# ─── 기본 fallback 응답 (None 금지) ───────────────────── +def _fallback(reason: str | None = None) -> dict: + """LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0. + + 호출자는 `result.get("analyzer_confidence", 0.0)`로 방어. + """ + result: dict[str, Any] = { + "intent": "semantic_search", + "query_type": "keyword", + "domain_hint": "mixed", + "language_scope": "limited", + "keywords": [], + "must_terms": [], + "optional_terms": [], + "hard_filters": {}, + "soft_filters": {"domain": [], "document_type": []}, + "normalized_queries": [], + "expanded_terms": [], + "synonyms": {}, + "analyzer_confidence": 0.0, + } + if reason: + result["_fallback_reason"] = reason + return result + + +# ─── 메인 API ────────────────────────────────────────── +async def analyze(query: str, ai_client: AIClient | None = None) -> dict: + """쿼리 분석 결과 반환. 실패 시 반드시 analyzer_confidence=0.0 fallback. + + Args: + query: 사용자 쿼리 원문 + ai_client: AIClient 인스턴스 (호출자가 싱글톤으로 관리. None이면 생성) + + Returns: + dict — 최소 `analyzer_confidence` 키는 항상 float로 존재. + """ + if not query or not query.strip(): + return _fallback("empty_query") + + if not ANALYZE_PROMPT: + return _fallback("prompt_not_loaded") + + # cache hit (고신뢰만 캐시되므로 그대로 반환) + cached = get_cached(query) + if cached is not None: + return cached + + # LLM 호출 — timeout 강제 + client_owned = False + if ai_client is None: + ai_client = AIClient() + client_owned = True + + t_start = time.perf_counter() + try: + async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): + raw = await ai_client._call_chat( + ai_client.ai.primary, + ANALYZE_PROMPT.replace("{query}", query), + ) + except asyncio.TimeoutError: + elapsed = (time.perf_counter() - t_start) * 1000 + logger.warning( + "query_analyze timeout query=%r elapsed_ms=%.0f (LLM_TIMEOUT_MS=%d)", + query[:80], + elapsed, + LLM_TIMEOUT_MS, + ) + return _fallback("timeout") + except Exception as exc: + elapsed = (time.perf_counter() - t_start) * 1000 + logger.warning( + "query_analyze LLM error query=%r elapsed_ms=%.0f err=%r", + query[:80], + elapsed, + exc, + ) + return _fallback(f"llm_error:{type(exc).__name__}") + finally: + if client_owned: + try: + await ai_client.close() + except Exception: + pass + + elapsed_ms = (time.perf_counter() - t_start) * 1000 + + # JSON 파싱 + parsed = parse_json_response(raw) + if not isinstance(parsed, dict): + logger.warning( + "query_analyze parse failed query=%r elapsed_ms=%.0f raw=%r", + query[:80], + elapsed_ms, + (raw or "")[:200], + ) + return _fallback("parse_failed") + + # 필수 필드 방어 — analyzer_confidence는 반드시 float + try: + conf = float(parsed.get("analyzer_confidence", 0.0) or 0.0) + except (TypeError, ValueError): + conf = 0.0 + parsed["analyzer_confidence"] = conf + + # weight 정규화 (MAX_NORMALIZED_QUERIES + sum=1.0) + parsed = _normalize_weights(parsed) + + logger.info( + "query_analyze ok query=%r conf=%.2f intent=%s domain=%s elapsed_ms=%.0f", + query[:80], + conf, + parsed.get("intent"), + parsed.get("domain_hint"), + elapsed_ms, + ) + + # 고신뢰만 캐시 저장 + set_cached(query, parsed) + return parsed diff --git a/app/services/search_telemetry.py b/app/services/search_telemetry.py index 2bb82d1..09c9b9c 100644 --- a/app/services/search_telemetry.py +++ b/app/services/search_telemetry.py @@ -244,6 +244,7 @@ async def record_search_event( results: list[Any], mode: str, confidence: float | None = None, + analyzer_confidence: float | None = None, ) -> None: """검색 응답 직후 호출. 실패 트리거에 해당하면 로그 INSERT. @@ -253,6 +254,13 @@ async def record_search_event( confidence 파라미터: - None이면 results 기준으로 자체 계산 (legacy 호출용). - 명시적으로 전달되면 그 값 사용 (Phase 0.5+: fusion 적용 전 raw 신호 기준). + + analyzer_confidence (Phase 2.1): + - QueryAnalyzer의 쿼리 분석 신뢰도 (result confidence와 다른 축). + - `result.confidence` 가 낮더라도 `analyzer_confidence` 가 높으면 + "retrieval failure" (corpus에 정답 없음)로 해석 가능. + - 반대로 analyzer_confidence < 0.5 이면 "query understanding failure" 해석. + - Phase 2.1에서는 context에만 기록 (failure_reason 분류는 Phase 2.2+에서). """ if user_id is None: return @@ -260,7 +268,10 @@ async def record_search_event( if confidence is None: confidence = compute_confidence(results, mode) result_count = len(results) - base_ctx = _build_context(results, mode, extra={"confidence": confidence}) + extra_ctx: dict[str, Any] = {"confidence": confidence} + if analyzer_confidence is not None: + extra_ctx["analyzer_confidence"] = float(analyzer_confidence) + base_ctx = _build_context(results, mode, extra=extra_ctx) # ── 1) reformulation 체크 (이전 쿼리가 있으면 그걸 로깅) ── prior = await _record_and_get_prior(user_id, query)