feat(news): crawl-24x7 A그룹 — 레지스트리 증축·조건부 GET·fulltext 승격·politeness·source_health

A-3 migrations 319-323 (news_sources 9컬럼 + source_channel 'crawl' + process_stage 'fulltext' + source_health)
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송)+콘텐츠 해시 변경감지, A-4 politeness 코어(per-domain 직렬+robots+정직UA),
A-2+A-7 fulltext_worker(4-tier 재사용·NAS crawl_raw gzip 보존·격하 경로·03:40 reconcile 안전망),
A-5 circuit breaker(3/10 임계, enabled 미터치), A-6 포털 전재 2차 dedup(제목+3일, 12자 게이트).
기존 소스 fulltext_policy='none' 기본 = 무회귀. plan crawl-24x7-1, 예외 박제 crawl-24x7-exec1-20260610.md
This commit is contained in:
hyungi
2026-06-10 13:03:31 +09:00
parent acd595244a
commit 7cd8cfde0a
15 changed files with 751 additions and 54 deletions
+174
View File
@@ -0,0 +1,174 @@
"""크롤링 politeness 코어 (A-4, plan crawl-24x7-1)
개인 아카이빙 권장치를 그대로 박은 공용 fetch 계층:
- per-domain 동시성 1 (asyncio.Lock) + 같은 도메인 연속 요청 515초 지연 + jitter
- robots.txt 존중 (urllib.robotparser, 24h 캐시) — 비로그인 공개 크롤링 한정.
로그인 세션 fetch (B-3) 는 사용자 행위 성격이라 robots 대신 사람 속도가 기준.
- 정직 식별 UA + 연락처 (익명 크롤링 트랙. 로그인 세션은 브라우저 UA 유지 — B-3)
- 429 = Retry-After 존중 / 5xx = 재시도 가능 / 403 = 차단 신호 (호출측 circuit 연동)
도메인별 마지막 요청 시각 등 rate 상태는 in-process (영속 워터마크는 DB — news_sources).
SSRF 차단은 core.url_validator.validate_feed_url 재사용 (redirect target 재검증 포함).
"""
import asyncio
import logging
import random
import time
import urllib.robotparser
from urllib.parse import urljoin, urlparse
import httpx
from core.url_validator import validate_feed_url
logger = logging.getLogger("crawl_politeness")
# 정직 식별 UA + 연락처 — 차단 전 연락 통로 (A-4)
CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.com)"
# 같은 도메인 연속 요청 간격 (초) — 권장치 515s + jitter
_DOMAIN_DELAY_MIN = 5.0
_DOMAIN_DELAY_MAX = 15.0
_ROBOTS_CACHE_TTL = 24 * 3600 # 24h
_MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap
_PAGE_TIMEOUT = 20.0
_MAX_REDIRECTS = 3
_HTML_CONTENT_TYPES = ("text/html", "application/xhtml+xml")
class CrawlFetchError(Exception):
"""일시 오류 (5xx / timeout / 네트워크) — 큐 재시도 대상."""
class CrawlBlocked(Exception):
"""차단 신호 (403 / 429 / robots disallow) — 재시도보다 backoff/circuit 대상."""
class CrawlSkip(Exception):
"""영구 비대상 (비-HTML / 크기 초과 / SSRF 차단 / 4xx) — 격하 처리 대상."""
# 도메인별 직렬화 상태 (in-process)
_domain_locks: dict[str, asyncio.Lock] = {}
_domain_last_request: dict[str, float] = {}
# host → (cached_at, RobotFileParser | None). None = robots 없음/4xx (전부 허용)
_robots_cache: dict[str, tuple[float, urllib.robotparser.RobotFileParser | None]] = {}
def _domain_of(url: str) -> str:
return (urlparse(url).hostname or "").lower()
def _get_lock(domain: str) -> asyncio.Lock:
if domain not in _domain_locks:
_domain_locks[domain] = asyncio.Lock()
return _domain_locks[domain]
async def _respect_domain_rate(domain: str) -> None:
"""같은 도메인 직전 요청에서 5–15초(jitter) 경과할 때까지 대기."""
last = _domain_last_request.get(domain)
if last is not None:
delay = random.uniform(_DOMAIN_DELAY_MIN, _DOMAIN_DELAY_MAX)
wait = last + delay - time.monotonic()
if wait > 0:
await asyncio.sleep(wait)
async def _fetch_robots(client: httpx.AsyncClient, scheme: str, host: str):
"""robots.txt 조회. 4xx/부재 = 전부 허용(None), 5xx/오류 = 보수적으로 이번 사이클 차단."""
robots_url = f"{scheme}://{host}/robots.txt"
try:
resp = await client.get(robots_url, headers={"User-Agent": CRAWL_UA})
except httpx.HTTPError as e:
raise CrawlFetchError(f"robots.txt 조회 실패: {host}: {e}") from e
if resp.status_code >= 500:
# 5xx 는 의도 불명 — 표준 관행대로 이번 사이클은 차단 취급
raise CrawlFetchError(f"robots.txt 5xx: {host}: {resp.status_code}")
if resp.status_code >= 400:
return None # robots 없음 = 전부 허용
rp = urllib.robotparser.RobotFileParser()
rp.parse(resp.text.splitlines())
return rp
async def _robots_allows(client: httpx.AsyncClient, url: str) -> bool:
parsed = urlparse(url)
host = (parsed.hostname or "").lower()
cached = _robots_cache.get(host)
if cached is None or time.monotonic() - cached[0] > _ROBOTS_CACHE_TTL:
rp = await _fetch_robots(client, parsed.scheme or "https", host)
_robots_cache[host] = (time.monotonic(), rp)
cached = _robots_cache[host]
rp = cached[1]
if rp is None:
return True
return rp.can_fetch(CRAWL_UA, url)
async def fetch_page(url: str, *, check_robots: bool = True) -> tuple[str, str]:
"""공개 페이지 1건 politeness fetch. (html_text, final_url) 반환.
- SSRF 검증 (redirect target 포함, news_collector 피드 fetch 와 동일 이중 검증)
- per-domain 동시성 1 + 515s jitter 지연
- 429: Retry-After 로그 후 CrawlBlocked / 403: CrawlBlocked / 그 외 4xx: CrawlSkip
- 5xx/timeout: CrawlFetchError (큐 재시도)
- 비-HTML content-type / 5MB 초과: CrawlSkip
"""
try:
validate_feed_url(url)
except ValueError as e:
raise CrawlSkip(f"URL 검증 실패: {e}") from e
domain = _domain_of(url)
async with _get_lock(domain):
await _respect_domain_rate(domain)
try:
async with httpx.AsyncClient(
timeout=_PAGE_TIMEOUT, follow_redirects=False,
headers={"User-Agent": CRAWL_UA},
) as client:
if check_robots and not await _robots_allows(client, url):
raise CrawlBlocked(f"robots.txt disallow: {url}")
resp = await client.get(url)
redirects = 0
while resp.is_redirect and redirects < _MAX_REDIRECTS:
location = urljoin(str(resp.request.url), resp.headers.get("location", ""))
try:
validate_feed_url(location)
except ValueError as e:
raise CrawlSkip(f"redirect target 차단: {e}") from e
# redirect 도 같은 도메인 연속 요청 — 간격은 lock 보유로 충분 (즉시 1회)
resp = await client.get(location)
redirects += 1
if resp.is_redirect:
raise CrawlSkip(f"redirect {_MAX_REDIRECTS}회 초과: {url}")
except httpx.TimeoutException as e:
raise CrawlFetchError(f"timeout: {url}") from e
except httpx.HTTPError as e:
raise CrawlFetchError(f"네트워크 오류: {url}: {e}") from e
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 429:
retry_after = resp.headers.get("retry-after", "")
logger.warning("[politeness] 429 %s (Retry-After=%s)", domain, retry_after or "-")
raise CrawlBlocked(f"429 rate limited: {url} (Retry-After={retry_after or '-'})")
if resp.status_code == 403:
raise CrawlBlocked(f"403 forbidden: {url}")
if resp.status_code >= 500:
raise CrawlFetchError(f"{resp.status_code}: {url}")
if resp.status_code >= 400:
raise CrawlSkip(f"{resp.status_code}: {url}")
ct = resp.headers.get("content-type", "").lower()
if ct and not any(t in ct for t in _HTML_CONTENT_TYPES):
raise CrawlSkip(f"비-HTML content-type: {ct}: {url}")
if len(resp.content) > _MAX_PAGE_BYTES:
raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}")
return resp.text, str(resp.request.url)
+4
View File
@@ -54,6 +54,7 @@ async def lifespan(app: FastAPI):
from workers.law_monitor import run as law_monitor_run from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run from workers.mailplus_archive import run as mailplus_run
from workers.news_collector import run as news_collector_run from workers.news_collector import run as news_collector_run
from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run
from workers.queue_consumer import consume_queue, consume_markdown_queue from workers.queue_consumer import consume_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue from workers.study_session_queue_consumer import consume_study_session_queue
@@ -121,6 +122,9 @@ async def lifespan(app: FastAPI):
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source. # 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness") scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector") scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
# crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로
# 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯.
scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile")
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산. # plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌). # soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile") scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
+1 -1
View File
@@ -118,7 +118,7 @@ class Document(Base):
source_channel: Mapped[str | None] = mapped_column( source_channel: Mapped[str | None] = mapped_column(
Enum("law_monitor", "devonagent", "email", "web_clip", Enum("law_monitor", "devonagent", "email", "web_clip",
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo", "tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
"voice", "hermes", "voice", "hermes", "crawl",
name="source_channel") name="source_channel")
) )
# 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타. # 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타.
+21 -1
View File
@@ -2,7 +2,8 @@
from datetime import datetime from datetime import datetime
from sqlalchemy import Boolean, DateTime, String, Text from sqlalchemy import Boolean, DateTime, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base from core.database import Base
@@ -23,3 +24,22 @@ class NewsSource(Base):
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now DateTime(timezone=True), default=datetime.now
) )
# ── A-3 (plan crawl-24x7-1) 레지스트리 증축 — migration 319 ──
# fetch_method: rss / rss+page / sitemap+page / page / api / signal-only
fetch_method: Mapped[str] = mapped_column(String(20), default="rss")
# fulltext_policy: none(현행) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
fulltext_policy: Mapped[str] = mapped_column(String(20), default="none")
# NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터 슬롯)
auth_profile: Mapped[str | None] = mapped_column(String(50))
# 소스별 차등 폴링 (NULL=전역 6h 사이클)
poll_interval_minutes: Mapped[int | None] = mapped_column(Integer)
# 조건부 GET 워터마크 — 서버가 준 값 그대로 저장·재전송 (A-1)
etag: Mapped[str | None] = mapped_column(Text)
last_modified: Mapped[str | None] = mapped_column(Text)
# CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 (A-1)
feed_content_hash: Mapped[str | None] = mapped_column(String(64))
# 추출 실패 잦은 소스의 site-specific CSS selector (A-2)
selector_override: Mapped[dict | None] = mapped_column(JSONB)
# rdf / table-strip / gn-redirect 등 파서 특이 케이스 (B-5)
parser_quirk: Mapped[str | None] = mapped_column(String(30))
+2 -1
View File
@@ -18,10 +18,11 @@ class ProcessingQueue(Base):
stage: Mapped[str] = mapped_column( stage: Mapped[str] = mapped_column(
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue. # 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue. # 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False. # DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum( Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview", "extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown", "stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage", name="process_stage",
create_type=False, create_type=False,
), ),
+36
View File
@@ -0,0 +1,36 @@
"""source_health 테이블 ORM (A-5, plan crawl-24x7-1)
news_sources 와 1:1. 소스별 fetch 성공/실패 기록 + circuit breaker 상태.
silent skip 누적 방지의 가시성 기반 — A-8 헬스 패널이 읽는다.
"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class SourceHealth(Base):
__tablename__ = "source_health"
id: Mapped[int] = mapped_column(primary_key=True)
source_id: Mapped[int] = mapped_column(
Integer, ForeignKey("news_sources.id", ondelete="CASCADE"), nullable=False
)
consecutive_failures: Mapped[int] = mapped_column(Integer, default=0)
total_fetches: Mapped[int] = mapped_column(BigInteger, default=0)
total_failures: Mapped[int] = mapped_column(BigInteger, default=0)
last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_error: Mapped[str | None] = mapped_column(Text)
last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_fetch_items: Mapped[int | None] = mapped_column(Integer)
# 200 인데 entries 0 인 연속 fetch 횟수 (304/해시동일은 미집계 — 피드 부패 신호 전용)
empty_streak: Mapped[int] = mapped_column(Integer, default=0)
# closed(정상) / open(연속 실패 → 지수 backoff) / disabled(임계 초과, 수동 복구 대상)
circuit_state: Mapped[str] = mapped_column(String(10), default="closed")
circuit_opened_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
+5 -2
View File
@@ -17,10 +17,13 @@ python-multipart>=0.0.9
jinja2>=3.1.0 jinja2>=3.1.0
feedparser>=6.0.0 feedparser>=6.0.0
pymupdf>=1.24.0 pymupdf>=1.24.0
# Web/Blog ingest (devonagent 트랙) — HTML 본문 정화 4-tier fallback # Web/Blog ingest (devonagent 트랙) + 뉴스 fulltext 승격 (crawl-24x7 A-2) — 4-tier fallback.
trafilatura>=1.12.0 # trafilatura 는 단일 메인테이너 리스크로 exact pin (A-2 결정).
trafilatura==2.1.0
readability-lxml>=0.8.1 readability-lxml>=0.8.1
markdownify>=0.13.1 markdownify>=0.13.1
# tier-4 (bs4) 가 직접 import — 전이 의존 가정 제거 (crawl-24x7 A-2)
beautifulsoup4>=4.12.0
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1). # office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1).
# 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정. # 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정.
markitdown[docx,xlsx,pptx]>=0.1.0 markitdown[docx,xlsx,pptx]>=0.1.0
+218
View File
@@ -0,0 +1,218 @@
"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1)
news_collector 가 fulltext_policy='page' 소스의 기사에 enqueue 한 'fulltext' stage 를 소비:
기사 페이지 politeness fetch (A-4) → 원본 HTML NAS gzip 보존 (A-7)
→ extract_worker 4-tier 재사용 (tier 2 sibling .md 는 디스크 원본이 없어 비적용)
→ extracted_text/md_content 승격 → summarize + (30일 게이트) embed/chunk enqueue.
실패 처리 (큐 어휘 = DB enum, 분기만 워커):
- 일시 오류 (5xx/timeout) : raise → 큐 재시도 (max_attempts 3)
- 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 후 완료
→ summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 에 기록.
- 영구 실패 (3회 소진) : 야간 reconcile_unresolved() 가 summarize 안전망 enqueue
([[feedback_silent_skip_accumulation]] — 조건부 skip 이 영구 침묵으로 누적되지 않게).
승격 게이트: 전 tier 공통 본문 >= 200자 (devonagent 와 달리 tier 4 도 게이트 적용 —
페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다).
"""
import gzip
import hashlib
import re
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue, enqueue_stage
from workers.extract_worker import (
_WEB_MIN_BODY_LEN,
_extract_web_with_bs4,
_extract_web_with_readability,
_extract_web_with_trafilatura,
)
logger = setup_logger("fulltext_worker")
# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거
_FOOTER_PATTERNS = [
re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M),
re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인
re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인
]
def _strip_article_footer(body: str) -> str:
for pat in _FOOTER_PATTERNS:
body = pat.sub("", body)
return re.sub(r"\n{3,}", "\n\n", body).strip()
def _extract_body(html_text: str) -> tuple[str, str | None, str | None]:
"""(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None)."""
body, ver = _extract_web_with_trafilatura(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "trafilatura", ver
body, ver = _extract_web_with_readability(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "readability", ver
body, ver = _extract_web_with_bs4(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "bs4_text", ver
return "", None, None
def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path:
"""A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용."""
src_dir = f"src_{source_id}" if source_id is not None else "src_unknown"
return (
Path(settings.nas_mount_path) / "crawl_raw" / src_dir
/ now.strftime("%Y-%m") / f"{file_hash}.html.gz"
)
def _save_raw_html(path: Path, html_text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(path, "wb") as f:
f.write(html_text.encode("utf-8", errors="replace"))
async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None:
"""승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk."""
await enqueue_stage(session, doc.id, "summarize")
published_raw = (doc.extract_meta or {}).get("published_at")
days_old = 0
if published_raw:
try:
pub_dt = datetime.fromisoformat(published_raw)
days_old = (datetime.now(timezone.utc) - pub_dt).days
except ValueError:
days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일)
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _set_fulltext_meta(doc: Document, **fields) -> None:
"""extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당."""
meta = dict(doc.extract_meta or {})
meta["fulltext"] = {**meta.get("fulltext", {}), **fields}
doc.extract_meta = meta
async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None:
"""본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0)."""
_set_fulltext_meta(
doc, status="degraded", reason=reason[:300],
resolved_at=datetime.now(timezone.utc).isoformat(),
)
await _enqueue_downstream(session, doc)
logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}")
async def process(document_id: int, session: AsyncSession) -> None:
"""기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가)."""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.edit_url:
await _degrade(session, doc, "edit_url 없음")
return
meta = doc.extract_meta or {}
source_id = meta.get("source_id")
try:
html_text, final_url = await fetch_page(doc.edit_url)
except (CrawlBlocked, CrawlSkip) as e:
await _degrade(session, doc, f"{type(e).__name__}: {e}")
return
except CrawlFetchError:
raise # 일시 오류 — 큐 재시도
now = datetime.now(timezone.utc)
# A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지)
raw_path = _raw_html_path(source_id, doc.file_hash, now)
try:
_save_raw_html(raw_path, html_text)
raw_saved = True
except OSError as e:
# NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피)
raw_saved = False
logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}")
body, engine, engine_ver = _extract_body(html_text)
if not engine:
await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)")
return
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
await _degrade(session, doc, "푸터 제거 후 본문 부족")
return
title = doc.title or ""
doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body
doc.extracted_at = now
doc.extractor_version = f"rss+page@{engine}"
doc.md_content = clean_body
doc.md_status = "success"
doc.md_extraction_engine = engine
doc.md_extraction_engine_version = engine_ver
doc.md_format_version = "1.0"
doc.md_generated_at = now
doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest()
doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest()
doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제
doc.content_origin = "extracted"
doc.file_size = len(doc.extracted_text.encode())
_set_fulltext_meta(
doc, status="promoted", engine=engine,
raw_html_path=str(raw_path) if raw_saved else None,
final_url=final_url, body_chars=len(clean_body),
resolved_at=now.isoformat(),
)
await _enqueue_downstream(session, doc)
logger.info(
f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 "
f"(raw={'saved' if raw_saved else 'MISSING'})"
)
async def reconcile_unresolved() -> None:
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치."""
async with async_session() as session:
summarize_q = (
select(ProcessingQueue.id)
.where(
ProcessingQueue.document_id == Document.id,
ProcessingQueue.stage == "summarize",
)
)
result = await session.execute(
select(Document)
.join(ProcessingQueue, ProcessingQueue.document_id == Document.id)
.where(
ProcessingQueue.stage == "fulltext",
ProcessingQueue.status == "failed",
Document.source_channel == "news",
~exists(summarize_q),
)
.limit(200)
)
docs = result.scalars().unique().all()
for doc in docs:
_set_fulltext_meta(doc, status="failed_reconciled")
await _enqueue_downstream(session, doc)
if docs:
await session.commit()
logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue")
+235 -47
View File
@@ -1,8 +1,15 @@
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장""" """뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장
plan crawl-24x7-1 A그룹 (2026-06-10):
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지
A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임
A-5 source_health 기록 + circuit breaker (소스별 실패 격리)
A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정)
"""
import hashlib import hashlib
import re import re
from datetime import datetime, timezone from datetime import datetime, timedelta, timezone
from html import unescape from html import unescape
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
@@ -10,11 +17,13 @@ import feedparser
import httpx import httpx
from sqlalchemy import select from sqlalchemy import select
from core.crawl_politeness import CRAWL_UA
from core.database import async_session from core.database import async_session
from core.utils import setup_logger from core.utils import setup_logger
from models.document import Document from models.document import Document
from models.news_source import NewsSource from models.news_source import NewsSource
from models.queue import enqueue_stage from models.queue import enqueue_stage
from models.source_health import SourceHealth
logger = setup_logger("news_collector") logger = setup_logger("news_collector")
@@ -38,18 +47,23 @@ CATEGORY_MAP = {
} }
class FeedError(Exception):
"""소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록."""
def _normalize_category(raw: str) -> str: def _normalize_category(raw: str) -> str:
"""카테고리 표준화""" """카테고리 표준화"""
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other")) return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
def _clean_html(text: str) -> str: def _clean_html(text: str, max_len: int | None = 1000) -> str:
"""HTML 태그 제거 + 정제""" """HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용)."""
if not text: if not text:
return "" return ""
text = re.sub(r"<[^>]+>", "", text) text = re.sub(r"<[^>]+>", "", text)
text = unescape(text) text = unescape(text)
return text.strip()[:1000] text = text.strip()
return text if max_len is None else text[:max_len]
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키 # tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
@@ -87,8 +101,93 @@ def _normalize_to_utc(dt) -> datetime:
return datetime.now(timezone.utc) return datetime.now(timezone.utc)
# ── A-5: circuit breaker 정책 ──
# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h)
# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상)
# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리.
_CIRCUIT_OPEN_AFTER = 3
_CIRCUIT_DISABLE_AFTER = 10
_BACKOFF_BASE_HOURS = 6
_BACKOFF_CAP_HOURS = 48
_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고
def _should_attempt(health: SourceHealth, now: datetime) -> bool:
"""circuit 상태에 따라 이번 사이클 fetch 여부 결정.
주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이
open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면
배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다.
"""
if health.circuit_state == "disabled":
return False
if health.circuit_state == "open" and health.last_error_at is not None:
over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0)
backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS)
if now - health.last_error_at < timedelta(hours=backoff_h):
return False
return True
def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None:
health.consecutive_failures = 0
health.total_fetches += 1
health.last_success_at = now
health.last_fetch_items = items
if health.circuit_state != "closed":
logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed")
health.circuit_state = "closed"
health.circuit_opened_at = None
# 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시)
if not_modified:
pass
elif items == 0:
health.empty_streak += 1
if health.empty_streak >= _EMPTY_STREAK_ALERT:
logger.error(
f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 "
f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)"
)
else:
health.empty_streak = 0
health.updated_at = now
def _record_failure(health: SourceHealth, error: str, now: datetime) -> None:
health.consecutive_failures += 1
health.total_fetches += 1
health.total_failures += 1
health.last_error = error[:500]
health.last_error_at = now
health.updated_at = now
cf = health.consecutive_failures
if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled":
health.circuit_state = "disabled"
logger.error(
f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED "
f"(수집 제외, A-8 패널에서 수동 복구 필요)"
)
elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed":
health.circuit_state = "open"
health.circuit_opened_at = now
logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open")
async def _get_or_create_health(session, source_id: int) -> SourceHealth:
result = await session.execute(
select(SourceHealth).where(SourceHealth.source_id == source_id)
)
health = result.scalars().first()
if health is None:
health = SourceHealth(source_id=source_id)
session.add(health)
await session.flush()
return health
async def run(): async def run():
"""뉴스 수집 실행""" """뉴스 수집 실행"""
now = datetime.now(timezone.utc)
async with async_session() as session: async with async_session() as session:
result = await session.execute( result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True) select(NewsSource).where(NewsSource.enabled == True)
@@ -101,17 +200,23 @@ async def run():
total = 0 total = 0
for source in sources: for source in sources:
health = await _get_or_create_health(session, source.id)
if not _should_attempt(health, now):
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
continue
try: try:
if source.feed_type == "api": if source.feed_type == "api":
count = await _fetch_api(session, source) count, status = await _fetch_api(session, source)
else: else:
count = await _fetch_rss(session, source) count, status = await _fetch_rss(session, source)
source.last_fetched_at = datetime.now(timezone.utc) source.last_fetched_at = datetime.now(timezone.utc)
_record_success(health, count, status == "not_modified", now)
total += count total += count
except Exception as e: except Exception as e:
logger.error(f"[{source.name}] 수집 실패: {e}") logger.error(f"[{source.name}] 수집 실패: {e}")
source.last_fetched_at = datetime.now(timezone.utc) source.last_fetched_at = datetime.now(timezone.utc)
_record_failure(health, str(e) or repr(e), now)
await session.commit() await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규") logger.info(f"뉴스 수집 완료: {total}건 신규")
@@ -122,8 +227,58 @@ ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
"application/xml", "text/xml") "application/xml", "text/xml")
async def _fetch_rss(session, source: NewsSource) -> int: async def _is_portal_duplicate(session, title: str) -> bool:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한""" """A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스.
보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘).
범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피).
"""
if len(title) < 12:
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=3)
dup = await session.execute(
select(Document.id).where(
Document.title == title,
Document.source_channel == "news",
Document.file_format == "article",
Document.extracted_at >= cutoff,
).limit(1)
)
return dup.scalars().first() is not None
async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None:
"""후속 단계 enqueue.
fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는
fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트
도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피).
"""
if source.fulltext_policy == "page" and doc.edit_url:
await enqueue_stage(session, doc.id, "fulltext")
return
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict:
"""fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록)."""
return {
"source_id": source.id,
"source_name": source.name,
"published_at": pub_dt.isoformat(),
}
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일.
소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록.
"""
from urllib.parse import urljoin from urllib.parse import urljoin
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
@@ -134,17 +289,24 @@ async def _fetch_rss(session, source: NewsSource) -> int:
# 순수 HTTP 소스인데 allowlist에 없으면 차단 # 순수 HTTP 소스인데 allowlist에 없으면 차단
if source.feed_url.startswith("http://") and not http_allowed: if source.feed_url.startswith("http://") and not http_allowed:
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}") raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}")
return 0
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비) # fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
try: try:
validate_feed_url(source.feed_url, allow_http=http_allowed) validate_feed_url(source.feed_url, allow_http=http_allowed)
except ValueError as e: except ValueError as e:
logger.error(f"[{source.name}] URL 검증 실패: {e}") raise FeedError(f"URL 검증 실패: {e}") from e
return 0
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client: # A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송
headers = {"User-Agent": CRAWL_UA}
if source.etag:
headers["If-None-Match"] = source.etag
if source.last_modified:
headers["If-Modified-Since"] = source.last_modified
async with httpx.AsyncClient(
timeout=10, follow_redirects=False, headers=headers
) as client:
resp = await client.get(source.feed_url) resp = await client.get(source.feed_url)
# redirect 수동 처리 (최대 3회, 각 target 재검증) # redirect 수동 처리 (최대 3회, 각 target 재검증)
@@ -156,29 +318,41 @@ async def _fetch_rss(session, source: NewsSource) -> int:
try: try:
validate_feed_url(location, allow_http=http_allowed) validate_feed_url(location, allow_http=http_allowed)
except ValueError as e: except ValueError as e:
logger.error(f"[{source.name}] redirect target 차단: {e}") raise FeedError(f"redirect target 차단: {e}") from e
return 0
resp = await client.get(location) resp = await client.get(location)
redirects += 1 redirects += 1
if resp.is_redirect: if resp.is_redirect:
logger.error(f"[{source.name}] redirect 3회 초과") raise FeedError("redirect 3회 초과")
return 0
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송")
return 0, "not_modified"
resp.raise_for_status() resp.raise_for_status()
if len(resp.content) > MAX_RESPONSE_SIZE: if len(resp.content) > MAX_RESPONSE_SIZE:
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes") raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes")
return 0
ct = resp.headers.get("content-type", "").lower() ct = resp.headers.get("content-type", "").lower()
if not any(t in ct for t in ALLOWED_CONTENT_TYPES): if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
logger.warning(f"[{source.name}] 비정상 content-type: {ct}") raise FeedError(f"비정상 content-type: {ct}")
return 0
# A-1: 워터마크 갱신 + 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행)
new_etag = resp.headers.get("etag")
new_last_modified = resp.headers.get("last-modified")
if new_etag:
source.etag = new_etag
if new_last_modified:
source.last_modified = new_last_modified
content_hash = hashlib.sha256(resp.content).hexdigest()
if source.feed_content_hash == content_hash:
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
return 0, "not_modified"
source.feed_content_hash = content_hash
feed = feedparser.parse(resp.text) feed = feedparser.parse(resp.text)
if feed.bozo and not feed.entries: if feed.bozo and not feed.entries:
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}") raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
return 0
count = 0 count = 0
for entry in feed.entries: for entry in feed.entries:
@@ -190,6 +364,18 @@ async def _fetch_rss(session, source: NewsSource) -> int:
if not summary: if not summary:
summary = title summary = title
# A-6: feed-full 소스만 피드 본문을 전문으로 신뢰 (truncate·광고 삽입이 흔해
# 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지)
body = summary
is_feed_full = False
if source.fulltext_policy == "feed-full":
content_list = entry.get("content") or []
raw_body = content_list[0].get("value", "") if content_list else ""
full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None)
if len(full_body) > len(summary):
body = full_body
is_feed_full = True
link = entry.get("link", "") link = entry.get("link", "")
published = entry.get("published_parsed") or entry.get("updated_parsed") published = entry.get("published_parsed") or entry.get("updated_parsed")
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc) pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
@@ -209,6 +395,11 @@ async def _fetch_rss(session, source: NewsSource) -> int:
if existing.scalars().first(): if existing.scalars().first():
continue continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(source.category or "") category = _normalize_category(source.category or "")
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문" source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
@@ -216,15 +407,16 @@ async def _fetch_rss(session, source: NewsSource) -> int:
file_path=f"news/{source.name}/{article_id}", file_path=f"news/{source.name}/{article_id}",
file_hash=article_id, file_hash=article_id,
file_format="article", file_format="article",
file_size=len(summary.encode()), file_size=len(body.encode()),
file_type="note", file_type="note",
title=title, title=title,
extracted_text=f"{title}\n\n{summary}", extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc), extracted_at=datetime.now(timezone.utc),
extractor_version="rss", extractor_version="rss-feed-full" if is_feed_full else "rss",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라 # article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial # 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상). # 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
md_status="skipped", md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel="news", source_channel="news",
@@ -235,30 +427,27 @@ async def _fetch_rss(session, source: NewsSource) -> int:
ai_domain="News", ai_domain="News",
ai_sub_group=source_short, ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"], ai_tags=[f"News/{source_short}/{category}"],
extract_meta=_build_extract_meta(source, pub_dt),
) )
session.add(doc) session.add(doc)
await session.flush() await session.flush()
# summarize + embed + chunk 등록 (classify 불필요) # summarize + embed + chunk 등록 (classify 불필요).
await enqueue_stage(session, doc.id, "summarize") # page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue.
days_old = (datetime.now(timezone.utc) - pub_dt).days await _enqueue_processing(session, doc, source, pub_dt)
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
count += 1 count += 1
logger.info(f"[{source.name}] RSS → {count}건 수집") logger.info(f"[{source.name}] RSS → {count}건 수집")
return count return count, "ok"
async def _fetch_api(session, source: NewsSource) -> int: async def _fetch_api(session, source: NewsSource) -> tuple[int, str]:
"""NYT API 수집 — 키 마스킹 + health degradation""" """NYT API 수집 — 키 마스킹 + health degradation"""
import os import os
nyt_key = os.getenv("NYT_API_KEY", "") nyt_key = os.getenv("NYT_API_KEY", "")
if not nyt_key: if not nyt_key:
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가") raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
return 0
try: try:
async with httpx.AsyncClient(timeout=10) as client: async with httpx.AsyncClient(timeout=10) as client:
@@ -270,12 +459,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅 # 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
safe_url = str(e.request.url).split("?")[0] safe_url = str(e.request.url).split("?")[0]
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}") raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e
return 0
except httpx.RequestError as e: except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown" safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
logger.error(f"NYT API 연결 실패: {safe_url}") raise FeedError(f"NYT API 연결 실패: {safe_url}") from e
return 0
data = resp.json() data = resp.json()
count = 0 count = 0
@@ -309,6 +496,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
if existing.scalars().first(): if existing.scalars().first():
continue continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(article.get("section", source.category or "")) category = _normalize_category(article.get("section", source.category or ""))
source_short = source.name.split(" ")[0] source_short = source.name.split(" ")[0]
@@ -334,17 +525,14 @@ async def _fetch_api(session, source: NewsSource) -> int:
ai_domain="News", ai_domain="News",
ai_sub_group=source_short, ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"], ai_tags=[f"News/{source_short}/{category}"],
extract_meta=_build_extract_meta(source, pub_dt),
) )
session.add(doc) session.add(doc)
await session.flush() await session.flush()
await enqueue_stage(session, doc.id, "summarize") await _enqueue_processing(session, doc, source, pub_dt)
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
count += 1 count += 1
logger.info(f"[{source.name}] API → {count}건 수집") logger.info(f"[{source.name}] API → {count}건 수집")
return count return count, "ok"
+9 -2
View File
@@ -22,8 +22,11 @@ logger = setup_logger("queue_consumer")
# stage별 배치 크기 # stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1} "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10 STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다. # markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분 # marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -35,7 +38,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). # STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [ MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize", "embed", "chunk", "extract", "classify", "summarize", "embed", "chunk",
"preview", "stt", "thumbnail", "deep_summary", "preview", "stt", "thumbnail", "deep_summary", "fulltext",
] ]
MARKDOWN_QUEUE_STAGES = ["markdown"] MARKDOWN_QUEUE_STAGES = ["markdown"]
@@ -179,6 +182,7 @@ def _load_workers():
from workers.summarize_worker import process as summarize_process from workers.summarize_worker import process as summarize_process
from workers.thumbnail_worker import process as thumbnail_process from workers.thumbnail_worker import process as thumbnail_process
from workers.marker_worker import process as marker_process from workers.marker_worker import process as marker_process
from workers.fulltext_worker import process as fulltext_process
return { return {
"extract": extract_process, "extract": extract_process,
@@ -195,6 +199,9 @@ def _load_workers():
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립). # Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록). # consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
"markdown": marker_process, "markdown": marker_process,
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
"fulltext": fulltext_process,
} }
@@ -0,0 +1,19 @@
-- A-3 (plan crawl-24x7-1): 소스 레지스트리 증축 — additive only.
-- fetch_method : rss / rss+page / sitemap+page / page / api / signal-only
-- fulltext_policy : none(현행 유지) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
-- auth_profile : NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터용 슬롯)
-- poll_interval_minutes : 소스별 차등 폴링 (NULL=전역 6h 사이클)
-- etag / last_modified : 조건부 GET 워터마크 — 받은 그대로 저장·재전송 (상태는 전부 DB, APScheduler in-process)
-- feed_content_hash : CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행
-- selector_override : 추출 실패 잦은 소스의 site-specific CSS selector (JSONB)
-- parser_quirk : rdf / table-strip / gn-redirect 등 파서 특이 케이스
ALTER TABLE news_sources
ADD COLUMN IF NOT EXISTS fetch_method VARCHAR(20) NOT NULL DEFAULT 'rss',
ADD COLUMN IF NOT EXISTS fulltext_policy VARCHAR(20) NOT NULL DEFAULT 'none',
ADD COLUMN IF NOT EXISTS auth_profile VARCHAR(50),
ADD COLUMN IF NOT EXISTS poll_interval_minutes INTEGER,
ADD COLUMN IF NOT EXISTS etag TEXT,
ADD COLUMN IF NOT EXISTS last_modified TEXT,
ADD COLUMN IF NOT EXISTS feed_content_hash VARCHAR(64),
ADD COLUMN IF NOT EXISTS selector_override JSONB,
ADD COLUMN IF NOT EXISTS parser_quirk VARCHAR(30);
@@ -0,0 +1,3 @@
-- 0-5 (a) 확정 (plan crawl-24x7-1): 도메인 자료(안전/공학/철학) 채널 신설 — news 와 분리.
-- 신규 값은 같은 트랜잭션 내 사용 금지 (PG 제약) — 본 배치의 다른 마이그레이션은 'crawl' 미사용.
ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'crawl';
@@ -0,0 +1,3 @@
-- A-2 (plan crawl-24x7-1): RSS 요약 → 기사 페이지 fetch → 4-tier 본문 승격 stage.
-- fulltext_policy='page' 소스의 기사에만 news_collector 가 enqueue.
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'fulltext';
+19
View File
@@ -0,0 +1,19 @@
-- A-5 (plan crawl-24x7-1): 소스 건강 — 소스별 실패 격리 기록 + circuit breaker.
-- 한 소스가 죽어도 나머지 영향 0. silent skip 누적 방지의 가시성 기반 (A-8 패널이 읽음).
-- circuit_state: closed(정상) / open(연속 실패로 지수 backoff 중) / disabled(M회 초과, 수동 복구 대상)
-- empty_streak : 200 인데 entries 0 인 연속 fetch 횟수 (피드 부패 감시 — 304/해시동일은 미집계)
CREATE TABLE IF NOT EXISTS source_health (
id SERIAL PRIMARY KEY,
source_id INTEGER NOT NULL REFERENCES news_sources(id) ON DELETE CASCADE,
consecutive_failures INTEGER NOT NULL DEFAULT 0,
total_fetches BIGINT NOT NULL DEFAULT 0,
total_failures BIGINT NOT NULL DEFAULT 0,
last_success_at TIMESTAMPTZ,
last_error TEXT,
last_error_at TIMESTAMPTZ,
last_fetch_items INTEGER,
empty_streak INTEGER NOT NULL DEFAULT 0,
circuit_state VARCHAR(10) NOT NULL DEFAULT 'closed',
circuit_opened_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@@ -0,0 +1,2 @@
-- A-5: source_health 는 news_sources 와 1:1 — upsert 기준 키.
CREATE UNIQUE INDEX IF NOT EXISTS uq_source_health_source_id ON source_health (source_id);