diff --git a/app/api/search.py b/app/api/search.py index fed8d2f..91c9ad2 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -178,35 +178,10 @@ async def search( rewrite_backend: str | None = Query( None, pattern=r"^(baseline|cand_[a-z0-9_]+)$", - description="Phase 2Q Diagnose query rewrite dispatcher (slug-based, no silent fallback). baseline|cand_multi_query_macmini|cand_multi_query_macbook. 미지정/baseline = single-query path. Phase 1B scaffold = variants 박제만, retrieval 합성은 Phase 2.", + description="Phase 2Q Diagnose query rewrite dispatcher (slug-based, no silent fallback). baseline|cand_multi_query_macmini|cand_multi_query_macbook. 미지정/baseline = single-query path. Phase 2 = variant N 별 retrieval+fusion → unified RRF → reranker 1회.", ), ): """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)""" - # Phase 2Q Diagnose scaffold (plan v6 Phase 1): - # slug 명시 시 LLM rewrite 호출 → variants 박제 (logger). retrieval path 영향 0 - # (results 미사용, baseline single-query path 유지). Phase 2 에서 search_with_rewrite() 합성. - if rewrite_backend not in (None, "baseline"): - try: - await query_rewriter.rewrite(q, rewrite_backend) - except ValueError: - return JSONResponse( - status_code=400, - content={ - "error_reason": "unknown_rewrite_backend", - "backend_requested": rewrite_backend, - "allowed": query_rewriter.allowed_slugs(), - }, - ) - except RuntimeError as e: - return JSONResponse( - status_code=503, - content={ - "error_reason": "rewrite_llm_unavailable", - "backend_requested": rewrite_backend, - "detail": str(e), - }, - ) - try: pr = await run_search( session, @@ -220,10 +195,21 @@ async def search( snapshot_doc_id_max=snapshot_doc_id_max, snapshot_chunk_id_max=snapshot_chunk_id_max, reranker_backend=reranker_backend, + rewrite_backend=rewrite_backend, ) except ValueError as e: - # _resolve_backend / _resolve_reranker 가 unknown slug 시 ValueError → HTTP 400 + # _resolve_backend / _resolve_reranker / _resolve_rewrite_backend 가 unknown slug 시 ValueError → HTTP 400 msg = str(e) + if msg.startswith("unknown_rewrite_backend"): + return JSONResponse( + status_code=400, + content={ + "error_reason": "unknown_rewrite_backend", + "backend_requested": rewrite_backend, + "allowed": query_rewriter.allowed_slugs(), + "detail": msg, + }, + ) if msg.startswith("unknown_reranker_backend"): return JSONResponse( status_code=400, @@ -243,6 +229,19 @@ async def search( "detail": msg, }, ) + except RuntimeError as e: + # query_rewriter.rewrite() 실패 (LLM unavailable / parse fail) → HTTP 503 + msg = str(e) + if msg.startswith("rewrite_llm_unavailable"): + return JSONResponse( + status_code=503, + content={ + "error_reason": "rewrite_llm_unavailable", + "backend_requested": rewrite_backend, + "detail": msg, + }, + ) + raise # 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다 timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items()) diff --git a/app/services/search/search_pipeline.py b/app/services/search/search_pipeline.py index 07236e8..c59a728 100644 --- a/app/services/search/search_pipeline.py +++ b/app/services/search/search_pipeline.py @@ -25,13 +25,14 @@ byte-level 에 가깝게 일치해야 한다. from __future__ import annotations +import asyncio import time from dataclasses import dataclass, field from typing import TYPE_CHECKING, Literal from sqlalchemy.ext.asyncio import AsyncSession -from . import query_analyzer +from . import query_analyzer, query_rewriter from .fusion_service import ( DEFAULT_FUSION, apply_soft_filter_boost, @@ -68,6 +69,13 @@ ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비 ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge +# ─── Phase 2Q multi-query 합성 상수 (plan v6 §5.5 박제) ── +# per-variant top-K = PRODUCTION_TOPK // N (50 // 3 = 16, A1 채택). +# reranker batch ≤ 60 cap → latency 회귀 0. +PHASE2Q_PRODUCTION_TOPK = 50 +PHASE2Q_UNIFIED_CAP = 60 # variant 합성 후 reranker 입력 후보 doc cap +PHASE2Q_RRF_K = 60 # production fusion_service.RRFOnly.K 와 동일 + def _analyzer_tier(confidence: float) -> str: """analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용.""" @@ -125,6 +133,7 @@ async def run_search( snapshot_doc_id_max: int | None = None, snapshot_chunk_id_max: int | None = None, reranker_backend: str | None = None, + rewrite_backend: str | None = None, ) -> PipelineResult: """검색 파이프라인 실행. @@ -140,6 +149,9 @@ async def run_search( fusion: legacy | rrf | rrf_boost rerank: bge-reranker-v2-m3 활성화 (hybrid 전용) analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter) + rewrite_backend: Phase 2Q multi-query rewrite dispatcher slug. None/baseline = + single-query path (기존 동작). hybrid + cand_ 시 search_with_rewrite() + 로 위임 — variant N retrieval → per-variant fusion → unified RRF → reranker 1회. Returns: PipelineResult @@ -147,6 +159,21 @@ async def run_search( # 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언) from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영 + # Phase 2Q dispatch — rewrite_backend 활성 + hybrid 만 multi-query path. + # 기타 mode 또는 baseline/None 은 기존 single-query 경로 그대로. + if rewrite_backend not in (None, "baseline") and mode == "hybrid": + return await search_with_rewrite( + session, q, + limit=limit, + fusion=fusion, + rerank=rerank, + embedding_backend=embedding_backend, + snapshot_doc_id_max=snapshot_doc_id_max, + snapshot_chunk_id_max=snapshot_chunk_id_max, + reranker_backend=reranker_backend, + rewrite_backend=rewrite_backend, + ) + timing: dict[str, float] = {} notes: list[str] = [] text_results: list["SearchResult"] = [] @@ -369,3 +396,205 @@ async def run_search( timing_ms=timing, notes=notes, ) + + +# ─── Phase 2Q multi-query retrieval 합성 ────────────────── + + +def _rrf_fuse_variants( + variant_lists: "list[list[SearchResult]]", + k: int, + limit: int, +) -> "list[SearchResult]": + """N variant 의 ranked list 를 RRF 합성. fusion_service.RRFOnly 알고리즘 동일. + + 각 doc_id 의 RRF_score = Σ 1/(k + rank_i) over variant lists. + 같은 doc_id 가 여러 variant 에서 등장하면 점수 누적. 첫 등장 variant 의 + SearchResult 를 representative 로 보존 (snippet/match_reason 등 메타). + """ + from api.search import SearchResult # 순환 import 회피 + + scores: dict[int, float] = {} + representative: dict[int, "SearchResult"] = {} + + for variant_list in variant_lists: + for rank, doc in enumerate(variant_list, start=1): + doc_id = doc.id + scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank) + if doc_id not in representative: + representative[doc_id] = doc + + fused: list["SearchResult"] = [] + for doc_id, rrf_score in sorted(scores.items(), key=lambda x: x[1], reverse=True): + doc = representative[doc_id] + fused.append(SearchResult( + id=doc.id, + title=doc.title, + ai_domain=doc.ai_domain, + ai_summary=doc.ai_summary, + file_format=doc.file_format, + score=rrf_score, + snippet=doc.snippet, + match_reason=f"{doc.match_reason}+multi_query_rrf", + )) + return fused[:limit] + + +async def search_with_rewrite( + session: AsyncSession, + q: str, + *, + limit: int, + fusion: str, + rerank: bool, + embedding_backend: str | None, + snapshot_doc_id_max: int | None, + snapshot_chunk_id_max: int | None, + reranker_backend: str | None, + rewrite_backend: str, +) -> PipelineResult: + """Phase 2Q multi-query retrieval 합성 path (plan v6 §5.5). + + 흐름: + 1. query_rewriter.rewrite(q, slug) → variants (N=3, prompt v1 invariant) + 2. variant 별 search_text + search_vector (asyncio.gather, per-variant K=16) + 3. variant 별 strategy.fuse(text, vector) — production fusion 재사용 + 4. N variant 의 fused list → _rrf_fuse_variants (k=60, cap 60) + 5. reranker 1회 (variant 무관 unified candidate set) — query = 원본 q + 6. diversity + freshness + display 정규화 (run_search 동일 마무리) + + LLM call 실패 / parse fail → query_rewriter.rewrite 가 RuntimeError 전파. + unknown slug → ValueError. caller(search.py) 가 HTTP 503/400 으로 translate. + + mode 는 hybrid 가정 (run_search 의 분기 조건). rerank=False 시 unified_docs 그대로. + """ + from api.search import SearchResult # noqa: F401 + + timing: dict[str, float] = {} + notes: list[str] = [] + t_total = time.perf_counter() + + # 1) variants — LLM call (실패 시 caller 가 503 translate) + t_rw = time.perf_counter() + variants = await query_rewriter.rewrite(q, rewrite_backend) + timing["rewrite_ms"] = (time.perf_counter() - t_rw) * 1000 + if not variants: + # 방어 — query_rewriter.rewrite 는 backend != baseline 시 list 또는 raise. + # None 이 도달하면 명시적 503 신호. + raise RuntimeError(f"rewrite_llm_unavailable:{rewrite_backend}:empty_variants") + + per_variant_k = max(1, PHASE2Q_PRODUCTION_TOPK // len(variants)) + notes.append( + f"rewrite={rewrite_backend} n_variants={len(variants)} " + f"per_variant_k={per_variant_k}" + ) + + # 2) variant 별 retrieval (text + vector) — asyncio.gather 병렬 + t_var = time.perf_counter() + + async def _variant_retrieve( + v: str, + ) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]": + text = await search_text(session, v, per_variant_k) + raw_chunks = await search_vector( + session, v, per_variant_k, + embedding_backend=embedding_backend, + snapshot_doc_id_max=snapshot_doc_id_max, + snapshot_chunk_id_max=snapshot_chunk_id_max, + ) + vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k) + return text, vector, chunks_by_doc + + variant_outputs = await asyncio.gather( + *[_variant_retrieve(v) for v in variants] + ) + timing["variant_retrieve_ms"] = (time.perf_counter() - t_var) * 1000 + + # 3) variant 별 fusion (production fusion 재사용) + t_fuse = time.perf_counter() + strategy = get_strategy(fusion) + per_variant_fused: list[list["SearchResult"]] = [] + merged_chunks_by_doc: dict[int, list["SearchResult"]] = {} + for v, (text, vector, cbd) in zip(variants, variant_outputs): + fused = strategy.fuse(text, vector, v, per_variant_k) + per_variant_fused.append(fused) + for doc_id, chunks in cbd.items(): + merged_chunks_by_doc.setdefault(doc_id, []).extend(chunks) + timing["variant_fusion_ms"] = (time.perf_counter() - t_fuse) * 1000 + notes.append(f"fusion={strategy.name}") + + # 4) variant 간 RRF 합성 — unified candidate set (cap 60) + t_rrf = time.perf_counter() + unified_docs = _rrf_fuse_variants( + per_variant_fused, + k=PHASE2Q_RRF_K, + limit=PHASE2Q_UNIFIED_CAP, + ) + timing["unified_rrf_ms"] = (time.perf_counter() - t_rrf) * 1000 + notes.append( + f"unified docs={len(unified_docs)} cap={PHASE2Q_UNIFIED_CAP}" + ) + + # 5) reranker 1회 (variant 무관, query = 원본 q) + if rerank: + t_re = time.perf_counter() + rerank_input: list["SearchResult"] = [] + for doc in unified_docs: + chunks = merged_chunks_by_doc.get(doc.id, []) + if chunks: + rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC]) + else: + rerank_input.append(doc) + if len(rerank_input) >= MAX_RERANK_INPUT: + break + rerank_input = rerank_input[:MAX_RERANK_INPUT] + notes.append(f"rerank input={len(rerank_input)}") + + reranked = await rerank_chunks( + q, rerank_input, limit * 3, + reranker_backend=reranker_backend, + snapshot_doc_id_max=snapshot_doc_id_max, + snapshot_chunk_id_max=snapshot_chunk_id_max, + ) + timing["rerank_ms"] = (time.perf_counter() - t_re) * 1000 + + t_div = time.perf_counter() + results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit] + timing["diversity_ms"] = (time.perf_counter() - t_div) * 1000 + else: + results = unified_docs[:limit] + + # 6) freshness + display 정규화 (run_search 동일 마무리) + t_fr = time.perf_counter() + results = await apply_freshness_decay(results, session) + timing["freshness_ms"] = (time.perf_counter() - t_fr) * 1000 + + normalize_display_scores(results) + + timing["total_ms"] = (time.perf_counter() - t_total) * 1000 + + # confidence — rerank 활성 시 reranker score 우선. + # multi-query 시 text/vector 개별 신호 의미 약함 → unified 결과 사용. + if rerank and "rerank_ms" in timing: + confidence_signal = compute_confidence_reranked(results) + else: + confidence_signal = compute_confidence(results, "vector") + + # text_results / vector_results 는 원본 variant (index 0, prompt v1 invariant=원본 verbatim) 만 노출 + text_v0, vector_v0, _ = variant_outputs[0] + + return PipelineResult( + results=results, + mode="hybrid", + confidence_signal=confidence_signal, + text_results=text_v0, + vector_results=vector_v0, + raw_chunks=[], # variant 별 raw chunks 합치는 의미 약함 — debug 노출 X + chunks_by_doc=merged_chunks_by_doc, + query_analysis=None, + analyzer_cache_hit=False, + analyzer_confidence=0.0, + analyzer_tier="disabled", + timing_ms=timing, + notes=notes, + ) diff --git a/tests/test_query_rewriter.py b/tests/test_query_rewriter.py index 9c328e0..03669e8 100644 --- a/tests/test_query_rewriter.py +++ b/tests/test_query_rewriter.py @@ -30,6 +30,10 @@ def _safe_file_handler(filename, *args, **kwargs): # type: ignore logging.FileHandler = _safe_file_handler # type: ignore[assignment] +# Phase 2 test (search_pipeline import) 는 api.search → SQLAlchemy engine init 트리거. +# DATABASE_URL 미설정 시 ArgumentError 로 collection 실패. dummy URL 주입 (실제 connect X). +os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost:5432/test") + # tests/ → 프로젝트 루트 → app/ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) @@ -210,3 +214,112 @@ def test_constants_match_plan_v6(): assert query_rewriter.LLM_REWRITE_TIMEOUT_MS == 15000 assert query_rewriter.CACHE_TTL == 86400 assert query_rewriter.CACHE_MAXSIZE == 1000 + + +# ─── 6. Phase 2 — _rrf_fuse_variants 합성 알고리즘 ──────── + + +def _mk_search_result(doc_id: int, score: float = 1.0, match_reason: str = "test"): + """SearchResult 인스턴스 (api.search 의 BaseModel). file_format 은 str 필수.""" + from api.search import SearchResult + return SearchResult( + id=doc_id, title=f"doc-{doc_id}", ai_domain=None, + ai_summary=None, file_format="pdf", + score=score, snippet=None, match_reason=match_reason, + ) + + +def test_rrf_fuse_variants_single_variant_preserves_order(): + from services.search.search_pipeline import _rrf_fuse_variants + docs = [_mk_search_result(i) for i in (10, 20, 30)] + out = _rrf_fuse_variants([docs], k=60, limit=10) + assert [r.id for r in out] == [10, 20, 30] + # RRF score = 1/(60+1) > 1/(60+2) > 1/(60+3) + assert out[0].score > out[1].score > out[2].score + assert "multi_query_rrf" in out[0].match_reason + + +def test_rrf_fuse_variants_accumulates_overlapping_doc_ids(): + """같은 doc_id 가 여러 variant 에서 top rank 면 점수 누적 → 상위.""" + from services.search.search_pipeline import _rrf_fuse_variants + v1 = [_mk_search_result(i) for i in (10, 20, 30)] + v2 = [_mk_search_result(i) for i in (40, 10, 50)] # 10 이 두 variant 모두 등장 + out = _rrf_fuse_variants([v1, v2], k=60, limit=10) + # 10 = 1/61 + 1/62 (rank 1 + rank 2). 다른 doc 은 1 variant 만 → 단일 RRF score. + ids = [r.id for r in out] + assert ids[0] == 10 # 누적 점수 최상위 + # 40 (1/61) vs 20 (1/62) — variant 1 에서 rank 1 인 40 이 단일 등장 doc 중 최상위 + assert ids[1] == 40 + assert set(ids) == {10, 20, 30, 40, 50} + + +def test_rrf_fuse_variants_first_variant_representative(): + """같은 doc_id 가 여러 variant 에 있으면 첫 등장 variant 의 SearchResult 보존.""" + from services.search.search_pipeline import _rrf_fuse_variants + v1 = [_mk_search_result(10, match_reason="from_v1")] + v2 = [_mk_search_result(10, match_reason="from_v2")] + out = _rrf_fuse_variants([v1, v2], k=60, limit=10) + assert len(out) == 1 + assert out[0].id == 10 + assert "from_v1" in out[0].match_reason # 첫 등장 보존 + assert "multi_query_rrf" in out[0].match_reason + + +def test_rrf_fuse_variants_respects_limit_cap(): + from services.search.search_pipeline import _rrf_fuse_variants + v1 = [_mk_search_result(i) for i in range(100, 130)] # 30 docs + v2 = [_mk_search_result(i) for i in range(200, 230)] # 30 docs, 모두 unique + out = _rrf_fuse_variants([v1, v2], k=60, limit=5) + assert len(out) == 5 + + +def test_rrf_fuse_variants_empty_lists_returns_empty(): + from services.search.search_pipeline import _rrf_fuse_variants + assert _rrf_fuse_variants([], k=60, limit=10) == [] + assert _rrf_fuse_variants([[], [], []], k=60, limit=10) == [] + + +def test_rrf_fuse_variants_rank_position_matters(): + """variant 가 길어져도 RRF 공식이 rank 만 사용.""" + from services.search.search_pipeline import _rrf_fuse_variants + v1 = [_mk_search_result(10)] # rank 1 + v2 = [_mk_search_result(99), _mk_search_result(10)] # 10 이 rank 2 + out = _rrf_fuse_variants([v1, v2], k=60, limit=10) + # 10 = 1/61 + 1/62, 99 = 1/61. 둘 다 등장 doc 중 10 점수 높음. + assert out[0].id == 10 + assert out[1].id == 99 + + +# ─── 7. Phase 2 — search_pipeline import + run_search signature ─── + + +def test_search_pipeline_imports_query_rewriter(): + """search_pipeline 이 query_rewriter 를 import 하는지 (dispatch 분기 활성).""" + from services.search import search_pipeline + assert hasattr(search_pipeline, "query_rewriter") + assert hasattr(search_pipeline, "search_with_rewrite") + assert hasattr(search_pipeline, "_rrf_fuse_variants") + + +def test_run_search_has_rewrite_backend_param(): + """run_search signature 에 rewrite_backend 가 추가됐는지.""" + import inspect + from services.search.search_pipeline import run_search + sig = inspect.signature(run_search) + assert "rewrite_backend" in sig.parameters + # default = None (baseline 회귀 0 invariant) + assert sig.parameters["rewrite_backend"].default is None + + +def test_phase2q_constants(): + """plan v6 §5.5 박제값.""" + from services.search.search_pipeline import ( + PHASE2Q_PRODUCTION_TOPK, + PHASE2Q_RRF_K, + PHASE2Q_UNIFIED_CAP, + ) + assert PHASE2Q_PRODUCTION_TOPK == 50 + assert PHASE2Q_RRF_K == 60 + assert PHASE2Q_UNIFIED_CAP == 60 + # per-variant K = 50 // 3 = 16 (A1 채택) + assert PHASE2Q_PRODUCTION_TOPK // EXPECTED_N_VARIANTS == 16