Files
hyungi_document_server/app/workers/law_monitor.py
Hyungi Ahn 31d5498f8d feat: implement Phase 3 automation workers
- Add automation_state table for incremental sync (last UID, last check)
- Add law_monitor worker: 국가법령정보센터 API → NAS/DB/CalDAV VTODO
  (LAW_OC 승인 대기 중, 코드 완성)
- Add mailplus_archive worker: IMAP(993) → .eml NAS save + DB + SMTP
  notification (imaplib via asyncio.to_thread, timeout=30)
- Add daily_digest worker: PostgreSQL/pipeline stats → Markdown + SMTP
  (documents, law changes, email, queue errors, inbox backlog)
- Add CalDAV VTODO helper and SMTP email helper to core/utils.py
- Wire 3 cron jobs in APScheduler (law@07:00, mail@07:00+18:00,
  digest@20:00) with timezone=Asia/Seoul

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:24:50 +09:00

193 lines
6.0 KiB
Python

"""법령 모니터 워커 — 국가법령정보센터 API로 법령 변경 감지
LAW_OC 승인 대기 중 — 코드 완성, 실 호출은 승인 후.
v1 scripts/law_monitor.py에서 포팅.
"""
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from xml.etree import ElementTree as ET
import httpx
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import create_caldav_todo, file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import ProcessingQueue
# Module-level logger for this worker, obtained from the project's
# setup_logger helper under the name "law_monitor".
logger = setup_logger("law_monitor")
# Base URL of the law.go.kr open API; endpoint paths are appended per request.
LAW_API_BASE = "https://www.law.go.kr"

# Laws to monitor (Tier 1: core statutes). Each entry is the law name that is
# sent as the search query and reused in the stored file name and title.
TIER1_LAWS = [
    "산업안전보건법",
    "산업안전보건법 시행령",
    "산업안전보건법 시행규칙",
    "중대재해 처벌 등에 관한 법률",
    "중대재해 처벌 등에 관한 법률 시행령",
    "건설기술 진흥법",
    "건설기술 진흥법 시행령",
    "위험물안전관리법",
    "화학물질관리법",
    "소방시설 설치 및 관리에 관한 법률",
    "전기사업법",
    "고압가스 안전관리법",
]
async def run():
    """Run one law-change monitoring pass over every law in ``TIER1_LAWS``.

    Behaviour:
      * Returns early (with a warning) when the ``LAW_OC`` environment
        variable is not set.
      * Returns early when the ``AutomationState`` row for ``law_monitor``
        already holds today's date in ``last_check_value``.
      * Otherwise calls ``_check_law`` for each law. A failure for one law is
        logged with its traceback and does not stop the remaining laws.
      * Finally creates or updates the ``AutomationState`` row and commits.

    Returns:
        None.
    """
    # Local import: only needed for the fixed KST offset below.
    from datetime import timedelta

    law_oc = os.getenv("LAW_OC", "")
    if not law_oc:
        logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중")
        return

    # The once-a-day guard must use the same calendar day as the schedule.
    # With a UTC date the day rolls over at 09:00 KST, so any run between
    # 09:00 and midnight KST would stamp the date that the next 07:00 KST run
    # also sees, and that scheduled run would be skipped as "already checked".
    # Korea observes no DST, so a fixed +09:00 offset is sufficient.
    # NOTE(review): assumes the job is scheduled in Asia/Seoul — confirm.
    kst = timezone(timedelta(hours=9))
    today = datetime.now(kst).strftime("%Y%m%d")

    async with async_session() as session:
        # Look up the date of the last completed check.
        result = await session.execute(
            select(AutomationState).where(AutomationState.job_name == "law_monitor")
        )
        state_row = result.scalar_one_or_none()
        last_check = state_row.last_check_value if state_row else None

        if last_check == today:
            logger.info("오늘 이미 체크 완료")
            return

        new_count = 0
        async with httpx.AsyncClient(timeout=30) as client:
            for law_name in TIER1_LAWS:
                try:
                    # Isolate each law in a SAVEPOINT: if the check fails
                    # after rows were added/flushed, only that law's pending
                    # changes are rolled back and the session stays usable
                    # for the remaining laws and the final commit.
                    async with session.begin_nested():
                        changed = await _check_law(client, law_oc, law_name, session)
                except Exception as e:
                    # Top-level boundary for a single law: keep going, but
                    # record the traceback so the failure can be diagnosed.
                    logger.exception(f"[{law_name}] 체크 실패: {e}")
                    continue
                if changed:
                    new_count += 1

        # Persist the job state (timestamp stays in UTC, as before).
        now = datetime.now(timezone.utc)
        if state_row:
            state_row.last_check_value = today
            state_row.last_run_at = now
        else:
            session.add(AutomationState(
                job_name="law_monitor",
                last_check_value=today,
                last_run_at=now,
            ))
        await session.commit()
        logger.info(f"법령 모니터 완료: {new_count}건 변경 감지")
async def _check_law(
    client: httpx.AsyncClient,
    law_oc: str,
    law_name: str,
    session,
) -> bool:
    """Check one law and register its current version if it is not stored yet.

    Steps: search the law by name, de-duplicate on ``{law_name}_{공포일자}``,
    fetch the body, convert it to Markdown, write it under
    ``settings.nas_mount_path``, add a ``Document`` plus a pending ``extract``
    ``ProcessingQueue`` row to ``session``, and create a CalDAV to-do when
    ``CALDAV_URL`` and ``CALDAV_USER`` are set.

    Args:
        client: HTTP client used for both API requests.
        law_oc: Value sent as the ``OC`` request parameter.
        law_name: Law name used as the search query and in the stored path.
        session: Database session; rows are added and flushed, not committed.

    Returns:
        True when a new version was written and registered; False when the
        search has no result, the identifier or promulgation date is missing,
        or a document with the same path already exists.

    Raises:
        httpx.HTTPStatusError: From ``raise_for_status`` on a non-2xx reply.
        xml.etree.ElementTree.ParseError: When a response is not valid XML.
    """
    # Local import: used only to move the blocking CalDAV call off the loop.
    import asyncio

    # Name search is served by lawSearch.do; lawService.do is the body
    # endpoint keyed by MST/ID and does not return totalCnt, so querying it by
    # name would always look like "no result".
    # NOTE(review): endpoint and element names follow the published open-API
    # guide — verify against a live response once LAW_OC is approved.
    resp = await client.get(
        f"{LAW_API_BASE}/DRF/lawSearch.do",
        params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
    )
    resp.raise_for_status()
    root = ET.fromstring(resp.text)
    if root.findtext(".//totalCnt", "0").strip() == "0":
        return False

    # A query for "X법" also matches "X법 시행령" etc., so prefer the hit
    # whose name equals law_name; otherwise fall back to the first hit, and to
    # a document-wide lookup when no <law> wrapper exists.
    hits = list(root.iter("law"))
    hit = next(
        (h for h in hits if (h.findtext("법령명한글") or "").strip() == law_name),
        hits[0] if hits else root,
    )
    # MST is published as 법령일련번호 in search results; the previous
    # element name is kept as a fallback.
    mst = (hit.findtext(".//법령일련번호") or hit.findtext(".//법령MST") or "").strip()
    proclamation_date = (hit.findtext(".//공포일자") or "").strip()
    # Without a promulgation date the de-dup path degenerates to "{name}_.md"
    # and later amendments would never be detected, so treat it as no result.
    if not mst or not proclamation_date:
        return False

    # Already registered? (same law name and promulgation date)
    check_path = f"PKM/Knowledge/Industrial_Safety/Legislation/{law_name}_{proclamation_date}.md"
    existing = await session.execute(
        select(Document).where(Document.file_path == check_path)
    )
    if existing.scalar_one_or_none():
        return False

    # Fetch the law body by MST.
    text_resp = await client.get(
        f"{LAW_API_BASE}/DRF/lawService.do",
        params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
    )
    text_resp.raise_for_status()

    # Convert to Markdown.
    markdown = _law_xml_to_markdown(text_resp.text, law_name)

    # Save under the NAS mount.
    full_path = Path(settings.nas_mount_path) / check_path
    full_path.parent.mkdir(parents=True, exist_ok=True)
    full_path.write_text(markdown, encoding="utf-8")

    # Register the document.
    doc = Document(
        file_path=check_path,
        file_hash=file_hash(full_path),
        file_format="md",
        file_size=len(markdown.encode()),
        file_type="immutable",
        title=f"{law_name} ({proclamation_date})",
        source_channel="law_monitor",
        data_origin="work",
    )
    session.add(doc)
    await session.flush()  # populates doc.id for the queue row below

    # Enqueue for processing.
    session.add(ProcessingQueue(
        document_id=doc.id, stage="extract", status="pending",
    ))

    # Create the review task. This is a notification side effect: a failure
    # here must not make an already-registered law look like a failed check
    # (it would never be notified again because the document now exists).
    caldav_url = os.getenv("CALDAV_URL", "")
    caldav_user = os.getenv("CALDAV_USER", "")
    caldav_pass = os.getenv("CALDAV_PASS", "")
    if caldav_url and caldav_user:
        try:
            # NOTE(review): assumes create_caldav_todo is a synchronous helper
            # (it was called without await); run it in a worker thread so the
            # event loop is not blocked by its I/O.
            await asyncio.to_thread(
                create_caldav_todo,
                caldav_url, caldav_user, caldav_pass,
                title=f"법령 검토: {law_name}",
                description=f"공포일자: {proclamation_date}\n경로: {check_path}",
                due_days=7,
            )
        except Exception as e:
            logger.warning(f"[{law_name}] CalDAV 태스크 생성 실패: {e}")

    logger.info(f"[법령] {law_name} ({proclamation_date}) 등록 완료")
    return True
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
"""법령 XML을 Markdown으로 변환 (장/조 구조)"""
root = ET.fromstring(xml_text)
lines = [f"# {law_name}\n"]
for article in root.iter("조문단위"):
num = article.findtext("조문번호", "")
title = article.findtext("조문제목", "")
content = article.findtext("조문내용", "")
if title:
lines.append(f"\n## 제{num}조 ({title})\n")
elif num:
lines.append(f"\n## 제{num}\n")
if content:
# HTML 태그 제거
clean = re.sub(r"<[^>]+>", "", content).strip()
lines.append(f"{clean}\n")
return "\n".join(lines)