- llm_gate.py: MLX single-inference 전역 semaphore (analyzer/evidence/synthesis 공유) - search_pipeline.py: run_search() 추출, /search 와 /ask 단일 진실 소스 - evidence_service.py: Rule + LLM span select (EV-A), doc-group ordering, span too-short 자동 확장(<80자→120자), fallback 은 query 중심 window 강제 - synthesis_service.py: grounded answer + citation 검증 + LRU 캐시(1h/300), refused 처리, span_text ONLY 룰 (full_snippet 프롬프트 금지) - /api/search/ask: 15s timeout, 9가지 failure mode + 한국어 no_results_reason - rerank_service: rerank_score raw 보존 (display drift 방지) - query_analyzer: _get_llm_semaphore 를 llm_gate.get_mlx_gate 로 위임 - prompts: evidence_extract.txt, search_synthesis.txt (JSON-only, example 포함) config.yaml / docker / ollama / infra_inventory 변경 없음. plan: ~/.claude/plans/quiet-meandering-nova.md Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
439 lines
14 KiB
Python
439 lines
14 KiB
Python
"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후).
|
|
|
|
실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence)
|
|
은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
|
|
이 파일은 다음만 담당:
|
|
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate
|
|
/ Citation / AskResponse / AskDebug)
|
|
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
|
|
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
|
|
"""
|
|
|
|
import time
|
|
from typing import Annotated, Literal
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from core.utils import setup_logger
|
|
from models.user import User
|
|
from services.search.evidence_service import EvidenceItem, extract_evidence
|
|
from services.search.fusion_service import DEFAULT_FUSION
|
|
from services.search.search_pipeline import PipelineResult, run_search
|
|
from services.search.synthesis_service import SynthesisResult, synthesize
|
|
from services.search_telemetry import record_search_event
|
|
|
|
# Log to logs/search.log and stdout at the same time (Phase 0.4)
|
|
logger = setup_logger("search")
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class SearchResult(BaseModel):
    """A single search hit.

    Phase 1.2-C introduced chunk-level vector retrieval, which added the
    chunk metadata fields below. Text search hits are document-level, so
    ``chunk_id`` and the related fields stay ``None``; vector search hits
    are chunk-level and carry them.
    """

    id: int  # doc_id (shared by text and vector hits)
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float
    snippet: str | None
    match_reason: str | None = None

    # Phase 1.2-C: chunk metadata (populated for vector search hits).
    chunk_id: int | None = None
    chunk_index: int | None = None
    section_title: str | None = None

    # Phase 3.1: raw reranker score, preserved to avoid display-score drift.
    # Only populated for chunks that went through the rerank path;
    # normalize_display_scores leaves this field untouched. It feeds the
    # Phase 3 evidence fast-path decision.
    rerank_score: float | None = None
|
|
|
|
|
|
# ─── Phase 0.4: debug response schemas ─────────────────────────
|
|
|
|
|
|
class DebugCandidate(BaseModel):
    """Per-stage candidate row (only exposed in ``debug=true`` responses)."""

    id: int
    rank: int
    score: float
    match_reason: str | None = None
|
|
|
|
|
|
class SearchDebug(BaseModel):
    """Debug payload attached to a `/search` response when ``debug`` is set.

    Carries per-stage timings, the candidate lists of each retrieval stage,
    the pipeline confidence value, and free-form notes.
    """

    timing_ms: dict[str, float]
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    fused_candidates: list[DebugCandidate] | None = None
    confidence: float
    notes: list[str] = []

    # Placeholders to be filled once Phase 1/2 land.
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None
|
|
|
|
|
|
class SearchResponse(BaseModel):
    """Response body of the `/search` endpoint.

    ``debug`` defaults to ``None``; the `/search` handler only fills it
    when the caller asks for debug output.
    """

    results: list[SearchResult]
    total: int
    query: str
    mode: str
    debug: SearchDebug | None = None
|
|
|
|
|
|
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Convert the first ``n`` rows into debug candidates.

    Ranks are 1-based and follow the order of ``rows``.
    """
    candidates: list[DebugCandidate] = []
    for rank, row in enumerate(rows[:n], start=1):
        candidates.append(
            DebugCandidate(
                id=row.id,
                rank=rank,
                score=row.score,
                match_reason=row.match_reason,
            )
        )
    return candidates
|
|
|
|
|
|
def _build_search_debug(pr: PipelineResult) -> SearchDebug:
    """Turn a PipelineResult into the SearchDebug payload.

    A candidate list is reported (possibly empty) when its stage produced
    rows or when the mode implies the stage is relevant; otherwise the
    field is ``None``.
    """
    # Text candidates: shown unless a pure vector search produced no text rows.
    text_stage = None
    if pr.text_results or pr.mode != "vector":
        text_stage = _to_debug_candidates(pr.text_results)

    # Vector candidates: shown for vector/hybrid modes, or when rows exist.
    vector_stage = None
    if pr.vector_results or pr.mode in ("vector", "hybrid"):
        vector_stage = _to_debug_candidates(pr.vector_results)

    # Fused candidates only make sense for hybrid mode.
    fused_stage = None
    if pr.mode == "hybrid":
        fused_stage = _to_debug_candidates(pr.results)

    return SearchDebug(
        timing_ms=pr.timing_ms,
        text_candidates=text_stage,
        vector_candidates=vector_stage,
        fused_candidates=fused_stage,
        confidence=pr.confidence_signal,
        notes=pr.notes,
        query_analysis=pr.query_analysis,
    )
|
|
|
|
|
|
@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    fusion: str = Query(
        DEFAULT_FUSION,
        pattern="^(legacy|rrf|rrf_boost)$",
        description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
    ),
    rerank: bool = Query(
        True,
        description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
    ),
    analyze: bool = Query(
        False,
        description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
    ),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
    """Document search — FTS + ILIKE + vector combined (run_search wrapper since Phase 3.1).

    Delegates the search itself to ``run_search``, then logs the stage
    timings, schedules a telemetry event as a background task, and
    serializes the pipeline result into a ``SearchResponse``.
    """
    pr = await run_search(
        session,
        q,
        mode=mode,  # type: ignore[arg-type]
        limit=limit,
        fusion=fusion,
        rerank=rerank,
        analyze=analyze,
    )

    # Per user feedback: stage timings are always logged, independent of
    # whether the debug payload was requested.
    timing_parts = [f"{stage}={ms:.0f}" for stage, ms in pr.timing_ms.items()]
    timing_str = " ".join(timing_parts)

    fusion_str = ""
    if mode == "hybrid":
        fusion_str = f" fusion={fusion}"

    analyzer_str = ""
    if analyze:
        # NOTE(review): the :.2f format assumes pr.analyzer_confidence is a
        # float whenever analyze is true — confirm against PipelineResult.
        analyzer_str = f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}"

    hit_count = len(pr.results)
    logger.info(
        "search query=%r mode=%s%s%s results=%d conf=%.2f %s",
        q[:80],
        pr.mode,
        fusion_str,
        analyzer_str,
        hit_count,
        pr.confidence_signal,
        timing_str,
    )

    # Phase 0.3: automatic failure logging, run as a background task so it
    # does not add to response latency.
    # Phase 2.1: analyzer_confidence is forwarded only when analyze=true
    # (otherwise None, for backward compatibility).
    telemetry_confidence = pr.analyzer_confidence if analyze else None
    background_tasks.add_task(
        record_search_event,
        q,
        user.id,
        pr.results,
        pr.mode,
        pr.confidence_signal,
        telemetry_confidence,
    )

    debug_payload = None
    if debug:
        debug_payload = _build_search_debug(pr)

    return SearchResponse(
        results=pr.results,
        total=len(pr.results),
        query=q,
        mode=pr.mode,
        debug=debug_payload,
    )
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
|
|
class Citation(BaseModel):
    """One evidence row backing an ``[n]`` marker in the answer body."""

    n: int
    chunk_id: int | None
    doc_id: int
    title: str | None
    section_title: str | None
    span_text: str  # 50-300 characters extracted by the evidence LLM
    full_snippet: str  # original 800 characters (only for viewing the cited source text)
    relevance: float
    rerank_score: float
|
|
|
|
|
|
class AskDebug(BaseModel):
    """Extra payload returned by ``/ask?debug=true``."""

    timing_ms: dict[str, float]
    search_notes: list[str]
    query_analysis: dict | None = None
    confidence_signal: float
    evidence_candidate_count: int
    evidence_kept_count: int
    evidence_skip_reason: str | None
    synthesis_cache_hit: bool
    synthesis_prompt_preview: str | None = None
    synthesis_raw_preview: str | None = None
    hallucination_flags: list[str] = []
|
|
|
|
|
|
class AskResponse(BaseModel):
    """Response body of ``/ask``. Reuses ``SearchResult`` from ``/search`` unchanged."""

    results: list[SearchResult]
    ai_answer: str | None
    citations: list[Citation]
    synthesis_status: Literal[
        "completed",
        "timeout",
        "skipped",
        "no_evidence",
        "parse_failed",
        "llm_error",
    ]
    synthesis_ms: float
    confidence: Literal["high", "medium", "low"] | None
    refused: bool
    no_results_reason: str | None
    query: str
    total: int
    debug: AskDebug | None = None
|
|
|
|
|
|
def _map_no_results_reason(
    pr: PipelineResult,
    evidence: list[EvidenceItem],
    ev_skip: str | None,
    sr: SynthesisResult,
) -> str | None:
    """Pick the Korean user-facing message for a missing or partial answer.

    Based on the failure-mode table (plan §Failure Modes). Precedence:
    a model-supplied refusal reason, then the synthesis status, then the
    evidence skip reason. Returns ``None`` when none of them apply.
    ``evidence`` is accepted for interface symmetry but is not consulted.
    """
    # The LLM refused on its own: pass its stated reason through verbatim.
    if sr.refused and sr.refuse_reason:
        return sr.refuse_reason

    # Synthesis status takes priority over the evidence stage.
    status = sr.status
    if status == "no_evidence":
        if pr.results:
            return "관련도 높은 근거를 찾지 못했습니다."
        return "검색 결과가 없습니다."

    status_messages = {
        "skipped": "검색 결과가 없습니다.",
        "timeout": "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요.",
        "parse_failed": "답변 형식 오류로 생략했습니다.",
        "llm_error": "AI 서버에 일시적 문제가 있습니다.",
    }
    if status in status_messages:
        return status_messages[status]

    # Evidence-stage failures are still reported even if a fallback ran.
    skip_messages = {
        "all_low_rerank": "관련도 높은 근거를 찾지 못했습니다.",
        "empty_retrieval": "검색 결과가 없습니다.",
    }
    return skip_messages.get(ev_skip)
|
|
|
|
|
|
def _build_citations(
    evidence: list[EvidenceItem], used_citations: list[int]
) -> list[Citation]:
    """Build Citation rows only for the ``n`` values that appear in the answer.

    The output follows the order of ``used_citations``; numbers with no
    matching evidence item are dropped silently.
    """
    evidence_by_n = {item.n: item for item in evidence}
    matched = (evidence_by_n.get(number) for number in used_citations)
    return [
        Citation(
            n=item.n,
            chunk_id=item.chunk_id,
            doc_id=item.doc_id,
            title=item.title,
            section_title=item.section_title,
            span_text=item.span_text,
            full_snippet=item.full_snippet,
            relevance=item.relevance,
            rerank_score=item.rerank_score,
        )
        for item in matched
        if item is not None
    ]
|
|
|
|
|
|
def _build_ask_debug(
    pr: PipelineResult,
    evidence: list[EvidenceItem],
    ev_skip: str | None,
    sr: SynthesisResult,
    ev_ms: float,
    synth_ms: float,
    total_ms: float,
) -> AskDebug:
    """Assemble the ``/ask`` debug payload from the pipeline, evidence and synthesis state.

    The search pipeline timings are copied and extended with the evidence,
    synthesis and end-to-end durations.
    """
    # Build a new dict so the pipeline's own timing dict is not mutated.
    stage_timings: dict[str, float] = {
        **pr.timing_ms,
        "evidence_ms": ev_ms,
        "synthesis_ms": synth_ms,
        "ask_total_ms": total_ms,
    }

    # The true candidate count (rows passing the rule filter) lives inside
    # evidence_service and is not exposed here. The kept count is the
    # evidence length, and the candidate count reuses the same number as a
    # simplification (debug-only).
    kept = len(evidence)

    return AskDebug(
        timing_ms=stage_timings,
        search_notes=pr.notes,
        query_analysis=pr.query_analysis,
        confidence_signal=pr.confidence_signal,
        evidence_candidate_count=kept,
        evidence_kept_count=kept,
        evidence_skip_reason=ev_skip,
        synthesis_cache_hit=sr.cache_hit,
        synthesis_prompt_preview=None,  # not currently exposed by synthesis_service
        synthesis_raw_preview=sr.raw_preview,
        hallucination_flags=sr.hallucination_flags,
    )
|
|
|
|
|
|
@router.get("/ask", response_model=AskResponse)
async def ask(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
    debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
):
    """Evidence-grounded AI answer (Phase 3.3).

    Runs the same search pipeline as `/search`, then adds evidence
    extraction and grounded synthesis on top. ``mode``, ``rerank`` and
    ``analyze`` are pinned (hybrid / True / True) to guarantee quality.

    ``results`` is always returned, including on failure paths
    (timeout / parse_failed / refused / ...).
    """
    started_at = time.perf_counter()

    # 1. Search pipeline (run_search — same logic as /search, single source of truth).
    pr = await run_search(
        session,
        q,
        mode="hybrid",
        limit=limit,
        fusion=DEFAULT_FUSION,
        rerank=True,
        analyze=True,
    )

    # 2. Evidence extraction (rule + LLM span select, one batched call).
    evidence_started = time.perf_counter()
    evidence, ev_skip = await extract_evidence(q, pr.results)
    ev_ms = (time.perf_counter() - evidence_started) * 1000

    # 3. Grounded synthesis (gemma-4, 15s timeout, citation validation).
    synthesis_started = time.perf_counter()
    sr = await synthesize(q, evidence, debug=debug)
    synth_ms = (time.perf_counter() - synthesis_started) * 1000

    total_ms = (time.perf_counter() - started_at) * 1000

    # 4. Response assembly.
    citations = _build_citations(evidence, sr.used_citations)
    no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
    result_count = len(pr.results)

    logger.info(
        "ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f",
        q[:80],
        result_count,
        len(evidence),
        len(citations),
        sr.status,
        sr.confidence or "-",
        sr.refused,
        ev_ms,
        synth_ms,
        total_ms,
    )

    # 5. Telemetry — reuses the existing record_search_event (Phase 0.3 compatible).
    background_tasks.add_task(
        record_search_event,
        q,
        user.id,
        pr.results,
        "hybrid",
        pr.confidence_signal,
        pr.analyzer_confidence,
    )

    debug_payload = None
    if debug:
        debug_payload = _build_ask_debug(
            pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms
        )

    return AskResponse(
        results=pr.results,
        ai_answer=sr.answer,
        citations=citations,
        synthesis_status=sr.status,
        synthesis_ms=sr.elapsed_ms,
        confidence=sr.confidence,
        refused=sr.refused,
        no_results_reason=no_reason,
        query=q,
        total=len(pr.results),
        debug=debug_payload,
    )
|