Files
hyungi 12ac18eb70 fix(collector): 수집기 견고화 — 한 건 실패가 전체 사이클을 죽이던 것 차단
C2 csb_collector: 주간 run 의 per-URL 루프에 try/except/continue — URL 1건 실패(page-extract
예외·DB DataError)가 run() 밖으로 전파돼 이후 URL 전부 스킵+watermark 정지하던 것 차단. 각
iteration 자체 session 이라 실패 격리.
H3 news_collector: 공유 세션+종단 단일 commit → 한 소스 DB오류가 오염시켜 전 소스 insert 소실하던
구조를 소스별 독립 세션으로(csb 패턴 동형). 실패 시 rollback 후 깨끗한 상태에서 failure 기록.
실증: 수동 수집서 Taipei Times ReadTimeout 격리하고 327건 정상 완주.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-20 05:42:12 +00:00

788 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장
plan crawl-24x7-1 A그룹 (2026-06-10):
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지
A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임
A-5 source_health 기록 + circuit breaker (소스별 실패 격리)
A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정)
"""
import asyncio
import hashlib
import re
from datetime import datetime, timedelta, timezone
from html import unescape
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
import feedparser
import httpx
from sqlalchemy import select
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from models.source_health import SourceHealth
logger = setup_logger("news_collector")
# 카테고리 표준화 매핑
CATEGORY_MAP = {
# 한국어
"국제": "International", "정치": "Politics", "경제": "Economy",
"사회": "Society", "문화": "Culture", "산업": "Industry",
"환경": "Environment", "기술": "Technology",
# 영어
"World": "International", "International": "International",
"World news": "International", # Guardian sectionName (B-2)
"Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
"Arts": "Culture", "Culture": "Culture",
"Climate": "Environment", "Environment": "Environment",
# 일본어
"国際": "International", "文化": "Culture", "科学": "Technology",
# 독일어
"Kultur": "Culture", "Wissenschaft": "Technology",
# 프랑스어
"Environnement": "Environment",
# 도메인 채널 (source_channel='crawl', 0-5 (a)) — 양쪽 공통 맵
"안전": "Safety", "Safety": "Safety",
"공학": "Engineering", "Engineering": "Engineering",
"철학": "Philosophy", "Philosophy": "Philosophy",
}
class FeedError(Exception):
"""소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록."""
def _normalize_category(raw: str) -> str:
"""카테고리 표준화"""
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
def _clean_html(text: str, max_len: int | None = 1000) -> str:
"""HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용)."""
if not text:
return ""
text = re.sub(r"<[^>]+>", "", text)
text = unescape(text)
text = text.strip()
return text if max_len is None else text[:max_len]
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
_TRACKING_PREFIXES = ("utm_", "at_", "ns_", "mc_")
_TRACKING_PARAMS = {"fbclid", "gclid", "igshid", "ref", "smid", "partner", "cmp", "ocid", "ftag"}
def _normalize_url(url: str) -> str:
"""URL 정규화 — tracking 파라미터만 제거, 콘텐츠 식별 파라미터는 보존.
query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= ·
HN item?id= 등 query-식별 사이트에서 별개 기사가 같은 URL 로 붕괴된다.
저장(edit_url)·조회 양쪽이 이 함수를 공유해야 dedup 이 성립.
★R11c: file_watcher._canonicalize_url(web_clip 채널)과 의도적으로 다르다 — 이쪽은 콘텐츠
식별 query 보존(별개 기사 붕괴 방지)이 핵심이라 query-sort/trailing-slash/소문자화를 안 한다.
두 함수 통합 금지(news dedup 가 깨짐). 채널별 normalization 은 의도된 설계.
"""
parsed = urlparse(url)
kept = [
(k, v) for k, v in parse_qsl(parsed.query, keep_blank_values=True)
if not (k.lower().startswith(_TRACKING_PREFIXES) or k.lower() in _TRACKING_PARAMS)
]
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", urlencode(kept), ""))
def _article_hash(title: str, published: str, source_name: str) -> str:
"""기사 고유 해시 (중복 체크용)"""
key = f"{title}|{published}|{source_name}"
return hashlib.sha256(key.encode()).hexdigest()[:32]
def _normalize_to_utc(dt) -> datetime:
"""다양한 시간 형식을 UTC로 정규화"""
if isinstance(dt, datetime):
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
return datetime.now(timezone.utc)
# ── A-5: circuit breaker 정책 ──
# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h)
# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상)
# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리.
_CIRCUIT_OPEN_AFTER = 3
_CIRCUIT_DISABLE_AFTER = 10
_BACKOFF_BASE_HOURS = 6
_BACKOFF_CAP_HOURS = 48
_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고
def _should_attempt(health: SourceHealth, now: datetime) -> bool:
"""circuit 상태에 따라 이번 사이클 fetch 여부 결정.
주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이
open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면
배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다.
"""
if health.circuit_state == "disabled":
return False
if health.circuit_state == "open" and health.last_error_at is not None:
over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0)
backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS)
if now - health.last_error_at < timedelta(hours=backoff_h):
return False
return True
def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None:
health.consecutive_failures = 0
health.total_fetches += 1
health.last_success_at = now
health.last_fetch_items = items
if health.circuit_state != "closed":
logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed")
health.circuit_state = "closed"
health.circuit_opened_at = None
# 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시)
if not_modified:
pass
elif items == 0:
health.empty_streak += 1
if health.empty_streak >= _EMPTY_STREAK_ALERT:
logger.error(
f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 "
f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)"
)
else:
health.empty_streak = 0
health.updated_at = now
def _record_failure(health: SourceHealth, error: str, now: datetime) -> None:
health.consecutive_failures += 1
health.total_fetches += 1
health.total_failures += 1
health.last_error = error[:500]
health.last_error_at = now
health.updated_at = now
cf = health.consecutive_failures
if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled":
health.circuit_state = "disabled"
logger.error(
f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED "
f"(수집 제외, A-8 패널에서 수동 복구 필요)"
)
elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed":
health.circuit_state = "open"
health.circuit_opened_at = now
logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open")
async def _get_or_create_health(session, source_id: int) -> SourceHealth:
result = await session.execute(
select(SourceHealth).where(SourceHealth.source_id == source_id)
)
health = result.scalars().first()
if health is None:
health = SourceHealth(source_id=source_id)
session.add(health)
await session.flush()
return health
# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일
# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT
# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄.
_run_lock = asyncio.Lock()
async def run():
"""뉴스 수집 실행"""
async with _run_lock:
await _run_locked()
async def _run_locked():
now = datetime.now(timezone.utc)
async with async_session() as session:
result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True)
)
source_ids = [s.id for s in result.scalars().all()]
if not source_ids:
logger.info("활성화된 뉴스 소스 없음")
return
# 2026-06-20 H3: 소스마다 독립 세션 — 한 소스의 DB 오류가 종단 단일 commit 을 깨뜨려
# 전 소스 insert 를 잃던 것 차단. 실패 시 rollback 후 깨끗한 상태에서 failure 기록.
# (csb_collector 의 per-iteration 세션 패턴과 동형.)
total = 0
for sid in source_ids:
async with async_session() as session:
source = await session.get(NewsSource, sid)
if source is None:
continue
sname = source.name
health = await _get_or_create_health(session, sid)
if not _should_attempt(health, now):
logger.info(f"[{sname}] circuit {health.circuit_state} — 이번 사이클 skip")
continue
try:
if source.feed_type == "api":
count, status = await _fetch_api(session, source)
else:
count, status = await _fetch_rss(session, source)
source.last_fetched_at = datetime.now(timezone.utc)
_record_success(health, count, status == "not_modified", now)
total += count
await session.commit()
except Exception as e:
# str 이 빈 예외(httpx.ConnectError('')) 대비 — health 기록과 동일 규칙
await session.rollback()
logger.error(f"[{sname}] 수집 실패: {str(e) or repr(e)}")
health = await _get_or_create_health(session, sid)
src = await session.get(NewsSource, sid)
if src is not None:
src.last_fetched_at = datetime.now(timezone.utc)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB
ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
"application/xml", "text/xml")
# 연결 재시도 간격 — MOEL 추가 실측(2026-06-11): 드랍이 연결 단위 랜덤이라
# 1.5s 후 재시도도 연속으로 걸리는 케이스 발생(직후 다른 연결은 즉시 성공) → 2회로 보강.
_CONNECT_RETRY_DELAYS = (2.0, 5.0)
async def _get_with_connect_retry(client, url: str):
"""연결 계층(TCP/TLS) 오류만 재시도(최대 2회) — HTTP 상태 오류는 비대상 (호출측 분기 보존).
MOEL 실측(2026-06-11): 정부 사이트 보안장비가 TLS 핸드셰이크를 연결 단위로 간헐 드랍
(curl rc=35, 직후 재시도는 성공) → 사이클당 1회 fetch 인 피드 수집이 ConnectError('')
로 실패 누적·circuit open. 지속 장애는 그대로 circuit 몫.
"""
for delay in _CONNECT_RETRY_DELAYS:
try:
return await client.get(url)
except (httpx.ConnectError, httpx.ConnectTimeout) as e:
logger.info(f"연결 오류 {delay}s 후 재시도 ({url.split('?')[0]}): {repr(e)}")
await asyncio.sleep(delay)
return await client.get(url)
async def _is_portal_duplicate(session, title: str) -> bool:
"""A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스.
보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘).
범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피).
"""
if len(title) < 12:
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=3)
dup = await session.execute(
select(Document.id).where(
Document.title == title,
Document.source_channel == "news",
Document.file_format == "article",
Document.extracted_at >= cutoff,
).limit(1)
)
return dup.scalars().first() is not None
async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None:
"""후속 단계 enqueue.
fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는
fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트
도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피).
"""
if source.fetch_method == "signal-only":
# B-4: 시그널 = 검색 색인만 (embed/chunk). fulltext/summarize 절대 enqueue 안 함 —
# 레지스트리가 fulltext_policy='page' 로 잘못 설정돼도 페이지 fetch 0 (방어 우선).
# 요약 LLM 스킵 = 맥미니 부하 0. 다이제스트/브리핑은 ai_summary IS NULL 문서를
# 처음부터 제외(services/digest/loader.py)하므로 시그널 문서가 자연 배제된다.
if source.source_channel == "crawl" or (datetime.now(timezone.utc) - pub_dt).days <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return
if source.fulltext_policy == "page" and doc.edit_url:
await enqueue_stage(session, doc.id, "fulltext")
return
await enqueue_stage(session, doc.id, "summarize")
if source.source_channel == "crawl":
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _entry_body(source: NewsSource, entry, summary: str) -> tuple[str, str]:
"""(body, extractor_version) — 정책별 본문 선택, 순수 함수 (shape 테스트 대상).
signal-only: 피드 요약이 곧 본문 — 절단 없음 (arXiv 초록 1.3~1.6K자 보존,
1000자 cap 적용 시 초록 꼬리 유실). 페이지 fetch 는 어떤 경우에도 없음 (B-4).
feed-full: 피드 본문이 전문인 소스만 신뢰 (truncate·광고 삽입이 흔해 일반
소스의 summary/content:encoded 를 전문으로 오인 저장 금지 — A-6).
"""
if source.fetch_method == "signal-only":
body = _clean_html(
entry.get("summary", "") or entry.get("description", ""), max_len=None
)
return (body or summary), "rss-signal"
if source.fulltext_policy == "feed-full":
content_list = entry.get("content") or []
raw_body = content_list[0].get("value", "") if content_list else ""
full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None)
if len(full_body) > len(summary):
return full_body, "rss-feed-full"
return summary, "rss"
def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict:
"""fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록)."""
meta = {
"source_id": source.id,
"source_name": source.name,
"published_at": pub_dt.isoformat(),
}
# 안전 자료실 A-2: 소스 레지스트리의 라이선스를 deterministic 주입 (0-3 license 메타).
# P3 다이제스트/발행류가 redistribute=false 소스를 구조적으로 제외하는 게이트 입력.
if source.license_scheme:
meta["license"] = {
"scheme": source.license_scheme,
"redistribute": bool(source.license_redistribute),
"attribution": source.name,
}
return meta
def _material_axis(source: NewsSource) -> tuple[str | None, str | None]:
"""안전 자료실 분류 축 (material_type, jurisdiction) — 레지스트리 deterministic.
- material_type = news_sources.material_type (NULL = 비대상, 뉴스/철학 등)
- jurisdiction = source.country 전파. 단 paper 는 NULL 강제
(국제 학술지에 관할 개념 부적합 — plan 0-1 계약. 레지스트리 country=US 여도 미전파).
"""
mt = source.material_type
if not mt:
return None, None
if mt == "paper":
return mt, None
return mt, source.country
def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict:
"""채널별 문서 정체성 — news 채널은 기존 값 그대로(무회귀), crawl 채널은 도메인 정체성.
file_path 접두사가 곧 채널 디렉토리. ai_domain 은 다이제스트/검색 필터의 분기 축이라
crawl 채널이 'News' 를 오염시키지 않게 분리 (0-5 채널 레벨 분리 사상).
"""
material_type, jurisdiction = _material_axis(source)
if source.source_channel == "crawl":
domain = category if category and category != "Other" else "Domain"
return {
"path_prefix": "crawl",
"ai_domain": domain,
"ai_tags": [f"{domain}/{source_short}"],
"material_type": material_type,
"jurisdiction": jurisdiction,
}
return {
"path_prefix": "news",
"ai_domain": "News",
"ai_tags": [f"News/{source_short}/{category}"],
"material_type": material_type,
"jurisdiction": jurisdiction,
}
async def _already_ingested(session, article_id: str, normalized_url: str, link: str) -> bool:
"""이미 적재된 기사인지 — file_hash 또는 정규화/raw edit_url 매칭 (3 fetch 공통, R11c).
레거시 raw URL + 교차 게시 다중 매칭 내성(first). _fetch_rss/_fetch_api_guardian/
_fetch_api_nyt 가 복제하던 동일 존재체크를 단일화.
"""
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id)
| (Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
return existing.scalars().first() is not None
def _build_news_doc(source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt) -> Document:
"""3 fetch 공통 뉴스 Document 빌더 (R11c). 채널별 차이는 인자로만 — body(NYT=summary)·
extractor_version·ident(category 계산 차이 흡수)만 다르고 22 필드 구조는 정적 동일.
edit_url 은 조회와 동일 정규화 저장(raw 저장 시 URL dedup 무력화)."""
return Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version=extractor_version,
# article = 텍스트 네이티브 → 생성 시점 terminal 'skipped' 명시(markdown 변환 비대상,
# 미명시 시 'pending' 영구 비수렴 → backlog 지표 오염). page 정책은 fulltext_worker 승격.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일.
소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록.
"""
from urllib.parse import urljoin
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
# HTTP 허용 여부: 소스 도메인이 allowlist에 있으면 HTTP 허용
# SCMP처럼 HTTPS 원본이 HTTP로 redirect하는 경우도 커버
source_hostname = urlparse(source.feed_url).hostname
http_allowed = source_hostname in HTTP_EXCEPTION_DOMAINS
# 순수 HTTP 소스인데 allowlist에 없으면 차단
if source.feed_url.startswith("http://") and not http_allowed:
raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}")
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
try:
validate_feed_url(source.feed_url, allow_http=http_allowed)
except ValueError as e:
raise FeedError(f"URL 검증 실패: {e}") from e
# A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송
headers = {"User-Agent": CRAWL_UA}
if source.etag:
headers["If-None-Match"] = source.etag
if source.last_modified:
headers["If-Modified-Since"] = source.last_modified
async with httpx.AsyncClient(
timeout=10, follow_redirects=False, headers=headers
) as client:
resp = await _get_with_connect_retry(client, source.feed_url)
# 304 는 redirect 처리보다 먼저 — httpx 의 is_redirect 는 3xx 전체(304 포함)에
# True 라, 304 를 redirect 로 오인하면 location 없는 같은 URL 을 재요청해
# "redirect 3회 초과" 로 오류 처리됨(조건부 GET 안정 피드 전멸 버그).
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송")
return 0, "not_modified"
# redirect 수동 처리 (최대 3회, 각 target 재검증) — location 있는 진짜 redirect 만.
# allowlist 도메인이면 redirect target의 HTTP도 허용
redirects = 0
while resp.has_redirect_location and redirects < 3:
location = urljoin(str(resp.request.url), resp.headers["location"])
try:
validate_feed_url(location, allow_http=http_allowed)
except ValueError as e:
raise FeedError(f"redirect target 차단: {e}") from e
resp = await client.get(location)
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified (redirect 후) — 본문 미전송")
return 0, "not_modified"
redirects += 1
if resp.has_redirect_location:
raise FeedError("redirect 3회 초과")
resp.raise_for_status()
if len(resp.content) > MAX_RESPONSE_SIZE:
raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes")
ct = resp.headers.get("content-type", "").lower()
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
raise FeedError(f"비정상 content-type: {ct}")
# A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상
# 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전
new_etag = resp.headers.get("etag")
new_last_modified = resp.headers.get("last-modified")
content_hash = hashlib.sha256(resp.content).hexdigest()
if source.feed_content_hash == content_hash:
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
return 0, "not_modified"
feed = feedparser.parse(resp.text)
if feed.bozo and not feed.entries:
raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
# A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면
# 이후 304 로 영구 skip 되는 silent corruption 차단
if new_etag:
source.etag = new_etag
if new_last_modified:
source.last_modified = new_last_modified
source.feed_content_hash = content_hash
count = 0
for entry in feed.entries:
title = entry.get("title", "").strip()
if not title:
continue
summary = _clean_html(entry.get("summary", "") or entry.get("description", ""))
if not summary:
summary = title
# 정책별 본문 선택 — signal-only(무절단 요약) / feed-full(피드 전문) / 기본(요약)
body, extractor_version = _entry_body(source, entry, summary)
link = entry.get("link", "")
# B-5 quirk: 비디오 항목 필터 (Aeon/Psyche — 텍스트 코퍼스에 비디오 페이지 무가치)
if source.parser_quirk == "skip-video" and re.search(r"/videos?/", link):
continue
published = entry.get("published_parsed") or entry.get("updated_parsed")
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
# 중복 체크 — 레거시 행은 raw URL 로 저장돼 있어 normalized/raw 양쪽 매칭.
# 교차 게시(같은 기사가 두 피드에 존재)로 2행 이상 매칭될 수 있어 first() 사용
# (scalar_one_or_none 은 MultipleResultsFound raise — 2026-06 BBC 수집 중단 원인).
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
if await _already_ingested(session, article_id, normalized_url, link):
continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(source.category or "")
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt,
)
session.add(doc)
await session.flush()
# summarize + embed + chunk 등록 (classify 불필요).
# page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue.
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] RSS → {count}건 수집")
return count, "ok"
async def _fetch_api(session, source: NewsSource) -> tuple[int, str]:
"""API 소스 디스패치 — feed_url 호스트로 제공자 판별 (B-2).
레거시 NYT 행(feed_url=api.nytimes.com)은 무변경 경로. 신규 제공자는 호스트 분기 추가.
미지의 호스트 = NYT 경로로 넘기지 않고 명시 실패 (silent fallback 금지).
"""
host = (urlparse(source.feed_url).hostname or "").lower()
if host.endswith("guardianapis.com"):
return await _fetch_api_guardian(session, source)
if host.endswith("nytimes.com"):
return await _fetch_api_nyt(session, source)
raise FeedError(f"API 제공자 미등록 호스트: {host} — 디스패치 분기 추가 필요")
def _guardian_request(feed_url: str, api_key: str) -> tuple[str, dict]:
"""Guardian 호출 형태 단일 source-of-truth — fixture 회귀 테스트 대상
(tests/fixtures/guardian_open_platform_search_response.json 박제 시 호출과 동일해야 함)."""
parsed = urlparse(feed_url)
params = {
**dict(parse_qsl(parsed.query)),
"show-fields": "bodyText,trailText",
"page-size": "20",
"order-by": "newest",
"api-key": api_key,
}
return f"{parsed.scheme}://{parsed.netloc}{parsed.path}", params
async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
"""Guardian Open Platform 수집 (B-2) — show-fields=bodyText 로 정식 전문 JSON.
feed_url 에 section 쿼리를 박아 등록 (예: https://content.guardianapis.com/search?section=world).
전문이 API 로 오므로 fulltext stage 불요. 키 미설정 = FeedError (health 실패 기록,
silent fallback 없음 — [[feedback_no_silent_fallback_explicit_opt_in]]).
"""
import os
api_key = os.getenv("GUARDIAN_API_KEY", "")
if not api_key:
raise FeedError("GUARDIAN_API_KEY 미설정 — Guardian 수집 불가")
endpoint, params = _guardian_request(source.feed_url, api_key)
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(endpoint, params=params)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path 까지만 로깅 (NYT 와 동일 규율)
safe_url = str(e.request.url).split("?")[0]
raise FeedError(f"Guardian API 실패: {e.response.status_code} @ {safe_url}") from e
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
raise FeedError(f"Guardian API 연결 실패: {safe_url}") from e
payload = resp.json().get("response", {})
if payload.get("status") != "ok":
raise FeedError(f"Guardian API status={payload.get('status')}")
count = 0
for item in payload.get("results", []):
title = (item.get("webTitle") or "").strip()
if not title:
continue
fields = item.get("fields") or {}
body_text = (fields.get("bodyText") or "").strip()
trail = _clean_html(fields.get("trailText") or "")
# bodyText = plain text 전문 (HTML 정화 불요). 짧으면(라이브 블로그 잔재 등) trail 격하.
is_full = len(body_text) >= 200
body = body_text if is_full else (trail or title)
link = item.get("webUrl", "")
pub_str = item.get("webPublicationDate", "")
try:
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
pub_dt = datetime.now(timezone.utc)
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
if await _already_ingested(session, article_id, normalized_url, link):
continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(item.get("sectionName", source.category or ""))
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
"guardian_api_full" if is_full else "guardian_api", normalized_url, pub_dt,
)
session.add(doc)
await session.flush()
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count, "ok"
async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
"""NYT API 수집 — 키 마스킹 + health degradation"""
import os
nyt_key = os.getenv("NYT_API_KEY", "")
if not nyt_key:
raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"https://api.nytimes.com/svc/topstories/v2/{source.category or 'world'}.json",
params={"api-key": nyt_key},
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
safe_url = str(e.request.url).split("?")[0]
raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
raise FeedError(f"NYT API 연결 실패: {safe_url}") from e
data = resp.json()
count = 0
for article in data.get("results", []):
title = article.get("title", "").strip()
if not title:
continue
summary = _clean_html(article.get("abstract", ""))
if not summary:
summary = title
link = article.get("url", "")
pub_str = article.get("published_date", "")
try:
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
pub_dt = datetime.now(timezone.utc)
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
if await _already_ingested(session, article_id, normalized_url, link):
continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(article.get("section", source.category or ""))
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, summary,
"nyt_api", normalized_url, pub_dt,
)
session.add(doc)
await session.flush()
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count, "ok"