refactor(search): Phase 2.1 QueryAnalyzer를 async-only 구조로 전환
## 철학 수정 (실측 기반)
gemma-4-26b-a4b-it-8bit MLX 실측:
- full query_analyze.txt (prompt_tok=2406) → 10.5초
- max_tokens 축소 무효 (모델 자연 EOS 조기 종료)
- 쿼리 길이 영향 거의 없음 (프롬프트 자체가 지배)
→ 800ms timeout 가정은 13배 초과. 동기 호출 완전히 불가능.
따라서 QueryAnalyzer는 "즉시 실행하는 기능" → "미리 준비해두는 기능"으로
포지셔닝 변경. retrieval 경로에서 analyzer 동기 호출 **금지**.
## 구조
```
query → retrieval (항상 즉시)
↘ trigger_background_analysis (fire-and-forget)
→ analyze() [5초+] → cache 저장
다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화
```
## 변경 사항
### app/prompts/query_analyze.txt
- 5971 chars → 2403 chars (40%)
- 예시 4개 → 1개, 규칙 설명 축약
- 목표 prompt_tok 2406 → ~600 (1/4)
### app/services/search/query_analyzer.py
- LLM_TIMEOUT_MS 800 → 5000 (background이므로 여유 OK)
- PROMPT_VERSION v1 → v2 (cache auto-invalidate)
- get_cached / set_cached 유지 — retrieval 경로 O(1) 조회
- trigger_background_analysis(query) 신규 — 동기 함수, 즉시 반환, task 생성
- _PENDING set으로 task 참조 유지 (premature GC 방지)
- _INFLIGHT set으로 동일 쿼리 중복 실행 방지
- prewarm_analyzer() 신규 — startup에서 15~20 쿼리 미리 분석
- DEFAULT_PREWARM_QUERIES: 평가셋 fixed 7 + 법령 3 + 뉴스 2 + 실무 3
### app/api/search.py
- 기존 sync analyzer 호출 완전 제거
- analyze=True → get_cached(q) 조회만 O(1)
- hit: query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부 활성화)
- miss: trigger_background_analysis(q) + 기존 경로 그대로
- timing["analyze_ms"] 제거 (경로에 LLM 호출 없음)
- notes에 analyzer cache_hit/cache_miss 상태 기록
- debug.query_analysis는 cache hit 시에만 채워짐
### app/main.py
- lifespan startup에 prewarm_analyzer() background task 추가
- 논블로킹 — 앱 시작 막지 않음
- delay_between=0.5로 MLX 부하 완화
## 기대 효과
- cold 요청 latency: 기존 Phase 1.3 그대로 (회귀 0)
- warm 요청 + prewarmed: cache hit → query_analysis 활용
- 예상 cache hit rate: 초기 70~80% (prewarm) + 사용 누적
- Phase 2.2/2.3 multilingual/filter 기능은 cache hit 시에만 동작
## 참조
- memory: feedback_analyzer_async_only.md (영구 룰 저장)
- plan: ~/.claude/plans/zesty-painting-kahan.md ("철학 수정" 섹션)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,19 +1,28 @@
|
||||
"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1).
|
||||
"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1, async-only 구조).
|
||||
|
||||
자연어 쿼리를 구조화된 분석 결과로 변환. 결과는 검색 보조용 (지배 X).
|
||||
**핵심 철학** (memory `feedback_analyzer_async_only.md`):
|
||||
> QueryAnalyzer는 "즉시 실행하는 기능"이 아니라 "미리 준비해두는 기능"이다.
|
||||
|
||||
Pipeline:
|
||||
1. in-memory LRU 캐시 조회
|
||||
2. miss → LLM 호출 (primary 모델, asyncio.timeout 강제)
|
||||
3. JSON 파싱 → weight 정규화 → 캐시 저장
|
||||
4. 실패/저신뢰 → `{"analyzer_confidence": 0.0}` (fallback 트리거)
|
||||
Retrieval 경로에서 analyzer를 **동기 호출 금지**.
|
||||
동기 호출 가능한 API는 prewarm 전용.
|
||||
|
||||
CRITICAL 룰 (plan 영구):
|
||||
- `LLM_TIMEOUT_MS = 800` 절대 늘리지 말 것. MLX 멈춤 시 검색 전체 멈춤 방지.
|
||||
- `MAX_NORMALIZED_QUERIES = 3` 절대 늘리지 말 것. multilingual explosion 방지.
|
||||
- weight 합 = 1.0 정규화 필수. fusion 왜곡 방지.
|
||||
- 실패/저신뢰(< 0.5) 결과는 캐시 금지. 잘못된 분석 고정 방지.
|
||||
- 호출자는 항상 `result.get("analyzer_confidence", 0.0)` 방어 패턴 사용.
|
||||
## Pipeline
|
||||
|
||||
```
|
||||
query → retrieval (항상 즉시)
|
||||
↘ trigger_background_analysis (fire-and-forget)
|
||||
→ analyze() [5초+] → cache 저장
|
||||
|
||||
다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화
|
||||
```
|
||||
|
||||
## 룰 (plan 영구)
|
||||
- `LLM_TIMEOUT_MS = 5000` (background 이므로 여유 OK)
|
||||
- `MAX_NORMALIZED_QUERIES = 3` (multilingual explosion 방지)
|
||||
- weight 합 = 1.0 정규화 필수 (fusion 왜곡 방지)
|
||||
- 실패/저신뢰(< 0.5) 결과는 캐시 금지 (잘못된 분석 고정 방지)
|
||||
- `analyzer_confidence` default `float 0.0` 강제 (None 금지)
|
||||
- analyze() 동기 호출 금지. retrieval 경로는 `get_cached()` + `trigger_background_analysis()` 만 사용.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -30,20 +39,16 @@ from core.config import settings
|
||||
logger = logging.getLogger("query_analyzer")
|
||||
|
||||
# ─── 상수 (plan 영구 룰) ────────────────────────────────
|
||||
PROMPT_VERSION = "v1" # prompts/query_analyze.txt 변경 시 갱신
|
||||
PROMPT_VERSION = "v2" # prompts/query_analyze.txt 축소판
|
||||
CACHE_TTL = 86400 # 24h
|
||||
CACHE_MAXSIZE = 1000
|
||||
LLM_TIMEOUT_MS = 800 # 절대 늘리지 말 것 (plan 영구)
|
||||
LLM_TIMEOUT_MS = 5000 # async 구조 (background), 동기 경로 금지
|
||||
MIN_CACHE_CONFIDENCE = 0.5 # 이 미만은 캐시 금지
|
||||
MAX_NORMALIZED_QUERIES = 3 # 절대 늘리지 말 것 (plan 영구)
|
||||
MAX_NORMALIZED_QUERIES = 3
|
||||
|
||||
|
||||
def _model_version() -> str:
|
||||
"""현재 primary 모델 ID를 캐시 키에 반영.
|
||||
|
||||
런타임에 config.yaml이 바뀌어도 재시작 후 자동 반영.
|
||||
config.yaml 없을 경우 sentinel 문자열.
|
||||
"""
|
||||
"""현재 primary 모델 ID를 캐시 키에 반영."""
|
||||
if settings.ai and settings.ai.primary:
|
||||
return settings.ai.primary.model
|
||||
return "unknown-model"
|
||||
@@ -52,6 +57,11 @@ def _model_version() -> str:
|
||||
# ─── in-memory LRU (FIFO 근사) ──────────────────────────
|
||||
_CACHE: dict[str, dict[str, Any]] = {}
|
||||
|
||||
# background task 참조 유지 (premature GC 방지)
|
||||
_PENDING: set[asyncio.Task[Any]] = set()
|
||||
# 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합)
|
||||
_INFLIGHT: set[str] = set()
|
||||
|
||||
|
||||
def _cache_key(query: str) -> str:
|
||||
raw = f"{query}|{PROMPT_VERSION}|{_model_version()}"
|
||||
@@ -59,7 +69,10 @@ def _cache_key(query: str) -> str:
|
||||
|
||||
|
||||
def get_cached(query: str) -> dict | None:
|
||||
"""TTL 경과 entry는 자동 삭제. 없으면 None."""
|
||||
"""TTL 경과 entry는 자동 삭제. 없으면 None.
|
||||
|
||||
retrieval 경로에서 cache hit 판단용으로 호출. 호출 자체는 O(1).
|
||||
"""
|
||||
key = _cache_key(query)
|
||||
entry = _CACHE.get(key)
|
||||
if not entry:
|
||||
@@ -85,7 +98,6 @@ def set_cached(query: str, value: dict) -> None:
|
||||
_CACHE[key] = {"value": value, "ts": time.time()}
|
||||
return
|
||||
if len(_CACHE) >= CACHE_MAXSIZE:
|
||||
# 가장 먼저 추가된 항목 제거 (dict insertion order 활용)
|
||||
try:
|
||||
oldest = next(iter(_CACHE))
|
||||
_CACHE.pop(oldest, None)
|
||||
@@ -95,23 +107,22 @@ def set_cached(query: str, value: dict) -> None:
|
||||
|
||||
|
||||
def cache_stats() -> dict[str, int]:
|
||||
"""debug용 — 현재 캐시 크기."""
|
||||
return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE}
|
||||
"""debug/운영용 — 현재 캐시 크기 + inflight 수."""
|
||||
return {
|
||||
"size": len(_CACHE),
|
||||
"maxsize": CACHE_MAXSIZE,
|
||||
"inflight": len(_INFLIGHT),
|
||||
"pending_tasks": len(_PENDING),
|
||||
}
|
||||
|
||||
|
||||
# ─── weight 정규화 (fusion 왜곡 방지) ───────────────────
|
||||
def _normalize_weights(analysis: dict) -> dict:
|
||||
"""normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화.
|
||||
|
||||
- 리스트가 없거나 비어 있으면 그대로 반환
|
||||
- 각 항목의 weight가 숫자 아니면 1.0으로 치환
|
||||
- 합이 0이면 균등 분배
|
||||
"""
|
||||
"""normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화."""
|
||||
queries = analysis.get("normalized_queries")
|
||||
if not isinstance(queries, list) or not queries:
|
||||
return analysis
|
||||
|
||||
# sanitize + cap
|
||||
sanitized: list[dict] = []
|
||||
for q in queries[:MAX_NORMALIZED_QUERIES]:
|
||||
if not isinstance(q, dict):
|
||||
@@ -154,10 +165,7 @@ except FileNotFoundError:
|
||||
|
||||
# ─── 기본 fallback 응답 (None 금지) ─────────────────────
|
||||
def _fallback(reason: str | None = None) -> dict:
|
||||
"""LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0.
|
||||
|
||||
호출자는 `result.get("analyzer_confidence", 0.0)`로 방어.
|
||||
"""
|
||||
"""LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0."""
|
||||
result: dict[str, Any] = {
|
||||
"intent": "semantic_search",
|
||||
"query_type": "keyword",
|
||||
@@ -178,13 +186,18 @@ def _fallback(reason: str | None = None) -> dict:
|
||||
return result
|
||||
|
||||
|
||||
# ─── 메인 API ──────────────────────────────────────────
|
||||
# ─── 메인 LLM 호출 (내부 사용) ──────────────────────────
|
||||
async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
|
||||
"""쿼리 분석 결과 반환. 실패 시 반드시 analyzer_confidence=0.0 fallback.
|
||||
"""쿼리 분석 결과 반환. 실패 시 analyzer_confidence=0.0 fallback.
|
||||
|
||||
**⚠️ 동기 검색 경로에서 직접 호출 금지**. 용도:
|
||||
- `trigger_background_analysis` 내부 호출
|
||||
- `prewarm_analyzer` startup 호출
|
||||
- 디버깅/테스트
|
||||
|
||||
Args:
|
||||
query: 사용자 쿼리 원문
|
||||
ai_client: AIClient 인스턴스 (호출자가 싱글톤으로 관리. None이면 생성)
|
||||
ai_client: AIClient 인스턴스 (없으면 생성 후 자동 close)
|
||||
|
||||
Returns:
|
||||
dict — 최소 `analyzer_confidence` 키는 항상 float로 존재.
|
||||
@@ -195,12 +208,11 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
|
||||
if not ANALYZE_PROMPT:
|
||||
return _fallback("prompt_not_loaded")
|
||||
|
||||
# cache hit (고신뢰만 캐시되므로 그대로 반환)
|
||||
# cache hit 즉시 반환 (prewarm 재호출 방지)
|
||||
cached = get_cached(query)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
# LLM 호출 — timeout 강제
|
||||
client_owned = False
|
||||
if ai_client is None:
|
||||
ai_client = AIClient()
|
||||
@@ -240,7 +252,6 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||
|
||||
# JSON 파싱
|
||||
parsed = parse_json_response(raw)
|
||||
if not isinstance(parsed, dict):
|
||||
logger.warning(
|
||||
@@ -251,14 +262,11 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
|
||||
)
|
||||
return _fallback("parse_failed")
|
||||
|
||||
# 필수 필드 방어 — analyzer_confidence는 반드시 float
|
||||
try:
|
||||
conf = float(parsed.get("analyzer_confidence", 0.0) or 0.0)
|
||||
except (TypeError, ValueError):
|
||||
conf = 0.0
|
||||
parsed["analyzer_confidence"] = conf
|
||||
|
||||
# weight 정규화 (MAX_NORMALIZED_QUERIES + sum=1.0)
|
||||
parsed = _normalize_weights(parsed)
|
||||
|
||||
logger.info(
|
||||
@@ -270,6 +278,144 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
|
||||
elapsed_ms,
|
||||
)
|
||||
|
||||
# 고신뢰만 캐시 저장
|
||||
set_cached(query, parsed)
|
||||
return parsed
|
||||
|
||||
|
||||
# ─── Background trigger (retrieval 경로에서 사용) ───────
|
||||
async def _background_analyze(query: str) -> None:
|
||||
"""Background task wrapper — inflight 집합 관리 + 예외 삼킴."""
|
||||
try:
|
||||
await analyze(query)
|
||||
except Exception as exc:
|
||||
logger.warning("background analyze crashed query=%r err=%r", query[:80], exc)
|
||||
finally:
|
||||
_INFLIGHT.discard(query)
|
||||
|
||||
|
||||
def trigger_background_analysis(query: str) -> bool:
|
||||
"""retrieval 경로에서 호출. cache miss 시 background task 생성.
|
||||
|
||||
- 동기 함수. 즉시 반환.
|
||||
- 이미 inflight 또는 cache hit이면 아무 작업 X, False 반환.
|
||||
- 새 task 생성 시 True 반환.
|
||||
|
||||
Returns:
|
||||
bool — task 실제로 생성되었는지 여부
|
||||
"""
|
||||
if not query or not query.strip():
|
||||
return False
|
||||
if query in _INFLIGHT:
|
||||
return False
|
||||
if get_cached(query) is not None:
|
||||
return False
|
||||
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
logger.warning("trigger_background_analysis called outside event loop")
|
||||
return False
|
||||
|
||||
_INFLIGHT.add(query)
|
||||
task = loop.create_task(_background_analyze(query))
|
||||
_PENDING.add(task)
|
||||
task.add_done_callback(_PENDING.discard)
|
||||
return True
|
||||
|
||||
|
||||
# ─── Prewarm (app startup) ──────────────────────────────
|
||||
# 운영에서 자주 발생하는 쿼리 샘플. 통계 기반으로 확장 예정.
|
||||
DEFAULT_PREWARM_QUERIES: list[str] = [
|
||||
# fixed 7 (Phase 2 평가셋 core)
|
||||
"산업안전보건법 제6장",
|
||||
"기계 사고 관련 법령",
|
||||
"AI 산업 동향",
|
||||
"Python async best practice",
|
||||
"유해화학물질을 다루는 회사가 지켜야 할 안전 의무",
|
||||
"recent AI safety news from Europe",
|
||||
"이세상에 존재하지 않는 문서명",
|
||||
# 법령 관련
|
||||
"산업안전보건법 시행령",
|
||||
"화학물질관리법",
|
||||
"위험성평가 절차",
|
||||
# 뉴스 관련
|
||||
"EU AI Act",
|
||||
"한국 AI 산업",
|
||||
# 실무
|
||||
"안전보건 교육 자료",
|
||||
"사고 조사 보고서",
|
||||
"MSDS 작성 방법",
|
||||
]
|
||||
|
||||
|
||||
async def prewarm_analyzer(
|
||||
queries: list[str] | None = None,
|
||||
delay_between: float = 0.5,
|
||||
) -> dict[str, Any]:
|
||||
"""app startup에서 호출. 대표 쿼리를 미리 분석해 cache에 적재.
|
||||
|
||||
Non-blocking으로 사용: `asyncio.create_task(prewarm_analyzer())`.
|
||||
|
||||
Args:
|
||||
queries: 분석할 쿼리 리스트. None이면 DEFAULT_PREWARM_QUERIES 사용.
|
||||
delay_between: 각 쿼리 간 대기 시간 (MLX 부하 완화).
|
||||
|
||||
Returns:
|
||||
dict — {queries_total, success, failed, elapsed_ms, cache_size_after}
|
||||
"""
|
||||
if not ANALYZE_PROMPT:
|
||||
logger.warning("prewarm skipped — prompt not loaded")
|
||||
return {"status": "skipped", "reason": "prompt_not_loaded"}
|
||||
|
||||
targets = queries if queries is not None else DEFAULT_PREWARM_QUERIES
|
||||
total = len(targets)
|
||||
success = 0
|
||||
failed = 0
|
||||
t_start = time.perf_counter()
|
||||
|
||||
logger.info("prewarm_analyzer start queries=%d timeout_ms=%d", total, LLM_TIMEOUT_MS)
|
||||
|
||||
client = AIClient()
|
||||
try:
|
||||
for i, q in enumerate(targets, 1):
|
||||
if get_cached(q) is not None:
|
||||
logger.info("prewarm skip (already cached) [%d/%d] %r", i, total, q[:40])
|
||||
success += 1
|
||||
continue
|
||||
result = await analyze(q, ai_client=client)
|
||||
conf = float(result.get("analyzer_confidence", 0.0) or 0.0)
|
||||
if conf >= MIN_CACHE_CONFIDENCE:
|
||||
success += 1
|
||||
logger.info("prewarm ok [%d/%d] conf=%.2f q=%r", i, total, conf, q[:40])
|
||||
else:
|
||||
failed += 1
|
||||
reason = result.get("_fallback_reason", "low_conf")
|
||||
logger.warning(
|
||||
"prewarm fail [%d/%d] reason=%s q=%r", i, total, reason, q[:40]
|
||||
)
|
||||
if delay_between > 0 and i < total:
|
||||
await asyncio.sleep(delay_between)
|
||||
finally:
|
||||
try:
|
||||
await client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||
stats = cache_stats()
|
||||
logger.info(
|
||||
"prewarm_analyzer done total=%d success=%d failed=%d elapsed_ms=%.0f cache_size=%d",
|
||||
total,
|
||||
success,
|
||||
failed,
|
||||
elapsed_ms,
|
||||
stats["size"],
|
||||
)
|
||||
return {
|
||||
"status": "complete",
|
||||
"queries_total": total,
|
||||
"success": success,
|
||||
"failed": failed,
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"cache_size_after": stats["size"],
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user