diff --git a/app/workers/law_monitor.py b/app/workers/law_monitor.py index d5d2f8d..3b75cb5 100644 --- a/app/workers/law_monitor.py +++ b/app/workers/law_monitor.py @@ -1,7 +1,7 @@ -"""법령 모니터 워커 — 국가법령정보센터 API로 법령 변경 감지 +"""법령 모니터 워커 — 국가법령정보센터 API 연동 -LAW_OC 승인 대기 중 — 코드 완성, 실 호출은 승인 후. -v1 scripts/law_monitor.py에서 포팅. +26개 법령 모니터링, 편/장 단위 분할 저장, 변경 이력 추적. +매일 07:00 실행 (APScheduler). """ import os @@ -15,29 +15,49 @@ from sqlalchemy import select from core.config import settings from core.database import async_session -from core.utils import create_caldav_todo, file_hash, send_smtp_email, setup_logger +from core.utils import create_caldav_todo, escape_ical_text, file_hash, send_smtp_email, setup_logger from models.automation import AutomationState from models.document import Document from models.queue import ProcessingQueue logger = setup_logger("law_monitor") -LAW_API_BASE = "https://www.law.go.kr" +LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do" +LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do" -# 모니터링 대상 법령 (Tier 1: 핵심) -TIER1_LAWS = [ +# 모니터링 대상 법령 (26개) +MONITORED_LAWS = [ + # 산업안전보건 핵심 "산업안전보건법", "산업안전보건법 시행령", "산업안전보건법 시행규칙", + "산업안전보건기준에 관한 규칙", + "유해위험작업의 취업 제한에 관한 규칙", "중대재해 처벌 등에 관한 법률", "중대재해 처벌 등에 관한 법률 시행령", + # 건설안전 "건설기술 진흥법", "건설기술 진흥법 시행령", + "건설기술 진흥법 시행규칙", + "시설물의 안전 및 유지관리에 관한 특별법", + # 위험물/화학 "위험물안전관리법", + "위험물안전관리법 시행령", + "위험물안전관리법 시행규칙", "화학물질관리법", + "화학물질관리법 시행령", + "화학물질의 등록 및 평가 등에 관한 법률", + # 소방/전기/가스 "소방시설 설치 및 관리에 관한 법률", + "소방시설 설치 및 관리에 관한 법률 시행령", "전기사업법", + "전기안전관리법", "고압가스 안전관리법", + "고압가스 안전관리법 시행령", + "액화석유가스의 안전관리 및 사업법", + # 근로/환경 + "근로기준법", + "환경영향평가법", ] @@ -49,7 +69,6 @@ async def run(): return async with async_session() as session: - # 마지막 체크 날짜 조회 state = await session.execute( select(AutomationState).where(AutomationState.job_name == "law_monitor") ) @@ -63,11 +82,10 @@ async def run(): new_count = 0 async with httpx.AsyncClient(timeout=30) as client: - for law_name in TIER1_LAWS: + for law_name in MONITORED_LAWS: try: - changed = await _check_law(client, law_oc, law_name, session) - if changed: - new_count += 1 + count = await _check_law(client, law_oc, law_name, session) + new_count += count except Exception as e: logger.error(f"[{law_name}] 체크 실패: {e}") @@ -83,7 +101,7 @@ async def run(): )) await session.commit() - logger.info(f"법령 모니터 완료: {new_count}건 변경 감지") + logger.info(f"법령 모니터 완료: {new_count}건 신규/변경 감지") async def _check_law( @@ -91,11 +109,11 @@ async def _check_law( law_oc: str, law_name: str, session, -) -> bool: - """단일 법령 변경 체크 → 변경 시 NAS 저장 + DB 등록 + CalDAV""" - # 법령 정보 조회 +) -> int: + """단일 법령 검색 → 변경 감지 → 분할 저장""" + # 법령 검색 (lawSearch.do) resp = await client.get( - f"{LAW_API_BASE}/DRF/lawService.do", + LAW_SEARCH_URL, params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name}, ) resp.raise_for_status() @@ -103,57 +121,196 @@ async def _check_law( root = ET.fromstring(resp.text) total = root.findtext(".//totalCnt", "0") if total == "0": - return False + logger.debug(f"[{law_name}] 검색 결과 없음") + return 0 - # 첫 번째 결과의 MST (법령 고유번호) - mst = root.findtext(".//법령MST", "") - proclamation_date = root.findtext(".//공포일자", "") - if not mst: - return False + # 정확히 일치하는 법령 찾기 + for law_elem in root.findall(".//law"): + found_name = law_elem.findtext("법령명한글", "").strip() + if found_name != law_name: + continue - # 이미 등록된 법령인지 확인 (같은 공포일자) - check_path = f"PKM/Knowledge/Industrial_Safety/Legislation/{law_name}_{proclamation_date}.md" - existing = await session.execute( - select(Document).where(Document.file_path == check_path) - ) - if existing.scalar_one_or_none(): - return False + mst = law_elem.findtext("법령일련번호", "") + proclamation_date = law_elem.findtext("공포일자", "") + revision_type = law_elem.findtext("제개정구분명", "") - # 법령 본문 조회 - text_resp = await client.get( - f"{LAW_API_BASE}/DRF/lawService.do", - params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"}, - ) - text_resp.raise_for_status() + if not mst: + continue - # Markdown 변환 - markdown = _law_xml_to_markdown(text_resp.text, law_name) + # 이미 등록된 법령인지 확인 (같은 법령명 + 공포일자) + existing = await session.execute( + select(Document).where( + Document.title.like(f"{law_name}%"), + Document.source_channel == "law_monitor", + ) + ) + existing_docs = existing.scalars().all() - # NAS 저장 - full_path = Path(settings.nas_mount_path) / check_path - full_path.parent.mkdir(parents=True, exist_ok=True) - full_path.write_text(markdown, encoding="utf-8") + # 같은 공포일자 이미 있으면 skip + for doc in existing_docs: + if proclamation_date in (doc.title or ""): + return 0 - # DB 등록 - doc = Document( - file_path=check_path, - file_hash=file_hash(full_path), - file_format="md", - file_size=len(markdown.encode()), - file_type="immutable", - title=f"{law_name} ({proclamation_date})", - source_channel="law_monitor", - data_origin="work", - ) - session.add(doc) - await session.flush() + # 이전 공포일 찾기 (변경 이력용) + prev_date = "" + if existing_docs: + prev_date = max( + (re.search(r'\d{8}', doc.title or "").group() for doc in existing_docs + if re.search(r'\d{8}', doc.title or "")), + default="" + ) - # 처리 큐 등록 - session.add(ProcessingQueue( - document_id=doc.id, stage="extract", status="pending", - )) + # 본문 조회 (lawService.do) + text_resp = await client.get( + LAW_SERVICE_URL, + params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"}, + ) + text_resp.raise_for_status() - # CalDAV 태스크 생성 + # 분할 저장 + count = await _save_law_split( + session, text_resp.text, law_name, proclamation_date, + revision_type, prev_date, + ) + + # CalDAV + SMTP 알림 + _send_notifications(law_name, proclamation_date, revision_type) + + return count + + return 0 + + +async def _save_law_split( + session, xml_text: str, law_name: str, proclamation_date: str, + revision_type: str, prev_date: str, +) -> int: + """법령 XML → 편/장 단위 Markdown 분할 저장""" + root = ET.fromstring(xml_text) + sections = [] + + # 편(編) 단위 분할 시도 + for part in root.findall(".//*편"): + title = part.attrib.get("제목", part.findtext("편제목", "")) + number = part.attrib.get("번호", "") + content = _xml_section_to_markdown(part) + if content.strip(): + sections.append((f"제{number}편_{_safe_name(title)}", content)) + + # 편이 없으면 장(章) 단위 시도 + if not sections: + for chapter in root.findall(".//*장"): + title = chapter.attrib.get("제목", chapter.findtext("장제목", "")) + number = chapter.attrib.get("번호", "") + content = _xml_section_to_markdown(chapter) + if content.strip(): + sections.append((f"제{number}장_{_safe_name(title)}", content)) + + # 편/장 둘 다 없으면 전체 1파일 + if not sections: + full_md = _law_xml_to_markdown(xml_text, law_name) + sections.append(("전문", full_md)) + + # 각 섹션 저장 + inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox" + inbox_dir.mkdir(parents=True, exist_ok=True) + count = 0 + + for section_name, content in sections: + filename = f"{law_name}_{proclamation_date}_{section_name}.md" + file_path = inbox_dir / filename + file_path.write_text(content, encoding="utf-8") + + rel_path = str(file_path.relative_to(Path(settings.nas_mount_path))) + + # 변경 이력 메모 + note = "" + if prev_date: + note = ( + f"[자동] 법령 개정 감지\n" + f"이전 공포일: {prev_date}\n" + f"현재 공포일: {proclamation_date}\n" + f"개정구분: {revision_type}" + ) + + doc = Document( + file_path=rel_path, + file_hash=file_hash(file_path), + file_format="md", + file_size=len(content.encode()), + file_type="immutable", + title=f"{law_name} ({proclamation_date}) {section_name}", + source_channel="law_monitor", + data_origin="work", + user_note=note or None, + ) + session.add(doc) + await session.flush() + + session.add(ProcessingQueue( + document_id=doc.id, stage="extract", status="pending", + )) + count += 1 + + logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장") + return count + + +def _xml_section_to_markdown(elem) -> str: + """XML 섹션(편/장)을 Markdown으로 변환""" + lines = [] + for article in elem.iter(): + tag = article.tag + text = (article.text or "").strip() + if not text: + continue + if "조" in tag: + lines.append(f"\n### {text}\n") + elif "항" in tag: + lines.append(f"\n{text}\n") + elif "호" in tag: + lines.append(f"- {text}") + elif "목" in tag: + lines.append(f" - {text}") + else: + lines.append(text) + return "\n".join(lines) + + +def _law_xml_to_markdown(xml_text: str, law_name: str) -> str: + """법령 XML 전체를 Markdown으로 변환""" + root = ET.fromstring(xml_text) + lines = [f"# {law_name}\n"] + + for elem in root.iter(): + tag = elem.tag + text = (elem.text or "").strip() + if not text: + continue + if "편" in tag and "제목" not in tag: + lines.append(f"\n## {text}\n") + elif "장" in tag and "제목" not in tag: + lines.append(f"\n## {text}\n") + elif "조" in tag: + lines.append(f"\n### {text}\n") + elif "항" in tag: + lines.append(f"\n{text}\n") + elif "호" in tag: + lines.append(f"- {text}") + elif "목" in tag: + lines.append(f" - {text}") + + return "\n".join(lines) + + +def _safe_name(name: str) -> str: + """파일명 안전 변환""" + return re.sub(r'[^\w가-힣-]', '_', name).strip("_") + + +def _send_notifications(law_name: str, proclamation_date: str, revision_type: str): + """CalDAV + SMTP 알림""" + # CalDAV caldav_url = os.getenv("CALDAV_URL", "") caldav_user = os.getenv("CALDAV_USER", "") caldav_pass = os.getenv("CALDAV_PASS", "") @@ -161,32 +318,18 @@ async def _check_law( create_caldav_todo( caldav_url, caldav_user, caldav_pass, title=f"법령 검토: {law_name}", - description=f"공포일자: {proclamation_date}\n경로: {check_path}", + description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}", due_days=7, ) - logger.info(f"[법령] {law_name} ({proclamation_date}) 등록 완료") - return True - - -def _law_xml_to_markdown(xml_text: str, law_name: str) -> str: - """법령 XML을 Markdown으로 변환 (장/조 구조)""" - root = ET.fromstring(xml_text) - lines = [f"# {law_name}\n"] - - for article in root.iter("조문단위"): - num = article.findtext("조문번호", "") - title = article.findtext("조문제목", "") - content = article.findtext("조문내용", "") - - if title: - lines.append(f"\n## 제{num}조 ({title})\n") - elif num: - lines.append(f"\n## 제{num}조\n") - - if content: - # HTML 태그 제거 - clean = re.sub(r"<[^>]+>", "", content).strip() - lines.append(f"{clean}\n") - - return "\n".join(lines) + # SMTP + smtp_host = os.getenv("MAILPLUS_HOST", "") + smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465")) + smtp_user = os.getenv("MAILPLUS_USER", "") + smtp_pass = os.getenv("MAILPLUS_PASS", "") + if smtp_host and smtp_user: + send_smtp_email( + smtp_host, smtp_port, smtp_user, smtp_pass, + subject=f"[법령 변경] {law_name} ({revision_type})", + body=f"법령명: {law_name}\n공포일자: {proclamation_date}\n개정구분: {revision_type}", + )