162 lines
5.6 KiB
Python
162 lines
5.6 KiB
Python
"""
|
|
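Shared utilities for the PKM system.

- Logger setup (file + console)
- credentials.env loading
- osascript call wrappers
- LLM chat-completion helper (MLX server, OpenAI-compatible API)
- ERROR counting over recent log lines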
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
import subprocess
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
|
|
# Project layout. The root is the parent of the directory containing this file.
PROJECT_ROOT = Path(__file__).parent.parent
LOGS_DIR = PROJECT_ROOT.joinpath("logs")
DATA_DIR = PROJECT_ROOT.joinpath("data")
SCRIPTS_DIR = PROJECT_ROOT.joinpath("scripts")
APPLESCRIPT_DIR = PROJECT_ROOT.joinpath("applescript")

# Create the writable directories at import time (no-op when they already
# exist). Only logs/ and data/ are created; scripts/ and applescript/ are not.
for _runtime_dir in (LOGS_DIR, DATA_DIR):
    _runtime_dir.mkdir(exist_ok=True)
del _runtime_dir
|
|
|
|
|
|
def setup_logger(name: str) -> logging.Logger:
    """Return the logger called *name*, attaching file and console handlers.

    A logger that already has handlers is returned untouched, so repeated
    calls never attach duplicates. Otherwise the logger level is set to DEBUG
    and two handlers sharing one format are added, in this order:

    - a UTF-8 file handler writing DEBUG and above to ``LOGS_DIR/<name>.log``
    - a stdout handler emitting INFO and above
    """
    log = logging.getLogger(name)
    if not log.handlers:
        log.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt="[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        file_handler = logging.FileHandler(LOGS_DIR / f"{name}.log", encoding="utf-8")
        console_handler = logging.StreamHandler(sys.stdout)
        # Same formatter for both; only the threshold differs per destination.
        for handler, level in (
            (file_handler, logging.DEBUG),
            (console_handler, logging.INFO),
        ):
            handler.setLevel(level)
            handler.setFormatter(formatter)
            log.addHandler(handler)
    return log
|
|
|
|
|
|
def load_credentials() -> dict:
    """Load credentials.env and return the known settings as a dict.

    The file is looked up at ``~/.config/pkm/credentials.env`` first and, when
    that does not exist, at ``PROJECT_ROOT/credentials.env`` (development
    fallback). A file that is found is loaded with ``load_dotenv``; otherwise
    a warning is printed and the values come from the current environment.

    Returns:
        A dict keyed by setting name with values read via ``os.getenv``.
        ``NAS_PORT`` and ``MAILPLUS_PORT`` default to ``"15001"`` and
        ``"993"``; any other unset variable maps to ``None``.

    A warning naming every empty or unset key is printed, except for
    ``GPU_SERVER_IP`` and ``CLAUDE_API_KEY``, which are exempt from the check.
    """
    env_file = Path.home().joinpath(".config", "pkm", "credentials.env")
    if not env_file.exists():
        # Fallback: credentials.env inside the project (for development).
        env_file = PROJECT_ROOT / "credentials.env"

    if not env_file.exists():
        print(f"[경고] credentials.env를 찾을 수 없습니다: {env_file}")
    else:
        load_dotenv(env_file)

    # Settings that have a built-in default; everything else defaults to None.
    fallback_values = {"NAS_PORT": "15001", "MAILPLUS_PORT": "993"}
    key_names = (
        "CLAUDE_API_KEY",
        "LAW_OC",
        "NAS_DOMAIN",
        "NAS_TAILSCALE_IP",
        "NAS_PORT",
        "MAILPLUS_HOST",
        "MAILPLUS_PORT",
        "MAILPLUS_USER",
        "MAILPLUS_PASS",
        "GPU_SERVER_IP",
    )
    keys = {key: os.getenv(key, fallback_values.get(key)) for key in key_names}

    # These two are not reported when absent.
    exempt = ("GPU_SERVER_IP", "CLAUDE_API_KEY")
    missing = [key for key, value in keys.items() if key not in exempt and not value]
    if missing:
        print(f"[경고] 누락된 인증 키: {', '.join(missing)}")

    return keys
|
|
|
|
|
|
def run_applescript(script_path: str, *args) -> str:
    """Run an AppleScript file through ``osascript`` and return its output.

    Args:
        script_path: Path of the script file handed to ``osascript``.
        *args: Extra arguments; each is converted with ``str`` and appended
            to the command line after the script path.

    Returns:
        The script's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: If ``osascript`` exits with a non-zero status (the
            message carries its stripped stderr) or does not finish within
            120 seconds (chained from the ``subprocess.TimeoutExpired``).
    """
    cmd = ["osascript", str(script_path)] + [str(a) for a in args]
    # Keep the try body to the one call that can time out.
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired as exc:
        # Explicit chaining marks the timeout as the direct cause in tracebacks.
        raise RuntimeError(f"AppleScript 타임아웃: {script_path}") from exc

    if result.returncode != 0:
        raise RuntimeError(f"AppleScript 에러: {result.stderr.strip()}")
    return result.stdout.strip()
|
|
|
|
|
|
def run_applescript_inline(script: str) -> str:
    """Run AppleScript source text with ``osascript -e`` and return its output.

    Args:
        script: AppleScript source passed as the single ``-e`` argument.

    Returns:
        The script's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: If ``osascript`` exits with a non-zero status (the
            message carries its stripped stderr) or does not finish within
            120 seconds (chained from the ``subprocess.TimeoutExpired``).
    """
    cmd = ["osascript", "-e", script]
    # Keep the try body to the one call that can time out.
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired as exc:
        # Explicit chaining marks the timeout as the direct cause in tracebacks.
        raise RuntimeError("AppleScript 타임아웃 (인라인)") from exc

    if result.returncode != 0:
        raise RuntimeError(f"AppleScript 에러: {result.stderr.strip()}")
    return result.stdout.strip()
|
|
|
|
|
|
def llm_generate(prompt: str, model: str = "mlx-community/Qwen3.5-35B-A3B-4bit",
                 host: str = "http://localhost:8800", json_mode: bool = False) -> str:
    """Send *prompt* to the MLX server's OpenAI-compatible chat endpoint.

    Args:
        prompt: Text sent as the single user message.
        model: Model identifier placed in the request body.
        host: Base URL of the server; ``/v1/chat/completions`` is appended.
        json_mode: When true, a system message demanding JSON-only output is
            prepended and the temperature is lowered from 0.3 to 0.1.

    Returns:
        The first choice's message content after clean-up: text up to the
        last ``</think>`` is dropped, a fenced code block is unwrapped, and
        if the remaining text contains a ``{...}`` span only that span is
        returned. The clean-up runs regardless of ``json_mode``.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``). Other ``requests`` exceptions
            are not caught here and propagate to the caller.
    """
    # Imported lazily so the module can be imported without requests installed.
    import re
    import requests

    messages = []
    if json_mode:
        messages.append({"role": "system", "content": "IMPORTANT: Output ONLY valid JSON. No thinking process, no explanation, no markdown fences. Start your response with { and end with }."})
    messages.append({"role": "user", "content": prompt})

    resp = requests.post(f"{host}/v1/chat/completions", json={
        "model": model,
        "messages": messages,
        "temperature": 0.1 if json_mode else 0.3,
        "max_tokens": 2048,
    }, timeout=180)
    resp.raise_for_status()
    content = resp.json()["choices"][0]["message"]["content"]

    # Remove the reasoning block. Only the closing tag is required: a reply
    # may carry "</think>" without "<think>" when the opening tag was supplied
    # by the chat template instead of being generated.
    if "</think>" in content:
        content = content.split("</think>")[-1].strip()

    # Unwrap a fenced code block, preferring an explicit ```json fence.
    if "```json" in content:
        content = content.split("```json")[1].split("```")[0].strip()
    elif "```" in content:
        content = content.split("```")[1].split("```")[0].strip()

    # Keep only the span from the first "{" to the last "}" when one exists.
    json_match = re.search(r'\{[\s\S]*\}', content)
    if json_match:
        content = json_match.group(0)
    return content


# Backward-compatibility alias for llm_generate.
ollama_generate = llm_generate
|
|
|
|
|
|
def count_log_errors(log_file: Path, since_hours: int = 24) -> int:
    """Count ``[ERROR]`` lines logged within the last *since_hours* hours.

    Each line containing ``[ERROR]`` is expected to start with a
    ``[YYYY-MM-DD HH:MM:SS]`` prefix; the timestamp is read from characters
    1-19 and compared against the current local time.

    Args:
        log_file: Log file to scan, as a ``Path`` or a plain string path.
        since_hours: Size of the look-back window in hours.

    Returns:
        The number of matching lines, or 0 when the file does not exist.
        An ``[ERROR]`` line whose prefix is not a parseable timestamp is
        counted regardless of the window.
    """
    from datetime import datetime, timedelta

    log_path = Path(log_file)  # accept str as well as Path
    if not log_path.exists():
        return 0

    cutoff = datetime.now() - timedelta(hours=since_hours)
    count = 0
    # errors="replace": one undecodable byte must not abort the whole scan.
    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            if "[ERROR]" not in line:
                continue
            try:
                logged_at = datetime.strptime(line[1:20], "%Y-%m-%d %H:%M:%S")
            except ValueError:
                # No usable timestamp: err on the side of reporting the error.
                count += 1
                continue
            if logged_at >= cutoff:
                count += 1
    return count
|