"""C-2 잔여 ② CCPS Process Safety Beacon 수집 워커 (사이클 3). 월간 1페이지 PDF + 한국어 번역판 — RAG 청크로 이상적 크기 (카드 C-2). aiche.org 는 평문 httpx 를 UA 무관 403 (2026-06-11 실측: Archiver UA·브라우저 UA 모두) → playwright-fetcher 익명 컨텍스트 경유 (B-3 인프라 재사용): 목록 페이지 브라우저 fetch → beacon PDF 링크 파싱 → referer 쿠키 승계 다운로드. 알려진 리스크: WAF 가 헤드리스 자체를 차단하면 _CHALLENGE_MARKERS → CrawlBlocked → health 실패 기록 후 종료 (르몽드 B-3 PARK 선례 — 그 경우 대안 = 이메일 구독 .eml 트랙 결합, [[feedback_antibot_headless_subscription_wall]]). 스케줄 = monthly (main.py 5일 07:20 KST). 월간 1건 페이스라 diff 는 file_path dedup 으로 충분. 수동: docker exec hyungi_document_server-fastapi-1 python -m workers.ccps_collector """ import asyncio import hashlib import re from datetime import datetime, timezone from pathlib import Path from urllib.parse import urljoin, urlparse from sqlalchemy import select from core.config import settings from core.crawl_politeness import ( CrawlBlocked, CrawlFetchError, CrawlSkip, download_via_browser, fetch_page_via_browser, ) from core.database import async_session from core.utils import setup_logger from models.document import Document from models.news_source import NewsSource from models.queue import enqueue_stage from workers.kosha_collector import _safe_filename from workers.news_collector import ( _get_or_create_health, _record_failure, _record_success, ) logger = setup_logger("ccps_collector") _BEACON_URL = "https://www.aiche.org/ccps/resources/process-safety-beacon" _SOURCE_NAME = "CCPS Process Safety Beacon" _MAX_PDFS_PER_RUN = 10 # 월간 1~2건(영/한) 페이스 — 페이지 구조 오판 시 폭주 방지 def _beacon_pdf_links(html_text: str, base_url: str) -> list[str]: """beacon 관련 PDF 링크 — href/앵커텍스트에 'beacon' 포함만 (보수적). 필터에 안 걸린 PDF 가 있으면 호출측이 로그로 가시화 (첫 실측에서 패턴 보정용). """ seen: set[str] = set() out: list[str] = [] for m in re.finditer( r']*href="([^"]+\.pdf(?:\?[^"]*)?)"[^>]*>(.*?)', html_text, re.I | re.S, ): href, text = m.group(1), re.sub(r"<[^>]+>", " ", m.group(2)) if "beacon" not in href.lower() and "beacon" not in text.lower(): continue absolute = urljoin(base_url, href) path = urlparse(absolute).path if path not in seen: seen.add(path) out.append(absolute) return out def _all_pdf_hrefs(html_text: str) -> list[str]: return sorted({m.group(1) for m in re.finditer(r'href="([^"]+\.pdf(?:\?[^"]*)?)"', html_text, re.I)}) async def _get_or_create_source(session) -> NewsSource: result = await session.execute( select(NewsSource).where(NewsSource.name == _SOURCE_NAME) ) source = result.scalars().first() if source is None: source = NewsSource( name=_SOURCE_NAME, feed_url=_BEACON_URL, feed_type="rss", fetch_method="page", fulltext_policy="none", source_channel="crawl", category="Safety", language="en", country="US", enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 monthly 폴링 ) session.add(source) await session.flush() return source async def _ingest_pdf(session, pdf_url: str) -> bool: """Beacon PDF 1건 → NAS 저장 + Document + extract enqueue. 반환 = 신규 여부.""" fname = _safe_filename(Path(urlparse(pdf_url).path).name) rel_path = f"crawl_raw/ccps_beacon/{fname}" existing = await session.execute( select(Document).where(Document.file_path == rel_path).limit(1) ) if existing.scalars().first(): return False content, content_type = await download_via_browser(pdf_url, referer=_BEACON_URL) if "pdf" not in content_type.lower() and not content.startswith(b"%PDF"): raise CrawlSkip(f"PDF 아님 (content-type={content_type[:60]}): {pdf_url}") dest = Path(settings.nas_mount_path) / rel_path dest.parent.mkdir(parents=True, exist_ok=True) dest.write_bytes(content) doc = Document( file_path=rel_path, file_hash=hashlib.sha256(content).hexdigest(), file_format="pdf", file_size=len(content), file_type="immutable", title=fname.rsplit(".", 1)[0].replace("_", " ").replace("-", " "), source_channel="crawl", data_origin="external", import_source="ccps_beacon", edit_url=pdf_url, ai_tags=["Safety/CCPS Beacon"], extract_meta={"ccps": {"kind": "beacon_pdf"}}, ) session.add(doc) await session.flush() await enqueue_stage(session, doc.id, "extract") logger.info(f"[ccps] Beacon ingest: {rel_path} ({len(content)} bytes)") return True async def run() -> None: """monthly 진입점 — 실패는 health 기록 (circuit 가 A-8 패널 가시화).""" now = datetime.now(timezone.utc) async with async_session() as session: source = await _get_or_create_source(session) await session.commit() source_id = source.id try: html_text, final_url = await fetch_page_via_browser(_BEACON_URL, profile=None) links = _beacon_pdf_links(html_text, final_url) if not links: others = _all_pdf_hrefs(html_text) # 필터 0건 = 페이지 구조/명명 변경 가능성 — 발견 PDF 를 가시화해 보정 단서 제공 raise CrawlFetchError( f"beacon PDF 0건 (전체 PDF {len(others)}건: {others[:5]})" ) new_count = 0 for pdf_url in links[:_MAX_PDFS_PER_RUN]: async with async_session() as session: try: if await _ingest_pdf(session, pdf_url): new_count += 1 await session.commit() except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e: await session.rollback() logger.warning(f"[ccps] PDF 실패 skip ({pdf_url}): {e}") if len(links) > _MAX_PDFS_PER_RUN: logger.warning( f"[ccps] PDF {len(links)}건 중 {_MAX_PDFS_PER_RUN}건만 처리 " f"(월간 1~2건 가정 초과 — 페이지 구조 확인 필요)" ) async with async_session() as session: health = await _get_or_create_health(session, source_id) _record_success(health, new_count, False, now) src = await session.get(NewsSource, source_id) src.last_fetched_at = now await session.commit() logger.info(f"[ccps] 완료: 신규 {new_count}건 (링크 {len(links)}건)") except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e: # CrawlBlocked = WAF 헤드리스 차단 신호 — 연속되면 circuit open (PARK 판단 근거) logger.error(f"[ccps] 수집 실패: {type(e).__name__}: {e}") async with async_session() as session: health = await _get_or_create_health(session, source_id) _record_failure(health, str(e) or repr(e), now) await session.commit() if __name__ == "__main__": asyncio.run(run())