"""Email 도구 — IMAP을 통한 메일 조회 (read-only).""" from __future__ import annotations import email import email.header import imaplib import logging from datetime import datetime, timedelta from config import settings logger = logging.getLogger(__name__) TOOL_NAME = "email" MAX_RESULTS = 10 PREVIEW_LEN = 200 def _make_result(ok: bool, operation: str, data=None, summary: str = "", error: str | None = None) -> dict: return {"ok": ok, "tool": TOOL_NAME, "operation": operation, "data": data or [], "summary": summary, "error": error} def _decode_header(raw: str) -> str: parts = email.header.decode_header(raw) decoded = [] for part, charset in parts: if isinstance(part, bytes): decoded.append(part.decode(charset or "utf-8", errors="replace")) else: decoded.append(part) return "".join(decoded) def _get_body(msg) -> str: """메일 본문 추출.""" if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() if ct == "text/plain": payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") # fallback to html for part in msg.walk(): if part.get_content_type() == "text/html": payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") else: payload = msg.get_payload(decode=True) if payload: charset = msg.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") return "" def _connect(): if not settings.mailplus_host or not settings.mailplus_user: return None conn = imaplib.IMAP4_SSL(settings.mailplus_host, settings.mailplus_port) conn.login(settings.mailplus_user, settings.mailplus_pass) return conn async def search(query: str = "", days: int = 7) -> dict: """최근 메일 검색.""" try: conn = _connect() if not conn: return _make_result(False, "search", error="메일 설정이 없습니다.") conn.select("INBOX", readonly=True) since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y") if query: criteria = f'(SINCE {since} SUBJECT "{query}")' else: criteria = f"(SINCE {since})" _, data = conn.search(None, criteria) uids = data[0].split() # 최신 MAX_RESULTS개만 uids = uids[-MAX_RESULTS:] uids.reverse() results = [] for uid in uids: _, msg_data = conn.fetch(uid, "(RFC822.HEADER)") if not msg_data or not msg_data[0]: continue raw = msg_data[0][1] msg = email.message_from_bytes(raw) subject = _decode_header(msg.get("Subject", "(제목 없음)")) from_addr = _decode_header(msg.get("From", "")) date_str = msg.get("Date", "") results.append({ "uid": uid.decode(), "subject": subject, "from": from_addr, "date": date_str[:20], }) conn.close() conn.logout() summary = f"최근 {days}일간 {len(results)}개의 메일이 있습니다." return _make_result(True, "search", data=results, summary=summary) except Exception as e: logger.exception("Email search failed") return _make_result(False, "search", error=str(e)) async def read(uid: str) -> dict: """특정 메일 본문 조회.""" try: conn = _connect() if not conn: return _make_result(False, "read", error="메일 설정이 없습니다.") conn.select("INBOX", readonly=True) _, msg_data = conn.fetch(uid.encode(), "(RFC822)") if not msg_data or not msg_data[0]: conn.close() conn.logout() return _make_result(False, "read", error=f"UID {uid} 메일을 찾을 수 없습니다.") raw = msg_data[0][1] msg = email.message_from_bytes(raw) subject = _decode_header(msg.get("Subject", "")) from_addr = _decode_header(msg.get("From", "")) date_str = msg.get("Date", "") body = _get_body(msg)[:PREVIEW_LEN * 5] # read는 더 긴 본문 conn.close() conn.logout() data = {"uid": uid, "subject": subject, "from": from_addr, "date": date_str, "body": body} return _make_result(True, "read", data=data, summary=f"메일: {subject}") except Exception as e: logger.exception("Email read failed") return _make_result(False, "read", error=str(e))