Files
hyungi_document_server/app/services/search/query_rewriter.py
hyungi a41adb63a0 fix(search): Phase 2Q variants bug fix + Phase 3 3 measurement 박제
Phase 3 cold 측정 1차에서 NDCG 0.033 catastrophic 발견 — 모든 query 에 동일 variants
반환. root cause = _call_llm 이 user 메시지 1개에 prompt template 전체 박음. LLM 이
actual query 인식 못 함. fixture request_body 형식 (system=prompt / user=query) 과
mismatch. fixture-first invariant 위반.

fix:
- app/services/search/query_rewriter.py _call_llm — system/user 메시지 분리.
  fixture request_body 와 단일 source-of-truth. _render_prompt 는 [deprecated] 유지.
- tests/test_query_rewriter.py — Phase 3 regression test 2:
  · _call_llm 가 system + user 분리 호출 verify (httpx.AsyncClient monkeypatch)
  · qwen backend = response_format 미사용 verify
- 32/32 unit test PASS.

Phase 3 측정 (fix 후 재측정, 51 case × 3 candidate × cold/warm = 5 run):
- baseline_rebaseline (rewrite_backend=null): NDCG 0.659 = Phase 2A 0.659, diff 0.000 PASS
- cand_multi_query_macmini cold: NDCG 0.927 (Δ +0.268), p50 2757ms / p95 9684ms
- cand_multi_query_macmini warm: NDCG 0.927 동일, p50 998ms (cache hit -64%)
- cand_multi_query_macbook cold: NDCG 0.919 (Δ +0.260), p50 3647ms / p95 5202ms
- cand_multi_query_macbook warm: NDCG 0.919 동일, p50 873ms (cache hit -76%)

핵심 약점 회복 (gemma / qwen):
- mixed 0.39 → 0.57 / 0.65
- korean_only 0.51 → 0.71 / 0.67
- standards 0.87 → 1.44 / 1.31
- exam 0.74 → 1.11 / 1.04

decision = H1 (both backends 유의미 net 개선). LLM 선택 = Phase 4 decision md 별 step.

산출물:
- reports/v0_2_phase2q_*.csv (5 raw run_eval output)
- tests/search_eval/baselines/v0_2_phase2q_results_2026-05-24.json (요약 + incident 박제)

follow-up:
- rerank 413 Payload Too Large 다수 관찰 (RRF fallback 작동, NDCG 영향 없음). Apply PR 전 별 chore — chunk dedup 또는 reranker batch cap 검토.
- p95 cold 9684ms 매우 큼. production rollout 시 cache prewarm 정책 필수.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 00:51:56 +00:00

287 lines
11 KiB
Python

"""Query rewriter — multi-query expansion (Phase 2Q Diagnose).
Phase 2Q Diagnose 의 dispatcher + cache + LLM call layer. retrieval 합성 (search_with_rewrite)
은 Phase 2 별 commit. 본 모듈은 scaffold = slug → variants[3] 변환만 담당.
## 핵심 룰 (plan v6 영구)
- ``Priority.FOREGROUND`` semaphore (retrieval inline path, user-facing).
- ``LLM_REWRITE_TIMEOUT_MS = 15000`` (fail-fast — background 와 다름).
- LLM 호출 실패 / parse fail / empty variants → cache 저장 X + caller 503 raise.
- baseline (slug=None) 호출은 LLM 우회 = ``None`` 반환.
- prompt template 1종 고정 (``app/prompts/query_rewrite.txt`` v1).
- raw endpoint URL query param X — slug-based allowlist (``LLM_BACKEND_MAP``).
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import time
import unicodedata
from typing import Any
import httpx
from ai.client import _load_prompt, parse_json_response
from core.utils import setup_logger
from .llm_gate import Priority, acquire_mlx_gate
logger = setup_logger("query_rewriter")
# ─── 상수 (plan v6 영구 룰) ──────────────────────────────
# Manually maintained version tag for prompts/query_rewrite.txt. It is part of
# the cache key, so changing it automatically separates cache entries.
PROMPT_VERSION = "v1"
# Cache entry lifetime in seconds (24h).
CACHE_TTL = 86400
# Entry count at which _set_cached starts evicting.
CACHE_MAXSIZE = 1000
# Timeout for the rewrite LLM call, in milliseconds. This is the retrieval
# inline path, so it fails fast (a different rationale than the B-3 background path).
LLM_REWRITE_TIMEOUT_MS = 15000
# Multi-query variant count; hard-coded in prompt v1. Used as the fallback when
# a backend config has no "n_variants".
EXPECTED_N_VARIANTS = 3
# ─── Backend allowlist (plan v6 §5.1) ────────────────────
# slug → backend cfg or None (baseline = no rewrite). sampling 박제 = fixture 와 단일 source.
# Allowlist of rewrite backends keyed by slug. A value of None means "no
# rewrite" (baseline). The "sampling" dict of an entry is merged verbatim into
# the chat/completions request payload by _call_llm.
LLM_BACKEND_MAP: dict[str, dict[str, Any] | None] = {
    "baseline": None,
    "cand_multi_query_macmini": {
        "endpoint": "http://100.76.254.116:8801/v1/chat/completions",
        "model": "gemma-4-26b-a4b-it-8bit",
        "n_variants": 3,
        "sampling": {
            "temperature": 0.3,
            "max_tokens": 256,
            "response_format": {"type": "json_object"},  # MLX-compatible (Phase 0 inspect 9 PASS)
        },
        "auth": None,
    },
    "cand_multi_query_macbook": {
        "endpoint": "http://100.118.112.84:8810/v1/chat/completions",
        "model": "mlx-community/Qwen3.6-27B-8bit",
        "n_variants": 3,
        "sampling": {
            "temperature": 0.3,
            "max_tokens": 256,
            # response_format intentionally omitted — mlx-vlm.server does not
            # support json_object (120s hang). JSON output is enforced by the
            # prompt rule "Output STRICT JSON only" (recorded in Phase 0 inspect 9).
        },
        "auth": None,
    },
}
def _resolve_rewrite_backend(slug: str | None) -> dict[str, Any] | None:
"""slug → backend cfg or None (baseline). Raises ValueError on unknown slug."""
if slug is None or slug == "baseline":
return None
if slug not in LLM_BACKEND_MAP:
raise ValueError(f"unknown_rewrite_backend: {slug!r}")
return LLM_BACKEND_MAP[slug]
def allowed_slugs() -> list[str]:
    """Return every allowlisted backend slug, in ``LLM_BACKEND_MAP`` order.

    Intended for the ``allowed`` field of an HTTP 400 error response built by
    the caller.
    """
    return [slug for slug in LLM_BACKEND_MAP]
# ─── In-memory cache (query_analyzer.py 패턴 1:1) ────────
# key → (expire_at, variants); expire_at is an absolute time.time() timestamp.
_CACHE: dict[str, tuple[float, list[str]]] = {}
# Held by _get_cached / _set_cached around every read-modify-write of _CACHE.
_CACHE_LOCK = asyncio.Lock()
def _cache_key(query: str, backend_slug: str) -> str:
    """Build the cache key for a (query, backend, prompt version) triple.

    The query is canonicalized so equivalent spellings share one entry: NFKC
    normalization first, then surrounding whitespace is stripped and the text
    is lower-cased. Normalizing *before* lower-casing matters because NFKC can
    itself emit upper-case letters (e.g. ``"㎒"`` → ``"MHz"``); lower-casing
    first would leave those untouched and split the cache for the same query.

    Args:
        query: Raw user query.
        backend_slug: Backend slug the variants were produced by.

    Returns:
        The first 32 hex characters of the SHA-256 digest of
        ``"<canonical>|<backend_slug>|<PROMPT_VERSION>"``.
    """
    canonical = unicodedata.normalize("NFKC", query).strip().lower()
    raw = f"{canonical}|{backend_slug}|{PROMPT_VERSION}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]
async def _get_cached(key: str) -> list[str] | None:
    """Look up ``key`` in the in-memory cache.

    Returns:
        A copy of the stored variants, or ``None`` when the key is absent or
        its TTL has elapsed. Expired entries are deleted lazily on lookup.
    """
    async with _CACHE_LOCK:
        hit = _CACHE.get(key)
        if hit is not None:
            deadline, stored = hit
            if not deadline < time.time():
                # Hand out a copy so callers cannot mutate the cached list.
                return list(stored)
            # TTL elapsed: drop the stale entry and report a miss.
            _CACHE.pop(key, None)
        return None
async def _set_cached(key: str, variants: list[str]) -> None:
    """Store a copy of ``variants`` under ``key`` with a fresh TTL.

    Eviction is FIFO by dict insertion order (an approximation of LRU): when
    the cache is at ``CACHE_MAXSIZE``, the oldest-inserted entries are dropped
    to make room.

    Args:
        key: Cache key (see ``_cache_key``).
        variants: Variants to cache; copied so later caller mutation does not
            leak into the cache.
    """
    async with _CACHE_LOCK:
        # Remove any existing entry for this key first. Re-storing a key must
        # not evict an unrelated entry (the size does not grow), and
        # re-inserting moves the refreshed key to the newest position instead
        # of leaving it first in line for eviction.
        _CACHE.pop(key, None)
        while _CACHE and len(_CACHE) >= CACHE_MAXSIZE:
            _CACHE.pop(next(iter(_CACHE)), None)
        _CACHE[key] = (time.time() + CACHE_TTL, list(variants))
def cache_stats() -> dict[str, int]:
    """Report the current entry count and the configured maximum (diagnostics)."""
    stats: dict[str, int] = {}
    stats["size"] = len(_CACHE)
    stats["maxsize"] = CACHE_MAXSIZE
    return stats
# ─── Prompt loading (lazy, 1회) ──────────────────────────
_PROMPT_TEMPLATE: str | None = None


def _get_prompt_template() -> str:
    """Return the query-rewrite prompt text, loading it on first use.

    The text comes from ``_load_prompt("query_rewrite.txt")`` and is memoized
    in the module-level ``_PROMPT_TEMPLATE`` so later calls skip the load.
    """
    global _PROMPT_TEMPLATE
    template = _PROMPT_TEMPLATE
    if template is None:
        template = _load_prompt("query_rewrite.txt")
        _PROMPT_TEMPLATE = template
    return template
def _render_prompt(query: str) -> str:
    """[deprecated — unused since the fixture-first change] Fill in ``{query}``.

    The real LLM call in ``_call_llm`` sends the template and the query as
    separate system/user messages (fixture invariant). This helper is kept for
    compatibility only: if the template has no ``{query}`` placeholder the
    template is returned unchanged.
    """
    template = _get_prompt_template()
    return template.replace("{query}", query)
# ─── Variant extraction (parser fallback) ────────────────
def _extract_variants(raw: str, expected_n: int) -> list[str] | None:
    """Turn the raw LLM response text into a list of variants.

    Parsing is delegated to ``parse_json_response`` (shared production layer).
    The only accepted shape is ``{"variants": ["...", "...", "..."]}`` with
    exactly ``expected_n`` non-blank strings.

    Args:
        raw: Raw response text from the LLM.
        expected_n: Required number of variants.

    Returns:
        The variants with surrounding whitespace stripped, or ``None`` on any
        shape problem (not a JSON object, wrong count, non-string item, blank
        item). The caller must not cache a ``None`` result and reports 503.
    """
    obj = parse_json_response(raw)
    # A parsed JSON array or scalar has no ``.get``; treat every non-object
    # payload (including a failed parse) as a shape mismatch rather than
    # letting AttributeError escape the documented "return None" contract.
    # NOTE(review): assumes parse_json_response yields plain ``dict`` for JSON
    # objects — confirm against ai.client.
    if not isinstance(obj, dict):
        return None
    variants = obj.get("variants")
    if not isinstance(variants, list) or len(variants) != expected_n:
        return None
    cleaned: list[str] = []
    for item in variants:
        if not isinstance(item, str):
            return None
        text = item.strip()
        if not text:
            return None
        cleaned.append(text)
    return cleaned
# ─── LLM call (httpx 직접, backends.py 패턴) ─────────────
async def _call_llm(cfg: dict[str, Any], query: str) -> str:
    """Call an OpenAI-compatible chat/completions endpoint and return its text.

    The request shape is the single source of truth shared with the fixtures:

    - system message = prompt template (the instruction)
    - user message   = ``query`` (the text to rewrite)

    An earlier implementation put the whole prompt into one user message; the
    model then failed to recognize the actual query and returned the same
    response for every query (catastrophic NDCG regression found in the
    Phase 3 cold measurement). Keep the system/user split.

    Args:
        cfg: An ``LLM_BACKEND_MAP`` entry; ``"model"`` and ``"endpoint"`` are
            required, ``"sampling"`` (optional) is merged into the payload.
        query: The user query to rewrite.

    Returns:
        The first choice's message content as a string.

    Raises:
        httpx.HTTPError: transport failure, timeout, or non-2xx status.
        KeyError: an expected key is missing from ``cfg`` or the response.
        ValueError: the response body is not JSON, or its shape does not match
            the chat/completions protocol (empty ``choices``, wrong container
            types, or non-string ``content``).
    """
    payload: dict[str, Any] = {
        "model": cfg["model"],
        "messages": [
            {"role": "system", "content": _get_prompt_template()},
            {"role": "user", "content": query},
        ],
    }
    payload.update(cfg.get("sampling") or {})

    timeout_s = LLM_REWRITE_TIMEOUT_MS / 1000.0
    async with httpx.AsyncClient(timeout=timeout_s) as client:
        response = await client.post(cfg["endpoint"], json=payload)
        response.raise_for_status()
        data = response.json()

    # Callers only handle httpx errors / KeyError / ValueError. Fold the other
    # ways a malformed body can fail (empty list, wrong container type, null
    # content) into ValueError so they are reported as a protocol mismatch.
    try:
        content = data["choices"][0]["message"]["content"]
    except (IndexError, TypeError) as exc:
        raise ValueError("rewrite_llm_malformed_response") from exc
    if not isinstance(content, str):
        raise ValueError("rewrite_llm_malformed_response")
    return content
# ─── Public entry: rewrite() ─────────────────────────────
async def rewrite(query: str, backend_slug: str | None) -> list[str] | None:
    """Multi-query rewrite: variants on success, ``None`` for the baseline.

    Flow: resolve the backend → cache lookup → LLM call under the FOREGROUND
    gate → parse/validate → cache store. Failed calls and unparsable responses
    are never cached.

    Args:
        query: Original user query.
        backend_slug: An ``LLM_BACKEND_MAP`` key, or ``None`` / ``"baseline"``.

    Returns:
        A list of variants whose length is the backend's ``n_variants``
        (default ``EXPECTED_N_VARIANTS``). Per the prompt policy variant 0 is
        the original query verbatim; this function does not enforce that.
        ``None`` for the baseline (no rewrite; retrieval uses the single-query
        path).

    Raises:
        ValueError: unknown slug (caller translates to HTTP 400).
        RuntimeError: LLM call failure, malformed response, or parse failure
            (caller translates to HTTP 503).
    """
    cfg = _resolve_rewrite_backend(backend_slug)
    if cfg is None:
        return None

    slug = backend_slug or "baseline"
    key = _cache_key(query, slug)

    cached = await _get_cached(key)
    if cached is not None:
        logger.info(
            "[rewrite-dispatch] backend=%s n_variants=%d cache_hit=true "
            "llm_endpoint=cached llm_model=cached llm_latency_ms=0 "
            "rewrite_total_ms=0 query_hash=%s",
            slug, len(cached), key[:8],
        )
        return cached

    expected_n = int(cfg.get("n_variants", EXPECTED_N_VARIANTS))
    started = time.monotonic()
    llm_elapsed_ms = 0
    try:
        async with acquire_mlx_gate(Priority.FOREGROUND):
            # Timed inside the gate so queueing time is excluded from
            # llm_latency_ms (it still counts toward rewrite_total_ms).
            llm_started = time.monotonic()
            raw = await _call_llm(cfg, query)
            llm_elapsed_ms = int((time.monotonic() - llm_started) * 1000)
    except httpx.HTTPError as e:
        logger.warning(
            "[rewrite-dispatch] backend=%s cache_hit=false error=http "
            "detail=%s query_hash=%s", slug, type(e).__name__, key[:8],
        )
        raise RuntimeError(f"rewrite_llm_unavailable:{slug}:{type(e).__name__}") from e
    except (KeyError, IndexError, TypeError, ValueError, json.JSONDecodeError) as e:
        # IndexError / TypeError cover malformed bodies such as an empty
        # ``choices`` list or a non-dict payload; without them the error would
        # escape as something other than the documented RuntimeError.
        logger.warning(
            "[rewrite-dispatch] backend=%s cache_hit=false error=protocol "
            "detail=%s query_hash=%s", slug, type(e).__name__, key[:8],
        )
        raise RuntimeError(f"rewrite_llm_unavailable:{slug}:protocol") from e

    # A non-string body (e.g. null content) cannot be parsed; route it to the
    # parse-failure path instead of handing it to the parser.
    variants = _extract_variants(raw, expected_n) if isinstance(raw, str) else None
    total_ms = int((time.monotonic() - started) * 1000)
    if variants is None:
        logger.warning(
            "[rewrite-dispatch] backend=%s cache_hit=false error=parse "
            "llm_latency_ms=%d rewrite_total_ms=%d query_hash=%s",
            slug, llm_elapsed_ms, total_ms, key[:8],
        )
        raise RuntimeError(f"rewrite_llm_unavailable:{slug}:parse")

    await _set_cached(key, variants)
    logger.info(
        "[rewrite-dispatch] backend=%s n_variants=%d cache_hit=false "
        "llm_endpoint=%s llm_model=%s llm_latency_ms=%d "
        "rewrite_total_ms=%d query_hash=%s",
        slug, len(variants), cfg["endpoint"], cfg["model"],
        llm_elapsed_ms, total_ms, key[:8],
    )
    for idx, text in enumerate(variants):
        # Truncated to keep log lines bounded.
        logger.info(
            "[rewrite-variant] backend=%s query_hash=%s idx=%d text=%r",
            slug, key[:8], idx, text[:120],
        )
    return variants