diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index e690ef6..8097a00 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -1,26 +1,37 @@
 """Search candidate retrieval service (Phase 1.2).
 
-Returns text (documents FTS + trigram) + vector (documents.embedding → chunks)
+Returns text (documents FTS + trigram) + vector (documents.embedding + chunks.embedding hybrid)
 candidates as a list of SearchResult.
 
 Phase 1.1a: moved _search_text/_search_vector out of search.py (ILIKE unchanged).
 Phase 1.2-B: ILIKE → trigram `%` + `similarity()`; removed the ILIKE full scan.
-After Phase 1.2-B: move vector retrieval onto the document_chunks table.
+Phase 1.2-C: switched vector retrieval to the document_chunks table → catastrophic recall loss.
+Phase 1.2-G: reinforced with doc + chunks hybrid retrieval.
+  - documents.embedding (recall-robust, strong natural-language matching)
+  - document_chunks.embedding (precision, segment matching)
+  - both SQL queries run concurrently, then merged by doc_id (chunk weight 1.2, doc weight 1.0)
 """
 
 from __future__ import annotations
 
+import asyncio
 from typing import TYPE_CHECKING
 
 from sqlalchemy import text
-from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
 
 from ai.client import AIClient
+from core.database import engine
 
 if TYPE_CHECKING:
     from api.search import SearchResult
 
 
+# Hybrid merge weights (1.2-G)
+DOC_VECTOR_WEIGHT = 1.0
+CHUNK_VECTOR_WEIGHT = 1.2
+
+
 async def search_text(
     session: AsyncSession, query: str, limit: int
 ) -> list["SearchResult"]:
@@ -121,27 +132,27 @@ async def search_text(
 async def search_vector(
     session: AsyncSession, query: str, limit: int
 ) -> list["SearchResult"]:
-    """Vector similarity search — chunk-level + doc diversity guaranteed (Phase 1.2-C).
+    """Hybrid vector search — simultaneous doc + chunks retrieval (Phase 1.2-G).
 
     Phase 1.2-C diagnosis:
-        a naive chunk top-N lets several chunks of the same doc crowd the top,
-        collapsing unique-doc diversity → recall 0.788 → 0.531 (catastrophic).
+        chunks-only retrieval loses segment-level meaning, so natural-language queries hit catastrophic recall.
+        the doc embedding averages the full body text → recall-robust.
+        → using both retrievals together is the textbook approach.
 
-    Fix (approach C, recommended by the user):
-        PARTITION by doc_id with a window function → return only each doc's top 2 chunks.
-        satisfies both raw_chunks (chunks_by_doc preserved) and doc-level compression.
-
-    SQL flow:
-        1. inner CTE: pull top-K chunks quickly via the ivfflat index
-        2. ranked CTE: PARTITION by doc_id, then ROW_NUMBER in descending score order
-        3. outer: rn <= 2 (max 2 chunks per doc) + JOIN documents
+    Data flow:
+        1. embed the query once (bge-m3)
+        2. run both SQL queries concurrently via asyncio.gather:
+            - _search_vector_docs: documents.embedding cosine top N
+            - _search_vector_chunks: document_chunks.embedding window partition (top 2 per doc)
+        3. weight + dedup via _merge_doc_and_chunk_vectors:
+            - chunk score * 1.2 (precision)
+            - doc score * 1.0 (recall)
+            - dedup by doc_id, chunks take priority
 
     Returns:
-        list[SearchResult] — chunk-level, at most 2 per doc. compress_chunks_to_docs
-        handles doc-level compression + preserves chunks_by_doc.
+        list[SearchResult] — deduplicated by doc_id. compress_chunks_to_docs keeps working as-is.
+        chunks_by_doc is preserved in search.py via group_by_doc.
""" - from api.search import SearchResult # 순환 import 회피 - try: client = AIClient() query_embedding = await client.embed(query) @@ -149,9 +160,71 @@ async def search_vector( except Exception: return [] - # ivfflat 인덱스로 top-K chunks 추출 후 doc 단위 partition - # inner_k = limit * 10 정도로 충분 unique doc 확보 (~30~50 docs) - inner_k = max(limit * 10, 200) + embedding_str = str(query_embedding) + + # 두 SQL 병렬 호출 — 각각 별도 session 사용 (asyncpg connection은 statement 단위 직렬) + Session = async_sessionmaker(engine) + + async def _docs_call() -> list["SearchResult"]: + async with Session() as s: + return await _search_vector_docs(s, embedding_str, limit * 4) + + async def _chunks_call() -> list["SearchResult"]: + async with Session() as s: + return await _search_vector_chunks(s, embedding_str, limit * 4) + + doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call()) + + return _merge_doc_and_chunk_vectors(doc_results, chunk_results) + + +async def _search_vector_docs( + session: AsyncSession, embedding_str: str, limit: int +) -> list["SearchResult"]: + """documents.embedding 직접 검색 — recall robust (자연어 매칭). + + chunks가 없는 doc도 매칭 가능. score는 cosine similarity (1 - distance). + chunk_id/chunk_index/section_title은 None. + """ + from api.search import SearchResult # 순환 import 회피 + + result = await session.execute( + text(""" + SELECT + id, + title, + ai_domain, + ai_summary, + file_format, + (1 - (embedding <=> cast(:embedding AS vector))) AS score, + left(extracted_text, 200) AS snippet, + 'vector_doc' AS match_reason, + NULL::bigint AS chunk_id, + NULL::integer AS chunk_index, + NULL::text AS section_title + FROM documents + WHERE embedding IS NOT NULL AND deleted_at IS NULL + ORDER BY embedding <=> cast(:embedding AS vector) + LIMIT :limit + """), + {"embedding": embedding_str, "limit": limit}, + ) + return [SearchResult(**row._mapping) for row in result] + + +async def _search_vector_chunks( + session: AsyncSession, embedding_str: str, limit: int +) -> list["SearchResult"]: + """document_chunks.embedding 검색 + window partition (doc당 top 2 chunks). + + SQL 흐름: + 1. inner CTE topk: ivfflat 인덱스로 top-K chunks 추출 + 2. ranked CTE: doc_id PARTITION + ROW_NUMBER (score 내림차순) + 3. outer: rn <= 2 (doc당 max 2 chunks) + JOIN documents + """ + from api.search import SearchResult # 순환 import 회피 + + inner_k = max(limit * 5, 500) result = await session.execute( text(""" WITH topk AS ( @@ -181,7 +254,7 @@ async def search_vector( d.file_format AS file_format, (1 - r.dist) AS score, left(r.text, 200) AS snippet, - 'vector' AS match_reason, + 'vector_chunk' AS match_reason, r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title @@ -191,11 +264,49 @@ async def search_vector( ORDER BY r.dist LIMIT :limit """), - {"embedding": str(query_embedding), "inner_k": inner_k, "limit": limit * 4}, + {"embedding": embedding_str, "inner_k": inner_k, "limit": limit}, ) return [SearchResult(**row._mapping) for row in result] +def _merge_doc_and_chunk_vectors( + doc_results: list["SearchResult"], + chunk_results: list["SearchResult"], +) -> list["SearchResult"]: + """doc + chunks vector 결과 merge (Phase 1.2-G). + + 가중치: + - chunk score * 1.2 (segment 매칭이 더 정확) + - doc score * 1.0 (전체 본문 평균, recall 보완) + + Dedup: + - doc_id 기준 + - chunks가 있으면 chunks 우선 (segment 정보 + chunk_id 보존) + - chunks에 없는 doc은 doc-wrap으로 추가 + + Returns: + score 내림차순 정렬된 SearchResult 리스트. + chunk_id가 None이면 doc-wrap 결과(text-only 매치 doc 처리에 사용). 
+ """ + by_doc_id: dict[int, "SearchResult"] = {} + + # chunks 먼저 (가중치 적용 + chunk_id 보존) + for c in chunk_results: + c.score = c.score * CHUNK_VECTOR_WEIGHT + prev = by_doc_id.get(c.id) + if prev is None or c.score > prev.score: + by_doc_id[c.id] = c + + # doc 매치는 chunks에 없는 doc만 추가 (chunks 우선 원칙) + for d in doc_results: + d.score = d.score * DOC_VECTOR_WEIGHT + if d.id not in by_doc_id: + by_doc_id[d.id] = d + + # score 내림차순 정렬 + return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True) + + def compress_chunks_to_docs( chunks: list["SearchResult"], limit: int ) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]: