feat: add the full batch of PKM scripts - classification/law/mail/digest/embedding

- scripts/pkm_utils.py: shared utilities (logger, dotenv, osascript wrapper)
- scripts/prompts/classify_document.txt: Ollama classification prompt
- applescript/auto_classify.scpt: Inbox → AI classification → move to DB
- applescript/omnifocus_sync.scpt: Projects → create OmniFocus tasks
- scripts/law_monitor.py: law-change monitoring + DEVONthink import
- scripts/mailplus_archive.py: MailPlus IMAP → Archive DB
- scripts/pkm_daily_digest.py: daily digest + OmniFocus actions
- scripts/embed_to_chroma.py: GPU-server vector embeddings → ChromaDB
- launchd/*.plist: 3 schedules (07:00, 07:00+18:00, 20:00)
- docs/deploy.md: Mac mini deployment guide
- docs/devonagent-setup.md: setup guide for the 9 search sets
- tests/test_classify.py: classification tests for 5 document types

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Hyungi Ahn
Date: 2026-03-26 12:32:36 +09:00
Parent: bec9579a8a
Commit: 084d3a8c63
14 changed files with 1564 additions and 0 deletions

scripts/embed_to_chroma.py (new file, 104 lines)

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
Vector embedding script
- Extracts a document's text from DEVONthink by UUID
- Generates an embedding on the GPU server (nomic-embed-text)
- Stores it in ChromaDB
"""
import os
import sys
import requests
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline

logger = setup_logger("embed")

# ChromaDB storage path
CHROMA_DIR = Path.home() / ".local" / "share" / "pkm" / "chromadb"
CHROMA_DIR.mkdir(parents=True, exist_ok=True)

def get_document_text(uuid: str) -> tuple[str, str]:
    """Extract a document's title and plain text from DEVONthink by UUID."""
    script = f'''
    tell application id "DNtp"
        set theRecord to get record with uuid "{uuid}"
        set docText to plain text of theRecord
        set docTitle to name of theRecord
        return docTitle & "|||" & docText
    end tell
    '''
    result = run_applescript_inline(script)
    parts = result.split("|||", 1)
    title = parts[0] if len(parts) > 0 else ""
    text = parts[1] if len(parts) > 1 else ""
    return title, text

def get_embedding(text: str, gpu_server_ip: str) -> list[float] | None:
    """Generate an embedding via nomic-embed-text on the GPU server."""
    url = f"http://{gpu_server_ip}:11434/api/embeddings"
    try:
        resp = requests.post(url, json={
            "model": "nomic-embed-text",
            "prompt": text[:8000]  # stay under the token limit
        }, timeout=60)
        resp.raise_for_status()
        return resp.json().get("embedding")
    except Exception as e:
        logger.error(f"Embedding generation failed: {e}")
        return None

def store_in_chromadb(doc_id: str, title: str, text: str, embedding: list[float]):
    """Upsert the document into ChromaDB."""
    import chromadb
    client = chromadb.PersistentClient(path=str(CHROMA_DIR))
    collection = client.get_or_create_collection(
        name="pkm_documents",
        metadata={"hnsw:space": "cosine"}
    )
    collection.upsert(
        ids=[doc_id],
        embeddings=[embedding],
        documents=[text[:2000]],
        metadatas=[{"title": title, "source": "devonthink"}]
    )
    logger.info(f"Stored in ChromaDB: {doc_id} ({title[:30]})")

def run(uuid: str):
    """Embed a single document."""
    logger.info(f"Starting embedding run: {uuid}")
    creds = load_credentials()
    gpu_ip = creds.get("GPU_SERVER_IP")
    if not gpu_ip:
        logger.warning("GPU_SERVER_IP not set; skipping embedding")
        return
    try:
        title, text = get_document_text(uuid)
        if not text or len(text) < 10:
            logger.warning(f"Not enough text [{uuid}]: {len(text)} chars")
            return
        embedding = get_embedding(text, gpu_ip)
        if embedding:
            store_in_chromadb(uuid, title, text, embedding)
            logger.info(f"Embedding complete: {uuid}")
        else:
            logger.error(f"Embedding failed: {uuid}")
    except Exception as e:
        logger.error(f"Error while embedding [{uuid}]: {e}", exc_info=True)

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python3 embed_to_chroma.py <DEVONthink_UUID>")
        sys.exit(1)
    run(sys.argv[1])
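The `get_document_text` helper round-trips the title and body through a single AppleScript return value joined by `|||`, splitting only on the first delimiter so a `|||` inside the body survives. A minimal sketch of that split convention (pure Python, no DEVONthink needed; the sample strings are made up):

```python
def split_title_text(raw: str) -> tuple[str, str]:
    # Mirror of the "|||" convention in get_document_text:
    # maxsplit=1 keeps any later "|||" inside the body intact.
    parts = raw.split("|||", 1)
    title = parts[0] if len(parts) > 0 else ""
    text = parts[1] if len(parts) > 1 else ""
    return title, text

title, text = split_title_text("Safety report|||Line 1|||still body")
# title == "Safety report", text == "Line 1|||still body"
```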

scripts/law_monitor.py (new file, 184 lines)

@@ -0,0 +1,184 @@
#!/usr/bin/env python3
"""
Law monitoring script
- Polls the National Law Information Center OpenAPI (open.law.go.kr)
- Tracks changes to the Occupational Safety and Health Act, the Serious Accidents Punishment Act, and others
- On a detected change, auto-imports into the DEVONthink 04_Industrial safety DB
NOTE: API approval is still pending; only the script is written here, real calls begin after approval.
"""
import os
import sys
import json
import requests
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, PROJECT_ROOT, DATA_DIR

logger = setup_logger("law_monitor")

# Laws under monitoring (law names and categories are API data, kept in Korean)
MONITORED_LAWS = [
    {"name": "산업안전보건법", "law_id": "001789", "category": "법률"},
    {"name": "산업안전보건법 시행령", "law_id": "001790", "category": "대통령령"},
    {"name": "산업안전보건법 시행규칙", "law_id": "001791", "category": "부령"},
    {"name": "중대재해 처벌 등에 관한 법률", "law_id": "019005", "category": "법률"},
    {"name": "중대재해 처벌 등에 관한 법률 시행령", "law_id": "019006", "category": "대통령령"},
    {"name": "화학물질관리법", "law_id": "012354", "category": "법률"},
    {"name": "위험물안전관리법", "law_id": "001478", "category": "법률"},
]

# File storing the date each law was last seen
LAST_CHECK_FILE = DATA_DIR / "law_last_check.json"
LAWS_DIR = DATA_DIR / "laws"
LAWS_DIR.mkdir(exist_ok=True)

def load_last_check() -> dict:
    """Load the last-seen dates."""
    if LAST_CHECK_FILE.exists():
        with open(LAST_CHECK_FILE, "r") as f:
            return json.load(f)
    return {}


def save_last_check(data: dict):
    """Save the last-seen dates."""
    with open(LAST_CHECK_FILE, "w") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def fetch_law_info(law_oc: str, law_id: str) -> dict | None:
    """Look up law metadata (law search API)."""
    url = "https://www.law.go.kr/DRF/lawSearch.do"
    params = {
        "OC": law_oc,
        "target": "law",
        "type": "JSON",
        "MST": law_id,
    }
    try:
        resp = requests.get(url, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if "LawSearch" in data and "law" in data["LawSearch"]:
            laws = data["LawSearch"]["law"]
            if isinstance(laws, list):
                return laws[0] if laws else None
            return laws
        return None
    except Exception as e:
        logger.error(f"Law lookup failed [{law_id}]: {e}")
        return None

def fetch_law_text(law_oc: str, law_mst: str) -> str | None:
    """Download the law's full text as XML."""
    url = "https://www.law.go.kr/DRF/lawService.do"
    params = {
        "OC": law_oc,
        "target": "law",
        "type": "XML",
        "MST": law_mst,
    }
    try:
        resp = requests.get(url, params=params, timeout=60)
        resp.raise_for_status()
        return resp.text
    except Exception as e:
        logger.error(f"Law text download failed [{law_mst}]: {e}")
        return None

def save_law_file(law_name: str, content: str) -> Path:
    """Save the law XML to disk."""
    today = datetime.now().strftime("%Y%m%d")
    safe_name = law_name.replace(" ", "_").replace("/", "_")
    filepath = LAWS_DIR / f"{safe_name}_{today}.xml"
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    logger.info(f"Law saved: {filepath}")
    return filepath

def import_to_devonthink(filepath: Path, law_name: str, category: str):
    """Import into the DEVONthink 04_Industrial safety DB."""
    script = f'''
    tell application id "DNtp"
        set targetDB to missing value
        repeat with db in databases
            if name of db is "04_Industrial safety" then
                set targetDB to db
                exit repeat
            end if
        end repeat
        if targetDB is not missing value then
            set targetGroup to create location "/10_Legislation/Law" in targetDB
            set theRecord to import POSIX path "{filepath}" to targetGroup
            set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}
            add custom meta data "law_monitor" for "sourceChannel" to theRecord
            add custom meta data "external" for "dataOrigin" to theRecord
            add custom meta data (current date) for "lastAIProcess" to theRecord
        end if
    end tell
    '''
    try:
        run_applescript_inline(script)
        logger.info(f"DEVONthink import complete: {law_name}")
    except Exception as e:
        logger.error(f"DEVONthink import failed [{law_name}]: {e}")

def run():
    """Main entry point."""
    logger.info("=== Law monitoring started ===")
    creds = load_credentials()
    law_oc = creds.get("LAW_OC")
    if not law_oc:
        logger.error("The LAW_OC API key is not set. Check credentials.env.")
        sys.exit(1)
    last_check = load_last_check()
    changes_found = 0
    for law in MONITORED_LAWS:
        law_name = law["name"]
        law_id = law["law_id"]
        category = law["category"]
        logger.info(f"Checking: {law_name} ({law_id})")
        info = fetch_law_info(law_oc, law_id)
        if not info:
            continue
        # Detect changes via the promulgation date (공포일자) or enforcement date (시행일자)
        announce_date = info.get("공포일자", info.get("시행일자", ""))
        prev_date = last_check.get(law_id, "")
        if announce_date and announce_date != prev_date:
            logger.info(f"Change detected: {law_name} - promulgated {announce_date} (previous: {prev_date or 'none'})")
            # Download the law's full text
            law_mst = info.get("법령MST", law_id)
            text = fetch_law_text(law_oc, law_mst)
            if text:
                filepath = save_law_file(law_name, text)
                import_to_devonthink(filepath, law_name, category)
                changes_found += 1
                last_check[law_id] = announce_date
        else:
            logger.debug(f"No change: {law_name}")
    save_last_check(last_check)
    logger.info(f"=== Law monitoring finished: {changes_found} change(s) detected ===")

if __name__ == "__main__":
    run()
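The change check in `run()` reduces to comparing the promulgation date returned by the API against the one stored from the previous poll. A standalone sketch of that comparison (hypothetical data, no API call; the Korean keys are the API's own field names):

```python
def detect_change(info: dict, last_check: dict, law_id: str) -> bool:
    # Same rule as run(): prefer the promulgation date (공포일자),
    # fall back to the enforcement date (시행일자); an empty date never fires.
    announce_date = info.get("공포일자", info.get("시행일자", ""))
    return bool(announce_date) and announce_date != last_check.get(law_id, "")

last_check = {"001789": "20250601"}
assert detect_change({"공포일자": "20260315"}, last_check, "001789")      # new date
assert not detect_change({"공포일자": "20250601"}, last_check, "001789")  # unchanged
assert not detect_change({}, last_check, "001789")                        # no date at all
```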

scripts/mailplus_archive.py (new file, 209 lines)

@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""
MailPlus → DEVONthink Archive DB email collection
- Connects to Synology MailPlus over IMAP
- Fetches mail received since the last sync
- Imports it into the DEVONthink Archive DB
"""
import os
import sys
import imaplib
import email
from email.header import decode_header
from datetime import datetime
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, DATA_DIR

logger = setup_logger("mailplus")

LAST_UID_FILE = DATA_DIR / "mailplus_last_uid.txt"
MAIL_TMP_DIR = DATA_DIR / "mail_tmp"
MAIL_TMP_DIR.mkdir(exist_ok=True)

# Safety-related keywords (used to decide dataOrigin)
SAFETY_KEYWORDS = [
    "안전", "위험", "사고", "재해", "점검", "보건", "화학물질",
    "OSHA", "safety", "hazard", "incident", "KOSHA"
]

def decode_mime_header(value: str) -> str:
    """Decode a MIME-encoded header value."""
    if not value:
        return ""
    decoded_parts = decode_header(value)
    result = []
    for part, charset in decoded_parts:
        if isinstance(part, bytes):
            result.append(part.decode(charset or "utf-8", errors="replace"))
        else:
            result.append(part)
    return " ".join(result)

def load_last_uid() -> int:
    """Load the last processed UID."""
    if LAST_UID_FILE.exists():
        return int(LAST_UID_FILE.read_text().strip())
    return 0


def save_last_uid(uid: int):
    """Save the last processed UID."""
    LAST_UID_FILE.write_text(str(uid))


def detect_data_origin(subject: str, body: str) -> str:
    """Decide dataOrigin by scanning for safety keywords."""
    text = (subject + " " + body).lower()
    for kw in SAFETY_KEYWORDS:
        if kw.lower() in text:
            return "work"
    return "external"

def save_email_file(msg: email.message.Message, uid: int) -> Path:
    """Save the message as an .eml file."""
    subject = decode_mime_header(msg.get("Subject", ""))
    safe_subject = "".join(c if c.isalnum() or c in " _-" else "_" for c in subject)[:50]
    date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{date_str}_{uid}_{safe_subject}.eml"
    filepath = MAIL_TMP_DIR / filename
    with open(filepath, "wb") as f:
        f.write(msg.as_bytes())
    return filepath

def get_email_body(msg: email.message.Message) -> str:
    """Extract the plain-text body (first 2000 chars)."""
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                payload = part.get_payload(decode=True)
                if payload:
                    charset = part.get_content_charset() or "utf-8"
                    body += payload.decode(charset, errors="replace")
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            charset = msg.get_content_charset() or "utf-8"
            body = payload.decode(charset, errors="replace")
    return body[:2000]

def import_to_devonthink(filepath: Path, subject: str, data_origin: str):
    """Import the .eml file into the DEVONthink Archive DB."""
    escaped_path = str(filepath).replace('"', '\\"')
    script = f'''
    tell application id "DNtp"
        set targetDB to missing value
        repeat with db in databases
            if name of db is "Archive" then
                set targetDB to db
                exit repeat
            end if
        end repeat
        if targetDB is not missing value then
            set targetGroup to create location "/Email" in targetDB
            set theRecord to import POSIX path "{escaped_path}" to targetGroup
            add custom meta data "email" for "sourceChannel" to theRecord
            add custom meta data "{data_origin}" for "dataOrigin" to theRecord
            add custom meta data (current date) for "lastAIProcess" to theRecord
        end if
    end tell
    '''
    try:
        run_applescript_inline(script)
        logger.info(f"Imported into DEVONthink: {subject[:40]}")
    except Exception as e:
        logger.error(f"DEVONthink import failed: {e}")

def run():
    """Main entry point."""
    logger.info("=== MailPlus mail collection started ===")
    creds = load_credentials()
    host = creds.get("MAILPLUS_HOST")
    port = int(creds.get("MAILPLUS_PORT", "993"))
    user = creds.get("MAILPLUS_USER")
    password = creds.get("MAILPLUS_PASS")
    if not all([host, user, password]):
        logger.error("MAILPLUS connection settings are incomplete. Check credentials.env.")
        sys.exit(1)
    last_uid = load_last_uid()
    logger.info(f"Last processed UID: {last_uid}")
    try:
        # Connect over IMAP SSL
        mail = imaplib.IMAP4_SSL(host, port)
        mail.login(user, password)
        mail.select("INBOX")
        logger.info("IMAP login successful")
        # Search for mail after the last UID
        if last_uid > 0:
            status, data = mail.uid("search", None, f"UID {last_uid + 1}:*")
        else:
            # First run: only the last 7 days
            from datetime import timedelta
            since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
            status, data = mail.uid("search", None, f"SINCE {since}")
        if status != "OK":
            logger.error(f"Mail search failed: {status}")
            mail.logout()
            sys.exit(1)
        uids = data[0].split()
        logger.info(f"New messages: {len(uids)}")
        max_uid = last_uid
        imported = 0
        for uid_bytes in uids:
            uid = int(uid_bytes)
            if uid <= last_uid:
                continue  # "UID n:*" always returns at least the last message
            status, msg_data = mail.uid("fetch", uid_bytes, "(RFC822)")
            if status != "OK":
                continue
            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)
            subject = decode_mime_header(msg.get("Subject", "(no subject)"))
            body = get_email_body(msg)
            data_origin = detect_data_origin(subject, body)
            filepath = save_email_file(msg, uid)
            import_to_devonthink(filepath, subject, data_origin)
            max_uid = max(max_uid, uid)
            imported += 1
        if max_uid > last_uid:
            save_last_uid(max_uid)
        mail.logout()
        logger.info(f"=== MailPlus collection finished: {imported} message(s) imported ===")
    except imaplib.IMAP4.error as e:
        logger.error(f"IMAP error: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        sys.exit(1)

if __name__ == "__main__":
    run()
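`decode_mime_header` builds on `email.header.decode_header`, which splits a header into `(bytes-or-str, charset)` pairs for each encoded word. A self-contained check with a hypothetical UTF-8 base64 subject (`=?utf-8?b?SGVsbG8=?=` is base64 for "Hello"):

```python
from email.header import decode_header

def decode_mime_header(value: str) -> str:
    # Same logic as in mailplus_archive.py: decode byte parts with their
    # declared charset (falling back to UTF-8), pass str parts through.
    if not value:
        return ""
    result = []
    for part, charset in decode_header(value):
        if isinstance(part, bytes):
            result.append(part.decode(charset or "utf-8", errors="replace"))
        else:
            result.append(part)
    return " ".join(result)

print(decode_mime_header("=?utf-8?b?SGVsbG8=?="))  # Hello
```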

scripts/pkm_daily_digest.py (new file, 284 lines)

@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""
PKM daily digest
- Tallies today's added/modified documents in DEVONthink
- Parses today's law changes from the law_monitor log
- Tallies completed/added/overdue OmniFocus tasks
- Summarizes top news via Ollama
- Auto-creates OmniFocus actions
- Archives digests older than 90 days
"""
import os
import sys
import re
from datetime import datetime, timedelta
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import (
    setup_logger, load_credentials, run_applescript_inline,
    ollama_generate, count_log_errors, PROJECT_ROOT, LOGS_DIR, DATA_DIR
)

logger = setup_logger("digest")

DIGEST_DIR = DATA_DIR / "digests"
DIGEST_DIR.mkdir(exist_ok=True)

def get_devonthink_stats() -> dict:
    """Tally documents added/modified in DEVONthink today."""
    script = '''
    tell application id "DNtp"
        set today to current date
        set time of today to 0
        set stats to {}
        repeat with db in databases
            set dbName to name of db
            set addedCount to count of (search "date:today" in db)
            set modifiedCount to count of (search "modified:today" in db)
            if addedCount > 0 or modifiedCount > 0 then
                set end of stats to dbName & ":" & addedCount & ":" & modifiedCount
            end if
        end repeat
        set AppleScript's text item delimiters to "|"
        return stats as text
    end tell
    '''
    try:
        result = run_applescript_inline(script)
        stats = {}
        if result:
            for item in result.split("|"):
                parts = item.split(":")
                if len(parts) == 3:
                    stats[parts[0]] = {"added": int(parts[1]), "modified": int(parts[2])}
        return stats
    except Exception as e:
        logger.error(f"DEVONthink tally failed: {e}")
        return {}

def get_omnifocus_stats() -> dict:
    """Tally today's completed/added/overdue OmniFocus tasks."""
    script = '''
    tell application "OmniFocus"
        tell default document
            set today to current date
            set time of today to 0
            set tomorrow to today + 1 * days
            set completedCount to count of (every flattened task whose completed is true and completion date ≥ today)
            set addedCount to count of (every flattened task whose creation date ≥ today)
            set overdueCount to count of (every flattened task whose completed is false and due date < today and due date is not missing value)
            return (completedCount as text) & "|" & (addedCount as text) & "|" & (overdueCount as text)
        end tell
    end tell
    '''
    try:
        result = run_applescript_inline(script)
        parts = result.split("|")
        return {
            "completed": int(parts[0]) if len(parts) > 0 else 0,
            "added": int(parts[1]) if len(parts) > 1 else 0,
            "overdue": int(parts[2]) if len(parts) > 2 else 0,
        }
    except Exception as e:
        logger.error(f"OmniFocus tally failed: {e}")
        return {"completed": 0, "added": 0, "overdue": 0}

def parse_law_changes() -> list:
    """Parse today's law changes out of the law_monitor log."""
    log_file = LOGS_DIR / "law_monitor.log"
    if not log_file.exists():
        return []
    today = datetime.now().strftime("%Y-%m-%d")
    changes = []
    with open(log_file, "r", encoding="utf-8") as f:
        for line in f:
            if today in line and "Change detected" in line:
                # e.g. "[2026-03-26 07:00:15] [law_monitor] [INFO] Change detected: 산업안전보건법 - promulgated ..."
                match = re.search(r"Change detected: (.+?)$", line)
                if match:
                    changes.append(match.group(1).strip())
    return changes

def get_inbox_count() -> int:
    """Number of unprocessed documents in the DEVONthink Inbox."""
    script = '''
    tell application id "DNtp"
        repeat with db in databases
            if name of db is "Inbox" then
                return count of children of root group of db
            end if
        end repeat
        return 0
    end tell
    '''
    try:
        return int(run_applescript_inline(script))
    except Exception:
        return 0

def create_omnifocus_task(task_name: str, note: str = "", flagged: bool = False):
    """Create an OmniFocus inbox task."""
    flag_str = "true" if flagged else "false"
    escaped_name = task_name.replace('"', '\\"')
    escaped_note = note.replace('"', '\\"')
    script = f'''
    tell application "OmniFocus"
        tell default document
            make new inbox task with properties {{name:"{escaped_name}", note:"{escaped_note}", flagged:{flag_str}}}
        end tell
    end tell
    '''
    try:
        run_applescript_inline(script)
        logger.info(f"OmniFocus task created: {task_name}")
    except Exception as e:
        logger.error(f"OmniFocus task creation failed: {e}")

def get_system_health() -> dict:
    """ERROR count per module log over the last 24 hours."""
    modules = ["law_monitor", "mailplus", "digest", "embed", "auto_classify"]
    health = {}
    for mod in modules:
        log_file = LOGS_DIR / f"{mod}.log"
        health[mod] = count_log_errors(log_file, since_hours=24)
    return health

def generate_digest():
    """Build the daily digest."""
    logger.info("=== Daily Digest started ===")
    today = datetime.now()
    date_str = today.strftime("%Y-%m-%d")
    # Collect data
    dt_stats = get_devonthink_stats()
    of_stats = get_omnifocus_stats()
    law_changes = parse_law_changes()
    inbox_count = get_inbox_count()
    system_health = get_system_health()
    # Build the markdown
    md = f"# PKM Daily Digest — {date_str}\n\n"
    # DEVONthink activity
    md += "## DEVONthink changes\n\n"
    if dt_stats:
        md += "| DB | Added | Modified |\n|---|---|---|\n"
        total_added = 0
        total_modified = 0
        for db_name, counts in dt_stats.items():
            md += f"| {db_name} | {counts['added']} | {counts['modified']} |\n"
            total_added += counts["added"]
            total_modified += counts["modified"]
        md += f"| **Total** | **{total_added}** | **{total_modified}** |\n\n"
    else:
        md += "No changes\n\n"
    # Law changes
    md += "## Law changes\n\n"
    if law_changes:
        for change in law_changes:
            md += f"- {change}\n"
        md += "\n"
    else:
        md += "No changes\n\n"
    # OmniFocus status
    md += "## OmniFocus status\n\n"
    md += f"- Completed: {of_stats['completed']}\n"
    md += f"- Added: {of_stats['added']}\n"
    md += f"- Overdue: {of_stats['overdue']}\n\n"
    # Inbox backlog
    md += f"## Inbox backlog: {inbox_count}\n\n"
    # System health
    md += "## System health\n\n"
    total_errors = sum(system_health.values())
    if total_errors == 0:
        md += "All modules healthy\n\n"
    else:
        md += "| Module | Errors |\n|---|---|\n"
        for mod, cnt in system_health.items():
            status = f"**{cnt}**" if cnt > 0 else "0"
            md += f"| {mod} | {status} |\n"
        md += "\n"
    # Save to disk
    digest_file = DIGEST_DIR / f"{date_str}_digest.md"
    with open(digest_file, "w", encoding="utf-8") as f:
        f.write(md)
    logger.info(f"Digest saved: {digest_file}")
    # Store in DEVONthink
    import_digest_to_devonthink(digest_file, date_str)
    # Auto-create OmniFocus actions
    if law_changes:
        for change in law_changes:
            create_omnifocus_task(f"Review law change: {change[:30]}", note=change)
    if inbox_count >= 3:
        create_omnifocus_task(f"Clean up Inbox ({inbox_count} unprocessed)", note="Unclassified documents are piling up in the DEVONthink Inbox.")
    if of_stats["overdue"] > 0:
        create_omnifocus_task(f"Handle overdue tasks ({of_stats['overdue']})", flagged=True)
    # Archive digests older than 90 days
    archive_old_digests()
    logger.info("=== Daily Digest finished ===")

def import_digest_to_devonthink(filepath: Path, date_str: str):
    """Store the digest in DEVONthink."""
    escaped_path = str(filepath).replace('"', '\\"')
    script = f'''
    tell application id "DNtp"
        repeat with db in databases
            if name of db is "00_Note_BOX" then
                set targetGroup to create location "/Daily_Digest" in db
                import POSIX path "{escaped_path}" to targetGroup
                exit repeat
            end if
        end repeat
    end tell
    '''
    try:
        run_applescript_inline(script)
    except Exception as e:
        logger.error(f"DEVONthink digest import failed: {e}")

def archive_old_digests():
    """Move digests older than 90 days into an archive folder."""
    cutoff = datetime.now() - timedelta(days=90)
    for f in DIGEST_DIR.glob("*_digest.md"):
        try:
            date_part = f.stem.split("_digest")[0]
            file_date = datetime.strptime(date_part, "%Y-%m-%d")
            if file_date < cutoff:
                archive_dir = DIGEST_DIR / "archive"
                archive_dir.mkdir(exist_ok=True)
                f.rename(archive_dir / f.name)
                logger.info(f"Archived: {f.name}")
        except ValueError:
            pass  # skip files whose names don't parse as dates

if __name__ == "__main__":
    generate_digest()
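`archive_old_digests` recovers the date from the filename stem before comparing against the 90-day cutoff. The sketch below isolates just that parsing and comparison (no filesystem access; filenames and the fixed "now" are made up for the test):

```python
from datetime import datetime, timedelta

def is_archivable(filename: str, now: datetime, days: int = 90) -> bool:
    # "2026-01-01_digest.md" -> stem "2026-01-01_digest" -> date part "2026-01-01"
    stem = filename.rsplit(".", 1)[0]
    date_part = stem.split("_digest")[0]
    try:
        file_date = datetime.strptime(date_part, "%Y-%m-%d")
    except ValueError:
        return False  # malformed names are skipped, as in the script
    return file_date < now - timedelta(days=days)

now = datetime(2026, 3, 26)
assert is_archivable("2025-12-01_digest.md", now)        # older than 90 days
assert not is_archivable("2026-03-20_digest.md", now)    # recent
assert not is_archivable("notes_digest.md", now)         # unparseable date
```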

scripts/pkm_utils.py (new file, 138 lines)

@@ -0,0 +1,138 @@
"""
PKM 시스템 공통 유틸리티
- 로거 설정 (파일 + 콘솔)
- credentials.env 로딩
- osascript 호출 래퍼
"""
import os
import sys
import logging
import subprocess
from pathlib import Path
from dotenv import load_dotenv
# 프로젝트 루트 디렉토리
PROJECT_ROOT = Path(__file__).parent.parent
LOGS_DIR = PROJECT_ROOT / "logs"
DATA_DIR = PROJECT_ROOT / "data"
SCRIPTS_DIR = PROJECT_ROOT / "scripts"
APPLESCRIPT_DIR = PROJECT_ROOT / "applescript"
# 디렉토리 생성
LOGS_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)

def setup_logger(name: str) -> logging.Logger:
    """Per-module logger with file + console handlers."""
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger  # avoid duplicate handlers
    logger.setLevel(logging.DEBUG)
    fmt = logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S")
    # File handler
    fh = logging.FileHandler(LOGS_DIR / f"{name}.log", encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(fmt)
    logger.addHandler(fh)
    # Console handler
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)
    ch.setFormatter(fmt)
    logger.addHandler(ch)
    return logger

def load_credentials() -> dict:
    """Load ~/.config/pkm/credentials.env and warn about missing keys."""
    cred_path = Path.home() / ".config" / "pkm" / "credentials.env"
    if not cred_path.exists():
        # Fallback: in-project credentials.env (development only)
        cred_path = PROJECT_ROOT / "credentials.env"
    if cred_path.exists():
        load_dotenv(cred_path)
    else:
        print(f"[WARN] credentials.env not found: {cred_path}")
    keys = {
        "CLAUDE_API_KEY": os.getenv("CLAUDE_API_KEY"),
        "LAW_OC": os.getenv("LAW_OC"),
        "NAS_DOMAIN": os.getenv("NAS_DOMAIN"),
        "NAS_TAILSCALE_IP": os.getenv("NAS_TAILSCALE_IP"),
        "NAS_PORT": os.getenv("NAS_PORT", "15001"),
        "MAILPLUS_HOST": os.getenv("MAILPLUS_HOST"),
        "MAILPLUS_PORT": os.getenv("MAILPLUS_PORT", "993"),
        "MAILPLUS_USER": os.getenv("MAILPLUS_USER"),
        "MAILPLUS_PASS": os.getenv("MAILPLUS_PASS"),
        "GPU_SERVER_IP": os.getenv("GPU_SERVER_IP"),
    }
    missing = [k for k, v in keys.items() if not v and k not in ("GPU_SERVER_IP", "CLAUDE_API_KEY")]
    if missing:
        print(f"[WARN] missing credential keys: {', '.join(missing)}")
    return keys

def run_applescript(script_path: str, *args) -> str:
    """osascript wrapper with error capture."""
    cmd = ["osascript", str(script_path)] + [str(a) for a in args]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            raise RuntimeError(f"AppleScript error: {result.stderr.strip()}")
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        raise RuntimeError(f"AppleScript timed out: {script_path}")

def run_applescript_inline(script: str) -> str:
    """Run an inline AppleScript."""
    cmd = ["osascript", "-e", script]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            raise RuntimeError(f"AppleScript error: {result.stderr.strip()}")
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        raise RuntimeError("AppleScript timed out (inline)")

def ollama_generate(prompt: str, model: str = "qwen3.5:35b-a3b-q4_K_M",
                    host: str = "http://localhost:11434") -> str:
    """Call the Ollama generate API (non-streaming)."""
    import requests
    resp = requests.post(f"{host}/api/generate", json={
        "model": model,
        "prompt": prompt,
        "stream": False
    }, timeout=120)
    resp.raise_for_status()
    return resp.json().get("response", "")

def count_log_errors(log_file: Path, since_hours: int = 24) -> int:
    """Count ERROR lines in a log file over the last N hours."""
    from datetime import datetime, timedelta
    if not log_file.exists():
        return 0
    cutoff = datetime.now() - timedelta(hours=since_hours)
    count = 0
    with open(log_file, "r", encoding="utf-8") as f:
        for line in f:
            if "[ERROR]" in line:
                try:
                    ts_str = line[1:20]  # [YYYY-MM-DD HH:MM:SS]
                    ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
                    if ts >= cutoff:
                        count += 1
                except (ValueError, IndexError):
                    count += 1  # count undated ERROR lines too
    return count
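`count_log_errors` slices the timestamp out of the bracketed log prefix: `line[1:20]` covers exactly `YYYY-MM-DD HH:MM:SS` (10 date characters, one space, 8 time characters) as produced by the formatter in `setup_logger`. A self-contained check of that slice against a sample log line:

```python
from datetime import datetime

line = "[2026-03-26 07:00:15] [law_monitor] [ERROR] something failed"
ts_str = line[1:20]  # skip the opening "[", take the next 19 characters
assert len(ts_str) == 19
ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
assert ts == datetime(2026, 3, 26, 7, 0, 15)
```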


@@ -0,0 +1,53 @@
You are a document-classification AI. Analyze the document below and respond ONLY in JSON. Output no other text.
## Response format
{
  "tags": ["tag1", "tag2", "tag3"],
  "domain_db": "DB name",
  "sub_group": "subgroup path",
  "sourceChannel": "intake channel",
  "dataOrigin": "work or external"
}
## Domain DB choices (use these names exactly)
- 00_Note_BOX: general memos, clippings, miscellaneous notes
- 01_Philosophie: philosophy, thought, humanities
- 02_Language: language study, translation, linguistics
- 03_Engineering: technical documents across engineering
- 04_Industrial safety: industrial safety, regulations, certification
- 05_Programming: development, code, IT
- 07_General Book: general books, reading notes
- 97_Production drawing: production drawings, CAD, design
- 99_Reference Data: general-purpose references, standards tables
- 99_Technicalkorea: Korean technical regulations, domestic technical material
## Example sub-group paths (different per DB)
- 04_Industrial safety: 10_Legislation/Notice, 10_Legislation/Law, 20_Theory, 30_Papers, 40_Cases/Domestic, 50_Practice/Risk_Assessment, 50_Practice/Patrol_Inspection, 50_Practice/Education, 60_Compliance, 70_Safety_Manager, 80_Reference
- 05_Programming: 10_Language, 20_Framework, 30_DevOps, 40_AI_ML
- 03_Engineering: 10_Mechanical, 20_Electrical, 30_Network
- When unsure: 00_Inbox
## Tag scheme
At most 5 tags, written in Korean. Choose from this hierarchy (tag values kept verbatim):
- @상태/ (status): 처리중, 검토필요, 완료, 아카이브
- #주제/기술/ (topic/tech): 서버관리, 네트워크, AI-ML
- #주제/산업안전/ (topic/industrial safety): 법령, 위험성평가, 순회점검, 안전교육, 사고사례, 신고보고, 안전관리자, 보건관리자
- #주제/업무/ (topic/work): 프로젝트, 회의, 보고서
- $유형/ (type): 논문, 법령, 기사, 메모, 이메일, 채팅로그, 도면, 체크리스트
- !우선순위/ (priority): 긴급, 중요, 참고
## sourceChannel values
- tksafety: work records from the TKSafety API
- devonagent: news auto-collected by DEVONagent
- law_monitor: law changes from the law API
- inbox_route: Inbox AI classification (classification done by this prompt)
- email: MailPlus email
- web_clip: Web Clipper clippings
- manual: added manually
## dataOrigin values
- work: related to our own business (TK, Technical Korea, plant, production, in-house)
- external: outside reference material (news, papers, legislation, general information)
## Document to classify
{document_text}