3565ef9ac4
실패 강조 라인의 ⚠️ → **[주의]** 텍스트 마커. 산출물(다이제스트 markdown) no-emoji 준수. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
149 lines
5.8 KiB
Python
149 lines
5.8 KiB
Python
"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown 생성
|
|
|
|
v1 scripts/pkm_daily_digest.py에서 포팅.
|
|
DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
|
|
SMTP 발송은 2026-06-10 제거 (한 번도 전달 성공한 적 없는 기능 — 폐기 결정).
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime, time, timedelta, timezone
|
|
from zoneinfo import ZoneInfo
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import func, select, text
|
|
|
|
from core.config import settings
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.queue import ProcessingQueue
|
|
|
|
logger = setup_logger("daily_digest")
|
|
|
|
|
|
def _write_and_rotate(digest_dir: Path, today: str, markdown: str) -> Path:
|
|
"""digest 파일 저장 + 90일 초과 아카이브 이동 (blocking — caller 가 to_thread, R8)."""
|
|
digest_dir.mkdir(parents=True, exist_ok=True)
|
|
digest_path = digest_dir / f"{today}_digest.md"
|
|
digest_path.write_text(markdown, encoding="utf-8")
|
|
archive_dir = digest_dir / "archive"
|
|
archive_dir.mkdir(exist_ok=True)
|
|
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
|
|
for old in digest_dir.glob("*_digest.md"):
|
|
if old.stat().st_mtime < cutoff:
|
|
old.rename(archive_dir / old.name)
|
|
return digest_path
|
|
|
|
|
|
async def run():
|
|
"""일일 다이제스트 생성 + 저장 + 발송"""
|
|
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire).
|
|
kst = ZoneInfo("Asia/Seoul")
|
|
today = datetime.now(kst).date()
|
|
# KST 하루를 UTC 범위로 변환 (R8) — func.date(created_at)는 pg TimeZone(UTC) 기준 날짜라
|
|
# KST 0~9시 생성 문서(UTC 전날)가 누락되던 경계 버그. created_at(UTC저장) 범위 비교로.
|
|
start_utc = datetime.combine(today, time.min, tzinfo=kst).astimezone(timezone.utc)
|
|
end_utc = start_utc + timedelta(days=1)
|
|
sections = []
|
|
|
|
async with async_session() as session:
|
|
# ─── 1. 오늘 추가된 문서 ───
|
|
added = await session.execute(
|
|
select(Document.ai_domain, func.count(Document.id))
|
|
.where(Document.created_at >= start_utc, Document.created_at < end_utc)
|
|
.group_by(Document.ai_domain)
|
|
)
|
|
added_rows = added.all()
|
|
total_added = sum(row[1] for row in added_rows)
|
|
|
|
section = f"## 오늘 추가된 문서 ({total_added}건)\n"
|
|
if added_rows:
|
|
for domain, count in added_rows:
|
|
section += f"- {domain or '미분류'}: {count}건\n"
|
|
else:
|
|
section += "- 없음\n"
|
|
sections.append(section)
|
|
|
|
# ─── 2. 법령 변경 ───
|
|
law_docs = await session.execute(
|
|
select(Document.title)
|
|
.where(
|
|
Document.source_channel == "law_monitor",
|
|
Document.created_at >= start_utc,
|
|
Document.created_at < end_utc,
|
|
)
|
|
)
|
|
law_rows = law_docs.scalars().all()
|
|
section = f"## 법령 변경 ({len(law_rows)}건)\n"
|
|
if law_rows:
|
|
for title in law_rows:
|
|
section += f"- {title}\n"
|
|
else:
|
|
section += "- 변경 없음\n"
|
|
sections.append(section)
|
|
|
|
# ─── 3. 이메일 수집 ───
|
|
email_count = await session.execute(
|
|
select(func.count(Document.id))
|
|
.where(
|
|
Document.source_channel == "email",
|
|
Document.created_at >= start_utc,
|
|
Document.created_at < end_utc,
|
|
)
|
|
)
|
|
email_total = email_count.scalar() or 0
|
|
sections.append(f"## 이메일 수집\n- {email_total}건 아카이브\n")
|
|
|
|
# ─── 4. 처리 파이프라인 상태 ───
|
|
queue_stats = await session.execute(
|
|
text("""
|
|
SELECT stage, status, COUNT(*)
|
|
FROM processing_queue
|
|
WHERE created_at > NOW() - INTERVAL '24 hours'
|
|
GROUP BY stage, status
|
|
ORDER BY stage, status
|
|
""")
|
|
)
|
|
queue_rows = queue_stats.all()
|
|
section = "## 파이프라인 상태 (24h)\n"
|
|
if queue_rows:
|
|
for stage, status, count in queue_rows:
|
|
section += f"- {stage}/{status}: {count}건\n"
|
|
else:
|
|
section += "- 처리 항목 없음\n"
|
|
|
|
# 실패 건수 강조
|
|
failed = await session.execute(
|
|
select(func.count())
|
|
.select_from(ProcessingQueue)
|
|
.where(
|
|
ProcessingQueue.status == "failed",
|
|
ProcessingQueue.created_at > text("NOW() - INTERVAL '24 hours'"),
|
|
)
|
|
)
|
|
failed_count = failed.scalar() or 0
|
|
if failed_count > 0:
|
|
section += f"\n**[주의] 실패 {failed_count}건** — 수동 확인 필요\n"
|
|
sections.append(section)
|
|
|
|
# ─── 5. Inbox 미분류 ───
|
|
inbox_count = await session.execute(
|
|
select(func.count(Document.id))
|
|
.where(Document.file_path.like("PKM/Inbox/%"))
|
|
)
|
|
inbox_total = inbox_count.scalar() or 0
|
|
if inbox_total > 0:
|
|
sections.append(f"## Inbox 미분류\n- {inbox_total}건 대기 중\n")
|
|
|
|
# ─── Markdown 조합 ───
|
|
date_display = datetime.now(timezone.utc).strftime("%Y년 %m월 %d일")
|
|
markdown = f"# PKM 일일 다이제스트 — {date_display}\n\n"
|
|
markdown += "\n".join(sections)
|
|
markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n"
|
|
|
|
# ─── NAS 저장 + 90일 아카이브 (blocking 파일 I/O off-thread, R8/R5 일관) ───
|
|
digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
|
|
digest_path = await asyncio.to_thread(_write_and_rotate, digest_dir, str(today), markdown)
|
|
|
|
logger.info(f"다이제스트 생성 완료: {digest_path}")
|