feat(email): IMAP ingest service for inbox@hyungi.net

신규 워커 app/workers/inbox_ingest.py (337줄):
- 5분 APScheduler cron (mailplus_archive 와 분리 — INBOX root archive vs DocumentServer/Ingest folder)
- UID SEARCH SINCE 14일 (UNSEEN 단독 의존 X, 사용자가 MailPlus UI 에서 먼저 읽어도 누락 회피)
- Message-ID 정규화 또는 imap:{folder}:{uidvalidity}:{uid} fallback → source_external_id always non-null
- ON CONFLICT DO NOTHING (DB unique 진실원장)
- 신규 row 만 BODY parse: snippet + HTML stripping + attachment metadata (binary 저장 X)
- enqueue_stage(doc.id, classify) 로 기존 classify pipeline 진입
- HC.io heartbeat (옵션, INBOX_INGEST_HC_URL)
- parse 실패 분기: row 생성 전 (logger.error + HC fail) / 후 (email_metadata.parse_error 기록)

env (credentials.env.example):
- INBOX_INGEST_ENABLED=false (기본 dormant, 사용자가 alias/folder 셋업 후 true)
- INBOX_INGEST_FOLDER=DocumentServer/Ingest
- INBOX_INGEST_DAYS=14
- INBOX_INGEST_HC_URL=

main.py:
- inbox_ingest_run import + scheduler.add_job interval 5m

email_ingest 정책 (사용자 라운드 2026-05-12):
- 직접 events row 생성 X
- 이메일은 universal inbox item, source_channel=email memo 로 저장
- classify_worker 가 ai_event_kind 채움 (별 PR 의 4B robustness fix 선결)
- 사용자 1-click promote 만이 events row 생성 path

plan: ~/.claude/plans/document-enchanted-candy.md
This commit is contained in:
hyungi
2026-05-12 06:56:35 +00:00
parent c49047bf2a
commit f4eef9e6e0
3 changed files with 348 additions and 0 deletions
+4
View File
@@ -45,6 +45,7 @@ async def lifespan(app: FastAPI):
from workers.file_watcher import watch_inbox from workers.file_watcher import watch_inbox
from workers.law_monitor import run as law_monitor_run from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run from workers.mailplus_archive import run as mailplus_run
from workers.inbox_ingest import run as inbox_ingest_run
from workers.news_collector import run as news_collector_run from workers.news_collector import run as news_collector_run
from workers.queue_consumer import consume_queue from workers.queue_consumer import consume_queue
from workers.study_queue_consumer import consume_study_queue from workers.study_queue_consumer import consume_study_queue
@@ -93,6 +94,9 @@ async def lifespan(app: FastAPI):
scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor") scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor")
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning") scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening") scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
# PR-4: inbox@hyungi.net IMAP ingest (DocumentServer/Ingest 폴더, 5분 cron).
# plan: ~/.claude/plans/document-enchanted-candy.md
scheduler.add_job(inbox_ingest_run, "interval", minutes=5, id="inbox_ingest")
scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest") scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
scheduler.add_job(global_digest_run, CronTrigger(hour=4, minute=0), id="global_digest") scheduler.add_job(global_digest_run, CronTrigger(hour=4, minute=0), id="global_digest")
scheduler.add_job(morning_briefing_run, CronTrigger(hour=5, minute=10), id="morning_briefing") scheduler.add_job(morning_briefing_run, CronTrigger(hour=5, minute=10), id="morning_briefing")
+337
View File
@@ -0,0 +1,337 @@
"""inbox@hyungi.net IMAP ingest 워커 — PR-4 Email Ingest.
Plan: ~/.claude/plans/document-enchanted-candy.md
MailPlus 의 `DocumentServer/Ingest` 폴더 (또는 사용자 지정 폴더) 를 5분 cron 으로
polling. 메일을 source_channel='email' memo 로 생성하고 classify queue 등록.
기존 mailplus_archive (INBOX root archive) 와 폴더 분리 + source_external_id dedup 으로
중복 안 함.
정책 (사용자 라운드 2026-05-12):
- email_ingest 는 events row 직접 생성 X. memo 만 만들고 사용자 promote 가 events 생성.
- source_external_id always non-null (Message-ID 정규화 또는 imap:{folder}:{uidvalidity}:{uid}).
- DB unique index 가 dedup 진실원장. \\Seen flag 는 best-effort.
- 첨부는 metadata 만, NAS 실 연동은 별 PR.
"""
import asyncio
import email
import hashlib
import imaplib
import os
import re
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Any
import httpx
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from core.database import async_session
from core.utils import setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import enqueue_stage
logger = setup_logger("inbox_ingest")
# Matches any tag-like "<...>" span; used by _strip_html to drop HTML markup.
_TAG_RE = re.compile(r"<[^>]+>")
# Runs of spaces/tabs; _strip_html collapses each run to a single space.
_WS_RE = re.compile(r"[ \t]+")
# Three or more consecutive newlines; _strip_html collapses them to one blank line.
_NL_RE = re.compile(r"\n{3,}")
# One leading "<" or one trailing ">"; removed when normalizing a Message-ID.
_MID_BRACKETS = re.compile(r"^<|>$")
def _decode_mime_header(raw: str | None) -> str:
if not raw:
return ""
parts = decode_header(raw)
out = []
for data, charset in parts:
if isinstance(data, bytes):
out.append(data.decode(charset or "utf-8", errors="replace"))
else:
out.append(data)
return "".join(out)
def _normalize_message_id(raw: str | None) -> str | None:
    """Return a canonical Message-ID, or None when nothing usable remains.

    Surrounding whitespace and one leading "<" / trailing ">" are removed and
    the result is lower-cased.
    """
    if not raw:
        return None
    trimmed = raw.strip()
    if not trimmed:
        return None
    unbracketed = _MID_BRACKETS.sub("", trimmed).strip()
    return unbracketed.lower() if unbracketed else None
def _build_source_external_id(message_id: str | None, folder: str, uidvalidity: str, uid: int) -> str:
if message_id:
return message_id
return f"imap:{folder}:{uidvalidity}:{uid}"
def _strip_html(html: str) -> str:
    """Reduce an HTML document to readable plain text.

    Steps: drop <script>/<style> blocks including their contents, remove the
    remaining tags, decode character references, then collapse whitespace.

    Args:
        html: HTML source text.

    Returns:
        The stripped text with surrounding whitespace removed.
    """
    # Imported by name because the parameter shadows the stdlib module name.
    from html import unescape

    # CSS/JS payloads are not content; removing only the tags would leave
    # them in the extracted text.
    text = re.sub(r"(?is)<(script|style)\b[^>]*>.*?</\1\s*>", "", html)
    text = _TAG_RE.sub("", text)
    # Decode every entity exactly once (chained str.replace turned "&amp;lt;"
    # into "<"), and map the non-breaking space to a regular space.
    text = unescape(text).replace("\xa0", " ")
    text = _WS_RE.sub(" ", text)
    text = _NL_RE.sub("\n\n", text)
    return text.strip()
def _extract_body(msg: email.message.Message) -> str:
    """Return the message body as plain text.

    The first text/plain part is preferred; the first text/html part is used
    (with markup stripped) only when no plain part exists. Parts whose
    Content-Disposition is "attachment" are ignored.

    Args:
        msg: Parsed message.

    Returns:
        The decoded body, or "" when there is no text part or decoding fails.
    """
    text_part = None
    html_part = None
    if msg.is_multipart():
        for part in msg.walk():
            # get_content_disposition() lower-cases the value and drops its
            # parameters, so "Attachment; filename=..." is recognised too.
            if part.get_content_disposition() == "attachment":
                continue
            ctype = part.get_content_type()
            if ctype == "text/plain" and text_part is None:
                text_part = part
            elif ctype == "text/html" and html_part is None:
                html_part = part
    else:
        ctype = msg.get_content_type()
        if ctype == "text/plain":
            text_part = msg
        elif ctype == "text/html":
            html_part = msg

    # Compare against None explicitly: Message truthiness is its header
    # count, so a header-less text part would otherwise be skipped.
    target = text_part if text_part is not None else html_part
    if target is None:
        return ""

    try:
        payload = target.get_payload(decode=True)
        if payload is None:
            return ""
        charset = target.get_content_charset() or "utf-8"
        try:
            body = payload.decode(charset, errors="replace")
        except LookupError:
            # Declared charset is unknown to Python; keep the text rather
            # than dropping the whole body.
            body = payload.decode("utf-8", errors="replace")
    except Exception as e:
        logger.warning(f"[inbox_ingest] body decode 실패: {e}")
        return ""

    if target.get_content_type() == "text/html":
        body = _strip_html(body)
    return body.strip()
def _extract_attachments_meta(msg: email.message.Message) -> list[dict]:
    """Collect metadata for the attachments of a multipart message.

    A part counts as an attachment when its Content-Disposition is
    "attachment" or when it carries a filename. Only metadata is returned;
    the payload is decoded solely to measure its size.

    Args:
        msg: Parsed message.

    Returns:
        One dict per attachment with keys "filename", "mime", "size" (None
        when the payload cannot be decoded) and "part_id" (the part's index
        in msg.walk()). Empty for non-multipart messages.
    """
    out: list[dict] = []
    if not msg.is_multipart():
        return out
    for idx, part in enumerate(msg.walk()):
        raw_name = part.get_filename()
        # get_content_disposition() is case-insensitive and ignores
        # parameters, unlike a startswith() check on the raw header value.
        if part.get_content_disposition() != "attachment" and not raw_name:
            continue
        filename = _decode_mime_header(raw_name) or f"unnamed_{idx}"
        try:
            payload = part.get_payload(decode=True)
        except Exception:
            # Size is informational only; an undecodable payload must not
            # drop the attachment entry.
            payload = None
        out.append({
            "filename": filename,
            "mime": part.get_content_type(),
            "size": len(payload) if payload is not None else None,
            "part_id": idx,
        })
    return out
def _parse_received_at(raw: str | None) -> str | None:
if not raw:
return None
try:
dt = parsedate_to_datetime(raw)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except Exception:
return None
def _fetch_uids_sync(host: str, port: int, user: str, password: str, folder: str, since_days: int) -> tuple[str, list[tuple[int, bytes]]]:
    """Blocking IMAP fetch: select the folder, UID SEARCH SINCE, fetch RFC822.

    Every message inside the SINCE window is downloaded on each call;
    deduplication is left to the caller.

    Args:
        host: IMAP server host (TLS).
        port: IMAP server port.
        user: Login name.
        password: Login password.
        folder: Mailbox to select.
        since_days: Size of the SINCE window, in days before now (UTC).

    Returns:
        (uidvalidity_str, [(uid, raw_bytes), ...]). uidvalidity falls back to
        "0" when the server does not report it; the list is empty when the
        search is not OK.

    Raises:
        RuntimeError: the folder could not be selected.

    NOTE(review): the mailbox is selected read-write and full RFC822 is
    fetched, which on most servers marks the messages as seen; confirm
    that is intended.
    """
    # IMAP date-text needs English month abbreviations; strftime("%b")
    # follows the process locale, so the names are spelled out here.
    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

    conn = imaplib.IMAP4_SSL(host, port, timeout=30)
    try:
        conn.login(user, password)
        mailbox = f'"{folder}"'
        typ, _ = conn.select(mailbox)
        if typ != "OK":
            raise RuntimeError(f"folder select 실패: {folder}")

        # Read UIDVALIDITY via STATUS; keep "0" when it is unavailable.
        uidvalidity = "0"
        typ, status_data = conn.status(mailbox, "(UIDVALIDITY)")
        if typ == "OK" and status_data and status_data[0]:
            m = re.search(rb"UIDVALIDITY (\d+)", status_data[0])
            if m:
                uidvalidity = m.group(1).decode()

        cutoff = datetime.now(timezone.utc) - timedelta(days=since_days)
        since = f"{cutoff.day:02d}-{months[cutoff.month - 1]}-{cutoff.year}"
        typ, data = conn.uid("search", None, f"SINCE {since}")
        if typ != "OK":
            return uidvalidity, []

        uids = data[0].split() if data and data[0] else []
        results: list[tuple[int, bytes]] = []
        for uid_bytes in uids:
            uid = int(uid_bytes)
            typ, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)")
            if typ != "OK" or not msg_data or msg_data[0] is None:
                continue
            raw = msg_data[0][1]
            # Keep only entries whose second element is the message bytes.
            if isinstance(raw, bytes):
                results.append((uid, raw))
        return uidvalidity, results
    finally:
        # Best-effort logout; a failure here must not mask the real error.
        try:
            conn.logout()
        except Exception:
            pass
async def _heartbeat(url: str | None, success: bool):
    """Ping the health-check URL; a no-op when *url* is empty.

    On success the URL is requested as given; on failure "/fail" is appended
    (after trimming trailing slashes). Request errors are logged at debug
    level and never raised.
    """
    if not url:
        return
    if success:
        target = url
    else:
        target = f"{url.rstrip('/')}/fail"
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            await client.get(target)
    except Exception as exc:
        logger.debug(f"[inbox_ingest] HC heartbeat 실패: {exc}")
async def _ingest_message(session, uid: int, raw_bytes: bytes, folder: str, uidvalidity: str) -> str:
    """Insert one fetched message as an email memo and queue it for classify.

    Args:
        session: Open database session; the caller commits or rolls back.
        uid: IMAP UID of the message.
        raw_bytes: Raw RFC822 bytes.
        folder: Mailbox the message was fetched from.
        uidvalidity: UIDVALIDITY reported for the mailbox.

    Returns:
        "skipped" when the insert hit the dedup conflict, "parse_failed" when
        the row was created but body/metadata parsing raised (the error is
        stored under email_metadata["parse_error"]), otherwise "created".
    """
    # Local import: text() makes the partial-index predicate an explicit SQL
    # expression instead of relying on how a bare str would be handled.
    from sqlalchemy import text

    msg = email.message_from_bytes(raw_bytes)
    message_id = _normalize_message_id(msg.get("Message-ID"))
    source_external_id = _build_source_external_id(message_id, folder, uidvalidity, uid)

    # ON CONFLICT DO NOTHING: dedup is decided by the unique index.
    stmt = (
        pg_insert(Document)
        .values(
            file_path=None,
            file_hash=hashlib.sha256(raw_bytes).hexdigest(),
            file_format="eml",
            file_size=len(raw_bytes),
            file_type="note",
            source_channel="email",
            source_external_id=source_external_id,
            # Body and metadata are filled in below; the row is created first
            # with placeholders.
            title=_decode_mime_header(msg.get("Subject"))[:500] or "(제목없음)",
            extracted_text="",
            email_metadata={},
        )
        .on_conflict_do_nothing(
            index_elements=["source_external_id"],
            index_where=text("source_channel = 'email' AND source_external_id IS NOT NULL"),
        )
        .returning(Document.id)
    )
    row = (await session.execute(stmt)).first()
    if row is None:
        return "skipped"
    doc_id = row[0]

    # A parse failure after the row exists is recorded on the row, not raised.
    outcome = "created"
    try:
        body = _extract_body(msg)
        attachments = _extract_attachments_meta(msg)
        metadata: dict[str, Any] = {
            "from": _decode_mime_header(msg.get("From")),
            "to": [_decode_mime_header(a) for a in msg.get_all("To", [])],
            "cc": [_decode_mime_header(a) for a in msg.get_all("Cc", [])],
            "subject": _decode_mime_header(msg.get("Subject")),
            "folder": folder,
            "uidvalidity": uidvalidity,
            "uid": uid,
            "received_at": _parse_received_at(msg.get("Date")),
            "attachments": attachments,
        }
    except Exception as parse_exc:
        logger.warning(f"[inbox_ingest] doc {doc_id} parse 실패: {parse_exc}")
        body = ""
        metadata = {"parse_error": str(parse_exc), "folder": folder, "uidvalidity": uidvalidity, "uid": uid}
        outcome = "parse_failed"

    doc = (await session.execute(select(Document).where(Document.id == doc_id))).scalar_one()
    doc.extracted_text = body
    doc.email_metadata = metadata

    # Hand the new document to the existing classify pipeline.
    await enqueue_stage(session, doc_id, "classify")
    return outcome


async def run():
    """Scheduler entry point: poll the ingest folder and create email memos.

    Does nothing unless INBOX_INGEST_ENABLED is "true" and the MAILPLUS_*
    credentials are set. Each new message is committed on its own, so a
    failure on one message is rolled back without affecting the others. The
    health-check heartbeat reports failure when the IMAP fetch fails or when
    any message could not be processed.
    """
    enabled = os.getenv("INBOX_INGEST_ENABLED", "false").lower() == "true"
    if not enabled:
        logger.debug("[inbox_ingest] INBOX_INGEST_ENABLED=false, skip")
        return

    host = os.getenv("MAILPLUS_HOST", "")
    port = int(os.getenv("MAILPLUS_PORT", "993"))
    user = os.getenv("MAILPLUS_USER", "")
    password = os.getenv("MAILPLUS_PASS", "")
    folder = os.getenv("INBOX_INGEST_FOLDER", "DocumentServer/Ingest")
    since_days = int(os.getenv("INBOX_INGEST_DAYS", "14"))
    hc_url = os.getenv("INBOX_INGEST_HC_URL", "").strip() or None

    if not all([host, user, password]):
        logger.warning("[inbox_ingest] MailPlus 인증 정보 미설정 — skip")
        return

    try:
        # imaplib is blocking, so the fetch runs in a worker thread.
        uidvalidity, emails = await asyncio.to_thread(
            _fetch_uids_sync, host, port, user, password, folder, since_days,
        )
    except Exception as e:
        logger.error(f"[inbox_ingest] IMAP fetch 실패: {e}")
        await _heartbeat(hc_url, success=False)
        return

    if not emails:
        logger.info("[inbox_ingest] 새 메일 0건")
        await _heartbeat(hc_url, success=True)
        return

    created = 0
    skipped = 0
    parse_failed = 0
    failed = 0

    async with async_session() as session:
        for uid, raw_bytes in emails:
            try:
                outcome = await _ingest_message(session, uid, raw_bytes, folder, uidvalidity)
                if outcome != "skipped":
                    await session.commit()
            except Exception as e:
                logger.error(f"[inbox_ingest] UID {uid} 처리 실패: {e}")
                # Without a rollback the session stays in a failed
                # transaction and every later statement would fail too.
                await session.rollback()
                failed += 1
                continue

            if outcome == "skipped":
                skipped += 1
                continue
            created += 1
            if outcome == "parse_failed":
                parse_failed += 1

        # Record the run in automation_state (informational only; it is not
        # used for dedup).
        state = await session.execute(
            select(AutomationState).where(AutomationState.job_name == "inbox_ingest")
        )
        state_row = state.scalar_one_or_none()
        now_utc = datetime.now(timezone.utc)
        last_uid = str(emails[-1][0])
        if state_row:
            state_row.last_run_at = now_utc
            state_row.last_check_value = last_uid
        else:
            session.add(AutomationState(
                job_name="inbox_ingest",
                last_check_value=last_uid,
                last_run_at=now_utc,
            ))
        await session.commit()

    logger.info(
        f"[inbox_ingest] folder={folder} fetched={len(emails)} created={created} "
        f"skipped(dedup)={skipped} parse_failed={parse_failed} failed={failed}"
    )
    # Messages that failed before a row could be created are reported as a
    # failed heartbeat rather than hidden behind a success ping.
    await _heartbeat(hc_url, success=failed == 0)
+7
View File
@@ -32,6 +32,13 @@ MAILPLUS_PORT=993
MAILPLUS_SMTP_PORT=465 MAILPLUS_SMTP_PORT=465
MAILPLUS_USER=hyungi MAILPLUS_USER=hyungi
MAILPLUS_PASS= MAILPLUS_PASS=
# PR-4 inbox@hyungi.net IMAP ingest (DocumentServer/Ingest 폴더 → memo).
# enable 시 INBOX_INGEST_ENABLED=true. 폴더는 alias 수신 메일이 자동 이동되는 곳.
INBOX_INGEST_ENABLED=false
INBOX_INGEST_FOLDER=DocumentServer/Ingest
INBOX_INGEST_DAYS=14
INBOX_INGEST_HC_URL=
# ─── Synology Calendar (CalDAV, 태스크 관리) ─── # ─── Synology Calendar (CalDAV, 태스크 관리) ───
CALDAV_URL=https://ds1525.hyungi.net/caldav/ CALDAV_URL=https://ds1525.hyungi.net/caldav/