"""Phase 4 Global Digest API — read-only plus a debug regenerate trigger.

Endpoints:
- GET /api/digest/latest : most recent digest
- GET /api/digest?date=YYYY-MM-DD : digest for a specific date
- GET /api/digest?country=KR : restrict the response to one country
- POST /api/digest/regenerate : trigger the digest worker in the background (auth required)

Responses use a country → topic two-level structure. Countries with no
topics are automatically omitted from the response.
"""

import asyncio
from datetime import date as date_type
from datetime import datetime
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from core.auth import get_current_user
from core.database import get_session
from models.digest import DigestTopic, GlobalDigest
from models.user import User

router = APIRouter()

# Strong references to fire-and-forget tasks started by /regenerate.
# asyncio.create_task() keeps only a weak reference to the task, so without
# this set the worker task could be garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


# ─── Pydantic response models (no schemas/ directory — defined inline) ───


class TopicResponse(BaseModel):
    topic_rank: int
    topic_label: str
    summary: str
    article_ids: list[int]
    article_count: int
    importance_score: float
    raw_weight_sum: float
    llm_fallback_used: bool


class CountryGroup(BaseModel):
    country: str
    topics: list[TopicResponse]


class DigestResponse(BaseModel):
    digest_date: date_type
    window_start: datetime
    window_end: datetime
    decay_lambda: float
    total_articles: int
    total_countries: int
    total_topics: int
    generation_ms: int | None
    llm_calls: int
    llm_failures: int
    status: str
    countries: list[CountryGroup]


# ─── helpers ───


def _build_response(digest: GlobalDigest, country_filter: str | None = None) -> DigestResponse:
    """Convert an ORM GlobalDigest into a DigestResponse.

    If country_filter is given, only that country's topics are included.
    """
    topics_by_country: dict[str, list[TopicResponse]] = {}
    for t in sorted(digest.topics, key=lambda x: (x.country, x.topic_rank)):
        if country_filter and t.country != country_filter:
            continue
        topics_by_country.setdefault(t.country, []).append(
            TopicResponse(
                topic_rank=t.topic_rank,
                topic_label=t.topic_label,
                summary=t.summary,
                article_ids=list(t.article_ids or []),
                article_count=t.article_count,
                importance_score=t.importance_score,
                raw_weight_sum=t.raw_weight_sum,
                llm_fallback_used=t.llm_fallback_used,
            )
        )

    countries = [
        CountryGroup(country=c, topics=topics_by_country[c])
        for c in sorted(topics_by_country.keys())
    ]

    return DigestResponse(
        digest_date=digest.digest_date,
        window_start=digest.window_start,
        window_end=digest.window_end,
        decay_lambda=digest.decay_lambda,
        total_articles=digest.total_articles,
        total_countries=digest.total_countries,
        total_topics=digest.total_topics,
        generation_ms=digest.generation_ms,
        llm_calls=digest.llm_calls,
        llm_failures=digest.llm_failures,
        status=digest.status,
        countries=countries,
    )


async def _load_digest(
    session: AsyncSession,
    target_date: date_type | None,
) -> GlobalDigest | None:
    """Load the digest for target_date, or the single latest digest when None."""
    query = select(GlobalDigest).options(selectinload(GlobalDigest.topics))
    if target_date is not None:
        query = query.where(GlobalDigest.digest_date == target_date)
    else:
        query = query.order_by(GlobalDigest.digest_date.desc())
    query = query.limit(1)
    result = await session.execute(query)
    return result.scalar_one_or_none()


# ─── Routes ───


@router.get("/latest", response_model=DigestResponse)
async def get_latest(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Most recently generated global digest."""
    digest = await _load_digest(session, target_date=None)
    if digest is None:
        raise HTTPException(status_code=404, detail="아직 생성된 digest 없음")
    return _build_response(digest)


@router.get("", response_model=DigestResponse)
async def get_digest(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST)"),
    country: str | None = Query(default=None, description="국가 코드 (예: KR)"),
):
    """Digest filtered by a specific date and/or country. Latest when date is omitted."""
    digest = await _load_digest(session, target_date=date)
    if digest is None:
        raise HTTPException(
            status_code=404,
            detail=f"digest 없음 (date={date})" if date else "아직 생성된 digest 없음",
        )
    country_filter = country.upper() if country else None
    return _build_response(digest, country_filter=country_filter)


@router.post("/regenerate")
async def regenerate(
    user: Annotated[User, Depends(get_current_user)],
):
    """Debug-only manual trigger — runs the worker as a background task (auth required)."""
    from workers.digest_worker import run

    task = asyncio.create_task(run())
    # Keep a strong reference until completion; a bare create_task() result
    # may be garbage-collected before the worker finishes.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}
"""ORM models for the global_digests and digest_topics tables (Phase 4)."""

from datetime import date, datetime, timezone

from sqlalchemy import (
    BigInteger,
    Boolean,
    Date,
    DateTime,
    Float,
    ForeignKey,
    Integer,
    String,
    Text,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship

from core.database import Base


def _utcnow() -> datetime:
    """Timezone-aware UTC now for TIMESTAMPTZ column defaults.

    A bare ``datetime.now`` default produces a naive local-time value, which
    is ambiguous when written to a DateTime(timezone=True) column; an aware
    UTC timestamp is not.
    """
    return datetime.now(timezone.utc)


class GlobalDigest(Base):
    """Per-day digest run metadata (one row per digest_date)."""

    __tablename__ = "global_digests"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Unique per day → idempotent regeneration via DELETE+INSERT.
    digest_date: Mapped[date] = mapped_column(Date, nullable=False, unique=True)
    window_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    window_end: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    decay_lambda: Mapped[float] = mapped_column(Float, nullable=False)

    total_articles: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    total_countries: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    total_topics: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    generation_ms: Mapped[int | None] = mapped_column(Integer)
    llm_calls: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    llm_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="success")

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=_utcnow
    )

    topics: Mapped[list["DigestTopic"]] = relationship(
        back_populates="digest",
        cascade="all, delete-orphan",
        order_by="DigestTopic.country, DigestTopic.topic_rank",
    )


class DigestTopic(Base):
    """One country × topic cluster result belonging to a GlobalDigest."""

    __tablename__ = "digest_topics"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    digest_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey("global_digests.id", ondelete="CASCADE"),
        nullable=False,
    )

    country: Mapped[str] = mapped_column(String(10), nullable=False)
    topic_rank: Mapped[int] = mapped_column(Integer, nullable=False)

    topic_label: Mapped[str] = mapped_column(Text, nullable=False)
    summary: Mapped[str] = mapped_column(Text, nullable=False)

    # Injected by code from cluster members — never LLM-generated.
    article_ids: Mapped[list] = mapped_column(JSONB, nullable=False)
    article_count: Mapped[int] = mapped_column(Integer, nullable=False)

    importance_score: Mapped[float] = mapped_column(Float, nullable=False)
    raw_weight_sum: Mapped[float] = mapped_column(Float, nullable=False)

    centroid_sample: Mapped[dict | None] = mapped_column(JSONB)
    llm_model: Mapped[str | None] = mapped_column(String(100))
    llm_fallback_used: Mapped[bool] = mapped_column(
        Boolean, nullable=False, default=False
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=_utcnow
    )

    digest: Mapped["GlobalDigest"] = relationship(back_populates="topics")
"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering.

Key plan decisions:
- λ = ln(2)/3 (3-day half-life)
- threshold: 0.75 / 0.78 / 0.80 (density-based adaptive)
- centroid: EMA α=0.7 (guards against the seed bias / drift of a plain mean)
- min_articles_per_topic = 3, max_topics_per_country = 10
- importance_score: normalised 0–1 within a country + max(score, 0.01) floor
- raw_weight_sum kept separately (for cross-day trend analysis)
"""

import math
from datetime import datetime, timezone

import numpy as np

from core.utils import setup_logger

logger = setup_logger("digest_clustering")

LAMBDA = math.log(2) / 3  # 3-day half-life — user-confirmed value
CENTROID_ALPHA = 0.7  # EMA: keep 70% of the existing centroid, blend in 30% of the new member
MIN_ARTICLES_PER_TOPIC = 3
MAX_TOPICS_PER_COUNTRY = 10
SCORE_FLOOR = 0.01  # pre-empts a literal "0" showing up in the UI


def adaptive_threshold(n_docs: int) -> float:
    """Density-based dynamic threshold — guards against fragmentation and blobs at once."""
    if n_docs > 200:
        return 0.80
    if n_docs < 50:
        return 0.75
    return 0.78


def _normalize(v: np.ndarray) -> np.ndarray:
    # Unit-normalise; a zero vector is returned unchanged to avoid division by zero.
    norm = float(np.linalg.norm(v))
    if norm == 0.0:
        return v
    return v / norm


def _decay_weight(now: datetime, created_at: datetime) -> float:
    """exp(-λ * days_ago). A naive created_at is assumed to be UTC."""
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    days = (now - created_at).total_seconds() / 86400.0
    if days < 0:
        # Clock skew / future timestamps: clamp so the weight never exceeds 1.
        days = 0.0
    return math.exp(-LAMBDA * days)


def cluster_country(country: str, docs: list[dict]) -> list[dict]:
    """Group a single country's docs into clusters, then sort + normalise.

    Args:
        country: country code (KR, US, ...)
        docs: output of loader.load_news_window (the slice for one country)

    Returns:
        [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
        - members is the list of doc dicts with their "weight" filled in
        - sorted by importance_score descending, at most MAX_TOPICS_PER_COUNTRY entries
    """
    if not docs:
        logger.info(f"[{country}] docs=0 → skip")
        return []

    threshold = adaptive_threshold(len(docs))
    now = datetime.now(timezone.utc)

    # Compute the time-decay weight for each doc and seed clusters from the
    # heaviest docs first. NOTE: this deliberately mutates the caller's doc
    # dicts (adds "weight") — selection.select_for_llm reads d["weight"] later.
    for d in docs:
        d["weight"] = _decay_weight(now, d["created_at"])
    docs.sort(key=lambda d: -d["weight"])

    clusters: list[dict] = []
    for d in docs:
        v = _normalize(d["embedding"])
        # Greedy assignment: join the most similar existing cluster that
        # clears the threshold, otherwise open a new one.
        best_idx, best_sim = -1, 0.0
        for i, c in enumerate(clusters):
            sim = float(np.dot(c["centroid"], v))
            if sim > best_sim and sim >= threshold:
                best_sim, best_idx = sim, i
        if best_idx >= 0:
            c = clusters[best_idx]
            # EMA centroid update — prevents drift, then re-normalise.
            c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v
            c["centroid"] = _normalize(c["centroid"])
            c["members"].append(d)
            c["weight_sum"] += d["weight"]
        else:
            clusters.append({
                "centroid": v,
                "members": [d],
                "weight_sum": d["weight"],
            })

    # Drop tiny clusters, keep only the heaviest MAX_TOPICS_PER_COUNTRY.
    raw_count = len(clusters)
    clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC]
    dropped = raw_count - len(clusters)
    clusters.sort(key=lambda c: -c["weight_sum"])
    clusters = clusters[:MAX_TOPICS_PER_COUNTRY]

    # Normalise importance within the country (0–1) and apply the floor.
    if clusters:
        max_w = max(c["weight_sum"] for c in clusters)
        for c in clusters:
            normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
            c["raw_weight_sum"] = c["weight_sum"]
            c["importance_score"] = max(normalized, SCORE_FLOOR)

    logger.info(
        f"[{country}] docs={len(docs)} threshold={threshold} "
        f"raw_clusters={raw_count} dropped={dropped} kept={len(clusters)}"
    )
    return clusters
"""Load the 7-day news window and normalise country codes.

- The documents table has no country column, so the first non-NULL
  document_chunks.country is joined in per document.
- When the chunk-level country is also NULL, fall back to matching the
  news_sources.name prefix (ai_sub_group).
- Documents that still have no country are dropped (with a warning log).
- Documents whose ai_summary / embedding is NULL are excluded up front
  (zero re-summarisation / re-embedding policy).
"""

from collections import defaultdict
from datetime import datetime
from typing import Any

import numpy as np
from sqlalchemy import text

from core.database import async_session
from core.utils import setup_logger

logger = setup_logger("digest_loader")


_NEWS_WINDOW_SQL = text("""
    SELECT
        d.id,
        d.title,
        d.ai_summary,
        d.embedding,
        d.created_at,
        d.edit_url,
        d.ai_sub_group,
        (
            SELECT c.country
            FROM document_chunks c
            WHERE c.doc_id = d.id AND c.country IS NOT NULL
            LIMIT 1
        ) AS chunk_country
    FROM documents d
    WHERE d.source_channel = 'news'
    AND d.deleted_at IS NULL
    AND d.created_at >= :window_start
    AND d.created_at < :window_end
    AND d.embedding IS NOT NULL
    AND d.ai_summary IS NOT NULL
""")


_SOURCE_COUNTRY_SQL = text("""
    SELECT name, country FROM news_sources WHERE country IS NOT NULL
""")


def _to_numpy_embedding(raw: Any) -> np.ndarray | None:
    """Normalise a pgvector column value into a float32 numpy array.

    Returns None for NULL or empty embeddings so callers can skip the doc.
    """
    if raw is None:
        return None
    arr = np.asarray(raw, dtype=np.float32)
    if arr.size == 0:
        return None
    return arr


async def _load_source_country_map(session) -> dict[str, str]:
    """Build a news_sources name → country mapping.

    news_sources.name looks like '경향신문 문화' while documents.ai_sub_group
    holds only the first token ('경향신문'), so the map is indexed by the
    first whitespace-separated token to allow a prefix match. On duplicate
    prefixes the first row wins.
    """
    rows = await session.execute(_SOURCE_COUNTRY_SQL)
    mapping: dict[str, str] = {}
    for name, country in rows:
        if not name or not country:
            continue
        prefix = name.split(" ")[0].strip()
        if prefix and prefix not in mapping:
            mapping[prefix] = country
    return mapping


async def load_news_window(
    window_start: datetime,
    window_end: datetime,
) -> dict[str, list[dict]]:
    """Return the window's news documents grouped by country.

    Returns:
        {"KR": [doc_dict, ...], "US": [...], ...}
    """
    docs_by_country: dict[str, list[dict]] = defaultdict(list)
    null_country_count = 0
    total = 0

    async with async_session() as session:
        source_country = await _load_source_country_map(session)

        result = await session.execute(
            _NEWS_WINDOW_SQL,
            {"window_start": window_start, "window_end": window_end},
        )
        for row in result.mappings():
            embedding = _to_numpy_embedding(row["embedding"])
            if embedding is None:
                continue

            country = row["chunk_country"]
            if not country:
                # news_sources prefix fallback
                ai_sub_group = (row["ai_sub_group"] or "").strip()
                if ai_sub_group:
                    country = source_country.get(ai_sub_group)
            if not country:
                null_country_count += 1
                continue

            country = country.upper()
            docs_by_country[country].append({
                "id": int(row["id"]),
                "title": row["title"] or "",
                "ai_summary": row["ai_summary"] or "",
                "embedding": embedding,
                "created_at": row["created_at"],
                "edit_url": row["edit_url"] or "",
                "ai_sub_group": row["ai_sub_group"] or "",
            })
            total += 1

    if null_country_count:
        logger.warning(
            f"[loader] country 분류 실패로 drop된 문서 {null_country_count}건 "
            f"(chunk_country + news_sources fallback 모두 실패)"
        )
    logger.info(
        f"[loader] window {window_start.date()} ~ {window_end.date()} → "
        f"{total}건 ({len(docs_by_country)}개 국가)"
    )
    return dict(docs_by_country)
"""Phase 4 digest pipeline orchestration.

Steps:
 1. create the AIClient
 2. load documents for the 7-day window (loader)
 3. cluster_country per country (clustering)
 4. select_for_llm per cluster (selection)
 5. summarize_cluster_with_fallback per cluster (summarizer, LLM)
 6. DELETE+INSERT in a single transaction (idempotent)
 7. start/end logs + generation_ms + fallback-ratio health metric
"""

import hashlib
import time
from datetime import date, datetime, timedelta, timezone
from zoneinfo import ZoneInfo

from sqlalchemy import delete

from ai.client import AIClient
from core.database import async_session
from core.utils import setup_logger
from models.digest import DigestTopic, GlobalDigest

from .clustering import LAMBDA, cluster_country
from .loader import load_news_window
from .selection import select_for_llm
from .summarizer import summarize_cluster_with_fallback

logger = setup_logger("digest_pipeline")

WINDOW_DAYS = 7
KST = ZoneInfo("Asia/Seoul")


def _kst_today() -> date:
    """Today's calendar date in KST.

    Note: the annotation was previously ``datetime``, but ``.date()``
    returns a ``date``.
    """
    return datetime.now(KST).date()


def _summary_hash(text: str) -> str:
    """Short, stable fingerprint of an ai_summary (for the debug centroid_sample)."""
    return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:16]


def _build_topic_row(
    country: str,
    rank: int,
    cluster: dict,
    selected: list[dict],
    llm_result: dict,
    primary_model: str,
) -> DigestTopic:
    """LLM result + cluster metadata → a DigestTopic ORM instance.

    article_ids is injected by code from cluster.members (never produced by
    the LLM → ids cannot be fabricated).
    """
    article_ids = [int(m["id"]) for m in cluster["members"]]
    centroid_sample = {
        "selected_doc_ids": [int(m["id"]) for m in selected],
        "summary_hashes": [_summary_hash(m.get("ai_summary") or "") for m in selected],
    }
    return DigestTopic(
        country=country,
        topic_rank=rank,
        topic_label=llm_result["topic_label"],
        summary=llm_result["summary"],
        article_ids=article_ids,
        article_count=len(article_ids),
        importance_score=float(cluster["importance_score"]),
        raw_weight_sum=float(cluster["raw_weight_sum"]),
        centroid_sample=centroid_sample,
        llm_model=primary_model,
        llm_fallback_used=bool(llm_result["llm_fallback_used"]),
    )


async def run_digest_pipeline() -> dict:
    """Run the whole pipeline. Called from the worker entry point.

    Returns:
        Run statistics: {llm_calls, fallback_used, total_topics, generation_ms, status}
    """
    start = time.time()

    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=WINDOW_DAYS)
    digest_date = _kst_today()

    logger.info(
        f"[global_digest] start window={window_start.date()} ~ {window_end.date()} "
        f"digest_date={digest_date} decay_lambda={LAMBDA:.4f}"
    )

    docs_by_country = await load_news_window(window_start, window_end)
    if not docs_by_country:
        logger.warning("[global_digest] 7일 window에 뉴스 0건 — digest 생성 스킵")
        # Same key set as the normal return path, so callers can rely on one shape.
        return {
            "llm_calls": 0,
            "fallback_used": 0,
            "total_topics": 0,
            "generation_ms": int((time.time() - start) * 1000),
            "status": "skipped",
        }

    client = AIClient()
    primary_model = client.ai.primary.model

    all_topic_rows: list[DigestTopic] = []
    stats = {"llm_calls": 0, "fallback_used": 0}

    try:
        for country, docs in docs_by_country.items():
            clusters = cluster_country(country, docs)
            if not clusters:
                continue  # sparse countries drop out automatically

            for rank, cluster in enumerate(clusters, start=1):
                selected = select_for_llm(cluster)
                stats["llm_calls"] += 1
                llm_result = await summarize_cluster_with_fallback(client, cluster, selected)
                if llm_result["llm_fallback_used"]:
                    stats["fallback_used"] += 1
                all_topic_rows.append(
                    _build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
                )
    finally:
        await client.close()

    generation_ms = int((time.time() - start) * 1000)
    total_articles = sum(len(d) for d in docs_by_country.values())
    countries_with_topics = len({r.country for r in all_topic_rows})

    # Health status from the fallback ratio: 0% → success, >50% → failed,
    # anything in between → partial.
    if stats["fallback_used"] == 0:
        status = "success"
    elif stats["llm_calls"] and stats["fallback_used"] / stats["llm_calls"] > 0.5:
        status = "failed"
    else:
        status = "partial"

    async with async_session() as session:
        # Idempotent: an existing row for the same date is deleted and its
        # topics removed via CASCADE before the new digest is inserted.
        await session.execute(
            delete(GlobalDigest).where(GlobalDigest.digest_date == digest_date)
        )
        new_digest = GlobalDigest(
            digest_date=digest_date,
            window_start=window_start,
            window_end=window_end,
            decay_lambda=LAMBDA,
            total_articles=total_articles,
            total_countries=countries_with_topics,
            total_topics=len(all_topic_rows),
            generation_ms=generation_ms,
            llm_calls=stats["llm_calls"],
            llm_failures=stats["fallback_used"],
            status=status,
        )
        new_digest.topics = all_topic_rows
        session.add(new_digest)
        await session.commit()

    fallback_pct = (
        (stats["fallback_used"] / stats["llm_calls"] * 100.0)
        if stats["llm_calls"] else 0.0
    )
    logger.info(
        f"[global_digest] done countries={countries_with_topics} "
        f"topics={len(all_topic_rows)} llm_calls={stats['llm_calls']} "
        f"fallback={stats['fallback_used']}/{stats['llm_calls']} ({fallback_pct:.2f}%) "
        f"status={status} elapsed={generation_ms / 1000:.1f}s"
    )

    return {
        "llm_calls": stats["llm_calls"],
        "fallback_used": stats["fallback_used"],
        "total_topics": len(all_topic_rows),
        "generation_ms": generation_ms,
        "status": status,
    }
+ """ + members = cluster["members"] + if len(members) <= k: + selected = list(members) + else: + centroid = cluster["centroid"] + # relevance = centroid 유사도 × decay weight + for m in members: + v = _normalize(m["embedding"]) + m["_rel"] = float(np.dot(centroid, v)) * m["weight"] + + first = max(members, key=lambda x: x["_rel"]) + selected = [first] + candidates = [m for m in members if m is not first] + + while len(selected) < k and candidates: + def mmr_score(c: dict) -> float: + v = _normalize(c["embedding"]) + max_sim = max( + float(np.dot(v, _normalize(s["embedding"]))) + for s in selected + ) + return LAMBDA_MMR * c["_rel"] - (1.0 - LAMBDA_MMR) * max_sim + + pick = max(candidates, key=mmr_score) + selected.append(pick) + candidates.remove(pick) + + # LLM 입력 토큰 보호 + for m in selected: + m["ai_summary_truncated"] = (m.get("ai_summary") or "")[:SUMMARY_TRUNCATE] + + return selected diff --git a/app/services/digest/summarizer.py b/app/services/digest/summarizer.py new file mode 100644 index 0000000..35d85fb --- /dev/null +++ b/app/services/digest/summarizer.py @@ -0,0 +1,123 @@ +"""Cluster-level LLM 호출 + JSON 파싱 + timeout + drop금지 fallback. + +핵심 결정: +- AIClient._call_chat 직접 호출 (client.py 수정 회피, fallback 로직 재사용) +- Semaphore(1) 로 MLX 과부하 회피 +- Per-call timeout 25초 (asyncio.wait_for) — MLX hang/Ollama stall 방어 +- JSON 파싱 실패 → 1회 재시도 → 그래도 실패 시 minimal fallback (drop 금지) +- fallback: topic_label="주요 뉴스 묶음", summary = top member ai_summary[:200] +""" + +import asyncio +from pathlib import Path +from typing import Any + +from ai.client import parse_json_response +from core.utils import setup_logger + +logger = setup_logger("digest_summarizer") + +LLM_CALL_TIMEOUT = 25 # 초. 
"""Cluster-level LLM call + JSON parsing + timeout + no-drop fallback.

Key decisions:
- Calls AIClient._call_chat directly (avoids changes to client.py while reusing its fallback logic)
- Semaphore(1) to avoid overloading MLX
- Per-call timeout of 25 seconds (asyncio.wait_for) — guards against MLX hangs / Ollama stalls
- JSON parse failure → one retry → if it still fails, a minimal fallback (topics are never dropped)
- fallback: topic_label="주요 뉴스 묶음", summary = top member ai_summary[:200]
"""

import asyncio
from pathlib import Path
from typing import Any

from ai.client import parse_json_response
from core.utils import setup_logger

logger = setup_logger("digest_summarizer")

LLM_CALL_TIMEOUT = 25  # seconds; MLX averages ~5s, the rest is tail-latency margin
FALLBACK_SUMMARY_LIMIT = 200

# Serialise all LLM calls — the local backend cannot take concurrent load.
_llm_sem = asyncio.Semaphore(1)

_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "digest_topic.txt"
_PROMPT_TEMPLATE: str | None = None


def _load_prompt() -> str:
    """Read the prompt template once and cache it at module level."""
    global _PROMPT_TEMPLATE
    if _PROMPT_TEMPLATE is None:
        _PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8")
    return _PROMPT_TEMPLATE


def build_prompt(selected: list[dict]) -> str:
    """Inject the selected articles' ai_summary_truncated into digest_topic.txt.

    Template placeholder: {articles_block}. str.replace is used instead of
    str.format because the template contains other literal braces.
    """
    template = _load_prompt()
    lines = []
    for i, m in enumerate(selected, start=1):
        text = (m.get("ai_summary_truncated") or m.get("ai_summary") or m.get("title") or "").strip()
        lines.append(f"[{i}] {text}")
    articles_block = "\n".join(lines)
    return template.replace("{articles_block}", articles_block)


async def _try_call_llm(client: Any, prompt: str) -> str:
    """A single LLM call wrapped in the semaphore plus a per-call timeout."""
    async with _llm_sem:
        return await asyncio.wait_for(
            client._call_chat(client.ai.primary, prompt),
            timeout=LLM_CALL_TIMEOUT,
        )


def _make_fallback(cluster: dict) -> dict:
    """Build a minimal fallback from the cluster's top member — avoids losing the topic."""
    members = cluster["members"]
    if not members:
        return {
            "topic_label": "주요 뉴스 묶음",
            "summary": "",
            "llm_fallback_used": True,
        }
    # Prefer the MMR relevance score when selection computed one, else decay weight.
    top = max(members, key=lambda m: m.get("_rel", m.get("weight", 0.0)))
    text = (top.get("ai_summary") or top.get("title") or "").strip()
    return {
        "topic_label": "주요 뉴스 묶음",
        "summary": text[:FALLBACK_SUMMARY_LIMIT],
        "llm_fallback_used": True,
    }


async def summarize_cluster_with_fallback(
    client: Any,
    cluster: dict,
    selected: list[dict],
) -> dict:
    """LLM call + JSON parse + fallback for a single cluster.

    Returns:
        {topic_label, summary, llm_fallback_used}
    """
    prompt = build_prompt(selected)

    for attempt in range(2):  # includes one retry
        try:
            raw = await _try_call_llm(client, prompt)
        except asyncio.TimeoutError:
            logger.warning(
                f"LLM 호출 timeout {LLM_CALL_TIMEOUT}s "
                f"(attempt={attempt + 1}, cluster size={len(cluster['members'])})"
            )
            continue
        except Exception as e:
            logger.warning(
                f"LLM 호출 실패 attempt={attempt + 1} "
                f"(cluster size={len(cluster['members'])}): {e}"
            )
            continue

        # Accept only a well-formed object with non-empty string fields.
        parsed = parse_json_response(raw)
        if (
            parsed
            and isinstance(parsed.get("topic_label"), str)
            and isinstance(parsed.get("summary"), str)
            and parsed["topic_label"].strip()
            and parsed["summary"].strip()
        ):
            return {
                "topic_label": parsed["topic_label"].strip(),
                "summary": parsed["summary"].strip(),
                "llm_fallback_used": False,
            }
        logger.warning(
            f"JSON 파싱 실패 attempt={attempt + 1} "
            f"(cluster size={len(cluster['members'])}, raw_len={len(raw) if raw else 0})"
        )

    return _make_fallback(cluster)
+ """ + try: + result = await asyncio.wait_for( + run_digest_pipeline(), + timeout=PIPELINE_HARD_CAP, + ) + logger.info(f"[global_digest] 워커 완료: {result}") + except asyncio.TimeoutError: + logger.error( + f"[global_digest] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. " + f"기존 digest 는 commit 시점에만 갱신되므로 그대로 유지됨. " + f"다음 cron 실행에서 재시도." + ) + except Exception as e: + logger.exception(f"[global_digest] 워커 실패: {e}") + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/migrations/101_global_digests.sql b/migrations/101_global_digests.sql new file mode 100644 index 0000000..d513640 --- /dev/null +++ b/migrations/101_global_digests.sql @@ -0,0 +1,57 @@ +-- Phase 4 Global News Digest +-- 7일 rolling window 뉴스를 country × topic 2-level로 묶어 매일 새벽 4시 KST 배치 생성 +-- 검색 파이프라인 미사용. documents → clustering → cluster-level LLM summarization → digest +-- 사용자 결정: country→topic 2-level, cluster-level LLM only, drop 금지 fallback, +-- adaptive threshold, EMA centroid, time-decay (λ=ln(2)/3 ≈ 0.231) + +-- 부모 테이블: 하루 단위 digest run 메타데이터 +CREATE TABLE global_digests ( + id BIGSERIAL PRIMARY KEY, + digest_date DATE NOT NULL, -- KST 기준 생성일 + window_start TIMESTAMPTZ NOT NULL, -- rolling window 시작 (UTC) + window_end TIMESTAMPTZ NOT NULL, -- 생성 시점 (UTC) + decay_lambda DOUBLE PRECISION NOT NULL, -- 실제 사용된 time-decay λ + + total_articles INTEGER NOT NULL DEFAULT 0, + total_countries INTEGER NOT NULL DEFAULT 0, + total_topics INTEGER NOT NULL DEFAULT 0, + + generation_ms INTEGER, -- 워커 실행 시간 (성능 회귀 감지) + llm_calls INTEGER NOT NULL DEFAULT 0, + llm_failures INTEGER NOT NULL DEFAULT 0, -- = fallback 사용 횟수 + status VARCHAR(20) NOT NULL DEFAULT 'success', -- success | partial | failed + + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + UNIQUE (digest_date) -- idempotency: 같은 날짜 재실행 시 DELETE+INSERT +); + +CREATE INDEX idx_global_digests_date ON global_digests (digest_date DESC); + +-- 자식 테이블: country × topic 단위 +CREATE TABLE digest_topics ( + id BIGSERIAL PRIMARY KEY, + digest_id BIGINT 
NOT NULL REFERENCES global_digests(id) ON DELETE CASCADE,

    country VARCHAR(10) NOT NULL, -- KR | US | JP | CN | FR | DE | ...
    topic_rank INTEGER NOT NULL, -- 1..N within a country (importance_score descending)

    topic_label TEXT NOT NULL, -- LLM-generated 5-10 word Korean label (or "주요 뉴스 묶음" on fallback)
    summary TEXT NOT NULL, -- LLM-generated 1-2 factual sentences (or top member ai_summary[:200] on fallback)

    article_ids JSONB NOT NULL, -- [doc_id, ...] injected by code (never LLM-generated)
    article_count INTEGER NOT NULL, -- = jsonb_array_length(article_ids)

    importance_score DOUBLE PRECISION NOT NULL, -- 0-1 normalized per country within the batch (cross-country comparison)
    raw_weight_sum DOUBLE PRECISION NOT NULL, -- decayed weight sum before normalization (debug + day-over-day trends)

    centroid_sample JSONB, -- debug: LLM input doc id list + summary hashes
    llm_model VARCHAR(100), -- model used (primary/fallback tracking)
    llm_fallback_used BOOLEAN NOT NULL DEFAULT FALSE, -- whether the minimal fallback was applied after an LLM failure

    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_digest_topics_digest ON digest_topics (digest_id);
CREATE INDEX idx_digest_topics_country ON digest_topics (country);
CREATE INDEX idx_digest_topics_rank ON digest_topics (digest_id, country, topic_rank);