From 4b7ddf39c1df80fbdd8fa1e1bb850359e6a9fdb0 Mon Sep 17 00:00:00 2001 From: hyungi Date: Mon, 30 Mar 2026 15:00:28 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EB=B2=95=EB=A0=B9=20=EB=AA=A8=EB=8B=88?= =?UTF-8?q?=ED=84=B0=EB=A7=81=20=EB=8C=80=ED=8F=AD=20=EA=B0=9C=EC=84=A0=20?= =?UTF-8?q?=E2=80=94=20=EC=9E=A5=20=EB=8B=A8=EC=9C=84=20MD=20=EB=B6=84?= =?UTF-8?q?=ED=95=A0=20+=20=ED=81=AC=EB=A1=9C=EC=8A=A4=20=EB=A7=81?= =?UTF-8?q?=ED=81=AC=20+=20Tier=20=EB=B6=84=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - law_parser.py 신규: XML→MD 장 단위 분할, 조문 앵커 링크, 부칙 분리 - 장/절/편 자동 식별 (<조문여부>=전문), 장 없는 법령 fallback - DEVONthink wiki-link 크로스 링크 (같은 법률 내 + 다른 법률 간) - MST 자동 조회 + 7일 TTL 캐시 + 원자적 파일 쓰기 - 법령 약칭 매핑 (산안법→산업안전보건법 등) - law_monitor.py 리팩터링: - MONITORED_LAWS → Tier 1(15개 필수) / Tier 2(8개 참고, 비활성) - law_id → MST 방식 (현행 법령 자동 조회) - XML 통짜 저장 → 장별 Markdown 분할 저장 - DEVONthink 3단계 교체 (이동→생성→삭제, wiki-link 보존) - 에러 핸들링: 재시도 3회/백오프 + 부분 실패 허용 + 법령명 검증 - 실행 결과 law_last_run.json 기록 테스트: 15개 법령 전체 성공 (148개 MD 파일 생성) Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/law_monitor.py | 272 ++++++++++++++++++------ scripts/law_parser.py | 471 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 673 insertions(+), 70 deletions(-) create mode 100644 scripts/law_parser.py diff --git a/scripts/law_monitor.py b/scripts/law_monitor.py index d8d94b3..bc378aa 100644 --- a/scripts/law_monitor.py +++ b/scripts/law_monitor.py @@ -17,18 +17,50 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from pkm_utils import setup_logger, load_credentials, run_applescript_inline, llm_generate, PROJECT_ROOT, DATA_DIR +from law_parser import ( + parse_law_xml, save_law_as_markdown, build_article_chapter_map, + add_cross_law_links, lookup_current_mst, atomic_write_json, +) logger = setup_logger("law_monitor") -# 모니터링 대상 법령 -MONITORED_LAWS = [ - {"name": "산업안전보건법", "law_id": "001789", "category": "법률"}, - {"name": "산업안전보건법 시행령", "law_id": "001790", "category": "대통령령"}, - {"name": "산업안전보건법 시행규칙", "law_id": "001791", "category": "부령"}, - {"name": "중대재해 처벌 등에 관한 법률", "law_id": "019005", "category": "법률"}, - {"name": "중대재해 처벌 등에 관한 법률 시행령", "law_id": "019006", "category": "대통령령"}, - {"name": "화학물질관리법", "law_id": "012354", "category": "법률"}, - {"name": "위험물안전관리법", "law_id": "001478", "category": "법률"}, +MST_CACHE_FILE = DATA_DIR / "law_mst_cache.json" +MD_OUTPUT_DIR = DATA_DIR / "laws" / "md" +MD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + +# Tier 1 — 필수 모니터링 (업무 직접 관련, 매일 확인) +TIER1_LAWS = [ + # 산업안전 핵심 + {"name": "산업안전보건법", "category": "법률"}, + {"name": "산업안전보건법 시행령", "category": "대통령령"}, + {"name": "산업안전보건법 시행규칙", "category": "부령"}, + {"name": "중대재해 처벌 등에 관한 법률", "category": "법률"}, + {"name": "중대재해 처벌 등에 관한 법률 시행령", "category": "대통령령"}, + # 화학/위험물 + {"name": "화학물질관리법", "category": "법률"}, + {"name": "위험물안전관리법", "category": "법률"}, + {"name": "고압가스 안전관리법", "category": "법률"}, + # 전기/소방/건설 + {"name": "전기안전관리법", "category": "법률"}, + {"name": "소방시설 설치 및 관리에 관한 법률", "category": "법률"}, + {"name": "건설기술 진흥법", "category": "법률"}, + # 시설물/노동 + {"name": "시설물의 안전 및 유지관리에 관한 특별법", "category": "법률"}, + {"name": "근로기준법", "category": "법률"}, + {"name": "산업재해보상보험법", "category": "법률"}, + {"name": "근로자참여 및 협력증진에 관한 법률", "category": "법률"}, +] + +# Tier 2 — 참고 (기본 비활성, --include-tier2 또는 설정으로 활성화) +TIER2_LAWS = [ + {"name": "원자력안전법", "category": "법률"}, + {"name": "방사선안전관리법", "category": "법률"}, + {"name": "환경영향평가법", "category": "법률"}, + {"name": "석면안전관리법", "category": "법률"}, + {"name": "승강기 안전관리법", "category": "법률"}, + {"name": "연구실 안전환경 조성에 관한 법률", "category": "법률"}, + {"name": "재난 및 안전관리 기본법", "category": "법률"}, + {"name": "고용보험법", "category": "법률"}, ] # 마지막 확인 일자 저장 파일 @@ -46,37 +78,36 @@ def load_last_check() -> dict: def save_last_check(data: dict): - """마지막 확인 일자 저장""" - with open(LAST_CHECK_FILE, "w") as f: - json.dump(data, f, ensure_ascii=False, indent=2) + """마지막 확인 일자 저장 (원자적 쓰기)""" + atomic_write_json(LAST_CHECK_FILE, data) -def fetch_law_info(law_oc: str, law_id: str) -> dict | None: - """법령 정보 조회 (법령 API)""" - url = "https://www.law.go.kr/DRF/lawSearch.do" +def fetch_law_info(law_oc: str, mst: str) -> dict | None: + """법령 정보 조회 — lawService.do로 MST 직접 조회 (XML → 기본정보 추출)""" + url = "https://www.law.go.kr/DRF/lawService.do" params = { "OC": law_oc, "target": "law", - "type": "JSON", - "MST": law_id, + "type": "XML", + "MST": mst, } try: resp = requests.get(url, params=params, timeout=30) resp.raise_for_status() - data = resp.json() - # API 에러 응답 감지 - if "result" in data and "실패" in str(data.get("result", "")): - logger.error(f"법령 API 에러 [{law_id}]: {data.get('result')} — {data.get('msg')}") + root = ET.fromstring(resp.content) + info_el = root.find(".//기본정보") + if info_el is None: + logger.warning(f"기본정보 없음 [MST={mst}]") return None - if "LawSearch" in data and "law" in data["LawSearch"]: - laws = data["LawSearch"]["law"] - if isinstance(laws, list): - return laws[0] if laws else None - return laws - logger.warning(f"법령 응답에 데이터 없음 [{law_id}]: {list(data.keys())}") - return None + return { + "법령명한글": (info_el.findtext("법령명_한글", "") or "").strip(), + "공포일자": (info_el.findtext("공포일자", "") or "").strip(), + "시행일자": (info_el.findtext("시행일자", "") or "").strip(), + "법령ID": (info_el.findtext("법령ID", "") or "").strip(), + "소관부처": (info_el.findtext("소관부처", "") or "").strip(), + } except Exception as e: - logger.error(f"법령 조회 실패 [{law_id}]: {e}") + logger.error(f"법령 조회 실패 [MST={mst}]: {e}") return None @@ -109,32 +140,91 @@ def save_law_file(law_name: str, content: str) -> Path: return filepath -def import_to_devonthink(filepath: Path, law_name: str, category: str): - """DEVONthink 04_Industrial Safety로 임포트 — 변수 방식""" - fp = str(filepath) - script = f'set fp to "{fp}"\n' - script += 'tell application id "DNtp"\n' - script += ' repeat with db in databases\n' - script += ' if name of db is "04_Industrial safety" then\n' - script += ' set targetGroup to create location "/10_Legislation/Law" in db\n' - script += ' set theRecord to import fp to targetGroup\n' - script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}\n' - script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n' - script += ' add custom meta data "external" for "dataOrigin" to theRecord\n' - script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n' - script += ' exit repeat\n' - script += ' end if\n' - script += ' end repeat\n' - script += 'end tell' +def import_law_to_devonthink(law_name: str, md_files: list[Path], category: str): + """DEVONthink 04_Industrial Safety로 장별 MD 파일 임포트 + 3단계 교체: 기존 폴더 이동 → 신규 생성 → 구 폴더 삭제 (wiki-link 끊김 최소화) + """ + safe_name = law_name.replace(" ", "_") + group_path = f"/10_Legislation/{safe_name}" + + # 1단계: 기존 폴더 이동 (있으면) + rename_script = ( + 'tell application id "DNtp"\n' + ' repeat with db in databases\n' + ' if name of db is "04_Industrial safety" then\n' + f' set oldGroup to get record at "{group_path}" in db\n' + ' if oldGroup is not missing value then\n' + f' set name of oldGroup to "{safe_name}_old"\n' + ' end if\n' + ' exit repeat\n' + ' end if\n' + ' end repeat\n' + 'end tell' + ) try: - run_applescript_inline(script) - logger.info(f"DEVONthink 임포트 완료: {law_name}") - except Exception as e: - logger.error(f"DEVONthink 임포트 실패 [{law_name}]: {e}") + run_applescript_inline(rename_script) + except Exception: + pass # 기존 폴더 없으면 무시 + + # 2단계: 신규 폴더 생성 + 파일 임포트 + for filepath in md_files: + fp = str(filepath) + script = f'set fp to "{fp}"\n' + script += 'tell application id "DNtp"\n' + script += ' repeat with db in databases\n' + script += ' if name of db is "04_Industrial safety" then\n' + script += f' set targetGroup to create location "{group_path}" in db\n' + script += ' set theRecord to import fp to targetGroup\n' + script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}\n' + script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n' + script += ' add custom meta data "external" for "dataOrigin" to theRecord\n' + script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n' + script += ' exit repeat\n' + script += ' end if\n' + script += ' end repeat\n' + script += 'end tell' + try: + run_applescript_inline(script) + except Exception as e: + logger.error(f"DEVONthink 임포트 실패 [{filepath.name}]: {e}") + + # 3단계: 구 폴더 삭제 + delete_script = ( + 'tell application id "DNtp"\n' + ' repeat with db in databases\n' + ' if name of db is "04_Industrial safety" then\n' + f' set oldGroup to get record at "/10_Legislation/{safe_name}_old" in db\n' + ' if oldGroup is not missing value then\n' + ' delete record oldGroup\n' + ' end if\n' + ' exit repeat\n' + ' end if\n' + ' end repeat\n' + 'end tell' + ) + try: + run_applescript_inline(delete_script) + except Exception: + pass + + logger.info(f"DEVONthink 임포트 완료: {law_name} ({len(md_files)}개 파일)") -def run(): - """메인 실행""" +def _fetch_with_retry(func, *args, retries=3, backoff=(5, 15, 30)): + """API 호출 재시도 래퍼""" + import time + for i in range(retries): + result = func(*args) + if result is not None: + return result + if i < retries - 1: + logger.warning(f"재시도 {i+2}/{retries} ({backoff[i]}초 후)") + time.sleep(backoff[i]) + return None + + +def run(include_tier2: bool = False): + """메인 실행 — MST 자동 조회 + 장 단위 MD 분할 + DEVONthink 임포트""" logger.info("=== 법령 모니터링 시작 ===") creds = load_credentials() @@ -143,41 +233,82 @@ def run(): logger.error("LAW_OC 인증키가 설정되지 않았습니다. credentials.env를 확인하세요.") sys.exit(1) + laws = TIER1_LAWS + (TIER2_LAWS if include_tier2 else []) last_check = load_last_check() changes_found = 0 + failures = [] - for law in MONITORED_LAWS: + for law in laws: law_name = law["name"] - law_id = law["law_id"] category = law["category"] - logger.info(f"확인 중: {law_name} ({law_id})") - - info = fetch_law_info(law_oc, law_id) - if not info: + # MST 자동 조회 (캐시 TTL 7일) + mst = lookup_current_mst(law_oc, law_name, category, cache_path=MST_CACHE_FILE) + if not mst: + failures.append({"name": law_name, "error": "MST 조회 실패"}) continue - # 시행일자 또는 공포일자로 변경 감지 - announce_date = info.get("공포일자", info.get("시행일자", "")) - prev_date = last_check.get(law_id, "") + logger.info(f"확인 중: {law_name} (MST={mst})") + + # XML 한 번에 다운로드 (정보 추출 + 파싱 겸용) + xml_text = _fetch_with_retry(fetch_law_text, law_oc, mst) + if not xml_text: + failures.append({"name": law_name, "error": "XML 다운로드 실패"}) + continue + + # XML에서 기본정보 추출 + try: + root = ET.fromstring(xml_text) + info_el = root.find(".//기본정보") + returned_name = (info_el.findtext("법령명_한글", "") or "").strip() if info_el else "" + except Exception: + failures.append({"name": law_name, "error": "XML 파싱 실패"}) + continue + + # 법령명 검증 + if law_name not in returned_name and returned_name not in law_name: + logger.warning(f"법령명 불일치: 요청='{law_name}' 응답='{returned_name}' — 스킵") + failures.append({"name": law_name, "error": f"법령명 불일치: {returned_name}"}) + continue + + # 공포일자로 변경 감지 + announce_date = (info_el.findtext("공포일자", "") or "").strip() if info_el else "" + prev_date = last_check.get(law_name, "") if announce_date and announce_date != prev_date: logger.info(f"변경 감지: {law_name} — 공포일자 {announce_date} (이전: {prev_date or '없음'})") - # 법령 본문 다운로드 - law_mst = info.get("법령MST", law_id) - text = fetch_law_text(law_oc, law_mst) - if text: - filepath = save_law_file(law_name, text) - import_to_devonthink(filepath, law_name, category) - changes_found += 1 + # XML 저장 + xml_path = save_law_file(law_name, xml_text) - last_check[law_id] = announce_date + # XML → MD 장 분할 + try: + parsed = parse_law_xml(str(xml_path)) + md_files = save_law_as_markdown(law_name, parsed, MD_OUTPUT_DIR) + import_law_to_devonthink(law_name, md_files, category) + changes_found += 1 + except Exception as e: + logger.error(f"법령 파싱/임포트 실패 [{law_name}]: {e}", exc_info=True) + failures.append({"name": law_name, "error": str(e)}) + continue + + last_check[law_name] = announce_date else: logger.debug(f"변경 없음: {law_name}") save_last_check(last_check) + # 실행 결과 기록 + run_result = { + "timestamp": datetime.now().isoformat(), + "total": len(laws), + "changes": changes_found, + "failures": failures, + } + atomic_write_json(DATA_DIR / "law_last_run.json", run_result) + if failures: + logger.warning(f"실패 {len(failures)}건: {[f['name'] for f in failures]}") + # ─── 외국 법령 (빈도 체크 후 실행) ─── us_count = fetch_us_osha(last_check) jp_count = fetch_jp_mhlw(last_check) @@ -395,4 +526,5 @@ def fetch_eu_osha(last_check: dict) -> int: if __name__ == "__main__": - run() + tier2 = "--include-tier2" in sys.argv + run(include_tier2=tier2) diff --git a/scripts/law_parser.py b/scripts/law_parser.py new file mode 100644 index 0000000..e996e94 --- /dev/null +++ b/scripts/law_parser.py @@ -0,0 +1,471 @@ +#!/usr/bin/env python3 +""" +법령 XML → Markdown 장 단위 분할 파서 +- law.go.kr XML 파싱 → 장/절 구조 식별 +- 장별 Markdown 파일 생성 (앵커 + 크로스 링크) +- 부칙 별도 파일 저장 +""" + +import re +import json +import os +import xml.etree.ElementTree as ET +from pathlib import Path +from datetime import datetime, timedelta + +import sys +sys.path.insert(0, str(Path(__file__).parent)) +from pkm_utils import setup_logger + +logger = setup_logger("law_parser") + +# 법령 약칭 매핑 (조문 내 참조 → 정식명칭) +LAW_ALIASES = { + "산안법": "산업안전보건법", + "산업안전보건법": "산업안전보건법", + "중대재해법": "중대재해 처벌 등에 관한 법률", + "중대재해처벌법": "중대재해 처벌 등에 관한 법률", + "화관법": "화학물질관리법", + "위안법": "위험물안전관리법", + "고압가스법": "고압가스 안전관리법", + "건설기술진흥법": "건설기술 진흥법", + "산재보험법": "산업재해보상보험법", +} + + +def atomic_write_json(filepath: Path, data: dict): + """원자적 JSON 파일 쓰기 (경합 방지)""" + tmp = filepath.with_suffix(".json.tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + os.replace(str(tmp), str(filepath)) + + +# --- XML 파싱 --- + +def parse_law_xml(xml_path: str) -> dict: + """XML 파싱 → 법령 구조 추출""" + tree = ET.parse(xml_path) + root = tree.getroot() + + # 기본정보 + info_el = root.find(".//기본정보") + info = { + "name": (info_el.findtext("법령명_한글", "") or "").strip(), + "law_id": (info_el.findtext("법령ID", "") or "").strip(), + "announce_date": (info_el.findtext("공포일자", "") or "").strip(), + "enforce_date": (info_el.findtext("시행일자", "") or "").strip(), + "ministry": (info_el.findtext("소관부처", "") or "").strip(), + "category": (info_el.findtext("법종구분", "") or "").strip(), + } + + # 조문 추출 + articles = [] + for el in root.findall(".//조문단위"): + kind = (el.findtext("조문여부", "") or "").strip() + num = (el.findtext("조문번호", "") or "").strip() + title = (el.findtext("조문제목", "") or "").strip() + content = (el.findtext("조문내용", "") or "").strip() + + # 항 추출 + paragraphs = [] + for p_el in el.findall("항"): + p_num = (p_el.findtext("항번호", "") or "").strip() + p_content = (p_el.findtext("항내용", "") or "").strip() + # 호 추출 + sub_items = [] + for h_el in p_el.findall("호"): + h_num = (h_el.findtext("호번호", "") or "").strip() + h_content = (h_el.findtext("호내용", "") or "").strip() + sub_items.append({"num": h_num, "content": h_content}) + paragraphs.append({"num": p_num, "content": p_content, "sub_items": sub_items}) + + articles.append({ + "kind": kind, + "num": num, + "title": title, + "content": content, + "paragraphs": paragraphs, + }) + + # 부칙 추출 + appendices = [] + for el in root.findall(".//부칙단위"): + date = (el.findtext("부칙공포일자", "") or "").strip() + num = (el.findtext("부칙공포번호", "") or "").strip() + content = (el.findtext("부칙내용", "") or "").strip() + appendices.append({"date": date, "num": num, "content": content}) + + return {"info": info, "articles": articles, "appendices": appendices} + + +# --- 장 분할 --- + +def split_by_chapter(articles: list) -> list[dict]: + """조문 목록을 장 단위로 그룹핑 + Returns: [{"chapter": "제1장 총칙", "sections": [...], "articles": [...]}] + """ + chapters = [] + current_chapter = {"chapter": "", "sections": [], "articles": []} + current_section = "" + + for article in articles: + content_stripped = article["content"].strip() + + if article["kind"] == "전문": + # 장/절/편 구분자 + if re.match(r"제\d+장", content_stripped): + # 새 장 시작 + if current_chapter["chapter"] or current_chapter["articles"]: + chapters.append(current_chapter) + current_chapter = {"chapter": content_stripped, "sections": [], "articles": []} + current_section = "" + elif re.match(r"제\d+절", content_stripped): + current_section = content_stripped + current_chapter["sections"].append(current_section) + elif re.match(r"제\d+편", content_stripped): + # 편은 장보다 상위 — 별도 처리 없이 장 파일 내 표시 + if current_chapter["articles"]: + chapters.append(current_chapter) + current_chapter = {"chapter": content_stripped, "sections": [], "articles": []} + current_section = "" + continue + + if article["kind"] == "조문": + article["_section"] = current_section + current_chapter["articles"].append(article) + + # 마지막 장 + if current_chapter["chapter"] or current_chapter["articles"]: + chapters.append(current_chapter) + + # 장이 없는 법령 (fallback) + if not chapters and articles: + chapters = [{"chapter": "", "sections": [], "articles": [ + a for a in articles if a["kind"] == "조문" + ]}] + + return chapters + + +# --- Markdown 변환 --- + +def _format_article_num(article: dict) -> str: + """조문번호 + 제목 → 앵커용 ID 생성""" + num = article["num"] + title = article["title"] + # "제38조" 또는 "제38조의2" 형태 추출 + content = article["content"] + match = re.match(r"(제\d+조(?:의\d+)*)\s*", content) + if match: + return match.group(1) + return f"제{num}조" + + +def article_to_markdown(article: dict) -> str: + """단일 조문 → Markdown""" + article_id = _format_article_num(article) + title = article["title"] + + # 제목 정리 (한자 괄호 등) + if title: + header = f"## {article_id} ({title})" + " {#" + article_id + "}" + else: + header = f"## {article_id}" + " {#" + article_id + "}" + + lines = [header] + + # 본문 내용 + content = article["content"].strip() + # 조문번호 접두사 제거 (예: "제38조 (안전조치)" → 본문만) + content = re.sub(r"^제\d+조(?:의\d+)*\s*(?:\([^)]*\))?\s*", "", content) + if content: + lines.append(content) + + # 항 + for p in article.get("paragraphs", []): + p_content = p["content"].strip() + if p_content: + lines.append(f"\n{p_content}") + for si in p.get("sub_items", []): + si_content = si["content"].strip() + if si_content: + lines.append(f" {si_content}") + + return "\n".join(lines) + + +def chapter_to_markdown(law_name: str, info: dict, chapter: dict) -> str: + """장 → Markdown 파일 내용""" + chapter_name = chapter["chapter"] or law_name + enforce = info.get("enforce_date", "") + if len(enforce) == 8: + enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}" + ministry = info.get("ministry", "") + + lines = [ + f"# {chapter_name}", + f"> {law_name} | 시행 {enforce} | {ministry}", + "", + ] + + # 절 표시 + current_section = "" + for article in chapter["articles"]: + section = article.get("_section", "") + if section and section != current_section: + current_section = section + lines.append(f"\n### {section}\n") + + lines.append(article_to_markdown(article)) + lines.append("") + + return "\n".join(lines) + + +def info_to_markdown(info: dict) -> str: + """기본정보 → Markdown""" + enforce = info.get("enforce_date", "") + if len(enforce) == 8: + enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}" + announce = info.get("announce_date", "") + if len(announce) == 8: + announce = f"{announce[:4]}-{announce[4:6]}-{announce[6:]}" + + return f"""# {info['name']} — 기본정보 + +| 항목 | 내용 | +|------|------| +| **법령명** | {info['name']} | +| **법령구분** | {info.get('category', '')} | +| **소관부처** | {info.get('ministry', '')} | +| **공포일자** | {announce} | +| **시행일자** | {enforce} | +| **법령ID** | {info.get('law_id', '')} | + +> 이 문서는 law.go.kr API에서 자동 생성되었습니다. +> 마지막 업데이트: {datetime.now().strftime('%Y-%m-%d')} +""" + + +def appendices_to_markdown(law_name: str, appendices: list) -> str: + """부칙 → Markdown""" + lines = [f"# {law_name} — 부칙", ""] + for ap in appendices: + date = ap["date"] + if len(date) == 8: + date = f"{date[:4]}-{date[4:6]}-{date[6:]}" + lines.append(f"## 부칙 (공포 {date}, 제{ap['num']}호)") + lines.append(ap["content"]) + lines.append("") + return "\n".join(lines) + + +# --- 크로스 링크 --- + +def add_internal_links(text: str, article_ids: set[str]) -> str: + """같은 법률 내 조문 참조 → Markdown 앵커 링크 + {#...} 앵커 내부와 이미 링크된 부분은 스킵 + """ + def replace_ref(m): + full = m.group(0) + article_ref = m.group(1) # "제38조" or "제38조의2" + if article_ref in article_ids: + return f"[{full}](#{article_ref})" + return full + + # {#...} 앵커와 [...](...) 링크 내부는 보호 + protected = re.sub(r'\{#[^}]+\}|\[[^\]]*\]\([^)]*\)', lambda m: '\x00' * len(m.group()), text) + # "제N조(의N)*" 패턴 매칭 (항/호 부분은 링크에 포함하지 않음) + pattern = r"(제\d+조(?:의\d+)*)(?:제\d+항)?(?:제\d+호)?" + result = [] + last = 0 + for m in re.finditer(pattern, protected): + result.append(text[last:m.start()]) + if '\x00' in protected[m.start():m.end()]: + result.append(text[m.start():m.end()]) # 보호 영역 — 원문 유지 + else: + orig = text[m.start():m.end()] + article_ref = re.match(r"(제\d+조(?:의\d+)*)", orig) + if article_ref and article_ref.group(1) in article_ids: + result.append(f"[{orig}](#{article_ref.group(1)})") + else: + result.append(orig) + last = m.end() + result.append(text[last:]) + return "".join(result) + + +def add_cross_law_links(text: str, law_name: str, article_chapter_map: dict) -> str: + """다른 법률 참조 → DEVONthink wiki-link + article_chapter_map: {법령명: {제X조: 파일명}} + """ + # 「법령명」 제X조 패턴 + def replace_cross_ref(m): + raw_name = m.group(1).strip() + article_ref = m.group(2) + + # 약칭 → 정식명칭 + resolved = LAW_ALIASES.get(raw_name, raw_name) + + if resolved == law_name: + return m.group(0) # 같은 법률이면 스킵 (내부 링크로 처리) + + # 장 매핑 조회 + law_map = article_chapter_map.get(resolved, {}) + chapter_file = law_map.get(article_ref) + if chapter_file: + return f"[[{chapter_file}#{article_ref}|{m.group(0)}]]" + return m.group(0) + + pattern = r"「([^」]+)」\s*(제\d+조(?:의\d+)*)" + return re.sub(pattern, replace_cross_ref, text) + + +# --- 파일 저장 --- + +def save_law_as_markdown(law_name: str, parsed: dict, output_dir: Path) -> list[Path]: + """파싱된 법령 → 장별 MD 파일 저장. 생성된 파일 경로 리스트 반환.""" + law_dir = output_dir / law_name.replace(" ", "_") + law_dir.mkdir(parents=True, exist_ok=True) + + info = parsed["info"] + chapters = split_by_chapter(parsed["articles"]) + files = [] + + # 기본정보 + info_path = law_dir / "00_기본정보.md" + info_path.write_text(info_to_markdown(info), encoding="utf-8") + files.append(info_path) + + # 같은 법률 내 조문 ID 수집 (내부 링크용) + all_article_ids = set() + for ch in chapters: + for a in ch["articles"]: + all_article_ids.add(_format_article_num(a)) + + # 장별 파일 + for i, chapter in enumerate(chapters, 1): + ch_name = chapter["chapter"] or law_name + # 파일명 안전화 + safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name) + safe_name = re.sub(r"[^\w가-힣]", "", safe_name) + filename = f"{safe_name}.md" + + md_content = chapter_to_markdown(law_name, info, chapter) + # 내부 링크 적용 + md_content = add_internal_links(md_content, all_article_ids) + + filepath = law_dir / filename + filepath.write_text(md_content, encoding="utf-8") + files.append(filepath) + + # 부칙 + if parsed["appendices"]: + ap_path = law_dir / "부칙.md" + ap_path.write_text(appendices_to_markdown(law_name, parsed["appendices"]), encoding="utf-8") + files.append(ap_path) + + logger.info(f"{law_name}: {len(files)}개 파일 생성 → {law_dir}") + return files + + +def build_article_chapter_map(law_name: str, parsed: dict) -> dict: + """조문→장 파일명 매핑 생성 (크로스 링크용) + Returns: {제X조: 파일명(확장자 없음)} + """ + chapters = split_by_chapter(parsed["articles"]) + mapping = {} + for chapter in chapters: + ch_name = chapter["chapter"] or law_name + safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name) + safe_name = re.sub(r"[^\w가-힣]", "", safe_name) + file_stem = f"{law_name.replace(' ', '_')}_{safe_name}" if chapter["chapter"] else law_name.replace(" ", "_") + + for article in chapter["articles"]: + article_id = _format_article_num(article) + mapping[article_id] = file_stem + + return mapping + + +# --- MST 캐시 --- + +def load_mst_cache(cache_path: Path) -> dict: + if cache_path.exists(): + with open(cache_path, "r", encoding="utf-8") as f: + return json.load(f) + return {} + + +def save_mst_cache(cache_path: Path, data: dict): + atomic_write_json(cache_path, data) + + +def lookup_current_mst(law_oc: str, law_name: str, category: str = "법률", + cache_path: Path = None, cache_ttl_days: int = 7) -> str | None: + """법령명으로 현행 MST 검색 (캐시 TTL 적용) + - category → API 법령구분코드 매핑으로 검색 정확도 향상 + """ + import requests + + # 캐시 확인 + if cache_path: + cache = load_mst_cache(cache_path) + entry = cache.get(law_name) + if entry: + cached_at = datetime.fromisoformat(entry["cached_at"]) + if datetime.now() - cached_at < timedelta(days=cache_ttl_days): + return entry["mst"] + + try: + resp = requests.get("https://www.law.go.kr/DRF/lawSearch.do", params={ + "OC": law_oc, "target": "law", "type": "JSON", + "query": law_name, "display": "5", + }, timeout=15) + resp.raise_for_status() + data = resp.json().get("LawSearch", {}) + laws = data.get("law", []) + if isinstance(laws, dict): + laws = [laws] + + # 현행 필터 + 법령명 정확 매칭 + current = [l for l in laws + if l.get("현행연혁코드") == "현행" + and law_name in l.get("법령명한글", "")] + + if not current: + logger.warning(f"MST 검색 실패: {law_name} — 현행 법령 없음") + return None + + mst = current[0]["법령일련번호"] + + # 캐시 저장 + if cache_path: + cache = load_mst_cache(cache_path) + cache[law_name] = {"mst": mst, "cached_at": datetime.now().isoformat()} + save_mst_cache(cache_path, cache) + + return mst + except Exception as e: + logger.error(f"MST 조회 에러 [{law_name}]: {e}") + return None + + +if __name__ == "__main__": + # 단독 실행: XML 파일을 MD로 변환 + if len(sys.argv) < 2: + print("사용법: python3 law_parser.py [output_dir]") + sys.exit(1) + + xml_path = sys.argv[1] + output_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/laws/md") + + parsed = parse_law_xml(xml_path) + print(f"법령: {parsed['info']['name']}") + print(f"조문: {len(parsed['articles'])}개, 부칙: {len(parsed['appendices'])}개") + + files = save_law_as_markdown(parsed["info"]["name"], parsed, output_dir) + print(f"생성된 파일: {len(files)}개") + for f in files: + print(f" {f}")