- law_parser.py 신규: XML→MD 장 단위 분할, 조문 앵커 링크, 부칙 분리 - 장/절/편 자동 식별 (<조문여부>=전문), 장 없는 법령 fallback - DEVONthink wiki-link 크로스 링크 (같은 법률 내 + 다른 법률 간) - MST 자동 조회 + 7일 TTL 캐시 + 원자적 파일 쓰기 - 법령 약칭 매핑 (산안법→산업안전보건법 등) - law_monitor.py 리팩터링: - MONITORED_LAWS → Tier 1(15개 필수) / Tier 2(8개 참고, 비활성) - law_id → MST 방식 (현행 법령 자동 조회) - XML 통짜 저장 → 장별 Markdown 분할 저장 - DEVONthink 3단계 교체 (이동→생성→삭제, wiki-link 보존) - 에러 핸들링: 재시도 3회/백오프 + 부분 실패 허용 + 법령명 검증 - 실행 결과 law_last_run.json 기록 테스트: 15개 법령 전체 성공 (148개 MD 파일 생성) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
472 lines
16 KiB
Python
472 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
법령 XML → Markdown 장 단위 분할 파서
|
|
- law.go.kr XML 파싱 → 장/절 구조 식별
|
|
- 장별 Markdown 파일 생성 (앵커 + 크로스 링크)
|
|
- 부칙 별도 파일 저장
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
import os
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from pkm_utils import setup_logger
|
|
|
|
logger = setup_logger("law_parser")
|
|
|
|
# 법령 약칭 매핑 (조문 내 참조 → 정식명칭)
|
|
LAW_ALIASES = {
|
|
"산안법": "산업안전보건법",
|
|
"산업안전보건법": "산업안전보건법",
|
|
"중대재해법": "중대재해 처벌 등에 관한 법률",
|
|
"중대재해처벌법": "중대재해 처벌 등에 관한 법률",
|
|
"화관법": "화학물질관리법",
|
|
"위안법": "위험물안전관리법",
|
|
"고압가스법": "고압가스 안전관리법",
|
|
"건설기술진흥법": "건설기술 진흥법",
|
|
"산재보험법": "산업재해보상보험법",
|
|
}
|
|
|
|
|
|
def atomic_write_json(filepath: Path, data: dict):
|
|
"""원자적 JSON 파일 쓰기 (경합 방지)"""
|
|
tmp = filepath.with_suffix(".json.tmp")
|
|
with open(tmp, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
os.replace(str(tmp), str(filepath))
|
|
|
|
|
|
# --- XML 파싱 ---
|
|
|
|
def parse_law_xml(xml_path: str) -> dict:
|
|
"""XML 파싱 → 법령 구조 추출"""
|
|
tree = ET.parse(xml_path)
|
|
root = tree.getroot()
|
|
|
|
# 기본정보
|
|
info_el = root.find(".//기본정보")
|
|
info = {
|
|
"name": (info_el.findtext("법령명_한글", "") or "").strip(),
|
|
"law_id": (info_el.findtext("법령ID", "") or "").strip(),
|
|
"announce_date": (info_el.findtext("공포일자", "") or "").strip(),
|
|
"enforce_date": (info_el.findtext("시행일자", "") or "").strip(),
|
|
"ministry": (info_el.findtext("소관부처", "") or "").strip(),
|
|
"category": (info_el.findtext("법종구분", "") or "").strip(),
|
|
}
|
|
|
|
# 조문 추출
|
|
articles = []
|
|
for el in root.findall(".//조문단위"):
|
|
kind = (el.findtext("조문여부", "") or "").strip()
|
|
num = (el.findtext("조문번호", "") or "").strip()
|
|
title = (el.findtext("조문제목", "") or "").strip()
|
|
content = (el.findtext("조문내용", "") or "").strip()
|
|
|
|
# 항 추출
|
|
paragraphs = []
|
|
for p_el in el.findall("항"):
|
|
p_num = (p_el.findtext("항번호", "") or "").strip()
|
|
p_content = (p_el.findtext("항내용", "") or "").strip()
|
|
# 호 추출
|
|
sub_items = []
|
|
for h_el in p_el.findall("호"):
|
|
h_num = (h_el.findtext("호번호", "") or "").strip()
|
|
h_content = (h_el.findtext("호내용", "") or "").strip()
|
|
sub_items.append({"num": h_num, "content": h_content})
|
|
paragraphs.append({"num": p_num, "content": p_content, "sub_items": sub_items})
|
|
|
|
articles.append({
|
|
"kind": kind,
|
|
"num": num,
|
|
"title": title,
|
|
"content": content,
|
|
"paragraphs": paragraphs,
|
|
})
|
|
|
|
# 부칙 추출
|
|
appendices = []
|
|
for el in root.findall(".//부칙단위"):
|
|
date = (el.findtext("부칙공포일자", "") or "").strip()
|
|
num = (el.findtext("부칙공포번호", "") or "").strip()
|
|
content = (el.findtext("부칙내용", "") or "").strip()
|
|
appendices.append({"date": date, "num": num, "content": content})
|
|
|
|
return {"info": info, "articles": articles, "appendices": appendices}
|
|
|
|
|
|
# --- 장 분할 ---
|
|
|
|
def split_by_chapter(articles: list) -> list[dict]:
|
|
"""조문 목록을 장 단위로 그룹핑
|
|
Returns: [{"chapter": "제1장 총칙", "sections": [...], "articles": [...]}]
|
|
"""
|
|
chapters = []
|
|
current_chapter = {"chapter": "", "sections": [], "articles": []}
|
|
current_section = ""
|
|
|
|
for article in articles:
|
|
content_stripped = article["content"].strip()
|
|
|
|
if article["kind"] == "전문":
|
|
# 장/절/편 구분자
|
|
if re.match(r"제\d+장", content_stripped):
|
|
# 새 장 시작
|
|
if current_chapter["chapter"] or current_chapter["articles"]:
|
|
chapters.append(current_chapter)
|
|
current_chapter = {"chapter": content_stripped, "sections": [], "articles": []}
|
|
current_section = ""
|
|
elif re.match(r"제\d+절", content_stripped):
|
|
current_section = content_stripped
|
|
current_chapter["sections"].append(current_section)
|
|
elif re.match(r"제\d+편", content_stripped):
|
|
# 편은 장보다 상위 — 별도 처리 없이 장 파일 내 표시
|
|
if current_chapter["articles"]:
|
|
chapters.append(current_chapter)
|
|
current_chapter = {"chapter": content_stripped, "sections": [], "articles": []}
|
|
current_section = ""
|
|
continue
|
|
|
|
if article["kind"] == "조문":
|
|
article["_section"] = current_section
|
|
current_chapter["articles"].append(article)
|
|
|
|
# 마지막 장
|
|
if current_chapter["chapter"] or current_chapter["articles"]:
|
|
chapters.append(current_chapter)
|
|
|
|
# 장이 없는 법령 (fallback)
|
|
if not chapters and articles:
|
|
chapters = [{"chapter": "", "sections": [], "articles": [
|
|
a for a in articles if a["kind"] == "조문"
|
|
]}]
|
|
|
|
return chapters
|
|
|
|
|
|
# --- Markdown 변환 ---
|
|
|
|
def _format_article_num(article: dict) -> str:
|
|
"""조문번호 + 제목 → 앵커용 ID 생성"""
|
|
num = article["num"]
|
|
title = article["title"]
|
|
# "제38조" 또는 "제38조의2" 형태 추출
|
|
content = article["content"]
|
|
match = re.match(r"(제\d+조(?:의\d+)*)\s*", content)
|
|
if match:
|
|
return match.group(1)
|
|
return f"제{num}조"
|
|
|
|
|
|
def article_to_markdown(article: dict) -> str:
|
|
"""단일 조문 → Markdown"""
|
|
article_id = _format_article_num(article)
|
|
title = article["title"]
|
|
|
|
# 제목 정리 (한자 괄호 등)
|
|
if title:
|
|
header = f"## {article_id} ({title})" + " {#" + article_id + "}"
|
|
else:
|
|
header = f"## {article_id}" + " {#" + article_id + "}"
|
|
|
|
lines = [header]
|
|
|
|
# 본문 내용
|
|
content = article["content"].strip()
|
|
# 조문번호 접두사 제거 (예: "제38조 (안전조치)" → 본문만)
|
|
content = re.sub(r"^제\d+조(?:의\d+)*\s*(?:\([^)]*\))?\s*", "", content)
|
|
if content:
|
|
lines.append(content)
|
|
|
|
# 항
|
|
for p in article.get("paragraphs", []):
|
|
p_content = p["content"].strip()
|
|
if p_content:
|
|
lines.append(f"\n{p_content}")
|
|
for si in p.get("sub_items", []):
|
|
si_content = si["content"].strip()
|
|
if si_content:
|
|
lines.append(f" {si_content}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def chapter_to_markdown(law_name: str, info: dict, chapter: dict) -> str:
|
|
"""장 → Markdown 파일 내용"""
|
|
chapter_name = chapter["chapter"] or law_name
|
|
enforce = info.get("enforce_date", "")
|
|
if len(enforce) == 8:
|
|
enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}"
|
|
ministry = info.get("ministry", "")
|
|
|
|
lines = [
|
|
f"# {chapter_name}",
|
|
f"> {law_name} | 시행 {enforce} | {ministry}",
|
|
"",
|
|
]
|
|
|
|
# 절 표시
|
|
current_section = ""
|
|
for article in chapter["articles"]:
|
|
section = article.get("_section", "")
|
|
if section and section != current_section:
|
|
current_section = section
|
|
lines.append(f"\n### {section}\n")
|
|
|
|
lines.append(article_to_markdown(article))
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def info_to_markdown(info: dict) -> str:
|
|
"""기본정보 → Markdown"""
|
|
enforce = info.get("enforce_date", "")
|
|
if len(enforce) == 8:
|
|
enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}"
|
|
announce = info.get("announce_date", "")
|
|
if len(announce) == 8:
|
|
announce = f"{announce[:4]}-{announce[4:6]}-{announce[6:]}"
|
|
|
|
return f"""# {info['name']} — 기본정보
|
|
|
|
| 항목 | 내용 |
|
|
|------|------|
|
|
| **법령명** | {info['name']} |
|
|
| **법령구분** | {info.get('category', '')} |
|
|
| **소관부처** | {info.get('ministry', '')} |
|
|
| **공포일자** | {announce} |
|
|
| **시행일자** | {enforce} |
|
|
| **법령ID** | {info.get('law_id', '')} |
|
|
|
|
> 이 문서는 law.go.kr API에서 자동 생성되었습니다.
|
|
> 마지막 업데이트: {datetime.now().strftime('%Y-%m-%d')}
|
|
"""
|
|
|
|
|
|
def appendices_to_markdown(law_name: str, appendices: list) -> str:
|
|
"""부칙 → Markdown"""
|
|
lines = [f"# {law_name} — 부칙", ""]
|
|
for ap in appendices:
|
|
date = ap["date"]
|
|
if len(date) == 8:
|
|
date = f"{date[:4]}-{date[4:6]}-{date[6:]}"
|
|
lines.append(f"## 부칙 (공포 {date}, 제{ap['num']}호)")
|
|
lines.append(ap["content"])
|
|
lines.append("")
|
|
return "\n".join(lines)
|
|
|
|
|
|
# --- 크로스 링크 ---
|
|
|
|
def add_internal_links(text: str, article_ids: set[str]) -> str:
|
|
"""같은 법률 내 조문 참조 → Markdown 앵커 링크
|
|
{#...} 앵커 내부와 이미 링크된 부분은 스킵
|
|
"""
|
|
def replace_ref(m):
|
|
full = m.group(0)
|
|
article_ref = m.group(1) # "제38조" or "제38조의2"
|
|
if article_ref in article_ids:
|
|
return f"[{full}](#{article_ref})"
|
|
return full
|
|
|
|
# {#...} 앵커와 [...](...) 링크 내부는 보호
|
|
protected = re.sub(r'\{#[^}]+\}|\[[^\]]*\]\([^)]*\)', lambda m: '\x00' * len(m.group()), text)
|
|
# "제N조(의N)*" 패턴 매칭 (항/호 부분은 링크에 포함하지 않음)
|
|
pattern = r"(제\d+조(?:의\d+)*)(?:제\d+항)?(?:제\d+호)?"
|
|
result = []
|
|
last = 0
|
|
for m in re.finditer(pattern, protected):
|
|
result.append(text[last:m.start()])
|
|
if '\x00' in protected[m.start():m.end()]:
|
|
result.append(text[m.start():m.end()]) # 보호 영역 — 원문 유지
|
|
else:
|
|
orig = text[m.start():m.end()]
|
|
article_ref = re.match(r"(제\d+조(?:의\d+)*)", orig)
|
|
if article_ref and article_ref.group(1) in article_ids:
|
|
result.append(f"[{orig}](#{article_ref.group(1)})")
|
|
else:
|
|
result.append(orig)
|
|
last = m.end()
|
|
result.append(text[last:])
|
|
return "".join(result)
|
|
|
|
|
|
def add_cross_law_links(text: str, law_name: str, article_chapter_map: dict) -> str:
|
|
"""다른 법률 참조 → DEVONthink wiki-link
|
|
article_chapter_map: {법령명: {제X조: 파일명}}
|
|
"""
|
|
# 「법령명」 제X조 패턴
|
|
def replace_cross_ref(m):
|
|
raw_name = m.group(1).strip()
|
|
article_ref = m.group(2)
|
|
|
|
# 약칭 → 정식명칭
|
|
resolved = LAW_ALIASES.get(raw_name, raw_name)
|
|
|
|
if resolved == law_name:
|
|
return m.group(0) # 같은 법률이면 스킵 (내부 링크로 처리)
|
|
|
|
# 장 매핑 조회
|
|
law_map = article_chapter_map.get(resolved, {})
|
|
chapter_file = law_map.get(article_ref)
|
|
if chapter_file:
|
|
return f"[[{chapter_file}#{article_ref}|{m.group(0)}]]"
|
|
return m.group(0)
|
|
|
|
pattern = r"「([^」]+)」\s*(제\d+조(?:의\d+)*)"
|
|
return re.sub(pattern, replace_cross_ref, text)
|
|
|
|
|
|
# --- 파일 저장 ---
|
|
|
|
def save_law_as_markdown(law_name: str, parsed: dict, output_dir: Path) -> list[Path]:
|
|
"""파싱된 법령 → 장별 MD 파일 저장. 생성된 파일 경로 리스트 반환."""
|
|
law_dir = output_dir / law_name.replace(" ", "_")
|
|
law_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
info = parsed["info"]
|
|
chapters = split_by_chapter(parsed["articles"])
|
|
files = []
|
|
|
|
# 기본정보
|
|
info_path = law_dir / "00_기본정보.md"
|
|
info_path.write_text(info_to_markdown(info), encoding="utf-8")
|
|
files.append(info_path)
|
|
|
|
# 같은 법률 내 조문 ID 수집 (내부 링크용)
|
|
all_article_ids = set()
|
|
for ch in chapters:
|
|
for a in ch["articles"]:
|
|
all_article_ids.add(_format_article_num(a))
|
|
|
|
# 장별 파일
|
|
for i, chapter in enumerate(chapters, 1):
|
|
ch_name = chapter["chapter"] or law_name
|
|
# 파일명 안전화
|
|
safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name)
|
|
safe_name = re.sub(r"[^\w가-힣]", "", safe_name)
|
|
filename = f"{safe_name}.md"
|
|
|
|
md_content = chapter_to_markdown(law_name, info, chapter)
|
|
# 내부 링크 적용
|
|
md_content = add_internal_links(md_content, all_article_ids)
|
|
|
|
filepath = law_dir / filename
|
|
filepath.write_text(md_content, encoding="utf-8")
|
|
files.append(filepath)
|
|
|
|
# 부칙
|
|
if parsed["appendices"]:
|
|
ap_path = law_dir / "부칙.md"
|
|
ap_path.write_text(appendices_to_markdown(law_name, parsed["appendices"]), encoding="utf-8")
|
|
files.append(ap_path)
|
|
|
|
logger.info(f"{law_name}: {len(files)}개 파일 생성 → {law_dir}")
|
|
return files
|
|
|
|
|
|
def build_article_chapter_map(law_name: str, parsed: dict) -> dict:
|
|
"""조문→장 파일명 매핑 생성 (크로스 링크용)
|
|
Returns: {제X조: 파일명(확장자 없음)}
|
|
"""
|
|
chapters = split_by_chapter(parsed["articles"])
|
|
mapping = {}
|
|
for chapter in chapters:
|
|
ch_name = chapter["chapter"] or law_name
|
|
safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name)
|
|
safe_name = re.sub(r"[^\w가-힣]", "", safe_name)
|
|
file_stem = f"{law_name.replace(' ', '_')}_{safe_name}" if chapter["chapter"] else law_name.replace(" ", "_")
|
|
|
|
for article in chapter["articles"]:
|
|
article_id = _format_article_num(article)
|
|
mapping[article_id] = file_stem
|
|
|
|
return mapping
|
|
|
|
|
|
# --- MST 캐시 ---
|
|
|
|
def load_mst_cache(cache_path: Path) -> dict:
|
|
if cache_path.exists():
|
|
with open(cache_path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
return {}
|
|
|
|
|
|
def save_mst_cache(cache_path: Path, data: dict):
|
|
atomic_write_json(cache_path, data)
|
|
|
|
|
|
def lookup_current_mst(law_oc: str, law_name: str, category: str = "법률",
|
|
cache_path: Path = None, cache_ttl_days: int = 7) -> str | None:
|
|
"""법령명으로 현행 MST 검색 (캐시 TTL 적용)
|
|
- category → API 법령구분코드 매핑으로 검색 정확도 향상
|
|
"""
|
|
import requests
|
|
|
|
# 캐시 확인
|
|
if cache_path:
|
|
cache = load_mst_cache(cache_path)
|
|
entry = cache.get(law_name)
|
|
if entry:
|
|
cached_at = datetime.fromisoformat(entry["cached_at"])
|
|
if datetime.now() - cached_at < timedelta(days=cache_ttl_days):
|
|
return entry["mst"]
|
|
|
|
try:
|
|
resp = requests.get("https://www.law.go.kr/DRF/lawSearch.do", params={
|
|
"OC": law_oc, "target": "law", "type": "JSON",
|
|
"query": law_name, "display": "5",
|
|
}, timeout=15)
|
|
resp.raise_for_status()
|
|
data = resp.json().get("LawSearch", {})
|
|
laws = data.get("law", [])
|
|
if isinstance(laws, dict):
|
|
laws = [laws]
|
|
|
|
# 현행 필터 + 법령명 정확 매칭
|
|
current = [l for l in laws
|
|
if l.get("현행연혁코드") == "현행"
|
|
and law_name in l.get("법령명한글", "")]
|
|
|
|
if not current:
|
|
logger.warning(f"MST 검색 실패: {law_name} — 현행 법령 없음")
|
|
return None
|
|
|
|
mst = current[0]["법령일련번호"]
|
|
|
|
# 캐시 저장
|
|
if cache_path:
|
|
cache = load_mst_cache(cache_path)
|
|
cache[law_name] = {"mst": mst, "cached_at": datetime.now().isoformat()}
|
|
save_mst_cache(cache_path, cache)
|
|
|
|
return mst
|
|
except Exception as e:
|
|
logger.error(f"MST 조회 에러 [{law_name}]: {e}")
|
|
return None
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# 단독 실행: XML 파일을 MD로 변환
|
|
if len(sys.argv) < 2:
|
|
print("사용법: python3 law_parser.py <xml_path> [output_dir]")
|
|
sys.exit(1)
|
|
|
|
xml_path = sys.argv[1]
|
|
output_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/laws/md")
|
|
|
|
parsed = parse_law_xml(xml_path)
|
|
print(f"법령: {parsed['info']['name']}")
|
|
print(f"조문: {len(parsed['articles'])}개, 부칙: {len(parsed['appendices'])}개")
|
|
|
|
files = save_law_as_markdown(parsed["info"]["name"], parsed, output_dir)
|
|
print(f"생성된 파일: {len(files)}개")
|
|
for f in files:
|
|
print(f" {f}")
|