Files
hyungi_document_server/app/workers/daily_digest.py
Hyungi Ahn 31d5498f8d feat: implement Phase 3 automation workers
- Add automation_state table for incremental sync (last UID, last check)
- Add law_monitor worker: 국가법령정보센터 API → NAS/DB/CalDAV VTODO
  (LAW_OC 승인 대기 중, 코드 완성)
- Add mailplus_archive worker: IMAP(993) → .eml NAS save + DB + SMTP
  notification (imaplib via asyncio.to_thread, timeout=30)
- Add daily_digest worker: PostgreSQL/pipeline stats → Markdown + SMTP
  (documents, law changes, email, queue errors, inbox backlog)
- Add CalDAV VTODO helper and SMTP email helper to core/utils.py
- Wire 3 cron jobs in APScheduler (law@07:00, mail@07:00+18:00,
  digest@20:00) with timezone=Asia/Seoul

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:24:50 +09:00

147 lines
5.3 KiB
Python

"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown + SMTP
v1 scripts/pkm_daily_digest.py에서 포팅.
DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
"""
import os
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import func, select, text
from core.config import settings
from core.database import async_session
from core.utils import send_smtp_email, setup_logger
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("daily_digest")
async def run():
"""일일 다이제스트 생성 + 저장 + 발송"""
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
sections = []
async with async_session() as session:
# ─── 1. 오늘 추가된 문서 ───
added = await session.execute(
select(Document.ai_domain, func.count(Document.id))
.where(func.date(Document.created_at) == today)
.group_by(Document.ai_domain)
)
added_rows = added.all()
total_added = sum(row[1] for row in added_rows)
section = f"## 오늘 추가된 문서 ({total_added}건)\n"
if added_rows:
for domain, count in added_rows:
section += f"- {domain or '미분류'}: {count}\n"
else:
section += "- 없음\n"
sections.append(section)
# ─── 2. 법령 변경 ───
law_docs = await session.execute(
select(Document.title)
.where(
Document.source_channel == "law_monitor",
func.date(Document.created_at) == today,
)
)
law_rows = law_docs.scalars().all()
section = f"## 법령 변경 ({len(law_rows)}건)\n"
if law_rows:
for title in law_rows:
section += f"- {title}\n"
else:
section += "- 변경 없음\n"
sections.append(section)
# ─── 3. 이메일 수집 ───
email_count = await session.execute(
select(func.count(Document.id))
.where(
Document.source_channel == "email",
func.date(Document.created_at) == today,
)
)
email_total = email_count.scalar() or 0
sections.append(f"## 이메일 수집\n- {email_total}건 아카이브\n")
# ─── 4. 처리 파이프라인 상태 ───
queue_stats = await session.execute(
text("""
SELECT stage, status, COUNT(*)
FROM processing_queue
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY stage, status
ORDER BY stage, status
""")
)
queue_rows = queue_stats.all()
section = "## 파이프라인 상태 (24h)\n"
if queue_rows:
for stage, status, count in queue_rows:
section += f"- {stage}/{status}: {count}\n"
else:
section += "- 처리 항목 없음\n"
# 실패 건수 강조
failed = await session.execute(
select(func.count())
.select_from(ProcessingQueue)
.where(
ProcessingQueue.status == "failed",
ProcessingQueue.created_at > text("NOW() - INTERVAL '24 hours'"),
)
)
failed_count = failed.scalar() or 0
if failed_count > 0:
section += f"\n⚠️ **실패 {failed_count}건** — 수동 확인 필요\n"
sections.append(section)
# ─── 5. Inbox 미분류 ───
inbox_count = await session.execute(
select(func.count(Document.id))
.where(Document.file_path.like("PKM/Inbox/%"))
)
inbox_total = inbox_count.scalar() or 0
if inbox_total > 0:
sections.append(f"## Inbox 미분류\n- {inbox_total}건 대기 중\n")
# ─── Markdown 조합 ───
date_display = datetime.now(timezone.utc).strftime("%Y년 %m월 %d")
markdown = f"# PKM 일일 다이제스트 — {date_display}\n\n"
markdown += "\n".join(sections)
markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n"
# ─── NAS 저장 ───
digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
# ─── 90일 초과 아카이브 ───
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
# ─── SMTP 발송 ───
smtp_host = os.getenv("MAILPLUS_HOST", "")
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
smtp_user = os.getenv("MAILPLUS_USER", "")
smtp_pass = os.getenv("MAILPLUS_PASS", "")
if smtp_host and smtp_user:
send_smtp_email(
smtp_host, smtp_port, smtp_user, smtp_pass,
f"PKM 다이제스트 — {date_display}",
markdown,
)
logger.info(f"다이제스트 생성 완료: {digest_path}")