"""Email collection worker — Synology MailPlus IMAP → NAS storage + DB registration.

Ported from v1 scripts/mailplus_archive.py.
imaplib (synchronous) is wrapped with asyncio.to_thread().
"""

import asyncio
import email
import imaplib
import os
import re
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.message import Message
from pathlib import Path

from sqlalchemy import select

from core.config import settings
from core.database import async_session
from core.utils import file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import ProcessingQueue

logger = setup_logger("mailplus_archive")

# Work-related keywords (used to auto-detect data_origin)
WORK_KEYWORDS = {"테크니컬코리아", "TK", "공장", "생산", "사내", "안전", "점검"}


def _safe_decode(data: bytes, charset: str | None) -> str:
    """Decode *data* with *charset*, falling back to UTF-8.

    Mail headers and parts may declare a charset Python does not know
    (e.g. ``unknown-8bit``); ``bytes.decode`` raises ``LookupError`` for
    those, so retry with UTF-8 instead of failing the whole message.
    Undecodable bytes become U+FFFD in both paths.
    """
    try:
        return data.decode(charset or "utf-8", errors="replace")
    except (LookupError, ValueError):
        return data.decode("utf-8", errors="replace")


def _decode_mime_header(raw: str) -> str:
    """Decode a MIME-encoded header value into a plain string.

    Args:
        raw: The header value as returned by ``Message.get``.

    Returns:
        The concatenation of all decoded header fragments.
    """
    decoded = []
    for data, charset in decode_header(raw):
        if isinstance(data, bytes):
            decoded.append(_safe_decode(data, charset))
        else:
            decoded.append(data)
    return "".join(decoded)


def _sanitize_filename(name: str, max_len: int = 80) -> str:
    """Replace characters that are not allowed in file names with ``_``.

    Args:
        name: Candidate file name fragment.
        max_len: Maximum number of characters kept (before stripping).

    Returns:
        The cleaned name, truncated to *max_len* and stripped of
        surrounding whitespace.
    """
    clean = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)
    return clean[:max_len].strip()


def _detect_origin(subject: str, body: str) -> str:
    """Classify a mail as ``"work"`` or ``"external"``.

    The subject plus the first 500 characters of the body are searched
    case-insensitively for any entry of ``WORK_KEYWORDS``.
    """
    text = f"{subject} {body[:500]}".lower()
    if any(kw.lower() in text for kw in WORK_KEYWORDS):
        return "work"
    return "external"


def _fetch_emails_sync(
    host: str,
    port: int,
    user: str,
    password: str,
    last_uid: int | None,
) -> list[tuple[int, bytes]]:
    """Fetch mails from INBOX over IMAP (blocking; run via asyncio.to_thread).

    Args:
        host: IMAP server host.
        port: IMAP SSL port.
        user: Login name.
        password: Login password.
        last_uid: Highest UID already processed. When falsy, the mails of
            the last 7 days are fetched instead.

    Returns:
        A list of ``(uid, raw_rfc822_bytes)`` tuples.

    Raises:
        imaplib.IMAP4.error / OSError: On connection, login or protocol
            failures; the caller is expected to handle them.
    """
    results: list[tuple[int, bytes]] = []
    # The context manager logs out on exit, also when an exception is raised.
    with imaplib.IMAP4_SSL(host, port, timeout=30) as conn:
        conn.login(user, password)
        conn.select("INBOX")

        if last_uid:
            # Incremental sync: everything after last_uid
            _, data = conn.uid("search", None, f"UID {last_uid + 1}:*")
        else:
            # First run: last 7 days
            since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
            _, data = conn.uid("search", None, f"SINCE {since}")

        uid_tokens = data[0].split() if data and data[0] else []
        for uid_bytes in uid_tokens:
            uid = int(uid_bytes)
            # "N:*" always matches the highest-UID message even when its UID
            # is below N, so without this check the most recent mail would be
            # fetched (and registered) again on every run.
            if last_uid and uid <= last_uid:
                continue
            _, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)")
            # A successful fetch yields a (envelope, payload) tuple first;
            # anything else (None, bare bytes) carries no message body.
            if not msg_data or not isinstance(msg_data[0], tuple):
                continue
            results.append((uid, msg_data[0][1]))
    return results


def _extract_body(msg: Message) -> tuple[str, str | None]:
    """Return ``(text, charset)`` for the plain-text body of *msg*.

    For multipart messages the first ``text/plain`` part that has a payload
    is used. ``text`` is ``""`` and ``charset`` is ``None`` when no payload
    is found.
    """
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            payload = part.get_payload(decode=True)
            if payload is not None:
                charset = part.get_content_charset() or "utf-8"
                return _safe_decode(payload, charset), charset
        return "", None

    payload = msg.get_payload(decode=True)
    if payload is None:
        return "", None
    charset = msg.get_content_charset() or "utf-8"
    return _safe_decode(payload, charset), charset


async def run() -> None:
    """Collect new mails, archive them as .eml files and register them in the DB.

    Connection settings are read from the ``MAILPLUS_HOST``, ``MAILPLUS_PORT``,
    ``MAILPLUS_USER`` and ``MAILPLUS_PASS`` environment variables. The highest
    processed UID is persisted in the ``AutomationState`` row named
    ``"mailplus"``. After a successful commit a summary mail is sent via SMTP.
    """
    host = os.getenv("MAILPLUS_HOST", "")
    port = int(os.getenv("MAILPLUS_PORT", "993"))
    user = os.getenv("MAILPLUS_USER", "")
    password = os.getenv("MAILPLUS_PASS", "")

    if not all([host, user, password]):
        logger.warning("MailPlus 인증 정보 미설정")
        return

    async with async_session() as session:
        # Look up the last processed UID
        state = await session.execute(
            select(AutomationState).where(AutomationState.job_name == "mailplus")
        )
        state_row = state.scalar_one_or_none()
        last_uid = (
            int(state_row.last_check_value)
            if state_row and state_row.last_check_value
            else None
        )

        # Run the blocking IMAP call in a worker thread
        try:
            emails = await asyncio.to_thread(
                _fetch_emails_sync, host, port, user, password, last_uid,
            )
        except Exception as e:
            logger.error(f"IMAP 연결 실패: {e}")
            return

        if not emails:
            logger.info("새 이메일 없음")
            return

        # Directory the .eml files are stored in
        nas_root = Path(settings.nas_mount_path)
        email_dir = nas_root / "PKM" / "Archive" / "emails"
        email_dir.mkdir(parents=True, exist_ok=True)

        max_uid = last_uid or 0
        archived = []

        for uid, raw_bytes in emails:
            try:
                msg = email.message_from_bytes(raw_bytes)
                subject = _decode_mime_header(msg.get("Subject", "제목없음"))
                # The file name prefix is the archive date, not the mail's Date header.
                date = datetime.now().strftime("%Y%m%d")

                # Store the raw message as .eml
                safe_subject = _sanitize_filename(subject)
                filename = f"{date}_{uid}_{safe_subject}.eml"
                eml_path = email_dir / filename
                eml_path.write_bytes(raw_bytes)

                # Extract the body (plain-text part)
                body, charset = _extract_body(msg)
                if "\ufffd" in body[:1000]:
                    logger.debug(f"[메일] charset={charset or 'unknown'} 디코딩 중 replacement 발생")

                # Register in the DB
                rel_path = str(eml_path.relative_to(nas_root))
                origin = _detect_origin(subject, body)

                doc = Document(
                    file_path=rel_path,
                    file_hash=file_hash(eml_path),
                    file_format="eml",
                    file_size=len(raw_bytes),
                    file_type="immutable",
                    title=subject,
                    source_channel="email",
                    data_origin=origin,
                )
                session.add(doc)
                await session.flush()

                # Single-line, length-limited subject for logs and the summary mail
                safe_subj = subject.replace("\n", " ").replace("\r", " ")[:200]

                # TODO: remove this condition once extract_worker can parse eml bodies/attachments
                if doc.file_format != "eml":
                    session.add(ProcessingQueue(
                        document_id=doc.id,
                        stage="extract",
                        status="pending",
                    ))
                else:
                    logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵")

                archived.append(safe_subj)
                max_uid = max(max_uid, uid)

            except Exception as e:
                # NOTE(review): if the failure came from session.flush(), the
                # session may need a rollback before further use — confirm
                # whether a per-message SAVEPOINT is wanted here. Also, a
                # failed UID is skipped for good once a later UID succeeds,
                # and its .eml file may already be on disk.
                logger.error(f"UID {uid} 처리 실패: {e}")

        # Persist the sync state
        now_utc = datetime.now(timezone.utc)
        if state_row:
            state_row.last_check_value = str(max_uid)
            state_row.last_run_at = now_utc
        else:
            session.add(AutomationState(
                job_name="mailplus",
                last_check_value=str(max_uid),
                last_run_at=now_utc,
            ))

        await session.commit()

    # SMTP notification (sent after the DB session has been released)
    smtp_host = os.getenv("MAILPLUS_HOST", "")
    smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
    if archived and smtp_host:
        notice = f"이메일 {len(archived)}건 수집 완료:\n\n" + "\n".join(f"- {s}" for s in archived)
        send_smtp_email(smtp_host, smtp_port, user, password, "PKM 이메일 수집 알림", notice)

    logger.info(f"이메일 {len(archived)}건 수집 완료 (max_uid={max_uid})")