diff --git a/app/workers/kosha_collector.py b/app/workers/kosha_collector.py index 611d707..0ece796 100644 --- a/app/workers/kosha_collector.py +++ b/app/workers/kosha_collector.py @@ -1,14 +1,17 @@ """C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1). -3 API (2026-06-10 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json): +4 API (2026-06-10/06-13 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json): 재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060 재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070 KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050 + 사망사고 속보: GET /B552468/news_api02/getNews_api02 callApiId=1040 daily 스케줄 1회 (main.py): 재해사례 = 최근 페이지만 diff (boardno dedup) — 사례 본문 Document(텍스트 네이티브) + 첨부 PDF/HWP 다운로드 → /documents/crawl_raw/kosha/{boardno}/ 저장 → 파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용). + 사망사고 = 최근 페이지만 diff (arno dedup) — 속보 본문 Document(HTML → _clean_html). + 첨부 API 없음·business 필드 없음. 등록일 = arno 접두 8자리(YYYYMMDD). GUIDE = 전체 레지스트리 메타 diff (1039건, 100/page = 11 call) → 신규/개정만, 일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6주) + 부하 평탄화. cap 으로 미처리 잔량은 매회 로그 (silent cap 금지). @@ -38,6 +41,7 @@ from models.news_source import NewsSource from models.queue import enqueue_stage from workers.news_collector import ( FeedError, + _clean_html, _get_or_create_health, _record_failure, _record_success, @@ -49,12 +53,16 @@ _BASE = "https://apis.data.go.kr/B552468" _BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02" _ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02" _GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide" +_FATAL_EP = f"{_BASE}/news_api02/getNews_api02" _CASE_SOURCE = "KOSHA 재해사례" _GUIDE_SOURCE = "KOSHA GUIDE" +_FATAL_SOURCE = "KOSHA 사망사고" _CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API) _CASE_ROWS = 30 +_FATAL_PAGES = 2 # 사망사고 속보 daily diff (30×2 = 최근 60건 — 등록일 역순) +_FATAL_ROWS = 30 _GUIDE_ROWS = 100 _GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25")) _MAX_FILE_BYTES = 50 * 1024 * 1024 @@ -108,6 +116,29 @@ def _items(payload: dict) -> list[dict]: return [item] if isinstance(item, dict) else list(item) +def _fatal_fields(item: dict) -> dict | None: + """사망사고 item(arno/keyword/contents 3필드 고정) → Document 필드 매핑. + + 순수 함수(httpx/DB 불요 — fixture 단위 테스트 대상). 필수 = arno+keyword, + 부재 시 None(skip). 날짜 전용 필드가 없어 등록 식별자 arno 접두에서 유도: + arno = 'YYYYMMDDHHMMSS' + 임의 6자 (2019~ 라이브 전수 동형 검증). 접두 8자리=KST + 등록일 → published_date, 14자리=등록시각 → reg_dt(원문 그대로, tz 해석 미주장). + """ + arno = str(item.get("arno") or "").strip() + title = (item.get("keyword") or "").strip() + if not arno or not title: + return None + text = _clean_html(item.get("contents") or "", max_len=None) + reg_dt = arno[:14] if re.fullmatch(r"\d{14}", arno[:14]) else None + return { + "arno": arno, + "title": title, + "text": text, + "published_date": _ymd_to_date(arno[:8]), + "reg_dt": reg_dt, + } + + def _safe_filename(name: str) -> str: """NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피).""" name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip() @@ -273,6 +304,83 @@ async def collect_disaster_cases(session) -> int: return new_count +async def collect_fatal_accidents(session) -> int: + """사망사고 속보 daily diff — 최근 _FATAL_PAGES 페이지, arno dedup. + + 재해사례(1060)와 별 채널(1040): business 필드·첨부 API 없음, contents=HTML. + 본문 = 텍스트 네이티브(_clean_html) → md 변환 비대상, summarize/embed/chunk 큐. + """ + key = _api_key() + source = await _get_or_create_source(session, _FATAL_SOURCE, _FATAL_EP) + new_count = 0 + + for page in range(1, _FATAL_PAGES + 1): + payload = await _api_get( + f"{_FATAL_EP}?serviceKey={key}&callApiId=1040&pageNo={page}&numOfRows={_FATAL_ROWS}" + ) + items = _items(payload) + if not items: + break + page_all_dup = True + for item in items: + fields = _fatal_fields(item) + if fields is None: + continue + arno = fields["arno"] + fhash = hashlib.sha256(f"kosha-fatal|{arno}".encode()).hexdigest()[:32] + existing = await session.execute( + select(Document).where(Document.file_hash == fhash).limit(1) + ) + if existing.scalars().first(): + continue + page_all_dup = False + + text = fields["text"] + now = datetime.now(timezone.utc) + doc = Document( + file_path=f"crawl/{_FATAL_SOURCE}/{arno}", + file_hash=fhash, + file_format="article", + file_size=len(text.encode()), + file_type="note", + title=fields["title"], + extracted_text=f"{fields['title']}\n\n{text}", + extracted_at=now, + extractor_version="kosha_api", + md_status="skipped", + md_extraction_error="kosha fatal: 텍스트 네이티브, markdown 변환 비대상", + source_channel="crawl", + data_origin="external", + review_status="approved", + ai_domain="Safety", + ai_sub_group=_FATAL_SOURCE, + ai_tags=["Safety/KOSHA사망사고"], + # 안전 자료실 A-2 — ingest 시점 deterministic (classify-skip 경로) + material_type="incident", + jurisdiction="KR", + published_date=fields["published_date"], + extract_meta={ + "source_id": source.id, + "source_name": _FATAL_SOURCE, + "published_at": None, + "kosha": {"arno": arno, "kind": "fatal_accident", + "reg_dt": fields["reg_dt"]}, + "license": dict(_KOSHA_LICENSE), + }, + ) + session.add(doc) + await session.flush() + await enqueue_stage(session, doc.id, "summarize") + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + new_count += 1 + if page_all_dup: + break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존 + + logger.info(f"[kosha] 사망사고 신규 {new_count}건") + return new_count + + async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int: """GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필).""" key = _api_key() @@ -353,6 +461,7 @@ async def run() -> None: """daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게).""" now = datetime.now(timezone.utc) for name, collector in ((_CASE_SOURCE, collect_disaster_cases), + (_FATAL_SOURCE, collect_fatal_accidents), (_GUIDE_SOURCE, collect_kosha_guide)): async with async_session() as session: result = await session.execute(select(NewsSource).where(NewsSource.name == name)) diff --git a/tests/fixtures/kosha_fatal_response.json b/tests/fixtures/kosha_fatal_response.json new file mode 100644 index 0000000..3422528 --- /dev/null +++ b/tests/fixtures/kosha_fatal_response.json @@ -0,0 +1 @@ +{"header": {"resultCode": "00", "resultMsg": "NORMAL_CODE"}, "body": {"pageNo": 1, "totalCount": 2845, "numOfRows": 3, "items": {"item": [{"contents": "
2026. 6. 9. (화), 12:22경부산 사상구 소재 아파트에서
재해자가 2명이 실외기 설치 작업 중
베란다 난간이 파손되며 바닥으로 떨어짐
(사망 2명)
※ 위 내용은 신고 및 현재 파악된 내용으로 조사결과에 따라 변경될 수 있습니다.
2026. 6. 9. (화), 17:26경서울 관악구 철도 공사 현장에서
재해자가 수직형 케이블 거치대 설치 준비 작업 중
개구부로 떨어짐(사망 1명)
※ 위 내용은 신고 및 현재 파악된 내용으로 조사결과에 따라 변경될 수 있습니다.
2026. 5. 14. (목), 16:51경전남 광양시 소재 화학물질 제조사업장에서
재해자가 정제설비 내부에서 플랜지 해체 작업 중
고온 응축수가 쏟아져 화상을 입음(사망 1명)※ 위 내용은 신고 및 현재 파악된 내용으로 조사결과에 따라 변경될 수 있습니다.
본문
"}) + assert f is not None + assert f["published_date"] is None + assert f["reg_dt"] is None + assert f["text"] == "본문"