feat: Hybrid 부하 판단 — health latency baseline + 조건부 inference

- model_adapter: measure_inference_latency() (max_tokens=1, 최소 부하)
- backend_registry:
  - health latency baseline 학습 (초기 5회 max, 이후 EMA)
  - get_load_status(): inference 우선, health/queue 보조
  - cache 30s + cooldown 10s + asyncio.Lock으로 자기증폭 루프 방지
  - 조건: health > baseline*3 또는 사용자 명시 요청 시에만 ping
- worker:
  - "system_status" 액션 — 사용자 상태 조회 시 force_measure
  - _build_system_status() 응답 빌더 (health/baseline/ping/queue)
  - route busy 안내를 get_load_status 기반으로 변경

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-07 07:46:57 +09:00
parent fcd29a82c3
commit 875a4f80d2
3 changed files with 158 additions and 25 deletions

View File

@@ -68,6 +68,11 @@ class BackendRegistry:
self.reasoner: ModelAdapter | None = None # Gemma4: 추론
self._health: dict[str, bool] = {"classifier": False, "reasoner": False}
self._latency: dict[str, float] = {"classifier": 0.0, "reasoner": 0.0}
self._health_baseline: dict[str, float] = {}
self._sample_count: dict[str, int] = {}
self._inference_latency: dict[str, float | None] = {}
self._inference_latency_at: dict[str, float] = {}
self._measure_lock = asyncio.Lock()
self._health_task: asyncio.Task | None = None
def init_from_settings(self, settings) -> None:
@@ -111,10 +116,95 @@ class BackendRegistry:
prev = self._health[role]
self._health[role] = healthy
self._latency[role] = elapsed
if healthy:
self._update_baseline(role, elapsed)
if prev != healthy:
status = "UP" if healthy else "DOWN"
logger.warning("%s (%s) → %s (%.0fms)", adapter.name, role, status, elapsed)
def _update_baseline(self, role: str, latency: float) -> None:
"""baseline EMA 업데이트. 초기 5회는 max로 spike 보호."""
sample_count = self._sample_count.get(role, 0) + 1
self._sample_count[role] = sample_count
current = self._health_baseline.get(role, latency)
if sample_count < 5:
new_baseline = max(current, latency)
else:
new_baseline = current * 0.9 + latency * 0.1
# 절대 최솟값 (10ms 이하면 의미 없음)
self._health_baseline[role] = max(new_baseline, 10)
async def _measure_inference(self, role: str) -> float | None:
"""inference latency 측정 (lock으로 동시 1개 제한)."""
adapter = self.classifier if role == "classifier" else self.reasoner
if not adapter:
return None
async with self._measure_lock:
latency = await adapter.measure_inference_latency()
if latency >= 0:
self._inference_latency[role] = latency
self._inference_latency_at[role] = time.time()
return latency
return None
async def get_load_status(self, role: str, force_measure: bool = False) -> dict:
    """Assess the current load of *role*.

    Inference (ping) latency is the primary signal — cached for 30s and
    re-measured at most once per 10s cooldown. The health-check latency
    relative to its learned baseline is the fallback signal, and local
    queue activity can bump the verdict up one level.

    Returns:
        dict with keys: load, health_ms, health_baseline_ms,
        inference_ms, measured.
    """
    now = time.time()
    health_ms = self._latency.get(role, 0)
    baseline_ms = self._health_baseline.get(role, 50)
    age = now - self._inference_latency_at.get(role, 0)
    cache_fresh = age < 30
    in_cooldown = age < 10

    # Measure only outside the cooldown window, and then only when the
    # caller explicitly asked for it, or the cache is stale AND the
    # health latency looks elevated (relative threshold: 3x baseline).
    want_ping = (not in_cooldown) and (
        force_measure or (not cache_fresh and health_ms > baseline_ms * 3)
    )
    if want_ping:
        ping_ms = await self._measure_inference(role)
    else:
        ping_ms = self._inference_latency.get(role) if cache_fresh else None

    # Inference latency wins when available; otherwise fall back to the
    # health-latency-to-baseline ratio.
    if ping_ms is not None and ping_ms > 4000:
        load = "매우 바쁨" if ping_ms > 8000 else "바쁨"
    elif health_ms > baseline_ms * 5:
        load = "바쁨"
    elif health_ms > baseline_ms * 2:
        load = "보통"
    else:
        load = "여유"

    # Local queue activity escalates the verdict one step.
    try:
        from services import job_queue as jq_module
        queue = jq_module.job_queue
        if queue and queue.stats.get("active", 0) > 0:
            load = {"보통": "바쁨", "여유": "보통"}.get(load, load)
    except Exception:
        pass

    return {
        "load": load,
        "health_ms": health_ms,
        "health_baseline_ms": round(baseline_ms, 1),
        "inference_ms": ping_ms,
        "measured": ping_ms is not None,
    }
def is_healthy(self, role: str) -> bool:
    """Whether the most recent health probe for *role* succeeded (unknown roles count as unhealthy)."""
    health_map = self._health
    if role in health_map:
        return health_map[role]
    return False

View File

@@ -115,3 +115,19 @@ class ModelAdapter:
return resp.status_code < 500
except Exception:
return False
async def measure_inference_latency(self) -> float:
    """Measure real inference latency with a minimal "ping" completion.

    Temporarily forces max_tokens=1 so the probe adds as little load as
    possible, then restores the original value.

    NOTE(review): mutating self.max_tokens is not safe if another request
    runs concurrently on this adapter — assumes callers serialize access
    (the registry's measure lock); confirm.

    Returns:
        Elapsed time in milliseconds, or -1.0 on failure.
    """
    from time import monotonic
    saved_max_tokens = self.max_tokens
    self.max_tokens = 1
    try:
        started = monotonic()
        await self.complete_chat("ping")
    except Exception:
        logger.warning("inference latency measurement failed for %s", self.name)
        return -1.0
    else:
        return (monotonic() - started) * 1000
    finally:
        self.max_tokens = saved_max_tokens

View File

@@ -87,6 +87,39 @@ def _parse_classification(raw: str) -> dict:
return {"action": "direct", "response": cleaned, "prompt": ""}
async def _build_system_status(force_measure: bool = True) -> str:
    """Build the user-facing system status message.

    For each backend role, reports connectivity, the load verdict from
    backend_registry.get_load_status(), health latency vs. its learned
    baseline, and the inference ping result when one was measured.
    Ends with the local job queue counters.

    Args:
        force_measure: when True, get_load_status() is asked to take one
            fresh inference-latency measurement per role (still subject
            to the registry's cooldown).

    Returns:
        Multi-line Korean status text joined with newlines.
    """
    # Deferred import — presumably avoids a circular import with the
    # services package; verify against module layout.
    from services import job_queue as jq_module
    health = backend_registry.health_summary()
    lines = ["🤖 시스템 상태:", ""]
    for role in ("classifier", "reasoner"):
        info = health.get(role)
        # Skip roles the registry has no summary entry for.
        if not info:
            continue
        load = await backend_registry.get_load_status(role, force_measure=force_measure)
        connected = "정상" if info["healthy"] else "연결 안 됨"
        load_str = load["load"]
        baseline = load["health_baseline_ms"]
        # Guard against a zero baseline; 1.0x is the neutral ratio.
        ratio = (load["health_ms"] / baseline) if baseline > 0 else 1.0
        # NOTE(review): connected and load_str are concatenated with no
        # separator — confirm the intended display formatting.
        line1 = f"{info['name']}: {connected}{load_str}"
        line2 = f" health {load['health_ms']:.0f}ms (baseline {baseline:.0f}ms, {ratio:.1f}배)"
        if load["measured"]:
            line3 = f" ping {load['inference_ms']:.0f}ms"
        else:
            line3 = " ping: 측정 안 함"
        lines.append(line1)
        lines.append(line2)
        lines.append(line3)
        lines.append("")  # blank separator line between roles
    # Fall back to zeroed counters when the queue is not initialized.
    queue = jq_module.job_queue.stats if jq_module.job_queue else {"pending": 0, "active": 0}
    lines.append(f"내 작업 큐: 처리 중 {queue.get('active', 0)}건, 대기 {queue.get('pending', 0)}")
    return "\n".join(lines)
async def _send_callback(job: Job, text: str) -> None:
"""Synology callback이면 전송 + response_sent 플래그."""
if job.callback == "synology":
@@ -149,26 +182,10 @@ def _pre_route(message: str) -> dict | None:
if query:
return {"action": "tools", "tool": "document", "operation": "search", "params": {"query": query}}
# 시스템 상태 질문
# 시스템 상태 질문 — 마커만 반환 (worker에서 비동기 조회)
if any(k in msg for k in ["추론 모델", "gemma", "젬마", "서버 상태", "시스템 상태"]) or \
(any(k in msg for k in ["상태", "젬마", "gemma"]) and any(k in msg for k in ["돌아", "일하", "작동", "상태", "건강", "살아", "대기", "", "밀려", "바빠", "느려", ""])):
health = backend_registry.health_summary()
from services import job_queue as jq_module
queue = jq_module.job_queue.stats if jq_module.job_queue else {"pending": 0, "active": 0}
lines = []
for role, info in health.items():
status = "✅ 정상" if info["healthy"] else "❌ 연결 안 됨"
line = f"{info['name']} ({role}): {status} ({info['latency_ms']:.0f}ms)"
lines.append(line)
# 큐 정보
active = queue.get("active", 0)
pending = queue.get("pending", 0)
if active > 0 or pending > 0:
lines.append(f"\n작업 현황: 처리 중 {active}건, 대기 {pending}")
else:
lines.append(f"\n작업 현황: 유휴 상태 (대기 없음)")
status_text = "\n".join(lines)
return {"action": "direct", "response": f"현재 시스템 상태야:\n{status_text}", "prompt": ""}
return {"action": "system_status", "response": "", "prompt": ""}
# pending_draft 확인 응답
if msg in ("확인", "", "yes", "ㅇㅇ", "", "", "좋아", "ok"):
@@ -249,7 +266,15 @@ async def run(job: Job) -> None:
if job.status == JobStatus.cancelled:
return
if action == "tools":
if action == "system_status":
# === SYSTEM STATUS: 부하 상태 조회 (사용자 명시 요청 → ping 측정) ===
await state_stream.push(job.id, "processing", {"message": "시스템 상태 확인 중..."})
status_text = await _build_system_status(force_measure=True)
collected.append(status_text)
await state_stream.push(job.id, "result", {"content": status_text})
conversation_store.add(user_id, "assistant", status_text)
elif action == "tools":
# === TOOLS: 도구 실행 ===
tool_name = classification.get("tool", "")
operation = classification.get("operation", "")
@@ -335,12 +360,14 @@ async def run(job: Job) -> None:
rewritten_message = (route_prompt or job.message)[:MAX_PROMPT_LENGTH]
job.rewritten_message = rewritten_message
# Gemma busy 안내 (큐에 다른 작업이 있으면)
from services import job_queue as jq_module
if jq_module.job_queue:
q = jq_module.job_queue.stats
if q.get("active", 0) > 1 and job.callback == "synology":
await send_to_synology(f"⏳ 추론 모델이 현재 다른 작업도 처리 중이라 조금 걸릴 수 있어... (대기 {q.get('pending', 0)}건)", raw=True)
# Gemma busy 안내 (Hybrid 부하 판단 — inference 강제 측정 안 함)
if job.callback == "synology":
load = await backend_registry.get_load_status("reasoner", force_measure=False)
if load["load"] in ("바쁨", "매우 바쁨"):
await send_to_synology(
f"⏳ 추론 모델이 현재 바쁜 상태야({load['load']}). 좀 걸릴 수 있어...",
raw=True
)
if job.callback != "synology":
await state_stream.push(job.id, "rewrite", {"content": rewritten_message})