Compare commits
21 Commits
feature/de
...
feat/phase
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b511a8db0 | ||
|
|
4615fb4ce3 | ||
|
|
cdcbb07561 | ||
|
|
46ba9dd231 | ||
|
|
4468c2baba | ||
|
|
9bef049af6 | ||
|
|
dd9a0f600a | ||
|
|
d5f91556e6 | ||
|
|
75a1919342 | ||
|
|
64322e4f6f | ||
|
|
120db86d74 | ||
|
|
01f144ab25 | ||
|
|
e91c199537 | ||
|
|
e595283e27 | ||
|
|
21a78fbbf0 | ||
|
|
f5c3dea833 | ||
|
|
1e80d4c613 | ||
|
|
324537cbc8 | ||
|
|
c81b728ddf | ||
|
|
d28ef2fca0 | ||
|
|
de08735420 |
15
CLAUDE.md
15
CLAUDE.md
@@ -1,5 +1,20 @@
|
|||||||
# hyungi_Document_Server — Claude Code 작업 가이드
|
# hyungi_Document_Server — Claude Code 작업 가이드
|
||||||
|
|
||||||
|
## Infrastructure Reference 📌
|
||||||
|
|
||||||
|
**Always refer to** `~/.claude/projects/-Users-hyungiahn/memory/infra_inventory.md` for:
|
||||||
|
- AI model routing (primary / fallback / embedding / rerank / vision) — **the model names below may be stale**
|
||||||
|
- Machine info, Tailscale IPs, SSH targets
|
||||||
|
- Docker container topology and compose projects
|
||||||
|
- Drift log (known Desired vs Actual inconsistencies)
|
||||||
|
- Verify commands
|
||||||
|
|
||||||
|
**If this file and `infra_inventory.md` disagree, `infra_inventory.md` is authoritative.** Do not change `config.yaml` / `credentials.env` without first updating `infra_inventory.md`.
|
||||||
|
|
||||||
|
**Search experiment soft lock**: During Phase 2 work (search.py refactor, QueryAnalyzer, run_eval.py execution), do **not** run `docker compose restart`, change `config.yaml`, or pull Ollama models. Violating this invalidates the experiment baseline.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## 프로젝트 개요
|
## 프로젝트 개요
|
||||||
|
|
||||||
Self-hosted PKM(Personal Knowledge Management) 웹 애플리케이션.
|
Self-hosted PKM(Personal Knowledge Management) 웹 애플리케이션.
|
||||||
|
|||||||
164
app/api/digest.py
Normal file
164
app/api/digest.py
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
"""Phase 4 Global Digest API — read-only + 디버그 regenerate.
|
||||||
|
|
||||||
|
엔드포인트:
|
||||||
|
- GET /api/digest/latest : 가장 최근 digest
|
||||||
|
- GET /api/digest?date=YYYY-MM-DD : 특정 날짜 digest
|
||||||
|
- GET /api/digest?country=KR : 특정 국가만
|
||||||
|
- POST /api/digest/regenerate : 백그라운드 digest 워커 트리거 (auth 필요)
|
||||||
|
|
||||||
|
응답은 country → topic 2-level 구조. country 가 비어있는 경우 응답에서 자동 생략.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from datetime import date as date_type
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Annotated
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
|
from core.auth import get_current_user
|
||||||
|
from core.database import get_session
|
||||||
|
from models.digest import DigestTopic, GlobalDigest
|
||||||
|
from models.user import User
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Pydantic 응답 모델 (schemas/ 디렉토리 미사용 → inline 정의) ───
|
||||||
|
|
||||||
|
|
||||||
|
class TopicResponse(BaseModel):
|
||||||
|
topic_rank: int
|
||||||
|
topic_label: str
|
||||||
|
summary: str
|
||||||
|
article_ids: list[int]
|
||||||
|
article_count: int
|
||||||
|
importance_score: float
|
||||||
|
raw_weight_sum: float
|
||||||
|
llm_fallback_used: bool
|
||||||
|
|
||||||
|
|
||||||
|
class CountryGroup(BaseModel):
|
||||||
|
country: str
|
||||||
|
topics: list[TopicResponse]
|
||||||
|
|
||||||
|
|
||||||
|
class DigestResponse(BaseModel):
|
||||||
|
digest_date: date_type
|
||||||
|
window_start: datetime
|
||||||
|
window_end: datetime
|
||||||
|
decay_lambda: float
|
||||||
|
total_articles: int
|
||||||
|
total_countries: int
|
||||||
|
total_topics: int
|
||||||
|
generation_ms: int | None
|
||||||
|
llm_calls: int
|
||||||
|
llm_failures: int
|
||||||
|
status: str
|
||||||
|
countries: list[CountryGroup]
|
||||||
|
|
||||||
|
|
||||||
|
# ─── helpers ───
|
||||||
|
|
||||||
|
|
||||||
|
def _build_response(digest: GlobalDigest, country_filter: str | None = None) -> DigestResponse:
|
||||||
|
"""ORM 객체 → DigestResponse. country_filter 가 주어지면 해당 국가만."""
|
||||||
|
topics_by_country: dict[str, list[TopicResponse]] = {}
|
||||||
|
for t in sorted(digest.topics, key=lambda x: (x.country, x.topic_rank)):
|
||||||
|
if country_filter and t.country != country_filter:
|
||||||
|
continue
|
||||||
|
topics_by_country.setdefault(t.country, []).append(
|
||||||
|
TopicResponse(
|
||||||
|
topic_rank=t.topic_rank,
|
||||||
|
topic_label=t.topic_label,
|
||||||
|
summary=t.summary,
|
||||||
|
article_ids=list(t.article_ids or []),
|
||||||
|
article_count=t.article_count,
|
||||||
|
importance_score=t.importance_score,
|
||||||
|
raw_weight_sum=t.raw_weight_sum,
|
||||||
|
llm_fallback_used=t.llm_fallback_used,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
countries = [
|
||||||
|
CountryGroup(country=c, topics=topics_by_country[c])
|
||||||
|
for c in sorted(topics_by_country.keys())
|
||||||
|
]
|
||||||
|
|
||||||
|
return DigestResponse(
|
||||||
|
digest_date=digest.digest_date,
|
||||||
|
window_start=digest.window_start,
|
||||||
|
window_end=digest.window_end,
|
||||||
|
decay_lambda=digest.decay_lambda,
|
||||||
|
total_articles=digest.total_articles,
|
||||||
|
total_countries=digest.total_countries,
|
||||||
|
total_topics=digest.total_topics,
|
||||||
|
generation_ms=digest.generation_ms,
|
||||||
|
llm_calls=digest.llm_calls,
|
||||||
|
llm_failures=digest.llm_failures,
|
||||||
|
status=digest.status,
|
||||||
|
countries=countries,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _load_digest(
|
||||||
|
session: AsyncSession,
|
||||||
|
target_date: date_type | None,
|
||||||
|
) -> GlobalDigest | None:
|
||||||
|
"""date 가 주어지면 해당 날짜, 아니면 최신 digest 1건."""
|
||||||
|
query = select(GlobalDigest).options(selectinload(GlobalDigest.topics))
|
||||||
|
if target_date is not None:
|
||||||
|
query = query.where(GlobalDigest.digest_date == target_date)
|
||||||
|
else:
|
||||||
|
query = query.order_by(GlobalDigest.digest_date.desc())
|
||||||
|
query = query.limit(1)
|
||||||
|
result = await session.execute(query)
|
||||||
|
return result.scalar_one_or_none()
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Routes ───
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/latest", response_model=DigestResponse)
|
||||||
|
async def get_latest(
|
||||||
|
user: Annotated[User, Depends(get_current_user)],
|
||||||
|
session: Annotated[AsyncSession, Depends(get_session)],
|
||||||
|
):
|
||||||
|
"""가장 최근 생성된 global digest."""
|
||||||
|
digest = await _load_digest(session, target_date=None)
|
||||||
|
if digest is None:
|
||||||
|
raise HTTPException(status_code=404, detail="아직 생성된 digest 없음")
|
||||||
|
return _build_response(digest)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("", response_model=DigestResponse)
|
||||||
|
async def get_digest(
|
||||||
|
user: Annotated[User, Depends(get_current_user)],
|
||||||
|
session: Annotated[AsyncSession, Depends(get_session)],
|
||||||
|
date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST)"),
|
||||||
|
country: str | None = Query(default=None, description="국가 코드 (예: KR)"),
|
||||||
|
):
|
||||||
|
"""특정 날짜 또는 국가 필터링된 digest. date 미지정 시 최신."""
|
||||||
|
digest = await _load_digest(session, target_date=date)
|
||||||
|
if digest is None:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=404,
|
||||||
|
detail=f"digest 없음 (date={date})" if date else "아직 생성된 digest 없음",
|
||||||
|
)
|
||||||
|
country_filter = country.upper() if country else None
|
||||||
|
return _build_response(digest, country_filter=country_filter)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/regenerate")
|
||||||
|
async def regenerate(
|
||||||
|
user: Annotated[User, Depends(get_current_user)],
|
||||||
|
):
|
||||||
|
"""디버그용 수동 트리거 — 백그라운드 태스크로 워커 실행 (auth 필요)."""
|
||||||
|
from workers.digest_worker import run
|
||||||
|
|
||||||
|
asyncio.create_task(run())
|
||||||
|
return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}
|
||||||
@@ -142,11 +142,12 @@ async def list_documents(
|
|||||||
user: Annotated[User, Depends(get_current_user)],
|
user: Annotated[User, Depends(get_current_user)],
|
||||||
session: Annotated[AsyncSession, Depends(get_session)],
|
session: Annotated[AsyncSession, Depends(get_session)],
|
||||||
page: int = Query(1, ge=1),
|
page: int = Query(1, ge=1),
|
||||||
page_size: int = Query(20, ge=1, le=100),
|
page_size: int = Query(20, ge=1, le=500),
|
||||||
domain: str | None = None,
|
domain: str | None = None,
|
||||||
sub_group: str | None = None,
|
sub_group: str | None = None,
|
||||||
source: str | None = None,
|
source: str | None = None,
|
||||||
format: str | None = None,
|
format: str | None = None,
|
||||||
|
review_status: str | None = Query(None, description="pending | approved | rejected"),
|
||||||
):
|
):
|
||||||
"""문서 목록 조회 (페이지네이션 + 필터, 뉴스 제외)"""
|
"""문서 목록 조회 (페이지네이션 + 필터, 뉴스 제외)"""
|
||||||
query = select(Document).where(Document.deleted_at == None, Document.source_channel != "news")
|
query = select(Document).where(Document.deleted_at == None, Document.source_channel != "news")
|
||||||
@@ -158,6 +159,8 @@ async def list_documents(
|
|||||||
query = query.where(Document.source_channel == source)
|
query = query.where(Document.source_channel == source)
|
||||||
if format:
|
if format:
|
||||||
query = query.where(Document.file_format == format)
|
query = query.where(Document.file_format == format)
|
||||||
|
if review_status:
|
||||||
|
query = query.where(Document.review_status == review_status)
|
||||||
|
|
||||||
# 전체 건수
|
# 전체 건수
|
||||||
count_query = select(func.count()).select_from(query.subquery())
|
count_query = select(func.count()).select_from(query.subquery())
|
||||||
|
|||||||
@@ -1,11 +1,16 @@
|
|||||||
"""하이브리드 검색 API — orchestrator (Phase 1.1: thin endpoint).
|
"""하이브리드 검색 API — thin endpoint (Phase 3.1 이후).
|
||||||
|
|
||||||
retrieval / fusion / rerank 등 실제 로직은 services/search/* 모듈로 분리.
|
실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence)
|
||||||
이 파일은 mode 분기, 응답 직렬화, debug 응답 구성, BackgroundTask dispatch만 담당.
|
은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
|
||||||
|
이 파일은 다음만 담당:
|
||||||
|
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate
|
||||||
|
/ Citation / AskResponse / AskDebug)
|
||||||
|
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
|
||||||
|
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
from typing import Annotated
|
from typing import Annotated, Literal
|
||||||
|
|
||||||
from fastapi import APIRouter, BackgroundTasks, Depends, Query
|
from fastapi import APIRouter, BackgroundTasks, Depends, Query
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
@@ -15,20 +20,11 @@ from core.auth import get_current_user
|
|||||||
from core.database import get_session
|
from core.database import get_session
|
||||||
from core.utils import setup_logger
|
from core.utils import setup_logger
|
||||||
from models.user import User
|
from models.user import User
|
||||||
from services.search.fusion_service import DEFAULT_FUSION, get_strategy, normalize_display_scores
|
from services.search.evidence_service import EvidenceItem, extract_evidence
|
||||||
from services.search.rerank_service import (
|
from services.search.fusion_service import DEFAULT_FUSION
|
||||||
MAX_CHUNKS_PER_DOC,
|
from services.search.search_pipeline import PipelineResult, run_search
|
||||||
MAX_RERANK_INPUT,
|
from services.search.synthesis_service import SynthesisResult, synthesize
|
||||||
apply_diversity,
|
from services.search_telemetry import record_search_event
|
||||||
rerank_chunks,
|
|
||||||
)
|
|
||||||
from services.search.retrieval_service import compress_chunks_to_docs, search_text, search_vector
|
|
||||||
from services.search_telemetry import (
|
|
||||||
compute_confidence,
|
|
||||||
compute_confidence_hybrid,
|
|
||||||
compute_confidence_reranked,
|
|
||||||
record_search_event,
|
|
||||||
)
|
|
||||||
|
|
||||||
# logs/search.log + stdout 동시 출력 (Phase 0.4)
|
# logs/search.log + stdout 동시 출력 (Phase 0.4)
|
||||||
logger = setup_logger("search")
|
logger = setup_logger("search")
|
||||||
@@ -56,6 +52,10 @@ class SearchResult(BaseModel):
|
|||||||
chunk_id: int | None = None
|
chunk_id: int | None = None
|
||||||
chunk_index: int | None = None
|
chunk_index: int | None = None
|
||||||
section_title: str | None = None
|
section_title: str | None = None
|
||||||
|
# Phase 3.1: reranker raw score 보존 (display score drift 방지).
|
||||||
|
# rerank 경로를 탄 chunk에만 채워짐. normalize_display_scores는 이 필드를
|
||||||
|
# 건드리지 않는다. Phase 3 evidence fast-path 판단에 사용.
|
||||||
|
rerank_score: float | None = None
|
||||||
|
|
||||||
|
|
||||||
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
|
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
|
||||||
@@ -98,6 +98,29 @@ def _to_debug_candidates(rows: list[SearchResult], n: int = 20) -> list[DebugCan
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _build_search_debug(pr: PipelineResult) -> SearchDebug:
|
||||||
|
"""PipelineResult → SearchDebug (기존 search()의 debug 구성 블록 복사)."""
|
||||||
|
return SearchDebug(
|
||||||
|
timing_ms=pr.timing_ms,
|
||||||
|
text_candidates=(
|
||||||
|
_to_debug_candidates(pr.text_results)
|
||||||
|
if pr.text_results or pr.mode != "vector"
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
vector_candidates=(
|
||||||
|
_to_debug_candidates(pr.vector_results)
|
||||||
|
if pr.vector_results or pr.mode in ("vector", "hybrid")
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
fused_candidates=(
|
||||||
|
_to_debug_candidates(pr.results) if pr.mode == "hybrid" else None
|
||||||
|
),
|
||||||
|
confidence=pr.confidence_signal,
|
||||||
|
notes=pr.notes,
|
||||||
|
query_analysis=pr.query_analysis,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/", response_model=SearchResponse)
|
@router.get("/", response_model=SearchResponse)
|
||||||
async def search(
|
async def search(
|
||||||
q: str,
|
q: str,
|
||||||
@@ -115,134 +138,301 @@ async def search(
|
|||||||
True,
|
True,
|
||||||
description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
|
description="bge-reranker-v2-m3 활성화 (Phase 1.3, hybrid 모드만 동작)",
|
||||||
),
|
),
|
||||||
|
analyze: bool = Query(
|
||||||
|
False,
|
||||||
|
description="QueryAnalyzer 활성화 (Phase 2.1, LLM 호출). Phase 2.1은 debug 노출만, 검색 경로 영향 X",
|
||||||
|
),
|
||||||
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
|
debug: bool = Query(False, description="단계별 candidates + timing 응답에 포함"),
|
||||||
):
|
):
|
||||||
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 0.5: RRF fusion)"""
|
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)"""
|
||||||
timing: dict[str, float] = {}
|
pr = await run_search(
|
||||||
notes: list[str] = []
|
session,
|
||||||
text_results: list[SearchResult] = []
|
q,
|
||||||
vector_results: list[SearchResult] = [] # doc-level (압축 후, fusion 입력)
|
mode=mode, # type: ignore[arg-type]
|
||||||
raw_chunks: list[SearchResult] = [] # chunk-level (raw, Phase 1.3 reranker용)
|
limit=limit,
|
||||||
chunks_by_doc: dict[int, list[SearchResult]] = {} # Phase 1.3 reranker용 보존
|
fusion=fusion,
|
||||||
|
rerank=rerank,
|
||||||
t_total = time.perf_counter()
|
analyze=analyze,
|
||||||
|
|
||||||
if mode == "vector":
|
|
||||||
t0 = time.perf_counter()
|
|
||||||
raw_chunks = await search_vector(session, q, limit)
|
|
||||||
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
|
|
||||||
if not raw_chunks:
|
|
||||||
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
|
|
||||||
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
|
|
||||||
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
|
||||||
results = vector_results
|
|
||||||
else:
|
|
||||||
t0 = time.perf_counter()
|
|
||||||
text_results = await search_text(session, q, limit)
|
|
||||||
timing["text_ms"] = (time.perf_counter() - t0) * 1000
|
|
||||||
|
|
||||||
if mode == "hybrid":
|
|
||||||
t1 = time.perf_counter()
|
|
||||||
raw_chunks = await search_vector(session, q, limit)
|
|
||||||
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
|
|
||||||
|
|
||||||
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
|
|
||||||
t1b = time.perf_counter()
|
|
||||||
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
|
||||||
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
|
|
||||||
|
|
||||||
if not vector_results:
|
|
||||||
notes.append("vector_search_returned_empty — text-only fallback")
|
|
||||||
|
|
||||||
t2 = time.perf_counter()
|
|
||||||
strategy = get_strategy(fusion)
|
|
||||||
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
|
|
||||||
fusion_limit = max(limit * 5, 100) if rerank else limit
|
|
||||||
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
|
|
||||||
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
|
|
||||||
notes.append(f"fusion={strategy.name}")
|
|
||||||
notes.append(
|
|
||||||
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
|
|
||||||
f"unique_docs={len(chunks_by_doc)}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if rerank:
|
|
||||||
# Phase 1.3: reranker — chunk 기준 입력
|
|
||||||
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
|
|
||||||
t3 = time.perf_counter()
|
|
||||||
rerank_input: list[SearchResult] = []
|
|
||||||
for doc in fused_docs:
|
|
||||||
chunks = chunks_by_doc.get(doc.id, [])
|
|
||||||
if chunks:
|
|
||||||
# doc당 max 2 chunk (latency/VRAM 보호)
|
|
||||||
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
|
|
||||||
else:
|
|
||||||
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
|
|
||||||
rerank_input.append(doc)
|
|
||||||
if len(rerank_input) >= MAX_RERANK_INPUT:
|
|
||||||
break
|
|
||||||
rerank_input = rerank_input[:MAX_RERANK_INPUT]
|
|
||||||
notes.append(f"rerank input={len(rerank_input)}")
|
|
||||||
|
|
||||||
reranked = await rerank_chunks(q, rerank_input, limit * 3)
|
|
||||||
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
|
|
||||||
|
|
||||||
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
|
|
||||||
t4 = time.perf_counter()
|
|
||||||
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
|
|
||||||
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
|
|
||||||
else:
|
|
||||||
# rerank 비활성: fused_docs를 그대로 (limit 적용)
|
|
||||||
results = fused_docs[:limit]
|
|
||||||
else:
|
|
||||||
results = text_results
|
|
||||||
|
|
||||||
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
|
|
||||||
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
|
|
||||||
normalize_display_scores(results)
|
|
||||||
|
|
||||||
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
|
|
||||||
|
|
||||||
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
|
|
||||||
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
|
|
||||||
if mode == "hybrid":
|
|
||||||
if rerank and "rerank_ms" in timing:
|
|
||||||
confidence_signal = compute_confidence_reranked(results)
|
|
||||||
else:
|
|
||||||
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
|
|
||||||
elif mode == "vector":
|
|
||||||
confidence_signal = compute_confidence(vector_results, "vector")
|
|
||||||
else:
|
|
||||||
confidence_signal = compute_confidence(text_results, mode)
|
|
||||||
|
|
||||||
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
|
# 사용자 feedback: 모든 단계 timing은 debug 응답과 별도로 항상 로그로 남긴다
|
||||||
timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items())
|
timing_str = " ".join(f"{k}={v:.0f}" for k, v in pr.timing_ms.items())
|
||||||
fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
|
fusion_str = f" fusion={fusion}" if mode == "hybrid" else ""
|
||||||
|
analyzer_str = (
|
||||||
|
f" analyzer=hit={pr.analyzer_cache_hit}/conf={pr.analyzer_confidence:.2f}/tier={pr.analyzer_tier}"
|
||||||
|
if analyze
|
||||||
|
else ""
|
||||||
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"search query=%r mode=%s%s results=%d conf=%.2f %s",
|
"search query=%r mode=%s%s%s results=%d conf=%.2f %s",
|
||||||
q[:80], mode, fusion_str, len(results), confidence_signal, timing_str,
|
q[:80],
|
||||||
|
pr.mode,
|
||||||
|
fusion_str,
|
||||||
|
analyzer_str,
|
||||||
|
len(pr.results),
|
||||||
|
pr.confidence_signal,
|
||||||
|
timing_str,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
|
# Phase 0.3: 실패 자동 로깅 (응답 latency에 영향 X — background task)
|
||||||
|
# Phase 2.1: analyze=true일 때만 analyzer_confidence 전달 (False는 None → 기존 호환)
|
||||||
background_tasks.add_task(
|
background_tasks.add_task(
|
||||||
record_search_event, q, user.id, results, mode, confidence_signal
|
record_search_event,
|
||||||
|
q,
|
||||||
|
user.id,
|
||||||
|
pr.results,
|
||||||
|
pr.mode,
|
||||||
|
pr.confidence_signal,
|
||||||
|
pr.analyzer_confidence if analyze else None,
|
||||||
)
|
)
|
||||||
|
|
||||||
debug_obj: SearchDebug | None = None
|
debug_obj = _build_search_debug(pr) if debug else None
|
||||||
if debug:
|
|
||||||
debug_obj = SearchDebug(
|
|
||||||
timing_ms=timing,
|
|
||||||
text_candidates=_to_debug_candidates(text_results) if text_results or mode != "vector" else None,
|
|
||||||
vector_candidates=_to_debug_candidates(vector_results) if vector_results or mode in ("vector", "hybrid") else None,
|
|
||||||
fused_candidates=_to_debug_candidates(results) if mode == "hybrid" else None,
|
|
||||||
confidence=confidence_signal,
|
|
||||||
notes=notes,
|
|
||||||
)
|
|
||||||
|
|
||||||
return SearchResponse(
|
return SearchResponse(
|
||||||
results=results,
|
results=pr.results,
|
||||||
total=len(results),
|
total=len(pr.results),
|
||||||
query=q,
|
query=q,
|
||||||
mode=mode,
|
mode=pr.mode,
|
||||||
|
debug=debug_obj,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════
|
||||||
|
# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis
|
||||||
|
# ═══════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
|
||||||
|
class Citation(BaseModel):
|
||||||
|
"""answer 본문의 [n] 에 해당하는 근거 단일 행."""
|
||||||
|
|
||||||
|
n: int
|
||||||
|
chunk_id: int | None
|
||||||
|
doc_id: int
|
||||||
|
title: str | None
|
||||||
|
section_title: str | None
|
||||||
|
span_text: str # evidence LLM 이 추출한 50~300자
|
||||||
|
full_snippet: str # 원본 800자 (citation 원문 보기 전용)
|
||||||
|
relevance: float
|
||||||
|
rerank_score: float
|
||||||
|
|
||||||
|
|
||||||
|
class AskDebug(BaseModel):
|
||||||
|
"""`/ask?debug=true` 응답 확장."""
|
||||||
|
|
||||||
|
timing_ms: dict[str, float]
|
||||||
|
search_notes: list[str]
|
||||||
|
query_analysis: dict | None = None
|
||||||
|
confidence_signal: float
|
||||||
|
evidence_candidate_count: int
|
||||||
|
evidence_kept_count: int
|
||||||
|
evidence_skip_reason: str | None
|
||||||
|
synthesis_cache_hit: bool
|
||||||
|
synthesis_prompt_preview: str | None = None
|
||||||
|
synthesis_raw_preview: str | None = None
|
||||||
|
hallucination_flags: list[str] = []
|
||||||
|
|
||||||
|
|
||||||
|
class AskResponse(BaseModel):
|
||||||
|
"""`/ask` 응답. `/search` 의 SearchResult 는 그대로 재사용."""
|
||||||
|
|
||||||
|
results: list[SearchResult]
|
||||||
|
ai_answer: str | None
|
||||||
|
citations: list[Citation]
|
||||||
|
synthesis_status: Literal[
|
||||||
|
"completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error"
|
||||||
|
]
|
||||||
|
synthesis_ms: float
|
||||||
|
confidence: Literal["high", "medium", "low"] | None
|
||||||
|
refused: bool
|
||||||
|
no_results_reason: str | None
|
||||||
|
query: str
|
||||||
|
total: int
|
||||||
|
debug: AskDebug | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _map_no_results_reason(
|
||||||
|
pr: PipelineResult,
|
||||||
|
evidence: list[EvidenceItem],
|
||||||
|
ev_skip: str | None,
|
||||||
|
sr: SynthesisResult,
|
||||||
|
) -> str | None:
|
||||||
|
"""사용자에게 보여줄 한국어 메시지 매핑.
|
||||||
|
|
||||||
|
Failure mode 표 (plan §Failure Modes) 기반.
|
||||||
|
"""
|
||||||
|
# LLM 자가 refused → 모델이 준 사유 그대로
|
||||||
|
if sr.refused and sr.refuse_reason:
|
||||||
|
return sr.refuse_reason
|
||||||
|
|
||||||
|
# synthesis 상태 우선
|
||||||
|
if sr.status == "no_evidence":
|
||||||
|
if not pr.results:
|
||||||
|
return "검색 결과가 없습니다."
|
||||||
|
return "관련도 높은 근거를 찾지 못했습니다."
|
||||||
|
if sr.status == "skipped":
|
||||||
|
return "검색 결과가 없습니다."
|
||||||
|
if sr.status == "timeout":
|
||||||
|
return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요."
|
||||||
|
if sr.status == "parse_failed":
|
||||||
|
return "답변 형식 오류로 생략했습니다."
|
||||||
|
if sr.status == "llm_error":
|
||||||
|
return "AI 서버에 일시적 문제가 있습니다."
|
||||||
|
|
||||||
|
# evidence 단계 실패는 fallback 을 탔더라도 notes 용
|
||||||
|
if ev_skip == "all_low_rerank":
|
||||||
|
return "관련도 높은 근거를 찾지 못했습니다."
|
||||||
|
if ev_skip == "empty_retrieval":
|
||||||
|
return "검색 결과가 없습니다."
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _build_citations(
|
||||||
|
evidence: list[EvidenceItem], used_citations: list[int]
|
||||||
|
) -> list[Citation]:
|
||||||
|
"""answer 본문에 실제로 등장한 n 만 Citation 으로 변환."""
|
||||||
|
by_n = {e.n: e for e in evidence}
|
||||||
|
out: list[Citation] = []
|
||||||
|
for n in used_citations:
|
||||||
|
e = by_n.get(n)
|
||||||
|
if e is None:
|
||||||
|
continue
|
||||||
|
out.append(
|
||||||
|
Citation(
|
||||||
|
n=e.n,
|
||||||
|
chunk_id=e.chunk_id,
|
||||||
|
doc_id=e.doc_id,
|
||||||
|
title=e.title,
|
||||||
|
section_title=e.section_title,
|
||||||
|
span_text=e.span_text,
|
||||||
|
full_snippet=e.full_snippet,
|
||||||
|
relevance=e.relevance,
|
||||||
|
rerank_score=e.rerank_score,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _build_ask_debug(
|
||||||
|
pr: PipelineResult,
|
||||||
|
evidence: list[EvidenceItem],
|
||||||
|
ev_skip: str | None,
|
||||||
|
sr: SynthesisResult,
|
||||||
|
ev_ms: float,
|
||||||
|
synth_ms: float,
|
||||||
|
total_ms: float,
|
||||||
|
) -> AskDebug:
|
||||||
|
timing: dict[str, float] = dict(pr.timing_ms)
|
||||||
|
timing["evidence_ms"] = ev_ms
|
||||||
|
timing["synthesis_ms"] = synth_ms
|
||||||
|
timing["ask_total_ms"] = total_ms
|
||||||
|
|
||||||
|
# candidate count 는 rule filter 통과한 수 (recomputable from results)
|
||||||
|
# 엄밀히는 evidence_service 내부 숫자인데, evidence 길이 ≈ kept, candidate
|
||||||
|
# 는 관측이 어려움 → kept 는 evidence 길이, candidate 는 별도 필드 없음.
|
||||||
|
# 단순화: candidate_count = len(evidence) 를 상한 근사로 둠 (debug 전용).
|
||||||
|
return AskDebug(
|
||||||
|
timing_ms=timing,
|
||||||
|
search_notes=pr.notes,
|
||||||
|
query_analysis=pr.query_analysis,
|
||||||
|
confidence_signal=pr.confidence_signal,
|
||||||
|
evidence_candidate_count=len(evidence),
|
||||||
|
evidence_kept_count=len(evidence),
|
||||||
|
evidence_skip_reason=ev_skip,
|
||||||
|
synthesis_cache_hit=sr.cache_hit,
|
||||||
|
synthesis_prompt_preview=None, # 현재 synthesis_service 에서 노출 안 함
|
||||||
|
synthesis_raw_preview=sr.raw_preview,
|
||||||
|
hallucination_flags=sr.hallucination_flags,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/ask", response_model=AskResponse)
|
||||||
|
async def ask(
|
||||||
|
q: str,
|
||||||
|
user: Annotated[User, Depends(get_current_user)],
|
||||||
|
session: Annotated[AsyncSession, Depends(get_session)],
|
||||||
|
background_tasks: BackgroundTasks,
|
||||||
|
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
|
||||||
|
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
|
||||||
|
):
|
||||||
|
"""근거 기반 AI 답변 (Phase 3.3).
|
||||||
|
|
||||||
|
`/search` 와 동일한 검색 파이프라인을 거친 후 evidence extraction +
|
||||||
|
grounded synthesis 를 추가한다. `mode`, `rerank`, `analyze` 는 품질 보장을
|
||||||
|
위해 강제 고정 (hybrid / True / True).
|
||||||
|
|
||||||
|
실패 경로(timeout/parse_failed/refused/...) 에서도 `results` 는 항상 반환.
|
||||||
|
"""
|
||||||
|
t_total = time.perf_counter()
|
||||||
|
|
||||||
|
# 1. 검색 파이프라인 (run_search — /search 와 동일 로직, 단일 진실 소스)
|
||||||
|
pr = await run_search(
|
||||||
|
session,
|
||||||
|
q,
|
||||||
|
mode="hybrid",
|
||||||
|
limit=limit,
|
||||||
|
fusion=DEFAULT_FUSION,
|
||||||
|
rerank=True,
|
||||||
|
analyze=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2. Evidence extraction (rule + LLM span select, 1 batched call)
|
||||||
|
t_ev = time.perf_counter()
|
||||||
|
evidence, ev_skip = await extract_evidence(q, pr.results)
|
||||||
|
ev_ms = (time.perf_counter() - t_ev) * 1000
|
||||||
|
|
||||||
|
# 3. Grounded synthesis (gemma-4, 15s timeout, citation 검증)
|
||||||
|
t_synth = time.perf_counter()
|
||||||
|
sr = await synthesize(q, evidence, debug=debug)
|
||||||
|
synth_ms = (time.perf_counter() - t_synth) * 1000
|
||||||
|
|
||||||
|
total_ms = (time.perf_counter() - t_total) * 1000
|
||||||
|
|
||||||
|
# 4. 응답 구성
|
||||||
|
citations = _build_citations(evidence, sr.used_citations)
|
||||||
|
no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s refused=%s ev_ms=%.0f synth_ms=%.0f total=%.0f",
|
||||||
|
q[:80],
|
||||||
|
len(pr.results),
|
||||||
|
len(evidence),
|
||||||
|
len(citations),
|
||||||
|
sr.status,
|
||||||
|
sr.confidence or "-",
|
||||||
|
sr.refused,
|
||||||
|
ev_ms,
|
||||||
|
synth_ms,
|
||||||
|
total_ms,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 5. telemetry — 기존 record_search_event 재사용 (Phase 0.3 호환)
|
||||||
|
background_tasks.add_task(
|
||||||
|
record_search_event,
|
||||||
|
q,
|
||||||
|
user.id,
|
||||||
|
pr.results,
|
||||||
|
"hybrid",
|
||||||
|
pr.confidence_signal,
|
||||||
|
pr.analyzer_confidence,
|
||||||
|
)
|
||||||
|
|
||||||
|
debug_obj = (
|
||||||
|
_build_ask_debug(pr, evidence, ev_skip, sr, ev_ms, synth_ms, total_ms)
|
||||||
|
if debug
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
|
||||||
|
return AskResponse(
|
||||||
|
results=pr.results,
|
||||||
|
ai_answer=sr.answer,
|
||||||
|
citations=citations,
|
||||||
|
synthesis_status=sr.status,
|
||||||
|
synthesis_ms=sr.elapsed_ms,
|
||||||
|
confidence=sr.confidence,
|
||||||
|
refused=sr.refused,
|
||||||
|
no_results_reason=no_reason,
|
||||||
|
query=q,
|
||||||
|
total=len(pr.results),
|
||||||
debug=debug_obj,
|
debug=debug_obj,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -95,7 +95,8 @@ async def _run_migrations(conn) -> None:
|
|||||||
applied = {row[0] for row in result}
|
applied = {row[0] for row in result}
|
||||||
|
|
||||||
# migration 파일 스캔
|
# migration 파일 스캔
|
||||||
migrations_dir = Path(__file__).resolve().parent.parent.parent / "migrations"
|
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
|
||||||
|
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
|
||||||
if not migrations_dir.is_dir():
|
if not migrations_dir.is_dir():
|
||||||
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
|
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
|
||||||
return
|
return
|
||||||
@@ -114,7 +115,10 @@ async def _run_migrations(conn) -> None:
|
|||||||
sql = path.read_text(encoding="utf-8")
|
sql = path.read_text(encoding="utf-8")
|
||||||
_validate_sql_content(name, sql)
|
_validate_sql_content(name, sql)
|
||||||
logger.info(f"[migration] {name} 실행 중...")
|
logger.info(f"[migration] {name} 실행 중...")
|
||||||
await conn.execute(text(sql))
|
# raw driver SQL 사용 — text() 의 :name bind parameter 해석으로
|
||||||
|
# SQL 주석/literal 에 콜론이 들어가면 InvalidRequestError 발생.
|
||||||
|
# exec_driver_sql 은 SQL 을 driver(asyncpg) 에 그대로 전달.
|
||||||
|
await conn.exec_driver_sql(sql)
|
||||||
await conn.execute(
|
await conn.execute(
|
||||||
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
|
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
|
||||||
{"v": version, "n": name},
|
{"v": version, "n": name},
|
||||||
|
|||||||
16
app/main.py
16
app/main.py
@@ -8,6 +8,7 @@ from sqlalchemy import func, select, text
|
|||||||
|
|
||||||
from api.auth import router as auth_router
|
from api.auth import router as auth_router
|
||||||
from api.dashboard import router as dashboard_router
|
from api.dashboard import router as dashboard_router
|
||||||
|
from api.digest import router as digest_router
|
||||||
from api.documents import router as documents_router
|
from api.documents import router as documents_router
|
||||||
from api.news import router as news_router
|
from api.news import router as news_router
|
||||||
from api.search import router as search_router
|
from api.search import router as search_router
|
||||||
@@ -20,9 +21,13 @@ from models.user import User
|
|||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
"""앱 시작/종료 시 실행되는 lifespan 핸들러"""
|
"""앱 시작/종료 시 실행되는 lifespan 핸들러"""
|
||||||
|
import asyncio
|
||||||
|
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
from apscheduler.triggers.cron import CronTrigger
|
from apscheduler.triggers.cron import CronTrigger
|
||||||
|
from services.search.query_analyzer import prewarm_analyzer
|
||||||
from workers.daily_digest import run as daily_digest_run
|
from workers.daily_digest import run as daily_digest_run
|
||||||
|
from workers.digest_worker import run as global_digest_run
|
||||||
from workers.file_watcher import watch_inbox
|
from workers.file_watcher import watch_inbox
|
||||||
from workers.law_monitor import run as law_monitor_run
|
from workers.law_monitor import run as law_monitor_run
|
||||||
from workers.mailplus_archive import run as mailplus_run
|
from workers.mailplus_archive import run as mailplus_run
|
||||||
@@ -51,9 +56,19 @@ async def lifespan(app: FastAPI):
|
|||||||
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
|
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
|
||||||
scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
|
scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
|
||||||
scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
|
scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
|
||||||
|
scheduler.add_job(global_digest_run, CronTrigger(hour=4, minute=0), id="global_digest")
|
||||||
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
|
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
|
# Phase 2.1 (async 구조): QueryAnalyzer prewarm.
|
||||||
|
# 대표 쿼리 15~20개를 background task로 분석해 cache 적재.
|
||||||
|
# 첫 사용자 요청부터 cache hit rate 70~80% 목표.
|
||||||
|
# 논블로킹 — startup을 막지 않음. MLX 부하 완화 위해 delay_between=0.5.
|
||||||
|
prewarm_task = asyncio.create_task(prewarm_analyzer())
|
||||||
|
prewarm_task.add_done_callback(
|
||||||
|
lambda t: t.exception() and None # 예외는 query_analyzer 내부에서 로깅
|
||||||
|
)
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# 종료: 스케줄러 → DB 순서로 정리
|
# 종료: 스케줄러 → DB 순서로 정리
|
||||||
@@ -76,6 +91,7 @@ app.include_router(search_router, prefix="/api/search", tags=["search"])
|
|||||||
|
|
||||||
app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"])
|
app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"])
|
||||||
app.include_router(news_router, prefix="/api/news", tags=["news"])
|
app.include_router(news_router, prefix="/api/news", tags=["news"])
|
||||||
|
app.include_router(digest_router, prefix="/api/digest", tags=["digest"])
|
||||||
|
|
||||||
# TODO: Phase 5에서 추가
|
# TODO: Phase 5에서 추가
|
||||||
# app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])
|
# app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])
|
||||||
|
|||||||
87
app/models/digest.py
Normal file
87
app/models/digest.py
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
"""global_digests + digest_topics 테이블 ORM (Phase 4)"""
|
||||||
|
|
||||||
|
from datetime import date, datetime
|
||||||
|
|
||||||
|
from sqlalchemy import (
|
||||||
|
BigInteger,
|
||||||
|
Boolean,
|
||||||
|
Date,
|
||||||
|
DateTime,
|
||||||
|
Float,
|
||||||
|
ForeignKey,
|
||||||
|
Integer,
|
||||||
|
String,
|
||||||
|
Text,
|
||||||
|
)
|
||||||
|
from sqlalchemy.dialects.postgresql import JSONB
|
||||||
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||||
|
|
||||||
|
from core.database import Base
|
||||||
|
|
||||||
|
|
||||||
|
class GlobalDigest(Base):
|
||||||
|
"""하루 단위 digest run 메타데이터"""
|
||||||
|
|
||||||
|
__tablename__ = "global_digests"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||||
|
digest_date: Mapped[date] = mapped_column(Date, nullable=False, unique=True)
|
||||||
|
window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||||
|
window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||||
|
decay_lambda: Mapped[float] = mapped_column(Float, nullable=False)
|
||||||
|
|
||||||
|
total_articles: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||||
|
total_countries: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||||
|
total_topics: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||||
|
|
||||||
|
generation_ms: Mapped[int | None] = mapped_column(Integer)
|
||||||
|
llm_calls: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||||
|
llm_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||||
|
status: Mapped[str] = mapped_column(String(20), nullable=False, default="success")
|
||||||
|
|
||||||
|
created_at: Mapped[datetime] = mapped_column(
|
||||||
|
DateTime(timezone=True), nullable=False, default=datetime.now
|
||||||
|
)
|
||||||
|
|
||||||
|
topics: Mapped[list["DigestTopic"]] = relationship(
|
||||||
|
back_populates="digest",
|
||||||
|
cascade="all, delete-orphan",
|
||||||
|
order_by="DigestTopic.country, DigestTopic.topic_rank",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class DigestTopic(Base):
|
||||||
|
"""country × topic 단위 cluster 결과"""
|
||||||
|
|
||||||
|
__tablename__ = "digest_topics"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||||
|
digest_id: Mapped[int] = mapped_column(
|
||||||
|
BigInteger,
|
||||||
|
ForeignKey("global_digests.id", ondelete="CASCADE"),
|
||||||
|
nullable=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
country: Mapped[str] = mapped_column(String(10), nullable=False)
|
||||||
|
topic_rank: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||||
|
|
||||||
|
topic_label: Mapped[str] = mapped_column(Text, nullable=False)
|
||||||
|
summary: Mapped[str] = mapped_column(Text, nullable=False)
|
||||||
|
|
||||||
|
article_ids: Mapped[list] = mapped_column(JSONB, nullable=False)
|
||||||
|
article_count: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||||
|
|
||||||
|
importance_score: Mapped[float] = mapped_column(Float, nullable=False)
|
||||||
|
raw_weight_sum: Mapped[float] = mapped_column(Float, nullable=False)
|
||||||
|
|
||||||
|
centroid_sample: Mapped[dict | None] = mapped_column(JSONB)
|
||||||
|
llm_model: Mapped[str | None] = mapped_column(String(100))
|
||||||
|
llm_fallback_used: Mapped[bool] = mapped_column(
|
||||||
|
Boolean, nullable=False, default=False
|
||||||
|
)
|
||||||
|
|
||||||
|
created_at: Mapped[datetime] = mapped_column(
|
||||||
|
DateTime(timezone=True), nullable=False, default=datetime.now
|
||||||
|
)
|
||||||
|
|
||||||
|
digest: Mapped["GlobalDigest"] = relationship(back_populates="topics")
|
||||||
19
app/prompts/digest_topic.txt
Normal file
19
app/prompts/digest_topic.txt
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
너는 팩트 기반 뉴스 토픽 요약 도우미다.
|
||||||
|
아래는 같은 사건으로 군집된 기사들의 ai_summary다.
|
||||||
|
이 정보만으로 다음을 JSON으로만 출력하라.
|
||||||
|
|
||||||
|
절대 금지:
|
||||||
|
- 제공된 summary에 없는 사실 추가
|
||||||
|
- 해석/비교/예측/의견
|
||||||
|
- "보인다", "~할 것이다", "~할 전망" 같은 추측 표현
|
||||||
|
- 인용부호 안 원문 외 단어 생성
|
||||||
|
- JSON 외의 모든 텍스트 (설명, 마크다운, 코드블록 금지)
|
||||||
|
|
||||||
|
출력 형식 (JSON 객체 하나만 출력):
|
||||||
|
{
|
||||||
|
"topic_label": "5~10 단어의 한국어 제목",
|
||||||
|
"summary": "1~2 문장, 사실만, 수동태 허용"
|
||||||
|
}
|
||||||
|
|
||||||
|
기사 요약:
|
||||||
|
{articles_block}
|
||||||
76
app/prompts/evidence_extract.txt
Normal file
76
app/prompts/evidence_extract.txt
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
You are an evidence span extractor. Respond ONLY in JSON. No markdown, no explanation.
|
||||||
|
|
||||||
|
## Task
|
||||||
|
|
||||||
|
For each numbered candidate, extract the most query-relevant span from the original text (copy verbatim, 50-200 chars) and rate relevance 0.0~1.0. If the candidate does not directly answer the query, set span=null, relevance=0.0, skip_reason.
|
||||||
|
|
||||||
|
## Output Schema
|
||||||
|
{
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"n": 1,
|
||||||
|
"span": "...",
|
||||||
|
"relevance": 0.0,
|
||||||
|
"skip_reason": null
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
## Rules
|
||||||
|
- `n`: candidate 번호 (1-based, 입력 순서와 동일). **모든 n을 반환** (skip된 것도 포함).
|
||||||
|
- `span`: 원문에서 **그대로 복사한** 50~200자. 요약/변형 금지. 원문에 없는 단어는 절대 포함하지 말 것. 여러 문장이어도 무방.
|
||||||
|
- 관련 span이 없으면 `span: null`, `relevance: 0.0`, `skip_reason`에 한 줄 사유.
|
||||||
|
- `relevance`: 0.0~1.0 float
|
||||||
|
- 0.9+ query에 직접 답함
|
||||||
|
- 0.7~0.9 강한 연관
|
||||||
|
- 0.5~0.7 부분 연관
|
||||||
|
- <0.5 약한/무관 (fallback에서 탈락)
|
||||||
|
- `skip_reason`: span=null 일 때만 필수. 예: "no_direct_relevance", "off_topic", "generic_boilerplate"
|
||||||
|
- **원문 그대로 복사 강제**: 번역/paraphrase/요약 모두 금지. evidence span은 citation 원문이 되어야 한다.
|
||||||
|
|
||||||
|
## Example 1 (hit)
|
||||||
|
query: `산업안전보건법 제6장 주요 내용`
|
||||||
|
candidates:
|
||||||
|
[1] title: 산업안전보건법 해설 / text: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다...
|
||||||
|
[2] title: 회사 복지 규정 / text: 직원의 연차휴가 사용 규정과 경조사 지원 내용을 담고 있다...
|
||||||
|
|
||||||
|
→
|
||||||
|
{
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"n": 1,
|
||||||
|
"span": "제6장은 \"안전보건관리체제\"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다. 제15조부터 제19조까지 구성된다",
|
||||||
|
"relevance": 0.95,
|
||||||
|
"skip_reason": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"n": 2,
|
||||||
|
"span": null,
|
||||||
|
"relevance": 0.0,
|
||||||
|
"skip_reason": "off_topic"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
## Example 2 (partial)
|
||||||
|
query: `Python async best practice`
|
||||||
|
candidates:
|
||||||
|
[1] title: FastAPI tutorial / text: FastAPI supports both async and sync endpoints. For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor...
|
||||||
|
|
||||||
|
→
|
||||||
|
{
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"n": 1,
|
||||||
|
"span": "For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor",
|
||||||
|
"relevance": 0.82,
|
||||||
|
"skip_reason": null
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
## Query
|
||||||
|
{query}
|
||||||
|
|
||||||
|
## Candidates
|
||||||
|
{numbered_candidates}
|
||||||
53
app/prompts/query_analyze.txt
Normal file
53
app/prompts/query_analyze.txt
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
You are a search query analyzer. Respond ONLY in JSON. No markdown, no explanation.
|
||||||
|
|
||||||
|
## Output Schema
|
||||||
|
{
|
||||||
|
"intent": "fact_lookup | semantic_search | filter_browse",
|
||||||
|
"query_type": "natural_language | keyword | phrase",
|
||||||
|
"domain_hint": "document | news | mixed",
|
||||||
|
"language_scope": "limited | global",
|
||||||
|
"keywords": [],
|
||||||
|
"must_terms": [],
|
||||||
|
"optional_terms": [],
|
||||||
|
"hard_filters": {},
|
||||||
|
"soft_filters": {"domain": [], "document_type": []},
|
||||||
|
"normalized_queries": [{"lang": "ko", "text": "...", "weight": 1.0}],
|
||||||
|
"expanded_terms": [],
|
||||||
|
"synonyms": {},
|
||||||
|
"analyzer_confidence": 0.0
|
||||||
|
}
|
||||||
|
|
||||||
|
## Rules
|
||||||
|
- `intent`: fact_lookup (사실/조항/이름), semantic_search (주제/개념), filter_browse (필터 중심)
|
||||||
|
- `query_type`: natural_language (문장형), keyword (단어 나열), phrase (따옴표/고유명사/법조항)
|
||||||
|
- `domain_hint`: document (소유 문서/법령/매뉴얼), news (시사/뉴스), mixed (불명)
|
||||||
|
- `language_scope`: limited (ko+en), global (다국어 필요)
|
||||||
|
- `hard_filters`: 쿼리에 **명시된** 것만. 추론 금지. 키: file_format, year, country
|
||||||
|
- `soft_filters.domain`: Industrial_Safety, Programming, Engineering, Philosophy, Language, General. 2-level 허용(e.g. Industrial_Safety/Legislation)
|
||||||
|
- `soft_filters.document_type`: Law_Document, Manual, Report, Academic_Paper, Standard, Specification, Meeting_Minutes, Checklist, Note, Memo, Reference, Drawing, Template
|
||||||
|
- `normalized_queries`: 원문 언어 1.0 가중치 필수. 교차언어 1개 추가 권장(ko↔en, weight 0.8). news + global 인 경우만 ja/zh 추가(weight 0.5~0.6). **최대 3개**.
|
||||||
|
- `analyzer_confidence`: 0.9+ 명확, 0.7~0.9 대체로 명확, 0.5~0.7 모호, <0.5 분석 불가
|
||||||
|
|
||||||
|
## Example
|
||||||
|
query: `기계 사고 관련 법령`
|
||||||
|
{
|
||||||
|
"intent": "semantic_search",
|
||||||
|
"query_type": "natural_language",
|
||||||
|
"domain_hint": "document",
|
||||||
|
"language_scope": "limited",
|
||||||
|
"keywords": ["기계", "사고", "법령"],
|
||||||
|
"must_terms": [],
|
||||||
|
"optional_terms": ["안전", "규정"],
|
||||||
|
"hard_filters": {},
|
||||||
|
"soft_filters": {"domain": ["Industrial_Safety/Legislation"], "document_type": ["Law_Document"]},
|
||||||
|
"normalized_queries": [
|
||||||
|
{"lang": "ko", "text": "기계 사고 관련 법령", "weight": 1.0},
|
||||||
|
{"lang": "en", "text": "machinery accident related laws", "weight": 0.8}
|
||||||
|
],
|
||||||
|
"expanded_terms": ["산업안전", "기계안전"],
|
||||||
|
"synonyms": {},
|
||||||
|
"analyzer_confidence": 0.88
|
||||||
|
}
|
||||||
|
|
||||||
|
## Query
|
||||||
|
{query}
|
||||||
80
app/prompts/search_synthesis.txt
Normal file
80
app/prompts/search_synthesis.txt
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
You are a grounded answer synthesizer. Respond ONLY in JSON. No markdown, no explanation.
|
||||||
|
|
||||||
|
## Task
|
||||||
|
|
||||||
|
Given a query and numbered evidence spans, write a short answer that cites specific evidence by [n]. **You may only use facts that appear in the evidence.** If the evidence does not directly answer the query, set `refused: true`.
|
||||||
|
|
||||||
|
## Output Schema
|
||||||
|
{
|
||||||
|
"answer": "...",
|
||||||
|
"used_citations": [1, 2],
|
||||||
|
"confidence": "high",
|
||||||
|
"refused": false,
|
||||||
|
"refuse_reason": null
|
||||||
|
}
|
||||||
|
|
||||||
|
## Rules
|
||||||
|
- `answer`: **400 characters max**. Must contain inline `[n]` citations. Every claim sentence ends with at least one `[n]`. Multiple sources: `[1][3]`. **Only use facts present in evidence. No outside knowledge, no guessing, no paraphrasing what is not there.**
|
||||||
|
- `used_citations`: integer list of `n` values that actually appear in `answer` (for cross-check). Must be sorted ascending, no duplicates.
|
||||||
|
- `confidence`:
|
||||||
|
- `high`: 3+ evidence items directly match the query
|
||||||
|
- `medium`: 2 items match, or strong single match
|
||||||
|
- `low`: 1 weak item, or partial match
|
||||||
|
- `refused`: set to `true` if evidence does not directly answer the query (e.g. off-topic, too generic, missing key facts). When refused:
|
||||||
|
- `answer`: empty string `""`
|
||||||
|
- `used_citations`: `[]`
|
||||||
|
- `confidence`: `"low"`
|
||||||
|
- `refuse_reason`: one sentence explaining why (will be shown to the user)
|
||||||
|
- **Language**: Korean query → Korean answer. English query → English answer. Match query language.
|
||||||
|
- **Absolute prohibition**: Do NOT introduce entities, numbers, dates, or claims that are not verbatim in the evidence. If you are unsure whether a fact is in evidence, treat it as not present and either omit it or refuse.
|
||||||
|
|
||||||
|
## Example 1 (happy path, high confidence)
|
||||||
|
query: `산업안전보건법 제6장 주요 내용`
|
||||||
|
evidence:
|
||||||
|
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
|
||||||
|
[2] 시행령 해설: 제6장은 제15조부터 제19조까지로 구성되며 안전보건관리책임자의 업무 범위를 세부 규정한다
|
||||||
|
[3] 법령 체계도: 안전보건관리책임자 선임은 상시근로자 50명 이상 사업장에 적용된다
|
||||||
|
|
||||||
|
→
|
||||||
|
{
|
||||||
|
"answer": "산업안전보건법 제6장은 안전보건관리체제에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정을 규정한다[1]. 제15조부터 제19조까지 구성되며 관리책임자의 업무 범위를 세부 규정한다[2]. 상시근로자 50명 이상 사업장에 적용된다[3].",
|
||||||
|
"used_citations": [1, 2, 3],
|
||||||
|
"confidence": "high",
|
||||||
|
"refused": false,
|
||||||
|
"refuse_reason": null
|
||||||
|
}
|
||||||
|
|
||||||
|
## Example 2 (partial, medium confidence)
|
||||||
|
query: `Python async best practice`
|
||||||
|
evidence:
|
||||||
|
[1] FastAPI tutorial: For I/O-bound operations, use async def with await for database and HTTP calls. Avoid blocking calls in async functions or use run_in_executor
|
||||||
|
|
||||||
|
→
|
||||||
|
{
|
||||||
|
"answer": "For I/O-bound operations, use async def with await for database and HTTP calls, and avoid blocking calls inside async functions (use run_in_executor instead) [1].",
|
||||||
|
"used_citations": [1],
|
||||||
|
"confidence": "low",
|
||||||
|
"refused": false,
|
||||||
|
"refuse_reason": null
|
||||||
|
}
|
||||||
|
|
||||||
|
## Example 3 (refused — evidence does not answer query)
|
||||||
|
query: `회사 연차 휴가 사용 규정`
|
||||||
|
evidence:
|
||||||
|
[1] 산업안전보건법 해설: 제6장은 "안전보건관리체제"에 관한 장으로, 사업주의 안전보건관리책임자 선임 의무와 관리감독자 지정 등을 규정한다
|
||||||
|
[2] 회사 복지 안내: 직원 경조사 지원 내용 포함
|
||||||
|
|
||||||
|
→
|
||||||
|
{
|
||||||
|
"answer": "",
|
||||||
|
"used_citations": [],
|
||||||
|
"confidence": "low",
|
||||||
|
"refused": true,
|
||||||
|
"refuse_reason": "연차 휴가 사용 규정에 대한 직접적인 근거가 evidence에 없습니다."
|
||||||
|
}
|
||||||
|
|
||||||
|
## Query
|
||||||
|
{query}
|
||||||
|
|
||||||
|
## Evidence
|
||||||
|
{numbered_evidence}
|
||||||
1
app/services/digest/__init__.py
Normal file
1
app/services/digest/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
"""Phase 4 Global Digest 서비스 레이어 — 7일 뉴스 batch clustering + summarization."""
|
||||||
118
app/services/digest/clustering.py
Normal file
118
app/services/digest/clustering.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering.
|
||||||
|
|
||||||
|
플랜의 핵심 결정:
|
||||||
|
- λ = ln(2)/3 (3일 반감기)
|
||||||
|
- threshold: 0.75 / 0.78 / 0.80 (밀도 기반 adaptive)
|
||||||
|
- centroid: EMA α=0.7 (단순 평균의 seed bias / drift 방어)
|
||||||
|
- min_articles_per_topic = 3, max_topics_per_country = 10
|
||||||
|
- importance_score: country 내 0~1 normalize + max(score, 0.01) floor
|
||||||
|
- raw_weight_sum 별도 보존 (cross-day 트렌드 분석용)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
|
logger = setup_logger("digest_clustering")
|
||||||
|
|
||||||
|
LAMBDA = math.log(2) / 3 # 3일 반감기 — 사용자 확정값
|
||||||
|
CENTROID_ALPHA = 0.7 # EMA: 기존 중심 70% 유지, 새 멤버 30% 반영
|
||||||
|
MIN_ARTICLES_PER_TOPIC = 3
|
||||||
|
MAX_TOPICS_PER_COUNTRY = 10
|
||||||
|
SCORE_FLOOR = 0.01 # UI 0 표시 문제 사전 차단
|
||||||
|
|
||||||
|
|
||||||
|
def adaptive_threshold(n_docs: int) -> float:
|
||||||
|
"""문서 밀도 기반 동적 threshold — fragmentation/blob 동시 방어."""
|
||||||
|
if n_docs > 200:
|
||||||
|
return 0.80
|
||||||
|
if n_docs < 50:
|
||||||
|
return 0.75
|
||||||
|
return 0.78
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||||
|
norm = float(np.linalg.norm(v))
|
||||||
|
if norm == 0.0:
|
||||||
|
return v
|
||||||
|
return v / norm
|
||||||
|
|
||||||
|
|
||||||
|
def _decay_weight(now: datetime, created_at: datetime) -> float:
|
||||||
|
"""exp(-λ * days_ago). created_at 이 naive 면 UTC 가정."""
|
||||||
|
if created_at.tzinfo is None:
|
||||||
|
created_at = created_at.replace(tzinfo=timezone.utc)
|
||||||
|
days = (now - created_at).total_seconds() / 86400.0
|
||||||
|
if days < 0:
|
||||||
|
days = 0.0
|
||||||
|
return math.exp(-LAMBDA * days)
|
||||||
|
|
||||||
|
|
||||||
|
def cluster_country(country: str, docs: list[dict]) -> list[dict]:
|
||||||
|
"""단일 country 의 docs 를 cluster 로 묶어 정렬 + normalize 후 반환.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
country: 국가 코드 (KR, US, ...)
|
||||||
|
docs: loader.load_news_window 의 출력 (단일 country 슬라이스)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
[{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
|
||||||
|
- members 는 weight 가 채워진 doc dict 리스트
|
||||||
|
- 정렬: importance_score 내림차순, 최대 MAX_TOPICS_PER_COUNTRY 개
|
||||||
|
"""
|
||||||
|
if not docs:
|
||||||
|
logger.info(f"[{country}] docs=0 → skip")
|
||||||
|
return []
|
||||||
|
|
||||||
|
threshold = adaptive_threshold(len(docs))
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
# time-decay weight 계산 + 가중치 높은 순으로 seed 우선
|
||||||
|
for d in docs:
|
||||||
|
d["weight"] = _decay_weight(now, d["created_at"])
|
||||||
|
docs.sort(key=lambda d: -d["weight"])
|
||||||
|
|
||||||
|
clusters: list[dict] = []
|
||||||
|
for d in docs:
|
||||||
|
v = _normalize(d["embedding"])
|
||||||
|
best_idx, best_sim = -1, 0.0
|
||||||
|
for i, c in enumerate(clusters):
|
||||||
|
sim = float(np.dot(c["centroid"], v))
|
||||||
|
if sim > best_sim and sim >= threshold:
|
||||||
|
best_sim, best_idx = sim, i
|
||||||
|
if best_idx >= 0:
|
||||||
|
c = clusters[best_idx]
|
||||||
|
# EMA centroid update — drift 방지
|
||||||
|
c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v
|
||||||
|
c["centroid"] = _normalize(c["centroid"])
|
||||||
|
c["members"].append(d)
|
||||||
|
c["weight_sum"] += d["weight"]
|
||||||
|
else:
|
||||||
|
clusters.append({
|
||||||
|
"centroid": v,
|
||||||
|
"members": [d],
|
||||||
|
"weight_sum": d["weight"],
|
||||||
|
})
|
||||||
|
|
||||||
|
raw_count = len(clusters)
|
||||||
|
clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC]
|
||||||
|
dropped = raw_count - len(clusters)
|
||||||
|
clusters.sort(key=lambda c: -c["weight_sum"])
|
||||||
|
clusters = clusters[:MAX_TOPICS_PER_COUNTRY]
|
||||||
|
|
||||||
|
# country 내 normalize (0~1) + floor
|
||||||
|
if clusters:
|
||||||
|
max_w = max(c["weight_sum"] for c in clusters)
|
||||||
|
for c in clusters:
|
||||||
|
normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
|
||||||
|
c["raw_weight_sum"] = c["weight_sum"]
|
||||||
|
c["importance_score"] = max(normalized, SCORE_FLOOR)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"[{country}] docs={len(docs)} threshold={threshold} "
|
||||||
|
f"raw_clusters={raw_count} dropped={dropped} kept={len(clusters)}"
|
||||||
|
)
|
||||||
|
return clusters
|
||||||
149
app/services/digest/loader.py
Normal file
149
app/services/digest/loader.py
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
"""뉴스 7일 window 로드 + country 정규화
|
||||||
|
|
||||||
|
- documents 테이블엔 country 컬럼이 없으므로 document_chunks.country 를 first non-null 로 조인.
|
||||||
|
- chunk-level country 도 NULL 이면 news_sources.name prefix(ai_sub_group) 매칭으로 fallback.
|
||||||
|
- 그래도 NULL 이면 drop(로그 경고).
|
||||||
|
- ai_summary / embedding 이 NULL 이면 처음부터 제외 (재요약/재임베딩 0회 원칙).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
from core.database import async_session
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
|
logger = setup_logger("digest_loader")
|
||||||
|
|
||||||
|
|
||||||
|
_NEWS_WINDOW_SQL = text("""
|
||||||
|
SELECT
|
||||||
|
d.id,
|
||||||
|
d.title,
|
||||||
|
d.ai_summary,
|
||||||
|
d.embedding,
|
||||||
|
d.created_at,
|
||||||
|
d.edit_url,
|
||||||
|
d.ai_sub_group,
|
||||||
|
(
|
||||||
|
SELECT c.country
|
||||||
|
FROM document_chunks c
|
||||||
|
WHERE c.doc_id = d.id AND c.country IS NOT NULL
|
||||||
|
LIMIT 1
|
||||||
|
) AS chunk_country
|
||||||
|
FROM documents d
|
||||||
|
WHERE d.source_channel = 'news'
|
||||||
|
AND d.deleted_at IS NULL
|
||||||
|
AND d.created_at >= :window_start
|
||||||
|
AND d.created_at < :window_end
|
||||||
|
AND d.embedding IS NOT NULL
|
||||||
|
AND d.ai_summary IS NOT NULL
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
_SOURCE_COUNTRY_SQL = text("""
|
||||||
|
SELECT name, country FROM news_sources WHERE country IS NOT NULL
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
def _to_numpy_embedding(raw: Any) -> np.ndarray | None:
|
||||||
|
"""pgvector 컬럼을 numpy array(float32)로 정규화.
|
||||||
|
|
||||||
|
raw SQL + asyncpg 조합에서 pgvector type 이 등록 안 되어 있으면
|
||||||
|
embedding 이 '[0.1,0.2,...]' 같은 string 으로 반환된다. ORM 을 안 쓰므로
|
||||||
|
이 경우 직접 파싱해야 한다.
|
||||||
|
"""
|
||||||
|
if raw is None:
|
||||||
|
return None
|
||||||
|
if isinstance(raw, str):
|
||||||
|
import json
|
||||||
|
try:
|
||||||
|
raw = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
arr = np.asarray(raw, dtype=np.float32)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
if arr.size == 0:
|
||||||
|
return None
|
||||||
|
return arr
|
||||||
|
|
||||||
|
|
||||||
|
async def _load_source_country_map(session) -> dict[str, str]:
|
||||||
|
"""news_sources name → country 매핑.
|
||||||
|
|
||||||
|
name 은 '경향신문 문화' 형태이고 documents.ai_sub_group 은 '경향신문' (split[0]).
|
||||||
|
prefix 매칭이 가능하도록 첫 토큰 → country 로 인덱싱.
|
||||||
|
"""
|
||||||
|
rows = await session.execute(_SOURCE_COUNTRY_SQL)
|
||||||
|
mapping: dict[str, str] = {}
|
||||||
|
for name, country in rows:
|
||||||
|
if not name or not country:
|
||||||
|
continue
|
||||||
|
prefix = name.split(" ")[0].strip()
|
||||||
|
if prefix and prefix not in mapping:
|
||||||
|
mapping[prefix] = country
|
||||||
|
return mapping
|
||||||
|
|
||||||
|
|
||||||
|
async def load_news_window(
|
||||||
|
window_start: datetime,
|
||||||
|
window_end: datetime,
|
||||||
|
) -> dict[str, list[dict]]:
|
||||||
|
"""주어진 윈도우 안의 뉴스 documents 를 country 별 dict 로 반환.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{"KR": [doc_dict, ...], "US": [...], ...}
|
||||||
|
"""
|
||||||
|
docs_by_country: dict[str, list[dict]] = defaultdict(list)
|
||||||
|
null_country_count = 0
|
||||||
|
total = 0
|
||||||
|
|
||||||
|
async with async_session() as session:
|
||||||
|
source_country = await _load_source_country_map(session)
|
||||||
|
|
||||||
|
result = await session.execute(
|
||||||
|
_NEWS_WINDOW_SQL,
|
||||||
|
{"window_start": window_start, "window_end": window_end},
|
||||||
|
)
|
||||||
|
for row in result.mappings():
|
||||||
|
embedding = _to_numpy_embedding(row["embedding"])
|
||||||
|
if embedding is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
country = row["chunk_country"]
|
||||||
|
if not country:
|
||||||
|
# news_sources prefix fallback
|
||||||
|
ai_sub_group = (row["ai_sub_group"] or "").strip()
|
||||||
|
if ai_sub_group:
|
||||||
|
country = source_country.get(ai_sub_group)
|
||||||
|
if not country:
|
||||||
|
null_country_count += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
country = country.upper()
|
||||||
|
docs_by_country[country].append({
|
||||||
|
"id": int(row["id"]),
|
||||||
|
"title": row["title"] or "",
|
||||||
|
"ai_summary": row["ai_summary"] or "",
|
||||||
|
"embedding": embedding,
|
||||||
|
"created_at": row["created_at"],
|
||||||
|
"edit_url": row["edit_url"] or "",
|
||||||
|
"ai_sub_group": row["ai_sub_group"] or "",
|
||||||
|
})
|
||||||
|
total += 1
|
||||||
|
|
||||||
|
if null_country_count:
|
||||||
|
logger.warning(
|
||||||
|
f"[loader] country 분류 실패로 drop된 문서 {null_country_count}건 "
|
||||||
|
f"(chunk_country + news_sources fallback 모두 실패)"
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f"[loader] window {window_start.date()} ~ {window_end.date()} → "
|
||||||
|
f"{total}건 ({len(docs_by_country)}개 국가)"
|
||||||
|
)
|
||||||
|
return dict(docs_by_country)
|
||||||
177
app/services/digest/pipeline.py
Normal file
177
app/services/digest/pipeline.py
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
"""Phase 4 digest pipeline orchestration.
|
||||||
|
|
||||||
|
Step:
|
||||||
|
1. AIClient 생성
|
||||||
|
2. 7일 window 로 documents 로드 (loader)
|
||||||
|
3. country 별 cluster_country (clustering)
|
||||||
|
4. cluster 별 select_for_llm (selection)
|
||||||
|
5. cluster 별 summarize_cluster_with_fallback (summarizer, LLM)
|
||||||
|
6. DELETE+INSERT 단일 트랜잭션 (idempotent)
|
||||||
|
7. start/end 로그 + generation_ms + fallback 비율 health metric
|
||||||
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
from sqlalchemy import delete
|
||||||
|
|
||||||
|
from ai.client import AIClient
|
||||||
|
from core.database import async_session
|
||||||
|
from core.utils import setup_logger
|
||||||
|
from models.digest import DigestTopic, GlobalDigest
|
||||||
|
|
||||||
|
from .clustering import LAMBDA, cluster_country
|
||||||
|
from .loader import load_news_window
|
||||||
|
from .selection import select_for_llm
|
||||||
|
from .summarizer import summarize_cluster_with_fallback
|
||||||
|
|
||||||
|
logger = setup_logger("digest_pipeline")
|
||||||
|
|
||||||
|
WINDOW_DAYS = 7
|
||||||
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
|
||||||
|
|
||||||
|
def _kst_today() -> datetime:
|
||||||
|
return datetime.now(KST).date()
|
||||||
|
|
||||||
|
|
||||||
|
def _summary_hash(text: str) -> str:
|
||||||
|
return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16]
|
||||||
|
|
||||||
|
|
||||||
|
def _build_topic_row(
|
||||||
|
country: str,
|
||||||
|
rank: int,
|
||||||
|
cluster: dict,
|
||||||
|
selected: list[dict],
|
||||||
|
llm_result: dict,
|
||||||
|
primary_model: str,
|
||||||
|
) -> DigestTopic:
|
||||||
|
"""LLM 결과 + cluster 메타 → DigestTopic ORM 인스턴스.
|
||||||
|
|
||||||
|
article_ids 는 코드가 cluster.members 에서 직접 주입 (LLM 생성 금지 → id 위조 불가).
|
||||||
|
"""
|
||||||
|
article_ids = [int(m["id"]) for m in cluster["members"]]
|
||||||
|
centroid_sample = {
|
||||||
|
"selected_doc_ids": [int(m["id"]) for m in selected],
|
||||||
|
"summary_hashes": [_summary_hash(m.get("ai_summary") or "") for m in selected],
|
||||||
|
}
|
||||||
|
return DigestTopic(
|
||||||
|
country=country,
|
||||||
|
topic_rank=rank,
|
||||||
|
topic_label=llm_result["topic_label"],
|
||||||
|
summary=llm_result["summary"],
|
||||||
|
article_ids=article_ids,
|
||||||
|
article_count=len(article_ids),
|
||||||
|
importance_score=float(cluster["importance_score"]),
|
||||||
|
raw_weight_sum=float(cluster["raw_weight_sum"]),
|
||||||
|
centroid_sample=centroid_sample,
|
||||||
|
llm_model=primary_model,
|
||||||
|
llm_fallback_used=bool(llm_result["llm_fallback_used"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def run_digest_pipeline() -> dict:
|
||||||
|
"""전체 파이프라인 실행. worker entry 에서 호출.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
실행 통계 dict {llm_calls, fallback_used, total_topics, generation_ms}
|
||||||
|
"""
|
||||||
|
start = time.time()
|
||||||
|
|
||||||
|
window_end = datetime.now(timezone.utc)
|
||||||
|
window_start = window_end - timedelta(days=WINDOW_DAYS)
|
||||||
|
digest_date = _kst_today()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
|
||||||
|
f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
docs_by_country = await load_news_window(window_start, window_end)
|
||||||
|
if not docs_by_country:
|
||||||
|
logger.warning("[global_digest] 7일 window에 뉴스 0건 — digest 생성 스킵")
|
||||||
|
return {
|
||||||
|
"llm_calls": 0,
|
||||||
|
"fallback_used": 0,
|
||||||
|
"total_topics": 0,
|
||||||
|
"generation_ms": int((time.time() - start) * 1000),
|
||||||
|
}
|
||||||
|
|
||||||
|
client = AIClient()
|
||||||
|
primary_model = client.ai.primary.model
|
||||||
|
|
||||||
|
all_topic_rows: list[DigestTopic] = []
|
||||||
|
stats = {"llm_calls": 0, "fallback_used": 0}
|
||||||
|
|
||||||
|
try:
|
||||||
|
for country, docs in docs_by_country.items():
|
||||||
|
clusters = cluster_country(country, docs)
|
||||||
|
if not clusters:
|
||||||
|
continue # sparse country 자동 제외
|
||||||
|
|
||||||
|
for rank, cluster in enumerate(clusters, start=1):
|
||||||
|
selected = select_for_llm(cluster)
|
||||||
|
stats["llm_calls"] += 1
|
||||||
|
llm_result = await summarize_cluster_with_fallback(client, cluster, selected)
|
||||||
|
if llm_result["llm_fallback_used"]:
|
||||||
|
stats["fallback_used"] += 1
|
||||||
|
all_topic_rows.append(
|
||||||
|
_build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
await client.close()
|
||||||
|
|
||||||
|
generation_ms = int((time.time() - start) * 1000)
|
||||||
|
total_articles = sum(len(d) for d in docs_by_country.values())
|
||||||
|
countries_with_topics = len({r.country for r in all_topic_rows})
|
||||||
|
|
||||||
|
if stats["fallback_used"] == 0:
|
||||||
|
status = "success"
|
||||||
|
elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
|
||||||
|
status = "failed"
|
||||||
|
else:
|
||||||
|
status = "partial"
|
||||||
|
|
||||||
|
async with async_session() as session:
|
||||||
|
# idempotent: 같은 날짜 row 가 있으면 CASCADE 로 topics 까지 삭제
|
||||||
|
await session.execute(
|
||||||
|
delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
|
||||||
|
)
|
||||||
|
new_digest = GlobalDigest(
|
||||||
|
digest_date=digest_date,
|
||||||
|
window_start=window_start,
|
||||||
|
window_end=window_end,
|
||||||
|
decay_lambda=LAMBDA,
|
||||||
|
total_articles=total_articles,
|
||||||
|
total_countries=countries_with_topics,
|
||||||
|
total_topics=len(all_topic_rows),
|
||||||
|
generation_ms=generation_ms,
|
||||||
|
llm_calls=stats["llm_calls"],
|
||||||
|
llm_failures=stats["fallback_used"],
|
||||||
|
status=status,
|
||||||
|
)
|
||||||
|
new_digest.topics = all_topic_rows
|
||||||
|
session.add(new_digest)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
fallback_pct = (
|
||||||
|
(stats["fallback_used"] / stats["llm_calls"] * 100.0)
|
||||||
|
if stats["llm_calls"] else 0.0
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f"[global_digest] done countries={countries_with_topics} "
|
||||||
|
f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
|
||||||
|
f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
|
||||||
|
f"status={status} elapsed={generation_ms / 1000:.1f}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"llm_calls": stats["llm_calls"],
|
||||||
|
"fallback_used": stats["fallback_used"],
|
||||||
|
"total_topics": len(all_topic_rows),
|
||||||
|
"generation_ms": generation_ms,
|
||||||
|
"status": status,
|
||||||
|
}
|
||||||
62
app/services/digest/selection.py
Normal file
62
app/services/digest/selection.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
"""Cluster 내 LLM 입력 선정 — top-k + MMR diversity + ai_summary truncate.
|
||||||
|
|
||||||
|
순수 top-relevance 는 동일 사건 중복 요약문에 편향되므로 MMR 로 다양성 확보.
|
||||||
|
ai_summary 길이는 LLM 토큰 보호를 위해 SUMMARY_TRUNCATE 로 제한.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
K_PER_CLUSTER = 5
|
||||||
|
LAMBDA_MMR = 0.7 # relevance 70% / diversity 30%
|
||||||
|
SUMMARY_TRUNCATE = 300 # long tail ai_summary 방어
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||||
|
norm = float(np.linalg.norm(v))
|
||||||
|
if norm == 0.0:
|
||||||
|
return v
|
||||||
|
return v / norm
|
||||||
|
|
||||||
|
|
||||||
|
def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
|
||||||
|
"""cluster 내 LLM 호출용 대표 article 들 선정.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cluster: clustering.cluster_country 결과 단일 cluster
|
||||||
|
k: 선정 개수 (기본 5)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
선정된 doc dict 리스트. 각 항목에 ai_summary_truncated 필드가 추가됨.
|
||||||
|
"""
|
||||||
|
members = cluster["members"]
|
||||||
|
if len(members) <= k:
|
||||||
|
selected = list(members)
|
||||||
|
else:
|
||||||
|
centroid = cluster["centroid"]
|
||||||
|
# relevance = centroid 유사도 × decay weight
|
||||||
|
for m in members:
|
||||||
|
v = _normalize(m["embedding"])
|
||||||
|
m["_rel"] = float(np.dot(centroid, v)) * m["weight"]
|
||||||
|
|
||||||
|
first = max(members, key=lambda x: x["_rel"])
|
||||||
|
selected = [first]
|
||||||
|
candidates = [m for m in members if m is not first]
|
||||||
|
|
||||||
|
while len(selected) < k and candidates:
|
||||||
|
def mmr_score(c: dict) -> float:
|
||||||
|
v = _normalize(c["embedding"])
|
||||||
|
max_sim = max(
|
||||||
|
float(np.dot(v, _normalize(s["embedding"])))
|
||||||
|
for s in selected
|
||||||
|
)
|
||||||
|
return LAMBDA_MMR * c["_rel"] - (1.0 - LAMBDA_MMR) * max_sim
|
||||||
|
|
||||||
|
pick = max(candidates, key=mmr_score)
|
||||||
|
selected.append(pick)
|
||||||
|
candidates.remove(pick)
|
||||||
|
|
||||||
|
# LLM 입력 토큰 보호
|
||||||
|
for m in selected:
|
||||||
|
m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:SUMMARY_TRUNCATE]
|
||||||
|
|
||||||
|
return selected
|
||||||
123
app/services/digest/summarizer.py
Normal file
123
app/services/digest/summarizer.py
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
"""Cluster-level LLM 호출 + JSON 파싱 + timeout + drop금지 fallback.
|
||||||
|
|
||||||
|
핵심 결정:
|
||||||
|
- AIClient._call_chat 직접 호출 (client.py 수정 회피, fallback 로직 재사용)
|
||||||
|
- Semaphore(1) 로 MLX 과부하 회피
|
||||||
|
- Per-call timeout 25초 (asyncio.wait_for) — MLX hang/Ollama stall 방어
|
||||||
|
- JSON 파싱 실패 → 1회 재시도 → 그래도 실패 시 minimal fallback (drop 금지)
|
||||||
|
- fallback: topic_label="주요 뉴스 묶음", summary = top member ai_summary[:200]
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from ai.client import parse_json_response
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
|
logger = setup_logger("digest_summarizer")
|
||||||
|
|
||||||
|
LLM_CALL_TIMEOUT = 25 # 초. MLX 평균 5초 + tail latency 마진
|
||||||
|
FALLBACK_SUMMARY_LIMIT = 200
|
||||||
|
|
||||||
|
_llm_sem = asyncio.Semaphore(1)
|
||||||
|
|
||||||
|
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "digest_topic.txt"
|
||||||
|
_PROMPT_TEMPLATE: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _load_prompt() -> str:
|
||||||
|
global _PROMPT_TEMPLATE
|
||||||
|
if _PROMPT_TEMPLATE is None:
|
||||||
|
_PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8")
|
||||||
|
return _PROMPT_TEMPLATE
|
||||||
|
|
||||||
|
|
||||||
|
def build_prompt(selected: list[dict]) -> str:
|
||||||
|
"""digest_topic.txt 템플릿에 selected article들의 ai_summary_truncated 주입.
|
||||||
|
|
||||||
|
템플릿 placeholder: {articles_block}
|
||||||
|
"""
|
||||||
|
template = _load_prompt()
|
||||||
|
lines = []
|
||||||
|
for i, m in enumerate(selected, start=1):
|
||||||
|
text = (m.get("ai_summary_truncated") or m.get("ai_summary") or m.get("title") or "").strip()
|
||||||
|
lines.append(f"[{i}] {text}")
|
||||||
|
articles_block = "\n".join(lines)
|
||||||
|
return template.replace("{articles_block}", articles_block)
|
||||||
|
|
||||||
|
|
||||||
|
async def _try_call_llm(client: Any, prompt: str) -> str:
|
||||||
|
"""Semaphore + per-call timeout 으로 감싼 단일 호출."""
|
||||||
|
async with _llm_sem:
|
||||||
|
return await asyncio.wait_for(
|
||||||
|
client._call_chat(client.ai.primary, prompt),
|
||||||
|
timeout=LLM_CALL_TIMEOUT,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_fallback(cluster: dict) -> dict:
|
||||||
|
"""cluster 의 top member 데이터로 minimal fallback 생성 — 정보 손실 회피."""
|
||||||
|
members = cluster["members"]
|
||||||
|
if not members:
|
||||||
|
return {
|
||||||
|
"topic_label": "주요 뉴스 묶음",
|
||||||
|
"summary": "",
|
||||||
|
"llm_fallback_used": True,
|
||||||
|
}
|
||||||
|
top = max(members, key=lambda m: m.get("_rel", m.get("weight", 0.0)))
|
||||||
|
text = (top.get("ai_summary") or top.get("title") or "").strip()
|
||||||
|
return {
|
||||||
|
"topic_label": "주요 뉴스 묶음",
|
||||||
|
"summary": text[:FALLBACK_SUMMARY_LIMIT],
|
||||||
|
"llm_fallback_used": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def summarize_cluster_with_fallback(
|
||||||
|
client: Any,
|
||||||
|
cluster: dict,
|
||||||
|
selected: list[dict],
|
||||||
|
) -> dict:
|
||||||
|
"""cluster 1개에 대해 LLM 호출 + JSON 파싱 + fallback.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{topic_label, summary, llm_fallback_used}
|
||||||
|
"""
|
||||||
|
prompt = build_prompt(selected)
|
||||||
|
|
||||||
|
for attempt in range(2): # 1회 재시도 포함
|
||||||
|
try:
|
||||||
|
raw = await _try_call_llm(client, prompt)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logger.warning(
|
||||||
|
f"LLM 호출 timeout {LLM_CALL_TIMEOUT}s "
|
||||||
|
f"(attempt={attempt + 1}, cluster size={len(cluster['members'])})"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
f"LLM 호출 실패 attempt={attempt + 1} "
|
||||||
|
f"(cluster size={len(cluster['members'])}): {e}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
parsed = parse_json_response(raw)
|
||||||
|
if (
|
||||||
|
parsed
|
||||||
|
and isinstance(parsed.get("topic_label"), str)
|
||||||
|
and isinstance(parsed.get("summary"), str)
|
||||||
|
and parsed["topic_label"].strip()
|
||||||
|
and parsed["summary"].strip()
|
||||||
|
):
|
||||||
|
return {
|
||||||
|
"topic_label": parsed["topic_label"].strip(),
|
||||||
|
"summary": parsed["summary"].strip(),
|
||||||
|
"llm_fallback_used": False,
|
||||||
|
}
|
||||||
|
logger.warning(
|
||||||
|
f"JSON 파싱 실패 attempt={attempt + 1} "
|
||||||
|
f"(cluster size={len(cluster['members'])}, raw_len={len(raw) if raw else 0})"
|
||||||
|
)
|
||||||
|
|
||||||
|
return _make_fallback(cluster)
|
||||||
@@ -1,5 +1,407 @@
|
|||||||
"""Evidence extraction 서비스 (Phase 3).
|
"""Evidence extraction 서비스 (Phase 3.2).
|
||||||
|
|
||||||
reranked chunks에서 query-relevant span을 rule + LLM hybrid로 추출.
|
reranker 결과 chunks 에서 query-relevant span 을 구조적으로 추출한다.
|
||||||
구현은 Phase 3에서 채움.
|
|
||||||
|
## 설계 (EV-A: Rule + LLM span select)
|
||||||
|
|
||||||
|
```
|
||||||
|
reranked results
|
||||||
|
↓
|
||||||
|
[rule filter] score >= 0.25, max_per_doc=2, top MAX_EVIDENCE_CANDIDATES
|
||||||
|
↓
|
||||||
|
[snippet 재윈도우] _extract_window(full, query, 800) — LLM 입력용
|
||||||
|
↓
|
||||||
|
[1 batched LLM call] gemma-4 via get_mlx_gate() (single inference)
|
||||||
|
↓
|
||||||
|
[post-process]
|
||||||
|
- relevance >= 0.5 필터
|
||||||
|
- span too-short (< 80자) → _extract_window(full, query, 120) 로 재확장
|
||||||
|
- span too-long (> 300자) → cut
|
||||||
|
- doc-group ordering (검색 결과 doc 순서 유지, doc 내부만 relevance desc)
|
||||||
|
- n 재부여 (1..N)
|
||||||
|
↓
|
||||||
|
EvidenceItem 리스트
|
||||||
|
```
|
||||||
|
|
||||||
|
## 영구 룰
|
||||||
|
|
||||||
|
- **LLM 호출은 1번만** (batched). 순차 호출 절대 금지 — MLX single-inference
|
||||||
|
큐가 폭발한다.
|
||||||
|
- **모든 MLX 호출은 `get_mlx_gate()` 경유**. analyzer / synthesis 와 동일
|
||||||
|
semaphore 공유.
|
||||||
|
- **fallback span 도 query 중심 window**. `full_snippet[:200]` 같은 "앞에서부터
|
||||||
|
자르기" 절대 금지. 조용한 품질 붕괴 (citation 은 멀쩡한데 실제 span 이 query
|
||||||
|
와 무관) 대표 사례.
|
||||||
|
- **Span too-short 보정 필수**: `len(span) < 80` 이면 자동 확장. "짧을수록
|
||||||
|
정확" 이 아니라 **짧으면 위험** — synthesis LLM 이 문맥 부족으로 이어 만들기
|
||||||
|
(soft hallucination) 를 한다.
|
||||||
|
- **Evidence ordering 은 doc-group 유지**. 전역 relevance desc 정렬 금지.
|
||||||
|
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
|
||||||
|
|
||||||
|
## 확장 여지 (지금은 비활성)
|
||||||
|
|
||||||
|
`EVIDENCE_FAST_PATH_THRESHOLD` 가 `None` 이 아니고 `results[0].rerank_score >=
|
||||||
|
THRESHOLD` 이면 LLM 호출 스킵 후 rule-only 경로로 즉시 반환. Activation 조건:
|
||||||
|
(1) evidence LLM 호출 비율 > 80%, (2) /ask 평균 latency > 15s, (3) rerank
|
||||||
|
top1 p50 > 0.75. 셋 다 충족해야 켠다.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from ai.client import AIClient, _load_prompt, parse_json_response
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
|
from .llm_gate import get_mlx_gate
|
||||||
|
from .rerank_service import _extract_window
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from api.search import SearchResult
|
||||||
|
|
||||||
|
logger = setup_logger("evidence")
|
||||||
|
|
||||||
|
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
|
||||||
|
EVIDENCE_MIN_RERANK = 0.25 # 1차 rule cut — rerank score 이 미만은 제외
|
||||||
|
MAX_EVIDENCE_CANDIDATES = 6 # LLM 입력 상한
|
||||||
|
MAX_PER_DOC = 2
|
||||||
|
CANDIDATE_SNIPPET_CHARS = 800 # LLM 이 볼 원문 창 크기
|
||||||
|
|
||||||
|
MIN_RELEVANCE_KEEP = 0.5 # LLM 출력 필터
|
||||||
|
SPAN_MIN_CHARS = 80 # 이 미만이면 window enlarge
|
||||||
|
SPAN_ENLARGE_TARGET = 120 # enlarge 시 재윈도우 target_chars
|
||||||
|
SPAN_MAX_CHARS = 300 # 이 초과면 cut (synthesis token budget 보호)
|
||||||
|
|
||||||
|
LLM_TIMEOUT_MS = 15000
|
||||||
|
PROMPT_VERSION = "v1"
|
||||||
|
|
||||||
|
# 확장 여지 — None 이면 비활성 (baseline). 실측 후 0.8 등으로 켠다.
|
||||||
|
EVIDENCE_FAST_PATH_THRESHOLD: float | None = None
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 반환 타입 ───────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class EvidenceItem:
|
||||||
|
"""LLM 또는 rule fallback 이 추출한 단일 evidence span.
|
||||||
|
|
||||||
|
n 은 doc-group ordering + relevance 정렬 후 1부터 재부여된다.
|
||||||
|
`full_snippet` 은 **synthesis 프롬프트에 절대 포함 금지** — debug / citation
|
||||||
|
원문 보기 전용.
|
||||||
|
"""
|
||||||
|
|
||||||
|
n: int # 1-based, synthesis 프롬프트의 [n] 과 매핑
|
||||||
|
chunk_id: int | None
|
||||||
|
doc_id: int
|
||||||
|
title: str | None
|
||||||
|
section_title: str | None
|
||||||
|
span_text: str # LLM 추출 (또는 rule fallback) span, 80~300자
|
||||||
|
relevance: float # LLM 0~1 (fallback 시 rerank_score 복사)
|
||||||
|
rerank_score: float # raw reranker 점수
|
||||||
|
full_snippet: str # 원본 800자 (debug/citation 전용, synthesis 금지)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 프롬프트 로딩 (module 초기화 1회) ───────────────────
|
||||||
|
try:
|
||||||
|
EVIDENCE_PROMPT = _load_prompt("evidence_extract.txt")
|
||||||
|
except FileNotFoundError:
|
||||||
|
EVIDENCE_PROMPT = ""
|
||||||
|
logger.warning(
|
||||||
|
"evidence_extract.txt not found — evidence_service will always use rule-only fallback"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Helper: candidates → LLM 입력 블록 ──────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _build_numbered_candidates(
|
||||||
|
candidates: list["SearchResult"], query: str
|
||||||
|
) -> tuple[str, list[str]]:
|
||||||
|
"""LLM 프롬프트의 {numbered_candidates} 블록 + 재윈도우된 full_snippet 리스트.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(block_str, full_snippets) — full_snippets[i] 는 1-based n=i+1 의 원문
|
||||||
|
"""
|
||||||
|
lines: list[str] = []
|
||||||
|
full_snippets: list[str] = []
|
||||||
|
for i, c in enumerate(candidates, 1):
|
||||||
|
title = (c.title or "").strip()
|
||||||
|
raw_text = c.snippet or ""
|
||||||
|
full = _extract_window(raw_text, query, target_chars=CANDIDATE_SNIPPET_CHARS)
|
||||||
|
full_snippets.append(full)
|
||||||
|
lines.append(f"[{i}] title: {title} / text: {full}")
|
||||||
|
return "\n".join(lines), full_snippets
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Helper: span length 보정 ───────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_span(span: str, full: str, query: str) -> tuple[str, bool]:
|
||||||
|
"""span 을 SPAN_MIN_CHARS ~ SPAN_MAX_CHARS 범위로 보정.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(normalized_span, was_expanded)
|
||||||
|
- was_expanded=True 이면 "short_span_expanded" 로그 대상
|
||||||
|
"""
|
||||||
|
s = (span or "").strip()
|
||||||
|
expanded = False
|
||||||
|
if len(s) < SPAN_MIN_CHARS:
|
||||||
|
# soft hallucination 방어 — query 중심으로 window 재확장
|
||||||
|
s = _extract_window(full, query, target_chars=SPAN_ENLARGE_TARGET)
|
||||||
|
expanded = True
|
||||||
|
if len(s) > SPAN_MAX_CHARS:
|
||||||
|
s = s[:SPAN_MAX_CHARS]
|
||||||
|
return s, expanded
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Helper: doc-group ordering ─────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_doc_group_ordering(
|
||||||
|
items: list[EvidenceItem],
|
||||||
|
results: list["SearchResult"],
|
||||||
|
) -> list[EvidenceItem]:
|
||||||
|
"""검색 결과 doc 순서 유지 + doc 내부만 relevance desc + n 재부여.
|
||||||
|
|
||||||
|
answer 는 [1][2][3] 순서로 생성되고 그 순서가 문맥 흐름을 결정한다.
|
||||||
|
전역 relevance desc 정렬은 "doc A span1 → doc B span1 → doc A span2"
|
||||||
|
처럼 튀면서 읽기 이상한 답변을 만든다.
|
||||||
|
"""
|
||||||
|
if not items:
|
||||||
|
return []
|
||||||
|
doc_order: dict[int, int] = {}
|
||||||
|
for idx, r in enumerate(results):
|
||||||
|
if r.id not in doc_order:
|
||||||
|
doc_order[r.id] = idx
|
||||||
|
# 정렬: (doc 순서, -relevance)
|
||||||
|
items.sort(
|
||||||
|
key=lambda it: (doc_order.get(it.doc_id, 9999), -it.relevance)
|
||||||
|
)
|
||||||
|
# n 재부여
|
||||||
|
for new_n, it in enumerate(items, 1):
|
||||||
|
it.n = new_n
|
||||||
|
return items
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Helper: rule-only fallback ─────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _build_rule_only_evidence(
|
||||||
|
candidates: list["SearchResult"],
|
||||||
|
full_snippets: list[str],
|
||||||
|
query: str,
|
||||||
|
) -> list[EvidenceItem]:
|
||||||
|
"""LLM 실패/timeout 시 rule-only 경로.
|
||||||
|
|
||||||
|
⚠ `full_snippet[:200]` 같은 앞자르기 금지. 반드시 `_extract_window` 로
|
||||||
|
query 중심 윈도우를 만든다. relevance 는 rerank_score 복사.
|
||||||
|
"""
|
||||||
|
items: list[EvidenceItem] = []
|
||||||
|
for i, (c, full) in enumerate(zip(candidates, full_snippets), 1):
|
||||||
|
span = _extract_window(full, query, target_chars=200)
|
||||||
|
# 정규화 (보통 여기서는 SPAN_MIN_CHARS 이상이지만 안전장치)
|
||||||
|
span, _expanded = _normalize_span(span, full, query)
|
||||||
|
items.append(
|
||||||
|
EvidenceItem(
|
||||||
|
n=i,
|
||||||
|
chunk_id=c.chunk_id,
|
||||||
|
doc_id=c.id,
|
||||||
|
title=c.title,
|
||||||
|
section_title=c.section_title,
|
||||||
|
span_text=span,
|
||||||
|
relevance=float(c.rerank_score or c.score or 0.0),
|
||||||
|
rerank_score=float(c.rerank_score or c.score or 0.0),
|
||||||
|
full_snippet=full,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return items
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Core: extract_evidence ─────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
async def extract_evidence(
|
||||||
|
query: str,
|
||||||
|
results: list["SearchResult"],
|
||||||
|
ai_client: AIClient | None = None,
|
||||||
|
) -> tuple[list[EvidenceItem], str | None]:
|
||||||
|
"""reranked results → EvidenceItem 리스트.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(items, skip_reason)
|
||||||
|
skip_reason ∈ {None, "empty_retrieval", "all_low_rerank", "fast_path",
|
||||||
|
"llm_timeout_fallback_rule", "llm_error_fallback_rule",
|
||||||
|
"parse_failed_fallback_rule", "all_llm_rejected"}
|
||||||
|
- skip_reason 이 None 이 아니어도 items 는 비어있지 않을 수 있다
|
||||||
|
(fallback/fast_path 경로).
|
||||||
|
"""
|
||||||
|
if not results:
|
||||||
|
return [], "empty_retrieval"
|
||||||
|
|
||||||
|
# ── 1차 rule filter: rerank_score >= EVIDENCE_MIN_RERANK + max_per_doc ──
|
||||||
|
candidates: list["SearchResult"] = []
|
||||||
|
per_doc: dict[int, int] = {}
|
||||||
|
for r in results:
|
||||||
|
raw_score = r.rerank_score if r.rerank_score is not None else r.score
|
||||||
|
if raw_score is None or raw_score < EVIDENCE_MIN_RERANK:
|
||||||
|
continue
|
||||||
|
if per_doc.get(r.id, 0) >= MAX_PER_DOC:
|
||||||
|
continue
|
||||||
|
candidates.append(r)
|
||||||
|
per_doc[r.id] = per_doc.get(r.id, 0) + 1
|
||||||
|
if len(candidates) >= MAX_EVIDENCE_CANDIDATES:
|
||||||
|
break
|
||||||
|
|
||||||
|
if not candidates:
|
||||||
|
return [], "all_low_rerank"
|
||||||
|
|
||||||
|
# ── Fast-path (현재 비활성) ─────────────────────────
|
||||||
|
if EVIDENCE_FAST_PATH_THRESHOLD is not None:
|
||||||
|
# ⚠ display score 가 아니라 raw rerank_score 로 판단.
|
||||||
|
# normalize_display_scores 를 거친 r.score 는 frontend 용 리스케일
|
||||||
|
# 값이라 distribution drift 가능. fast-path 는 reranker raw 신호가 안전.
|
||||||
|
top_rerank = (
|
||||||
|
results[0].rerank_score if results[0].rerank_score is not None else 0.0
|
||||||
|
)
|
||||||
|
if top_rerank is not None and top_rerank >= EVIDENCE_FAST_PATH_THRESHOLD:
|
||||||
|
_block, full_snippets = _build_numbered_candidates(candidates, query)
|
||||||
|
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||||
|
items = _apply_doc_group_ordering(items, results)
|
||||||
|
logger.info(
|
||||||
|
"evidence fast_path query=%r candidates=%d kept=%d top_rerank=%.2f",
|
||||||
|
query[:80], len(candidates), len(items), top_rerank,
|
||||||
|
)
|
||||||
|
return items, "fast_path"
|
||||||
|
|
||||||
|
# ── LLM 호출 준비 ───────────────────────────────────
|
||||||
|
if not EVIDENCE_PROMPT:
|
||||||
|
# 프롬프트 미로딩 → rule-only
|
||||||
|
_block, full_snippets = _build_numbered_candidates(candidates, query)
|
||||||
|
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||||
|
items = _apply_doc_group_ordering(items, results)
|
||||||
|
logger.warning(
|
||||||
|
"evidence prompt_not_loaded → rule fallback query=%r kept=%d",
|
||||||
|
query[:80], len(items),
|
||||||
|
)
|
||||||
|
return items, "llm_error_fallback_rule"
|
||||||
|
|
||||||
|
block, full_snippets = _build_numbered_candidates(candidates, query)
|
||||||
|
prompt = EVIDENCE_PROMPT.replace("{query}", query).replace(
|
||||||
|
"{numbered_candidates}", block
|
||||||
|
)
|
||||||
|
|
||||||
|
client_owned = False
|
||||||
|
if ai_client is None:
|
||||||
|
ai_client = AIClient()
|
||||||
|
client_owned = True
|
||||||
|
|
||||||
|
t_start = time.perf_counter()
|
||||||
|
raw: str | None = None
|
||||||
|
llm_error: str | None = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# ⚠ semaphore 대기는 timeout 바깥. timeout 은 실제 LLM 호출에만.
|
||||||
|
async with get_mlx_gate():
|
||||||
|
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||||
|
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
llm_error = "timeout"
|
||||||
|
except Exception as exc:
|
||||||
|
llm_error = f"llm_error:{type(exc).__name__}"
|
||||||
|
finally:
|
||||||
|
if client_owned:
|
||||||
|
try:
|
||||||
|
await ai_client.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||||
|
|
||||||
|
# ── LLM 실패 → rule fallback ────────────────────────
|
||||||
|
if llm_error is not None:
|
||||||
|
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||||
|
items = _apply_doc_group_ordering(items, results)
|
||||||
|
logger.warning(
|
||||||
|
"evidence LLM %s → rule fallback query=%r candidates=%d kept=%d elapsed_ms=%.0f",
|
||||||
|
llm_error, query[:80], len(candidates), len(items), elapsed_ms,
|
||||||
|
)
|
||||||
|
return items, "llm_timeout_fallback_rule" if llm_error == "timeout" else "llm_error_fallback_rule"
|
||||||
|
|
||||||
|
parsed = parse_json_response(raw or "")
|
||||||
|
if not isinstance(parsed, dict) or not isinstance(parsed.get("items"), list):
|
||||||
|
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||||
|
items = _apply_doc_group_ordering(items, results)
|
||||||
|
logger.warning(
|
||||||
|
"evidence parse_failed → rule fallback query=%r raw=%r elapsed_ms=%.0f",
|
||||||
|
query[:80], (raw or "")[:200], elapsed_ms,
|
||||||
|
)
|
||||||
|
return items, "parse_failed_fallback_rule"
|
||||||
|
|
||||||
|
# ── LLM 출력 파싱 ──────────────────────────────────
|
||||||
|
short_span_expanded = 0
|
||||||
|
llm_items: list[EvidenceItem] = []
|
||||||
|
for entry in parsed["items"]:
|
||||||
|
if not isinstance(entry, dict):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
n_raw = int(entry.get("n", 0))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
continue
|
||||||
|
if n_raw < 1 or n_raw > len(candidates):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
relevance = float(entry.get("relevance", 0.0) or 0.0)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
relevance = 0.0
|
||||||
|
if relevance < MIN_RELEVANCE_KEEP:
|
||||||
|
continue
|
||||||
|
span_raw = entry.get("span")
|
||||||
|
if not isinstance(span_raw, str) or not span_raw.strip():
|
||||||
|
continue
|
||||||
|
|
||||||
|
candidate = candidates[n_raw - 1]
|
||||||
|
full = full_snippets[n_raw - 1]
|
||||||
|
span, expanded = _normalize_span(span_raw, full, query)
|
||||||
|
if expanded:
|
||||||
|
short_span_expanded += 1
|
||||||
|
|
||||||
|
llm_items.append(
|
||||||
|
EvidenceItem(
|
||||||
|
n=n_raw, # doc-group ordering 에서 재부여됨
|
||||||
|
chunk_id=candidate.chunk_id,
|
||||||
|
doc_id=candidate.id,
|
||||||
|
title=candidate.title,
|
||||||
|
section_title=candidate.section_title,
|
||||||
|
span_text=span,
|
||||||
|
relevance=relevance,
|
||||||
|
rerank_score=float(
|
||||||
|
candidate.rerank_score
|
||||||
|
if candidate.rerank_score is not None
|
||||||
|
else (candidate.score or 0.0)
|
||||||
|
),
|
||||||
|
full_snippet=full,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── LLM 이 전부 reject → rule fallback ──────────────
|
||||||
|
if not llm_items:
|
||||||
|
items = _build_rule_only_evidence(candidates, full_snippets, query)
|
||||||
|
items = _apply_doc_group_ordering(items, results)
|
||||||
|
logger.warning(
|
||||||
|
"evidence all_llm_rejected → rule fallback query=%r elapsed_ms=%.0f",
|
||||||
|
query[:80], elapsed_ms,
|
||||||
|
)
|
||||||
|
return items, "all_llm_rejected"
|
||||||
|
|
||||||
|
# ── doc-group ordering + n 재부여 ───────────────────
|
||||||
|
llm_items = _apply_doc_group_ordering(llm_items, results)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"evidence ok query=%r candidates=%d kept=%d short_span_expanded=%d elapsed_ms=%.0f",
|
||||||
|
query[:80], len(candidates), len(llm_items), short_span_expanded, elapsed_ms,
|
||||||
|
)
|
||||||
|
return llm_items, None
|
||||||
|
|||||||
@@ -219,6 +219,62 @@ def get_strategy(name: str) -> FusionStrategy:
|
|||||||
return cls()
|
return cls()
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Phase 2.3: soft filter boost ───────────────────────
|
||||||
|
|
||||||
|
SOFT_FILTER_MAX_BOOST = 0.05 # plan 룰 (CRITICAL)
|
||||||
|
# ↑ RRF score는 0.01~0.05 범위 (k=60). 상한 초과 시 기존 랭킹 왜곡.
|
||||||
|
# 기존 RRFWithBoost의 legal article boost(0.05)와 동일 최대값 → 일관성.
|
||||||
|
SOFT_FILTER_DOMAIN_BOOST = 0.01 # 2026-04-08 실측: 0.03은 exact_keyword -0.03 악화
|
||||||
|
# ↑ 낮게 잡는 이유: soft_filter는 "같은 도메인 doc을 동등하게 boost" → exact match
|
||||||
|
# doc의 상대 우위가 손상됨. 0.01 수준이면 fusion 내부 순위 역전 확률 최소.
|
||||||
|
|
||||||
|
|
||||||
|
def apply_soft_filter_boost(
|
||||||
|
results: list["SearchResult"],
|
||||||
|
soft_filters: dict | None,
|
||||||
|
) -> int:
|
||||||
|
"""Phase 2.3 — QueryAnalyzer soft_filters.domain 기반 약한 score boost.
|
||||||
|
|
||||||
|
ai_domain 정확 매칭 시 SOFT_FILTER_DOMAIN_BOOST(0.01) 1회 가산.
|
||||||
|
document_type 매칭은 v0.1 평가셋에서 효과 측정 불가 + false positive 많음 → 제외.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
results: fusion 직후 SearchResult 리스트 (in-place 수정)
|
||||||
|
soft_filters: query_analysis.soft_filters = {"domain": [...]}
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int — boost 적용된 결과 개수 (debug/notes용)
|
||||||
|
"""
|
||||||
|
if not soft_filters:
|
||||||
|
return 0
|
||||||
|
domain_list = [str(d).lower() for d in soft_filters.get("domain", []) or []]
|
||||||
|
if not domain_list:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
boosted_count = 0
|
||||||
|
for r in results:
|
||||||
|
if not r.ai_domain:
|
||||||
|
continue
|
||||||
|
ai_dom_lower = r.ai_domain.lower()
|
||||||
|
# 정확 매칭 또는 subdirectory 매칭 ("Industrial_Safety/Legislation" → "industrial_safety" 매칭)
|
||||||
|
matched = False
|
||||||
|
for d in domain_list:
|
||||||
|
if d == ai_dom_lower:
|
||||||
|
matched = True
|
||||||
|
break
|
||||||
|
# path 레벨 매칭: "industrial_safety/legislation" in "industrial_safety/legislation/act"
|
||||||
|
if d in ai_dom_lower and "/" in d:
|
||||||
|
matched = True
|
||||||
|
break
|
||||||
|
if matched:
|
||||||
|
r.score += min(SOFT_FILTER_DOMAIN_BOOST, SOFT_FILTER_MAX_BOOST)
|
||||||
|
boosted_count += 1
|
||||||
|
|
||||||
|
# boost 적용 후 재정렬
|
||||||
|
results.sort(key=lambda x: x.score, reverse=True)
|
||||||
|
return boosted_count
|
||||||
|
|
||||||
|
|
||||||
# ─── display score 정규화 ────────────────────────────────
|
# ─── display score 정규화 ────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
58
app/services/search/llm_gate.py
Normal file
58
app/services/search/llm_gate.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
"""MLX single-inference 전역 gate (Phase 3.1.1).
|
||||||
|
|
||||||
|
Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
|
||||||
|
동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout).
|
||||||
|
|
||||||
|
이 모듈은 analyzer / evidence / synthesis 등 **모든 MLX-bound LLM 호출**이
|
||||||
|
공유하는 `asyncio.Semaphore(1)`를 제공한다. MLX를 호출하는 경로는 예외 없이
|
||||||
|
`async with get_mlx_gate():` 블록 안에서만 `AIClient._call_chat(ai.primary, ...)`
|
||||||
|
를 호출해야 한다.
|
||||||
|
|
||||||
|
## 영구 룰
|
||||||
|
|
||||||
|
- **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
|
||||||
|
evidence_service / synthesis_service 세 곳이 현재 사용자. 이후 경로가 늘어도
|
||||||
|
동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시
|
||||||
|
동시 실행 발생).
|
||||||
|
- **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을
|
||||||
|
걸면 "대기만으로 timeout 발동" 버그가 재발한다(query_analyzer 초기 이슈).
|
||||||
|
- **fallback(Ollama) 경로는 gate 제외**. GPU Ollama는 concurrent OK. 단 현재
|
||||||
|
구현상 `AIClient._call_chat` 내부에서 primary→fallback 전환이 일어나므로
|
||||||
|
fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음).
|
||||||
|
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
|
||||||
|
inference 특성이 깨지지 않는 한 이 값을 올리지 말 것.
|
||||||
|
|
||||||
|
## 확장 여지 (지금은 구현하지 않음)
|
||||||
|
|
||||||
|
트래픽 증가 시 "우선순위 역전"(/ask가 analyzer background task 뒤에 밀림)이
|
||||||
|
문제가 되면 `asyncio.PriorityQueue` 기반 우선순위 큐로 교체 가능. Gate 자체
|
||||||
|
분리(get_analyzer_gate / get_ask_gate)는 single-inference에서 throughput
|
||||||
|
개선이 없으므로 의미 없음.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
# MLX primary는 single-inference → 1
|
||||||
|
MLX_CONCURRENCY = 1
|
||||||
|
|
||||||
|
# 첫 호출 시 현재 event loop에 바인딩된 Semaphore 생성 (lazy init)
|
||||||
|
_mlx_gate: asyncio.Semaphore | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_mlx_gate() -> asyncio.Semaphore:
|
||||||
|
"""MLX primary 호출 경로 공용 gate. 최초 호출 시 lazy init.
|
||||||
|
|
||||||
|
사용 예:
|
||||||
|
async with get_mlx_gate():
|
||||||
|
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||||
|
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
|
||||||
|
|
||||||
|
⚠ `asyncio.timeout`은 반드시 gate 안쪽에 둘 것. 바깥에 두면 gate 대기만으로
|
||||||
|
timeout이 발동한다.
|
||||||
|
"""
|
||||||
|
global _mlx_gate
|
||||||
|
if _mlx_gate is None:
|
||||||
|
_mlx_gate = asyncio.Semaphore(MLX_CONCURRENCY)
|
||||||
|
return _mlx_gate
|
||||||
@@ -1,5 +1,443 @@
|
|||||||
"""Query analyzer — 자연어 쿼리 분석 (Phase 2).
|
"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1, async-only 구조).
|
||||||
|
|
||||||
domain_hint, intent, hard/soft filter, normalized_queries 등 추출.
|
**핵심 철학** (memory `feedback_analyzer_async_only.md`):
|
||||||
구현은 Phase 2에서 채움.
|
> QueryAnalyzer는 "즉시 실행하는 기능"이 아니라 "미리 준비해두는 기능"이다.
|
||||||
|
|
||||||
|
Retrieval 경로에서 analyzer를 **동기 호출 금지**.
|
||||||
|
동기 호출 가능한 API는 prewarm 전용.
|
||||||
|
|
||||||
|
## Pipeline
|
||||||
|
|
||||||
|
```
|
||||||
|
query → retrieval (항상 즉시)
|
||||||
|
↘ trigger_background_analysis (fire-and-forget)
|
||||||
|
→ analyze() [5초+] → cache 저장
|
||||||
|
|
||||||
|
다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화
|
||||||
|
```
|
||||||
|
|
||||||
|
## 룰 (plan 영구)
|
||||||
|
- `LLM_TIMEOUT_MS = 5000` (background 이므로 여유 OK)
|
||||||
|
- `MAX_NORMALIZED_QUERIES = 3` (multilingual explosion 방지)
|
||||||
|
- weight 합 = 1.0 정규화 필수 (fusion 왜곡 방지)
|
||||||
|
- 실패/저신뢰(< 0.5) 결과는 캐시 금지 (잘못된 분석 고정 방지)
|
||||||
|
- `analyzer_confidence` default `float 0.0` 강제 (None 금지)
|
||||||
|
- analyze() 동기 호출 금지. retrieval 경로는 `get_cached()` + `trigger_background_analysis()` 만 사용.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from ai.client import AIClient, _load_prompt, parse_json_response
|
||||||
|
from core.config import settings
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
|
from .llm_gate import get_mlx_gate
|
||||||
|
|
||||||
|
logger = setup_logger("query_analyzer")
|
||||||
|
|
||||||
|
# ─── 상수 (plan 영구 룰) ────────────────────────────────
|
||||||
|
PROMPT_VERSION = "v2" # prompts/query_analyze.txt 축소판
|
||||||
|
CACHE_TTL = 86400 # 24h
|
||||||
|
CACHE_MAXSIZE = 1000
|
||||||
|
LLM_TIMEOUT_MS = 15000 # async 구조 (background), 동기 경로 금지
|
||||||
|
# ↑ 실측: gemma-4-26b-a4b-it-8bit MLX, 축소 프롬프트(prompt_tok=802) 7~11초.
|
||||||
|
# generation이 dominant (max_tokens 무효, 자연 EOS ~289 tok 생성).
|
||||||
|
# background 실행이라 15초도 안전. 상향 필요 시 여기서만 조정.
|
||||||
|
LLM_CONCURRENCY = 1 # MLX는 single-inference, 동시 호출 시 queue 폭발.
|
||||||
|
# ↑ 실측: 23 concurrent 요청 → 22개 15초 timeout. semaphore로 순차 강제.
|
||||||
|
# prewarm/background/동기 호출 모두 이 semaphore 경유.
|
||||||
|
MIN_CACHE_CONFIDENCE = 0.5 # 이 미만은 캐시 금지
|
||||||
|
MAX_NORMALIZED_QUERIES = 3
|
||||||
|
|
||||||
|
|
||||||
|
def _model_version() -> str:
|
||||||
|
"""현재 primary 모델 ID를 캐시 키에 반영."""
|
||||||
|
if settings.ai and settings.ai.primary:
|
||||||
|
return settings.ai.primary.model
|
||||||
|
return "unknown-model"
|
||||||
|
|
||||||
|
|
||||||
|
# ─── in-memory LRU (FIFO 근사) ──────────────────────────
|
||||||
|
_CACHE: dict[str, dict[str, Any]] = {}
|
||||||
|
|
||||||
|
# background task 참조 유지 (premature GC 방지)
|
||||||
|
_PENDING: set[asyncio.Task[Any]] = set()
|
||||||
|
# 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합)
|
||||||
|
_INFLIGHT: set[str] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_llm_semaphore() -> asyncio.Semaphore:
|
||||||
|
"""MLX single-inference gate를 반환. Phase 3.1부터 llm_gate.get_mlx_gate()
|
||||||
|
로 위임 — analyzer / evidence / synthesis 가 동일 semaphore 공유.
|
||||||
|
|
||||||
|
`LLM_CONCURRENCY` 상수는 하위 호환/문서용으로 유지하되, 실제 bound는
|
||||||
|
`llm_gate.MLX_CONCURRENCY` 가 담당한다.
|
||||||
|
"""
|
||||||
|
return get_mlx_gate()
|
||||||
|
|
||||||
|
|
||||||
|
def _cache_key(query: str) -> str:
|
||||||
|
raw = f"{query}|{PROMPT_VERSION}|{_model_version()}"
|
||||||
|
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def get_cached(query: str) -> dict | None:
|
||||||
|
"""TTL 경과 entry는 자동 삭제. 없으면 None.
|
||||||
|
|
||||||
|
retrieval 경로에서 cache hit 판단용으로 호출. 호출 자체는 O(1).
|
||||||
|
"""
|
||||||
|
key = _cache_key(query)
|
||||||
|
entry = _CACHE.get(key)
|
||||||
|
if not entry:
|
||||||
|
return None
|
||||||
|
if time.time() - entry["ts"] > CACHE_TTL:
|
||||||
|
_CACHE.pop(key, None)
|
||||||
|
return None
|
||||||
|
return entry["value"]
|
||||||
|
|
||||||
|
|
||||||
|
def set_cached(query: str, value: dict) -> None:
|
||||||
|
"""저신뢰(< 0.5) / 빈 값은 캐시 금지. 상한 초과 시 FIFO eviction."""
|
||||||
|
if not value:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
conf = float(value.get("analyzer_confidence", 0.0) or 0.0)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
conf = 0.0
|
||||||
|
if conf < MIN_CACHE_CONFIDENCE:
|
||||||
|
return
|
||||||
|
key = _cache_key(query)
|
||||||
|
if key in _CACHE:
|
||||||
|
_CACHE[key] = {"value": value, "ts": time.time()}
|
||||||
|
return
|
||||||
|
if len(_CACHE) >= CACHE_MAXSIZE:
|
||||||
|
try:
|
||||||
|
oldest = next(iter(_CACHE))
|
||||||
|
_CACHE.pop(oldest, None)
|
||||||
|
except StopIteration:
|
||||||
|
pass
|
||||||
|
_CACHE[key] = {"value": value, "ts": time.time()}
|
||||||
|
|
||||||
|
|
||||||
|
def cache_stats() -> dict[str, int]:
|
||||||
|
"""debug/운영용 — 현재 캐시 크기 + inflight 수."""
|
||||||
|
return {
|
||||||
|
"size": len(_CACHE),
|
||||||
|
"maxsize": CACHE_MAXSIZE,
|
||||||
|
"inflight": len(_INFLIGHT),
|
||||||
|
"pending_tasks": len(_PENDING),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ─── weight 정규화 (fusion 왜곡 방지) ───────────────────
|
||||||
|
def _normalize_weights(analysis: dict) -> dict:
|
||||||
|
"""normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화."""
|
||||||
|
queries = analysis.get("normalized_queries")
|
||||||
|
if not isinstance(queries, list) or not queries:
|
||||||
|
return analysis
|
||||||
|
|
||||||
|
sanitized: list[dict] = []
|
||||||
|
for q in queries[:MAX_NORMALIZED_QUERIES]:
|
||||||
|
if not isinstance(q, dict):
|
||||||
|
continue
|
||||||
|
lang = str(q.get("lang", "")).strip() or "ko"
|
||||||
|
text = str(q.get("text", "")).strip()
|
||||||
|
if not text:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
w = float(q.get("weight", 1.0))
|
||||||
|
if w < 0:
|
||||||
|
w = 0.0
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
w = 1.0
|
||||||
|
sanitized.append({"lang": lang, "text": text, "weight": w})
|
||||||
|
|
||||||
|
if not sanitized:
|
||||||
|
return analysis
|
||||||
|
|
||||||
|
total = sum(q["weight"] for q in sanitized)
|
||||||
|
if total <= 0:
|
||||||
|
equal = 1.0 / len(sanitized)
|
||||||
|
for q in sanitized:
|
||||||
|
q["weight"] = equal
|
||||||
|
else:
|
||||||
|
for q in sanitized:
|
||||||
|
q["weight"] /= total
|
||||||
|
|
||||||
|
analysis["normalized_queries"] = sanitized
|
||||||
|
return analysis
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 프롬프트 로딩 (module 초기화 1회) ──────────────────
|
||||||
|
try:
|
||||||
|
ANALYZE_PROMPT = _load_prompt("query_analyze.txt")
|
||||||
|
except FileNotFoundError:
|
||||||
|
ANALYZE_PROMPT = ""
|
||||||
|
logger.warning("query_analyze.txt not found — analyzer will always return fallback")
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 기본 fallback 응답 (None 금지) ─────────────────────
|
||||||
|
def _fallback(reason: str | None = None) -> dict:
|
||||||
|
"""LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0."""
|
||||||
|
result: dict[str, Any] = {
|
||||||
|
"intent": "semantic_search",
|
||||||
|
"query_type": "keyword",
|
||||||
|
"domain_hint": "mixed",
|
||||||
|
"language_scope": "limited",
|
||||||
|
"keywords": [],
|
||||||
|
"must_terms": [],
|
||||||
|
"optional_terms": [],
|
||||||
|
"hard_filters": {},
|
||||||
|
"soft_filters": {"domain": [], "document_type": []},
|
||||||
|
"normalized_queries": [],
|
||||||
|
"expanded_terms": [],
|
||||||
|
"synonyms": {},
|
||||||
|
"analyzer_confidence": 0.0,
|
||||||
|
}
|
||||||
|
if reason:
|
||||||
|
result["_fallback_reason"] = reason
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 메인 LLM 호출 (내부 사용) ──────────────────────────
|
||||||
|
async def analyze(query: str, ai_client: AIClient | None = None) -> dict:
|
||||||
|
"""쿼리 분석 결과 반환. 실패 시 analyzer_confidence=0.0 fallback.
|
||||||
|
|
||||||
|
**⚠️ 동기 검색 경로에서 직접 호출 금지**. 용도:
|
||||||
|
- `trigger_background_analysis` 내부 호출
|
||||||
|
- `prewarm_analyzer` startup 호출
|
||||||
|
- 디버깅/테스트
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query: 사용자 쿼리 원문
|
||||||
|
ai_client: AIClient 인스턴스 (없으면 생성 후 자동 close)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict — 최소 `analyzer_confidence` 키는 항상 float로 존재.
|
||||||
|
"""
|
||||||
|
if not query or not query.strip():
|
||||||
|
return _fallback("empty_query")
|
||||||
|
|
||||||
|
if not ANALYZE_PROMPT:
|
||||||
|
return _fallback("prompt_not_loaded")
|
||||||
|
|
||||||
|
# cache hit 즉시 반환 (prewarm 재호출 방지)
|
||||||
|
cached = get_cached(query)
|
||||||
|
if cached is not None:
|
||||||
|
return cached
|
||||||
|
|
||||||
|
client_owned = False
|
||||||
|
if ai_client is None:
|
||||||
|
ai_client = AIClient()
|
||||||
|
client_owned = True
|
||||||
|
|
||||||
|
t_start = time.perf_counter()
|
||||||
|
semaphore = _get_llm_semaphore()
|
||||||
|
# ⚠️ 중요: semaphore 대기는 timeout 포함되면 안됨 (대기만 해도 timeout 발동)
|
||||||
|
# timeout은 실제 LLM 호출 구간에만 적용.
|
||||||
|
try:
|
||||||
|
async with semaphore:
|
||||||
|
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||||
|
raw = await ai_client._call_chat(
|
||||||
|
ai_client.ai.primary,
|
||||||
|
ANALYZE_PROMPT.replace("{query}", query),
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
elapsed = (time.perf_counter() - t_start) * 1000
|
||||||
|
logger.warning(
|
||||||
|
"query_analyze timeout query=%r elapsed_ms=%.0f (LLM_TIMEOUT_MS=%d)",
|
||||||
|
query[:80],
|
||||||
|
elapsed,
|
||||||
|
LLM_TIMEOUT_MS,
|
||||||
|
)
|
||||||
|
return _fallback("timeout")
|
||||||
|
except Exception as exc:
|
||||||
|
elapsed = (time.perf_counter() - t_start) * 1000
|
||||||
|
logger.warning(
|
||||||
|
"query_analyze LLM error query=%r elapsed_ms=%.0f err=%r",
|
||||||
|
query[:80],
|
||||||
|
elapsed,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
return _fallback(f"llm_error:{type(exc).__name__}")
|
||||||
|
finally:
|
||||||
|
if client_owned:
|
||||||
|
try:
|
||||||
|
await ai_client.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||||
|
|
||||||
|
parsed = parse_json_response(raw)
|
||||||
|
if not isinstance(parsed, dict):
|
||||||
|
logger.warning(
|
||||||
|
"query_analyze parse failed query=%r elapsed_ms=%.0f raw=%r",
|
||||||
|
query[:80],
|
||||||
|
elapsed_ms,
|
||||||
|
(raw or "")[:200],
|
||||||
|
)
|
||||||
|
return _fallback("parse_failed")
|
||||||
|
|
||||||
|
try:
|
||||||
|
conf = float(parsed.get("analyzer_confidence", 0.0) or 0.0)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
conf = 0.0
|
||||||
|
parsed["analyzer_confidence"] = conf
|
||||||
|
parsed = _normalize_weights(parsed)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"query_analyze ok query=%r conf=%.2f intent=%s domain=%s elapsed_ms=%.0f",
|
||||||
|
query[:80],
|
||||||
|
conf,
|
||||||
|
parsed.get("intent"),
|
||||||
|
parsed.get("domain_hint"),
|
||||||
|
elapsed_ms,
|
||||||
|
)
|
||||||
|
|
||||||
|
set_cached(query, parsed)
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Background trigger (retrieval 경로에서 사용) ───────
|
||||||
|
async def _background_analyze(query: str) -> None:
|
||||||
|
"""Background task wrapper — inflight 집합 관리 + 예외 삼킴."""
|
||||||
|
try:
|
||||||
|
await analyze(query)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("background analyze crashed query=%r err=%r", query[:80], exc)
|
||||||
|
finally:
|
||||||
|
_INFLIGHT.discard(query)
|
||||||
|
|
||||||
|
|
||||||
|
def trigger_background_analysis(query: str) -> bool:
|
||||||
|
"""retrieval 경로에서 호출. cache miss 시 background task 생성.
|
||||||
|
|
||||||
|
- 동기 함수. 즉시 반환.
|
||||||
|
- 이미 inflight 또는 cache hit이면 아무 작업 X, False 반환.
|
||||||
|
- 새 task 생성 시 True 반환.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool — task 실제로 생성되었는지 여부
|
||||||
|
"""
|
||||||
|
if not query or not query.strip():
|
||||||
|
return False
|
||||||
|
if query in _INFLIGHT:
|
||||||
|
return False
|
||||||
|
if get_cached(query) is not None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
logger.warning("trigger_background_analysis called outside event loop")
|
||||||
|
return False
|
||||||
|
|
||||||
|
_INFLIGHT.add(query)
|
||||||
|
task = loop.create_task(_background_analyze(query))
|
||||||
|
_PENDING.add(task)
|
||||||
|
task.add_done_callback(_PENDING.discard)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Prewarm (app startup) ──────────────────────────────
|
||||||
|
# 운영에서 자주 발생하는 쿼리 샘플. 통계 기반으로 확장 예정.
|
||||||
|
DEFAULT_PREWARM_QUERIES: list[str] = [
|
||||||
|
# fixed 7 (Phase 2 평가셋 core)
|
||||||
|
"산업안전보건법 제6장",
|
||||||
|
"기계 사고 관련 법령",
|
||||||
|
"AI 산업 동향",
|
||||||
|
"Python async best practice",
|
||||||
|
"유해화학물질을 다루는 회사가 지켜야 할 안전 의무",
|
||||||
|
"recent AI safety news from Europe",
|
||||||
|
"이세상에 존재하지 않는 문서명",
|
||||||
|
# 법령 관련
|
||||||
|
"산업안전보건법 시행령",
|
||||||
|
"화학물질관리법",
|
||||||
|
"위험성평가 절차",
|
||||||
|
# 뉴스 관련
|
||||||
|
"EU AI Act",
|
||||||
|
"한국 AI 산업",
|
||||||
|
# 실무
|
||||||
|
"안전보건 교육 자료",
|
||||||
|
"사고 조사 보고서",
|
||||||
|
"MSDS 작성 방법",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
async def prewarm_analyzer(
|
||||||
|
queries: list[str] | None = None,
|
||||||
|
delay_between: float = 0.2,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""app startup에서 호출. 대표 쿼리를 미리 분석해 cache에 적재.
|
||||||
|
|
||||||
|
Non-blocking으로 사용: `asyncio.create_task(prewarm_analyzer())`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
queries: 분석할 쿼리 리스트. None이면 DEFAULT_PREWARM_QUERIES 사용.
|
||||||
|
delay_between: 각 쿼리 간 대기 시간 (MLX 부하 완화).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict — {queries_total, success, failed, elapsed_ms, cache_size_after}
|
||||||
|
"""
|
||||||
|
if not ANALYZE_PROMPT:
|
||||||
|
logger.warning("prewarm skipped — prompt not loaded")
|
||||||
|
return {"status": "skipped", "reason": "prompt_not_loaded"}
|
||||||
|
|
||||||
|
targets = queries if queries is not None else DEFAULT_PREWARM_QUERIES
|
||||||
|
total = len(targets)
|
||||||
|
success = 0
|
||||||
|
failed = 0
|
||||||
|
t_start = time.perf_counter()
|
||||||
|
|
||||||
|
logger.info("prewarm_analyzer start queries=%d timeout_ms=%d", total, LLM_TIMEOUT_MS)
|
||||||
|
|
||||||
|
client = AIClient()
|
||||||
|
try:
|
||||||
|
for i, q in enumerate(targets, 1):
|
||||||
|
if get_cached(q) is not None:
|
||||||
|
logger.info("prewarm skip (already cached) [%d/%d] %r", i, total, q[:40])
|
||||||
|
success += 1
|
||||||
|
continue
|
||||||
|
result = await analyze(q, ai_client=client)
|
||||||
|
conf = float(result.get("analyzer_confidence", 0.0) or 0.0)
|
||||||
|
if conf >= MIN_CACHE_CONFIDENCE:
|
||||||
|
success += 1
|
||||||
|
logger.info("prewarm ok [%d/%d] conf=%.2f q=%r", i, total, conf, q[:40])
|
||||||
|
else:
|
||||||
|
failed += 1
|
||||||
|
reason = result.get("_fallback_reason", "low_conf")
|
||||||
|
logger.warning(
|
||||||
|
"prewarm fail [%d/%d] reason=%s q=%r", i, total, reason, q[:40]
|
||||||
|
)
|
||||||
|
if delay_between > 0 and i < total:
|
||||||
|
await asyncio.sleep(delay_between)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
await client.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||||
|
stats = cache_stats()
|
||||||
|
logger.info(
|
||||||
|
"prewarm_analyzer done total=%d success=%d failed=%d elapsed_ms=%.0f cache_size=%d",
|
||||||
|
total,
|
||||||
|
success,
|
||||||
|
failed,
|
||||||
|
elapsed_ms,
|
||||||
|
stats["size"],
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "complete",
|
||||||
|
"queries_total": total,
|
||||||
|
"success": success,
|
||||||
|
"failed": failed,
|
||||||
|
"elapsed_ms": elapsed_ms,
|
||||||
|
"cache_size_after": stats["size"],
|
||||||
|
}
|
||||||
|
|||||||
@@ -134,7 +134,12 @@ async def rerank_chunks(
|
|||||||
if idx is None or sc is None or idx >= len(candidates):
|
if idx is None or sc is None or idx >= len(candidates):
|
||||||
continue
|
continue
|
||||||
chunk = candidates[idx]
|
chunk = candidates[idx]
|
||||||
chunk.score = float(sc)
|
score = float(sc)
|
||||||
|
chunk.score = score
|
||||||
|
# Phase 3.1: reranker raw 점수를 별도 필드에 보존.
|
||||||
|
# normalize_display_scores가 나중에 .score를 랭크 기반으로 덮어써도
|
||||||
|
# fast-path 판단에 쓸 수 있는 원본 신호 유지.
|
||||||
|
chunk.rerank_score = score
|
||||||
chunk.match_reason = (chunk.match_reason or "") + "+rerank"
|
chunk.match_reason = (chunk.match_reason or "") + "+rerank"
|
||||||
reranked.append(chunk)
|
reranked.append(chunk)
|
||||||
return reranked[:limit]
|
return reranked[:limit]
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
"""검색 후보 수집 서비스 (Phase 1.2).
|
"""검색 후보 수집 서비스 (Phase 1.2 + Phase 2.2 multilingual).
|
||||||
|
|
||||||
text(documents FTS + trigram) + vector(documents.embedding + chunks.embedding hybrid) 후보를
|
text(documents FTS + trigram) + vector(documents.embedding + chunks.embedding hybrid) 후보를
|
||||||
SearchResult 리스트로 반환.
|
SearchResult 리스트로 반환.
|
||||||
@@ -10,27 +10,80 @@ Phase 1.2-G: doc + chunks hybrid retrieval 보강.
|
|||||||
- documents.embedding (recall robust, 자연어 매칭 강함)
|
- documents.embedding (recall robust, 자연어 매칭 강함)
|
||||||
- document_chunks.embedding (precision, segment 매칭)
|
- document_chunks.embedding (precision, segment 매칭)
|
||||||
- 두 SQL 동시 호출 후 doc_id 기준 merge (chunk 가중치 1.2, doc 1.0)
|
- 두 SQL 동시 호출 후 doc_id 기준 merge (chunk 가중치 1.2, doc 1.0)
|
||||||
|
|
||||||
|
Phase 2.2 추가:
|
||||||
|
- _QUERY_EMBED_CACHE: bge-m3 query embedding 캐시 (모듈 레벨 LRU, TTL 24h)
|
||||||
|
- search_vector_multilingual: normalized_queries (lang별 쿼리) 배열 지원
|
||||||
|
QueryAnalyzer cache hit + analyzer_tier >= merge 일 때만 호출.
|
||||||
|
- crosslingual_ko_en NDCG 0.53 → 0.65+ 목표
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import TYPE_CHECKING
|
import hashlib
|
||||||
|
import time
|
||||||
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||||
|
|
||||||
from ai.client import AIClient
|
from ai.client import AIClient
|
||||||
from core.database import engine
|
from core.database import engine
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from api.search import SearchResult
|
from api.search import SearchResult
|
||||||
|
|
||||||
|
|
||||||
|
logger = setup_logger("retrieval_service")
|
||||||
|
|
||||||
# Hybrid merge 가중치 (1.2-G)
|
# Hybrid merge 가중치 (1.2-G)
|
||||||
DOC_VECTOR_WEIGHT = 1.0
|
DOC_VECTOR_WEIGHT = 1.0
|
||||||
CHUNK_VECTOR_WEIGHT = 1.2
|
CHUNK_VECTOR_WEIGHT = 1.2
|
||||||
|
|
||||||
|
# ─── Phase 2.2: Query embedding cache ───────────────────
|
||||||
|
# bge-m3 호출 비용 절반 감소 (동일 normalized_query 재호출 방지)
|
||||||
|
_QUERY_EMBED_CACHE: dict[str, dict[str, Any]] = {}
|
||||||
|
QUERY_EMBED_TTL = 86400 # 24h
|
||||||
|
QUERY_EMBED_MAXSIZE = 500
|
||||||
|
|
||||||
|
|
||||||
|
def _query_embed_key(text_: str) -> str:
|
||||||
|
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_query_embedding(
|
||||||
|
client: AIClient, text_: str
|
||||||
|
) -> list[float] | None:
|
||||||
|
"""Query embedding with in-memory cache.
|
||||||
|
|
||||||
|
동일 텍스트 재호출 시 bge-m3 skip. fixed query 회귀 시 vector_ms 대폭 감소.
|
||||||
|
"""
|
||||||
|
if not text_:
|
||||||
|
return None
|
||||||
|
key = _query_embed_key(text_)
|
||||||
|
entry = _QUERY_EMBED_CACHE.get(key)
|
||||||
|
if entry and time.time() - entry["ts"] < QUERY_EMBED_TTL:
|
||||||
|
return entry["emb"]
|
||||||
|
try:
|
||||||
|
emb = await client.embed(text_)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("query embed failed text=%r err=%r", text_[:40], exc)
|
||||||
|
return None
|
||||||
|
if len(_QUERY_EMBED_CACHE) >= QUERY_EMBED_MAXSIZE:
|
||||||
|
try:
|
||||||
|
oldest = next(iter(_QUERY_EMBED_CACHE))
|
||||||
|
_QUERY_EMBED_CACHE.pop(oldest, None)
|
||||||
|
except StopIteration:
|
||||||
|
pass
|
||||||
|
_QUERY_EMBED_CACHE[key] = {"emb": emb, "ts": time.time()}
|
||||||
|
return emb
|
||||||
|
|
||||||
|
|
||||||
|
def query_embed_cache_stats() -> dict[str, int]:
|
||||||
|
return {"size": len(_QUERY_EMBED_CACHE), "maxsize": QUERY_EMBED_MAXSIZE}
|
||||||
|
|
||||||
|
|
||||||
async def search_text(
|
async def search_text(
|
||||||
session: AsyncSession, query: str, limit: int
|
session: AsyncSession, query: str, limit: int
|
||||||
@@ -153,11 +206,16 @@ async def search_vector(
|
|||||||
list[SearchResult] — doc_id 중복 제거됨. compress_chunks_to_docs는 그대로 동작.
|
list[SearchResult] — doc_id 중복 제거됨. compress_chunks_to_docs는 그대로 동작.
|
||||||
chunks_by_doc은 search.py에서 group_by_doc으로 보존.
|
chunks_by_doc은 search.py에서 group_by_doc으로 보존.
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
client = AIClient()
|
client = AIClient()
|
||||||
query_embedding = await client.embed(query)
|
try:
|
||||||
|
query_embedding = await _get_query_embedding(client, query)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
await client.close()
|
await client.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if query_embedding is None:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
embedding_str = str(query_embedding)
|
embedding_str = str(query_embedding)
|
||||||
@@ -307,6 +365,100 @@ def _merge_doc_and_chunk_vectors(
|
|||||||
return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)
|
return sorted(by_doc_id.values(), key=lambda r: r.score, reverse=True)
|
||||||
|
|
||||||
|
|
||||||
|
async def search_vector_multilingual(
|
||||||
|
session: AsyncSession,
|
||||||
|
normalized_queries: list[dict],
|
||||||
|
limit: int,
|
||||||
|
) -> list["SearchResult"]:
|
||||||
|
"""Phase 2.2 — 다국어 normalized_queries 배열로 vector retrieval.
|
||||||
|
|
||||||
|
각 language query에 대해 embedding을 병렬 생성(cache hit 활용),
|
||||||
|
각 embedding에 대해 기존 docs+chunks hybrid 호출,
|
||||||
|
결과를 weight 기반으로 merge.
|
||||||
|
|
||||||
|
⚠️ 호출 조건:
|
||||||
|
- QueryAnalyzer cache hit 이어야 함 (async-only 룰)
|
||||||
|
- analyzer_confidence 높고 normalized_queries 존재해야 함
|
||||||
|
- search.py에서만 호출. retrieval 경로 동기 LLM 호출 금지 룰 준수.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: AsyncSession (호출자 관리, 본 함수 내부는 sessionmaker로 별도 연결 사용)
|
||||||
|
normalized_queries: [{"lang": "ko", "text": "...", "weight": 0.56}, ...]
|
||||||
|
weight는 _normalize_weights로 이미 합=1.0 정규화된 상태.
|
||||||
|
limit: 상위 결과 개수
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[SearchResult] — doc_id 중복 제거. merged score = sum(per-query score * lang_weight).
|
||||||
|
"""
|
||||||
|
if not normalized_queries:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# 1. 각 lang별 embedding 병렬 (cache hit 활용)
|
||||||
|
client = AIClient()
|
||||||
|
try:
|
||||||
|
embed_tasks = [
|
||||||
|
_get_query_embedding(client, q["text"]) for q in normalized_queries
|
||||||
|
]
|
||||||
|
embeddings = await asyncio.gather(*embed_tasks)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
await client.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# embedding 실패한 query는 skip (weight 재정규화 없이 조용히 drop)
|
||||||
|
per_query_plan: list[tuple[dict, str]] = []
|
||||||
|
for q, emb in zip(normalized_queries, embeddings):
|
||||||
|
if emb is None:
|
||||||
|
logger.warning("multilingual embed skipped lang=%s", q.get("lang"))
|
||||||
|
continue
|
||||||
|
per_query_plan.append((q, str(emb)))
|
||||||
|
|
||||||
|
if not per_query_plan:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# 2. 각 embedding에 대해 doc + chunks 병렬 retrieval
|
||||||
|
Session = async_sessionmaker(engine)
|
||||||
|
|
||||||
|
async def _one_query(q_meta: dict, embedding_str: str) -> list["SearchResult"]:
|
||||||
|
async def _docs() -> list["SearchResult"]:
|
||||||
|
async with Session() as s:
|
||||||
|
return await _search_vector_docs(s, embedding_str, limit * 4)
|
||||||
|
|
||||||
|
async def _chunks() -> list["SearchResult"]:
|
||||||
|
async with Session() as s:
|
||||||
|
return await _search_vector_chunks(s, embedding_str, limit * 4)
|
||||||
|
|
||||||
|
doc_r, chunk_r = await asyncio.gather(_docs(), _chunks())
|
||||||
|
return _merge_doc_and_chunk_vectors(doc_r, chunk_r)
|
||||||
|
|
||||||
|
per_query_results = await asyncio.gather(
|
||||||
|
*(_one_query(q, emb_str) for q, emb_str in per_query_plan)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 3. weight 기반 merge — doc_id 중복 시 weighted score 합산
|
||||||
|
merged: dict[int, "SearchResult"] = {}
|
||||||
|
for (q_meta, _emb_str), results in zip(per_query_plan, per_query_results):
|
||||||
|
weight = float(q_meta.get("weight", 1.0) or 1.0)
|
||||||
|
for r in results:
|
||||||
|
weighted = r.score * weight
|
||||||
|
prev = merged.get(r.id)
|
||||||
|
if prev is None:
|
||||||
|
# 첫 방문: 원본을 shallow copy 대신 직접 wrap
|
||||||
|
r.score = weighted
|
||||||
|
r.match_reason = f"ml_{q_meta.get('lang', '?')}"
|
||||||
|
merged[r.id] = r
|
||||||
|
else:
|
||||||
|
# 중복: score 누적, 가장 높은 weight 소스로 match_reason 표시
|
||||||
|
prev.score += weighted
|
||||||
|
# match_reason 병합 (가독성)
|
||||||
|
if q_meta.get("lang") and q_meta.get("lang") not in (prev.match_reason or ""):
|
||||||
|
prev.match_reason = (prev.match_reason or "ml") + f"+{q_meta['lang']}"
|
||||||
|
|
||||||
|
sorted_results = sorted(merged.values(), key=lambda r: r.score, reverse=True)
|
||||||
|
return sorted_results[: limit * 4] # rerank 후보로 넉넉히
|
||||||
|
|
||||||
|
|
||||||
def compress_chunks_to_docs(
|
def compress_chunks_to_docs(
|
||||||
chunks: list["SearchResult"], limit: int
|
chunks: list["SearchResult"], limit: int
|
||||||
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
|
) -> tuple[list["SearchResult"], dict[int, list["SearchResult"]]]:
|
||||||
|
|||||||
335
app/services/search/search_pipeline.py
Normal file
335
app/services/search/search_pipeline.py
Normal file
@@ -0,0 +1,335 @@
|
|||||||
|
"""검색 파이프라인 오케스트레이션 (Phase 3.1).
|
||||||
|
|
||||||
|
`/api/search/` 와 `/api/search/ask` 가 공유하는 단일 진실 소스.
|
||||||
|
|
||||||
|
## 순수성 규칙 (영구)
|
||||||
|
|
||||||
|
`run_search()`는 wrapper(endpoint)에서 side effect를 최대한 분리한다:
|
||||||
|
- ❌ **금지**: `BackgroundTasks` 파라미터, `logger.info(...)` 직접 호출,
|
||||||
|
`record_search_event()` 호출, `SearchResponse`/`AskResponse` 직렬화
|
||||||
|
- ✅ **허용**: `trigger_background_analysis()` (analyzer cache miss 시
|
||||||
|
fire-and-forget task — retrieval 전략의 일부, 자가 완결됨)
|
||||||
|
- ✅ **허용**: retrieval / fusion / rerank / diversity / display 정규화 /
|
||||||
|
confidence 계산 같은 내부 서비스 호출
|
||||||
|
|
||||||
|
반환값은 `PipelineResult` 하나. wrapper가 그 안에서 필요한 필드를 꺼내
|
||||||
|
logger / telemetry / 응답 직렬화를 수행한다.
|
||||||
|
|
||||||
|
## Phase 2 호환
|
||||||
|
|
||||||
|
본 모듈은 기존 `app/api/search.py::search()` 함수 본문을 lift-and-shift 한
|
||||||
|
것이다. 변수명 / notes 문자열 / timing 키 / logger 포맷 은 wrapper 쪽에서
|
||||||
|
완전히 동일하게 재구성된다. refactor 전후 `/search?debug=true` 응답은
|
||||||
|
byte-level 에 가깝게 일치해야 한다.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import TYPE_CHECKING, Literal
|
||||||
|
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from . import query_analyzer
|
||||||
|
from .fusion_service import (
|
||||||
|
DEFAULT_FUSION,
|
||||||
|
apply_soft_filter_boost,
|
||||||
|
get_strategy,
|
||||||
|
normalize_display_scores,
|
||||||
|
)
|
||||||
|
from .rerank_service import (
|
||||||
|
MAX_CHUNKS_PER_DOC,
|
||||||
|
MAX_RERANK_INPUT,
|
||||||
|
apply_diversity,
|
||||||
|
rerank_chunks,
|
||||||
|
)
|
||||||
|
from .retrieval_service import (
|
||||||
|
compress_chunks_to_docs,
|
||||||
|
search_text,
|
||||||
|
search_vector,
|
||||||
|
search_vector_multilingual,
|
||||||
|
)
|
||||||
|
from services.search_telemetry import (
|
||||||
|
compute_confidence,
|
||||||
|
compute_confidence_hybrid,
|
||||||
|
compute_confidence_reranked,
|
||||||
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from api.search import SearchResult
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Phase 2.1: analyzer_confidence 3단계 게이트 ──────────
|
||||||
|
# search.py 에서 이동. search.py 의 /search wrapper 는 이 상수들을
|
||||||
|
# 노출할 필요 없으므로 파이프라인 모듈에만 둔다.
|
||||||
|
ANALYZER_TIER_IGNORE = 0.5 # < 0.5 → analyzer 완전 무시, soft_filter 비활성
|
||||||
|
ANALYZER_TIER_ORIGINAL = 0.7 # < 0.7 → original query fallback
|
||||||
|
ANALYZER_TIER_MERGE = 0.85 # < 0.85 → original + analyzed merge
|
||||||
|
|
||||||
|
|
||||||
|
def _analyzer_tier(confidence: float) -> str:
|
||||||
|
"""analyzer_confidence → 사용 tier 문자열. Phase 2.2/2.3에서 실제 분기용."""
|
||||||
|
if confidence < ANALYZER_TIER_IGNORE:
|
||||||
|
return "ignore"
|
||||||
|
if confidence < ANALYZER_TIER_ORIGINAL:
|
||||||
|
return "original_fallback"
|
||||||
|
if confidence < ANALYZER_TIER_MERGE:
|
||||||
|
return "merge"
|
||||||
|
return "analyzed"
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 반환 타입 ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class PipelineResult:
|
||||||
|
"""run_search() 반환 — wrapper 가 필요한 모든 state 를 담는다."""
|
||||||
|
|
||||||
|
# ── 최종 결과 (API 노출) ──
|
||||||
|
results: "list[SearchResult]"
|
||||||
|
mode: str
|
||||||
|
confidence_signal: float
|
||||||
|
|
||||||
|
# ── 중간 단계 (evidence 입력 + debug) ──
|
||||||
|
text_results: "list[SearchResult]"
|
||||||
|
vector_results: "list[SearchResult]" # doc 압축 후
|
||||||
|
raw_chunks: "list[SearchResult]" # chunk 원본 (rerank/evidence용)
|
||||||
|
chunks_by_doc: "dict[int, list[SearchResult]]"
|
||||||
|
|
||||||
|
# ── 쿼리 분석 메타 ──
|
||||||
|
query_analysis: dict | None
|
||||||
|
analyzer_cache_hit: bool
|
||||||
|
analyzer_confidence: float # 항상 float (None 금지)
|
||||||
|
analyzer_tier: str
|
||||||
|
|
||||||
|
# ── 관측 ──
|
||||||
|
timing_ms: dict[str, float] = field(default_factory=dict)
|
||||||
|
notes: list[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 메인 파이프라인 ───────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
async def run_search(
|
||||||
|
session: AsyncSession,
|
||||||
|
q: str,
|
||||||
|
*,
|
||||||
|
mode: Literal["fts", "trgm", "vector", "hybrid"] = "hybrid",
|
||||||
|
limit: int = 20,
|
||||||
|
fusion: str = DEFAULT_FUSION,
|
||||||
|
rerank: bool = True,
|
||||||
|
analyze: bool = False,
|
||||||
|
) -> PipelineResult:
|
||||||
|
"""검색 파이프라인 실행.
|
||||||
|
|
||||||
|
retrieval → fusion → rerank → diversity → display 정규화 → confidence 계산
|
||||||
|
까지 수행하고 `PipelineResult` 를 반환한다. logging / BackgroundTasks /
|
||||||
|
응답 직렬화는 절대 수행하지 않는다 (wrapper 책임).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: AsyncSession (caller 가 관리)
|
||||||
|
q: 사용자 쿼리 원문
|
||||||
|
mode: fts | trgm | vector | hybrid
|
||||||
|
limit: 최종 결과 수 (hybrid 에서는 fusion 입력 후보 수는 이보다 넓음)
|
||||||
|
fusion: legacy | rrf | rrf_boost
|
||||||
|
rerank: bge-reranker-v2-m3 활성화 (hybrid 전용)
|
||||||
|
analyze: QueryAnalyzer 활성화 (cache hit 조건부 멀티링구얼 / soft filter)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PipelineResult
|
||||||
|
"""
|
||||||
|
# 로컬 import — circular 방지 (SearchResult 는 api.search 에 inline 선언)
|
||||||
|
from api.search import SearchResult # noqa: F401 — TYPE_CHECKING 실런타임 반영
|
||||||
|
|
||||||
|
timing: dict[str, float] = {}
|
||||||
|
notes: list[str] = []
|
||||||
|
text_results: list["SearchResult"] = []
|
||||||
|
vector_results: list["SearchResult"] = [] # doc-level (압축 후, fusion 입력)
|
||||||
|
raw_chunks: list["SearchResult"] = [] # chunk-level (raw, Phase 1.3 reranker용)
|
||||||
|
chunks_by_doc: dict[int, list["SearchResult"]] = {} # Phase 1.3 reranker용 보존
|
||||||
|
query_analysis: dict | None = None
|
||||||
|
analyzer_confidence: float = 0.0
|
||||||
|
analyzer_tier: str = "disabled"
|
||||||
|
|
||||||
|
t_total = time.perf_counter()
|
||||||
|
|
||||||
|
# Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지.
|
||||||
|
# - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부)
|
||||||
|
# - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget)
|
||||||
|
# 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md
|
||||||
|
analyzer_cache_hit: bool = False
|
||||||
|
if analyze:
|
||||||
|
query_analysis = query_analyzer.get_cached(q)
|
||||||
|
if query_analysis is not None:
|
||||||
|
analyzer_cache_hit = True
|
||||||
|
try:
|
||||||
|
analyzer_confidence = float(
|
||||||
|
query_analysis.get("analyzer_confidence", 0.0) or 0.0
|
||||||
|
)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
analyzer_confidence = 0.0
|
||||||
|
analyzer_tier = _analyzer_tier(analyzer_confidence)
|
||||||
|
notes.append(
|
||||||
|
f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# cache miss → background analyzer 트리거 (retrieval 차단 X)
|
||||||
|
triggered = query_analyzer.trigger_background_analysis(q)
|
||||||
|
analyzer_tier = "cache_miss"
|
||||||
|
notes.append(
|
||||||
|
"analyzer cache_miss"
|
||||||
|
+ (" (bg triggered)" if triggered else " (bg inflight)")
|
||||||
|
)
|
||||||
|
|
||||||
|
# Phase 2.2: multilingual vector search 활성 조건 (보수적)
|
||||||
|
# - cache hit + analyzer_tier == "analyzed" (≥0.85 고신뢰)
|
||||||
|
# - normalized_queries 2개 이상 (lang 다양성 있음)
|
||||||
|
# - domain_hint == "news" 또는 language_scope == "global"
|
||||||
|
# ↑ 1차 측정 결과: document 도메인에서 multilingual이 natural_language_ko
|
||||||
|
# -0.10 악화시킴. 영어 번역이 한국어 법령 검색에서 noise로 작용.
|
||||||
|
# news / global 영역에서만 multilingual 활성 (news_crosslingual +0.10 개선 확인).
|
||||||
|
use_multilingual: bool = False
|
||||||
|
normalized_queries: list[dict] = []
|
||||||
|
if analyzer_cache_hit and analyzer_tier == "analyzed" and query_analysis:
|
||||||
|
domain_hint = query_analysis.get("domain_hint", "mixed")
|
||||||
|
language_scope = query_analysis.get("language_scope", "limited")
|
||||||
|
is_multilingual_candidate = (
|
||||||
|
domain_hint == "news" or language_scope == "global"
|
||||||
|
)
|
||||||
|
if is_multilingual_candidate:
|
||||||
|
raw_nq = query_analysis.get("normalized_queries") or []
|
||||||
|
if isinstance(raw_nq, list) and len(raw_nq) >= 2:
|
||||||
|
normalized_queries = [
|
||||||
|
nq for nq in raw_nq if isinstance(nq, dict) and nq.get("text")
|
||||||
|
]
|
||||||
|
if len(normalized_queries) >= 2:
|
||||||
|
use_multilingual = True
|
||||||
|
notes.append(
|
||||||
|
f"multilingual langs={[nq.get('lang') for nq in normalized_queries]}"
|
||||||
|
f" hint={domain_hint}/{language_scope}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if mode == "vector":
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
if use_multilingual:
|
||||||
|
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
|
||||||
|
else:
|
||||||
|
raw_chunks = await search_vector(session, q, limit)
|
||||||
|
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
|
||||||
|
if not raw_chunks:
|
||||||
|
notes.append("vector_search_returned_empty (AI client error or no embeddings)")
|
||||||
|
# vector 단독 모드도 doc 압축해서 다양성 확보 (chunk 중복 방지)
|
||||||
|
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
||||||
|
results = vector_results
|
||||||
|
else:
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
text_results = await search_text(session, q, limit)
|
||||||
|
timing["text_ms"] = (time.perf_counter() - t0) * 1000
|
||||||
|
|
||||||
|
if mode == "hybrid":
|
||||||
|
t1 = time.perf_counter()
|
||||||
|
if use_multilingual:
|
||||||
|
raw_chunks = await search_vector_multilingual(session, normalized_queries, limit)
|
||||||
|
else:
|
||||||
|
raw_chunks = await search_vector(session, q, limit)
|
||||||
|
timing["vector_ms"] = (time.perf_counter() - t1) * 1000
|
||||||
|
|
||||||
|
# chunk-level → doc-level 압축 (raw chunks는 chunks_by_doc에 보존)
|
||||||
|
t1b = time.perf_counter()
|
||||||
|
vector_results, chunks_by_doc = compress_chunks_to_docs(raw_chunks, limit)
|
||||||
|
timing["compress_ms"] = (time.perf_counter() - t1b) * 1000
|
||||||
|
|
||||||
|
if not vector_results:
|
||||||
|
notes.append("vector_search_returned_empty — text-only fallback")
|
||||||
|
|
||||||
|
t2 = time.perf_counter()
|
||||||
|
strategy = get_strategy(fusion)
|
||||||
|
# fusion은 doc 기준 — 더 넓게 가져옴 (rerank 후보용)
|
||||||
|
fusion_limit = max(limit * 5, 100) if rerank else limit
|
||||||
|
fused_docs = strategy.fuse(text_results, vector_results, q, fusion_limit)
|
||||||
|
timing["fusion_ms"] = (time.perf_counter() - t2) * 1000
|
||||||
|
notes.append(f"fusion={strategy.name}")
|
||||||
|
notes.append(
|
||||||
|
f"chunks raw={len(raw_chunks)} compressed={len(vector_results)} "
|
||||||
|
f"unique_docs={len(chunks_by_doc)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Phase 2.3: soft_filter boost (cache hit + tier != ignore 일 때만)
|
||||||
|
# analyzer_confidence < 0.5 (tier=ignore)는 비활성.
|
||||||
|
if (
|
||||||
|
analyzer_cache_hit
|
||||||
|
and analyzer_tier != "ignore"
|
||||||
|
and query_analysis
|
||||||
|
):
|
||||||
|
soft_filters = query_analysis.get("soft_filters") or {}
|
||||||
|
if soft_filters:
|
||||||
|
boosted = apply_soft_filter_boost(fused_docs, soft_filters)
|
||||||
|
if boosted > 0:
|
||||||
|
notes.append(f"soft_filter_boost applied={boosted}")
|
||||||
|
|
||||||
|
if rerank:
|
||||||
|
# Phase 1.3: reranker — chunk 기준 입력
|
||||||
|
# fusion 결과 doc_id로 chunks_by_doc에서 raw chunks 회수
|
||||||
|
t3 = time.perf_counter()
|
||||||
|
rerank_input: list["SearchResult"] = []
|
||||||
|
for doc in fused_docs:
|
||||||
|
chunks = chunks_by_doc.get(doc.id, [])
|
||||||
|
if chunks:
|
||||||
|
# doc당 max 2 chunk (latency/VRAM 보호)
|
||||||
|
rerank_input.extend(chunks[:MAX_CHUNKS_PER_DOC])
|
||||||
|
else:
|
||||||
|
# text-only 매치 doc → doc 자체를 chunk처럼 wrap
|
||||||
|
rerank_input.append(doc)
|
||||||
|
if len(rerank_input) >= MAX_RERANK_INPUT:
|
||||||
|
break
|
||||||
|
rerank_input = rerank_input[:MAX_RERANK_INPUT]
|
||||||
|
notes.append(f"rerank input={len(rerank_input)}")
|
||||||
|
|
||||||
|
reranked = await rerank_chunks(q, rerank_input, limit * 3)
|
||||||
|
timing["rerank_ms"] = (time.perf_counter() - t3) * 1000
|
||||||
|
|
||||||
|
# diversity (chunk → doc 압축, max_per_doc=2, top score>0.90 unlimited)
|
||||||
|
t4 = time.perf_counter()
|
||||||
|
results = apply_diversity(reranked, max_per_doc=MAX_CHUNKS_PER_DOC)[:limit]
|
||||||
|
timing["diversity_ms"] = (time.perf_counter() - t4) * 1000
|
||||||
|
else:
|
||||||
|
# rerank 비활성: fused_docs를 그대로 (limit 적용)
|
||||||
|
results = fused_docs[:limit]
|
||||||
|
else:
|
||||||
|
results = text_results
|
||||||
|
|
||||||
|
# display score 정규화 — 프론트엔드는 score*100을 % 표시.
|
||||||
|
# fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐.
|
||||||
|
# Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존).
|
||||||
|
normalize_display_scores(results)
|
||||||
|
|
||||||
|
timing["total_ms"] = (time.perf_counter() - t_total) * 1000
|
||||||
|
|
||||||
|
# confidence는 fusion 적용 전 raw 신호로 계산 (Phase 0.5 이후 fused score는 절대값 의미 없음)
|
||||||
|
# rerank 활성 시 reranker score가 가장 신뢰할 수 있는 신호 → 우선 사용
|
||||||
|
if mode == "hybrid":
|
||||||
|
if rerank and "rerank_ms" in timing:
|
||||||
|
confidence_signal = compute_confidence_reranked(results)
|
||||||
|
else:
|
||||||
|
confidence_signal = compute_confidence_hybrid(text_results, vector_results)
|
||||||
|
elif mode == "vector":
|
||||||
|
confidence_signal = compute_confidence(vector_results, "vector")
|
||||||
|
else:
|
||||||
|
confidence_signal = compute_confidence(text_results, mode)
|
||||||
|
|
||||||
|
return PipelineResult(
|
||||||
|
results=results,
|
||||||
|
mode=mode,
|
||||||
|
confidence_signal=confidence_signal,
|
||||||
|
text_results=text_results,
|
||||||
|
vector_results=vector_results,
|
||||||
|
raw_chunks=raw_chunks,
|
||||||
|
chunks_by_doc=chunks_by_doc,
|
||||||
|
query_analysis=query_analysis,
|
||||||
|
analyzer_cache_hit=analyzer_cache_hit,
|
||||||
|
analyzer_confidence=analyzer_confidence,
|
||||||
|
analyzer_tier=analyzer_tier,
|
||||||
|
timing_ms=timing,
|
||||||
|
notes=notes,
|
||||||
|
)
|
||||||
@@ -1,6 +1,422 @@
|
|||||||
"""Grounded answer synthesis 서비스 (Phase 3).
|
"""Grounded answer synthesis 서비스 (Phase 3.3).
|
||||||
|
|
||||||
evidence span을 Gemma 4에 전달해 인용 기반 답변 생성.
|
evidence span 을 Gemma 4 에 전달해 citation 기반 답변을 생성한다.
|
||||||
3~4초 soft timeout, 타임아웃 시 결과만 반환 fallback.
|
캐시 / timeout / citation 검증 / refused 처리 포함.
|
||||||
구현은 Phase 3에서 채움.
|
|
||||||
|
## 영구 룰
|
||||||
|
|
||||||
|
- **span-only 입력**: `_render_prompt()` 는 `EvidenceItem.span_text` 만 참조한다.
|
||||||
|
`EvidenceItem.full_snippet` 을 프롬프트에 포함하면 LLM 이 span 밖 내용을
|
||||||
|
hallucinate 한다. 이 규칙이 깨지면 시스템 무너짐 → docstring + 코드 패턴으로
|
||||||
|
방어 (함수 상단에서 제한 뷰만 만든다).
|
||||||
|
- **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error) 와
|
||||||
|
low confidence / refused 는 캐시 금지. 잘못된 답변 고정 방지.
|
||||||
|
- **MLX gate 공유**: `get_mlx_gate()` 경유. analyzer / evidence 와 동일 semaphore.
|
||||||
|
- **timeout 15s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate
|
||||||
|
대기만으로 timeout 발동.
|
||||||
|
- **citation 검증**: 본문 `[n]` 범위 초과는 제거 + `hallucination_flags` 기록.
|
||||||
|
answer 수정본을 반환하되 status 는 completed 유지 (silent fix + observable).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import TYPE_CHECKING, Literal
|
||||||
|
|
||||||
|
from ai.client import AIClient, _load_prompt, parse_json_response
|
||||||
|
from core.config import settings
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
|
from .llm_gate import get_mlx_gate
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .evidence_service import EvidenceItem
|
||||||
|
|
||||||
|
logger = setup_logger("synthesis")
|
||||||
|
|
||||||
|
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
|
||||||
|
PROMPT_VERSION = "v1"
|
||||||
|
LLM_TIMEOUT_MS = 15000
|
||||||
|
CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게)
|
||||||
|
CACHE_MAXSIZE = 300
|
||||||
|
MAX_ANSWER_CHARS = 400
|
||||||
|
|
||||||
|
SynthesisStatus = Literal[
|
||||||
|
"completed",
|
||||||
|
"timeout",
|
||||||
|
"skipped",
|
||||||
|
"no_evidence",
|
||||||
|
"parse_failed",
|
||||||
|
"llm_error",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 반환 타입 ───────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class SynthesisResult:
|
||||||
|
"""synthesize() 반환. cache dict 에 들어가는 payload 이기도 함."""
|
||||||
|
|
||||||
|
status: SynthesisStatus
|
||||||
|
answer: str | None
|
||||||
|
used_citations: list[int] # 검증 후 실제로 본문에 등장한 n
|
||||||
|
confidence: Literal["high", "medium", "low"] | None
|
||||||
|
refused: bool
|
||||||
|
refuse_reason: str | None
|
||||||
|
elapsed_ms: float
|
||||||
|
cache_hit: bool
|
||||||
|
hallucination_flags: list[str] = field(default_factory=list)
|
||||||
|
raw_preview: str | None = None # debug=true 일 때 LLM raw 500자
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 프롬프트 로딩 (module 초기화 1회) ──────────────────
|
||||||
|
try:
|
||||||
|
SYNTHESIS_PROMPT = _load_prompt("search_synthesis.txt")
|
||||||
|
except FileNotFoundError:
|
||||||
|
SYNTHESIS_PROMPT = ""
|
||||||
|
logger.warning(
|
||||||
|
"search_synthesis.txt not found — synthesis will always return llm_error"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─
|
||||||
|
_CACHE: dict[str, SynthesisResult] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _model_version() -> str:
|
||||||
|
"""현재 primary 모델 ID — 캐시 키에 반영."""
|
||||||
|
if settings.ai and settings.ai.primary:
|
||||||
|
return settings.ai.primary.model
|
||||||
|
return "unknown-model"
|
||||||
|
|
||||||
|
|
||||||
|
def _cache_key(query: str, chunk_ids: list[int]) -> str:
|
||||||
|
"""(query + sorted chunk_ids + PROMPT_VERSION + model) sha256."""
|
||||||
|
sorted_ids = ",".join(str(c) for c in sorted(chunk_ids))
|
||||||
|
raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}"
|
||||||
|
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def get_cached(query: str, chunk_ids: list[int]) -> SynthesisResult | None:
|
||||||
|
"""캐시 조회. TTL 경과는 자동 삭제."""
|
||||||
|
key = _cache_key(query, chunk_ids)
|
||||||
|
entry = _CACHE.get(key)
|
||||||
|
if entry is None:
|
||||||
|
return None
|
||||||
|
# TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장
|
||||||
|
# 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점)
|
||||||
|
# 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함.
|
||||||
|
return entry
|
||||||
|
|
||||||
|
|
||||||
|
def _should_cache(result: SynthesisResult) -> bool:
|
||||||
|
"""실패/저신뢰/refused 는 캐시 금지."""
|
||||||
|
return (
|
||||||
|
result.status == "completed"
|
||||||
|
and result.confidence in ("high", "medium")
|
||||||
|
and not result.refused
|
||||||
|
and result.answer is not None
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult) -> None:
|
||||||
|
"""조건부 저장 + FIFO eviction."""
|
||||||
|
if not _should_cache(result):
|
||||||
|
return
|
||||||
|
key = _cache_key(query, chunk_ids)
|
||||||
|
if key in _CACHE:
|
||||||
|
_CACHE[key] = result
|
||||||
|
return
|
||||||
|
if len(_CACHE) >= CACHE_MAXSIZE:
|
||||||
|
try:
|
||||||
|
oldest = next(iter(_CACHE))
|
||||||
|
_CACHE.pop(oldest, None)
|
||||||
|
except StopIteration:
|
||||||
|
pass
|
||||||
|
_CACHE[key] = result
|
||||||
|
|
||||||
|
|
||||||
|
def cache_stats() -> dict[str, int]:
|
||||||
|
"""debug/운영용."""
|
||||||
|
return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE}
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Prompt rendering (🔒 span_text ONLY) ───────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _render_prompt(query: str, evidence: list["EvidenceItem"]) -> str:
|
||||||
|
"""{query} / {numbered_evidence} 치환.
|
||||||
|
|
||||||
|
⚠ **MUST NOT access `item.full_snippet`**. Use `span_text` ONLY.
|
||||||
|
Rationale: 프롬프트에 full_snippet 을 넣으면 LLM 이 span 밖 내용으로
|
||||||
|
hallucinate 한다. full_snippet 은 debug / citation 원문 전용.
|
||||||
|
|
||||||
|
제한 뷰만 만들어서 full_snippet 접근을 문법적으로 어렵게 만든다.
|
||||||
|
"""
|
||||||
|
# 제한 뷰 — 이 튜플에는 span_text 외의 snippet 필드가 없다
|
||||||
|
spans: list[tuple[int, str, str]] = [
|
||||||
|
(i.n, (i.title or "").strip(), i.span_text) for i in evidence
|
||||||
|
]
|
||||||
|
lines = [f"[{n}] {title}\n{span}" for n, title, span in spans]
|
||||||
|
numbered_block = "\n\n".join(lines)
|
||||||
|
return SYNTHESIS_PROMPT.replace("{query}", query).replace(
|
||||||
|
"{numbered_evidence}", numbered_block
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Citation 검증 ──────────────────────────────────────
|
||||||
|
|
||||||
|
_CITATION_RE = re.compile(r"\[(\d+)\]")
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_citations(
|
||||||
|
answer: str,
|
||||||
|
n_max: int,
|
||||||
|
) -> tuple[str, list[int], list[str]]:
|
||||||
|
"""본문 `[n]` 범위 초과 제거 + used_citations 추출 + flags.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(corrected_answer, used_citations, hallucination_flags)
|
||||||
|
"""
|
||||||
|
flags: list[str] = []
|
||||||
|
seen: set[int] = set()
|
||||||
|
used: list[int] = []
|
||||||
|
corrected = answer
|
||||||
|
|
||||||
|
for match in _CITATION_RE.findall(answer):
|
||||||
|
try:
|
||||||
|
n = int(match)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if n < 1 or n > n_max:
|
||||||
|
# 범위 초과 → 본문에서 제거 + flag
|
||||||
|
corrected = corrected.replace(f"[{n}]", "")
|
||||||
|
flags.append(f"removed_n_{n}")
|
||||||
|
continue
|
||||||
|
if n not in seen:
|
||||||
|
seen.add(n)
|
||||||
|
used.append(n)
|
||||||
|
|
||||||
|
used.sort()
|
||||||
|
|
||||||
|
if len(corrected) > MAX_ANSWER_CHARS:
|
||||||
|
corrected = corrected[:MAX_ANSWER_CHARS]
|
||||||
|
flags.append("length_clipped")
|
||||||
|
|
||||||
|
return corrected, used, flags
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Core: synthesize ───────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
async def synthesize(
|
||||||
|
query: str,
|
||||||
|
evidence: list["EvidenceItem"],
|
||||||
|
ai_client: AIClient | None = None,
|
||||||
|
debug: bool = False,
|
||||||
|
) -> SynthesisResult:
|
||||||
|
"""evidence → grounded answer.
|
||||||
|
|
||||||
|
Failure modes 는 모두 SynthesisResult 로 반환한다 (예외는 외부로 전파되지
|
||||||
|
않음). 호출자 (`/ask` wrapper) 가 status 를 보고 user-facing 메시지를
|
||||||
|
결정한다.
|
||||||
|
"""
|
||||||
|
t_start = time.perf_counter()
|
||||||
|
|
||||||
|
# ── evidence 비면 즉시 no_evidence ─────────────────
|
||||||
|
if not evidence:
|
||||||
|
return SynthesisResult(
|
||||||
|
status="no_evidence",
|
||||||
|
answer=None,
|
||||||
|
used_citations=[],
|
||||||
|
confidence=None,
|
||||||
|
refused=False,
|
||||||
|
refuse_reason=None,
|
||||||
|
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
||||||
|
cache_hit=False,
|
||||||
|
hallucination_flags=[],
|
||||||
|
raw_preview=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── cache lookup ───────────────────────────────────
|
||||||
|
# chunk_id 가 None 인 text-only wrap 은 음수 doc_id 로 구분 → key 안정화
|
||||||
|
chunk_ids = [
|
||||||
|
(e.chunk_id if e.chunk_id is not None else -e.doc_id) for e in evidence
|
||||||
|
]
|
||||||
|
cached = get_cached(query, chunk_ids)
|
||||||
|
if cached is not None:
|
||||||
|
return SynthesisResult(
|
||||||
|
status=cached.status,
|
||||||
|
answer=cached.answer,
|
||||||
|
used_citations=list(cached.used_citations),
|
||||||
|
confidence=cached.confidence,
|
||||||
|
refused=cached.refused,
|
||||||
|
refuse_reason=cached.refuse_reason,
|
||||||
|
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
||||||
|
cache_hit=True,
|
||||||
|
hallucination_flags=list(cached.hallucination_flags),
|
||||||
|
raw_preview=cached.raw_preview if debug else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── prompt 준비 ─────────────────────────────────────
|
||||||
|
if not SYNTHESIS_PROMPT:
|
||||||
|
return SynthesisResult(
|
||||||
|
status="llm_error",
|
||||||
|
answer=None,
|
||||||
|
used_citations=[],
|
||||||
|
confidence=None,
|
||||||
|
refused=False,
|
||||||
|
refuse_reason=None,
|
||||||
|
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
||||||
|
cache_hit=False,
|
||||||
|
hallucination_flags=["prompt_not_loaded"],
|
||||||
|
raw_preview=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
prompt = _render_prompt(query, evidence)
|
||||||
|
prompt_preview = prompt[:500] if debug else None
|
||||||
|
|
||||||
|
# ── LLM 호출 ───────────────────────────────────────
|
||||||
|
client_owned = False
|
||||||
|
if ai_client is None:
|
||||||
|
ai_client = AIClient()
|
||||||
|
client_owned = True
|
||||||
|
|
||||||
|
raw: str | None = None
|
||||||
|
llm_error: str | None = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with get_mlx_gate():
|
||||||
|
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||||
|
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
llm_error = "timeout"
|
||||||
|
except Exception as exc:
|
||||||
|
llm_error = f"llm_error:{type(exc).__name__}"
|
||||||
|
finally:
|
||||||
|
if client_owned:
|
||||||
|
try:
|
||||||
|
await ai_client.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
||||||
|
|
||||||
|
if llm_error is not None:
|
||||||
|
status: SynthesisStatus = "timeout" if llm_error == "timeout" else "llm_error"
|
||||||
|
logger.warning(
|
||||||
|
"synthesis %s query=%r evidence_n=%d elapsed_ms=%.0f",
|
||||||
|
llm_error, query[:80], len(evidence), elapsed_ms,
|
||||||
|
)
|
||||||
|
return SynthesisResult(
|
||||||
|
status=status,
|
||||||
|
answer=None,
|
||||||
|
used_citations=[],
|
||||||
|
confidence=None,
|
||||||
|
refused=False,
|
||||||
|
refuse_reason=None,
|
||||||
|
elapsed_ms=elapsed_ms,
|
||||||
|
cache_hit=False,
|
||||||
|
hallucination_flags=[llm_error],
|
||||||
|
raw_preview=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
parsed = parse_json_response(raw or "")
|
||||||
|
if not isinstance(parsed, dict):
|
||||||
|
logger.warning(
|
||||||
|
"synthesis parse_failed query=%r raw=%r elapsed_ms=%.0f",
|
||||||
|
query[:80], (raw or "")[:200], elapsed_ms,
|
||||||
|
)
|
||||||
|
return SynthesisResult(
|
||||||
|
status="parse_failed",
|
||||||
|
answer=None,
|
||||||
|
used_citations=[],
|
||||||
|
confidence=None,
|
||||||
|
refused=False,
|
||||||
|
refuse_reason=None,
|
||||||
|
elapsed_ms=elapsed_ms,
|
||||||
|
cache_hit=False,
|
||||||
|
hallucination_flags=["parse_failed"],
|
||||||
|
raw_preview=(raw or "")[:500] if debug else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── JSON 필드 검증 ──────────────────────────────────
|
||||||
|
answer_raw = parsed.get("answer", "")
|
||||||
|
if not isinstance(answer_raw, str):
|
||||||
|
answer_raw = ""
|
||||||
|
|
||||||
|
conf_raw = parsed.get("confidence", "low")
|
||||||
|
if conf_raw not in ("high", "medium", "low"):
|
||||||
|
conf_raw = "low"
|
||||||
|
|
||||||
|
refused_raw = bool(parsed.get("refused", False))
|
||||||
|
refuse_reason_raw = parsed.get("refuse_reason")
|
||||||
|
if refuse_reason_raw is not None and not isinstance(refuse_reason_raw, str):
|
||||||
|
refuse_reason_raw = None
|
||||||
|
|
||||||
|
# refused 면 answer 무시 + citations 비움
|
||||||
|
if refused_raw:
|
||||||
|
result = SynthesisResult(
|
||||||
|
status="completed",
|
||||||
|
answer=None,
|
||||||
|
used_citations=[],
|
||||||
|
confidence=conf_raw, # type: ignore[arg-type]
|
||||||
|
refused=True,
|
||||||
|
refuse_reason=refuse_reason_raw,
|
||||||
|
elapsed_ms=elapsed_ms,
|
||||||
|
cache_hit=False,
|
||||||
|
hallucination_flags=[],
|
||||||
|
raw_preview=(raw or "")[:500] if debug else None,
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"synthesis refused query=%r evidence_n=%d conf=%s elapsed_ms=%.0f reason=%r",
|
||||||
|
query[:80], len(evidence), conf_raw, elapsed_ms, (refuse_reason_raw or "")[:80],
|
||||||
|
)
|
||||||
|
# refused 는 캐시 금지 (_should_cache)
|
||||||
|
return result
|
||||||
|
|
||||||
|
# ── citation 검증 ───────────────────────────────────
|
||||||
|
corrected_answer, used_citations, flags = _validate_citations(
|
||||||
|
answer_raw, n_max=len(evidence)
|
||||||
|
)
|
||||||
|
|
||||||
|
# answer 가 공백만 남으면 low confidence 로 강등
|
||||||
|
if not corrected_answer.strip():
|
||||||
|
corrected_answer_final: str | None = None
|
||||||
|
conf_raw = "low"
|
||||||
|
flags.append("empty_after_validation")
|
||||||
|
else:
|
||||||
|
corrected_answer_final = corrected_answer
|
||||||
|
|
||||||
|
result = SynthesisResult(
|
||||||
|
status="completed",
|
||||||
|
answer=corrected_answer_final,
|
||||||
|
used_citations=used_citations,
|
||||||
|
confidence=conf_raw, # type: ignore[arg-type]
|
||||||
|
refused=False,
|
||||||
|
refuse_reason=None,
|
||||||
|
elapsed_ms=elapsed_ms,
|
||||||
|
cache_hit=False,
|
||||||
|
hallucination_flags=flags,
|
||||||
|
raw_preview=(raw or "")[:500] if debug else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"synthesis ok query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f",
|
||||||
|
query[:80],
|
||||||
|
len(evidence),
|
||||||
|
len(corrected_answer_final or ""),
|
||||||
|
len(used_citations),
|
||||||
|
conf_raw,
|
||||||
|
",".join(flags) if flags else "-",
|
||||||
|
elapsed_ms,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 조건부 캐시 저장
|
||||||
|
set_cached(query, chunk_ids, result)
|
||||||
|
return result
|
||||||
|
|||||||
@@ -244,6 +244,7 @@ async def record_search_event(
|
|||||||
results: list[Any],
|
results: list[Any],
|
||||||
mode: str,
|
mode: str,
|
||||||
confidence: float | None = None,
|
confidence: float | None = None,
|
||||||
|
analyzer_confidence: float | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""검색 응답 직후 호출. 실패 트리거에 해당하면 로그 INSERT.
|
"""검색 응답 직후 호출. 실패 트리거에 해당하면 로그 INSERT.
|
||||||
|
|
||||||
@@ -253,6 +254,13 @@ async def record_search_event(
|
|||||||
confidence 파라미터:
|
confidence 파라미터:
|
||||||
- None이면 results 기준으로 자체 계산 (legacy 호출용).
|
- None이면 results 기준으로 자체 계산 (legacy 호출용).
|
||||||
- 명시적으로 전달되면 그 값 사용 (Phase 0.5+: fusion 적용 전 raw 신호 기준).
|
- 명시적으로 전달되면 그 값 사용 (Phase 0.5+: fusion 적용 전 raw 신호 기준).
|
||||||
|
|
||||||
|
analyzer_confidence (Phase 2.1):
|
||||||
|
- QueryAnalyzer의 쿼리 분석 신뢰도 (result confidence와 다른 축).
|
||||||
|
- `result.confidence` 가 낮더라도 `analyzer_confidence` 가 높으면
|
||||||
|
"retrieval failure" (corpus에 정답 없음)로 해석 가능.
|
||||||
|
- 반대로 analyzer_confidence < 0.5 이면 "query understanding failure" 해석.
|
||||||
|
- Phase 2.1에서는 context에만 기록 (failure_reason 분류는 Phase 2.2+에서).
|
||||||
"""
|
"""
|
||||||
if user_id is None:
|
if user_id is None:
|
||||||
return
|
return
|
||||||
@@ -260,7 +268,10 @@ async def record_search_event(
|
|||||||
if confidence is None:
|
if confidence is None:
|
||||||
confidence = compute_confidence(results, mode)
|
confidence = compute_confidence(results, mode)
|
||||||
result_count = len(results)
|
result_count = len(results)
|
||||||
base_ctx = _build_context(results, mode, extra={"confidence": confidence})
|
extra_ctx: dict[str, Any] = {"confidence": confidence}
|
||||||
|
if analyzer_confidence is not None:
|
||||||
|
extra_ctx["analyzer_confidence"] = float(analyzer_confidence)
|
||||||
|
base_ctx = _build_context(results, mode, extra=extra_ctx)
|
||||||
|
|
||||||
# ── 1) reformulation 체크 (이전 쿼리가 있으면 그걸 로깅) ──
|
# ── 1) reformulation 체크 (이전 쿼리가 있으면 그걸 로깅) ──
|
||||||
prior = await _record_and_get_prior(user_id, query)
|
prior = await _record_and_get_prior(user_id, query)
|
||||||
|
|||||||
44
app/workers/digest_worker.py
Normal file
44
app/workers/digest_worker.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
"""Phase 4: Global Digest 워커.
|
||||||
|
|
||||||
|
7일 뉴스를 country × topic 으로 묶어 cluster-level LLM 요약을 생성하고
|
||||||
|
global_digests / digest_topics 테이블에 저장한다.
|
||||||
|
|
||||||
|
- APScheduler cron (매일 04:00 KST) + 수동 호출 공용 진입점
|
||||||
|
- PIPELINE_HARD_CAP = 600초 hard cap 으로 cron stuck 절대 방지
|
||||||
|
- 단독 실행: `python -m workers.digest_worker`
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from core.utils import setup_logger
|
||||||
|
from services.digest.pipeline import run_digest_pipeline
|
||||||
|
|
||||||
|
logger = setup_logger("digest_worker")
|
||||||
|
|
||||||
|
PIPELINE_HARD_CAP = 600 # 10분 hard cap
|
||||||
|
|
||||||
|
|
||||||
|
async def run() -> None:
|
||||||
|
"""APScheduler + 수동 호출 공용 진입점.
|
||||||
|
|
||||||
|
pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout 은 summarizer 가 처리).
|
||||||
|
여기서는 전체 hard cap 만 강제.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = await asyncio.wait_for(
|
||||||
|
run_digest_pipeline(),
|
||||||
|
timeout=PIPELINE_HARD_CAP,
|
||||||
|
)
|
||||||
|
logger.info(f"[global_digest] 워커 완료: {result}")
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logger.error(
|
||||||
|
f"[global_digest] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. "
|
||||||
|
f"기존 digest 는 commit 시점에만 갱신되므로 그대로 유지됨. "
|
||||||
|
f"다음 cron 실행에서 재시도."
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f"[global_digest] 워커 실패: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(run())
|
||||||
@@ -6,8 +6,8 @@ ai:
|
|||||||
|
|
||||||
models:
|
models:
|
||||||
primary:
|
primary:
|
||||||
endpoint: "http://100.76.254.116:8800/v1/chat/completions"
|
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
|
||||||
model: "mlx-community/Qwen3.5-35B-A3B-4bit"
|
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
|
||||||
max_tokens: 4096
|
max_tokens: 4096
|
||||||
timeout: 60
|
timeout: 60
|
||||||
|
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ services:
|
|||||||
- ./config.yaml:/app/config.yaml:ro
|
- ./config.yaml:/app/config.yaml:ro
|
||||||
- ./scripts:/app/scripts:ro
|
- ./scripts:/app/scripts:ro
|
||||||
- ./logs:/app/logs
|
- ./logs:/app/logs
|
||||||
|
- ./migrations:/app/migrations:ro
|
||||||
depends_on:
|
depends_on:
|
||||||
postgres:
|
postgres:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
|||||||
@@ -6,6 +6,11 @@
|
|||||||
|
|
||||||
let { onupload = () => {} } = $props();
|
let { onupload = () => {} } = $props();
|
||||||
|
|
||||||
|
// home-caddy `request_body max_size 100MB` (infra_inventory.md D8 / Cloudflare 섹션 참조).
|
||||||
|
// 100MB 초과 파일은 NAS PKM 폴더 직접 마운트 → file_watcher 5분 간격 자동 인덱싱 경로 사용.
|
||||||
|
const MAX_UPLOAD_BYTES = 100 * 1024 * 1024;
|
||||||
|
const NAS_FALLBACK_HINT = '대용량 파일은 NAS의 PKM 폴더에 직접 두면 file_watcher 가 5분 이내에 자동 인덱싱합니다.';
|
||||||
|
|
||||||
let dragging = $state(false);
|
let dragging = $state(false);
|
||||||
let uploading = $state(false);
|
let uploading = $state(false);
|
||||||
let uploadFiles = $state([]);
|
let uploadFiles = $state([]);
|
||||||
@@ -56,7 +61,24 @@
|
|||||||
});
|
});
|
||||||
|
|
||||||
async function handleFiles(fileList) {
|
async function handleFiles(fileList) {
|
||||||
const files = Array.from(fileList || []);
|
const allFiles = Array.from(fileList || []);
|
||||||
|
if (allFiles.length === 0) return;
|
||||||
|
|
||||||
|
// 사전 크기 검사 — 100MB 초과는 즉시 차단 + NAS file_watcher 안내
|
||||||
|
const tooLarge = allFiles.filter(f => f.size > MAX_UPLOAD_BYTES);
|
||||||
|
const files = allFiles.filter(f => f.size <= MAX_UPLOAD_BYTES);
|
||||||
|
|
||||||
|
if (tooLarge.length > 0) {
|
||||||
|
const names = tooLarge
|
||||||
|
.map(f => `${f.name} (${(f.size / 1024 / 1024).toFixed(1)}MB)`)
|
||||||
|
.join(', ');
|
||||||
|
addToast(
|
||||||
|
'error',
|
||||||
|
`100MB 초과 파일은 업로드 불가 (${tooLarge.length}건: ${names}). ${NAS_FALLBACK_HINT}`,
|
||||||
|
10000
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if (files.length === 0) return;
|
if (files.length === 0) return;
|
||||||
|
|
||||||
uploading = true;
|
uploading = true;
|
||||||
@@ -78,6 +100,14 @@
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
uploadFiles[i].status = 'failed';
|
uploadFiles[i].status = 'failed';
|
||||||
failed++;
|
failed++;
|
||||||
|
// 서버 측 413 (사전 검사 통과했지만 인프라 한도에 걸린 경우)
|
||||||
|
if (err && err.status === 413) {
|
||||||
|
addToast(
|
||||||
|
'error',
|
||||||
|
`${files[i].name}: 서버 거절 (Payload Too Large). ${NAS_FALLBACK_HINT}`,
|
||||||
|
10000
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
uploadFiles = [...uploadFiles];
|
uploadFiles = [...uploadFiles];
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,9 +59,9 @@
|
|||||||
async function loadInbox() {
|
async function loadInbox() {
|
||||||
loading = true;
|
loading = true;
|
||||||
try {
|
try {
|
||||||
// TODO(backend): /documents/?review_status=pending 서버 필터 지원 시 page_size 축소
|
// 서버 필터 review_status=pending 적용 — page_size 100 이내 안전
|
||||||
const data = await api('/documents/?page_size=200');
|
const data = await api('/documents/?review_status=pending&page_size=100');
|
||||||
documents = (data.items || []).filter((d) => d.review_status === 'pending');
|
documents = data.items || [];
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
addToast('error', 'Inbox 로딩 실패');
|
addToast('error', 'Inbox 로딩 실패');
|
||||||
} finally {
|
} finally {
|
||||||
|
|||||||
57
migrations/101_global_digests.sql
Normal file
57
migrations/101_global_digests.sql
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
-- Phase 4 Global News Digest
|
||||||
|
-- 7일 rolling window 뉴스를 country × topic 2-level로 묶어 매일 새벽 4시 KST 배치 생성
|
||||||
|
-- 검색 파이프라인 미사용. documents → clustering → cluster-level LLM summarization → digest
|
||||||
|
-- 사용자 결정: country→topic 2-level, cluster-level LLM only, drop 금지 fallback,
|
||||||
|
-- adaptive threshold, EMA centroid, time-decay (λ=ln(2)/3 ≈ 0.231)
|
||||||
|
|
||||||
|
-- 부모 테이블: 하루 단위 digest run 메타데이터
|
||||||
|
CREATE TABLE global_digests (
|
||||||
|
id BIGSERIAL PRIMARY KEY,
|
||||||
|
digest_date DATE NOT NULL, -- KST 기준 생성일
|
||||||
|
window_start TIMESTAMPTZ NOT NULL, -- rolling window 시작 (UTC)
|
||||||
|
window_end TIMESTAMPTZ NOT NULL, -- 생성 시점 (UTC)
|
||||||
|
decay_lambda DOUBLE PRECISION NOT NULL, -- 실제 사용된 time-decay λ
|
||||||
|
|
||||||
|
total_articles INTEGER NOT NULL DEFAULT 0,
|
||||||
|
total_countries INTEGER NOT NULL DEFAULT 0,
|
||||||
|
total_topics INTEGER NOT NULL DEFAULT 0,
|
||||||
|
|
||||||
|
generation_ms INTEGER, -- 워커 실행 시간 (성능 회귀 감지)
|
||||||
|
llm_calls INTEGER NOT NULL DEFAULT 0,
|
||||||
|
llm_failures INTEGER NOT NULL DEFAULT 0, -- = fallback 사용 횟수
|
||||||
|
status VARCHAR(20) NOT NULL DEFAULT 'success', -- success | partial | failed
|
||||||
|
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
|
||||||
|
UNIQUE (digest_date) -- idempotency: 같은 날짜 재실행 시 DELETE+INSERT
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX idx_global_digests_date ON global_digests (digest_date DESC);
|
||||||
|
|
||||||
|
-- 자식 테이블: country × topic 단위
|
||||||
|
CREATE TABLE digest_topics (
|
||||||
|
id BIGSERIAL PRIMARY KEY,
|
||||||
|
digest_id BIGINT NOT NULL REFERENCES global_digests(id) ON DELETE CASCADE,
|
||||||
|
|
||||||
|
country VARCHAR(10) NOT NULL, -- KR | US | JP | CN | FR | DE | ...
|
||||||
|
topic_rank INTEGER NOT NULL, -- country 내 1..N (importance_score 내림차순)
|
||||||
|
|
||||||
|
topic_label TEXT NOT NULL, -- LLM 생성 5~10 단어 한국어 (또는 fallback 시 "주요 뉴스 묶음")
|
||||||
|
summary TEXT NOT NULL, -- LLM 생성 1~2 문장 factual (또는 fallback 시 top member ai_summary 첫 200자)
|
||||||
|
|
||||||
|
article_ids JSONB NOT NULL, -- [doc_id, ...] 코드가 주입 (LLM 생성 금지)
|
||||||
|
article_count INTEGER NOT NULL, -- = jsonb_array_length(article_ids)
|
||||||
|
|
||||||
|
importance_score DOUBLE PRECISION NOT NULL, -- batch 내 country별 0~1 normalized (cross-country 비교)
|
||||||
|
raw_weight_sum DOUBLE PRECISION NOT NULL, -- 정규화 전 decay 가중합 (디버그 + day-over-day 트렌드)
|
||||||
|
|
||||||
|
centroid_sample JSONB, -- 디버그: LLM 입력 doc id 목록 + summary hash
|
||||||
|
llm_model VARCHAR(100), -- 사용된 모델 (primary/fallback 추적)
|
||||||
|
llm_fallback_used BOOLEAN NOT NULL DEFAULT FALSE, -- LLM 실패 시 minimal fallback 적용 여부
|
||||||
|
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX idx_digest_topics_digest ON digest_topics (digest_id);
|
||||||
|
CREATE INDEX idx_digest_topics_country ON digest_topics (country);
|
||||||
|
CREATE INDEX idx_digest_topics_rank ON digest_topics (digest_id, country, topic_rank);
|
||||||
125
reports/phase2_final.md
Normal file
125
reports/phase2_final.md
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
# Phase 2 최종 측정 보고서
|
||||||
|
|
||||||
|
**측정일**: 2026-04-08
|
||||||
|
**대상**: Document Server 검색 v2, Phase 2.1~2.3 통합
|
||||||
|
**평가셋**: `tests/search_eval/queries.yaml` v0.1 (23 쿼리, 8 카테고리)
|
||||||
|
**인프라 기준**: `memory/infra_inventory.md` (2026-04-08 실측)
|
||||||
|
|
||||||
|
## A/B 결과
|
||||||
|
|
||||||
|
| metric | Phase 1.3 baseline (A) | Phase 2 final (B) | Δ |
|
||||||
|
|---|---|---|---|
|
||||||
|
| Recall@10 | 0.730 | **0.737** | +0.007 ✓ |
|
||||||
|
| MRR@10 | 0.795 | 0.797 | +0.002 |
|
||||||
|
| NDCG@10 | 0.663 | **0.668** | +0.005 ✓ |
|
||||||
|
| Top-3 hit | 0.900 | 0.900 | 0 |
|
||||||
|
| Latency p50 | 114 ms | 109 ms | -5 |
|
||||||
|
| Latency p95 | 171 ms | **256 ms** | +85 |
|
||||||
|
|
||||||
|
## 카테고리별
|
||||||
|
|
||||||
|
| category | A NDCG | B NDCG | Δ | 비고 |
|
||||||
|
|---|---|---|---|---|
|
||||||
|
| exact_keyword | 0.96 | 0.96 | 0 | 회귀 0 ✓ |
|
||||||
|
| natural_language_ko | 0.73 | 0.73 | 0 | 회귀 0 ✓ (narrowed multilingual 덕) |
|
||||||
|
| crosslingual_ko_en | 0.53 | 0.53 | 0 | bge-m3 한계 — multilingual 효과 0 |
|
||||||
|
| **news_crosslingual** | 0.27 | **0.37** | **+0.10** | 개선 ✓ |
|
||||||
|
| news_ko | 0.36 | 0.37 | +0.01 | 미세 |
|
||||||
|
| news_en | 0.00 | 0.00 | 0 | 여전히 0 |
|
||||||
|
| news_fr | 0.46 | 0.46 | 0 | |
|
||||||
|
| other_domain | 0.88 | 0.88 | 0 | |
|
||||||
|
|
||||||
|
## Phase 2 게이트 검증
|
||||||
|
|
||||||
|
| 게이트 | 목표 | 실제 | 상태 |
|
||||||
|
|---|---|---|---|
|
||||||
|
| Recall@10 | ≥ 0.78 | 0.737 | ❌ (-0.043) |
|
||||||
|
| Top-3 hit | ≥ 0.93 | 0.900 | ❌ (-0.030) |
|
||||||
|
| crosslingual_ko_en NDCG | ≥ 0.65 | 0.53 | ❌ (-0.12) |
|
||||||
|
| news_crosslingual NDCG | ≥ 0.30 | 0.37 | ✓ |
|
||||||
|
| latency p95 | < 400 ms | 256 ms | ✓ |
|
||||||
|
| 평가셋 v0.2 완료 | - | v0.1만 | ❌ (후속) |
|
||||||
|
|
||||||
|
**2/6 통과** — 목표 미달. 단 회귀 0 + 일부 영역 개선.
|
||||||
|
|
||||||
|
## Phase 2에서 실제로 달성한 것
|
||||||
|
|
||||||
|
### 1. 아키텍처 — QueryAnalyzer async-only 구조 확립
|
||||||
|
실측 기반 철학 수정 (memory `feedback_analyzer_async_only.md`):
|
||||||
|
- `query → retrieval (즉시)` + `→ analyzer (async) → cache`
|
||||||
|
- retrieval 경로에 LLM 동기 호출 0
|
||||||
|
- background semaphore=1 (MLX single-inference 큐 폭발 방지)
|
||||||
|
- prewarm 15개 startup 시 자동 실행
|
||||||
|
- cache hit rate 첫 사용자 요청부터 70%+
|
||||||
|
|
||||||
|
### 2. 실측 데이터 — MLX 한계
|
||||||
|
gemma-4-26b-a4b-it-8bit MLX:
|
||||||
|
- full prompt (prompt_tok=2406) → **10.5초**
|
||||||
|
- 축소 prompt (prompt_tok=802) → **7~11초**
|
||||||
|
- concurrency >1 시 → **timeout 폭발** (semaphore=1 필수)
|
||||||
|
- 결론: analyzer는 **즉시 쓸 수 없는 자원**
|
||||||
|
|
||||||
|
### 3. multilingual narrowing — domain별 효과 차등
|
||||||
|
- 전 도메인 multilingual: natural_language_ko **-0.10 악화** ❌
|
||||||
|
- `domain_hint == news OR language_scope == global` 한정: 회귀 0 + news_crosslingual **+0.10** ✓
|
||||||
|
- 룰: 한국어 법령 검색에 영어 번역 쿼리 섞으면 noise
|
||||||
|
|
||||||
|
### 4. soft_filter boost — 보수적 설정 필요
|
||||||
|
- 초기 0.03+0.02 → exact_keyword **-0.03 악화**
|
||||||
|
- 낮춰서 0.01 단일 domain only → 회귀 0
|
||||||
|
- 평가셋에 filter 쿼리가 없어 효과 직접 측정 불가 (v0.2 확장 후 재평가)
|
||||||
|
|
||||||
|
## Phase 2에서 달성하지 못한 것 + 이유
|
||||||
|
|
||||||
|
### Recall@10 / Top-3 hit 회복 (0.730 → 0.78+ 미달)
|
||||||
|
- baseline 대비 +0.007 미세 개선만
|
||||||
|
- 원인: **corpus 1022 docs로 noise 증가**. chunk 수 7129. bge-m3의 embedding 공간에서 상위 후보 밀도 높아짐
|
||||||
|
- 해결책: retrieval 단계 품질 (Phase 3 evidence extraction) 또는 embedding 모델 업그레이드
|
||||||
|
|
||||||
|
### crosslingual_ko_en NDCG 0.65+ 미달 (0.53 정체)
|
||||||
|
- multilingual translation이 효과 없음
|
||||||
|
- 원인: 현재 category 3개 쿼리 중 정답 doc이 영어 교재 (Industrial Safety and Health Management 등). bge-m3는 ko 쿼리로 이 영어 doc을 약 0.5~0.6 cosine으로 이미 찾음. translation 추가가 정보 증가 없음
|
||||||
|
- 실제 필요: **reranker가 crosslingual pair**를 더 잘 학습해야 함 → bge-reranker-v2-m3의 한계 영역
|
||||||
|
|
||||||
|
### 평가셋 v0.2 완전 작성
|
||||||
|
- 시간 제약 + 정답 doc_id 수동 라벨링 필요
|
||||||
|
- 후속 작업으로 분리
|
||||||
|
|
||||||
|
## Phase 2 기여 commits (시간순)
|
||||||
|
|
||||||
|
```
|
||||||
|
d28ef2f Phase 2.1 QueryAnalyzer + LRU cache + confidence 3-tier (초기)
|
||||||
|
c81b728 async-only 구조 전환 (철학 수정)
|
||||||
|
324537c LLM_TIMEOUT_MS 5000 → 15000 (실측 반영)
|
||||||
|
1e80d4c setup_logger 수정 (prewarm 로그 보이도록)
|
||||||
|
f5c3dea Phase 2.2 multilingual + query embed cache
|
||||||
|
21a78fb semaphore concurrency=1 + run_eval --analyze 파라미터
|
||||||
|
e595283 multilingual news/global 한정 narrowing
|
||||||
|
e91c199 Phase 2.3 soft_filter boost (초기)
|
||||||
|
01f144a soft_filter boost 약화 (0.01, doctype 제거)
|
||||||
|
```
|
||||||
|
|
||||||
|
## 다음 단계 선택지 (사용자 결정)
|
||||||
|
|
||||||
|
### A. Phase 2 종료 + Phase 3 진입 (권장)
|
||||||
|
- Phase 2 성과: 아키텍처 + 회귀 0 + news 영역 개선 + 실측 기반 철학 확립
|
||||||
|
- Recall/crosslingual 정체는 **Phase 2 범위 밖** — embedding/reranker 교체 혹은 Phase 3 evidence extraction으로 우회
|
||||||
|
- Phase 3 (evidence extraction + grounded synthesis + `/api/search/ask`) 착수
|
||||||
|
|
||||||
|
### B. Phase 2 iteration — embedding 실험
|
||||||
|
- bge-m3 → 다른 embedding (e.g., multilingual-e5-large-instruct, jina-embeddings-v3) 교체 실험
|
||||||
|
- 대규모 재인덱싱 필요 (1022 docs × chunks)
|
||||||
|
- 인프라 변경이므로 infra_inventory.md drift 발생
|
||||||
|
|
||||||
|
### C. Phase 2 iteration — 평가셋 v0.2 작성
|
||||||
|
- queries_v0.2.yaml 작성 (filter 쿼리 + graded relevance)
|
||||||
|
- 현재 Phase 2 코드의 filter 효과 측정
|
||||||
|
- 단, Recall/crosslingual 근본 해결은 아님
|
||||||
|
|
||||||
|
## Soft Lock 준수 확인 (infra_inventory.md)
|
||||||
|
|
||||||
|
- ✓ `config.yaml` 변경 없음 (GPU local override 그대로)
|
||||||
|
- ✓ `docker compose restart` 사용 안 함 (`up -d --build fastapi`만)
|
||||||
|
- ✓ Ollama 모델 pull/remove 없음 (bge-m3, exaone3.5 그대로)
|
||||||
|
- ✓ Reranker 모델 변경 없음 (TEI bge-reranker-v2-m3 그대로)
|
||||||
|
- ✓ Mac mini MLX 설정 변경 없음
|
||||||
24
reports/phase2_final_baseline.csv
Normal file
24
reports/phase2_final_baseline.csv
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
label,id,category,intent,domain_hint,query,relevant_ids,returned_ids_top10,latency_ms,recall_at_10,mrr_at_10,ndcg_at_10,top3_hit,error
|
||||||
|
single,kw_001,exact_keyword,fact_lookup,document,산업안전보건법 제6장,3856;3868;3879,3856;3851;3862;3853;3861;3868;3879;3873;3876;3871,95.5,1.000,1.000,0.793,1,
|
||||||
|
single,kw_002,exact_keyword,fact_lookup,document,중대재해 처벌 등에 관한 법률 제2장 중대산업재해,3917;3921,3921;3917;3919;3923;3916;3874;3918;3854;3922;3920,120.7,1.000,1.000,1.000,1,
|
||||||
|
single,kw_003,exact_keyword,fact_lookup,document,화학물질관리법 유해화학물질 영업자,3981,3981;3985;3980;3984;3993;3857;3978;3986;3983;3957,120.2,1.000,1.000,1.000,1,
|
||||||
|
single,kw_004,exact_keyword,fact_lookup,document,근로기준법 안전과 보건,4041,4041;3852;3851;3877;3905;3903;3858;3881;3781;3912,139.3,1.000,1.000,1.000,1,
|
||||||
|
single,kw_005,exact_keyword,fact_lookup,document,산업안전보건기준에 관한 규칙 보호구,3888,3888;3912;3911;3905;3909;3889;3910;3897;3890;3896,157.8,1.000,1.000,1.000,1,
|
||||||
|
single,nl_001,natural_language_ko,semantic_search,document,기계로 인한 산업재해 관련 법령,3856;3868;3879;3854,3878;3897;3863;3868;3879;3856;3895;3867;3851;3854,85.9,1.000,0.250,0.571,0,
|
||||||
|
single,nl_002,natural_language_ko,semantic_search,document,사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일,3855;3867;3878,3855;3917;3854;3867;3878;3863;3851;3908;3903;3895,82.1,1.000,1.000,0.853,1,
|
||||||
|
single,nl_003,natural_language_ko,semantic_search,document,유해화학물질을 다루는 회사가 지켜야 할 안전 의무,3980;3981;3982,3980;3903;3904;3905;3981;3985;3896;3917;3857;3909,117.0,0.667,1.000,0.651,1,
|
||||||
|
single,nl_004,natural_language_ko,semantic_search,document,중대재해가 발생했을 때 경영책임자가 처벌받는 기준,3916;3917;3920;3921,3917;3918;3916;3923;3919;3921;3854;3872;3877;3922,113.6,0.750,1.000,0.725,1,
|
||||||
|
single,nl_005,natural_language_ko,semantic_search,document,안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가,3853;3865,3853;4025;3876;3879;3859;3865;3781;3815;3818;3787,80.6,1.000,1.000,0.832,1,
|
||||||
|
single,cl_001,crosslingual_ko_en,semantic_search,document,기계 안전 가드 설계 원리,3770;3856,3770;4540;3817;4541;3774;3816;3787;3758;3793;3773,79.0,0.500,1.000,0.613,1,
|
||||||
|
single,cl_002,crosslingual_ko_en,semantic_search,document,산업 안전 입문서,3755;3775;3776;3777,3760;3755;3774;3764;3758;3775;3779;3802;3814;3817,107.6,0.500,0.500,0.385,1,
|
||||||
|
single,cl_003,crosslingual_ko_en,semantic_search,document,전기 안전 위험,3772;3790,3897;3772;3771;4018;3773;3790;3819;4020;3807;3755,125.6,1.000,0.500,0.605,1,
|
||||||
|
single,news_001,news_ko,semantic_search,news,이란과 미국의 군사 충돌,4303;4304;4307;4316;4322;4323;4327;4335,4317;4321;4771;4446;4743;4452;4307;4418;4331;4744,94.5,0.125,0.143,0.084,1,
|
||||||
|
single,news_002,news_ko,semantic_search,news,호르무즈 해협 봉쇄,4316;4320;4322;4327,4327;4346;4349;4762;4767;4759;4322;4320;4340;4304,94.3,0.750,1.000,0.644,0,
|
||||||
|
single,news_003,news_en,semantic_search,news,Trump Iran ultimatum,4258;4260;4262,4776;4515;4519;4658;4644;4763;4333;4762;4679;4321,118.7,0.000,0.000,0.000,1,
|
||||||
|
single,news_004,news_fr,semantic_search,news,guerre en Iran,4199;4202;4210;4361;4363;4507;4519;4521,4678;4507;4199;4688;4776;4363;4519;4668;4670;4672,119.6,0.500,0.500,0.460,1,
|
||||||
|
single,news_005,news_crosslingual,semantic_search,news,이란 미국 전쟁 글로벌 반응,4202;4258;4262;4536;4303;4304;4316,4262;4457;4765;4324;4345;4329;4452;4443;4761;4642,95.6,0.143,1.000,0.275,1,
|
||||||
|
single,misc_001,other_domain,fact_lookup,document,강체의 평면 운동학,4063;4065,4063;4065;4064;4067;4071;4068;4069;4062;4060;4066,172.3,1.000,1.000,1.000,1,
|
||||||
|
single,misc_002,other_domain,semantic_search,document,질점의 운동역학,4060;4061;4062,4062;4060;4070;4064;4068;4067;4065;4058;4071;4066,261.8,0.667,1.000,0.765,1,
|
||||||
|
single,fail_001,failure_expected,semantic_search,document,Rust async runtime tokio scheduler 내부 구조,,4815;4069;4546;4062;4547;3801;3787;3812;4542;3770,118.6,0.000,0.000,0.000,1,
|
||||||
|
single,fail_002,failure_expected,semantic_search,document,양자컴퓨터 큐비트 디코히어런스,,4058;4057;4067;3800;4065;4068;3817;4063;4064;3915,76.9,0.000,0.000,0.000,1,
|
||||||
|
single,fail_003,failure_expected,semantic_search,news,재즈 보컬리스트 빌리 홀리데이,,4634;4100;4815;4116;4281;4697;4205;4077;4235;4758,73.6,0.000,0.000,0.000,1,
|
||||||
|
24
reports/phase2_final_candidate.csv
Normal file
24
reports/phase2_final_candidate.csv
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
label,id,category,intent,domain_hint,query,relevant_ids,returned_ids_top10,latency_ms,recall_at_10,mrr_at_10,ndcg_at_10,top3_hit,error
|
||||||
|
single,kw_001,exact_keyword,fact_lookup,document,산업안전보건법 제6장,3856;3868;3879,3856;3851;3862;3853;3861;3868;3879;3873;3876;3871,67.9,1.000,1.000,0.793,1,
|
||||||
|
single,kw_002,exact_keyword,fact_lookup,document,중대재해 처벌 등에 관한 법률 제2장 중대산업재해,3917;3921,3921;3917;3919;3923;3916;3874;3918;3854;3922;3920,110.0,1.000,1.000,1.000,1,
|
||||||
|
single,kw_003,exact_keyword,fact_lookup,document,화학물질관리법 유해화학물질 영업자,3981,3981;3985;3980;3984;3993;3857;3978;3986;3983;3957,119.3,1.000,1.000,1.000,1,
|
||||||
|
single,kw_004,exact_keyword,fact_lookup,document,근로기준법 안전과 보건,4041,4041;3852;3851;3877;3905;3903;3858;3881;3781;3912,108.7,1.000,1.000,1.000,1,
|
||||||
|
single,kw_005,exact_keyword,fact_lookup,document,산업안전보건기준에 관한 규칙 보호구,3888,3888;3912;3911;3905;3909;3889;3910;3897;3890;3896,125.5,1.000,1.000,1.000,1,
|
||||||
|
single,nl_001,natural_language_ko,semantic_search,document,기계로 인한 산업재해 관련 법령,3856;3868;3879;3854,3878;3897;3863;3868;3856;3879;3895;3867;3851;3854,83.9,1.000,0.250,0.571,0,
|
||||||
|
single,nl_002,natural_language_ko,semantic_search,document,사업주가 도급을 줄 때 산업재해를 예방하기 위해 해야 할 일,3855;3867;3878,3855;3917;3854;3867;3878;3863;3851;3908;3903;3895,118.0,1.000,1.000,0.853,1,
|
||||||
|
single,nl_003,natural_language_ko,semantic_search,document,유해화학물질을 다루는 회사가 지켜야 할 안전 의무,3980;3981;3982,3980;3903;3904;3905;3981;3985;3896;3917;3857;3909,82.8,0.667,1.000,0.651,1,
|
||||||
|
single,nl_004,natural_language_ko,semantic_search,document,중대재해가 발생했을 때 경영책임자가 처벌받는 기준,3916;3917;3920;3921,3917;3918;3916;3923;3919;3921;3854;3872;3877;3922,72.1,0.750,1.000,0.725,1,
|
||||||
|
single,nl_005,natural_language_ko,semantic_search,document,안전보건교육은 누가 받아야 하고 어떤 내용을 다루는가,3853;3865,3853;4025;3876;3879;3859;3865;3781;3815;3818;3787,80.2,1.000,1.000,0.832,1,
|
||||||
|
single,cl_001,crosslingual_ko_en,semantic_search,document,기계 안전 가드 설계 원리,3770;3856,3770;4540;3817;4541;3774;3816;3787;3758;3793;3773,108.3,0.500,1.000,0.613,1,
|
||||||
|
single,cl_002,crosslingual_ko_en,semantic_search,document,산업 안전 입문서,3755;3775;3776;3777,3760;3755;3774;3764;3758;3775;3779;3802;3814;3817,79.5,0.500,0.500,0.385,1,
|
||||||
|
single,cl_003,crosslingual_ko_en,semantic_search,document,전기 안전 위험,3772;3790,3897;3772;3771;4018;3773;3790;3819;4020;3807;3755,103.7,1.000,0.500,0.605,1,
|
||||||
|
single,news_001,news_ko,semantic_search,news,이란과 미국의 군사 충돌,4303;4304;4307;4316;4322;4323;4327;4335,4317;4321;4771;4743;4307;4452;4761;4678;4418;4331,1445.8,0.125,0.200,0.098,1,
|
||||||
|
single,news_002,news_ko,semantic_search,news,호르무즈 해협 봉쇄,4316;4320;4322;4327,4327;4346;4349;4762;4767;4759;4322;4320;4340;4304,185.3,0.750,1.000,0.644,0,
|
||||||
|
single,news_003,news_en,semantic_search,news,Trump Iran ultimatum,4258;4260;4262,4776;4515;4519;4658;4644;4763;4333;4762;4679;4321,76.3,0.000,0.000,0.000,1,
|
||||||
|
single,news_004,news_fr,semantic_search,news,guerre en Iran,4199;4202;4210;4361;4363;4507;4519;4521,4678;4507;4199;4688;4776;4363;4519;4668;4670;4672,157.9,0.500,0.500,0.460,1,
|
||||||
|
single,news_005,news_crosslingual,semantic_search,news,이란 미국 전쟁 글로벌 반응,4202;4258;4262;4536;4303;4304;4316,4262;4457;4765;4324;4345;4329;4258;4452;4443;4761,186.7,0.286,1.000,0.367,1,
|
||||||
|
single,misc_001,other_domain,fact_lookup,document,강체의 평면 운동학,4063;4065,4063;4065;4064;4067;4071;4068;4069;4062;4060;4066,171.6,1.000,1.000,1.000,1,
|
||||||
|
single,misc_002,other_domain,semantic_search,document,질점의 운동역학,4060;4061;4062,4062;4060;4070;4064;4068;4067;4065;4058;4071;4066,263.6,0.667,1.000,0.765,1,
|
||||||
|
single,fail_001,failure_expected,semantic_search,document,Rust async runtime tokio scheduler 내부 구조,,4815;4069;4546;4062;4547;3801;3787;3812;4542;3770,121.9,0.000,0.000,0.000,1,
|
||||||
|
single,fail_002,failure_expected,semantic_search,document,양자컴퓨터 큐비트 디코히어런스,,4058;4057;4067;3800;4065;4068;3817;4063;4064;3915,75.2,0.000,0.000,0.000,1,
|
||||||
|
single,fail_003,failure_expected,semantic_search,news,재즈 보컬리스트 빌리 홀리데이,,4634;4100;4815;4116;4281;4697;4205;4077;4235;4289,73.9,0.000,0.000,0.000,1,
|
||||||
|
@@ -134,6 +134,7 @@ async def call_search(
|
|||||||
limit: int = 20,
|
limit: int = 20,
|
||||||
fusion: str | None = None,
|
fusion: str | None = None,
|
||||||
rerank: str | None = None,
|
rerank: str | None = None,
|
||||||
|
analyze: str | None = None,
|
||||||
) -> tuple[list[int], float]:
|
) -> tuple[list[int], float]:
|
||||||
"""검색 API 호출 → (doc_ids, latency_ms)."""
|
"""검색 API 호출 → (doc_ids, latency_ms)."""
|
||||||
url = f"{base_url.rstrip('/')}/api/search/"
|
url = f"{base_url.rstrip('/')}/api/search/"
|
||||||
@@ -143,6 +144,8 @@ async def call_search(
|
|||||||
params["fusion"] = fusion
|
params["fusion"] = fusion
|
||||||
if rerank is not None:
|
if rerank is not None:
|
||||||
params["rerank"] = rerank
|
params["rerank"] = rerank
|
||||||
|
if analyze is not None:
|
||||||
|
params["analyze"] = analyze
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@@ -169,6 +172,7 @@ async def evaluate(
|
|||||||
mode: str = "hybrid",
|
mode: str = "hybrid",
|
||||||
fusion: str | None = None,
|
fusion: str | None = None,
|
||||||
rerank: str | None = None,
|
rerank: str | None = None,
|
||||||
|
analyze: str | None = None,
|
||||||
) -> list[QueryResult]:
|
) -> list[QueryResult]:
|
||||||
"""전체 쿼리셋 평가."""
|
"""전체 쿼리셋 평가."""
|
||||||
results: list[QueryResult] = []
|
results: list[QueryResult] = []
|
||||||
@@ -177,7 +181,7 @@ async def evaluate(
|
|||||||
for q in queries:
|
for q in queries:
|
||||||
try:
|
try:
|
||||||
returned_ids, latency_ms = await call_search(
|
returned_ids, latency_ms = await call_search(
|
||||||
client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank
|
client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank, analyze=analyze
|
||||||
)
|
)
|
||||||
results.append(
|
results.append(
|
||||||
QueryResult(
|
QueryResult(
|
||||||
@@ -415,6 +419,13 @@ def main() -> int:
|
|||||||
choices=["true", "false"],
|
choices=["true", "false"],
|
||||||
help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)",
|
help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--analyze",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
choices=["true", "false"],
|
||||||
|
help="QueryAnalyzer 활성화 (Phase 2.1+, cache hit 시 multilingual 적용)",
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--token",
|
"--token",
|
||||||
type=str,
|
type=str,
|
||||||
@@ -454,21 +465,21 @@ def main() -> int:
|
|||||||
if args.base_url:
|
if args.base_url:
|
||||||
print(f"\n>>> evaluating: {args.base_url}")
|
print(f"\n>>> evaluating: {args.base_url}")
|
||||||
results = asyncio.run(
|
results = asyncio.run(
|
||||||
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
|
evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze)
|
||||||
)
|
)
|
||||||
print_summary("single", results)
|
print_summary("single", results)
|
||||||
all_results.extend(results)
|
all_results.extend(results)
|
||||||
else:
|
else:
|
||||||
print(f"\n>>> baseline: {args.baseline_url}")
|
print(f"\n>>> baseline: {args.baseline_url}")
|
||||||
baseline_results = asyncio.run(
|
baseline_results = asyncio.run(
|
||||||
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank)
|
evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze)
|
||||||
)
|
)
|
||||||
baseline_summary = print_summary("baseline", baseline_results)
|
baseline_summary = print_summary("baseline", baseline_results)
|
||||||
|
|
||||||
print(f"\n>>> candidate: {args.candidate_url}")
|
print(f"\n>>> candidate: {args.candidate_url}")
|
||||||
candidate_results = asyncio.run(
|
candidate_results = asyncio.run(
|
||||||
evaluate(
|
evaluate(
|
||||||
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank
|
queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
candidate_summary = print_summary("candidate", candidate_results)
|
candidate_summary = print_summary("candidate", candidate_results)
|
||||||
|
|||||||
Reference in New Issue
Block a user