"""Mail Bridge — IMAP 날짜 기반 메일 조회 서비스 (port 8094)""" import email import email.header import email.utils import imaplib import logging import os from datetime import datetime, timedelta from dotenv import load_dotenv from fastapi import FastAPI, Query from fastapi.responses import JSONResponse load_dotenv() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("mail_bridge") IMAP_HOST = os.getenv("IMAP_HOST", "192.168.1.227") IMAP_PORT = int(os.getenv("IMAP_PORT", "993")) IMAP_USER = os.getenv("IMAP_USER", "") IMAP_PASSWORD = os.getenv("IMAP_PASSWORD", "") IMAP_SSL = os.getenv("IMAP_SSL", "true").lower() == "true" IMAP_FOLDERS = [f.strip() for f in os.getenv("IMAP_FOLDERS", "INBOX,Gmail,Technicalkorea").split(",") if f.strip()] app = FastAPI() def _connect() -> imaplib.IMAP4: if IMAP_SSL: conn = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT) else: conn = imaplib.IMAP4(IMAP_HOST, IMAP_PORT) conn.login(IMAP_USER, IMAP_PASSWORD) return conn def _decode_header(raw: str | None) -> str: if not raw: return "" parts = email.header.decode_header(raw) decoded = [] for data, charset in parts: if isinstance(data, bytes): decoded.append(data.decode(charset or "utf-8", errors="replace")) else: decoded.append(data) return " ".join(decoded) def _get_text(msg: email.message.Message) -> str: if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() if ct == "text/plain": payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") # fallback: text/html for part in msg.walk(): ct = part.get_content_type() if ct == "text/html": payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") return "" payload = msg.get_payload(decode=True) if payload: charset = msg.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") return "" def _dedup_key(mail: dict) -> str: if mail["messageId"]: return mail["messageId"] return f"{mail['subject']}|{mail['date']}|{mail['from']}" @app.get("/recent") def recent_mails(days: int = Query(default=1, ge=1, le=30)): try: conn = _connect() except Exception as e: logger.error(f"IMAP connection failed: {e}") return JSONResponse({"success": False, "error": f"IMAP connection failed: {e}"}, status_code=502) try: since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y") seen_keys: set[str] = set() mails: list[dict] = [] for folder in IMAP_FOLDERS: try: status, _ = conn.select(folder, readonly=True) if status != "OK": logger.warning(f"Cannot select folder '{folder}', skipping") continue except Exception as e: logger.warning(f"Failed to select folder '{folder}': {e}") continue _, msg_nums = conn.search(None, f"SINCE {since}") nums = msg_nums[0].split() if msg_nums[0] else [] for num in nums: _, data = conn.fetch(num, "(RFC822)") if not data or not data[0]: continue raw = data[0][1] msg = email.message_from_bytes(raw) message_id = msg.get("Message-ID", "").strip() from_addr = _decode_header(msg.get("From", "")) subject = _decode_header(msg.get("Subject", "")) date_str = msg.get("Date", "") text = _get_text(msg) parsed_date = email.utils.parsedate_to_datetime(date_str).isoformat() if date_str else "" mail_entry = { "messageId": message_id, "from": from_addr, "subject": subject, "text": text, "date": parsed_date, "folder": folder, } key = _dedup_key(mail_entry) if key not in seen_keys: seen_keys.add(key) mails.append(mail_entry) logger.info(f"Folder '{folder}': {len(nums)} messages since {since}") mails.sort(key=lambda m: m["date"], reverse=True) logger.info(f"Total {len(mails)} unique mails across {len(IMAP_FOLDERS)} folders (SINCE {since})") return {"success": True, "count": len(mails), "mails": mails} except Exception as e: logger.error(f"IMAP fetch error: {e}") return JSONResponse({"success": False, "error": str(e)}, status_code=500) finally: try: conn.logout() except Exception: pass @app.get("/health") def health(): folders_status: dict[str, bool] = {} try: conn = _connect() for folder in IMAP_FOLDERS: try: status, _ = conn.select(folder, readonly=True) folders_status[folder] = status == "OK" except Exception: folders_status[folder] = False conn.logout() except Exception as e: logger.warning(f"IMAP health check failed: {e}") for folder in IMAP_FOLDERS: folders_status.setdefault(folder, False) any_ok = any(folders_status.values()) all_ok = all(folders_status.values()) if all_ok: status = "ok" elif any_ok: status = "degraded" else: status = "error" return {"status": status, "imap_reachable": any_ok, "folders": folders_status}