feat: 법령 API 전면 개편 — 26개 법령, 분할 저장, 변경 이력 추적

- 모니터링 법령 12개 → 26개 (산업안전/건설/위험물/소방/전기/가스/근로/환경)
- lawSearch.do로 검색, lawService.do로 본문 조회
- 대형 법령 편/장 단위 분할 저장 (fallback: 편→장→전체)
- 저장 경로: PKM/Inbox/ (AI 자동 분류 연계)
- 변경 감지 시 user_note에 이력 자동 기록
- CalDAV + SMTP 알림

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-03 14:47:08 +09:00
parent b4ca918125
commit 93c5805060

View File

@@ -1,7 +1,7 @@
"""법령 모니터 워커 — 국가법령정보센터 API로 법령 변경 감지 """법령 모니터 워커 — 국가법령정보센터 API 연동
LAW_OC 승인 대기 중 — 코드 완성, 실 호출은 승인 후. 26개 법령 모니터링, 편/장 단위 분할 저장, 변경 이력 추적.
v1 scripts/law_monitor.py에서 포팅. 매일 07:00 실행 (APScheduler).
""" """
import os import os
@@ -15,29 +15,49 @@ from sqlalchemy import select
from core.config import settings from core.config import settings
from core.database import async_session from core.database import async_session
from core.utils import create_caldav_todo, file_hash, send_smtp_email, setup_logger from core.utils import create_caldav_todo, escape_ical_text, file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState from models.automation import AutomationState
from models.document import Document from models.document import Document
from models.queue import ProcessingQueue from models.queue import ProcessingQueue
logger = setup_logger("law_monitor") logger = setup_logger("law_monitor")
LAW_API_BASE = "https://www.law.go.kr" LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do"
LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do"
# 모니터링 대상 법령 (Tier 1: 핵심) # 모니터링 대상 법령 (26개)
TIER1_LAWS = [ MONITORED_LAWS = [
# 산업안전보건 핵심
"산업안전보건법", "산업안전보건법",
"산업안전보건법 시행령", "산업안전보건법 시행령",
"산업안전보건법 시행규칙", "산업안전보건법 시행규칙",
"산업안전보건기준에 관한 규칙",
"유해위험작업의 취업 제한에 관한 규칙",
"중대재해 처벌 등에 관한 법률", "중대재해 처벌 등에 관한 법률",
"중대재해 처벌 등에 관한 법률 시행령", "중대재해 처벌 등에 관한 법률 시행령",
# 건설안전
"건설기술 진흥법", "건설기술 진흥법",
"건설기술 진흥법 시행령", "건설기술 진흥법 시행령",
"건설기술 진흥법 시행규칙",
"시설물의 안전 및 유지관리에 관한 특별법",
# 위험물/화학
"위험물안전관리법", "위험물안전관리법",
"위험물안전관리법 시행령",
"위험물안전관리법 시행규칙",
"화학물질관리법", "화학물질관리법",
"화학물질관리법 시행령",
"화학물질의 등록 및 평가 등에 관한 법률",
# 소방/전기/가스
"소방시설 설치 및 관리에 관한 법률", "소방시설 설치 및 관리에 관한 법률",
"소방시설 설치 및 관리에 관한 법률 시행령",
"전기사업법", "전기사업법",
"전기안전관리법",
"고압가스 안전관리법", "고압가스 안전관리법",
"고압가스 안전관리법 시행령",
"액화석유가스의 안전관리 및 사업법",
# 근로/환경
"근로기준법",
"환경영향평가법",
] ]
@@ -49,7 +69,6 @@ async def run():
return return
async with async_session() as session: async with async_session() as session:
# 마지막 체크 날짜 조회
state = await session.execute( state = await session.execute(
select(AutomationState).where(AutomationState.job_name == "law_monitor") select(AutomationState).where(AutomationState.job_name == "law_monitor")
) )
@@ -63,11 +82,10 @@ async def run():
new_count = 0 new_count = 0
async with httpx.AsyncClient(timeout=30) as client: async with httpx.AsyncClient(timeout=30) as client:
for law_name in TIER1_LAWS: for law_name in MONITORED_LAWS:
try: try:
changed = await _check_law(client, law_oc, law_name, session) count = await _check_law(client, law_oc, law_name, session)
if changed: new_count += count
new_count += 1
except Exception as e: except Exception as e:
logger.error(f"[{law_name}] 체크 실패: {e}") logger.error(f"[{law_name}] 체크 실패: {e}")
@@ -83,7 +101,7 @@ async def run():
)) ))
await session.commit() await session.commit()
logger.info(f"법령 모니터 완료: {new_count}건 변경 감지") logger.info(f"법령 모니터 완료: {new_count}신규/변경 감지")
async def _check_law( async def _check_law(
@@ -91,11 +109,11 @@ async def _check_law(
law_oc: str, law_oc: str,
law_name: str, law_name: str,
session, session,
) -> bool: ) -> int:
"""단일 법령 변경 체크 → 변경 시 NAS 저장 + DB 등록 + CalDAV""" """단일 법령 검색 → 변경 감지 → 분할 저장"""
# 법령 정보 조회 # 법령 검색 (lawSearch.do)
resp = await client.get( resp = await client.get(
f"{LAW_API_BASE}/DRF/lawService.do", LAW_SEARCH_URL,
params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name}, params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
) )
resp.raise_for_status() resp.raise_for_status()
@@ -103,57 +121,196 @@ async def _check_law(
root = ET.fromstring(resp.text) root = ET.fromstring(resp.text)
total = root.findtext(".//totalCnt", "0") total = root.findtext(".//totalCnt", "0")
if total == "0": if total == "0":
return False logger.debug(f"[{law_name}] 검색 결과 없음")
return 0
# 첫 번째 결과의 MST (법령 고유번호) # 정확히 일치하는 법령 찾기
mst = root.findtext(".//법령MST", "") for law_elem in root.findall(".//law"):
proclamation_date = root.findtext(".//공포일자", "") found_name = law_elem.findtext("법령명한글", "").strip()
if not mst: if found_name != law_name:
return False continue
# 이미 등록된 법령인지 확인 (같은 공포일자) mst = law_elem.findtext("법령일련번호", "")
check_path = f"PKM/Knowledge/Industrial_Safety/Legislation/{law_name}_{proclamation_date}.md" proclamation_date = law_elem.findtext("공포일자", "")
existing = await session.execute( revision_type = law_elem.findtext("제개정구분명", "")
select(Document).where(Document.file_path == check_path)
)
if existing.scalar_one_or_none():
return False
# 법령 본문 조회 if not mst:
text_resp = await client.get( continue
f"{LAW_API_BASE}/DRF/lawService.do",
params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
)
text_resp.raise_for_status()
# Markdown 변환 # 이미 등록된 법령인지 확인 (같은 법령명 + 공포일자)
markdown = _law_xml_to_markdown(text_resp.text, law_name) existing = await session.execute(
select(Document).where(
Document.title.like(f"{law_name}%"),
Document.source_channel == "law_monitor",
)
)
existing_docs = existing.scalars().all()
# NAS 저장 # 같은 공포일자 이미 있으면 skip
full_path = Path(settings.nas_mount_path) / check_path for doc in existing_docs:
full_path.parent.mkdir(parents=True, exist_ok=True) if proclamation_date in (doc.title or ""):
full_path.write_text(markdown, encoding="utf-8") return 0
# DB 등록 # 이전 공포일 찾기 (변경 이력용)
doc = Document( prev_date = ""
file_path=check_path, if existing_docs:
file_hash=file_hash(full_path), prev_date = max(
file_format="md", (re.search(r'\d{8}', doc.title or "").group() for doc in existing_docs
file_size=len(markdown.encode()), if re.search(r'\d{8}', doc.title or "")),
file_type="immutable", default=""
title=f"{law_name} ({proclamation_date})", )
source_channel="law_monitor",
data_origin="work",
)
session.add(doc)
await session.flush()
# 처리 큐 등록 # 본문 조회 (lawService.do)
session.add(ProcessingQueue( text_resp = await client.get(
document_id=doc.id, stage="extract", status="pending", LAW_SERVICE_URL,
)) params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
)
text_resp.raise_for_status()
# CalDAV 태스크 생성 # 분할 저장
count = await _save_law_split(
session, text_resp.text, law_name, proclamation_date,
revision_type, prev_date,
)
# CalDAV + SMTP 알림
_send_notifications(law_name, proclamation_date, revision_type)
return count
return 0
async def _save_law_split(
session, xml_text: str, law_name: str, proclamation_date: str,
revision_type: str, prev_date: str,
) -> int:
"""법령 XML → 편/장 단위 Markdown 분할 저장"""
root = ET.fromstring(xml_text)
sections = []
# 편(編) 단위 분할 시도
for part in root.findall(".//*편"):
title = part.attrib.get("제목", part.findtext("편제목", ""))
number = part.attrib.get("번호", "")
content = _xml_section_to_markdown(part)
if content.strip():
sections.append((f"{number}편_{_safe_name(title)}", content))
# 편이 없으면 장(章) 단위 시도
if not sections:
for chapter in root.findall(".//*장"):
title = chapter.attrib.get("제목", chapter.findtext("장제목", ""))
number = chapter.attrib.get("번호", "")
content = _xml_section_to_markdown(chapter)
if content.strip():
sections.append((f"{number}장_{_safe_name(title)}", content))
# 편/장 둘 다 없으면 전체 1파일
if not sections:
full_md = _law_xml_to_markdown(xml_text, law_name)
sections.append(("전문", full_md))
# 각 섹션 저장
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
inbox_dir.mkdir(parents=True, exist_ok=True)
count = 0
for section_name, content in sections:
filename = f"{law_name}_{proclamation_date}_{section_name}.md"
file_path = inbox_dir / filename
file_path.write_text(content, encoding="utf-8")
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
# 변경 이력 메모
note = ""
if prev_date:
note = (
f"[자동] 법령 개정 감지\n"
f"이전 공포일: {prev_date}\n"
f"현재 공포일: {proclamation_date}\n"
f"개정구분: {revision_type}"
)
doc = Document(
file_path=rel_path,
file_hash=file_hash(file_path),
file_format="md",
file_size=len(content.encode()),
file_type="immutable",
title=f"{law_name} ({proclamation_date}) {section_name}",
source_channel="law_monitor",
data_origin="work",
user_note=note or None,
)
session.add(doc)
await session.flush()
session.add(ProcessingQueue(
document_id=doc.id, stage="extract", status="pending",
))
count += 1
logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장")
return count
def _xml_section_to_markdown(elem) -> str:
"""XML 섹션(편/장)을 Markdown으로 변환"""
lines = []
for article in elem.iter():
tag = article.tag
text = (article.text or "").strip()
if not text:
continue
if "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
else:
lines.append(text)
return "\n".join(lines)
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
"""법령 XML 전체를 Markdown으로 변환"""
root = ET.fromstring(xml_text)
lines = [f"# {law_name}\n"]
for elem in root.iter():
tag = elem.tag
text = (elem.text or "").strip()
if not text:
continue
if "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
return "\n".join(lines)
def _safe_name(name: str) -> str:
"""파일명 안전 변환"""
return re.sub(r'[^\w가-힣-]', '_', name).strip("_")
def _send_notifications(law_name: str, proclamation_date: str, revision_type: str):
"""CalDAV + SMTP 알림"""
# CalDAV
caldav_url = os.getenv("CALDAV_URL", "") caldav_url = os.getenv("CALDAV_URL", "")
caldav_user = os.getenv("CALDAV_USER", "") caldav_user = os.getenv("CALDAV_USER", "")
caldav_pass = os.getenv("CALDAV_PASS", "") caldav_pass = os.getenv("CALDAV_PASS", "")
@@ -161,32 +318,18 @@ async def _check_law(
create_caldav_todo( create_caldav_todo(
caldav_url, caldav_user, caldav_pass, caldav_url, caldav_user, caldav_pass,
title=f"법령 검토: {law_name}", title=f"법령 검토: {law_name}",
description=f"공포일자: {proclamation_date}\n경로: {check_path}", description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}",
due_days=7, due_days=7,
) )
logger.info(f"[법령] {law_name} ({proclamation_date}) 등록 완료") # SMTP
return True smtp_host = os.getenv("MAILPLUS_HOST", "")
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
smtp_user = os.getenv("MAILPLUS_USER", "")
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str: smtp_pass = os.getenv("MAILPLUS_PASS", "")
"""법령 XML을 Markdown으로 변환 (장/조 구조)""" if smtp_host and smtp_user:
root = ET.fromstring(xml_text) send_smtp_email(
lines = [f"# {law_name}\n"] smtp_host, smtp_port, smtp_user, smtp_pass,
subject=f"[법령 변경] {law_name} ({revision_type})",
for article in root.iter("조문단위"): body=f"법령명: {law_name}\n공포일자: {proclamation_date}\n개정구분: {revision_type}",
num = article.findtext("조문번호", "") )
title = article.findtext("조문제목", "")
content = article.findtext("조문내용", "")
if title:
lines.append(f"\n## 제{num}조 ({title})\n")
elif num:
lines.append(f"\n## 제{num}\n")
if content:
# HTML 태그 제거
clean = re.sub(r"<[^>]+>", "", content).strip()
lines.append(f"{clean}\n")
return "\n".join(lines)