167 lines
5.7 KiB
Python
167 lines
5.7 KiB
Python
"""
|
|
PKM 시스템 공통 유틸리티
|
|
- 로거 설정 (파일 + 콘솔)
|
|
- credentials.env 로딩
|
|
- osascript 호출 래퍼
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
import subprocess
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
|
|
# 프로젝트 루트 디렉토리
|
|
PROJECT_ROOT = Path(__file__).parent.parent
|
|
LOGS_DIR = PROJECT_ROOT / "logs"
|
|
DATA_DIR = PROJECT_ROOT / "data"
|
|
SCRIPTS_DIR = PROJECT_ROOT / "scripts"
|
|
APPLESCRIPT_DIR = PROJECT_ROOT / "applescript"
|
|
|
|
# 디렉토리 생성
|
|
LOGS_DIR.mkdir(exist_ok=True)
|
|
DATA_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
def setup_logger(name: str) -> logging.Logger:
|
|
"""모듈별 로거 설정 — 파일 + 콘솔 핸들러"""
|
|
logger = logging.getLogger(name)
|
|
if logger.handlers:
|
|
return logger # 중복 핸들러 방지
|
|
|
|
logger.setLevel(logging.DEBUG)
|
|
fmt = logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S")
|
|
|
|
# 파일 핸들러
|
|
fh = logging.FileHandler(LOGS_DIR / f"{name}.log", encoding="utf-8")
|
|
fh.setLevel(logging.DEBUG)
|
|
fh.setFormatter(fmt)
|
|
logger.addHandler(fh)
|
|
|
|
# 콘솔 핸들러
|
|
ch = logging.StreamHandler(sys.stdout)
|
|
ch.setLevel(logging.INFO)
|
|
ch.setFormatter(fmt)
|
|
logger.addHandler(ch)
|
|
|
|
return logger
|
|
|
|
|
|
def load_credentials() -> dict:
|
|
"""~/.config/pkm/credentials.env 로딩 + 누락 키 경고"""
|
|
cred_path = Path.home() / ".config" / "pkm" / "credentials.env"
|
|
if not cred_path.exists():
|
|
# 폴백: 프로젝트 내 credentials.env (개발용)
|
|
cred_path = PROJECT_ROOT / "credentials.env"
|
|
|
|
if cred_path.exists():
|
|
load_dotenv(cred_path)
|
|
else:
|
|
print(f"[경고] credentials.env를 찾을 수 없습니다: {cred_path}")
|
|
|
|
keys = {
|
|
"CLAUDE_API_KEY": os.getenv("CLAUDE_API_KEY"),
|
|
"LAW_OC": os.getenv("LAW_OC"),
|
|
"NAS_DOMAIN": os.getenv("NAS_DOMAIN"),
|
|
"NAS_TAILSCALE_IP": os.getenv("NAS_TAILSCALE_IP"),
|
|
"NAS_PORT": os.getenv("NAS_PORT", "15001"),
|
|
"MAILPLUS_HOST": os.getenv("MAILPLUS_HOST"),
|
|
"MAILPLUS_PORT": os.getenv("MAILPLUS_PORT", "993"),
|
|
"MAILPLUS_USER": os.getenv("MAILPLUS_USER"),
|
|
"MAILPLUS_PASS": os.getenv("MAILPLUS_PASS"),
|
|
"GPU_SERVER_IP": os.getenv("GPU_SERVER_IP"),
|
|
}
|
|
|
|
missing = [k for k, v in keys.items() if not v and k not in ("GPU_SERVER_IP", "CLAUDE_API_KEY")]
|
|
if missing:
|
|
print(f"[경고] 누락된 인증 키: {', '.join(missing)}")
|
|
|
|
return keys
|
|
|
|
|
|
def run_applescript(script_path: str, *args) -> str:
|
|
"""osascript 호출 래퍼 + 에러 캡처"""
|
|
cmd = ["osascript", str(script_path)] + [str(a) for a in args]
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"AppleScript 에러: {result.stderr.strip()}")
|
|
return result.stdout.strip()
|
|
except subprocess.TimeoutExpired:
|
|
raise RuntimeError(f"AppleScript 타임아웃: {script_path}")
|
|
|
|
|
|
def run_applescript_inline(script: str) -> str:
|
|
"""인라인 AppleScript 실행 — 임시 파일 방식 (특수문자 안전)"""
|
|
import tempfile
|
|
tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.applescript', delete=False, encoding='utf-8')
|
|
try:
|
|
tmp.write(script)
|
|
tmp.close()
|
|
result = subprocess.run(["osascript", tmp.name], capture_output=True, text=True, timeout=120)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"AppleScript 에러: {result.stderr.strip()}")
|
|
return result.stdout.strip()
|
|
except subprocess.TimeoutExpired:
|
|
raise RuntimeError("AppleScript 타임아웃 (인라인)")
|
|
finally:
|
|
os.unlink(tmp.name)
|
|
|
|
|
|
def llm_generate(prompt: str, model: str = "mlx-community/Qwen3.5-35B-A3B-4bit",
|
|
host: str = "http://localhost:8800", json_mode: bool = False) -> str:
|
|
"""MLX 서버 API 호출 (OpenAI 호환)"""
|
|
import requests
|
|
messages = [{"role": "user", "content": prompt}]
|
|
resp = requests.post(f"{host}/v1/chat/completions", json={
|
|
"model": model,
|
|
"messages": messages,
|
|
"temperature": 0.3,
|
|
"max_tokens": 4096,
|
|
}, timeout=300)
|
|
resp.raise_for_status()
|
|
content = resp.json()["choices"][0]["message"]["content"]
|
|
if not json_mode:
|
|
return content
|
|
# JSON 모드: thinking 허용 → 마지막 유효 JSON 객체 추출
|
|
import re
|
|
import json as _json
|
|
# 배열이 포함된 JSON 객체 매칭
|
|
all_jsons = re.findall(r'\{[^{}]*(?:\[[^\]]*\])?[^{}]*\}', content)
|
|
for j in reversed(all_jsons):
|
|
try:
|
|
parsed = _json.loads(j)
|
|
if any(k in parsed for k in ("domain_db", "tags", "domain", "classification")):
|
|
return j
|
|
except _json.JSONDecodeError:
|
|
continue
|
|
# 폴백: 전체에서 가장 큰 JSON 추출
|
|
json_match = re.search(r'\{[\s\S]*\}', content)
|
|
return json_match.group(0) if json_match else content
|
|
|
|
|
|
# 하위호환 별칭
|
|
ollama_generate = llm_generate
|
|
|
|
|
|
def count_log_errors(log_file: Path, since_hours: int = 24) -> int:
|
|
"""로그 파일에서 최근 N시간 ERROR 카운트"""
|
|
from datetime import datetime, timedelta
|
|
if not log_file.exists():
|
|
return 0
|
|
cutoff = datetime.now() - timedelta(hours=since_hours)
|
|
count = 0
|
|
with open(log_file, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
if "[ERROR]" in line:
|
|
try:
|
|
ts_str = line[1:20] # [YYYY-MM-DD HH:MM:SS]
|
|
ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
|
|
if ts >= cutoff:
|
|
count += 1
|
|
except (ValueError, IndexError):
|
|
count += 1
|
|
return count
|