Files
gpu-services/infra/agent.py
T
Hyungi Ahn cc2c9467fe fix(infra-agent): mute mlx alerts during KST 0-7h backfill window
Document Server tier_backfill 가 KST 0~6시 사이 26B 에 batch enqueue 하면서
/v1/models 응답이 5~10초 lock 돼 healthcheck timeout 알람이 반복 발생.
정책 의도(야간=batch 점유 시간)와 healthcheck SLA(24/7 동일) 불일치 해결.

- KST 0~7시 (정책 0~6 + 잔여 처리 1h buffer) 는 mlx down/degraded 를 log-only 로 격하
- 주간 timeout 은 그대로 알람 (실사용자 영향 시그널 보존)
- 다른 서비스 (document-server, ollama-gpu) 는 영향 없음

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 07:34:04 +09:00

354 lines
12 KiB
Python

"""Infra Monitoring Agent — rule-first, Gemma 2nd, alert-on-change.
5분마다 실행되어:
1. core/ 함수로 상태 수집
2. Rule 기반 1차 판정 → stable issue key 생성
3. 상태 파일과 비교 (debounce: 2회 연속 기준)
4. 새 이슈 → Gemma 설명 + 시놀로지 Chat 알림
5. 복구 → 복구 알림
6. 동일 이슈 지속 → 무음
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
import httpx
from dotenv import load_dotenv
from infra.core.docker import docker_status
from infra.core.health import service_health
from infra.core.system import disk_usage
from infra.core.network import tailscale_status
# Load variables from the .env file that sits next to this module. This must
# run before the Config section below reads os.getenv().
load_dotenv(Path(__file__).parent / ".env")
# Root logging setup: INFO level, "<timestamp> [LEVEL] message" lines.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Only WARNING and above from the "asyncssh" logger are emitted.
logging.getLogger("asyncssh").setLevel(logging.WARNING)
# Logger used by every function in this module.
log = logging.getLogger("infra-agent")
# --- Config ---
# Synology Chat incoming-webhook URL, taken from the SYNOLOGY_INCOMING_URL
# environment variable. When empty, send_alert() logs the message and skips
# delivery.
SYNOLOGY_WEBHOOK_URL = os.getenv("SYNOLOGY_INCOMING_URL", "")
# check_disk_rules() reports any filesystem whose used_pct is >= this value.
DISK_WARN_PCT = 85
# Only server-grade hosts (MacBook Pro excluded — sleep is normal)
EXPECTED_TAILSCALE_HOSTS = {"sub-server", "hyungi-macmini"}
# Container names that check_docker_rules() skips regardless of their status.
IGNORED_CONTAINERS = {
"hyungi_document_server-ai-gateway-1",
"hyungi_document_server-ollama-1",
}
# Service names passed one by one to service_health() in check_health_rules().
HEALTH_SERVICES = ["document-server", "mlx", "ollama-gpu"]
# Chat-completions endpoint and model used by generate_explanation().
GEMMA_URL = "http://localhost:8801/v1/chat/completions"
GEMMA_MODEL = "mlx-community/gemma-4-26b-a4b-it-8bit"
# Passed as the httpx client timeout for the Gemma request.
GEMMA_TIMEOUT = 30
# Debounce: consecutive cycles before alert/recovery
ALERT_THRESHOLD = 2 # 2 consecutive failures → alert
RECOVERY_THRESHOLD = 2 # 2 consecutive successes → recovery
# State file (persistent across reboots)
STATE_DIR = Path.home() / "Library" / "Application Support" / "infra-agent"
STATE_FILE = STATE_DIR / "state.json"
# --- State Management ---
def _load_state() -> dict:
    """Load the persisted issue state from ``STATE_FILE``.

    Returns:
        The parsed state when the file contains a JSON object whose
        ``"issues"`` value is itself an object. In every other case (file
        missing, unreadable, not valid UTF-8/JSON, or of the wrong shape) a
        fresh ``{"issues": {}}`` is returned. This function never raises for
        those conditions.
    """
    try:
        raw = STATE_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        # First run: nothing persisted yet, so there is nothing to warn about.
        return {"issues": {}}
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError raised while decoding the file.
        log.warning("State file 손상 — 초기화: %s", e)
        return {"issues": {}}

    try:
        data = json.loads(raw)
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        log.warning("State file 손상 — 초기화: %s", e)
        return {"issues": {}}

    # run_checks() calls .get() on the "issues" value, so anything other than
    # a dict there would crash the whole monitoring cycle.
    if isinstance(data, dict) and isinstance(data.get("issues"), dict):
        return data
    log.warning("State file 형식 오류 — 초기화")
    return {"issues": {}}
def _save_state(state: dict) -> None:
    """Persist *state* to ``STATE_FILE`` by writing a temp file and replacing.

    The JSON is first written to a sibling ``.tmp`` file and then moved over
    ``STATE_FILE`` so a reader never sees a half-written state file.

    Args:
        state: JSON-serialisable state, normally ``{"issues": {...}}``.

    Filesystem errors (including failure to create ``STATE_DIR``) are logged
    and swallowed so the caller can still go on to send its alerts.
    """
    tmp = STATE_FILE.with_suffix(".tmp")
    try:
        # Kept inside the try block: a directory error must not abort the
        # monitoring cycle before alerts go out.
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False emits raw non-ASCII text, so pin the encoding.
        tmp.write_text(
            json.dumps(state, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        # Path.replace overwrites an existing target unconditionally.
        tmp.replace(STATE_FILE)
    except OSError:
        log.exception("State file 저장 실패")
# --- Issue Key Generation ---
# Key format: {category}:{host}:{target}:{condition}
# Examples: docker:gpu:fastapi-1:exited, disk:gpu:/:high, tailscale:sub-server:offline
def _docker_key(host: str, container: str, status: str) -> str:
return f"docker:{host}:{container}:{status}"
def _disk_key(host: str, mount: str) -> str:
return f"disk:{host}:{mount}:high"
def _health_key(service: str, status: str) -> str:
return f"health:{service}:{status}"
def _tailscale_key(hostname: str) -> str:
return f"tailscale:{hostname}:offline"
def _check_key(error: str) -> str:
return f"check:error:{error[:30]}"
# --- Gemma ---
async def generate_explanation(alerts: list[str]) -> str:
    """Ask the Gemma endpoint for a short analysis of *alerts*.

    Args:
        alerts: Human-readable issue descriptions; each becomes one bullet
            in the prompt.

    Returns:
        The stripped content of the first choice when the endpoint answers
        with HTTP 200. An empty string on any other status or on any
        exception (the failure is logged at DEBUG level only).
    """
    bullets = "\n".join(f"- {a}" for a in alerts)
    intro = (
        "당신은 서버 인프라 모니터링 AI입니다. "
        "아래 이상 항목들을 분석해서 간결하게 설명하고 권장 조치를 알려주세요.\n\n"
        "이상 항목:\n"
    )
    outro = (
        "\n\n"
        "형식: 1~3문장으로 원인 분석 + 권장 조치. 한국어로."
    )
    request_body = {
        "model": GEMMA_MODEL,
        "messages": [{"role": "user", "content": intro + bullets + outro}],
        "max_tokens": 200,
        "temperature": 0.3,
    }
    try:
        async with httpx.AsyncClient(timeout=GEMMA_TIMEOUT) as client:
            resp = await client.post(GEMMA_URL, json=request_body)
            if resp.status_code == 200:
                answer = resp.json()
                return answer["choices"][0]["message"]["content"].strip()
    except Exception:
        # Best effort: the explanation is optional, so never let it break alerting.
        log.debug("Gemma 설명 생성 실패", exc_info=True)
    return ""
# --- Alert ---
async def send_alert(message: str) -> bool:
    """Post *message* to the Synology Chat webhook.

    Args:
        message: Alert text; it is prefixed with ``[infra-agent]`` before sending.

    Returns:
        True when the webhook answers HTTP 200. False when no webhook URL is
        configured, the response status is not 200, or the request raises.
    """
    if not SYNOLOGY_WEBHOOK_URL:
        log.warning("SYNOLOGY_INCOMING_URL not set — alert skipped")
        log.info("Alert: %s", message)
        return False

    body = {"text": f"[infra-agent] {message}"}
    form = {"payload": json.dumps(body, ensure_ascii=False)}
    try:
        # NOTE(review): verify=False turns off TLS certificate verification —
        # presumably because the webhook host uses a self-signed certificate; confirm.
        async with httpx.AsyncClient(verify=False, timeout=10.0) as client:
            resp = await client.post(SYNOLOGY_WEBHOOK_URL, data=form)
        delivered = resp.status_code == 200
        if delivered:
            log.info("Alert sent: %s", message[:100])
        else:
            log.error("Alert failed: %d", resp.status_code)
        return delivered
    except Exception:
        log.exception("Failed to send alert")
        return False
# --- Rules → {key: description} ---
async def check_docker_rules() -> dict[str, str]:
    """Evaluate container rules for the ``gpu`` host.

    Returns:
        ``{issue_key: human_description}``. If the status query itself
        reports an error, the mapping holds a single check-failure entry.
        Otherwise it holds one entry per container that is neither ignored
        nor in the ``running`` state.
    """
    result = await docker_status("gpu")
    if result.error_type:
        return {_check_key("gpu-docker"): f"GPU Docker 확인 실패: {result.error}"}

    found: dict[str, str] = {}
    for container in result.containers:
        name = container.name
        if name in IGNORED_CONTAINERS:
            continue
        state = container.status
        if state == "running":
            continue
        # "restarting" gets its own wording; every other state counts as down.
        if state == "restarting":
            description = f"GPU 컨테이너 재시작 중: {name}"
        else:
            description = f"GPU 컨테이너 다운: {name} ({state})"
        found[_docker_key("gpu", name, state)] = description
    return found
async def check_disk_rules() -> dict[str, str]:
    """Evaluate disk-usage rules for the ``gpu`` and ``macmini`` hosts.

    Returns:
        ``{issue_key: human_description}`` with one entry per filesystem whose
        ``used_pct`` is at or above ``DISK_WARN_PCT``, plus one check-failure
        entry for each host whose usage query did not succeed.
    """
    found: dict[str, str] = {}
    for host in ("gpu", "macmini"):
        usage = await disk_usage(host)
        if usage.ok:
            found.update(
                {
                    _disk_key(host, fs.mount): (
                        f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% ({fs.used}/{fs.total})"
                    )
                    for fs in usage.filesystems
                    if fs.used_pct >= DISK_WARN_PCT
                }
            )
        else:
            # The query failed for this host; record it and move on to the next.
            found[_check_key(f"{host}-disk")] = f"{host} 디스크 확인 실패: {usage.error}"
    return found
async def check_health_rules() -> dict[str, str]:
    """Evaluate health rules for every service in ``HEALTH_SERVICES``.

    Returns:
        ``{issue_key: human_description}`` with a ``down`` entry for each
        service whose health result is not ok and a ``degraded`` entry for
        each service reporting status ``"degraded"``. ``mlx`` problems seen
        between 00:00 and 06:59 KST are logged only and not returned.
    """
    issues: dict[str, str] = {}
    # KST 00-07h is the Document Server tier_backfill window (policy 0-6h plus
    # a 1h buffer for leftover work). While the 26B model is busy with batch
    # jobs, /v1/models locking up for 5-10 s is expected, so only mlx alerts
    # are downgraded to log-only.
    # Policy source: ~/Documents/code/hyungi_Document_Server/app/workers/tier_backfill.py
    # (NIGHT_START/END_HOUR).
    # NOTE(review): a muted key is simply absent from the result, and
    # run_checks() counts an absent key as a success. An mlx issue that was
    # already alerted before midnight will therefore produce a "recovery"
    # message shortly after the window opens — confirm this is acceptable.
    kst_hour = datetime.now(tz=ZoneInfo("Asia/Seoul")).hour
    is_backfill_window = 0 <= kst_hour < 7

    for svc in HEALTH_SERVICES:
        result = await service_health(svc)
        if not result.ok:
            condition = "down"
        elif result.status == "degraded":
            condition = "degraded"
        else:
            continue

        if svc == "mlx" and is_backfill_window:
            log.info("[mute] mlx %s — KST %d시 backfill window", condition, kst_hour)
            continue

        if condition == "down":
            detail = result.error or result.status
            # Keep the service name and the detail visually separate, and do
            # not print an empty or "None" tail when no detail is available.
            description = f"서비스 다운: {svc} ({detail})" if detail else f"서비스 다운: {svc}"
        else:
            description = f"서비스 저하: {svc}"
        issues[_health_key(svc, condition)] = description
    return issues
async def check_network_rules() -> dict[str, str]:
    """Evaluate Tailscale rules for the hosts in ``EXPECTED_TAILSCALE_HOSTS``.

    Returns:
        ``{issue_key: human_description}``. If the Tailscale status query
        fails, the mapping holds a single check-failure entry. Otherwise it
        holds one entry per expected host that is not among the peers whose
        status differs from ``"offline"``.
    """
    status = await tailscale_status()
    if not status.ok:
        return {_check_key("tailscale"): f"Tailscale 확인 실패: {status.error}"}

    reachable = {peer.hostname for peer in status.peers if peer.status != "offline"}
    return {
        _tailscale_key(host): f"Tailscale 오프라인: {host}"
        for host in EXPECTED_TAILSCALE_HOSTS
        if host not in reachable
    }
# --- State Machine ---
async def _collect_current_issues() -> dict[str, str]:
    """Run all rule checks concurrently and merge their ``{key: description}`` results."""
    checks = (
        ("docker", check_docker_rules()),
        ("disk", check_disk_rules()),
        ("health", check_health_rules()),
        ("network", check_network_rules()),
    )
    results = await asyncio.gather(
        *(coro for _, coro in checks),
        return_exceptions=True,
    )
    current: dict[str, str] = {}
    for (name, _), result in zip(checks, results):
        # BaseException, not Exception: with return_exceptions=True a child's
        # CancelledError is returned as a result too, and it must not reach
        # dict.update().
        if isinstance(result, BaseException):
            # Key on check name + exception type rather than the message:
            # messages can vary from run to run (or be empty), which would
            # prevent the same key from recurring and so defeat the debounce.
            key = _check_key(f"{name}-{type(result).__name__}")
            current[key] = f"체크 실패: {name} — {type(result).__name__}: {result}"
        else:
            current.update(result)
    return current


def _apply_debounce(
    issues_state: dict,
    current_issues: dict[str, str],
) -> tuple[list[str], list[str]]:
    """Advance the per-issue counters in place and return ``(new_alerts, recovery_alerts)``.

    Each state entry has the shape
    ``{"fail_count": int, "success_count": int, "alerted": bool, "desc": str}``.
    """
    new_alerts: list[str] = []
    recovery_alerts: list[str] = []

    # Issues present this cycle: count a failure and reset the success streak.
    for key, desc in current_issues.items():
        entry = issues_state.get(
            key,
            {"fail_count": 0, "success_count": 0, "alerted": False, "desc": ""},
        )
        entry["fail_count"] = entry.get("fail_count", 0) + 1
        entry["success_count"] = 0
        entry["desc"] = desc
        if entry["fail_count"] >= ALERT_THRESHOLD and not entry.get("alerted"):
            new_alerts.append(desc)
            entry["alerted"] = True
        issues_state[key] = entry

    # Issues tracked in state but absent this cycle: count a success.
    # The key list is materialised first because entries are deleted below.
    for key in [k for k in issues_state if k not in current_issues]:
        entry = issues_state[key]
        entry["success_count"] = entry.get("success_count", 0) + 1
        entry["fail_count"] = 0
        if entry["success_count"] < RECOVERY_THRESHOLD:
            continue
        # Only announce recovery for issues that were actually alerted.
        if entry.get("alerted"):
            recovery_alerts.append(f"정상 복구: {entry.get('desc', key)}")
        del issues_state[key]

    return new_alerts, recovery_alerts


async def run_checks() -> None:
    """Run one monitoring cycle: collect issues, debounce against state, alert on change.

    Steps:
        1. Run all rule checks and merge their issues.
        2. Load the persisted state and update the per-issue counters.
        3. Save the state.
        4. Send one message for newly confirmed issues (with an optional
           Gemma explanation) and one message for recovered issues.
        5. If nothing changed, log a summary line only.

    NOTE(review): state is saved, with ``alerted`` already set, before the
    alert is sent. If ``send_alert`` fails, that alert is not retried on
    later cycles — confirm this is intended.
    """
    now = datetime.now(timezone.utc).isoformat()
    log.info("=== 상태 수집 시작 (%s) ===", now)

    current_issues = await _collect_current_issues()

    state = _load_state()
    issues_state = state.get("issues", {})
    new_alerts, recovery_alerts = _apply_debounce(issues_state, current_issues)
    _save_state({"issues": issues_state})

    if new_alerts:
        log.warning("새 이상 %d건 (debounce 확정):", len(new_alerts))
        for a in new_alerts:
            log.warning(" - %s", a)
        explanation = await generate_explanation(new_alerts)
        lines = [f"이상 감지 {len(new_alerts)}건:"]
        lines.extend(f"- {a}" for a in new_alerts)
        if explanation:
            lines.append(f"\n분석: {explanation}")
        await send_alert("\n".join(lines))

    if recovery_alerts:
        log.info("복구 %d건:", len(recovery_alerts))
        for a in recovery_alerts:
            log.info(" - %s", a)
        await send_alert("\n".join(recovery_alerts))

    if not new_alerts and not recovery_alerts:
        # NOTE(review): the branch is chosen by the number of alerted entries,
        # but the logged count is the number of all tracked entries — confirm
        # which of the two is intended.
        active = sum(1 for e in issues_state.values() if e.get("alerted"))
        if active:
            log.info("전체 정상 (관찰 중 %d건)", len(issues_state))
        else:
            log.info("전체 정상")
def main():
    """Synchronous entry point: run a single monitoring cycle to completion."""
    cycle = run_checks()
    asyncio.run(cycle)


if __name__ == "__main__":
    main()