Files
hyungi_document_server/app/services/digest/pipeline.py
Hyungi Ahn 75a1919342 feat(digest): Phase 4 Global News Digest (cluster-level batch summarization)
7일 rolling window 뉴스를 country × topic 2-level로 묶어 매일 04:00 KST 배치 생성.
search 파이프라인 미사용. documents → clustering → cluster-level LLM summarization → digest.

핵심 결정:
- adaptive threshold (0.75/0.78/0.80) + EMA centroid (α=0.7) + time-decay (λ=ln(2)/3)
- min_articles=3, max_topics=10/country, top-5 MMR diversity, ai_summary[:300] truncate
- cluster-level LLM only, drop금지 fallback (topic_label="주요 뉴스 묶음" + top member ai_summary[:200])
- importance_score country별 0~1 normalize + raw_weight_sum 별도 보존, max(score, 0.01) floor
- per-call timeout 25s + pipeline hard cap 600s
- DELETE+INSERT idempotent (UNIQUE digest_date), AIClient._call_chat 직접 호출 (client.py 수정 없음)

신규:
- migrations/101_global_digests.sql (2테이블 정규화)
- app/models/digest.py (GlobalDigest + DigestTopic ORM)
- app/services/digest/{loader,clustering,selection,summarizer,pipeline}.py
- app/workers/digest_worker.py (PIPELINE_HARD_CAP + CLI 진입점)
- app/api/digest.py (/latest, ?date|country, /regenerate, inline Pydantic)
- app/prompts/digest_topic.txt (JSON-only + 절대 금지 블록)

main.py 4줄: import 2 + scheduler add_job 1 + include_router 1.
plan: ~/.claude/plans/quiet-herding-tome.md
2026-04-09 07:45:11 +09:00

178 lines
5.9 KiB
Python

"""Phase 4 digest pipeline orchestration.
Step:
1. AIClient 생성
2. 7일 window 로 documents 로드 (loader)
3. country 별 cluster_country (clustering)
4. cluster 별 select_for_llm (selection)
5. cluster 별 summarize_cluster_with_fallback (summarizer, LLM)
6. DELETE+INSERT 단일 트랜잭션 (idempotent)
7. start/end 로그 + generation_ms + fallback 비율 health metric
"""
import hashlib
import time
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from sqlalchemy import delete
from ai.client import AIClient
from core.database import async_session
from core.utils import setup_logger
from models.digest import DigestTopic, GlobalDigest
from .clustering import LAMBDA, cluster_country
from .loader import load_news_window
from .selection import select_for_llm
from .summarizer import summarize_cluster_with_fallback
logger = setup_logger("digest_pipeline")
WINDOW_DAYS = 7
KST = ZoneInfo("Asia/Seoul")
def _kst_today() -> datetime:
return datetime.now(KST).date()
def _summary_hash(text: str) -> str:
return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16]
def _build_topic_row(
country: str,
rank: int,
cluster: dict,
selected: list[dict],
llm_result: dict,
primary_model: str,
) -> DigestTopic:
"""LLM 결과 + cluster 메타 → DigestTopic ORM 인스턴스.
article_ids 는 코드가 cluster.members 에서 직접 주입 (LLM 생성 금지 → id 위조 불가).
"""
article_ids = [int(m["id"]) for m in cluster["members"]]
centroid_sample = {
"selected_doc_ids": [int(m["id"]) for m in selected],
"summary_hashes": [_summary_hash(m.get("ai_summary") or "") for m in selected],
}
return DigestTopic(
country=country,
topic_rank=rank,
topic_label=llm_result["topic_label"],
summary=llm_result["summary"],
article_ids=article_ids,
article_count=len(article_ids),
importance_score=float(cluster["importance_score"]),
raw_weight_sum=float(cluster["raw_weight_sum"]),
centroid_sample=centroid_sample,
llm_model=primary_model,
llm_fallback_used=bool(llm_result["llm_fallback_used"]),
)
async def run_digest_pipeline() -> dict:
"""전체 파이프라인 실행. worker entry 에서 호출.
Returns:
실행 통계 dict {llm_calls, fallback_used, total_topics, generation_ms}
"""
start = time.time()
window_end = datetime.now(timezone.utc)
window_start = window_end - timedelta(days=WINDOW_DAYS)
digest_date = _kst_today()
logger.info(
f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
)
docs_by_country = await load_news_window(window_start, window_end)
if not docs_by_country:
logger.warning("[global_digest] 7일 window에 뉴스 0건 — digest 생성 스킵")
return {
"llm_calls": 0,
"fallback_used": 0,
"total_topics": 0,
"generation_ms": int((time.time() - start) * 1000),
}
client = AIClient()
primary_model = client.ai.primary.model
all_topic_rows: list[DigestTopic] = []
stats = {"llm_calls": 0, "fallback_used": 0}
try:
for country, docs in docs_by_country.items():
clusters = cluster_country(country, docs)
if not clusters:
continue # sparse country 자동 제외
for rank, cluster in enumerate(clusters, start=1):
selected = select_for_llm(cluster)
stats["llm_calls"] += 1
llm_result = await summarize_cluster_with_fallback(client, cluster, selected)
if llm_result["llm_fallback_used"]:
stats["fallback_used"] += 1
all_topic_rows.append(
_build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
)
finally:
await client.close()
generation_ms = int((time.time() - start) * 1000)
total_articles = sum(len(d) for d in docs_by_country.values())
countries_with_topics = len({r.country for r in all_topic_rows})
if stats["fallback_used"] == 0:
status = "success"
elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
status = "failed"
else:
status = "partial"
async with async_session() as session:
# idempotent: 같은 날짜 row 가 있으면 CASCADE 로 topics 까지 삭제
await session.execute(
delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
)
new_digest = GlobalDigest(
digest_date=digest_date,
window_start=window_start,
window_end=window_end,
decay_lambda=LAMBDA,
total_articles=total_articles,
total_countries=countries_with_topics,
total_topics=len(all_topic_rows),
generation_ms=generation_ms,
llm_calls=stats["llm_calls"],
llm_failures=stats["fallback_used"],
status=status,
)
new_digest.topics = all_topic_rows
session.add(new_digest)
await session.commit()
fallback_pct = (
(stats["fallback_used"] / stats["llm_calls"] * 100.0)
if stats["llm_calls"] else 0.0
)
logger.info(
f"[global_digest] done countries={countries_with_topics} "
f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
f"status={status} elapsed={generation_ms / 1000:.1f}s"
)
return {
"llm_calls": stats["llm_calls"],
"fallback_used": stats["fallback_used"],
"total_topics": len(all_topic_rows),
"generation_ms": generation_ms,
"status": status,
}