Files
hyungi_document_server/app/core/database.py
T
hyungi 274d2009c4 fix(migration): fresh DB/DR 부트스트랩 깨짐 3건 수정 (validator 오탐 + multi-statement)
verification env(ephemeral postgres + init_db) 실측으로 fresh DB 부트스트랩이 359~376 replay 중 깨지는 3건 발견·수정:
1. _validate_sql_content 가 인라인 주석(SQL -- ...) 미제거 → 365 의 '-- commit 시 ...' 설명주석을 트랜잭션 제어문 오탐. 줄별 -- 이후 제거.
2. raw '"schema_migrations" in sql.lower()' 체크도 주석 미제외 → 365 의 '-- ... schema_migrations 건드리지 않음' 오탐. _validate_sql_content 로 통합(주석 제외).
3. 마이그 루프가 exec_driver_sql(prepared)이라 multi-statement(365=테이블+시드+인덱스) 불허 → baseline 적재와 동일한 raw asyncpg simple execute 로 통일.
(에이전트가 P0로 본 320/326 enum-same-txn 은 오탐 — baseline 0358 이 이미 방어.)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 07:00:32 +09:00

213 lines
8.8 KiB
Python

"""PostgreSQL 연결 — SQLAlchemy async engine + session factory"""
import logging
import re
import time
from pathlib import Path
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from core.config import settings
logger = logging.getLogger("migration")
engine = create_async_engine(
settings.database_url,
echo=False,
pool_size=10,
max_overflow=20,
)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
# NOTE: 모든 pending migration은 단일 트랜잭션으로 실행됨.
# DDL이 많거나 대량 데이터 변경이 포함된 migration은 장시간 lock을 유발할 수 있음.
_MIGRATION_VERSION_RE = re.compile(r"^(\d+)_")
_MIGRATION_LOCK_KEY = 938475
def _parse_migration_files(migrations_dir: Path) -> list[tuple[int, str, Path]]:
"""migration 파일 스캔 → (version, name, path) 리스트, 버전순 정렬"""
files = []
for p in sorted(migrations_dir.glob("*.sql")):
m = _MIGRATION_VERSION_RE.match(p.name)
if not m:
continue
version = int(m.group(1))
files.append((version, p.name, p))
# 중복 버전 검사
seen: dict[int, str] = {}
for version, name, _ in files:
if version in seen:
raise RuntimeError(
f"migration 버전 중복: {seen[version]} vs {name} (version={version})"
)
seen[version] = name
files.sort(key=lambda x: x[0])
return files
def _validate_sql_content(name: str, sql: str) -> None:
"""migration SQL에 BEGIN/COMMIT이 포함되어 있으면 에러 (외부 트랜잭션 깨짐 방지)"""
# 주석(전체 줄 + 인라인 `-- ...`) 제거 후 검사. ★인라인 주석을 안 지우면 설명 주석의
# 'commit/begin' 단어(예 365_scan_jobs 의 `-- commit 시 documents.title 로 전파`)를
# 트랜잭션 제어문으로 false-positive 로 잡아 fresh DB/DR 부트스트랩이 깨진다(verification
# 실측 2026-06). 줄별로 `--` 이후를 잘라 주석 텍스트를 검사에서 제외.
cleaned = [re.sub(r"--.*$", "", line) for line in sql.splitlines()]
stripped = "\n".join(cleaned).upper()
for keyword in ("BEGIN", "COMMIT", "ROLLBACK"):
# 단어 경계로 매칭 (예: BEGIN_SOMETHING은 제외)
if re.search(rf"\b{keyword}\b", stripped):
raise RuntimeError(
f"migration {name}{keyword} 포함됨 — "
f"migration SQL에는 트랜잭션 제어문을 넣지 마세요"
)
# schema_migrations 수정 금지 (runner 가 스탬프 관리) — 주석 제외(stripped) 검사.
# (구: _run_migrations 의 raw `"schema_migrations" in sql.lower()` 가 주석 미제외라
# 365 의 '-- ... schema_migrations 를 건드리지 않음' 주석을 false-positive 로 잡았음.)
if "SCHEMA_MIGRATIONS" in stripped:
raise RuntimeError(
f"Migration {name} must not modify schema_migrations table"
)
# R1: baseline 스냅샷이 대표하는 마지막 마이그레이션 버전 (이하 버전은 baseline 에 포함).
# 새 baseline 재생성 시 이 값을 갱신한다 (migrations/_baseline/<cutoff>_schema_baseline.sql).
_BASELINE_CUTOFF = 358
async def _load_baseline_if_fresh(conn, migrations_dir: Path) -> None:
"""fresh DB(documents 부재)면 baseline 스키마 스냅샷 적재 + schema_migrations 1..cutoff 스탬프.
기존 DB(documents 존재)는 즉시 반환 — baseline 미적재, 무영향. baseline 파일 부재 시도
기존 replay 경로 유지(하위호환).
"""
from sqlalchemy import text
baseline_dir = migrations_dir / "_baseline"
baseline_files = (
sorted(baseline_dir.glob("*_schema_baseline.sql")) if baseline_dir.is_dir() else []
)
if not baseline_files:
return
docs_exists = (
await conn.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))
).scalar()
if docs_exists:
return # 기존 DB — baseline skip
baseline_path = baseline_files[-1]
logger.info(f"[migration] fresh DB 감지 — baseline 적재: {baseline_path.name}")
# baseline 은 multi-statement 덤프 — exec_driver_sql(asyncpg prepared)은 multi-statement
# 불허("cannot insert multiple commands into a prepared statement"). raw asyncpg 의 simple
# 프로토콜 execute() 로 적재한다(같은 connection = 현재 트랜잭션 내). psql 스모크는 이 제약을
# 못 잡으므로 init_db 런타임 검증으로 확인됨.
raw = await conn.get_raw_connection()
await raw.driver_connection.execute(baseline_path.read_text(encoding="utf-8"))
# baseline = cutoff 까지의 스키마 → 실제 파일 버전 기준으로 schema_migrations 스탬프.
versions = [v for v, _, _ in _parse_migration_files(migrations_dir) if v <= _BASELINE_CUTOFF]
for v in versions:
await conn.execute(
text(
"INSERT INTO schema_migrations (version, name) "
"VALUES (:v, :n) ON CONFLICT DO NOTHING"
),
{"v": v, "n": f"baseline:{v}"},
)
logger.info(
f"[migration] baseline 적재 + schema_migrations {len(versions)}건 스탬프 (cutoff {_BASELINE_CUTOFF})"
)
async def _run_migrations(conn) -> None:
"""미적용 migration 실행 (호출자가 트랜잭션 관리)"""
from sqlalchemy import text
# schema_migrations 테이블 생성
await conn.execute(text("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version INT PRIMARY KEY,
name TEXT NOT NULL,
applied_at TIMESTAMPTZ DEFAULT NOW()
)
"""))
# advisory lock 획득 (트랜잭션 끝나면 자동 해제)
await conn.execute(text(
f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
))
# migration 파일 스캔
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
if not migrations_dir.is_dir():
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
return
# R1: fresh DB(documents 부재)면 baseline 스냅샷 먼저 적재 + schema_migrations 스탬프.
# migrations/ 전체 replay 는 누적 비-replayable(011 view 의존·326 enum-same-txn 등)로
# 깨지므로 신규/DR 환경은 prod 스키마 스냅샷에서 출발한다. 기존 DB 는 skip(무영향).
await _load_baseline_if_fresh(conn, migrations_dir)
# 적용 이력 조회 (baseline 스탬프 반영 — fresh DB 는 1..cutoff 가 이미 applied)
result = await conn.execute(text("SELECT version FROM schema_migrations"))
applied = {row[0] for row in result}
files = _parse_migration_files(migrations_dir)
pending = [(v, name, path) for v, name, path in files if v not in applied]
if not pending:
logger.info("[migration] 미적용 migration 없음")
return
start = time.monotonic()
logger.info(f"[migration] {len(pending)}건 적용 시작")
for version, name, path in pending:
sql = path.read_text(encoding="utf-8")
_validate_sql_content(name, sql) # BEGIN/COMMIT + schema_migrations 검사(주석 제외)
logger.info(f"[migration] {name} 실행 중...")
# raw asyncpg simple 프로토콜로 실행 — baseline 적재(_load_baseline_if_fresh)와 동일.
# ★exec_driver_sql 은 prepared 프로토콜이라 multi-statement 불허("cannot insert multiple
# commands into a prepared statement"). 365_scan_jobs 처럼 테이블+시드+인덱스를 한 파일에
# 담은 마이그(컨벤션상 1-statement 권장이나 이미 prod 적재)도 fresh DB/DR replay 되게
# simple execute 사용. text() :name 콜론-binding 이슈도 동일하게 회피(raw 전달).
raw = await conn.get_raw_connection()
await raw.driver_connection.execute(sql)
await conn.execute(
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
{"v": version, "n": name},
)
logger.info(f"[migration] {name} 완료")
elapsed = time.monotonic() - start
logger.info(f"[migration] 전체 {len(pending)}건 완료 ({elapsed:.1f}s)")
async def init_db():
"""DB 연결 확인 + pending migration 실행"""
from sqlalchemy import text
async with engine.begin() as conn:
await conn.execute(text("SELECT 1"))
try:
await _run_migrations(conn)
except Exception as e:
logger.error(f"[migration] 실패: {e} — 전체 트랜잭션 롤백")
raise
async def get_session() -> AsyncSession:
"""FastAPI Depends용 세션 제공"""
async with async_session() as session:
yield session