Files
hyungi_document_server/app/workers/law_monitor.py
hyungi 3feddd012b feat(safety): A-2 수집기 ingest 시점 분류 축 부여 — 레지스트리 전파 + 승인 가드 (mig 352~355)
plan safety-library-1 A-2 (classify-skip 경로 전수 커버):
- news_sources 에 material_type/license_scheme/license_redistribute + 안전·공학 12행 시드
- news_collector: 레지스트리 → documents 전파 (_material_axis — paper 는 jurisdiction NULL 강제)
- kosha(사례·첨부=incident, GUIDE=guide)/csb(incident·US)/api_std(standard·US)/law_monitor(law·KR)
  /file_watcher(KGS=law·KR 타깃 매핑) deterministic 부여 + extract_meta.license 주입
- published_date: 소스별 가용 날짜 (GUIDE 공표일·CSB lastmod·API 공지일·법령 공포일·뉴스 발행일)
- classify_worker: document_type→material_type 결정적 매핑 제안 (자동 전이 금지)
- accept-suggestion: material 제안 적용 + law=jurisdiction 필수(기본값 없음) + 청크 미러 1문 동기화
- chunk_worker: 비뉴스 문서 country=jurisdiction 미러 (R3-m3: 검색측 country 소비자 0 실측)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 06:23:22 +09:00

368 lines
13 KiB
Python

"""법령 모니터 워커 — 국가법령정보센터 API 연동
26개 법령 모니터링, 편/장 단위 분할 저장, 변경 이력 추적.
매일 07:00 실행 (APScheduler).
"""
import os
import re
from datetime import date, datetime, timezone
from pathlib import Path
from xml.etree import ElementTree as ET
import httpx
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import create_caldav_todo, file_hash, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import enqueue_stage
logger = setup_logger("law_monitor")
LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do"
LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do"
# 모니터링 대상 법령 (26개)
MONITORED_LAWS = [
# 산업안전보건 핵심
"산업안전보건법",
"산업안전보건법 시행령",
"산업안전보건법 시행규칙",
"산업안전보건기준에 관한 규칙",
"유해위험작업의 취업 제한에 관한 규칙",
"중대재해 처벌 등에 관한 법률",
"중대재해 처벌 등에 관한 법률 시행령",
# 건설안전
"건설기술 진흥법",
"건설기술 진흥법 시행령",
"건설기술 진흥법 시행규칙",
"시설물의 안전 및 유지관리에 관한 특별법",
# 위험물/화학
"위험물안전관리법",
"위험물안전관리법 시행령",
"위험물안전관리법 시행규칙",
"화학물질관리법",
"화학물질관리법 시행령",
"화학물질의 등록 및 평가 등에 관한 법률",
# 소방/전기/가스
"소방시설 설치 및 관리에 관한 법률",
"소방시설 설치 및 관리에 관한 법률 시행령",
"전기사업법",
"전기안전관리법",
"고압가스 안전관리법",
"고압가스 안전관리법 시행령",
"액화석유가스의 안전관리 및 사업법",
# 근로/환경
"근로기준법",
"환경영향평가법",
]
async def run():
"""법령 변경 모니터링 실행"""
law_oc = os.getenv("LAW_OC", "")
if not law_oc:
logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중")
return
async with async_session() as session:
state = await session.execute(
select(AutomationState).where(AutomationState.job_name == "law_monitor")
)
state_row = state.scalar_one_or_none()
last_check = state_row.last_check_value if state_row else None
today = datetime.now(timezone.utc).strftime("%Y%m%d")
if last_check == today:
logger.info("오늘 이미 체크 완료")
return
new_count = 0
async with httpx.AsyncClient(timeout=30) as client:
for law_name in MONITORED_LAWS:
try:
count = await _check_law(client, law_oc, law_name, session)
new_count += count
except Exception as e:
logger.error(f"[{law_name}] 체크 실패: {e}")
# 상태 업데이트
if state_row:
state_row.last_check_value = today
state_row.last_run_at = datetime.now(timezone.utc)
else:
session.add(AutomationState(
job_name="law_monitor",
last_check_value=today,
last_run_at=datetime.now(timezone.utc),
))
await session.commit()
logger.info(f"법령 모니터 완료: {new_count}건 신규/변경 감지")
async def _check_law(
client: httpx.AsyncClient,
law_oc: str,
law_name: str,
session,
) -> int:
"""단일 법령 검색 → 변경 감지 → 분할 저장"""
# 법령 검색 (lawSearch.do)
resp = await client.get(
LAW_SEARCH_URL,
params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
)
resp.raise_for_status()
root = ET.fromstring(resp.text)
total = root.findtext(".//totalCnt", "0")
if total == "0":
logger.debug(f"[{law_name}] 검색 결과 없음")
return 0
# 정확히 일치하는 법령 찾기
for law_elem in root.findall(".//law"):
found_name = law_elem.findtext("법령명한글", "").strip()
if found_name != law_name:
continue
mst = law_elem.findtext("법령일련번호", "")
proclamation_date = law_elem.findtext("공포일자", "")
revision_type = law_elem.findtext("제개정구분명", "")
if not mst:
continue
# 이미 등록된 법령인지 확인 (같은 법령명 + 공포일자)
existing = await session.execute(
select(Document).where(
Document.title.like(f"{law_name}%"),
Document.source_channel == "law_monitor",
)
)
existing_docs = existing.scalars().all()
# 같은 공포일자 이미 있으면 skip
for doc in existing_docs:
if proclamation_date in (doc.title or ""):
return 0
# 이전 공포일 찾기 (변경 이력용)
prev_date = ""
if existing_docs:
prev_date = max(
(re.search(r'\d{8}', doc.title or "").group() for doc in existing_docs
if re.search(r'\d{8}', doc.title or "")),
default=""
)
# 본문 조회 (lawService.do)
text_resp = await client.get(
LAW_SERVICE_URL,
params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
)
text_resp.raise_for_status()
# 분할 저장
count = await _save_law_split(
session, text_resp.text, law_name, proclamation_date,
revision_type, prev_date,
)
# DB 먼저 커밋 (알림 실패가 저장을 막지 않도록)
await session.commit()
# CalDAV + SMTP 알림 (실패해도 무시)
try:
_send_notifications(law_name, proclamation_date, revision_type)
except Exception as e:
logger.warning(f"[{law_name}] 알림 발송 실패 (무시): {e}")
return count
return 0
async def _save_law_split(
session, xml_text: str, law_name: str, proclamation_date: str,
revision_type: str, prev_date: str,
) -> int:
"""법령 XML → 장(章) 단위 Markdown 분할 저장"""
root = ET.fromstring(xml_text)
# 조문단위에서 장 구분자 찾기 (조문키가 000으로 끝나는 조문)
units = root.findall(".//조문단위")
chapters = [] # [(장제목, [조문들])]
current_chapter = None
current_articles = []
for unit in units:
key = unit.attrib.get("조문키", "")
content = (unit.findtext("조문내용", "") or "").strip()
# 장 구분자: 키가 000으로 끝나고 내용에 "제X장" 포함
if key.endswith("000") and re.search(r"\d+장", content):
# 이전 장/서문 저장
if current_articles:
chapter_name = current_chapter or "서문"
chapters.append((chapter_name, current_articles))
chapter_match = re.search(r"(제\d+장\s*.+)", content)
current_chapter = chapter_match.group(1).strip() if chapter_match else content.strip()
current_articles = []
else:
current_articles.append(unit)
# 마지막 장 저장
if current_articles:
chapter_name = current_chapter or "서문"
chapters.append((chapter_name, current_articles))
# 장 분할 성공
sections = []
if chapters:
for chapter_title, articles in chapters:
md_lines = [f"# {law_name}\n", f"## {chapter_title}\n"]
for article in articles:
title = article.findtext("조문제목", "")
content = article.findtext("조문내용", "")
if title:
md_lines.append(f"\n### {title}\n")
if content:
md_lines.append(content.strip())
section_name = _safe_name(chapter_title)
sections.append((section_name, "\n".join(md_lines)))
else:
# 장 분할 실패 → 전체 1파일
full_md = _law_xml_to_markdown(xml_text, law_name)
sections.append(("전문", full_md))
# 각 섹션 저장
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
inbox_dir.mkdir(parents=True, exist_ok=True)
count = 0
for section_name, content in sections:
filename = f"{law_name}_{proclamation_date}_{section_name}.md"
file_path = inbox_dir / filename
file_path.write_text(content, encoding="utf-8")
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
# 변경 이력 메모
note = ""
if prev_date:
note = (
f"[자동] 법령 개정 감지\n"
f"이전 공포일: {prev_date}\n"
f"현재 공포일: {proclamation_date}\n"
f"개정구분: {revision_type}"
)
# 안전 자료실 A-2 — 공포일 파싱 (law published_date = COALESCE(시행일, 공포일) 계약,
# 본 레거시 워커는 공포일만 보유 — 시행일 기반 버전 체인은 B-1 statute_collector 소관)
_digits = re.sub(r"\D", "", str(proclamation_date or ""))
pub_date = None
if len(_digits) == 8:
try:
pub_date = date(int(_digits[:4]), int(_digits[4:6]), int(_digits[6:8]))
except ValueError:
pub_date = None
doc = Document(
file_path=rel_path,
file_hash=file_hash(file_path),
file_format="md",
file_size=len(content.encode()),
file_type="immutable",
title=f"{law_name} ({proclamation_date}) {section_name}",
source_channel="law_monitor",
data_origin="work",
category="law",
# 안전 자료실 A-2 — ingest 시점 deterministic. 법령 텍스트 = 저작권법 제7조
# 비보호 저작물 (public domain). 본 워커는 휴면(LAW_OC 미설정)이나 코드 경로 유지.
material_type="law",
jurisdiction="KR",
published_date=pub_date,
extract_meta={"license": {"scheme": "public_domain", "redistribute": True,
"attribution": "국가법령정보센터"}},
user_note=note or None,
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
count += 1
logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장")
return count
def _xml_section_to_markdown(elem) -> str:
"""XML 섹션(편/장)을 Markdown으로 변환"""
lines = []
for article in elem.iter():
tag = article.tag
text = (article.text or "").strip()
if not text:
continue
if "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
else:
lines.append(text)
return "\n".join(lines)
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
"""법령 XML 전체를 Markdown으로 변환"""
root = ET.fromstring(xml_text)
lines = [f"# {law_name}\n"]
for elem in root.iter():
tag = elem.tag
text = (elem.text or "").strip()
if not text:
continue
if "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
return "\n".join(lines)
def _safe_name(name: str) -> str:
"""파일명 안전 변환"""
return re.sub(r'[^\w가-힣-]', '_', name).strip("_")
def _send_notifications(law_name: str, proclamation_date: str, revision_type: str):
"""CalDAV 할일 알림 (SMTP 발송은 2026-06-10 폐기 — CalDAV 가 단일 알림 채널)"""
caldav_url = os.getenv("CALDAV_URL", "")
caldav_user = os.getenv("CALDAV_USER", "")
caldav_pass = os.getenv("CALDAV_PASS", "")
if caldav_url and caldav_user:
create_caldav_todo(
caldav_url, caldav_user, caldav_pass,
title=f"법령 검토: {law_name}",
description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}",
due_days=7,
)