"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후). 실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence) 은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다. 이 파일은 다음만 담당: - Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate / Citation / AskResponse / AskDebug) - `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화) - `/ask` endpoint wrapper (Phase 3.3 에서 추가) """ import time from typing import Annotated, Literal from fastapi import APIRouter, BackgroundTasks, Depends, Query from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from core.database import get_session from core.utils import setup_logger from models.user import User from services.search.evidence_service import EvidenceItem, extract_evidence from services.search.fusion_service import DEFAULT_FUSION from services.search.search_pipeline import PipelineResult, run_search from services.search.synthesis_service import SynthesisResult, synthesize from services.search_telemetry import record_search_event # logs/search.log + stdout 동시 출력 (Phase 0.4) logger = setup_logger("search") router = APIRouter() class SearchResult(BaseModel): """검색 결과 단일 행. Phase 1.2-C: chunk-level vector retrieval 도입으로 chunk 메타 필드 추가. text 검색 결과는 chunk_id 등이 None (doc-level). vector 검색 결과는 chunk_id 등이 채워짐 (chunk-level). """ id: int # doc_id (text/vector 공통) title: str | None ai_domain: str | None ai_summary: str | None file_format: str score: float snippet: str | None match_reason: str | None = None # Phase 1.2-C: chunk 메타 (vector 검색 시 채워짐) chunk_id: int | None = None chunk_index: int | None = None section_title: str | None = None # Phase 3.1: reranker raw score 보존 (display score drift 방지). # rerank 경로를 탄 chunk에만 채워짐. normalize_display_scores는 이 필드를 # 건드리지 않는다. Phase 3 evidence fast-path 판단에 사용. rerank_score: float | None = None # ─── Phase 0.4: 디버그 응답 스키마 ───────────────────────── class DebugCandidate(BaseModel): """단계별 후보 (debug=true 응답에서만 노출).""" id: int rank: int score: float match_reason: str | None = None class SearchDebug(BaseModel): timing_ms: dict[str, float] text_candidates: list[DebugCandidate] | None = None vector_candidates: list[DebugCandidate] | None = None fused_candidates: list[DebugCandidate] | None = None confidence: float notes: list[str] = [] # Phase 1/2 도입 후 채워질 placeholder query_analysis: dict | None = None reranker_scores: list[DebugCandidate] | None = None class SearchResponse(BaseModel): results: list[SearchResult] total: int query: str mode: str debug: SearchDebug | None = None def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]: return [ DebugCandidate( id=r.id, rank=i + 1, score=r.score, match_reason=r.match_reason ) for i, r in enumerate(rows[:n]) ] def _build_search_debug(pr: PipelineResult) -> SearchDebug: """PipelineResult → SearchDebug (기존 search()의 debug 구성 블록 복사).""" return SearchDebug( timing_ms=pr.timing_ms, text_candidates=( _to_debug_candidates(pr.text_results) if pr.text_results or pr.mode != "vector" else None ), vector_candidates=( _to_debug_candidates(pr.vector_results) if pr.vector_results or pr.mode in ("vector", "hybrid") else None ), fused_candidates=( _to_debug_candidates(pr.results) if pr.mode == "hybrid" else None ), confidence=pr.confidence_signal, notes=pr.notes, query_analysis=pr.query_analysis, ) @router.get("/", response_model=SearchResponse) async def search( q: str, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], background_tasks: BackgroundTasks, mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"), limit: int = Query(20, ge=1, le=100), fusion: str = Query( DEFAULT_FUSION, pattern="^(legacy|rrf|rrf_boost)$", description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)", ), rerank: bool = Query( True, description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)", ), analyze: bool = Query( False, description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X", ), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), ): """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)""" pr = await run_search( session, q, mode=mode, # type: ignore[arg-type] limit=limit, fusion=fusion, rerank=rerank, analyze=analyze, ) # 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다 timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items()) fusion_str = f" fusion={fusion}" if mode == "hybrid" else "" analyzer_str = ( f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}" if analyze else "" ) logger.info( "search query=%r mode=%s%s%s results=%d conf=%.2f %s", q[:80], pr.mode, fusion_str, analyzer_str, len(pr.results), pr.confidence_signal, timing_str, ) # Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task) # Phase 2.1: analyze=true일 때만 analyzer_confidence 전달 (False는 None → 기존 호환) background_tasks.add_task( record_search_event, q, user.id, pr.results, pr.mode, pr.confidence_signal, pr.analyzer_confidence if analyze else None, ) debug_obj = _build_search_debug(pr) if debug else None return SearchResponse( results=pr.results, total=len(pr.results), query=q, mode=pr.mode, debug=debug_obj, ) # ═══════════════════════════════════════════════════════════ # Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis # ═══════════════════════════════════════════════════════════ class Citation(BaseModel): """answer 본문의 [n] 에 해당하는 근거 단일 행.""" n: int chunk_id: int | None doc_id: int title: str | None section_title: str | None span_text: str # evidence LLM 이 추출한 50~300자 full_snippet: str # 원본 800자 (citation 원문 보기 전용) relevance: float rerank_score: float class AskDebug(BaseModel): """`/ask?debug=true` 응답 확장.""" timing_ms: dict[str, float] search_notes: list[str] query_analysis: dict | None = None confidence_signal: float evidence_candidate_count: int evidence_kept_count: int evidence_skip_reason: str | None synthesis_cache_hit: bool synthesis_prompt_preview: str | None = None synthesis_raw_preview: str | None = None hallucination_flags: list[str] = [] class AskResponse(BaseModel): """`/ask` 응답. `/search` 의 SearchResult 는 그대로 재사용.""" results: list[SearchResult] ai_answer: str | None citations: list[Citation] synthesis_status: Literal[ "completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error" ] synthesis_ms: float confidence: Literal["high", "medium", "low"] | None refused: bool no_results_reason: str | None query: str total: int debug: AskDebug | None = None def _map_no_results_reason( pr: PipelineResult, evidence: list[EvidenceItem], ev_skip: str | None, sr: SynthesisResult, ) -> str | None: """사용자에게 보여줄 한국어 메시지 매핑. Failure mode 표 (plan §Failure Modes) 기반. """ # LLM 자가 refused → 모델이 준 사유 그대로 if sr.refused and sr.refuse_reason: return sr.refuse_reason # synthesis 상태 우선 if sr.status == "no_evidence": if not pr.results: return "검색 결과가 없습니다." return "관련도 높은 근거를 찾지 못했습니다." if sr.status == "skipped": return "검색 결과가 없습니다." if sr.status == "timeout": return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요." if sr.status == "parse_failed": return "답변 형식 오류로 생략했습니다." if sr.status == "llm_error": return "AI 서버에 일시적 문제가 있습니다." # evidence 단계 실패는 fallback 을 탔더라도 notes 용 if ev_skip == "all_low_rerank": return "관련도 높은 근거를 찾지 못했습니다." if ev_skip == "empty_retrieval": return "검색 결과가 없습니다." return None def _build_citations( evidence: list[EvidenceItem], used_citations: list[int] ) -> list[Citation]: """answer 본문에 실제로 등장한 n 만 Citation 으로 변환.""" by_n = {e.n: e for e in evidence} out: list[Citation] = [] for n in used_citations: e = by_n.get(n) if e is None: continue out.append( Citation( n=e.n, chunk_id=e.chunk_id, doc_id=e.doc_id, title=e.title, section_title=e.section_title, span_text=e.span_text, full_snippet=e.full_snippet, relevance=e.relevance, rerank_score=e.rerank_score, ) ) return out def _build_ask_debug( pr: PipelineResult, evidence: list[EvidenceItem], ev_skip: str | None, sr: SynthesisResult, ev_ms: float, synth_ms: float, total_ms: float, ) -> AskDebug: timing: dict[str, float] = dict(pr.timing_ms) timing["evidence_ms"] = ev_ms timing["synthesis_ms"] = synth_ms timing["ask_total_ms"] = total_ms # candidate count 는 rule filter 통과한 수 (recomputable from results) # 엄밀히는 evidence_service 내부 숫자인데, evidence 길이 ≈ kept, candidate # 는 관측이 어려움 → kept 는 evidence 길이, candidate 는 별도 필드 없음. # 단순화: candidate_count = len(evidence) 를 상한 근사로 둠 (debug 전용). return AskDebug( timing_ms=timing, search_notes=pr.notes, query_analysis=pr.query_analysis, confidence_signal=pr.confidence_signal, evidence_candidate_count=len(evidence), evidence_kept_count=len(evidence), evidence_skip_reason=ev_skip, synthesis_cache_hit=sr.cache_hit, synthesis_prompt_preview=None, # 현재 synthesis_service 에서 노출 안 함 synthesis_raw_preview=sr.raw_preview, hallucination_flags=sr.hallucination_flags, ) @router.get("/ask", response_model=AskResponse) async def ask( q: str, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], background_tasks: BackgroundTasks, limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"), debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"), ): """근거 기반 AI 답변 (Phase 3.3). `/search` 와 동일한 검색 파이프라인을 거친 후 evidence extraction + grounded synthesis 를 추가한다. `mode`, `rerank`, `analyze` 는 품질 보장을 위해 강제 고정 (hybrid / True / True). 실패 경로(timeout/parse_failed/refused/...) 에서도 `results` 는 항상 반환. """ t_total = time.perf_counter() # 1. 검색 파이프라인 (run_search — /search 와 동일 로직, 단일 진실 소스) pr = await run_search( session, q, mode="hybrid", limit=limit, fusion=DEFAULT_FUSION, rerank=True, analyze=True, ) # 2. Evidence extraction (rule + LLM span select, 1 batched call) t_ev = time.perf_counter() evidence, ev_skip = await extract_evidence(q, pr.results) ev_ms = (time.perf_counter() - t_ev) * 1000 # 3. Grounded synthesis (gemma-4, 15s timeout, citation 검증) t_synth = time.perf_counter() sr = await synthesize(q, evidence, debug=debug) synth_ms = (time.perf_counter() - t_synth) * 1000 total_ms = (time.perf_counter() - t_total) * 1000 # 4. 응답 구성 citations = _build_citations(evidence, sr.used_citations) no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr) logger.info( "ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f", q[:80], len(pr.results), len(evidence), len(citations), sr.status, sr.confidence or "-", sr.refused, ev_ms, synth_ms, total_ms, ) # 5. telemetry — 기존 record_search_event 재사용 (Phase 0.3 호환) background_tasks.add_task( record_search_event, q, user.id, pr.results, "hybrid", pr.confidence_signal, pr.analyzer_confidence, ) debug_obj = ( _build_ask_debug(pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms) if debug else None ) return AskResponse( results=pr.results, ai_answer=sr.answer, citations=citations, synthesis_status=sr.status, synthesis_ms=sr.elapsed_ms, confidence=sr.confidence, refused=sr.refused, no_results_reason=no_reason, query=q, total=len(pr.results), debug=debug_obj, )