"""Grounded answer synthesis 서비스 (Phase 3.3). evidence span 을 Gemma 4 에 전달해 citation 기반 답변을 생성한다. 캐시 / timeout / citation 검증 / refused 처리 포함. ## 영구 룰 - **span-only 입력**: `_render_prompt()` 는 `EvidenceItem.span_text` 만 참조한다. `EvidenceItem.full_snippet` 을 프롬프트에 포함하면 LLM 이 span 밖 내용을 hallucinate 한다. 이 규칙이 깨지면 시스템 무너짐 → docstring + 코드 패턴으로 방어 (함수 상단에서 제한 뷰만 만든다). - **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error) 와 low confidence / refused 는 캐시 금지. 잘못된 답변 고정 방지. - **MLX gate 공유**: `get_mlx_gate()` 경유. analyzer / evidence 와 동일 semaphore. - **timeout 15s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate 대기만으로 timeout 발동. - **citation 검증**: 본문 `[n]` 범위 초과는 제거 + `hallucination_flags` 기록. answer 수정본을 반환하되 status 는 completed 유지 (silent fix + observable). """ from __future__ import annotations import asyncio import hashlib import re import time from dataclasses import dataclass, field from typing import TYPE_CHECKING, Literal from ai.client import AIClient, _load_prompt, parse_json_response from core.config import settings from core.utils import setup_logger from .llm_gate import get_mlx_gate if TYPE_CHECKING: from .evidence_service import EvidenceItem logger = setup_logger("synthesis") # ─── 상수 (plan 영구 룰) ───────────────────────────────── PROMPT_VERSION = "v1" LLM_TIMEOUT_MS = 15000 CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게) CACHE_MAXSIZE = 300 MAX_ANSWER_CHARS = 400 SynthesisStatus = Literal[ "completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error", ] # ─── 반환 타입 ─────────────────────────────────────────── @dataclass(slots=True) class SynthesisResult: """synthesize() 반환. cache dict 에 들어가는 payload 이기도 함.""" status: SynthesisStatus answer: str | None used_citations: list[int] # 검증 후 실제로 본문에 등장한 n confidence: Literal["high", "medium", "low"] | None refused: bool refuse_reason: str | None elapsed_ms: float cache_hit: bool hallucination_flags: list[str] = field(default_factory=list) raw_preview: str | None = None # debug=true 일 때 LLM raw 500자 # ─── 프롬프트 로딩 (module 초기화 1회) ────────────────── try: SYNTHESIS_PROMPT = _load_prompt("search_synthesis.txt") except FileNotFoundError: SYNTHESIS_PROMPT = "" logger.warning( "search_synthesis.txt not found — synthesis will always return llm_error" ) # ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─ _CACHE: dict[str, SynthesisResult] = {} def _model_version() -> str: """현재 primary 모델 ID — 캐시 키에 반영.""" if settings.ai and settings.ai.primary: return settings.ai.primary.model return "unknown-model" def _cache_key(query: str, chunk_ids: list[int]) -> str: """(query + sorted chunk_ids + PROMPT_VERSION + model) sha256.""" sorted_ids = ",".join(str(c) for c in sorted(chunk_ids)) raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}" return hashlib.sha256(raw.encode("utf-8")).hexdigest() def get_cached(query: str, chunk_ids: list[int]) -> SynthesisResult | None: """캐시 조회. TTL 경과는 자동 삭제.""" key = _cache_key(query, chunk_ids) entry = _CACHE.get(key) if entry is None: return None # TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장 # 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점) # 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함. return entry def _should_cache(result: SynthesisResult) -> bool: """실패/저신뢰/refused 는 캐시 금지.""" return ( result.status == "completed" and result.confidence in ("high", "medium") and not result.refused and result.answer is not None ) def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult) -> None: """조건부 저장 + FIFO eviction.""" if not _should_cache(result): return key = _cache_key(query, chunk_ids) if key in _CACHE: _CACHE[key] = result return if len(_CACHE) >= CACHE_MAXSIZE: try: oldest = next(iter(_CACHE)) _CACHE.pop(oldest, None) except StopIteration: pass _CACHE[key] = result def cache_stats() -> dict[str, int]: """debug/운영용.""" return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE} # ─── Prompt rendering (🔒 span_text ONLY) ─────────────── def _render_prompt(query: str, evidence: list["EvidenceItem"]) -> str: """{query} / {numbered_evidence} 치환. ⚠ **MUST NOT access `item.full_snippet`**. Use `span_text` ONLY. Rationale: 프롬프트에 full_snippet 을 넣으면 LLM 이 span 밖 내용으로 hallucinate 한다. full_snippet 은 debug / citation 원문 전용. 제한 뷰만 만들어서 full_snippet 접근을 문법적으로 어렵게 만든다. """ # 제한 뷰 — 이 튜플에는 span_text 외의 snippet 필드가 없다 spans: list[tuple[int, str, str]] = [ (i.n, (i.title or "").strip(), i.span_text) for i in evidence ] lines = [f"[{n}] {title}\n{span}" for n, title, span in spans] numbered_block = "\n\n".join(lines) return SYNTHESIS_PROMPT.replace("{query}", query).replace( "{numbered_evidence}", numbered_block ) # ─── Citation 검증 ────────────────────────────────────── _CITATION_RE = re.compile(r"\[(\d+)\]") def _validate_citations( answer: str, n_max: int, ) -> tuple[str, list[int], list[str]]: """본문 `[n]` 범위 초과 제거 + used_citations 추출 + flags. Returns: (corrected_answer, used_citations, hallucination_flags) """ flags: list[str] = [] seen: set[int] = set() used: list[int] = [] corrected = answer for match in _CITATION_RE.findall(answer): try: n = int(match) except ValueError: continue if n < 1 or n > n_max: # 범위 초과 → 본문에서 제거 + flag corrected = corrected.replace(f"[{n}]", "") flags.append(f"removed_n_{n}") continue if n not in seen: seen.add(n) used.append(n) used.sort() if len(corrected) > MAX_ANSWER_CHARS: corrected = corrected[:MAX_ANSWER_CHARS] flags.append("length_clipped") return corrected, used, flags # ─── Core: synthesize ─────────────────────────────────── async def synthesize( query: str, evidence: list["EvidenceItem"], ai_client: AIClient | None = None, debug: bool = False, ) -> SynthesisResult: """evidence → grounded answer. Failure modes 는 모두 SynthesisResult 로 반환한다 (예외는 외부로 전파되지 않음). 호출자 (`/ask` wrapper) 가 status 를 보고 user-facing 메시지를 결정한다. """ t_start = time.perf_counter() # ── evidence 비면 즉시 no_evidence ───────────────── if not evidence: return SynthesisResult( status="no_evidence", answer=None, used_citations=[], confidence=None, refused=False, refuse_reason=None, elapsed_ms=(time.perf_counter() - t_start) * 1000, cache_hit=False, hallucination_flags=[], raw_preview=None, ) # ── cache lookup ─────────────────────────────────── # chunk_id 가 None 인 text-only wrap 은 음수 doc_id 로 구분 → key 안정화 chunk_ids = [ (e.chunk_id if e.chunk_id is not None else -e.doc_id) for e in evidence ] cached = get_cached(query, chunk_ids) if cached is not None: return SynthesisResult( status=cached.status, answer=cached.answer, used_citations=list(cached.used_citations), confidence=cached.confidence, refused=cached.refused, refuse_reason=cached.refuse_reason, elapsed_ms=(time.perf_counter() - t_start) * 1000, cache_hit=True, hallucination_flags=list(cached.hallucination_flags), raw_preview=cached.raw_preview if debug else None, ) # ── prompt 준비 ───────────────────────────────────── if not SYNTHESIS_PROMPT: return SynthesisResult( status="llm_error", answer=None, used_citations=[], confidence=None, refused=False, refuse_reason=None, elapsed_ms=(time.perf_counter() - t_start) * 1000, cache_hit=False, hallucination_flags=["prompt_not_loaded"], raw_preview=None, ) prompt = _render_prompt(query, evidence) prompt_preview = prompt[:500] if debug else None # ── LLM 호출 ─────────────────────────────────────── client_owned = False if ai_client is None: ai_client = AIClient() client_owned = True raw: str | None = None llm_error: str | None = None try: async with get_mlx_gate(): async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): raw = await ai_client._call_chat(ai_client.ai.primary, prompt) except asyncio.TimeoutError: llm_error = "timeout" except Exception as exc: llm_error = f"llm_error:{type(exc).__name__}" finally: if client_owned: try: await ai_client.close() except Exception: pass elapsed_ms = (time.perf_counter() - t_start) * 1000 if llm_error is not None: status: SynthesisStatus = "timeout" if llm_error == "timeout" else "llm_error" logger.warning( "synthesis %s query=%r evidence_n=%d elapsed_ms=%.0f", llm_error, query[:80], len(evidence), elapsed_ms, ) return SynthesisResult( status=status, answer=None, used_citations=[], confidence=None, refused=False, refuse_reason=None, elapsed_ms=elapsed_ms, cache_hit=False, hallucination_flags=[llm_error], raw_preview=None, ) parsed = parse_json_response(raw or "") if not isinstance(parsed, dict): logger.warning( "synthesis parse_failed query=%r raw=%r elapsed_ms=%.0f", query[:80], (raw or "")[:200], elapsed_ms, ) return SynthesisResult( status="parse_failed", answer=None, used_citations=[], confidence=None, refused=False, refuse_reason=None, elapsed_ms=elapsed_ms, cache_hit=False, hallucination_flags=["parse_failed"], raw_preview=(raw or "")[:500] if debug else None, ) # ── JSON 필드 검증 ────────────────────────────────── answer_raw = parsed.get("answer", "") if not isinstance(answer_raw, str): answer_raw = "" conf_raw = parsed.get("confidence", "low") if conf_raw not in ("high", "medium", "low"): conf_raw = "low" refused_raw = bool(parsed.get("refused", False)) refuse_reason_raw = parsed.get("refuse_reason") if refuse_reason_raw is not None and not isinstance(refuse_reason_raw, str): refuse_reason_raw = None # refused 면 answer 무시 + citations 비움 if refused_raw: result = SynthesisResult( status="completed", answer=None, used_citations=[], confidence=conf_raw, # type: ignore[arg-type] refused=True, refuse_reason=refuse_reason_raw, elapsed_ms=elapsed_ms, cache_hit=False, hallucination_flags=[], raw_preview=(raw or "")[:500] if debug else None, ) logger.info( "synthesis refused query=%r evidence_n=%d conf=%s elapsed_ms=%.0f reason=%r", query[:80], len(evidence), conf_raw, elapsed_ms, (refuse_reason_raw or "")[:80], ) # refused 는 캐시 금지 (_should_cache) return result # ── citation 검증 ─────────────────────────────────── corrected_answer, used_citations, flags = _validate_citations( answer_raw, n_max=len(evidence) ) # answer 가 공백만 남으면 low confidence 로 강등 if not corrected_answer.strip(): corrected_answer_final: str | None = None conf_raw = "low" flags.append("empty_after_validation") else: corrected_answer_final = corrected_answer result = SynthesisResult( status="completed", answer=corrected_answer_final, used_citations=used_citations, confidence=conf_raw, # type: ignore[arg-type] refused=False, refuse_reason=None, elapsed_ms=elapsed_ms, cache_hit=False, hallucination_flags=flags, raw_preview=(raw or "")[:500] if debug else None, ) logger.info( "synthesis ok query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f", query[:80], len(evidence), len(corrected_answer_final or ""), len(used_citations), conf_raw, ",".join(flags) if flags else "-", elapsed_ms, ) # 조건부 캐시 저장 set_cached(query, chunk_ids, result) return result