diff --git a/app/api/search.py b/app/api/search.py
index 5179b8c..d9a3235 100644
--- a/app/api/search.py
+++ b/app/api/search.py
@@ -23,7 +23,12 @@ from services.search.rerank_service import (
     apply_diversity,
     rerank_chunks,
 )
-from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
+from services.search.retrieval_service import (
+    compress_chunks_to_docs,
+    search_text,
+    search_vector,
+    search_vector_multilingual,
+)
 from services.search_telemetry import (
     compute_confidence,
     compute_confidence_hybrid,
@@ -180,9 +185,26 @@ async def search(
                 + (" (bg triggered)" if triggered else " (bg inflight)")
             )
 
+    # Phase 2.2: activation conditions for multilingual vector search
+    #   - analyzer cache hit + analyzer_tier == "analyzed" (high confidence, ≥0.85)
+    #   - at least two normalized_queries (language diversity present)
+    #   Every other case keeps the existing single-query search_vector (zero regression).
+    use_multilingual: bool = False
+    normalized_queries: list[dict] = []
+    if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
+        raw_nq = query_analysis.get("normalized_queries") or []
+        if isinstance(raw_nq, list) and len(raw_nq) >= 2:
+            normalized_queries = [nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")]
+            if len(normalized_queries) >= 2:
+                use_multilingual = True
+                notes.append(f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}")
+
     if mode == "vector":
         t0 = time.perf_counter()
-        raw_chunks = await search_vector(session, q, limit)
+        if use_multilingual:
+            raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
+        else:
+            raw_chunks = await search_vector(session, q, limit)
         timing["vector_ms"] = (time.perf_counter() - t0) * 1000
         if not raw_chunks:
             notes.append("vector_search_returned_empty (AI client error or no embeddings)")
@@ -196,7 +218,10 @@ async def search(
 
     if mode == "hybrid":
         t1 = time.perf_counter()
-        raw_chunks = await search_vector(session, q, limit)
+        if use_multilingual:
+            raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
+        else:
+            raw_chunks = await search_vector(session, q, limit)
         timing["vector_ms"] = (time.perf_counter() - t1) * 1000
 
         # chunk-level → doc-level compression (raw chunks preserved in chunks_by_doc)
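The gate added to search() above reduces to a small predicate. As a review aid, here is a minimal runnable sketch; should_use_multilingual is a hypothetical helper, not part of the patch, and its inputs mirror the locals in the diff (analyzer_cache_hit, analyzer_tier, query_analysis).

def should_use_multilingual(
    analyzer_cache_hit: bool,
    analyzer_tier: str,
    query_analysis: dict | None,
) -> tuple[bool, list[dict]]:
    # Mirrors the Phase 2.2 gate in search(): cache hit, "analyzed" tier,
    # and at least two well-formed normalized_queries.
    if not (analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis):
        return False, []
    raw_nq = query_analysis.get("normalized_queries") or []
    if not isinstance(raw_nq, list):
        return False, []
    queries = [nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")]
    return len(queries) >= 2, queries

ok, queries = should_use_multilingual(
    True, "analyzed",
    {"normalized_queries": [
        {"lang": "ko", "text": "그래프 데이터베이스", "weight": 0.56},
        {"lang": "en", "text": "graph database", "weight": 0.44},
    ]},
)
assert ok and [q["lang"] for q in queries] == ["ko", "en"]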
diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index 8097a00..7653c07 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -1,4 +1,4 @@
-"""Search candidate collection service (Phase 1.2).
+"""Search candidate collection service (Phase 1.2 + Phase 2.2 multilingual).
 
 Returns text (documents FTS + trigram) and vector (documents.embedding +
 chunks.embedding hybrid) candidates as SearchResult lists.
@@ -10,27 +10,80 @@ Phase 1.2-G: doc + chunks hybrid retrieval reinforcement.
   - documents.embedding (robust recall, strong natural-language matching)
   - document_chunks.embedding (precision, segment matching)
   - both SQL queries run concurrently, then merged by doc_id (chunk weight 1.2, doc 1.0)
+
+Phase 2.2 additions:
+  - _QUERY_EMBED_CACHE: bge-m3 query embedding cache (module level, FIFO eviction, 24h TTL)
+  - search_vector_multilingual: accepts a normalized_queries array (one query per language);
+    called only on a QueryAnalyzer cache hit with analyzer_tier == "analyzed" (gated in search.py)
+  - target: crosslingual_ko_en NDCG 0.53 → 0.65+
 """
 
 from __future__ import annotations
 
 import asyncio
-from typing import TYPE_CHECKING
+import hashlib
+import time
+from typing import TYPE_CHECKING, Any
 
 from sqlalchemy import text
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
 
 from ai.client import AIClient
 from core.database import engine
+from core.utils import setup_logger
 
 if TYPE_CHECKING:
     from api.search import SearchResult
 
+logger = setup_logger("retrieval_service")
+
 # Hybrid merge weights (1.2-G)
 DOC_VECTOR_WEIGHT = 1.0
 CHUNK_VECTOR_WEIGHT = 1.2
 
+# ─── Phase 2.2: Query embedding cache ───────────────────
+# Cuts bge-m3 call cost roughly in half (avoids re-embedding identical normalized queries).
+_QUERY_EMBED_CACHE: dict[str, dict[str, Any]] = {}
+QUERY_EMBED_TTL = 86400  # 24h
+QUERY_EMBED_MAXSIZE = 500
+
+
+def _query_embed_key(text_: str) -> str:
+    # The model name is part of the key, so a model swap invalidates old entries.
+    return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
+
+
+async def _get_query_embedding(
+    client: AIClient, text_: str
+) -> list[float] | None:
+    """Query embedding with in-memory cache.
+
+    Repeat calls with identical text skip bge-m3; vector_ms drops sharply
+    on fixed-query regression runs.
+    """
+    if not text_:
+        return None
+    key = _query_embed_key(text_)
+    entry = _QUERY_EMBED_CACHE.get(key)
+    if entry and time.time() - entry["ts"] < QUERY_EMBED_TTL:
+        return entry["emb"]
+    try:
+        emb = await client.embed(text_)
+    except Exception as exc:
+        logger.warning("query embed failed text=%r err=%r", text_[:40], exc)
+        return None
+    if len(_QUERY_EMBED_CACHE) >= QUERY_EMBED_MAXSIZE:
+        # FIFO eviction; the cache is non-empty here, so next(iter(...)) is safe.
+        _QUERY_EMBED_CACHE.pop(next(iter(_QUERY_EMBED_CACHE)), None)
+    _QUERY_EMBED_CACHE[key] = {"emb": emb, "ts": time.time()}
+    return emb
+
+
+def query_embed_cache_stats() -> dict[str, int]:
+    return {"size": len(_QUERY_EMBED_CACHE), "maxsize": QUERY_EMBED_MAXSIZE}
+
 
 async def search_text(
     session: AsyncSession, query: str, limit: int
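Two properties of the cache above are easy to miss in review: eviction is FIFO (oldest insertion, not least-recently-used, since hits never reorder the dict), and the TTL is lazy (expired entries are treated as misses and only replaced on the next successful embed). A self-contained sketch of that policy, under those assumed semantics:

import time

_CACHE: dict[str, dict] = {}
_TTL, _MAXSIZE = 86400, 500  # mirror QUERY_EMBED_TTL / QUERY_EMBED_MAXSIZE

def cache_get(key: str) -> list[float] | None:
    entry = _CACHE.get(key)
    if entry and time.time() - entry["ts"] < _TTL:
        return entry["emb"]  # fresh hit
    return None  # miss, or expired (entry stays until overwritten or evicted)

def cache_put(key: str, emb: list[float]) -> None:
    if len(_CACHE) >= _MAXSIZE:
        # FIFO: dicts preserve insertion order, so the first key is the oldest.
        _CACHE.pop(next(iter(_CACHE)), None)
    _CACHE[key] = {"emb": emb, "ts": time.time()}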
@@ -153,11 +206,16 @@ async def search_vector(
     list[SearchResult] with doc_ids deduplicated. compress_chunks_to_docs works unchanged.
     chunks_by_doc is preserved in search.py via group_by_doc.
     """
+    client = AIClient()
     try:
-        client = AIClient()
-        query_embedding = await client.embed(query)
-        await client.close()
-    except Exception:
+        query_embedding = await _get_query_embedding(client, query)
+    finally:
+        try:
+            await client.close()
+        except Exception:
+            pass
+
+    if query_embedding is None:
         return []
 
     embedding_str = str(query_embedding)
@@ -307,6 +365,100 @@ def _merge_doc_and_chunk_vectors(
     return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)
 
 
+async def search_vector_multilingual(
+    session: AsyncSession,
+    normalized_queries: list[dict],
+    limit: int,
+) -> list["SearchResult"]:
+    """Phase 2.2: vector retrieval over a multilingual normalized_queries array.
+
+    Generates an embedding for each language query in parallel (leveraging
+    cache hits), runs the existing docs+chunks hybrid retrieval per embedding,
+    and merges the results by weight.
+
+    ⚠️ Call conditions:
+      - must be a QueryAnalyzer cache hit (async-only rule)
+      - analyzer confidence must be high and normalized_queries must exist
+      - called from search.py only; honors the rule against synchronous LLM
+        calls on the retrieval path.
+
+    Args:
+        session: AsyncSession (caller-managed; this function opens its own
+            connections via a sessionmaker and does not use it directly)
+        normalized_queries: [{"lang": "ko", "text": "...", "weight": 0.56}, ...]
+            Weights are already normalized to sum to 1.0 by _normalize_weights.
+        limit: number of top results
+
+    Returns:
+        list[SearchResult] with doc_ids deduplicated;
+        merged score = sum(per-query score * lang weight).
+    """
+    if not normalized_queries:
+        return []
+
+    # 1. Embed each language query in parallel (leveraging cache hits)
+    client = AIClient()
+    try:
+        embed_tasks = [
+            _get_query_embedding(client, q["text"]) for q in normalized_queries
+        ]
+        embeddings = await asyncio.gather(*embed_tasks)
+    finally:
+        try:
+            await client.close()
+        except Exception:
+            pass
+
+    # Queries whose embedding failed are skipped (dropped with a warning,
+    # without re-normalizing the remaining weights).
+    per_query_plan: list[tuple[dict, str]] = []
+    for q, emb in zip(normalized_queries, embeddings):
+        if emb is None:
+            logger.warning("multilingual embed skipped lang=%s", q.get("lang"))
+            continue
+        per_query_plan.append((q, str(emb)))
+
+    if not per_query_plan:
+        return []
+
+    # 2. For each embedding, run doc + chunk retrieval in parallel
+    Session = async_sessionmaker(engine)
+
+    async def _one_query(q_meta: dict, embedding_str: str) -> list["SearchResult"]:
+        async def _docs() -> list["SearchResult"]:
+            async with Session() as s:
+                return await _search_vector_docs(s, embedding_str, limit * 4)
+
+        async def _chunks() -> list["SearchResult"]:
+            async with Session() as s:
+                return await _search_vector_chunks(s, embedding_str, limit * 4)
+
+        doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
+        return _merge_doc_and_chunk_vectors(doc_r, chunk_r)
+
+    per_query_results = await asyncio.gather(
+        *(_one_query(q, emb_str) for q, emb_str in per_query_plan)
+    )
+
+    # 3. Weight-based merge: duplicate doc_ids accumulate weighted scores
+    merged: dict[int, "SearchResult"] = {}
+    for (q_meta, _emb_str), results in zip(per_query_plan, per_query_results):
+        weight = float(q_meta.get("weight", 1.0) or 1.0)
+        for r in results:
+            weighted = r.score * weight
+            prev = merged.get(r.id)
+            if prev is None:
+                # First sighting: mutate in place (results are local to this
+                # call, so no copy is needed).
+                r.score = weighted
+                r.match_reason = f"ml_{q_meta.get('lang', '?')}"
+                merged[r.id] = r
+            else:
+                # Duplicate doc: accumulate the weighted score and append this
+                # query's language to match_reason for readability.
+                prev.score += weighted
+                if q_meta.get("lang") and q_meta.get("lang") not in (prev.match_reason or ""):
+                    prev.match_reason = (prev.match_reason or "ml") + f"+{q_meta['lang']}"
+
+    sorted_results = sorted(merged.values(), key=lambda r: r.score, reverse=True)
+    return sorted_results[: limit * 4]  # generous candidate pool for the reranker
+
+
 def compress_chunks_to_docs(
     chunks: list["SearchResult"], limit: int
 ) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
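A toy run of the step-3 merge shows why cross-language agreement wins: a document retrieved by both the Korean and English queries accumulates both weighted scores. In this sketch Result is a stand-in for SearchResult (only id, score, and match_reason are assumed), and the scores and weights are invented.

from dataclasses import dataclass

@dataclass
class Result:
    id: int
    score: float
    match_reason: str = ""

ko = [Result(1, 0.90), Result(2, 0.40)]  # results for the Korean query
en = [Result(1, 0.70), Result(3, 0.80)]  # results for the English query
plans = [({"lang": "ko", "weight": 0.56}, ko), ({"lang": "en", "weight": 0.44}, en)]

merged: dict[int, Result] = {}
for meta, results in plans:
    for r in results:
        weighted = r.score * meta["weight"]
        if r.id not in merged:
            r.score, r.match_reason = weighted, f"ml_{meta['lang']}"
            merged[r.id] = r
        else:
            merged[r.id].score += weighted
            merged[r.id].match_reason += f"+{meta['lang']}"

print(sorted(merged.values(), key=lambda r: r.score, reverse=True))
# doc 1 scores 0.90*0.56 + 0.70*0.44 = 0.812 and ranks first: agreement across
# languages beats a single strong monolingual hit (doc 3: 0.352, doc 2: 0.224).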