feat(email): PKM 폴더 선별 수집 + eml 본문 추출 활성화
mailplus_archive: INBOX 전체 → MAILPLUS_FOLDER(기본 PKM) 선별 수집, eml extract 스킵 가드 제거(검색·색인 편입), 폴더별 UID 상태(job_name=mailplus:<folder>), 폴더 부재 시 no-op. extract_worker: _extract_eml 신설(From/To/Date/Subject 헤더 prepend + 본문 text/plain 우선·html→bs4 fallback, 첨부 extract_meta 인벤토리 scaffold). preview/marker 는 eml 자동 skip 이라 라우팅 무변경. DNS drift(D25, mailplus.hyungi.net→.227) 교정 후 활성. 할일(events) 연계 없음. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
"""텍스트 추출 워커 — kordoc / PyMuPDF / Surya OCR / LibreOffice / 직접 읽기 / 웹 HTML"""
|
||||
|
||||
import email
|
||||
import hashlib
|
||||
import re
|
||||
import subprocess
|
||||
from datetime import datetime, timezone
|
||||
from email.header import decode_header
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
@@ -23,6 +25,8 @@ TEXT_FORMATS = {"md", "txt", "csv", "json", "xml", "html"}
|
||||
OFFICE_FORMATS = {"xlsx", "xls", "docx", "doc", "pptx", "ppt", "odt", "ods", "odp", "odoc", "osheet"}
|
||||
# OCR 대상 이미지 포맷
|
||||
IMAGE_FORMATS = {"jpg", "jpeg", "png", "tiff", "tif", "bmp", "gif", "webp"}
|
||||
# 이메일 (선별 PKM 폴더 수집 → 헤더+본문 추출)
|
||||
EML_FORMATS = {"eml"}
|
||||
|
||||
EXTRACTOR_VERSION = "kordoc@1.7"
|
||||
PYMUPDF_VERSION = "pymupdf"
|
||||
@@ -233,6 +237,90 @@ async def _extract_web_html(doc: Document, html_path: Path) -> None:
|
||||
)
|
||||
|
||||
|
||||
# ─── 이메일(.eml) 추출 ───
|
||||
|
||||
def _decode_eml_header(raw: str) -> str:
|
||||
"""MIME 인코딩 헤더 디코딩."""
|
||||
if not raw:
|
||||
return ""
|
||||
out = []
|
||||
for data, charset in decode_header(raw):
|
||||
if isinstance(data, bytes):
|
||||
out.append(data.decode(charset or "utf-8", errors="replace"))
|
||||
else:
|
||||
out.append(data)
|
||||
return "".join(out)
|
||||
|
||||
|
||||
async def _extract_eml(doc: Document, eml_path: Path) -> None:
|
||||
"""이메일(.eml) 본문 추출 — From/To/Date/Subject 헤더 블록 + 본문.
|
||||
|
||||
본문은 text/plain 우선, 없으면 text/html → bs4 평문(_extract_web_with_bs4 재사용).
|
||||
헤더를 본문 머리에 prepend 해 검색·요약이 발신자/제목 맥락을 갖게 함.
|
||||
첨부는 extract_meta['email_attachments'] 에 인벤토리만 (본문 추출은 후속 — scaffold).
|
||||
"""
|
||||
raw = eml_path.read_bytes()
|
||||
msg = email.message_from_bytes(raw)
|
||||
|
||||
hdr_lines = []
|
||||
for label in ("From", "To", "Date", "Subject"):
|
||||
val = _decode_eml_header(msg.get(label, ""))
|
||||
if val:
|
||||
hdr_lines.append(f"{label}: {val}")
|
||||
|
||||
body = ""
|
||||
html_body = ""
|
||||
attachments = []
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
ctype = part.get_content_type()
|
||||
disp = (part.get("Content-Disposition") or "").lower()
|
||||
if "attachment" in disp:
|
||||
payload = part.get_payload(decode=True)
|
||||
attachments.append({
|
||||
"filename": _decode_eml_header(part.get_filename() or ""),
|
||||
"content_type": ctype,
|
||||
"size": len(payload) if payload else 0,
|
||||
})
|
||||
continue
|
||||
if ctype == "text/plain" and not body:
|
||||
payload = part.get_payload(decode=True)
|
||||
if payload is not None:
|
||||
body = payload.decode(part.get_content_charset() or "utf-8", errors="replace")
|
||||
elif ctype == "text/html" and not html_body:
|
||||
payload = part.get_payload(decode=True)
|
||||
if payload is not None:
|
||||
html_body = payload.decode(part.get_content_charset() or "utf-8", errors="replace")
|
||||
else:
|
||||
payload = msg.get_payload(decode=True)
|
||||
if payload is not None:
|
||||
decoded = payload.decode(msg.get_content_charset() or "utf-8", errors="replace")
|
||||
if msg.get_content_type() == "text/html":
|
||||
html_body = decoded
|
||||
else:
|
||||
body = decoded
|
||||
|
||||
# text/plain 없으면 html → bs4 평문 (devonagent 최종 fallback 재사용, 신규 의존성 0)
|
||||
if not body and html_body:
|
||||
body, _ = _extract_web_with_bs4(html_body)
|
||||
|
||||
if attachments:
|
||||
names = ", ".join(a["filename"] for a in attachments if a["filename"])
|
||||
hdr_lines.append(f"Attachments: {len(attachments)}개" + (f" ({names})" if names else ""))
|
||||
|
||||
header_block = "\n".join(hdr_lines)
|
||||
full_text = (header_block + "\n\n" + (body or "")).replace("\x00", "").strip()
|
||||
|
||||
doc.extracted_text = full_text
|
||||
doc.extracted_at = datetime.now(timezone.utc)
|
||||
doc.extractor_version = "eml@stdlib"
|
||||
if attachments:
|
||||
meta = dict(doc.extract_meta or {})
|
||||
meta["email_attachments"] = attachments
|
||||
doc.extract_meta = meta
|
||||
logger.info(f"[eml] {doc.file_path} ({len(full_text)}자, 첨부 {len(attachments)})")
|
||||
|
||||
|
||||
# ─── 메인 처리 ───
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
@@ -257,6 +345,13 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
await _extract_web_html(doc, full_path)
|
||||
return
|
||||
|
||||
# ─── 이메일 (.eml) — 헤더+본문 추출 (선별 PKM 폴더 수집) ───
|
||||
if fmt in EML_FORMATS:
|
||||
if not full_path.exists():
|
||||
raise FileNotFoundError(f"파일 없음: {full_path}")
|
||||
await _extract_eml(doc, full_path)
|
||||
return
|
||||
|
||||
# ─── 텍스트 파일 — 직접 읽기 ───
|
||||
if fmt in TEXT_FORMATS:
|
||||
if not full_path.exists():
|
||||
|
||||
@@ -55,13 +55,20 @@ def _detect_origin(subject: str, body: str) -> str:
|
||||
return "external"
|
||||
|
||||
|
||||
def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: int | None):
|
||||
"""동기 IMAP 메일 가져오기 (asyncio.to_thread에서 실행)"""
|
||||
def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: int | None, folder: str):
|
||||
"""동기 IMAP 메일 가져오기 (asyncio.to_thread에서 실행).
|
||||
|
||||
선별 폴더(MAILPLUS_FOLDER, 기본 'PKM')만 수집 — INBOX 전체 X.
|
||||
폴더 부재 시 no-op (사용자가 MailPlus 규칙으로 폴더 생성 전까진 안전하게 0건).
|
||||
"""
|
||||
results = []
|
||||
conn = imaplib.IMAP4_SSL(host, port, timeout=30)
|
||||
try:
|
||||
conn.login(user, password)
|
||||
conn.select("INBOX")
|
||||
typ, _ = conn.select(folder)
|
||||
if typ != "OK":
|
||||
logger.info(f"[메일] 폴더 '{folder}' 없음/접근불가 — 수집 건너뜀 (no-op)")
|
||||
return results
|
||||
|
||||
if last_uid:
|
||||
# 증분 동기화: last_uid 이후
|
||||
@@ -71,14 +78,13 @@ def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid:
|
||||
since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
|
||||
_, data = conn.uid("search", None, f"SINCE {since}")
|
||||
|
||||
uids = data[0].split()
|
||||
uids = (data[0] or b"").split()
|
||||
for uid_bytes in uids:
|
||||
uid = int(uid_bytes)
|
||||
_, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)")
|
||||
if msg_data[0] is None:
|
||||
continue
|
||||
raw = msg_data[0][1]
|
||||
results.append((uid, raw))
|
||||
results.append((uid, msg_data[0][1]))
|
||||
finally:
|
||||
conn.logout()
|
||||
|
||||
@@ -91,15 +97,18 @@ async def run():
|
||||
port = int(os.getenv("MAILPLUS_PORT", "993"))
|
||||
user = os.getenv("MAILPLUS_USER", "")
|
||||
password = os.getenv("MAILPLUS_PASS", "")
|
||||
folder = os.getenv("MAILPLUS_FOLDER", "PKM")
|
||||
|
||||
if not all([host, user, password]):
|
||||
logger.warning("MailPlus 인증 정보 미설정")
|
||||
return
|
||||
|
||||
job_name = f"mailplus:{folder}"
|
||||
|
||||
async with async_session() as session:
|
||||
# 마지막 UID 조회
|
||||
# 마지막 UID 조회 (UID 는 폴더별 네임스페이스 → job_name 에 폴더 포함)
|
||||
state = await session.execute(
|
||||
select(AutomationState).where(AutomationState.job_name == "mailplus")
|
||||
select(AutomationState).where(AutomationState.job_name == job_name)
|
||||
)
|
||||
state_row = state.scalar_one_or_none()
|
||||
last_uid = int(state_row.last_check_value) if state_row and state_row.last_check_value else None
|
||||
@@ -107,7 +116,7 @@ async def run():
|
||||
# IMAP 동기 호출을 비동기로 래핑
|
||||
try:
|
||||
emails = await asyncio.to_thread(
|
||||
_fetch_emails_sync, host, port, user, password, last_uid,
|
||||
_fetch_emails_sync, host, port, user, password, last_uid, folder,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"IMAP 연결 실패: {e}")
|
||||
@@ -174,15 +183,10 @@ async def run():
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
safe_subj = subject.replace("\n", " ").replace("\r", " ")[:200]
|
||||
# 검색·색인 편입 (extract → classify → embed/chunk). 할일 연계 없음.
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
|
||||
# TODO: extract_worker가 eml 본문/첨부 파싱 지원 시 이 조건 제거
|
||||
if doc.file_format != "eml":
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
else:
|
||||
logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵")
|
||||
|
||||
archived.append(safe_subj)
|
||||
archived.append(subject.replace("\n", " ").replace("\r", " ")[:200])
|
||||
max_uid = max(max_uid, uid)
|
||||
|
||||
except Exception as e:
|
||||
@@ -194,7 +198,7 @@ async def run():
|
||||
state_row.last_run_at = datetime.now(timezone.utc)
|
||||
else:
|
||||
session.add(AutomationState(
|
||||
job_name="mailplus",
|
||||
job_name=job_name,
|
||||
last_check_value=str(max_uid),
|
||||
last_run_at=datetime.now(timezone.utc),
|
||||
))
|
||||
|
||||
Reference in New Issue
Block a user