feat: MLX /status API 통합 — GPU 무접촉 부하 측정

- model_adapter.status_check(): GET /status, 미지원 백엔드는 None
- backend_registry.get_load_status(): /status API 1순위, hybrid fallback
  - active_jobs >=4 매우 바쁨, >=2 바쁨, =1 보통, =0 여유
  - source: status_api | hybrid 표시
- worker._build_system_status(): source별 분기, active_jobs/total_requests 표시
- hybrid 경로의 self-job 제외 (active > 1)
- health loop에서 status도 같이 캐시 (지원 백엔드만)

Mac mini /opt/mlx-proxy.py에 active_jobs 카운터 + /status 엔드포인트 추가됨

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-07 08:06:44 +09:00
parent 3eacbac964
commit cc792eddbb
3 changed files with 80 additions and 16 deletions

View File

@@ -72,6 +72,8 @@ class BackendRegistry:
self._sample_count: dict[str, int] = {}
self._inference_latency: dict[str, float | None] = {}
self._inference_latency_at: dict[str, float] = {}
self._status_cache: dict[str, dict | None] = {}
self._status_cached_at: dict[str, float] = {}
self._measure_lock = asyncio.Lock()
self._health_task: asyncio.Task | None = None
@@ -118,6 +120,10 @@ class BackendRegistry:
self._latency[role] = elapsed
if healthy:
self._update_baseline(role, elapsed)
# /status 같이 캐시 (지원하는 백엔드만)
status_data = await adapter.status_check()
self._status_cache[role] = status_data
self._status_cached_at[role] = time.time()
if prev != healthy:
status = "UP" if healthy else "DOWN"
logger.warning("%s (%s) → %s (%.0fms)", adapter.name, role, status, elapsed)
@@ -150,9 +156,46 @@ class BackendRegistry:
return None
async def get_load_status(self, role: str, force_measure: bool = False) -> dict:
"""role의 부하 상태 판단. inference latency는 캐시 또는 조건부 측정."""
"""role의 부하 상태 판단. /status API 우선, 없으면 hybrid (health baseline + 조건부 ping)."""
health_latency = self._latency.get(role, 0)
baseline = self._health_baseline.get(role, 50)
adapter = self.classifier if role == "classifier" else self.reasoner
# === 1순위: /status API (GPU 무접촉) ===
# force_measure 시 fresh 호출, 아니면 캐시 사용
status_data = None
if force_measure and adapter:
status_data = await adapter.status_check()
if status_data:
self._status_cache[role] = status_data
self._status_cached_at[role] = time.time()
else:
status_data = self._status_cache.get(role)
if status_data is not None:
active_jobs = status_data.get("active_jobs", 0)
# 부드러운 임계치
if active_jobs >= 4:
load = "매우 바쁨"
elif active_jobs >= 2:
load = "바쁨"
elif active_jobs == 1:
load = "보통"
else:
load = "여유"
return {
"load": load,
"source": "status_api",
"active_jobs": active_jobs,
"total_requests": status_data.get("total_requests", 0),
"health_ms": health_latency,
"health_baseline_ms": round(baseline, 1),
"inference_ms": None,
"measured": False,
}
# === 2순위: hybrid fallback (status 미지원 백엔드) ===
cached_inference = self._inference_latency.get(role)
cached_at = self._inference_latency_at.get(role, 0)
@@ -160,7 +203,6 @@ class BackendRegistry:
cache_valid = (now - cached_at) < 30
cooldown_active = (now - cached_at) < 10
# 조건부 측정 (상대값 + 쿨다운)
should_measure = False
if force_measure and not cooldown_active:
should_measure = True
@@ -173,7 +215,6 @@ class BackendRegistry:
else:
inference_latency = cached_inference if cache_valid else None
# 부하 판단 (inference 우선, health는 fallback)
if inference_latency is not None and inference_latency > 8000:
load = "매우 바쁨"
elif inference_latency is not None and inference_latency > 4000:
@@ -185,20 +226,23 @@ class BackendRegistry:
else:
load = "여유"
# local queue 보조 판단 (한 단계 상향)
# local queue 보조 판단 (자기 자신 제외 — active > 1)
try:
from services import job_queue as jq_module
if jq_module.job_queue:
active = jq_module.job_queue.stats.get("active", 0)
if active > 0 and load == "보통":
if active > 1 and load == "보통":
load = "바쁨"
elif active > 0 and load == "여유":
elif active > 1 and load == "여유":
load = "보통"
except Exception:
pass
return {
"load": load,
"source": "hybrid",
"active_jobs": None,
"total_requests": None,
"health_ms": health_latency,
"health_baseline_ms": round(baseline, 1),
"inference_ms": inference_latency,

View File

@@ -116,6 +116,19 @@ class ModelAdapter:
except Exception:
return False
async def status_check(self) -> dict | None:
    """Query the backend's optional GET /status endpoint.

    Returns the parsed JSON payload when the backend responds 200 and the
    payload carries an "active_jobs" key (i.e. the backend supports the
    status protocol). Returns None for unsupported backends, non-200
    responses, transport errors, or timeouts — callers treat None as
    "no status available" and fall back to other load signals.
    """
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{self.base_url}/status")
            # Anything other than 200 means the endpoint is absent/broken.
            if resp.status_code != 200:
                return None
            payload = resp.json()
    except Exception:
        # Best-effort probe: any failure is reported as "unsupported".
        return None
    # Only payloads that speak the status protocol count.
    return payload if "active_jobs" in payload else None
async def measure_inference_latency(self) -> float:
"""ping 메시지로 실제 inference latency 측정. max_tokens=1로 최소 부하.
반환: 밀리초. 실패 시 -1.0"""

View File

@@ -100,19 +100,26 @@ async def _build_system_status(force_measure: bool = True) -> str:
load = await backend_registry.get_load_status(role, force_measure=force_measure)
connected = "정상" if info["healthy"] else "연결 안 됨"
load_str = load["load"]
baseline = load["health_baseline_ms"]
ratio = (load["health_ms"] / baseline) if baseline > 0 else 1.0
source = load.get("source", "hybrid")
line1 = f"{info['name']}: {connected}{load_str}"
line2 = f" health {load['health_ms']:.0f}ms (baseline {baseline:.0f}ms, {ratio:.1f}배)"
if load["measured"]:
line3 = f" ping {load['inference_ms']:.0f}ms"
else:
line3 = " ping: 측정 안 함"
lines.append(line1)
lines.append(line2)
lines.append(line3)
if source == "status_api":
# /status API 직접 조회 — 정확한 active_jobs
active = load.get("active_jobs", 0)
total = load.get("total_requests", 0)
lines.append(f" active_jobs: {active} (total: {total}) [status API]")
lines.append(f" health {load['health_ms']:.0f}ms")
else:
# hybrid fallback (health baseline + 조건부 ping)
baseline = load["health_baseline_ms"]
ratio = (load["health_ms"] / baseline) if baseline > 0 else 1.0
lines.append(f" health {load['health_ms']:.0f}ms (baseline {baseline:.0f}ms, {ratio:.1f}배)")
if load["measured"]:
lines.append(f" ping {load['inference_ms']:.0f}ms")
else:
lines.append(" ping: 측정 안 함")
lines.append("")
queue = jq_module.job_queue.stats if jq_module.job_queue else {"pending": 0, "active": 0}