2e5baa8329
mailplus_archive: INBOX 전체 → MAILPLUS_FOLDER(기본 PKM) 선별 수집, eml extract 스킵 가드 제거(검색·색인 편입), 폴더별 UID 상태(job_name=mailplus:<folder>), 폴더 부재 시 no-op. extract_worker: _extract_eml 신설(From/To/Date/Subject 헤더 prepend + 본문 text/plain 우선·html→bs4 fallback, 첨부 extract_meta 인벤토리 scaffold). preview/marker 는 eml 자동 skip 이라 라우팅 무변경. DNS drift(D25, mailplus.hyungi.net→.227) 교정 후 활성. 할일(events) 연계 없음. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
216 lines
7.8 KiB
Python
216 lines
7.8 KiB
Python
"""이메일 수집 워커 — Synology MailPlus IMAP → NAS 저장 + DB 등록
|
|
|
|
v1 scripts/mailplus_archive.py에서 포팅.
|
|
imaplib (동기)를 asyncio.to_thread()로 래핑.
|
|
"""
|
|
|
|
import asyncio
|
|
import email
|
|
import imaplib
|
|
import os
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
from email.header import decode_header
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import select
|
|
|
|
from core.config import settings
|
|
from core.database import async_session
|
|
from core.utils import file_hash, send_smtp_email, setup_logger
|
|
from models.automation import AutomationState
|
|
from models.document import Document
|
|
from models.queue import enqueue_stage
|
|
|
|
logger = setup_logger("mailplus_archive")
|
|
|
|
# 업무 키워드 (data_origin 자동 감지)
|
|
WORK_KEYWORDS = {"테크니컬코리아", "TK", "공장", "생산", "사내", "안전", "점검"}
|
|
|
|
|
|
def _decode_mime_header(raw: str) -> str:
|
|
"""MIME 헤더 디코딩"""
|
|
parts = decode_header(raw)
|
|
decoded = []
|
|
for data, charset in parts:
|
|
if isinstance(data, bytes):
|
|
decoded.append(data.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
decoded.append(data)
|
|
return "".join(decoded)
|
|
|
|
|
|
def _sanitize_filename(name: str, max_len: int = 80) -> str:
|
|
"""파일명에 사용 불가한 문자 제거"""
|
|
clean = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)
|
|
return clean[:max_len].strip()
|
|
|
|
|
|
def _detect_origin(subject: str, body: str) -> str:
|
|
"""work/external 자동 감지"""
|
|
text = f"{subject} {body[:500]}".lower()
|
|
for kw in WORK_KEYWORDS:
|
|
if kw.lower() in text:
|
|
return "work"
|
|
return "external"
|
|
|
|
|
|
def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: int | None, folder: str):
|
|
"""동기 IMAP 메일 가져오기 (asyncio.to_thread에서 실행).
|
|
|
|
선별 폴더(MAILPLUS_FOLDER, 기본 'PKM')만 수집 — INBOX 전체 X.
|
|
폴더 부재 시 no-op (사용자가 MailPlus 규칙으로 폴더 생성 전까진 안전하게 0건).
|
|
"""
|
|
results = []
|
|
conn = imaplib.IMAP4_SSL(host, port, timeout=30)
|
|
try:
|
|
conn.login(user, password)
|
|
typ, _ = conn.select(folder)
|
|
if typ != "OK":
|
|
logger.info(f"[메일] 폴더 '{folder}' 없음/접근불가 — 수집 건너뜀 (no-op)")
|
|
return results
|
|
|
|
if last_uid:
|
|
# 증분 동기화: last_uid 이후
|
|
_, data = conn.uid("search", None, f"UID {last_uid + 1}:*")
|
|
else:
|
|
# 최초 실행: 최근 7일
|
|
since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
|
|
_, data = conn.uid("search", None, f"SINCE {since}")
|
|
|
|
uids = (data[0] or b"").split()
|
|
for uid_bytes in uids:
|
|
uid = int(uid_bytes)
|
|
_, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)")
|
|
if msg_data[0] is None:
|
|
continue
|
|
results.append((uid, msg_data[0][1]))
|
|
finally:
|
|
conn.logout()
|
|
|
|
return results
|
|
|
|
|
|
async def run():
|
|
"""이메일 수집 실행"""
|
|
host = os.getenv("MAILPLUS_HOST", "")
|
|
port = int(os.getenv("MAILPLUS_PORT", "993"))
|
|
user = os.getenv("MAILPLUS_USER", "")
|
|
password = os.getenv("MAILPLUS_PASS", "")
|
|
folder = os.getenv("MAILPLUS_FOLDER", "PKM")
|
|
|
|
if not all([host, user, password]):
|
|
logger.warning("MailPlus 인증 정보 미설정")
|
|
return
|
|
|
|
job_name = f"mailplus:{folder}"
|
|
|
|
async with async_session() as session:
|
|
# 마지막 UID 조회 (UID 는 폴더별 네임스페이스 → job_name 에 폴더 포함)
|
|
state = await session.execute(
|
|
select(AutomationState).where(AutomationState.job_name == job_name)
|
|
)
|
|
state_row = state.scalar_one_or_none()
|
|
last_uid = int(state_row.last_check_value) if state_row and state_row.last_check_value else None
|
|
|
|
# IMAP 동기 호출을 비동기로 래핑
|
|
try:
|
|
emails = await asyncio.to_thread(
|
|
_fetch_emails_sync, host, port, user, password, last_uid, folder,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"IMAP 연결 실패: {e}")
|
|
return
|
|
|
|
if not emails:
|
|
logger.info("새 이메일 없음")
|
|
return
|
|
|
|
# 이메일 저장 디렉토리
|
|
email_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "emails"
|
|
email_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
max_uid = last_uid or 0
|
|
archived = []
|
|
|
|
for uid, raw_bytes in emails:
|
|
try:
|
|
msg = email.message_from_bytes(raw_bytes)
|
|
subject = _decode_mime_header(msg.get("Subject", "제목없음"))
|
|
date_str = msg.get("Date", "")
|
|
date = datetime.now().strftime("%Y%m%d")
|
|
|
|
# .eml 파일 저장
|
|
safe_subject = _sanitize_filename(subject)
|
|
filename = f"{date}_{uid}_{safe_subject}.eml"
|
|
eml_path = email_dir / filename
|
|
eml_path.write_bytes(raw_bytes)
|
|
|
|
# 본문 추출 (텍스트 파트)
|
|
body = ""
|
|
charset = None
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload is not None:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
body = payload.decode(charset, errors="replace")
|
|
break
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload is not None:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
body = payload.decode(charset, errors="replace")
|
|
|
|
if "\ufffd" in body[:1000]:
|
|
logger.debug(f"[메일] charset={charset or 'unknown'} 디코딩 중 replacement 발생")
|
|
|
|
# DB 등록
|
|
rel_path = str(eml_path.relative_to(Path(settings.nas_mount_path)))
|
|
origin = _detect_origin(subject, body)
|
|
|
|
doc = Document(
|
|
file_path=rel_path,
|
|
file_hash=file_hash(eml_path),
|
|
file_format="eml",
|
|
file_size=len(raw_bytes),
|
|
file_type="immutable",
|
|
title=subject,
|
|
source_channel="email",
|
|
data_origin=origin,
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
# 검색·색인 편입 (extract → classify → embed/chunk). 할일 연계 없음.
|
|
await enqueue_stage(session, doc.id, "extract")
|
|
|
|
archived.append(subject.replace("\n", " ").replace("\r", " ")[:200])
|
|
max_uid = max(max_uid, uid)
|
|
|
|
except Exception as e:
|
|
logger.error(f"UID {uid} 처리 실패: {e}")
|
|
|
|
# 상태 업데이트
|
|
if state_row:
|
|
state_row.last_check_value = str(max_uid)
|
|
state_row.last_run_at = datetime.now(timezone.utc)
|
|
else:
|
|
session.add(AutomationState(
|
|
job_name=job_name,
|
|
last_check_value=str(max_uid),
|
|
last_run_at=datetime.now(timezone.utc),
|
|
))
|
|
|
|
await session.commit()
|
|
|
|
# SMTP 알림
|
|
smtp_host = os.getenv("MAILPLUS_HOST", "")
|
|
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
|
|
if archived and smtp_host:
|
|
body = f"이메일 {len(archived)}건 수집 완료:\n\n" + "\n".join(f"- {s}" for s in archived)
|
|
send_smtp_email(smtp_host, smtp_port, user, password, "PKM 이메일 수집 알림", body)
|
|
|
|
logger.info(f"이메일 {len(archived)}건 수집 완료 (max_uid={max_uid})")
|