feat(briefing): add morning briefing schema + services + api (historical off)

One-page card comparing overnight-collected news (KST 00:00~05:00) by topic × country.
Code, logic, and tables are kept separate from the Phase 4 Global Digest; only the algorithm in services/clustering_common is shared.
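
The night window can be sketched as follows. This is a minimal illustration, assuming the window is anchored at KST midnight and converted to UTC for querying; the name `night_window_utc` is hypothetical, and a fixed UTC+9 offset is used here only to keep the sketch dependency-free (Korea observes no DST).

```python
from datetime import date, datetime, time, timedelta, timezone

# Assumption: a fixed UTC+9 offset stands in for the Asia/Seoul zone.
KST = timezone(timedelta(hours=9), "KST")


def night_window_utc(day: date, hours: int = 5) -> tuple[datetime, datetime]:
    """Return the [start, end) window for `day` (KST midnight + `hours`) in UTC."""
    start_kst = datetime.combine(day, time.min, tzinfo=KST)
    end_kst = start_kst + timedelta(hours=hours)
    return start_kst.astimezone(timezone.utc), end_kst.astimezone(timezone.utc)


start, end = night_window_utc(date(2026, 5, 12))
print(start.isoformat())  # 2026-05-11T15:00:00+00:00
print(end.isoformat())    # 2026-05-11T20:00:00+00:00
```

Note that the KST briefing date and the UTC date of the window differ, which is why the date should always be derived in KST.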

Backend (new):
- migrations/255_morning_briefings.sql: morning_briefings + briefing_topics
  (briefing_date UNIQUE, UNIQUE(briefing_id,topic_rank), FK CASCADE,
  three nullable historical_* columns, cluster_members JSONB,
  country_perspectives JSONB, 4-state status success|partial|failed|empty)
- app/models/briefing.py: SQLAlchemy ORM
- app/services/briefing/loader.py: KST 5h window + news_sources prefix
  fallback (mirrors the Phase 4 pattern) + historical candidate pool loader
- app/services/briefing/clustering.py: cluster_global, topic-first
  (LAMBDA=ln(2)/2h, MIN_COUNTRIES_PER_TOPIC=2, MAX_TOPICS=7)
- app/services/briefing/comparator.py: call_primary 26B + JSON envelope
  sanitize (cap perspectives 10 / divergences 3 / convergences 2 /
  quotes 5) + fixed-shape fallback row + retrieve_historical cosine top-K
- app/services/briefing/pipeline.py: load→cluster→select(K=7,λ=0.6)
  →historical→compare→4-state status→delete+insert transaction
- app/workers/briefing_worker.py: shared entry point for APScheduler and
  manual calls, 600s hard cap
- app/prompts/briefing_comparative.txt: Korean comparative-analysis JSON
  prompt, two sections {articles_block} + {historical_block}, with a
  "do not quote" label
- app/api/briefing.py: GET /latest, GET ?date=, POST /regenerate?date=
  (admin, sync delete+insert tx, regenerated:true)
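
The 4-state status decision can be sketched as below. It mirrors the decision order of `_compute_status` in pipeline.py from this commit; the name `briefing_status` and the sample calls are illustrative only.

```python
def briefing_status(llm_calls: int, fallback_count: int, usable_count: int, has_topics: bool) -> str:
    """Classify one briefing run as success | partial | failed | empty."""
    if not has_topics or llm_calls == 0:
        return "empty"  # nothing was clustered, so no LLM call was made
    if usable_count == 0:
        return "failed"  # every topic ended up unusable
    if fallback_count / llm_calls >= 0.5:
        return "failed"  # half or more of the topics are fallback rows
    if fallback_count > 0 or usable_count < llm_calls:
        return "partial"
    return "success"


print(briefing_status(7, 0, 7, True))   # success
print(briefing_status(7, 2, 5, True))   # partial
print(briefing_status(7, 4, 3, True))   # failed (4/7 >= 0.5)
print(briefing_status(0, 0, 0, False))  # empty
```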

Backend (modified):
- app/main.py: register briefing_router (/api/briefing prefix). Scheduler
  registration is deferred to PR-3.
- app/services/digest/selection.py: parameterize select_for_llm (K and λ
  injected by the caller). Phase 4 behavior is preserved via default values.
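
For readers unfamiliar with the K and λ parameters: pipeline.py refers to λ as an MMR lambda, so the selection can be pictured as Maximal Marginal Relevance. The sketch below is a generic MMR under that assumption, not the actual select_for_llm; `mmr_select`, the tuple layout, and the relevance values are hypothetical.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def mmr_select(items: list[tuple[str, float, list[float]]], k: int, lam: float) -> list[str]:
    """Pick up to k ids from (id, relevance, vector) tuples.

    Each step maximizes lam * relevance - (1 - lam) * max similarity to already-picked items.
    """
    selected: list[tuple[str, float, list[float]]] = []
    remaining = list(items)
    while remaining and len(selected) < k:
        def score(item):
            redundancy = max((cosine(item[2], s[2]) for s in selected), default=0.0)
            return lam * item[1] - (1 - lam) * redundancy

        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return [item[0] for item in selected]


items = [("a", 1.0, [1.0, 0.0]), ("b", 0.95, [1.0, 0.05]), ("c", 0.6, [0.0, 1.0])]
print(mmr_select(items, k=2, lam=0.6))  # ['a', 'c']: the near-duplicate 'b' is penalized
print(mmr_select(items, k=2, lam=1.0))  # ['a', 'b']: pure relevance ordering
```

A lower λ favors diversity among the articles passed to the LLM; λ=1.0 degenerates to plain relevance ranking.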

Historical policy:
- BRIEFING_HISTORICAL_ENABLED env flag, default off.
- flag off → all historical_* columns NULL, prompt {historical_block} gets an
  empty label, retrieval is not called.
- flag on (to be enabled in PR-1b) → cosine top-K 5 (sim≥0.70) between the
  cluster centroid and doc embeddings from the past 30 days, injected into
  the prompt.
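
A worked toy example of the flag-gated retrieval described above. This is a dependency-free sketch; `top_k_historical`, the explicit `enabled` argument, and the 2-d vectors are illustrative assumptions, while the defaults (top-K 5, sim≥0.70) come from the policy.

```python
import math


def normalize(v: list[float]) -> list[float]:
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v] if norm else list(v)


def top_k_historical(centroid, candidates, *, enabled, top_k=5, sim_min=0.70):
    """Return ids of the top_k candidates whose cosine similarity to centroid is >= sim_min."""
    if not enabled or not candidates:
        return []  # flag off: retrieval is skipped entirely
    c = normalize(centroid)
    scored = []
    for doc in candidates:
        sim = sum(a * b for a, b in zip(c, normalize(doc["embedding"])))
        if sim >= sim_min:
            scored.append((sim, doc["id"]))
    scored.sort(reverse=True)  # highest similarity first
    return [doc_id for _, doc_id in scored[:top_k]]


pool = [
    {"id": 1, "embedding": [0.9, 0.1]},  # sim ≈ 0.994
    {"id": 2, "embedding": [0.5, 0.5]},  # sim ≈ 0.707
    {"id": 3, "embedding": [0.0, 1.0]},  # sim = 0.0, below threshold
    {"id": 4, "embedding": [0.6, 0.8]},  # sim = 0.6, below threshold
]
print(top_k_historical([1.0, 0.0], pool, enabled=True))   # [1, 2]
print(top_k_historical([1.0, 0.0], pool, enabled=False))  # []
```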

Country canonical (after checking against real data):
- documents.country column confirmed absent
- document_chunks.country match rate 0% (chunks are not created for news at all)
- the only country signal = news_sources prefix mapping (same as Phase 4)

Tests:
- tests/test_briefing_historical.py: regression for 3 paths (flag off / on
  with fixture / on with zero match) + sanitize cap + fallback row shape.

Verification: GPU container pytest + manual regenerate, to be done in PR-1.8.
Hyungi Ahn
2026-05-12 12:58:50 +09:00
parent 1ca6d8b522
commit 431d4fe010
13 changed files with 1466 additions and 7 deletions
+203 app/api/briefing.py
@@ -0,0 +1,203 @@
"""Morning Briefing API: read-only + manual regenerate.

Endpoints:
- GET /api/briefing/latest : most recent briefing
- GET /api/briefing?date=YYYY-MM-DD : briefing for a specific date
- POST /api/briefing/regenerate?date=... : synchronous worker trigger (admin), DELETE+INSERT tx

The response is a flat list of topics (the axis is reversed: unlike Phase 4, there is no country grouping).
Each topic carries a country_perspectives JSONB that expresses the cross-country comparison.
"""
from datetime import date as date_type
from datetime import datetime
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from core.auth import get_current_user, require_admin
from core.database import get_session
from models.briefing import BriefingTopic, MorningBriefing
from models.user import User
router = APIRouter()
# ─── Pydantic response models ───


class CountryPerspective(BaseModel):
    country: str
    summary: str
    article_ids: list[int] = []


class KeyQuote(BaseModel):
    country: str = ""
    source: str = ""
    quote: str


class TopicResponse(BaseModel):
    topic_rank: int
    topic_label: str
    headline: str
    country_perspectives: list[CountryPerspective]
    divergences: list[str]
    convergences: list[str]
    key_quotes: list[KeyQuote]
    historical_context: str | None = None
    cluster_members: list[int] = []
    article_count: int
    country_count: int
    importance_score: float
    llm_fallback_used: bool


class BriefingResponse(BaseModel):
    briefing_date: date_type
    window_start: datetime
    window_end: datetime
    decay_lambda: float
    total_articles: int
    total_countries: int
    total_topics: int
    generation_ms: int | None
    llm_calls: int
    llm_failures: int
    status: str
    headline_oneliner: str | None = None
    topics: list[TopicResponse]


class RegenerateResponse(BaseModel):
    status: str
    briefing_id: int | None
    briefing_date: date_type
    total_topics: int
    total_articles: int
    llm_calls: int
    llm_failures: int
    generation_ms: int
    regenerated: bool
# ─── helpers ───


def _build_response(b: MorningBriefing) -> BriefingResponse:
    topics = []
    for t in sorted(b.topics, key=lambda x: x.topic_rank):
        topics.append(
            TopicResponse(
                topic_rank=t.topic_rank,
                topic_label=t.topic_label,
                headline=t.headline,
                country_perspectives=[
                    CountryPerspective(**cp) for cp in (t.country_perspectives or [])
                ],
                divergences=list(t.divergences or []),
                convergences=list(t.convergences or []),
                key_quotes=[KeyQuote(**q) for q in (t.key_quotes or [])],
                historical_context=t.historical_context,
                cluster_members=list(t.cluster_members or []),
                article_count=t.article_count,
                country_count=t.country_count,
                importance_score=t.importance_score,
                llm_fallback_used=t.llm_fallback_used,
            )
        )
    return BriefingResponse(
        briefing_date=b.briefing_date,
        window_start=b.window_start,
        window_end=b.window_end,
        decay_lambda=b.decay_lambda,
        total_articles=b.total_articles,
        total_countries=b.total_countries,
        total_topics=b.total_topics,
        generation_ms=b.generation_ms,
        llm_calls=b.llm_calls,
        llm_failures=b.llm_failures,
        status=b.status,
        headline_oneliner=b.headline_oneliner,
        topics=topics,
    )


async def _load_briefing(
    session: AsyncSession,
    target_date: date_type | None,
) -> MorningBriefing | None:
    query = select(MorningBriefing).options(selectinload(MorningBriefing.topics))
    if target_date is not None:
        query = query.where(MorningBriefing.briefing_date == target_date)
    else:
        query = query.order_by(MorningBriefing.briefing_date.desc())
    query = query.limit(1)
    result = await session.execute(query)
    return result.scalar_one_or_none()
# ─── Routes ───


@router.get("/latest", response_model=BriefingResponse)
async def get_latest(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Most recent morning briefing."""
    b = await _load_briefing(session, target_date=None)
    if b is None:
        # detail (user-facing, Korean): "no briefing has been generated yet"
        raise HTTPException(status_code=404, detail="아직 생성된 briefing 없음")
    return _build_response(b)


@router.get("", response_model=BriefingResponse)
async def get_briefing(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST briefing_date)"),
):
    """Briefing for a specific date (latest when date is omitted)."""
    b = await _load_briefing(session, target_date=date)
    if b is None:
        # detail (user-facing, Korean): "no briefing (date=...)" / "no briefing has been generated yet"
        raise HTTPException(
            status_code=404,
            detail=f"briefing 없음 (date={date})" if date else "아직 생성된 briefing 없음",
        )
    return _build_response(b)


@router.post("/regenerate", response_model=RegenerateResponse)
async def regenerate(
    user: Annotated[User, Depends(require_admin)],
    date: date_type | None = Query(default=None, description="YYYY-MM-DD, briefing_date in KST"),
):
    """Manual trigger (admin). Runs synchronously as a delete+insert transaction.

    Defaults to today in KST when date is omitted. If a row for the same day exists,
    it is deleted and recreated inside the transaction.
    Response status is 'success' | 'partial' | 'failed' | 'empty'.
    """
    from zoneinfo import ZoneInfo

    from workers.briefing_worker import run

    result = await run(target_date=date)
    if result is None:
        # detail (user-facing, Korean): "briefing worker failed (check the logs)"
        raise HTTPException(status_code=500, detail="briefing 워커 실행 실패 (로그 확인)")
    return RegenerateResponse(
        status=result["status"],
        briefing_id=result.get("briefing_id"),
        # Fall back to today's date in KST rather than server-local time,
        # so the reported date matches the pipeline's default.
        briefing_date=date or datetime.now(ZoneInfo("Asia/Seoul")).date(),
        total_topics=result["total_topics"],
        total_articles=result["total_articles"],
        llm_calls=result["llm_calls"],
        llm_failures=result["llm_failures"],
        generation_ms=result["generation_ms"],
        regenerated=result.get("regenerated", True),
    )
+2 app/main.py
@@ -8,6 +8,7 @@ from sqlalchemy import func, select, text
from api.audio import router as audio_router
from api.auth import router as auth_router
from api.briefing import router as briefing_router
from api.config import router as config_router
from api.dashboard import router as dashboard_router
from api.digest import router as digest_router
@@ -135,6 +136,7 @@ app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"]
app.include_router(library_router, prefix="/api/library", tags=["library"])
app.include_router(news_router, prefix="/api/news", tags=["news"])
app.include_router(digest_router, prefix="/api/digest", tags=["digest"])
app.include_router(briefing_router, prefix="/api/briefing", tags=["briefing"])
app.include_router(audio_router, prefix="/api/audio", tags=["audio"])
app.include_router(video_router, prefix="/api/video", tags=["video"])
app.include_router(study_sessions_router, prefix="/api/study-sessions", tags=["study-sessions"])
+97 app/models/briefing.py
@@ -0,0 +1,97 @@
"""ORM for the morning_briefings + briefing_topics tables (overnight news briefing).

Axis is reversed: Phase 4 = country×topic / Briefing = topic×country.
country_perspectives JSONB holds an array of per-country perspectives for one topic.
"""
from datetime import date, datetime

from sqlalchemy import (
    BigInteger,
    Boolean,
    Date,
    DateTime,
    Float,
    ForeignKey,
    Integer,
    String,
    Text,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship

from core.database import Base


class MorningBriefing(Base):
    """Per-day briefing metadata (KST midnight~05:00 window)."""

    __tablename__ = "morning_briefings"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    briefing_date: Mapped[date] = mapped_column(Date, nullable=False, unique=True)
    window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    decay_lambda: Mapped[float] = mapped_column(Float, nullable=False)
    total_articles: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    total_countries: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    total_topics: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    generation_ms: Mapped[int | None] = mapped_column(Integer)
    llm_calls: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    llm_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="success")
    headline_oneliner: Mapped[str | None] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=datetime.now
    )

    topics: Mapped[list["BriefingTopic"]] = relationship(
        back_populates="briefing",
        cascade="all, delete-orphan",
        order_by="BriefingTopic.topic_rank",
    )


class BriefingTopic(Base):
    """Cross-country comparison result for one topic of a briefing, ordered by topic_rank."""

    __tablename__ = "briefing_topics"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    briefing_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey("morning_briefings.id", ondelete="CASCADE"),
        nullable=False,
    )
    topic_rank: Mapped[int] = mapped_column(Integer, nullable=False)
    topic_label: Mapped[str] = mapped_column(String(120), nullable=False)
    headline: Mapped[str] = mapped_column(Text, nullable=False)
    country_perspectives: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    divergences: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    convergences: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    key_quotes: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    historical_article_ids: Mapped[list | None] = mapped_column(JSONB)
    historical_context: Mapped[str | None] = mapped_column(Text)
    historical_window_days: Mapped[int | None] = mapped_column(Integer)
    cluster_members: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    article_count: Mapped[int] = mapped_column(Integer, nullable=False)
    country_count: Mapped[int] = mapped_column(Integer, nullable=False)
    importance_score: Mapped[float] = mapped_column(Float, nullable=False)
    raw_weight_sum: Mapped[float] = mapped_column(Float, nullable=False)
    llm_model: Mapped[str | None] = mapped_column(String(100))
    llm_fallback_used: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=datetime.now
    )

    briefing: Mapped["MorningBriefing"] = relationship(back_populates="topics")
+46 app/prompts/briefing_comparative.txt
@@ -0,0 +1,46 @@
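You are a multinational news comparison analyst.
Below are overnight-collected news items clustered under the same topic; the (country code · source) prefix on each line marks the origin.
Using only this information, output a cross-country comparative analysis as JSON only.

Goals:
- Summarize, as a one-page card, how each country covers the same event differently and what they have in common.
- The user is a Korean reader. Write the output in Korean.

Strictly forbidden:
- Adding facts that are not in the provided summaries
- Speculative expressions ("보인다", "~할 것이다", "~할 전망", etc.)
- Any text outside the JSON (no explanations, markdown, or code blocks)
- Generating words inside quotation marks that were not in the original text (key_quotes must be verbatim)

Length caps (must be respected):
- country_perspectives: at most 10 items, each summary 1~2 sentences (within 120 Korean characters)
- divergences: at most 3 items, each within 200 characters
- convergences: at most 2 items, each within 200 characters
- key_quotes: at most 5 items, each quote within 240 characters
- historical_context: 1~2 sentences (within 120 Korean characters); fill only when meaningful, otherwise null

Output format (output exactly one JSON object; do not exceed the caps above):
{
  "topic_label": "Korean topic title of 5~10 words",
  "headline": "one-line Korean headline compressing the whole topic (≤80 characters)",
  "country_perspectives": [
    {"country": "KR", "summary": "...", "article_ids": []},
    {"country": "US", "summary": "...", "article_ids": []}
  ],
  "divergences": ["Country A emphasizes X / Country B criticizes Y / Country C highlights Z"],
  "convergences": ["All outlets agree on fact Z"],
  "key_quotes": [{"country": "US", "source": "NYT", "quote": "..."}],
  "historical_context": null
}

Rules:
- country in country_perspectives is the country code of the input articles, as-is (uppercase).
- article_ids may be left empty (the server fills them in).
- If only a single country is covered, divergences is an empty array.
- historical_context must be null if the "Earlier context" section below is empty.

Today's early-morning article bundle:
{articles_block}

Earlier context for reference (do not quote directly; for background only):
{historical_block}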
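+80 app/services/briefing/clustering.py
@@ -0,0 +1,80 @@
"""Topic-first clustering of overnight news.

Axis is reversed from Phase 4: instead of clustering per country, **all docs are merged into topic clusters**.
The country distribution falls out of each cluster automatically (via the country field of each doc dict).

Parameters (for the 5h window):
- LAMBDA = ln(2)/2h ≈ 0.347 per hour (2-hour half-life; fast decay because the night window is only 5h).
  The constant below is expressed per day, i.e. ≈ 8.318.
- threshold = 0.78 fixed (a middle value in the Phase 4 range 0.75~0.80)
- MIN_ARTICLES_PER_TOPIC = 2 (relaxed to cope with sparse overnight volume)
- MIN_COUNTRIES_PER_TOPIC = 2 (the core of the cross-country value)
- MAX_TOPICS = 7 (one page worth)
"""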
import math

from core.utils import setup_logger
from services.clustering_common import (
    greedy_assign_cluster,
    normalize_importance_scores,
)

logger = setup_logger("briefing_clustering")

LAMBDA = math.log(2) / (2.0 / 24.0)  # 2-hour half-life (unit: per day)
THRESHOLD = 0.78
CENTROID_ALPHA = 0.7
MIN_ARTICLES_PER_TOPIC = 2
MIN_COUNTRIES_PER_TOPIC = 2
MAX_TOPICS = 7


def _count_distinct_countries(cluster: dict) -> int:
    return len({m.get("country") for m in cluster["members"] if m.get("country")})


def cluster_global(docs: list[dict]) -> list[dict]:
    """Merge docs from all countries and build topic clusters.

    Args:
        docs: output of loader.load_night_window (each dict includes a country field).

    Returns:
        [{centroid, members, weight_sum, raw_weight_sum, importance_score, country_count}, ...]
        - only clusters that satisfy both MIN_ARTICLES and MIN_COUNTRIES
        - sorted by importance_score descending, capped at MAX_TOPICS
    """
    if not docs:
        logger.info("[briefing] docs=0 → skip")
        return []

    clusters, raw_count = greedy_assign_cluster(
        docs,
        threshold=THRESHOLD,
        centroid_alpha=CENTROID_ALPHA,
        min_articles=MIN_ARTICLES_PER_TOPIC,
        max_topics=MAX_TOPICS * 4,  # buffer before the MIN_COUNTRIES filter
        lambda_val=LAMBDA,
    )

    # MIN_COUNTRIES_PER_TOPIC filter: drop single-country clusters
    pre_country_filter = len(clusters)
    filtered = []
    for c in clusters:
        cc = _count_distinct_countries(c)
        if cc >= MIN_COUNTRIES_PER_TOPIC:
            c["country_count"] = cc
            filtered.append(c)

    # Count single-country drops before applying the MAX_TOPICS cap,
    # so clusters cut by the cap are not reported as single-country drops.
    dropped_country = pre_country_filter - len(filtered)
    dropped_min_articles = raw_count - pre_country_filter
    clusters = filtered[:MAX_TOPICS]

    # Re-normalize importance after the MIN_COUNTRIES + MAX_TOPICS filters (0~1 within the briefing)
    normalize_importance_scores(clusters)

    logger.info(
        f"[briefing] docs={len(docs)} threshold={THRESHOLD} "
        f"raw_clusters={raw_count} dropped_min_articles={dropped_min_articles} "
        f"dropped_single_country={dropped_country} kept={len(clusters)}"
    )
    return clusters
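
Note on the decay constant: the arithmetic behind the 2-hour half-life can be checked with a short sketch. It assumes an exponential weight of the form exp(-λ·age), which is the usual reading of a half-life but is not confirmed by this diff since greedy_assign_cluster is not shown; `decay_weight` is a hypothetical helper.

```python
import math

HALF_LIFE_HOURS = 2.0
LAMBDA_PER_HOUR = math.log(2) / HALF_LIFE_HOURS          # ≈ 0.347
LAMBDA_PER_DAY = math.log(2) / (HALF_LIFE_HOURS / 24.0)  # ≈ 8.318, same decay expressed per day


def decay_weight(age_hours: float) -> float:
    """Weight of an article that is `age_hours` old (assumed form: exp(-lambda * age))."""
    return math.exp(-LAMBDA_PER_HOUR * age_hours)


for age in (0, 2, 4, 5):
    print(age, round(decay_weight(age), 4))
# 0 1.0
# 2 0.5
# 4 0.25
# 5 0.1768
```

Under this assumption an article from the very start of the 5h window carries roughly 18% of the weight of one collected at the end.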
+252 app/services/briefing/comparator.py
@@ -0,0 +1,252 @@
"""Cluster → 26B MLX comparative-analysis call + JSON envelope + historical context + fallback row.

Plan §"Fallback Topic Row on LLM Parse Failure (fixed shape)":
    If LLM JSON parsing still fails after 2 attempts → store a fixed-shape fallback row (never drop).

Plan §"Historical Context":
    When BRIEFING_HISTORICAL_ENABLED=true, extract the cosine top-K 5 (similarity ≥0.70) between the
    cluster centroid and the historical candidates → inject into the prompt's {historical_block}.
    historical_context is an optional field of the LLM response envelope.
"""
import asyncio
import json
import os
from pathlib import Path
from typing import Any

import numpy as np

from ai.client import parse_json_response
from core.utils import setup_logger
from services.clustering_common import normalize_vector

logger = setup_logger("briefing_comparator")

LLM_CALL_TIMEOUT = 25  # seconds; same as Phase 4
HISTORICAL_TOP_K = 5
HISTORICAL_SIMILARITY_MIN = 0.70
HISTORICAL_WINDOW_DAYS = 30

# JSON envelope caps (enforced both in the prompt and in post-processing)
MAX_PERSPECTIVES = 10
MAX_DIVERGENCES = 3
MAX_CONVERGENCES = 2
MAX_KEY_QUOTES = 5
MAX_PERSPECTIVE_SUMMARY_LEN = 240  # 1~2 Korean sentences, ≤120 chars × 2
MAX_HISTORICAL_CONTEXT_LEN = 240

# Persisted, user-facing Korean text.
# Headline: "LLM analysis failed; showing only the original article bundle."
FALLBACK_HEADLINE = "LLM 분석 실패로 원문 기사 묶음만 표시합니다."
# Topic label: "Major news bundle"
FALLBACK_TOPIC_LABEL = "주요 뉴스 묶음"

_llm_sem = asyncio.Semaphore(1)

_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "briefing_comparative.txt"
_PROMPT_TEMPLATE: str | None = None


def historical_enabled() -> bool:
    return os.environ.get("BRIEFING_HISTORICAL_ENABLED", "false").lower() in {"1", "true", "yes"}


def _load_prompt() -> str:
    global _PROMPT_TEMPLATE
    if _PROMPT_TEMPLATE is None:
        _PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8")
    return _PROMPT_TEMPLATE


def _build_articles_block(selected: list[dict]) -> str:
    lines = []
    for i, m in enumerate(selected, start=1):
        country = m.get("country") or "??"
        source = m.get("ai_sub_group") or ""
        text = (m.get("ai_summary_truncated") or m.get("ai_summary") or m.get("title") or "").strip()
        lines.append(f"[{i}] ({country} · {source}) {text}")
    return "\n".join(lines)


def _build_historical_block(historical_docs: list[dict]) -> str:
    if not historical_docs:
        return "(no historical reference material)"
    lines = ["※ Previous 30 days, for reference only: do not quote directly in this analysis; use for context."]
    for i, d in enumerate(historical_docs, start=1):
        text = (d.get("ai_summary") or d.get("title") or "").strip()
        # historical ai_summary can be long, so cap at 200 characters
        if len(text) > 200:
            # The suffix literal was empty in the extracted source; an ellipsis is assumed here.
            text = text[:200] + "…"
        lines.append(f"[H{i}] {text}")
    return "\n".join(lines)


def build_prompt(selected: list[dict], historical_docs: list[dict]) -> str:
    template = _load_prompt()
    articles_block = _build_articles_block(selected)
    historical_block = _build_historical_block(historical_docs)
    return template.replace("{articles_block}", articles_block).replace(
        "{historical_block}", historical_block
    )


def retrieve_historical(
    cluster: dict,
    candidates: list[dict],
    *,
    top_k: int = HISTORICAL_TOP_K,
    sim_min: float = HISTORICAL_SIMILARITY_MIN,
) -> list[dict]:
    """Cosine top-K (sim ≥ sim_min) between the cluster centroid and the candidate pool.

    Returns an empty list when candidates is empty or nothing reaches sim_min.
    """
    if not candidates:
        return []
    centroid = cluster["centroid"]
    scored = []
    for d in candidates:
        v = normalize_vector(d["embedding"])
        sim = float(np.dot(centroid, v))
        if sim >= sim_min:
            scored.append((sim, d))
    scored.sort(key=lambda x: -x[0])
    return [d for _, d in scored[:top_k]]


async def _try_call_llm(client: Any, prompt: str) -> str:
    async with _llm_sem:
        return await asyncio.wait_for(
            client.call_primary(prompt),
            timeout=LLM_CALL_TIMEOUT,
        )


def _truncate_str(s: Any, limit: int) -> str:
    if not isinstance(s, str):
        return ""
    s = s.strip()
    if len(s) > limit:
        # The suffix literal was empty in the extracted source; an ellipsis is assumed here.
        s = s[:limit].rstrip() + "…"
    return s


def _sanitize_envelope(parsed: dict, cluster: dict) -> dict | None:
    """Validate the LLM response envelope and enforce the caps. Returning None triggers the fallback."""
    if not isinstance(parsed, dict):
        return None

    topic_label = _truncate_str(parsed.get("topic_label"), 120)
    headline = _truncate_str(parsed.get("headline"), 200)
    if not topic_label or not headline:
        return None

    # country_perspectives
    raw_persp = parsed.get("country_perspectives")
    perspectives = []
    if isinstance(raw_persp, list):
        for p in raw_persp[:MAX_PERSPECTIVES]:
            if not isinstance(p, dict):
                continue
            country = _truncate_str(p.get("country"), 10).upper()
            summary = _truncate_str(p.get("summary"), MAX_PERSPECTIVE_SUMMARY_LEN)
            ids = p.get("article_ids") or []
            if not isinstance(ids, list):
                ids = []
            ids = [int(x) for x in ids if isinstance(x, (int, str)) and str(x).isdigit()]
            if country and summary:
                perspectives.append({"country": country, "summary": summary, "article_ids": ids})
    if not perspectives:
        # A response with no comparative value → fallback
        return None

    def _str_array(key: str, cap: int, item_limit: int) -> list[str]:
        raw = parsed.get(key)
        if not isinstance(raw, list):
            return []
        out = []
        for it in raw[:cap]:
            t = _truncate_str(it, item_limit)
            if t:
                out.append(t)
        return out

    divergences = _str_array("divergences", MAX_DIVERGENCES, 200)
    convergences = _str_array("convergences", MAX_CONVERGENCES, 200)

    # key_quotes: [{country, source, quote}]
    raw_quotes = parsed.get("key_quotes")
    quotes = []
    if isinstance(raw_quotes, list):
        for q in raw_quotes[:MAX_KEY_QUOTES]:
            if not isinstance(q, dict):
                continue
            entry = {
                "country": _truncate_str(q.get("country"), 10).upper(),
                "source": _truncate_str(q.get("source"), 60),
                "quote": _truncate_str(q.get("quote"), 240),
            }
            if entry["quote"]:
                quotes.append(entry)

    historical_context = _truncate_str(parsed.get("historical_context"), MAX_HISTORICAL_CONTEXT_LEN) or None

    return {
        "topic_label": topic_label,
        "headline": headline,
        "country_perspectives": perspectives,
        "divergences": divergences,
        "convergences": convergences,
        "key_quotes": quotes,
        "historical_context": historical_context,
        "llm_fallback_used": False,
    }


def _make_fallback(cluster: dict) -> dict:
    """Plan §"Fallback Topic Row (fixed shape)". Never dropped; country_perspectives is an empty list."""
    return {
        "topic_label": FALLBACK_TOPIC_LABEL,
        "headline": FALLBACK_HEADLINE,
        "country_perspectives": [],
        "divergences": [],
        "convergences": [],
        "key_quotes": [],
        "historical_context": None,
        "llm_fallback_used": True,
    }


async def compare_cluster_with_fallback(
    client: Any,
    cluster: dict,
    selected: list[dict],
    historical_docs: list[dict] | None = None,
) -> dict:
    """Comparative analysis for one cluster. Up to 2 LLM attempts, then a fallback row on failure.

    Returns:
        sanitized envelope dict (Plan §"LLM prompt output envelope") + llm_fallback_used.
    """
    historical_docs = historical_docs or []
    prompt = build_prompt(selected, historical_docs)

    for attempt in range(2):
        try:
            raw = await _try_call_llm(client, prompt)
        except asyncio.TimeoutError:
            logger.warning(
                f"LLM timeout {LLM_CALL_TIMEOUT}s "
                f"(attempt={attempt + 1}, cluster size={len(cluster['members'])})"
            )
            continue
        except Exception as e:
            logger.warning(f"LLM call failed attempt={attempt + 1}: {e}")
            continue

        parsed = parse_json_response(raw)
        sanitized = _sanitize_envelope(parsed, cluster) if parsed else None
        if sanitized:
            return sanitized
        logger.warning(
            f"envelope validation failed attempt={attempt + 1} "
            f"(raw_len={len(raw) if raw else 0}, parsed_keys={list(parsed.keys()) if isinstance(parsed, dict) else None})"
        )

    return _make_fallback(cluster)
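
Note on the retry flow: the timeout, retry, and fallback pattern used above can be exercised in isolation with a stubbed LLM call. This is a simplified sketch, not the module's code; `call_with_fallback`, `make_stub`, and the minimal "headline" validation are hypothetical stand-ins for `_try_call_llm`, the real client, and `_sanitize_envelope`.

```python
import asyncio
import json


async def call_with_fallback(call, *, attempts: int = 2, timeout: float = 0.05) -> dict:
    """Try `call` up to `attempts` times; return a fixed fallback dict if none yields valid JSON."""
    for _ in range(attempts):
        try:
            raw = await asyncio.wait_for(call(), timeout=timeout)
        except asyncio.TimeoutError:
            continue  # timed out: next attempt
        except Exception:
            continue  # transport or model error: next attempt
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            continue  # unparsable output: next attempt
        if isinstance(parsed, dict) and parsed.get("headline"):
            return {**parsed, "llm_fallback_used": False}
    return {"headline": "fallback", "llm_fallback_used": True}


def make_stub(responses: list[str]):
    """Build an async callable that returns the given responses one per call."""
    it = iter(responses)

    async def call() -> str:
        return next(it)

    return call


print(asyncio.run(call_with_fallback(make_stub(["not json", '{"headline": "ok"}']))))
# {'headline': 'ok', 'llm_fallback_used': False}
print(asyncio.run(call_with_fallback(make_stub(["not json", "still not json"]))))
# {'headline': 'fallback', 'llm_fallback_used': True}
```

Returning a fixed-shape row instead of raising keeps the topic count stable, which is what lets the caller compute a fallback ratio afterwards.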
+199 app/services/briefing/loader.py
@@ -0,0 +1,199 @@
"""Load the 5h overnight news window + normalize country + (optional) load candidates from the past N days.

- documents collected between KST midnight and 05:00 (source_channel='news' OR ai_domain='News').
- country canonical = first non-null document_chunks.country → news_sources prefix fallback (same as Phase 4).
- rows with NULL ai_summary/embedding are excluded (principle: zero re-summarization / re-embedding).
- returns a list of doc dicts (input to topic-first clustering; country is a field of each dict).
- historical doc candidates for retrieval are loaded by a separate function (when BRIEFING_HISTORICAL_ENABLED is on).
"""
from datetime import datetime
from typing import Any
import numpy as np
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
logger = setup_logger("briefing_loader")

_NEWS_WINDOW_SQL = text("""
    SELECT
        d.id,
        d.title,
        d.ai_summary,
        d.embedding,
        d.created_at,
        d.edit_url,
        d.ai_sub_group,
        (
            SELECT c.country
            FROM document_chunks c
            WHERE c.doc_id = d.id AND c.country IS NOT NULL
            LIMIT 1
        ) AS chunk_country
    FROM documents d
    WHERE (d.source_channel = 'news' OR d.ai_domain = 'News')
      AND d.deleted_at IS NULL
      AND d.created_at >= :window_start
      AND d.created_at < :window_end
      AND d.embedding IS NOT NULL
      AND d.ai_summary IS NOT NULL
""")

_SOURCE_COUNTRY_SQL = text("""
    SELECT name, country FROM news_sources WHERE country IS NOT NULL
""")

_HISTORICAL_CANDIDATES_SQL = text("""
    SELECT
        d.id,
        d.title,
        d.ai_summary,
        d.embedding,
        d.created_at
    FROM documents d
    WHERE (d.source_channel = 'news' OR d.ai_domain = 'News')
      AND d.deleted_at IS NULL
      AND d.created_at >= :hist_start
      AND d.created_at < :hist_end
      AND d.embedding IS NOT NULL
      AND d.ai_summary IS NOT NULL
""")


def _to_numpy_embedding(raw: Any) -> np.ndarray | None:
    if raw is None:
        return None
    if isinstance(raw, str):
        import json

        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            return None
    try:
        arr = np.asarray(raw, dtype=np.float32)
    except (TypeError, ValueError):
        return None
    if arr.size == 0:
        return None
    return arr


async def _load_source_country_map(session) -> dict[str, str]:
    """Map news_sources name prefixes → country (mirrors the Phase 4 pattern)."""
    rows = await session.execute(_SOURCE_COUNTRY_SQL)
    mapping: dict[str, str] = {}
    for name, country in rows:
        if not name or not country:
            continue
        # First token of the source name
        prefix = name.split(" ")[0].strip()
        if prefix and prefix not in mapping:
            mapping[prefix] = country
        # For names with 3+ tokens, also register everything but the last token
        tokens = name.split(" ")
        if len(tokens) >= 3:
            source_prefix = " ".join(tokens[:-1]).strip()
            if source_prefix and source_prefix not in mapping:
                mapping[source_prefix] = country
    return mapping


async def load_night_window(
    window_start: datetime,
    window_end: datetime,
) -> list[dict]:
    """Return the night-window news docs as a list with country filled in.

    Returns:
        [{id, title, ai_summary, embedding, created_at, edit_url, ai_sub_group, country}, ...]
        Docs whose country cannot be mapped are dropped (cross-country comparison is the point).
    """
    docs: list[dict] = []
    null_country = 0

    async with async_session() as session:
        source_country = await _load_source_country_map(session)
        result = await session.execute(
            _NEWS_WINDOW_SQL,
            {"window_start": window_start, "window_end": window_end},
        )
        for row in result.mappings():
            embedding = _to_numpy_embedding(row["embedding"])
            if embedding is None:
                continue

            country = row["chunk_country"]
            if not country:
                ai_sub_group = (row["ai_sub_group"] or "").strip()
                if ai_sub_group:
                    country = source_country.get(ai_sub_group)
            if not country:
                null_country += 1
                continue

            docs.append({
                "id": int(row["id"]),
                "title": row["title"] or "",
                "ai_summary": row["ai_summary"] or "",
                "embedding": embedding,
                "created_at": row["created_at"],
                "edit_url": row["edit_url"] or "",
                "ai_sub_group": row["ai_sub_group"] or "",
                "country": country.upper(),
            })

    if null_country:
        logger.warning(
            f"[loader] country mapping failed, dropped {null_country} docs "
            f"(chunk_country + news_sources prefix both failed)"
        )
    logger.info(
        f"[loader] night window {window_start} ~ {window_end}: "
        f"{len(docs)} docs ({len({d['country'] for d in docs})} countries)"
    )
    return docs


async def load_historical_candidates(
    hist_start: datetime,
    hist_end: datetime,
    exclude_ids: set[int],
) -> list[dict]:
    """Doc candidates from the past N days (called only when BRIEFING_HISTORICAL_ENABLED=true).

    Raw candidate pool for cosine comparison against cluster centroids. No country mapping is done
    (the docs are used only as LLM analysis input and are never displayed).

    Args:
        exclude_ids: article ids in today's window (avoids retrieving duplicates).

    Returns:
        [{id, title, ai_summary, embedding, created_at}, ...]
    """
    out: list[dict] = []
    async with async_session() as session:
        result = await session.execute(
            _HISTORICAL_CANDIDATES_SQL,
            {"hist_start": hist_start, "hist_end": hist_end},
        )
        for row in result.mappings():
            doc_id = int(row["id"])
            if doc_id in exclude_ids:
                continue
            embedding = _to_numpy_embedding(row["embedding"])
            if embedding is None:
                continue
            out.append({
                "id": doc_id,
                "title": row["title"] or "",
                "ai_summary": row["ai_summary"] or "",
                "embedding": embedding,
                "created_at": row["created_at"],
            })
    logger.info(f"[loader] historical candidates: {len(out)} docs (window {hist_start.date()} ~ {hist_end.date()})")
    return out
+261 app/services/briefing/pipeline.py
@@ -0,0 +1,261 @@
"""Overnight news briefing pipeline (Plan §"PR-MorningBriefing-1 Backend").

Flow: load_night_window → cluster_global → select_for_llm (k=7) →
(optional) historical retrieval → compare_cluster_with_fallback → DB save.

Regenerate policy: on a briefing_date UNIQUE conflict, DELETE+INSERT inside one transaction.
"""
import time
from datetime import date, datetime, timedelta, timezone
from typing import Any
from zoneinfo import ZoneInfo

from sqlalchemy import delete

from ai.client import AIClient
from core.database import async_session
from core.utils import setup_logger
from models.briefing import BriefingTopic, MorningBriefing
from services.briefing.clustering import LAMBDA, cluster_global
from services.briefing.comparator import (
    HISTORICAL_WINDOW_DAYS,
    compare_cluster_with_fallback,
    historical_enabled,
    retrieve_historical,
)
from services.briefing.loader import load_historical_candidates, load_night_window
from services.digest.selection import select_for_llm

logger = setup_logger("briefing_pipeline")

KST = ZoneInfo("Asia/Seoul")
NIGHT_WINDOW_HOURS = 5  # KST 00:00 ~ 05:00
SELECT_K = 7  # Plan §"Clustering parameters": briefing K_PER_CLUSTER=7
SELECT_LAMBDA_MMR = 0.6  # Plan: briefing MMR lambda 0.6
PIPELINE_HARD_CAP = 600  # seconds; same as Phase 4


def _compute_window(target_date: date | None = None) -> tuple[datetime, datetime, date]:
    """target_date (the day starting at KST midnight) → (window_start_utc, window_end_utc, kst_date).

    Defaults to today in KST when target_date is None.
    """
    if target_date is None:
        target_date = datetime.now(KST).date()
    start_kst = datetime.combine(target_date, datetime.min.time(), tzinfo=KST)
    end_kst = start_kst + timedelta(hours=NIGHT_WINDOW_HOURS)
    return start_kst.astimezone(timezone.utc), end_kst.astimezone(timezone.utc), target_date


def _is_usable_topic(envelope: dict, topic_label: str) -> bool:
    """Decide whether this is a genuine LLM result rather than a fallback row."""
    if envelope.get("llm_fallback_used"):
        return False
    if not envelope.get("country_perspectives"):
        return False
    # Same literal as FALLBACK_TOPIC_LABEL in comparator.py
    if topic_label == "주요 뉴스 묶음":
        return False
    return True


def _compute_status(llm_calls: int, fallback_count: int, usable_count: int, has_topics: bool) -> str:
    """Plan §"Status 4-state decision table"."""
    if not has_topics or llm_calls == 0:
        return "empty"
    if usable_count == 0:
        return "failed"
    fallback_pct = (fallback_count / llm_calls) if llm_calls else 0.0
    if fallback_pct >= 0.5:
        return "failed"
    if fallback_count > 0 or usable_count < llm_calls:
        return "partial"
    return "success"


def _build_topic_row(
    rank: int,
    cluster: dict,
    envelope: dict,
    historical_docs: list[dict] | None,
    primary_model: str,
) -> BriefingTopic:
    historical_ids = None
    historical_window = None
    if historical_enabled():
        historical_ids = [d["id"] for d in (historical_docs or [])]
        historical_window = HISTORICAL_WINDOW_DAYS

    return BriefingTopic(
        topic_rank=rank,
        topic_label=envelope["topic_label"],
        headline=envelope["headline"],
        country_perspectives=envelope["country_perspectives"],
        divergences=envelope["divergences"],
        convergences=envelope["convergences"],
        key_quotes=envelope["key_quotes"],
        historical_article_ids=historical_ids,
        historical_context=envelope.get("historical_context"),
        historical_window_days=historical_window,
        cluster_members=[m["id"] for m in cluster["members"]],
        article_count=len(cluster["members"]),
        country_count=cluster.get("country_count", 0),
        importance_score=cluster.get("importance_score", 0.0),
        raw_weight_sum=cluster.get("raw_weight_sum", 0.0),
        llm_model=primary_model,
        llm_fallback_used=envelope.get("llm_fallback_used", False),
    )


async def _save_briefing(
    briefing_date: date,
    window_start: datetime,
    window_end: datetime,
    total_articles: int,
    total_countries: int,
    topic_rows: list[BriefingTopic],
    llm_calls: int,
    llm_failures: int,
    generation_ms: int,
    status: str,
) -> int:
    """Resolve briefing_date UNIQUE conflicts with a DELETE+INSERT transaction."""
    async with async_session() as session:
        # Remove any existing briefing for this date; topics follow via FK CASCADE.
        await session.execute(
            delete(MorningBriefing).where(MorningBriefing.briefing_date == briefing_date)
        )
        new = MorningBriefing(
            briefing_date=briefing_date,
            window_start=window_start,
            window_end=window_end,
            decay_lambda=LAMBDA,
            total_articles=total_articles,
            total_countries=total_countries,
            total_topics=len(topic_rows),
            generation_ms=generation_ms,
            llm_calls=llm_calls,
            llm_failures=llm_failures,
            status=status,
        )
        new.topics = topic_rows
        session.add(new)
        await session.commit()
        return new.id
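The delete-then-insert pattern can be illustrated without the ORM. The sketch below uses the standard-library `sqlite3` module and a trimmed-down schema; the column subset and the function names `open_db` and `save_briefing` are assumptions made for the example, while the `UNIQUE` constraints and `ON DELETE CASCADE` mirror the migration summary in the PR description.

```python
import sqlite3

SCHEMA = """
CREATE TABLE morning_briefings (
    id INTEGER PRIMARY KEY,
    briefing_date TEXT NOT NULL UNIQUE,
    status TEXT NOT NULL
);
CREATE TABLE briefing_topics (
    id INTEGER PRIMARY KEY,
    briefing_id INTEGER NOT NULL
        REFERENCES morning_briefings(id) ON DELETE CASCADE,
    topic_rank INTEGER NOT NULL,
    headline TEXT NOT NULL,
    UNIQUE (briefing_id, topic_rank)
);
"""


def open_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # SQLite enforces foreign keys (and thus CASCADE) only when this is on.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(SCHEMA)
    return conn


def save_briefing(conn: sqlite3.Connection, briefing_date: str,
                  status: str, headlines: list) -> int:
    """Replace the briefing for briefing_date inside one transaction."""
    # `with conn` commits on success and rolls back if any statement raises,
    # so a failed insert leaves the previous briefing in place.
    with conn:
        conn.execute(
            "DELETE FROM morning_briefings WHERE briefing_date = ?",
            (briefing_date,),
        )
        cur = conn.execute(
            "INSERT INTO morning_briefings (briefing_date, status) VALUES (?, ?)",
            (briefing_date, status),
        )
        briefing_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO briefing_topics (briefing_id, topic_rank, headline) "
            "VALUES (?, ?, ?)",
            [(briefing_id, rank, h) for rank, h in enumerate(headlines, start=1)],
        )
    return briefing_id
```

Calling `save_briefing` twice for the same date leaves exactly one briefing row, and the old topic rows disappear through the cascade rather than through a second explicit `DELETE`.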


async def run_briefing_pipeline(target_date: date | None = None) -> dict[str, Any]:
    """Run the overnight news briefing once. Called by cron or by the manual regenerate API.

    Returns:
        {briefing_id, status, total_topics, total_articles, llm_calls, llm_failures, generation_ms, regenerated}
    """
    start = time.time()
    window_start, window_end, briefing_date = _compute_window(target_date)
    logger.info(
        f"[briefing] start date={briefing_date} window {window_start} ~ {window_end} "
        f"decay_lambda={LAMBDA:.4f} historical={'on' if historical_enabled() else 'off'}"
    )

    # 1. Load night window
    docs = await load_night_window(window_start, window_end)
    total_articles = len(docs)
    total_countries_in_window = len({d["country"] for d in docs})

    # 2. Cluster (topic-first)
    clusters = cluster_global(docs)
    if not clusters:
        briefing_id = await _save_briefing(
            briefing_date=briefing_date,
            window_start=window_start,
            window_end=window_end,
            total_articles=total_articles,
            total_countries=total_countries_in_window,
            topic_rows=[],
            llm_calls=0,
            llm_failures=0,
            generation_ms=int((time.time() - start) * 1000),
            status="empty",
        )
        logger.info(f"[briefing] empty (no usable clusters) → briefing_id={briefing_id}")
        return {
            "briefing_id": briefing_id,
            "status": "empty",
            "total_topics": 0,
            "total_articles": total_articles,
            "llm_calls": 0,
            "llm_failures": 0,
            "generation_ms": int((time.time() - start) * 1000),
            "regenerated": True,
        }

    # 3. (Optional) Load the historical candidate pool once
    historical_candidates: list[dict] = []
    if historical_enabled():
        hist_end = window_start  # up to just before today's window
        hist_start = hist_end - timedelta(days=HISTORICAL_WINDOW_DAYS)
        exclude = {d["id"] for d in docs}
        historical_candidates = await load_historical_candidates(hist_start, hist_end, exclude)

    # 4. One LLM call per cluster
    client = AIClient()
    primary_model = client.ai.primary.model
    topic_rows: list[BriefingTopic] = []
    llm_calls = 0
    llm_failures = 0
    usable_count = 0
    try:
        for rank, cluster in enumerate(clusters, start=1):
            selected = select_for_llm(cluster, k=SELECT_K, lambda_mmr=SELECT_LAMBDA_MMR)
            historical_docs = (
                retrieve_historical(cluster, historical_candidates)
                if historical_enabled() else []
            )
            llm_calls += 1
            envelope = await compare_cluster_with_fallback(
                client, cluster, selected, historical_docs=historical_docs
            )
            if envelope.get("llm_fallback_used"):
                llm_failures += 1
            if _is_usable_topic(envelope, envelope["topic_label"]):
                usable_count += 1
            # Fallback rows are stored too; llm_fallback_used marks them.
            topic_rows.append(
                _build_topic_row(rank, cluster, envelope, historical_docs, primary_model)
            )
    finally:
        await client.close()

    generation_ms = int((time.time() - start) * 1000)
    status = _compute_status(llm_calls, llm_failures, usable_count, has_topics=bool(topic_rows))
    briefing_id = await _save_briefing(
        briefing_date=briefing_date,
        window_start=window_start,
        window_end=window_end,
        total_articles=total_articles,
        total_countries=total_countries_in_window,
        topic_rows=topic_rows,
        llm_calls=llm_calls,
        llm_failures=llm_failures,
        generation_ms=generation_ms,
        status=status,
    )
    fallback_pct = (llm_failures / llm_calls * 100.0) if llm_calls else 0.0
    logger.info(
        f"[briefing] done id={briefing_id} status={status} topics={len(topic_rows)} "
        f"usable={usable_count}/{llm_calls} fallback={llm_failures}/{llm_calls} ({fallback_pct:.1f}%) "
        f"elapsed={generation_ms / 1000:.1f}s"
    )
    return {
        "briefing_id": briefing_id,
        "status": status,
        "total_topics": len(topic_rows),
        "total_articles": total_articles,
        "llm_calls": llm_calls,
        "llm_failures": llm_failures,
        "generation_ms": generation_ms,
        "regenerated": True,
    }
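The pipeline starts from `_compute_window`, whose body is not part of this excerpt. A minimal sketch of how a KST 00:00~05:00 window could be derived is shown below. The function name `compute_window`, the `now` parameter, and the choice to return timezone-aware KST datetimes are all assumptions; only the window bounds and the return order come from the PR.

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Optional

# Korea observes no daylight saving time, so a fixed +09:00 offset suffices.
KST = timezone(timedelta(hours=9))
WINDOW_HOURS = 5  # KST 00:00 ~ 05:00 per the PR description


def compute_window(target_date: Optional[date] = None,
                   now: Optional[datetime] = None):
    """Hypothetical stand-in returning (window_start, window_end, briefing_date)."""
    if target_date is None:
        # Default to "today" as seen from KST, not from the server's local zone.
        current = now if now is not None else datetime.now(KST)
        target_date = current.astimezone(KST).date()
    window_start = datetime.combine(target_date, time(0, 0), tzinfo=KST)
    window_end = window_start + timedelta(hours=WINDOW_HOURS)
    return window_start, window_end, target_date
```

Converting `now` to KST before taking the date matters on servers running in UTC: 16:00 UTC on 28 February is already 1 March in Korea.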
+13 -7
@@ -13,12 +13,20 @@ LAMBDA_MMR = 0.7 # relevance 70% / diversity 30%
 SUMMARY_TRUNCATE = 300  # guard against long-tail ai_summary
-def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
+def select_for_llm(
+    cluster: dict,
+    k: int = K_PER_CLUSTER,
+    *,
+    lambda_mmr: float = LAMBDA_MMR,
+    summary_truncate: int = SUMMARY_TRUNCATE,
+) -> list[dict]:
     """Select the representative articles of a cluster for the LLM call.
     Args:
-        cluster: a single cluster from clustering.cluster_country
-        k: number of articles to select (default 5)
+        cluster: a single cluster from clustering.cluster_country / briefing.cluster_global
+        k: number of articles to select (Phase 4=5, briefing=7)
+        lambda_mmr: relevance vs diversity (Phase 4=0.7, briefing=0.6)
+        summary_truncate: ai_summary truncation length (LLM token protection)
     Returns:
         List of selected doc dicts. Each item gains an ai_summary_truncated field.
@@ -28,7 +36,6 @@ def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
         selected = list(members)
     else:
         centroid = cluster["centroid"]
-        # relevance = centroid similarity × decay weight
         for m in members:
             v = _normalize(m["embedding"])
             m["_rel"] = float(np.dot(centroid, v)) * m["weight"]
@@ -44,14 +51,13 @@ def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
                     float(np.dot(v, _normalize(s["embedding"])))
                     for s in selected
                 )
-                return LAMBDA_MMR * c["_rel"] - (1.0 - LAMBDA_MMR) * max_sim
+                return lambda_mmr * c["_rel"] - (1.0 - lambda_mmr) * max_sim
             pick = max(candidates, key=mmr_score)
             selected.append(pick)
             candidates.remove(pick)
     # Protect the LLM input token budget
     for m in selected:
-        m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:SUMMARY_TRUNCATE]
+        m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:summary_truncate]
     return selected
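To see what the now-injectable `lambda_mmr` changes, the MMR scoring in the hunks above can be reproduced on toy data. The sketch below is a dependency-free approximation, not the module's code: `mmr_select` and its helpers are hypothetical names, plain lists replace NumPy arrays, and seeding the first pick with `max_sim = 0.0` is an assumption about the part of the loop that the diff does not show.

```python
import math


def _normalize(v):
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v] if norm else list(v)


def _dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def mmr_select(members, centroid, k, lambda_mmr):
    """Greedy MMR: lambda_mmr * relevance - (1 - lambda_mmr) * redundancy."""
    c = _normalize(centroid)
    # relevance = centroid similarity x decay weight, as in the original code
    rel = {
        m["id"]: _dot(c, _normalize(m["embedding"])) * m.get("weight", 1.0)
        for m in members
    }
    candidates = list(members)
    selected = []
    while candidates and len(selected) < k:
        def mmr_score(cand):
            v = _normalize(cand["embedding"])
            # Assumption: with nothing selected yet, redundancy counts as 0.
            max_sim = max(
                (_dot(v, _normalize(s["embedding"])) for s in selected),
                default=0.0,
            )
            return lambda_mmr * rel[cand["id"]] - (1.0 - lambda_mmr) * max_sim

        pick = max(candidates, key=mmr_score)
        selected.append(pick)
        candidates.remove(pick)
    return selected


members = [
    {"id": "A", "embedding": [1.0, 0.0]},  # on the centroid
    {"id": "B", "embedding": [0.8, 0.6]},  # relevant but close to A
    {"id": "C", "embedding": [0.0, 1.0]},  # irrelevant but orthogonal to A
]
```

With `lambda_mmr=0.7` the second pick is the near-duplicate `B`; lowering it to `0.4` flips the second pick to the more diverse `C`. The briefing's 0.6 sits slightly closer to diversity than Phase 4's 0.7.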
+43
@@ -0,0 +1,43 @@
"""Morning Briefing worker: topic×country comparative analysis of overnight news (KST 00:00~05:00).

- APScheduler cron (daily at 05:10 KST, registered in PR-3) and manual calls share this entry point
- PIPELINE_HARD_CAP = 600 s hard cap, so a stuck cron run can never hang indefinitely
- Standalone run: `python -m workers.briefing_worker`
"""
import asyncio
from datetime import date

from core.utils import setup_logger
from services.briefing.pipeline import run_briefing_pipeline

logger = setup_logger("briefing_worker")

PIPELINE_HARD_CAP = 600  # seconds


async def run(target_date: date | None = None) -> dict | None:
    """Shared entry point for APScheduler and manual calls.

    Args:
        target_date: briefing_date in KST (None = today). The regenerate API can pass it explicitly.
    """
    try:
        result = await asyncio.wait_for(
            run_briefing_pipeline(target_date),
            timeout=PIPELINE_HARD_CAP,
        )
        logger.info(f"[briefing] worker done: {result}")
        return result
    except asyncio.TimeoutError:
        logger.error(
            f"[briefing] HARD CAP {PIPELINE_HARD_CAP}s exceeded, worker forcibly stopped. "
            "The existing briefing is only replaced at commit time, so it stays as it was."
        )
    except Exception as e:
        logger.exception(f"[briefing] worker failed: {e}")
    return None


if __name__ == "__main__":
    asyncio.run(run())
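The hard-cap behaviour of the worker can be checked with a stand-in coroutine. The sketch below is illustrative only: `run_with_hard_cap` and `_fake_pipeline` are hypothetical names, and the short delays replace the real 600-second cap so the example finishes quickly.

```python
import asyncio


async def run_with_hard_cap(coro, cap_seconds: float):
    """Await coro, returning None if it exceeds cap_seconds."""
    try:
        # wait_for cancels the inner coroutine when the timeout expires.
        return await asyncio.wait_for(coro, timeout=cap_seconds)
    except asyncio.TimeoutError:
        return None


async def _fake_pipeline(delay: float, result: dict) -> dict:
    # Stand-in for run_briefing_pipeline: sleeps, then reports a result.
    await asyncio.sleep(delay)
    return result


fast = asyncio.run(run_with_hard_cap(_fake_pipeline(0.01, {"status": "success"}), 1.0))
slow = asyncio.run(run_with_hard_cap(_fake_pipeline(1.0, {"status": "success"}), 0.05))
```

Here `fast` holds the pipeline result and `slow` is `None`. Because the timeout cancels the pipeline before it reaches its commit, a timed-out run leaves the previously stored briefing untouched.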