diff --git a/app/core/utils.py b/app/core/utils.py index 7981411..646c77b 100644 --- a/app/core/utils.py +++ b/app/core/utils.py @@ -44,3 +44,81 @@ def count_log_errors(log_path: str) -> int: return sum(1 for line in f if "[ERROR]" in line) except FileNotFoundError: return 0 + + +# ─── CalDAV 헬퍼 ─── + + +def create_caldav_todo( + caldav_url: str, + username: str, + password: str, + title: str, + description: str = "", + due_days: int = 7, +) -> str | None: + """Synology Calendar에 VTODO 생성, UID 반환""" + import uuid + from datetime import datetime, timedelta, timezone + + import caldav + + try: + client = caldav.DAVClient(url=caldav_url, username=username, password=password) + principal = client.principal() + calendars = principal.calendars() + if not calendars: + return None + + calendar = calendars[0] + uid = str(uuid.uuid4()) + due = datetime.now(timezone.utc) + timedelta(days=due_days) + due_str = due.strftime("%Y%m%dT%H%M%SZ") + + vtodo = f"""BEGIN:VCALENDAR +VERSION:2.0 +BEGIN:VTODO +UID:{uid} +SUMMARY:{title} +DESCRIPTION:{description} +DUE:{due_str} +STATUS:NEEDS-ACTION +PRIORITY:5 +END:VTODO +END:VCALENDAR""" + + calendar.save_event(vtodo) + return uid + except Exception as e: + logging.getLogger("caldav").error(f"CalDAV VTODO 생성 실패: {e}") + return None + + +# ─── SMTP 헬퍼 ─── + + +def send_smtp_email( + host: str, + port: int, + username: str, + password: str, + subject: str, + body: str, + to_addr: str | None = None, +): + """Synology MailPlus SMTP로 이메일 발송""" + import smtplib + from email.mime.text import MIMEText + + to_addr = to_addr or username + msg = MIMEText(body, "plain", "utf-8") + msg["Subject"] = subject + msg["From"] = username + msg["To"] = to_addr + + try: + with smtplib.SMTP_SSL(host, port, timeout=30) as server: + server.login(username, password) + server.send_message(msg) + except Exception as e: + logging.getLogger("smtp").error(f"SMTP 발송 실패: {e}") diff --git a/app/main.py b/app/main.py index 2167924..d4d140a 100644 --- a/app/main.py +++ b/app/main.py @@ -19,16 +19,26 @@ from models.user import User async def lifespan(app: FastAPI): """앱 시작/종료 시 실행되는 lifespan 핸들러""" from apscheduler.schedulers.asyncio import AsyncIOScheduler + from apscheduler.triggers.cron import CronTrigger + from workers.daily_digest import run as daily_digest_run from workers.file_watcher import watch_inbox + from workers.law_monitor import run as law_monitor_run + from workers.mailplus_archive import run as mailplus_run from workers.queue_consumer import consume_queue # 시작: DB 연결 확인 await init_db() # APScheduler: 백그라운드 작업 - scheduler = AsyncIOScheduler() + scheduler = AsyncIOScheduler(timezone="Asia/Seoul") + # 상시 실행 scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer") scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher") + # 일일 스케줄 (KST) + scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor") + scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning") + scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening") + scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest") scheduler.start() yield diff --git a/app/models/automation.py b/app/models/automation.py new file mode 100644 index 0000000..d7a2622 --- /dev/null +++ b/app/models/automation.py @@ -0,0 +1,20 @@ +"""automation_state 테이블 ORM — 자동화 워커 증분 동기화 상태""" + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from core.database import Base + + +class AutomationState(Base): + __tablename__ = "automation_state" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + job_name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False) + last_check_value: Mapped[str | None] = mapped_column(Text) + last_run_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, onupdate=datetime.now + ) diff --git a/app/workers/daily_digest.py b/app/workers/daily_digest.py new file mode 100644 index 0000000..213a9be --- /dev/null +++ b/app/workers/daily_digest.py @@ -0,0 +1,146 @@ +"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown + SMTP + +v1 scripts/pkm_daily_digest.py에서 포팅. +DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환. +""" + +import os +from datetime import datetime, timezone +from pathlib import Path + +from sqlalchemy import func, select, text + +from core.config import settings +from core.database import async_session +from core.utils import send_smtp_email, setup_logger +from models.document import Document +from models.queue import ProcessingQueue + +logger = setup_logger("daily_digest") + + +async def run(): + """일일 다이제스트 생성 + 저장 + 발송""" + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + sections = [] + + async with async_session() as session: + # ─── 1. 오늘 추가된 문서 ─── + added = await session.execute( + select(Document.ai_domain, func.count(Document.id)) + .where(func.date(Document.created_at) == today) + .group_by(Document.ai_domain) + ) + added_rows = added.all() + total_added = sum(row[1] for row in added_rows) + + section = f"## 오늘 추가된 문서 ({total_added}건)\n" + if added_rows: + for domain, count in added_rows: + section += f"- {domain or '미분류'}: {count}건\n" + else: + section += "- 없음\n" + sections.append(section) + + # ─── 2. 법령 변경 ─── + law_docs = await session.execute( + select(Document.title) + .where( + Document.source_channel == "law_monitor", + func.date(Document.created_at) == today, + ) + ) + law_rows = law_docs.scalars().all() + section = f"## 법령 변경 ({len(law_rows)}건)\n" + if law_rows: + for title in law_rows: + section += f"- {title}\n" + else: + section += "- 변경 없음\n" + sections.append(section) + + # ─── 3. 이메일 수집 ─── + email_count = await session.execute( + select(func.count(Document.id)) + .where( + Document.source_channel == "email", + func.date(Document.created_at) == today, + ) + ) + email_total = email_count.scalar() or 0 + sections.append(f"## 이메일 수집\n- {email_total}건 아카이브\n") + + # ─── 4. 처리 파이프라인 상태 ─── + queue_stats = await session.execute( + text(""" + SELECT stage, status, COUNT(*) + FROM processing_queue + WHERE created_at > NOW() - INTERVAL '24 hours' + GROUP BY stage, status + ORDER BY stage, status + """) + ) + queue_rows = queue_stats.all() + section = "## 파이프라인 상태 (24h)\n" + if queue_rows: + for stage, status, count in queue_rows: + section += f"- {stage}/{status}: {count}건\n" + else: + section += "- 처리 항목 없음\n" + + # 실패 건수 강조 + failed = await session.execute( + select(func.count()) + .select_from(ProcessingQueue) + .where( + ProcessingQueue.status == "failed", + ProcessingQueue.created_at > text("NOW() - INTERVAL '24 hours'"), + ) + ) + failed_count = failed.scalar() or 0 + if failed_count > 0: + section += f"\n⚠️ **실패 {failed_count}건** — 수동 확인 필요\n" + sections.append(section) + + # ─── 5. Inbox 미분류 ─── + inbox_count = await session.execute( + select(func.count(Document.id)) + .where(Document.file_path.like("PKM/Inbox/%")) + ) + inbox_total = inbox_count.scalar() or 0 + if inbox_total > 0: + sections.append(f"## Inbox 미분류\n- {inbox_total}건 대기 중\n") + + # ─── Markdown 조합 ─── + date_display = datetime.now(timezone.utc).strftime("%Y년 %m월 %d일") + markdown = f"# PKM 일일 다이제스트 — {date_display}\n\n" + markdown += "\n".join(sections) + markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n" + + # ─── NAS 저장 ─── + digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests" + digest_dir.mkdir(parents=True, exist_ok=True) + digest_path = digest_dir / f"{today}_digest.md" + digest_path.write_text(markdown, encoding="utf-8") + + # ─── 90일 초과 아카이브 ─── + archive_dir = digest_dir / "archive" + archive_dir.mkdir(exist_ok=True) + cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400) + for old in digest_dir.glob("*_digest.md"): + if old.stat().st_mtime < cutoff: + old.rename(archive_dir / old.name) + + # ─── SMTP 발송 ─── + smtp_host = os.getenv("MAILPLUS_HOST", "") + smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465")) + smtp_user = os.getenv("MAILPLUS_USER", "") + smtp_pass = os.getenv("MAILPLUS_PASS", "") + if smtp_host and smtp_user: + send_smtp_email( + smtp_host, smtp_port, smtp_user, smtp_pass, + f"PKM 다이제스트 — {date_display}", + markdown, + ) + + logger.info(f"다이제스트 생성 완료: {digest_path}") diff --git a/app/workers/law_monitor.py b/app/workers/law_monitor.py new file mode 100644 index 0000000..d5d2f8d --- /dev/null +++ b/app/workers/law_monitor.py @@ -0,0 +1,192 @@ +"""법령 모니터 워커 — 국가법령정보센터 API로 법령 변경 감지 + +LAW_OC 승인 대기 중 — 코드 완성, 실 호출은 승인 후. +v1 scripts/law_monitor.py에서 포팅. +""" + +import os +import re +from datetime import datetime, timezone +from pathlib import Path +from xml.etree import ElementTree as ET + +import httpx +from sqlalchemy import select + +from core.config import settings +from core.database import async_session +from core.utils import create_caldav_todo, file_hash, send_smtp_email, setup_logger +from models.automation import AutomationState +from models.document import Document +from models.queue import ProcessingQueue + +logger = setup_logger("law_monitor") + +LAW_API_BASE = "https://www.law.go.kr" + +# 모니터링 대상 법령 (Tier 1: 핵심) +TIER1_LAWS = [ + "산업안전보건법", + "산업안전보건법 시행령", + "산업안전보건법 시행규칙", + "중대재해 처벌 등에 관한 법률", + "중대재해 처벌 등에 관한 법률 시행령", + "건설기술 진흥법", + "건설기술 진흥법 시행령", + "위험물안전관리법", + "화학물질관리법", + "소방시설 설치 및 관리에 관한 법률", + "전기사업법", + "고압가스 안전관리법", +] + + +async def run(): + """법령 변경 모니터링 실행""" + law_oc = os.getenv("LAW_OC", "") + if not law_oc: + logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중") + return + + async with async_session() as session: + # 마지막 체크 날짜 조회 + state = await session.execute( + select(AutomationState).where(AutomationState.job_name == "law_monitor") + ) + state_row = state.scalar_one_or_none() + last_check = state_row.last_check_value if state_row else None + + today = datetime.now(timezone.utc).strftime("%Y%m%d") + if last_check == today: + logger.info("오늘 이미 체크 완료") + return + + new_count = 0 + async with httpx.AsyncClient(timeout=30) as client: + for law_name in TIER1_LAWS: + try: + changed = await _check_law(client, law_oc, law_name, session) + if changed: + new_count += 1 + except Exception as e: + logger.error(f"[{law_name}] 체크 실패: {e}") + + # 상태 업데이트 + if state_row: + state_row.last_check_value = today + state_row.last_run_at = datetime.now(timezone.utc) + else: + session.add(AutomationState( + job_name="law_monitor", + last_check_value=today, + last_run_at=datetime.now(timezone.utc), + )) + + await session.commit() + logger.info(f"법령 모니터 완료: {new_count}건 변경 감지") + + +async def _check_law( + client: httpx.AsyncClient, + law_oc: str, + law_name: str, + session, +) -> bool: + """단일 법령 변경 체크 → 변경 시 NAS 저장 + DB 등록 + CalDAV""" + # 법령 정보 조회 + resp = await client.get( + f"{LAW_API_BASE}/DRF/lawService.do", + params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name}, + ) + resp.raise_for_status() + + root = ET.fromstring(resp.text) + total = root.findtext(".//totalCnt", "0") + if total == "0": + return False + + # 첫 번째 결과의 MST (법령 고유번호) + mst = root.findtext(".//법령MST", "") + proclamation_date = root.findtext(".//공포일자", "") + if not mst: + return False + + # 이미 등록된 법령인지 확인 (같은 공포일자) + check_path = f"PKM/Knowledge/Industrial_Safety/Legislation/{law_name}_{proclamation_date}.md" + existing = await session.execute( + select(Document).where(Document.file_path == check_path) + ) + if existing.scalar_one_or_none(): + return False + + # 법령 본문 조회 + text_resp = await client.get( + f"{LAW_API_BASE}/DRF/lawService.do", + params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"}, + ) + text_resp.raise_for_status() + + # Markdown 변환 + markdown = _law_xml_to_markdown(text_resp.text, law_name) + + # NAS 저장 + full_path = Path(settings.nas_mount_path) / check_path + full_path.parent.mkdir(parents=True, exist_ok=True) + full_path.write_text(markdown, encoding="utf-8") + + # DB 등록 + doc = Document( + file_path=check_path, + file_hash=file_hash(full_path), + file_format="md", + file_size=len(markdown.encode()), + file_type="immutable", + title=f"{law_name} ({proclamation_date})", + source_channel="law_monitor", + data_origin="work", + ) + session.add(doc) + await session.flush() + + # 처리 큐 등록 + session.add(ProcessingQueue( + document_id=doc.id, stage="extract", status="pending", + )) + + # CalDAV 태스크 생성 + caldav_url = os.getenv("CALDAV_URL", "") + caldav_user = os.getenv("CALDAV_USER", "") + caldav_pass = os.getenv("CALDAV_PASS", "") + if caldav_url and caldav_user: + create_caldav_todo( + caldav_url, caldav_user, caldav_pass, + title=f"법령 검토: {law_name}", + description=f"공포일자: {proclamation_date}\n경로: {check_path}", + due_days=7, + ) + + logger.info(f"[법령] {law_name} ({proclamation_date}) 등록 완료") + return True + + +def _law_xml_to_markdown(xml_text: str, law_name: str) -> str: + """법령 XML을 Markdown으로 변환 (장/조 구조)""" + root = ET.fromstring(xml_text) + lines = [f"# {law_name}\n"] + + for article in root.iter("조문단위"): + num = article.findtext("조문번호", "") + title = article.findtext("조문제목", "") + content = article.findtext("조문내용", "") + + if title: + lines.append(f"\n## 제{num}조 ({title})\n") + elif num: + lines.append(f"\n## 제{num}조\n") + + if content: + # HTML 태그 제거 + clean = re.sub(r"<[^>]+>", "", content).strip() + lines.append(f"{clean}\n") + + return "\n".join(lines) diff --git a/app/workers/mailplus_archive.py b/app/workers/mailplus_archive.py new file mode 100644 index 0000000..75be597 --- /dev/null +++ b/app/workers/mailplus_archive.py @@ -0,0 +1,197 @@ +"""이메일 수집 워커 — Synology MailPlus IMAP → NAS 저장 + DB 등록 + +v1 scripts/mailplus_archive.py에서 포팅. +imaplib (동기)를 asyncio.to_thread()로 래핑. +""" + +import asyncio +import email +import imaplib +import os +import re +from datetime import datetime, timedelta, timezone +from email.header import decode_header +from pathlib import Path + +from sqlalchemy import select + +from core.config import settings +from core.database import async_session +from core.utils import file_hash, send_smtp_email, setup_logger +from models.automation import AutomationState +from models.document import Document +from models.queue import ProcessingQueue + +logger = setup_logger("mailplus_archive") + +# 업무 키워드 (data_origin 자동 감지) +WORK_KEYWORDS = {"테크니컬코리아", "TK", "공장", "생산", "사내", "안전", "점검"} + + +def _decode_mime_header(raw: str) -> str: + """MIME 헤더 디코딩""" + parts = decode_header(raw) + decoded = [] + for data, charset in parts: + if isinstance(data, bytes): + decoded.append(data.decode(charset or "utf-8", errors="replace")) + else: + decoded.append(data) + return "".join(decoded) + + +def _sanitize_filename(name: str, max_len: int = 80) -> str: + """파일명에 사용 불가한 문자 제거""" + clean = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name) + return clean[:max_len].strip() + + +def _detect_origin(subject: str, body: str) -> str: + """work/external 자동 감지""" + text = f"{subject} {body[:500]}".lower() + for kw in WORK_KEYWORDS: + if kw.lower() in text: + return "work" + return "external" + + +def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: int | None): + """동기 IMAP 메일 가져오기 (asyncio.to_thread에서 실행)""" + results = [] + conn = imaplib.IMAP4_SSL(host, port, timeout=30) + try: + conn.login(user, password) + conn.select("INBOX") + + if last_uid: + # 증분 동기화: last_uid 이후 + _, data = conn.uid("search", None, f"UID {last_uid + 1}:*") + else: + # 최초 실행: 최근 7일 + since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y") + _, data = conn.uid("search", None, f"SINCE {since}") + + uids = data[0].split() + for uid_bytes in uids: + uid = int(uid_bytes) + _, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)") + if msg_data[0] is None: + continue + raw = msg_data[0][1] + results.append((uid, raw)) + finally: + conn.logout() + + return results + + +async def run(): + """이메일 수집 실행""" + host = os.getenv("MAILPLUS_HOST", "") + port = int(os.getenv("MAILPLUS_PORT", "993")) + user = os.getenv("MAILPLUS_USER", "") + password = os.getenv("MAILPLUS_PASS", "") + + if not all([host, user, password]): + logger.warning("MailPlus 인증 정보 미설정") + return + + async with async_session() as session: + # 마지막 UID 조회 + state = await session.execute( + select(AutomationState).where(AutomationState.job_name == "mailplus") + ) + state_row = state.scalar_one_or_none() + last_uid = int(state_row.last_check_value) if state_row and state_row.last_check_value else None + + # IMAP 동기 호출을 비동기로 래핑 + try: + emails = await asyncio.to_thread( + _fetch_emails_sync, host, port, user, password, last_uid, + ) + except Exception as e: + logger.error(f"IMAP 연결 실패: {e}") + return + + if not emails: + logger.info("새 이메일 없음") + return + + # 이메일 저장 디렉토리 + email_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "emails" + email_dir.mkdir(parents=True, exist_ok=True) + + max_uid = last_uid or 0 + archived = [] + + for uid, raw_bytes in emails: + try: + msg = email.message_from_bytes(raw_bytes) + subject = _decode_mime_header(msg.get("Subject", "제목없음")) + date_str = msg.get("Date", "") + date = datetime.now().strftime("%Y%m%d") + + # .eml 파일 저장 + safe_subject = _sanitize_filename(subject) + filename = f"{date}_{uid}_{safe_subject}.eml" + eml_path = email_dir / filename + eml_path.write_bytes(raw_bytes) + + # 본문 추출 (텍스트 파트) + body = "" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + body = part.get_payload(decode=True).decode("utf-8", errors="replace") + break + else: + body = msg.get_payload(decode=True).decode("utf-8", errors="replace") + + # DB 등록 + rel_path = str(eml_path.relative_to(Path(settings.nas_mount_path))) + origin = _detect_origin(subject, body) + + doc = Document( + file_path=rel_path, + file_hash=file_hash(eml_path), + file_format="eml", + file_size=len(raw_bytes), + file_type="immutable", + title=subject, + source_channel="email", + data_origin=origin, + ) + session.add(doc) + await session.flush() + + session.add(ProcessingQueue( + document_id=doc.id, stage="extract", status="pending", + )) + + archived.append(subject) + max_uid = max(max_uid, uid) + + except Exception as e: + logger.error(f"UID {uid} 처리 실패: {e}") + + # 상태 업데이트 + if state_row: + state_row.last_check_value = str(max_uid) + state_row.last_run_at = datetime.now(timezone.utc) + else: + session.add(AutomationState( + job_name="mailplus", + last_check_value=str(max_uid), + last_run_at=datetime.now(timezone.utc), + )) + + await session.commit() + + # SMTP 알림 + smtp_host = os.getenv("MAILPLUS_HOST", "") + smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465")) + if archived and smtp_host: + body = f"이메일 {len(archived)}건 수집 완료:\n\n" + "\n".join(f"- {s}" for s in archived) + send_smtp_email(smtp_host, smtp_port, user, password, "PKM 이메일 수집 알림", body) + + logger.info(f"이메일 {len(archived)}건 수집 완료 (max_uid={max_uid})") diff --git a/migrations/003_automation_state.sql b/migrations/003_automation_state.sql new file mode 100644 index 0000000..1dcf517 --- /dev/null +++ b/migrations/003_automation_state.sql @@ -0,0 +1,8 @@ +-- 자동화 워커 상태 저장 (증분 동기화용) +CREATE TABLE automation_state ( + id BIGSERIAL PRIMARY KEY, + job_name VARCHAR(50) NOT NULL UNIQUE, + last_check_value TEXT, + last_run_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ DEFAULT NOW() +);