From bacb36924bb44e41da0003b504c6cd0656c24823 Mon Sep 17 00:00:00 2001 From: hyungi Date: Sat, 13 Jun 2026 09:37:51 +0900 Subject: [PATCH] =?UTF-8?q?feat(safety):=20B-1=20PR=E2=91=A1=20=E2=80=94?= =?UTF-8?q?=20fetch=5Fversion(payload=20=EB=A6=AC=EC=8A=A4=ED=8A=B8)=20+?= =?UTF-8?q?=20ingest=204=EC=B6=95=20+=20=EC=83=9D=EC=95=A0=EC=A3=BC?= =?UTF-8?q?=EA=B8=B0=20=EC=9E=A1=20=ED=86=B5=EC=A7=B8=20+=20=EB=B6=80?= =?UTF-8?q?=ED=8A=B8=EC=8A=A4=ED=8A=B8=EB=9E=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit plan safety-library-1 B-1 PR② (R8-B1: 승격·supersede·스윕·repeal = 잡 코드 통째 배포): - kr.fetch_version: 전문 1콜 → primary+annex payload 리스트 (R4-M4) ★fixture 가 잡은 결함 2: 별표구분(별표/서식) 차원 누락 시 (번호,가지) 4건 충돌 → version_key='MST|{구분}{번호}-{가지}' / 삭제 tombstone 3건(별표10·서식1·2) skip — KR 별표 삭제 = absence 아닌 명시 tombstone (R7-M3 absence 추론 불요 확정) - ingest: 전 버전 pending 적재 + 4축(law/KR/COALESCE날짜/public_domain) + backfill 마커 - 생애주기 잡: 버전 시리즈 단위 승격·supersede(R7-B1) + 상태 기반 레거시 스윕(primary current 보유 한정) + repeal(레거시 매핑분 포함, R7-M2) — 단일 트랜잭션·KST - 법령명 매핑: 정규화 동등 비교(prefix 금지 — 시행령 오폭 차단), 가운뎃점·공백 흡수 - 워터마크 = 파싱 검증 통과 후에만 / 스케줄 daily 07:00 KST (law_monitor 슬롯 승계) - 테스트 14/14 (매핑 표본·시리즈 키·payload fixture) Co-Authored-By: Claude Fable 5 --- app/main.py | 7 +- app/workers/statute_adapters/__init__.py | 20 +- app/workers/statute_adapters/kr.py | 94 +++++- app/workers/statute_collector.py | 383 ++++++++++++++++++++--- tests/test_statute_lifecycle_units.py | 87 +++++ 5 files changed, 547 insertions(+), 44 deletions(-) create mode 100644 tests/test_statute_lifecycle_units.py diff --git a/app/main.py b/app/main.py index 4d6b6fb..6f6aa8d 100644 --- a/app/main.py +++ b/app/main.py @@ -54,6 +54,7 @@ async def lifespan(app: FastAPI): from workers.digest_worker import run as global_digest_run from workers.file_watcher import watch_inbox from workers.mailplus_archive import run as mailplus_run + from workers.statute_collector import run as statute_run from workers.news_collector import run as news_collector_run from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run from workers.kosha_collector import run as kosha_collector_run @@ -119,9 +120,9 @@ async def lifespan(app: FastAPI): # safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화. scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill") # 일일 스케줄 (KST) - # law_monitor 스케줄 제거 (safety-library-1 B-1 PR①, 2026-06-13) — 매일 버전 체인 밖 - # 레거시 스냅샷을 증식하던 유일 경로 차단. 파일은 강등 보존(1사이클 관찰 후 삭제), - # 대체 = statute_collector (스케줄 등록은 PR② 잡 코드와 함께 — R8-B1). + # statute_collector = 구 law_monitor 대체 (safety-library-1 B-1 PR②) — poll→ingest→ + # 생애주기 잡(버전 시리즈 승격·supersede·레거시 스윕·repeal) 통째 (R8-B1). + scheduler.add_job(statute_run, CronTrigger(hour=7, timezone=KST), id="statute_collector") scheduler.add_job(mailplus_run, CronTrigger(hour=7, timezone=KST), id="mailplus_morning") scheduler.add_job(mailplus_run, CronTrigger(hour=18, timezone=KST), id="mailplus_evening") scheduler.add_job(daily_digest_run, CronTrigger(hour=20, timezone=KST), id="daily_digest") diff --git a/app/workers/statute_adapters/__init__.py b/app/workers/statute_adapters/__init__.py index 846444c..d6e1527 100644 --- a/app/workers/statute_adapters/__init__.py +++ b/app/workers/statute_adapters/__init__.py @@ -10,7 +10,7 @@ ChangeEvent.kind: amend / repeal / bootstrap(합성 — PR② 부트스트랩이 동일 ingest 경로 재사용, R6-m2). """ -from dataclasses import dataclass +from dataclasses import dataclass, field @dataclass @@ -23,3 +23,21 @@ class ChangeEvent: promulgation_date: str | None = None # YYYYMMDD effective_date: str | None = None # YYYYMMDD (목록 시행일자 — 조문별 차등 시행 주의) revision_type: str | None = None # 제개정구분명 + + +@dataclass +class VersionPayload: + """fetch_version 산출물 1건 — primary 또는 annex 각자 자기 version_key (R4-M4). + + 전문 1콜 스냅샷 의미론(R7-M3 fixture 판정): 한 응답에서 primary + annex 전부 생성. + annex version_key = 'MST|{별표번호}-{별표가지번호}' (zero-padded 구조화 필드 그대로 — + suffix 문자열 파싱 아닌 필드 기반, R7-B1 a 업그레이드). + """ + law_doc_kind: str # primary / annex + version_key: str + title: str + content: str # 조문/별표 markdown 텍스트 + promulgation_date: str | None = None # YYYYMMDD (본문 기본정보) + effective_date: str | None = None # YYYYMMDD (본문 기본정보 — 목록값과 다를 수 있음) + annex_label: str | None = None # '별표1' / '별표5의2' (표시용) + meta: dict = field(default_factory=dict) diff --git a/app/workers/statute_adapters/kr.py b/app/workers/statute_adapters/kr.py index 321017d..43229da 100644 --- a/app/workers/statute_adapters/kr.py +++ b/app/workers/statute_adapters/kr.py @@ -24,7 +24,7 @@ import httpx from core.crawl_politeness import CRAWL_UA from core.utils import setup_logger -from workers.statute_adapters import ChangeEvent +from workers.statute_adapters import ChangeEvent, VersionPayload logger = setup_logger("statute_kr") @@ -95,6 +95,98 @@ def detect_change(hit: dict | None, act_family_id: str, act_title: str, ) +def _article_markdown(art: ET.Element) -> str: + """조문단위 1건 → 텍스트. 조문내용(이미 '제N조(제목) ...' 형태) + 항/호/목 전체. + + 메타 필드(조문번호/조문여부/조문시행일자 등)는 제외 — 조문내용과 항 서브트리만. + """ + parts = [] + body = (art.findtext("조문내용") or "").strip() + if body: + parts.append(body) + for hang in art.findall("항"): + text = "\n".join(t.strip() for t in hang.itertext() if t.strip()) + if text: + parts.append(text) + return "\n".join(parts) + + +def parse_service_payloads(xml_text: str, official_title: str, mst: str) -> list[VersionPayload]: + """lawService 전문 XML → VersionPayload 리스트 (순수 함수 — fixture 테스트 대상). + + 스냅샷 의미론: 응답에 있는 별표가 그 버전의 별표 전체 (R7-M3 fixture 판정). + - primary 1건: 전 조문 markdown (조문여부 != '조문' 행 = 장/절 헤더 → '## ' 처리) + - annex N건: 별표단위별 — version_key = 'MST|{별표번호}-{가지번호}' (zero-padded 그대로) + """ + root = ET.fromstring(xml_text) + base = root.find(".//기본정보") + prom = (base.findtext("공포일자") or "").strip() or None if base is not None else None + eff = (base.findtext("시행일자") or "").strip() or None if base is not None else None + + lines: list[str] = [f"# {official_title}", ""] + for art in root.findall(".//조문단위"): + is_article = (art.findtext("조문여부") or "").strip() == "조문" + text = _article_markdown(art) + if not text: + continue + if is_article: + lines.append(f"### {text}" if not text.startswith("제") else text) + else: + lines.append(f"## {text}") + lines.append("") + primary_content = "\n".join(lines).strip() + + payloads = [VersionPayload( + law_doc_kind="primary", + version_key=mst, + title=official_title, + content=primary_content, + promulgation_date=prom, + effective_date=eff, + )] + + for annex in root.findall(".//별표단위"): + no = (annex.findtext("별표번호") or "").strip() + sub = (annex.findtext("별표가지번호") or "").strip() or "00" + kind = (annex.findtext("별표구분") or "별표").strip() # 별표 / 서식 — 별도 차원! + a_title = (annex.findtext("별표제목") or "").strip() + a_body = (annex.findtext("별표내용") or "").strip() + if not no: + continue + # 삭제 tombstone — KR 은 별표/서식 삭제가 absence 가 아니라 '삭제 <날짜>' 명시 행 + # (fixture 실측: 산안기준규칙 서식1·2). 내용 없는 tombstone 은 적재 skip. + # 시리즈의 구버전 current 잔존 처리 = PR③ 관찰 후보 (absence 추론은 불요 확정). + if a_title.startswith("삭제") and len(a_body) < 50: + continue + label = f"{kind}{int(no)}" + (f"의{int(sub)}" if sub not in ("", "0", "00") else "") + payloads.append(VersionPayload( + law_doc_kind="annex", + # 구분 차원 포함 — (번호,가지)만으로는 별표1 vs 서식1 충돌 (fixture 실측) + version_key=f"{mst}|{kind}{no}-{sub}", + title=f"{official_title} {label} {a_title}".strip(), + content=f"# {official_title} {label}\n## {a_title}\n\n{a_body}".strip(), + promulgation_date=prom, + effective_date=eff, + annex_label=label, + )) + return payloads + + +async def fetch_version(client: httpx.AsyncClient, act, change: ChangeEvent) -> list[VersionPayload]: + """전문 1콜 → payload 리스트 (R2-m1 판정: lawjosub 조 단위 호출 안 함 — 853조 폭증 회피).""" + resp = await client.get( + LAW_SERVICE_URL, + params={"OC": _oc(), "target": "law", "MST": change.new_version_key, "type": "XML"}, + headers={"User-Agent": CRAWL_UA}, + ) + resp.raise_for_status() + payloads = parse_service_payloads(resp.text, act.title, change.new_version_key) + if not payloads or len(payloads[0].content) < 200: + # 파싱 검증 floor — 미달 시 예외 = 워터마크 미영속 (재시도 가능 상태 유지) + raise ValueError(f"전문 파싱 결과 빈약 ({act.family_id}): payloads={len(payloads)}") + return payloads + + async def poll_changes(client: httpx.AsyncClient, watch_rows: list) -> list[ChangeEvent]: """워치리스트 행별 lawSearch diff. 행 단위 실패 격리 (한 법령 실패가 나머지를 막지 않음).""" oc = _oc() diff --git a/app/workers/statute_collector.py b/app/workers/statute_collector.py index d016d5e..7f402a0 100644 --- a/app/workers/statute_collector.py +++ b/app/workers/statute_collector.py @@ -1,76 +1,381 @@ -"""statute_collector — 법령 수집 코어 (plan safety-library-1 B-1). +"""statute_collector — 법령 수집 코어 (plan safety-library-1 B-1, PR②). -PR① 범위 (본 파일 현재 상태) = poll_changes 관찰 전용: - legal_acts 워치리스트(KR, watch=true) 순회 → 어댑터 poll_changes → 감지 이벤트 로그. - - 워터마크 영속 안 함 (계약: '파싱 검증 통과 후에만' — ingest 는 PR②. - 여기서 영속하면 PR② 가 이 변경들을 영원히 못 봄) - - 스케줄 미등록 (PR② 에서 잡 코드 통째 — 승격·supersede·스윕·repeal — 와 함께 등록. - R8-B1: 승격과 스윕의 PR 분리 = 배포 갭 동안 이중 노출 무음 윈도) - - jurisdiction 불변식: 어댑터 상수 == 행 jurisdiction assert (파싱 추론 금지) +구성 (잡 코드 통째 — R8-B1: 승격과 스윕의 PR 분리 = 배포 갭 이중 노출 윈도): + poll_changes(어댑터) → fetch_version(전문 1콜, payload 리스트) → ingest(전 버전 + pending 적재 + 4축 주입) → 생애주기 잡(버전 시리즈 단위 승격·supersede + 상태 기반 + 레거시 스윕 + repeal — 단일 트랜잭션, KST 기준). -PR② 에서 추가될 것 (카드 = 스펙): fetch_version(payload 리스트) + ingest 4축 주입 -(material_type='law'/jurisdiction=어댑터 상수/published_date=COALESCE(시행,공포)/ -license=public_domain) + 생애주기 잡(버전 시리즈 단위 승격·supersede + 상태 기반 -레거시 스윕 + repeal — 한 트랜잭션, KST) + 26 family 부트스트랩(kind='bootstrap', -extract_meta.backfill=true) + 법령명 매핑 단위 테스트. +핵심 계약 (카드 = 스펙): + - 워터마크 영속 = ingest 파싱 검증 통과 후에만 (실패 시 다음 폴링이 재감지) + - 승격·supersede 단위 = 버전 시리즈 = (family_id, law_doc_kind, annex 식별자) + — R7-B1: family 단위 구현 금지 (annex 승격이 primary 를 소거하는 본문 소실 경로) + - 레거시 스윕 = 상태 기반: 매 잡 실행, primary 시리즈 current 보유 + repeal 미감지 + family 의 법령명 매핑 레거시(law_monitor 스냅샷) 청크 in_corpus=false (멱등) + - 매핑 = 정확 일치 가정 금지: title 의 '법령명 (YYYYMMDD)' 패턴에서 법령명 추출 후 + 정규화(공백·가운뎃점 변형 흡수) **동등** 비교 — prefix 비교 금지 ('산업안전보건법'이 + '산업안전보건법 시행령' 레거시를 오폭하는 경로 차단) + - ingest 4축 (R8-M1): material_type='law' / jurisdiction=어댑터 상수 / + published_date=COALESCE(시행일, 공포일) / license=public_domain(저작권법 제7조) + - 부트스트랩(--bootstrap) = kind='bootstrap' 합성 이벤트, amend 와 동일 경로 + + extract_meta.backfill=true (E-1 게이트 집계 제외 마커) + - 가시성: source_health 성공/실패 기록 (HC.io 는 2026-05-30 알림 레이어 폐기로 부재 — + silent-skip 가드 정신은 crawl-health 보드 + health 행으로 대체) -수동 실행: docker compose exec -T fastapi python -m workers.statute_collector +실행: + 스케줄 = daily 07:00 KST (main.py — 구 law_monitor 슬롯 승계) + 수동 = docker compose exec -T fastapi python -m workers.statute_collector [--bootstrap] """ +import argparse import asyncio +import hashlib +import re +import unicodedata +from datetime import date, datetime, timezone +from zoneinfo import ZoneInfo import httpx -from sqlalchemy import select +from sqlalchemy import select, update from core.database import async_session from core.utils import setup_logger -from models.legal_act import LegalAct +from models.chunk import DocumentChunk +from models.document import Document +from models.legal_act import LegalAct, LegalMeta +from models.news_source import NewsSource +from models.queue import enqueue_stage +from workers.news_collector import _get_or_create_health, _record_failure, _record_success +from workers.statute_adapters import ChangeEvent, VersionPayload from workers.statute_adapters import kr logger = setup_logger("statute_collector") +_KST = ZoneInfo("Asia/Seoul") +_SOURCE_NAME = "KR 법령 (law.go.kr)" +_LICENSE = {"scheme": "public_domain", "redistribute": True, "attribution": "국가법령정보센터"} +_FETCH_DELAY_S = 2.5 # lawService 전문(최대 ~1.3MB) 연속 호출 간격 + # jurisdiction → 어댑터 모듈 (Phase 1 = KR 단독, 해외는 B-5 게이트 뒤) _ADAPTERS = {"KR": kr} -async def poll_once() -> int: - """워치리스트 1회 폴링 — 감지 이벤트 수 반환 (관찰 전용, 상태 변경 0).""" +# ─── 법령명 매핑 (R8-m1: 정확 일치 가정 금지 — 변형 흡수 정규화 + 동등 비교) ─── + +_LEGACY_TITLE_RE = re.compile(r"^(.*?)\s*\((\d{8})\)") + + +def normalize_law_name(name: str) -> str: + """공백·가운뎃점 변형 흡수 — NFC 정규화 후 공백/ㆍ·・ 제거.""" + s = unicodedata.normalize("NFC", name or "") + return re.sub(r"[\sㆍ·・]", "", s) + + +def legacy_law_name(title: str) -> str | None: + """레거시 law_monitor title('법령명 (YYYYMMDD) 섹션')에서 법령명 추출.""" + m = _LEGACY_TITLE_RE.match(title or "") + return m.group(1).strip() if m else None + + +def series_suffix(version_key: str) -> str | None: + """버전 시리즈의 annex 식별자 — version_key 'MST|NNNN-SS' 의 '|' 뒤 (primary=None).""" + return version_key.split("|", 1)[1] if "|" in version_key else None + + +def _to_date(ymd: str | None) -> date | None: + digits = re.sub(r"\D", "", ymd or "") + if len(digits) != 8: + return None + try: + return date(int(digits[:4]), int(digits[4:6]), int(digits[6:8])) + except ValueError: + return None + + +# ─── ingest (전 버전 pending 적재 — R2-B2/R3 계약) ────────────────────────────── + +async def _ingest_payload(session, act: LegalAct, ev: ChangeEvent, + payload: VersionPayload, backfill: bool) -> bool: + """payload 1건 → Document + legal_meta(pending). 반환 = 신규 여부 (dedup 멱등).""" + fhash = hashlib.sha256( + f"statute|{act.jurisdiction}|{act.native_id}|{payload.version_key}".encode() + ).hexdigest()[:32] + existing = await session.execute( + select(Document.id).where(Document.file_hash == fhash).limit(1) + ) + if existing.scalars().first(): + return False + + prom = _to_date(payload.promulgation_date or ev.promulgation_date) + eff = _to_date(payload.effective_date or ev.effective_date) + now = datetime.now(timezone.utc) + extra = {"backfill": True} if backfill else {} + doc = Document( + file_path=f"crawl/statute/{act.family_id}/{payload.version_key.replace('|', '_')}", + file_hash=fhash, + file_format="article", + file_size=len(payload.content.encode()), + file_type="note", + title=f"{payload.title} ({payload.promulgation_date or ev.promulgation_date or ''})".strip(), + extracted_text=payload.content, + extracted_at=now, + extractor_version="statute_kr@law.go.kr", + md_status="skipped", + md_extraction_error="statute: 텍스트 네이티브, markdown 변환 비대상", + source_channel="crawl", + data_origin="external", + review_status="approved", + ai_domain="법령", + ai_sub_group=act.title, + ai_tags=[f"법령/KR/{act.title}"], + # 안전 자료실 ingest 4축 (R8-M1 — classify-skip 경로라 ingest 시점 필수) + material_type="law", + jurisdiction=kr.JURISDICTION, + published_date=eff or prom, + extract_meta={ + "statute": {"family_id": act.family_id, "law_id": act.native_id, + "kind": payload.law_doc_kind, "version_key": payload.version_key, + "annex_label": payload.annex_label, + "event_kind": ev.kind, "revision_type": ev.revision_type}, + "license": dict(_LICENSE), + **extra, + }, + ) + session.add(doc) + await session.flush() + + session.add(LegalMeta( + document_id=doc.id, + family_id=act.family_id, + law_doc_kind=payload.law_doc_kind, + version_key=payload.version_key, + promulgation_date=prom, + effective_date=eff, + version_status="pending", # 전 버전 pending 적재 — 승격은 생애주기 잡만 + )) + # summarize 안 함 (조문 자체가 정본 — 맥미니 부하 0), embed+chunk 만 + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + return True + + +# ─── 생애주기 잡 (전이·supersede·스윕·repeal 의 유일한 코드 지점) ──────────────── + +async def _flip_chunks(session, doc_ids: list[int]) -> int: + if not doc_ids: + return 0 + result = await session.execute( + update(DocumentChunk) + .where(DocumentChunk.doc_id.in_(doc_ids), DocumentChunk.in_corpus.is_(True)) + .values(in_corpus=False) + ) + return result.rowcount or 0 + + +async def _legacy_doc_ids(session, act: LegalAct) -> list[int]: + """법령명 매핑 레거시(law_monitor) 문서 id — 정규화 동등 비교 (prefix 금지).""" + result = await session.execute( + select(Document.id, Document.title).where( + Document.source_channel == "law_monitor", + Document.deleted_at.is_(None), + ) + ) + want = normalize_law_name(act.title) + ids = [] + for doc_id, title in result.all(): + name = legacy_law_name(title or "") + if name and normalize_law_name(name) == want: + ids.append(doc_id) + return ids + + +async def run_lifecycle(session) -> dict: + """일 1회 생애주기 잡 — 호출측이 단일 트랜잭션 commit. KST 기준, 멱등.""" + today = datetime.now(_KST).date() + stats = {"promoted": 0, "superseded": 0, "repealed": 0, + "legacy_flipped_docs": 0, "legacy_flipped_chunks": 0} + + acts_result = await session.execute(select(LegalAct).where(LegalAct.watch.is_(True))) + acts = {a.family_id: a for a in acts_result.scalars().all()} + + lm_result = await session.execute( + select(LegalMeta).where(LegalMeta.family_id.in_(list(acts.keys()))) + ) + metas = lm_result.scalars().all() + + # 1) repeal — 마킹된 family: current+pending 전부 repealed + 청크 flip + 레거시 flip (R7-M2) + repeal_families = {fid for fid, a in acts.items() if a.repeal_detected_at is not None} + for fid in repeal_families: + rows = [m for m in metas if m.family_id == fid and m.version_status in ("pending", "current")] + for m in rows: + m.version_status = "repealed" + stats["repealed"] += 1 + await _flip_chunks(session, [m.document_id for m in rows]) + legacy_ids = await _legacy_doc_ids(session, acts[fid]) + stats["legacy_flipped_chunks"] += await _flip_chunks(session, legacy_ids) + + # 2) 승격 + supersede — 버전 시리즈 단위 (R7-B1 a: family 단위 금지) + series: dict[tuple, list[LegalMeta]] = {} + for m in metas: + if m.family_id in repeal_families: + continue + series.setdefault( + (m.family_id, m.law_doc_kind, series_suffix(m.version_key)), [] + ).append(m) + + for key, rows in series.items(): + due = sorted( + (m for m in rows if m.version_status == "pending" + and (m.effective_date or m.promulgation_date) + and (m.effective_date or m.promulgation_date) <= today), + key=lambda m: (m.effective_date or m.promulgation_date), + ) + for m in due: + prev = [c for c in rows if c.version_status == "current" and c is not m] + for c in prev: + c.version_status = "superseded" + stats["superseded"] += 1 + await _flip_chunks(session, [c.document_id for c in prev]) + m.version_status = "current" + stats["promoted"] += 1 + + # 3) 레거시 스윕 — 상태 기반 (R6-B1 a / R7-B1 b: primary 시리즈 current 보유 한정) + for fid, act in acts.items(): + if fid in repeal_families: + continue + has_primary_current = any( + m.family_id == fid and m.law_doc_kind == "primary" and m.version_status == "current" + for m in metas + ) + if not has_primary_current: + continue # R3-B1 ② 내장 — fetch 실패 family 의 레거시 보존 + legacy_ids = await _legacy_doc_ids(session, act) + flipped = await _flip_chunks(session, legacy_ids) + if flipped: + stats["legacy_flipped_docs"] += len(legacy_ids) + stats["legacy_flipped_chunks"] += flipped + + return stats + + +# ─── 메인 런 ───────────────────────────────────────────────────────────────────── + +async def run(bootstrap: bool = False) -> None: + """poll → fetch → ingest(가족 단위 커밋) → 생애주기 잡. 가족 단위 실패 격리.""" async with async_session() as session: result = await session.execute( - select(LegalAct).where(LegalAct.watch.is_(True)) - .order_by(LegalAct.family_id) + select(LegalAct).where(LegalAct.watch.is_(True)).order_by(LegalAct.family_id) ) rows = result.scalars().all() + if not rows: + logger.warning("[statute] 워치리스트 비어 있음 — 시드(migration 356) 미적용?") + return + source = await _get_source(session) + await session.commit() + source_id = source.id - if not rows: - logger.warning("[statute] 워치리스트 비어 있음 — 시드(migration 356) 미적용?") - return 0 - - total = 0 + ingested = 0 + failed = 0 by_jur: dict[str, list] = {} for row in rows: by_jur.setdefault(row.jurisdiction, []).append(row) - async with httpx.AsyncClient(timeout=30) as client: + async with httpx.AsyncClient(timeout=60) as client: for jur, acts in by_jur.items(): adapter = _ADAPTERS.get(jur) if adapter is None: - logger.warning(f"[statute] 어댑터 없는 jurisdiction skip: {jur} ({len(acts)}건)") + logger.warning(f"[statute] 어댑터 없는 jurisdiction skip: {jur}") continue - # jurisdiction 불변식 — 어댑터 상수와 행 값 일치 (적재 전 단계에서도 동일 규율) - assert adapter.JURISDICTION == jur, f"어댑터/행 jurisdiction 불일치: {adapter.JURISDICTION} != {jur}" - events = await adapter.poll_changes(client, acts) - for ev in events: - logger.info( - f"[statute] 변경 감지 ({ev.kind}): {ev.family_id} {ev.title} " - f"MST={ev.new_version_key} 공포={ev.promulgation_date} " - f"시행={ev.effective_date} 구분={ev.revision_type}" - ) - total += len(events) + assert adapter.JURISDICTION == jur, \ + f"어댑터/행 jurisdiction 불일치: {adapter.JURISDICTION} != {jur}" - logger.info(f"[statute] poll 완료 — 워치 {len(rows)}건 중 변경 {total}건 (관찰 전용, 영속 0)") + events = await adapter.poll_changes(client, acts) + acts_by_id = {a.family_id: a for a in acts} + for ev in events: + if bootstrap: + ev.kind = "bootstrap" # 합성 이벤트 — amend 와 동일 경로 (R6-m2) + act_ref = acts_by_id[ev.family_id] + try: + payloads = await adapter.fetch_version(client, act_ref, ev) + async with async_session() as session: + act = await session.get(LegalAct, ev.family_id) + new_docs = 0 + for p in payloads: + if await _ingest_payload(session, act, ev, p, backfill=bootstrap): + new_docs += 1 + # 워터마크 영속 = 파싱 검증(payload floor) 통과 후에만 + act.watermark = ev.new_version_key + if ev.kind == "repeal": + act.repeal_detected_at = datetime.now(timezone.utc) + await session.commit() + ingested += new_docs + logger.info(f"[statute] ingest {ev.family_id} ({ev.kind}): " + f"payload {len(payloads)}건 중 신규 {new_docs}건") + except Exception as e: + failed += 1 + logger.error(f"[statute] ingest 실패 ({ev.family_id}): " + f"{type(e).__name__}: {e!r} — 워터마크 미영속, 다음 폴링 재감지") + await asyncio.sleep(_FETCH_DELAY_S) + + # 생애주기 잡 — 수집 사이클 직후, 단일 트랜잭션 (0-2 ②) + async with async_session() as session: + stats = await run_lifecycle(session) + await session.commit() + logger.info(f"[statute] lifecycle: {stats}") + + # health — fail-loud 가시성 (HC.io 폐기로 보드/health 행이 1차 관측면) + async with async_session() as session: + h = await _get_or_create_health(session, source_id) + now = datetime.now(timezone.utc) + if failed: + _record_failure(h, f"ingest 실패 {failed}건", now) + else: + _record_success(h, ingested, False, now) + await session.commit() + + logger.info(f"[statute] run 완료 — 신규 문서 {ingested}건, 실패 {failed}건" + + (" (bootstrap)" if bootstrap else "")) + + +async def _get_source(session) -> NewsSource: + result = await session.execute(select(NewsSource).where(NewsSource.name == _SOURCE_NAME)) + source = result.scalars().first() + if source is None: + source = NewsSource( + name=_SOURCE_NAME, feed_url=kr.LAW_SEARCH_URL, feed_type="rss", + fetch_method="api", fulltext_policy="none", source_channel="crawl", + category="Safety", language="ko", country="KR", + enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링 + ) + session.add(source) + await session.flush() + return source + + +async def poll_once() -> int: + """관찰 전용 폴링 (PR① 잔존 CLI — 상태 변경 0).""" + async with async_session() as session: + result = await session.execute( + select(LegalAct).where(LegalAct.watch.is_(True)).order_by(LegalAct.family_id) + ) + rows = result.scalars().all() + total = 0 + async with httpx.AsyncClient(timeout=30) as client: + events = await kr.poll_changes(client, [r for r in rows if r.jurisdiction == "KR"]) + for ev in events: + logger.info(f"[statute] 변경 감지 ({ev.kind}): {ev.family_id} {ev.title} " + f"MST={ev.new_version_key}") + total = len(events) + logger.info(f"[statute] poll 완료 — 변경 {total}건 (관찰 전용)") return total if __name__ == "__main__": - asyncio.run(poll_once()) + parser = argparse.ArgumentParser() + parser.add_argument("--bootstrap", action="store_true", + help="26 family 현행판 1회 부트스트랩 (backfill 마커, R4-M1)") + parser.add_argument("--poll-only", action="store_true", help="관찰 전용 폴링") + args = parser.parse_args() + if args.poll_only: + asyncio.run(poll_once()) + else: + asyncio.run(run(bootstrap=args.bootstrap)) diff --git a/tests/test_statute_lifecycle_units.py b/tests/test_statute_lifecycle_units.py new file mode 100644 index 0000000..afe2778 --- /dev/null +++ b/tests/test_statute_lifecycle_units.py @@ -0,0 +1,87 @@ +"""B-1 PR② — 매핑·시리즈·payload 순수 단위 테스트 (plan safety-library-1). + +법령명 매핑 단위 테스트 = R8-B1 동반 계약 (검증(PR③) 전에 스윕이 도는 만큼 +매핑은 코드 레벨 선고정). 실 title 표본 = 2026-06-13 prod documents 실측 형태. +""" + +import gzip +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent / "app")) + +from workers.statute_adapters.kr import parse_service_payloads # noqa: E402 +from workers.statute_collector import ( # noqa: E402 + legacy_law_name, + normalize_law_name, + series_suffix, +) + +FIX = Path(__file__).parent / "fixtures" / "statute_kr" + + +# ─── 법령명 매핑 (실 title 표본) ─── + +def test_legacy_law_name_extraction(): + # prod 실측 형태: '법령명 (YYYYMMDD) 섹션' + assert legacy_law_name("건설기술 진흥법 시행규칙 (20260611) 제6장_보칙") == "건설기술 진흥법 시행규칙" + assert legacy_law_name("산업안전보건법 (20260219) 전문") == "산업안전보건법" + assert legacy_law_name("패턴 불일치 제목") is None + + +def test_mapping_equality_not_prefix(): + """prefix 비교 금지 — '산업안전보건법' family 가 시행령 레거시를 오폭하면 안 됨.""" + name = legacy_law_name("산업안전보건법 시행령 (20260324) 제1장") + assert name == "산업안전보건법 시행령" + assert normalize_law_name(name) != normalize_law_name("산업안전보건법") + assert normalize_law_name(name) == normalize_law_name("산업안전보건법 시행령") + + +def test_mapping_absorbs_middle_dot_and_space(): + """가운뎃점·공백 변형 흡수 — 유해ㆍ위험작업(정식) vs 유해위험작업(law_monitor 표기).""" + assert (normalize_law_name("유해ㆍ위험작업의 취업 제한에 관한 규칙") + == normalize_law_name("유해위험작업의 취업 제한에 관한 규칙")) + assert (normalize_law_name("산업안전보건기준에 관한 규칙") + == normalize_law_name("산업안전보건기준에관한규칙")) + + +# ─── 버전 시리즈 식별자 (R7-B1 a) ─── + +def test_series_suffix(): + assert series_suffix("283449") is None # primary + assert series_suffix("273603|별표0001-00") == "별표0001-00" # annex (구분 차원 포함) + assert series_suffix("273603|서식0003-00") == "서식0003-00" + + +# ─── fetch_version payload (fixture — R4-M4 리스트 계약) ─── + +def _read_gz(name: str) -> str: + return gzip.decompress((FIX / name).read_bytes()).decode("utf-8") + + +def test_parse_service_payloads_rule_with_annexes(): + payloads = parse_service_payloads( + _read_gz("lawservice_rule.xml.gz"), "산업안전보건기준에 관한 규칙", "273603") + assert payloads[0].law_doc_kind == "primary" + assert payloads[0].version_key == "273603" + assert len(payloads[0].content) > 100_000 # 853조 본문 + annexes = [p for p in payloads if p.law_doc_kind == "annex"] + # 별표단위 23 중 삭제 tombstone 3 skip(별표10 '삭제 <2023.11.14>'·서식1·2 '삭제 <2012.3.5>') + # — KR 별표/서식 삭제 = absence 아닌 명시 tombstone (R7-M3 absence 추론 불요의 fixture 증거) + assert len(annexes) == 20 + keys = [p.version_key for p in annexes] + assert len(keys) == len(set(keys)), "annex version_key 유일성 (uq_legal_meta_version 전제)" + assert all(k.startswith("273603|") for k in keys) + # 구분 차원 — 별표1 vs 서식N 공존 (fixture 실측: (번호,가지)만으로는 4건 충돌) + assert any("별표" in k for k in keys) and any("서식" in k for k in keys) + + +def test_parse_service_payloads_sanab_no_annex(): + payloads = parse_service_payloads( + _read_gz("lawservice_sanab.xml.gz"), "산업안전보건법", "283449") + assert len(payloads) == 1 # 별표 없는 법령 = primary 단독 + p = payloads[0] + assert p.promulgation_date == "20260219" + assert p.effective_date == "20260601" + assert "제2조(정의)" in p.content # 조문내용 보존 + assert p.content.startswith("# 산업안전보건법")