Compare commits
23 Commits
feature/de
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
563f54d7d5 | ||
|
|
010e25cb23 | ||
|
|
bfdf33b442 | ||
|
|
4615fb4ce3 | ||
|
|
cdcbb07561 | ||
|
|
46ba9dd231 | ||
|
|
4468c2baba | ||
|
|
9bef049af6 | ||
|
|
dd9a0f600a | ||
|
|
d5f91556e6 | ||
|
|
75a1919342 | ||
|
|
64322e4f6f | ||
|
|
120db86d74 | ||
|
|
01f144ab25 | ||
|
|
e91c199537 | ||
|
|
e595283e27 | ||
|
|
21a78fbbf0 | ||
|
|
f5c3dea833 | ||
|
|
1e80d4c613 | ||
|
|
324537cbc8 | ||
|
|
c81b728ddf | ||
|
|
d28ef2fca0 | ||
|
|
de08735420 |
15
CLAUDE.md
15
CLAUDE.md
@@ -1,5 +1,20 @@
|
||||
# hyungi_Document_Server — Claude Code 작업 가이드
|
||||
|
||||
## Infrastructure Reference 📌
|
||||
|
||||
**Always refer to** `~/.claude/projects/-Users-hyungiahn/memory/infra_inventory.md` for:
|
||||
- AI model routing (primary / fallback / embedding / rerank / vision) — **the model names below may be stale**
|
||||
- Machine info, Tailscale IPs, SSH targets
|
||||
- Docker container topology and compose projects
|
||||
- Drift log (known Desired vs Actual inconsistencies)
|
||||
- Verify commands
|
||||
|
||||
**If this file and `infra_inventory.md` disagree, `infra_inventory.md` is authoritative.** Do not change `config.yaml` / `credentials.env` without first updating `infra_inventory.md`.
|
||||
|
||||
**Search experiment soft lock**: During Phase 2 work (search.py refactor, QueryAnalyzer, run_eval.py execution), do **not** run `docker compose restart`, change `config.yaml`, or pull Ollama models. Violating this invalidates the experiment baseline.
|
||||
|
||||
---
|
||||
|
||||
## 프로젝트 개요
|
||||
|
||||
Self-hosted PKM(Personal Knowledge Management) 웹 애플리케이션.
|
||||
|
||||
164
app/api/digest.py
Normal file
164
app/api/digest.py
Normal file
@@ -0,0 +1,164 @@
|
||||
"""Phase 4 Global Digest API — read-only + 디버그 regenerate.
|
||||
|
||||
엔드포인트:
|
||||
- GET /api/digest/latest : 가장 최근 digest
|
||||
- GET /api/digest?date=YYYY-MM-DD : 특정 날짜 digest
|
||||
- GET /api/digest?country=KR : 특정 국가만
|
||||
- POST /api/digest/regenerate : 백그라운드 digest 워커 트리거 (auth 필요)
|
||||
|
||||
응답은 country → topic 2-level 구조. country 가 비어있는 경우 응답에서 자동 생략.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from datetime import date as date_type
|
||||
from datetime import datetime
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from core.auth import get_current_user
|
||||
from core.database import get_session
|
||||
from models.digest import DigestTopic, GlobalDigest
|
||||
from models.user import User
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# ─── Pydantic 응답 모델 (schemas/ 디렉토리 미사용 → inline 정의) ───
|
||||
|
||||
|
||||
class TopicResponse(BaseModel):
|
||||
topic_rank: int
|
||||
topic_label: str
|
||||
summary: str
|
||||
article_ids: list[int]
|
||||
article_count: int
|
||||
importance_score: float
|
||||
raw_weight_sum: float
|
||||
llm_fallback_used: bool
|
||||
|
||||
|
||||
class CountryGroup(BaseModel):
|
||||
country: str
|
||||
topics: list[TopicResponse]
|
||||
|
||||
|
||||
class DigestResponse(BaseModel):
|
||||
digest_date: date_type
|
||||
window_start: datetime
|
||||
window_end: datetime
|
||||
decay_lambda: float
|
||||
total_articles: int
|
||||
total_countries: int
|
||||
total_topics: int
|
||||
generation_ms: int | None
|
||||
llm_calls: int
|
||||
llm_failures: int
|
||||
status: str
|
||||
countries: list[CountryGroup]
|
||||
|
||||
|
||||
# ─── helpers ───
|
||||
|
||||
|
||||
def _build_response(digest: GlobalDigest, country_filter: str | None = None) -> DigestResponse:
|
||||
"""ORM 객체 → DigestResponse. country_filter 가 주어지면 해당 국가만."""
|
||||
topics_by_country: dict[str, list[TopicResponse]] = {}
|
||||
for t in sorted(digest.topics, key=lambda x: (x.country, x.topic_rank)):
|
||||
if country_filter and t.country != country_filter:
|
||||
continue
|
||||
topics_by_country.setdefault(t.country, []).append(
|
||||
TopicResponse(
|
||||
topic_rank=t.topic_rank,
|
||||
topic_label=t.topic_label,
|
||||
summary=t.summary,
|
||||
article_ids=list(t.article_ids or []),
|
||||
article_count=t.article_count,
|
||||
importance_score=t.importance_score,
|
||||
raw_weight_sum=t.raw_weight_sum,
|
||||
llm_fallback_used=t.llm_fallback_used,
|
||||
)
|
||||
)
|
||||
|
||||
countries = [
|
||||
CountryGroup(country=c, topics=topics_by_country[c])
|
||||
for c in sorted(topics_by_country.keys())
|
||||
]
|
||||
|
||||
return DigestResponse(
|
||||
digest_date=digest.digest_date,
|
||||
window_start=digest.window_start,
|
||||
window_end=digest.window_end,
|
||||
decay_lambda=digest.decay_lambda,
|
||||
total_articles=digest.total_articles,
|
||||
total_countries=digest.total_countries,
|
||||
total_topics=digest.total_topics,
|
||||
generation_ms=digest.generation_ms,
|
||||
llm_calls=digest.llm_calls,
|
||||
llm_failures=digest.llm_failures,
|
||||
status=digest.status,
|
||||
countries=countries,
|
||||
)
|
||||
|
||||
|
||||
async def _load_digest(
|
||||
session: AsyncSession,
|
||||
target_date: date_type | None,
|
||||
) -> GlobalDigest | None:
|
||||
"""date 가 주어지면 해당 날짜, 아니면 최신 digest 1건."""
|
||||
query = select(GlobalDigest).options(selectinload(GlobalDigest.topics))
|
||||
if target_date is not None:
|
||||
query = query.where(GlobalDigest.digest_date == target_date)
|
||||
else:
|
||||
query = query.order_by(GlobalDigest.digest_date.desc())
|
||||
query = query.limit(1)
|
||||
result = await session.execute(query)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
|
||||
# ─── Routes ───
|
||||
|
||||
|
||||
@router.get("/latest", response_model=DigestResponse)
|
||||
async def get_latest(
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
):
|
||||
"""가장 최근 생성된 global digest."""
|
||||
digest = await _load_digest(session, target_date=None)
|
||||
if digest is None:
|
||||
raise HTTPException(status_code=404, detail="아직 생성된 digest 없음")
|
||||
return _build_response(digest)
|
||||
|
||||
|
||||
@router.get("", response_model=DigestResponse)
|
||||
async def get_digest(
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST)"),
|
||||
country: str | None = Query(default=None, description="국가 코드 (예: KR)"),
|
||||
):
|
||||
"""특정 날짜 또는 국가 필터링된 digest. date 미지정 시 최신."""
|
||||
digest = await _load_digest(session, target_date=date)
|
||||
if digest is None:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"digest 없음 (date={date})" if date else "아직 생성된 digest 없음",
|
||||
)
|
||||
country_filter = country.upper() if country else None
|
||||
return _build_response(digest, country_filter=country_filter)
|
||||
|
||||
|
||||
@router.post("/regenerate")
|
||||
async def regenerate(
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
):
|
||||
"""디버그용 수동 트리거 — 백그라운드 태스크로 워커 실행 (auth 필요)."""
|
||||
from workers.digest_worker import run
|
||||
|
||||
asyncio.create_task(run())
|
||||
return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}
|
||||
@@ -142,11 +142,12 @@ async def list_documents(
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
page: int = Query(1, ge=1),
|
||||
page_size: int = Query(20, ge=1, le=100),
|
||||
page_size: int = Query(20, ge=1, le=500),
|
||||
domain: str | None = None,
|
||||
sub_group: str | None = None,
|
||||
source: str | None = None,
|
||||
format: str | None = None,
|
||||
review_status: str | None = Query(None, description="pending | approved | rejected"),
|
||||
):
|
||||
"""문서 목록 조회 (페이지네이션 + 필터, 뉴스 제외)"""
|
||||
query = select(Document).where(Document.deleted_at == None, Document.source_channel != "news")
|
||||
@@ -158,6 +159,8 @@ async def list_documents(
|
||||
query = query.where(Document.source_channel == source)
|
||||
if format:
|
||||
query = query.where(Document.file_format == format)
|
||||
if review_status:
|
||||
query = query.where(Document.review_status == review_status)
|
||||
|
||||
# 전체 건수
|
||||
count_query = select(func.count()).select_from(query.subquery())
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint).
|
||||
"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후).
|
||||
|
||||
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리.
|
||||
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당.
|
||||
실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence)
|
||||
은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
|
||||
이 파일은 다음만 담당:
|
||||
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate
|
||||
/ Citation / AskResponse / AskDebug)
|
||||
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
|
||||
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import Annotated
|
||||
from typing import Annotated, Literal
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, Query
|
||||
from pydantic import BaseModel
|
||||
@@ -15,20 +20,11 @@ from core.auth import get_current_user
|
||||
from core.database import get_session
|
||||
from core.utils import setup_logger
|
||||
from models.user import User
|
||||
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
|
||||
from services.search.rerank_service import (
|
||||
MAX_CHUNKS_PER_DOC,
|
||||
MAX_RERANK_INPUT,
|
||||
apply_diversity,
|
||||
rerank_chunks,
|
||||
)
|
||||
from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
|
||||
from services.search_telemetry import (
|
||||
compute_confidence,
|
||||
compute_confidence_hybrid,
|
||||
compute_confidence_reranked,
|
||||
record_search_event,
|
||||
)
|
||||
from services.search.evidence_service import EvidenceItem, extract_evidence
|
||||
from services.search.fusion_service import DEFAULT_FUSION
|
||||
from services.search.search_pipeline import PipelineResult, run_search
|
||||
from services.search.synthesis_service import SynthesisResult, synthesize
|
||||
from services.search_telemetry import record_search_event
|
||||
|
||||
# logs/search.log + stdout 동시 출력 (Phase 0.4)
|
||||
logger = setup_logger("search")
|
||||
@@ -56,6 +52,10 @@ class SearchResult(BaseModel):
|
||||
chunk_id: int | None = None
|
||||
chunk_index: int | None = None
|
||||
section_title: str | None = None
|
||||
# Phase 3.1: reranker raw score 보존 (display score drift 방지).
|
||||
# rerank 경로를 탄 chunk에만 채워짐. normalize_display_scores는 이 필드를
|
||||
# 건드리지 않는다. Phase 3 evidence fast-path 판단에 사용.
|
||||
rerank_score: float | None = None
|
||||
|
||||
|
||||
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
|
||||
@@ -98,6 +98,29 @@ def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCan
|
||||
]
|
||||
|
||||
|
||||
def _build_search_debug(pr: PipelineResult) -> SearchDebug:
|
||||
"""PipelineResult → SearchDebug (기존 search()의 debug 구성 블록 복사)."""
|
||||
return SearchDebug(
|
||||
timing_ms=pr.timing_ms,
|
||||
text_candidates=(
|
||||
_to_debug_candidates(pr.text_results)
|
||||
if pr.text_results or pr.mode != "vector"
|
||||
else None
|
||||
),
|
||||
vector_candidates=(
|
||||
_to_debug_candidates(pr.vector_results)
|
||||
if pr.vector_results or pr.mode in ("vector", "hybrid")
|
||||
else None
|
||||
),
|
||||
fused_candidates=(
|
||||
_to_debug_candidates(pr.results) if pr.mode == "hybrid" else None
|
||||
),
|
||||
confidence=pr.confidence_signal,
|
||||
notes=pr.notes,
|
||||
query_analysis=pr.query_analysis,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/", response_model=SearchResponse)
|
||||
async def search(
|
||||
q: str,
|
||||
@@ -115,134 +138,301 @@ async def search(
|
||||
True,
|
||||
description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
|
||||
),
|
||||
analyze: bool = Query(
|
||||
False,
|
||||
description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
|
||||
),
|
||||
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
|
||||
):
|
||||
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)"""
|
||||
timing: dict[str, float] = {}
|
||||
notes: list[str] = []
|
||||
text_results: list[SearchResult] = []
|
||||
vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력)
|
||||
raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용)
|
||||
chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존
|
||||
|
||||
t_total = time.perf_counter()
|
||||
|
||||
if mode == "vector":
|
||||
t0 = time.perf_counter()
|
||||
raw_chunks = await search_vector(session, q, limit)
|
||||
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
|
||||
if not raw_chunks:
|
||||
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
|
||||
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
|
||||
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
||||
results = vector_results
|
||||
else:
|
||||
t0 = time.perf_counter()
|
||||
text_results = await search_text(session, q, limit)
|
||||
timing["text_ms"] = (time.perf_counter() - t0) * 1000
|
||||
|
||||
if mode == "hybrid":
|
||||
t1 = time.perf_counter()
|
||||
raw_chunks = await search_vector(session, q, limit)
|
||||
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
|
||||
|
||||
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
|
||||
t1b = time.perf_counter()
|
||||
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
||||
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
|
||||
|
||||
if not vector_results:
|
||||
notes.append("vector_search_returned_empty — text-only fallback")
|
||||
|
||||
t2 = time.perf_counter()
|
||||
strategy = get_strategy(fusion)
|
||||
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
|
||||
fusion_limit = max(limit * 5, 100) if rerank else limit
|
||||
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
|
||||
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
|
||||
notes.append(f"fusion={strategy.name}")
|
||||
notes.append(
|
||||
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
|
||||
f"unique_docs={len(chunks_by_doc)}"
|
||||
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)"""
|
||||
pr = await run_search(
|
||||
session,
|
||||
q,
|
||||
mode=mode, # type: ignore[arg-type]
|
||||
limit=limit,
|
||||
fusion=fusion,
|
||||
rerank=rerank,
|
||||
analyze=analyze,
|
||||
)
|
||||
|
||||
if rerank:
|
||||
# Phase 1.3: reranker — chunk 기준 입력
|
||||
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
|
||||
t3 = time.perf_counter()
|
||||
rerank_input: list[SearchResult] = []
|
||||
for doc in fused_docs:
|
||||
chunks = chunks_by_doc.get(doc.id, [])
|
||||
if chunks:
|
||||
# doc당 max 2 chunk (latency/VRAM 보호)
|
||||
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
|
||||
else:
|
||||
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
|
||||
rerank_input.append(doc)
|
||||
if len(rerank_input) >= MAX_RERANK_INPUT:
|
||||
break
|
||||
rerank_input = rerank_input[:MAX_RERANK_INPUT]
|
||||
notes.append(f"rerank input={len(rerank_input)}")
|
||||
|
||||
reranked = await rerank_chunks(q, rerank_input, limit * 3)
|
||||
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
|
||||
|
||||
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
|
||||
t4 = time.perf_counter()
|
||||
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
|
||||
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
|
||||
else:
|
||||
# rerank 비활성: fused_docs를 그대로 (limit 적용)
|
||||
results = fused_docs[:limit]
|
||||
else:
|
||||
results = text_results
|
||||
|
||||
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
|
||||
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
|
||||
normalize_display_scores(results)
|
||||
|
||||
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
|
||||
|
||||
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
|
||||
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
|
||||
if mode == "hybrid":
|
||||
if rerank and "rerank_ms" in timing:
|
||||
confidence_signal = compute_confidence_reranked(results)
|
||||
else:
|
||||
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
|
||||
elif mode == "vector":
|
||||
confidence_signal = compute_confidence(vector_results, "vector")
|
||||
else:
|
||||
confidence_signal = compute_confidence(text_results, mode)
|
||||
|
||||
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
|
||||
timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
|
||||
timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items())
|
||||
fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
|
||||
analyzer_str = (
|
||||
f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}"
|
||||
if analyze
|
||||
else ""
|
||||
)
|
||||
logger.info(
|
||||
"search query=%r mode=%s%s results=%d conf=%.2f %s",
|
||||
q[:80], mode, fusion_str, len(results), confidence_signal, timing_str,
|
||||
"search query=%r mode=%s%s%s results=%d conf=%.2f %s",
|
||||
q[:80],
|
||||
pr.mode,
|
||||
fusion_str,
|
||||
analyzer_str,
|
||||
len(pr.results),
|
||||
pr.confidence_signal,
|
||||
timing_str,
|
||||
)
|
||||
|
||||
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
|
||||
# Phase 2.1: analyze=true일 때만 analyzer_confidence 전달 (False는 None → 기존 호환)
|
||||
background_tasks.add_task(
|
||||
record_search_event, q, user.id, results, mode, confidence_signal
|
||||
record_search_event,
|
||||
q,
|
||||
user.id,
|
||||
pr.results,
|
||||
pr.mode,
|
||||
pr.confidence_signal,
|
||||
pr.analyzer_confidence if analyze else None,
|
||||
)
|
||||
|
||||
debug_obj: SearchDebug | None = None
|
||||
if debug:
|
||||
debug_obj = SearchDebug(
|
||||
timing_ms=timing,
|
||||
text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
|
||||
vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
|
||||
fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
|
||||
confidence=confidence_signal,
|
||||
notes=notes,
|
||||
)
|
||||
debug_obj = _build_search_debug(pr) if debug else None
|
||||
|
||||
return SearchResponse(
|
||||
results=results,
|
||||
total=len(results),
|
||||
results=pr.results,
|
||||
total=len(pr.results),
|
||||
query=q,
|
||||
mode=mode,
|
||||
mode=pr.mode,
|
||||
debug=debug_obj,
|
||||
)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════
|
||||
# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis
|
||||
# ═══════════════════════════════════════════════════════════
|
||||
|
||||
|
||||
class Citation(BaseModel):
|
||||
"""answer 본문의 [n] 에 해당하는 근거 단일 행."""
|
||||
|
||||
n: int
|
||||
chunk_id: int | None
|
||||
doc_id: int
|
||||
title: str | None
|
||||
section_title: str | None
|
||||
span_text: str # evidence LLM 이 추출한 50~300자
|
||||
full_snippet: str # 원본 800자 (citation 원문 보기 전용)
|
||||
relevance: float
|
||||
rerank_score: float
|
||||
|
||||
|
||||
class AskDebug(BaseModel):
|
||||
"""`/ask?debug=true` 응답 확장."""
|
||||
|
||||
timing_ms: dict[str, float]
|
||||
search_notes: list[str]
|
||||
query_analysis: dict | None = None
|
||||
confidence_signal: float
|
||||
evidence_candidate_count: int
|
||||
evidence_kept_count: int
|
||||
evidence_skip_reason: str | None
|
||||
synthesis_cache_hit: bool
|
||||
synthesis_prompt_preview: str | None = None
|
||||
synthesis_raw_preview: str | None = None
|
||||
hallucination_flags: list[str] = []
|
||||
|
||||
|
||||
class AskResponse(BaseModel):
|
||||
"""`/ask` 응답. `/search` 의 SearchResult 는 그대로 재사용."""
|
||||
|
||||
results: list[SearchResult]
|
||||
ai_answer: str | None
|
||||
citations: list[Citation]
|
||||
synthesis_status: Literal[
|
||||
"completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error"
|
||||
]
|
||||
synthesis_ms: float
|
||||
confidence: Literal["high", "medium", "low"] | None
|
||||
refused: bool
|
||||
no_results_reason: str | None
|
||||
query: str
|
||||
total: int
|
||||
debug: AskDebug | None = None
|
||||
|
||||
|
||||
def _map_no_results_reason(
|
||||
pr: PipelineResult,
|
||||
evidence: list[EvidenceItem],
|
||||
ev_skip: str | None,
|
||||
sr: SynthesisResult,
|
||||
) -> str | None:
|
||||
"""사용자에게 보여줄 한국어 메시지 매핑.
|
||||
|
||||
Failure mode 표 (plan §Failure Modes) 기반.
|
||||
"""
|
||||
# LLM 자가 refused → 모델이 준 사유 그대로
|
||||
if sr.refused and sr.refuse_reason:
|
||||
return sr.refuse_reason
|
||||
|
||||
# synthesis 상태 우선
|
||||
if sr.status == "no_evidence":
|
||||
if not pr.results:
|
||||
return "검색 결과가 없습니다."
|
||||
return "관련도 높은 근거를 찾지 못했습니다."
|
||||
if sr.status == "skipped":
|
||||
return "검색 결과가 없습니다."
|
||||
if sr.status == "timeout":
|
||||
return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요."
|
||||
if sr.status == "parse_failed":
|
||||
return "답변 형식 오류로 생략했습니다."
|
||||
if sr.status == "llm_error":
|
||||
return "AI 서버에 일시적 문제가 있습니다."
|
||||
|
||||
# evidence 단계 실패는 fallback 을 탔더라도 notes 용
|
||||
if ev_skip == "all_low_rerank":
|
||||
return "관련도 높은 근거를 찾지 못했습니다."
|
||||
if ev_skip == "empty_retrieval":
|
||||
return "검색 결과가 없습니다."
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _build_citations(
|
||||
evidence: list[EvidenceItem], used_citations: list[int]
|
||||
) -> list[Citation]:
|
||||
"""answer 본문에 실제로 등장한 n 만 Citation 으로 변환."""
|
||||
by_n = {e.n: e for e in evidence}
|
||||
out: list[Citation] = []
|
||||
for n in used_citations:
|
||||
e = by_n.get(n)
|
||||
if e is None:
|
||||
continue
|
||||
out.append(
|
||||
Citation(
|
||||
n=e.n,
|
||||
chunk_id=e.chunk_id,
|
||||
doc_id=e.doc_id,
|
||||
title=e.title,
|
||||
section_title=e.section_title,
|
||||
span_text=e.span_text,
|
||||
full_snippet=e.full_snippet,
|
||||
relevance=e.relevance,
|
||||
rerank_score=e.rerank_score,
|
||||
)
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
def _build_ask_debug(
|
||||
pr: PipelineResult,
|
||||
evidence: list[EvidenceItem],
|
||||
ev_skip: str | None,
|
||||
sr: SynthesisResult,
|
||||
ev_ms: float,
|
||||
synth_ms: float,
|
||||
total_ms: float,
|
||||
) -> AskDebug:
|
||||
timing: dict[str, float] = dict(pr.timing_ms)
|
||||
timing["evidence_ms"] = ev_ms
|
||||
timing["synthesis_ms"] = synth_ms
|
||||
timing["ask_total_ms"] = total_ms
|
||||
|
||||
# candidate count 는 rule filter 통과한 수 (recomputable from results)
|
||||
# 엄밀히는 evidence_service 내부 숫자인데, evidence 길이 ≈ kept, candidate
|
||||
# 는 관측이 어려움 → kept 는 evidence 길이, candidate 는 별도 필드 없음.
|
||||
# 단순화: candidate_count = len(evidence) 를 상한 근사로 둠 (debug 전용).
|
||||
return AskDebug(
|
||||
timing_ms=timing,
|
||||
search_notes=pr.notes,
|
||||
query_analysis=pr.query_analysis,
|
||||
confidence_signal=pr.confidence_signal,
|
||||
evidence_candidate_count=len(evidence),
|
||||
evidence_kept_count=len(evidence),
|
||||
evidence_skip_reason=ev_skip,
|
||||
synthesis_cache_hit=sr.cache_hit,
|
||||
synthesis_prompt_preview=None, # 현재 synthesis_service 에서 노출 안 함
|
||||
synthesis_raw_preview=sr.raw_preview,
|
||||
hallucination_flags=sr.hallucination_flags,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/ask", response_model=AskResponse)
|
||||
async def ask(
|
||||
q: str,
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
background_tasks: BackgroundTasks,
|
||||
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
|
||||
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
|
||||
):
|
||||
"""근거 기반 AI 답변 (Phase 3.3).
|
||||
|
||||
`/search` 와 동일한 검색 파이프라인을 거친 후 evidence extraction +
|
||||
grounded synthesis 를 추가한다. `mode`, `rerank`, `analyze` 는 품질 보장을
|
||||
위해 강제 고정 (hybrid / True / True).
|
||||
|
||||
실패 경로(timeout/parse_failed/refused/...) 에서도 `results` 는 항상 반환.
|
||||
"""
|
||||
t_total = time.perf_counter()
|
||||
|
||||
# 1. 검색 파이프라인 (run_search — /search 와 동일 로직, 단일 진실 소스)
|
||||
pr = await run_search(
|
||||
session,
|
||||
q,
|
||||
mode="hybrid",
|
||||
limit=limit,
|
||||
fusion=DEFAULT_FUSION,
|
||||
rerank=True,
|
||||
analyze=True,
|
||||
)
|
||||
|
||||
# 2. Evidence extraction (rule + LLM span select, 1 batched call)
|
||||
t_ev = time.perf_counter()
|
||||
evidence, ev_skip = await extract_evidence(q, pr.results)
|
||||
ev_ms = (time.perf_counter() - t_ev) * 1000
|
||||
|
||||
# 3. Grounded synthesis (gemma-4, 15s timeout, citation 검증)
|
||||
t_synth = time.perf_counter()
|
||||
sr = await synthesize(q, evidence, debug=debug)
|
||||
synth_ms = (time.perf_counter() - t_synth) * 1000
|
||||
|
||||
total_ms = (time.perf_counter() - t_total) * 1000
|
||||
|
||||
# 4. 응답 구성
|
||||
citations = _build_citations(evidence, sr.used_citations)
|
||||
no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
|
||||
|
||||
logger.info(
|
||||
"ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f",
|
||||
q[:80],
|
||||
len(pr.results),
|
||||
len(evidence),
|
||||
len(citations),
|
||||
sr.status,
|
||||
sr.confidence or "-",
|
||||
sr.refused,
|
||||
ev_ms,
|
||||
synth_ms,
|
||||
total_ms,
|
||||
)
|
||||
|
||||
# 5. telemetry — 기존 record_search_event 재사용 (Phase 0.3 호환)
|
||||
background_tasks.add_task(
|
||||
record_search_event,
|
||||
q,
|
||||
user.id,
|
||||
pr.results,
|
||||
"hybrid",
|
||||
pr.confidence_signal,
|
||||
pr.analyzer_confidence,
|
||||
)
|
||||
|
||||
debug_obj = (
|
||||
_build_ask_debug(pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms)
|
||||
if debug
|
||||
else None
|
||||
)
|
||||
|
||||
return AskResponse(
|
||||
results=pr.results,
|
||||
ai_answer=sr.answer,
|
||||
citations=citations,
|
||||
synthesis_status=sr.status,
|
||||
synthesis_ms=sr.elapsed_ms,
|
||||
confidence=sr.confidence,
|
||||
refused=sr.refused,
|
||||
no_results_reason=no_reason,
|
||||
query=q,
|
||||
total=len(pr.results),
|
||||
debug=debug_obj,
|
||||
)
|
||||
|
||||
@@ -95,7 +95,8 @@ async def _run_migrations(conn) -> None:
|
||||
applied = {row[0] for row in result}
|
||||
|
||||
# migration 파일 스캔
|
||||
migrations_dir = Path(__file__).resolve().parent.parent.parent / "migrations"
|
||||
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
|
||||
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
|
||||
if not migrations_dir.is_dir():
|
||||
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
|
||||
return
|
||||
@@ -114,7 +115,10 @@ async def _run_migrations(conn) -> None:
|
||||
sql = path.read_text(encoding="utf-8")
|
||||
_validate_sql_content(name, sql)
|
||||
logger.info(f"[migration] {name} 실행 중...")
|
||||
await conn.execute(text(sql))
|
||||
# raw driver SQL 사용 — text() 의 :name bind parameter 해석으로
|
||||
# SQL 주석/literal 에 콜론이 들어가면 InvalidRequestError 발생.
|
||||
# exec_driver_sql 은 SQL 을 driver(asyncpg) 에 그대로 전달.
|
||||
await conn.exec_driver_sql(sql)
|
||||
await conn.execute(
|
||||
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
|
||||
{"v": version, "n": name},
|
||||
|
||||
16
app/main.py
16
app/main.py
@@ -8,6 +8,7 @@ from sqlalchemy import func, select, text
|
||||
|
||||
from api.auth import router as auth_router
|
||||
from api.dashboard import router as dashboard_router
|
||||
from api.digest import router as digest_router
|
||||
from api.documents import router as documents_router
|
||||
from api.news import router as news_router
|
||||
from api.search import router as search_router
|
||||
@@ -20,9 +21,13 @@ from models.user import User
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""앱 시작/종료 시 실행되는 lifespan 핸들러"""
|
||||
import asyncio
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from services.search.query_analyzer import prewarm_analyzer
|
||||
from workers.daily_digest import run as daily_digest_run
|
||||
from workers.digest_worker import run as global_digest_run
|
||||
from workers.file_watcher import watch_inbox
|
||||
from workers.law_monitor import run as law_monitor_run
|
||||
from workers.mailplus_archive import run as mailplus_run
|
||||
@@ -51,9 +56,19 @@ async def lifespan(app: FastAPI):
|
||||
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
|
||||
scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
|
||||
scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
|
||||
scheduler.add_job(global_digest_run, CronTrigger(hour=4, minute=0), id="global_digest")
|
||||
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
|
||||
scheduler.start()
|
||||
|
||||
# Phase 2.1 (async 구조): QueryAnalyzer prewarm.
|
||||
# 대표 쿼리 15~20개를 background task로 분석해 cache 적재.
|
||||
# 첫 사용자 요청부터 cache hit rate 70~80% 목표.
|
||||
# 논블로킹 — startup을 막지 않음. MLX 부하 완화 위해 delay_between=0.5.
|
||||
prewarm_task = asyncio.create_task(prewarm_analyzer())
|
||||
prewarm_task.add_done_callback(
|
||||
lambda t: t.exception() and None # 예외는 query_analyzer 내부에서 로깅
|
||||
)
|
||||
|
||||
yield
|
||||
|
||||
# 종료: 스케줄러 → DB 순서로 정리
|
||||
@@ -76,6 +91,7 @@ app.include_router(search_router, prefix="/api/search", tags=["search"])
|
||||
|
||||
app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"])
|
||||
app.include_router(news_router, prefix="/api/news", tags=["news"])
|
||||
app.include_router(digest_router, prefix="/api/digest", tags=["digest"])
|
||||
|
||||
# TODO: Phase 5에서 추가
|
||||
# app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])
|
||||
|
||||
87
app/models/digest.py
Normal file
87
app/models/digest.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""global_digests + digest_topics 테이블 ORM (Phase 4)"""
|
||||
|
||||
from datetime import date, datetime
|
||||
|
||||
from sqlalchemy import (
|
||||
BigInteger,
|
||||
Boolean,
|
||||
Date,
|
||||
DateTime,
|
||||
Float,
|
||||
ForeignKey,
|
||||
Integer,
|
||||
String,
|
||||
Text,
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from core.database import Base
|
||||
|
||||
|
||||
class GlobalDigest(Base):
|
||||
"""하루 단위 digest run 메타데이터"""
|
||||
|
||||
__tablename__ = "global_digests"
|
||||
|
||||
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||
digest_date: Mapped[date] = mapped_column(Date, nullable=False, unique=True)
|
||||
window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||
window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||
decay_lambda: Mapped[float] = mapped_column(Float, nullable=False)
|
||||
|
||||
total_articles: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||
total_countries: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||
total_topics: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||
|
||||
generation_ms: Mapped[int | None] = mapped_column(Integer)
|
||||
llm_calls: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||
llm_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||
status: Mapped[str] = mapped_column(String(20), nullable=False, default="success")
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), nullable=False, default=datetime.now
|
||||
)
|
||||
|
||||
topics: Mapped[list["DigestTopic"]] = relationship(
|
||||
back_populates="digest",
|
||||
cascade="all, delete-orphan",
|
||||
order_by="DigestTopic.country, DigestTopic.topic_rank",
|
||||
)
|
||||
|
||||
|
||||
class DigestTopic(Base):
|
||||
"""country × topic 단위 cluster 결과"""
|
||||
|
||||
__tablename__ = "digest_topics"
|
||||
|
||||
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||
digest_id: Mapped[int] = mapped_column(
|
||||
BigInteger,
|
||||
ForeignKey("global_digests.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
|
||||
country: Mapped[str] = mapped_column(String(10), nullable=False)
|
||||
topic_rank: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
|
||||
topic_label: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
summary: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
|
||||
article_ids: Mapped[list] = mapped_column(JSONB, nullable=False)
|
||||
article_count: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
|
||||
importance_score: Mapped[float] = mapped_column(Float, nullable=False)
|
||||
raw_weight_sum: Mapped[float] = mapped_column(Float, nullable=False)
|
||||
|
||||
centroid_sample: Mapped[dict | None] = mapped_column(JSONB)
|
||||
llm_model: Mapped[str | None] = mapped_column(String(100))
|
||||
llm_fallback_used: Mapped[bool] = mapped_column(
|
||||
Boolean, nullable=False, default=False
|
||||
)
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), nullable=False, default=datetime.now
|
||||
)
|
||||
|
||||
digest: Mapped["GlobalDigest"] = relationship(back_populates="topics")
|
||||
19
app/prompts/digest_topic.txt
Normal file
19
app/prompts/digest_topic.txt
Normal file
@@ -0,0 +1,19 @@
|
||||
너는 팩트 기반 뉴스 토픽 요약 도우미다.
|
||||
아래는 같은 사건으로 군집된 기사들의 ai_summary다.
|
||||
이 정보만으로 다음을 JSON으로만 출력하라.
|
||||
|
||||
절대 금지:
|
||||
- 제공된 summary에 없는 사실 추가
|
||||
- 해석/비교/예측/의견
|
||||
- "보인다", "~할 것이다", "~할 전망" 같은 추측 표현
|
||||
- 인용부호 안 원문 외 단어 생성
|
||||
- JSON 외의 모든 텍스트 (설명, 마크다운, 코드블록 금지)
|
||||
|
||||
출력 형식 (JSON 객체 하나만 출력):
|
||||
{
|
||||
"topic_label": "5~10 단어의 한국어 제목",
|
||||
"summary": "1~2 문장, 사실만, 수동태 허용"
|
||||
}
|
||||
|
||||
기사 요약:
|
||||
{articles_block}
|
||||
76
app/prompts/evidence_extract.txt
Normal file
76
app/prompts/evidence_extract.txt
Normal file
@@ -0,0 +1,76 @@
|
||||
You are an evidence span extractor. Respond ONLY in JSON. No markdown, no explanation.
|
||||
|
||||
## Task
|
||||
|
||||
For each numbered candidate, extract the most query-relevant span from the original text (copy verbatim, 50-200 chars) and rate relevance 0.0~1.0. If the candidate does not directly answer the query, set span=null, relevance=0.0, skip_reason.
|
||||
|
||||
## Output Schema
|
||||
{
|
||||
"items": [
|
||||
{
|
||||
"n": 1,
|
||||
"span": "...",
|
||||
"relevance": 0.0,
|
||||
"skip_reason": null
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
## Rules
|
||||
- `n`: candidate 번호 (1-based, 입력 순서와 동일). **모든 n을 반환** (skip된 것도 포함).
|
||||
- `span`: 원문에서 **그대로 복사한** 50~200자. 요약/변형 금지. 원문에 없는 단어는 절대 포함하지 말 것. 여러 문장이어도 무방.
|
||||
- 관련 span이 없으면 `span: null`, `relevance: 0.0`, `skip_reason`에 한 줄 사유.
|
||||
- `relevance`: 0.0~1.0 float
|
||||
- 0.9+ query에 직접 답함
|
||||
- 0.7~0.9 강한 연관
|
||||
- 0.5~0.7 부분 연관
|
||||
- <0.5 약한/무관 (fallback에서 탈락)
|
||||
- `skip_reason`: span=null 일 때만 필수. 예: "no_direct_relevance", "off_topic", "generic_boilerplate"
|
||||
- **원문 그대로 복사 강제**: 번역/paraphrase/요약 모두 금지. evidence span은 citation 원문이 되어야 한다.
|
||||
|
||||
## Example 1 (hit)
|
||||
query: `산업안전보건법 제6장 주요 내용`
|
||||
candidates:
|
||||
[1] title: 산업안전보건법 해설 / text: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다...
|
||||
[2] title: 회사 복지 규정 / text: 직원의 연차휴가 사용 규정과 경조사 지원 내용을 담고 있다...
|
||||
|
||||
→
|
||||
{
|
||||
"items": [
|
||||
{
|
||||
"n": 1,
|
||||
"span": "제6장은 \"안전보건관리체제\"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다",
|
||||
"relevance": 0.95,
|
||||
"skip_reason": null
|
||||
},
|
||||
{
|
||||
"n": 2,
|
||||
"span": null,
|
||||
"relevance": 0.0,
|
||||
"skip_reason": "off_topic"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
## Example 2 (partial)
|
||||
query: `Python async best practice`
|
||||
candidates:
|
||||
[1] title: FastAPI tutorial / text: FastAPI supports both async and sync endpoints. For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor...
|
||||
|
||||
→
|
||||
{
|
||||
"items": [
|
||||
{
|
||||
"n": 1,
|
||||
"span": "For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor",
|
||||
"relevance": 0.82,
|
||||
"skip_reason": null
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
## Query
|
||||
{query}
|
||||
|
||||
## Candidates
|
||||
{numbered_candidates}
|
||||
53
app/prompts/query_analyze.txt
Normal file
53
app/prompts/query_analyze.txt
Normal file
@@ -0,0 +1,53 @@
|
||||
You are a search query analyzer. Respond ONLY in JSON. No markdown, no explanation.
|
||||
|
||||
## Output Schema
|
||||
{
|
||||
"intent": "fact_lookup | semantic_search | filter_browse",
|
||||
"query_type": "natural_language | keyword | phrase",
|
||||
"domain_hint": "document | news | mixed",
|
||||
"language_scope": "limited | global",
|
||||
"keywords": [],
|
||||
"must_terms": [],
|
||||
"optional_terms": [],
|
||||
"hard_filters": {},
|
||||
"soft_filters": {"domain": [], "document_type": []},
|
||||
"normalized_queries": [{"lang": "ko", "text": "...", "weight": 1.0}],
|
||||
"expanded_terms": [],
|
||||
"synonyms": {},
|
||||
"analyzer_confidence": 0.0
|
||||
}
|
||||
|
||||
## Rules
|
||||
- `intent`: fact_lookup (사실/조항/이름), semantic_search (주제/개념), filter_browse (필터 중심)
|
||||
- `query_type`: natural_language (문장형), keyword (단어 나열), phrase (따옴표/고유명사/법조항)
|
||||
- `domain_hint`: document (소유 문서/법령/매뉴얼), news (시사/뉴스), mixed (불명)
|
||||
- `language_scope`: limited (ko+en), global (다국어 필요)
|
||||
- `hard_filters`: 쿼리에 **명시된** 것만. 추론 금지. 키: file_format, year, country
|
||||
- `soft_filters.domain`: Industrial_Safety, Programming, Engineering, Philosophy, Language, General. 2-level 허용(e.g. Industrial_Safety/Legislation)
|
||||
- `soft_filters.document_type`: Law_Document, Manual, Report, Academic_Paper, Standard, Specification, Meeting_Minutes, Checklist, Note, Memo, Reference, Drawing, Template
|
||||
- `normalized_queries`: 원문 언어 1.0 가중치 필수. 교차언어 1개 추가 권장(ko↔en, weight 0.8). news + global 인 경우만 ja/zh 추가(weight 0.5~0.6). **최대 3개**.
|
||||
- `analyzer_confidence`: 0.9+ 명확, 0.7~0.9 대체로 명확, 0.5~0.7 모호, <0.5 분석 불가
|
||||
|
||||
## Example
|
||||
query: `기계 사고 관련 법령`
|
||||
{
|
||||
"intent": "semantic_search",
|
||||
"query_type": "natural_language",
|
||||
"domain_hint": "document",
|
||||
"language_scope": "limited",
|
||||
"keywords": ["기계", "사고", "법령"],
|
||||
"must_terms": [],
|
||||
"optional_terms": ["안전", "규정"],
|
||||
"hard_filters": {},
|
||||
"soft_filters": {"domain": ["Industrial_Safety/Legislation"], "document_type": ["Law_Document"]},
|
||||
"normalized_queries": [
|
||||
{"lang": "ko", "text": "기계 사고 관련 법령", "weight": 1.0},
|
||||
{"lang": "en", "text": "machinery accident related laws", "weight": 0.8}
|
||||
],
|
||||
"expanded_terms": ["산업안전", "기계안전"],
|
||||
"synonyms": {},
|
||||
"analyzer_confidence": 0.88
|
||||
}
|
||||
|
||||
## Query
|
||||
{query}
|
||||
80
app/prompts/search_synthesis.txt
Normal file
80
app/prompts/search_synthesis.txt
Normal file
@@ -0,0 +1,80 @@
|
||||
You are a grounded answer synthesizer. Respond ONLY in JSON. No markdown, no explanation.
|
||||
|
||||
## Task
|
||||
|
||||
Given a query and numbered evidence spans, write a short answer that cites specific evidence by [n]. **You may only use facts that appear in the evidence.** If the evidence does not directly answer the query, set `refused: true`.
|
||||
|
||||
## Output Schema
|
||||
{
|
||||
"answer": "...",
|
||||
"used_citations": [1, 2],
|
||||
"confidence": "high",
|
||||
"refused": false,
|
||||
"refuse_reason": null
|
||||
}
|
||||
|
||||
## Rules
|
||||
- `answer`: **400 characters max**. Must contain inline `[n]` citations. Every claim sentence ends with at least one `[n]`. Multiple sources: `[1][3]`. **Only use facts present in evidence. No outside knowledge, no guessing, no paraphrasing what is not there.**
|
||||
- `used_citations`: integer list of `n` values that actually appear in `answer` (for cross-check). Must be sorted ascending, no duplicates.
|
||||
- `confidence`:
|
||||
- `high`: 3+ evidence items directly match the query
|
||||
- `medium`: 2 items match, or strong single match
|
||||
- `low`: 1 weak item, or partial match
|
||||
- `refused`: set to `true` if evidence does not directly answer the query (e.g. off-topic, too generic, missing key facts). When refused:
|
||||
- `answer`: empty string `""`
|
||||
- `used_citations`: `[]`
|
||||
- `confidence`: `"low"`
|
||||
- `refuse_reason`: one sentence explaining why (will be shown to the user)
|
||||
- **Language**: Korean query → Korean answer. English query → English answer. Match query language.
|
||||
- **Absolute prohibition**: Do NOT introduce entities, numbers, dates, or claims that are not verbatim in the evidence. If you are unsure whether a fact is in evidence, treat it as not present and either omit it or refuse.
|
||||
|
||||
## Example 1 (happy path, high confidence)
|
||||
query: `산업안전보건법 제6장 주요 내용`
|
||||
evidence:
|
||||
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
|
||||
[2] 시행령 해설: 제6장은 제15조부터 제19조까지로 구성되며 안전보건관리책임자의 업무 범위를 세부 규정한다
|
||||
[3] 법령 체계도: 안전보건관리책임자 선임은 상시근로자 50명 이상 사업장에 적용된다
|
||||
|
||||
→
|
||||
{
|
||||
"answer": "산업안전보건법 제6장은 안전보건관리체제에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정을 규정한다[1]. 제15조부터 제19조까지 구성되며 관리책임자의 업무 범위를 세부 규정한다[2]. 상시근로자 50명 이상 사업장에 적용된다[3].",
|
||||
"used_citations": [1, 2, 3],
|
||||
"confidence": "high",
|
||||
"refused": false,
|
||||
"refuse_reason": null
|
||||
}
|
||||
|
||||
## Example 2 (partial, medium confidence)
|
||||
query: `Python async best practice`
|
||||
evidence:
|
||||
[1] FastAPI tutorial: For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor
|
||||
|
||||
→
|
||||
{
|
||||
"answer": "For I/O-bound operations, use async def with await for database and HTTP calls, and avoid blocking calls inside async functions (use run_in_executor instead) [1].",
|
||||
"used_citations": [1],
|
||||
"confidence": "low",
|
||||
"refused": false,
|
||||
"refuse_reason": null
|
||||
}
|
||||
|
||||
## Example 3 (refused — evidence does not answer query)
|
||||
query: `회사 연차 휴가 사용 규정`
|
||||
evidence:
|
||||
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
|
||||
[2] 회사 복지 안내: 직원 경조사 지원 내용 포함
|
||||
|
||||
→
|
||||
{
|
||||
"answer": "",
|
||||
"used_citations": [],
|
||||
"confidence": "low",
|
||||
"refused": true,
|
||||
"refuse_reason": "연차 휴가 사용 규정에 대한 직접적인 근거가 evidence에 없습니다."
|
||||
}
|
||||
|
||||
## Query
|
||||
{query}
|
||||
|
||||
## Evidence
|
||||
{numbered_evidence}
|
||||
1
app/services/digest/__init__.py
Normal file
1
app/services/digest/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Phase 4 Global Digest 서비스 레이어 — 7일 뉴스 batch clustering + summarization."""
|
||||
118
app/services/digest/clustering.py
Normal file
118
app/services/digest/clustering.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering.
|
||||
|
||||
플랜의 핵심 결정:
|
||||
- λ = ln(2)/3 (3일 반감기)
|
||||
- threshold: 0.75 / 0.78 / 0.80 (밀도 기반 adaptive)
|
||||
- centroid: EMA α=0.7 (단순 평균의 seed bias / drift 방어)
|
||||
- min_articles_per_topic = 3, max_topics_per_country = 10
|
||||
- importance_score: country 내 0~1 normalize + max(score, 0.01) floor
|
||||
- raw_weight_sum 별도 보존 (cross-day 트렌드 분석용)
|
||||
"""
|
||||
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import numpy as np
|
||||
|
||||
from core.utils import setup_logger
|
||||
|
||||
logger = setup_logger("digest_clustering")
|
||||
|
||||
LAMBDA = math.log(2) / 3 # 3일 반감기 — 사용자 확정값
|
||||
CENTROID_ALPHA = 0.7 # EMA: 기존 중심 70% 유지, 새 멤버 30% 반영
|
||||
MIN_ARTICLES_PER_TOPIC = 3
|
||||
MAX_TOPICS_PER_COUNTRY = 10
|
||||
SCORE_FLOOR = 0.01 # UI 0 표시 문제 사전 차단
|
||||
|
||||
|
||||
def adaptive_threshold(n_docs: int) -> float:
|
||||
"""문서 밀도 기반 동적 threshold — fragmentation/blob 동시 방어."""
|
||||
if n_docs > 200:
|
||||
return 0.80
|
||||
if n_docs < 50:
|
||||
return 0.75
|
||||
return 0.78
|
||||
|
||||
|
||||
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||
norm = float(np.linalg.norm(v))
|
||||
if norm == 0.0:
|
||||
return v
|
||||
return v / norm
|
||||
|
||||
|
||||
def _decay_weight(now: datetime, created_at: datetime) -> float:
|
||||
"""exp(-λ * days_ago). created_at 이 naive 면 UTC 가정."""
|
||||
if created_at.tzinfo is None:
|
||||
created_at = created_at.replace(tzinfo=timezone.utc)
|
||||
days = (now - created_at).total_seconds() / 86400.0
|
||||
if days < 0:
|
||||
days = 0.0
|
||||
return math.exp(-LAMBDA * days)
|
||||
|
||||
|
||||
def cluster_country(country: str, docs: list[dict]) -> list[dict]:
|
||||
"""단일 country 의 docs 를 cluster 로 묶어 정렬 + normalize 후 반환.
|
||||
|
||||
Args:
|
||||
country: 국가 코드 (KR, US, ...)
|
||||
docs: loader.load_news_window 의 출력 (단일 country 슬라이스)
|
||||
|
||||
Returns:
|
||||
[{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
|
||||
- members 는 weight 가 채워진 doc dict 리스트
|
||||
- 정렬: importance_score 내림차순, 최대 MAX_TOPICS_PER_COUNTRY 개
|
||||
"""
|
||||
if not docs:
|
||||
logger.info(f"[{country}] docs=0 → skip")
|
||||
return []
|
||||
|
||||
threshold = adaptive_threshold(len(docs))
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# time-decay weight 계산 + 가중치 높은 순으로 seed 우선
|
||||
for d in docs:
|
||||
d["weight"] = _decay_weight(now, d["created_at"])
|
||||
docs.sort(key=lambda d: -d["weight"])
|
||||
|
||||
clusters: list[dict] = []
|
||||
for d in docs:
|
||||
v = _normalize(d["embedding"])
|
||||
best_idx, best_sim = -1, 0.0
|
||||
for i, c in enumerate(clusters):
|
||||
sim = float(np.dot(c["centroid"], v))
|
||||
if sim > best_sim and sim >= threshold:
|
||||
best_sim, best_idx = sim, i
|
||||
if best_idx >= 0:
|
||||
c = clusters[best_idx]
|
||||
# EMA centroid update — drift 방지
|
||||
c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v
|
||||
c["centroid"] = _normalize(c["centroid"])
|
||||
c["members"].append(d)
|
||||
c["weight_sum"] += d["weight"]
|
||||
else:
|
||||
clusters.append({
|
||||
"centroid": v,
|
||||
"members": [d],
|
||||
"weight_sum": d["weight"],
|
||||
})
|
||||
|
||||
raw_count = len(clusters)
|
||||
clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC]
|
||||
dropped = raw_count - len(clusters)
|
||||
clusters.sort(key=lambda c: -c["weight_sum"])
|
||||
clusters = clusters[:MAX_TOPICS_PER_COUNTRY]
|
||||
|
||||
# country 내 normalize (0~1) + floor
|
||||
if clusters:
|
||||
max_w = max(c["weight_sum"] for c in clusters)
|
||||
for c in clusters:
|
||||
normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
|
||||
c["raw_weight_sum"] = c["weight_sum"]
|
||||
c["importance_score"] = max(normalized, SCORE_FLOOR)
|
||||
|
||||
logger.info(
|
||||
f"[{country}] docs={len(docs)} threshold={threshold} "
|
||||
f"raw_clusters={raw_count} dropped={dropped} kept={len(clusters)}"
|
||||
)
|
||||
return clusters
|
||||
149
app/services/digest/loader.py
Normal file
149
app/services/digest/loader.py
Normal file
@@ -0,0 +1,149 @@
|
||||
"""뉴스 7일 window 로드 + country 정규화
|
||||
|
||||
- documents 테이블엔 country 컬럼이 없으므로 document_chunks.country 를 first non-null 로 조인.
|
||||
- chunk-level country 도 NULL 이면 news_sources.name prefix(ai_sub_group) 매칭으로 fallback.
|
||||
- 그래도 NULL 이면 drop(로그 경고).
|
||||
- ai_summary / embedding 이 NULL 이면 처음부터 제외 (재요약/재임베딩 0회 원칙).
|
||||
"""
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
from sqlalchemy import text
|
||||
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
|
||||
logger = setup_logger("digest_loader")
|
||||
|
||||
|
||||
_NEWS_WINDOW_SQL = text("""
|
||||
SELECT
|
||||
d.id,
|
||||
d.title,
|
||||
d.ai_summary,
|
||||
d.embedding,
|
||||
d.created_at,
|
||||
d.edit_url,
|
||||
d.ai_sub_group,
|
||||
(
|
||||
SELECT c.country
|
||||
FROM document_chunks c
|
||||
WHERE c.doc_id = d.id AND c.country IS NOT NULL
|
||||
LIMIT 1
|
||||
) AS chunk_country
|
||||
FROM documents d
|
||||
WHERE d.source_channel = 'news'
|
||||
AND d.deleted_at IS NULL
|
||||
AND d.created_at >= :window_start
|
||||
AND d.created_at < :window_end
|
||||
AND d.embedding IS NOT NULL
|
||||
AND d.ai_summary IS NOT NULL
|
||||
""")
|
||||
|
||||
|
||||
_SOURCE_COUNTRY_SQL = text("""
|
||||
SELECT name, country FROM news_sources WHERE country IS NOT NULL
|
||||
""")
|
||||
|
||||
|
||||
def _to_numpy_embedding(raw: Any) -> np.ndarray | None:
|
||||
"""pgvector 컬럼을 numpy array(float32)로 정규화.
|
||||
|
||||
raw SQL + asyncpg 조합에서 pgvector type 이 등록 안 되어 있으면
|
||||
embedding 이 '[0.1,0.2,...]' 같은 string 으로 반환된다. ORM 을 안 쓰므로
|
||||
이 경우 직접 파싱해야 한다.
|
||||
"""
|
||||
if raw is None:
|
||||
return None
|
||||
if isinstance(raw, str):
|
||||
import json
|
||||
try:
|
||||
raw = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
try:
|
||||
arr = np.asarray(raw, dtype=np.float32)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
if arr.size == 0:
|
||||
return None
|
||||
return arr
|
||||
|
||||
|
||||
async def _load_source_country_map(session) -> dict[str, str]:
|
||||
"""news_sources name → country 매핑.
|
||||
|
||||
name 은 '경향신문 문화' 형태이고 documents.ai_sub_group 은 '경향신문' (split[0]).
|
||||
prefix 매칭이 가능하도록 첫 토큰 → country 로 인덱싱.
|
||||
"""
|
||||
rows = await session.execute(_SOURCE_COUNTRY_SQL)
|
||||
mapping: dict[str, str] = {}
|
||||
for name, country in rows:
|
||||
if not name or not country:
|
||||
continue
|
||||
prefix = name.split(" ")[0].strip()
|
||||
if prefix and prefix not in mapping:
|
||||
mapping[prefix] = country
|
||||
return mapping
|
||||
|
||||
|
||||
async def load_news_window(
|
||||
window_start: datetime,
|
||||
window_end: datetime,
|
||||
) -> dict[str, list[dict]]:
|
||||
"""주어진 윈도우 안의 뉴스 documents 를 country 별 dict 로 반환.
|
||||
|
||||
Returns:
|
||||
{"KR": [doc_dict, ...], "US": [...], ...}
|
||||
"""
|
||||
docs_by_country: dict[str, list[dict]] = defaultdict(list)
|
||||
null_country_count = 0
|
||||
total = 0
|
||||
|
||||
async with async_session() as session:
|
||||
source_country = await _load_source_country_map(session)
|
||||
|
||||
result = await session.execute(
|
||||
_NEWS_WINDOW_SQL,
|
||||
{"window_start": window_start, "window_end": window_end},
|
||||
)
|
||||
for row in result.mappings():
|
||||
embedding = _to_numpy_embedding(row["embedding"])
|
||||
if embedding is None:
|
||||
continue
|
||||
|
||||
country = row["chunk_country"]
|
||||
if not country:
|
||||
# news_sources prefix fallback
|
||||
ai_sub_group = (row["ai_sub_group"] or "").strip()
|
||||
if ai_sub_group:
|
||||
country = source_country.get(ai_sub_group)
|
||||
if not country:
|
||||
null_country_count += 1
|
||||
continue
|
||||
|
||||
country = country.upper()
|
||||
docs_by_country[country].append({
|
||||
"id": int(row["id"]),
|
||||
"title": row["title"] or "",
|
||||
"ai_summary": row["ai_summary"] or "",
|
||||
"embedding": embedding,
|
||||
"created_at": row["created_at"],
|
||||
"edit_url": row["edit_url"] or "",
|
||||
"ai_sub_group": row["ai_sub_group"] or "",
|
||||
})
|
||||
total += 1
|
||||
|
||||
if null_country_count:
|
||||
logger.warning(
|
||||
f"[loader] country 분류 실패로 drop된 문서 {null_country_count}건 "
|
||||
f"(chunk_country + news_sources fallback 모두 실패)"
|
||||
)
|
||||
logger.info(
|
||||
f"[loader] window {window_start.date()} ~ {window_end.date()} → "
|
||||
f"{total}건 ({len(docs_by_country)}개 국가)"
|
||||
)
|
||||
return dict(docs_by_country)
|
||||
177
app/services/digest/pipeline.py
Normal file
177
app/services/digest/pipeline.py
Normal file
@@ -0,0 +1,177 @@
|
||||
"""Phase 4 digest pipeline orchestration.
|
||||
|
||||
Step:
|
||||
1. AIClient 생성
|
||||
2. 7일 window 로 documents 로드 (loader)
|
||||
3. country 별 cluster_country (clustering)
|
||||
4. cluster 별 select_for_llm (selection)
|
||||
5. cluster 별 summarize_cluster_with_fallback (summarizer, LLM)
|
||||
6. DELETE+INSERT 단일 트랜잭션 (idempotent)
|
||||
7. start/end 로그 + generation_ms + fallback 비율 health metric
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from sqlalchemy import delete
|
||||
|
||||
from ai.client import AIClient
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.digest import DigestTopic, GlobalDigest
|
||||
|
||||
from .clustering import LAMBDA, cluster_country
|
||||
from .loader import load_news_window
|
||||
from .selection import select_for_llm
|
||||
from .summarizer import summarize_cluster_with_fallback
|
||||
|
||||
logger = setup_logger("digest_pipeline")
|
||||
|
||||
WINDOW_DAYS = 7
|
||||
KST = ZoneInfo("Asia/Seoul")
|
||||
|
||||
|
||||
def _kst_today() -> datetime:
|
||||
return datetime.now(KST).date()
|
||||
|
||||
|
||||
def _summary_hash(text: str) -> str:
|
||||
return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
|
||||
def _build_topic_row(
|
||||
country: str,
|
||||
rank: int,
|
||||
cluster: dict,
|
||||
selected: list[dict],
|
||||
llm_result: dict,
|
||||
primary_model: str,
|
||||
) -> DigestTopic:
|
||||
"""LLM 결과 + cluster 메타 → DigestTopic ORM 인스턴스.
|
||||
|
||||
article_ids 는 코드가 cluster.members 에서 직접 주입 (LLM 생성 금지 → id 위조 불가).
|
||||
"""
|
||||
article_ids = [int(m["id"]) for m in cluster["members"]]
|
||||
centroid_sample = {
|
||||
"selected_doc_ids": [int(m["id"]) for m in selected],
|
||||
"summary_hashes": [_summary_hash(m.get("ai_summary") or "") for m in selected],
|
||||
}
|
||||
return DigestTopic(
|
||||
country=country,
|
||||
topic_rank=rank,
|
||||
topic_label=llm_result["topic_label"],
|
||||
summary=llm_result["summary"],
|
||||
article_ids=article_ids,
|
||||
article_count=len(article_ids),
|
||||
importance_score=float(cluster["importance_score"]),
|
||||
raw_weight_sum=float(cluster["raw_weight_sum"]),
|
||||
centroid_sample=centroid_sample,
|
||||
llm_model=primary_model,
|
||||
llm_fallback_used=bool(llm_result["llm_fallback_used"]),
|
||||
)
|
||||
|
||||
|
||||
async def run_digest_pipeline() -> dict:
|
||||
"""전체 파이프라인 실행. worker entry 에서 호출.
|
||||
|
||||
Returns:
|
||||
실행 통계 dict {llm_calls, fallback_used, total_topics, generation_ms}
|
||||
"""
|
||||
start = time.time()
|
||||
|
||||
window_end = datetime.now(timezone.utc)
|
||||
window_start = window_end - timedelta(days=WINDOW_DAYS)
|
||||
digest_date = _kst_today()
|
||||
|
||||
logger.info(
|
||||
f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
|
||||
f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
|
||||
)
|
||||
|
||||
docs_by_country = await load_news_window(window_start, window_end)
|
||||
if not docs_by_country:
|
||||
logger.warning("[global_digest] 7일 window에 뉴스 0건 — digest 생성 스킵")
|
||||
return {
|
||||
"llm_calls": 0,
|
||||
"fallback_used": 0,
|
||||
"total_topics": 0,
|
||||
"generation_ms": int((time.time() - start) * 1000),
|
||||
}
|
||||
|
||||
client = AIClient()
|
||||
primary_model = client.ai.primary.model
|
||||
|
||||
all_topic_rows: list[DigestTopic] = []
|
||||
stats = {"llm_calls": 0, "fallback_used": 0}
|
||||
|
||||
try:
|
||||
for country, docs in docs_by_country.items():
|
||||
clusters = cluster_country(country, docs)
|
||||
if not clusters:
|
||||
continue # sparse country 자동 제외
|
||||
|
||||
for rank, cluster in enumerate(clusters, start=1):
|
||||
selected = select_for_llm(cluster)
|
||||
stats["llm_calls"] += 1
|
||||
llm_result = await summarize_cluster_with_fallback(client, cluster, selected)
|
||||
if llm_result["llm_fallback_used"]:
|
||||
stats["fallback_used"] += 1
|
||||
all_topic_rows.append(
|
||||
_build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
|
||||
)
|
||||
finally:
|
||||
await client.close()
|
||||
|
||||
generation_ms = int((time.time() - start) * 1000)
|
||||
total_articles = sum(len(d) for d in docs_by_country.values())
|
||||
countries_with_topics = len({r.country for r in all_topic_rows})
|
||||
|
||||
if stats["fallback_used"] == 0:
|
||||
status = "success"
|
||||
elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
|
||||
status = "failed"
|
||||
else:
|
||||
status = "partial"
|
||||
|
||||
async with async_session() as session:
|
||||
# idempotent: 같은 날짜 row 가 있으면 CASCADE 로 topics 까지 삭제
|
||||
await session.execute(
|
||||
delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
|
||||
)
|
||||
new_digest = GlobalDigest(
|
||||
digest_date=digest_date,
|
||||
window_start=window_start,
|
||||
window_end=window_end,
|
||||
decay_lambda=LAMBDA,
|
||||
total_articles=total_articles,
|
||||
total_countries=countries_with_topics,
|
||||
total_topics=len(all_topic_rows),
|
||||
generation_ms=generation_ms,
|
||||
llm_calls=stats["llm_calls"],
|
||||
llm_failures=stats["fallback_used"],
|
||||
status=status,
|
||||
)
|
||||
new_digest.topics = all_topic_rows
|
||||
session.add(new_digest)
|
||||
await session.commit()
|
||||
|
||||
fallback_pct = (
|
||||
(stats["fallback_used"] / stats["llm_calls"] * 100.0)
|
||||
if stats["llm_calls"] else 0.0
|
||||
)
|
||||
logger.info(
|
||||
f"[global_digest] done countries={countries_with_topics} "
|
||||
f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
|
||||
f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
|
||||
f"status={status} elapsed={generation_ms / 1000:.1f}s"
|
||||
)
|
||||
|
||||
return {
|
||||
"llm_calls": stats["llm_calls"],
|
||||
"fallback_used": stats["fallback_used"],
|
||||
"total_topics": len(all_topic_rows),
|
||||
"generation_ms": generation_ms,
|
||||
"status": status,
|
||||
}
|
||||
62
app/services/digest/selection.py
Normal file
62
app/services/digest/selection.py
Normal file
@@ -0,0 +1,62 @@
|
||||
"""Cluster 내 LLM 입력 선정 — top-k + MMR diversity + ai_summary truncate.
|
||||
|
||||
순수 top-relevance 는 동일 사건 중복 요약문에 편향되므로 MMR 로 다양성 확보.
|
||||
ai_summary 길이는 LLM 토큰 보호를 위해 SUMMARY_TRUNCATE 로 제한.
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
|
||||
K_PER_CLUSTER = 5
|
||||
LAMBDA_MMR = 0.7 # relevance 70% / diversity 30%
|
||||
SUMMARY_TRUNCATE = 300 # long tail ai_summary 방어
|
||||
|
||||
|
||||
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||
norm = float(np.linalg.norm(v))
|
||||
if norm == 0.0:
|
||||
return v
|
||||
return v / norm
|
||||
|
||||
|
||||
def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
|
||||
"""cluster 내 LLM 호출용 대표 article 들 선정.
|
||||
|
||||
Args:
|
||||
cluster: clustering.cluster_country 결과 단일 cluster
|
||||
k: 선정 개수 (기본 5)
|
||||
|
||||
Returns:
|
||||
선정된 doc dict 리스트. 각 항목에 ai_summary_truncated 필드가 추가됨.
|
||||
"""
|
||||
members = cluster["members"]
|
||||
if len(members) <= k:
|
||||
selected = list(members)
|
||||
else:
|
||||
centroid = cluster["centroid"]
|
||||
# relevance = centroid 유사도 × decay weight
|
||||
for m in members:
|
||||
v = _normalize(m["embedding"])
|
||||
m["_rel"] = float(np.dot(centroid, v)) * m["weight"]
|
||||
|
||||
first = max(members, key=lambda x: x["_rel"])
|
||||
selected = [first]
|
||||
candidates = [m for m in members if m is not first]
|
||||
|
||||
while len(selected) < k and candidates:
|
||||
def mmr_score(c: dict) -> float:
|
||||
v = _normalize(c["embedding"])
|
||||
max_sim = max(
|
||||
float(np.dot(v, _normalize(s["embedding"])))
|
||||
for s in selected
|
||||
)
|
||||
return LAMBDA_MMR * c["_rel"] - (1.0 - LAMBDA_MMR) * max_sim
|
||||
|
||||
pick = max(candidates, key=mmr_score)
|
||||
selected.append(pick)
|
||||
candidates.remove(pick)
|
||||
|
||||
# LLM 입력 토큰 보호
|
||||
for m in selected:
|
||||
m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:SUMMARY_TRUNCATE]
|
||||
|
||||
return selected
|
||||
123
app/services/digest/summarizer.py
Normal file
123
app/services/digest/summarizer.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""Cluster-level LLM 호출 + JSON 파싱 + timeout + drop금지 fallback.
|
||||
|
||||
핵심 결정:
|
||||
- AIClient._call_chat 직접 호출 (client.py 수정 회피, fallback 로직 재사용)
|
||||
- Semaphore(1) 로 MLX 과부하 회피
|
||||
- Per-call timeout 25초 (asyncio.wait_for) — MLX hang/Ollama stall 방어
|
||||
- JSON 파싱 실패 → 1회 재시도 → 그래도 실패 시 minimal fallback (drop 금지)
|
||||
- fallback: topic_label="주요 뉴스 묶음", summary = top member ai_summary[:200]
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from ai.client import parse_json_response
|
||||
from core.utils import setup_logger
|
||||
|
||||
logger = setup_logger("digest_summarizer")
|
||||
|
||||
LLM_CALL_TIMEOUT = 25 # 초. MLX 평균 5초 + tail latency 마진
|
||||
FALLBACK_SUMMARY_LIMIT = 200
|
||||
|
||||
_llm_sem = asyncio.Semaphore(1)
|
||||
|
||||
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "digest_topic.txt"
|
||||
_PROMPT_TEMPLATE: str | None = None
|
||||
|
||||
|
||||
def _load_prompt() -> str:
|
||||
global _PROMPT_TEMPLATE
|
||||
if _PROMPT_TEMPLATE is None:
|
||||
_PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8")
|
||||
return _PROMPT_TEMPLATE
|
||||
|
||||
|
||||
def build_prompt(selected: list[dict]) -> str:
|
||||
"""digest_topic.txt 템플릿에 selected article들의 ai_summary_truncated 주입.
|
||||
|
||||
템플릿 placeholder: {articles_block}
|
||||
"""
|
||||
template = _load_prompt()
|
||||
lines = []
|
||||
for i, m in enumerate(selected, start=1):
|
||||
text = (m.get("ai_summary_truncated") or m.get("ai_summary") or m.get("title") or "").strip()
|
||||
lines.append(f"[{i}] {text}")
|
||||
articles_block = "\n".join(lines)
|
||||
return template.replace("{articles_block}", articles_block)
|
||||
|
||||
|
||||
async def _try_call_llm(client: Any, prompt: str) -> str:
|
||||
"""Semaphore + per-call timeout 으로 감싼 단일 호출."""
|
||||
async with _llm_sem:
|
||||
return await asyncio.wait_for(
|
||||
client._call_chat(client.ai.primary, prompt),
|
||||
timeout=LLM_CALL_TIMEOUT,
|
||||
)
|
||||
|
||||
|
||||
def _make_fallback(cluster: dict) -> dict:
|
||||
"""cluster 의 top member 데이터로 minimal fallback 생성 — 정보 손실 회피."""
|
||||
members = cluster["members"]
|
||||
if not members:
|
||||
return {
|
||||
"topic_label": "주요 뉴스 묶음",
|
||||
"summary": "",
|
||||
"llm_fallback_used": True,
|
||||
}
|
||||
top = max(members, key=lambda m: m.get("_rel", m.get("weight", 0.0)))
|
||||
text = (top.get("ai_summary") or top.get("title") or "").strip()
|
||||
return {
|
||||
"topic_label": "주요 뉴스 묶음",
|
||||
"summary": text[:FALLBACK_SUMMARY_LIMIT],
|
||||
"llm_fallback_used": True,
|
||||
}
|
||||
|
||||
|
||||
async def summarize_cluster_with_fallback(
|
||||
client: Any,
|
||||
cluster: dict,
|
||||
selected: list[dict],
|
||||
) -> dict:
|
||||
"""cluster 1개에 대해 LLM 호출 + JSON 파싱 + fallback.
|
||||
|
||||
Returns:
|
||||
{topic_label, summary, llm_fallback_used}
|
||||
"""
|
||||
prompt = build_prompt(selected)
|
||||
|
||||
for attempt in range(2): # 1회 재시도 포함
|
||||
try:
|
||||
raw = await _try_call_llm(client, prompt)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
f"LLM 호출 timeout {LLM_CALL_TIMEOUT}s "
|
||||
f"(attempt={attempt + 1}, cluster size={len(cluster['members'])})"
|
||||
)
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"LLM 호출 실패 attempt={attempt + 1} "
|
||||
f"(cluster size={len(cluster['members'])}): {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
parsed = parse_json_response(raw)
|
||||
if (
|
||||
parsed
|
||||
and isinstance(parsed.get("topic_label"), str)
|
||||
and isinstance(parsed.get("summary"), str)
|
||||
and parsed["topic_label"].strip()
|
||||
and parsed["summary"].strip()
|
||||
):
|
||||
return {
|
||||
"topic_label": parsed["topic_label"].strip(),
|
||||
"summary": parsed["summary"].strip(),
|
||||
"llm_fallback_used": False,
|
||||
}
|
||||
logger.warning(
|
||||
f"JSON 파싱 실패 attempt={attempt + 1} "
|
||||
f"(cluster size={len(cluster['members'])}, raw_len={len(raw) if raw else 0})"
|
||||
)
|
||||
|
||||
return _make_fallback(cluster)
|
||||
@@ -1,5 +1,407 @@
|
||||
"""Evidence extraction 서비스 (Phase 3).
|
||||
"""Evidence extraction 서비스 (Phase 3.2).
|
||||
|
||||
reranked chunks에서 query-relevant span을 rule + LLM hybrid로 추출.
|
||||
구현은 Phase 3에서 채움.
|
||||
reranker 결과 chunks 에서 query-relevant span 을 구조적으로 추출한다.
|
||||
|
||||
## 설계 (EV-A: Rule + LLM span select)
|
||||
|
||||
```
|
||||
reranked results
|
||||
↓
|
||||
[rule filter] score >= 0.25, max_per_doc=2, top MAX_EVIDENCE_CANDIDATES
|
||||
↓
|
||||
[snippet 재윈도우] _extract_window(full, query, 800) — LLM 입력용
|
||||
↓
|
||||
[1 batched LLM call] gemma-4 via get_mlx_gate() (single inference)
|
||||
↓
|
||||
[post-process]
|
||||
- relevance >= 0.5 필터
|
||||
- span too-short (< 80자) → _extract_window(full, query, 120) 로 재확장
|
||||
- span too-long (> 300자) → cut
|
||||
- doc-group ordering (검색 결과 doc 순서 유지, doc 내부만 relevance desc)
|
||||
- n 재부여 (1..N)
|
||||
↓
|
||||
EvidenceItem 리스트
|
||||
```
|
||||
|
||||
## 영구 룰
|
||||
|
||||
- **LLM 호출은 1번만** (batched). 순차 호출 절대 금지 — MLX single-inference
|
||||
큐가 폭발한다.
|
||||
- **모든 MLX 호출은 `get_mlx_gate()` 경유**. analyzer / synthesis 와 동일
|
||||
semaphore 공유.
|
||||
- **fallback span 도 query 중심 window**. `full_snippet[:200]` 같은 "앞에서부터
|
||||
자르기" 절대 금지. 조용한 품질 붕괴 (citation 은 멀쩡한데 실제 span 이 query
|
||||
와 무관) 대표 사례.
|
||||
- **Span too-short 보정 필수**: `len(span) < 80` 이면 자동 확장. "짧을수록
|
||||
정확" 이 아니라 **짧으면 위험** — synthesis LLM 이 문맥 부족으로 이어 만들기
|
||||
(soft hallucination) 를 한다.
|
||||
- **Evidence ordering 은 doc-group 유지**. 전역 relevance desc 정렬 금지.
|
||||
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
|
||||
|
||||
## 확장 여지 (지금은 비활성)
|
||||
|
||||
`EVIDENCE_FAST_PATH_THRESHOLD` 가 `None` 이 아니고 `results[0].rerank_score >=
|
||||
THRESHOLD` 이면 LLM 호출 스킵 후 rule-only 경로로 즉시 반환. Activation 조건:
|
||||
(1) evidence LLM 호출 비율 > 80%, (2) /ask 평균 latency > 15s, (3) rerank
|
||||
top1 p50 > 0.75. 셋 다 충족해야 켠다.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from ai.client import AIClient, _load_prompt, parse_json_response
|
||||
from core.utils import setup_logger
|
||||
|
||||
from .llm_gate import get_mlx_gate
|
||||
from .rerank_service import _extract_window
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from api.search import SearchResult
|
||||
|
||||
logger = setup_logger("evidence")
|
||||
|
||||
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
|
||||
EVIDENCE_MIN_RERANK = 0.25 # 1차 rule cut — rerank score 이 미만은 제외
|
||||
MAX_EVIDENCE_CANDIDATES = 6 # LLM 입력 상한
|
||||
MAX_PER_DOC = 2
|
||||
CANDIDATE_SNIPPET_CHARS = 800 # LLM 이 볼 원문 창 크기
|
||||
|
||||
MIN_RELEVANCE_KEEP = 0.5 # LLM 출력 필터
|
||||
SPAN_MIN_CHARS = 80 # 이 미만이면 window enlarge
|
||||
SPAN_ENLARGE_TARGET = 120 # enlarge 시 재윈도우 target_chars
|
||||
SPAN_MAX_CHARS = 300 # 이 초과면 cut (synthesis token budget 보호)
|
||||
|
||||
LLM_TIMEOUT_MS = 15000
|
||||
PROMPT_VERSION = "v1"
|
||||
|
||||
# 확장 여지 — None 이면 비활성 (baseline). 실측 후 0.8 등으로 켠다.
|
||||
EVIDENCE_FAST_PATH_THRESHOLD: float | None = None
|
||||
|
||||
|
||||
# ─── 반환 타입 ───────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class EvidenceItem:
|
||||
"""LLM 또는 rule fallback 이 추출한 단일 evidence span.
|
||||
|
||||
n 은 doc-group ordering + relevance 정렬 후 1부터 재부여된다.
|
||||
`full_snippet` 은 **synthesis 프롬프트에 절대 포함 금지** — debug / citation
|
||||
원문 보기 전용.
|
||||
"""
|
||||
|
||||
n: int # 1-based, synthesis 프롬프트의 [n] 과 매핑
|
||||
chunk_id: int | None
|
||||
doc_id: int
|
||||
title: str | None
|
||||
section_title: str | None
|
||||
span_text: str # LLM 추출 (또는 rule fallback) span, 80~300자
|
||||
relevance: float # LLM 0~1 (fallback 시 rerank_score 복사)
|
||||
rerank_score: float # raw reranker 점수
|
||||
full_snippet: str # 원본 800자 (debug/citation 전용, synthesis 금지)
|
||||
|
||||
|
||||
# ─── 프롬프트 로딩 (module 초기화 1회) ───────────────────
|
||||
try:
|
||||
EVIDENCE_PROMPT = _load_prompt("evidence_extract.txt")
|
||||
except FileNotFoundError:
|
||||
EVIDENCE_PROMPT = ""
|
||||
logger.warning(
|
||||
"evidence_extract.txt not found — evidence_service will always use rule-only fallback"
|
||||
)
|
||||
|
||||
|
||||
# ─── Helper: candidates → LLM 입력 블록 ──────────────────
|
||||
|
||||
|
||||
def _build_numbered_candidates(
|
||||
candidates: list["SearchResult"], query: str
|
||||
) -> tuple[str, list[str]]:
|
||||
"""LLM 프롬프트의 {numbered_candidates} 블록 + 재윈도우된 full_snippet 리스트.
|
||||
|
||||
Returns:
|
||||
(block_str, full_snippets) — full_snippets[i] 는 1-based n=i+1 의 원문
|
||||
"""
|
||||
lines: list[str] = []
|
||||
full_snippets: list[str] = []
|
||||
for i, c in enumerate(candidates, 1):
|
||||
title = (c.title or "").strip()
|
||||
raw_text = c.snippet or ""
|
||||
full = _extract_window(raw_text, query, target_chars=CANDIDATE_SNIPPET_CHARS)
|
||||
full_snippets.append(full)
|
||||
lines.append(f"[{i}] title: {title} / text: {full}")
|
||||
return "\n".join(lines), full_snippets
|
||||
|
||||
|
||||
# ─── Helper: span length 보정 ───────────────────────────
|
||||
|
||||
|
||||
def _normalize_span(span: str, full: str, query: str) -> tuple[str, bool]:
|
||||
"""span 을 SPAN_MIN_CHARS ~ SPAN_MAX_CHARS 범위로 보정.
|
||||
|
||||
Returns:
|
||||
(normalized_span, was_expanded)
|
||||
- was_expanded=True 이면 "short_span_expanded" 로그 대상
|
||||
"""
|
||||
s = (span or "").strip()
|
||||
expanded = False
|
||||
if len(s) < SPAN_MIN_CHARS:
|
||||
# soft hallucination 방어 — query 중심으로 window 재확장
|
||||
s = _extract_window(full, query, target_chars=SPAN_ENLARGE_TARGET)
|
||||
expanded = True
|
||||
if len(s) > SPAN_MAX_CHARS:
|
||||
s = s[:SPAN_MAX_CHARS]
|
||||
return s, expanded
|
||||
|
||||
|
||||
# ─── Helper: doc-group ordering ─────────────────────────
|
||||
|
||||
|
||||
def _apply_doc_group_ordering(
|
||||
items: list[EvidenceItem],
|
||||
results: list["SearchResult"],
|
||||
) -> list[EvidenceItem]:
|
||||
"""검색 결과 doc 순서 유지 + doc 내부만 relevance desc + n 재부여.
|
||||
|
||||
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
|
||||
전역 relevance desc 정렬은 "doc A span1 → doc B span1 → doc A span2"
|
||||
처럼 튀면서 읽기 이상한 답변을 만든다.
|
||||
"""
|
||||
if not items:
|
||||
return []
|
||||
doc_order: dict[int, int] = {}
|
||||
for idx, r in enumerate(results):
|
||||
if r.id not in doc_order:
|
||||
doc_order[r.id] = idx
|
||||
# 정렬: (doc 순서, -relevance)
|
||||
items.sort(
|
||||
key=lambda it: (doc_order.get(it.doc_id, 9999), -it.relevance)
|
||||
)
|
||||
# n 재부여
|
||||
for new_n, it in enumerate(items, 1):
|
||||
it.n = new_n
|
||||
return items
|
||||
|
||||
|
||||
# ─── Helper: rule-only fallback ─────────────────────────
|
||||
|
||||
|
||||
def _build_rule_only_evidence(
|
||||
candidates: list["SearchResult"],
|
||||
full_snippets: list[str],
|
||||
query: str,
|
||||
) -> list[EvidenceItem]:
|
||||
"""LLM 실패/timeout 시 rule-only 경로.
|
||||
|
||||
⚠ `full_snippet[:200]` 같은 앞자르기 금지. 반드시 `_extract_window` 로
|
||||
query 중심 윈도우를 만든다. relevance 는 rerank_score 복사.
|
||||
"""
|
||||
items: list[EvidenceItem] = []
|
||||
for i, (c, full) in enumerate(zip(candidates, full_snippets), 1):
|
||||
span = _extract_window(full, query, target_chars=200)
|
||||
# 정규화 (보통 여기서는 SPAN_MIN_CHARS 이상이지만 안전장치)
|
||||
span, _expanded = _normalize_span(span, full, query)
|
||||
items.append(
|
||||
EvidenceItem(
|
||||
n=i,
|
||||
chunk_id=c.chunk_id,
|
||||
doc_id=c.id,
|
||||
title=c.title,
|
||||
section_title=c.section_title,
|
||||
span_text=span,
|
||||
relevance=float(c.rerank_score or c.score or 0.0),
|
||||
rerank_score=float(c.rerank_score or c.score or 0.0),
|
||||
full_snippet=full,
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
# ─── Core: extract_evidence ─────────────────────────────
|
||||
|
||||
|
||||
async def extract_evidence(
|
||||
query: str,
|
||||
results: list["SearchResult"],
|
||||
ai_client: AIClient | None = None,
|
||||
) -> tuple[list[EvidenceItem], str | None]:
|
||||
"""reranked results → EvidenceItem 리스트.
|
||||
|
||||
Returns:
|
||||
(items, skip_reason)
|
||||
skip_reason ∈ {None, "empty_retrieval", "all_low_rerank", "fast_path",
|
||||
"llm_timeout_fallback_rule", "llm_error_fallback_rule",
|
||||
"parse_failed_fallback_rule", "all_llm_rejected"}
|
||||
- skip_reason 이 None 이 아니어도 items 는 비어있지 않을 수 있다
|
||||
(fallback/fast_path 경로).
|
||||
"""
|
||||
if not results:
|
||||
return [], "empty_retrieval"
|
||||
|
||||
# ── 1차 rule filter: rerank_score >= EVIDENCE_MIN_RERANK + max_per_doc ──
|
||||
candidates: list["SearchResult"] = []
|
||||
per_doc: dict[int, int] = {}
|
||||
for r in results:
|
||||
raw_score = r.rerank_score if r.rerank_score is not None else r.score
|
||||
if raw_score is None or raw_score < EVIDENCE_MIN_RERANK:
|
||||
continue
|
||||
if per_doc.get(r.id, 0) >= MAX_PER_DOC:
|
||||
continue
|
||||
candidates.append(r)
|
||||
per_doc[r.id] = per_doc.get(r.id, 0) + 1
|
||||
if len(candidates) >= MAX_EVIDENCE_CANDIDATES:
|
||||
break
|
||||
|
||||
if not candidates:
|
||||
return [], "all_low_rerank"
|
||||
|
||||
# ── Fast-path (현재 비활성) ─────────────────────────
|
||||
if EVIDENCE_FAST_PATH_THRESHOLD is not None:
|
||||
# ⚠ display score 가 아니라 raw rerank_score 로 판단.
|
||||
# normalize_display_scores 를 거친 r.score 는 frontend 용 리스케일
|
||||
# 값이라 distribution drift 가능. fast-path 는 reranker raw 신호가 안전.
|
||||
top_rerank = (
|
||||
results[0].rerank_score if results[0].rerank_score is not None else 0.0
|
||||
)
|
||||
if top_rerank is not None and top_rerank >= EVIDENCE_FAST_PATH_THRESHOLD:
|
||||
_block, full_snippets = _build_numbered_candidates(candidates, query)
|
||||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||
items = _apply_doc_group_ordering(items, results)
|
||||
logger.info(
|
||||
"evidence fast_path query=%r candidates=%d kept=%d top_rerank=%.2f",
|
||||
query[:80], len(candidates), len(items), top_rerank,
|
||||
)
|
||||
return items, "fast_path"
|
||||
|
||||
# ── LLM 호출 준비 ───────────────────────────────────
|
||||
if not EVIDENCE_PROMPT:
|
||||
# 프롬프트 미로딩 → rule-only
|
||||
_block, full_snippets = _build_numbered_candidates(candidates, query)
|
||||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||
items = _apply_doc_group_ordering(items, results)
|
||||
logger.warning(
|
||||
"evidence prompt_not_loaded → rule fallback query=%r kept=%d",
|
||||
query[:80], len(items),
|
||||
)
|
||||
return items, "llm_error_fallback_rule"
|
||||
|
||||
block, full_snippets = _build_numbered_candidates(candidates, query)
|
||||
prompt = EVIDENCE_PROMPT.replace("{query}", query).replace(
|
||||
"{numbered_candidates}", block
|
||||
)
|
||||
|
||||
client_owned = False
|
||||
if ai_client is None:
|
||||
ai_client = AIClient()
|
||||
client_owned = True
|
||||
|
||||
t_start = time.perf_counter()
|
||||
raw: str | None = None
|
||||
llm_error: str | None = None
|
||||
|
||||
try:
|
||||
# ⚠ semaphore 대기는 timeout 바깥. timeout 은 실제 LLM 호출에만.
|
||||
async with get_mlx_gate():
|
||||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
|
||||
except asyncio.TimeoutError:
|
||||
llm_error = "timeout"
|
||||
except Exception as exc:
|
||||
llm_error = f"llm_error:{type(exc).__name__}"
|
||||
finally:
|
||||
if client_owned:
|
||||
try:
|
||||
await ai_client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||
|
||||
# ── LLM 실패 → rule fallback ────────────────────────
|
||||
if llm_error is not None:
|
||||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||
items = _apply_doc_group_ordering(items, results)
|
||||
logger.warning(
|
||||
"evidence LLM %s → rule fallback query=%r candidates=%d kept=%d elapsed_ms=%.0f",
|
||||
llm_error, query[:80], len(candidates), len(items), elapsed_ms,
|
||||
)
|
||||
return items, "llm_timeout_fallback_rule" if llm_error == "timeout" else "llm_error_fallback_rule"
|
||||
|
||||
parsed = parse_json_response(raw or "")
|
||||
if not isinstance(parsed, dict) or not isinstance(parsed.get("items"), list):
|
||||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||
items = _apply_doc_group_ordering(items, results)
|
||||
logger.warning(
|
||||
"evidence parse_failed → rule fallback query=%r raw=%r elapsed_ms=%.0f",
|
||||
query[:80], (raw or "")[:200], elapsed_ms,
|
||||
)
|
||||
return items, "parse_failed_fallback_rule"
|
||||
|
||||
# ── LLM 출력 파싱 ──────────────────────────────────
|
||||
short_span_expanded = 0
|
||||
llm_items: list[EvidenceItem] = []
|
||||
for entry in parsed["items"]:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
try:
|
||||
n_raw = int(entry.get("n", 0))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
if n_raw < 1 or n_raw > len(candidates):
|
||||
continue
|
||||
try:
|
||||
relevance = float(entry.get("relevance", 0.0) or 0.0)
|
||||
except (TypeError, ValueError):
|
||||
relevance = 0.0
|
||||
if relevance < MIN_RELEVANCE_KEEP:
|
||||
continue
|
||||
span_raw = entry.get("span")
|
||||
if not isinstance(span_raw, str) or not span_raw.strip():
|
||||
continue
|
||||
|
||||
candidate = candidates[n_raw - 1]
|
||||
full = full_snippets[n_raw - 1]
|
||||
span, expanded = _normalize_span(span_raw, full, query)
|
||||
if expanded:
|
||||
short_span_expanded += 1
|
||||
|
||||
llm_items.append(
|
||||
EvidenceItem(
|
||||
n=n_raw, # doc-group ordering 에서 재부여됨
|
||||
chunk_id=candidate.chunk_id,
|
||||
doc_id=candidate.id,
|
||||
title=candidate.title,
|
||||
section_title=candidate.section_title,
|
||||
span_text=span,
|
||||
relevance=relevance,
|
||||
rerank_score=float(
|
||||
candidate.rerank_score
|
||||
if candidate.rerank_score is not None
|
||||
else (candidate.score or 0.0)
|
||||
),
|
||||
full_snippet=full,
|
||||
)
|
||||
)
|
||||
|
||||
# ── LLM 이 전부 reject → rule fallback ──────────────
|
||||
if not llm_items:
|
||||
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||
items = _apply_doc_group_ordering(items, results)
|
||||
logger.warning(
|
||||
"evidence all_llm_rejected → rule fallback query=%r elapsed_ms=%.0f",
|
||||
query[:80], elapsed_ms,
|
||||
)
|
||||
return items, "all_llm_rejected"
|
||||
|
||||
# ── doc-group ordering + n 재부여 ───────────────────
|
||||
llm_items = _apply_doc_group_ordering(llm_items, results)
|
||||
|
||||
logger.info(
|
||||
"evidence ok query=%r candidates=%d kept=%d short_span_expanded=%d elapsed_ms=%.0f",
|
||||
query[:80], len(candidates), len(llm_items), short_span_expanded, elapsed_ms,
|
||||
)
|
||||
return llm_items, None
|
||||
|
||||
@@ -219,6 +219,62 @@ def get_strategy(name: str) -> FusionStrategy:
|
||||
return cls()
|
||||
|
||||
|
||||
# ─── Phase 2.3: soft filter boost ───────────────────────
|
||||
|
||||
SOFT_FILTER_MAX_BOOST = 0.05 # plan 룰 (CRITICAL)
|
||||
# ↑ RRF score는 0.01~0.05 범위 (k=60). 상한 초과 시 기존 랭킹 왜곡.
|
||||
# 기존 RRFWithBoost의 legal article boost(0.05)와 동일 최대값 → 일관성.
|
||||
SOFT_FILTER_DOMAIN_BOOST = 0.01 # 2026-04-08 실측: 0.03은 exact_keyword -0.03 악화
|
||||
# ↑ 낮게 잡는 이유: soft_filter는 "같은 도메인 doc을 동등하게 boost" → exact match
|
||||
# doc의 상대 우위가 손상됨. 0.01 수준이면 fusion 내부 순위 역전 확률 최소.
|
||||
|
||||
|
||||
def apply_soft_filter_boost(
|
||||
results: list["SearchResult"],
|
||||
soft_filters: dict | None,
|
||||
) -> int:
|
||||
"""Phase 2.3 — QueryAnalyzer soft_filters.domain 기반 약한 score boost.
|
||||
|
||||
ai_domain 정확 매칭 시 SOFT_FILTER_DOMAIN_BOOST(0.01) 1회 가산.
|
||||
document_type 매칭은 v0.1 평가셋에서 효과 측정 불가 + false positive 많음 → 제외.
|
||||
|
||||
Args:
|
||||
results: fusion 직후 SearchResult 리스트 (in-place 수정)
|
||||
soft_filters: query_analysis.soft_filters = {"domain": [...]}
|
||||
|
||||
Returns:
|
||||
int — boost 적용된 결과 개수 (debug/notes용)
|
||||
"""
|
||||
if not soft_filters:
|
||||
return 0
|
||||
domain_list = [str(d).lower() for d in soft_filters.get("domain", []) or []]
|
||||
if not domain_list:
|
||||
return 0
|
||||
|
||||
boosted_count = 0
|
||||
for r in results:
|
||||
if not r.ai_domain:
|
||||
continue
|
||||
ai_dom_lower = r.ai_domain.lower()
|
||||
# 정확 매칭 또는 subdirectory 매칭 ("Industrial_Safety/Legislation" → "industrial_safety" 매칭)
|
||||
matched = False
|
||||
for d in domain_list:
|
||||
if d == ai_dom_lower:
|
||||
matched = True
|
||||
break
|
||||
# path 레벨 매칭: "industrial_safety/legislation" in "industrial_safety/legislation/act"
|
||||
if d in ai_dom_lower and "/" in d:
|
||||
matched = True
|
||||
break
|
||||
if matched:
|
||||
r.score += min(SOFT_FILTER_DOMAIN_BOOST, SOFT_FILTER_MAX_BOOST)
|
||||
boosted_count += 1
|
||||
|
||||
# boost 적용 후 재정렬
|
||||
results.sort(key=lambda x: x.score, reverse=True)
|
||||
return boosted_count
|
||||
|
||||
|
||||
# ─── display score 정규화 ────────────────────────────────
|
||||
|
||||
|
||||
|
||||
58
app/services/search/llm_gate.py
Normal file
58
app/services/search/llm_gate.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""MLX single-inference 전역 gate (Phase 3.1.1).
|
||||
|
||||
Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
|
||||
동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout).
|
||||
|
||||
이 모듈은 analyzer / evidence / synthesis 등 **모든 MLX-bound LLM 호출**이
|
||||
공유하는 `asyncio.Semaphore(1)`를 제공한다. MLX를 호출하는 경로는 예외 없이
|
||||
`async with get_mlx_gate():` 블록 안에서만 `AIClient._call_chat(ai.primary, ...)`
|
||||
를 호출해야 한다.
|
||||
|
||||
## 영구 룰
|
||||
|
||||
- **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
|
||||
evidence_service / synthesis_service 세 곳이 현재 사용자. 이후 경로가 늘어도
|
||||
동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시
|
||||
동시 실행 발생).
|
||||
- **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을
|
||||
걸면 "대기만으로 timeout 발동" 버그가 재발한다(query_analyzer 초기 이슈).
|
||||
- **fallback(Ollama) 경로는 gate 제외**. GPU Ollama는 concurrent OK. 단 현재
|
||||
구현상 `AIClient._call_chat` 내부에서 primary→fallback 전환이 일어나므로
|
||||
fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음).
|
||||
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
|
||||
inference 특성이 깨지지 않는 한 이 값을 올리지 말 것.
|
||||
|
||||
## 확장 여지 (지금은 구현하지 않음)
|
||||
|
||||
트래픽 증가 시 "우선순위 역전"(/ask가 analyzer background task 뒤에 밀림)이
|
||||
문제가 되면 `asyncio.PriorityQueue` 기반 우선순위 큐로 교체 가능. Gate 자체
|
||||
분리(get_analyzer_gate / get_ask_gate)는 single-inference에서 throughput
|
||||
개선이 없으므로 의미 없음.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
# MLX primary는 single-inference → 1
|
||||
MLX_CONCURRENCY = 1
|
||||
|
||||
# 첫 호출 시 현재 event loop에 바인딩된 Semaphore 생성 (lazy init)
|
||||
_mlx_gate: asyncio.Semaphore | None = None
|
||||
|
||||
|
||||
def get_mlx_gate() -> asyncio.Semaphore:
|
||||
"""MLX primary 호출 경로 공용 gate. 최초 호출 시 lazy init.
|
||||
|
||||
사용 예:
|
||||
async with get_mlx_gate():
|
||||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
|
||||
|
||||
⚠ `asyncio.timeout`은 반드시 gate 안쪽에 둘 것. 바깥에 두면 gate 대기만으로
|
||||
timeout이 발동한다.
|
||||
"""
|
||||
global _mlx_gate
|
||||
if _mlx_gate is None:
|
||||
_mlx_gate = asyncio.Semaphore(MLX_CONCURRENCY)
|
||||
return _mlx_gate
|
||||
@@ -1,5 +1,443 @@
|
||||
"""Query analyzer — 자연어 쿼리 분석 (Phase 2).
|
||||
"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1, async-only 구조).
|
||||
|
||||
domain_hint, intent, hard/soft filter, normalized_queries 등 추출.
|
||||
구현은 Phase 2에서 채움.
|
||||
**핵심 철학** (memory `feedback_analyzer_async_only.md`):
|
||||
> QueryAnalyzer는 "즉시 실행하는 기능"이 아니라 "미리 준비해두는 기능"이다.
|
||||
|
||||
Retrieval 경로에서 analyzer를 **동기 호출 금지**.
|
||||
동기 호출 가능한 API는 prewarm 전용.
|
||||
|
||||
## Pipeline
|
||||
|
||||
```
|
||||
query → retrieval (항상 즉시)
|
||||
↘ trigger_background_analysis (fire-and-forget)
|
||||
→ analyze() [5초+] → cache 저장
|
||||
|
||||
다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화
|
||||
```
|
||||
|
||||
## 룰 (plan 영구)
|
||||
- `LLM_TIMEOUT_MS = 5000` (background 이므로 여유 OK)
|
||||
- `MAX_NORMALIZED_QUERIES = 3` (multilingual explosion 방지)
|
||||
- weight 합 = 1.0 정규화 필수 (fusion 왜곡 방지)
|
||||
- 실패/저신뢰(< 0.5) 결과는 캐시 금지 (잘못된 분석 고정 방지)
|
||||
- `analyzer_confidence` default `float 0.0` 강제 (None 금지)
|
||||
- analyze() 동기 호출 금지. retrieval 경로는 `get_cached()` + `trigger_background_analysis()` 만 사용.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from ai.client import AIClient, _load_prompt, parse_json_response
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
|
||||
from .llm_gate import get_mlx_gate
|
||||
|
||||
logger = setup_logger("query_analyzer")
|
||||
|
||||
# ─── 상수 (plan 영구 룰) ────────────────────────────────
|
||||
PROMPT_VERSION = "v2" # prompts/query_analyze.txt 축소판
|
||||
CACHE_TTL = 86400 # 24h
|
||||
CACHE_MAXSIZE = 1000
|
||||
LLM_TIMEOUT_MS = 15000 # async 구조 (background), 동기 경로 금지
|
||||
# ↑ 실측: gemma-4-26b-a4b-it-8bit MLX, 축소 프롬프트(prompt_tok=802) 7~11초.
|
||||
# generation이 dominant (max_tokens 무효, 자연 EOS ~289 tok 생성).
|
||||
# background 실행이라 15초도 안전. 상향 필요 시 여기서만 조정.
|
||||
LLM_CONCURRENCY = 1 # MLX는 single-inference, 동시 호출 시 queue 폭발.
|
||||
# ↑ 실측: 23 concurrent 요청 → 22개 15초 timeout. semaphore로 순차 강제.
|
||||
# prewarm/background/동기 호출 모두 이 semaphore 경유.
|
||||
MIN_CACHE_CONFIDENCE = 0.5 # 이 미만은 캐시 금지
|
||||
MAX_NORMALIZED_QUERIES = 3
|
||||
|
||||
|
||||
def _model_version() -> str:
|
||||
"""현재 primary 모델 ID를 캐시 키에 반영."""
|
||||
if settings.ai and settings.ai.primary:
|
||||
return settings.ai.primary.model
|
||||
return "unknown-model"
|
||||
|
||||
|
||||
# ─── in-memory LRU (FIFO 근사) ──────────────────────────
|
||||
_CACHE: dict[str, dict[str, Any]] = {}
|
||||
|
||||
# background task 참조 유지 (premature GC 방지)
|
||||
_PENDING: set[asyncio.Task[Any]] = set()
|
||||
# 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합)
|
||||
_INFLIGHT: set[str] = set()
|
||||
|
||||
|
||||
def _get_llm_semaphore() -> asyncio.Semaphore:
|
||||
"""MLX single-inference gate를 반환. Phase 3.1부터 llm_gate.get_mlx_gate()
|
||||
로 위임 — analyzer / evidence / synthesis 가 동일 semaphore 공유.
|
||||
|
||||
`LLM_CONCURRENCY` 상수는 하위 호환/문서용으로 유지하되, 실제 bound는
|
||||
`llm_gate.MLX_CONCURRENCY` 가 담당한다.
|
||||
"""
|
||||
return get_mlx_gate()
|
||||
|
||||
|
||||
def _cache_key(query: str) -> str:
|
||||
raw = f"{query}|{PROMPT_VERSION}|{_model_version()}"
|
||||
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def get_cached(query: str) -> dict | None:
|
||||
"""TTL 경과 entry는 자동 삭제. 없으면 None.
|
||||
|
||||
retrieval 경로에서 cache hit 판단용으로 호출. 호출 자체는 O(1).
|
||||
"""
|
||||
key = _cache_key(query)
|
||||
entry = _CACHE.get(key)
|
||||
if not entry:
|
||||
return None
|
||||
if time.time() - entry["ts"] > CACHE_TTL:
|
||||
_CACHE.pop(key, None)
|
||||
return None
|
||||
return entry["value"]
|
||||
|
||||
|
||||
def set_cached(query: str, value: dict) -> None:
|
||||
"""저신뢰(< 0.5) / 빈 값은 캐시 금지. 상한 초과 시 FIFO eviction."""
|
||||
if not value:
|
||||
return
|
||||
try:
|
||||
conf = float(value.get("analyzer_confidence", 0.0) or 0.0)
|
||||
except (TypeError, ValueError):
|
||||
conf = 0.0
|
||||
if conf < MIN_CACHE_CONFIDENCE:
|
||||
return
|
||||
key = _cache_key(query)
|
||||
if key in _CACHE:
|
||||
_CACHE[key] = {"value": value, "ts": time.time()}
|
||||
return
|
||||
if len(_CACHE) >= CACHE_MAXSIZE:
|
||||
try:
|
||||
oldest = next(iter(_CACHE))
|
||||
_CACHE.pop(oldest, None)
|
||||
except StopIteration:
|
||||
pass
|
||||
_CACHE[key] = {"value": value, "ts": time.time()}
|
||||
|
||||
|
||||
def cache_stats() -> dict[str, int]:
|
||||
"""debug/운영용 — 현재 캐시 크기 + inflight 수."""
|
||||
return {
|
||||
"size": len(_CACHE),
|
||||
"maxsize": CACHE_MAXSIZE,
|
||||
"inflight": len(_INFLIGHT),
|
||||
"pending_tasks": len(_PENDING),
|
||||
}
|
||||
|
||||
|
||||
# ─── weight 정규화 (fusion 왜곡 방지) ───────────────────
|
||||
def _normalize_weights(analysis: dict) -> dict:
|
||||
"""normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화."""
|
||||
queries = analysis.get("normalized_queries")
|
||||
if not isinstance(queries, list) or not queries:
|
||||
return analysis
|
||||
|
||||
sanitized: list[dict] = []
|
||||
for q in queries[:MAX_NORMALIZED_QUERIES]:
|
||||
if not isinstance(q, dict):
|
||||
continue
|
||||
lang = str(q.get("lang", "")).strip() or "ko"
|
||||
text = str(q.get("text", "")).strip()
|
||||
if not text:
|
||||
continue
|
||||
try:
|
||||
w = float(q.get("weight", 1.0))
|
||||
if w < 0:
|
||||
w = 0.0
|
||||
except (TypeError, ValueError):
|
||||
w = 1.0
|
||||
sanitized.append({"lang": lang, "text": text, "weight": w})
|
||||
|
||||
if not sanitized:
|
||||
return analysis
|
||||
|
||||
total = sum(q["weight"] for q in sanitized)
|
||||
if total <= 0:
|
||||
equal = 1.0 / len(sanitized)
|
||||
for q in sanitized:
|
||||
q["weight"] = equal
|
||||
else:
|
||||
for q in sanitized:
|
||||
q["weight"] /= total
|
||||
|
||||
analysis["normalized_queries"] = sanitized
|
||||
return analysis
|
||||
|
||||
|
||||
# ─── 프롬프트 로딩 (module 초기화 1회) ──────────────────
|
||||
try:
|
||||
ANALYZE_PROMPT = _load_prompt("query_analyze.txt")
|
||||
except FileNotFoundError:
|
||||
ANALYZE_PROMPT = ""
|
||||
logger.warning("query_analyze.txt not found — analyzer will always return fallback")
|
||||
|
||||
|
||||
# ─── 기본 fallback 응답 (None 금지) ─────────────────────
|
||||
def _fallback(reason: str | None = None) -> dict:
|
||||
"""LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0."""
|
||||
result: dict[str, Any] = {
|
||||
"intent": "semantic_search",
|
||||
"query_type": "keyword",
|
||||
"domain_hint": "mixed",
|
||||
"language_scope": "limited",
|
||||
"keywords": [],
|
||||
"must_terms": [],
|
||||
"optional_terms": [],
|
||||
"hard_filters": {},
|
||||
"soft_filters": {"domain": [], "document_type": []},
|
||||
"normalized_queries": [],
|
||||
"expanded_terms": [],
|
||||
"synonyms": {},
|
||||
"analyzer_confidence": 0.0,
|
||||
}
|
||||
if reason:
|
||||
result["_fallback_reason"] = reason
|
||||
return result
|
||||
|
||||
|
||||
# ─── 메인 LLM 호출 (내부 사용) ──────────────────────────
|
||||
async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
|
||||
"""쿼리 분석 결과 반환. 실패 시 analyzer_confidence=0.0 fallback.
|
||||
|
||||
**⚠️ 동기 검색 경로에서 직접 호출 금지**. 용도:
|
||||
- `trigger_background_analysis` 내부 호출
|
||||
- `prewarm_analyzer` startup 호출
|
||||
- 디버깅/테스트
|
||||
|
||||
Args:
|
||||
query: 사용자 쿼리 원문
|
||||
ai_client: AIClient 인스턴스 (없으면 생성 후 자동 close)
|
||||
|
||||
Returns:
|
||||
dict — 최소 `analyzer_confidence` 키는 항상 float로 존재.
|
||||
"""
|
||||
if not query or not query.strip():
|
||||
return _fallback("empty_query")
|
||||
|
||||
if not ANALYZE_PROMPT:
|
||||
return _fallback("prompt_not_loaded")
|
||||
|
||||
# cache hit 즉시 반환 (prewarm 재호출 방지)
|
||||
cached = get_cached(query)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
client_owned = False
|
||||
if ai_client is None:
|
||||
ai_client = AIClient()
|
||||
client_owned = True
|
||||
|
||||
t_start = time.perf_counter()
|
||||
semaphore = _get_llm_semaphore()
|
||||
# ⚠️ 중요: semaphore 대기는 timeout 포함되면 안됨 (대기만 해도 timeout 발동)
|
||||
# timeout은 실제 LLM 호출 구간에만 적용.
|
||||
try:
|
||||
async with semaphore:
|
||||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||
raw = await ai_client._call_chat(
|
||||
ai_client.ai.primary,
|
||||
ANALYZE_PROMPT.replace("{query}", query),
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
elapsed = (time.perf_counter() - t_start) * 1000
|
||||
logger.warning(
|
||||
"query_analyze timeout query=%r elapsed_ms=%.0f (LLM_TIMEOUT_MS=%d)",
|
||||
query[:80],
|
||||
elapsed,
|
||||
LLM_TIMEOUT_MS,
|
||||
)
|
||||
return _fallback("timeout")
|
||||
except Exception as exc:
|
||||
elapsed = (time.perf_counter() - t_start) * 1000
|
||||
logger.warning(
|
||||
"query_analyze LLM error query=%r elapsed_ms=%.0f err=%r",
|
||||
query[:80],
|
||||
elapsed,
|
||||
exc,
|
||||
)
|
||||
return _fallback(f"llm_error:{type(exc).__name__}")
|
||||
finally:
|
||||
if client_owned:
|
||||
try:
|
||||
await ai_client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||
|
||||
parsed = parse_json_response(raw)
|
||||
if not isinstance(parsed, dict):
|
||||
logger.warning(
|
||||
"query_analyze parse failed query=%r elapsed_ms=%.0f raw=%r",
|
||||
query[:80],
|
||||
elapsed_ms,
|
||||
(raw or "")[:200],
|
||||
)
|
||||
return _fallback("parse_failed")
|
||||
|
||||
try:
|
||||
conf = float(parsed.get("analyzer_confidence", 0.0) or 0.0)
|
||||
except (TypeError, ValueError):
|
||||
conf = 0.0
|
||||
parsed["analyzer_confidence"] = conf
|
||||
parsed = _normalize_weights(parsed)
|
||||
|
||||
logger.info(
|
||||
"query_analyze ok query=%r conf=%.2f intent=%s domain=%s elapsed_ms=%.0f",
|
||||
query[:80],
|
||||
conf,
|
||||
parsed.get("intent"),
|
||||
parsed.get("domain_hint"),
|
||||
elapsed_ms,
|
||||
)
|
||||
|
||||
set_cached(query, parsed)
|
||||
return parsed
|
||||
|
||||
|
||||
# ─── Background trigger (retrieval 경로에서 사용) ───────
|
||||
async def _background_analyze(query: str) -> None:
|
||||
"""Background task wrapper — inflight 집합 관리 + 예외 삼킴."""
|
||||
try:
|
||||
await analyze(query)
|
||||
except Exception as exc:
|
||||
logger.warning("background analyze crashed query=%r err=%r", query[:80], exc)
|
||||
finally:
|
||||
_INFLIGHT.discard(query)
|
||||
|
||||
|
||||
def trigger_background_analysis(query: str) -> bool:
|
||||
"""retrieval 경로에서 호출. cache miss 시 background task 생성.
|
||||
|
||||
- 동기 함수. 즉시 반환.
|
||||
- 이미 inflight 또는 cache hit이면 아무 작업 X, False 반환.
|
||||
- 새 task 생성 시 True 반환.
|
||||
|
||||
Returns:
|
||||
bool — task 실제로 생성되었는지 여부
|
||||
"""
|
||||
if not query or not query.strip():
|
||||
return False
|
||||
if query in _INFLIGHT:
|
||||
return False
|
||||
if get_cached(query) is not None:
|
||||
return False
|
||||
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
logger.warning("trigger_background_analysis called outside event loop")
|
||||
return False
|
||||
|
||||
_INFLIGHT.add(query)
|
||||
task = loop.create_task(_background_analyze(query))
|
||||
_PENDING.add(task)
|
||||
task.add_done_callback(_PENDING.discard)
|
||||
return True
|
||||
|
||||
|
||||
# ─── Prewarm (app startup) ──────────────────────────────
|
||||
# 운영에서 자주 발생하는 쿼리 샘플. 통계 기반으로 확장 예정.
|
||||
DEFAULT_PREWARM_QUERIES: list[str] = [
|
||||
# fixed 7 (Phase 2 평가셋 core)
|
||||
"산업안전보건법 제6장",
|
||||
"기계 사고 관련 법령",
|
||||
"AI 산업 동향",
|
||||
"Python async best practice",
|
||||
"유해화학물질을 다루는 회사가 지켜야 할 안전 의무",
|
||||
"recent AI safety news from Europe",
|
||||
"이세상에 존재하지 않는 문서명",
|
||||
# 법령 관련
|
||||
"산업안전보건법 시행령",
|
||||
"화학물질관리법",
|
||||
"위험성평가 절차",
|
||||
# 뉴스 관련
|
||||
"EU AI Act",
|
||||
"한국 AI 산업",
|
||||
# 실무
|
||||
"안전보건 교육 자료",
|
||||
"사고 조사 보고서",
|
||||
"MSDS 작성 방법",
|
||||
]
|
||||
|
||||
|
||||
async def prewarm_analyzer(
|
||||
queries: list[str] | None = None,
|
||||
delay_between: float = 0.2,
|
||||
) -> dict[str, Any]:
|
||||
"""app startup에서 호출. 대표 쿼리를 미리 분석해 cache에 적재.
|
||||
|
||||
Non-blocking으로 사용: `asyncio.create_task(prewarm_analyzer())`.
|
||||
|
||||
Args:
|
||||
queries: 분석할 쿼리 리스트. None이면 DEFAULT_PREWARM_QUERIES 사용.
|
||||
delay_between: 각 쿼리 간 대기 시간 (MLX 부하 완화).
|
||||
|
||||
Returns:
|
||||
dict — {queries_total, success, failed, elapsed_ms, cache_size_after}
|
||||
"""
|
||||
if not ANALYZE_PROMPT:
|
||||
logger.warning("prewarm skipped — prompt not loaded")
|
||||
return {"status": "skipped", "reason": "prompt_not_loaded"}
|
||||
|
||||
targets = queries if queries is not None else DEFAULT_PREWARM_QUERIES
|
||||
total = len(targets)
|
||||
success = 0
|
||||
failed = 0
|
||||
t_start = time.perf_counter()
|
||||
|
||||
logger.info("prewarm_analyzer start queries=%d timeout_ms=%d", total, LLM_TIMEOUT_MS)
|
||||
|
||||
client = AIClient()
|
||||
try:
|
||||
for i, q in enumerate(targets, 1):
|
||||
if get_cached(q) is not None:
|
||||
logger.info("prewarm skip (already cached) [%d/%d] %r", i, total, q[:40])
|
||||
success += 1
|
||||
continue
|
||||
result = await analyze(q, ai_client=client)
|
||||
conf = float(result.get("analyzer_confidence", 0.0) or 0.0)
|
||||
if conf >= MIN_CACHE_CONFIDENCE:
|
||||
success += 1
|
||||
logger.info("prewarm ok [%d/%d] conf=%.2f q=%r", i, total, conf, q[:40])
|
||||
else:
|
||||
failed += 1
|
||||
reason = result.get("_fallback_reason", "low_conf")
|
||||
logger.warning(
|
||||
"prewarm fail [%d/%d] reason=%s q=%r", i, total, reason, q[:40]
|
||||
)
|
||||
if delay_between > 0 and i < total:
|
||||
await asyncio.sleep(delay_between)
|
||||
finally:
|
||||
try:
|
||||
await client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||
stats = cache_stats()
|
||||
logger.info(
|
||||
"prewarm_analyzer done total=%d success=%d failed=%d elapsed_ms=%.0f cache_size=%d",
|
||||
total,
|
||||
success,
|
||||
failed,
|
||||
elapsed_ms,
|
||||
stats["size"],
|
||||
)
|
||||
return {
|
||||
"status": "complete",
|
||||
"queries_total": total,
|
||||
"success": success,
|
||||
"failed": failed,
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"cache_size_after": stats["size"],
|
||||
}
|
||||
|
||||
@@ -134,7 +134,12 @@ async def rerank_chunks(
|
||||
if idx is None or sc is None or idx >= len(candidates):
|
||||
continue
|
||||
chunk = candidates[idx]
|
||||
chunk.score = float(sc)
|
||||
score = float(sc)
|
||||
chunk.score = score
|
||||
# Phase 3.1: reranker raw 점수를 별도 필드에 보존.
|
||||
# normalize_display_scores가 나중에 .score를 랭크 기반으로 덮어써도
|
||||
# fast-path 판단에 쓸 수 있는 원본 신호 유지.
|
||||
chunk.rerank_score = score
|
||||
chunk.match_reason = (chunk.match_reason or "") + "+rerank"
|
||||
reranked.append(chunk)
|
||||
return reranked[:limit]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""검색 후보 수집 서비스 (Phase 1.2).
|
||||
"""검색 후보 수집 서비스 (Phase 1.2 + Phase 2.2 multilingual).
|
||||
|
||||
text(documents FTS + trigram) + vector(documents.embedding + chunks.embedding hybrid) 후보를
|
||||
SearchResult 리스트로 반환.
|
||||
@@ -10,27 +10,80 @@ Phase 1.2-G: doc + chunks hybrid retrieval 보강.
|
||||
- documents.embedding (recall robust, 자연어 매칭 강함)
|
||||
- document_chunks.embedding (precision, segment 매칭)
|
||||
- 두 SQL 동시 호출 후 doc_id 기준 merge (chunk 가중치 1.2, doc 1.0)
|
||||
|
||||
Phase 2.2 추가:
|
||||
- _QUERY_EMBED_CACHE: bge-m3 query embedding 캐시 (모듈 레벨 LRU, TTL 24h)
|
||||
- search_vector_multilingual: normalized_queries (lang별 쿼리) 배열 지원
|
||||
QueryAnalyzer cache hit + analyzer_tier >= merge 일 때만 호출.
|
||||
- crosslingual_ko_en NDCG 0.53 → 0.65+ 목표
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING
|
||||
import hashlib
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
|
||||
from ai.client import AIClient
|
||||
from core.database import engine
|
||||
from core.utils import setup_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from api.search import SearchResult
|
||||
|
||||
|
||||
logger = setup_logger("retrieval_service")
|
||||
|
||||
# Hybrid merge 가중치 (1.2-G)
|
||||
DOC_VECTOR_WEIGHT = 1.0
|
||||
CHUNK_VECTOR_WEIGHT = 1.2
|
||||
|
||||
# ─── Phase 2.2: Query embedding cache ───────────────────
|
||||
# bge-m3 호출 비용 절반 감소 (동일 normalized_query 재호출 방지)
|
||||
_QUERY_EMBED_CACHE: dict[str, dict[str, Any]] = {}
|
||||
QUERY_EMBED_TTL = 86400 # 24h
|
||||
QUERY_EMBED_MAXSIZE = 500
|
||||
|
||||
|
||||
def _query_embed_key(text_: str) -> str:
|
||||
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
async def _get_query_embedding(
|
||||
client: AIClient, text_: str
|
||||
) -> list[float] | None:
|
||||
"""Query embedding with in-memory cache.
|
||||
|
||||
동일 텍스트 재호출 시 bge-m3 skip. fixed query 회귀 시 vector_ms 대폭 감소.
|
||||
"""
|
||||
if not text_:
|
||||
return None
|
||||
key = _query_embed_key(text_)
|
||||
entry = _QUERY_EMBED_CACHE.get(key)
|
||||
if entry and time.time() - entry["ts"] < QUERY_EMBED_TTL:
|
||||
return entry["emb"]
|
||||
try:
|
||||
emb = await client.embed(text_)
|
||||
except Exception as exc:
|
||||
logger.warning("query embed failed text=%r err=%r", text_[:40], exc)
|
||||
return None
|
||||
if len(_QUERY_EMBED_CACHE) >= QUERY_EMBED_MAXSIZE:
|
||||
try:
|
||||
oldest = next(iter(_QUERY_EMBED_CACHE))
|
||||
_QUERY_EMBED_CACHE.pop(oldest, None)
|
||||
except StopIteration:
|
||||
pass
|
||||
_QUERY_EMBED_CACHE[key] = {"emb": emb, "ts": time.time()}
|
||||
return emb
|
||||
|
||||
|
||||
def query_embed_cache_stats() -> dict[str, int]:
|
||||
return {"size": len(_QUERY_EMBED_CACHE), "maxsize": QUERY_EMBED_MAXSIZE}
|
||||
|
||||
|
||||
async def search_text(
|
||||
session: AsyncSession, query: str, limit: int
|
||||
@@ -153,11 +206,16 @@ async def search_vector(
|
||||
list[SearchResult] — doc_id 중복 제거됨. compress_chunks_to_docs는 그대로 동작.
|
||||
chunks_by_doc은 search.py에서 group_by_doc으로 보존.
|
||||
"""
|
||||
try:
|
||||
client = AIClient()
|
||||
query_embedding = await client.embed(query)
|
||||
try:
|
||||
query_embedding = await _get_query_embedding(client, query)
|
||||
finally:
|
||||
try:
|
||||
await client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if query_embedding is None:
|
||||
return []
|
||||
|
||||
embedding_str = str(query_embedding)
|
||||
@@ -307,6 +365,100 @@ def _merge_doc_and_chunk_vectors(
|
||||
return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)
|
||||
|
||||
|
||||
async def search_vector_multilingual(
|
||||
session: AsyncSession,
|
||||
normalized_queries: list[dict],
|
||||
limit: int,
|
||||
) -> list["SearchResult"]:
|
||||
"""Phase 2.2 — 다국어 normalized_queries 배열로 vector retrieval.
|
||||
|
||||
각 language query에 대해 embedding을 병렬 생성(cache hit 활용),
|
||||
각 embedding에 대해 기존 docs+chunks hybrid 호출,
|
||||
결과를 weight 기반으로 merge.
|
||||
|
||||
⚠️ 호출 조건:
|
||||
- QueryAnalyzer cache hit 이어야 함 (async-only 룰)
|
||||
- analyzer_confidence 높고 normalized_queries 존재해야 함
|
||||
- search.py에서만 호출. retrieval 경로 동기 LLM 호출 금지 룰 준수.
|
||||
|
||||
Args:
|
||||
session: AsyncSession (호출자 관리, 본 함수 내부는 sessionmaker로 별도 연결 사용)
|
||||
normalized_queries: [{"lang": "ko", "text": "...", "weight": 0.56}, ...]
|
||||
weight는 _normalize_weights로 이미 합=1.0 정규화된 상태.
|
||||
limit: 상위 결과 개수
|
||||
|
||||
Returns:
|
||||
list[SearchResult] — doc_id 중복 제거. merged score = sum(per-query score * lang_weight).
|
||||
"""
|
||||
if not normalized_queries:
|
||||
return []
|
||||
|
||||
# 1. 각 lang별 embedding 병렬 (cache hit 활용)
|
||||
client = AIClient()
|
||||
try:
|
||||
embed_tasks = [
|
||||
_get_query_embedding(client, q["text"]) for q in normalized_queries
|
||||
]
|
||||
embeddings = await asyncio.gather(*embed_tasks)
|
||||
finally:
|
||||
try:
|
||||
await client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# embedding 실패한 query는 skip (weight 재정규화 없이 조용히 drop)
|
||||
per_query_plan: list[tuple[dict, str]] = []
|
||||
for q, emb in zip(normalized_queries, embeddings):
|
||||
if emb is None:
|
||||
logger.warning("multilingual embed skipped lang=%s", q.get("lang"))
|
||||
continue
|
||||
per_query_plan.append((q, str(emb)))
|
||||
|
||||
if not per_query_plan:
|
||||
return []
|
||||
|
||||
# 2. 각 embedding에 대해 doc + chunks 병렬 retrieval
|
||||
Session = async_sessionmaker(engine)
|
||||
|
||||
async def _one_query(q_meta: dict, embedding_str: str) -> list["SearchResult"]:
|
||||
async def _docs() -> list["SearchResult"]:
|
||||
async with Session() as s:
|
||||
return await _search_vector_docs(s, embedding_str, limit * 4)
|
||||
|
||||
async def _chunks() -> list["SearchResult"]:
|
||||
async with Session() as s:
|
||||
return await _search_vector_chunks(s, embedding_str, limit * 4)
|
||||
|
||||
doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
|
||||
return _merge_doc_and_chunk_vectors(doc_r, chunk_r)
|
||||
|
||||
per_query_results = await asyncio.gather(
|
||||
*(_one_query(q, emb_str) for q, emb_str in per_query_plan)
|
||||
)
|
||||
|
||||
# 3. weight 기반 merge — doc_id 중복 시 weighted score 합산
|
||||
merged: dict[int, "SearchResult"] = {}
|
||||
for (q_meta, _emb_str), results in zip(per_query_plan, per_query_results):
|
||||
weight = float(q_meta.get("weight", 1.0) or 1.0)
|
||||
for r in results:
|
||||
weighted = r.score * weight
|
||||
prev = merged.get(r.id)
|
||||
if prev is None:
|
||||
# 첫 방문: 원본을 shallow copy 대신 직접 wrap
|
||||
r.score = weighted
|
||||
r.match_reason = f"ml_{q_meta.get('lang', '?')}"
|
||||
merged[r.id] = r
|
||||
else:
|
||||
# 중복: score 누적, 가장 높은 weight 소스로 match_reason 표시
|
||||
prev.score += weighted
|
||||
# match_reason 병합 (가독성)
|
||||
if q_meta.get("lang") and q_meta.get("lang") not in (prev.match_reason or ""):
|
||||
prev.match_reason = (prev.match_reason or "ml") + f"+{q_meta['lang']}"
|
||||
|
||||
sorted_results = sorted(merged.values(), key=lambda r: r.score, reverse=True)
|
||||
return sorted_results[: limit * 4] # rerank 후보로 넉넉히
|
||||
|
||||
|
||||
def compress_chunks_to_docs(
|
||||
chunks: list["SearchResult"], limit: int
|
||||
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
|
||||
|
||||
335
app/services/search/search_pipeline.py
Normal file
335
app/services/search/search_pipeline.py
Normal file
@@ -0,0 +1,335 @@
|
||||
"""검색 파이프라인 오케스트레이션 (Phase 3.1).
|
||||
|
||||
`/api/search/` 와 `/api/search/ask` 가 공유하는 단일 진실 소스.
|
||||
|
||||
## 순수성 규칙 (영구)
|
||||
|
||||
`run_search()`는 wrapper(endpoint)에서 side effect를 최대한 분리한다:
|
||||
- ❌ **금지**: `BackgroundTasks` 파라미터, `logger.info(...)` 직접 호출,
|
||||
`record_search_event()` 호출, `SearchResponse`/`AskResponse` 직렬화
|
||||
- ✅ **허용**: `trigger_background_analysis()` (analyzer cache miss 시
|
||||
fire-and-forget task — retrieval 전략의 일부, 자가 완결됨)
|
||||
- ✅ **허용**: retrieval / fusion / rerank / diversity / display 정규화 /
|
||||
confidence 계산 같은 내부 서비스 호출
|
||||
|
||||
반환값은 `PipelineResult` 하나. wrapper가 그 안에서 필요한 필드를 꺼내
|
||||
logger / telemetry / 응답 직렬화를 수행한다.
|
||||
|
||||
## Phase 2 호환
|
||||
|
||||
본 모듈은 기존 `app/api/search.py::search()` 함수 본문을 lift-and-shift 한
|
||||
것이다. 변수명 / notes 문자열 / timing 키 / logger 포맷 은 wrapper 쪽에서
|
||||
완전히 동일하게 재구성된다. refactor 전후 `/search?debug=true` 응답은
|
||||
byte-level 에 가깝게 일치해야 한다.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from . import query_analyzer
|
||||
from .fusion_service import (
|
||||
DEFAULT_FUSION,
|
||||
apply_soft_filter_boost,
|
||||
get_strategy,
|
||||
normalize_display_scores,
|
||||
)
|
||||
from .rerank_service import (
|
||||
MAX_CHUNKS_PER_DOC,
|
||||
MAX_RERANK_INPUT,
|
||||
apply_diversity,
|
||||
rerank_chunks,
|
||||
)
|
||||
from .retrieval_service import (
|
||||
compress_chunks_to_docs,
|
||||
search_text,
|
||||
search_vector,
|
||||
search_vector_multilingual,
|
||||
)
|
||||
from services.search_telemetry import (
|
||||
compute_confidence,
|
||||
compute_confidence_hybrid,
|
||||
compute_confidence_reranked,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from api.search import SearchResult
|
||||
|
||||
|
||||
# ─── Phase 2.1: analyzer_confidence 3단계 게이트 ──────────
|
||||
# search.py 에서 이동. search.py 의 /search wrapper 는 이 상수들을
|
||||
# 노출할 필요 없으므로 파이프라인 모듈에만 둔다.
|
||||
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
|
||||
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
|
||||
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
|
||||
|
||||
|
||||
def _analyzer_tier(confidence: float) -> str:
|
||||
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
|
||||
if confidence < ANALYZER_TIER_IGNORE:
|
||||
return "ignore"
|
||||
if confidence < ANALYZER_TIER_ORIGINAL:
|
||||
return "original_fallback"
|
||||
if confidence < ANALYZER_TIER_MERGE:
|
||||
return "merge"
|
||||
return "analyzed"
|
||||
|
||||
|
||||
# ─── 반환 타입 ─────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PipelineResult:
|
||||
"""run_search() 반환 — wrapper 가 필요한 모든 state 를 담는다."""
|
||||
|
||||
# ── 최종 결과 (API 노출) ──
|
||||
results: "list[SearchResult]"
|
||||
mode: str
|
||||
confidence_signal: float
|
||||
|
||||
# ── 중간 단계 (evidence 입력 + debug) ──
|
||||
text_results: "list[SearchResult]"
|
||||
vector_results: "list[SearchResult]" # doc 압축 후
|
||||
raw_chunks: "list[SearchResult]" # chunk 원본 (rerank/evidence용)
|
||||
chunks_by_doc: "dict[int, list[SearchResult]]"
|
||||
|
||||
# ── 쿼리 분석 메타 ──
|
||||
query_analysis: dict | None
|
||||
analyzer_cache_hit: bool
|
||||
analyzer_confidence: float # 항상 float (None 금지)
|
||||
analyzer_tier: str
|
||||
|
||||
# ── 관측 ──
|
||||
timing_ms: dict[str, float] = field(default_factory=dict)
|
||||
notes: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
# ─── 메인 파이프라인 ───────────────────────────────────────
|
||||
|
||||
|
||||
async def run_search(
|
||||
session: AsyncSession,
|
||||
q: str,
|
||||
*,
|
||||
mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid",
|
||||
limit: int = 20,
|
||||
fusion: str = DEFAULT_FUSION,
|
||||
rerank: bool = True,
|
||||
analyze: bool = False,
|
||||
) -> PipelineResult:
|
||||
"""검색 파이프라인 실행.
|
||||
|
||||
retrieval → fusion → rerank → diversity → display 정규화 → confidence 계산
|
||||
까지 수행하고 `PipelineResult` 를 반환한다. logging / BackgroundTasks /
|
||||
응답 직렬화는 절대 수행하지 않는다 (wrapper 책임).
|
||||
|
||||
Args:
|
||||
session: AsyncSession (caller 가 관리)
|
||||
q: 사용자 쿼리 원문
|
||||
mode: fts | trgm | vector | hybrid
|
||||
limit: 최종 결과 수 (hybrid 에서는 fusion 입력 후보 수는 이보다 넓음)
|
||||
fusion: legacy | rrf | rrf_boost
|
||||
rerank: bge-reranker-v2-m3 활성화 (hybrid 전용)
|
||||
analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter)
|
||||
|
||||
Returns:
|
||||
PipelineResult
|
||||
"""
|
||||
# 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언)
|
||||
from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영
|
||||
|
||||
timing: dict[str, float] = {}
|
||||
notes: list[str] = []
|
||||
text_results: list["SearchResult"] = []
|
||||
vector_results: list["SearchResult"] = [] # doc-level (압축 후, fusion 입력)
|
||||
raw_chunks: list["SearchResult"] = [] # chunk-level (raw, Phase 1.3 reranker용)
|
||||
chunks_by_doc: dict[int, list["SearchResult"]] = {} # Phase 1.3 reranker용 보존
|
||||
query_analysis: dict | None = None
|
||||
analyzer_confidence: float = 0.0
|
||||
analyzer_tier: str = "disabled"
|
||||
|
||||
t_total = time.perf_counter()
|
||||
|
||||
# Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지.
|
||||
# - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부)
|
||||
# - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget)
|
||||
# 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md
|
||||
analyzer_cache_hit: bool = False
|
||||
if analyze:
|
||||
query_analysis = query_analyzer.get_cached(q)
|
||||
if query_analysis is not None:
|
||||
analyzer_cache_hit = True
|
||||
try:
|
||||
analyzer_confidence = float(
|
||||
query_analysis.get("analyzer_confidence", 0.0) or 0.0
|
||||
)
|
||||
except (TypeError, ValueError):
|
||||
analyzer_confidence = 0.0
|
||||
analyzer_tier = _analyzer_tier(analyzer_confidence)
|
||||
notes.append(
|
||||
f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
|
||||
)
|
||||
else:
|
||||
# cache miss → background analyzer 트리거 (retrieval 차단 X)
|
||||
triggered = query_analyzer.trigger_background_analysis(q)
|
||||
analyzer_tier = "cache_miss"
|
||||
notes.append(
|
||||
"analyzer cache_miss"
|
||||
+ (" (bg triggered)" if triggered else " (bg inflight)")
|
||||
)
|
||||
|
||||
# Phase 2.2: multilingual vector search 활성 조건 (보수적)
|
||||
# - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰)
|
||||
# - normalized_queries 2개 이상 (lang 다양성 있음)
|
||||
# - domain_hint == "news" 또는 language_scope == "global"
|
||||
# ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko
|
||||
# -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용.
|
||||
# news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인).
|
||||
use_multilingual: bool = False
|
||||
normalized_queries: list[dict] = []
|
||||
if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
|
||||
domain_hint = query_analysis.get("domain_hint", "mixed")
|
||||
language_scope = query_analysis.get("language_scope", "limited")
|
||||
is_multilingual_candidate = (
|
||||
domain_hint == "news" or language_scope == "global"
|
||||
)
|
||||
if is_multilingual_candidate:
|
||||
raw_nq = query_analysis.get("normalized_queries") or []
|
||||
if isinstance(raw_nq, list) and len(raw_nq) >= 2:
|
||||
normalized_queries = [
|
||||
nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
|
||||
]
|
||||
if len(normalized_queries) >= 2:
|
||||
use_multilingual = True
|
||||
notes.append(
|
||||
f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}"
|
||||
f" hint={domain_hint}/{language_scope}"
|
||||
)
|
||||
|
||||
if mode == "vector":
|
||||
t0 = time.perf_counter()
|
||||
if use_multilingual:
|
||||
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
|
||||
else:
|
||||
raw_chunks = await search_vector(session, q, limit)
|
||||
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
|
||||
if not raw_chunks:
|
||||
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
|
||||
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
|
||||
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
||||
results = vector_results
|
||||
else:
|
||||
t0 = time.perf_counter()
|
||||
text_results = await search_text(session, q, limit)
|
||||
timing["text_ms"] = (time.perf_counter() - t0) * 1000
|
||||
|
||||
if mode == "hybrid":
|
||||
t1 = time.perf_counter()
|
||||
if use_multilingual:
|
||||
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
|
||||
else:
|
||||
raw_chunks = await search_vector(session, q, limit)
|
||||
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
|
||||
|
||||
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
|
||||
t1b = time.perf_counter()
|
||||
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
||||
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
|
||||
|
||||
if not vector_results:
|
||||
notes.append("vector_search_returned_empty — text-only fallback")
|
||||
|
||||
t2 = time.perf_counter()
|
||||
strategy = get_strategy(fusion)
|
||||
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
|
||||
fusion_limit = max(limit * 5, 100) if rerank else limit
|
||||
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
|
||||
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
|
||||
notes.append(f"fusion={strategy.name}")
|
||||
notes.append(
|
||||
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
|
||||
f"unique_docs={len(chunks_by_doc)}"
|
||||
)
|
||||
|
||||
# Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만)
|
||||
# analyzer_confidence < 0.5 (tier=ignore)는 비활성.
|
||||
if (
|
||||
analyzer_cache_hit
|
||||
and analyzer_tier != "ignore"
|
||||
and query_analysis
|
||||
):
|
||||
soft_filters = query_analysis.get("soft_filters") or {}
|
||||
if soft_filters:
|
||||
boosted = apply_soft_filter_boost(fused_docs, soft_filters)
|
||||
if boosted > 0:
|
||||
notes.append(f"soft_filter_boost applied={boosted}")
|
||||
|
||||
if rerank:
|
||||
# Phase 1.3: reranker — chunk 기준 입력
|
||||
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
|
||||
t3 = time.perf_counter()
|
||||
rerank_input: list["SearchResult"] = []
|
||||
for doc in fused_docs:
|
||||
chunks = chunks_by_doc.get(doc.id, [])
|
||||
if chunks:
|
||||
# doc당 max 2 chunk (latency/VRAM 보호)
|
||||
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
|
||||
else:
|
||||
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
|
||||
rerank_input.append(doc)
|
||||
if len(rerank_input) >= MAX_RERANK_INPUT:
|
||||
break
|
||||
rerank_input = rerank_input[:MAX_RERANK_INPUT]
|
||||
notes.append(f"rerank input={len(rerank_input)}")
|
||||
|
||||
reranked = await rerank_chunks(q, rerank_input, limit * 3)
|
||||
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
|
||||
|
||||
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
|
||||
t4 = time.perf_counter()
|
||||
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
|
||||
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
|
||||
else:
|
||||
# rerank 비활성: fused_docs를 그대로 (limit 적용)
|
||||
results = fused_docs[:limit]
|
||||
else:
|
||||
results = text_results
|
||||
|
||||
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
|
||||
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
|
||||
# Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존).
|
||||
normalize_display_scores(results)
|
||||
|
||||
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
|
||||
|
||||
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
|
||||
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
|
||||
if mode == "hybrid":
|
||||
if rerank and "rerank_ms" in timing:
|
||||
confidence_signal = compute_confidence_reranked(results)
|
||||
else:
|
||||
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
|
||||
elif mode == "vector":
|
||||
confidence_signal = compute_confidence(vector_results, "vector")
|
||||
else:
|
||||
confidence_signal = compute_confidence(text_results, mode)
|
||||
|
||||
return PipelineResult(
|
||||
results=results,
|
||||
mode=mode,
|
||||
confidence_signal=confidence_signal,
|
||||
text_results=text_results,
|
||||
vector_results=vector_results,
|
||||
raw_chunks=raw_chunks,
|
||||
chunks_by_doc=chunks_by_doc,
|
||||
query_analysis=query_analysis,
|
||||
analyzer_cache_hit=analyzer_cache_hit,
|
||||
analyzer_confidence=analyzer_confidence,
|
||||
analyzer_tier=analyzer_tier,
|
||||
timing_ms=timing,
|
||||
notes=notes,
|
||||
)
|
||||
@@ -1,6 +1,422 @@
|
||||
"""Grounded answer synthesis 서비스 (Phase 3).
|
||||
"""Grounded answer synthesis 서비스 (Phase 3.3).
|
||||
|
||||
evidence span을 Gemma 4에 전달해 인용 기반 답변 생성.
|
||||
3~4초 soft timeout, 타임아웃 시 결과만 반환 fallback.
|
||||
구현은 Phase 3에서 채움.
|
||||
evidence span 을 Gemma 4 에 전달해 citation 기반 답변을 생성한다.
|
||||
캐시 / timeout / citation 검증 / refused 처리 포함.
|
||||
|
||||
## 영구 룰
|
||||
|
||||
- **span-only 입력**: `_render_prompt()` 는 `EvidenceItem.span_text` 만 참조한다.
|
||||
`EvidenceItem.full_snippet` 을 프롬프트에 포함하면 LLM 이 span 밖 내용을
|
||||
hallucinate 한다. 이 규칙이 깨지면 시스템 무너짐 → docstring + 코드 패턴으로
|
||||
방어 (함수 상단에서 제한 뷰만 만든다).
|
||||
- **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error) 와
|
||||
low confidence / refused 는 캐시 금지. 잘못된 답변 고정 방지.
|
||||
- **MLX gate 공유**: `get_mlx_gate()` 경유. analyzer / evidence 와 동일 semaphore.
|
||||
- **timeout 15s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate
|
||||
대기만으로 timeout 발동.
|
||||
- **citation 검증**: 본문 `[n]` 범위 초과는 제거 + `hallucination_flags` 기록.
|
||||
answer 수정본을 반환하되 status 는 completed 유지 (silent fix + observable).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from ai.client import AIClient, _load_prompt, parse_json_response
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
|
||||
from .llm_gate import get_mlx_gate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .evidence_service import EvidenceItem
|
||||
|
||||
logger = setup_logger("synthesis")
|
||||
|
||||
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
|
||||
PROMPT_VERSION = "v1"
|
||||
LLM_TIMEOUT_MS = 15000
|
||||
CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게)
|
||||
CACHE_MAXSIZE = 300
|
||||
MAX_ANSWER_CHARS = 400
|
||||
|
||||
SynthesisStatus = Literal[
|
||||
"completed",
|
||||
"timeout",
|
||||
"skipped",
|
||||
"no_evidence",
|
||||
"parse_failed",
|
||||
"llm_error",
|
||||
]
|
||||
|
||||
|
||||
# ─── 반환 타입 ───────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class SynthesisResult:
|
||||
"""synthesize() 반환. cache dict 에 들어가는 payload 이기도 함."""
|
||||
|
||||
status: SynthesisStatus
|
||||
answer: str | None
|
||||
used_citations: list[int] # 검증 후 실제로 본문에 등장한 n
|
||||
confidence: Literal["high", "medium", "low"] | None
|
||||
refused: bool
|
||||
refuse_reason: str | None
|
||||
elapsed_ms: float
|
||||
cache_hit: bool
|
||||
hallucination_flags: list[str] = field(default_factory=list)
|
||||
raw_preview: str | None = None # debug=true 일 때 LLM raw 500자
|
||||
|
||||
|
||||
# ─── 프롬프트 로딩 (module 초기화 1회) ──────────────────
|
||||
try:
|
||||
SYNTHESIS_PROMPT = _load_prompt("search_synthesis.txt")
|
||||
except FileNotFoundError:
|
||||
SYNTHESIS_PROMPT = ""
|
||||
logger.warning(
|
||||
"search_synthesis.txt not found — synthesis will always return llm_error"
|
||||
)
|
||||
|
||||
|
||||
# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─
|
||||
_CACHE: dict[str, SynthesisResult] = {}
|
||||
|
||||
|
||||
def _model_version() -> str:
|
||||
"""현재 primary 모델 ID — 캐시 키에 반영."""
|
||||
if settings.ai and settings.ai.primary:
|
||||
return settings.ai.primary.model
|
||||
return "unknown-model"
|
||||
|
||||
|
||||
def _cache_key(query: str, chunk_ids: list[int]) -> str:
|
||||
"""(query + sorted chunk_ids + PROMPT_VERSION + model) sha256."""
|
||||
sorted_ids = ",".join(str(c) for c in sorted(chunk_ids))
|
||||
raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}"
|
||||
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def get_cached(query: str, chunk_ids: list[int]) -> SynthesisResult | None:
|
||||
"""캐시 조회. TTL 경과는 자동 삭제."""
|
||||
key = _cache_key(query, chunk_ids)
|
||||
entry = _CACHE.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
# TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장
|
||||
# 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점)
|
||||
# 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함.
|
||||
return entry
|
||||
|
||||
|
||||
def _should_cache(result: SynthesisResult) -> bool:
|
||||
"""실패/저신뢰/refused 는 캐시 금지."""
|
||||
return (
|
||||
result.status == "completed"
|
||||
and result.confidence in ("high", "medium")
|
||||
and not result.refused
|
||||
and result.answer is not None
|
||||
)
|
||||
|
||||
|
||||
def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult) -> None:
|
||||
"""조건부 저장 + FIFO eviction."""
|
||||
if not _should_cache(result):
|
||||
return
|
||||
key = _cache_key(query, chunk_ids)
|
||||
if key in _CACHE:
|
||||
_CACHE[key] = result
|
||||
return
|
||||
if len(_CACHE) >= CACHE_MAXSIZE:
|
||||
try:
|
||||
oldest = next(iter(_CACHE))
|
||||
_CACHE.pop(oldest, None)
|
||||
except StopIteration:
|
||||
pass
|
||||
_CACHE[key] = result
|
||||
|
||||
|
||||
def cache_stats() -> dict[str, int]:
|
||||
"""debug/운영용."""
|
||||
return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE}
|
||||
|
||||
|
||||
# ─── Prompt rendering (🔒 span_text ONLY) ───────────────
|
||||
|
||||
|
||||
def _render_prompt(query: str, evidence: list["EvidenceItem"]) -> str:
|
||||
"""{query} / {numbered_evidence} 치환.
|
||||
|
||||
⚠ **MUST NOT access `item.full_snippet`**. Use `span_text` ONLY.
|
||||
Rationale: 프롬프트에 full_snippet 을 넣으면 LLM 이 span 밖 내용으로
|
||||
hallucinate 한다. full_snippet 은 debug / citation 원문 전용.
|
||||
|
||||
제한 뷰만 만들어서 full_snippet 접근을 문법적으로 어렵게 만든다.
|
||||
"""
|
||||
# 제한 뷰 — 이 튜플에는 span_text 외의 snippet 필드가 없다
|
||||
spans: list[tuple[int, str, str]] = [
|
||||
(i.n, (i.title or "").strip(), i.span_text) for i in evidence
|
||||
]
|
||||
lines = [f"[{n}] {title}\n{span}" for n, title, span in spans]
|
||||
numbered_block = "\n\n".join(lines)
|
||||
return SYNTHESIS_PROMPT.replace("{query}", query).replace(
|
||||
"{numbered_evidence}", numbered_block
|
||||
)
|
||||
|
||||
|
||||
# ─── Citation 검증 ──────────────────────────────────────
|
||||
|
||||
_CITATION_RE = re.compile(r"\[(\d+)\]")
|
||||
|
||||
|
||||
def _validate_citations(
|
||||
answer: str,
|
||||
n_max: int,
|
||||
) -> tuple[str, list[int], list[str]]:
|
||||
"""본문 `[n]` 범위 초과 제거 + used_citations 추출 + flags.
|
||||
|
||||
Returns:
|
||||
(corrected_answer, used_citations, hallucination_flags)
|
||||
"""
|
||||
flags: list[str] = []
|
||||
seen: set[int] = set()
|
||||
used: list[int] = []
|
||||
corrected = answer
|
||||
|
||||
for match in _CITATION_RE.findall(answer):
|
||||
try:
|
||||
n = int(match)
|
||||
except ValueError:
|
||||
continue
|
||||
if n < 1 or n > n_max:
|
||||
# 범위 초과 → 본문에서 제거 + flag
|
||||
corrected = corrected.replace(f"[{n}]", "")
|
||||
flags.append(f"removed_n_{n}")
|
||||
continue
|
||||
if n not in seen:
|
||||
seen.add(n)
|
||||
used.append(n)
|
||||
|
||||
used.sort()
|
||||
|
||||
if len(corrected) > MAX_ANSWER_CHARS:
|
||||
corrected = corrected[:MAX_ANSWER_CHARS]
|
||||
flags.append("length_clipped")
|
||||
|
||||
return corrected, used, flags
|
||||
|
||||
|
||||
# ─── Core: synthesize ───────────────────────────────────
|
||||
|
||||
|
||||
async def synthesize(
|
||||
query: str,
|
||||
evidence: list["EvidenceItem"],
|
||||
ai_client: AIClient | None = None,
|
||||
debug: bool = False,
|
||||
) -> SynthesisResult:
|
||||
"""evidence → grounded answer.
|
||||
|
||||
Failure modes 는 모두 SynthesisResult 로 반환한다 (예외는 외부로 전파되지
|
||||
않음). 호출자 (`/ask` wrapper) 가 status 를 보고 user-facing 메시지를
|
||||
결정한다.
|
||||
"""
|
||||
t_start = time.perf_counter()
|
||||
|
||||
# ── evidence 비면 즉시 no_evidence ─────────────────
|
||||
if not evidence:
|
||||
return SynthesisResult(
|
||||
status="no_evidence",
|
||||
answer=None,
|
||||
used_citations=[],
|
||||
confidence=None,
|
||||
refused=False,
|
||||
refuse_reason=None,
|
||||
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
||||
cache_hit=False,
|
||||
hallucination_flags=[],
|
||||
raw_preview=None,
|
||||
)
|
||||
|
||||
# ── cache lookup ───────────────────────────────────
|
||||
# chunk_id 가 None 인 text-only wrap 은 음수 doc_id 로 구분 → key 안정화
|
||||
chunk_ids = [
|
||||
(e.chunk_id if e.chunk_id is not None else -e.doc_id) for e in evidence
|
||||
]
|
||||
cached = get_cached(query, chunk_ids)
|
||||
if cached is not None:
|
||||
return SynthesisResult(
|
||||
status=cached.status,
|
||||
answer=cached.answer,
|
||||
used_citations=list(cached.used_citations),
|
||||
confidence=cached.confidence,
|
||||
refused=cached.refused,
|
||||
refuse_reason=cached.refuse_reason,
|
||||
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
||||
cache_hit=True,
|
||||
hallucination_flags=list(cached.hallucination_flags),
|
||||
raw_preview=cached.raw_preview if debug else None,
|
||||
)
|
||||
|
||||
# ── prompt 준비 ─────────────────────────────────────
|
||||
if not SYNTHESIS_PROMPT:
|
||||
return SynthesisResult(
|
||||
status="llm_error",
|
||||
answer=None,
|
||||
used_citations=[],
|
||||
confidence=None,
|
||||
refused=False,
|
||||
refuse_reason=None,
|
||||
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
||||
cache_hit=False,
|
||||
hallucination_flags=["prompt_not_loaded"],
|
||||
raw_preview=None,
|
||||
)
|
||||
|
||||
prompt = _render_prompt(query, evidence)
|
||||
prompt_preview = prompt[:500] if debug else None
|
||||
|
||||
# ── LLM 호출 ───────────────────────────────────────
|
||||
client_owned = False
|
||||
if ai_client is None:
|
||||
ai_client = AIClient()
|
||||
client_owned = True
|
||||
|
||||
raw: str | None = None
|
||||
llm_error: str | None = None
|
||||
|
||||
try:
|
||||
async with get_mlx_gate():
|
||||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
|
||||
except asyncio.TimeoutError:
|
||||
llm_error = "timeout"
|
||||
except Exception as exc:
|
||||
llm_error = f"llm_error:{type(exc).__name__}"
|
||||
finally:
|
||||
if client_owned:
|
||||
try:
|
||||
await ai_client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||
|
||||
if llm_error is not None:
|
||||
status: SynthesisStatus = "timeout" if llm_error == "timeout" else "llm_error"
|
||||
logger.warning(
|
||||
"synthesis %s query=%r evidence_n=%d elapsed_ms=%.0f",
|
||||
llm_error, query[:80], len(evidence), elapsed_ms,
|
||||
)
|
||||
return SynthesisResult(
|
||||
status=status,
|
||||
answer=None,
|
||||
used_citations=[],
|
||||
confidence=None,
|
||||
refused=False,
|
||||
refuse_reason=None,
|
||||
elapsed_ms=elapsed_ms,
|
||||
cache_hit=False,
|
||||
hallucination_flags=[llm_error],
|
||||
raw_preview=None,
|
||||
)
|
||||
|
||||
parsed = parse_json_response(raw or "")
|
||||
if not isinstance(parsed, dict):
|
||||
logger.warning(
|
||||
"synthesis parse_failed query=%r raw=%r elapsed_ms=%.0f",
|
||||
query[:80], (raw or "")[:200], elapsed_ms,
|
||||
)
|
||||
return SynthesisResult(
|
||||
status="parse_failed",
|
||||
answer=None,
|
||||
used_citations=[],
|
||||
confidence=None,
|
||||
refused=False,
|
||||
refuse_reason=None,
|
||||
elapsed_ms=elapsed_ms,
|
||||
cache_hit=False,
|
||||
hallucination_flags=["parse_failed"],
|
||||
raw_preview=(raw or "")[:500] if debug else None,
|
||||
)
|
||||
|
||||
# ── JSON 필드 검증 ──────────────────────────────────
|
||||
answer_raw = parsed.get("answer", "")
|
||||
if not isinstance(answer_raw, str):
|
||||
answer_raw = ""
|
||||
|
||||
conf_raw = parsed.get("confidence", "low")
|
||||
if conf_raw not in ("high", "medium", "low"):
|
||||
conf_raw = "low"
|
||||
|
||||
refused_raw = bool(parsed.get("refused", False))
|
||||
refuse_reason_raw = parsed.get("refuse_reason")
|
||||
if refuse_reason_raw is not None and not isinstance(refuse_reason_raw, str):
|
||||
refuse_reason_raw = None
|
||||
|
||||
# refused 면 answer 무시 + citations 비움
|
||||
if refused_raw:
|
||||
result = SynthesisResult(
|
||||
status="completed",
|
||||
answer=None,
|
||||
used_citations=[],
|
||||
confidence=conf_raw, # type: ignore[arg-type]
|
||||
refused=True,
|
||||
refuse_reason=refuse_reason_raw,
|
||||
elapsed_ms=elapsed_ms,
|
||||
cache_hit=False,
|
||||
hallucination_flags=[],
|
||||
raw_preview=(raw or "")[:500] if debug else None,
|
||||
)
|
||||
logger.info(
|
||||
"synthesis refused query=%r evidence_n=%d conf=%s elapsed_ms=%.0f reason=%r",
|
||||
query[:80], len(evidence), conf_raw, elapsed_ms, (refuse_reason_raw or "")[:80],
|
||||
)
|
||||
# refused 는 캐시 금지 (_should_cache)
|
||||
return result
|
||||
|
||||
# ── citation 검증 ───────────────────────────────────
|
||||
corrected_answer, used_citations, flags = _validate_citations(
|
||||
answer_raw, n_max=len(evidence)
|
||||
)
|
||||
|
||||
# answer 가 공백만 남으면 low confidence 로 강등
|
||||
if not corrected_answer.strip():
|
||||
corrected_answer_final: str | None = None
|
||||
conf_raw = "low"
|
||||
flags.append("empty_after_validation")
|
||||
else:
|
||||
corrected_answer_final = corrected_answer
|
||||
|
||||
result = SynthesisResult(
|
||||
status="completed",
|
||||
answer=corrected_answer_final,
|
||||
used_citations=used_citations,
|
||||
confidence=conf_raw, # type: ignore[arg-type]
|
||||
refused=False,
|
||||
refuse_reason=None,
|
||||
elapsed_ms=elapsed_ms,
|
||||
cache_hit=False,
|
||||
hallucination_flags=flags,
|
||||
raw_preview=(raw or "")[:500] if debug else None,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"synthesis ok query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f",
|
||||
query[:80],
|
||||
len(evidence),
|
||||
len(corrected_answer_final or ""),
|
||||
len(used_citations),
|
||||
conf_raw,
|
||||
",".join(flags) if flags else "-",
|
||||
elapsed_ms,
|
||||
)
|
||||
|
||||
# 조건부 캐시 저장
|
||||
set_cached(query, chunk_ids, result)
|
||||
return result
|
||||
|
||||
@@ -244,6 +244,7 @@ async def record_search_event(
|
||||
results: list[Any],
|
||||
mode: str,
|
||||
confidence: float | None = None,
|
||||
analyzer_confidence: float | None = None,
|
||||
) -> None:
|
||||
"""검색 응답 직후 호출. 실패 트리거에 해당하면 로그 INSERT.
|
||||
|
||||
@@ -253,6 +254,13 @@ async def record_search_event(
|
||||
confidence 파라미터:
|
||||
- None이면 results 기준으로 자체 계산 (legacy 호출용).
|
||||
- 명시적으로 전달되면 그 값 사용 (Phase 0.5+: fusion 적용 전 raw 신호 기준).
|
||||
|
||||
analyzer_confidence (Phase 2.1):
|
||||
- QueryAnalyzer의 쿼리 분석 신뢰도 (result confidence와 다른 축).
|
||||
- `result.confidence` 가 낮더라도 `analyzer_confidence` 가 높으면
|
||||
"retrieval failure" (corpus에 정답 없음)로 해석 가능.
|
||||
- 반대로 analyzer_confidence < 0.5 이면 "query understanding failure" 해석.
|
||||
- Phase 2.1에서는 context에만 기록 (failure_reason 분류는 Phase 2.2+에서).
|
||||
"""
|
||||
if user_id is None:
|
||||
return
|
||||
@@ -260,7 +268,10 @@ async def record_search_event(
|
||||
if confidence is None:
|
||||
confidence = compute_confidence(results, mode)
|
||||
result_count = len(results)
|
||||
base_ctx = _build_context(results, mode, extra={"confidence": confidence})
|
||||
extra_ctx: dict[str, Any] = {"confidence": confidence}
|
||||
if analyzer_confidence is not None:
|
||||
extra_ctx["analyzer_confidence"] = float(analyzer_confidence)
|
||||
base_ctx = _build_context(results, mode, extra=extra_ctx)
|
||||
|
||||
# ── 1) reformulation 체크 (이전 쿼리가 있으면 그걸 로깅) ──
|
||||
prior = await _record_and_get_prior(user_id, query)
|
||||
|
||||
44
app/workers/digest_worker.py
Normal file
44
app/workers/digest_worker.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""Phase 4: Global Digest 워커.
|
||||
|
||||
7일 뉴스를 country × topic 으로 묶어 cluster-level LLM 요약을 생성하고
|
||||
global_digests / digest_topics 테이블에 저장한다.
|
||||
|
||||
- APScheduler cron (매일 04:00 KST) + 수동 호출 공용 진입점
|
||||
- PIPELINE_HARD_CAP = 600초 hard cap 으로 cron stuck 절대 방지
|
||||
- 단독 실행: `python -m workers.digest_worker`
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
from core.utils import setup_logger
|
||||
from services.digest.pipeline import run_digest_pipeline
|
||||
|
||||
logger = setup_logger("digest_worker")
|
||||
|
||||
PIPELINE_HARD_CAP = 600 # 10분 hard cap
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
"""APScheduler + 수동 호출 공용 진입점.
|
||||
|
||||
pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout 은 summarizer 가 처리).
|
||||
여기서는 전체 hard cap 만 강제.
|
||||
"""
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
run_digest_pipeline(),
|
||||
timeout=PIPELINE_HARD_CAP,
|
||||
)
|
||||
logger.info(f"[global_digest] 워커 완료: {result}")
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(
|
||||
f"[global_digest] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. "
|
||||
f"기존 digest 는 commit 시점에만 갱신되므로 그대로 유지됨. "
|
||||
f"다음 cron 실행에서 재시도."
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"[global_digest] 워커 실패: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run())
|
||||
@@ -1,4 +1,19 @@
|
||||
"""벡터 임베딩 워커 — GPU 서버 bge-m3 호출"""
|
||||
"""벡터 임베딩 워커 — GPU 서버 bge-m3 호출 (doc-level recall vector)
|
||||
|
||||
## 구조 원칙 (영구)
|
||||
|
||||
doc-level embedding 은 "요약 벡터" (recall 담당). chunk-level embedding (chunk_worker)
|
||||
이 precision 을 담당하는 hybrid 구조 (`retrieval_service._search_vector_docs` 참조).
|
||||
|
||||
**본문 일부를 임베딩 입력으로 쓰면 안 된다**. 500k자 교재의 앞 6000자는 표지+목차 —
|
||||
임베딩 품질이 쓰레기가 된다. 대신 AI 가 이미 생성한 `ai_summary` 를 중심으로 한
|
||||
metadata (title + summary + tags) 를 입력으로 사용한다.
|
||||
|
||||
이 선택의 이점:
|
||||
- 입력 길이 ~1500자 이하 → Ollama 기본 context 안전 (num_ctx 조정 불필요)
|
||||
- AI 요약은 "전체 문서의 압축 의미" → doc-level 역할에 정확히 부합
|
||||
- 태그는 상위 semantic signal → noise 없음
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
@@ -10,27 +25,59 @@ from models.document import Document
|
||||
|
||||
logger = setup_logger("embed_worker")
|
||||
|
||||
# 임베딩용 텍스트 최대 길이 (bge-m3: 8192 토큰)
|
||||
MAX_EMBED_TEXT = 6000
|
||||
# ─── 품질 가드 상수 ──────────────────────────────────
|
||||
MIN_SUMMARY_CHARS = 50 # 너무 짧은 요약은 저품질 — 본문 fallback 사용
|
||||
MAX_TAGS = 5 # 상위 N개만 (과도한 태그는 임베딩 노이즈)
|
||||
FALLBACK_PREFIX_CHARS = 800 # ai_summary 누락/저품질 시 본문 프리픽스
|
||||
|
||||
EMBED_MODEL_VERSION = "bge-m3"
|
||||
|
||||
|
||||
def _build_embed_input(doc: Document) -> str:
|
||||
"""doc-level recall vector 용 metadata 입력 빌더.
|
||||
|
||||
Returns:
|
||||
임베딩 모델에 보낼 문자열. 평균 ~500~1500자.
|
||||
|
||||
품질 가드:
|
||||
- ai_summary 가 MIN_SUMMARY_CHARS 미만이면 저품질로 보고 본문 fallback
|
||||
- tags 는 상위 MAX_TAGS 개만 (과도한 태그는 임베딩에 노이즈)
|
||||
- ai_domain 은 현 단계에서 제외 (taxonomy 품질이 안정화될 때까지)
|
||||
"""
|
||||
parts = [f"제목: {(doc.title or '').strip()}"]
|
||||
|
||||
summary = (doc.ai_summary or "").strip()
|
||||
use_summary = len(summary) >= MIN_SUMMARY_CHARS
|
||||
if use_summary:
|
||||
parts.append(f"요약: {summary}")
|
||||
|
||||
# tags: 리스트면 상위 MAX_TAGS, 문자열이면 그대로 (이상 케이스)
|
||||
if doc.ai_tags:
|
||||
if isinstance(doc.ai_tags, list):
|
||||
tags_list = [str(t).strip() for t in doc.ai_tags[:MAX_TAGS] if t]
|
||||
tags_str = ", ".join(tags_list)
|
||||
else:
|
||||
tags_str = str(doc.ai_tags)
|
||||
if tags_str:
|
||||
parts.append(f"키워드: {tags_str}")
|
||||
|
||||
# ai_summary 품질 미달 시 본문 프리픽스 fallback (최소 recall 확보)
|
||||
if not use_summary and doc.extracted_text:
|
||||
parts.append(f"본문: {doc.extracted_text[:FALLBACK_PREFIX_CHARS]}")
|
||||
|
||||
return "\n".join(p for p in parts if p).strip()
|
||||
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
"""문서 벡터 임베딩 생성"""
|
||||
"""문서 벡터 임베딩 생성 (doc-level recall vector)"""
|
||||
doc = await session.get(Document, document_id)
|
||||
if not doc:
|
||||
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
|
||||
|
||||
if not doc.extracted_text:
|
||||
raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")
|
||||
|
||||
# title + 본문 앞부분을 결합하여 임베딩 입력 생성
|
||||
title_part = doc.title or ""
|
||||
text_part = doc.extracted_text[:MAX_EMBED_TEXT]
|
||||
embed_input = f"{title_part}\n\n{text_part}".strip()
|
||||
embed_input = _build_embed_input(doc)
|
||||
|
||||
if not embed_input:
|
||||
logger.warning(f"[임베딩] document_id={document_id}: 빈 텍스트, 스킵")
|
||||
logger.warning(f"[임베딩] document_id={document_id}: 빈 입력, 스킵")
|
||||
return
|
||||
|
||||
client = AIClient()
|
||||
@@ -39,6 +86,9 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
doc.embedding = vector
|
||||
doc.embed_model_version = EMBED_MODEL_VERSION
|
||||
doc.embedded_at = datetime.now(timezone.utc)
|
||||
logger.info(f"[임베딩] document_id={document_id}: {len(vector)}차원 벡터 생성")
|
||||
logger.info(
|
||||
f"[임베딩] document_id={document_id}: {len(vector)}차원 벡터 "
|
||||
f"(input_len={len(embed_input)}, has_summary={bool(doc.ai_summary)})"
|
||||
)
|
||||
finally:
|
||||
await client.close()
|
||||
|
||||
@@ -39,7 +39,8 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
if not full_path.exists():
|
||||
raise FileNotFoundError(f"파일 없음: {full_path}")
|
||||
text = full_path.read_text(encoding="utf-8", errors="replace")
|
||||
doc.extracted_text = text
|
||||
# NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지)
|
||||
doc.extracted_text = text.replace("\x00", "")
|
||||
doc.extracted_at = datetime.now(timezone.utc)
|
||||
doc.extractor_version = "direct_read"
|
||||
logger.info(f"[텍스트] {doc.file_path} ({len(text)}자)")
|
||||
@@ -70,7 +71,8 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
resp.raise_for_status()
|
||||
|
||||
data = resp.json()
|
||||
doc.extracted_text = data.get("markdown", "")
|
||||
# NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지)
|
||||
doc.extracted_text = data.get("markdown", "").replace("\x00", "")
|
||||
doc.extracted_at = datetime.now(timezone.utc)
|
||||
doc.extractor_version = EXTRACTOR_VERSION
|
||||
logger.info(f"[kordoc] {doc.file_path} ({len(doc.extracted_text)}자)")
|
||||
@@ -106,7 +108,8 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
out_file = tmp_dir / f"input_{document_id}.{out_ext}"
|
||||
if out_file.exists():
|
||||
text = out_file.read_text(encoding="utf-8", errors="replace")
|
||||
doc.extracted_text = text[:15000]
|
||||
# NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지)
|
||||
doc.extracted_text = text.replace("\x00", "")[:15000]
|
||||
doc.extracted_at = datetime.now(timezone.utc)
|
||||
doc.extractor_version = "libreoffice"
|
||||
out_file.unlink()
|
||||
|
||||
@@ -133,7 +133,9 @@ async def consume_queue():
|
||||
if not item:
|
||||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||||
continue
|
||||
item.error_message = str(e)[:500]
|
||||
# 빈 메시지 방어: str → repr → 클래스명 순 fallback
|
||||
err_text = str(e) or repr(e) or type(e).__name__
|
||||
item.error_message = err_text[:500]
|
||||
if item.attempts >= item.max_attempts:
|
||||
item.status = "failed"
|
||||
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
|
||||
|
||||
@@ -6,8 +6,8 @@ ai:
|
||||
|
||||
models:
|
||||
primary:
|
||||
endpoint: "http://100.76.254.116:8800/v1/chat/completions"
|
||||
model: "mlx-community/Qwen3.5-35B-A3B-4bit"
|
||||
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
|
||||
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
|
||||
max_tokens: 4096
|
||||
timeout: 60
|
||||
|
||||
|
||||
@@ -89,6 +89,7 @@ services:
|
||||
- ./config.yaml:/app/config.yaml:ro
|
||||
- ./scripts:/app/scripts:ro
|
||||
- ./logs:/app/logs
|
||||
- ./migrations:/app/migrations:ro
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
|
||||
@@ -6,6 +6,11 @@
|
||||
|
||||
let { onupload = () => {} } = $props();
|
||||
|
||||
// home-caddy `request_body max_size 100MB` (infra_inventory.md D8 / Cloudflare 섹션 참조).
|
||||
// 100MB 초과 파일은 NAS PKM 폴더 직접 마운트 → file_watcher 5분 간격 자동 인덱싱 경로 사용.
|
||||
const MAX_UPLOAD_BYTES = 100 * 1024 * 1024;
|
||||
const NAS_FALLBACK_HINT = '대용량 파일은 NAS의 PKM 폴더에 직접 두면 file_watcher 가 5분 이내에 자동 인덱싱합니다.';
|
||||
|
||||
let dragging = $state(false);
|
||||
let uploading = $state(false);
|
||||
let uploadFiles = $state([]);
|
||||
@@ -56,7 +61,24 @@
|
||||
});
|
||||
|
||||
async function handleFiles(fileList) {
|
||||
const files = Array.from(fileList || []);
|
||||
const allFiles = Array.from(fileList || []);
|
||||
if (allFiles.length === 0) return;
|
||||
|
||||
// 사전 크기 검사 — 100MB 초과는 즉시 차단 + NAS file_watcher 안내
|
||||
const tooLarge = allFiles.filter(f => f.size > MAX_UPLOAD_BYTES);
|
||||
const files = allFiles.filter(f => f.size <= MAX_UPLOAD_BYTES);
|
||||
|
||||
if (tooLarge.length > 0) {
|
||||
const names = tooLarge
|
||||
.map(f => `${f.name} (${(f.size / 1024 / 1024).toFixed(1)}MB)`)
|
||||
.join(', ');
|
||||
addToast(
|
||||
'error',
|
||||
`100MB 초과 파일은 업로드 불가 (${tooLarge.length}건: ${names}). ${NAS_FALLBACK_HINT}`,
|
||||
10000
|
||||
);
|
||||
}
|
||||
|
||||
if (files.length === 0) return;
|
||||
|
||||
uploading = true;
|
||||
@@ -78,6 +100,14 @@
|
||||
} catch (err) {
|
||||
uploadFiles[i].status = 'failed';
|
||||
failed++;
|
||||
// 서버 측 413 (사전 검사 통과했지만 인프라 한도에 걸린 경우)
|
||||
if (err && err.status === 413) {
|
||||
addToast(
|
||||
'error',
|
||||
`${files[i].name}: 서버 거절 (Payload Too Large). ${NAS_FALLBACK_HINT}`,
|
||||
10000
|
||||
);
|
||||
}
|
||||
}
|
||||
uploadFiles = [...uploadFiles];
|
||||
}
|
||||
|
||||
144
frontend/src/lib/components/ask/AskAnswer.svelte
Normal file
144
frontend/src/lib/components/ask/AskAnswer.svelte
Normal file
@@ -0,0 +1,144 @@
|
||||
<!--
|
||||
AskAnswer.svelte — /ask 페이지 상단 패널.
|
||||
|
||||
Answer 본문 + clickable [n] citations + 신뢰도/상태 Badge.
|
||||
status != completed 또는 refused=true → warning empty state +
|
||||
no_results_reason + "검색 결과 확인하기" 역링크.
|
||||
-->
|
||||
<script lang="ts">
|
||||
import Badge from '$lib/components/ui/Badge.svelte';
|
||||
import Button from '$lib/components/ui/Button.svelte';
|
||||
import EmptyState from '$lib/components/ui/EmptyState.svelte';
|
||||
import Skeleton from '$lib/components/ui/Skeleton.svelte';
|
||||
import { AlertTriangle, Sparkles } from 'lucide-svelte';
|
||||
import type { AskResponse, Confidence, SynthesisStatus } from '$lib/types/ask';
|
||||
|
||||
interface Props {
|
||||
data: AskResponse | null;
|
||||
loading: boolean;
|
||||
onCitationClick: (n: number) => void;
|
||||
}
|
||||
|
||||
let { data, loading, onCitationClick }: Props = $props();
|
||||
|
||||
type Token =
|
||||
| { type: 'text'; value: string }
|
||||
| { type: 'cite'; n: number; raw: string };
|
||||
|
||||
function splitAnswer(text: string): Token[] {
|
||||
return text
|
||||
.split(/(\[\d+\])/g)
|
||||
.filter(Boolean)
|
||||
.map((tok): Token => {
|
||||
const m = tok.match(/^\[(\d+)\]$/);
|
||||
return m
|
||||
? { type: 'cite', n: Number(m[1]), raw: tok }
|
||||
: { type: 'text', value: tok };
|
||||
});
|
||||
}
|
||||
|
||||
function confidenceTone(
|
||||
c: Confidence | null,
|
||||
): 'success' | 'warning' | 'error' | 'neutral' {
|
||||
if (c === 'high') return 'success';
|
||||
if (c === 'medium') return 'warning';
|
||||
if (c === 'low') return 'error';
|
||||
return 'neutral';
|
||||
}
|
||||
|
||||
function confidenceLabel(c: Confidence | null): string {
|
||||
if (c === 'high') return '높음';
|
||||
if (c === 'medium') return '중간';
|
||||
if (c === 'low') return '낮음';
|
||||
return '없음';
|
||||
}
|
||||
|
||||
const STATUS_LABEL: Record<SynthesisStatus, string> = {
|
||||
completed: '답변 완료',
|
||||
timeout: '답변 지연',
|
||||
skipped: '답변 생략',
|
||||
no_evidence: '근거 없음',
|
||||
parse_failed: '형식 오류',
|
||||
llm_error: 'AI 오류',
|
||||
};
|
||||
|
||||
let tokens = $derived(data?.ai_answer ? splitAnswer(data.ai_answer) : []);
|
||||
let showAnswer = $derived(
|
||||
!!data && !!data.ai_answer && data.synthesis_status === 'completed' && !data.refused,
|
||||
);
|
||||
let showWarning = $derived(!!data && !showAnswer);
|
||||
</script>
|
||||
|
||||
<section class="bg-surface border border-default rounded-card p-5">
|
||||
<!-- 헤더 -->
|
||||
<div class="flex items-start justify-between gap-3 mb-4">
|
||||
<div>
|
||||
<p class="text-[10px] font-semibold tracking-wider uppercase text-dim flex items-center gap-1.5">
|
||||
<Sparkles size={12} /> AI Answer
|
||||
</p>
|
||||
<h2 class="mt-1 text-base font-semibold text-text">근거 기반 답변</h2>
|
||||
</div>
|
||||
|
||||
{#if data && !loading}
|
||||
<div class="flex flex-wrap gap-1.5">
|
||||
<Badge tone={confidenceTone(data.confidence)} size="sm">
|
||||
신뢰도 {confidenceLabel(data.confidence)}
|
||||
</Badge>
|
||||
<Badge tone="neutral" size="sm">
|
||||
{STATUS_LABEL[data.synthesis_status]}
|
||||
</Badge>
|
||||
{#if data.synthesis_ms > 0}
|
||||
<Badge tone="neutral" size="sm">
|
||||
{Math.round(data.synthesis_ms)}ms
|
||||
</Badge>
|
||||
{/if}
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
|
||||
<!-- 본문 -->
|
||||
{#if loading}
|
||||
<div class="space-y-3">
|
||||
<Skeleton w="w-3/4" h="h-4" />
|
||||
<Skeleton w="w-full" h="h-4" />
|
||||
<Skeleton w="w-5/6" h="h-4" />
|
||||
<p class="mt-4 text-xs text-dim flex items-center gap-2">
|
||||
<span class="inline-block w-3 h-3 rounded-full border-2 border-dim border-t-accent animate-spin"></span>
|
||||
근거 기반 답변 생성 중… 약 15초 소요
|
||||
</p>
|
||||
</div>
|
||||
{:else if showAnswer && data}
|
||||
<div class="text-sm leading-7 text-text">
|
||||
{#each tokens as tok}
|
||||
{#if tok.type === 'cite'}
|
||||
<button
|
||||
type="button"
|
||||
class="inline-block align-baseline text-accent font-semibold hover:underline focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-accent-ring rounded px-0.5"
|
||||
onclick={() => onCitationClick(tok.n)}
|
||||
aria-label={`인용 ${tok.n}번 보기`}
|
||||
>
|
||||
{tok.raw}
|
||||
</button>
|
||||
{:else}
|
||||
<span>{tok.value}</span>
|
||||
{/if}
|
||||
{/each}
|
||||
</div>
|
||||
{:else if showWarning && data}
|
||||
<EmptyState
|
||||
icon={AlertTriangle}
|
||||
title={data.refused && data.no_results_reason
|
||||
? data.no_results_reason
|
||||
: (data.no_results_reason ?? '관련 근거를 찾지 못했습니다.')}
|
||||
description="검색 결과를 직접 확인해 보세요."
|
||||
>
|
||||
<Button
|
||||
variant="secondary"
|
||||
size="sm"
|
||||
href={`/documents?q=${encodeURIComponent(data.query)}`}
|
||||
>
|
||||
검색 결과 확인하기
|
||||
</Button>
|
||||
</EmptyState>
|
||||
{/if}
|
||||
</section>
|
||||
91
frontend/src/lib/components/ask/AskEvidence.svelte
Normal file
91
frontend/src/lib/components/ask/AskEvidence.svelte
Normal file
@@ -0,0 +1,91 @@
|
||||
<!--
|
||||
AskEvidence.svelte — /ask 페이지 우측 sticky 패널.
|
||||
|
||||
⚠ 영구 룰 (Phase 3.4 plan):
|
||||
`citation.full_snippet` 은 UI 에 직접 렌더 금지. debug 모드(`?debug=1`)
|
||||
에서 hover tooltip 으로만 조건부 노출 가능.
|
||||
|
||||
이 규칙이 깨지면 backend span-precision UX 가치가 사라진다. 코드 리뷰에서
|
||||
반드시 reject. span_text 만 본문으로 노출한다.
|
||||
-->
|
||||
<script lang="ts">
|
||||
import Badge from '$lib/components/ui/Badge.svelte';
|
||||
import EmptyState from '$lib/components/ui/EmptyState.svelte';
|
||||
import Skeleton from '$lib/components/ui/Skeleton.svelte';
|
||||
import { BookOpen } from 'lucide-svelte';
|
||||
import type { AskResponse } from '$lib/types/ask';
|
||||
|
||||
interface Props {
|
||||
data: AskResponse | null;
|
||||
loading: boolean;
|
||||
activeCitation: number | null;
|
||||
registerCitation: (n: number, node: HTMLElement) => { destroy: () => void };
|
||||
}
|
||||
|
||||
let { data, loading, activeCitation, registerCitation }: Props = $props();
|
||||
|
||||
let citations = $derived(data?.citations ?? []);
|
||||
</script>
|
||||
|
||||
<section class="bg-surface border border-default rounded-card p-5">
|
||||
<div class="flex items-start justify-between gap-3 mb-4">
|
||||
<div>
|
||||
<p class="text-[10px] font-semibold tracking-wider uppercase text-dim flex items-center gap-1.5">
|
||||
<BookOpen size={12} /> Evidence Highlights
|
||||
</p>
|
||||
<h3 class="mt-1 text-sm font-semibold text-text">인용 근거</h3>
|
||||
</div>
|
||||
{#if data && !loading}
|
||||
<Badge tone="neutral" size="sm">{citations.length}개</Badge>
|
||||
{/if}
|
||||
</div>
|
||||
|
||||
{#if loading}
|
||||
<div class="space-y-3">
|
||||
{#each Array(2) as _}
|
||||
<div class="border border-default rounded-card p-4 space-y-2">
|
||||
<Skeleton w="w-24" h="h-3" />
|
||||
<Skeleton w="w-full" h="h-3" />
|
||||
<Skeleton w="w-5/6" h="h-3" />
|
||||
<Skeleton w="w-3/4" h="h-3" />
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
{:else if citations.length === 0}
|
||||
<EmptyState title="표시할 근거가 없습니다." class="py-6" />
|
||||
{:else}
|
||||
<div class="space-y-3">
|
||||
{#each citations as citation (citation.n)}
|
||||
{@const isActive = activeCitation === citation.n}
|
||||
<article
|
||||
class="border rounded-card p-4 transition-colors {isActive
|
||||
? 'border-accent ring-2 ring-accent/20 bg-accent/5'
|
||||
: 'border-default'}"
|
||||
use:registerCitation={citation.n}
|
||||
>
|
||||
<div class="flex items-start gap-2">
|
||||
<span class="text-accent font-bold text-sm shrink-0">[{citation.n}]</span>
|
||||
<div class="flex-1 min-w-0">
|
||||
<strong class="block text-sm text-text truncate">
|
||||
{citation.title ?? `문서 ${citation.doc_id}`}
|
||||
</strong>
|
||||
{#if citation.section_title}
|
||||
<p class="mt-0.5 text-xs text-dim truncate">{citation.section_title}</p>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- ⚠ span_text 만 렌더. full_snippet 금지 -->
|
||||
<p class="mt-3 text-sm leading-relaxed text-text whitespace-pre-wrap">
|
||||
{citation.span_text}
|
||||
</p>
|
||||
|
||||
<div class="mt-3 flex gap-2 text-[10px] text-dim">
|
||||
<span>relevance {citation.relevance.toFixed(2)}</span>
|
||||
<span>rerank {citation.rerank_score.toFixed(2)}</span>
|
||||
</div>
|
||||
</article>
|
||||
{/each}
|
||||
</div>
|
||||
{/if}
|
||||
</section>
|
||||
78
frontend/src/lib/components/ask/AskResults.svelte
Normal file
78
frontend/src/lib/components/ask/AskResults.svelte
Normal file
@@ -0,0 +1,78 @@
|
||||
<!--
|
||||
AskResults.svelte — /ask 페이지 하단 패널.
|
||||
|
||||
검색 결과 리스트. DocumentCard 재사용 X — SearchResult 필드 셋이 달라서
|
||||
의존성 리스크 회피. inline 간단 카드로 title/score/snippet/section_title 표시.
|
||||
클릭 시 `/documents/{id}` 로 이동.
|
||||
-->
|
||||
<script lang="ts">
|
||||
import Badge from '$lib/components/ui/Badge.svelte';
|
||||
import EmptyState from '$lib/components/ui/EmptyState.svelte';
|
||||
import Skeleton from '$lib/components/ui/Skeleton.svelte';
|
||||
import { FileText } from 'lucide-svelte';
|
||||
import type { AskResponse } from '$lib/types/ask';
|
||||
|
||||
interface Props {
|
||||
data: AskResponse | null;
|
||||
loading: boolean;
|
||||
}
|
||||
|
||||
let { data, loading }: Props = $props();
|
||||
|
||||
let results = $derived(data?.results ?? []);
|
||||
</script>
|
||||
|
||||
<section class="bg-surface border border-default rounded-card p-5">
|
||||
<div class="flex items-start justify-between gap-3 mb-4">
|
||||
<div>
|
||||
<p class="text-[10px] font-semibold tracking-wider uppercase text-dim flex items-center gap-1.5">
|
||||
<FileText size={12} /> Search Results
|
||||
</p>
|
||||
<h3 class="mt-1 text-sm font-semibold text-text">검색 결과</h3>
|
||||
</div>
|
||||
{#if data && !loading}
|
||||
<Badge tone="neutral" size="sm">{data.total}개</Badge>
|
||||
{/if}
|
||||
</div>
|
||||
|
||||
{#if loading}
|
||||
<div class="space-y-3">
|
||||
{#each Array(5) as _}
|
||||
<div class="border border-default rounded-card p-4 space-y-2">
|
||||
<Skeleton w="w-2/3" h="h-4" />
|
||||
<Skeleton w="w-full" h="h-3" />
|
||||
<Skeleton w="w-4/5" h="h-3" />
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
{:else if results.length === 0}
|
||||
<EmptyState title="검색 결과가 없습니다." class="py-6" />
|
||||
{:else}
|
||||
<div class="space-y-3">
|
||||
{#each results as result (result.id)}
|
||||
<a
|
||||
href={`/documents/${result.id}`}
|
||||
class="block border border-default rounded-card p-4 hover:border-accent hover:bg-surface-hover transition-colors"
|
||||
>
|
||||
<div class="flex items-start justify-between gap-3">
|
||||
<strong class="text-sm text-text flex-1 min-w-0 truncate">
|
||||
{result.title ?? `문서 ${result.id}`}
|
||||
</strong>
|
||||
<div class="flex gap-1.5 text-[10px] text-dim shrink-0">
|
||||
<span>score {result.score.toFixed(2)}</span>
|
||||
{#if result.rerank_score != null}
|
||||
<span>rerank {result.rerank_score.toFixed(2)}</span>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
{#if result.section_title}
|
||||
<p class="mt-1 text-xs text-dim truncate">{result.section_title}</p>
|
||||
{/if}
|
||||
{#if result.snippet}
|
||||
<p class="mt-2 text-xs text-dim line-clamp-2">{result.snippet}</p>
|
||||
{/if}
|
||||
</a>
|
||||
{/each}
|
||||
</div>
|
||||
{/if}
|
||||
</section>
|
||||
64
frontend/src/lib/types/ask.ts
Normal file
64
frontend/src/lib/types/ask.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Phase 3.4: `/api/search/ask` 응답 타입.
|
||||
*
|
||||
* Backend Pydantic 모델 (`app/api/search.py::AskResponse`) 과 1:1 매칭.
|
||||
* 필드 변경 시 양쪽 동시 수정 필수.
|
||||
*/
|
||||
|
||||
export type SynthesisStatus =
|
||||
| 'completed'
|
||||
| 'timeout'
|
||||
| 'skipped'
|
||||
| 'no_evidence'
|
||||
| 'parse_failed'
|
||||
| 'llm_error';
|
||||
|
||||
export type Confidence = 'high' | 'medium' | 'low';
|
||||
|
||||
export interface Citation {
|
||||
n: number;
|
||||
chunk_id: number | null;
|
||||
doc_id: number;
|
||||
title: string | null;
|
||||
section_title: string | null;
|
||||
/** LLM이 추출한 50~300자 핵심 span. UI에서 이것만 노출. */
|
||||
span_text: string;
|
||||
/**
|
||||
* 원본 800자 window.
|
||||
*
|
||||
* ⚠ UI 기본 경로에서 절대 렌더 금지. debug 모드에서 hover tooltip 용도로만
|
||||
* 조건부 노출 가능. full_snippet을 보여주면 backend span-precision UX
|
||||
* 가치가 사라진다 (plan §Evidence 표시 규칙).
|
||||
*/
|
||||
full_snippet: string;
|
||||
relevance: number;
|
||||
rerank_score: number;
|
||||
}
|
||||
|
||||
export interface SearchResult {
|
||||
id: number;
|
||||
title: string | null;
|
||||
ai_domain: string | null;
|
||||
ai_summary: string | null;
|
||||
file_format: string;
|
||||
score: number;
|
||||
snippet: string | null;
|
||||
match_reason: string | null;
|
||||
chunk_id: number | null;
|
||||
chunk_index: number | null;
|
||||
section_title: string | null;
|
||||
rerank_score: number | null;
|
||||
}
|
||||
|
||||
export interface AskResponse {
|
||||
results: SearchResult[];
|
||||
ai_answer: string | null;
|
||||
citations: Citation[];
|
||||
synthesis_status: SynthesisStatus;
|
||||
synthesis_ms: number;
|
||||
confidence: Confidence | null;
|
||||
refused: boolean;
|
||||
no_results_reason: string | null;
|
||||
query: string;
|
||||
total: number;
|
||||
}
|
||||
@@ -80,6 +80,7 @@
|
||||
<SystemStatusDot />
|
||||
</div>
|
||||
<div class="flex items-center gap-1">
|
||||
<Button variant="ghost" size="sm" href="/ask">질문</Button>
|
||||
<Button variant="ghost" size="sm" href="/news">뉴스</Button>
|
||||
<Button variant="ghost" size="sm" href="/inbox">Inbox</Button>
|
||||
<Button variant="ghost" size="sm" href="/settings">설정</Button>
|
||||
|
||||
150
frontend/src/routes/ask/+page.svelte
Normal file
150
frontend/src/routes/ask/+page.svelte
Normal file
@@ -0,0 +1,150 @@
|
||||
<!--
|
||||
/ask — Phase 3.4 Ask Pipeline Frontend.
|
||||
|
||||
URL-driven: `/ask?q=<encoded>` 가 진입점. $effect 로 q 변화 감지 →
|
||||
`/api/search/ask` 호출 → 3-panel 렌더 (Answer / Evidence / Results).
|
||||
|
||||
중복 호출 방지: lastQuery 가드 (hydration + reactive trigger 시 같은 q 2번 발동 방지).
|
||||
-->
|
||||
<script lang="ts">
|
||||
import { page } from '$app/stores';
|
||||
import { goto } from '$app/navigation';
|
||||
import { api, type ApiError } from '$lib/api';
|
||||
import { addToast } from '$lib/stores/toast';
|
||||
import EmptyState from '$lib/components/ui/EmptyState.svelte';
|
||||
import AskAnswer from '$lib/components/ask/AskAnswer.svelte';
|
||||
import AskEvidence from '$lib/components/ask/AskEvidence.svelte';
|
||||
import AskResults from '$lib/components/ask/AskResults.svelte';
|
||||
import { Sparkles, Search } from 'lucide-svelte';
|
||||
import type { AskResponse } from '$lib/types/ask';
|
||||
|
||||
// ── state ───────────────────────────────────────────
|
||||
let queryInput = $state('');
|
||||
let data = $state<AskResponse | null>(null);
|
||||
let loading = $state(false);
|
||||
|
||||
// 중복 호출 방지 가드 (hydration + reactive trigger 이중 발동 방지)
|
||||
let lastQuery = '';
|
||||
|
||||
// citation scroll 연동: Answer 가 [n] 클릭 → Evidence 카드로 이동 + highlight
|
||||
const citationNodes = new Map<number, HTMLElement>();
|
||||
let activeCitation = $state<number | null>(null);
|
||||
|
||||
function registerCitation(n: number, node: HTMLElement) {
|
||||
citationNodes.set(n, node);
|
||||
return {
|
||||
destroy() {
|
||||
citationNodes.delete(n);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function scrollToCitation(n: number) {
|
||||
activeCitation = n;
|
||||
const node = citationNodes.get(n);
|
||||
node?.scrollIntoView({ behavior: 'smooth', block: 'center' });
|
||||
}
|
||||
|
||||
// ── submit (URL-driven, back 자동) ──────────────────
|
||||
function submit() {
|
||||
const q = queryInput.trim();
|
||||
if (!q) return;
|
||||
goto(`/ask?q=${encodeURIComponent(q)}`);
|
||||
}
|
||||
|
||||
function handleKeydown(e: KeyboardEvent) {
|
||||
if (e.key === 'Enter' && !e.isComposing) {
|
||||
e.preventDefault();
|
||||
submit();
|
||||
}
|
||||
}
|
||||
|
||||
// ── API 호출 ───────────────────────────────────────
|
||||
async function runAsk(q: string) {
|
||||
loading = true;
|
||||
activeCitation = null;
|
||||
try {
|
||||
data = await api<AskResponse>(
|
||||
`/search/ask?q=${encodeURIComponent(q)}&limit=10`,
|
||||
);
|
||||
} catch (err) {
|
||||
const apiErr = err as ApiError;
|
||||
addToast('error', apiErr.detail || '답변 생성 실패');
|
||||
data = null;
|
||||
} finally {
|
||||
loading = false;
|
||||
}
|
||||
}
|
||||
|
||||
// ── URL → runAsk (중복 가드) ────────────────────────
|
||||
$effect(() => {
|
||||
const q = $page.url.searchParams.get('q') ?? '';
|
||||
queryInput = q;
|
||||
if (!q) {
|
||||
data = null;
|
||||
loading = false;
|
||||
lastQuery = '';
|
||||
return;
|
||||
}
|
||||
if (q === lastQuery) return;
|
||||
lastQuery = q;
|
||||
runAsk(q);
|
||||
});
|
||||
</script>
|
||||
|
||||
<svelte:head>
|
||||
<title>질문 - PKM</title>
|
||||
</svelte:head>
|
||||
|
||||
<div class="h-full overflow-auto">
|
||||
<!-- 상단 검색바 (sticky) -->
|
||||
<div class="sticky top-0 z-10 bg-bg/80 backdrop-blur border-b border-default px-4 py-3">
|
||||
<div class="flex items-center gap-2 max-w-5xl mx-auto">
|
||||
<div class="relative flex-1">
|
||||
<Search
|
||||
size={14}
|
||||
class="absolute left-3 top-1/2 -translate-y-1/2 text-dim pointer-events-none"
|
||||
/>
|
||||
<input
|
||||
data-search-input
|
||||
type="text"
|
||||
bind:value={queryInput}
|
||||
onkeydown={handleKeydown}
|
||||
placeholder="질문을 입력하세요 (/ 키로 포커스)"
|
||||
class="w-full pl-9 pr-3 py-2 bg-surface border border-default rounded-lg text-text text-sm focus:border-accent outline-none"
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- 본문 -->
|
||||
<div class="max-w-5xl mx-auto p-4">
|
||||
{#if !queryInput && !loading && !data}
|
||||
<div class="py-16">
|
||||
<EmptyState
|
||||
icon={Sparkles}
|
||||
title="근거 기반 답변을 받아보세요"
|
||||
description="질문을 입력하면 문서에서 근거를 찾아 인용 기반 답변을 생성합니다."
|
||||
/>
|
||||
</div>
|
||||
{:else}
|
||||
<div class="grid gap-4 lg:grid-cols-[1.2fr_0.9fr] items-start">
|
||||
<!-- 좌: Answer + Results -->
|
||||
<div class="flex flex-col gap-4 min-w-0">
|
||||
<AskAnswer {data} {loading} onCitationClick={scrollToCitation} />
|
||||
<AskResults {data} {loading} />
|
||||
</div>
|
||||
|
||||
<!-- 우: Evidence (lg 이상 sticky) -->
|
||||
<div class="lg:sticky lg:top-20 lg:self-start min-w-0">
|
||||
<AskEvidence
|
||||
{data}
|
||||
{loading}
|
||||
{activeCitation}
|
||||
{registerCitation}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
@@ -3,7 +3,7 @@
|
||||
import { goto } from '$app/navigation';
|
||||
import { api } from '$lib/api';
|
||||
import { addToast } from '$lib/stores/toast';
|
||||
import { Info, List, LayoutGrid, ChevronLeft, X, Plus, Trash2, Tag, FolderTree, Rows3, Rows2 } from 'lucide-svelte';
|
||||
import { Info, List, LayoutGrid, ChevronLeft, X, Plus, Trash2, Tag, FolderTree, Rows3, Rows2, Sparkles } from 'lucide-svelte';
|
||||
import DocumentCard from '$lib/components/DocumentCard.svelte';
|
||||
import DocumentTable from '$lib/components/DocumentTable.svelte';
|
||||
import DocumentViewer from '$lib/components/DocumentViewer.svelte';
|
||||
@@ -441,6 +441,15 @@
|
||||
<option value="trgm">부분매칭</option>
|
||||
<option value="vector">의미검색</option>
|
||||
</select>
|
||||
{#if searchQuery.trim()}
|
||||
<a
|
||||
href={`/ask?q=${encodeURIComponent(searchQuery.trim())}`}
|
||||
class="flex items-center gap-1 px-2.5 py-1.5 rounded-lg border border-default text-dim hover:text-accent hover:border-accent transition-colors text-xs"
|
||||
title="이 쿼리로 AI 답변 보기"
|
||||
>
|
||||
<Sparkles size={14} /> AI 답변
|
||||
</a>
|
||||
{/if}
|
||||
<button
|
||||
onclick={toggleViewMode}
|
||||
class="p-1.5 rounded-lg border border-default text-dim hover:text-accent hover:border-accent transition-colors"
|
||||
|
||||
@@ -59,9 +59,9 @@
|
||||
async function loadInbox() {
|
||||
loading = true;
|
||||
try {
|
||||
// TODO(backend): /documents/?review_status=pending 서버 필터 지원 시 page_size 축소
|
||||
const data = await api('/documents/?page_size=200');
|
||||
documents = (data.items || []).filter((d) => d.review_status === 'pending');
|
||||
// 서버 필터 review_status=pending 적용 — page_size 100 이내 안전
|
||||
const data = await api('/documents/?review_status=pending&page_size=100');
|
||||
documents = data.items || [];
|
||||
} catch (err) {
|
||||
addToast('error', 'Inbox 로딩 실패');
|
||||
} finally {
|
||||
|
||||
57
migrations/101_global_digests.sql
Normal file
57
migrations/101_global_digests.sql
Normal file
@@ -0,0 +1,57 @@
|
||||
-- Phase 4 Global News Digest
|
||||
-- 7일 rolling window 뉴스를 country × topic 2-level로 묶어 매일 새벽 4시 KST 배치 생성
|
||||
-- 검색 파이프라인 미사용. documents → clustering → cluster-level LLM summarization → digest
|
||||
-- 사용자 결정: country→topic 2-level, cluster-level LLM only, drop 금지 fallback,
|
||||
-- adaptive threshold, EMA centroid, time-decay (λ=ln(2)/3 ≈ 0.231)
|
||||
|
||||
-- 부모 테이블: 하루 단위 digest run 메타데이터
|
||||
CREATE TABLE global_digests (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
digest_date DATE NOT NULL, -- KST 기준 생성일
|
||||
window_start TIMESTAMPTZ NOT NULL, -- rolling window 시작 (UTC)
|
||||
window_end TIMESTAMPTZ NOT NULL, -- 생성 시점 (UTC)
|
||||
decay_lambda DOUBLE PRECISION NOT NULL, -- 실제 사용된 time-decay λ
|
||||
|
||||
total_articles INTEGER NOT NULL DEFAULT 0,
|
||||
total_countries INTEGER NOT NULL DEFAULT 0,
|
||||
total_topics INTEGER NOT NULL DEFAULT 0,
|
||||
|
||||
generation_ms INTEGER, -- 워커 실행 시간 (성능 회귀 감지)
|
||||
llm_calls INTEGER NOT NULL DEFAULT 0,
|
||||
llm_failures INTEGER NOT NULL DEFAULT 0, -- = fallback 사용 횟수
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'success', -- success | partial | failed
|
||||
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
|
||||
UNIQUE (digest_date) -- idempotency: 같은 날짜 재실행 시 DELETE+INSERT
|
||||
);
|
||||
|
||||
CREATE INDEX idx_global_digests_date ON global_digests (digest_date DESC);
|
||||
|
||||
-- 자식 테이블: country × topic 단위
|
||||
CREATE TABLE digest_topics (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
digest_id BIGINT NOT NULL REFERENCES global_digests(id) ON DELETE CASCADE,
|
||||
|
||||
country VARCHAR(10) NOT NULL, -- KR | US | JP | CN | FR | DE | ...
|
||||
topic_rank INTEGER NOT NULL, -- country 내 1..N (importance_score 내림차순)
|
||||
|
||||
topic_label TEXT NOT NULL, -- LLM 생성 5~10 단어 한국어 (또는 fallback 시 "주요 뉴스 묶음")
|
||||
summary TEXT NOT NULL, -- LLM 생성 1~2 문장 factual (또는 fallback 시 top member ai_summary 첫 200자)
|
||||
|
||||
article_ids JSONB NOT NULL, -- [doc_id, ...] 코드가 주입 (LLM 생성 금지)
|
||||
article_count INTEGER NOT NULL, -- = jsonb_array_length(article_ids)
|
||||
|
||||
importance_score DOUBLE PRECISION NOT NULL, -- batch 내 country별 0~1 normalized (cross-country 비교)
|
||||
raw_weight_sum DOUBLE PRECISION NOT NULL, -- 정규화 전 decay 가중합 (디버그 + day-over-day 트렌드)
|
||||
|
||||
centroid_sample JSONB, -- 디버그: LLM 입력 doc id 목록 + summary hash
|
||||
llm_model VARCHAR(100), -- 사용된 모델 (primary/fallback 추적)
|
||||
llm_fallback_used BOOLEAN NOT NULL DEFAULT FALSE, -- LLM 실패 시 minimal fallback 적용 여부
|
||||
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_digest_topics_digest ON digest_topics (digest_id);
|
||||
CREATE INDEX idx_digest_topics_country ON digest_topics (country);
|
||||
CREATE INDEX idx_digest_topics_rank ON digest_topics (digest_id, country, topic_rank);
|
||||
125
reports/phase2_final.md
Normal file
125
reports/phase2_final.md
Normal file
@@ -0,0 +1,125 @@
|
||||
# Phase 2 최종 측정 보고서
|
||||
|
||||
**측정일**: 2026-04-08
|
||||
**대상**: Document Server 검색 v2, Phase 2.1~2.3 통합
|
||||
**평가셋**: `tests/search_eval/queries.yaml` v0.1 (23 쿼리, 8 카테고리)
|
||||
**인프라 기준**: `memory/infra_inventory.md` (2026-04-08 실측)
|
||||
|
||||
## A/B 결과
|
||||
|
||||
| metric | Phase 1.3 baseline (A) | Phase 2 final (B) | Δ |
|
||||
|---|---|---|---|
|
||||
| Recall@10 | 0.730 | **0.737** | +0.007 ✓ |
|
||||
| MRR@10 | 0.795 | 0.797 | +0.002 |
|
||||
| NDCG@10 | 0.663 | **0.668** | +0.005 ✓ |
|
||||
| Top-3 hit | 0.900 | 0.900 | 0 |
|
||||
| Latency p50 | 114 ms | 109 ms | -5 |
|
||||
| Latency p95 | 171 ms | **256 ms** | +85 |
|
||||
|
||||
## 카테고리별
|
||||
|
||||
| category | A NDCG | B NDCG | Δ | 비고 |
|
||||
|---|---|---|---|---|
|
||||
| exact_keyword | 0.96 | 0.96 | 0 | 회귀 0 ✓ |
|
||||
| natural_language_ko | 0.73 | 0.73 | 0 | 회귀 0 ✓ (narrowed multilingual 덕) |
|
||||
| crosslingual_ko_en | 0.53 | 0.53 | 0 | bge-m3 한계 — multilingual 효과 0 |
|
||||
| **news_crosslingual** | 0.27 | **0.37** | **+0.10** | 개선 ✓ |
|
||||
| news_ko | 0.36 | 0.37 | +0.01 | 미세 |
|
||||
| news_en | 0.00 | 0.00 | 0 | 여전히 0 |
|
||||
| news_fr | 0.46 | 0.46 | 0 | |
|
||||
| other_domain | 0.88 | 0.88 | 0 | |
|
||||
|
||||
## Phase 2 게이트 검증
|
||||
|
||||
| 게이트 | 목표 | 실제 | 상태 |
|
||||
|---|---|---|---|
|
||||
| Recall@10 | ≥ 0.78 | 0.737 | ❌ (-0.043) |
|
||||
| Top-3 hit | ≥ 0.93 | 0.900 | ❌ (-0.030) |
|
||||
| crosslingual_ko_en NDCG | ≥ 0.65 | 0.53 | ❌ (-0.12) |
|
||||
| news_crosslingual NDCG | ≥ 0.30 | 0.37 | ✓ |
|
||||
| latency p95 | < 400 ms | 256 ms | ✓ |
|
||||
| 평가셋 v0.2 완료 | - | v0.1만 | ❌ (후속) |
|
||||
|
||||
**2/6 통과** — 목표 미달. 단 회귀 0 + 일부 영역 개선.
|
||||
|
||||
## Phase 2에서 실제로 달성한 것
|
||||
|
||||
### 1. 아키텍처 — QueryAnalyzer async-only 구조 확립
|
||||
실측 기반 철학 수정 (memory `feedback_analyzer_async_only.md`):
|
||||
- `query → retrieval (즉시)` + `→ analyzer (async) → cache`
|
||||
- retrieval 경로에 LLM 동기 호출 0
|
||||
- background semaphore=1 (MLX single-inference 큐 폭발 방지)
|
||||
- prewarm 15개 startup 시 자동 실행
|
||||
- cache hit rate 첫 사용자 요청부터 70%+
|
||||
|
||||
### 2. 실측 데이터 — MLX 한계
|
||||
gemma-4-26b-a4b-it-8bit MLX:
|
||||
- full prompt (prompt_tok=2406) → **10.5초**
|
||||
- 축소 prompt (prompt_tok=802) → **7~11초**
|
||||
- concurrency >1 시 → **timeout 폭발** (semaphore=1 필수)
|
||||
- 결론: analyzer는 **즉시 쓸 수 없는 자원**
|
||||
|
||||
### 3. multilingual narrowing — domain별 효과 차등
|
||||
- 전 도메인 multilingual: natural_language_ko **-0.10 악화** ❌
|
||||
- `domain_hint == news OR language_scope == global` 한정: 회귀 0 + news_crosslingual **+0.10** ✓
|
||||
- 룰: 한국어 법령 검색에 영어 번역 쿼리 섞으면 noise
|
||||
|
||||
### 4. soft_filter boost — 보수적 설정 필요
|
||||
- 초기 0.03+0.02 → exact_keyword **-0.03 악화**
|
||||
- 낮춰서 0.01 단일 domain only → 회귀 0
|
||||
- 평가셋에 filter 쿼리가 없어 효과 직접 측정 불가 (v0.2 확장 후 재평가)
|
||||
|
||||
## Phase 2에서 달성하지 못한 것 + 이유
|
||||
|
||||
### Recall@10 / Top-3 hit 회복 (0.730 → 0.78+ 미달)
|
||||
- baseline 대비 +0.007 미세 개선만
|
||||
- 원인: **corpus 1022 docs로 noise 증가**. chunk 수 7129. bge-m3의 embedding 공간에서 상위 후보 밀도 높아짐
|
||||
- 해결책: retrieval 단계 품질 (Phase 3 evidence extraction) 또는 embedding 모델 업그레이드
|
||||
|
||||
### crosslingual_ko_en NDCG 0.65+ 미달 (0.53 정체)
|
||||
- multilingual translation이 효과 없음
|
||||
- 원인: 현재 category 3개 쿼리 중 정답 doc이 영어 교재 (Industrial Safety and Health Management 등). bge-m3는 ko 쿼리로 이 영어 doc을 약 0.5~0.6 cosine으로 이미 찾음. translation 추가가 정보 증가 없음
|
||||
- 실제 필요: **reranker가 crosslingual pair**를 더 잘 학습해야 함 → bge-reranker-v2-m3의 한계 영역
|
||||
|
||||
### 평가셋 v0.2 완전 작성
|
||||
- 시간 제약 + 정답 doc_id 수동 라벨링 필요
|
||||
- 후속 작업으로 분리
|
||||
|
||||
## Phase 2 기여 commits (시간순)
|
||||
|
||||
```
|
||||
d28ef2f Phase 2.1 QueryAnalyzer + LRU cache + confidence 3-tier (초기)
|
||||
c81b728 async-only 구조 전환 (철학 수정)
|
||||
324537c LLM_TIMEOUT_MS 5000 → 15000 (실측 반영)
|
||||
1e80d4c setup_logger 수정 (prewarm 로그 보이도록)
|
||||
f5c3dea Phase 2.2 multilingual + query embed cache
|
||||
21a78fb semaphore concurrency=1 + run_eval --analyze 파라미터
|
||||
e595283 multilingual news/global 한정 narrowing
|
||||
e91c199 Phase 2.3 soft_filter boost (초기)
|
||||
01f144a soft_filter boost 약화 (0.01, doctype 제거)
|
||||
```
|
||||
|
||||
## 다음 단계 선택지 (사용자 결정)
|
||||
|
||||
### A. Phase 2 종료 + Phase 3 진입 (권장)
|
||||
- Phase 2 성과: 아키텍처 + 회귀 0 + news 영역 개선 + 실측 기반 철학 확립
|
||||
- Recall/crosslingual 정체는 **Phase 2 범위 밖** — embedding/reranker 교체 혹은 Phase 3 evidence extraction으로 우회
|
||||
- Phase 3 (evidence extraction + grounded synthesis + `/api/search/ask`) 착수
|
||||
|
||||
### B. Phase 2 iteration — embedding 실험
|
||||
- bge-m3 → 다른 embedding (e.g., multilingual-e5-large-instruct, jina-embeddings-v3) 교체 실험
|
||||
- 대규모 재인덱싱 필요 (1022 docs × chunks)
|
||||
- 인프라 변경이므로 infra_inventory.md drift 발생
|
||||
|
||||
### C. Phase 2 iteration — 평가셋 v0.2 작성
|
||||
- queries_v0.2.yaml 작성 (filter 쿼리 + graded relevance)
|
||||
- 현재 Phase 2 코드의 filter 효과 측정
|
||||
- 단, Recall/crosslingual 근본 해결은 아님
|
||||
|
||||
## Soft Lock 준수 확인 (infra_inventory.md)
|
||||
|
||||
- ✓ `config.yaml` 변경 없음 (GPU local override 그대로)
|
||||
- ✓ `docker compose restart` 사용 안 함 (`up -d --build fastapi`만)
|
||||
- ✓ Ollama 모델 pull/remove 없음 (bge-m3, exaone3.5 그대로)
|
||||
- ✓ Reranker 모델 변경 없음 (TEI bge-reranker-v2-m3 그대로)
|
||||
- ✓ Mac mini MLX 설정 변경 없음
|
||||
24
reports/phase2_final_baseline.csv
Normal file
24
reports/phase2_final_baseline.csv
Normal file
@@ -0,0 +1,24 @@
|
||||
label,id,category,intent,domain_hint,query,relevant_ids,returned_ids_top10,latency_ms,recall_at_10,mrr_at_10,ndcg_at_10,top3_hit,error
|
||||
single,kw_001,exact_keyword,fact_lookup,document,산업안전보건법 제6장,3856;3868;3879,3856;3851;3862;3853;3861;3868;3879;3873;3876;3871,95.5,1.000,1.000,0.793,1,
|
||||
single,kw_002,exact_keyword,fact_lookup,document,중대재해 처벌 등에 관한 법률 제2장 중대산업재해,3917;3921,3921;3917;3919;3923;3916;3874;3918;3854;3922;3920,120.7,1.000,1.000,1.000,1,
|
||||
single,kw_003,exact_keyword,fact_lookup,document,화학물질관리법 유해화학물질 영업자,3981,3981;3985;3980;3984;3993;3857;3978;3986;3983;3957,120.2,1.000,1.000,1.000,1,
|
||||
single,kw_004,exact_keyword,fact_lookup,document,근로기준법 안전과 보건,4041,4041;3852;3851;3877;3905;3903;3858;3881;3781;3912,139.3,1.000,1.000,1.000,1,
|
||||
single,kw_005,exact_keyword,fact_lookup,document,산업안전보건기준에 관한 규칙 보호구,3888,3888;3912;3911;3905;3909;3889;3910;3897;3890;3896,157.8,1.000,1.000,1.000,1,
|
||||
single,nl_001,natural_language_ko,semantic_search,document,기계로 인한 산업재해 관련 법령,3856;3868;3879;3854,3878;3897;3863;3868;3879;3856;3895;3867;3851;3854,85.9,1.000,0.250,0.571,0,
|
||||
single,nl_002,natural_language_ko,semantic_search,document,사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일,3855;3867;3878,3855;3917;3854;3867;3878;3863;3851;3908;3903;3895,82.1,1.000,1.000,0.853,1,
|
||||
single,nl_003,natural_language_ko,semantic_search,document,유해화학물질을 다루는 회사가 지켜야 할 안전 의무,3980;3981;3982,3980;3903;3904;3905;3981;3985;3896;3917;3857;3909,117.0,0.667,1.000,0.651,1,
|
||||
single,nl_004,natural_language_ko,semantic_search,document,중대재해가 발생했을 때 경영책임자가 처벌받는 기준,3916;3917;3920;3921,3917;3918;3916;3923;3919;3921;3854;3872;3877;3922,113.6,0.750,1.000,0.725,1,
|
||||
single,nl_005,natural_language_ko,semantic_search,document,안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가,3853;3865,3853;4025;3876;3879;3859;3865;3781;3815;3818;3787,80.6,1.000,1.000,0.832,1,
|
||||
single,cl_001,crosslingual_ko_en,semantic_search,document,기계 안전 가드 설계 원리,3770;3856,3770;4540;3817;4541;3774;3816;3787;3758;3793;3773,79.0,0.500,1.000,0.613,1,
|
||||
single,cl_002,crosslingual_ko_en,semantic_search,document,산업 안전 입문서,3755;3775;3776;3777,3760;3755;3774;3764;3758;3775;3779;3802;3814;3817,107.6,0.500,0.500,0.385,1,
|
||||
single,cl_003,crosslingual_ko_en,semantic_search,document,전기 안전 위험,3772;3790,3897;3772;3771;4018;3773;3790;3819;4020;3807;3755,125.6,1.000,0.500,0.605,1,
|
||||
single,news_001,news_ko,semantic_search,news,이란과 미국의 군사 충돌,4303;4304;4307;4316;4322;4323;4327;4335,4317;4321;4771;4446;4743;4452;4307;4418;4331;4744,94.5,0.125,0.143,0.084,1,
|
||||
single,news_002,news_ko,semantic_search,news,호르무즈 해협 봉쇄,4316;4320;4322;4327,4327;4346;4349;4762;4767;4759;4322;4320;4340;4304,94.3,0.750,1.000,0.644,0,
|
||||
single,news_003,news_en,semantic_search,news,Trump Iran ultimatum,4258;4260;4262,4776;4515;4519;4658;4644;4763;4333;4762;4679;4321,118.7,0.000,0.000,0.000,1,
|
||||
single,news_004,news_fr,semantic_search,news,guerre en Iran,4199;4202;4210;4361;4363;4507;4519;4521,4678;4507;4199;4688;4776;4363;4519;4668;4670;4672,119.6,0.500,0.500,0.460,1,
|
||||
single,news_005,news_crosslingual,semantic_search,news,이란 미국 전쟁 글로벌 반응,4202;4258;4262;4536;4303;4304;4316,4262;4457;4765;4324;4345;4329;4452;4443;4761;4642,95.6,0.143,1.000,0.275,1,
|
||||
single,misc_001,other_domain,fact_lookup,document,강체의 평면 운동학,4063;4065,4063;4065;4064;4067;4071;4068;4069;4062;4060;4066,172.3,1.000,1.000,1.000,1,
|
||||
single,misc_002,other_domain,semantic_search,document,질점의 운동역학,4060;4061;4062,4062;4060;4070;4064;4068;4067;4065;4058;4071;4066,261.8,0.667,1.000,0.765,1,
|
||||
single,fail_001,failure_expected,semantic_search,document,Rust async runtime tokio scheduler 내부 구조,,4815;4069;4546;4062;4547;3801;3787;3812;4542;3770,118.6,0.000,0.000,0.000,1,
|
||||
single,fail_002,failure_expected,semantic_search,document,양자컴퓨터 큐비트 디코히어런스,,4058;4057;4067;3800;4065;4068;3817;4063;4064;3915,76.9,0.000,0.000,0.000,1,
|
||||
single,fail_003,failure_expected,semantic_search,news,재즈 보컬리스트 빌리 홀리데이,,4634;4100;4815;4116;4281;4697;4205;4077;4235;4758,73.6,0.000,0.000,0.000,1,
|
||||
|
24
reports/phase2_final_candidate.csv
Normal file
24
reports/phase2_final_candidate.csv
Normal file
@@ -0,0 +1,24 @@
|
||||
label,id,category,intent,domain_hint,query,relevant_ids,returned_ids_top10,latency_ms,recall_at_10,mrr_at_10,ndcg_at_10,top3_hit,error
|
||||
single,kw_001,exact_keyword,fact_lookup,document,산업안전보건법 제6장,3856;3868;3879,3856;3851;3862;3853;3861;3868;3879;3873;3876;3871,67.9,1.000,1.000,0.793,1,
|
||||
single,kw_002,exact_keyword,fact_lookup,document,중대재해 처벌 등에 관한 법률 제2장 중대산업재해,3917;3921,3921;3917;3919;3923;3916;3874;3918;3854;3922;3920,110.0,1.000,1.000,1.000,1,
|
||||
single,kw_003,exact_keyword,fact_lookup,document,화학물질관리법 유해화학물질 영업자,3981,3981;3985;3980;3984;3993;3857;3978;3986;3983;3957,119.3,1.000,1.000,1.000,1,
|
||||
single,kw_004,exact_keyword,fact_lookup,document,근로기준법 안전과 보건,4041,4041;3852;3851;3877;3905;3903;3858;3881;3781;3912,108.7,1.000,1.000,1.000,1,
|
||||
single,kw_005,exact_keyword,fact_lookup,document,산업안전보건기준에 관한 규칙 보호구,3888,3888;3912;3911;3905;3909;3889;3910;3897;3890;3896,125.5,1.000,1.000,1.000,1,
|
||||
single,nl_001,natural_language_ko,semantic_search,document,기계로 인한 산업재해 관련 법령,3856;3868;3879;3854,3878;3897;3863;3868;3856;3879;3895;3867;3851;3854,83.9,1.000,0.250,0.571,0,
|
||||
single,nl_002,natural_language_ko,semantic_search,document,사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일,3855;3867;3878,3855;3917;3854;3867;3878;3863;3851;3908;3903;3895,118.0,1.000,1.000,0.853,1,
|
||||
single,nl_003,natural_language_ko,semantic_search,document,유해화학물질을 다루는 회사가 지켜야 할 안전 의무,3980;3981;3982,3980;3903;3904;3905;3981;3985;3896;3917;3857;3909,82.8,0.667,1.000,0.651,1,
|
||||
single,nl_004,natural_language_ko,semantic_search,document,중대재해가 발생했을 때 경영책임자가 처벌받는 기준,3916;3917;3920;3921,3917;3918;3916;3923;3919;3921;3854;3872;3877;3922,72.1,0.750,1.000,0.725,1,
|
||||
single,nl_005,natural_language_ko,semantic_search,document,안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가,3853;3865,3853;4025;3876;3879;3859;3865;3781;3815;3818;3787,80.2,1.000,1.000,0.832,1,
|
||||
single,cl_001,crosslingual_ko_en,semantic_search,document,기계 안전 가드 설계 원리,3770;3856,3770;4540;3817;4541;3774;3816;3787;3758;3793;3773,108.3,0.500,1.000,0.613,1,
|
||||
single,cl_002,crosslingual_ko_en,semantic_search,document,산업 안전 입문서,3755;3775;3776;3777,3760;3755;3774;3764;3758;3775;3779;3802;3814;3817,79.5,0.500,0.500,0.385,1,
|
||||
single,cl_003,crosslingual_ko_en,semantic_search,document,전기 안전 위험,3772;3790,3897;3772;3771;4018;3773;3790;3819;4020;3807;3755,103.7,1.000,0.500,0.605,1,
|
||||
single,news_001,news_ko,semantic_search,news,이란과 미국의 군사 충돌,4303;4304;4307;4316;4322;4323;4327;4335,4317;4321;4771;4743;4307;4452;4761;4678;4418;4331,1445.8,0.125,0.200,0.098,1,
|
||||
single,news_002,news_ko,semantic_search,news,호르무즈 해협 봉쇄,4316;4320;4322;4327,4327;4346;4349;4762;4767;4759;4322;4320;4340;4304,185.3,0.750,1.000,0.644,0,
|
||||
single,news_003,news_en,semantic_search,news,Trump Iran ultimatum,4258;4260;4262,4776;4515;4519;4658;4644;4763;4333;4762;4679;4321,76.3,0.000,0.000,0.000,1,
|
||||
single,news_004,news_fr,semantic_search,news,guerre en Iran,4199;4202;4210;4361;4363;4507;4519;4521,4678;4507;4199;4688;4776;4363;4519;4668;4670;4672,157.9,0.500,0.500,0.460,1,
|
||||
single,news_005,news_crosslingual,semantic_search,news,이란 미국 전쟁 글로벌 반응,4202;4258;4262;4536;4303;4304;4316,4262;4457;4765;4324;4345;4329;4258;4452;4443;4761,186.7,0.286,1.000,0.367,1,
|
||||
single,misc_001,other_domain,fact_lookup,document,강체의 평면 운동학,4063;4065,4063;4065;4064;4067;4071;4068;4069;4062;4060;4066,171.6,1.000,1.000,1.000,1,
|
||||
single,misc_002,other_domain,semantic_search,document,질점의 운동역학,4060;4061;4062,4062;4060;4070;4064;4068;4067;4065;4058;4071;4066,263.6,0.667,1.000,0.765,1,
|
||||
single,fail_001,failure_expected,semantic_search,document,Rust async runtime tokio scheduler 내부 구조,,4815;4069;4546;4062;4547;3801;3787;3812;4542;3770,121.9,0.000,0.000,0.000,1,
|
||||
single,fail_002,failure_expected,semantic_search,document,양자컴퓨터 큐비트 디코히어런스,,4058;4057;4067;3800;4065;4068;3817;4063;4064;3915,75.2,0.000,0.000,0.000,1,
|
||||
single,fail_003,failure_expected,semantic_search,news,재즈 보컬리스트 빌리 홀리데이,,4634;4100;4815;4116;4281;4697;4205;4077;4235;4289,73.9,0.000,0.000,0.000,1,
|
||||
|
@@ -134,6 +134,7 @@ async def call_search(
|
||||
limit: int = 20,
|
||||
fusion: str | None = None,
|
||||
rerank: str | None = None,
|
||||
analyze: str | None = None,
|
||||
) -> tuple[list[int], float]:
|
||||
"""검색 API 호출 → (doc_ids, latency_ms)."""
|
||||
url = f"{base_url.rstrip('/')}/api/search/"
|
||||
@@ -143,6 +144,8 @@ async def call_search(
|
||||
params["fusion"] = fusion
|
||||
if rerank is not None:
|
||||
params["rerank"] = rerank
|
||||
if analyze is not None:
|
||||
params["analyze"] = analyze
|
||||
|
||||
import time
|
||||
|
||||
@@ -169,6 +172,7 @@ async def evaluate(
|
||||
mode: str = "hybrid",
|
||||
fusion: str | None = None,
|
||||
rerank: str | None = None,
|
||||
analyze: str | None = None,
|
||||
) -> list[QueryResult]:
|
||||
"""전체 쿼리셋 평가."""
|
||||
results: list[QueryResult] = []
|
||||
@@ -177,7 +181,7 @@ async def evaluate(
|
||||
for q in queries:
|
||||
try:
|
||||
returned_ids, latency_ms = await call_search(
|
||||
client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank
|
||||
client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank, analyze=analyze
|
||||
)
|
||||
results.append(
|
||||
QueryResult(
|
||||
@@ -415,6 +419,13 @@ def main() -> int:
|
||||
choices=["true", "false"],
|
||||
help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--analyze",
|
||||
type=str,
|
||||
default=None,
|
||||
choices=["true", "false"],
|
||||
help="QueryAnalyzer 활성화 (Phase 2.1+, cache hit 시 multilingual 적용)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--token",
|
||||
type=str,
|
||||
@@ -454,21 +465,21 @@ def main() -> int:
|
||||
if args.base_url:
|
||||
print(f"\n>>> evaluating: {args.base_url}")
|
||||
results = asyncio.run(
|
||||
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
|
||||
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze)
|
||||
)
|
||||
print_summary("single", results)
|
||||
all_results.extend(results)
|
||||
else:
|
||||
print(f"\n>>> baseline: {args.baseline_url}")
|
||||
baseline_results = asyncio.run(
|
||||
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
|
||||
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze)
|
||||
)
|
||||
baseline_summary = print_summary("baseline", baseline_results)
|
||||
|
||||
print(f"\n>>> candidate: {args.candidate_url}")
|
||||
candidate_results = asyncio.run(
|
||||
evaluate(
|
||||
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank
|
||||
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze
|
||||
)
|
||||
)
|
||||
candidate_summary = print_summary("candidate", candidate_results)
|
||||
|
||||
Reference in New Issue
Block a user