hyungi_document_server/app/services/search/retrieval_service.py
Hyungi Ahn f5c3dea833 feat(search): Phase 2.2 multilingual vector retrieval + query embed cache
## Changes

### app/services/search/retrieval_service.py
 - **_QUERY_EMBED_CACHE**: module-level LRU (maxsize=500, TTL=24h)
   - Keyed by sha256(text|bge-m3). Repeated calls with a fixed query cut vector_ms roughly in half.
 - **_get_query_embedding(client, text)**: cache-first helper. The existing search_vector() now uses it as well.
 - **search_vector_multilingual(session, normalized_queries, limit)**: new
   - Generates an embedding for each language in normalized_queries in parallel (using cache hits)
   - Runs docs+chunks hybrid retrieval for each embedding in parallel
   - Weight-based cumulative score merge (lang_weight is already normalized to sum to 1.0)
   - match_reason shows the merged languages, e.g. "ml_ko+en"
   - Call conditions documented: only on cache hit + analyzer_tier=analyzed
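
The weight-based cumulative merge listed above can be sketched roughly as follows. This is a minimal illustration, not the code in this commit: `Hit` and `merge_weighted` are hypothetical names standing in for `SearchResult` and the merge step, and the sample scores and weights are made up.

```python
from dataclasses import dataclass


@dataclass
class Hit:
    """Hypothetical stand-in for SearchResult (only the fields the merge needs)."""
    id: int
    score: float
    match_reason: str = ""


def merge_weighted(per_lang: list[tuple[str, float, list[Hit]]]) -> list[Hit]:
    """Accumulate score * lang_weight per doc id and record contributing languages."""
    merged: dict[int, Hit] = {}
    for lang, weight, hits in per_lang:
        for h in hits:
            prev = merged.get(h.id)
            if prev is None:
                # First time this doc is seen: start from its weighted score.
                merged[h.id] = Hit(h.id, h.score * weight, f"ml_{lang}")
            else:
                # Doc also matched in another language: add the weighted score.
                prev.score += h.score * weight
                prev.match_reason += f"+{lang}"
    return sorted(merged.values(), key=lambda h: h.score, reverse=True)


# Illustrative input: weights already sum to 1.0 (0.56 + 0.44).
results = merge_weighted([
    ("ko", 0.56, [Hit(1, 0.80), Hit(2, 0.60)]),
    ("en", 0.44, [Hit(1, 0.70), Hit(3, 0.90)]),
])
for h in results:
    print(h.id, round(h.score, 3), h.match_reason)
```

In this sample, a document matched in both languages accumulates both weighted scores, so it ranks above documents matched in only one language.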

### app/api/search.py
 - use_multilingual decision logic:
   - analyzer_cache_hit == True
   - analyzer_tier == "analyzed" (confidence >= 0.85)
   - len(normalized_queries) >= 2 (more than one language version actually exists)
 - search_vector_multilingual is called only when all three conditions hold
 - Every other path (cache miss, low confidence, single language) keeps using the existing search_vector unchanged (guarantees zero regressions)
 - Records `multilingual langs=[ko, en, ...]` in notes
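
The three-condition gate above could look something like the sketch below. The actual code in app/api/search.py is not shown on this page, so `should_use_multilingual` and `multilingual_note` are hypothetical helper names and the argument shapes are assumptions based on the bullet list.

```python
def should_use_multilingual(
    analyzer_cache_hit: bool,
    analyzer_tier: str,
    normalized_queries: list[dict],
) -> bool:
    """Return True only when all three conditions from the list above hold."""
    return bool(
        analyzer_cache_hit
        and analyzer_tier == "analyzed"
        and len(normalized_queries) >= 2
    )


def multilingual_note(normalized_queries: list[dict]) -> str:
    """Build a notes entry such as 'multilingual langs=[ko, en]'."""
    langs = ", ".join(q.get("lang", "?") for q in normalized_queries)
    return f"multilingual langs=[{langs}]"


# Illustrative input (weights and texts are made up).
queries = [
    {"lang": "ko", "text": "...", "weight": 0.56},
    {"lang": "en", "text": "...", "weight": 0.44},
]
if should_use_multilingual(True, "analyzed", queries):
    print(multilingual_note(queries))
```

Any failed condition falls through to the existing search_vector path, which is how the commit keeps that path unchanged.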

## Expected effects
 - crosslingual_ko_en NDCG 0.53 → 0.65+ (Phase 2 target)
 - Existing paths are completely unchanged → zero regressions
 - Combined with the Phase 2.1 async structure, honors the "active only on cache hit" condition

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:59:20 +09:00

495 lines
19 KiB
Python

"""Search candidate collection service (Phase 1.2 + Phase 2.2 multilingual).

Returns text (documents FTS + trigram) and vector (documents.embedding +
chunks.embedding hybrid) candidates as a list of SearchResult.

Phase 1.1a: moved _search_text/_search_vector out of search.py (ILIKE kept as-is).
Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. Removes the ILIKE full scan.
Phase 1.2-C: switched vector retrieval to the document_chunks table → catastrophic recall loss.
Phase 1.2-G: added doc + chunks hybrid retrieval.
    - documents.embedding (robust recall, strong on natural-language matching)
    - document_chunks.embedding (precision, segment matching)
    - both SQL queries run concurrently, then merge by doc_id (chunk weight 1.2, doc 1.0)
Phase 2.2 additions:
    - _QUERY_EMBED_CACHE: bge-m3 query embedding cache (module-level LRU, TTL 24h)
    - search_vector_multilingual: supports the normalized_queries array (one query per lang).
      Called only on a QueryAnalyzer cache hit with analyzer_tier >= merge.
    - Target: crosslingual_ko_en NDCG 0.53 → 0.65+
"""
from __future__ import annotations

import asyncio
import hashlib
import time
from typing import TYPE_CHECKING, Any

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from ai.client import AIClient
from core.database import engine
from core.utils import setup_logger

if TYPE_CHECKING:
    from api.search import SearchResult

logger = setup_logger("retrieval_service")

# Hybrid merge weights (1.2-G)
DOC_VECTOR_WEIGHT = 1.0
CHUNK_VECTOR_WEIGHT = 1.2

# ─── Phase 2.2: Query embedding cache ───────────────────
# Roughly halves bge-m3 call cost (avoids re-embedding the same normalized_query)
_QUERY_EMBED_CACHE: dict[str, dict[str, Any]] = {}
QUERY_EMBED_TTL = 86400  # 24h
QUERY_EMBED_MAXSIZE = 500


def _query_embed_key(text_: str) -> str:
    return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()


async def _get_query_embedding(
    client: AIClient, text_: str
) -> list[float] | None:
    """Query embedding with in-memory cache.

    Skips bge-m3 when the same text is requested again, so vector_ms drops
    sharply when the fixed-query regression set is re-run.
    """
    if not text_:
        return None
    key = _query_embed_key(text_)
    entry = _QUERY_EMBED_CACHE.get(key)
    if entry and time.time() - entry["ts"] < QUERY_EMBED_TTL:
        # Re-insert on hit so dict order tracks recency (makes eviction LRU).
        _QUERY_EMBED_CACHE.pop(key, None)
        _QUERY_EMBED_CACHE[key] = entry
        return entry["emb"]
    try:
        emb = await client.embed(text_)
    except Exception as exc:
        logger.warning("query embed failed text=%r err=%r", text_[:40], exc)
        return None
    # Drop any expired entry for this key first so it does not count toward
    # the size limit, then evict the entry at the front of the dict if full.
    _QUERY_EMBED_CACHE.pop(key, None)
    if len(_QUERY_EMBED_CACHE) >= QUERY_EMBED_MAXSIZE:
        try:
            oldest = next(iter(_QUERY_EMBED_CACHE))
            _QUERY_EMBED_CACHE.pop(oldest, None)
        except StopIteration:
            pass
    _QUERY_EMBED_CACHE[key] = {"emb": emb, "ts": time.time()}
    return emb


def query_embed_cache_stats() -> dict[str, int]:
    return {"size": len(_QUERY_EMBED_CACHE), "maxsize": QUERY_EMBED_MAXSIZE}


async def search_text(
    session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]:
    """Field-weighted FTS + trigram search (Phase 1.2-B UNION decomposition).

    Phase 1.2-B diagnosis:
        With a single SELECT joined by OR, the PostgreSQL planner could not combine
        the indexes across the OR and chose a Seq Scan (small table, 765 docs).
        Measured with EXPLAIN: 525ms.
        → Decomposed into CTE + UNION, each branch uses its own index → 26ms (95% reduction).

    Structure:
        candidates CTE
        ├─ title %       → idx_documents_title_trgm
        ├─ ai_summary %  → idx_documents_ai_summary_trgm
        │                  (includes the length > 0 condition to match the partial index)
        └─ FTS @@ plainto_tsquery → idx_documents_fts_full
        JOIN documents d ON d.id = c.id
        ORDER BY weighted sum of 5-column similarity + ts_rank * 2.0

    Weights: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0

    Threshold:
        pg_trgm.similarity_threshold default = 0.3
        → for multi-token Korean news queries (e.g. "이란 미국 전쟁 글로벌 반응",
          "Iran US war global reaction"), candidates are not collected
          → recall drops (0.788 → 0.750)
        → lowering it with set_limit(0.15) restores recall; precision is corrected
          by the ORDER BY similarity sum.
    """
    from api.search import SearchResult  # avoids circular import

    # Lower the trigram threshold to 0.15 to restore multi-token query recall.
    # The two executes within one SQLAlchemy async session use the same connection.
    await session.execute(text("SELECT set_limit(0.15)"))
    result = await session.execute(
        text("""
            WITH candidates AS (
                -- title trigram (idx_documents_title_trgm)
                SELECT id FROM documents
                WHERE deleted_at IS NULL AND title % :q
                UNION
                -- ai_summary trigram (matches the idx_documents_ai_summary_trgm partial index)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND ai_summary IS NOT NULL
                  AND length(ai_summary) > 0
                  AND ai_summary % :q
                UNION
                -- unified FTS index (idx_documents_fts_full)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND to_tsvector('simple',
                        coalesce(title, '') || ' ' ||
                        coalesce(ai_tags::text, '') || ' ' ||
                        coalesce(ai_summary, '') || ' ' ||
                        coalesce(user_note, '') || ' ' ||
                        coalesce(extracted_text, '')
                      ) @@ plainto_tsquery('simple', :q)
            )
            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
                   left(d.extracted_text, 200) AS snippet,
                   (
                       -- weighted sum of per-column trigram similarity
                       similarity(coalesce(d.title, ''), :q) * 3.0
                       + similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5
                       + similarity(coalesce(d.user_note, ''), :q) * 2.0
                       + similarity(coalesce(d.ai_summary, ''), :q) * 1.5
                       + similarity(coalesce(d.extracted_text, ''), :q) * 1.0
                       -- FTS bonus (uses idx_documents_fts_full)
                       + coalesce(ts_rank(
                           to_tsvector('simple',
                               coalesce(d.title, '') || ' ' ||
                               coalesce(d.ai_tags::text, '') || ' ' ||
                               coalesce(d.ai_summary, '') || ' ' ||
                               coalesce(d.user_note, '') || ' ' ||
                               coalesce(d.extracted_text, '')
                           ),
                           plainto_tsquery('simple', :q)
                       ), 0) * 2.0
                   ) AS score,
                   -- match_reason: first column (in weight order) with similarity >= 0.3, else FTS
                   CASE
                       WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title'
                       WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
                       WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note'
                       WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary'
                       WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content'
                       ELSE 'fts'
                   END AS match_reason
            FROM documents d
            JOIN candidates c ON d.id = c.id
            ORDER BY score DESC
            LIMIT :limit
        """),
        {"q": query, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


async def search_vector(
    session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]:
    """Hybrid vector search: concurrent doc + chunks retrieval (Phase 1.2-G).

    Phase 1.2-C diagnosis:
        chunks-only retrieval loses meaning at the segment level, causing
        catastrophic recall on natural-language queries.
        The doc embedding averages the whole body → robust recall.
        → Using both retrievals together is the standard approach.

    Data flow:
        1. One query embedding (bge-m3)
        2. Two SQL queries run concurrently via asyncio.gather:
            - _search_vector_docs: documents.embedding cosine top N
            - _search_vector_chunks: document_chunks.embedding window partition (top 2 per doc)
        3. _merge_doc_and_chunk_vectors applies weights + dedup:
            - chunk score * 1.2 (precision)
            - doc score * 1.0 (recall)
            - dedup by doc_id, chunks take priority

    Returns:
        list[SearchResult], deduplicated by doc_id. compress_chunks_to_docs keeps working as-is.
        chunks_by_doc is preserved in search.py via group_by_doc.
    """
    client = AIClient()
    try:
        query_embedding = await _get_query_embedding(client, query)
    finally:
        try:
            await client.close()
        except Exception:
            pass
    if query_embedding is None:
        return []
    embedding_str = str(query_embedding)

    # Run the two SQL queries in parallel, each on its own session
    # (an asyncpg connection serializes statements).
    Session = async_sessionmaker(engine)

    async def _docs_call() -> list["SearchResult"]:
        async with Session() as s:
            return await _search_vector_docs(s, embedding_str, limit * 4)

    async def _chunks_call() -> list["SearchResult"]:
        async with Session() as s:
            return await _search_vector_chunks(s, embedding_str, limit * 4)

    doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call())
    return _merge_doc_and_chunk_vectors(doc_results, chunk_results)


async def _search_vector_docs(
    session: AsyncSession, embedding_str: str, limit: int
) -> list["SearchResult"]:
    """Direct search on documents.embedding: robust recall (natural-language matching).

    Docs without chunks can still match. score is cosine similarity (1 - distance).
    chunk_id/chunk_index/section_title are None.
    """
    from api.search import SearchResult  # avoids circular import

    result = await session.execute(
        text("""
            SELECT
                id,
                title,
                ai_domain,
                ai_summary,
                file_format,
                (1 - (embedding <=> cast(:embedding AS vector))) AS score,
                left(extracted_text, 200) AS snippet,
                'vector_doc' AS match_reason,
                NULL::bigint AS chunk_id,
                NULL::integer AS chunk_index,
                NULL::text AS section_title
            FROM documents
            WHERE embedding IS NOT NULL AND deleted_at IS NULL
            ORDER BY embedding <=> cast(:embedding AS vector)
            LIMIT :limit
        """),
        {"embedding": embedding_str, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


async def _search_vector_chunks(
    session: AsyncSession, embedding_str: str, limit: int
) -> list["SearchResult"]:
    """Search document_chunks.embedding + window partition (top 2 chunks per doc).

    SQL flow:
        1. inner CTE topk: extract the top-K chunks via the ivfflat index
        2. ranked CTE: PARTITION BY doc_id + ROW_NUMBER (score descending)
        3. outer: rn <= 2 (max 2 chunks per doc) + JOIN documents
    """
    from api.search import SearchResult  # avoids circular import

    inner_k = max(limit * 5, 500)
    result = await session.execute(
        text("""
            WITH topk AS (
                SELECT
                    c.id AS chunk_id,
                    c.doc_id,
                    c.chunk_index,
                    c.section_title,
                    c.text,
                    c.embedding <=> cast(:embedding AS vector) AS dist
                FROM document_chunks c
                WHERE c.embedding IS NOT NULL
                ORDER BY c.embedding <=> cast(:embedding AS vector)
                LIMIT :inner_k
            ),
            ranked AS (
                SELECT
                    chunk_id, doc_id, chunk_index, section_title, text, dist,
                    ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC) AS rn
                FROM topk
            )
            SELECT
                d.id AS id,
                d.title AS title,
                d.ai_domain AS ai_domain,
                d.ai_summary AS ai_summary,
                d.file_format AS file_format,
                (1 - r.dist) AS score,
                left(r.text, 200) AS snippet,
                'vector_chunk' AS match_reason,
                r.chunk_id AS chunk_id,
                r.chunk_index AS chunk_index,
                r.section_title AS section_title
            FROM ranked r
            JOIN documents d ON d.id = r.doc_id
            WHERE r.rn <= 2 AND d.deleted_at IS NULL
            ORDER BY r.dist
            LIMIT :limit
        """),
        {"embedding": embedding_str, "inner_k": inner_k, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


def _merge_doc_and_chunk_vectors(
    doc_results: list["SearchResult"],
    chunk_results: list["SearchResult"],
) -> list["SearchResult"]:
    """Merge doc + chunks vector results (Phase 1.2-G).

    Weights:
        - chunk score * 1.2 (segment matches are more precise)
        - doc score * 1.0 (whole-body average, complements recall)

    Dedup:
        - by doc_id
        - if chunks exist, chunks take priority (keeps segment info + chunk_id)
        - docs absent from the chunk results are added as doc-wrap

    Returns:
        SearchResult list sorted by score descending.
        A chunk_id of None marks a doc-wrap result (used for handling docs matched by text only).
    """
    by_doc_id: dict[int, "SearchResult"] = {}
    # chunks first (apply weight + keep chunk_id); keep the best chunk per doc
    for c in chunk_results:
        c.score = c.score * CHUNK_VECTOR_WEIGHT
        prev = by_doc_id.get(c.id)
        if prev is None or c.score > prev.score:
            by_doc_id[c.id] = c
    # doc matches are added only for docs not in chunks (chunks-first rule)
    for d in doc_results:
        d.score = d.score * DOC_VECTOR_WEIGHT
        if d.id not in by_doc_id:
            by_doc_id[d.id] = d
    # sort by score descending
    return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)


async def search_vector_multilingual(
    session: AsyncSession,
    normalized_queries: list[dict],
    limit: int,
) -> list["SearchResult"]:
    """Phase 2.2: vector retrieval over the multilingual normalized_queries array.

    Generates an embedding for each language query in parallel (using cache hits),
    runs the existing docs+chunks hybrid retrieval for each embedding,
    and merges the results by weight.

    ⚠️ Call conditions:
        - Must be a QueryAnalyzer cache hit (async-only rule)
        - analyzer_confidence must be high and normalized_queries must exist
        - Called only from search.py. Honors the rule forbidding synchronous
          LLM calls on the retrieval path.

    Args:
        session: AsyncSession (managed by the caller; this function uses separate
            connections via sessionmaker)
        normalized_queries: [{"lang": "ko", "text": "...", "weight": 0.56}, ...]
            weights are already normalized to sum to 1.0 by _normalize_weights.
        limit: number of top results

    Returns:
        list[SearchResult], deduplicated by doc_id.
        merged score = sum(per-query score * lang_weight).
    """
    if not normalized_queries:
        return []

    # 1. Per-lang embeddings in parallel (using cache hits)
    client = AIClient()
    try:
        embed_tasks = [
            _get_query_embedding(client, q["text"]) for q in normalized_queries
        ]
        embeddings = await asyncio.gather(*embed_tasks)
    finally:
        try:
            await client.close()
        except Exception:
            pass

    # Skip queries whose embedding failed (dropped silently, weights are not re-normalized)
    per_query_plan: list[tuple[dict, str]] = []
    for q, emb in zip(normalized_queries, embeddings):
        if emb is None:
            logger.warning("multilingual embed skipped lang=%s", q.get("lang"))
            continue
        per_query_plan.append((q, str(emb)))
    if not per_query_plan:
        return []

    # 2. Parallel doc + chunks retrieval for each embedding
    Session = async_sessionmaker(engine)

    async def _one_query(q_meta: dict, embedding_str: str) -> list["SearchResult"]:
        async def _docs() -> list["SearchResult"]:
            async with Session() as s:
                return await _search_vector_docs(s, embedding_str, limit * 4)

        async def _chunks() -> list["SearchResult"]:
            async with Session() as s:
                return await _search_vector_chunks(s, embedding_str, limit * 4)

        doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
        return _merge_doc_and_chunk_vectors(doc_r, chunk_r)

    per_query_results = await asyncio.gather(
        *(_one_query(q, emb_str) for q, emb_str in per_query_plan)
    )

    # 3. Weight-based merge: sum the weighted scores when a doc_id repeats
    merged: dict[int, "SearchResult"] = {}
    for (q_meta, _emb_str), results in zip(per_query_plan, per_query_results):
        weight = float(q_meta.get("weight", 1.0) or 1.0)
        for r in results:
            weighted = r.score * weight
            prev = merged.get(r.id)
            if prev is None:
                # First visit: reuse the original object directly (no shallow copy)
                r.score = weighted
                r.match_reason = f"ml_{q_meta.get('lang', '?')}"
                merged[r.id] = r
            else:
                # Duplicate: accumulate the score and append this language to match_reason
                prev.score += weighted
                # merge match_reason (for readability)
                if q_meta.get("lang") and q_meta.get("lang") not in (prev.match_reason or ""):
                    prev.match_reason = (prev.match_reason or "ml") + f"+{q_meta['lang']}"
    sorted_results = sorted(merged.values(), key=lambda r: r.score, reverse=True)
    return sorted_results[: limit * 4]  # generous pool of rerank candidates


def compress_chunks_to_docs(
    chunks: list["SearchResult"], limit: int
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
    """Compress chunk-level results to doc level while preserving the raw chunks.

    Fusion must be doc-based (to avoid duplicates of the same doc), but the
    Phase 1.3 reranker needs raw chunk-level data, so the compressed list and
    the raw chunks are returned together.

    Compression rules:
        - only the highest-scoring chunk per doc_id is added to doc_results
        - other chunks of the same doc are kept in the chunks_by_doc dict (for the Phase 1.3 reranker)
        - sort by score descending, keep only `limit` entries in doc_results

    Returns:
        (doc_results, chunks_by_doc)
        - doc_results: list[SearchResult], best chunk score per doc, fusion input
        - chunks_by_doc: dict[doc_id, list[SearchResult]], all raw chunks preserved
    """
    if not chunks:
        return [], {}
    chunks_by_doc: dict[int, list["SearchResult"]] = {}
    best_per_doc: dict[int, "SearchResult"] = {}
    for chunk in chunks:
        chunks_by_doc.setdefault(chunk.id, []).append(chunk)
        prev_best = best_per_doc.get(chunk.id)
        if prev_best is None or chunk.score > prev_best.score:
            best_per_doc[chunk.id] = chunk
    # sort by best score per doc, keep the top `limit`
    doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True)
    return doc_results[:limit], chunks_by_doc