From 7cd8cfde0af2ecb201274d4160f55365bba7cdeb Mon Sep 17 00:00:00 2001 From: hyungi Date: Wed, 10 Jun 2026 13:03:31 +0900 Subject: [PATCH] =?UTF-8?q?feat(news):=20crawl-24x7=20A=EA=B7=B8=EB=A3=B9?= =?UTF-8?q?=20=E2=80=94=20=EB=A0=88=EC=A7=80=EC=8A=A4=ED=8A=B8=EB=A6=AC=20?= =?UTF-8?q?=EC=A6=9D=EC=B6=95=C2=B7=EC=A1=B0=EA=B1=B4=EB=B6=80=20GET=C2=B7?= =?UTF-8?q?fulltext=20=EC=8A=B9=EA=B2=A9=C2=B7politeness=C2=B7source=5Fhea?= =?UTF-8?q?lth?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A-3 migrations 319-323 (news_sources 9컬럼 + source_channel 'crawl' + process_stage 'fulltext' + source_health) A-1 조건부 GET(ETag/Last-Modified 그대로 재전송)+콘텐츠 해시 변경감지, A-4 politeness 코어(per-domain 직렬+robots+정직UA), A-2+A-7 fulltext_worker(4-tier 재사용·NAS crawl_raw gzip 보존·격하 경로·03:40 reconcile 안전망), A-5 circuit breaker(3/10 임계, enabled 미터치), A-6 포털 전재 2차 dedup(제목+3일, 12자 게이트). 기존 소스 fulltext_policy='none' 기본 = 무회귀. plan crawl-24x7-1, 예외 박제 crawl-24x7-exec1-20260610.md --- app/core/crawl_politeness.py | 174 +++++++++++ app/main.py | 4 + app/models/document.py | 2 +- app/models/news_source.py | 22 +- app/models/queue.py | 3 +- app/models/source_health.py | 36 +++ app/requirements.txt | 7 +- app/workers/fulltext_worker.py | 218 ++++++++++++++ app/workers/news_collector.py | 282 +++++++++++++++--- app/workers/queue_consumer.py | 11 +- migrations/319_news_sources_crawl_columns.sql | 19 ++ migrations/320_source_channel_add_crawl.sql | 3 + migrations/321_process_stage_add_fulltext.sql | 3 + migrations/322_source_health_table.sql | 19 ++ migrations/323_source_health_source_id_uq.sql | 2 + 15 files changed, 751 insertions(+), 54 deletions(-) create mode 100644 app/core/crawl_politeness.py create mode 100644 app/models/source_health.py create mode 100644 app/workers/fulltext_worker.py create mode 100644 migrations/319_news_sources_crawl_columns.sql create mode 100644 migrations/320_source_channel_add_crawl.sql create mode 100644 migrations/321_process_stage_add_fulltext.sql create mode 100644 migrations/322_source_health_table.sql create mode 100644 migrations/323_source_health_source_id_uq.sql diff --git a/app/core/crawl_politeness.py b/app/core/crawl_politeness.py new file mode 100644 index 0000000..d4f6df2 --- /dev/null +++ b/app/core/crawl_politeness.py @@ -0,0 +1,174 @@ +"""크롤링 politeness 코어 (A-4, plan crawl-24x7-1) + +개인 아카이빙 권장치를 그대로 박은 공용 fetch 계층: +- per-domain 동시성 1 (asyncio.Lock) + 같은 도메인 연속 요청 5–15초 지연 + jitter +- robots.txt 존중 (urllib.robotparser, 24h 캐시) — 비로그인 공개 크롤링 한정. + 로그인 세션 fetch (B-3) 는 사용자 행위 성격이라 robots 대신 사람 속도가 기준. +- 정직 식별 UA + 연락처 (익명 크롤링 트랙. 로그인 세션은 브라우저 UA 유지 — B-3) +- 429 = Retry-After 존중 / 5xx = 재시도 가능 / 403 = 차단 신호 (호출측 circuit 연동) + +도메인별 마지막 요청 시각 등 rate 상태는 in-process (영속 워터마크는 DB — news_sources). +SSRF 차단은 core.url_validator.validate_feed_url 재사용 (redirect target 재검증 포함). +""" + +import asyncio +import logging +import random +import time +import urllib.robotparser +from urllib.parse import urljoin, urlparse + +import httpx + +from core.url_validator import validate_feed_url + +logger = logging.getLogger("crawl_politeness") + +# 정직 식별 UA + 연락처 — 차단 전 연락 통로 (A-4) +CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.com)" + +# 같은 도메인 연속 요청 간격 (초) — 권장치 5–15s + jitter +_DOMAIN_DELAY_MIN = 5.0 +_DOMAIN_DELAY_MAX = 15.0 + +_ROBOTS_CACHE_TTL = 24 * 3600 # 24h +_MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap +_PAGE_TIMEOUT = 20.0 +_MAX_REDIRECTS = 3 + +_HTML_CONTENT_TYPES = ("text/html", "application/xhtml+xml") + + +class CrawlFetchError(Exception): + """일시 오류 (5xx / timeout / 네트워크) — 큐 재시도 대상.""" + + +class CrawlBlocked(Exception): + """차단 신호 (403 / 429 / robots disallow) — 재시도보다 backoff/circuit 대상.""" + + +class CrawlSkip(Exception): + """영구 비대상 (비-HTML / 크기 초과 / SSRF 차단 / 4xx) — 격하 처리 대상.""" + + +# 도메인별 직렬화 상태 (in-process) +_domain_locks: dict[str, asyncio.Lock] = {} +_domain_last_request: dict[str, float] = {} +# host → (cached_at, RobotFileParser | None). None = robots 없음/4xx (전부 허용) +_robots_cache: dict[str, tuple[float, urllib.robotparser.RobotFileParser | None]] = {} + + +def _domain_of(url: str) -> str: + return (urlparse(url).hostname or "").lower() + + +def _get_lock(domain: str) -> asyncio.Lock: + if domain not in _domain_locks: + _domain_locks[domain] = asyncio.Lock() + return _domain_locks[domain] + + +async def _respect_domain_rate(domain: str) -> None: + """같은 도메인 직전 요청에서 5–15초(jitter) 경과할 때까지 대기.""" + last = _domain_last_request.get(domain) + if last is not None: + delay = random.uniform(_DOMAIN_DELAY_MIN, _DOMAIN_DELAY_MAX) + wait = last + delay - time.monotonic() + if wait > 0: + await asyncio.sleep(wait) + + +async def _fetch_robots(client: httpx.AsyncClient, scheme: str, host: str): + """robots.txt 조회. 4xx/부재 = 전부 허용(None), 5xx/오류 = 보수적으로 이번 사이클 차단.""" + robots_url = f"{scheme}://{host}/robots.txt" + try: + resp = await client.get(robots_url, headers={"User-Agent": CRAWL_UA}) + except httpx.HTTPError as e: + raise CrawlFetchError(f"robots.txt 조회 실패: {host}: {e}") from e + if resp.status_code >= 500: + # 5xx 는 의도 불명 — 표준 관행대로 이번 사이클은 차단 취급 + raise CrawlFetchError(f"robots.txt 5xx: {host}: {resp.status_code}") + if resp.status_code >= 400: + return None # robots 없음 = 전부 허용 + rp = urllib.robotparser.RobotFileParser() + rp.parse(resp.text.splitlines()) + return rp + + +async def _robots_allows(client: httpx.AsyncClient, url: str) -> bool: + parsed = urlparse(url) + host = (parsed.hostname or "").lower() + cached = _robots_cache.get(host) + if cached is None or time.monotonic() - cached[0] > _ROBOTS_CACHE_TTL: + rp = await _fetch_robots(client, parsed.scheme or "https", host) + _robots_cache[host] = (time.monotonic(), rp) + cached = _robots_cache[host] + rp = cached[1] + if rp is None: + return True + return rp.can_fetch(CRAWL_UA, url) + + +async def fetch_page(url: str, *, check_robots: bool = True) -> tuple[str, str]: + """공개 페이지 1건 politeness fetch. (html_text, final_url) 반환. + + - SSRF 검증 (redirect target 포함, news_collector 피드 fetch 와 동일 이중 검증) + - per-domain 동시성 1 + 5–15s jitter 지연 + - 429: Retry-After 로그 후 CrawlBlocked / 403: CrawlBlocked / 그 외 4xx: CrawlSkip + - 5xx/timeout: CrawlFetchError (큐 재시도) + - 비-HTML content-type / 5MB 초과: CrawlSkip + """ + try: + validate_feed_url(url) + except ValueError as e: + raise CrawlSkip(f"URL 검증 실패: {e}") from e + + domain = _domain_of(url) + async with _get_lock(domain): + await _respect_domain_rate(domain) + try: + async with httpx.AsyncClient( + timeout=_PAGE_TIMEOUT, follow_redirects=False, + headers={"User-Agent": CRAWL_UA}, + ) as client: + if check_robots and not await _robots_allows(client, url): + raise CrawlBlocked(f"robots.txt disallow: {url}") + + resp = await client.get(url) + redirects = 0 + while resp.is_redirect and redirects < _MAX_REDIRECTS: + location = urljoin(str(resp.request.url), resp.headers.get("location", "")) + try: + validate_feed_url(location) + except ValueError as e: + raise CrawlSkip(f"redirect target 차단: {e}") from e + # redirect 도 같은 도메인 연속 요청 — 간격은 lock 보유로 충분 (즉시 1회) + resp = await client.get(location) + redirects += 1 + if resp.is_redirect: + raise CrawlSkip(f"redirect {_MAX_REDIRECTS}회 초과: {url}") + except httpx.TimeoutException as e: + raise CrawlFetchError(f"timeout: {url}") from e + except httpx.HTTPError as e: + raise CrawlFetchError(f"네트워크 오류: {url}: {e}") from e + finally: + _domain_last_request[domain] = time.monotonic() + + if resp.status_code == 429: + retry_after = resp.headers.get("retry-after", "") + logger.warning("[politeness] 429 %s (Retry-After=%s)", domain, retry_after or "-") + raise CrawlBlocked(f"429 rate limited: {url} (Retry-After={retry_after or '-'})") + if resp.status_code == 403: + raise CrawlBlocked(f"403 forbidden: {url}") + if resp.status_code >= 500: + raise CrawlFetchError(f"{resp.status_code}: {url}") + if resp.status_code >= 400: + raise CrawlSkip(f"{resp.status_code}: {url}") + + ct = resp.headers.get("content-type", "").lower() + if ct and not any(t in ct for t in _HTML_CONTENT_TYPES): + raise CrawlSkip(f"비-HTML content-type: {ct}: {url}") + if len(resp.content) > _MAX_PAGE_BYTES: + raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}") + + return resp.text, str(resp.request.url) diff --git a/app/main.py b/app/main.py index edc8c10..eb49003 100644 --- a/app/main.py +++ b/app/main.py @@ -54,6 +54,7 @@ async def lifespan(app: FastAPI): from workers.law_monitor import run as law_monitor_run from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run + from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run from workers.queue_consumer import consume_queue, consume_markdown_queue from workers.study_queue_consumer import consume_study_queue from workers.study_session_queue_consumer import consume_study_session_queue @@ -121,6 +122,9 @@ async def lifespan(app: FastAPI): # 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source. scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness") scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector") + # crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로 + # 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯. + scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile") # plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산. # soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌). scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile") diff --git a/app/models/document.py b/app/models/document.py index 479ae99..ec031b2 100644 --- a/app/models/document.py +++ b/app/models/document.py @@ -118,7 +118,7 @@ class Document(Base): source_channel: Mapped[str | None] = mapped_column( Enum("law_monitor", "devonagent", "email", "web_clip", "tksafety", "inbox_route", "manual", "drive_sync", "news", "memo", - "voice", "hermes", + "voice", "hermes", "crawl", name="source_channel") ) # 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타. diff --git a/app/models/news_source.py b/app/models/news_source.py index 0373d64..6c58ed8 100644 --- a/app/models/news_source.py +++ b/app/models/news_source.py @@ -2,7 +2,8 @@ from datetime import datetime -from sqlalchemy import Boolean, DateTime, String, Text +from sqlalchemy import Boolean, DateTime, Integer, String, Text +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column from core.database import Base @@ -23,3 +24,22 @@ class NewsSource(Base): created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now ) + + # ── A-3 (plan crawl-24x7-1) 레지스트리 증축 — migration 319 ── + # fetch_method: rss / rss+page / sitemap+page / page / api / signal-only + fetch_method: Mapped[str] = mapped_column(String(20), default="rss") + # fulltext_policy: none(현행) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문) + fulltext_policy: Mapped[str] = mapped_column(String(20), default="none") + # NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터 슬롯) + auth_profile: Mapped[str | None] = mapped_column(String(50)) + # 소스별 차등 폴링 (NULL=전역 6h 사이클) + poll_interval_minutes: Mapped[int | None] = mapped_column(Integer) + # 조건부 GET 워터마크 — 서버가 준 값 그대로 저장·재전송 (A-1) + etag: Mapped[str | None] = mapped_column(Text) + last_modified: Mapped[str | None] = mapped_column(Text) + # CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 (A-1) + feed_content_hash: Mapped[str | None] = mapped_column(String(64)) + # 추출 실패 잦은 소스의 site-specific CSS selector (A-2) + selector_override: Mapped[dict | None] = mapped_column(JSONB) + # rdf / table-strip / gn-redirect 등 파서 특이 케이스 (B-5) + parser_quirk: Mapped[str | None] = mapped_column(String(30)) diff --git a/app/models/queue.py b/app/models/queue.py index 18c574e..a3c142e 100644 --- a/app/models/queue.py +++ b/app/models/queue.py @@ -18,10 +18,11 @@ class ProcessingQueue(Base): stage: Mapped[str] = mapped_column( # 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue. # 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue. + # 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격. # DB enum 변경은 마이그레이션이 처리하므로 create_type=False. Enum( "extract", "classify", "summarize", "embed", "chunk", "preview", - "stt", "thumbnail", "deep_summary", "markdown", + "stt", "thumbnail", "deep_summary", "markdown", "fulltext", name="process_stage", create_type=False, ), diff --git a/app/models/source_health.py b/app/models/source_health.py new file mode 100644 index 0000000..9264e39 --- /dev/null +++ b/app/models/source_health.py @@ -0,0 +1,36 @@ +"""source_health 테이블 ORM (A-5, plan crawl-24x7-1) + +news_sources 와 1:1. 소스별 fetch 성공/실패 기록 + circuit breaker 상태. +silent skip 누적 방지의 가시성 기반 — A-8 헬스 패널이 읽는다. +""" + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from core.database import Base + + +class SourceHealth(Base): + __tablename__ = "source_health" + + id: Mapped[int] = mapped_column(primary_key=True) + source_id: Mapped[int] = mapped_column( + Integer, ForeignKey("news_sources.id", ondelete="CASCADE"), nullable=False + ) + consecutive_failures: Mapped[int] = mapped_column(Integer, default=0) + total_fetches: Mapped[int] = mapped_column(BigInteger, default=0) + total_failures: Mapped[int] = mapped_column(BigInteger, default=0) + last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + last_error: Mapped[str | None] = mapped_column(Text) + last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + last_fetch_items: Mapped[int | None] = mapped_column(Integer) + # 200 인데 entries 0 인 연속 fetch 횟수 (304/해시동일은 미집계 — 피드 부패 신호 전용) + empty_streak: Mapped[int] = mapped_column(Integer, default=0) + # closed(정상) / open(연속 실패 → 지수 backoff) / disabled(임계 초과, 수동 복구 대상) + circuit_state: Mapped[str] = mapped_column(String(10), default="closed") + circuit_opened_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now + ) diff --git a/app/requirements.txt b/app/requirements.txt index 8451495..675218c 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -17,10 +17,13 @@ python-multipart>=0.0.9 jinja2>=3.1.0 feedparser>=6.0.0 pymupdf>=1.24.0 -# Web/Blog ingest (devonagent 트랙) — HTML 본문 정화 4-tier fallback -trafilatura>=1.12.0 +# Web/Blog ingest (devonagent 트랙) + 뉴스 fulltext 승격 (crawl-24x7 A-2) — 4-tier fallback. +# trafilatura 는 단일 메인테이너 리스크로 exact pin (A-2 결정). +trafilatura==2.1.0 readability-lxml>=0.8.1 markdownify>=0.13.1 +# tier-4 (bs4) 가 직접 import — 전이 의존 가정 제거 (crawl-24x7 A-2) +beautifulsoup4>=4.12.0 # office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1). # 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정. markitdown[docx,xlsx,pptx]>=0.1.0 diff --git a/app/workers/fulltext_worker.py b/app/workers/fulltext_worker.py new file mode 100644 index 0000000..1fbb505 --- /dev/null +++ b/app/workers/fulltext_worker.py @@ -0,0 +1,218 @@ +"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1) + +news_collector 가 fulltext_policy='page' 소스의 기사에 enqueue 한 'fulltext' stage 를 소비: + 기사 페이지 politeness fetch (A-4) → 원본 HTML NAS gzip 보존 (A-7) + → extract_worker 4-tier 재사용 (tier 2 sibling .md 는 디스크 원본이 없어 비적용) + → extracted_text/md_content 승격 → summarize + (30일 게이트) embed/chunk enqueue. + +실패 처리 (큐 어휘 = DB enum, 분기만 워커): + - 일시 오류 (5xx/timeout) : raise → 큐 재시도 (max_attempts 3) + - 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 후 완료 + → summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 에 기록. + - 영구 실패 (3회 소진) : 야간 reconcile_unresolved() 가 summarize 안전망 enqueue + ([[feedback_silent_skip_accumulation]] — 조건부 skip 이 영구 침묵으로 누적되지 않게). + +승격 게이트: 전 tier 공통 본문 >= 200자 (devonagent 와 달리 tier 4 도 게이트 적용 — +페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다). +""" + +import gzip +import hashlib +import re +from datetime import datetime, timezone +from pathlib import Path + +from sqlalchemy import exists, select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.config import settings +from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page +from core.database import async_session +from core.utils import setup_logger +from models.document import Document +from models.queue import ProcessingQueue, enqueue_stage +from workers.extract_worker import ( + _WEB_MIN_BODY_LEN, + _extract_web_with_bs4, + _extract_web_with_readability, + _extract_web_with_trafilatura, +) + +logger = setup_logger("fulltext_worker") + +# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거 +_FOOTER_PATTERNS = [ + re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M), + re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인 + re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인 +] + + +def _strip_article_footer(body: str) -> str: + for pat in _FOOTER_PATTERNS: + body = pat.sub("", body) + return re.sub(r"\n{3,}", "\n\n", body).strip() + + +def _extract_body(html_text: str) -> tuple[str, str | None, str | None]: + """(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None).""" + body, ver = _extract_web_with_trafilatura(html_text) + if body and len(body) >= _WEB_MIN_BODY_LEN: + return body, "trafilatura", ver + body, ver = _extract_web_with_readability(html_text) + if body and len(body) >= _WEB_MIN_BODY_LEN: + return body, "readability", ver + body, ver = _extract_web_with_bs4(html_text) + if body and len(body) >= _WEB_MIN_BODY_LEN: + return body, "bs4_text", ver + return "", None, None + + +def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path: + """A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용.""" + src_dir = f"src_{source_id}" if source_id is not None else "src_unknown" + return ( + Path(settings.nas_mount_path) / "crawl_raw" / src_dir + / now.strftime("%Y-%m") / f"{file_hash}.html.gz" + ) + + +def _save_raw_html(path: Path, html_text: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(path, "wb") as f: + f.write(html_text.encode("utf-8", errors="replace")) + + +async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None: + """승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk.""" + await enqueue_stage(session, doc.id, "summarize") + published_raw = (doc.extract_meta or {}).get("published_at") + days_old = 0 + if published_raw: + try: + pub_dt = datetime.fromisoformat(published_raw) + days_old = (datetime.now(timezone.utc) - pub_dt).days + except ValueError: + days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일) + if days_old <= 30: + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + + +def _set_fulltext_meta(doc: Document, **fields) -> None: + """extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당.""" + meta = dict(doc.extract_meta or {}) + meta["fulltext"] = {**meta.get("fulltext", {}), **fields} + doc.extract_meta = meta + + +async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None: + """본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0).""" + _set_fulltext_meta( + doc, status="degraded", reason=reason[:300], + resolved_at=datetime.now(timezone.utc).isoformat(), + ) + await _enqueue_downstream(session, doc) + logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}") + + +async def process(document_id: int, session: AsyncSession) -> None: + """기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가).""" + doc = await session.get(Document, document_id) + if not doc: + raise ValueError(f"문서 ID {document_id}를 찾을 수 없음") + if not doc.edit_url: + await _degrade(session, doc, "edit_url 없음") + return + + meta = doc.extract_meta or {} + source_id = meta.get("source_id") + + try: + html_text, final_url = await fetch_page(doc.edit_url) + except (CrawlBlocked, CrawlSkip) as e: + await _degrade(session, doc, f"{type(e).__name__}: {e}") + return + except CrawlFetchError: + raise # 일시 오류 — 큐 재시도 + + now = datetime.now(timezone.utc) + + # A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지) + raw_path = _raw_html_path(source_id, doc.file_hash, now) + try: + _save_raw_html(raw_path, html_text) + raw_saved = True + except OSError as e: + # NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피) + raw_saved = False + logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}") + + body, engine, engine_ver = _extract_body(html_text) + if not engine: + await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)") + return + + clean_body = _strip_article_footer(body.replace("\x00", "")) + if len(clean_body) < _WEB_MIN_BODY_LEN: + await _degrade(session, doc, "푸터 제거 후 본문 부족") + return + + title = doc.title or "" + doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body + doc.extracted_at = now + doc.extractor_version = f"rss+page@{engine}" + doc.md_content = clean_body + doc.md_status = "success" + doc.md_extraction_engine = engine + doc.md_extraction_engine_version = engine_ver + doc.md_format_version = "1.0" + doc.md_generated_at = now + doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest() + doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest() + doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제 + doc.content_origin = "extracted" + doc.file_size = len(doc.extracted_text.encode()) + _set_fulltext_meta( + doc, status="promoted", engine=engine, + raw_html_path=str(raw_path) if raw_saved else None, + final_url=final_url, body_chars=len(clean_body), + resolved_at=now.isoformat(), + ) + + await _enqueue_downstream(session, doc) + logger.info( + f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 " + f"(raw={'saved' if raw_saved else 'MISSING'})" + ) + + +async def reconcile_unresolved() -> None: + """안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌 + 뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치.""" + async with async_session() as session: + summarize_q = ( + select(ProcessingQueue.id) + .where( + ProcessingQueue.document_id == Document.id, + ProcessingQueue.stage == "summarize", + ) + ) + result = await session.execute( + select(Document) + .join(ProcessingQueue, ProcessingQueue.document_id == Document.id) + .where( + ProcessingQueue.stage == "fulltext", + ProcessingQueue.status == "failed", + Document.source_channel == "news", + ~exists(summarize_q), + ) + .limit(200) + ) + docs = result.scalars().unique().all() + for doc in docs: + _set_fulltext_meta(doc, status="failed_reconciled") + await _enqueue_downstream(session, doc) + if docs: + await session.commit() + logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue") diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py index 5b4c8ad..c64cf95 100644 --- a/app/workers/news_collector.py +++ b/app/workers/news_collector.py @@ -1,8 +1,15 @@ -"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장""" +"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장 + +plan crawl-24x7-1 A그룹 (2026-06-10): + A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지 + A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임 + A-5 source_health 기록 + circuit breaker (소스별 실패 격리) + A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정) +""" import hashlib import re -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from html import unescape from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse @@ -10,11 +17,13 @@ import feedparser import httpx from sqlalchemy import select +from core.crawl_politeness import CRAWL_UA from core.database import async_session from core.utils import setup_logger from models.document import Document from models.news_source import NewsSource from models.queue import enqueue_stage +from models.source_health import SourceHealth logger = setup_logger("news_collector") @@ -38,18 +47,23 @@ CATEGORY_MAP = { } +class FeedError(Exception): + """소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록.""" + + def _normalize_category(raw: str) -> str: """카테고리 표준화""" return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other")) -def _clean_html(text: str) -> str: - """HTML 태그 제거 + 정제""" +def _clean_html(text: str, max_len: int | None = 1000) -> str: + """HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용).""" if not text: return "" text = re.sub(r"<[^>]+>", "", text) text = unescape(text) - return text.strip()[:1000] + text = text.strip() + return text if max_len is None else text[:max_len] # tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키 @@ -87,8 +101,93 @@ def _normalize_to_utc(dt) -> datetime: return datetime.now(timezone.utc) +# ── A-5: circuit breaker 정책 ── +# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h) +# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상) +# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리. +_CIRCUIT_OPEN_AFTER = 3 +_CIRCUIT_DISABLE_AFTER = 10 +_BACKOFF_BASE_HOURS = 6 +_BACKOFF_CAP_HOURS = 48 +_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고 + + +def _should_attempt(health: SourceHealth, now: datetime) -> bool: + """circuit 상태에 따라 이번 사이클 fetch 여부 결정. + + 주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이 + open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면 + 배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다. + """ + if health.circuit_state == "disabled": + return False + if health.circuit_state == "open" and health.last_error_at is not None: + over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0) + backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS) + if now - health.last_error_at < timedelta(hours=backoff_h): + return False + return True + + +def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None: + health.consecutive_failures = 0 + health.total_fetches += 1 + health.last_success_at = now + health.last_fetch_items = items + if health.circuit_state != "closed": + logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed") + health.circuit_state = "closed" + health.circuit_opened_at = None + # 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시) + if not_modified: + pass + elif items == 0: + health.empty_streak += 1 + if health.empty_streak >= _EMPTY_STREAK_ALERT: + logger.error( + f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 " + f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)" + ) + else: + health.empty_streak = 0 + health.updated_at = now + + +def _record_failure(health: SourceHealth, error: str, now: datetime) -> None: + health.consecutive_failures += 1 + health.total_fetches += 1 + health.total_failures += 1 + health.last_error = error[:500] + health.last_error_at = now + health.updated_at = now + cf = health.consecutive_failures + if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled": + health.circuit_state = "disabled" + logger.error( + f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED " + f"(수집 제외, A-8 패널에서 수동 복구 필요)" + ) + elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed": + health.circuit_state = "open" + health.circuit_opened_at = now + logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open") + + +async def _get_or_create_health(session, source_id: int) -> SourceHealth: + result = await session.execute( + select(SourceHealth).where(SourceHealth.source_id == source_id) + ) + health = result.scalars().first() + if health is None: + health = SourceHealth(source_id=source_id) + session.add(health) + await session.flush() + return health + + async def run(): """뉴스 수집 실행""" + now = datetime.now(timezone.utc) async with async_session() as session: result = await session.execute( select(NewsSource).where(NewsSource.enabled == True) @@ -101,17 +200,23 @@ async def run(): total = 0 for source in sources: + health = await _get_or_create_health(session, source.id) + if not _should_attempt(health, now): + logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip") + continue try: if source.feed_type == "api": - count = await _fetch_api(session, source) + count, status = await _fetch_api(session, source) else: - count = await _fetch_rss(session, source) + count, status = await _fetch_rss(session, source) source.last_fetched_at = datetime.now(timezone.utc) + _record_success(health, count, status == "not_modified", now) total += count except Exception as e: logger.error(f"[{source.name}] 수집 실패: {e}") source.last_fetched_at = datetime.now(timezone.utc) + _record_failure(health, str(e) or repr(e), now) await session.commit() logger.info(f"뉴스 수집 완료: {total}건 신규") @@ -122,8 +227,58 @@ ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml", "application/xml", "text/xml") -async def _fetch_rss(session, source: NewsSource) -> int: - """RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한""" +async def _is_portal_duplicate(session, title: str) -> bool: + """A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스. + + 보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘). + 범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피). + """ + if len(title) < 12: + return False + cutoff = datetime.now(timezone.utc) - timedelta(days=3) + dup = await session.execute( + select(Document.id).where( + Document.title == title, + Document.source_channel == "news", + Document.file_format == "article", + Document.extracted_at >= cutoff, + ).limit(1) + ) + return dup.scalars().first() is not None + + +async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None: + """후속 단계 enqueue. + + fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는 + fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트 + 도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피). + """ + if source.fulltext_policy == "page" and doc.edit_url: + await enqueue_stage(session, doc.id, "fulltext") + return + await enqueue_stage(session, doc.id, "summarize") + days_old = (datetime.now(timezone.utc) - pub_dt).days + if days_old <= 30: + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + + +def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict: + """fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록).""" + return { + "source_id": source.id, + "source_name": source.name, + "published_at": pub_dt.isoformat(), + } + + +async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: + """RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1). + + 반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일. + 소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록. + """ from urllib.parse import urljoin from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS @@ -134,17 +289,24 @@ async def _fetch_rss(session, source: NewsSource) -> int: # 순수 HTTP 소스인데 allowlist에 없으면 차단 if source.feed_url.startswith("http://") and not http_allowed: - logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}") - return 0 + raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}") # fetch 전 URL 재검증 (등록 이후 DNS 변경 대비) try: validate_feed_url(source.feed_url, allow_http=http_allowed) except ValueError as e: - logger.error(f"[{source.name}] URL 검증 실패: {e}") - return 0 + raise FeedError(f"URL 검증 실패: {e}") from e - async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client: + # A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송 + headers = {"User-Agent": CRAWL_UA} + if source.etag: + headers["If-None-Match"] = source.etag + if source.last_modified: + headers["If-Modified-Since"] = source.last_modified + + async with httpx.AsyncClient( + timeout=10, follow_redirects=False, headers=headers + ) as client: resp = await client.get(source.feed_url) # redirect 수동 처리 (최대 3회, 각 target 재검증) @@ -156,29 +318,41 @@ async def _fetch_rss(session, source: NewsSource) -> int: try: validate_feed_url(location, allow_http=http_allowed) except ValueError as e: - logger.error(f"[{source.name}] redirect target 차단: {e}") - return 0 + raise FeedError(f"redirect target 차단: {e}") from e resp = await client.get(location) redirects += 1 if resp.is_redirect: - logger.error(f"[{source.name}] redirect 3회 초과") - return 0 + raise FeedError("redirect 3회 초과") + + if resp.status_code == 304: + logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송") + return 0, "not_modified" resp.raise_for_status() if len(resp.content) > MAX_RESPONSE_SIZE: - logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes") - return 0 + raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes") ct = resp.headers.get("content-type", "").lower() if not any(t in ct for t in ALLOWED_CONTENT_TYPES): - logger.warning(f"[{source.name}] 비정상 content-type: {ct}") - return 0 + raise FeedError(f"비정상 content-type: {ct}") + + # A-1: 워터마크 갱신 + 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) + new_etag = resp.headers.get("etag") + new_last_modified = resp.headers.get("last-modified") + if new_etag: + source.etag = new_etag + if new_last_modified: + source.last_modified = new_last_modified + content_hash = hashlib.sha256(resp.content).hexdigest() + if source.feed_content_hash == content_hash: + logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip") + return 0, "not_modified" + source.feed_content_hash = content_hash feed = feedparser.parse(resp.text) if feed.bozo and not feed.entries: - logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}") - return 0 + raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}") count = 0 for entry in feed.entries: @@ -190,6 +364,18 @@ async def _fetch_rss(session, source: NewsSource) -> int: if not summary: summary = title + # A-6: feed-full 소스만 피드 본문을 전문으로 신뢰 (truncate·광고 삽입이 흔해 + # 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지) + body = summary + is_feed_full = False + if source.fulltext_policy == "feed-full": + content_list = entry.get("content") or [] + raw_body = content_list[0].get("value", "") if content_list else "" + full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None) + if len(full_body) > len(summary): + body = full_body + is_feed_full = True + link = entry.get("link", "") published = entry.get("published_parsed") or entry.get("updated_parsed") pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc) @@ -209,6 +395,11 @@ async def _fetch_rss(session, source: NewsSource) -> int: if existing.scalars().first(): continue + # A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본) + if await _is_portal_duplicate(session, title): + logger.info(f"[{source.name}] portal-dup skip: {title[:60]}") + continue + category = _normalize_category(source.category or "") source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문" @@ -216,15 +407,16 @@ async def _fetch_rss(session, source: NewsSource) -> int: file_path=f"news/{source.name}/{article_id}", file_hash=article_id, file_format="article", - file_size=len(summary.encode()), + file_size=len(body.encode()), file_type="note", title=title, - extracted_text=f"{title}\n\n{summary}", + extracted_text=f"{title}\n\n{body}", extracted_at=datetime.now(timezone.utc), - extractor_version="rss", + extractor_version="rss-feed-full" if is_feed_full else "rss", # article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라 # 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial # 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상). + # fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신. md_status="skipped", md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", source_channel="news", @@ -235,30 +427,27 @@ async def _fetch_rss(session, source: NewsSource) -> int: ai_domain="News", ai_sub_group=source_short, ai_tags=[f"News/{source_short}/{category}"], + extract_meta=_build_extract_meta(source, pub_dt), ) session.add(doc) await session.flush() - # summarize + embed + chunk 등록 (classify 불필요) - await enqueue_stage(session, doc.id, "summarize") - days_old = (datetime.now(timezone.utc) - pub_dt).days - if days_old <= 30: - await enqueue_stage(session, doc.id, "embed") - await enqueue_stage(session, doc.id, "chunk") + # summarize + embed + chunk 등록 (classify 불필요). + # page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue. + await _enqueue_processing(session, doc, source, pub_dt) count += 1 logger.info(f"[{source.name}] RSS → {count}건 수집") - return count + return count, "ok" -async def _fetch_api(session, source: NewsSource) -> int: +async def _fetch_api(session, source: NewsSource) -> tuple[int, str]: """NYT API 수집 — 키 마스킹 + health degradation""" import os nyt_key = os.getenv("NYT_API_KEY", "") if not nyt_key: - logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가") - return 0 + raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가") try: async with httpx.AsyncClient(timeout=10) as client: @@ -270,12 +459,10 @@ async def _fetch_api(session, source: NewsSource) -> int: except httpx.HTTPStatusError as e: # 쿼리스트링(api-key 포함) 제거 — path까지만 로깅 safe_url = str(e.request.url).split("?")[0] - logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}") - return 0 + raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e except httpx.RequestError as e: safe_url = str(e.request.url).split("?")[0] if e.request else "unknown" - logger.error(f"NYT API 연결 실패: {safe_url}") - return 0 + raise FeedError(f"NYT API 연결 실패: {safe_url}") from e data = resp.json() count = 0 @@ -309,6 +496,10 @@ async def _fetch_api(session, source: NewsSource) -> int: if existing.scalars().first(): continue + if await _is_portal_duplicate(session, title): + logger.info(f"[{source.name}] portal-dup skip: {title[:60]}") + continue + category = _normalize_category(article.get("section", source.category or "")) source_short = source.name.split(" ")[0] @@ -334,17 +525,14 @@ async def _fetch_api(session, source: NewsSource) -> int: ai_domain="News", ai_sub_group=source_short, ai_tags=[f"News/{source_short}/{category}"], + extract_meta=_build_extract_meta(source, pub_dt), ) session.add(doc) await session.flush() - await enqueue_stage(session, doc.id, "summarize") - days_old = (datetime.now(timezone.utc) - pub_dt).days - if days_old <= 30: - await enqueue_stage(session, doc.id, "embed") - await enqueue_stage(session, doc.id, "chunk") + await _enqueue_processing(session, doc, source, pub_dt) count += 1 logger.info(f"[{source.name}] API → {count}건 수집") - return count + return count, "ok" diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 3cc7fb8..b198067 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -22,8 +22,11 @@ logger = setup_logger("queue_consumer") # stage별 배치 크기 # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. +# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면 +# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수. BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, - "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1} + "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1, + "fulltext": 3} STALE_THRESHOLD_MINUTES = 10 # markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다. # marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분 @@ -35,7 +38,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120" # STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). MAIN_QUEUE_STAGES = [ "extract", "classify", "summarize", "embed", "chunk", - "preview", "stt", "thumbnail", "deep_summary", + "preview", "stt", "thumbnail", "deep_summary", "fulltext", ] MARKDOWN_QUEUE_STAGES = ["markdown"] @@ -179,6 +182,7 @@ def _load_workers(): from workers.summarize_worker import process as summarize_process from workers.thumbnail_worker import process as thumbnail_process from workers.marker_worker import process as marker_process + from workers.fulltext_worker import process as fulltext_process return { "extract": extract_process, @@ -195,6 +199,9 @@ def _load_workers(): # Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립). # consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록). "markdown": marker_process, + # crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은 + # 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op). + "fulltext": fulltext_process, } diff --git a/migrations/319_news_sources_crawl_columns.sql b/migrations/319_news_sources_crawl_columns.sql new file mode 100644 index 0000000..74ee9c7 --- /dev/null +++ b/migrations/319_news_sources_crawl_columns.sql @@ -0,0 +1,19 @@ +-- A-3 (plan crawl-24x7-1): 소스 레지스트리 증축 — additive only. +-- fetch_method : rss / rss+page / sitemap+page / page / api / signal-only +-- fulltext_policy : none(현행 유지) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문) +-- auth_profile : NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터용 슬롯) +-- poll_interval_minutes : 소스별 차등 폴링 (NULL=전역 6h 사이클) +-- etag / last_modified : 조건부 GET 워터마크 — 받은 그대로 저장·재전송 (상태는 전부 DB, APScheduler in-process) +-- feed_content_hash : CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 +-- selector_override : 추출 실패 잦은 소스의 site-specific CSS selector (JSONB) +-- parser_quirk : rdf / table-strip / gn-redirect 등 파서 특이 케이스 +ALTER TABLE news_sources + ADD COLUMN IF NOT EXISTS fetch_method VARCHAR(20) NOT NULL DEFAULT 'rss', + ADD COLUMN IF NOT EXISTS fulltext_policy VARCHAR(20) NOT NULL DEFAULT 'none', + ADD COLUMN IF NOT EXISTS auth_profile VARCHAR(50), + ADD COLUMN IF NOT EXISTS poll_interval_minutes INTEGER, + ADD COLUMN IF NOT EXISTS etag TEXT, + ADD COLUMN IF NOT EXISTS last_modified TEXT, + ADD COLUMN IF NOT EXISTS feed_content_hash VARCHAR(64), + ADD COLUMN IF NOT EXISTS selector_override JSONB, + ADD COLUMN IF NOT EXISTS parser_quirk VARCHAR(30); diff --git a/migrations/320_source_channel_add_crawl.sql b/migrations/320_source_channel_add_crawl.sql new file mode 100644 index 0000000..98601bf --- /dev/null +++ b/migrations/320_source_channel_add_crawl.sql @@ -0,0 +1,3 @@ +-- 0-5 (a) 확정 (plan crawl-24x7-1): 도메인 자료(안전/공학/철학) 채널 신설 — news 와 분리. +-- 신규 값은 같은 트랜잭션 내 사용 금지 (PG 제약) — 본 배치의 다른 마이그레이션은 'crawl' 미사용. +ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'crawl'; diff --git a/migrations/321_process_stage_add_fulltext.sql b/migrations/321_process_stage_add_fulltext.sql new file mode 100644 index 0000000..8abede8 --- /dev/null +++ b/migrations/321_process_stage_add_fulltext.sql @@ -0,0 +1,3 @@ +-- A-2 (plan crawl-24x7-1): RSS 요약 → 기사 페이지 fetch → 4-tier 본문 승격 stage. +-- fulltext_policy='page' 소스의 기사에만 news_collector 가 enqueue. +ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'fulltext'; diff --git a/migrations/322_source_health_table.sql b/migrations/322_source_health_table.sql new file mode 100644 index 0000000..e093945 --- /dev/null +++ b/migrations/322_source_health_table.sql @@ -0,0 +1,19 @@ +-- A-5 (plan crawl-24x7-1): 소스 건강 — 소스별 실패 격리 기록 + circuit breaker. +-- 한 소스가 죽어도 나머지 영향 0. silent skip 누적 방지의 가시성 기반 (A-8 패널이 읽음). +-- circuit_state: closed(정상) / open(연속 실패로 지수 backoff 중) / disabled(M회 초과, 수동 복구 대상) +-- empty_streak : 200 인데 entries 0 인 연속 fetch 횟수 (피드 부패 감시 — 304/해시동일은 미집계) +CREATE TABLE IF NOT EXISTS source_health ( + id SERIAL PRIMARY KEY, + source_id INTEGER NOT NULL REFERENCES news_sources(id) ON DELETE CASCADE, + consecutive_failures INTEGER NOT NULL DEFAULT 0, + total_fetches BIGINT NOT NULL DEFAULT 0, + total_failures BIGINT NOT NULL DEFAULT 0, + last_success_at TIMESTAMPTZ, + last_error TEXT, + last_error_at TIMESTAMPTZ, + last_fetch_items INTEGER, + empty_streak INTEGER NOT NULL DEFAULT 0, + circuit_state VARCHAR(10) NOT NULL DEFAULT 'closed', + circuit_opened_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/migrations/323_source_health_source_id_uq.sql b/migrations/323_source_health_source_id_uq.sql new file mode 100644 index 0000000..c0dd650 --- /dev/null +++ b/migrations/323_source_health_source_id_uq.sql @@ -0,0 +1,2 @@ +-- A-5: source_health 는 news_sources 와 1:1 — upsert 기준 키. +CREATE UNIQUE INDEX IF NOT EXISTS uq_source_health_source_id ON source_health (source_id);