From 22117a2a6d9803b14ea5f7cedf705ad6b27f3952 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Tue, 7 Apr 2026 14:36:08 +0900
Subject: [PATCH 1/9] feat(search): Phase 1.2-AB - migration 016 + trigram
 retrieval
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

migration 016: expand documents FTS + add trigram indexes (1.5s build)
- idx_documents_fts_full - combined FTS over title+ai_tags+ai_summary+user_note+extracted_text
- idx_documents_title_trgm - title-only trigram
- idx_documents_extracted_text_trgm - body-text trigram (NULLs excluded)
- idx_documents_ai_summary_trgm - AI-summary trigram
- CONCURRENTLY not needed (765 docs / 6.5MB)

retrieval_service.search_text: ILIKE removed entirely → trigram % + similarity()
- WHERE: title %, ai_summary %, FTS @@ (all backed by indexes)
- ORDER BY: weighted sum of similarity over 5 columns + ts_rank * 2.0
- weights unchanged (title 3.0 / tags 2.5 / note 2.0 / summary 1.5 / extracted 1.0)
- threshold defaults to 0.3 (adjust via set_limit when needed)

Goal: text_ms 470ms → 100-200ms (by removing the ILIKE full scans)
---
 app/services/search/retrieval_service.py | 77 ++++++++++++++---------
 migrations/016_fts_expand_and_trgm.sql   | 47 +++++++++++++++
 2 files changed, 92 insertions(+), 32 deletions(-)
 create mode 100644 migrations/016_fts_expand_and_trgm.sql

diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index 3fea96f..a628f49 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -1,11 +1,11 @@
-"""Search-candidate retrieval service (Phase 1.1).
+"""Search-candidate retrieval service (Phase 1.2).
 
-Returns text (documents FTS + keyword) + vector (documents.embedding) candidates
+Returns text (documents FTS + trigram) + vector (documents.embedding → chunks) candidates
 as a SearchResult list.
 
-Phase 1.1: moved _search_text/_search_vector out of search.py.
-Phase 1.1 follow-up substep: ILIKE → trigram `similarity()` + `gin_trgm_ops`.
-Phase 1.2: switch vector retrieval to the document_chunks table.
+Phase 1.1a: moved _search_text/_search_vector out of search.py (ILIKE kept as-is).
+Phase 1.2-B: ILIKE → trigram `%` + `similarity()`. Removes the ILIKE full scan.
+After Phase 1.2-B: switch vector retrieval to the document_chunks table.
 """
 
 from __future__ import annotations
@@ -24,10 +24,15 @@ if TYPE_CHECKING:
 async def search_text(
     session: AsyncSession, query: str, limit: int
 ) -> list["SearchResult"]:
-    """FTS + ILIKE weighted per-field search.
+    """FTS + trigram weighted per-field search (Phase 1.2-B).
 
+    WHERE: candidate filter on the indexed trigram columns (title, ai_summary) + combined FTS index
+    - idx_documents_title_trgm
+    - idx_documents_ai_summary_trgm
+    - idx_documents_fts_full (title + ai_tags + ai_summary + user_note + extracted_text)
+    - extracted_text similarity is very low at trigram threshold 0.3 → FTS only in WHERE
+    ORDER BY: weighted sum of similarity over 5 columns + ts_rank * 2.0
     Weights: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0
-    + ts_rank * 2.0 bonus.
     """
     from api.search import SearchResult  # avoid circular import
 
@@ -36,40 +41,48 @@ async def search_text(
     result = await session.execute(
         text("""
             SELECT id, title, ai_domain, ai_summary, file_format,
                    left(extracted_text, 200) AS snippet,
                    (
-                       -- title match (highest weight)
-                       CASE WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 3.0 ELSE 0 END
-                       -- ai_tags match (high weight)
-                       + CASE WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 2.5 ELSE 0 END
-                       -- user_note match (high weight)
-                       + CASE WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 2.0 ELSE 0 END
-                       -- ai_summary match (upper-mid weight)
-                       + CASE WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 1.5 ELSE 0 END
-                       -- extracted_text match (mid weight)
-                       + CASE WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 1.0 ELSE 0 END
-                       -- FTS score (bonus)
+                       -- weighted sum of per-column trigram similarity
+                       similarity(coalesce(title, ''), :q) * 3.0
+                       + similarity(coalesce(ai_tags::text, ''), :q) * 2.5
+                       + similarity(coalesce(user_note, ''), :q) * 2.0
+                       + similarity(coalesce(ai_summary, ''), :q) * 1.5
+                       + similarity(coalesce(extracted_text, ''), :q) * 1.0
+                       -- FTS bonus (uses idx_documents_fts_full)
                        + coalesce(ts_rank(
-                           to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')),
+                           to_tsvector('simple',
+                               coalesce(title, '') || ' ' ||
+                               coalesce(ai_tags::text, '') || ' ' ||
+                               coalesce(ai_summary, '') || ' ' ||
+                               coalesce(user_note, '') || ' ' ||
+                               coalesce(extracted_text, '')
+                           ),
                            plainto_tsquery('simple', :q)
                        ), 0) * 2.0
                    ) AS score,
-                   -- match reason
+                   -- match_reason: column with the highest similarity, else FTS
                    CASE
-                       WHEN coalesce(title, '') ILIKE '%%' || :q || '%%' THEN 'title'
-                       WHEN coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%' THEN 'tags'
-                       WHEN coalesce(user_note, '') ILIKE '%%' || :q || '%%' THEN 'note'
-                       WHEN coalesce(ai_summary, '') ILIKE '%%' || :q || '%%' THEN 'summary'
-                       WHEN coalesce(extracted_text, '') ILIKE '%%' || :q || '%%' THEN 'content'
+                       WHEN similarity(coalesce(title, ''), :q) >= 0.3 THEN 'title'
+                       WHEN similarity(coalesce(ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
+                       WHEN similarity(coalesce(user_note, ''), :q) >= 0.3 THEN 'note'
+                       WHEN similarity(coalesce(ai_summary, ''), :q) >= 0.3 THEN 'summary'
+                       WHEN similarity(coalesce(extracted_text, ''), :q) >= 0.3 THEN 'content'
                        ELSE 'fts'
                    END AS match_reason
             FROM documents
             WHERE deleted_at IS NULL
-              AND (coalesce(title, '') ILIKE '%%' || :q || '%%'
-                   OR coalesce(ai_tags::text, '') ILIKE '%%' || :q || '%%'
-                   OR coalesce(user_note, '') ILIKE '%%' || :q || '%%'
-                   OR coalesce(ai_summary, '') ILIKE '%%' || :q || '%%'
-                   OR coalesce(extracted_text, '') ILIKE '%%' || :q || '%%'
-                   OR to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, ''))
-                      @@ plainto_tsquery('simple', :q))
+              AND (
+                  -- trigram candidate filter (indexed short columns only)
+                  title % :q
+                  OR (ai_summary IS NOT NULL AND ai_summary % :q)
+                  -- combined FTS index
+                  OR to_tsvector('simple',
+                      coalesce(title, '') || ' ' ||
+                      coalesce(ai_tags::text, '') || ' ' ||
+                      coalesce(ai_summary, '') || ' ' ||
+                      coalesce(user_note, '') || ' ' ||
+                      coalesce(extracted_text, '')
+                  ) @@ plainto_tsquery('simple', :q)
+              )
             ORDER BY score DESC
             LIMIT :limit
         """),
diff --git a/migrations/016_fts_expand_and_trgm.sql b/migrations/016_fts_expand_and_trgm.sql
new file mode 100644
index 0000000..54c2bda
--- /dev/null
+++ b/migrations/016_fts_expand_and_trgm.sql
@@ -0,0 +1,47 @@
+-- Phase 1.2: expand documents-table FTS + add trigram indexes
+--
+-- Purpose:
+-- 1) Expand the FTS index to cover title + ai_tags + ai_summary + user_note + extracted_text combined.
+--    Today the to_tsvector(...) in retrieval_service.search_text's SQL runs without an index.
+-- 2) Replace the ILIKE full scan (text_ms 470ms) with similarity() + GIN trigram indexes.
+--
+-- Data size (measured 2026-04-07): 765 documents / avg body 8.5KB / 6.5MB total
+-- Estimated index build time: 5-30s (CONCURRENTLY unnecessary; a short lock is acceptable)
+--
+-- Phase 1.2-A applies on its own. Phase 1.2-B then switches the SQL in
+-- retrieval_service.search_text from ILIKE to similarity() + the `%` operator,
+-- putting these indexes to use.
+
+-- pg_trgm extension (already enabled in 014; IF NOT EXISTS keeps this safe)
+CREATE EXTENSION IF NOT EXISTS pg_trgm;
+
+-- ─── 1) Combined FTS index ──────────────────────────────────
+-- Tokenizes title + ai_tags (JSONB→text) + ai_summary + user_note + extracted_text in one pass.
+-- The SQL in retrieval_service.search_text will be updated so its ts_rank call uses this index.
+CREATE INDEX IF NOT EXISTS idx_documents_fts_full ON documents
+    USING GIN (
+        to_tsvector('simple',
+            coalesce(title, '') || ' ' ||
+            coalesce(ai_tags::text, '') || ' ' ||
+            coalesce(ai_summary, '') || ' ' ||
+            coalesce(user_note, '') || ' ' ||
+            coalesce(extracted_text, '')
+        )
+    );
+
+-- ─── 2) title trigram index ─────────────────────────────────
+-- The most frequently matched column. Used as similarity(title, query) > threshold + ORDER BY.
+CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents
+    USING GIN (title gin_trgm_ops);
+
+-- ─── 3) extracted_text trigram index ────────────────────────
+-- Replaces ILIKE's dominant cost with a trigram GIN index.
+-- The WHERE clause excludes NULL/empty bodies to keep the index small.
+CREATE INDEX IF NOT EXISTS idx_documents_extracted_text_trgm ON documents
+    USING GIN (extracted_text gin_trgm_ops)
+    WHERE extracted_text IS NOT NULL AND length(extracted_text) > 0;
+
+-- ─── 4) ai_summary trigram index ────────────────────────────
+-- Summaries are short but often used for semantic matching (weight 1.5).
+CREATE INDEX IF NOT EXISTS idx_documents_ai_summary_trgm ON documents
+    USING GIN (ai_summary gin_trgm_ops)
+    WHERE ai_summary IS NOT NULL AND length(ai_summary) > 0;

From fab3c81a0feedbe76994232c585dee38ddf6fcc4 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 11:51:06 +0900
Subject: [PATCH 2/9] fix(search): Phase 1.2-B force trigram/FTS index usage
 via UNION decomposition
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

EXPLAIN diagnosis: with the OR-combined WHERE the PostgreSQL planner cannot
combine the indexes (and picks a Seq Scan, the table being small at 765
docs). Filter: 524ms.

Fix: decompose the OR'd WHERE into a candidates CTE built with UNION.
- title trigram → idx_documents_title_trgm (0.5ms)
- ai_summary trigram → idx_documents_ai_summary_trgm (adds the length>0 match condition)
- FTS @@ → idx_documents_fts_full (0.05ms)

EXPLAIN measurement: 525ms → 26ms (95% reduction). With the main SELECT
(weighted similarity sum + ORDER BY) on top, 100-150ms expected.
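A hypothetical verification sketch for one branch (query shape taken from the
CTE below; the expected plan line is what this commit describes):

  # hypothetical helper - checks that the title branch hits its index
  from sqlalchemy import text

  async def explain_title_branch(session, q: str) -> None:
      rows = await session.execute(
          text("EXPLAIN (ANALYZE, BUFFERS) "
               "SELECT id FROM documents WHERE deleted_at IS NULL AND title % :q"),
          {"q": q},
      )
      for (line,) in rows:
          print(line)  # expect a Bitmap Index Scan on idx_documents_title_trgm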
---
 app/services/search/retrieval_service.py | 109 +++++++++++++----------
 1 file changed, 63 insertions(+), 46 deletions(-)

diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index a628f49..13b7f69 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -24,65 +24,82 @@ if TYPE_CHECKING:
 async def search_text(
     session: AsyncSession, query: str, limit: int
 ) -> list["SearchResult"]:
-    """FTS + trigram weighted per-field search (Phase 1.2-B).
+    """FTS + trigram weighted per-field search (Phase 1.2-B, UNION decomposition).
 
-    WHERE: candidate filter on the indexed trigram columns (title, ai_summary) + combined FTS index
-    - idx_documents_title_trgm
-    - idx_documents_ai_summary_trgm
-    - idx_documents_fts_full (title + ai_tags + ai_summary + user_note + extracted_text)
-    - extracted_text similarity is very low at trigram threshold 0.3 → FTS only in WHERE
-    ORDER BY: weighted sum of similarity over 5 columns + ts_rank * 2.0
+    Phase 1.2-B diagnosis:
+      For a single SELECT with OR'd predicates the PostgreSQL planner cannot
+      build an OR-combined index plan and picks a Seq Scan (small table,
+      765 docs). Measured at 525ms with EXPLAIN.
+      → Decomposing into a CTE + UNION lets each branch use its own index → 26ms (95% down).
+
+    Structure:
+      candidates CTE
+      ├─ title %                → idx_documents_title_trgm
+      ├─ ai_summary %           → idx_documents_ai_summary_trgm
+      │    (includes the length > 0 partial-index match condition)
+      └─ FTS @@ plainto_tsquery → idx_documents_fts_full
+      JOIN documents d ON d.id = c.id
+      ORDER BY weighted sum of similarity over 5 columns + ts_rank * 2.0
     Weights: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0
     """
     from api.search import SearchResult  # avoid circular import
 
     result = await session.execute(
         text("""
-            SELECT id, title, ai_domain, ai_summary, file_format,
-                   left(extracted_text, 200) AS snippet,
-                   (
-                       -- weighted sum of per-column trigram similarity
-                       similarity(coalesce(title, ''), :q) * 3.0
-                       + similarity(coalesce(ai_tags::text, ''), :q) * 2.5
-                       + similarity(coalesce(user_note, ''), :q) * 2.0
-                       + similarity(coalesce(ai_summary, ''), :q) * 1.5
-                       + similarity(coalesce(extracted_text, ''), :q) * 1.0
-                       -- FTS bonus (uses idx_documents_fts_full)
-                       + coalesce(ts_rank(
-                           to_tsvector('simple',
-                               coalesce(title, '') || ' ' ||
-                               coalesce(ai_tags::text, '') || ' ' ||
-                               coalesce(ai_summary, '') || ' ' ||
-                               coalesce(user_note, '') || ' ' ||
-                               coalesce(extracted_text, '')
-                           ),
-                           plainto_tsquery('simple', :q)
-                       ), 0) * 2.0
-                   ) AS score,
-                   -- match_reason: column with the highest similarity, else FTS
-                   CASE
-                       WHEN similarity(coalesce(title, ''), :q) >= 0.3 THEN 'title'
-                       WHEN similarity(coalesce(ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
-                       WHEN similarity(coalesce(user_note, ''), :q) >= 0.3 THEN 'note'
-                       WHEN similarity(coalesce(ai_summary, ''), :q) >= 0.3 THEN 'summary'
-                       WHEN similarity(coalesce(extracted_text, ''), :q) >= 0.3 THEN 'content'
-                       ELSE 'fts'
-                   END AS match_reason
-            FROM documents
-            WHERE deleted_at IS NULL
-              AND (
-                  -- trigram candidate filter (indexed short columns only)
-                  title % :q
-                  OR (ai_summary IS NOT NULL AND ai_summary % :q)
-                  -- combined FTS index
-                  OR to_tsvector('simple',
-                      coalesce(title, '') || ' ' ||
-                      coalesce(ai_tags::text, '') || ' ' ||
-                      coalesce(ai_summary, '') || ' ' ||
-                      coalesce(user_note, '') || ' ' ||
-                      coalesce(extracted_text, '')
-                  ) @@ plainto_tsquery('simple', :q)
-              )
+            WITH candidates AS (
+                -- title trigram (idx_documents_title_trgm)
+                SELECT id FROM documents
+                WHERE deleted_at IS NULL AND title %% :q
+                UNION
+                -- ai_summary trigram (idx_documents_ai_summary_trgm partial-index match)
+                SELECT id FROM documents
+                WHERE deleted_at IS NULL
+                  AND ai_summary IS NOT NULL
+                  AND length(ai_summary) > 0
+                  AND ai_summary %% :q
+                UNION
+                -- combined FTS index (idx_documents_fts_full)
+                SELECT id FROM documents
+                WHERE deleted_at IS NULL
+                  AND to_tsvector('simple',
+                      coalesce(title, '') || ' ' ||
+                      coalesce(ai_tags::text, '') || ' ' ||
+                      coalesce(ai_summary, '') || ' ' ||
+                      coalesce(user_note, '') || ' ' ||
+                      coalesce(extracted_text, '')
+                  ) @@ plainto_tsquery('simple', :q)
+            )
+            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
+                   left(d.extracted_text, 200) AS snippet,
+                   (
+                       -- weighted sum of per-column trigram similarity
+                       similarity(coalesce(d.title, ''), :q) * 3.0
+                       + similarity(coalesce(d.ai_tags::text, ''), :q) * 2.5
+                       + similarity(coalesce(d.user_note, ''), :q) * 2.0
+                       + similarity(coalesce(d.ai_summary, ''), :q) * 1.5
+                       + similarity(coalesce(d.extracted_text, ''), :q) * 1.0
+                       -- FTS bonus (uses idx_documents_fts_full)
+                       + coalesce(ts_rank(
+                           to_tsvector('simple',
+                               coalesce(d.title, '') || ' ' ||
+                               coalesce(d.ai_tags::text, '') || ' ' ||
+                               coalesce(d.ai_summary, '') || ' ' ||
+                               coalesce(d.user_note, '') || ' ' ||
+                               coalesce(d.extracted_text, '')
+                           ),
+                           plainto_tsquery('simple', :q)
+                       ), 0) * 2.0
+                   ) AS score,
+                   -- match_reason: column with the highest similarity, else FTS
+                   CASE
+                       WHEN similarity(coalesce(d.title, ''), :q) >= 0.3 THEN 'title'
+                       WHEN similarity(coalesce(d.ai_tags::text, ''), :q) >= 0.3 THEN 'tags'
+                       WHEN similarity(coalesce(d.user_note, ''), :q) >= 0.3 THEN 'note'
+                       WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary'
+                       WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content'
+                       ELSE 'fts'
+                   END AS match_reason
+            FROM documents d
+            JOIN candidates c ON d.id = c.id
             ORDER BY score DESC
             LIMIT :limit
         """),

From ca3e1952d20801b7797add33e3c9d6e2550ff7ce Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 11:53:24 +0900
Subject: [PATCH 3/9] fix(search): fix trigram % operator escaping (%% → %)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

With SQLAlchemy text() on the asyncpg dialect, %% in operator position is
not unescaped, raising a 'text %% unknown' error. Changed to a single %.

Inside an ILIKE string literal, %% behaved as two wildcard characters in
PostgreSQL, but the operator position goes through a different escaping
path.
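A condensed sketch of the two escaping paths (assumes the asyncpg dialect,
per the diagnosis above):

  # sketch - illustrates the operator-position escaping difference
  from sqlalchemy import text

  broken = text("SELECT id FROM documents WHERE title %% :q")  # reaches PG as '%%' → operator error
  fixed = text("SELECT id FROM documents WHERE title % :q")    # trigram match operator
  # inside an ILIKE string literal, '%%' was just two literal wildcards:
  legacy = text("SELECT id FROM documents WHERE title ILIKE '%%' || :q || '%%'")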
---
 app/services/search/retrieval_service.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index 13b7f69..ac8ae9d 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -48,14 +48,14 @@ async def search_text(
             WITH candidates AS (
                 -- title trigram (idx_documents_title_trgm)
                 SELECT id FROM documents
-                WHERE deleted_at IS NULL AND title %% :q
+                WHERE deleted_at IS NULL AND title % :q
                 UNION
                 -- ai_summary trigram (idx_documents_ai_summary_trgm partial-index match)
                 SELECT id FROM documents
                 WHERE deleted_at IS NULL
                   AND ai_summary IS NOT NULL
                   AND length(ai_summary) > 0
-                  AND ai_summary %% :q
+                  AND ai_summary % :q
                 UNION
                 -- combined FTS index (idx_documents_fts_full)
                 SELECT id FROM documents

From f9af8dd355b9f363978045626e483c0441262e39 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 11:58:41 +0900
Subject: [PATCH 4/9] fix(search): trigram threshold 0.3 → 0.15 (set_limit)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The Phase 1.2-B eval set showed recall regressing 0.788 → 0.750.

Cause: the trigram default threshold of 0.3 is too strict for multi-token
queries. A 5-word Korean news query such as '이란 미국 전쟁 글로벌 반응'
gets almost no title/ai_summary trigram matches.

Fix: call set_limit(0.15) at the start of search_text.
- trigram matching becomes more lenient (recall ↑)
- precision is compensated by the weighted similarity sum in ORDER BY
- plenty of p95 latency headroom at 169ms (target 500ms)
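A sketch of the failure mode (the document title is hypothetical; the
similarity value is illustrative):

  # sketch - why the 0.3 default starves multi-token queries of candidates
  from sqlalchemy import text

  async def threshold_demo(session) -> None:
      q = "이란 미국 전쟁 글로벌 반응"
      title = "이란-미국 전쟁에 대한 글로벌 반응 정리"  # hypothetical title
      sim = (await session.execute(
          text("SELECT similarity(:t, :q)"), {"t": title, "q": q}
      )).scalar_one()
      # sim tends to land well below 0.3 for a 5-token query like this,
      # so `title % :q` drops the doc until set_limit(0.15) is applied
      await session.execute(text("SELECT set_limit(0.15)"))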
---
 app/services/search/retrieval_service.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index ac8ae9d..fd9d6ca 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -40,9 +40,19 @@ async def search_text(
       JOIN documents d ON d.id = c.id
       ORDER BY weighted sum of similarity over 5 columns + ts_rank * 2.0
     Weights: title 3.0 / ai_tags 2.5 / user_note 2.0 / ai_summary 1.5 / extracted_text 1.0
+
+    Threshold:
+      pg_trgm.similarity_threshold default = 0.3
+      → multi-token Korean news queries (e.g. "이란 미국 전쟁 글로벌 반응")
+        collect no candidates → recall drop (0.788 → 0.750)
+      → lowered via set_limit(0.15) to recover recall.
+        Precision is compensated by the similarity sum in ORDER BY.
     """
     from api.search import SearchResult  # avoid circular import
 
+    # lower the trigram threshold to 0.15 to recover multi-token query recall
+    # both execute calls within one SQLAlchemy async session use the same connection
+    await session.execute(text("SELECT set_limit(0.15)"))
+
     result = await session.execute(
         text("""
             WITH candidates AS (

From 731d1396e8d7f402a537bea3f269b92417a28594 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 12:26:38 +0900
Subject: [PATCH 5/9] fix(chunk): _chunk_legal sliding-window fallback for
 English statutes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

English/foreign statutes (ai_domain Foreign_Law etc.) contain no '제N조'
pattern, so the split yields a single element → only the preamble chunk
(first 1500 chars) is created and most of the body is lost.

Found via doc 3759 (Industrial Safety, 93KB English) → only 1 chunk created.

Fix: if the parts split yields 1 element or fewer, fall back to
_chunk_sliding. Korean statutes (which do have the 제N조 pattern) keep the
existing split logic unchanged.

Found in the Phase 1.2-D smoke test. Must be fixed before reindexing.
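The failure mode in isolation (sketch; sample strings are made up, the
pattern is the one from _chunk_legal):

  # sketch - the split behavior that motivated the fallback
  import re

  pattern = re.compile(r"(제\s*\d+\s*조(?:의\s*\d+)?(?:\([^)]*\))?)")
  korean = "서문 ... 제1조(목적) 이 법은 ... 제2조(정의) ..."
  english = "Article 1 (Purpose) This Act ... Article 2 (Definitions) ..."
  print(len(pattern.split(korean)))   # 5: preamble + (marker, body) pairs
  print(len(pattern.split(english)))  # 1: no match → whole text in one piece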
---
 app/workers/chunk_worker.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/app/workers/chunk_worker.py b/app/workers/chunk_worker.py
index d67ccd1..a9f1baf 100644
--- a/app/workers/chunk_worker.py
+++ b/app/workers/chunk_worker.py
@@ -79,11 +79,20 @@ def _classify_chunk_strategy(doc: Document) -> str:
 
 # ─── Chunking strategies ───
 def _chunk_legal(text: str) -> list[dict]:
-    """Statutes: split per article ("제N조"), preserving parent-article context"""
+    """Statutes: split per article ("제N조"), preserving parent-article context.
+
+    English/foreign statutes (ai_domain Foreign_Law etc.) have no "제N조" pattern,
+    so the split yields a single element → bug where only one preamble chunk is
+    created and most of the body is lost.
+    When no article pattern is detected, handle via a sliding-window fallback.
+    """
     # matches "제 1 조", "제1조", "제 1 조(제목)" etc.
     pattern = re.compile(r"(제\s*\d+\s*조(?:의\s*\d+)?(?:\([^)]*\))?)")
     parts = pattern.split(text)
 
+    # no article pattern detected (English/foreign statutes etc.) → sliding-window fallback
+    if len(parts) <= 1:
+        return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section")
+
     chunks = []
     # parts[0] = preamble before the first article; parts[1], parts[2] = (marker, body) pairs
     if parts[0].strip() and len(parts[0]) >= MIN_CHUNK_CHARS:

From 42dfe82c9bf56df2a38d73c1559d742547c184bd Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 12:31:29 +0900
Subject: [PATCH 6/9] feat(chunk): add Phase 1.2-E reindex script
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

tests/scripts/reindex_all_chunks.py - tool to re-chunk the whole documents set.

Core requirements (user-specified):
- asyncio.Semaphore(N) - caps concurrency (default 3; throttles Ollama bge-m3 load)
- checkpoint resume - JSON file with atomic swap; restartable after a mid-run failure/abort
- rate limiting - 0.1s sleep between jobs (protects the Ollama API)
- progress log - [REINDEX] N/total (P%) ETA: ... fails: N (roughly every 2%)

CLI:
- --concurrency, --checkpoint, --rate-limit, --limit (dry-run), --skip-existing

Nightly batch (00:00-06:00):
  PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \
    --concurrency 3 --checkpoint checkpoints/reindex.json \
    > logs/reindex.log 2>&1 &
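Since the checkpoint is plain JSON, progress can be inspected by hand; a
minimal read sketch (shape follows save_checkpoint in the script below):

  # sketch - inspect resume state; path matches the nightly example above
  import json
  from pathlib import Path

  ckpt = Path("checkpoints/reindex.json")
  done = set(json.loads(ckpt.read_text())["processed"]) if ckpt.exists() else set()
  print(f"{len(done)} docs already processed; a rerun will skip them")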
---
 tests/scripts/__init__.py           |   0
 tests/scripts/reindex_all_chunks.py | 204 ++++++++++++++++++++++++++++
 2 files changed, 204 insertions(+)
 create mode 100644 tests/scripts/__init__.py
 create mode 100644 tests/scripts/reindex_all_chunks.py

diff --git a/tests/scripts/__init__.py b/tests/scripts/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/scripts/reindex_all_chunks.py b/tests/scripts/reindex_all_chunks.py
new file mode 100644
index 0000000..851fb8d
--- /dev/null
+++ b/tests/scripts/reindex_all_chunks.py
@@ -0,0 +1,204 @@
+"""Document chunk reindexing (Phase 1.2-E).
+
+Re-processes all documents through chunk_worker. Nightly batch recommended (00:00-06:00).
+
+Core requirements (user-specified):
+- concurrency cap (asyncio.Semaphore) - throttles Ollama load
+- checkpoint resume (survives mid-run failure/abort)
+- rate limiting (protects the Ollama API)
+- progress log ([REINDEX] N/total (P%) ETA: ...)
+
+Usage:
+    cd /home/hyungi/Documents/code/hyungi_Document_Server
+    PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \\
+        --concurrency 3 \\
+        --checkpoint checkpoints/reindex.json \\
+        > logs/reindex.log 2>&1 &
+
+dry-run (only 5 docs):
+    PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --limit 5
+
+skip docs that already have chunks:
+    PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --skip-existing
+
+force reprocessing of docs with existing chunks (chunk_worker deletes + inserts automatically):
+    PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py
+"""
+
+import argparse
+import asyncio
+import json
+import sys
+import time
+from pathlib import Path
+
+# assumes PYTHONPATH=app
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "app"))
+
+from sqlalchemy import select  # noqa: E402
+from sqlalchemy.ext.asyncio import async_sessionmaker  # noqa: E402
+
+from core.database import engine  # noqa: E402
+from core.utils import setup_logger  # noqa: E402
+from models.chunk import DocumentChunk  # noqa: E402
+from models.document import Document  # noqa: E402
+from workers.chunk_worker import process  # noqa: E402
+
+logger = setup_logger("reindex")
+
+
+def load_checkpoint(path: Path) -> set[int]:
+    """Restore the set of completed doc_ids from the checkpoint file."""
+    if not path.exists():
+        return set()
+    try:
+        data = json.loads(path.read_text())
+        return set(data.get("processed", []))
+    except (json.JSONDecodeError, KeyError) as e:
+        logger.warning(f"checkpoint {path} invalid ({e}) → starting fresh")
+        return set()
+
+
+def save_checkpoint(path: Path, processed: set[int]) -> None:
+    """Save completed doc_ids to the checkpoint file (incremental)."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+    tmp = path.with_suffix(path.suffix + ".tmp")
+    tmp.write_text(json.dumps({"processed": sorted(processed)}, indent=2))
+    tmp.replace(path)  # atomic swap
+
+
+def format_eta(elapsed: float, done: int, total: int) -> str:
+    """Format the remaining-work ETA."""
+    if done == 0:
+        return "?"
+    rate = done / elapsed
+    remaining = (total - done) / rate
+    if remaining < 60:
+        return f"{remaining:.0f}s"
+    if remaining < 3600:
+        return f"{remaining / 60:.0f}m"
+    return f"{remaining / 3600:.1f}h"
+
+
+async def main():
+    parser = argparse.ArgumentParser(description="Document chunk reindexing (Phase 1.2-E)")
+    parser.add_argument(
+        "--concurrency",
+        type=int,
+        default=3,
+        help="concurrent docs (default 3; throttles Ollama bge-m3 load)",
+    )
+    parser.add_argument(
+        "--checkpoint",
+        type=Path,
+        default=Path("checkpoints/reindex.json"),
+        help="checkpoint file path (resumable)",
+    )
+    parser.add_argument(
+        "--rate-limit",
+        type=float,
+        default=0.1,
+        help="sleep between jobs (seconds; protects Ollama)",
+    )
+    parser.add_argument(
+        "--limit",
+        type=int,
+        default=None,
+        help="cap on docs to process (for dry-runs)",
+    )
+    parser.add_argument(
+        "--skip-existing",
+        action="store_true",
+        help="skip docs that already have chunks",
+    )
+    args = parser.parse_args()
+
+    Session = async_sessionmaker(engine)
+
+    # 1. collect target docs
+    async with Session() as session:
+        query = (
+            select(Document.id)
+            .where(
+                Document.deleted_at.is_(None),
+                Document.extracted_text.is_not(None),
+            )
+            .order_by(Document.id)
+        )
+        result = await session.execute(query)
+        all_doc_ids = [row[0] for row in result]
+
+        if args.skip_existing:
+            existing_query = select(DocumentChunk.doc_id).distinct()
+            existing_result = await session.execute(existing_query)
+            existing = {row[0] for row in existing_result}
+            logger.info(f"skip-existing: {len(existing)} docs already have chunks")
+        else:
+            existing = set()
+
+    # 2. checkpoint resume
+    processed = load_checkpoint(args.checkpoint)
+    if processed:
+        logger.info(f"checkpoint: {len(processed)} docs already done (resume)")
+
+    # 3. targets = all - skip_existing - checkpoint
+    targets = [d for d in all_doc_ids if d not in processed and d not in existing]
+    if args.limit:
+        targets = targets[: args.limit]
+
+    total = len(targets)
+    logger.info(
+        f"REINDEX start: {len(all_doc_ids)} docs total / {total} docs to process"
+        f" / concurrency={args.concurrency} rate_limit={args.rate_limit}s"
+    )
+
+    if total == 0:
+        logger.info("No docs to process. Exiting.")
+        return
+
+    semaphore = asyncio.Semaphore(args.concurrency)
+    done_count = 0
+    fail_count = 0
+    start_time = time.monotonic()
+    log_interval = max(1, total // 50)  # progress log roughly every 2%
+
+    async def process_one(doc_id: int) -> None:
+        nonlocal done_count, fail_count
+        async with semaphore:
+            try:
+                async with Session() as session:
+                    await process(doc_id, session)
+                    await session.commit()
+                # rate limit (protects Ollama)
+                await asyncio.sleep(args.rate_limit)
+                done_count += 1
+                processed.add(doc_id)
+
+                # progress log + checkpoint save
+                if done_count % log_interval == 0 or done_count == total:
+                    elapsed = time.monotonic() - start_time
+                    pct = (done_count / total) * 100
+                    eta = format_eta(elapsed, done_count, total)
+                    logger.info(
+                        f"[REINDEX] {done_count}/{total} ({pct:.1f}%)"
+                        f" ETA: {eta} elapsed: {elapsed:.0f}s fails: {fail_count}"
+                    )
+                    save_checkpoint(args.checkpoint, processed)
+            except Exception as e:
+                fail_count += 1
+                logger.warning(
+                    f"[REINDEX] doc {doc_id} failed: {type(e).__name__}: {e}"
+                )
+
+    tasks = [process_one(doc_id) for doc_id in targets]
+    await asyncio.gather(*tasks)
+
+    elapsed = time.monotonic() - start_time
+    save_checkpoint(args.checkpoint, processed)
+    logger.info(
+        f"[REINDEX] finished: {done_count}/{total} done, {fail_count} fails, {elapsed:.0f}s"
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

From b80116243f863253b496748793988b04c73eb146 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 12:36:47 +0900
Subject: [PATCH 7/9] feat(search): Phase 1.2-C chunk-based vector retrieval +
 raw-chunk preservation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Switches retrieval_service.search_vector from documents.embedding to
document_chunks.embedding. Fetches raw chunks broadly with
fetch_limit = limit*5, then compresses per doc.

New: compress_chunks_to_docs(chunks, limit) → (doc_results, chunks_by_doc)
- only the best-scoring chunk per doc_id enters doc_results (fusion input)
- all raw chunks are preserved in the chunks_by_doc dict (for the Phase 1.3 reranker)
- prevents "duplicates of the same doc falsely boosting RRF"

SearchResult: adds optional chunk_id / chunk_index / section_title fields.
- text search results leave them None (doc-level)
- vector search results fill them in (chunk-level)

search.py flow:
1. raw_chunks = await search_vector(...)
2. vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
3. fusion(text_results, vector_results) - doc-level
4. (Phase 1.3) chunks_by_doc → reranker - chunk-level

debug notes: verify the flow via raw=N compressed=M unique_docs=K.

Data dependency: validate against the eval set once reindexing
(reindex_all_chunks.py, currently running) completes.
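The contract in miniature (sketch; stand-in objects with made-up scores -
the helper only reads .id and .score):

  # sketch - expected behavior of compress_chunks_to_docs
  from types import SimpleNamespace as C

  from services.search.retrieval_service import compress_chunks_to_docs

  chunks = [C(id=7, score=0.91), C(id=3, score=0.85), C(id=7, score=0.88)]
  docs, by_doc = compress_chunks_to_docs(chunks, limit=10)
  assert [d.id for d in docs] == [7, 3]               # best chunk per doc, score-sorted
  assert len(by_doc[7]) == 2 and len(by_doc[3]) == 1  # all raw chunks preserved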
---
 app/api/search.py                         | 37 ++++++++++--
 app/services/search/retrieval_service.py | 74 ++++++++++++++++++++----
 2 files changed, 94 insertions(+), 17 deletions(-)

diff --git a/app/api/search.py b/app/api/search.py
index 5682866..918d2aa 100644
--- a/app/api/search.py
+++ b/app/api/search.py
@@ -16,7 +16,7 @@ from core.database import get_session
 from core.utils import setup_logger
 from models.user import User
 from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
-from services.search.retrieval_service import search_text, search_vector
+from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
 from services.search_telemetry import (
     compute_confidence,
     compute_confidence_hybrid,
@@ -30,7 +30,14 @@ router = APIRouter()
 
 
 class SearchResult(BaseModel):
-    id: int
+    """A single search-result row.
+
+    Phase 1.2-C: chunk metadata fields added along with chunk-level vector retrieval.
+    Text search results leave chunk_id etc. as None (doc-level).
+    Vector search results fill them in (chunk-level).
+    """
+
+    id: int  # doc_id (shared by text/vector)
     title: str | None
     ai_domain: str | None
     ai_summary: str | None
@@ -38,6 +45,10 @@ class SearchResult(BaseModel):
     score: float
     snippet: str | None
     match_reason: str | None = None
+    # Phase 1.2-C: chunk metadata (filled in by vector search)
+    chunk_id: int | None = None
+    chunk_index: int | None = None
+    section_title: str | None = None
 
 
 # ─── Phase 0.4: debug response schema ─────────────────────────
@@ -99,16 +110,20 @@ async def search(
     timing: dict[str, float] = {}
     notes: list[str] = []
     text_results: list[SearchResult] = []
-    vector_results: list[SearchResult] = []
+    vector_results: list[SearchResult] = []  # doc-level (compressed; fusion input)
+    raw_chunks: list[SearchResult] = []  # chunk-level (raw; for the Phase 1.3 reranker)
+    chunks_by_doc: dict[int, list[SearchResult]] = {}  # preserved for the Phase 1.3 reranker
 
     t_total = time.perf_counter()
 
     if mode == "vector":
         t0 = time.perf_counter()
-        vector_results = await search_vector(session, q, limit)
+        raw_chunks = await search_vector(session, q, limit)
         timing["vector_ms"] = (time.perf_counter() - t0) * 1000
-        if not vector_results:
+        if not raw_chunks:
             notes.append("vector_search_returned_empty (AI client error or no embeddings)")
+        # compress to docs in vector-only mode too, for diversity (avoids duplicate chunks)
+        vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
         results = vector_results
     else:
         t0 = time.perf_counter()
@@ -117,8 +132,14 @@ async def search(
 
         if mode == "hybrid":
             t1 = time.perf_counter()
-            vector_results = await search_vector(session, q, limit)
+            raw_chunks = await search_vector(session, q, limit)
             timing["vector_ms"] = (time.perf_counter() - t1) * 1000
+
+            # chunk-level → doc-level compression (raw chunks preserved in chunks_by_doc)
+            t1b = time.perf_counter()
+            vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
+            timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
+
             if not vector_results:
                 notes.append("vector_search_returned_empty — text-only fallback")
 
@@ -127,6 +148,10 @@ async def search(
             results = strategy.fuse(text_results, vector_results, q, limit)
             timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
             notes.append(f"fusion={strategy.name}")
+            notes.append(
+                f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
+                f"unique_docs={len(chunks_by_doc)}"
+            )
         else:
             results = text_results
 
diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index fd9d6ca..568ec4a 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -121,10 +121,16 @@ async def search_text(
 async def search_vector(
     session: AsyncSession, query: str, limit: int
 ) -> list["SearchResult"]:
-    """Vector similarity search (cosine distance).
+    """Vector similarity search - chunk-level (Phase 1.2-C).
 
-    Will switch to the document_chunks table in Phase 1.2.
-    Currently uses documents.embedding.
+    Returns raw chunks from the document_chunks table by cosine similarity.
+    Several chunks of the same doc may come back (no compression here).
+    Compress per doc with the compress_chunks_to_docs() helper right before fusion.
+    The Phase 1.3 reranker consumes the raw chunks as-is.
+
+    SearchResult.id = doc_id (fusion-compatible)
+    SearchResult.chunk_id / chunk_index / section_title = chunk metadata
+    snippet = first 200 chars of the chunk text
     """
     from api.search import SearchResult  # avoid circular import
 
@@ -135,17 +141,63 @@ async def search_vector(
     except Exception:
         return []
 
+    # join raw chunks with doc metadata; fetch ~limit * 5 for doc diversity after compression
+    fetch_limit = limit * 5
     result = await session.execute(
         text("""
-            SELECT id, title, ai_domain, ai_summary, file_format,
-                   (1 - (embedding <=> cast(:embedding AS vector))) AS score,
-                   left(extracted_text, 200) AS snippet,
-                   'vector' AS match_reason
-            FROM documents
-            WHERE embedding IS NOT NULL AND deleted_at IS NULL
-            ORDER BY embedding <=> cast(:embedding AS vector)
+            SELECT
+                d.id AS id,
+                d.title AS title,
+                d.ai_domain AS ai_domain,
+                d.ai_summary AS ai_summary,
+                d.file_format AS file_format,
+                (1 - (c.embedding <=> cast(:embedding AS vector))) AS score,
+                left(c.text, 200) AS snippet,
+                'vector' AS match_reason,
+                c.id AS chunk_id,
+                c.chunk_index AS chunk_index,
+                c.section_title AS section_title
+            FROM document_chunks c
+            JOIN documents d ON d.id = c.doc_id
+            WHERE c.embedding IS NOT NULL AND d.deleted_at IS NULL
+            ORDER BY c.embedding <=> cast(:embedding AS vector)
             LIMIT :limit
         """),
-        {"embedding": str(query_embedding), "limit": limit},
+        {"embedding": str(query_embedding), "limit": fetch_limit},
     )
     return [SearchResult(**row._mapping) for row in result]
+
+
+def compress_chunks_to_docs(
+    chunks: list["SearchResult"], limit: int
+) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
+    """Compress chunk-level results to doc level while preserving the raw chunks.
+
+    Fusion must be doc-level (to avoid duplicates of one doc), but the Phase 1.3
+    reranker needs the raw chunk-level data. So return both the compressed list
+    and the raw dict.
+
+    Compression rules:
+    - only the highest-scoring chunk per doc_id is added to doc_results
+    - the doc's other chunks are preserved in the chunks_by_doc dict (Phase 1.3 reranker)
+    - sort by score descending, keep only limit entries in doc_results
+
+    Returns:
+        (doc_results, chunks_by_doc)
+        - doc_results: list[SearchResult] - best chunk score per doc; fusion input
+        - chunks_by_doc: dict[doc_id, list[SearchResult]] - all raw chunks preserved
+    """
+    if not chunks:
+        return [], {}
+
+    chunks_by_doc: dict[int, list["SearchResult"]] = {}
+    best_per_doc: dict[int, "SearchResult"] = {}
+
+    for chunk in chunks:
+        chunks_by_doc.setdefault(chunk.id, []).append(chunk)
+        prev_best = best_per_doc.get(chunk.id)
+        if prev_best is None or chunk.score > prev_best.score:
+            best_per_doc[chunk.id] = chunk
+
+    # sort docs by best score, keep the top limit entries
+    doc_results = sorted(best_per_doc.values(), key=lambda r: r.score, reverse=True)
+    return doc_results[:limit], chunks_by_doc

From 76e723cdb171c87e930b5ea2cc6ab045c4f2cd22 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 12:41:47 +0900
Subject: [PATCH 8/9] feat(search): Phase 1.3 TEI reranker integration (code
 skeleton)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Data-flow principle: fusion = doc-level / reranker = chunk-level - never mix the two.
New/changed:
- ai/client.py: adds a rerank() method (TEI POST /rerank API)
- services/search/rerank_service.py:
  - rerank_chunks() - asyncio.Semaphore(2) + 5s soft timeout + RRF fallback
  - _make_snippet/_extract_window - title + 200-400 tokens centered on the query
    (falls back to the first 800 chars when no keyword matches)
  - apply_diversity() - max_per_doc=2; unlimited when the top score >= 0.90
  - warmup_reranker() - 10 retries at 3s intervals (waits for TEI model load)
  - MAX_RERANK_INPUT=200, MAX_CHUNKS_PER_DOC=2 hard caps
- services/search_telemetry.py: compute_confidence_reranked() - thresholds on the sigmoid score
- api/search.py:
  - ?rerank=true|false parameter (default true; hybrid mode only)
  - flow: fused_docs(limit*5) → recover from chunks_by_doc → rerank_chunks → apply_diversity
  - docs matched by text only are wrapped as a chunk-like doc (fallback)
  - with rerank active, confidence is based on the reranker score
- tests/search_eval/run_eval.py: --rerank true|false flag

GPU rollout deferred:
- add the TEI container (docker-compose.yml) - separate task
- update config.yaml rerank.endpoint - direct GPU (not committed)
- after reindexing completes: build + warmup + eval-set measurement
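The raw TEI exchange that AIClient.rerank wraps, as a standalone sketch (the
endpoint URL is hypothetical; request/response shape as listed above):

  # sketch - direct TEI /rerank call outside the app
  import asyncio
  import httpx

  async def demo() -> None:
      async with httpx.AsyncClient() as http:
          r = await http.post(
              "http://localhost:8081/rerank",  # hypothetical TEI endpoint
              json={"query": "이란 미국 전쟁", "texts": ["snippet A", "snippet B"]},
              timeout=5.0,
          )
          r.raise_for_status()
          print(r.json())  # e.g. [{"index": 1, "score": 0.93}, {"index": 0, "score": 0.12}]

  asyncio.run(demo())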
---
 app/ai/client.py                      |  19 +++
 app/api/search.py                     |  50 ++++++-
 app/services/search/rerank_service.py | 196 +++++++++++++++++++++++++-
 app/services/search_telemetry.py      |  27 ++++
 tests/search_eval/run_eval.py         |  21 ++-
 5 files changed, 306 insertions(+), 7 deletions(-)

diff --git a/app/ai/client.py b/app/ai/client.py
index d1ad612..601dad9 100644
--- a/app/ai/client.py
+++ b/app/ai/client.py
@@ -85,6 +85,25 @@ class AIClient:
         # TODO: implement the Qwen2.5-VL-7B vision-model call
         raise NotImplementedError("OCR lands in Phase 1")
 
+    async def rerank(self, query: str, texts: list[str]) -> list[dict]:
+        """Call TEI bge-reranker-v2-m3 (Phase 1.3).
+
+        TEI POST /rerank API:
+          request:  {"query": str, "texts": [str, ...]}
+          response: [{"index": int, "score": float}, ...] (sorted)
+
+        timeout comes from self.ai.rerank.timeout (config.yaml).
+        The caller (rerank_service) wraps this in asyncio.Semaphore + try/except.
+        """
+        timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0
+        response = await self._http.post(
+            self.ai.rerank.endpoint,
+            json={"query": query, "texts": texts},
+            timeout=timeout,
+        )
+        response.raise_for_status()
+        return response.json()
+
     async def _call_chat(self, model_config, prompt: str) -> str:
         """OpenAI-compatible API call + automatic fallback"""
         try:
diff --git a/app/api/search.py b/app/api/search.py
index 918d2aa..c0e041d 100644
--- a/app/api/search.py
+++ b/app/api/search.py
@@ -16,10 +16,17 @@ from core.database import get_session
 from core.utils import setup_logger
 from models.user import User
 from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
+from services.search.rerank_service import (
+    MAX_CHUNKS_PER_DOC,
+    MAX_RERANK_INPUT,
+    apply_diversity,
+    rerank_chunks,
+)
 from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
 from services.search_telemetry import (
     compute_confidence,
     compute_confidence_hybrid,
+    compute_confidence_reranked,
     record_search_event,
 )
 
@@ -104,6 +111,10 @@ async def search(
         pattern="^(legacy|rrf|rrf_boost)$",
         description="fusion strategy for hybrid mode (legacy=weighted sum, rrf=RRF k=60, rrf_boost=RRF + strong-signal boost)",
     ),
+    rerank: bool = Query(
+        True,
+        description="enable bge-reranker-v2-m3 (Phase 1.3; hybrid mode only)",
+    ),
     debug: bool = Query(False, description="include per-stage candidates + timing in the response"),
 ):
     """Document search - FTS + ILIKE + vector combined (Phase 0.5: RRF fusion)"""
@@ -145,13 +156,44 @@ async def search(
 
             t2 = time.perf_counter()
             strategy = get_strategy(fusion)
-            results = strategy.fuse(text_results, vector_results, q, limit)
+            # fusion is doc-level - fetch wider (rerank candidate pool)
+            fusion_limit = max(limit * 5, 100) if rerank else limit
+            fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
             timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
             notes.append(f"fusion={strategy.name}")
             notes.append(
                 f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
                 f"unique_docs={len(chunks_by_doc)}"
             )
+
+            if rerank:
+                # Phase 1.3: reranker - chunk-level input
+                # recover raw chunks from chunks_by_doc for the fused doc_ids
+                t3 = time.perf_counter()
+                rerank_input: list[SearchResult] = []
+                for doc in fused_docs:
+                    chunks = chunks_by_doc.get(doc.id, [])
+                    if chunks:
+                        # max 2 chunks per doc (latency/VRAM protection)
+                        rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
+                    else:
+                        # text-only matched doc → wrap the doc itself like a chunk
+                        rerank_input.append(doc)
+                    if len(rerank_input) >= MAX_RERANK_INPUT:
+                        break
+                rerank_input = rerank_input[:MAX_RERANK_INPUT]
+                notes.append(f"rerank input={len(rerank_input)}")
+
+                reranked = await rerank_chunks(q, rerank_input, limit * 3)
+                timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
+
+                # diversity (chunk → doc compression; max_per_doc=2, unlimited when top score >= 0.90)
+                t4 = time.perf_counter()
+                results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
+                timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
+            else:
+                # rerank disabled: fused_docs as-is (with limit applied)
+                results = fused_docs[:limit]
         else:
             results = text_results
 
@@ -162,8 +204,12 @@ async def search(
     timing["total_ms"] = (time.perf_counter() - t_total) * 1000
 
     # confidence is computed from raw pre-fusion signals (post-Phase 0.5 fused scores have no absolute meaning)
+    # with rerank active, the reranker score is the most trustworthy signal → use it first
     if mode == "hybrid":
-        confidence_signal = compute_confidence_hybrid(text_results, vector_results)
+        if rerank and "rerank_ms" in timing:
+            confidence_signal = compute_confidence_reranked(results)
+        else:
+            confidence_signal = compute_confidence_hybrid(text_results, vector_results)
     elif mode == "vector":
         confidence_signal = compute_confidence(vector_results, "vector")
     else:
diff --git a/app/services/search/rerank_service.py b/app/services/search/rerank_service.py
index 16d9373..c107f59 100644
--- a/app/services/search/rerank_service.py
+++ b/app/services/search/rerank_service.py
@@ -1,5 +1,199 @@
 """Reranker service - bge-reranker-v2-m3 integration (Phase 1.3).
 
 TEI container call + asyncio.Semaphore(2) + soft-timeout fallback.
-Implementation lands in Phase 1.3.
+
+Data-flow principles:
+- fusion = doc-level / reranker = chunk-level - never mix the two
+- preserve raw chunks end to end; fusion only ever uses the compressed form
+- the reranker recovers raw chunks from the chunks_by_doc dict and is called per chunk
+- diversity is applied only as the very last step, right after the reranker
+
+snippet generation:
+- target 200-400 tokens (800-1500 chars)
+- window of ±target_chars/2 around the query-keyword position
+- no keyword match → fall back to the first target_chars chars (avoids quality loss)
 """
+
+from __future__ import annotations
+
+import asyncio
+import re
+from typing import TYPE_CHECKING
+
+import httpx
+
+from ai.client import AIClient
+from core.utils import setup_logger
+
+if TYPE_CHECKING:
+    from api.search import SearchResult
+
+logger = setup_logger("rerank")
+
+# cap on concurrent rerank calls (avoids GPU saturation)
+RERANK_SEMAPHORE = asyncio.Semaphore(2)
+
+# rerank input size caps (latency / VRAM hard caps)
+MAX_RERANK_INPUT = 200
+MAX_CHUNKS_PER_DOC = 2
+
+# soft timeout (seconds)
+RERANK_TIMEOUT = 5.0
+
+
+def _extract_window(text: str, query: str, target_chars: int = 800) -> str:
+    """Extract a window of ±target_chars/2 around the query-keyword position.
+
+    Fallback: no keyword match → just the first target_chars characters.
+    Without this, the reranker scores only irrelevant text and quality collapses.
+    """
+    keywords = [k for k in re.split(r"\s+", query) if len(k) >= 2]
+    best_pos = -1
+    for kw in keywords:
+        pos = text.lower().find(kw.lower())
+        if pos >= 0:
+            best_pos = pos
+            break
+
+    if best_pos < 0:
+        # fallback: first target_chars characters
+        return text[:target_chars]
+
+    half = target_chars // 2
+    start = max(0, best_pos - half)
+    end = min(len(text), start + target_chars)
+    return text[start:end]
+
+
+def _make_snippet(c: "SearchResult", query: str, max_chars: int = 1500) -> str:
+    """Reranker input snippet - title + query-centered body window.
+
+    Enforces item 3 of feedback_search_phase1_implementation.md:
+    snippets of 200-400 tokens (800-1500 chars); never the full document.
+    """
+    title = c.title or ""
+    text = c.snippet or ""
+
+    # snippet is the first 200 chars of the chunk text or the doc text;
+    # callers that need a longer chunk text fill it in themselves
+    if len(text) > max_chars:
+        text = _extract_window(text, query, target_chars=max_chars - 100)
+
+    return f"{title}\n\n{text}"
+
+
+def _wrap_doc_as_chunk(doc: "SearchResult") -> "SearchResult":
+    """Convert a text-only matched doc (absent from chunks_by_doc) into chunk form.
+
+    For the cases where the doc itself must enter the Phase 1.3 reranker input.
+    snippet is the first 200 chars of documents.extracted_text (already set on
+    SearchResult.snippet). chunk_id etc. stay None.
+    """
+    return doc
+
+
+async def rerank_chunks(
+    query: str,
+    candidates: list["SearchResult"],
+    limit: int,
+) -> list["SearchResult"]:
+    """Re-order the RRF result candidates with the bge reranker.
+
+    Args:
+        query: user query
+        candidates: chunk-level SearchResult list (already recovered from chunks_by_doc)
+        limit: number of results to return
+
+    Returns:
+        reranked SearchResult list (score field updated to the rerank score)
+
+    Fallback (timeout/HTTPError): return candidates[:limit] in RRF order.
+    """
+    if not candidates:
+        return []
+
+    # input size cap (latency/VRAM hard cap)
+    if len(candidates) > MAX_RERANK_INPUT:
+        logger.warning(
+            f"rerank input {len(candidates)} > MAX {MAX_RERANK_INPUT}, truncating"
+        )
+        candidates = candidates[:MAX_RERANK_INPUT]
+
+    snippets = [_make_snippet(c, query) for c in candidates]
+    client = AIClient()
+
+    try:
+        async with asyncio.timeout(RERANK_TIMEOUT):
+            async with RERANK_SEMAPHORE:
+                results = await client.rerank(query, snippets)
+                # results: [{"index": int, "score": float}, ...] (already sorted)
+                reranked: list["SearchResult"] = []
+                for r in results:
+                    idx = r.get("index")
+                    sc = r.get("score")
+                    if idx is None or sc is None or idx >= len(candidates):
+                        continue
+                    chunk = candidates[idx]
+                    chunk.score = float(sc)
+                    chunk.match_reason = (chunk.match_reason or "") + "+rerank"
+                    reranked.append(chunk)
+                return reranked[:limit]
+    except (asyncio.TimeoutError, httpx.HTTPError) as e:
+        logger.warning(f"rerank failed → RRF fallback: {type(e).__name__}: {e}")
+        return candidates[:limit]
+    except Exception as e:
+        logger.warning(f"rerank unexpected error → RRF fallback: {type(e).__name__}: {e}")
+        return candidates[:limit]
+    finally:
+        await client.close()
+
+
+async def warmup_reranker() -> bool:
+    """Wait for model load after TEI boots (10 retries).
+
+    TEI returns health 200 quickly, but before the first model load (10-30s)
+    rerank requests fail or run very slowly. Call on FastAPI startup or before
+    the first request.
+    """
+    client = AIClient()
+    try:
+        for attempt in range(10):
+            try:
+                await client.rerank("warmup", ["dummy text for model load"])
+                logger.info(f"reranker warmup OK (attempt {attempt + 1})")
+                return True
+            except Exception as e:
+                logger.info(f"reranker warmup retry {attempt + 1}: {e}")
+                await asyncio.sleep(3)
+        logger.error("reranker warmup failed after 10 attempts")
+        return False
+    finally:
+        await client.close()
+
+
+def apply_diversity(
+    results: list["SearchResult"],
+    max_per_doc: int = MAX_CHUNKS_PER_DOC,
+    top_score_threshold: float = 0.90,
+) -> list["SearchResult"]:
+    """Compress chunk-level results per doc (max_per_doc).
+
+    Conditional relaxation: when the very top result's score is at or above
+    the threshold, return unlimited (high-confidence relevance > diversity).
+    """
+    if not results:
+        return []
+
+    # lift the diversity constraint when the top score clears the threshold
+    top_score = results[0].score if results else 0.0
+    if top_score >= top_score_threshold:
+        return results
+
+    seen: dict[int, int] = {}
+    out: list["SearchResult"] = []
+    for r in results:
+        doc_id = r.id
+        if seen.get(doc_id, 0) >= max_per_doc:
+            continue
+        out.append(r)
+        seen[doc_id] = seen.get(doc_id, 0) + 1
+    return out
diff --git a/app/services/search_telemetry.py b/app/services/search_telemetry.py
index eb2dbb5..2bb82d1 100644
--- a/app/services/search_telemetry.py
+++ b/app/services/search_telemetry.py
@@ -149,6 +149,33 @@ def _cosine_to_confidence(cosine: float) -> float:
     return 0.10
 
 
+def compute_confidence_reranked(reranked_results: list[Any]) -> float:
+    """Confidence based on the Phase 1.3 reranker score.
+
+    bge-reranker-v2-m3 returns a sigmoid score (0-1 range).
+    With rerank active, the reranker score is the most trustworthy signal,
+    more so than the fusion score.
+
+    Thresholds (draft; tune after real measurements):
+      >= 0.95 → high
+      >= 0.80 → med-high
+      >= 0.60 → med
+      >= 0.40 → low-med
+      else    → low
+    """
+    if not reranked_results:
+        return 0.0
+    top_score = float(getattr(reranked_results[0], "score", 0.0) or 0.0)
+    if top_score >= 0.95:
+        return 0.95
+    if top_score >= 0.80:
+        return 0.80
+    if top_score >= 0.60:
+        return 0.65
+    if top_score >= 0.40:
+        return 0.50
+    return 0.35
+
+
 def compute_confidence_hybrid(
     text_results: list[Any],
     vector_results: list[Any],
diff --git a/tests/search_eval/run_eval.py b/tests/search_eval/run_eval.py
index ac3fa99..a7ac06f 100644
--- a/tests/search_eval/run_eval.py
+++ b/tests/search_eval/run_eval.py
@@ -133,6 +133,7 @@ async def call_search(
     mode: str = "hybrid",
     limit: int = 20,
     fusion: str | None = None,
+    rerank: str | None = None,
 ) -> tuple[list[int], float]:
     """Call the search API → (doc_ids, latency_ms)."""
     url = f"{base_url.rstrip('/')}/api/search/"
@@ -140,6 +141,8 @@ async def call_search(
     params: dict[str, str | int] = {"q": query, "mode": mode, "limit": limit}
     if fusion:
         params["fusion"] = fusion
+    if rerank is not None:
+        params["rerank"] = rerank
 
     import time
 
@@ -165,6 +168,7 @@ async def evaluate(
     label: str,
     mode: str = "hybrid",
     fusion: str | None = None,
+    rerank: str | None = None,
 ) -> list[QueryResult]:
     """Evaluate the whole query set."""
     results: list[QueryResult] = []
@@ -173,7 +177,7 @@ async def evaluate(
     for q in queries:
         try:
             returned_ids, latency_ms = await call_search(
-                client, base_url, token, q.query, mode=mode, fusion=fusion
+                client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank
             )
             results.append(
                 QueryResult(
@@ -404,6 +408,13 @@ def main() -> int:
         choices=["legacy", "rrf", "rrf_boost"],
         help="fusion strategy for hybrid mode (Phase 0.5+; server default when omitted)",
     )
+    parser.add_argument(
+        "--rerank",
+        type=str,
+        default=None,
+        choices=["true", "false"],
+        help="enable bge-reranker-v2-m3 (Phase 1.3+; server default=true when omitted)",
+    )
     parser.add_argument(
         "--token",
         type=str,
@@ -434,6 +445,8 @@ def main() -> int:
     print(f"Mode: {args.mode}", end="")
     if args.fusion:
         print(f" / fusion: {args.fusion}", end="")
+    if args.rerank:
+        print(f" / rerank: {args.rerank}", end="")
     print()
 
     all_results: list[QueryResult] = []
 
     if args.base_url:
         print(f"\n>>> evaluating: {args.base_url}")
         results = asyncio.run(
-            evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion)
+            evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
         )
         print_summary("single", results)
         all_results.extend(results)
     else:
         print(f"\n>>> baseline: {args.baseline_url}")
         baseline_results = asyncio.run(
-            evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion)
+            evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
         )
         baseline_summary = print_summary("baseline", baseline_results)
 
         print(f"\n>>> candidate: {args.candidate_url}")
         candidate_results = asyncio.run(
             evaluate(
-                queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion
+                queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank
             )
         )
         candidate_summary = print_summary("candidate", candidate_results)

From f4f9de44027f389354eff289a263d2e3c211b9ea Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Wed, 8 Apr 2026 12:47:22 +0900
Subject: [PATCH 9/9] fix(search): Phase 1.2-C restore diversity via doc-level
 aggregation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Phase 1.2-C eval set: recall 0.788 → 0.531, natural_language 0.73 → 0.07.

Diagnosis: raw chunks were fetched as a naive chunk top-N (limit*5=25), so
several chunks of the same doc crowd the top → unique-doc diversity
collapses. Warm-test debug: 'chunks raw=16 compressed=5 unique_docs=10'

Fix (user-recommended option C): a window function, ROW_NUMBER() PARTITION
BY doc_id, returns only the top 2 chunks per doc.

SQL flow:
1. inner CTE topk: fast top inner_k chunks via the ivfflat index
   (inner_k = max(limit*10, 200))
2. ranked CTE: ROW_NUMBER over PARTITION BY doc_id ORDER BY dist
3. outer: rn <= 2 (max 2 chunks per doc) + JOIN documents
4. limit = limit * 4 (counted in chunks; ~limit*2 unique docs)

Reranker compatibility: still returns up to 2 chunks per doc → chunks_by_doc preserved
  compress_chunks_to_docs keeps working as-is (best chunk per doc)
  the Phase 1.3 reranker can still recover raw chunks from chunks_by_doc

Core principle: vector retrieval must find by chunk and select by doc.
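A quick invariant check over the new query's output (sketch; assumes an
async session and the search_vector below):

  # sketch - verifies the rn <= 2 diversity guarantee end to end
  from collections import Counter

  from services.search.retrieval_service import search_vector

  async def check_diversity(session) -> None:
      chunks = await search_vector(session, "이란 미국 전쟁", limit=5)
      per_doc = Counter(c.id for c in chunks)
      assert all(n <= 2 for n in per_doc.values())  # ROW_NUMBER rn <= 2 per doc
      print(f"{len(chunks)} chunks across {len(per_doc)} unique docs")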
---
 app/services/search/retrieval_service.py | 68 +++++++++++++++++------
 1 file changed, 48 insertions(+), 20 deletions(-)

diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py
index 568ec4a..e690ef6 100644
--- a/app/services/search/retrieval_service.py
+++ b/app/services/search/retrieval_service.py
@@ -121,16 +121,24 @@ async def search_text(
 async def search_vector(
     session: AsyncSession, query: str, limit: int
 ) -> list["SearchResult"]:
-    """Vector similarity search - chunk-level (Phase 1.2-C).
+    """Vector similarity search - chunk-level with doc diversity guarantee (Phase 1.2-C).
 
-    Returns raw chunks from the document_chunks table by cosine similarity.
-    Several chunks of the same doc may come back (no compression here).
-    Compress per doc with the compress_chunks_to_docs() helper right before fusion.
-    The Phase 1.3 reranker consumes the raw chunks as-is.
+    Phase 1.2-C diagnosis:
+      A naive chunk top-N lets several chunks of the same doc crowd the top,
+      collapsing unique-doc diversity → recall 0.788 → 0.531 (catastrophic).
 
-    SearchResult.id = doc_id (fusion-compatible)
-    SearchResult.chunk_id / chunk_index / section_title = chunk metadata
-    snippet = first 200 chars of the chunk text
+    Fix (user-recommended approach C):
+      Window function PARTITION over doc_id → return only each doc's top 2 chunks.
+      Satisfies both raw_chunks (chunks_by_doc preservation) and doc-level compression.
+
+    SQL flow:
+      1. inner CTE: fast top-K chunks via the ivfflat index
+      2. ranked CTE: ROW_NUMBER per doc_id partition, best score first
+      3. outer: rn <= 2 (max 2 chunks per doc) + JOIN documents
+
+    Returns:
+      list[SearchResult] - chunk-level, max 2 per doc. compress_chunks_to_docs
+      then does the doc-level compression + chunks_by_doc preservation.
     """
     from api.search import SearchResult  # avoid circular import
 
@@ -141,29 +149,49 @@ async def search_vector(
     except Exception:
         return []
 
-    # join raw chunks with doc metadata; fetch ~limit * 5 for doc diversity after compression
-    fetch_limit = limit * 5
+    # extract top-K chunks via the ivfflat index, then partition per doc
+    # inner_k at about limit * 10 secures enough unique docs (~30-50 docs)
+    inner_k = max(limit * 10, 200)
     result = await session.execute(
         text("""
+            WITH topk AS (
+                SELECT
+                    c.id AS chunk_id,
+                    c.doc_id,
+                    c.chunk_index,
+                    c.section_title,
+                    c.text,
+                    c.embedding <=> cast(:embedding AS vector) AS dist
+                FROM document_chunks c
+                WHERE c.embedding IS NOT NULL
+                ORDER BY c.embedding <=> cast(:embedding AS vector)
+                LIMIT :inner_k
+            ),
+            ranked AS (
+                SELECT
+                    chunk_id, doc_id, chunk_index, section_title, text, dist,
+                    ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY dist ASC) AS rn
+                FROM topk
+            )
             SELECT
                 d.id AS id,
                 d.title AS title,
                 d.ai_domain AS ai_domain,
                 d.ai_summary AS ai_summary,
                 d.file_format AS file_format,
-                (1 - (c.embedding <=> cast(:embedding AS vector))) AS score,
-                left(c.text, 200) AS snippet,
+                (1 - r.dist) AS score,
+                left(r.text, 200) AS snippet,
                 'vector' AS match_reason,
-                c.id AS chunk_id,
-                c.chunk_index AS chunk_index,
-                c.section_title AS section_title
-            FROM document_chunks c
-            JOIN documents d ON d.id = c.doc_id
-            WHERE c.embedding IS NOT NULL AND d.deleted_at IS NULL
-            ORDER BY c.embedding <=> cast(:embedding AS vector)
+                r.chunk_id AS chunk_id,
+                r.chunk_index AS chunk_index,
+                r.section_title AS section_title
+            FROM ranked r
+            JOIN documents d ON d.id = r.doc_id
+            WHERE r.rn <= 2 AND d.deleted_at IS NULL
+            ORDER BY r.dist
             LIMIT :limit
         """),
-        {"embedding": str(query_embedding), "limit": fetch_limit},
+        {"embedding": str(query_embedding), "inner_k": inner_k, "limit": limit * 4},
     )
     return [SearchResult(**row._mapping) for row in result]