feat(migrations): 스키마 baseline 스냅샷 — fresh-DB/DR 부팅 fix (R1)
R0 가 입증했듯 migrations/ 전체 replay 는 011(view active_documents 가 documents.embedding 의존, DROP COLUMN CASCADE 부재)·326(enum-same-txn) 등 누적 비-replayable 로 깨져 신규/DR 환경 init_db 부팅이 불가능했다. 표준 squash baseline 로 해소: - migrations/_baseline/0358_schema_baseline.sql: prod 스키마 스냅샷(pg_dump --schema-only --no-owner --no-privileges, psql 메타·search_path='' 정리 = asyncpg exec_driver_sql 호환). - init_db._load_baseline_if_fresh: documents 테이블 부재(fresh) 시 baseline 적재 + schema_migrations 1..358 스탬프 → 이후 post-baseline(359/360)만 적용. ★기존 DB(documents 존재)는 skip = prod 무영향(additive). baseline 부재 시 기존 replay 경로(하위호환). - migration_smoke: baseline 경로 검증. ★실측 — 이전 FAIL(011 abort) → 이제 FRESH/INCREMENTAL 모두 PASS (pg16.14). cutoff(_BASELINE_CUTOFF=358) 갱신 시 baseline 재생성. 검증: py_compile + migration_smoke PASS. ★boot-path 변경이라 deploy 전 staging 부팅 검증 필수. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+53
-4
@@ -72,6 +72,50 @@ def _validate_sql_content(name: str, sql: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
# R1: baseline 스냅샷이 대표하는 마지막 마이그레이션 버전 (이하 버전은 baseline 에 포함).
|
||||
# 새 baseline 재생성 시 이 값을 갱신한다 (migrations/_baseline/<cutoff>_schema_baseline.sql).
|
||||
_BASELINE_CUTOFF = 358
|
||||
|
||||
|
||||
async def _load_baseline_if_fresh(conn, migrations_dir: Path) -> None:
|
||||
"""fresh DB(documents 부재)면 baseline 스키마 스냅샷 적재 + schema_migrations 1..cutoff 스탬프.
|
||||
|
||||
기존 DB(documents 존재)는 즉시 반환 — baseline 미적재, 무영향. baseline 파일 부재 시도
|
||||
기존 replay 경로 유지(하위호환).
|
||||
"""
|
||||
from sqlalchemy import text
|
||||
|
||||
baseline_dir = migrations_dir / "_baseline"
|
||||
baseline_files = (
|
||||
sorted(baseline_dir.glob("*_schema_baseline.sql")) if baseline_dir.is_dir() else []
|
||||
)
|
||||
if not baseline_files:
|
||||
return
|
||||
|
||||
docs_exists = (
|
||||
await conn.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))
|
||||
).scalar()
|
||||
if docs_exists:
|
||||
return # 기존 DB — baseline skip
|
||||
|
||||
baseline_path = baseline_files[-1]
|
||||
logger.info(f"[migration] fresh DB 감지 — baseline 적재: {baseline_path.name}")
|
||||
await conn.exec_driver_sql(baseline_path.read_text(encoding="utf-8"))
|
||||
# baseline = cutoff 까지의 스키마 → 실제 파일 버전 기준으로 schema_migrations 스탬프.
|
||||
versions = [v for v, _, _ in _parse_migration_files(migrations_dir) if v <= _BASELINE_CUTOFF]
|
||||
for v in versions:
|
||||
await conn.execute(
|
||||
text(
|
||||
"INSERT INTO schema_migrations (version, name) "
|
||||
"VALUES (:v, :n) ON CONFLICT DO NOTHING"
|
||||
),
|
||||
{"v": v, "n": f"baseline:{v}"},
|
||||
)
|
||||
logger.info(
|
||||
f"[migration] baseline 적재 + schema_migrations {len(versions)}건 스탬프 (cutoff {_BASELINE_CUTOFF})"
|
||||
)
|
||||
|
||||
|
||||
async def _run_migrations(conn) -> None:
|
||||
"""미적용 migration 실행 (호출자가 트랜잭션 관리)"""
|
||||
from sqlalchemy import text
|
||||
@@ -90,10 +134,6 @@ async def _run_migrations(conn) -> None:
|
||||
f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
|
||||
))
|
||||
|
||||
# 적용 이력 조회
|
||||
result = await conn.execute(text("SELECT version FROM schema_migrations"))
|
||||
applied = {row[0] for row in result}
|
||||
|
||||
# migration 파일 스캔
|
||||
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
|
||||
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
|
||||
@@ -101,6 +141,15 @@ async def _run_migrations(conn) -> None:
|
||||
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
|
||||
return
|
||||
|
||||
# R1: fresh DB(documents 부재)면 baseline 스냅샷 먼저 적재 + schema_migrations 스탬프.
|
||||
# migrations/ 전체 replay 는 누적 비-replayable(011 view 의존·326 enum-same-txn 등)로
|
||||
# 깨지므로 신규/DR 환경은 prod 스키마 스냅샷에서 출발한다. 기존 DB 는 skip(무영향).
|
||||
await _load_baseline_if_fresh(conn, migrations_dir)
|
||||
|
||||
# 적용 이력 조회 (baseline 스탬프 반영 — fresh DB 는 1..cutoff 가 이미 applied)
|
||||
result = await conn.execute(text("SELECT version FROM schema_migrations"))
|
||||
applied = {row[0] for row in result}
|
||||
|
||||
files = _parse_migration_files(migrations_dir)
|
||||
pending = [(v, name, path) for v, name, path in files if v not in applied]
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -77,28 +77,41 @@ run_scenario() {
|
||||
fi
|
||||
}
|
||||
|
||||
scenario_fresh() {
|
||||
reset_db
|
||||
emit_single_txn "${MIGS[@]}" | psql_exec -d "$DB"
|
||||
}
|
||||
BASELINE_CUTOFF=358
|
||||
BASELINE_FILE="$MIG_DIR/_baseline/0358_schema_baseline.sql"
|
||||
|
||||
scenario_dr() {
|
||||
reset_db
|
||||
local phase1=() phase2=() f base ver p1out p1rc
|
||||
# post-baseline(버전 > cutoff) 마이그 파일만 출력
|
||||
_post_baseline() {
|
||||
local f base ver
|
||||
for f in "${MIGS[@]}"; do
|
||||
base="$(basename "$f")"; ver="${base%%_*}"; ver="$((10#$ver))"
|
||||
if [ "$ver" -le 319 ]; then phase1+=("$f"); else phase2+=("$f"); fi
|
||||
[ "$ver" -gt "$BASELINE_CUTOFF" ] && printf '%s\n' "$f"
|
||||
done
|
||||
# phase1: 001~319 자동커밋 (과거 운영 DB = 타입/값 모두 커밋된 상태)
|
||||
p1out="$( emit_autocommit "${phase1[@]}" 2>/dev/null | psql_exec -d "$DB" 2>&1 )"; p1rc=$?
|
||||
if [ "$p1rc" -ne 0 ]; then
|
||||
local p1last; p1last="$(printf '%s\n' "$p1out" | grep '>>>APPLY' | tail -1 | sed 's/>>>APPLY //')"
|
||||
printf '%s\n' ">>>APPLY ${p1last}" # run_scenario 가 마지막 마커를 읽도록 전달
|
||||
printf '%s\n' "$p1out" | grep -iE 'ERROR|unsafe|DETAIL' | head -2
|
||||
return 1
|
||||
}
|
||||
|
||||
# FRESH — init_db fresh 경로 미러: baseline 적재 + post-baseline 을 단일 트랜잭션
|
||||
scenario_fresh() {
|
||||
reset_db
|
||||
local post=(); while IFS= read -r f; do post+=("$f"); done < <(_post_baseline)
|
||||
{
|
||||
echo '\set ON_ERROR_STOP on'; echo 'BEGIN;'
|
||||
echo "\\echo >>>APPLY _baseline"
|
||||
cat "$BASELINE_FILE"; echo
|
||||
for f in "${post[@]}"; do
|
||||
echo "\\echo >>>APPLY $(basename "$f")"; cat "$f"; echo
|
||||
done
|
||||
echo 'COMMIT;'
|
||||
} | psql_exec -d "$DB"
|
||||
}
|
||||
|
||||
# INCREMENTAL — 기존 운영 DB(at cutoff) 모사: baseline 커밋 후 post-baseline 을 별 트랜잭션
|
||||
scenario_dr() {
|
||||
reset_db
|
||||
if ! { echo '\set ON_ERROR_STOP on'; cat "$BASELINE_FILE"; } | psql_exec -d "$DB" >/dev/null 2>&1; then
|
||||
printf '%s\n' ">>>APPLY _baseline"; echo "baseline 적재 실패"; return 1
|
||||
fi
|
||||
# phase2: 320~end 단일 트랜잭션 (catch-up 업그레이드)
|
||||
emit_single_txn "${phase2[@]}" 2>/dev/null | psql_exec -d "$DB"
|
||||
local post=(); while IFS= read -r f; do post+=("$f"); done < <(_post_baseline)
|
||||
emit_single_txn "${post[@]}" 2>/dev/null | psql_exec -d "$DB"
|
||||
}
|
||||
|
||||
# ── 컨테이너 기동 ──
|
||||
@@ -109,17 +122,17 @@ echo "pg: $(docker exec "$CNAME" psql -U postgres -tAc 'show server_version' 2>/
|
||||
echo
|
||||
|
||||
fail=0
|
||||
echo "── FRESH (빈 DB 단일 트랜잭션) ──"
|
||||
echo "── FRESH (baseline 적재 + post-baseline 단일 트랜잭션 = init_db fresh 경로) ──"
|
||||
run_scenario FRESH scenario_fresh || fail=1
|
||||
echo
|
||||
echo "── DR (001~319 커밋 후 320~end 단일 트랜잭션) ──"
|
||||
echo "── INCREMENTAL (baseline 커밋 후 post-baseline 별 트랜잭션 = 기존 DB 증분) ──"
|
||||
run_scenario DR scenario_dr || fail=1
|
||||
echo
|
||||
|
||||
if [ "$fail" -eq 0 ]; then
|
||||
echo "RESULT: PASS — 빈 DB/DR 모두 단일 트랜잭션 적용 가능 (enum-barrier 적용됨)"
|
||||
echo "RESULT: PASS — fresh/incremental 모두 baseline+post-baseline 적용 가능"
|
||||
exit 0
|
||||
else
|
||||
echo "RESULT: FAIL — 위 지점에서 단일 트랜잭션 적용 불가 (enum-same-txn 등 미수정)"
|
||||
echo "RESULT: FAIL — baseline/post-baseline 적용 불가 (위 지점)"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
Reference in New Issue
Block a user