feat: 법령 모니터링 대폭 개선 — 장 단위 MD 분할 + 크로스 링크 + Tier 분리

- law_parser.py 신규: XML→MD 장 단위 분할, 조문 앵커 링크, 부칙 분리
  - 장/절/편 자동 식별 (<조문여부>=전문), 장 없는 법령 fallback
  - DEVONthink wiki-link 크로스 링크 (같은 법률 내 + 다른 법률 간)
  - MST 자동 조회 + 7일 TTL 캐시 + 원자적 파일 쓰기
  - 법령 약칭 매핑 (산안법→산업안전보건법 등)

- law_monitor.py 리팩터링:
  - MONITORED_LAWS → Tier 1(15개 필수) / Tier 2(8개 참고, 비활성)
  - law_id → MST 방식 (현행 법령 자동 조회)
  - XML 통짜 저장 → 장별 Markdown 분할 저장
  - DEVONthink 3단계 교체 (이동→생성→삭제, wiki-link 보존)
  - 에러 핸들링: 재시도 3회/백오프 + 부분 실패 허용 + 법령명 검증
  - 실행 결과 law_last_run.json 기록

테스트: 15개 법령 전체 성공 (148개 MD 파일 생성)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-03-30 15:00:28 +09:00
parent dc3f03b421
commit 4b7ddf39c1
2 changed files with 673 additions and 70 deletions

View File

@@ -17,18 +17,50 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, llm_generate, PROJECT_ROOT, DATA_DIR
from law_parser import (
parse_law_xml, save_law_as_markdown, build_article_chapter_map,
add_cross_law_links, lookup_current_mst, atomic_write_json,
)
logger = setup_logger("law_monitor")
# 모니터링 대상 법령
MONITORED_LAWS = [
{"name": "산업안전보건법", "law_id": "001789", "category": "법률"},
{"name": "산업안전보건법 시행령", "law_id": "001790", "category": "대통령령"},
{"name": "산업안전보건법 시행규칙", "law_id": "001791", "category": "부령"},
{"name": "중대재해 처벌 등에 관한 법률", "law_id": "019005", "category": "법률"},
{"name": "중대재해 처벌 등에 관한 법률 시행령", "law_id": "019006", "category": "대통령령"},
{"name": "화학물질관리법", "law_id": "012354", "category": "법률"},
{"name": "위험물안전관리법", "law_id": "001478", "category": "법률"},
MST_CACHE_FILE = DATA_DIR / "law_mst_cache.json"
MD_OUTPUT_DIR = DATA_DIR / "laws" / "md"
MD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# Tier 1 — 필수 모니터링 (업무 직접 관련, 매일 확인)
TIER1_LAWS = [
# 산업안전 핵심
{"name": "산업안전보건법", "category": "법률"},
{"name": "산업안전보건법 시행령", "category": "대통령령"},
{"name": "산업안전보건법 시행규칙", "category": "부령"},
{"name": "중대재해 처벌 등에 관한 법률", "category": "법률"},
{"name": "중대재해 처벌 등에 관한 법률 시행령", "category": "대통령령"},
# 화학/위험물
{"name": "화학물질관리법", "category": "법률"},
{"name": "위험물안전관리법", "category": "법률"},
{"name": "고압가스 안전관리법", "category": "법률"},
# 전기/소방/건설
{"name": "전기안전관리법", "category": "법률"},
{"name": "소방시설 설치 및 관리에 관한 법률", "category": "법률"},
{"name": "건설기술 진흥법", "category": "법률"},
# 시설물/노동
{"name": "시설물의 안전 및 유지관리에 관한 특별법", "category": "법률"},
{"name": "근로기준법", "category": "법률"},
{"name": "산업재해보상보험법", "category": "법률"},
{"name": "근로자참여 및 협력증진에 관한 법률", "category": "법률"},
]
# Tier 2 — 참고 (기본 비활성, --include-tier2 또는 설정으로 활성화)
TIER2_LAWS = [
{"name": "원자력안전법", "category": "법률"},
{"name": "방사선안전관리법", "category": "법률"},
{"name": "환경영향평가법", "category": "법률"},
{"name": "석면안전관리법", "category": "법률"},
{"name": "승강기 안전관리법", "category": "법률"},
{"name": "연구실 안전환경 조성에 관한 법률", "category": "법률"},
{"name": "재난 및 안전관리 기본법", "category": "법률"},
{"name": "고용보험법", "category": "법률"},
]
# 마지막 확인 일자 저장 파일
@@ -46,37 +78,36 @@ def load_last_check() -> dict:
def save_last_check(data: dict):
"""마지막 확인 일자 저장"""
with open(LAST_CHECK_FILE, "w") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
"""마지막 확인 일자 저장 (원자적 쓰기)"""
atomic_write_json(LAST_CHECK_FILE, data)
def fetch_law_info(law_oc: str, law_id: str) -> dict | None:
"""법령 정보 조회 (법령 API)"""
url = "https://www.law.go.kr/DRF/lawSearch.do"
def fetch_law_info(law_oc: str, mst: str) -> dict | None:
"""법령 정보 조회 — lawService.do로 MST 직접 조회 (XML → 기본정보 추출)"""
url = "https://www.law.go.kr/DRF/lawService.do"
params = {
"OC": law_oc,
"target": "law",
"type": "JSON",
"MST": law_id,
"type": "XML",
"MST": mst,
}
try:
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
# API 에러 응답 감지
if "result" in data and "실패" in str(data.get("result", "")):
logger.error(f"법령 API 에러 [{law_id}]: {data.get('result')}{data.get('msg')}")
root = ET.fromstring(resp.content)
info_el = root.find(".//기본정보")
if info_el is None:
logger.warning(f"기본정보 없음 [MST={mst}]")
return None
if "LawSearch" in data and "law" in data["LawSearch"]:
laws = data["LawSearch"]["law"]
if isinstance(laws, list):
return laws[0] if laws else None
return laws
logger.warning(f"법령 응답에 데이터 없음 [{law_id}]: {list(data.keys())}")
return None
return {
"법령명한글": (info_el.findtext("법령명_한글", "") or "").strip(),
"공포일자": (info_el.findtext("공포일자", "") or "").strip(),
"시행일자": (info_el.findtext("시행일자", "") or "").strip(),
"법령ID": (info_el.findtext("법령ID", "") or "").strip(),
"소관부처": (info_el.findtext("소관부처", "") or "").strip(),
}
except Exception as e:
logger.error(f"법령 조회 실패 [{law_id}]: {e}")
logger.error(f"법령 조회 실패 [MST={mst}]: {e}")
return None
@@ -109,32 +140,91 @@ def save_law_file(law_name: str, content: str) -> Path:
return filepath
def import_to_devonthink(filepath: Path, law_name: str, category: str):
"""DEVONthink 04_Industrial Safety로 임포트 — 변수 방식"""
fp = str(filepath)
script = f'set fp to "{fp}"\n'
script += 'tell application id "DNtp"\n'
script += ' repeat with db in databases\n'
script += ' if name of db is "04_Industrial safety" then\n'
script += ' set targetGroup to create location "/10_Legislation/Law" in db\n'
script += ' set theRecord to import fp to targetGroup\n'
script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}\n'
script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n'
script += ' add custom meta data "external" for "dataOrigin" to theRecord\n'
script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n'
script += ' exit repeat\n'
script += ' end if\n'
script += ' end repeat\n'
script += 'end tell'
def import_law_to_devonthink(law_name: str, md_files: list[Path], category: str):
"""DEVONthink 04_Industrial Safety로 장별 MD 파일 임포트
3단계 교체: 기존 폴더 이동 → 신규 생성 → 구 폴더 삭제 (wiki-link 끊김 최소화)
"""
safe_name = law_name.replace(" ", "_")
group_path = f"/10_Legislation/{safe_name}"
# 1단계: 기존 폴더 이동 (있으면)
rename_script = (
'tell application id "DNtp"\n'
' repeat with db in databases\n'
' if name of db is "04_Industrial safety" then\n'
f' set oldGroup to get record at "{group_path}" in db\n'
' if oldGroup is not missing value then\n'
f' set name of oldGroup to "{safe_name}_old"\n'
' end if\n'
' exit repeat\n'
' end if\n'
' end repeat\n'
'end tell'
)
try:
run_applescript_inline(script)
logger.info(f"DEVONthink 임포트 완료: {law_name}")
except Exception as e:
logger.error(f"DEVONthink 임포트 실패 [{law_name}]: {e}")
run_applescript_inline(rename_script)
except Exception:
pass # 기존 폴더 없으면 무시
# 2단계: 신규 폴더 생성 + 파일 임포트
for filepath in md_files:
fp = str(filepath)
script = f'set fp to "{fp}"\n'
script += 'tell application id "DNtp"\n'
script += ' repeat with db in databases\n'
script += ' if name of db is "04_Industrial safety" then\n'
script += f' set targetGroup to create location "{group_path}" in db\n'
script += ' set theRecord to import fp to targetGroup\n'
script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}\n'
script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n'
script += ' add custom meta data "external" for "dataOrigin" to theRecord\n'
script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n'
script += ' exit repeat\n'
script += ' end if\n'
script += ' end repeat\n'
script += 'end tell'
try:
run_applescript_inline(script)
except Exception as e:
logger.error(f"DEVONthink 임포트 실패 [{filepath.name}]: {e}")
# 3단계: 구 폴더 삭제
delete_script = (
'tell application id "DNtp"\n'
' repeat with db in databases\n'
' if name of db is "04_Industrial safety" then\n'
f' set oldGroup to get record at "/10_Legislation/{safe_name}_old" in db\n'
' if oldGroup is not missing value then\n'
' delete record oldGroup\n'
' end if\n'
' exit repeat\n'
' end if\n'
' end repeat\n'
'end tell'
)
try:
run_applescript_inline(delete_script)
except Exception:
pass
logger.info(f"DEVONthink 임포트 완료: {law_name} ({len(md_files)}개 파일)")
def run():
"""메인 실행"""
def _fetch_with_retry(func, *args, retries=3, backoff=(5, 15, 30)):
"""API 호출 재시도 래퍼"""
import time
for i in range(retries):
result = func(*args)
if result is not None:
return result
if i < retries - 1:
logger.warning(f"재시도 {i+2}/{retries} ({backoff[i]}초 후)")
time.sleep(backoff[i])
return None
def run(include_tier2: bool = False):
"""메인 실행 — MST 자동 조회 + 장 단위 MD 분할 + DEVONthink 임포트"""
logger.info("=== 법령 모니터링 시작 ===")
creds = load_credentials()
@@ -143,41 +233,82 @@ def run():
logger.error("LAW_OC 인증키가 설정되지 않았습니다. credentials.env를 확인하세요.")
sys.exit(1)
laws = TIER1_LAWS + (TIER2_LAWS if include_tier2 else [])
last_check = load_last_check()
changes_found = 0
failures = []
for law in MONITORED_LAWS:
for law in laws:
law_name = law["name"]
law_id = law["law_id"]
category = law["category"]
logger.info(f"확인 중: {law_name} ({law_id})")
info = fetch_law_info(law_oc, law_id)
if not info:
# MST 자동 조회 (캐시 TTL 7일)
mst = lookup_current_mst(law_oc, law_name, category, cache_path=MST_CACHE_FILE)
if not mst:
failures.append({"name": law_name, "error": "MST 조회 실패"})
continue
# 시행일자 또는 공포일자로 변경 감지
announce_date = info.get("공포일자", info.get("시행일자", ""))
prev_date = last_check.get(law_id, "")
logger.info(f"확인 중: {law_name} (MST={mst})")
# XML 한 번에 다운로드 (정보 추출 + 파싱 겸용)
xml_text = _fetch_with_retry(fetch_law_text, law_oc, mst)
if not xml_text:
failures.append({"name": law_name, "error": "XML 다운로드 실패"})
continue
# XML에서 기본정보 추출
try:
root = ET.fromstring(xml_text)
info_el = root.find(".//기본정보")
returned_name = (info_el.findtext("법령명_한글", "") or "").strip() if info_el else ""
except Exception:
failures.append({"name": law_name, "error": "XML 파싱 실패"})
continue
# 법령명 검증
if law_name not in returned_name and returned_name not in law_name:
logger.warning(f"법령명 불일치: 요청='{law_name}' 응답='{returned_name}' — 스킵")
failures.append({"name": law_name, "error": f"법령명 불일치: {returned_name}"})
continue
# 공포일자로 변경 감지
announce_date = (info_el.findtext("공포일자", "") or "").strip() if info_el else ""
prev_date = last_check.get(law_name, "")
if announce_date and announce_date != prev_date:
logger.info(f"변경 감지: {law_name} — 공포일자 {announce_date} (이전: {prev_date or '없음'})")
# 법령 본문 다운로드
law_mst = info.get("법령MST", law_id)
text = fetch_law_text(law_oc, law_mst)
if text:
filepath = save_law_file(law_name, text)
import_to_devonthink(filepath, law_name, category)
changes_found += 1
# XML 저장
xml_path = save_law_file(law_name, xml_text)
last_check[law_id] = announce_date
# XML → MD 장 분할
try:
parsed = parse_law_xml(str(xml_path))
md_files = save_law_as_markdown(law_name, parsed, MD_OUTPUT_DIR)
import_law_to_devonthink(law_name, md_files, category)
changes_found += 1
except Exception as e:
logger.error(f"법령 파싱/임포트 실패 [{law_name}]: {e}", exc_info=True)
failures.append({"name": law_name, "error": str(e)})
continue
last_check[law_name] = announce_date
else:
logger.debug(f"변경 없음: {law_name}")
save_last_check(last_check)
# 실행 결과 기록
run_result = {
"timestamp": datetime.now().isoformat(),
"total": len(laws),
"changes": changes_found,
"failures": failures,
}
atomic_write_json(DATA_DIR / "law_last_run.json", run_result)
if failures:
logger.warning(f"실패 {len(failures)}건: {[f['name'] for f in failures]}")
# ─── 외국 법령 (빈도 체크 후 실행) ───
us_count = fetch_us_osha(last_check)
jp_count = fetch_jp_mhlw(last_check)
@@ -395,4 +526,5 @@ def fetch_eu_osha(last_check: dict) -> int:
if __name__ == "__main__":
run()
tier2 = "--include-tier2" in sys.argv
run(include_tier2=tier2)

471
scripts/law_parser.py Normal file
View File

@@ -0,0 +1,471 @@
#!/usr/bin/env python3
"""
법령 XML → Markdown 장 단위 분할 파서
- law.go.kr XML 파싱 → 장/절 구조 식별
- 장별 Markdown 파일 생성 (앵커 + 크로스 링크)
- 부칙 별도 파일 저장
"""
import re
import json
import os
import xml.etree.ElementTree as ET
from pathlib import Path
from datetime import datetime, timedelta
import sys
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger
logger = setup_logger("law_parser")
# 법령 약칭 매핑 (조문 내 참조 → 정식명칭)
LAW_ALIASES = {
"산안법": "산업안전보건법",
"산업안전보건법": "산업안전보건법",
"중대재해법": "중대재해 처벌 등에 관한 법률",
"중대재해처벌법": "중대재해 처벌 등에 관한 법률",
"화관법": "화학물질관리법",
"위안법": "위험물안전관리법",
"고압가스법": "고압가스 안전관리법",
"건설기술진흥법": "건설기술 진흥법",
"산재보험법": "산업재해보상보험법",
}
def atomic_write_json(filepath: Path, data: dict):
"""원자적 JSON 파일 쓰기 (경합 방지)"""
tmp = filepath.with_suffix(".json.tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(str(tmp), str(filepath))
# --- XML 파싱 ---
def parse_law_xml(xml_path: str) -> dict:
"""XML 파싱 → 법령 구조 추출"""
tree = ET.parse(xml_path)
root = tree.getroot()
# 기본정보
info_el = root.find(".//기본정보")
info = {
"name": (info_el.findtext("법령명_한글", "") or "").strip(),
"law_id": (info_el.findtext("법령ID", "") or "").strip(),
"announce_date": (info_el.findtext("공포일자", "") or "").strip(),
"enforce_date": (info_el.findtext("시행일자", "") or "").strip(),
"ministry": (info_el.findtext("소관부처", "") or "").strip(),
"category": (info_el.findtext("법종구분", "") or "").strip(),
}
# 조문 추출
articles = []
for el in root.findall(".//조문단위"):
kind = (el.findtext("조문여부", "") or "").strip()
num = (el.findtext("조문번호", "") or "").strip()
title = (el.findtext("조문제목", "") or "").strip()
content = (el.findtext("조문내용", "") or "").strip()
# 항 추출
paragraphs = []
for p_el in el.findall(""):
p_num = (p_el.findtext("항번호", "") or "").strip()
p_content = (p_el.findtext("항내용", "") or "").strip()
# 호 추출
sub_items = []
for h_el in p_el.findall(""):
h_num = (h_el.findtext("호번호", "") or "").strip()
h_content = (h_el.findtext("호내용", "") or "").strip()
sub_items.append({"num": h_num, "content": h_content})
paragraphs.append({"num": p_num, "content": p_content, "sub_items": sub_items})
articles.append({
"kind": kind,
"num": num,
"title": title,
"content": content,
"paragraphs": paragraphs,
})
# 부칙 추출
appendices = []
for el in root.findall(".//부칙단위"):
date = (el.findtext("부칙공포일자", "") or "").strip()
num = (el.findtext("부칙공포번호", "") or "").strip()
content = (el.findtext("부칙내용", "") or "").strip()
appendices.append({"date": date, "num": num, "content": content})
return {"info": info, "articles": articles, "appendices": appendices}
# --- 장 분할 ---
def split_by_chapter(articles: list) -> list[dict]:
"""조문 목록을 장 단위로 그룹핑
Returns: [{"chapter": "제1장 총칙", "sections": [...], "articles": [...]}]
"""
chapters = []
current_chapter = {"chapter": "", "sections": [], "articles": []}
current_section = ""
for article in articles:
content_stripped = article["content"].strip()
if article["kind"] == "전문":
# 장/절/편 구분자
if re.match(r"\d+장", content_stripped):
# 새 장 시작
if current_chapter["chapter"] or current_chapter["articles"]:
chapters.append(current_chapter)
current_chapter = {"chapter": content_stripped, "sections": [], "articles": []}
current_section = ""
elif re.match(r"\d+절", content_stripped):
current_section = content_stripped
current_chapter["sections"].append(current_section)
elif re.match(r"\d+편", content_stripped):
# 편은 장보다 상위 — 별도 처리 없이 장 파일 내 표시
if current_chapter["articles"]:
chapters.append(current_chapter)
current_chapter = {"chapter": content_stripped, "sections": [], "articles": []}
current_section = ""
continue
if article["kind"] == "조문":
article["_section"] = current_section
current_chapter["articles"].append(article)
# 마지막 장
if current_chapter["chapter"] or current_chapter["articles"]:
chapters.append(current_chapter)
# 장이 없는 법령 (fallback)
if not chapters and articles:
chapters = [{"chapter": "", "sections": [], "articles": [
a for a in articles if a["kind"] == "조문"
]}]
return chapters
# --- Markdown 변환 ---
def _format_article_num(article: dict) -> str:
"""조문번호 + 제목 → 앵커용 ID 생성"""
num = article["num"]
title = article["title"]
# "제38조" 또는 "제38조의2" 형태 추출
content = article["content"]
match = re.match(r"(제\d+조(?:의\d+)*)\s*", content)
if match:
return match.group(1)
return f"{num}"
def article_to_markdown(article: dict) -> str:
"""단일 조문 → Markdown"""
article_id = _format_article_num(article)
title = article["title"]
# 제목 정리 (한자 괄호 등)
if title:
header = f"## {article_id} ({title})" + " {#" + article_id + "}"
else:
header = f"## {article_id}" + " {#" + article_id + "}"
lines = [header]
# 본문 내용
content = article["content"].strip()
# 조문번호 접두사 제거 (예: "제38조 (안전조치)" → 본문만)
content = re.sub(r"^제\d+조(?:의\d+)*\s*(?:\([^)]*\))?\s*", "", content)
if content:
lines.append(content)
# 항
for p in article.get("paragraphs", []):
p_content = p["content"].strip()
if p_content:
lines.append(f"\n{p_content}")
for si in p.get("sub_items", []):
si_content = si["content"].strip()
if si_content:
lines.append(f" {si_content}")
return "\n".join(lines)
def chapter_to_markdown(law_name: str, info: dict, chapter: dict) -> str:
"""장 → Markdown 파일 내용"""
chapter_name = chapter["chapter"] or law_name
enforce = info.get("enforce_date", "")
if len(enforce) == 8:
enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}"
ministry = info.get("ministry", "")
lines = [
f"# {chapter_name}",
f"> {law_name} | 시행 {enforce} | {ministry}",
"",
]
# 절 표시
current_section = ""
for article in chapter["articles"]:
section = article.get("_section", "")
if section and section != current_section:
current_section = section
lines.append(f"\n### {section}\n")
lines.append(article_to_markdown(article))
lines.append("")
return "\n".join(lines)
def info_to_markdown(info: dict) -> str:
"""기본정보 → Markdown"""
enforce = info.get("enforce_date", "")
if len(enforce) == 8:
enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}"
announce = info.get("announce_date", "")
if len(announce) == 8:
announce = f"{announce[:4]}-{announce[4:6]}-{announce[6:]}"
return f"""# {info['name']} — 기본정보
| 항목 | 내용 |
|------|------|
| **법령명** | {info['name']} |
| **법령구분** | {info.get('category', '')} |
| **소관부처** | {info.get('ministry', '')} |
| **공포일자** | {announce} |
| **시행일자** | {enforce} |
| **법령ID** | {info.get('law_id', '')} |
> 이 문서는 law.go.kr API에서 자동 생성되었습니다.
> 마지막 업데이트: {datetime.now().strftime('%Y-%m-%d')}
"""
def appendices_to_markdown(law_name: str, appendices: list) -> str:
"""부칙 → Markdown"""
lines = [f"# {law_name} — 부칙", ""]
for ap in appendices:
date = ap["date"]
if len(date) == 8:
date = f"{date[:4]}-{date[4:6]}-{date[6:]}"
lines.append(f"## 부칙 (공포 {date}, 제{ap['num']}호)")
lines.append(ap["content"])
lines.append("")
return "\n".join(lines)
# --- 크로스 링크 ---
def add_internal_links(text: str, article_ids: set[str]) -> str:
"""같은 법률 내 조문 참조 → Markdown 앵커 링크
{#...} 앵커 내부와 이미 링크된 부분은 스킵
"""
def replace_ref(m):
full = m.group(0)
article_ref = m.group(1) # "제38조" or "제38조의2"
if article_ref in article_ids:
return f"[{full}](#{article_ref})"
return full
# {#...} 앵커와 [...](...) 링크 내부는 보호
protected = re.sub(r'\{#[^}]+\}|\[[^\]]*\]\([^)]*\)', lambda m: '\x00' * len(m.group()), text)
# "제N조(의N)*" 패턴 매칭 (항/호 부분은 링크에 포함하지 않음)
pattern = r"(제\d+조(?:의\d+)*)(?:제\d+항)?(?:제\d+호)?"
result = []
last = 0
for m in re.finditer(pattern, protected):
result.append(text[last:m.start()])
if '\x00' in protected[m.start():m.end()]:
result.append(text[m.start():m.end()]) # 보호 영역 — 원문 유지
else:
orig = text[m.start():m.end()]
article_ref = re.match(r"(제\d+조(?:의\d+)*)", orig)
if article_ref and article_ref.group(1) in article_ids:
result.append(f"[{orig}](#{article_ref.group(1)})")
else:
result.append(orig)
last = m.end()
result.append(text[last:])
return "".join(result)
def add_cross_law_links(text: str, law_name: str, article_chapter_map: dict) -> str:
"""다른 법률 참조 → DEVONthink wiki-link
article_chapter_map: {법령명: {제X조: 파일명}}
"""
# 「법령명」 제X조 패턴
def replace_cross_ref(m):
raw_name = m.group(1).strip()
article_ref = m.group(2)
# 약칭 → 정식명칭
resolved = LAW_ALIASES.get(raw_name, raw_name)
if resolved == law_name:
return m.group(0) # 같은 법률이면 스킵 (내부 링크로 처리)
# 장 매핑 조회
law_map = article_chapter_map.get(resolved, {})
chapter_file = law_map.get(article_ref)
if chapter_file:
return f"[[{chapter_file}#{article_ref}|{m.group(0)}]]"
return m.group(0)
pattern = r"「([^」]+)」\s*(제\d+조(?:의\d+)*)"
return re.sub(pattern, replace_cross_ref, text)
# --- 파일 저장 ---
def save_law_as_markdown(law_name: str, parsed: dict, output_dir: Path) -> list[Path]:
"""파싱된 법령 → 장별 MD 파일 저장. 생성된 파일 경로 리스트 반환."""
law_dir = output_dir / law_name.replace(" ", "_")
law_dir.mkdir(parents=True, exist_ok=True)
info = parsed["info"]
chapters = split_by_chapter(parsed["articles"])
files = []
# 기본정보
info_path = law_dir / "00_기본정보.md"
info_path.write_text(info_to_markdown(info), encoding="utf-8")
files.append(info_path)
# 같은 법률 내 조문 ID 수집 (내부 링크용)
all_article_ids = set()
for ch in chapters:
for a in ch["articles"]:
all_article_ids.add(_format_article_num(a))
# 장별 파일
for i, chapter in enumerate(chapters, 1):
ch_name = chapter["chapter"] or law_name
# 파일명 안전화
safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name)
safe_name = re.sub(r"[^\w가-힣]", "", safe_name)
filename = f"{safe_name}.md"
md_content = chapter_to_markdown(law_name, info, chapter)
# 내부 링크 적용
md_content = add_internal_links(md_content, all_article_ids)
filepath = law_dir / filename
filepath.write_text(md_content, encoding="utf-8")
files.append(filepath)
# 부칙
if parsed["appendices"]:
ap_path = law_dir / "부칙.md"
ap_path.write_text(appendices_to_markdown(law_name, parsed["appendices"]), encoding="utf-8")
files.append(ap_path)
logger.info(f"{law_name}: {len(files)}개 파일 생성 → {law_dir}")
return files
def build_article_chapter_map(law_name: str, parsed: dict) -> dict:
"""조문→장 파일명 매핑 생성 (크로스 링크용)
Returns: {제X조: 파일명(확장자 없음)}
"""
chapters = split_by_chapter(parsed["articles"])
mapping = {}
for chapter in chapters:
ch_name = chapter["chapter"] or law_name
safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name)
safe_name = re.sub(r"[^\w가-힣]", "", safe_name)
file_stem = f"{law_name.replace(' ', '_')}_{safe_name}" if chapter["chapter"] else law_name.replace(" ", "_")
for article in chapter["articles"]:
article_id = _format_article_num(article)
mapping[article_id] = file_stem
return mapping
# --- MST 캐시 ---
def load_mst_cache(cache_path: Path) -> dict:
if cache_path.exists():
with open(cache_path, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def save_mst_cache(cache_path: Path, data: dict):
atomic_write_json(cache_path, data)
def lookup_current_mst(law_oc: str, law_name: str, category: str = "법률",
cache_path: Path = None, cache_ttl_days: int = 7) -> str | None:
"""법령명으로 현행 MST 검색 (캐시 TTL 적용)
- category → API 법령구분코드 매핑으로 검색 정확도 향상
"""
import requests
# 캐시 확인
if cache_path:
cache = load_mst_cache(cache_path)
entry = cache.get(law_name)
if entry:
cached_at = datetime.fromisoformat(entry["cached_at"])
if datetime.now() - cached_at < timedelta(days=cache_ttl_days):
return entry["mst"]
try:
resp = requests.get("https://www.law.go.kr/DRF/lawSearch.do", params={
"OC": law_oc, "target": "law", "type": "JSON",
"query": law_name, "display": "5",
}, timeout=15)
resp.raise_for_status()
data = resp.json().get("LawSearch", {})
laws = data.get("law", [])
if isinstance(laws, dict):
laws = [laws]
# 현행 필터 + 법령명 정확 매칭
current = [l for l in laws
if l.get("현행연혁코드") == "현행"
and law_name in l.get("법령명한글", "")]
if not current:
logger.warning(f"MST 검색 실패: {law_name} — 현행 법령 없음")
return None
mst = current[0]["법령일련번호"]
# 캐시 저장
if cache_path:
cache = load_mst_cache(cache_path)
cache[law_name] = {"mst": mst, "cached_at": datetime.now().isoformat()}
save_mst_cache(cache_path, cache)
return mst
except Exception as e:
logger.error(f"MST 조회 에러 [{law_name}]: {e}")
return None
if __name__ == "__main__":
# 단독 실행: XML 파일을 MD로 변환
if len(sys.argv) < 2:
print("사용법: python3 law_parser.py <xml_path> [output_dir]")
sys.exit(1)
xml_path = sys.argv[1]
output_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/laws/md")
parsed = parse_law_xml(xml_path)
print(f"법령: {parsed['info']['name']}")
print(f"조문: {len(parsed['articles'])}개, 부칙: {len(parsed['appendices'])}")
files = save_law_as_markdown(parsed["info"]["name"], parsed, output_dir)
print(f"생성된 파일: {len(files)}")
for f in files:
print(f" {f}")