0c8fb41366
정규식 '(?:' 의 콜론을 text() 가 bind param 으로 해석 (migration 러너 동일 함정). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
222 lines
10 KiB
Python
222 lines
10 KiB
Python
"""안전 자료실 A-3 백필 — 기존 코퍼스에 material_type/jurisdiction/published_date/license 소급.
|
|
|
|
plan: safety-library-1 A-3 (PKM plans/2026-06-12-safety-library-plan.html)
|
|
선례: backfill_category.py (one-off 멱등 스크립트 — migration 아님, 152 단일 트랜잭션 제약 회피)
|
|
|
|
술어 (2026-06-13 prod 실측 교정 — R2 blocker 반영):
|
|
1. extract_meta.source_id JOIN news_sources → 레지스트리 material_type/country 전파
|
|
(KOSHA 사례 본문·CSB 페이지·HSE·MOEL·JPVT·arXiv·NB·TWI·API 공지 전부 커버.
|
|
paper 는 jurisdiction NULL 강제 — plan 0-1. KOSHA 본문의 kosha.kind='case' 가정은
|
|
실측 부정됨: kind 는 첨부/GUIDE 에만 존재 → source_id JOIN 이 정본 술어)
|
|
2. kosha.kind='case_attachment' → incident/KR
|
|
3. kosha.kind='guide' → guide/KR (+ ofancYmd 'YYYY-MM-DD' 실측)
|
|
4. csb.kind='report_pdf' → incident/US (source_id 없음 — JOIN 비대상)
|
|
5. source_channel='law_monitor' → law/KR (243건. legal_meta 생략 — MST 미보존,
|
|
버전 체인은 B-1 가동 시점부터. published_date = title 의 '(YYYYMMDD)' 공포일 추출 —
|
|
extract_meta 빈값 실측, R3-m1 의 'NULL 허용' 보다 1줄 정규식이 저렴해 채움)
|
|
6. file_path LIKE '%KGS_Code%' → law/KR (frontmatter 키 = 'code' 실측 117/118,
|
|
'kgs_code' 0건. 경로 술어가 더 단순·전수. license 는 B-4 소관 — 미주입)
|
|
|
|
불변식:
|
|
- Axis UPDATEs (steps 1–6) all carry a material_type IS NULL guard (idempotent — safe to re-run; rows newly ingested via A-2 are left untouched)
|
|
- material_type + jurisdiction 동일 statement (law CHECK chk_documents_law_jurisdiction 충족)
|
|
- published_date / license 는 각자 필드 부재 가드 (이미 값 있으면 무접촉)
|
|
- 업로드 Industrial_Safety 문서 = 대상 아님 (LLM 제안+승인 경로만 — 자동 전이 금지)
|
|
- 코퍼스(청크/임베딩) 무접촉 — 검색 지표 무변동이 정상
|
|
|
|
실행:
|
|
docker compose exec -T fastapi python /app/scripts/backfill_material_axis.py --dry-run
|
|
docker compose exec -T fastapi python /app/scripts/backfill_material_axis.py --apply
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
|
|
|
# text() 미사용 — exec_driver_sql (정규식 콜론 함정)
|
|
from sqlalchemy.ext.asyncio import create_async_engine
|
|
|
|
# ─── Per-predicate steps: (label, UPDATE SQL) ───────────────────────────────────
|
|
|
|
_KOSHA_LICENSE = ("kogl", "false", "한국산업안전보건공단(KOSHA)")
|
|
_CSB_LICENSE = ("public_domain", "true", "U.S. Chemical Safety Board")
|
|
_LAW_LICENSE = ("public_domain", "true", "국가법령정보센터")
|
|
|
|
|
|
def _license_obj(scheme: str, redistribute: str, attribution: str) -> str:
|
|
return (
|
|
f"jsonb_build_object('license', jsonb_build_object("
|
|
f"'scheme', '{scheme}', 'redistribute', {redistribute}::boolean, "
|
|
f"'attribution', '{attribution}'))"
|
|
)
|
|
|
|
|
|
# Ordered backfill steps as (label, UPDATE SQL). The statements are executed
# in list order inside a single transaction. Steps 1-6 set
# material_type + jurisdiction together and are guarded by
# `material_type IS NULL`; steps 7-13 fill published_date / license and are
# guarded by the absence of their own target field.
STEPS: list[tuple[str, str]] = [
    # 1) Registry propagation (source_id JOIN news_sources).
    #    The text->number cast lives inside CASE: PostgreSQL does not guarantee
    #    that the sibling regex conjunct is evaluated before the cast, and a
    #    digit string too long for the target type would raise and abort the
    #    whole transaction. Capping at 18 digits and casting to bigint can
    #    never overflow; every id the former ::int cast accepted still matches.
    ("1. src_join material/jurisdiction", """
        UPDATE documents d SET
            material_type = ns.material_type,
            jurisdiction = CASE WHEN ns.material_type = 'paper' THEN NULL ELSE ns.country END
        FROM news_sources ns
        WHERE d.material_type IS NULL AND d.deleted_at IS NULL
          AND d.extract_meta->>'source_id' ~ '^[0-9]{1,18}$'
          AND ns.id = CASE WHEN d.extract_meta->>'source_id' ~ '^[0-9]{1,18}$'
                           THEN (d.extract_meta->>'source_id')::bigint END
          AND ns.material_type IS NOT NULL
    """),
    # 2) KOSHA case attachments
    ("2. kosha 첨부 incident/KR", """
        UPDATE documents SET material_type = 'incident', jurisdiction = 'KR'
        WHERE material_type IS NULL AND deleted_at IS NULL
          AND extract_meta#>>'{kosha,kind}' = 'case_attachment'
    """),
    # 3) KOSHA GUIDE
    ("3. kosha GUIDE guide/KR", """
        UPDATE documents SET material_type = 'guide', jurisdiction = 'KR'
        WHERE material_type IS NULL AND deleted_at IS NULL
          AND extract_meta#>>'{kosha,kind}' = 'guide'
    """),
    # 4) CSB report PDFs
    ("4. csb PDF incident/US", """
        UPDATE documents SET material_type = 'incident', jurisdiction = 'US'
        WHERE material_type IS NULL AND deleted_at IS NULL
          AND extract_meta#>>'{csb,kind}' = 'report_pdf'
    """),
    # 5) Legacy law_monitor documents
    ("5. law_monitor law/KR", """
        UPDATE documents SET material_type = 'law', jurisdiction = 'KR'
        WHERE material_type IS NULL AND deleted_at IS NULL
          AND source_channel = 'law_monitor'
    """),
    # 6) KGS Code watch folder (matched by file path)
    ("6. KGS law/KR", """
        UPDATE documents SET material_type = 'law', jurisdiction = 'KR'
        WHERE material_type IS NULL AND deleted_at IS NULL
          AND file_path LIKE '%KGS_Code%'
    """),
    # 7) published_date from extract_meta.published_at (value must start with YYYY-MM-DD)
    ("7. published_date (published_at)", """
        UPDATE documents SET published_date = (extract_meta->>'published_at')::date
        WHERE published_date IS NULL AND deleted_at IS NULL
          AND extract_meta->>'published_at' ~ '^\\d{4}-\\d{2}-\\d{2}'
    """),
    # 8) published_date from the KOSHA GUIDE ofancYmd field (exactly 'YYYY-MM-DD')
    ("8. published_date (GUIDE ofancYmd)", """
        UPDATE documents SET published_date = (extract_meta#>>'{kosha,ofancYmd}')::date
        WHERE published_date IS NULL AND deleted_at IS NULL
          AND extract_meta#>>'{kosha,ofancYmd}' ~ '^\\d{4}-\\d{2}-\\d{2}$'
    """),
    # 9) published_date from a '(YYYYMMDD)' token in legacy law titles.
    #    The inner groups are non-capturing so substring() returns the single
    #    capturing group, i.e. the eight date digits.
    ("9. published_date (law title 공포일)", """
        UPDATE documents
        SET published_date = to_date(substring(title from '\\((20\\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\\d|3[01]))\\)'), 'YYYYMMDD')
        WHERE published_date IS NULL AND deleted_at IS NULL
          AND source_channel = 'law_monitor'
          AND title ~ '\\((20\\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\\d|3[01]))\\)'
    """),
    # 10) license propagated from the registry (only sources with a scheme).
    #     Same CASE-guarded bigint cast as step 1.
    ("10. license (src_join)", """
        UPDATE documents d SET
            extract_meta = COALESCE(d.extract_meta, '{}'::jsonb)
                || jsonb_build_object('license', jsonb_build_object(
                    'scheme', ns.license_scheme,
                    'redistribute', COALESCE(ns.license_redistribute, false),
                    'attribution', ns.name))
        FROM news_sources ns
        WHERE d.deleted_at IS NULL AND NOT (COALESCE(d.extract_meta, '{}'::jsonb) ? 'license')
          AND d.extract_meta->>'source_id' ~ '^[0-9]{1,18}$'
          AND ns.id = CASE WHEN d.extract_meta->>'source_id' ~ '^[0-9]{1,18}$'
                           THEN (d.extract_meta->>'source_id')::bigint END
          AND ns.license_scheme IS NOT NULL
    """),
    # 11) license for KOSHA attachments / GUIDE (no source_id on these rows)
    ("11. license (kosha kinds)", f"""
        UPDATE documents SET
            extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_KOSHA_LICENSE)}
        WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license')
          AND extract_meta#>>'{{kosha,kind}}' IN ('case_attachment', 'guide')
    """),
    # 12) license for CSB report PDFs
    ("12. license (csb PDF)", f"""
        UPDATE documents SET
            extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_CSB_LICENSE)}
        WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license')
          AND extract_meta#>>'{{csb,kind}}' = 'report_pdf'
    """),
    # 13) license for legacy statutes (not protected under Copyright Act art. 7)
    ("13. license (law_monitor)", f"""
        UPDATE documents SET
            extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_LAW_LICENSE)}
        WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license')
          AND source_channel = 'law_monitor'
    """),
]
|
|
|
|
# Read-only verification queries as (label, SELECT SQL). main() runs them
# after the UPDATE steps, inside the same transaction, and prints every row.
VERIFY_SQL: list[tuple[str, str]] = [
    # Full axis table: per (material_type, jurisdiction) document count, plus
    # how many of those rows have a published_date and a license entry.
    (
        "축 전수표 (material_type x jurisdiction)",
        """
        SELECT material_type, jurisdiction, count(*) AS docs,
               count(published_date) AS with_date,
               count(*) FILTER (WHERE extract_meta ? 'license') AS with_license
        FROM documents WHERE material_type IS NOT NULL AND deleted_at IS NULL
        GROUP BY 1, 2 ORDER BY 1, 2
        """,
    ),
    # Law rows without a jurisdiction; the label states this must be 0.
    (
        "law & jurisdiction NULL (0 이어야 함 — hard)",
        """
        SELECT count(*) FROM documents
        WHERE material_type = 'law' AND jurisdiction IS NULL AND deleted_at IS NULL
        """,
    ),
    # Rows that carry kosha/csb metadata yet remain unclassified; the label
    # states this must be 0.
    (
        "잔여 미분류 안전 후보 (kosha/csb 메타 보유인데 NULL — 0 이어야 함)",
        """
        SELECT count(*) FROM documents
        WHERE material_type IS NULL AND deleted_at IS NULL
          AND (extract_meta ? 'kosha' OR extract_meta ? 'csb')
        """,
    ),
]
|
|
|
|
|
|
async def main() -> None:
    """Run every backfill step in one transaction, then commit or roll back.

    Exactly one of ``--dry-run`` / ``--apply`` is required. Both modes execute
    all ``STEPS`` and all ``VERIFY_SQL`` queries inside the same transaction
    and print per-step row counts and the verification rows. ``--apply``
    commits at the end; ``--dry-run`` rolls everything back. The verification
    output is informational only: it is printed, not enforced.

    The database URL is read from ``DATABASE_URL``, with a localhost default.

    Raises:
        Exception: Any error raised while executing a statement is re-raised
            after the transaction has been rolled back.
    """
    parser = argparse.ArgumentParser()
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument(
        "--dry-run",
        action="store_true",
        help="전 UPDATE 를 트랜잭션 안에서 실행해 정확한 rowcount + 검증표를 보여주고 ROLLBACK (변경 0)",
    )
    mode.add_argument("--apply", action="store_true", help="백필 실행 (단일 트랜잭션 커밋)")
    args = parser.parse_args()

    db_url = os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
    )
    engine = create_async_engine(db_url)
    tag = "apply" if args.apply else "dry-run"

    try:
        async with engine.connect() as conn:
            trans = await conn.begin()
            try:
                for label, sql in STEPS:
                    # text() would read the colon in the regex '(?:' as a bind
                    # parameter, so the SQL goes straight to the driver.
                    result = await conn.exec_driver_sql(sql)
                    print(f"[{tag}] {label}: {result.rowcount}행")

                print("\n─── 검증 (트랜잭션 내 미리보기) ───")
                for label, sql in VERIFY_SQL:
                    result = await conn.exec_driver_sql(sql)
                    rows = result.fetchall()
                    print(f"\n{label}:")
                    for row in rows:
                        print("  ", tuple(row))

                if args.apply:
                    await trans.commit()
                    print("\n[apply] 커밋 완료")
                else:
                    await trans.rollback()
                    print("\n[dry-run] 전체 롤백 — 변경 0")
            except Exception:
                await trans.rollback()
                raise
    finally:
        # Dispose on the failure path too; previously an exception skipped it.
        await engine.dispose()
|
|
|
|
|
|
# Script entry point: when executed directly, run the async main() to completion.
if __name__ == "__main__":
    asyncio.run(main())
|