- Add automation_state table for incremental sync (last UID, last check) - Add law_monitor worker: 국가법령정보센터 API → NAS/DB/CalDAV VTODO (LAW_OC 승인 대기 중, 코드 완성) - Add mailplus_archive worker: IMAP(993) → .eml NAS save + DB + SMTP notification (imaplib via asyncio.to_thread, timeout=30) - Add daily_digest worker: PostgreSQL/pipeline stats → Markdown + SMTP (documents, law changes, email, queue errors, inbox backlog) - Add CalDAV VTODO helper and SMTP email helper to core/utils.py - Wire 3 cron jobs in APScheduler (law@07:00, mail@07:00+18:00, digest@20:00) with timezone=Asia/Seoul Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
147 lines
5.3 KiB
Python
147 lines
5.3 KiB
Python
"""일일 다이제스트 워커 — PostgreSQL 쿼리 → Markdown (NAS 저장) + SMTP 발송

v1 scripts/pkm_daily_digest.py에서 포팅.
DEVONthink/OmniFocus 조회를 PostgreSQL 쿼리로 전환 (CalDAV 조회는 아직 없음).
"""
|
|
|
|
import asyncio
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path

from sqlalchemy import func, select, text

from core.config import settings
from core.database import async_session
from core.utils import send_smtp_email, setup_logger
from models.document import Document
from models.queue import ProcessingQueue
|
|
|
|
# Module-level logger used by this worker, created via the project helper
# core.utils.setup_logger (its handler/format configuration is defined there).
logger = setup_logger("daily_digest")
|
|
|
|
|
|
# Korea observes no DST, so a fixed UTC+9 offset is exact and needs no tz database.
_KST = timezone(timedelta(hours=9), "KST")

# Start of the reporting window, evaluated by PostgreSQL. NOW() is fixed for
# the whole transaction, so every query in one run sees the same window.
_WINDOW_START_SQL = "NOW() - INTERVAL '24 hours'"

# Digest files whose mtime is older than this many days are moved to archive/.
_ARCHIVE_AFTER_DAYS = 90


async def run():
    """Generate the daily digest, save it to the NAS, and email it.

    Every section covers the rolling 24-hour window ending at the database's
    ``NOW()``. The job is scheduled once a day, so consecutive runs tile time
    with no gaps and no double counting, whatever the DB session time zone.
    (Filtering on the UTC calendar date silently dropped everything created
    between the 20:00 KST run and 09:00 KST the next morning.) The date in
    the title and file name is the local KST date.
    """
    # Read the clock once so the title, file name and footer always agree.
    now = datetime.now(timezone.utc)
    local_now = now.astimezone(_KST)
    today = local_now.strftime("%Y-%m-%d")
    date_display = local_now.strftime("%Y년 %m월 %d일")

    async with async_session() as session:
        sections = await _collect_sections(session)

    markdown = (
        f"# PKM 일일 다이제스트 — {date_display}\n\n"
        + "\n".join(sections)
        + f"\n---\n*생성: {now.isoformat()}*\n"
    )

    # NAS I/O is blocking (possibly over a network mount); keep it off the loop.
    digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
    digest_path = await asyncio.to_thread(
        _save_digest, digest_dir, f"{today}_digest.md", markdown
    )
    cutoff = now.timestamp() - _ARCHIVE_AFTER_DAYS * 86400
    await asyncio.to_thread(_archive_old_digests, digest_dir, cutoff)

    await _send_digest_email(f"PKM 다이제스트 — {date_display}", markdown)

    logger.info(f"다이제스트 생성 완료: {digest_path}")


async def _collect_sections(session):
    """Run the digest queries on *session* and return rendered Markdown sections.

    *session* is the async session yielded by ``async_session()``.
    """
    since = text(_WINDOW_START_SQL)
    sections = []

    # ─── 1. Documents added, grouped by AI-assigned domain ───
    added = await session.execute(
        select(Document.ai_domain, func.count(Document.id))
        .where(Document.created_at > since)
        .group_by(Document.ai_domain)
    )
    added_rows = added.all()
    total_added = sum(count for _, count in added_rows)
    sections.append(
        _bullet_section(
            f"## 오늘 추가된 문서 ({total_added}건)",
            [f"{domain or '미분류'}: {count}건" for domain, count in added_rows],
            "없음",
        )
    )

    # ─── 2. Law changes recorded by the law_monitor worker ───
    law_docs = await session.execute(
        select(Document.title).where(
            Document.source_channel == "law_monitor",
            Document.created_at > since,
        )
    )
    law_titles = [f"{title}" for title in law_docs.scalars().all()]
    sections.append(
        _bullet_section(f"## 법령 변경 ({len(law_titles)}건)", law_titles, "변경 없음")
    )

    # ─── 3. Archived e-mail ───
    email_count = await session.execute(
        select(func.count(Document.id)).where(
            Document.source_channel == "email",
            Document.created_at > since,
        )
    )
    email_total = email_count.scalar() or 0
    sections.append(_bullet_section("## 이메일 수집", [f"{email_total}건 아카이브"]))

    # ─── 4. Processing pipeline status (same window as _WINDOW_START_SQL) ───
    queue_stats = await session.execute(
        text("""
            SELECT stage, status, COUNT(*)
            FROM processing_queue
            WHERE created_at > NOW() - INTERVAL '24 hours'
            GROUP BY stage, status
            ORDER BY stage, status
        """)
    )
    pipeline = _bullet_section(
        "## 파이프라인 상태 (24h)",
        [f"{stage}/{status}: {count}건" for stage, status, count in queue_stats.all()],
        "처리 항목 없음",
    )

    # Failures get a separate, highlighted call-out.
    failed = await session.execute(
        select(func.count())
        .select_from(ProcessingQueue)
        .where(
            ProcessingQueue.status == "failed",
            ProcessingQueue.created_at > since,
        )
    )
    failed_count = failed.scalar() or 0
    if failed_count > 0:
        pipeline += f"\n⚠️ **실패 {failed_count}건** — 수동 확인 필요\n"
    sections.append(pipeline)

    # ─── 5. Inbox backlog (not time-windowed; shown only when non-empty) ───
    inbox_count = await session.execute(
        select(func.count(Document.id)).where(Document.file_path.like("PKM/Inbox/%"))
    )
    inbox_total = inbox_count.scalar() or 0
    if inbox_total > 0:
        sections.append(_bullet_section("## Inbox 미분류", [f"{inbox_total}건 대기 중"]))

    return sections


def _bullet_section(heading, lines, empty_line="없음"):
    """Render *heading* followed by one ``- `` bullet per entry of *lines*.

    When *lines* is empty, a single ``- {empty_line}`` bullet is emitted instead.
    """
    bullets = lines or [empty_line]
    return heading + "\n" + "".join(f"- {line}\n" for line in bullets)


def _save_digest(digest_dir, filename, markdown):
    """Write *markdown* to ``digest_dir / filename`` (creating dirs) and return the path."""
    digest_dir.mkdir(parents=True, exist_ok=True)
    digest_path = digest_dir / filename
    digest_path.write_text(markdown, encoding="utf-8")
    return digest_path


def _archive_old_digests(digest_dir, cutoff):
    """Move ``*_digest.md`` files in *digest_dir* with mtime < *cutoff* into ``archive/``.

    A failure on one file is logged and skipped. Archiving is housekeeping and
    must not stop the digest from being emailed.
    """
    archive_dir = digest_dir / "archive"
    archive_dir.mkdir(exist_ok=True)
    for old in digest_dir.glob("*_digest.md"):
        try:
            if old.stat().st_mtime < cutoff:
                old.rename(archive_dir / old.name)
        except OSError as exc:
            logger.warning(f"다이제스트 아카이브 실패: {old} ({exc})")


async def _send_digest_email(subject, body):
    """Email *body* via MailPlus SMTP when MAILPLUS_HOST and MAILPLUS_USER are set."""
    smtp_host = os.getenv("MAILPLUS_HOST", "")
    smtp_user = os.getenv("MAILPLUS_USER", "")
    if not (smtp_host and smtp_user):
        return
    # `or` also covers a variable that is set but empty, which int() rejects.
    smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT") or "465")
    smtp_pass = os.getenv("MAILPLUS_PASS", "")
    # send_smtp_email is a blocking helper (it is called without await), so
    # run it in a worker thread to keep the scheduler's event loop responsive.
    await asyncio.to_thread(
        send_smtp_email, smtp_host, smtp_port, smtp_user, smtp_pass, subject, body
    )
|