Files
hyungi_document_server/app/workers/csb_collector.py
T
hyungi 3feddd012b feat(safety): A-2 수집기 ingest 시점 분류 축 부여 — 레지스트리 전파 + 승인 가드 (mig 352~355)
plan safety-library-1 A-2 (classify-skip 경로 전수 커버):
- news_sources 에 material_type/license_scheme/license_redistribute + 안전·공학 12행 시드
- news_collector: 레지스트리 → documents 전파 (_material_axis — paper 는 jurisdiction NULL 강제)
- kosha(사례·첨부=incident, GUIDE=guide)/csb(incident·US)/api_std(standard·US)/law_monitor(law·KR)
  /file_watcher(KGS=law·KR 타깃 매핑) deterministic 부여 + extract_meta.license 주입
- published_date: 소스별 가용 날짜 (GUIDE 공표일·CSB lastmod·API 공지일·법령 공포일·뉴스 발행일)
- classify_worker: document_type→material_type 결정적 매핑 제안 (자동 전이 금지)
- accept-suggestion: material 제안 적용 + law=jurisdiction 필수(기본값 없음) + 청크 미러 1문 동기화
- chunk_worker: 비뉴스 문서 country=jurisdiction 미러 (R3-m3: 검색측 country 소비자 0 실측)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 06:23:22 +09:00

402 lines
16 KiB
Python

"""C-2 잔여 ① US CSB sitemap diff 수집 워커 (plan crawl-24x7-1, 사이클 3).
RSS 폐지 → sitemap.xml lastmod diff 폴링이 정석 (정부 사이트라 lastmod 양호 —
2026-06-11 실측 1,307 URL, 조사 보고서 페이지는 루트 슬러그). 페이지 본문(4-tier
≥200자 게이트) + 보고서 PDF(/assets/, recommendation 상태요약 제외) →
기존 extract 파이프라인(marker/kordoc) 재사용.
스케줄 = weekly (main.py 월 06:50 KST):
워터마크(selector_override.sitemap_watermark — B-3 probe 설정과 같은 JSONB 슬롯)
이후 lastmod 만, 오래된 것부터 cap(40페이지/회). 워터마크는 처리분까지만 전진
= 잔량 자동 점진 백필 (KOSHA GUIDE cap 패턴). cap 미처리 잔량은 매회 로그
(silent cap 금지). diff 건수 > sanity(300) = sitemap 부패/lastmod 남발 의심 가시 경고.
초기 일괄 (cap 해제, politeness 로 수 시간 — docker exec -d, 진행 중 같은 서비스
재배포 금지 [[feedback_docker_exec_orphan_kill]] 자매 함정):
docker exec hyungi_document_server-fastapi-1 \
python -m workers.csb_collector --limit 3 # 검증용
docker exec -d hyungi_document_server-fastapi-1 \
python -m workers.csb_collector --bulk # 전체
멱등: 페이지 = edit_url(정규화)+file_hash dedup (first-wins — lastmod 갱신 페이지의
본문 재적재는 안 함, 갱신의 실체인 신규 PDF 는 개별 dedup 으로 적재됨).
PDF = file_path dedup. 워터마크 경계는 >= 재조회 — 경계 페이지 1회 재fetch 후
dedup 이 잡는다 (lastmod 실측 distinct 라 누적 재fetch 없음).
"""
import argparse
import asyncio
import hashlib
import random
import re
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin, urlparse
import httpx
from sqlalchemy import select
from core.config import settings
from core.crawl_politeness import (
CRAWL_UA,
CrawlBlocked,
CrawlFetchError,
CrawlSkip,
fetch_page,
)
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.fulltext_worker import (
_WEB_MIN_BODY_LEN,
_extract_body,
_raw_html_path,
_save_raw_html,
_strip_article_footer,
)
from workers.kosha_collector import _safe_filename
from workers.news_collector import (
FeedError,
_get_or_create_health,
_normalize_url,
_record_failure,
_record_success,
)
from workers.static_corpus_ingest import _page_title
logger = setup_logger("csb_collector")
_SITEMAP_URL = "https://www.csb.gov/sitemap.xml"
_SOURCE_NAME = "US CSB 사고조사보고서"
_RUN_PAGE_CAP = 40 # weekly 1회 처리 상한 — 잔량은 워터마크 미전진으로 자동 이월
_DIFF_SANITY = 300 # 주간 diff 가 이를 넘으면 sitemap lastmod 남발/부패 의심 (카드 C-2)
_MAX_PDF_BYTES = 50 * 1024 * 1024
_PDF_DELAY = (2.0, 5.0) # 같은 도메인 연속 PDF 다운로드 간격 (kosha _DOWNLOAD_DELAY 동률)
# 텍스트 코퍼스 무가치/관리성 섹션 — 첫 path segment 기준 (조사 보고서·뉴스 릴리스는
# 루트 슬러그라 영향 없음. /news/·/investigations/ 는 목록 페이지뿐이라 제외).
_SKIP_FIRST_SEGMENT = {
"videos", "photos", "events", "members", "disclaimers", "media-room",
"about-the-csb", "about-us", "foia", "news", "investigations",
"site-map", "subscribe", "unsubscribe", "optout", "test",
"privacy-policy", "vulnerability-disclosure-policy", "en-espanol",
"newsletter", "recom-stats", "500.aspx", "documents", "records-details",
}
def _parse_sitemap(xml_text: str) -> list[tuple[str, datetime]]:
"""(url, lastmod) 목록 — lastmod 없는/파싱불가 항목은 제외 (diff 축이 없음)."""
out: list[tuple[str, datetime]] = []
for m in re.finditer(
r"<url>\s*<loc>([^<]+)</loc>\s*<lastmod>([^<]+)</lastmod>", xml_text
):
try:
lastmod = datetime.fromisoformat(m.group(2).strip())
except ValueError:
continue
if lastmod.tzinfo is None:
lastmod = lastmod.replace(tzinfo=timezone.utc)
out.append((m.group(1).strip(), lastmod))
return out
def _should_skip(url: str) -> bool:
path = urlparse(url).path.strip("/")
if not path:
return True # 홈
return path.split("/", 1)[0].lower() in _SKIP_FIRST_SEGMENT
def _pdf_links(html_text: str, base_url: str) -> list[str]:
"""페이지 내 보고서 PDF — /assets/recommendation/(상태변경 요약 다수)은 제외.
cache-buster 쿼리(?17346)는 다운로드 URL 에는 유지, dedup/파일명은 path 기준.
"""
seen: set[str] = set()
out: list[str] = []
for m in re.finditer(r'href="([^"]+\.pdf(?:\?[^"]*)?)"', html_text, re.I):
absolute = urljoin(base_url, m.group(1))
path = urlparse(absolute).path
if "/assets/recommendation/" in path.lower():
continue
if (urlparse(absolute).hostname or "").lower() != "www.csb.gov":
continue
if path not in seen:
seen.add(path)
out.append(absolute)
return out
async def _download_pdf(url: str, dest: Path) -> int:
"""PDF 다운로드 — 크기 cap + 연속 간격 (politeness 는 순차 실행 전제)."""
await asyncio.sleep(random.uniform(*_PDF_DELAY))
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"PDF 다운로드 {resp.status_code}: {url}")
if len(resp.content) > _MAX_PDF_BYTES:
raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
return len(resp.content)
async def _get_or_create_source(session) -> NewsSource:
result = await session.execute(
select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
)
source = result.scalars().first()
if source is None:
source = NewsSource(
name=_SOURCE_NAME, feed_url=_SITEMAP_URL, feed_type="rss",
fetch_method="sitemap+page", fulltext_policy="none",
source_channel="crawl", category="Safety", language="en", country="US",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 weekly 폴링
)
session.add(source)
await session.flush()
return source
def _watermark(source: NewsSource) -> datetime | None:
raw = (source.selector_override or {}).get("sitemap_watermark")
if not raw:
return None
try:
return datetime.fromisoformat(raw)
except ValueError:
return None
def _set_watermark(source: NewsSource, value: datetime) -> None:
# JSONB 변경 감지를 위해 dict 재할당 (fulltext_worker._set_fulltext_meta 동일 규약)
cfg = dict(source.selector_override or {})
cfg["sitemap_watermark"] = value.isoformat()
source.selector_override = cfg
async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool:
"""PDF 1건 → NAS 저장 + Document + extract enqueue. 반환 = 신규 여부."""
fname = _safe_filename(Path(urlparse(pdf_url).path).name)
rel_path = f"crawl_raw/csb/{page_slug}/{fname}"
existing = await session.execute(
select(Document).where(Document.file_path == rel_path).limit(1)
)
if existing.scalars().first():
return False
dest = Path(settings.nas_mount_path) / rel_path
size = await _download_pdf(pdf_url, dest)
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_format="pdf",
file_size=size,
file_type="immutable",
title=fname.rsplit(".", 1)[0].replace("_", " "),
source_channel="crawl",
data_origin="external",
import_source="csb_sitemap",
edit_url=pdf_url,
ai_tags=["Safety/CSB/보고서"],
# 안전 자료실 A-2 — ingest 시점 deterministic. CSB = 미 연방기관 = public domain.
material_type="incident",
jurisdiction="US",
extract_meta={"csb": {"page_slug": page_slug, "kind": "report_pdf"},
"license": {"scheme": "public_domain", "redistribute": True,
"attribution": "U.S. Chemical Safety Board"}},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
logger.info(f"[csb] PDF ingest: {rel_path} ({size} bytes)")
return True
async def _ingest_url(session, source: NewsSource, url: str, lastmod: datetime) -> dict:
"""변경 URL 1건: 페이지 fetch → PDF 전수 스캔(개별 dedup) + 본문 신규면 적재.
페이지 재방문(lastmod 갱신)에서도 PDF 스캔은 항상 수행 — 갱신의 실체
(최종 보고서 추가 등)가 PDF 로 오는 경우가 핵심 가치다.
"""
counts = {"page": 0, "pdf": 0, "skip": 0}
try:
html_text, final_url = await fetch_page(url)
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
logger.warning(f"[csb] fetch 실패 skip: {url}{type(e).__name__}: {e}")
counts["skip"] = 1
return counts
page_slug = _safe_filename(urlparse(url).path.strip("/").split("/")[-1] or "root")
for pdf_url in _pdf_links(html_text, final_url):
try:
if await _ingest_pdf(session, page_slug, pdf_url):
counts["pdf"] += 1
except FeedError as e:
logger.warning(f"[csb] PDF 실패 skip ({pdf_url}): {e}")
# 페이지 본문 — first-wins (이미 있으면 본문 재적재 없음)
normalized_url = _normalize_url(url)
page_hash = hashlib.sha256(f"csb-page|{normalized_url}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(
(Document.file_hash == page_hash)
| (Document.edit_url.in_([normalized_url, url]))
).limit(1)
)
if existing.scalars().first():
return counts
body, engine, engine_ver = _extract_body(html_text)
if not engine:
logger.info(f"[csb] 본문 부족 — 페이지 비적재 (PDF 만): {url}")
return counts
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
return counts
now = datetime.now(timezone.utc)
raw_path = _raw_html_path(source.id, page_hash, now)
raw_saved = True
try:
_save_raw_html(raw_path, html_text)
except OSError as e:
raw_saved = False
logger.error(f"[csb] 원본 보존 실패 (ingest 는 진행): {e}")
title = _page_title(html_text, fallback=page_slug.replace("-", " ")[:90])
doc = Document(
file_path=f"crawl/{_SOURCE_NAME}/{page_hash}",
file_hash=page_hash,
file_format="article",
file_size=0,
file_type="note",
title=title,
extracted_text=f"{title}\n\n{clean_body}",
extracted_at=now,
extractor_version=f"sitemap+page@{engine}",
md_content=clean_body,
md_status="success",
md_extraction_engine=engine,
md_extraction_engine_version=engine_ver,
md_format_version="1.0",
md_generated_at=now,
md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(),
md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
content_origin="extracted",
source_channel="crawl",
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain="Safety",
ai_sub_group=_SOURCE_NAME,
ai_tags=["Safety/CSB"],
# 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로)
material_type="incident",
jurisdiction="US",
published_date=lastmod.date() if lastmod else None,
extract_meta={
"source_id": source.id,
"source_name": _SOURCE_NAME,
"published_at": lastmod.isoformat(),
"license": {"scheme": "public_domain", "redistribute": True,
"attribution": "U.S. Chemical Safety Board"},
"fulltext": {
"status": "csb_sitemap",
"engine": engine,
"final_url": final_url,
"raw_html_path": str(raw_path) if raw_saved else None,
"body_chars": len(clean_body),
"resolved_at": now.isoformat(),
},
},
)
doc.file_size = len(doc.extracted_text.encode())
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
counts["page"] = 1
logger.info(f"[csb] page ingest {len(clean_body)}자 ({engine}): {title[:60]}")
return counts
async def run(bulk: bool = False, limit: int = 0) -> None:
"""weekly 진입점 (스케줄러) — bulk/limit 은 CLI 전용."""
now = datetime.now(timezone.utc)
async with async_session() as session:
source = await _get_or_create_source(session)
await session.commit()
source_id = source.id
watermark = _watermark(source)
try:
xml_text, _ = await fetch_page(
_SITEMAP_URL, content_types=("text/xml", "application/xml", "text/html")
)
entries = _parse_sitemap(xml_text)
if not entries:
raise FeedError("sitemap 파싱 0건 — 포맷 변경/부패 의심")
except (CrawlBlocked, CrawlSkip, CrawlFetchError, FeedError) as e:
logger.error(f"[csb] sitemap 수집 실패: {e}")
async with async_session() as session:
health = await _get_or_create_health(session, source_id)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
return
changed = sorted(
(
(url, lastmod) for url, lastmod in entries
if not _should_skip(url) and (watermark is None or lastmod >= watermark)
),
key=lambda pair: pair[1],
)
if watermark is not None and len(changed) > _DIFF_SANITY:
logger.error(
f"[csb] diff {len(changed)}건 > sanity {_DIFF_SANITY}"
f"sitemap lastmod 남발/부패 의심 (cap 처리는 계속, 관찰 필요)"
)
cap = len(changed) if bulk else _RUN_PAGE_CAP
if limit:
cap = min(cap, limit)
todo, deferred = changed[:cap], max(len(changed) - cap, 0)
logger.info(
f"[csb] sitemap {len(entries)}건 중 변경 {len(changed)}건, 처리 {len(todo)}"
+ (f" (잔여 {deferred}건 — 워터마크 미전진으로 자동 이월)" if deferred else "")
)
totals = {"page": 0, "pdf": 0, "skip": 0}
for i, (url, lastmod) in enumerate(todo, 1):
async with async_session() as session:
src = await session.get(NewsSource, source_id)
counts = await _ingest_url(session, src, url, lastmod)
_set_watermark(src, lastmod)
await session.commit()
for k in totals:
totals[k] += counts[k]
if i % 10 == 0:
logger.info(f"[csb] 진행 {i}/{len(todo)} {totals}")
async with async_session() as session:
health = await _get_or_create_health(session, source_id)
_record_success(health, totals["page"] + totals["pdf"], False, now)
src = await session.get(NewsSource, source_id)
src.last_fetched_at = now
await session.commit()
logger.info(f"[csb] 완료: {totals} (변경 {len(changed)}건 중 {len(todo)}건 처리)")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CSB sitemap diff 수집")
parser.add_argument("--bulk", action="store_true", help="cap 해제 — 초기 일괄")
parser.add_argument("--limit", type=int, default=0, help="처리 상한 (검증용)")
args = parser.parse_args()
asyncio.run(run(bulk=args.bulk, limit=args.limit))