Files
hyungi_document_server/app/workers/kosha_collector.py
T
hyungi 1842f27d89 feat(news): crawl-24x7 사이클 2 — B-2/B-3/C-1/C-2/C-3/C-5 (마이그 324-326)
- 채널 인지화: news_sources.source_channel(324, documents enum 재사용) →
  문서 생성 정체성(_doc_identity)·embed/chunk 30일 게이트(crawl=전량 색인)·
  extract 후속 override(crawl→classify, preview 스킵) 분기.
- B-2 Guardian Open Platform: API 디스패치(호스트 분기, 미지 호스트=명시 실패)
  + show-fields=bodyText 전문 어댑터. fixture live 박제 + call-shape 테스트.
- B-3 구독지: playwright-fetcher 격리 컨테이너(동시 1·요청당 브라우저·storage_state
  ro mount) + politeness 사람속도(30-60s) 브라우저 경로 + fulltext 인증 라우팅
  (내용 기반 probe 게이트·relogin_requested 소비=open-스킵보다 앞·본문 페이월 마커
  게이트) + source_health probe 컬럼(325) + 세션 박제 스크립트(맥북용).
- C-2 KOSHA: 3 API live 검증·fixture 박제(board/attach/guide) — 재해사례 daily diff
  +첨부 PDF/HWP→extract 파이프라인, GUIDE 일일 cap 점진 백필(silent cap 금지 로그).
  키는 URL 직결합(재인코딩 함정 회피). daily 06:40 KST.
- C-3 정적 코퍼스: National Board 86 + TWI job-knowledge 153 일괄 CLI(멱등·politeness
  ·crawl_raw 보존·fulltext_worker 승격 필드 규약 동일).
- C-1/C-5 시드(326): 전 URL live 검증 — UK HSE(feed-full)/안전신문/고용노동부 3종
  (rss/*.do)/OSHA/EU-OSHA(후보)/SEP/1000-Word(feed-full)/Doing Philosophy/Aeon/Psyche
  (skip-video quirk).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:08:18 +09:00

352 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1).
3 API (2026-06-10 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json):
재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060
재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070
KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050
daily 스케줄 1회 (main.py):
재해사례 = 최근 페이지만 diff (boardno dedup) — 사례 본문 Document(텍스트 네이티브)
+ 첨부 PDF/HWP 다운로드 → /documents/crawl_raw/kosha/{boardno}/ 저장
→ 파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용).
GUIDE = 전체 레지스트리 메타 diff (1039건, 100/page = 11 call) → 신규/개정만,
일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6주) + 부하 평탄화.
cap 으로 미처리 잔량은 매회 로그 (silent cap 금지).
키: KOSHA_API_KEY (credentials.env) — 공공데이터포털 '인코딩' 키를 그대로 저장.
httpx params= 로 넘기면 % 가 재인코딩되므로 반드시 URL 문자열에 직접 결합.
개정 감지: GUIDE dedup 키 = 규정번호+공표일자 — 같은 번호의 새 공표일자 = 신규 문서로 적재.
"""
import asyncio
import hashlib
import os
import random
import re
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy import select
from core.config import settings
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.news_collector import (
FeedError,
_get_or_create_health,
_record_failure,
_record_success,
)
logger = setup_logger("kosha_collector")
_BASE = "https://apis.data.go.kr/B552468"
_BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02"
_ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02"
_GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide"
_CASE_SOURCE = "KOSHA 재해사례"
_GUIDE_SOURCE = "KOSHA GUIDE"
_CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API)
_CASE_ROWS = 30
_GUIDE_ROWS = 100
_GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25"))
_MAX_FILE_BYTES = 50 * 1024 * 1024
_DOWNLOAD_DELAY = (2.0, 5.0) # portal.kosha.or.kr 파일서버 — 연속 다운로드 간격
def _api_key() -> str:
key = os.getenv("KOSHA_API_KEY", "")
if not key:
raise FeedError("KOSHA_API_KEY 미설정 — KOSHA 수집 불가")
return key
async def _api_get(url: str) -> dict:
"""공통 GET — 게이트웨이/제공자 이중 에러 체계 검사."""
async with httpx.AsyncClient(timeout=25) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"KOSHA API {resp.status_code} @ {url.split('?')[0]}")
try:
payload = resp.json()
except ValueError as e:
# 게이트웨이 에러는 XML/plain 으로 옴 (SERVICE_KEY_IS_NOT_REGISTERED 등)
raise FeedError(f"KOSHA API 비-JSON 응답: {resp.text[:120]}") from e
code = (payload.get("header") or {}).get("resultCode")
if code != "00":
raise FeedError(f"KOSHA API resultCode={code}: {(payload.get('header') or {}).get('resultMsg')}")
return payload
def _items(payload: dict) -> list[dict]:
"""body.items.item — 단건이면 dict 로 오는 data.go.kr 관행 방어."""
item = ((payload.get("body") or {}).get("items") or {}).get("item")
if item is None:
return []
return [item] if isinstance(item, dict) else list(item)
def _safe_filename(name: str) -> str:
"""NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피)."""
name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip()
name = re.sub(r"\s+", " ", name)
return name[:140] or "unnamed"
async def _download(url: str, dest: Path) -> int:
"""첨부/규정 파일 다운로드 — 크기 cap + 디렉토리 생성 + 연속 간격."""
await asyncio.sleep(random.uniform(*_DOWNLOAD_DELAY))
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"파일 다운로드 {resp.status_code}: {url}")
if len(resp.content) > _MAX_FILE_BYTES:
raise FeedError(f"파일 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
return len(resp.content)
async def _get_or_create_source(session, name: str, feed_url: str) -> NewsSource:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
if source is None:
source = NewsSource(
name=name, feed_url=feed_url, feed_type="rss", fetch_method="api",
fulltext_policy="none", source_channel="crawl", category="Safety",
language="ko", country="KR",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링
)
session.add(source)
await session.flush()
return source
async def _ingest_attachment(session, boardno: str, filenm: str, filepath: str) -> bool:
"""첨부 1건 → NAS 저장 + 파일 Document + extract enqueue. 반환 = 신규 여부."""
safe = _safe_filename(filenm)
rel_path = f"crawl_raw/kosha/{boardno}/{safe}"
existing = await session.execute(
select(Document).where(Document.file_path == rel_path).limit(1)
)
if existing.scalars().first():
return False
dest = Path(settings.nas_mount_path) / rel_path
size = await _download(filepath, dest)
ext = (safe.rsplit(".", 1)[-1].lower() if "." in safe else "bin")[:10]
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_format=ext,
file_size=size,
file_type="immutable",
title=safe.rsplit(".", 1)[0],
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=filepath,
ai_tags=["Safety/KOSHA재해사례/첨부"],
extract_meta={"kosha": {"boardno": boardno, "kind": "case_attachment"}},
)
session.add(doc)
await session.flush()
# extract → (crawl override) classify → embed/chunk — 기존 파일 파이프라인 재사용
await enqueue_stage(session, doc.id, "extract")
logger.info(f"[kosha] 첨부 ingest: {rel_path} ({size} bytes)")
return True
async def collect_disaster_cases(session) -> int:
"""재해사례 daily diff — 최근 _CASE_PAGES 페이지, boardno dedup."""
key = _api_key()
source = await _get_or_create_source(session, _CASE_SOURCE, _BOARD_EP)
new_count = 0
for page in range(1, _CASE_PAGES + 1):
payload = await _api_get(
f"{_BOARD_EP}?serviceKey={key}&callApiId=1060&pageNo={page}&numOfRows={_CASE_ROWS}"
)
items = _items(payload)
if not items:
break
page_all_dup = True
for item in items:
boardno = str(item.get("boardno") or "").strip()
title = (item.get("keyword") or "").strip()
if not boardno or not title:
continue
fhash = hashlib.sha256(f"kosha-case|{boardno}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if existing.scalars().first():
continue
page_all_dup = False
contents = (item.get("contents") or "").strip()
business = (item.get("business") or "").strip()
now = datetime.now(timezone.utc)
doc = Document(
file_path=f"crawl/{_CASE_SOURCE}/{boardno}",
file_hash=fhash,
file_format="article",
file_size=len(contents.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n[{business}]\n{contents}",
extracted_at=now,
extractor_version="kosha_api",
md_status="skipped",
md_extraction_error="kosha case: 텍스트 네이티브, markdown 변환 비대상",
source_channel="crawl",
data_origin="external",
review_status="approved",
ai_domain="Safety",
ai_sub_group=_CASE_SOURCE,
ai_tags=[f"Safety/KOSHA재해사례/{business or '기타'}"],
extract_meta={
"source_id": source.id,
"source_name": _CASE_SOURCE,
"published_at": None,
"kosha": {"boardno": boardno, "business": business,
"atcflcnt": item.get("atcflcnt")},
},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
# 첨부 (PDF/HWP) — 본문보다 정보량 큰 정식 사례 보고서
if int(item.get("atcflcnt") or 0) > 0:
attach = await _api_get(
f"{_ATTACH_EP}?serviceKey={key}&callApiId=1070"
f"&pageNo=1&numOfRows=10&boardno={boardno}"
)
for att in _items(attach):
filenm = (att.get("filenm") or "").strip()
filepath = (att.get("filepath") or "").strip()
if not filenm or not filepath.startswith("https://"):
continue
try:
await _ingest_attachment(session, boardno, filenm, filepath)
except FeedError as e:
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
logger.info(f"[kosha] 재해사례 신규 {new_count}")
return new_count
async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
"""GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필)."""
key = _api_key()
await _get_or_create_source(session, _GUIDE_SOURCE, _GUIDE_EP)
new_specs: list[dict] = []
page, total = 1, None
while True:
payload = await _api_get(
f"{_GUIDE_EP}?serviceKey={key}&callApiId=1050&pageNo={page}&numOfRows={_GUIDE_ROWS}"
)
if total is None:
total = int((payload.get("body") or {}).get("totalCount") or 0)
items = _items(payload)
if not items:
break
for item in items:
no = (item.get("techGdlnNo") or "").strip()
ymd = (item.get("techGdlnOfancYmd") or "").strip()
url = (item.get("fileDownloadUrl") or "").strip()
if not no or not url.startswith("https://"):
continue
fhash = hashlib.sha256(f"kosha-guide|{no}|{ymd}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if not existing.scalars().first():
new_specs.append({"no": no, "ymd": ymd, "url": url,
"name": (item.get("techGdlnNm") or no).strip(),
"fhash": fhash})
if page * _GUIDE_ROWS >= total:
break
page += 1
todo, deferred = new_specs[:cap], len(new_specs) - min(len(new_specs), cap)
ingested = 0
for spec in todo:
safe_no = _safe_filename(spec["no"])
rel_path = f"crawl_raw/kosha_guide/{safe_no}-{spec['ymd'] or 'nodate'}.pdf"
dest = Path(settings.nas_mount_path) / rel_path
try:
size = await _download(spec["url"], dest)
except FeedError as e:
logger.warning(f"[kosha] GUIDE 다운로드 실패 skip ({spec['no']}): {e}")
continue
doc = Document(
file_path=rel_path,
file_hash=spec["fhash"],
file_format="pdf",
file_size=size,
file_type="immutable",
title=f"{spec['name']} ({spec['no']})",
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=spec["url"],
ai_tags=["Safety/KOSHA GUIDE"],
extract_meta={"kosha": {"kind": "guide", "techGdlnNo": spec["no"],
"ofancYmd": spec["ymd"]}},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
ingested += 1
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
+ (f" (cap {cap}, 잔여 {deferred}건 — 일일 점진 백필)" if deferred > 0 else ""))
return ingested
async def run() -> None:
"""daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게)."""
now = datetime.now(timezone.utc)
for name, collector in ((_CASE_SOURCE, collect_disaster_cases),
(_GUIDE_SOURCE, collect_kosha_guide)):
async with async_session() as session:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
try:
count = await collector(session)
if source is None: # 첫 실행에서 collector 가 생성
result = await session.execute(
select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
health = await _get_or_create_health(session, source.id)
_record_success(health, count, False, now)
await session.commit()
except Exception as e:
logger.error(f"[kosha] {name} 수집 실패: {e}")
await session.rollback() # 부분 적재 폐기 후 health 만 기록
if source is not None:
health = await _get_or_create_health(session, source.id)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
if __name__ == "__main__":
asyncio.run(run())