a28f12b12e
plan safety-library-1 B-1 PR① (fixture-first): - law.go.kr 라이브 fixture 5종 박제 (OC 새니타이즈 검증 — 응답 법령상세링크에 키 포함 함정) - R7-M3 판정: 전문 1콜 XML = 조문 853+별표 23 전체 스냅샷(부분 실패 개념 없음) + 별표번호/가지번호 = 구조화 필드 — 조문 취득 = 전문 1콜+로컬 파싱 확정(R2-m1) - legal_acts KR 시드 26행(법령ID 라이브 실측, watch=26 전부, FK 계열 9그룹) ★ '유해ㆍ위험작업...' 정식명 = 가운뎃점 — law_monitor 하드코딩(점 없음)은 영구 미매칭 잠복 - statute_adapters/kr.py: poll_changes(lawSearch MST diff) — 순수 파서 분리, fixture 테스트 8/8 - statute_collector.py: 관찰 전용 코어(워터마크 영속 0 — ingest=PR②), 스케줄 미등록(R8-B1) - main.py: law_monitor 스케줄 제거 — 버전 체인 밖 레거시 매일 증식의 유일 경로 차단 Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
77 lines
3.3 KiB
Python
77 lines
3.3 KiB
Python
"""statute_collector — 법령 수집 코어 (plan safety-library-1 B-1).
|
|
|
|
PR① 범위 (본 파일 현재 상태) = poll_changes 관찰 전용:
|
|
legal_acts 워치리스트(KR, watch=true) 순회 → 어댑터 poll_changes → 감지 이벤트 로그.
|
|
- 워터마크 영속 안 함 (계약: '파싱 검증 통과 후에만' — ingest 는 PR②.
|
|
여기서 영속하면 PR② 가 이 변경들을 영원히 못 봄)
|
|
- 스케줄 미등록 (PR② 에서 잡 코드 통째 — 승격·supersede·스윕·repeal — 와 함께 등록.
|
|
R8-B1: 승격과 스윕의 PR 분리 = 배포 갭 동안 이중 노출 무음 윈도)
|
|
- jurisdiction 불변식: 어댑터 상수 == 행 jurisdiction assert (파싱 추론 금지)
|
|
|
|
PR② 에서 추가될 것 (카드 = 스펙): fetch_version(payload 리스트) + ingest 4축 주입
|
|
(material_type='law'/jurisdiction=어댑터 상수/published_date=COALESCE(시행,공포)/
|
|
license=public_domain) + 생애주기 잡(버전 시리즈 단위 승격·supersede + 상태 기반
|
|
레거시 스윕 + repeal — 한 트랜잭션, KST) + 26 family 부트스트랩(kind='bootstrap',
|
|
extract_meta.backfill=true) + 법령명 매핑 단위 테스트.
|
|
|
|
수동 실행: docker compose exec -T fastapi python -m workers.statute_collector
|
|
"""
|
|
|
|
import asyncio
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.legal_act import LegalAct
|
|
from workers.statute_adapters import kr
|
|
|
|
logger = setup_logger("statute_collector")
|
|
|
|
# jurisdiction → 어댑터 모듈 (Phase 1 = KR 단독, 해외는 B-5 게이트 뒤)
|
|
_ADAPTERS = {"KR": kr}
|
|
|
|
|
|
async def poll_once() -> int:
|
|
"""워치리스트 1회 폴링 — 감지 이벤트 수 반환 (관찰 전용, 상태 변경 0)."""
|
|
async with async_session() as session:
|
|
result = await session.execute(
|
|
select(LegalAct).where(LegalAct.watch.is_(True))
|
|
.order_by(LegalAct.family_id)
|
|
)
|
|
rows = result.scalars().all()
|
|
|
|
if not rows:
|
|
logger.warning("[statute] 워치리스트 비어 있음 — 시드(migration 356) 미적용?")
|
|
return 0
|
|
|
|
total = 0
|
|
by_jur: dict[str, list] = {}
|
|
for row in rows:
|
|
by_jur.setdefault(row.jurisdiction, []).append(row)
|
|
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
for jur, acts in by_jur.items():
|
|
adapter = _ADAPTERS.get(jur)
|
|
if adapter is None:
|
|
logger.warning(f"[statute] 어댑터 없는 jurisdiction skip: {jur} ({len(acts)}건)")
|
|
continue
|
|
# jurisdiction 불변식 — 어댑터 상수와 행 값 일치 (적재 전 단계에서도 동일 규율)
|
|
assert adapter.JURISDICTION == jur, f"어댑터/행 jurisdiction 불일치: {adapter.JURISDICTION} != {jur}"
|
|
events = await adapter.poll_changes(client, acts)
|
|
for ev in events:
|
|
logger.info(
|
|
f"[statute] 변경 감지 ({ev.kind}): {ev.family_id} {ev.title} "
|
|
f"MST={ev.new_version_key} 공포={ev.promulgation_date} "
|
|
f"시행={ev.effective_date} 구분={ev.revision_type}"
|
|
)
|
|
total += len(events)
|
|
|
|
logger.info(f"[statute] poll 완료 — 워치 {len(rows)}건 중 변경 {total}건 (관찰 전용, 영속 0)")
|
|
return total
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(poll_once())
|