"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장 plan crawl-24x7-1 A그룹 (2026-06-10): A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지 A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임 A-5 source_health 기록 + circuit breaker (소스별 실패 격리) A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정) """ import asyncio import hashlib import re from datetime import datetime, timedelta, timezone from html import unescape from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse import feedparser import httpx from sqlalchemy import select from core.crawl_politeness import CRAWL_UA from core.database import async_session from core.utils import setup_logger from models.document import Document from models.news_source import NewsSource from models.queue import enqueue_stage from models.source_health import SourceHealth logger = setup_logger("news_collector") # 카테고리 표준화 매핑 CATEGORY_MAP = { # 한국어 "국제": "International", "정치": "Politics", "경제": "Economy", "사회": "Society", "문화": "Culture", "산업": "Industry", "환경": "Environment", "기술": "Technology", # 영어 "World": "International", "International": "International", "World news": "International", # Guardian sectionName (B-2) "Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology", "Arts": "Culture", "Culture": "Culture", "Climate": "Environment", "Environment": "Environment", # 일본어 "国際": "International", "文化": "Culture", "科学": "Technology", # 독일어 "Kultur": "Culture", "Wissenschaft": "Technology", # 프랑스어 "Environnement": "Environment", # 도메인 채널 (source_channel='crawl', 0-5 (a)) — 양쪽 공통 맵 "안전": "Safety", "Safety": "Safety", "공학": "Engineering", "Engineering": "Engineering", "철학": "Philosophy", "Philosophy": "Philosophy", } class FeedError(Exception): """소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록.""" def _normalize_category(raw: str) -> str: """카테고리 표준화""" return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other")) def _clean_html(text: str, max_len: int | None = 1000) -> str: """HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용).""" if not text: return "" text = re.sub(r"<[^>]+>", "", text) text = unescape(text) text = text.strip() return text if max_len is None else text[:max_len] # tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키 _TRACKING_PREFIXES = ("utm_", "at_", "ns_", "mc_") _TRACKING_PARAMS = {"fbclid", "gclid", "igshid", "ref", "smid", "partner", "cmp", "ocid", "ftag"} def _normalize_url(url: str) -> str: """URL 정규화 — tracking 파라미터만 제거, 콘텐츠 식별 파라미터는 보존. query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= · HN item?id= 등 query-식별 사이트에서 별개 기사가 같은 URL 로 붕괴된다. 저장(edit_url)·조회 양쪽이 이 함수를 공유해야 dedup 이 성립. """ parsed = urlparse(url) kept = [ (k, v) for k, v in parse_qsl(parsed.query, keep_blank_values=True) if not (k.lower().startswith(_TRACKING_PREFIXES) or k.lower() in _TRACKING_PARAMS) ] return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", urlencode(kept), "")) def _article_hash(title: str, published: str, source_name: str) -> str: """기사 고유 해시 (중복 체크용)""" key = f"{title}|{published}|{source_name}" return hashlib.sha256(key.encode()).hexdigest()[:32] def _normalize_to_utc(dt) -> datetime: """다양한 시간 형식을 UTC로 정규화""" if isinstance(dt, datetime): if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) return datetime.now(timezone.utc) # ── A-5: circuit breaker 정책 ── # 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h) # 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상) # news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리. _CIRCUIT_OPEN_AFTER = 3 _CIRCUIT_DISABLE_AFTER = 10 _BACKOFF_BASE_HOURS = 6 _BACKOFF_CAP_HOURS = 48 _EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고 def _should_attempt(health: SourceHealth, now: datetime) -> bool: """circuit 상태에 따라 이번 사이클 fetch 여부 결정. 주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이 open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면 배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다. """ if health.circuit_state == "disabled": return False if health.circuit_state == "open" and health.last_error_at is not None: over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0) backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS) if now - health.last_error_at < timedelta(hours=backoff_h): return False return True def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None: health.consecutive_failures = 0 health.total_fetches += 1 health.last_success_at = now health.last_fetch_items = items if health.circuit_state != "closed": logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed") health.circuit_state = "closed" health.circuit_opened_at = None # 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시) if not_modified: pass elif items == 0: health.empty_streak += 1 if health.empty_streak >= _EMPTY_STREAK_ALERT: logger.error( f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 " f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)" ) else: health.empty_streak = 0 health.updated_at = now def _record_failure(health: SourceHealth, error: str, now: datetime) -> None: health.consecutive_failures += 1 health.total_fetches += 1 health.total_failures += 1 health.last_error = error[:500] health.last_error_at = now health.updated_at = now cf = health.consecutive_failures if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled": health.circuit_state = "disabled" logger.error( f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED " f"(수집 제외, A-8 패널에서 수동 복구 필요)" ) elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed": health.circuit_state = "open" health.circuit_opened_at = now logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open") async def _get_or_create_health(session, source_id: int) -> SourceHealth: result = await session.execute( select(SourceHealth).where(SourceHealth.source_id == source_id) ) health = result.scalars().first() if health is None: health = SourceHealth(source_id=source_id) session.add(health) await session.flush() return health # 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일 # 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT # → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄. _run_lock = asyncio.Lock() async def run(): """뉴스 수집 실행""" async with _run_lock: await _run_locked() async def _run_locked(): now = datetime.now(timezone.utc) async with async_session() as session: result = await session.execute( select(NewsSource).where(NewsSource.enabled == True) ) sources = result.scalars().all() if not sources: logger.info("활성화된 뉴스 소스 없음") return total = 0 for source in sources: health = await _get_or_create_health(session, source.id) if not _should_attempt(health, now): logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip") continue try: if source.feed_type == "api": count, status = await _fetch_api(session, source) else: count, status = await _fetch_rss(session, source) source.last_fetched_at = datetime.now(timezone.utc) _record_success(health, count, status == "not_modified", now) total += count except Exception as e: # str 이 빈 예외(httpx.ConnectError('')) 대비 — health 기록과 동일 규칙 logger.error(f"[{source.name}] 수집 실패: {str(e) or repr(e)}") source.last_fetched_at = datetime.now(timezone.utc) _record_failure(health, str(e) or repr(e), now) await session.commit() logger.info(f"뉴스 수집 완료: {total}건 신규") MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml", "application/xml", "text/xml") # 연결 재시도 간격 — MOEL 추가 실측(2026-06-11): 드랍이 연결 단위 랜덤이라 # 1.5s 후 재시도도 연속으로 걸리는 케이스 발생(직후 다른 연결은 즉시 성공) → 2회로 보강. _CONNECT_RETRY_DELAYS = (2.0, 5.0) async def _get_with_connect_retry(client, url: str): """연결 계층(TCP/TLS) 오류만 재시도(최대 2회) — HTTP 상태 오류는 비대상 (호출측 분기 보존). MOEL 실측(2026-06-11): 정부 사이트 보안장비가 TLS 핸드셰이크를 연결 단위로 간헐 드랍 (curl rc=35, 직후 재시도는 성공) → 사이클당 1회 fetch 인 피드 수집이 ConnectError('') 로 실패 누적·circuit open. 지속 장애는 그대로 circuit 몫. """ for delay in _CONNECT_RETRY_DELAYS: try: return await client.get(url) except (httpx.ConnectError, httpx.ConnectTimeout) as e: logger.info(f"연결 오류 {delay}s 후 재시도 ({url.split('?')[0]}): {repr(e)}") await asyncio.sleep(delay) return await client.get(url) async def _is_portal_duplicate(session, title: str) -> bool: """A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스. 보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘). 범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피). """ if len(title) < 12: return False cutoff = datetime.now(timezone.utc) - timedelta(days=3) dup = await session.execute( select(Document.id).where( Document.title == title, Document.source_channel == "news", Document.file_format == "article", Document.extracted_at >= cutoff, ).limit(1) ) return dup.scalars().first() is not None async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None: """후속 단계 enqueue. fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는 fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트 도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피). """ if source.fetch_method == "signal-only": # B-4: 시그널 = 검색 색인만 (embed/chunk). fulltext/summarize 절대 enqueue 안 함 — # 레지스트리가 fulltext_policy='page' 로 잘못 설정돼도 페이지 fetch 0 (방어 우선). # 요약 LLM 스킵 = 맥미니 부하 0. 다이제스트/브리핑은 ai_summary IS NULL 문서를 # 처음부터 제외(services/digest/loader.py)하므로 시그널 문서가 자연 배제된다. if source.source_channel == "crawl" or (datetime.now(timezone.utc) - pub_dt).days <= 30: await enqueue_stage(session, doc.id, "embed") await enqueue_stage(session, doc.id, "chunk") return if source.fulltext_policy == "page" and doc.edit_url: await enqueue_stage(session, doc.id, "fulltext") return await enqueue_stage(session, doc.id, "summarize") if source.source_channel == "crawl": # 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용) await enqueue_stage(session, doc.id, "embed") await enqueue_stage(session, doc.id, "chunk") return days_old = (datetime.now(timezone.utc) - pub_dt).days if days_old <= 30: await enqueue_stage(session, doc.id, "embed") await enqueue_stage(session, doc.id, "chunk") def _entry_body(source: NewsSource, entry, summary: str) -> tuple[str, str]: """(body, extractor_version) — 정책별 본문 선택, 순수 함수 (shape 테스트 대상). signal-only: 피드 요약이 곧 본문 — 절단 없음 (arXiv 초록 1.3~1.6K자 보존, 1000자 cap 적용 시 초록 꼬리 유실). 페이지 fetch 는 어떤 경우에도 없음 (B-4). feed-full: 피드 본문이 전문인 소스만 신뢰 (truncate·광고 삽입이 흔해 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지 — A-6). """ if source.fetch_method == "signal-only": body = _clean_html( entry.get("summary", "") or entry.get("description", ""), max_len=None ) return (body or summary), "rss-signal" if source.fulltext_policy == "feed-full": content_list = entry.get("content") or [] raw_body = content_list[0].get("value", "") if content_list else "" full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None) if len(full_body) > len(summary): return full_body, "rss-feed-full" return summary, "rss" def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict: """fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록).""" return { "source_id": source.id, "source_name": source.name, "published_at": pub_dt.isoformat(), } def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict: """채널별 문서 정체성 — news 채널은 기존 값 그대로(무회귀), crawl 채널은 도메인 정체성. file_path 접두사가 곧 채널 디렉토리. ai_domain 은 다이제스트/검색 필터의 분기 축이라 crawl 채널이 'News' 를 오염시키지 않게 분리 (0-5 채널 레벨 분리 사상). """ if source.source_channel == "crawl": domain = category if category and category != "Other" else "Domain" return { "path_prefix": "crawl", "ai_domain": domain, "ai_tags": [f"{domain}/{source_short}"], } return { "path_prefix": "news", "ai_domain": "News", "ai_tags": [f"News/{source_short}/{category}"], } async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: """RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1). 반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일. 소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록. """ from urllib.parse import urljoin from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS # HTTP 허용 여부: 소스 도메인이 allowlist에 있으면 HTTP 허용 # SCMP처럼 HTTPS 원본이 HTTP로 redirect하는 경우도 커버 source_hostname = urlparse(source.feed_url).hostname http_allowed = source_hostname in HTTP_EXCEPTION_DOMAINS # 순수 HTTP 소스인데 allowlist에 없으면 차단 if source.feed_url.startswith("http://") and not http_allowed: raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}") # fetch 전 URL 재검증 (등록 이후 DNS 변경 대비) try: validate_feed_url(source.feed_url, allow_http=http_allowed) except ValueError as e: raise FeedError(f"URL 검증 실패: {e}") from e # A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송 headers = {"User-Agent": CRAWL_UA} if source.etag: headers["If-None-Match"] = source.etag if source.last_modified: headers["If-Modified-Since"] = source.last_modified async with httpx.AsyncClient( timeout=10, follow_redirects=False, headers=headers ) as client: resp = await _get_with_connect_retry(client, source.feed_url) # 304 는 redirect 처리보다 먼저 — httpx 의 is_redirect 는 3xx 전체(304 포함)에 # True 라, 304 를 redirect 로 오인하면 location 없는 같은 URL 을 재요청해 # "redirect 3회 초과" 로 오류 처리됨(조건부 GET 안정 피드 전멸 버그). if resp.status_code == 304: logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송") return 0, "not_modified" # redirect 수동 처리 (최대 3회, 각 target 재검증) — location 있는 진짜 redirect 만. # allowlist 도메인이면 redirect target의 HTTP도 허용 redirects = 0 while resp.has_redirect_location and redirects < 3: location = urljoin(str(resp.request.url), resp.headers["location"]) try: validate_feed_url(location, allow_http=http_allowed) except ValueError as e: raise FeedError(f"redirect target 차단: {e}") from e resp = await client.get(location) if resp.status_code == 304: logger.info(f"[{source.name}] 304 Not Modified (redirect 후) — 본문 미전송") return 0, "not_modified" redirects += 1 if resp.has_redirect_location: raise FeedError("redirect 3회 초과") resp.raise_for_status() if len(resp.content) > MAX_RESPONSE_SIZE: raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes") ct = resp.headers.get("content-type", "").lower() if not any(t in ct for t in ALLOWED_CONTENT_TYPES): raise FeedError(f"비정상 content-type: {ct}") # A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상 # 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전 new_etag = resp.headers.get("etag") new_last_modified = resp.headers.get("last-modified") content_hash = hashlib.sha256(resp.content).hexdigest() if source.feed_content_hash == content_hash: logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip") return 0, "not_modified" feed = feedparser.parse(resp.text) if feed.bozo and not feed.entries: raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}") # A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면 # 이후 304 로 영구 skip 되는 silent corruption 차단 if new_etag: source.etag = new_etag if new_last_modified: source.last_modified = new_last_modified source.feed_content_hash = content_hash count = 0 for entry in feed.entries: title = entry.get("title", "").strip() if not title: continue summary = _clean_html(entry.get("summary", "") or entry.get("description", "")) if not summary: summary = title # 정책별 본문 선택 — signal-only(무절단 요약) / feed-full(피드 전문) / 기본(요약) body, extractor_version = _entry_body(source, entry, summary) link = entry.get("link", "") # B-5 quirk: 비디오 항목 필터 (Aeon/Psyche — 텍스트 코퍼스에 비디오 페이지 무가치) if source.parser_quirk == "skip-video" and re.search(r"/videos?/", link): continue published = entry.get("published_parsed") or entry.get("updated_parsed") pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc) # 중복 체크 — 레거시 행은 raw URL 로 저장돼 있어 normalized/raw 양쪽 매칭. # 교차 게시(같은 기사가 두 피드에 존재)로 2행 이상 매칭될 수 있어 first() 사용 # (scalar_one_or_none 은 MultipleResultsFound raise — 2026-06 BBC 수집 중단 원인). article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name) normalized_url = _normalize_url(link) existing = await session.execute( select(Document).where( (Document.file_hash == article_id) | (Document.edit_url.in_([normalized_url, link])) ).limit(1) ) if existing.scalars().first(): continue # A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본) if await _is_portal_duplicate(session, title): logger.info(f"[{source.name}] portal-dup skip: {title[:60]}") continue category = _normalize_category(source.category or "") source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문" ident = _doc_identity(source, source_short, category) doc = Document( file_path=f"{ident['path_prefix']}/{source.name}/{article_id}", file_hash=article_id, file_format="article", file_size=len(body.encode()), file_type="note", title=title, extracted_text=f"{title}\n\n{body}", extracted_at=datetime.now(timezone.utc), extractor_version=extractor_version, # article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라 # 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial # 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상). # fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신. md_status="skipped", md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", source_channel=source.source_channel, data_origin="external", # 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화 edit_url=normalized_url, review_status="approved", ai_domain=ident["ai_domain"], ai_sub_group=source_short, ai_tags=ident["ai_tags"], extract_meta=_build_extract_meta(source, pub_dt), ) session.add(doc) await session.flush() # summarize + embed + chunk 등록 (classify 불필요). # page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue. await _enqueue_processing(session, doc, source, pub_dt) count += 1 logger.info(f"[{source.name}] RSS → {count}건 수집") return count, "ok" async def _fetch_api(session, source: NewsSource) -> tuple[int, str]: """API 소스 디스패치 — feed_url 호스트로 제공자 판별 (B-2). 레거시 NYT 행(feed_url=api.nytimes.com)은 무변경 경로. 신규 제공자는 호스트 분기 추가. 미지의 호스트 = NYT 경로로 넘기지 않고 명시 실패 (silent fallback 금지). """ host = (urlparse(source.feed_url).hostname or "").lower() if host.endswith("guardianapis.com"): return await _fetch_api_guardian(session, source) if host.endswith("nytimes.com"): return await _fetch_api_nyt(session, source) raise FeedError(f"API 제공자 미등록 호스트: {host} — 디스패치 분기 추가 필요") def _guardian_request(feed_url: str, api_key: str) -> tuple[str, dict]: """Guardian 호출 형태 단일 source-of-truth — fixture 회귀 테스트 대상 (tests/fixtures/guardian_open_platform_search_response.json 박제 시 호출과 동일해야 함).""" parsed = urlparse(feed_url) params = { **dict(parse_qsl(parsed.query)), "show-fields": "bodyText,trailText", "page-size": "20", "order-by": "newest", "api-key": api_key, } return f"{parsed.scheme}://{parsed.netloc}{parsed.path}", params async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]: """Guardian Open Platform 수집 (B-2) — show-fields=bodyText 로 정식 전문 JSON. feed_url 에 section 쿼리를 박아 등록 (예: https://content.guardianapis.com/search?section=world). 전문이 API 로 오므로 fulltext stage 불요. 키 미설정 = FeedError (health 실패 기록, silent fallback 없음 — [[feedback_no_silent_fallback_explicit_opt_in]]). """ import os api_key = os.getenv("GUARDIAN_API_KEY", "") if not api_key: raise FeedError("GUARDIAN_API_KEY 미설정 — Guardian 수집 불가") endpoint, params = _guardian_request(source.feed_url, api_key) try: async with httpx.AsyncClient(timeout=15) as client: resp = await client.get(endpoint, params=params) resp.raise_for_status() except httpx.HTTPStatusError as e: # 쿼리스트링(api-key 포함) 제거 — path 까지만 로깅 (NYT 와 동일 규율) safe_url = str(e.request.url).split("?")[0] raise FeedError(f"Guardian API 실패: {e.response.status_code} @ {safe_url}") from e except httpx.RequestError as e: safe_url = str(e.request.url).split("?")[0] if e.request else "unknown" raise FeedError(f"Guardian API 연결 실패: {safe_url}") from e payload = resp.json().get("response", {}) if payload.get("status") != "ok": raise FeedError(f"Guardian API status={payload.get('status')}") count = 0 for item in payload.get("results", []): title = (item.get("webTitle") or "").strip() if not title: continue fields = item.get("fields") or {} body_text = (fields.get("bodyText") or "").strip() trail = _clean_html(fields.get("trailText") or "") # bodyText = plain text 전문 (HTML 정화 불요). 짧으면(라이브 블로그 잔재 등) trail 격하. is_full = len(body_text) >= 200 body = body_text if is_full else (trail or title) link = item.get("webUrl", "") pub_str = item.get("webPublicationDate", "") try: pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00")) except (ValueError, AttributeError): pub_dt = datetime.now(timezone.utc) article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name) normalized_url = _normalize_url(link) # RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first) existing = await session.execute( select(Document).where( (Document.file_hash == article_id) | (Document.edit_url.in_([normalized_url, link])) ).limit(1) ) if existing.scalars().first(): continue if await _is_portal_duplicate(session, title): logger.info(f"[{source.name}] portal-dup skip: {title[:60]}") continue category = _normalize_category(item.get("sectionName", source.category or "")) source_short = source.name.split(" ")[0] ident = _doc_identity(source, source_short, category) doc = Document( file_path=f"{ident['path_prefix']}/{source.name}/{article_id}", file_hash=article_id, file_format="article", file_size=len(body.encode()), file_type="note", title=title, extracted_text=f"{title}\n\n{body}", extracted_at=datetime.now(timezone.utc), extractor_version="guardian_api_full" if is_full else "guardian_api", md_status="skipped", md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", source_channel=source.source_channel, data_origin="external", edit_url=normalized_url, review_status="approved", ai_domain=ident["ai_domain"], ai_sub_group=source_short, ai_tags=ident["ai_tags"], extract_meta=_build_extract_meta(source, pub_dt), ) session.add(doc) await session.flush() await _enqueue_processing(session, doc, source, pub_dt) count += 1 logger.info(f"[{source.name}] API → {count}건 수집") return count, "ok" async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]: """NYT API 수집 — 키 마스킹 + health degradation""" import os nyt_key = os.getenv("NYT_API_KEY", "") if not nyt_key: raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가") try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get( f"https://api.nytimes.com/svc/topstories/v2/{source.category or 'world'}.json", params={"api-key": nyt_key}, ) resp.raise_for_status() except httpx.HTTPStatusError as e: # 쿼리스트링(api-key 포함) 제거 — path까지만 로깅 safe_url = str(e.request.url).split("?")[0] raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e except httpx.RequestError as e: safe_url = str(e.request.url).split("?")[0] if e.request else "unknown" raise FeedError(f"NYT API 연결 실패: {safe_url}") from e data = resp.json() count = 0 for article in data.get("results", []): title = article.get("title", "").strip() if not title: continue summary = _clean_html(article.get("abstract", "")) if not summary: summary = title link = article.get("url", "") pub_str = article.get("published_date", "") try: pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00")) except (ValueError, AttributeError): pub_dt = datetime.now(timezone.utc) article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name) normalized_url = _normalize_url(link) # RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first) existing = await session.execute( select(Document).where( (Document.file_hash == article_id) | (Document.edit_url.in_([normalized_url, link])) ).limit(1) ) if existing.scalars().first(): continue if await _is_portal_duplicate(session, title): logger.info(f"[{source.name}] portal-dup skip: {title[:60]}") continue category = _normalize_category(article.get("section", source.category or "")) source_short = source.name.split(" ")[0] ident = _doc_identity(source, source_short, category) doc = Document( file_path=f"{ident['path_prefix']}/{source.name}/{article_id}", file_hash=article_id, file_format="article", file_size=len(summary.encode()), file_type="note", title=title, extracted_text=f"{title}\n\n{summary}", extracted_at=datetime.now(timezone.utc), extractor_version="nyt_api", # article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라 # 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial # 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상). md_status="skipped", md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", source_channel=source.source_channel, data_origin="external", edit_url=normalized_url, review_status="approved", ai_domain=ident["ai_domain"], ai_sub_group=source_short, ai_tags=ident["ai_tags"], extract_meta=_build_extract_meta(source, pub_dt), ) session.add(doc) await session.flush() await _enqueue_processing(session, doc, source, pub_dt) count += 1 logger.info(f"[{source.name}] API → {count}건 수집") return count, "ok"