diff --git a/app/ai/client.py b/app/ai/client.py index d1ad612..601dad9 100644 --- a/app/ai/client.py +++ b/app/ai/client.py @@ -85,6 +85,25 @@ class AIClient: # TODO: Qwen2.5-VL-7B 비전 모델 호출 구현 raise NotImplementedError("OCR는 Phase 1에서 구현") + async def rerank(self, query: str, texts: list[str]) -> list[dict]: + """TEI bge-reranker-v2-m3 호출 (Phase 1.3). + + TEI POST /rerank API: + request: {"query": str, "texts": [str, ...]} + response: [{"index": int, "score": float}, ...] (정렬됨) + + timeout은 self.ai.rerank.timeout (config.yaml). + 호출자(rerank_service)가 asyncio.Semaphore + try/except로 감쌈. + """ + timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0 + response = await self._http.post( + self.ai.rerank.endpoint, + json={"query": query, "texts": texts}, + timeout=timeout, + ) + response.raise_for_status() + return response.json() + async def _call_chat(self, model_config, prompt: str) -> str: """OpenAI 호환 API 호출 + 자동 폴백""" try: diff --git a/app/api/search.py b/app/api/search.py index 5682866..c0e041d 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -16,10 +16,17 @@ from core.database import get_session from core.utils import setup_logger from models.user import User from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores -from services.search.retrieval_service import search_text, search_vector +from services.search.rerank_service import ( + MAX_CHUNKS_PER_DOC, + MAX_RERANK_INPUT, + apply_diversity, + rerank_chunks, +) +from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector from services.search_telemetry import ( compute_confidence, compute_confidence_hybrid, + compute_confidence_reranked, record_search_event, ) @@ -30,7 +37,14 @@ router = APIRouter() class SearchResult(BaseModel): - id: int + """검색 결과 단일 행. + + Phase 1.2-C: chunk-level vector retrieval 도입으로 chunk 메타 필드 추가. + text 검색 결과는 chunk_id 등이 None (doc-level). + vector 검색 결과는 chunk_id 등이 채워짐 (chunk-level). + """ + + id: int # doc_id (text/vector 공통) title: str | None ai_domain: str | None ai_summary: str | None @@ -38,6 +52,10 @@ class SearchResult(BaseModel): score: float snippet: str | None match_reason: str | None = None + # Phase 1.2-C: chunk 메타 (vector 검색 시 채워짐) + chunk_id: int | None = None + chunk_index: int | None = None + section_title: str | None = None # ─── Phase 0.4: 디버그 응답 스키마 ───────────────────────── @@ -93,22 +111,30 @@ async def search( pattern="^(legacy|rrf|rrf_boost)$", description="hybrid 모드 fusion 전략 (legacy=기존 가중합, rrf=RRF k=60, rrf_boost=RRF+강한신호 boost)", ), + rerank: bool = Query( + True, + description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)", + ), debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"), ): """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)""" timing: dict[str, float] = {} notes: list[str] = [] text_results: list[SearchResult] = [] - vector_results: list[SearchResult] = [] + vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력) + raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용) + chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존 t_total = time.perf_counter() if mode == "vector": t0 = time.perf_counter() - vector_results = await search_vector(session, q, limit) + raw_chunks = await search_vector(session, q, limit) timing["vector_ms"] = (time.perf_counter() - t0) * 1000 - if not vector_results: + if not raw_chunks: notes.append("vector_search_returned_empty (AI client error or no embeddings)") + # vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지) + vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) results = vector_results else: t0 = time.perf_counter() @@ -117,16 +143,57 @@ async def search( if mode == "hybrid": t1 = time.perf_counter() - vector_results = await search_vector(session, q, limit) + raw_chunks = await search_vector(session, q, limit) timing["vector_ms"] = (time.perf_counter() - t1) * 1000 + + # chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존) + t1b = time.perf_counter() + vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit) + timing["compress_ms"] = (time.perf_counter() - t1b) * 1000 + if not vector_results: notes.append("vector_search_returned_empty — text-only fallback") t2 = time.perf_counter() strategy = get_strategy(fusion) - results = strategy.fuse(text_results, vector_results, q, limit) + # fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용) + fusion_limit = max(limit * 5, 100) if rerank else limit + fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit) timing["fusion_ms"] = (time.perf_counter() - t2) * 1000 notes.append(f"fusion={strategy.name}") + notes.append( + f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} " + f"unique_docs={len(chunks_by_doc)}" + ) + + if rerank: + # Phase 1.3: reranker — chunk 기준 입력 + # fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수 + t3 = time.perf_counter() + rerank_input: list[SearchResult] = [] + for doc in fused_docs: + chunks = chunks_by_doc.get(doc.id, []) + if chunks: + # doc당 max 2 chunk (latency/VRAM 보호) + rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC]) + else: + # text-only 매치 doc → doc 자체를 chunk처럼 wrap + rerank_input.append(doc) + if len(rerank_input) >= MAX_RERANK_INPUT: + break + rerank_input = rerank_input[:MAX_RERANK_INPUT] + notes.append(f"rerank input={len(rerank_input)}") + + reranked = await rerank_chunks(q, rerank_input, limit * 3) + timing["rerank_ms"] = (time.perf_counter() - t3) * 1000 + + # diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited) + t4 = time.perf_counter() + results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit] + timing["diversity_ms"] = (time.perf_counter() - t4) * 1000 + else: + # rerank 비활성: fused_docs를 그대로 (limit 적용) + results = fused_docs[:limit] else: results = text_results @@ -137,8 +204,12 @@ async def search( timing["total_ms"] = (time.perf_counter() - t_total) * 1000 # confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음) + # rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용 if mode == "hybrid": - confidence_signal = compute_confidence_hybrid(text_results, vector_results) + if rerank and "rerank_ms" in timing: + confidence_signal = compute_confidence_reranked(results) + else: + confidence_signal = compute_confidence_hybrid(text_results, vector_results) elif mode == "vector": confidence_signal = compute_confidence(vector_results, "vector") else: diff --git a/app/services/search/rerank_service.py b/app/services/search/rerank_service.py index 16d9373..c107f59 100644 --- a/app/services/search/rerank_service.py +++ b/app/services/search/rerank_service.py @@ -1,5 +1,199 @@ """Reranker 서비스 — bge-reranker-v2-m3 통합 (Phase 1.3). TEI 컨테이너 호출 + asyncio.Semaphore(2) + soft timeout fallback. -구현은 Phase 1.3에서 채움. + +데이터 흐름 원칙: +- fusion = doc 기준 / reranker = chunk 기준 — 절대 섞지 말 것 +- raw chunks를 끝까지 보존, fusion은 압축본만 사용 +- reranker는 chunks_by_doc dict에서 raw chunks 회수해서 chunk 단위로 호출 +- diversity는 reranker 직후 마지막 단계에서만 적용 + +snippet 생성: +- 200~400 토큰(800~1500자) 기준 +- query keyword 위치 중심 ±target_chars/2 윈도우 +- keyword 매치 없으면 첫 target_chars 문자 fallback (성능 손실 방지) """ + +from __future__ import annotations + +import asyncio +import re +from typing import TYPE_CHECKING + +import httpx + +from ai.client import AIClient +from core.utils import setup_logger + +if TYPE_CHECKING: + from api.search import SearchResult + +logger = setup_logger("rerank") + +# 동시 rerank 호출 제한 (GPU saturation 방지) +RERANK_SEMAPHORE = asyncio.Semaphore(2) + +# rerank input 크기 제한 (latency / VRAM hard cap) +MAX_RERANK_INPUT = 200 +MAX_CHUNKS_PER_DOC = 2 + +# Soft timeout (초) +RERANK_TIMEOUT = 5.0 + + +def _extract_window(text: str, query: str, target_chars: int = 800) -> str: + """query keyword 위치 중심으로 ±target_chars/2 윈도우 추출. + + fallback: keyword 매치 없으면 첫 target_chars 문자 그대로. + 이게 없으면 reranker가 무관한 텍스트만 보고 점수 매겨 성능 급락. + """ + keywords = [k for k in re.split(r"\s+", query) if len(k) >= 2] + best_pos = -1 + for kw in keywords: + pos = text.lower().find(kw.lower()) + if pos >= 0: + best_pos = pos + break + + if best_pos < 0: + # Fallback: 첫 target_chars 문자 + return text[:target_chars] + + half = target_chars // 2 + start = max(0, best_pos - half) + end = min(len(text), start + target_chars) + return text[start:end] + + +def _make_snippet(c: "SearchResult", query: str, max_chars: int = 1500) -> str: + """Reranker input snippet — title + query 중심 본문 윈도우. + + feedback_search_phase1_implementation.md 3번 항목 강제: + snippet 200~400 토큰(800~1500자), full document 절대 안 됨. + """ + title = c.title or "" + text = c.snippet or "" + + # snippet은 chunk text 앞 200자 또는 doc text 앞 200자 + # 더 긴 chunk text가 필요하면 호출자가 따로 채워서 넘김 + if len(text) > max_chars: + text = _extract_window(text, query, target_chars=max_chars - 100) + + return f"{title}\n\n{text}" + + +def _wrap_doc_as_chunk(doc: "SearchResult") -> "SearchResult": + """text-only 매치 doc(chunks_by_doc에 없는 doc)을 ChunkResult 형태로 변환. + + Phase 1.3 reranker 입력에 doc 자체가 들어가야 하는 경우. + snippet은 documents.extracted_text 앞 200자 (이미 SearchResult.snippet에 채워짐). + chunk_id 등은 None 그대로. + """ + return doc + + +async def rerank_chunks( + query: str, + candidates: list["SearchResult"], + limit: int, +) -> list["SearchResult"]: + """RRF 결과 candidates를 bge-reranker로 재정렬. + + Args: + query: 사용자 쿼리 + candidates: chunk-level SearchResult 리스트 (이미 chunks_by_doc에서 회수) + limit: 반환할 결과 수 + + Returns: + reranked SearchResult 리스트 (rerank score로 score 필드 업데이트) + + Fallback (timeout/HTTPError): RRF 순서 그대로 candidates[:limit] 반환. + """ + if not candidates: + return [] + + # input 크기 제한 (latency/VRAM hard cap) + if len(candidates) > MAX_RERANK_INPUT: + logger.warning( + f"rerank input {len(candidates)} > MAX {MAX_RERANK_INPUT}, 자름" + ) + candidates = candidates[:MAX_RERANK_INPUT] + + snippets = [_make_snippet(c, query) for c in candidates] + client = AIClient() + + try: + async with asyncio.timeout(RERANK_TIMEOUT): + async with RERANK_SEMAPHORE: + results = await client.rerank(query, snippets) + # results: [{"index": int, "score": float}, ...] (이미 정렬됨) + reranked: list["SearchResult"] = [] + for r in results: + idx = r.get("index") + sc = r.get("score") + if idx is None or sc is None or idx >= len(candidates): + continue + chunk = candidates[idx] + chunk.score = float(sc) + chunk.match_reason = (chunk.match_reason or "") + "+rerank" + reranked.append(chunk) + return reranked[:limit] + except (asyncio.TimeoutError, httpx.HTTPError) as e: + logger.warning(f"rerank failed → RRF fallback: {type(e).__name__}: {e}") + return candidates[:limit] + except Exception as e: + logger.warning(f"rerank unexpected error → RRF fallback: {type(e).__name__}: {e}") + return candidates[:limit] + finally: + await client.close() + + +async def warmup_reranker() -> bool: + """TEI 부팅 후 모델 로딩 완료 대기 (10회 retry). + + TEI는 health 200을 빠르게 반환하지만 첫 모델 로딩(10~30초) 전에는 + rerank 요청이 실패하거나 매우 느림. FastAPI startup 또는 첫 요청 전 호출. + """ + client = AIClient() + try: + for attempt in range(10): + try: + await client.rerank("warmup", ["dummy text for model load"]) + logger.info(f"reranker warmup OK (attempt {attempt + 1})") + return True + except Exception as e: + logger.info(f"reranker warmup retry {attempt + 1}: {e}") + await asyncio.sleep(3) + logger.error("reranker warmup failed after 10 attempts") + return False + finally: + await client.close() + + +def apply_diversity( + results: list["SearchResult"], + max_per_doc: int = MAX_CHUNKS_PER_DOC, + top_score_threshold: float = 0.90, +) -> list["SearchResult"]: + """chunk-level 결과를 doc 기준으로 압축 (max_per_doc). + + 조건부 완화: 가장 상위 결과 score가 threshold 이상이면 unlimited + (high confidence relevance > diversity). + """ + if not results: + return [] + + # 가장 상위 score가 threshold 이상이면 diversity 제약 해제 + top_score = results[0].score if results else 0.0 + if top_score >= top_score_threshold: + return results + + seen: dict[int, int] = {} + out: list["SearchResult"] = [] + for r in results: + doc_id = r.id + if seen.get(doc_id, 0) >= max_per_doc: + continue + out.append(r) + seen[doc_id] = seen.get(doc_id, 0) + 1 + return out diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py index 3fea96f..e690ef6 100644 --- a/app/services/search/retrieval_service.py +++ b/app/services/search/retrieval_service.py @@ -1,11 +1,11 @@ -"""검색 후보 수집 서비스 (Phase 1.1). +"""검색 후보 수집 서비스 (Phase 1.2). -text(documents FTS + 키워드) + vector(documents.embedding) 후보를 +text(documents FTS + trigram) + vector(documents.embedding → chunks) 후보를 SearchResult 리스트로 반환. -Phase 1.1: search.py의 _search_text/_search_vector를 이전. -Phase 1.1 후속 substep: ILIKE → trigram `similarity()` + `gin_trgm_ops`. -Phase 1.2: vector retrieval을 document_chunks 테이블 기반으로 전환. +Phase 1.1a: search.py의 _search_text/_search_vector를 이전 (ILIKE 그대로). +Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. ILIKE 풀 스캔 제거. +Phase 1.2-B 이후: vector retrieval을 document_chunks 테이블 기반으로 전환. """ from __future__ import annotations @@ -24,52 +24,92 @@ if TYPE_CHECKING: async def search_text( session: AsyncSession, query: str, limit: int ) -> list["SearchResult"]: - """FTS + ILIKE 필드별 가중치 검색. + """FTS + trigram 필드별 가중치 검색 (Phase 1.2-B UNION 분해). + Phase 1.2-B 진단: + OR로 묶은 단일 SELECT는 PostgreSQL planner가 OR 결합 인덱스를 못 만들고 + Seq Scan을 선택 (small table 765 docs). EXPLAIN으로 측정 시 525ms. + → CTE + UNION으로 분해하면 각 branch가 자기 인덱스 활용 → 26ms (95% 감소). + + 구조: + candidates CTE + ├─ title % → idx_documents_title_trgm + ├─ ai_summary % → idx_documents_ai_summary_trgm + │ (length > 0 partial index 매치 조건 포함) + └─ FTS @@ plainto_tsquery → idx_documents_fts_full + JOIN documents d ON d.id = c.id + ORDER BY 5컬럼 similarity 가중 합산 + ts_rank * 2.0 가중치: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0 - + ts_rank * 2.0 보너스. + + threshold: + pg_trgm.similarity_threshold default = 0.3 + → multi-token 한국어 뉴스 쿼리(예: "이란 미국 전쟁 글로벌 반응")에서 + candidates를 못 모음 → recall 감소 (0.788 → 0.750) + → set_limit(0.15)으로 낮춰 recall 회복. precision은 ORDER BY similarity 합산이 보정. """ from api.search import SearchResult # 순환 import 회피 + # trigram threshold를 0.15로 낮춰 multi-token query recall 회복 + # SQLAlchemy async session 내 두 execute는 같은 connection 사용 + await session.execute(text("SELECT set_limit(0.15)")) + result = await session.execute( text(""" - SELECT id, title, ai_domain, ai_summary, file_format, - left(extracted_text, 200) AS snippet, + WITH candidates AS ( + -- title trigram (idx_documents_title_trgm) + SELECT id FROM documents + WHERE deleted_at IS NULL AND title % :q + UNION + -- ai_summary trigram (idx_documents_ai_summary_trgm 부분 인덱스 매치) + SELECT id FROM documents + WHERE deleted_at IS NULL + AND ai_summary IS NOT NULL + AND length(ai_summary) > 0 + AND ai_summary % :q + UNION + -- FTS 통합 인덱스 (idx_documents_fts_full) + SELECT id FROM documents + WHERE deleted_at IS NULL + AND to_tsvector('simple', + coalesce(title, '') || ' ' || + coalesce(ai_tags::text, '') || ' ' || + coalesce(ai_summary, '') || ' ' || + coalesce(user_note, '') || ' ' || + coalesce(extracted_text, '') + ) @@ plainto_tsquery('simple', :q) + ) + SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format, + left(d.extracted_text, 200) AS snippet, ( - -- title 매칭 (가중치 최고) - CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 3.0 ELSE 0 END - -- ai_tags 매칭 (가중치 높음) - + CASE WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 2.5 ELSE 0 END - -- user_note 매칭 (가중치 높음) - + CASE WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 2.0 ELSE 0 END - -- ai_summary 매칭 (가중치 중상) - + CASE WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 1.5 ELSE 0 END - -- extracted_text 매칭 (가중치 중간) - + CASE WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 1.0 ELSE 0 END - -- FTS 점수 (보너스) + -- 컬럼별 trigram similarity 가중 합산 + similarity(coalesce(d.title, ''), :q) * 3.0 + + similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5 + + similarity(coalesce(d.user_note, ''), :q) * 2.0 + + similarity(coalesce(d.ai_summary, ''), :q) * 1.5 + + similarity(coalesce(d.extracted_text, ''), :q) * 1.0 + -- FTS 보너스 (idx_documents_fts_full 활용) + coalesce(ts_rank( - to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')), + to_tsvector('simple', + coalesce(d.title, '') || ' ' || + coalesce(d.ai_tags::text, '') || ' ' || + coalesce(d.ai_summary, '') || ' ' || + coalesce(d.user_note, '') || ' ' || + coalesce(d.extracted_text, '') + ), plainto_tsquery('simple', :q) ), 0) * 2.0 ) AS score, - -- match reason + -- match_reason: similarity 가장 큰 컬럼 또는 FTS CASE - WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 'title' - WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 'tags' - WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 'note' - WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 'summary' - WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 'content' + WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title' + WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags' + WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note' + WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary' + WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content' ELSE 'fts' END AS match_reason - FROM documents - WHERE deleted_at IS NULL - AND (coalesce(title, '') ILIKE '%%' || :q || '%%' - OR coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' - OR coalesce(user_note, '') ILIKE '%%' || :q || '%%' - OR coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' - OR coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' - OR to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')) - @@ plainto_tsquery('simple', :q)) + FROM documents d + JOIN candidates c ON d.id = c.id ORDER BY score DESC LIMIT :limit """), @@ -81,10 +121,24 @@ async def search_text( async def search_vector( session: AsyncSession, query: str, limit: int ) -> list["SearchResult"]: - """벡터 유사도 검색 (코사인 거리). + """벡터 유사도 검색 — chunk-level + doc 다양성 보장 (Phase 1.2-C). - Phase 1.2에서 document_chunks 테이블 기반으로 전환 예정. - 현재는 documents.embedding 사용. + Phase 1.2-C 진단: + 단순 chunk top-N 가져오면 같은 doc의 여러 chunks가 상위에 몰려 + unique doc 다양성 붕괴 → recall 0.788 → 0.531 (catastrophic). + + 해결 (사용자 추천 C 방식): + Window function으로 doc_id 기준 PARTITION → 각 doc의 top 2 chunks만 반환. + raw_chunks(chunks_by_doc 보존)와 doc-level 압축 둘 다 만족. + + SQL 흐름: + 1. inner CTE: ivfflat 인덱스로 top-K chunks 빠르게 추출 + 2. ranked CTE: doc_id PARTITION 후 score 내림차순 ROW_NUMBER + 3. outer: rn <= 2 (doc당 max 2 chunks) + JOIN documents + + Returns: + list[SearchResult] — chunk-level, 각 doc 최대 2개. compress_chunks_to_docs로 + doc-level 압축 + chunks_by_doc 보존. """ from api.search import SearchResult # 순환 import 회피 @@ -95,17 +149,83 @@ async def search_vector( except Exception: return [] + # ivfflat 인덱스로 top-K chunks 추출 후 doc 단위 partition + # inner_k = limit * 10 정도로 충분 unique doc 확보 (~30~50 docs) + inner_k = max(limit * 10, 200) result = await session.execute( text(""" - SELECT id, title, ai_domain, ai_summary, file_format, - (1 - (embedding <=> cast(:embedding AS vector))) AS score, - left(extracted_text, 200) AS snippet, - 'vector' AS match_reason - FROM documents - WHERE embedding IS NOT NULL AND deleted_at IS NULL - ORDER BY embedding <=> cast(:embedding AS vector) + WITH topk AS ( + SELECT + c.id AS chunk_id, + c.doc_id, + c.chunk_index, + c.section_title, + c.text, + c.embedding <=> cast(:embedding AS vector) AS dist + FROM document_chunks c + WHERE c.embedding IS NOT NULL + ORDER BY c.embedding <=> cast(:embedding AS vector) + LIMIT :inner_k + ), + ranked AS ( + SELECT + chunk_id, doc_id, chunk_index, section_title, text, dist, + ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC) AS rn + FROM topk + ) + SELECT + d.id AS id, + d.title AS title, + d.ai_domain AS ai_domain, + d.ai_summary AS ai_summary, + d.file_format AS file_format, + (1 - r.dist) AS score, + left(r.text, 200) AS snippet, + 'vector' AS match_reason, + r.chunk_id AS chunk_id, + r.chunk_index AS chunk_index, + r.section_title AS section_title + FROM ranked r + JOIN documents d ON d.id = r.doc_id + WHERE r.rn <= 2 AND d.deleted_at IS NULL + ORDER BY r.dist LIMIT :limit """), - {"embedding": str(query_embedding), "limit": limit}, + {"embedding": str(query_embedding), "inner_k": inner_k, "limit": limit * 4}, ) return [SearchResult(**row._mapping) for row in result] + + +def compress_chunks_to_docs( + chunks: list["SearchResult"], limit: int +) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]: + """chunk-level 결과를 doc-level로 압축하면서 raw chunks를 보존. + + fusion은 doc 기준이어야 하지만(같은 doc 중복 방지), Phase 1.3 reranker는 + chunk 기준 raw 데이터가 필요함. 따라서 압축본과 raw를 동시 반환. + + 압축 규칙: + - doc_id 별로 가장 score 높은 chunk만 doc_results에 추가 + - 같은 doc의 다른 chunks는 chunks_by_doc dict에 보존 (Phase 1.3 reranker용) + - score 내림차순 정렬 후 limit개만 doc_results + + Returns: + (doc_results, chunks_by_doc) + - doc_results: list[SearchResult] — doc당 best chunk score, fusion 입력 + - chunks_by_doc: dict[doc_id, list[SearchResult]] — 모든 raw chunks 보존 + """ + if not chunks: + return [], {} + + chunks_by_doc: dict[int, list["SearchResult"]] = {} + best_per_doc: dict[int, "SearchResult"] = {} + + for chunk in chunks: + chunks_by_doc.setdefault(chunk.id, []).append(chunk) + prev_best = best_per_doc.get(chunk.id) + if prev_best is None or chunk.score > prev_best.score: + best_per_doc[chunk.id] = chunk + + # doc 단위 best score 정렬, 상위 limit개 + doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True) + return doc_results[:limit], chunks_by_doc diff --git a/app/services/search_telemetry.py b/app/services/search_telemetry.py index eb2dbb5..2bb82d1 100644 --- a/app/services/search_telemetry.py +++ b/app/services/search_telemetry.py @@ -149,6 +149,33 @@ def _cosine_to_confidence(cosine: float) -> float: return 0.10 +def compute_confidence_reranked(reranked_results: list[Any]) -> float: + """Phase 1.3 reranker score 기반 confidence. + + bge-reranker-v2-m3는 sigmoid score (0~1 범위)를 반환. + rerank 활성 시 fusion score보다 reranker score가 가장 신뢰할 수 있는 신호. + + 임계값(초안, 실측 후 조정 가능): + >= 0.95 → high + >= 0.80 → med-high + >= 0.60 → med + >= 0.40 → low-med + else → low + """ + if not reranked_results: + return 0.0 + top_score = float(getattr(reranked_results[0], "score", 0.0) or 0.0) + if top_score >= 0.95: + return 0.95 + if top_score >= 0.80: + return 0.80 + if top_score >= 0.60: + return 0.65 + if top_score >= 0.40: + return 0.50 + return 0.35 + + def compute_confidence_hybrid( text_results: list[Any], vector_results: list[Any], diff --git a/app/workers/chunk_worker.py b/app/workers/chunk_worker.py index d67ccd1..a9f1baf 100644 --- a/app/workers/chunk_worker.py +++ b/app/workers/chunk_worker.py @@ -79,11 +79,20 @@ def _classify_chunk_strategy(doc: Document) -> str: # ─── Chunking 전략 ─── def _chunk_legal(text: str) -> list[dict]: - """법령: 제N조 단위로 분할 (상위 조문 컨텍스트 보존)""" + """법령: 제N조 단위로 분할 (상위 조문 컨텍스트 보존). + + 영어/외국 법령(ai_domain Foreign_Law 등)은 "제N조" 패턴이 없어 split 결과가 + 1개 element만 나옴 → 서문 chunk 1개만 생성되고 본문 대부분이 손실되는 버그. + 조문 패턴 미검출 시 sliding window fallback으로 처리. + """ # "제 1 조", "제1조", "제 1 조(제목)" 등 매칭 pattern = re.compile(r"(제\s*\d+\s*조(?:의\s*\d+)?(?:\([^)]*\))?)") parts = pattern.split(text) + # 조문 패턴 미검출 (영어/외국 법령 등) → sliding window fallback + if len(parts) <= 1: + return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section") + chunks = [] # parts[0] = 조 이전 서문, parts[1], parts[2] = (마커, 본문) pairs if parts[0].strip() and len(parts[0]) >= MIN_CHUNK_CHARS: diff --git a/migrations/016_fts_expand_and_trgm.sql b/migrations/016_fts_expand_and_trgm.sql new file mode 100644 index 0000000..54c2bda --- /dev/null +++ b/migrations/016_fts_expand_and_trgm.sql @@ -0,0 +1,47 @@ +-- Phase 1.2: documents 테이블 FTS 확장 + trigram 인덱스 +-- +-- 목적: +-- 1) FTS 인덱스를 title + ai_tags + ai_summary + user_note + extracted_text 통합 범위로 확장 +-- 현재 retrieval_service.search_text의 SQL 안 to_tsvector(...)는 인덱스 없이 동작. +-- 2) trigram 인덱스로 ILIKE 풀스캔(text_ms 470ms)을 similarity() + GIN 인덱스로 대체. +-- +-- 데이터 규모 (2026-04-07 측정): documents 765 / 평균 본문 8.5KB / 총 6.5MB +-- 인덱스 빌드 시간 추산: 5~30초 (CONCURRENTLY 불필요, 짧은 lock 수용 가능) +-- +-- Phase 1.2-A 단독 적용. 1.2-B에서 retrieval_service.search_text의 SQL을 +-- ILIKE → similarity() + `%` 연산자로 전환하면서 이 인덱스들을 활용. + +-- pg_trgm extension (014에서 이미 활성화, IF NOT EXISTS로 안전) +CREATE EXTENSION IF NOT EXISTS pg_trgm; + +-- ─── 1) 통합 FTS 인덱스 ──────────────────────────────────── +-- title + ai_tags(JSONB→text) + ai_summary + user_note + extracted_text를 한 번에 토큰화. +-- retrieval_service.search_text의 ts_rank 호출이 이 인덱스를 사용하도록 SQL 갱신 예정. +CREATE INDEX IF NOT EXISTS idx_documents_fts_full ON documents + USING GIN ( + to_tsvector('simple', + coalesce(title, '') || ' ' || + coalesce(ai_tags::text, '') || ' ' || + coalesce(ai_summary, '') || ' ' || + coalesce(user_note, '') || ' ' || + coalesce(extracted_text, '') + ) + ); + +-- ─── 2) title trigram 인덱스 ─────────────────────────────── +-- 가장 자주 매칭되는 컬럼. similarity(title, query) > threshold + ORDER BY로 사용. +CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents + USING GIN (title gin_trgm_ops); + +-- ─── 3) extracted_text trigram 인덱스 ────────────────────── +-- ILIKE의 dominant cost를 trigram GIN 인덱스로 대체. +-- WHERE 절로 NULL/빈 본문 제외해 인덱스 크기 절감. +CREATE INDEX IF NOT EXISTS idx_documents_extracted_text_trgm ON documents + USING GIN (extracted_text gin_trgm_ops) + WHERE extracted_text IS NOT NULL AND length(extracted_text) > 0; + +-- ─── 4) ai_summary trigram 인덱스 ────────────────────────── +-- summary는 짧지만 의미 매칭에 자주 활용 (가중치 1.5). +CREATE INDEX IF NOT EXISTS idx_documents_ai_summary_trgm ON documents + USING GIN (ai_summary gin_trgm_ops) + WHERE ai_summary IS NOT NULL AND length(ai_summary) > 0; diff --git a/tests/scripts/__init__.py b/tests/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/scripts/reindex_all_chunks.py b/tests/scripts/reindex_all_chunks.py new file mode 100644 index 0000000..851fb8d --- /dev/null +++ b/tests/scripts/reindex_all_chunks.py @@ -0,0 +1,204 @@ +"""문서 chunk 재인덱싱 (Phase 1.2-E). + +전체 documents를 chunk_worker로 재처리. 야간 배치 권장 (00:00~06:00). + +핵심 요건 (사용자 정의): +- concurrency 제한 (asyncio.Semaphore) — Ollama 부하 조절 +- checkpoint resume (중간 실패/중단 대비) +- rate limiting (Ollama API 보호) +- 진행 로그 ([REINDEX] N/total (P%) ETA: ...) + +사용: + cd /home/hyungi/Documents/code/hyungi_Document_Server + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \\ + --concurrency 3 \\ + --checkpoint checkpoints/reindex.json \\ + > logs/reindex.log 2>&1 & + +dry-run (5개만): + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --limit 5 + +기존 chunks 보유 doc 건너뛰기: + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --skip-existing + +기존 chunks 강제 재처리 (chunk_worker가 자동으로 delete + insert): + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py +""" + +import argparse +import asyncio +import json +import sys +import time +from pathlib import Path + +# PYTHONPATH=app 가정 +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "app")) + +from sqlalchemy import select # noqa: E402 +from sqlalchemy.ext.asyncio import async_sessionmaker # noqa: E402 + +from core.database import engine # noqa: E402 +from core.utils import setup_logger # noqa: E402 +from models.chunk import DocumentChunk # noqa: E402 +from models.document import Document # noqa: E402 +from workers.chunk_worker import process # noqa: E402 + +logger = setup_logger("reindex") + + +def load_checkpoint(path: Path) -> set[int]: + """checkpoint 파일에서 처리 완료 doc_id 집합 복원.""" + if not path.exists(): + return set() + try: + data = json.loads(path.read_text()) + return set(data.get("processed", [])) + except (json.JSONDecodeError, KeyError) as e: + logger.warning(f"checkpoint {path} invalid ({e}) → 새로 시작") + return set() + + +def save_checkpoint(path: Path, processed: set[int]) -> None: + """처리 완료 doc_id를 checkpoint 파일에 저장 (incremental).""" + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(json.dumps({"processed": sorted(processed)}, indent=2)) + tmp.replace(path) # atomic swap + + +def format_eta(elapsed: float, done: int, total: int) -> str: + """남은 작업 시간 ETA 포맷.""" + if done == 0: + return "?" + rate = done / elapsed + remaining = (total - done) / rate + if remaining < 60: + return f"{remaining:.0f}s" + if remaining < 3600: + return f"{remaining / 60:.0f}m" + return f"{remaining / 3600:.1f}h" + + +async def main(): + parser = argparse.ArgumentParser(description="문서 chunk 재인덱싱 (Phase 1.2-E)") + parser.add_argument( + "--concurrency", + type=int, + default=3, + help="동시 처리 doc 수 (default 3, Ollama bge-m3 부하 조절)", + ) + parser.add_argument( + "--checkpoint", + type=Path, + default=Path("checkpoints/reindex.json"), + help="checkpoint 파일 경로 (resume 가능)", + ) + parser.add_argument( + "--rate-limit", + type=float, + default=0.1, + help="작업 간 sleep (초, Ollama 보호)", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="처리할 doc 수 제한 (dry-run 용)", + ) + parser.add_argument( + "--skip-existing", + action="store_true", + help="이미 chunks 있는 doc skip (재처리 생략)", + ) + args = parser.parse_args() + + Session = async_sessionmaker(engine) + + # 1. 대상 docs 수집 + async with Session() as session: + query = ( + select(Document.id) + .where( + Document.deleted_at.is_(None), + Document.extracted_text.is_not(None), + ) + .order_by(Document.id) + ) + result = await session.execute(query) + all_doc_ids = [row[0] for row in result] + + if args.skip_existing: + existing_query = select(DocumentChunk.doc_id).distinct() + existing_result = await session.execute(existing_query) + existing = {row[0] for row in existing_result} + logger.info(f"skip-existing: 기존 chunks 보유 doc {len(existing)}건") + else: + existing = set() + + # 2. checkpoint resume + processed = load_checkpoint(args.checkpoint) + if processed: + logger.info(f"checkpoint: 이미 처리됨 {len(processed)}건 (resume)") + + # 3. 처리 대상 = 전체 - skip_existing - checkpoint + targets = [d for d in all_doc_ids if d not in processed and d not in existing] + if args.limit: + targets = targets[: args.limit] + + total = len(targets) + logger.info( + f"REINDEX 시작: 전체 {len(all_doc_ids)} docs / 처리 대상 {total} docs" + f" / concurrency={args.concurrency} rate_limit={args.rate_limit}s" + ) + + if total == 0: + logger.info("처리할 doc 없음. 종료.") + return + + semaphore = asyncio.Semaphore(args.concurrency) + done_count = 0 + fail_count = 0 + start_time = time.monotonic() + log_interval = max(1, total // 50) # ~2% 단위 진행 로그 + + async def process_one(doc_id: int) -> None: + nonlocal done_count, fail_count + async with semaphore: + try: + async with Session() as session: + await process(doc_id, session) + await session.commit() + # rate limit (Ollama 보호) + await asyncio.sleep(args.rate_limit) + done_count += 1 + processed.add(doc_id) + + # 진행 로그 + 체크포인트 저장 + if done_count % log_interval == 0 or done_count == total: + elapsed = time.monotonic() - start_time + pct = (done_count / total) * 100 + eta = format_eta(elapsed, done_count, total) + logger.info( + f"[REINDEX] {done_count}/{total} ({pct:.1f}%)" + f" ETA: {eta} elapsed: {elapsed:.0f}s fails: {fail_count}" + ) + save_checkpoint(args.checkpoint, processed) + except Exception as e: + fail_count += 1 + logger.warning( + f"[REINDEX] doc {doc_id} 실패: {type(e).__name__}: {e}" + ) + + tasks = [process_one(doc_id) for doc_id in targets] + await asyncio.gather(*tasks) + + elapsed = time.monotonic() - start_time + save_checkpoint(args.checkpoint, processed) + logger.info( + f"[REINDEX] 완료: {done_count}/{total} done, {fail_count} fails, {elapsed:.0f}s" + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/search_eval/run_eval.py b/tests/search_eval/run_eval.py index ac3fa99..a7ac06f 100644 --- a/tests/search_eval/run_eval.py +++ b/tests/search_eval/run_eval.py @@ -133,6 +133,7 @@ async def call_search( mode: str = "hybrid", limit: int = 20, fusion: str | None = None, + rerank: str | None = None, ) -> tuple[list[int], float]: """검색 API 호출 → (doc_ids, latency_ms).""" url = f"{base_url.rstrip('/')}/api/search/" @@ -140,6 +141,8 @@ async def call_search( params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit} if fusion: params["fusion"] = fusion + if rerank is not None: + params["rerank"] = rerank import time @@ -165,6 +168,7 @@ async def evaluate( label: str, mode: str = "hybrid", fusion: str | None = None, + rerank: str | None = None, ) -> list[QueryResult]: """전체 쿼리셋 평가.""" results: list[QueryResult] = [] @@ -173,7 +177,7 @@ async def evaluate( for q in queries: try: returned_ids, latency_ms = await call_search( - client, base_url, token, q.query, mode=mode, fusion=fusion + client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank ) results.append( QueryResult( @@ -404,6 +408,13 @@ def main() -> int: choices=["legacy", "rrf", "rrf_boost"], help="hybrid 모드 fusion 전략 (Phase 0.5+, 미지정 시 서버 기본값)", ) + parser.add_argument( + "--rerank", + type=str, + default=None, + choices=["true", "false"], + help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)", + ) parser.add_argument( "--token", type=str, @@ -434,6 +445,8 @@ def main() -> int: print(f"Mode: {args.mode}", end="") if args.fusion: print(f" / fusion: {args.fusion}", end="") + if args.rerank: + print(f" / rerank: {args.rerank}", end="") print() all_results: list[QueryResult] = [] @@ -441,21 +454,21 @@ def main() -> int: if args.base_url: print(f"\n>>> evaluating: {args.base_url}") results = asyncio.run( - evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion) + evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank) ) print_summary("single", results) all_results.extend(results) else: print(f"\n>>> baseline: {args.baseline_url}") baseline_results = asyncio.run( - evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion) + evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank) ) baseline_summary = print_summary("baseline", baseline_results) print(f"\n>>> candidate: {args.candidate_url}") candidate_results = asyncio.run( evaluate( - queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion + queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank ) ) candidate_summary = print_summary("candidate", candidate_results)