"""검색 후보 수집 서비스 (Phase 1.2). text(documents FTS + trigram) + vector(documents.embedding → chunks) 후보를 SearchResult 리스트로 반환. Phase 1.1a: search.py의 _search_text/_search_vector를 이전 (ILIKE 그대로). Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. ILIKE 풀 스캔 제거. Phase 1.2-B 이후: vector retrieval을 document_chunks 테이블 기반으로 전환. """ from __future__ import annotations from typing import TYPE_CHECKING from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient if TYPE_CHECKING: from api.search import SearchResult async def search_text( session: AsyncSession, query: str, limit: int ) -> list["SearchResult"]: """FTS + trigram 필드별 가중치 검색 (Phase 1.2-B UNION 분해). Phase 1.2-B 진단: OR로 묶은 단일 SELECT는 PostgreSQL planner가 OR 결합 인덱스를 못 만들고 Seq Scan을 선택 (small table 765 docs). EXPLAIN으로 측정 시 525ms. → CTE + UNION으로 분해하면 각 branch가 자기 인덱스 활용 → 26ms (95% 감소). 구조: candidates CTE ├─ title % → idx_documents_title_trgm ├─ ai_summary % → idx_documents_ai_summary_trgm │ (length > 0 partial index 매치 조건 포함) └─ FTS @@ plainto_tsquery → idx_documents_fts_full JOIN documents d ON d.id = c.id ORDER BY 5컬럼 similarity 가중 합산 + ts_rank * 2.0 가중치: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0 threshold: pg_trgm.similarity_threshold default = 0.3 → multi-token 한국어 뉴스 쿼리(예: "이란 미국 전쟁 글로벌 반응")에서 candidates를 못 모음 → recall 감소 (0.788 → 0.750) → set_limit(0.15)으로 낮춰 recall 회복. precision은 ORDER BY similarity 합산이 보정. 
""" from api.search import SearchResult # 순환 import 회피 # trigram threshold를 0.15로 낮춰 multi-token query recall 회복 # SQLAlchemy async session 내 두 execute는 같은 connection 사용 await session.execute(text("SELECT set_limit(0.15)")) result = await session.execute( text(""" WITH candidates AS ( -- title trigram (idx_documents_title_trgm) SELECT id FROM documents WHERE deleted_at IS NULL AND title % :q UNION -- ai_summary trigram (idx_documents_ai_summary_trgm 부분 인덱스 매치) SELECT id FROM documents WHERE deleted_at IS NULL AND ai_summary IS NOT NULL AND length(ai_summary) > 0 AND ai_summary % :q UNION -- FTS 통합 인덱스 (idx_documents_fts_full) SELECT id FROM documents WHERE deleted_at IS NULL AND to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(ai_tags::text, '') || ' ' || coalesce(ai_summary, '') || ' ' || coalesce(user_note, '') || ' ' || coalesce(extracted_text, '') ) @@ plainto_tsquery('simple', :q) ) SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format, left(d.extracted_text, 200) AS snippet, ( -- 컬럼별 trigram similarity 가중 합산 similarity(coalesce(d.title, ''), :q) * 3.0 + similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5 + similarity(coalesce(d.user_note, ''), :q) * 2.0 + similarity(coalesce(d.ai_summary, ''), :q) * 1.5 + similarity(coalesce(d.extracted_text, ''), :q) * 1.0 -- FTS 보너스 (idx_documents_fts_full 활용) + coalesce(ts_rank( to_tsvector('simple', coalesce(d.title, '') || ' ' || coalesce(d.ai_tags::text, '') || ' ' || coalesce(d.ai_summary, '') || ' ' || coalesce(d.user_note, '') || ' ' || coalesce(d.extracted_text, '') ), plainto_tsquery('simple', :q) ), 0) * 2.0 ) AS score, -- match_reason: similarity 가장 큰 컬럼 또는 FTS CASE WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title' WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags' WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note' WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary' WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 
'content' ELSE 'fts' END AS match_reason FROM documents d JOIN candidates c ON d.id = c.id ORDER BY score DESC LIMIT :limit """), {"q": query, "limit": limit}, ) return [SearchResult(**row._mapping) for row in result] async def search_vector( session: AsyncSession, query: str, limit: int ) -> list["SearchResult"]: """벡터 유사도 검색 — chunk-level + doc 다양성 보장 (Phase 1.2-C). Phase 1.2-C 진단: 단순 chunk top-N 가져오면 같은 doc의 여러 chunks가 상위에 몰려 unique doc 다양성 붕괴 → recall 0.788 → 0.531 (catastrophic). 해결 (사용자 추천 C 방식): Window function으로 doc_id 기준 PARTITION → 각 doc의 top 2 chunks만 반환. raw_chunks(chunks_by_doc 보존)와 doc-level 압축 둘 다 만족. SQL 흐름: 1. inner CTE: ivfflat 인덱스로 top-K chunks 빠르게 추출 2. ranked CTE: doc_id PARTITION 후 score 내림차순 ROW_NUMBER 3. outer: rn <= 2 (doc당 max 2 chunks) + JOIN documents Returns: list[SearchResult] — chunk-level, 각 doc 최대 2개. compress_chunks_to_docs로 doc-level 압축 + chunks_by_doc 보존. """ from api.search import SearchResult # 순환 import 회피 try: client = AIClient() query_embedding = await client.embed(query) await client.close() except Exception: return [] # ivfflat 인덱스로 top-K chunks 추출 후 doc 단위 partition # inner_k = limit * 10 정도로 충분 unique doc 확보 (~30~50 docs) inner_k = max(limit * 10, 200) result = await session.execute( text(""" WITH topk AS ( SELECT c.id AS chunk_id, c.doc_id, c.chunk_index, c.section_title, c.text, c.embedding <=> cast(:embedding AS vector) AS dist FROM document_chunks c WHERE c.embedding IS NOT NULL ORDER BY c.embedding <=> cast(:embedding AS vector) LIMIT :inner_k ), ranked AS ( SELECT chunk_id, doc_id, chunk_index, section_title, text, dist, ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC) AS rn FROM topk ) SELECT d.id AS id, d.title AS title, d.ai_domain AS ai_domain, d.ai_summary AS ai_summary, d.file_format AS file_format, (1 - r.dist) AS score, left(r.text, 200) AS snippet, 'vector' AS match_reason, r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title FROM ranked r JOIN documents d ON d.id 
= r.doc_id WHERE r.rn <= 2 AND d.deleted_at IS NULL ORDER BY r.dist LIMIT :limit """), {"embedding": str(query_embedding), "inner_k": inner_k, "limit": limit * 4}, ) return [SearchResult(**row._mapping) for row in result] def compress_chunks_to_docs( chunks: list["SearchResult"], limit: int ) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]: """chunk-level 결과를 doc-level로 압축하면서 raw chunks를 보존. fusion은 doc 기준이어야 하지만(같은 doc 중복 방지), Phase 1.3 reranker는 chunk 기준 raw 데이터가 필요함. 따라서 압축본과 raw를 동시 반환. 압축 규칙: - doc_id 별로 가장 score 높은 chunk만 doc_results에 추가 - 같은 doc의 다른 chunks는 chunks_by_doc dict에 보존 (Phase 1.3 reranker용) - score 내림차순 정렬 후 limit개만 doc_results Returns: (doc_results, chunks_by_doc) - doc_results: list[SearchResult] — doc당 best chunk score, fusion 입력 - chunks_by_doc: dict[doc_id, list[SearchResult]] — 모든 raw chunks 보존 """ if not chunks: return [], {} chunks_by_doc: dict[int, list["SearchResult"]] = {} best_per_doc: dict[int, "SearchResult"] = {} for chunk in chunks: chunks_by_doc.setdefault(chunk.id, []).append(chunk) prev_best = best_per_doc.get(chunk.id) if prev_best is None or chunk.score > prev_best.score: best_per_doc[chunk.id] = chunk # doc 단위 best score 정렬, 상위 limit개 doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True) return doc_results[:limit], chunks_by_doc
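The `rn <= 2` window step in `search_vector` can be illustrated with a small, self-contained pure-Python sketch of the same logic (the function name `top2_per_doc` and the `(doc_id, dist)` tuple shape are illustrative only, not part of the service API):

```python
from collections import defaultdict


def top2_per_doc(hits: list[tuple[int, float]]) -> list[tuple[int, float]]:
    """hits: (doc_id, dist) pairs; keep the two nearest chunks per doc.

    Mirrors ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC)
    followed by WHERE rn <= 2 and the final ORDER BY dist.
    """
    by_doc: dict[int, list[float]] = defaultdict(list)
    for doc_id, dist in hits:
        by_doc[doc_id].append(dist)          # PARTITION BY doc_id
    kept = [
        (doc_id, dist)
        for doc_id, dists in by_doc.items()
        for dist in sorted(dists)[:2]        # rn <= 2 after ORDER BY dist
    ]
    return sorted(kept, key=lambda h: h[1])  # final ORDER BY r.dist


# Three chunks of doc 1 would otherwise crowd out doc 2; the per-doc
# cap keeps diversity while still favoring the nearest chunks.
hits = [(1, 0.10), (1, 0.20), (1, 0.30), (2, 0.15)]
print(top2_per_doc(hits))  # → [(1, 0.1), (2, 0.15), (1, 0.2)]
```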
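The weighted ORDER BY expression in `search_text` can also be mirrored in pure Python, which is handy for unit-testing weight changes without a database. The `similarity` and `ts_rank` values are assumed inputs here, and `blended_score` is a hypothetical helper, not something the service exposes:

```python
# Per-column trigram weights and the FTS bonus from the search_text query.
WEIGHTS = {
    "title": 3.0,
    "ai_tags": 2.5,
    "user_note": 2.0,
    "ai_summary": 1.5,
    "extracted_text": 1.0,
}
FTS_BONUS = 2.0


def blended_score(sims: dict[str, float], ts_rank: float = 0.0) -> float:
    """Weighted sum of per-column similarities plus the ts_rank bonus."""
    return (
        sum(WEIGHTS[field] * sims.get(field, 0.0) for field in WEIGHTS)
        + FTS_BONUS * ts_rank
    )


# title similarity 0.5 → 3.0 * 0.5 = 1.5; ts_rank 0.1 → + 2.0 * 0.1 = 0.2
print(blended_score({"title": 0.5}, ts_rank=0.1))  # → 1.7
```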