Files
hyungi 0c8fb41366 fix(safety): backfill text() 콜론 bind 오인 — exec_driver_sql 로 교체
정규식 '(?:' 의 콜론을 text() 가 bind param 으로 해석 (migration 러너 동일 함정).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 06:49:58 +09:00

222 lines
10 KiB
Python

"""안전 자료실 A-3 백필 — 기존 코퍼스에 material_type/jurisdiction/published_date/license 소급.
plan: safety-library-1 A-3 (PKM plans/2026-06-12-safety-library-plan.html)
선례: backfill_category.py (one-off 멱등 스크립트 — migration 아님, 152 단일 트랜잭션 제약 회피)
술어 (2026-06-13 prod 실측 교정 — R2 blocker 반영):
1. extract_meta.source_id JOIN news_sources → 레지스트리 material_type/country 전파
(KOSHA 사례 본문·CSB 페이지·HSE·MOEL·JPVT·arXiv·NB·TWI·API 공지 전부 커버.
paper 는 jurisdiction NULL 강제 — plan 0-1. KOSHA 본문의 kosha.kind='case' 가정은
실측 부정됨: kind 는 첨부/GUIDE 에만 존재 → source_id JOIN 이 정본 술어)
2. kosha.kind='case_attachment' → incident/KR
3. kosha.kind='guide' → guide/KR (+ ofancYmd 'YYYY-MM-DD' 실측)
4. csb.kind='report_pdf' → incident/US (source_id 없음 — JOIN 비대상)
5. source_channel='law_monitor' → law/KR (243건. legal_meta 생략 — MST 미보존,
버전 체인은 B-1 가동 시점부터. published_date = title 의 '(YYYYMMDD)' 공포일 추출 —
extract_meta 빈값 실측, R3-m1 의 'NULL 허용' 보다 1줄 정규식이 저렴해 채움)
6. file_path LIKE '%KGS_Code%' → law/KR (frontmatter 키 = 'code' 실측 117/118,
'kgs_code' 0건. 경로 술어가 더 단순·전수. license 는 B-4 소관 — 미주입)
불변식:
- 전 UPDATE 에 material_type IS NULL 가드 (멱등 — 재실행 안전, A-2 신규 유입분 무접촉)
- material_type + jurisdiction 동일 statement (law CHECK chk_documents_law_jurisdiction 충족)
- published_date / license 는 각자 필드 부재 가드 (이미 값 있으면 무접촉)
- 업로드 Industrial_Safety 문서 = 대상 아님 (LLM 제안+승인 경로만 — 자동 전이 금지)
- 코퍼스(청크/임베딩) 무접촉 — 검색 지표 무변동이 정상
실행:
docker compose exec -T fastapi python /app/scripts/backfill_material_axis.py --dry-run
docker compose exec -T fastapi python /app/scripts/backfill_material_axis.py --apply
"""
import argparse
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
# text() 미사용 — exec_driver_sql (정규식 콜론 함정)
from sqlalchemy.ext.asyncio import create_async_engine
# ─── 술어별 (라벨, 카운트 SQL, 적용 SQL) ───────────────────────────────────────
_KOSHA_LICENSE = ("kogl", "false", "한국산업안전보건공단(KOSHA)")
_CSB_LICENSE = ("public_domain", "true", "U.S. Chemical Safety Board")
_LAW_LICENSE = ("public_domain", "true", "국가법령정보센터")
def _license_obj(scheme: str, redistribute: str, attribution: str) -> str:
return (
f"jsonb_build_object('license', jsonb_build_object("
f"'scheme', '{scheme}', 'redistribute', {redistribute}::boolean, "
f"'attribution', '{attribution}'))"
)
STEPS: list[tuple[str, str]] = [
# 1) 레지스트리 전파 (source_id JOIN)
("1. src_join material/jurisdiction", """
UPDATE documents d SET
material_type = ns.material_type,
jurisdiction = CASE WHEN ns.material_type = 'paper' THEN NULL ELSE ns.country END
FROM news_sources ns
WHERE d.material_type IS NULL AND d.deleted_at IS NULL
AND d.extract_meta->>'source_id' ~ '^[0-9]+$'
AND ns.id = (d.extract_meta->>'source_id')::int
AND ns.material_type IS NOT NULL
"""),
# 2) KOSHA 첨부
("2. kosha 첨부 incident/KR", """
UPDATE documents SET material_type = 'incident', jurisdiction = 'KR'
WHERE material_type IS NULL AND deleted_at IS NULL
AND extract_meta#>>'{kosha,kind}' = 'case_attachment'
"""),
# 3) KOSHA GUIDE
("3. kosha GUIDE guide/KR", """
UPDATE documents SET material_type = 'guide', jurisdiction = 'KR'
WHERE material_type IS NULL AND deleted_at IS NULL
AND extract_meta#>>'{kosha,kind}' = 'guide'
"""),
# 4) CSB 보고서 PDF
("4. csb PDF incident/US", """
UPDATE documents SET material_type = 'incident', jurisdiction = 'US'
WHERE material_type IS NULL AND deleted_at IS NULL
AND extract_meta#>>'{csb,kind}' = 'report_pdf'
"""),
# 5) 레거시 law_monitor
("5. law_monitor law/KR", """
UPDATE documents SET material_type = 'law', jurisdiction = 'KR'
WHERE material_type IS NULL AND deleted_at IS NULL
AND source_channel = 'law_monitor'
"""),
# 6) KGS Code watch 폴더
("6. KGS law/KR", """
UPDATE documents SET material_type = 'law', jurisdiction = 'KR'
WHERE material_type IS NULL AND deleted_at IS NULL
AND file_path LIKE '%KGS_Code%'
"""),
# 7) published_date — crawl/news 공통 (extract_meta.published_at ISO)
("7. published_date (published_at)", """
UPDATE documents SET published_date = (extract_meta->>'published_at')::date
WHERE published_date IS NULL AND deleted_at IS NULL
AND extract_meta->>'published_at' ~ '^\\d{4}-\\d{2}-\\d{2}'
"""),
# 8) published_date — KOSHA GUIDE 공표일자 ('YYYY-MM-DD' 실측)
("8. published_date (GUIDE ofancYmd)", """
UPDATE documents SET published_date = (extract_meta#>>'{kosha,ofancYmd}')::date
WHERE published_date IS NULL AND deleted_at IS NULL
AND extract_meta#>>'{kosha,ofancYmd}' ~ '^\\d{4}-\\d{2}-\\d{2}$'
"""),
# 9) published_date — 레거시 law title 공포일 '(YYYYMMDD)'
("9. published_date (law title 공포일)", """
UPDATE documents
SET published_date = to_date(substring(title from '\\((20\\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\\d|3[01]))\\)'), 'YYYYMMDD')
WHERE published_date IS NULL AND deleted_at IS NULL
AND source_channel = 'law_monitor'
AND title ~ '\\((20\\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\\d|3[01]))\\)'
"""),
# 10) license — 레지스트리 전파 (scheme 있는 소스만)
("10. license (src_join)", """
UPDATE documents d SET
extract_meta = COALESCE(d.extract_meta, '{}'::jsonb)
|| jsonb_build_object('license', jsonb_build_object(
'scheme', ns.license_scheme,
'redistribute', COALESCE(ns.license_redistribute, false),
'attribution', ns.name))
FROM news_sources ns
WHERE d.deleted_at IS NULL AND NOT (COALESCE(d.extract_meta, '{}'::jsonb) ? 'license')
AND d.extract_meta->>'source_id' ~ '^[0-9]+$'
AND ns.id = (d.extract_meta->>'source_id')::int
AND ns.license_scheme IS NOT NULL
"""),
# 11) license — KOSHA 첨부/GUIDE (source_id 없음)
("11. license (kosha kinds)", f"""
UPDATE documents SET
extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_KOSHA_LICENSE)}
WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license')
AND extract_meta#>>'{{kosha,kind}}' IN ('case_attachment', 'guide')
"""),
# 12) license — CSB PDF
("12. license (csb PDF)", f"""
UPDATE documents SET
extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_CSB_LICENSE)}
WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license')
AND extract_meta#>>'{{csb,kind}}' = 'report_pdf'
"""),
# 13) license — 레거시 법령 (저작권법 제7조 비보호)
("13. license (law_monitor)", f"""
UPDATE documents SET
extract_meta = COALESCE(extract_meta, '{{}}'::jsonb) || {_license_obj(*_LAW_LICENSE)}
WHERE deleted_at IS NULL AND NOT (COALESCE(extract_meta, '{{}}'::jsonb) ? 'license')
AND source_channel = 'law_monitor'
"""),
]
VERIFY_SQL = [
("축 전수표 (material_type x jurisdiction)", """
SELECT material_type, jurisdiction, count(*) AS docs,
count(published_date) AS with_date,
count(*) FILTER (WHERE extract_meta ? 'license') AS with_license
FROM documents WHERE material_type IS NOT NULL AND deleted_at IS NULL
GROUP BY 1, 2 ORDER BY 1, 2
"""),
("law & jurisdiction NULL (0 이어야 함 — hard)", """
SELECT count(*) FROM documents
WHERE material_type = 'law' AND jurisdiction IS NULL AND deleted_at IS NULL
"""),
("잔여 미분류 안전 후보 (kosha/csb 메타 보유인데 NULL — 0 이어야 함)", """
SELECT count(*) FROM documents
WHERE material_type IS NULL AND deleted_at IS NULL
AND (extract_meta ? 'kosha' OR extract_meta ? 'csb')
"""),
]
async def main() -> None:
parser = argparse.ArgumentParser()
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument("--dry-run", action="store_true",
help="전 UPDATE 를 트랜잭션 안에서 실행해 정확한 rowcount + 검증표를 보여주고 ROLLBACK (변경 0)")
mode.add_argument("--apply", action="store_true", help="백필 실행 (단일 트랜잭션 커밋)")
args = parser.parse_args()
db_url = os.getenv(
"DATABASE_URL", "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm"
)
engine = create_async_engine(db_url)
tag = "apply" if args.apply else "dry-run"
async with engine.connect() as conn:
trans = await conn.begin()
try:
for label, sql in STEPS:
# text() 는 정규식의 '(?:' 콜론을 bind param 으로 오인 (migration 러너와
# 동일 함정) → driver 직결 실행
result = await conn.exec_driver_sql(sql)
print(f"[{tag}] {label}: {result.rowcount}")
print("\n─── 검증 (트랜잭션 내 미리보기) ───")
for label, sql in VERIFY_SQL:
result = await conn.exec_driver_sql(sql)
rows = result.fetchall()
print(f"\n{label}:")
for row in rows:
print(" ", tuple(row))
if args.apply:
await trans.commit()
print("\n[apply] 커밋 완료")
else:
await trans.rollback()
print("\n[dry-run] 전체 롤백 — 변경 0")
except Exception:
await trans.rollback()
raise
await engine.dispose()
if __name__ == "__main__":
asyncio.run(main())