"""Infra Monitoring Agent — rule-first, Gemma 2nd, alert-on-change. 5분마다 실행되어: 1. core/ 함수로 상태 수집 2. Rule 기반 1차 판정 → stable issue key 생성 3. 상태 파일과 비교 (debounce: 2회 연속 기준) 4. 새 이슈 → Gemma 설명 + 시놀로지 Chat 알림 5. 복구 → 복구 알림 6. 동일 이슈 지속 → 무음 """ from __future__ import annotations import asyncio import json import logging import os import tempfile from datetime import datetime, timezone from pathlib import Path import httpx from dotenv import load_dotenv from infra.core.docker import docker_status from infra.core.health import service_health from infra.core.system import disk_usage from infra.core.network import tailscale_status load_dotenv(Path(__file__).parent / ".env") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("asyncssh").setLevel(logging.WARNING) log = logging.getLogger("infra-agent") # --- Config --- SYNOLOGY_WEBHOOK_URL = os.getenv("SYNOLOGY_INCOMING_URL", "") DISK_WARN_PCT = 85 # Only server-grade hosts (MacBook Pro excluded — sleep is normal) EXPECTED_TAILSCALE_HOSTS = {"sub-server", "hyungi-macmini"} IGNORED_CONTAINERS = { "hyungi_document_server-ai-gateway-1", "hyungi_document_server-ollama-1", } HEALTH_SERVICES = ["document-server", "mlx", "ollama-gpu"] GEMMA_URL = "http://localhost:8801/v1/chat/completions" GEMMA_MODEL = "mlx-community/gemma-4-26b-a4b-it-8bit" GEMMA_TIMEOUT = 30 # Debounce: consecutive cycles before alert/recovery ALERT_THRESHOLD = 2 # 2 consecutive failures → alert RECOVERY_THRESHOLD = 2 # 2 consecutive successes → recovery # State file (persistent across reboots) STATE_DIR = Path.home() / "Library" / "Application Support" / "infra-agent" STATE_FILE = STATE_DIR / "state.json" # --- State Management --- def _load_state() -> dict: """Load state file. Returns empty state on failure (graceful fallback).""" try: if STATE_FILE.exists(): data = json.loads(STATE_FILE.read_text()) if isinstance(data, dict) and "issues" in data: return data except (json.JSONDecodeError, OSError) as e: log.warning("State file 손상 — 초기화: %s", e) return {"issues": {}} def _save_state(state: dict) -> None: """Save state file atomically (write tmp + rename).""" STATE_DIR.mkdir(parents=True, exist_ok=True) tmp = STATE_FILE.with_suffix(".tmp") try: tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2)) tmp.rename(STATE_FILE) except OSError: log.exception("State file 저장 실패") # --- Issue Key Generation --- # Key format: {category}:{host}:{target}:{condition} # Examples: docker:gpu:fastapi-1:exited, disk:gpu:/:high, tailscale:sub-server:offline def _docker_key(host: str, container: str, status: str) -> str: return f"docker:{host}:{container}:{status}" def _disk_key(host: str, mount: str) -> str: return f"disk:{host}:{mount}:high" def _health_key(service: str, status: str) -> str: return f"health:{service}:{status}" def _tailscale_key(hostname: str) -> str: return f"tailscale:{hostname}:offline" def _check_key(error: str) -> str: return f"check:error:{error[:30]}" # --- Gemma --- async def generate_explanation(alerts: list[str]) -> str: """Ask Gemma 4 to explain alerts. Returns empty on failure.""" prompt = ( "당신은 서버 인프라 모니터링 AI입니다. " "아래 이상 항목들을 분석해서 간결하게 설명하고 권장 조치를 알려주세요.\n\n" "이상 항목:\n" + "\n".join(f"- {a}" for a in alerts) + "\n\n" "형식: 1~3문장으로 원인 분석 + 권장 조치. 한국어로." ) try: async with httpx.AsyncClient(timeout=GEMMA_TIMEOUT) as client: resp = await client.post( GEMMA_URL, json={ "model": GEMMA_MODEL, "messages": [{"role": "user", "content": prompt}], "max_tokens": 200, "temperature": 0.3, }, ) if resp.status_code == 200: return resp.json()["choices"][0]["message"]["content"].strip() except Exception: log.debug("Gemma 설명 생성 실패", exc_info=True) return "" # --- Alert --- async def send_alert(message: str) -> bool: """Send alert to Synology Chat.""" if not SYNOLOGY_WEBHOOK_URL: log.warning("SYNOLOGY_INCOMING_URL not set — alert skipped") log.info("Alert: %s", message) return False payload = json.dumps({"text": f"[infra-agent] {message}"}, ensure_ascii=False) try: async with httpx.AsyncClient(verify=False, timeout=10.0) as client: resp = await client.post(SYNOLOGY_WEBHOOK_URL, data={"payload": payload}) if resp.status_code == 200: log.info("Alert sent: %s", message[:100]) return True log.error("Alert failed: %d", resp.status_code) return False except Exception: log.exception("Failed to send alert") return False # --- Rules → {key: description} --- async def check_docker_rules() -> dict[str, str]: """Returns {issue_key: human_description}.""" issues: dict[str, str] = {} result = await docker_status("gpu") if result.error_type: k = _check_key("gpu-docker") issues[k] = f"GPU Docker 확인 실패: {result.error}" return issues for c in result.containers: if c.name in IGNORED_CONTAINERS: continue if c.status == "restarting": k = _docker_key("gpu", c.name, "restarting") issues[k] = f"GPU 컨테이너 재시작 중: {c.name}" elif c.status != "running": k = _docker_key("gpu", c.name, c.status) issues[k] = f"GPU 컨테이너 다운: {c.name} ({c.status})" return issues async def check_disk_rules() -> dict[str, str]: issues: dict[str, str] = {} for host in ["gpu", "macmini"]: result = await disk_usage(host) if not result.ok: k = _check_key(f"{host}-disk") issues[k] = f"{host} 디스크 확인 실패: {result.error}" continue for fs in result.filesystems: if fs.used_pct >= DISK_WARN_PCT: k = _disk_key(host, fs.mount) issues[k] = f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% ({fs.used}/{fs.total})" return issues async def check_health_rules() -> dict[str, str]: issues: dict[str, str] = {} for svc in HEALTH_SERVICES: result = await service_health(svc) if not result.ok: detail = result.error or result.status k = _health_key(svc, "down") issues[k] = f"서비스 다운: {svc} — {detail}" elif result.status == "degraded": k = _health_key(svc, "degraded") issues[k] = f"서비스 저하: {svc}" return issues async def check_network_rules() -> dict[str, str]: issues: dict[str, str] = {} result = await tailscale_status() if not result.ok: k = _check_key("tailscale") issues[k] = f"Tailscale 확인 실패: {result.error}" return issues online_hosts = {p.hostname for p in result.peers if p.status != "offline"} for expected in EXPECTED_TAILSCALE_HOSTS: if expected not in online_hosts: k = _tailscale_key(expected) issues[k] = f"Tailscale 오프라인: {expected}" return issues # --- State Machine --- async def run_checks() -> None: """Run checks, compare with state, alert on change only.""" now = datetime.now(timezone.utc).isoformat() log.info("=== 상태 수집 시작 (%s) ===", now) # Collect all current issues: {key: description} current_issues: dict[str, str] = {} results = await asyncio.gather( check_docker_rules(), check_disk_rules(), check_health_rules(), check_network_rules(), return_exceptions=True, ) for result in results: if isinstance(result, Exception): k = _check_key(str(result)[:20]) current_issues[k] = f"체크 실패: {result}" else: current_issues.update(result) # Load previous state state = _load_state() issues_state = state.get("issues", {}) # Each issue: {"fail_count": int, "success_count": int, "alerted": bool, "desc": str} new_alerts: list[str] = [] # issues to alert about (newly confirmed) recovery_alerts: list[str] = [] # issues that recovered # Track all keys we've processed processed_keys = set() # --- Process current issues --- for key, desc in current_issues.items(): processed_keys.add(key) entry = issues_state.get(key, {"fail_count": 0, "success_count": 0, "alerted": False, "desc": ""}) # Failure detected: increment fail, reset success entry["fail_count"] = entry.get("fail_count", 0) + 1 entry["success_count"] = 0 entry["desc"] = desc if entry["fail_count"] >= ALERT_THRESHOLD and not entry.get("alerted"): new_alerts.append(desc) entry["alerted"] = True issues_state[key] = entry # --- Process resolved issues (in state but not in current) --- resolved_keys = [] for key in list(issues_state.keys()): if key in processed_keys: continue entry = issues_state[key] # Success: increment success, reset fail entry["success_count"] = entry.get("success_count", 0) + 1 entry["fail_count"] = 0 if entry["success_count"] >= RECOVERY_THRESHOLD: if entry.get("alerted"): recovery_alerts.append(f"정상 복구: {entry.get('desc', key)}") resolved_keys.append(key) else: issues_state[key] = entry # Remove fully resolved issues for key in resolved_keys: issues_state.pop(key, None) # Save state _save_state({"issues": issues_state}) # --- Send alerts --- if new_alerts: log.warning("새 이상 %d건 (debounce 확정):", len(new_alerts)) for a in new_alerts: log.warning(" - %s", a) explanation = await generate_explanation(new_alerts) lines = [f"이상 감지 {len(new_alerts)}건:"] lines.extend(f"- {a}" for a in new_alerts) if explanation: lines.append(f"\n분석: {explanation}") await send_alert("\n".join(lines)) if recovery_alerts: log.info("복구 %d건:", len(recovery_alerts)) for a in recovery_alerts: log.info(" - %s", a) await send_alert("\n".join(recovery_alerts)) if not new_alerts and not recovery_alerts: active = sum(1 for e in issues_state.values() if e.get("alerted")) if active: log.info("전체 정상 (관찰 중 %d건)", len(issues_state)) else: log.info("전체 정상") def main(): asyncio.run(run_checks()) if __name__ == "__main__": main()