d26b1150d8
- presegment_worker: fitz open/get_toc(동기 blocking, live 스테이지)를 to_thread 로 — 거대/손상 PDF 파싱이 같은 루프의 1분 consumer + FastAPI 요청을 수백 ms~초 정지시키던 것 해소. - csb_collector: 50MB PDF write_bytes + read_bytes(해시)를 to_thread 로 (R5 동형). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
411 lines
17 KiB
Python
411 lines
17 KiB
Python
"""C-2 잔여 ① US CSB sitemap diff 수집 워커 (plan crawl-24x7-1, 사이클 3).
|
|
|
|
RSS 폐지 → sitemap.xml lastmod diff 폴링이 정석 (정부 사이트라 lastmod 양호 —
|
|
2026-06-11 실측 1,307 URL, 조사 보고서 페이지는 루트 슬러그). 페이지 본문(4-tier
|
|
≥200자 게이트) + 보고서 PDF(/assets/, recommendation 상태요약 제외) →
|
|
기존 extract 파이프라인(marker/kordoc) 재사용.
|
|
|
|
스케줄 = weekly (main.py 월 06:50 KST):
|
|
워터마크(selector_override.sitemap_watermark — B-3 probe 설정과 같은 JSONB 슬롯)
|
|
이후 lastmod 만, 오래된 것부터 cap(40페이지/회). 워터마크는 처리분까지만 전진
|
|
= 잔량 자동 점진 백필 (KOSHA GUIDE cap 패턴). cap 미처리 잔량은 매회 로그
|
|
(silent cap 금지). diff 건수 > sanity(300) = sitemap 부패/lastmod 남발 의심 가시 경고.
|
|
|
|
초기 일괄 (cap 해제, politeness 로 수 시간 — docker exec -d, 진행 중 같은 서비스
|
|
재배포 금지 [[feedback_docker_exec_orphan_kill]] 자매 함정):
|
|
docker exec hyungi_document_server-fastapi-1 \
|
|
python -m workers.csb_collector --limit 3 # 검증용
|
|
docker exec -d hyungi_document_server-fastapi-1 \
|
|
python -m workers.csb_collector --bulk # 전체
|
|
|
|
멱등: 페이지 = edit_url(정규화)+file_hash dedup (first-wins — lastmod 갱신 페이지의
|
|
본문 재적재는 안 함, 갱신의 실체인 신규 PDF 는 개별 dedup 으로 적재됨).
|
|
PDF = file_path dedup. 워터마크 경계는 >= 재조회 — 경계 페이지 1회 재fetch 후
|
|
dedup 이 잡는다 (lastmod 실측 distinct 라 누적 재fetch 없음).
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import hashlib
|
|
import random
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from core.config import settings
|
|
from core.crawl_politeness import (
|
|
CRAWL_UA,
|
|
CrawlBlocked,
|
|
CrawlFetchError,
|
|
CrawlSkip,
|
|
fetch_page,
|
|
)
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.news_source import NewsSource
|
|
from models.queue import enqueue_stage
|
|
from workers.fulltext_worker import (
|
|
_WEB_MIN_BODY_LEN,
|
|
_extract_body,
|
|
_raw_html_path,
|
|
_save_raw_html,
|
|
_strip_article_footer,
|
|
)
|
|
from workers.kosha_collector import _safe_filename
|
|
from workers.news_collector import (
|
|
FeedError,
|
|
_get_or_create_health,
|
|
_normalize_url,
|
|
_record_failure,
|
|
_record_success,
|
|
)
|
|
from workers.static_corpus_ingest import _page_title
|
|
|
|
logger = setup_logger("csb_collector")
|
|
|
|
_SITEMAP_URL = "https://www.csb.gov/sitemap.xml"
|
|
_SOURCE_NAME = "US CSB 사고조사보고서"
|
|
|
|
_RUN_PAGE_CAP = 40 # weekly 1회 처리 상한 — 잔량은 워터마크 미전진으로 자동 이월
|
|
_DIFF_SANITY = 300 # 주간 diff 가 이를 넘으면 sitemap lastmod 남발/부패 의심 (카드 C-2)
|
|
_MAX_PDF_BYTES = 50 * 1024 * 1024
|
|
_PDF_DELAY = (2.0, 5.0) # 같은 도메인 연속 PDF 다운로드 간격 (kosha _DOWNLOAD_DELAY 동률)
|
|
|
|
# 텍스트 코퍼스 무가치/관리성 섹션 — 첫 path segment 기준 (조사 보고서·뉴스 릴리스는
|
|
# 루트 슬러그라 영향 없음. /news/·/investigations/ 는 목록 페이지뿐이라 제외).
|
|
_SKIP_FIRST_SEGMENT = {
|
|
"videos", "photos", "events", "members", "disclaimers", "media-room",
|
|
"about-the-csb", "about-us", "foia", "news", "investigations",
|
|
"site-map", "subscribe", "unsubscribe", "optout", "test",
|
|
"privacy-policy", "vulnerability-disclosure-policy", "en-espanol",
|
|
"newsletter", "recom-stats", "500.aspx", "documents", "records-details",
|
|
}
|
|
|
|
|
|
def _parse_sitemap(xml_text: str) -> list[tuple[str, datetime]]:
|
|
"""(url, lastmod) 목록 — lastmod 없는/파싱불가 항목은 제외 (diff 축이 없음)."""
|
|
out: list[tuple[str, datetime]] = []
|
|
for m in re.finditer(
|
|
r"<url>\s*<loc>([^<]+)</loc>\s*<lastmod>([^<]+)</lastmod>", xml_text
|
|
):
|
|
try:
|
|
lastmod = datetime.fromisoformat(m.group(2).strip())
|
|
except ValueError:
|
|
continue
|
|
if lastmod.tzinfo is None:
|
|
lastmod = lastmod.replace(tzinfo=timezone.utc)
|
|
out.append((m.group(1).strip(), lastmod))
|
|
return out
|
|
|
|
|
|
def _should_skip(url: str) -> bool:
|
|
path = urlparse(url).path.strip("/")
|
|
if not path:
|
|
return True # 홈
|
|
return path.split("/", 1)[0].lower() in _SKIP_FIRST_SEGMENT
|
|
|
|
|
|
def _pdf_links(html_text: str, base_url: str) -> list[str]:
|
|
"""페이지 내 보고서 PDF — /assets/recommendation/(상태변경 요약 다수)은 제외.
|
|
|
|
cache-buster 쿼리(?17346)는 다운로드 URL 에는 유지, dedup/파일명은 path 기준.
|
|
"""
|
|
seen: set[str] = set()
|
|
out: list[str] = []
|
|
for m in re.finditer(r'href="([^"]+\.pdf(?:\?[^"]*)?)"', html_text, re.I):
|
|
absolute = urljoin(base_url, m.group(1))
|
|
path = urlparse(absolute).path
|
|
if "/assets/recommendation/" in path.lower():
|
|
continue
|
|
if (urlparse(absolute).hostname or "").lower() != "www.csb.gov":
|
|
continue
|
|
if path not in seen:
|
|
seen.add(path)
|
|
out.append(absolute)
|
|
return out
|
|
|
|
|
|
async def _download_pdf(url: str, dest: Path) -> int:
|
|
"""PDF 다운로드 — 크기 cap + 연속 간격 (politeness 는 순차 실행 전제)."""
|
|
await asyncio.sleep(random.uniform(*_PDF_DELAY))
|
|
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
|
|
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
|
|
if resp.status_code != 200:
|
|
raise FeedError(f"PDF 다운로드 {resp.status_code}: {url}")
|
|
if len(resp.content) > _MAX_PDF_BYTES:
|
|
raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}")
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
# 최대 50MB PDF write 는 동기 blocking — 이벤트루프 점유 회피 to_thread (R5 동형).
|
|
await asyncio.to_thread(dest.write_bytes, resp.content)
|
|
return len(resp.content)
|
|
|
|
|
|
async def _get_or_create_source(session) -> NewsSource:
|
|
result = await session.execute(
|
|
select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
|
|
)
|
|
source = result.scalars().first()
|
|
if source is None:
|
|
source = NewsSource(
|
|
name=_SOURCE_NAME, feed_url=_SITEMAP_URL, feed_type="rss",
|
|
fetch_method="sitemap+page", fulltext_policy="none",
|
|
source_channel="crawl", category="Safety", language="en", country="US",
|
|
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 weekly 폴링
|
|
)
|
|
session.add(source)
|
|
await session.flush()
|
|
return source
|
|
|
|
|
|
def _watermark(source: NewsSource) -> datetime | None:
|
|
raw = (source.selector_override or {}).get("sitemap_watermark")
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return datetime.fromisoformat(raw)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _set_watermark(source: NewsSource, value: datetime) -> None:
|
|
# JSONB 변경 감지를 위해 dict 재할당 (fulltext_worker._set_fulltext_meta 동일 규약)
|
|
cfg = dict(source.selector_override or {})
|
|
cfg["sitemap_watermark"] = value.isoformat()
|
|
source.selector_override = cfg
|
|
|
|
|
|
async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool:
|
|
"""PDF 1건 → NAS 저장 + Document + extract enqueue. 반환 = 신규 여부."""
|
|
fname = _safe_filename(Path(urlparse(pdf_url).path).name)
|
|
rel_path = f"crawl_raw/csb/{page_slug}/{fname}"
|
|
existing = await session.execute(
|
|
select(Document).where(Document.file_path == rel_path).limit(1)
|
|
)
|
|
if existing.scalars().first():
|
|
return False
|
|
|
|
dest = Path(settings.nas_mount_path) / rel_path
|
|
size = await _download_pdf(pdf_url, dest)
|
|
# 50MB PDF read + sha256 는 동기 blocking(I/O+CPU) — 이벤트루프 점유 회피 to_thread (R5 동형).
|
|
file_hash = await asyncio.to_thread(lambda: hashlib.sha256(dest.read_bytes()).hexdigest())
|
|
doc = Document(
|
|
file_path=rel_path,
|
|
file_hash=file_hash,
|
|
file_format="pdf",
|
|
file_size=size,
|
|
file_type="immutable",
|
|
title=fname.rsplit(".", 1)[0].replace("_", " "),
|
|
source_channel="crawl",
|
|
data_origin="external",
|
|
import_source="csb_sitemap",
|
|
edit_url=pdf_url,
|
|
ai_tags=["Safety/CSB/보고서"],
|
|
# 안전 자료실 A-2 — ingest 시점 deterministic. CSB = 미 연방기관 = public domain.
|
|
material_type="incident",
|
|
jurisdiction="US",
|
|
extract_meta={"csb": {"page_slug": page_slug, "kind": "report_pdf"},
|
|
"license": {"scheme": "public_domain", "redistribute": True,
|
|
"attribution": "U.S. Chemical Safety Board"}},
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
await enqueue_stage(session, doc.id, "extract")
|
|
logger.info(f"[csb] PDF ingest: {rel_path} ({size} bytes)")
|
|
return True
|
|
|
|
|
|
async def _ingest_url(session, source: NewsSource, url: str, lastmod: datetime) -> dict:
|
|
"""변경 URL 1건: 페이지 fetch → PDF 전수 스캔(개별 dedup) + 본문 신규면 적재.
|
|
|
|
페이지 재방문(lastmod 갱신)에서도 PDF 스캔은 항상 수행 — 갱신의 실체
|
|
(최종 보고서 추가 등)가 PDF 로 오는 경우가 핵심 가치다.
|
|
"""
|
|
counts = {"page": 0, "pdf": 0, "skip": 0}
|
|
try:
|
|
html_text, final_url = await fetch_page(url)
|
|
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
|
|
logger.warning(f"[csb] fetch 실패 skip: {url} — {type(e).__name__}: {e}")
|
|
counts["skip"] = 1
|
|
return counts
|
|
|
|
page_slug = _safe_filename(urlparse(url).path.strip("/").split("/")[-1] or "root")
|
|
|
|
for pdf_url in _pdf_links(html_text, final_url):
|
|
try:
|
|
if await _ingest_pdf(session, page_slug, pdf_url):
|
|
counts["pdf"] += 1
|
|
except FeedError as e:
|
|
logger.warning(f"[csb] PDF 실패 skip ({pdf_url}): {e}")
|
|
|
|
# 페이지 본문 — first-wins (이미 있으면 본문 재적재 없음)
|
|
normalized_url = _normalize_url(url)
|
|
page_hash = hashlib.sha256(f"csb-page|{normalized_url}".encode()).hexdigest()[:32]
|
|
existing = await session.execute(
|
|
select(Document).where(
|
|
(Document.file_hash == page_hash)
|
|
| (Document.edit_url.in_([normalized_url, url]))
|
|
).limit(1)
|
|
)
|
|
if existing.scalars().first():
|
|
return counts
|
|
|
|
body, engine, engine_ver = _extract_body(html_text)
|
|
if not engine:
|
|
logger.info(f"[csb] 본문 부족 — 페이지 비적재 (PDF 만): {url}")
|
|
return counts
|
|
clean_body = _strip_article_footer(body.replace("\x00", ""))
|
|
if len(clean_body) < _WEB_MIN_BODY_LEN:
|
|
return counts
|
|
|
|
now = datetime.now(timezone.utc)
|
|
raw_path = _raw_html_path(source.id, page_hash, now)
|
|
raw_saved = True
|
|
try:
|
|
_save_raw_html(raw_path, html_text)
|
|
except OSError as e:
|
|
raw_saved = False
|
|
logger.error(f"[csb] 원본 보존 실패 (ingest 는 진행): {e}")
|
|
|
|
title = _page_title(html_text, fallback=page_slug.replace("-", " ")[:90])
|
|
doc = Document(
|
|
file_path=f"crawl/{_SOURCE_NAME}/{page_hash}",
|
|
file_hash=page_hash,
|
|
file_format="article",
|
|
file_size=0,
|
|
file_type="note",
|
|
title=title,
|
|
extracted_text=f"{title}\n\n{clean_body}",
|
|
extracted_at=now,
|
|
extractor_version=f"sitemap+page@{engine}",
|
|
md_content=clean_body,
|
|
md_status="success",
|
|
md_extraction_engine=engine,
|
|
md_extraction_engine_version=engine_ver,
|
|
md_format_version="1.0",
|
|
md_generated_at=now,
|
|
md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(),
|
|
md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
|
|
content_origin="extracted",
|
|
source_channel="crawl",
|
|
data_origin="external",
|
|
edit_url=normalized_url,
|
|
review_status="approved",
|
|
ai_domain="Safety",
|
|
ai_sub_group=_SOURCE_NAME,
|
|
ai_tags=["Safety/CSB"],
|
|
# 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로)
|
|
material_type="incident",
|
|
jurisdiction="US",
|
|
published_date=lastmod.date() if lastmod else None,
|
|
extract_meta={
|
|
"source_id": source.id,
|
|
"source_name": _SOURCE_NAME,
|
|
"published_at": lastmod.isoformat(),
|
|
"license": {"scheme": "public_domain", "redistribute": True,
|
|
"attribution": "U.S. Chemical Safety Board"},
|
|
"fulltext": {
|
|
"status": "csb_sitemap",
|
|
"engine": engine,
|
|
"final_url": final_url,
|
|
"raw_html_path": str(raw_path) if raw_saved else None,
|
|
"body_chars": len(clean_body),
|
|
"resolved_at": now.isoformat(),
|
|
},
|
|
},
|
|
)
|
|
doc.file_size = len(doc.extracted_text.encode())
|
|
session.add(doc)
|
|
await session.flush()
|
|
await enqueue_stage(session, doc.id, "summarize")
|
|
await enqueue_stage(session, doc.id, "embed")
|
|
await enqueue_stage(session, doc.id, "chunk")
|
|
counts["page"] = 1
|
|
logger.info(f"[csb] page ingest {len(clean_body)}자 ({engine}): {title[:60]}")
|
|
return counts
|
|
|
|
|
|
async def run(bulk: bool = False, limit: int = 0) -> None:
|
|
"""weekly 진입점 (스케줄러) — bulk/limit 은 CLI 전용."""
|
|
now = datetime.now(timezone.utc)
|
|
async with async_session() as session:
|
|
source = await _get_or_create_source(session)
|
|
await session.commit()
|
|
source_id = source.id
|
|
watermark = _watermark(source)
|
|
|
|
try:
|
|
xml_text, _ = await fetch_page(
|
|
_SITEMAP_URL, content_types=("text/xml", "application/xml", "text/html")
|
|
)
|
|
entries = _parse_sitemap(xml_text)
|
|
if not entries:
|
|
raise FeedError("sitemap 파싱 0건 — 포맷 변경/부패 의심")
|
|
except (CrawlBlocked, CrawlSkip, CrawlFetchError, FeedError) as e:
|
|
logger.error(f"[csb] sitemap 수집 실패: {e}")
|
|
async with async_session() as session:
|
|
health = await _get_or_create_health(session, source_id)
|
|
_record_failure(health, str(e) or repr(e), now)
|
|
await session.commit()
|
|
return
|
|
|
|
changed = sorted(
|
|
(
|
|
(url, lastmod) for url, lastmod in entries
|
|
if not _should_skip(url) and (watermark is None or lastmod >= watermark)
|
|
),
|
|
key=lambda pair: pair[1],
|
|
)
|
|
if watermark is not None and len(changed) > _DIFF_SANITY:
|
|
logger.error(
|
|
f"[csb] diff {len(changed)}건 > sanity {_DIFF_SANITY} — "
|
|
f"sitemap lastmod 남발/부패 의심 (cap 처리는 계속, 관찰 필요)"
|
|
)
|
|
|
|
cap = len(changed) if bulk else _RUN_PAGE_CAP
|
|
if limit:
|
|
cap = min(cap, limit)
|
|
todo, deferred = changed[:cap], max(len(changed) - cap, 0)
|
|
logger.info(
|
|
f"[csb] sitemap {len(entries)}건 중 변경 {len(changed)}건, 처리 {len(todo)}건"
|
|
+ (f" (잔여 {deferred}건 — 워터마크 미전진으로 자동 이월)" if deferred else "")
|
|
)
|
|
|
|
totals = {"page": 0, "pdf": 0, "skip": 0}
|
|
for i, (url, lastmod) in enumerate(todo, 1):
|
|
# 2026-06-20 C2: URL 1건 실패가 주간 run 전체를 중단(이후 URL 스킵·watermark 정지)하던 것 차단.
|
|
# 각 iteration 은 자체 session(async with) 이라 실패 격리 — 건너뛰고 계속.
|
|
try:
|
|
async with async_session() as session:
|
|
src = await session.get(NewsSource, source_id)
|
|
counts = await _ingest_url(session, src, url, lastmod)
|
|
_set_watermark(src, lastmod)
|
|
await session.commit()
|
|
except Exception as e:
|
|
logger.error(f"[csb] URL 처리 실패 (건너뜀): {url} — {str(e) or repr(e)}")
|
|
continue
|
|
for k in totals:
|
|
totals[k] += counts[k]
|
|
if i % 10 == 0:
|
|
logger.info(f"[csb] 진행 {i}/{len(todo)} {totals}")
|
|
|
|
async with async_session() as session:
|
|
health = await _get_or_create_health(session, source_id)
|
|
_record_success(health, totals["page"] + totals["pdf"], False, now)
|
|
src = await session.get(NewsSource, source_id)
|
|
src.last_fetched_at = now
|
|
await session.commit()
|
|
logger.info(f"[csb] 완료: {totals} (변경 {len(changed)}건 중 {len(todo)}건 처리)")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="CSB sitemap diff 수집")
|
|
parser.add_argument("--bulk", action="store_true", help="cap 해제 — 초기 일괄")
|
|
parser.add_argument("--limit", type=int, default=0, help="처리 상한 (검증용)")
|
|
args = parser.parse_args()
|
|
asyncio.run(run(bulk=args.bulk, limit=args.limit))
|