"""법령 모니터 워커 — 국가법령정보센터 API로 법령 변경 감지 LAW_OC 승인 대기 중 — 코드 완성, 실 호출은 승인 후. v1 scripts/law_monitor.py에서 포팅. """ import os import re from datetime import datetime, timezone from pathlib import Path from xml.etree import ElementTree as ET import httpx from sqlalchemy import select from core.config import settings from core.database import async_session from core.utils import create_caldav_todo, file_hash, send_smtp_email, setup_logger from models.automation import AutomationState from models.document import Document from models.queue import ProcessingQueue logger = setup_logger("law_monitor") LAW_API_BASE = "https://www.law.go.kr" # 모니터링 대상 법령 (Tier 1: 핵심) TIER1_LAWS = [ "산업안전보건법", "산업안전보건법 시행령", "산업안전보건법 시행규칙", "중대재해 처벌 등에 관한 법률", "중대재해 처벌 등에 관한 법률 시행령", "건설기술 진흥법", "건설기술 진흥법 시행령", "위험물안전관리법", "화학물질관리법", "소방시설 설치 및 관리에 관한 법률", "전기사업법", "고압가스 안전관리법", ] async def run(): """법령 변경 모니터링 실행""" law_oc = os.getenv("LAW_OC", "") if not law_oc: logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중") return async with async_session() as session: # 마지막 체크 날짜 조회 state = await session.execute( select(AutomationState).where(AutomationState.job_name == "law_monitor") ) state_row = state.scalar_one_or_none() last_check = state_row.last_check_value if state_row else None today = datetime.now(timezone.utc).strftime("%Y%m%d") if last_check == today: logger.info("오늘 이미 체크 완료") return new_count = 0 async with httpx.AsyncClient(timeout=30) as client: for law_name in TIER1_LAWS: try: changed = await _check_law(client, law_oc, law_name, session) if changed: new_count += 1 except Exception as e: logger.error(f"[{law_name}] 체크 실패: {e}") # 상태 업데이트 if state_row: state_row.last_check_value = today state_row.last_run_at = datetime.now(timezone.utc) else: session.add(AutomationState( job_name="law_monitor", last_check_value=today, last_run_at=datetime.now(timezone.utc), )) await session.commit() logger.info(f"법령 모니터 완료: {new_count}건 변경 감지") async def _check_law( client: httpx.AsyncClient, law_oc: str, law_name: str, session, ) -> bool: """단일 법령 변경 체크 → 변경 시 NAS 저장 + DB 등록 + CalDAV""" # 법령 정보 조회 resp = await client.get( f"{LAW_API_BASE}/DRF/lawService.do", params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name}, ) resp.raise_for_status() root = ET.fromstring(resp.text) total = root.findtext(".//totalCnt", "0") if total == "0": return False # 첫 번째 결과의 MST (법령 고유번호) mst = root.findtext(".//법령MST", "") proclamation_date = root.findtext(".//공포일자", "") if not mst: return False # 이미 등록된 법령인지 확인 (같은 공포일자) check_path = f"PKM/Knowledge/Industrial_Safety/Legislation/{law_name}_{proclamation_date}.md" existing = await session.execute( select(Document).where(Document.file_path == check_path) ) if existing.scalar_one_or_none(): return False # 법령 본문 조회 text_resp = await client.get( f"{LAW_API_BASE}/DRF/lawService.do", params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"}, ) text_resp.raise_for_status() # Markdown 변환 markdown = _law_xml_to_markdown(text_resp.text, law_name) # NAS 저장 full_path = Path(settings.nas_mount_path) / check_path full_path.parent.mkdir(parents=True, exist_ok=True) full_path.write_text(markdown, encoding="utf-8") # DB 등록 doc = Document( file_path=check_path, file_hash=file_hash(full_path), file_format="md", file_size=len(markdown.encode()), file_type="immutable", title=f"{law_name} ({proclamation_date})", source_channel="law_monitor", data_origin="work", ) session.add(doc) await session.flush() # 처리 큐 등록 session.add(ProcessingQueue( document_id=doc.id, stage="extract", status="pending", )) # CalDAV 태스크 생성 caldav_url = os.getenv("CALDAV_URL", "") caldav_user = os.getenv("CALDAV_USER", "") caldav_pass = os.getenv("CALDAV_PASS", "") if caldav_url and caldav_user: create_caldav_todo( caldav_url, caldav_user, caldav_pass, title=f"법령 검토: {law_name}", description=f"공포일자: {proclamation_date}\n경로: {check_path}", due_days=7, ) logger.info(f"[법령] {law_name} ({proclamation_date}) 등록 완료") return True def _law_xml_to_markdown(xml_text: str, law_name: str) -> str: """법령 XML을 Markdown으로 변환 (장/조 구조)""" root = ET.fromstring(xml_text) lines = [f"# {law_name}\n"] for article in root.iter("조문단위"): num = article.findtext("조문번호", "") title = article.findtext("조문제목", "") content = article.findtext("조문내용", "") if title: lines.append(f"\n## 제{num}조 ({title})\n") elif num: lines.append(f"\n## 제{num}조\n") if content: # HTML 태그 제거 clean = re.sub(r"<[^>]+>", "", content).strip() lines.append(f"{clean}\n") return "\n".join(lines)