8583465c58
- B-4 fetch_method='signal-only': 페이지 fetch 0 + summarize 스킵(검색 색인만, 맥미니 부하 0) + 본문 무절단(_entry_body — arXiv 초록 1.6K 보존). 다이제스트는 ai_summary NULL 제외 규칙으로 자연 배제. 레지스트리 오설정(page) 방어 가드. - 시드 9 소스 (전 URL 2026-06-11 live 검증): Bloomberg Markets/Technology(skip-video, 비디오 혼재 실측)·Economist Latest·Nikkei Asia(RDF — feedparser 네이티브, 분기 불요 fixture 박제)·ASME JPVT(site_1000037 실측 매핑)·arXiv 2종·IEEE Spectrum 2종(feed-full, 피드 description 이 전문 7.9~14K자 실측). - csb_collector: sitemap lastmod diff (weekly 월 06:50) — 워터마크(selector_override) + cap 40/회 점진 백필 + diff sanity 300 + 보고서 PDF(/assets/, recommendation 제외) → extract 파이프라인. 초기 일괄 = CLI --bulk. - api_standards_collector: 공지 목록 링크 파싱(실측 — 페이지 diff 아님, 상세 URL 10건/페이지) → 신규 상세만 ingest (monthly 5일 07:05). 초기 백필 = CLI --bulk. - ccps_collector: aiche.org 평문 403(UA 무관 실측) → playwright-fetcher 익명 컨텍스트 + referer 쿠키 승계 /download(base64) 신설로 월간 Beacon PDF (monthly 5일 07:20). 헤드리스 차단 시 CrawlBlocked → health 가시화 (르몽드 PARK 선례). - B-5 잔여: rdf/feed-reader-UA = 코드 분기 불요 실측 박제 (Economist 는 Archiver UA 200). table-strip/gn-redirect 는 해당 소스 미진입 — 백로그 유지. - 테스트 24건 신규 (fixture 9건 live 박제, economist/ieee 는 item trim) — 39 passed. - 마이그 327 단일 statement (PKM 트랙과 번호 경합 주의 — 327 본 트랙 선점). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
186 lines
7.3 KiB
Python
186 lines
7.3 KiB
Python
"""C-2 잔여 ② CCPS Process Safety Beacon 수집 워커 (사이클 3).
|
|
|
|
월간 1페이지 PDF + 한국어 번역판 — RAG 청크로 이상적 크기 (카드 C-2).
|
|
aiche.org 는 평문 httpx 를 UA 무관 403 (2026-06-11 실측: Archiver UA·브라우저 UA 모두)
|
|
→ playwright-fetcher 익명 컨텍스트 경유 (B-3 인프라 재사용):
|
|
목록 페이지 브라우저 fetch → beacon PDF 링크 파싱 → referer 쿠키 승계 다운로드.
|
|
|
|
알려진 리스크: WAF 가 헤드리스 자체를 차단하면 _CHALLENGE_MARKERS → CrawlBlocked
|
|
→ health 실패 기록 후 종료 (르몽드 B-3 PARK 선례 — 그 경우 대안 = 이메일 구독
|
|
.eml 트랙 결합, [[feedback_antibot_headless_subscription_wall]]).
|
|
|
|
스케줄 = monthly (main.py 5일 07:20 KST). 월간 1건 페이스라 diff 는 file_path dedup 으로 충분.
|
|
수동: docker exec hyungi_document_server-fastapi-1 python -m workers.ccps_collector
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
from sqlalchemy import select
|
|
|
|
from core.config import settings
|
|
from core.crawl_politeness import (
|
|
CrawlBlocked,
|
|
CrawlFetchError,
|
|
CrawlSkip,
|
|
download_via_browser,
|
|
fetch_page_via_browser,
|
|
)
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.news_source import NewsSource
|
|
from models.queue import enqueue_stage
|
|
from workers.kosha_collector import _safe_filename
|
|
from workers.news_collector import (
|
|
_get_or_create_health,
|
|
_record_failure,
|
|
_record_success,
|
|
)
|
|
|
|
logger = setup_logger("ccps_collector")
|
|
|
|
_BEACON_URL = "https://www.aiche.org/ccps/resources/process-safety-beacon"
|
|
_SOURCE_NAME = "CCPS Process Safety Beacon"
|
|
_MAX_PDFS_PER_RUN = 10 # 월간 1~2건(영/한) 페이스 — 페이지 구조 오판 시 폭주 방지
|
|
|
|
|
|
def _beacon_pdf_links(html_text: str, base_url: str) -> list[str]:
|
|
"""beacon 관련 PDF 링크 — href/앵커텍스트에 'beacon' 포함만 (보수적).
|
|
|
|
필터에 안 걸린 PDF 가 있으면 호출측이 로그로 가시화 (첫 실측에서 패턴 보정용).
|
|
"""
|
|
seen: set[str] = set()
|
|
out: list[str] = []
|
|
for m in re.finditer(
|
|
r'<a\s+[^>]*href="([^"]+\.pdf(?:\?[^"]*)?)"[^>]*>(.*?)</a>',
|
|
html_text, re.I | re.S,
|
|
):
|
|
href, text = m.group(1), re.sub(r"<[^>]+>", " ", m.group(2))
|
|
if "beacon" not in href.lower() and "beacon" not in text.lower():
|
|
continue
|
|
absolute = urljoin(base_url, href)
|
|
path = urlparse(absolute).path
|
|
if path not in seen:
|
|
seen.add(path)
|
|
out.append(absolute)
|
|
return out
|
|
|
|
|
|
def _all_pdf_hrefs(html_text: str) -> list[str]:
|
|
return sorted({m.group(1) for m in re.finditer(r'href="([^"]+\.pdf(?:\?[^"]*)?)"', html_text, re.I)})
|
|
|
|
|
|
async def _get_or_create_source(session) -> NewsSource:
|
|
result = await session.execute(
|
|
select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
|
|
)
|
|
source = result.scalars().first()
|
|
if source is None:
|
|
source = NewsSource(
|
|
name=_SOURCE_NAME, feed_url=_BEACON_URL, feed_type="rss",
|
|
fetch_method="page", fulltext_policy="none",
|
|
source_channel="crawl", category="Safety", language="en", country="US",
|
|
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 monthly 폴링
|
|
)
|
|
session.add(source)
|
|
await session.flush()
|
|
return source
|
|
|
|
|
|
async def _ingest_pdf(session, pdf_url: str) -> bool:
|
|
"""Beacon PDF 1건 → NAS 저장 + Document + extract enqueue. 반환 = 신규 여부."""
|
|
fname = _safe_filename(Path(urlparse(pdf_url).path).name)
|
|
rel_path = f"crawl_raw/ccps_beacon/{fname}"
|
|
existing = await session.execute(
|
|
select(Document).where(Document.file_path == rel_path).limit(1)
|
|
)
|
|
if existing.scalars().first():
|
|
return False
|
|
|
|
content, content_type = await download_via_browser(pdf_url, referer=_BEACON_URL)
|
|
if "pdf" not in content_type.lower() and not content.startswith(b"%PDF"):
|
|
raise CrawlSkip(f"PDF 아님 (content-type={content_type[:60]}): {pdf_url}")
|
|
|
|
dest = Path(settings.nas_mount_path) / rel_path
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
dest.write_bytes(content)
|
|
|
|
doc = Document(
|
|
file_path=rel_path,
|
|
file_hash=hashlib.sha256(content).hexdigest(),
|
|
file_format="pdf",
|
|
file_size=len(content),
|
|
file_type="immutable",
|
|
title=fname.rsplit(".", 1)[0].replace("_", " ").replace("-", " "),
|
|
source_channel="crawl",
|
|
data_origin="external",
|
|
import_source="ccps_beacon",
|
|
edit_url=pdf_url,
|
|
ai_tags=["Safety/CCPS Beacon"],
|
|
extract_meta={"ccps": {"kind": "beacon_pdf"}},
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
await enqueue_stage(session, doc.id, "extract")
|
|
logger.info(f"[ccps] Beacon ingest: {rel_path} ({len(content)} bytes)")
|
|
return True
|
|
|
|
|
|
async def run() -> None:
|
|
"""monthly 진입점 — 실패는 health 기록 (circuit 가 A-8 패널 가시화)."""
|
|
now = datetime.now(timezone.utc)
|
|
async with async_session() as session:
|
|
source = await _get_or_create_source(session)
|
|
await session.commit()
|
|
source_id = source.id
|
|
|
|
try:
|
|
html_text, final_url = await fetch_page_via_browser(_BEACON_URL, profile=None)
|
|
links = _beacon_pdf_links(html_text, final_url)
|
|
if not links:
|
|
others = _all_pdf_hrefs(html_text)
|
|
# 필터 0건 = 페이지 구조/명명 변경 가능성 — 발견 PDF 를 가시화해 보정 단서 제공
|
|
raise CrawlFetchError(
|
|
f"beacon PDF 0건 (전체 PDF {len(others)}건: {others[:5]})"
|
|
)
|
|
|
|
new_count = 0
|
|
for pdf_url in links[:_MAX_PDFS_PER_RUN]:
|
|
async with async_session() as session:
|
|
try:
|
|
if await _ingest_pdf(session, pdf_url):
|
|
new_count += 1
|
|
await session.commit()
|
|
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
|
|
await session.rollback()
|
|
logger.warning(f"[ccps] PDF 실패 skip ({pdf_url}): {e}")
|
|
if len(links) > _MAX_PDFS_PER_RUN:
|
|
logger.warning(
|
|
f"[ccps] PDF {len(links)}건 중 {_MAX_PDFS_PER_RUN}건만 처리 "
|
|
f"(월간 1~2건 가정 초과 — 페이지 구조 확인 필요)"
|
|
)
|
|
|
|
async with async_session() as session:
|
|
health = await _get_or_create_health(session, source_id)
|
|
_record_success(health, new_count, False, now)
|
|
src = await session.get(NewsSource, source_id)
|
|
src.last_fetched_at = now
|
|
await session.commit()
|
|
logger.info(f"[ccps] 완료: 신규 {new_count}건 (링크 {len(links)}건)")
|
|
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
|
|
# CrawlBlocked = WAF 헤드리스 차단 신호 — 연속되면 circuit open (PARK 판단 근거)
|
|
logger.error(f"[ccps] 수집 실패: {type(e).__name__}: {e}")
|
|
async with async_session() as session:
|
|
health = await _get_or_create_health(session, source_id)
|
|
_record_failure(health, str(e) or repr(e), now)
|
|
await session.commit()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run())
|