Files
hyungi_document_server/app/workers/news_collector.py
T
hyungi acd595244a fix(news): URL dedup 정규화 저장·조회 통일 + 다중매칭 내성
BBC Technology 매 사이클 MultipleResultsFound (06-04~) 해소.
- 저장 edit_url=raw vs 조회 normalized 비대칭으로 URL dedup 무력화돼
  교차게시(HN x BBC) 시 2행 동시매칭 -> scalar_one_or_none raise.
- _normalize_url: query 전체 제거 -> tracking 파라미터만 제거로 교정
  (hada.io/topic?id= 등 query-식별 사이트 870건 붕괴 방지, 리뷰 게이트).
- 조회 .first() + edit_url IN (normalized, raw) 레거시 행 내성. RSS/NYT 양쪽.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 22:26:22 +00:00

351 lines
13 KiB
Python

"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
import hashlib
import re
from datetime import datetime, timezone
from html import unescape
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
import feedparser
import httpx
from sqlalchemy import select
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
logger = setup_logger("news_collector")
# 카테고리 표준화 매핑
CATEGORY_MAP = {
# 한국어
"국제": "International", "정치": "Politics", "경제": "Economy",
"사회": "Society", "문화": "Culture", "산업": "Industry",
"환경": "Environment", "기술": "Technology",
# 영어
"World": "International", "International": "International",
"Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
"Arts": "Culture", "Culture": "Culture",
"Climate": "Environment", "Environment": "Environment",
# 일본어
"国際": "International", "文化": "Culture", "科学": "Technology",
# 독일어
"Kultur": "Culture", "Wissenschaft": "Technology",
# 프랑스어
"Environnement": "Environment",
}
def _normalize_category(raw: str) -> str:
"""카테고리 표준화"""
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
def _clean_html(text: str) -> str:
"""HTML 태그 제거 + 정제"""
if not text:
return ""
text = re.sub(r"<[^>]+>", "", text)
text = unescape(text)
return text.strip()[:1000]
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
_TRACKING_PREFIXES = ("utm_", "at_", "ns_", "mc_")
_TRACKING_PARAMS = {"fbclid", "gclid", "igshid", "ref", "smid", "partner", "cmp", "ocid", "ftag"}
def _normalize_url(url: str) -> str:
"""URL 정규화 — tracking 파라미터만 제거, 콘텐츠 식별 파라미터는 보존.
query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= ·
HN item?id= 등 query-식별 사이트에서 별개 기사가 같은 URL 로 붕괴된다.
저장(edit_url)·조회 양쪽이 이 함수를 공유해야 dedup 이 성립.
"""
parsed = urlparse(url)
kept = [
(k, v) for k, v in parse_qsl(parsed.query, keep_blank_values=True)
if not (k.lower().startswith(_TRACKING_PREFIXES) or k.lower() in _TRACKING_PARAMS)
]
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", urlencode(kept), ""))
def _article_hash(title: str, published: str, source_name: str) -> str:
"""기사 고유 해시 (중복 체크용)"""
key = f"{title}|{published}|{source_name}"
return hashlib.sha256(key.encode()).hexdigest()[:32]
def _normalize_to_utc(dt) -> datetime:
"""다양한 시간 형식을 UTC로 정규화"""
if isinstance(dt, datetime):
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
return datetime.now(timezone.utc)
async def run():
"""뉴스 수집 실행"""
async with async_session() as session:
result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True)
)
sources = result.scalars().all()
if not sources:
logger.info("활성화된 뉴스 소스 없음")
return
total = 0
for source in sources:
try:
if source.feed_type == "api":
count = await _fetch_api(session, source)
else:
count = await _fetch_rss(session, source)
source.last_fetched_at = datetime.now(timezone.utc)
total += count
except Exception as e:
logger.error(f"[{source.name}] 수집 실패: {e}")
source.last_fetched_at = datetime.now(timezone.utc)
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB
ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
"application/xml", "text/xml")
async def _fetch_rss(session, source: NewsSource) -> int:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
from urllib.parse import urljoin
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
# HTTP 허용 여부: 소스 도메인이 allowlist에 있으면 HTTP 허용
# SCMP처럼 HTTPS 원본이 HTTP로 redirect하는 경우도 커버
source_hostname = urlparse(source.feed_url).hostname
http_allowed = source_hostname in HTTP_EXCEPTION_DOMAINS
# 순수 HTTP 소스인데 allowlist에 없으면 차단
if source.feed_url.startswith("http://") and not http_allowed:
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
return 0
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
try:
validate_feed_url(source.feed_url, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] URL 검증 실패: {e}")
return 0
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
resp = await client.get(source.feed_url)
# redirect 수동 처리 (최대 3회, 각 target 재검증)
# allowlist 도메인이면 redirect target의 HTTP도 허용
redirects = 0
while resp.is_redirect and redirects < 3:
location = resp.headers.get("location", "")
location = urljoin(str(resp.request.url), location)
try:
validate_feed_url(location, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] redirect target 차단: {e}")
return 0
resp = await client.get(location)
redirects += 1
if resp.is_redirect:
logger.error(f"[{source.name}] redirect 3회 초과")
return 0
resp.raise_for_status()
if len(resp.content) > MAX_RESPONSE_SIZE:
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
return 0
ct = resp.headers.get("content-type", "").lower()
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
return 0
feed = feedparser.parse(resp.text)
if feed.bozo and not feed.entries:
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
return 0
count = 0
for entry in feed.entries:
title = entry.get("title", "").strip()
if not title:
continue
summary = _clean_html(entry.get("summary", "") or entry.get("description", ""))
if not summary:
summary = title
link = entry.get("link", "")
published = entry.get("published_parsed") or entry.get("updated_parsed")
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
# 중복 체크 — 레거시 행은 raw URL 로 저장돼 있어 normalized/raw 양쪽 매칭.
# 교차 게시(같은 기사가 두 피드에 존재)로 2행 이상 매칭될 수 있어 first() 사용
# (scalar_one_or_none 은 MultipleResultsFound raise — 2026-06 BBC 수집 중단 원인).
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
category = _normalize_category(source.category or "")
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
doc = Document(
file_path=f"news/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_at=datetime.now(timezone.utc),
extractor_version="rss",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel="news",
data_origin="external",
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
edit_url=normalized_url,
review_status="approved",
ai_domain="News",
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
)
session.add(doc)
await session.flush()
# summarize + embed + chunk 등록 (classify 불필요)
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
count += 1
logger.info(f"[{source.name}] RSS → {count}건 수집")
return count
async def _fetch_api(session, source: NewsSource) -> int:
"""NYT API 수집 — 키 마스킹 + health degradation"""
import os
nyt_key = os.getenv("NYT_API_KEY", "")
if not nyt_key:
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
return 0
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"https://api.nytimes.com/svc/topstories/v2/{source.category or 'world'}.json",
params={"api-key": nyt_key},
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
safe_url = str(e.request.url).split("?")[0]
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
return 0
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
logger.error(f"NYT API 연결 실패: {safe_url}")
return 0
data = resp.json()
count = 0
for article in data.get("results", []):
title = article.get("title", "").strip()
if not title:
continue
summary = _clean_html(article.get("abstract", ""))
if not summary:
summary = title
link = article.get("url", "")
pub_str = article.get("published_date", "")
try:
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
pub_dt = datetime.now(timezone.utc)
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
category = _normalize_category(article.get("section", source.category or ""))
source_short = source.name.split(" ")[0]
doc = Document(
file_path=f"news/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_at=datetime.now(timezone.utc),
extractor_version="nyt_api",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel="news",
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain="News",
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count