feat(search): Phase 1.2-G hybrid retrieval (doc + chunks)

Phase 1.2-C 평가셋: chunks-only Recall 0.788 → 0.660 catastrophic.
ivfflat probes 1 → 10 → 20 진단 결과 잔여 차이는 chunks vs docs embedding의
본질적 차이 (segment 매칭 vs 전체 본문 평균).

해결: doc + chunks hybrid retrieval (정석).

신규 구조:
- search_vector(): 두 SQL을 asyncio.gather로 병렬 호출
- _search_vector_docs(): documents.embedding cosine top N (recall robust)
- _search_vector_chunks(): document_chunks.embedding window partition
  (doc당 top 2 chunks, ivfflat top inner_k 후 ROW_NUMBER PARTITION)
- _merge_doc_and_chunk_vectors(): 가중치 + dedup
  - chunk score * 1.2 (segment 매칭 더 정확)
  - doc score * 1.0 (recall 보완)
  - doc_id 기준 dedup, chunks 우선

데이터 흐름:
  1. query embedding 1번 (bge-m3)
  2. asyncio.gather([_docs_call(), _chunks_call()])
  3. _merge_doc_and_chunk_vectors → list[SearchResult]
  4. compress_chunks_to_docs (그대로 사용)
  5. fusion (그대로)
  6. (Phase 1.3) chunks_by_doc 회수 → reranker

검증 게이트 (회복 목표):
- Recall@10 ≥ 0.75 (baseline 0.788 - 0.04 이내)
- unique_docs per query ≥ 8
- natural_language_ko Recall ≥ 0.65
- latency p95 < 250ms
This commit is contained in:
Hyungi Ahn
2026-04-08 13:02:23 +09:00
parent 2cfe4b126a
commit 2ca67dacea

View File

@@ -1,26 +1,37 @@
"""검색 후보 수집 서비스 (Phase 1.2). """검색 후보 수집 서비스 (Phase 1.2).
text(documents FTS + trigram) + vector(documents.embedding chunks) 후보를 text(documents FTS + trigram) + vector(documents.embedding + chunks.embedding hybrid) 후보를
SearchResult 리스트로 반환. SearchResult 리스트로 반환.
Phase 1.1a: search.py의 _search_text/_search_vector를 이전 (ILIKE 그대로). Phase 1.1a: search.py의 _search_text/_search_vector를 이전 (ILIKE 그대로).
Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. ILIKE 풀 스캔 제거. Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. ILIKE 풀 스캔 제거.
Phase 1.2-B 이후: vector retrieval을 document_chunks 테이블 기반으로 전환. Phase 1.2-C: vector retrieval을 document_chunks 테이블로 전환 → catastrophic recall 손실.
Phase 1.2-G: doc + chunks hybrid retrieval 보강.
- documents.embedding (recall robust, 자연어 매칭 강함)
- document_chunks.embedding (precision, segment 매칭)
- 두 SQL 동시 호출 후 doc_id 기준 merge (chunk 가중치 1.2, doc 1.0)
""" """
from __future__ import annotations from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from sqlalchemy import text from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ai.client import AIClient from ai.client import AIClient
from core.database import engine
if TYPE_CHECKING: if TYPE_CHECKING:
from api.search import SearchResult from api.search import SearchResult
# Hybrid merge 가중치 (1.2-G)
DOC_VECTOR_WEIGHT = 1.0
CHUNK_VECTOR_WEIGHT = 1.2
async def search_text( async def search_text(
session: AsyncSession, query: str, limit: int session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]: ) -> list["SearchResult"]:
@@ -121,27 +132,27 @@ async def search_text(
async def search_vector( async def search_vector(
session: AsyncSession, query: str, limit: int session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]: ) -> list["SearchResult"]:
"""벡터 유사도 검색 — chunk-level + doc 다양성 보장 (Phase 1.2-C). """Hybrid 벡터 검색 — doc + chunks 동시 retrieval (Phase 1.2-G).
Phase 1.2-C 진단: Phase 1.2-C 진단:
단순 chunk top-N 가져오면 같은 doc의 여러 chunks가 상위에 몰려 chunks-only는 segment 의미 손실로 자연어 query에서 catastrophic recall.
unique doc 다양성 붕괴 → recall 0.788 → 0.531 (catastrophic). doc embedding은 전체 본문 평균 → recall robust.
→ 두 retrieval 동시 사용이 정석.
해결 (사용자 추천 C 방식): 데이터 흐름:
Window function으로 doc_id 기준 PARTITION → 각 doc의 top 2 chunks만 반환. 1. query embedding 1번 (bge-m3)
raw_chunks(chunks_by_doc 보존)와 doc-level 압축 둘 다 만족. 2. asyncio.gather로 두 SQL 동시 호출:
- _search_vector_docs: documents.embedding cosine top N
SQL 흐름: - _search_vector_chunks: document_chunks.embedding window partition (doc당 top 2)
1. inner CTE: ivfflat 인덱스로 top-K chunks 빠르게 추출 3. _merge_doc_and_chunk_vectors로 가중치 + dedup:
2. ranked CTE: doc_id PARTITION 후 score 내림차순 ROW_NUMBER - chunk score * 1.2 (precision)
3. outer: rn <= 2 (doc당 max 2 chunks) + JOIN documents - doc score * 1.0 (recall)
- doc_id 기준 dedup, chunks 우선
Returns: Returns:
list[SearchResult] — chunk-level, 각 doc 최대 2개. compress_chunks_to_docs list[SearchResult] — doc_id 중복 제거됨. compress_chunks_to_docs는 그대로 동작.
doc-level 압축 + chunks_by_doc 보존. chunks_by_doc은 search.py에서 group_by_doc으로 보존.
""" """
from api.search import SearchResult # 순환 import 회피
try: try:
client = AIClient() client = AIClient()
query_embedding = await client.embed(query) query_embedding = await client.embed(query)
@@ -149,9 +160,71 @@ async def search_vector(
except Exception: except Exception:
return [] return []
# ivfflat 인덱스로 top-K chunks 추출 후 doc 단위 partition embedding_str = str(query_embedding)
# inner_k = limit * 10 정도로 충분 unique doc 확보 (~30~50 docs)
inner_k = max(limit * 10, 200) # 두 SQL 병렬 호출 — 각각 별도 session 사용 (asyncpg connection은 statement 단위 직렬)
Session = async_sessionmaker(engine)
async def _docs_call() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_docs(s, embedding_str, limit * 4)
async def _chunks_call() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_chunks(s, embedding_str, limit * 4)
doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call())
return _merge_doc_and_chunk_vectors(doc_results, chunk_results)
async def _search_vector_docs(
session: AsyncSession, embedding_str: str, limit: int
) -> list["SearchResult"]:
"""documents.embedding 직접 검색 — recall robust (자연어 매칭).
chunks가 없는 doc도 매칭 가능. score는 cosine similarity (1 - distance).
chunk_id/chunk_index/section_title은 None.
"""
from api.search import SearchResult # 순환 import 회피
result = await session.execute(
text("""
SELECT
id,
title,
ai_domain,
ai_summary,
file_format,
(1 - (embedding <=> cast(:embedding AS vector))) AS score,
left(extracted_text, 200) AS snippet,
'vector_doc' AS match_reason,
NULL::bigint AS chunk_id,
NULL::integer AS chunk_index,
NULL::text AS section_title
FROM documents
WHERE embedding IS NOT NULL AND deleted_at IS NULL
ORDER BY embedding <=> cast(:embedding AS vector)
LIMIT :limit
"""),
{"embedding": embedding_str, "limit": limit},
)
return [SearchResult(**row._mapping) for row in result]
async def _search_vector_chunks(
session: AsyncSession, embedding_str: str, limit: int
) -> list["SearchResult"]:
"""document_chunks.embedding 검색 + window partition (doc당 top 2 chunks).
SQL 흐름:
1. inner CTE topk: ivfflat 인덱스로 top-K chunks 추출
2. ranked CTE: doc_id PARTITION + ROW_NUMBER (score 내림차순)
3. outer: rn <= 2 (doc당 max 2 chunks) + JOIN documents
"""
from api.search import SearchResult # 순환 import 회피
inner_k = max(limit * 5, 500)
result = await session.execute( result = await session.execute(
text(""" text("""
WITH topk AS ( WITH topk AS (
@@ -181,7 +254,7 @@ async def search_vector(
d.file_format AS file_format, d.file_format AS file_format,
(1 - r.dist) AS score, (1 - r.dist) AS score,
left(r.text, 200) AS snippet, left(r.text, 200) AS snippet,
'vector' AS match_reason, 'vector_chunk' AS match_reason,
r.chunk_id AS chunk_id, r.chunk_id AS chunk_id,
r.chunk_index AS chunk_index, r.chunk_index AS chunk_index,
r.section_title AS section_title r.section_title AS section_title
@@ -191,11 +264,49 @@ async def search_vector(
ORDER BY r.dist ORDER BY r.dist
LIMIT :limit LIMIT :limit
"""), """),
{"embedding": str(query_embedding), "inner_k": inner_k, "limit": limit * 4}, {"embedding": embedding_str, "inner_k": inner_k, "limit": limit},
) )
return [SearchResult(**row._mapping) for row in result] return [SearchResult(**row._mapping) for row in result]
def _merge_doc_and_chunk_vectors(
doc_results: list["SearchResult"],
chunk_results: list["SearchResult"],
) -> list["SearchResult"]:
"""doc + chunks vector 결과 merge (Phase 1.2-G).
가중치:
- chunk score * 1.2 (segment 매칭이 더 정확)
- doc score * 1.0 (전체 본문 평균, recall 보완)
Dedup:
- doc_id 기준
- chunks가 있으면 chunks 우선 (segment 정보 + chunk_id 보존)
- chunks에 없는 doc은 doc-wrap으로 추가
Returns:
score 내림차순 정렬된 SearchResult 리스트.
chunk_id가 None이면 doc-wrap 결과(text-only 매치 doc 처리에 사용).
"""
by_doc_id: dict[int, "SearchResult"] = {}
# chunks 먼저 (가중치 적용 + chunk_id 보존)
for c in chunk_results:
c.score = c.score * CHUNK_VECTOR_WEIGHT
prev = by_doc_id.get(c.id)
if prev is None or c.score > prev.score:
by_doc_id[c.id] = c
# doc 매치는 chunks에 없는 doc만 추가 (chunks 우선 원칙)
for d in doc_results:
d.score = d.score * DOC_VECTOR_WEIGHT
if d.id not in by_doc_id:
by_doc_id[d.id] = d
# score 내림차순 정렬
return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)
def compress_chunks_to_docs( def compress_chunks_to_docs(
chunks: list["SearchResult"], limit: int chunks: list["SearchResult"], limit: int
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]: ) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]: