Files
hyungi_document_server/app/workers/statute_collector.py
T
hyungi bacb36924b feat(safety): B-1 PR② — fetch_version(payload 리스트) + ingest 4축 + 생애주기 잡 통째 + 부트스트랩
plan safety-library-1 B-1 PR② (R8-B1: 승격·supersede·스윕·repeal = 잡 코드 통째 배포):
- kr.fetch_version: 전문 1콜 → primary+annex payload 리스트 (R4-M4)
  ★fixture 가 잡은 결함 2: 별표구분(별표/서식) 차원 누락 시 (번호,가지) 4건 충돌
  → version_key='MST|{구분}{번호}-{가지}' / 삭제 tombstone 3건(별표10·서식1·2) skip
  — KR 별표 삭제 = absence 아닌 명시 tombstone (R7-M3 absence 추론 불요 확정)
- ingest: 전 버전 pending 적재 + 4축(law/KR/COALESCE날짜/public_domain) + backfill 마커
- 생애주기 잡: 버전 시리즈 단위 승격·supersede(R7-B1) + 상태 기반 레거시 스윕(primary
  current 보유 한정) + repeal(레거시 매핑분 포함, R7-M2) — 단일 트랜잭션·KST
- 법령명 매핑: 정규화 동등 비교(prefix 금지 — 시행령 오폭 차단), 가운뎃점·공백 흡수
- 워터마크 = 파싱 검증 통과 후에만 / 스케줄 daily 07:00 KST (law_monitor 슬롯 승계)
- 테스트 14/14 (매핑 표본·시리즈 키·payload fixture)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 09:37:51 +09:00

382 lines
17 KiB
Python

"""statute_collector — 법령 수집 코어 (plan safety-library-1 B-1, PR②).
구성 (잡 코드 통째 — R8-B1: 승격과 스윕의 PR 분리 = 배포 갭 이중 노출 윈도):
poll_changes(어댑터) → fetch_version(전문 1콜, payload 리스트) → ingest(전 버전
pending 적재 + 4축 주입) → 생애주기 잡(버전 시리즈 단위 승격·supersede + 상태 기반
레거시 스윕 + repeal — 단일 트랜잭션, KST 기준).
핵심 계약 (카드 = 스펙):
- 워터마크 영속 = ingest 파싱 검증 통과 후에만 (실패 시 다음 폴링이 재감지)
- 승격·supersede 단위 = 버전 시리즈 = (family_id, law_doc_kind, annex 식별자)
— R7-B1: family 단위 구현 금지 (annex 승격이 primary 를 소거하는 본문 소실 경로)
- 레거시 스윕 = 상태 기반: 매 잡 실행, primary 시리즈 current 보유 + repeal 미감지
family 의 법령명 매핑 레거시(law_monitor 스냅샷) 청크 in_corpus=false (멱등)
- 매핑 = 정확 일치 가정 금지: title 의 '법령명 (YYYYMMDD)' 패턴에서 법령명 추출 후
정규화(공백·가운뎃점 변형 흡수) **동등** 비교 — prefix 비교 금지 ('산업안전보건법'
'산업안전보건법 시행령' 레거시를 오폭하는 경로 차단)
- ingest 4축 (R8-M1): material_type='law' / jurisdiction=어댑터 상수 /
published_date=COALESCE(시행일, 공포일) / license=public_domain(저작권법 제7조)
- 부트스트랩(--bootstrap) = kind='bootstrap' 합성 이벤트, amend 와 동일 경로 +
extract_meta.backfill=true (E-1 게이트 집계 제외 마커)
- 가시성: source_health 성공/실패 기록 (HC.io 는 2026-05-30 알림 레이어 폐기로 부재 —
silent-skip 가드 정신은 crawl-health 보드 + health 행으로 대체)
실행:
스케줄 = daily 07:00 KST (main.py — 구 law_monitor 슬롯 승계)
수동 = docker compose exec -T fastapi python -m workers.statute_collector [--bootstrap]
"""
import argparse
import asyncio
import hashlib
import re
import unicodedata
from datetime import date, datetime, timezone
from zoneinfo import ZoneInfo
import httpx
from sqlalchemy import select, update
from core.database import async_session
from core.utils import setup_logger
from models.chunk import DocumentChunk
from models.document import Document
from models.legal_act import LegalAct, LegalMeta
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.news_collector import _get_or_create_health, _record_failure, _record_success
from workers.statute_adapters import ChangeEvent, VersionPayload
from workers.statute_adapters import kr
logger = setup_logger("statute_collector")
_KST = ZoneInfo("Asia/Seoul")
_SOURCE_NAME = "KR 법령 (law.go.kr)"
_LICENSE = {"scheme": "public_domain", "redistribute": True, "attribution": "국가법령정보센터"}
_FETCH_DELAY_S = 2.5 # lawService 전문(최대 ~1.3MB) 연속 호출 간격
# jurisdiction → 어댑터 모듈 (Phase 1 = KR 단독, 해외는 B-5 게이트 뒤)
_ADAPTERS = {"KR": kr}
# ─── 법령명 매핑 (R8-m1: 정확 일치 가정 금지 — 변형 흡수 정규화 + 동등 비교) ───
_LEGACY_TITLE_RE = re.compile(r"^(.*?)\s*\((\d{8})\)")
def normalize_law_name(name: str) -> str:
"""공백·가운뎃점 변형 흡수 — NFC 정규화 후 공백/ㆍ·・ 제거."""
s = unicodedata.normalize("NFC", name or "")
return re.sub(r"[\sㆍ·・]", "", s)
def legacy_law_name(title: str) -> str | None:
"""레거시 law_monitor title('법령명 (YYYYMMDD) 섹션')에서 법령명 추출."""
m = _LEGACY_TITLE_RE.match(title or "")
return m.group(1).strip() if m else None
def series_suffix(version_key: str) -> str | None:
"""버전 시리즈의 annex 식별자 — version_key 'MST|NNNN-SS''|' 뒤 (primary=None)."""
return version_key.split("|", 1)[1] if "|" in version_key else None
def _to_date(ymd: str | None) -> date | None:
digits = re.sub(r"\D", "", ymd or "")
if len(digits) != 8:
return None
try:
return date(int(digits[:4]), int(digits[4:6]), int(digits[6:8]))
except ValueError:
return None
# ─── ingest (전 버전 pending 적재 — R2-B2/R3 계약) ──────────────────────────────
async def _ingest_payload(session, act: LegalAct, ev: ChangeEvent,
payload: VersionPayload, backfill: bool) -> bool:
"""payload 1건 → Document + legal_meta(pending). 반환 = 신규 여부 (dedup 멱등)."""
fhash = hashlib.sha256(
f"statute|{act.jurisdiction}|{act.native_id}|{payload.version_key}".encode()
).hexdigest()[:32]
existing = await session.execute(
select(Document.id).where(Document.file_hash == fhash).limit(1)
)
if existing.scalars().first():
return False
prom = _to_date(payload.promulgation_date or ev.promulgation_date)
eff = _to_date(payload.effective_date or ev.effective_date)
now = datetime.now(timezone.utc)
extra = {"backfill": True} if backfill else {}
doc = Document(
file_path=f"crawl/statute/{act.family_id}/{payload.version_key.replace('|', '_')}",
file_hash=fhash,
file_format="article",
file_size=len(payload.content.encode()),
file_type="note",
title=f"{payload.title} ({payload.promulgation_date or ev.promulgation_date or ''})".strip(),
extracted_text=payload.content,
extracted_at=now,
extractor_version="statute_kr@law.go.kr",
md_status="skipped",
md_extraction_error="statute: 텍스트 네이티브, markdown 변환 비대상",
source_channel="crawl",
data_origin="external",
review_status="approved",
ai_domain="법령",
ai_sub_group=act.title,
ai_tags=[f"법령/KR/{act.title}"],
# 안전 자료실 ingest 4축 (R8-M1 — classify-skip 경로라 ingest 시점 필수)
material_type="law",
jurisdiction=kr.JURISDICTION,
published_date=eff or prom,
extract_meta={
"statute": {"family_id": act.family_id, "law_id": act.native_id,
"kind": payload.law_doc_kind, "version_key": payload.version_key,
"annex_label": payload.annex_label,
"event_kind": ev.kind, "revision_type": ev.revision_type},
"license": dict(_LICENSE),
**extra,
},
)
session.add(doc)
await session.flush()
session.add(LegalMeta(
document_id=doc.id,
family_id=act.family_id,
law_doc_kind=payload.law_doc_kind,
version_key=payload.version_key,
promulgation_date=prom,
effective_date=eff,
version_status="pending", # 전 버전 pending 적재 — 승격은 생애주기 잡만
))
# summarize 안 함 (조문 자체가 정본 — 맥미니 부하 0), embed+chunk 만
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return True
# ─── 생애주기 잡 (전이·supersede·스윕·repeal 의 유일한 코드 지점) ────────────────
async def _flip_chunks(session, doc_ids: list[int]) -> int:
if not doc_ids:
return 0
result = await session.execute(
update(DocumentChunk)
.where(DocumentChunk.doc_id.in_(doc_ids), DocumentChunk.in_corpus.is_(True))
.values(in_corpus=False)
)
return result.rowcount or 0
async def _legacy_doc_ids(session, act: LegalAct) -> list[int]:
"""법령명 매핑 레거시(law_monitor) 문서 id — 정규화 동등 비교 (prefix 금지)."""
result = await session.execute(
select(Document.id, Document.title).where(
Document.source_channel == "law_monitor",
Document.deleted_at.is_(None),
)
)
want = normalize_law_name(act.title)
ids = []
for doc_id, title in result.all():
name = legacy_law_name(title or "")
if name and normalize_law_name(name) == want:
ids.append(doc_id)
return ids
async def run_lifecycle(session) -> dict:
"""일 1회 생애주기 잡 — 호출측이 단일 트랜잭션 commit. KST 기준, 멱등."""
today = datetime.now(_KST).date()
stats = {"promoted": 0, "superseded": 0, "repealed": 0,
"legacy_flipped_docs": 0, "legacy_flipped_chunks": 0}
acts_result = await session.execute(select(LegalAct).where(LegalAct.watch.is_(True)))
acts = {a.family_id: a for a in acts_result.scalars().all()}
lm_result = await session.execute(
select(LegalMeta).where(LegalMeta.family_id.in_(list(acts.keys())))
)
metas = lm_result.scalars().all()
# 1) repeal — 마킹된 family: current+pending 전부 repealed + 청크 flip + 레거시 flip (R7-M2)
repeal_families = {fid for fid, a in acts.items() if a.repeal_detected_at is not None}
for fid in repeal_families:
rows = [m for m in metas if m.family_id == fid and m.version_status in ("pending", "current")]
for m in rows:
m.version_status = "repealed"
stats["repealed"] += 1
await _flip_chunks(session, [m.document_id for m in rows])
legacy_ids = await _legacy_doc_ids(session, acts[fid])
stats["legacy_flipped_chunks"] += await _flip_chunks(session, legacy_ids)
# 2) 승격 + supersede — 버전 시리즈 단위 (R7-B1 a: family 단위 금지)
series: dict[tuple, list[LegalMeta]] = {}
for m in metas:
if m.family_id in repeal_families:
continue
series.setdefault(
(m.family_id, m.law_doc_kind, series_suffix(m.version_key)), []
).append(m)
for key, rows in series.items():
due = sorted(
(m for m in rows if m.version_status == "pending"
and (m.effective_date or m.promulgation_date)
and (m.effective_date or m.promulgation_date) <= today),
key=lambda m: (m.effective_date or m.promulgation_date),
)
for m in due:
prev = [c for c in rows if c.version_status == "current" and c is not m]
for c in prev:
c.version_status = "superseded"
stats["superseded"] += 1
await _flip_chunks(session, [c.document_id for c in prev])
m.version_status = "current"
stats["promoted"] += 1
# 3) 레거시 스윕 — 상태 기반 (R6-B1 a / R7-B1 b: primary 시리즈 current 보유 한정)
for fid, act in acts.items():
if fid in repeal_families:
continue
has_primary_current = any(
m.family_id == fid and m.law_doc_kind == "primary" and m.version_status == "current"
for m in metas
)
if not has_primary_current:
continue # R3-B1 ② 내장 — fetch 실패 family 의 레거시 보존
legacy_ids = await _legacy_doc_ids(session, act)
flipped = await _flip_chunks(session, legacy_ids)
if flipped:
stats["legacy_flipped_docs"] += len(legacy_ids)
stats["legacy_flipped_chunks"] += flipped
return stats
# ─── 메인 런 ─────────────────────────────────────────────────────────────────────
async def run(bootstrap: bool = False) -> None:
"""poll → fetch → ingest(가족 단위 커밋) → 생애주기 잡. 가족 단위 실패 격리."""
async with async_session() as session:
result = await session.execute(
select(LegalAct).where(LegalAct.watch.is_(True)).order_by(LegalAct.family_id)
)
rows = result.scalars().all()
if not rows:
logger.warning("[statute] 워치리스트 비어 있음 — 시드(migration 356) 미적용?")
return
source = await _get_source(session)
await session.commit()
source_id = source.id
ingested = 0
failed = 0
by_jur: dict[str, list] = {}
for row in rows:
by_jur.setdefault(row.jurisdiction, []).append(row)
async with httpx.AsyncClient(timeout=60) as client:
for jur, acts in by_jur.items():
adapter = _ADAPTERS.get(jur)
if adapter is None:
logger.warning(f"[statute] 어댑터 없는 jurisdiction skip: {jur}")
continue
assert adapter.JURISDICTION == jur, \
f"어댑터/행 jurisdiction 불일치: {adapter.JURISDICTION} != {jur}"
events = await adapter.poll_changes(client, acts)
acts_by_id = {a.family_id: a for a in acts}
for ev in events:
if bootstrap:
ev.kind = "bootstrap" # 합성 이벤트 — amend 와 동일 경로 (R6-m2)
act_ref = acts_by_id[ev.family_id]
try:
payloads = await adapter.fetch_version(client, act_ref, ev)
async with async_session() as session:
act = await session.get(LegalAct, ev.family_id)
new_docs = 0
for p in payloads:
if await _ingest_payload(session, act, ev, p, backfill=bootstrap):
new_docs += 1
# 워터마크 영속 = 파싱 검증(payload floor) 통과 후에만
act.watermark = ev.new_version_key
if ev.kind == "repeal":
act.repeal_detected_at = datetime.now(timezone.utc)
await session.commit()
ingested += new_docs
logger.info(f"[statute] ingest {ev.family_id} ({ev.kind}): "
f"payload {len(payloads)}건 중 신규 {new_docs}")
except Exception as e:
failed += 1
logger.error(f"[statute] ingest 실패 ({ev.family_id}): "
f"{type(e).__name__}: {e!r} — 워터마크 미영속, 다음 폴링 재감지")
await asyncio.sleep(_FETCH_DELAY_S)
# 생애주기 잡 — 수집 사이클 직후, 단일 트랜잭션 (0-2 ②)
async with async_session() as session:
stats = await run_lifecycle(session)
await session.commit()
logger.info(f"[statute] lifecycle: {stats}")
# health — fail-loud 가시성 (HC.io 폐기로 보드/health 행이 1차 관측면)
async with async_session() as session:
h = await _get_or_create_health(session, source_id)
now = datetime.now(timezone.utc)
if failed:
_record_failure(h, f"ingest 실패 {failed}", now)
else:
_record_success(h, ingested, False, now)
await session.commit()
logger.info(f"[statute] run 완료 — 신규 문서 {ingested}건, 실패 {failed}"
+ (" (bootstrap)" if bootstrap else ""))
async def _get_source(session) -> NewsSource:
result = await session.execute(select(NewsSource).where(NewsSource.name == _SOURCE_NAME))
source = result.scalars().first()
if source is None:
source = NewsSource(
name=_SOURCE_NAME, feed_url=kr.LAW_SEARCH_URL, feed_type="rss",
fetch_method="api", fulltext_policy="none", source_channel="crawl",
category="Safety", language="ko", country="KR",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링
)
session.add(source)
await session.flush()
return source
async def poll_once() -> int:
"""관찰 전용 폴링 (PR① 잔존 CLI — 상태 변경 0)."""
async with async_session() as session:
result = await session.execute(
select(LegalAct).where(LegalAct.watch.is_(True)).order_by(LegalAct.family_id)
)
rows = result.scalars().all()
total = 0
async with httpx.AsyncClient(timeout=30) as client:
events = await kr.poll_changes(client, [r for r in rows if r.jurisdiction == "KR"])
for ev in events:
logger.info(f"[statute] 변경 감지 ({ev.kind}): {ev.family_id} {ev.title} "
f"MST={ev.new_version_key}")
total = len(events)
logger.info(f"[statute] poll 완료 — 변경 {total}건 (관찰 전용)")
return total
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--bootstrap", action="store_true",
help="26 family 현행판 1회 부트스트랩 (backfill 마커, R4-M1)")
parser.add_argument("--poll-only", action="store_true", help="관찰 전용 폴링")
args = parser.parse_args()
if args.poll_only:
asyncio.run(poll_once())
else:
asyncio.run(run(bootstrap=args.bootstrap))