feat(ask): Phase 3.5a guardrails (classifier + refusal gate + grounding + partial)

신규 파일:
- classifier_service.py: exaone binary classifier (sufficient/insufficient)
  parallel with evidence, circuit breaker, timeout 5s
- refusal_gate.py: multi-signal fusion (score + classifier)
  AND 조건, conservative fallback 3-tier (classifier 부재 시)
- grounding_check.py: strong/weak flag 분리
  strong: fabricated_number + intent_misalignment(important keywords)
  weak: uncited_claim + low_overlap + intent_misalignment(generic)
  re-gate: 2+ strong → refuse, 1 strong → partial
- sentence_splitter.py: regex 기반 (Phase 3.5b KSS 업그레이드)
- classifier.txt: exaone Y+ prompt (calibration examples 포함)
- search_synthesis_partial.txt: partial answer 전용 프롬프트
- 102_ask_events.sql: /ask 관측 테이블 (completeness 3-분리 지표)
- queries.yaml: Phase 3.5 smoke test 평가셋 10개

수정 파일:
- search.py /ask: classifier parallel + refusal gate + grounding re-gate
  + defense_layers 로깅 + AskResponse completeness/aspects/confirmed_items
- config.yaml: classifier model 섹션 (exaone3.5:7.8b GPU Ollama)
- config.py: classifier optional 파싱
- AskAnswer.svelte: 4분기 렌더 (full/partial/insufficient/loading)
- ask.ts: Completeness + ConfirmedItem 타입

P1 실측: exaone ternary 불안정 → binary gate 축소. partial은 grounding이 담당.
토론 9라운드 확정. plan: quiet-meandering-nova.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-10 08:49:11 +09:00
parent 0eecf1afca
commit 06443947bf
13 changed files with 869 additions and 47 deletions

View File

@@ -9,6 +9,7 @@
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
"""
import asyncio
import time
from typing import Annotated, Literal
@@ -20,8 +21,11 @@ from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search.classifier_service import ClassifierResult, classify
from services.search.evidence_service import EvidenceItem, extract_evidence
from services.search.fusion_service import DEFAULT_FUSION
from services.search.grounding_check import check as grounding_check
from services.search.refusal_gate import RefusalDecision, decide as refusal_decide
from services.search.search_pipeline import PipelineResult, run_search
from services.search.synthesis_service import SynthesisResult, synthesize
from services.search_telemetry import record_search_event
@@ -216,6 +220,14 @@ class Citation(BaseModel):
rerank_score: float
class ConfirmedItem(BaseModel):
    """Single per-aspect answer entry within a partial answer.

    Emitted on the "partial" completeness path so the client can render
    each confirmed aspect of the query separately from the missing ones.
    """
    aspect: str  # the query aspect this entry answers
    text: str  # grounded answer text for this aspect
    citations: list[int]  # presumably indices into AskResponse.citations — TODO confirm against builder
class AskDebug(BaseModel):
"""`/ask?debug=true` 응답 확장."""
@@ -230,10 +242,12 @@ class AskDebug(BaseModel):
synthesis_prompt_preview: str | None = None
synthesis_raw_preview: str | None = None
hallucination_flags: list[str] = []
# Phase 3.5a: per-layer defense 로깅
defense_layers: dict | None = None
class AskResponse(BaseModel):
"""`/ask` 응답. `/search` 의 SearchResult 는 그대로 재사용."""
"""`/ask` 응답. Phase 3.5a: completeness + aspects 추가."""
results: list[SearchResult]
ai_answer: str | None
@@ -247,6 +261,11 @@ class AskResponse(BaseModel):
no_results_reason: str | None
query: str
total: int
# Phase 3.5a
completeness: Literal["full", "partial", "insufficient"] = "full"
covered_aspects: list[str] | None = None
missing_aspects: list[str] | None = None
confirmed_items: list[ConfirmedItem] | None = None
debug: AskDebug | None = None
@@ -355,73 +374,211 @@ async def ask(
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
):
"""근거 기반 AI 답변 (Phase 3.3).
"""근거 기반 AI 답변 (Phase 3.5a).
`/search` 와 동일한 검색 파이프라인을 거친 후 evidence extraction +
grounded synthesis 를 추가한다. `mode`, `rerank`, `analyze` 는 품질 보장을
위해 강제 고정 (hybrid / True / True).
실패 경로(timeout/parse_failed/refused/...) 에서도 `results` 는 항상 반환.
Phase 3.3 기반 + classifier parallel + refusal gate + grounding re-gate.
실패 경로에서도 `results` 는 항상 반환.
"""
t_total = time.perf_counter()
defense_log: dict = {} # per-layer flag snapshot
# 1. 검색 파이프라인 (run_search — /search 와 동일 로직, 단일 진실 소스)
# 1. 검색 파이프라인
pr = await run_search(
session,
q,
mode="hybrid",
limit=limit,
fusion=DEFAULT_FUSION,
rerank=True,
analyze=True,
session, q, mode="hybrid", limit=limit,
fusion=DEFAULT_FUSION, rerank=True, analyze=True,
)
# 2. Evidence extraction (rule + LLM span select, 1 batched call)
# 2. Evidence + Classifier 병렬
t_ev = time.perf_counter()
evidence, ev_skip = await extract_evidence(q, pr.results)
evidence_task = asyncio.create_task(extract_evidence(q, pr.results))
# classifier input: top 3 chunks meta + rerank scores
top_chunks = [
{
"title": r.title or "",
"section": r.section_title or "",
"snippet": (r.snippet or "")[:200],
}
for r in pr.results[:3]
]
rerank_scores_top = [
r.rerank_score if r.rerank_score is not None else r.score
for r in pr.results[:3]
]
classifier_task = asyncio.create_task(
classify(q, top_chunks, rerank_scores_top)
)
evidence, ev_skip = await evidence_task
ev_ms = (time.perf_counter() - t_ev) * 1000
# 3. Grounded synthesis (gemma-4, 15s timeout, citation 검증)
# classifier await (timeout 보호 — classifier_service 내부에도 있지만 여기서 이중 보호)
try:
classifier_result = await asyncio.wait_for(classifier_task, timeout=6.0)
except (asyncio.TimeoutError, Exception):
classifier_result = ClassifierResult("timeout", None, [], [], 0.0)
defense_log["classifier"] = {
"status": classifier_result.status,
"verdict": classifier_result.verdict,
"covered_aspects": classifier_result.covered_aspects,
"missing_aspects": classifier_result.missing_aspects,
"elapsed_ms": classifier_result.elapsed_ms,
}
# 3. Refusal gate (multi-signal fusion)
all_rerank_scores = [
e.rerank_score for e in evidence
] if evidence else rerank_scores_top
decision = refusal_decide(all_rerank_scores, classifier_result)
defense_log["score_gate"] = {
"max": max(all_rerank_scores) if all_rerank_scores else 0.0,
"agg_top3": sum(sorted(all_rerank_scores, reverse=True)[:3]),
}
defense_log["refusal"] = {
"refused": decision.refused,
"rule_triggered": decision.rule_triggered,
}
if decision.refused:
total_ms = (time.perf_counter() - t_total) * 1000
no_reason = "관련 근거를 찾지 못했습니다."
if not pr.results:
no_reason = "검색 결과가 없습니다."
logger.info(
"ask REFUSED query=%r rule=%s max_score=%.2f total=%.0f",
q[:80], decision.rule_triggered,
max(all_rerank_scores) if all_rerank_scores else 0.0, total_ms,
)
# telemetry
background_tasks.add_task(
record_search_event, q, user.id, pr.results, "hybrid",
pr.confidence_signal, pr.analyzer_confidence,
)
debug_obj = None
if debug:
debug_obj = AskDebug(
timing_ms={**pr.timing_ms, "evidence_ms": ev_ms, "ask_total_ms": total_ms},
search_notes=pr.notes,
confidence_signal=pr.confidence_signal,
evidence_candidate_count=len(evidence),
evidence_kept_count=len(evidence),
evidence_skip_reason=ev_skip,
synthesis_cache_hit=False,
hallucination_flags=[],
defense_layers=defense_log,
)
return AskResponse(
results=pr.results,
ai_answer=None,
citations=[],
synthesis_status="skipped",
synthesis_ms=0.0,
confidence=None,
refused=True,
no_results_reason=no_reason,
query=q,
total=len(pr.results),
completeness="insufficient",
covered_aspects=classifier_result.covered_aspects or None,
missing_aspects=classifier_result.missing_aspects or None,
debug=debug_obj,
)
# 4. Synthesis
t_synth = time.perf_counter()
sr = await synthesize(q, evidence, debug=debug)
synth_ms = (time.perf_counter() - t_synth) * 1000
# 5. Grounding check (post-synthesis) + re-gate
grounding = grounding_check(q, sr.answer or "", evidence)
defense_log["grounding"] = {
"strong": grounding.strong_flags,
"weak": grounding.weak_flags,
}
# Completeness 결정: grounding 기반 (classifier 는 binary gate 만)
completeness: Literal["full", "partial", "insufficient"] = "full"
covered_aspects = classifier_result.covered_aspects or None
missing_aspects = classifier_result.missing_aspects or None
confirmed_items: list[ConfirmedItem] | None = None
if len(grounding.strong_flags) >= 2:
# Re-gate: multiple strong → refuse
completeness = "insufficient"
sr.answer = None
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = "refuse(2+strong)"
elif grounding.strong_flags:
# Single strong → partial downgrade
completeness = "partial"
sr.confidence = "low"
defense_log["re_gate"] = "partial(1strong)"
elif grounding.weak_flags:
# Weak → confidence lower only
if sr.confidence == "high":
sr.confidence = "medium"
defense_log["re_gate"] = "conf_lower(weak)"
# Confidence cap from refusal gate (classifier 부재 시 conservative)
if decision.confidence_cap and sr.confidence:
conf_rank = {"low": 0, "medium": 1, "high": 2}
if conf_rank.get(sr.confidence, 0) > conf_rank.get(decision.confidence_cap, 2):
sr.confidence = decision.confidence_cap
# Partial 이면 max confidence = medium
if completeness == "partial" and sr.confidence == "high":
sr.confidence = "medium"
sr.hallucination_flags.extend(
[f"strong:{f}" for f in grounding.strong_flags]
+ [f"weak:{f}" for f in grounding.weak_flags]
)
total_ms = (time.perf_counter() - t_total) * 1000
# 4. 응답 구성
# 6. 응답 구성
citations = _build_citations(evidence, sr.used_citations)
no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
if completeness == "insufficient" and not no_reason:
no_reason = "답변 검증에서 복수 오류 감지"
logger.info(
"ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f",
q[:80],
len(pr.results),
len(evidence),
len(citations),
sr.status,
sr.confidence or "-",
sr.refused,
ev_ms,
synth_ms,
total_ms,
"ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s completeness=%s "
"refused=%s grounding_strong=%d grounding_weak=%d ev_ms=%.0f synth_ms=%.0f total=%.0f",
q[:80], len(pr.results), len(evidence), len(citations),
sr.status, sr.confidence or "-", completeness,
sr.refused, len(grounding.strong_flags), len(grounding.weak_flags),
ev_ms, synth_ms, total_ms,
)
# 5. telemetry — 기존 record_search_event 재사용 (Phase 0.3 호환)
# 7. telemetry
background_tasks.add_task(
record_search_event,
q,
user.id,
pr.results,
"hybrid",
pr.confidence_signal,
pr.analyzer_confidence,
record_search_event, q, user.id, pr.results, "hybrid",
pr.confidence_signal, pr.analyzer_confidence,
)
debug_obj = (
_build_ask_debug(pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms)
if debug
else None
)
debug_obj = None
if debug:
timing = dict(pr.timing_ms)
timing["evidence_ms"] = ev_ms
timing["synthesis_ms"] = synth_ms
timing["ask_total_ms"] = total_ms
debug_obj = AskDebug(
timing_ms=timing,
search_notes=pr.notes,
query_analysis=pr.query_analysis,
confidence_signal=pr.confidence_signal,
evidence_candidate_count=len(evidence),
evidence_kept_count=len(evidence),
evidence_skip_reason=ev_skip,
synthesis_cache_hit=sr.cache_hit,
synthesis_raw_preview=sr.raw_preview,
hallucination_flags=sr.hallucination_flags,
defense_layers=defense_log,
)
return AskResponse(
results=pr.results,
@@ -434,5 +591,9 @@ async def ask(
no_results_reason=no_reason,
query=q,
total=len(pr.results),
completeness=completeness,
covered_aspects=covered_aspects,
missing_aspects=missing_aspects,
confirmed_items=confirmed_items,
debug=debug_obj,
)