Files
hyungi_document_server/app/services/search/retrieval_service.py
T
hyungi 8cdfe6006d feat(search): cloud-egress 게이트를 단건 문서 fetch 로 확장
GET /api/documents/{id} 가 egress=cloud 토큰일 때 search 와 동일한
cloud-eligibility 게이트(egress allowlist 갭2 + license 제한 B-4)를 통과한
문서만 반환. id 직접 fetch 로 비공개/인프라/개인/restricted 문서를 우회
열람하는 경로 차단 — 부적격은 404(존재 비노출). local 토큰=무회귀.

술어는 retrieval_service.cloud_eligible_doc_sql 로 단일화(_axis_sql
cloud_egress + _license_sql 합성) → search retrieval 과 byte-동일 게이트
공유, 경로별 드리프트 방지. MCP fetch_document 툴의 서버사이드 강제.

e2e: cloud 토큰 적격 Eng 200 / 인프라알림·리디북스memo·개인노트 404,
local 토큰 전부 200(무회귀).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 21:52:41 +00:00

854 lines
37 KiB
Python

"""검색 후보 수집 서비스 (Phase 1.2 + Phase 2.2 multilingual).
text(documents FTS + trigram) + vector(documents.embedding + chunks.embedding hybrid) 후보를
SearchResult 리스트로 반환.
Phase 1.1a: search.py의 _search_text/_search_vector를 이전 (ILIKE 그대로).
Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. ILIKE 풀 스캔 제거.
Phase 1.2-C: vector retrieval을 document_chunks 테이블로 전환 → catastrophic recall 손실.
Phase 1.2-G: doc + chunks hybrid retrieval 보강.
- documents.embedding (recall robust, 자연어 매칭 강함)
- document_chunks.embedding (precision, segment 매칭)
- 두 SQL 동시 호출 후 doc_id 기준 merge (chunk 가중치 1.2, doc 1.0)
Phase 2.2 추가:
- _QUERY_EMBED_CACHE: bge-m3 query embedding 캐시 (모듈 레벨 LRU, TTL 24h)
- search_vector_multilingual: normalized_queries (lang별 쿼리) 배열 지원
QueryAnalyzer cache hit + analyzer_tier >= merge 일 때만 호출.
- crosslingual_ko_en NDCG 0.53 → 0.65+ 목표
"""
from __future__ import annotations
import asyncio
import hashlib
import re
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ai.client import AIClient
from core.database import engine
from core.utils import setup_logger
if TYPE_CHECKING:
from api.search import SearchResult
logger = setup_logger("retrieval_service")
# Hybrid merge 가중치 (1.2-G)
DOC_VECTOR_WEIGHT = 1.0
CHUNK_VECTOR_WEIGHT = 1.2
# ─── Phase 2.2: Query embedding cache ───────────────────
# bge-m3 호출 비용 절반 감소 (동일 normalized_query 재호출 방지)
_QUERY_EMBED_CACHE: dict[str, dict[str, Any]] = {}
QUERY_EMBED_TTL = 86400 # 24h
QUERY_EMBED_MAXSIZE = 500
# ─── Phase 2A Diagnose dispatcher (R2-2 + R2-B1) ──────────────
# server-side allowlist map. query parameter 가 raw table name 받지 않음.
CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
"baseline": None,
# Phase 2A 임베딩 후보(me5_large_inst·snowflake_l_v2·qwen06·qwen4·qwen4m) 전량 no-go
# 종결(2026-06-12, 후보 전부 -0.03~-0.04) → cand 슬러그·테이블 제거 (R13, 마이그 360
# DROP). read-path 슬러그를 먼저 빼야 embedding_backend=cand_X /search 가 dropped 테이블을
# 읽어 500 나지 않는다. baseline(production)만 잔존.
}
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
# 문구 변경 = 저장=조회 불변식 위반과 동급. 쿼리 측 전용, 문서 적재는 plain).
QWEN3_QUERY_INSTRUCT = (
"Instruct: Given a web search query, retrieve relevant passages that answer the query"
"\nQuery: "
)
# ─── 안전 자료실 C-1: 분류 축 명시 필터 (3 leg 동등, byte 불변) ───────────────
# 미지정(active=False) 시 모든 SQL 절이 빈 문자열 → 기존 SQL byte 불변(run_eval 회귀 0).
# year 는 published_date NULL fallback created_at (freshness 와 동일 COALESCE 사상).
@dataclass
class AxisFilter:
material_types: list[str] | None = None # CSV → list, material_type = ANY
jurisdiction: str | None = None
year_from: int | None = None
year_to: int | None = None
domain_buckets: list[str] | None = None # 377: domain_bucket = ANY (도메인 스코프)
exclude_buckets: list[str] | None = None # 377: domain_bucket <> ALL (예: News 제외)
cloud_egress: bool = False # 갭2: 클라우드 소비자 cloud-eligibility allowlist 강제(토큰 claim 유래)
def active(self) -> bool:
return bool(self.material_types or self.jurisdiction
or self.year_from is not None or self.year_to is not None
or self.domain_buckets or self.exclude_buckets
or self.cloud_egress)
def _axis_sql(alias: str, af: "AxisFilter | None", params: dict) -> str:
"""alias 기준 axis 필터 SQL — 미지정 시 '' (byte 불변). 반환 형태 ' AND ...'.
alias='' 이면 컬럼 직접 참조(단일 테이블 FROM documents 경로). 파라미터는 af_ prefix
로 호출측 기존 bind 와 충돌 방지.
"""
if af is None or not af.active():
return ""
p = (alias + ".") if alias else ""
cl: list[str] = []
if af.material_types:
cl.append(f"{p}material_type = ANY(:af_mt)")
params["af_mt"] = af.material_types
if af.jurisdiction:
cl.append(f"{p}jurisdiction = :af_jur")
params["af_jur"] = af.jurisdiction
if af.year_from is not None:
cl.append(f"COALESCE({p}published_date, {p}created_at::date) >= make_date(:af_yf, 1, 1)")
params["af_yf"] = af.year_from
if af.year_to is not None:
cl.append(f"COALESCE({p}published_date, {p}created_at::date) <= make_date(:af_yt, 12, 31)")
params["af_yt"] = af.year_to
if af.domain_buckets:
cl.append(f"{p}domain_bucket = ANY(:af_db)")
params["af_db"] = af.domain_buckets
if af.exclude_buckets:
cl.append(f"{p}domain_bucket <> ALL(:af_xdb)")
params["af_xdb"] = af.exclude_buckets
if af.cloud_egress:
# 갭2 클라우드 egress allowlist(default-deny). restricted 는 _license_sql 별도 차단.
cl.append(
f"({p}data_origin = 'external' OR ("
f"{p}data_origin = 'work' "
f"AND {p}domain_bucket IN ('Engineering','Safety','Law') "
f"AND ({p}source_channel IS NULL OR {p}source_channel::text NOT IN ('voice','chat','memo')) "
f"AND {p}category::text IS DISTINCT FROM 'memo' "
f"AND ({p}user_note IS NULL OR {p}user_note = '')))"
)
return " AND " + " AND ".join(cl)
# ─── 안전 자료실 B-4: licensed_restricted 단일 술어 (a안 U-2① — 항상 적용) ──────
def _license_sql(alias: str) -> str:
"""licensed_restricted(extract_meta.license.restricted=true) 문서를 retrieval 에서 제외.
a안: 색인은 허용하되, 구매 전자책/유료자료의 verbatim span 이 RAG 증거·digest 발행에
들어가는 경로를 구조적으로 차단. 이 단일 술어를 모든 retrieval leg + digest loader 가
공유 — 경로별 가드 누락 방지([[feedback_structural_integrity_over_path_discipline]]).
개인 파일 열람(GET /documents/{id}?download)은 a안상 허용이라 미적용.
axis 필터(조건부)와 달리 항상 적용. restricted 부재/false = COALESCE 로 미제외 →
기존 코퍼스(restricted=true 0건)에서 결과 불변. 반환 ' AND ...' (alias='' = 컬럼 직접).
술어 정의 = license_filter.restricted_exclude_sql 공유(digest/briefing/study 풀이와 단일 source).
"""
from services.search.license_filter import restricted_exclude_sql
return " AND " + restricted_exclude_sql(alias)
def cloud_eligible_doc_sql(alias: str = "") -> str:
"""단일 문서가 cloud 소비자(예: Claude/MCP)에게 노출 가능한가 = search retrieval 과
동일한 egress allowlist(갭2) + license 제한(B-4) 결합 술어. fetch_document(cloud) 가
search 와 byte-동일 게이트를 공유하도록 단일 source([[feedback_structural_integrity_over_path_discipline]]).
cloud_egress·license leg 모두 bind 파라미터 없는 리터럴 술어라 호출측 추가 params 불요.
주의: _license_sql 은 소유자 단건 다운로드엔 미적용(a안)이지만, cloud 노출은 구매 전자책
verbatim 누출을 막아야 하므로 여기선 항상 적용 = search 와 동일(local 토큰은 이 게이트 미발동).
반환 ' AND (egress allowlist) AND (license)' (alias='' = 컬럼 직접 참조). default-deny."""
return _axis_sql(alias, AxisFilter(cloud_egress=True), {}) + _license_sql(alias)
# 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist.
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
# corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point).
# baseline retrieval 은 이 뷰만 본다 → in_corpus=false(비활성 hier leaf 등) 자동 제외.
# corpus_chunks_{prehier,hier_sim_raw,hier_sim_clean} = Hier-Replace-Diagnose-1 측정 전용 뷰.
_VALID_CHUNKS_TABLE = re.compile(
r"^(document_chunks|corpus_chunks|corpus_chunks_(?:prehier|hier_sim_raw|hier_sim_clean)"
r"|document_chunks_cand_[a-z0-9_]+)$"
)
# Hier-Replace-Diagnose-1: corpus_variant slug → chunks view (baseline embedding path 한정).
# vector chunk leg 만 영향 (doc-level + fts/trgm 는 documents 테이블 = 변종 무관).
CORPUS_VARIANT_MAP: dict[str, str] = {
"prehier": "corpus_chunks_prehier",
"hier_sim_raw": "corpus_chunks_hier_sim_raw",
"hier_sim_clean": "corpus_chunks_hier_sim_clean",
}
def _resolve_corpus_variant(slug: str | None) -> str | None:
"""corpus_variant slug → 측정 뷰 명 | None(production corpus_chunks).
Raises ValueError on unknown slug (caller → HTTP 400)."""
if slug is None:
return None
if slug not in CORPUS_VARIANT_MAP:
raise ValueError(f"unknown_corpus_variant: {slug!r}")
return CORPUS_VARIANT_MAP[slug]
async def _apply_exact_knn(session: AsyncSession) -> None:
"""eval 전용: 현 트랜잭션에 ivfflat 근사 비활성 (seqscan exact KNN).
prehier(legacy, ivfflat 보유) vs hier_sim(미색인) 의 index 변수 제거 = 청킹만 분리.
SET LOCAL = 트랜잭션 scope, 비영구. production path 는 호출 안 함."""
await session.execute(text("SET LOCAL enable_indexscan = off"))
await session.execute(text("SET LOCAL enable_bitmapscan = off"))
def _resolve_backend(slug: str | None) -> dict[str, str] | None:
"""slug → (docs_table, chunks_table, embed_endpoint) | None (baseline).
Raises ValueError on unknown slug (caller 가 HTTP 400 으로 translate).
"""
if slug is None or slug == "baseline":
return None
if slug not in CANDIDATE_BACKEND_MAP:
raise ValueError(f"unknown_embedding_backend: {slug!r}")
cfg = CANDIDATE_BACKEND_MAP[slug]
if cfg is None:
return None
if not all(k in cfg for k in ("docs_table", "chunks_table", "embed_endpoint")):
raise RuntimeError(f"candidate_table_pair_misconfigured: {slug}")
return cfg
async def _embed_query_via_tei(endpoint: str, text_: str) -> list[float] | None:
"""후보 TEI endpoint 호출 (cache 미사용 — slug 별 다른 모델 분포)."""
if not text_:
return None
import httpx
try:
async with httpx.AsyncClient(timeout=30.0) as c:
r = await c.post(endpoint, json={"inputs": [text_], "truncate": True})
r.raise_for_status()
data = r.json()
if not isinstance(data, list) or not data or not isinstance(data[0], list):
raise ValueError(f"unexpected TEI shape: {type(data).__name__}")
return data[0]
except Exception as exc:
logger.warning("candidate TEI embed failed endpoint=%s err=%r", endpoint, exc)
return None
async def _embed_query_via_ollama(cfg: dict, text_: str) -> list[float] | None:
"""Phase 2A 후보 쿼리 임베딩 — Ollama /api/embed + 비대칭 instruct prefix.
쿼리 측 전용: QWEN3_QUERY_INSTRUCT 를 선두에 붙인다 (문서 적재 = plain).
embed_dimensions 지정(qwen4m) 시 Ollama dimensions 옵션 = MRL truncate+재정규화
(G-1 fixture: 1024 출력 L2=1.0 실측). cache 미사용 — slug 별 분포 상이.
"""
if not text_:
return None
import httpx
body: dict = {"model": cfg["embed_model"], "input": [QWEN3_QUERY_INSTRUCT + text_]}
if cfg.get("embed_dimensions"):
body["dimensions"] = cfg["embed_dimensions"]
try:
async with httpx.AsyncClient(timeout=60.0) as c:
r = await c.post(cfg["embed_endpoint"], json=body)
r.raise_for_status()
embs = r.json().get("embeddings")
if not isinstance(embs, list) or not embs or not isinstance(embs[0], list):
raise ValueError("unexpected /api/embed shape")
return embs[0]
except Exception as exc:
logger.warning(
"candidate ollama embed failed model=%s err=%r", cfg.get("embed_model"), exc
)
return None
def _query_embed_key(text_: str) -> str:
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
async def _get_query_embedding(
client: AIClient, text_: str
) -> list[float] | None:
"""Query embedding with in-memory cache.
동일 텍스트 재호출 시 bge-m3 skip. fixed query 회귀 시 vector_ms 대폭 감소.
"""
if not text_:
return None
key = _query_embed_key(text_)
entry = _QUERY_EMBED_CACHE.get(key)
if entry and time.time() - entry["ts"] < QUERY_EMBED_TTL:
return entry["emb"]
try:
emb = await client.embed(text_)
except Exception as exc:
logger.warning("query embed failed text=%r err=%r", text_[:40], exc)
return None
if len(_QUERY_EMBED_CACHE) >= QUERY_EMBED_MAXSIZE:
try:
oldest = next(iter(_QUERY_EMBED_CACHE))
_QUERY_EMBED_CACHE.pop(oldest, None)
except StopIteration:
pass
_QUERY_EMBED_CACHE[key] = {"emb": emb, "ts": time.time()}
return emb
def query_embed_cache_stats() -> dict[str, int]:
return {"size": len(_QUERY_EMBED_CACHE), "maxsize": QUERY_EMBED_MAXSIZE}
async def search_text(
session: AsyncSession, query: str, limit: int, *, axis: "AxisFilter | None" = None
) -> list["SearchResult"]:
"""FTS + trigram 필드별 가중치 검색 (Phase 1.2-B UNION 분해).
Phase 1.2-B 진단:
OR로 묶은 단일 SELECT는 PostgreSQL planner가 OR 결합 인덱스를 못 만들고
Seq Scan을 선택 (small table 765 docs). EXPLAIN으로 측정 시 525ms.
→ CTE + UNION으로 분해하면 각 branch가 자기 인덱스 활용 → 26ms (95% 감소).
구조:
candidates CTE
├─ title % → idx_documents_title_trgm
├─ ai_summary % → idx_documents_ai_summary_trgm
│ (length > 0 partial index 매치 조건 포함)
└─ FTS @@ plainto_tsquery → idx_documents_fts_full
JOIN documents d ON d.id = c.id
ORDER BY 5컬럼 similarity 가중 합산 + ts_rank * 2.0
가중치: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0
threshold:
pg_trgm.similarity_threshold default = 0.3
→ multi-token 한국어 뉴스 쿼리(예: "이란 미국 전쟁 글로벌 반응")에서
candidates를 못 모음 → recall 감소 (0.788 → 0.750)
→ set_limit(0.15)으로 낮춰 recall 회복. precision은 ORDER BY similarity 합산이 보정.
"""
from api.search import SearchResult # 순환 import 회피
# trigram threshold를 0.15로 낮춰 multi-token query recall 회복
# SQLAlchemy async session 내 두 execute는 같은 connection 사용
await session.execute(text("SELECT set_limit(0.15)"))
_params: dict[str, Any] = {"q": query, "limit": limit}
# license(항상) + axis(조건부). license 가 항상 ' AND ...' 이라 WHERE 는 늘 존재.
_where = _license_sql("d") + _axis_sql("d", axis, _params)
result = await session.execute(
text(f"""
WITH candidates AS (
-- title trigram (idx_documents_title_trgm)
SELECT id FROM documents
WHERE deleted_at IS NULL AND title % :q
UNION
-- ai_summary trigram (idx_documents_ai_summary_trgm 부분 인덱스 매치)
SELECT id FROM documents
WHERE deleted_at IS NULL
AND ai_summary IS NOT NULL
AND length(ai_summary) > 0
AND ai_summary % :q
UNION
-- FTS 통합 인덱스 (idx_documents_fts_full)
SELECT id FROM documents
WHERE deleted_at IS NULL
AND to_tsvector('simple',
coalesce(title, '') || ' ' ||
coalesce(ai_tags::text, '') || ' ' ||
coalesce(ai_summary, '') || ' ' ||
coalesce(user_note, '') || ' ' ||
coalesce(extracted_text, '')
) @@ plainto_tsquery('simple', :q)
)
SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
left(d.extracted_text, 1200) AS snippet,
(
-- 컬럼별 trigram similarity 가중 합산
similarity(coalesce(d.title, ''), :q) * 3.0
+ similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5
+ similarity(coalesce(d.user_note, ''), :q) * 2.0
+ similarity(coalesce(d.ai_summary, ''), :q) * 1.5
+ similarity(left(coalesce(d.extracted_text, ''), 2000), :q) * 1.0
-- FTS 보너스 (idx_documents_fts_full 활용)
+ coalesce(ts_rank(
to_tsvector('simple',
coalesce(d.title, '') || ' ' ||
coalesce(d.ai_tags::text, '') || ' ' ||
coalesce(d.ai_summary, '') || ' ' ||
coalesce(d.user_note, '') || ' ' ||
left(coalesce(d.extracted_text, ''), 2000)
),
plainto_tsquery('simple', :q)
), 0) * 2.0
) AS score,
-- match_reason: similarity 가장 큰 컬럼 또는 FTS
CASE
WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title'
WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note'
WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary'
WHEN similarity(left(coalesce(d.extracted_text, ''), 2000), :q) >= 0.3 THEN 'content'
ELSE 'fts'
END AS match_reason,
d.material_type, d.jurisdiction, d.published_date
FROM documents d
JOIN candidates c ON d.id = c.id
WHERE{_where[4:]}
ORDER BY score DESC
LIMIT :limit
"""),
_params,
)
return [SearchResult(**row._mapping) for row in result]
async def search_vector(
session: AsyncSession,
query: str,
limit: int,
*,
embedding_backend: str | None = None,
snapshot_doc_id_max: int | None = None,
snapshot_chunk_id_max: int | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
axis: "AxisFilter | None" = None,
) -> list["SearchResult"]:
"""Hybrid 벡터 검색 — doc + chunks 동시 retrieval (Phase 1.2-G).
Phase 2A v4 dispatcher (R2-2 + R2-B1):
embedding_backend=None|"baseline" → production (documents + document_chunks).
snapshot_*_id_max 지정 시 baseline 도 동일 filter (rebaseline measurement).
embedding_backend=cand_<slug> → CANDIDATE_BACKEND_MAP 에서 페어 resolve.
cand 테이블 자체가 snapshot 범위로 INSERT → snapshot filter 무시 (dispatch log 만 박제).
Hier-Replace-Diagnose-1 (baseline embedding path 한정, eval 전용):
corpus_variant=prehier|hier_sim_raw|hier_sim_clean → chunk leg 만 측정 뷰로 교체
(doc-level + fts/trgm 는 documents = 변종 무관). embedding_backend cand 와 동시 X.
exact_knn=True → vector leg 에 SET LOCAL enable_indexscan/bitmapscan=off
(ivfflat 근사 제거 = 청킹 전략만 분리). production path 절대 미적용.
데이터 흐름:
1. query embedding 1번 (baseline=bge-m3 cache / cand=TEI endpoint no-cache)
2. asyncio.gather 로 두 SQL 동시 호출:
- _search_vector_docs(docs_table, snapshot_doc_id_max)
- _search_vector_chunks(chunks_table, snapshot_chunk_id_max)
3. _merge_doc_and_chunk_vectors 가중치 + dedup (chunk 1.2 / doc 1.0).
"""
cfg = _resolve_backend(embedding_backend)
variant_table = _resolve_corpus_variant(corpus_variant)
if variant_table is not None and cfg is not None:
raise ValueError("corpus_variant_incompatible_with_embedding_backend")
if cfg is None:
docs_table = "documents"
# Hier-Decomp-1 c2: baseline chunk 검색은 corpus_chunks 뷰(in_corpus=true) 경유.
# Hier-Replace-Diagnose-1: corpus_variant 지정 시 측정 뷰로 교체 (chunk leg 한정).
chunks_table = variant_table or "corpus_chunks"
client = AIClient()
try:
query_embedding = await _get_query_embedding(client, query)
finally:
try:
await client.close()
except Exception:
pass
else:
docs_table = cfg["docs_table"]
chunks_table = cfg["chunks_table"]
if cfg.get("embed_kind") == "ollama":
query_embedding = await _embed_query_via_ollama(cfg, query)
else:
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
logger.info(
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
"snapshot_chunk_id_max=%s corpus_variant=%s exact_knn=%s",
embedding_backend or "baseline",
docs_table,
chunks_table,
snapshot_doc_id_max,
snapshot_chunk_id_max,
corpus_variant or "none",
exact_knn,
)
if query_embedding is None:
return []
embedding_str = str(query_embedding)
Session = async_sessionmaker(engine)
async def _docs_call() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_docs(
s, embedding_str, limit * 4,
docs_table=docs_table,
snapshot_doc_id_max=snapshot_doc_id_max,
exact_knn=exact_knn,
axis=axis,
)
async def _chunks_call() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_chunks(
s, embedding_str, limit * 4,
chunks_table=chunks_table,
snapshot_chunk_id_max=snapshot_chunk_id_max,
exact_knn=exact_knn,
axis=axis,
)
doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call())
return _merge_doc_and_chunk_vectors(doc_results, chunk_results)
async def _search_vector_docs(
session: AsyncSession,
embedding_str: str,
limit: int,
*,
docs_table: str = "documents",
snapshot_doc_id_max: int | None = None,
exact_knn: bool = False,
axis: "AxisFilter | None" = None,
) -> list["SearchResult"]:
"""documents (또는 documents_cand_<slug>).embedding 직접 검색.
docs_table = "documents": production path. snapshot_doc_id_max 지정 시 id <= max filter.
docs_table = "documents_cand_<slug>": 후보 path. cand 테이블이 이미 snapshot 범위로 INSERT됨 →
snapshot_doc_id_max 무시. metadata 는 production documents 와 JOIN.
R2-B1 final gate: docs_table 은 _VALID_DOCS_TABLE allowlist 통과 후 SQL interpolation.
"""
from api.search import SearchResult # 순환 import 회피
if not _VALID_DOCS_TABLE.match(docs_table):
raise RuntimeError(f"invalid_docs_table: {docs_table!r}")
if exact_knn:
await _apply_exact_knn(session)
params: dict[str, Any] = {"embedding": embedding_str, "limit": limit}
if docs_table == "documents":
snapshot_clause = ""
if snapshot_doc_id_max is not None:
snapshot_clause = " AND id <= :snapshot_doc_id_max"
params["snapshot_doc_id_max"] = snapshot_doc_id_max
axis_clause = _axis_sql("", axis, params) # alias 없음 (단일 FROM documents)
license_clause = _license_sql("") # B-4: restricted 항상 제외
sql = f"""
SELECT id, title, ai_domain, ai_summary, file_format,
(1 - (embedding <=> cast(:embedding AS vector))) AS score,
left(extracted_text, 1200) AS snippet,
'vector_doc' AS match_reason,
NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title,
material_type, jurisdiction, published_date
FROM documents
WHERE embedding IS NOT NULL AND deleted_at IS NULL{snapshot_clause}{axis_clause}{license_clause}
ORDER BY embedding <=> cast(:embedding AS vector)
LIMIT :limit
"""
else:
# candidate: docs_table 은 (doc_id, embed_input, embed_input_hash, embedding) 만 보유 → JOIN documents
axis_clause = _axis_sql("d", axis, params)
license_clause = _license_sql("d") # B-4: restricted 항상 제외
sql = f"""
SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
(1 - (c.embedding <=> cast(:embedding AS vector))) AS score,
left(d.extracted_text, 1200) AS snippet,
'vector_doc' AS match_reason,
NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title,
d.material_type, d.jurisdiction, d.published_date
FROM {docs_table} c
JOIN documents d ON d.id = c.doc_id
WHERE d.deleted_at IS NULL{axis_clause}{license_clause}
ORDER BY c.embedding <=> cast(:embedding AS vector)
LIMIT :limit
"""
result = await session.execute(text(sql), params)
return [SearchResult(**row._mapping) for row in result]
async def _search_vector_chunks(
session: AsyncSession,
embedding_str: str,
limit: int,
*,
chunks_table: str = "document_chunks",
snapshot_chunk_id_max: int | None = None,
exact_knn: bool = False,
axis: "AxisFilter | None" = None,
) -> list["SearchResult"]:
"""document_chunks (또는 document_chunks_cand_<slug>).embedding window partition.
chunks_table = "document_chunks": production path. snapshot_chunk_id_max 지정 시 c.id <= max filter.
chunks_table = "document_chunks_cand_<slug>": cand 테이블 (이미 snapshot 범위로 INSERT) → filter 무시.
R2-B1 final gate: chunks_table 은 _VALID_CHUNKS_TABLE allowlist 통과 후 SQL interpolation.
"""
from api.search import SearchResult # 순환 import 회피
if not _VALID_CHUNKS_TABLE.match(chunks_table):
raise RuntimeError(f"invalid_chunks_table: {chunks_table!r}")
if exact_knn:
await _apply_exact_knn(session)
inner_k = max(limit * 5, 500)
params: dict[str, Any] = {"embedding": embedding_str, "inner_k": inner_k, "limit": limit}
snapshot_clause = ""
if (chunks_table in ("document_chunks", "corpus_chunks")
or chunks_table in CORPUS_VARIANT_MAP.values()) and snapshot_chunk_id_max is not None:
snapshot_clause = " AND c.id <= :snapshot_chunk_id_max"
params["snapshot_chunk_id_max"] = snapshot_chunk_id_max
# C-1: axis 필터는 inner topk 에 JOIN (R6 결정 — outer post-filter 면 ANN top-:inner_k
# 후보를 뽑은 뒤 거르므로 좁은 필터(GB 법령 등)에서 후보 붕괴). 미지정 시 JOIN 없음 = byte 불변.
if axis and axis.active():
chunk_join = " JOIN documents df ON df.id = c.doc_id"
chunk_axis = _axis_sql("df", axis, params)
else:
chunk_join = ""
chunk_axis = ""
# B-4: restricted 제외 — outer 가 documents d 를 항상 JOIN 하므로 post-rank 위치.
# restricted 는 소수(구매자료)라 inner topk 후 제외해도 candidate collapse 없음(axis 와 상이).
license_clause = _license_sql("d")
sql = f"""
WITH topk AS (
SELECT c.id AS chunk_id, c.doc_id, c.chunk_index, c.section_title, c.text,
c.embedding <=> cast(:embedding AS vector) AS dist
FROM {chunks_table} c{chunk_join}
WHERE c.embedding IS NOT NULL{snapshot_clause}{chunk_axis}
ORDER BY c.embedding <=> cast(:embedding AS vector)
LIMIT :inner_k
),
ranked AS (
SELECT chunk_id, doc_id, chunk_index, section_title, text, dist,
ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC) AS rn
FROM topk
)
SELECT d.id AS id, d.title AS title, d.ai_domain AS ai_domain,
d.ai_summary AS ai_summary, d.file_format AS file_format,
(1 - r.dist) AS score, left(r.text, 1200) AS snippet,
'vector_chunk' AS match_reason,
r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title,
d.material_type AS material_type, d.jurisdiction AS jurisdiction,
d.published_date AS published_date
FROM ranked r
JOIN documents d ON d.id = r.doc_id
WHERE r.rn <= 2 AND d.deleted_at IS NULL{license_clause}
ORDER BY r.dist
LIMIT :limit
"""
result = await session.execute(text(sql), params)
return [SearchResult(**row._mapping) for row in result]
def _merge_doc_and_chunk_vectors(
doc_results: list["SearchResult"],
chunk_results: list["SearchResult"],
) -> list["SearchResult"]:
"""doc + chunks vector 결과 merge (Phase 1.2-G).
가중치:
- chunk score * 1.2 (segment 매칭이 더 정확)
- doc score * 1.0 (전체 본문 평균, recall 보완)
Dedup:
- doc_id 기준
- chunks가 있으면 chunks 우선 (segment 정보 + chunk_id 보존)
- chunks에 없는 doc은 doc-wrap으로 추가
Returns:
score 내림차순 정렬된 SearchResult 리스트.
chunk_id가 None이면 doc-wrap 결과(text-only 매치 doc 처리에 사용).
"""
by_doc_id: dict[int, "SearchResult"] = {}
# chunks 먼저 (가중치 적용 + chunk_id 보존)
for c in chunk_results:
c.score = c.score * CHUNK_VECTOR_WEIGHT
prev = by_doc_id.get(c.id)
if prev is None or c.score > prev.score:
by_doc_id[c.id] = c
# doc 매치는 chunks에 없는 doc만 추가 (chunks 우선 원칙)
for d in doc_results:
d.score = d.score * DOC_VECTOR_WEIGHT
if d.id not in by_doc_id:
by_doc_id[d.id] = d
# score 내림차순 정렬
return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)
async def search_vector_multilingual(
session: AsyncSession,
normalized_queries: list[dict],
limit: int,
*,
embedding_backend: str | None = None,
snapshot_doc_id_max: int | None = None,
snapshot_chunk_id_max: int | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
) -> list["SearchResult"]:
"""Phase 2.2 — 다국어 normalized_queries 배열로 vector retrieval.
각 language query에 대해 embedding을 병렬 생성(cache hit 활용),
각 embedding에 대해 기존 docs+chunks hybrid 호출,
결과를 weight 기반으로 merge.
⚠️ 호출 조건:
- QueryAnalyzer cache hit 이어야 함 (async-only 룰)
- analyzer_confidence 높고 normalized_queries 존재해야 함
- search.py에서만 호출. retrieval 경로 동기 LLM 호출 금지 룰 준수.
Args:
session: AsyncSession (호출자 관리, 본 함수 내부는 sessionmaker로 별도 연결 사용)
normalized_queries: [{"lang": "ko", "text": "...", "weight": 0.56}, ...]
weight는 _normalize_weights로 이미 합=1.0 정규화된 상태.
limit: 상위 결과 개수
Returns:
list[SearchResult] — doc_id 중복 제거. merged score = sum(per-query score * lang_weight).
"""
if not normalized_queries:
return []
# 1. 각 lang별 embedding 병렬 (baseline=AIClient.embed cache / cand=TEI endpoint no-cache)
_cfg_for_embed = _resolve_backend(embedding_backend)
if _cfg_for_embed is None:
client = AIClient()
try:
embed_tasks = [
_get_query_embedding(client, q["text"]) for q in normalized_queries
]
embeddings = await asyncio.gather(*embed_tasks)
finally:
try:
await client.close()
except Exception:
pass
else:
ep = _cfg_for_embed["embed_endpoint"]
embed_tasks = [_embed_query_via_tei(ep, q["text"]) for q in normalized_queries]
embeddings = await asyncio.gather(*embed_tasks)
# embedding 실패한 query는 skip (weight 재정규화 없이 조용히 drop)
per_query_plan: list[tuple[dict, str]] = []
for q, emb in zip(normalized_queries, embeddings):
if emb is None:
logger.warning("multilingual embed skipped lang=%s", q.get("lang"))
continue
per_query_plan.append((q, str(emb)))
if not per_query_plan:
return []
# 2. multilingual dispatcher resolve (모든 lang query 가 동일 backend 사용)
cfg = _resolve_backend(embedding_backend)
variant_table = _resolve_corpus_variant(corpus_variant)
if variant_table is not None and cfg is not None:
raise ValueError("corpus_variant_incompatible_with_embedding_backend")
docs_table = cfg["docs_table"] if cfg else "documents"
chunks_table = cfg["chunks_table"] if cfg else (variant_table or "document_chunks")
logger.info(
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
"snapshot_chunk_id_max=%s corpus_variant=%s exact_knn=%s multilingual=true",
embedding_backend or "baseline",
docs_table,
chunks_table,
snapshot_doc_id_max,
snapshot_chunk_id_max,
corpus_variant or "none",
exact_knn,
)
# 3. 각 embedding에 대해 doc + chunks 병렬 retrieval
Session = async_sessionmaker(engine)
async def _one_query(q_meta: dict, embedding_str: str) -> list["SearchResult"]:
async def _docs() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_docs(
s, embedding_str, limit * 4,
docs_table=docs_table,
snapshot_doc_id_max=snapshot_doc_id_max,
exact_knn=exact_knn,
)
async def _chunks() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_chunks(
s, embedding_str, limit * 4,
chunks_table=chunks_table,
snapshot_chunk_id_max=snapshot_chunk_id_max,
exact_knn=exact_knn,
)
doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
return _merge_doc_and_chunk_vectors(doc_r, chunk_r)
per_query_results = await asyncio.gather(
*(_one_query(q, emb_str) for q, emb_str in per_query_plan)
)
# 3. weight 기반 merge — doc_id 중복 시 weighted score 합산
merged: dict[int, "SearchResult"] = {}
for (q_meta, _emb_str), results in zip(per_query_plan, per_query_results):
weight = float(q_meta.get("weight", 1.0) or 1.0)
for r in results:
weighted = r.score * weight
prev = merged.get(r.id)
if prev is None:
# 첫 방문: 원본을 shallow copy 대신 직접 wrap
r.score = weighted
r.match_reason = f"ml_{q_meta.get('lang', '?')}"
merged[r.id] = r
else:
# 중복: score 누적, 가장 높은 weight 소스로 match_reason 표시
prev.score += weighted
# match_reason 병합 (가독성)
if q_meta.get("lang") and q_meta.get("lang") not in (prev.match_reason or ""):
prev.match_reason = (prev.match_reason or "ml") + f"+{q_meta['lang']}"
sorted_results = sorted(merged.values(), key=lambda r: r.score, reverse=True)
return sorted_results[: limit * 4] # rerank 후보로 넉넉히
def compress_chunks_to_docs(
chunks: list["SearchResult"], limit: int
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
"""chunk-level 결과를 doc-level로 압축하면서 raw chunks를 보존.
fusion은 doc 기준이어야 하지만(같은 doc 중복 방지), Phase 1.3 reranker는
chunk 기준 raw 데이터가 필요함. 따라서 압축본과 raw를 동시 반환.
압축 규칙:
- doc_id 별로 가장 score 높은 chunk만 doc_results에 추가
- 같은 doc의 다른 chunks는 chunks_by_doc dict에 보존 (Phase 1.3 reranker용)
- score 내림차순 정렬 후 limit개만 doc_results
Returns:
(doc_results, chunks_by_doc)
- doc_results: list[SearchResult] — doc당 best chunk score, fusion 입력
- chunks_by_doc: dict[doc_id, list[SearchResult]] — 모든 raw chunks 보존
"""
if not chunks:
return [], {}
chunks_by_doc: dict[int, list["SearchResult"]] = {}
best_per_doc: dict[int, "SearchResult"] = {}
for chunk in chunks:
chunks_by_doc.setdefault(chunk.id, []).append(chunk)
prev_best = best_per_doc.get(chunk.id)
if prev_best is None or chunk.score > prev_best.score:
best_per_doc[chunk.id] = chunk
# doc 단위 best score 정렬, 상위 limit개
doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True)
return doc_results[:limit], chunks_by_doc