From 378866a99c99782c80cfd0975bba480a2706500e Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Tue, 14 Apr 2026 06:53:24 +0900 Subject: [PATCH] =?UTF-8?q?fix(infra):=20agent=20alert-on-change=20?= =?UTF-8?q?=E2=80=94=20debounce=20+=20stable=20key=20+=20MacBook=20?= =?UTF-8?q?=EC=A0=9C=EC=99=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 상태 파일로 이전 이슈 추적 (~/Library/Application Support/infra-agent/) - stable issue key (docker:gpu:container:status 형태) - 2회 연속 실패 시 알림, 2회 연속 성공 시 복구 알림 - 동일 이슈 지속 시 무음 (alert storm 방지) - MacBook Pro를 EXPECTED_TAILSCALE_HOSTS에서 제거 (잠자기는 정상) - state file atomic write + 손상 시 graceful fallback Co-Authored-By: Claude Opus 4.6 (1M context) --- infra/agent.py | 271 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 187 insertions(+), 84 deletions(-) diff --git a/infra/agent.py b/infra/agent.py index cf9f303..3611a2f 100644 --- a/infra/agent.py +++ b/infra/agent.py @@ -1,11 +1,12 @@ -"""Infra Monitoring Agent — rule-first, Gemma 2nd. +"""Infra Monitoring Agent — rule-first, Gemma 2nd, alert-on-change. 5분마다 실행되어: 1. core/ 함수로 상태 수집 -2. Rule 기반 1차 판정 (threshold/패턴) -3. 이상 감지 시 → Gemma 4가 자연어 설명 생성 -4. 시놀로지 Chat 알림 (rule 요약 + Gemma 설명) -5. 로그 → stdout (launchd가 캡처) +2. Rule 기반 1차 판정 → stable issue key 생성 +3. 상태 파일과 비교 (debounce: 2회 연속 기준) +4. 새 이슈 → Gemma 설명 + 시놀로지 Chat 알림 +5. 복구 → 복구 알림 +6. 동일 이슈 지속 → 무음 """ from __future__ import annotations @@ -14,14 +15,13 @@ import asyncio import json import logging import os -import sys +import tempfile from datetime import datetime, timezone from pathlib import Path import httpx from dotenv import load_dotenv -# gpu-services 루트에서 실행되므로 infra.core를 import 가능 from infra.core.docker import docker_status from infra.core.health import service_health from infra.core.system import disk_usage @@ -34,7 +34,6 @@ logging.basicConfig( format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) -# asyncssh is extremely verbose at INFO level logging.getLogger("asyncssh").setLevel(logging.WARNING) log = logging.getLogger("infra-agent") @@ -42,27 +41,79 @@ log = logging.getLogger("infra-agent") SYNOLOGY_WEBHOOK_URL = os.getenv("SYNOLOGY_INCOMING_URL", "") DISK_WARN_PCT = 85 -EXPECTED_TAILSCALE_HOSTS = {"sub-server", "hyungi-macmini", "hyungi-macbookpro"} -# Docker containers known to be intentionally stopped +# Only server-grade hosts (MacBook Pro excluded — sleep is normal) +EXPECTED_TAILSCALE_HOSTS = {"sub-server", "hyungi-macmini"} + IGNORED_CONTAINERS = { "hyungi_document_server-ai-gateway-1", "hyungi_document_server-ollama-1", } -# Services to health-check HEALTH_SERVICES = ["document-server", "mlx", "ollama-gpu"] -# Gemma 4 — Mac mini MLX proxy (localhost on Mac mini) GEMMA_URL = "http://localhost:8801/v1/chat/completions" GEMMA_MODEL = "mlx-community/gemma-4-26b-a4b-it-8bit" -GEMMA_TIMEOUT = 30 # seconds +GEMMA_TIMEOUT = 30 + +# Debounce: consecutive cycles before alert/recovery +ALERT_THRESHOLD = 2 # 2 consecutive failures → alert +RECOVERY_THRESHOLD = 2 # 2 consecutive successes → recovery + +# State file (persistent across reboots) +STATE_DIR = Path.home() / "Library" / "Application Support" / "infra-agent" +STATE_FILE = STATE_DIR / "state.json" + + +# --- State Management --- + +def _load_state() -> dict: + """Load state file. Returns empty state on failure (graceful fallback).""" + try: + if STATE_FILE.exists(): + data = json.loads(STATE_FILE.read_text()) + if isinstance(data, dict) and "issues" in data: + return data + except (json.JSONDecodeError, OSError) as e: + log.warning("State file 손상 — 초기화: %s", e) + return {"issues": {}} + + +def _save_state(state: dict) -> None: + """Save state file atomically (write tmp + rename).""" + STATE_DIR.mkdir(parents=True, exist_ok=True) + tmp = STATE_FILE.with_suffix(".tmp") + try: + tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2)) + tmp.rename(STATE_FILE) + except OSError: + log.exception("State file 저장 실패") + + +# --- Issue Key Generation --- +# Key format: {category}:{host}:{target}:{condition} +# Examples: docker:gpu:fastapi-1:exited, disk:gpu:/:high, tailscale:sub-server:offline + +def _docker_key(host: str, container: str, status: str) -> str: + return f"docker:{host}:{container}:{status}" + +def _disk_key(host: str, mount: str) -> str: + return f"disk:{host}:{mount}:high" + +def _health_key(service: str, status: str) -> str: + return f"health:{service}:{status}" + +def _tailscale_key(hostname: str) -> str: + return f"tailscale:{hostname}:offline" + +def _check_key(error: str) -> str: + return f"check:error:{error[:30]}" # --- Gemma --- async def generate_explanation(alerts: list[str]) -> str: - """Ask Gemma 4 to explain alerts and suggest actions. Returns empty on failure.""" + """Ask Gemma 4 to explain alerts. Returns empty on failure.""" prompt = ( "당신은 서버 인프라 모니터링 AI입니다. " "아래 이상 항목들을 분석해서 간결하게 설명하고 권장 조치를 알려주세요.\n\n" @@ -81,118 +132,113 @@ async def generate_explanation(alerts: list[str]) -> str: }, ) if resp.status_code == 200: - data = resp.json() - return data["choices"][0]["message"]["content"].strip() + return resp.json()["choices"][0]["message"]["content"].strip() except Exception: - log.debug("Gemma 설명 생성 실패 — rule 결과만 전송", exc_info=True) + log.debug("Gemma 설명 생성 실패", exc_info=True) return "" # --- Alert --- async def send_alert(message: str) -> bool: - """Send alert to Synology Chat via incoming webhook.""" + """Send alert to Synology Chat.""" if not SYNOLOGY_WEBHOOK_URL: log.warning("SYNOLOGY_INCOMING_URL not set — alert skipped") - log.info("Alert content: %s", message) + log.info("Alert: %s", message) return False payload = json.dumps({"text": f"[infra-agent] {message}"}, ensure_ascii=False) try: async with httpx.AsyncClient(verify=False, timeout=10.0) as client: - resp = await client.post( - SYNOLOGY_WEBHOOK_URL, - data={"payload": payload}, - ) + resp = await client.post(SYNOLOGY_WEBHOOK_URL, data={"payload": payload}) if resp.status_code == 200: log.info("Alert sent: %s", message[:100]) return True - log.error("Alert failed: %d %s", resp.status_code, resp.text) + log.error("Alert failed: %d", resp.status_code) return False except Exception: log.exception("Failed to send alert") return False -# --- Rules --- +# --- Rules → {key: description} --- -async def check_docker_rules() -> list[str]: - """Check Docker containers on GPU server.""" - alerts = [] +async def check_docker_rules() -> dict[str, str]: + """Returns {issue_key: human_description}.""" + issues: dict[str, str] = {} result = await docker_status("gpu") - - if not result.ok: - if result.error_type: - alerts.append(f"GPU Docker 확인 실패: {result.error}") - return alerts + if result.error_type: + k = _check_key("gpu-docker") + issues[k] = f"GPU Docker 확인 실패: {result.error}" + return issues for c in result.containers: if c.name in IGNORED_CONTAINERS: continue - if c.status != "running": - alerts.append(f"GPU 컨테이너 다운: {c.name} ({c.status})") - elif c.status == "restarting": - alerts.append(f"GPU 컨테이너 재시작 중: {c.name}") - - return alerts + if c.status == "restarting": + k = _docker_key("gpu", c.name, "restarting") + issues[k] = f"GPU 컨테이너 재시작 중: {c.name}" + elif c.status != "running": + k = _docker_key("gpu", c.name, c.status) + issues[k] = f"GPU 컨테이너 다운: {c.name} ({c.status})" + return issues -async def check_disk_rules() -> list[str]: - """Check disk usage on all hosts.""" - alerts = [] +async def check_disk_rules() -> dict[str, str]: + issues: dict[str, str] = {} for host in ["gpu", "macmini"]: result = await disk_usage(host) if not result.ok: - alerts.append(f"{host} 디스크 확인 실패: {result.error}") + k = _check_key(f"{host}-disk") + issues[k] = f"{host} 디스크 확인 실패: {result.error}" continue for fs in result.filesystems: if fs.used_pct >= DISK_WARN_PCT: - alerts.append( - f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% " - f"(사용 {fs.used}/{fs.total})" - ) - return alerts + k = _disk_key(host, fs.mount) + issues[k] = f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% ({fs.used}/{fs.total})" + return issues -async def check_health_rules() -> list[str]: - """Check critical services health.""" - alerts = [] +async def check_health_rules() -> dict[str, str]: + issues: dict[str, str] = {} for svc in HEALTH_SERVICES: result = await service_health(svc) if not result.ok: detail = result.error or result.status - alerts.append(f"서비스 다운: {svc} — {detail}") + k = _health_key(svc, "down") + issues[k] = f"서비스 다운: {svc} — {detail}" elif result.status == "degraded": - alerts.append(f"서비스 저하: {svc} — {result.details}") - return alerts + k = _health_key(svc, "degraded") + issues[k] = f"서비스 저하: {svc}" + return issues -async def check_network_rules() -> list[str]: - """Check Tailscale connectivity for critical hosts.""" - alerts = [] +async def check_network_rules() -> dict[str, str]: + issues: dict[str, str] = {} result = await tailscale_status() if not result.ok: - alerts.append(f"Tailscale 확인 실패: {result.error}") - return alerts + k = _check_key("tailscale") + issues[k] = f"Tailscale 확인 실패: {result.error}" + return issues online_hosts = {p.hostname for p in result.peers if p.status != "offline"} for expected in EXPECTED_TAILSCALE_HOSTS: if expected not in online_hosts: - alerts.append(f"Tailscale 오프라인: {expected}") - - return alerts + k = _tailscale_key(expected) + issues[k] = f"Tailscale 오프라인: {expected}" + return issues -# --- Main --- +# --- State Machine --- async def run_checks() -> None: - """Run all checks and send alerts if needed.""" + """Run checks, compare with state, alert on change only.""" now = datetime.now(timezone.utc).isoformat() log.info("=== 상태 수집 시작 (%s) ===", now) - all_alerts: list[str] = [] + # Collect all current issues: {key: description} + current_issues: dict[str, str] = {} - # Run all checks concurrently results = await asyncio.gather( check_docker_rules(), check_disk_rules(), @@ -200,33 +246,90 @@ async def run_checks() -> None: check_network_rules(), return_exceptions=True, ) - for result in results: if isinstance(result, Exception): - all_alerts.append(f"체크 실패: {result}") + k = _check_key(str(result)[:20]) + current_issues[k] = f"체크 실패: {result}" else: - all_alerts.extend(result) + current_issues.update(result) - if all_alerts: - log.warning("이상 감지 %d건:", len(all_alerts)) - for a in all_alerts: + # Load previous state + state = _load_state() + issues_state = state.get("issues", {}) + # Each issue: {"fail_count": int, "success_count": int, "alerted": bool, "desc": str} + + new_alerts: list[str] = [] # issues to alert about (newly confirmed) + recovery_alerts: list[str] = [] # issues that recovered + + # Track all keys we've processed + processed_keys = set() + + # --- Process current issues --- + for key, desc in current_issues.items(): + processed_keys.add(key) + entry = issues_state.get(key, {"fail_count": 0, "success_count": 0, "alerted": False, "desc": ""}) + + # Failure detected: increment fail, reset success + entry["fail_count"] = entry.get("fail_count", 0) + 1 + entry["success_count"] = 0 + entry["desc"] = desc + + if entry["fail_count"] >= ALERT_THRESHOLD and not entry.get("alerted"): + new_alerts.append(desc) + entry["alerted"] = True + + issues_state[key] = entry + + # --- Process resolved issues (in state but not in current) --- + resolved_keys = [] + for key in list(issues_state.keys()): + if key in processed_keys: + continue + + entry = issues_state[key] + # Success: increment success, reset fail + entry["success_count"] = entry.get("success_count", 0) + 1 + entry["fail_count"] = 0 + + if entry["success_count"] >= RECOVERY_THRESHOLD: + if entry.get("alerted"): + recovery_alerts.append(f"정상 복구: {entry.get('desc', key)}") + resolved_keys.append(key) + else: + issues_state[key] = entry + + # Remove fully resolved issues + for key in resolved_keys: + issues_state.pop(key, None) + + # Save state + _save_state({"issues": issues_state}) + + # --- Send alerts --- + if new_alerts: + log.warning("새 이상 %d건 (debounce 확정):", len(new_alerts)) + for a in new_alerts: log.warning(" - %s", a) - # Gemma 2nd: generate explanation - explanation = await generate_explanation(all_alerts) - if explanation: - log.info("Gemma 설명: %s", explanation[:100]) - - # Build alert message - lines = [f"이상 감지 {len(all_alerts)}건:"] - lines.extend(f"- {a}" for a in all_alerts) + explanation = await generate_explanation(new_alerts) + lines = [f"이상 감지 {len(new_alerts)}건:"] + lines.extend(f"- {a}" for a in new_alerts) if explanation: lines.append(f"\n분석: {explanation}") - message = "\n".join(lines) + await send_alert("\n".join(lines)) - await send_alert(message) - else: - log.info("전체 정상") + if recovery_alerts: + log.info("복구 %d건:", len(recovery_alerts)) + for a in recovery_alerts: + log.info(" - %s", a) + await send_alert("\n".join(recovery_alerts)) + + if not new_alerts and not recovery_alerts: + active = sum(1 for e in issues_state.values() if e.get("alerted")) + if active: + log.info("전체 정상 (관찰 중 %d건)", len(issues_state)) + else: + log.info("전체 정상") def main():