"""System tools — disk usage.""" from __future__ import annotations from datetime import datetime, timezone from ..config import validate_host from ..schemas import DiskResult, FileSystemInfo from .ssh import run_command, SSHError def _now() -> str: return datetime.now(timezone.utc).isoformat() def _parse_df(output: str) -> list[FileSystemInfo]: """Parse `df -h` output into structured filesystem info.""" filesystems = [] for line in output.strip().splitlines()[1:]: # skip header parts = line.split() if len(parts) < 6: continue # df -h columns: Filesystem Size Used Avail Use% Mounted mount = parts[-1] # Skip pseudo-filesystems if mount.startswith(("/dev", "/sys", "/proc", "/run", "/snap")): continue if parts[0] in ("tmpfs", "devtmpfs", "overlay", "shm", "none"): continue try: used_pct = int(parts[4].rstrip("%")) except ValueError: continue filesystems.append(FileSystemInfo( mount=mount, total=parts[1], used=parts[2], avail=parts[3], used_pct=used_pct, )) return filesystems async def disk_usage(host: str) -> DiskResult: """Get disk usage for a host with structured filesystem info.""" try: cfg = validate_host("disk_usage", host) except ValueError as e: return DiskResult( ok=False, checked_at=_now(), host=host, error_type="parse_error", error=str(e), ) try: stdout, _ = await run_command(cfg, "df -h", use_sudo=cfg.needs_sudo) except SSHError as e: return DiskResult( ok=False, checked_at=_now(), host=host, error_type=e.error_type, error=str(e), ) filesystems = _parse_df(stdout) warnings = [] WARN_THRESHOLD = 85 for fs in filesystems: if fs.used_pct >= WARN_THRESHOLD: warnings.append(f"{fs.mount} 사용률 {fs.used_pct}% — 임계값 {WARN_THRESHOLD}% 초과") return DiskResult( ok=True, checked_at=_now(), host=host, filesystems=filesystems, warnings=warnings, raw=stdout.strip(), )