"""KR 법령 어댑터 — 국가법령정보센터 (law.go.kr DRF) (plan safety-library-1 B-1 PR①). poll_changes = lawSearch 목록 diff: 워치리스트 행별 정식 법령명 exact 조회 → MST(법령일련번호) != watermark 이면 ChangeEvent. law_monitor 의 검증된 호출 형태 재사용. fixture (2026-06-13 라이브 박제, tests/fixtures/statute_kr/): - lawsearch_*.xml — 목록 필드: 법령ID(불변)·법령일련번호(MST)·공포일자·시행일자·제개정구분명 - lawservice_*.xml.gz — 전문 1콜 XML: 조문단위 853(산안기준규칙) + 별표단위 23 전부 포함 = 스냅샷 의미론 확정(R7-M3 ①: annex 부분 fetch 실패 개념 없음 — 같은 응답에 없는 별표 = 삭제 간주 가능). 별표번호+별표가지번호 = 구조화 필드(R7-M3 ② — suffix 문자열 파싱 불요, version_key 합성은 이 필드 기반. PR② fetch_version 소관). - 조문 취득 방식 판정(R2-m1): 전문 1콜 + 로컬 파싱 확정 — lawjosub 조 단위 호출이면 산안기준규칙(853조)은 개정당 호출 폭증. lawjosub fixture 는 보조 박제. 주의: 응답의 '법령상세링크' 필드에 OC 키가 포함됨 — fixture/로그에 raw 응답을 남길 때 새니타이즈 필수 (repo fixture 는 __OC_REDACTED__ 처리됨). """ import asyncio import os import xml.etree.ElementTree as ET import httpx from core.crawl_politeness import CRAWL_UA from core.utils import setup_logger from workers.statute_adapters import ChangeEvent, VersionPayload logger = setup_logger("statute_kr") JURISDICTION = "KR" SOURCE_API = "law.go.kr" LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do" LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do" # 같은 도메인 연속 호출 간격 (일 1회 x 26콜 — 보수적) _POLL_DELAY_S = 1.5 def _oc() -> str: oc = os.getenv("LAW_OC", "") if not oc: raise RuntimeError("LAW_OC 미설정 — statute KR 어댑터 사용 불가") return oc def parse_search_hit(xml_text: str, official_title: str) -> dict | None: """lawSearch XML 에서 정식 법령명 exact match 1건 추출 (순수 함수 — fixture 테스트 대상). 정식명 기준 exact match — 워치리스트 title 이 정식명(가운뎃점 포함)이므로 안전. (law_monitor 의 하드코딩 '유해위험작업...'(점 없음)이 영구 미매칭이던 함정의 교훈: 조회 키는 반드시 레지스트리의 정식명을 쓴다.) """ root = ET.fromstring(xml_text) for law in root.findall(".//law"): if (law.findtext("법령명한글") or "").strip() != official_title: continue mst = (law.findtext("법령일련번호") or "").strip() if not mst: continue return { "mst": mst, "law_id": (law.findtext("법령ID") or "").strip(), "promulgation_date": (law.findtext("공포일자") or "").strip() or None, "effective_date": (law.findtext("시행일자") or "").strip() or None, "revision_type": (law.findtext("제개정구분명") or "").strip() or None, "status_code": (law.findtext("현행연혁코드") or "").strip() or None, } return None def detect_change(hit: dict | None, act_family_id: str, act_title: str, watermark: str | None) -> ChangeEvent | None: """목록 hit + 워터마크 → ChangeEvent (순수 함수 — fixture 테스트 대상). - hit 없음 = 감지 불가 (None — 호출측이 fail-loud 로그. 폐지 단정 금지: 검색 누락/표기 변경 가능성과 구분 불가하므로 repeal 은 제개정구분명 기준만) - MST == watermark = 변경 없음 - 제개정구분명에 '폐지' = repeal, 그 외 = amend """ if hit is None: return None if watermark and hit["mst"] == watermark: return None kind = "repeal" if (hit.get("revision_type") or "").find("폐지") >= 0 else "amend" return ChangeEvent( family_id=act_family_id, kind=kind, new_version_key=hit["mst"], title=act_title, promulgation_date=hit.get("promulgation_date"), effective_date=hit.get("effective_date"), revision_type=hit.get("revision_type"), ) def _article_markdown(art: ET.Element) -> str: """조문단위 1건 → 텍스트. 조문내용(이미 '제N조(제목) ...' 형태) + 항/호/목 전체. 메타 필드(조문번호/조문여부/조문시행일자 등)는 제외 — 조문내용과 항 서브트리만. """ parts = [] body = (art.findtext("조문내용") or "").strip() if body: parts.append(body) for hang in art.findall("항"): text = "\n".join(t.strip() for t in hang.itertext() if t.strip()) if text: parts.append(text) return "\n".join(parts) def parse_service_payloads(xml_text: str, official_title: str, mst: str) -> list[VersionPayload]: """lawService 전문 XML → VersionPayload 리스트 (순수 함수 — fixture 테스트 대상). 스냅샷 의미론: 응답에 있는 별표가 그 버전의 별표 전체 (R7-M3 fixture 판정). - primary 1건: 전 조문 markdown (조문여부 != '조문' 행 = 장/절 헤더 → '## ' 처리) - annex N건: 별표단위별 — version_key = 'MST|{별표번호}-{가지번호}' (zero-padded 그대로) """ root = ET.fromstring(xml_text) base = root.find(".//기본정보") prom = (base.findtext("공포일자") or "").strip() or None if base is not None else None eff = (base.findtext("시행일자") or "").strip() or None if base is not None else None lines: list[str] = [f"# {official_title}", ""] for art in root.findall(".//조문단위"): is_article = (art.findtext("조문여부") or "").strip() == "조문" text = _article_markdown(art) if not text: continue if is_article: lines.append(f"### {text}" if not text.startswith("제") else text) else: lines.append(f"## {text}") lines.append("") primary_content = "\n".join(lines).strip() payloads = [VersionPayload( law_doc_kind="primary", version_key=mst, title=official_title, content=primary_content, promulgation_date=prom, effective_date=eff, )] for annex in root.findall(".//별표단위"): no = (annex.findtext("별표번호") or "").strip() sub = (annex.findtext("별표가지번호") or "").strip() or "00" kind = (annex.findtext("별표구분") or "별표").strip() # 별표 / 서식 — 별도 차원! a_title = (annex.findtext("별표제목") or "").strip() a_body = (annex.findtext("별표내용") or "").strip() if not no: continue # 삭제 tombstone — KR 은 별표/서식 삭제가 absence 가 아니라 '삭제 <날짜>' 명시 행 # (fixture 실측: 산안기준규칙 서식1·2). 내용 없는 tombstone 은 적재 skip. # 시리즈의 구버전 current 잔존 처리 = PR③ 관찰 후보 (absence 추론은 불요 확정). if a_title.startswith("삭제") and len(a_body) < 50: continue label = f"{kind}{int(no)}" + (f"의{int(sub)}" if sub not in ("", "0", "00") else "") payloads.append(VersionPayload( law_doc_kind="annex", # 구분 차원 포함 — (번호,가지)만으로는 별표1 vs 서식1 충돌 (fixture 실측) version_key=f"{mst}|{kind}{no}-{sub}", title=f"{official_title} {label} {a_title}".strip(), content=f"# {official_title} {label}\n## {a_title}\n\n{a_body}".strip(), promulgation_date=prom, effective_date=eff, annex_label=label, )) return payloads async def fetch_version(client: httpx.AsyncClient, act, change: ChangeEvent) -> list[VersionPayload]: """전문 1콜 → payload 리스트 (R2-m1 판정: lawjosub 조 단위 호출 안 함 — 853조 폭증 회피).""" resp = await client.get( LAW_SERVICE_URL, params={"OC": _oc(), "target": "law", "MST": change.new_version_key, "type": "XML"}, headers={"User-Agent": CRAWL_UA}, ) resp.raise_for_status() payloads = parse_service_payloads(resp.text, act.title, change.new_version_key) if not payloads or len(payloads[0].content) < 200: # 파싱 검증 floor — 미달 시 예외 = 워터마크 미영속 (재시도 가능 상태 유지) raise ValueError(f"전문 파싱 결과 빈약 ({act.family_id}): payloads={len(payloads)}") return payloads async def poll_changes(client: httpx.AsyncClient, watch_rows: list) -> list[ChangeEvent]: """워치리스트 행별 lawSearch diff. 행 단위 실패 격리 (한 법령 실패가 나머지를 막지 않음).""" oc = _oc() events: list[ChangeEvent] = [] for act in watch_rows: try: resp = await client.get( LAW_SEARCH_URL, params={"OC": oc, "target": "law", "type": "XML", "query": act.title}, headers={"User-Agent": CRAWL_UA}, ) resp.raise_for_status() hit = parse_search_hit(resp.text, act.title) if hit is None: # fail-loud: 정식명 미매칭 = 표기 변경/검색 누락 의심 — 침묵 skip 금지 logger.warning(f"[statute-kr] 목록 미매칭: {act.family_id} {act.title!r}") else: ev = detect_change(hit, act.family_id, act.title, act.watermark) if ev: events.append(ev) except Exception as e: logger.error(f"[statute-kr] poll 실패 ({act.family_id}): {type(e).__name__}: {e!r}") await asyncio.sleep(_POLL_DELAY_S) return events