Some checks failed
SonarQube Analysis / SonarQube Scan (push) Has been cancelled
✅ 주요 수정사항: - 재질 GRADE 전체 표기: ASTM A106 B 완전 표시 (A10 잘림 현상 해결) - material_grade_extractor.py 정규표현식 패턴 개선 - files.py 파일 업로드 시 재질 추출 로직 수정 - CSS 그리드 너비 확장으로 텍스트 잘림 현상 해결 - 사용자 요구사항 엑셀 다운로드 기능 완료 🎯 해결된 문제: 1. ASTM A106 B → ASTM A10 잘림 문제 2. 재질 컬럼 너비 부족으로 인한 표시 문제 3. 사용자 요구사항이 엑셀에 반영되지 않는 문제 📋 다음 단계 준비: - 파이프 끝단 정보 제외 취합 로직 개선 - 플랜지 타입 정보 확장 - 자재 분류 필터 기능 추가
273 lines
8.7 KiB
Python
273 lines
8.7 KiB
Python
"""
|
|
JWT 토큰 관리 서비스
|
|
TK-FB-Project의 JWT 구현을 참고하여 FastAPI용으로 구현
|
|
"""
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Any
|
|
from fastapi import HTTPException, status
|
|
import os
|
|
from ..config import get_settings
|
|
from ..utils.logger import get_logger
|
|
|
|
settings = get_settings()
|
|
logger = get_logger(__name__)
|
|
|
|
# JWT 설정
|
|
JWT_SECRET = os.getenv('JWT_SECRET', 'tkmp-secret-key-2025')
|
|
JWT_REFRESH_SECRET = os.getenv('JWT_REFRESH_SECRET', 'tkmp-refresh-secret-2025')
|
|
JWT_ALGORITHM = 'HS256'
|
|
ACCESS_TOKEN_EXPIRE_HOURS = int(os.getenv('JWT_EXPIRES_IN_HOURS', '24'))
|
|
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv('JWT_REFRESH_EXPIRES_IN_DAYS', '7'))
|
|
|
|
|
|
class JWTService:
|
|
"""JWT 토큰 관리 서비스"""
|
|
|
|
@staticmethod
|
|
def create_access_token(user_data: Dict[str, Any]) -> str:
|
|
"""
|
|
Access Token 생성
|
|
|
|
Args:
|
|
user_data: 사용자 정보 딕셔너리
|
|
|
|
Returns:
|
|
str: JWT Access Token
|
|
"""
|
|
try:
|
|
# 토큰 만료 시간 설정
|
|
expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
|
|
|
|
# 토큰 페이로드 구성
|
|
payload = {
|
|
'user_id': user_data['user_id'],
|
|
'username': user_data['username'],
|
|
'name': user_data['name'],
|
|
'role': user_data['role'],
|
|
'access_level': user_data['access_level'],
|
|
'exp': expire,
|
|
'iat': datetime.utcnow(),
|
|
'type': 'access'
|
|
}
|
|
|
|
# JWT 토큰 생성
|
|
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
|
|
|
logger.debug(f"Access token created for user: {user_data['username']}")
|
|
return token
|
|
|
|
except Exception as e:
|
|
logger.error(f"Access token creation failed: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="토큰 생성에 실패했습니다"
|
|
)
|
|
|
|
@staticmethod
|
|
def create_refresh_token(user_id: int) -> str:
|
|
"""
|
|
Refresh Token 생성
|
|
|
|
Args:
|
|
user_id: 사용자 ID
|
|
|
|
Returns:
|
|
str: JWT Refresh Token
|
|
"""
|
|
try:
|
|
# 토큰 만료 시간 설정
|
|
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
|
|
|
# 토큰 페이로드 구성
|
|
payload = {
|
|
'user_id': user_id,
|
|
'exp': expire,
|
|
'iat': datetime.utcnow(),
|
|
'type': 'refresh'
|
|
}
|
|
|
|
# JWT 토큰 생성
|
|
token = jwt.encode(payload, JWT_REFRESH_SECRET, algorithm=JWT_ALGORITHM)
|
|
|
|
logger.debug(f"Refresh token created for user_id: {user_id}")
|
|
return token
|
|
|
|
except Exception as e:
|
|
logger.error(f"Refresh token creation failed: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="리프레시 토큰 생성에 실패했습니다"
|
|
)
|
|
|
|
@staticmethod
|
|
def verify_access_token(token: str) -> Dict[str, Any]:
|
|
"""
|
|
Access Token 검증
|
|
|
|
Args:
|
|
token: JWT Access Token
|
|
|
|
Returns:
|
|
Dict[str, Any]: 토큰 페이로드
|
|
|
|
Raises:
|
|
HTTPException: 토큰 검증 실패 시
|
|
"""
|
|
try:
|
|
# JWT 토큰 디코딩
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
|
|
|
|
# 토큰 타입 확인
|
|
if payload.get('type') != 'access':
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="잘못된 토큰 타입입니다"
|
|
)
|
|
|
|
# 필수 필드 확인
|
|
required_fields = ['user_id', 'username', 'role']
|
|
for field in required_fields:
|
|
if field not in payload:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=f"토큰에 {field} 정보가 없습니다"
|
|
)
|
|
|
|
logger.debug(f"Access token verified for user: {payload['username']}")
|
|
return payload
|
|
|
|
except jwt.ExpiredSignatureError:
|
|
logger.warning("Access token expired")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="토큰이 만료되었습니다"
|
|
)
|
|
except jwt.InvalidTokenError as e:
|
|
logger.warning(f"Invalid access token: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="유효하지 않은 토큰입니다"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Access token verification failed: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="토큰 검증에 실패했습니다"
|
|
)
|
|
|
|
@staticmethod
|
|
def verify_refresh_token(token: str) -> Dict[str, Any]:
|
|
"""
|
|
Refresh Token 검증
|
|
|
|
Args:
|
|
token: JWT Refresh Token
|
|
|
|
Returns:
|
|
Dict[str, Any]: 토큰 페이로드
|
|
|
|
Raises:
|
|
HTTPException: 토큰 검증 실패 시
|
|
"""
|
|
try:
|
|
# JWT 토큰 디코딩
|
|
payload = jwt.decode(token, JWT_REFRESH_SECRET, algorithms=[JWT_ALGORITHM])
|
|
|
|
# 토큰 타입 확인
|
|
if payload.get('type') != 'refresh':
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="잘못된 리프레시 토큰 타입입니다"
|
|
)
|
|
|
|
# 필수 필드 확인
|
|
if 'user_id' not in payload:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="토큰에 사용자 정보가 없습니다"
|
|
)
|
|
|
|
logger.debug(f"Refresh token verified for user_id: {payload['user_id']}")
|
|
return payload
|
|
|
|
except jwt.ExpiredSignatureError:
|
|
logger.warning("Refresh token expired")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="리프레시 토큰이 만료되었습니다"
|
|
)
|
|
except jwt.InvalidTokenError as e:
|
|
logger.warning(f"Invalid refresh token: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="유효하지 않은 리프레시 토큰입니다"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Refresh token verification failed: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="리프레시 토큰 검증에 실패했습니다"
|
|
)
|
|
|
|
@staticmethod
|
|
def get_token_expiry_info(token: str, token_type: str = 'access') -> Dict[str, Any]:
|
|
"""
|
|
토큰 만료 정보 조회
|
|
|
|
Args:
|
|
token: JWT Token
|
|
token_type: 토큰 타입 ('access' 또는 'refresh')
|
|
|
|
Returns:
|
|
Dict[str, Any]: 토큰 만료 정보
|
|
"""
|
|
try:
|
|
secret = JWT_SECRET if token_type == 'access' else JWT_REFRESH_SECRET
|
|
payload = jwt.decode(token, secret, algorithms=[JWT_ALGORITHM])
|
|
|
|
exp_timestamp = payload.get('exp')
|
|
iat_timestamp = payload.get('iat')
|
|
|
|
if exp_timestamp:
|
|
exp_datetime = datetime.fromtimestamp(exp_timestamp)
|
|
remaining_time = exp_datetime - datetime.utcnow()
|
|
|
|
return {
|
|
'expires_at': exp_datetime,
|
|
'issued_at': datetime.fromtimestamp(iat_timestamp) if iat_timestamp else None,
|
|
'remaining_seconds': int(remaining_time.total_seconds()),
|
|
'is_expired': remaining_time.total_seconds() <= 0
|
|
}
|
|
|
|
return {'error': '토큰에 만료 시간 정보가 없습니다'}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Token expiry info retrieval failed: {str(e)}")
|
|
return {'error': str(e)}
|
|
|
|
|
|
# JWT 서비스 인스턴스
|
|
jwt_service = JWTService()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|