f325bd0509
큐 밖 cron 생성 작업(global_digest/morning_briefing)이 processing_queue stage 가
아니라 보드에 안 잡혀, 맥미니가 11분짜리 digest 를 돌려도 idle 처럼 보였다.
ebbcaf8 의 background_jobs 메커니즘 재사용:
- digest_worker/briefing_worker = start_job→finish_job (best-effort, 본작업 무해)
- pipeline = cluster 완료마다 heartbeat(processed/total) → 진행바
- queue_overview = kind→machine 맵으로 payload 에 machine 필드 (맥미니 귀속)
- 보드 = 머신 레인에 dot 점등 + "생성 중: <label> N/T" 표시
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
284 lines
10 KiB
Python
284 lines
10 KiB
Python
"""야간 수집 뉴스 브리핑 파이프라인 (Plan §"PR-MorningBriefing-1 Backend").
|
|
|
|
Flow: load_night_window → cluster_global → select_for_llm (k=7) →
(optional) historical retrieval → compare_cluster_with_fallback → DB save.

Regenerate policy: on a briefing_date UNIQUE conflict, DELETE+INSERT inside a transaction.
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from typing import Any
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from sqlalchemy import delete
|
|
|
|
from ai.client import AIClient
|
|
from core.database import async_session
|
|
from core.database import engine as db_engine
|
|
from core.utils import setup_logger
|
|
from services import background_jobs as bgj
|
|
from models.briefing import BriefingTopic, MorningBriefing
|
|
from services.briefing.clustering import LAMBDA, cluster_global
|
|
from services.briefing.comparator import (
|
|
HISTORICAL_WINDOW_DAYS,
|
|
compare_cluster_with_fallback,
|
|
historical_enabled,
|
|
retrieve_historical,
|
|
)
|
|
from services.briefing.loader import load_historical_candidates, load_night_window
|
|
from services.digest.selection import select_for_llm
|
|
|
|
# Module-level logger for the briefing pipeline.
logger = setup_logger("briefing_pipeline")

# Time zone used to anchor the night window to Korean local midnight.
KST = ZoneInfo("Asia/Seoul")
NIGHT_WINDOW_HOURS = 5  # Window length in hours: KST 00:00 ~ 05:00
SELECT_K = 7  # Plan §"Clustering parameters": briefing K_PER_CLUSTER=7
SELECT_LAMBDA_MMR = 0.6  # Plan: briefing MMR lambda 0.6
|
|
|
|
|
|
def _compute_window(target_date: date | None = None) -> tuple[datetime, datetime, date]:
    """Resolve the night window for a KST calendar date.

    Args:
        target_date: KST date whose midnight opens the window. ``None``
            selects today's date in KST.

    Returns:
        ``(window_start_utc, window_end_utc, kst_date)``. The window opens at
        KST midnight of ``kst_date`` and lasts ``NIGHT_WINDOW_HOURS`` hours;
        both bounds are converted to UTC.
    """
    kst_date = target_date if target_date is not None else datetime.now(KST).date()
    # Midnight of the KST date, built field by field as an aware datetime.
    midnight_kst = datetime(kst_date.year, kst_date.month, kst_date.day, tzinfo=KST)
    window_close_kst = midnight_kst + timedelta(hours=NIGHT_WINDOW_HOURS)
    return (
        midnight_kst.astimezone(timezone.utc),
        window_close_kst.astimezone(timezone.utc),
        kst_date,
    )
|
|
|
|
|
|
def _is_usable_topic(envelope: dict, topic_label: str) -> bool:
    """Tell whether an envelope is a genuine LLM result rather than a fallback row.

    An envelope is rejected when it is flagged ``llm_fallback_used``, when its
    ``country_perspectives`` entry is missing or empty, or when the topic label
    equals the placeholder label checked below.
    """
    if envelope.get("llm_fallback_used") or not envelope.get("country_perspectives"):
        return False
    # The placeholder label marks a row that carries no real topic.
    return topic_label != "주요 뉴스 묶음"
|
|
|
|
|
|
def _compute_status(llm_calls: int, fallback_count: int, usable_count: int, has_topics: bool) -> str:
    """Classify a briefing run into one of four states (Plan §"Status 4-state" table).

    Returns:
        ``"empty"`` when there are no topics or no LLM calls were made;
        ``"failed"`` when no topic is usable or at least half of the calls
        fell back; ``"partial"`` when any call fell back or fewer topics are
        usable than calls were made; ``"success"`` otherwise.
    """
    if not has_topics or llm_calls == 0:
        return "empty"
    # llm_calls is non-zero past the guard above, so the ratio is safe.
    if usable_count == 0 or fallback_count / llm_calls >= 0.5:
        return "failed"
    clean_run = fallback_count <= 0 and usable_count >= llm_calls
    return "success" if clean_run else "partial"
|
|
|
|
|
|
def _build_topic_row(
    rank: int,
    cluster: dict,
    envelope: dict,
    historical_docs: list[dict] | None,
    primary_model: str,
) -> BriefingTopic:
    """Assemble one ``BriefingTopic`` row from a cluster and its LLM envelope.

    Args:
        rank: Value stored as ``topic_rank``.
        cluster: Cluster dict; ``members`` is required, the score fields
            (``country_count``, ``importance_score``, ``raw_weight_sum``)
            default to zero when absent.
        envelope: LLM result dict; the narrative keys copied below are
            required, ``historical_context`` and ``llm_fallback_used`` are
            optional.
        historical_docs: Historical documents retrieved for this cluster, or
            ``None``.
        primary_model: Model name stored as ``llm_model``.

    Returns:
        A new ``BriefingTopic`` instance.
    """
    # Historical metadata is recorded only while the feature is switched on;
    # otherwise both columns stay None.
    if historical_enabled():
        past_ids = [doc["id"] for doc in historical_docs or []]
        past_window = HISTORICAL_WINDOW_DAYS
    else:
        past_ids = None
        past_window = None

    # Required narrative fields are copied verbatim from the envelope.
    narrative = {
        field: envelope[field]
        for field in (
            "topic_label",
            "headline",
            "country_perspectives",
            "divergences",
            "convergences",
            "key_quotes",
        )
    }
    historical_context = envelope.get("historical_context")
    members = cluster["members"]

    return BriefingTopic(
        topic_rank=rank,
        **narrative,
        historical_article_ids=past_ids,
        historical_context=historical_context,
        historical_window_days=past_window,
        cluster_members=[member["id"] for member in members],
        article_count=len(members),
        country_count=cluster.get("country_count", 0),
        importance_score=cluster.get("importance_score", 0.0),
        raw_weight_sum=cluster.get("raw_weight_sum", 0.0),
        llm_model=primary_model,
        llm_fallback_used=envelope.get("llm_fallback_used", False),
    )
|
|
|
|
|
|
async def _save_briefing(
    briefing_date: date,
    window_start: datetime,
    window_end: datetime,
    total_articles: int,
    total_countries: int,
    topic_rows: list[BriefingTopic],
    llm_calls: int,
    llm_failures: int,
    generation_ms: int,
    status: str,
) -> int:
    """Persist one briefing, replacing any existing row for the same date.

    A ``briefing_date`` UNIQUE conflict is avoided by issuing DELETE and then
    INSERT in the same session before a single commit.

    Args:
        briefing_date: Date key of the briefing; any existing row with this
            date is deleted first.
        window_start: Start of the article window.
        window_end: End of the article window.
        total_articles: Number of articles loaded for the window.
        total_countries: Number of distinct countries in the window.
        topic_rows: Topic rows to attach to the briefing; ``total_topics`` is
            derived from their count.
        llm_calls: Number of LLM calls made.
        llm_failures: Number of calls that fell back.
        generation_ms: Elapsed generation time in milliseconds.
        status: Run status string.

    Returns:
        The ``id`` of the newly inserted ``MorningBriefing`` row.
    """
    async with async_session() as session:
        # NOTE(review): this is a bulk DELETE statement, which does not run
        # ORM-level cascades; removal of the old briefing's topic rows
        # presumably relies on a DB-level ON DELETE CASCADE — confirm.
        await session.execute(
            delete(MorningBriefing).where(MorningBriefing.briefing_date == briefing_date)
        )
        briefing = MorningBriefing(
            briefing_date=briefing_date,
            window_start=window_start,
            window_end=window_end,
            decay_lambda=LAMBDA,
            total_articles=total_articles,
            total_countries=total_countries,
            total_topics=len(topic_rows),
            generation_ms=generation_ms,
            llm_calls=llm_calls,
            llm_failures=llm_failures,
            status=status,
        )
        briefing.topics = topic_rows
        session.add(briefing)

        # Flush so the INSERT runs and the primary key is populated, then read
        # it BEFORE commit. Reading the attribute after commit only works when
        # the session factory sets expire_on_commit=False; with the default,
        # the attribute is expired and the implicit refresh fails under
        # AsyncSession.
        await session.flush()
        briefing_id = briefing.id
        await session.commit()
    return briefing_id
|
|
|
|
|
|
async def run_briefing_pipeline(target_date: date | None = None, job_id: int | None = None) -> dict[str, Any]:
    """Run the nightly news briefing once; called from cron or the manual regenerate API.

    Steps: load the night window, cluster the documents, optionally load a
    historical candidate pool, run one LLM comparison per cluster
    concurrently, then save the briefing (replacing any row for the same
    date).

    Args:
        target_date: KST date to build the briefing for. ``None`` selects
            today's date in KST.
        job_id: Optional background-job id. When given, progress heartbeats
            are sent through ``bgj.heartbeat``: first the total cluster
            count, then the processed count after each finished cluster.
            Heartbeats are best-effort; a failed one is logged and never
            aborts the briefing.

    Returns:
        ``{briefing_id, status, total_topics, total_articles, llm_calls,
        llm_failures, generation_ms, regenerated}``.
    """
    # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
    started = time.monotonic()

    def _elapsed_ms() -> int:
        """Milliseconds elapsed since the pipeline started."""
        return int((time.monotonic() - started) * 1000)

    async def _report_progress(**fields: int) -> None:
        """Send a progress heartbeat without letting a failure abort the run."""
        if job_id is None:
            return
        try:
            await bgj.heartbeat(db_engine, job_id, **fields)
        except Exception as exc:  # progress display only; the briefing itself must continue
            logger.warning(f"[briefing] heartbeat failed job_id={job_id}: {exc!r}")

    window_start, window_end, briefing_date = _compute_window(target_date)
    logger.info(
        f"[briefing] start date={briefing_date} window {window_start} ~ {window_end} "
        f"decay_lambda={LAMBDA:.4f} historical={'on' if historical_enabled() else 'off'}"
    )

    # 1. Load the night window.
    docs = await load_night_window(window_start, window_end)
    total_articles = len(docs)
    total_countries_in_window = len({d["country"] for d in docs})

    # 2. Cluster (topic-first).
    clusters = cluster_global(docs)

    if not clusters:
        # Measure once so the stored and the returned duration agree, as on
        # the normal path below.
        generation_ms = _elapsed_ms()
        briefing_id = await _save_briefing(
            briefing_date=briefing_date,
            window_start=window_start,
            window_end=window_end,
            total_articles=total_articles,
            total_countries=total_countries_in_window,
            topic_rows=[],
            llm_calls=0,
            llm_failures=0,
            generation_ms=generation_ms,
            status="empty",
        )
        logger.info(f"[briefing] empty (no usable clusters) → briefing_id={briefing_id}")
        return {
            "briefing_id": briefing_id,
            "status": "empty",
            "total_topics": 0,
            "total_articles": total_articles,
            "llm_calls": 0,
            "llm_failures": 0,
            "generation_ms": generation_ms,
            "regenerated": True,
        }

    # 3. (Optional) load the historical candidate pool once.
    historical_candidates: list[dict] = []
    if historical_enabled():
        hist_end = window_start  # up to just before today's window
        hist_start = hist_end - timedelta(days=HISTORICAL_WINDOW_DAYS)
        exclude = {d["id"] for d in docs}
        historical_candidates = await load_historical_candidates(hist_start, hist_end, exclude)

    # 4. One LLM call per cluster.
    client = AIClient()
    topic_rows: list[BriefingTopic] = []
    llm_calls = 0
    llm_failures = 0
    usable_count = 0

    try:
        # Read inside the try so the client is still closed if this raises.
        primary_model = client.ai.primary.model

        # 2026-06-15: cluster calls run concurrently via gather. Effective
        # concurrency = the global MLX gate (config.mlx_gate_concurrency,
        # BACKGROUND priority). Rank/order is preserved.
        jobs = []
        for rank, cluster in enumerate(clusters, start=1):
            selected = select_for_llm(cluster, k=SELECT_K, lambda_mmr=SELECT_LAMBDA_MMR)
            historical_docs = (
                retrieve_historical(cluster, historical_candidates)
                if historical_enabled() else []
            )
            jobs.append((rank, cluster, selected, historical_docs))

        await _report_progress(total=len(jobs))
        completed = 0

        async def _run_one(cluster, selected, historical_docs):
            """Compare one cluster, then report the running completion count."""
            nonlocal completed
            envelope = await compare_cluster_with_fallback(
                client, cluster, selected, historical_docs=historical_docs
            )
            completed += 1
            await _report_progress(processed=completed)
            return envelope

        # gather returns results in argument order, so zip() below pairs each
        # envelope with the job (and rank) it came from.
        results = await asyncio.gather(
            *[_run_one(c, s, h) for (_, c, s, h) in jobs]
        )

        for (rank, cluster, _selected, historical_docs), envelope in zip(jobs, results):
            llm_calls += 1
            if envelope.get("llm_fallback_used"):
                llm_failures += 1
            if _is_usable_topic(envelope, envelope["topic_label"]):
                usable_count += 1
            topic_rows.append(
                _build_topic_row(rank, cluster, envelope, historical_docs, primary_model)
            )
    finally:
        await client.close()

    generation_ms = _elapsed_ms()
    status = _compute_status(llm_calls, llm_failures, usable_count, has_topics=bool(topic_rows))

    briefing_id = await _save_briefing(
        briefing_date=briefing_date,
        window_start=window_start,
        window_end=window_end,
        total_articles=total_articles,
        total_countries=total_countries_in_window,
        topic_rows=topic_rows,
        llm_calls=llm_calls,
        llm_failures=llm_failures,
        generation_ms=generation_ms,
        status=status,
    )

    fallback_pct = (llm_failures / llm_calls * 100.0) if llm_calls else 0.0
    logger.info(
        f"[briefing] done id={briefing_id} status={status} topics={len(topic_rows)} "
        f"usable={usable_count}/{llm_calls} fallback={llm_failures}/{llm_calls} ({fallback_pct:.1f}%) "
        f"elapsed={generation_ms / 1000:.1f}s"
    )

    return {
        "briefing_id": briefing_id,
        "status": status,
        "total_topics": len(topic_rows),
        "total_articles": total_articles,
        "llm_calls": llm_calls,
        "llm_failures": llm_failures,
        "generation_ms": generation_ms,
        "regenerated": True,
    }
|