chore: remove v1 files from main branch

v1 코드는 v1-archive 브랜치 + v1-final 태그로 보존.
필요시 git show v1-final:<파일경로>로 참조 가능.

삭제: applescript/, launchd/, v1 scripts, v1 docs, requirements.txt

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-02 09:35:09 +09:00
parent 852b7da797
commit e48b6a2bb4
15 changed files with 0 additions and 3713 deletions

View File

@@ -1,104 +0,0 @@
#!/usr/bin/env python3
"""
벡터 임베딩 스크립트
- DEVONthink 문서 UUID로 텍스트 추출
- GPU 서버(nomic-embed-text)로 임베딩 생성
- ChromaDB에 저장
"""
import os
import sys
import requests
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline
logger = setup_logger("embed")
# Directory that holds the persistent ChromaDB store
CHROMA_DIR = Path.home() / ".local" / "share" / "pkm" / "chromadb"
# Created at import time (including missing parents) so the path always exists
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
def get_document_text(uuid: str) -> tuple[str, str]:
    """Fetch a DEVONthink record's title and plain text by UUID.

    The AppleScript returns ``title|||text``; the result is split on the
    first ``|||``.

    Returns:
        ``(title, text)``. ``text`` is ``""`` when the delimiter is absent.
    """
    script = f'''
tell application id "DNtp"
set theRecord to get record with uuid "{uuid}"
set docText to plain text of theRecord
set docTitle to name of theRecord
return docTitle & "|||" & docText
end tell
'''
    # partition() splits on the first delimiter only, like split("|||", 1)
    title, _, text = run_applescript_inline(script).partition("|||")
    return title, text
def get_embedding(text: str, gpu_server_ip: str) -> list[float] | None:
    """Request an embedding for ``text`` from the nomic-embed-text model.

    Posts to ``/api/embeddings`` on port 11434 of ``gpu_server_ip``.

    Returns:
        The ``"embedding"`` field of the JSON response, or ``None`` when the
        request fails (the failure is logged, not raised).
    """
    endpoint = f"http://{gpu_server_ip}:11434/api/embeddings"
    try:
        # Only the first 8000 characters are sent, to bound the prompt size
        body = {"model": "nomic-embed-text", "prompt": text[:8000]}
        response = requests.post(endpoint, json=body, timeout=60)
        response.raise_for_status()
        return response.json().get("embedding")
    except Exception as e:
        logger.error(f"임베딩 생성 실패: {e}")
        return None
def store_in_chromadb(doc_id: str, title: str, text: str, embedding: list[float]):
    """Upsert one document into the ``pkm_documents`` ChromaDB collection.

    Args:
        doc_id: Record id used as the ChromaDB id.
        title: Stored in the metadata.
        text: Only the first 2000 characters are stored as the document.
        embedding: Vector stored alongside the document.
    """
    import chromadb

    store = chromadb.PersistentClient(path=str(CHROMA_DIR))
    documents = store.get_or_create_collection(
        name="pkm_documents",
        metadata={"hnsw:space": "cosine"},
    )
    snippet = text[:2000]
    meta = {"title": title, "source": "devonthink"}
    documents.upsert(
        ids=[doc_id],
        embeddings=[embedding],
        documents=[snippet],
        metadatas=[meta],
    )
    logger.info(f"ChromaDB 저장: {doc_id} ({title[:30]})")
def run(uuid: str):
    """Embed a single DEVONthink document and store it in ChromaDB.

    Returns early when ``GPU_SERVER_IP`` is not configured or when the
    document has fewer than 10 characters of text. Errors are logged rather
    than raised.
    """
    logger.info(f"임베딩 처리 시작: {uuid}")
    gpu_ip = load_credentials().get("GPU_SERVER_IP")
    if not gpu_ip:
        logger.warning("GPU_SERVER_IP 미설정 — 임베딩 건너뜀")
        return
    try:
        title, text = get_document_text(uuid)
        if not text or len(text) < 10:
            logger.warning(f"텍스트 부족 [{uuid}]: {len(text)}")
            return
        vector = get_embedding(text, gpu_ip)
        if not vector:
            logger.error(f"임베딩 실패: {uuid}")
        else:
            store_in_chromadb(uuid, title, text, vector)
            logger.info(f"임베딩 완료: {uuid}")
    except Exception as e:
        logger.error(f"임베딩 처리 에러 [{uuid}]: {e}", exc_info=True)
# Script entry point: expects the DEVONthink UUID as the first argument
if __name__ == "__main__":
    if len(sys.argv) >= 2:
        run(sys.argv[1])
    else:
        print("사용법: python3 embed_to_chroma.py <DEVONthink_UUID>")
        sys.exit(1)

View File

@@ -1,400 +0,0 @@
#!/usr/bin/env python3
"""
법령 모니터링 스크립트
- 국가법령정보센터 OpenAPI (open.law.go.kr) 폴링
- 산업안전보건법, 중대재해처벌법 등 변경 추적
- 변경 감지 시 DEVONthink 04_Industrial Safety 자동 임포트
※ API 승인 대기중 — 스크립트만 작성, 실제 호출은 승인 후
"""
import os
import sys
import json
import requests
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, llm_generate, PROJECT_ROOT, DATA_DIR
logger = setup_logger("law_monitor")
# Laws to monitor. "law_id" is sent to the law API as the MST parameter and is
# also the key under which the last-seen date is stored.
MONITORED_LAWS = [
    {"name": "산업안전보건법", "law_id": "001789", "category": "법률"},
    {"name": "산업안전보건법 시행령", "law_id": "001790", "category": "대통령령"},
    {"name": "산업안전보건법 시행규칙", "law_id": "001791", "category": "부령"},
    {"name": "중대재해 처벌 등에 관한 법률", "law_id": "019005", "category": "법률"},
    {"name": "중대재해 처벌 등에 관한 법률 시행령", "law_id": "019006", "category": "대통령령"},
    {"name": "화학물질관리법", "law_id": "012354", "category": "법률"},
    {"name": "위험물안전관리법", "law_id": "001478", "category": "법률"},
]
# JSON file that stores the last-check state
LAST_CHECK_FILE = DATA_DIR / "law_last_check.json"
# Directory for downloaded law files; created at import time
LAWS_DIR = DATA_DIR / "laws"
LAWS_DIR.mkdir(exist_ok=True)
def load_last_check() -> dict:
    """Load the last-check state from ``LAST_CHECK_FILE``.

    Returns:
        The parsed JSON content, or an empty dict when the file is missing.
    """
    if not LAST_CHECK_FILE.exists():
        return {}
    with open(LAST_CHECK_FILE, "r") as state_file:
        return json.load(state_file)
def save_last_check(data: dict):
    """Write the last-check state to ``LAST_CHECK_FILE`` as indented JSON.

    Non-ASCII characters are written as-is (``ensure_ascii=False``).
    """
    with open(LAST_CHECK_FILE, "w") as state_file:
        json.dump(data, state_file, ensure_ascii=False, indent=2)
def fetch_law_info(law_oc: str, law_id: str) -> dict | None:
    """Look up one law through the ``lawSearch.do`` endpoint.

    Args:
        law_oc: Value sent as the ``OC`` request parameter.
        law_id: Value sent as the ``MST`` request parameter.

    Returns:
        The ``LawSearch.law`` entry (its first element when it is a list), or
        ``None`` on an API error, an empty result, or a request failure.
    """
    endpoint = "https://www.law.go.kr/DRF/lawSearch.do"
    query = {
        "OC": law_oc,
        "target": "law",
        "type": "JSON",
        "MST": law_id,
    }
    try:
        response = requests.get(endpoint, params=query, timeout=30)
        response.raise_for_status()
        data = response.json()
        # The API reports failures inside the JSON body
        if "result" in data and "실패" in str(data.get("result", "")):
            logger.error(f"법령 API 에러 [{law_id}]: {data.get('result')}{data.get('msg')}")
            return None
        if "LawSearch" not in data or "law" not in data["LawSearch"]:
            logger.warning(f"법령 응답에 데이터 없음 [{law_id}]: {list(data.keys())}")
            return None
        found = data["LawSearch"]["law"]
        if not isinstance(found, list):
            return found
        return found[0] if found else None
    except Exception as e:
        logger.error(f"법령 조회 실패 [{law_id}]: {e}")
        return None
def fetch_law_text(law_oc: str, law_mst: str) -> str | None:
    """Download the XML body of a law from the ``lawService.do`` endpoint.

    Returns:
        The response text, or ``None`` when the request fails (logged).
    """
    endpoint = "https://www.law.go.kr/DRF/lawService.do"
    query = {
        "OC": law_oc,
        "target": "law",
        "type": "XML",
        "MST": law_mst,
    }
    try:
        response = requests.get(endpoint, params=query, timeout=60)
        response.raise_for_status()
    except Exception as e:
        logger.error(f"법령 본문 다운로드 실패 [{law_mst}]: {e}")
        return None
    else:
        return response.text
def save_law_file(law_name: str, content: str) -> Path:
    """Save law XML under ``LAWS_DIR`` as ``<name>_<YYYYMMDD>.xml``.

    Spaces and slashes in ``law_name`` are replaced with underscores.

    Returns:
        The path of the written file.
    """
    stamp = datetime.now().strftime("%Y%m%d")
    # One pass replacing both " " and "/" with "_"
    safe_name = law_name.translate(str.maketrans(" /", "__"))
    filepath = LAWS_DIR / f"{safe_name}_{stamp}.xml"
    with open(filepath, "w", encoding="utf-8") as out:
        out.write(content)
    logger.info(f"법령 저장: {filepath}")
    return filepath
def import_to_devonthink(filepath: Path, law_name: str, category: str):
    """Import a saved law file into the "04_Industrial safety" database.

    The AppleScript receives the path through a variable, imports the file
    into ``/10_Legislation/Law``, then sets tags and custom metadata.
    Failures are logged rather than raised.
    """
    fp = str(filepath)
    script_lines = [
        f'set fp to "{fp}"',
        'tell application id "DNtp"',
        ' repeat with db in databases',
        ' if name of db is "04_Industrial safety" then',
        ' set targetGroup to create location "/10_Legislation/Law" in db',
        ' set theRecord to import fp to targetGroup',
        f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}',
        ' add custom meta data "law_monitor" for "sourceChannel" to theRecord',
        ' add custom meta data "external" for "dataOrigin" to theRecord',
        ' add custom meta data (current date) for "lastAIProcess" to theRecord',
        ' exit repeat',
        ' end if',
        ' end repeat',
        'end tell',
    ]
    script = "\n".join(script_lines)
    try:
        run_applescript_inline(script)
    except Exception as e:
        logger.error(f"DEVONthink 임포트 실패 [{law_name}]: {e}")
    else:
        logger.info(f"DEVONthink 임포트 완료: {law_name}")
def run():
    """Check every monitored law and the foreign sources for changes.

    For each entry in ``MONITORED_LAWS`` the announce date (falling back to
    the enforcement date) is compared with the stored value. On a change the
    law text is downloaded, saved and imported into DEVONthink. The foreign
    fetchers run afterwards and the state file is saved after each phase.
    Exits with status 1 when ``LAW_OC`` is not configured.
    """
    logger.info("=== 법령 모니터링 시작 ===")
    law_oc = load_credentials().get("LAW_OC")
    if not law_oc:
        logger.error("LAW_OC 인증키가 설정되지 않았습니다. credentials.env를 확인하세요.")
        sys.exit(1)
    last_check = load_last_check()
    changes_found = 0
    for law in MONITORED_LAWS:
        law_name, law_id, category = law["name"], law["law_id"], law["category"]
        logger.info(f"확인 중: {law_name} ({law_id})")
        info = fetch_law_info(law_oc, law_id)
        if not info:
            continue
        # A change is detected from the announce date, else the enforcement date
        announce_date = info.get("공포일자", info.get("시행일자", ""))
        prev_date = last_check.get(law_id, "")
        if not announce_date or announce_date == prev_date:
            logger.debug(f"변경 없음: {law_name}")
            continue
        logger.info(f"변경 감지: {law_name} — 공포일자 {announce_date} (이전: {prev_date or '없음'})")
        law_mst = info.get("법령MST", law_id)
        text = fetch_law_text(law_oc, law_mst)
        if text:
            # The date is stored only after a successful download
            filepath = save_law_file(law_name, text)
            import_to_devonthink(filepath, law_name, category)
            changes_found += 1
            last_check[law_id] = announce_date
    save_last_check(last_check)
    # Foreign sources; each fetcher checks its own run interval
    foreign_counts = (
        fetch_us_osha(last_check),
        fetch_jp_mhlw(last_check),
        fetch_eu_osha(last_check),
    )
    changes_found += sum(foreign_counts)
    save_last_check(last_check)
    logger.info(f"=== 법령 모니터링 완료 — {changes_found}건 변경 감지 (한국+외국) ===")
# ═══════════════════════════════════════════════
# 외국 법령 모니터링
# ═══════════════════════════════════════════════
def _should_run(last_check: dict, key: str, interval_days: int) -> bool:
    """Tell whether at least ``interval_days`` have passed since the last run.

    Args:
        last_check: State dict holding ``YYYY-MM-DD`` strings.
        key: Entry that records the last run date.
        interval_days: Minimum number of whole days between runs.

    Returns:
        ``True`` when the entry is missing, empty or unparseable, or when
        enough days have elapsed.
    """
    recorded = last_check.get(key, "")
    if recorded:
        try:
            elapsed = datetime.now() - datetime.strptime(recorded, "%Y-%m-%d")
        except ValueError:
            return True
        return elapsed.days >= interval_days
    return True
def _import_foreign_to_devonthink(filepath: Path, title: str, country: str):
    """Import a foreign-regulation file into the "04_Industrial safety" database.

    The file goes to ``/10_Legislation/Foreign/<folder>`` and is tagged with
    the country code. ``title`` is used only for the log message. Failures
    are logged rather than raised.
    """
    folder_by_country = {"US": "US", "JP": "JP", "EU": "EU"}
    folder = folder_by_country.get(country, country)
    fp = str(filepath)
    # The path is passed through an AppleScript variable
    script = "\n".join([
        f'set fp to "{fp}"',
        'tell application id "DNtp"',
        ' repeat with db in databases',
        ' if name of db is "04_Industrial safety" then',
        f' set targetGroup to create location "/10_Legislation/Foreign/{folder}" in db',
        ' set theRecord to import fp to targetGroup',
        f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{country}"}}',
        ' add custom meta data "law_monitor" for "sourceChannel" to theRecord',
        ' add custom meta data "external" for "dataOrigin" to theRecord',
        ' add custom meta data (current date) for "lastAIProcess" to theRecord',
        ' exit repeat',
        ' end if',
        ' end repeat',
        'end tell',
    ])
    try:
        run_applescript_inline(script)
        short_title = title[:40].replace('\n', ' ')
        logger.info(f"DEVONthink 임포트 [{country}]: {short_title}")
    except Exception as e:
        logger.error(f"DEVONthink 임포트 실패 [{country}]: {e}")
def fetch_us_osha(last_check: dict) -> int:
    """Import recent OSHA documents from the Federal Register API (weekly).

    Skipped when the last run was under 7 days ago. Each returned document is
    written as a markdown file in ``LAWS_DIR`` and imported into DEVONthink.
    ``last_check["_us_osha_last"]`` is updated on success.

    Returns:
        Number of documents imported; 0 when skipped or on error.
    """
    if not _should_run(last_check, "_us_osha_last", 7):
        logger.debug("US OSHA: 이번 주 이미 실행됨, 건너뜀")
        return 0
    logger.info("=== US OSHA 확인 ===")
    try:
        week_ago = datetime.now() - timedelta(days=7)
        query = {
            "conditions[agencies][]": "occupational-safety-and-health-administration",
            "conditions[publication_date][gte]": week_ago.strftime("%Y-%m-%d"),
            "per_page": 10,
            "order": "newest",
        }
        resp = requests.get(
            "https://www.federalregister.gov/api/v1/documents.json",
            params=query,
            timeout=30,
        )
        resp.raise_for_status()
        documents = resp.json().get("results", [])
        count = 0
        for doc in documents:
            doc_id = doc.get("document_number", "")
            title = doc.get("title", "")
            pub_date = doc.get("publication_date", "")
            abstract = doc.get("abstract", "")
            doc_url = doc.get("html_url", "")
            # Build the markdown note for this document
            sections = [
                f"# {title}\n\n",
                f"- **Document**: {doc_id}\n",
                f"- **Date**: {pub_date}\n",
                f"- **URL**: {doc_url}\n\n",
            ]
            if abstract:
                sections.append(f"## Abstract\n\n{abstract}\n")
            # Keep only alphanumerics, space, "_" and "-" in the file name
            safe_title = "".join(
                ch if ch.isalnum() or ch in " _-" else "_" for ch in title
            )[:50]
            filepath = LAWS_DIR / f"US_OSHA_{pub_date}_{safe_title}.md"
            with open(filepath, "w", encoding="utf-8") as out:
                out.write("".join(sections))
            _import_foreign_to_devonthink(filepath, title, "US")
            count += 1
        last_check["_us_osha_last"] = datetime.now().strftime("%Y-%m-%d")
        logger.info(f"US OSHA: {count}")
        return count
    except Exception as e:
        logger.error(f"US OSHA 에러: {e}", exc_info=True)
        return 0
def fetch_jp_mhlw(last_check: dict) -> int:
    """Import safety-related items from the Japanese MHLW news feed (weekly).

    Skipped when the last run was under 7 days ago. The RDF feed is parsed,
    items whose title contains a safety keyword are kept, each title is
    translated to Korean with ``llm_generate`` (falling back to the original
    title on failure), and a markdown note is written to ``LAWS_DIR`` and
    imported into DEVONthink. At most 10 items are imported per run.
    ``last_check["_jp_mhlw_last"]`` is updated on success.

    Returns:
        Number of items imported; 0 when skipped or on error.
    """
    if not _should_run(last_check, "_jp_mhlw_last", 7):
        logger.debug("JP 厚労省: 이번 주 이미 실행됨, 건너뜀")
        return 0
    logger.info("=== JP 厚生労働省 확인 ===")
    try:
        import xml.etree.ElementTree as ET
        resp = requests.get("https://www.mhlw.go.jp/stf/news.rdf", timeout=30)
        resp.raise_for_status()
        root = ET.fromstring(resp.content)
        safety_keywords = ["労働安全", "安全衛生", "労災", "化学物質", "石綿", "安全管理", "労働", "安全", "衛生"]
        rss_ns = "http://purl.org/rss/1.0/"
        dc_ns = "http://purl.org/dc/elements/1.1/"
        count = 0
        # RDF 1.0 layout: items are {http://purl.org/rss/1.0/}item under the root
        items = root.findall(f"{{{rss_ns}}}item")
        logger.info(f"JP RSS 항목: {len(items)}")
        for item in items:
            title = item.findtext(f"{{{rss_ns}}}title", "")
            link = item.findtext(f"{{{rss_ns}}}link", "")
            # RSS 1.0 carries the date as Dublin Core dc:date. The former
            # un-namespaced "pubDate" lookup is kept as a last fallback.
            pub_date = (
                item.findtext(f"{{{dc_ns}}}date")
                or item.findtext(f"{{{rss_ns}}}pubDate")
                or item.findtext("pubDate")
                or ""
            )
            # Keep only items whose title mentions a safety keyword
            if not any(kw in title for kw in safety_keywords):
                continue
            # Translate the title to Korean; any failure keeps the original
            translated = ""
            try:
                translated = llm_generate(
                    f"다음 일본어 제목을 한국어로 번역해줘. 번역만 출력하고 다른 말은 하지 마.\n\n{title}"
                )
                # Drop any "thinking" output by keeping the last non-empty line
                lines = [ln.strip() for ln in translated.strip().split("\n") if ln.strip()]
                translated = lines[-1] if lines else title
            except Exception:
                translated = title
            content = f"# {title}\n\n"
            content += f"**한국어**: {translated}\n\n"
            content += f"- **URL**: {link}\n"
            content += f"- **Date**: {pub_date}\n"
            safe_title = "".join(c if c.isalnum() or c in " _-" else "_" for c in title)[:40]
            today = datetime.now().strftime("%Y%m%d")
            filepath = LAWS_DIR / f"JP_{today}_{safe_title}.md"
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(content)
            _import_foreign_to_devonthink(filepath, f"{translated} ({title})", "JP")
            count += 1
            if count >= 10:
                break
        last_check["_jp_mhlw_last"] = datetime.now().strftime("%Y-%m-%d")
        logger.info(f"JP 厚労省: {count}")
        return count
    except Exception as e:
        logger.error(f"JP 厚労省 에러: {e}", exc_info=True)
        return 0
def fetch_eu_osha(last_check: dict) -> int:
    """Import the first items of the EU-OSHA RSS feed (monthly).

    Skipped when the last run was under 30 days ago. Up to 5 items are
    written as markdown notes in ``LAWS_DIR`` and imported into DEVONthink.
    ``last_check["_eu_osha_last"]`` is updated on success.

    Returns:
        Number of items imported; 0 when skipped or on error.
    """
    if not _should_run(last_check, "_eu_osha_last", 30):
        logger.debug("EU-OSHA: 이번 달 이미 실행됨, 건너뜀")
        return 0
    logger.info("=== EU-OSHA 확인 ===")
    try:
        import xml.etree.ElementTree as ET
        resp = requests.get("https://osha.europa.eu/en/rss.xml", timeout=30)
        resp.raise_for_status()
        feed = ET.fromstring(resp.content)
        count = 0
        for entry in feed.iter("item"):
            title = entry.findtext("title", "")
            link = entry.findtext("link", "")
            description = entry.findtext("description", "")
            pub_date = entry.findtext("pubDate", "")
            sections = [
                f"# {title}\n\n",
                f"- **URL**: {link}\n",
                f"- **Date**: {pub_date}\n\n",
            ]
            if description:
                sections.append(f"## Summary\n\n{description}\n")
            # Disallowed characters are dropped; an empty result falls back
            # to a numbered placeholder
            cleaned = "".join(ch if ch.isalnum() or ch in " _-" else "" for ch in title)
            safe_title = cleaned[:50].strip() or f"item{count+1}"
            today = datetime.now().strftime("%Y%m%d")
            filepath = LAWS_DIR / f"EU_{today}_{count+1:02d}_{safe_title}.md"
            with open(filepath, "w", encoding="utf-8") as out:
                out.write("".join(sections))
            _import_foreign_to_devonthink(filepath, title, "EU")
            count += 1
            if count >= 5:
                break
        last_check["_eu_osha_last"] = datetime.now().strftime("%Y-%m-%d")
        logger.info(f"EU-OSHA: {count}")
        return count
    except Exception as e:
        logger.error(f"EU-OSHA 에러: {e}", exc_info=True)
        return 0
# Script entry point: run the law monitor when executed directly
if __name__ == "__main__":
    run()

View File

@@ -1,209 +0,0 @@
#!/usr/bin/env python3
"""
MailPlus → DEVONthink Archive DB 이메일 수집
- Synology MailPlus IMAP 접속
- 마지막 동기화 이후 새 메일 가져오기
- DEVONthink Archive DB 임포트
"""
import os
import sys
import imaplib
import email
from email.header import decode_header
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, DATA_DIR
logger = setup_logger("mailplus")
# File that stores the highest IMAP UID already processed
LAST_UID_FILE = DATA_DIR / "mailplus_last_uid.txt"
# Directory where fetched messages are written as .eml files before import
MAIL_TMP_DIR = DATA_DIR / "mail_tmp"
MAIL_TMP_DIR.mkdir(exist_ok=True)
# Safety-related keywords (used to decide the dataOrigin value)
SAFETY_KEYWORDS = [
    "안전", "위험", "사고", "재해", "점검", "보건", "화학물질",
    "OSHA", "safety", "hazard", "incident", "KOSHA"
]
def decode_mime_header(value: str) -> str:
    """Decode a MIME (RFC 2047) encoded header into a plain string.

    Args:
        value: Raw header value; may be empty or ``None``.

    Returns:
        The decoded text, or ``""`` for an empty value. Undecodable bytes are
        replaced, and an unknown charset label falls back to UTF-8.
    """
    if not value:
        return ""
    pieces = []
    for part, charset in decode_header(value):
        if not isinstance(part, bytes):
            pieces.append(part)
            continue
        try:
            pieces.append(part.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # Charset label unknown to Python: decode as UTF-8 instead
            pieces.append(part.decode("utf-8", errors="replace"))
    # decode_header() keeps the whitespace that belongs between parts and
    # drops it between adjacent encoded words, so join without a separator.
    return "".join(pieces)
def load_last_uid() -> int:
    """Load the last processed IMAP UID from ``LAST_UID_FILE``.

    Returns:
        The stored UID, or 0 when the file is missing. An empty or corrupt
        file also yields 0 (with a warning) instead of raising, so the
        collector falls back to its first-run behaviour.
    """
    if not LAST_UID_FILE.exists():
        return 0
    raw = LAST_UID_FILE.read_text().strip()
    try:
        return int(raw)
    except ValueError:
        logger.warning(f"마지막 UID 파일 내용이 올바르지 않음 ({raw!r}) — 0으로 처리")
        return 0
def save_last_uid(uid: int):
    """Write ``uid`` to ``LAST_UID_FILE`` as decimal text."""
    LAST_UID_FILE.write_text(f"{uid}")
def detect_data_origin(subject: str, body: str) -> str:
    """Classify a message as ``"work"`` or ``"external"``.

    Returns ``"work"`` when any entry of ``SAFETY_KEYWORDS`` occurs
    (case-insensitively) in the subject or body, otherwise ``"external"``.
    """
    haystack = f"{subject} {body}".lower()
    has_keyword = any(kw.lower() in haystack for kw in SAFETY_KEYWORDS)
    return "work" if has_keyword else "external"
def save_email_file(msg: email.message.Message, uid: int) -> Path:
    """Write a message to ``MAIL_TMP_DIR`` as an ``.eml`` file.

    The file name is ``<timestamp>_<uid>_<subject>.eml``. The subject is
    limited to 50 characters and every character other than alphanumerics,
    space, "_" and "-" is replaced with "_".

    Returns:
        The path of the written file.
    """
    subject = decode_mime_header(msg.get("Subject", ""))
    cleaned = [ch if ch.isalnum() or ch in " _-" else "_" for ch in subject]
    safe_subject = "".join(cleaned)[:50]
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filepath = MAIL_TMP_DIR / f"{stamp}_{uid}_{safe_subject}.eml"
    with open(filepath, "wb") as out:
        out.write(msg.as_bytes())
    return filepath
def get_email_body(msg: email.message.Message) -> str:
    """Extract the plain-text body of a message.

    For a multipart message, every ``text/plain`` part is decoded and
    concatenated. Otherwise the single payload is decoded whatever its
    content type.

    Returns:
        At most the first 2000 characters of the decoded body; ``""`` when
        there is no payload.
    """

    def _decode(part) -> str:
        """Decode one part's payload; an unknown charset falls back to UTF-8."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # A bogus charset label must not abort the whole mail run
            return payload.decode("utf-8", errors="replace")

    if msg.is_multipart():
        body = "".join(
            _decode(part)
            for part in msg.walk()
            if part.get_content_type() == "text/plain"
        )
    else:
        body = _decode(msg)
    return body[:2000]
def import_to_devonthink(filepath: Path, subject: str, data_origin: str):
    """Import a saved ``.eml`` file into the "Archive" DEVONthink database.

    The file is imported into ``/Email`` and tagged with custom metadata
    (``sourceChannel``, ``dataOrigin``, ``lastAIProcess``). Nothing is
    imported when no database named "Archive" is open. Failures are logged
    rather than raised.

    Args:
        filepath: File to import.
        subject: Used only for the log message.
        data_origin: Value stored as the ``dataOrigin`` metadata.
    """
    # Escape backslashes first, then quotes, so the path stays a valid
    # AppleScript string literal
    escaped_path = str(filepath).replace("\\", "\\\\").replace('"', '\\"')
    script = f'''
tell application id "DNtp"
set targetDB to missing value
repeat with db in databases
if name of db is "Archive" then
set targetDB to db
exit repeat
end if
end repeat
if targetDB is not missing value then
set targetGroup to create location "/Email" in targetDB
set theRecord to import POSIX path "{escaped_path}" to targetGroup
add custom meta data "email" for "sourceChannel" to theRecord
add custom meta data "{data_origin}" for "dataOrigin" to theRecord
add custom meta data (current date) for "lastAIProcess" to theRecord
end if
end tell
'''
    try:
        run_applescript_inline(script)
        logger.info(f"DEVONthink 임포트: {subject[:40]}")
    except Exception as e:
        logger.error(f"DEVONthink 임포트 실패: {e}")
def run():
    """Collect new MailPlus messages over IMAP and import them into DEVONthink.

    Messages with a UID above the stored one are fetched; on the first run
    (no stored UID) only the last 7 days are searched. Each message is saved
    as ``.eml``, classified and imported. The highest processed UID is
    persisted even when the run fails part-way, and the IMAP connection is
    always logged out.

    Exits with status 1 when credentials are incomplete, the search fails,
    or an IMAP or unexpected error occurs.
    """
    logger.info("=== MailPlus 이메일 수집 시작 ===")
    creds = load_credentials()
    host = creds.get("MAILPLUS_HOST")
    port = int(creds.get("MAILPLUS_PORT", "993"))
    user = creds.get("MAILPLUS_USER")
    password = creds.get("MAILPLUS_PASS")
    if not all([host, user, password]):
        logger.error("MAILPLUS 접속 정보가 불완전합니다. credentials.env를 확인하세요.")
        sys.exit(1)
    last_uid = load_last_uid()
    logger.info(f"마지막 처리 UID: {last_uid}")
    mail = None
    max_uid = last_uid
    imported = 0
    try:
        # IMAP over SSL
        mail = imaplib.IMAP4_SSL(host, port)
        mail.login(user, password)
        mail.select("INBOX")
        logger.info("IMAP 접속 성공")
        if last_uid > 0:
            # Search for messages after the last processed UID
            status, data = mail.uid("search", None, f"UID {last_uid + 1}:*")
        else:
            # First run: only the last 7 days
            from datetime import timedelta
            since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
            status, data = mail.uid("search", None, f"SINCE {since}")
        if status != "OK":
            logger.error(f"메일 검색 실패: {status}")
            sys.exit(1)
        uids = data[0].split()
        logger.info(f"새 메일: {len(uids)}")
        for uid_bytes in uids:
            uid = int(uid_bytes)
            # "n:*" can return the newest message even when its UID is below n
            if uid <= last_uid:
                continue
            status, msg_data = mail.uid("fetch", uid_bytes, "(RFC822)")
            # A usable response starts with an (envelope, bytes) tuple;
            # anything else is skipped
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                logger.warning(f"메일 가져오기 실패, 건너뜀: UID {uid}")
                continue
            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)
            subject = decode_mime_header(msg.get("Subject", "(제목 없음)"))
            body = get_email_body(msg)
            data_origin = detect_data_origin(subject, body)
            filepath = save_email_file(msg, uid)
            import_to_devonthink(filepath, subject, data_origin)
            max_uid = max(max_uid, uid)
            imported += 1
        logger.info(f"=== MailPlus 수집 완료 — {imported}건 임포트 ===")
    except imaplib.IMAP4.error as e:
        logger.error(f"IMAP 에러: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"예상치 못한 에러: {e}", exc_info=True)
        sys.exit(1)
    finally:
        try:
            # Persist progress on failure too, so imported mail is not
            # imported again on the next run
            if max_uid > last_uid:
                save_last_uid(max_uid)
        finally:
            if mail is not None:
                try:
                    mail.logout()
                except Exception as e:
                    logger.debug(f"IMAP 로그아웃 실패: {e}")
# Script entry point: run the mail collector when executed directly
if __name__ == "__main__":
    run()

View File

@@ -1,284 +0,0 @@
#!/usr/bin/env python3
"""
PKM 일일 다이제스트
- DEVONthink 오늘 추가/수정 집계
- law_monitor 법령 변경 건 파싱
- OmniFocus 완료/추가/기한초과 집계
- 상위 뉴스 Ollama 요약
- OmniFocus 액션 자동 생성
- 90일 지난 다이제스트 아카이브
"""
import os
import sys
import re
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import (
setup_logger, load_credentials, run_applescript_inline,
ollama_generate, count_log_errors, PROJECT_ROOT, LOGS_DIR, DATA_DIR
)
logger = setup_logger("digest")
# Directory where generated digest markdown files are written
DIGEST_DIR = DATA_DIR / "digests"
DIGEST_DIR.mkdir(exist_ok=True)
def get_devonthink_stats() -> dict:
    """Count documents added and modified today, per DEVONthink database.

    The AppleScript returns ``name:added:modified`` entries joined by ``|``.

    Returns:
        ``{db_name: {"added": int, "modified": int}}``; an empty dict when
        nothing changed or the query fails (logged).
    """
    script = '''
tell application id "DNtp"
set today to current date
set time of today to 0
set stats to {}
repeat with db in databases
set dbName to name of db
set addedCount to count of (search "date:today" in db)
set modifiedCount to count of (search "modified:today" in db)
if addedCount > 0 or modifiedCount > 0 then
set end of stats to dbName & ":" & addedCount & ":" & modifiedCount
end if
end repeat
set AppleScript's text item delimiters to "|"
return stats as text
end tell
'''
    try:
        raw = run_applescript_inline(script)
        stats = {}
        for entry in (raw.split("|") if raw else []):
            fields = entry.split(":")
            # Entries without exactly three fields are ignored
            if len(fields) != 3:
                continue
            db_name, added, modified = fields
            stats[db_name] = {"added": int(added), "modified": int(modified)}
        return stats
    except Exception as e:
        logger.error(f"DEVONthink 집계 실패: {e}")
        return {}
def get_omnifocus_stats() -> dict:
    """Count OmniFocus tasks completed today, added today, and overdue.

    The AppleScript returns ``completed|added|overdue``.

    Returns:
        A dict with ``"completed"``, ``"added"`` and ``"overdue"`` counts;
        all zeros when the query fails (logged).
    """
    script = '''
tell application "OmniFocus"
tell default document
set today to current date
set time of today to 0
set tomorrow to today + 1 * days
set completedCount to count of (every flattened task whose completed is true and completion date ≥ today)
set addedCount to count of (every flattened task whose creation date ≥ today)
set overdueCount to count of (every flattened task whose completed is false and due date < today and due date is not missing value)
return (completedCount as text) & "|" & (addedCount as text) & "|" & (overdueCount as text)
end tell
end tell
'''
    try:
        fields = run_applescript_inline(script).split("|")
        labels = ("completed", "added", "overdue")
        # A missing trailing field counts as 0
        return {
            label: int(fields[idx]) if len(fields) > idx else 0
            for idx, label in enumerate(labels)
        }
    except Exception as e:
        logger.error(f"OmniFocus 집계 실패: {e}")
        return {"completed": 0, "added": 0, "overdue": 0}
def parse_law_changes() -> list:
    """Collect today's law-change entries from the law_monitor log.

    A line counts when it contains today's date (``YYYY-MM-DD``) and the
    change marker; the text after the marker is returned.

    Returns:
        List of change descriptions; empty when the log file is missing.
    """
    log_file = LOGS_DIR / "law_monitor.log"
    if not log_file.exists():
        return []
    today = datetime.now().strftime("%Y-%m-%d")
    # e.g. "[2026-03-26 07:00:15] [law_monitor] [INFO] 변경 감지: 산업안전보건법 — 공포일자 ..."
    marker = re.compile(r"변경 감지: (.+?)$")
    changes = []
    with open(log_file, "r", encoding="utf-8") as log:
        for line in log:
            if today not in line or "변경 감지" not in line:
                continue
            found = marker.search(line)
            if found:
                changes.append(found.group(1).strip())
    return changes
def get_inbox_count() -> int:
    """Count the children of the root group of the "Inbox" database.

    Returns:
        The count reported by DEVONthink, or 0 when the query fails or its
        output is not an integer (the failure is logged).
    """
    script = '''
tell application id "DNtp"
repeat with db in databases
if name of db is "Inbox" then
return count of children of root group of db
end if
end repeat
return 0
end tell
'''
    try:
        return int(run_applescript_inline(script))
    except Exception as e:
        # Not a bare except: Ctrl-C and SystemExit still propagate, and the
        # failure shows up in the log
        logger.error(f"Inbox 집계 실패: {e}")
        return 0
def create_omnifocus_task(task_name: str, note: str = "", flagged: bool = False):
    """Create an OmniFocus inbox task.

    Args:
        task_name: Task title.
        note: Optional task note.
        flagged: Whether the task is flagged.

    Failures are logged rather than raised.
    """
    flag_str = "true" if flagged else "false"
    # Escape backslashes before quotes: a trailing "\" would otherwise escape
    # the closing quote and break the AppleScript literal
    escaped_name = task_name.replace("\\", "\\\\").replace('"', '\\"')
    escaped_note = note.replace("\\", "\\\\").replace('"', '\\"')
    script = f'''
tell application "OmniFocus"
tell default document
make new inbox task with properties {{name:"{escaped_name}", note:"{escaped_note}", flagged:{flag_str}}}
end tell
end tell
'''
    try:
        run_applescript_inline(script)
        logger.info(f"OmniFocus 작업 생성: {task_name}")
    except Exception as e:
        logger.error(f"OmniFocus 작업 생성 실패: {e}")
def get_system_health() -> dict:
    """Count ERROR log entries of the last 24 hours for each module.

    Returns:
        ``{module_name: error_count}`` for the known module log files.
    """
    modules = ("law_monitor", "mailplus", "digest", "embed", "auto_classify")
    return {
        mod: count_log_errors(LOGS_DIR / f"{mod}.log", since_hours=24)
        for mod in modules
    }
def _render_digest_markdown(date_str, dt_stats, law_changes, of_stats,
                            inbox_count, system_health) -> str:
    """Build the digest markdown from already-collected figures (no I/O)."""
    out = [f"# PKM Daily Digest — {date_str}\n\n"]
    # DEVONthink activity
    out.append("## DEVONthink 변화\n\n")
    if dt_stats:
        out.append("| DB | 신규 | 수정 |\n|---|---|---|\n")
        total_added = 0
        total_modified = 0
        for db_name, counts in dt_stats.items():
            out.append(f"| {db_name} | {counts['added']} | {counts['modified']} |\n")
            total_added += counts["added"]
            total_modified += counts["modified"]
        out.append(f"| **합계** | **{total_added}** | **{total_modified}** |\n\n")
    else:
        out.append("변화 없음\n\n")
    # Law changes
    out.append("## 법령 변경\n\n")
    if law_changes:
        out.extend(f"- {change}\n" for change in law_changes)
        out.append("\n")
    else:
        out.append("변경 없음\n\n")
    # OmniFocus status
    out.append("## OmniFocus 현황\n\n")
    out.append(f"- 완료: {of_stats['completed']}\n")
    out.append(f"- 신규: {of_stats['added']}\n")
    out.append(f"- 기한초과: {of_stats['overdue']}\n\n")
    # Inbox backlog
    out.append(f"## Inbox 미처리: {inbox_count}\n\n")
    # System health: the table is shown only when a module logged errors
    out.append("## 시스템 상태\n\n")
    if sum(system_health.values()) == 0:
        out.append("모든 모듈 정상\n\n")
    else:
        out.append("| 모듈 | 에러 수 |\n|---|---|\n")
        for mod, cnt in system_health.items():
            status = f"**{cnt}**" if cnt > 0 else "0"
            out.append(f"| {mod} | {status} |\n")
        out.append("\n")
    return "".join(out)


def generate_digest():
    """Generate today's digest, store it, and create follow-up tasks.

    Steps: collect figures from DEVONthink, OmniFocus, the law-monitor log
    and the module logs; render them as markdown; write the file to
    ``DIGEST_DIR``; import it into DEVONthink; create OmniFocus tasks for law
    changes, an inbox backlog of 3 or more, and overdue tasks; archive old
    digests.
    """
    logger.info("=== Daily Digest 생성 시작 ===")
    date_str = datetime.now().strftime("%Y-%m-%d")
    # Collect data
    dt_stats = get_devonthink_stats()
    of_stats = get_omnifocus_stats()
    law_changes = parse_law_changes()
    inbox_count = get_inbox_count()
    system_health = get_system_health()
    md = _render_digest_markdown(
        date_str, dt_stats, law_changes, of_stats, inbox_count, system_health
    )
    # Save the file
    digest_file = DIGEST_DIR / f"{date_str}_digest.md"
    with open(digest_file, "w", encoding="utf-8") as f:
        f.write(md)
    logger.info(f"다이제스트 저장: {digest_file}")
    # Store the digest in DEVONthink
    import_digest_to_devonthink(digest_file, date_str)
    # Create OmniFocus follow-up actions
    for change in law_changes:
        create_omnifocus_task(f"법령 변경 검토: {change[:30]}", note=change)
    if inbox_count >= 3:
        create_omnifocus_task(f"Inbox 정리 ({inbox_count}건 미처리)", note="DEVONthink Inbox에 미분류 문서가 쌓여있습니다.")
    if of_stats["overdue"] > 0:
        create_omnifocus_task(f"기한초과 작업 처리 ({of_stats['overdue']}건)", flagged=True)
    # Archive digests older than 90 days
    archive_old_digests()
    logger.info("=== Daily Digest 완료 ===")
def import_digest_to_devonthink(filepath: Path, date_str: str):
    """Import a digest file into ``/Daily_Digest`` of the "00_Note_BOX" database.

    ``date_str`` is accepted but not used by the script. Failures are logged
    rather than raised.
    """
    escaped_path = str(filepath).replace('"', '\\"')
    script = f'''
tell application id "DNtp"
repeat with db in databases
if name of db is "00_Note_BOX" then
set targetGroup to create location "/Daily_Digest" in db
import POSIX path "{escaped_path}" to targetGroup
exit repeat
end if
end repeat
end tell
'''
    try:
        run_applescript_inline(script)
    except Exception as err:
        logger.error(f"DEVONthink 다이제스트 임포트 실패: {err}")
def archive_old_digests():
    """Move digests older than 90 days into ``DIGEST_DIR / "archive"``.

    The date is read from the ``YYYY-MM-DD_digest.md`` file name; files whose
    name does not parse are left in place.
    """
    cutoff = datetime.now() - timedelta(days=90)
    for digest in DIGEST_DIR.glob("*_digest.md"):
        try:
            stamp = digest.stem.split("_digest")[0]
            if datetime.strptime(stamp, "%Y-%m-%d") >= cutoff:
                continue
            # The archive directory is created only when something is moved
            archive_dir = DIGEST_DIR / "archive"
            archive_dir.mkdir(exist_ok=True)
            digest.rename(archive_dir / digest.name)
            logger.info(f"아카이브: {digest.name}")
        except ValueError:
            pass
# Script entry point: build the daily digest when executed directly
if __name__ == "__main__":
    generate_digest()

View File

@@ -1,161 +0,0 @@
"""
PKM 시스템 공통 유틸리티
- 로거 설정 (파일 + 콘솔)
- credentials.env 로딩
- osascript 호출 래퍼
"""
import os
import sys
import logging
import subprocess
from pathlib import Path
from dotenv import load_dotenv
# Project root: the parent of the directory that contains this file
PROJECT_ROOT = Path(__file__).parent.parent
LOGS_DIR = PROJECT_ROOT / "logs"
DATA_DIR = PROJECT_ROOT / "data"
SCRIPTS_DIR = PROJECT_ROOT / "scripts"
APPLESCRIPT_DIR = PROJECT_ROOT / "applescript"
# Create the log and data directories at import time
LOGS_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)
def setup_logger(name: str) -> logging.Logger:
    """Return a logger that writes to ``LOGS_DIR/<name>.log`` and to stdout.

    The file handler records DEBUG and above, the console handler INFO and
    above. A logger that already has handlers is returned unchanged so that
    repeated calls do not duplicate output.
    """
    logger = logging.getLogger(name)
    if not logger.handlers:
        logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        file_handler = logging.FileHandler(LOGS_DIR / f"{name}.log", encoding="utf-8")
        console_handler = logging.StreamHandler(sys.stdout)
        for handler, level in (
            (file_handler, logging.DEBUG),
            (console_handler, logging.INFO),
        ):
            handler.setLevel(level)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    return logger
def load_credentials() -> dict:
    """Load credentials from ``credentials.env`` and report missing keys.

    ``~/.config/pkm/credentials.env`` is preferred; a ``credentials.env`` in
    the project root is the fallback. Values are then read from the
    environment. Missing keys other than ``GPU_SERVER_IP`` and
    ``CLAUDE_API_KEY`` produce a printed warning.

    Returns:
        Mapping of key name to value (``None`` when unset and no default).
    """
    primary = Path.home() / ".config" / "pkm" / "credentials.env"
    # Fallback: credentials.env inside the project (for development)
    cred_path = primary if primary.exists() else PROJECT_ROOT / "credentials.env"
    if not cred_path.exists():
        print(f"[경고] credentials.env를 찾을 수 없습니다: {cred_path}")
    else:
        load_dotenv(cred_path)
    defaults = {"NAS_PORT": "15001", "MAILPLUS_PORT": "993"}
    names = (
        "CLAUDE_API_KEY",
        "LAW_OC",
        "NAS_DOMAIN",
        "NAS_TAILSCALE_IP",
        "NAS_PORT",
        "MAILPLUS_HOST",
        "MAILPLUS_PORT",
        "MAILPLUS_USER",
        "MAILPLUS_PASS",
        "GPU_SERVER_IP",
    )
    keys = {name: os.getenv(name, defaults.get(name)) for name in names}
    optional = ("GPU_SERVER_IP", "CLAUDE_API_KEY")
    missing = [k for k, v in keys.items() if not v and k not in optional]
    if missing:
        print(f"[경고] 누락된 인증 키: {', '.join(missing)}")
    return keys
def run_applescript(script_path: str, *args) -> str:
    """Run an AppleScript file through ``osascript`` and return its output.

    Args:
        script_path: Script file to execute.
        *args: Extra arguments, converted to strings.

    Returns:
        Stripped standard output of the script.

    Raises:
        RuntimeError: On a non-zero exit status or a 120-second timeout.
    """
    command = ["osascript", str(script_path), *(str(arg) for arg in args)]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired:
        raise RuntimeError(f"AppleScript 타임아웃: {script_path}")
    if proc.returncode != 0:
        raise RuntimeError(f"AppleScript 에러: {proc.stderr.strip()}")
    return proc.stdout.strip()
def run_applescript_inline(script: str) -> str:
    """Run AppleScript source through a single ``osascript -e`` call.

    Returns:
        Stripped standard output of the script.

    Raises:
        RuntimeError: On a non-zero exit status or a 120-second timeout.
    """
    try:
        proc = subprocess.run(
            ["osascript", "-e", script],
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError("AppleScript 타임아웃 (인라인)")
    if proc.returncode != 0:
        raise RuntimeError(f"AppleScript 에러: {proc.stderr.strip()}")
    return proc.stdout.strip()
def llm_generate(prompt: str, model: str = "mlx-community/Qwen3.5-35B-A3B-4bit",
                 host: str = "http://localhost:8800", json_mode: bool = False) -> str:
    """Send one user prompt to the chat-completions endpoint of ``host``.

    Args:
        prompt: User message content.
        model: Model name sent in the request.
        host: Base URL of the server.
        json_mode: When true, extract a JSON object from the reply instead
            of returning the reply verbatim.

    Returns:
        The reply text. In JSON mode: the last flat JSON object containing
        one of the expected keys, else the span from the first ``{`` to the
        last ``}``, else the raw reply.

    Raises:
        requests.HTTPError: When the server answers with an error status.
    """
    import requests
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": 4096,
    }
    resp = requests.post(f"{host}/v1/chat/completions", json=payload, timeout=300)
    resp.raise_for_status()
    content = resp.json()["choices"][0]["message"]["content"]
    if not json_mode:
        return content
    # JSON mode: the reply may contain reasoning text, so scan candidates
    # from the end and keep the last one that parses and has an expected key
    import re
    import json as _json
    expected_keys = ("domain_db", "tags", "domain", "classification")
    # Flat JSON objects, optionally containing one array
    candidates = re.findall(r'\{[^{}]*(?:\[[^\]]*\])?[^{}]*\}', content)
    for candidate in candidates[::-1]:
        try:
            parsed = _json.loads(candidate)
        except _json.JSONDecodeError:
            continue
        if any(key in parsed for key in expected_keys):
            return candidate
    # Fallback: the widest brace-delimited span in the reply
    widest = re.search(r'\{[\s\S]*\}', content)
    if widest is None:
        return content
    return widest.group(0)
# Backward-compatible alias for callers that still import the old name
ollama_generate = llm_generate
def count_log_errors(log_file: Path, since_hours: int = 24) -> int:
    """Count ``[ERROR]`` lines of the last ``since_hours`` hours in a log file.

    The timestamp is read from characters 1-19 of the line
    (``[YYYY-MM-DD HH:MM:SS]``). An ``[ERROR]`` line whose timestamp cannot
    be parsed is counted as well.

    Returns:
        The number of matching lines; 0 when the file does not exist.
    """
    from datetime import datetime, timedelta
    if not log_file.exists():
        return 0
    cutoff = datetime.now() - timedelta(hours=since_hours)
    errors = 0
    with open(log_file, "r", encoding="utf-8") as log:
        for line in log:
            if "[ERROR]" not in line:
                continue
            try:
                stamp = datetime.strptime(line[1:20], "%Y-%m-%d %H:%M:%S")
            except (ValueError, IndexError):
                # Unparseable timestamp: still counted as an error
                errors += 1
                continue
            if stamp >= cutoff:
                errors += 1
    return errors