"""C-2 잔여 ① US CSB sitemap diff 수집 워커 (plan crawl-24x7-1, 사이클 3). RSS 폐지 → sitemap.xml lastmod diff 폴링이 정석 (정부 사이트라 lastmod 양호 — 2026-06-11 실측 1,307 URL, 조사 보고서 페이지는 루트 슬러그). 페이지 본문(4-tier ≥200자 게이트) + 보고서 PDF(/assets/, recommendation 상태요약 제외) → 기존 extract 파이프라인(marker/kordoc) 재사용. 스케줄 = weekly (main.py 월 06:50 KST): 워터마크(selector_override.sitemap_watermark — B-3 probe 설정과 같은 JSONB 슬롯) 이후 lastmod 만, 오래된 것부터 cap(40페이지/회). 워터마크는 처리분까지만 전진 = 잔량 자동 점진 백필 (KOSHA GUIDE cap 패턴). cap 미처리 잔량은 매회 로그 (silent cap 금지). diff 건수 > sanity(300) = sitemap 부패/lastmod 남발 의심 가시 경고. 초기 일괄 (cap 해제, politeness 로 수 시간 — docker exec -d, 진행 중 같은 서비스 재배포 금지 [[feedback_docker_exec_orphan_kill]] 자매 함정): docker exec hyungi_document_server-fastapi-1 \ python -m workers.csb_collector --limit 3 # 검증용 docker exec -d hyungi_document_server-fastapi-1 \ python -m workers.csb_collector --bulk # 전체 멱등: 페이지 = edit_url(정규화)+file_hash dedup (first-wins — lastmod 갱신 페이지의 본문 재적재는 안 함, 갱신의 실체인 신규 PDF 는 개별 dedup 으로 적재됨). PDF = file_path dedup. 워터마크 경계는 >= 재조회 — 경계 페이지 1회 재fetch 후 dedup 이 잡는다 (lastmod 실측 distinct 라 누적 재fetch 없음). """ import argparse import asyncio import hashlib import random import re from datetime import datetime, timezone from pathlib import Path from urllib.parse import urljoin, urlparse import httpx from sqlalchemy import select from core.config import settings from core.crawl_politeness import ( CRAWL_UA, CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page, ) from core.database import async_session from core.utils import setup_logger from models.document import Document from models.news_source import NewsSource from models.queue import enqueue_stage from workers.fulltext_worker import ( _WEB_MIN_BODY_LEN, _extract_body, _raw_html_path, _save_raw_html, _strip_article_footer, ) from workers.kosha_collector import _safe_filename from workers.news_collector import ( FeedError, _get_or_create_health, _normalize_url, _record_failure, _record_success, ) from workers.static_corpus_ingest import _page_title logger = setup_logger("csb_collector") _SITEMAP_URL = "https://www.csb.gov/sitemap.xml" _SOURCE_NAME = "US CSB 사고조사보고서" _RUN_PAGE_CAP = 40 # weekly 1회 처리 상한 — 잔량은 워터마크 미전진으로 자동 이월 _DIFF_SANITY = 300 # 주간 diff 가 이를 넘으면 sitemap lastmod 남발/부패 의심 (카드 C-2) _MAX_PDF_BYTES = 50 * 1024 * 1024 _PDF_DELAY = (2.0, 5.0) # 같은 도메인 연속 PDF 다운로드 간격 (kosha _DOWNLOAD_DELAY 동률) # 텍스트 코퍼스 무가치/관리성 섹션 — 첫 path segment 기준 (조사 보고서·뉴스 릴리스는 # 루트 슬러그라 영향 없음. /news/·/investigations/ 는 목록 페이지뿐이라 제외). _SKIP_FIRST_SEGMENT = { "videos", "photos", "events", "members", "disclaimers", "media-room", "about-the-csb", "about-us", "foia", "news", "investigations", "site-map", "subscribe", "unsubscribe", "optout", "test", "privacy-policy", "vulnerability-disclosure-policy", "en-espanol", "newsletter", "recom-stats", "500.aspx", "documents", "records-details", } def _parse_sitemap(xml_text: str) -> list[tuple[str, datetime]]: """(url, lastmod) 목록 — lastmod 없는/파싱불가 항목은 제외 (diff 축이 없음).""" out: list[tuple[str, datetime]] = [] for m in re.finditer( r"\s*([^<]+)\s*([^<]+)", xml_text ): try: lastmod = datetime.fromisoformat(m.group(2).strip()) except ValueError: continue if lastmod.tzinfo is None: lastmod = lastmod.replace(tzinfo=timezone.utc) out.append((m.group(1).strip(), lastmod)) return out def _should_skip(url: str) -> bool: path = urlparse(url).path.strip("/") if not path: return True # 홈 return path.split("/", 1)[0].lower() in _SKIP_FIRST_SEGMENT def _pdf_links(html_text: str, base_url: str) -> list[str]: """페이지 내 보고서 PDF — /assets/recommendation/(상태변경 요약 다수)은 제외. cache-buster 쿼리(?17346)는 다운로드 URL 에는 유지, dedup/파일명은 path 기준. """ seen: set[str] = set() out: list[str] = [] for m in re.finditer(r'href="([^"]+\.pdf(?:\?[^"]*)?)"', html_text, re.I): absolute = urljoin(base_url, m.group(1)) path = urlparse(absolute).path if "/assets/recommendation/" in path.lower(): continue if (urlparse(absolute).hostname or "").lower() != "www.csb.gov": continue if path not in seen: seen.add(path) out.append(absolute) return out async def _download_pdf(url: str, dest: Path) -> int: """PDF 다운로드 — 크기 cap + 연속 간격 (politeness 는 순차 실행 전제).""" await asyncio.sleep(random.uniform(*_PDF_DELAY)) async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: resp = await client.get(url, headers={"User-Agent": CRAWL_UA}) if resp.status_code != 200: raise FeedError(f"PDF 다운로드 {resp.status_code}: {url}") if len(resp.content) > _MAX_PDF_BYTES: raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}") dest.parent.mkdir(parents=True, exist_ok=True) # 최대 50MB PDF write 는 동기 blocking — 이벤트루프 점유 회피 to_thread (R5 동형). await asyncio.to_thread(dest.write_bytes, resp.content) return len(resp.content) async def _get_or_create_source(session) -> NewsSource: result = await session.execute( select(NewsSource).where(NewsSource.name == _SOURCE_NAME) ) source = result.scalars().first() if source is None: source = NewsSource( name=_SOURCE_NAME, feed_url=_SITEMAP_URL, feed_type="rss", fetch_method="sitemap+page", fulltext_policy="none", source_channel="crawl", category="Safety", language="en", country="US", enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 weekly 폴링 ) session.add(source) await session.flush() return source def _watermark(source: NewsSource) -> datetime | None: raw = (source.selector_override or {}).get("sitemap_watermark") if not raw: return None try: return datetime.fromisoformat(raw) except ValueError: return None def _set_watermark(source: NewsSource, value: datetime) -> None: # JSONB 변경 감지를 위해 dict 재할당 (fulltext_worker._set_fulltext_meta 동일 규약) cfg = dict(source.selector_override or {}) cfg["sitemap_watermark"] = value.isoformat() source.selector_override = cfg async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool: """PDF 1건 → NAS 저장 + Document + extract enqueue. 반환 = 신규 여부.""" fname = _safe_filename(Path(urlparse(pdf_url).path).name) rel_path = f"crawl_raw/csb/{page_slug}/{fname}" existing = await session.execute( select(Document).where(Document.file_path == rel_path).limit(1) ) if existing.scalars().first(): return False dest = Path(settings.nas_mount_path) / rel_path size = await _download_pdf(pdf_url, dest) # 50MB PDF read + sha256 는 동기 blocking(I/O+CPU) — 이벤트루프 점유 회피 to_thread (R5 동형). file_hash = await asyncio.to_thread(lambda: hashlib.sha256(dest.read_bytes()).hexdigest()) doc = Document( file_path=rel_path, file_hash=file_hash, file_format="pdf", file_size=size, file_type="immutable", title=fname.rsplit(".", 1)[0].replace("_", " "), source_channel="crawl", data_origin="external", import_source="csb_sitemap", edit_url=pdf_url, ai_tags=["Safety/CSB/보고서"], # 안전 자료실 A-2 — ingest 시점 deterministic. CSB = 미 연방기관 = public domain. material_type="incident", jurisdiction="US", extract_meta={"csb": {"page_slug": page_slug, "kind": "report_pdf"}, "license": {"scheme": "public_domain", "redistribute": True, "attribution": "U.S. Chemical Safety Board"}}, ) session.add(doc) await session.flush() await enqueue_stage(session, doc.id, "extract") logger.info(f"[csb] PDF ingest: {rel_path} ({size} bytes)") return True async def _ingest_url(session, source: NewsSource, url: str, lastmod: datetime) -> dict: """변경 URL 1건: 페이지 fetch → PDF 전수 스캔(개별 dedup) + 본문 신규면 적재. 페이지 재방문(lastmod 갱신)에서도 PDF 스캔은 항상 수행 — 갱신의 실체 (최종 보고서 추가 등)가 PDF 로 오는 경우가 핵심 가치다. """ counts = {"page": 0, "pdf": 0, "skip": 0} try: html_text, final_url = await fetch_page(url) except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e: logger.warning(f"[csb] fetch 실패 skip: {url} — {type(e).__name__}: {e}") counts["skip"] = 1 return counts page_slug = _safe_filename(urlparse(url).path.strip("/").split("/")[-1] or "root") for pdf_url in _pdf_links(html_text, final_url): try: if await _ingest_pdf(session, page_slug, pdf_url): counts["pdf"] += 1 except FeedError as e: logger.warning(f"[csb] PDF 실패 skip ({pdf_url}): {e}") # 페이지 본문 — first-wins (이미 있으면 본문 재적재 없음) normalized_url = _normalize_url(url) page_hash = hashlib.sha256(f"csb-page|{normalized_url}".encode()).hexdigest()[:32] existing = await session.execute( select(Document).where( (Document.file_hash == page_hash) | (Document.edit_url.in_([normalized_url, url])) ).limit(1) ) if existing.scalars().first(): return counts body, engine, engine_ver = _extract_body(html_text) if not engine: logger.info(f"[csb] 본문 부족 — 페이지 비적재 (PDF 만): {url}") return counts clean_body = _strip_article_footer(body.replace("\x00", "")) if len(clean_body) < _WEB_MIN_BODY_LEN: return counts now = datetime.now(timezone.utc) raw_path = _raw_html_path(source.id, page_hash, now) raw_saved = True try: _save_raw_html(raw_path, html_text) except OSError as e: raw_saved = False logger.error(f"[csb] 원본 보존 실패 (ingest 는 진행): {e}") title = _page_title(html_text, fallback=page_slug.replace("-", " ")[:90]) doc = Document( file_path=f"crawl/{_SOURCE_NAME}/{page_hash}", file_hash=page_hash, file_format="article", file_size=0, file_type="note", title=title, extracted_text=f"{title}\n\n{clean_body}", extracted_at=now, extractor_version=f"sitemap+page@{engine}", md_content=clean_body, md_status="success", md_extraction_engine=engine, md_extraction_engine_version=engine_ver, md_format_version="1.0", md_generated_at=now, md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(), md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(), content_origin="extracted", source_channel="crawl", data_origin="external", edit_url=normalized_url, review_status="approved", ai_domain="Safety", ai_sub_group=_SOURCE_NAME, ai_tags=["Safety/CSB"], # 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로) material_type="incident", jurisdiction="US", published_date=lastmod.date() if lastmod else None, extract_meta={ "source_id": source.id, "source_name": _SOURCE_NAME, "published_at": lastmod.isoformat(), "license": {"scheme": "public_domain", "redistribute": True, "attribution": "U.S. Chemical Safety Board"}, "fulltext": { "status": "csb_sitemap", "engine": engine, "final_url": final_url, "raw_html_path": str(raw_path) if raw_saved else None, "body_chars": len(clean_body), "resolved_at": now.isoformat(), }, }, ) doc.file_size = len(doc.extracted_text.encode()) session.add(doc) await session.flush() await enqueue_stage(session, doc.id, "summarize") await enqueue_stage(session, doc.id, "embed") await enqueue_stage(session, doc.id, "chunk") counts["page"] = 1 logger.info(f"[csb] page ingest {len(clean_body)}자 ({engine}): {title[:60]}") return counts async def run(bulk: bool = False, limit: int = 0) -> None: """weekly 진입점 (스케줄러) — bulk/limit 은 CLI 전용.""" now = datetime.now(timezone.utc) async with async_session() as session: source = await _get_or_create_source(session) await session.commit() source_id = source.id watermark = _watermark(source) try: xml_text, _ = await fetch_page( _SITEMAP_URL, content_types=("text/xml", "application/xml", "text/html") ) entries = _parse_sitemap(xml_text) if not entries: raise FeedError("sitemap 파싱 0건 — 포맷 변경/부패 의심") except (CrawlBlocked, CrawlSkip, CrawlFetchError, FeedError) as e: logger.error(f"[csb] sitemap 수집 실패: {e}") async with async_session() as session: health = await _get_or_create_health(session, source_id) _record_failure(health, str(e) or repr(e), now) await session.commit() return changed = sorted( ( (url, lastmod) for url, lastmod in entries if not _should_skip(url) and (watermark is None or lastmod >= watermark) ), key=lambda pair: pair[1], ) if watermark is not None and len(changed) > _DIFF_SANITY: logger.error( f"[csb] diff {len(changed)}건 > sanity {_DIFF_SANITY} — " f"sitemap lastmod 남발/부패 의심 (cap 처리는 계속, 관찰 필요)" ) cap = len(changed) if bulk else _RUN_PAGE_CAP if limit: cap = min(cap, limit) todo, deferred = changed[:cap], max(len(changed) - cap, 0) logger.info( f"[csb] sitemap {len(entries)}건 중 변경 {len(changed)}건, 처리 {len(todo)}건" + (f" (잔여 {deferred}건 — 워터마크 미전진으로 자동 이월)" if deferred else "") ) totals = {"page": 0, "pdf": 0, "skip": 0} for i, (url, lastmod) in enumerate(todo, 1): # 2026-06-20 C2: URL 1건 실패가 주간 run 전체를 중단(이후 URL 스킵·watermark 정지)하던 것 차단. # 각 iteration 은 자체 session(async with) 이라 실패 격리 — 건너뛰고 계속. try: async with async_session() as session: src = await session.get(NewsSource, source_id) counts = await _ingest_url(session, src, url, lastmod) _set_watermark(src, lastmod) await session.commit() except Exception as e: logger.error(f"[csb] URL 처리 실패 (건너뜀): {url} — {str(e) or repr(e)}") continue for k in totals: totals[k] += counts[k] if i % 10 == 0: logger.info(f"[csb] 진행 {i}/{len(todo)} {totals}") async with async_session() as session: health = await _get_or_create_health(session, source_id) _record_success(health, totals["page"] + totals["pdf"], False, now) src = await session.get(NewsSource, source_id) src.last_fetched_at = now await session.commit() logger.info(f"[csb] 완료: {totals} (변경 {len(changed)}건 중 {len(todo)}건 처리)") if __name__ == "__main__": parser = argparse.ArgumentParser(description="CSB sitemap diff 수집") parser.add_argument("--bulk", action="store_true", help="cap 해제 — 초기 일괄") parser.add_argument("--limit", type=int, default=0, help="처리 상한 (검증용)") args = parser.parse_args() asyncio.run(run(bulk=args.bulk, limit=args.limit))