Files
TK-BOM-Project/backend/app/auth/jwt_service.py
Hyungi Ahn 4f8e395f87
Some checks failed
SonarQube Analysis / SonarQube Scan (push) Has been cancelled
feat: SWG 가스켓 전체 구성 정보 표시 개선
- H/F/I/O SS304/GRAPHITE/CS/CS 패턴에서 4개 구성요소 모두 표시
- 기존 SS304 + GRAPHITE → SS304/GRAPHITE/CS/CS로 완전한 구성 표시
- 외부링/필러/내부링/추가구성 모든 정보 포함
- 구매수량 계산 모달에서 정확한 재질 정보 확인 가능
2025-08-30 14:23:01 +09:00

252 lines
8.7 KiB
Python

"""
JWT 토큰 관리 서비스
TK-FB-Project의 JWT 구현을 참고하여 FastAPI용으로 구현
"""
import jwt
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from fastapi import HTTPException, status
import os
from ..config import get_settings
from ..utils.logger import get_logger
settings = get_settings()
logger = get_logger(__name__)
# JWT 설정
JWT_SECRET = os.getenv('JWT_SECRET', 'tkmp-secret-key-2025')
JWT_REFRESH_SECRET = os.getenv('JWT_REFRESH_SECRET', 'tkmp-refresh-secret-2025')
JWT_ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_HOURS = int(os.getenv('JWT_EXPIRES_IN_HOURS', '24'))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv('JWT_REFRESH_EXPIRES_IN_DAYS', '7'))
class JWTService:
"""JWT 토큰 관리 서비스"""
@staticmethod
def create_access_token(user_data: Dict[str, Any]) -> str:
"""
Access Token 생성
Args:
user_data: 사용자 정보 딕셔너리
Returns:
str: JWT Access Token
"""
try:
# 토큰 만료 시간 설정
expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
# 토큰 페이로드 구성
payload = {
'user_id': user_data['user_id'],
'username': user_data['username'],
'name': user_data['name'],
'role': user_data['role'],
'access_level': user_data['access_level'],
'exp': expire,
'iat': datetime.utcnow(),
'type': 'access'
}
# JWT 토큰 생성
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
logger.debug(f"Access token created for user: {user_data['username']}")
return token
except Exception as e:
logger.error(f"Access token creation failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="토큰 생성에 실패했습니다"
)
@staticmethod
def create_refresh_token(user_id: int) -> str:
"""
Refresh Token 생성
Args:
user_id: 사용자 ID
Returns:
str: JWT Refresh Token
"""
try:
# 토큰 만료 시간 설정
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
# 토큰 페이로드 구성
payload = {
'user_id': user_id,
'exp': expire,
'iat': datetime.utcnow(),
'type': 'refresh'
}
# JWT 토큰 생성
token = jwt.encode(payload, JWT_REFRESH_SECRET, algorithm=JWT_ALGORITHM)
logger.debug(f"Refresh token created for user_id: {user_id}")
return token
except Exception as e:
logger.error(f"Refresh token creation failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="리프레시 토큰 생성에 실패했습니다"
)
@staticmethod
def verify_access_token(token: str) -> Dict[str, Any]:
"""
Access Token 검증
Args:
token: JWT Access Token
Returns:
Dict[str, Any]: 토큰 페이로드
Raises:
HTTPException: 토큰 검증 실패 시
"""
try:
# JWT 토큰 디코딩
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
# 토큰 타입 확인
if payload.get('type') != 'access':
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="잘못된 토큰 타입입니다"
)
# 필수 필드 확인
required_fields = ['user_id', 'username', 'role']
for field in required_fields:
if field not in payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"토큰에 {field} 정보가 없습니다"
)
logger.debug(f"Access token verified for user: {payload['username']}")
return payload
except jwt.ExpiredSignatureError:
logger.warning("Access token expired")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="토큰이 만료되었습니다"
)
except jwt.InvalidTokenError as e:
logger.warning(f"Invalid access token: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="유효하지 않은 토큰입니다"
)
except Exception as e:
logger.error(f"Access token verification failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="토큰 검증에 실패했습니다"
)
@staticmethod
def verify_refresh_token(token: str) -> Dict[str, Any]:
"""
Refresh Token 검증
Args:
token: JWT Refresh Token
Returns:
Dict[str, Any]: 토큰 페이로드
Raises:
HTTPException: 토큰 검증 실패 시
"""
try:
# JWT 토큰 디코딩
payload = jwt.decode(token, JWT_REFRESH_SECRET, algorithms=[JWT_ALGORITHM])
# 토큰 타입 확인
if payload.get('type') != 'refresh':
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="잘못된 리프레시 토큰 타입입니다"
)
# 필수 필드 확인
if 'user_id' not in payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="토큰에 사용자 정보가 없습니다"
)
logger.debug(f"Refresh token verified for user_id: {payload['user_id']}")
return payload
except jwt.ExpiredSignatureError:
logger.warning("Refresh token expired")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="리프레시 토큰이 만료되었습니다"
)
except jwt.InvalidTokenError as e:
logger.warning(f"Invalid refresh token: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="유효하지 않은 리프레시 토큰입니다"
)
except Exception as e:
logger.error(f"Refresh token verification failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="리프레시 토큰 검증에 실패했습니다"
)
@staticmethod
def get_token_expiry_info(token: str, token_type: str = 'access') -> Dict[str, Any]:
"""
토큰 만료 정보 조회
Args:
token: JWT Token
token_type: 토큰 타입 ('access' 또는 'refresh')
Returns:
Dict[str, Any]: 토큰 만료 정보
"""
try:
secret = JWT_SECRET if token_type == 'access' else JWT_REFRESH_SECRET
payload = jwt.decode(token, secret, algorithms=[JWT_ALGORITHM])
exp_timestamp = payload.get('exp')
iat_timestamp = payload.get('iat')
if exp_timestamp:
exp_datetime = datetime.fromtimestamp(exp_timestamp)
remaining_time = exp_datetime - datetime.utcnow()
return {
'expires_at': exp_datetime,
'issued_at': datetime.fromtimestamp(iat_timestamp) if iat_timestamp else None,
'remaining_seconds': int(remaining_time.total_seconds()),
'is_expired': remaining_time.total_seconds() <= 0
}
return {'error': '토큰에 만료 시간 정보가 없습니다'}
except Exception as e:
logger.error(f"Token expiry info retrieval failed: {str(e)}")
return {'error': str(e)}
# JWT 서비스 인스턴스
jwt_service = JWTService()