From 431d4fe010f177e4a73e5af95656aada01d3042a Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Tue, 12 May 2026 12:58:50 +0900 Subject: [PATCH] feat(briefing): add morning briefing schema + services + api (historical off) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 야간 수집 뉴스 (KST 00:00~05:00) topic×country 비교 분석 1페이지 카드. Phase 4 Global Digest 와 코드/로직/테이블 분리, 알고리즘만 services/clustering_common 공유. Backend 신규: - migrations/255_morning_briefings.sql: morning_briefings + briefing_topics (briefing_date UNIQUE, UNIQUE(briefing_id,topic_rank), FK CASCADE, historical_* 3컬럼 nullable, cluster_members JSONB, country_perspectives JSONB, status 4-state success|partial|failed|empty) - app/models/briefing.py: SQLAlchemy ORM - app/services/briefing/loader.py: KST 5h 윈도우 + news_sources prefix fallback (Phase 4 패턴 미러) + historical candidate pool 로더 - app/services/briefing/clustering.py: cluster_global topic-first (LAMBDA=ln(2)/2h, MIN_COUNTRIES_PER_TOPIC=2, MAX_TOPICS=7) - app/services/briefing/comparator.py: call_primary 26B + JSON envelope sanitize (cap perspectives 10 / divergences 3 / convergences 2 / quotes 5) + fallback row 고정 형태 + retrieve_historical cosine top-K - app/services/briefing/pipeline.py: load→cluster→select(K=7,λ=0.6) →historical→compare→status 4-state→delete+insert transaction - app/workers/briefing_worker.py: APScheduler/수동 호출 공용 진입점, 600s hard cap - app/prompts/briefing_comparative.txt: 한국어 비교 분석 JSON 프롬프트, {articles_block} + {historical_block} 2섹션, 인용 금지 라벨 - app/api/briefing.py: GET /latest, GET ?date=, POST /regenerate?date= (admin, sync delete+insert tx, regenerated:true) Backend 수정: - app/main.py: briefing_router 등록 (/api/briefing prefix). scheduler 등록은 PR-3 에서. - app/services/digest/selection.py: select_for_llm 매개변수화 (K, λ caller 주입). Phase 4 동작은 default 값으로 보존. Historical 정책: - BRIEFING_HISTORICAL_ENABLED env flag, default off. - flag off → historical_* 컬럼 모두 NULL, prompt {historical_block} 빈 라벨, retrieval 호출 안 함. - flag on (PR-1b 에서 enable) → cluster centroid 와 과거 30일 doc embedding cosine top-K 5 (sim≥0.70), prompt 에 주입. Country canonical (실측 확인 후): - documents.country 컬럼 부재 확정 - document_chunks.country 매칭률 0% (chunks 자체가 뉴스에 안 만들어짐) - 유일 country 신호 = news_sources prefix 매핑 (Phase 4 와 동일) Tests: - tests/test_briefing_historical.py: 3 경로 회귀 (flag off/on with fixture/on zero match) + sanitize cap + fallback row 형태. Verification: PR-1.8 에서 GPU 컨테이너 pytest + 수동 regenerate. --- app/api/briefing.py | 203 +++++++++++++++++++++ app/main.py | 2 + app/models/briefing.py | 97 ++++++++++ app/prompts/briefing_comparative.txt | 46 +++++ app/services/briefing/__init__.py | 0 app/services/briefing/clustering.py | 80 ++++++++ app/services/briefing/comparator.py | 252 ++++++++++++++++++++++++++ app/services/briefing/loader.py | 199 ++++++++++++++++++++ app/services/briefing/pipeline.py | 261 +++++++++++++++++++++++++++ app/services/digest/selection.py | 20 +- app/workers/briefing_worker.py | 43 +++++ migrations/255_morning_briefings.sql | 67 +++++++ tests/test_briefing_historical.py | 203 +++++++++++++++++++++ 13 files changed, 1466 insertions(+), 7 deletions(-) create mode 100644 app/api/briefing.py create mode 100644 app/models/briefing.py create mode 100644 app/prompts/briefing_comparative.txt create mode 100644 app/services/briefing/__init__.py create mode 100644 app/services/briefing/clustering.py create mode 100644 app/services/briefing/comparator.py create mode 100644 app/services/briefing/loader.py create mode 100644 app/services/briefing/pipeline.py create mode 100644 app/workers/briefing_worker.py create mode 100644 migrations/255_morning_briefings.sql create mode 100644 tests/test_briefing_historical.py diff --git a/app/api/briefing.py b/app/api/briefing.py new file mode 100644 index 0000000..867175d --- /dev/null +++ b/app/api/briefing.py @@ -0,0 +1,203 @@ +"""Morning Briefing API — read-only + 수동 regenerate. + +엔드포인트: +- GET /api/briefing/latest : 가장 최근 briefing +- GET /api/briefing?date=YYYY-MM-DD : 특정 날짜 briefing +- POST /api/briefing/regenerate?date=... : 동기 워커 트리거 (admin), DELETE+INSERT tx + +응답은 topic 평면 list (axis 반대 — Phase 4 와 달리 country 그룹 X). +각 topic 안에 country_perspectives JSONB 가 들어있어 cross-country 비교 분석을 표현. +""" + +from datetime import date as date_type +from datetime import datetime +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from core.auth import get_current_user, require_admin +from core.database import get_session +from models.briefing import BriefingTopic, MorningBriefing +from models.user import User + +router = APIRouter() + + +# ─── Pydantic 응답 모델 ─── + + +class CountryPerspective(BaseModel): + country: str + summary: str + article_ids: list[int] = [] + + +class KeyQuote(BaseModel): + country: str = "" + source: str = "" + quote: str + + +class TopicResponse(BaseModel): + topic_rank: int + topic_label: str + headline: str + country_perspectives: list[CountryPerspective] + divergences: list[str] + convergences: list[str] + key_quotes: list[KeyQuote] + historical_context: str | None = None + cluster_members: list[int] = [] + article_count: int + country_count: int + importance_score: float + llm_fallback_used: bool + + +class BriefingResponse(BaseModel): + briefing_date: date_type + window_start: datetime + window_end: datetime + decay_lambda: float + total_articles: int + total_countries: int + total_topics: int + generation_ms: int | None + llm_calls: int + llm_failures: int + status: str + headline_oneliner: str | None = None + topics: list[TopicResponse] + + +class RegenerateResponse(BaseModel): + status: str + briefing_id: int | None + briefing_date: date_type + total_topics: int + total_articles: int + llm_calls: int + llm_failures: int + generation_ms: int + regenerated: bool + + +# ─── helpers ─── + + +def _build_response(b: MorningBriefing) -> BriefingResponse: + topics = [] + for t in sorted(b.topics, key=lambda x: x.topic_rank): + topics.append( + TopicResponse( + topic_rank=t.topic_rank, + topic_label=t.topic_label, + headline=t.headline, + country_perspectives=[ + CountryPerspective(**cp) for cp in (t.country_perspectives or []) + ], + divergences=list(t.divergences or []), + convergences=list(t.convergences or []), + key_quotes=[KeyQuote(**q) for q in (t.key_quotes or [])], + historical_context=t.historical_context, + cluster_members=list(t.cluster_members or []), + article_count=t.article_count, + country_count=t.country_count, + importance_score=t.importance_score, + llm_fallback_used=t.llm_fallback_used, + ) + ) + + return BriefingResponse( + briefing_date=b.briefing_date, + window_start=b.window_start, + window_end=b.window_end, + decay_lambda=b.decay_lambda, + total_articles=b.total_articles, + total_countries=b.total_countries, + total_topics=b.total_topics, + generation_ms=b.generation_ms, + llm_calls=b.llm_calls, + llm_failures=b.llm_failures, + status=b.status, + headline_oneliner=b.headline_oneliner, + topics=topics, + ) + + +async def _load_briefing( + session: AsyncSession, + target_date: date_type | None, +) -> MorningBriefing | None: + query = select(MorningBriefing).options(selectinload(MorningBriefing.topics)) + if target_date is not None: + query = query.where(MorningBriefing.briefing_date == target_date) + else: + query = query.order_by(MorningBriefing.briefing_date.desc()) + query = query.limit(1) + result = await session.execute(query) + return result.scalar_one_or_none() + + +# ─── Routes ─── + + +@router.get("/latest", response_model=BriefingResponse) +async def get_latest( + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """가장 최근 morning briefing.""" + b = await _load_briefing(session, target_date=None) + if b is None: + raise HTTPException(status_code=404, detail="아직 생성된 briefing 없음") + return _build_response(b) + + +@router.get("", response_model=BriefingResponse) +async def get_briefing( + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], + date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST briefing_date)"), +): + """특정 날짜 briefing (date 미지정 시 최신).""" + b = await _load_briefing(session, target_date=date) + if b is None: + raise HTTPException( + status_code=404, + detail=f"briefing 없음 (date={date})" if date else "아직 생성된 briefing 없음", + ) + return _build_response(b) + + +@router.post("/regenerate", response_model=RegenerateResponse) +async def regenerate( + user: Annotated[User, Depends(require_admin)], + date: date_type | None = Query(default=None, description="YYYY-MM-DD KST 기준 briefing_date"), +): + """수동 트리거 (admin). 동기 실행 — delete+insert transaction. + + date 미지정 시 오늘 KST. 같은 날 row 존재 시 transaction 안에서 삭제 후 신규 생성. + 응답 status='success' | 'partial' | 'failed' | 'empty'. + """ + from workers.briefing_worker import run + + result = await run(target_date=date) + if result is None: + raise HTTPException(status_code=500, detail="briefing 워커 실행 실패 (로그 확인)") + + return RegenerateResponse( + status=result["status"], + briefing_id=result.get("briefing_id"), + briefing_date=date or datetime.now().date(), + total_topics=result["total_topics"], + total_articles=result["total_articles"], + llm_calls=result["llm_calls"], + llm_failures=result["llm_failures"], + generation_ms=result["generation_ms"], + regenerated=result.get("regenerated", True), + ) diff --git a/app/main.py b/app/main.py index 1c7bcb1..c4795f7 100644 --- a/app/main.py +++ b/app/main.py @@ -8,6 +8,7 @@ from sqlalchemy import func, select, text from api.audio import router as audio_router from api.auth import router as auth_router +from api.briefing import router as briefing_router from api.config import router as config_router from api.dashboard import router as dashboard_router from api.digest import router as digest_router @@ -135,6 +136,7 @@ app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"] app.include_router(library_router, prefix="/api/library", tags=["library"]) app.include_router(news_router, prefix="/api/news", tags=["news"]) app.include_router(digest_router, prefix="/api/digest", tags=["digest"]) +app.include_router(briefing_router, prefix="/api/briefing", tags=["briefing"]) app.include_router(audio_router, prefix="/api/audio", tags=["audio"]) app.include_router(video_router, prefix="/api/video", tags=["video"]) app.include_router(study_sessions_router, prefix="/api/study-sessions", tags=["study-sessions"]) diff --git a/app/models/briefing.py b/app/models/briefing.py new file mode 100644 index 0000000..5958bf3 --- /dev/null +++ b/app/models/briefing.py @@ -0,0 +1,97 @@ +"""morning_briefings + briefing_topics 테이블 ORM (야간 수집 뉴스 브리핑). + +axis 반대: Phase 4 = country×topic / Briefing = topic×country. +country_perspectives JSONB 안에 한 topic 의 여러 국가 관점 array. +""" + +from datetime import date, datetime + +from sqlalchemy import ( + BigInteger, + Boolean, + Date, + DateTime, + Float, + ForeignKey, + Integer, + String, + Text, +) +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from core.database import Base + + +class MorningBriefing(Base): + """하루 단위 브리핑 메타데이터 (KST 자정~05:00 윈도우)""" + + __tablename__ = "morning_briefings" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + briefing_date: Mapped[date] = mapped_column(Date, nullable=False, unique=True) + window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + decay_lambda: Mapped[float] = mapped_column(Float, nullable=False) + + total_articles: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + total_countries: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + total_topics: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + + generation_ms: Mapped[int | None] = mapped_column(Integer) + llm_calls: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + llm_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + status: Mapped[str] = mapped_column(String(20), nullable=False, default="success") + + headline_oneliner: Mapped[str | None] = mapped_column(Text) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=datetime.now + ) + + topics: Mapped[list["BriefingTopic"]] = relationship( + back_populates="briefing", + cascade="all, delete-orphan", + order_by="BriefingTopic.topic_rank", + ) + + +class BriefingTopic(Base): + """1 briefing 안 topic_rank 순 cross-country 비교 분석 결과""" + + __tablename__ = "briefing_topics" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + briefing_id: Mapped[int] = mapped_column( + BigInteger, + ForeignKey("morning_briefings.id", ondelete="CASCADE"), + nullable=False, + ) + + topic_rank: Mapped[int] = mapped_column(Integer, nullable=False) + topic_label: Mapped[str] = mapped_column(String(120), nullable=False) + headline: Mapped[str] = mapped_column(Text, nullable=False) + + country_perspectives: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) + divergences: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) + convergences: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) + key_quotes: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) + + historical_article_ids: Mapped[list | None] = mapped_column(JSONB) + historical_context: Mapped[str | None] = mapped_column(Text) + historical_window_days: Mapped[int | None] = mapped_column(Integer) + + cluster_members: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) + article_count: Mapped[int] = mapped_column(Integer, nullable=False) + country_count: Mapped[int] = mapped_column(Integer, nullable=False) + importance_score: Mapped[float] = mapped_column(Float, nullable=False) + raw_weight_sum: Mapped[float] = mapped_column(Float, nullable=False) + + llm_model: Mapped[str | None] = mapped_column(String(100)) + llm_fallback_used: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=datetime.now + ) + + briefing: Mapped["MorningBriefing"] = relationship(back_populates="topics") diff --git a/app/prompts/briefing_comparative.txt b/app/prompts/briefing_comparative.txt new file mode 100644 index 0000000..abc7afb --- /dev/null +++ b/app/prompts/briefing_comparative.txt @@ -0,0 +1,46 @@ +너는 다국적 뉴스 비교 분석가다. +아래는 같은 주제로 군집된 야간 수집 뉴스들 — 각 줄 앞 (국가코드 · 소스) 표시로 출처가 표시되어 있다. +이 정보만으로 cross-country 비교 분석을 JSON 으로만 출력하라. + +목표: +- 같은 사건을 각 나라가 어떻게 다르게 다루는지 / 무엇이 공통인지를 1페이지 카드 형태로 정리. +- 사용자는 한국어 독자. 한국어로 출력. + +절대 금지: +- 제공된 summary 에 없는 사실 추가 +- 추측 표현 ("보인다", "~할 것이다", "~할 전망" 등) +- JSON 외의 모든 텍스트 (설명, 마크다운, 코드블록 금지) +- 인용부호 안 원문에 없던 단어 생성 (key_quotes 는 원문 그대로만) + +분량 cap (반드시 지킬 것): +- country_perspectives: 최대 10개, 각 summary 는 1~2문장 (한국어 120자 이내) +- divergences: 최대 3개, 각 200자 이내 +- convergences: 최대 2개, 각 200자 이내 +- key_quotes: 최대 5개, 각 quote 240자 이내 +- historical_context: 1~2문장 (한국어 120자 이내), 의미 있을 때만 채우고 아니면 null + +출력 형식 (JSON 객체 하나만 출력, 위 cap 초과 금지): +{ + "topic_label": "5~10 단어의 한국어 토픽 제목", + "headline": "전체를 한 줄로 압축한 한국어 headline (≤80자)", + "country_perspectives": [ + {"country": "KR", "summary": "...", "article_ids": []}, + {"country": "US", "summary": "...", "article_ids": []} + ], + "divergences": ["A국=X 강조 / B국=Y 비판 / C국=Z 부각"], + "convergences": ["모든 매체가 Z 사실은 일치"], + "key_quotes": [{"country": "US", "source": "NYT", "quote": "..."}], + "historical_context": null +} + +규칙: +- country_perspectives 의 country 는 입력 기사의 국가코드 그대로 (대문자). +- article_ids 는 비워둬도 됨 (서버가 채움). +- 단일 국가만 다룬 경우 divergences 는 빈 배열. +- historical_context 는 아래 "이전 흐름 참고" 섹션이 비어있으면 반드시 null. + +오늘 새벽 기사 묶음: +{articles_block} + +이전 흐름 참고 (직접 인용 금지, 맥락 파악 용도): +{historical_block} diff --git a/app/services/briefing/__init__.py b/app/services/briefing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/briefing/clustering.py b/app/services/briefing/clustering.py new file mode 100644 index 0000000..2b62569 --- /dev/null +++ b/app/services/briefing/clustering.py @@ -0,0 +1,80 @@ +"""야간 뉴스 topic-first 클러스터링. + +Phase 4 와 axis 반대: country 별 cluster 가 아닌 **전체 doc 합쳐서 topic cluster**. +각 cluster 안에 country 분포가 자동으로 들어감 (doc dict 의 country field). + +파라미터 (5h 윈도우용): +- LAMBDA = ln(2)/2h ≈ 0.347 (2시간 반감기, 야간 5h 윈도우라 빠른 감쇠) +- threshold = 0.78 고정 (Phase 4 0.75~0.80 중간값) +- MIN_ARTICLES_PER_TOPIC = 2 (야간 sparse 대비 완화) +- MIN_COUNTRIES_PER_TOPIC = 2 (cross-country 가치 핵심) +- MAX_TOPICS = 7 (1페이지 분량) +""" + +import math + +from core.utils import setup_logger +from services.clustering_common import ( + greedy_assign_cluster, + normalize_importance_scores, +) + +logger = setup_logger("briefing_clustering") + +LAMBDA = math.log(2) / (2.0 / 24.0) # 2시간 반감기 (단위: 일) +THRESHOLD = 0.78 +CENTROID_ALPHA = 0.7 +MIN_ARTICLES_PER_TOPIC = 2 +MIN_COUNTRIES_PER_TOPIC = 2 +MAX_TOPICS = 7 + + +def _count_distinct_countries(cluster: dict) -> int: + return len({m.get("country") for m in cluster["members"] if m.get("country")}) + + +def cluster_global(docs: list[dict]) -> list[dict]: + """모든 country docs 를 합쳐 topic cluster 생성. + + Args: + docs: loader.load_night_window 의 출력 (각 dict 에 country field 포함). + + Returns: + [{centroid, members, weight_sum, raw_weight_sum, importance_score, country_count}, ...] + - MIN_ARTICLES + MIN_COUNTRIES 둘 다 충족 cluster 만 + - importance_score 내림차순, MAX_TOPICS 개 cap + """ + if not docs: + logger.info("[briefing] docs=0 → skip") + return [] + + clusters, raw_count = greedy_assign_cluster( + docs, + threshold=THRESHOLD, + centroid_alpha=CENTROID_ALPHA, + min_articles=MIN_ARTICLES_PER_TOPIC, + max_topics=MAX_TOPICS * 4, # MIN_COUNTRIES 필터 전 buffer + lambda_val=LAMBDA, + ) + + # MIN_COUNTRIES_PER_TOPIC 필터 — single-country cluster drop + pre_country_filter = len(clusters) + filtered = [] + for c in clusters: + cc = _count_distinct_countries(c) + if cc >= MIN_COUNTRIES_PER_TOPIC: + c["country_count"] = cc + filtered.append(c) + clusters = filtered[:MAX_TOPICS] + dropped_country = pre_country_filter - len(clusters) + dropped_min_articles = raw_count - pre_country_filter + + # MIN_COUNTRIES + MAX_TOPICS 필터 후 importance 재정규화 (briefing 내 0~1) + normalize_importance_scores(clusters) + + logger.info( + f"[briefing] docs={len(docs)} threshold={THRESHOLD} " + f"raw_clusters={raw_count} dropped_min_articles={dropped_min_articles} " + f"dropped_single_country={dropped_country} kept={len(clusters)}" + ) + return clusters diff --git a/app/services/briefing/comparator.py b/app/services/briefing/comparator.py new file mode 100644 index 0000000..694bfb9 --- /dev/null +++ b/app/services/briefing/comparator.py @@ -0,0 +1,252 @@ +"""Cluster → 26B MLX 비교 분석 호출 + JSON envelope + historical context + fallback row. + +Plan §"LLM Parse 실패 시 Fallback Topic Row (고정 형태)": +LLM JSON parse 2회 재시도 후 실패 → 고정 형태 fallback 저장 (drop 금지). + +Plan §"Historical Context": +BRIEFING_HISTORICAL_ENABLED=true 시 cluster centroid 와 historical candidate +cosine top-K 5 (similarity ≥0.70) 추출 → 프롬프트 {historical_block} 주입. +LLM 응답 envelope 의 historical_context 옵션 필드. +""" + +import asyncio +import json +import os +from pathlib import Path +from typing import Any + +import numpy as np + +from ai.client import parse_json_response +from core.utils import setup_logger +from services.clustering_common import normalize_vector + +logger = setup_logger("briefing_comparator") + +LLM_CALL_TIMEOUT = 25 # 초. Phase 4 와 동일 +HISTORICAL_TOP_K = 5 +HISTORICAL_SIMILARITY_MIN = 0.70 +HISTORICAL_WINDOW_DAYS = 30 + +# JSON envelope cap (프롬프트 + 후처리 양쪽 강제) +MAX_PERSPECTIVES = 10 +MAX_DIVERGENCES = 3 +MAX_CONVERGENCES = 2 +MAX_KEY_QUOTES = 5 +MAX_PERSPECTIVE_SUMMARY_LEN = 240 # 한국어 1~2문장 ≤120자 × 2 +MAX_HISTORICAL_CONTEXT_LEN = 240 +FALLBACK_HEADLINE = "LLM 분석 실패로 원문 기사 묶음만 표시합니다." +FALLBACK_TOPIC_LABEL = "주요 뉴스 묶음" + +_llm_sem = asyncio.Semaphore(1) +_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "briefing_comparative.txt" +_PROMPT_TEMPLATE: str | None = None + + +def historical_enabled() -> bool: + return os.environ.get("BRIEFING_HISTORICAL_ENABLED", "false").lower() in {"1", "true", "yes"} + + +def _load_prompt() -> str: + global _PROMPT_TEMPLATE + if _PROMPT_TEMPLATE is None: + _PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8") + return _PROMPT_TEMPLATE + + +def _build_articles_block(selected: list[dict]) -> str: + lines = [] + for i, m in enumerate(selected, start=1): + country = m.get("country") or "??" + source = m.get("ai_sub_group") or "" + text = (m.get("ai_summary_truncated") or m.get("ai_summary") or m.get("title") or "").strip() + lines.append(f"[{i}] ({country} · {source}) {text}") + return "\n".join(lines) + + +def _build_historical_block(historical_docs: list[dict]) -> str: + if not historical_docs: + return "(과거 참고 자료 없음)" + lines = ["※ 이전 30일 흐름 참고용 — 본 분석에서 직접 인용 금지, 맥락 파악 용도."] + for i, d in enumerate(historical_docs, start=1): + text = (d.get("ai_summary") or d.get("title") or "").strip() + # historical 은 ai_summary 가 길 수 있어 200자 cap + if len(text) > 200: + text = text[:200] + "…" + lines.append(f"[H{i}] {text}") + return "\n".join(lines) + + +def build_prompt(selected: list[dict], historical_docs: list[dict]) -> str: + template = _load_prompt() + articles_block = _build_articles_block(selected) + historical_block = _build_historical_block(historical_docs) + return template.replace("{articles_block}", articles_block).replace( + "{historical_block}", historical_block + ) + + +def retrieve_historical( + cluster: dict, + candidates: list[dict], + *, + top_k: int = HISTORICAL_TOP_K, + sim_min: float = HISTORICAL_SIMILARITY_MIN, +) -> list[dict]: + """cluster centroid 와 candidate pool 의 cosine top-K (sim ≥ sim_min). + + candidates 가 비어있거나 sim 미달 시 빈 list. + """ + if not candidates: + return [] + centroid = cluster["centroid"] + scored = [] + for d in candidates: + v = normalize_vector(d["embedding"]) + sim = float(np.dot(centroid, v)) + if sim >= sim_min: + scored.append((sim, d)) + scored.sort(key=lambda x: -x[0]) + return [d for _, d in scored[:top_k]] + + +async def _try_call_llm(client: Any, prompt: str) -> str: + async with _llm_sem: + return await asyncio.wait_for( + client.call_primary(prompt), + timeout=LLM_CALL_TIMEOUT, + ) + + +def _truncate_str(s: Any, limit: int) -> str: + if not isinstance(s, str): + return "" + s = s.strip() + if len(s) > limit: + s = s[:limit].rstrip() + "…" + return s + + +def _sanitize_envelope(parsed: dict, cluster: dict) -> dict | None: + """LLM 응답 envelope 검증 + cap 강제. None 반환 시 fallback 발동.""" + if not isinstance(parsed, dict): + return None + + topic_label = _truncate_str(parsed.get("topic_label"), 120) + headline = _truncate_str(parsed.get("headline"), 200) + if not topic_label or not headline: + return None + + # country_perspectives + raw_persp = parsed.get("country_perspectives") + perspectives = [] + if isinstance(raw_persp, list): + for p in raw_persp[:MAX_PERSPECTIVES]: + if not isinstance(p, dict): + continue + country = _truncate_str(p.get("country"), 10).upper() + summary = _truncate_str(p.get("summary"), MAX_PERSPECTIVE_SUMMARY_LEN) + ids = p.get("article_ids") or [] + if not isinstance(ids, list): + ids = [] + ids = [int(x) for x in ids if isinstance(x, (int, str)) and str(x).isdigit()] + if country and summary: + perspectives.append({"country": country, "summary": summary, "article_ids": ids}) + if not perspectives: + # 비교 분석 가치가 없는 응답 → fallback + return None + + def _str_array(key: str, cap: int, item_limit: int) -> list[str]: + raw = parsed.get(key) + if not isinstance(raw, list): + return [] + out = [] + for it in raw[:cap]: + t = _truncate_str(it, item_limit) + if t: + out.append(t) + return out + + divergences = _str_array("divergences", MAX_DIVERGENCES, 200) + convergences = _str_array("convergences", MAX_CONVERGENCES, 200) + + # key_quotes: [{country, source, quote}] + raw_quotes = parsed.get("key_quotes") + quotes = [] + if isinstance(raw_quotes, list): + for q in raw_quotes[:MAX_KEY_QUOTES]: + if not isinstance(q, dict): + continue + entry = { + "country": _truncate_str(q.get("country"), 10).upper(), + "source": _truncate_str(q.get("source"), 60), + "quote": _truncate_str(q.get("quote"), 240), + } + if entry["quote"]: + quotes.append(entry) + + historical_context = _truncate_str(parsed.get("historical_context"), MAX_HISTORICAL_CONTEXT_LEN) or None + + return { + "topic_label": topic_label, + "headline": headline, + "country_perspectives": perspectives, + "divergences": divergences, + "convergences": convergences, + "key_quotes": quotes, + "historical_context": historical_context, + "llm_fallback_used": False, + } + + +def _make_fallback(cluster: dict) -> dict: + """Plan §"Fallback Topic Row (고정 형태)". drop 금지, country_perspectives 빈 list.""" + return { + "topic_label": FALLBACK_TOPIC_LABEL, + "headline": FALLBACK_HEADLINE, + "country_perspectives": [], + "divergences": [], + "convergences": [], + "key_quotes": [], + "historical_context": None, + "llm_fallback_used": True, + } + + +async def compare_cluster_with_fallback( + client: Any, + cluster: dict, + selected: list[dict], + historical_docs: list[dict] | None = None, +) -> dict: + """1 cluster 비교 분석. LLM 2회 재시도 → 실패 시 fallback row. + + Returns: + sanitized envelope dict (Plan §"LLM 프롬프트 출력 envelope") + llm_fallback_used. + """ + historical_docs = historical_docs or [] + prompt = build_prompt(selected, historical_docs) + + for attempt in range(2): + try: + raw = await _try_call_llm(client, prompt) + except asyncio.TimeoutError: + logger.warning( + f"LLM timeout {LLM_CALL_TIMEOUT}s " + f"(attempt={attempt + 1}, cluster size={len(cluster['members'])})" + ) + continue + except Exception as e: + logger.warning(f"LLM 호출 실패 attempt={attempt + 1}: {e}") + continue + + parsed = parse_json_response(raw) + sanitized = _sanitize_envelope(parsed, cluster) if parsed else None + if sanitized: + return sanitized + logger.warning( + f"envelope 검증 실패 attempt={attempt + 1} " + f"(raw_len={len(raw) if raw else 0}, parsed_keys={list(parsed.keys()) if isinstance(parsed, dict) else None})" + ) + + return _make_fallback(cluster) diff --git a/app/services/briefing/loader.py b/app/services/briefing/loader.py new file mode 100644 index 0000000..42a1e85 --- /dev/null +++ b/app/services/briefing/loader.py @@ -0,0 +1,199 @@ +"""야간 5h 수집 뉴스 윈도우 로드 + country 정규화 + (옵션) 과거 N일 후보 로드. + +- KST 자정~05:00 사이 수집된 documents (source_channel='news' OR ai_domain='News'). +- country canonical = document_chunks.country first non-null → news_sources prefix fallback (Phase 4 동일). +- ai_summary/embedding NULL 제외 (재요약/재임베딩 0회 원칙). +- 반환: doc dict 의 list (topic-first cluster 입력. country 는 각 dict 의 field). +- 과거 retrieval 용 historical doc 후보는 별도 함수 (BRIEFING_HISTORICAL_ENABLED on 시). +""" + +from datetime import datetime +from typing import Any + +import numpy as np +from sqlalchemy import text + +from core.database import async_session +from core.utils import setup_logger + +logger = setup_logger("briefing_loader") + + +_NEWS_WINDOW_SQL = text(""" + SELECT + d.id, + d.title, + d.ai_summary, + d.embedding, + d.created_at, + d.edit_url, + d.ai_sub_group, + ( + SELECT c.country + FROM document_chunks c + WHERE c.doc_id = d.id AND c.country IS NOT NULL + LIMIT 1 + ) AS chunk_country + FROM documents d + WHERE (d.source_channel = 'news' OR d.ai_domain = 'News') + AND d.deleted_at IS NULL + AND d.created_at >= :window_start + AND d.created_at < :window_end + AND d.embedding IS NOT NULL + AND d.ai_summary IS NOT NULL +""") + + +_SOURCE_COUNTRY_SQL = text(""" + SELECT name, country FROM news_sources WHERE country IS NOT NULL +""") + + +_HISTORICAL_CANDIDATES_SQL = text(""" + SELECT + d.id, + d.title, + d.ai_summary, + d.embedding, + d.created_at + FROM documents d + WHERE (d.source_channel = 'news' OR d.ai_domain = 'News') + AND d.deleted_at IS NULL + AND d.created_at >= :hist_start + AND d.created_at < :hist_end + AND d.embedding IS NOT NULL + AND d.ai_summary IS NOT NULL +""") + + +def _to_numpy_embedding(raw: Any) -> np.ndarray | None: + if raw is None: + return None + if isinstance(raw, str): + import json + try: + raw = json.loads(raw) + except json.JSONDecodeError: + return None + try: + arr = np.asarray(raw, dtype=np.float32) + except (TypeError, ValueError): + return None + if arr.size == 0: + return None + return arr + + +async def _load_source_country_map(session) -> dict[str, str]: + """news_sources name → country prefix 매핑 (Phase 4 패턴 미러).""" + rows = await session.execute(_SOURCE_COUNTRY_SQL) + mapping: dict[str, str] = {} + for name, country in rows: + if not name or not country: + continue + prefix = name.split(" ")[0].strip() + if prefix and prefix not in mapping: + mapping[prefix] = country + tokens = name.split(" ") + if len(tokens) >= 3: + source_prefix = " ".join(tokens[:-1]).strip() + if source_prefix and source_prefix not in mapping: + mapping[source_prefix] = country + return mapping + + +async def load_night_window( + window_start: datetime, + window_end: datetime, +) -> list[dict]: + """야간 윈도우 뉴스 docs 를 country 채워진 list 로 반환. + + Returns: + [{id, title, ai_summary, embedding, created_at, edit_url, ai_sub_group, country}, ...] + country 매핑 실패한 doc 은 drop (cross-country 비교가 핵심이므로). + """ + docs: list[dict] = [] + null_country = 0 + + async with async_session() as session: + source_country = await _load_source_country_map(session) + + result = await session.execute( + _NEWS_WINDOW_SQL, + {"window_start": window_start, "window_end": window_end}, + ) + for row in result.mappings(): + embedding = _to_numpy_embedding(row["embedding"]) + if embedding is None: + continue + + country = row["chunk_country"] + if not country: + ai_sub_group = (row["ai_sub_group"] or "").strip() + if ai_sub_group: + country = source_country.get(ai_sub_group) + if not country: + null_country += 1 + continue + + docs.append({ + "id": int(row["id"]), + "title": row["title"] or "", + "ai_summary": row["ai_summary"] or "", + "embedding": embedding, + "created_at": row["created_at"], + "edit_url": row["edit_url"] or "", + "ai_sub_group": row["ai_sub_group"] or "", + "country": country.upper(), + }) + + if null_country: + logger.warning( + f"[loader] country 매핑 실패 drop {null_country}건 " + f"(chunk_country + news_sources prefix 둘 다 fail)" + ) + logger.info( + f"[loader] night window {window_start} ~ {window_end} → " + f"{len(docs)}건 ({len({d['country'] for d in docs})}개 국가)" + ) + return docs + + +async def load_historical_candidates( + hist_start: datetime, + hist_end: datetime, + exclude_ids: set[int], +) -> list[dict]: + """과거 N일 doc 후보 (BRIEFING_HISTORICAL_ENABLED=true 시만 호출). + + cluster centroid 와 cosine 비교용 raw candidate pool. country 매핑 안 함 + (LLM 분석 input 으로만 사용하고 표시 안 함). + + Args: + exclude_ids: 오늘 윈도우 article id (중복 retrieval 회피). + + Returns: + [{id, title, ai_summary, embedding, created_at}, ...] + """ + out: list[dict] = [] + async with async_session() as session: + result = await session.execute( + _HISTORICAL_CANDIDATES_SQL, + {"hist_start": hist_start, "hist_end": hist_end}, + ) + for row in result.mappings(): + doc_id = int(row["id"]) + if doc_id in exclude_ids: + continue + embedding = _to_numpy_embedding(row["embedding"]) + if embedding is None: + continue + out.append({ + "id": doc_id, + "title": row["title"] or "", + "ai_summary": row["ai_summary"] or "", + "embedding": embedding, + "created_at": row["created_at"], + }) + logger.info(f"[loader] historical candidates: {len(out)} docs (window {hist_start.date()} ~ {hist_end.date()})") + return out diff --git a/app/services/briefing/pipeline.py b/app/services/briefing/pipeline.py new file mode 100644 index 0000000..715b578 --- /dev/null +++ b/app/services/briefing/pipeline.py @@ -0,0 +1,261 @@ +"""야간 수집 뉴스 브리핑 파이프라인 (Plan §"PR-MorningBriefing-1 Backend"). + +흐름: load_night_window → cluster_global → select_for_llm (k=7) → + (옵션) historical retrieval → compare_cluster_with_fallback → DB save. + +regenerate 정책: briefing_date UNIQUE 충돌 시 transaction 안에서 DELETE+INSERT. +""" + +import time +from datetime import date, datetime, timedelta, timezone +from typing import Any +from zoneinfo import ZoneInfo + +from sqlalchemy import delete + +from ai.client import AIClient +from core.database import async_session +from core.utils import setup_logger +from models.briefing import BriefingTopic, MorningBriefing +from services.briefing.clustering import LAMBDA, cluster_global +from services.briefing.comparator import ( + HISTORICAL_WINDOW_DAYS, + compare_cluster_with_fallback, + historical_enabled, + retrieve_historical, +) +from services.briefing.loader import load_historical_candidates, load_night_window +from services.digest.selection import select_for_llm + +logger = setup_logger("briefing_pipeline") + +KST = ZoneInfo("Asia/Seoul") +NIGHT_WINDOW_HOURS = 5 # KST 00:00 ~ 05:00 +SELECT_K = 7 # Plan §"Clustering 파라미터" briefing K_PER_CLUSTER=7 +SELECT_LAMBDA_MMR = 0.6 # Plan briefing MMR lambda 0.6 +PIPELINE_HARD_CAP = 600 # 초. Phase 4 와 동일 + + +def _compute_window(target_date: date | None = None) -> tuple[datetime, datetime, date]: + """target_date (KST 자정 시작일) → (window_start_utc, window_end_utc, kst_date). + + target_date=None 시 오늘 KST. + """ + if target_date is None: + target_date = datetime.now(KST).date() + start_kst = datetime.combine(target_date, datetime.min.time(), tzinfo=KST) + end_kst = start_kst + timedelta(hours=NIGHT_WINDOW_HOURS) + return start_kst.astimezone(timezone.utc), end_kst.astimezone(timezone.utc), target_date + + +def _is_usable_topic(envelope: dict, topic_label: str) -> bool: + """fallback row 가 아닌 진짜 LLM 결과인지 판정.""" + if envelope.get("llm_fallback_used"): + return False + if not envelope.get("country_perspectives"): + return False + if topic_label == "주요 뉴스 묶음": + return False + return True + + +def _compute_status(llm_calls: int, fallback_count: int, usable_count: int, has_topics: bool) -> str: + """Plan §"Status 4-state 판정표".""" + if not has_topics or llm_calls == 0: + return "empty" + if usable_count == 0: + return "failed" + fallback_pct = (fallback_count / llm_calls) if llm_calls else 0.0 + if fallback_pct >= 0.5: + return "failed" + if fallback_count > 0 or usable_count < llm_calls: + return "partial" + return "success" + + +def _build_topic_row( + rank: int, + cluster: dict, + envelope: dict, + historical_docs: list[dict] | None, + primary_model: str, +) -> BriefingTopic: + historical_ids = None + historical_window = None + if historical_enabled(): + historical_ids = [d["id"] for d in (historical_docs or [])] + historical_window = HISTORICAL_WINDOW_DAYS + + return BriefingTopic( + topic_rank=rank, + topic_label=envelope["topic_label"], + headline=envelope["headline"], + country_perspectives=envelope["country_perspectives"], + divergences=envelope["divergences"], + convergences=envelope["convergences"], + key_quotes=envelope["key_quotes"], + historical_article_ids=historical_ids, + historical_context=envelope.get("historical_context"), + historical_window_days=historical_window, + cluster_members=[m["id"] for m in cluster["members"]], + article_count=len(cluster["members"]), + country_count=cluster.get("country_count", 0), + importance_score=cluster.get("importance_score", 0.0), + raw_weight_sum=cluster.get("raw_weight_sum", 0.0), + llm_model=primary_model, + llm_fallback_used=envelope.get("llm_fallback_used", False), + ) + + +async def _save_briefing( + briefing_date: date, + window_start: datetime, + window_end: datetime, + total_articles: int, + total_countries: int, + topic_rows: list[BriefingTopic], + llm_calls: int, + llm_failures: int, + generation_ms: int, + status: str, +) -> int: + """briefing_date UNIQUE 충돌은 DELETE+INSERT transaction 으로 처리.""" + async with async_session() as session: + await session.execute( + delete(MorningBriefing).where(MorningBriefing.briefing_date == briefing_date) + ) + new = MorningBriefing( + briefing_date=briefing_date, + window_start=window_start, + window_end=window_end, + decay_lambda=LAMBDA, + total_articles=total_articles, + total_countries=total_countries, + total_topics=len(topic_rows), + generation_ms=generation_ms, + llm_calls=llm_calls, + llm_failures=llm_failures, + status=status, + ) + new.topics = topic_rows + session.add(new) + await session.commit() + return new.id + + +async def run_briefing_pipeline(target_date: date | None = None) -> dict[str, Any]: + """야간 뉴스 브리핑 1회 실행. cron 또는 수동 regenerate API 에서 호출. + + Returns: + {briefing_id, status, total_topics, total_articles, llm_calls, llm_failures, generation_ms, regenerated} + """ + start = time.time() + window_start, window_end, briefing_date = _compute_window(target_date) + logger.info( + f"[briefing] start date={briefing_date} window {window_start} ~ {window_end} " + f"decay_lambda={LAMBDA:.4f} historical={'on' if historical_enabled() else 'off'}" + ) + + # 1. Load night window + docs = await load_night_window(window_start, window_end) + total_articles = len(docs) + total_countries_in_window = len({d["country"] for d in docs}) + + # 2. Cluster (topic-first) + clusters = cluster_global(docs) + + if not clusters: + briefing_id = await _save_briefing( + briefing_date=briefing_date, + window_start=window_start, + window_end=window_end, + total_articles=total_articles, + total_countries=total_countries_in_window, + topic_rows=[], + llm_calls=0, + llm_failures=0, + generation_ms=int((time.time() - start) * 1000), + status="empty", + ) + logger.info(f"[briefing] empty (no usable clusters) → briefing_id={briefing_id}") + return { + "briefing_id": briefing_id, + "status": "empty", + "total_topics": 0, + "total_articles": total_articles, + "llm_calls": 0, + "llm_failures": 0, + "generation_ms": int((time.time() - start) * 1000), + "regenerated": True, + } + + # 3. (옵션) Historical candidate pool 1회 로드 + historical_candidates: list[dict] = [] + if historical_enabled(): + hist_end = window_start # 오늘 윈도우 직전까지 + hist_start = hist_end - timedelta(days=HISTORICAL_WINDOW_DAYS) + exclude = {d["id"] for d in docs} + historical_candidates = await load_historical_candidates(hist_start, hist_end, exclude) + + # 4. cluster 별 LLM 호출 + client = AIClient() + primary_model = client.ai.primary.model + topic_rows: list[BriefingTopic] = [] + llm_calls = 0 + llm_failures = 0 + usable_count = 0 + + try: + for rank, cluster in enumerate(clusters, start=1): + selected = select_for_llm(cluster, k=SELECT_K, lambda_mmr=SELECT_LAMBDA_MMR) + historical_docs = ( + retrieve_historical(cluster, historical_candidates) + if historical_enabled() else [] + ) + llm_calls += 1 + envelope = await compare_cluster_with_fallback( + client, cluster, selected, historical_docs=historical_docs + ) + if envelope.get("llm_fallback_used"): + llm_failures += 1 + if _is_usable_topic(envelope, envelope["topic_label"]): + usable_count += 1 + topic_rows.append( + _build_topic_row(rank, cluster, envelope, historical_docs, primary_model) + ) + finally: + await client.close() + + generation_ms = int((time.time() - start) * 1000) + status = _compute_status(llm_calls, llm_failures, usable_count, has_topics=bool(topic_rows)) + + briefing_id = await _save_briefing( + briefing_date=briefing_date, + window_start=window_start, + window_end=window_end, + total_articles=total_articles, + total_countries=total_countries_in_window, + topic_rows=topic_rows, + llm_calls=llm_calls, + llm_failures=llm_failures, + generation_ms=generation_ms, + status=status, + ) + + fallback_pct = (llm_failures / llm_calls * 100.0) if llm_calls else 0.0 + logger.info( + f"[briefing] done id={briefing_id} status={status} topics={len(topic_rows)} " + f"usable={usable_count}/{llm_calls} fallback={llm_failures}/{llm_calls} ({fallback_pct:.1f}%) " + f"elapsed={generation_ms / 1000:.1f}s" + ) + + return { + "briefing_id": briefing_id, + "status": status, + "total_topics": len(topic_rows), + "total_articles": total_articles, + "llm_calls": llm_calls, + "llm_failures": llm_failures, + "generation_ms": generation_ms, + "regenerated": True, + } diff --git a/app/services/digest/selection.py b/app/services/digest/selection.py index 701e9d0..1e2e527 100644 --- a/app/services/digest/selection.py +++ b/app/services/digest/selection.py @@ -13,12 +13,20 @@ LAMBDA_MMR = 0.7 # relevance 70% / diversity 30% SUMMARY_TRUNCATE = 300 # long tail ai_summary 방어 -def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]: +def select_for_llm( + cluster: dict, + k: int = K_PER_CLUSTER, + *, + lambda_mmr: float = LAMBDA_MMR, + summary_truncate: int = SUMMARY_TRUNCATE, +) -> list[dict]: """cluster 내 LLM 호출용 대표 article 들 선정. Args: - cluster: clustering.cluster_country 결과 단일 cluster - k: 선정 개수 (기본 5) + cluster: clustering.cluster_country / briefing.cluster_global 결과 단일 cluster + k: 선정 개수 (Phase 4=5, briefing=7) + lambda_mmr: relevance vs diversity (Phase 4=0.7, briefing=0.6) + summary_truncate: ai_summary 자르기 길이 (LLM 토큰 보호) Returns: 선정된 doc dict 리스트. 각 항목에 ai_summary_truncated 필드가 추가됨. @@ -28,7 +36,6 @@ def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]: selected = list(members) else: centroid = cluster["centroid"] - # relevance = centroid 유사도 × decay weight for m in members: v = _normalize(m["embedding"]) m["_rel"] = float(np.dot(centroid, v)) * m["weight"] @@ -44,14 +51,13 @@ def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]: float(np.dot(v, _normalize(s["embedding"]))) for s in selected ) - return LAMBDA_MMR * c["_rel"] - (1.0 - LAMBDA_MMR) * max_sim + return lambda_mmr * c["_rel"] - (1.0 - lambda_mmr) * max_sim pick = max(candidates, key=mmr_score) selected.append(pick) candidates.remove(pick) - # LLM 입력 토큰 보호 for m in selected: - m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:SUMMARY_TRUNCATE] + m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:summary_truncate] return selected diff --git a/app/workers/briefing_worker.py b/app/workers/briefing_worker.py new file mode 100644 index 0000000..9ce62fc --- /dev/null +++ b/app/workers/briefing_worker.py @@ -0,0 +1,43 @@ +"""Morning Briefing 워커 — 야간 수집 뉴스 (KST 00:00~05:00) topic×country 비교 분석. + +- APScheduler cron (매일 05:10 KST, PR-3 에서 등록) + 수동 호출 공용 진입점 +- PIPELINE_HARD_CAP = 600초 hard cap 으로 cron stuck 절대 방지 +- 단독 실행: `python -m workers.briefing_worker` +""" + +import asyncio +from datetime import date + +from core.utils import setup_logger +from services.briefing.pipeline import run_briefing_pipeline + +logger = setup_logger("briefing_worker") + +PIPELINE_HARD_CAP = 600 + + +async def run(target_date: date | None = None) -> dict | None: + """APScheduler + 수동 호출 공용 진입점. + + Args: + target_date: KST 기준 briefing_date (None = 오늘). API regenerate 가 명시 지정 가능. + """ + try: + result = await asyncio.wait_for( + run_briefing_pipeline(target_date), + timeout=PIPELINE_HARD_CAP, + ) + logger.info(f"[briefing] 워커 완료: {result}") + return result + except asyncio.TimeoutError: + logger.error( + f"[briefing] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. " + f"기존 briefing 은 commit 시점에만 갱신되므로 그대로 유지됨." + ) + except Exception as e: + logger.exception(f"[briefing] 워커 실패: {e}") + return None + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/migrations/255_morning_briefings.sql b/migrations/255_morning_briefings.sql new file mode 100644 index 0000000..67d4999 --- /dev/null +++ b/migrations/255_morning_briefings.sql @@ -0,0 +1,67 @@ +-- 야간 수집 뉴스 브리핑 (Morning Briefing) +-- 매일 KST 자정~05:00 사이 수집된 뉴스를 topic×country 비교 분석 1페이지 카드. +-- 트리거: 05:10 KST APScheduler cron (PR-3 에서 등록). Phase 4 와 axis 반대 (topic-first). +-- 코드/로직/테이블 모두 Phase 4 와 분리. 공통 알고리즘만 services/clustering_common 공유. + +-- 부모: 일일 1행 +CREATE TABLE morning_briefings ( + id BIGSERIAL PRIMARY KEY, + briefing_date DATE NOT NULL, -- KST 기준 (윈도우 자정 시작일) + window_start TIMESTAMPTZ NOT NULL, -- UTC 환산 자정 + window_end TIMESTAMPTZ NOT NULL, -- UTC 환산 05:00 KST + decay_lambda DOUBLE PRECISION NOT NULL, -- 실제 사용된 time-decay λ (briefing = ln(2)/2h) + + total_articles INTEGER NOT NULL DEFAULT 0, + total_countries INTEGER NOT NULL DEFAULT 0, + total_topics INTEGER NOT NULL DEFAULT 0, + + generation_ms INTEGER, + llm_calls INTEGER NOT NULL DEFAULT 0, + llm_failures INTEGER NOT NULL DEFAULT 0, + status VARCHAR(20) NOT NULL DEFAULT 'success', -- success | partial | failed | empty + + headline_oneliner TEXT, -- 향후 별 단계 (지금은 NULL) + + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + UNIQUE (briefing_date) -- idempotency: regenerate 시 DELETE+INSERT +); + +CREATE INDEX idx_morning_briefings_date ON morning_briefings (briefing_date DESC); + +-- 자식: 1 briefing 안 topic_rank 순 (importance_score 내림차순) +CREATE TABLE briefing_topics ( + id BIGSERIAL PRIMARY KEY, + briefing_id BIGINT NOT NULL REFERENCES morning_briefings(id) ON DELETE CASCADE, + + topic_rank INTEGER NOT NULL, -- 1..N + topic_label VARCHAR(120) NOT NULL, -- "이란-이스라엘 충돌" 등 5~10 단어 한국어 + headline TEXT NOT NULL, -- 1줄 요약 + + -- LLM 비교 분석 결과 (JSONB cap: perspectives ≤10, divergences ≤3, convergences ≤2, quotes ≤5) + country_perspectives JSONB NOT NULL DEFAULT '[]'::jsonb, -- [{country, summary, article_ids}, ...] + divergences JSONB NOT NULL DEFAULT '[]'::jsonb, -- ["A국 X / B국 Y", ...] + convergences JSONB NOT NULL DEFAULT '[]'::jsonb, -- ["모두 Z 일치", ...] + key_quotes JSONB NOT NULL DEFAULT '[]'::jsonb, -- [{country, source, quote}, ...] + + -- Historical context (BRIEFING_HISTORICAL_ENABLED=false default off, PR-1b 에서 on) + historical_article_ids JSONB, -- [doc_id, ...] top-K 과거 참고 (페이지 노출 X) + historical_context TEXT, -- LLM 생성 "지난 흐름" 1~2문장 + historical_window_days INTEGER, -- 사용된 N (기본 30) + + -- Cluster 메타 (LLM 성공/실패와 무관, UI 폴백 link 용) + cluster_members JSONB NOT NULL DEFAULT '[]'::jsonb, -- [doc_id, ...] cluster 전체 article + article_count INTEGER NOT NULL, + country_count INTEGER NOT NULL, -- cluster 안 distinct country 수 (MIN_COUNTRIES_PER_TOPIC=2 필터 통과) + importance_score DOUBLE PRECISION NOT NULL, -- briefing 내 0~1 normalized + raw_weight_sum DOUBLE PRECISION NOT NULL, + + llm_model VARCHAR(100), + llm_fallback_used BOOLEAN NOT NULL DEFAULT FALSE, + + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + UNIQUE (briefing_id, topic_rank) +); + +CREATE INDEX idx_briefing_topics_briefing_rank ON briefing_topics (briefing_id, topic_rank); diff --git a/tests/test_briefing_historical.py b/tests/test_briefing_historical.py new file mode 100644 index 0000000..c6102eb --- /dev/null +++ b/tests/test_briefing_historical.py @@ -0,0 +1,203 @@ +"""Briefing historical 분기 회귀 — Plan §"Verification 9". + +3 경로 검증: +1. flag off → retrieve_historical 호출 안 함, prompt {historical_block} = "(과거 참고 자료 없음)" +2. flag on + fixture top-K → similarity ≥0.70 docs 만 반환 +3. flag on + zero match → 빈 list (no fallback hallucination) +""" + +import os +import sys +from datetime import datetime, timezone, timedelta +from pathlib import Path + +import numpy as np +import pytest + +# PYTHONPATH = /app (디렉토리 안에서 실행 가정 또는 sys.path 추가) +APP_DIR = Path(__file__).resolve().parent.parent / "app" +if str(APP_DIR) not in sys.path: + sys.path.insert(0, str(APP_DIR)) + +from services.briefing.comparator import ( + HISTORICAL_SIMILARITY_MIN, + HISTORICAL_TOP_K, + _build_historical_block, + _make_fallback, + _sanitize_envelope, + build_prompt, + historical_enabled, + retrieve_historical, +) +from services.clustering_common import normalize_vector + + +def _make_doc(doc_id: int, embedding: np.ndarray, hours_ago: int = 1) -> dict: + return { + "id": doc_id, + "title": f"doc {doc_id}", + "ai_summary": f"summary {doc_id}", + "embedding": embedding, + "created_at": datetime.now(timezone.utc) - timedelta(hours=hours_ago), + } + + +def _make_cluster_with_centroid(centroid_vec: np.ndarray) -> dict: + return { + "centroid": normalize_vector(centroid_vec), + "members": [], + } + + +def test_flag_default_off(): + """env 미설정 → historical disabled.""" + os.environ.pop("BRIEFING_HISTORICAL_ENABLED", None) + assert historical_enabled() is False + + +def test_flag_on(): + os.environ["BRIEFING_HISTORICAL_ENABLED"] = "true" + try: + assert historical_enabled() is True + finally: + os.environ.pop("BRIEFING_HISTORICAL_ENABLED", None) + + +def test_historical_block_empty_when_no_docs(): + """경로 1: flag off 또는 historical_docs=[] → 빈 라벨.""" + block = _build_historical_block([]) + assert block == "(과거 참고 자료 없음)" + + +def test_historical_block_has_label_when_docs(): + docs = [_make_doc(1, np.ones(1024, dtype=np.float32))] + block = _build_historical_block(docs) + assert "이전 30일 흐름" in block + assert "직접 인용 금지" in block + assert "[H1]" in block + + +def test_retrieve_historical_topk(): + """경로 2: flag on + fixture top-K similarity ≥ threshold.""" + # cluster centroid = 모두 1 방향 + centroid = np.ones(8, dtype=np.float32) + cluster = _make_cluster_with_centroid(centroid) + + # 후보 10개: 5개는 centroid 와 유사 (sim≈1.0), 5개는 직교 (sim≈0) + similar_emb = np.ones(8, dtype=np.float32) + orthogonal_emb = np.array([1, -1, 1, -1, 1, -1, 1, -1], dtype=np.float32) + candidates = ( + [_make_doc(i, similar_emb + np.random.rand(8).astype(np.float32) * 0.01) for i in range(1, 6)] + + [_make_doc(10 + i, orthogonal_emb) for i in range(5)] + ) + + out = retrieve_historical(cluster, candidates, top_k=5, sim_min=0.70) + assert len(out) == 5 + # 모두 similar 그룹 (id 1~5) 만 선택됨 + selected_ids = {d["id"] for d in out} + assert selected_ids.issubset({1, 2, 3, 4, 5}) + + +def test_retrieve_historical_zero_match(): + """경로 3: 모든 candidate similarity < threshold → 빈 list.""" + centroid = np.ones(8, dtype=np.float32) + cluster = _make_cluster_with_centroid(centroid) + orthogonal_emb = np.array([1, -1, 1, -1, 1, -1, 1, -1], dtype=np.float32) + candidates = [_make_doc(i, orthogonal_emb) for i in range(5)] + + out = retrieve_historical(cluster, candidates, top_k=5, sim_min=0.70) + assert out == [] + + +def test_retrieve_historical_empty_candidates(): + centroid = np.ones(8, dtype=np.float32) + cluster = _make_cluster_with_centroid(centroid) + assert retrieve_historical(cluster, [], top_k=5) == [] + + +def test_sanitize_envelope_valid(): + cluster = {"members": [{"id": 1}, {"id": 2}]} + parsed = { + "topic_label": "이란 충돌", + "headline": "긴장 격화", + "country_perspectives": [ + {"country": "kr", "summary": "유가 충격", "article_ids": [1]}, + {"country": "us", "summary": "외교 압박", "article_ids": [2]}, + ], + "divergences": ["KR=경제 / US=외교"], + "convergences": ["민간 사상 우려 공통"], + "key_quotes": [{"country": "US", "source": "NYT", "quote": "Tehran ..."}], + "historical_context": "지난 3주 6회 공방", + } + sanitized = _sanitize_envelope(parsed, cluster) + assert sanitized is not None + assert sanitized["topic_label"] == "이란 충돌" + # country 대문자 변환 + assert sanitized["country_perspectives"][0]["country"] == "KR" + assert sanitized["historical_context"] == "지난 3주 6회 공방" + assert sanitized["llm_fallback_used"] is False + + +def test_sanitize_envelope_empty_perspectives_to_fallback(): + """country_perspectives 비어 있으면 None (caller 가 fallback 발동).""" + cluster = {"members": []} + parsed = { + "topic_label": "X", + "headline": "Y", + "country_perspectives": [], + } + assert _sanitize_envelope(parsed, cluster) is None + + +def test_fallback_row_fixed_form(): + """Plan §"Fallback Topic Row 고정 형태".""" + cluster = {"members": [{"id": 1}]} + fb = _make_fallback(cluster) + assert fb["topic_label"] == "주요 뉴스 묶음" + assert fb["country_perspectives"] == [] + assert fb["divergences"] == [] + assert fb["convergences"] == [] + assert fb["key_quotes"] == [] + assert fb["historical_context"] is None + assert fb["llm_fallback_used"] is True + + +def test_prompt_includes_both_blocks(): + selected = [_make_doc(1, np.ones(8, dtype=np.float32))] + selected[0]["country"] = "KR" + selected[0]["ai_sub_group"] = "경향신문" + selected[0]["ai_summary_truncated"] = "오늘 한국 뉴스" + + prompt = build_prompt(selected, historical_docs=[]) + assert "{articles_block}" not in prompt # 치환됨 + assert "{historical_block}" not in prompt + assert "(KR · 경향신문)" in prompt + assert "(과거 참고 자료 없음)" in prompt + + +def test_perspective_summary_cap_enforced(): + """sanitize 가 길이 cap 강제.""" + cluster = {"members": []} + long_summary = "가" * 500 # 500자, cap=240 + parsed = { + "topic_label": "T", + "headline": "H", + "country_perspectives": [{"country": "KR", "summary": long_summary, "article_ids": []}], + } + s = _sanitize_envelope(parsed, cluster) + assert s is not None + assert len(s["country_perspectives"][0]["summary"]) <= 241 # 240 + "…" + + +def test_max_perspectives_cap(): + cluster = {"members": []} + parsed = { + "topic_label": "T", + "headline": "H", + "country_perspectives": [ + {"country": f"C{i}", "summary": "s", "article_ids": []} for i in range(20) + ], + } + s = _sanitize_envelope(parsed, cluster) + assert s is not None + assert len(s["country_perspectives"]) <= 10