- Add automation_state table for incremental sync (last UID, last check) - Add law_monitor worker: 국가법령정보센터 API → NAS/DB/CalDAV VTODO (LAW_OC 승인 대기 중, 코드 완성) - Add mailplus_archive worker: IMAP(993) → .eml NAS save + DB + SMTP notification (imaplib via asyncio.to_thread, timeout=30) - Add daily_digest worker: PostgreSQL/pipeline stats → Markdown + SMTP (documents, law changes, email, queue errors, inbox backlog) - Add CalDAV VTODO helper and SMTP email helper to core/utils.py - Wire 3 cron jobs in APScheduler (law@07:00, mail@07:00+18:00, digest@20:00) with timezone=Asia/Seoul Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
193 lines
6.0 KiB
Python
193 lines
6.0 KiB
Python
"""법령 모니터 워커 — 국가법령정보센터 API로 법령 변경 감지
|
|
|
|
LAW_OC 승인 대기 중 — 코드 완성, 실 호출은 승인 후.
|
|
v1 scripts/law_monitor.py에서 포팅.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from xml.etree import ElementTree as ET
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from core.config import settings
|
|
from core.database import async_session
|
|
from core.utils import create_caldav_todo, file_hash, send_smtp_email, setup_logger
|
|
from models.automation import AutomationState
|
|
from models.document import Document
|
|
from models.queue import ProcessingQueue
|
|
|
|
logger = setup_logger("law_monitor")
|
|
|
|
LAW_API_BASE = "https://www.law.go.kr"
|
|
|
|
# 모니터링 대상 법령 (Tier 1: 핵심)
|
|
TIER1_LAWS = [
|
|
"산업안전보건법",
|
|
"산업안전보건법 시행령",
|
|
"산업안전보건법 시행규칙",
|
|
"중대재해 처벌 등에 관한 법률",
|
|
"중대재해 처벌 등에 관한 법률 시행령",
|
|
"건설기술 진흥법",
|
|
"건설기술 진흥법 시행령",
|
|
"위험물안전관리법",
|
|
"화학물질관리법",
|
|
"소방시설 설치 및 관리에 관한 법률",
|
|
"전기사업법",
|
|
"고압가스 안전관리법",
|
|
]
|
|
|
|
|
|
async def run():
|
|
"""법령 변경 모니터링 실행"""
|
|
law_oc = os.getenv("LAW_OC", "")
|
|
if not law_oc:
|
|
logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중")
|
|
return
|
|
|
|
async with async_session() as session:
|
|
# 마지막 체크 날짜 조회
|
|
state = await session.execute(
|
|
select(AutomationState).where(AutomationState.job_name == "law_monitor")
|
|
)
|
|
state_row = state.scalar_one_or_none()
|
|
last_check = state_row.last_check_value if state_row else None
|
|
|
|
today = datetime.now(timezone.utc).strftime("%Y%m%d")
|
|
if last_check == today:
|
|
logger.info("오늘 이미 체크 완료")
|
|
return
|
|
|
|
new_count = 0
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
for law_name in TIER1_LAWS:
|
|
try:
|
|
changed = await _check_law(client, law_oc, law_name, session)
|
|
if changed:
|
|
new_count += 1
|
|
except Exception as e:
|
|
logger.error(f"[{law_name}] 체크 실패: {e}")
|
|
|
|
# 상태 업데이트
|
|
if state_row:
|
|
state_row.last_check_value = today
|
|
state_row.last_run_at = datetime.now(timezone.utc)
|
|
else:
|
|
session.add(AutomationState(
|
|
job_name="law_monitor",
|
|
last_check_value=today,
|
|
last_run_at=datetime.now(timezone.utc),
|
|
))
|
|
|
|
await session.commit()
|
|
logger.info(f"법령 모니터 완료: {new_count}건 변경 감지")
|
|
|
|
|
|
async def _check_law(
|
|
client: httpx.AsyncClient,
|
|
law_oc: str,
|
|
law_name: str,
|
|
session,
|
|
) -> bool:
|
|
"""단일 법령 변경 체크 → 변경 시 NAS 저장 + DB 등록 + CalDAV"""
|
|
# 법령 정보 조회
|
|
resp = await client.get(
|
|
f"{LAW_API_BASE}/DRF/lawService.do",
|
|
params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
root = ET.fromstring(resp.text)
|
|
total = root.findtext(".//totalCnt", "0")
|
|
if total == "0":
|
|
return False
|
|
|
|
# 첫 번째 결과의 MST (법령 고유번호)
|
|
mst = root.findtext(".//법령MST", "")
|
|
proclamation_date = root.findtext(".//공포일자", "")
|
|
if not mst:
|
|
return False
|
|
|
|
# 이미 등록된 법령인지 확인 (같은 공포일자)
|
|
check_path = f"PKM/Knowledge/Industrial_Safety/Legislation/{law_name}_{proclamation_date}.md"
|
|
existing = await session.execute(
|
|
select(Document).where(Document.file_path == check_path)
|
|
)
|
|
if existing.scalar_one_or_none():
|
|
return False
|
|
|
|
# 법령 본문 조회
|
|
text_resp = await client.get(
|
|
f"{LAW_API_BASE}/DRF/lawService.do",
|
|
params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
|
|
)
|
|
text_resp.raise_for_status()
|
|
|
|
# Markdown 변환
|
|
markdown = _law_xml_to_markdown(text_resp.text, law_name)
|
|
|
|
# NAS 저장
|
|
full_path = Path(settings.nas_mount_path) / check_path
|
|
full_path.parent.mkdir(parents=True, exist_ok=True)
|
|
full_path.write_text(markdown, encoding="utf-8")
|
|
|
|
# DB 등록
|
|
doc = Document(
|
|
file_path=check_path,
|
|
file_hash=file_hash(full_path),
|
|
file_format="md",
|
|
file_size=len(markdown.encode()),
|
|
file_type="immutable",
|
|
title=f"{law_name} ({proclamation_date})",
|
|
source_channel="law_monitor",
|
|
data_origin="work",
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
# 처리 큐 등록
|
|
session.add(ProcessingQueue(
|
|
document_id=doc.id, stage="extract", status="pending",
|
|
))
|
|
|
|
# CalDAV 태스크 생성
|
|
caldav_url = os.getenv("CALDAV_URL", "")
|
|
caldav_user = os.getenv("CALDAV_USER", "")
|
|
caldav_pass = os.getenv("CALDAV_PASS", "")
|
|
if caldav_url and caldav_user:
|
|
create_caldav_todo(
|
|
caldav_url, caldav_user, caldav_pass,
|
|
title=f"법령 검토: {law_name}",
|
|
description=f"공포일자: {proclamation_date}\n경로: {check_path}",
|
|
due_days=7,
|
|
)
|
|
|
|
logger.info(f"[법령] {law_name} ({proclamation_date}) 등록 완료")
|
|
return True
|
|
|
|
|
|
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
|
|
"""법령 XML을 Markdown으로 변환 (장/조 구조)"""
|
|
root = ET.fromstring(xml_text)
|
|
lines = [f"# {law_name}\n"]
|
|
|
|
for article in root.iter("조문단위"):
|
|
num = article.findtext("조문번호", "")
|
|
title = article.findtext("조문제목", "")
|
|
content = article.findtext("조문내용", "")
|
|
|
|
if title:
|
|
lines.append(f"\n## 제{num}조 ({title})\n")
|
|
elif num:
|
|
lines.append(f"\n## 제{num}조\n")
|
|
|
|
if content:
|
|
# HTML 태그 제거
|
|
clean = re.sub(r"<[^>]+>", "", content).strip()
|
|
lines.append(f"{clean}\n")
|
|
|
|
return "\n".join(lines)
|