Files
TK-BOM-Project/backend/app/auth/middleware.py
Hyungi Ahn 83b90ef05c
Some checks failed
SonarQube Analysis / SonarQube Scan (push) Has been cancelled
feat: 자재 관리 페이지 대규모 개선
- 파이프 수량 계산 로직 수정 (단관 개수가 아닌 실제 길이 기반 계산)
- UI 전면 개편 (DevonThink 스타일의 간결하고 세련된 디자인)
- 자재별 그룹핑 로직 개선:
  * 플랜지: 동일 사양별 그룹핑, WN 스케줄 표시, ORIFICE 풀네임 표시
  * 피팅: 상세 타입 표시 (니플 길이, 엘보 각도/연결, 티 타입, 리듀서 타입 등)
  * 밸브: 동일 사양별 그룹핑, 타입/연결방식/압력 표시
  * 볼트: 크기/재질/길이별 그룹핑 (8SET → 개별 집계)
  * 가스켓: 동일 사양별 그룹핑, 재질/상세내역/두께 분리 표시
  * UNKNOWN: 원본 설명 전체 표시, 동일 항목 그룹핑
- 전체 카테고리 버튼 제거 (표시 복잡도 감소)
- 카테고리별 동적 컬럼 헤더 및 레이아웃 적용
2025-09-09 09:24:45 +09:00

322 lines
9.7 KiB
Python

"""
인증 및 권한 미들웨어
JWT 토큰 기반 인증과 역할 기반 접근 제어(RBAC) 구현
"""
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from typing import List, Optional, Callable, Any
from functools import wraps
import inspect
from ..database import get_db
from .jwt_service import jwt_service
from .models import UserRepository
from ..utils.logger import get_logger
logger = get_logger(__name__)
security = HTTPBearer()
class AuthMiddleware:
"""인증 미들웨어 클래스"""
def __init__(self):
self.security = HTTPBearer()
async def get_current_user(
self,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> dict:
"""
현재 사용자 정보 조회
Args:
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
dict: 사용자 정보
Raises:
HTTPException: 인증 실패 시
"""
try:
# 토큰 검증
payload = jwt_service.verify_access_token(credentials.credentials)
user_id = payload['user_id']
# 사용자 정보 조회
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user:
logger.warning(f"User not found for token: user_id {user_id}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="사용자를 찾을 수 없습니다"
)
if not user.is_active:
logger.warning(f"Inactive user attempted access: {user.username}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="비활성화된 계정입니다"
)
if user.is_locked():
logger.warning(f"Locked user attempted access: {user.username}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="잠긴 계정입니다"
)
# 사용자 정보와 토큰 페이로드 결합
user_info = user.to_dict()
user_info.update({
'token_user_id': payload['user_id'],
'token_username': payload['username'],
'token_role': payload['role']
})
return user_info
except HTTPException:
raise
except Exception as e:
logger.error(f"Get current user error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="인증 처리 중 오류가 발생했습니다"
)
async def get_current_active_user(
self,
current_user: dict = Depends(get_current_user)
) -> dict:
"""
현재 활성 사용자 정보 조회 (별칭)
Args:
current_user: 현재 사용자 정보
Returns:
dict: 사용자 정보
"""
return current_user
def require_roles(self, allowed_roles: List[str]):
"""
특정 역할을 요구하는 의존성 함수 생성
Args:
allowed_roles: 허용된 역할 목록
Returns:
Callable: 의존성 함수
"""
async def role_checker(
current_user: dict = Depends(self.get_current_user)
) -> dict:
user_role = current_user.get('role')
if user_role not in allowed_roles:
logger.warning(
f"Access denied for user {current_user.get('username')} "
f"with role {user_role}. Required roles: {allowed_roles}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"이 기능을 사용하려면 다음 권한이 필요합니다: {', '.join(allowed_roles)}"
)
return current_user
return role_checker
def require_permissions(self, required_permissions: List[str]):
"""
특정 권한을 요구하는 의존성 함수 생성
Args:
required_permissions: 필요한 권한 목록
Returns:
Callable: 의존성 함수
"""
async def permission_checker(
current_user: dict = Depends(self.get_current_user),
db: Session = Depends(get_db)
) -> dict:
user_role = current_user.get('role')
# 사용자 권한 조회
user_repo = UserRepository(db)
user_permissions = user_repo.get_user_permissions(user_role)
# 필요한 권한 확인
missing_permissions = []
for permission in required_permissions:
if permission not in user_permissions:
missing_permissions.append(permission)
if missing_permissions:
logger.warning(
f"Permission denied for user {current_user.get('username')} "
f"with role {user_role}. Missing permissions: {missing_permissions}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"이 기능을 사용하려면 다음 권한이 필요합니다: {', '.join(missing_permissions)}"
)
# 사용자 정보에 권한 정보 추가
current_user['permissions'] = user_permissions
return current_user
return permission_checker
def require_admin(self):
"""관리자 권한을 요구하는 의존성 함수"""
return self.require_roles(['admin', 'system'])
def require_leader_or_admin(self):
"""팀장 이상 권한을 요구하는 의존성 함수"""
return self.require_roles(['admin', 'system', 'leader'])
# 전역 인증 미들웨어 인스턴스
auth_middleware = AuthMiddleware()
# 편의를 위한 의존성 함수들
get_current_user = auth_middleware.get_current_user
get_current_active_user = auth_middleware.get_current_active_user
require_admin = auth_middleware.require_admin
require_leader_or_admin = auth_middleware.require_leader_or_admin
def require_roles(allowed_roles: List[str]):
"""역할 기반 접근 제어 데코레이터"""
return auth_middleware.require_roles(allowed_roles)
def require_permissions(required_permissions: List[str]):
"""권한 기반 접근 제어 데코레이터"""
return auth_middleware.require_permissions(required_permissions)
# 추가 유틸리티 함수들
async def get_user_from_token(token: str, db: Session) -> Optional[dict]:
"""
토큰에서 사용자 정보 추출 (미들웨어 없이 직접 사용)
Args:
token: JWT 토큰
db: 데이터베이스 세션
Returns:
Optional[dict]: 사용자 정보 또는 None
"""
try:
payload = jwt_service.verify_access_token(token)
user_id = payload['user_id']
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if user and user.is_active and not user.is_locked():
return user.to_dict()
return None
except Exception as e:
logger.error(f"Get user from token error: {str(e)}")
return None
def check_user_permission(user_role: str, required_permission: str, db: Session) -> bool:
"""
사용자 권한 확인
Args:
user_role: 사용자 역할
required_permission: 필요한 권한
db: 데이터베이스 세션
Returns:
bool: 권한 보유 여부
"""
try:
user_repo = UserRepository(db)
user_permissions = user_repo.get_user_permissions(user_role)
return required_permission in user_permissions
except Exception as e:
logger.error(f"Check user permission error: {str(e)}")
return False
def get_user_permissions_by_role(role: str, db: Session) -> List[str]:
"""
역할별 권한 목록 조회
Args:
role: 사용자 역할
db: 데이터베이스 세션
Returns:
List[str]: 권한 목록
"""
try:
user_repo = UserRepository(db)
return user_repo.get_user_permissions(role)
except Exception as e:
logger.error(f"Get user permissions by role error: {str(e)}")
return []
# 선택적 인증 (토큰이 있으면 검증, 없으면 None 반환)
async def get_current_user_optional(
request: Request,
db: Session = Depends(get_db)
) -> Optional[dict]:
"""
선택적 사용자 인증 (토큰이 있으면 검증, 없으면 None)
Args:
request: FastAPI Request 객체
db: 데이터베이스 세션
Returns:
Optional[dict]: 사용자 정보 또는 None
"""
try:
# Authorization 헤더 확인
authorization = request.headers.get('authorization')
if not authorization or not authorization.startswith('Bearer '):
return None
token = authorization.split(' ')[1]
return await get_user_from_token(token, db)
except Exception as e:
logger.debug(f"Optional auth failed: {str(e)}")
return None