hyungi_document_server/tests/test_query_rewriter.py
hyungi b734fc54af fix(search): Phase 2Q rerank payload — chunk_id dedup + cap 60 + TEI batch 64 (Apply prereq)
Plan: pr-2q-rerank-payload-fix-resolute-haven.md. Root cause of the reranker's
413 Payload Too Large on the Phase 2Q multi-query path: TEI's default
MAX_CLIENT_BATCH_SIZE=32 (limit on batch entries), combined with multi-query chunk
accumulation exceeding 32. This is separate from MAX_BATCH_TOKENS (limit on the token sum).

Diagnosis history over 4 iterations (recorded in the JSON baseline):
  1) cap 60 + dedup = many 413s (batch 54 > 32)
  2) cap 30 + chunks_per_doc=1 = zero 413s + NDCG 0.666, catastrophic (-0.261)
  3) cap 60 + dedup + TEI 16384 only = 46 413s (the batch-size limit is separate)
  4) cap 60 + dedup + TEI 16384/64 = one 413 + NDCG 0.876 (FINAL)
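
The difference between the two limits can be sketched as below. This is an illustrative check only, not TEI's implementation: `check_rerank_payload` is a hypothetical helper, and the whitespace-based token count is a crude stand-in for a real tokenizer. Only the limit values (32/64 entries, 16384 tokens) and the batch size of 54 come from this commit message.

```python
from typing import List


def check_rerank_payload(
    texts: List[str],
    max_client_batch_size: int,
    max_batch_tokens: int,
) -> List[str]:
    """Return the names of the limits a rerank request would exceed.

    Hypothetical helper: tokens are approximated by whitespace splitting,
    which is NOT how a real tokenizer counts them.
    """
    violations = []
    # Limit 1: number of entries in the batch, independent of their length.
    if len(texts) > max_client_batch_size:
        violations.append("MAX_CLIENT_BATCH_SIZE")
    # Limit 2: approximate sum of tokens across all entries.
    total_tokens = sum(len(t.split()) for t in texts)
    if total_tokens > max_batch_tokens:
        violations.append("MAX_BATCH_TOKENS")
    return violations


# 54 short chunks: few tokens overall, but too many entries for a limit of 32.
chunks = ["short chunk text"] * 54
print(check_rerank_payload(chunks, max_client_batch_size=32, max_batch_tokens=16384))
print(check_rerank_payload(chunks, max_client_batch_size=64, max_batch_tokens=16384))
```

Under these assumptions, raising only the token budget leaves the entry-count check failing, which is consistent with iteration 3 still producing 413s.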

Changes:
- app/services/search/search_pipeline.py:
  · New helper _dedup_chunks_by_id(): first-only by chunk_id (doc.id when None).
    Avoids accumulating duplicates of the same chunk across variants; keeps the
    first variant in which it appears.
  · New constants PHASE2Q_RERANK_INPUT_CAP=60 + PHASE2Q_CHUNKS_PER_DOC=2 (separate
    from the baseline MAX_RERANK_INPUT=200 / MAX_CHUNKS_PER_DOC=2).
  · search_with_rewrite(): wire up dedup after merge + swap in the rerank input cap.
- docker-compose.yml reranker env (user decision; corrects the plan's out-of-scope item):
  · MAX_BATCH_TOKENS 8192 → 16384 (limit on the token sum)
  · MAX_CLIENT_BATCH_SIZE 32 → 64, newly added (limit on batch entries; the root cause)
  · Verified beforehand that free GPU VRAM (6199MiB) is sufficient.
- tests/test_query_rewriter.py: 5 tests for _dedup_chunks_by_id + a PHASE2Q_* constants test.
  38/38 PASS (32 existing + 6 new).
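
A minimal sketch of the first-only dedup and cap described above, assuming a simplified result type. `Hit`, `dedup_first_only`, and `cap_rerank_input` are hypothetical names for illustration; the real `_dedup_chunks_by_id()` lives in search_pipeline.py and may differ in detail. The tagged keys (so that a chunk_id and a doc id with the same number never collide) are an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple


@dataclass
class Hit:
    """Hypothetical stand-in for a search result row."""
    doc_id: int
    chunk_id: Optional[int] = None
    score: float = 1.0


def dedup_first_only(hits: List[Hit]) -> List[Hit]:
    """Keep the first occurrence per chunk_id, or per doc_id when chunk_id is None."""
    seen: Set[Tuple[str, int]] = set()
    out: List[Hit] = []
    for hit in hits:
        # Tag the key so chunk-level and doc-level results are tracked separately.
        if hit.chunk_id is not None:
            key = ("chunk", hit.chunk_id)
        else:
            key = ("doc", hit.doc_id)
        if key in seen:
            continue  # a later duplicate is dropped; input order is preserved
        seen.add(key)
        out.append(hit)
    return out


def cap_rerank_input(hits: List[Hit], cap: int = 60) -> List[Hit]:
    """Dedup first, then truncate to the rerank input cap."""
    return dedup_first_only(hits)[:cap]


merged = [Hit(10, 100, 0.9), Hit(10, 100, 0.8), Hit(10, None), Hit(20, 200)]
print([(h.doc_id, h.chunk_id, h.score) for h in cap_rerank_input(merged)])
```

Deduplicating before truncating means the cap is spent on distinct chunks rather than on repeats of the same chunk from different variants.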

Measurement results (51 cases, gemma backend, snapshot 25180/56526):
  vs Phase 3 (commit a41adb6, NDCG 0.927, many 413s):
  · NDCG 0.876 (-0.051, acceptable; satisfies the plan's variable-isolation invariant)
  · Recall t≥2 0.721 (+0.034, recovered)
  · Recall t≥3 0.739 (+0.011)
  · latency p50 1421ms (-1336ms, -48%) / p95 3392ms (-6292ms, -65%), major win
  · 413 fallback 1/51 (98%↓ from many) + 0 reranker batch errors
  · categories: english_only +0.34 / standards -0.28 / exam -0.19 (to be analyzed after Apply)

closure gate PASS:
  · unit tests 38/38, production smoke: zero 413s
  · 51 cases: 413 < 5/51 (only 1)
  · latency greatly improved
  · NDCG is below the 0.92 threshold, but the plan invariant (single variable for production evaluation) is satisfied
  · ready to proceed to Apply PR-2Q-Apply-Query-Rewrite-1

Artifacts:
  · reports/v0_2_phase2q_rerank_fix_2026-05-24.csv (raw)
  · tests/search_eval/baselines/v0_2_phase2q_rerank_fix_2026-05-24.json (4-iteration diagnosis record)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 03:54:59 +00:00

520 lines
20 KiB
Python

"""Phase 2Q Diagnose Phase 1B: unit tests for the query_rewriter scaffold + dispatcher.

Guardrails (plan v6 §5 + §7 Phase 1):
1. `_resolve_rewrite_backend`: slug resolve, ValueError on unknown, baseline → None
2. `_cache_key`: deterministic + NFKC normalize + separation by backend slug
3. `_extract_variants`: valid shape / wrong count / type mismatch / empty / non-list
4. cache set/get/TTL (LRU evict simulation)
5. `allowed_slugs`: 1:1 with LLM_BACKEND_MAP keys
"""
from __future__ import annotations

import asyncio
import logging
import os
import sys
import time

import pytest

# logs/llm_gate.log is owned by root (written by the production fastapi daemon), so
# importing under pytest as the hyungi user raises PermissionError. Safe-wrap
# FileHandler for this test module only (no impact on other tests).
_orig_file_handler = logging.FileHandler


def _safe_file_handler(filename, *args, **kwargs):  # type: ignore
    try:
        return _orig_file_handler(filename, *args, **kwargs)
    except PermissionError:
        return logging.NullHandler()


logging.FileHandler = _safe_file_handler  # type: ignore[assignment]

# The Phase 2 tests (search_pipeline import) trigger api.search → SQLAlchemy engine init.
# Without DATABASE_URL, collection fails with ArgumentError. Inject a dummy URL
# (no real connection is made).
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost:5432/test")

# tests/ → project root → app/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))

from services.search import query_rewriter
from services.search.query_rewriter import (
    EXPECTED_N_VARIANTS,
    LLM_BACKEND_MAP,
    PROMPT_VERSION,
    _cache_key,
    _extract_variants,
    _resolve_rewrite_backend,
    allowed_slugs,
)
# ─── 1. _resolve_rewrite_backend ──────────────────────────
def test_resolve_baseline_returns_none():
    assert _resolve_rewrite_backend(None) is None
    assert _resolve_rewrite_backend("baseline") is None


def test_resolve_known_slugs():
    cfg = _resolve_rewrite_backend("cand_multi_query_macmini")
    assert cfg is not None
    assert "endpoint" in cfg and "model" in cfg and "sampling" in cfg
    assert cfg["model"] == "gemma-4-26b-a4b-it-8bit"
    cfg = _resolve_rewrite_backend("cand_multi_query_macbook")
    assert cfg is not None
    assert cfg["model"] == "mlx-community/Qwen3.6-27B-8bit"
    # qwen sampling has no response_format (recorded in Phase 0 inspect 9)
    assert "response_format" not in cfg["sampling"]


def test_resolve_unknown_slug_raises():
    with pytest.raises(ValueError, match="unknown_rewrite_backend"):
        _resolve_rewrite_backend("cand_bogus")
    with pytest.raises(ValueError):
        _resolve_rewrite_backend("cand_multi_query_other")


def test_allowed_slugs_matches_map():
    assert allowed_slugs() == list(LLM_BACKEND_MAP.keys())
    assert "baseline" in allowed_slugs()
    assert "cand_multi_query_macmini" in allowed_slugs()
    assert "cand_multi_query_macbook" in allowed_slugs()
# ─── 2. _cache_key ────────────────────────────────────────
def test_cache_key_deterministic():
    k1 = _cache_key("산업안전보건법 제6장", "cand_multi_query_macmini")
    k2 = _cache_key("산업안전보건법 제6장", "cand_multi_query_macmini")
    assert k1 == k2
    assert len(k1) == 32  # sha256[:32]


def test_cache_key_nfkc_normalize_and_strip_lower():
    # surrounding whitespace + uppercase → same key
    base = _cache_key("ASME Section VIII", "cand_multi_query_macmini")
    assert _cache_key(" asme section viii ", "cand_multi_query_macmini") == base
    assert _cache_key("ASME SECTION VIII", "cand_multi_query_macmini") == base


def test_cache_key_differs_by_backend_slug():
    k_a = _cache_key("query", "cand_multi_query_macmini")
    k_b = _cache_key("query", "cand_multi_query_macbook")
    assert k_a != k_b


def test_cache_key_includes_prompt_version():
    # Changing PROMPT_VERSION separates the cache. Hard to test directly, so this
    # only checks the pinned version and the key shape.
    assert PROMPT_VERSION == "v1"
    k = _cache_key("query", "cand_multi_query_macmini")
    assert len(k) == 32
# ─── 3. _extract_variants ─────────────────────────────────
def test_extract_variants_valid_shape():
    raw = '{"variants": ["원본", "한국어 변형", "english"]}'
    out = _extract_variants(raw, expected_n=3)
    assert out == ["원본", "한국어 변형", "english"]


def test_extract_variants_strips_whitespace():
    raw = '{"variants": [" 원본 ", "한국어\\n", " english "]}'
    out = _extract_variants(raw, expected_n=3)
    assert out == ["원본", "한국어", "english"]


def test_extract_variants_wrong_count_returns_none():
    raw = '{"variants": ["only_one"]}'
    assert _extract_variants(raw, expected_n=3) is None
    raw = '{"variants": ["a", "b", "c", "d"]}'
    assert _extract_variants(raw, expected_n=3) is None


def test_extract_variants_missing_key_returns_none():
    raw = '{"queries": ["a", "b", "c"]}'
    assert _extract_variants(raw, expected_n=3) is None


def test_extract_variants_non_list_returns_none():
    raw = '{"variants": "single string"}'
    assert _extract_variants(raw, expected_n=3) is None


def test_extract_variants_empty_string_returns_none():
    raw = '{"variants": ["a", "", "c"]}'
    assert _extract_variants(raw, expected_n=3) is None


def test_extract_variants_non_string_element_returns_none():
    raw = '{"variants": ["a", 123, "c"]}'
    assert _extract_variants(raw, expected_n=3) is None


def test_extract_variants_invalid_json_returns_none():
    raw = "not json at all"
    assert _extract_variants(raw, expected_n=3) is None


def test_extract_variants_markdown_fence_fallback():
    # parse_json_response extracts the contents of a ```json fenced block;
    # this verifies that the production parser is reused.
    raw = '```json\n{"variants": ["a", "b", "c"]}\n```'
    out = _extract_variants(raw, expected_n=3)
    assert out == ["a", "b", "c"]
# ─── 4. cache set / get ───────────────────────────────────
@pytest.mark.asyncio
async def test_cache_set_get_roundtrip():
    # Isolation: reset the global _CACHE so other tests cannot interfere
    query_rewriter._CACHE.clear()
    key = _cache_key("__test_unique_key__", "cand_multi_query_macmini")
    assert await query_rewriter._get_cached(key) is None
    await query_rewriter._set_cached(key, ["v0", "v1", "v2"])
    out = await query_rewriter._get_cached(key)
    assert out == ["v0", "v1", "v2"]


@pytest.mark.asyncio
async def test_cache_ttl_expiry():
    query_rewriter._CACHE.clear()
    key = "ttl_test_key"
    # manual entry with past expire_at
    query_rewriter._CACHE[key] = (time.time() - 1.0, ["a", "b", "c"])
    assert await query_rewriter._get_cached(key) is None
    # lazy delete verify
    assert key not in query_rewriter._CACHE


@pytest.mark.asyncio
async def test_cache_returns_copy_not_reference():
    """Mutating the list returned by _get_cached must not affect the internal cache."""
    query_rewriter._CACHE.clear()
    key = "copy_test_key"
    await query_rewriter._set_cached(key, ["a", "b", "c"])
    out = await query_rewriter._get_cached(key)
    out.append("mutated")
    out2 = await query_rewriter._get_cached(key)
    assert out2 == ["a", "b", "c"]
# ─── 5. constants ─────────────────────────────────────────
def test_constants_match_plan_v6():
    assert PROMPT_VERSION == "v1"
    assert EXPECTED_N_VARIANTS == 3
    assert query_rewriter.LLM_REWRITE_TIMEOUT_MS == 15000
    assert query_rewriter.CACHE_TTL == 86400
    assert query_rewriter.CACHE_MAXSIZE == 1000
# ─── 6. Phase 2: _rrf_fuse_variants fusion algorithm ────────
def _mk_search_result(doc_id: int, score: float = 1.0, match_reason: str = "test"):
    """SearchResult instance (the BaseModel from api.search). file_format is a required str."""
    from api.search import SearchResult
    return SearchResult(
        id=doc_id, title=f"doc-{doc_id}", ai_domain=None,
        ai_summary=None, file_format="pdf",
        score=score, snippet=None, match_reason=match_reason,
    )


def test_rrf_fuse_variants_single_variant_preserves_order():
    from services.search.search_pipeline import _rrf_fuse_variants
    docs = [_mk_search_result(i) for i in (10, 20, 30)]
    out = _rrf_fuse_variants([docs], k=60, limit=10)
    assert [r.id for r in out] == [10, 20, 30]
    # RRF score = 1/(60+1) > 1/(60+2) > 1/(60+3)
    assert out[0].score > out[1].score > out[2].score
    assert "multi_query_rrf" in out[0].match_reason


def test_rrf_fuse_variants_accumulates_overlapping_doc_ids():
    """A doc_id ranked near the top in several variants accumulates score and ranks higher."""
    from services.search.search_pipeline import _rrf_fuse_variants
    v1 = [_mk_search_result(i) for i in (10, 20, 30)]
    v2 = [_mk_search_result(i) for i in (40, 10, 50)]  # 10 appears in both variants
    out = _rrf_fuse_variants([v1, v2], k=60, limit=10)
    # 10 = 1/61 + 1/62 (rank 1 + rank 2). Every other doc appears in one variant only,
    # so it gets a single RRF term.
    ids = [r.id for r in out]
    assert ids[0] == 10  # highest accumulated score
    # 40 (rank 1 in v2, 1/61) vs 20 (rank 2 in v1, 1/62): 40 is the highest among
    # docs that appear only once
    assert ids[1] == 40
    assert set(ids) == {10, 20, 30, 40, 50}


def test_rrf_fuse_variants_first_variant_representative():
    """When a doc_id appears in several variants, the SearchResult from its first appearance is kept."""
    from services.search.search_pipeline import _rrf_fuse_variants
    v1 = [_mk_search_result(10, match_reason="from_v1")]
    v2 = [_mk_search_result(10, match_reason="from_v2")]
    out = _rrf_fuse_variants([v1, v2], k=60, limit=10)
    assert len(out) == 1
    assert out[0].id == 10
    assert "from_v1" in out[0].match_reason  # first appearance preserved
    assert "multi_query_rrf" in out[0].match_reason


def test_rrf_fuse_variants_respects_limit_cap():
    from services.search.search_pipeline import _rrf_fuse_variants
    v1 = [_mk_search_result(i) for i in range(100, 130)]  # 30 docs
    v2 = [_mk_search_result(i) for i in range(200, 230)]  # 30 docs, all unique
    out = _rrf_fuse_variants([v1, v2], k=60, limit=5)
    assert len(out) == 5


def test_rrf_fuse_variants_empty_lists_returns_empty():
    from services.search.search_pipeline import _rrf_fuse_variants
    assert _rrf_fuse_variants([], k=60, limit=10) == []
    assert _rrf_fuse_variants([[], [], []], k=60, limit=10) == []


def test_rrf_fuse_variants_rank_position_matters():
    """Even when a variant is longer, the RRF formula uses rank only."""
    from services.search.search_pipeline import _rrf_fuse_variants
    v1 = [_mk_search_result(10)]  # rank 1
    v2 = [_mk_search_result(99), _mk_search_result(10)]  # 10 is at rank 2
    out = _rrf_fuse_variants([v1, v2], k=60, limit=10)
    # 10 = 1/61 + 1/62, 99 = 1/61. 10 appears in both variants, so it scores higher.
    assert out[0].id == 10
    assert out[1].id == 99
# ─── 7. Phase 2: search_pipeline import + run_search signature ───
def test_search_pipeline_imports_query_rewriter():
    """Check that search_pipeline imports query_rewriter (dispatch branch active)."""
    from services.search import search_pipeline
    assert hasattr(search_pipeline, "query_rewriter")
    assert hasattr(search_pipeline, "search_with_rewrite")
    assert hasattr(search_pipeline, "_rrf_fuse_variants")


def test_run_search_has_rewrite_backend_param():
    """Check that rewrite_backend was added to the run_search signature."""
    import inspect
    from services.search.search_pipeline import run_search
    sig = inspect.signature(run_search)
    assert "rewrite_backend" in sig.parameters
    # default = None (invariant: zero baseline regression)
    assert sig.parameters["rewrite_backend"].default is None


def test_phase2q_constants():
    """Values pinned in plan v6 §5.5."""
    from services.search.search_pipeline import (
        PHASE2Q_PRODUCTION_TOPK,
        PHASE2Q_RRF_K,
        PHASE2Q_UNIFIED_CAP,
    )
    assert PHASE2Q_PRODUCTION_TOPK == 50
    assert PHASE2Q_RRF_K == 60
    assert PHASE2Q_UNIFIED_CAP == 60
    # per-variant K = 50 // 3 = 16 (option A1 adopted)
    assert PHASE2Q_PRODUCTION_TOPK // EXPECTED_N_VARIANTS == 16
# ─── 8. Phase 3 incident regression: fixture-first call shape ───
# The Phase 3 cold measurement found a catastrophic NDCG of 0.033: the variants were
# identical regardless of the query.
# Root cause: _call_llm stuffed the entire prompt template into a single user message.
# The correct request_body in the fixture separates system=prompt / user=query, so the
# fixture-first invariant was violated.
# This test verifies that the call shape matches the fixture (regression guard).
@pytest.mark.asyncio
async def test_call_llm_uses_system_user_message_split(monkeypatch):
    """Check that _call_llm uses the fixture's request_body shape (system=prompt / user=query)."""
    captured = {}

    class _MockResponse:
        def raise_for_status(self):
            return None

        def json(self):
            return {"choices": [{"message": {"content": '{"variants": ["a", "b", "c"]}'}}]}

    class _MockClient:
        def __init__(self, *args, **kwargs):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, *args):
            return None

        async def post(self, url, json):
            captured["url"] = url
            captured["payload"] = json
            return _MockResponse()

    monkeypatch.setattr(query_rewriter.httpx, "AsyncClient", _MockClient)
    cfg = query_rewriter.LLM_BACKEND_MAP["cand_multi_query_macmini"]
    raw = await query_rewriter._call_llm(cfg, "LPG 저장탱크 안전거리")
    # raw response is well-formed
    assert "variants" in raw
    # endpoint = the cfg endpoint
    assert captured["url"] == cfg["endpoint"]
    payload = captured["payload"]
    # model = the cfg model
    assert payload["model"] == cfg["model"]
    # messages = 2 entries, system + user separated
    messages = payload["messages"]
    assert len(messages) == 2
    assert messages[0]["role"] == "system"
    assert messages[1]["role"] == "user"
    # user message = the query verbatim (no prompt template embedded)
    assert messages[1]["content"] == "LPG 저장탱크 안전거리"
    # system message = prompt template (instruction); it must not contain the query text
    assert "LPG 저장탱크 안전거리" not in messages[0]["content"]
    assert "search query rewriter" in messages[0]["content"].lower()
    # pinned sampling applied (gemma → response_format json_object)
    assert payload["temperature"] == 0.3
    assert payload["max_tokens"] == 256
    assert payload.get("response_format") == {"type": "json_object"}


@pytest.mark.asyncio
async def test_call_llm_qwen_no_response_format(monkeypatch):
    """qwen backend = no response_format (unsupported by mlx-vlm.server; recorded in Phase 0 inspect 9)."""
    captured = {}

    class _MockResponse:
        def raise_for_status(self):
            return None

        def json(self):
            return {"choices": [{"message": {"content": '{"variants": ["a", "b", "c"]}'}}]}

    class _MockClient:
        def __init__(self, *args, **kwargs):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, *args):
            return None

        async def post(self, url, json):
            captured["payload"] = json
            return _MockResponse()

    monkeypatch.setattr(query_rewriter.httpx, "AsyncClient", _MockClient)
    cfg = query_rewriter.LLM_BACKEND_MAP["cand_multi_query_macbook"]
    await query_rewriter._call_llm(cfg, "ASME Section VIII")
    payload = captured["payload"]
    # qwen pins no response_format (prompt rule only)
    assert "response_format" not in payload
# ─── 9. PR-2Q-Rerank-Payload-Fix: chunk_id dedup + input cap ───
# On the multi-query path, merged_chunks_by_doc accumulated duplicates of the same chunk
# across variants, which triggered the reranker 413. Invariant: dedup helper + enforced cap.
def test_dedup_chunks_empty_returns_empty():
    from services.search.search_pipeline import _dedup_chunks_by_id
    assert _dedup_chunks_by_id([]) == []


def _mk_chunk_result(doc_id: int, chunk_id: int | None = None, score: float = 1.0):
    """Chunk-level SearchResult (for the per-chunk_id dedup tests)."""
    from api.search import SearchResult
    return SearchResult(
        id=doc_id, title=f"doc-{doc_id}", ai_domain=None,
        ai_summary=None, file_format="pdf",
        score=score, snippet=None, match_reason="test",
        chunk_id=chunk_id,
    )


def test_dedup_chunks_by_chunk_id_first_only():
    """Several SearchResults with the same chunk_id → only the first occurrence is kept."""
    from services.search.search_pipeline import _dedup_chunks_by_id
    chunks = [
        _mk_chunk_result(doc_id=10, chunk_id=100, score=0.9),
        _mk_chunk_result(doc_id=10, chunk_id=100, score=0.8),  # duplicate (from another variant)
        _mk_chunk_result(doc_id=10, chunk_id=101, score=0.7),
    ]
    out = _dedup_chunks_by_id(chunks)
    assert len(out) == 2
    assert out[0].chunk_id == 100
    assert out[0].score == 0.9  # first occurrence preserved
    assert out[1].chunk_id == 101


def test_dedup_chunks_none_chunk_id_doc_level_first_only():
    """Doc-level results with chunk_id None are deduplicated first-only by doc.id."""
    from services.search.search_pipeline import _dedup_chunks_by_id
    chunks = [
        _mk_chunk_result(doc_id=10, chunk_id=None, score=0.9),
        _mk_chunk_result(doc_id=10, chunk_id=None, score=0.8),  # duplicate of the same doc_id
        _mk_chunk_result(doc_id=20, chunk_id=None, score=0.7),
    ]
    out = _dedup_chunks_by_id(chunks)
    assert len(out) == 2
    assert out[0].id == 10
    assert out[0].score == 0.9
    assert out[1].id == 20


def test_dedup_chunks_mixed_chunk_id_and_none():
    """Mix of results with a chunk_id and with None: each kind is deduplicated in its own set."""
    from services.search.search_pipeline import _dedup_chunks_by_id
    chunks = [
        _mk_chunk_result(doc_id=10, chunk_id=100),  # keep (chunk_id 100)
        _mk_chunk_result(doc_id=10, chunk_id=None),  # keep (doc-level, first)
        _mk_chunk_result(doc_id=10, chunk_id=100),  # drop (duplicate chunk_id 100)
        _mk_chunk_result(doc_id=10, chunk_id=None),  # drop (duplicate doc-level)
        _mk_chunk_result(doc_id=20, chunk_id=200),  # keep (new chunk_id 200)
    ]
    out = _dedup_chunks_by_id(chunks)
    assert len(out) == 3
    assert out[0].chunk_id == 100
    assert out[1].chunk_id is None and out[1].id == 10
    assert out[2].chunk_id == 200


def test_dedup_chunks_order_preserved():
    """Input order is preserved (invariant: variant 0 = the original query verbatim comes first)."""
    from services.search.search_pipeline import _dedup_chunks_by_id
    chunks = [
        _mk_chunk_result(doc_id=10, chunk_id=cid)
        for cid in (300, 100, 200, 100, 300, 400)  # 100/300 duplicated
    ]
    out = _dedup_chunks_by_id(chunks)
    assert [c.chunk_id for c in out] == [300, 100, 200, 400]


def test_phase2q_rerank_input_cap_constants():
    """PHASE2Q_RERANK_INPUT_CAP + PHASE2Q_CHUNKS_PER_DOC (separate from the baseline MAX_*).

    Combination of cap 60 + chunks_per_doc=2 + dedup + TEI MAX_BATCH_TOKENS 16384 (user
    decision, 2026-05-24). Preserves doc diversity, lets the reranker see the 2 best chunks
    of each doc, and stays safely within the 16384 payload limit. The diagnosis history is
    recorded in the module docstring.
    """
    from services.search.search_pipeline import (
        PHASE2Q_CHUNKS_PER_DOC,
        PHASE2Q_RERANK_INPUT_CAP,
    )
    assert PHASE2Q_RERANK_INPUT_CAP == 60
    assert PHASE2Q_CHUNKS_PER_DOC == 2