Files
hyungi_document_server/app/workers/csb_collector.py
T
hyungi 8583465c58 feat(news): crawl-24x7 사이클 3 — B-4 시그널·C-4 공학 지속·CSB sitemap·CCPS Beacon (마이그 327)
- B-4 fetch_method='signal-only': 페이지 fetch 0 + summarize 스킵(검색 색인만,
  맥미니 부하 0) + 본문 무절단(_entry_body — arXiv 초록 1.6K 보존). 다이제스트는
  ai_summary NULL 제외 규칙으로 자연 배제. 레지스트리 오설정(page) 방어 가드.
- 시드 9 소스 (전 URL 2026-06-11 live 검증): Bloomberg Markets/Technology(skip-video,
  비디오 혼재 실측)·Economist Latest·Nikkei Asia(RDF — feedparser 네이티브, 분기 불요
  fixture 박제)·ASME JPVT(site_1000037 실측 매핑)·arXiv 2종·IEEE Spectrum 2종(feed-full,
  피드 description 이 전문 7.9~14K자 실측).
- csb_collector: sitemap lastmod diff (weekly 월 06:50) — 워터마크(selector_override)
  + cap 40/회 점진 백필 + diff sanity 300 + 보고서 PDF(/assets/, recommendation 제외)
  → extract 파이프라인. 초기 일괄 = CLI --bulk.
- api_standards_collector: 공지 목록 링크 파싱(실측 — 페이지 diff 아님, 상세 URL
  10건/페이지) → 신규 상세만 ingest (monthly 5일 07:05). 초기 백필 = CLI --bulk.
- ccps_collector: aiche.org 평문 403(UA 무관 실측) → playwright-fetcher 익명 컨텍스트
  + referer 쿠키 승계 /download(base64) 신설로 월간 Beacon PDF (monthly 5일 07:20).
  헤드리스 차단 시 CrawlBlocked → health 가시화 (르몽드 PARK 선례).
- B-5 잔여: rdf/feed-reader-UA = 코드 분기 불요 실측 박제 (Economist 는 Archiver UA
  200). table-strip/gn-redirect 는 해당 소스 미진입 — 백로그 유지.
- 테스트 24건 신규 (fixture 9건 live 박제, economist/ieee 는 item trim) — 39 passed.
- 마이그 327 단일 statement (PKM 트랙과 번호 경합 주의 — 327 본 트랙 선점).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 07:13:17 +09:00

391 lines
15 KiB
Python

"""C-2 잔여 ① US CSB sitemap diff 수집 워커 (plan crawl-24x7-1, 사이클 3).
RSS 폐지 → sitemap.xml lastmod diff 폴링이 정석 (정부 사이트라 lastmod 양호 —
2026-06-11 실측 1,307 URL, 조사 보고서 페이지는 루트 슬러그). 페이지 본문(4-tier
≥200자 게이트) + 보고서 PDF(/assets/, recommendation 상태요약 제외) →
기존 extract 파이프라인(marker/kordoc) 재사용.
스케줄 = weekly (main.py 월 06:50 KST):
워터마크(selector_override.sitemap_watermark — B-3 probe 설정과 같은 JSONB 슬롯)
이후 lastmod 만, 오래된 것부터 cap(40페이지/회). 워터마크는 처리분까지만 전진
= 잔량 자동 점진 백필 (KOSHA GUIDE cap 패턴). cap 미처리 잔량은 매회 로그
(silent cap 금지). diff 건수 > sanity(300) = sitemap 부패/lastmod 남발 의심 가시 경고.
초기 일괄 (cap 해제, politeness 로 수 시간 — docker exec -d, 진행 중 같은 서비스
재배포 금지 [[feedback_docker_exec_orphan_kill]] 자매 함정):
docker exec hyungi_document_server-fastapi-1 \
python -m workers.csb_collector --limit 3 # 검증용
docker exec -d hyungi_document_server-fastapi-1 \
python -m workers.csb_collector --bulk # 전체
멱등: 페이지 = edit_url(정규화)+file_hash dedup (first-wins — lastmod 갱신 페이지의
본문 재적재는 안 함, 갱신의 실체인 신규 PDF 는 개별 dedup 으로 적재됨).
PDF = file_path dedup. 워터마크 경계는 >= 재조회 — 경계 페이지 1회 재fetch 후
dedup 이 잡는다 (lastmod 실측 distinct 라 누적 재fetch 없음).
"""
import argparse
import asyncio
import hashlib
import random
import re
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin, urlparse
import httpx
from sqlalchemy import select
from core.config import settings
from core.crawl_politeness import (
CRAWL_UA,
CrawlBlocked,
CrawlFetchError,
CrawlSkip,
fetch_page,
)
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.fulltext_worker import (
_WEB_MIN_BODY_LEN,
_extract_body,
_raw_html_path,
_save_raw_html,
_strip_article_footer,
)
from workers.kosha_collector import _safe_filename
from workers.news_collector import (
FeedError,
_get_or_create_health,
_normalize_url,
_record_failure,
_record_success,
)
from workers.static_corpus_ingest import _page_title
logger = setup_logger("csb_collector")
_SITEMAP_URL = "https://www.csb.gov/sitemap.xml"
_SOURCE_NAME = "US CSB 사고조사보고서"
_RUN_PAGE_CAP = 40 # weekly 1회 처리 상한 — 잔량은 워터마크 미전진으로 자동 이월
_DIFF_SANITY = 300 # 주간 diff 가 이를 넘으면 sitemap lastmod 남발/부패 의심 (카드 C-2)
_MAX_PDF_BYTES = 50 * 1024 * 1024
_PDF_DELAY = (2.0, 5.0) # 같은 도메인 연속 PDF 다운로드 간격 (kosha _DOWNLOAD_DELAY 동률)
# 텍스트 코퍼스 무가치/관리성 섹션 — 첫 path segment 기준 (조사 보고서·뉴스 릴리스는
# 루트 슬러그라 영향 없음. /news/·/investigations/ 는 목록 페이지뿐이라 제외).
_SKIP_FIRST_SEGMENT = {
"videos", "photos", "events", "members", "disclaimers", "media-room",
"about-the-csb", "about-us", "foia", "news", "investigations",
"site-map", "subscribe", "unsubscribe", "optout", "test",
"privacy-policy", "vulnerability-disclosure-policy", "en-espanol",
"newsletter", "recom-stats", "500.aspx", "documents", "records-details",
}
def _parse_sitemap(xml_text: str) -> list[tuple[str, datetime]]:
"""(url, lastmod) 목록 — lastmod 없는/파싱불가 항목은 제외 (diff 축이 없음)."""
out: list[tuple[str, datetime]] = []
for m in re.finditer(
r"<url>\s*<loc>([^<]+)</loc>\s*<lastmod>([^<]+)</lastmod>", xml_text
):
try:
lastmod = datetime.fromisoformat(m.group(2).strip())
except ValueError:
continue
if lastmod.tzinfo is None:
lastmod = lastmod.replace(tzinfo=timezone.utc)
out.append((m.group(1).strip(), lastmod))
return out
def _should_skip(url: str) -> bool:
path = urlparse(url).path.strip("/")
if not path:
return True # 홈
return path.split("/", 1)[0].lower() in _SKIP_FIRST_SEGMENT
def _pdf_links(html_text: str, base_url: str) -> list[str]:
"""페이지 내 보고서 PDF — /assets/recommendation/(상태변경 요약 다수)은 제외.
cache-buster 쿼리(?17346)는 다운로드 URL 에는 유지, dedup/파일명은 path 기준.
"""
seen: set[str] = set()
out: list[str] = []
for m in re.finditer(r'href="([^"]+\.pdf(?:\?[^"]*)?)"', html_text, re.I):
absolute = urljoin(base_url, m.group(1))
path = urlparse(absolute).path
if "/assets/recommendation/" in path.lower():
continue
if (urlparse(absolute).hostname or "").lower() != "www.csb.gov":
continue
if path not in seen:
seen.add(path)
out.append(absolute)
return out
async def _download_pdf(url: str, dest: Path) -> int:
"""PDF 다운로드 — 크기 cap + 연속 간격 (politeness 는 순차 실행 전제)."""
await asyncio.sleep(random.uniform(*_PDF_DELAY))
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"PDF 다운로드 {resp.status_code}: {url}")
if len(resp.content) > _MAX_PDF_BYTES:
raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
return len(resp.content)
async def _get_or_create_source(session) -> NewsSource:
result = await session.execute(
select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
)
source = result.scalars().first()
if source is None:
source = NewsSource(
name=_SOURCE_NAME, feed_url=_SITEMAP_URL, feed_type="rss",
fetch_method="sitemap+page", fulltext_policy="none",
source_channel="crawl", category="Safety", language="en", country="US",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 weekly 폴링
)
session.add(source)
await session.flush()
return source
def _watermark(source: NewsSource) -> datetime | None:
raw = (source.selector_override or {}).get("sitemap_watermark")
if not raw:
return None
try:
return datetime.fromisoformat(raw)
except ValueError:
return None
def _set_watermark(source: NewsSource, value: datetime) -> None:
# JSONB 변경 감지를 위해 dict 재할당 (fulltext_worker._set_fulltext_meta 동일 규약)
cfg = dict(source.selector_override or {})
cfg["sitemap_watermark"] = value.isoformat()
source.selector_override = cfg
async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool:
"""PDF 1건 → NAS 저장 + Document + extract enqueue. 반환 = 신규 여부."""
fname = _safe_filename(Path(urlparse(pdf_url).path).name)
rel_path = f"crawl_raw/csb/{page_slug}/{fname}"
existing = await session.execute(
select(Document).where(Document.file_path == rel_path).limit(1)
)
if existing.scalars().first():
return False
dest = Path(settings.nas_mount_path) / rel_path
size = await _download_pdf(pdf_url, dest)
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_format="pdf",
file_size=size,
file_type="immutable",
title=fname.rsplit(".", 1)[0].replace("_", " "),
source_channel="crawl",
data_origin="external",
import_source="csb_sitemap",
edit_url=pdf_url,
ai_tags=["Safety/CSB/보고서"],
extract_meta={"csb": {"page_slug": page_slug, "kind": "report_pdf"}},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
logger.info(f"[csb] PDF ingest: {rel_path} ({size} bytes)")
return True
async def _ingest_url(session, source: NewsSource, url: str, lastmod: datetime) -> dict:
"""변경 URL 1건: 페이지 fetch → PDF 전수 스캔(개별 dedup) + 본문 신규면 적재.
페이지 재방문(lastmod 갱신)에서도 PDF 스캔은 항상 수행 — 갱신의 실체
(최종 보고서 추가 등)가 PDF 로 오는 경우가 핵심 가치다.
"""
counts = {"page": 0, "pdf": 0, "skip": 0}
try:
html_text, final_url = await fetch_page(url)
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
logger.warning(f"[csb] fetch 실패 skip: {url}{type(e).__name__}: {e}")
counts["skip"] = 1
return counts
page_slug = _safe_filename(urlparse(url).path.strip("/").split("/")[-1] or "root")
for pdf_url in _pdf_links(html_text, final_url):
try:
if await _ingest_pdf(session, page_slug, pdf_url):
counts["pdf"] += 1
except FeedError as e:
logger.warning(f"[csb] PDF 실패 skip ({pdf_url}): {e}")
# 페이지 본문 — first-wins (이미 있으면 본문 재적재 없음)
normalized_url = _normalize_url(url)
page_hash = hashlib.sha256(f"csb-page|{normalized_url}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(
(Document.file_hash == page_hash)
| (Document.edit_url.in_([normalized_url, url]))
).limit(1)
)
if existing.scalars().first():
return counts
body, engine, engine_ver = _extract_body(html_text)
if not engine:
logger.info(f"[csb] 본문 부족 — 페이지 비적재 (PDF 만): {url}")
return counts
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
return counts
now = datetime.now(timezone.utc)
raw_path = _raw_html_path(source.id, page_hash, now)
raw_saved = True
try:
_save_raw_html(raw_path, html_text)
except OSError as e:
raw_saved = False
logger.error(f"[csb] 원본 보존 실패 (ingest 는 진행): {e}")
title = _page_title(html_text, fallback=page_slug.replace("-", " ")[:90])
doc = Document(
file_path=f"crawl/{_SOURCE_NAME}/{page_hash}",
file_hash=page_hash,
file_format="article",
file_size=0,
file_type="note",
title=title,
extracted_text=f"{title}\n\n{clean_body}",
extracted_at=now,
extractor_version=f"sitemap+page@{engine}",
md_content=clean_body,
md_status="success",
md_extraction_engine=engine,
md_extraction_engine_version=engine_ver,
md_format_version="1.0",
md_generated_at=now,
md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(),
md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
content_origin="extracted",
source_channel="crawl",
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain="Safety",
ai_sub_group=_SOURCE_NAME,
ai_tags=["Safety/CSB"],
extract_meta={
"source_id": source.id,
"source_name": _SOURCE_NAME,
"published_at": lastmod.isoformat(),
"fulltext": {
"status": "csb_sitemap",
"engine": engine,
"final_url": final_url,
"raw_html_path": str(raw_path) if raw_saved else None,
"body_chars": len(clean_body),
"resolved_at": now.isoformat(),
},
},
)
doc.file_size = len(doc.extracted_text.encode())
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
counts["page"] = 1
logger.info(f"[csb] page ingest {len(clean_body)}자 ({engine}): {title[:60]}")
return counts
async def run(bulk: bool = False, limit: int = 0) -> None:
"""weekly 진입점 (스케줄러) — bulk/limit 은 CLI 전용."""
now = datetime.now(timezone.utc)
async with async_session() as session:
source = await _get_or_create_source(session)
await session.commit()
source_id = source.id
watermark = _watermark(source)
try:
xml_text, _ = await fetch_page(
_SITEMAP_URL, content_types=("text/xml", "application/xml", "text/html")
)
entries = _parse_sitemap(xml_text)
if not entries:
raise FeedError("sitemap 파싱 0건 — 포맷 변경/부패 의심")
except (CrawlBlocked, CrawlSkip, CrawlFetchError, FeedError) as e:
logger.error(f"[csb] sitemap 수집 실패: {e}")
async with async_session() as session:
health = await _get_or_create_health(session, source_id)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
return
changed = sorted(
(
(url, lastmod) for url, lastmod in entries
if not _should_skip(url) and (watermark is None or lastmod >= watermark)
),
key=lambda pair: pair[1],
)
if watermark is not None and len(changed) > _DIFF_SANITY:
logger.error(
f"[csb] diff {len(changed)}건 > sanity {_DIFF_SANITY}"
f"sitemap lastmod 남발/부패 의심 (cap 처리는 계속, 관찰 필요)"
)
cap = len(changed) if bulk else _RUN_PAGE_CAP
if limit:
cap = min(cap, limit)
todo, deferred = changed[:cap], max(len(changed) - cap, 0)
logger.info(
f"[csb] sitemap {len(entries)}건 중 변경 {len(changed)}건, 처리 {len(todo)}"
+ (f" (잔여 {deferred}건 — 워터마크 미전진으로 자동 이월)" if deferred else "")
)
totals = {"page": 0, "pdf": 0, "skip": 0}
for i, (url, lastmod) in enumerate(todo, 1):
async with async_session() as session:
src = await session.get(NewsSource, source_id)
counts = await _ingest_url(session, src, url, lastmod)
_set_watermark(src, lastmod)
await session.commit()
for k in totals:
totals[k] += counts[k]
if i % 10 == 0:
logger.info(f"[csb] 진행 {i}/{len(todo)} {totals}")
async with async_session() as session:
health = await _get_or_create_health(session, source_id)
_record_success(health, totals["page"] + totals["pdf"], False, now)
src = await session.get(NewsSource, source_id)
src.last_fetched_at = now
await session.commit()
logger.info(f"[csb] 완료: {totals} (변경 {len(changed)}건 중 {len(todo)}건 처리)")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CSB sitemap diff 수집")
parser.add_argument("--bulk", action="store_true", help="cap 해제 — 초기 일괄")
parser.add_argument("--limit", type=int, default=0, help="처리 상한 (검증용)")
args = parser.parse_args()
asyncio.run(run(bulk=args.bulk, limit=args.limit))