73f328cb65
PR-Hermes-Docsrv-Search-1 closure 측정 (synthesis_ms=30~48s / ev_ms=15005 / query_analyze 45s) 으로 15s LLM_TIMEOUT 빈발 timeout 확인. Mac mini 26B 동시 호출 (gate Semaphore 1 직렬화 후에도 evidence + synthesis + classifier + query_analyzer + verifier 가 sequential 누적) 시 각 호출 30s 까지 필요. 5곳 변경: - synthesis_service.LLM_TIMEOUT_MS 15000 → 30000 - evidence_service.LLM_TIMEOUT_MS 15000 → 30000 - verifier_service.LLM_TIMEOUT_MS 3000 → 10000 - query_analyzer.LLM_TIMEOUT_MS 15000 → 30000 - search.py:522 classifier wait_for 15.0 → 30.0 (classifier_service align) - search.py:641 verifier wait_for 4.0 → 10.0 (verifier_service align) classifier (이전 PR 에서 30s 로 align 완료) 와 동일 정책 — outer wait_for 가 inner LLM_TIMEOUT_MS 를 override 하지 않도록 align. ask 응답 latency 상한 ↑ 의도된 trade-off — 안정성 (refusal_gate conservative_refuse 회피 + grounding/verifier 정상 동작) 우선. 영향: PR-1 fixture 회귀 0 예상 (이전 timeout 이 새 한도 안). B-1 Throughput-1 (priority queue / 모델 분리) 별 PR 진입 시 latency 본격 단축 검토.
472 lines
19 KiB
Python
472 lines
19 KiB
Python
"""Evidence extraction 서비스 (Phase 3.2).
|
||
|
||
reranker 결과 chunks 에서 query-relevant span 을 구조적으로 추출한다.
|
||
|
||
## 설계 (EV-A: Rule + LLM span select)
|
||
|
||
```
|
||
reranked results
|
||
↓
|
||
[rule filter] score >= 0.25, max_per_doc=2, top MAX_EVIDENCE_CANDIDATES
|
||
↓
|
||
[snippet 재윈도우] _extract_window(full, query, 800) — LLM 입력용
|
||
↓
|
||
[1 batched LLM call] gemma-4 via get_mlx_gate() (single inference)
|
||
↓
|
||
[post-process]
|
||
- relevance >= 0.5 필터
|
||
- span too-short (< 80자) → _extract_window(full, query, 120) 로 재확장
|
||
- span too-long (> 300자) → cut
|
||
- doc-group ordering (검색 결과 doc 순서 유지, doc 내부만 relevance desc)
|
||
- n 재부여 (1..N)
|
||
↓
|
||
EvidenceItem 리스트
|
||
```
|
||
|
||
## 영구 룰
|
||
|
||
- **LLM 호출은 1번만** (batched). 순차 호출 절대 금지.
|
||
- **B-2 변경**: evidence 추출은 triage(Mac mini 26B MLX) 로 전환. PR #20 이후 triage/primary 동일 endpoint 라
|
||
path 분리는 prompt 레벨만 — `get_mlx_gate()` 외부 실행 (concurrent 안전성 별 검토). primary 의 gate 보호는 synthesis 전용.
|
||
- 기존 analyzer / synthesis 의 `get_mlx_gate()` 공유는 유지 — 26B 경로에만 적용.
|
||
- **fallback span 도 query 중심 window**. `full_snippet[:200]` 같은 "앞에서부터
|
||
자르기" 절대 금지. 조용한 품질 붕괴 (citation 은 멀쩡한데 실제 span 이 query
|
||
와 무관) 대표 사례.
|
||
- **Span too-short 보정 필수**: `len(span) < 80` 이면 자동 확장. "짧을수록
|
||
정확" 이 아니라 **짧으면 위험** — synthesis LLM 이 문맥 부족으로 이어 만들기
|
||
(soft hallucination) 를 한다.
|
||
- **Evidence ordering 은 doc-group 유지**. 전역 relevance desc 정렬 금지.
|
||
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
|
||
|
||
## 확장 여지 (지금은 비활성)
|
||
|
||
`EVIDENCE_FAST_PATH_THRESHOLD` 가 `None` 이 아니고 `results[0].rerank_score >=
|
||
THRESHOLD` 이면 LLM 호출 스킵 후 rule-only 경로로 즉시 반환. Activation 조건:
|
||
(1) evidence LLM 호출 비율 > 80%, (2) /ask 평균 latency > 15s, (3) rerank
|
||
top1 p50 > 0.75. 셋 다 충족해야 켠다.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import re
|
||
import time
|
||
from dataclasses import dataclass, field
|
||
from typing import TYPE_CHECKING
|
||
|
||
from ai.client import AIClient, _load_prompt, parse_json_response
|
||
from core.utils import setup_logger
|
||
|
||
from .llm_gate import get_mlx_gate
|
||
|
||
from .rerank_service import _extract_window
|
||
|
||
if TYPE_CHECKING:
|
||
from api.search import SearchResult
|
||
|
||
logger = setup_logger("evidence")
|
||
|
||
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
|
||
EVIDENCE_MIN_RERANK = 0.25 # 1차 rule cut — rerank score 이 미만은 제외
|
||
MAX_EVIDENCE_CANDIDATES = 6 # LLM 입력 상한
|
||
MAX_PER_DOC = 2
|
||
CANDIDATE_SNIPPET_CHARS = 1200 # LLM 이 볼 원문 창 크기 (800→1200, 6×1200자≈2880tok < gemma-4 4096 예산)
|
||
MIN_EVIDENCE_FOR_SYNTHESIS = 3 # sparse evidence supplement 임계값
|
||
|
||
MIN_RELEVANCE_KEEP = 0.5 # LLM 출력 필터
|
||
SPAN_MIN_CHARS = 80 # 이 미만이면 window enlarge
|
||
SPAN_ENLARGE_TARGET = 120 # enlarge 시 재윈도우 target_chars
|
||
SPAN_MAX_CHARS = 300 # 이 초과면 cut (synthesis token budget 보호)
|
||
|
||
LLM_TIMEOUT_MS = 30000 # 2026-05-17 B-3: 15s 시 ev_ms=15005 timeout 빈발 — classifier (30s) 와 align
|
||
PROMPT_VERSION = "v2-triage" # B-2: primary(26B MLX) → triage path 전환. PR #20 이후 triage/primary 동일 endpoint (Mac mini 26B).
|
||
|
||
# 확장 여지 — None 이면 비활성 (baseline). 실측 후 0.8 등으로 켠다.
|
||
EVIDENCE_FAST_PATH_THRESHOLD: float | None = None
|
||
|
||
|
||
# ─── 반환 타입 ───────────────────────────────────────────
|
||
|
||
|
||
@dataclass(slots=True)
|
||
class EvidenceItem:
|
||
"""LLM 또는 rule fallback 이 추출한 단일 evidence span.
|
||
|
||
n 은 doc-group ordering + relevance 정렬 후 1부터 재부여된다.
|
||
`full_snippet` 은 **synthesis 프롬프트에 절대 포함 금지** — debug / citation
|
||
원문 보기 전용.
|
||
"""
|
||
|
||
n: int # 1-based, synthesis 프롬프트의 [n] 과 매핑
|
||
chunk_id: int | None
|
||
doc_id: int
|
||
title: str | None
|
||
section_title: str | None
|
||
span_text: str # LLM 추출 (또는 rule fallback) span, 80~300자
|
||
relevance: float # LLM 0~1 (fallback 시 rerank_score 복사)
|
||
rerank_score: float # raw reranker 점수
|
||
full_snippet: str # 원본 800자 (debug/citation 전용, synthesis 금지)
|
||
source: str = "llm" # "llm" | "supplement" | "rule_fallback"
|
||
|
||
|
||
# ─── 프롬프트 로딩 (module 초기화 1회) ───────────────────
|
||
try:
|
||
EVIDENCE_PROMPT = _load_prompt("evidence_extract.txt")
|
||
except FileNotFoundError:
|
||
EVIDENCE_PROMPT = ""
|
||
logger.warning(
|
||
"evidence_extract.txt not found — evidence_service will always use rule-only fallback"
|
||
)
|
||
|
||
|
||
# ─── Helper: candidates → LLM 입력 블록 ──────────────────
|
||
|
||
|
||
def _build_numbered_candidates(
|
||
candidates: list["SearchResult"], query: str
|
||
) -> tuple[str, list[str]]:
|
||
"""LLM 프롬프트의 {numbered_candidates} 블록 + 재윈도우된 full_snippet 리스트.
|
||
|
||
Returns:
|
||
(block_str, full_snippets) — full_snippets[i] 는 1-based n=i+1 의 원문
|
||
"""
|
||
lines: list[str] = []
|
||
full_snippets: list[str] = []
|
||
for i, c in enumerate(candidates, 1):
|
||
title = (c.title or "").strip()
|
||
raw_text = c.snippet or ""
|
||
full = _extract_window(raw_text, query, target_chars=CANDIDATE_SNIPPET_CHARS)
|
||
full_snippets.append(full)
|
||
lines.append(f"[{i}] title: {title} / text: {full}")
|
||
return "\n".join(lines), full_snippets
|
||
|
||
|
||
# ─── Helper: span length 보정 ───────────────────────────
|
||
|
||
|
||
def _normalize_span(span: str, full: str, query: str) -> tuple[str, bool]:
|
||
"""span 을 SPAN_MIN_CHARS ~ SPAN_MAX_CHARS 범위로 보정.
|
||
|
||
Returns:
|
||
(normalized_span, was_expanded)
|
||
- was_expanded=True 이면 "short_span_expanded" 로그 대상
|
||
"""
|
||
s = (span or "").strip()
|
||
expanded = False
|
||
if len(s) < SPAN_MIN_CHARS:
|
||
# soft hallucination 방어 — query 중심으로 window 재확장
|
||
s = _extract_window(full, query, target_chars=SPAN_ENLARGE_TARGET)
|
||
expanded = True
|
||
if len(s) > SPAN_MAX_CHARS:
|
||
s = s[:SPAN_MAX_CHARS]
|
||
return s, expanded
|
||
|
||
|
||
# ─── Helper: doc-group ordering ─────────────────────────
|
||
|
||
|
||
def _apply_doc_group_ordering(
|
||
items: list[EvidenceItem],
|
||
results: list["SearchResult"],
|
||
) -> list[EvidenceItem]:
|
||
"""검색 결과 doc 순서 유지 + doc 내부만 relevance desc + n 재부여.
|
||
|
||
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
|
||
전역 relevance desc 정렬은 "doc A span1 → doc B span1 → doc A span2"
|
||
처럼 튀면서 읽기 이상한 답변을 만든다.
|
||
"""
|
||
if not items:
|
||
return []
|
||
doc_order: dict[int, int] = {}
|
||
for idx, r in enumerate(results):
|
||
if r.id not in doc_order:
|
||
doc_order[r.id] = idx
|
||
# 정렬: (doc 순서, -relevance)
|
||
items.sort(
|
||
key=lambda it: (doc_order.get(it.doc_id, 9999), -it.relevance)
|
||
)
|
||
# n 재부여
|
||
for new_n, it in enumerate(items, 1):
|
||
it.n = new_n
|
||
return items
|
||
|
||
|
||
# ─── Helper: rule-only fallback ─────────────────────────
|
||
|
||
|
||
def _build_rule_only_evidence(
|
||
candidates: list["SearchResult"],
|
||
full_snippets: list[str],
|
||
query: str,
|
||
) -> list[EvidenceItem]:
|
||
"""LLM 실패/timeout 시 rule-only 경로.
|
||
|
||
⚠ `full_snippet[:200]` 같은 앞자르기 금지. 반드시 `_extract_window` 로
|
||
query 중심 윈도우를 만든다. relevance 는 rerank_score 복사.
|
||
"""
|
||
items: list[EvidenceItem] = []
|
||
for i, (c, full) in enumerate(zip(candidates, full_snippets), 1):
|
||
span = _extract_window(full, query, target_chars=200)
|
||
# 정규화 (보통 여기서는 SPAN_MIN_CHARS 이상이지만 안전장치)
|
||
span, _expanded = _normalize_span(span, full, query)
|
||
items.append(
|
||
EvidenceItem(
|
||
n=i,
|
||
chunk_id=c.chunk_id,
|
||
doc_id=c.id,
|
||
title=c.title,
|
||
section_title=c.section_title,
|
||
span_text=span,
|
||
relevance=float(c.rerank_score or c.score or 0.0),
|
||
rerank_score=float(c.rerank_score or c.score or 0.0),
|
||
full_snippet=full,
|
||
source="rule_fallback",
|
||
)
|
||
)
|
||
return items
|
||
|
||
|
||
# ─── Core: extract_evidence ─────────────────────────────
|
||
|
||
|
||
async def extract_evidence(
|
||
query: str,
|
||
results: list["SearchResult"],
|
||
ai_client: AIClient | None = None,
|
||
) -> tuple[list[EvidenceItem], str | None]:
|
||
"""reranked results → EvidenceItem 리스트.
|
||
|
||
Returns:
|
||
(items, skip_reason)
|
||
skip_reason ∈ {None, "empty_retrieval", "all_low_rerank", "fast_path",
|
||
"llm_timeout_fallback_rule", "llm_error_fallback_rule",
|
||
"parse_failed_fallback_rule", "all_llm_rejected"}
|
||
- skip_reason 이 None 이 아니어도 items 는 비어있지 않을 수 있다
|
||
(fallback/fast_path 경로).
|
||
"""
|
||
if not results:
|
||
return [], "empty_retrieval"
|
||
|
||
# ── 1차 rule filter: rerank_score >= EVIDENCE_MIN_RERANK + max_per_doc ──
|
||
candidates: list["SearchResult"] = []
|
||
per_doc: dict[int, int] = {}
|
||
for r in results:
|
||
raw_score = r.rerank_score if r.rerank_score is not None else r.score
|
||
if raw_score is None or raw_score < EVIDENCE_MIN_RERANK:
|
||
continue
|
||
if per_doc.get(r.id, 0) >= MAX_PER_DOC:
|
||
continue
|
||
candidates.append(r)
|
||
per_doc[r.id] = per_doc.get(r.id, 0) + 1
|
||
if len(candidates) >= MAX_EVIDENCE_CANDIDATES:
|
||
break
|
||
|
||
if not candidates:
|
||
return [], "all_low_rerank"
|
||
|
||
# ── Fast-path (현재 비활성) ─────────────────────────
|
||
if EVIDENCE_FAST_PATH_THRESHOLD is not None:
|
||
# ⚠ display score 가 아니라 raw rerank_score 로 판단.
|
||
# normalize_display_scores 를 거친 r.score 는 frontend 용 리스케일
|
||
# 값이라 distribution drift 가능. fast-path 는 reranker raw 신호가 안전.
|
||
top_rerank = (
|
||
results[0].rerank_score if results[0].rerank_score is not None else 0.0
|
||
)
|
||
if top_rerank is not None and top_rerank >= EVIDENCE_FAST_PATH_THRESHOLD:
|
||
_block, full_snippets = _build_numbered_candidates(candidates, query)
|
||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||
items = _apply_doc_group_ordering(items, results)
|
||
logger.info(
|
||
"evidence fast_path query=%r candidates=%d kept=%d top_rerank=%.2f",
|
||
query[:80], len(candidates), len(items), top_rerank,
|
||
)
|
||
return items, "fast_path"
|
||
|
||
# ── LLM 호출 준비 ───────────────────────────────────
|
||
if not EVIDENCE_PROMPT:
|
||
# 프롬프트 미로딩 → rule-only
|
||
_block, full_snippets = _build_numbered_candidates(candidates, query)
|
||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||
items = _apply_doc_group_ordering(items, results)
|
||
logger.warning(
|
||
"evidence prompt_not_loaded → rule fallback query=%r kept=%d",
|
||
query[:80], len(items),
|
||
)
|
||
return items, "llm_error_fallback_rule"
|
||
|
||
block, full_snippets = _build_numbered_candidates(candidates, query)
|
||
prompt = EVIDENCE_PROMPT.replace("{query}", query).replace(
|
||
"{numbered_candidates}", block
|
||
)
|
||
|
||
client_owned = False
|
||
if ai_client is None:
|
||
ai_client = AIClient()
|
||
client_owned = True
|
||
|
||
t_start = time.perf_counter()
|
||
raw: str | None = None
|
||
llm_error: str | None = None
|
||
|
||
try:
|
||
# 2026-05-17: PR #20 이후 triage/primary 동일 Mac mini 26B endpoint. gate 외부 실행이 docstring
|
||
# 영구 룰 ("MLX primary 호출 경로는 예외 없이 gate 획득 필수") 위반 — race condition 으로 동시
|
||
# 호출 timeout 빈번. gate 안쪽으로 이동.
|
||
async with get_mlx_gate():
|
||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||
raw = await ai_client.call_triage(prompt)
|
||
except asyncio.TimeoutError:
|
||
llm_error = "timeout"
|
||
except Exception as exc:
|
||
llm_error = f"llm_error:{type(exc).__name__}"
|
||
finally:
|
||
if client_owned:
|
||
try:
|
||
await ai_client.close()
|
||
except Exception:
|
||
pass
|
||
|
||
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||
|
||
# ── LLM 실패 → rule fallback ────────────────────────
|
||
if llm_error is not None:
|
||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||
items = _apply_doc_group_ordering(items, results)
|
||
logger.warning(
|
||
"evidence LLM %s → rule fallback query=%r candidates=%d kept=%d elapsed_ms=%.0f",
|
||
llm_error, query[:80], len(candidates), len(items), elapsed_ms,
|
||
)
|
||
return items, "llm_timeout_fallback_rule" if llm_error == "timeout" else "llm_error_fallback_rule"
|
||
|
||
parsed = parse_json_response(raw or "")
|
||
if not isinstance(parsed, dict) or not isinstance(parsed.get("items"), list):
|
||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||
items = _apply_doc_group_ordering(items, results)
|
||
logger.warning(
|
||
"evidence parse_failed → rule fallback query=%r raw=%r elapsed_ms=%.0f",
|
||
query[:80], (raw or "")[:200], elapsed_ms,
|
||
)
|
||
return items, "parse_failed_fallback_rule"
|
||
|
||
# ── LLM 출력 파싱 ──────────────────────────────────
|
||
short_span_expanded = 0
|
||
llm_items: list[EvidenceItem] = []
|
||
for entry in parsed["items"]:
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
try:
|
||
n_raw = int(entry.get("n", 0))
|
||
except (TypeError, ValueError):
|
||
continue
|
||
if n_raw < 1 or n_raw > len(candidates):
|
||
continue
|
||
try:
|
||
relevance = float(entry.get("relevance", 0.0) or 0.0)
|
||
except (TypeError, ValueError):
|
||
relevance = 0.0
|
||
if relevance < MIN_RELEVANCE_KEEP:
|
||
continue
|
||
span_raw = entry.get("span")
|
||
if not isinstance(span_raw, str) or not span_raw.strip():
|
||
continue
|
||
|
||
candidate = candidates[n_raw - 1]
|
||
full = full_snippets[n_raw - 1]
|
||
span, expanded = _normalize_span(span_raw, full, query)
|
||
if expanded:
|
||
short_span_expanded += 1
|
||
|
||
llm_items.append(
|
||
EvidenceItem(
|
||
n=n_raw, # doc-group ordering 에서 재부여됨
|
||
chunk_id=candidate.chunk_id,
|
||
doc_id=candidate.id,
|
||
title=candidate.title,
|
||
section_title=candidate.section_title,
|
||
span_text=span,
|
||
relevance=relevance,
|
||
rerank_score=float(
|
||
candidate.rerank_score
|
||
if candidate.rerank_score is not None
|
||
else (candidate.score or 0.0)
|
||
),
|
||
full_snippet=full,
|
||
)
|
||
)
|
||
|
||
# ── LLM 이 전부 reject → rule fallback ──────────────
|
||
if not llm_items:
|
||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||
items = _apply_doc_group_ordering(items, results)
|
||
logger.warning(
|
||
"evidence all_llm_rejected → rule fallback query=%r elapsed_ms=%.0f",
|
||
query[:80], elapsed_ms,
|
||
)
|
||
return items, "all_llm_rejected"
|
||
|
||
# ── Sparse evidence supplement (Phase 3.5b) ────────
|
||
# dead zone 해소: LLM kept 1~2 + candidates 충분 → rule-only 보충
|
||
supplement_skip = None
|
||
if 0 < len(llm_items) < MIN_EVIDENCE_FOR_SYNTHESIS and len(candidates) >= MIN_EVIDENCE_FOR_SYNTHESIS:
|
||
llm_n_set = {it.n for it in llm_items}
|
||
supplement_count = MIN_EVIDENCE_FOR_SYNTHESIS - len(llm_items)
|
||
supplemented = 0
|
||
|
||
# substring + critical token 필터 준비
|
||
query_tokens = re.findall(r'[가-힣]{2,}|[a-zA-Z]{3,}', query)
|
||
_IMPORTANT_SUFFIXES = {"조건", "기준", "요건", "처벌", "벌칙",
|
||
"정의", "차이", "절차", "방법", "계산"}
|
||
critical_tokens = [
|
||
t for t in query_tokens
|
||
if len(t) >= 3 or any(s in t for s in _IMPORTANT_SUFFIXES)
|
||
]
|
||
|
||
for idx, (c, full) in enumerate(zip(candidates, full_snippets), 1):
|
||
if idx in llm_n_set or supplement_count <= 0:
|
||
continue
|
||
span = _extract_window(full, query, target_chars=200)
|
||
span, _ = _normalize_span(span, full, query)
|
||
|
||
# substring match (recall)
|
||
has_match = any(qt in span for qt in query_tokens)
|
||
# critical token check (precision)
|
||
has_critical = (
|
||
any(ct in span for ct in critical_tokens)
|
||
if critical_tokens else has_match
|
||
)
|
||
if not (has_match and has_critical):
|
||
continue
|
||
|
||
llm_items.append(
|
||
EvidenceItem(
|
||
n=idx,
|
||
chunk_id=c.chunk_id,
|
||
doc_id=c.id,
|
||
title=c.title,
|
||
section_title=c.section_title,
|
||
span_text=span,
|
||
relevance=float(c.rerank_score or c.score or 0.0) * 0.8,
|
||
rerank_score=float(c.rerank_score or c.score or 0.0),
|
||
full_snippet=full,
|
||
source="supplement",
|
||
)
|
||
)
|
||
supplemented += 1
|
||
supplement_count -= 1
|
||
|
||
if supplemented > 0:
|
||
supplement_skip = "sparse_evidence_supplemented"
|
||
logger.info(
|
||
"evidence sparse_supplement query=%r llm_kept=%d supplemented=%d total=%d",
|
||
query[:80], len(llm_items) - supplemented, supplemented, len(llm_items),
|
||
)
|
||
|
||
# ── doc-group ordering + n 재부여 ───────────────────
|
||
llm_items = _apply_doc_group_ordering(llm_items, results)
|
||
|
||
logger.info(
|
||
"evidence ok query=%r candidates=%d kept=%d short_span_expanded=%d elapsed_ms=%.0f",
|
||
query[:80], len(candidates), len(llm_items), short_span_expanded, elapsed_ms,
|
||
)
|
||
return llm_items, supplement_skip
|