- tools/calendar_tool.py: CalDAV search/today/create_draft/create_confirmed - tools/email_tool.py: IMAP search/read (전송 비활성화) - tools/document_tool.py: Document Server search/read (read-only) - tools/registry.py: 도구 디스패처 + WRITE_OPS 안전장치 + 에러 표준화 - 분류기: "tools" 액션 추가, 도구 목록/파라미터 스키마/규칙 명시 - Worker: tools 분기 + tool timeout 10초 + payload 2000자 제한 - conversation: pending_draft (TTL 5분) + create 확인 플로우 - 현재 시간을 분류기에 전달 (날짜 질문 대응) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
152 lines
4.8 KiB
Python
152 lines
4.8 KiB
Python
"""Email 도구 — IMAP을 통한 메일 조회 (read-only)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import email
|
|
import email.header
|
|
import imaplib
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Tool identifier; _make_result() places it in the "tool" field of every result.
TOOL_NAME = "email"
|
|
# Upper bound on how many of the newest matches search() returns.
MAX_RESULTS = 10
|
|
# Base preview length in characters; read() truncates the body to PREVIEW_LEN * 5.
PREVIEW_LEN = 200
|
|
|
|
|
|
def _make_result(ok: bool, operation: str, data=None, summary: str = "", error: str | None = None) -> dict:
    """Build the result envelope shared by every operation of this tool.

    Args:
        ok: Whether the operation succeeded.
        operation: Name of the operation that produced the result.
        data: Operation payload; any falsy value is normalised to an empty list.
        summary: Short human-readable summary of the outcome.
        error: Error text, or ``None`` when there is none.

    Returns:
        A dict with the keys ``ok``, ``tool``, ``operation``, ``data``,
        ``summary`` and ``error``.
    """
    payload = data if data else []
    return dict(
        ok=ok,
        tool=TOOL_NAME,
        operation=operation,
        data=payload,
        summary=summary,
        error=error,
    )
|
|
|
|
|
|
def _decode_header(raw: str) -> str:
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        raw: Header value as returned by ``Message.get``. ``None`` is
            tolerated and treated as an empty header.

    Returns:
        The decoded text. Undecodable bytes are replaced rather than raising,
        and encoded words that declare an unknown charset are decoded as
        UTF-8 instead of failing the whole header.
    """
    if raw is None:
        return ""

    chunks = []
    for part, charset in email.header.decode_header(raw):
        if not isinstance(part, bytes):
            # Unencoded text is already a str.
            chunks.append(part)
            continue
        try:
            chunks.append(part.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # The sender declared a charset Python has no codec for
            # (e.g. "unknown-8bit"); errors="replace" does not cover that.
            chunks.append(part.decode("utf-8", errors="replace"))
    return "".join(chunks)
|
|
|
|
|
|
def _decode_payload(payload: bytes, charset) -> str:
    """Decode *payload*, falling back to UTF-8 when the declared charset is unknown."""
    try:
        return payload.decode(charset or "utf-8", errors="replace")
    except LookupError:
        # No codec exists for the declared charset; errors="replace" does not cover that.
        return payload.decode("utf-8", errors="replace")


def _get_body(msg) -> str:
    """Extract the text body of a message.

    For multipart messages the first non-empty ``text/plain`` part wins; if
    there is none, the first non-empty ``text/html`` part is used. Parts
    marked ``Content-Disposition: attachment`` are skipped so that an attached
    text file is not mistaken for the body.

    Args:
        msg: A parsed ``email.message.Message``.

    Returns:
        The decoded body text, or ``""`` when no text content is present.
    """
    if not msg.is_multipart():
        payload = msg.get_payload(decode=True)
        if not payload:
            return ""
        return _decode_payload(payload, msg.get_content_charset())

    html_fallback = None  # (payload, charset) of the first usable HTML part
    for part in msg.walk():
        if part.is_multipart():
            continue  # containers carry no content of their own
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type not in ("text/plain", "text/html"):
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        if content_type == "text/plain":
            return _decode_payload(payload, part.get_content_charset())
        if html_fallback is None:
            html_fallback = (payload, part.get_content_charset())

    if html_fallback is not None:
        return _decode_payload(*html_fallback)
    return ""
|
|
|
|
|
|
def _connect():
    """Open an authenticated IMAP-over-TLS session using the mail settings.

    Returns:
        A logged-in ``imaplib.IMAP4_SSL`` connection, or ``None`` when the
        mail host or user is not configured.

    Raises:
        imaplib.IMAP4.error: If the server rejects the login.
        OSError: If the connection cannot be established.
    """
    if not settings.mailplus_host or not settings.mailplus_user:
        return None

    conn = imaplib.IMAP4_SSL(settings.mailplus_host, settings.mailplus_port)
    try:
        conn.login(settings.mailplus_user, settings.mailplus_pass)
    except Exception:
        # The socket is already open at this point; close it so a failed
        # login does not leak the connection, then let the caller see the error.
        conn.shutdown()
        raise
    return conn
|
|
|
|
|
|
# IMAP dates require English month abbreviations regardless of the process locale.
_IMAP_MONTHS = (
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
)


def _imap_date(day) -> str:
    """Format a date/datetime as an IMAP date such as ``05-Jan-2024``."""
    return f"{day.day:02d}-{_IMAP_MONTHS[day.month - 1]}-{day.year:04d}"


def _close_quietly(conn) -> None:
    """Best-effort CLOSE + LOGOUT so a failed operation never leaks the session."""
    try:
        conn.close()
    except (imaplib.IMAP4.error, OSError):
        # CLOSE is only legal once a mailbox is selected; the session may
        # never have reached that state.
        logger.debug("IMAP close failed", exc_info=True)
    try:
        conn.logout()
    except (imaplib.IMAP4.error, OSError):
        logger.debug("IMAP logout failed", exc_info=True)


def _open_inbox(conn) -> None:
    """Select INBOX read-only, raising if the server refuses."""
    status, detail = conn.select("INBOX", readonly=True)
    if status != "OK":
        raise imaplib.IMAP4.error(f"cannot open INBOX: {detail!r}")


def _first_literal(fetch_data):
    """Return the message bytes of the first FETCH response item, or ``None``."""
    for item in fetch_data or ():
        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
            return item[1]
    return None


def _search_blocking(query: str, days: int) -> dict:
    """Synchronous implementation of :func:`search` (runs in a worker thread)."""
    conn = _connect()
    if not conn:
        return _make_result(False, "search", error="메일 설정이 없습니다.")

    try:
        _open_inbox(conn)

        since = _imap_date(datetime.now() - timedelta(days=days))
        # CR/LF would terminate the IMAP command line, so never pass them through.
        subject = (query or "").replace("\r", " ").replace("\n", " ").strip()

        if not subject:
            status, data = conn.uid("SEARCH", "SINCE", since)
        elif subject.isascii():
            # Quoted string: backslash and double quote must be escaped.
            quoted = subject.replace("\\", "\\\\").replace('"', '\\"')
            status, data = conn.uid("SEARCH", "SINCE", since, "SUBJECT", f'"{quoted}"')
        else:
            # Non-ASCII text cannot go on the command line; send it as a
            # UTF-8 literal, which imaplib appends after the last argument.
            conn.literal = subject.encode("utf-8")
            status, data = conn.uid("SEARCH", "CHARSET", "UTF-8", "SINCE", since, "SUBJECT")

        if status != "OK":
            raise imaplib.IMAP4.error(f"SEARCH failed: {data!r}")

        uids = data[0].split() if data and data[0] else []
        # Newest first, keeping only the latest MAX_RESULTS. Sorted explicitly
        # because ascending order of SEARCH results is customary, not guaranteed.
        uids = sorted(uids, key=int)[-MAX_RESULTS:]
        uids.reverse()

        results = []
        for uid in uids:
            uid_text = uid.decode()
            _, msg_data = conn.uid("FETCH", uid_text, "(RFC822.HEADER)")
            raw = _first_literal(msg_data)
            if raw is None:
                continue
            msg = email.message_from_bytes(raw)
            results.append({
                "uid": uid_text,
                "subject": _decode_header(msg.get("Subject", "(제목 없음)")),
                "from": _decode_header(msg.get("From", "")),
                # str() because headers containing raw 8-bit bytes come back
                # as Header objects, which cannot be sliced.
                "date": str(msg.get("Date", ""))[:20],
            })
    finally:
        _close_quietly(conn)

    summary = f"최근 {days}일간 {len(results)}개의 메일이 있습니다."
    return _make_result(True, "search", data=results, summary=summary)


def _read_blocking(uid: str) -> dict:
    """Synchronous implementation of :func:`read` (runs in a worker thread)."""
    uid_text = str(uid).strip()
    # Only a single numeric UID is accepted; anything else (e.g. "1:*") would
    # be interpreted by the server as a message set.
    if not (uid_text.isascii() and uid_text.isdigit()):
        return _make_result(False, "read", error=f"UID {uid} 메일을 찾을 수 없습니다.")

    conn = _connect()
    if not conn:
        return _make_result(False, "read", error="메일 설정이 없습니다.")

    try:
        _open_inbox(conn)
        _, msg_data = conn.uid("FETCH", uid_text, "(RFC822)")
        raw = _first_literal(msg_data)
    finally:
        _close_quietly(conn)

    if raw is None:
        return _make_result(False, "read", error=f"UID {uid} 메일을 찾을 수 없습니다.")

    msg = email.message_from_bytes(raw)
    subject = _decode_header(msg.get("Subject", ""))
    data = {
        "uid": uid_text,
        "subject": subject,
        "from": _decode_header(msg.get("From", "")),
        "date": str(msg.get("Date", "")),
        # read returns a longer excerpt than a list preview would.
        "body": _get_body(msg)[:PREVIEW_LEN * 5],
    }
    return _make_result(True, "read", data=data, summary=f"메일: {subject}")


async def search(query: str = "", days: int = 7) -> dict:
    """Search recent INBOX mail, newest first.

    Args:
        query: Optional text that must appear in the subject. Empty means
            "all mail in the period".
        days: How many days back to search.

    Returns:
        A result envelope from ``_make_result``. On success ``data`` is a list
        of at most ``MAX_RESULTS`` dicts with ``uid``, ``subject``, ``from``
        and ``date``. ``uid`` is a real IMAP UID that can be passed to
        :func:`read`. On failure ``ok`` is false and ``error`` holds the reason.
    """
    import asyncio

    try:
        # imaplib is blocking; run it off the event loop so other tasks and
        # caller-side timeouts keep working.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _search_blocking, query, days)
    except Exception as e:
        logger.exception("Email search failed")
        return _make_result(False, "search", error=str(e))


async def read(uid: str) -> dict:
    """Fetch one message from INBOX by IMAP UID.

    Args:
        uid: UID as returned in the ``uid`` field of :func:`search` results.

    Returns:
        A result envelope from ``_make_result``. On success ``data`` is a dict
        with ``uid``, ``subject``, ``from``, ``date`` and ``body`` (truncated
        to ``PREVIEW_LEN * 5`` characters). On failure ``ok`` is false and
        ``error`` holds the reason.
    """
    import asyncio

    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _read_blocking, uid)
    except Exception as e:
        logger.exception("Email read failed")
        return _make_result(False, "read", error=str(e))
|