Phase 1.2-C eval set: Recall 0.788 → 0.531, natural_language 0.73 → 0.07.
Diagnosis:
Fetching raw chunks with a plain chunk top-N (limit*5=25) lets several chunks
from the same doc crowd the top ranks → unique-doc diversity collapses.
warm test debug: 'chunks raw=16 compressed=5 unique_docs=10'
Fix (user-recommended option C):
Window function ROW_NUMBER() PARTITION BY doc_id returns only the top 2 chunks per doc.
SQL flow:
1. inner CTE topk: fast top inner_k chunks via the ivfflat index
   (inner_k = max(limit*10, 200))
2. ranked CTE: ROW_NUMBER over PARTITION BY doc_id ORDER BY dist
3. outer: rn <= 2 (max 2 chunks per doc) + JOIN documents
4. limit = limit * 4 (counted in chunks, ~limit*2 unique docs)
Reranker compatibility:
Max 2 chunks per doc are returned as-is → chunks_by_doc preserved
compress_chunks_to_docs keeps working unchanged (best chunk per doc)
The Phase 1.3 reranker can recover raw chunks from chunks_by_doc
Core principle: vector retrieval should find by chunk and select by doc.
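The per-doc cap above can be sketched in plain Python as a minimal illustration of what the ROW_NUMBER() / PARTITION BY step does (`cap_chunks_per_doc` is a hypothetical helper for this note, not part of the service code):

```python
from collections import defaultdict

def cap_chunks_per_doc(chunks, per_doc=2):
    """Emulate ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist):
    keep at most `per_doc` chunks per document, in global distance order."""
    kept = defaultdict(int)  # doc_id -> chunks kept so far
    out = []
    for chunk in sorted(chunks, key=lambda c: c["dist"]):  # mirrors ORDER BY dist
        if kept[chunk["doc_id"]] < per_doc:
            kept[chunk["doc_id"]] += 1
            out.append(chunk)
    return out

# Without the cap, a top-5 cut would be dominated by doc 1 (4 of 5 slots);
# with per_doc=2 the same budget reaches more unique docs.
chunks = [
    {"doc_id": 1, "dist": 0.10}, {"doc_id": 1, "dist": 0.11},
    {"doc_id": 1, "dist": 0.12}, {"doc_id": 1, "dist": 0.13},
    {"doc_id": 2, "dist": 0.20}, {"doc_id": 3, "dist": 0.30},
]
capped = cap_chunks_per_doc(chunks, per_doc=2)
# → doc_ids [1, 1, 2, 3]: doc 1 capped at 2, docs 2 and 3 survive
```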
232 lines
9.8 KiB
Python
"""검색 후보 수집 서비스 (Phase 1.2).
|
|
|
|
text(documents FTS + trigram) + vector(documents.embedding → chunks) 후보를
|
|
SearchResult 리스트로 반환.
|
|
|
|
Phase 1.1a: search.py의 _search_text/_search_vector를 이전 (ILIKE 그대로).
|
|
Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. ILIKE 풀 스캔 제거.
|
|
Phase 1.2-B 이후: vector retrieval을 document_chunks 테이블 기반으로 전환.
|
|
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from ai.client import AIClient

if TYPE_CHECKING:
    from api.search import SearchResult


async def search_text(
    session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]:
    """FTS + trigram per-field weighted search (Phase 1.2-B UNION decomposition).

    Phase 1.2-B diagnosis:
        With a single SELECT joined by OR, the PostgreSQL planner cannot combine
        indexes across the OR branches and picks a Seq Scan (small table, 765 docs);
        measured at 525ms with EXPLAIN.
        → Decomposing into a CTE + UNION lets each branch use its own index
          → 26ms (95% reduction).

    Structure:
        candidates CTE
        ├─ title % → idx_documents_title_trgm
        ├─ ai_summary % → idx_documents_ai_summary_trgm
        │  (includes the length > 0 partial-index match condition)
        └─ FTS @@ plainto_tsquery → idx_documents_fts_full
        JOIN documents d ON d.id = c.id
        ORDER BY weighted sum of 5-column similarity + ts_rank * 2.0
        weights: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0

    threshold:
        pg_trgm.similarity_threshold default = 0.3
        → multi-token Korean news queries (e.g. "이란 미국 전쟁 글로벌 반응",
          "Iran US war global reaction") collect no candidates
          → recall drops (0.788 → 0.750)
        → lowered to 0.15 via set_limit() to recover recall; precision is
          compensated by the ORDER BY similarity sum.
    """
    from api.search import SearchResult  # avoid circular import

    # Lower the trigram threshold to 0.15 to recover multi-token query recall.
    # Both execute() calls run on the same connection within the async session,
    # so set_limit() applies to the query below.
    await session.execute(text("SELECT set_limit(0.15)"))

    result = await session.execute(
        text("""
            WITH candidates AS (
                -- title trigram (idx_documents_title_trgm)
                SELECT id FROM documents
                WHERE deleted_at IS NULL AND title % :q
                UNION
                -- ai_summary trigram (matches the idx_documents_ai_summary_trgm partial index)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND ai_summary IS NOT NULL
                  AND length(ai_summary) > 0
                  AND ai_summary % :q
                UNION
                -- combined FTS index (idx_documents_fts_full)
                SELECT id FROM documents
                WHERE deleted_at IS NULL
                  AND to_tsvector('simple',
                      coalesce(title, '') || ' ' ||
                      coalesce(ai_tags::text, '') || ' ' ||
                      coalesce(ai_summary, '') || ' ' ||
                      coalesce(user_note, '') || ' ' ||
                      coalesce(extracted_text, '')
                  ) @@ plainto_tsquery('simple', :q)
            )
            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
                   left(d.extracted_text, 200) AS snippet,
                   (
                       -- weighted sum of per-column trigram similarity
                       similarity(coalesce(d.title, ''), :q) * 3.0
                       + similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5
                       + similarity(coalesce(d.user_note, ''), :q) * 2.0
                       + similarity(coalesce(d.ai_summary, ''), :q) * 1.5
                       + similarity(coalesce(d.extracted_text, ''), :q) * 1.0
                       -- FTS bonus (uses idx_documents_fts_full)
                       + coalesce(ts_rank(
                           to_tsvector('simple',
                               coalesce(d.title, '') || ' ' ||
                               coalesce(d.ai_tags::text, '') || ' ' ||
                               coalesce(d.ai_summary, '') || ' ' ||
                               coalesce(d.user_note, '') || ' ' ||
                               coalesce(d.extracted_text, '')
                           ),
                           plainto_tsquery('simple', :q)
                       ), 0) * 2.0
                   ) AS score,
                   -- match_reason: first column whose similarity clears 0.3, else FTS
                   CASE
                       WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title'
                       WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
                       WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note'
                       WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary'
                       WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content'
                       ELSE 'fts'
                   END AS match_reason
            FROM documents d
            JOIN candidates c ON d.id = c.id
            ORDER BY score DESC
            LIMIT :limit
        """),
        {"q": query, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


async def search_vector(
    session: AsyncSession, query: str, limit: int
) -> list["SearchResult"]:
    """Vector similarity search — chunk-level with a doc-diversity guarantee (Phase 1.2-C).

    Phase 1.2-C diagnosis:
        A naive chunk top-N lets several chunks from the same doc crowd the top
        ranks, collapsing unique-doc diversity → recall 0.788 → 0.531 (catastrophic).

    Fix (user-recommended option C):
        Window function partitioned by doc_id → return only each doc's top 2 chunks.
        Satisfies both raw_chunks (chunks_by_doc preservation) and doc-level compression.

    SQL flow:
        1. inner CTE: fast top-K chunk extraction via the ivfflat index
        2. ranked CTE: ROW_NUMBER over doc_id partitions, ordered by ascending distance
        3. outer: rn <= 2 (max 2 chunks per doc) + JOIN documents

    Returns:
        list[SearchResult] — chunk-level, at most 2 per doc. compress_chunks_to_docs
        then compresses to doc level while preserving chunks_by_doc.
    """
    from api.search import SearchResult  # avoid circular import

    try:
        client = AIClient()
        query_embedding = await client.embed(query)
        await client.close()
    except Exception:
        # Swallow embedding failures and return no vector candidates.
        return []

    # Extract top-K chunks via the ivfflat index, then partition per doc.
    # inner_k around limit * 10 secures enough unique docs (~30-50 docs).
    inner_k = max(limit * 10, 200)
    result = await session.execute(
        text("""
            WITH topk AS (
                SELECT
                    c.id AS chunk_id,
                    c.doc_id,
                    c.chunk_index,
                    c.section_title,
                    c.text,
                    c.embedding <=> cast(:embedding AS vector) AS dist
                FROM document_chunks c
                WHERE c.embedding IS NOT NULL
                ORDER BY c.embedding <=> cast(:embedding AS vector)
                LIMIT :inner_k
            ),
            ranked AS (
                SELECT
                    chunk_id, doc_id, chunk_index, section_title, text, dist,
                    ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC) AS rn
                FROM topk
            )
            SELECT
                d.id AS id,
                d.title AS title,
                d.ai_domain AS ai_domain,
                d.ai_summary AS ai_summary,
                d.file_format AS file_format,
                (1 - r.dist) AS score,
                left(r.text, 200) AS snippet,
                'vector' AS match_reason,
                r.chunk_id AS chunk_id,
                r.chunk_index AS chunk_index,
                r.section_title AS section_title
            FROM ranked r
            JOIN documents d ON d.id = r.doc_id
            WHERE r.rn <= 2 AND d.deleted_at IS NULL
            ORDER BY r.dist
            LIMIT :limit
        """),
        {"embedding": str(query_embedding), "inner_k": inner_k, "limit": limit * 4},
    )
    return [SearchResult(**row._mapping) for row in result]


def compress_chunks_to_docs(
    chunks: list["SearchResult"], limit: int
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
    """Compress chunk-level results to doc level while preserving the raw chunks.

    Fusion must operate per doc (to avoid duplicate docs), but the Phase 1.3
    reranker needs raw chunk-level data, so the compressed and raw forms are
    returned together.

    Compression rules:
        - Only the highest-scoring chunk per doc_id goes into doc_results
        - The doc's other chunks are preserved in the chunks_by_doc dict
          (for the Phase 1.3 reranker)
        - doc_results is sorted by score descending and truncated to limit

    Returns:
        (doc_results, chunks_by_doc)
        - doc_results: list[SearchResult] — best chunk score per doc, fusion input
        - chunks_by_doc: dict[doc_id, list[SearchResult]] — all raw chunks preserved
    """
    if not chunks:
        return [], {}

    chunks_by_doc: dict[int, list["SearchResult"]] = {}
    best_per_doc: dict[int, "SearchResult"] = {}

    for chunk in chunks:
        # SearchResult.id is the *document* id (d.id in the SQL above).
        chunks_by_doc.setdefault(chunk.id, []).append(chunk)
        prev_best = best_per_doc.get(chunk.id)
        if prev_best is None or chunk.score > prev_best.score:
            best_per_doc[chunk.id] = chunk

    # Sort docs by their best chunk score, keep the top `limit`.
    doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True)
    return doc_results[:limit], chunks_by_doc
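A quick sanity check of the compression contract. This uses a minimal stand-in for SearchResult and a local copy of the compression logic (the real model carries more fields); `Chunk` and `compress` are illustrative names, not part of the service:

```python
from dataclasses import dataclass

@dataclass
class Chunk:
    id: int      # stand-in for SearchResult.id — the *doc* id
    score: float

def compress(chunks, limit):
    # Mirrors compress_chunks_to_docs: best chunk per doc + raw preservation.
    chunks_by_doc, best = {}, {}
    for c in chunks:
        chunks_by_doc.setdefault(c.id, []).append(c)
        if c.id not in best or c.score > best[c.id].score:
            best[c.id] = c
    docs = sorted(best.values(), key=lambda r: r.score, reverse=True)
    return docs[:limit], chunks_by_doc

raw = [Chunk(1, 0.9), Chunk(1, 0.7), Chunk(2, 0.8), Chunk(3, 0.5)]
docs, by_doc = compress(raw, limit=2)
# docs → doc 1 (best score 0.9), then doc 2 (0.8); doc 3 is cut by limit,
# but all four raw chunks survive in by_doc for the Phase 1.3 reranker.
```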