feat: implement Phase 3 automation workers

- Add automation_state table for incremental sync (last UID, last check)
- Add law_monitor worker: 국가법령정보센터 API → NAS/DB/CalDAV VTODO
  (LAW_OC 승인 대기 중, 코드 완성)
- Add mailplus_archive worker: IMAP(993) → .eml NAS save + DB + SMTP
  notification (imaplib via asyncio.to_thread, timeout=30)
- Add daily_digest worker: PostgreSQL/pipeline stats → Markdown + SMTP
  (documents, law changes, email, queue errors, inbox backlog)
- Add CalDAV VTODO helper and SMTP email helper to core/utils.py
- Wire 3 cron jobs in APScheduler (law@07:00, mail@07:00+18:00,
  digest@20:00) with timezone=Asia/Seoul

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-02 15:24:50 +09:00
parent a5312c044b
commit 31d5498f8d
7 changed files with 652 additions and 1 deletions

View File

@@ -44,3 +44,81 @@ def count_log_errors(log_path: str) -> int:
return sum(1 for line in f if "[ERROR]" in line)
except FileNotFoundError:
return 0
# ─── CalDAV 헬퍼 ───
def create_caldav_todo(
caldav_url: str,
username: str,
password: str,
title: str,
description: str = "",
due_days: int = 7,
) -> str | None:
"""Synology Calendar에 VTODO 생성, UID 반환"""
import uuid
from datetime import datetime, timedelta, timezone
import caldav
try:
client = caldav.DAVClient(url=caldav_url, username=username, password=password)
principal = client.principal()
calendars = principal.calendars()
if not calendars:
return None
calendar = calendars[0]
uid = str(uuid.uuid4())
due = datetime.now(timezone.utc) + timedelta(days=due_days)
due_str = due.strftime("%Y%m%dT%H%M%SZ")
vtodo = f"""BEGIN:VCALENDAR
VERSION:2.0
BEGIN:VTODO
UID:{uid}
SUMMARY:{title}
DESCRIPTION:{description}
DUE:{due_str}
STATUS:NEEDS-ACTION
PRIORITY:5
END:VTODO
END:VCALENDAR"""
calendar.save_event(vtodo)
return uid
except Exception as e:
logging.getLogger("caldav").error(f"CalDAV VTODO 생성 실패: {e}")
return None
# ─── SMTP 헬퍼 ───
def send_smtp_email(
host: str,
port: int,
username: str,
password: str,
subject: str,
body: str,
to_addr: str | None = None,
):
"""Synology MailPlus SMTP로 이메일 발송"""
import smtplib
from email.mime.text import MIMEText
to_addr = to_addr or username
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = username
msg["To"] = to_addr
try:
with smtplib.SMTP_SSL(host, port, timeout=30) as server:
server.login(username, password)
server.send_message(msg)
except Exception as e:
logging.getLogger("smtp").error(f"SMTP 발송 실패: {e}")

View File

@@ -19,16 +19,26 @@ from models.user import User
async def lifespan(app: FastAPI):
"""앱 시작/종료 시 실행되는 lifespan 핸들러"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from workers.daily_digest import run as daily_digest_run
from workers.file_watcher import watch_inbox
from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run
from workers.queue_consumer import consume_queue
# 시작: DB 연결 확인
await init_db()
# APScheduler: 백그라운드 작업
scheduler = AsyncIOScheduler()
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
# 상시 실행
scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer")
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
# 일일 스케줄 (KST)
scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor")
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
scheduler.start()
yield

20
app/models/automation.py Normal file
View File

@@ -0,0 +1,20 @@
"""automation_state 테이블 ORM — 자동화 워커 증분 동기화 상태"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class AutomationState(Base):
__tablename__ = "automation_state"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
job_name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
last_check_value: Mapped[str | None] = mapped_column(Text)
last_run_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, onupdate=datetime.now
)

146
app/workers/daily_digest.py Normal file
View File

@@ -0,0 +1,146 @@
"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown + SMTP
v1 scripts/pkm_daily_digest.py에서 포팅.
DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
"""
import os
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import func, select, text
from core.config import settings
from core.database import async_session
from core.utils import send_smtp_email, setup_logger
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("daily_digest")
async def run():
"""일일 다이제스트 생성 + 저장 + 발송"""
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
sections = []
async with async_session() as session:
# ─── 1. 오늘 추가된 문서 ───
added = await session.execute(
select(Document.ai_domain, func.count(Document.id))
.where(func.date(Document.created_at) == today)
.group_by(Document.ai_domain)
)
added_rows = added.all()
total_added = sum(row[1] for row in added_rows)
section = f"## 오늘 추가된 문서 ({total_added}건)\n"
if added_rows:
for domain, count in added_rows:
section += f"- {domain or '미분류'}: {count}\n"
else:
section += "- 없음\n"
sections.append(section)
# ─── 2. 법령 변경 ───
law_docs = await session.execute(
select(Document.title)
.where(
Document.source_channel == "law_monitor",
func.date(Document.created_at) == today,
)
)
law_rows = law_docs.scalars().all()
section = f"## 법령 변경 ({len(law_rows)}건)\n"
if law_rows:
for title in law_rows:
section += f"- {title}\n"
else:
section += "- 변경 없음\n"
sections.append(section)
# ─── 3. 이메일 수집 ───
email_count = await session.execute(
select(func.count(Document.id))
.where(
Document.source_channel == "email",
func.date(Document.created_at) == today,
)
)
email_total = email_count.scalar() or 0
sections.append(f"## 이메일 수집\n- {email_total}건 아카이브\n")
# ─── 4. 처리 파이프라인 상태 ───
queue_stats = await session.execute(
text("""
SELECT stage, status, COUNT(*)
FROM processing_queue
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY stage, status
ORDER BY stage, status
""")
)
queue_rows = queue_stats.all()
section = "## 파이프라인 상태 (24h)\n"
if queue_rows:
for stage, status, count in queue_rows:
section += f"- {stage}/{status}: {count}\n"
else:
section += "- 처리 항목 없음\n"
# 실패 건수 강조
failed = await session.execute(
select(func.count())
.select_from(ProcessingQueue)
.where(
ProcessingQueue.status == "failed",
ProcessingQueue.created_at > text("NOW() - INTERVAL '24 hours'"),
)
)
failed_count = failed.scalar() or 0
if failed_count > 0:
section += f"\n⚠️ **실패 {failed_count}건** — 수동 확인 필요\n"
sections.append(section)
# ─── 5. Inbox 미분류 ───
inbox_count = await session.execute(
select(func.count(Document.id))
.where(Document.file_path.like("PKM/Inbox/%"))
)
inbox_total = inbox_count.scalar() or 0
if inbox_total > 0:
sections.append(f"## Inbox 미분류\n- {inbox_total}건 대기 중\n")
# ─── Markdown 조합 ───
date_display = datetime.now(timezone.utc).strftime("%Y년 %m월 %d")
markdown = f"# PKM 일일 다이제스트 — {date_display}\n\n"
markdown += "\n".join(sections)
markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n"
# ─── NAS 저장 ───
digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
# ─── 90일 초과 아카이브 ───
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
# ─── SMTP 발송 ───
smtp_host = os.getenv("MAILPLUS_HOST", "")
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
smtp_user = os.getenv("MAILPLUS_USER", "")
smtp_pass = os.getenv("MAILPLUS_PASS", "")
if smtp_host and smtp_user:
send_smtp_email(
smtp_host, smtp_port, smtp_user, smtp_pass,
f"PKM 다이제스트 — {date_display}",
markdown,
)
logger.info(f"다이제스트 생성 완료: {digest_path}")

192
app/workers/law_monitor.py Normal file
View File

@@ -0,0 +1,192 @@
"""법령 모니터 워커 — 국가법령정보센터 API로 법령 변경 감지
LAW_OC 승인 대기 중 — 코드 완성, 실 호출은 승인 후.
v1 scripts/law_monitor.py에서 포팅.
"""
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from xml.etree import ElementTree as ET
import httpx
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import create_caldav_todo, file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("law_monitor")
LAW_API_BASE = "https://www.law.go.kr"
# 모니터링 대상 법령 (Tier 1: 핵심)
TIER1_LAWS = [
"산업안전보건법",
"산업안전보건법 시행령",
"산업안전보건법 시행규칙",
"중대재해 처벌 등에 관한 법률",
"중대재해 처벌 등에 관한 법률 시행령",
"건설기술 진흥법",
"건설기술 진흥법 시행령",
"위험물안전관리법",
"화학물질관리법",
"소방시설 설치 및 관리에 관한 법률",
"전기사업법",
"고압가스 안전관리법",
]
async def run():
"""법령 변경 모니터링 실행"""
law_oc = os.getenv("LAW_OC", "")
if not law_oc:
logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중")
return
async with async_session() as session:
# 마지막 체크 날짜 조회
state = await session.execute(
select(AutomationState).where(AutomationState.job_name == "law_monitor")
)
state_row = state.scalar_one_or_none()
last_check = state_row.last_check_value if state_row else None
today = datetime.now(timezone.utc).strftime("%Y%m%d")
if last_check == today:
logger.info("오늘 이미 체크 완료")
return
new_count = 0
async with httpx.AsyncClient(timeout=30) as client:
for law_name in TIER1_LAWS:
try:
changed = await _check_law(client, law_oc, law_name, session)
if changed:
new_count += 1
except Exception as e:
logger.error(f"[{law_name}] 체크 실패: {e}")
# 상태 업데이트
if state_row:
state_row.last_check_value = today
state_row.last_run_at = datetime.now(timezone.utc)
else:
session.add(AutomationState(
job_name="law_monitor",
last_check_value=today,
last_run_at=datetime.now(timezone.utc),
))
await session.commit()
logger.info(f"법령 모니터 완료: {new_count}건 변경 감지")
async def _check_law(
client: httpx.AsyncClient,
law_oc: str,
law_name: str,
session,
) -> bool:
"""단일 법령 변경 체크 → 변경 시 NAS 저장 + DB 등록 + CalDAV"""
# 법령 정보 조회
resp = await client.get(
f"{LAW_API_BASE}/DRF/lawService.do",
params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
)
resp.raise_for_status()
root = ET.fromstring(resp.text)
total = root.findtext(".//totalCnt", "0")
if total == "0":
return False
# 첫 번째 결과의 MST (법령 고유번호)
mst = root.findtext(".//법령MST", "")
proclamation_date = root.findtext(".//공포일자", "")
if not mst:
return False
# 이미 등록된 법령인지 확인 (같은 공포일자)
check_path = f"PKM/Knowledge/Industrial_Safety/Legislation/{law_name}_{proclamation_date}.md"
existing = await session.execute(
select(Document).where(Document.file_path == check_path)
)
if existing.scalar_one_or_none():
return False
# 법령 본문 조회
text_resp = await client.get(
f"{LAW_API_BASE}/DRF/lawService.do",
params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
)
text_resp.raise_for_status()
# Markdown 변환
markdown = _law_xml_to_markdown(text_resp.text, law_name)
# NAS 저장
full_path = Path(settings.nas_mount_path) / check_path
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.write_text(markdown, encoding="utf-8")
# DB 등록
doc = Document(
file_path=check_path,
file_hash=file_hash(full_path),
file_format="md",
file_size=len(markdown.encode()),
file_type="immutable",
title=f"{law_name} ({proclamation_date})",
source_channel="law_monitor",
data_origin="work",
)
session.add(doc)
await session.flush()
# 처리 큐 등록
session.add(ProcessingQueue(
document_id=doc.id, stage="extract", status="pending",
))
# CalDAV 태스크 생성
caldav_url = os.getenv("CALDAV_URL", "")
caldav_user = os.getenv("CALDAV_USER", "")
caldav_pass = os.getenv("CALDAV_PASS", "")
if caldav_url and caldav_user:
create_caldav_todo(
caldav_url, caldav_user, caldav_pass,
title=f"법령 검토: {law_name}",
description=f"공포일자: {proclamation_date}\n경로: {check_path}",
due_days=7,
)
logger.info(f"[법령] {law_name} ({proclamation_date}) 등록 완료")
return True
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
"""법령 XML을 Markdown으로 변환 (장/조 구조)"""
root = ET.fromstring(xml_text)
lines = [f"# {law_name}\n"]
for article in root.iter("조문단위"):
num = article.findtext("조문번호", "")
title = article.findtext("조문제목", "")
content = article.findtext("조문내용", "")
if title:
lines.append(f"\n## 제{num}조 ({title})\n")
elif num:
lines.append(f"\n## 제{num}\n")
if content:
# HTML 태그 제거
clean = re.sub(r"<[^>]+>", "", content).strip()
lines.append(f"{clean}\n")
return "\n".join(lines)

View File

@@ -0,0 +1,197 @@
"""이메일 수집 워커 — Synology MailPlus IMAP → NAS 저장 + DB 등록
v1 scripts/mailplus_archive.py에서 포팅.
imaplib (동기)를 asyncio.to_thread()로 래핑.
"""
import asyncio
import email
import imaplib
import os
import re
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("mailplus_archive")
# 업무 키워드 (data_origin 자동 감지)
WORK_KEYWORDS = {"테크니컬코리아", "TK", "공장", "생산", "사내", "안전", "점검"}
def _decode_mime_header(raw: str) -> str:
"""MIME 헤더 디코딩"""
parts = decode_header(raw)
decoded = []
for data, charset in parts:
if isinstance(data, bytes):
decoded.append(data.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(data)
return "".join(decoded)
def _sanitize_filename(name: str, max_len: int = 80) -> str:
"""파일명에 사용 불가한 문자 제거"""
clean = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)
return clean[:max_len].strip()
def _detect_origin(subject: str, body: str) -> str:
"""work/external 자동 감지"""
text = f"{subject} {body[:500]}".lower()
for kw in WORK_KEYWORDS:
if kw.lower() in text:
return "work"
return "external"
def _fetch_emails_sync(host: str, port: int, user: str, password: str, last_uid: int | None):
"""동기 IMAP 메일 가져오기 (asyncio.to_thread에서 실행)"""
results = []
conn = imaplib.IMAP4_SSL(host, port, timeout=30)
try:
conn.login(user, password)
conn.select("INBOX")
if last_uid:
# 증분 동기화: last_uid 이후
_, data = conn.uid("search", None, f"UID {last_uid + 1}:*")
else:
# 최초 실행: 최근 7일
since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
_, data = conn.uid("search", None, f"SINCE {since}")
uids = data[0].split()
for uid_bytes in uids:
uid = int(uid_bytes)
_, msg_data = conn.uid("fetch", uid_bytes, "(RFC822)")
if msg_data[0] is None:
continue
raw = msg_data[0][1]
results.append((uid, raw))
finally:
conn.logout()
return results
async def run():
"""이메일 수집 실행"""
host = os.getenv("MAILPLUS_HOST", "")
port = int(os.getenv("MAILPLUS_PORT", "993"))
user = os.getenv("MAILPLUS_USER", "")
password = os.getenv("MAILPLUS_PASS", "")
if not all([host, user, password]):
logger.warning("MailPlus 인증 정보 미설정")
return
async with async_session() as session:
# 마지막 UID 조회
state = await session.execute(
select(AutomationState).where(AutomationState.job_name == "mailplus")
)
state_row = state.scalar_one_or_none()
last_uid = int(state_row.last_check_value) if state_row and state_row.last_check_value else None
# IMAP 동기 호출을 비동기로 래핑
try:
emails = await asyncio.to_thread(
_fetch_emails_sync, host, port, user, password, last_uid,
)
except Exception as e:
logger.error(f"IMAP 연결 실패: {e}")
return
if not emails:
logger.info("새 이메일 없음")
return
# 이메일 저장 디렉토리
email_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "emails"
email_dir.mkdir(parents=True, exist_ok=True)
max_uid = last_uid or 0
archived = []
for uid, raw_bytes in emails:
try:
msg = email.message_from_bytes(raw_bytes)
subject = _decode_mime_header(msg.get("Subject", "제목없음"))
date_str = msg.get("Date", "")
date = datetime.now().strftime("%Y%m%d")
# .eml 파일 저장
safe_subject = _sanitize_filename(subject)
filename = f"{date}_{uid}_{safe_subject}.eml"
eml_path = email_dir / filename
eml_path.write_bytes(raw_bytes)
# 본문 추출 (텍스트 파트)
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
break
else:
body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
# DB 등록
rel_path = str(eml_path.relative_to(Path(settings.nas_mount_path)))
origin = _detect_origin(subject, body)
doc = Document(
file_path=rel_path,
file_hash=file_hash(eml_path),
file_format="eml",
file_size=len(raw_bytes),
file_type="immutable",
title=subject,
source_channel="email",
data_origin=origin,
)
session.add(doc)
await session.flush()
session.add(ProcessingQueue(
document_id=doc.id, stage="extract", status="pending",
))
archived.append(subject)
max_uid = max(max_uid, uid)
except Exception as e:
logger.error(f"UID {uid} 처리 실패: {e}")
# 상태 업데이트
if state_row:
state_row.last_check_value = str(max_uid)
state_row.last_run_at = datetime.now(timezone.utc)
else:
session.add(AutomationState(
job_name="mailplus",
last_check_value=str(max_uid),
last_run_at=datetime.now(timezone.utc),
))
await session.commit()
# SMTP 알림
smtp_host = os.getenv("MAILPLUS_HOST", "")
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
if archived and smtp_host:
body = f"이메일 {len(archived)}건 수집 완료:\n\n" + "\n".join(f"- {s}" for s in archived)
send_smtp_email(smtp_host, smtp_port, user, password, "PKM 이메일 수집 알림", body)
logger.info(f"이메일 {len(archived)}건 수집 완료 (max_uid={max_uid})")

View File

@@ -0,0 +1,8 @@
-- 자동화 워커 상태 저장 (증분 동기화용)
CREATE TABLE automation_state (
id BIGSERIAL PRIMARY KEY,
job_name VARCHAR(50) NOT NULL UNIQUE,
last_check_value TEXT,
last_run_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT NOW()
);