feat/email-ingest-inbox #18

Open
hyungi wants to merge 4 commits from feat/email-ingest-inbox into main
9 changed files with 395 additions and 4 deletions
+11 -2
View File
@@ -15,7 +15,7 @@ from typing import Annotated, Any
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from pydantic import BaseModel, Field
from sqlalchemy import delete, func, select
from sqlalchemy import delete, func, select, or_, and_
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
@@ -178,6 +178,9 @@ class MemoResponse(BaseModel):
source_channel: str | None = None # voice/memo 등 진입점 식별 (UI 배지)
file_type: str | None = None # audio (voice 메모) vs note (text 메모)
file_path: str | None = None # voice 메모의 NAS audio 경로 (audio player 용)
# PR-4 Email Ingest — 이메일 source 메모 식별 + UI 표시용
source_external_id: str | None = None # email 의 Message-ID 또는 imap UID fallback
email_subject: str | None = None # email_metadata.subject — 메모 카드 부제 / 툴팁
created_at: datetime
updated_at: datetime
@@ -212,6 +215,8 @@ def _to_memo_response(doc: Document) -> MemoResponse:
source_channel=doc.source_channel,
file_type=doc.file_type,
file_path=doc.file_path,
source_external_id=doc.source_external_id,
email_subject=(doc.email_metadata or {}).get('subject') if doc.email_metadata else None,
created_at=doc.created_at,
updated_at=doc.updated_at,
)
@@ -274,8 +279,12 @@ async def list_memos(
voice 메모는 file_type='immutable' + category='audio' + source_channel='voice' 패턴.
source_channel 만으로 분리 (file_type 필터는 immutable 다른 binary 까지 끌어옴 — 회피).
"""
# PR-4: inbox_ingest 가 만든 email memo 도 포함 (source_external_id != NULL 로 mailplus_archive 의 archive row 제외)
base = select(Document).where(
Document.source_channel.in_(("memo", "voice")),
or_(
Document.source_channel.in_(("memo", "voice")),
and_(Document.source_channel == "email", Document.source_external_id.isnot(None)),
),
Document.deleted_at == None, # noqa: E711
Document.archived == archived,
)
+4
View File
@@ -45,6 +45,7 @@ async def lifespan(app: FastAPI):
from workers.file_watcher import watch_inbox
from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run
from workers.inbox_ingest import run as inbox_ingest_run
from workers.news_collector import run as news_collector_run
from workers.queue_consumer import consume_queue
from workers.study_queue_consumer import consume_study_queue
@@ -93,6 +94,9 @@ async def lifespan(app: FastAPI):
scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor")
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
# PR-4: inbox@hyungi.net IMAP ingest (DocumentServer/Ingest 폴더, 5분 cron).
# plan: ~/.claude/plans/document-enchanted-candy.md
scheduler.add_job(inbox_ingest_run, "interval", minutes=5, id="inbox_ingest")
scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
scheduler.add_job(global_digest_run, CronTrigger(hour=4, minute=0), id="global_digest")
scheduler.add_job(morning_briefing_run, CronTrigger(hour=5, minute=10), id="morning_briefing")
+6
View File
@@ -100,6 +100,12 @@ class Document(Base):
preview_hash: Mapped[str | None] = mapped_column(String(64))
preview_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# PR-4 Email Ingest — 외부 source dedup key + 메일 metadata
# source_external_id: email 에선 always non-null (Message-ID 또는 imap UID fallback). 다른 source 는 NULL 가능.
# email_metadata: from/to/cc/subject/folder/uidvalidity/uid/received_at/mailplus_link/attachments[].
source_external_id: Mapped[str | None] = mapped_column(Text)
email_metadata: Mapped[dict | None] = mapped_column(JSONB)
# 메타데이터
source_channel: Mapped[str | None] = mapped_column(
Enum("law_monitor", "devonagent", "email", "web_clip",
+337
View File
@@ -0,0 +1,337 @@
"""inbox@hyungi.net IMAP ingest 워커 — PR-4 Email Ingest.
Plan: ~/.claude/plans/document-enchanted-candy.md
MailPlus 의 `DocumentServer/Ingest` 폴더 (또는 사용자 지정 폴더) 를 5분 cron 으로
polling. 메일을 source_channel='email' memo 로 생성하고 classify queue 등록.
기존 mailplus_archive (INBOX root archive) 와 폴더 분리 + source_external_id dedup 으로
중복 안 함.
정책 (사용자 라운드 2026-05-12):
- email_ingest 는 events row 직접 생성 X. memo 만 만들고 사용자 promote 가 events 생성.
- source_external_id always non-null (Message-ID 정규화 또는 imap:{folder}:{uidvalidity}:{uid}).
- DB unique index 가 dedup 진실원장. \\Seen flag 는 best-effort.
- 첨부는 metadata 만, NAS 실 연동은 별 PR.
"""
import asyncio
import email
import hashlib
import imaplib
import os
import re
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Any
import httpx
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from core.database import async_session
from core.utils import setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import enqueue_stage
# Module-level logger for this worker, created through the project's setup_logger helper.
logger = setup_logger("inbox_ingest")
# Matches one HTML/XML tag: "<", one or more non-">" characters, then ">".
_TAG_RE = re.compile(r"<[^>]+>")
# Matches a run of spaces and/or tabs (newlines are not part of the class).
_WS_RE = re.compile(r"[ \t]+")
# Matches three or more consecutive newlines.
_NL_RE = re.compile(r"\n{3,}")
# Matches a single leading "<" or a single trailing ">" (applied to Message-ID values).
_MID_BRACKETS = re.compile(r"^<|>$")
def _decode_mime_header(raw: str | None) -> str:
if not raw:
return ""
parts = decode_header(raw)
out = []
for data, charset in parts:
if isinstance(data, bytes):
out.append(data.decode(charset or "utf-8", errors="replace"))
else:
out.append(data)
return "".join(out)
def _normalize_message_id(raw: str | None) -> str | None:
if not raw:
return None
mid = raw.strip()
if not mid:
return None
mid = _MID_BRACKETS.sub("", mid).strip()
return mid.lower() or None
def _build_source_external_id(message_id: str | None, folder: str, uidvalidity: str, uid: int) -> str:
if message_id:
return message_id
return f"imap:{folder}:{uidvalidity}:{uid}"
def _strip_html(html: str) -> str:
    """Reduce an HTML fragment to plain text.

    Tags are removed with a regex (the text inside elements, including any
    script/style content, is kept), character references are decoded, runs of
    spaces/tabs collapse to one space, and three or more consecutive newlines
    collapse to two.

    Args:
        html: HTML source text.

    Returns:
        The stripped text with leading/trailing whitespace removed.
    """
    # Imported here under an alias because the parameter name `html` shadows
    # the standard-library module of the same name inside this function.
    from html import unescape as _unescape

    text = _TAG_RE.sub("", html)
    # A single unescape pass decodes every named and numeric reference exactly
    # once; chained str.replace calls turned "&amp;lt;" into "<" and ignored
    # references such as "&#8217;". A non-breaking space becomes a plain
    # space, as "&nbsp;" did before.
    text = _unescape(text).replace("\xa0", " ")
    text = _WS_RE.sub(" ", text)
    text = _NL_RE.sub("\n\n", text)
    return text.strip()
def _extract_body(msg: email.message.Message) -> str:
text_part = None
html_part = None
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
if part.get("Content-Disposition", "").startswith("attachment"):
continue
if ctype == "text/plain" and text_part is None:
text_part = part
elif ctype == "text/html" and html_part is None:
html_part = part
else:
ctype = msg.get_content_type()
if ctype == "text/plain":
text_part = msg
elif ctype == "text/html":
html_part = msg
target = text_part or html_part
if target is None:
return ""
try:
payload = target.get_payload(decode=True)
if payload is None:
return ""
charset = target.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace")
except Exception as e:
logger.warning(f"[inbox_ingest] body decode 실패: {e}")
return ""
if target.get_content_type() == "text/html":
body = _strip_html(body)
return body.strip()
def _extract_attachments_meta(msg: email.message.Message) -> list[dict]:
out: list[dict] = []
if not msg.is_multipart():
return out
for idx, part in enumerate(msg.walk()):
disp = part.get("Content-Disposition", "")
if not disp.startswith("attachment") and not part.get_filename():
continue
filename = _decode_mime_header(part.get_filename()) or f"unnamed_{idx}"
size: int | None = None
try:
payload = part.get_payload(decode=True)
if payload is not None:
size = len(payload)
except Exception:
size = None
out.append({
"filename": filename,
"mime": part.get_content_type(),
"size": size,
"part_id": idx,
})
return out
def _parse_received_at(raw: str | None) -> str | None:
if not raw:
return None
try:
dt = parsedate_to_datetime(raw)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except Exception:
return None
def _fetch_uids_sync(host: str, port: int, user: str, password: str, folder: str, since_days: int) -> tuple[str, list[tuple[int, bytes]]]:
    """Blocking IMAP fetch: select the folder, UID SEARCH SINCE, fetch RFC822.

    Args:
        host: IMAP server host (connected over SSL).
        port: IMAP server port.
        user: Login name.
        password: Login password.
        folder: Mailbox name; sent to the server wrapped in double quotes.
        since_days: Size of the SINCE search window, in days back from now (UTC).

    Returns:
        ``(uidvalidity, [(uid, raw_bytes), ...])``. ``uidvalidity`` is "0" when
        it could not be read from the STATUS response. The list is empty when
        the search does not return OK.

    Raises:
        RuntimeError: When the folder cannot be selected.
    """
    quoted_folder = f'"{folder}"'
    imap = imaplib.IMAP4_SSL(host, port, timeout=30)
    try:
        imap.login(user, password)
        select_typ, _ = imap.select(quoted_folder)
        if select_typ != "OK":
            raise RuntimeError(f"folder select 실패: {folder}")

        # Read UIDVALIDITY from a STATUS response; keep "0" when unavailable.
        uidvalidity = "0"
        status_typ, status_data = imap.status(quoted_folder, "(UIDVALIDITY)")
        if status_typ == "OK" and status_data:
            found = re.search(rb"UIDVALIDITY (\d+)", status_data[0])
            if found:
                uidvalidity = found.group(1).decode()

        # NOTE(review): "%b" follows the process locale; IMAP expects English
        # month abbreviations — confirm the worker runs under the C locale.
        cutoff = datetime.now(timezone.utc) - timedelta(days=since_days)
        since = cutoff.strftime("%d-%b-%Y")
        search_typ, search_data = imap.uid("search", None, f"SINCE {since}")
        if search_typ != "OK":
            return uidvalidity, []

        uid_tokens = search_data[0].split() if search_data and search_data[0] else []
        fetched = []
        for token in uid_tokens:
            uid = int(token)
            fetch_typ, fetch_data = imap.uid("fetch", token, "(RFC822)")
            if fetch_typ != "OK" or not fetch_data or fetch_data[0] is None:
                continue
            payload = fetch_data[0][1]
            if isinstance(payload, bytes):
                fetched.append((uid, payload))
        return uidvalidity, fetched
    finally:
        # Best-effort logout; a failure here must not mask the real outcome.
        try:
            imap.logout()
        except Exception:
            pass
async def _heartbeat(url: str | None, success: bool):
if not url:
return
target = url if success else f"{url.rstrip('/')}/fail"
try:
async with httpx.AsyncClient(timeout=10) as c:
await c.get(target)
except Exception as e:
logger.debug(f"[inbox_ingest] HC heartbeat 실패: {e}")
async def run():
    """Scheduled entry point: poll the ingest folder and create email memos.

    Configuration is read from the environment:
    ``INBOX_INGEST_ENABLED`` (must be "true" to run), ``MAILPLUS_HOST``,
    ``MAILPLUS_PORT``, ``MAILPLUS_USER``, ``MAILPLUS_PASS``,
    ``INBOX_INGEST_FOLDER``, ``INBOX_INGEST_DAYS`` and ``INBOX_INGEST_HC_URL``.

    Each fetched mail is inserted as a ``source_channel='email'`` document
    with ON CONFLICT DO NOTHING on ``source_external_id``; mails already
    present are counted as skipped. Newly created rows get their body and
    ``email_metadata`` filled in and are enqueued for the "classify" stage.
    A parse failure after the row exists is recorded in
    ``email_metadata.parse_error`` instead of dropping the row.

    Every mail is processed inside its own SAVEPOINT, so a database error on
    one mail is rolled back and logged without affecting the others.
    """
    # sqlalchemy is already a dependency of this module; `text` is imported
    # here because only this function needs it.
    from sqlalchemy import text

    enabled = os.getenv("INBOX_INGEST_ENABLED", "false").lower() == "true"
    if not enabled:
        logger.debug("[inbox_ingest] INBOX_INGEST_ENABLED=false, skip")
        return

    host = os.getenv("MAILPLUS_HOST", "")
    port = int(os.getenv("MAILPLUS_PORT", "993"))
    user = os.getenv("MAILPLUS_USER", "")
    password = os.getenv("MAILPLUS_PASS", "")
    folder = os.getenv("INBOX_INGEST_FOLDER", "DocumentServer/Ingest")
    since_days = int(os.getenv("INBOX_INGEST_DAYS", "14"))
    hc_url = os.getenv("INBOX_INGEST_HC_URL", "").strip() or None

    if not all([host, user, password]):
        logger.warning("[inbox_ingest] MailPlus 인증 정보 미설정 — skip")
        return

    # imaplib is blocking, so the fetch runs in a worker thread.
    try:
        uidvalidity, emails = await asyncio.to_thread(
            _fetch_uids_sync, host, port, user, password, folder, since_days,
        )
    except Exception as e:
        logger.error(f"[inbox_ingest] IMAP fetch 실패: {e}")
        await _heartbeat(hc_url, success=False)
        return

    if not emails:
        logger.info("[inbox_ingest] 새 메일 0건")
        await _heartbeat(hc_url, success=True)
        return

    # Predicate of the partial unique index used as the ON CONFLICT target.
    # It must be a SQL expression object: a plain str is not accepted as an
    # index_where clause.
    dedup_index_where = text("source_channel = 'email' AND source_external_id IS NOT NULL")

    created = 0
    skipped = 0
    parse_failed = 0

    async with async_session() as session:
        for uid, raw_bytes in emails:
            try:
                had_parse_error = False
                # SAVEPOINT per mail: a failing statement would otherwise
                # abort the shared transaction and break every later mail as
                # well as the final commit.
                async with session.begin_nested():
                    msg = email.message_from_bytes(raw_bytes)
                    message_id = _normalize_message_id(msg.get("Message-ID"))
                    source_external_id = _build_source_external_id(message_id, folder, uidvalidity, uid)

                    # ON CONFLICT DO NOTHING: the DB unique index is the
                    # source of truth for dedup.
                    stmt = (
                        pg_insert(Document)
                        .values(
                            file_path=None,
                            file_hash=hashlib.sha256(raw_bytes).hexdigest(),
                            file_format="eml",
                            file_size=len(raw_bytes),
                            file_type="note",
                            source_channel="email",
                            source_external_id=source_external_id,
                            # Body and metadata are filled in below; the row
                            # is first created with placeholders.
                            title=_decode_mime_header(msg.get("Subject"))[:500] or "(제목없음)",
                            extracted_text="",
                            email_metadata={},
                        )
                        .on_conflict_do_nothing(
                            index_elements=["source_external_id"],
                            index_where=dedup_index_where,
                        )
                        .returning(Document.id)
                    )
                    result = await session.execute(stmt)
                    row = result.first()
                    if row is None:
                        # No row returned: the mail was ingested before.
                        skipped += 1
                        continue
                    doc_id = row[0]

                    # Parse body/metadata. A failure after the row exists is
                    # recorded in email_metadata.parse_error.
                    try:
                        body = _extract_body(msg)
                        attachments = _extract_attachments_meta(msg)
                        metadata: dict[str, Any] = {
                            "from": _decode_mime_header(msg.get("From")),
                            "to": [_decode_mime_header(a) for a in msg.get_all("To", [])],
                            "cc": [_decode_mime_header(a) for a in msg.get_all("Cc", [])],
                            "subject": _decode_mime_header(msg.get("Subject")),
                            "folder": folder,
                            "uidvalidity": uidvalidity,
                            "uid": uid,
                            "received_at": _parse_received_at(msg.get("Date")),
                            "attachments": attachments,
                        }
                    except Exception as parse_exc:
                        logger.warning(f"[inbox_ingest] doc {doc_id} parse 실패: {parse_exc}")
                        body = ""
                        metadata = {"parse_error": str(parse_exc), "folder": folder, "uidvalidity": uidvalidity, "uid": uid}
                        had_parse_error = True

                    # Fill the placeholder row through the ORM.
                    doc = (await session.execute(select(Document).where(Document.id == doc_id))).scalar_one()
                    doc.extracted_text = body
                    doc.email_metadata = metadata

                    # Hand the new memo to the classify pipeline.
                    await enqueue_stage(session, doc_id, "classify")

                # Counted only after the SAVEPOINT was released successfully.
                created += 1
                if had_parse_error:
                    parse_failed += 1
            except Exception as e:
                logger.error(f"[inbox_ingest] UID {uid} 처리 실패: {e}")

        # automation_state is informational only; it is not the dedup source
        # of truth. `emails` is non-empty here, so the last fetched UID exists.
        last_uid = str(emails[-1][0])
        state = await session.execute(
            select(AutomationState).where(AutomationState.job_name == "inbox_ingest")
        )
        state_row = state.scalar_one_or_none()
        now_utc = datetime.now(timezone.utc)
        if state_row:
            state_row.last_run_at = now_utc
            state_row.last_check_value = last_uid
        else:
            session.add(AutomationState(
                job_name="inbox_ingest",
                last_check_value=last_uid,
                last_run_at=now_utc,
            ))

        await session.commit()

    logger.info(
        f"[inbox_ingest] folder={folder} fetched={len(emails)} created={created} "
        f"skipped(dedup)={skipped} parse_failed={parse_failed}"
    )
    await _heartbeat(hc_url, success=True)
+7
View File
@@ -32,6 +32,13 @@ MAILPLUS_PORT=993
MAILPLUS_SMTP_PORT=465
MAILPLUS_USER=hyungi
MAILPLUS_PASS=
# PR-4 inbox@hyungi.net IMAP ingest (DocumentServer/Ingest 폴더 → memo).
# enable 시 INBOX_INGEST_ENABLED=true. 폴더는 alias 수신 메일이 자동 이동되는 곳.
INBOX_INGEST_ENABLED=false
INBOX_INGEST_FOLDER=DocumentServer/Ingest
INBOX_INGEST_DAYS=14
INBOX_INGEST_HC_URL=
# ─── Synology Calendar (CalDAV, 태스크 관리) ───
CALDAV_URL=https://ds1525.hyungi.net/caldav/
+7 -2
View File
@@ -3,7 +3,7 @@
import { api } from '$lib/api';
import { addToast } from '$lib/stores/toast';
import { renderMemoHtml, todayIso, countHiddenTasks, DEFAULT_HIDE_AFTER_MS } from '$lib/utils/memoRenderer';
import { Pin, PinOff, Pencil, Trash2, Eye, EyeOff, X, Check, Archive, ArchiveRestore, ListChecks, Bold, Heading, CalendarDays, Mic, Calendar, Activity, ArrowRight, FileText, BookOpen } from 'lucide-svelte';
import { Pin, PinOff, Pencil, Trash2, Eye, EyeOff, X, Check, Archive, ArchiveRestore, ListChecks, Bold, Heading, CalendarDays, Mic, Calendar, Activity, ArrowRight, FileText, BookOpen, Mail } from 'lucide-svelte';
import { getAccessToken } from '$lib/api';
import Button from '$lib/components/ui/Button.svelte';
import Card from '$lib/components/ui/Card.svelte';
@@ -523,13 +523,18 @@
{:else}
<!-- ═══ 읽기 모드 ═══ -->
<!-- PR-2B/2C: 분류 배지 + voice icon + 마지막 promote 결과 -->
{#if memo.source_channel === 'voice' || memo.ai_event_kind || memo._last_promoted}
{#if memo.source_channel === 'voice' || memo.source_channel === 'email' || memo.ai_event_kind || memo._last_promoted}
<div class="flex flex-wrap items-center gap-1.5 mb-1.5">
{#if memo.source_channel === 'voice'}
<span class="inline-flex items-center gap-1 rounded px-1.5 py-0.5 text-[10px] bg-rose-100 text-rose-700" title="음성 메모">
<Mic size={10} /> 음성
</span>
{/if}
{#if memo.source_channel === 'email'}
<span class="inline-flex items-center gap-1 rounded px-1.5 py-0.5 text-[10px] bg-sky-100 text-sky-700" title={memo.email_subject || '이메일 inbox'}>
<Mail size={10} /> 이메일
</span>
{/if}
{#if memo.ai_event_kind && memo.ai_event_kind !== 'note'}
<span class="inline-flex items-center rounded px-1.5 py-0.5 text-[10px] {KIND_BADGE_CLASS[memo.ai_event_kind] || 'bg-surface text-dim'}">
AI 추천: {KIND_LABELS[memo.ai_event_kind] || memo.ai_event_kind}{memo.ai_event_confidence != null ? ` · ${Math.round(memo.ai_event_confidence * 100)}%` : ''}
@@ -0,0 +1,7 @@
-- PR-4 Email Ingest — add the documents.source_external_id column.
-- Dedup key of an external source. For the email source it is always non-null (responsibility of the ingest code):
-- the normalized Message-ID, or the imap:{folder}:{uidvalidity}:{uid} fallback.
-- Other source_channel values may leave it NULL (revisit nullable→unique if they start using it).
ALTER TABLE documents
ADD COLUMN IF NOT EXISTS source_external_id TEXT;
@@ -0,0 +1,9 @@
-- PR-4 Email Ingest — add the documents.email_metadata JSONB column.
-- Shape: {from, to[], cc[], subject, folder, uidvalidity, uid, received_at,
-- mailplus_link, attachments: [{filename, mime, size, part_id}],
-- parse_error?: string}
-- Rows created by mailplus_archive (the existing INBOX root archive worker) keep NULL.
-- Only rows created by inbox_ingest fill this column.
ALTER TABLE documents
ADD COLUMN IF NOT EXISTS email_metadata JSONB;
@@ -0,0 +1,7 @@
-- PR-4 Email Ingest — partial unique index on (source_external_id) for the email source.
-- Source of truth for inbox_ingest dedup: re-ingesting the same mail hits ON CONFLICT DO NOTHING.
-- INBOX root archive rows written by mailplus_archive have source_external_id = NULL and are therefore excluded.
CREATE UNIQUE INDEX IF NOT EXISTS uq_documents_email_source_external_id
ON documents (source_external_id)
WHERE source_channel = 'email' AND source_external_id IS NOT NULL;