Files
hyungi_document_server/app/services/digest/pipeline.py
T
hyungi f325bd0509 feat(observability): digest/briefing 을 처리 보드에 맥미니 작업으로 노출 (background_jobs)
큐 밖 cron 생성 작업(global_digest/morning_briefing)이 processing_queue stage 가
아니라 보드에 안 잡혀, 맥미니가 11분짜리 digest 를 돌려도 idle 처럼 보였다.
ebbcaf8 의 background_jobs 메커니즘 재사용:
- digest_worker/briefing_worker = start_job→finish_job (best-effort, 본작업 무해)
- pipeline = cluster 완료마다 heartbeat(processed/total) → 진행바
- queue_overview = kind→machine 맵으로 payload 에 machine 필드 (맥미니 귀속)
- 보드 = 머신 레인에 dot 점등 + "생성 중: <label> N/T" 표시

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 03:36:57 +00:00

198 lines
6.7 KiB
Python

"""Phase 4 digest pipeline orchestration.
Step:
1. AIClient 생성
2. 7일 window 로 documents 로드 (loader)
3. country 별 cluster_country (clustering)
4. cluster 별 select_for_llm (selection)
5. cluster 별 summarize_cluster_with_fallback (summarizer, LLM)
6. DELETE+INSERT 단일 트랜잭션 (idempotent)
7. start/end 로그 + generation_ms + fallback 비율 health metric
"""
import asyncio
import hashlib
import time
from datetime import date, datetime, timedelta, timezone
from zoneinfo import ZoneInfo

from sqlalchemy import delete

from ai.client import AIClient
from core.database import async_session
from core.database import engine as db_engine
from core.utils import setup_logger
from services import background_jobs as bgj
from models.digest import DigestTopic, GlobalDigest

from .clustering import LAMBDA, cluster_country
from .loader import load_news_window
from .selection import select_for_llm
from .summarizer import summarize_cluster_with_fallback
# Module-level logger used for every log line emitted by this pipeline.
logger = setup_logger("digest_pipeline")
# Length, in days, of the look-back window handed to the news loader.
WINDOW_DAYS = 7
# Time zone used to derive the calendar date a digest is stored under.
KST = ZoneInfo("Asia/Seoul")
def _kst_today() -> date:
    """Return today's calendar date in the KST (Asia/Seoul) time zone.

    The result is a ``datetime.date`` (no time-of-day component), because
    ``.date()`` is applied to the timezone-aware "now".
    """
    return datetime.now(KST).date()
def _summary_hash(text: str) -> str:
return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16]
def _build_topic_row(
    country: str,
    rank: int,
    cluster: dict,
    selected: list[dict],
    llm_result: dict,
    primary_model: str,
) -> DigestTopic:
    """Combine an LLM result with cluster metadata into a DigestTopic row.

    The article ids are read from ``cluster["members"]`` by this code, never
    from the LLM output, so the model cannot introduce ids of its own.

    Args:
        country: Country the topic belongs to.
        rank: Rank of the topic within its country.
        cluster: Cluster dict; ``members``, ``importance_score`` and
            ``raw_weight_sum`` are read from it.
        selected: Cluster members that were chosen for the LLM call.
        llm_result: Dict providing ``topic_label``, ``summary`` and
            ``llm_fallback_used``.
        primary_model: Model name stored on the row as ``llm_model``.

    Returns:
        A new, unsaved DigestTopic instance.
    """
    member_ids = [int(member["id"]) for member in cluster["members"]]

    # Record which documents were shown to the LLM, plus a short hash of each
    # document's summary text.
    sample = {
        "selected_doc_ids": [int(doc["id"]) for doc in selected],
        "summary_hashes": [
            _summary_hash(doc.get("ai_summary") or "") for doc in selected
        ],
    }

    label = llm_result["topic_label"]
    summary_text = llm_result["summary"]
    importance = float(cluster["importance_score"])
    weight_sum = float(cluster["raw_weight_sum"])
    used_fallback = bool(llm_result["llm_fallback_used"])

    return DigestTopic(
        country=country,
        topic_rank=rank,
        topic_label=label,
        summary=summary_text,
        article_ids=member_ids,
        article_count=len(member_ids),
        importance_score=importance,
        raw_weight_sum=weight_sum,
        centroid_sample=sample,
        llm_model=primary_model,
        llm_fallback_used=used_fallback,
    )
async def run_digest_pipeline(job_id: int | None = None) -> dict:
    """Run the whole digest pipeline once; called from the worker entry point.

    Args:
        job_id: Optional background-job id. When given, progress is reported
            through ``bgj.heartbeat``: the total number of clusters once, then
            the processed count after each cluster summary. Heartbeat errors
            are logged and ignored so that progress reporting can never abort
            the digest itself.

    Returns:
        Run statistics with the keys ``llm_calls``, ``fallback_used``,
        ``total_topics`` and ``generation_ms``. When a digest row is written
        the dict also carries ``status`` ("success", "partial" or "failed");
        the early return for an empty window does not include ``status``.
    """
    # Monotonic clock: the elapsed time must not be distorted by wall-clock
    # adjustments during a long-running job.
    start = time.monotonic()

    def _elapsed_ms() -> int:
        """Milliseconds elapsed since the pipeline started."""
        return int((time.monotonic() - start) * 1000)

    async def _report_progress(**fields) -> None:
        """Send a heartbeat for ``job_id``; failures are logged, never raised."""
        if job_id is None:
            return
        try:
            await bgj.heartbeat(db_engine, job_id, **fields)
        except Exception as exc:  # observability only; cancellation still propagates
            logger.warning(f"[global_digest] heartbeat failed (ignored): {exc}")

    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=WINDOW_DAYS)
    digest_date = _kst_today()
    logger.info(
        f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
        f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
    )

    docs_by_country = await load_news_window(window_start, window_end)
    if not docs_by_country:
        logger.warning("[global_digest] 7일 window에 뉴스 0건 — digest 생성 스킵")
        return {
            "llm_calls": 0,
            "fallback_used": 0,
            "total_topics": 0,
            "generation_ms": _elapsed_ms(),
        }

    client = AIClient()
    all_topic_rows: list[DigestTopic] = []
    stats = {"llm_calls": 0, "fallback_used": 0}
    try:
        # Read inside the try so the client is closed even if this lookup fails.
        primary_model = client.ai.primary.model

        # 2026-06-15: the per-cluster calls run concurrently via gather. The
        # actual concurrency is limited by the global MLX gate
        # (config.mlx_gate_concurrency, BACKGROUND priority). Rank/order is
        # preserved because gather returns results in submission order.
        jobs = []
        for country, docs in docs_by_country.items():
            clusters = cluster_country(country, docs)
            if not clusters:
                continue  # sparse countries are excluded automatically
            for rank, cluster in enumerate(clusters, start=1):
                selected = select_for_llm(cluster)
                jobs.append((country, rank, cluster, selected))

        await _report_progress(total=len(jobs))
        progress = {"n": 0}

        async def _run_one(cluster, selected):
            """Summarize one cluster, then report the updated processed count."""
            result = await summarize_cluster_with_fallback(client, cluster, selected)
            if job_id is not None:
                progress["n"] += 1
                await _report_progress(processed=progress["n"])
            return result

        results = await asyncio.gather(*[_run_one(c, s) for (_, _, c, s) in jobs])

        for (country, rank, cluster, selected), llm_result in zip(jobs, results):
            stats["llm_calls"] += 1
            if llm_result["llm_fallback_used"]:
                stats["fallback_used"] += 1
            all_topic_rows.append(
                _build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
            )
    finally:
        await client.close()

    generation_ms = _elapsed_ms()
    total_articles = sum(len(d) for d in docs_by_country.values())
    countries_with_topics = len({r.country for r in all_topic_rows})

    # Health status: no fallback -> success; more than half of the LLM calls
    # fell back -> failed; anything in between -> partial.
    if stats["fallback_used"] == 0:
        status = "success"
    elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
        status = "failed"
    else:
        status = "partial"

    async with async_session() as session:
        # Idempotent: if a row for the same date already exists it is deleted
        # first (its topics go with it via CASCADE), then the new one is added.
        await session.execute(
            delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
        )
        new_digest = GlobalDigest(
            digest_date=digest_date,
            window_start=window_start,
            window_end=window_end,
            decay_lambda=LAMBDA,
            total_articles=total_articles,
            total_countries=countries_with_topics,
            total_topics=len(all_topic_rows),
            generation_ms=generation_ms,
            llm_calls=stats["llm_calls"],
            llm_failures=stats["fallback_used"],
            status=status,
        )
        new_digest.topics = all_topic_rows
        session.add(new_digest)
        await session.commit()

    fallback_pct = (
        (stats["fallback_used"] / stats["llm_calls"] * 100.0)
        if stats["llm_calls"] else 0.0
    )
    logger.info(
        f"[global_digest] done countries={countries_with_topics} "
        f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
        f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
        f"status={status} elapsed={generation_ms / 1000:.1f}s"
    )
    return {
        "llm_calls": stats["llm_calls"],
        "fallback_used": stats["fallback_used"],
        "total_topics": len(all_topic_rows),
        "generation_ms": generation_ms,
        "status": status,
    }