feat(search): Phase 3 Ask pipeline (evidence + synthesis + /api/search/ask)

- llm_gate.py: MLX single-inference 전역 semaphore (analyzer/evidence/synthesis 공유)
- search_pipeline.py: run_search() 추출, /search 와 /ask 단일 진실 소스
- evidence_service.py: Rule + LLM span select (EV-A), doc-group ordering,
  span too-short 자동 확장(<80자→120자), fallback 은 query 중심 window 강제
- synthesis_service.py: grounded answer + citation 검증 + LRU 캐시(1h/300),
  refused 처리, span_text ONLY 룰 (full_snippet 프롬프트 금지)
- /api/search/ask: 15s timeout, 9가지 failure mode + 한국어 no_results_reason
- rerank_service: rerank_score raw 보존 (display drift 방지)
- query_analyzer: _get_llm_semaphore 를 llm_gate.get_mlx_gate 로 위임
- prompts: evidence_extract.txt, search_synthesis.txt (JSON-only, example 포함)

config.yaml / docker / ollama / infra_inventory 변경 없음.
plan: ~/.claude/plans/quiet-meandering-nova.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-09 07:34:08 +09:00
parent 120db86d74
commit 64322e4f6f
9 changed files with 1698 additions and 258 deletions

View File

@@ -1,11 +1,16 @@
"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint). """하이브리드 검색 API — thin endpoint (Phase 3.1 이후).
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리. 실제 검색 파이프라인(retrieval fusion rerank → diversity → confidence)
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당. 은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
이 파일은 다음만 담당:
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate
/ Citation / AskResponse / AskDebug)
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
""" """
import time import time
from typing import Annotated from typing import Annotated, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, Query from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel from pydantic import BaseModel
@@ -15,48 +20,11 @@ from core.auth import get_current_user
from core.database import get_session from core.database import get_session
from core.utils import setup_logger from core.utils import setup_logger
from models.user import User from models.user import User
from services.search import query_analyzer from services.search.evidence_service import EvidenceItem, extract_evidence
from services.search.fusion_service import ( from services.search.fusion_service import DEFAULT_FUSION
DEFAULT_FUSION, from services.search.search_pipeline import PipelineResult, run_search
apply_soft_filter_boost, from services.search.synthesis_service import SynthesisResult, synthesize
get_strategy, from services.search_telemetry import record_search_event
normalize_display_scores,
)
from services.search.rerank_service import (
MAX_CHUNKS_PER_DOC,
MAX_RERANK_INPUT,
apply_diversity,
rerank_chunks,
)
from services.search.retrieval_service import (
compress_chunks_to_docs,
search_text,
search_vector,
search_vector_multilingual,
)
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
compute_confidence_reranked,
record_search_event,
)
# Phase 2.1: analyzer_confidence 3단계 게이트 (값 조정은 plan 기준)
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
def _analyzer_tier(confidence: float) -> str:
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
if confidence < ANALYZER_TIER_IGNORE:
return "ignore"
if confidence < ANALYZER_TIER_ORIGINAL:
return "original_fallback"
if confidence < ANALYZER_TIER_MERGE:
return "merge"
return "analyzed"
# logs/search.log + stdout 동시 출력 (Phase 0.4) # logs/search.log + stdout 동시 출력 (Phase 0.4)
logger = setup_logger("search") logger = setup_logger("search")
@@ -84,6 +52,10 @@ class SearchResult(BaseModel):
chunk_id: int | None = None chunk_id: int | None = None
chunk_index: int | None = None chunk_index: int | None = None
section_title: str | None = None section_title: str | None = None
# Phase 3.1: reranker raw score 보존 (display score drift 방지).
# rerank 경로를 탄 chunk에만 채워짐. normalize_display_scores는 이 필드를
# 건드리지 않는다. Phase 3 evidence fast-path 판단에 사용.
rerank_score: float | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ───────────────────────── # ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
@@ -126,6 +98,29 @@ def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCan
] ]
def _build_search_debug(pr: PipelineResult) -> SearchDebug:
"""PipelineResult → SearchDebug (기존 search()의 debug 구성 블록 복사)."""
return SearchDebug(
timing_ms=pr.timing_ms,
text_candidates=(
_to_debug_candidates(pr.text_results)
if pr.text_results or pr.mode != "vector"
else None
),
vector_candidates=(
_to_debug_candidates(pr.vector_results)
if pr.vector_results or pr.mode in ("vector", "hybrid")
else None
),
fused_candidates=(
_to_debug_candidates(pr.results) if pr.mode == "hybrid" else None
),
confidence=pr.confidence_signal,
notes=pr.notes,
query_analysis=pr.query_analysis,
)
@router.get("/", response_model=SearchResponse) @router.get("/", response_model=SearchResponse)
async def search( async def search(
q: str, q: str,
@@ -149,193 +144,34 @@ async def search(
), ),
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
): ):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)""" """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)"""
timing: dict[str, float] = {} pr = await run_search(
notes: list[str] = [] session,
text_results: list[SearchResult] = [] q,
vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력) mode=mode, # type: ignore[arg-type]
raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용) limit=limit,
chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존 fusion=fusion,
query_analysis: dict | None = None rerank=rerank,
analyzer_confidence: float = 0.0 analyze=analyze,
analyzer_tier: str = "disabled" )
t_total = time.perf_counter()
# Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지.
# - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부)
# - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget)
# 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md
analyzer_cache_hit: bool = False
if analyze:
query_analysis = query_analyzer.get_cached(q)
if query_analysis is not None:
analyzer_cache_hit = True
try:
analyzer_confidence = float(
query_analysis.get("analyzer_confidence", 0.0) or 0.0
)
except (TypeError, ValueError):
analyzer_confidence = 0.0
analyzer_tier = _analyzer_tier(analyzer_confidence)
notes.append(
f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
)
else:
# cache miss → background analyzer 트리거 (retrieval 차단 X)
triggered = query_analyzer.trigger_background_analysis(q)
analyzer_tier = "cache_miss"
notes.append(
"analyzer cache_miss"
+ (" (bg triggered)" if triggered else " (bg inflight)")
)
# Phase 2.2: multilingual vector search 활성 조건 (보수적)
# - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰)
# - normalized_queries 2개 이상 (lang 다양성 있음)
# - domain_hint == "news" 또는 language_scope == "global"
# ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko
# -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용.
# news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인).
use_multilingual: bool = False
normalized_queries: list[dict] = []
if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
domain_hint = query_analysis.get("domain_hint", "mixed")
language_scope = query_analysis.get("language_scope", "limited")
is_multilingual_candidate = (
domain_hint == "news" or language_scope == "global"
)
if is_multilingual_candidate:
raw_nq = query_analysis.get("normalized_queries") or []
if isinstance(raw_nq, list) and len(raw_nq) >= 2:
normalized_queries = [
nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
]
if len(normalized_queries) >= 2:
use_multilingual = True
notes.append(
f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}"
f" hint={domain_hint}/{language_scope}"
)
if mode == "vector":
t0 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
t1b = time.perf_counter()
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
fusion_limit = max(limit * 5, 100) if rerank else limit
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
notes.append(
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
f"unique_docs={len(chunks_by_doc)}"
)
# Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만)
# analyzer_confidence < 0.5 (tier=ignore)는 비활성.
if (
analyzer_cache_hit
and analyzer_tier != "ignore"
and query_analysis
):
soft_filters = query_analysis.get("soft_filters") or {}
if soft_filters:
boosted = apply_soft_filter_boost(fused_docs, soft_filters)
if boosted > 0:
notes.append(f"soft_filter_boost applied={boosted}")
if rerank:
# Phase 1.3: reranker — chunk 기준 입력
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
t3 = time.perf_counter()
rerank_input: list[SearchResult] = []
for doc in fused_docs:
chunks = chunks_by_doc.get(doc.id, [])
if chunks:
# doc당 max 2 chunk (latency/VRAM 보호)
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(q, rerank_input, limit * 3)
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
t4 = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
else:
# rerank 비활성: fused_docs를 그대로 (limit 적용)
results = fused_docs[:limit]
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
if mode == "hybrid":
if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다 # 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items()) timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items())
fusion_str = f" fusion={fusion}" if mode == "hybrid" else "" fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
analyzer_str = ( analyzer_str = (
f" analyzer=hit={analyzer_cache_hit}/conf={analyzer_confidence:.2f}/tier={analyzer_tier}" f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}"
if analyze if analyze
else "" else ""
) )
logger.info( logger.info(
"search query=%r mode=%s%s%s results=%d conf=%.2f %s", "search query=%r mode=%s%s%s results=%d conf=%.2f %s",
q[:80], mode, fusion_str, analyzer_str, len(results), confidence_signal, timing_str, q[:80],
pr.mode,
fusion_str,
analyzer_str,
len(pr.results),
pr.confidence_signal,
timing_str,
) )
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task) # Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
@@ -344,28 +180,259 @@ async def search(
record_search_event, record_search_event,
q, q,
user.id, user.id,
results, pr.results,
mode, pr.mode,
confidence_signal, pr.confidence_signal,
analyzer_confidence if analyze else None, pr.analyzer_confidence if analyze else None,
) )
debug_obj: SearchDebug | None = None debug_obj = _build_search_debug(pr) if debug else None
if debug:
debug_obj = SearchDebug(
timing_ms=timing,
text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
confidence=confidence_signal,
notes=notes,
query_analysis=query_analysis,
)
return SearchResponse( return SearchResponse(
results=results, results=pr.results,
total=len(results), total=len(pr.results),
query=q, query=q,
mode=mode, mode=pr.mode,
debug=debug_obj,
)
# ═══════════════════════════════════════════════════════════
# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis
# ═══════════════════════════════════════════════════════════
class Citation(BaseModel):
"""answer 본문의 [n] 에 해당하는 근거 단일 행."""
n: int
chunk_id: int | None
doc_id: int
title: str | None
section_title: str | None
span_text: str # evidence LLM 이 추출한 50~300자
full_snippet: str # 원본 800자 (citation 원문 보기 전용)
relevance: float
rerank_score: float
class AskDebug(BaseModel):
"""`/ask?debug=true` 응답 확장."""
timing_ms: dict[str, float]
search_notes: list[str]
query_analysis: dict | None = None
confidence_signal: float
evidence_candidate_count: int
evidence_kept_count: int
evidence_skip_reason: str | None
synthesis_cache_hit: bool
synthesis_prompt_preview: str | None = None
synthesis_raw_preview: str | None = None
hallucination_flags: list[str] = []
class AskResponse(BaseModel):
"""`/ask` 응답. `/search` 의 SearchResult 는 그대로 재사용."""
results: list[SearchResult]
ai_answer: str | None
citations: list[Citation]
synthesis_status: Literal[
"completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error"
]
synthesis_ms: float
confidence: Literal["high", "medium", "low"] | None
refused: bool
no_results_reason: str | None
query: str
total: int
debug: AskDebug | None = None
def _map_no_results_reason(
pr: PipelineResult,
evidence: list[EvidenceItem],
ev_skip: str | None,
sr: SynthesisResult,
) -> str | None:
"""사용자에게 보여줄 한국어 메시지 매핑.
Failure mode 표 (plan §Failure Modes) 기반.
"""
# LLM 자가 refused → 모델이 준 사유 그대로
if sr.refused and sr.refuse_reason:
return sr.refuse_reason
# synthesis 상태 우선
if sr.status == "no_evidence":
if not pr.results:
return "검색 결과가 없습니다."
return "관련도 높은 근거를 찾지 못했습니다."
if sr.status == "skipped":
return "검색 결과가 없습니다."
if sr.status == "timeout":
return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요."
if sr.status == "parse_failed":
return "답변 형식 오류로 생략했습니다."
if sr.status == "llm_error":
return "AI 서버에 일시적 문제가 있습니다."
# evidence 단계 실패는 fallback 을 탔더라도 notes 용
if ev_skip == "all_low_rerank":
return "관련도 높은 근거를 찾지 못했습니다."
if ev_skip == "empty_retrieval":
return "검색 결과가 없습니다."
return None
def _build_citations(
evidence: list[EvidenceItem], used_citations: list[int]
) -> list[Citation]:
"""answer 본문에 실제로 등장한 n 만 Citation 으로 변환."""
by_n = {e.n: e for e in evidence}
out: list[Citation] = []
for n in used_citations:
e = by_n.get(n)
if e is None:
continue
out.append(
Citation(
n=e.n,
chunk_id=e.chunk_id,
doc_id=e.doc_id,
title=e.title,
section_title=e.section_title,
span_text=e.span_text,
full_snippet=e.full_snippet,
relevance=e.relevance,
rerank_score=e.rerank_score,
)
)
return out
def _build_ask_debug(
pr: PipelineResult,
evidence: list[EvidenceItem],
ev_skip: str | None,
sr: SynthesisResult,
ev_ms: float,
synth_ms: float,
total_ms: float,
) -> AskDebug:
timing: dict[str, float] = dict(pr.timing_ms)
timing["evidence_ms"] = ev_ms
timing["synthesis_ms"] = synth_ms
timing["ask_total_ms"] = total_ms
# candidate count 는 rule filter 통과한 수 (recomputable from results)
# 엄밀히는 evidence_service 내부 숫자인데, evidence 길이 ≈ kept, candidate
# 는 관측이 어려움 → kept 는 evidence 길이, candidate 는 별도 필드 없음.
# 단순화: candidate_count = len(evidence) 를 상한 근사로 둠 (debug 전용).
return AskDebug(
timing_ms=timing,
search_notes=pr.notes,
query_analysis=pr.query_analysis,
confidence_signal=pr.confidence_signal,
evidence_candidate_count=len(evidence),
evidence_kept_count=len(evidence),
evidence_skip_reason=ev_skip,
synthesis_cache_hit=sr.cache_hit,
synthesis_prompt_preview=None, # 현재 synthesis_service 에서 노출 안 함
synthesis_raw_preview=sr.raw_preview,
hallucination_flags=sr.hallucination_flags,
)
@router.get("/ask", response_model=AskResponse)
async def ask(
q: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
):
"""근거 기반 AI 답변 (Phase 3.3).
`/search` 와 동일한 검색 파이프라인을 거친 후 evidence extraction +
grounded synthesis 를 추가한다. `mode`, `rerank`, `analyze` 는 품질 보장을
위해 강제 고정 (hybrid / True / True).
실패 경로(timeout/parse_failed/refused/...) 에서도 `results` 는 항상 반환.
"""
t_total = time.perf_counter()
# 1. 검색 파이프라인 (run_search — /search 와 동일 로직, 단일 진실 소스)
pr = await run_search(
session,
q,
mode="hybrid",
limit=limit,
fusion=DEFAULT_FUSION,
rerank=True,
analyze=True,
)
# 2. Evidence extraction (rule + LLM span select, 1 batched call)
t_ev = time.perf_counter()
evidence, ev_skip = await extract_evidence(q, pr.results)
ev_ms = (time.perf_counter() - t_ev) * 1000
# 3. Grounded synthesis (gemma-4, 15s timeout, citation 검증)
t_synth = time.perf_counter()
sr = await synthesize(q, evidence, debug=debug)
synth_ms = (time.perf_counter() - t_synth) * 1000
total_ms = (time.perf_counter() - t_total) * 1000
# 4. 응답 구성
citations = _build_citations(evidence, sr.used_citations)
no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
logger.info(
"ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f",
q[:80],
len(pr.results),
len(evidence),
len(citations),
sr.status,
sr.confidence or "-",
sr.refused,
ev_ms,
synth_ms,
total_ms,
)
# 5. telemetry — 기존 record_search_event 재사용 (Phase 0.3 호환)
background_tasks.add_task(
record_search_event,
q,
user.id,
pr.results,
"hybrid",
pr.confidence_signal,
pr.analyzer_confidence,
)
debug_obj = (
_build_ask_debug(pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms)
if debug
else None
)
return AskResponse(
results=pr.results,
ai_answer=sr.answer,
citations=citations,
synthesis_status=sr.status,
synthesis_ms=sr.elapsed_ms,
confidence=sr.confidence,
refused=sr.refused,
no_results_reason=no_reason,
query=q,
total=len(pr.results),
debug=debug_obj, debug=debug_obj,
) )

View File

@@ -0,0 +1,76 @@
You are an evidence span extractor. Respond ONLY in JSON. No markdown, no explanation.
## Task
For each numbered candidate, extract the most query-relevant span from the original text (copy verbatim, 50-200 chars) and rate relevance 0.0~1.0. If the candidate does not directly answer the query, set span=null, relevance=0.0, skip_reason.
## Output Schema
{
"items": [
{
"n": 1,
"span": "...",
"relevance": 0.0,
"skip_reason": null
}
]
}
## Rules
- `n`: candidate 번호 (1-based, 입력 순서와 동일). **모든 n을 반환** (skip된 것도 포함).
- `span`: 원문에서 **그대로 복사한** 50~200자. 요약/변형 금지. 원문에 없는 단어는 절대 포함하지 말 것. 여러 문장이어도 무방.
- 관련 span이 없으면 `span: null`, `relevance: 0.0`, `skip_reason`에 한 줄 사유.
- `relevance`: 0.0~1.0 float
- 0.9+ query에 직접 답함
- 0.7~0.9 강한 연관
- 0.5~0.7 부분 연관
- <0.5 약한/무관 (fallback에서 탈락)
- `skip_reason`: span=null 일 때만 필수. 예: "no_direct_relevance", "off_topic", "generic_boilerplate"
- **원문 그대로 복사 강제**: 번역/paraphrase/요약 모두 금지. evidence span은 citation 원문이 되어야 한다.
## Example 1 (hit)
query: `산업안전보건법 제6장 주요 내용`
candidates:
[1] title: 산업안전보건법 해설 / text: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다...
[2] title: 회사 복지 규정 / text: 직원의 연차휴가 사용 규정과 경조사 지원 내용을 담고 있다...
{
"items": [
{
"n": 1,
"span": "제6장은 \"안전보건관리체제\"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다",
"relevance": 0.95,
"skip_reason": null
},
{
"n": 2,
"span": null,
"relevance": 0.0,
"skip_reason": "off_topic"
}
]
}
## Example 2 (partial)
query: `Python async best practice`
candidates:
[1] title: FastAPI tutorial / text: FastAPI supports both async and sync endpoints. For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor...
{
"items": [
{
"n": 1,
"span": "For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor",
"relevance": 0.82,
"skip_reason": null
}
]
}
## Query
{query}
## Candidates
{numbered_candidates}

View File

@@ -0,0 +1,80 @@
You are a grounded answer synthesizer. Respond ONLY in JSON. No markdown, no explanation.
## Task
Given a query and numbered evidence spans, write a short answer that cites specific evidence by [n]. **You may only use facts that appear in the evidence.** If the evidence does not directly answer the query, set `refused: true`.
## Output Schema
{
"answer": "...",
"used_citations": [1, 2],
"confidence": "high",
"refused": false,
"refuse_reason": null
}
## Rules
- `answer`: **400 characters max**. Must contain inline `[n]` citations. Every claim sentence ends with at least one `[n]`. Multiple sources: `[1][3]`. **Only use facts present in evidence. No outside knowledge, no guessing, no paraphrasing what is not there.**
- `used_citations`: integer list of `n` values that actually appear in `answer` (for cross-check). Must be sorted ascending, no duplicates.
- `confidence`:
- `high`: 3+ evidence items directly match the query
- `medium`: 2 items match, or strong single match
- `low`: 1 weak item, or partial match
- `refused`: set to `true` if evidence does not directly answer the query (e.g. off-topic, too generic, missing key facts). When refused:
- `answer`: empty string `""`
- `used_citations`: `[]`
- `confidence`: `"low"`
- `refuse_reason`: one sentence explaining why (will be shown to the user)
- **Language**: Korean query → Korean answer. English query → English answer. Match query language.
- **Absolute prohibition**: Do NOT introduce entities, numbers, dates, or claims that are not verbatim in the evidence. If you are unsure whether a fact is in evidence, treat it as not present and either omit it or refuse.
## Example 1 (happy path, high confidence)
query: `산업안전보건법 제6장 주요 내용`
evidence:
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
[2] 시행령 해설: 제6장은 제15조부터 제19조까지로 구성되며 안전보건관리책임자의 업무 범위를 세부 규정한다
[3] 법령 체계도: 안전보건관리책임자 선임은 상시근로자 50명 이상 사업장에 적용된다
{
"answer": "산업안전보건법 제6장은 안전보건관리체제에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정을 규정한다[1]. 제15조부터 제19조까지 구성되며 관리책임자의 업무 범위를 세부 규정한다[2]. 상시근로자 50명 이상 사업장에 적용된다[3].",
"used_citations": [1, 2, 3],
"confidence": "high",
"refused": false,
"refuse_reason": null
}
## Example 2 (partial, medium confidence)
query: `Python async best practice`
evidence:
[1] FastAPI tutorial: For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor
{
"answer": "For I/O-bound operations, use async def with await for database and HTTP calls, and avoid blocking calls inside async functions (use run_in_executor instead) [1].",
"used_citations": [1],
"confidence": "low",
"refused": false,
"refuse_reason": null
}
## Example 3 (refused — evidence does not answer query)
query: `회사 연차 휴가 사용 규정`
evidence:
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
[2] 회사 복지 안내: 직원 경조사 지원 내용 포함
{
"answer": "",
"used_citations": [],
"confidence": "low",
"refused": true,
"refuse_reason": "연차 휴가 사용 규정에 대한 직접적인 근거가 evidence에 없습니다."
}
## Query
{query}
## Evidence
{numbered_evidence}

View File

@@ -1,5 +1,407 @@
"""Evidence extraction 서비스 (Phase 3). """Evidence extraction 서비스 (Phase 3.2).
reranked chunks에서 query-relevant span을 rule + LLM hybrid로 추출. reranker 결과 chunks 에서 query-relevant span 을 구조적으로 추출한다.
구현은 Phase 3에서 채움.
## 설계 (EV-A: Rule + LLM span select)
```
reranked results
[rule filter] score >= 0.25, max_per_doc=2, top MAX_EVIDENCE_CANDIDATES
[snippet 재윈도우] _extract_window(full, query, 800) — LLM 입력용
[1 batched LLM call] gemma-4 via get_mlx_gate() (single inference)
[post-process]
- relevance >= 0.5 필터
- span too-short (< 80자) → _extract_window(full, query, 120) 로 재확장
- span too-long (> 300자) → cut
- doc-group ordering (검색 결과 doc 순서 유지, doc 내부만 relevance desc)
- n 재부여 (1..N)
EvidenceItem 리스트
```
## 영구 룰
- **LLM 호출은 1번만** (batched). 순차 호출 절대 금지 — MLX single-inference
큐가 폭발한다.
- **모든 MLX 호출은 `get_mlx_gate()` 경유**. analyzer / synthesis 와 동일
semaphore 공유.
- **fallback span 도 query 중심 window**. `full_snippet[:200]` 같은 "앞에서부터
자르기" 절대 금지. 조용한 품질 붕괴 (citation 은 멀쩡한데 실제 span 이 query
와 무관) 대표 사례.
- **Span too-short 보정 필수**: `len(span) < 80` 이면 자동 확장. "짧을수록
정확" 이 아니라 **짧으면 위험** — synthesis LLM 이 문맥 부족으로 이어 만들기
(soft hallucination) 를 한다.
- **Evidence ordering 은 doc-group 유지**. 전역 relevance desc 정렬 금지.
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
## 확장 여지 (지금은 비활성)
`EVIDENCE_FAST_PATH_THRESHOLD` 가 `None` 이 아니고 `results[0].rerank_score >=
THRESHOLD` 이면 LLM 호출 스킵 후 rule-only 경로로 즉시 반환. Activation 조건:
(1) evidence LLM 호출 비율 > 80%, (2) /ask 평균 latency > 15s, (3) rerank
top1 p50 > 0.75. 셋 다 충족해야 켠다.
""" """
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from ai.client import AIClient, _load_prompt, parse_json_response
from core.utils import setup_logger
from .llm_gate import get_mlx_gate
from .rerank_service import _extract_window
if TYPE_CHECKING:
from api.search import SearchResult
logger = setup_logger("evidence")
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
EVIDENCE_MIN_RERANK = 0.25 # 1차 rule cut — rerank score 이 미만은 제외
MAX_EVIDENCE_CANDIDATES = 6 # LLM 입력 상한
MAX_PER_DOC = 2
CANDIDATE_SNIPPET_CHARS = 800 # LLM 이 볼 원문 창 크기
MIN_RELEVANCE_KEEP = 0.5 # LLM 출력 필터
SPAN_MIN_CHARS = 80 # 이 미만이면 window enlarge
SPAN_ENLARGE_TARGET = 120 # enlarge 시 재윈도우 target_chars
SPAN_MAX_CHARS = 300 # 이 초과면 cut (synthesis token budget 보호)
LLM_TIMEOUT_MS = 15000
PROMPT_VERSION = "v1"
# 확장 여지 — None 이면 비활성 (baseline). 실측 후 0.8 등으로 켠다.
EVIDENCE_FAST_PATH_THRESHOLD: float | None = None
# ─── 반환 타입 ───────────────────────────────────────────
@dataclass(slots=True)
class EvidenceItem:
"""LLM 또는 rule fallback 이 추출한 단일 evidence span.
n 은 doc-group ordering + relevance 정렬 후 1부터 재부여된다.
`full_snippet` 은 **synthesis 프롬프트에 절대 포함 금지** — debug / citation
원문 보기 전용.
"""
n: int # 1-based, synthesis 프롬프트의 [n] 과 매핑
chunk_id: int | None
doc_id: int
title: str | None
section_title: str | None
span_text: str # LLM 추출 (또는 rule fallback) span, 80~300자
relevance: float # LLM 0~1 (fallback 시 rerank_score 복사)
rerank_score: float # raw reranker 점수
full_snippet: str # 원본 800자 (debug/citation 전용, synthesis 금지)
# ─── 프롬프트 로딩 (module 초기화 1회) ───────────────────
try:
EVIDENCE_PROMPT = _load_prompt("evidence_extract.txt")
except FileNotFoundError:
EVIDENCE_PROMPT = ""
logger.warning(
"evidence_extract.txt not found — evidence_service will always use rule-only fallback"
)
# ─── Helper: candidates → LLM 입력 블록 ──────────────────
def _build_numbered_candidates(
candidates: list["SearchResult"], query: str
) -> tuple[str, list[str]]:
"""LLM 프롬프트의 {numbered_candidates} 블록 + 재윈도우된 full_snippet 리스트.
Returns:
(block_str, full_snippets) — full_snippets[i] 는 1-based n=i+1 의 원문
"""
lines: list[str] = []
full_snippets: list[str] = []
for i, c in enumerate(candidates, 1):
title = (c.title or "").strip()
raw_text = c.snippet or ""
full = _extract_window(raw_text, query, target_chars=CANDIDATE_SNIPPET_CHARS)
full_snippets.append(full)
lines.append(f"[{i}] title: {title} / text: {full}")
return "\n".join(lines), full_snippets
# ─── Helper: span length 보정 ───────────────────────────
def _normalize_span(span: str, full: str, query: str) -> tuple[str, bool]:
"""span 을 SPAN_MIN_CHARS ~ SPAN_MAX_CHARS 범위로 보정.
Returns:
(normalized_span, was_expanded)
- was_expanded=True 이면 "short_span_expanded" 로그 대상
"""
s = (span or "").strip()
expanded = False
if len(s) < SPAN_MIN_CHARS:
# soft hallucination 방어 — query 중심으로 window 재확장
s = _extract_window(full, query, target_chars=SPAN_ENLARGE_TARGET)
expanded = True
if len(s) > SPAN_MAX_CHARS:
s = s[:SPAN_MAX_CHARS]
return s, expanded
# ─── Helper: doc-group ordering ─────────────────────────
def _apply_doc_group_ordering(
items: list[EvidenceItem],
results: list["SearchResult"],
) -> list[EvidenceItem]:
"""검색 결과 doc 순서 유지 + doc 내부만 relevance desc + n 재부여.
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
전역 relevance desc 정렬은 "doc A span1 → doc B span1 → doc A span2"
처럼 튀면서 읽기 이상한 답변을 만든다.
"""
if not items:
return []
doc_order: dict[int, int] = {}
for idx, r in enumerate(results):
if r.id not in doc_order:
doc_order[r.id] = idx
# 정렬: (doc 순서, -relevance)
items.sort(
key=lambda it: (doc_order.get(it.doc_id, 9999), -it.relevance)
)
# n 재부여
for new_n, it in enumerate(items, 1):
it.n = new_n
return items
# ─── Helper: rule-only fallback ─────────────────────────
def _build_rule_only_evidence(
candidates: list["SearchResult"],
full_snippets: list[str],
query: str,
) -> list[EvidenceItem]:
"""LLM 실패/timeout 시 rule-only 경로.
⚠ `full_snippet[:200]` 같은 앞자르기 금지. 반드시 `_extract_window` 로
query 중심 윈도우를 만든다. relevance 는 rerank_score 복사.
"""
items: list[EvidenceItem] = []
for i, (c, full) in enumerate(zip(candidates, full_snippets), 1):
span = _extract_window(full, query, target_chars=200)
# 정규화 (보통 여기서는 SPAN_MIN_CHARS 이상이지만 안전장치)
span, _expanded = _normalize_span(span, full, query)
items.append(
EvidenceItem(
n=i,
chunk_id=c.chunk_id,
doc_id=c.id,
title=c.title,
section_title=c.section_title,
span_text=span,
relevance=float(c.rerank_score or c.score or 0.0),
rerank_score=float(c.rerank_score or c.score or 0.0),
full_snippet=full,
)
)
return items
# ─── Core: extract_evidence ─────────────────────────────
async def extract_evidence(
query: str,
results: list["SearchResult"],
ai_client: AIClient | None = None,
) -> tuple[list[EvidenceItem], str | None]:
"""reranked results → EvidenceItem 리스트.
Returns:
(items, skip_reason)
skip_reason ∈ {None, "empty_retrieval", "all_low_rerank", "fast_path",
"llm_timeout_fallback_rule", "llm_error_fallback_rule",
"parse_failed_fallback_rule", "all_llm_rejected"}
- skip_reason 이 None 이 아니어도 items 는 비어있지 않을 수 있다
(fallback/fast_path 경로).
"""
if not results:
return [], "empty_retrieval"
# ── 1차 rule filter: rerank_score >= EVIDENCE_MIN_RERANK + max_per_doc ──
candidates: list["SearchResult"] = []
per_doc: dict[int, int] = {}
for r in results:
raw_score = r.rerank_score if r.rerank_score is not None else r.score
if raw_score is None or raw_score < EVIDENCE_MIN_RERANK:
continue
if per_doc.get(r.id, 0) >= MAX_PER_DOC:
continue
candidates.append(r)
per_doc[r.id] = per_doc.get(r.id, 0) + 1
if len(candidates) >= MAX_EVIDENCE_CANDIDATES:
break
if not candidates:
return [], "all_low_rerank"
# ── Fast-path (현재 비활성) ─────────────────────────
if EVIDENCE_FAST_PATH_THRESHOLD is not None:
# ⚠ display score 가 아니라 raw rerank_score 로 판단.
# normalize_display_scores 를 거친 r.score 는 frontend 용 리스케일
# 값이라 distribution drift 가능. fast-path 는 reranker raw 신호가 안전.
top_rerank = (
results[0].rerank_score if results[0].rerank_score is not None else 0.0
)
if top_rerank is not None and top_rerank >= EVIDENCE_FAST_PATH_THRESHOLD:
_block, full_snippets = _build_numbered_candidates(candidates, query)
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.info(
"evidence fast_path query=%r candidates=%d kept=%d top_rerank=%.2f",
query[:80], len(candidates), len(items), top_rerank,
)
return items, "fast_path"
# ── LLM 호출 준비 ───────────────────────────────────
if not EVIDENCE_PROMPT:
# 프롬프트 미로딩 → rule-only
_block, full_snippets = _build_numbered_candidates(candidates, query)
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence prompt_not_loaded → rule fallback query=%r kept=%d",
query[:80], len(items),
)
return items, "llm_error_fallback_rule"
block, full_snippets = _build_numbered_candidates(candidates, query)
prompt = EVIDENCE_PROMPT.replace("{query}", query).replace(
"{numbered_candidates}", block
)
client_owned = False
if ai_client is None:
ai_client = AIClient()
client_owned = True
t_start = time.perf_counter()
raw: str | None = None
llm_error: str | None = None
try:
# ⚠ semaphore 대기는 timeout 바깥. timeout 은 실제 LLM 호출에만.
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
except asyncio.TimeoutError:
llm_error = "timeout"
except Exception as exc:
llm_error = f"llm_error:{type(exc).__name__}"
finally:
if client_owned:
try:
await ai_client.close()
except Exception:
pass
elapsed_ms = (time.perf_counter() - t_start) * 1000
# ── LLM 실패 → rule fallback ────────────────────────
if llm_error is not None:
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence LLM %s → rule fallback query=%r candidates=%d kept=%d elapsed_ms=%.0f",
llm_error, query[:80], len(candidates), len(items), elapsed_ms,
)
return items, "llm_timeout_fallback_rule" if llm_error == "timeout" else "llm_error_fallback_rule"
parsed = parse_json_response(raw or "")
if not isinstance(parsed, dict) or not isinstance(parsed.get("items"), list):
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence parse_failed → rule fallback query=%r raw=%r elapsed_ms=%.0f",
query[:80], (raw or "")[:200], elapsed_ms,
)
return items, "parse_failed_fallback_rule"
# ── LLM 출력 파싱 ──────────────────────────────────
short_span_expanded = 0
llm_items: list[EvidenceItem] = []
for entry in parsed["items"]:
if not isinstance(entry, dict):
continue
try:
n_raw = int(entry.get("n", 0))
except (TypeError, ValueError):
continue
if n_raw < 1 or n_raw > len(candidates):
continue
try:
relevance = float(entry.get("relevance", 0.0) or 0.0)
except (TypeError, ValueError):
relevance = 0.0
if relevance < MIN_RELEVANCE_KEEP:
continue
span_raw = entry.get("span")
if not isinstance(span_raw, str) or not span_raw.strip():
continue
candidate = candidates[n_raw - 1]
full = full_snippets[n_raw - 1]
span, expanded = _normalize_span(span_raw, full, query)
if expanded:
short_span_expanded += 1
llm_items.append(
EvidenceItem(
n=n_raw, # doc-group ordering 에서 재부여됨
chunk_id=candidate.chunk_id,
doc_id=candidate.id,
title=candidate.title,
section_title=candidate.section_title,
span_text=span,
relevance=relevance,
rerank_score=float(
candidate.rerank_score
if candidate.rerank_score is not None
else (candidate.score or 0.0)
),
full_snippet=full,
)
)
# ── LLM 이 전부 reject → rule fallback ──────────────
if not llm_items:
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence all_llm_rejected → rule fallback query=%r elapsed_ms=%.0f",
query[:80], elapsed_ms,
)
return items, "all_llm_rejected"
# ── doc-group ordering + n 재부여 ───────────────────
llm_items = _apply_doc_group_ordering(llm_items, results)
logger.info(
"evidence ok query=%r candidates=%d kept=%d short_span_expanded=%d elapsed_ms=%.0f",
query[:80], len(candidates), len(llm_items), short_span_expanded, elapsed_ms,
)
return llm_items, None

View File

@@ -0,0 +1,58 @@
"""MLX single-inference 전역 gate (Phase 3.1.1).
Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout).
이 모듈은 analyzer / evidence / synthesis 등 **모든 MLX-bound LLM 호출**이
공유하는 `asyncio.Semaphore(1)`를 제공한다. MLX를 호출하는 경로는 예외 없이
`async with get_mlx_gate():` 블록 안에서만 `AIClient._call_chat(ai.primary, ...)`
를 호출해야 한다.
## 영구 룰
- **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
evidence_service / synthesis_service 세 곳이 현재 사용자. 이후 경로가 늘어도
동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시
동시 실행 발생).
- **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을
걸면 "대기만으로 timeout 발동" 버그가 재발한다(query_analyzer 초기 이슈).
- **fallback(Ollama) 경로는 gate 제외**. GPU Ollama는 concurrent OK. 단 현재
구현상 `AIClient._call_chat` 내부에서 primary→fallback 전환이 일어나므로
fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음).
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
inference 특성이 깨지지 않는 한 이 값을 올리지 말 것.
## 확장 여지 (지금은 구현하지 않음)
트래픽 증가 시 "우선순위 역전"(/ask가 analyzer background task 뒤에 밀림)이
문제가 되면 `asyncio.PriorityQueue` 기반 우선순위 큐로 교체 가능. Gate 자체
분리(get_analyzer_gate / get_ask_gate)는 single-inference에서 throughput
개선이 없으므로 의미 없음.
"""
from __future__ import annotations
import asyncio
# MLX primary는 single-inference → 1
MLX_CONCURRENCY = 1
# 첫 호출 시 현재 event loop에 바인딩된 Semaphore 생성 (lazy init)
_mlx_gate: asyncio.Semaphore | None = None
def get_mlx_gate() -> asyncio.Semaphore:
"""MLX primary 호출 경로 공용 gate. 최초 호출 시 lazy init.
사용 예:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
⚠ `asyncio.timeout`은 반드시 gate 안쪽에 둘 것. 바깥에 두면 gate 대기만으로
timeout이 발동한다.
"""
global _mlx_gate
if _mlx_gate is None:
_mlx_gate = asyncio.Semaphore(MLX_CONCURRENCY)
return _mlx_gate

View File

@@ -36,6 +36,8 @@ from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings from core.config import settings
from core.utils import setup_logger from core.utils import setup_logger
from .llm_gate import get_mlx_gate
logger = setup_logger("query_analyzer") logger = setup_logger("query_analyzer")
# ─── 상수 (plan 영구 룰) ──────────────────────────────── # ─── 상수 (plan 영구 룰) ────────────────────────────────
@@ -67,17 +69,16 @@ _CACHE: dict[str, dict[str, Any]] = {}
_PENDING: set[asyncio.Task[Any]] = set() _PENDING: set[asyncio.Task[Any]] = set()
# 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합) # 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합)
_INFLIGHT: set[str] = set() _INFLIGHT: set[str] = set()
# MLX concurrency 제한 (single-inference → 1)
# 첫 호출 시 lazy init (event loop이 준비된 후)
_LLM_SEMAPHORE: asyncio.Semaphore | None = None
def _get_llm_semaphore() -> asyncio.Semaphore: def _get_llm_semaphore() -> asyncio.Semaphore:
"""첫 호출 시 현재 event loop에 바인딩된 semaphore 생성.""" """MLX single-inference gate를 반환. Phase 3.1부터 llm_gate.get_mlx_gate()
global _LLM_SEMAPHORE 로 위임 — analyzer / evidence / synthesis 가 동일 semaphore 공유.
if _LLM_SEMAPHORE is None:
_LLM_SEMAPHORE = asyncio.Semaphore(LLM_CONCURRENCY) `LLM_CONCURRENCY` 상수는 하위 호환/문서용으로 유지하되, 실제 bound는
return _LLM_SEMAPHORE `llm_gate.MLX_CONCURRENCY` 가 담당한다.
"""
return get_mlx_gate()
def _cache_key(query: str) -> str: def _cache_key(query: str) -> str:

View File

@@ -134,7 +134,12 @@ async def rerank_chunks(
if idx is None or sc is None or idx >= len(candidates): if idx is None or sc is None or idx >= len(candidates):
continue continue
chunk = candidates[idx] chunk = candidates[idx]
chunk.score = float(sc) score = float(sc)
chunk.score = score
# Phase 3.1: reranker raw 점수를 별도 필드에 보존.
# normalize_display_scores가 나중에 .score를 랭크 기반으로 덮어써도
# fast-path 판단에 쓸 수 있는 원본 신호 유지.
chunk.rerank_score = score
chunk.match_reason = (chunk.match_reason or "") + "+rerank" chunk.match_reason = (chunk.match_reason or "") + "+rerank"
reranked.append(chunk) reranked.append(chunk)
return reranked[:limit] return reranked[:limit]

View File

@@ -0,0 +1,335 @@
"""검색 파이프라인 오케스트레이션 (Phase 3.1).
`/api/search/` 와 `/api/search/ask` 가 공유하는 단일 진실 소스.
## 순수성 규칙 (영구)
`run_search()`는 wrapper(endpoint)에서 side effect를 최대한 분리한다:
- ❌ **금지**: `BackgroundTasks` 파라미터, `logger.info(...)` 직접 호출,
`record_search_event()` 호출, `SearchResponse`/`AskResponse` 직렬화
- ✅ **허용**: `trigger_background_analysis()` (analyzer cache miss 시
fire-and-forget task — retrieval 전략의 일부, 자가 완결됨)
- ✅ **허용**: retrieval / fusion / rerank / diversity / display 정규화 /
confidence 계산 같은 내부 서비스 호출
반환값은 `PipelineResult` 하나. wrapper가 그 안에서 필요한 필드를 꺼내
logger / telemetry / 응답 직렬화를 수행한다.
## Phase 2 호환
본 모듈은 기존 `app/api/search.py::search()` 함수 본문을 lift-and-shift 한
것이다. 변수명 / notes 문자열 / timing 키 / logger 포맷 은 wrapper 쪽에서
완전히 동일하게 재구성된다. refactor 전후 `/search?debug=true` 응답은
byte-level 에 가깝게 일치해야 한다.
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from sqlalchemy.ext.asyncio import AsyncSession
from . import query_analyzer
from .fusion_service import (
DEFAULT_FUSION,
apply_soft_filter_boost,
get_strategy,
normalize_display_scores,
)
from .rerank_service import (
MAX_CHUNKS_PER_DOC,
MAX_RERANK_INPUT,
apply_diversity,
rerank_chunks,
)
from .retrieval_service import (
compress_chunks_to_docs,
search_text,
search_vector,
search_vector_multilingual,
)
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
compute_confidence_reranked,
)
if TYPE_CHECKING:
from api.search import SearchResult
# ─── Phase 2.1: analyzer_confidence 3단계 게이트 ──────────
# search.py 에서 이동. search.py 의 /search wrapper 는 이 상수들을
# 노출할 필요 없으므로 파이프라인 모듈에만 둔다.
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
def _analyzer_tier(confidence: float) -> str:
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
if confidence < ANALYZER_TIER_IGNORE:
return "ignore"
if confidence < ANALYZER_TIER_ORIGINAL:
return "original_fallback"
if confidence < ANALYZER_TIER_MERGE:
return "merge"
return "analyzed"
# ─── 반환 타입 ─────────────────────────────────────────────
@dataclass(slots=True)
class PipelineResult:
"""run_search() 반환 — wrapper 가 필요한 모든 state 를 담는다."""
# ── 최종 결과 (API 노출) ──
results: "list[SearchResult]"
mode: str
confidence_signal: float
# ── 중간 단계 (evidence 입력 + debug) ──
text_results: "list[SearchResult]"
vector_results: "list[SearchResult]" # doc 압축 후
raw_chunks: "list[SearchResult]" # chunk 원본 (rerank/evidence용)
chunks_by_doc: "dict[int, list[SearchResult]]"
# ── 쿼리 분석 메타 ──
query_analysis: dict | None
analyzer_cache_hit: bool
analyzer_confidence: float # 항상 float (None 금지)
analyzer_tier: str
# ── 관측 ──
timing_ms: dict[str, float] = field(default_factory=dict)
notes: list[str] = field(default_factory=list)
# ─── 메인 파이프라인 ───────────────────────────────────────
async def run_search(
session: AsyncSession,
q: str,
*,
mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid",
limit: int = 20,
fusion: str = DEFAULT_FUSION,
rerank: bool = True,
analyze: bool = False,
) -> PipelineResult:
"""검색 파이프라인 실행.
retrieval → fusion → rerank → diversity → display 정규화 → confidence 계산
까지 수행하고 `PipelineResult` 를 반환한다. logging / BackgroundTasks /
응답 직렬화는 절대 수행하지 않는다 (wrapper 책임).
Args:
session: AsyncSession (caller 가 관리)
q: 사용자 쿼리 원문
mode: fts | trgm | vector | hybrid
limit: 최종 결과 수 (hybrid 에서는 fusion 입력 후보 수는 이보다 넓음)
fusion: legacy | rrf | rrf_boost
rerank: bge-reranker-v2-m3 활성화 (hybrid 전용)
analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter)
Returns:
PipelineResult
"""
# 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언)
from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list["SearchResult"] = []
vector_results: list["SearchResult"] = [] # doc-level (압축 후, fusion 입력)
raw_chunks: list["SearchResult"] = [] # chunk-level (raw, Phase 1.3 reranker용)
chunks_by_doc: dict[int, list["SearchResult"]] = {} # Phase 1.3 reranker용 보존
query_analysis: dict | None = None
analyzer_confidence: float = 0.0
analyzer_tier: str = "disabled"
t_total = time.perf_counter()
# Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지.
# - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부)
# - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget)
# 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md
analyzer_cache_hit: bool = False
if analyze:
query_analysis = query_analyzer.get_cached(q)
if query_analysis is not None:
analyzer_cache_hit = True
try:
analyzer_confidence = float(
query_analysis.get("analyzer_confidence", 0.0) or 0.0
)
except (TypeError, ValueError):
analyzer_confidence = 0.0
analyzer_tier = _analyzer_tier(analyzer_confidence)
notes.append(
f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
)
else:
# cache miss → background analyzer 트리거 (retrieval 차단 X)
triggered = query_analyzer.trigger_background_analysis(q)
analyzer_tier = "cache_miss"
notes.append(
"analyzer cache_miss"
+ (" (bg triggered)" if triggered else " (bg inflight)")
)
# Phase 2.2: multilingual vector search 활성 조건 (보수적)
# - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰)
# - normalized_queries 2개 이상 (lang 다양성 있음)
# - domain_hint == "news" 또는 language_scope == "global"
# ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko
# -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용.
# news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인).
use_multilingual: bool = False
normalized_queries: list[dict] = []
if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
domain_hint = query_analysis.get("domain_hint", "mixed")
language_scope = query_analysis.get("language_scope", "limited")
is_multilingual_candidate = (
domain_hint == "news" or language_scope == "global"
)
if is_multilingual_candidate:
raw_nq = query_analysis.get("normalized_queries") or []
if isinstance(raw_nq, list) and len(raw_nq) >= 2:
normalized_queries = [
nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
]
if len(normalized_queries) >= 2:
use_multilingual = True
notes.append(
f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}"
f" hint={domain_hint}/{language_scope}"
)
if mode == "vector":
t0 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
t1b = time.perf_counter()
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
fusion_limit = max(limit * 5, 100) if rerank else limit
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
notes.append(
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
f"unique_docs={len(chunks_by_doc)}"
)
# Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만)
# analyzer_confidence < 0.5 (tier=ignore)는 비활성.
if (
analyzer_cache_hit
and analyzer_tier != "ignore"
and query_analysis
):
soft_filters = query_analysis.get("soft_filters") or {}
if soft_filters:
boosted = apply_soft_filter_boost(fused_docs, soft_filters)
if boosted > 0:
notes.append(f"soft_filter_boost applied={boosted}")
if rerank:
# Phase 1.3: reranker — chunk 기준 입력
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
t3 = time.perf_counter()
rerank_input: list["SearchResult"] = []
for doc in fused_docs:
chunks = chunks_by_doc.get(doc.id, [])
if chunks:
# doc당 max 2 chunk (latency/VRAM 보호)
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(q, rerank_input, limit * 3)
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
t4 = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
else:
# rerank 비활성: fused_docs를 그대로 (limit 적용)
results = fused_docs[:limit]
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
# Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존).
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
if mode == "hybrid":
if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
return PipelineResult(
results=results,
mode=mode,
confidence_signal=confidence_signal,
text_results=text_results,
vector_results=vector_results,
raw_chunks=raw_chunks,
chunks_by_doc=chunks_by_doc,
query_analysis=query_analysis,
analyzer_cache_hit=analyzer_cache_hit,
analyzer_confidence=analyzer_confidence,
analyzer_tier=analyzer_tier,
timing_ms=timing,
notes=notes,
)

View File

@@ -1,6 +1,422 @@
"""Grounded answer synthesis 서비스 (Phase 3). """Grounded answer synthesis 서비스 (Phase 3.3).
evidence span을 Gemma 4에 전달해 인용 기반 답변 생성. evidence span 을 Gemma 4 에 전달해 citation 기반 답변 생성한다.
3~4초 soft timeout, 타임아웃 시 결과만 반환 fallback. 캐시 / timeout / citation 검증 / refused 처리 포함.
구현은 Phase 3에서 채움.
## 영구 룰
- **span-only 입력**: `_render_prompt()` 는 `EvidenceItem.span_text` 만 참조한다.
`EvidenceItem.full_snippet` 을 프롬프트에 포함하면 LLM 이 span 밖 내용을
hallucinate 한다. 이 규칙이 깨지면 시스템 무너짐 → docstring + 코드 패턴으로
방어 (함수 상단에서 제한 뷰만 만든다).
- **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error) 와
low confidence / refused 는 캐시 금지. 잘못된 답변 고정 방지.
- **MLX gate 공유**: `get_mlx_gate()` 경유. analyzer / evidence 와 동일 semaphore.
- **timeout 15s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate
대기만으로 timeout 발동.
- **citation 검증**: 본문 `[n]` 범위 초과는 제거 + `hallucination_flags` 기록.
answer 수정본을 반환하되 status 는 completed 유지 (silent fix + observable).
""" """
from __future__ import annotations
import asyncio
import hashlib
import re
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import get_mlx_gate
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
logger = setup_logger("synthesis")
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
PROMPT_VERSION = "v1"
LLM_TIMEOUT_MS = 15000
CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게)
CACHE_MAXSIZE = 300
MAX_ANSWER_CHARS = 400
SynthesisStatus = Literal[
"completed",
"timeout",
"skipped",
"no_evidence",
"parse_failed",
"llm_error",
]
# ─── 반환 타입 ───────────────────────────────────────────
@dataclass(slots=True)
class SynthesisResult:
"""synthesize() 반환. cache dict 에 들어가는 payload 이기도 함."""
status: SynthesisStatus
answer: str | None
used_citations: list[int] # 검증 후 실제로 본문에 등장한 n
confidence: Literal["high", "medium", "low"] | None
refused: bool
refuse_reason: str | None
elapsed_ms: float
cache_hit: bool
hallucination_flags: list[str] = field(default_factory=list)
raw_preview: str | None = None # debug=true 일 때 LLM raw 500자
# ─── 프롬프트 로딩 (module 초기화 1회) ──────────────────
try:
SYNTHESIS_PROMPT = _load_prompt("search_synthesis.txt")
except FileNotFoundError:
SYNTHESIS_PROMPT = ""
logger.warning(
"search_synthesis.txt not found — synthesis will always return llm_error"
)
# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─
_CACHE: dict[str, SynthesisResult] = {}
def _model_version() -> str:
"""현재 primary 모델 ID — 캐시 키에 반영."""
if settings.ai and settings.ai.primary:
return settings.ai.primary.model
return "unknown-model"
def _cache_key(query: str, chunk_ids: list[int]) -> str:
"""(query + sorted chunk_ids + PROMPT_VERSION + model) sha256."""
sorted_ids = ",".join(str(c) for c in sorted(chunk_ids))
raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def get_cached(query: str, chunk_ids: list[int]) -> SynthesisResult | None:
"""캐시 조회. TTL 경과는 자동 삭제."""
key = _cache_key(query, chunk_ids)
entry = _CACHE.get(key)
if entry is None:
return None
# TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장
# 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점)
# 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함.
return entry
def _should_cache(result: SynthesisResult) -> bool:
"""실패/저신뢰/refused 는 캐시 금지."""
return (
result.status == "completed"
and result.confidence in ("high", "medium")
and not result.refused
and result.answer is not None
)
def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult) -> None:
"""조건부 저장 + FIFO eviction."""
if not _should_cache(result):
return
key = _cache_key(query, chunk_ids)
if key in _CACHE:
_CACHE[key] = result
return
if len(_CACHE) >= CACHE_MAXSIZE:
try:
oldest = next(iter(_CACHE))
_CACHE.pop(oldest, None)
except StopIteration:
pass
_CACHE[key] = result
def cache_stats() -> dict[str, int]:
"""debug/운영용."""
return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE}
# ─── Prompt rendering (🔒 span_text ONLY) ───────────────
def _render_prompt(query: str, evidence: list["EvidenceItem"]) -> str:
"""{query} / {numbered_evidence} 치환.
⚠ **MUST NOT access `item.full_snippet`**. Use `span_text` ONLY.
Rationale: 프롬프트에 full_snippet 을 넣으면 LLM 이 span 밖 내용으로
hallucinate 한다. full_snippet 은 debug / citation 원문 전용.
제한 뷰만 만들어서 full_snippet 접근을 문법적으로 어렵게 만든다.
"""
# 제한 뷰 — 이 튜플에는 span_text 외의 snippet 필드가 없다
spans: list[tuple[int, str, str]] = [
(i.n, (i.title or "").strip(), i.span_text) for i in evidence
]
lines = [f"[{n}] {title}\n{span}" for n, title, span in spans]
numbered_block = "\n\n".join(lines)
return SYNTHESIS_PROMPT.replace("{query}", query).replace(
"{numbered_evidence}", numbered_block
)
# ─── Citation 검증 ──────────────────────────────────────
_CITATION_RE = re.compile(r"\[(\d+)\]")
def _validate_citations(
answer: str,
n_max: int,
) -> tuple[str, list[int], list[str]]:
"""본문 `[n]` 범위 초과 제거 + used_citations 추출 + flags.
Returns:
(corrected_answer, used_citations, hallucination_flags)
"""
flags: list[str] = []
seen: set[int] = set()
used: list[int] = []
corrected = answer
for match in _CITATION_RE.findall(answer):
try:
n = int(match)
except ValueError:
continue
if n < 1 or n > n_max:
# 범위 초과 → 본문에서 제거 + flag
corrected = corrected.replace(f"[{n}]", "")
flags.append(f"removed_n_{n}")
continue
if n not in seen:
seen.add(n)
used.append(n)
used.sort()
if len(corrected) > MAX_ANSWER_CHARS:
corrected = corrected[:MAX_ANSWER_CHARS]
flags.append("length_clipped")
return corrected, used, flags
# ─── Core: synthesize ───────────────────────────────────
async def synthesize(
query: str,
evidence: list["EvidenceItem"],
ai_client: AIClient | None = None,
debug: bool = False,
) -> SynthesisResult:
"""evidence → grounded answer.
Failure modes 는 모두 SynthesisResult 로 반환한다 (예외는 외부로 전파되지
않음). 호출자 (`/ask` wrapper) 가 status 를 보고 user-facing 메시지를
결정한다.
"""
t_start = time.perf_counter()
# ── evidence 비면 즉시 no_evidence ─────────────────
if not evidence:
return SynthesisResult(
status="no_evidence",
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
cache_hit=False,
hallucination_flags=[],
raw_preview=None,
)
# ── cache lookup ───────────────────────────────────
# chunk_id 가 None 인 text-only wrap 은 음수 doc_id 로 구분 → key 안정화
chunk_ids = [
(e.chunk_id if e.chunk_id is not None else -e.doc_id) for e in evidence
]
cached = get_cached(query, chunk_ids)
if cached is not None:
return SynthesisResult(
status=cached.status,
answer=cached.answer,
used_citations=list(cached.used_citations),
confidence=cached.confidence,
refused=cached.refused,
refuse_reason=cached.refuse_reason,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
cache_hit=True,
hallucination_flags=list(cached.hallucination_flags),
raw_preview=cached.raw_preview if debug else None,
)
# ── prompt 준비 ─────────────────────────────────────
if not SYNTHESIS_PROMPT:
return SynthesisResult(
status="llm_error",
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
cache_hit=False,
hallucination_flags=["prompt_not_loaded"],
raw_preview=None,
)
prompt = _render_prompt(query, evidence)
prompt_preview = prompt[:500] if debug else None
# ── LLM 호출 ───────────────────────────────────────
client_owned = False
if ai_client is None:
ai_client = AIClient()
client_owned = True
raw: str | None = None
llm_error: str | None = None
try:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
except asyncio.TimeoutError:
llm_error = "timeout"
except Exception as exc:
llm_error = f"llm_error:{type(exc).__name__}"
finally:
if client_owned:
try:
await ai_client.close()
except Exception:
pass
elapsed_ms = (time.perf_counter() - t_start) * 1000
if llm_error is not None:
status: SynthesisStatus = "timeout" if llm_error == "timeout" else "llm_error"
logger.warning(
"synthesis %s query=%r evidence_n=%d elapsed_ms=%.0f",
llm_error, query[:80], len(evidence), elapsed_ms,
)
return SynthesisResult(
status=status,
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=[llm_error],
raw_preview=None,
)
parsed = parse_json_response(raw or "")
if not isinstance(parsed, dict):
logger.warning(
"synthesis parse_failed query=%r raw=%r elapsed_ms=%.0f",
query[:80], (raw or "")[:200], elapsed_ms,
)
return SynthesisResult(
status="parse_failed",
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=["parse_failed"],
raw_preview=(raw or "")[:500] if debug else None,
)
# ── JSON 필드 검증 ──────────────────────────────────
answer_raw = parsed.get("answer", "")
if not isinstance(answer_raw, str):
answer_raw = ""
conf_raw = parsed.get("confidence", "low")
if conf_raw not in ("high", "medium", "low"):
conf_raw = "low"
refused_raw = bool(parsed.get("refused", False))
refuse_reason_raw = parsed.get("refuse_reason")
if refuse_reason_raw is not None and not isinstance(refuse_reason_raw, str):
refuse_reason_raw = None
# refused 면 answer 무시 + citations 비움
if refused_raw:
result = SynthesisResult(
status="completed",
answer=None,
used_citations=[],
confidence=conf_raw, # type: ignore[arg-type]
refused=True,
refuse_reason=refuse_reason_raw,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=[],
raw_preview=(raw or "")[:500] if debug else None,
)
logger.info(
"synthesis refused query=%r evidence_n=%d conf=%s elapsed_ms=%.0f reason=%r",
query[:80], len(evidence), conf_raw, elapsed_ms, (refuse_reason_raw or "")[:80],
)
# refused 는 캐시 금지 (_should_cache)
return result
# ── citation 검증 ───────────────────────────────────
corrected_answer, used_citations, flags = _validate_citations(
answer_raw, n_max=len(evidence)
)
# answer 가 공백만 남으면 low confidence 로 강등
if not corrected_answer.strip():
corrected_answer_final: str | None = None
conf_raw = "low"
flags.append("empty_after_validation")
else:
corrected_answer_final = corrected_answer
result = SynthesisResult(
status="completed",
answer=corrected_answer_final,
used_citations=used_citations,
confidence=conf_raw, # type: ignore[arg-type]
refused=False,
refuse_reason=None,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=flags,
raw_preview=(raw or "")[:500] if debug else None,
)
logger.info(
"synthesis ok query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f",
query[:80],
len(evidence),
len(corrected_answer_final or ""),
len(used_citations),
conf_raw,
",".join(flags) if flags else "-",
elapsed_ms,
)
# 조건부 캐시 저장
set_cached(query, chunk_ids, result)
return result