08cf676c26
document_chunks.country 가 7일 분포 기준 99.9% NULL 이었던 root cause = news_collector 가 summarize + embed 만 enqueue 하고 chunk 를 enqueue 하지 않아 chunk_worker 가 news 문서에 한 번도 안 돌고 있었음. queue_consumer.next_stages 의 summarize 키 부재가 follow-up 미연결 원인. news 외 summarize 흐름 부수영향 회피를 위해 next_stages 가 아니라 news_collector RSS/API 양쪽에 chunk enqueue 1줄씩 명시 추가. days_old <= 30 가드 안에서 embed 와 동일 정책. scripts/news_chunk_country_backfill.py — doc 단위 small batch, 실패 doc skip, 50건마다 progress. queue 우회 직접 chunk_worker.process 호출로 timing 통제. Gate (PR closure): A) chunked_doc_pct > 95% 최근 7일 news doc 중 chunk 보유 비율 B) country null_pct < 5% 최근 7일 news chunk country NULL 비율 plan: ~/.claude/plans/7-whimsical-crab.md (PR-News-Prep-Layer-1) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
323 lines
11 KiB
Python
323 lines
11 KiB
Python
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
|
|
|
|
import hashlib
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from html import unescape
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
import feedparser
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.news_source import NewsSource
|
|
from models.queue import enqueue_stage
|
|
|
|
logger = setup_logger("news_collector")
|
|
|
|
# 카테고리 표준화 매핑
|
|
CATEGORY_MAP = {
|
|
# 한국어
|
|
"국제": "International", "정치": "Politics", "경제": "Economy",
|
|
"사회": "Society", "문화": "Culture", "산업": "Industry",
|
|
"환경": "Environment", "기술": "Technology",
|
|
# 영어
|
|
"World": "International", "International": "International",
|
|
"Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
|
|
"Arts": "Culture", "Culture": "Culture",
|
|
"Climate": "Environment", "Environment": "Environment",
|
|
# 일본어
|
|
"国際": "International", "文化": "Culture", "科学": "Technology",
|
|
# 독일어
|
|
"Kultur": "Culture", "Wissenschaft": "Technology",
|
|
# 프랑스어
|
|
"Environnement": "Environment",
|
|
}
|
|
|
|
|
|
def _normalize_category(raw: str) -> str:
|
|
"""카테고리 표준화"""
|
|
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
|
|
|
|
|
|
def _clean_html(text: str) -> str:
|
|
"""HTML 태그 제거 + 정제"""
|
|
if not text:
|
|
return ""
|
|
text = re.sub(r"<[^>]+>", "", text)
|
|
text = unescape(text)
|
|
return text.strip()[:1000]
|
|
|
|
|
|
def _normalize_url(url: str) -> str:
|
|
"""URL 정규화 (tracking params 제거)"""
|
|
parsed = urlparse(url)
|
|
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", ""))
|
|
|
|
|
|
def _article_hash(title: str, published: str, source_name: str) -> str:
|
|
"""기사 고유 해시 (중복 체크용)"""
|
|
key = f"{title}|{published}|{source_name}"
|
|
return hashlib.sha256(key.encode()).hexdigest()[:32]
|
|
|
|
|
|
def _normalize_to_utc(dt) -> datetime:
|
|
"""다양한 시간 형식을 UTC로 정규화"""
|
|
if isinstance(dt, datetime):
|
|
if dt.tzinfo is None:
|
|
return dt.replace(tzinfo=timezone.utc)
|
|
return dt.astimezone(timezone.utc)
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
async def run():
|
|
"""뉴스 수집 실행"""
|
|
async with async_session() as session:
|
|
result = await session.execute(
|
|
select(NewsSource).where(NewsSource.enabled == True)
|
|
)
|
|
sources = result.scalars().all()
|
|
|
|
if not sources:
|
|
logger.info("활성화된 뉴스 소스 없음")
|
|
return
|
|
|
|
total = 0
|
|
for source in sources:
|
|
try:
|
|
if source.feed_type == "api":
|
|
count = await _fetch_api(session, source)
|
|
else:
|
|
count = await _fetch_rss(session, source)
|
|
|
|
source.last_fetched_at = datetime.now(timezone.utc)
|
|
total += count
|
|
except Exception as e:
|
|
logger.error(f"[{source.name}] 수집 실패: {e}")
|
|
source.last_fetched_at = datetime.now(timezone.utc)
|
|
|
|
await session.commit()
|
|
logger.info(f"뉴스 수집 완료: {total}건 신규")
|
|
|
|
|
|
MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB
|
|
ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
|
|
"application/xml", "text/xml")
|
|
|
|
|
|
async def _fetch_rss(session, source: NewsSource) -> int:
|
|
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
|
|
from urllib.parse import urljoin
|
|
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
|
|
|
|
# HTTP 허용 여부: 소스 도메인이 allowlist에 있으면 HTTP 허용
|
|
# SCMP처럼 HTTPS 원본이 HTTP로 redirect하는 경우도 커버
|
|
source_hostname = urlparse(source.feed_url).hostname
|
|
http_allowed = source_hostname in HTTP_EXCEPTION_DOMAINS
|
|
|
|
# 순수 HTTP 소스인데 allowlist에 없으면 차단
|
|
if source.feed_url.startswith("http://") and not http_allowed:
|
|
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
|
|
return 0
|
|
|
|
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
|
|
try:
|
|
validate_feed_url(source.feed_url, allow_http=http_allowed)
|
|
except ValueError as e:
|
|
logger.error(f"[{source.name}] URL 검증 실패: {e}")
|
|
return 0
|
|
|
|
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
|
|
resp = await client.get(source.feed_url)
|
|
|
|
# redirect 수동 처리 (최대 3회, 각 target 재검증)
|
|
# allowlist 도메인이면 redirect target의 HTTP도 허용
|
|
redirects = 0
|
|
while resp.is_redirect and redirects < 3:
|
|
location = resp.headers.get("location", "")
|
|
location = urljoin(str(resp.request.url), location)
|
|
try:
|
|
validate_feed_url(location, allow_http=http_allowed)
|
|
except ValueError as e:
|
|
logger.error(f"[{source.name}] redirect target 차단: {e}")
|
|
return 0
|
|
resp = await client.get(location)
|
|
redirects += 1
|
|
if resp.is_redirect:
|
|
logger.error(f"[{source.name}] redirect 3회 초과")
|
|
return 0
|
|
|
|
resp.raise_for_status()
|
|
|
|
if len(resp.content) > MAX_RESPONSE_SIZE:
|
|
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
|
|
return 0
|
|
|
|
ct = resp.headers.get("content-type", "").lower()
|
|
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
|
|
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
|
|
return 0
|
|
|
|
feed = feedparser.parse(resp.text)
|
|
if feed.bozo and not feed.entries:
|
|
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
|
|
return 0
|
|
count = 0
|
|
|
|
for entry in feed.entries:
|
|
title = entry.get("title", "").strip()
|
|
if not title:
|
|
continue
|
|
|
|
summary = _clean_html(entry.get("summary", "") or entry.get("description", ""))
|
|
if not summary:
|
|
summary = title
|
|
|
|
link = entry.get("link", "")
|
|
published = entry.get("published_parsed") or entry.get("updated_parsed")
|
|
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
|
|
|
|
# 중복 체크
|
|
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
|
|
normalized_url = _normalize_url(link)
|
|
|
|
existing = await session.execute(
|
|
select(Document).where(
|
|
(Document.file_hash == article_id) |
|
|
(Document.edit_url == normalized_url)
|
|
)
|
|
)
|
|
if existing.scalar_one_or_none():
|
|
continue
|
|
|
|
category = _normalize_category(source.category or "")
|
|
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
|
|
|
|
doc = Document(
|
|
file_path=f"news/{source.name}/{article_id}",
|
|
file_hash=article_id,
|
|
file_format="article",
|
|
file_size=len(summary.encode()),
|
|
file_type="note",
|
|
title=title,
|
|
extracted_text=f"{title}\n\n{summary}",
|
|
extracted_at=datetime.now(timezone.utc),
|
|
extractor_version="rss",
|
|
source_channel="news",
|
|
data_origin="external",
|
|
edit_url=link,
|
|
review_status="approved",
|
|
ai_domain="News",
|
|
ai_sub_group=source_short,
|
|
ai_tags=[f"News/{source_short}/{category}"],
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
# summarize + embed + chunk 등록 (classify 불필요)
|
|
await enqueue_stage(session, doc.id, "summarize")
|
|
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
|
if days_old <= 30:
|
|
await enqueue_stage(session, doc.id, "embed")
|
|
await enqueue_stage(session, doc.id, "chunk")
|
|
|
|
count += 1
|
|
|
|
logger.info(f"[{source.name}] RSS → {count}건 수집")
|
|
return count
|
|
|
|
|
|
async def _fetch_api(session, source: NewsSource) -> int:
|
|
"""NYT API 수집 — 키 마스킹 + health degradation"""
|
|
import os
|
|
nyt_key = os.getenv("NYT_API_KEY", "")
|
|
if not nyt_key:
|
|
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
|
|
return 0
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
resp = await client.get(
|
|
f"https://api.nytimes.com/svc/topstories/v2/{source.category or 'world'}.json",
|
|
params={"api-key": nyt_key},
|
|
)
|
|
resp.raise_for_status()
|
|
except httpx.HTTPStatusError as e:
|
|
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
|
|
safe_url = str(e.request.url).split("?")[0]
|
|
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
|
|
return 0
|
|
except httpx.RequestError as e:
|
|
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
|
|
logger.error(f"NYT API 연결 실패: {safe_url}")
|
|
return 0
|
|
|
|
data = resp.json()
|
|
count = 0
|
|
|
|
for article in data.get("results", []):
|
|
title = article.get("title", "").strip()
|
|
if not title:
|
|
continue
|
|
|
|
summary = _clean_html(article.get("abstract", ""))
|
|
if not summary:
|
|
summary = title
|
|
|
|
link = article.get("url", "")
|
|
pub_str = article.get("published_date", "")
|
|
try:
|
|
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
|
|
except (ValueError, AttributeError):
|
|
pub_dt = datetime.now(timezone.utc)
|
|
|
|
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
|
|
normalized_url = _normalize_url(link)
|
|
|
|
existing = await session.execute(
|
|
select(Document).where(
|
|
(Document.file_hash == article_id) |
|
|
(Document.edit_url == normalized_url)
|
|
)
|
|
)
|
|
if existing.scalar_one_or_none():
|
|
continue
|
|
|
|
category = _normalize_category(article.get("section", source.category or ""))
|
|
source_short = source.name.split(" ")[0]
|
|
|
|
doc = Document(
|
|
file_path=f"news/{source.name}/{article_id}",
|
|
file_hash=article_id,
|
|
file_format="article",
|
|
file_size=len(summary.encode()),
|
|
file_type="note",
|
|
title=title,
|
|
extracted_text=f"{title}\n\n{summary}",
|
|
extracted_at=datetime.now(timezone.utc),
|
|
extractor_version="nyt_api",
|
|
source_channel="news",
|
|
data_origin="external",
|
|
edit_url=link,
|
|
review_status="approved",
|
|
ai_domain="News",
|
|
ai_sub_group=source_short,
|
|
ai_tags=[f"News/{source_short}/{category}"],
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
await enqueue_stage(session, doc.id, "summarize")
|
|
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
|
if days_old <= 30:
|
|
await enqueue_stage(session, doc.id, "embed")
|
|
await enqueue_stage(session, doc.id, "chunk")
|
|
|
|
count += 1
|
|
|
|
logger.info(f"[{source.name}] API → {count}건 수집")
|
|
return count
|