diff --git a/app/api/memos.py b/app/api/memos.py index a1b5316..f24fcd9 100644 --- a/app/api/memos.py +++ b/app/api/memos.py @@ -15,7 +15,7 @@ from typing import Annotated, Any from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile from pydantic import BaseModel, Field -from sqlalchemy import delete, func, select +from sqlalchemy import delete, func, select, or_, and_ from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user @@ -178,6 +178,9 @@ class MemoResponse(BaseModel): source_channel: str | None = None # voice/memo 등 진입점 식별 (UI 배지) file_type: str | None = None # audio (voice 메모) vs note (text 메모) file_path: str | None = None # voice 메모의 NAS audio 경로 (audio player 용) + # PR-4 Email Ingest — 이메일 source 메모 식별 + UI 표시용 + source_external_id: str | None = None # email 의 Message-ID 또는 imap UID fallback + email_subject: str | None = None # email_metadata.subject — 메모 카드 부제 / 툴팁 created_at: datetime updated_at: datetime @@ -212,6 +215,8 @@ def _to_memo_response(doc: Document) -> MemoResponse: source_channel=doc.source_channel, file_type=doc.file_type, file_path=doc.file_path, + source_external_id=doc.source_external_id, + email_subject=(doc.email_metadata or {}).get('subject') if doc.email_metadata else None, created_at=doc.created_at, updated_at=doc.updated_at, ) @@ -274,8 +279,12 @@ async def list_memos( voice 메모는 file_type='immutable' + category='audio' + source_channel='voice' 패턴. source_channel 만으로 분리 (file_type 필터는 immutable 다른 binary 까지 끌어옴 — 회피). """ + # PR-4: inbox_ingest 가 만든 email memo 도 포함 (source_external_id != NULL 로 mailplus_archive 의 archive row 제외) base = select(Document).where( - Document.source_channel.in_(("memo", "voice")), + or_( + Document.source_channel.in_(("memo", "voice")), + and_(Document.source_channel == "email", Document.source_external_id.isnot(None)), + ), Document.deleted_at == None, # noqa: E711 Document.archived == archived, ) diff --git a/app/main.py b/app/main.py index e21dfd7..536ae1d 100644 --- a/app/main.py +++ b/app/main.py @@ -45,6 +45,7 @@ async def lifespan(app: FastAPI): from workers.file_watcher import watch_inbox from workers.law_monitor import run as law_monitor_run from workers.mailplus_archive import run as mailplus_run + from workers.inbox_ingest import run as inbox_ingest_run from workers.news_collector import run as news_collector_run from workers.queue_consumer import consume_queue from workers.study_queue_consumer import consume_study_queue @@ -93,6 +94,9 @@ async def lifespan(app: FastAPI): scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor") scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning") scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening") + # PR-4: inbox@hyungi.net IMAP ingest (DocumentServer/Ingest 폴더, 5분 cron). + # plan: ~/.claude/plans/document-enchanted-candy.md + scheduler.add_job(inbox_ingest_run, "interval", minutes=5, id="inbox_ingest") scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest") scheduler.add_job(global_digest_run, CronTrigger(hour=4, minute=0), id="global_digest") scheduler.add_job(morning_briefing_run, CronTrigger(hour=5, minute=10), id="morning_briefing") diff --git a/app/models/document.py b/app/models/document.py index a7cd6df..415a8c2 100644 --- a/app/models/document.py +++ b/app/models/document.py @@ -100,6 +100,12 @@ class Document(Base): preview_hash: Mapped[str | None] = mapped_column(String(64)) preview_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + # PR-4 Email Ingest — 외부 source dedup key + 메일 metadata + # source_external_id: email 에선 always non-null (Message-ID 또는 imap UID fallback). 다른 source 는 NULL 가능. + # email_metadata: from/to/cc/subject/folder/uidvalidity/uid/received_at/mailplus_link/attachments[]. + source_external_id: Mapped[str | None] = mapped_column(Text) + email_metadata: Mapped[dict | None] = mapped_column(JSONB) + # 메타데이터 source_channel: Mapped[str | None] = mapped_column( Enum("law_monitor", "devonagent", "email", "web_clip", diff --git a/app/workers/inbox_ingest.py b/app/workers/inbox_ingest.py new file mode 100644 index 0000000..bb02267 --- /dev/null +++ b/app/workers/inbox_ingest.py @@ -0,0 +1,337 @@ +"""inbox@hyungi.net IMAP ingest 워커 — PR-4 Email Ingest. + +Plan: ~/.claude/plans/document-enchanted-candy.md + +MailPlus 의 `DocumentServer/Ingest` 폴더 (또는 사용자 지정 폴더) 를 5분 cron 으로 +polling. 메일을 source_channel='email' memo 로 생성하고 classify queue 등록. +기존 mailplus_archive (INBOX root archive) 와 폴더 분리 + source_external_id dedup 으로 +중복 안 함. + +정책 (사용자 라운드 2026-05-12): +- email_ingest 는 events row 직접 생성 X. memo 만 만들고 사용자 promote 가 events 생성. +- source_external_id always non-null (Message-ID 정규화 또는 imap:{folder}:{uidvalidity}:{uid}). +- DB unique index 가 dedup 진실원장. \\Seen flag 는 best-effort. +- 첨부는 metadata 만, NAS 실 연동은 별 PR. +""" + +import asyncio +import email +import hashlib +import imaplib +import os +import re +from datetime import datetime, timedelta, timezone +from email.header import decode_header +from email.utils import parsedate_to_datetime +from typing import Any + +import httpx +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from core.database import async_session +from core.utils import setup_logger +from models.automation import AutomationState +from models.document import Document +from models.queue import enqueue_stage + +logger = setup_logger("inbox_ingest") + +_TAG_RE = re.compile(r"<[^>]+>") +_WS_RE = re.compile(r"[ \t]+") +_NL_RE = re.compile(r"\n{3,}") +_MID_BRACKETS = re.compile(r"^<|>$") + + +def _decode_mime_header(raw: str | None) -> str: + if not raw: + return "" + parts = decode_header(raw) + out = [] + for data, charset in parts: + if isinstance(data, bytes): + out.append(data.decode(charset or "utf-8", errors="replace")) + else: + out.append(data) + return "".join(out) + + +def _normalize_message_id(raw: str | None) -> str | None: + if not raw: + return None + mid = raw.strip() + if not mid: + return None + mid = _MID_BRACKETS.sub("", mid).strip() + return mid.lower() or None + + +def _build_source_external_id(message_id: str | None, folder: str, uidvalidity: str, uid: int) -> str: + if message_id: + return message_id + return f"imap:{folder}:{uidvalidity}:{uid}" + + +def _strip_html(html: str) -> str: + text = _TAG_RE.sub("", html) + text = text.replace(" ", " ").replace("&", "&").replace("<", "<").replace(">", ">").replace(""", '"').replace("'", "'") + text = _WS_RE.sub(" ", text) + text = _NL_RE.sub("\n\n", text) + return text.strip() + + +def _extract_body(msg: email.message.Message) -> str: + text_part = None + html_part = None + if msg.is_multipart(): + for part in msg.walk(): + ctype = part.get_content_type() + if part.get("Content-Disposition", "").startswith("attachment"): + continue + if ctype == "text/plain" and text_part is None: + text_part = part + elif ctype == "text/html" and html_part is None: + html_part = part + else: + ctype = msg.get_content_type() + if ctype == "text/plain": + text_part = msg + elif ctype == "text/html": + html_part = msg + + target = text_part or html_part + if target is None: + return "" + try: + payload = target.get_payload(decode=True) + if payload is None: + return "" + charset = target.get_content_charset() or "utf-8" + body = payload.decode(charset, errors="replace") + except Exception as e: + logger.warning(f"[inbox_ingest] body decode 실패: {e}") + return "" + + if target.get_content_type() == "text/html": + body = _strip_html(body) + return body.strip() + + +def _extract_attachments_meta(msg: email.message.Message) -> list[dict]: + out: list[dict] = [] + if not msg.is_multipart(): + return out + for idx, part in enumerate(msg.walk()): + disp = part.get("Content-Disposition", "") + if not disp.startswith("attachment") and not part.get_filename(): + continue + filename = _decode_mime_header(part.get_filename()) or f"unnamed_{idx}" + size: int | None = None + try: + payload = part.get_payload(decode=True) + if payload is not None: + size = len(payload) + except Exception: + size = None + out.append({ + "filename": filename, + "mime": part.get_content_type(), + "size": size, + "part_id": idx, + }) + return out + + +def _parse_received_at(raw: str | None) -> str | None: + if not raw: + return None + try: + dt = parsedate_to_datetime(raw) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc).isoformat() + except Exception: + return None + + +def _fetch_uids_sync(host: str, port: int, user: str, password: str, folder: str, since_days: int) -> tuple[str, list[tuple[int, bytes]]]: + """동기 IMAP fetch — 폴더 선택 + UID SEARCH SINCE + RFC822 fetch. + Returns (uidvalidity_str, [(uid, raw_bytes), ...]). + """ + conn = imaplib.IMAP4_SSL(host, port, timeout=30) + try: + conn.login(user, password) + typ, _ = conn.select(f'"{folder}"') + if typ != "OK": + raise RuntimeError(f"folder select 실패: {folder}") + + # uidvalidity 추출 (STATUS 또는 SELECT 응답) + typ, status_data = conn.status(f'"{folder}"', "(UIDVALIDITY)") + uidvalidity = "0" + if typ == "OK" and status_data: + m = re.search(rb"UIDVALIDITY (\d+)", status_data[0]) + if m: + uidvalidity = m.group(1).decode() + + since = (datetime.now(timezone.utc) - timedelta(days=since_days)).strftime("%d-%b-%Y") + typ, data = conn.uid("search", None, f"SINCE {since}") + if typ != "OK": + return uidvalidity, [] + uids = data[0].split() if data and data[0] else [] + + results = [] + for uid_bytes in uids: + uid = int(uid_bytes) + typ, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)") + if typ != "OK" or not msg_data or msg_data[0] is None: + continue + raw = msg_data[0][1] + if isinstance(raw, bytes): + results.append((uid, raw)) + return uidvalidity, results + finally: + try: + conn.logout() + except Exception: + pass + + +async def _heartbeat(url: str | None, success: bool): + if not url: + return + target = url if success else f"{url.rstrip('/')}/fail" + try: + async with httpx.AsyncClient(timeout=10) as c: + await c.get(target) + except Exception as e: + logger.debug(f"[inbox_ingest] HC heartbeat 실패: {e}") + + +async def run(): + """5분 cron 진입점 — DocumentServer/Ingest 폴더 polling + memo 생성.""" + enabled = os.getenv("INBOX_INGEST_ENABLED", "false").lower() == "true" + if not enabled: + logger.debug("[inbox_ingest] INBOX_INGEST_ENABLED=false, skip") + return + + host = os.getenv("MAILPLUS_HOST", "") + port = int(os.getenv("MAILPLUS_PORT", "993")) + user = os.getenv("MAILPLUS_USER", "") + password = os.getenv("MAILPLUS_PASS", "") + folder = os.getenv("INBOX_INGEST_FOLDER", "DocumentServer/Ingest") + since_days = int(os.getenv("INBOX_INGEST_DAYS", "14")) + hc_url = os.getenv("INBOX_INGEST_HC_URL", "").strip() or None + + if not all([host, user, password]): + logger.warning("[inbox_ingest] MailPlus 인증 정보 미설정 — skip") + return + + try: + uidvalidity, emails = await asyncio.to_thread( + _fetch_uids_sync, host, port, user, password, folder, since_days, + ) + except Exception as e: + logger.error(f"[inbox_ingest] IMAP fetch 실패: {e}") + await _heartbeat(hc_url, success=False) + return + + if not emails: + logger.info("[inbox_ingest] 새 메일 0건") + await _heartbeat(hc_url, success=True) + return + + created = 0 + skipped = 0 + parse_failed = 0 + + async with async_session() as session: + for uid, raw_bytes in emails: + try: + msg = email.message_from_bytes(raw_bytes) + message_id = _normalize_message_id(msg.get("Message-ID")) + source_external_id = _build_source_external_id(message_id, folder, uidvalidity, uid) + + # ON CONFLICT DO NOTHING (DB unique 진실원장) + stmt = ( + pg_insert(Document) + .values( + file_path=None, + file_hash=hashlib.sha256(raw_bytes).hexdigest(), + file_format="eml", + file_size=len(raw_bytes), + file_type="note", + source_channel="email", + source_external_id=source_external_id, + # 이메일 본문/제목/metadata 는 아래에서 채움 (placeholder 로 일단 row 생성) + title=_decode_mime_header(msg.get("Subject"))[:500] or "(제목없음)", + extracted_text="", + email_metadata={}, + ) + .on_conflict_do_nothing( + index_elements=["source_external_id"], + index_where="source_channel = 'email' AND source_external_id IS NOT NULL", + ) + .returning(Document.id) + ) + result = await session.execute(stmt) + row = result.first() + if row is None: + skipped += 1 + continue + doc_id = row[0] + + # 본문/metadata parse (row 생성 후 실패 = email_metadata.parse_error 기록) + try: + body = _extract_body(msg) + attachments = _extract_attachments_meta(msg) + metadata: dict[str, Any] = { + "from": _decode_mime_header(msg.get("From")), + "to": [_decode_mime_header(a) for a in msg.get_all("To", [])], + "cc": [_decode_mime_header(a) for a in msg.get_all("Cc", [])], + "subject": _decode_mime_header(msg.get("Subject")), + "folder": folder, + "uidvalidity": uidvalidity, + "uid": uid, + "received_at": _parse_received_at(msg.get("Date")), + "attachments": attachments, + } + except Exception as parse_exc: + logger.warning(f"[inbox_ingest] doc {doc_id} parse 실패: {parse_exc}") + body = "" + metadata = {"parse_error": str(parse_exc), "folder": folder, "uidvalidity": uidvalidity, "uid": uid} + parse_failed += 1 + + # UPDATE 로 본문/metadata 채움 + doc = (await session.execute(select(Document).where(Document.id == doc_id))).scalar_one() + doc.extracted_text = body + doc.email_metadata = metadata + + # classify pipeline 진입 (4B triage fail 상태는 별 PR. ingest 자체는 정상 진행) + await enqueue_stage(session, doc_id, "classify") + created += 1 + + except Exception as e: + logger.error(f"[inbox_ingest] UID {uid} 처리 실패: {e}") + + # automation_state 기록 (참고용, dedup 진실원장 아님) + state = await session.execute( + select(AutomationState).where(AutomationState.job_name == "inbox_ingest") + ) + state_row = state.scalar_one_or_none() + now_utc = datetime.now(timezone.utc) + if state_row: + state_row.last_run_at = now_utc + state_row.last_check_value = str(uid) if emails else state_row.last_check_value + else: + session.add(AutomationState( + job_name="inbox_ingest", + last_check_value=str(uid) if emails else "0", + last_run_at=now_utc, + )) + await session.commit() + + logger.info( + f"[inbox_ingest] folder={folder} fetched={len(emails)} created={created} " + f"skipped(dedup)={skipped} parse_failed={parse_failed}" + ) + await _heartbeat(hc_url, success=True) diff --git a/credentials.env.example b/credentials.env.example index aefa347..e6690c9 100644 --- a/credentials.env.example +++ b/credentials.env.example @@ -32,6 +32,13 @@ MAILPLUS_PORT=993 MAILPLUS_SMTP_PORT=465 MAILPLUS_USER=hyungi MAILPLUS_PASS= +# PR-4 inbox@hyungi.net IMAP ingest (DocumentServer/Ingest 폴더 → memo). +# enable 시 INBOX_INGEST_ENABLED=true. 폴더는 alias 수신 메일이 자동 이동되는 곳. +INBOX_INGEST_ENABLED=false +INBOX_INGEST_FOLDER=DocumentServer/Ingest +INBOX_INGEST_DAYS=14 +INBOX_INGEST_HC_URL= + # ─── Synology Calendar (CalDAV, 태스크 관리) ─── CALDAV_URL=https://ds1525.hyungi.net/caldav/ diff --git a/frontend/src/routes/memos/+page.svelte b/frontend/src/routes/memos/+page.svelte index c17a0c5..e577278 100644 --- a/frontend/src/routes/memos/+page.svelte +++ b/frontend/src/routes/memos/+page.svelte @@ -3,7 +3,7 @@ import { api } from '$lib/api'; import { addToast } from '$lib/stores/toast'; import { renderMemoHtml, todayIso, countHiddenTasks, DEFAULT_HIDE_AFTER_MS } from '$lib/utils/memoRenderer'; - import { Pin, PinOff, Pencil, Trash2, Eye, EyeOff, X, Check, Archive, ArchiveRestore, ListChecks, Bold, Heading, CalendarDays, Mic, Calendar, Activity, ArrowRight, FileText, BookOpen } from 'lucide-svelte'; + import { Pin, PinOff, Pencil, Trash2, Eye, EyeOff, X, Check, Archive, ArchiveRestore, ListChecks, Bold, Heading, CalendarDays, Mic, Calendar, Activity, ArrowRight, FileText, BookOpen, Mail } from 'lucide-svelte'; import { getAccessToken } from '$lib/api'; import Button from '$lib/components/ui/Button.svelte'; import Card from '$lib/components/ui/Card.svelte'; @@ -523,13 +523,18 @@ {:else} - {#if memo.source_channel === 'voice' || memo.ai_event_kind || memo._last_promoted} + {#if memo.source_channel === 'voice' || memo.source_channel === 'email' || memo.ai_event_kind || memo._last_promoted}
{#if memo.source_channel === 'voice'} 음성 {/if} + {#if memo.source_channel === 'email'} + + 이메일 + + {/if} {#if memo.ai_event_kind && memo.ai_event_kind !== 'note'} AI 추천: {KIND_LABELS[memo.ai_event_kind] || memo.ai_event_kind}{memo.ai_event_confidence != null ? ` · ${Math.round(memo.ai_event_confidence * 100)}%` : ''} diff --git a/migrations/259_documents_source_external_id.sql b/migrations/259_documents_source_external_id.sql new file mode 100644 index 0000000..14678e5 --- /dev/null +++ b/migrations/259_documents_source_external_id.sql @@ -0,0 +1,7 @@ +-- PR-4 Email Ingest — documents.source_external_id 컬럼 추가 +-- 외부 source 의 dedup key. email source 에서는 always non-null (ingest 코드 책임). +-- Message-ID 정규화 또는 imap:{folder}:{uidvalidity}:{uid} fallback. +-- 다른 source_channel 에서는 NULL 허용 (별 의미 부여 시 nullable→unique 검토). + +ALTER TABLE documents + ADD COLUMN IF NOT EXISTS source_external_id TEXT; diff --git a/migrations/260_documents_email_metadata.sql b/migrations/260_documents_email_metadata.sql new file mode 100644 index 0000000..2b0d945 --- /dev/null +++ b/migrations/260_documents_email_metadata.sql @@ -0,0 +1,9 @@ +-- PR-4 Email Ingest — documents.email_metadata JSONB 컬럼 추가 +-- 구조: {from, to[], cc[], subject, folder, uidvalidity, uid, received_at, +-- mailplus_link, attachments: [{filename, mime, size, part_id}], +-- parse_error?: string} +-- mailplus_archive (기존 INBOX root archive 워커) 가 만든 row 는 NULL 유지. +-- inbox_ingest 가 만든 row 만 채움. + +ALTER TABLE documents + ADD COLUMN IF NOT EXISTS email_metadata JSONB; diff --git a/migrations/261_documents_source_external_id_uq.sql b/migrations/261_documents_source_external_id_uq.sql new file mode 100644 index 0000000..bcc159c --- /dev/null +++ b/migrations/261_documents_source_external_id_uq.sql @@ -0,0 +1,7 @@ +-- PR-4 Email Ingest — partial unique on (source_external_id) for email source +-- inbox_ingest 의 dedup 진실원장. 같은 메일 재 ingest 시 ON CONFLICT DO NOTHING. +-- mailplus_archive 의 INBOX root archive row 는 source_external_id=NULL 이라 자동 제외. + +CREATE UNIQUE INDEX IF NOT EXISTS uq_documents_email_source_external_id + ON documents (source_external_id) + WHERE source_channel = 'email' AND source_external_id IS NOT NULL;