feat: Implement AI classification and Web UI, refactor to IMAP
This commit is contained in:
175
imap_client.py
Normal file
175
imap_client.py
Normal file
@@ -0,0 +1,175 @@
|
||||
import imaplib
|
||||
import email
|
||||
from email.header import decode_header
|
||||
import os
|
||||
|
||||
class ImapMailClient:
|
||||
def __init__(self, server, port, username, password):
|
||||
self.server = server
|
||||
self.port = port
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.mail = None
|
||||
|
||||
def connect(self):
|
||||
"""IMAP 서버에 연결하고 로그인합니다."""
|
||||
try:
|
||||
self.mail = imaplib.IMAP4_SSL(self.server, self.port)
|
||||
self.mail.login(self.username, self.password)
|
||||
print("IMAP 서버에 성공적으로 연결했습니다.")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"IMAP 연결 오류: {e}")
|
||||
return False
|
||||
|
||||
def fetch_unread_emails(self, mailbox='inbox'):
|
||||
"""지정된 메일함에서 읽지 않은 모든 이메일의 UID를 가져옵니다."""
|
||||
if not self.mail:
|
||||
print("오류: 먼저 connect()를 호출해야 합니다.")
|
||||
return []
|
||||
|
||||
try:
|
||||
self.mail.select(mailbox)
|
||||
status, messages = self.mail.search(None, 'UNSEEN')
|
||||
if status != 'OK':
|
||||
print(f"'{mailbox}' 메일함에서 UNSEEN 이메일을 검색하지 못했습니다.")
|
||||
return []
|
||||
|
||||
email_uids = messages[0].split()
|
||||
print(f"'{mailbox}'에서 {len(email_uids)}개의 새로운 메일을 발견했습니다.")
|
||||
return email_uids
|
||||
except Exception as e:
|
||||
print(f"읽지 않은 이메일을 가져오는 중 오류 발생: {e}")
|
||||
return []
|
||||
|
||||
def fetch_email(self, uid):
|
||||
"""주어진 UID를 가진 이메일의 상세 정보를 가져옵니다 (보낸사람, 제목, 본문)."""
|
||||
if not self.mail:
|
||||
return None
|
||||
|
||||
try:
|
||||
# UID로 메일 가져오기
|
||||
status, msg_data = self.mail.fetch(uid, '(RFC822)')
|
||||
if status != 'OK':
|
||||
return None
|
||||
|
||||
for response_part in msg_data:
|
||||
if isinstance(response_part, tuple):
|
||||
msg = email.message_from_bytes(response_part[1])
|
||||
|
||||
# 헤더 디코딩
|
||||
subject, encoding = decode_header(msg["subject"])[0]
|
||||
if isinstance(subject, bytes):
|
||||
subject = subject.decode(encoding if encoding else "utf-8")
|
||||
|
||||
from_ = msg.get("From")
|
||||
|
||||
# 본문 내용 추출
|
||||
body = ""
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
content_disposition = str(part.get("Content-Disposition"))
|
||||
if content_type == "text/plain" and "attachment" not in content_disposition:
|
||||
payload = part.get_payload(decode=True)
|
||||
charset = part.get_content_charset()
|
||||
body = payload.decode(charset if charset else "utf-8")
|
||||
break
|
||||
else:
|
||||
payload = msg.get_payload(decode=True)
|
||||
charset = msg.get_content_charset()
|
||||
body = payload.decode(charset if charset else "utf-8")
|
||||
|
||||
return {
|
||||
"uid": uid,
|
||||
"from": from_,
|
||||
"subject": subject,
|
||||
"body": body.strip(),
|
||||
"raw": response_part[1] # .eml 파일 저장을 위한 원본 데이터
|
||||
}
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"UID {uid} 이메일을 가져오는 중 오류 발생: {e}")
|
||||
return None
|
||||
|
||||
def move_email(self, uid, target_mailbox):
|
||||
"""이메일을 다른 메일함으로 이동합니다."""
|
||||
if not self.mail:
|
||||
return False
|
||||
try:
|
||||
# 메일을 대상 메일함으로 복사
|
||||
self.mail.copy(uid, target_mailbox)
|
||||
# 원본 메일을 삭제 플래그 처리
|
||||
self.mail.store(uid, '+FLAGS', '\Deleted')
|
||||
# 메일함에서 영구 삭제
|
||||
self.mail.expunge()
|
||||
print(f"메일(UID: {uid})을 '{target_mailbox}'(으)로 이동했습니다.")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"메일 이동 중 오류 발생: {e}")
|
||||
return False
|
||||
|
||||
def delete_email(self, uid):
|
||||
"""이메일을 삭제합니다 (휴지통으로 이동)."""
|
||||
# Synology MailPlus에서는 'Deleted' 플래그가 휴지통으로 이동시킵니다.
|
||||
return self.move_email(uid, 'Trash')
|
||||
|
||||
def download_email(self, email_data, directory):
|
||||
"""이메일 원본 데이터를 .eml 파일로 저장합니다."""
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
|
||||
# 파일명으로 사용할 수 없는 문자 제거
|
||||
safe_subject = "".join([c for c in email_data['subject'] if c.isalpha() or c.isdigit() or c==' ']).rstrip()
|
||||
filename = f"{email_data['uid'].decode()}_{safe_subject}.eml"
|
||||
filepath = os.path.join(directory, filename)
|
||||
|
||||
try:
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(email_data['raw'])
|
||||
print(f"메일을 '{filepath}'에 다운로드했습니다.")
|
||||
return filepath
|
||||
except Exception as e:
|
||||
print(f"메일 다운로드 중 오류 발생: {e}")
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
"""서버 연결을 종료합니다."""
|
||||
if self.mail:
|
||||
self.mail.close()
|
||||
self.mail.logout()
|
||||
print("IMAP 서버 연결을 종료했습니다.")
|
||||
|
||||
if __name__ == '__main__':
|
||||
# --- 사용 예시 ---
|
||||
# 1. config.json 파일에 imap 정보를 정확히 입력했는지 확인하세요.
|
||||
import json
|
||||
with open('config.json') as f:
|
||||
config = json.load(f)
|
||||
|
||||
imap_client = ImapMailClient(
|
||||
server=config['imap']['server'],
|
||||
port=config['imap']['port'],
|
||||
username=config['username'],
|
||||
password=config['password']
|
||||
)
|
||||
|
||||
if imap_client.connect():
|
||||
unread_uids = imap_client.fetch_unread_emails()
|
||||
if unread_uids:
|
||||
# 첫 번째 안 읽은 메일 테스트
|
||||
first_uid = unread_uids[0]
|
||||
email_details = imap_client.fetch_email(first_uid)
|
||||
if email_details:
|
||||
print("\n--- 첫 번째 메일 정보 ---")
|
||||
print(f" 보낸사람: {email_details['from']}")
|
||||
print(f" 제목: {email_details['subject']}")
|
||||
print("------------------------\n")
|
||||
|
||||
# 테스트: 'Test' 메일함으로 이동
|
||||
# imap_client.move_email(first_uid, 'Test')
|
||||
|
||||
# 테스트: .eml 파일로 다운로드
|
||||
# imap_client.download_email(email_details, 'downloaded_mails')
|
||||
|
||||
imap_client.close()
|
||||
Reference in New Issue
Block a user