diff --git a/app/api/search.py b/app/api/search.py index 7e5dc9e..451293a 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -1,11 +1,16 @@ -"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint). +"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후). -retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리. -이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당. +실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence) +은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다. +이 파일은 다음만 담당: + - Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate + / Citation / AskResponse / AskDebug) + - `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화) + - `/ask` endpoint wrapper (Phase 3.3 에서 추가) """ import time -from typing import Annotated +from typing import Annotated, Literal from fastapi import APIRouter, BackgroundTasks, Depends, Query from pydantic import BaseModel @@ -15,48 +20,11 @@ from core.auth import get_current_user from core.database import get_session from core.utils import setup_logger from models.user import User -from services.search import query_analyzer -from services.search.fusion_service import ( - DEFAULT_FUSION, - apply_soft_filter_boost, - get_strategy, - normalize_display_scores, -) -from services.search.rerank_service import ( - MAX_CHUNKS_PER_DOC, - MAX_RERANK_INPUT, - apply_diversity, - rerank_chunks, -) -from services.search.retrieval_service import ( - compress_chunks_to_docs, - search_text, - search_vector, - search_vector_multilingual, -) -from services.search_telemetry import ( - compute_confidence, - compute_confidence_hybrid, - compute_confidence_reranked, - record_search_event, -) - - -# Phase 2.1: analyzer_confidence 3단계 게이트 (값 조정은 plan 기준) -ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성 -ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback -ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge - - -def 
_analyzer_tier(confidence: float) -> str: - """analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용.""" - if confidence < ANALYZER_TIER_IGNORE: - return "ignore" - if confidence < ANALYZER_TIER_ORIGINAL: - return "original_fallback" - if confidence < ANALYZER_TIER_MERGE: - return "merge" - return "analyzed" +from services.search.evidence_service import EvidenceItem, extract_evidence +from services.search.fusion_service import DEFAULT_FUSION +from services.search.search_pipeline import PipelineResult, run_search +from services.search.synthesis_service import SynthesisResult, synthesize +from services.search_telemetry import record_search_event # logs/search.log + stdout 동시 출력 (Phase 0.4) logger = setup_logger("search") @@ -84,6 +52,10 @@ class SearchResult(BaseModel): chunk_id: int | None = None chunk_index: int | None = None section_title: str | None = None + # Phase 3.1: reranker raw score 보존 (display score drift 방지). + # rerank 경로를 탄 chunk에만 채워짐. normalize_display_scores는 이 필드를 + # 건드리지 않는다. Phase 3 evidence fast-path 판단에 사용. 
+ rerank_score: float | None = None # ─── Phase 0.4: 디버그 응답 스키마 ───────────────────────── @@ -126,6 +98,29 @@ def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCan ] +def _build_search_debug(pr: PipelineResult) -> SearchDebug: + """PipelineResult → SearchDebug (기존 search()의 debug 구성 블록 복사).""" + return SearchDebug( + timing_ms=pr.timing_ms, + text_candidates=( + _to_debug_candidates(pr.text_results) + if pr.text_results or pr.mode != "vector" + else None + ), + vector_candidates=( + _to_debug_candidates(pr.vector_results) + if pr.vector_results or pr.mode in ("vector", "hybrid") + else None + ), + fused_candidates=( + _to_debug_candidates(pr.results) if pr.mode == "hybrid" else None + ), + confidence=pr.confidence_signal, + notes=pr.notes, + query_analysis=pr.query_analysis, + ) + + @router.get("/", response_model=SearchResponse) async def search( q: str, @@ -149,193 +144,34 @@ async def search( ), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), ): - """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)""" - timing: dict[str, float] = {} - notes: list[str] = [] - text_results: list[SearchResult] = [] - vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력) - raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용) - chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존 - query_analysis: dict | None = None - analyzer_confidence: float = 0.0 - analyzer_tier: str = "disabled" - - t_total = time.perf_counter() - - # Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지. - # - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부) - # - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget) - # 실측(gemma-4 10초+) 기반 결정. 
memory: feedback_analyzer_async_only.md - analyzer_cache_hit: bool = False - if analyze: - query_analysis = query_analyzer.get_cached(q) - if query_analysis is not None: - analyzer_cache_hit = True - try: - analyzer_confidence = float( - query_analysis.get("analyzer_confidence", 0.0) or 0.0 - ) - except (TypeError, ValueError): - analyzer_confidence = 0.0 - analyzer_tier = _analyzer_tier(analyzer_confidence) - notes.append( - f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}" - ) - else: - # cache miss → background analyzer 트리거 (retrieval 차단 X) - triggered = query_analyzer.trigger_background_analysis(q) - analyzer_tier = "cache_miss" - notes.append( - "analyzer cache_miss" - + (" (bg triggered)" if triggered else " (bg inflight)") - ) - - # Phase 2.2: multilingual vector search 활성 조건 (보수적) - # - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰) - # - normalized_queries 2개 이상 (lang 다양성 있음) - # - domain_hint == "news" 또는 language_scope == "global" - # ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko - # -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용. - # news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인). 
- use_multilingual: bool = False - normalized_queries: list[dict] = [] - if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis: - domain_hint = query_analysis.get("domain_hint", "mixed") - language_scope = query_analysis.get("language_scope", "limited") - is_multilingual_candidate = ( - domain_hint == "news" or language_scope == "global" - ) - if is_multilingual_candidate: - raw_nq = query_analysis.get("normalized_queries") or [] - if isinstance(raw_nq, list) and len(raw_nq) >= 2: - normalized_queries = [ - nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text") - ] - if len(normalized_queries) >= 2: - use_multilingual = True - notes.append( - f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}" - f" hint={domain_hint}/{language_scope}" - ) - - if mode == "vector": - t0 = time.perf_counter() - if use_multilingual: - raw_chunks = await search_vector_multilingual(session, normalized_queries, limit) - else: - raw_chunks = await search_vector(session, q, limit) - timing["vector_ms"] = (time.perf_counter() - t0) * 1000 - if not raw_chunks: - notes.append("vector_search_returned_empty (AI client error or no embeddings)") - # vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지) - vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) - results = vector_results - else: - t0 = time.perf_counter() - text_results = await search_text(session, q, limit) - timing["text_ms"] = (time.perf_counter() - t0) * 1000 - - if mode == "hybrid": - t1 = time.perf_counter() - if use_multilingual: - raw_chunks = await search_vector_multilingual(session, normalized_queries, limit) - else: - raw_chunks = await search_vector(session, q, limit) - timing["vector_ms"] = (time.perf_counter() - t1) * 1000 - - # chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존) - t1b = time.perf_counter() - vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) - timing["compress_ms"] = (time.perf_counter() - t1b) * 1000 - - if not 
vector_results: - notes.append("vector_search_returned_empty — text-only fallback") - - t2 = time.perf_counter() - strategy = get_strategy(fusion) - # fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용) - fusion_limit = max(limit * 5, 100) if rerank else limit - fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit) - timing["fusion_ms"] = (time.perf_counter() - t2) * 1000 - notes.append(f"fusion={strategy.name}") - notes.append( - f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} " - f"unique_docs={len(chunks_by_doc)}" - ) - - # Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만) - # analyzer_confidence < 0.5 (tier=ignore)는 비활성. - if ( - analyzer_cache_hit - and analyzer_tier != "ignore" - and query_analysis - ): - soft_filters = query_analysis.get("soft_filters") or {} - if soft_filters: - boosted = apply_soft_filter_boost(fused_docs, soft_filters) - if boosted > 0: - notes.append(f"soft_filter_boost applied={boosted}") - - if rerank: - # Phase 1.3: reranker — chunk 기준 입력 - # fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수 - t3 = time.perf_counter() - rerank_input: list[SearchResult] = [] - for doc in fused_docs: - chunks = chunks_by_doc.get(doc.id, []) - if chunks: - # doc당 max 2 chunk (latency/VRAM 보호) - rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC]) - else: - # text-only 매치 doc → doc 자체를 chunk처럼 wrap - rerank_input.append(doc) - if len(rerank_input) >= MAX_RERANK_INPUT: - break - rerank_input = rerank_input[:MAX_RERANK_INPUT] - notes.append(f"rerank input={len(rerank_input)}") - - reranked = await rerank_chunks(q, rerank_input, limit * 3) - timing["rerank_ms"] = (time.perf_counter() - t3) * 1000 - - # diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited) - t4 = time.perf_counter() - results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit] - timing["diversity_ms"] = (time.perf_counter() - t4) * 1000 - else: - # rerank 비활성: fused_docs를 그대로 (limit 적용) - results = fused_docs[:limit] - else: - 
results = text_results - - # display score 정규화 — 프론트엔드는 score*100을 % 표시. - # fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐. - normalize_display_scores(results) - - timing["total_ms"] = (time.perf_counter() - t_total) * 1000 - - # confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음) - # rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용 - if mode == "hybrid": - if rerank and "rerank_ms" in timing: - confidence_signal = compute_confidence_reranked(results) - else: - confidence_signal = compute_confidence_hybrid(text_results, vector_results) - elif mode == "vector": - confidence_signal = compute_confidence(vector_results, "vector") - else: - confidence_signal = compute_confidence(text_results, mode) + """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)""" + pr = await run_search( + session, + q, + mode=mode, # type: ignore[arg-type] + limit=limit, + fusion=fusion, + rerank=rerank, + analyze=analyze, + ) # 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다 - timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items()) + timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items()) fusion_str = f" fusion={fusion}" if mode == "hybrid" else "" analyzer_str = ( - f" analyzer=hit={analyzer_cache_hit}/conf={analyzer_confidence:.2f}/tier={analyzer_tier}" + f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}" if analyze else "" ) logger.info( "search query=%r mode=%s%s%s results=%d conf=%.2f %s", - q[:80], mode, fusion_str, analyzer_str, len(results), confidence_signal, timing_str, + q[:80], + pr.mode, + fusion_str, + analyzer_str, + len(pr.results), + pr.confidence_signal, + timing_str, ) # Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task) @@ -344,28 +180,259 @@ async def search( record_search_event, q, user.id, - results, - mode, - confidence_signal, - analyzer_confidence if analyze else None, + pr.results, + pr.mode, + pr.confidence_signal, + pr.analyzer_confidence if 
analyze else None, ) - debug_obj: SearchDebug | None = None - if debug: - debug_obj = SearchDebug( - timing_ms=timing, - text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None, - vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None, - fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None, - confidence=confidence_signal, - notes=notes, - query_analysis=query_analysis, - ) + debug_obj = _build_search_debug(pr) if debug else None return SearchResponse( - results=results, - total=len(results), + results=pr.results, + total=len(pr.results), query=q, - mode=mode, + mode=pr.mode, + debug=debug_obj, + ) + + +# ═══════════════════════════════════════════════════════════ +# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis +# ═══════════════════════════════════════════════════════════ + + +class Citation(BaseModel): + """answer 본문의 [n] 에 해당하는 근거 단일 행.""" + + n: int + chunk_id: int | None + doc_id: int + title: str | None + section_title: str | None + span_text: str # evidence LLM 이 추출한 50~300자 + full_snippet: str # 원본 800자 (citation 원문 보기 전용) + relevance: float + rerank_score: float + + +class AskDebug(BaseModel): + """`/ask?debug=true` 응답 확장.""" + + timing_ms: dict[str, float] + search_notes: list[str] + query_analysis: dict | None = None + confidence_signal: float + evidence_candidate_count: int + evidence_kept_count: int + evidence_skip_reason: str | None + synthesis_cache_hit: bool + synthesis_prompt_preview: str | None = None + synthesis_raw_preview: str | None = None + hallucination_flags: list[str] = [] + + +class AskResponse(BaseModel): + """`/ask` 응답. 
`/search` 의 SearchResult 는 그대로 재사용.""" + + results: list[SearchResult] + ai_answer: str | None + citations: list[Citation] + synthesis_status: Literal[ + "completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error" + ] + synthesis_ms: float + confidence: Literal["high", "medium", "low"] | None + refused: bool + no_results_reason: str | None + query: str + total: int + debug: AskDebug | None = None + + +def _map_no_results_reason( + pr: PipelineResult, + evidence: list[EvidenceItem], + ev_skip: str | None, + sr: SynthesisResult, +) -> str | None: + """사용자에게 보여줄 한국어 메시지 매핑. + + Failure mode 표 (plan §Failure Modes) 기반. + """ + # LLM 자가 refused → 모델이 준 사유 그대로 + if sr.refused and sr.refuse_reason: + return sr.refuse_reason + + # synthesis 상태 우선 + if sr.status == "no_evidence": + if not pr.results: + return "검색 결과가 없습니다." + return "관련도 높은 근거를 찾지 못했습니다." + if sr.status == "skipped": + return "검색 결과가 없습니다." + if sr.status == "timeout": + return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요." + if sr.status == "parse_failed": + return "답변 형식 오류로 생략했습니다." + if sr.status == "llm_error": + return "AI 서버에 일시적 문제가 있습니다." + + # evidence 단계 실패는 fallback 을 탔더라도 notes 용 + if ev_skip == "all_low_rerank": + return "관련도 높은 근거를 찾지 못했습니다." + if ev_skip == "empty_retrieval": + return "검색 결과가 없습니다." 
+ + return None + + +def _build_citations( + evidence: list[EvidenceItem], used_citations: list[int] +) -> list[Citation]: + """answer 본문에 실제로 등장한 n 만 Citation 으로 변환.""" + by_n = {e.n: e for e in evidence} + out: list[Citation] = [] + for n in used_citations: + e = by_n.get(n) + if e is None: + continue + out.append( + Citation( + n=e.n, + chunk_id=e.chunk_id, + doc_id=e.doc_id, + title=e.title, + section_title=e.section_title, + span_text=e.span_text, + full_snippet=e.full_snippet, + relevance=e.relevance, + rerank_score=e.rerank_score, + ) + ) + return out + + +def _build_ask_debug( + pr: PipelineResult, + evidence: list[EvidenceItem], + ev_skip: str | None, + sr: SynthesisResult, + ev_ms: float, + synth_ms: float, + total_ms: float, +) -> AskDebug: + timing: dict[str, float] = dict(pr.timing_ms) + timing["evidence_ms"] = ev_ms + timing["synthesis_ms"] = synth_ms + timing["ask_total_ms"] = total_ms + + # candidate count 는 rule filter 통과한 수 (recomputable from results) + # 엄밀히는 evidence_service 내부 숫자인데, evidence 길이 ≈ kept, candidate + # 는 관측이 어려움 → kept 는 evidence 길이, candidate 는 별도 필드 없음. + # 단순화: candidate_count = len(evidence) 를 상한 근사로 둠 (debug 전용). 
+ return AskDebug( + timing_ms=timing, + search_notes=pr.notes, + query_analysis=pr.query_analysis, + confidence_signal=pr.confidence_signal, + evidence_candidate_count=len(evidence), + evidence_kept_count=len(evidence), + evidence_skip_reason=ev_skip, + synthesis_cache_hit=sr.cache_hit, + synthesis_prompt_preview=None, # 현재 synthesis_service 에서 노출 안 함 + synthesis_raw_preview=sr.raw_preview, + hallucination_flags=sr.hallucination_flags, + ) + + +@router.get("/ask", response_model=AskResponse) +async def ask( + q: str, + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], + background_tasks: BackgroundTasks, + limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"), + debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"), +): + """근거 기반 AI 답변 (Phase 3.3). + + `/search` 와 동일한 검색 파이프라인을 거친 후 evidence extraction + + grounded synthesis 를 추가한다. `mode`, `rerank`, `analyze` 는 품질 보장을 + 위해 강제 고정 (hybrid / True / True). + + 실패 경로(timeout/parse_failed/refused/...) 에서도 `results` 는 항상 반환. + """ + t_total = time.perf_counter() + + # 1. 검색 파이프라인 (run_search — /search 와 동일 로직, 단일 진실 소스) + pr = await run_search( + session, + q, + mode="hybrid", + limit=limit, + fusion=DEFAULT_FUSION, + rerank=True, + analyze=True, + ) + + # 2. Evidence extraction (rule + LLM span select, 1 batched call) + t_ev = time.perf_counter() + evidence, ev_skip = await extract_evidence(q, pr.results) + ev_ms = (time.perf_counter() - t_ev) * 1000 + + # 3. Grounded synthesis (gemma-4, 15s timeout, citation 검증) + t_synth = time.perf_counter() + sr = await synthesize(q, evidence, debug=debug) + synth_ms = (time.perf_counter() - t_synth) * 1000 + + total_ms = (time.perf_counter() - t_total) * 1000 + + # 4. 
응답 구성 + citations = _build_citations(evidence, sr.used_citations) + no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr) + + logger.info( + "ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f", + q[:80], + len(pr.results), + len(evidence), + len(citations), + sr.status, + sr.confidence or "-", + sr.refused, + ev_ms, + synth_ms, + total_ms, + ) + + # 5. telemetry — 기존 record_search_event 재사용 (Phase 0.3 호환) + background_tasks.add_task( + record_search_event, + q, + user.id, + pr.results, + "hybrid", + pr.confidence_signal, + pr.analyzer_confidence, + ) + + debug_obj = ( + _build_ask_debug(pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms) + if debug + else None + ) + + return AskResponse( + results=pr.results, + ai_answer=sr.answer, + citations=citations, + synthesis_status=sr.status, + synthesis_ms=sr.elapsed_ms, + confidence=sr.confidence, + refused=sr.refused, + no_results_reason=no_reason, + query=q, + total=len(pr.results), debug=debug_obj, ) diff --git a/app/prompts/evidence_extract.txt b/app/prompts/evidence_extract.txt new file mode 100644 index 0000000..a231f1c --- /dev/null +++ b/app/prompts/evidence_extract.txt @@ -0,0 +1,76 @@ +You are an evidence span extractor. Respond ONLY in JSON. No markdown, no explanation. + +## Task + +For each numbered candidate, extract the most query-relevant span from the original text (copy verbatim, 50-200 chars) and rate relevance 0.0~1.0. If the candidate does not directly answer the query, set span=null, relevance=0.0, skip_reason. + +## Output Schema +{ + "items": [ + { + "n": 1, + "span": "...", + "relevance": 0.0, + "skip_reason": null + } + ] +} + +## Rules +- `n`: candidate 번호 (1-based, 입력 순서와 동일). **모든 n을 반환** (skip된 것도 포함). +- `span`: 원문에서 **그대로 복사한** 50~200자. 요약/변형 금지. 원문에 없는 단어는 절대 포함하지 말 것. 여러 문장이어도 무방. +- 관련 span이 없으면 `span: null`, `relevance: 0.0`, `skip_reason`에 한 줄 사유. 
+- `relevance`: 0.0~1.0 float + - 0.9+ query에 직접 답함 + - 0.7~0.9 강한 연관 + - 0.5~0.7 부분 연관 + - <0.5 약한/무관 (fallback에서 탈락) +- `skip_reason`: span=null 일 때만 필수. 예: "no_direct_relevance", "off_topic", "generic_boilerplate" +- **원문 그대로 복사 강제**: 번역/paraphrase/요약 모두 금지. evidence span은 citation 원문이 되어야 한다. + +## Example 1 (hit) +query: `산업안전보건법 제6장 주요 내용` +candidates: +[1] title: 산업안전보건법 해설 / text: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다... +[2] title: 회사 복지 규정 / text: 직원의 연차휴가 사용 규정과 경조사 지원 내용을 담고 있다... + +→ +{ + "items": [ + { + "n": 1, + "span": "제6장은 \"안전보건관리체제\"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다", + "relevance": 0.95, + "skip_reason": null + }, + { + "n": 2, + "span": null, + "relevance": 0.0, + "skip_reason": "off_topic" + } + ] +} + +## Example 2 (partial) +query: `Python async best practice` +candidates: +[1] title: FastAPI tutorial / text: FastAPI supports both async and sync endpoints. For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor... + +→ +{ + "items": [ + { + "n": 1, + "span": "For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor", + "relevance": 0.82, + "skip_reason": null + } + ] +} + +## Query +{query} + +## Candidates +{numbered_candidates} diff --git a/app/prompts/search_synthesis.txt b/app/prompts/search_synthesis.txt new file mode 100644 index 0000000..927c64c --- /dev/null +++ b/app/prompts/search_synthesis.txt @@ -0,0 +1,80 @@ +You are a grounded answer synthesizer. Respond ONLY in JSON. No markdown, no explanation. + +## Task + +Given a query and numbered evidence spans, write a short answer that cites specific evidence by [n]. **You may only use facts that appear in the evidence.** If the evidence does not directly answer the query, set `refused: true`. 
+ +## Output Schema +{ + "answer": "...", + "used_citations": [1, 2], + "confidence": "high", + "refused": false, + "refuse_reason": null +} + +## Rules +- `answer`: **400 characters max**. Must contain inline `[n]` citations. Every claim sentence ends with at least one `[n]`. Multiple sources: `[1][3]`. **Only use facts present in evidence. No outside knowledge, no guessing, no paraphrasing what is not there.** +- `used_citations`: integer list of `n` values that actually appear in `answer` (for cross-check). Must be sorted ascending, no duplicates. +- `confidence`: + - `high`: 3+ evidence items directly match the query + - `medium`: 2 items match, or strong single match + - `low`: 1 weak item, or partial match +- `refused`: set to `true` if evidence does not directly answer the query (e.g. off-topic, too generic, missing key facts). When refused: + - `answer`: empty string `""` + - `used_citations`: `[]` + - `confidence`: `"low"` + - `refuse_reason`: one sentence explaining why (will be shown to the user) +- **Language**: Korean query → Korean answer. English query → English answer. Match query language. +- **Absolute prohibition**: Do NOT introduce entities, numbers, dates, or claims that are not verbatim in the evidence. If you are unsure whether a fact is in evidence, treat it as not present and either omit it or refuse. + +## Example 1 (happy path, high confidence) +query: `산업안전보건법 제6장 주요 내용` +evidence: +[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다 +[2] 시행령 해설: 제6장은 제15조부터 제19조까지로 구성되며 안전보건관리책임자의 업무 범위를 세부 규정한다 +[3] 법령 체계도: 안전보건관리책임자 선임은 상시근로자 50명 이상 사업장에 적용된다 + +→ +{ + "answer": "산업안전보건법 제6장은 안전보건관리체제에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정을 규정한다[1]. 제15조부터 제19조까지 구성되며 관리책임자의 업무 범위를 세부 규정한다[2]. 
## Example 2 (partial, low confidence)
+ +## 설계 (EV-A: Rule + LLM span select) + +``` +reranked results + ↓ +[rule filter] score >= 0.25, max_per_doc=2, top MAX_EVIDENCE_CANDIDATES + ↓ +[snippet 재윈도우] _extract_window(full, query, 800) — LLM 입력용 + ↓ +[1 batched LLM call] gemma-4 via get_mlx_gate() (single inference) + ↓ +[post-process] + - relevance >= 0.5 필터 + - span too-short (< 80자) → _extract_window(full, query, 120) 로 재확장 + - span too-long (> 300자) → cut + - doc-group ordering (검색 결과 doc 순서 유지, doc 내부만 relevance desc) + - n 재부여 (1..N) + ↓ +EvidenceItem 리스트 +``` + +## 영구 룰 + +- **LLM 호출은 1번만** (batched). 순차 호출 절대 금지 — MLX single-inference + 큐가 폭발한다. +- **모든 MLX 호출은 `get_mlx_gate()` 경유**. analyzer / synthesis 와 동일 + semaphore 공유. +- **fallback span 도 query 중심 window**. `full_snippet[:200]` 같은 "앞에서부터 + 자르기" 절대 금지. 조용한 품질 붕괴 (citation 은 멀쩡한데 실제 span 이 query + 와 무관) 대표 사례. +- **Span too-short 보정 필수**: `len(span) < 80` 이면 자동 확장. "짧을수록 + 정확" 이 아니라 **짧으면 위험** — synthesis LLM 이 문맥 부족으로 이어 만들기 + (soft hallucination) 를 한다. +- **Evidence ordering 은 doc-group 유지**. 전역 relevance desc 정렬 금지. + answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다. + +## 확장 여지 (지금은 비활성) + +`EVIDENCE_FAST_PATH_THRESHOLD` 가 `None` 이 아니고 `results[0].rerank_score >= +THRESHOLD` 이면 LLM 호출 스킵 후 rule-only 경로로 즉시 반환. Activation 조건: +(1) evidence LLM 호출 비율 > 80%, (2) /ask 평균 latency > 15s, (3) rerank +top1 p50 > 0.75. 셋 다 충족해야 켠다. 
""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from ai.client import AIClient, _load_prompt, parse_json_response +from core.utils import setup_logger + +from .llm_gate import get_mlx_gate +from .rerank_service import _extract_window + +if TYPE_CHECKING: + from api.search import SearchResult + +logger = setup_logger("evidence") + +# ─── 상수 (plan 영구 룰) ───────────────────────────────── +EVIDENCE_MIN_RERANK = 0.25 # 1차 rule cut — rerank score 이 미만은 제외 +MAX_EVIDENCE_CANDIDATES = 6 # LLM 입력 상한 +MAX_PER_DOC = 2 +CANDIDATE_SNIPPET_CHARS = 800 # LLM 이 볼 원문 창 크기 + +MIN_RELEVANCE_KEEP = 0.5 # LLM 출력 필터 +SPAN_MIN_CHARS = 80 # 이 미만이면 window enlarge +SPAN_ENLARGE_TARGET = 120 # enlarge 시 재윈도우 target_chars +SPAN_MAX_CHARS = 300 # 이 초과면 cut (synthesis token budget 보호) + +LLM_TIMEOUT_MS = 15000 +PROMPT_VERSION = "v1" + +# 확장 여지 — None 이면 비활성 (baseline). 실측 후 0.8 등으로 켠다. +EVIDENCE_FAST_PATH_THRESHOLD: float | None = None + + +# ─── 반환 타입 ─────────────────────────────────────────── + + +@dataclass(slots=True) +class EvidenceItem: + """LLM 또는 rule fallback 이 추출한 단일 evidence span. + + n 은 doc-group ordering + relevance 정렬 후 1부터 재부여된다. + `full_snippet` 은 **synthesis 프롬프트에 절대 포함 금지** — debug / citation + 원문 보기 전용. 
+ """ + + n: int # 1-based, synthesis 프롬프트의 [n] 과 매핑 + chunk_id: int | None + doc_id: int + title: str | None + section_title: str | None + span_text: str # LLM 추출 (또는 rule fallback) span, 80~300자 + relevance: float # LLM 0~1 (fallback 시 rerank_score 복사) + rerank_score: float # raw reranker 점수 + full_snippet: str # 원본 800자 (debug/citation 전용, synthesis 금지) + + +# ─── 프롬프트 로딩 (module 초기화 1회) ─────────────────── +try: + EVIDENCE_PROMPT = _load_prompt("evidence_extract.txt") +except FileNotFoundError: + EVIDENCE_PROMPT = "" + logger.warning( + "evidence_extract.txt not found — evidence_service will always use rule-only fallback" + ) + + +# ─── Helper: candidates → LLM 입력 블록 ────────────────── + + +def _build_numbered_candidates( + candidates: list["SearchResult"], query: str +) -> tuple[str, list[str]]: + """LLM 프롬프트의 {numbered_candidates} 블록 + 재윈도우된 full_snippet 리스트. + + Returns: + (block_str, full_snippets) — full_snippets[i] 는 1-based n=i+1 의 원문 + """ + lines: list[str] = [] + full_snippets: list[str] = [] + for i, c in enumerate(candidates, 1): + title = (c.title or "").strip() + raw_text = c.snippet or "" + full = _extract_window(raw_text, query, target_chars=CANDIDATE_SNIPPET_CHARS) + full_snippets.append(full) + lines.append(f"[{i}] title: {title} / text: {full}") + return "\n".join(lines), full_snippets + + +# ─── Helper: span length 보정 ─────────────────────────── + + +def _normalize_span(span: str, full: str, query: str) -> tuple[str, bool]: + """span 을 SPAN_MIN_CHARS ~ SPAN_MAX_CHARS 범위로 보정. 
+ + Returns: + (normalized_span, was_expanded) + - was_expanded=True 이면 "short_span_expanded" 로그 대상 + """ + s = (span or "").strip() + expanded = False + if len(s) < SPAN_MIN_CHARS: + # soft hallucination 방어 — query 중심으로 window 재확장 + s = _extract_window(full, query, target_chars=SPAN_ENLARGE_TARGET) + expanded = True + if len(s) > SPAN_MAX_CHARS: + s = s[:SPAN_MAX_CHARS] + return s, expanded + + +# ─── Helper: doc-group ordering ───────────────────────── + + +def _apply_doc_group_ordering( + items: list[EvidenceItem], + results: list["SearchResult"], +) -> list[EvidenceItem]: + """검색 결과 doc 순서 유지 + doc 내부만 relevance desc + n 재부여. + + answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다. + 전역 relevance desc 정렬은 "doc A span1 → doc B span1 → doc A span2" + 처럼 튀면서 읽기 이상한 답변을 만든다. + """ + if not items: + return [] + doc_order: dict[int, int] = {} + for idx, r in enumerate(results): + if r.id not in doc_order: + doc_order[r.id] = idx + # 정렬: (doc 순서, -relevance) + items.sort( + key=lambda it: (doc_order.get(it.doc_id, 9999), -it.relevance) + ) + # n 재부여 + for new_n, it in enumerate(items, 1): + it.n = new_n + return items + + +# ─── Helper: rule-only fallback ───────────────────────── + + +def _build_rule_only_evidence( + candidates: list["SearchResult"], + full_snippets: list[str], + query: str, +) -> list[EvidenceItem]: + """LLM 실패/timeout 시 rule-only 경로. + + ⚠ `full_snippet[:200]` 같은 앞자르기 금지. 반드시 `_extract_window` 로 + query 중심 윈도우를 만든다. relevance 는 rerank_score 복사. 
+ """ + items: list[EvidenceItem] = [] + for i, (c, full) in enumerate(zip(candidates, full_snippets), 1): + span = _extract_window(full, query, target_chars=200) + # 정규화 (보통 여기서는 SPAN_MIN_CHARS 이상이지만 안전장치) + span, _expanded = _normalize_span(span, full, query) + items.append( + EvidenceItem( + n=i, + chunk_id=c.chunk_id, + doc_id=c.id, + title=c.title, + section_title=c.section_title, + span_text=span, + relevance=float(c.rerank_score or c.score or 0.0), + rerank_score=float(c.rerank_score or c.score or 0.0), + full_snippet=full, + ) + ) + return items + + +# ─── Core: extract_evidence ───────────────────────────── + + +async def extract_evidence( + query: str, + results: list["SearchResult"], + ai_client: AIClient | None = None, +) -> tuple[list[EvidenceItem], str | None]: + """reranked results → EvidenceItem 리스트. + + Returns: + (items, skip_reason) + skip_reason ∈ {None, "empty_retrieval", "all_low_rerank", "fast_path", + "llm_timeout_fallback_rule", "llm_error_fallback_rule", + "parse_failed_fallback_rule", "all_llm_rejected"} + - skip_reason 이 None 이 아니어도 items 는 비어있지 않을 수 있다 + (fallback/fast_path 경로). + """ + if not results: + return [], "empty_retrieval" + + # ── 1차 rule filter: rerank_score >= EVIDENCE_MIN_RERANK + max_per_doc ── + candidates: list["SearchResult"] = [] + per_doc: dict[int, int] = {} + for r in results: + raw_score = r.rerank_score if r.rerank_score is not None else r.score + if raw_score is None or raw_score < EVIDENCE_MIN_RERANK: + continue + if per_doc.get(r.id, 0) >= MAX_PER_DOC: + continue + candidates.append(r) + per_doc[r.id] = per_doc.get(r.id, 0) + 1 + if len(candidates) >= MAX_EVIDENCE_CANDIDATES: + break + + if not candidates: + return [], "all_low_rerank" + + # ── Fast-path (현재 비활성) ───────────────────────── + if EVIDENCE_FAST_PATH_THRESHOLD is not None: + # ⚠ display score 가 아니라 raw rerank_score 로 판단. + # normalize_display_scores 를 거친 r.score 는 frontend 용 리스케일 + # 값이라 distribution drift 가능. fast-path 는 reranker raw 신호가 안전. 
+ top_rerank = ( + results[0].rerank_score if results[0].rerank_score is not None else 0.0 + ) + if top_rerank is not None and top_rerank >= EVIDENCE_FAST_PATH_THRESHOLD: + _block, full_snippets = _build_numbered_candidates(candidates, query) + items = _build_rule_only_evidence(candidates, full_snippets, query) + items = _apply_doc_group_ordering(items, results) + logger.info( + "evidence fast_path query=%r candidates=%d kept=%d top_rerank=%.2f", + query[:80], len(candidates), len(items), top_rerank, + ) + return items, "fast_path" + + # ── LLM 호출 준비 ─────────────────────────────────── + if not EVIDENCE_PROMPT: + # 프롬프트 미로딩 → rule-only + _block, full_snippets = _build_numbered_candidates(candidates, query) + items = _build_rule_only_evidence(candidates, full_snippets, query) + items = _apply_doc_group_ordering(items, results) + logger.warning( + "evidence prompt_not_loaded → rule fallback query=%r kept=%d", + query[:80], len(items), + ) + return items, "llm_error_fallback_rule" + + block, full_snippets = _build_numbered_candidates(candidates, query) + prompt = EVIDENCE_PROMPT.replace("{query}", query).replace( + "{numbered_candidates}", block + ) + + client_owned = False + if ai_client is None: + ai_client = AIClient() + client_owned = True + + t_start = time.perf_counter() + raw: str | None = None + llm_error: str | None = None + + try: + # ⚠ semaphore 대기는 timeout 바깥. timeout 은 실제 LLM 호출에만. 
+ async with get_mlx_gate(): + async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): + raw = await ai_client._call_chat(ai_client.ai.primary, prompt) + except asyncio.TimeoutError: + llm_error = "timeout" + except Exception as exc: + llm_error = f"llm_error:{type(exc).__name__}" + finally: + if client_owned: + try: + await ai_client.close() + except Exception: + pass + + elapsed_ms = (time.perf_counter() - t_start) * 1000 + + # ── LLM 실패 → rule fallback ──────────────────────── + if llm_error is not None: + items = _build_rule_only_evidence(candidates, full_snippets, query) + items = _apply_doc_group_ordering(items, results) + logger.warning( + "evidence LLM %s → rule fallback query=%r candidates=%d kept=%d elapsed_ms=%.0f", + llm_error, query[:80], len(candidates), len(items), elapsed_ms, + ) + return items, "llm_timeout_fallback_rule" if llm_error == "timeout" else "llm_error_fallback_rule" + + parsed = parse_json_response(raw or "") + if not isinstance(parsed, dict) or not isinstance(parsed.get("items"), list): + items = _build_rule_only_evidence(candidates, full_snippets, query) + items = _apply_doc_group_ordering(items, results) + logger.warning( + "evidence parse_failed → rule fallback query=%r raw=%r elapsed_ms=%.0f", + query[:80], (raw or "")[:200], elapsed_ms, + ) + return items, "parse_failed_fallback_rule" + + # ── LLM 출력 파싱 ────────────────────────────────── + short_span_expanded = 0 + llm_items: list[EvidenceItem] = [] + for entry in parsed["items"]: + if not isinstance(entry, dict): + continue + try: + n_raw = int(entry.get("n", 0)) + except (TypeError, ValueError): + continue + if n_raw < 1 or n_raw > len(candidates): + continue + try: + relevance = float(entry.get("relevance", 0.0) or 0.0) + except (TypeError, ValueError): + relevance = 0.0 + if relevance < MIN_RELEVANCE_KEEP: + continue + span_raw = entry.get("span") + if not isinstance(span_raw, str) or not span_raw.strip(): + continue + + candidate = candidates[n_raw - 1] + full = 
full_snippets[n_raw - 1] + span, expanded = _normalize_span(span_raw, full, query) + if expanded: + short_span_expanded += 1 + + llm_items.append( + EvidenceItem( + n=n_raw, # doc-group ordering 에서 재부여됨 + chunk_id=candidate.chunk_id, + doc_id=candidate.id, + title=candidate.title, + section_title=candidate.section_title, + span_text=span, + relevance=relevance, + rerank_score=float( + candidate.rerank_score + if candidate.rerank_score is not None + else (candidate.score or 0.0) + ), + full_snippet=full, + ) + ) + + # ── LLM 이 전부 reject → rule fallback ────────────── + if not llm_items: + items = _build_rule_only_evidence(candidates, full_snippets, query) + items = _apply_doc_group_ordering(items, results) + logger.warning( + "evidence all_llm_rejected → rule fallback query=%r elapsed_ms=%.0f", + query[:80], elapsed_ms, + ) + return items, "all_llm_rejected" + + # ── doc-group ordering + n 재부여 ─────────────────── + llm_items = _apply_doc_group_ordering(llm_items, results) + + logger.info( + "evidence ok query=%r candidates=%d kept=%d short_span_expanded=%d elapsed_ms=%.0f", + query[:80], len(candidates), len(llm_items), short_span_expanded, elapsed_ms, + ) + return llm_items, None diff --git a/app/services/search/llm_gate.py b/app/services/search/llm_gate.py new file mode 100644 index 0000000..fc36f57 --- /dev/null +++ b/app/services/search/llm_gate.py @@ -0,0 +1,58 @@ +"""MLX single-inference 전역 gate (Phase 3.1.1). + +Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다. +동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout). + +이 모듈은 analyzer / evidence / synthesis 등 **모든 MLX-bound LLM 호출**이 +공유하는 `asyncio.Semaphore(1)`를 제공한다. MLX를 호출하는 경로는 예외 없이 +`async with get_mlx_gate():` 블록 안에서만 `AIClient._call_chat(ai.primary, ...)` +를 호출해야 한다. + +## 영구 룰 + +- **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer / + evidence_service / synthesis_service 세 곳이 현재 사용자. 이후 경로가 늘어도 + 동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시 + 동시 실행 발생). 
+- **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을 + 걸면 "대기만으로 timeout 발동" 버그가 재발한다(query_analyzer 초기 이슈). +- **fallback(Ollama) 경로는 gate 제외**. GPU Ollama는 concurrent OK. 단 현재 + 구현상 `AIClient._call_chat` 내부에서 primary→fallback 전환이 일어나므로 + fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음). +- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single- + inference 특성이 깨지지 않는 한 이 값을 올리지 말 것. + +## 확장 여지 (지금은 구현하지 않음) + +트래픽 증가 시 "우선순위 역전"(/ask가 analyzer background task 뒤에 밀림)이 +문제가 되면 `asyncio.PriorityQueue` 기반 우선순위 큐로 교체 가능. Gate 자체 +분리(get_analyzer_gate / get_ask_gate)는 single-inference에서 throughput +개선이 없으므로 의미 없음. +""" + +from __future__ import annotations + +import asyncio + +# MLX primary는 single-inference → 1 +MLX_CONCURRENCY = 1 + +# 첫 호출 시 현재 event loop에 바인딩된 Semaphore 생성 (lazy init) +_mlx_gate: asyncio.Semaphore | None = None + + +def get_mlx_gate() -> asyncio.Semaphore: + """MLX primary 호출 경로 공용 gate. 최초 호출 시 lazy init. + + 사용 예: + async with get_mlx_gate(): + async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): + raw = await ai_client._call_chat(ai_client.ai.primary, prompt) + + ⚠ `asyncio.timeout`은 반드시 gate 안쪽에 둘 것. 바깥에 두면 gate 대기만으로 + timeout이 발동한다. 
+ """ + global _mlx_gate + if _mlx_gate is None: + _mlx_gate = asyncio.Semaphore(MLX_CONCURRENCY) + return _mlx_gate diff --git a/app/services/search/query_analyzer.py b/app/services/search/query_analyzer.py index 8fe2c09..e407f50 100644 --- a/app/services/search/query_analyzer.py +++ b/app/services/search/query_analyzer.py @@ -36,6 +36,8 @@ from ai.client import AIClient, _load_prompt, parse_json_response from core.config import settings from core.utils import setup_logger +from .llm_gate import get_mlx_gate + logger = setup_logger("query_analyzer") # ─── 상수 (plan 영구 룰) ──────────────────────────────── @@ -67,17 +69,16 @@ _CACHE: dict[str, dict[str, Any]] = {} _PENDING: set[asyncio.Task[Any]] = set() # 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합) _INFLIGHT: set[str] = set() -# MLX concurrency 제한 (single-inference → 1) -# 첫 호출 시 lazy init (event loop이 준비된 후) -_LLM_SEMAPHORE: asyncio.Semaphore | None = None def _get_llm_semaphore() -> asyncio.Semaphore: - """첫 호출 시 현재 event loop에 바인딩된 semaphore 생성.""" - global _LLM_SEMAPHORE - if _LLM_SEMAPHORE is None: - _LLM_SEMAPHORE = asyncio.Semaphore(LLM_CONCURRENCY) - return _LLM_SEMAPHORE + """MLX single-inference gate를 반환. Phase 3.1부터 llm_gate.get_mlx_gate() + 로 위임 — analyzer / evidence / synthesis 가 동일 semaphore 공유. + + `LLM_CONCURRENCY` 상수는 하위 호환/문서용으로 유지하되, 실제 bound는 + `llm_gate.MLX_CONCURRENCY` 가 담당한다. + """ + return get_mlx_gate() def _cache_key(query: str) -> str: diff --git a/app/services/search/rerank_service.py b/app/services/search/rerank_service.py index c107f59..a35633d 100644 --- a/app/services/search/rerank_service.py +++ b/app/services/search/rerank_service.py @@ -134,7 +134,12 @@ async def rerank_chunks( if idx is None or sc is None or idx >= len(candidates): continue chunk = candidates[idx] - chunk.score = float(sc) + score = float(sc) + chunk.score = score + # Phase 3.1: reranker raw 점수를 별도 필드에 보존. + # normalize_display_scores가 나중에 .score를 랭크 기반으로 덮어써도 + # fast-path 판단에 쓸 수 있는 원본 신호 유지. 
+ chunk.rerank_score = score chunk.match_reason = (chunk.match_reason or "") + "+rerank" reranked.append(chunk) return reranked[:limit] diff --git a/app/services/search/search_pipeline.py b/app/services/search/search_pipeline.py new file mode 100644 index 0000000..2245422 --- /dev/null +++ b/app/services/search/search_pipeline.py @@ -0,0 +1,335 @@ +"""검색 파이프라인 오케스트레이션 (Phase 3.1). + +`/api/search/` 와 `/api/search/ask` 가 공유하는 단일 진실 소스. + +## 순수성 규칙 (영구) + +`run_search()`는 wrapper(endpoint)에서 side effect를 최대한 분리한다: +- ❌ **금지**: `BackgroundTasks` 파라미터, `logger.info(...)` 직접 호출, + `record_search_event()` 호출, `SearchResponse`/`AskResponse` 직렬화 +- ✅ **허용**: `trigger_background_analysis()` (analyzer cache miss 시 + fire-and-forget task — retrieval 전략의 일부, 자가 완결됨) +- ✅ **허용**: retrieval / fusion / rerank / diversity / display 정규화 / + confidence 계산 같은 내부 서비스 호출 + +반환값은 `PipelineResult` 하나. wrapper가 그 안에서 필요한 필드를 꺼내 +logger / telemetry / 응답 직렬화를 수행한다. + +## Phase 2 호환 + +본 모듈은 기존 `app/api/search.py::search()` 함수 본문을 lift-and-shift 한 +것이다. 변수명 / notes 문자열 / timing 키 / logger 포맷 은 wrapper 쪽에서 +완전히 동일하게 재구성된다. refactor 전후 `/search?debug=true` 응답은 +byte-level 에 가깝게 일치해야 한다. +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Literal + +from sqlalchemy.ext.asyncio import AsyncSession + +from . 
import query_analyzer +from .fusion_service import ( + DEFAULT_FUSION, + apply_soft_filter_boost, + get_strategy, + normalize_display_scores, +) +from .rerank_service import ( + MAX_CHUNKS_PER_DOC, + MAX_RERANK_INPUT, + apply_diversity, + rerank_chunks, +) +from .retrieval_service import ( + compress_chunks_to_docs, + search_text, + search_vector, + search_vector_multilingual, +) +from services.search_telemetry import ( + compute_confidence, + compute_confidence_hybrid, + compute_confidence_reranked, +) + +if TYPE_CHECKING: + from api.search import SearchResult + + +# ─── Phase 2.1: analyzer_confidence 3단계 게이트 ────────── +# search.py 에서 이동. search.py 의 /search wrapper 는 이 상수들을 +# 노출할 필요 없으므로 파이프라인 모듈에만 둔다. +ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성 +ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback +ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge + + +def _analyzer_tier(confidence: float) -> str: + """analyzer_confidence → 사용 tier 문자열. 
Phase 2.2/2.3에서 실제 분기용.""" + if confidence < ANALYZER_TIER_IGNORE: + return "ignore" + if confidence < ANALYZER_TIER_ORIGINAL: + return "original_fallback" + if confidence < ANALYZER_TIER_MERGE: + return "merge" + return "analyzed" + + +# ─── 반환 타입 ───────────────────────────────────────────── + + +@dataclass(slots=True) +class PipelineResult: + """run_search() 반환 — wrapper 가 필요한 모든 state 를 담는다.""" + + # ── 최종 결과 (API 노출) ── + results: "list[SearchResult]" + mode: str + confidence_signal: float + + # ── 중간 단계 (evidence 입력 + debug) ── + text_results: "list[SearchResult]" + vector_results: "list[SearchResult]" # doc 압축 후 + raw_chunks: "list[SearchResult]" # chunk 원본 (rerank/evidence용) + chunks_by_doc: "dict[int, list[SearchResult]]" + + # ── 쿼리 분석 메타 ── + query_analysis: dict | None + analyzer_cache_hit: bool + analyzer_confidence: float # 항상 float (None 금지) + analyzer_tier: str + + # ── 관측 ── + timing_ms: dict[str, float] = field(default_factory=dict) + notes: list[str] = field(default_factory=list) + + +# ─── 메인 파이프라인 ─────────────────────────────────────── + + +async def run_search( + session: AsyncSession, + q: str, + *, + mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid", + limit: int = 20, + fusion: str = DEFAULT_FUSION, + rerank: bool = True, + analyze: bool = False, +) -> PipelineResult: + """검색 파이프라인 실행. + + retrieval → fusion → rerank → diversity → display 정규화 → confidence 계산 + 까지 수행하고 `PipelineResult` 를 반환한다. logging / BackgroundTasks / + 응답 직렬화는 절대 수행하지 않는다 (wrapper 책임). 
+ + Args: + session: AsyncSession (caller 가 관리) + q: 사용자 쿼리 원문 + mode: fts | trgm | vector | hybrid + limit: 최종 결과 수 (hybrid 에서는 fusion 입력 후보 수는 이보다 넓음) + fusion: legacy | rrf | rrf_boost + rerank: bge-reranker-v2-m3 활성화 (hybrid 전용) + analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter) + + Returns: + PipelineResult + """ + # 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언) + from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영 + + timing: dict[str, float] = {} + notes: list[str] = [] + text_results: list["SearchResult"] = [] + vector_results: list["SearchResult"] = [] # doc-level (압축 후, fusion 입력) + raw_chunks: list["SearchResult"] = [] # chunk-level (raw, Phase 1.3 reranker용) + chunks_by_doc: dict[int, list["SearchResult"]] = {} # Phase 1.3 reranker용 보존 + query_analysis: dict | None = None + analyzer_confidence: float = 0.0 + analyzer_tier: str = "disabled" + + t_total = time.perf_counter() + + # Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지. + # - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부) + # - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget) + # 실측(gemma-4 10초+) 기반 결정. 
memory: feedback_analyzer_async_only.md + analyzer_cache_hit: bool = False + if analyze: + query_analysis = query_analyzer.get_cached(q) + if query_analysis is not None: + analyzer_cache_hit = True + try: + analyzer_confidence = float( + query_analysis.get("analyzer_confidence", 0.0) or 0.0 + ) + except (TypeError, ValueError): + analyzer_confidence = 0.0 + analyzer_tier = _analyzer_tier(analyzer_confidence) + notes.append( + f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}" + ) + else: + # cache miss → background analyzer 트리거 (retrieval 차단 X) + triggered = query_analyzer.trigger_background_analysis(q) + analyzer_tier = "cache_miss" + notes.append( + "analyzer cache_miss" + + (" (bg triggered)" if triggered else " (bg inflight)") + ) + + # Phase 2.2: multilingual vector search 활성 조건 (보수적) + # - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰) + # - normalized_queries 2개 이상 (lang 다양성 있음) + # - domain_hint == "news" 또는 language_scope == "global" + # ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko + # -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용. + # news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인). 
+ use_multilingual: bool = False + normalized_queries: list[dict] = [] + if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis: + domain_hint = query_analysis.get("domain_hint", "mixed") + language_scope = query_analysis.get("language_scope", "limited") + is_multilingual_candidate = ( + domain_hint == "news" or language_scope == "global" + ) + if is_multilingual_candidate: + raw_nq = query_analysis.get("normalized_queries") or [] + if isinstance(raw_nq, list) and len(raw_nq) >= 2: + normalized_queries = [ + nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text") + ] + if len(normalized_queries) >= 2: + use_multilingual = True + notes.append( + f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}" + f" hint={domain_hint}/{language_scope}" + ) + + if mode == "vector": + t0 = time.perf_counter() + if use_multilingual: + raw_chunks = await search_vector_multilingual(session, normalized_queries, limit) + else: + raw_chunks = await search_vector(session, q, limit) + timing["vector_ms"] = (time.perf_counter() - t0) * 1000 + if not raw_chunks: + notes.append("vector_search_returned_empty (AI client error or no embeddings)") + # vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지) + vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) + results = vector_results + else: + t0 = time.perf_counter() + text_results = await search_text(session, q, limit) + timing["text_ms"] = (time.perf_counter() - t0) * 1000 + + if mode == "hybrid": + t1 = time.perf_counter() + if use_multilingual: + raw_chunks = await search_vector_multilingual(session, normalized_queries, limit) + else: + raw_chunks = await search_vector(session, q, limit) + timing["vector_ms"] = (time.perf_counter() - t1) * 1000 + + # chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존) + t1b = time.perf_counter() + vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) + timing["compress_ms"] = (time.perf_counter() - t1b) * 1000 + + if not 
vector_results: + notes.append("vector_search_returned_empty — text-only fallback") + + t2 = time.perf_counter() + strategy = get_strategy(fusion) + # fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용) + fusion_limit = max(limit * 5, 100) if rerank else limit + fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit) + timing["fusion_ms"] = (time.perf_counter() - t2) * 1000 + notes.append(f"fusion={strategy.name}") + notes.append( + f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} " + f"unique_docs={len(chunks_by_doc)}" + ) + + # Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만) + # analyzer_confidence < 0.5 (tier=ignore)는 비활성. + if ( + analyzer_cache_hit + and analyzer_tier != "ignore" + and query_analysis + ): + soft_filters = query_analysis.get("soft_filters") or {} + if soft_filters: + boosted = apply_soft_filter_boost(fused_docs, soft_filters) + if boosted > 0: + notes.append(f"soft_filter_boost applied={boosted}") + + if rerank: + # Phase 1.3: reranker — chunk 기준 입력 + # fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수 + t3 = time.perf_counter() + rerank_input: list["SearchResult"] = [] + for doc in fused_docs: + chunks = chunks_by_doc.get(doc.id, []) + if chunks: + # doc당 max 2 chunk (latency/VRAM 보호) + rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC]) + else: + # text-only 매치 doc → doc 자체를 chunk처럼 wrap + rerank_input.append(doc) + if len(rerank_input) >= MAX_RERANK_INPUT: + break + rerank_input = rerank_input[:MAX_RERANK_INPUT] + notes.append(f"rerank input={len(rerank_input)}") + + reranked = await rerank_chunks(q, rerank_input, limit * 3) + timing["rerank_ms"] = (time.perf_counter() - t3) * 1000 + + # diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited) + t4 = time.perf_counter() + results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit] + timing["diversity_ms"] = (time.perf_counter() - t4) * 1000 + else: + # rerank 비활성: fused_docs를 그대로 (limit 적용) + results = fused_docs[:limit] + else: + 
results = text_results + + # display score 정규화 — 프론트엔드는 score*100을 % 표시. + # fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐. + # Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존). + normalize_display_scores(results) + + timing["total_ms"] = (time.perf_counter() - t_total) * 1000 + + # confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음) + # rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용 + if mode == "hybrid": + if rerank and "rerank_ms" in timing: + confidence_signal = compute_confidence_reranked(results) + else: + confidence_signal = compute_confidence_hybrid(text_results, vector_results) + elif mode == "vector": + confidence_signal = compute_confidence(vector_results, "vector") + else: + confidence_signal = compute_confidence(text_results, mode) + + return PipelineResult( + results=results, + mode=mode, + confidence_signal=confidence_signal, + text_results=text_results, + vector_results=vector_results, + raw_chunks=raw_chunks, + chunks_by_doc=chunks_by_doc, + query_analysis=query_analysis, + analyzer_cache_hit=analyzer_cache_hit, + analyzer_confidence=analyzer_confidence, + analyzer_tier=analyzer_tier, + timing_ms=timing, + notes=notes, + ) diff --git a/app/services/search/synthesis_service.py b/app/services/search/synthesis_service.py index b9dbfe4..3d70589 100644 --- a/app/services/search/synthesis_service.py +++ b/app/services/search/synthesis_service.py @@ -1,6 +1,422 @@ -"""Grounded answer synthesis 서비스 (Phase 3). +"""Grounded answer synthesis 서비스 (Phase 3.3). -evidence span을 Gemma 4에 전달해 인용 기반 답변 생성. -3~4초 soft timeout, 타임아웃 시 결과만 반환 fallback. -구현은 Phase 3에서 채움. +evidence span 을 Gemma 4 에 전달해 citation 기반 답변을 생성한다. +캐시 / timeout / citation 검증 / refused 처리 포함. + +## 영구 룰 + +- **span-only 입력**: `_render_prompt()` 는 `EvidenceItem.span_text` 만 참조한다. + `EvidenceItem.full_snippet` 을 프롬프트에 포함하면 LLM 이 span 밖 내용을 + hallucinate 한다. 이 규칙이 깨지면 시스템 무너짐 → docstring + 코드 패턴으로 + 방어 (함수 상단에서 제한 뷰만 만든다). 
+- **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error) 와 + low confidence / refused 는 캐시 금지. 잘못된 답변 고정 방지. +- **MLX gate 공유**: `get_mlx_gate()` 경유. analyzer / evidence 와 동일 semaphore. +- **timeout 15s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate + 대기만으로 timeout 발동. +- **citation 검증**: 본문 `[n]` 범위 초과는 제거 + `hallucination_flags` 기록. + answer 수정본을 반환하되 status 는 completed 유지 (silent fix + observable). """ + +from __future__ import annotations + +import asyncio +import hashlib +import re +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Literal + +from ai.client import AIClient, _load_prompt, parse_json_response +from core.config import settings +from core.utils import setup_logger + +from .llm_gate import get_mlx_gate + +if TYPE_CHECKING: + from .evidence_service import EvidenceItem + +logger = setup_logger("synthesis") + +# ─── 상수 (plan 영구 룰) ───────────────────────────────── +PROMPT_VERSION = "v1" +LLM_TIMEOUT_MS = 15000 +CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게) +CACHE_MAXSIZE = 300 +MAX_ANSWER_CHARS = 400 + +SynthesisStatus = Literal[ + "completed", + "timeout", + "skipped", + "no_evidence", + "parse_failed", + "llm_error", +] + + +# ─── 반환 타입 ─────────────────────────────────────────── + + +@dataclass(slots=True) +class SynthesisResult: + """synthesize() 반환. 
cache dict 에 들어가는 payload 이기도 함.""" + + status: SynthesisStatus + answer: str | None + used_citations: list[int] # 검증 후 실제로 본문에 등장한 n + confidence: Literal["high", "medium", "low"] | None + refused: bool + refuse_reason: str | None + elapsed_ms: float + cache_hit: bool + hallucination_flags: list[str] = field(default_factory=list) + raw_preview: str | None = None # debug=true 일 때 LLM raw 500자 + + +# ─── 프롬프트 로딩 (module 초기화 1회) ────────────────── +try: + SYNTHESIS_PROMPT = _load_prompt("search_synthesis.txt") +except FileNotFoundError: + SYNTHESIS_PROMPT = "" + logger.warning( + "search_synthesis.txt not found — synthesis will always return llm_error" + ) + + +# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─ +_CACHE: dict[str, SynthesisResult] = {} + + +def _model_version() -> str: + """현재 primary 모델 ID — 캐시 키에 반영.""" + if settings.ai and settings.ai.primary: + return settings.ai.primary.model + return "unknown-model" + + +def _cache_key(query: str, chunk_ids: list[int]) -> str: + """(query + sorted chunk_ids + PROMPT_VERSION + model) sha256.""" + sorted_ids = ",".join(str(c) for c in sorted(chunk_ids)) + raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}" + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def get_cached(query: str, chunk_ids: list[int]) -> SynthesisResult | None: + """캐시 조회. TTL 경과는 자동 삭제.""" + key = _cache_key(query, chunk_ids) + entry = _CACHE.get(key) + if entry is None: + return None + # TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장 + # 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점) + # 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함. 
+ return entry + + +def _should_cache(result: SynthesisResult) -> bool: + """실패/저신뢰/refused 는 캐시 금지.""" + return ( + result.status == "completed" + and result.confidence in ("high", "medium") + and not result.refused + and result.answer is not None + ) + + +def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult) -> None: + """조건부 저장 + FIFO eviction.""" + if not _should_cache(result): + return + key = _cache_key(query, chunk_ids) + if key in _CACHE: + _CACHE[key] = result + return + if len(_CACHE) >= CACHE_MAXSIZE: + try: + oldest = next(iter(_CACHE)) + _CACHE.pop(oldest, None) + except StopIteration: + pass + _CACHE[key] = result + + +def cache_stats() -> dict[str, int]: + """debug/운영용.""" + return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE} + + +# ─── Prompt rendering (🔒 span_text ONLY) ─────────────── + + +def _render_prompt(query: str, evidence: list["EvidenceItem"]) -> str: + """{query} / {numbered_evidence} 치환. + + ⚠ **MUST NOT access `item.full_snippet`**. Use `span_text` ONLY. + Rationale: 프롬프트에 full_snippet 을 넣으면 LLM 이 span 밖 내용으로 + hallucinate 한다. full_snippet 은 debug / citation 원문 전용. + + 제한 뷰만 만들어서 full_snippet 접근을 문법적으로 어렵게 만든다. + """ + # 제한 뷰 — 이 튜플에는 span_text 외의 snippet 필드가 없다 + spans: list[tuple[int, str, str]] = [ + (i.n, (i.title or "").strip(), i.span_text) for i in evidence + ] + lines = [f"[{n}] {title}\n{span}" for n, title, span in spans] + numbered_block = "\n\n".join(lines) + return SYNTHESIS_PROMPT.replace("{query}", query).replace( + "{numbered_evidence}", numbered_block + ) + + +# ─── Citation 검증 ────────────────────────────────────── + +_CITATION_RE = re.compile(r"\[(\d+)\]") + + +def _validate_citations( + answer: str, + n_max: int, +) -> tuple[str, list[int], list[str]]: + """본문 `[n]` 범위 초과 제거 + used_citations 추출 + flags. 
+ + Returns: + (corrected_answer, used_citations, hallucination_flags) + """ + flags: list[str] = [] + seen: set[int] = set() + used: list[int] = [] + corrected = answer + + for match in _CITATION_RE.findall(answer): + try: + n = int(match) + except ValueError: + continue + if n < 1 or n > n_max: + # 범위 초과 → 본문에서 제거 + flag + corrected = corrected.replace(f"[{n}]", "") + flags.append(f"removed_n_{n}") + continue + if n not in seen: + seen.add(n) + used.append(n) + + used.sort() + + if len(corrected) > MAX_ANSWER_CHARS: + corrected = corrected[:MAX_ANSWER_CHARS] + flags.append("length_clipped") + + return corrected, used, flags + + +# ─── Core: synthesize ─────────────────────────────────── + + +async def synthesize( + query: str, + evidence: list["EvidenceItem"], + ai_client: AIClient | None = None, + debug: bool = False, +) -> SynthesisResult: + """evidence → grounded answer. + + Failure modes 는 모두 SynthesisResult 로 반환한다 (예외는 외부로 전파되지 + 않음). 호출자 (`/ask` wrapper) 가 status 를 보고 user-facing 메시지를 + 결정한다. 
+ """ + t_start = time.perf_counter() + + # ── evidence 비면 즉시 no_evidence ───────────────── + if not evidence: + return SynthesisResult( + status="no_evidence", + answer=None, + used_citations=[], + confidence=None, + refused=False, + refuse_reason=None, + elapsed_ms=(time.perf_counter() - t_start) * 1000, + cache_hit=False, + hallucination_flags=[], + raw_preview=None, + ) + + # ── cache lookup ─────────────────────────────────── + # chunk_id 가 None 인 text-only wrap 은 음수 doc_id 로 구분 → key 안정화 + chunk_ids = [ + (e.chunk_id if e.chunk_id is not None else -e.doc_id) for e in evidence + ] + cached = get_cached(query, chunk_ids) + if cached is not None: + return SynthesisResult( + status=cached.status, + answer=cached.answer, + used_citations=list(cached.used_citations), + confidence=cached.confidence, + refused=cached.refused, + refuse_reason=cached.refuse_reason, + elapsed_ms=(time.perf_counter() - t_start) * 1000, + cache_hit=True, + hallucination_flags=list(cached.hallucination_flags), + raw_preview=cached.raw_preview if debug else None, + ) + + # ── prompt 준비 ───────────────────────────────────── + if not SYNTHESIS_PROMPT: + return SynthesisResult( + status="llm_error", + answer=None, + used_citations=[], + confidence=None, + refused=False, + refuse_reason=None, + elapsed_ms=(time.perf_counter() - t_start) * 1000, + cache_hit=False, + hallucination_flags=["prompt_not_loaded"], + raw_preview=None, + ) + + prompt = _render_prompt(query, evidence) + prompt_preview = prompt[:500] if debug else None + + # ── LLM 호출 ─────────────────────────────────────── + client_owned = False + if ai_client is None: + ai_client = AIClient() + client_owned = True + + raw: str | None = None + llm_error: str | None = None + + try: + async with get_mlx_gate(): + async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): + raw = await ai_client._call_chat(ai_client.ai.primary, prompt) + except asyncio.TimeoutError: + llm_error = "timeout" + except Exception as exc: + llm_error = 
f"llm_error:{type(exc).__name__}" + finally: + if client_owned: + try: + await ai_client.close() + except Exception: + pass + + elapsed_ms = (time.perf_counter() - t_start) * 1000 + + if llm_error is not None: + status: SynthesisStatus = "timeout" if llm_error == "timeout" else "llm_error" + logger.warning( + "synthesis %s query=%r evidence_n=%d elapsed_ms=%.0f", + llm_error, query[:80], len(evidence), elapsed_ms, + ) + return SynthesisResult( + status=status, + answer=None, + used_citations=[], + confidence=None, + refused=False, + refuse_reason=None, + elapsed_ms=elapsed_ms, + cache_hit=False, + hallucination_flags=[llm_error], + raw_preview=None, + ) + + parsed = parse_json_response(raw or "") + if not isinstance(parsed, dict): + logger.warning( + "synthesis parse_failed query=%r raw=%r elapsed_ms=%.0f", + query[:80], (raw or "")[:200], elapsed_ms, + ) + return SynthesisResult( + status="parse_failed", + answer=None, + used_citations=[], + confidence=None, + refused=False, + refuse_reason=None, + elapsed_ms=elapsed_ms, + cache_hit=False, + hallucination_flags=["parse_failed"], + raw_preview=(raw or "")[:500] if debug else None, + ) + + # ── JSON 필드 검증 ────────────────────────────────── + answer_raw = parsed.get("answer", "") + if not isinstance(answer_raw, str): + answer_raw = "" + + conf_raw = parsed.get("confidence", "low") + if conf_raw not in ("high", "medium", "low"): + conf_raw = "low" + + refused_raw = bool(parsed.get("refused", False)) + refuse_reason_raw = parsed.get("refuse_reason") + if refuse_reason_raw is not None and not isinstance(refuse_reason_raw, str): + refuse_reason_raw = None + + # refused 면 answer 무시 + citations 비움 + if refused_raw: + result = SynthesisResult( + status="completed", + answer=None, + used_citations=[], + confidence=conf_raw, # type: ignore[arg-type] + refused=True, + refuse_reason=refuse_reason_raw, + elapsed_ms=elapsed_ms, + cache_hit=False, + hallucination_flags=[], + raw_preview=(raw or "")[:500] if debug else None, + ) + 
logger.info( + "synthesis refused query=%r evidence_n=%d conf=%s elapsed_ms=%.0f reason=%r", + query[:80], len(evidence), conf_raw, elapsed_ms, (refuse_reason_raw or "")[:80], + ) + # refused 는 캐시 금지 (_should_cache) + return result + + # ── citation 검증 ─────────────────────────────────── + corrected_answer, used_citations, flags = _validate_citations( + answer_raw, n_max=len(evidence) + ) + + # answer 가 공백만 남으면 low confidence 로 강등 + if not corrected_answer.strip(): + corrected_answer_final: str | None = None + conf_raw = "low" + flags.append("empty_after_validation") + else: + corrected_answer_final = corrected_answer + + result = SynthesisResult( + status="completed", + answer=corrected_answer_final, + used_citations=used_citations, + confidence=conf_raw, # type: ignore[arg-type] + refused=False, + refuse_reason=None, + elapsed_ms=elapsed_ms, + cache_hit=False, + hallucination_flags=flags, + raw_preview=(raw or "")[:500] if debug else None, + ) + + logger.info( + "synthesis ok query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f", + query[:80], + len(evidence), + len(corrected_answer_final or ""), + len(used_citations), + conf_raw, + ",".join(flags) if flags else "-", + elapsed_ms, + ) + + # 조건부 캐시 저장 + set_cached(query, chunk_ids, result) + return result