diff --git a/app/workers/extract_worker.py b/app/workers/extract_worker.py index cc162b1..f1e467a 100644 --- a/app/workers/extract_worker.py +++ b/app/workers/extract_worker.py @@ -1,9 +1,11 @@ """텍스트 추출 워커 — kordoc / PyMuPDF / Surya OCR / LibreOffice / 직접 읽기 / 웹 HTML""" +import email import hashlib import re import subprocess from datetime import datetime, timezone +from email.header import decode_header from pathlib import Path import httpx @@ -23,6 +25,8 @@ TEXT_FORMATS = {"md", "txt", "csv", "json", "xml", "html"} OFFICE_FORMATS = {"xlsx", "xls", "docx", "doc", "pptx", "ppt", "odt", "ods", "odp", "odoc", "osheet"} # OCR 대상 이미지 포맷 IMAGE_FORMATS = {"jpg", "jpeg", "png", "tiff", "tif", "bmp", "gif", "webp"} +# 이메일 (선별 PKM 폴더 수집 → 헤더+본문 추출) +EML_FORMATS = {"eml"} EXTRACTOR_VERSION = "kordoc@1.7" PYMUPDF_VERSION = "pymupdf" @@ -233,6 +237,90 @@ async def _extract_web_html(doc: Document, html_path: Path) -> None: ) +# ─── 이메일(.eml) 추출 ─── + +def _decode_eml_header(raw: str) -> str: + """MIME 인코딩 헤더 디코딩.""" + if not raw: + return "" + out = [] + for data, charset in decode_header(raw): + if isinstance(data, bytes): + out.append(data.decode(charset or "utf-8", errors="replace")) + else: + out.append(data) + return "".join(out) + + +async def _extract_eml(doc: Document, eml_path: Path) -> None: + """이메일(.eml) 본문 추출 — From/To/Date/Subject 헤더 블록 + 본문. + + 본문은 text/plain 우선, 없으면 text/html → bs4 평문(_extract_web_with_bs4 재사용). + 헤더를 본문 머리에 prepend 해 검색·요약이 발신자/제목 맥락을 갖게 함. + 첨부는 extract_meta['email_attachments'] 에 인벤토리만 (본문 추출은 후속 — scaffold). + """ + raw = eml_path.read_bytes() + msg = email.message_from_bytes(raw) + + hdr_lines = [] + for label in ("From", "To", "Date", "Subject"): + val = _decode_eml_header(msg.get(label, "")) + if val: + hdr_lines.append(f"{label}: {val}") + + body = "" + html_body = "" + attachments = [] + if msg.is_multipart(): + for part in msg.walk(): + ctype = part.get_content_type() + disp = (part.get("Content-Disposition") or "").lower() + if "attachment" in disp: + payload = part.get_payload(decode=True) + attachments.append({ + "filename": _decode_eml_header(part.get_filename() or ""), + "content_type": ctype, + "size": len(payload) if payload else 0, + }) + continue + if ctype == "text/plain" and not body: + payload = part.get_payload(decode=True) + if payload is not None: + body = payload.decode(part.get_content_charset() or "utf-8", errors="replace") + elif ctype == "text/html" and not html_body: + payload = part.get_payload(decode=True) + if payload is not None: + html_body = payload.decode(part.get_content_charset() or "utf-8", errors="replace") + else: + payload = msg.get_payload(decode=True) + if payload is not None: + decoded = payload.decode(msg.get_content_charset() or "utf-8", errors="replace") + if msg.get_content_type() == "text/html": + html_body = decoded + else: + body = decoded + + # text/plain 없으면 html → bs4 평문 (devonagent 최종 fallback 재사용, 신규 의존성 0) + if not body and html_body: + body, _ = _extract_web_with_bs4(html_body) + + if attachments: + names = ", ".join(a["filename"] for a in attachments if a["filename"]) + hdr_lines.append(f"Attachments: {len(attachments)}개" + (f" ({names})" if names else "")) + + header_block = "\n".join(hdr_lines) + full_text = (header_block + "\n\n" + (body or "")).replace("\x00", "").strip() + + doc.extracted_text = full_text + doc.extracted_at = datetime.now(timezone.utc) + doc.extractor_version = "eml@stdlib" + if attachments: + meta = dict(doc.extract_meta or {}) + meta["email_attachments"] = attachments + doc.extract_meta = meta + logger.info(f"[eml] {doc.file_path} ({len(full_text)}자, 첨부 {len(attachments)})") + + # ─── 메인 처리 ─── async def process(document_id: int, session: AsyncSession) -> None: @@ -257,6 +345,13 @@ async def process(document_id: int, session: AsyncSession) -> None: await _extract_web_html(doc, full_path) return + # ─── 이메일 (.eml) — 헤더+본문 추출 (선별 PKM 폴더 수집) ─── + if fmt in EML_FORMATS: + if not full_path.exists(): + raise FileNotFoundError(f"파일 없음: {full_path}") + await _extract_eml(doc, full_path) + return + # ─── 텍스트 파일 — 직접 읽기 ─── if fmt in TEXT_FORMATS: if not full_path.exists(): diff --git a/app/workers/mailplus_archive.py b/app/workers/mailplus_archive.py index 7bbfd4d..c673cc4 100644 --- a/app/workers/mailplus_archive.py +++ b/app/workers/mailplus_archive.py @@ -55,13 +55,20 @@ def _detect_origin(subject: str, body: str) -> str: return "external" -def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: int | None): - """동기 IMAP 메일 가져오기 (asyncio.to_thread에서 실행)""" +def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: int | None, folder: str): + """동기 IMAP 메일 가져오기 (asyncio.to_thread에서 실행). + + 선별 폴더(MAILPLUS_FOLDER, 기본 'PKM')만 수집 — INBOX 전체 X. + 폴더 부재 시 no-op (사용자가 MailPlus 규칙으로 폴더 생성 전까진 안전하게 0건). + """ results = [] conn = imaplib.IMAP4_SSL(host, port, timeout=30) try: conn.login(user, password) - conn.select("INBOX") + typ, _ = conn.select(folder) + if typ != "OK": + logger.info(f"[메일] 폴더 '{folder}' 없음/접근불가 — 수집 건너뜀 (no-op)") + return results if last_uid: # 증분 동기화: last_uid 이후 @@ -71,14 +78,13 @@ def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y") _, data = conn.uid("search", None, f"SINCE {since}") - uids = data[0].split() + uids = (data[0] or b"").split() for uid_bytes in uids: uid = int(uid_bytes) _, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)") if msg_data[0] is None: continue - raw = msg_data[0][1] - results.append((uid, raw)) + results.append((uid, msg_data[0][1])) finally: conn.logout() @@ -91,15 +97,18 @@ async def run(): port = int(os.getenv("MAILPLUS_PORT", "993")) user = os.getenv("MAILPLUS_USER", "") password = os.getenv("MAILPLUS_PASS", "") + folder = os.getenv("MAILPLUS_FOLDER", "PKM") if not all([host, user, password]): logger.warning("MailPlus 인증 정보 미설정") return + job_name = f"mailplus:{folder}" + async with async_session() as session: - # 마지막 UID 조회 + # 마지막 UID 조회 (UID 는 폴더별 네임스페이스 → job_name 에 폴더 포함) state = await session.execute( - select(AutomationState).where(AutomationState.job_name == "mailplus") + select(AutomationState).where(AutomationState.job_name == job_name) ) state_row = state.scalar_one_or_none() last_uid = int(state_row.last_check_value) if state_row and state_row.last_check_value else None @@ -107,7 +116,7 @@ async def run(): # IMAP 동기 호출을 비동기로 래핑 try: emails = await asyncio.to_thread( - _fetch_emails_sync, host, port, user, password, last_uid, + _fetch_emails_sync, host, port, user, password, last_uid, folder, ) except Exception as e: logger.error(f"IMAP 연결 실패: {e}") @@ -174,15 +183,10 @@ async def run(): session.add(doc) await session.flush() - safe_subj = subject.replace("\n", " ").replace("\r", " ")[:200] + # 검색·색인 편입 (extract → classify → embed/chunk). 할일 연계 없음. + await enqueue_stage(session, doc.id, "extract") - # TODO: extract_worker가 eml 본문/첨부 파싱 지원 시 이 조건 제거 - if doc.file_format != "eml": - await enqueue_stage(session, doc.id, "extract") - else: - logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵") - - archived.append(safe_subj) + archived.append(subject.replace("\n", " ").replace("\r", " ")[:200]) max_uid = max(max_uid, uid) except Exception as e: @@ -194,7 +198,7 @@ async def run(): state_row.last_run_at = datetime.now(timezone.utc) else: session.add(AutomationState( - job_name="mailplus", + job_name=job_name, last_check_value=str(max_uid), last_run_at=datetime.now(timezone.utc), ))