Files
hyungi_document_server/app/api/digest.py
hyungi aa2d7814e3 feat(digest): date picker URL sync + article→문서 라우팅 + country 국기·한국어
- GET /api/digest/dates 신설 (브리핑 /briefing/dates 패턴 미러, read-only)
- topic article 제목 enrich (documents 배치 1쿼리 + dedupe(set) + map-miss=null → 프론트 '(제목 없음)')
- /digest 재작성: ?date=&country= URL sync(공유·뒤로가기), 국가 탭=인라인 SVG 국기+한국어, 기사=/documents/{id} 링크(상위5+펼치기)
- Phase 4.5(PR #22) 후속. 검증: py_compile·dates/enrich 쿼리(275 resolve·miss 0)·frontend docker build PASS. 시각 렌더 검증=preview 게이트 대기

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 23:39:07 +00:00

251 lines
8.5 KiB
Python

"""Phase 4 Global Digest API — read-only + 디버그 regenerate.
엔드포인트:
- GET /api/digest/latest : 가장 최근 digest
- GET /api/digest/dates : 생성된 digest 날짜 목록 (date picker 용)
- GET /api/digest?date=YYYY-MM-DD : 특정 날짜 digest
- GET /api/digest?country=KR : 특정 국가만
- POST /api/digest/regenerate : 백그라운드 digest 워커 트리거 (admin 필요)
응답은 country → topic 2-level 구조. country 가 비어있는 경우 응답에서 자동 생략.
각 topic 은 article_ids(doc_id) 와 함께 articles([{id, title}]) 를 반환 — title 은 documents
배치 조회로 채우며(한 digest 당 1 쿼리), 매칭 없는 id(하드삭제 등)는 title=null 로 둔다
(프론트는 "(제목 없음)" 으로 렌더, 빈 링크 금지). article → /documents/{id} 라우팅용.
"""
import asyncio
from datetime import date as date_type
from datetime import datetime
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from core.auth import get_current_user, require_admin
from core.database import get_session
from models.digest import DigestTopic, GlobalDigest
from models.document import Document
from models.user import User
# Router that carries every digest endpoint declared in this module. It is created
# without a prefix; the module docstring lists the paths under /api/digest, so the
# prefix is presumably applied where this router is included — confirm at the mount site.
router = APIRouter()
# ─── Pydantic response models (no schemas/ directory in use → defined inline) ───
# One article referenced by a topic: the document id plus its resolved title.
# Documented with `#` comments rather than a class docstring so the schema text
# generated for this response model is left untouched.
class ArticleRef(BaseModel):
    # Document id; the module docstring describes it as the /documents/{id} routing key.
    id: int
    # Title looked up from the documents table; None when no title was resolved for this id.
    title: str | None = None
# One topic inside a country group. _build_response fills every scalar field
# straight from the topic ORM row; only `article_ids` and `articles` are derived.
class TopicResponse(BaseModel):
    # Rank of the topic; topics are emitted sorted by (country, topic_rank).
    topic_rank: int
    topic_label: str
    summary: str
    # Article ids of the topic, each coerced to int.
    article_ids: list[int]
    # Same ids in the same order, each paired with its title (None on lookup miss).
    articles: list[ArticleRef]
    # Copied from the stored row, not recomputed from `article_ids`.
    article_count: int
    # NOTE(review): scoring semantics of the next two fields are not visible in this file.
    importance_score: float
    raw_weight_sum: float
    # NOTE(review): presumably marks topics whose summary came from a fallback path — confirm in the worker.
    llm_fallback_used: bool
# All topics of a single country. _build_response only creates a group for a
# country that has at least one topic, so empty groups never appear in a response.
class CountryGroup(BaseModel):
    # Country value taken from the topic rows (e.g. "KR" per the module docstring).
    country: str
    # Topics of this country, in ascending topic_rank order.
    topics: list[TopicResponse]
# Full digest payload: digest-level metadata plus the country → topic tree.
# Every scalar field is copied unchanged from the GlobalDigest row by _build_response.
class DigestResponse(BaseModel):
    digest_date: date_type
    window_start: datetime
    window_end: datetime
    decay_lambda: float
    # The three totals are the stored digest-wide values; they are NOT recomputed
    # when a country filter narrows `countries`.
    total_articles: int
    total_countries: int
    total_topics: int
    # Nullable. NOTE(review): presumably generation time in milliseconds — confirm in the worker.
    generation_ms: int | None
    llm_calls: int
    llm_failures: int
    # NOTE(review): the set of possible status values is not visible in this file.
    status: str
    # Country groups sorted by country value; only countries with topics are present.
    countries: list[CountryGroup]
# Lightweight per-date summary returned by GET /dates for the date picker; the
# docstring below says it mirrors the shape used by /briefing/dates.
# The class docstring is left byte-identical because pydantic publishes a model's
# docstring as the schema description, which makes it runtime-visible text.
class DigestDateSummary(BaseModel):
    """date picker 용 경량 요약 (브리핑 /briefing/dates 와 동형)."""
    digest_date: date_type
    # Totals and status are copied from the GlobalDigest row in list_dates.
    total_topics: int
    total_countries: int
    total_articles: int
    status: str
# ─── helpers ───
def _collect_article_ids(digest: GlobalDigest) -> set[int]:
    """Return the de-duplicated set of article ids referenced by the digest's topics.

    The same article can belong to several topics, so the ids are collapsed into
    a set before the batched title lookup. Ids that cannot be converted with
    ``int()`` are skipped; a topic whose ``article_ids`` is empty or ``None``
    contributes nothing.
    """

    def _int_ids(raw_ids):
        # Yield only the entries that int() accepts; silently drop the rest.
        for raw in raw_ids:
            try:
                yield int(raw)
            except (TypeError, ValueError):
                continue

    unique_ids: set[int] = set()
    for topic in digest.topics:
        unique_ids.update(_int_ids(topic.article_ids or []))
    return unique_ids
async def _fetch_titles(session: AsyncSession, ids: set[int]) -> dict[int, str | None]:
    """Look up document titles for ``ids`` in a single query.

    Returns a ``{document id: title}`` mapping. Ids without a matching Document
    row are absent from the mapping (the caller turns a miss into ``None``).
    An empty ``ids`` returns ``{}`` without touching the database.
    """
    if not ids:
        return {}
    stmt = select(Document.id, Document.title).where(Document.id.in_(ids))
    rows = (await session.execute(stmt)).all()
    titles: dict[int, str | None] = {}
    for row in rows:
        titles[row.id] = row.title
    return titles
def _build_response(
    digest: GlobalDigest,
    title_map: dict[int, str | None],
    country_filter: str | None = None,
) -> DigestResponse:
    """Convert a GlobalDigest ORM object into a DigestResponse.

    Args:
        digest: Digest row with its ``topics`` already loaded.
        title_map: ``{document id: title}``; an id missing from the map (for
            example a deleted document) is emitted with ``title=None`` so the
            frontend can render its placeholder.
        country_filter: When truthy, only topics whose ``country`` equals this
            value are included. The digest-level totals are still the stored
            digest-wide values; they are not recomputed for the filtered view.

    Returns:
        The response with countries sorted by country value and, inside each
        country, topics sorted by ``topic_rank``. Countries without any topic
        are omitted.
    """
    topics_by_country: dict[str, list[TopicResponse]] = {}
    for topic in sorted(digest.topics, key=lambda row: (row.country, row.topic_rank)):
        if country_filter and topic.country != country_filter:
            continue
        # Coerce ids with the same tolerance as _collect_article_ids: an id that
        # int() rejects is skipped instead of failing the whole response. Without
        # this, a malformed id was ignored by the title lookup but then raised here.
        ids: list[int] = []
        for raw_id in topic.article_ids or []:
            try:
                ids.append(int(raw_id))
            except (TypeError, ValueError):
                continue
        topics_by_country.setdefault(topic.country, []).append(
            TopicResponse(
                topic_rank=topic.topic_rank,
                topic_label=topic.topic_label,
                summary=topic.summary,
                article_ids=ids,
                articles=[ArticleRef(id=aid, title=title_map.get(aid)) for aid in ids],
                article_count=topic.article_count,
                importance_score=topic.importance_score,
                raw_weight_sum=topic.raw_weight_sum,
                llm_fallback_used=topic.llm_fallback_used,
            )
        )
    countries = [
        CountryGroup(country=code, topics=topics_by_country[code])
        for code in sorted(topics_by_country)
    ]
    return DigestResponse(
        digest_date=digest.digest_date,
        window_start=digest.window_start,
        window_end=digest.window_end,
        decay_lambda=digest.decay_lambda,
        total_articles=digest.total_articles,
        total_countries=digest.total_countries,
        total_topics=digest.total_topics,
        generation_ms=digest.generation_ms,
        llm_calls=digest.llm_calls,
        llm_failures=digest.llm_failures,
        status=digest.status,
        countries=countries,
    )
async def _load_digest(
    session: AsyncSession,
    target_date: date_type | None,
) -> GlobalDigest | None:
    """Load a single digest with its topics eagerly loaded.

    With ``target_date`` the digest of exactly that date is selected; with
    ``None`` the one with the greatest ``digest_date`` is selected. Returns
    ``None`` when no row matches.
    """
    base = select(GlobalDigest).options(selectinload(GlobalDigest.topics))
    stmt = (
        base.order_by(GlobalDigest.digest_date.desc())
        if target_date is None
        else base.where(GlobalDigest.digest_date == target_date)
    )
    # limit(1) keeps scalar_one_or_none() from ever seeing more than one row.
    rows = await session.execute(stmt.limit(1))
    return rows.scalar_one_or_none()
async def _respond(
    session: AsyncSession,
    digest: GlobalDigest,
    country_filter: str | None = None,
) -> DigestResponse:
    """Build the response for one digest, enriching its articles with titles.

    Titles for all article ids of the digest are fetched in one batch first,
    then the response is assembled (optionally restricted to ``country_filter``).
    """
    wanted_ids = _collect_article_ids(digest)
    titles = await _fetch_titles(session, wanted_ids)
    return _build_response(digest, titles, country_filter=country_filter)
# ─── Routes ───
# GET /latest — return the most recently dated digest, or 404 when none exists.
# The handler docstring is kept byte-identical: FastAPI publishes it as the
# operation description, so it is runtime-visible text.
@router.get("/latest", response_model=DigestResponse)
async def get_latest(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """가장 최근 생성된 global digest."""
    latest = await _load_digest(session, target_date=None)
    if latest is not None:
        return await _respond(session, latest)
    # No digest has been generated yet.
    raise HTTPException(status_code=404, detail="아직 생성된 digest 없음")
# GET /dates — list up to `limit` digests as lightweight summaries, newest date
# first, for the date picker. Topics are not touched here.
# The handler docstring and the Query description are kept byte-identical because
# FastAPI publishes both in the generated API description.
@router.get("/dates", response_model=list[DigestDateSummary])
async def list_dates(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    limit: int = Query(default=60, ge=1, le=365, description="최신부터 N개"),
):
    """생성된 digest 날짜 목록 (date picker 용, 최신 내림차순)."""
    stmt = select(GlobalDigest).order_by(GlobalDigest.digest_date.desc()).limit(limit)
    result = await session.execute(stmt)
    summaries = []
    for digest in result.scalars().all():
        summaries.append(
            DigestDateSummary(
                digest_date=digest.digest_date,
                total_topics=digest.total_topics,
                total_countries=digest.total_countries,
                total_articles=digest.total_articles,
                status=digest.status,
            )
        )
    return summaries
# GET "" — return the digest of `date` (or the latest one when `date` is omitted),
# optionally restricted to one country. The country value is upper-cased before
# matching; an empty string is treated as "no filter". Responds 404 when no
# digest matches.
# Docstring, Query descriptions and error details are kept byte-identical: all
# of them are emitted at runtime.
@router.get("", response_model=DigestResponse)
async def get_digest(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    date: date_type | None = Query(default=None, description="YYYY-MM-DD (KST)"),
    country: str | None = Query(default=None, description="국가 코드 (예: KR)"),
):
    """특정 날짜 또는 국가 필터링된 digest. date 미지정 시 최신."""
    digest = await _load_digest(session, target_date=date)
    if digest is None:
        # Pick the message that matches how the lookup was made.
        if date:
            detail = f"digest 없음 (date={date})"
        else:
            detail = "아직 생성된 digest 없음"
        raise HTTPException(status_code=404, detail=detail)

    country_filter = None
    if country:
        country_filter = country.upper()
    return await _respond(session, digest, country_filter=country_filter)
# Strong references to in-flight regenerate tasks. The event loop itself only
# holds weak references to tasks, so a task whose result is discarded can be
# garbage-collected before it finishes; each task stays in this set until done.
_background_tasks: set[asyncio.Task] = set()


# POST /regenerate — manually start the digest worker as a background task and
# return immediately; the response does not wait for the worker to finish.
# Access is gated by the `require_admin` dependency.
# The handler docstring is kept byte-identical: FastAPI publishes it as the
# operation description, so it is runtime-visible text.
@router.post("/regenerate")
async def regenerate(
    user: Annotated[User, Depends(require_admin)],
):
    """수동 트리거 — 백그라운드 태스크로 워커 실행 (admin 필요)."""
    # Imported at call time, as in the original.
    # NOTE(review): presumably to avoid an import cycle or import cost at module load — confirm.
    from workers.digest_worker import run

    task = asyncio.create_task(run())
    # Keep the task alive until it completes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}