176 lines
7.0 KiB
Python
176 lines
7.0 KiB
Python
import imaplib
|
|
import email
|
|
from email.header import decode_header
|
|
import os
|
|
|
|
class ImapMailClient:
|
|
def __init__(self, server, port, username, password):
|
|
self.server = server
|
|
self.port = port
|
|
self.username = username
|
|
self.password = password
|
|
self.mail = None
|
|
|
|
def connect(self):
|
|
"""IMAP 서버에 연결하고 로그인합니다."""
|
|
try:
|
|
self.mail = imaplib.IMAP4_SSL(self.server, self.port)
|
|
self.mail.login(self.username, self.password)
|
|
print("IMAP 서버에 성공적으로 연결했습니다.")
|
|
return True
|
|
except Exception as e:
|
|
print(f"IMAP 연결 오류: {e}")
|
|
return False
|
|
|
|
def fetch_unread_emails(self, mailbox='inbox'):
|
|
"""지정된 메일함에서 읽지 않은 모든 이메일의 UID를 가져옵니다."""
|
|
if not self.mail:
|
|
print("오류: 먼저 connect()를 호출해야 합니다.")
|
|
return []
|
|
|
|
try:
|
|
self.mail.select(mailbox)
|
|
status, messages = self.mail.search(None, 'UNSEEN')
|
|
if status != 'OK':
|
|
print(f"'{mailbox}' 메일함에서 UNSEEN 이메일을 검색하지 못했습니다.")
|
|
return []
|
|
|
|
email_uids = messages[0].split()
|
|
print(f"'{mailbox}'에서 {len(email_uids)}개의 새로운 메일을 발견했습니다.")
|
|
return email_uids
|
|
except Exception as e:
|
|
print(f"읽지 않은 이메일을 가져오는 중 오류 발생: {e}")
|
|
return []
|
|
|
|
def fetch_email(self, uid):
|
|
"""주어진 UID를 가진 이메일의 상세 정보를 가져옵니다 (보낸사람, 제목, 본문)."""
|
|
if not self.mail:
|
|
return None
|
|
|
|
try:
|
|
# UID로 메일 가져오기
|
|
status, msg_data = self.mail.fetch(uid, '(RFC822)')
|
|
if status != 'OK':
|
|
return None
|
|
|
|
for response_part in msg_data:
|
|
if isinstance(response_part, tuple):
|
|
msg = email.message_from_bytes(response_part[1])
|
|
|
|
# 헤더 디코딩
|
|
subject, encoding = decode_header(msg["subject"])[0]
|
|
if isinstance(subject, bytes):
|
|
subject = subject.decode(encoding if encoding else "utf-8")
|
|
|
|
from_ = msg.get("From")
|
|
|
|
# 본문 내용 추출
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
content_type = part.get_content_type()
|
|
content_disposition = str(part.get("Content-Disposition"))
|
|
if content_type == "text/plain" and "attachment" not in content_disposition:
|
|
payload = part.get_payload(decode=True)
|
|
charset = part.get_content_charset()
|
|
body = payload.decode(charset if charset else "utf-8")
|
|
break
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
charset = msg.get_content_charset()
|
|
body = payload.decode(charset if charset else "utf-8")
|
|
|
|
return {
|
|
"uid": uid,
|
|
"from": from_,
|
|
"subject": subject,
|
|
"body": body.strip(),
|
|
"raw": response_part[1] # .eml 파일 저장을 위한 원본 데이터
|
|
}
|
|
return None
|
|
except Exception as e:
|
|
print(f"UID {uid} 이메일을 가져오는 중 오류 발생: {e}")
|
|
return None
|
|
|
|
def move_email(self, uid, target_mailbox):
|
|
"""이메일을 다른 메일함으로 이동합니다."""
|
|
if not self.mail:
|
|
return False
|
|
try:
|
|
# 메일을 대상 메일함으로 복사
|
|
self.mail.copy(uid, target_mailbox)
|
|
# 원본 메일을 삭제 플래그 처리
|
|
self.mail.store(uid, '+FLAGS', '\Deleted')
|
|
# 메일함에서 영구 삭제
|
|
self.mail.expunge()
|
|
print(f"메일(UID: {uid})을 '{target_mailbox}'(으)로 이동했습니다.")
|
|
return True
|
|
except Exception as e:
|
|
print(f"메일 이동 중 오류 발생: {e}")
|
|
return False
|
|
|
|
def delete_email(self, uid):
|
|
"""이메일을 삭제합니다 (휴지통으로 이동)."""
|
|
# Synology MailPlus에서는 'Deleted' 플래그가 휴지통으로 이동시킵니다.
|
|
return self.move_email(uid, 'Trash')
|
|
|
|
def download_email(self, email_data, directory):
|
|
"""이메일 원본 데이터를 .eml 파일로 저장합니다."""
|
|
if not os.path.exists(directory):
|
|
os.makedirs(directory)
|
|
|
|
# 파일명으로 사용할 수 없는 문자 제거
|
|
safe_subject = "".join([c for c in email_data['subject'] if c.isalpha() or c.isdigit() or c==' ']).rstrip()
|
|
filename = f"{email_data['uid'].decode()}_{safe_subject}.eml"
|
|
filepath = os.path.join(directory, filename)
|
|
|
|
try:
|
|
with open(filepath, 'wb') as f:
|
|
f.write(email_data['raw'])
|
|
print(f"메일을 '{filepath}'에 다운로드했습니다.")
|
|
return filepath
|
|
except Exception as e:
|
|
print(f"메일 다운로드 중 오류 발생: {e}")
|
|
return None
|
|
|
|
def close(self):
|
|
"""서버 연결을 종료합니다."""
|
|
if self.mail:
|
|
self.mail.close()
|
|
self.mail.logout()
|
|
print("IMAP 서버 연결을 종료했습니다.")
|
|
|
|
if __name__ == '__main__':
|
|
# --- 사용 예시 ---
|
|
# 1. config.json 파일에 imap 정보를 정확히 입력했는지 확인하세요.
|
|
import json
|
|
with open('config.json') as f:
|
|
config = json.load(f)
|
|
|
|
imap_client = ImapMailClient(
|
|
server=config['imap']['server'],
|
|
port=config['imap']['port'],
|
|
username=config['username'],
|
|
password=config['password']
|
|
)
|
|
|
|
if imap_client.connect():
|
|
unread_uids = imap_client.fetch_unread_emails()
|
|
if unread_uids:
|
|
# 첫 번째 안 읽은 메일 테스트
|
|
first_uid = unread_uids[0]
|
|
email_details = imap_client.fetch_email(first_uid)
|
|
if email_details:
|
|
print("\n--- 첫 번째 메일 정보 ---")
|
|
print(f" 보낸사람: {email_details['from']}")
|
|
print(f" 제목: {email_details['subject']}")
|
|
print("------------------------\n")
|
|
|
|
# 테스트: 'Test' 메일함으로 이동
|
|
# imap_client.move_email(first_uid, 'Test')
|
|
|
|
# 테스트: .eml 파일로 다운로드
|
|
# imap_client.download_email(email_details, 'downloaded_mails')
|
|
|
|
imap_client.close()
|