Files
hyungi 3565ef9ac4 fix(digest): daily_digest 산출물 이모지 제거 — no-emoji 규칙 (R11a)
실패 강조 라인의 ⚠️ → **[주의]** 텍스트 마커. 산출물(다이제스트 markdown) no-emoji 준수.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:03:31 +09:00

149 lines
5.8 KiB
Python

"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown 생성
v1 scripts/pkm_daily_digest.py에서 포팅.
DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
SMTP 발송은 2026-06-10 제거 (한 번도 전달 성공한 적 없는 기능 — 폐기 결정).
"""
import asyncio
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo
from pathlib import Path
from sqlalchemy import func, select, text
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("daily_digest")
def _write_and_rotate(digest_dir: Path, today: str, markdown: str) -> Path:
"""digest 파일 저장 + 90일 초과 아카이브 이동 (blocking — caller 가 to_thread, R8)."""
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
return digest_path
async def run():
"""일일 다이제스트 생성 + 저장 + 발송"""
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire).
kst = ZoneInfo("Asia/Seoul")
today = datetime.now(kst).date()
# KST 하루를 UTC 범위로 변환 (R8) — func.date(created_at)는 pg TimeZone(UTC) 기준 날짜라
# KST 0~9시 생성 문서(UTC 전날)가 누락되던 경계 버그. created_at(UTC저장) 범위 비교로.
start_utc = datetime.combine(today, time.min, tzinfo=kst).astimezone(timezone.utc)
end_utc = start_utc + timedelta(days=1)
sections = []
async with async_session() as session:
# ─── 1. 오늘 추가된 문서 ───
added = await session.execute(
select(Document.ai_domain, func.count(Document.id))
.where(Document.created_at >= start_utc, Document.created_at < end_utc)
.group_by(Document.ai_domain)
)
added_rows = added.all()
total_added = sum(row[1] for row in added_rows)
section = f"## 오늘 추가된 문서 ({total_added}건)\n"
if added_rows:
for domain, count in added_rows:
section += f"- {domain or '미분류'}: {count}\n"
else:
section += "- 없음\n"
sections.append(section)
# ─── 2. 법령 변경 ───
law_docs = await session.execute(
select(Document.title)
.where(
Document.source_channel == "law_monitor",
Document.created_at >= start_utc,
Document.created_at < end_utc,
)
)
law_rows = law_docs.scalars().all()
section = f"## 법령 변경 ({len(law_rows)}건)\n"
if law_rows:
for title in law_rows:
section += f"- {title}\n"
else:
section += "- 변경 없음\n"
sections.append(section)
# ─── 3. 이메일 수집 ───
email_count = await session.execute(
select(func.count(Document.id))
.where(
Document.source_channel == "email",
Document.created_at >= start_utc,
Document.created_at < end_utc,
)
)
email_total = email_count.scalar() or 0
sections.append(f"## 이메일 수집\n- {email_total}건 아카이브\n")
# ─── 4. 처리 파이프라인 상태 ───
queue_stats = await session.execute(
text("""
SELECT stage, status, COUNT(*)
FROM processing_queue
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY stage, status
ORDER BY stage, status
""")
)
queue_rows = queue_stats.all()
section = "## 파이프라인 상태 (24h)\n"
if queue_rows:
for stage, status, count in queue_rows:
section += f"- {stage}/{status}: {count}\n"
else:
section += "- 처리 항목 없음\n"
# 실패 건수 강조
failed = await session.execute(
select(func.count())
.select_from(ProcessingQueue)
.where(
ProcessingQueue.status == "failed",
ProcessingQueue.created_at > text("NOW() - INTERVAL '24 hours'"),
)
)
failed_count = failed.scalar() or 0
if failed_count > 0:
section += f"\n**[주의] 실패 {failed_count}건** — 수동 확인 필요\n"
sections.append(section)
# ─── 5. Inbox 미분류 ───
inbox_count = await session.execute(
select(func.count(Document.id))
.where(Document.file_path.like("PKM/Inbox/%"))
)
inbox_total = inbox_count.scalar() or 0
if inbox_total > 0:
sections.append(f"## Inbox 미분류\n- {inbox_total}건 대기 중\n")
# ─── Markdown 조합 ───
date_display = datetime.now(timezone.utc).strftime("%Y년 %m월 %d")
markdown = f"# PKM 일일 다이제스트 — {date_display}\n\n"
markdown += "\n".join(sections)
markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n"
# ─── NAS 저장 + 90일 아카이브 (blocking 파일 I/O off-thread, R8/R5 일관) ───
digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
digest_path = await asyncio.to_thread(_write_and_rotate, digest_dir, str(today), markdown)
logger.info(f"다이제스트 생성 완료: {digest_path}")