cc2c9467fe
Document Server tier_backfill 가 KST 0~6시 사이 26B 에 batch enqueue 하면서 /v1/models 응답이 5~10초 lock 돼 healthcheck timeout 알람이 반복 발생. 정책 의도(야간=batch 점유 시간)와 healthcheck SLA(24/7 동일) 불일치 해결. - KST 0~7시 (정책 0~6 + 잔여 처리 1h buffer) 는 mlx down/degraded 를 log-only 로 격하 - 주간 timeout 은 그대로 알람 (실사용자 영향 시그널 보존) - 다른 서비스 (document-server, ollama-gpu) 는 영향 없음 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
354 lines
12 KiB
Python
354 lines
12 KiB
Python
"""Infra Monitoring Agent — rule-first, Gemma 2nd, alert-on-change.
|
|
|
|
5분마다 실행되어:
|
|
1. core/ 함수로 상태 수집
|
|
2. Rule 기반 1차 판정 → stable issue key 생성
|
|
3. 상태 파일과 비교 (debounce: 2회 연속 기준)
|
|
4. 새 이슈 → Gemma 설명 + 시놀로지 Chat 알림
|
|
5. 복구 → 복구 알림
|
|
6. 동일 이슈 지속 → 무음
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
|
|
from infra.core.docker import docker_status
|
|
from infra.core.health import service_health
|
|
from infra.core.system import disk_usage
|
|
from infra.core.network import tailscale_status
|
|
|
|
# Pull settings from the ".env" file that sits in the same directory as this module.
load_dotenv(Path(__file__).with_name(".env"))

# Root logging: INFO and above, timestamped "YYYY-MM-DD HH:MM:SS [LEVEL] message".
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Only warnings and above are let through from the "asyncssh" logger.
logging.getLogger("asyncssh").setLevel(logging.WARNING)

# Logger used by everything in this module.
log = logging.getLogger("infra-agent")
|
|
|
|
# --- Config ---

# Synology Chat incoming-webhook URL; an empty string means "not configured".
SYNOLOGY_WEBHOOK_URL = os.environ.get("SYNOLOGY_INCOMING_URL", "")

# A filesystem is reported once its used percentage reaches this value.
DISK_WARN_PCT = 85

# Only server-grade hosts (MacBook Pro excluded — sleep is normal)
EXPECTED_TAILSCALE_HOSTS = {
    "sub-server",
    "hyungi-macmini",
}

# Containers that the docker rule check skips entirely.
IGNORED_CONTAINERS = {
    "hyungi_document_server-ai-gateway-1",
    "hyungi_document_server-ollama-1",
}

# Service names handed to the health check, in probing order.
HEALTH_SERVICES = ["document-server", "mlx", "ollama-gpu"]

# Chat-completions endpoint, model id and request timeout used to ask Gemma
# for an explanation of new alerts.
GEMMA_URL = "http://localhost:8801/v1/chat/completions"
GEMMA_MODEL = "mlx-community/gemma-4-26b-a4b-it-8bit"
GEMMA_TIMEOUT = 30

# Debounce: number of consecutive cycles required before acting.
ALERT_THRESHOLD = 2  # 2 consecutive failures → alert
RECOVERY_THRESHOLD = 2  # 2 consecutive successes → recovery

# State file (persistent across reboots)
STATE_DIR = Path.home().joinpath("Library", "Application Support", "infra-agent")
STATE_FILE = STATE_DIR.joinpath("state.json")
|
|
|
|
|
|
# --- State Management ---
|
|
|
|
def _load_state() -> dict:
    """Load the persisted issue state from ``STATE_FILE``.

    The file is only trusted when it holds a JSON object whose ``"issues"``
    value is itself an object. Entries inside ``"issues"`` that are not
    objects are dropped, because the state machine reads and writes counters
    on every entry and would otherwise crash on each cycle.

    Returns:
        The parsed state (with sanitized ``"issues"``), or a fresh
        ``{"issues": {}}`` when the file is missing, unreadable, not valid
        UTF-8/JSON, or has an unexpected structure. Never raises for those
        cases.
    """
    try:
        if STATE_FILE.exists():
            data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
            if isinstance(data, dict) and isinstance(data.get("issues"), dict):
                issues = {
                    key: entry
                    for key, entry in data["issues"].items()
                    if isinstance(entry, dict)
                }
                return {**data, "issues": issues}
            log.warning("State file 손상 — 초기화: %s", "unexpected structure")
    # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
    except (ValueError, OSError) as e:
        log.warning("State file 손상 — 초기화: %s", e)
    return {"issues": {}}
|
|
|
|
|
|
def _save_state(state: dict) -> None:
    """Persist *state* to ``STATE_FILE`` atomically (write tmp + replace).

    The JSON is written to a sibling ``.tmp`` file first and then moved over
    the real file, so a reader never observes a half-written state file.

    Args:
        state: JSON-serializable state dictionary.

    Any ``OSError`` (including failure to create the state directory) is
    logged and swallowed so that a storage problem does not abort the
    monitoring cycle.
    """
    tmp = STATE_FILE.with_suffix(".tmp")
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False emits non-ASCII text verbatim, so the encoding is
        # pinned instead of depending on the process locale.
        tmp.write_text(
            json.dumps(state, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        # replace() overwrites an existing target on every platform.
        tmp.replace(STATE_FILE)
    except OSError:
        log.exception("State file 저장 실패")
|
|
|
|
|
|
# --- Issue Key Generation ---
|
|
# Key format: {category}:{host}:{target}:{condition}
|
|
# Examples: docker:gpu:fastapi-1:exited, disk:gpu:/:high, tailscale:sub-server:offline
|
|
|
|
def _docker_key(host: str, container: str, status: str) -> str:
    """Return the issue key ``docker:{host}:{container}:{status}``."""
    return "docker:{}:{}:{}".format(host, container, status)
|
|
|
|
def _disk_key(host: str, mount: str) -> str:
    """Return the issue key ``disk:{host}:{mount}:high``."""
    return "disk:{}:{}:high".format(host, mount)
|
|
|
|
def _health_key(service: str, status: str) -> str:
    """Return the issue key ``health:{service}:{status}``."""
    return "health:{}:{}".format(service, status)
|
|
|
|
def _tailscale_key(hostname: str) -> str:
    """Return the issue key ``tailscale:{hostname}:offline``."""
    return "tailscale:{}:offline".format(hostname)
|
|
|
|
def _check_key(error: str) -> str:
    """Return the issue key ``check:error:{error}``, keeping at most 30 characters of *error*."""
    truncated = error[:30]
    return "check:error:{}".format(truncated)
|
|
|
|
|
|
# --- Gemma ---
|
|
|
|
async def generate_explanation(alerts: list[str]) -> str:
    """Ask the Gemma endpoint for a short explanation of *alerts*.

    Args:
        alerts: Human-readable alert descriptions; each becomes one bullet in
            the prompt.

    Returns:
        The stripped text of the first completion choice, or an empty string
        when the response status is not 200 or anything goes wrong (the
        failure is logged at debug level only — this step is best-effort).
    """
    bullet_list = "\n".join("- {}".format(item) for item in alerts)
    prompt = "".join(
        [
            "당신은 서버 인프라 모니터링 AI입니다. ",
            "아래 이상 항목들을 분석해서 간결하게 설명하고 권장 조치를 알려주세요.\n\n",
            "이상 항목:\n",
            bullet_list,
            "\n\n",
            "형식: 1~3문장으로 원인 분석 + 권장 조치. 한국어로.",
        ]
    )
    request_body = {
        "model": GEMMA_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 200,
        "temperature": 0.3,
    }

    try:
        async with httpx.AsyncClient(timeout=GEMMA_TIMEOUT) as http:
            reply = await http.post(GEMMA_URL, json=request_body)
            if reply.status_code != 200:
                return ""
            first_choice = reply.json()["choices"][0]
            return first_choice["message"]["content"].strip()
    except Exception:
        # Any transport or response-shape problem degrades to "no explanation".
        log.debug("Gemma 설명 생성 실패", exc_info=True)
        return ""
|
|
|
|
|
|
# --- Alert ---
|
|
|
|
async def send_alert(message: str) -> bool:
    """Post *message* to the Synology Chat incoming webhook.

    Args:
        message: Alert text; it is sent prefixed with ``[infra-agent]``.

    Returns:
        ``True`` only when the webhook answered with HTTP 200. ``False`` when
        no webhook URL is configured (the message is logged instead), when the
        webhook answered with another status, or when the request raised.
    """
    if SYNOLOGY_WEBHOOK_URL:
        text = "[infra-agent] {}".format(message)
        form = {"payload": json.dumps({"text": text}, ensure_ascii=False)}
        try:
            # TLS certificate verification is switched off for this request.
            # NOTE(review): presumably the chat server uses a self-signed
            # certificate — confirm this is still required.
            async with httpx.AsyncClient(verify=False, timeout=10.0) as http:
                reply = await http.post(SYNOLOGY_WEBHOOK_URL, data=form)
                delivered = reply.status_code == 200
                if delivered:
                    log.info("Alert sent: %s", message[:100])
                else:
                    log.error("Alert failed: %d", reply.status_code)
                return delivered
        except Exception:
            log.exception("Failed to send alert")
            return False

    # No webhook configured: keep the alert visible in the log at least.
    log.warning("SYNOLOGY_INCOMING_URL not set — alert skipped")
    log.info("Alert: %s", message)
    return False
|
|
|
|
|
|
# --- Rules → {key: description} ---
|
|
|
|
async def check_docker_rules() -> dict[str, str]:
    """Inspect the containers on the ``gpu`` host.

    Returns:
        ``{issue_key: human_description}``. When the status query itself
        reports an error, the only entry is a check-failure issue. Otherwise
        there is one entry per container that is not in ``IGNORED_CONTAINERS``
        and whose status is anything other than ``"running"``; restarting
        containers get their own description.
    """
    snapshot = await docker_status("gpu")
    if snapshot.error_type:
        return {_check_key("gpu-docker"): f"GPU Docker 확인 실패: {snapshot.error}"}

    found: dict[str, str] = {}
    for ctr in snapshot.containers:
        if ctr.name in IGNORED_CONTAINERS or ctr.status == "running":
            continue
        if ctr.status == "restarting":
            found[_docker_key("gpu", ctr.name, "restarting")] = (
                f"GPU 컨테이너 재시작 중: {ctr.name}"
            )
        else:
            found[_docker_key("gpu", ctr.name, ctr.status)] = (
                f"GPU 컨테이너 다운: {ctr.name} ({ctr.status})"
            )
    return found
|
|
|
|
|
|
async def check_disk_rules() -> dict[str, str]:
    """Check disk usage on the ``gpu`` and ``macmini`` hosts, one after the other.

    Returns:
        ``{issue_key: human_description}`` containing a check-failure entry
        for each host whose query was not ok, and one entry per filesystem
        whose used percentage is at or above ``DISK_WARN_PCT``.
    """
    found: dict[str, str] = {}
    for host in ("gpu", "macmini"):
        usage = await disk_usage(host)
        if usage.ok:
            found.update(
                {
                    _disk_key(host, fs.mount): (
                        f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% ({fs.used}/{fs.total})"
                    )
                    for fs in usage.filesystems
                    if fs.used_pct >= DISK_WARN_PCT
                }
            )
        else:
            found[_check_key(f"{host}-disk")] = f"{host} 디스크 확인 실패: {usage.error}"
    return found
|
|
|
|
|
|
async def check_health_rules() -> dict[str, str]:
    """Probe every service in ``HEALTH_SERVICES`` and report the unhealthy ones.

    Returns:
        ``{issue_key: human_description}`` with a ``down`` entry for each
        service whose health result is not ok, and a ``degraded`` entry for
        each service that is ok but reports status ``"degraded"``.

    KST 00:00-06:59 is the Document Server tier_backfill window (policy 0-6 h
    plus a 1 h buffer for leftover work). While the 26B model is occupied by
    batch jobs it is normal for /v1/models to stall for 5-10 s, so during that
    window mlx problems are only logged and not reported. Other services are
    not affected.
    Policy source:
    ~/Documents/code/hyungi_Document_Server/app/workers/tier_backfill.py
    (NIGHT_START/END_HOUR).

    NOTE(review): a muted mlx problem is simply left out of the result. The
    caller appears to treat an absent key as a healthy cycle, so an mlx issue
    that was already alerted before the window starts may produce a recovery
    message while mlx is still unhealthy — confirm this is intended.
    """
    kst_hour = datetime.now(tz=ZoneInfo("Asia/Seoul")).hour
    in_backfill_window = kst_hour in range(7)  # hours 0..6 inclusive

    found: dict[str, str] = {}
    for service in HEALTH_SERVICES:
        outcome = await service_health(service)
        muted = service == "mlx" and in_backfill_window

        if not outcome.ok:
            if muted:
                log.info("[mute] mlx down — KST %d시 backfill window", kst_hour)
            else:
                found[_health_key(service, "down")] = (
                    f"서비스 다운: {service} — {outcome.error or outcome.status}"
                )
            continue

        if outcome.status == "degraded":
            if muted:
                log.info("[mute] mlx degraded — KST %d시 backfill window", kst_hour)
            else:
                found[_health_key(service, "degraded")] = f"서비스 저하: {service}"
    return found
|
|
|
|
|
|
async def check_network_rules() -> dict[str, str]:
    """Verify that every host in ``EXPECTED_TAILSCALE_HOSTS`` is online.

    Returns:
        ``{issue_key: human_description}``. When the Tailscale status query is
        not ok, the only entry is a check-failure issue. Otherwise there is
        one entry per expected host that is absent from the set of peers whose
        status is anything other than ``"offline"``.
    """
    snapshot = await tailscale_status()
    if not snapshot.ok:
        return {_check_key("tailscale"): f"Tailscale 확인 실패: {snapshot.error}"}

    reachable = {peer.hostname for peer in snapshot.peers if peer.status != "offline"}
    return {
        _tailscale_key(name): f"Tailscale 오프라인: {name}"
        for name in EXPECTED_TAILSCALE_HOSTS
        if name not in reachable
    }
|
|
|
|
|
|
# --- State Machine ---
|
|
|
|
async def run_checks() -> None:
    """Run one monitoring cycle: collect issues, debounce against saved state, alert on change.

    Steps:
        1. Run the docker, disk, health and network rule checks concurrently.
        2. Merge their ``{issue_key: description}`` results. A check that
           raised (or was cancelled) is itself recorded as an issue, keyed by
           the check name and exception type so the key stays identical from
           one cycle to the next.
        3. Update the per-issue counters loaded from the state file. An issue
           is alerted once, after ``ALERT_THRESHOLD`` consecutive failing
           cycles. An issue is dropped after ``RECOVERY_THRESHOLD``
           consecutive clean cycles, with a recovery message only if it had
           been alerted.
        4. Persist the state, then send the new-issue message (with an
           optional Gemma explanation) and the recovery message.
    """
    now = datetime.now(timezone.utc).isoformat()
    log.info("=== 상태 수집 시작 (%s) ===", now)

    checks = {
        "docker": check_docker_rules,
        "disk": check_disk_rules,
        "health": check_health_rules,
        "network": check_network_rules,
    }
    results = await asyncio.gather(
        *(check() for check in checks.values()),
        return_exceptions=True,
    )

    # Collect all current issues: {key: description}
    current_issues: dict[str, str] = {}
    for check_name, result in zip(checks, results):
        # BaseException, not Exception: with return_exceptions=True a cancelled
        # check shows up here as CancelledError, which is not an Exception.
        if isinstance(result, BaseException):
            # The exception text may be empty or differ between cycles, which
            # would defeat the debounce, so the key uses stable parts only.
            k = _check_key(f"{check_name}-{type(result).__name__}")
            current_issues[k] = f"체크 실패: {result}"
        else:
            current_issues.update(result)

    # Load previous state.
    # Each issue: {"fail_count": int, "success_count": int, "alerted": bool, "desc": str}
    state = _load_state()
    issues_state = state.get("issues", {})

    new_alerts: list[str] = []  # issues to alert about (newly confirmed)
    recovery_alerts: list[str] = []  # issues that recovered

    # --- Process current issues ---
    for key, desc in current_issues.items():
        entry = issues_state.get(
            key,
            {"fail_count": 0, "success_count": 0, "alerted": False, "desc": ""},
        )

        # Failure detected: increment fail, reset success
        entry["fail_count"] = entry.get("fail_count", 0) + 1
        entry["success_count"] = 0
        entry["desc"] = desc

        if entry["fail_count"] >= ALERT_THRESHOLD and not entry.get("alerted"):
            new_alerts.append(desc)
            entry["alerted"] = True

        issues_state[key] = entry

    # --- Process resolved issues (in state but not in current) ---
    # Iterate over a snapshot of the keys because entries are deleted below.
    for key in list(issues_state):
        if key in current_issues:
            continue

        entry = issues_state[key]
        # Success: increment success, reset fail
        entry["success_count"] = entry.get("success_count", 0) + 1
        entry["fail_count"] = 0

        if entry["success_count"] >= RECOVERY_THRESHOLD:
            # Only issues that were actually announced get a recovery message.
            if entry.get("alerted"):
                recovery_alerts.append(f"정상 복구: {entry.get('desc', key)}")
            del issues_state[key]

    # Save state
    _save_state({"issues": issues_state})

    # --- Send alerts ---
    # NOTE(review): the state (including alerted=True) is saved before sending
    # and the result of send_alert() is not inspected, so a failed delivery is
    # not retried on later cycles — confirm this is acceptable.
    if new_alerts:
        log.warning("새 이상 %d건 (debounce 확정):", len(new_alerts))
        for a in new_alerts:
            log.warning("  - %s", a)

        explanation = await generate_explanation(new_alerts)
        lines = [f"이상 감지 {len(new_alerts)}건:"]
        lines.extend(f"- {a}" for a in new_alerts)
        if explanation:
            lines.append(f"\n분석: {explanation}")
        await send_alert("\n".join(lines))

    if recovery_alerts:
        log.info("복구 %d건:", len(recovery_alerts))
        for a in recovery_alerts:
            log.info("  - %s", a)
        await send_alert("\n".join(recovery_alerts))

    if not new_alerts and not recovery_alerts:
        # NOTE(review): the branch is chosen by the number of alerted entries
        # but the message prints the number of all tracked entries — confirm
        # which count is intended.
        active = sum(1 for e in issues_state.values() if e.get("alerted"))
        if active:
            log.info("전체 정상 (관찰 중 %d건)", len(issues_state))
        else:
            log.info("전체 정상")
|
|
|
|
|
|
def main():
    """Synchronous entry point: run a single monitoring cycle to completion."""
    cycle = run_checks()
    asyncio.run(cycle)


# Run one cycle when the module is executed as a script.
if __name__ == "__main__":
    main()
|