"""뉴스 다이제스트 — Karakeep → 번역·요약 → 전달 (LaunchAgent, 매일 07:00)""" import json import logging import os from datetime import datetime, timedelta, timezone import httpx from dotenv import load_dotenv load_dotenv() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("news_digest") KARAKEEP_URL = os.getenv("KARAKEEP_URL", "http://localhost:3000") KARAKEEP_API_KEY = os.getenv("KARAKEEP_API_KEY", "") ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") GPU_OLLAMA_URL = os.getenv("GPU_OLLAMA_URL", "http://192.168.1.186:11434") LOCAL_OLLAMA_URL = os.getenv("LOCAL_OLLAMA_URL", "http://127.0.0.1:11434") QDRANT_URL = os.getenv("QDRANT_URL", "http://127.0.0.1:6333") SYNOLOGY_CHAT_WEBHOOK_URL = os.getenv("SYNOLOGY_CHAT_WEBHOOK_URL", "") DEVONTHINK_BRIDGE_URL = os.getenv("DEVONTHINK_BRIDGE_URL", "http://127.0.0.1:8093") # Postgres 연결 (직접 접속) PG_HOST = os.getenv("PG_HOST", "127.0.0.1") PG_PORT = int(os.getenv("PG_PORT", "15478")) PG_USER = os.getenv("POSTGRES_USER", "bot") PG_PASS = os.getenv("POSTGRES_PASSWORD", "") PG_DB = os.getenv("POSTGRES_DB", "chatbot") KST = timezone(timedelta(hours=9)) def get_db_connection(): import psycopg2 return psycopg2.connect( host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASS, dbname=PG_DB, ) def fetch_new_bookmarks(since: datetime) -> list[dict]: """Karakeep API에서 최근 북마크 가져오기.""" headers = {"Authorization": f"Bearer {KARAKEEP_API_KEY}"} if KARAKEEP_API_KEY else {} try: resp = httpx.get( f"{KARAKEEP_URL}/api/v1/bookmarks", params={"limit": 50}, headers=headers, timeout=15, ) resp.raise_for_status() data = resp.json() bookmarks = data.get("bookmarks", data if isinstance(data, list) else []) new_items = [] for bm in bookmarks: created = bm.get("createdAt") or bm.get("created_at") or "" if created: try: dt = datetime.fromisoformat(created.replace("Z", "+00:00")) if dt < since: continue except ValueError: pass url = bm.get("url") or bm.get("content", {}).get("url", "") title = bm.get("title") or bm.get("content", {}).get("title", "") content = bm.get("content", {}).get("text", "") or bm.get("summary", "") or "" source = bm.get("source", "") if url: new_items.append({ "url": url, "title": title, "content": content[:5000], "source": source, }) return new_items except Exception as e: logger.error(f"Karakeep fetch failed: {e}") return [] def detect_language(text: str) -> str: """간단한 언어 감지.""" if any('\u3040' <= c <= '\u309f' or '\u30a0' <= c <= '\u30ff' for c in text[:200]): return "ja" if any('\u00c0' <= c <= '\u024f' for c in text[:200]) and any(w in text.lower() for w in ["le ", "la ", "les ", "de ", "des ", "un ", "une "]): return "fr" if any('\uac00' <= c <= '\ud7af' for c in text[:200]): return "ko" return "en" def translate_and_summarize(title: str, content: str, lang: str) -> dict: """Haiku로 번역 + 요약.""" if lang == "ko": # 한국어는 번역 불필요, 요약만 try: resp = httpx.post( f"{GPU_OLLAMA_URL}/api/generate", json={ "model": "qwen3.5:9b-q8_0", "prompt": f"다음 기사를 2~3문장으로 요약하세요:\n\n제목: {title}\n본문: {content[:3000]}", "stream": False, "think": False, }, timeout=15, ) summary = resp.json().get("response", title) return {"title_ko": title, "summary_ko": summary} except Exception: return {"title_ko": title, "summary_ko": title} # 외국어: Haiku로 번역+요약 lang_names = {"en": "영어", "fr": "프랑스어", "ja": "일본어"} lang_name = lang_names.get(lang, "외국어") try: resp = httpx.post( "https://api.anthropic.com/v1/messages", json={ "model": "claude-haiku-4-5-20251001", "max_tokens": 512, "messages": [{ "role": "user", "content": f"다음 {lang_name} 기사를 한국어로 번역·요약해주세요.\n\n제목: {title}\n본문: {content[:3000]}\n\nJSON으로 응답:\n{{\"title_ko\": \"한국어 제목\", \"summary_ko\": \"2~3문장 한국어 요약\"}}" }], }, headers={ "x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json", }, timeout=30, ) text = resp.json()["content"][0]["text"] clean = text.strip().removeprefix("```json").removesuffix("```").strip() return json.loads(clean) except Exception as e: logger.error(f"Translation failed: {e}") return {"title_ko": title, "summary_ko": title} def embed_to_qdrant(text: str) -> str | None: """Qdrant documents 컬렉션에 임베딩.""" try: emb_resp = httpx.post( f"{LOCAL_OLLAMA_URL}/api/embeddings", json={"model": "bge-m3", "prompt": text}, timeout=30, ) embedding = emb_resp.json().get("embedding") if not embedding: return None point_id = int(datetime.now().timestamp() * 1000) httpx.put( f"{QDRANT_URL}/collections/documents/points", json={"points": [{ "id": point_id, "vector": embedding, "payload": { "text": text, "source": "news", "created_at": datetime.now(KST).isoformat(), }, }]}, timeout=10, ) return str(point_id) except Exception as e: logger.error(f"Qdrant embed failed: {e}") return None def save_to_devonthink(title: str, content: str) -> str | None: """DEVONthink에 저장.""" try: resp = httpx.post( f"{DEVONTHINK_BRIDGE_URL}/save", json={ "title": title, "content": content, "type": "markdown", "tags": ["news", "digest"], }, timeout=10, ) data = resp.json() return data.get("uuid") if data.get("success") else None except Exception: return None def send_digest(articles: list[dict]) -> None: """Synology Chat으로 다이제스트 전송.""" if not articles or not SYNOLOGY_CHAT_WEBHOOK_URL: return lines = [] for i, a in enumerate(articles[:10], 1): lines.append(f"{i}. {a['title_ko']}\n {a['summary_ko'][:100]}") text = f"[뉴스 다이제스트] {len(articles)}건\n\n" + "\n\n".join(lines) try: httpx.post( SYNOLOGY_CHAT_WEBHOOK_URL, data={"payload": json.dumps({"text": text})}, verify=False, timeout=10, ) logger.info("Digest sent to Synology Chat") except Exception as e: logger.error(f"Chat notification failed: {e}") def main(): logger.info("News digest started") since = datetime.now(KST) - timedelta(hours=24) bookmarks = fetch_new_bookmarks(since) if not bookmarks: logger.info("No new bookmarks") return logger.info(f"Processing {len(bookmarks)} bookmarks") conn = None try: conn = get_db_connection() except Exception as e: logger.error(f"DB connection failed: {e}") processed = [] for bm in bookmarks: # 중복 체크 if conn: try: with conn.cursor() as cur: cur.execute("SELECT id FROM news_digest_log WHERE article_url = %s", (bm["url"],)) if cur.fetchone(): logger.info(f"Already processed: {bm['url']}") continue except Exception: pass lang = detect_language(bm["title"] + " " + bm["content"][:200]) result = translate_and_summarize(bm["title"], bm["content"], lang) emb_text = f"{result['title_ko']} {result['summary_ko']}" qdrant_id = embed_to_qdrant(emb_text) dt_uuid = save_to_devonthink( result["title_ko"], f"**원문**: {bm['url']}\n**출처**: {bm.get('source', '')}\n\n{result['summary_ko']}", ) # DB에 기록 if conn: try: with conn.cursor() as cur: cur.execute( "INSERT INTO news_digest_log (article_url,source,original_lang,title_ko,summary_ko,qdrant_id,devonthink_uuid) " "VALUES (%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (article_url) DO NOTHING", (bm["url"], bm.get("source", ""), lang, result["title_ko"], result["summary_ko"], qdrant_id, dt_uuid), ) conn.commit() except Exception as e: logger.error(f"DB insert failed: {e}") processed.append(result) logger.info(f"Processed: {result['title_ko']}") if conn: conn.close() # 다이제스트 전송 send_digest(processed) logger.info(f"News digest complete: {len(processed)} articles") if __name__ == "__main__": main()