Files
hyungi_document_server/app/api/search.py
Hyungi Ahn 06443947bf feat(ask): Phase 3.5a guardrails (classifier + refusal gate + grounding + partial)
신규 파일:
- classifier_service.py: exaone binary classifier (sufficient/insufficient)
  parallel with evidence, circuit breaker, timeout 5s
- refusal_gate.py: multi-signal fusion (score + classifier)
  AND 조건, conservative fallback 3-tier (classifier 부재 시)
- grounding_check.py: strong/weak flag 분리
  strong: fabricated_number + intent_misalignment(important keywords)
  weak: uncited_claim + low_overlap + intent_misalignment(generic)
  re-gate: 2+ strong → refuse, 1 strong → partial
- sentence_splitter.py: regex 기반 (Phase 3.5b KSS 업그레이드)
- classifier.txt: exaone Y+ prompt (calibration examples 포함)
- search_synthesis_partial.txt: partial answer 전용 프롬프트
- 102_ask_events.sql: /ask 관측 테이블 (completeness 3-분리 지표)
- queries.yaml: Phase 3.5 smoke test 평가셋 10개

수정 파일:
- search.py /ask: classifier parallel + refusal gate + grounding re-gate
  + defense_layers 로깅 + AskResponse completeness/aspects/confirmed_items
- config.yaml: classifier model 섹션 (exaone3.5:7.8b GPU Ollama)
- config.py: classifier optional 파싱
- AskAnswer.svelte: 4분기 렌더 (full/partial/insufficient/loading)
- ask.ts: Completeness + ConfirmedItem 타입

P1 실측: exaone ternary 불안정 → binary gate 축소. partial은 grounding이 담당.
토론 9라운드 확정. plan: quiet-meandering-nova.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:49:11 +09:00

600 lines
21 KiB
Python

"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후).
실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence)
은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
이 파일은 다음만 담당:
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate
/ Citation / AskResponse / AskDebug)
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
"""
import asyncio
import time
from typing import Annotated, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search.classifier_service import ClassifierResult, classify
from services.search.evidence_service import EvidenceItem, extract_evidence
from services.search.fusion_service import DEFAULT_FUSION
from services.search.grounding_check import check as grounding_check
from services.search.refusal_gate import RefusalDecision, decide as refusal_decide
from services.search.search_pipeline import PipelineResult, run_search
from services.search.synthesis_service import SynthesisResult, synthesize
from services.search_telemetry import record_search_event
# Log to logs/search.log and stdout simultaneously (Phase 0.4).
logger = setup_logger("search")
# Router for all /search endpoints; mounted by the app with its URL prefix.
router = APIRouter()
class SearchResult(BaseModel):
    """Single search-result row.

    Phase 1.2-C: chunk-level vector retrieval added the chunk meta fields.
    Text-search rows leave chunk_id etc. as None (doc-level);
    vector-search rows have them populated (chunk-level).
    """

    id: int  # doc_id (shared by text/vector paths)
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float
    snippet: str | None
    match_reason: str | None = None
    # Phase 1.2-C: chunk meta (populated for vector hits)
    chunk_id: int | None = None
    chunk_index: int | None = None
    section_title: str | None = None
    # Phase 3.1: preserve the raw reranker score (prevents display-score drift).
    # Filled only for chunks that went through the rerank path;
    # normalize_display_scores never touches this field.
    # Used for the Phase 3 evidence fast-path decision.
    rerank_score: float | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
class DebugCandidate(BaseModel):
    """Per-stage candidate row (exposed only in debug=true responses)."""

    id: int
    rank: int  # 1-based rank within its stage
    score: float
    match_reason: str | None = None
class SearchDebug(BaseModel):
    """Debug payload attached to SearchResponse when debug=true."""

    timing_ms: dict[str, float]
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    fused_candidates: list[DebugCandidate] | None = None
    confidence: float
    # NOTE: pydantic copies field defaults per instance, so the mutable []
    # default is safe here (unlike a plain-function default argument).
    notes: list[str] = []
    # Placeholders filled once Phase 1/2 features are active.
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None
class SearchResponse(BaseModel):
    """`/search` response envelope."""

    results: list[SearchResult]
    total: int  # == len(results)
    query: str  # echoed query string
    mode: str   # effective mode the pipeline actually ran
    debug: SearchDebug | None = None
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Convert the top *n* result rows into 1-based-ranked debug candidates."""
    out: list[DebugCandidate] = []
    for rank, row in enumerate(rows[:n], start=1):
        out.append(
            DebugCandidate(
                id=row.id,
                rank=rank,
                score=row.score,
                match_reason=row.match_reason,
            )
        )
    return out
def _build_search_debug(pr: PipelineResult) -> SearchDebug:
    """PipelineResult → SearchDebug (moved verbatim from the old search() debug block)."""
    # A stage's candidates are shown when it produced rows, or when the mode
    # implies the stage ran (same truthiness as the original inline checks).
    show_text = bool(pr.text_results) or pr.mode != "vector"
    show_vector = bool(pr.vector_results) or pr.mode in ("vector", "hybrid")
    show_fused = pr.mode == "hybrid"
    return SearchDebug(
        timing_ms=pr.timing_ms,
        text_candidates=_to_debug_candidates(pr.text_results) if show_text else None,
        vector_candidates=_to_debug_candidates(pr.vector_results) if show_vector else None,
        fused_candidates=_to_debug_candidates(pr.results) if show_fused else None,
        confidence=pr.confidence_signal,
        notes=pr.notes,
        query_analysis=pr.query_analysis,
    )
@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    fusion: str = Query(
        DEFAULT_FUSION,
        pattern="^(legacy|rrf|rrf_boost)$",
        description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
    ),
    rerank: bool = Query(
        True,
        description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
    ),
    analyze: bool = Query(
        False,
        description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
    ),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
    """Document search — FTS + ILIKE + vector combined (run_search wrapper since Phase 3.1)."""
    pr = await run_search(
        session,
        q,
        mode=mode,  # type: ignore[arg-type]
        limit=limit,
        fusion=fusion,
        rerank=rerank,
        analyze=analyze,
    )
    # Per user feedback: per-stage timings are always logged, independent of
    # whether the debug payload is included in the response.
    timing_str = " ".join(f"{name}={ms:.0f}" for name, ms in pr.timing_ms.items())
    fusion_str = "" if mode != "hybrid" else f" fusion={fusion}"
    if analyze:
        analyzer_str = (
            f" analyzer=hit={pr.analyzer_cache_hit}"
            f"/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}"
        )
    else:
        analyzer_str = ""
    logger.info(
        "search query=%r mode=%s%s%s results=%d conf=%.2f %s",
        q[:80],
        pr.mode,
        fusion_str,
        analyzer_str,
        len(pr.results),
        pr.confidence_signal,
        timing_str,
    )
    # Phase 0.3: automatic failure logging (background task — no response
    # latency impact).
    # Phase 2.1: analyzer_confidence is forwarded only when analyze=true
    # (None keeps the pre-2.1 event shape).
    background_tasks.add_task(
        record_search_event,
        q,
        user.id,
        pr.results,
        pr.mode,
        pr.confidence_signal,
        pr.analyzer_confidence if analyze else None,
    )
    return SearchResponse(
        results=pr.results,
        total=len(pr.results),
        query=q,
        mode=pr.mode,
        debug=_build_search_debug(pr) if debug else None,
    )
# ═══════════════════════════════════════════════════════════
# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis
# ═══════════════════════════════════════════════════════════
class Citation(BaseModel):
    """Single evidence row backing a [n] marker in the answer body."""

    n: int  # citation number as it appears in the answer text
    chunk_id: int | None
    doc_id: int
    title: str | None
    section_title: str | None
    span_text: str     # 50-300 chars extracted by the evidence LLM
    full_snippet: str  # original 800 chars (for the citation source view only)
    relevance: float
    rerank_score: float
class ConfirmedItem(BaseModel):
    """Per-aspect answer item of a partial answer."""

    aspect: str
    text: str
    citations: list[int]  # citation numbers ([n]) supporting this item
class AskDebug(BaseModel):
    """Extended payload for `/ask?debug=true` responses."""

    timing_ms: dict[str, float]
    search_notes: list[str]
    query_analysis: dict | None = None
    confidence_signal: float
    # Upper-bound approximation: filled with len(evidence), same as kept_count
    # (the true candidate count is internal to evidence_service).
    evidence_candidate_count: int
    evidence_kept_count: int
    evidence_skip_reason: str | None
    synthesis_cache_hit: bool
    synthesis_prompt_preview: str | None = None
    synthesis_raw_preview: str | None = None
    hallucination_flags: list[str] = []
    # Phase 3.5a: per-layer defense logging (classifier / score_gate /
    # refusal / grounding / re_gate snapshots).
    defense_layers: dict | None = None
class AskResponse(BaseModel):
    """`/ask` response. Phase 3.5a adds the completeness + aspects fields."""

    results: list[SearchResult]
    ai_answer: str | None  # None on refusal / synthesis failure
    citations: list[Citation]
    synthesis_status: Literal[
        "completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error"
    ]
    synthesis_ms: float
    confidence: Literal["high", "medium", "low"] | None
    refused: bool
    no_results_reason: str | None  # user-facing Korean message on degraded paths
    query: str
    total: int
    # Phase 3.5a
    completeness: Literal["full", "partial", "insufficient"] = "full"
    covered_aspects: list[str] | None = None
    missing_aspects: list[str] | None = None
    confirmed_items: list[ConfirmedItem] | None = None
    debug: AskDebug | None = None
def _map_no_results_reason(
pr: PipelineResult,
evidence: list[EvidenceItem],
ev_skip: str | None,
sr: SynthesisResult,
) -> str | None:
"""사용자에게 보여줄 한국어 메시지 매핑.
Failure mode 표 (plan §Failure Modes) 기반.
"""
# LLM 자가 refused → 모델이 준 사유 그대로
if sr.refused and sr.refuse_reason:
return sr.refuse_reason
# synthesis 상태 우선
if sr.status == "no_evidence":
if not pr.results:
return "검색 결과가 없습니다."
return "관련도 높은 근거를 찾지 못했습니다."
if sr.status == "skipped":
return "검색 결과가 없습니다."
if sr.status == "timeout":
return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요."
if sr.status == "parse_failed":
return "답변 형식 오류로 생략했습니다."
if sr.status == "llm_error":
return "AI 서버에 일시적 문제가 있습니다."
# evidence 단계 실패는 fallback 을 탔더라도 notes 용
if ev_skip == "all_low_rerank":
return "관련도 높은 근거를 찾지 못했습니다."
if ev_skip == "empty_retrieval":
return "검색 결과가 없습니다."
return None
def _build_citations(
    evidence: list[EvidenceItem], used_citations: list[int]
) -> list[Citation]:
    """Convert only the [n] numbers that actually appear in the answer body.

    Numbers without a matching evidence item are silently dropped.
    """
    lookup = {item.n: item for item in evidence}
    return [
        Citation(
            n=item.n,
            chunk_id=item.chunk_id,
            doc_id=item.doc_id,
            title=item.title,
            section_title=item.section_title,
            span_text=item.span_text,
            full_snippet=item.full_snippet,
            relevance=item.relevance,
            rerank_score=item.rerank_score,
        )
        for n in used_citations
        if (item := lookup.get(n)) is not None
    ]
def _build_ask_debug(
    pr: PipelineResult,
    evidence: list[EvidenceItem],
    ev_skip: str | None,
    sr: SynthesisResult,
    ev_ms: float,
    synth_ms: float,
    total_ms: float,
    defense_layers: dict | None = None,
) -> AskDebug:
    """Assemble the AskDebug payload for `/ask?debug=true`.

    Args:
        pr: search pipeline result (timings, notes, confidence, analysis).
        evidence: evidence items kept after extraction.
        ev_skip: evidence-stage skip reason, if any.
        sr: synthesis result (cache hit, raw preview, hallucination flags).
        ev_ms / synth_ms / total_ms: stage timings in milliseconds.
        defense_layers: Phase 3.5a per-layer defense snapshot. Optional and
            defaulting to None so pre-3.5a callers remain compatible.
    """
    timing: dict[str, float] = dict(pr.timing_ms)
    timing["evidence_ms"] = ev_ms
    timing["synthesis_ms"] = synth_ms
    timing["ask_total_ms"] = total_ms
    # The true candidate count (pre rule-filter) lives inside evidence_service
    # and is not observable here, so both count fields use len(evidence) as an
    # upper-bound approximation (debug only).
    return AskDebug(
        timing_ms=timing,
        search_notes=pr.notes,
        query_analysis=pr.query_analysis,
        confidence_signal=pr.confidence_signal,
        evidence_candidate_count=len(evidence),
        evidence_kept_count=len(evidence),
        evidence_skip_reason=ev_skip,
        synthesis_cache_hit=sr.cache_hit,
        synthesis_prompt_preview=None,  # not exposed by synthesis_service yet
        synthesis_raw_preview=sr.raw_preview,
        hallucination_flags=sr.hallucination_flags,
        defense_layers=defense_layers,
    )
@router.get("/ask", response_model=AskResponse)
async def ask(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
    debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
):
    """Evidence-grounded AI answer (Phase 3.5a).

    Phase 3.3 pipeline plus: classifier run in parallel with evidence,
    a multi-signal refusal gate, and a post-synthesis grounding re-gate.
    `results` is always returned, even on every failure path.

    Defense layers (each leaves a snapshot in ``defense_layers``):
      1. search pipeline (hybrid + rerank + analyze)
      2. classifier (parallel binary sufficient/insufficient gate)
      3. refusal gate (rerank scores + classifier fusion)
      4. synthesis
      5. grounding check + re-gate (strong/weak flags)
    """
    t_total = time.perf_counter()
    defense_log: dict = {}  # per-layer flag snapshot (returned when debug=true)

    # 1. Search pipeline — /ask always runs hybrid + rerank + analyze.
    pr = await run_search(
        session, q, mode="hybrid", limit=limit,
        fusion=DEFAULT_FUSION, rerank=True, analyze=True,
    )

    # 2. Evidence extraction and classifier run in parallel.
    t_ev = time.perf_counter()
    evidence_task = asyncio.create_task(extract_evidence(q, pr.results))
    # Classifier input: top-3 chunk meta + their rerank scores.
    top_chunks = [
        {
            "title": r.title or "",
            "section": r.section_title or "",
            "snippet": (r.snippet or "")[:200],
        }
        for r in pr.results[:3]
    ]
    rerank_scores_top = [
        r.rerank_score if r.rerank_score is not None else r.score
        for r in pr.results[:3]
    ]
    classifier_task = asyncio.create_task(
        classify(q, top_chunks, rerank_scores_top)
    )
    evidence, ev_skip = await evidence_task
    ev_ms = (time.perf_counter() - t_ev) * 1000
    # Await classifier behind a second timeout layer (classifier_service has
    # its own internal timeout; this guards against it hanging regardless).
    try:
        classifier_result = await asyncio.wait_for(classifier_task, timeout=6.0)
    except Exception as exc:
        # Conservative fallback: a classifier failure must never kill /ask.
        # Log it instead of swallowing silently so failures stay observable.
        # (asyncio.TimeoutError is an Exception subclass, so one clause covers
        # both the wait_for timeout and classifier errors.)
        logger.warning("ask classifier fallback query=%r err=%r", q[:80], exc)
        classifier_result = ClassifierResult("timeout", None, [], [], 0.0)
    defense_log["classifier"] = {
        "status": classifier_result.status,
        "verdict": classifier_result.verdict,
        "covered_aspects": classifier_result.covered_aspects,
        "missing_aspects": classifier_result.missing_aspects,
        "elapsed_ms": classifier_result.elapsed_ms,
    }

    # 3. Refusal gate (multi-signal fusion: rerank scores AND classifier).
    all_rerank_scores = [
        e.rerank_score for e in evidence
    ] if evidence else rerank_scores_top
    decision = refusal_decide(all_rerank_scores, classifier_result)
    defense_log["score_gate"] = {
        "max": max(all_rerank_scores) if all_rerank_scores else 0.0,
        "agg_top3": sum(sorted(all_rerank_scores, reverse=True)[:3]),
    }
    defense_log["refusal"] = {
        "refused": decision.refused,
        "rule_triggered": decision.rule_triggered,
    }
    if decision.refused:
        # Short-circuit: skip synthesis entirely; still return search results.
        total_ms = (time.perf_counter() - t_total) * 1000
        no_reason = "관련 근거를 찾지 못했습니다."
        if not pr.results:
            no_reason = "검색 결과가 없습니다."
        logger.info(
            "ask REFUSED query=%r rule=%s max_score=%.2f total=%.0f",
            q[:80], decision.rule_triggered,
            max(all_rerank_scores) if all_rerank_scores else 0.0, total_ms,
        )
        # Telemetry fires on refusals too (background — no latency impact).
        background_tasks.add_task(
            record_search_event, q, user.id, pr.results, "hybrid",
            pr.confidence_signal, pr.analyzer_confidence,
        )
        debug_obj = None
        if debug:
            debug_obj = AskDebug(
                timing_ms={**pr.timing_ms, "evidence_ms": ev_ms, "ask_total_ms": total_ms},
                search_notes=pr.notes,
                confidence_signal=pr.confidence_signal,
                evidence_candidate_count=len(evidence),
                evidence_kept_count=len(evidence),
                evidence_skip_reason=ev_skip,
                synthesis_cache_hit=False,
                hallucination_flags=[],
                defense_layers=defense_log,
            )
        return AskResponse(
            results=pr.results,
            ai_answer=None,
            citations=[],
            synthesis_status="skipped",
            synthesis_ms=0.0,
            confidence=None,
            refused=True,
            no_results_reason=no_reason,
            query=q,
            total=len(pr.results),
            completeness="insufficient",
            covered_aspects=classifier_result.covered_aspects or None,
            missing_aspects=classifier_result.missing_aspects or None,
            debug=debug_obj,
        )

    # 4. Synthesis (grounded answer from the extracted evidence).
    t_synth = time.perf_counter()
    sr = await synthesize(q, evidence, debug=debug)
    synth_ms = (time.perf_counter() - t_synth) * 1000

    # 5. Grounding check (post-synthesis) + re-gate on its flags.
    grounding = grounding_check(q, sr.answer or "", evidence)
    defense_log["grounding"] = {
        "strong": grounding.strong_flags,
        "weak": grounding.weak_flags,
    }
    # Completeness is decided by grounding; the classifier is a binary gate only.
    completeness: Literal["full", "partial", "insufficient"] = "full"
    covered_aspects = classifier_result.covered_aspects or None
    missing_aspects = classifier_result.missing_aspects or None
    confirmed_items: list[ConfirmedItem] | None = None  # not populated yet
    if len(grounding.strong_flags) >= 2:
        # Re-gate: multiple strong flags → drop the synthesized answer.
        completeness = "insufficient"
        sr.answer = None
        sr.refused = True
        sr.confidence = None
        defense_log["re_gate"] = "refuse(2+strong)"
    elif grounding.strong_flags:
        # Single strong flag → downgrade to a partial answer.
        completeness = "partial"
        sr.confidence = "low"
        defense_log["re_gate"] = "partial(1strong)"
    elif grounding.weak_flags:
        # Weak flags only → lower confidence one notch, keep the answer.
        if sr.confidence == "high":
            sr.confidence = "medium"
        defense_log["re_gate"] = "conf_lower(weak)"
    # Confidence cap from the refusal gate (conservative when classifier absent).
    if decision.confidence_cap and sr.confidence:
        conf_rank = {"low": 0, "medium": 1, "high": 2}
        if conf_rank.get(sr.confidence, 0) > conf_rank.get(decision.confidence_cap, 2):
            sr.confidence = decision.confidence_cap
    # A partial answer is never reported with high confidence.
    if completeness == "partial" and sr.confidence == "high":
        sr.confidence = "medium"
    sr.hallucination_flags.extend(
        [f"strong:{f}" for f in grounding.strong_flags]
        + [f"weak:{f}" for f in grounding.weak_flags]
    )
    total_ms = (time.perf_counter() - t_total) * 1000

    # 6. Response assembly.
    citations = _build_citations(evidence, sr.used_citations)
    no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
    if completeness == "insufficient" and not no_reason:
        no_reason = "답변 검증에서 복수 오류 감지"
    logger.info(
        "ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s completeness=%s "
        "refused=%s grounding_strong=%d grounding_weak=%d ev_ms=%.0f synth_ms=%.0f total=%.0f",
        q[:80], len(pr.results), len(evidence), len(citations),
        sr.status, sr.confidence or "-", completeness,
        sr.refused, len(grounding.strong_flags), len(grounding.weak_flags),
        ev_ms, synth_ms, total_ms,
    )
    # 7. Telemetry (background — no latency impact).
    background_tasks.add_task(
        record_search_event, q, user.id, pr.results, "hybrid",
        pr.confidence_signal, pr.analyzer_confidence,
    )
    debug_obj = None
    if debug:
        timing = dict(pr.timing_ms)
        timing["evidence_ms"] = ev_ms
        timing["synthesis_ms"] = synth_ms
        timing["ask_total_ms"] = total_ms
        debug_obj = AskDebug(
            timing_ms=timing,
            search_notes=pr.notes,
            query_analysis=pr.query_analysis,
            confidence_signal=pr.confidence_signal,
            evidence_candidate_count=len(evidence),
            evidence_kept_count=len(evidence),
            evidence_skip_reason=ev_skip,
            synthesis_cache_hit=sr.cache_hit,
            synthesis_raw_preview=sr.raw_preview,
            hallucination_flags=sr.hallucination_flags,
            defense_layers=defense_log,
        )
    return AskResponse(
        results=pr.results,
        ai_answer=sr.answer,
        citations=citations,
        synthesis_status=sr.status,
        synthesis_ms=sr.elapsed_ms,
        confidence=sr.confidence,
        refused=sr.refused,
        no_results_reason=no_reason,
        query=q,
        total=len(pr.results),
        completeness=completeness,
        covered_aspects=covered_aspects,
        missing_aspects=missing_aspects,
        confirmed_items=confirmed_items,
        debug=debug_obj,
    )