Files
hyungi_document_server/app/services/digest/pipeline.py
T
hyungi a82b0724df fix(news): digest/briefing 생성 LLM 타임아웃 게이트 단일소스화 + deep_summary 컨슈머 분리
2026-06-11 맥미니 모델 교체(Gemma4 26B→Qwen3.6-27B-6bit, 콜당 ~90~300s)의
타임아웃 상향 sweep 이 config.yaml/synthesis 만 갱신하고 digest/briefing 코드의
하드코딩 LLM_CALL_TIMEOUT=25(빠른 Gemma 기준)를 누락 → digest 600s 하드캡 초과로
06-10 이후 미생성, briefing 4/4 LLM 폴백(status=failed). (적대 리뷰로 블로커 정정:
concurrency=1 사설 세마포로는 digest 44~68 클러스터가 하드캡에 여전히 걸림 + llm_gate
영구 룰 위반.)

- 타임아웃·재시도·하드캡을 config.pipeline 단일소스로 이관(digest_llm_timeout_s=300,
  attempts=2, pipeline_hard_cap_s=3000). 다음 모델 교체 때 재발 차단.
- digest/briefing LLM 호출을 사설 Semaphore 제거하고 전역 MLX gate(BACKGROUND)
  경유로 변경 — llm_gate 영구 룰(같은 endpoint 단일 게이트, 새 Semaphore 금지) 준수 +
  ask/eid(FOREGROUND)와 조율. 동시성 lever = 기존 mlx_gate_concurrency 2→4
  (continuous batching 실측 — 3동시콜 wall 121s ≈ 단일콜, 직렬 대비 ~3배).
- digest/briefing pipeline cluster 루프를 asyncio.gather 동시 실행으로 전환
  (실동시성은 게이트가 제한, rank/순서 보존).
- deep_summary(70~300s)를 메인 consume_queue 에서 분리해 consume_deep_queue 신설
  (markdown/fast split 선례) — 단일 deep 호출이 1분 틱 초과로 메인 큐를 영구 coalesce
  시키던 문제 제거.
- 죽은 PIPELINE_HARD_CAP=600(briefing/pipeline.py) 제거, summarizer docstring 갱신,
  deep 컨슈머 disjoint/hold 테스트 추가.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 23:29:56 +00:00

188 lines
6.3 KiB
Python

"""Phase 4 digest pipeline orchestration.
Step:
1. AIClient 생성
2. 7일 window 로 documents 로드 (loader)
3. country 별 cluster_country (clustering)
4. cluster 별 select_for_llm (selection)
5. cluster 별 summarize_cluster_with_fallback (summarizer, LLM)
6. DELETE+INSERT 단일 트랜잭션 (idempotent)
7. start/end 로그 + generation_ms + fallback 비율 health metric
"""
import asyncio
import hashlib
import time
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from sqlalchemy import delete
from ai.client import AIClient
from core.database import async_session
from core.utils import setup_logger
from models.digest import DigestTopic, GlobalDigest
from .clustering import LAMBDA, cluster_country
from .loader import load_news_window
from .selection import select_for_llm
from .summarizer import summarize_cluster_with_fallback
logger = setup_logger("digest_pipeline")
WINDOW_DAYS = 7
KST = ZoneInfo("Asia/Seoul")
def _kst_today() -> datetime:
return datetime.now(KST).date()
def _summary_hash(text: str) -> str:
return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16]
def _build_topic_row(
country: str,
rank: int,
cluster: dict,
selected: list[dict],
llm_result: dict,
primary_model: str,
) -> DigestTopic:
"""LLM 결과 + cluster 메타 → DigestTopic ORM 인스턴스.
article_ids 는 코드가 cluster.members 에서 직접 주입 (LLM 생성 금지 → id 위조 불가).
"""
article_ids = [int(m["id"]) for m in cluster["members"]]
centroid_sample = {
"selected_doc_ids": [int(m["id"]) for m in selected],
"summary_hashes": [_summary_hash(m.get("ai_summary") or "") for m in selected],
}
return DigestTopic(
country=country,
topic_rank=rank,
topic_label=llm_result["topic_label"],
summary=llm_result["summary"],
article_ids=article_ids,
article_count=len(article_ids),
importance_score=float(cluster["importance_score"]),
raw_weight_sum=float(cluster["raw_weight_sum"]),
centroid_sample=centroid_sample,
llm_model=primary_model,
llm_fallback_used=bool(llm_result["llm_fallback_used"]),
)
async def run_digest_pipeline() -> dict:
"""전체 파이프라인 실행. worker entry 에서 호출.
Returns:
실행 통계 dict {llm_calls, fallback_used, total_topics, generation_ms}
"""
start = time.time()
window_end = datetime.now(timezone.utc)
window_start = window_end - timedelta(days=WINDOW_DAYS)
digest_date = _kst_today()
logger.info(
f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
)
docs_by_country = await load_news_window(window_start, window_end)
if not docs_by_country:
logger.warning("[global_digest] 7일 window에 뉴스 0건 — digest 생성 스킵")
return {
"llm_calls": 0,
"fallback_used": 0,
"total_topics": 0,
"generation_ms": int((time.time() - start) * 1000),
}
client = AIClient()
primary_model = client.ai.primary.model
all_topic_rows: list[DigestTopic] = []
stats = {"llm_calls": 0, "fallback_used": 0}
try:
# 2026-06-15: cluster 호출을 gather 로 동시 실행. 실제 동시성은 전역 MLX gate
# (config.mlx_gate_concurrency, BACKGROUND 우선순위) 가 제한한다. rank/순서 보존.
jobs = []
for country, docs in docs_by_country.items():
clusters = cluster_country(country, docs)
if not clusters:
continue # sparse country 자동 제외
for rank, cluster in enumerate(clusters, start=1):
selected = select_for_llm(cluster)
jobs.append((country, rank, cluster, selected))
async def _run_one(cluster, selected):
return await summarize_cluster_with_fallback(client, cluster, selected)
results = await asyncio.gather(*[_run_one(c, s) for (_, _, c, s) in jobs])
for (country, rank, cluster, selected), llm_result in zip(jobs, results):
stats["llm_calls"] += 1
if llm_result["llm_fallback_used"]:
stats["fallback_used"] += 1
all_topic_rows.append(
_build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
)
finally:
await client.close()
generation_ms = int((time.time() - start) * 1000)
total_articles = sum(len(d) for d in docs_by_country.values())
countries_with_topics = len({r.country for r in all_topic_rows})
if stats["fallback_used"] == 0:
status = "success"
elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
status = "failed"
else:
status = "partial"
async with async_session() as session:
# idempotent: 같은 날짜 row 가 있으면 CASCADE 로 topics 까지 삭제
await session.execute(
delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
)
new_digest = GlobalDigest(
digest_date=digest_date,
window_start=window_start,
window_end=window_end,
decay_lambda=LAMBDA,
total_articles=total_articles,
total_countries=countries_with_topics,
total_topics=len(all_topic_rows),
generation_ms=generation_ms,
llm_calls=stats["llm_calls"],
llm_failures=stats["fallback_used"],
status=status,
)
new_digest.topics = all_topic_rows
session.add(new_digest)
await session.commit()
fallback_pct = (
(stats["fallback_used"] / stats["llm_calls"] * 100.0)
if stats["llm_calls"] else 0.0
)
logger.info(
f"[global_digest] done countries={countries_with_topics} "
f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
f"status={status} elapsed={generation_ms / 1000:.1f}s"
)
return {
"llm_calls": stats["llm_calls"],
"fallback_used": stats["fallback_used"],
"total_topics": len(all_topic_rows),
"generation_ms": generation_ms,
"status": status,
}