"""JWT + TOTP 2FA 인증""" from datetime import datetime, timedelta, timezone from typing import Annotated import bcrypt import pyotp from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings from core.database import get_session security = HTTPBearer() # JWT 설정 ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 15 REFRESH_TOKEN_EXPIRE_DAYS = 7 def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def create_access_token(subject: str) -> str: expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) payload = {"sub": subject, "exp": expire, "type": "access"} return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM) def create_refresh_token(subject: str) -> str: expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) payload = {"sub": subject, "exp": expire, "type": "refresh"} return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM) def decode_token(token: str) -> dict | None: try: return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM]) except JWTError: return None def verify_totp(code: str, secret: str | None = None) -> bool: """TOTP 코드 검증 (유저별 secret 또는 글로벌 설정)""" totp_secret = secret or settings.totp_secret if not totp_secret: return True # TOTP 미설정 시 스킵 totp = pyotp.TOTP(totp_secret) return totp.verify(code) async def get_current_user( credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], session: Annotated[AsyncSession, Depends(get_session)], ): """Bearer 토큰에서 현재 유저 조회""" from models.user import User payload = decode_token(credentials.credentials) if not payload or payload.get("type") != "access": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="유효하지 않은 토큰", ) username = payload.get("sub") result = await session.execute( select(User).where(User.username == username, User.is_active.is_(True)) ) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="유저를 찾을 수 없음", ) return user