diff --git a/app/workers/daily_digest.py b/app/workers/daily_digest.py index be68abd..5cc5ad3 100644 --- a/app/workers/daily_digest.py +++ b/app/workers/daily_digest.py @@ -5,7 +5,8 @@ DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환. SMTP 발송은 2026-06-10 제거 (한 번도 전달 성공한 적 없는 기능 — 폐기 결정). """ -from datetime import datetime, timezone +import asyncio +from datetime import datetime, time, timedelta, timezone from zoneinfo import ZoneInfo from pathlib import Path @@ -20,17 +21,36 @@ from models.queue import ProcessingQueue logger = setup_logger("daily_digest") +def _write_and_rotate(digest_dir: Path, today: str, markdown: str) -> Path: + """digest 파일 저장 + 90일 초과 아카이브 이동 (blocking — caller 가 to_thread, R8).""" + digest_dir.mkdir(parents=True, exist_ok=True) + digest_path = digest_dir / f"{today}_digest.md" + digest_path.write_text(markdown, encoding="utf-8") + archive_dir = digest_dir / "archive" + archive_dir.mkdir(exist_ok=True) + cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400) + for old in digest_dir.glob("*_digest.md"): + if old.stat().st_mtime < cutoff: + old.rename(archive_dir / old.name) + return digest_path + + async def run(): """일일 다이제스트 생성 + 저장 + 발송""" - # KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire). date 객체로 비교 — Document.created_at::date 와 직접 매칭. - today = datetime.now(ZoneInfo("Asia/Seoul")).date() + # KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire). + kst = ZoneInfo("Asia/Seoul") + today = datetime.now(kst).date() + # KST 하루를 UTC 범위로 변환 (R8) — func.date(created_at)는 pg TimeZone(UTC) 기준 날짜라 + # KST 0~9시 생성 문서(UTC 전날)가 누락되던 경계 버그. created_at(UTC저장) 범위 비교로. + start_utc = datetime.combine(today, time.min, tzinfo=kst).astimezone(timezone.utc) + end_utc = start_utc + timedelta(days=1) sections = [] async with async_session() as session: # ─── 1. 오늘 추가된 문서 ─── added = await session.execute( select(Document.ai_domain, func.count(Document.id)) - .where(func.date(Document.created_at) == today) + .where(Document.created_at >= start_utc, Document.created_at < end_utc) .group_by(Document.ai_domain) ) added_rows = added.all() @@ -49,7 +69,8 @@ async def run(): select(Document.title) .where( Document.source_channel == "law_monitor", - func.date(Document.created_at) == today, + Document.created_at >= start_utc, + Document.created_at < end_utc, ) ) law_rows = law_docs.scalars().all() @@ -66,7 +87,8 @@ async def run(): select(func.count(Document.id)) .where( Document.source_channel == "email", - func.date(Document.created_at) == today, + Document.created_at >= start_utc, + Document.created_at < end_utc, ) ) email_total = email_count.scalar() or 0 @@ -119,18 +141,8 @@ async def run(): markdown += "\n".join(sections) markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n" - # ─── NAS 저장 ─── + # ─── NAS 저장 + 90일 아카이브 (blocking 파일 I/O off-thread, R8/R5 일관) ─── digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests" - digest_dir.mkdir(parents=True, exist_ok=True) - digest_path = digest_dir / f"{today}_digest.md" - digest_path.write_text(markdown, encoding="utf-8") - - # ─── 90일 초과 아카이브 ─── - archive_dir = digest_dir / "archive" - archive_dir.mkdir(exist_ok=True) - cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400) - for old in digest_dir.glob("*_digest.md"): - if old.stat().st_mtime < cutoff: - old.rename(archive_dir / old.name) + digest_path = await asyncio.to_thread(_write_and_rotate, digest_dir, str(today), markdown) logger.info(f"다이제스트 생성 완료: {digest_path}")