Files
hyungi_document_server/app/services/search/retrieval_service.py
Hyungi Ahn b80116243f feat(search): Phase 1.2-C chunk-based vector retrieval + raw chunk preservation
Switched retrieval_service.search_vector from documents.embedding to document_chunks.embedding.
fetch_limit = limit*5 fetches raw chunks broadly, then compresses per doc.

New: compress_chunks_to_docs(chunks, limit) → (doc_results, chunks_by_doc)
- Only the best-scoring chunk per doc_id goes into doc_results (fusion input)
- All raw chunks are preserved in the chunks_by_doc dict (for the Phase 1.3 reranker)
- Prevents "duplicate chunks from the same doc falsely boosting RRF"

SearchResult: added optional chunk_id / chunk_index / section_title fields.
- None for text search results (doc-level)
- Populated for vector search results (chunk-level)

search.py flow:
1. raw_chunks = await search_vector(...)
2. vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
3. fusion(text_results, vector_results) (doc-level)
4. (Phase 1.3) chunks_by_doc → reranker (chunk-level)

debug notes: verify the flow with raw=N compressed=M unique_docs=K.

Data dependency: validate against the eval set once reindexing (reindex_all_chunks.py, in progress) completes.
2026-04-08 12:36:47 +09:00
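The "same-doc duplicates falsely boost RRF" point can be shown with a minimal reciprocal-rank-fusion sketch; the `rrf` helper below is hypothetical, not code from this commit, and only the duplicate-vs-compressed comparison matters:

```python
def rrf(rankings: list[list[str]], k: int = 60) -> dict[str, float]:
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return scores

# Uncompressed chunk list: doc A appears twice (two chunks), doc B once.
raw = rrf([["A", "A", "B"]])
# Compressed to doc level: each doc contributes at most one entry per list.
compressed = rrf([["A", "B"]])

# Duplicates inflate A's fused score and push B further down the ranking.
assert raw["A"] > compressed["A"]
assert raw["B"] < compressed["B"]
```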

204 lines
8.7 KiB
Python

"""검색 후보 수집 서비스 (Phase 1.2).
text(documents FTS + trigram) + vector(documents.embedding → chunks) 후보를
SearchResult 리스트로 반환.
Phase 1.1a: search.py의 _search_text/_search_vector를 이전 (ILIKE 그대로).
Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. ILIKE 풀 스캔 제거.
Phase 1.2-B 이후: vector retrieval을 document_chunks 테이블 기반으로 전환.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
if TYPE_CHECKING:
from api.search import SearchResult
async def search_text(
    session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]:
    """FTS + trigram per-field weighted search (Phase 1.2-B UNION decomposition).

    Phase 1.2-B diagnosis:
      A single SELECT with OR'ed predicates keeps the PostgreSQL planner from
      combining indexes across the OR, so it picks a Seq Scan (small table,
      765 docs). Measured with EXPLAIN: 525ms.
      → Decomposing into a CTE + UNION lets each branch use its own index
        → 26ms (95% reduction).

    Structure:
      candidates CTE
        ├─ title % → idx_documents_title_trgm
        ├─ ai_summary % → idx_documents_ai_summary_trgm
        │    (includes the length > 0 partial-index match condition)
        └─ FTS @@ plainto_tsquery → idx_documents_fts_full
      JOIN documents d ON d.id = c.id
      ORDER BY weighted sum of 5-column similarity + ts_rank * 2.0
      Weights: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 /
               extracted_text 1.0

    Threshold:
      pg_trgm.similarity_threshold default = 0.3
      → multi-token Korean news queries (e.g. "이란 미국 전쟁 글로벌 반응",
        "Iran US war global reaction") collect no candidates
        → recall drops (0.788 → 0.750)
      → lowered via set_limit(0.15) to recover recall; precision is compensated
        by the ORDER BY similarity sum.
    """
    from api.search import SearchResult  # avoid a circular import

    # Lower the trigram threshold to 0.15 to recover multi-token query recall.
    # Both executes in the same SQLAlchemy async session share one connection,
    # so the session-local set_limit() applies to the query below.
    await session.execute(text("SELECT set_limit(0.15)"))
    result = await session.execute(
        text("""
            WITH candidates AS (
                -- title trigram (idx_documents_title_trgm)
                SELECT id FROM documents
                WHERE deleted_at IS NULL AND title % :q
                UNION
                -- ai_summary trigram (matches the idx_documents_ai_summary_trgm partial index)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND ai_summary IS NOT NULL
                  AND length(ai_summary) > 0
                  AND ai_summary % :q
                UNION
                -- combined FTS index (idx_documents_fts_full)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND to_tsvector('simple',
                      coalesce(title, '') || ' ' ||
                      coalesce(ai_tags::text, '') || ' ' ||
                      coalesce(ai_summary, '') || ' ' ||
                      coalesce(user_note, '') || ' ' ||
                      coalesce(extracted_text, '')
                  ) @@ plainto_tsquery('simple', :q)
            )
            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
                left(d.extracted_text, 200) AS snippet,
                (
                    -- weighted sum of per-column trigram similarity
                    similarity(coalesce(d.title, ''), :q) * 3.0
                    + similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5
                    + similarity(coalesce(d.user_note, ''), :q) * 2.0
                    + similarity(coalesce(d.ai_summary, ''), :q) * 1.5
                    + similarity(coalesce(d.extracted_text, ''), :q) * 1.0
                    -- FTS bonus (uses idx_documents_fts_full)
                    + coalesce(ts_rank(
                        to_tsvector('simple',
                            coalesce(d.title, '') || ' ' ||
                            coalesce(d.ai_tags::text, '') || ' ' ||
                            coalesce(d.ai_summary, '') || ' ' ||
                            coalesce(d.user_note, '') || ' ' ||
                            coalesce(d.extracted_text, '')
                        ),
                        plainto_tsquery('simple', :q)
                    ), 0) * 2.0
                ) AS score,
                -- match_reason: first column (in priority order) whose
                -- similarity reaches 0.3, else 'fts'
                CASE
                    WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title'
                    WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
                    WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note'
                    WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary'
                    WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content'
                    ELSE 'fts'
                END AS match_reason
            FROM documents d
            JOIN candidates c ON d.id = c.id
            ORDER BY score DESC
            LIMIT :limit
        """),
        {"q": query, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]

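The threshold rationale in the docstring can be made concrete with a simplified pure-Python reimplementation of pg_trgm's word-level trigram similarity (the padding and set arithmetic approximate pg_trgm's behavior for alphanumeric words; this is not the extension's exact code):

```python
def trigrams(s: str) -> set[str]:
    # pg_trgm pads each word with two leading spaces and one trailing space.
    grams: set[str] = set()
    for word in s.lower().split():
        padded = f"  {word} "
        grams.update(padded[i : i + 3] for i in range(len(padded) - 2))
    return grams

def similarity(a: str, b: str) -> float:
    # Jaccard similarity over the trigram sets, as pg_trgm computes it.
    ta, tb = trigrams(a), trigrams(b)
    return len(ta & tb) / len(ta | tb) if (ta | tb) else 0.0

# A related title shares only part of a long multi-token query's trigrams,
# so its similarity lands between 0.15 and the 0.3 default threshold:
# found after set_limit(0.15), missed at the default.
sim = similarity("us iran conflict", "iran us war global reaction")
assert 0.15 < sim < 0.3
```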
async def search_vector(
    session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]:
    """Vector similarity search at chunk level (Phase 1.2-C).

    Returns raw chunks from the document_chunks table ranked by cosine
    similarity. Multiple chunks from the same doc may appear (no compression
    here); compress per doc with the compress_chunks_to_docs() helper right
    before fusion. The Phase 1.3 reranker consumes the raw chunks as-is.

    SearchResult.id = doc_id (fusion-compatible)
    SearchResult.chunk_id / chunk_index / section_title = chunk metadata
    snippet = first 200 chars of the chunk text
    """
    from api.search import SearchResult  # avoid a circular import

    try:
        client = AIClient()
        try:
            query_embedding = await client.embed(query)
        finally:
            # Close the client even when embed() fails.
            await client.close()
    except Exception:
        return []

    # Join raw chunks with doc metadata. Fetch roughly limit * 5 so doc
    # diversity survives the per-doc compression.
    fetch_limit = limit * 5
    result = await session.execute(
        text("""
            SELECT
                d.id AS id,
                d.title AS title,
                d.ai_domain AS ai_domain,
                d.ai_summary AS ai_summary,
                d.file_format AS file_format,
                (1 - (c.embedding <=> cast(:embedding AS vector))) AS score,
                left(c.text, 200) AS snippet,
                'vector' AS match_reason,
                c.id AS chunk_id,
                c.chunk_index AS chunk_index,
                c.section_title AS section_title
            FROM document_chunks c
            JOIN documents d ON d.id = c.doc_id
            WHERE c.embedding IS NOT NULL AND d.deleted_at IS NULL
            ORDER BY c.embedding <=> cast(:embedding AS vector)
            LIMIT :limit
        """),
        {"embedding": str(query_embedding), "limit": fetch_limit},
    )
    return [SearchResult(**row._mapping) for row in result]

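The `score` expression relies on pgvector's `<=>` operator being cosine distance, so `1 - distance` is cosine similarity. A small pure-Python check of that identity (this reimplements the formula, not pgvector itself):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    # pgvector's <=> operator: 1 - (a . b) / (|a| * |b|)
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / norm

q = [1.0, 0.0]
same = [2.0, 0.0]        # same direction → distance 0 → score 1
orthogonal = [0.0, 3.0]  # orthogonal → distance 1 → score 0

assert abs((1 - cosine_distance(q, same)) - 1.0) < 1e-9
assert abs((1 - cosine_distance(q, orthogonal)) - 0.0) < 1e-9
```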
def compress_chunks_to_docs(
    chunks: list["SearchResult"], limit: int
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
    """Compress chunk-level results to doc level while preserving raw chunks.

    Fusion must run at doc level (to avoid same-doc duplicates), but the
    Phase 1.3 reranker needs the raw chunk-level data, so the compressed
    list and the raw chunks are returned together.

    Compression rules:
    - Only the highest-scoring chunk per doc_id goes into doc_results.
    - Every raw chunk is kept in the chunks_by_doc dict (for the Phase 1.3
      reranker).
    - doc_results is sorted by score descending and capped at `limit`.

    Returns:
        (doc_results, chunks_by_doc)
        - doc_results: list[SearchResult], best chunk score per doc; fusion input
        - chunks_by_doc: dict[doc_id, list[SearchResult]], all raw chunks
    """
    if not chunks:
        return [], {}

    chunks_by_doc: dict[int, list["SearchResult"]] = {}
    best_per_doc: dict[int, "SearchResult"] = {}
    for chunk in chunks:
        chunks_by_doc.setdefault(chunk.id, []).append(chunk)
        prev_best = best_per_doc.get(chunk.id)
        if prev_best is None or chunk.score > prev_best.score:
            best_per_doc[chunk.id] = chunk

    # Sort docs by their best chunk score; keep the top `limit`.
    doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True)
    return doc_results[:limit], chunks_by_doc
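A quick exercise of the compression rule with lightweight stand-ins for SearchResult (only `id` and `score` matter; the inline copy below mirrors compress_chunks_to_docs so the sketch stays self-contained):

```python
from types import SimpleNamespace

def compress_chunks_to_docs(chunks, limit):
    # Inline copy of the compression rule, for a self-contained demo.
    chunks_by_doc, best_per_doc = {}, {}
    for chunk in chunks:
        chunks_by_doc.setdefault(chunk.id, []).append(chunk)
        best = best_per_doc.get(chunk.id)
        if best is None or chunk.score > best.score:
            best_per_doc[chunk.id] = chunk
    doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True)
    return doc_results[:limit], chunks_by_doc

# Doc 1 contributes two chunks; only its best (0.9) reaches doc_results,
# but both survive in chunks_by_doc for the reranker.
chunks = [
    SimpleNamespace(id=1, score=0.9, chunk_index=0),
    SimpleNamespace(id=2, score=0.8, chunk_index=4),
    SimpleNamespace(id=1, score=0.7, chunk_index=3),
]
docs, by_doc = compress_chunks_to_docs(chunks, limit=2)

assert [(d.id, d.score) for d in docs] == [(1, 0.9), (2, 0.8)]
assert len(by_doc[1]) == 2  # raw chunks preserved for the reranker
```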