Files
hyungi_document_server/app/workers/kosha_collector.py
T
hyungi 235aa648ad feat(safety): B-2 KOSHA 사망사고 속보 수집기 (callApiId=1040)
data.go.kr 15119137 활용신청 전파 완료 → news_api02/getNews_api02 라이브.
collect_fatal_accidents: arno dedup(kosha-fatal|{arno}) + material_type=incident/
jurisdiction=KR + license=kogl. contents=HTML → _clean_html, published_date =
arno 접두 8자리(YYYYMMDD 등록일, 2019~ 라이브 전수 동형 검증). 첨부 API·business
필드 없는 별 채널(1040). run() 일일 잡(06:40 KST) 튜플 합류 — 소스별 실패 격리 유지.
순수 헬퍼 _fatal_fields + fixture 테스트(tests/test_kosha_fatal.py).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 13:42:12 +09:00

489 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1).
4 API (2026-06-10/06-13 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json):
재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060
재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070
KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050
사망사고 속보: GET /B552468/news_api02/getNews_api02 callApiId=1040
daily 스케줄 1회 (main.py):
재해사례 = 최근 페이지만 diff (boardno dedup) — 사례 본문 Document(텍스트 네이티브)
+ 첨부 PDF/HWP 다운로드 → /documents/crawl_raw/kosha/{boardno}/ 저장
→ 파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용).
사망사고 = 최근 페이지만 diff (arno dedup) — 속보 본문 Document(HTML → _clean_html).
첨부 API 없음·business 필드 없음. 등록일 = arno 접두 8자리(YYYYMMDD).
GUIDE = 전체 레지스트리 메타 diff (1039건, 100/page = 11 call) → 신규/개정만,
일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6주) + 부하 평탄화.
cap 으로 미처리 잔량은 매회 로그 (silent cap 금지).
키: KOSHA_API_KEY (credentials.env) — 공공데이터포털 '인코딩' 키를 그대로 저장.
httpx params= 로 넘기면 % 가 재인코딩되므로 반드시 URL 문자열에 직접 결합.
개정 감지: GUIDE dedup 키 = 규정번호+공표일자 — 같은 번호의 새 공표일자 = 신규 문서로 적재.
"""
import asyncio
import hashlib
import os
import random
import re
from datetime import date, datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy import select
from core.config import settings
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.news_collector import (
FeedError,
_clean_html,
_get_or_create_health,
_record_failure,
_record_success,
)
logger = setup_logger("kosha_collector")
_BASE = "https://apis.data.go.kr/B552468"
_BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02"
_ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02"
_GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide"
_FATAL_EP = f"{_BASE}/news_api02/getNews_api02"
_CASE_SOURCE = "KOSHA 재해사례"
_GUIDE_SOURCE = "KOSHA GUIDE"
_FATAL_SOURCE = "KOSHA 사망사고"
_CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API)
_CASE_ROWS = 30
_FATAL_PAGES = 2 # 사망사고 속보 daily diff (30×2 = 최근 60건 — 등록일 역순)
_FATAL_ROWS = 30
_GUIDE_ROWS = 100
_GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25"))
_MAX_FILE_BYTES = 50 * 1024 * 1024
_DOWNLOAD_DELAY = (2.0, 5.0) # portal.kosha.or.kr 파일서버 — 연속 다운로드 간격
# 안전 자료실 A-2 — KOSHA 산출물 라이선스 (KOGL 유형 미확정 → 보수적 redistribute=False,
# 근거 확보 시 완화. 0-3 license 메타 deterministic 주입).
_KOSHA_LICENSE = {"scheme": "kogl", "redistribute": False, "attribution": "한국산업안전보건공단(KOSHA)"}
def _ymd_to_date(ymd: str | None) -> date | None:
"""'YYYYMMDD'/'YYYY-MM-DD' → date. 형식 불일치는 None (fail-quiet — 날짜는 보조 축)."""
digits = re.sub(r"\D", "", ymd or "")
if len(digits) != 8:
return None
try:
return date(int(digits[:4]), int(digits[4:6]), int(digits[6:8]))
except ValueError:
return None
def _api_key() -> str:
key = os.getenv("KOSHA_API_KEY", "")
if not key:
raise FeedError("KOSHA_API_KEY 미설정 — KOSHA 수집 불가")
return key
async def _api_get(url: str) -> dict:
"""공통 GET — 게이트웨이/제공자 이중 에러 체계 검사."""
async with httpx.AsyncClient(timeout=25) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"KOSHA API {resp.status_code} @ {url.split('?')[0]}")
try:
payload = resp.json()
except ValueError as e:
# 게이트웨이 에러는 XML/plain 으로 옴 (SERVICE_KEY_IS_NOT_REGISTERED 등)
raise FeedError(f"KOSHA API 비-JSON 응답: {resp.text[:120]}") from e
code = (payload.get("header") or {}).get("resultCode")
if code != "00":
raise FeedError(f"KOSHA API resultCode={code}: {(payload.get('header') or {}).get('resultMsg')}")
return payload
def _items(payload: dict) -> list[dict]:
"""body.items.item — 단건이면 dict 로 오는 data.go.kr 관행 방어."""
item = ((payload.get("body") or {}).get("items") or {}).get("item")
if item is None:
return []
return [item] if isinstance(item, dict) else list(item)
def _fatal_fields(item: dict) -> dict | None:
"""사망사고 item(arno/keyword/contents 3필드 고정) → Document 필드 매핑.
순수 함수(httpx/DB 불요 — fixture 단위 테스트 대상). 필수 = arno+keyword,
부재 시 None(skip). 날짜 전용 필드가 없어 등록 식별자 arno 접두에서 유도:
arno = 'YYYYMMDDHHMMSS' + 임의 6자 (2019~ 라이브 전수 동형 검증). 접두 8자리=KST
등록일 → published_date, 14자리=등록시각 → reg_dt(원문 그대로, tz 해석 미주장).
"""
arno = str(item.get("arno") or "").strip()
title = (item.get("keyword") or "").strip()
if not arno or not title:
return None
text = _clean_html(item.get("contents") or "", max_len=None)
reg_dt = arno[:14] if re.fullmatch(r"\d{14}", arno[:14]) else None
return {
"arno": arno,
"title": title,
"text": text,
"published_date": _ymd_to_date(arno[:8]),
"reg_dt": reg_dt,
}
def _safe_filename(name: str) -> str:
"""NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피)."""
name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip()
name = re.sub(r"\s+", " ", name)
return name[:140] or "unnamed"
async def _download(url: str, dest: Path) -> int:
"""첨부/규정 파일 다운로드 — 크기 cap + 디렉토리 생성 + 연속 간격."""
await asyncio.sleep(random.uniform(*_DOWNLOAD_DELAY))
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"파일 다운로드 {resp.status_code}: {url}")
if len(resp.content) > _MAX_FILE_BYTES:
raise FeedError(f"파일 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
return len(resp.content)
async def _get_or_create_source(session, name: str, feed_url: str) -> NewsSource:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
if source is None:
source = NewsSource(
name=name, feed_url=feed_url, feed_type="rss", fetch_method="api",
fulltext_policy="none", source_channel="crawl", category="Safety",
language="ko", country="KR",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링
)
session.add(source)
await session.flush()
return source
async def _ingest_attachment(session, boardno: str, filenm: str, filepath: str) -> bool:
"""첨부 1건 → NAS 저장 + 파일 Document + extract enqueue. 반환 = 신규 여부."""
safe = _safe_filename(filenm)
rel_path = f"crawl_raw/kosha/{boardno}/{safe}"
existing = await session.execute(
select(Document).where(Document.file_path == rel_path).limit(1)
)
if existing.scalars().first():
return False
dest = Path(settings.nas_mount_path) / rel_path
size = await _download(filepath, dest)
ext = (safe.rsplit(".", 1)[-1].lower() if "." in safe else "bin")[:10]
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_format=ext,
file_size=size,
file_type="immutable",
title=safe.rsplit(".", 1)[0],
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=filepath,
ai_tags=["Safety/KOSHA재해사례/첨부"],
# 안전 자료실 A-2 — ingest 시점 deterministic (classify 경유해도 LLM 비의존)
material_type="incident",
jurisdiction="KR",
extract_meta={"kosha": {"boardno": boardno, "kind": "case_attachment"},
"license": dict(_KOSHA_LICENSE)},
)
session.add(doc)
await session.flush()
# extract → (crawl override) classify → embed/chunk — 기존 파일 파이프라인 재사용
await enqueue_stage(session, doc.id, "extract")
logger.info(f"[kosha] 첨부 ingest: {rel_path} ({size} bytes)")
return True
async def collect_disaster_cases(session) -> int:
"""재해사례 daily diff — 최근 _CASE_PAGES 페이지, boardno dedup."""
key = _api_key()
source = await _get_or_create_source(session, _CASE_SOURCE, _BOARD_EP)
new_count = 0
for page in range(1, _CASE_PAGES + 1):
payload = await _api_get(
f"{_BOARD_EP}?serviceKey={key}&callApiId=1060&pageNo={page}&numOfRows={_CASE_ROWS}"
)
items = _items(payload)
if not items:
break
page_all_dup = True
for item in items:
boardno = str(item.get("boardno") or "").strip()
title = (item.get("keyword") or "").strip()
if not boardno or not title:
continue
fhash = hashlib.sha256(f"kosha-case|{boardno}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if existing.scalars().first():
continue
page_all_dup = False
contents = (item.get("contents") or "").strip()
business = (item.get("business") or "").strip()
now = datetime.now(timezone.utc)
doc = Document(
file_path=f"crawl/{_CASE_SOURCE}/{boardno}",
file_hash=fhash,
file_format="article",
file_size=len(contents.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n[{business}]\n{contents}",
extracted_at=now,
extractor_version="kosha_api",
md_status="skipped",
md_extraction_error="kosha case: 텍스트 네이티브, markdown 변환 비대상",
source_channel="crawl",
data_origin="external",
review_status="approved",
ai_domain="Safety",
ai_sub_group=_CASE_SOURCE,
ai_tags=[f"Safety/KOSHA재해사례/{business or '기타'}"],
# 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로)
material_type="incident",
jurisdiction="KR",
extract_meta={
"source_id": source.id,
"source_name": _CASE_SOURCE,
"published_at": None,
"kosha": {"boardno": boardno, "business": business,
"atcflcnt": item.get("atcflcnt")},
"license": dict(_KOSHA_LICENSE),
},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
# 첨부 (PDF/HWP) — 본문보다 정보량 큰 정식 사례 보고서
if int(item.get("atcflcnt") or 0) > 0:
attach = await _api_get(
f"{_ATTACH_EP}?serviceKey={key}&callApiId=1070"
f"&pageNo=1&numOfRows=10&boardno={boardno}"
)
for att in _items(attach):
filenm = (att.get("filenm") or "").strip()
filepath = (att.get("filepath") or "").strip()
if not filenm or not filepath.startswith("https://"):
continue
try:
await _ingest_attachment(session, boardno, filenm, filepath)
except FeedError as e:
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
logger.info(f"[kosha] 재해사례 신규 {new_count}")
return new_count
async def collect_fatal_accidents(session) -> int:
"""사망사고 속보 daily diff — 최근 _FATAL_PAGES 페이지, arno dedup.
재해사례(1060)와 별 채널(1040): business 필드·첨부 API 없음, contents=HTML.
본문 = 텍스트 네이티브(_clean_html) → md 변환 비대상, summarize/embed/chunk 큐.
"""
key = _api_key()
source = await _get_or_create_source(session, _FATAL_SOURCE, _FATAL_EP)
new_count = 0
for page in range(1, _FATAL_PAGES + 1):
payload = await _api_get(
f"{_FATAL_EP}?serviceKey={key}&callApiId=1040&pageNo={page}&numOfRows={_FATAL_ROWS}"
)
items = _items(payload)
if not items:
break
page_all_dup = True
for item in items:
fields = _fatal_fields(item)
if fields is None:
continue
arno = fields["arno"]
fhash = hashlib.sha256(f"kosha-fatal|{arno}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if existing.scalars().first():
continue
page_all_dup = False
text = fields["text"]
now = datetime.now(timezone.utc)
doc = Document(
file_path=f"crawl/{_FATAL_SOURCE}/{arno}",
file_hash=fhash,
file_format="article",
file_size=len(text.encode()),
file_type="note",
title=fields["title"],
extracted_text=f"{fields['title']}\n\n{text}",
extracted_at=now,
extractor_version="kosha_api",
md_status="skipped",
md_extraction_error="kosha fatal: 텍스트 네이티브, markdown 변환 비대상",
source_channel="crawl",
data_origin="external",
review_status="approved",
ai_domain="Safety",
ai_sub_group=_FATAL_SOURCE,
ai_tags=["Safety/KOSHA사망사고"],
# 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로)
material_type="incident",
jurisdiction="KR",
published_date=fields["published_date"],
extract_meta={
"source_id": source.id,
"source_name": _FATAL_SOURCE,
"published_at": None,
"kosha": {"arno": arno, "kind": "fatal_accident",
"reg_dt": fields["reg_dt"]},
"license": dict(_KOSHA_LICENSE),
},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
logger.info(f"[kosha] 사망사고 신규 {new_count}")
return new_count
async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
"""GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필)."""
key = _api_key()
await _get_or_create_source(session, _GUIDE_SOURCE, _GUIDE_EP)
new_specs: list[dict] = []
page, total = 1, None
while True:
payload = await _api_get(
f"{_GUIDE_EP}?serviceKey={key}&callApiId=1050&pageNo={page}&numOfRows={_GUIDE_ROWS}"
)
if total is None:
total = int((payload.get("body") or {}).get("totalCount") or 0)
items = _items(payload)
if not items:
break
for item in items:
no = (item.get("techGdlnNo") or "").strip()
ymd = (item.get("techGdlnOfancYmd") or "").strip()
url = (item.get("fileDownloadUrl") or "").strip()
if not no or not url.startswith("https://"):
continue
fhash = hashlib.sha256(f"kosha-guide|{no}|{ymd}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if not existing.scalars().first():
new_specs.append({"no": no, "ymd": ymd, "url": url,
"name": (item.get("techGdlnNm") or no).strip(),
"fhash": fhash})
if page * _GUIDE_ROWS >= total:
break
page += 1
todo, deferred = new_specs[:cap], len(new_specs) - min(len(new_specs), cap)
ingested = 0
for spec in todo:
safe_no = _safe_filename(spec["no"])
rel_path = f"crawl_raw/kosha_guide/{safe_no}-{spec['ymd'] or 'nodate'}.pdf"
dest = Path(settings.nas_mount_path) / rel_path
try:
size = await _download(spec["url"], dest)
except FeedError as e:
logger.warning(f"[kosha] GUIDE 다운로드 실패 skip ({spec['no']}): {e}")
continue
doc = Document(
file_path=rel_path,
file_hash=spec["fhash"],
file_format="pdf",
file_size=size,
file_type="immutable",
title=f"{spec['name']} ({spec['no']})",
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=spec["url"],
ai_tags=["Safety/KOSHA GUIDE"],
# 안전 자료실 A-2 — GUIDE = 구속력 없는 권고 기술지침 (law 아님, plan 0-1)
material_type="guide",
jurisdiction="KR",
published_date=_ymd_to_date(spec["ymd"]),
extract_meta={"kosha": {"kind": "guide", "techGdlnNo": spec["no"],
"ofancYmd": spec["ymd"]},
"license": dict(_KOSHA_LICENSE)},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
ingested += 1
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
+ (f" (cap {cap}, 잔여 {deferred}건 — 일일 점진 백필)" if deferred > 0 else ""))
return ingested
async def run() -> None:
"""daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게)."""
now = datetime.now(timezone.utc)
for name, collector in ((_CASE_SOURCE, collect_disaster_cases),
(_FATAL_SOURCE, collect_fatal_accidents),
(_GUIDE_SOURCE, collect_kosha_guide)):
async with async_session() as session:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
try:
count = await collector(session)
if source is None: # 첫 실행에서 collector 가 생성
result = await session.execute(
select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
health = await _get_or_create_health(session, source.id)
_record_success(health, count, False, now)
await session.commit()
except Exception as e:
logger.error(f"[kosha] {name} 수집 실패: {e}")
await session.rollback() # 부분 적재 폐기 후 health 만 기록
if source is not None:
health = await _get_or_create_health(session, source.id)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
if __name__ == "__main__":
asyncio.run(run())