Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2fedaa065b | |||
| 274d2009c4 | |||
| 61bb6f401b |
+21
-15
@@ -57,12 +57,12 @@ def _parse_migration_files(migrations_dir: Path) -> list[tuple[int, str, Path]]:
|
||||
|
||||
def _validate_sql_content(name: str, sql: str) -> None:
|
||||
"""migration SQL에 BEGIN/COMMIT이 포함되어 있으면 에러 (외부 트랜잭션 깨짐 방지)"""
|
||||
# 주석(-- ...) 라인 제거 후 검사
|
||||
lines = [
|
||||
line for line in sql.splitlines()
|
||||
if not line.strip().startswith("--")
|
||||
]
|
||||
stripped = "\n".join(lines).upper()
|
||||
# 주석(전체 줄 + 인라인 `-- ...`) 제거 후 검사. ★인라인 주석을 안 지우면 설명 주석의
|
||||
# 'commit/begin' 단어(예 365_scan_jobs 의 `-- commit 시 documents.title 로 전파`)를
|
||||
# 트랜잭션 제어문으로 false-positive 로 잡아 fresh DB/DR 부트스트랩이 깨진다(verification
|
||||
# 실측 2026-06). 줄별로 `--` 이후를 잘라 주석 텍스트를 검사에서 제외.
|
||||
cleaned = [re.sub(r"--.*$", "", line) for line in sql.splitlines()]
|
||||
stripped = "\n".join(cleaned).upper()
|
||||
for keyword in ("BEGIN", "COMMIT", "ROLLBACK"):
|
||||
# 단어 경계로 매칭 (예: BEGIN_SOMETHING은 제외)
|
||||
if re.search(rf"\b{keyword}\b", stripped):
|
||||
@@ -70,6 +70,13 @@ def _validate_sql_content(name: str, sql: str) -> None:
|
||||
f"migration {name}에 {keyword} 포함됨 — "
|
||||
f"migration SQL에는 트랜잭션 제어문을 넣지 마세요"
|
||||
)
|
||||
# schema_migrations 수정 금지 (runner 가 스탬프 관리) — 주석 제외(stripped) 검사.
|
||||
# (구: _run_migrations 의 raw `"schema_migrations" in sql.lower()` 가 주석 미제외라
|
||||
# 365 의 '-- ... schema_migrations 를 건드리지 않음' 주석을 false-positive 로 잡았음.)
|
||||
if "SCHEMA_MIGRATIONS" in stripped:
|
||||
raise RuntimeError(
|
||||
f"Migration {name} must not modify schema_migrations table"
|
||||
)
|
||||
|
||||
|
||||
# R1: baseline 스냅샷이 대표하는 마지막 마이그레이션 버전 (이하 버전은 baseline 에 포함).
|
||||
@@ -167,16 +174,15 @@ async def _run_migrations(conn) -> None:
|
||||
|
||||
for version, name, path in pending:
|
||||
sql = path.read_text(encoding="utf-8")
|
||||
_validate_sql_content(name, sql)
|
||||
if "schema_migrations" in sql.lower():
|
||||
raise ValueError(
|
||||
f"Migration {name} must not modify schema_migrations table"
|
||||
)
|
||||
_validate_sql_content(name, sql) # BEGIN/COMMIT + schema_migrations 검사(주석 제외)
|
||||
logger.info(f"[migration] {name} 실행 중...")
|
||||
# raw driver SQL 사용 — text() 의 :name bind parameter 해석으로
|
||||
# SQL 주석/literal 에 콜론이 들어가면 InvalidRequestError 발생.
|
||||
# exec_driver_sql 은 SQL 을 driver(asyncpg) 에 그대로 전달.
|
||||
await conn.exec_driver_sql(sql)
|
||||
# raw asyncpg simple 프로토콜로 실행 — baseline 적재(_load_baseline_if_fresh)와 동일.
|
||||
# ★exec_driver_sql 은 prepared 프로토콜이라 multi-statement 불허("cannot insert multiple
|
||||
# commands into a prepared statement"). 365_scan_jobs 처럼 테이블+시드+인덱스를 한 파일에
|
||||
# 담은 마이그(컨벤션상 1-statement 권장이나 이미 prod 적재)도 fresh DB/DR replay 되게
|
||||
# simple execute 사용. text() :name 콜론-binding 이슈도 동일하게 회피(raw 전달).
|
||||
raw = await conn.get_raw_connection()
|
||||
await raw.driver_connection.execute(sql)
|
||||
await conn.execute(
|
||||
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
|
||||
{"v": version, "n": name},
|
||||
|
||||
@@ -20,6 +20,7 @@ from models.chunk import DocumentChunk
|
||||
from models.document import Document
|
||||
from models.study_question import StudyQuestion
|
||||
from models.study_topic import StudyTopicDocument
|
||||
from services.search.license_filter import restricted_exclude_orm
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -113,6 +114,9 @@ async def _gather_document_evidence(
|
||||
select(Document.id, Document.title, Document.ai_summary).where(
|
||||
Document.id.in_(doc_ids),
|
||||
Document.deleted_at.is_(None),
|
||||
# B-4: licensed_restricted 제외 — explanation_rag 와 동일 술어(a안 U-2①). 누락 시
|
||||
# 구매 자료 verbatim 이 분야노트 RAG 로 새던 보안 drift(복제 과정 누락).
|
||||
restricted_exclude_orm(),
|
||||
)
|
||||
)
|
||||
).all()
|
||||
|
||||
@@ -1,367 +0,0 @@
|
||||
"""법령 모니터 워커 — 국가법령정보센터 API 연동
|
||||
|
||||
26개 법령 모니터링, 편/장 단위 분할 저장, 변경 이력 추적.
|
||||
매일 07:00 실행 (APScheduler).
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
from datetime import date, datetime, timezone
|
||||
from pathlib import Path
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import create_caldav_todo, file_hash, setup_logger
|
||||
from models.automation import AutomationState
|
||||
from models.document import Document
|
||||
from models.queue import enqueue_stage
|
||||
|
||||
logger = setup_logger("law_monitor")
|
||||
|
||||
LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do"
|
||||
LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do"
|
||||
|
||||
# 모니터링 대상 법령 (26개)
|
||||
MONITORED_LAWS = [
|
||||
# 산업안전보건 핵심
|
||||
"산업안전보건법",
|
||||
"산업안전보건법 시행령",
|
||||
"산업안전보건법 시행규칙",
|
||||
"산업안전보건기준에 관한 규칙",
|
||||
"유해위험작업의 취업 제한에 관한 규칙",
|
||||
"중대재해 처벌 등에 관한 법률",
|
||||
"중대재해 처벌 등에 관한 법률 시행령",
|
||||
# 건설안전
|
||||
"건설기술 진흥법",
|
||||
"건설기술 진흥법 시행령",
|
||||
"건설기술 진흥법 시행규칙",
|
||||
"시설물의 안전 및 유지관리에 관한 특별법",
|
||||
# 위험물/화학
|
||||
"위험물안전관리법",
|
||||
"위험물안전관리법 시행령",
|
||||
"위험물안전관리법 시행규칙",
|
||||
"화학물질관리법",
|
||||
"화학물질관리법 시행령",
|
||||
"화학물질의 등록 및 평가 등에 관한 법률",
|
||||
# 소방/전기/가스
|
||||
"소방시설 설치 및 관리에 관한 법률",
|
||||
"소방시설 설치 및 관리에 관한 법률 시행령",
|
||||
"전기사업법",
|
||||
"전기안전관리법",
|
||||
"고압가스 안전관리법",
|
||||
"고압가스 안전관리법 시행령",
|
||||
"액화석유가스의 안전관리 및 사업법",
|
||||
# 근로/환경
|
||||
"근로기준법",
|
||||
"환경영향평가법",
|
||||
]
|
||||
|
||||
|
||||
async def run():
|
||||
"""법령 변경 모니터링 실행"""
|
||||
law_oc = os.getenv("LAW_OC", "")
|
||||
if not law_oc:
|
||||
logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중")
|
||||
return
|
||||
|
||||
async with async_session() as session:
|
||||
state = await session.execute(
|
||||
select(AutomationState).where(AutomationState.job_name == "law_monitor")
|
||||
)
|
||||
state_row = state.scalar_one_or_none()
|
||||
last_check = state_row.last_check_value if state_row else None
|
||||
|
||||
today = datetime.now(timezone.utc).strftime("%Y%m%d")
|
||||
if last_check == today:
|
||||
logger.info("오늘 이미 체크 완료")
|
||||
return
|
||||
|
||||
new_count = 0
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
for law_name in MONITORED_LAWS:
|
||||
try:
|
||||
count = await _check_law(client, law_oc, law_name, session)
|
||||
new_count += count
|
||||
except Exception as e:
|
||||
logger.error(f"[{law_name}] 체크 실패: {e}")
|
||||
|
||||
# 상태 업데이트
|
||||
if state_row:
|
||||
state_row.last_check_value = today
|
||||
state_row.last_run_at = datetime.now(timezone.utc)
|
||||
else:
|
||||
session.add(AutomationState(
|
||||
job_name="law_monitor",
|
||||
last_check_value=today,
|
||||
last_run_at=datetime.now(timezone.utc),
|
||||
))
|
||||
|
||||
await session.commit()
|
||||
logger.info(f"법령 모니터 완료: {new_count}건 신규/변경 감지")
|
||||
|
||||
|
||||
async def _check_law(
|
||||
client: httpx.AsyncClient,
|
||||
law_oc: str,
|
||||
law_name: str,
|
||||
session,
|
||||
) -> int:
|
||||
"""단일 법령 검색 → 변경 감지 → 분할 저장"""
|
||||
# 법령 검색 (lawSearch.do)
|
||||
resp = await client.get(
|
||||
LAW_SEARCH_URL,
|
||||
params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
|
||||
root = ET.fromstring(resp.text)
|
||||
total = root.findtext(".//totalCnt", "0")
|
||||
if total == "0":
|
||||
logger.debug(f"[{law_name}] 검색 결과 없음")
|
||||
return 0
|
||||
|
||||
# 정확히 일치하는 법령 찾기
|
||||
for law_elem in root.findall(".//law"):
|
||||
found_name = law_elem.findtext("법령명한글", "").strip()
|
||||
if found_name != law_name:
|
||||
continue
|
||||
|
||||
mst = law_elem.findtext("법령일련번호", "")
|
||||
proclamation_date = law_elem.findtext("공포일자", "")
|
||||
revision_type = law_elem.findtext("제개정구분명", "")
|
||||
|
||||
if not mst:
|
||||
continue
|
||||
|
||||
# 이미 등록된 법령인지 확인 (같은 법령명 + 공포일자)
|
||||
existing = await session.execute(
|
||||
select(Document).where(
|
||||
Document.title.like(f"{law_name}%"),
|
||||
Document.source_channel == "law_monitor",
|
||||
)
|
||||
)
|
||||
existing_docs = existing.scalars().all()
|
||||
|
||||
# 같은 공포일자 이미 있으면 skip
|
||||
for doc in existing_docs:
|
||||
if proclamation_date in (doc.title or ""):
|
||||
return 0
|
||||
|
||||
# 이전 공포일 찾기 (변경 이력용)
|
||||
prev_date = ""
|
||||
if existing_docs:
|
||||
prev_date = max(
|
||||
(re.search(r'\d{8}', doc.title or "").group() for doc in existing_docs
|
||||
if re.search(r'\d{8}', doc.title or "")),
|
||||
default=""
|
||||
)
|
||||
|
||||
# 본문 조회 (lawService.do)
|
||||
text_resp = await client.get(
|
||||
LAW_SERVICE_URL,
|
||||
params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
|
||||
)
|
||||
text_resp.raise_for_status()
|
||||
|
||||
# 분할 저장
|
||||
count = await _save_law_split(
|
||||
session, text_resp.text, law_name, proclamation_date,
|
||||
revision_type, prev_date,
|
||||
)
|
||||
|
||||
# DB 먼저 커밋 (알림 실패가 저장을 막지 않도록)
|
||||
await session.commit()
|
||||
|
||||
# CalDAV + SMTP 알림 (실패해도 무시)
|
||||
try:
|
||||
_send_notifications(law_name, proclamation_date, revision_type)
|
||||
except Exception as e:
|
||||
logger.warning(f"[{law_name}] 알림 발송 실패 (무시): {e}")
|
||||
|
||||
return count
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
async def _save_law_split(
|
||||
session, xml_text: str, law_name: str, proclamation_date: str,
|
||||
revision_type: str, prev_date: str,
|
||||
) -> int:
|
||||
"""법령 XML → 장(章) 단위 Markdown 분할 저장"""
|
||||
root = ET.fromstring(xml_text)
|
||||
|
||||
# 조문단위에서 장 구분자 찾기 (조문키가 000으로 끝나는 조문)
|
||||
units = root.findall(".//조문단위")
|
||||
chapters = [] # [(장제목, [조문들])]
|
||||
current_chapter = None
|
||||
current_articles = []
|
||||
|
||||
for unit in units:
|
||||
key = unit.attrib.get("조문키", "")
|
||||
content = (unit.findtext("조문내용", "") or "").strip()
|
||||
|
||||
# 장 구분자: 키가 000으로 끝나고 내용에 "제X장" 포함
|
||||
if key.endswith("000") and re.search(r"제\d+장", content):
|
||||
# 이전 장/서문 저장
|
||||
if current_articles:
|
||||
chapter_name = current_chapter or "서문"
|
||||
chapters.append((chapter_name, current_articles))
|
||||
chapter_match = re.search(r"(제\d+장\s*.+)", content)
|
||||
current_chapter = chapter_match.group(1).strip() if chapter_match else content.strip()
|
||||
current_articles = []
|
||||
else:
|
||||
current_articles.append(unit)
|
||||
|
||||
# 마지막 장 저장
|
||||
if current_articles:
|
||||
chapter_name = current_chapter or "서문"
|
||||
chapters.append((chapter_name, current_articles))
|
||||
|
||||
# 장 분할 성공
|
||||
sections = []
|
||||
if chapters:
|
||||
for chapter_title, articles in chapters:
|
||||
md_lines = [f"# {law_name}\n", f"## {chapter_title}\n"]
|
||||
for article in articles:
|
||||
title = article.findtext("조문제목", "")
|
||||
content = article.findtext("조문내용", "")
|
||||
if title:
|
||||
md_lines.append(f"\n### {title}\n")
|
||||
if content:
|
||||
md_lines.append(content.strip())
|
||||
section_name = _safe_name(chapter_title)
|
||||
sections.append((section_name, "\n".join(md_lines)))
|
||||
else:
|
||||
# 장 분할 실패 → 전체 1파일
|
||||
full_md = _law_xml_to_markdown(xml_text, law_name)
|
||||
sections.append(("전문", full_md))
|
||||
|
||||
# 각 섹션 저장
|
||||
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
|
||||
inbox_dir.mkdir(parents=True, exist_ok=True)
|
||||
count = 0
|
||||
|
||||
for section_name, content in sections:
|
||||
filename = f"{law_name}_{proclamation_date}_{section_name}.md"
|
||||
file_path = inbox_dir / filename
|
||||
file_path.write_text(content, encoding="utf-8")
|
||||
|
||||
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
|
||||
|
||||
# 변경 이력 메모
|
||||
note = ""
|
||||
if prev_date:
|
||||
note = (
|
||||
f"[자동] 법령 개정 감지\n"
|
||||
f"이전 공포일: {prev_date}\n"
|
||||
f"현재 공포일: {proclamation_date}\n"
|
||||
f"개정구분: {revision_type}"
|
||||
)
|
||||
|
||||
# 안전 자료실 A-2 — 공포일 파싱 (law published_date = COALESCE(시행일, 공포일) 계약,
|
||||
# 본 레거시 워커는 공포일만 보유 — 시행일 기반 버전 체인은 B-1 statute_collector 소관)
|
||||
_digits = re.sub(r"\D", "", str(proclamation_date or ""))
|
||||
pub_date = None
|
||||
if len(_digits) == 8:
|
||||
try:
|
||||
pub_date = date(int(_digits[:4]), int(_digits[4:6]), int(_digits[6:8]))
|
||||
except ValueError:
|
||||
pub_date = None
|
||||
|
||||
doc = Document(
|
||||
file_path=rel_path,
|
||||
file_hash=file_hash(file_path),
|
||||
file_format="md",
|
||||
file_size=len(content.encode()),
|
||||
file_type="immutable",
|
||||
title=f"{law_name} ({proclamation_date}) {section_name}",
|
||||
source_channel="law_monitor",
|
||||
data_origin="work",
|
||||
category="law",
|
||||
# 안전 자료실 A-2 — ingest 시점 deterministic. 법령 텍스트 = 저작권법 제7조
|
||||
# 비보호 저작물 (public domain). 본 워커는 휴면(LAW_OC 미설정)이나 코드 경로 유지.
|
||||
material_type="law",
|
||||
jurisdiction="KR",
|
||||
published_date=pub_date,
|
||||
extract_meta={"license": {"scheme": "public_domain", "redistribute": True,
|
||||
"attribution": "국가법령정보센터"}},
|
||||
user_note=note or None,
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
count += 1
|
||||
|
||||
logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장")
|
||||
return count
|
||||
|
||||
|
||||
def _xml_section_to_markdown(elem) -> str:
|
||||
"""XML 섹션(편/장)을 Markdown으로 변환"""
|
||||
lines = []
|
||||
for article in elem.iter():
|
||||
tag = article.tag
|
||||
text = (article.text or "").strip()
|
||||
if not text:
|
||||
continue
|
||||
if "조" in tag:
|
||||
lines.append(f"\n### {text}\n")
|
||||
elif "항" in tag:
|
||||
lines.append(f"\n{text}\n")
|
||||
elif "호" in tag:
|
||||
lines.append(f"- {text}")
|
||||
elif "목" in tag:
|
||||
lines.append(f" - {text}")
|
||||
else:
|
||||
lines.append(text)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
|
||||
"""법령 XML 전체를 Markdown으로 변환"""
|
||||
root = ET.fromstring(xml_text)
|
||||
lines = [f"# {law_name}\n"]
|
||||
|
||||
for elem in root.iter():
|
||||
tag = elem.tag
|
||||
text = (elem.text or "").strip()
|
||||
if not text:
|
||||
continue
|
||||
if "편" in tag and "제목" not in tag:
|
||||
lines.append(f"\n## {text}\n")
|
||||
elif "장" in tag and "제목" not in tag:
|
||||
lines.append(f"\n## {text}\n")
|
||||
elif "조" in tag:
|
||||
lines.append(f"\n### {text}\n")
|
||||
elif "항" in tag:
|
||||
lines.append(f"\n{text}\n")
|
||||
elif "호" in tag:
|
||||
lines.append(f"- {text}")
|
||||
elif "목" in tag:
|
||||
lines.append(f" - {text}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _safe_name(name: str) -> str:
|
||||
"""파일명 안전 변환"""
|
||||
return re.sub(r'[^\w가-힣-]', '_', name).strip("_")
|
||||
|
||||
|
||||
def _send_notifications(law_name: str, proclamation_date: str, revision_type: str):
|
||||
"""CalDAV 할일 알림 (SMTP 발송은 2026-06-10 폐기 — CalDAV 가 단일 알림 채널)"""
|
||||
caldav_url = os.getenv("CALDAV_URL", "")
|
||||
caldav_user = os.getenv("CALDAV_USER", "")
|
||||
caldav_pass = os.getenv("CALDAV_PASS", "")
|
||||
if caldav_url and caldav_user:
|
||||
create_caldav_todo(
|
||||
caldav_url, caldav_user, caldav_pass,
|
||||
title=f"법령 검토: {law_name}",
|
||||
description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}",
|
||||
due_days=7,
|
||||
)
|
||||
Reference in New Issue
Block a user