diff --git a/app/api/news.py b/app/api/news.py
new file mode 100644
index 0000000..dfabe96
--- /dev/null
+++ b/app/api/news.py
@@ -0,0 +1,132 @@
+"""News source management API."""
+
+import asyncio
+from datetime import datetime
+from typing import Annotated
+
+from fastapi import APIRouter, Depends, HTTPException
+from pydantic import BaseModel, ConfigDict
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from core.auth import get_current_user
+from core.database import get_session
+from models.news_source import NewsSource
+from models.user import User
+
+router = APIRouter()
+
+# asyncio.create_task keeps only a weak reference to its task; hold strong
+# references here so manual collection runs are not garbage-collected mid-run.
+_background_tasks: set[asyncio.Task] = set()
+
+
+class NewsSourceResponse(BaseModel):
+    """Serialized view of a news_sources row."""
+
+    model_config = ConfigDict(from_attributes=True)
+
+    id: int
+    name: str
+    country: str | None
+    feed_url: str
+    feed_type: str
+    category: str | None
+    language: str | None
+    enabled: bool
+    # ORM columns are datetimes; typing them as str breaks model_validate
+    # under pydantic v2. FastAPI serializes datetimes to ISO-8601 strings.
+    last_fetched_at: datetime | None
+    created_at: datetime
+
+
+class NewsSourceCreate(BaseModel):
+    """Payload for registering a feed source."""
+
+    name: str
+    country: str | None = None
+    feed_url: str
+    feed_type: str = "rss"
+    category: str | None = None
+    language: str | None = None
+
+
+class NewsSourceUpdate(BaseModel):
+    """Partial update; unset fields are left untouched."""
+
+    name: str | None = None
+    feed_url: str | None = None
+    category: str | None = None
+    enabled: bool | None = None
+
+
+@router.get("/sources")
+async def list_sources(
+    user: Annotated[User, Depends(get_current_user)],
+    session: Annotated[AsyncSession, Depends(get_session)],
+):
+    """List all news sources ordered by id."""
+    result = await session.execute(select(NewsSource).order_by(NewsSource.id))
+    return [NewsSourceResponse.model_validate(s) for s in result.scalars().all()]
+
+
+@router.post("/sources")
+async def create_source(
+    body: NewsSourceCreate,
+    user: Annotated[User, Depends(get_current_user)],
+    session: Annotated[AsyncSession, Depends(get_session)],
+):
+    """Register a new feed source."""
+    source = NewsSource(**body.model_dump())
+    session.add(source)
+    await session.commit()
+    # Reload server-generated fields (id, created_at): attribute access on an
+    # expired instance after commit raises MissingGreenlet on async drivers.
+    await session.refresh(source)
+    return NewsSourceResponse.model_validate(source)
+
+
+@router.patch("/sources/{source_id}")
+async def update_source(
+    source_id: int,
+    body: NewsSourceUpdate,
+    user: Annotated[User, Depends(get_current_user)],
+    session: Annotated[AsyncSession, Depends(get_session)],
+):
+    """Partially update a source; 404 when it does not exist."""
+    source = await session.get(NewsSource, source_id)
+    if not source:
+        raise HTTPException(status_code=404, detail="news source not found")
+    for field, value in body.model_dump(exclude_unset=True).items():
+        setattr(source, field, value)
+    await session.commit()
+    await session.refresh(source)  # see create_source: avoid expired-attribute IO
+    return NewsSourceResponse.model_validate(source)
+
+
+@router.delete("/sources/{source_id}")
+async def delete_source(
+    source_id: int,
+    user: Annotated[User, Depends(get_current_user)],
+    session: Annotated[AsyncSession, Depends(get_session)],
+):
+    """Delete a source; 404 when it does not exist."""
+    source = await session.get(NewsSource, source_id)
+    if not source:
+        raise HTTPException(status_code=404, detail="news source not found")
+    await session.delete(source)
+    await session.commit()
+    return {"message": f"소스 {source_id} 삭제됨"}
+
+
+@router.post("/collect")
+async def trigger_collect(
+    user: Annotated[User, Depends(get_current_user)],
+):
+    """Manually trigger a collection run in the background."""
+    from workers.news_collector import run
+
+    task = asyncio.create_task(run())
+    _background_tasks.add(task)
+    task.add_done_callback(_background_tasks.discard)
+    return {"message": "뉴스 수집 시작됨"}
diff --git a/app/main.py b/app/main.py
index 72ed2f4..3cb6415 100644
--- a/app/main.py
+++ b/app/main.py
@@ -9,6 +9,7 @@ from sqlalchemy import func, select, text
 from api.auth import router as auth_router
 from api.dashboard import router as dashboard_router
 from api.documents import router as documents_router
+from api.news import router as news_router
 from api.search import router as search_router
 from api.setup import router as setup_router
 from core.config import settings
@@ -25,6 +26,7 @@ async def lifespan(app: FastAPI):
     from workers.file_watcher import watch_inbox
     from workers.law_monitor import run as law_monitor_run
     from workers.mailplus_archive import run as mailplus_run
+    from workers.news_collector import run as news_collector_run
     from workers.queue_consumer import consume_queue
 
     # 시작: DB 연결 확인
@@ -49,6 +51,7 @@
     scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
     scheduler.add_job(mailplus_run, CronTrigger(hour=18), id="mailplus_evening")
     scheduler.add_job(daily_digest_run, CronTrigger(hour=20), id="daily_digest")
+    scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
     scheduler.start()
 
     yield
@@ -72,6 +75,7 @@
 app.include_router(documents_router, prefix="/api/documents", tags=["documents"])
 app.include_router(search_router, prefix="/api/search", tags=["search"])
 app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"])
+app.include_router(news_router, prefix="/api/news", tags=["news"])
 
 # TODO: Phase 5에서 추가
 # app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])
diff --git a/app/models/news_source.py b/app/models/news_source.py
new file mode 100644
index 0000000..0373d64
--- /dev/null
+++ b/app/models/news_source.py
@@ -0,0 +1,29 @@
+"""ORM mapping for the news_sources table."""
+
+from datetime import datetime, timezone
+
+from sqlalchemy import Boolean, DateTime, String, Text
+from sqlalchemy.orm import Mapped, mapped_column
+
+from core.database import Base
+
+
+class NewsSource(Base):
+    """A news feed (RSS or external API) registered for collection."""
+
+    __tablename__ = "news_sources"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+    name: Mapped[str] = mapped_column(String(100), nullable=False)
+    country: Mapped[str | None] = mapped_column(String(10))
+    feed_url: Mapped[str] = mapped_column(Text, nullable=False)
+    feed_type: Mapped[str] = mapped_column(String(20), default="rss")
+    category: Mapped[str | None] = mapped_column(String(50))
+    language: Mapped[str | None] = mapped_column(String(10))
+    enabled: Mapped[bool] = mapped_column(Boolean, default=True)
+    last_fetched_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+    # Aware UTC default: naive datetime.now() records the wrong instant in a
+    # timestamptz column whenever the server timezone is not UTC.
+    created_at: Mapped[datetime] = mapped_column(
+        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
+    )
diff --git a/app/requirements.txt b/app/requirements.txt
index 726d27f..6ff0e38 100644
--- a/app/requirements.txt
+++ b/app/requirements.txt
@@ -15,3 +15,4 @@ anthropic>=0.40.0
 markdown>=3.5.0
 python-multipart>=0.0.9
 jinja2>=3.1.0
+feedparser>=6.0.0
diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py
new file mode 100644
index 0000000..350bc7a
--- /dev/null
+++ b/app/workers/news_collector.py
@@ -0,0 +1,248 @@
+"""News collection worker — pulls RSS/API articles into the documents table."""
+
+import hashlib
+import os
+import re
+from datetime import datetime, timezone
+from html import unescape
+from urllib.parse import urlparse, urlunparse
+
+import feedparser
+import httpx
+from sqlalchemy import select
+
+from core.database import async_session
+from core.utils import setup_logger
+from models.document import Document
+from models.news_source import NewsSource
+from models.queue import ProcessingQueue
+
+logger = setup_logger("news_collector")
+
+# Articles older than this are not queued for embedding.
+EMBED_MAX_AGE_DAYS = 30
+
+# Category normalization: source-language labels -> canonical English names.
+CATEGORY_MAP = {
+    # Korean
+    "국제": "International", "정치": "Politics", "경제": "Economy",
+    "사회": "Society", "문화": "Culture", "산업": "Industry",
+    "환경": "Environment", "기술": "Technology",
+    # English
+    "World": "International", "International": "International",
+    "Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
+    "Arts": "Culture", "Culture": "Culture",
+    "Climate": "Environment", "Environment": "Environment",
+    # Japanese
+    "国際": "International", "文化": "Culture", "科学": "Technology",
+    # German
+    "Kultur": "Culture", "Wissenschaft": "Technology",
+    # French
+    "Environnement": "Environment",
+}
+
+
+def _normalize_category(raw: str) -> str:
+    """Map a raw category label to its canonical name ("Other" if unknown)."""
+    return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
+
+
+def _clean_html(text: str) -> str:
+    """Strip tags, unescape HTML entities, and trim to 1000 characters."""
+    if not text:
+        return ""
+    text = re.sub(r"<[^>]+>", "", text)
+    text = unescape(text)
+    return text.strip()[:1000]
+
+
+def _normalize_url(url: str) -> str:
+    """Drop query string and fragment (tracking params) for dedup comparison."""
+    parsed = urlparse(url)
+    return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", ""))
+
+
+def _article_hash(title: str, published: str, source_name: str) -> str:
+    """Stable 32-hex-char article id used for duplicate detection."""
+    key = f"{title}|{published}|{source_name}"
+    return hashlib.sha256(key.encode()).hexdigest()[:32]
+
+
+async def run():
+    """Collect from every enabled source; commit once at the end."""
+    async with async_session() as session:
+        result = await session.execute(
+            select(NewsSource).where(NewsSource.enabled.is_(True))
+        )
+        sources = result.scalars().all()
+
+        if not sources:
+            logger.info("활성화된 뉴스 소스 없음")
+            return
+
+        total = 0
+        for source in sources:
+            try:
+                if source.feed_type == "api":
+                    count = await _fetch_api(session, source)
+                else:
+                    count = await _fetch_rss(session, source)
+                total += count
+            except Exception as e:
+                logger.error(f"[{source.name}] 수집 실패: {e}")
+            finally:
+                # Stamp the attempt even on failure so staleness monitoring
+                # can distinguish "broken feed" from "never tried".
+                source.last_fetched_at = datetime.now(timezone.utc)
+
+        await session.commit()
+        logger.info(f"뉴스 수집 완료: {total}건 신규")
+
+
+async def _save_article(
+    session,
+    source: NewsSource,
+    *,
+    title: str,
+    summary: str,
+    link: str,
+    pub_dt: datetime,
+    extractor_version: str,
+) -> bool:
+    """Insert one article as a Document unless it already exists.
+
+    Duplicates are detected by content hash OR normalized URL. Returns True
+    when a new document (and its queue entries) was added.
+    """
+    article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
+    normalized_url = _normalize_url(link)
+
+    existing = await session.execute(
+        select(Document).where(
+            (Document.file_hash == article_id) |
+            (Document.edit_url == normalized_url)
+        )
+    )
+    if existing.scalar_one_or_none():
+        return False
+
+    doc = Document(
+        file_path=f"news/{source.name}/{article_id}",
+        file_hash=article_id,
+        file_format="article",
+        file_size=len(summary.encode()),
+        file_type="note",
+        title=title,
+        extracted_text=f"{title}\n\n{summary}",
+        extracted_at=datetime.now(timezone.utc),
+        extractor_version=extractor_version,
+        source_channel="news",
+        data_origin="external",
+        edit_url=link,
+        review_status="approved",
+    )
+    session.add(doc)
+    await session.flush()  # assigns doc.id for the queue rows
+
+    # Article text is already inline, so the extract stage is skipped.
+    session.add(ProcessingQueue(document_id=doc.id, stage="classify", status="pending"))
+
+    # Only recent articles are embedded, keeping the vector index focused.
+    if (datetime.now(timezone.utc) - pub_dt).days <= EMBED_MAX_AGE_DAYS:
+        session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
+
+    return True
+
+
+async def _fetch_rss(session, source: NewsSource) -> int:
+    """Fetch an RSS/Atom feed and store its entries. Returns new-article count."""
+    # follow_redirects: httpx does not follow 301/302 by default, and many
+    # feed URLs redirect to https/CDN mirrors.
+    async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
+        resp = await client.get(source.feed_url)
+        resp.raise_for_status()
+
+    # Raw bytes let feedparser honor the XML encoding declaration; resp.text
+    # can mis-decode feeds whose charset is only declared in the document.
+    feed = feedparser.parse(resp.content)
+    count = 0
+
+    for entry in feed.entries:
+        title = (entry.get("title") or "").strip()
+        if not title:
+            continue
+
+        summary = _clean_html(entry.get("summary", "") or entry.get("description", ""))
+        if not summary:
+            summary = title
+
+        published = entry.get("published_parsed") or entry.get("updated_parsed")
+        if published:
+            pub_dt = datetime(*published[:6], tzinfo=timezone.utc)
+        else:
+            pub_dt = datetime.now(timezone.utc)
+
+        # NOTE(review): normalized category is computed but never persisted —
+        # Document has no category field here; confirm intended use.
+        category = _normalize_category(source.category or "")
+
+        if await _save_article(
+            session,
+            source,
+            title=title,
+            summary=summary,
+            link=entry.get("link", ""),
+            pub_dt=pub_dt,
+            extractor_version="rss",
+        ):
+            count += 1
+
+    logger.info(f"[{source.name}] RSS → {count}건 수집")
+    return count
+
+
+async def _fetch_api(session, source: NewsSource) -> int:
+    """Fetch NYT Top Stories for the source's category. Returns new-article count."""
+    nyt_key = os.getenv("NYT_API_KEY", "")
+    if not nyt_key:
+        logger.warning("NYT_API_KEY 미설정")
+        return 0
+
+    async with httpx.AsyncClient(timeout=10) as client:
+        resp = await client.get(
+            f"https://api.nytimes.com/svc/topstories/v2/{source.category or 'world'}.json",
+            params={"api-key": nyt_key},
+        )
+        resp.raise_for_status()
+
+    data = resp.json()
+    count = 0
+
+    for article in data.get("results", []):
+        # NYT occasionally returns null titles; guard before .strip().
+        title = (article.get("title") or "").strip()
+        if not title:
+            continue
+
+        summary = _clean_html(article.get("abstract", "")) or title
+
+        pub_str = article.get("published_date", "")
+        try:
+            pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
+        except (ValueError, AttributeError):
+            pub_dt = datetime.now(timezone.utc)
+
+        # NOTE(review): unused — see _fetch_rss.
+        category = _normalize_category(article.get("section", source.category or ""))
+
+        if await _save_article(
+            session,
+            source,
+            title=title,
+            summary=summary,
+            link=article.get("url", ""),
+            pub_dt=pub_dt,
+            extractor_version="nyt_api",
+        ):
+            count += 1
+
+    logger.info(f"[{source.name}] API → {count}건 수집")
+    return count
diff --git a/migrations/012_news_sources.sql b/migrations/012_news_sources.sql
new file mode 100644
index 0000000..76d5bce
--- /dev/null
+++ b/migrations/012_news_sources.sql
@@ -0,0 +1,20 @@
+-- News source registry table
+CREATE TABLE IF NOT EXISTS news_sources (
+    id SERIAL PRIMARY KEY,
+    name VARCHAR(100) NOT NULL,
+    country VARCHAR(10),
+    feed_url TEXT NOT NULL,
+    feed_type VARCHAR(20) DEFAULT 'rss',
+    category VARCHAR(50),
+    language VARCHAR(10),
+    enabled BOOLEAN DEFAULT true,
+    last_fetched_at TIMESTAMPTZ,
+    created_at TIMESTAMPTZ DEFAULT NOW()
+);
+
+-- Add 'news' to the source_channel enum.
+-- NOTE: ALTER TYPE ... ADD VALUE cannot run inside a transaction block on
+-- PostgreSQL < 12, and even on newer versions the added value cannot be
+-- used within the same transaction. Run with autocommit if the migration
+-- runner wraps files in a transaction.
+ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'news';