0c7211e24b
검색 retrieval 에 domain_bucket(377) 포함/제외 필터 추가. - AxisFilter.domain_buckets(= ANY) / exclude_buckets(<> ALL) + active() - _axis_sql 2절 — 전 leg documents alias(d / chunk df JOIN) 경유, 미지정시 byte-불변(무회귀) - search.py: domain_bucket / exclude_bucket Query 파라미터(CSV) 검증: exclude_bucket=News → News 0건(금리 10→0·인공지능 15→0·반도체 11→0), domain_bucket=Safety → Knowledge/Industrial_Safety 드리프트까지 정규화 포함. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
347 lines
14 KiB
Python
347 lines
14 KiB
Python
"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후).
|
|
|
|
실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence)
|
|
은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
|
|
이 파일은 다음만 담당:
|
|
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate)
|
|
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
|
|
"""
|
|
|
|
from datetime import date
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, Query
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from core.utils import setup_logger
|
|
from models.user import User
|
|
from services.search.fusion_service import DEFAULT_FUSION
|
|
from services.search import query_rewriter
|
|
from services.search.retrieval_service import AxisFilter
|
|
from services.search.result_decorate import compute_facets, decorate_version_status
|
|
from services.search.search_pipeline import PipelineResult, run_search
|
|
from services.search_telemetry import record_search_event
|
|
|
|
# Search logger: output goes to logs/search.log and stdout at the same time
# (Phase 0.4).
logger = setup_logger("search")

# Router on which the search endpoint in this module is registered
# (see the @router.get("/") decorator on search()).
router = APIRouter()
|
|
|
|
|
|
class SearchResult(BaseModel):
    """A single search result row.

    Phase 1.2-C: chunk metadata fields were added when chunk-level vector
    retrieval was introduced.
    Text search results leave chunk_id and the related fields as None
    (doc-level).
    Vector search results have chunk_id and the related fields populated
    (chunk-level).
    """

    id: int  # doc_id (shared by text and vector results)
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float
    snippet: str | None
    match_reason: str | None = None
    # Phase 1.2-C: chunk metadata (populated for vector search results).
    chunk_id: int | None = None
    chunk_index: int | None = None
    section_title: str | None = None
    # Phase 3.1: preserves the raw reranker score (guards against display-score
    # drift). Populated only for chunks that went through the rerank path.
    # normalize_display_scores does not touch this field. Used for the Phase 3
    # evidence fast-path decision.
    rerank_score: float | None = None
    # PR-RAG-Time-1: freshness-decay debug metadata, filled in by
    # apply_freshness_decay. Also filled for rows where decay is not applied
    # (freshness_policy=None). base_score is always preserved.
    freshness_debug: dict | None = None
    # Safety Library C-1: classification-axis metadata (filled in by the SELECT
    # of all three retrieval legs — additive, unrelated to ranking).
    # Prerequisite for D-1 UI per-type result-card rendering and for preventing
    # results from being mixed without a country label once foreign legislation
    # (B-5) goes live.
    material_type: str | None = None
    jurisdiction: str | None = None
    published_date: date | None = None
    # Safety Library C-1 follow-up: legal version status
    # (legal_meta.version_status), decorated once by the endpoint wrapper.
    # Populated only for law results (legal_meta satellite); every other result
    # and any unmapped law stays None. Prerequisite for the D-1 version badge.
    version_status: str | None = None
|
|
|
|
|
|
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
|
|
|
|
|
|
class DebugCandidate(BaseModel):
    """A per-stage candidate (exposed only in debug=true responses)."""

    id: int  # copied from SearchResult.id by _to_debug_candidates
    rank: int  # 1-based position within its stage (i + 1 in _to_debug_candidates)
    score: float
    match_reason: str | None = None
|
|
|
|
|
|
class SearchDebug(BaseModel):
    # Debug payload of a search response; built by _build_search_debug() and
    # attached only when the request sets debug=true.

    timing_ms: dict[str, float]  # copied from PipelineResult.timing_ms
    # Per-stage candidate lists; None when the stage is not reported for the
    # executed mode (see the conditions in _build_search_debug).
    text_candidates: list[DebugCandidate] | None = None
    vector_candidates: list[DebugCandidate] | None = None
    fused_candidates: list[DebugCandidate] | None = None
    confidence: float  # copied from PipelineResult.confidence_signal
    # NOTE(review): mutable default. Pydantic is expected to copy field
    # defaults per instance, so this should not be shared state — confirm for
    # the pinned Pydantic version.
    notes: list[str] = []
    # Placeholders to be filled in once Phase 1/2 are introduced.
    query_analysis: dict | None = None
    reranker_scores: list[DebugCandidate] | None = None
|
|
|
|
|
|
class SearchResponse(BaseModel):
    # Response body of the search endpoint (declared as its response_model).

    results: list[SearchResult]
    total: int  # the endpoint sets this to len(results)
    query: str
    mode: str
    debug: SearchDebug | None = None
    # Safety Library C-1 follow-up: populated only when facets=true (None when
    # not requested, so the response stays byte-identical).
    # Distribution of classification-axis labels within the top-K results:
    # {axis: {label: count}}.
    facets: dict[str, dict[str, int]] | None = None
|
|
|
|
|
|
def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCandidate]:
    """Convert the leading ``n`` result rows into ranked debug candidates.

    Args:
        rows: Result rows in the order they should be ranked.
        n: Maximum number of rows to convert (slice bound, default 20).

    Returns:
        One DebugCandidate per kept row, with ``rank`` starting at 1.
    """
    candidates: list[DebugCandidate] = []
    for position, row in enumerate(rows[:n], start=1):
        candidates.append(
            DebugCandidate(
                id=row.id,
                rank=position,
                score=row.score,
                match_reason=row.match_reason,
            )
        )
    return candidates
|
|
|
|
|
|
def _build_search_debug(pr: PipelineResult) -> SearchDebug:
    """Build a SearchDebug from a PipelineResult.

    Per the original note, this is a copy of the debug-construction block of
    the former search() implementation.

    Args:
        pr: Pipeline result whose stage results, timings, notes and query
            analysis are exposed.

    Returns:
        The debug payload. A stage's candidate list is None when that stage is
        not reported for the executed mode.
    """
    executed_mode = pr.mode

    # Text candidates are reported whenever there are text results, or the
    # mode is anything other than pure "vector".
    if pr.text_results or executed_mode != "vector":
        text_stage = _to_debug_candidates(pr.text_results)
    else:
        text_stage = None

    # Vector candidates are reported whenever there are vector results, or the
    # mode is "vector" / "hybrid".
    if pr.vector_results or executed_mode in ("vector", "hybrid"):
        vector_stage = _to_debug_candidates(pr.vector_results)
    else:
        vector_stage = None

    # Fused candidates only exist for hybrid mode.
    if executed_mode == "hybrid":
        fused_stage = _to_debug_candidates(pr.results)
    else:
        fused_stage = None

    return SearchDebug(
        timing_ms=pr.timing_ms,
        text_candidates=text_stage,
        vector_candidates=vector_stage,
        fused_candidates=fused_stage,
        confidence=pr.confidence_signal,
        notes=pr.notes,
        query_analysis=pr.query_analysis,
    )
|
|
|
|
|
|
def _split_csv(raw: str | None) -> list[str] | None:
    """Split a comma-separated query value into stripped, non-empty tokens.

    Returns None when ``raw`` is None or empty; otherwise a list, which may be
    empty when the value holds only separators/whitespace (e.g. ``","``).
    """
    if not raw:
        return None
    stripped = (piece.strip() for piece in raw.split(","))
    return [token for token in stripped if token]


@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
    fusion: str = Query(
        DEFAULT_FUSION,
        pattern="^(legacy|rrf|rrf_boost)$",
        description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)",
    ),
    rerank: bool = Query(
        True,
        description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
    ),
    analyze: bool = Query(
        False,
        description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
    ),
    debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
    embedding_backend: str | None = Query(
        None,
        pattern=r"^(baseline|cand_[a-z0-9_]+)$",
        description="Phase 2A Diagnose dispatcher (R2-2 + R2-B1). slug 만 받음 (raw table name X). baseline|cand_<slug>. 미지정/baseline = production path.",
    ),
    snapshot_doc_id_max: int | None = Query(
        None, ge=1,
        description="Phase 2A snapshot freeze (R2-D + R2-B2). documents.id <= 값 filter. baseline 측정 시에도 동일 filter 적용.",
    ),
    snapshot_chunk_id_max: int | None = Query(
        None, ge=1,
        description="Phase 2A snapshot freeze (R2-D + R2-B2). document_chunks.id <= 값 filter. baseline 측정 시에도 동일 filter 적용.",
    ),
    reranker_backend: str | None = Query(
        None,
        pattern=r"^(baseline|cand_[a-z0-9_]+)$",
        description="Phase 2B Diagnose reranker dispatcher (R2-B1 slug-based). slug 만 받음 (raw endpoint URL X). baseline|cand_<slug>. 미지정/baseline = production reranker.",
    ),
    rewrite_backend: str | None = Query(
        None,
        pattern=r"^(baseline|cand_[a-z0-9_]+)$",
        description=(
            "⚠️ EXPERIMENTAL / DEPRECATED (Phase 2Q closed 2026-05-24 as evaluated experiment). "
            "Result-level dedup 정정 후 net gain marginal (NDCG +0.019, Recall t≥2 +0.030) "
            "vs latency cost 큼 (cold +876%, warm +320%). default production rollout 권고 X. "
            "slug-based, no silent fallback. baseline|cand_multi_query_macmini|cand_multi_query_macbook. "
            "미지정/baseline = single-query path (회귀 0 invariant, 권장 default). "
            "opt-in 실험 reference 만 유지 — docs/phase_2q_apply_opt_in.md 의 closed status 참조."
        ),
    ),
    corpus_variant: str | None = Query(
        None,
        pattern=r"^(prehier|hier_sim_raw|hier_sim_clean)$",
        description=(
            "⚠️ EVAL ONLY (Hier-Replace-Diagnose-1). chunk leg 를 측정 뷰로 교체 — "
            "prehier(legacy baseline) | hier_sim_raw | hier_sim_clean(childless-tiny 제외). "
            "doc-level + fts/trgm 는 documents 테이블 = 변종 무관. 미지정 = production corpus_chunks. "
            "embedding_backend cand 와 동시 사용 불가 (400)."
        ),
    ),
    exact_knn: bool = Query(
        False,
        description=(
            "⚠️ EVAL ONLY (Hier-Replace-Diagnose-1). vector leg 에 SET LOCAL enable_indexscan/"
            "bitmapscan=off → ivfflat 근사 제거(exact seqscan). prehier vs hier_sim 의 index 변수 "
            "분리용. production 검색에는 사용 금지 (latency 큼)."
        ),
    ),
    material_type: str | None = Query(
        None, description="안전 자료실 C-1: 자료유형 필터 CSV (law,paper,incident,...). material_type = ANY"),
    jurisdiction: str | None = Query(
        None, description="안전 자료실 C-1: 관할 필터 (KR/US/EU/JP/GB/INT)"),
    year_from: int | None = Query(None, ge=1900, le=2100, description="published_date 연도 하한 (NULL=created_at fallback)"),
    year_to: int | None = Query(None, ge=1900, le=2100, description="published_date 연도 상한"),
    domain_bucket: str | None = Query(None, description="377: domain_bucket 스코프 CSV (Safety,Engineering,Law,Philosophy,Programming,General,News). domain_bucket = ANY"),
    exclude_bucket: str | None = Query(None, description="377: domain_bucket 제외 CSV (예: News). 지식질의 시 News 기본제외용"),
    facets: bool = Query(False, description="안전 자료실 C-1 후속: top-K 결과 분류 축 분포(material_type/jurisdiction/version_status)를 응답 facets 에 집계. 미지정=계산/노출 0"),
):
    """Document search — FTS + ILIKE + vector combined (run_search wrapper since Phase 3.1).

    Builds an AxisFilter from the filter query parameters, delegates to
    run_search(), logs per-stage timings, schedules telemetry as a background
    task, decorates results with version_status, and serializes the response.

    Returns a SearchResponse on success, a 400 JSONResponse when the pipeline
    raises ValueError, and a 503 JSONResponse when it raises a RuntimeError
    whose message starts with "rewrite_llm_unavailable". Any other
    RuntimeError is re-raised.
    """
    try:
        # The filter is built inside the try block so that a ValueError raised
        # while constructing it is handled the same way as one from run_search.
        axis = AxisFilter(
            material_types=_split_csv(material_type),
            jurisdiction=jurisdiction,
            year_from=year_from,
            year_to=year_to,
            domain_buckets=_split_csv(domain_bucket),
            exclude_buckets=_split_csv(exclude_bucket),
        )
        pr = await run_search(
            session,
            q,
            mode=mode,  # type: ignore[arg-type]
            limit=limit,
            fusion=fusion,
            rerank=rerank,
            analyze=analyze,
            embedding_backend=embedding_backend,
            snapshot_doc_id_max=snapshot_doc_id_max,
            snapshot_chunk_id_max=snapshot_chunk_id_max,
            reranker_backend=reranker_backend,
            rewrite_backend=rewrite_backend,
            corpus_variant=corpus_variant,
            exact_knn=exact_knn,
            axis=axis,
        )
    except ValueError as exc:
        # Per the original note: unknown slugs rejected by _resolve_backend /
        # _resolve_reranker / _resolve_rewrite_backend / _resolve_corpus_variant
        # are mapped to HTTP 400. The payload shape depends on the message prefix.
        detail = str(exc)
        if detail.startswith(("unknown_corpus_variant", "corpus_variant_incompatible")):
            payload = {
                # First token of the message, up to the first ":" or space.
                "error_reason": detail.partition(":")[0].partition(" ")[0],
                "corpus_variant_requested": corpus_variant,
                "allowed": ["prehier", "hier_sim_raw", "hier_sim_clean"],
                "detail": detail,
            }
        elif detail.startswith("unknown_rewrite_backend"):
            payload = {
                "error_reason": "unknown_rewrite_backend",
                "backend_requested": rewrite_backend,
                "allowed": query_rewriter.allowed_slugs(),
                "detail": detail,
            }
        elif detail.startswith("unknown_reranker_backend"):
            payload = {
                "error_reason": "unknown_reranker_backend",
                "backend_requested": reranker_backend,
                "allowed": ["baseline", "cand_gte_ml_base"],
                "detail": detail,
            }
        else:
            # NOTE(review): every ValueError that matches none of the prefixes
            # above is reported as an unknown embedding backend.
            payload = {
                "error_reason": "unknown_embedding_backend",
                "backend_requested": embedding_backend,
                "allowed": ["baseline"],
                "detail": detail,
            }
        return JSONResponse(status_code=400, content=payload)
    except RuntimeError as exc:
        # Per the original note: query_rewriter.rewrite() failure
        # (LLM unavailable / parse failure) is mapped to HTTP 503.
        detail = str(exc)
        if not detail.startswith("rewrite_llm_unavailable"):
            raise
        return JSONResponse(
            status_code=503,
            content={
                "error_reason": "rewrite_llm_unavailable",
                "backend_requested": rewrite_backend,
                "detail": detail,
            },
        )

    # User feedback: per-stage timings are always logged, independently of the
    # debug response.
    timing_str = " ".join(
        f"{stage}={elapsed:.0f}" for stage, elapsed in pr.timing_ms.items()
    )
    fusion_str = ""
    if mode == "hybrid":
        fusion_str = f" fusion={fusion}"
    analyzer_str = ""
    if analyze:
        analyzer_str = f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}"
    logger.info(
        "search query=%r mode=%s%s%s results=%d conf=%.2f %s",
        q[:80],
        pr.mode,
        fusion_str,
        analyzer_str,
        len(pr.results),
        pr.confidence_signal,
        timing_str,
    )

    # Phase 0.3: automatic failure logging (background task, so response
    # latency is unaffected).
    # Phase 2.1: analyzer_confidence is forwarded only when analyze=true;
    # otherwise None is passed for backward compatibility.
    telemetry_confidence = pr.analyzer_confidence if analyze else None
    background_tasks.add_task(
        record_search_event,
        q,
        user.id,
        pr.results,
        pr.mode,
        pr.confidence_signal,
        telemetry_confidence,
    )

    debug_obj = None
    if debug:
        debug_obj = _build_search_debug(pr)

    # Safety Library C-1 follow-up — wrapper-level decoration (search core is
    # untouched, ranking is unaffected).
    await decorate_version_status(session, pr.results)  # version_status on law results
    facets_obj = None
    if facets:
        facets_obj = compute_facets(pr.results)

    return SearchResponse(
        results=pr.results,
        total=len(pr.results),
        query=q,
        mode=pr.mode,
        debug=debug_obj,
        facets=facets_obj,
    )
|