데이터 흐름 원칙: fusion=doc 기준 / reranker=chunk 기준 — 절대 섞지 말 것.
신규/수정:
- ai/client.py: rerank() 메서드 추가 (TEI POST /rerank API)
- services/search/rerank_service.py:
- rerank_chunks() — asyncio.Semaphore(2) + 5s soft timeout + RRF fallback
- _make_snippet/_extract_window — title + query 중심 200~400 토큰
(keyword 매치 없으면 첫 800자 fallback)
- apply_diversity() — max_per_doc=2, top score>=0.90 unlimited
- warmup_reranker() — 10회 retry + 3초 간격 (TEI 모델 로딩 대기)
- MAX_RERANK_INPUT=200, MAX_CHUNKS_PER_DOC=2 hard cap
- services/search_telemetry.py: compute_confidence_reranked() — sigmoid score 임계값
- api/search.py:
- ?rerank=true|false 파라미터 (기본 true, hybrid 모드만)
- 흐름: fused_docs(limit*5) → chunks_by_doc 회수 → rerank_chunks → apply_diversity
- text-only 매치 doc은 doc 자체를 chunk처럼 wrap (fallback)
- rerank 활성 시 confidence는 reranker score 기반
- tests/search_eval/run_eval.py: --rerank true|false 플래그
GPU 적용 보류:
- TEI 컨테이너 추가 (docker-compose.yml) — 별도 작업
- config.yaml rerank.endpoint 갱신 — GPU 직접 (commit 없음)
- 재인덱싱 완료 후 build + warmup + 평가셋 측정
157 lines
5.8 KiB
Python
157 lines
5.8 KiB
Python
"""AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5."""
|
|
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from core.config import settings
|
|
|
|
|
|
def strip_thinking(text: str) -> str:
|
|
"""Qwen3.5의 <think>...</think> 블록 및 Thinking Process 텍스트 제거"""
|
|
# <think> 태그 제거
|
|
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
|
|
# "Thinking Process:" 등 사고 과정 텍스트 제거 (첫 번째 { 이전의 모든 텍스트)
|
|
json_start = text.find("{")
|
|
if json_start > 0:
|
|
text = text[json_start:]
|
|
return text.strip()
|
|
|
|
|
|
def parse_json_response(raw: str) -> dict | None:
|
|
"""AI 응답에서 JSON 객체 추출 (think 태그, 코드블록 등 제거)"""
|
|
cleaned = strip_thinking(raw)
|
|
# 코드블록 내부 JSON 추출
|
|
code_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", cleaned, re.DOTALL)
|
|
if code_match:
|
|
cleaned = code_match.group(1)
|
|
# 마지막 유효 JSON 객체 찾기
|
|
matches = list(re.finditer(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", cleaned, re.DOTALL))
|
|
for m in reversed(matches):
|
|
try:
|
|
return json.loads(m.group())
|
|
except json.JSONDecodeError:
|
|
continue
|
|
# 최후 시도: 전체 텍스트를 JSON으로
|
|
try:
|
|
return json.loads(cleaned)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
# 프롬프트 로딩
|
|
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
|
|
|
|
|
|
def _load_prompt(name: str) -> str:
|
|
return (PROMPTS_DIR / name).read_text(encoding="utf-8")
|
|
|
|
|
|
CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else ""
|
|
|
|
|
|
class AIClient:
|
|
"""AI Gateway를 통한 통합 클라이언트. 기본값은 항상 Qwen3.5."""
|
|
|
|
def __init__(self):
|
|
self.ai = settings.ai
|
|
self._http = httpx.AsyncClient(timeout=120)
|
|
|
|
async def classify(self, text: str) -> dict:
|
|
"""문서 분류 — 항상 primary(Qwen3.5) 사용"""
|
|
prompt = CLASSIFY_PROMPT.replace("{document_text}", text)
|
|
response = await self._call_chat(self.ai.primary, prompt)
|
|
return response
|
|
|
|
async def summarize(self, text: str, force_premium: bool = False) -> str:
|
|
"""문서 요약 — 기본 Qwen3.5, 장문이거나 명시적 요청 시만 Claude"""
|
|
model = self.ai.primary
|
|
if force_premium or len(text) > 15000:
|
|
model = self.ai.premium
|
|
return await self._call_chat(model, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
|
|
|
|
async def embed(self, text: str) -> list[float]:
|
|
"""벡터 임베딩 — GPU 서버 전용"""
|
|
response = await self._http.post(
|
|
self.ai.embedding.endpoint,
|
|
json={"model": self.ai.embedding.model, "prompt": text},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()["embedding"]
|
|
|
|
async def ocr(self, image_bytes: bytes) -> str:
|
|
"""이미지 OCR — GPU 서버 전용"""
|
|
# TODO: Qwen2.5-VL-7B 비전 모델 호출 구현
|
|
raise NotImplementedError("OCR는 Phase 1에서 구현")
|
|
|
|
async def rerank(self, query: str, texts: list[str]) -> list[dict]:
|
|
"""TEI bge-reranker-v2-m3 호출 (Phase 1.3).
|
|
|
|
TEI POST /rerank API:
|
|
request: {"query": str, "texts": [str, ...]}
|
|
response: [{"index": int, "score": float}, ...] (정렬됨)
|
|
|
|
timeout은 self.ai.rerank.timeout (config.yaml).
|
|
호출자(rerank_service)가 asyncio.Semaphore + try/except로 감쌈.
|
|
"""
|
|
timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0
|
|
response = await self._http.post(
|
|
self.ai.rerank.endpoint,
|
|
json={"query": query, "texts": texts},
|
|
timeout=timeout,
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
async def _call_chat(self, model_config, prompt: str) -> str:
|
|
"""OpenAI 호환 API 호출 + 자동 폴백"""
|
|
try:
|
|
return await self._request(model_config, prompt)
|
|
except (httpx.TimeoutException, httpx.ConnectError):
|
|
if model_config == self.ai.primary:
|
|
return await self._request(self.ai.fallback, prompt)
|
|
raise
|
|
|
|
async def _request(self, model_config, prompt: str) -> str:
|
|
"""단일 모델 API 호출 (OpenAI 호환 + Anthropic Messages API)"""
|
|
is_anthropic = "anthropic.com" in model_config.endpoint
|
|
|
|
if is_anthropic:
|
|
import os
|
|
headers = {
|
|
"x-api-key": os.getenv("CLAUDE_API_KEY", ""),
|
|
"anthropic-version": "2023-06-01",
|
|
"content-type": "application/json",
|
|
}
|
|
response = await self._http.post(
|
|
model_config.endpoint,
|
|
headers=headers,
|
|
json={
|
|
"model": model_config.model,
|
|
"max_tokens": model_config.max_tokens,
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
},
|
|
timeout=model_config.timeout,
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["content"][0]["text"]
|
|
else:
|
|
response = await self._http.post(
|
|
model_config.endpoint,
|
|
json={
|
|
"model": model_config.model,
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
"max_tokens": model_config.max_tokens,
|
|
"chat_template_kwargs": {"enable_thinking": False},
|
|
},
|
|
timeout=model_config.timeout,
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["choices"][0]["message"]["content"]
|
|
|
|
async def close(self):
|
|
await self._http.aclose()
|