From 61bb6f401b6c00452c749d3acb50ef65709a9e16 Mon Sep 17 00:00:00 2001 From: hyungi Date: Sat, 27 Jun 2026 06:29:02 +0900 Subject: [PATCH] =?UTF-8?q?refactor(workers):=20=EC=A3=BD=EC=9D=80=20?= =?UTF-8?q?=EC=BD=94=EB=93=9C=20law=5Fmonitor.py=20=EC=82=AD=EC=A0=9C=20(3?= =?UTF-8?q?67=EC=A4=84)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit statute_collector(safety-library-1 B-1)가 대체 — 모듈 import 0·스케줄러 미등록·동적로딩 없음 = 절대 로딩 안 되는 dead file. 'law_monitor' 잔존 출현 34건은 전부 source_channel enum 값/config 키/주석(모듈 참조 아님). 복구는 git history. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/workers/law_monitor.py | 367 ------------------------------------- 1 file changed, 367 deletions(-) delete mode 100644 app/workers/law_monitor.py diff --git a/app/workers/law_monitor.py b/app/workers/law_monitor.py deleted file mode 100644 index a3a304b..0000000 --- a/app/workers/law_monitor.py +++ /dev/null @@ -1,367 +0,0 @@ -"""법령 모니터 워커 — 국가법령정보센터 API 연동 - -26개 법령 모니터링, 편/장 단위 분할 저장, 변경 이력 추적. -매일 07:00 실행 (APScheduler). -""" - -import os -import re -from datetime import date, datetime, timezone -from pathlib import Path -from xml.etree import ElementTree as ET - -import httpx -from sqlalchemy import select - -from core.config import settings -from core.database import async_session -from core.utils import create_caldav_todo, file_hash, setup_logger -from models.automation import AutomationState -from models.document import Document -from models.queue import enqueue_stage - -logger = setup_logger("law_monitor") - -LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do" -LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do" - -# 모니터링 대상 법령 (26개) -MONITORED_LAWS = [ - # 산업안전보건 핵심 - "산업안전보건법", - "산업안전보건법 시행령", - "산업안전보건법 시행규칙", - "산업안전보건기준에 관한 규칙", - "유해위험작업의 취업 제한에 관한 규칙", - "중대재해 처벌 등에 관한 법률", - "중대재해 처벌 등에 관한 법률 시행령", - # 건설안전 - "건설기술 진흥법", - "건설기술 진흥법 시행령", - "건설기술 진흥법 시행규칙", - "시설물의 안전 및 유지관리에 관한 특별법", - # 위험물/화학 - "위험물안전관리법", - "위험물안전관리법 시행령", - "위험물안전관리법 시행규칙", - "화학물질관리법", - "화학물질관리법 시행령", - "화학물질의 등록 및 평가 등에 관한 법률", - # 소방/전기/가스 - "소방시설 설치 및 관리에 관한 법률", - "소방시설 설치 및 관리에 관한 법률 시행령", - "전기사업법", - "전기안전관리법", - "고압가스 안전관리법", - "고압가스 안전관리법 시행령", - "액화석유가스의 안전관리 및 사업법", - # 근로/환경 - "근로기준법", - "환경영향평가법", -] - - -async def run(): - """법령 변경 모니터링 실행""" - law_oc = os.getenv("LAW_OC", "") - if not law_oc: - logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중") - return - - async with async_session() as session: - state = await session.execute( - select(AutomationState).where(AutomationState.job_name == "law_monitor") - ) - state_row = state.scalar_one_or_none() - last_check = state_row.last_check_value if state_row else None - - today = datetime.now(timezone.utc).strftime("%Y%m%d") - if last_check == today: - logger.info("오늘 이미 체크 완료") - return - - new_count = 0 - async with httpx.AsyncClient(timeout=30) as client: - for law_name in MONITORED_LAWS: - try: - count = await _check_law(client, law_oc, law_name, session) - new_count += count - except Exception as e: - logger.error(f"[{law_name}] 체크 실패: {e}") - - # 상태 업데이트 - if state_row: - state_row.last_check_value = today - state_row.last_run_at = datetime.now(timezone.utc) - else: - session.add(AutomationState( - job_name="law_monitor", - last_check_value=today, - last_run_at=datetime.now(timezone.utc), - )) - - await session.commit() - logger.info(f"법령 모니터 완료: {new_count}건 신규/변경 감지") - - -async def _check_law( - client: httpx.AsyncClient, - law_oc: str, - law_name: str, - session, -) -> int: - """단일 법령 검색 → 변경 감지 → 분할 저장""" - # 법령 검색 (lawSearch.do) - resp = await client.get( - LAW_SEARCH_URL, - params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name}, - ) - resp.raise_for_status() - - root = ET.fromstring(resp.text) - total = root.findtext(".//totalCnt", "0") - if total == "0": - logger.debug(f"[{law_name}] 검색 결과 없음") - return 0 - - # 정확히 일치하는 법령 찾기 - for law_elem in root.findall(".//law"): - found_name = law_elem.findtext("법령명한글", "").strip() - if found_name != law_name: - continue - - mst = law_elem.findtext("법령일련번호", "") - proclamation_date = law_elem.findtext("공포일자", "") - revision_type = law_elem.findtext("제개정구분명", "") - - if not mst: - continue - - # 이미 등록된 법령인지 확인 (같은 법령명 + 공포일자) - existing = await session.execute( - select(Document).where( - Document.title.like(f"{law_name}%"), - Document.source_channel == "law_monitor", - ) - ) - existing_docs = existing.scalars().all() - - # 같은 공포일자 이미 있으면 skip - for doc in existing_docs: - if proclamation_date in (doc.title or ""): - return 0 - - # 이전 공포일 찾기 (변경 이력용) - prev_date = "" - if existing_docs: - prev_date = max( - (re.search(r'\d{8}', doc.title or "").group() for doc in existing_docs - if re.search(r'\d{8}', doc.title or "")), - default="" - ) - - # 본문 조회 (lawService.do) - text_resp = await client.get( - LAW_SERVICE_URL, - params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"}, - ) - text_resp.raise_for_status() - - # 분할 저장 - count = await _save_law_split( - session, text_resp.text, law_name, proclamation_date, - revision_type, prev_date, - ) - - # DB 먼저 커밋 (알림 실패가 저장을 막지 않도록) - await session.commit() - - # CalDAV + SMTP 알림 (실패해도 무시) - try: - _send_notifications(law_name, proclamation_date, revision_type) - except Exception as e: - logger.warning(f"[{law_name}] 알림 발송 실패 (무시): {e}") - - return count - - return 0 - - -async def _save_law_split( - session, xml_text: str, law_name: str, proclamation_date: str, - revision_type: str, prev_date: str, -) -> int: - """법령 XML → 장(章) 단위 Markdown 분할 저장""" - root = ET.fromstring(xml_text) - - # 조문단위에서 장 구분자 찾기 (조문키가 000으로 끝나는 조문) - units = root.findall(".//조문단위") - chapters = [] # [(장제목, [조문들])] - current_chapter = None - current_articles = [] - - for unit in units: - key = unit.attrib.get("조문키", "") - content = (unit.findtext("조문내용", "") or "").strip() - - # 장 구분자: 키가 000으로 끝나고 내용에 "제X장" 포함 - if key.endswith("000") and re.search(r"제\d+장", content): - # 이전 장/서문 저장 - if current_articles: - chapter_name = current_chapter or "서문" - chapters.append((chapter_name, current_articles)) - chapter_match = re.search(r"(제\d+장\s*.+)", content) - current_chapter = chapter_match.group(1).strip() if chapter_match else content.strip() - current_articles = [] - else: - current_articles.append(unit) - - # 마지막 장 저장 - if current_articles: - chapter_name = current_chapter or "서문" - chapters.append((chapter_name, current_articles)) - - # 장 분할 성공 - sections = [] - if chapters: - for chapter_title, articles in chapters: - md_lines = [f"# {law_name}\n", f"## {chapter_title}\n"] - for article in articles: - title = article.findtext("조문제목", "") - content = article.findtext("조문내용", "") - if title: - md_lines.append(f"\n### {title}\n") - if content: - md_lines.append(content.strip()) - section_name = _safe_name(chapter_title) - sections.append((section_name, "\n".join(md_lines))) - else: - # 장 분할 실패 → 전체 1파일 - full_md = _law_xml_to_markdown(xml_text, law_name) - sections.append(("전문", full_md)) - - # 각 섹션 저장 - inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox" - inbox_dir.mkdir(parents=True, exist_ok=True) - count = 0 - - for section_name, content in sections: - filename = f"{law_name}_{proclamation_date}_{section_name}.md" - file_path = inbox_dir / filename - file_path.write_text(content, encoding="utf-8") - - rel_path = str(file_path.relative_to(Path(settings.nas_mount_path))) - - # 변경 이력 메모 - note = "" - if prev_date: - note = ( - f"[자동] 법령 개정 감지\n" - f"이전 공포일: {prev_date}\n" - f"현재 공포일: {proclamation_date}\n" - f"개정구분: {revision_type}" - ) - - # 안전 자료실 A-2 — 공포일 파싱 (law published_date = COALESCE(시행일, 공포일) 계약, - # 본 레거시 워커는 공포일만 보유 — 시행일 기반 버전 체인은 B-1 statute_collector 소관) - _digits = re.sub(r"\D", "", str(proclamation_date or "")) - pub_date = None - if len(_digits) == 8: - try: - pub_date = date(int(_digits[:4]), int(_digits[4:6]), int(_digits[6:8])) - except ValueError: - pub_date = None - - doc = Document( - file_path=rel_path, - file_hash=file_hash(file_path), - file_format="md", - file_size=len(content.encode()), - file_type="immutable", - title=f"{law_name} ({proclamation_date}) {section_name}", - source_channel="law_monitor", - data_origin="work", - category="law", - # 안전 자료실 A-2 — ingest 시점 deterministic. 법령 텍스트 = 저작권법 제7조 - # 비보호 저작물 (public domain). 본 워커는 휴면(LAW_OC 미설정)이나 코드 경로 유지. - material_type="law", - jurisdiction="KR", - published_date=pub_date, - extract_meta={"license": {"scheme": "public_domain", "redistribute": True, - "attribution": "국가법령정보센터"}}, - user_note=note or None, - ) - session.add(doc) - await session.flush() - - await enqueue_stage(session, doc.id, "extract") - count += 1 - - logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장") - return count - - -def _xml_section_to_markdown(elem) -> str: - """XML 섹션(편/장)을 Markdown으로 변환""" - lines = [] - for article in elem.iter(): - tag = article.tag - text = (article.text or "").strip() - if not text: - continue - if "조" in tag: - lines.append(f"\n### {text}\n") - elif "항" in tag: - lines.append(f"\n{text}\n") - elif "호" in tag: - lines.append(f"- {text}") - elif "목" in tag: - lines.append(f" - {text}") - else: - lines.append(text) - return "\n".join(lines) - - -def _law_xml_to_markdown(xml_text: str, law_name: str) -> str: - """법령 XML 전체를 Markdown으로 변환""" - root = ET.fromstring(xml_text) - lines = [f"# {law_name}\n"] - - for elem in root.iter(): - tag = elem.tag - text = (elem.text or "").strip() - if not text: - continue - if "편" in tag and "제목" not in tag: - lines.append(f"\n## {text}\n") - elif "장" in tag and "제목" not in tag: - lines.append(f"\n## {text}\n") - elif "조" in tag: - lines.append(f"\n### {text}\n") - elif "항" in tag: - lines.append(f"\n{text}\n") - elif "호" in tag: - lines.append(f"- {text}") - elif "목" in tag: - lines.append(f" - {text}") - - return "\n".join(lines) - - -def _safe_name(name: str) -> str: - """파일명 안전 변환""" - return re.sub(r'[^\w가-힣-]', '_', name).strip("_") - - -def _send_notifications(law_name: str, proclamation_date: str, revision_type: str): - """CalDAV 할일 알림 (SMTP 발송은 2026-06-10 폐기 — CalDAV 가 단일 알림 채널)""" - caldav_url = os.getenv("CALDAV_URL", "") - caldav_user = os.getenv("CALDAV_USER", "") - caldav_pass = os.getenv("CALDAV_PASS", "") - if caldav_url and caldav_user: - create_caldav_todo( - caldav_url, caldav_user, caldav_pass, - title=f"법령 검토: {law_name}", - description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}", - due_days=7, - )