feat/morning-briefing-backend #10
@@ -0,0 +1,203 @@
|
||||
"""Morning Briefing API — read-only + 수동 regenerate.
|
||||
|
||||
엔드포인트:
|
||||
- GET /api/briefing/latest : 가장 최근 briefing
|
||||
- GET /api/briefing?date=YYYY-MM-DD : 특정 날짜 briefing
|
||||
- POST /api/briefing/regenerate?date=... : 동기 워커 트리거 (admin), DELETE+INSERT tx
|
||||
|
||||
응답은 topic 평면 list (axis 반대 — Phase 4 와 달리 country 그룹 X).
|
||||
각 topic 안에 country_perspectives JSONB 가 들어있어 cross-country 비교 분석을 표현.
|
||||
"""
|
||||
|
||||
from datetime import date as date_type
|
||||
from datetime import datetime
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from core.auth import get_current_user, require_admin
|
||||
from core.database import get_session
|
||||
from models.briefing import BriefingTopic, MorningBriefing
|
||||
from models.user import User
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# ─── Pydantic 응답 모델 ───
|
||||
|
||||
|
||||
class CountryPerspective(BaseModel):
    """One country's take on a topic (one element of a topic's ``country_perspectives``)."""

    # Country code of the perspective.
    country: str
    # Short summary of how this country covers the topic.
    summary: str
    # Ids of the articles backing this perspective; may be empty.
    article_ids: list[int] = []
|
||||
|
||||
|
||||
class KeyQuote(BaseModel):
    """A quoted passage attached to a topic; only ``quote`` is required."""

    # Country code of the quoted outlet (empty when unknown).
    country: str = ""
    # Source/outlet name (empty when unknown).
    source: str = ""
    # The quoted text itself.
    quote: str
|
||||
|
||||
|
||||
class TopicResponse(BaseModel):
    """API representation of a single briefing topic (built from a BriefingTopic row)."""

    # Rank of the topic inside its briefing; responses list topics in ascending rank.
    topic_rank: int
    topic_label: str
    headline: str
    # Per-country summaries for this topic.
    country_perspectives: list[CountryPerspective]
    # Points where coverage differs / agrees across countries.
    divergences: list[str]
    convergences: list[str]
    key_quotes: list[KeyQuote]
    # Optional background text; None when absent.
    historical_context: str | None = None
    # Article ids that belong to the topic's cluster.
    cluster_members: list[int] = []
    article_count: int
    country_count: int
    importance_score: float
    # True when the topic row was produced by the fallback path instead of LLM output.
    llm_fallback_used: bool
|
||||
|
||||
|
||||
class BriefingResponse(BaseModel):
    """API representation of one morning briefing: metadata plus a flat topic list."""

    briefing_date: date_type
    # Collection window the briefing covers.
    window_start: datetime
    window_end: datetime
    # Decay parameter recorded with the briefing.
    decay_lambda: float
    total_articles: int
    total_countries: int
    total_topics: int
    # Generation time in milliseconds; None when not recorded.
    generation_ms: int | None
    llm_calls: int
    llm_failures: int
    status: str
    headline_oneliner: str | None = None
    topics: list[TopicResponse]
|
||||
|
||||
|
||||
class RegenerateResponse(BaseModel):
    """Result summary returned by the manual regenerate endpoint."""

    # Worker status; the regenerate endpoint documents
    # 'success' | 'partial' | 'failed' | 'empty'.
    status: str
    # Id of the stored briefing row; None when the worker result carries no id.
    briefing_id: int | None
    briefing_date: date_type
    total_topics: int
    total_articles: int
    llm_calls: int
    llm_failures: int
    generation_ms: int
    # Whether an existing briefing was replaced (taken from the worker result).
    regenerated: bool
|
||||
|
||||
|
||||
# ─── helpers ───
|
||||
|
||||
|
||||
def _build_response(b: MorningBriefing) -> BriefingResponse:
    """Convert a MorningBriefing row (with its topics loaded) into the API model.

    Topics are emitted in ascending ``topic_rank`` order. JSONB list columns
    that are empty or ``None`` are returned as empty lists.
    """

    def _to_topic(row: BriefingTopic) -> TopicResponse:
        # Each JSONB element is a dict matching the corresponding Pydantic model.
        perspectives = [
            CountryPerspective(**item) for item in (row.country_perspectives or [])
        ]
        quotes = [KeyQuote(**item) for item in (row.key_quotes or [])]
        return TopicResponse(
            topic_rank=row.topic_rank,
            topic_label=row.topic_label,
            headline=row.headline,
            country_perspectives=perspectives,
            divergences=list(row.divergences or []),
            convergences=list(row.convergences or []),
            key_quotes=quotes,
            historical_context=row.historical_context,
            cluster_members=list(row.cluster_members or []),
            article_count=row.article_count,
            country_count=row.country_count,
            importance_score=row.importance_score,
            llm_fallback_used=row.llm_fallback_used,
        )

    ranked_rows = sorted(b.topics, key=lambda row: row.topic_rank)
    topic_models = [_to_topic(row) for row in ranked_rows]

    return BriefingResponse(
        briefing_date=b.briefing_date,
        window_start=b.window_start,
        window_end=b.window_end,
        decay_lambda=b.decay_lambda,
        total_articles=b.total_articles,
        total_countries=b.total_countries,
        total_topics=b.total_topics,
        generation_ms=b.generation_ms,
        llm_calls=b.llm_calls,
        llm_failures=b.llm_failures,
        status=b.status,
        headline_oneliner=b.headline_oneliner,
        topics=topic_models,
    )
|
||||
|
||||
|
||||
async def _load_briefing(
    session: AsyncSession,
    target_date: date_type | None,
) -> MorningBriefing | None:
    """Fetch one briefing with its topics eagerly loaded.

    With ``target_date`` set, the briefing for that date is returned;
    otherwise the one with the greatest ``briefing_date``. Returns ``None``
    when no row matches.
    """
    stmt = select(MorningBriefing).options(selectinload(MorningBriefing.topics))
    if target_date is None:
        # No date requested: newest briefing first.
        stmt = stmt.order_by(MorningBriefing.briefing_date.desc())
    else:
        stmt = stmt.where(MorningBriefing.briefing_date == target_date)
    rows = await session.execute(stmt.limit(1))
    return rows.scalar_one_or_none()
|
||||
|
||||
|
||||
# ─── Routes ───
|
||||
|
||||
|
||||
@router.get("/latest", response_model=BriefingResponse)
async def get_latest(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return the most recent morning briefing (404 when none exists yet)."""
    latest = await _load_briefing(session, target_date=None)
    if latest is not None:
        return _build_response(latest)
    raise HTTPException(status_code=404, detail="아직 생성된 briefing 없음")
|
||||
|
||||
|
||||
@router.get("", response_model=BriefingResponse)
async def get_briefing(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST briefing_date)"),
):
    """Return the briefing for ``date``, or the latest one when ``date`` is omitted.

    Responds with 404 when no matching briefing exists.
    """
    found = await _load_briefing(session, target_date=date)
    if found is not None:
        return _build_response(found)

    # The 404 message names the requested date only when one was given.
    if date:
        message = f"briefing 없음 (date={date})"
    else:
        message = "아직 생성된 briefing 없음"
    raise HTTPException(status_code=404, detail=message)
|
||||
|
||||
|
||||
@router.post("/regenerate", response_model=RegenerateResponse)
async def regenerate(
    user: Annotated[User, Depends(require_admin)],
    date: date_type | None = Query(default=None, description="YYYY-MM-DD KST 기준 briefing_date"),
):
    """Manually trigger briefing generation (admin only); runs the worker inline.

    When ``date`` is omitted the briefing date is today in KST. If a row for
    the same date already exists, the worker replaces it (delete + insert in
    one transaction).

    Returns:
        RegenerateResponse whose ``status`` is one of
        'success' | 'partial' | 'failed' | 'empty'.

    Raises:
        HTTPException: 500 when the worker returns no result.
    """
    from zoneinfo import ZoneInfo

    # Imported lazily, as in the original handler.
    from workers.briefing_worker import run

    result = await run(target_date=date)
    if result is None:
        raise HTTPException(status_code=500, detail="briefing 워커 실행 실패 (로그 확인)")

    # The contract says "today in KST" when no date is given. A naive
    # datetime.now() uses the server's local timezone, which can report the
    # wrong calendar day on a non-KST host, so resolve the date explicitly.
    briefing_date = date or datetime.now(ZoneInfo("Asia/Seoul")).date()

    return RegenerateResponse(
        status=result["status"],
        briefing_id=result.get("briefing_id"),
        briefing_date=briefing_date,
        total_topics=result["total_topics"],
        total_articles=result["total_articles"],
        llm_calls=result["llm_calls"],
        llm_failures=result["llm_failures"],
        generation_ms=result["generation_ms"],
        regenerated=result.get("regenerated", True),
    )
|
||||
@@ -8,6 +8,7 @@ from sqlalchemy import func, select, text
|
||||
|
||||
from api.audio import router as audio_router
|
||||
from api.auth import router as auth_router
|
||||
from api.briefing import router as briefing_router
|
||||
from api.config import router as config_router
|
||||
from api.dashboard import router as dashboard_router
|
||||
from api.digest import router as digest_router
|
||||
@@ -135,6 +136,7 @@ app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"]
|
||||
app.include_router(library_router, prefix="/api/library", tags=["library"])
|
||||
app.include_router(news_router, prefix="/api/news", tags=["news"])
|
||||
app.include_router(digest_router, prefix="/api/digest", tags=["digest"])
|
||||
app.include_router(briefing_router, prefix="/api/briefing", tags=["briefing"])
|
||||
app.include_router(audio_router, prefix="/api/audio", tags=["audio"])
|
||||
app.include_router(video_router, prefix="/api/video", tags=["video"])
|
||||
app.include_router(study_sessions_router, prefix="/api/study-sessions", tags=["study-sessions"])
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
"""morning_briefings + briefing_topics 테이블 ORM (야간 수집 뉴스 브리핑).
|
||||
|
||||
axis 반대: Phase 4 = country×topic / Briefing = topic×country.
|
||||
country_perspectives JSONB 안에 한 topic 의 여러 국가 관점 array.
|
||||
"""
|
||||
|
||||
from datetime import date, datetime
|
||||
|
||||
from sqlalchemy import (
|
||||
BigInteger,
|
||||
Boolean,
|
||||
Date,
|
||||
DateTime,
|
||||
Float,
|
||||
ForeignKey,
|
||||
Integer,
|
||||
String,
|
||||
Text,
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from core.database import Base
|
||||
|
||||
|
||||
class MorningBriefing(Base):
    """Per-day briefing metadata (KST midnight–05:00 window)."""

    __tablename__ = "morning_briefings"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # One row per briefing date (unique).
    briefing_date: Mapped[date] = mapped_column(Date, nullable=False, unique=True)
    # Timezone-aware bounds of the collection window.
    window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    decay_lambda: Mapped[float] = mapped_column(Float, nullable=False)

    # Aggregate counters, defaulting to 0.
    total_articles: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    total_countries: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    total_topics: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    # Generation statistics; generation_ms is nullable.
    generation_ms: Mapped[int | None] = mapped_column(Integer)
    llm_calls: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    llm_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="success")

    headline_oneliner: Mapped[str | None] = mapped_column(Text)

    # NOTE(review): datetime.now yields a naive datetime while the column is
    # timezone-aware — confirm this is intended.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=datetime.now
    )

    # Child topics, ordered by topic_rank; removed together with the briefing.
    topics: Mapped[list["BriefingTopic"]] = relationship(
        back_populates="briefing",
        cascade="all, delete-orphan",
        order_by="BriefingTopic.topic_rank",
    )
|
||||
|
||||
|
||||
class BriefingTopic(Base):
    """Cross-country comparative analysis result for one topic of a briefing (ordered by topic_rank)."""

    __tablename__ = "briefing_topics"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Owning briefing; rows are deleted with it (ON DELETE CASCADE).
    briefing_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey("morning_briefings.id", ondelete="CASCADE"),
        nullable=False,
    )

    topic_rank: Mapped[int] = mapped_column(Integer, nullable=False)
    # Limited to 120 characters.
    topic_label: Mapped[str] = mapped_column(String(120), nullable=False)
    headline: Mapped[str] = mapped_column(Text, nullable=False)

    # JSONB arrays holding the analysis; each defaults to an empty list.
    country_perspectives: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    divergences: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    convergences: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    key_quotes: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)

    # Optional historical-context fields (all nullable).
    historical_article_ids: Mapped[list | None] = mapped_column(JSONB)
    historical_context: Mapped[str | None] = mapped_column(Text)
    historical_window_days: Mapped[int | None] = mapped_column(Integer)

    # Cluster membership and scoring.
    cluster_members: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
    article_count: Mapped[int] = mapped_column(Integer, nullable=False)
    country_count: Mapped[int] = mapped_column(Integer, nullable=False)
    importance_score: Mapped[float] = mapped_column(Float, nullable=False)
    raw_weight_sum: Mapped[float] = mapped_column(Float, nullable=False)

    llm_model: Mapped[str | None] = mapped_column(String(100))
    llm_fallback_used: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)

    # NOTE(review): datetime.now yields a naive datetime while the column is
    # timezone-aware — confirm this is intended.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=datetime.now
    )

    briefing: Mapped["MorningBriefing"] = relationship(back_populates="topics")
|
||||
@@ -0,0 +1,46 @@
|
||||
너는 다국적 뉴스 비교 분석가다.
|
||||
아래는 같은 주제로 군집된 야간 수집 뉴스들 — 각 줄 앞 (국가코드 · 소스) 표시로 출처가 표시되어 있다.
|
||||
이 정보만으로 cross-country 비교 분석을 JSON 으로만 출력하라.
|
||||
|
||||
목표:
|
||||
- 같은 사건을 각 나라가 어떻게 다르게 다루는지 / 무엇이 공통인지를 1페이지 카드 형태로 정리.
|
||||
- 사용자는 한국어 독자. 한국어로 출력.
|
||||
|
||||
절대 금지:
|
||||
- 제공된 summary 에 없는 사실 추가
|
||||
- 추측 표현 ("보인다", "~할 것이다", "~할 전망" 등)
|
||||
- JSON 외의 모든 텍스트 (설명, 마크다운, 코드블록 금지)
|
||||
- 인용부호 안 원문에 없던 단어 생성 (key_quotes 는 원문 그대로만)
|
||||
|
||||
분량 cap (반드시 지킬 것):
|
||||
- country_perspectives: 최대 10개, 각 summary 는 1~2문장 (한국어 120자 이내)
|
||||
- divergences: 최대 3개, 각 200자 이내
|
||||
- convergences: 최대 2개, 각 200자 이내
|
||||
- key_quotes: 최대 5개, 각 quote 240자 이내
|
||||
- historical_context: 1~2문장 (한국어 120자 이내), 의미 있을 때만 채우고 아니면 null
|
||||
|
||||
출력 형식 (JSON 객체 하나만 출력, 위 cap 초과 금지):
|
||||
{
|
||||
"topic_label": "5~10 단어의 한국어 토픽 제목",
|
||||
"headline": "전체를 한 줄로 압축한 한국어 headline (≤80자)",
|
||||
"country_perspectives": [
|
||||
{"country": "KR", "summary": "...", "article_ids": []},
|
||||
{"country": "US", "summary": "...", "article_ids": []}
|
||||
],
|
||||
"divergences": ["A국=X 강조 / B국=Y 비판 / C국=Z 부각"],
|
||||
"convergences": ["모든 매체가 Z 사실은 일치"],
|
||||
"key_quotes": [{"country": "US", "source": "NYT", "quote": "..."}],
|
||||
"historical_context": null
|
||||
}
|
||||
|
||||
규칙:
|
||||
- country_perspectives 의 country 는 입력 기사의 국가코드 그대로 (대문자).
|
||||
- article_ids 는 비워둬도 됨 (서버가 채움).
|
||||
- 단일 국가만 다룬 경우 divergences 는 빈 배열.
|
||||
- historical_context 는 아래 "이전 흐름 참고" 섹션이 비어있으면 반드시 null.
|
||||
|
||||
오늘 새벽 기사 묶음:
|
||||
{articles_block}
|
||||
|
||||
이전 흐름 참고 (직접 인용 금지, 맥락 파악 용도):
|
||||
{historical_block}
|
||||
@@ -0,0 +1,80 @@
|
||||
"""야간 뉴스 topic-first 클러스터링.
|
||||
|
||||
Phase 4 와 axis 반대: country 별 cluster 가 아닌 **전체 doc 합쳐서 topic cluster**.
|
||||
각 cluster 안에 country 분포가 자동으로 들어감 (doc dict 의 country field).
|
||||
|
||||
파라미터 (5h 윈도우용):
|
||||
- LAMBDA = ln(2)/2h ≈ 0.347 per hour (2-hour half-life; the code stores it per day: ln(2)/(2/24) ≈ 8.318, fast decay for the 5h night window)
|
||||
- threshold = 0.78 고정 (Phase 4 0.75~0.80 중간값)
|
||||
- MIN_ARTICLES_PER_TOPIC = 2 (야간 sparse 대비 완화)
|
||||
- MIN_COUNTRIES_PER_TOPIC = 2 (cross-country 가치 핵심)
|
||||
- MAX_TOPICS = 7 (1페이지 분량)
|
||||
"""
|
||||
|
||||
import math
|
||||
|
||||
from core.utils import setup_logger
|
||||
from services.clustering_common import (
|
||||
greedy_assign_cluster,
|
||||
normalize_importance_scores,
|
||||
)
|
||||
|
||||
logger = setup_logger("briefing_clustering")

# Decay rate for a 2-hour half-life, expressed per day:
# ln(2) / (2/24 day) ≈ 8.318 per day (≈ 0.347 per hour).
LAMBDA = math.log(2) / (2.0 / 24.0)  # 2-hour half-life (unit: days)
# Threshold passed to greedy_assign_cluster (presumably a similarity cutoff — confirm in clustering_common).
THRESHOLD = 0.78
# Passed to greedy_assign_cluster as centroid_alpha.
CENTROID_ALPHA = 0.7
# A topic needs at least this many articles ...
MIN_ARTICLES_PER_TOPIC = 2
# ... from at least this many distinct countries.
MIN_COUNTRIES_PER_TOPIC = 2
# Maximum number of topics kept per briefing.
MAX_TOPICS = 7
|
||||
|
||||
|
||||
def _count_distinct_countries(cluster: dict) -> int:
|
||||
return len({m.get("country") for m in cluster["members"] if m.get("country")})
|
||||
|
||||
|
||||
def cluster_global(docs: list[dict]) -> list[dict]:
    """Build topic clusters from the documents of all countries combined.

    Args:
        docs: Output of loader.load_night_window (each dict carries a
            ``country`` field).

    Returns:
        [{centroid, members, weight_sum, raw_weight_sum, importance_score,
        country_count}, ...]
        - only clusters meeting both MIN_ARTICLES and MIN_COUNTRIES
        - at most MAX_TOPICS clusters, in the order returned by
          greedy_assign_cluster (presumably importance descending — confirm
          in clustering_common)
    """
    if not docs:
        logger.info("[briefing] docs=0 → skip")
        return []

    clusters, raw_count = greedy_assign_cluster(
        docs,
        threshold=THRESHOLD,
        centroid_alpha=CENTROID_ALPHA,
        min_articles=MIN_ARTICLES_PER_TOPIC,
        max_topics=MAX_TOPICS * 4,  # buffer ahead of the MIN_COUNTRIES filter
        lambda_val=LAMBDA,
    )

    # MIN_COUNTRIES_PER_TOPIC filter — drop single-country clusters.
    pre_country_filter = len(clusters)
    filtered = []
    for c in clusters:
        cc = _count_distinct_countries(c)
        if cc >= MIN_COUNTRIES_PER_TOPIC:
            c["country_count"] = cc
            filtered.append(c)
    clusters = filtered[:MAX_TOPICS]

    # Count each drop reason against the stage that caused it. Previously the
    # single-country count was taken after the MAX_TOPICS slice, so clusters
    # removed by the cap were reported as single-country drops.
    dropped_country = pre_country_filter - len(filtered)
    dropped_cap = len(filtered) - len(clusters)
    dropped_min_articles = raw_count - pre_country_filter

    # Re-normalize importance within the briefing (0~1) after both filters.
    normalize_importance_scores(clusters)

    logger.info(
        f"[briefing] docs={len(docs)} threshold={THRESHOLD} "
        f"raw_clusters={raw_count} dropped_min_articles={dropped_min_articles} "
        f"dropped_single_country={dropped_country} dropped_max_topics={dropped_cap} "
        f"kept={len(clusters)}"
    )
    return clusters
|
||||
@@ -0,0 +1,307 @@
|
||||
"""Cluster → 26B MLX 비교 분석 호출 + JSON envelope + historical context + fallback row.
|
||||
|
||||
Plan §"LLM Parse 실패 시 Fallback Topic Row (고정 형태)":
|
||||
LLM JSON parse 2회 재시도 후 실패 → 고정 형태 fallback 저장 (drop 금지).
|
||||
|
||||
Plan §"Historical Context":
|
||||
BRIEFING_HISTORICAL_ENABLED=true 시 cluster centroid 와 historical candidate
|
||||
cosine top-K 5 (similarity ≥0.70) 추출 → 프롬프트 {historical_block} 주입.
|
||||
LLM 응답 envelope 의 historical_context 옵션 필드.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ai.client import parse_json_response
|
||||
from core.utils import setup_logger
|
||||
from services.clustering_common import normalize_vector
|
||||
|
||||
logger = setup_logger("briefing_comparator")

LLM_CALL_TIMEOUT = 25  # seconds; same as Phase 4
# Historical retrieval: keep the top-K candidates whose similarity is at least the minimum.
HISTORICAL_TOP_K = 5
HISTORICAL_SIMILARITY_MIN = 0.70
HISTORICAL_WINDOW_DAYS = 30

# JSON envelope caps (enforced in both the prompt and post-processing)
MAX_PERSPECTIVES = 10
MAX_DIVERGENCES = 3
MAX_CONVERGENCES = 2
MAX_KEY_QUOTES = 5
MAX_PERSPECTIVE_SUMMARY_LEN = 240  # Korean 1–2 sentences of ≤120 chars, ×2
MAX_HISTORICAL_CONTEXT_LEN = 240
MAX_ARTICLE_IDS_PER_COUNTRY = 5  # post-processing cap for country_perspectives[].article_ids
# Fixed texts used for the fallback topic row.
FALLBACK_HEADLINE = "LLM 분석 실패로 원문 기사 묶음만 표시합니다."
FALLBACK_TOPIC_LABEL = "주요 뉴스 묶음"

# Semaphore of size 1: at most one LLM call is in flight at a time.
_llm_sem = asyncio.Semaphore(1)
# Prompt template file: three directories above this module, under prompts/.
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "briefing_comparative.txt"
# Template text, read on first use and then kept in memory.
_PROMPT_TEMPLATE: str | None = None
|
||||
|
||||
|
||||
def historical_enabled() -> bool:
    """Report whether BRIEFING_HISTORICAL_ENABLED is set to 1/true/yes (case-insensitive)."""
    flag = os.environ.get("BRIEFING_HISTORICAL_ENABLED", "false")
    return flag.lower() in ("1", "true", "yes")
|
||||
|
||||
|
||||
def _load_prompt() -> str:
    """Return the prompt template, reading the file on first use and caching it."""
    global _PROMPT_TEMPLATE
    if _PROMPT_TEMPLATE is not None:
        return _PROMPT_TEMPLATE
    _PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8")
    return _PROMPT_TEMPLATE
|
||||
|
||||
|
||||
def _build_articles_block(selected: list[dict]) -> str:
|
||||
lines = []
|
||||
for i, m in enumerate(selected, start=1):
|
||||
country = m.get("country") or "??"
|
||||
source = m.get("ai_sub_group") or ""
|
||||
text = (m.get("ai_summary_truncated") or m.get("ai_summary") or m.get("title") or "").strip()
|
||||
lines.append(f"[{i}] ({country} · {source}) {text}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _build_historical_block(historical_docs: list[dict]) -> str:
|
||||
if not historical_docs:
|
||||
return "(과거 참고 자료 없음)"
|
||||
lines = ["※ 이전 30일 흐름 참고용 — 본 분석에서 직접 인용 금지, 맥락 파악 용도."]
|
||||
for i, d in enumerate(historical_docs, start=1):
|
||||
text = (d.get("ai_summary") or d.get("title") or "").strip()
|
||||
# historical 은 ai_summary 가 길 수 있어 200자 cap
|
||||
if len(text) > 200:
|
||||
text = text[:200] + "…"
|
||||
lines.append(f"[H{i}] {text}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def build_prompt(selected: list[dict], historical_docs: list[dict]) -> str:
    """Fill the prompt template's ``{articles_block}`` and ``{historical_block}`` placeholders."""
    template = _load_prompt()
    substitutions = (
        ("{articles_block}", _build_articles_block(selected)),
        ("{historical_block}", _build_historical_block(historical_docs)),
    )
    prompt = template
    # Placeholders are substituted one after another, articles first.
    for placeholder, rendered in substitutions:
        prompt = prompt.replace(placeholder, rendered)
    return prompt
|
||||
|
||||
|
||||
def retrieve_historical(
    cluster: dict,
    candidates: list[dict],
    *,
    top_k: int = HISTORICAL_TOP_K,
    sim_min: float = HISTORICAL_SIMILARITY_MIN,
) -> list[dict]:
    """Pick the candidates most similar to the cluster centroid.

    Similarity is the dot product of ``cluster["centroid"]`` with each
    candidate's normalized embedding. Candidates scoring below ``sim_min`` are
    discarded; the rest are returned best-first, at most ``top_k`` of them.
    Returns an empty list when there are no candidates or none qualify.
    """
    if not candidates:
        return []

    centroid = cluster["centroid"]
    hits = []
    for doc in candidates:
        score = float(np.dot(centroid, normalize_vector(doc["embedding"])))
        if score < sim_min:
            continue
        hits.append((score, doc))

    hits.sort(key=lambda pair: -pair[0])
    return [doc for _, doc in hits[:top_k]]
|
||||
|
||||
|
||||
async def _try_call_llm(client: Any, prompt: str) -> str:
    """Call ``client.call_primary(prompt)`` under the module semaphore with a timeout.

    Raises:
        asyncio.TimeoutError: when the call exceeds LLM_CALL_TIMEOUT seconds.
    """
    async with _llm_sem:
        pending = client.call_primary(prompt)
        return await asyncio.wait_for(pending, timeout=LLM_CALL_TIMEOUT)
|
||||
|
||||
|
||||
def _truncate_str(s: Any, limit: int) -> str:
|
||||
if not isinstance(s, str):
|
||||
return ""
|
||||
s = s.strip()
|
||||
if len(s) > limit:
|
||||
s = s[:limit].rstrip() + "…"
|
||||
return s
|
||||
|
||||
|
||||
def _country_article_id_map(cluster: dict) -> dict[str, list[int]]:
|
||||
"""cluster.members 를 country 별 article_id list 로 그룹 (weight 내림차순).
|
||||
|
||||
Phase 4 selection 단계에서 m['weight'] 가 채워져 있음. 누락 시 0.0 으로 fallback.
|
||||
"""
|
||||
grouped: dict[str, list[tuple[float, int]]] = {}
|
||||
for m in cluster.get("members", []):
|
||||
country = (m.get("country") or "").upper()
|
||||
if not country:
|
||||
continue
|
||||
weight = float(m.get("weight", 0.0))
|
||||
grouped.setdefault(country, []).append((weight, int(m["id"])))
|
||||
out: dict[str, list[int]] = {}
|
||||
for country, pairs in grouped.items():
|
||||
pairs.sort(key=lambda x: -x[0])
|
||||
out[country] = [doc_id for _, doc_id in pairs]
|
||||
return out
|
||||
|
||||
|
||||
def _resolve_article_ids(
    raw_ids: list,
    country: str,
    cluster_country_ids: dict[str, list[int]],
) -> list[int]:
    """Post-process one ``country_perspectives[].article_ids`` list.

    1) Keep only LLM-supplied ids that belong to this country's cluster
       members (duplicates and non-integer values are dropped).
    2) When nothing survives, fall back to the country's own member ids in
       their given order.
    3) Either way the result is capped at MAX_ARTICLE_IDS_PER_COUNTRY; it is
       empty when the country has no members in the cluster.
    """
    country_ids = cluster_country_ids.get(country, [])
    allowed = set(country_ids)

    kept: list[int] = []
    for value in raw_ids if isinstance(raw_ids, list) else ():
        try:
            candidate = int(value)
        except (TypeError, ValueError):
            continue
        if candidate in allowed and candidate not in kept:
            kept.append(candidate)

    # Validated LLM ids win; otherwise use the country's member ids.
    chosen = kept if kept else country_ids
    return chosen[:MAX_ARTICLE_IDS_PER_COUNTRY]
|
||||
|
||||
|
||||
def _sanitize_envelope(parsed: dict, cluster: dict) -> dict | None:
    """Validate an LLM response envelope, enforce the caps and resolve article ids.

    Returns the cleaned envelope dict, or ``None`` (caller falls back) when
    ``parsed`` is not a dict, the topic label or headline is missing, or no
    valid country perspective remains.
    """
    if not isinstance(parsed, dict):
        return None

    label = _truncate_str(parsed.get("topic_label"), 120)
    title = _truncate_str(parsed.get("headline"), 200)
    if not (label and title):
        return None

    # country -> member ids of this cluster, used to validate/fill article_ids.
    ids_by_country = _country_article_id_map(cluster)

    def _clean_perspective(item: Any) -> dict | None:
        if not isinstance(item, dict):
            return None
        code = _truncate_str(item.get("country"), 10).upper()
        text = _truncate_str(item.get("summary"), MAX_PERSPECTIVE_SUMMARY_LEN)
        ids = _resolve_article_ids(item.get("article_ids") or [], code, ids_by_country)
        if not (code and text):
            return None
        return {"country": code, "summary": text, "article_ids": ids}

    def _clean_strings(field: str, max_items: int, max_len: int) -> list[str]:
        values = parsed.get(field)
        if not isinstance(values, list):
            return []
        trimmed = [_truncate_str(value, max_len) for value in values[:max_items]]
        return [value for value in trimmed if value]

    def _clean_quote(item: Any) -> dict | None:
        if not isinstance(item, dict):
            return None
        entry = {
            "country": _truncate_str(item.get("country"), 10).upper(),
            "source": _truncate_str(item.get("source"), 60),
            "quote": _truncate_str(item.get("quote"), 240),
        }
        return entry if entry["quote"] else None

    # country_perspectives: the cap applies to the raw list before validation.
    raw_perspectives = parsed.get("country_perspectives")
    perspectives = []
    if isinstance(raw_perspectives, list):
        for item in raw_perspectives[:MAX_PERSPECTIVES]:
            cleaned = _clean_perspective(item)
            if cleaned is not None:
                perspectives.append(cleaned)
    if not perspectives:
        return None

    divergences = _clean_strings("divergences", MAX_DIVERGENCES, 200)
    convergences = _clean_strings("convergences", MAX_CONVERGENCES, 200)

    # key_quotes: [{country, source, quote}]; entries without a quote are dropped.
    raw_quotes = parsed.get("key_quotes")
    quotes = []
    if isinstance(raw_quotes, list):
        for item in raw_quotes[:MAX_KEY_QUOTES]:
            cleaned_quote = _clean_quote(item)
            if cleaned_quote is not None:
                quotes.append(cleaned_quote)

    context = _truncate_str(parsed.get("historical_context"), MAX_HISTORICAL_CONTEXT_LEN)

    return {
        "topic_label": label,
        "headline": title,
        "country_perspectives": perspectives,
        "divergences": divergences,
        "convergences": convergences,
        "key_quotes": quotes,
        "historical_context": context or None,
        "llm_fallback_used": False,
    }
|
||||
|
||||
|
||||
def _make_fallback(cluster: dict) -> dict:
    """Build the fixed-shape fallback topic row: fixed label/headline, empty lists, fallback flag set."""
    envelope: dict = {
        "topic_label": FALLBACK_TOPIC_LABEL,
        "headline": FALLBACK_HEADLINE,
    }
    # Each list field gets its own fresh empty list.
    for list_field in ("country_perspectives", "divergences", "convergences", "key_quotes"):
        envelope[list_field] = []
    envelope["historical_context"] = None
    envelope["llm_fallback_used"] = True
    return envelope
|
||||
|
||||
|
||||
async def compare_cluster_with_fallback(
    client: Any,
    cluster: dict,
    selected: list[dict],
    historical_docs: list[dict] | None = None,
) -> dict:
    """Run the comparative analysis for one cluster, with a fallback row on failure.

    The LLM is tried up to two times. A timeout, any call error, an
    unparsable response or an envelope that fails validation each use up one
    attempt.

    Returns:
        The sanitized envelope dict (including ``llm_fallback_used``), or the
        fixed fallback row when both attempts fail.
    """
    context_docs = historical_docs or []
    prompt = build_prompt(selected, context_docs)

    for attempt_no in (1, 2):
        try:
            raw = await _try_call_llm(client, prompt)
        except asyncio.TimeoutError:
            member_count = len(cluster['members'])
            logger.warning(
                f"LLM timeout {LLM_CALL_TIMEOUT}s "
                f"(attempt={attempt_no}, cluster size={member_count})"
            )
            continue
        except Exception as e:
            logger.warning(f"LLM 호출 실패 attempt={attempt_no}: {e}")
            continue

        parsed = parse_json_response(raw)
        if parsed:
            sanitized = _sanitize_envelope(parsed, cluster)
            if sanitized:
                return sanitized

        raw_len = len(raw) if raw else 0
        parsed_keys = list(parsed.keys()) if isinstance(parsed, dict) else None
        logger.warning(
            f"envelope 검증 실패 attempt={attempt_no} "
            f"(raw_len={raw_len}, parsed_keys={parsed_keys})"
        )

    return _make_fallback(cluster)
|
||||
@@ -0,0 +1,199 @@
|
||||
"""야간 5h 수집 뉴스 윈도우 로드 + country 정규화 + (옵션) 과거 N일 후보 로드.
|
||||
|
||||
- KST 자정~05:00 사이 수집된 documents (source_channel='news' OR ai_domain='News').
|
||||
- country canonical = document_chunks.country first non-null → news_sources prefix fallback (Phase 4 동일).
|
||||
- ai_summary/embedding NULL 제외 (재요약/재임베딩 0회 원칙).
|
||||
- 반환: doc dict 의 list (topic-first cluster 입력. country 는 각 dict 의 field).
|
||||
- 과거 retrieval 용 historical doc 후보는 별도 함수 (BRIEFING_HISTORICAL_ENABLED on 시).
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
from sqlalchemy import text
|
||||
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
|
||||
logger = setup_logger("briefing_loader")
|
||||
|
||||
|
||||
# News documents created inside [window_start, window_end) that are not
# soft-deleted and have both an embedding and an AI summary.
# NOTE(review): the chunk_country subquery uses LIMIT 1 without ORDER BY, so
# when a document's chunks carry different countries the chosen one is arbitrary.
_NEWS_WINDOW_SQL = text("""
SELECT
d.id,
d.title,
d.ai_summary,
d.embedding,
d.created_at,
d.edit_url,
d.ai_sub_group,
(
SELECT c.country
FROM document_chunks c
WHERE c.doc_id = d.id AND c.country IS NOT NULL
LIMIT 1
) AS chunk_country
FROM documents d
WHERE (d.source_channel = 'news' OR d.ai_domain = 'News')
AND d.deleted_at IS NULL
AND d.created_at >= :window_start
AND d.created_at < :window_end
AND d.embedding IS NOT NULL
AND d.ai_summary IS NOT NULL
""")


# (name, country) pairs of every news source that has a country set.
_SOURCE_COUNTRY_SQL = text("""
SELECT name, country FROM news_sources WHERE country IS NOT NULL
""")


# Same document filter as the night-window query, over [hist_start, hist_end)
# and without the country lookup.
_HISTORICAL_CANDIDATES_SQL = text("""
SELECT
d.id,
d.title,
d.ai_summary,
d.embedding,
d.created_at
FROM documents d
WHERE (d.source_channel = 'news' OR d.ai_domain = 'News')
AND d.deleted_at IS NULL
AND d.created_at >= :hist_start
AND d.created_at < :hist_end
AND d.embedding IS NOT NULL
AND d.ai_summary IS NOT NULL
""")
|
||||
|
||||
|
||||
def _to_numpy_embedding(raw: Any) -> np.ndarray | None:
|
||||
if raw is None:
|
||||
return None
|
||||
if isinstance(raw, str):
|
||||
import json
|
||||
try:
|
||||
raw = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
try:
|
||||
arr = np.asarray(raw, dtype=np.float32)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
if arr.size == 0:
|
||||
return None
|
||||
return arr
|
||||
|
||||
|
||||
async def _load_source_country_map(session) -> dict[str, str]:
    """Map news source name prefixes to countries (mirrors the Phase 4 pattern).

    Each source name contributes its first space-separated token and, when it
    has at least three tokens, the name without its last token. The first
    country seen for a key wins.
    """
    rows = await session.execute(_SOURCE_COUNTRY_SQL)

    mapping: dict[str, str] = {}
    for name, country in rows:
        if not (name and country):
            continue
        tokens = name.split(" ")
        keys = [tokens[0].strip()]
        if len(tokens) >= 3:
            keys.append(" ".join(tokens[:-1]).strip())
        for key in keys:
            if key:
                # setdefault keeps an earlier mapping for the same key.
                mapping.setdefault(key, country)
    return mapping
|
||||
|
||||
|
||||
async def load_night_window(
    window_start: datetime,
    window_end: datetime,
) -> list[dict]:
    """Load the night-window news docs, each with a resolved country.

    Returns:
        [{id, title, ai_summary, embedding, created_at, edit_url,
        ai_sub_group, country}, ...]
        Docs with an unusable embedding are skipped. Docs whose country cannot
        be resolved are dropped, since cross-country comparison is the point
        of the briefing.
    """
    docs: list[dict] = []
    unmapped = 0
    params = {"window_start": window_start, "window_end": window_end}

    async with async_session() as session:
        source_country = await _load_source_country_map(session)
        result = await session.execute(_NEWS_WINDOW_SQL, params)

        for row in result.mappings():
            vector = _to_numpy_embedding(row["embedding"])
            if vector is None:
                continue

            # Prefer the chunk-level country; otherwise look the source up by ai_sub_group.
            country = row["chunk_country"]
            if not country:
                source_key = (row["ai_sub_group"] or "").strip()
                if source_key:
                    country = source_country.get(source_key)
            if not country:
                unmapped += 1
                continue

            docs.append(
                {
                    "id": int(row["id"]),
                    "title": row["title"] or "",
                    "ai_summary": row["ai_summary"] or "",
                    "embedding": vector,
                    "created_at": row["created_at"],
                    "edit_url": row["edit_url"] or "",
                    "ai_sub_group": row["ai_sub_group"] or "",
                    "country": country.upper(),
                }
            )

    if unmapped:
        logger.warning(
            f"[loader] country 매핑 실패 drop {unmapped}건 "
            f"(chunk_country + news_sources prefix 둘 다 fail)"
        )

    country_total = len({d['country'] for d in docs})
    logger.info(
        f"[loader] night window {window_start} ~ {window_end} → "
        f"{len(docs)}건 ({country_total}개 국가)"
    )
    return docs
|
||||
|
||||
|
||||
async def load_historical_candidates(
    hist_start: datetime,
    hist_end: datetime,
    exclude_ids: set[int],
) -> list[dict]:
    """Load the raw pool of past docs used for historical retrieval.

    Intended to be called only when BRIEFING_HISTORICAL_ENABLED is on. The
    result is a candidate pool to compare against cluster centroids; no country
    mapping is done here because these docs only feed the LLM analysis and are
    not displayed.

    Args:
        hist_start: Lower bound passed to ``_HISTORICAL_CANDIDATES_SQL``.
        hist_end: Upper bound passed to ``_HISTORICAL_CANDIDATES_SQL``.
        exclude_ids: Article ids of today's window, skipped to avoid
            retrieving the same article twice.

    Returns:
        ``[{id, title, ai_summary, embedding, created_at}, ...]``; rows whose
        embedding cannot be converted are skipped.
    """
    bounds = {"hist_start": hist_start, "hist_end": hist_end}
    candidates: list[dict] = []

    async with async_session() as session:
        rows = (await session.execute(_HISTORICAL_CANDIDATES_SQL, bounds)).mappings()
        for row in rows:
            doc_id = int(row["id"])
            if doc_id in exclude_ids:
                continue
            if (vector := _to_numpy_embedding(row["embedding"])) is None:
                continue
            candidates.append(
                {
                    "id": doc_id,
                    "title": row["title"] or "",
                    "ai_summary": row["ai_summary"] or "",
                    "embedding": vector,
                    "created_at": row["created_at"],
                }
            )

    logger.info(f"[loader] historical candidates: {len(candidates)} docs (window {hist_start.date()} ~ {hist_end.date()})")
    return candidates
|
||||
@@ -0,0 +1,261 @@
|
||||
"""야간 수집 뉴스 브리핑 파이프라인 (Plan §"PR-MorningBriefing-1 Backend").
|
||||
|
||||
흐름: load_night_window → cluster_global → select_for_llm (k=7) →
|
||||
(옵션) historical retrieval → compare_cluster_with_fallback → DB save.
|
||||
|
||||
regenerate 정책: briefing_date UNIQUE 충돌 시 transaction 안에서 DELETE+INSERT.
|
||||
"""
|
||||
|
||||
import time
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from sqlalchemy import delete
|
||||
|
||||
from ai.client import AIClient
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.briefing import BriefingTopic, MorningBriefing
|
||||
from services.briefing.clustering import LAMBDA, cluster_global
|
||||
from services.briefing.comparator import (
|
||||
HISTORICAL_WINDOW_DAYS,
|
||||
compare_cluster_with_fallback,
|
||||
historical_enabled,
|
||||
retrieve_historical,
|
||||
)
|
||||
from services.briefing.loader import load_historical_candidates, load_night_window
|
||||
from services.digest.selection import select_for_llm
|
||||
|
||||
logger = setup_logger("briefing_pipeline")
|
||||
|
||||
# Time zone used to interpret briefing dates (Korea Standard Time).
KST = ZoneInfo("Asia/Seoul")
NIGHT_WINDOW_HOURS = 5  # window length in hours: KST 00:00 ~ 05:00
SELECT_K = 7  # articles selected per cluster for the LLM (Plan "Clustering parameters": briefing K_PER_CLUSTER=7)
SELECT_LAMBDA_MMR = 0.6  # MMR lambda used for briefing selection (Plan: 0.6)
# Seconds; same value as Phase 4.
# NOTE(review): not referenced by the pipeline code visible in this change — the worker
# module defines and applies its own PIPELINE_HARD_CAP; confirm whether this copy is needed.
PIPELINE_HARD_CAP = 600
|
||||
|
||||
|
||||
def _compute_window(target_date: date | None = None) -> tuple[datetime, datetime, date]:
    """Translate a KST calendar date into the UTC night window.

    Args:
        target_date: Date whose KST midnight opens the window. ``None`` means
            today's date in KST.

    Returns:
        ``(window_start_utc, window_end_utc, kst_date)`` where the window runs
        from KST midnight for ``NIGHT_WINDOW_HOURS`` hours.
    """
    kst_date = datetime.now(KST).date() if target_date is None else target_date
    midnight_kst = datetime.combine(kst_date, datetime.min.time(), tzinfo=KST)
    window_close_kst = midnight_kst + timedelta(hours=NIGHT_WINDOW_HOURS)
    utc = timezone.utc
    return midnight_kst.astimezone(utc), window_close_kst.astimezone(utc), kst_date
|
||||
|
||||
|
||||
def _is_usable_topic(envelope: dict, topic_label: str) -> bool:
    """Tell whether an envelope is a real LLM result rather than a fallback row.

    An envelope is unusable when it is flagged ``llm_fallback_used``, when it
    has no ``country_perspectives``, or when its label is the fixed fallback
    label.
    """
    is_fallback = (
        envelope.get("llm_fallback_used")
        or not envelope.get("country_perspectives")
        or topic_label == "주요 뉴스 묶음"
    )
    return not is_fallback
|
||||
|
||||
|
||||
def _compute_status(llm_calls: int, fallback_count: int, usable_count: int, has_topics: bool) -> str:
    """Derive the 4-state briefing status (Plan "Status 4-state table").

    Returns:
        ``"empty"``   when there are no topics or no LLM call was made;
        ``"failed"``  when nothing is usable or at least half the calls fell back;
        ``"partial"`` when some call fell back or some topic is not usable;
        ``"success"`` otherwise.
    """
    if llm_calls == 0 or not has_topics:
        return "empty"

    # llm_calls is non-zero from here on, so the ratio is always defined.
    if usable_count == 0 or fallback_count / llm_calls >= 0.5:
        return "failed"

    fully_clean = fallback_count <= 0 and usable_count >= llm_calls
    return "success" if fully_clean else "partial"
|
||||
|
||||
|
||||
def _build_topic_row(
    rank: int,
    cluster: dict,
    envelope: dict,
    historical_docs: list[dict] | None,
    primary_model: str,
) -> BriefingTopic:
    """Assemble one ``BriefingTopic`` ORM row from a cluster and its LLM envelope.

    The two historical bookkeeping columns (``historical_article_ids`` and
    ``historical_window_days``) are filled only while the historical feature
    flag is on; otherwise they stay ``None``.

    Args:
        rank: 1-based topic rank within the briefing.
        cluster: Cluster dict; ``members`` is required, the score fields are optional.
        envelope: Comparator output (LLM result or fallback row).
        historical_docs: Historical docs retrieved for this cluster, if any.
        primary_model: Model name recorded in ``llm_model``.
    """
    if historical_enabled():
        historical_ids = [doc["id"] for doc in historical_docs or []]
        historical_window = HISTORICAL_WINDOW_DAYS
    else:
        historical_ids = historical_window = None

    fields = {
        "topic_rank": rank,
        "topic_label": envelope["topic_label"],
        "headline": envelope["headline"],
        "country_perspectives": envelope["country_perspectives"],
        "divergences": envelope["divergences"],
        "convergences": envelope["convergences"],
        "key_quotes": envelope["key_quotes"],
        "historical_article_ids": historical_ids,
        "historical_context": envelope.get("historical_context"),
        "historical_window_days": historical_window,
        "cluster_members": [member["id"] for member in cluster["members"]],
        "article_count": len(cluster["members"]),
        "country_count": cluster.get("country_count", 0),
        "importance_score": cluster.get("importance_score", 0.0),
        "raw_weight_sum": cluster.get("raw_weight_sum", 0.0),
        "llm_model": primary_model,
        "llm_fallback_used": envelope.get("llm_fallback_used", False),
    }
    return BriefingTopic(**fields)
|
||||
|
||||
|
||||
async def _save_briefing(
    briefing_date: date,
    window_start: datetime,
    window_end: datetime,
    total_articles: int,
    total_countries: int,
    topic_rows: list[BriefingTopic],
    llm_calls: int,
    llm_failures: int,
    generation_ms: int,
    status: str,
) -> int:
    """Persist one briefing, replacing any existing row for the same date.

    ``briefing_date`` is UNIQUE, so a regenerate is handled as DELETE + INSERT
    inside a single session transaction (one commit at the end).

    Args:
        briefing_date: KST date the briefing belongs to.
        window_start: Start of the article window.
        window_end: End of the article window.
        total_articles: Number of docs loaded for the window.
        total_countries: Number of distinct countries in the window.
        topic_rows: Topic rows to attach to the new briefing.
        llm_calls: Number of LLM calls made.
        llm_failures: Number of calls that fell back.
        generation_ms: Pipeline duration in milliseconds.
        status: Status string computed by the pipeline.

    Returns:
        Primary key of the newly inserted ``MorningBriefing`` row.
    """
    async with async_session() as session:
        # Bulk DELETE of the parent row; NOTE(review): removal of the old topic rows
        # presumably relies on the ON DELETE CASCADE foreign key — confirm.
        await session.execute(
            delete(MorningBriefing).where(MorningBriefing.briefing_date == briefing_date)
        )
        new = MorningBriefing(
            briefing_date=briefing_date,
            window_start=window_start,
            window_end=window_end,
            decay_lambda=LAMBDA,
            total_articles=total_articles,
            total_countries=total_countries,
            total_topics=len(topic_rows),
            generation_ms=generation_ms,
            llm_calls=llm_calls,
            llm_failures=llm_failures,
            status=status,
        )
        new.topics = topic_rows
        session.add(new)

        # Flush so the primary key is assigned, and read it BEFORE commit: if the
        # session expires attributes on commit, reading new.id afterwards would
        # need an implicit refresh, which an AsyncSession cannot do lazily.
        await session.flush()
        briefing_id = new.id

        await session.commit()
    return briefing_id
|
||||
|
||||
|
||||
async def run_briefing_pipeline(target_date: date | None = None) -> dict[str, Any]:
    """Run the night-news briefing once (called by cron or the manual regenerate API).

    Steps:
        1. Load the night-window docs.
        2. Cluster them topic-first across all countries.
        3. Optionally load the historical candidate pool once.
        4. For each cluster, select representative docs, retrieve historical
           docs (when enabled) and ask the comparator for an envelope.
        5. Compute the status and persist the briefing (DELETE + INSERT).

    Args:
        target_date: KST briefing date; ``None`` means today in KST.

    Returns:
        ``{briefing_id, status, total_topics, total_articles, llm_calls,
        llm_failures, generation_ms, regenerated}``. ``regenerated`` is always
        ``True``; ``generation_ms`` equals the value stored with the briefing.
    """
    # Monotonic clock: a wall-clock adjustment during the run must not distort
    # (or make negative) the duration stored in generation_ms.
    started = time.monotonic()

    def _elapsed_ms() -> int:
        return int((time.monotonic() - started) * 1000)

    # Read the feature flag once so candidate loading and per-cluster retrieval agree.
    use_historical = historical_enabled()

    window_start, window_end, briefing_date = _compute_window(target_date)
    logger.info(
        f"[briefing] start date={briefing_date} window {window_start} ~ {window_end} "
        f"decay_lambda={LAMBDA:.4f} historical={'on' if use_historical else 'off'}"
    )

    # 1. Load night window
    docs = await load_night_window(window_start, window_end)
    total_articles = len(docs)
    total_countries_in_window = len({d["country"] for d in docs})

    # 2. Cluster (topic-first)
    clusters = cluster_global(docs)

    if not clusters:
        # Nothing to analyse: still persist an "empty" briefing for the date.
        generation_ms = _elapsed_ms()
        briefing_id = await _save_briefing(
            briefing_date=briefing_date,
            window_start=window_start,
            window_end=window_end,
            total_articles=total_articles,
            total_countries=total_countries_in_window,
            topic_rows=[],
            llm_calls=0,
            llm_failures=0,
            generation_ms=generation_ms,
            status="empty",
        )
        logger.info(f"[briefing] empty (no usable clusters) → briefing_id={briefing_id}")
        return {
            "briefing_id": briefing_id,
            "status": "empty",
            "total_topics": 0,
            "total_articles": total_articles,
            "llm_calls": 0,
            "llm_failures": 0,
            "generation_ms": generation_ms,
            "regenerated": True,
        }

    # 3. (optional) Load the historical candidate pool once
    historical_candidates: list[dict] = []
    if use_historical:
        hist_end = window_start  # up to just before today's window
        hist_start = hist_end - timedelta(days=HISTORICAL_WINDOW_DAYS)
        exclude = {d["id"] for d in docs}
        historical_candidates = await load_historical_candidates(hist_start, hist_end, exclude)

    # 4. One LLM call per cluster
    client = AIClient()
    topic_rows: list[BriefingTopic] = []
    llm_calls = 0
    llm_failures = 0
    usable_count = 0

    try:
        # Read inside the try so the client is closed even if this lookup fails.
        primary_model = client.ai.primary.model
        for rank, cluster in enumerate(clusters, start=1):
            selected = select_for_llm(cluster, k=SELECT_K, lambda_mmr=SELECT_LAMBDA_MMR)
            historical_docs = (
                retrieve_historical(cluster, historical_candidates)
                if use_historical else []
            )
            llm_calls += 1
            envelope = await compare_cluster_with_fallback(
                client, cluster, selected, historical_docs=historical_docs
            )
            if envelope.get("llm_fallback_used"):
                llm_failures += 1
            if _is_usable_topic(envelope, envelope["topic_label"]):
                usable_count += 1
            # Fallback rows are stored too; the status reflects how many were usable.
            topic_rows.append(
                _build_topic_row(rank, cluster, envelope, historical_docs, primary_model)
            )
    finally:
        await client.close()

    generation_ms = _elapsed_ms()
    status = _compute_status(llm_calls, llm_failures, usable_count, has_topics=bool(topic_rows))

    # 5. Persist
    briefing_id = await _save_briefing(
        briefing_date=briefing_date,
        window_start=window_start,
        window_end=window_end,
        total_articles=total_articles,
        total_countries=total_countries_in_window,
        topic_rows=topic_rows,
        llm_calls=llm_calls,
        llm_failures=llm_failures,
        generation_ms=generation_ms,
        status=status,
    )

    fallback_pct = (llm_failures / llm_calls * 100.0) if llm_calls else 0.0
    logger.info(
        f"[briefing] done id={briefing_id} status={status} topics={len(topic_rows)} "
        f"usable={usable_count}/{llm_calls} fallback={llm_failures}/{llm_calls} ({fallback_pct:.1f}%) "
        f"elapsed={generation_ms / 1000:.1f}s"
    )

    return {
        "briefing_id": briefing_id,
        "status": status,
        "total_topics": len(topic_rows),
        "total_articles": total_articles,
        "llm_calls": llm_calls,
        "llm_failures": llm_failures,
        "generation_ms": generation_ms,
        "regenerated": True,
    }
|
||||
@@ -0,0 +1,124 @@
|
||||
"""Cluster 알고리즘 공통 util — digest(country×topic) / briefing(topic×country) 양쪽이 import.
|
||||
|
||||
추출 원칙:
|
||||
- digest.clustering.cluster_country / briefing.clustering.cluster_global 의 country 축은 caller 책임.
|
||||
- 본 모듈은 docs list (이미 분류된 슬라이스 또는 전체) 에 대한 순수 greedy assign + normalize.
|
||||
- LAMBDA / threshold / EMA alpha / MIN_ARTICLES 는 caller 가 주입 (Phase 4 = 3일 / Briefing = 2시간 등).
|
||||
"""
|
||||
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
SCORE_FLOOR = 0.01
|
||||
|
||||
|
||||
def normalize_vector(v: np.ndarray) -> np.ndarray:
    """Scale ``v`` to unit L2 norm; a zero-norm vector is returned unchanged."""
    magnitude = float(np.linalg.norm(v))
    return v / magnitude if magnitude != 0.0 else v
|
||||
|
||||
|
||||
def time_decay_weight(now: datetime, created_at: datetime, lambda_val: float) -> float:
    """Return the exponential time-decay weight ``exp(-lambda_val * days_ago)``.

    Args:
        now: Reference time. A naive value is assumed to be UTC.
        created_at: Creation time of the document. A naive value is assumed to be UTC.
        lambda_val: Decay rate per day.

    Returns:
        A weight in ``(0, 1]`` for non-negative ``lambda_val``; documents dated
        after ``now`` are treated as zero days old (weight 1.0).
    """
    # Coerce BOTH sides: subtracting a naive datetime from an aware one raises TypeError.
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)

    days = (now - created_at).total_seconds() / 86400.0
    if days < 0:
        # Future-dated document: clamp so the weight never exceeds 1.0.
        days = 0.0
    return math.exp(-lambda_val * days)
|
||||
|
||||
|
||||
def adaptive_threshold_by_density(
    n_docs: int,
    *,
    low_n: int = 50,
    high_n: int = 200,
    t_low: float = 0.75,
    t_mid: float = 0.78,
    t_high: float = 0.80,
) -> float:
    """Pick a clustering threshold from document density.

    Guards against both fragmentation (few docs) and blob clusters (many docs).

    Returns:
        ``t_high`` when ``n_docs > high_n``, ``t_low`` when ``n_docs < low_n``,
        otherwise ``t_mid``. The high-density check is evaluated first.
    """
    return t_high if n_docs > high_n else (t_low if n_docs < low_n else t_mid)
|
||||
|
||||
|
||||
def greedy_assign_cluster(
    docs: list[dict],
    *,
    threshold: float,
    centroid_alpha: float = 0.7,
    min_articles: int = 3,
    max_topics: int = 10,
    now: datetime | None = None,
    lambda_val: float,
) -> tuple[list[dict], int]:
    """Greedy cosine clustering with time-decay weights and EMA centroids.

    Args:
        docs: ``[{embedding: np.ndarray, created_at: datetime, ...}]``. A
            ``weight`` key is added to every doc in place.
        threshold: Minimum cosine similarity for joining an existing cluster.
        centroid_alpha: EMA coefficient (0.7 keeps 70% of the old centroid).
        min_articles: Clusters with fewer members are dropped.
        max_topics: Number of top clusters (by weight sum) to keep.
        now: Reference time (defaults to the current UTC time).
        lambda_val: Time-decay rate, chosen by the caller for its window width.

    Returns:
        ``(clusters, raw_cluster_count_before_drop)`` where each cluster is
        ``{centroid, members, weight_sum, raw_weight_sum, importance_score}``.
    """
    if not docs:
        return [], 0

    reference_time = now or datetime.now(timezone.utc)
    for doc in docs:
        doc["weight"] = time_decay_weight(reference_time, doc["created_at"], lambda_val)

    groups: list[dict] = []
    # Heaviest docs are visited first so they seed the clusters.
    for doc in sorted(docs, key=lambda item: item["weight"], reverse=True):
        unit = normalize_vector(doc["embedding"])

        # Best existing cluster: highest similarity that is positive and meets the threshold.
        target, top_sim = None, 0.0
        for candidate in groups:
            similarity = float(np.dot(candidate["centroid"], unit))
            if similarity >= threshold and similarity > top_sim:
                target, top_sim = candidate, similarity

        if target is None:
            groups.append({"centroid": unit, "members": [doc], "weight_sum": doc["weight"]})
            continue

        # EMA update of the centroid, re-normalized to unit length.
        blended = centroid_alpha * target["centroid"] + (1.0 - centroid_alpha) * unit
        target["centroid"] = normalize_vector(blended)
        target["members"].append(doc)
        target["weight_sum"] += doc["weight"]

    raw_count = len(groups)
    survivors = [g for g in groups if len(g["members"]) >= min_articles]
    kept = sorted(survivors, key=lambda g: g["weight_sum"], reverse=True)[:max_topics]

    normalize_importance_scores(kept)
    return kept, raw_count
|
||||
|
||||
|
||||
def normalize_importance_scores(clusters: list[dict], *, floor: float = SCORE_FLOOR) -> None:
    """Normalize each cluster's ``weight_sum`` to 0~1 with a lower floor, in place.

    Every cluster gains ``raw_weight_sum`` (the untouched ``weight_sum``) and
    ``importance_score`` (``weight_sum`` divided by the largest ``weight_sum``,
    never below ``floor``). An empty list is a no-op.
    """
    if not clusters:
        return

    peak = max(cluster["weight_sum"] for cluster in clusters)
    for cluster in clusters:
        raw = cluster["weight_sum"]
        share = raw / peak if peak > 0 else 0.0
        cluster["raw_weight_sum"] = raw
        cluster["importance_score"] = max(share, floor)
|
||||
@@ -1,20 +1,16 @@
|
||||
"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering.
|
||||
"""Phase 4 Global Digest — country 내 topic cluster (time-decay + EMA + adaptive threshold).
|
||||
|
||||
플랜의 핵심 결정:
|
||||
- λ = ln(2)/3 (3일 반감기)
|
||||
- threshold: 0.75 / 0.78 / 0.80 (밀도 기반 adaptive)
|
||||
- centroid: EMA α=0.7 (단순 평균의 seed bias / drift 방어)
|
||||
- min_articles_per_topic = 3, max_topics_per_country = 10
|
||||
- importance_score: country 내 0~1 normalize + max(score, 0.01) floor
|
||||
- raw_weight_sum 별도 보존 (cross-day 트렌드 분석용)
|
||||
알고리즘 코어는 `app/services/clustering_common.py` 로 추출되어 briefing 모듈과 공유.
|
||||
본 파일은 Phase 4 고유 파라미터 (LAMBDA = ln(2)/3 일, MIN 3, MAX 10) 와 country 축 호출만 담당.
|
||||
"""
|
||||
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import numpy as np
|
||||
|
||||
from core.utils import setup_logger
|
||||
from services.clustering_common import (
|
||||
adaptive_threshold_by_density,
|
||||
greedy_assign_cluster,
|
||||
)
|
||||
|
||||
logger = setup_logger("digest_clustering")
|
||||
|
||||
@@ -22,94 +18,32 @@ LAMBDA = math.log(2) / 3 # 3일 반감기 — 사용자 확정값
|
||||
CENTROID_ALPHA = 0.7 # EMA: 기존 중심 70% 유지, 새 멤버 30% 반영
|
||||
MIN_ARTICLES_PER_TOPIC = 3
|
||||
MAX_TOPICS_PER_COUNTRY = 10
|
||||
SCORE_FLOOR = 0.01 # UI 0 표시 문제 사전 차단
|
||||
|
||||
|
||||
def adaptive_threshold(n_docs: int) -> float:
|
||||
"""문서 밀도 기반 동적 threshold — fragmentation/blob 동시 방어."""
|
||||
if n_docs > 200:
|
||||
return 0.80
|
||||
if n_docs < 50:
|
||||
return 0.75
|
||||
return 0.78
|
||||
|
||||
|
||||
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||
norm = float(np.linalg.norm(v))
|
||||
if norm == 0.0:
|
||||
return v
|
||||
return v / norm
|
||||
|
||||
|
||||
def _decay_weight(now: datetime, created_at: datetime) -> float:
|
||||
"""exp(-λ * days_ago). created_at 이 naive 면 UTC 가정."""
|
||||
if created_at.tzinfo is None:
|
||||
created_at = created_at.replace(tzinfo=timezone.utc)
|
||||
days = (now - created_at).total_seconds() / 86400.0
|
||||
if days < 0:
|
||||
days = 0.0
|
||||
return math.exp(-LAMBDA * days)
|
||||
"""Phase 4 임계 (0.75 / 0.78 / 0.80). 외부 import 호환용 alias."""
|
||||
return adaptive_threshold_by_density(n_docs)
|
||||
|
||||
|
||||
def cluster_country(country: str, docs: list[dict]) -> list[dict]:
|
||||
"""단일 country 의 docs 를 cluster 로 묶어 정렬 + normalize 후 반환.
|
||||
|
||||
Args:
|
||||
country: 국가 코드 (KR, US, ...)
|
||||
docs: loader.load_news_window 의 출력 (단일 country 슬라이스)
|
||||
|
||||
Returns:
|
||||
[{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
|
||||
- members 는 weight 가 채워진 doc dict 리스트
|
||||
- 정렬: importance_score 내림차순, 최대 MAX_TOPICS_PER_COUNTRY 개
|
||||
공통 util `greedy_assign_cluster` 위에 country 라벨 로깅만 추가.
|
||||
"""
|
||||
if not docs:
|
||||
logger.info(f"[{country}] docs=0 → skip")
|
||||
return []
|
||||
|
||||
threshold = adaptive_threshold(len(docs))
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# time-decay weight 계산 + 가중치 높은 순으로 seed 우선
|
||||
for d in docs:
|
||||
d["weight"] = _decay_weight(now, d["created_at"])
|
||||
docs.sort(key=lambda d: -d["weight"])
|
||||
|
||||
clusters: list[dict] = []
|
||||
for d in docs:
|
||||
v = _normalize(d["embedding"])
|
||||
best_idx, best_sim = -1, 0.0
|
||||
for i, c in enumerate(clusters):
|
||||
sim = float(np.dot(c["centroid"], v))
|
||||
if sim > best_sim and sim >= threshold:
|
||||
best_sim, best_idx = sim, i
|
||||
if best_idx >= 0:
|
||||
c = clusters[best_idx]
|
||||
# EMA centroid update — drift 방지
|
||||
c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v
|
||||
c["centroid"] = _normalize(c["centroid"])
|
||||
c["members"].append(d)
|
||||
c["weight_sum"] += d["weight"]
|
||||
else:
|
||||
clusters.append({
|
||||
"centroid": v,
|
||||
"members": [d],
|
||||
"weight_sum": d["weight"],
|
||||
})
|
||||
|
||||
raw_count = len(clusters)
|
||||
clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC]
|
||||
clusters, raw_count = greedy_assign_cluster(
|
||||
docs,
|
||||
threshold=threshold,
|
||||
centroid_alpha=CENTROID_ALPHA,
|
||||
min_articles=MIN_ARTICLES_PER_TOPIC,
|
||||
max_topics=MAX_TOPICS_PER_COUNTRY,
|
||||
lambda_val=LAMBDA,
|
||||
)
|
||||
dropped = raw_count - len(clusters)
|
||||
clusters.sort(key=lambda c: -c["weight_sum"])
|
||||
clusters = clusters[:MAX_TOPICS_PER_COUNTRY]
|
||||
|
||||
# country 내 normalize (0~1) + floor
|
||||
if clusters:
|
||||
max_w = max(c["weight_sum"] for c in clusters)
|
||||
for c in clusters:
|
||||
normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
|
||||
c["raw_weight_sum"] = c["weight_sum"]
|
||||
c["importance_score"] = max(normalized, SCORE_FLOOR)
|
||||
|
||||
logger.info(
|
||||
f"[{country}] docs={len(docs)} threshold={threshold} "
|
||||
|
||||
@@ -6,24 +6,27 @@ ai_summary 길이는 LLM 토큰 보호를 위해 SUMMARY_TRUNCATE 로 제한.
|
||||
|
||||
import numpy as np
|
||||
|
||||
from services.clustering_common import normalize_vector as _normalize
|
||||
|
||||
K_PER_CLUSTER = 5
|
||||
LAMBDA_MMR = 0.7 # relevance 70% / diversity 30%
|
||||
SUMMARY_TRUNCATE = 300 # long tail ai_summary 방어
|
||||
|
||||
|
||||
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||
norm = float(np.linalg.norm(v))
|
||||
if norm == 0.0:
|
||||
return v
|
||||
return v / norm
|
||||
|
||||
|
||||
def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
|
||||
def select_for_llm(
|
||||
cluster: dict,
|
||||
k: int = K_PER_CLUSTER,
|
||||
*,
|
||||
lambda_mmr: float = LAMBDA_MMR,
|
||||
summary_truncate: int = SUMMARY_TRUNCATE,
|
||||
) -> list[dict]:
|
||||
"""cluster 내 LLM 호출용 대표 article 들 선정.
|
||||
|
||||
Args:
|
||||
cluster: clustering.cluster_country 결과 단일 cluster
|
||||
k: 선정 개수 (기본 5)
|
||||
cluster: clustering.cluster_country / briefing.cluster_global 결과 단일 cluster
|
||||
k: 선정 개수 (Phase 4=5, briefing=7)
|
||||
lambda_mmr: relevance vs diversity (Phase 4=0.7, briefing=0.6)
|
||||
summary_truncate: ai_summary 자르기 길이 (LLM 토큰 보호)
|
||||
|
||||
Returns:
|
||||
선정된 doc dict 리스트. 각 항목에 ai_summary_truncated 필드가 추가됨.
|
||||
@@ -33,7 +36,6 @@ def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
|
||||
selected = list(members)
|
||||
else:
|
||||
centroid = cluster["centroid"]
|
||||
# relevance = centroid 유사도 × decay weight
|
||||
for m in members:
|
||||
v = _normalize(m["embedding"])
|
||||
m["_rel"] = float(np.dot(centroid, v)) * m["weight"]
|
||||
@@ -49,14 +51,13 @@ def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
|
||||
float(np.dot(v, _normalize(s["embedding"])))
|
||||
for s in selected
|
||||
)
|
||||
return LAMBDA_MMR * c["_rel"] - (1.0 - LAMBDA_MMR) * max_sim
|
||||
return lambda_mmr * c["_rel"] - (1.0 - lambda_mmr) * max_sim
|
||||
|
||||
pick = max(candidates, key=mmr_score)
|
||||
selected.append(pick)
|
||||
candidates.remove(pick)
|
||||
|
||||
# LLM 입력 토큰 보호
|
||||
for m in selected:
|
||||
m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:SUMMARY_TRUNCATE]
|
||||
m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:summary_truncate]
|
||||
|
||||
return selected
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
"""Morning Briefing 워커 — 야간 수집 뉴스 (KST 00:00~05:00) topic×country 비교 분석.
|
||||
|
||||
- APScheduler cron (매일 05:10 KST, PR-3 에서 등록) + 수동 호출 공용 진입점
|
||||
- PIPELINE_HARD_CAP = 600초 hard cap 으로 cron stuck 절대 방지
|
||||
- 단독 실행: `python -m workers.briefing_worker`
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from datetime import date
|
||||
|
||||
from core.utils import setup_logger
|
||||
from services.briefing.pipeline import run_briefing_pipeline
|
||||
|
||||
logger = setup_logger("briefing_worker")
|
||||
|
||||
PIPELINE_HARD_CAP = 600
|
||||
|
||||
|
||||
async def run(target_date: date | None = None) -> dict | None:
    """Shared entry point for the APScheduler job and manual invocation.

    Runs the briefing pipeline under a hard time cap of ``PIPELINE_HARD_CAP``
    seconds.

    Args:
        target_date: KST briefing date (``None`` = today). The regenerate API
            may pass an explicit date.

    Returns:
        The pipeline result dict, or ``None`` when the run timed out or raised.
    """
    try:
        outcome = await asyncio.wait_for(
            run_briefing_pipeline(target_date),
            timeout=PIPELINE_HARD_CAP,
        )
        logger.info(f"[briefing] 워커 완료: {outcome}")
        return outcome
    except asyncio.TimeoutError:
        logger.error(
            f"[briefing] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. "
            "기존 briefing 은 commit 시점에만 갱신되므로 그대로 유지됨."
        )
        return None
    except Exception as exc:
        # Top-level worker boundary: log with traceback and report failure as None.
        logger.exception(f"[briefing] 워커 실패: {exc}")
        return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run())
|
||||
@@ -0,0 +1,19 @@
|
||||
-- Morning Briefing parent table: one row per day (briefing_date is UNIQUE).
-- asyncpg prepared statements do not allow multi-statement scripts, so the index
-- and the child table are created in separate migrations.
CREATE TABLE morning_briefings (
    id BIGSERIAL PRIMARY KEY,
    briefing_date DATE NOT NULL,
    window_start TIMESTAMPTZ NOT NULL,
    window_end TIMESTAMPTZ NOT NULL,
    decay_lambda DOUBLE PRECISION NOT NULL,
    total_articles INTEGER NOT NULL DEFAULT 0,
    total_countries INTEGER NOT NULL DEFAULT 0,
    total_topics INTEGER NOT NULL DEFAULT 0,
    generation_ms INTEGER,
    llm_calls INTEGER NOT NULL DEFAULT 0,
    llm_failures INTEGER NOT NULL DEFAULT 0,
    -- The pipeline writes one of: empty / failed / partial / success.
    status VARCHAR(20) NOT NULL DEFAULT 'success',
    -- NOTE(review): not populated by the pipeline's save step in this change — confirm intended writer.
    headline_oneliner TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (briefing_date)
)
|
||||
@@ -0,0 +1 @@
|
||||
-- Index for latest-first lookups by briefing_date.
-- NOTE(review): morning_briefings already declares UNIQUE (briefing_date), which presumably
-- provides an index on the same column — confirm whether this index is still needed.
CREATE INDEX idx_morning_briefings_date ON morning_briefings (briefing_date DESC)
|
||||
@@ -0,0 +1,26 @@
|
||||
-- Morning Briefing child table: one row per topic, ordered by topic_rank
-- (UNIQUE on briefing_id + topic_rank).
-- country_perspectives / divergences / convergences / key_quotes are JSONB;
-- their size caps are enforced by the application.
-- The three historical_* columns are filled only when BRIEFING_HISTORICAL_ENABLED
-- is on (hence nullable).
CREATE TABLE briefing_topics (
    id BIGSERIAL PRIMARY KEY,
    -- Deleting the parent briefing removes its topics (ON DELETE CASCADE).
    briefing_id BIGINT NOT NULL REFERENCES morning_briefings(id) ON DELETE CASCADE,
    topic_rank INTEGER NOT NULL,
    topic_label VARCHAR(120) NOT NULL,
    headline TEXT NOT NULL,
    country_perspectives JSONB NOT NULL DEFAULT '[]',
    divergences JSONB NOT NULL DEFAULT '[]',
    convergences JSONB NOT NULL DEFAULT '[]',
    key_quotes JSONB NOT NULL DEFAULT '[]',
    historical_article_ids JSONB,
    historical_context TEXT,
    historical_window_days INTEGER,
    cluster_members JSONB NOT NULL DEFAULT '[]',
    article_count INTEGER NOT NULL,
    country_count INTEGER NOT NULL,
    importance_score DOUBLE PRECISION NOT NULL,
    raw_weight_sum DOUBLE PRECISION NOT NULL,
    llm_model VARCHAR(100),
    llm_fallback_used BOOLEAN NOT NULL DEFAULT FALSE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (briefing_id, topic_rank)
)
|
||||
@@ -0,0 +1 @@
|
||||
-- Index for fetching a briefing's topics in rank order.
-- NOTE(review): briefing_topics already declares UNIQUE (briefing_id, topic_rank) on exactly
-- these columns, which presumably provides the same index — confirm whether this one is redundant.
CREATE INDEX idx_briefing_topics_briefing_rank ON briefing_topics (briefing_id, topic_rank)
|
||||
@@ -0,0 +1,269 @@
|
||||
"""Briefing historical 분기 회귀 — Plan §"Verification 9".
|
||||
|
||||
3 경로 검증:
|
||||
1. flag off → retrieve_historical 호출 안 함, prompt {historical_block} = "(과거 참고 자료 없음)"
|
||||
2. flag on + fixture top-K → similarity ≥0.70 docs 만 반환
|
||||
3. flag on + zero match → 빈 list (no fallback hallucination)
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
# PYTHONPATH = /app (디렉토리 안에서 실행 가정 또는 sys.path 추가)
|
||||
APP_DIR = Path(__file__).resolve().parent.parent / "app"
|
||||
if str(APP_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(APP_DIR))
|
||||
|
||||
from services.briefing.comparator import (
|
||||
HISTORICAL_SIMILARITY_MIN,
|
||||
HISTORICAL_TOP_K,
|
||||
_build_historical_block,
|
||||
_make_fallback,
|
||||
_sanitize_envelope,
|
||||
build_prompt,
|
||||
historical_enabled,
|
||||
retrieve_historical,
|
||||
)
|
||||
from services.clustering_common import normalize_vector
|
||||
|
||||
|
||||
def _make_doc(doc_id: int, embedding: np.ndarray, hours_ago: int = 1) -> dict:
    """Build a minimal doc fixture created ``hours_ago`` hours before now (UTC)."""
    created = datetime.now(timezone.utc) - timedelta(hours=hours_ago)
    return dict(
        id=doc_id,
        title=f"doc {doc_id}",
        ai_summary=f"summary {doc_id}",
        embedding=embedding,
        created_at=created,
    )
|
||||
|
||||
|
||||
def _make_cluster_with_centroid(centroid_vec: np.ndarray) -> dict:
    """Build a member-less cluster fixture whose centroid is ``centroid_vec`` normalized."""
    unit_centroid = normalize_vector(centroid_vec)
    return {"centroid": unit_centroid, "members": []}
|
||||
|
||||
|
||||
def test_flag_default_off():
    """With the env var unset, the historical feature is disabled."""
    # Remove the variable for the check, but put back any pre-existing value so
    # this test does not leak an environment change into later tests.
    saved = os.environ.pop("BRIEFING_HISTORICAL_ENABLED", None)
    try:
        assert historical_enabled() is False
    finally:
        if saved is not None:
            os.environ["BRIEFING_HISTORICAL_ENABLED"] = saved
|
||||
|
||||
|
||||
def test_flag_on():
    """With the env var set to "true", the historical feature is enabled."""
    saved = os.environ.get("BRIEFING_HISTORICAL_ENABLED")
    os.environ["BRIEFING_HISTORICAL_ENABLED"] = "true"
    try:
        assert historical_enabled() is True
    finally:
        # Restore the exact prior state instead of always deleting the variable.
        if saved is None:
            os.environ.pop("BRIEFING_HISTORICAL_ENABLED", None)
        else:
            os.environ["BRIEFING_HISTORICAL_ENABLED"] = saved
|
||||
|
||||
|
||||
def test_historical_block_empty_when_no_docs():
    """Path 1: flag off or ``historical_docs=[]`` yields the fixed "no material" label."""
    assert _build_historical_block([]) == "(과거 참고 자료 없음)"
|
||||
|
||||
|
||||
def test_historical_block_has_label_when_docs():
    """A non-empty historical block carries its label, the no-quote notice and an [H1] tag."""
    embedding = np.ones(1024, dtype=np.float32)
    block = _build_historical_block([_make_doc(1, embedding)])
    for fragment in ("이전 30일 흐름", "직접 인용 금지", "[H1]"):
        assert fragment in block
|
||||
|
||||
|
||||
def test_retrieve_historical_topk():
    """Path 2: flag on + fixture candidates -> only docs with similarity >= threshold."""
    # Seeded generator so the fixture noise is identical on every run.
    rng = np.random.default_rng(0)

    # Cluster centroid points along the all-ones direction.
    centroid = np.ones(8, dtype=np.float32)
    cluster = _make_cluster_with_centroid(centroid)

    # 10 candidates: 5 near the centroid (sim ~ 1.0), 5 orthogonal to it (sim ~ 0).
    similar_emb = np.ones(8, dtype=np.float32)
    orthogonal_emb = np.array([1, -1, 1, -1, 1, -1, 1, -1], dtype=np.float32)
    candidates = (
        [_make_doc(i, similar_emb + rng.random(8, dtype=np.float32) * 0.01) for i in range(1, 6)]
        + [_make_doc(10 + i, orthogonal_emb) for i in range(5)]
    )

    out = retrieve_historical(cluster, candidates, top_k=5, sim_min=0.70)
    assert len(out) == 5
    # Only the similar group (ids 1-5) may be selected.
    selected_ids = {d["id"] for d in out}
    assert selected_ids.issubset({1, 2, 3, 4, 5})
|
||||
|
||||
|
||||
def test_retrieve_historical_zero_match():
    """Path 3: every candidate is below the similarity threshold -> empty list."""
    cluster = _make_cluster_with_centroid(np.ones(8, dtype=np.float32))
    # Alternating signs make the vector orthogonal to the all-ones centroid.
    off_axis = np.array([1, -1, 1, -1, 1, -1, 1, -1], dtype=np.float32)
    candidates = [_make_doc(doc_id, off_axis) for doc_id in range(5)]

    assert retrieve_historical(cluster, candidates, top_k=5, sim_min=0.70) == []
|
||||
|
||||
|
||||
def test_retrieve_historical_empty_candidates():
    """An empty candidate pool yields an empty result."""
    cluster = _make_cluster_with_centroid(np.ones(8, dtype=np.float32))
    result = retrieve_historical(cluster, [], top_k=5)
    assert result == []
|
||||
|
||||
|
||||
def test_sanitize_envelope_valid():
    """A well-formed LLM payload is kept, with country codes upper-cased."""
    perspectives = [
        {"country": "kr", "summary": "유가 충격", "article_ids": [1]},
        {"country": "us", "summary": "외교 압박", "article_ids": [2]},
    ]
    quotes = [{"country": "US", "source": "NYT", "quote": "Tehran ..."}]
    parsed = dict(
        topic_label="이란 충돌",
        headline="긴장 격화",
        country_perspectives=perspectives,
        divergences=["KR=경제 / US=외교"],
        convergences=["민간 사상 우려 공통"],
        key_quotes=quotes,
        historical_context="지난 3주 6회 공방",
    )
    cluster = {"members": [{"id": member_id} for member_id in (1, 2)]}

    sanitized = _sanitize_envelope(parsed, cluster)

    assert sanitized is not None
    assert sanitized["topic_label"] == "이란 충돌"
    # Country code is converted to upper case.
    assert sanitized["country_perspectives"][0]["country"] == "KR"
    assert sanitized["historical_context"] == "지난 3주 6회 공방"
    assert sanitized["llm_fallback_used"] is False
|
||||
|
||||
|
||||
def test_sanitize_envelope_empty_perspectives_to_fallback():
    """Empty ``country_perspectives`` yields None (the caller then triggers the fallback)."""
    parsed = dict(topic_label="X", headline="Y", country_perspectives=[])
    result = _sanitize_envelope(parsed, {"members": []})
    assert result is None
|
||||
|
||||
|
||||
def test_fallback_row_fixed_form():
    """The fallback topic row has the fixed shape from the Plan ("Fallback Topic Row")."""
    fb = _make_fallback({"members": [{"id": 1}]})

    assert fb["topic_label"] == "주요 뉴스 묶음"
    for list_field in ("country_perspectives", "divergences", "convergences", "key_quotes"):
        assert fb[list_field] == []
    assert fb["historical_context"] is None
    assert fb["llm_fallback_used"] is True
|
||||
|
||||
|
||||
def test_prompt_includes_both_blocks():
    """build_prompt substitutes both the articles and the historical placeholders."""
    doc = _make_doc(1, np.ones(8, dtype=np.float32))
    overrides = (
        ("country", "KR"),
        ("ai_sub_group", "경향신문"),
        ("ai_summary_truncated", "오늘 한국 뉴스"),
    )
    for field, value in overrides:
        doc[field] = value

    rendered = build_prompt([doc], historical_docs=[])

    # Neither template placeholder may remain after substitution.
    for placeholder in ("{articles_block}", "{historical_block}"):
        assert placeholder not in rendered
    # The article line and the "no historical reference" marker must both appear.
    for expected_fragment in ("(KR · 경향신문)", "(과거 참고 자료 없음)"):
        assert expected_fragment in rendered
|
||||
|
||||
|
||||
def test_perspective_summary_cap_enforced():
    """Sanitization enforces the length cap on a perspective summary."""
    oversized = "가" * 500  # 500 characters, well above the 240 cap
    envelope = {
        "topic_label": "T",
        "headline": "H",
        "country_perspectives": [
            {"country": "KR", "summary": oversized, "article_ids": []},
        ],
    }

    result = _sanitize_envelope(envelope, {"members": []})

    assert result is not None
    summary_length = len(result["country_perspectives"][0]["summary"])
    # 240 characters plus one for the trailing "…".
    assert summary_length <= 241
|
||||
|
||||
|
||||
def test_article_ids_fallback_when_llm_empty():
    """When the LLM leaves article_ids empty, the top-N cluster members of the same country are injected."""
    member_rows = (
        (101, "KR", 0.9),
        (102, "KR", 0.8),
        (103, "KR", 0.7),
        (201, "US", 0.5),
    )
    cluster = {
        "members": [
            {"id": member_id, "country": code, "weight": weight}
            for member_id, code, weight in member_rows
        ]
    }
    envelope = {
        "topic_label": "T",
        "headline": "H",
        "country_perspectives": [
            {"country": "KR", "summary": "한국 시각", "article_ids": []},
            {"country": "US", "summary": "미국 시각", "article_ids": []},
        ],
    }

    result = _sanitize_envelope(envelope, cluster)
    assert result is not None

    def first_perspective_for(code):
        # First perspective whose country matches; StopIteration if none does.
        return next(p for p in result["country_perspectives"] if p["country"] == code)

    kr_view = first_perspective_for("KR")
    us_view = first_perspective_for("US")
    # Expected order is by descending weight.
    assert kr_view["article_ids"] == [101, 102, 103]
    assert us_view["article_ids"] == [201]
|
||||
|
||||
|
||||
def test_article_ids_intersect_with_cluster():
    """Article ids from the LLM that are not cluster members are dropped; only the intersection remains."""
    kr_members = [
        {"id": member_id, "country": "KR", "weight": weight}
        for member_id, weight in ((101, 0.9), (102, 0.8))
    ]
    envelope = {
        "topic_label": "T",
        "headline": "H",
        "country_perspectives": [
            # 999 and 888 are not among the cluster members above.
            {"country": "KR", "summary": "한국 시각", "article_ids": [101, 999, 888]},
        ],
    }

    result = _sanitize_envelope(envelope, {"members": kr_members})

    assert result is not None
    kept_ids = result["country_perspectives"][0]["article_ids"]
    assert kept_ids == [101]
|
||||
|
||||
|
||||
def test_article_ids_capped_to_max():
    """After post-processing, article_ids per country is capped at MAX_ARTICLE_IDS_PER_COUNTRY."""
    kr_members = []
    for rank in range(1, 15):
        # Fourteen KR members with strictly decreasing weight.
        kr_members.append({"id": rank, "country": "KR", "weight": 1.0 / rank})
    envelope = {
        "topic_label": "T",
        "headline": "H",
        "country_perspectives": [
            {"country": "KR", "summary": "한국 시각", "article_ids": []},
        ],
    }

    result = _sanitize_envelope(envelope, {"members": kr_members})
    assert result is not None

    from services.briefing.comparator import MAX_ARTICLE_IDS_PER_COUNTRY

    injected_ids = result["country_perspectives"][0]["article_ids"]
    assert len(injected_ids) == MAX_ARTICLE_IDS_PER_COUNTRY
|
||||
|
||||
|
||||
def test_max_perspectives_cap():
    """Sanitizing twenty perspectives leaves at most ten of them."""
    many_perspectives = []
    for index in range(20):
        many_perspectives.append(
            {"country": f"C{index}", "summary": "s", "article_ids": []}
        )
    envelope = {
        "topic_label": "T",
        "headline": "H",
        "country_perspectives": many_perspectives,
    }

    result = _sanitize_envelope(envelope, {"members": []})

    assert result is not None
    kept_count = len(result["country_perspectives"])
    assert kept_count <= 10
|
||||
Reference in New Issue
Block a user