"""Infra Monitoring Agent — rule-first, Gemma 2nd, alert-on-change. 5분마다 실행되어: 1. core/ 함수로 상태 수집 2. Rule 기반 1차 판정 → stable issue key 생성 3. 상태 파일과 비교 (debounce: 2회 연속 기준) 4. 새 이슈 → Gemma 설명 + 시놀로지 Chat 알림 5. 복구 → 복구 알림 6. 동일 이슈 지속 → 무음 """ from __future__ import annotations import asyncio import json import logging import os import tempfile from datetime import datetime, timezone from pathlib import Path from zoneinfo import ZoneInfo import httpx from dotenv import load_dotenv from infra.core.docker import docker_status from infra.core.health import service_health from infra.core.system import disk_usage from infra.core.network import tailscale_status load_dotenv(Path(__file__).parent / ".env") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("asyncssh").setLevel(logging.WARNING) log = logging.getLogger("infra-agent") # --- Config --- SYNOLOGY_WEBHOOK_URL = os.getenv("SYNOLOGY_INCOMING_URL", "") DISK_WARN_PCT = 85 # Only server-grade hosts (MacBook Pro excluded — sleep is normal) EXPECTED_TAILSCALE_HOSTS = {"sub-server", "hyungi-macmini"} IGNORED_CONTAINERS = { "hyungi_document_server-ai-gateway-1", "hyungi_document_server-ollama-1", } HEALTH_SERVICES = ["document-server", "mlx", "ollama-gpu"] GEMMA_URL = "http://localhost:8801/v1/chat/completions" GEMMA_MODEL = "mlx-community/gemma-4-26b-a4b-it-8bit" GEMMA_TIMEOUT = 30 # Debounce: consecutive cycles before alert/recovery ALERT_THRESHOLD = 2 # 2 consecutive failures → alert RECOVERY_THRESHOLD = 2 # 2 consecutive successes → recovery # State file (persistent across reboots) STATE_DIR = Path.home() / "Library" / "Application Support" / "infra-agent" STATE_FILE = STATE_DIR / "state.json" # --- State Management --- def _load_state() -> dict: """Load state file. Returns empty state on failure (graceful fallback).""" try: if STATE_FILE.exists(): data = json.loads(STATE_FILE.read_text()) if isinstance(data, dict) and "issues" in data: return data except (json.JSONDecodeError, OSError) as e: log.warning("State file 손상 — 초기화: %s", e) return {"issues": {}} def _save_state(state: dict) -> None: """Save state file atomically (write tmp + rename).""" STATE_DIR.mkdir(parents=True, exist_ok=True) tmp = STATE_FILE.with_suffix(".tmp") try: tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2)) tmp.rename(STATE_FILE) except OSError: log.exception("State file 저장 실패") # --- Issue Key Generation --- # Key format: {category}:{host}:{target}:{condition} # Examples: docker:gpu:fastapi-1:exited, disk:gpu:/:high, tailscale:sub-server:offline def _docker_key(host: str, container: str, status: str) -> str: return f"docker:{host}:{container}:{status}" def _disk_key(host: str, mount: str) -> str: return f"disk:{host}:{mount}:high" def _health_key(service: str, status: str) -> str: return f"health:{service}:{status}" def _tailscale_key(hostname: str) -> str: return f"tailscale:{hostname}:offline" def _check_key(error: str) -> str: return f"check:error:{error[:30]}" # --- Gemma --- async def generate_explanation(alerts: list[str]) -> str: """Ask Gemma 4 to explain alerts. Returns empty on failure.""" prompt = ( "당신은 서버 인프라 모니터링 AI입니다. " "아래 이상 항목들을 분석해서 간결하게 설명하고 권장 조치를 알려주세요.\n\n" "이상 항목:\n" + "\n".join(f"- {a}" for a in alerts) + "\n\n" "형식: 1~3문장으로 원인 분석 + 권장 조치. 한국어로." ) try: async with httpx.AsyncClient(timeout=GEMMA_TIMEOUT) as client: resp = await client.post( GEMMA_URL, json={ "model": GEMMA_MODEL, "messages": [{"role": "user", "content": prompt}], "max_tokens": 200, "temperature": 0.3, }, ) if resp.status_code == 200: return resp.json()["choices"][0]["message"]["content"].strip() except Exception: log.debug("Gemma 설명 생성 실패", exc_info=True) return "" # --- Alert --- async def send_alert(message: str) -> bool: """Send alert to Synology Chat.""" if not SYNOLOGY_WEBHOOK_URL: log.warning("SYNOLOGY_INCOMING_URL not set — alert skipped") log.info("Alert: %s", message) return False payload = json.dumps({"text": f"[infra-agent] {message}"}, ensure_ascii=False) try: async with httpx.AsyncClient(verify=False, timeout=10.0) as client: resp = await client.post(SYNOLOGY_WEBHOOK_URL, data={"payload": payload}) if resp.status_code == 200: log.info("Alert sent: %s", message[:100]) return True log.error("Alert failed: %d", resp.status_code) return False except Exception: log.exception("Failed to send alert") return False # --- Rules → {key: description} --- async def check_docker_rules() -> dict[str, str]: """Returns {issue_key: human_description}.""" issues: dict[str, str] = {} result = await docker_status("gpu") if result.error_type: k = _check_key("gpu-docker") issues[k] = f"GPU Docker 확인 실패: {result.error}" return issues for c in result.containers: if c.name in IGNORED_CONTAINERS: continue if c.status == "restarting": k = _docker_key("gpu", c.name, "restarting") issues[k] = f"GPU 컨테이너 재시작 중: {c.name}" elif c.status != "running": k = _docker_key("gpu", c.name, c.status) issues[k] = f"GPU 컨테이너 다운: {c.name} ({c.status})" return issues async def check_disk_rules() -> dict[str, str]: issues: dict[str, str] = {} for host in ["gpu", "macmini"]: result = await disk_usage(host) if not result.ok: k = _check_key(f"{host}-disk") issues[k] = f"{host} 디스크 확인 실패: {result.error}" continue for fs in result.filesystems: if fs.used_pct >= DISK_WARN_PCT: k = _disk_key(host, fs.mount) issues[k] = f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% ({fs.used}/{fs.total})" return issues async def check_health_rules() -> dict[str, str]: issues: dict[str, str] = {} # KST 0~7시는 Document Server tier_backfill 가동 시간대 (정책 0~6시 + 잔여 처리 1h buffer). # 26B 가 batch 점유로 /v1/models 응답이 5~10초 lock 되는 게 정상이므로 mlx 알람만 격하. # 정책: ~/Documents/code/hyungi_Document_Server/app/workers/tier_backfill.py NIGHT_START/END_HOUR kst_hour = datetime.now(tz=ZoneInfo("Asia/Seoul")).hour is_backfill_window = 0 <= kst_hour < 7 for svc in HEALTH_SERVICES: result = await service_health(svc) if not result.ok: if svc == "mlx" and is_backfill_window: log.info("[mute] mlx down — KST %d시 backfill window", kst_hour) continue detail = result.error or result.status k = _health_key(svc, "down") issues[k] = f"서비스 다운: {svc} — {detail}" elif result.status == "degraded": if svc == "mlx" and is_backfill_window: log.info("[mute] mlx degraded — KST %d시 backfill window", kst_hour) continue k = _health_key(svc, "degraded") issues[k] = f"서비스 저하: {svc}" return issues async def check_network_rules() -> dict[str, str]: issues: dict[str, str] = {} result = await tailscale_status() if not result.ok: k = _check_key("tailscale") issues[k] = f"Tailscale 확인 실패: {result.error}" return issues online_hosts = {p.hostname for p in result.peers if p.status != "offline"} for expected in EXPECTED_TAILSCALE_HOSTS: if expected not in online_hosts: k = _tailscale_key(expected) issues[k] = f"Tailscale 오프라인: {expected}" return issues # --- State Machine --- async def run_checks() -> None: """Run checks, compare with state, alert on change only.""" now = datetime.now(timezone.utc).isoformat() log.info("=== 상태 수집 시작 (%s) ===", now) # Collect all current issues: {key: description} current_issues: dict[str, str] = {} results = await asyncio.gather( check_docker_rules(), check_disk_rules(), check_health_rules(), check_network_rules(), return_exceptions=True, ) for result in results: if isinstance(result, Exception): k = _check_key(str(result)[:20]) current_issues[k] = f"체크 실패: {result}" else: current_issues.update(result) # Load previous state state = _load_state() issues_state = state.get("issues", {}) # Each issue: {"fail_count": int, "success_count": int, "alerted": bool, "desc": str} new_alerts: list[str] = [] # issues to alert about (newly confirmed) recovery_alerts: list[str] = [] # issues that recovered # Track all keys we've processed processed_keys = set() # --- Process current issues --- for key, desc in current_issues.items(): processed_keys.add(key) entry = issues_state.get(key, {"fail_count": 0, "success_count": 0, "alerted": False, "desc": ""}) # Failure detected: increment fail, reset success entry["fail_count"] = entry.get("fail_count", 0) + 1 entry["success_count"] = 0 entry["desc"] = desc if entry["fail_count"] >= ALERT_THRESHOLD and not entry.get("alerted"): new_alerts.append(desc) entry["alerted"] = True issues_state[key] = entry # --- Process resolved issues (in state but not in current) --- resolved_keys = [] for key in list(issues_state.keys()): if key in processed_keys: continue entry = issues_state[key] # Success: increment success, reset fail entry["success_count"] = entry.get("success_count", 0) + 1 entry["fail_count"] = 0 if entry["success_count"] >= RECOVERY_THRESHOLD: if entry.get("alerted"): recovery_alerts.append(f"정상 복구: {entry.get('desc', key)}") resolved_keys.append(key) else: issues_state[key] = entry # Remove fully resolved issues for key in resolved_keys: issues_state.pop(key, None) # Save state _save_state({"issues": issues_state}) # --- Send alerts --- if new_alerts: log.warning("새 이상 %d건 (debounce 확정):", len(new_alerts)) for a in new_alerts: log.warning(" - %s", a) explanation = await generate_explanation(new_alerts) lines = [f"이상 감지 {len(new_alerts)}건:"] lines.extend(f"- {a}" for a in new_alerts) if explanation: lines.append(f"\n분석: {explanation}") await send_alert("\n".join(lines)) if recovery_alerts: log.info("복구 %d건:", len(recovery_alerts)) for a in recovery_alerts: log.info(" - %s", a) await send_alert("\n".join(recovery_alerts)) if not new_alerts and not recovery_alerts: active = sum(1 for e in issues_state.values() if e.get("alerted")) if active: log.info("전체 정상 (관찰 중 %d건)", len(issues_state)) else: log.info("전체 정상") def main(): asyncio.run(run_checks()) if __name__ == "__main__": main()