Files
hyungi_document_server/app/workers/csb_collector.py
T
hyungi 12ac18eb70 fix(collector): 수집기 견고화 — 한 건 실패가 전체 사이클을 죽이던 것 차단
C2 csb_collector: 주간 run 의 per-URL 루프에 try/except/continue — URL 1건 실패(page-extract
예외·DB DataError)가 run() 밖으로 전파돼 이후 URL 전부 스킵+watermark 정지하던 것 차단. 각
iteration 자체 session 이라 실패 격리.
H3 news_collector: 공유 세션+종단 단일 commit → 한 소스 DB오류가 오염시켜 전 소스 insert 소실하던
구조를 소스별 독립 세션으로(csb 패턴 동형). 실패 시 rollback 후 깨끗한 상태에서 failure 기록.
실증: 수동 수집서 Taipei Times ReadTimeout 격리하고 327건 정상 완주.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-20 05:42:12 +00:00

408 lines
16 KiB
Python

"""C-2 잔여 ① US CSB sitemap diff 수집 워커 (plan crawl-24x7-1, 사이클 3).
RSS 폐지 → sitemap.xml lastmod diff 폴링이 정석 (정부 사이트라 lastmod 양호 —
2026-06-11 실측 1,307 URL, 조사 보고서 페이지는 루트 슬러그). 페이지 본문(4-tier
≥200자 게이트) + 보고서 PDF(/assets/, recommendation 상태요약 제외) →
기존 extract 파이프라인(marker/kordoc) 재사용.
스케줄 = weekly (main.py 월 06:50 KST):
워터마크(selector_override.sitemap_watermark — B-3 probe 설정과 같은 JSONB 슬롯)
이후 lastmod 만, 오래된 것부터 cap(40페이지/회). 워터마크는 처리분까지만 전진
= 잔량 자동 점진 백필 (KOSHA GUIDE cap 패턴). cap 미처리 잔량은 매회 로그
(silent cap 금지). diff 건수 > sanity(300) = sitemap 부패/lastmod 남발 의심 가시 경고.
초기 일괄 (cap 해제, politeness 로 수 시간 — docker exec -d, 진행 중 같은 서비스
재배포 금지 [[feedback_docker_exec_orphan_kill]] 자매 함정):
docker exec hyungi_document_server-fastapi-1 \
python -m workers.csb_collector --limit 3 # 검증용
docker exec -d hyungi_document_server-fastapi-1 \
python -m workers.csb_collector --bulk # 전체
멱등: 페이지 = edit_url(정규화)+file_hash dedup (first-wins — lastmod 갱신 페이지의
본문 재적재는 안 함, 갱신의 실체인 신규 PDF 는 개별 dedup 으로 적재됨).
PDF = file_path dedup. 워터마크 경계는 >= 재조회 — 경계 페이지 1회 재fetch 후
dedup 이 잡는다 (lastmod 실측 distinct 라 누적 재fetch 없음).
"""
import argparse
import asyncio
import hashlib
import random
import re
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin, urlparse
import httpx
from sqlalchemy import select
from core.config import settings
from core.crawl_politeness import (
CRAWL_UA,
CrawlBlocked,
CrawlFetchError,
CrawlSkip,
fetch_page,
)
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.fulltext_worker import (
_WEB_MIN_BODY_LEN,
_extract_body,
_raw_html_path,
_save_raw_html,
_strip_article_footer,
)
from workers.kosha_collector import _safe_filename
from workers.news_collector import (
FeedError,
_get_or_create_health,
_normalize_url,
_record_failure,
_record_success,
)
from workers.static_corpus_ingest import _page_title
logger = setup_logger("csb_collector")
_SITEMAP_URL = "https://www.csb.gov/sitemap.xml"
_SOURCE_NAME = "US CSB 사고조사보고서"
_RUN_PAGE_CAP = 40 # weekly 1회 처리 상한 — 잔량은 워터마크 미전진으로 자동 이월
_DIFF_SANITY = 300 # 주간 diff 가 이를 넘으면 sitemap lastmod 남발/부패 의심 (카드 C-2)
_MAX_PDF_BYTES = 50 * 1024 * 1024
_PDF_DELAY = (2.0, 5.0) # 같은 도메인 연속 PDF 다운로드 간격 (kosha _DOWNLOAD_DELAY 동률)
# 텍스트 코퍼스 무가치/관리성 섹션 — 첫 path segment 기준 (조사 보고서·뉴스 릴리스는
# 루트 슬러그라 영향 없음. /news/·/investigations/ 는 목록 페이지뿐이라 제외).
_SKIP_FIRST_SEGMENT = {
"videos", "photos", "events", "members", "disclaimers", "media-room",
"about-the-csb", "about-us", "foia", "news", "investigations",
"site-map", "subscribe", "unsubscribe", "optout", "test",
"privacy-policy", "vulnerability-disclosure-policy", "en-espanol",
"newsletter", "recom-stats", "500.aspx", "documents", "records-details",
}
def _parse_sitemap(xml_text: str) -> list[tuple[str, datetime]]:
"""(url, lastmod) 목록 — lastmod 없는/파싱불가 항목은 제외 (diff 축이 없음)."""
out: list[tuple[str, datetime]] = []
for m in re.finditer(
r"<url>\s*<loc>([^<]+)</loc>\s*<lastmod>([^<]+)</lastmod>", xml_text
):
try:
lastmod = datetime.fromisoformat(m.group(2).strip())
except ValueError:
continue
if lastmod.tzinfo is None:
lastmod = lastmod.replace(tzinfo=timezone.utc)
out.append((m.group(1).strip(), lastmod))
return out
def _should_skip(url: str) -> bool:
path = urlparse(url).path.strip("/")
if not path:
return True # 홈
return path.split("/", 1)[0].lower() in _SKIP_FIRST_SEGMENT
def _pdf_links(html_text: str, base_url: str) -> list[str]:
"""페이지 내 보고서 PDF — /assets/recommendation/(상태변경 요약 다수)은 제외.
cache-buster 쿼리(?17346)는 다운로드 URL 에는 유지, dedup/파일명은 path 기준.
"""
seen: set[str] = set()
out: list[str] = []
for m in re.finditer(r'href="([^"]+\.pdf(?:\?[^"]*)?)"', html_text, re.I):
absolute = urljoin(base_url, m.group(1))
path = urlparse(absolute).path
if "/assets/recommendation/" in path.lower():
continue
if (urlparse(absolute).hostname or "").lower() != "www.csb.gov":
continue
if path not in seen:
seen.add(path)
out.append(absolute)
return out
async def _download_pdf(url: str, dest: Path) -> int:
"""PDF 다운로드 — 크기 cap + 연속 간격 (politeness 는 순차 실행 전제)."""
await asyncio.sleep(random.uniform(*_PDF_DELAY))
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"PDF 다운로드 {resp.status_code}: {url}")
if len(resp.content) > _MAX_PDF_BYTES:
raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
return len(resp.content)
async def _get_or_create_source(session) -> NewsSource:
result = await session.execute(
select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
)
source = result.scalars().first()
if source is None:
source = NewsSource(
name=_SOURCE_NAME, feed_url=_SITEMAP_URL, feed_type="rss",
fetch_method="sitemap+page", fulltext_policy="none",
source_channel="crawl", category="Safety", language="en", country="US",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 weekly 폴링
)
session.add(source)
await session.flush()
return source
def _watermark(source: NewsSource) -> datetime | None:
raw = (source.selector_override or {}).get("sitemap_watermark")
if not raw:
return None
try:
return datetime.fromisoformat(raw)
except ValueError:
return None
def _set_watermark(source: NewsSource, value: datetime) -> None:
# JSONB 변경 감지를 위해 dict 재할당 (fulltext_worker._set_fulltext_meta 동일 규약)
cfg = dict(source.selector_override or {})
cfg["sitemap_watermark"] = value.isoformat()
source.selector_override = cfg
async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool:
"""PDF 1건 → NAS 저장 + Document + extract enqueue. 반환 = 신규 여부."""
fname = _safe_filename(Path(urlparse(pdf_url).path).name)
rel_path = f"crawl_raw/csb/{page_slug}/{fname}"
existing = await session.execute(
select(Document).where(Document.file_path == rel_path).limit(1)
)
if existing.scalars().first():
return False
dest = Path(settings.nas_mount_path) / rel_path
size = await _download_pdf(pdf_url, dest)
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_format="pdf",
file_size=size,
file_type="immutable",
title=fname.rsplit(".", 1)[0].replace("_", " "),
source_channel="crawl",
data_origin="external",
import_source="csb_sitemap",
edit_url=pdf_url,
ai_tags=["Safety/CSB/보고서"],
# 안전 자료실 A-2 — ingest 시점 deterministic. CSB = 미 연방기관 = public domain.
material_type="incident",
jurisdiction="US",
extract_meta={"csb": {"page_slug": page_slug, "kind": "report_pdf"},
"license": {"scheme": "public_domain", "redistribute": True,
"attribution": "U.S. Chemical Safety Board"}},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
logger.info(f"[csb] PDF ingest: {rel_path} ({size} bytes)")
return True
async def _ingest_url(session, source: NewsSource, url: str, lastmod: datetime) -> dict:
"""변경 URL 1건: 페이지 fetch → PDF 전수 스캔(개별 dedup) + 본문 신규면 적재.
페이지 재방문(lastmod 갱신)에서도 PDF 스캔은 항상 수행 — 갱신의 실체
(최종 보고서 추가 등)가 PDF 로 오는 경우가 핵심 가치다.
"""
counts = {"page": 0, "pdf": 0, "skip": 0}
try:
html_text, final_url = await fetch_page(url)
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
logger.warning(f"[csb] fetch 실패 skip: {url}{type(e).__name__}: {e}")
counts["skip"] = 1
return counts
page_slug = _safe_filename(urlparse(url).path.strip("/").split("/")[-1] or "root")
for pdf_url in _pdf_links(html_text, final_url):
try:
if await _ingest_pdf(session, page_slug, pdf_url):
counts["pdf"] += 1
except FeedError as e:
logger.warning(f"[csb] PDF 실패 skip ({pdf_url}): {e}")
# 페이지 본문 — first-wins (이미 있으면 본문 재적재 없음)
normalized_url = _normalize_url(url)
page_hash = hashlib.sha256(f"csb-page|{normalized_url}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(
(Document.file_hash == page_hash)
| (Document.edit_url.in_([normalized_url, url]))
).limit(1)
)
if existing.scalars().first():
return counts
body, engine, engine_ver = _extract_body(html_text)
if not engine:
logger.info(f"[csb] 본문 부족 — 페이지 비적재 (PDF 만): {url}")
return counts
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
return counts
now = datetime.now(timezone.utc)
raw_path = _raw_html_path(source.id, page_hash, now)
raw_saved = True
try:
_save_raw_html(raw_path, html_text)
except OSError as e:
raw_saved = False
logger.error(f"[csb] 원본 보존 실패 (ingest 는 진행): {e}")
title = _page_title(html_text, fallback=page_slug.replace("-", " ")[:90])
doc = Document(
file_path=f"crawl/{_SOURCE_NAME}/{page_hash}",
file_hash=page_hash,
file_format="article",
file_size=0,
file_type="note",
title=title,
extracted_text=f"{title}\n\n{clean_body}",
extracted_at=now,
extractor_version=f"sitemap+page@{engine}",
md_content=clean_body,
md_status="success",
md_extraction_engine=engine,
md_extraction_engine_version=engine_ver,
md_format_version="1.0",
md_generated_at=now,
md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(),
md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
content_origin="extracted",
source_channel="crawl",
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain="Safety",
ai_sub_group=_SOURCE_NAME,
ai_tags=["Safety/CSB"],
# 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로)
material_type="incident",
jurisdiction="US",
published_date=lastmod.date() if lastmod else None,
extract_meta={
"source_id": source.id,
"source_name": _SOURCE_NAME,
"published_at": lastmod.isoformat(),
"license": {"scheme": "public_domain", "redistribute": True,
"attribution": "U.S. Chemical Safety Board"},
"fulltext": {
"status": "csb_sitemap",
"engine": engine,
"final_url": final_url,
"raw_html_path": str(raw_path) if raw_saved else None,
"body_chars": len(clean_body),
"resolved_at": now.isoformat(),
},
},
)
doc.file_size = len(doc.extracted_text.encode())
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
counts["page"] = 1
logger.info(f"[csb] page ingest {len(clean_body)}자 ({engine}): {title[:60]}")
return counts
async def run(bulk: bool = False, limit: int = 0) -> None:
"""weekly 진입점 (스케줄러) — bulk/limit 은 CLI 전용."""
now = datetime.now(timezone.utc)
async with async_session() as session:
source = await _get_or_create_source(session)
await session.commit()
source_id = source.id
watermark = _watermark(source)
try:
xml_text, _ = await fetch_page(
_SITEMAP_URL, content_types=("text/xml", "application/xml", "text/html")
)
entries = _parse_sitemap(xml_text)
if not entries:
raise FeedError("sitemap 파싱 0건 — 포맷 변경/부패 의심")
except (CrawlBlocked, CrawlSkip, CrawlFetchError, FeedError) as e:
logger.error(f"[csb] sitemap 수집 실패: {e}")
async with async_session() as session:
health = await _get_or_create_health(session, source_id)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
return
changed = sorted(
(
(url, lastmod) for url, lastmod in entries
if not _should_skip(url) and (watermark is None or lastmod >= watermark)
),
key=lambda pair: pair[1],
)
if watermark is not None and len(changed) > _DIFF_SANITY:
logger.error(
f"[csb] diff {len(changed)}건 > sanity {_DIFF_SANITY}"
f"sitemap lastmod 남발/부패 의심 (cap 처리는 계속, 관찰 필요)"
)
cap = len(changed) if bulk else _RUN_PAGE_CAP
if limit:
cap = min(cap, limit)
todo, deferred = changed[:cap], max(len(changed) - cap, 0)
logger.info(
f"[csb] sitemap {len(entries)}건 중 변경 {len(changed)}건, 처리 {len(todo)}"
+ (f" (잔여 {deferred}건 — 워터마크 미전진으로 자동 이월)" if deferred else "")
)
totals = {"page": 0, "pdf": 0, "skip": 0}
for i, (url, lastmod) in enumerate(todo, 1):
# 2026-06-20 C2: URL 1건 실패가 주간 run 전체를 중단(이후 URL 스킵·watermark 정지)하던 것 차단.
# 각 iteration 은 자체 session(async with) 이라 실패 격리 — 건너뛰고 계속.
try:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
counts = await _ingest_url(session, src, url, lastmod)
_set_watermark(src, lastmod)
await session.commit()
except Exception as e:
logger.error(f"[csb] URL 처리 실패 (건너뜀): {url}{str(e) or repr(e)}")
continue
for k in totals:
totals[k] += counts[k]
if i % 10 == 0:
logger.info(f"[csb] 진행 {i}/{len(todo)} {totals}")
async with async_session() as session:
health = await _get_or_create_health(session, source_id)
_record_success(health, totals["page"] + totals["pdf"], False, now)
src = await session.get(NewsSource, source_id)
src.last_fetched_at = now
await session.commit()
logger.info(f"[csb] 완료: {totals} (변경 {len(changed)}건 중 {len(todo)}건 처리)")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CSB sitemap diff 수집")
parser.add_argument("--bulk", action="store_true", help="cap 해제 — 초기 일괄")
parser.add_argument("--limit", type=int, default=0, help="처리 상한 (검증용)")
args = parser.parse_args()
asyncio.run(run(bulk=args.bulk, limit=args.limit))