Files
hyungi_document_server/app/workers/mailplus_archive.py
Hyungi Ahn 46537ee11a fix: Codex 리뷰 P1/P2 버그 4건 수정
- [P1] migration runner 도입: schema_migrations 추적, advisory lock,
  단일 트랜잭션 실행, SQL 검증 (기존 DB 업그레이드 대응)
- [P1] eml extract 큐 조건 분기: extract_worker 미지원 포맷 큐 스킵
- [P2] iCalendar escape_ical_text() 추가: RFC 5545 준수
- [P2] 이메일 charset 감지: get_content_charset() 사용 + payload None 방어

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:55:38 +09:00

214 lines
7.5 KiB
Python

"""이메일 수집 워커 — Synology MailPlus IMAP → NAS 저장 + DB 등록
v1 scripts/mailplus_archive.py에서 포팅.
imaplib (동기)를 asyncio.to_thread()로 래핑.
"""
import asyncio
import email
import imaplib
import os
import re
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("mailplus_archive")

# Keywords that mark a message as work-related (used to auto-detect data_origin).
WORK_KEYWORDS = {
    "테크니컬코리아",
    "TK",
    "공장",
    "생산",
    "사내",
    "안전",
    "점검",
}


def _decode_mime_header(raw: str) -> str:
"""MIME 헤더 디코딩"""
parts = decode_header(raw)
decoded = []
for data, charset in parts:
if isinstance(data, bytes):
decoded.append(data.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(data)
return "".join(decoded)
def _sanitize_filename(name: str, max_len: int = 80) -> str:
"""파일명에 사용 불가한 문자 제거"""
clean = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)
return clean[:max_len].strip()
def _detect_origin(subject: str, body: str) -> str:
    """Classify a message as ``"work"`` or ``"external"``.

    The subject plus the first 500 characters of the body are lower-cased and
    searched for any entry of ``WORK_KEYWORDS`` (case-insensitive substring
    match). Any hit yields ``"work"``; otherwise ``"external"``.
    """
    haystack = f"{subject} {body[:500]}".lower()
    if any(keyword.lower() in haystack for keyword in WORK_KEYWORDS):
        return "work"
    return "external"


def _fetch_emails_sync(
    host: str, port: int, user: str, password: str, last_uid: int | None
) -> list[tuple[int, bytes]]:
    """Fetch raw messages from INBOX over IMAP (blocking).

    Intended to be run via ``asyncio.to_thread`` because ``imaplib`` is
    synchronous.

    Args:
        host: IMAP server host name.
        port: IMAP-over-SSL port.
        user: Login name.
        password: Login password.
        last_uid: Highest UID already processed. When falsy (``None`` or 0)
            the last 7 days of mail are fetched instead of an incremental
            range.

    Returns:
        A list of ``(uid, raw_rfc822_bytes)`` tuples, in the order the server
        returned the UIDs.

    Raises:
        imaplib.IMAP4.error, OSError: On connection, login or protocol failure.
    """
    results = []
    conn = imaplib.IMAP4_SSL(host, port, timeout=30)
    try:
        conn.login(user, password)
        conn.select("INBOX")
        if last_uid:
            # Incremental sync: everything after last_uid.
            _, data = conn.uid("search", None, f"UID {last_uid + 1}:*")
        else:
            # First run: last 7 days.
            # NOTE(review): %b is locale-dependent and IMAP needs English month
            # abbreviations — assumes the process keeps the default C locale.
            since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
            _, data = conn.uid("search", None, f"SINCE {since}")
        uid_tokens = data[0].split() if data and data[0] else []
        for uid_bytes in uid_tokens:
            uid = int(uid_bytes)
            if last_uid and uid <= last_uid:
                # "n:*" always matches the last message in the mailbox, even
                # when its UID is below n, so without this guard the newest
                # mail would be returned again on every run with no new mail.
                continue
            _, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)")
            # The fetch response mixes (envelope, literal) tuples with plain
            # bytes items; the message is the literal of the first tuple.
            raw = next(
                (
                    item[1]
                    for item in (msg_data or ())
                    if isinstance(item, tuple) and len(item) >= 2
                ),
                None,
            )
            if raw is None:
                continue
            results.append((uid, raw))
    finally:
        try:
            conn.logout()
        except (imaplib.IMAP4.error, OSError):
            # Best-effort cleanup: a failing LOGOUT must neither mask the
            # original error nor discard messages that were already fetched.
            pass
    return results


def _decode_part_text(part) -> tuple[str, str | None]:
    """Decode one message part to text; return ``(text, charset)``."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return "", None
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace"), charset
    except LookupError:
        # Declared charset is unknown to Python; fall back to UTF-8 rather
        # than failing the message after its .eml was already written.
        return payload.decode("utf-8", errors="replace"), charset


def _extract_text_body(msg) -> tuple[str, str | None]:
    """Return the text body of *msg* (first ``text/plain`` part if multipart)."""
    if not msg.is_multipart():
        return _decode_part_text(msg)
    for part in msg.walk():
        if part.get_content_type() == "text/plain":
            return _decode_part_text(part)
    return "", None


async def run() -> None:
    """Collect new mail from MailPlus, archive it as .eml and register it in the DB.

    Steps:
      1. Read connection settings from ``MAILPLUS_HOST`` / ``MAILPLUS_PORT`` /
         ``MAILPLUS_USER`` / ``MAILPLUS_PASS``; return early if incomplete.
      2. Load the last processed UID from the ``mailplus`` AutomationState row.
      3. Fetch messages in a worker thread via ``_fetch_emails_sync``.
      4. For each message: write ``<nas>/PKM/Archive/emails/<date>_<uid>_<subject>.eml``,
         extract the text body, and add a ``Document`` row.
      5. Persist the highest successfully processed UID and commit.
      6. Send an SMTP summary when at least one message was archived.

    A failure on a single message is logged and does not abort the run.
    """
    host = os.getenv("MAILPLUS_HOST", "")
    port = int(os.getenv("MAILPLUS_PORT", "993"))
    user = os.getenv("MAILPLUS_USER", "")
    password = os.getenv("MAILPLUS_PASS", "")
    if not all([host, user, password]):
        logger.warning("MailPlus 인증 정보 미설정")
        return

    async with async_session() as session:
        # Look up the last processed UID.
        state = await session.execute(
            select(AutomationState).where(AutomationState.job_name == "mailplus")
        )
        state_row = state.scalar_one_or_none()
        last_uid = (
            int(state_row.last_check_value)
            if state_row and state_row.last_check_value
            else None
        )

        # imaplib is synchronous, so run it in a worker thread.
        try:
            emails = await asyncio.to_thread(
                _fetch_emails_sync, host, port, user, password, last_uid,
            )
        except Exception as e:
            logger.error(f"IMAP 연결 실패: {e}")
            return

        if last_uid:
            # An IMAP "UID n:*" search also matches the newest message when no
            # UID >= n exists; drop anything already processed so it is not
            # archived and registered a second time.
            emails = [(uid, raw) for uid, raw in emails if uid > last_uid]

        if not emails:
            logger.info("새 이메일 없음")
            return

        # Directory where the .eml files are stored.
        email_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "emails"
        email_dir.mkdir(parents=True, exist_ok=True)

        max_uid = last_uid or 0
        archived = []
        for uid, raw_bytes in emails:
            try:
                msg = email.message_from_bytes(raw_bytes)
                subject = _decode_mime_header(msg.get("Subject", "제목없음"))
                # The file name carries the collection date, not the mail's Date header.
                date = datetime.now().strftime("%Y%m%d")

                # Save the .eml file.
                safe_subject = _sanitize_filename(subject)
                filename = f"{date}_{uid}_{safe_subject}.eml"
                eml_path = email_dir / filename
                eml_path.write_bytes(raw_bytes)

                # Extract the text body (used for origin detection).
                body, charset = _extract_text_body(msg)
                if "\ufffd" in body[:1000]:
                    logger.debug(f"[메일] charset={charset or 'unknown'} 디코딩 중 replacement 발생")

                # Register in the DB.
                rel_path = str(eml_path.relative_to(Path(settings.nas_mount_path)))
                origin = _detect_origin(subject, body)
                doc = Document(
                    file_path=rel_path,
                    file_hash=file_hash(eml_path),
                    file_format="eml",
                    file_size=len(raw_bytes),
                    file_type="immutable",
                    title=subject,
                    source_channel="email",
                    data_origin=origin,
                )
                safe_subj = subject.replace("\n", " ").replace("\r", " ")[:200]

                # SAVEPOINT per message: if the insert fails, only this
                # message is rolled back and the session stays usable for the
                # remaining messages and the final commit.
                async with session.begin_nested():
                    session.add(doc)
                    await session.flush()
                    # TODO: drop this condition once extract_worker can parse
                    # eml bodies/attachments.
                    if doc.file_format != "eml":
                        session.add(ProcessingQueue(
                            document_id=doc.id, stage="extract", status="pending",
                        ))
                    else:
                        logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵")

                archived.append(safe_subj)
                max_uid = max(max_uid, uid)
            except Exception as e:
                logger.error(f"UID {uid} 처리 실패: {e}")

        # Persist sync state.
        if state_row:
            state_row.last_check_value = str(max_uid)
            state_row.last_run_at = datetime.now(timezone.utc)
        else:
            session.add(AutomationState(
                job_name="mailplus",
                last_check_value=str(max_uid),
                last_run_at=datetime.now(timezone.utc),
            ))
        await session.commit()

        # SMTP notification.
        smtp_host = os.getenv("MAILPLUS_HOST", "")
        smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
        if archived and smtp_host:
            body = f"이메일 {len(archived)}건 수집 완료:\n\n" + "\n".join(f"- {s}" for s in archived)
            send_smtp_email(smtp_host, smtp_port, user, password, "PKM 이메일 수집 알림", body)
        logger.info(f"이메일 {len(archived)}건 수집 완료 (max_uid={max_uid})")