"""뉴스 소스 관리 API""" from datetime import datetime from typing import Annotated from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy import String, select from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin from core.database import get_session from models.news_source import NewsSource from models.user import User router = APIRouter() class NewsSourceResponse(BaseModel): id: int name: str country: str | None feed_url: str feed_type: str category: str | None language: str | None enabled: bool last_fetched_at: datetime | None = None created_at: datetime | None = None class Config: from_attributes = True class NewsSourceCreate(BaseModel): name: str country: str | None = None feed_url: str feed_type: str = "rss" category: str | None = None language: str | None = None class NewsSourceUpdate(BaseModel): name: str | None = None feed_url: str | None = None category: str | None = None enabled: bool | None = None @router.get("/sources") async def list_sources( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): result = await session.execute(select(NewsSource).order_by(NewsSource.id)) return [NewsSourceResponse.model_validate(s) for s in result.scalars().all()] @router.post("/sources") async def create_source( body: NewsSourceCreate, user: Annotated[User, Depends(require_admin)], session: Annotated[AsyncSession, Depends(get_session)], ): from core.url_validator import validate_feed_url try: # getaddrinfo(DNS) 는 blocking — 이벤트 루프 점유 방지 위해 off-thread (R5) await asyncio.to_thread(validate_feed_url, body.feed_url) except ValueError as e: raise HTTPException(status_code=422, detail=f"feed_url 검증 실패: {e}") source = NewsSource(**body.model_dump()) session.add(source) await session.commit() return NewsSourceResponse.model_validate(source) @router.patch("/sources/{source_id}") async def update_source( source_id: int, body: NewsSourceUpdate, user: Annotated[User, Depends(require_admin)], session: Annotated[AsyncSession, Depends(get_session)], ): source = await session.get(NewsSource, source_id) if not source: raise HTTPException(status_code=404) if body.feed_url is not None: from core.url_validator import validate_feed_url try: validate_feed_url(body.feed_url) except ValueError as e: raise HTTPException(status_code=422, detail=f"feed_url 검증 실패: {e}") for field, value in body.model_dump(exclude_unset=True).items(): setattr(source, field, value) await session.commit() return NewsSourceResponse.model_validate(source) @router.delete("/sources/{source_id}") async def delete_source( source_id: int, user: Annotated[User, Depends(require_admin)], session: Annotated[AsyncSession, Depends(get_session)], ): source = await session.get(NewsSource, source_id) if not source: raise HTTPException(status_code=404) await session.delete(source) await session.commit() return {"message": f"소스 {source_id} 삭제됨"} @router.get("/articles") async def list_articles( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], source: str | None = None, unread_only: bool = False, pinned_only: bool = False, page: int = 1, page_size: int = 30, ): """뉴스 기사 목록""" from sqlalchemy import func from models.document import Document query = select(Document).where( Document.source_channel == "news", Document.deleted_at == None, ) if source: if '/' in source: # 신문사/분야 형태 → file_path에서 폴더명 매칭 # source = "경향신문/문화" → file_path LIKE 'news/경향신문 문화/%' folder = source.replace('/', ' ') query = query.where(Document.file_path.like(f"news/{folder}/%")) else: # 신문사만 → ai_sub_group query = query.where(Document.ai_sub_group == source) if unread_only: query = query.where(Document.is_read == False) if pinned_only: query = query.where(Document.pinned.is_(True)) count_q = select(func.count()).select_from(query.subquery()) total = (await session.execute(count_q)).scalar() query = query.order_by(Document.is_read.asc(), Document.created_at.desc()) query = query.offset((page - 1) * page_size).limit(page_size) result = await session.execute(query) items = result.scalars().all() from api.documents import DocumentResponse return { "items": [DocumentResponse.model_validate(doc) for doc in items], "total": total, "page": page, } @router.post("/mark-all-read") async def mark_all_read( user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """전체 읽음 처리""" from sqlalchemy import update from models.document import Document result = await session.execute( update(Document) .where(Document.source_channel == "news", Document.is_read == False) .values(is_read=True) ) await session.commit() return {"marked": result.rowcount} import asyncio _collect_lock = asyncio.Lock() @router.post("/collect") async def trigger_collect( user: Annotated[User, Depends(require_admin)], ): """수동 수집 트리거 (admin 전용). asyncio.Lock은 단일 프로세스/이벤트루프 기준. 현재 FastAPI 단일 인스턴스 운영이므로 유효하지만, scale-out 시 DB advisory lock으로 교체 필요. """ if _collect_lock.locked(): raise HTTPException(status_code=429, detail="수집이 이미 진행 중입니다") # TOCTOU 제거 (R9) — 기존엔 locked() 체크 후 실제 acquire 가 별도 task 안에서 일어나, 그 # 사이 다른 요청이 끼어들어 이중 수집 task 가 생길 수 있었다. 핸들러에서 동기적으로(uncontended # Lock.acquire 는 이벤트루프 양보 없이 즉시 완료) acquire 하고 task 의 finally 에서 release. await _collect_lock.acquire() async def _run_with_lock(): try: from workers.news_collector import run await run() finally: _collect_lock.release() asyncio.create_task(_run_with_lock()) return {"message": "뉴스 수집 시작됨"}