Files
hyungi_document_server/scripts/pkm_utils.py
2026-03-26 14:13:48 +09:00

165 lines
5.5 KiB
Python

"""
PKM 시스템 공통 유틸리티
- 로거 설정 (파일 + 콘솔)
- credentials.env 로딩
- osascript 호출 래퍼
"""
import os
import sys
import logging
import subprocess
from pathlib import Path
from dotenv import load_dotenv
# Project root: two levels up from this file's path (parent of its directory).
PROJECT_ROOT = Path(__file__).parent.parent

# Well-known sub-directories of the project.
LOGS_DIR = PROJECT_ROOT.joinpath("logs")
DATA_DIR = PROJECT_ROOT.joinpath("data")
SCRIPTS_DIR = PROJECT_ROOT.joinpath("scripts")
APPLESCRIPT_DIR = PROJECT_ROOT.joinpath("applescript")

# Create the log and data directories at import time when they are missing.
# Only these two are created here; the other directories are left alone.
LOGS_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)
def setup_logger(name: str) -> logging.Logger:
    """Return the logger called *name*, wiring up its handlers on first use.

    On the first call for a given name two handlers are attached:

    * a file handler writing UTF-8 text to ``LOGS_DIR/<name>.log`` at DEBUG
      level, and
    * a console handler writing to ``sys.stdout`` at INFO level.

    Both share the ``[timestamp] [name] [level] message`` format. When the
    logger already has handlers it is returned untouched, so repeated calls
    do not produce duplicate output.
    """
    logger = logging.getLogger(name)
    if not logger.handlers:
        logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        file_handler = logging.FileHandler(LOGS_DIR / f"{name}.log", encoding="utf-8")
        console_handler = logging.StreamHandler(sys.stdout)

        # File handler first, console handler second; each gets its own level.
        for handler, level in (
            (file_handler, logging.DEBUG),
            (console_handler, logging.INFO),
        ):
            handler.setLevel(level)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    return logger
def load_credentials() -> dict:
    """Load credentials.env into the environment and return the known keys.

    The file is looked up at ``~/.config/pkm/credentials.env`` first, then at
    ``PROJECT_ROOT/credentials.env`` (development fallback). If neither
    exists a warning is printed and only the current process environment is
    consulted.

    Returns:
        A dict mapping each known key to its environment value (or ``None``).
        ``NAS_PORT`` defaults to ``"15001"`` and ``MAILPLUS_PORT`` to
        ``"993"``. A warning listing keys with empty values is printed;
        ``GPU_SERVER_IP`` and ``CLAUDE_API_KEY`` are never reported.
    """
    home_path = Path.home() / ".config" / "pkm" / "credentials.env"
    cred_path = home_path if home_path.exists() else PROJECT_ROOT / "credentials.env"

    if not cred_path.exists():
        print(f"[경고] credentials.env를 찾을 수 없습니다: {cred_path}")
    else:
        load_dotenv(cred_path)

    # Order here determines the key order of the returned dict.
    key_names = (
        "CLAUDE_API_KEY",
        "LAW_OC",
        "NAS_DOMAIN",
        "NAS_TAILSCALE_IP",
        "NAS_PORT",
        "MAILPLUS_HOST",
        "MAILPLUS_PORT",
        "MAILPLUS_USER",
        "MAILPLUS_PASS",
        "GPU_SERVER_IP",
    )
    fallbacks = {"NAS_PORT": "15001", "MAILPLUS_PORT": "993"}
    keys = {key: os.getenv(key, fallbacks.get(key)) for key in key_names}

    # These two are optional and therefore excluded from the missing-key warning.
    optional = ("GPU_SERVER_IP", "CLAUDE_API_KEY")
    missing = [key for key, value in keys.items() if key not in optional and not value]
    if missing:
        print(f"[경고] 누락된 인증 키: {', '.join(missing)}")
    return keys
def run_applescript(script_path: str, *args) -> str:
    """Run an AppleScript file through ``osascript`` and return its output.

    Args:
        script_path: Path of the script file handed to ``osascript``.
        *args: Extra arguments; each is converted with ``str`` and appended
            to the command line.

    Returns:
        The process's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the process exits with a non-zero status (the
            message carries the stripped stderr) or does not finish within
            120 seconds.
    """
    command = ["osascript", str(script_path), *map(str, args)]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired:
        raise RuntimeError(f"AppleScript 타임아웃: {script_path}")
    if completed.returncode == 0:
        return completed.stdout.strip()
    raise RuntimeError(f"AppleScript 에러: {completed.stderr.strip()}")
def run_applescript_inline(script: str) -> str:
    """Run inline AppleScript source, passing each line as its own ``-e`` option.

    Args:
        script: AppleScript source text. It is stripped, then split on
            ``"\\n"``; every resulting line becomes one ``-e <line>`` pair on
            the ``osascript`` command line.

    Returns:
        The process's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the process exits with a non-zero status (the
            message carries the stripped stderr) or does not finish within
            120 seconds.
    """
    command = ["osascript"]
    for statement in script.strip().split('\n'):
        command += ["-e", statement]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired:
        raise RuntimeError("AppleScript 타임아웃 (인라인)")
    if completed.returncode == 0:
        return completed.stdout.strip()
    raise RuntimeError(f"AppleScript 에러: {completed.stderr.strip()}")
def llm_generate(prompt: str, model: str = "mlx-community/Qwen3.5-35B-A3B-4bit",
                 host: str = "http://localhost:8800", json_mode: bool = False) -> str:
    """Send *prompt* to the MLX server's OpenAI-compatible chat endpoint.

    A single user message is POSTed to ``{host}/v1/chat/completions`` with
    temperature 0.3, ``max_tokens`` 4096 and a 300-second timeout. HTTP
    error statuses are raised via ``raise_for_status``.

    Args:
        prompt: Text sent as the user message.
        model: Model identifier placed in the request body.
        host: Base URL of the server.
        json_mode: When true, post-process the reply to extract a JSON
            object (see Returns). It does not alter the request itself.

    Returns:
        With ``json_mode`` false, the message content of the first choice.
        With ``json_mode`` true, the reply may contain free-form text around
        the JSON, so flat ``{...}`` spans (no nested braces) are tried from
        last to first and the first one that parses as JSON and contains
        any of ``domain_db``, ``tags``, ``domain`` or ``classification`` is
        returned. Failing that, the span from the first ``{`` to the last
        ``}`` is returned, or the raw content when it contains no such span.
    """
    import requests

    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": 4096,
    }
    response = requests.post(f"{host}/v1/chat/completions", json=payload, timeout=300)
    response.raise_for_status()
    reply = response.json()["choices"][0]["message"]["content"]
    if not json_mode:
        return reply

    import re
    import json as _json

    expected_keys = ("domain_db", "tags", "domain", "classification")
    # Candidates are brace-delimited spans without nested braces; the last
    # one in the reply is tried first.
    candidates = re.findall(r'\{[^{}]*(?:\[[^\]]*\])?[^{}]*\}', reply)
    for candidate in candidates[::-1]:
        try:
            decoded = _json.loads(candidate)
        except _json.JSONDecodeError:
            continue
        if any(key in decoded for key in expected_keys):
            return candidate

    # Fallback: the widest brace-delimited span in the reply, if any.
    widest = re.search(r'\{[\s\S]*\}', reply)
    if widest is None:
        return reply
    return widest.group(0)
# Backward-compatibility alias: ollama_generate is bound to the same function
# object as llm_generate.
ollama_generate = llm_generate
def count_log_errors(log_file: Path, since_hours: int = 24) -> int:
    """Count ``[ERROR]`` lines in a log file from the last *since_hours* hours.

    Each line containing ``[ERROR]`` is expected to begin with a
    ``[YYYY-MM-DD HH:MM:SS]`` timestamp. Lines whose timestamp is at or after
    ``now - since_hours`` are counted. ``[ERROR]`` lines whose timestamp
    cannot be parsed are counted as well, so an unreadable line is never
    silently ignored.

    Args:
        log_file: Path of the log file; a ``Path`` or a plain string path.
        since_hours: Size of the look-back window in hours.

    Returns:
        The number of matching lines, or 0 when the file does not exist.
    """
    from datetime import datetime, timedelta

    # Accept plain string paths as well as Path objects.
    log_path = Path(log_file)
    if not log_path.exists():
        return 0

    cutoff = datetime.now() - timedelta(hours=since_hours)
    count = 0
    # errors="replace": one corrupt or truncated multi-byte character must not
    # abort the whole count with UnicodeDecodeError.
    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            if "[ERROR]" not in line:
                continue
            try:
                # Characters 1..19 hold the timestamp inside the leading brackets.
                ts = datetime.strptime(line[1:20], "%Y-%m-%d %H:%M:%S")
            except ValueError:
                # Unparseable timestamp: count the line rather than drop it.
                count += 1
                continue
            if ts >= cutoff:
                count += 1
    return count