From ac8787c153db63208e4b005f34df4759e03fce92 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Mon, 13 Apr 2026 13:16:01 +0900 Subject: [PATCH] =?UTF-8?q?feat(infra):=20Phase=202=20monitoring=20agent?= =?UTF-8?q?=20=E2=80=94=20rule-first=20+=20=EC=8B=9C=EB=86=80=EB=A1=9C?= =?UTF-8?q?=EC=A7=80=20Chat=20=EC=95=8C=EB=A6=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 5분 cron용 agent. docker/disk/health/network 4개 체크. asyncssh 로그 억제, 작은 파티션(< 1G) 무시. Co-Authored-By: Claude Opus 4.6 (1M context) --- infra/agent.py | 194 +++++++++++++++++++++++++++++++++++++++++++ infra/core/system.py | 4 + 2 files changed, 198 insertions(+) create mode 100644 infra/agent.py diff --git a/infra/agent.py b/infra/agent.py new file mode 100644 index 0000000..ef74a46 --- /dev/null +++ b/infra/agent.py @@ -0,0 +1,194 @@ +"""Infra Monitoring Agent — rule-first, Gemma 2nd. + +5분마다 실행되어: +1. core/ 함수로 상태 수집 +2. Rule 기반 1차 판정 (threshold/패턴) +3. 이상 감지 시 → 시놀로지 Chat 알림 +4. 로그 → stdout (launchd가 캡처) + +Gemma 4는 Phase 2.1에서 추가 (알림 메시지 자연어 생성). +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + +import httpx +from dotenv import load_dotenv + +# gpu-services 루트에서 실행되므로 infra.core를 import 가능 +from infra.core.docker import docker_status +from infra.core.health import service_health +from infra.core.system import disk_usage +from infra.core.network import tailscale_status + +load_dotenv(Path(__file__).parent / ".env") + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +# asyncssh is extremely verbose at INFO level +logging.getLogger("asyncssh").setLevel(logging.WARNING) +log = logging.getLogger("infra-agent") + +# --- Config --- + +SYNOLOGY_WEBHOOK_URL = os.getenv("SYNOLOGY_INCOMING_URL", "") +DISK_WARN_PCT = 85 +EXPECTED_TAILSCALE_HOSTS = {"sub-server", "hyungi-macmini", "hyungi-macbookpro"} + +# Docker containers known to be intentionally stopped +IGNORED_CONTAINERS = { + "hyungi_document_server-ai-gateway-1", + "hyungi_document_server-ollama-1", +} + +# Services to health-check +HEALTH_SERVICES = ["document-server", "mlx", "ollama-gpu"] + + +# --- Alert --- + +async def send_alert(message: str) -> bool: + """Send alert to Synology Chat via incoming webhook.""" + if not SYNOLOGY_WEBHOOK_URL: + log.warning("SYNOLOGY_INCOMING_URL not set — alert skipped") + log.info("Alert content: %s", message) + return False + + payload = json.dumps({"text": f"[infra-agent] {message}"}, ensure_ascii=False) + try: + async with httpx.AsyncClient(verify=False, timeout=10.0) as client: + resp = await client.post( + SYNOLOGY_WEBHOOK_URL, + data={"payload": payload}, + ) + if resp.status_code == 200: + log.info("Alert sent: %s", message[:100]) + return True + log.error("Alert failed: %d %s", resp.status_code, resp.text) + return False + except Exception: + log.exception("Failed to send alert") + return False + + +# --- Rules --- + +async def check_docker_rules() -> list[str]: + """Check Docker containers on GPU server.""" + alerts = [] + result = await docker_status("gpu") + + if not result.ok: + if result.error_type: + alerts.append(f"GPU Docker 확인 실패: {result.error}") + return alerts + + for c in result.containers: + if c.name in IGNORED_CONTAINERS: + continue + if c.status != "running": + alerts.append(f"GPU 컨테이너 다운: {c.name} ({c.status})") + elif c.status == "restarting": + alerts.append(f"GPU 컨테이너 재시작 중: {c.name}") + + return alerts + + +async def check_disk_rules() -> list[str]: + """Check disk usage on all hosts.""" + alerts = [] + for host in ["gpu", "macmini"]: + result = await disk_usage(host) + if not result.ok: + alerts.append(f"{host} 디스크 확인 실패: {result.error}") + continue + for fs in result.filesystems: + if fs.used_pct >= DISK_WARN_PCT: + alerts.append( + f"{host} 디스크 경고: {fs.mount} {fs.used_pct}% " + f"(사용 {fs.used}/{fs.total})" + ) + return alerts + + +async def check_health_rules() -> list[str]: + """Check critical services health.""" + alerts = [] + for svc in HEALTH_SERVICES: + result = await service_health(svc) + if not result.ok: + detail = result.error or result.status + alerts.append(f"서비스 다운: {svc} — {detail}") + elif result.status == "degraded": + alerts.append(f"서비스 저하: {svc} — {result.details}") + return alerts + + +async def check_network_rules() -> list[str]: + """Check Tailscale connectivity for critical hosts.""" + alerts = [] + result = await tailscale_status() + if not result.ok: + alerts.append(f"Tailscale 확인 실패: {result.error}") + return alerts + + online_hosts = {p.hostname for p in result.peers if p.status != "offline"} + for expected in EXPECTED_TAILSCALE_HOSTS: + if expected not in online_hosts: + alerts.append(f"Tailscale 오프라인: {expected}") + + return alerts + + +# --- Main --- + +async def run_checks() -> None: + """Run all checks and send alerts if needed.""" + now = datetime.now(timezone.utc).isoformat() + log.info("=== 상태 수집 시작 (%s) ===", now) + + all_alerts: list[str] = [] + + # Run all checks concurrently + results = await asyncio.gather( + check_docker_rules(), + check_disk_rules(), + check_health_rules(), + check_network_rules(), + return_exceptions=True, + ) + + for result in results: + if isinstance(result, Exception): + all_alerts.append(f"체크 실패: {result}") + else: + all_alerts.extend(result) + + if all_alerts: + log.warning("이상 감지 %d건:", len(all_alerts)) + for a in all_alerts: + log.warning(" - %s", a) + + # Send combined alert + message = f"이상 감지 {len(all_alerts)}건:\n" + "\n".join(f"- {a}" for a in all_alerts) + await send_alert(message) + else: + log.info("전체 정상") + + +def main(): + asyncio.run(run_checks()) + + +if __name__ == "__main__": + main() diff --git a/infra/core/system.py b/infra/core/system.py index 26ecc45..293a398 100644 --- a/infra/core/system.py +++ b/infra/core/system.py @@ -27,6 +27,10 @@ def _parse_df(output: str) -> list[FileSystemInfo]: continue if parts[0] in ("tmpfs", "devtmpfs", "overlay", "shm", "none"): continue + # Skip tiny partitions (< 1G) — boot/system/snap, not worth monitoring + total_str = parts[1] + if total_str.endswith(("K", "Ki", "M", "Mi")) and not total_str.endswith(("G", "Gi", "T", "Ti")): + continue try: used_pct = int(parts[4].rstrip("%"))