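#!/usr/bin/env python3
"""
Legislation monitoring script

- Polls the Korea National Law Information Center OpenAPI (open.law.go.kr)
- Tracks changes to the Occupational Safety and Health Act, the Serious
  Accidents Punishment Act, and related statutes
- On detecting a change, automatically imports the law into the DEVONthink
  database "04_Industrial Safety"

NOTE: API approval is still pending. Only the script has been written;
real calls will be made once approval is granted.
"""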
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import requests
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, llm_generate, PROJECT_ROOT, DATA_DIR
|
|
|
|
# Module-wide logger, obtained from the project's setup_logger helper under
# the name "law_monitor"; every function in this file logs through it.
logger = setup_logger("law_monitor")
|
|
|
|
# Laws to monitor. Each (name, law_id, category) row below is expanded into
# the dict shape the rest of the script reads:
# {"name": ..., "law_id": ..., "category": ...}.
MONITORED_LAWS = [
    {"name": name, "law_id": law_id, "category": category}
    for name, law_id, category in (
        ("산업안전보건법", "001789", "법률"),
        ("산업안전보건법 시행령", "001790", "대통령령"),
        ("산업안전보건법 시행규칙", "001791", "부령"),
        ("중대재해 처벌 등에 관한 법률", "019005", "법률"),
        ("중대재해 처벌 등에 관한 법률 시행령", "019006", "대통령령"),
        ("화학물질관리법", "012354", "법률"),
        ("위험물안전관리법", "001478", "법률"),
    )
]
|
|
|
|
# JSON state file: maps each law ID (and each foreign-source key such as
# "_us_osha_last") to the last date value recorded for it.
LAST_CHECK_FILE = DATA_DIR / "law_last_check.json"

# Directory that receives downloaded law XML and foreign-notice markdown.
LAWS_DIR = DATA_DIR / "laws"

# parents=True so that importing this module does not fail with
# FileNotFoundError when DATA_DIR itself has not been created yet.
LAWS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def load_last_check() -> dict:
    """Load the persisted last-check state from ``LAST_CHECK_FILE``.

    Returns:
        The stored mapping, or an empty dict when the file does not exist,
        cannot be read, is not valid JSON, or does not contain a JSON object.
    """
    if not LAST_CHECK_FILE.exists():
        return {}

    try:
        # Explicit UTF-8: the file is written with ensure_ascii=False, so the
        # locale default encoding must not be relied on.
        with open(LAST_CHECK_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        # A damaged state file should not stop the monitor from running.
        logger.error(f"마지막 확인 파일 읽기 실패, 빈 상태로 시작: {e}")
        return {}

    if not isinstance(data, dict):
        # Callers use .get() and item assignment, so anything else is unusable.
        logger.error(f"마지막 확인 파일 형식 오류 (dict 아님): {type(data).__name__}")
        return {}
    return data
|
|
|
|
|
|
def save_last_check(data: dict) -> None:
    """Persist the last-check state to ``LAST_CHECK_FILE`` as JSON.

    The data is written to a sibling ``.tmp`` file first and then moved over
    the real file with ``os.replace``, so an interrupted run leaves the
    previous state file intact instead of a truncated one.

    Args:
        data: Mapping to store; must be JSON-serialisable.
    """
    tmp_path = LAST_CHECK_FILE.with_name(LAST_CHECK_FILE.name + ".tmp")
    # Explicit UTF-8 because ensure_ascii=False may emit non-ASCII characters.
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, LAST_CHECK_FILE)
|
|
|
|
|
|
def fetch_law_info(law_oc: str, law_id: str) -> dict | None:
    """Look up one law's metadata through the law.go.kr ``lawSearch.do`` endpoint.

    Args:
        law_oc: Value sent as the ``OC`` query parameter (the API key).
        law_id: Value sent as the ``MST`` query parameter.

    Returns:
        The ``LawSearch.law`` entry of the JSON response (its first element
        when that entry is a list), or ``None`` when the API reports a
        failure, the response has no such entry, the list is empty, or any
        exception occurs. Every ``None`` path except the empty list is logged.
    """
    endpoint = "https://www.law.go.kr/DRF/lawSearch.do"
    query = dict(OC=law_oc, target="law", type="JSON", MST=law_id)

    try:
        response = requests.get(endpoint, params=query, timeout=30)
        response.raise_for_status()
        payload = response.json()

        # The API signals errors inside the body: a "result" field whose
        # text contains "실패" (failure).
        if "result" in payload and "실패" in str(payload.get("result", "")):
            logger.error(f"법령 API 에러 [{law_id}]: {payload.get('result')} — {payload.get('msg')}")
            return None

        if not ("LawSearch" in payload and "law" in payload["LawSearch"]):
            logger.warning(f"법령 응답에 데이터 없음 [{law_id}]: {list(payload.keys())}")
            return None

        entry = payload["LawSearch"]["law"]
        if not isinstance(entry, list):
            # A single hit is returned as-is.
            return entry
        if not entry:
            return None
        return entry[0]
    except Exception as err:
        logger.error(f"법령 조회 실패 [{law_id}]: {err}")
        return None
|
|
|
|
|
|
def fetch_law_text(law_oc: str, law_mst: str) -> str | None:
    """Download the full text of a law as XML from ``lawService.do``.

    Args:
        law_oc: Value sent as the ``OC`` query parameter (the API key).
        law_mst: Value sent as the ``MST`` query parameter.

    Returns:
        The decoded response body, or ``None`` (after logging) when the
        request fails or returns an HTTP error status.
    """
    url = "https://www.law.go.kr/DRF/lawService.do"
    params = {
        "OC": law_oc,
        "target": "law",
        "type": "XML",
        "MST": law_mst,
    }
    try:
        resp = requests.get(url, params=params, timeout=60)
        resp.raise_for_status()

        # requests assumes ISO-8859-1 for text/* responses whose Content-Type
        # carries no charset, which would garble Korean text before it is
        # saved as UTF-8. When the server declares no charset, detect the
        # encoding from the body instead.
        content_type = resp.headers.get("Content-Type", "")
        if "charset" not in content_type.lower():
            resp.encoding = resp.apparent_encoding or "utf-8"

        return resp.text
    except Exception as e:
        logger.error(f"법령 본문 다운로드 실패 [{law_mst}]: {e}")
        return None
|
|
|
|
|
|
def save_law_file(law_name: str, content: str) -> Path:
    """Write a law's XML text into ``LAWS_DIR`` and return the file path.

    The file is named ``<law name>_<YYYYMMDD>.xml`` using today's date, with
    spaces and slashes in the law name replaced by underscores. An existing
    file of the same name is overwritten.

    Args:
        law_name: Human-readable law name used for the file name.
        content: XML text to write (encoded as UTF-8).

    Returns:
        Path of the written file.
    """
    stamp = datetime.now().strftime("%Y%m%d")
    stem = law_name.translate(str.maketrans({" ": "_", "/": "_"}))
    target = LAWS_DIR / f"{stem}_{stamp}.xml"
    target.write_text(content, encoding="utf-8")
    logger.info(f"법령 저장: {target}")
    return target
|
|
|
|
|
|
def import_to_devonthink(filepath: Path, law_name: str, category: str):
    """Import a saved law file into the DEVONthink database "04_Industrial safety".

    Builds an AppleScript that walks the open databases, and for the one
    named "04_Industrial safety" calls ``create location`` for
    ``/10_Legislation/Law``, imports the file there, sets three tags (the
    last one being ``category``) and fills the custom metadata fields
    ``sourceChannel``, ``dataOrigin`` and ``lastAIProcess``.

    Any exception raised by ``run_applescript_inline`` is logged, not raised.

    NOTE(review): if no database has that name the script imports nothing,
    yet success is still logged here; confirm whether that is acceptable.

    Args:
        filepath: File to import.
        law_name: Law name, used only in log messages.
        category: Law category, added as a tag on the imported record.
    """

    def _as_literal(value: str) -> str:
        # Escape for an AppleScript double-quoted string so a quote or
        # backslash in the value cannot terminate the literal early.
        return value.replace("\\", "\\\\").replace('"', '\\"')

    fp = _as_literal(str(filepath))
    tag = _as_literal(category)

    # The path is bound to an AppleScript variable first and referenced by
    # name afterwards (the original "variable approach").
    script = "\n".join([
        f'set fp to "{fp}"',
        'tell application id "DNtp"',
        ' repeat with db in databases',
        ' if name of db is "04_Industrial safety" then',
        ' set targetGroup to create location "/10_Legislation/Law" in db',
        ' set theRecord to import fp to targetGroup',
        f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{tag}"}}',
        ' add custom meta data "law_monitor" for "sourceChannel" to theRecord',
        ' add custom meta data "external" for "dataOrigin" to theRecord',
        ' add custom meta data (current date) for "lastAIProcess" to theRecord',
        ' exit repeat',
        ' end if',
        ' end repeat',
        'end tell',
    ])

    try:
        run_applescript_inline(script)
        logger.info(f"DEVONthink 임포트 완료: {law_name}")
    except Exception as e:
        logger.error(f"DEVONthink 임포트 실패 [{law_name}]: {e}")
|
|
|
|
|
|
def run():
    """Main entry point: check every monitored law, then the foreign sources.

    For each entry in ``MONITORED_LAWS`` the law metadata is fetched and its
    promulgation date (falling back to the enforcement date) is compared with
    the value stored in the last-check state. When it differs, the law text
    is downloaded, saved, and imported into DEVONthink. The state is saved
    after the Korean laws and again after the foreign sources.

    Exits the process with status 1 when the ``LAW_OC`` credential is missing.
    """
    logger.info("=== 법령 모니터링 시작 ===")

    creds = load_credentials()
    law_oc = creds.get("LAW_OC")
    if not law_oc:
        logger.error("LAW_OC 인증키가 설정되지 않았습니다. credentials.env를 확인하세요.")
        sys.exit(1)

    last_check = load_last_check()
    changes_found = 0

    for law in MONITORED_LAWS:
        law_name = law["name"]
        law_id = law["law_id"]
        category = law["category"]

        logger.info(f"확인 중: {law_name} ({law_id})")

        info = fetch_law_info(law_oc, law_id)
        if not info:
            continue

        # Detect changes by promulgation date; `or` (rather than a .get
        # default) also falls back to the enforcement date when the
        # promulgation date is present but empty.
        announce_date = info.get("공포일자") or info.get("시행일자", "")
        prev_date = last_check.get(law_id, "")

        if not announce_date or announce_date == prev_date:
            logger.debug(f"변경 없음: {law_name}")
            continue

        logger.info(f"변경 감지: {law_name} — 공포일자 {announce_date} (이전: {prev_date or '없음'})")

        # Download the law text.
        law_mst = info.get("법령MST", law_id)
        text = fetch_law_text(law_oc, law_mst)
        if not text:
            # Leave the stored date untouched so the download is retried on
            # the next run, instead of marking a change that was never
            # imported as already handled.
            logger.warning(f"본문 다운로드 실패 — 다음 실행에서 재시도: {law_name}")
            continue

        filepath = save_law_file(law_name, text)
        import_to_devonthink(filepath, law_name, category)
        changes_found += 1
        last_check[law_id] = announce_date

    # Checkpoint the Korean results before the foreign sources run.
    save_last_check(last_check)

    # Foreign legislation: each fetcher checks its own run interval and
    # records its last-run date in last_check.
    us_count = fetch_us_osha(last_check)
    jp_count = fetch_jp_mhlw(last_check)
    eu_count = fetch_eu_osha(last_check)
    changes_found += us_count + jp_count + eu_count

    save_last_check(last_check)
    logger.info(f"=== 법령 모니터링 완료 — {changes_found}건 변경 감지 (한국+외국) ===")
|
|
|
|
|
|
# ═══════════════════════════════════════════════
# Foreign legislation monitoring
# ═══════════════════════════════════════════════
|
|
|
|
def _should_run(last_check: dict, key: str, interval_days: int) -> bool:
|
|
"""빈도 체크: 마지막 실행일로부터 interval_days 경과 여부"""
|
|
last_run = last_check.get(key, "")
|
|
if not last_run:
|
|
return True
|
|
try:
|
|
last_date = datetime.strptime(last_run, "%Y-%m-%d")
|
|
return (datetime.now() - last_date).days >= interval_days
|
|
except ValueError:
|
|
return True
|
|
|
|
|
|
def _import_foreign_to_devonthink(filepath: Path, title: str, country: str):
    """Import a foreign-legislation file into DEVONthink under its country folder.

    Builds an AppleScript that, for the database named
    "04_Industrial safety", calls ``create location`` for
    ``/10_Legislation/Foreign/<country>``, imports the file there, sets three
    tags (the last one being ``country``) and fills the custom metadata
    fields ``sourceChannel``, ``dataOrigin`` and ``lastAIProcess``.

    Any exception raised by ``run_applescript_inline`` is logged, not raised.

    Args:
        filepath: File to import.
        title: Document title; only its first 40 characters are logged.
        country: Country code (callers in this file pass "US", "JP" or "EU");
            used both as the folder name and as a tag.
    """

    def _as_literal(value: str) -> str:
        # Escape for an AppleScript double-quoted string so a quote or
        # backslash in the value cannot terminate the literal early.
        return value.replace("\\", "\\\\").replace('"', '\\"')

    fp = _as_literal(str(filepath))
    code = _as_literal(country)

    # The path is bound to an AppleScript variable first; the original note
    # says this avoids quoting problems with POSIX paths.
    script = "\n".join([
        f'set fp to "{fp}"',
        'tell application id "DNtp"',
        ' repeat with db in databases',
        ' if name of db is "04_Industrial safety" then',
        f' set targetGroup to create location "/10_Legislation/Foreign/{code}" in db',
        ' set theRecord to import fp to targetGroup',
        f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{code}"}}',
        ' add custom meta data "law_monitor" for "sourceChannel" to theRecord',
        ' add custom meta data "external" for "dataOrigin" to theRecord',
        ' add custom meta data (current date) for "lastAIProcess" to theRecord',
        ' exit repeat',
        ' end if',
        ' end repeat',
        'end tell',
    ])

    try:
        run_applescript_inline(script)
        # Keep the log entry on one short line.
        safe_title = title[:40].replace('\n', ' ')
        logger.info(f"DEVONthink 임포트 [{country}]: {safe_title}")
    except Exception as e:
        logger.error(f"DEVONthink 임포트 실패 [{country}]: {e}")
|
|
|
|
|
|
def fetch_us_osha(last_check: dict) -> int:
    """Fetch recent US OSHA documents from the Federal Register API (weekly).

    Skipped unless ``_should_run`` reports that 7 days have passed since the
    date stored under ``"_us_osha_last"``. Requests up to 10 of the newest
    OSHA documents published in the last 7 days, writes each as a markdown
    file in ``LAWS_DIR`` and imports it into DEVONthink.

    Args:
        last_check: State mapping; ``"_us_osha_last"`` is set to today's date
            in place after a successful pass.

    Returns:
        Number of documents imported, or 0 when skipped or when any error
        occurs (the error is logged with a traceback).
    """
    if not _should_run(last_check, "_us_osha_last", 7):
        logger.debug("US OSHA: 이번 주 이미 실행됨, 건너뜀")
        return 0

    logger.info("=== US OSHA 확인 ===")
    try:
        from_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
        resp = requests.get("https://www.federalregister.gov/api/v1/documents.json", params={
            "conditions[agencies][]": "occupational-safety-and-health-administration",
            "conditions[publication_date][gte]": from_date,
            "per_page": 10,
            "order": "newest",
        }, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        # `or` rather than a .get default: a JSON null arrives as None, which
        # a default does not replace and which would break the loop below.
        results = data.get("results") or []
        count = 0

        for doc in results:
            doc_id = doc.get("document_number") or ""
            title = doc.get("title") or ""
            pub_date = doc.get("publication_date") or ""
            abstract = doc.get("abstract") or ""
            doc_url = doc.get("html_url") or ""

            # Save as markdown.
            content = f"# {title}\n\n"
            content += f"- **Document**: {doc_id}\n"
            content += f"- **Date**: {pub_date}\n"
            content += f"- **URL**: {doc_url}\n\n"
            if abstract:
                content += f"## Abstract\n\n{abstract}\n"

            safe_title = "".join(c if c.isalnum() or c in " _-" else "_" for c in title)[:50]
            # The date comes from the remote API and ends up in a file name,
            # so strip anything that is not a digit, letter or dash.
            safe_date = "".join(c if c.isalnum() or c == "-" else "_" for c in pub_date)
            filepath = LAWS_DIR / f"US_OSHA_{safe_date}_{safe_title}.md"
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(content)

            _import_foreign_to_devonthink(filepath, title, "US")
            count += 1

        last_check["_us_osha_last"] = datetime.now().strftime("%Y-%m-%d")
        logger.info(f"US OSHA: {count}건")
        return count

    except Exception as e:
        logger.error(f"US OSHA 에러: {e}", exc_info=True)
        return 0
|
|
|
|
|
|
def fetch_jp_mhlw(last_check: dict) -> int:
    """Fetch safety-related news from the Japanese MHLW RSS feed (weekly).

    Skipped unless ``_should_run`` reports that 7 days have passed since the
    date stored under ``"_jp_mhlw_last"``. Items whose title contains one of
    the safety keywords are translated to Korean with ``llm_generate``,
    written as markdown into ``LAWS_DIR`` and imported into DEVONthink. At
    most 10 items are imported per run.

    Args:
        last_check: State mapping; ``"_jp_mhlw_last"`` is set to today's date
            in place after a successful pass.

    Returns:
        Number of items imported, or 0 when skipped or when any error occurs
        (the error is logged with a traceback).
    """
    if not _should_run(last_check, "_jp_mhlw_last", 7):
        logger.debug("JP 厚労省: 이번 주 이미 실행됨, 건너뜀")
        return 0

    logger.info("=== JP 厚生労働省 확인 ===")
    try:
        resp = requests.get("https://www.mhlw.go.jp/stf/news.rdf", timeout=30)
        resp.raise_for_status()
        root = ET.fromstring(resp.content)

        safety_keywords = ["労働安全", "安全衛生", "労災", "化学物質", "石綿", "安全管理", "労働", "安全", "衛生"]
        rss_ns = "http://purl.org/rss/1.0/"
        dc_ns = "http://purl.org/dc/elements/1.1/"
        count = 0

        # RDF / RSS 1.0 layout: items are {http://purl.org/rss/1.0/}item
        # children of the root element.
        items = root.findall(f"{{{rss_ns}}}item")
        logger.info(f"JP RSS 항목: {len(items)}건")
        for item in items:
            title = item.findtext(f"{{{rss_ns}}}title", "")
            link = item.findtext(f"{{{rss_ns}}}link", "")
            # RSS 1.0 carries the item date in the Dublin Core dc:date
            # element; un-namespaced pubDate is the RSS 2.0 name and is kept
            # only as a fallback.
            pub_date = item.findtext(f"{{{dc_ns}}}date") or item.findtext("pubDate", "")

            # Keep only items whose title mentions a safety/health keyword.
            if not any(kw in title for kw in safety_keywords):
                continue

            # Translate the title to Korean (the original note says the model
            # behind llm_generate is an MLX 35B model).
            try:
                raw = llm_generate(
                    f"다음 일본어 제목을 한국어로 번역해줘. 번역만 출력하고 다른 말은 하지 마.\n\n{title}"
                )
                # Drop any "thinking" output: use only the last non-empty line.
                lines = [line.strip() for line in raw.strip().split("\n") if line.strip()]
                translated = lines[-1] if lines else title
            except Exception as e:
                # Best effort: fall back to the original title, but leave a trace.
                logger.warning(f"JP 제목 번역 실패, 원문 사용: {e}")
                translated = title

            content = f"# {title}\n\n"
            content += f"**한국어**: {translated}\n\n"
            content += f"- **URL**: {link}\n"
            content += f"- **Date**: {pub_date}\n"

            safe_title = "".join(c if c.isalnum() or c in " _-" else "_" for c in title)[:40]
            today = datetime.now().strftime("%Y%m%d")
            filepath = LAWS_DIR / f"JP_{today}_{safe_title}.md"
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(content)

            _import_foreign_to_devonthink(filepath, f"{translated} ({title})", "JP")
            count += 1

            if count >= 10:
                break

        last_check["_jp_mhlw_last"] = datetime.now().strftime("%Y-%m-%d")
        logger.info(f"JP 厚労省: {count}건")
        return count

    except Exception as e:
        logger.error(f"JP 厚労省 에러: {e}", exc_info=True)
        return 0
|
|
|
|
|
|
def fetch_eu_osha(last_check: dict) -> int:
    """Fetch the latest EU-OSHA RSS items (monthly).

    Skipped unless ``_should_run`` reports that 30 days have passed since the
    date stored under ``"_eu_osha_last"``. Each ``item`` element of the feed
    is written as a markdown file in ``LAWS_DIR`` and imported into
    DEVONthink; processing stops after 5 items.

    Args:
        last_check: State mapping; ``"_eu_osha_last"`` is set to today's date
            in place after a successful pass.

    Returns:
        Number of items imported, or 0 when skipped or when any error occurs
        (the error is logged with a traceback).
    """
    if not _should_run(last_check, "_eu_osha_last", 30):
        logger.debug("EU-OSHA: 이번 달 이미 실행됨, 건너뜀")
        return 0

    logger.info("=== EU-OSHA 확인 ===")
    try:
        response = requests.get("https://osha.europa.eu/en/rss.xml", timeout=30)
        response.raise_for_status()
        feed = ET.fromstring(response.content)

        imported = 0
        for entry in feed.iter("item"):
            heading = entry.findtext("title", "")
            url = entry.findtext("link", "")
            summary = entry.findtext("description", "")
            published = entry.findtext("pubDate", "")

            sections = [
                f"# {heading}\n\n",
                f"- **URL**: {url}\n",
                f"- **Date**: {published}\n\n",
            ]
            if summary:
                sections.append(f"## Summary\n\n{summary}\n")
            body = "".join(sections)

            # 1-based sequence number: keeps file names unique within a run
            # and doubles as the fallback name for titles with no usable
            # characters.
            seq = imported + 1
            cleaned = "".join(ch for ch in heading if ch.isalnum() or ch in " _-")[:50].strip()
            if not cleaned:
                cleaned = f"item{seq}"
            stamp = datetime.now().strftime("%Y%m%d")
            out_path = LAWS_DIR / f"EU_{stamp}_{seq:02d}_{cleaned}.md"
            with open(out_path, "w", encoding="utf-8") as handle:
                handle.write(body)

            _import_foreign_to_devonthink(out_path, heading, "EU")
            imported += 1

            if imported >= 5:
                break

        last_check["_eu_osha_last"] = datetime.now().strftime("%Y-%m-%d")
        logger.info(f"EU-OSHA: {imported}건")
        return imported

    except Exception as err:
        logger.error(f"EU-OSHA 에러: {err}", exc_info=True)
        return 0
|
|
|
|
|
|
# Run the monitor only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    run()
|