"테크니컬코리아 메일만" → Technicalkorea 폴더만 검색 "구글 메일" → Gmail만, "개인 메일" → INBOX만 폴더 미지정 시 전체 검색 유지 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
194 lines
6.6 KiB
Python
194 lines
6.6 KiB
Python
"""Email tool — read mail over IMAP (read-only, multi-folder)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import email
|
|
import email.header
|
|
import email.utils
|
|
import imaplib
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from config import settings
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Value placed in the "tool" field of every result dict.
TOOL_NAME = "email"

# Cap on messages taken per folder and on the merged search result.
MAX_RESULTS = 10

# Base unit for body truncation; read() returns PREVIEW_LEN * 5 characters.
PREVIEW_LEN = 200

# IMAP folder name -> label shown in search results.
FOLDER_LABELS = dict(
    INBOX="개인",
    Gmail="Gmail",
    Technicalkorea="회사",
)

# Folders searched when the caller does not name one (same order as the labels).
FOLDERS = list(FOLDER_LABELS)
|
|
|
|
|
|
def _make_result(ok: bool, operation: str, data=None, summary: str = "", error: str | None = None) -> dict:
    """Assemble the uniform result envelope returned by every operation.

    Args:
        ok: Whether the operation succeeded.
        operation: Name of the operation that produced the result.
        data: Payload; any falsy value (``None``, empty container) becomes ``[]``.
        summary: Human-readable summary text.
        error: Error message, or ``None`` when there is none.

    Returns:
        A dict with the keys ``ok``, ``tool``, ``operation``, ``data``,
        ``summary`` and ``error``.
    """
    payload = data if data else []
    return dict(
        ok=ok,
        tool=TOOL_NAME,
        operation=operation,
        data=payload,
        summary=summary,
        error=error,
    )
|
|
|
|
|
|
def _decode_header(raw: str) -> str:
|
|
parts = email.header.decode_header(raw)
|
|
decoded = []
|
|
for part, charset in parts:
|
|
if isinstance(part, bytes):
|
|
decoded.append(part.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
decoded.append(part)
|
|
return "".join(decoded)
|
|
|
|
|
|
def _parse_date(date_str: str) -> datetime:
|
|
"""메일 Date 헤더 → datetime. 실패 시 epoch."""
|
|
try:
|
|
return email.utils.parsedate_to_datetime(date_str)
|
|
except Exception:
|
|
return datetime(1970, 1, 1, tzinfo=timezone.utc)
|
|
|
|
|
|
def _get_body(msg) -> str:
|
|
"""메일 본문 추출."""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/html":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
return ""
|
|
|
|
|
|
def _connect():
    """Open an authenticated IMAP-over-SSL connection.

    Returns:
        A logged-in ``imaplib.IMAP4_SSL`` instance, or ``None`` when the host
        or user setting is empty.

    Raises:
        Whatever ``imaplib`` raises on connection or login failure. If login
        fails, the socket is shut down before the exception propagates.
    """
    if not settings.mailplus_host or not settings.mailplus_user:
        return None

    conn = imaplib.IMAP4_SSL(settings.mailplus_host, settings.mailplus_port)
    try:
        conn.login(settings.mailplus_user, settings.mailplus_pass)
    except Exception:
        # Do not leak the open socket when authentication fails.
        try:
            conn.shutdown()
        except OSError:
            pass
        raise
    return conn
|
|
|
|
|
|
# English month abbreviations required by the IMAP date format; strftime("%b")
# would follow the process locale.
_IMAP_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def _summarize_header(raw: bytes, folder: str, seq: bytes) -> dict:
    """Build one search-result row from a raw header block."""
    msg = email.message_from_bytes(raw)
    dt = _parse_date(msg.get("Date", ""))
    return {
        "uid": f"{folder}:{seq.decode()}",
        "folder": FOLDER_LABELS.get(folder, folder),
        "subject": _decode_header(msg.get("Subject", "(제목 없음)")),
        "from": _decode_header(msg.get("From", "")),
        "date": dt.strftime("%m/%d %H:%M"),
        "_sort_key": dt.timestamp(),
    }


def _search_folder(conn, folder: str, since: str, query: str) -> list:
    """Return result rows for one folder; raise if it cannot be searched."""
    typ, _ = conn.select(folder, readonly=True)
    if typ != "OK":
        raise imaplib.IMAP4.error(f"cannot select folder {folder!r}")

    if query:
        # Send the subject text as a UTF-8 literal: imaplib encodes command
        # arguments as ASCII, so a non-ASCII query interpolated into the
        # command raises, and quotes/backslashes would break the quoting.
        conn.literal = query.encode("utf-8")
        typ, data = conn.search("UTF-8", "SINCE", since, "SUBJECT")
    else:
        typ, data = conn.search(None, f"(SINCE {since})")
    if typ != "OK":
        raise imaplib.IMAP4.error(f"SEARCH failed in folder {folder!r}")

    # NOTE(review): IMAP4.search()/fetch() work with message sequence numbers,
    # not UIDs, so the ids handed out here can shift if messages are expunged
    # before they are used again.
    found = (data[0] if data else None) or b""
    seq_nums = found.split()[-MAX_RESULTS:]  # newest MAX_RESULTS per folder

    rows = []
    for seq in seq_nums:
        try:
            _, msg_data = conn.fetch(seq, "(RFC822.HEADER)")
            first = msg_data[0] if msg_data else None
            if not isinstance(first, tuple) or len(first) < 2:
                continue
            rows.append(_summarize_header(first[1], folder, seq))
        except Exception:
            # Best effort: one unreadable message must not hide the others.
            logger.debug("Skipping message %r in %s", seq, folder, exc_info=True)
            continue
    return rows


def _search_sync(query: str, days: int, folder: str) -> dict:
    """Blocking implementation of ``search``; intended for a worker thread."""
    conn = _connect()
    if not conn:
        return _make_result(False, "search", error="메일 설정이 없습니다.")

    try:
        cutoff = datetime.now() - timedelta(days=days)
        since = f"{cutoff.day:02d}-{_IMAP_MONTHS[cutoff.month - 1]}-{cutoff.year:04d}"

        search_folders = [folder] if folder else FOLDERS
        all_results = []
        failed_folders = []

        for name in search_folders:
            try:
                all_results.extend(_search_folder(conn, name, since, query))
            except Exception as e:
                logger.warning("Email folder %s failed: %s", name, e)
                failed_folders.append(name)
            # No explicit CLOSE: the mailbox is opened read-only, and the next
            # SELECT or the final LOGOUT deselects it.
    finally:
        try:
            conn.logout()
        except (imaplib.IMAP4.error, OSError):
            logger.debug("IMAP logout failed", exc_info=True)

    # Newest first, then cap the merged list.
    all_results.sort(key=lambda row: row.get("_sort_key", 0), reverse=True)
    all_results = all_results[:MAX_RESULTS]
    for row in all_results:
        row.pop("_sort_key", None)

    summary = f"최근 {days}일간 {len(all_results)}개의 메일이 있습니다."
    if failed_folders:
        summary += f" (일부 폴더 조회 실패: {', '.join(failed_folders)})"

    return _make_result(True, "search", data=all_results, summary=summary)


async def search(query: str = "", days: int = 7, folder: str = "") -> dict:
    """Search recent mail by subject.

    Args:
        query: Text to match against the subject; empty matches every message.
        days: How many days back to search.
        folder: IMAP folder to search; empty searches every folder in FOLDERS.

    Returns:
        A result dict built by ``_make_result``. On success ``data`` holds up
        to MAX_RESULTS rows (``uid``, ``folder``, ``subject``, ``from``,
        ``date``), newest first. Folders that could not be searched are named
        in ``summary``. Any other failure is returned as ``ok=False`` with the
        error text; this function does not raise.
    """
    import asyncio

    try:
        # imaplib is blocking; run it on a worker thread so the event loop
        # stays responsive during the network round trips.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _search_sync, query, days, folder)
    except Exception as e:
        logger.exception("Email search failed")
        return _make_result(False, "search", error=str(e))
|
|
|
|
|
|
def _read_sync(uid: str) -> dict:
    """Blocking implementation of ``read``; intended for a worker thread."""
    # Split on the LAST colon so the id part is always the trailing number.
    folder, sep, seq = uid.rpartition(":")
    if not sep or not folder:
        folder = "INBOX"

    # The id is placed into a FETCH command; allow a single message number
    # only, so input such as "1:*" cannot turn into a range fetch.
    if not (seq.isascii() and seq.isdecimal()):
        return _make_result(False, "read", error="잘못된 메일 ID 형식입니다.")

    conn = _connect()
    if not conn:
        return _make_result(False, "read", error="메일 설정이 없습니다.")

    try:
        typ, _ = conn.select(folder, readonly=True)
        if typ != "OK":
            return _make_result(False, "read", error=f"폴더를 열 수 없습니다: {folder}")

        # NOTE(review): IMAP4.fetch() addresses messages by sequence number,
        # not UID, so an id obtained earlier can point at a different message
        # if the folder was expunged in between.
        _, msg_data = conn.fetch(seq.encode(), "(RFC822)")
        first = msg_data[0] if msg_data else None
        if not isinstance(first, tuple) or len(first) < 2:
            return _make_result(False, "read", error="메일을 찾을 수 없습니다.")

        msg = email.message_from_bytes(first[1])
        subject = _decode_header(msg.get("Subject", ""))
        data = {
            "uid": uid,
            "subject": subject,
            "from": _decode_header(msg.get("From", "")),
            "date": msg.get("Date", ""),
            "body": _get_body(msg)[:PREVIEW_LEN * 5],
        }
        return _make_result(True, "read", data=data, summary=f"메일: {subject}")
    finally:
        # Always release the connection, including on the error paths.
        try:
            conn.logout()
        except (imaplib.IMAP4.error, OSError):
            logger.debug("IMAP logout failed", exc_info=True)


async def read(uid: str) -> dict:
    """Fetch one message and return its headers and truncated body.

    Args:
        uid: Message id in the form ``folder:number``. A bare number is looked
            up in ``INBOX``.

    Returns:
        A result dict built by ``_make_result``. On success ``data`` is a dict
        with ``uid``, ``subject``, ``from``, ``date`` (raw header value) and
        ``body`` (at most PREVIEW_LEN * 5 characters). Failures are returned
        as ``ok=False`` with the error text; this function does not raise.
    """
    import asyncio

    try:
        # imaplib is blocking; run it on a worker thread so the event loop
        # stays responsive during the network round trips.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _read_sync, uid)
    except Exception as e:
        logger.exception("Email read failed")
        return _make_result(False, "read", error=str(e))
|