- kb_writer.py: DEVONthink AppleScript 브릿지 → 마크다운 파일 기반 전환 (포트 8095) - knowledge-base/ 디렉토리 구조 (note, chat-memory, news) - Handle Note: kb_writer 파일 저장 + Qdrant 임베딩 추가 - Embed & Save Memory: DEVONthink → kb_writer 교체 - mail_bridge.py: IMAP 날짜 기반 메일 조회 (포트 8094) - mail-processing-pipeline: IMAP Trigger → Schedule + mail_bridge + dedup - docker-compose, manage_services, LaunchAgent plist 업데이트 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
181 lines
5.9 KiB
Python
181 lines
5.9 KiB
Python
"""Mail Bridge — IMAP 날짜 기반 메일 조회 서비스 (port 8094)"""
|
|
|
|
import email
|
|
import email.header
|
|
import email.utils
|
|
import imaplib
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
|
|
from dotenv import load_dotenv
|
|
from fastapi import FastAPI, Query
|
|
from fastapi.responses import JSONResponse
|
|
|
|
load_dotenv()
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
logger = logging.getLogger("mail_bridge")
|
|
|
|
IMAP_HOST = os.getenv("IMAP_HOST", "192.168.1.227")
|
|
IMAP_PORT = int(os.getenv("IMAP_PORT", "993"))
|
|
IMAP_USER = os.getenv("IMAP_USER", "")
|
|
IMAP_PASSWORD = os.getenv("IMAP_PASSWORD", "")
|
|
IMAP_SSL = os.getenv("IMAP_SSL", "true").lower() == "true"
|
|
IMAP_FOLDERS = [f.strip() for f in os.getenv("IMAP_FOLDERS", "INBOX,Gmail,Technicalkorea").split(",") if f.strip()]
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
def _connect() -> imaplib.IMAP4:
|
|
if IMAP_SSL:
|
|
conn = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
|
|
else:
|
|
conn = imaplib.IMAP4(IMAP_HOST, IMAP_PORT)
|
|
conn.login(IMAP_USER, IMAP_PASSWORD)
|
|
return conn
|
|
|
|
|
|
def _decode_header(raw: str | None) -> str:
|
|
if not raw:
|
|
return ""
|
|
parts = email.header.decode_header(raw)
|
|
decoded = []
|
|
for data, charset in parts:
|
|
if isinstance(data, bytes):
|
|
decoded.append(data.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
decoded.append(data)
|
|
return " ".join(decoded)
|
|
|
|
|
|
def _get_text(msg: email.message.Message) -> str:
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
if ct == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
# fallback: text/html
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
if ct == "text/html":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
return ""
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
return ""
|
|
|
|
|
|
def _dedup_key(mail: dict) -> str:
|
|
if mail["messageId"]:
|
|
return mail["messageId"]
|
|
return f"{mail['subject']}|{mail['date']}|{mail['from']}"
|
|
|
|
|
|
@app.get("/recent")
|
|
def recent_mails(days: int = Query(default=1, ge=1, le=30)):
|
|
try:
|
|
conn = _connect()
|
|
except Exception as e:
|
|
logger.error(f"IMAP connection failed: {e}")
|
|
return JSONResponse({"success": False, "error": f"IMAP connection failed: {e}"}, status_code=502)
|
|
|
|
try:
|
|
since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y")
|
|
seen_keys: set[str] = set()
|
|
mails: list[dict] = []
|
|
|
|
for folder in IMAP_FOLDERS:
|
|
try:
|
|
status, _ = conn.select(folder, readonly=True)
|
|
if status != "OK":
|
|
logger.warning(f"Cannot select folder '{folder}', skipping")
|
|
continue
|
|
except Exception as e:
|
|
logger.warning(f"Failed to select folder '{folder}': {e}")
|
|
continue
|
|
|
|
_, msg_nums = conn.search(None, f"SINCE {since}")
|
|
nums = msg_nums[0].split() if msg_nums[0] else []
|
|
|
|
for num in nums:
|
|
_, data = conn.fetch(num, "(RFC822)")
|
|
if not data or not data[0]:
|
|
continue
|
|
raw = data[0][1]
|
|
msg = email.message_from_bytes(raw)
|
|
|
|
message_id = msg.get("Message-ID", "").strip()
|
|
from_addr = _decode_header(msg.get("From", ""))
|
|
subject = _decode_header(msg.get("Subject", ""))
|
|
date_str = msg.get("Date", "")
|
|
text = _get_text(msg)
|
|
|
|
parsed_date = email.utils.parsedate_to_datetime(date_str).isoformat() if date_str else ""
|
|
|
|
mail_entry = {
|
|
"messageId": message_id,
|
|
"from": from_addr,
|
|
"subject": subject,
|
|
"text": text,
|
|
"date": parsed_date,
|
|
"folder": folder,
|
|
}
|
|
|
|
key = _dedup_key(mail_entry)
|
|
if key not in seen_keys:
|
|
seen_keys.add(key)
|
|
mails.append(mail_entry)
|
|
|
|
logger.info(f"Folder '{folder}': {len(nums)} messages since {since}")
|
|
|
|
mails.sort(key=lambda m: m["date"], reverse=True)
|
|
logger.info(f"Total {len(mails)} unique mails across {len(IMAP_FOLDERS)} folders (SINCE {since})")
|
|
return {"success": True, "count": len(mails), "mails": mails}
|
|
except Exception as e:
|
|
logger.error(f"IMAP fetch error: {e}")
|
|
return JSONResponse({"success": False, "error": str(e)}, status_code=500)
|
|
finally:
|
|
try:
|
|
conn.logout()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@app.get("/health")
|
|
def health():
|
|
folders_status: dict[str, bool] = {}
|
|
try:
|
|
conn = _connect()
|
|
for folder in IMAP_FOLDERS:
|
|
try:
|
|
status, _ = conn.select(folder, readonly=True)
|
|
folders_status[folder] = status == "OK"
|
|
except Exception:
|
|
folders_status[folder] = False
|
|
conn.logout()
|
|
except Exception as e:
|
|
logger.warning(f"IMAP health check failed: {e}")
|
|
for folder in IMAP_FOLDERS:
|
|
folders_status.setdefault(folder, False)
|
|
|
|
any_ok = any(folders_status.values())
|
|
all_ok = all(folders_status.values())
|
|
|
|
if all_ok:
|
|
status = "ok"
|
|
elif any_ok:
|
|
status = "degraded"
|
|
else:
|
|
status = "error"
|
|
|
|
return {"status": status, "imap_reachable": any_ok, "folders": folders_status}
|