Files
hyungi_document_server/app/workers/news_collector.py
T
Hyungi Ahn 751cdc5be8 fix(queue): enqueue 경로 중복 방어 — partial unique index + 중앙 enqueue_stage 함수
기존 UNIQUE(document_id, stage, status)는 pending+processing 동시 존재를
허용해서 stale 복구 시 충돌 발생. 2-layer 방어로 근본 차단:

1) DB: partial unique index uq_queue_active — 활성 행(pending/processing)은
   (document_id, stage)당 최대 1개만 허용
2) App: enqueue_stage() 중앙 함수 — INSERT ON CONFLICT DO NOTHING으로
   모든 9개 경로의 check-then-insert TOCTOU race 제거

migration 117은 guard check 포함 — 활성 중복이 남아있으면 RAISE EXCEPTION
으로 중단, 수동 정리 유도.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 08:37:32 +09:00

321 lines
11 KiB
Python

"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
import hashlib
import re
from datetime import datetime, timezone
from html import unescape
from urllib.parse import urlparse, urlunparse
import feedparser
import httpx
from sqlalchemy import select
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
logger = setup_logger("news_collector")
# Category normalization table: maps raw category labels (in several
# languages) to the canonical English names returned by _normalize_category.
# Labels that are missing here are reported as "Other" by that function.
CATEGORY_MAP = {
    # Korean
    "국제": "International", "정치": "Politics", "경제": "Economy",
    "사회": "Society", "문화": "Culture", "산업": "Industry",
    "환경": "Environment", "기술": "Technology",
    # English
    "World": "International", "International": "International",
    "Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
    "Arts": "Culture", "Culture": "Culture",
    "Climate": "Environment", "Environment": "Environment",
    # Japanese
    "国際": "International", "文化": "Culture", "科学": "Technology",
    # German
    "Kultur": "Culture", "Wissenschaft": "Technology",
    # French
    "Environnement": "Environment",
}
def _normalize_category(raw: str) -> str:
    """Map a raw category label to its canonical category name.

    The label is looked up in ``CATEGORY_MAP`` as given, then again with
    surrounding whitespace removed.

    Args:
        raw: Category label taken from a source definition or an article.

    Returns:
        The canonical category, or ``"Other"`` when the label is unknown or
        is not a string (e.g. ``None`` from a JSON ``null``).
    """
    # Guard first: the original called raw.strip() unconditionally and
    # crashed with AttributeError on None.
    if not isinstance(raw, str):
        return "Other"
    exact = CATEGORY_MAP.get(raw)
    if exact is not None:
        return exact
    # NOTE(review): matching is case-sensitive, so a lowercase label such as
    # "world" falls through to "Other" — confirm this is intended.
    return CATEGORY_MAP.get(raw.strip(), "Other")
def _clean_html(text: str) -> str:
    """Return *text* as plain text: tags removed, entities decoded.

    Tags are stripped before entities are decoded, the result is trimmed of
    surrounding whitespace and then capped at 1000 characters. Falsy input
    yields an empty string.
    """
    if not text:
        return ""
    without_tags = re.sub(r"<[^>]+>", "", text)
    decoded = unescape(without_tags)
    trimmed = decoded.strip()
    return trimmed[:1000]
def _normalize_url(url: str) -> str:
    """Return *url* reduced to scheme, host and path.

    Path parameters, the query string (where tracking parameters live) and
    the fragment are all dropped.
    """
    stripped = urlparse(url)._replace(params="", query="", fragment="")
    return urlunparse(stripped)
def _article_hash(title: str, published: str, source_name: str) -> str:
    """Return a 32-hex-character fingerprint used to detect duplicate articles.

    The fingerprint is the truncated SHA-256 of the title, published value
    and source name joined with ``|``.
    """
    fingerprint_input = "{}|{}|{}".format(title, published, source_name)
    digest = hashlib.sha256(fingerprint_input.encode())
    return digest.hexdigest()[:32]
def _normalize_to_utc(dt) -> datetime:
    """Return *dt* as a timezone-aware UTC datetime.

    A naive datetime is labelled as UTC without shifting its clock time, an
    aware datetime is converted to UTC, and any non-datetime value falls
    back to the current UTC time.
    """
    if not isinstance(dt, datetime):
        return datetime.now(timezone.utc)
    if dt.tzinfo is not None:
        return dt.astimezone(timezone.utc)
    return dt.replace(tzinfo=timezone.utc)
async def run():
    """Collect articles from every enabled news source.

    Loads the ``NewsSource`` rows whose ``enabled`` flag is set, fetches each
    one with ``_fetch_api`` (``feed_type == "api"``) or ``_fetch_rss`` (any
    other type), and commits once after all sources have been processed.
    A failure in one source is logged and does not stop the others;
    ``last_fetched_at`` is stamped on success and on failure alike.
    """
    async with async_session() as session:
        # .is_(True) is the SQLAlchemy operator for boolean columns; it
        # replaces the `== True` comparison that linters flag.
        result = await session.execute(
            select(NewsSource).where(NewsSource.enabled.is_(True))
        )
        sources = result.scalars().all()
        if not sources:
            logger.info("활성화된 뉴스 소스 없음")
            return

        total = 0
        for source in sources:
            try:
                if source.feed_type == "api":
                    total += await _fetch_api(session, source)
                else:
                    total += await _fetch_rss(session, source)
            except Exception as e:
                # Boundary handler: one broken source must not abort the run.
                logger.error(f"[{source.name}] 수집 실패: {e}")
            # Recorded for both outcomes, as the attempt itself is what counts.
            source.last_fetched_at = datetime.now(timezone.utc)

        await session.commit()
        logger.info(f"뉴스 수집 완료: {total}건 신규")
# Upper bound on a downloaded feed body; _fetch_rss discards larger responses.
MAX_RESPONSE_SIZE = 5 * 1024 * 1024  # 5MB
# Content-type substrings that _fetch_rss accepts as a feed document.
ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
                         "application/xml", "text/xml")
async def _fetch_rss(session, source: NewsSource) -> int:
    """Collect new articles from one RSS/Atom feed and queue them for processing.

    The feed URL is validated again before the request and every redirect
    target is validated as well (at most 3 hops). Responses larger than
    ``MAX_RESPONSE_SIZE`` or with a content-type outside
    ``ALLOWED_CONTENT_TYPES`` are discarded.

    Args:
        session: Open async DB session. New rows are added and flushed here
            but not committed.
        source: Feed definition; ``feed_url``, ``name`` and ``category`` are
            read.

    Returns:
        Number of newly stored articles; 0 when the feed is blocked,
        rejected or cannot be parsed.

    Raises:
        httpx.HTTPError: On transport failure or when the final response has
            an error status (``raise_for_status``).
    """
    from urllib.parse import urljoin

    from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS

    # Plain HTTP is tolerated only when the source domain is allowlisted.
    # This also covers feeds whose HTTPS origin redirects to HTTP (e.g. SCMP).
    source_hostname = urlparse(source.feed_url).hostname
    http_allowed = source_hostname in HTTP_EXCEPTION_DOMAINS

    # A pure-HTTP source that is not allowlisted is blocked outright.
    if source.feed_url.startswith("http://") and not http_allowed:
        logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
        return 0

    # Re-validate right before fetching, in case DNS changed since registration.
    try:
        validate_feed_url(source.feed_url, allow_http=http_allowed)
    except ValueError as e:
        logger.error(f"[{source.name}] URL 검증 실패: {e}")
        return 0

    async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
        resp = await client.get(source.feed_url)

        # Redirects are followed by hand (max 3) so each target can be
        # validated; allowlisted sources may also redirect to HTTP.
        redirects = 0
        while resp.is_redirect and redirects < 3:
            location = resp.headers.get("location", "")
            location = urljoin(str(resp.request.url), location)
            try:
                validate_feed_url(location, allow_http=http_allowed)
            except ValueError as e:
                logger.error(f"[{source.name}] redirect target 차단: {e}")
                return 0
            resp = await client.get(location)
            redirects += 1

    if resp.is_redirect:
        logger.error(f"[{source.name}] redirect 3회 초과")
        return 0

    resp.raise_for_status()

    if len(resp.content) > MAX_RESPONSE_SIZE:
        logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
        return 0

    ct = resp.headers.get("content-type", "").lower()
    if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
        logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
        return 0

    feed = feedparser.parse(resp.text)
    # A malformed feed that still yielded entries is processed anyway.
    if feed.bozo and not feed.entries:
        logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
        return 0

    count = 0
    for entry in feed.entries:
        title = (entry.get("title") or "").strip()
        if not title:
            continue

        summary = _clean_html(entry.get("summary", "") or entry.get("description", ""))
        if not summary:
            summary = title

        link = entry.get("link", "")
        published = entry.get("published_parsed") or entry.get("updated_parsed")
        pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)

        article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
        normalized_url = _normalize_url(link)

        # Duplicate check. The stored edit_url is the raw link, so compare
        # against both the raw and the normalized form. The URL clause is
        # skipped for link-less entries; otherwise they would match any row
        # whose edit_url is "".
        duplicate_filter = Document.file_hash == article_id
        url_candidates = sorted({u for u in (link, normalized_url) if u})
        if url_candidates:
            duplicate_filter = duplicate_filter | Document.edit_url.in_(url_candidates)
        # limit(1) + first(): several matching rows still mean "duplicate",
        # whereas scalar_one_or_none() would raise MultipleResultsFound.
        existing = await session.execute(
            select(Document.id).where(duplicate_filter).limit(1)
        )
        if existing.first() is not None:
            continue

        category = _normalize_category(source.category or "")
        source_short = source.name.split(" ")[0]  # "경향신문 문화" → "경향신문"

        doc = Document(
            file_path=f"news/{source.name}/{article_id}",
            file_hash=article_id,
            file_format="article",
            file_size=len(summary.encode()),
            file_type="note",
            title=title,
            extracted_text=f"{title}\n\n{summary}",
            extracted_at=datetime.now(timezone.utc),
            extractor_version="rss",
            source_channel="news",
            data_origin="external",
            edit_url=link,
            review_status="approved",
            ai_domain="News",
            ai_sub_group=source_short,
            ai_tags=[f"News/{source_short}/{category}"],
        )
        session.add(doc)
        # Flush so doc.id is assigned before the queue rows reference it.
        await session.flush()

        # Queue summarize and embed only; classification is not needed here.
        await enqueue_stage(session, doc.id, "summarize")
        # Embedding is queued only for articles at most 30 days old.
        days_old = (datetime.now(timezone.utc) - pub_dt).days
        if days_old <= 30:
            await enqueue_stage(session, doc.id, "embed")

        count += 1

    logger.info(f"[{source.name}] RSS → {count}건 수집")
    return count
async def _fetch_api(session, source: NewsSource) -> int:
    """Collect new articles from the NYT Top Stories API and queue them.

    The API key is read from the ``NYT_API_KEY`` environment variable and is
    sent only as a query parameter, so it never appears in log output.

    Args:
        session: Open async DB session. New rows are added and flushed here
            but not committed.
        source: Source definition; ``category`` selects the Top Stories
            section (``world`` when unset) and ``name`` labels the articles.

    Returns:
        Number of newly stored articles; 0 when the key is missing or the
        request fails.
    """
    import os

    nyt_key = os.getenv("NYT_API_KEY", "")
    if not nyt_key:
        logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
        return 0

    # Built without the key, so this string is safe to log as-is.
    endpoint = f"https://api.nytimes.com/svc/topstories/v2/{source.category or 'world'}.json"
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(endpoint, params={"api-key": nyt_key})
            resp.raise_for_status()
    except httpx.HTTPStatusError as e:
        logger.error(f"NYT API 실패: {e.response.status_code} @ {endpoint}")
        return 0
    except httpx.RequestError:
        # Log the locally built endpoint rather than e.request.url: the
        # .request property raises RuntimeError when no request is attached,
        # which would turn this handler into a crash.
        logger.error(f"NYT API 연결 실패: {endpoint}")
        return 0

    data = resp.json()
    count = 0

    # `or []` also covers an explicit null in the payload.
    for article in data.get("results") or []:
        title = (article.get("title") or "").strip()
        if not title:
            continue

        summary = _clean_html(article.get("abstract", ""))
        if not summary:
            summary = title

        link = article.get("url", "")
        pub_str = article.get("published_date", "")
        try:
            pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            pub_dt = datetime.now(timezone.utc)
        # A value without an offset parses as naive; label it UTC so the age
        # computation below never mixes naive and aware datetimes. The
        # calendar date used for the hash is unaffected.
        if pub_dt.tzinfo is None:
            pub_dt = pub_dt.replace(tzinfo=timezone.utc)

        article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
        normalized_url = _normalize_url(link)

        # Duplicate check. The stored edit_url is the raw link, so compare
        # against both the raw and the normalized form. The URL clause is
        # skipped for link-less articles; otherwise they would match any row
        # whose edit_url is "".
        duplicate_filter = Document.file_hash == article_id
        url_candidates = sorted({u for u in (link, normalized_url) if u})
        if url_candidates:
            duplicate_filter = duplicate_filter | Document.edit_url.in_(url_candidates)
        # limit(1) + first(): several matching rows still mean "duplicate",
        # whereas scalar_one_or_none() would raise MultipleResultsFound.
        existing = await session.execute(
            select(Document.id).where(duplicate_filter).limit(1)
        )
        if existing.first() is not None:
            continue

        # Prefer the article's own section; fall back to the source category
        # when the section is missing, null or empty.
        category = _normalize_category(article.get("section") or source.category or "")
        source_short = source.name.split(" ")[0]

        doc = Document(
            file_path=f"news/{source.name}/{article_id}",
            file_hash=article_id,
            file_format="article",
            file_size=len(summary.encode()),
            file_type="note",
            title=title,
            extracted_text=f"{title}\n\n{summary}",
            extracted_at=datetime.now(timezone.utc),
            extractor_version="nyt_api",
            source_channel="news",
            data_origin="external",
            edit_url=link,
            review_status="approved",
            ai_domain="News",
            ai_sub_group=source_short,
            ai_tags=[f"News/{source_short}/{category}"],
        )
        session.add(doc)
        # Flush so doc.id is assigned before the queue rows reference it.
        await session.flush()

        await enqueue_stage(session, doc.id, "summarize")
        # Embedding is queued only for articles at most 30 days old.
        days_old = (datetime.now(timezone.utc) - pub_dt).days
        if days_old <= 30:
            await enqueue_stage(session, doc.id, "embed")

        count += 1

    logger.info(f"[{source.name}] API → {count}건 수집")
    return count