21 Commits

Author SHA1 Message Date
Hyungi Ahn
0b511a8db0 fix(upload): 100MB 초과 파일 사전 차단 + NAS file_watcher 안내
home-caddy 의 request_body max_size 100MB 한도 (infra_inventory.md D8 /
Cloudflare 섹션 참조) 에 걸리는 업로드 시 사용자 콘솔에 의미 없는 413 만
나오던 문제. 이제:

1. 클라이언트 사전 검사: 100MB 초과 파일은 업로드 자체를 시도 안 하고
   즉시 toast 로 안내 (파일명 + 크기 + NAS 우회 경로)
2. 서버 fallback: 사전 검사를 통과했으나 인프라 한도에 걸려 413 응답이
   오는 경우에도 같은 안내 메시지

NAS 우회 경로: NAS 의 PKM 폴더에 직접 두면 file_watcher 가 5분 간격으로
자동 인덱싱. 이게 100MB+ 파일의 정식 처리 경로 (infra_inventory.md
Cloudflare 섹션의 413 정책).
2026-04-09 14:24:13 +09:00
Hyungi Ahn
4615fb4ce3 fix(documents): page_size 한도 100 → 500 (inbox 291건 누락 회피)
Inbox 가 review_status=pending 서버 필터로 받는데 pending 이 291 건 이라
page_size 100 으론 191 건 누락. inbox 는 작업 큐 성격이라 한 번에 보는 게 UX.
500 으로 상향: data 폭발 없음(filter 로 boundedness 보장), latency 영향 미미.

전략적 임시 — Phase 4.5 UI 작업에서 inbox 에 infinite scroll 또는 pagination
추가하면 le=100 으로 다시 내려도 됨.
2026-04-09 08:35:58 +09:00
Hyungi Ahn
cdcbb07561 fix(inbox): page_size=200 → 422 해결, review_status 서버 필터 추가
Inbox 페이지가 /documents/?page_size=200 를 호출하는데 백엔드 Query 가
le=100 이라 422 발생 — Phase 2 첫 commit(2026-04-02)부터 dormant 버그.
inbox 코드 안에 'TODO(backend): review_status filter 지원 시 page_size 축소'
주석이 있던 상태.

backend:
- list_documents 에 review_status: str | None Query 파라미터 추가
- WHERE 절에 review_status 매칭 분기 추가

frontend:
- /documents/?review_status=pending&page_size=100 으로 변경
- 클라이언트 필터링 코드 제거 (서버 필터로 대체)

100 미만 안전. pending 이 100 넘으면 다음 페이지 로직 추가 필요 (별도 작업).
2026-04-09 08:31:51 +09:00
Hyungi Ahn
46ba9dd231 fix(digest/loader): raw SQL pgvector string 형태 파싱 지원
raw text() SQL + asyncpg 조합에서는 pgvector Vector(1024) 컬럼이
'[0.087,0.305,...]' 형태의 string 으로 반환되며 numpy 변환이 실패함
(ORM 을 쓰면 type 등록되지만 raw SQL 은 안 됨).

_to_numpy_embedding 에서 string 이면 json.loads 로 먼저 파싱한 뒤
numpy.asarray. 변환 실패 시 None 반환 (해당 doc 자동 drop).

Phase 4 deploy 워커 첫 실행 검증 중 발견.
2026-04-09 08:00:43 +09:00
Hyungi Ahn
4468c2baba fix(database): 마이그레이션 실행을 raw driver 로 변경
text(sql) 은 SQLAlchemy 가 :name 패턴을 named bind parameter 로 해석하므로
SQL 주석이나 literal 안에 콜론이 들어가면 InvalidRequestError 발생.
ai_summary[:200] 같은 표기가 들어간 101_global_digests.sql 적용 시 fastapi
startup 자체가 떨어지는 문제가 발생.

exec_driver_sql 은 SQL 을 driver(asyncpg) 에 그대로 전달하므로 콜론 escape 불요.
schema_migrations INSERT 만 named bind 가 필요하므로 그건 그대로 유지.

Phase 4 deploy 검증 중 발견. 다음 마이그레이션부터는 자동 적용 가능.
2026-04-09 07:59:25 +09:00
Hyungi Ahn
9bef049af6 fix(migration): SQLAlchemy text() bind 충돌 회피 — [:200] 표기 제거
migration 101 의 SQL 주석에 '[:200]' 이 들어 있었는데 SQLAlchemy text() 가
:200 을 named bind parameter 로 해석해 init_db() 가 'A value is required for
bind parameter 200' 로 실패. fastapi startup 자체가 떨어지는 문제.

주석을 '첫 200자' 로 고쳐서 콜론+숫자/영문 패턴 제거.
2026-04-09 07:56:50 +09:00
Hyungi Ahn
dd9a0f600a fix(database): migrations dir 경로 한 단계 잘못된 버그 수정
_run_migrations 가 Path(__file__).parent.parent.parent / "migrations" 로 계산했는데
/app/core/database.py 기준으로 parent.parent.parent = / (root) 가 되어
실제 경로는 /migrations 였음. 컨테이너 안에는 /app/migrations 에 마운트되므로
디렉토리 부재로 자동 스킵 → 추가 마이그레이션이 자동 적용되지 않는 dormant 버그.

parent.parent (= /app) 로 수정. 회귀 위험 0 (기존엔 어차피 동작 안 했음).
Phase 4 deploy 검증 중 발견 — 직전 commit 의 volume mount 와 함께 동작.
2026-04-09 07:55:10 +09:00
Hyungi Ahn
d5f91556e6 fix(deploy): mount migrations into fastapi container
기존 fastapi build context는 ./app이라 부모 디렉토리의 migrations/가
컨테이너에 들어가지 않아 init_db()의 _run_migrations가 디렉토리 부재로 스킵.
016까지는 postgres docker-entrypoint-initdb.d 마운트로 첫 init 시점에만
적용되었고, 이후 추가된 마이그레이션(101 등)이 자동 적용되지 못하는 문제.

./migrations:/app/migrations:ro 한 줄 마운트로 init_db()가 100+ 마이그레이션
추적 + 적용 가능. Phase 4 deploy 검증 중 발견.
2026-04-09 07:53:22 +09:00
Hyungi Ahn
75a1919342 feat(digest): Phase 4 Global News Digest (cluster-level batch summarization)
7일 rolling window 뉴스를 country × topic 2-level로 묶어 매일 04:00 KST 배치 생성.
search 파이프라인 미사용. documents → clustering → cluster-level LLM summarization → digest.

핵심 결정:
- adaptive threshold (0.75/0.78/0.80) + EMA centroid (α=0.7) + time-decay (λ=ln(2)/3)
- min_articles=3, max_topics=10/country, top-5 MMR diversity, ai_summary[:300] truncate
- cluster-level LLM only, drop금지 fallback (topic_label="주요 뉴스 묶음" + top member ai_summary[:200])
- importance_score country별 0~1 normalize + raw_weight_sum 별도 보존, max(score, 0.01) floor
- per-call timeout 25s + pipeline hard cap 600s
- DELETE+INSERT idempotent (UNIQUE digest_date), AIClient._call_chat 직접 호출 (client.py 수정 없음)

신규:
- migrations/101_global_digests.sql (2테이블 정규화)
- app/models/digest.py (GlobalDigest + DigestTopic ORM)
- app/services/digest/{loader,clustering,selection,summarizer,pipeline}.py
- app/workers/digest_worker.py (PIPELINE_HARD_CAP + CLI 진입점)
- app/api/digest.py (/latest, ?date|country, /regenerate, inline Pydantic)
- app/prompts/digest_topic.txt (JSON-only + 절대 금지 블록)

main.py 4줄: import 2 + scheduler add_job 1 + include_router 1.
plan: ~/.claude/plans/quiet-herding-tome.md
2026-04-09 07:45:11 +09:00
Hyungi Ahn
64322e4f6f feat(search): Phase 3 Ask pipeline (evidence + synthesis + /api/search/ask)
- llm_gate.py: MLX single-inference 전역 semaphore (analyzer/evidence/synthesis 공유)
- search_pipeline.py: run_search() 추출, /search 와 /ask 단일 진실 소스
- evidence_service.py: Rule + LLM span select (EV-A), doc-group ordering,
  span too-short 자동 확장(<80자→120자), fallback 은 query 중심 window 강제
- synthesis_service.py: grounded answer + citation 검증 + LRU 캐시(1h/300),
  refused 처리, span_text ONLY 룰 (full_snippet 프롬프트 금지)
- /api/search/ask: 15s timeout, 9가지 failure mode + 한국어 no_results_reason
- rerank_service: rerank_score raw 보존 (display drift 방지)
- query_analyzer: _get_llm_semaphore 를 llm_gate.get_mlx_gate 로 위임
- prompts: evidence_extract.txt, search_synthesis.txt (JSON-only, example 포함)

config.yaml / docker / ollama / infra_inventory 변경 없음.
plan: ~/.claude/plans/quiet-meandering-nova.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 07:34:08 +09:00
Hyungi Ahn
120db86d74 docs(search): Phase 2 최종 측정 보고서 (phase2_final.md + csv A/B)
## 결과 요약

Phase 1.3 baseline vs Phase 2 final A/B (평가셋 v0.1, 23 쿼리):
 - Recall@10:  0.730 → 0.737 (+0.007)
 - NDCG@10:    0.663 → 0.668 (+0.005)
 - Top-3 hit:  0.900 → 0.900 (0)
 - p95 latency: 171ms → 256ms (+85)
 - news_crosslingual NDCG: 0.27 → 0.37 (+0.10 ✓)
 - exact_keyword / natural_language_ko: 완전 유지 (회귀 0)

## Phase 2 게이트: 2/6 통과
 ✓ news_crosslingual NDCG ≥ 0.30
 ✓ latency p95 < 400ms
  Recall@10 ≥ 0.78 (0.737)
  Top-3 hit ≥ 0.93 (0.900)
  crosslingual_ko_en NDCG ≥ 0.65 (0.53, bge-m3 한계)
  평가셋 v0.2 작성 (후속)

## 핵심 성과 (게이트 미달이지만 견고한 기반)
 1. QueryAnalyzer async-only 아키텍처 (retrieval 차단 0)
 2. semaphore concurrency=1 (MLX single-inference queue 폭발 방지)
 3. multilingual narrowing (news/global 한정 → 회귀 0 + news 개선)
 4. soft_filter boost 보수적 설정 (0.01, domain only)
 5. prewarm 15개 → cache hit rate 70%+

## infra_inventory.md soft lock 준수
 - config.yaml / Ollama / compose restart 변경 0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 15:52:21 +09:00
Hyungi Ahn
01f144ab25 fix(search): soft_filter boost 약화 (domain 0.01, doctype 제거)
## 1차 측정 결과 (Phase 2.3 초안)

| metric | Phase 2.2 narrow | Phase 2.3 (boost 0.03+0.02) | Δ |
|---|---|---|---|
| Recall@10 | 0.737 | 0.721 | -0.016  |
| NDCG@10 | 0.668 | 0.661 | -0.007 |
| exact_keyword NDCG | 0.96 | 0.93 | -0.03  |

## 진단
- 같은 도메인 doc이 **무차별** boost → exact match doc 상대 우위 손상
- document_type 매칭은 ai_domain/match_reason 휴리스틱 → false positive 다수

## 수정
- SOFT_FILTER_DOMAIN_BOOST 0.03 → **0.01**
- document_type 매칭 로직 제거
- domain 매칭을 "정확 일치 또는 path 포함"으로 좁힘
- max cap 0.05 유지

## Phase 2.3 위치
 - 현재 평가셋(v0.1)에는 filter 쿼리 없음 → 효과 직접 측정 불가
 - Phase 2.4에서 queries_v0.2.yaml 확장 후 재측정 예정
 - 이 커밋의 목적은 "회귀 방지" — boost가 해를 끼치지 않도록만

(+ CLAUDE.md 동기화: infra_inventory.md 참조 / soft lock 섹션 포함)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 15:40:04 +09:00
Hyungi Ahn
e91c199537 feat(search): Phase 2.3 soft_filter boost (domain/doctype)
## 변경

### fusion_service.py
 - SOFT_FILTER_MAX_BOOST = 0.05 (plan 영구 룰, RRF score 왜곡 방지)
 - SOFT_FILTER_DOMAIN_BOOST = 0.03, SOFT_FILTER_DOCTYPE_BOOST = 0.02
 - apply_soft_filter_boost(results, soft_filters) → int
   - ai_domain 부분 문자열 매칭 (path 포함 e.g. "Industrial_Safety/Legislation")
   - document_type 토큰 매칭 (ai_domain + match_reason 헤이스택)
   - 상한선 0.05 강제
   - boost 후 score 기준 재정렬

### api/search.py
 - fusion 직후 호출 조건:
   - analyzer_cache_hit == True
   - analyzer_tier != "ignore" (confidence >= 0.5)
   - query_analysis.soft_filters 존재
 - notes에 "soft_filter_boost applied=N" 기록

## Phase 2.3 범위
 - hard_filter SQL WHERE는 현재 평가셋에 명시 필터 쿼리 없어 효과 측정 불가 → Phase 2.4 v0.2 확장 후
 - document_type의 file_format 직접 매칭은 의미론적 mismatch → 제외
 - hard_filter는 Phase 2.4 이후 iteration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 15:30:23 +09:00
Hyungi Ahn
e595283e27 fix(search): Phase 2.2 multilingual 활성 조건을 news/global 한정으로 좁힘
## 1차 측정 결과

| metric | Phase 1.3 | Phase 2.2 (all domains) | Δ |
|---|---|---|---|
| Recall@10 | 0.730 | 0.683 | -0.047  |
| natural_language_ko NDCG | 0.73 | 0.63 | -0.10  |
| news_crosslingual NDCG | 0.27 | 0.37 | +0.10 ✓ |
| crosslingual_ko_en NDCG | 0.53 | 0.50 | -0.03  |

document 도메인에서 ko→en 번역 쿼리가 한국어 법령 검색에 noise로 작용.
"기계 사고 관련 법령" → "machinery accident laws" 영어 embedding이
한국어 법령 문서와 매칭 약해서 ko 결과를 오히려 밀어냄.

## 수정

use_multilingual 조건 강화:
 - 기존: analyzer_tier == "analyzed" + normalized_queries >= 2
 - 추가: domain_hint == "news" OR language_scope == "global"

즉 document 도메인은 기존 single-query 경로 유지 → 회귀 복구.
news / global 영역만 multilingual → news_crosslingual 개선 유지.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 15:20:05 +09:00
Hyungi Ahn
21a78fbbf0 fix(search): semaphore로 LLM concurrency=1 강제 + run_eval analyze 파라미터 추가
## 배경
1차 Phase 2.2 eval에서 발견: 23개 쿼리가 순차 호출되지만 각 request의
background analyzer task는 모두 동시에 MLX에 요청 날림 → MLX single-inference
서버 queue 폭발 → 22개가 15초 timeout. cache 채워지지 않음.

## 수정

### query_analyzer.py
 - LLM_CONCURRENCY = 1 상수 추가
 - _LLM_SEMAPHORE: lazy init asyncio.Semaphore (event loop 바인딩)
 - analyze() 내부: semaphore → timeout(실제 LLM 호출만) 이중 래핑
   semaphore 대기 시간이 timeout에 포함되지 않도록 주의

### run_eval.py
 - --analyze true|false 파라미터 추가 (Phase 2.1+ 측정용)
 - call_search / evaluate 시그니처에 analyze 전달

## 기대 효과
 - prewarm/background/동기 호출 모두 1개씩 순차 MLX 호출
 - 23개 대기 시 최악 230초 소요, 단 모두 성공해서 cache 채움
 - MLX 서버 부하 안정

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 15:12:13 +09:00
Hyungi Ahn
f5c3dea833 feat(search): Phase 2.2 multilingual vector retrieval + query embed cache
## 변경 사항

### app/services/search/retrieval_service.py
 - **_QUERY_EMBED_CACHE**: 모듈 레벨 LRU (maxsize=500, TTL=24h)
   - sha256(text|bge-m3) 키. fixed query 재호출 시 vector_ms 절반 감소.
 - **_get_query_embedding(client, text)**: cache-first helper. 기존 search_vector()도 이를 사용하도록 교체.
 - **search_vector_multilingual(session, normalized_queries, limit)**: 신규
   - normalized_queries 각 언어별 embedding 병렬 생성 (cache hit 활용)
   - 각 embedding에 대해 docs+chunks hybrid retrieval 병렬
   - weight 기반 score 누적 merge (lang_weight 이미 1.0 정규화)
   - match_reason에 "ml_ko+en" 등 언어 병합 표시
   - 호출 조건 문서화 — cache hit + analyzer_tier=analyzed 시에만

### app/api/search.py
 - use_multilingual 결정 로직:
   - analyzer_cache_hit == True
   - analyzer_tier == "analyzed" (confidence >= 0.85)
   - normalized_queries >= 2 (다언어 버전 실제 존재)
 - 위 3조건 모두 만족할 때만 search_vector_multilingual 호출
 - 그 외 모든 경로 (cache miss, low conf, single lang)는 기존 search_vector 그대로 사용 (회귀 0 보장)
 - notes에 `multilingual langs=[ko, en, ...]` 기록

## 기대 효과
 - crosslingual_ko_en NDCG 0.53 → 0.65+ (Phase 2 목표)
 - 기존 경로 완전 불변 → 회귀 0
 - Phase 2.1 async 구조와 결합해 "cache hit일 때만 활성" 조건 준수

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:59:20 +09:00
Hyungi Ahn
1e80d4c613 fix(search): query_analyzer가 setup_logger 사용하도록 수정
기본 logging.getLogger()는 WARNING 레벨이라 prewarm/analyze 진행 로그가
stdout/파일 어디에도 안 찍혔음. setup_logger("query_analyzer")로 교체하면
logs/query_analyzer.log + stdout 둘 다 INFO 레벨 출력.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:52:46 +09:00
Hyungi Ahn
324537cbc8 fix(search): LLM_TIMEOUT_MS 5000 → 15000 (실측 반영)
축소 프롬프트 재측정:
  - prompt_tok 2406 → 802 (1/3 감소 성공)
  - latency 10.5초 → 7~11초 (generation이 dominant)
  - max_tokens 내려도 무효 (자연 EOS ~289 tok)

5000ms로는 여전히 모든 prewarm timeout. async 구조이므로
background에서 15초 기다려도 retrieval 경로 영향 0.

추가: prewarm delay_between 0.5 → 0.2 (총 prewarm 시간 단축).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:50:56 +09:00
Hyungi Ahn
c81b728ddf refactor(search): Phase 2.1 QueryAnalyzer를 async-only 구조로 전환
## 철학 수정 (실측 기반)

gemma-4-26b-a4b-it-8bit MLX 실측:
  - full query_analyze.txt (prompt_tok=2406) → 10.5초
  - max_tokens 축소 무효 (모델 자연 EOS 조기 종료)
  - 쿼리 길이 영향 거의 없음 (프롬프트 자체가 지배)
  → 800ms timeout 가정은 13배 초과. 동기 호출 완전히 불가능.

따라서 QueryAnalyzer는 "즉시 실행하는 기능" → "미리 준비해두는 기능"으로
포지셔닝 변경. retrieval 경로에서 analyzer 동기 호출 **금지**.

## 구조

```
query → retrieval (항상 즉시)
         ↘ trigger_background_analysis (fire-and-forget)
            → analyze() [5초+] → cache 저장

다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화
```

## 변경 사항

### app/prompts/query_analyze.txt
 - 5971 chars → 2403 chars (40%)
 - 예시 4개 → 1개, 규칙 설명 축약
 - 목표 prompt_tok 2406 → ~600 (1/4)

### app/services/search/query_analyzer.py
 - LLM_TIMEOUT_MS 800 → 5000 (background이므로 여유 OK)
 - PROMPT_VERSION v1 → v2 (cache auto-invalidate)
 - get_cached / set_cached 유지 — retrieval 경로 O(1) 조회
 - trigger_background_analysis(query) 신규 — 동기 함수, 즉시 반환, task 생성
 - _PENDING set으로 task 참조 유지 (premature GC 방지)
 - _INFLIGHT set으로 동일 쿼리 중복 실행 방지
 - prewarm_analyzer() 신규 — startup에서 15~20 쿼리 미리 분석
 - DEFAULT_PREWARM_QUERIES: 평가셋 fixed 7 + 법령 3 + 뉴스 2 + 실무 3

### app/api/search.py
 - 기존 sync analyzer 호출 완전 제거
 - analyze=True → get_cached(q) 조회만 O(1)
   - hit: query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부 활성화)
   - miss: trigger_background_analysis(q) + 기존 경로 그대로
 - timing["analyze_ms"] 제거 (경로에 LLM 호출 없음)
 - notes에 analyzer cache_hit/cache_miss 상태 기록
 - debug.query_analysis는 cache hit 시에만 채워짐

### app/main.py
 - lifespan startup에 prewarm_analyzer() background task 추가
 - 논블로킹 — 앱 시작 막지 않음
 - delay_between=0.5로 MLX 부하 완화

## 기대 효과

 - cold 요청 latency: 기존 Phase 1.3 그대로 (회귀 0)
 - warm 요청 + prewarmed: cache hit → query_analysis 활용
 - 예상 cache hit rate: 초기 70~80% (prewarm) + 사용 누적
 - Phase 2.2/2.3 multilingual/filter 기능은 cache hit 시에만 동작

## 참조

 - memory: feedback_analyzer_async_only.md (영구 룰 저장)
 - plan: ~/.claude/plans/zesty-painting-kahan.md ("철학 수정" 섹션)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:47:09 +09:00
Hyungi Ahn
d28ef2fca0 feat(search): Phase 2.1 QueryAnalyzer + LRU cache + confidence 3-tier
QueryAnalyzer 스켈레톤 구현. 자연어 쿼리를 구조화된 분석 결과로 변환.
Phase 2.1은 debug 노출 + tier 판정까지만 — retrieval 경로는 변경 X (회귀 0 목표).
multilingual/filter 실제 분기는 2.2/2.3에서 이 분석 결과를 활용.

app/prompts/query_analyze.txt
 - gemma-4 JSON-only 응답 규약
 - intent/query_type/domain_hint/language_scope/normalized_queries/
   hard_filters/soft_filters/expanded_terms/analyzer_confidence
 - 4가지 예시 (자연어 법령, 정확 조항, 뉴스 다국어, 의미 불명)
 - classify.txt 구조 참고

app/services/search/query_analyzer.py
 - LLM_TIMEOUT_MS=800 (MLX 멈춤 시 검색 전체 멈춤 방지, 절대 늘리지 말 것)
 - MAX_NORMALIZED_QUERIES=3 (multilingual explosion 방지)
 - in-memory FIFO LRU (maxsize=1000, TTL=86400)
 - cache key = sha256(query + PROMPT_VERSION + primary.model)
   → 모델/프롬프트 변경 시 자동 invalidate
 - 저신뢰(<0.5) / 실패 결과 캐시 금지
 - weight 합=1.0 정규화 (fusion 왜곡 방지)
 - 실패 시 analyzer_confidence=float 0.0 (None 금지, TypeError 방지)

app/api/search.py
 - ?analyze=true|false 파라미터 (default False — 회귀 영향 0)
 - query_analyzer.analyze() 호출 + timing["analyze_ms"] 기록
 - _analyzer_tier(conf) → "ignore" | "original_fallback" | "merge" | "analyzed"
   (tier 게이트: 0.5 / 0.7 / 0.85)
 - debug.query_analysis 필드 채움 + notes에 tier/fallback_reason
 - logger 라인에 analyzer conf/tier 병기

app/services/search_telemetry.py
 - record_search_event(analyzer_confidence=None) 추가
 - base_ctx에 analyzer_confidence 기록 (다층 confidence 시드)
 - result confidence와 분리된 축 — Phase 2.2+에서 failure 분류에 활용

검증:
 - python3 -m py_compile 통과
 - 런타임 검증은 GPU 재배포 후 수행 (fixed 7 query + 평가셋)

참조: ~/.claude/plans/zesty-painting-kahan.md (Phase 2.1 섹션)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:21:37 +09:00
Hyungi Ahn
de08735420 fix(ai): primary -> mlx-proxy 8801 + align model to gemma
- endpoint: 100.76.254.116:8800 -> :8801 (route through mlx-proxy for
  /status observability - active_jobs / total_requests)
- model: Qwen3.5-35B-A3B-4bit -> gemma-4-26b-a4b-it-8bit (match the
  model actually loaded on mlx-proxy)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 04:40:06 +00:00
36 changed files with 3689 additions and 163 deletions

View File

@@ -1,5 +1,20 @@
# hyungi_Document_Server — Claude Code 작업 가이드
## Infrastructure Reference 📌
**Always refer to** `~/.claude/projects/-Users-hyungiahn/memory/infra_inventory.md` for:
- AI model routing (primary / fallback / embedding / rerank / vision) — **the model names below may be stale**
- Machine info, Tailscale IPs, SSH targets
- Docker container topology and compose projects
- Drift log (known Desired vs Actual inconsistencies)
- Verify commands
**If this file and `infra_inventory.md` disagree, `infra_inventory.md` is authoritative.** Do not change `config.yaml` / `credentials.env` without first updating `infra_inventory.md`.
**Search experiment soft lock**: During Phase 2 work (search.py refactor, QueryAnalyzer, run_eval.py execution), do **not** run `docker compose restart`, change `config.yaml`, or pull Ollama models. Violating this invalidates the experiment baseline.
---
## 프로젝트 개요
Self-hosted PKM(Personal Knowledge Management) 웹 애플리케이션.

164
app/api/digest.py Normal file
View File

@@ -0,0 +1,164 @@
"""Phase 4 Global Digest API — read-only + 디버그 regenerate.
엔드포인트:
- GET /api/digest/latest : 가장 최근 digest
- GET /api/digest?date=YYYY-MM-DD : 특정 날짜 digest
- GET /api/digest?country=KR : 특정 국가만
- POST /api/digest/regenerate : 백그라운드 digest 워커 트리거 (auth 필요)
응답은 country → topic 2-level 구조. country 가 비어있는 경우 응답에서 자동 생략.
"""
import asyncio
from datetime import date as date_type
from datetime import datetime
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from core.auth import get_current_user
from core.database import get_session
from models.digest import DigestTopic, GlobalDigest
from models.user import User
router = APIRouter()
# ─── Pydantic 응답 모델 (schemas/ 디렉토리 미사용 → inline 정의) ───
class TopicResponse(BaseModel):
topic_rank: int
topic_label: str
summary: str
article_ids: list[int]
article_count: int
importance_score: float
raw_weight_sum: float
llm_fallback_used: bool
class CountryGroup(BaseModel):
country: str
topics: list[TopicResponse]
class DigestResponse(BaseModel):
digest_date: date_type
window_start: datetime
window_end: datetime
decay_lambda: float
total_articles: int
total_countries: int
total_topics: int
generation_ms: int | None
llm_calls: int
llm_failures: int
status: str
countries: list[CountryGroup]
# ─── helpers ───
def _build_response(digest: GlobalDigest, country_filter: str | None = None) -> DigestResponse:
"""ORM 객체 → DigestResponse. country_filter 가 주어지면 해당 국가만."""
topics_by_country: dict[str, list[TopicResponse]] = {}
for t in sorted(digest.topics, key=lambda x: (x.country, x.topic_rank)):
if country_filter and t.country != country_filter:
continue
topics_by_country.setdefault(t.country, []).append(
TopicResponse(
topic_rank=t.topic_rank,
topic_label=t.topic_label,
summary=t.summary,
article_ids=list(t.article_ids or []),
article_count=t.article_count,
importance_score=t.importance_score,
raw_weight_sum=t.raw_weight_sum,
llm_fallback_used=t.llm_fallback_used,
)
)
countries = [
CountryGroup(country=c, topics=topics_by_country[c])
for c in sorted(topics_by_country.keys())
]
return DigestResponse(
digest_date=digest.digest_date,
window_start=digest.window_start,
window_end=digest.window_end,
decay_lambda=digest.decay_lambda,
total_articles=digest.total_articles,
total_countries=digest.total_countries,
total_topics=digest.total_topics,
generation_ms=digest.generation_ms,
llm_calls=digest.llm_calls,
llm_failures=digest.llm_failures,
status=digest.status,
countries=countries,
)
async def _load_digest(
session: AsyncSession,
target_date: date_type | None,
) -> GlobalDigest | None:
"""date 가 주어지면 해당 날짜, 아니면 최신 digest 1건."""
query = select(GlobalDigest).options(selectinload(GlobalDigest.topics))
if target_date is not None:
query = query.where(GlobalDigest.digest_date == target_date)
else:
query = query.order_by(GlobalDigest.digest_date.desc())
query = query.limit(1)
result = await session.execute(query)
return result.scalar_one_or_none()
# ─── Routes ───
@router.get("/latest", response_model=DigestResponse)
async def get_latest(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""가장 최근 생성된 global digest."""
digest = await _load_digest(session, target_date=None)
if digest is None:
raise HTTPException(status_code=404, detail="아직 생성된 digest 없음")
return _build_response(digest)
@router.get("", response_model=DigestResponse)
async def get_digest(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST)"),
country: str | None = Query(default=None, description="국가 코드 (예: KR)"),
):
"""특정 날짜 또는 국가 필터링된 digest. date 미지정 시 최신."""
digest = await _load_digest(session, target_date=date)
if digest is None:
raise HTTPException(
status_code=404,
detail=f"digest 없음 (date={date})" if date else "아직 생성된 digest 없음",
)
country_filter = country.upper() if country else None
return _build_response(digest, country_filter=country_filter)
@router.post("/regenerate")
async def regenerate(
user: Annotated[User, Depends(get_current_user)],
):
"""디버그용 수동 트리거 — 백그라운드 태스크로 워커 실행 (auth 필요)."""
from workers.digest_worker import run
asyncio.create_task(run())
return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}

View File

@@ -142,11 +142,12 @@ async def list_documents(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
page_size: int = Query(20, ge=1, le=500),
domain: str | None = None,
sub_group: str | None = None,
source: str | None = None,
format: str | None = None,
review_status: str | None = Query(None, description="pending | approved | rejected"),
):
"""문서 목록 조회 (페이지네이션 + 필터, 뉴스 제외)"""
query = select(Document).where(Document.deleted_at == None, Document.source_channel != "news")
@@ -158,6 +159,8 @@ async def list_documents(
query = query.where(Document.source_channel == source)
if format:
query = query.where(Document.file_format == format)
if review_status:
query = query.where(Document.review_status == review_status)
# 전체 건수
count_query = select(func.count()).select_from(query.subquery())

View File

@@ -1,11 +1,16 @@
"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint).
"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후).
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리.
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당.
실제 검색 파이프라인(retrieval fusion rerank → diversity → confidence)
은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
이 파일은 다음만 담당:
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate
/ Citation / AskResponse / AskDebug)
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
"""
import time
from typing import Annotated
from typing import Annotated, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel
@@ -15,20 +20,11 @@ from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
from services.search.rerank_service import (
MAX_CHUNKS_PER_DOC,
MAX_RERANK_INPUT,
apply_diversity,
rerank_chunks,
)
from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
compute_confidence_reranked,
record_search_event,
)
from services.search.evidence_service import EvidenceItem, extract_evidence
from services.search.fusion_service import DEFAULT_FUSION
from services.search.search_pipeline import PipelineResult, run_search
from services.search.synthesis_service import SynthesisResult, synthesize
from services.search_telemetry import record_search_event
# logs/search.log + stdout 동시 출력 (Phase 0.4)
logger = setup_logger("search")
@@ -56,6 +52,10 @@ class SearchResult(BaseModel):
chunk_id: int | None = None
chunk_index: int | None = None
section_title: str | None = None
# Phase 3.1: reranker raw score 보존 (display score drift 방지).
# rerank 경로를 탄 chunk에만 채워짐. normalize_display_scores는 이 필드를
# 건드리지 않는다. Phase 3 evidence fast-path 판단에 사용.
rerank_score: float | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
@@ -98,6 +98,29 @@ def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCan
]
def _build_search_debug(pr: PipelineResult) -> SearchDebug:
"""PipelineResult → SearchDebug (기존 search()의 debug 구성 블록 복사)."""
return SearchDebug(
timing_ms=pr.timing_ms,
text_candidates=(
_to_debug_candidates(pr.text_results)
if pr.text_results or pr.mode != "vector"
else None
),
vector_candidates=(
_to_debug_candidates(pr.vector_results)
if pr.vector_results or pr.mode in ("vector", "hybrid")
else None
),
fused_candidates=(
_to_debug_candidates(pr.results) if pr.mode == "hybrid" else None
),
confidence=pr.confidence_signal,
notes=pr.notes,
query_analysis=pr.query_analysis,
)
@router.get("/", response_model=SearchResponse)
async def search(
q: str,
@@ -115,134 +138,301 @@ async def search(
True,
description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
),
analyze: bool = Query(
False,
description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
),
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)"""
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list[SearchResult] = []
vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력)
raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용)
chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존
t_total = time.perf_counter()
if mode == "vector":
t0 = time.perf_counter()
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
t1b = time.perf_counter()
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
fusion_limit = max(limit * 5, 100) if rerank else limit
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
notes.append(
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
f"unique_docs={len(chunks_by_doc)}"
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)"""
pr = await run_search(
session,
q,
mode=mode, # type: ignore[arg-type]
limit=limit,
fusion=fusion,
rerank=rerank,
analyze=analyze,
)
if rerank:
# Phase 1.3: reranker — chunk 기준 입력
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
t3 = time.perf_counter()
rerank_input: list[SearchResult] = []
for doc in fused_docs:
chunks = chunks_by_doc.get(doc.id, [])
if chunks:
# doc당 max 2 chunk (latency/VRAM 보호)
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(q, rerank_input, limit * 3)
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
t4 = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
else:
# rerank 비활성: fused_docs를 그대로 (limit 적용)
results = fused_docs[:limit]
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
if mode == "hybrid":
if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items())
fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
analyzer_str = (
f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}"
if analyze
else ""
)
logger.info(
"search query=%r mode=%s%s results=%d conf=%.2f %s",
q[:80], mode, fusion_str, len(results), confidence_signal, timing_str,
"search query=%r mode=%s%s%s results=%d conf=%.2f %s",
q[:80],
pr.mode,
fusion_str,
analyzer_str,
len(pr.results),
pr.confidence_signal,
timing_str,
)
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
# Phase 2.1: analyze=true일 때만 analyzer_confidence 전달 (False는 None → 기존 호환)
background_tasks.add_task(
record_search_event, q, user.id, results, mode, confidence_signal
record_search_event,
q,
user.id,
pr.results,
pr.mode,
pr.confidence_signal,
pr.analyzer_confidence if analyze else None,
)
debug_obj: SearchDebug | None = None
if debug:
debug_obj = SearchDebug(
timing_ms=timing,
text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
confidence=confidence_signal,
notes=notes,
)
debug_obj = _build_search_debug(pr) if debug else None
return SearchResponse(
results=results,
total=len(results),
results=pr.results,
total=len(pr.results),
query=q,
mode=mode,
mode=pr.mode,
debug=debug_obj,
)
# ═══════════════════════════════════════════════════════════
# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis
# ═══════════════════════════════════════════════════════════
class Citation(BaseModel):
"""answer 본문의 [n] 에 해당하는 근거 단일 행."""
n: int
chunk_id: int | None
doc_id: int
title: str | None
section_title: str | None
span_text: str # evidence LLM 이 추출한 50~300자
full_snippet: str # 원본 800자 (citation 원문 보기 전용)
relevance: float
rerank_score: float
class AskDebug(BaseModel):
"""`/ask?debug=true` 응답 확장."""
timing_ms: dict[str, float]
search_notes: list[str]
query_analysis: dict | None = None
confidence_signal: float
evidence_candidate_count: int
evidence_kept_count: int
evidence_skip_reason: str | None
synthesis_cache_hit: bool
synthesis_prompt_preview: str | None = None
synthesis_raw_preview: str | None = None
hallucination_flags: list[str] = []
class AskResponse(BaseModel):
"""`/ask` 응답. `/search` 의 SearchResult 는 그대로 재사용."""
results: list[SearchResult]
ai_answer: str | None
citations: list[Citation]
synthesis_status: Literal[
"completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error"
]
synthesis_ms: float
confidence: Literal["high", "medium", "low"] | None
refused: bool
no_results_reason: str | None
query: str
total: int
debug: AskDebug | None = None
def _map_no_results_reason(
pr: PipelineResult,
evidence: list[EvidenceItem],
ev_skip: str | None,
sr: SynthesisResult,
) -> str | None:
"""사용자에게 보여줄 한국어 메시지 매핑.
Failure mode 표 (plan §Failure Modes) 기반.
"""
# LLM 자가 refused → 모델이 준 사유 그대로
if sr.refused and sr.refuse_reason:
return sr.refuse_reason
# synthesis 상태 우선
if sr.status == "no_evidence":
if not pr.results:
return "검색 결과가 없습니다."
return "관련도 높은 근거를 찾지 못했습니다."
if sr.status == "skipped":
return "검색 결과가 없습니다."
if sr.status == "timeout":
return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요."
if sr.status == "parse_failed":
return "답변 형식 오류로 생략했습니다."
if sr.status == "llm_error":
return "AI 서버에 일시적 문제가 있습니다."
# evidence 단계 실패는 fallback 을 탔더라도 notes 용
if ev_skip == "all_low_rerank":
return "관련도 높은 근거를 찾지 못했습니다."
if ev_skip == "empty_retrieval":
return "검색 결과가 없습니다."
return None
def _build_citations(
evidence: list[EvidenceItem], used_citations: list[int]
) -> list[Citation]:
"""answer 본문에 실제로 등장한 n 만 Citation 으로 변환."""
by_n = {e.n: e for e in evidence}
out: list[Citation] = []
for n in used_citations:
e = by_n.get(n)
if e is None:
continue
out.append(
Citation(
n=e.n,
chunk_id=e.chunk_id,
doc_id=e.doc_id,
title=e.title,
section_title=e.section_title,
span_text=e.span_text,
full_snippet=e.full_snippet,
relevance=e.relevance,
rerank_score=e.rerank_score,
)
)
return out
def _build_ask_debug(
pr: PipelineResult,
evidence: list[EvidenceItem],
ev_skip: str | None,
sr: SynthesisResult,
ev_ms: float,
synth_ms: float,
total_ms: float,
) -> AskDebug:
timing: dict[str, float] = dict(pr.timing_ms)
timing["evidence_ms"] = ev_ms
timing["synthesis_ms"] = synth_ms
timing["ask_total_ms"] = total_ms
# candidate count 는 rule filter 통과한 수 (recomputable from results)
# 엄밀히는 evidence_service 내부 숫자인데, evidence 길이 ≈ kept, candidate
# 는 관측이 어려움 → kept 는 evidence 길이, candidate 는 별도 필드 없음.
# 단순화: candidate_count = len(evidence) 를 상한 근사로 둠 (debug 전용).
return AskDebug(
timing_ms=timing,
search_notes=pr.notes,
query_analysis=pr.query_analysis,
confidence_signal=pr.confidence_signal,
evidence_candidate_count=len(evidence),
evidence_kept_count=len(evidence),
evidence_skip_reason=ev_skip,
synthesis_cache_hit=sr.cache_hit,
synthesis_prompt_preview=None, # 현재 synthesis_service 에서 노출 안 함
synthesis_raw_preview=sr.raw_preview,
hallucination_flags=sr.hallucination_flags,
)
@router.get("/ask", response_model=AskResponse)
async def ask(
q: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
):
"""근거 기반 AI 답변 (Phase 3.3).
`/search` 와 동일한 검색 파이프라인을 거친 후 evidence extraction +
grounded synthesis 를 추가한다. `mode`, `rerank`, `analyze` 는 품질 보장을
위해 강제 고정 (hybrid / True / True).
실패 경로(timeout/parse_failed/refused/...) 에서도 `results` 는 항상 반환.
"""
t_total = time.perf_counter()
# 1. 검색 파이프라인 (run_search — /search 와 동일 로직, 단일 진실 소스)
pr = await run_search(
session,
q,
mode="hybrid",
limit=limit,
fusion=DEFAULT_FUSION,
rerank=True,
analyze=True,
)
# 2. Evidence extraction (rule + LLM span select, 1 batched call)
t_ev = time.perf_counter()
evidence, ev_skip = await extract_evidence(q, pr.results)
ev_ms = (time.perf_counter() - t_ev) * 1000
# 3. Grounded synthesis (gemma-4, 15s timeout, citation 검증)
t_synth = time.perf_counter()
sr = await synthesize(q, evidence, debug=debug)
synth_ms = (time.perf_counter() - t_synth) * 1000
total_ms = (time.perf_counter() - t_total) * 1000
# 4. 응답 구성
citations = _build_citations(evidence, sr.used_citations)
no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
logger.info(
"ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f",
q[:80],
len(pr.results),
len(evidence),
len(citations),
sr.status,
sr.confidence or "-",
sr.refused,
ev_ms,
synth_ms,
total_ms,
)
# 5. telemetry — 기존 record_search_event 재사용 (Phase 0.3 호환)
background_tasks.add_task(
record_search_event,
q,
user.id,
pr.results,
"hybrid",
pr.confidence_signal,
pr.analyzer_confidence,
)
debug_obj = (
_build_ask_debug(pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms)
if debug
else None
)
return AskResponse(
results=pr.results,
ai_answer=sr.answer,
citations=citations,
synthesis_status=sr.status,
synthesis_ms=sr.elapsed_ms,
confidence=sr.confidence,
refused=sr.refused,
no_results_reason=no_reason,
query=q,
total=len(pr.results),
debug=debug_obj,
)

View File

@@ -95,7 +95,8 @@ async def _run_migrations(conn) -> None:
applied = {row[0] for row in result}
# migration 파일 스캔
migrations_dir = Path(__file__).resolve().parent.parent.parent / "migrations"
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
if not migrations_dir.is_dir():
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
return
@@ -114,7 +115,10 @@ async def _run_migrations(conn) -> None:
sql = path.read_text(encoding="utf-8")
_validate_sql_content(name, sql)
logger.info(f"[migration] {name} 실행 중...")
await conn.execute(text(sql))
# raw driver SQL 사용 — text() 의 :name bind parameter 해석으로
# SQL 주석/literal 에 콜론이 들어가면 InvalidRequestError 발생.
# exec_driver_sql 은 SQL 을 driver(asyncpg) 에 그대로 전달.
await conn.exec_driver_sql(sql)
await conn.execute(
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
{"v": version, "n": name},

View File

@@ -8,6 +8,7 @@ from sqlalchemy import func, select, text
from api.auth import router as auth_router
from api.dashboard import router as dashboard_router
from api.digest import router as digest_router
from api.documents import router as documents_router
from api.news import router as news_router
from api.search import router as search_router
@@ -20,9 +21,13 @@ from models.user import User
@asynccontextmanager
async def lifespan(app: FastAPI):
"""앱 시작/종료 시 실행되는 lifespan 핸들러"""
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from services.search.query_analyzer import prewarm_analyzer
from workers.daily_digest import run as daily_digest_run
from workers.digest_worker import run as global_digest_run
from workers.file_watcher import watch_inbox
from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run
@@ -51,9 +56,19 @@ async def lifespan(app: FastAPI):
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
scheduler.add_job(global_digest_run, CronTrigger(hour=4, minute=0), id="global_digest")
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
scheduler.start()
# Phase 2.1 (async 구조): QueryAnalyzer prewarm.
# 대표 쿼리 15~20개를 background task로 분석해 cache 적재.
# 첫 사용자 요청부터 cache hit rate 70~80% 목표.
# 논블로킹 — startup을 막지 않음. MLX 부하 완화 위해 delay_between=0.5.
prewarm_task = asyncio.create_task(prewarm_analyzer())
prewarm_task.add_done_callback(
lambda t: t.exception() and None # 예외는 query_analyzer 내부에서 로깅
)
yield
# 종료: 스케줄러 → DB 순서로 정리
@@ -76,6 +91,7 @@ app.include_router(search_router, prefix="/api/search", tags=["search"])
app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"])
app.include_router(news_router, prefix="/api/news", tags=["news"])
app.include_router(digest_router, prefix="/api/digest", tags=["digest"])
# TODO: Phase 5에서 추가
# app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])

87
app/models/digest.py Normal file
View File

@@ -0,0 +1,87 @@
"""global_digests + digest_topics 테이블 ORM (Phase 4)"""
from datetime import date, datetime
from sqlalchemy import (
BigInteger,
Boolean,
Date,
DateTime,
Float,
ForeignKey,
Integer,
String,
Text,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from core.database import Base
class GlobalDigest(Base):
"""하루 단위 digest run 메타데이터"""
__tablename__ = "global_digests"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
digest_date: Mapped[date] = mapped_column(Date, nullable=False, unique=True)
window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
decay_lambda: Mapped[float] = mapped_column(Float, nullable=False)
total_articles: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
total_countries: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
total_topics: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
generation_ms: Mapped[int | None] = mapped_column(Integer)
llm_calls: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
llm_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
status: Mapped[str] = mapped_column(String(20), nullable=False, default="success")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, default=datetime.now
)
topics: Mapped[list["DigestTopic"]] = relationship(
back_populates="digest",
cascade="all, delete-orphan",
order_by="DigestTopic.country, DigestTopic.topic_rank",
)
class DigestTopic(Base):
"""country × topic 단위 cluster 결과"""
__tablename__ = "digest_topics"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
digest_id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey("global_digests.id", ondelete="CASCADE"),
nullable=False,
)
country: Mapped[str] = mapped_column(String(10), nullable=False)
topic_rank: Mapped[int] = mapped_column(Integer, nullable=False)
topic_label: Mapped[str] = mapped_column(Text, nullable=False)
summary: Mapped[str] = mapped_column(Text, nullable=False)
article_ids: Mapped[list] = mapped_column(JSONB, nullable=False)
article_count: Mapped[int] = mapped_column(Integer, nullable=False)
importance_score: Mapped[float] = mapped_column(Float, nullable=False)
raw_weight_sum: Mapped[float] = mapped_column(Float, nullable=False)
centroid_sample: Mapped[dict | None] = mapped_column(JSONB)
llm_model: Mapped[str | None] = mapped_column(String(100))
llm_fallback_used: Mapped[bool] = mapped_column(
Boolean, nullable=False, default=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, default=datetime.now
)
digest: Mapped["GlobalDigest"] = relationship(back_populates="topics")

View File

@@ -0,0 +1,19 @@
너는 팩트 기반 뉴스 토픽 요약 도우미다.
아래는 같은 사건으로 군집된 기사들의 ai_summary다.
이 정보만으로 다음을 JSON으로만 출력하라.
절대 금지:
- 제공된 summary에 없는 사실 추가
- 해석/비교/예측/의견
- "보인다", "~할 것이다", "~할 전망" 같은 추측 표현
- 인용부호 안 원문 외 단어 생성
- JSON 외의 모든 텍스트 (설명, 마크다운, 코드블록 금지)
출력 형식 (JSON 객체 하나만 출력):
{
"topic_label": "5~10 단어의 한국어 제목",
"summary": "1~2 문장, 사실만, 수동태 허용"
}
기사 요약:
{articles_block}

View File

@@ -0,0 +1,76 @@
You are an evidence span extractor. Respond ONLY in JSON. No markdown, no explanation.
## Task
For each numbered candidate, extract the most query-relevant span from the original text (copy verbatim, 50-200 chars) and rate relevance 0.0~1.0. If the candidate does not directly answer the query, set span=null, relevance=0.0, skip_reason.
## Output Schema
{
"items": [
{
"n": 1,
"span": "...",
"relevance": 0.0,
"skip_reason": null
}
]
}
## Rules
- `n`: candidate 번호 (1-based, 입력 순서와 동일). **모든 n을 반환** (skip된 것도 포함).
- `span`: 원문에서 **그대로 복사한** 50~200자. 요약/변형 금지. 원문에 없는 단어는 절대 포함하지 말 것. 여러 문장이어도 무방.
- 관련 span이 없으면 `span: null`, `relevance: 0.0`, `skip_reason`에 한 줄 사유.
- `relevance`: 0.0~1.0 float
- 0.9+ query에 직접 답함
- 0.7~0.9 강한 연관
- 0.5~0.7 부분 연관
- <0.5 약한/무관 (fallback에서 탈락)
- `skip_reason`: span=null 일 때만 필수. 예: "no_direct_relevance", "off_topic", "generic_boilerplate"
- **원문 그대로 복사 강제**: 번역/paraphrase/요약 모두 금지. evidence span은 citation 원문이 되어야 한다.
## Example 1 (hit)
query: `산업안전보건법 제6장 주요 내용`
candidates:
[1] title: 산업안전보건법 해설 / text: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다...
[2] title: 회사 복지 규정 / text: 직원의 연차휴가 사용 규정과 경조사 지원 내용을 담고 있다...
{
"items": [
{
"n": 1,
"span": "제6장은 \"안전보건관리체제\"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다",
"relevance": 0.95,
"skip_reason": null
},
{
"n": 2,
"span": null,
"relevance": 0.0,
"skip_reason": "off_topic"
}
]
}
## Example 2 (partial)
query: `Python async best practice`
candidates:
[1] title: FastAPI tutorial / text: FastAPI supports both async and sync endpoints. For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor...
{
"items": [
{
"n": 1,
"span": "For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor",
"relevance": 0.82,
"skip_reason": null
}
]
}
## Query
{query}
## Candidates
{numbered_candidates}

View File

@@ -0,0 +1,53 @@
You are a search query analyzer. Respond ONLY in JSON. No markdown, no explanation.
## Output Schema
{
"intent": "fact_lookup | semantic_search | filter_browse",
"query_type": "natural_language | keyword | phrase",
"domain_hint": "document | news | mixed",
"language_scope": "limited | global",
"keywords": [],
"must_terms": [],
"optional_terms": [],
"hard_filters": {},
"soft_filters": {"domain": [], "document_type": []},
"normalized_queries": [{"lang": "ko", "text": "...", "weight": 1.0}],
"expanded_terms": [],
"synonyms": {},
"analyzer_confidence": 0.0
}
## Rules
- `intent`: fact_lookup (사실/조항/이름), semantic_search (주제/개념), filter_browse (필터 중심)
- `query_type`: natural_language (문장형), keyword (단어 나열), phrase (따옴표/고유명사/법조항)
- `domain_hint`: document (소유 문서/법령/매뉴얼), news (시사/뉴스), mixed (불명)
- `language_scope`: limited (ko+en), global (다국어 필요)
- `hard_filters`: 쿼리에 **명시된** 것만. 추론 금지. 키: file_format, year, country
- `soft_filters.domain`: Industrial_Safety, Programming, Engineering, Philosophy, Language, General. 2-level 허용(e.g. Industrial_Safety/Legislation)
- `soft_filters.document_type`: Law_Document, Manual, Report, Academic_Paper, Standard, Specification, Meeting_Minutes, Checklist, Note, Memo, Reference, Drawing, Template
- `normalized_queries`: 원문 언어 1.0 가중치 필수. 교차언어 1개 추가 권장(ko↔en, weight 0.8). news + global 인 경우만 ja/zh 추가(weight 0.5~0.6). **최대 3개**.
- `analyzer_confidence`: 0.9+ 명확, 0.7~0.9 대체로 명확, 0.5~0.7 모호, <0.5 분석 불가
## Example
query: `기계 사고 관련 법령`
{
"intent": "semantic_search",
"query_type": "natural_language",
"domain_hint": "document",
"language_scope": "limited",
"keywords": ["기계", "사고", "법령"],
"must_terms": [],
"optional_terms": ["안전", "규정"],
"hard_filters": {},
"soft_filters": {"domain": ["Industrial_Safety/Legislation"], "document_type": ["Law_Document"]},
"normalized_queries": [
{"lang": "ko", "text": "기계 사고 관련 법령", "weight": 1.0},
{"lang": "en", "text": "machinery accident related laws", "weight": 0.8}
],
"expanded_terms": ["산업안전", "기계안전"],
"synonyms": {},
"analyzer_confidence": 0.88
}
## Query
{query}

View File

@@ -0,0 +1,80 @@
You are a grounded answer synthesizer. Respond ONLY in JSON. No markdown, no explanation.
## Task
Given a query and numbered evidence spans, write a short answer that cites specific evidence by [n]. **You may only use facts that appear in the evidence.** If the evidence does not directly answer the query, set `refused: true`.
## Output Schema
{
"answer": "...",
"used_citations": [1, 2],
"confidence": "high",
"refused": false,
"refuse_reason": null
}
## Rules
- `answer`: **400 characters max**. Must contain inline `[n]` citations. Every claim sentence ends with at least one `[n]`. Multiple sources: `[1][3]`. **Only use facts present in evidence. No outside knowledge, no guessing, no paraphrasing what is not there.**
- `used_citations`: integer list of `n` values that actually appear in `answer` (for cross-check). Must be sorted ascending, no duplicates.
- `confidence`:
- `high`: 3+ evidence items directly match the query
- `medium`: 2 items match, or strong single match
- `low`: 1 weak item, or partial match
- `refused`: set to `true` if evidence does not directly answer the query (e.g. off-topic, too generic, missing key facts). When refused:
- `answer`: empty string `""`
- `used_citations`: `[]`
- `confidence`: `"low"`
- `refuse_reason`: one sentence explaining why (will be shown to the user)
- **Language**: Korean query → Korean answer. English query → English answer. Match query language.
- **Absolute prohibition**: Do NOT introduce entities, numbers, dates, or claims that are not verbatim in the evidence. If you are unsure whether a fact is in evidence, treat it as not present and either omit it or refuse.
## Example 1 (happy path, high confidence)
query: `산업안전보건법 제6장 주요 내용`
evidence:
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
[2] 시행령 해설: 제6장은 제15조부터 제19조까지로 구성되며 안전보건관리책임자의 업무 범위를 세부 규정한다
[3] 법령 체계도: 안전보건관리책임자 선임은 상시근로자 50명 이상 사업장에 적용된다
{
"answer": "산업안전보건법 제6장은 안전보건관리체제에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정을 규정한다[1]. 제15조부터 제19조까지 구성되며 관리책임자의 업무 범위를 세부 규정한다[2]. 상시근로자 50명 이상 사업장에 적용된다[3].",
"used_citations": [1, 2, 3],
"confidence": "high",
"refused": false,
"refuse_reason": null
}
## Example 2 (partial, medium confidence)
query: `Python async best practice`
evidence:
[1] FastAPI tutorial: For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor
{
"answer": "For I/O-bound operations, use async def with await for database and HTTP calls, and avoid blocking calls inside async functions (use run_in_executor instead) [1].",
"used_citations": [1],
"confidence": "low",
"refused": false,
"refuse_reason": null
}
## Example 3 (refused — evidence does not answer query)
query: `회사 연차 휴가 사용 규정`
evidence:
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
[2] 회사 복지 안내: 직원 경조사 지원 내용 포함
{
"answer": "",
"used_citations": [],
"confidence": "low",
"refused": true,
"refuse_reason": "연차 휴가 사용 규정에 대한 직접적인 근거가 evidence에 없습니다."
}
## Query
{query}
## Evidence
{numbered_evidence}

View File

@@ -0,0 +1 @@
"""Phase 4 Global Digest 서비스 레이어 — 7일 뉴스 batch clustering + summarization."""

View File

@@ -0,0 +1,118 @@
"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering.
플랜의 핵심 결정:
- λ = ln(2)/3 (3일 반감기)
- threshold: 0.75 / 0.78 / 0.80 (밀도 기반 adaptive)
- centroid: EMA α=0.7 (단순 평균의 seed bias / drift 방어)
- min_articles_per_topic = 3, max_topics_per_country = 10
- importance_score: country 내 0~1 normalize + max(score, 0.01) floor
- raw_weight_sum 별도 보존 (cross-day 트렌드 분석용)
"""
import math
from datetime import datetime, timezone
import numpy as np
from core.utils import setup_logger
logger = setup_logger("digest_clustering")
LAMBDA = math.log(2) / 3 # 3일 반감기 — 사용자 확정값
CENTROID_ALPHA = 0.7 # EMA: 기존 중심 70% 유지, 새 멤버 30% 반영
MIN_ARTICLES_PER_TOPIC = 3
MAX_TOPICS_PER_COUNTRY = 10
SCORE_FLOOR = 0.01 # UI 0 표시 문제 사전 차단
def adaptive_threshold(n_docs: int) -> float:
"""문서 밀도 기반 동적 threshold — fragmentation/blob 동시 방어."""
if n_docs > 200:
return 0.80
if n_docs < 50:
return 0.75
return 0.78
def _normalize(v: np.ndarray) -> np.ndarray:
norm = float(np.linalg.norm(v))
if norm == 0.0:
return v
return v / norm
def _decay_weight(now: datetime, created_at: datetime) -> float:
"""exp(-λ * days_ago). created_at 이 naive 면 UTC 가정."""
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
days = (now - created_at).total_seconds() / 86400.0
if days < 0:
days = 0.0
return math.exp(-LAMBDA * days)
def cluster_country(country: str, docs: list[dict]) -> list[dict]:
"""단일 country 의 docs 를 cluster 로 묶어 정렬 + normalize 후 반환.
Args:
country: 국가 코드 (KR, US, ...)
docs: loader.load_news_window 의 출력 (단일 country 슬라이스)
Returns:
[{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
- members 는 weight 가 채워진 doc dict 리스트
- 정렬: importance_score 내림차순, 최대 MAX_TOPICS_PER_COUNTRY 개
"""
if not docs:
logger.info(f"[{country}] docs=0 → skip")
return []
threshold = adaptive_threshold(len(docs))
now = datetime.now(timezone.utc)
# time-decay weight 계산 + 가중치 높은 순으로 seed 우선
for d in docs:
d["weight"] = _decay_weight(now, d["created_at"])
docs.sort(key=lambda d: -d["weight"])
clusters: list[dict] = []
for d in docs:
v = _normalize(d["embedding"])
best_idx, best_sim = -1, 0.0
for i, c in enumerate(clusters):
sim = float(np.dot(c["centroid"], v))
if sim > best_sim and sim >= threshold:
best_sim, best_idx = sim, i
if best_idx >= 0:
c = clusters[best_idx]
# EMA centroid update — drift 방지
c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v
c["centroid"] = _normalize(c["centroid"])
c["members"].append(d)
c["weight_sum"] += d["weight"]
else:
clusters.append({
"centroid": v,
"members": [d],
"weight_sum": d["weight"],
})
raw_count = len(clusters)
clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC]
dropped = raw_count - len(clusters)
clusters.sort(key=lambda c: -c["weight_sum"])
clusters = clusters[:MAX_TOPICS_PER_COUNTRY]
# country 내 normalize (0~1) + floor
if clusters:
max_w = max(c["weight_sum"] for c in clusters)
for c in clusters:
normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
c["raw_weight_sum"] = c["weight_sum"]
c["importance_score"] = max(normalized, SCORE_FLOOR)
logger.info(
f"[{country}] docs={len(docs)} threshold={threshold} "
f"raw_clusters={raw_count} dropped={dropped} kept={len(clusters)}"
)
return clusters

View File

@@ -0,0 +1,149 @@
"""뉴스 7일 window 로드 + country 정규화
- documents 테이블엔 country 컬럼이 없으므로 document_chunks.country 를 first non-null 로 조인.
- chunk-level country 도 NULL 이면 news_sources.name prefix(ai_sub_group) 매칭으로 fallback.
- 그래도 NULL 이면 drop(로그 경고).
- ai_summary / embedding 이 NULL 이면 처음부터 제외 (재요약/재임베딩 0회 원칙).
"""
from collections import defaultdict
from datetime import datetime
from typing import Any
import numpy as np
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
logger = setup_logger("digest_loader")
_NEWS_WINDOW_SQL = text("""
SELECT
d.id,
d.title,
d.ai_summary,
d.embedding,
d.created_at,
d.edit_url,
d.ai_sub_group,
(
SELECT c.country
FROM document_chunks c
WHERE c.doc_id = d.id AND c.country IS NOT NULL
LIMIT 1
) AS chunk_country
FROM documents d
WHERE d.source_channel = 'news'
AND d.deleted_at IS NULL
AND d.created_at >= :window_start
AND d.created_at < :window_end
AND d.embedding IS NOT NULL
AND d.ai_summary IS NOT NULL
""")
_SOURCE_COUNTRY_SQL = text("""
SELECT name, country FROM news_sources WHERE country IS NOT NULL
""")
def _to_numpy_embedding(raw: Any) -> np.ndarray | None:
"""pgvector 컬럼을 numpy array(float32)로 정규화.
raw SQL + asyncpg 조합에서 pgvector type 이 등록 안 되어 있으면
embedding 이 '[0.1,0.2,...]' 같은 string 으로 반환된다. ORM 을 안 쓰므로
이 경우 직접 파싱해야 한다.
"""
if raw is None:
return None
if isinstance(raw, str):
import json
try:
raw = json.loads(raw)
except json.JSONDecodeError:
return None
try:
arr = np.asarray(raw, dtype=np.float32)
except (TypeError, ValueError):
return None
if arr.size == 0:
return None
return arr
async def _load_source_country_map(session) -> dict[str, str]:
"""news_sources name → country 매핑.
name 은 '경향신문 문화' 형태이고 documents.ai_sub_group 은 '경향신문' (split[0]).
prefix 매칭이 가능하도록 첫 토큰 → country 로 인덱싱.
"""
rows = await session.execute(_SOURCE_COUNTRY_SQL)
mapping: dict[str, str] = {}
for name, country in rows:
if not name or not country:
continue
prefix = name.split(" ")[0].strip()
if prefix and prefix not in mapping:
mapping[prefix] = country
return mapping
async def load_news_window(
window_start: datetime,
window_end: datetime,
) -> dict[str, list[dict]]:
"""주어진 윈도우 안의 뉴스 documents 를 country 별 dict 로 반환.
Returns:
{"KR": [doc_dict, ...], "US": [...], ...}
"""
docs_by_country: dict[str, list[dict]] = defaultdict(list)
null_country_count = 0
total = 0
async with async_session() as session:
source_country = await _load_source_country_map(session)
result = await session.execute(
_NEWS_WINDOW_SQL,
{"window_start": window_start, "window_end": window_end},
)
for row in result.mappings():
embedding = _to_numpy_embedding(row["embedding"])
if embedding is None:
continue
country = row["chunk_country"]
if not country:
# news_sources prefix fallback
ai_sub_group = (row["ai_sub_group"] or "").strip()
if ai_sub_group:
country = source_country.get(ai_sub_group)
if not country:
null_country_count += 1
continue
country = country.upper()
docs_by_country[country].append({
"id": int(row["id"]),
"title": row["title"] or "",
"ai_summary": row["ai_summary"] or "",
"embedding": embedding,
"created_at": row["created_at"],
"edit_url": row["edit_url"] or "",
"ai_sub_group": row["ai_sub_group"] or "",
})
total += 1
if null_country_count:
logger.warning(
f"[loader] country 분류 실패로 drop된 문서 {null_country_count}"
f"(chunk_country + news_sources fallback 모두 실패)"
)
logger.info(
f"[loader] window {window_start.date()} ~ {window_end.date()}"
f"{total}건 ({len(docs_by_country)}개 국가)"
)
return dict(docs_by_country)

View File

@@ -0,0 +1,177 @@
"""Phase 4 digest pipeline orchestration.
Step:
1. AIClient 생성
2. 7일 window 로 documents 로드 (loader)
3. country 별 cluster_country (clustering)
4. cluster 별 select_for_llm (selection)
5. cluster 별 summarize_cluster_with_fallback (summarizer, LLM)
6. DELETE+INSERT 단일 트랜잭션 (idempotent)
7. start/end 로그 + generation_ms + fallback 비율 health metric
"""
import hashlib
import time
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from sqlalchemy import delete
from ai.client import AIClient
from core.database import async_session
from core.utils import setup_logger
from models.digest import DigestTopic, GlobalDigest
from .clustering import LAMBDA, cluster_country
from .loader import load_news_window
from .selection import select_for_llm
from .summarizer import summarize_cluster_with_fallback
logger = setup_logger("digest_pipeline")
WINDOW_DAYS = 7
KST = ZoneInfo("Asia/Seoul")
def _kst_today() -> datetime:
return datetime.now(KST).date()
def _summary_hash(text: str) -> str:
return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16]
def _build_topic_row(
country: str,
rank: int,
cluster: dict,
selected: list[dict],
llm_result: dict,
primary_model: str,
) -> DigestTopic:
"""LLM 결과 + cluster 메타 → DigestTopic ORM 인스턴스.
article_ids 는 코드가 cluster.members 에서 직접 주입 (LLM 생성 금지 → id 위조 불가).
"""
article_ids = [int(m["id"]) for m in cluster["members"]]
centroid_sample = {
"selected_doc_ids": [int(m["id"]) for m in selected],
"summary_hashes": [_summary_hash(m.get("ai_summary") or "") for m in selected],
}
return DigestTopic(
country=country,
topic_rank=rank,
topic_label=llm_result["topic_label"],
summary=llm_result["summary"],
article_ids=article_ids,
article_count=len(article_ids),
importance_score=float(cluster["importance_score"]),
raw_weight_sum=float(cluster["raw_weight_sum"]),
centroid_sample=centroid_sample,
llm_model=primary_model,
llm_fallback_used=bool(llm_result["llm_fallback_used"]),
)
async def run_digest_pipeline() -> dict:
"""전체 파이프라인 실행. worker entry 에서 호출.
Returns:
실행 통계 dict {llm_calls, fallback_used, total_topics, generation_ms}
"""
start = time.time()
window_end = datetime.now(timezone.utc)
window_start = window_end - timedelta(days=WINDOW_DAYS)
digest_date = _kst_today()
logger.info(
f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
)
docs_by_country = await load_news_window(window_start, window_end)
if not docs_by_country:
logger.warning("[global_digest] 7일 window에 뉴스 0건 — digest 생성 스킵")
return {
"llm_calls": 0,
"fallback_used": 0,
"total_topics": 0,
"generation_ms": int((time.time() - start) * 1000),
}
client = AIClient()
primary_model = client.ai.primary.model
all_topic_rows: list[DigestTopic] = []
stats = {"llm_calls": 0, "fallback_used": 0}
try:
for country, docs in docs_by_country.items():
clusters = cluster_country(country, docs)
if not clusters:
continue # sparse country 자동 제외
for rank, cluster in enumerate(clusters, start=1):
selected = select_for_llm(cluster)
stats["llm_calls"] += 1
llm_result = await summarize_cluster_with_fallback(client, cluster, selected)
if llm_result["llm_fallback_used"]:
stats["fallback_used"] += 1
all_topic_rows.append(
_build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
)
finally:
await client.close()
generation_ms = int((time.time() - start) * 1000)
total_articles = sum(len(d) for d in docs_by_country.values())
countries_with_topics = len({r.country for r in all_topic_rows})
if stats["fallback_used"] == 0:
status = "success"
elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
status = "failed"
else:
status = "partial"
async with async_session() as session:
# idempotent: 같은 날짜 row 가 있으면 CASCADE 로 topics 까지 삭제
await session.execute(
delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
)
new_digest = GlobalDigest(
digest_date=digest_date,
window_start=window_start,
window_end=window_end,
decay_lambda=LAMBDA,
total_articles=total_articles,
total_countries=countries_with_topics,
total_topics=len(all_topic_rows),
generation_ms=generation_ms,
llm_calls=stats["llm_calls"],
llm_failures=stats["fallback_used"],
status=status,
)
new_digest.topics = all_topic_rows
session.add(new_digest)
await session.commit()
fallback_pct = (
(stats["fallback_used"] / stats["llm_calls"] * 100.0)
if stats["llm_calls"] else 0.0
)
logger.info(
f"[global_digest] done countries={countries_with_topics} "
f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
f"status={status} elapsed={generation_ms / 1000:.1f}s"
)
return {
"llm_calls": stats["llm_calls"],
"fallback_used": stats["fallback_used"],
"total_topics": len(all_topic_rows),
"generation_ms": generation_ms,
"status": status,
}

View File

@@ -0,0 +1,62 @@
"""Cluster 내 LLM 입력 선정 — top-k + MMR diversity + ai_summary truncate.
순수 top-relevance 는 동일 사건 중복 요약문에 편향되므로 MMR 로 다양성 확보.
ai_summary 길이는 LLM 토큰 보호를 위해 SUMMARY_TRUNCATE 로 제한.
"""
import numpy as np
K_PER_CLUSTER = 5
LAMBDA_MMR = 0.7 # relevance 70% / diversity 30%
SUMMARY_TRUNCATE = 300 # long tail ai_summary 방어
def _normalize(v: np.ndarray) -> np.ndarray:
norm = float(np.linalg.norm(v))
if norm == 0.0:
return v
return v / norm
def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
"""cluster 내 LLM 호출용 대표 article 들 선정.
Args:
cluster: clustering.cluster_country 결과 단일 cluster
k: 선정 개수 (기본 5)
Returns:
선정된 doc dict 리스트. 각 항목에 ai_summary_truncated 필드가 추가됨.
"""
members = cluster["members"]
if len(members) <= k:
selected = list(members)
else:
centroid = cluster["centroid"]
# relevance = centroid 유사도 × decay weight
for m in members:
v = _normalize(m["embedding"])
m["_rel"] = float(np.dot(centroid, v)) * m["weight"]
first = max(members, key=lambda x: x["_rel"])
selected = [first]
candidates = [m for m in members if m is not first]
while len(selected) < k and candidates:
def mmr_score(c: dict) -> float:
v = _normalize(c["embedding"])
max_sim = max(
float(np.dot(v, _normalize(s["embedding"])))
for s in selected
)
return LAMBDA_MMR * c["_rel"] - (1.0 - LAMBDA_MMR) * max_sim
pick = max(candidates, key=mmr_score)
selected.append(pick)
candidates.remove(pick)
# LLM 입력 토큰 보호
for m in selected:
m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:SUMMARY_TRUNCATE]
return selected

View File

@@ -0,0 +1,123 @@
"""Cluster-level LLM 호출 + JSON 파싱 + timeout + drop금지 fallback.
핵심 결정:
- AIClient._call_chat 직접 호출 (client.py 수정 회피, fallback 로직 재사용)
- Semaphore(1) 로 MLX 과부하 회피
- Per-call timeout 25초 (asyncio.wait_for) — MLX hang/Ollama stall 방어
- JSON 파싱 실패 → 1회 재시도 → 그래도 실패 시 minimal fallback (drop 금지)
- fallback: topic_label="주요 뉴스 묶음", summary = top member ai_summary[:200]
"""
import asyncio
from pathlib import Path
from typing import Any
from ai.client import parse_json_response
from core.utils import setup_logger
logger = setup_logger("digest_summarizer")
LLM_CALL_TIMEOUT = 25 # 초. MLX 평균 5초 + tail latency 마진
FALLBACK_SUMMARY_LIMIT = 200
_llm_sem = asyncio.Semaphore(1)
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "digest_topic.txt"
_PROMPT_TEMPLATE: str | None = None
def _load_prompt() -> str:
global _PROMPT_TEMPLATE
if _PROMPT_TEMPLATE is None:
_PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8")
return _PROMPT_TEMPLATE
def build_prompt(selected: list[dict]) -> str:
"""digest_topic.txt 템플릿에 selected article들의 ai_summary_truncated 주입.
템플릿 placeholder: {articles_block}
"""
template = _load_prompt()
lines = []
for i, m in enumerate(selected, start=1):
text = (m.get("ai_summary_truncated") or m.get("ai_summary") or m.get("title") or "").strip()
lines.append(f"[{i}] {text}")
articles_block = "\n".join(lines)
return template.replace("{articles_block}", articles_block)
async def _try_call_llm(client: Any, prompt: str) -> str:
"""Semaphore + per-call timeout 으로 감싼 단일 호출."""
async with _llm_sem:
return await asyncio.wait_for(
client._call_chat(client.ai.primary, prompt),
timeout=LLM_CALL_TIMEOUT,
)
def _make_fallback(cluster: dict) -> dict:
"""cluster 의 top member 데이터로 minimal fallback 생성 — 정보 손실 회피."""
members = cluster["members"]
if not members:
return {
"topic_label": "주요 뉴스 묶음",
"summary": "",
"llm_fallback_used": True,
}
top = max(members, key=lambda m: m.get("_rel", m.get("weight", 0.0)))
text = (top.get("ai_summary") or top.get("title") or "").strip()
return {
"topic_label": "주요 뉴스 묶음",
"summary": text[:FALLBACK_SUMMARY_LIMIT],
"llm_fallback_used": True,
}
async def summarize_cluster_with_fallback(
client: Any,
cluster: dict,
selected: list[dict],
) -> dict:
"""cluster 1개에 대해 LLM 호출 + JSON 파싱 + fallback.
Returns:
{topic_label, summary, llm_fallback_used}
"""
prompt = build_prompt(selected)
for attempt in range(2): # 1회 재시도 포함
try:
raw = await _try_call_llm(client, prompt)
except asyncio.TimeoutError:
logger.warning(
f"LLM 호출 timeout {LLM_CALL_TIMEOUT}s "
f"(attempt={attempt + 1}, cluster size={len(cluster['members'])})"
)
continue
except Exception as e:
logger.warning(
f"LLM 호출 실패 attempt={attempt + 1} "
f"(cluster size={len(cluster['members'])}): {e}"
)
continue
parsed = parse_json_response(raw)
if (
parsed
and isinstance(parsed.get("topic_label"), str)
and isinstance(parsed.get("summary"), str)
and parsed["topic_label"].strip()
and parsed["summary"].strip()
):
return {
"topic_label": parsed["topic_label"].strip(),
"summary": parsed["summary"].strip(),
"llm_fallback_used": False,
}
logger.warning(
f"JSON 파싱 실패 attempt={attempt + 1} "
f"(cluster size={len(cluster['members'])}, raw_len={len(raw) if raw else 0})"
)
return _make_fallback(cluster)

View File

@@ -1,5 +1,407 @@
"""Evidence extraction 서비스 (Phase 3).
"""Evidence extraction 서비스 (Phase 3.2).
reranked chunks에서 query-relevant span을 rule + LLM hybrid로 추출.
구현은 Phase 3에서 채움.
reranker 결과 chunks 에서 query-relevant span 을 구조적으로 추출한다.
## 설계 (EV-A: Rule + LLM span select)
```
reranked results
[rule filter] score >= 0.25, max_per_doc=2, top MAX_EVIDENCE_CANDIDATES
[snippet 재윈도우] _extract_window(full, query, 800) — LLM 입력용
[1 batched LLM call] gemma-4 via get_mlx_gate() (single inference)
[post-process]
- relevance >= 0.5 필터
- span too-short (< 80자) → _extract_window(full, query, 120) 로 재확장
- span too-long (> 300자) → cut
- doc-group ordering (검색 결과 doc 순서 유지, doc 내부만 relevance desc)
- n 재부여 (1..N)
EvidenceItem 리스트
```
## 영구 룰
- **LLM 호출은 1번만** (batched). 순차 호출 절대 금지 — MLX single-inference
큐가 폭발한다.
- **모든 MLX 호출은 `get_mlx_gate()` 경유**. analyzer / synthesis 와 동일
semaphore 공유.
- **fallback span 도 query 중심 window**. `full_snippet[:200]` 같은 "앞에서부터
자르기" 절대 금지. 조용한 품질 붕괴 (citation 은 멀쩡한데 실제 span 이 query
와 무관) 대표 사례.
- **Span too-short 보정 필수**: `len(span) < 80` 이면 자동 확장. "짧을수록
정확" 이 아니라 **짧으면 위험** — synthesis LLM 이 문맥 부족으로 이어 만들기
(soft hallucination) 를 한다.
- **Evidence ordering 은 doc-group 유지**. 전역 relevance desc 정렬 금지.
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
## 확장 여지 (지금은 비활성)
`EVIDENCE_FAST_PATH_THRESHOLD` 가 `None` 이 아니고 `results[0].rerank_score >=
THRESHOLD` 이면 LLM 호출 스킵 후 rule-only 경로로 즉시 반환. Activation 조건:
(1) evidence LLM 호출 비율 > 80%, (2) /ask 평균 latency > 15s, (3) rerank
top1 p50 > 0.75. 셋 다 충족해야 켠다.
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from ai.client import AIClient, _load_prompt, parse_json_response
from core.utils import setup_logger
from .llm_gate import get_mlx_gate
from .rerank_service import _extract_window
if TYPE_CHECKING:
from api.search import SearchResult
logger = setup_logger("evidence")
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
EVIDENCE_MIN_RERANK = 0.25 # 1차 rule cut — rerank score 이 미만은 제외
MAX_EVIDENCE_CANDIDATES = 6 # LLM 입력 상한
MAX_PER_DOC = 2
CANDIDATE_SNIPPET_CHARS = 800 # LLM 이 볼 원문 창 크기
MIN_RELEVANCE_KEEP = 0.5 # LLM 출력 필터
SPAN_MIN_CHARS = 80 # 이 미만이면 window enlarge
SPAN_ENLARGE_TARGET = 120 # enlarge 시 재윈도우 target_chars
SPAN_MAX_CHARS = 300 # 이 초과면 cut (synthesis token budget 보호)
LLM_TIMEOUT_MS = 15000
PROMPT_VERSION = "v1"
# 확장 여지 — None 이면 비활성 (baseline). 실측 후 0.8 등으로 켠다.
EVIDENCE_FAST_PATH_THRESHOLD: float | None = None
# ─── 반환 타입 ───────────────────────────────────────────
@dataclass(slots=True)
class EvidenceItem:
"""LLM 또는 rule fallback 이 추출한 단일 evidence span.
n 은 doc-group ordering + relevance 정렬 후 1부터 재부여된다.
`full_snippet` 은 **synthesis 프롬프트에 절대 포함 금지** — debug / citation
원문 보기 전용.
"""
n: int # 1-based, synthesis 프롬프트의 [n] 과 매핑
chunk_id: int | None
doc_id: int
title: str | None
section_title: str | None
span_text: str # LLM 추출 (또는 rule fallback) span, 80~300자
relevance: float # LLM 0~1 (fallback 시 rerank_score 복사)
rerank_score: float # raw reranker 점수
full_snippet: str # 원본 800자 (debug/citation 전용, synthesis 금지)
# ─── 프롬프트 로딩 (module 초기화 1회) ───────────────────
try:
EVIDENCE_PROMPT = _load_prompt("evidence_extract.txt")
except FileNotFoundError:
EVIDENCE_PROMPT = ""
logger.warning(
"evidence_extract.txt not found — evidence_service will always use rule-only fallback"
)
# ─── Helper: candidates → LLM 입력 블록 ──────────────────
def _build_numbered_candidates(
candidates: list["SearchResult"], query: str
) -> tuple[str, list[str]]:
"""LLM 프롬프트의 {numbered_candidates} 블록 + 재윈도우된 full_snippet 리스트.
Returns:
(block_str, full_snippets) — full_snippets[i] 는 1-based n=i+1 의 원문
"""
lines: list[str] = []
full_snippets: list[str] = []
for i, c in enumerate(candidates, 1):
title = (c.title or "").strip()
raw_text = c.snippet or ""
full = _extract_window(raw_text, query, target_chars=CANDIDATE_SNIPPET_CHARS)
full_snippets.append(full)
lines.append(f"[{i}] title: {title} / text: {full}")
return "\n".join(lines), full_snippets
# ─── Helper: span length 보정 ───────────────────────────
def _normalize_span(span: str, full: str, query: str) -> tuple[str, bool]:
"""span 을 SPAN_MIN_CHARS ~ SPAN_MAX_CHARS 범위로 보정.
Returns:
(normalized_span, was_expanded)
- was_expanded=True 이면 "short_span_expanded" 로그 대상
"""
s = (span or "").strip()
expanded = False
if len(s) < SPAN_MIN_CHARS:
# soft hallucination 방어 — query 중심으로 window 재확장
s = _extract_window(full, query, target_chars=SPAN_ENLARGE_TARGET)
expanded = True
if len(s) > SPAN_MAX_CHARS:
s = s[:SPAN_MAX_CHARS]
return s, expanded
# ─── Helper: doc-group ordering ─────────────────────────
def _apply_doc_group_ordering(
items: list[EvidenceItem],
results: list["SearchResult"],
) -> list[EvidenceItem]:
"""검색 결과 doc 순서 유지 + doc 내부만 relevance desc + n 재부여.
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
전역 relevance desc 정렬은 "doc A span1 → doc B span1 → doc A span2"
처럼 튀면서 읽기 이상한 답변을 만든다.
"""
if not items:
return []
doc_order: dict[int, int] = {}
for idx, r in enumerate(results):
if r.id not in doc_order:
doc_order[r.id] = idx
# 정렬: (doc 순서, -relevance)
items.sort(
key=lambda it: (doc_order.get(it.doc_id, 9999), -it.relevance)
)
# n 재부여
for new_n, it in enumerate(items, 1):
it.n = new_n
return items
# ─── Helper: rule-only fallback ─────────────────────────
def _build_rule_only_evidence(
candidates: list["SearchResult"],
full_snippets: list[str],
query: str,
) -> list[EvidenceItem]:
"""LLM 실패/timeout 시 rule-only 경로.
⚠ `full_snippet[:200]` 같은 앞자르기 금지. 반드시 `_extract_window` 로
query 중심 윈도우를 만든다. relevance 는 rerank_score 복사.
"""
items: list[EvidenceItem] = []
for i, (c, full) in enumerate(zip(candidates, full_snippets), 1):
span = _extract_window(full, query, target_chars=200)
# 정규화 (보통 여기서는 SPAN_MIN_CHARS 이상이지만 안전장치)
span, _expanded = _normalize_span(span, full, query)
items.append(
EvidenceItem(
n=i,
chunk_id=c.chunk_id,
doc_id=c.id,
title=c.title,
section_title=c.section_title,
span_text=span,
relevance=float(c.rerank_score or c.score or 0.0),
rerank_score=float(c.rerank_score or c.score or 0.0),
full_snippet=full,
)
)
return items
# ─── Core: extract_evidence ─────────────────────────────
async def extract_evidence(
query: str,
results: list["SearchResult"],
ai_client: AIClient | None = None,
) -> tuple[list[EvidenceItem], str | None]:
"""reranked results → EvidenceItem 리스트.
Returns:
(items, skip_reason)
skip_reason ∈ {None, "empty_retrieval", "all_low_rerank", "fast_path",
"llm_timeout_fallback_rule", "llm_error_fallback_rule",
"parse_failed_fallback_rule", "all_llm_rejected"}
- skip_reason 이 None 이 아니어도 items 는 비어있지 않을 수 있다
(fallback/fast_path 경로).
"""
if not results:
return [], "empty_retrieval"
# ── 1차 rule filter: rerank_score >= EVIDENCE_MIN_RERANK + max_per_doc ──
candidates: list["SearchResult"] = []
per_doc: dict[int, int] = {}
for r in results:
raw_score = r.rerank_score if r.rerank_score is not None else r.score
if raw_score is None or raw_score < EVIDENCE_MIN_RERANK:
continue
if per_doc.get(r.id, 0) >= MAX_PER_DOC:
continue
candidates.append(r)
per_doc[r.id] = per_doc.get(r.id, 0) + 1
if len(candidates) >= MAX_EVIDENCE_CANDIDATES:
break
if not candidates:
return [], "all_low_rerank"
# ── Fast-path (현재 비활성) ─────────────────────────
if EVIDENCE_FAST_PATH_THRESHOLD is not None:
# ⚠ display score 가 아니라 raw rerank_score 로 판단.
# normalize_display_scores 를 거친 r.score 는 frontend 용 리스케일
# 값이라 distribution drift 가능. fast-path 는 reranker raw 신호가 안전.
top_rerank = (
results[0].rerank_score if results[0].rerank_score is not None else 0.0
)
if top_rerank is not None and top_rerank >= EVIDENCE_FAST_PATH_THRESHOLD:
_block, full_snippets = _build_numbered_candidates(candidates, query)
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.info(
"evidence fast_path query=%r candidates=%d kept=%d top_rerank=%.2f",
query[:80], len(candidates), len(items), top_rerank,
)
return items, "fast_path"
# ── LLM 호출 준비 ───────────────────────────────────
if not EVIDENCE_PROMPT:
# 프롬프트 미로딩 → rule-only
_block, full_snippets = _build_numbered_candidates(candidates, query)
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence prompt_not_loaded → rule fallback query=%r kept=%d",
query[:80], len(items),
)
return items, "llm_error_fallback_rule"
block, full_snippets = _build_numbered_candidates(candidates, query)
prompt = EVIDENCE_PROMPT.replace("{query}", query).replace(
"{numbered_candidates}", block
)
client_owned = False
if ai_client is None:
ai_client = AIClient()
client_owned = True
t_start = time.perf_counter()
raw: str | None = None
llm_error: str | None = None
try:
# ⚠ semaphore 대기는 timeout 바깥. timeout 은 실제 LLM 호출에만.
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
except asyncio.TimeoutError:
llm_error = "timeout"
except Exception as exc:
llm_error = f"llm_error:{type(exc).__name__}"
finally:
if client_owned:
try:
await ai_client.close()
except Exception:
pass
elapsed_ms = (time.perf_counter() - t_start) * 1000
# ── LLM 실패 → rule fallback ────────────────────────
if llm_error is not None:
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence LLM %s → rule fallback query=%r candidates=%d kept=%d elapsed_ms=%.0f",
llm_error, query[:80], len(candidates), len(items), elapsed_ms,
)
return items, "llm_timeout_fallback_rule" if llm_error == "timeout" else "llm_error_fallback_rule"
parsed = parse_json_response(raw or "")
if not isinstance(parsed, dict) or not isinstance(parsed.get("items"), list):
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence parse_failed → rule fallback query=%r raw=%r elapsed_ms=%.0f",
query[:80], (raw or "")[:200], elapsed_ms,
)
return items, "parse_failed_fallback_rule"
# ── LLM 출력 파싱 ──────────────────────────────────
short_span_expanded = 0
llm_items: list[EvidenceItem] = []
for entry in parsed["items"]:
if not isinstance(entry, dict):
continue
try:
n_raw = int(entry.get("n", 0))
except (TypeError, ValueError):
continue
if n_raw < 1 or n_raw > len(candidates):
continue
try:
relevance = float(entry.get("relevance", 0.0) or 0.0)
except (TypeError, ValueError):
relevance = 0.0
if relevance < MIN_RELEVANCE_KEEP:
continue
span_raw = entry.get("span")
if not isinstance(span_raw, str) or not span_raw.strip():
continue
candidate = candidates[n_raw - 1]
full = full_snippets[n_raw - 1]
span, expanded = _normalize_span(span_raw, full, query)
if expanded:
short_span_expanded += 1
llm_items.append(
EvidenceItem(
n=n_raw, # doc-group ordering 에서 재부여됨
chunk_id=candidate.chunk_id,
doc_id=candidate.id,
title=candidate.title,
section_title=candidate.section_title,
span_text=span,
relevance=relevance,
rerank_score=float(
candidate.rerank_score
if candidate.rerank_score is not None
else (candidate.score or 0.0)
),
full_snippet=full,
)
)
# ── LLM 이 전부 reject → rule fallback ──────────────
if not llm_items:
items = _build_rule_only_evidence(candidates, full_snippets, query)
items = _apply_doc_group_ordering(items, results)
logger.warning(
"evidence all_llm_rejected → rule fallback query=%r elapsed_ms=%.0f",
query[:80], elapsed_ms,
)
return items, "all_llm_rejected"
# ── doc-group ordering + n 재부여 ───────────────────
llm_items = _apply_doc_group_ordering(llm_items, results)
logger.info(
"evidence ok query=%r candidates=%d kept=%d short_span_expanded=%d elapsed_ms=%.0f",
query[:80], len(candidates), len(llm_items), short_span_expanded, elapsed_ms,
)
return llm_items, None

View File

@@ -219,6 +219,62 @@ def get_strategy(name: str) -> FusionStrategy:
return cls()
# ─── Phase 2.3: soft filter boost ───────────────────────
SOFT_FILTER_MAX_BOOST = 0.05 # plan 룰 (CRITICAL)
# ↑ RRF score는 0.01~0.05 범위 (k=60). 상한 초과 시 기존 랭킹 왜곡.
# 기존 RRFWithBoost의 legal article boost(0.05)와 동일 최대값 → 일관성.
SOFT_FILTER_DOMAIN_BOOST = 0.01 # 2026-04-08 실측: 0.03은 exact_keyword -0.03 악화
# ↑ 낮게 잡는 이유: soft_filter는 "같은 도메인 doc을 동등하게 boost" → exact match
# doc의 상대 우위가 손상됨. 0.01 수준이면 fusion 내부 순위 역전 확률 최소.
def apply_soft_filter_boost(
results: list["SearchResult"],
soft_filters: dict | None,
) -> int:
"""Phase 2.3 — QueryAnalyzer soft_filters.domain 기반 약한 score boost.
ai_domain 정확 매칭 시 SOFT_FILTER_DOMAIN_BOOST(0.01) 1회 가산.
document_type 매칭은 v0.1 평가셋에서 효과 측정 불가 + false positive 많음 → 제외.
Args:
results: fusion 직후 SearchResult 리스트 (in-place 수정)
soft_filters: query_analysis.soft_filters = {"domain": [...]}
Returns:
int — boost 적용된 결과 개수 (debug/notes용)
"""
if not soft_filters:
return 0
domain_list = [str(d).lower() for d in soft_filters.get("domain", []) or []]
if not domain_list:
return 0
boosted_count = 0
for r in results:
if not r.ai_domain:
continue
ai_dom_lower = r.ai_domain.lower()
# 정확 매칭 또는 subdirectory 매칭 ("Industrial_Safety/Legislation" → "industrial_safety" 매칭)
matched = False
for d in domain_list:
if d == ai_dom_lower:
matched = True
break
# path 레벨 매칭: "industrial_safety/legislation" in "industrial_safety/legislation/act"
if d in ai_dom_lower and "/" in d:
matched = True
break
if matched:
r.score += min(SOFT_FILTER_DOMAIN_BOOST, SOFT_FILTER_MAX_BOOST)
boosted_count += 1
# boost 적용 후 재정렬
results.sort(key=lambda x: x.score, reverse=True)
return boosted_count
# ─── display score 정규화 ────────────────────────────────

View File

@@ -0,0 +1,58 @@
"""MLX single-inference 전역 gate (Phase 3.1.1).
Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout).
이 모듈은 analyzer / evidence / synthesis 등 **모든 MLX-bound LLM 호출**이
공유하는 `asyncio.Semaphore(1)`를 제공한다. MLX를 호출하는 경로는 예외 없이
`async with get_mlx_gate():` 블록 안에서만 `AIClient._call_chat(ai.primary, ...)`
를 호출해야 한다.
## 영구 룰
- **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
evidence_service / synthesis_service 세 곳이 현재 사용자. 이후 경로가 늘어도
동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시
동시 실행 발생).
- **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을
걸면 "대기만으로 timeout 발동" 버그가 재발한다(query_analyzer 초기 이슈).
- **fallback(Ollama) 경로는 gate 제외**. GPU Ollama는 concurrent OK. 단 현재
구현상 `AIClient._call_chat` 내부에서 primary→fallback 전환이 일어나므로
fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음).
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
inference 특성이 깨지지 않는 한 이 값을 올리지 말 것.
## 확장 여지 (지금은 구현하지 않음)
트래픽 증가 시 "우선순위 역전"(/ask가 analyzer background task 뒤에 밀림)이
문제가 되면 `asyncio.PriorityQueue` 기반 우선순위 큐로 교체 가능. Gate 자체
분리(get_analyzer_gate / get_ask_gate)는 single-inference에서 throughput
개선이 없으므로 의미 없음.
"""
from __future__ import annotations
import asyncio
# MLX primary는 single-inference → 1
MLX_CONCURRENCY = 1
# 첫 호출 시 현재 event loop에 바인딩된 Semaphore 생성 (lazy init)
_mlx_gate: asyncio.Semaphore | None = None
def get_mlx_gate() -> asyncio.Semaphore:
"""MLX primary 호출 경로 공용 gate. 최초 호출 시 lazy init.
사용 예:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
⚠ `asyncio.timeout`은 반드시 gate 안쪽에 둘 것. 바깥에 두면 gate 대기만으로
timeout이 발동한다.
"""
global _mlx_gate
if _mlx_gate is None:
_mlx_gate = asyncio.Semaphore(MLX_CONCURRENCY)
return _mlx_gate

View File

@@ -1,5 +1,443 @@
"""Query analyzer — 자연어 쿼리 분석 (Phase 2).
"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1, async-only 구조).
domain_hint, intent, hard/soft filter, normalized_queries 등 추출.
구현은 Phase 2에서 채움.
**핵심 철학** (memory `feedback_analyzer_async_only.md`):
> QueryAnalyzer는 "즉시 실행하는 기능"이 아니라 "미리 준비해두는 기능"이다.
Retrieval 경로에서 analyzer를 **동기 호출 금지**.
동기 호출 가능한 API는 prewarm 전용.
## Pipeline
```
query → retrieval (항상 즉시)
↘ trigger_background_analysis (fire-and-forget)
→ analyze() [5초+] → cache 저장
다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화
```
## 룰 (plan 영구)
- `LLM_TIMEOUT_MS = 5000` (background 이므로 여유 OK)
- `MAX_NORMALIZED_QUERIES = 3` (multilingual explosion 방지)
- weight 합 = 1.0 정규화 필수 (fusion 왜곡 방지)
- 실패/저신뢰(< 0.5) 결과는 캐시 금지 (잘못된 분석 고정 방지)
- `analyzer_confidence` default `float 0.0` 강제 (None 금지)
- analyze() 동기 호출 금지. retrieval 경로는 `get_cached()` + `trigger_background_analysis()` 만 사용.
"""
from __future__ import annotations
import asyncio
import hashlib
import time
from typing import Any
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import get_mlx_gate
logger = setup_logger("query_analyzer")
# ─── 상수 (plan 영구 룰) ────────────────────────────────
PROMPT_VERSION = "v2" # prompts/query_analyze.txt 축소판
CACHE_TTL = 86400 # 24h
CACHE_MAXSIZE = 1000
LLM_TIMEOUT_MS = 15000 # async 구조 (background), 동기 경로 금지
# ↑ 실측: gemma-4-26b-a4b-it-8bit MLX, 축소 프롬프트(prompt_tok=802) 7~11초.
# generation이 dominant (max_tokens 무효, 자연 EOS ~289 tok 생성).
# background 실행이라 15초도 안전. 상향 필요 시 여기서만 조정.
LLM_CONCURRENCY = 1 # MLX는 single-inference, 동시 호출 시 queue 폭발.
# ↑ 실측: 23 concurrent 요청 → 22개 15초 timeout. semaphore로 순차 강제.
# prewarm/background/동기 호출 모두 이 semaphore 경유.
MIN_CACHE_CONFIDENCE = 0.5 # 이 미만은 캐시 금지
MAX_NORMALIZED_QUERIES = 3
def _model_version() -> str:
"""현재 primary 모델 ID를 캐시 키에 반영."""
if settings.ai and settings.ai.primary:
return settings.ai.primary.model
return "unknown-model"
# ─── in-memory LRU (FIFO 근사) ──────────────────────────
_CACHE: dict[str, dict[str, Any]] = {}
# background task 참조 유지 (premature GC 방지)
_PENDING: set[asyncio.Task[Any]] = set()
# 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합)
_INFLIGHT: set[str] = set()
def _get_llm_semaphore() -> asyncio.Semaphore:
"""MLX single-inference gate를 반환. Phase 3.1부터 llm_gate.get_mlx_gate()
로 위임 — analyzer / evidence / synthesis 가 동일 semaphore 공유.
`LLM_CONCURRENCY` 상수는 하위 호환/문서용으로 유지하되, 실제 bound는
`llm_gate.MLX_CONCURRENCY` 가 담당한다.
"""
return get_mlx_gate()
def _cache_key(query: str) -> str:
raw = f"{query}|{PROMPT_VERSION}|{_model_version()}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def get_cached(query: str) -> dict | None:
"""TTL 경과 entry는 자동 삭제. 없으면 None.
retrieval 경로에서 cache hit 판단용으로 호출. 호출 자체는 O(1).
"""
key = _cache_key(query)
entry = _CACHE.get(key)
if not entry:
return None
if time.time() - entry["ts"] > CACHE_TTL:
_CACHE.pop(key, None)
return None
return entry["value"]
def set_cached(query: str, value: dict) -> None:
"""저신뢰(< 0.5) / 빈 값은 캐시 금지. 상한 초과 시 FIFO eviction."""
if not value:
return
try:
conf = float(value.get("analyzer_confidence", 0.0) or 0.0)
except (TypeError, ValueError):
conf = 0.0
if conf < MIN_CACHE_CONFIDENCE:
return
key = _cache_key(query)
if key in _CACHE:
_CACHE[key] = {"value": value, "ts": time.time()}
return
if len(_CACHE) >= CACHE_MAXSIZE:
try:
oldest = next(iter(_CACHE))
_CACHE.pop(oldest, None)
except StopIteration:
pass
_CACHE[key] = {"value": value, "ts": time.time()}
def cache_stats() -> dict[str, int]:
"""debug/운영용 — 현재 캐시 크기 + inflight 수."""
return {
"size": len(_CACHE),
"maxsize": CACHE_MAXSIZE,
"inflight": len(_INFLIGHT),
"pending_tasks": len(_PENDING),
}
# ─── weight 정규화 (fusion 왜곡 방지) ───────────────────
def _normalize_weights(analysis: dict) -> dict:
"""normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화."""
queries = analysis.get("normalized_queries")
if not isinstance(queries, list) or not queries:
return analysis
sanitized: list[dict] = []
for q in queries[:MAX_NORMALIZED_QUERIES]:
if not isinstance(q, dict):
continue
lang = str(q.get("lang", "")).strip() or "ko"
text = str(q.get("text", "")).strip()
if not text:
continue
try:
w = float(q.get("weight", 1.0))
if w < 0:
w = 0.0
except (TypeError, ValueError):
w = 1.0
sanitized.append({"lang": lang, "text": text, "weight": w})
if not sanitized:
return analysis
total = sum(q["weight"] for q in sanitized)
if total <= 0:
equal = 1.0 / len(sanitized)
for q in sanitized:
q["weight"] = equal
else:
for q in sanitized:
q["weight"] /= total
analysis["normalized_queries"] = sanitized
return analysis
# ─── 프롬프트 로딩 (module 초기화 1회) ──────────────────
try:
ANALYZE_PROMPT = _load_prompt("query_analyze.txt")
except FileNotFoundError:
ANALYZE_PROMPT = ""
logger.warning("query_analyze.txt not found — analyzer will always return fallback")
# ─── 기본 fallback 응답 (None 금지) ─────────────────────
def _fallback(reason: str | None = None) -> dict:
"""LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0."""
result: dict[str, Any] = {
"intent": "semantic_search",
"query_type": "keyword",
"domain_hint": "mixed",
"language_scope": "limited",
"keywords": [],
"must_terms": [],
"optional_terms": [],
"hard_filters": {},
"soft_filters": {"domain": [], "document_type": []},
"normalized_queries": [],
"expanded_terms": [],
"synonyms": {},
"analyzer_confidence": 0.0,
}
if reason:
result["_fallback_reason"] = reason
return result
# ─── 메인 LLM 호출 (내부 사용) ──────────────────────────
async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
"""쿼리 분석 결과 반환. 실패 시 analyzer_confidence=0.0 fallback.
**⚠️ 동기 검색 경로에서 직접 호출 금지**. 용도:
- `trigger_background_analysis` 내부 호출
- `prewarm_analyzer` startup 호출
- 디버깅/테스트
Args:
query: 사용자 쿼리 원문
ai_client: AIClient 인스턴스 (없으면 생성 후 자동 close)
Returns:
dict — 최소 `analyzer_confidence` 키는 항상 float로 존재.
"""
if not query or not query.strip():
return _fallback("empty_query")
if not ANALYZE_PROMPT:
return _fallback("prompt_not_loaded")
# cache hit 즉시 반환 (prewarm 재호출 방지)
cached = get_cached(query)
if cached is not None:
return cached
client_owned = False
if ai_client is None:
ai_client = AIClient()
client_owned = True
t_start = time.perf_counter()
semaphore = _get_llm_semaphore()
# ⚠️ 중요: semaphore 대기는 timeout 포함되면 안됨 (대기만 해도 timeout 발동)
# timeout은 실제 LLM 호출 구간에만 적용.
try:
async with semaphore:
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(
ai_client.ai.primary,
ANALYZE_PROMPT.replace("{query}", query),
)
except asyncio.TimeoutError:
elapsed = (time.perf_counter() - t_start) * 1000
logger.warning(
"query_analyze timeout query=%r elapsed_ms=%.0f (LLM_TIMEOUT_MS=%d)",
query[:80],
elapsed,
LLM_TIMEOUT_MS,
)
return _fallback("timeout")
except Exception as exc:
elapsed = (time.perf_counter() - t_start) * 1000
logger.warning(
"query_analyze LLM error query=%r elapsed_ms=%.0f err=%r",
query[:80],
elapsed,
exc,
)
return _fallback(f"llm_error:{type(exc).__name__}")
finally:
if client_owned:
try:
await ai_client.close()
except Exception:
pass
elapsed_ms = (time.perf_counter() - t_start) * 1000
parsed = parse_json_response(raw)
if not isinstance(parsed, dict):
logger.warning(
"query_analyze parse failed query=%r elapsed_ms=%.0f raw=%r",
query[:80],
elapsed_ms,
(raw or "")[:200],
)
return _fallback("parse_failed")
try:
conf = float(parsed.get("analyzer_confidence", 0.0) or 0.0)
except (TypeError, ValueError):
conf = 0.0
parsed["analyzer_confidence"] = conf
parsed = _normalize_weights(parsed)
logger.info(
"query_analyze ok query=%r conf=%.2f intent=%s domain=%s elapsed_ms=%.0f",
query[:80],
conf,
parsed.get("intent"),
parsed.get("domain_hint"),
elapsed_ms,
)
set_cached(query, parsed)
return parsed
# ─── Background trigger (retrieval 경로에서 사용) ───────
async def _background_analyze(query: str) -> None:
"""Background task wrapper — inflight 집합 관리 + 예외 삼킴."""
try:
await analyze(query)
except Exception as exc:
logger.warning("background analyze crashed query=%r err=%r", query[:80], exc)
finally:
_INFLIGHT.discard(query)
def trigger_background_analysis(query: str) -> bool:
"""retrieval 경로에서 호출. cache miss 시 background task 생성.
- 동기 함수. 즉시 반환.
- 이미 inflight 또는 cache hit이면 아무 작업 X, False 반환.
- 새 task 생성 시 True 반환.
Returns:
bool — task 실제로 생성되었는지 여부
"""
if not query or not query.strip():
return False
if query in _INFLIGHT:
return False
if get_cached(query) is not None:
return False
try:
loop = asyncio.get_running_loop()
except RuntimeError:
logger.warning("trigger_background_analysis called outside event loop")
return False
_INFLIGHT.add(query)
task = loop.create_task(_background_analyze(query))
_PENDING.add(task)
task.add_done_callback(_PENDING.discard)
return True
# ─── Prewarm (app startup) ──────────────────────────────
# 운영에서 자주 발생하는 쿼리 샘플. 통계 기반으로 확장 예정.
DEFAULT_PREWARM_QUERIES: list[str] = [
# fixed 7 (Phase 2 평가셋 core)
"산업안전보건법 제6장",
"기계 사고 관련 법령",
"AI 산업 동향",
"Python async best practice",
"유해화학물질을 다루는 회사가 지켜야 할 안전 의무",
"recent AI safety news from Europe",
"이세상에 존재하지 않는 문서명",
# 법령 관련
"산업안전보건법 시행령",
"화학물질관리법",
"위험성평가 절차",
# 뉴스 관련
"EU AI Act",
"한국 AI 산업",
# 실무
"안전보건 교육 자료",
"사고 조사 보고서",
"MSDS 작성 방법",
]
async def prewarm_analyzer(
queries: list[str] | None = None,
delay_between: float = 0.2,
) -> dict[str, Any]:
"""app startup에서 호출. 대표 쿼리를 미리 분석해 cache에 적재.
Non-blocking으로 사용: `asyncio.create_task(prewarm_analyzer())`.
Args:
queries: 분석할 쿼리 리스트. None이면 DEFAULT_PREWARM_QUERIES 사용.
delay_between: 각 쿼리 간 대기 시간 (MLX 부하 완화).
Returns:
dict — {queries_total, success, failed, elapsed_ms, cache_size_after}
"""
if not ANALYZE_PROMPT:
logger.warning("prewarm skipped — prompt not loaded")
return {"status": "skipped", "reason": "prompt_not_loaded"}
targets = queries if queries is not None else DEFAULT_PREWARM_QUERIES
total = len(targets)
success = 0
failed = 0
t_start = time.perf_counter()
logger.info("prewarm_analyzer start queries=%d timeout_ms=%d", total, LLM_TIMEOUT_MS)
client = AIClient()
try:
for i, q in enumerate(targets, 1):
if get_cached(q) is not None:
logger.info("prewarm skip (already cached) [%d/%d] %r", i, total, q[:40])
success += 1
continue
result = await analyze(q, ai_client=client)
conf = float(result.get("analyzer_confidence", 0.0) or 0.0)
if conf >= MIN_CACHE_CONFIDENCE:
success += 1
logger.info("prewarm ok [%d/%d] conf=%.2f q=%r", i, total, conf, q[:40])
else:
failed += 1
reason = result.get("_fallback_reason", "low_conf")
logger.warning(
"prewarm fail [%d/%d] reason=%s q=%r", i, total, reason, q[:40]
)
if delay_between > 0 and i < total:
await asyncio.sleep(delay_between)
finally:
try:
await client.close()
except Exception:
pass
elapsed_ms = (time.perf_counter() - t_start) * 1000
stats = cache_stats()
logger.info(
"prewarm_analyzer done total=%d success=%d failed=%d elapsed_ms=%.0f cache_size=%d",
total,
success,
failed,
elapsed_ms,
stats["size"],
)
return {
"status": "complete",
"queries_total": total,
"success": success,
"failed": failed,
"elapsed_ms": elapsed_ms,
"cache_size_after": stats["size"],
}

View File

@@ -134,7 +134,12 @@ async def rerank_chunks(
if idx is None or sc is None or idx >= len(candidates):
continue
chunk = candidates[idx]
chunk.score = float(sc)
score = float(sc)
chunk.score = score
# Phase 3.1: reranker raw 점수를 별도 필드에 보존.
# normalize_display_scores가 나중에 .score를 랭크 기반으로 덮어써도
# fast-path 판단에 쓸 수 있는 원본 신호 유지.
chunk.rerank_score = score
chunk.match_reason = (chunk.match_reason or "") + "+rerank"
reranked.append(chunk)
return reranked[:limit]

View File

@@ -1,4 +1,4 @@
"""검색 후보 수집 서비스 (Phase 1.2).
"""검색 후보 수집 서비스 (Phase 1.2 + Phase 2.2 multilingual).
text(documents FTS + trigram) + vector(documents.embedding + chunks.embedding hybrid) 후보를
SearchResult 리스트로 반환.
@@ -10,27 +10,80 @@ Phase 1.2-G: doc + chunks hybrid retrieval 보강.
- documents.embedding (recall robust, 자연어 매칭 강함)
- document_chunks.embedding (precision, segment 매칭)
- 두 SQL 동시 호출 후 doc_id 기준 merge (chunk 가중치 1.2, doc 1.0)
Phase 2.2 추가:
- _QUERY_EMBED_CACHE: bge-m3 query embedding 캐시 (모듈 레벨 LRU, TTL 24h)
- search_vector_multilingual: normalized_queries (lang별 쿼리) 배열 지원
QueryAnalyzer cache hit + analyzer_tier >= merge 일 때만 호출.
- crosslingual_ko_en NDCG 0.53 → 0.65+ 목표
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import hashlib
import time
from typing import TYPE_CHECKING, Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ai.client import AIClient
from core.database import engine
from core.utils import setup_logger
if TYPE_CHECKING:
from api.search import SearchResult
logger = setup_logger("retrieval_service")
# Hybrid merge 가중치 (1.2-G)
DOC_VECTOR_WEIGHT = 1.0
CHUNK_VECTOR_WEIGHT = 1.2
# ─── Phase 2.2: Query embedding cache ───────────────────
# bge-m3 호출 비용 절반 감소 (동일 normalized_query 재호출 방지)
_QUERY_EMBED_CACHE: dict[str, dict[str, Any]] = {}
QUERY_EMBED_TTL = 86400 # 24h
QUERY_EMBED_MAXSIZE = 500
def _query_embed_key(text_: str) -> str:
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
async def _get_query_embedding(
client: AIClient, text_: str
) -> list[float] | None:
"""Query embedding with in-memory cache.
동일 텍스트 재호출 시 bge-m3 skip. fixed query 회귀 시 vector_ms 대폭 감소.
"""
if not text_:
return None
key = _query_embed_key(text_)
entry = _QUERY_EMBED_CACHE.get(key)
if entry and time.time() - entry["ts"] < QUERY_EMBED_TTL:
return entry["emb"]
try:
emb = await client.embed(text_)
except Exception as exc:
logger.warning("query embed failed text=%r err=%r", text_[:40], exc)
return None
if len(_QUERY_EMBED_CACHE) >= QUERY_EMBED_MAXSIZE:
try:
oldest = next(iter(_QUERY_EMBED_CACHE))
_QUERY_EMBED_CACHE.pop(oldest, None)
except StopIteration:
pass
_QUERY_EMBED_CACHE[key] = {"emb": emb, "ts": time.time()}
return emb
def query_embed_cache_stats() -> dict[str, int]:
return {"size": len(_QUERY_EMBED_CACHE), "maxsize": QUERY_EMBED_MAXSIZE}
async def search_text(
session: AsyncSession, query: str, limit: int
@@ -153,11 +206,16 @@ async def search_vector(
list[SearchResult] — doc_id 중복 제거됨. compress_chunks_to_docs는 그대로 동작.
chunks_by_doc은 search.py에서 group_by_doc으로 보존.
"""
try:
client = AIClient()
query_embedding = await client.embed(query)
try:
query_embedding = await _get_query_embedding(client, query)
finally:
try:
await client.close()
except Exception:
pass
if query_embedding is None:
return []
embedding_str = str(query_embedding)
@@ -307,6 +365,100 @@ def _merge_doc_and_chunk_vectors(
return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)
async def search_vector_multilingual(
session: AsyncSession,
normalized_queries: list[dict],
limit: int,
) -> list["SearchResult"]:
"""Phase 2.2 — 다국어 normalized_queries 배열로 vector retrieval.
각 language query에 대해 embedding을 병렬 생성(cache hit 활용),
각 embedding에 대해 기존 docs+chunks hybrid 호출,
결과를 weight 기반으로 merge.
⚠️ 호출 조건:
- QueryAnalyzer cache hit 이어야 함 (async-only 룰)
- analyzer_confidence 높고 normalized_queries 존재해야 함
- search.py에서만 호출. retrieval 경로 동기 LLM 호출 금지 룰 준수.
Args:
session: AsyncSession (호출자 관리, 본 함수 내부는 sessionmaker로 별도 연결 사용)
normalized_queries: [{"lang": "ko", "text": "...", "weight": 0.56}, ...]
weight는 _normalize_weights로 이미 합=1.0 정규화된 상태.
limit: 상위 결과 개수
Returns:
list[SearchResult] — doc_id 중복 제거. merged score = sum(per-query score * lang_weight).
"""
if not normalized_queries:
return []
# 1. 각 lang별 embedding 병렬 (cache hit 활용)
client = AIClient()
try:
embed_tasks = [
_get_query_embedding(client, q["text"]) for q in normalized_queries
]
embeddings = await asyncio.gather(*embed_tasks)
finally:
try:
await client.close()
except Exception:
pass
# embedding 실패한 query는 skip (weight 재정규화 없이 조용히 drop)
per_query_plan: list[tuple[dict, str]] = []
for q, emb in zip(normalized_queries, embeddings):
if emb is None:
logger.warning("multilingual embed skipped lang=%s", q.get("lang"))
continue
per_query_plan.append((q, str(emb)))
if not per_query_plan:
return []
# 2. 각 embedding에 대해 doc + chunks 병렬 retrieval
Session = async_sessionmaker(engine)
async def _one_query(q_meta: dict, embedding_str: str) -> list["SearchResult"]:
async def _docs() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_docs(s, embedding_str, limit * 4)
async def _chunks() -> list["SearchResult"]:
async with Session() as s:
return await _search_vector_chunks(s, embedding_str, limit * 4)
doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
return _merge_doc_and_chunk_vectors(doc_r, chunk_r)
per_query_results = await asyncio.gather(
*(_one_query(q, emb_str) for q, emb_str in per_query_plan)
)
# 3. weight 기반 merge — doc_id 중복 시 weighted score 합산
merged: dict[int, "SearchResult"] = {}
for (q_meta, _emb_str), results in zip(per_query_plan, per_query_results):
weight = float(q_meta.get("weight", 1.0) or 1.0)
for r in results:
weighted = r.score * weight
prev = merged.get(r.id)
if prev is None:
# 첫 방문: 원본을 shallow copy 대신 직접 wrap
r.score = weighted
r.match_reason = f"ml_{q_meta.get('lang', '?')}"
merged[r.id] = r
else:
# 중복: score 누적, 가장 높은 weight 소스로 match_reason 표시
prev.score += weighted
# match_reason 병합 (가독성)
if q_meta.get("lang") and q_meta.get("lang") not in (prev.match_reason or ""):
prev.match_reason = (prev.match_reason or "ml") + f"+{q_meta['lang']}"
sorted_results = sorted(merged.values(), key=lambda r: r.score, reverse=True)
return sorted_results[: limit * 4] # rerank 후보로 넉넉히
def compress_chunks_to_docs(
chunks: list["SearchResult"], limit: int
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:

View File

@@ -0,0 +1,335 @@
"""검색 파이프라인 오케스트레이션 (Phase 3.1).
`/api/search/` 와 `/api/search/ask` 가 공유하는 단일 진실 소스.
## 순수성 규칙 (영구)
`run_search()`는 wrapper(endpoint)에서 side effect를 최대한 분리한다:
- ❌ **금지**: `BackgroundTasks` 파라미터, `logger.info(...)` 직접 호출,
`record_search_event()` 호출, `SearchResponse`/`AskResponse` 직렬화
- ✅ **허용**: `trigger_background_analysis()` (analyzer cache miss 시
fire-and-forget task — retrieval 전략의 일부, 자가 완결됨)
- ✅ **허용**: retrieval / fusion / rerank / diversity / display 정규화 /
confidence 계산 같은 내부 서비스 호출
반환값은 `PipelineResult` 하나. wrapper가 그 안에서 필요한 필드를 꺼내
logger / telemetry / 응답 직렬화를 수행한다.
## Phase 2 호환
본 모듈은 기존 `app/api/search.py::search()` 함수 본문을 lift-and-shift 한
것이다. 변수명 / notes 문자열 / timing 키 / logger 포맷 은 wrapper 쪽에서
완전히 동일하게 재구성된다. refactor 전후 `/search?debug=true` 응답은
byte-level 에 가깝게 일치해야 한다.
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from sqlalchemy.ext.asyncio import AsyncSession
from . import query_analyzer
from .fusion_service import (
DEFAULT_FUSION,
apply_soft_filter_boost,
get_strategy,
normalize_display_scores,
)
from .rerank_service import (
MAX_CHUNKS_PER_DOC,
MAX_RERANK_INPUT,
apply_diversity,
rerank_chunks,
)
from .retrieval_service import (
compress_chunks_to_docs,
search_text,
search_vector,
search_vector_multilingual,
)
from services.search_telemetry import (
compute_confidence,
compute_confidence_hybrid,
compute_confidence_reranked,
)
if TYPE_CHECKING:
from api.search import SearchResult
# ─── Phase 2.1: analyzer_confidence 3단계 게이트 ──────────
# search.py 에서 이동. search.py 의 /search wrapper 는 이 상수들을
# 노출할 필요 없으므로 파이프라인 모듈에만 둔다.
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
def _analyzer_tier(confidence: float) -> str:
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
if confidence < ANALYZER_TIER_IGNORE:
return "ignore"
if confidence < ANALYZER_TIER_ORIGINAL:
return "original_fallback"
if confidence < ANALYZER_TIER_MERGE:
return "merge"
return "analyzed"
# ─── 반환 타입 ─────────────────────────────────────────────
@dataclass(slots=True)
class PipelineResult:
"""run_search() 반환 — wrapper 가 필요한 모든 state 를 담는다."""
# ── 최종 결과 (API 노출) ──
results: "list[SearchResult]"
mode: str
confidence_signal: float
# ── 중간 단계 (evidence 입력 + debug) ──
text_results: "list[SearchResult]"
vector_results: "list[SearchResult]" # doc 압축 후
raw_chunks: "list[SearchResult]" # chunk 원본 (rerank/evidence용)
chunks_by_doc: "dict[int, list[SearchResult]]"
# ── 쿼리 분석 메타 ──
query_analysis: dict | None
analyzer_cache_hit: bool
analyzer_confidence: float # 항상 float (None 금지)
analyzer_tier: str
# ── 관측 ──
timing_ms: dict[str, float] = field(default_factory=dict)
notes: list[str] = field(default_factory=list)
# ─── 메인 파이프라인 ───────────────────────────────────────
async def run_search(
session: AsyncSession,
q: str,
*,
mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid",
limit: int = 20,
fusion: str = DEFAULT_FUSION,
rerank: bool = True,
analyze: bool = False,
) -> PipelineResult:
"""검색 파이프라인 실행.
retrieval → fusion → rerank → diversity → display 정규화 → confidence 계산
까지 수행하고 `PipelineResult` 를 반환한다. logging / BackgroundTasks /
응답 직렬화는 절대 수행하지 않는다 (wrapper 책임).
Args:
session: AsyncSession (caller 가 관리)
q: 사용자 쿼리 원문
mode: fts | trgm | vector | hybrid
limit: 최종 결과 수 (hybrid 에서는 fusion 입력 후보 수는 이보다 넓음)
fusion: legacy | rrf | rrf_boost
rerank: bge-reranker-v2-m3 활성화 (hybrid 전용)
analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter)
Returns:
PipelineResult
"""
# 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언)
from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영
timing: dict[str, float] = {}
notes: list[str] = []
text_results: list["SearchResult"] = []
vector_results: list["SearchResult"] = [] # doc-level (압축 후, fusion 입력)
raw_chunks: list["SearchResult"] = [] # chunk-level (raw, Phase 1.3 reranker용)
chunks_by_doc: dict[int, list["SearchResult"]] = {} # Phase 1.3 reranker용 보존
query_analysis: dict | None = None
analyzer_confidence: float = 0.0
analyzer_tier: str = "disabled"
t_total = time.perf_counter()
# Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지.
# - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부)
# - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget)
# 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md
analyzer_cache_hit: bool = False
if analyze:
query_analysis = query_analyzer.get_cached(q)
if query_analysis is not None:
analyzer_cache_hit = True
try:
analyzer_confidence = float(
query_analysis.get("analyzer_confidence", 0.0) or 0.0
)
except (TypeError, ValueError):
analyzer_confidence = 0.0
analyzer_tier = _analyzer_tier(analyzer_confidence)
notes.append(
f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
)
else:
# cache miss → background analyzer 트리거 (retrieval 차단 X)
triggered = query_analyzer.trigger_background_analysis(q)
analyzer_tier = "cache_miss"
notes.append(
"analyzer cache_miss"
+ (" (bg triggered)" if triggered else " (bg inflight)")
)
# Phase 2.2: multilingual vector search 활성 조건 (보수적)
# - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰)
# - normalized_queries 2개 이상 (lang 다양성 있음)
# - domain_hint == "news" 또는 language_scope == "global"
# ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko
# -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용.
# news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인).
use_multilingual: bool = False
normalized_queries: list[dict] = []
if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
domain_hint = query_analysis.get("domain_hint", "mixed")
language_scope = query_analysis.get("language_scope", "limited")
is_multilingual_candidate = (
domain_hint == "news" or language_scope == "global"
)
if is_multilingual_candidate:
raw_nq = query_analysis.get("normalized_queries") or []
if isinstance(raw_nq, list) and len(raw_nq) >= 2:
normalized_queries = [
nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
]
if len(normalized_queries) >= 2:
use_multilingual = True
notes.append(
f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}"
f" hint={domain_hint}/{language_scope}"
)
if mode == "vector":
t0 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
t1 = time.perf_counter()
if use_multilingual:
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
else:
raw_chunks = await search_vector(session, q, limit)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
t1b = time.perf_counter()
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
if not vector_results:
notes.append("vector_search_returned_empty — text-only fallback")
t2 = time.perf_counter()
strategy = get_strategy(fusion)
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
fusion_limit = max(limit * 5, 100) if rerank else limit
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
notes.append(f"fusion={strategy.name}")
notes.append(
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
f"unique_docs={len(chunks_by_doc)}"
)
# Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만)
# analyzer_confidence < 0.5 (tier=ignore)는 비활성.
if (
analyzer_cache_hit
and analyzer_tier != "ignore"
and query_analysis
):
soft_filters = query_analysis.get("soft_filters") or {}
if soft_filters:
boosted = apply_soft_filter_boost(fused_docs, soft_filters)
if boosted > 0:
notes.append(f"soft_filter_boost applied={boosted}")
if rerank:
# Phase 1.3: reranker — chunk 기준 입력
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
t3 = time.perf_counter()
rerank_input: list["SearchResult"] = []
for doc in fused_docs:
chunks = chunks_by_doc.get(doc.id, [])
if chunks:
# doc당 max 2 chunk (latency/VRAM 보호)
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
else:
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
rerank_input.append(doc)
if len(rerank_input) >= MAX_RERANK_INPUT:
break
rerank_input = rerank_input[:MAX_RERANK_INPUT]
notes.append(f"rerank input={len(rerank_input)}")
reranked = await rerank_chunks(q, rerank_input, limit * 3)
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
t4 = time.perf_counter()
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
else:
# rerank 비활성: fused_docs를 그대로 (limit 적용)
results = fused_docs[:limit]
else:
results = text_results
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
# Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존).
normalize_display_scores(results)
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
if mode == "hybrid":
if rerank and "rerank_ms" in timing:
confidence_signal = compute_confidence_reranked(results)
else:
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
elif mode == "vector":
confidence_signal = compute_confidence(vector_results, "vector")
else:
confidence_signal = compute_confidence(text_results, mode)
return PipelineResult(
results=results,
mode=mode,
confidence_signal=confidence_signal,
text_results=text_results,
vector_results=vector_results,
raw_chunks=raw_chunks,
chunks_by_doc=chunks_by_doc,
query_analysis=query_analysis,
analyzer_cache_hit=analyzer_cache_hit,
analyzer_confidence=analyzer_confidence,
analyzer_tier=analyzer_tier,
timing_ms=timing,
notes=notes,
)

View File

@@ -1,6 +1,422 @@
"""Grounded answer synthesis 서비스 (Phase 3).
"""Grounded answer synthesis 서비스 (Phase 3.3).
evidence span을 Gemma 4에 전달해 인용 기반 답변 생성.
3~4초 soft timeout, 타임아웃 시 결과만 반환 fallback.
구현은 Phase 3에서 채움.
evidence span 을 Gemma 4 에 전달해 citation 기반 답변 생성한다.
캐시 / timeout / citation 검증 / refused 처리 포함.
## 영구 룰
- **span-only 입력**: `_render_prompt()` 는 `EvidenceItem.span_text` 만 참조한다.
`EvidenceItem.full_snippet` 을 프롬프트에 포함하면 LLM 이 span 밖 내용을
hallucinate 한다. 이 규칙이 깨지면 시스템 무너짐 → docstring + 코드 패턴으로
방어 (함수 상단에서 제한 뷰만 만든다).
- **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error) 와
low confidence / refused 는 캐시 금지. 잘못된 답변 고정 방지.
- **MLX gate 공유**: `get_mlx_gate()` 경유. analyzer / evidence 와 동일 semaphore.
- **timeout 15s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate
대기만으로 timeout 발동.
- **citation 검증**: 본문 `[n]` 범위 초과는 제거 + `hallucination_flags` 기록.
answer 수정본을 반환하되 status 는 completed 유지 (silent fix + observable).
"""
from __future__ import annotations
import asyncio
import hashlib
import re
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import get_mlx_gate
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
logger = setup_logger("synthesis")
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
PROMPT_VERSION = "v1"
LLM_TIMEOUT_MS = 15000
CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게)
CACHE_MAXSIZE = 300
MAX_ANSWER_CHARS = 400
SynthesisStatus = Literal[
"completed",
"timeout",
"skipped",
"no_evidence",
"parse_failed",
"llm_error",
]
# ─── 반환 타입 ───────────────────────────────────────────
@dataclass(slots=True)
class SynthesisResult:
"""synthesize() 반환. cache dict 에 들어가는 payload 이기도 함."""
status: SynthesisStatus
answer: str | None
used_citations: list[int] # 검증 후 실제로 본문에 등장한 n
confidence: Literal["high", "medium", "low"] | None
refused: bool
refuse_reason: str | None
elapsed_ms: float
cache_hit: bool
hallucination_flags: list[str] = field(default_factory=list)
raw_preview: str | None = None # debug=true 일 때 LLM raw 500자
# ─── 프롬프트 로딩 (module 초기화 1회) ──────────────────
try:
SYNTHESIS_PROMPT = _load_prompt("search_synthesis.txt")
except FileNotFoundError:
SYNTHESIS_PROMPT = ""
logger.warning(
"search_synthesis.txt not found — synthesis will always return llm_error"
)
# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─
_CACHE: dict[str, SynthesisResult] = {}
def _model_version() -> str:
"""현재 primary 모델 ID — 캐시 키에 반영."""
if settings.ai and settings.ai.primary:
return settings.ai.primary.model
return "unknown-model"
def _cache_key(query: str, chunk_ids: list[int]) -> str:
"""(query + sorted chunk_ids + PROMPT_VERSION + model) sha256."""
sorted_ids = ",".join(str(c) for c in sorted(chunk_ids))
raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def get_cached(query: str, chunk_ids: list[int]) -> SynthesisResult | None:
"""캐시 조회. TTL 경과는 자동 삭제."""
key = _cache_key(query, chunk_ids)
entry = _CACHE.get(key)
if entry is None:
return None
# TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장
# 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점)
# 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함.
return entry
def _should_cache(result: SynthesisResult) -> bool:
"""실패/저신뢰/refused 는 캐시 금지."""
return (
result.status == "completed"
and result.confidence in ("high", "medium")
and not result.refused
and result.answer is not None
)
def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult) -> None:
"""조건부 저장 + FIFO eviction."""
if not _should_cache(result):
return
key = _cache_key(query, chunk_ids)
if key in _CACHE:
_CACHE[key] = result
return
if len(_CACHE) >= CACHE_MAXSIZE:
try:
oldest = next(iter(_CACHE))
_CACHE.pop(oldest, None)
except StopIteration:
pass
_CACHE[key] = result
def cache_stats() -> dict[str, int]:
"""debug/운영용."""
return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE}
# ─── Prompt rendering (🔒 span_text ONLY) ───────────────
def _render_prompt(query: str, evidence: list["EvidenceItem"]) -> str:
"""{query} / {numbered_evidence} 치환.
⚠ **MUST NOT access `item.full_snippet`**. Use `span_text` ONLY.
Rationale: 프롬프트에 full_snippet 을 넣으면 LLM 이 span 밖 내용으로
hallucinate 한다. full_snippet 은 debug / citation 원문 전용.
제한 뷰만 만들어서 full_snippet 접근을 문법적으로 어렵게 만든다.
"""
# 제한 뷰 — 이 튜플에는 span_text 외의 snippet 필드가 없다
spans: list[tuple[int, str, str]] = [
(i.n, (i.title or "").strip(), i.span_text) for i in evidence
]
lines = [f"[{n}] {title}\n{span}" for n, title, span in spans]
numbered_block = "\n\n".join(lines)
return SYNTHESIS_PROMPT.replace("{query}", query).replace(
"{numbered_evidence}", numbered_block
)
# ─── Citation 검증 ──────────────────────────────────────
_CITATION_RE = re.compile(r"\[(\d+)\]")
def _validate_citations(
answer: str,
n_max: int,
) -> tuple[str, list[int], list[str]]:
"""본문 `[n]` 범위 초과 제거 + used_citations 추출 + flags.
Returns:
(corrected_answer, used_citations, hallucination_flags)
"""
flags: list[str] = []
seen: set[int] = set()
used: list[int] = []
corrected = answer
for match in _CITATION_RE.findall(answer):
try:
n = int(match)
except ValueError:
continue
if n < 1 or n > n_max:
# 범위 초과 → 본문에서 제거 + flag
corrected = corrected.replace(f"[{n}]", "")
flags.append(f"removed_n_{n}")
continue
if n not in seen:
seen.add(n)
used.append(n)
used.sort()
if len(corrected) > MAX_ANSWER_CHARS:
corrected = corrected[:MAX_ANSWER_CHARS]
flags.append("length_clipped")
return corrected, used, flags
# ─── Core: synthesize ───────────────────────────────────
async def synthesize(
query: str,
evidence: list["EvidenceItem"],
ai_client: AIClient | None = None,
debug: bool = False,
) -> SynthesisResult:
"""evidence → grounded answer.
Failure modes 는 모두 SynthesisResult 로 반환한다 (예외는 외부로 전파되지
않음). 호출자 (`/ask` wrapper) 가 status 를 보고 user-facing 메시지를
결정한다.
"""
t_start = time.perf_counter()
# ── evidence 비면 즉시 no_evidence ─────────────────
if not evidence:
return SynthesisResult(
status="no_evidence",
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
cache_hit=False,
hallucination_flags=[],
raw_preview=None,
)
# ── cache lookup ───────────────────────────────────
# chunk_id 가 None 인 text-only wrap 은 음수 doc_id 로 구분 → key 안정화
chunk_ids = [
(e.chunk_id if e.chunk_id is not None else -e.doc_id) for e in evidence
]
cached = get_cached(query, chunk_ids)
if cached is not None:
return SynthesisResult(
status=cached.status,
answer=cached.answer,
used_citations=list(cached.used_citations),
confidence=cached.confidence,
refused=cached.refused,
refuse_reason=cached.refuse_reason,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
cache_hit=True,
hallucination_flags=list(cached.hallucination_flags),
raw_preview=cached.raw_preview if debug else None,
)
# ── prompt 준비 ─────────────────────────────────────
if not SYNTHESIS_PROMPT:
return SynthesisResult(
status="llm_error",
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
cache_hit=False,
hallucination_flags=["prompt_not_loaded"],
raw_preview=None,
)
prompt = _render_prompt(query, evidence)
prompt_preview = prompt[:500] if debug else None
# ── LLM 호출 ───────────────────────────────────────
client_owned = False
if ai_client is None:
ai_client = AIClient()
client_owned = True
raw: str | None = None
llm_error: str | None = None
try:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
except asyncio.TimeoutError:
llm_error = "timeout"
except Exception as exc:
llm_error = f"llm_error:{type(exc).__name__}"
finally:
if client_owned:
try:
await ai_client.close()
except Exception:
pass
elapsed_ms = (time.perf_counter() - t_start) * 1000
if llm_error is not None:
status: SynthesisStatus = "timeout" if llm_error == "timeout" else "llm_error"
logger.warning(
"synthesis %s query=%r evidence_n=%d elapsed_ms=%.0f",
llm_error, query[:80], len(evidence), elapsed_ms,
)
return SynthesisResult(
status=status,
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=[llm_error],
raw_preview=None,
)
parsed = parse_json_response(raw or "")
if not isinstance(parsed, dict):
logger.warning(
"synthesis parse_failed query=%r raw=%r elapsed_ms=%.0f",
query[:80], (raw or "")[:200], elapsed_ms,
)
return SynthesisResult(
status="parse_failed",
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=["parse_failed"],
raw_preview=(raw or "")[:500] if debug else None,
)
# ── JSON 필드 검증 ──────────────────────────────────
answer_raw = parsed.get("answer", "")
if not isinstance(answer_raw, str):
answer_raw = ""
conf_raw = parsed.get("confidence", "low")
if conf_raw not in ("high", "medium", "low"):
conf_raw = "low"
refused_raw = bool(parsed.get("refused", False))
refuse_reason_raw = parsed.get("refuse_reason")
if refuse_reason_raw is not None and not isinstance(refuse_reason_raw, str):
refuse_reason_raw = None
# refused 면 answer 무시 + citations 비움
if refused_raw:
result = SynthesisResult(
status="completed",
answer=None,
used_citations=[],
confidence=conf_raw, # type: ignore[arg-type]
refused=True,
refuse_reason=refuse_reason_raw,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=[],
raw_preview=(raw or "")[:500] if debug else None,
)
logger.info(
"synthesis refused query=%r evidence_n=%d conf=%s elapsed_ms=%.0f reason=%r",
query[:80], len(evidence), conf_raw, elapsed_ms, (refuse_reason_raw or "")[:80],
)
# refused 는 캐시 금지 (_should_cache)
return result
# ── citation 검증 ───────────────────────────────────
corrected_answer, used_citations, flags = _validate_citations(
answer_raw, n_max=len(evidence)
)
# answer 가 공백만 남으면 low confidence 로 강등
if not corrected_answer.strip():
corrected_answer_final: str | None = None
conf_raw = "low"
flags.append("empty_after_validation")
else:
corrected_answer_final = corrected_answer
result = SynthesisResult(
status="completed",
answer=corrected_answer_final,
used_citations=used_citations,
confidence=conf_raw, # type: ignore[arg-type]
refused=False,
refuse_reason=None,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=flags,
raw_preview=(raw or "")[:500] if debug else None,
)
logger.info(
"synthesis ok query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f",
query[:80],
len(evidence),
len(corrected_answer_final or ""),
len(used_citations),
conf_raw,
",".join(flags) if flags else "-",
elapsed_ms,
)
# 조건부 캐시 저장
set_cached(query, chunk_ids, result)
return result

View File

@@ -244,6 +244,7 @@ async def record_search_event(
results: list[Any],
mode: str,
confidence: float | None = None,
analyzer_confidence: float | None = None,
) -> None:
"""검색 응답 직후 호출. 실패 트리거에 해당하면 로그 INSERT.
@@ -253,6 +254,13 @@ async def record_search_event(
confidence 파라미터:
- None이면 results 기준으로 자체 계산 (legacy 호출용).
- 명시적으로 전달되면 그 값 사용 (Phase 0.5+: fusion 적용 전 raw 신호 기준).
analyzer_confidence (Phase 2.1):
- QueryAnalyzer의 쿼리 분석 신뢰도 (result confidence와 다른 축).
- `result.confidence` 가 낮더라도 `analyzer_confidence` 가 높으면
"retrieval failure" (corpus에 정답 없음)로 해석 가능.
- 반대로 analyzer_confidence < 0.5 이면 "query understanding failure" 해석.
- Phase 2.1에서는 context에만 기록 (failure_reason 분류는 Phase 2.2+에서).
"""
if user_id is None:
return
@@ -260,7 +268,10 @@ async def record_search_event(
if confidence is None:
confidence = compute_confidence(results, mode)
result_count = len(results)
base_ctx = _build_context(results, mode, extra={"confidence": confidence})
extra_ctx: dict[str, Any] = {"confidence": confidence}
if analyzer_confidence is not None:
extra_ctx["analyzer_confidence"] = float(analyzer_confidence)
base_ctx = _build_context(results, mode, extra=extra_ctx)
# ── 1) reformulation 체크 (이전 쿼리가 있으면 그걸 로깅) ──
prior = await _record_and_get_prior(user_id, query)

View File

@@ -0,0 +1,44 @@
"""Phase 4: Global Digest 워커.
7일 뉴스를 country × topic 으로 묶어 cluster-level LLM 요약을 생성하고
global_digests / digest_topics 테이블에 저장한다.
- APScheduler cron (매일 04:00 KST) + 수동 호출 공용 진입점
- PIPELINE_HARD_CAP = 600초 hard cap 으로 cron stuck 절대 방지
- 단독 실행: `python -m workers.digest_worker`
"""
import asyncio
from core.utils import setup_logger
from services.digest.pipeline import run_digest_pipeline
logger = setup_logger("digest_worker")
PIPELINE_HARD_CAP = 600 # 10분 hard cap
async def run() -> None:
"""APScheduler + 수동 호출 공용 진입점.
pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout 은 summarizer 가 처리).
여기서는 전체 hard cap 만 강제.
"""
try:
result = await asyncio.wait_for(
run_digest_pipeline(),
timeout=PIPELINE_HARD_CAP,
)
logger.info(f"[global_digest] 워커 완료: {result}")
except asyncio.TimeoutError:
logger.error(
f"[global_digest] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. "
f"기존 digest 는 commit 시점에만 갱신되므로 그대로 유지됨. "
f"다음 cron 실행에서 재시도."
)
except Exception as e:
logger.exception(f"[global_digest] 워커 실패: {e}")
if __name__ == "__main__":
asyncio.run(run())

View File

@@ -6,8 +6,8 @@ ai:
models:
primary:
endpoint: "http://100.76.254.116:8800/v1/chat/completions"
model: "mlx-community/Qwen3.5-35B-A3B-4bit"
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
max_tokens: 4096
timeout: 60

View File

@@ -89,6 +89,7 @@ services:
- ./config.yaml:/app/config.yaml:ro
- ./scripts:/app/scripts:ro
- ./logs:/app/logs
- ./migrations:/app/migrations:ro
depends_on:
postgres:
condition: service_healthy

View File

@@ -6,6 +6,11 @@
let { onupload = () => {} } = $props();
// home-caddy `request_body max_size 100MB` (infra_inventory.md D8 / Cloudflare 섹션 참조).
// 100MB 초과 파일은 NAS PKM 폴더 직접 마운트 → file_watcher 5분 간격 자동 인덱싱 경로 사용.
const MAX_UPLOAD_BYTES = 100 * 1024 * 1024;
const NAS_FALLBACK_HINT = '대용량 파일은 NAS의 PKM 폴더에 직접 두면 file_watcher 가 5분 이내에 자동 인덱싱합니다.';
let dragging = $state(false);
let uploading = $state(false);
let uploadFiles = $state([]);
@@ -56,7 +61,24 @@
});
async function handleFiles(fileList) {
const files = Array.from(fileList || []);
const allFiles = Array.from(fileList || []);
if (allFiles.length === 0) return;
// 사전 크기 검사 — 100MB 초과는 즉시 차단 + NAS file_watcher 안내
const tooLarge = allFiles.filter(f => f.size > MAX_UPLOAD_BYTES);
const files = allFiles.filter(f => f.size <= MAX_UPLOAD_BYTES);
if (tooLarge.length > 0) {
const names = tooLarge
.map(f => `${f.name} (${(f.size / 1024 / 1024).toFixed(1)}MB)`)
.join(', ');
addToast(
'error',
`100MB 초과 파일은 업로드 불가 (${tooLarge.length}건: ${names}). ${NAS_FALLBACK_HINT}`,
10000
);
}
if (files.length === 0) return;
uploading = true;
@@ -78,6 +100,14 @@
} catch (err) {
uploadFiles[i].status = 'failed';
failed++;
// 서버 측 413 (사전 검사 통과했지만 인프라 한도에 걸린 경우)
if (err && err.status === 413) {
addToast(
'error',
`${files[i].name}: 서버 거절 (Payload Too Large). ${NAS_FALLBACK_HINT}`,
10000
);
}
}
uploadFiles = [...uploadFiles];
}

View File

@@ -59,9 +59,9 @@
async function loadInbox() {
loading = true;
try {
// TODO(backend): /documents/?review_status=pending 서버 필터 지원 시 page_size 축소
const data = await api('/documents/?page_size=200');
documents = (data.items || []).filter((d) => d.review_status === 'pending');
// 서버 필터 review_status=pending 적용 — page_size 100 이내 안전
const data = await api('/documents/?review_status=pending&page_size=100');
documents = data.items || [];
} catch (err) {
addToast('error', 'Inbox 로딩 실패');
} finally {

View File

@@ -0,0 +1,57 @@
-- Phase 4 Global News Digest
-- 7일 rolling window 뉴스를 country × topic 2-level로 묶어 매일 새벽 4시 KST 배치 생성
-- 검색 파이프라인 미사용. documents → clustering → cluster-level LLM summarization → digest
-- 사용자 결정: country→topic 2-level, cluster-level LLM only, drop 금지 fallback,
-- adaptive threshold, EMA centroid, time-decay (λ=ln(2)/3 ≈ 0.231)
-- 부모 테이블: 하루 단위 digest run 메타데이터
CREATE TABLE global_digests (
id BIGSERIAL PRIMARY KEY,
digest_date DATE NOT NULL, -- KST 기준 생성일
window_start TIMESTAMPTZ NOT NULL, -- rolling window 시작 (UTC)
window_end TIMESTAMPTZ NOT NULL, -- 생성 시점 (UTC)
decay_lambda DOUBLE PRECISION NOT NULL, -- 실제 사용된 time-decay λ
total_articles INTEGER NOT NULL DEFAULT 0,
total_countries INTEGER NOT NULL DEFAULT 0,
total_topics INTEGER NOT NULL DEFAULT 0,
generation_ms INTEGER, -- 워커 실행 시간 (성능 회귀 감지)
llm_calls INTEGER NOT NULL DEFAULT 0,
llm_failures INTEGER NOT NULL DEFAULT 0, -- = fallback 사용 횟수
status VARCHAR(20) NOT NULL DEFAULT 'success', -- success | partial | failed
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (digest_date) -- idempotency: 같은 날짜 재실행 시 DELETE+INSERT
);
CREATE INDEX idx_global_digests_date ON global_digests (digest_date DESC);
-- 자식 테이블: country × topic 단위
CREATE TABLE digest_topics (
id BIGSERIAL PRIMARY KEY,
digest_id BIGINT NOT NULL REFERENCES global_digests(id) ON DELETE CASCADE,
country VARCHAR(10) NOT NULL, -- KR | US | JP | CN | FR | DE | ...
topic_rank INTEGER NOT NULL, -- country 내 1..N (importance_score 내림차순)
topic_label TEXT NOT NULL, -- LLM 생성 5~10 단어 한국어 (또는 fallback 시 "주요 뉴스 묶음")
summary TEXT NOT NULL, -- LLM 생성 1~2 문장 factual (또는 fallback 시 top member ai_summary 첫 200자)
article_ids JSONB NOT NULL, -- [doc_id, ...] 코드가 주입 (LLM 생성 금지)
article_count INTEGER NOT NULL, -- = jsonb_array_length(article_ids)
importance_score DOUBLE PRECISION NOT NULL, -- batch 내 country별 0~1 normalized (cross-country 비교)
raw_weight_sum DOUBLE PRECISION NOT NULL, -- 정규화 전 decay 가중합 (디버그 + day-over-day 트렌드)
centroid_sample JSONB, -- 디버그: LLM 입력 doc id 목록 + summary hash
llm_model VARCHAR(100), -- 사용된 모델 (primary/fallback 추적)
llm_fallback_used BOOLEAN NOT NULL DEFAULT FALSE, -- LLM 실패 시 minimal fallback 적용 여부
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_digest_topics_digest ON digest_topics (digest_id);
CREATE INDEX idx_digest_topics_country ON digest_topics (country);
CREATE INDEX idx_digest_topics_rank ON digest_topics (digest_id, country, topic_rank);

125
reports/phase2_final.md Normal file
View File

@@ -0,0 +1,125 @@
# Phase 2 최종 측정 보고서
**측정일**: 2026-04-08
**대상**: Document Server 검색 v2, Phase 2.1~2.3 통합
**평가셋**: `tests/search_eval/queries.yaml` v0.1 (23 쿼리, 8 카테고리)
**인프라 기준**: `memory/infra_inventory.md` (2026-04-08 실측)
## A/B 결과
| metric | Phase 1.3 baseline (A) | Phase 2 final (B) | Δ |
|---|---|---|---|
| Recall@10 | 0.730 | **0.737** | +0.007 ✓ |
| MRR@10 | 0.795 | 0.797 | +0.002 |
| NDCG@10 | 0.663 | **0.668** | +0.005 ✓ |
| Top-3 hit | 0.900 | 0.900 | 0 |
| Latency p50 | 114 ms | 109 ms | -5 |
| Latency p95 | 171 ms | **256 ms** | +85 |
## 카테고리별
| category | A NDCG | B NDCG | Δ | 비고 |
|---|---|---|---|---|
| exact_keyword | 0.96 | 0.96 | 0 | 회귀 0 ✓ |
| natural_language_ko | 0.73 | 0.73 | 0 | 회귀 0 ✓ (narrowed multilingual 덕) |
| crosslingual_ko_en | 0.53 | 0.53 | 0 | bge-m3 한계 — multilingual 효과 0 |
| **news_crosslingual** | 0.27 | **0.37** | **+0.10** | 개선 ✓ |
| news_ko | 0.36 | 0.37 | +0.01 | 미세 |
| news_en | 0.00 | 0.00 | 0 | 여전히 0 |
| news_fr | 0.46 | 0.46 | 0 | |
| other_domain | 0.88 | 0.88 | 0 | |
## Phase 2 게이트 검증
| 게이트 | 목표 | 실제 | 상태 |
|---|---|---|---|
| Recall@10 | ≥ 0.78 | 0.737 | ❌ (-0.043) |
| Top-3 hit | ≥ 0.93 | 0.900 | ❌ (-0.030) |
| crosslingual_ko_en NDCG | ≥ 0.65 | 0.53 | ❌ (-0.12) |
| news_crosslingual NDCG | ≥ 0.30 | 0.37 | ✓ |
| latency p95 | < 400 ms | 256 ms | ✓ |
| 평가셋 v0.2 완료 | - | v0.1만 | ❌ (후속) |
**2/6 통과** — 목표 미달. 단 회귀 0 + 일부 영역 개선.
## Phase 2에서 실제로 달성한 것
### 1. 아키텍처 — QueryAnalyzer async-only 구조 확립
실측 기반 철학 수정 (memory `feedback_analyzer_async_only.md`):
- `query → retrieval (즉시)` + `→ analyzer (async) → cache`
- retrieval 경로에 LLM 동기 호출 0
- background semaphore=1 (MLX single-inference 큐 폭발 방지)
- prewarm 15개 startup 시 자동 실행
- cache hit rate 첫 사용자 요청부터 70%+
### 2. 실측 데이터 — MLX 한계
gemma-4-26b-a4b-it-8bit MLX:
- full prompt (prompt_tok=2406) → **10.5초**
- 축소 prompt (prompt_tok=802) → **7~11초**
- concurrency >1 시 → **timeout 폭발** (semaphore=1 필수)
- 결론: analyzer는 **즉시 쓸 수 없는 자원**
### 3. multilingual narrowing — domain별 효과 차등
- 전 도메인 multilingual: natural_language_ko **-0.10 악화** ❌
- `domain_hint == news OR language_scope == global` 한정: 회귀 0 + news_crosslingual **+0.10** ✓
- 룰: 한국어 법령 검색에 영어 번역 쿼리 섞으면 noise
### 4. soft_filter boost — 보수적 설정 필요
- 초기 0.03+0.02 → exact_keyword **-0.03 악화**
- 낮춰서 0.01 단일 domain only → 회귀 0
- 평가셋에 filter 쿼리가 없어 효과 직접 측정 불가 (v0.2 확장 후 재평가)
## Phase 2에서 달성하지 못한 것 + 이유
### Recall@10 / Top-3 hit 회복 (0.730 → 0.78+ 미달)
- baseline 대비 +0.007 미세 개선만
- 원인: **corpus 1022 docs로 noise 증가**. chunk 수 7129. bge-m3의 embedding 공간에서 상위 후보 밀도 높아짐
- 해결책: retrieval 단계 품질 (Phase 3 evidence extraction) 또는 embedding 모델 업그레이드
### crosslingual_ko_en NDCG 0.65+ 미달 (0.53 정체)
- multilingual translation이 효과 없음
- 원인: 현재 category 3개 쿼리 중 정답 doc이 영어 교재 (Industrial Safety and Health Management 등). bge-m3는 ko 쿼리로 이 영어 doc을 약 0.5~0.6 cosine으로 이미 찾음. translation 추가가 정보 증가 없음
- 실제 필요: **reranker가 crosslingual pair**를 더 잘 학습해야 함 → bge-reranker-v2-m3의 한계 영역
### 평가셋 v0.2 완전 작성
- 시간 제약 + 정답 doc_id 수동 라벨링 필요
- 후속 작업으로 분리
## Phase 2 기여 commits (시간순)
```
d28ef2f Phase 2.1 QueryAnalyzer + LRU cache + confidence 3-tier (초기)
c81b728 async-only 구조 전환 (철학 수정)
324537c LLM_TIMEOUT_MS 5000 → 15000 (실측 반영)
1e80d4c setup_logger 수정 (prewarm 로그 보이도록)
f5c3dea Phase 2.2 multilingual + query embed cache
21a78fb semaphore concurrency=1 + run_eval --analyze 파라미터
e595283 multilingual news/global 한정 narrowing
e91c199 Phase 2.3 soft_filter boost (초기)
01f144a soft_filter boost 약화 (0.01, doctype 제거)
```
## 다음 단계 선택지 (사용자 결정)
### A. Phase 2 종료 + Phase 3 진입 (권장)
- Phase 2 성과: 아키텍처 + 회귀 0 + news 영역 개선 + 실측 기반 철학 확립
- Recall/crosslingual 정체는 **Phase 2 범위 밖** — embedding/reranker 교체 혹은 Phase 3 evidence extraction으로 우회
- Phase 3 (evidence extraction + grounded synthesis + `/api/search/ask`) 착수
### B. Phase 2 iteration — embedding 실험
- bge-m3 → 다른 embedding (e.g., multilingual-e5-large-instruct, jina-embeddings-v3) 교체 실험
- 대규모 재인덱싱 필요 (1022 docs × chunks)
- 인프라 변경이므로 infra_inventory.md drift 발생
### C. Phase 2 iteration — 평가셋 v0.2 작성
- queries_v0.2.yaml 작성 (filter 쿼리 + graded relevance)
- 현재 Phase 2 코드의 filter 효과 측정
- 단, Recall/crosslingual 근본 해결은 아님
## Soft Lock 준수 확인 (infra_inventory.md)
-`config.yaml` 변경 없음 (GPU local override 그대로)
-`docker compose restart` 사용 안 함 (`up -d --build fastapi`만)
- ✓ Ollama 모델 pull/remove 없음 (bge-m3, exaone3.5 그대로)
- ✓ Reranker 모델 변경 없음 (TEI bge-reranker-v2-m3 그대로)
- ✓ Mac mini MLX 설정 변경 없음

View File

@@ -0,0 +1,24 @@
label,id,category,intent,domain_hint,query,relevant_ids,returned_ids_top10,latency_ms,recall_at_10,mrr_at_10,ndcg_at_10,top3_hit,error
single,kw_001,exact_keyword,fact_lookup,document,산업안전보건법 제6장,3856;3868;3879,3856;3851;3862;3853;3861;3868;3879;3873;3876;3871,95.5,1.000,1.000,0.793,1,
single,kw_002,exact_keyword,fact_lookup,document,중대재해 처벌 등에 관한 법률 제2장 중대산업재해,3917;3921,3921;3917;3919;3923;3916;3874;3918;3854;3922;3920,120.7,1.000,1.000,1.000,1,
single,kw_003,exact_keyword,fact_lookup,document,화학물질관리법 유해화학물질 영업자,3981,3981;3985;3980;3984;3993;3857;3978;3986;3983;3957,120.2,1.000,1.000,1.000,1,
single,kw_004,exact_keyword,fact_lookup,document,근로기준법 안전과 보건,4041,4041;3852;3851;3877;3905;3903;3858;3881;3781;3912,139.3,1.000,1.000,1.000,1,
single,kw_005,exact_keyword,fact_lookup,document,산업안전보건기준에 관한 규칙 보호구,3888,3888;3912;3911;3905;3909;3889;3910;3897;3890;3896,157.8,1.000,1.000,1.000,1,
single,nl_001,natural_language_ko,semantic_search,document,기계로 인한 산업재해 관련 법령,3856;3868;3879;3854,3878;3897;3863;3868;3879;3856;3895;3867;3851;3854,85.9,1.000,0.250,0.571,0,
single,nl_002,natural_language_ko,semantic_search,document,사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일,3855;3867;3878,3855;3917;3854;3867;3878;3863;3851;3908;3903;3895,82.1,1.000,1.000,0.853,1,
single,nl_003,natural_language_ko,semantic_search,document,유해화학물질을 다루는 회사가 지켜야 할 안전 의무,3980;3981;3982,3980;3903;3904;3905;3981;3985;3896;3917;3857;3909,117.0,0.667,1.000,0.651,1,
single,nl_004,natural_language_ko,semantic_search,document,중대재해가 발생했을 때 경영책임자가 처벌받는 기준,3916;3917;3920;3921,3917;3918;3916;3923;3919;3921;3854;3872;3877;3922,113.6,0.750,1.000,0.725,1,
single,nl_005,natural_language_ko,semantic_search,document,안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가,3853;3865,3853;4025;3876;3879;3859;3865;3781;3815;3818;3787,80.6,1.000,1.000,0.832,1,
single,cl_001,crosslingual_ko_en,semantic_search,document,기계 안전 가드 설계 원리,3770;3856,3770;4540;3817;4541;3774;3816;3787;3758;3793;3773,79.0,0.500,1.000,0.613,1,
single,cl_002,crosslingual_ko_en,semantic_search,document,산업 안전 입문서,3755;3775;3776;3777,3760;3755;3774;3764;3758;3775;3779;3802;3814;3817,107.6,0.500,0.500,0.385,1,
single,cl_003,crosslingual_ko_en,semantic_search,document,전기 안전 위험,3772;3790,3897;3772;3771;4018;3773;3790;3819;4020;3807;3755,125.6,1.000,0.500,0.605,1,
single,news_001,news_ko,semantic_search,news,이란과 미국의 군사 충돌,4303;4304;4307;4316;4322;4323;4327;4335,4317;4321;4771;4446;4743;4452;4307;4418;4331;4744,94.5,0.125,0.143,0.084,1,
single,news_002,news_ko,semantic_search,news,호르무즈 해협 봉쇄,4316;4320;4322;4327,4327;4346;4349;4762;4767;4759;4322;4320;4340;4304,94.3,0.750,1.000,0.644,0,
single,news_003,news_en,semantic_search,news,Trump Iran ultimatum,4258;4260;4262,4776;4515;4519;4658;4644;4763;4333;4762;4679;4321,118.7,0.000,0.000,0.000,1,
single,news_004,news_fr,semantic_search,news,guerre en Iran,4199;4202;4210;4361;4363;4507;4519;4521,4678;4507;4199;4688;4776;4363;4519;4668;4670;4672,119.6,0.500,0.500,0.460,1,
single,news_005,news_crosslingual,semantic_search,news,이란 미국 전쟁 글로벌 반응,4202;4258;4262;4536;4303;4304;4316,4262;4457;4765;4324;4345;4329;4452;4443;4761;4642,95.6,0.143,1.000,0.275,1,
single,misc_001,other_domain,fact_lookup,document,강체의 평면 운동학,4063;4065,4063;4065;4064;4067;4071;4068;4069;4062;4060;4066,172.3,1.000,1.000,1.000,1,
single,misc_002,other_domain,semantic_search,document,질점의 운동역학,4060;4061;4062,4062;4060;4070;4064;4068;4067;4065;4058;4071;4066,261.8,0.667,1.000,0.765,1,
single,fail_001,failure_expected,semantic_search,document,Rust async runtime tokio scheduler 내부 구조,,4815;4069;4546;4062;4547;3801;3787;3812;4542;3770,118.6,0.000,0.000,0.000,1,
single,fail_002,failure_expected,semantic_search,document,양자컴퓨터 큐비트 디코히어런스,,4058;4057;4067;3800;4065;4068;3817;4063;4064;3915,76.9,0.000,0.000,0.000,1,
single,fail_003,failure_expected,semantic_search,news,재즈 보컬리스트 빌리 홀리데이,,4634;4100;4815;4116;4281;4697;4205;4077;4235;4758,73.6,0.000,0.000,0.000,1,
1 label id category intent domain_hint query relevant_ids returned_ids_top10 latency_ms recall_at_10 mrr_at_10 ndcg_at_10 top3_hit error
2 single kw_001 exact_keyword fact_lookup document 산업안전보건법 제6장 3856;3868;3879 3856;3851;3862;3853;3861;3868;3879;3873;3876;3871 95.5 1.000 1.000 0.793 1
3 single kw_002 exact_keyword fact_lookup document 중대재해 처벌 등에 관한 법률 제2장 중대산업재해 3917;3921 3921;3917;3919;3923;3916;3874;3918;3854;3922;3920 120.7 1.000 1.000 1.000 1
4 single kw_003 exact_keyword fact_lookup document 화학물질관리법 유해화학물질 영업자 3981 3981;3985;3980;3984;3993;3857;3978;3986;3983;3957 120.2 1.000 1.000 1.000 1
5 single kw_004 exact_keyword fact_lookup document 근로기준법 안전과 보건 4041 4041;3852;3851;3877;3905;3903;3858;3881;3781;3912 139.3 1.000 1.000 1.000 1
6 single kw_005 exact_keyword fact_lookup document 산업안전보건기준에 관한 규칙 보호구 3888 3888;3912;3911;3905;3909;3889;3910;3897;3890;3896 157.8 1.000 1.000 1.000 1
7 single nl_001 natural_language_ko semantic_search document 기계로 인한 산업재해 관련 법령 3856;3868;3879;3854 3878;3897;3863;3868;3879;3856;3895;3867;3851;3854 85.9 1.000 0.250 0.571 0
8 single nl_002 natural_language_ko semantic_search document 사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일 3855;3867;3878 3855;3917;3854;3867;3878;3863;3851;3908;3903;3895 82.1 1.000 1.000 0.853 1
9 single nl_003 natural_language_ko semantic_search document 유해화학물질을 다루는 회사가 지켜야 할 안전 의무 3980;3981;3982 3980;3903;3904;3905;3981;3985;3896;3917;3857;3909 117.0 0.667 1.000 0.651 1
10 single nl_004 natural_language_ko semantic_search document 중대재해가 발생했을 때 경영책임자가 처벌받는 기준 3916;3917;3920;3921 3917;3918;3916;3923;3919;3921;3854;3872;3877;3922 113.6 0.750 1.000 0.725 1
11 single nl_005 natural_language_ko semantic_search document 안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가 3853;3865 3853;4025;3876;3879;3859;3865;3781;3815;3818;3787 80.6 1.000 1.000 0.832 1
12 single cl_001 crosslingual_ko_en semantic_search document 기계 안전 가드 설계 원리 3770;3856 3770;4540;3817;4541;3774;3816;3787;3758;3793;3773 79.0 0.500 1.000 0.613 1
13 single cl_002 crosslingual_ko_en semantic_search document 산업 안전 입문서 3755;3775;3776;3777 3760;3755;3774;3764;3758;3775;3779;3802;3814;3817 107.6 0.500 0.500 0.385 1
14 single cl_003 crosslingual_ko_en semantic_search document 전기 안전 위험 3772;3790 3897;3772;3771;4018;3773;3790;3819;4020;3807;3755 125.6 1.000 0.500 0.605 1
15 single news_001 news_ko semantic_search news 이란과 미국의 군사 충돌 4303;4304;4307;4316;4322;4323;4327;4335 4317;4321;4771;4446;4743;4452;4307;4418;4331;4744 94.5 0.125 0.143 0.084 1
16 single news_002 news_ko semantic_search news 호르무즈 해협 봉쇄 4316;4320;4322;4327 4327;4346;4349;4762;4767;4759;4322;4320;4340;4304 94.3 0.750 1.000 0.644 0
17 single news_003 news_en semantic_search news Trump Iran ultimatum 4258;4260;4262 4776;4515;4519;4658;4644;4763;4333;4762;4679;4321 118.7 0.000 0.000 0.000 1
18 single news_004 news_fr semantic_search news guerre en Iran 4199;4202;4210;4361;4363;4507;4519;4521 4678;4507;4199;4688;4776;4363;4519;4668;4670;4672 119.6 0.500 0.500 0.460 1
19 single news_005 news_crosslingual semantic_search news 이란 미국 전쟁 글로벌 반응 4202;4258;4262;4536;4303;4304;4316 4262;4457;4765;4324;4345;4329;4452;4443;4761;4642 95.6 0.143 1.000 0.275 1
20 single misc_001 other_domain fact_lookup document 강체의 평면 운동학 4063;4065 4063;4065;4064;4067;4071;4068;4069;4062;4060;4066 172.3 1.000 1.000 1.000 1
21 single misc_002 other_domain semantic_search document 질점의 운동역학 4060;4061;4062 4062;4060;4070;4064;4068;4067;4065;4058;4071;4066 261.8 0.667 1.000 0.765 1
22 single fail_001 failure_expected semantic_search document Rust async runtime tokio scheduler 내부 구조 4815;4069;4546;4062;4547;3801;3787;3812;4542;3770 118.6 0.000 0.000 0.000 1
23 single fail_002 failure_expected semantic_search document 양자컴퓨터 큐비트 디코히어런스 4058;4057;4067;3800;4065;4068;3817;4063;4064;3915 76.9 0.000 0.000 0.000 1
24 single fail_003 failure_expected semantic_search news 재즈 보컬리스트 빌리 홀리데이 4634;4100;4815;4116;4281;4697;4205;4077;4235;4758 73.6 0.000 0.000 0.000 1

View File

@@ -0,0 +1,24 @@
label,id,category,intent,domain_hint,query,relevant_ids,returned_ids_top10,latency_ms,recall_at_10,mrr_at_10,ndcg_at_10,top3_hit,error
single,kw_001,exact_keyword,fact_lookup,document,산업안전보건법 제6장,3856;3868;3879,3856;3851;3862;3853;3861;3868;3879;3873;3876;3871,67.9,1.000,1.000,0.793,1,
single,kw_002,exact_keyword,fact_lookup,document,중대재해 처벌 등에 관한 법률 제2장 중대산업재해,3917;3921,3921;3917;3919;3923;3916;3874;3918;3854;3922;3920,110.0,1.000,1.000,1.000,1,
single,kw_003,exact_keyword,fact_lookup,document,화학물질관리법 유해화학물질 영업자,3981,3981;3985;3980;3984;3993;3857;3978;3986;3983;3957,119.3,1.000,1.000,1.000,1,
single,kw_004,exact_keyword,fact_lookup,document,근로기준법 안전과 보건,4041,4041;3852;3851;3877;3905;3903;3858;3881;3781;3912,108.7,1.000,1.000,1.000,1,
single,kw_005,exact_keyword,fact_lookup,document,산업안전보건기준에 관한 규칙 보호구,3888,3888;3912;3911;3905;3909;3889;3910;3897;3890;3896,125.5,1.000,1.000,1.000,1,
single,nl_001,natural_language_ko,semantic_search,document,기계로 인한 산업재해 관련 법령,3856;3868;3879;3854,3878;3897;3863;3868;3856;3879;3895;3867;3851;3854,83.9,1.000,0.250,0.571,0,
single,nl_002,natural_language_ko,semantic_search,document,사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일,3855;3867;3878,3855;3917;3854;3867;3878;3863;3851;3908;3903;3895,118.0,1.000,1.000,0.853,1,
single,nl_003,natural_language_ko,semantic_search,document,유해화학물질을 다루는 회사가 지켜야 할 안전 의무,3980;3981;3982,3980;3903;3904;3905;3981;3985;3896;3917;3857;3909,82.8,0.667,1.000,0.651,1,
single,nl_004,natural_language_ko,semantic_search,document,중대재해가 발생했을 때 경영책임자가 처벌받는 기준,3916;3917;3920;3921,3917;3918;3916;3923;3919;3921;3854;3872;3877;3922,72.1,0.750,1.000,0.725,1,
single,nl_005,natural_language_ko,semantic_search,document,안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가,3853;3865,3853;4025;3876;3879;3859;3865;3781;3815;3818;3787,80.2,1.000,1.000,0.832,1,
single,cl_001,crosslingual_ko_en,semantic_search,document,기계 안전 가드 설계 원리,3770;3856,3770;4540;3817;4541;3774;3816;3787;3758;3793;3773,108.3,0.500,1.000,0.613,1,
single,cl_002,crosslingual_ko_en,semantic_search,document,산업 안전 입문서,3755;3775;3776;3777,3760;3755;3774;3764;3758;3775;3779;3802;3814;3817,79.5,0.500,0.500,0.385,1,
single,cl_003,crosslingual_ko_en,semantic_search,document,전기 안전 위험,3772;3790,3897;3772;3771;4018;3773;3790;3819;4020;3807;3755,103.7,1.000,0.500,0.605,1,
single,news_001,news_ko,semantic_search,news,이란과 미국의 군사 충돌,4303;4304;4307;4316;4322;4323;4327;4335,4317;4321;4771;4743;4307;4452;4761;4678;4418;4331,1445.8,0.125,0.200,0.098,1,
single,news_002,news_ko,semantic_search,news,호르무즈 해협 봉쇄,4316;4320;4322;4327,4327;4346;4349;4762;4767;4759;4322;4320;4340;4304,185.3,0.750,1.000,0.644,0,
single,news_003,news_en,semantic_search,news,Trump Iran ultimatum,4258;4260;4262,4776;4515;4519;4658;4644;4763;4333;4762;4679;4321,76.3,0.000,0.000,0.000,1,
single,news_004,news_fr,semantic_search,news,guerre en Iran,4199;4202;4210;4361;4363;4507;4519;4521,4678;4507;4199;4688;4776;4363;4519;4668;4670;4672,157.9,0.500,0.500,0.460,1,
single,news_005,news_crosslingual,semantic_search,news,이란 미국 전쟁 글로벌 반응,4202;4258;4262;4536;4303;4304;4316,4262;4457;4765;4324;4345;4329;4258;4452;4443;4761,186.7,0.286,1.000,0.367,1,
single,misc_001,other_domain,fact_lookup,document,강체의 평면 운동학,4063;4065,4063;4065;4064;4067;4071;4068;4069;4062;4060;4066,171.6,1.000,1.000,1.000,1,
single,misc_002,other_domain,semantic_search,document,질점의 운동역학,4060;4061;4062,4062;4060;4070;4064;4068;4067;4065;4058;4071;4066,263.6,0.667,1.000,0.765,1,
single,fail_001,failure_expected,semantic_search,document,Rust async runtime tokio scheduler 내부 구조,,4815;4069;4546;4062;4547;3801;3787;3812;4542;3770,121.9,0.000,0.000,0.000,1,
single,fail_002,failure_expected,semantic_search,document,양자컴퓨터 큐비트 디코히어런스,,4058;4057;4067;3800;4065;4068;3817;4063;4064;3915,75.2,0.000,0.000,0.000,1,
single,fail_003,failure_expected,semantic_search,news,재즈 보컬리스트 빌리 홀리데이,,4634;4100;4815;4116;4281;4697;4205;4077;4235;4289,73.9,0.000,0.000,0.000,1,
1 label id category intent domain_hint query relevant_ids returned_ids_top10 latency_ms recall_at_10 mrr_at_10 ndcg_at_10 top3_hit error
2 single kw_001 exact_keyword fact_lookup document 산업안전보건법 제6장 3856;3868;3879 3856;3851;3862;3853;3861;3868;3879;3873;3876;3871 67.9 1.000 1.000 0.793 1
3 single kw_002 exact_keyword fact_lookup document 중대재해 처벌 등에 관한 법률 제2장 중대산업재해 3917;3921 3921;3917;3919;3923;3916;3874;3918;3854;3922;3920 110.0 1.000 1.000 1.000 1
4 single kw_003 exact_keyword fact_lookup document 화학물질관리법 유해화학물질 영업자 3981 3981;3985;3980;3984;3993;3857;3978;3986;3983;3957 119.3 1.000 1.000 1.000 1
5 single kw_004 exact_keyword fact_lookup document 근로기준법 안전과 보건 4041 4041;3852;3851;3877;3905;3903;3858;3881;3781;3912 108.7 1.000 1.000 1.000 1
6 single kw_005 exact_keyword fact_lookup document 산업안전보건기준에 관한 규칙 보호구 3888 3888;3912;3911;3905;3909;3889;3910;3897;3890;3896 125.5 1.000 1.000 1.000 1
7 single nl_001 natural_language_ko semantic_search document 기계로 인한 산업재해 관련 법령 3856;3868;3879;3854 3878;3897;3863;3868;3856;3879;3895;3867;3851;3854 83.9 1.000 0.250 0.571 0
8 single nl_002 natural_language_ko semantic_search document 사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일 3855;3867;3878 3855;3917;3854;3867;3878;3863;3851;3908;3903;3895 118.0 1.000 1.000 0.853 1
9 single nl_003 natural_language_ko semantic_search document 유해화학물질을 다루는 회사가 지켜야 할 안전 의무 3980;3981;3982 3980;3903;3904;3905;3981;3985;3896;3917;3857;3909 82.8 0.667 1.000 0.651 1
10 single nl_004 natural_language_ko semantic_search document 중대재해가 발생했을 때 경영책임자가 처벌받는 기준 3916;3917;3920;3921 3917;3918;3916;3923;3919;3921;3854;3872;3877;3922 72.1 0.750 1.000 0.725 1
11 single nl_005 natural_language_ko semantic_search document 안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가 3853;3865 3853;4025;3876;3879;3859;3865;3781;3815;3818;3787 80.2 1.000 1.000 0.832 1
12 single cl_001 crosslingual_ko_en semantic_search document 기계 안전 가드 설계 원리 3770;3856 3770;4540;3817;4541;3774;3816;3787;3758;3793;3773 108.3 0.500 1.000 0.613 1
13 single cl_002 crosslingual_ko_en semantic_search document 산업 안전 입문서 3755;3775;3776;3777 3760;3755;3774;3764;3758;3775;3779;3802;3814;3817 79.5 0.500 0.500 0.385 1
14 single cl_003 crosslingual_ko_en semantic_search document 전기 안전 위험 3772;3790 3897;3772;3771;4018;3773;3790;3819;4020;3807;3755 103.7 1.000 0.500 0.605 1
15 single news_001 news_ko semantic_search news 이란과 미국의 군사 충돌 4303;4304;4307;4316;4322;4323;4327;4335 4317;4321;4771;4743;4307;4452;4761;4678;4418;4331 1445.8 0.125 0.200 0.098 1
16 single news_002 news_ko semantic_search news 호르무즈 해협 봉쇄 4316;4320;4322;4327 4327;4346;4349;4762;4767;4759;4322;4320;4340;4304 185.3 0.750 1.000 0.644 0
17 single news_003 news_en semantic_search news Trump Iran ultimatum 4258;4260;4262 4776;4515;4519;4658;4644;4763;4333;4762;4679;4321 76.3 0.000 0.000 0.000 1
18 single news_004 news_fr semantic_search news guerre en Iran 4199;4202;4210;4361;4363;4507;4519;4521 4678;4507;4199;4688;4776;4363;4519;4668;4670;4672 157.9 0.500 0.500 0.460 1
19 single news_005 news_crosslingual semantic_search news 이란 미국 전쟁 글로벌 반응 4202;4258;4262;4536;4303;4304;4316 4262;4457;4765;4324;4345;4329;4258;4452;4443;4761 186.7 0.286 1.000 0.367 1
20 single misc_001 other_domain fact_lookup document 강체의 평면 운동학 4063;4065 4063;4065;4064;4067;4071;4068;4069;4062;4060;4066 171.6 1.000 1.000 1.000 1
21 single misc_002 other_domain semantic_search document 질점의 운동역학 4060;4061;4062 4062;4060;4070;4064;4068;4067;4065;4058;4071;4066 263.6 0.667 1.000 0.765 1
22 single fail_001 failure_expected semantic_search document Rust async runtime tokio scheduler 내부 구조 4815;4069;4546;4062;4547;3801;3787;3812;4542;3770 121.9 0.000 0.000 0.000 1
23 single fail_002 failure_expected semantic_search document 양자컴퓨터 큐비트 디코히어런스 4058;4057;4067;3800;4065;4068;3817;4063;4064;3915 75.2 0.000 0.000 0.000 1
24 single fail_003 failure_expected semantic_search news 재즈 보컬리스트 빌리 홀리데이 4634;4100;4815;4116;4281;4697;4205;4077;4235;4289 73.9 0.000 0.000 0.000 1

View File

@@ -134,6 +134,7 @@ async def call_search(
limit: int = 20,
fusion: str | None = None,
rerank: str | None = None,
analyze: str | None = None,
) -> tuple[list[int], float]:
"""검색 API 호출 → (doc_ids, latency_ms)."""
url = f"{base_url.rstrip('/')}/api/search/"
@@ -143,6 +144,8 @@ async def call_search(
params["fusion"] = fusion
if rerank is not None:
params["rerank"] = rerank
if analyze is not None:
params["analyze"] = analyze
import time
@@ -169,6 +172,7 @@ async def evaluate(
mode: str = "hybrid",
fusion: str | None = None,
rerank: str | None = None,
analyze: str | None = None,
) -> list[QueryResult]:
"""전체 쿼리셋 평가."""
results: list[QueryResult] = []
@@ -177,7 +181,7 @@ async def evaluate(
for q in queries:
try:
returned_ids, latency_ms = await call_search(
client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank
client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank, analyze=analyze
)
results.append(
QueryResult(
@@ -415,6 +419,13 @@ def main() -> int:
choices=["true", "false"],
help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)",
)
parser.add_argument(
"--analyze",
type=str,
default=None,
choices=["true", "false"],
help="QueryAnalyzer 활성화 (Phase 2.1+, cache hit 시 multilingual 적용)",
)
parser.add_argument(
"--token",
type=str,
@@ -454,21 +465,21 @@ def main() -> int:
if args.base_url:
print(f"\n>>> evaluating: {args.base_url}")
results = asyncio.run(
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze)
)
print_summary("single", results)
all_results.extend(results)
else:
print(f"\n>>> baseline: {args.baseline_url}")
baseline_results = asyncio.run(
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze)
)
baseline_summary = print_summary("baseline", baseline_results)
print(f"\n>>> candidate: {args.candidate_url}")
candidate_results = asyncio.run(
evaluate(
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze
)
)
candidate_summary = print_summary("candidate", candidate_results)