diff --git a/infra/core/docserver.py b/infra/core/docserver.py
new file mode 100644
index 0000000..9e732d5
--- /dev/null
+++ b/infra/core/docserver.py
@@ -0,0 +1,57 @@
+"""Document Server diagnostic tools — scheduler and queue status."""
+
+from __future__ import annotations
+
+import json
+import re
+from datetime import datetime, timezone
+
+from ..config import HOSTS
+from ..schemas import BaseResult
+from .ssh import run_command, run_local, SSHError, _is_local_host
+
+# Container whose logs every check in this module greps.
+_CONTAINER = "hyungi_document_server-fastapi-1"
+
+
+def _now() -> str:
+    """Return the current UTC time as an ISO-8601 string."""
+    return datetime.now(timezone.utc).isoformat()
+
+
+async def _grep_logs(tail: int, pattern: str, label: str) -> dict:
+    """Grep the container's last *tail* log lines for *pattern* on the "gpu" host.
+
+    *label* is interpolated into the summary. The shell pipeline keeps at most
+    the last 20 matches; an SSHError is reported in the result, not raised.
+    """
+    cfg = HOSTS["gpu"]
+    cmd = f"docker logs {_CONTAINER} --tail {tail} 2>&1 | grep -iE '{pattern}' | tail -20"
+
+    try:
+        if _is_local_host(cfg):
+            stdout, _ = await run_local(cmd)
+        else:
+            stdout, _ = await run_command(cfg, cmd)
+    except SSHError as e:
+        return {"ok": False, "checked_at": _now(), "error_type": e.error_type,
+                "error": str(e), "data": [], "summary": ""}
+
+    lines = [line.strip() for line in stdout.strip().splitlines() if line.strip()]
+    return {
+        "ok": True,
+        "checked_at": _now(),
+        "data": lines,
+        "summary": f"최근 {label} 로그 {len(lines)}줄",
+        "error": "",
+    }
+
+
+async def scheduler_status() -> dict:
+    """Get APScheduler job status from Document Server logs."""
+    return await _grep_logs(100, "scheduler|apscheduler|job|trigger|cron|interval", "스케줄러")
+
+
+async def queue_status() -> dict:
+    """Get document processing queue status from Document Server logs."""
+    return await _grep_logs(200, "queue_consumer|pending|processing|completed|stale|batch", "큐")
diff --git a/infra/core/verify.py b/infra/core/verify.py
new file mode 100644
index 0000000..f58789c
--- /dev/null
+++ b/infra/core/verify.py
@@ -0,0 +1,79 @@
+"""Run predefined verify commands from infra_inventory.md."""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+from ..config import HOSTS
+from .ssh import run_command, run_local, SSHError, _is_local_host
+
+
+def _now() -> str:
+    """Return the current UTC time as an ISO-8601 string."""
+    return datetime.now(timezone.utc).isoformat()
+
+
+# Predefined verify commands (from infra_inventory.md)
+VERIFY_COMMANDS = {
+    "gpu-snapshot": {
+        "host": "gpu",
+        # stderr is silenced on pgrep itself; a redirect placed after `head` would not cover it.
+        "cmd": "docker compose ls 2>/dev/null; echo '---'; ollama list 2>/dev/null; echo '---'; pgrep -af 'Plex Media' 2>/dev/null | head -1",
+        "desc": "GPU 서버 전체 스냅샷 (docker, ollama, plex)",
+    },
+    "macmini-snapshot": {
+        "host": "macmini",
+        "cmd": "bash -lc 'launchctl list | grep com.user; curl -s localhost:8800/v1/models 2>/dev/null | python3 -m json.tool 2>/dev/null; ollama list 2>/dev/null'",
+        "desc": "Mac mini 전체 스냅샷 (launchd, MLX, ollama)",
+    },
+    "docserver-health": {
+        "host": "gpu",
+        "cmd": "curl -sf http://localhost:8000/health",
+        "desc": "Document Server 헬스체크",
+    },
+    "config-model-match": {
+        "host": "gpu",
+        "cmd": "grep -E 'model:' ~/Documents/code/hyungi_Document_Server/config.yaml 2>/dev/null; echo '---'; ollama list 2>/dev/null",
+        "desc": "config.yaml 모델과 실제 설치 모델 비교",
+    },
+}
+
+
+async def run_verify(check_name: str) -> dict:
+    """Run a predefined verify command.
+
+    An unknown *check_name* or an SSHError is reported in the returned dict
+    (``ok`` is False) instead of being raised.
+    """
+    if check_name not in VERIFY_COMMANDS:
+        available = ", ".join(VERIFY_COMMANDS.keys())
+        return {
+            "ok": False, "checked_at": _now(),
+            "error_type": "parse_error",
+            "error": f"알 수 없는 체크: '{check_name}'. 사용 가능: {available}",
+            "data": [], "summary": "", "raw": "",
+        }
+
+    spec = VERIFY_COMMANDS[check_name]
+    cfg = HOSTS[spec["host"]]
+
+    try:
+        if _is_local_host(cfg):
+            stdout, _ = await run_local(spec["cmd"], timeout=15)
+        else:
+            stdout, _ = await run_command(cfg, spec["cmd"], timeout=15)
+    except SSHError as e:
+        return {
+            "ok": False, "checked_at": _now(),
+            "error_type": e.error_type, "error": str(e),
+            "data": [], "summary": spec["desc"], "raw": "",
+        }
+
+    return {
+        "ok": True,
+        "checked_at": _now(),
+        "data": stdout.strip().splitlines(),
+        "summary": spec["desc"],
+        "error": "",
+        "raw": stdout.strip(),
+    }
diff --git a/infra/mcp_server.py b/infra/mcp_server.py
index 5939e40..c76b890 100644
--- a/infra/mcp_server.py
+++ b/infra/mcp_server.py
@@ -11,6 +11,7 @@ All actual logic lives in src/core/.
 
 from __future__ import annotations
 
+import json
 from mcp.server.fastmcp import FastMCP
 
 from .core.docker import docker_status, docker_logs
@@ -18,6 +19,8 @@ from .core.health import service_health, VALID_SERVICES
 from .core.system import disk_usage
 from .core.network import tailscale_status
 from .core.models import ollama_models, mlx_models
+from .core.docserver import scheduler_status as _scheduler_status, queue_status as _queue_status
+from .core.verify import run_verify as _run_verify, VERIFY_COMMANDS
 
 mcp = FastMCP(
     "infra",
@@ -99,6 +102,31 @@ async def check_mlx_models() -> str:
     return result.model_dump_json(indent=2)
 
 
+@mcp.tool()
+async def check_scheduler_status() -> str:
+    """Document Server APScheduler 잡 상태. 최근 스케줄러 로그에서 추출."""
+    result = await _scheduler_status()
+    return json.dumps(result, ensure_ascii=False, indent=2)
+
+
+@mcp.tool()
+async def check_queue_status() -> str:
+    """Document Server 문서 처리 큐 현황. 최근 큐 로그에서 추출."""
+    result = await _queue_status()
+    return json.dumps(result, ensure_ascii=False, indent=2)
+
+
+@mcp.tool()
+async def check_verify(check_name: str) -> str:
+    """인프라 검증 명령 실행 (infra_inventory.md 기반).
+
+    Args:
+        check_name: 체크 이름 (gpu-snapshot | macmini-snapshot | docserver-health | config-model-match)
+    """
+    result = await _run_verify(check_name)
+    return json.dumps(result, ensure_ascii=False, indent=2)
+
+
 def main():
     mcp.run(transport="stdio")
 
diff --git a/nanoclaude/services/worker.py b/nanoclaude/services/worker.py
index c715676..e47c403 100644
--- a/nanoclaude/services/worker.py
+++ b/nanoclaude/services/worker.py
@@ -190,7 +190,7 @@ def _pre_route(message: str) -> dict | None:
         return {"action": "tools", "tool": "document", "operation": "search", "params": {"query": query}}
 
     # 인프라 도구 키워드
-    infra_keywords = ["docker", "컨테이너", "디스크", "용량", "헬스체크", "tailscale", "ollama 모델", "mlx 모델"]
+    infra_keywords = ["docker", "컨테이너", "디스크", "용량", "헬스체크", "tailscale", "ollama 모델", "mlx 모델", "스케줄러", "scheduler", "큐 상태", "queue", "처리 큐", "verify", "검증"]
     if any(k in msg for k in infra_keywords):
         # docker/컨테이너 상태
         if any(k in msg for k in ["docker", "컨테이너"]):
@@ -212,6 +212,15 @@ def _pre_route(message: str) -> dict | None:
         if any(k in msg for k in ["ollama 모델", "mlx 모델"]):
             host = "mlx" if "mlx" in msg else "gpu"
             return {"action": "tools", "tool": "infra", "operation": "models", "params": {"host": host}}
+        # scheduler
+        if any(k in msg for k in ["스케줄러", "scheduler"]):
+            return {"action": "tools", "tool": "infra", "operation": "scheduler", "params": {}}
+        # queue
+        if any(k in msg for k in ["큐 상태", "queue", "처리 큐"]):
+            return {"action": "tools", "tool": "infra", "operation": "queue", "params": {}}
+        # verify
+        if any(k in msg for k in ["verify", "검증"]):
+            return {"action": "tools", "tool": "infra", "operation": "verify", "params": {"check_name": "gpu-snapshot"}}
 
     # 시스템 상태 질문 — 마커만 반환 (worker에서 비동기 조회)
     if any(k in msg for k in ["추론 모델", "gemma", "젬마", "서버 상태", "시스템 상태"]) or \
@@ -254,11 +263,9 @@ async def run(job: Job) -> None:
         # --- 사전 라우팅 (키워드 기반, EXAONE 스킵) ---
         pre = _pre_route(job.message)
         classify_latency = 0
-        print(f"[TRACE] Job {job.id} pre_route result: {pre}", flush=True)
 
         if pre:
             classification = pre
-            print(f"[TRACE] Job {job.id} PRE-ROUTED: {pre.get('tool','')}.{pre.get('operation','')}", flush=True)
             logger.info("Job %s pre-routed: %s.%s", job.id, pre.get("tool", ""), pre.get("operation", pre.get("action", "")))
         else:
             # --- EXAONE 분류기 호출 ---
@@ -291,7 +298,6 @@ async def run(job: Job) -> None:
         response_text = classification.get("response", "")
         route_prompt = classification.get("prompt", "")
 
-        print(f"[TRACE] Job {job.id} final action='{action}' classification={classification}", flush=True)
         logger.info("Job %s classified as '%s'", job.id, action)
 
         # 대화 기록: 사용자 메시지
diff --git a/nanoclaude/tools/infra_tool.py b/nanoclaude/tools/infra_tool.py
index ce8ecff..93b0006 100644
--- a/nanoclaude/tools/infra_tool.py
+++ b/nanoclaude/tools/infra_tool.py
@@ -14,6 +14,8 @@ from infra.core.health import service_health, VALID_SERVICES
 from infra.core.system import disk_usage
 from infra.core.network import tailscale_status
 from infra.core.models import ollama_models, mlx_models
+from infra.core.docserver import scheduler_status as _scheduler_status, queue_status as _queue_status
+from infra.core.verify import run_verify as _run_verify, VERIFY_COMMANDS
 
 logger = logging.getLogger(__name__)
 
@@ -111,3 +113,21 @@ async def models(host: str = "gpu") -> dict:
     summary = f"{result.source} on {result.host}: {len(result.models)}개 모델"
     return {"ok": True, "tool": "infra", "operation": "models",
             "data": data, "summary": summary, "error": ""}
+
+
+async def scheduler() -> dict:
+    """Return Document Server scheduler status, tagged with tool/operation like models()."""
+    result = await _scheduler_status()
+    return {"tool": "infra", "operation": "scheduler", **result}
+
+
+async def queue() -> dict:
+    """Return Document Server queue status, tagged with tool/operation like models()."""
+    result = await _queue_status()
+    return {"tool": "infra", "operation": "queue", **result}
+
+
+async def verify(check_name: str = "gpu-snapshot") -> dict:
+    """Run a predefined verify command and tag the result with tool/operation like models()."""
+    result = await _run_verify(check_name)
+    return {"tool": "infra", "operation": "verify", **result}
diff --git a/nanoclaude/tools/registry.py b/nanoclaude/tools/registry.py
index 5a0a7b6..fafb7a1 100644
--- a/nanoclaude/tools/registry.py
+++ b/nanoclaude/tools/registry.py
@@ -21,7 +21,7 @@ ALLOWED_OPS = {
     "calendar": {"today", "search", "create_draft", "create_confirmed"},
     "email": {"search", "read"},
     "document": {"search", "read"},
-    "infra": {"status", "health", "disk", "network", "models"},
+    "infra": {"status", "health", "disk", "network", "models", "scheduler", "queue", "verify"},
 }
 
 # payload hard limit
@@ -113,6 +113,12 @@ async def _exec_infra(operation: str, params: dict) -> dict:
         return await infra_tool.network()
     elif operation == "models":
         return await infra_tool.models(params.get("host", "gpu"))
+    elif operation == "scheduler":
+        return await infra_tool.scheduler()
+    elif operation == "queue":
+        return await infra_tool.queue()
+    elif operation == "verify":
+        return await infra_tool.verify(params.get("check_name", "gpu-snapshot"))
 
     return _error("infra", operation, "미구현")
 