""" PKM 시스템 공통 유틸리티 - 로거 설정 (파일 + 콘솔) - credentials.env 로딩 - osascript 호출 래퍼 """ import os import sys import logging import subprocess from pathlib import Path from dotenv import load_dotenv # 프로젝트 루트 디렉토리 PROJECT_ROOT = Path(__file__).parent.parent LOGS_DIR = PROJECT_ROOT / "logs" DATA_DIR = PROJECT_ROOT / "data" SCRIPTS_DIR = PROJECT_ROOT / "scripts" APPLESCRIPT_DIR = PROJECT_ROOT / "applescript" # 디렉토리 생성 LOGS_DIR.mkdir(exist_ok=True) DATA_DIR.mkdir(exist_ok=True) def setup_logger(name: str) -> logging.Logger: """모듈별 로거 설정 — 파일 + 콘솔 핸들러""" logger = logging.getLogger(name) if logger.handlers: return logger # 중복 핸들러 방지 logger.setLevel(logging.DEBUG) fmt = logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S") # 파일 핸들러 fh = logging.FileHandler(LOGS_DIR / f"{name}.log", encoding="utf-8") fh.setLevel(logging.DEBUG) fh.setFormatter(fmt) logger.addHandler(fh) # 콘솔 핸들러 ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.INFO) ch.setFormatter(fmt) logger.addHandler(ch) return logger def load_credentials() -> dict: """~/.config/pkm/credentials.env 로딩 + 누락 키 경고""" cred_path = Path.home() / ".config" / "pkm" / "credentials.env" if not cred_path.exists(): # 폴백: 프로젝트 내 credentials.env (개발용) cred_path = PROJECT_ROOT / "credentials.env" if cred_path.exists(): load_dotenv(cred_path) else: print(f"[경고] credentials.env를 찾을 수 없습니다: {cred_path}") keys = { "CLAUDE_API_KEY": os.getenv("CLAUDE_API_KEY"), "LAW_OC": os.getenv("LAW_OC"), "NAS_DOMAIN": os.getenv("NAS_DOMAIN"), "NAS_TAILSCALE_IP": os.getenv("NAS_TAILSCALE_IP"), "NAS_PORT": os.getenv("NAS_PORT", "15001"), "MAILPLUS_HOST": os.getenv("MAILPLUS_HOST"), "MAILPLUS_PORT": os.getenv("MAILPLUS_PORT", "993"), "MAILPLUS_USER": os.getenv("MAILPLUS_USER"), "MAILPLUS_PASS": os.getenv("MAILPLUS_PASS"), "GPU_SERVER_IP": os.getenv("GPU_SERVER_IP"), } missing = [k for k, v in keys.items() if not v and k not in ("GPU_SERVER_IP", "CLAUDE_API_KEY")] if missing: print(f"[경고] 누락된 인증 키: {', '.join(missing)}") return keys def run_applescript(script_path: str, *args) -> str: """osascript 호출 래퍼 + 에러 캡처""" cmd = ["osascript", str(script_path)] + [str(a) for a in args] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if result.returncode != 0: raise RuntimeError(f"AppleScript 에러: {result.stderr.strip()}") return result.stdout.strip() except subprocess.TimeoutExpired: raise RuntimeError(f"AppleScript 타임아웃: {script_path}") def run_applescript_inline(script: str) -> str: """인라인 AppleScript 실행""" cmd = ["osascript", "-e", script] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if result.returncode != 0: raise RuntimeError(f"AppleScript 에러: {result.stderr.strip()}") return result.stdout.strip() except subprocess.TimeoutExpired: raise RuntimeError("AppleScript 타임아웃 (인라인)") def llm_generate(prompt: str, model: str = "mlx-community/Qwen3.5-35B-A3B-4bit", host: str = "http://localhost:8800", json_mode: bool = False) -> str: """MLX 서버 API 호출 (OpenAI 호환)""" import requests messages = [{"role": "user", "content": prompt}] resp = requests.post(f"{host}/v1/chat/completions", json={ "model": model, "messages": messages, "temperature": 0.3, "max_tokens": 4096, }, timeout=300) resp.raise_for_status() content = resp.json()["choices"][0]["message"]["content"] if not json_mode: return content # JSON 모드: thinking 허용 → 마지막 유효 JSON 객체 추출 import re import json as _json # 배열이 포함된 JSON 객체 매칭 all_jsons = re.findall(r'\{[^{}]*(?:\[[^\]]*\])?[^{}]*\}', content) for j in reversed(all_jsons): try: parsed = _json.loads(j) if any(k in parsed for k in ("domain_db", "tags", "domain", "classification")): return j except _json.JSONDecodeError: continue # 폴백: 전체에서 가장 큰 JSON 추출 json_match = re.search(r'\{[\s\S]*\}', content) return json_match.group(0) if json_match else content # 하위호환 별칭 ollama_generate = llm_generate def count_log_errors(log_file: Path, since_hours: int = 24) -> int: """로그 파일에서 최근 N시간 ERROR 카운트""" from datetime import datetime, timedelta if not log_file.exists(): return 0 cutoff = datetime.now() - timedelta(hours=since_hours) count = 0 with open(log_file, "r", encoding="utf-8") as f: for line in f: if "[ERROR]" in line: try: ts_str = line[1:20] # [YYYY-MM-DD HH:MM:SS] ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") if ts >= cutoff: count += 1 except (ValueError, IndexError): count += 1 return count