7dbf37a63f
- _pre_route: 산업안전/ASME/법령 등 도메인 키워드로 "문서 찾아줘" 없이도 자동으로 document tool 라우팅 (도메인 진입 시 ask +1 보너스) - EXAONE classifier: document.ask/search_full 도구 + 예시 추가 - worker: document tool 호출 전 "서고를 확인하는 중입니다" 안내 문구 전송 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
284 lines
12 KiB
Python
284 lines
12 KiB
Python
"""BackendRegistry — 모델 어댑터 관리 + 헬스체크 루프."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
|
|
from services.model_adapter import ModelAdapter
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
CLASSIFIER_PROMPT = """\
|
|
너는 AI 라우터다. 사용��� 메시지를 분석하여 JSON으로 응답하라.
|
|
반드시 아래 4가�� action 중 하나를 선택하라:
|
|
|
|
1. "direct" — 인사, 잡담, 간단한 질문, 자기소개 요청 등. 네가 직접 답변한다.
|
|
2. "route" — 복잡한 질문, 분석, 설명, 코딩 등. 추론 모델에게 넘긴다. prompt 필드에 추론 모델용 프롬프트를 작성하라.
|
|
3. "clarify" — 질문이 모호하거나 정보가 부족할 때. 사용자에게 추가 질문���다.
|
|
4. "tools" — 캘린더, 이메일, 문서 조회/조작이 필요할 때. tool/operation/params를 지정하라.
|
|
|
|
사용 가능한 도구:
|
|
- calendar.today() — 오늘 일정 조회
|
|
- calendar.search(date_from, date_to) — 기간 일정 검색. 날짜는 YYYY-MM-DD
|
|
- calendar.create_draft(title, date, time, end_time, description) — 일정 생성 초안. 시간은 HH:MM. end_time 없으면 1시간. 실제 생성 아님
|
|
- calendar.create_confirmed() — pending_draft가 있을 때, 사용자가 "확인/예/yes" 한 경우만 사용
|
|
- email.search(query, days) — 메일 검색. days 기본 7
|
|
- email.read(uid) — 메일 본문 조회
|
|
- document.ask(query) — 문서 기반 AI 답변. 법령/규정/기술 내용을 물어볼 때 사용. 시간 걸림
|
|
- document.search_full(query) — 문서 고급 검색 (리랭킹 포함). 문서 목록이 필요할 때 사용
|
|
- document.search(query) — 문서 간단 검색
|
|
- document.read(doc_id) — 문서 조회
|
|
- infra.status(host) — 서버 Docker 컨테이너 상태. host: gpu (기본), nas-company
|
|
- infra.health(service) — 서비스 헬스체크. service 생략 시 전체 체크. 개별: document-server, mlx, ollama-gpu
|
|
- infra.disk(host) — 디스크 사용률. host 생략 시 gpu+macmini 전체
|
|
- infra.network() — Tailscale 네트워크 연결 상태
|
|
- infra.models(host) — 모델 목록. host: gpu (Ollama), mlx (맥미니 MLX)
|
|
|
|
메일 전송은 불가능하다. 메일 보내달라는 요청은 거부하라.
|
|
|
|
JSON 형식:
|
|
- direct/route/clarify: {"action": "...", "response": "...", "prompt": "..."}
|
|
- tools: {"action": "tools", "tool": "calendar|email|document", "operation": "...", "params": {...}}
|
|
|
|
중요 규칙:
|
|
- 반드시 순수 JSON만 출력. 백틱, 코드블록, 설명 텍스트 금지
|
|
- 날짜는 YYYY-MM-DD, 시간은 HH:MM
|
|
- pending_draft가 있다고 [대화 이력]에 표시되어 있을 때만 create_confirmed 사용
|
|
- [현재 시간]의 날짜를 참고하여 "오늘", "내일", "이번주" 등을 YYYY-MM-DD로 변환하라
|
|
|
|
판단 예시:
|
|
- "오늘 일정" → tools: calendar.today()
|
|
- "이번주 일정" → tools: calendar.search(이번주 월~일)
|
|
- "내일 3시 회의" → tools: calendar.create_draft(...)
|
|
- "최근 메일" → tools: email.search("", 7)
|
|
- "문서 찾아줘" → tools: document.search_full(...)
|
|
- "산업안전보건법 제6장 내용이 뭐야" → tools: document.ask("산업안전보건법 제6장")
|
|
- "ASME Section VIII 관련 자료" → tools: document.search_full("ASME Section VIII")
|
|
- "위험성평가 절차 알려줘" → tools: document.ask("위험성평가 절차")
|
|
- "GPU 서버 상태" → tools: infra.status("gpu")
|
|
- "서비스 살아있어?" → tools: infra.health()
|
|
- "디스크 용량" → tools: infra.disk()
|
|
- "네트워크 상태" → tools: infra.network()
|
|
- "모델 뭐 있어?" → tools: infra.models("gpu")
|
|
- "안녕" → direct
|
|
- "양자역학 설명해줘" → route
|
|
|
|
너의 이름은 '이드'. 상냥하고 친근하게 대화한다.
|
|
대화 이력이 있으면 맥락을 고려하라.\
|
|
"""
|
|
|
|
REASONER_PROMPT = (
|
|
"너는 '이드'라는 이름의 상냥하고 친근한 AI 어시스턴트야. "
|
|
"간결하고 자연스럽게 대화해. "
|
|
"질문에는 핵심만 명확하게 답해. "
|
|
"불필요한 구조화(번호 매기기, 헤더, 마크다운)는 피하고, 대화하듯 편하게 답변해."
|
|
)
|
|
|
|
|
|
class BackendRegistry:
|
|
def __init__(self) -> None:
|
|
self.classifier: ModelAdapter | None = None # EXAONE: 분류 + 직접응답
|
|
self.reasoner: ModelAdapter | None = None # Gemma4: 추론
|
|
self._health: dict[str, bool] = {"classifier": False, "reasoner": False}
|
|
self._latency: dict[str, float] = {"classifier": 0.0, "reasoner": 0.0}
|
|
self._health_baseline: dict[str, float] = {}
|
|
self._sample_count: dict[str, int] = {}
|
|
self._inference_latency: dict[str, float | None] = {}
|
|
self._inference_latency_at: dict[str, float] = {}
|
|
self._status_cache: dict[str, dict | None] = {}
|
|
self._status_cached_at: dict[str, float] = {}
|
|
self._measure_lock = asyncio.Lock()
|
|
self._health_task: asyncio.Task | None = None
|
|
|
|
def init_from_settings(self, settings) -> None:
|
|
self.classifier = ModelAdapter(
|
|
name="EXAONE",
|
|
base_url=settings.exaone_base_url,
|
|
model=settings.exaone_model,
|
|
system_prompt=CLASSIFIER_PROMPT,
|
|
temperature=0.3, # 분류는 낮은 temperature
|
|
timeout=settings.exaone_timeout,
|
|
)
|
|
self.reasoner = ModelAdapter(
|
|
name="Gemma4",
|
|
base_url=settings.reasoning_base_url,
|
|
model=settings.reasoning_model,
|
|
system_prompt=REASONER_PROMPT,
|
|
temperature=settings.reasoning_temperature,
|
|
timeout=settings.reasoning_timeout,
|
|
max_tokens=16000,
|
|
)
|
|
|
|
def start_health_loop(self, interval: float = 30.0) -> None:
|
|
self._health_task = asyncio.create_task(self._health_loop(interval))
|
|
|
|
def stop_health_loop(self) -> None:
|
|
if self._health_task and not self._health_task.done():
|
|
self._health_task.cancel()
|
|
|
|
async def _health_loop(self, interval: float) -> None:
|
|
while True:
|
|
await self._check_all()
|
|
await asyncio.sleep(interval)
|
|
|
|
async def _check_all(self) -> None:
|
|
for role, adapter in [("classifier", self.classifier), ("reasoner", self.reasoner)]:
|
|
if not adapter:
|
|
continue
|
|
start = time.monotonic()
|
|
healthy = await adapter.health_check()
|
|
elapsed = round((time.monotonic() - start) * 1000, 1)
|
|
prev = self._health[role]
|
|
self._health[role] = healthy
|
|
self._latency[role] = elapsed
|
|
if healthy:
|
|
self._update_baseline(role, elapsed)
|
|
# /status 같이 캐시 (지원하는 백엔드만)
|
|
status_data = await adapter.status_check()
|
|
self._status_cache[role] = status_data
|
|
self._status_cached_at[role] = time.time()
|
|
if prev != healthy:
|
|
status = "UP" if healthy else "DOWN"
|
|
logger.warning("%s (%s) → %s (%.0fms)", adapter.name, role, status, elapsed)
|
|
|
|
def _update_baseline(self, role: str, latency: float) -> None:
|
|
"""baseline EMA 업데이트. 초기 5회는 max로 spike 보호."""
|
|
sample_count = self._sample_count.get(role, 0) + 1
|
|
self._sample_count[role] = sample_count
|
|
current = self._health_baseline.get(role, latency)
|
|
|
|
if sample_count < 5:
|
|
new_baseline = max(current, latency)
|
|
else:
|
|
new_baseline = current * 0.9 + latency * 0.1
|
|
|
|
# 절대 최솟값 (10ms 이하면 의미 없음)
|
|
self._health_baseline[role] = max(new_baseline, 10)
|
|
|
|
async def _measure_inference(self, role: str) -> float | None:
|
|
"""inference latency 측정 (lock으로 동시 1개 제한)."""
|
|
adapter = self.classifier if role == "classifier" else self.reasoner
|
|
if not adapter:
|
|
return None
|
|
async with self._measure_lock:
|
|
latency = await adapter.measure_inference_latency()
|
|
if latency >= 0:
|
|
self._inference_latency[role] = latency
|
|
self._inference_latency_at[role] = time.time()
|
|
return latency
|
|
return None
|
|
|
|
async def get_load_status(self, role: str, force_measure: bool = False) -> dict:
|
|
"""role의 부하 상태 판단. /status API 우선, 없으면 hybrid (health baseline + 조건부 ping)."""
|
|
health_latency = self._latency.get(role, 0)
|
|
baseline = self._health_baseline.get(role, 50)
|
|
adapter = self.classifier if role == "classifier" else self.reasoner
|
|
|
|
# === 1순위: /status API (GPU 무접촉) ===
|
|
# force_measure 시 fresh 호출, 아니면 캐시 사용
|
|
status_data = None
|
|
if force_measure and adapter:
|
|
status_data = await adapter.status_check()
|
|
if status_data:
|
|
self._status_cache[role] = status_data
|
|
self._status_cached_at[role] = time.time()
|
|
else:
|
|
status_data = self._status_cache.get(role)
|
|
|
|
if status_data is not None:
|
|
active_jobs = status_data.get("active_jobs", 0)
|
|
# 부드러운 임계치
|
|
if active_jobs >= 4:
|
|
load = "매우 바쁨"
|
|
elif active_jobs >= 2:
|
|
load = "바쁨"
|
|
elif active_jobs == 1:
|
|
load = "보통"
|
|
else:
|
|
load = "여유"
|
|
|
|
return {
|
|
"load": load,
|
|
"source": "status_api",
|
|
"active_jobs": active_jobs,
|
|
"total_requests": status_data.get("total_requests", 0),
|
|
"health_ms": health_latency,
|
|
"health_baseline_ms": round(baseline, 1),
|
|
"inference_ms": None,
|
|
"measured": False,
|
|
}
|
|
|
|
# === 2순위: hybrid fallback (status 미지원 백엔드) ===
|
|
cached_inference = self._inference_latency.get(role)
|
|
cached_at = self._inference_latency_at.get(role, 0)
|
|
|
|
now = time.time()
|
|
cache_valid = (now - cached_at) < 30
|
|
cooldown_active = (now - cached_at) < 10
|
|
|
|
should_measure = False
|
|
if force_measure and not cooldown_active:
|
|
should_measure = True
|
|
elif not cache_valid and not cooldown_active:
|
|
if health_latency > baseline * 3:
|
|
should_measure = True
|
|
|
|
if should_measure:
|
|
inference_latency = await self._measure_inference(role)
|
|
else:
|
|
inference_latency = cached_inference if cache_valid else None
|
|
|
|
if inference_latency is not None and inference_latency > 8000:
|
|
load = "매우 바쁨"
|
|
elif inference_latency is not None and inference_latency > 4000:
|
|
load = "바쁨"
|
|
elif health_latency > baseline * 5:
|
|
load = "바쁨"
|
|
elif health_latency > baseline * 2:
|
|
load = "보통"
|
|
else:
|
|
load = "여유"
|
|
|
|
# local queue 보조 판단 (자기 자신 제외 — active > 1)
|
|
try:
|
|
from services import job_queue as jq_module
|
|
if jq_module.job_queue:
|
|
active = jq_module.job_queue.stats.get("active", 0)
|
|
if active > 1 and load == "보통":
|
|
load = "바쁨"
|
|
elif active > 1 and load == "여유":
|
|
load = "보통"
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"load": load,
|
|
"source": "hybrid",
|
|
"active_jobs": None,
|
|
"total_requests": None,
|
|
"health_ms": health_latency,
|
|
"health_baseline_ms": round(baseline, 1),
|
|
"inference_ms": inference_latency,
|
|
"measured": inference_latency is not None,
|
|
}
|
|
|
|
def is_healthy(self, role: str) -> bool:
|
|
return self._health.get(role, False)
|
|
|
|
def health_summary(self) -> dict:
|
|
result = {}
|
|
for role, adapter in [("classifier", self.classifier), ("reasoner", self.reasoner)]:
|
|
if adapter:
|
|
result[role] = {
|
|
"name": adapter.name,
|
|
"model": adapter.model,
|
|
"healthy": self._health[role],
|
|
"latency_ms": self._latency[role],
|
|
}
|
|
return result
|
|
|
|
|
|
backend_registry = BackendRegistry()
|