2e19dc3d37
kosha run() 이 소스별 단일 세션으로 collector 전체를 돌리고 예외 시 rollback → 페이지 _api_get 실패가 앞서 적재한 케이스/항목을 전부 폐기(부분 적재 손실 + 매번 같은 지점 실패 시 영구 미적재). disaster_cases/fatal_accidents/guide 의 케이스·항목 단위로 session.commit() 경계 추가(csb/api_standards idiom) — 실패 이전 적재분 보존, dedup 으로 다음 run 이 이어받음. 첨부 실패는 기존대로 격리(변경 없음). 검증: py_compile 통과. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
497 lines
21 KiB
Python
497 lines
21 KiB
Python
"""C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1).
|
||
|
||
4 API (2026-06-10/06-13 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json):
|
||
재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060
|
||
재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070
|
||
KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050
|
||
사망사고 속보: GET /B552468/news_api02/getNews_api02 callApiId=1040
|
||
|
||
daily 스케줄 1회 (main.py):
|
||
재해사례 = 최근 페이지만 diff (boardno dedup) — 사례 본문 Document(텍스트 네이티브)
|
||
+ 첨부 PDF/HWP 다운로드 → /documents/crawl_raw/kosha/{boardno}/ 저장
|
||
→ 파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용).
|
||
사망사고 = 최근 페이지만 diff (arno dedup) — 속보 본문 Document(HTML → _clean_html).
|
||
첨부 API 없음·business 필드 없음. 등록일 = arno 접두 8자리(YYYYMMDD).
|
||
GUIDE = 전체 레지스트리 메타 diff (1039건, 100/page = 11 call) → 신규/개정만,
|
||
일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6주) + 부하 평탄화.
|
||
cap 으로 미처리 잔량은 매회 로그 (silent cap 금지).
|
||
|
||
키: KOSHA_API_KEY (credentials.env) — 공공데이터포털 '인코딩' 키를 그대로 저장.
|
||
httpx params= 로 넘기면 % 가 재인코딩되므로 반드시 URL 문자열에 직접 결합.
|
||
개정 감지: GUIDE dedup 키 = 규정번호+공표일자 — 같은 번호의 새 공표일자 = 신규 문서로 적재.
|
||
"""
|
||
|
||
import asyncio
|
||
import hashlib
|
||
import os
|
||
import random
|
||
import re
|
||
from datetime import date, datetime, timezone
|
||
from pathlib import Path
|
||
|
||
import httpx
|
||
from sqlalchemy import select
|
||
|
||
from core.config import settings
|
||
from core.crawl_politeness import CRAWL_UA
|
||
from core.database import async_session
|
||
from core.utils import setup_logger
|
||
from models.document import Document
|
||
from models.news_source import NewsSource
|
||
from models.queue import enqueue_stage
|
||
from workers.news_collector import (
|
||
FeedError,
|
||
_clean_html,
|
||
_get_or_create_health,
|
||
_record_failure,
|
||
_record_success,
|
||
)
|
||
|
||
logger = setup_logger("kosha_collector")
|
||
|
||
_BASE = "https://apis.data.go.kr/B552468"
|
||
_BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02"
|
||
_ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02"
|
||
_GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide"
|
||
_FATAL_EP = f"{_BASE}/news_api02/getNews_api02"
|
||
|
||
_CASE_SOURCE = "KOSHA 재해사례"
|
||
_GUIDE_SOURCE = "KOSHA GUIDE"
|
||
_FATAL_SOURCE = "KOSHA 사망사고"
|
||
|
||
_CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API)
|
||
_CASE_ROWS = 30
|
||
_FATAL_PAGES = 2 # 사망사고 속보 daily diff (30×2 = 최근 60건 — 등록일 역순)
|
||
_FATAL_ROWS = 30
|
||
_GUIDE_ROWS = 100
|
||
_GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25"))
|
||
_MAX_FILE_BYTES = 50 * 1024 * 1024
|
||
_DOWNLOAD_DELAY = (2.0, 5.0) # portal.kosha.or.kr 파일서버 — 연속 다운로드 간격
|
||
|
||
# 안전 자료실 A-2 — KOSHA 산출물 라이선스 (KOGL 유형 미확정 → 보수적 redistribute=False,
|
||
# 근거 확보 시 완화. 0-3 license 메타 deterministic 주입).
|
||
_KOSHA_LICENSE = {"scheme": "kogl", "redistribute": False, "attribution": "한국산업안전보건공단(KOSHA)"}
|
||
|
||
|
||
def _ymd_to_date(ymd: str | None) -> date | None:
|
||
"""'YYYYMMDD'/'YYYY-MM-DD' → date. 형식 불일치는 None (fail-quiet — 날짜는 보조 축)."""
|
||
digits = re.sub(r"\D", "", ymd or "")
|
||
if len(digits) != 8:
|
||
return None
|
||
try:
|
||
return date(int(digits[:4]), int(digits[4:6]), int(digits[6:8]))
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _api_key() -> str:
|
||
key = os.getenv("KOSHA_API_KEY", "")
|
||
if not key:
|
||
raise FeedError("KOSHA_API_KEY 미설정 — KOSHA 수집 불가")
|
||
return key
|
||
|
||
|
||
async def _api_get(url: str) -> dict:
|
||
"""공통 GET — 게이트웨이/제공자 이중 에러 체계 검사."""
|
||
async with httpx.AsyncClient(timeout=25) as client:
|
||
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
|
||
if resp.status_code != 200:
|
||
raise FeedError(f"KOSHA API {resp.status_code} @ {url.split('?')[0]}")
|
||
try:
|
||
payload = resp.json()
|
||
except ValueError as e:
|
||
# 게이트웨이 에러는 XML/plain 으로 옴 (SERVICE_KEY_IS_NOT_REGISTERED 등)
|
||
raise FeedError(f"KOSHA API 비-JSON 응답: {resp.text[:120]}") from e
|
||
code = (payload.get("header") or {}).get("resultCode")
|
||
if code != "00":
|
||
raise FeedError(f"KOSHA API resultCode={code}: {(payload.get('header') or {}).get('resultMsg')}")
|
||
return payload
|
||
|
||
|
||
def _items(payload: dict) -> list[dict]:
|
||
"""body.items.item — 단건이면 dict 로 오는 data.go.kr 관행 방어."""
|
||
item = ((payload.get("body") or {}).get("items") or {}).get("item")
|
||
if item is None:
|
||
return []
|
||
return [item] if isinstance(item, dict) else list(item)
|
||
|
||
|
||
def _fatal_fields(item: dict) -> dict | None:
|
||
"""사망사고 item(arno/keyword/contents 3필드 고정) → Document 필드 매핑.
|
||
|
||
순수 함수(httpx/DB 불요 — fixture 단위 테스트 대상). 필수 = arno+keyword,
|
||
부재 시 None(skip). 날짜 전용 필드가 없어 등록 식별자 arno 접두에서 유도:
|
||
arno = 'YYYYMMDDHHMMSS' + 임의 6자 (2019~ 라이브 전수 동형 검증). 접두 8자리=KST
|
||
등록일 → published_date, 14자리=등록시각 → reg_dt(원문 그대로, tz 해석 미주장).
|
||
"""
|
||
arno = str(item.get("arno") or "").strip()
|
||
title = (item.get("keyword") or "").strip()
|
||
if not arno or not title:
|
||
return None
|
||
text = _clean_html(item.get("contents") or "", max_len=None)
|
||
reg_dt = arno[:14] if re.fullmatch(r"\d{14}", arno[:14]) else None
|
||
return {
|
||
"arno": arno,
|
||
"title": title,
|
||
"text": text,
|
||
"published_date": _ymd_to_date(arno[:8]),
|
||
"reg_dt": reg_dt,
|
||
}
|
||
|
||
|
||
def _safe_filename(name: str) -> str:
|
||
"""NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피)."""
|
||
name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip()
|
||
name = re.sub(r"\s+", " ", name)
|
||
return name[:140] or "unnamed"
|
||
|
||
|
||
async def _download(url: str, dest: Path) -> int:
|
||
"""첨부/규정 파일 다운로드 — 크기 cap + 디렉토리 생성 + 연속 간격."""
|
||
await asyncio.sleep(random.uniform(*_DOWNLOAD_DELAY))
|
||
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
|
||
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
|
||
if resp.status_code != 200:
|
||
raise FeedError(f"파일 다운로드 {resp.status_code}: {url}")
|
||
if len(resp.content) > _MAX_FILE_BYTES:
|
||
raise FeedError(f"파일 크기 초과 ({len(resp.content)} bytes): {url}")
|
||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||
dest.write_bytes(resp.content)
|
||
return len(resp.content)
|
||
|
||
|
||
async def _get_or_create_source(session, name: str, feed_url: str) -> NewsSource:
|
||
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
|
||
source = result.scalars().first()
|
||
if source is None:
|
||
source = NewsSource(
|
||
name=name, feed_url=feed_url, feed_type="rss", fetch_method="api",
|
||
fulltext_policy="none", source_channel="crawl", category="Safety",
|
||
language="ko", country="KR",
|
||
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링
|
||
)
|
||
session.add(source)
|
||
await session.flush()
|
||
return source
|
||
|
||
|
||
async def _ingest_attachment(session, boardno: str, filenm: str, filepath: str) -> bool:
|
||
"""첨부 1건 → NAS 저장 + 파일 Document + extract enqueue. 반환 = 신규 여부."""
|
||
safe = _safe_filename(filenm)
|
||
rel_path = f"crawl_raw/kosha/{boardno}/{safe}"
|
||
existing = await session.execute(
|
||
select(Document).where(Document.file_path == rel_path).limit(1)
|
||
)
|
||
if existing.scalars().first():
|
||
return False
|
||
|
||
dest = Path(settings.nas_mount_path) / rel_path
|
||
size = await _download(filepath, dest)
|
||
ext = (safe.rsplit(".", 1)[-1].lower() if "." in safe else "bin")[:10]
|
||
|
||
doc = Document(
|
||
file_path=rel_path,
|
||
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
|
||
file_format=ext,
|
||
file_size=size,
|
||
file_type="immutable",
|
||
title=safe.rsplit(".", 1)[0],
|
||
source_channel="crawl",
|
||
data_origin="external",
|
||
import_source="kosha_api",
|
||
edit_url=filepath,
|
||
ai_tags=["Safety/KOSHA재해사례/첨부"],
|
||
# 안전 자료실 A-2 — ingest 시점 deterministic (classify 경유해도 LLM 비의존)
|
||
material_type="incident",
|
||
jurisdiction="KR",
|
||
extract_meta={"kosha": {"boardno": boardno, "kind": "case_attachment"},
|
||
"license": dict(_KOSHA_LICENSE)},
|
||
)
|
||
session.add(doc)
|
||
await session.flush()
|
||
# extract → (crawl override) classify → embed/chunk — 기존 파일 파이프라인 재사용
|
||
await enqueue_stage(session, doc.id, "extract")
|
||
logger.info(f"[kosha] 첨부 ingest: {rel_path} ({size} bytes)")
|
||
return True
|
||
|
||
|
||
async def collect_disaster_cases(session) -> int:
|
||
"""재해사례 daily diff — 최근 _CASE_PAGES 페이지, boardno dedup."""
|
||
key = _api_key()
|
||
source = await _get_or_create_source(session, _CASE_SOURCE, _BOARD_EP)
|
||
new_count = 0
|
||
|
||
for page in range(1, _CASE_PAGES + 1):
|
||
payload = await _api_get(
|
||
f"{_BOARD_EP}?serviceKey={key}&callApiId=1060&pageNo={page}&numOfRows={_CASE_ROWS}"
|
||
)
|
||
items = _items(payload)
|
||
if not items:
|
||
break
|
||
page_all_dup = True
|
||
for item in items:
|
||
boardno = str(item.get("boardno") or "").strip()
|
||
title = (item.get("keyword") or "").strip()
|
||
if not boardno or not title:
|
||
continue
|
||
fhash = hashlib.sha256(f"kosha-case|{boardno}".encode()).hexdigest()[:32]
|
||
existing = await session.execute(
|
||
select(Document).where(Document.file_hash == fhash).limit(1)
|
||
)
|
||
if existing.scalars().first():
|
||
continue
|
||
page_all_dup = False
|
||
|
||
contents = (item.get("contents") or "").strip()
|
||
business = (item.get("business") or "").strip()
|
||
now = datetime.now(timezone.utc)
|
||
doc = Document(
|
||
file_path=f"crawl/{_CASE_SOURCE}/{boardno}",
|
||
file_hash=fhash,
|
||
file_format="article",
|
||
file_size=len(contents.encode()),
|
||
file_type="note",
|
||
title=title,
|
||
extracted_text=f"{title}\n\n[{business}]\n{contents}",
|
||
extracted_at=now,
|
||
extractor_version="kosha_api",
|
||
md_status="skipped",
|
||
md_extraction_error="kosha case: 텍스트 네이티브, markdown 변환 비대상",
|
||
source_channel="crawl",
|
||
data_origin="external",
|
||
review_status="approved",
|
||
ai_domain="Safety",
|
||
ai_sub_group=_CASE_SOURCE,
|
||
ai_tags=[f"Safety/KOSHA재해사례/{business or '기타'}"],
|
||
# 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로)
|
||
material_type="incident",
|
||
jurisdiction="KR",
|
||
extract_meta={
|
||
"source_id": source.id,
|
||
"source_name": _CASE_SOURCE,
|
||
"published_at": None,
|
||
"kosha": {"boardno": boardno, "business": business,
|
||
"atcflcnt": item.get("atcflcnt")},
|
||
"license": dict(_KOSHA_LICENSE),
|
||
},
|
||
)
|
||
session.add(doc)
|
||
await session.flush()
|
||
await enqueue_stage(session, doc.id, "summarize")
|
||
await enqueue_stage(session, doc.id, "embed")
|
||
await enqueue_stage(session, doc.id, "chunk")
|
||
new_count += 1
|
||
|
||
# 첨부 (PDF/HWP) — 본문보다 정보량 큰 정식 사례 보고서
|
||
if int(item.get("atcflcnt") or 0) > 0:
|
||
attach = await _api_get(
|
||
f"{_ATTACH_EP}?serviceKey={key}&callApiId=1070"
|
||
f"&pageNo=1&numOfRows=10&boardno={boardno}"
|
||
)
|
||
for att in _items(attach):
|
||
filenm = (att.get("filenm") or "").strip()
|
||
filepath = (att.get("filepath") or "").strip()
|
||
if not filenm or not filepath.startswith("https://"):
|
||
continue
|
||
try:
|
||
await _ingest_attachment(session, boardno, filenm, filepath)
|
||
except FeedError as e:
|
||
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
|
||
|
||
# 케이스 단위 commit (R4) — 이후 페이지/케이스의 _api_get 실패가 앞서 적재한
|
||
# 케이스까지 전체 rollback 하지 않게 부분 적재 보존 (csb/api_standards idiom).
|
||
await session.commit()
|
||
if page_all_dup:
|
||
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
|
||
|
||
logger.info(f"[kosha] 재해사례 신규 {new_count}건")
|
||
return new_count
|
||
|
||
|
||
async def collect_fatal_accidents(session) -> int:
|
||
"""사망사고 속보 daily diff — 최근 _FATAL_PAGES 페이지, arno dedup.
|
||
|
||
재해사례(1060)와 별 채널(1040): business 필드·첨부 API 없음, contents=HTML.
|
||
본문 = 텍스트 네이티브(_clean_html) → md 변환 비대상, summarize/embed/chunk 큐.
|
||
"""
|
||
key = _api_key()
|
||
source = await _get_or_create_source(session, _FATAL_SOURCE, _FATAL_EP)
|
||
new_count = 0
|
||
|
||
for page in range(1, _FATAL_PAGES + 1):
|
||
payload = await _api_get(
|
||
f"{_FATAL_EP}?serviceKey={key}&callApiId=1040&pageNo={page}&numOfRows={_FATAL_ROWS}"
|
||
)
|
||
items = _items(payload)
|
||
if not items:
|
||
break
|
||
page_all_dup = True
|
||
for item in items:
|
||
fields = _fatal_fields(item)
|
||
if fields is None:
|
||
continue
|
||
arno = fields["arno"]
|
||
fhash = hashlib.sha256(f"kosha-fatal|{arno}".encode()).hexdigest()[:32]
|
||
existing = await session.execute(
|
||
select(Document).where(Document.file_hash == fhash).limit(1)
|
||
)
|
||
if existing.scalars().first():
|
||
continue
|
||
page_all_dup = False
|
||
|
||
text = fields["text"]
|
||
now = datetime.now(timezone.utc)
|
||
doc = Document(
|
||
file_path=f"crawl/{_FATAL_SOURCE}/{arno}",
|
||
file_hash=fhash,
|
||
file_format="article",
|
||
file_size=len(text.encode()),
|
||
file_type="note",
|
||
title=fields["title"],
|
||
extracted_text=f"{fields['title']}\n\n{text}",
|
||
extracted_at=now,
|
||
extractor_version="kosha_api",
|
||
md_status="skipped",
|
||
md_extraction_error="kosha fatal: 텍스트 네이티브, markdown 변환 비대상",
|
||
source_channel="crawl",
|
||
data_origin="external",
|
||
review_status="approved",
|
||
ai_domain="Safety",
|
||
ai_sub_group=_FATAL_SOURCE,
|
||
ai_tags=["Safety/KOSHA사망사고"],
|
||
# 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로)
|
||
material_type="incident",
|
||
jurisdiction="KR",
|
||
published_date=fields["published_date"],
|
||
extract_meta={
|
||
"source_id": source.id,
|
||
"source_name": _FATAL_SOURCE,
|
||
"published_at": None,
|
||
"kosha": {"arno": arno, "kind": "fatal_accident",
|
||
"reg_dt": fields["reg_dt"]},
|
||
"license": dict(_KOSHA_LICENSE),
|
||
},
|
||
)
|
||
session.add(doc)
|
||
await session.flush()
|
||
await enqueue_stage(session, doc.id, "summarize")
|
||
await enqueue_stage(session, doc.id, "embed")
|
||
await enqueue_stage(session, doc.id, "chunk")
|
||
new_count += 1
|
||
# 케이스 단위 commit (R4) — 이후 페이지 실패가 앞 케이스 전체 rollback 방지.
|
||
await session.commit()
|
||
if page_all_dup:
|
||
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
|
||
|
||
logger.info(f"[kosha] 사망사고 신규 {new_count}건")
|
||
return new_count
|
||
|
||
|
||
async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
|
||
"""GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필)."""
|
||
key = _api_key()
|
||
await _get_or_create_source(session, _GUIDE_SOURCE, _GUIDE_EP)
|
||
new_specs: list[dict] = []
|
||
page, total = 1, None
|
||
|
||
while True:
|
||
payload = await _api_get(
|
||
f"{_GUIDE_EP}?serviceKey={key}&callApiId=1050&pageNo={page}&numOfRows={_GUIDE_ROWS}"
|
||
)
|
||
if total is None:
|
||
total = int((payload.get("body") or {}).get("totalCount") or 0)
|
||
items = _items(payload)
|
||
if not items:
|
||
break
|
||
for item in items:
|
||
no = (item.get("techGdlnNo") or "").strip()
|
||
ymd = (item.get("techGdlnOfancYmd") or "").strip()
|
||
url = (item.get("fileDownloadUrl") or "").strip()
|
||
if not no or not url.startswith("https://"):
|
||
continue
|
||
fhash = hashlib.sha256(f"kosha-guide|{no}|{ymd}".encode()).hexdigest()[:32]
|
||
existing = await session.execute(
|
||
select(Document).where(Document.file_hash == fhash).limit(1)
|
||
)
|
||
if not existing.scalars().first():
|
||
new_specs.append({"no": no, "ymd": ymd, "url": url,
|
||
"name": (item.get("techGdlnNm") or no).strip(),
|
||
"fhash": fhash})
|
||
if page * _GUIDE_ROWS >= total:
|
||
break
|
||
page += 1
|
||
|
||
todo, deferred = new_specs[:cap], len(new_specs) - min(len(new_specs), cap)
|
||
ingested = 0
|
||
for spec in todo:
|
||
safe_no = _safe_filename(spec["no"])
|
||
rel_path = f"crawl_raw/kosha_guide/{safe_no}-{spec['ymd'] or 'nodate'}.pdf"
|
||
dest = Path(settings.nas_mount_path) / rel_path
|
||
try:
|
||
size = await _download(spec["url"], dest)
|
||
except FeedError as e:
|
||
logger.warning(f"[kosha] GUIDE 다운로드 실패 skip ({spec['no']}): {e}")
|
||
continue
|
||
doc = Document(
|
||
file_path=rel_path,
|
||
file_hash=spec["fhash"],
|
||
file_format="pdf",
|
||
file_size=size,
|
||
file_type="immutable",
|
||
title=f"{spec['name']} ({spec['no']})",
|
||
source_channel="crawl",
|
||
data_origin="external",
|
||
import_source="kosha_api",
|
||
edit_url=spec["url"],
|
||
ai_tags=["Safety/KOSHA GUIDE"],
|
||
# 안전 자료실 A-2 — GUIDE = 구속력 없는 권고 기술지침 (law 아님, plan 0-1)
|
||
material_type="guide",
|
||
jurisdiction="KR",
|
||
published_date=_ymd_to_date(spec["ymd"]),
|
||
extract_meta={"kosha": {"kind": "guide", "techGdlnNo": spec["no"],
|
||
"ofancYmd": spec["ymd"]},
|
||
"license": dict(_KOSHA_LICENSE)},
|
||
)
|
||
session.add(doc)
|
||
await session.flush()
|
||
await enqueue_stage(session, doc.id, "extract")
|
||
ingested += 1
|
||
# 항목 단위 commit (R4) — 다운로드 실패가 앞서 적재한 GUIDE 항목 전체 rollback 방지.
|
||
await session.commit()
|
||
|
||
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
|
||
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
|
||
+ (f" (cap {cap}, 잔여 {deferred}건 — 일일 점진 백필)" if deferred > 0 else ""))
|
||
return ingested
|
||
|
||
|
||
async def run() -> None:
|
||
"""daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게)."""
|
||
now = datetime.now(timezone.utc)
|
||
for name, collector in ((_CASE_SOURCE, collect_disaster_cases),
|
||
(_FATAL_SOURCE, collect_fatal_accidents),
|
||
(_GUIDE_SOURCE, collect_kosha_guide)):
|
||
async with async_session() as session:
|
||
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
|
||
source = result.scalars().first()
|
||
try:
|
||
count = await collector(session)
|
||
if source is None: # 첫 실행에서 collector 가 생성
|
||
result = await session.execute(
|
||
select(NewsSource).where(NewsSource.name == name))
|
||
source = result.scalars().first()
|
||
health = await _get_or_create_health(session, source.id)
|
||
_record_success(health, count, False, now)
|
||
await session.commit()
|
||
except Exception as e:
|
||
logger.error(f"[kosha] {name} 수집 실패: {e}")
|
||
await session.rollback() # 부분 적재 폐기 후 health 만 기록
|
||
if source is not None:
|
||
health = await _get_or_create_health(session, source.id)
|
||
_record_failure(health, str(e) or repr(e), now)
|
||
await session.commit()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(run())
|