Files
syn-chat-bot/morning_briefing.py
Hyungi Ahn 782caf5130 feat: DEVONthink 제거 + 모닝 브리핑 추가
- DEVONthink 의존성 제거 → kb_writer 전환 (news_digest, inbox_processor, mail pipeline)
- devonthink_bridge.py, plist 삭제
- morning_briefing.py 신규 (매일 07:30, 일정·메일·보고·뉴스 → Synology Chat)
- intent_service.py 분류기 프롬프트 개선 + 키워드 fallback
- migrate-v5.sql (news_digest_log kb_path 컬럼)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 14:12:38 +09:00

281 lines
9.0 KiB
Python

"""모닝 브리핑 — 일정·메일·보고·뉴스 요약 → Synology Chat (LaunchAgent, 매일 07:30)"""
import json
import logging
import os
from datetime import datetime, timedelta, timezone
import httpx
from dotenv import load_dotenv
# Load environment variables (python-dotenv) before the os.getenv() lookups
# that follow read them.
load_dotenv()
# Configure root logging at INFO with a "<time> <level> <message>" line format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Logger shared by every function in this script.
logger = logging.getLogger("morning_briefing")
# Service endpoints and credentials; every value can be overridden through the
# environment, the second argument is the fallback used when it is unset.
CALDAV_BRIDGE_URL = os.getenv("CALDAV_BRIDGE_URL", "http://127.0.0.1:8092")
KARAKEEP_URL = os.getenv("KARAKEEP_URL", "http://localhost:3000")
KARAKEEP_API_KEY = os.getenv("KARAKEEP_API_KEY", "")
GPU_OLLAMA_URL = os.getenv("GPU_OLLAMA_URL", "http://192.168.1.186:11434")
SYNOLOGY_CHAT_WEBHOOK_URL = os.getenv("SYNOLOGY_CHAT_WEBHOOK_URL", "")

# PostgreSQL connection settings consumed by get_db_connection().
PG_HOST = os.getenv("PG_HOST", "127.0.0.1")
PG_PORT = int(os.getenv("PG_PORT", "15478"))
PG_USER = os.getenv("POSTGRES_USER", "bot")
PG_PASS = os.getenv("POSTGRES_PASSWORD", "")
PG_DB = os.getenv("POSTGRES_DB", "chatbot")

# Fixed UTC+9 offset (Korea Standard Time) used for "today" and the header date.
KST = timezone(timedelta(hours=9))

# Korean weekday abbreviations indexed by datetime.weekday() (0 = Monday).
# The entries must be non-empty, otherwise the briefing header renders "()".
WEEKDAYS = ["월", "화", "수", "목", "금", "토", "일"]
def get_db_connection():
    """Open a new PostgreSQL connection from the module-level PG_* settings.

    psycopg2 is imported inside the function, so the driver is only loaded
    when a database query is actually attempted.

    Returns:
        The connection object produced by ``psycopg2.connect``. Closing it is
        left to the caller.
    """
    import psycopg2

    connect_params = {
        "host": PG_HOST,
        "port": PG_PORT,
        "user": PG_USER,
        "password": PG_PASS,
        "dbname": PG_DB,
    }
    return psycopg2.connect(**connect_params)
def fetch_today_events() -> list[dict]:
    """Fetch today's calendar events from the CalDAV bridge.

    Posts the first and last second of the current KST day to
    ``{CALDAV_BRIDGE_URL}/calendar/query`` and returns the events ordered by
    their ``start`` value.

    Returns:
        The event dicts from the bridge response, or an empty list when the
        request fails, the bridge answers with an HTTP error status, or the
        response body does not report ``success``.
    """
    now = datetime.now(KST)
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = now.replace(hour=23, minute=59, second=59, microsecond=0)
    try:
        resp = httpx.post(
            f"{CALDAV_BRIDGE_URL}/calendar/query",
            json={"start": day_start.isoformat(), "end": day_end.isoformat()},
            timeout=10,
        )
        # Report HTTP failures through the log instead of silently parsing
        # an error body (same handling as fetch_news).
        resp.raise_for_status()
        data = resp.json()
        if not data.get("success"):
            return []
        events = data.get("events") or []
        # A missing or null "start" sorts first; comparing None with str
        # would raise TypeError and discard every event.
        events.sort(key=lambda event: event.get("start") or "")
        return events
    except Exception as e:
        # Best-effort source: the briefing continues without this section.
        logger.error(f"CalDAV fetch failed: {e}")
        return []
def fetch_important_mails() -> list[dict]:
    """Query the newest work-labelled mails of the last 24 hours.

    Reads at most ten rows from ``mail_logs`` whose label is '업무' and whose
    ``mail_date`` lies within the past 24 hours, newest first.

    Returns:
        A list of ``{"from", "subject", "summary"}`` dicts, or an empty list
        when the connection or the query fails.
    """
    try:
        conn = get_db_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT from_address, subject, summary FROM mail_logs "
                    "WHERE label = '업무' AND mail_date > NOW() - INTERVAL '24 hours' "
                    "ORDER BY mail_date DESC LIMIT 10"
                )
                rows = cur.fetchall()
        finally:
            # Release the connection even when the query raises; previously
            # a failing execute/fetch skipped close() and leaked it.
            conn.close()
        return [{"from": r[0], "subject": r[1], "summary": r[2]} for r in rows]
    except Exception as e:
        # Best-effort source: the briefing continues without this section.
        logger.error(f"Mail fetch failed: {e}")
        return []
def fetch_open_reports() -> list[dict]:
    """Query the newest unresolved field reports.

    Reads at most ten rows from ``field_reports`` whose status is 'open',
    newest first.

    Returns:
        A list of ``{"category", "description", "created_at"}`` dicts, or an
        empty list when the connection or the query fails.
    """
    try:
        conn = get_db_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT category, description, created_at FROM field_reports "
                    "WHERE status = 'open' "
                    "ORDER BY created_at DESC LIMIT 10"
                )
                rows = cur.fetchall()
        finally:
            # Release the connection even when the query raises; previously
            # a failing execute/fetch skipped close() and leaked it.
            conn.close()
        return [{"category": r[0], "description": r[1], "created_at": r[2]} for r in rows]
    except Exception as e:
        # Best-effort source: the briefing continues without this section.
        logger.error(f"Reports fetch failed: {e}")
        return []
def fetch_news(since: datetime) -> list[dict]:
    """Fetch titles of Karakeep bookmarks created at or after *since*.

    Requests up to 50 bookmarks from ``{KARAKEEP_URL}/api/v1/bookmarks``,
    sending a bearer token when KARAKEEP_API_KEY is set.

    Args:
        since: Cutoff timestamp; bookmarks with an older creation time are
            skipped. It should be timezone-aware.

    Returns:
        A list of ``{"title": ...}`` dicts for bookmarks that have a title.
        Bookmarks whose creation time is missing or cannot be parsed or
        compared are kept. An empty list is returned when the request fails.
    """
    headers = {"Authorization": f"Bearer {KARAKEEP_API_KEY}"} if KARAKEEP_API_KEY else {}
    try:
        resp = httpx.get(
            f"{KARAKEEP_URL}/api/v1/bookmarks",
            params={"limit": 50},
            headers=headers,
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()
        # Accept either a bare list or an object wrapping the list under
        # "bookmarks". The type must be checked first: calling .get() on a
        # list raises AttributeError before any fallback could apply.
        if isinstance(data, list):
            bookmarks = data
        else:
            bookmarks = data.get("bookmarks") or []
        articles = []
        for bm in bookmarks:
            created = bm.get("createdAt") or bm.get("created_at") or ""
            if created and isinstance(created, str):
                try:
                    # fromisoformat() on older Pythons rejects a trailing "Z".
                    dt = datetime.fromisoformat(created.replace("Z", "+00:00"))
                    if dt.tzinfo is None:
                        # NOTE(review): naive timestamps are assumed to be UTC
                        # so they can be compared with the aware cutoff; confirm.
                        dt = dt.replace(tzinfo=timezone.utc)
                    if dt < since:
                        continue
                except (ValueError, TypeError):
                    # Unparseable or incomparable timestamp: keep the bookmark
                    # rather than abort the whole fetch.
                    pass
            # "content" may be absent or null; fall back to an empty mapping.
            title = bm.get("title") or (bm.get("content") or {}).get("title", "")
            if title:
                articles.append({"title": title})
        return articles
    except Exception as e:
        # Best-effort source: the briefing continues without this section.
        logger.error(f"Karakeep fetch failed: {e}")
        return []
def summarize_news(articles: list[dict]) -> list[str]:
    """Summarise up to five news titles into one line each via the LLM.

    Sends the first five titles to ``{GPU_OLLAMA_URL}/api/generate`` and
    splits the reply into non-empty lines.

    Args:
        articles: Dicts carrying a ``"title"`` key; only the first five are
            used.

    Returns:
        At most five summary lines from the model. Falls back to the original
        titles when the request fails, the server answers with an HTTP error
        status, or the reply contains no text. Empty input yields an empty
        list without contacting the model.
    """
    if not articles:
        return []
    titles = [a["title"] for a in articles[:5]]
    prompt = "다음 뉴스 제목들을 각각 한 줄로 짧게 한국어 요약하세요. 번호 없이 줄바꿈으로 구분.\n\n" + "\n".join(titles)
    try:
        resp = httpx.post(
            f"{GPU_OLLAMA_URL}/api/generate",
            json={
                "model": "id-9b:latest",
                "system": "/no_think",
                "prompt": prompt,
                "stream": False,
                "think": False,
            },
            timeout=15,
        )
        # Log HTTP failures instead of silently falling back to the titles.
        resp.raise_for_status()
        # A missing or null "response" field counts as an empty reply.
        answer = resp.json().get("response") or ""
        lines = [line.strip() for line in answer.strip().split("\n") if line.strip()]
        if lines:
            return lines[:5]
    except Exception as e:
        logger.error(f"News summarize failed: {e}")
    # Fallback: show the raw titles.
    return titles[:5]
def format_briefing(events: list | None, mails: list | None,
reports: list | None, news: list[str] | None) -> str | None:
"""브리핑 텍스트 조립. 전체 데이터 없으면 None."""
now = datetime.now(KST)
weekday = WEEKDAYS[now.weekday()]
header = f"[모닝 브리핑] {now.strftime('%Y-%m-%d')} ({weekday})"
sections = []
# 일정
if events:
limit = 10
lines = []
for e in events[:limit]:
start = e.get("start", "")
time_str = start[11:16] if len(start) >= 16 else ""
title = e.get("summary") or e.get("title", "")
location = e.get("location", "")
entry = f"- {time_str} {title}" if time_str else f"- {title}"
if location:
entry += f" ({location})"
lines.append(entry)
count_str = f"{len(events)}"
if len(events) > limit:
count_str = f"{limit}건, 외 {len(events) - limit}"
sections.append(f"[일정] 오늘 ({count_str})\n" + "\n".join(lines))
# 메일
if mails:
limit = 5
lines = []
for m in mails[:limit]:
sender = m.get("from", "").split("<")[0].strip().strip('"')
lines.append(f"- {sender}: {m.get('subject', '')}")
count_str = f"{len(mails)}"
if len(mails) > limit:
count_str = f"{limit}건, 외 {len(mails) - limit}"
sections.append(f"[메일] 주요 ({count_str})\n" + "\n".join(lines))
# 보고
if reports:
limit = 5
lines = []
for r in reports[:limit]:
cat = r.get("category", "")
desc = (r.get("description") or "")[:50]
created = r.get("created_at")
date_str = ""
if created:
if isinstance(created, datetime):
date_str = created.strftime("%m/%d")
else:
date_str = str(created)[:10]
entry = f"- [{cat}] {desc}"
if date_str:
entry += f" -- {date_str} 접수"
lines.append(entry)
count_str = f"{len(reports)}"
if len(reports) > limit:
count_str = f"{limit}건, 외 {len(reports) - limit}"
sections.append(f"[보고] 미해결 ({count_str})\n" + "\n".join(lines))
# 뉴스
if news:
lines = [f"- {n}" for n in news[:5]]
count_str = f"{len(news)}"
sections.append(f"[뉴스] ({count_str})\n" + "\n".join(lines))
if not sections:
return None
return header + "\n\n" + "\n\n".join(sections)
def send_briefing(text: str) -> None:
    """Post the briefing text to the Synology Chat webhook.

    When SYNOLOGY_CHAT_WEBHOOK_URL is unset, only a warning is logged.
    Transport errors and HTTP error statuses are logged and never raised.

    Args:
        text: The message body, sent as ``{"text": text}`` JSON inside the
            form field ``payload``.
    """
    if not SYNOLOGY_CHAT_WEBHOOK_URL:
        logger.warning("SYNOLOGY_CHAT_WEBHOOK_URL not set")
        return
    try:
        resp = httpx.post(
            SYNOLOGY_CHAT_WEBHOOK_URL,
            data={"payload": json.dumps({"text": text})},
            # NOTE(review): TLS certificate verification is disabled here,
            # presumably because the NAS uses a self-signed certificate.
            # Confirm, and prefer pinning the CA if possible.
            verify=False,
            timeout=10,
        )
        # Only report success when the webhook accepted the request;
        # previously a 4xx/5xx reply was still logged as sent.
        resp.raise_for_status()
        logger.info("Briefing sent to Synology Chat")
    except Exception as e:
        logger.error(f"Chat send failed: {e}")
def main():
    """Collect every briefing source, assemble the message and send it.

    Nothing is sent when no source produced any data.
    """
    logger.info("Morning briefing started")
    cutoff = datetime.now(KST) - timedelta(hours=24)

    # Each source is fetched independently; a failing fetcher returns an
    # empty list, which is normalised to None before formatting.
    events = fetch_today_events()
    mails = fetch_important_mails()
    reports = fetch_open_reports()
    articles = fetch_news(cutoff)

    # Only ask the LLM for summaries when there is something to summarise.
    if articles:
        news_lines = summarize_news(articles)
    else:
        news_lines = None

    text = format_briefing(events or None, mails or None, reports or None, news_lines)
    if text:
        send_briefing(text)
        logger.info("Morning briefing complete")
    else:
        logger.info("No data for briefing — skipping")


if __name__ == "__main__":
    main()