"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown 생성 v1 scripts/pkm_daily_digest.py에서 포팅. DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환. SMTP 발송은 2026-06-10 제거 (한 번도 전달 성공한 적 없는 기능 — 폐기 결정). """ import asyncio from datetime import datetime, time, timedelta, timezone from zoneinfo import ZoneInfo from pathlib import Path from sqlalchemy import func, select, text from core.config import settings from core.database import async_session from core.utils import setup_logger from models.document import Document from models.queue import ProcessingQueue logger = setup_logger("daily_digest") def _write_and_rotate(digest_dir: Path, today: str, markdown: str) -> Path: """digest 파일 저장 + 90일 초과 아카이브 이동 (blocking — caller 가 to_thread, R8).""" digest_dir.mkdir(parents=True, exist_ok=True) digest_path = digest_dir / f"{today}_digest.md" digest_path.write_text(markdown, encoding="utf-8") archive_dir = digest_dir / "archive" archive_dir.mkdir(exist_ok=True) cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400) for old in digest_dir.glob("*_digest.md"): if old.stat().st_mtime < cutoff: old.rename(archive_dir / old.name) return digest_path async def run(): """일일 다이제스트 생성 + 저장 + 발송""" # KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire). kst = ZoneInfo("Asia/Seoul") today = datetime.now(kst).date() # KST 하루를 UTC 범위로 변환 (R8) — func.date(created_at)는 pg TimeZone(UTC) 기준 날짜라 # KST 0~9시 생성 문서(UTC 전날)가 누락되던 경계 버그. created_at(UTC저장) 범위 비교로. start_utc = datetime.combine(today, time.min, tzinfo=kst).astimezone(timezone.utc) end_utc = start_utc + timedelta(days=1) sections = [] async with async_session() as session: # ─── 1. 오늘 추가된 문서 ─── added = await session.execute( select(Document.ai_domain, func.count(Document.id)) .where(Document.created_at >= start_utc, Document.created_at < end_utc) .group_by(Document.ai_domain) ) added_rows = added.all() total_added = sum(row[1] for row in added_rows) section = f"## 오늘 추가된 문서 ({total_added}건)\n" if added_rows: for domain, count in added_rows: section += f"- {domain or '미분류'}: {count}건\n" else: section += "- 없음\n" sections.append(section) # ─── 2. 법령 변경 ─── law_docs = await session.execute( select(Document.title) .where( Document.source_channel == "law_monitor", Document.created_at >= start_utc, Document.created_at < end_utc, ) ) law_rows = law_docs.scalars().all() section = f"## 법령 변경 ({len(law_rows)}건)\n" if law_rows: for title in law_rows: section += f"- {title}\n" else: section += "- 변경 없음\n" sections.append(section) # ─── 3. 이메일 수집 ─── email_count = await session.execute( select(func.count(Document.id)) .where( Document.source_channel == "email", Document.created_at >= start_utc, Document.created_at < end_utc, ) ) email_total = email_count.scalar() or 0 sections.append(f"## 이메일 수집\n- {email_total}건 아카이브\n") # ─── 4. 처리 파이프라인 상태 ─── queue_stats = await session.execute( text(""" SELECT stage, status, COUNT(*) FROM processing_queue WHERE created_at > NOW() - INTERVAL '24 hours' GROUP BY stage, status ORDER BY stage, status """) ) queue_rows = queue_stats.all() section = "## 파이프라인 상태 (24h)\n" if queue_rows: for stage, status, count in queue_rows: section += f"- {stage}/{status}: {count}건\n" else: section += "- 처리 항목 없음\n" # 실패 건수 강조 failed = await session.execute( select(func.count()) .select_from(ProcessingQueue) .where( ProcessingQueue.status == "failed", ProcessingQueue.created_at > text("NOW() - INTERVAL '24 hours'"), ) ) failed_count = failed.scalar() or 0 if failed_count > 0: section += f"\n**[주의] 실패 {failed_count}건** — 수동 확인 필요\n" sections.append(section) # ─── 5. Inbox 미분류 ─── inbox_count = await session.execute( select(func.count(Document.id)) .where(Document.file_path.like("PKM/Inbox/%")) ) inbox_total = inbox_count.scalar() or 0 if inbox_total > 0: sections.append(f"## Inbox 미분류\n- {inbox_total}건 대기 중\n") # ─── Markdown 조합 ─── date_display = datetime.now(timezone.utc).strftime("%Y년 %m월 %d일") markdown = f"# PKM 일일 다이제스트 — {date_display}\n\n" markdown += "\n".join(sections) markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n" # ─── NAS 저장 + 90일 아카이브 (blocking 파일 I/O off-thread, R8/R5 일관) ─── digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests" digest_path = await asyncio.to_thread(_write_and_rotate, digest_dir, str(today), markdown) logger.info(f"다이제스트 생성 완료: {digest_path}")