fix(infra): agent alert-on-change — debounce + stable key + MacBook 제외

- 상태 파일로 이전 이슈 추적 (~/Library/Application Support/infra-agent/)
- stable issue key (docker:gpu:container:status 형태)
- 2회 연속 실패 시 알림, 2회 연속 성공 시 복구 알림
- 동일 이슈 지속 시 무음 (alert storm 방지)
- MacBook Pro를 EXPECTED_TAILSCALE_HOSTS에서 제거 (잠자기는 정상)
- state file atomic write + 손상 시 graceful fallback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-14 06:53:24 +09:00
parent 03e3df058f
commit 378866a99c
+187 -84
View File
@@ -1,11 +1,12 @@
"""Infra Monitoring Agent — rule-first, Gemma 2nd.
"""Infra Monitoring Agent — rule-first, Gemma 2nd, alert-on-change.
5분마다 실행되어:
1. core/ 함수로 상태 수집
2. Rule 기반 1차 판정 (threshold/패턴)
3. 이상 감지 시 → Gemma 4가 자연어 설명 생성
4. 시놀로지 Chat 알림 (rule 요약 + Gemma 설명)
5. 로그 → stdout (launchd가 캡처)
2. Rule 기반 1차 판정 → stable issue key 생성
3. 상태 파일과 비교 (debounce: 2회 연속 기준)
4. 새 이슈 → Gemma 설명 + 시놀로지 Chat 알림
5. 복구 → 복구 알림
6. 동일 이슈 지속 → 무음
"""
from __future__ import annotations
@@ -14,14 +15,13 @@ import asyncio
import json
import logging
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
import httpx
from dotenv import load_dotenv
# gpu-services 루트에서 실행되므로 infra.core를 import 가능
from infra.core.docker import docker_status
from infra.core.health import service_health
from infra.core.system import disk_usage
@@ -34,7 +34,6 @@ logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# asyncssh is extremely verbose at INFO level
logging.getLogger("asyncssh").setLevel(logging.WARNING)
log = logging.getLogger("infra-agent")
@@ -42,27 +41,79 @@ log = logging.getLogger("infra-agent")
# Synology Chat incoming-webhook URL; empty string when the env var is unset.
SYNOLOGY_WEBHOOK_URL = os.getenv("SYNOLOGY_INCOMING_URL", "")

# A filesystem is reported once its used_pct reaches this percentage.
DISK_WARN_PCT = 85

# Only server-grade hosts (MacBook Pro excluded — sleep is normal)
EXPECTED_TAILSCALE_HOSTS = {"sub-server", "hyungi-macmini"}

# Docker containers known to be intentionally stopped
IGNORED_CONTAINERS = {
    "hyungi_document_server-ai-gateway-1",
    "hyungi_document_server-ollama-1",
}

# Services to health-check
HEALTH_SERVICES = ["document-server", "mlx", "ollama-gpu"]

# Gemma 4 — Mac mini MLX proxy (localhost on Mac mini)
GEMMA_URL = "http://localhost:8801/v1/chat/completions"
GEMMA_MODEL = "mlx-community/gemma-4-26b-a4b-it-8bit"
GEMMA_TIMEOUT = 30  # seconds

# Debounce: consecutive cycles before alert/recovery
ALERT_THRESHOLD = 2  # 2 consecutive failures → alert
RECOVERY_THRESHOLD = 2  # 2 consecutive successes → recovery

# State file (persistent across reboots)
STATE_DIR = Path.home() / "Library" / "Application Support" / "infra-agent"
STATE_FILE = STATE_DIR / "state.json"
# --- State Management ---
def _load_state() -> dict:
    """Load the persisted issue state from STATE_FILE.

    Returns:
        The parsed state when it is a dict whose ``"issues"`` value is also a
        dict. In every other case (file missing, unreadable, not valid
        UTF-8/JSON, or of an unexpected structure) a fresh
        ``{"issues": {}}`` is returned so the caller can always proceed.
    """
    try:
        data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # First run (or state was cleared): not an error, start empty.
        return {"issues": {}}
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log.warning("State file 손상 — 초기화: %s", e)
        return {"issues": {}}

    # Callers use state["issues"] as a mapping; reject anything else here
    # instead of letting a later .get()/.values() call blow up.
    if isinstance(data, dict) and isinstance(data.get("issues"), dict):
        return data
    log.warning("State file 손상 — 초기화: %s", "unexpected structure")
    return {"issues": {}}
def _save_state(state: dict) -> None:
    """Persist ``state`` to STATE_FILE (write a sibling .tmp file, then replace).

    Best effort: any OSError (including failure to create STATE_DIR) is
    logged with its traceback and swallowed, so a failed save never aborts
    the caller.

    Args:
        state: JSON-serialisable state dict to write.
    """
    tmp = STATE_FILE.with_suffix(".tmp")
    try:
        # Directory creation is inside the try so its failure is handled the
        # same way as a failed write.
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False emits non-ASCII text verbatim, so the encoding is
        # pinned rather than left to the process locale.
        tmp.write_text(
            json.dumps(state, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        # replace() overwrites an existing target on every platform.
        tmp.replace(STATE_FILE)
    except OSError:
        log.exception("State file 저장 실패")
# --- Issue Key Generation ---
# Key format: {category}:{scope...}:{condition} — docker/disk keys carry host and target; health/tailscale/check keys carry a single name
# Examples: docker:gpu:fastapi-1:exited, disk:gpu:/:high, tailscale:sub-server:offline
def _docker_key(host: str, container: str, status: str) -> str:
    """Build the issue key for a container on ``host`` in a given ``status``."""
    key = f"docker:{host}:{container}:{status}"
    return key
def _disk_key(host: str, mount: str) -> str:
    """Build the issue key for high disk usage of ``mount`` on ``host``."""
    key = f"disk:{host}:{mount}:high"
    return key
def _health_key(service: str, status: str) -> str:
    """Build the issue key for ``service`` being in health ``status``."""
    key = f"health:{service}:{status}"
    return key
def _tailscale_key(hostname: str) -> str:
    """Build the issue key for a Tailscale host that is offline."""
    key = f"tailscale:{hostname}:offline"
    return key
def _check_key(error: str) -> str:
    """Build the issue key for a failed check; ``error`` is cut to 30 chars."""
    key = f"check:error:{error[:30]}"
    return key
# --- Gemma ---
async def generate_explanation(alerts: list[str]) -> str:
"""Ask Gemma 4 to explain alerts and suggest actions. Returns empty on failure."""
"""Ask Gemma 4 to explain alerts. Returns empty on failure."""
prompt = (
"당신은 서버 인프라 모니터링 AI입니다. "
"아래 이상 항목들을 분석해서 간결하게 설명하고 권장 조치를 알려주세요.\n\n"
@@ -81,118 +132,113 @@ async def generate_explanation(alerts: list[str]) -> str:
},
)
if resp.status_code == 200:
data = resp.json()
return data["choices"][0]["message"]["content"].strip()
return resp.json()["choices"][0]["message"]["content"].strip()
except Exception:
log.debug("Gemma 설명 생성 실패 — rule 결과만 전송", exc_info=True)
log.debug("Gemma 설명 생성 실패", exc_info=True)
return ""
# --- Alert ---
async def send_alert(message: str) -> bool:
    """Send an alert to Synology Chat via the configured incoming webhook.

    The message is prefixed with ``[infra-agent]`` and posted as a form field
    named ``payload`` holding a JSON object with a single ``text`` key.

    Args:
        message: Alert text to deliver.

    Returns:
        True only when the webhook answers HTTP 200. False when
        SYNOLOGY_WEBHOOK_URL is empty (the message is logged instead), when
        the response status is anything else, or when the request raises.
    """
    if not SYNOLOGY_WEBHOOK_URL:
        log.warning("SYNOLOGY_INCOMING_URL not set — alert skipped")
        log.info("Alert: %s", message)
        return False

    payload = json.dumps({"text": f"[infra-agent] {message}"}, ensure_ascii=False)
    try:
        # NOTE(review): verify=False disables TLS certificate verification —
        # presumably for a self-signed certificate; confirm this is intended.
        async with httpx.AsyncClient(verify=False, timeout=10.0) as client:
            resp = await client.post(SYNOLOGY_WEBHOOK_URL, data={"payload": payload})
            if resp.status_code == 200:
                log.info("Alert sent: %s", message[:100])
                return True
            log.error("Alert failed: %d", resp.status_code)
            return False
    except Exception:
        # Delivery is best effort: log the traceback and report failure.
        log.exception("Failed to send alert")
        return False
# --- Rules ---
# --- Rules → {key: description} ---
async def check_docker_rules() -> dict[str, str]:
    """Check Docker containers on the "gpu" host.

    Returns:
        ``{issue_key: human_description}`` — one entry per container that is
        not in IGNORED_CONTAINERS and is either restarting or not running,
        or a single check-failure entry when the status query itself failed
        with an ``error_type`` set.
    """
    issues: dict[str, str] = {}
    result = await docker_status("gpu")

    if not result.ok:
        # A failed query is only reported when error_type is set; otherwise
        # nothing is recorded for this cycle.
        if result.error_type:
            issues[_check_key("gpu-docker")] = f"GPU Docker 확인 실패: {result.error}"
        return issues

    for c in result.containers:
        if c.name in IGNORED_CONTAINERS:
            continue
        # "restarting" must be tested before the generic not-running branch,
        # otherwise it could never be reached.
        if c.status == "restarting":
            issues[_docker_key("gpu", c.name, "restarting")] = (
                f"GPU 컨테이너 재시작 중: {c.name}"
            )
        elif c.status != "running":
            issues[_docker_key("gpu", c.name, c.status)] = (
                f"GPU 컨테이너 다운: {c.name} ({c.status})"
            )
    return issues
async def check_disk_rules() -> dict[str, str]:
    """Check disk usage on the "gpu" and "macmini" hosts.

    Returns:
        ``{issue_key: human_description}`` — one entry per filesystem whose
        ``used_pct`` is at or above DISK_WARN_PCT, plus one check-failure
        entry for each host whose usage query did not succeed.
    """
    issues: dict[str, str] = {}
    for host in ["gpu", "macmini"]:
        result = await disk_usage(host)
        if not result.ok:
            issues[_check_key(f"{host}-disk")] = f"{host} 디스크 확인 실패: {result.error}"
            continue
        for fs in result.filesystems:
            if fs.used_pct >= DISK_WARN_PCT:
                issues[_disk_key(host, fs.mount)] = (
                    f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% ({fs.used}/{fs.total})"
                )
    return issues
async def check_health_rules() -> dict[str, str]:
    """Check the health of every service listed in HEALTH_SERVICES.

    Returns:
        ``{issue_key: human_description}`` — a "down" entry for each service
        whose health result is not ok, and a "degraded" entry for each
        service that is ok but reports status ``"degraded"``.
    """
    issues: dict[str, str] = {}
    for svc in HEALTH_SERVICES:
        result = await service_health(svc)
        if not result.ok:
            detail = result.error or result.status
            desc = f"서비스 다운: {svc}"
            # Separate the detail from the service name, and leave it out
            # entirely when neither error nor status carries any text.
            if detail:
                desc += f" — {detail}"
            issues[_health_key(svc, "down")] = desc
        elif result.status == "degraded":
            issues[_health_key(svc, "degraded")] = f"서비스 저하: {svc}"
    return issues
async def check_network_rules() -> dict[str, str]:
    """Check that every host in EXPECTED_TAILSCALE_HOSTS is online.

    Returns:
        ``{issue_key: human_description}`` — one entry per expected host that
        is not among the peers whose status differs from ``"offline"``, or a
        single check-failure entry when the Tailscale query did not succeed.
    """
    issues: dict[str, str] = {}
    result = await tailscale_status()
    if not result.ok:
        issues[_check_key("tailscale")] = f"Tailscale 확인 실패: {result.error}"
        return issues

    online_hosts = {p.hostname for p in result.peers if p.status != "offline"}
    # Sorted so issues (and therefore alert lines) come out in a stable order
    # instead of set-iteration order.
    for expected in sorted(EXPECTED_TAILSCALE_HOSTS):
        if expected not in online_hosts:
            issues[_tailscale_key(expected)] = f"Tailscale 오프라인: {expected}"
    return issues
# --- Main ---
# --- State Machine ---
def _advance_issue_state(
    current_issues: dict[str, str], issues_state: dict
) -> tuple[list[str], list[str]]:
    """Apply one cycle of debounce bookkeeping to ``issues_state`` in place.

    Each tracked entry has the shape
    ``{"fail_count": int, "success_count": int, "alerted": bool, "desc": str}``.

    Args:
        current_issues: ``{issue_key: description}`` observed this cycle.
        issues_state: Persisted per-key entries; mutated in place.

    Returns:
        ``(alert_keys, recovery_keys)``. ``alert_keys`` are keys that have
        reached ALERT_THRESHOLD consecutive failures and are not yet marked
        alerted. ``recovery_keys`` are alerted keys that have reached
        RECOVERY_THRESHOLD consecutive successes. Neither the ``alerted``
        flag nor the removal of recovered keys is applied here — the caller
        does that once the notification has actually been delivered.
    """
    alert_keys: list[str] = []
    for key, desc in current_issues.items():
        entry = issues_state.get(key)
        if not isinstance(entry, dict):
            entry = {"fail_count": 0, "success_count": 0, "alerted": False, "desc": ""}
        # Failure observed: bump the failure streak, reset the success streak.
        entry["fail_count"] = entry.get("fail_count", 0) + 1
        entry["success_count"] = 0
        entry["desc"] = desc
        if entry["fail_count"] >= ALERT_THRESHOLD and not entry.get("alerted"):
            alert_keys.append(key)
        issues_state[key] = entry

    recovery_keys: list[str] = []
    # Iterate over a snapshot because entries may be deleted below.
    for key in list(issues_state):
        if key in current_issues:
            continue
        entry = issues_state[key]
        if not isinstance(entry, dict):
            del issues_state[key]
            continue
        # Not seen this cycle: bump the success streak, reset the failure streak.
        entry["success_count"] = entry.get("success_count", 0) + 1
        entry["fail_count"] = 0
        if entry["success_count"] < RECOVERY_THRESHOLD:
            continue
        if entry.get("alerted"):
            recovery_keys.append(key)
        else:
            # Never alerted, so there is nothing to announce: just forget it.
            del issues_state[key]
    return alert_keys, recovery_keys


async def run_checks() -> None:
    """Run all checks, compare with persisted state, and alert on change only.

    Steps: run the four rule checks concurrently, merge their issues, advance
    the debounce state, then send one message for newly confirmed issues and
    one for recoveries. An issue is marked alerted (and a recovered issue is
    dropped from state) only after its message was delivered, so a failed
    delivery is retried on the next cycle. When no webhook URL is configured
    the log output written by send_alert counts as delivery.
    """
    now = datetime.now(timezone.utc).isoformat()
    log.info("=== 상태 수집 시작 (%s) ===", now)

    # Names are paired with results below to build a stable key when a check
    # raises; the order must match the gather() arguments.
    check_names = ("docker", "disk", "health", "network")
    results = await asyncio.gather(
        check_docker_rules(),
        check_disk_rules(),
        check_health_rules(),
        check_network_rules(),
        return_exceptions=True,
    )

    # Collect all current issues: {key: description}
    current_issues: dict[str, str] = {}
    for name, result in zip(check_names, results):
        if isinstance(result, BaseException):
            # Key on the check name, not the exception text: message text can
            # vary between cycles and would prevent the failure streak from
            # ever reaching the alert threshold.
            current_issues[_check_key(f"{name}-exception")] = f"체크 실패: {result}"
        else:
            current_issues.update(result)

    state = _load_state()
    issues_state = state.get("issues")
    if not isinstance(issues_state, dict):
        issues_state = {}

    alert_keys, recovery_keys = _advance_issue_state(current_issues, issues_state)
    new_alerts = [issues_state[k]["desc"] for k in alert_keys]
    recovery_alerts = [
        f"정상 복구: {issues_state[k].get('desc', k)}" for k in recovery_keys
    ]

    # Persist the updated counters before any network I/O.
    _save_state({"issues": issues_state})

    state_changed = False

    if new_alerts:
        log.warning("새 이상 %d건 (debounce 확정):", len(new_alerts))
        for a in new_alerts:
            log.warning(" - %s", a)
        explanation = await generate_explanation(new_alerts)
        lines = [f"이상 감지 {len(new_alerts)}건:"]
        lines.extend(f"- {a}" for a in new_alerts)
        if explanation:
            lines.append(f"\n분석: {explanation}")
        if await send_alert("\n".join(lines)) or not SYNOLOGY_WEBHOOK_URL:
            for k in alert_keys:
                issues_state[k]["alerted"] = True
            state_changed = True

    if recovery_alerts:
        log.info("복구 %d건:", len(recovery_alerts))
        for a in recovery_alerts:
            log.info(" - %s", a)
        if await send_alert("\n".join(recovery_alerts)) or not SYNOLOGY_WEBHOOK_URL:
            for k in recovery_keys:
                issues_state.pop(k, None)
            state_changed = True

    if state_changed:
        _save_state({"issues": issues_state})

    if not new_alerts and not recovery_alerts:
        if any(e.get("alerted") for e in issues_state.values()):
            log.info("전체 정상 (관찰 중 %d건)", len(issues_state))
        else:
            log.info("전체 정상")
def main():