diff --git a/app/api/search.py b/app/api/search.py
index 5682866..918d2aa 100644
--- a/app/api/search.py
+++ b/app/api/search.py
@@ -16,7 +16,7 @@ from core.database import get_session
 from core.utils import setup_logger
 from models.user import User
 from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
-from services.search.retrieval_service import search_text, search_vector
+from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
 from services.search_telemetry import (
     compute_confidence,
     compute_confidence_hybrid,
@@ -30,7 +30,14 @@ router = APIRouter()
 
 
 class SearchResult(BaseModel):
-    id: int
+    """A single search result row.
+
+    Phase 1.2-C: chunk metadata fields added with the move to chunk-level vector retrieval.
+    Text search results leave chunk_id etc. as None (doc-level).
+    Vector search results have chunk_id etc. populated (chunk-level).
+    """
+
+    id: int  # doc_id (shared by text and vector results)
     title: str | None
     ai_domain: str | None
     ai_summary: str | None
@@ -38,6 +45,10 @@ class SearchResult(BaseModel):
     score: float
     snippet: str | None
     match_reason: str | None = None
+    # Phase 1.2-C: chunk metadata (populated for vector search)
+    chunk_id: int | None = None
+    chunk_index: int | None = None
+    section_title: str | None = None
 
 
 # ─── Phase 0.4: debug response schema ─────────────────────────
@@ -99,16 +110,20 @@ async def search(
     timing: dict[str, float] = {}
     notes: list[str] = []
     text_results: list[SearchResult] = []
-    vector_results: list[SearchResult] = []
+    vector_results: list[SearchResult] = []  # doc-level (post-compression, fusion input)
+    raw_chunks: list[SearchResult] = []  # chunk-level (raw, for the Phase 1.3 reranker)
+    chunks_by_doc: dict[int, list[SearchResult]] = {}  # preserved for the Phase 1.3 reranker
 
     t_total = time.perf_counter()
 
     if mode == "vector":
         t0 = time.perf_counter()
-        vector_results = await search_vector(session, q, limit)
+        raw_chunks = await search_vector(session, q, limit)
         timing["vector_ms"] = (time.perf_counter() - t0) * 1000
-        if not vector_results:
+        if not raw_chunks:
             notes.append("vector_search_returned_empty (AI client error or no embeddings)")
+        # compress to docs in vector-only mode too, so results stay diverse (no duplicate chunks per doc)
+        vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
         results = vector_results
     else:
         t0 = time.perf_counter()
@@ -117,8 +132,14 @@ async def search(
 
         if mode == "hybrid":
             t1 = time.perf_counter()
-            vector_results = await search_vector(session, q, limit)
+            raw_chunks = await search_vector(session, q, limit)
             timing["vector_ms"] = (time.perf_counter() - t1) * 1000
+
+            # chunk-level → doc-level compression (raw chunks preserved in chunks_by_doc)
+            t1b = time.perf_counter()
+            vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
+            timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
+
             if not vector_results:
                 notes.append("vector_search_returned_empty — text-only fallback")
 
@@ -127,6 +148,10 @@ async def search(
             results = strategy.fuse(text_results, vector_results, q, limit)
             timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
             notes.append(f"fusion={strategy.name}")
+            notes.append(
+                f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
+                f"unique_docs={len(chunks_by_doc)}"
+            )
         else:
             results = text_results
 
diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index fd9d6ca..568ec4a 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -121,10 +121,16 @@ async def search_text(
 async def search_vector(
     session: AsyncSession, query: str, limit: int
 ) -> list["SearchResult"]:
-    """Vector similarity search (cosine distance).
+ """벡터 유사도 검색 — chunk-level (Phase 1.2-C). - Phase 1.2에서 document_chunks 테이블 기반으로 전환 예정. - 현재는 documents.embedding 사용. + document_chunks 테이블에서 cosine similarity로 raw chunks 반환. + 같은 doc에서 여러 chunks가 들어올 수 있음 (압축 안 함). + fusion 직전에 compress_chunks_to_docs() helper로 doc 기준 압축 필요. + Phase 1.3 reranker는 raw chunks를 그대로 활용. + + SearchResult.id = doc_id (fusion 호환) + SearchResult.chunk_id / chunk_index / section_title = chunk 메타 + snippet = chunk의 text 앞 200자 """ from api.search import SearchResult # 순환 import 회피 @@ -135,17 +141,63 @@ async def search_vector( except Exception: return [] + # raw chunks를 doc 메타와 join. limit * 5 정도 넓게 → 압축 후 doc 다양성. + fetch_limit = limit * 5 result = await session.execute( text(""" - SELECT id, title, ai_domain, ai_summary, file_format, - (1 - (embedding <=> cast(:embedding AS vector))) AS score, - left(extracted_text, 200) AS snippet, - 'vector' AS match_reason - FROM documents - WHERE embedding IS NOT NULL AND deleted_at IS NULL - ORDER BY embedding <=> cast(:embedding AS vector) + SELECT + d.id AS id, + d.title AS title, + d.ai_domain AS ai_domain, + d.ai_summary AS ai_summary, + d.file_format AS file_format, + (1 - (c.embedding <=> cast(:embedding AS vector))) AS score, + left(c.text, 200) AS snippet, + 'vector' AS match_reason, + c.id AS chunk_id, + c.chunk_index AS chunk_index, + c.section_title AS section_title + FROM document_chunks c + JOIN documents d ON d.id = c.doc_id + WHERE c.embedding IS NOT NULL AND d.deleted_at IS NULL + ORDER BY c.embedding <=> cast(:embedding AS vector) LIMIT :limit """), - {"embedding": str(query_embedding), "limit": limit}, + {"embedding": str(query_embedding), "limit": fetch_limit}, ) return [SearchResult(**row._mapping) for row in result] + + +def compress_chunks_to_docs( + chunks: list["SearchResult"], limit: int +) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]: + """chunk-level 결과를 doc-level로 압축하면서 raw chunks를 보존. + + fusion은 doc 기준이어야 하지만(같은 doc 중복 방지), Phase 1.3 reranker는 + chunk 기준 raw 데이터가 필요함. 따라서 압축본과 raw를 동시 반환. + + 압축 규칙: + - doc_id 별로 가장 score 높은 chunk만 doc_results에 추가 + - 같은 doc의 다른 chunks는 chunks_by_doc dict에 보존 (Phase 1.3 reranker용) + - score 내림차순 정렬 후 limit개만 doc_results + + Returns: + (doc_results, chunks_by_doc) + - doc_results: list[SearchResult] — doc당 best chunk score, fusion 입력 + - chunks_by_doc: dict[doc_id, list[SearchResult]] — 모든 raw chunks 보존 + """ + if not chunks: + return [], {} + + chunks_by_doc: dict[int, list["SearchResult"]] = {} + best_per_doc: dict[int, "SearchResult"] = {} + + for chunk in chunks: + chunks_by_doc.setdefault(chunk.id, []).append(chunk) + prev_best = best_per_doc.get(chunk.id) + if prev_best is None or chunk.score > prev_best.score: + best_per_doc[chunk.id] = chunk + + # doc 단위 best score 정렬, 상위 limit개 + doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True) + return doc_results[:limit], chunks_by_doc