Files
hyungi_document_server/scripts/law_monitor.py
hyungi c79e26e822 fix: 법령 임포트 경로 수정 — /10_Legislation/Law/{법령명}
기존: /10_Legislation/{법령명} (Law 폴더 누락)
수정: /10_Legislation/Law/{법령명} (architecture 설계 구조와 일치)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 15:05:07 +09:00

531 lines
21 KiB
Python

#!/usr/bin/env python3
"""
법령 모니터링 스크립트
- 국가법령정보센터 OpenAPI (open.law.go.kr) 폴링
- 산업안전보건법, 중대재해처벌법 등 변경 추적
- 변경 감지 시 DEVONthink 04_Industrial Safety 자동 임포트
※ API 승인 대기중 — 스크립트만 작성, 실제 호출은 승인 후
"""
import os
import sys
import json
import requests
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, llm_generate, PROJECT_ROOT, DATA_DIR
from law_parser import (
parse_law_xml, save_law_as_markdown, build_article_chapter_map,
add_cross_law_links, lookup_current_mst, atomic_write_json,
)
logger = setup_logger("law_monitor")
MST_CACHE_FILE = DATA_DIR / "law_mst_cache.json"
MD_OUTPUT_DIR = DATA_DIR / "laws" / "md"
MD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# Tier 1 — 필수 모니터링 (업무 직접 관련, 매일 확인)
TIER1_LAWS = [
# 산업안전 핵심
{"name": "산업안전보건법", "category": "법률"},
{"name": "산업안전보건법 시행령", "category": "대통령령"},
{"name": "산업안전보건법 시행규칙", "category": "부령"},
{"name": "중대재해 처벌 등에 관한 법률", "category": "법률"},
{"name": "중대재해 처벌 등에 관한 법률 시행령", "category": "대통령령"},
# 화학/위험물
{"name": "화학물질관리법", "category": "법률"},
{"name": "위험물안전관리법", "category": "법률"},
{"name": "고압가스 안전관리법", "category": "법률"},
# 전기/소방/건설
{"name": "전기안전관리법", "category": "법률"},
{"name": "소방시설 설치 및 관리에 관한 법률", "category": "법률"},
{"name": "건설기술 진흥법", "category": "법률"},
# 시설물/노동
{"name": "시설물의 안전 및 유지관리에 관한 특별법", "category": "법률"},
{"name": "근로기준법", "category": "법률"},
{"name": "산업재해보상보험법", "category": "법률"},
{"name": "근로자참여 및 협력증진에 관한 법률", "category": "법률"},
]
# Tier 2 — 참고 (기본 비활성, --include-tier2 또는 설정으로 활성화)
TIER2_LAWS = [
{"name": "원자력안전법", "category": "법률"},
{"name": "방사선안전관리법", "category": "법률"},
{"name": "환경영향평가법", "category": "법률"},
{"name": "석면안전관리법", "category": "법률"},
{"name": "승강기 안전관리법", "category": "법률"},
{"name": "연구실 안전환경 조성에 관한 법률", "category": "법률"},
{"name": "재난 및 안전관리 기본법", "category": "법률"},
{"name": "고용보험법", "category": "법률"},
]
# 마지막 확인 일자 저장 파일
LAST_CHECK_FILE = DATA_DIR / "law_last_check.json"
LAWS_DIR = DATA_DIR / "laws"
LAWS_DIR.mkdir(exist_ok=True)
def load_last_check() -> dict:
"""마지막 확인 일자 로딩"""
if LAST_CHECK_FILE.exists():
with open(LAST_CHECK_FILE, "r") as f:
return json.load(f)
return {}
def save_last_check(data: dict):
"""마지막 확인 일자 저장 (원자적 쓰기)"""
atomic_write_json(LAST_CHECK_FILE, data)
def fetch_law_info(law_oc: str, mst: str) -> dict | None:
"""법령 정보 조회 — lawService.do로 MST 직접 조회 (XML → 기본정보 추출)"""
url = "https://www.law.go.kr/DRF/lawService.do"
params = {
"OC": law_oc,
"target": "law",
"type": "XML",
"MST": mst,
}
try:
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
root = ET.fromstring(resp.content)
info_el = root.find(".//기본정보")
if info_el is None:
logger.warning(f"기본정보 없음 [MST={mst}]")
return None
return {
"법령명한글": (info_el.findtext("법령명_한글", "") or "").strip(),
"공포일자": (info_el.findtext("공포일자", "") or "").strip(),
"시행일자": (info_el.findtext("시행일자", "") or "").strip(),
"법령ID": (info_el.findtext("법령ID", "") or "").strip(),
"소관부처": (info_el.findtext("소관부처", "") or "").strip(),
}
except Exception as e:
logger.error(f"법령 조회 실패 [MST={mst}]: {e}")
return None
def fetch_law_text(law_oc: str, law_mst: str) -> str | None:
"""법령 본문 XML 다운로드"""
url = "https://www.law.go.kr/DRF/lawService.do"
params = {
"OC": law_oc,
"target": "law",
"type": "XML",
"MST": law_mst,
}
try:
resp = requests.get(url, params=params, timeout=60)
resp.raise_for_status()
return resp.text
except Exception as e:
logger.error(f"법령 본문 다운로드 실패 [{law_mst}]: {e}")
return None
def save_law_file(law_name: str, content: str) -> Path:
"""법령 XML 저장"""
today = datetime.now().strftime("%Y%m%d")
safe_name = law_name.replace(" ", "_").replace("/", "_")
filepath = LAWS_DIR / f"{safe_name}_{today}.xml"
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
logger.info(f"법령 저장: {filepath}")
return filepath
def import_law_to_devonthink(law_name: str, md_files: list[Path], category: str):
"""DEVONthink 04_Industrial Safety로 장별 MD 파일 임포트
3단계 교체: 기존 폴더 이동 → 신규 생성 → 구 폴더 삭제 (wiki-link 끊김 최소화)
"""
safe_name = law_name.replace(" ", "_")
group_path = f"/10_Legislation/Law/{safe_name}"
# 1단계: 기존 폴더 이동 (있으면)
rename_script = (
'tell application id "DNtp"\n'
' repeat with db in databases\n'
' if name of db is "04_Industrial safety" then\n'
f' set oldGroup to get record at "{group_path}" in db\n'
' if oldGroup is not missing value then\n'
f' set name of oldGroup to "{safe_name}_old"\n'
' end if\n'
' exit repeat\n'
' end if\n'
' end repeat\n'
'end tell'
)
try:
run_applescript_inline(rename_script)
except Exception:
pass # 기존 폴더 없으면 무시
# 2단계: 신규 폴더 생성 + 파일 임포트
for filepath in md_files:
fp = str(filepath)
script = f'set fp to "{fp}"\n'
script += 'tell application id "DNtp"\n'
script += ' repeat with db in databases\n'
script += ' if name of db is "04_Industrial safety" then\n'
script += f' set targetGroup to create location "{group_path}" in db\n'
script += ' set theRecord to import fp to targetGroup\n'
script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}\n'
script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n'
script += ' add custom meta data "external" for "dataOrigin" to theRecord\n'
script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n'
script += ' exit repeat\n'
script += ' end if\n'
script += ' end repeat\n'
script += 'end tell'
try:
run_applescript_inline(script)
except Exception as e:
logger.error(f"DEVONthink 임포트 실패 [{filepath.name}]: {e}")
# 3단계: 구 폴더 삭제
delete_script = (
'tell application id "DNtp"\n'
' repeat with db in databases\n'
' if name of db is "04_Industrial safety" then\n'
f' set oldGroup to get record at "/10_Legislation/Law/{safe_name}_old" in db\n'
' if oldGroup is not missing value then\n'
' delete record oldGroup\n'
' end if\n'
' exit repeat\n'
' end if\n'
' end repeat\n'
'end tell'
)
try:
run_applescript_inline(delete_script)
except Exception:
pass
logger.info(f"DEVONthink 임포트 완료: {law_name} ({len(md_files)}개 파일)")
def _fetch_with_retry(func, *args, retries=3, backoff=(5, 15, 30)):
"""API 호출 재시도 래퍼"""
import time
for i in range(retries):
result = func(*args)
if result is not None:
return result
if i < retries - 1:
logger.warning(f"재시도 {i+2}/{retries} ({backoff[i]}초 후)")
time.sleep(backoff[i])
return None
def run(include_tier2: bool = False):
"""메인 실행 — MST 자동 조회 + 장 단위 MD 분할 + DEVONthink 임포트"""
logger.info("=== 법령 모니터링 시작 ===")
creds = load_credentials()
law_oc = creds.get("LAW_OC")
if not law_oc:
logger.error("LAW_OC 인증키가 설정되지 않았습니다. credentials.env를 확인하세요.")
sys.exit(1)
laws = TIER1_LAWS + (TIER2_LAWS if include_tier2 else [])
last_check = load_last_check()
changes_found = 0
failures = []
for law in laws:
law_name = law["name"]
category = law["category"]
# MST 자동 조회 (캐시 TTL 7일)
mst = lookup_current_mst(law_oc, law_name, category, cache_path=MST_CACHE_FILE)
if not mst:
failures.append({"name": law_name, "error": "MST 조회 실패"})
continue
logger.info(f"확인 중: {law_name} (MST={mst})")
# XML 한 번에 다운로드 (정보 추출 + 파싱 겸용)
xml_text = _fetch_with_retry(fetch_law_text, law_oc, mst)
if not xml_text:
failures.append({"name": law_name, "error": "XML 다운로드 실패"})
continue
# XML에서 기본정보 추출
try:
root = ET.fromstring(xml_text)
info_el = root.find(".//기본정보")
returned_name = (info_el.findtext("법령명_한글", "") or "").strip() if info_el else ""
except Exception:
failures.append({"name": law_name, "error": "XML 파싱 실패"})
continue
# 법령명 검증
if law_name not in returned_name and returned_name not in law_name:
logger.warning(f"법령명 불일치: 요청='{law_name}' 응답='{returned_name}' — 스킵")
failures.append({"name": law_name, "error": f"법령명 불일치: {returned_name}"})
continue
# 공포일자로 변경 감지
announce_date = (info_el.findtext("공포일자", "") or "").strip() if info_el else ""
prev_date = last_check.get(law_name, "")
if announce_date and announce_date != prev_date:
logger.info(f"변경 감지: {law_name} — 공포일자 {announce_date} (이전: {prev_date or '없음'})")
# XML 저장
xml_path = save_law_file(law_name, xml_text)
# XML → MD 장 분할
try:
parsed = parse_law_xml(str(xml_path))
md_files = save_law_as_markdown(law_name, parsed, MD_OUTPUT_DIR)
import_law_to_devonthink(law_name, md_files, category)
changes_found += 1
except Exception as e:
logger.error(f"법령 파싱/임포트 실패 [{law_name}]: {e}", exc_info=True)
failures.append({"name": law_name, "error": str(e)})
continue
last_check[law_name] = announce_date
else:
logger.debug(f"변경 없음: {law_name}")
save_last_check(last_check)
# 실행 결과 기록
run_result = {
"timestamp": datetime.now().isoformat(),
"total": len(laws),
"changes": changes_found,
"failures": failures,
}
atomic_write_json(DATA_DIR / "law_last_run.json", run_result)
if failures:
logger.warning(f"실패 {len(failures)}건: {[f['name'] for f in failures]}")
# ─── 외국 법령 (빈도 체크 후 실행) ───
us_count = fetch_us_osha(last_check)
jp_count = fetch_jp_mhlw(last_check)
eu_count = fetch_eu_osha(last_check)
changes_found += us_count + jp_count + eu_count
save_last_check(last_check)
logger.info(f"=== 법령 모니터링 완료 — {changes_found}건 변경 감지 (한국+외국) ===")
# ═══════════════════════════════════════════════
# 외국 법령 모니터링
# ═══════════════════════════════════════════════
def _should_run(last_check: dict, key: str, interval_days: int) -> bool:
"""빈도 체크: 마지막 실행일로부터 interval_days 경과 여부"""
last_run = last_check.get(key, "")
if not last_run:
return True
try:
last_date = datetime.strptime(last_run, "%Y-%m-%d")
return (datetime.now() - last_date).days >= interval_days
except ValueError:
return True
def _import_foreign_to_devonthink(filepath: Path, title: str, country: str):
"""외국 법령 DEVONthink 임포트 — 변수 방식 (POSIX path 따옴표 문제 회피)"""
folder = {"US": "US", "JP": "JP", "EU": "EU"}.get(country, country)
fp = str(filepath)
script = f'set fp to "{fp}"\n'
script += 'tell application id "DNtp"\n'
script += ' repeat with db in databases\n'
script += ' if name of db is "04_Industrial safety" then\n'
script += f' set targetGroup to create location "/10_Legislation/Foreign/{folder}" in db\n'
script += ' set theRecord to import fp to targetGroup\n'
script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{country}"}}\n'
script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n'
script += ' add custom meta data "external" for "dataOrigin" to theRecord\n'
script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n'
script += ' exit repeat\n'
script += ' end if\n'
script += ' end repeat\n'
script += 'end tell'
try:
run_applescript_inline(script)
safe_title = title[:40].replace('\n', ' ')
logger.info(f"DEVONthink 임포트 [{country}]: {safe_title}")
except Exception as e:
logger.error(f"DEVONthink 임포트 실패 [{country}]: {e}")
def fetch_us_osha(last_check: dict) -> int:
"""US OSHA — Federal Register API (주 1회)"""
if not _should_run(last_check, "_us_osha_last", 7):
logger.debug("US OSHA: 이번 주 이미 실행됨, 건너뜀")
return 0
logger.info("=== US OSHA 확인 ===")
try:
from_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
resp = requests.get("https://www.federalregister.gov/api/v1/documents.json", params={
"conditions[agencies][]": "occupational-safety-and-health-administration",
"conditions[publication_date][gte]": from_date,
"per_page": 10,
"order": "newest",
}, timeout=30)
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
count = 0
for doc in results:
doc_id = doc.get("document_number", "")
title = doc.get("title", "")
pub_date = doc.get("publication_date", "")
abstract = doc.get("abstract", "")
doc_url = doc.get("html_url", "")
# 마크다운으로 저장
content = f"# {title}\n\n"
content += f"- **Document**: {doc_id}\n"
content += f"- **Date**: {pub_date}\n"
content += f"- **URL**: {doc_url}\n\n"
if abstract:
content += f"## Abstract\n\n{abstract}\n"
safe_title = "".join(c if c.isalnum() or c in " _-" else "_" for c in title)[:50]
filepath = LAWS_DIR / f"US_OSHA_{pub_date}_{safe_title}.md"
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
_import_foreign_to_devonthink(filepath, title, "US")
count += 1
last_check["_us_osha_last"] = datetime.now().strftime("%Y-%m-%d")
logger.info(f"US OSHA: {count}")
return count
except Exception as e:
logger.error(f"US OSHA 에러: {e}", exc_info=True)
return 0
def fetch_jp_mhlw(last_check: dict) -> int:
"""JP 厚生労働省 — RSS 파싱 + MLX 번역 (주 1회)"""
if not _should_run(last_check, "_jp_mhlw_last", 7):
logger.debug("JP 厚労省: 이번 주 이미 실행됨, 건너뜀")
return 0
logger.info("=== JP 厚生労働省 확인 ===")
try:
import xml.etree.ElementTree as ET
resp = requests.get("https://www.mhlw.go.jp/stf/news.rdf", timeout=30)
resp.raise_for_status()
root = ET.fromstring(resp.content)
safety_keywords = ["労働安全", "安全衛生", "労災", "化学物質", "石綿", "安全管理", "労働", "安全", "衛生"]
rss_ns = "http://purl.org/rss/1.0/"
count = 0
# RDF 1.0 형식: {http://purl.org/rss/1.0/}item
items = root.findall(f"{{{rss_ns}}}item")
logger.info(f"JP RSS 항목: {len(items)}")
for item in items:
title = item.findtext(f"{{{rss_ns}}}title", "")
link = item.findtext(f"{{{rss_ns}}}link", "")
pub_date = item.findtext("pubDate", "")
# 안전위생 키워드 필터
if not any(kw in title for kw in safety_keywords):
continue
# MLX 35B로 한국어 번역
translated = ""
try:
translated = llm_generate(
f"다음 일본어 제목을 한국어로 번역해줘. 번역만 출력하고 다른 말은 하지 마.\n\n{title}",
no_think=True
)
except Exception:
translated = title
content = f"# {title}\n\n"
content += f"**한국어**: {translated}\n\n"
content += f"- **URL**: {link}\n"
content += f"- **Date**: {pub_date}\n"
safe_title = "".join(c if c.isalnum() or c in " _-" else "_" for c in title)[:40]
today = datetime.now().strftime("%Y%m%d")
filepath = LAWS_DIR / f"JP_{today}_{safe_title}.md"
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
_import_foreign_to_devonthink(filepath, f"{translated} ({title})", "JP")
count += 1
if count >= 10:
break
last_check["_jp_mhlw_last"] = datetime.now().strftime("%Y-%m-%d")
logger.info(f"JP 厚労省: {count}")
return count
except Exception as e:
logger.error(f"JP 厚労省 에러: {e}", exc_info=True)
return 0
def fetch_eu_osha(last_check: dict) -> int:
"""EU-OSHA — RSS 파싱 (월 1회)"""
if not _should_run(last_check, "_eu_osha_last", 30):
logger.debug("EU-OSHA: 이번 달 이미 실행됨, 건너뜀")
return 0
logger.info("=== EU-OSHA 확인 ===")
try:
import xml.etree.ElementTree as ET
resp = requests.get("https://osha.europa.eu/en/rss.xml", timeout=30)
resp.raise_for_status()
root = ET.fromstring(resp.content)
count = 0
for item in root.iter("item"):
title = item.findtext("title", "")
link = item.findtext("link", "")
description = item.findtext("description", "")
pub_date = item.findtext("pubDate", "")
content = f"# {title}\n\n"
content += f"- **URL**: {link}\n"
content += f"- **Date**: {pub_date}\n\n"
if description:
content += f"## Summary\n\n{description}\n"
safe_title = "".join(c if c.isalnum() or c in " _-" else "" for c in title)[:50].strip() or f"item{count+1}"
today = datetime.now().strftime("%Y%m%d")
filepath = LAWS_DIR / f"EU_{today}_{count+1:02d}_{safe_title}.md"
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
_import_foreign_to_devonthink(filepath, title, "EU")
count += 1
if count >= 5:
break
last_check["_eu_osha_last"] = datetime.now().strftime("%Y-%m-%d")
logger.info(f"EU-OSHA: {count}")
return count
except Exception as e:
logger.error(f"EU-OSHA 에러: {e}", exc_info=True)
return 0
if __name__ == "__main__":
tier2 = "--include-tier2" in sys.argv
run(include_tier2=tier2)