5038007998
- 신규 url_validator.py: SSRF 차단 (private IP/loopback/link-local/reserved/multicast/CGNAT 블록, HTTPS only) - require_admin dependency 추가 — 소스 CRUD, /collect, /digest/regenerate에 적용 - User.is_admin 컬럼 + migration 104 - NYT API key 로그 마스킹 (쿼리스트링 제거) - RSS fetch: redirect 수동 처리(3회, target 재검증), 5MB 크기 제한, content-type 허용목록, feed.bozo 체크 - /collect 재진입 차단 (asyncio.Lock, 단일 인스턴스 한정) - HTTP feed allowlist (코드 레벨 상수, API 미노출) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
317 lines
11 KiB
Python
317 lines
11 KiB
Python
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
|
|
|
|
import hashlib
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from html import unescape
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
import feedparser
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.news_source import NewsSource
|
|
from models.queue import ProcessingQueue
|
|
|
|
logger = setup_logger("news_collector")
|
|
|
|
# 카테고리 표준화 매핑
|
|
CATEGORY_MAP = {
|
|
# 한국어
|
|
"국제": "International", "정치": "Politics", "경제": "Economy",
|
|
"사회": "Society", "문화": "Culture", "산업": "Industry",
|
|
"환경": "Environment", "기술": "Technology",
|
|
# 영어
|
|
"World": "International", "International": "International",
|
|
"Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
|
|
"Arts": "Culture", "Culture": "Culture",
|
|
"Climate": "Environment", "Environment": "Environment",
|
|
# 일본어
|
|
"国際": "International", "文化": "Culture", "科学": "Technology",
|
|
# 독일어
|
|
"Kultur": "Culture", "Wissenschaft": "Technology",
|
|
# 프랑스어
|
|
"Environnement": "Environment",
|
|
}
|
|
|
|
|
|
def _normalize_category(raw: str) -> str:
|
|
"""카테고리 표준화"""
|
|
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
|
|
|
|
|
|
def _clean_html(text: str) -> str:
|
|
"""HTML 태그 제거 + 정제"""
|
|
if not text:
|
|
return ""
|
|
text = re.sub(r"<[^>]+>", "", text)
|
|
text = unescape(text)
|
|
return text.strip()[:1000]
|
|
|
|
|
|
def _normalize_url(url: str) -> str:
|
|
"""URL 정규화 (tracking params 제거)"""
|
|
parsed = urlparse(url)
|
|
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", ""))
|
|
|
|
|
|
def _article_hash(title: str, published: str, source_name: str) -> str:
|
|
"""기사 고유 해시 (중복 체크용)"""
|
|
key = f"{title}|{published}|{source_name}"
|
|
return hashlib.sha256(key.encode()).hexdigest()[:32]
|
|
|
|
|
|
def _normalize_to_utc(dt) -> datetime:
|
|
"""다양한 시간 형식을 UTC로 정규화"""
|
|
if isinstance(dt, datetime):
|
|
if dt.tzinfo is None:
|
|
return dt.replace(tzinfo=timezone.utc)
|
|
return dt.astimezone(timezone.utc)
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
async def run():
|
|
"""뉴스 수집 실행"""
|
|
async with async_session() as session:
|
|
result = await session.execute(
|
|
select(NewsSource).where(NewsSource.enabled == True)
|
|
)
|
|
sources = result.scalars().all()
|
|
|
|
if not sources:
|
|
logger.info("활성화된 뉴스 소스 없음")
|
|
return
|
|
|
|
total = 0
|
|
for source in sources:
|
|
try:
|
|
if source.feed_type == "api":
|
|
count = await _fetch_api(session, source)
|
|
else:
|
|
count = await _fetch_rss(session, source)
|
|
|
|
source.last_fetched_at = datetime.now(timezone.utc)
|
|
total += count
|
|
except Exception as e:
|
|
logger.error(f"[{source.name}] 수집 실패: {e}")
|
|
source.last_fetched_at = datetime.now(timezone.utc)
|
|
|
|
await session.commit()
|
|
logger.info(f"뉴스 수집 완료: {total}건 신규")
|
|
|
|
|
|
MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB
|
|
ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
|
|
"application/xml", "text/xml")
|
|
|
|
|
|
async def _fetch_rss(session, source: NewsSource) -> int:
|
|
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
|
|
from urllib.parse import urljoin
|
|
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
|
|
|
|
# HTTP allowlist 체크
|
|
if source.feed_url.startswith("http://"):
|
|
hostname = urlparse(source.feed_url).hostname
|
|
if hostname not in HTTP_EXCEPTION_DOMAINS:
|
|
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {hostname}")
|
|
return 0
|
|
|
|
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
|
|
try:
|
|
validate_feed_url(source.feed_url, allow_http=source.feed_url.startswith("http://"))
|
|
except ValueError as e:
|
|
logger.error(f"[{source.name}] URL 검증 실패: {e}")
|
|
return 0
|
|
|
|
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
|
|
resp = await client.get(source.feed_url)
|
|
|
|
# redirect 수동 처리 (최대 3회, 각 target 재검증)
|
|
redirects = 0
|
|
while resp.is_redirect and redirects < 3:
|
|
location = resp.headers.get("location", "")
|
|
location = urljoin(str(resp.request.url), location)
|
|
try:
|
|
validate_feed_url(location, allow_http=source.feed_url.startswith("http://"))
|
|
except ValueError as e:
|
|
logger.error(f"[{source.name}] redirect target 차단: {e}")
|
|
return 0
|
|
resp = await client.get(location)
|
|
redirects += 1
|
|
if resp.is_redirect:
|
|
logger.error(f"[{source.name}] redirect 3회 초과")
|
|
return 0
|
|
|
|
resp.raise_for_status()
|
|
|
|
if len(resp.content) > MAX_RESPONSE_SIZE:
|
|
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
|
|
return 0
|
|
|
|
ct = resp.headers.get("content-type", "").lower()
|
|
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
|
|
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
|
|
return 0
|
|
|
|
feed = feedparser.parse(resp.text)
|
|
if feed.bozo and not feed.entries:
|
|
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
|
|
return 0
|
|
count = 0
|
|
|
|
for entry in feed.entries:
|
|
title = entry.get("title", "").strip()
|
|
if not title:
|
|
continue
|
|
|
|
summary = _clean_html(entry.get("summary", "") or entry.get("description", ""))
|
|
if not summary:
|
|
summary = title
|
|
|
|
link = entry.get("link", "")
|
|
published = entry.get("published_parsed") or entry.get("updated_parsed")
|
|
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
|
|
|
|
# 중복 체크
|
|
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
|
|
normalized_url = _normalize_url(link)
|
|
|
|
existing = await session.execute(
|
|
select(Document).where(
|
|
(Document.file_hash == article_id) |
|
|
(Document.edit_url == normalized_url)
|
|
)
|
|
)
|
|
if existing.scalar_one_or_none():
|
|
continue
|
|
|
|
category = _normalize_category(source.category or "")
|
|
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
|
|
|
|
doc = Document(
|
|
file_path=f"news/{source.name}/{article_id}",
|
|
file_hash=article_id,
|
|
file_format="article",
|
|
file_size=len(summary.encode()),
|
|
file_type="note",
|
|
title=title,
|
|
extracted_text=f"{title}\n\n{summary}",
|
|
extracted_at=datetime.now(timezone.utc),
|
|
extractor_version="rss",
|
|
source_channel="news",
|
|
data_origin="external",
|
|
edit_url=link,
|
|
review_status="approved",
|
|
ai_domain="News",
|
|
ai_sub_group=source_short,
|
|
ai_tags=[f"News/{source_short}/{category}"],
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
# summarize + embed 등록 (classify 불필요)
|
|
session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
|
|
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
|
if days_old <= 30:
|
|
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
|
|
|
|
count += 1
|
|
|
|
logger.info(f"[{source.name}] RSS → {count}건 수집")
|
|
return count
|
|
|
|
|
|
async def _fetch_api(session, source: NewsSource) -> int:
|
|
"""NYT API 수집 — 키 마스킹 + health degradation"""
|
|
import os
|
|
nyt_key = os.getenv("NYT_API_KEY", "")
|
|
if not nyt_key:
|
|
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
|
|
return 0
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
resp = await client.get(
|
|
f"https://api.nytimes.com/svc/topstories/v2/{source.category or 'world'}.json",
|
|
params={"api-key": nyt_key},
|
|
)
|
|
resp.raise_for_status()
|
|
except httpx.HTTPStatusError as e:
|
|
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
|
|
safe_url = str(e.request.url).split("?")[0]
|
|
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
|
|
return 0
|
|
except httpx.RequestError as e:
|
|
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
|
|
logger.error(f"NYT API 연결 실패: {safe_url}")
|
|
return 0
|
|
|
|
data = resp.json()
|
|
count = 0
|
|
|
|
for article in data.get("results", []):
|
|
title = article.get("title", "").strip()
|
|
if not title:
|
|
continue
|
|
|
|
summary = _clean_html(article.get("abstract", ""))
|
|
if not summary:
|
|
summary = title
|
|
|
|
link = article.get("url", "")
|
|
pub_str = article.get("published_date", "")
|
|
try:
|
|
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
|
|
except (ValueError, AttributeError):
|
|
pub_dt = datetime.now(timezone.utc)
|
|
|
|
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
|
|
normalized_url = _normalize_url(link)
|
|
|
|
existing = await session.execute(
|
|
select(Document).where(
|
|
(Document.file_hash == article_id) |
|
|
(Document.edit_url == normalized_url)
|
|
)
|
|
)
|
|
if existing.scalar_one_or_none():
|
|
continue
|
|
|
|
category = _normalize_category(article.get("section", source.category or ""))
|
|
source_short = source.name.split(" ")[0]
|
|
|
|
doc = Document(
|
|
file_path=f"news/{source.name}/{article_id}",
|
|
file_hash=article_id,
|
|
file_format="article",
|
|
file_size=len(summary.encode()),
|
|
file_type="note",
|
|
title=title,
|
|
extracted_text=f"{title}\n\n{summary}",
|
|
extracted_at=datetime.now(timezone.utc),
|
|
extractor_version="nyt_api",
|
|
source_channel="news",
|
|
data_origin="external",
|
|
edit_url=link,
|
|
review_status="approved",
|
|
ai_domain="News",
|
|
ai_sub_group=source_short,
|
|
ai_tags=[f"News/{source_short}/{category}"],
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
|
|
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
|
if days_old <= 30:
|
|
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
|
|
|
|
count += 1
|
|
|
|
logger.info(f"[{source.name}] API → {count}건 수집")
|
|
return count
|