diff --git a/scripts/backfill_material_axis.py b/scripts/backfill_material_axis.py new file mode 100644 index 0000000..a27fd80 --- /dev/null +++ b/scripts/backfill_material_axis.py @@ -0,0 +1,219 @@ +"""안전 자료실 A-3 백필 — 기존 코퍼스에 material_type/jurisdiction/published_date/license 소급. + +plan: safety-library-1 A-3 (PKM plans/2026-06-12-safety-library-plan.html) +선례: backfill_category.py (one-off 멱등 스크립트 — migration 아님, 152 단일 트랜잭션 제약 회피) + +술어 (2026-06-13 prod 실측 교정 — R2 blocker 반영): + 1. extract_meta.source_id JOIN news_sources → 레지스트리 material_type/country 전파 + (KOSHA 사례 본문·CSB 페이지·HSE·MOEL·JPVT·arXiv·NB·TWI·API 공지 전부 커버. + paper 는 jurisdiction NULL 강제 — plan 0-1. KOSHA 본문의 kosha.kind='case' 가정은 + 실측 부정됨: kind 는 첨부/GUIDE 에만 존재 → source_id JOIN 이 정본 술어) + 2. kosha.kind='case_attachment' → incident/KR + 3. kosha.kind='guide' → guide/KR (+ ofancYmd 'YYYY-MM-DD' 실측) + 4. csb.kind='report_pdf' → incident/US (source_id 없음 — JOIN 비대상) + 5. source_channel='law_monitor' → law/KR (243건. legal_meta 생략 — MST 미보존, + 버전 체인은 B-1 가동 시점부터. published_date = title 의 '(YYYYMMDD)' 공포일 추출 — + extract_meta 빈값 실측, R3-m1 의 'NULL 허용' 보다 1줄 정규식이 저렴해 채움) + 6. file_path LIKE '%KGS_Code%' → law/KR (frontmatter 키 = 'code' 실측 117/118, + 'kgs_code' 0건. 경로 술어가 더 단순·전수. license 는 B-4 소관 — 미주입) + +불변식: + - 전 UPDATE 에 material_type IS NULL 가드 (멱등 — 재실행 안전, A-2 신규 유입분 무접촉) + - material_type + jurisdiction 동일 statement (law CHECK chk_documents_law_jurisdiction 충족) + - published_date / license 는 각자 필드 부재 가드 (이미 값 있으면 무접촉) + - 업로드 Industrial_Safety 문서 = 대상 아님 (LLM 제안+승인 경로만 — 자동 전이 금지) + - 코퍼스(청크/임베딩) 무접촉 — 검색 지표 무변동이 정상 + +실행: + docker compose exec -T fastapi python /app/scripts/backfill_material_axis.py --dry-run + docker compose exec -T fastapi python /app/scripts/backfill_material_axis.py --apply +""" + +import argparse +import asyncio +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine + +# ─── 술어별 (라벨, 카운트 SQL, 적용 SQL) ─────────────────────────────────────── + +_KOSHA_LICENSE = ("kogl", "false", "한국산업안전보건공단(KOSHA)") +_CSB_LICENSE = ("public_domain", "true", "U.S. Chemical Safety Board") +_LAW_LICENSE = ("public_domain", "true", "국가법령정보센터") + + +def _license_obj(scheme: str, redistribute: str, attribution: str) -> str: + return ( + f"jsonb_build_object('license', jsonb_build_object(" + f"'scheme', '{scheme}', 'redistribute', {redistribute}::boolean, " + f"'attribution', '{attribution}'))" + ) + + +STEPS: list[tuple[str, str]] = [ + # 1) 레지스트리 전파 (source_id JOIN) + ("1. src_join material/jurisdiction", """ + UPDATE documents d SET + material_type = ns.material_type, + jurisdiction = CASE WHEN ns.material_type = 'paper' THEN NULL ELSE ns.country END + FROM news_sources ns + WHERE d.material_type IS NULL AND d.deleted_at IS NULL + AND d.extract_meta->>'source_id' ~ '^[0-9]+$' + AND ns.id = (d.extract_meta->>'source_id')::int + AND ns.material_type IS NOT NULL + """), + # 2) KOSHA 첨부 + ("2. kosha 첨부 incident/KR", """ + UPDATE documents SET material_type = 'incident', jurisdiction = 'KR' + WHERE material_type IS NULL AND deleted_at IS NULL + AND extract_meta#>>'{kosha,kind}' = 'case_attachment' + """), + # 3) KOSHA GUIDE + ("3. kosha GUIDE guide/KR", """ + UPDATE documents SET material_type = 'guide', jurisdiction = 'KR' + WHERE material_type IS NULL AND deleted_at IS NULL + AND extract_meta#>>'{kosha,kind}' = 'guide' + """), + # 4) CSB 보고서 PDF + ("4. csb PDF incident/US", """ + UPDATE documents SET material_type = 'incident', jurisdiction = 'US' + WHERE material_type IS NULL AND deleted_at IS NULL + AND extract_meta#>>'{csb,kind}' = 'report_pdf' + """), + # 5) 레거시 law_monitor + ("5. law_monitor law/KR", """ + UPDATE documents SET material_type = 'law', jurisdiction = 'KR' + WHERE material_type IS NULL AND deleted_at IS NULL + AND source_channel = 'law_monitor' + """), + # 6) KGS Code watch 폴더 + ("6. KGS law/KR", """ + UPDATE documents SET material_type = 'law', jurisdiction = 'KR' + WHERE material_type IS NULL AND deleted_at IS NULL + AND file_path LIKE '%KGS_Code%' + """), + # 7) published_date — crawl/news 공통 (extract_meta.published_at ISO) + ("7. published_date (published_at)", """ + UPDATE documents SET published_date = (extract_meta->>'published_at')::date + WHERE published_date IS NULL AND deleted_at IS NULL + AND extract_meta->>'published_at' ~ '^\\d{4}-\\d{2}-\\d{2}' + """), + # 8) published_date — KOSHA GUIDE 공표일자 ('YYYY-MM-DD' 실측) + ("8. published_date (GUIDE ofancYmd)", """ + UPDATE documents SET published_date = (extract_meta#>>'{kosha,ofancYmd}')::date + WHERE published_date IS NULL AND deleted_at IS NULL + AND extract_meta#>>'{kosha,ofancYmd}' ~ '^\\d{4}-\\d{2}-\\d{2}$' + """), + # 9) published_date — 레거시 law title 공포일 '(YYYYMMDD)' + ("9. published_date (law title 공포일)", """ + UPDATE documents + SET published_date = to_date(substring(title from '\\((20\\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\\d|3[01]))\\)'), 'YYYYMMDD') + WHERE published_date IS NULL AND deleted_at IS NULL + AND source_channel = 'law_monitor' + AND title ~ '\\((20\\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\\d|3[01]))\\)' + """), + # 10) license — 레지스트리 전파 (scheme 있는 소스만) + ("10. license (src_join)", """ + UPDATE documents d SET + extract_meta = COALESCE(d.extract_meta, '{}'::jsonb) + || jsonb_build_object('license', jsonb_build_object( + 'scheme', ns.license_scheme, + 'redistribute', COALESCE(ns.license_redistribute, false), + 'attribution', ns.name)) + FROM news_sources ns + WHERE d.deleted_at IS NULL AND NOT (COALESCE(d.extract_meta, '{}'::jsonb) ? 'license') + AND d.extract_meta->>'source_id' ~ '^[0-9]+$' + AND ns.id = (d.extract_meta->>'source_id')::int + AND ns.license_scheme IS NOT NULL + """), + # 11) license — KOSHA 첨부/GUIDE (source_id 없음) + ("11. license (kosha kinds)", f""" + UPDATE documents SET + extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_KOSHA_LICENSE)} + WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license') + AND extract_meta#>>'{{kosha,kind}}' IN ('case_attachment', 'guide') + """), + # 12) license — CSB PDF + ("12. license (csb PDF)", f""" + UPDATE documents SET + extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_CSB_LICENSE)} + WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license') + AND extract_meta#>>'{{csb,kind}}' = 'report_pdf' + """), + # 13) license — 레거시 법령 (저작권법 제7조 비보호) + ("13. license (law_monitor)", f""" + UPDATE documents SET + extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_LAW_LICENSE)} + WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license') + AND source_channel = 'law_monitor' + """), +] + +VERIFY_SQL = [ + ("축 전수표 (material_type x jurisdiction)", """ + SELECT material_type, jurisdiction, count(*) AS docs, + count(published_date) AS with_date, + count(*) FILTER (WHERE extract_meta ? 'license') AS with_license + FROM documents WHERE material_type IS NOT NULL AND deleted_at IS NULL + GROUP BY 1, 2 ORDER BY 1, 2 + """), + ("law & jurisdiction NULL (0 이어야 함 — hard)", """ + SELECT count(*) FROM documents + WHERE material_type = 'law' AND jurisdiction IS NULL AND deleted_at IS NULL + """), + ("잔여 미분류 안전 후보 (kosha/csb 메타 보유인데 NULL — 0 이어야 함)", """ + SELECT count(*) FROM documents + WHERE material_type IS NULL AND deleted_at IS NULL + AND (extract_meta ? 'kosha' OR extract_meta ? 'csb') + """), +] + + +async def main() -> None: + parser = argparse.ArgumentParser() + mode = parser.add_mutually_exclusive_group(required=True) + mode.add_argument("--dry-run", action="store_true", + help="전 UPDATE 를 트랜잭션 안에서 실행해 정확한 rowcount + 검증표를 보여주고 ROLLBACK (변경 0)") + mode.add_argument("--apply", action="store_true", help="백필 실행 (단일 트랜잭션 커밋)") + args = parser.parse_args() + + db_url = os.getenv( + "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm" + ) + engine = create_async_engine(db_url) + tag = "apply" if args.apply else "dry-run" + + async with engine.connect() as conn: + trans = await conn.begin() + try: + for label, sql in STEPS: + result = await conn.execute(text(sql)) + print(f"[{tag}] {label}: {result.rowcount}행") + + print("\n─── 검증 (트랜잭션 내 미리보기) ───") + for label, sql in VERIFY_SQL: + result = await conn.execute(text(sql)) + rows = result.fetchall() + print(f"\n{label}:") + for row in rows: + print(" ", tuple(row)) + + if args.apply: + await trans.commit() + print("\n[apply] 커밋 완료") + else: + await trans.rollback() + print("\n[dry-run] 전체 롤백 — 변경 0") + except Exception: + await trans.rollback() + raise + + await engine.dispose() + + +if __name__ == "__main__": + asyncio.run(main())