Files
hyungi_document_server/app/api/search.py
Hyungi Ahn c81b728ddf refactor(search): Phase 2.1 QueryAnalyzer를 async-only 구조로 전환
## 철학 수정 (실측 기반)

gemma-4-26b-a4b-it-8bit MLX 실측:
  - full query_analyze.txt (prompt_tok=2406) → 10.5초
  - max_tokens 축소 무효 (모델 자연 EOS 조기 종료)
  - 쿼리 길이 영향 거의 없음 (프롬프트 자체가 지배)
  → 800ms timeout 가정은 13배 초과. 동기 호출 완전히 불가능.

따라서 QueryAnalyzer는 "즉시 실행하는 기능" → "미리 준비해두는 기능"으로
포지셔닝 변경. retrieval 경로에서 analyzer 동기 호출 **금지**.

## 구조

```
query → retrieval (항상 즉시)
         ↘ trigger_background_analysis (fire-and-forget)
            → analyze() [5초+] → cache 저장

다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화
```

## 변경 사항

### app/prompts/query_analyze.txt
 - 5971 chars → 2403 chars (40%)
 - 예시 4개 → 1개, 규칙 설명 축약
 - 목표 prompt_tok 2406 → ~600 (1/4)

### app/services/search/query_analyzer.py
 - LLM_TIMEOUT_MS 800 → 5000 (background이므로 여유 OK)
 - PROMPT_VERSION v1 → v2 (cache auto-invalidate)
 - get_cached / set_cached 유지 — retrieval 경로 O(1) 조회
 - trigger_background_analysis(query) 신규 — 동기 함수, 즉시 반환, task 생성
 - _PENDING set으로 task 참조 유지 (premature GC 방지)
 - _INFLIGHT set으로 동일 쿼리 중복 실행 방지
 - prewarm_analyzer() 신규 — startup에서 15~20 쿼리 미리 분석
 - DEFAULT_PREWARM_QUERIES: 평가셋 fixed 7 + 법령 3 + 뉴스 2 + 실무 3

### app/api/search.py
 - 기존 sync analyzer 호출 완전 제거
 - analyze=True → get_cached(q) 조회만 O(1)
   - hit: query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부 활성화)
   - miss: trigger_background_analysis(q) + 기존 경로 그대로
 - timing["analyze_ms"] 제거 (경로에 LLM 호출 없음)
 - notes에 analyzer cache_hit/cache_miss 상태 기록
 - debug.query_analysis는 cache hit 시에만 채워짐

### app/main.py
 - lifespan startup에 prewarm_analyzer() background task 추가
 - 논블로킹 — 앱 시작 막지 않음
 - delay_between=0.5로 MLX 부하 완화

## 기대 효과

 - cold 요청 latency: 기존 Phase 1.3 그대로 (회귀 0)
 - warm 요청 + prewarmed: cache hit → query_analysis 활용
 - 예상 cache hit rate: 초기 70~80% (prewarm) + 사용 누적
 - Phase 2.2/2.3 multilingual/filter 기능은 cache hit 시에만 동작

## 참조

 - memory: feedback_analyzer_async_only.md (영구 룰 저장)
 - plan: ~/.claude/plans/zesty-painting-kahan.md ("철학 수정" 섹션)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:47:09 +09:00

315 lines
12 KiB
Python

"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint).
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리.
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당.
"""
import time
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search import query_analyzer
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
from services.search.rerank_service import (
MAX_CHUNKS_PER_DOC,
MAX_RERANK_INPUT,
apply_diversity,
rerank_chunks,
)
from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
compute_confidence_reranked,
record_search_event,
)
# Phase 2.1: analyzer_confidence 3단계 게이트 (값 조정은 plan 기준)
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
def _analyzer_tier(confidence: float) -> str:
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
if confidence < ANALYZER_TIER_IGNORE:
return "ignore"
if confidence < ANALYZER_TIER_ORIGINAL:
return "original_fallback"
if confidence < ANALYZER_TIER_MERGE:
return "merge"
return "analyzed"
# logs/search.log + stdout 동시 출력 (Phase 0.4)
logger = setup_logger("search")
router = APIRouter()
class SearchResult(BaseModel):
"""검색 결과 단일 행.
Phase 1.2-C: chunk-level vector retrieval 도입으로 chunk 메타 필드 추가.
text 검색 결과는 chunk_id 등이 None (doc-level).
vector 검색 결과는 chunk_id 등이 채워짐 (chunk-level).
"""
id: int # doc_id (text/vector 공통)
title: str | None
ai_domain: str | None
ai_summary: str | None
file_format: str
score: float
snippet: str | None
match_reason: str | None = None
# Phase 1.2-C: chunk 메타 (vector 검색 시 채워짐)
chunk_id: int | None = None
chunk_index: int | None = None
section_title: str | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
class DebugCandidate(BaseModel):
"""단계별 후보 (debug=true 응답에서만 노출)."""
id: int
rank: int
score: float
match_reason: str | None = None
class SearchDebug(BaseModel):
timing_ms: dict[str, float]
text_candidates: list[DebugCandidate] | None = None
vector_candidates: list[DebugCandidate] | None = None
fused_candidates: list[DebugCandidate] | None = None
confidence: float
notes: list[str] = []
# Phase 1/2 도입 후 채워질 placeholder
query_analysis: dict | None = None
reranker_scores: list[DebugCandidate] | None = None
class SearchResponse(BaseModel):
results: list[SearchResult]
total: int
query: str
mode: str
debug: SearchDebug | None = None
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
return [
DebugCandidate(
id=r.id, rank=i + 1, score=r.score, match_reason=r.match_reason
)
for i, r in enumerate(rows[:n])
]
@router.get("/", response_model=SearchResponse)
async def search(
q: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
limit: int = Query(20, ge=1, le=100),
fusion: str = Query(
DEFAULT_FUSION,
pattern="^(legacy|rrf|rrf_boost)$",
description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
),
rerank: bool = Query(
True,
description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
),
analyze: bool = Query(
False,
description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
),
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)"""
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list[SearchResult] = []
vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력)
raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용)
chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존
query_analysis: dict | None = None
analyzer_confidence: float = 0.0
analyzer_tier: str = "disabled"
t_total = time.perf_counter()
# Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지.
# - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부)
# - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget)
# 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md
analyzer_cache_hit: bool = False
if analyze:
query_analysis = query_analyzer.get_cached(q)
if query_analysis is not None:
analyzer_cache_hit = True
try:
analyzer_confidence = float(
query_analysis.get("analyzer_confidence", 0.0) or 0.0
)
except (TypeError, ValueError):
analyzer_confidence = 0.0
analyzer_tier = _analyzer_tier(analyzer_confidence)
notes.append(
f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
)
else:
# cache miss → background analyzer 트리거 (retrieval 차단 X)
triggered = query_analyzer.trigger_background_analysis(q)
analyzer_tier = "cache_miss"
notes.append(
"analyzer cache_miss"
+ (" (bg triggered)" if triggered else " (bg inflight)")
)
if mode == "vector":
t0 = time.perf_counter()
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
t1b = time.perf_counter()
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
fusion_limit = max(limit * 5, 100) if rerank else limit
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
notes.append(
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
f"unique_docs={len(chunks_by_doc)}"
)
if rerank:
# Phase 1.3: reranker — chunk 기준 입력
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
t3 = time.perf_counter()
rerank_input: list[SearchResult] = []
for doc in fused_docs:
chunks = chunks_by_doc.get(doc.id, [])
if chunks:
# doc당 max 2 chunk (latency/VRAM 보호)
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(q, rerank_input, limit * 3)
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
t4 = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
else:
# rerank 비활성: fused_docs를 그대로 (limit 적용)
results = fused_docs[:limit]
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
if mode == "hybrid":
if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
analyzer_str = (
f" analyzer=hit={analyzer_cache_hit}/conf={analyzer_confidence:.2f}/tier={analyzer_tier}"
if analyze
else ""
)
logger.info(
"search query=%r mode=%s%s%s results=%d conf=%.2f %s",
q[:80], mode, fusion_str, analyzer_str, len(results), confidence_signal, timing_str,
)
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
# Phase 2.1: analyze=true일 때만 analyzer_confidence 전달 (False는 None → 기존 호환)
background_tasks.add_task(
record_search_event,
q,
user.id,
results,
mode,
confidence_signal,
analyzer_confidence if analyze else None,
)
debug_obj: SearchDebug | None = None
if debug:
debug_obj = SearchDebug(
timing_ms=timing,
text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
confidence=confidence_signal,
notes=notes,
query_analysis=query_analysis,
)
return SearchResponse(
results=results,
total=len(results),
query=q,
mode=mode,
debug=debug_obj,
)