Files
hyungi_document_server/app/services/search/search_pipeline.py
Hyungi Ahn 64322e4f6f feat(search): Phase 3 Ask pipeline (evidence + synthesis + /api/search/ask)
- llm_gate.py: MLX single-inference 전역 semaphore (analyzer/evidence/synthesis 공유)
- search_pipeline.py: run_search() 추출, /search 와 /ask 단일 진실 소스
- evidence_service.py: Rule + LLM span select (EV-A), doc-group ordering,
  span too-short 자동 확장(<80자→120자), fallback 은 query 중심 window 강제
- synthesis_service.py: grounded answer + citation 검증 + LRU 캐시(1h/300),
  refused 처리, span_text ONLY 룰 (full_snippet 프롬프트 금지)
- /api/search/ask: 15s timeout, 9가지 failure mode + 한국어 no_results_reason
- rerank_service: rerank_score raw 보존 (display drift 방지)
- query_analyzer: _get_llm_semaphore 를 llm_gate.get_mlx_gate 로 위임
- prompts: evidence_extract.txt, search_synthesis.txt (JSON-only, example 포함)

config.yaml / docker / ollama / infra_inventory 변경 없음.
plan: ~/.claude/plans/quiet-meandering-nova.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 07:34:08 +09:00

336 lines
14 KiB
Python

"""검색 파이프라인 오케스트레이션 (Phase 3.1).
`/api/search/` 와 `/api/search/ask` 가 공유하는 단일 진실 소스.
## 순수성 규칙 (영구)
`run_search()`는 wrapper(endpoint)에서 side effect를 최대한 분리한다:
- ❌ **금지**: `BackgroundTasks` 파라미터, `logger.info(...)` 직접 호출,
`record_search_event()` 호출, `SearchResponse`/`AskResponse` 직렬화
- ✅ **허용**: `trigger_background_analysis()` (analyzer cache miss 시
fire-and-forget task — retrieval 전략의 일부, 자가 완결됨)
- ✅ **허용**: retrieval / fusion / rerank / diversity / display 정규화 /
confidence 계산 같은 내부 서비스 호출
반환값은 `PipelineResult` 하나. wrapper가 그 안에서 필요한 필드를 꺼내
logger / telemetry / 응답 직렬화를 수행한다.
## Phase 2 호환
본 모듈은 기존 `app/api/search.py::search()` 함수 본문을 lift-and-shift 한
것이다. 변수명 / notes 문자열 / timing 키 / logger 포맷 은 wrapper 쪽에서
완전히 동일하게 재구성된다. refactor 전후 `/search?debug=true` 응답은
byte-level 에 가깝게 일치해야 한다.
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from sqlalchemy.ext.asyncio import AsyncSession
from . import query_analyzer
from .fusion_service import (
DEFAULT_FUSION,
apply_soft_filter_boost,
get_strategy,
normalize_display_scores,
)
from .rerank_service import (
MAX_CHUNKS_PER_DOC,
MAX_RERANK_INPUT,
apply_diversity,
rerank_chunks,
)
from .retrieval_service import (
compress_chunks_to_docs,
search_text,
search_vector,
search_vector_multilingual,
)
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
compute_confidence_reranked,
)
if TYPE_CHECKING:
from api.search import SearchResult
# ─── Phase 2.1: analyzer_confidence 3단계 게이트 ──────────
# search.py 에서 이동. search.py 의 /search wrapper 는 이 상수들을
# 노출할 필요 없으므로 파이프라인 모듈에만 둔다.
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
def _analyzer_tier(confidence: float) -> str:
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
if confidence < ANALYZER_TIER_IGNORE:
return "ignore"
if confidence < ANALYZER_TIER_ORIGINAL:
return "original_fallback"
if confidence < ANALYZER_TIER_MERGE:
return "merge"
return "analyzed"
# ─── 반환 타입 ─────────────────────────────────────────────
@dataclass(slots=True)
class PipelineResult:
"""run_search() 반환 — wrapper 가 필요한 모든 state 를 담는다."""
# ── 최종 결과 (API 노출) ──
results: "list[SearchResult]"
mode: str
confidence_signal: float
# ── 중간 단계 (evidence 입력 + debug) ──
text_results: "list[SearchResult]"
vector_results: "list[SearchResult]" # doc 압축 후
raw_chunks: "list[SearchResult]" # chunk 원본 (rerank/evidence용)
chunks_by_doc: "dict[int, list[SearchResult]]"
# ── 쿼리 분석 메타 ──
query_analysis: dict | None
analyzer_cache_hit: bool
analyzer_confidence: float # 항상 float (None 금지)
analyzer_tier: str
# ── 관측 ──
timing_ms: dict[str, float] = field(default_factory=dict)
notes: list[str] = field(default_factory=list)
# ─── 메인 파이프라인 ───────────────────────────────────────
async def run_search(
session: AsyncSession,
q: str,
*,
mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid",
limit: int = 20,
fusion: str = DEFAULT_FUSION,
rerank: bool = True,
analyze: bool = False,
) -> PipelineResult:
"""검색 파이프라인 실행.
retrieval → fusion → rerank → diversity → display 정규화 → confidence 계산
까지 수행하고 `PipelineResult` 를 반환한다. logging / BackgroundTasks /
응답 직렬화는 절대 수행하지 않는다 (wrapper 책임).
Args:
session: AsyncSession (caller 가 관리)
q: 사용자 쿼리 원문
mode: fts | trgm | vector | hybrid
limit: 최종 결과 수 (hybrid 에서는 fusion 입력 후보 수는 이보다 넓음)
fusion: legacy | rrf | rrf_boost
rerank: bge-reranker-v2-m3 활성화 (hybrid 전용)
analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter)
Returns:
PipelineResult
"""
# 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언)
from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list["SearchResult"] = []
vector_results: list["SearchResult"] = [] # doc-level (압축 후, fusion 입력)
raw_chunks: list["SearchResult"] = [] # chunk-level (raw, Phase 1.3 reranker용)
chunks_by_doc: dict[int, list["SearchResult"]] = {} # Phase 1.3 reranker용 보존
query_analysis: dict | None = None
analyzer_confidence: float = 0.0
analyzer_tier: str = "disabled"
t_total = time.perf_counter()
# Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지.
# - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부)
# - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget)
# 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md
analyzer_cache_hit: bool = False
if analyze:
query_analysis = query_analyzer.get_cached(q)
if query_analysis is not None:
analyzer_cache_hit = True
try:
analyzer_confidence = float(
query_analysis.get("analyzer_confidence", 0.0) or 0.0
)
except (TypeError, ValueError):
analyzer_confidence = 0.0
analyzer_tier = _analyzer_tier(analyzer_confidence)
notes.append(
f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
)
else:
# cache miss → background analyzer 트리거 (retrieval 차단 X)
triggered = query_analyzer.trigger_background_analysis(q)
analyzer_tier = "cache_miss"
notes.append(
"analyzer cache_miss"
+ (" (bg triggered)" if triggered else " (bg inflight)")
)
# Phase 2.2: multilingual vector search 활성 조건 (보수적)
# - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰)
# - normalized_queries 2개 이상 (lang 다양성 있음)
# - domain_hint == "news" 또는 language_scope == "global"
# ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko
# -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용.
# news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인).
use_multilingual: bool = False
normalized_queries: list[dict] = []
if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
domain_hint = query_analysis.get("domain_hint", "mixed")
language_scope = query_analysis.get("language_scope", "limited")
is_multilingual_candidate = (
domain_hint == "news" or language_scope == "global"
)
if is_multilingual_candidate:
raw_nq = query_analysis.get("normalized_queries") or []
if isinstance(raw_nq, list) and len(raw_nq) >= 2:
normalized_queries = [
nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
]
if len(normalized_queries) >= 2:
use_multilingual = True
notes.append(
f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}"
f" hint={domain_hint}/{language_scope}"
)
if mode == "vector":
t0 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
t1b = time.perf_counter()
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
fusion_limit = max(limit * 5, 100) if rerank else limit
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
notes.append(
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
f"unique_docs={len(chunks_by_doc)}"
)
# Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만)
# analyzer_confidence < 0.5 (tier=ignore)는 비활성.
if (
analyzer_cache_hit
and analyzer_tier != "ignore"
and query_analysis
):
soft_filters = query_analysis.get("soft_filters") or {}
if soft_filters:
boosted = apply_soft_filter_boost(fused_docs, soft_filters)
if boosted > 0:
notes.append(f"soft_filter_boost applied={boosted}")
if rerank:
# Phase 1.3: reranker — chunk 기준 입력
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
t3 = time.perf_counter()
rerank_input: list["SearchResult"] = []
for doc in fused_docs:
chunks = chunks_by_doc.get(doc.id, [])
if chunks:
# doc당 max 2 chunk (latency/VRAM 보호)
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(q, rerank_input, limit * 3)
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
t4 = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
else:
# rerank 비활성: fused_docs를 그대로 (limit 적용)
results = fused_docs[:limit]
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
# Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존).
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
if mode == "hybrid":
if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
return PipelineResult(
results=results,
mode=mode,
confidence_signal=confidence_signal,
text_results=text_results,
vector_results=vector_results,
raw_chunks=raw_chunks,
chunks_by_doc=chunks_by_doc,
query_analysis=query_analysis,
analyzer_cache_hit=analyzer_cache_hit,
analyzer_confidence=analyzer_confidence,
analyzer_tier=analyzer_tier,
timing_ms=timing,
notes=notes,
)