3092e3009d
phase-2a-embedding-diagnose.md v4 § 6 (dispatcher) + § 7 Phase 3 (51-case measurement) + § 7 Phase 4 (decision)
Round 2 review: round-2-review-mighty-starfish.md (R2-2 + R2-B1 pair invariant + slug-based resolve)
Code changes:
- app/services/search/retrieval_service.py:
  - CANDIDATE_BACKEND_MAP allowlist (baseline / cand_me5_large_inst / cand_snowflake_l_v2)
  - _resolve_backend(slug) → docs_table/chunks_table/embed_endpoint, or None
  - _embed_query_via_tei(): calls the candidate TEI endpoint (no cache)
  - _VALID_DOCS_TABLE + _VALID_CHUNKS_TABLE regex (R2-B1 second-stage gate)
  - _search_vector_docs / _search_vector_chunks: docs_table/chunks_table + snapshot_*_id_max parameters
  - search_vector + search_vector_multilingual: embedding_backend + snapshot_*_id_max parameters + dispatch log
- app/services/search/search_pipeline.py: run_search() signature + parameters threaded through the 4 search_vector* calls
- app/api/search.py: 3 Query parameters + ValueError → HTTP 400 (the response lists the allowed values)
- tests/search_eval/run_eval.py: --embedding-backend + --snapshot-doc-id-max + --snapshot-chunk-id-max,
  threaded through call_search/call_search_full/evaluate and the 3 asyncio.run calls in main
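For reviewers, here is a minimal standalone sketch of the two-stage gate listed above. The first stage is the slug allowlist; the second is the table-name regex. It mirrors CANDIDATE_BACKEND_MAP and _VALID_DOCS_TABLE but is not the module itself. `resolve_docs_table` and `build_docs_sql` are hypothetical names, and only one candidate entry is included. The sketch uses `re.fullmatch`, which also rejects a trailing newline. A `match` relying on a bare `$` anchor would accept one.

```python
from __future__ import annotations

import re

# Stage 1: server-side slug -> table map (a trimmed stand-in for CANDIDATE_BACKEND_MAP)
BACKENDS: dict[str, dict[str, str] | None] = {
    "baseline": None,
    "cand_snowflake_l_v2": {
        "docs_table": "documents_cand_snowflake_l_v2",
        "chunks_table": "document_chunks_cand_snowflake_l_v2",
    },
}

# Stage 2: final regex allowlist, checked right before the name is interpolated into SQL
VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")


def resolve_docs_table(slug: str | None) -> str:
    """Map a client-supplied slug to a docs table; unknown slugs raise ValueError (-> HTTP 400)."""
    if slug is None or slug == "baseline":
        return "documents"
    if slug not in BACKENDS:
        raise ValueError(f"unknown_embedding_backend: {slug!r}")
    cfg = BACKENDS[slug]
    return "documents" if cfg is None else cfg["docs_table"]


def build_docs_sql(docs_table: str) -> str:
    """Hypothetical helper: interpolate the table name only after the regex gate passes."""
    # fullmatch also rejects a trailing newline, which a bare `$` anchor would accept
    if not VALID_DOCS_TABLE.fullmatch(docs_table):
        raise RuntimeError(f"invalid_docs_table: {docs_table!r}")
    return f"SELECT doc_id FROM {docs_table} LIMIT :limit"
```

The client never sends a table name. The slug can only select among server-side entries, and the regex is a last line of defense if the map is ever misconfigured.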
Measurement artifacts (51 cases, scored=46, failure=5):
- reports/v0_2_phase2a_baseline_snapshot_2026-05-23.csv (production path with the snapshot filter applied)
- reports/v0_2_phase2a_me5_large_inst_2026-05-23.csv
- reports/v0_2_phase2a_snowflake_l_v2_2026-05-23.csv
- tests/search_eval/baselines/v0_2_phase2a_{baseline_snapshot,me5_large_inst,snowflake_l_v2}_2026-05-23.json (3 files)
Results:
| Candidate                  | NDCG  | Δ vs baseline | mixed NDCG | korean_only NDCG | p50 latency (ms) |
|----------------------------|------:|--------------:|-----------:|-----------------:|-----------------:|
| bge-m3 (baseline snapshot) | 0.659 | n/a           | 0.39       | 0.51             | 464              |
| cand_me5_large_inst        | 0.477 | -0.182        | 0.17       | 0.47             | 194              |
| cand_snowflake_l_v2        | 0.616 | -0.043        | 0.35       | 0.52             | 254              |
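The Δ column is a plain difference of overall NDCG against the baseline snapshot. The small check below reproduces it from the table values and confirms the "both regress on net" reading. The numbers are copied from the table; nothing else is assumed.

```python
from __future__ import annotations

# Overall NDCG per candidate, copied from the table above
NDCG: dict[str, float] = {
    "bge-m3 (baseline snapshot)": 0.659,
    "cand_me5_large_inst": 0.477,
    "cand_snowflake_l_v2": 0.616,
}


def deltas_vs_baseline(scores: dict[str, float], baseline: str) -> dict[str, float]:
    """Delta = candidate NDCG - baseline NDCG, rounded to 3 decimals like the table."""
    base = scores[baseline]
    return {name: round(s - base, 3) for name, s in scores.items() if name != baseline}


deltas = deltas_vs_baseline(NDCG, "bge-m3 (baseline snapshot)")
# A net regression means every candidate's delta is below zero
all_regress = all(d < 0 for d in deltas.values())
```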
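Decision (H3): keep bge-m3. Both candidates regress on net.
- mE5-large-instruct: regresses across all categories (-0.182). The model's query prefix was not applied, an uncontrolled variable; a retry is a candidate for a separate PR, PR-2A-mE5-Prefix-Retry.
- snowflake_l_v2: mild regression (-0.043), with a faint improvement signal on korean_only (+0.01).
- To shore up the korean_only/mixed weaknesses, Phase 2B (reranker) or Phase 2Q (query rewrite) is recommended.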
Decision report: reports/phase_2a_embedding_decision_2026-05-23.md (covers § 1 to 8; all 16 closure-gate items PASS).
Follow-up PR backlog:
- PR-2A-mE5-Prefix-Retry (separate PR)
- PR-2A-Extended-Bge-Mgemma2 (separate PR, decided in v3)
- PR-2A-Cloud-Embedding-Scaffold-1 (Cohere/Voyage, scaffold only, optional)
- PR-Search-Query-Rewrite-1 (Phase 2Q)
- PR-Search-Reranker-V2-Diagnose (Phase 2B)
- PR-2A-Chunks-Cand-Cleanup-1 (DROP the cand tables after 1 week)
Production impact:
- No column or row changes to documents / document_chunks
- No config.yaml changes (ollama bge-m3 unchanged)
- The endpoint additions are opt-in query parameters (when omitted, the production path is unchanged; zero regression)
- 4 smoke tests PASS (baseline / baseline+snapshot / cand_me5 / cand_invalid → HTTP 400)
- Verified that the dispatch log records the backend and tables, including snapshot_doc_id_max / snapshot_chunk_id_max
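app/api/search.py is not shown in this commit view. The framework-free sketch below illustrates the ValueError → HTTP 400 translation exercised by the four smoke tests. It assumes the endpoint catches the ValueError raised for an unknown slug and returns the allowed values. `resolve_backend` and `handle_search` are hypothetical stand-ins, and the snapshot id 1000 is an arbitrary illustrative value.

```python
from __future__ import annotations

ALLOWED_BACKENDS = ("baseline", "cand_me5_large_inst", "cand_snowflake_l_v2")


def resolve_backend(slug: str | None) -> str:
    """Stand-in for _resolve_backend: unknown slugs raise ValueError."""
    if slug is None:
        return "baseline"
    if slug not in ALLOWED_BACKENDS:
        raise ValueError(f"unknown_embedding_backend: {slug!r}")
    return slug


def handle_search(
    embedding_backend: str | None = None,
    snapshot_doc_id_max: int | None = None,
) -> tuple[int, dict]:
    """Hypothetical endpoint body returning (HTTP status, JSON-like body)."""
    try:
        backend = resolve_backend(embedding_backend)
    except ValueError as exc:
        # ValueError -> HTTP 400, listing the allowed values in the response
        return 400, {"detail": str(exc), "allowed": list(ALLOWED_BACKENDS)}
    return 200, {"backend": backend, "snapshot_doc_id_max": snapshot_doc_id_max}


# The four smoke cases from the list above (1000 is an arbitrary illustrative id)
SMOKE_CASES: list[tuple[dict, int]] = [
    ({}, 200),
    ({"snapshot_doc_id_max": 1000}, 200),
    ({"embedding_backend": "cand_me5_large_inst"}, 200),
    ({"embedding_backend": "cand_invalid"}, 400),
]

for kwargs, expected in SMOKE_CASES:
    status, _body = handle_search(**kwargs)
    assert status == expected, (kwargs, status)
```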
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
630 lines
26 KiB
Python
"""Retrieval candidate collection service (Phase 1.2 + Phase 2.2 multilingual).

Returns text (documents FTS + trigram) and vector (documents.embedding + chunks.embedding hybrid)
candidates as a list of SearchResult.

Phase 1.1a: moved _search_text/_search_vector out of search.py (ILIKE unchanged).
Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. Removes the ILIKE full scan.
Phase 1.2-C: switched vector retrieval to the document_chunks table → catastrophic recall loss.
Phase 1.2-G: reinforced with doc + chunks hybrid retrieval.
    - documents.embedding (robust recall, strong natural-language matching)
    - document_chunks.embedding (precision, segment-level matching)
    - both SQL queries run concurrently, then merged by doc_id (chunk weight 1.2, doc 1.0)

Phase 2.2 additions:
    - _QUERY_EMBED_CACHE: bge-m3 query embedding cache (module-level dict that evicts the
      oldest insertion when full, TTL 24h)
    - search_vector_multilingual: supports a normalized_queries array (one query per language).
      Called only on a QueryAnalyzer cache hit with analyzer_tier >= merge.
    - target: crosslingual_ko_en NDCG 0.53 → 0.65+
"""

from __future__ import annotations

import asyncio
import hashlib
import re
import time
from typing import TYPE_CHECKING, Any

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from ai.client import AIClient
from core.database import engine
from core.utils import setup_logger

if TYPE_CHECKING:
    from api.search import SearchResult


logger = setup_logger("retrieval_service")

# Hybrid merge weights (1.2-G)
DOC_VECTOR_WEIGHT = 1.0
CHUNK_VECTOR_WEIGHT = 1.2

# ─── Phase 2.2: Query embedding cache ───────────────────
# Halves bge-m3 call cost (avoids re-embedding the same normalized_query)
_QUERY_EMBED_CACHE: dict[str, dict[str, Any]] = {}
QUERY_EMBED_TTL = 86400  # 24h
QUERY_EMBED_MAXSIZE = 500

# ─── Phase 2A Diagnose dispatcher (R2-2 + R2-B1) ──────────────
# Server-side allowlist map. The query parameter never accepts a raw table name.
CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
    "baseline": None,
    "cand_me5_large_inst": {
        "docs_table": "documents_cand_me5_large_inst",
        "chunks_table": "document_chunks_cand_me5_large_inst",
        "embed_endpoint": "http://embedding-cand-me5-inst:80/embed",
    },
    "cand_snowflake_l_v2": {
        "docs_table": "documents_cand_snowflake_l_v2",
        "chunks_table": "document_chunks_cand_snowflake_l_v2",
        "embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
    },
}

# Second-stage gate (R2-B1): final allowlist checked right before SQL string interpolation.
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
_VALID_CHUNKS_TABLE = re.compile(r"^(document_chunks|document_chunks_cand_[a-z0-9_]+)$")


def _resolve_backend(slug: str | None) -> dict[str, str] | None:
    """slug → dict with docs_table / chunks_table / embed_endpoint, or None (baseline).

    Raises ValueError on an unknown slug (the caller translates it to HTTP 400).
    """
    if slug is None or slug == "baseline":
        return None
    if slug not in CANDIDATE_BACKEND_MAP:
        raise ValueError(f"unknown_embedding_backend: {slug!r}")
    cfg = CANDIDATE_BACKEND_MAP[slug]
    if cfg is None:
        return None
    if not all(k in cfg for k in ("docs_table", "chunks_table", "embed_endpoint")):
        raise RuntimeError(f"candidate_table_pair_misconfigured: {slug}")
    return cfg


async def _embed_query_via_tei(endpoint: str, text_: str) -> list[float] | None:
    """Call a candidate TEI endpoint (no cache: each slug has a different model distribution)."""
    if not text_:
        return None
    import httpx
    try:
        async with httpx.AsyncClient(timeout=30.0) as c:
            r = await c.post(endpoint, json={"inputs": [text_], "truncate": True})
            r.raise_for_status()
            data = r.json()
        # TEI /embed returns one vector per input; a shape error is caught below and yields None
        if not isinstance(data, list) or not data or not isinstance(data[0], list):
            raise ValueError(f"unexpected TEI shape: {type(data).__name__}")
        return data[0]
    except Exception as exc:
        logger.warning("candidate TEI embed failed endpoint=%s err=%r", endpoint, exc)
        return None


def _query_embed_key(text_: str) -> str:
    return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()


async def _get_query_embedding(
    client: AIClient, text_: str
) -> list[float] | None:
    """Query embedding with an in-memory cache.

    Repeated calls with the same text skip bge-m3; in fixed-query regression runs
    vector_ms drops sharply.
    """
    if not text_:
        return None
    key = _query_embed_key(text_)
    entry = _QUERY_EMBED_CACHE.get(key)
    if entry and time.time() - entry["ts"] < QUERY_EMBED_TTL:
        return entry["emb"]
    try:
        emb = await client.embed(text_)
    except Exception as exc:
        logger.warning("query embed failed text=%r err=%r", text_[:40], exc)
        return None
    if len(_QUERY_EMBED_CACHE) >= QUERY_EMBED_MAXSIZE:
        # Full: evict the oldest inserted entry (dicts keep insertion order, so this is FIFO, not LRU)
        try:
            oldest = next(iter(_QUERY_EMBED_CACHE))
            _QUERY_EMBED_CACHE.pop(oldest, None)
        except StopIteration:
            pass
    _QUERY_EMBED_CACHE[key] = {"emb": emb, "ts": time.time()}
    return emb


def query_embed_cache_stats() -> dict[str, int]:
    return {"size": len(_QUERY_EMBED_CACHE), "maxsize": QUERY_EMBED_MAXSIZE}


async def search_text(
    session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]:
    """FTS + trigram search with per-field weights (Phase 1.2-B UNION decomposition).

    Phase 1.2-B diagnosis:
        With a single SELECT joining the conditions by OR, the PostgreSQL planner cannot
        combine the per-column indexes and picks a Seq Scan (small table, 765 docs).
        Measured with EXPLAIN: 525ms.
        → Splitting into a CTE + UNION lets each branch use its own index → 26ms (95% reduction).

    Structure:
        candidates CTE
          ├─ title %                 → idx_documents_title_trgm
          ├─ ai_summary %            → idx_documents_ai_summary_trgm
          │    (includes the length > 0 condition so the partial index matches)
          └─ FTS @@ plainto_tsquery  → idx_documents_fts_full
        JOIN documents d ON d.id = c.id
        ORDER BY weighted sum of 5 column similarities + ts_rank * 2.0
        Weights: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0

    threshold:
        pg_trgm.similarity_threshold default = 0.3
        → for multi-token Korean news queries (e.g. "이란 미국 전쟁 글로벌 반응",
          "Iran US war global reaction"), too few candidates are collected
          → recall drops (0.788 → 0.750)
        → lowering it with set_limit(0.15) recovers recall; the ORDER BY weighted
          similarity sum compensates for precision.
    """
    from api.search import SearchResult  # avoid circular import

    # Lower the trigram threshold to 0.15 to recover recall on multi-token queries.
    # Both execute() calls in this AsyncSession run on the same connection, so the
    # setting applies to the query below.
    await session.execute(text("SELECT set_limit(0.15)"))

    result = await session.execute(
        text("""
            WITH candidates AS (
                -- title trigram (idx_documents_title_trgm)
                SELECT id FROM documents
                WHERE deleted_at IS NULL AND title % :q
                UNION
                -- ai_summary trigram (matches the partial index idx_documents_ai_summary_trgm)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND ai_summary IS NOT NULL
                  AND length(ai_summary) > 0
                  AND ai_summary % :q
                UNION
                -- combined FTS index (idx_documents_fts_full)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND to_tsvector('simple',
                        coalesce(title, '') || ' ' ||
                        coalesce(ai_tags::text, '') || ' ' ||
                        coalesce(ai_summary, '') || ' ' ||
                        coalesce(user_note, '') || ' ' ||
                        coalesce(extracted_text, '')
                      ) @@ plainto_tsquery('simple', :q)
            )
            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
                   left(d.extracted_text, 1200) AS snippet,
                   (
                       -- weighted sum of per-column trigram similarity
                       similarity(coalesce(d.title, ''), :q) * 3.0
                       + similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5
                       + similarity(coalesce(d.user_note, ''), :q) * 2.0
                       + similarity(coalesce(d.ai_summary, ''), :q) * 1.5
                       + similarity(coalesce(d.extracted_text, ''), :q) * 1.0
                       -- FTS bonus (same tsvector expression as idx_documents_fts_full)
                       + coalesce(ts_rank(
                           to_tsvector('simple',
                               coalesce(d.title, '') || ' ' ||
                               coalesce(d.ai_tags::text, '') || ' ' ||
                               coalesce(d.ai_summary, '') || ' ' ||
                               coalesce(d.user_note, '') || ' ' ||
                               coalesce(d.extracted_text, '')
                           ),
                           plainto_tsquery('simple', :q)
                       ), 0) * 2.0
                   ) AS score,
                   -- match_reason is the first column (in weight order) with similarity >= 0.3, else fts
                   CASE
                       WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title'
                       WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
                       WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note'
                       WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary'
                       WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content'
                       ELSE 'fts'
                   END AS match_reason
            FROM documents d
            JOIN candidates c ON d.id = c.id
            ORDER BY score DESC
            LIMIT :limit
        """),
        {"q": query, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


async def search_vector(
    session: AsyncSession,
    query: str,
    limit: int,
    *,
    embedding_backend: str | None = None,
    snapshot_doc_id_max: int | None = None,
    snapshot_chunk_id_max: int | None = None,
) -> list["SearchResult"]:
    """Hybrid vector search: doc + chunks retrieved concurrently (Phase 1.2-G).

    Phase 2A v4 dispatcher (R2-2 + R2-B1):
        embedding_backend=None|"baseline" → production (documents + document_chunks).
            If snapshot_*_id_max is set, the baseline gets the same filter (rebaseline measurement).
        embedding_backend=cand_<slug> → resolve the table pair from CANDIDATE_BACKEND_MAP.
            The cand tables were populated with only the snapshot range, so the snapshot
            filter is ignored (it is only recorded in the dispatch log).

    Data flow:
        1. Embed the query once (baseline = bge-m3 with cache / cand = TEI endpoint, no cache).
        2. Run both SQL queries concurrently with asyncio.gather:
            - _search_vector_docs(docs_table, snapshot_doc_id_max)
            - _search_vector_chunks(chunks_table, snapshot_chunk_id_max)
        3. _merge_doc_and_chunk_vectors applies weights + dedup (chunk 1.2 / doc 1.0).
    """
    cfg = _resolve_backend(embedding_backend)

    if cfg is None:
        docs_table = "documents"
        chunks_table = "document_chunks"
        client = AIClient()
        try:
            query_embedding = await _get_query_embedding(client, query)
        finally:
            try:
                await client.close()
            except Exception:
                pass
    else:
        docs_table = cfg["docs_table"]
        chunks_table = cfg["chunks_table"]
        query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)

    logger.info(
        "[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s snapshot_chunk_id_max=%s",
        embedding_backend or "baseline",
        docs_table,
        chunks_table,
        snapshot_doc_id_max,
        snapshot_chunk_id_max,
    )

    if query_embedding is None:
        return []

    embedding_str = str(query_embedding)

    Session = async_sessionmaker(engine)

    async def _docs_call() -> list["SearchResult"]:
        async with Session() as s:
            return await _search_vector_docs(
                s, embedding_str, limit * 4,
                docs_table=docs_table,
                snapshot_doc_id_max=snapshot_doc_id_max,
            )

    async def _chunks_call() -> list["SearchResult"]:
        async with Session() as s:
            return await _search_vector_chunks(
                s, embedding_str, limit * 4,
                chunks_table=chunks_table,
                snapshot_chunk_id_max=snapshot_chunk_id_max,
            )

    doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call())

    return _merge_doc_and_chunk_vectors(doc_results, chunk_results)


async def _search_vector_docs(
    session: AsyncSession,
    embedding_str: str,
    limit: int,
    *,
    docs_table: str = "documents",
    snapshot_doc_id_max: int | None = None,
) -> list["SearchResult"]:
    """Direct search on documents.embedding (or documents_cand_<slug>.embedding).

    docs_table = "documents": production path. If snapshot_doc_id_max is set, filters id <= max.
    docs_table = "documents_cand_<slug>": candidate path. The cand table was populated with only
        the snapshot range, so snapshot_doc_id_max is ignored. Metadata comes from a JOIN with
        the production documents table.

    R2-B1 final gate: docs_table must pass the _VALID_DOCS_TABLE allowlist before SQL interpolation.
    """
    from api.search import SearchResult  # avoid circular import

    # fullmatch (not match): with match, the `$` anchor would still accept a trailing newline
    if not _VALID_DOCS_TABLE.fullmatch(docs_table):
        raise RuntimeError(f"invalid_docs_table: {docs_table!r}")

    params: dict[str, Any] = {"embedding": embedding_str, "limit": limit}

    if docs_table == "documents":
        snapshot_clause = ""
        if snapshot_doc_id_max is not None:
            snapshot_clause = " AND id <= :snapshot_doc_id_max"
            params["snapshot_doc_id_max"] = snapshot_doc_id_max
        sql = f"""
            SELECT id, title, ai_domain, ai_summary, file_format,
                   (1 - (embedding <=> cast(:embedding AS vector))) AS score,
                   left(extracted_text, 1200) AS snippet,
                   'vector_doc' AS match_reason,
                   NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title
            FROM documents
            WHERE embedding IS NOT NULL AND deleted_at IS NULL{snapshot_clause}
            ORDER BY embedding <=> cast(:embedding AS vector)
            LIMIT :limit
        """
    else:
        # candidate: docs_table only holds (doc_id, embed_input, embed_input_hash, embedding),
        # so JOIN documents for metadata
        sql = f"""
            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
                   (1 - (c.embedding <=> cast(:embedding AS vector))) AS score,
                   left(d.extracted_text, 1200) AS snippet,
                   'vector_doc' AS match_reason,
                   NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title
            FROM {docs_table} c
            JOIN documents d ON d.id = c.doc_id
            WHERE d.deleted_at IS NULL
            ORDER BY c.embedding <=> cast(:embedding AS vector)
            LIMIT :limit
        """
    result = await session.execute(text(sql), params)
    return [SearchResult(**row._mapping) for row in result]


async def _search_vector_chunks(
    session: AsyncSession,
    embedding_str: str,
    limit: int,
    *,
    chunks_table: str = "document_chunks",
    snapshot_chunk_id_max: int | None = None,
) -> list["SearchResult"]:
    """Window-partitioned search on document_chunks (or document_chunks_cand_<slug>).embedding.

    Takes the inner_k nearest chunks, then keeps at most 2 chunks per doc (ROW_NUMBER per doc_id).

    chunks_table = "document_chunks": production path. If snapshot_chunk_id_max is set, filters c.id <= max.
    chunks_table = "document_chunks_cand_<slug>": cand table (already populated with only the
        snapshot range), so the filter is ignored.

    R2-B1 final gate: chunks_table must pass the _VALID_CHUNKS_TABLE allowlist before SQL interpolation.
    """
    from api.search import SearchResult  # avoid circular import

    # fullmatch (not match): with match, the `$` anchor would still accept a trailing newline
    if not _VALID_CHUNKS_TABLE.fullmatch(chunks_table):
        raise RuntimeError(f"invalid_chunks_table: {chunks_table!r}")

    inner_k = max(limit * 5, 500)
    params: dict[str, Any] = {"embedding": embedding_str, "inner_k": inner_k, "limit": limit}

    snapshot_clause = ""
    if chunks_table == "document_chunks" and snapshot_chunk_id_max is not None:
        snapshot_clause = " AND c.id <= :snapshot_chunk_id_max"
        params["snapshot_chunk_id_max"] = snapshot_chunk_id_max

    sql = f"""
        WITH topk AS (
            SELECT c.id AS chunk_id, c.doc_id, c.chunk_index, c.section_title, c.text,
                   c.embedding <=> cast(:embedding AS vector) AS dist
            FROM {chunks_table} c
            WHERE c.embedding IS NOT NULL{snapshot_clause}
            ORDER BY c.embedding <=> cast(:embedding AS vector)
            LIMIT :inner_k
        ),
        ranked AS (
            SELECT chunk_id, doc_id, chunk_index, section_title, text, dist,
                   ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC) AS rn
            FROM topk
        )
        SELECT d.id AS id, d.title AS title, d.ai_domain AS ai_domain,
               d.ai_summary AS ai_summary, d.file_format AS file_format,
               (1 - r.dist) AS score, left(r.text, 1200) AS snippet,
               'vector_chunk' AS match_reason,
               r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title
        FROM ranked r
        JOIN documents d ON d.id = r.doc_id
        WHERE r.rn <= 2 AND d.deleted_at IS NULL
        ORDER BY r.dist
        LIMIT :limit
    """
    result = await session.execute(text(sql), params)
    return [SearchResult(**row._mapping) for row in result]


def _merge_doc_and_chunk_vectors(
    doc_results: list["SearchResult"],
    chunk_results: list["SearchResult"],
) -> list["SearchResult"]:
    """Merge doc + chunk vector results (Phase 1.2-G).

    Weights:
        - chunk score * 1.2 (segment matching is more precise)
        - doc score * 1.0 (whole-body average, complements recall)

    Dedup:
        - by doc_id
        - if a doc has chunk hits, the best chunk wins (keeps segment info + chunk_id)
        - docs with no chunk hit are added as doc-wrap results

    Returns:
        SearchResult list sorted by score, descending.
        chunk_id None marks a doc-wrap result (used when handling text-only-matched docs).

    Note: scores are rescaled in place on the input objects.
    """
    by_doc_id: dict[int, "SearchResult"] = {}

    # chunks first (apply weight + keep chunk_id)
    for c in chunk_results:
        c.score = c.score * CHUNK_VECTOR_WEIGHT
        prev = by_doc_id.get(c.id)
        if prev is None or c.score > prev.score:
            by_doc_id[c.id] = c

    # add doc matches only for docs absent from the chunk hits (chunks take precedence)
    for d in doc_results:
        d.score = d.score * DOC_VECTOR_WEIGHT
        if d.id not in by_doc_id:
            by_doc_id[d.id] = d

    # sort by score, descending
    return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)


async def search_vector_multilingual(
    session: AsyncSession,
    normalized_queries: list[dict],
    limit: int,
    *,
    embedding_backend: str | None = None,
    snapshot_doc_id_max: int | None = None,
    snapshot_chunk_id_max: int | None = None,
) -> list["SearchResult"]:
    """Phase 2.2: vector retrieval over a multilingual normalized_queries array.

    Generates an embedding for each language query in parallel (using cache hits),
    runs the existing docs + chunks hybrid retrieval for each embedding,
    and merges the results by language weight.

    ⚠️ Call conditions:
        - Must be a QueryAnalyzer cache hit (async-only rule).
        - analyzer_confidence must be high and normalized_queries must be present.
        - Called only from search.py, in line with the rule forbidding synchronous
          LLM calls on the retrieval path.

    Args:
        session: AsyncSession (managed by the caller; internally this function opens
            separate connections via sessionmaker).
        normalized_queries: [{"lang": "ko", "text": "...", "weight": 0.56}, ...]
            Weights are already normalized to sum to 1.0 by _normalize_weights.
        limit: base result count (up to limit * 4 results are returned as rerank candidates).

    Returns:
        list[SearchResult], deduplicated by doc_id.
        merged score = sum(per-query score * lang_weight).
    """
    if not normalized_queries:
        return []

    # 1. Embed each language query in parallel (baseline = AIClient.embed with cache / cand = TEI endpoint, no cache)
    _cfg_for_embed = _resolve_backend(embedding_backend)
    if _cfg_for_embed is None:
        client = AIClient()
        try:
            embed_tasks = [
                _get_query_embedding(client, q["text"]) for q in normalized_queries
            ]
            embeddings = await asyncio.gather(*embed_tasks)
        finally:
            try:
                await client.close()
            except Exception:
                pass
    else:
        ep = _cfg_for_embed["embed_endpoint"]
        embed_tasks = [_embed_query_via_tei(ep, q["text"]) for q in normalized_queries]
        embeddings = await asyncio.gather(*embed_tasks)

    # Skip queries whose embedding failed (dropped with a warning; remaining weights are not renormalized)
    per_query_plan: list[tuple[dict, str]] = []
    for q, emb in zip(normalized_queries, embeddings):
        if emb is None:
            logger.warning("multilingual embed skipped lang=%s", q.get("lang"))
            continue
        per_query_plan.append((q, str(emb)))

    if not per_query_plan:
        return []

    # 2. Resolve the multilingual dispatcher (all language queries use the same backend)
    cfg = _resolve_backend(embedding_backend)
    docs_table = cfg["docs_table"] if cfg else "documents"
    chunks_table = cfg["chunks_table"] if cfg else "document_chunks"
    logger.info(
        "[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s snapshot_chunk_id_max=%s multilingual=true",
        embedding_backend or "baseline",
        docs_table,
        chunks_table,
        snapshot_doc_id_max,
        snapshot_chunk_id_max,
    )

    # 3. For each embedding, run doc + chunk retrieval in parallel
    Session = async_sessionmaker(engine)

    async def _one_query(q_meta: dict, embedding_str: str) -> list["SearchResult"]:
        async def _docs() -> list["SearchResult"]:
            async with Session() as s:
                return await _search_vector_docs(
                    s, embedding_str, limit * 4,
                    docs_table=docs_table,
                    snapshot_doc_id_max=snapshot_doc_id_max,
                )

        async def _chunks() -> list["SearchResult"]:
            async with Session() as s:
                return await _search_vector_chunks(
                    s, embedding_str, limit * 4,
                    chunks_table=chunks_table,
                    snapshot_chunk_id_max=snapshot_chunk_id_max,
                )

        doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
        return _merge_doc_and_chunk_vectors(doc_r, chunk_r)

    per_query_results = await asyncio.gather(
        *(_one_query(q, emb_str) for q, emb_str in per_query_plan)
    )

    # 4. Weighted merge: when a doc_id repeats, sum the weighted scores
    merged: dict[int, "SearchResult"] = {}
    for (q_meta, _emb_str), results in zip(per_query_plan, per_query_results):
        weight = float(q_meta.get("weight", 1.0) or 1.0)
        for r in results:
            weighted = r.score * weight
            prev = merged.get(r.id)
            if prev is None:
                # First occurrence: reuse (mutate) the original object instead of a shallow copy
                r.score = weighted
                r.match_reason = f"ml_{q_meta.get('lang', '?')}"
                merged[r.id] = r
            else:
                # Duplicate: accumulate the score and append this lang to match_reason
                prev.score += weighted
                # merge match_reason (for readability)
                if q_meta.get("lang") and q_meta.get("lang") not in (prev.match_reason or ""):
                    prev.match_reason = (prev.match_reason or "ml") + f"+{q_meta['lang']}"

    sorted_results = sorted(merged.values(), key=lambda r: r.score, reverse=True)
    return sorted_results[: limit * 4]  # keep a generous pool as rerank candidates


def compress_chunks_to_docs(
    chunks: list["SearchResult"], limit: int
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
    """Compress chunk-level results to doc level while preserving the raw chunks.

    Fusion must be doc-based (to avoid duplicates of the same doc), but the Phase 1.3
    reranker needs chunk-level raw data, so both the compressed and raw versions are returned.

    Compression rules:
        - per doc_id, only the highest-scoring chunk is added to doc_results
        - every chunk of each doc (including the best one) is kept in chunks_by_doc (for the Phase 1.3 reranker)
        - doc_results is sorted by score, descending, and cut to the top `limit`

    Returns:
        (doc_results, chunks_by_doc)
        - doc_results: list[SearchResult], the best chunk score per doc (fusion input)
        - chunks_by_doc: dict[doc_id, list[SearchResult]], all raw chunks preserved
    """
    if not chunks:
        return [], {}

    chunks_by_doc: dict[int, list["SearchResult"]] = {}
    best_per_doc: dict[int, "SearchResult"] = {}

    for chunk in chunks:
        chunks_by_doc.setdefault(chunk.id, []).append(chunk)
        prev_best = best_per_doc.get(chunk.id)
        if prev_best is None or chunk.score > prev_best.score:
            best_per_doc[chunk.id] = chunk

    # sort by best score per doc, keep the top `limit`
    doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True)
    return doc_results[:limit], chunks_by_doc
|