"""Phase 4 digest pipeline orchestration. Step: 1. AIClient 생성 2. 7일 window 로 documents 로드 (loader) 3. country 별 cluster_country (clustering) 4. cluster 별 select_for_llm (selection) 5. cluster 별 summarize_cluster_with_fallback (summarizer, LLM) 6. DELETE+INSERT 단일 트랜잭션 (idempotent) 7. start/end 로그 + generation_ms + fallback 비율 health metric """ import hashlib import time from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo from sqlalchemy import delete from ai.client import AIClient from core.database import async_session from core.utils import setup_logger from models.digest import DigestTopic, GlobalDigest from .clustering import LAMBDA, cluster_country from .loader import load_news_window from .selection import select_for_llm from .summarizer import summarize_cluster_with_fallback logger = setup_logger("digest_pipeline") WINDOW_DAYS = 7 KST = ZoneInfo("Asia/Seoul") def _kst_today() -> datetime: return datetime.now(KST).date() def _summary_hash(text: str) -> str: return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16] def _build_topic_row( country: str, rank: int, cluster: dict, selected: list[dict], llm_result: dict, primary_model: str, ) -> DigestTopic: """LLM 결과 + cluster 메타 → DigestTopic ORM 인스턴스. article_ids 는 코드가 cluster.members 에서 직접 주입 (LLM 생성 금지 → id 위조 불가). """ article_ids = [int(m["id"]) for m in cluster["members"]] centroid_sample = { "selected_doc_ids": [int(m["id"]) for m in selected], "summary_hashes": [_summary_hash(m.get("ai_summary") or "") for m in selected], } return DigestTopic( country=country, topic_rank=rank, topic_label=llm_result["topic_label"], summary=llm_result["summary"], article_ids=article_ids, article_count=len(article_ids), importance_score=float(cluster["importance_score"]), raw_weight_sum=float(cluster["raw_weight_sum"]), centroid_sample=centroid_sample, llm_model=primary_model, llm_fallback_used=bool(llm_result["llm_fallback_used"]), ) async def run_digest_pipeline() -> dict: """전체 파이프라인 실행. worker entry 에서 호출. 

    Returns:
        Execution stats dict: {llm_calls, fallback_used, total_topics, generation_ms, status}.
    """
    start = time.time()
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=WINDOW_DAYS)
    digest_date = _kst_today()

    logger.info(
        f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
        f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
    )

    docs_by_country = await load_news_window(window_start, window_end)
    if not docs_by_country:
        logger.warning("[global_digest] no news in the 7-day window, skipping digest generation")
        return {
            "llm_calls": 0,
            "fallback_used": 0,
            "total_topics": 0,
            "generation_ms": int((time.time() - start) * 1000),
        }

    client = AIClient()
    primary_model = client.ai.primary.model
    all_topic_rows: list[DigestTopic] = []
    stats = {"llm_calls": 0, "fallback_used": 0}

    try:
        for country, docs in docs_by_country.items():
            clusters = cluster_country(country, docs)
            if not clusters:
                continue  # sparse countries drop out automatically
            for rank, cluster in enumerate(clusters, start=1):
                selected = select_for_llm(cluster)
                stats["llm_calls"] += 1
                llm_result = await summarize_cluster_with_fallback(client, cluster, selected)
                if llm_result["llm_fallback_used"]:
                    stats["fallback_used"] += 1
                all_topic_rows.append(
                    _build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
                )
    finally:
        await client.close()

    generation_ms = int((time.time() - start) * 1000)
    total_articles = sum(len(d) for d in docs_by_country.values())
    countries_with_topics = len({r.country for r in all_topic_rows})

    if stats["fallback_used"] == 0:
        status = "success"
    elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
        status = "failed"
    else:
        status = "partial"

    async with async_session() as session:
        # Idempotent: if a row already exists for this date, the DELETE cascades to its topics.
        await session.execute(
            delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
        )
        new_digest = GlobalDigest(
            digest_date=digest_date,
            window_start=window_start,
            window_end=window_end,
            decay_lambda=LAMBDA,
            total_articles=total_articles,
            total_countries=countries_with_topics,
            total_topics=len(all_topic_rows),
            generation_ms=generation_ms,
            llm_calls=stats["llm_calls"],
            llm_failures=stats["fallback_used"],
            status=status,
        )
        new_digest.topics = all_topic_rows
        session.add(new_digest)
        await session.commit()

    fallback_pct = (
        (stats["fallback_used"] / stats["llm_calls"] * 100.0) if stats["llm_calls"] else 0.0
    )
    logger.info(
        f"[global_digest] done countries={countries_with_topics} "
        f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
        f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
        f"status={status} elapsed={generation_ms / 1000:.1f}s"
    )
    return {
        "llm_calls": stats["llm_calls"],
        "fallback_used": stats["fallback_used"],
        "total_topics": len(all_topic_rows),
        "generation_ms": generation_ms,
        "status": status,
    }
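

# --- Standalone invocation sketch -------------------------------------------
# A minimal way to drive the pipeline outside the regular worker, e.g. for a
# manual backfill or local testing. This block is an illustrative assumption:
# the actual worker entry point mentioned in run_digest_pipeline's docstring
# may schedule and await this coroutine differently (queue consumer, cron
# task, etc.).
if __name__ == "__main__":
    import asyncio

    _stats = asyncio.run(run_digest_pipeline())
    logger.info(f"[global_digest] standalone run stats={_stats}")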