0d3c841577
R0 가 입증했듯 migrations/ 전체 replay 는 011(view active_documents 가 documents.embedding 의존, DROP COLUMN CASCADE 부재)·326(enum-same-txn) 등 누적 비-replayable 로 깨져 신규/DR 환경 init_db 부팅이 불가능했다. 표준 squash baseline 로 해소: - migrations/_baseline/0358_schema_baseline.sql: prod 스키마 스냅샷(pg_dump --schema-only --no-owner --no-privileges, psql 메타·search_path='' 정리 = asyncpg exec_driver_sql 호환). - init_db._load_baseline_if_fresh: documents 테이블 부재(fresh) 시 baseline 적재 + schema_migrations 1..358 스탬프 → 이후 post-baseline(359/360)만 적용. ★기존 DB(documents 존재)는 skip = prod 무영향(additive). baseline 부재 시 기존 replay 경로(하위호환). - migration_smoke: baseline 경로 검증. ★실측 — 이전 FAIL(011 abort) → 이제 FRESH/INCREMENTAL 모두 PASS (pg16.14). cutoff(_BASELINE_CUTOFF=358) 갱신 시 baseline 재생성. 검증: py_compile + migration_smoke PASS. ★boot-path 변경이라 deploy 전 staging 부팅 검증 필수. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
202 lines
7.3 KiB
Python
202 lines
7.3 KiB
Python
"""PostgreSQL 연결 — SQLAlchemy async engine + session factory"""
|
|
|
|
import logging
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
from sqlalchemy.orm import DeclarativeBase
|
|
|
|
from core.config import settings
|
|
|
|
logger = logging.getLogger("migration")
|
|
|
|
engine = create_async_engine(
|
|
settings.database_url,
|
|
echo=False,
|
|
pool_size=10,
|
|
max_overflow=20,
|
|
)
|
|
|
|
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
|
|
|
|
|
class Base(DeclarativeBase):
|
|
pass
|
|
|
|
|
|
# NOTE: 모든 pending migration은 단일 트랜잭션으로 실행됨.
|
|
# DDL이 많거나 대량 데이터 변경이 포함된 migration은 장시간 lock을 유발할 수 있음.
|
|
_MIGRATION_VERSION_RE = re.compile(r"^(\d+)_")
|
|
_MIGRATION_LOCK_KEY = 938475
|
|
|
|
|
|
def _parse_migration_files(migrations_dir: Path) -> list[tuple[int, str, Path]]:
|
|
"""migration 파일 스캔 → (version, name, path) 리스트, 버전순 정렬"""
|
|
files = []
|
|
for p in sorted(migrations_dir.glob("*.sql")):
|
|
m = _MIGRATION_VERSION_RE.match(p.name)
|
|
if not m:
|
|
continue
|
|
version = int(m.group(1))
|
|
files.append((version, p.name, p))
|
|
|
|
# 중복 버전 검사
|
|
seen: dict[int, str] = {}
|
|
for version, name, _ in files:
|
|
if version in seen:
|
|
raise RuntimeError(
|
|
f"migration 버전 중복: {seen[version]} vs {name} (version={version})"
|
|
)
|
|
seen[version] = name
|
|
|
|
files.sort(key=lambda x: x[0])
|
|
return files
|
|
|
|
|
|
def _validate_sql_content(name: str, sql: str) -> None:
|
|
"""migration SQL에 BEGIN/COMMIT이 포함되어 있으면 에러 (외부 트랜잭션 깨짐 방지)"""
|
|
# 주석(-- ...) 라인 제거 후 검사
|
|
lines = [
|
|
line for line in sql.splitlines()
|
|
if not line.strip().startswith("--")
|
|
]
|
|
stripped = "\n".join(lines).upper()
|
|
for keyword in ("BEGIN", "COMMIT", "ROLLBACK"):
|
|
# 단어 경계로 매칭 (예: BEGIN_SOMETHING은 제외)
|
|
if re.search(rf"\b{keyword}\b", stripped):
|
|
raise RuntimeError(
|
|
f"migration {name}에 {keyword} 포함됨 — "
|
|
f"migration SQL에는 트랜잭션 제어문을 넣지 마세요"
|
|
)
|
|
|
|
|
|
# R1: baseline 스냅샷이 대표하는 마지막 마이그레이션 버전 (이하 버전은 baseline 에 포함).
|
|
# 새 baseline 재생성 시 이 값을 갱신한다 (migrations/_baseline/<cutoff>_schema_baseline.sql).
|
|
_BASELINE_CUTOFF = 358
|
|
|
|
|
|
async def _load_baseline_if_fresh(conn, migrations_dir: Path) -> None:
|
|
"""fresh DB(documents 부재)면 baseline 스키마 스냅샷 적재 + schema_migrations 1..cutoff 스탬프.
|
|
|
|
기존 DB(documents 존재)는 즉시 반환 — baseline 미적재, 무영향. baseline 파일 부재 시도
|
|
기존 replay 경로 유지(하위호환).
|
|
"""
|
|
from sqlalchemy import text
|
|
|
|
baseline_dir = migrations_dir / "_baseline"
|
|
baseline_files = (
|
|
sorted(baseline_dir.glob("*_schema_baseline.sql")) if baseline_dir.is_dir() else []
|
|
)
|
|
if not baseline_files:
|
|
return
|
|
|
|
docs_exists = (
|
|
await conn.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))
|
|
).scalar()
|
|
if docs_exists:
|
|
return # 기존 DB — baseline skip
|
|
|
|
baseline_path = baseline_files[-1]
|
|
logger.info(f"[migration] fresh DB 감지 — baseline 적재: {baseline_path.name}")
|
|
await conn.exec_driver_sql(baseline_path.read_text(encoding="utf-8"))
|
|
# baseline = cutoff 까지의 스키마 → 실제 파일 버전 기준으로 schema_migrations 스탬프.
|
|
versions = [v for v, _, _ in _parse_migration_files(migrations_dir) if v <= _BASELINE_CUTOFF]
|
|
for v in versions:
|
|
await conn.execute(
|
|
text(
|
|
"INSERT INTO schema_migrations (version, name) "
|
|
"VALUES (:v, :n) ON CONFLICT DO NOTHING"
|
|
),
|
|
{"v": v, "n": f"baseline:{v}"},
|
|
)
|
|
logger.info(
|
|
f"[migration] baseline 적재 + schema_migrations {len(versions)}건 스탬프 (cutoff {_BASELINE_CUTOFF})"
|
|
)
|
|
|
|
|
|
async def _run_migrations(conn) -> None:
|
|
"""미적용 migration 실행 (호출자가 트랜잭션 관리)"""
|
|
from sqlalchemy import text
|
|
|
|
# schema_migrations 테이블 생성
|
|
await conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version INT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
applied_at TIMESTAMPTZ DEFAULT NOW()
|
|
)
|
|
"""))
|
|
|
|
# advisory lock 획득 (트랜잭션 끝나면 자동 해제)
|
|
await conn.execute(text(
|
|
f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
|
|
))
|
|
|
|
# migration 파일 스캔
|
|
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
|
|
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
|
|
if not migrations_dir.is_dir():
|
|
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
|
|
return
|
|
|
|
# R1: fresh DB(documents 부재)면 baseline 스냅샷 먼저 적재 + schema_migrations 스탬프.
|
|
# migrations/ 전체 replay 는 누적 비-replayable(011 view 의존·326 enum-same-txn 등)로
|
|
# 깨지므로 신규/DR 환경은 prod 스키마 스냅샷에서 출발한다. 기존 DB 는 skip(무영향).
|
|
await _load_baseline_if_fresh(conn, migrations_dir)
|
|
|
|
# 적용 이력 조회 (baseline 스탬프 반영 — fresh DB 는 1..cutoff 가 이미 applied)
|
|
result = await conn.execute(text("SELECT version FROM schema_migrations"))
|
|
applied = {row[0] for row in result}
|
|
|
|
files = _parse_migration_files(migrations_dir)
|
|
pending = [(v, name, path) for v, name, path in files if v not in applied]
|
|
|
|
if not pending:
|
|
logger.info("[migration] 미적용 migration 없음")
|
|
return
|
|
|
|
start = time.monotonic()
|
|
logger.info(f"[migration] {len(pending)}건 적용 시작")
|
|
|
|
for version, name, path in pending:
|
|
sql = path.read_text(encoding="utf-8")
|
|
_validate_sql_content(name, sql)
|
|
if "schema_migrations" in sql.lower():
|
|
raise ValueError(
|
|
f"Migration {name} must not modify schema_migrations table"
|
|
)
|
|
logger.info(f"[migration] {name} 실행 중...")
|
|
# raw driver SQL 사용 — text() 의 :name bind parameter 해석으로
|
|
# SQL 주석/literal 에 콜론이 들어가면 InvalidRequestError 발생.
|
|
# exec_driver_sql 은 SQL 을 driver(asyncpg) 에 그대로 전달.
|
|
await conn.exec_driver_sql(sql)
|
|
await conn.execute(
|
|
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
|
|
{"v": version, "n": name},
|
|
)
|
|
logger.info(f"[migration] {name} 완료")
|
|
|
|
elapsed = time.monotonic() - start
|
|
logger.info(f"[migration] 전체 {len(pending)}건 완료 ({elapsed:.1f}s)")
|
|
|
|
|
|
async def init_db():
|
|
"""DB 연결 확인 + pending migration 실행"""
|
|
from sqlalchemy import text
|
|
|
|
async with engine.begin() as conn:
|
|
await conn.execute(text("SELECT 1"))
|
|
try:
|
|
await _run_migrations(conn)
|
|
except Exception as e:
|
|
logger.error(f"[migration] 실패: {e} — 전체 트랜잭션 롤백")
|
|
raise
|
|
|
|
|
|
async def get_session() -> AsyncSession:
|
|
"""FastAPI Depends용 세션 제공"""
|
|
async with async_session() as session:
|
|
yield session
|