From 875a4f80d24c644cf50a406914e887c5716098ac Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Tue, 7 Apr 2026 07:46:57 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Hybrid=20=EB=B6=80=ED=95=98=20=ED=8C=90?= =?UTF-8?q?=EB=8B=A8=20=E2=80=94=20health=20latency=20baseline=20+=20?= =?UTF-8?q?=EC=A1=B0=EA=B1=B4=EB=B6=80=20inference?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - model_adapter: measure_inference_latency() (max_tokens=1, 최소 부하) - backend_registry: - health latency baseline 학습 (초기 5회 max, 이후 EMA) - get_load_status(): inference 우선, health/queue 보조 - cache 30s + cooldown 10s + asyncio.Lock으로 자기증폭 루프 방지 - 조건: health > baseline*3 또는 사용자 명시 요청 시에만 ping - worker: - "system_status" 액션 — 사용자 상태 조회 시 force_measure - _build_system_status() 응답 빌더 (health/baseline/ping/queue) - route busy 안내를 get_load_status 기반으로 변경 Co-Authored-By: Claude Opus 4.6 (1M context) --- nanoclaude/services/backend_registry.py | 90 +++++++++++++++++++++++++ nanoclaude/services/model_adapter.py | 16 +++++ nanoclaude/services/worker.py | 77 ++++++++++++++------- 3 files changed, 158 insertions(+), 25 deletions(-) diff --git a/nanoclaude/services/backend_registry.py b/nanoclaude/services/backend_registry.py index d76c655..04afb21 100644 --- a/nanoclaude/services/backend_registry.py +++ b/nanoclaude/services/backend_registry.py @@ -68,6 +68,11 @@ class BackendRegistry: self.reasoner: ModelAdapter | None = None # Gemma4: 추론 self._health: dict[str, bool] = {"classifier": False, "reasoner": False} self._latency: dict[str, float] = {"classifier": 0.0, "reasoner": 0.0} + self._health_baseline: dict[str, float] = {} + self._sample_count: dict[str, int] = {} + self._inference_latency: dict[str, float | None] = {} + self._inference_latency_at: dict[str, float] = {} + self._measure_lock = asyncio.Lock() self._health_task: asyncio.Task | None = None def init_from_settings(self, settings) -> None: @@ -111,10 +116,95 @@ class BackendRegistry: prev = self._health[role] self._health[role] = healthy self._latency[role] = elapsed + if healthy: + self._update_baseline(role, elapsed) if prev != healthy: status = "UP" if healthy else "DOWN" logger.warning("%s (%s) → %s (%.0fms)", adapter.name, role, status, elapsed) + def _update_baseline(self, role: str, latency: float) -> None: + """baseline EMA 업데이트. 초기 5회는 max로 spike 보호.""" + sample_count = self._sample_count.get(role, 0) + 1 + self._sample_count[role] = sample_count + current = self._health_baseline.get(role, latency) + + if sample_count < 5: + new_baseline = max(current, latency) + else: + new_baseline = current * 0.9 + latency * 0.1 + + # 절대 최솟값 (10ms 이하면 의미 없음) + self._health_baseline[role] = max(new_baseline, 10) + + async def _measure_inference(self, role: str) -> float | None: + """inference latency 측정 (lock으로 동시 1개 제한).""" + adapter = self.classifier if role == "classifier" else self.reasoner + if not adapter: + return None + async with self._measure_lock: + latency = await adapter.measure_inference_latency() + if latency >= 0: + self._inference_latency[role] = latency + self._inference_latency_at[role] = time.time() + return latency + return None + + async def get_load_status(self, role: str, force_measure: bool = False) -> dict: + """role의 부하 상태 판단. inference latency는 캐시 또는 조건부 측정.""" + health_latency = self._latency.get(role, 0) + baseline = self._health_baseline.get(role, 50) + cached_inference = self._inference_latency.get(role) + cached_at = self._inference_latency_at.get(role, 0) + + now = time.time() + cache_valid = (now - cached_at) < 30 + cooldown_active = (now - cached_at) < 10 + + # 조건부 측정 (상대값 + 쿨다운) + should_measure = False + if force_measure and not cooldown_active: + should_measure = True + elif not cache_valid and not cooldown_active: + if health_latency > baseline * 3: + should_measure = True + + if should_measure: + inference_latency = await self._measure_inference(role) + else: + inference_latency = cached_inference if cache_valid else None + + # 부하 판단 (inference 우선, health는 fallback) + if inference_latency is not None and inference_latency > 8000: + load = "매우 바쁨" + elif inference_latency is not None and inference_latency > 4000: + load = "바쁨" + elif health_latency > baseline * 5: + load = "바쁨" + elif health_latency > baseline * 2: + load = "보통" + else: + load = "여유" + + # local queue로 보조 판단 (한 단계 상향) + try: + from services import job_queue as jq_module + if jq_module.job_queue: + active = jq_module.job_queue.stats.get("active", 0) + if active > 0 and load == "보통": + load = "바쁨" + elif active > 0 and load == "여유": + load = "보통" + except Exception: + pass + + return { + "load": load, + "health_ms": health_latency, + "health_baseline_ms": round(baseline, 1), + "inference_ms": inference_latency, + "measured": inference_latency is not None, + } + def is_healthy(self, role: str) -> bool: return self._health.get(role, False) diff --git a/nanoclaude/services/model_adapter.py b/nanoclaude/services/model_adapter.py index 9690709..1e5b3c5 100644 --- a/nanoclaude/services/model_adapter.py +++ b/nanoclaude/services/model_adapter.py @@ -115,3 +115,19 @@ class ModelAdapter: return resp.status_code < 500 except Exception: return False + + async def measure_inference_latency(self) -> float: + """ping 메시지로 실제 inference latency 측정. max_tokens=1로 최소 부하. + 반환: 밀리초. 실패 시 -1.0""" + import time as _time + original_max = self.max_tokens + self.max_tokens = 1 + try: + start = _time.monotonic() + await self.complete_chat("ping") + return (_time.monotonic() - start) * 1000 + except Exception: + logger.warning("inference latency measurement failed for %s", self.name) + return -1.0 + finally: + self.max_tokens = original_max diff --git a/nanoclaude/services/worker.py b/nanoclaude/services/worker.py index 419cf35..0dd8f21 100644 --- a/nanoclaude/services/worker.py +++ b/nanoclaude/services/worker.py @@ -87,6 +87,39 @@ def _parse_classification(raw: str) -> dict: return {"action": "direct", "response": cleaned, "prompt": ""} +async def _build_system_status(force_measure: bool = True) -> str: + """시스템 상태 메시지 생성. force_measure=True면 inference latency 1회 측정.""" + from services import job_queue as jq_module + health = backend_registry.health_summary() + lines = ["🤖 시스템 상태:", ""] + + for role in ("classifier", "reasoner"): + info = health.get(role) + if not info: + continue + load = await backend_registry.get_load_status(role, force_measure=force_measure) + connected = "정상" if info["healthy"] else "연결 안 됨" + load_str = load["load"] + + baseline = load["health_baseline_ms"] + ratio = (load["health_ms"] / baseline) if baseline > 0 else 1.0 + + line1 = f"{info['name']}: {connected} — {load_str}" + line2 = f" health {load['health_ms']:.0f}ms (baseline {baseline:.0f}ms, {ratio:.1f}배)" + if load["measured"]: + line3 = f" ping {load['inference_ms']:.0f}ms" + else: + line3 = " ping: 측정 안 함" + lines.append(line1) + lines.append(line2) + lines.append(line3) + lines.append("") + + queue = jq_module.job_queue.stats if jq_module.job_queue else {"pending": 0, "active": 0} + lines.append(f"내 작업 큐: 처리 중 {queue.get('active', 0)}건, 대기 {queue.get('pending', 0)}건") + return "\n".join(lines) + + async def _send_callback(job: Job, text: str) -> None: """Synology callback이면 전송 + response_sent 플래그.""" if job.callback == "synology": @@ -149,26 +182,10 @@ def _pre_route(message: str) -> dict | None: if query: return {"action": "tools", "tool": "document", "operation": "search", "params": {"query": query}} - # 시스템 상태 질문 + # 시스템 상태 질문 — 마커만 반환 (worker에서 비동기 조회) if any(k in msg for k in ["추론 모델", "gemma", "젬마", "서버 상태", "시스템 상태"]) or \ (any(k in msg for k in ["상태", "젬마", "gemma"]) and any(k in msg for k in ["돌아", "일하", "작동", "상태", "건강", "살아", "대기", "큐", "밀려", "바빠", "느려", "많"])): - health = backend_registry.health_summary() - from services import job_queue as jq_module - queue = jq_module.job_queue.stats if jq_module.job_queue else {"pending": 0, "active": 0} - lines = [] - for role, info in health.items(): - status = "✅ 정상" if info["healthy"] else "❌ 연결 안 됨" - line = f"• {info['name']} ({role}): {status} ({info['latency_ms']:.0f}ms)" - lines.append(line) - # 큐 정보 - active = queue.get("active", 0) - pending = queue.get("pending", 0) - if active > 0 or pending > 0: - lines.append(f"\n작업 현황: 처리 중 {active}건, 대기 {pending}건") - else: - lines.append(f"\n작업 현황: 유휴 상태 (대기 없음)") - status_text = "\n".join(lines) - return {"action": "direct", "response": f"현재 시스템 상태야:\n{status_text}", "prompt": ""} + return {"action": "system_status", "response": "", "prompt": ""} # pending_draft 확인 응답 if msg in ("확인", "예", "yes", "ㅇㅇ", "응", "네", "좋아", "ok"): @@ -249,7 +266,15 @@ async def run(job: Job) -> None: if job.status == JobStatus.cancelled: return - if action == "tools": + if action == "system_status": + # === SYSTEM STATUS: 부하 상태 조회 (사용자 명시 요청 → ping 측정) === + await state_stream.push(job.id, "processing", {"message": "시스템 상태 확인 중..."}) + status_text = await _build_system_status(force_measure=True) + collected.append(status_text) + await state_stream.push(job.id, "result", {"content": status_text}) + conversation_store.add(user_id, "assistant", status_text) + + elif action == "tools": # === TOOLS: 도구 실행 === tool_name = classification.get("tool", "") operation = classification.get("operation", "") @@ -335,12 +360,14 @@ async def run(job: Job) -> None: rewritten_message = (route_prompt or job.message)[:MAX_PROMPT_LENGTH] job.rewritten_message = rewritten_message - # Gemma busy 안내 (큐에 다른 작업이 있으면) - from services import job_queue as jq_module - if jq_module.job_queue: - q = jq_module.job_queue.stats - if q.get("active", 0) > 1 and job.callback == "synology": - await send_to_synology(f"⏳ 추론 모델이 현재 다른 작업도 처리 중이라 조금 걸릴 수 있어... (대기 {q.get('pending', 0)}건)", raw=True) + # Gemma busy 안내 (Hybrid 부하 판단 — inference 강제 측정 안 함) + if job.callback == "synology": + load = await backend_registry.get_load_status("reasoner", force_measure=False) + if load["load"] in ("바쁨", "매우 바쁨"): + await send_to_synology( + f"⏳ 추론 모델이 현재 바쁜 상태야({load['load']}). 좀 걸릴 수 있어...", + raw=True + ) if job.callback != "synology": await state_stream.push(job.id, "rewrite", {"content": rewritten_message})