from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session import os from database.models import User, UserRole from database.schemas import TokenData # 환경 변수 SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here") ALGORITHM = os.getenv("ALGORITHM", "HS256") ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "10080")) # 7 days # 비밀번호 암호화 (pbkdf2_sha256 사용 - bcrypt 문제 회피) pwd_context = CryptContext( schemes=["pbkdf2_sha256"], deprecated="auto" ) def verify_password(plain_password: str, hashed_password: str) -> bool: """비밀번호 검증 (pbkdf2_sha256 - 길이 제한 없음)""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """비밀번호 해시 생성 (pbkdf2_sha256 - 길이 제한 없음)""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str, credentials_exception): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) return token_data except JWTError: raise credentials_exception def authenticate_user(db: Session, username: str, password: str): user = db.query(User).filter(User.username == username).first() if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_admin_user(db: Session): """초기 관리자 계정 생성""" admin_username = os.getenv("ADMIN_USERNAME", "hyungi") admin_password = os.getenv("ADMIN_PASSWORD", "djg3-jj34-X3Q3") # 이미 존재하는지 확인 existing_admin = db.query(User).filter(User.username == admin_username).first() if not existing_admin: admin_user = User( username=admin_username, hashed_password=get_password_hash(admin_password), full_name="관리자", role=UserRole.admin, is_active=True ) db.add(admin_user) db.commit() print(f"관리자 계정 생성됨: {admin_username}") else: print(f"관리자 계정이 이미 존재함: {admin_username}")