Files
TK-BOM-Project/backend/app/auth/models.py
Hyungi Ahn 529777aa14
Some checks failed
SonarQube Analysis / SonarQube Scan (push) Has been cancelled
feat: 완전한 사용자 관리 및 로그 모니터링 시스템 구현
- 시스템 관리자/관리자 권한별 대시보드 기능 추가
- 사용자 관리 페이지: 계정 생성, 역할 변경, 사용자 삭제
- 시스템 로그 페이지: 로그인 로그, 시스템 오류 로그 조회
- 로그 모니터링 대시보드: 실시간 통계, 최근 활동, 오류 모니터링
- 프론트엔드 ErrorBoundary 및 오류 로깅 시스템 통합
- 계정 설정 페이지: 프로필 업데이트, 비밀번호 변경
- 3단계 권한 시스템 (system/admin/user) 완전 구현
- 시스템 관리자 계정 생성 기능 (hyungi/000000)
- 로그인 페이지 테스트 계정 안내 제거
- API 오류 수정: CORS, 이메일 검증, User 모델 import 등
2025-09-09 12:58:14 +09:00

405 lines
14 KiB
Python

"""
Authentication system models.
Implemented on top of SQLAlchemy, modeled after the user model of TK-FB-Project.
"""
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import text
import bcrypt
from ..database import Base
from ..utils.logger import get_logger
# Module-level logger, obtained from the project's logging helper and keyed by this module's name.
logger = get_logger(__name__)
class User(Base):
    """User account model.

    Holds login credentials (bcrypt hash), the three-level role
    (system > admin > user), lock-out state and profile fields, and exposes
    helpers for password handling, lock-out bookkeeping and role checks.
    """

    __tablename__ = "users"

    # Number of consecutive failed logins at which increment_failed_attempts()
    # locks the account.
    MAX_FAILED_ATTEMPTS = 5

    user_id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    password = Column(String(255), nullable=False)  # bcrypt hash, see set_password()
    name = Column(String(100), nullable=False)
    email = Column(String(100), index=True)

    # Authorization - three-level system: system (creator) > admin > user
    role = Column(String(20), default='user', nullable=False)  # system, admin, user
    access_level = Column(String(20), default='worker', nullable=False)  # kept for compatibility

    # Account state
    is_active = Column(Boolean, default=True, nullable=False)
    failed_login_attempts = Column(Integer, default=0)
    locked_until = Column(DateTime, nullable=True)

    # Additional profile information
    department = Column(String(50))
    position = Column(String(50))
    phone = Column(String(20))

    # Timestamps
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    last_login_at = Column(DateTime, nullable=True)

    # Relationships
    login_logs = relationship("LoginLog", back_populates="user", cascade="all, delete-orphan")
    sessions = relationship("UserSession", back_populates="user", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User(username='{self.username}', name='{self.name}', role='{self.role}')>"

    def to_dict(self) -> Dict[str, Any]:
        """Return the user's fields as a dictionary (the password is excluded).

        Timestamp values are returned as stored on the instance; they are not
        converted to strings here.
        """
        return {
            'user_id': self.user_id,
            'username': self.username,
            'name': self.name,
            'email': self.email,
            'role': self.role,
            'access_level': self.access_level,
            'is_active': self.is_active,
            'department': self.department,
            'position': self.position,
            'phone': self.phone,
            'created_at': self.created_at,
            'last_login_at': self.last_login_at
        }

    def check_password(self, password: str) -> bool:
        """Check a plain-text password against the stored bcrypt hash.

        Returns:
            True when the password matches; False when it does not match or
            when the comparison itself fails (the failure is logged).
        """
        try:
            return bcrypt.checkpw(password.encode('utf-8'), self.password.encode('utf-8'))
        except Exception as e:
            # Any failure (malformed hash, missing value, ...) is treated as a mismatch.
            logger.error(f"Password check failed for user {self.username}: {str(e)}")
            return False

    def set_password(self, password: str) -> None:
        """Hash ``password`` with a fresh bcrypt salt and store the hash.

        Raises:
            Exception: whatever the hashing step raised, after logging it.
        """
        try:
            salt = bcrypt.gensalt()
            self.password = bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
        except Exception as e:
            logger.error(f"Password hashing failed for user {self.username}: {str(e)}")
            raise

    def is_locked(self) -> bool:
        """Return True while the lock set by lock_account() is still in effect."""
        if self.locked_until is None:
            return False
        # locked_until is written with datetime.utcnow(), so compare in the same (naive UTC) terms.
        return datetime.utcnow() < self.locked_until

    def lock_account(self, minutes: int = 15) -> None:
        """Lock the account for ``minutes`` minutes from now."""
        self.locked_until = datetime.utcnow() + timedelta(minutes=minutes)
        logger.warning(f"User account locked: {self.username} for {minutes} minutes")

    def unlock_account(self) -> None:
        """Clear the lock and reset the failed-login counter."""
        self.locked_until = None
        self.failed_login_attempts = 0
        logger.info(f"User account unlocked: {self.username}")

    def increment_failed_attempts(self) -> None:
        """Count one more failed login and lock the account at the threshold."""
        # The column is nullable and its default (0) is only applied at INSERT
        # time, so the in-memory value may still be None; treat that as zero
        # instead of failing on ``None += 1``.
        self.failed_login_attempts = (self.failed_login_attempts or 0) + 1
        if self.failed_login_attempts >= self.MAX_FAILED_ATTEMPTS:
            self.lock_account()

    def reset_failed_attempts(self) -> None:
        """Reset the failed-login counter and clear any lock."""
        self.failed_login_attempts = 0
        self.locked_until = None

    def update_last_login(self) -> None:
        """Set the last-login timestamp to the current UTC time."""
        self.last_login_at = datetime.utcnow()

    # Role checks
    def is_system(self) -> bool:
        """Return True when the user has the 'system' role."""
        return self.role == 'system'

    def is_admin(self) -> bool:
        """Return True for administrators (the 'system' role is included)."""
        return self.role in ['system', 'admin']

    def is_user(self) -> bool:
        """Return True when the user has the plain 'user' role."""
        return self.role == 'user'

    def can_create_users(self) -> bool:
        """Return True when the user may create accounts (system role only)."""
        return self.is_system()

    def can_view_logs(self) -> bool:
        """Return True when the user may view logs (admin or above)."""
        return self.is_admin()

    def can_manage_system(self) -> bool:
        """Return True when the user may manage the system (system role only)."""
        return self.is_system()

    def get_role_display_name(self) -> str:
        """Return the display label for the user's role, or a fallback label for unknown roles."""
        role_names = {
            'system': '시스템 관리자',
            'admin': '관리자',
            'user': '사용자'
        }
        return role_names.get(self.role, '알 수 없음')
class LoginLog(Base):
    """One row per login attempt recorded for a user."""

    __tablename__ = "login_logs"

    log_id = Column(Integer, primary_key=True, index=True)
    # Owning user; the foreign key is declared with ON DELETE CASCADE.
    user_id = Column(Integer, ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False)
    login_time = Column(DateTime, default=func.now())
    ip_address = Column(String(45))
    user_agent = Column(Text)
    login_status = Column(String(20), nullable=False)  # 'success' or 'failed'
    failure_reason = Column(String(100))
    session_duration = Column(Integer)  # session duration in seconds
    created_at = Column(DateTime, default=func.now())

    # Relationship back to the owning User (mirrors User.login_logs)
    user = relationship("User", back_populates="login_logs")

    def __repr__(self):
        details = f"user_id={self.user_id}, status='{self.login_status}', time='{self.login_time}'"
        return f"<LoginLog({details})>"
class UserSession(Base):
    """User session model used to track refresh tokens."""

    __tablename__ = "user_sessions"

    session_id = Column(Integer, primary_key=True, index=True)
    # Owning user; the foreign key is declared with ON DELETE CASCADE.
    user_id = Column(Integer, ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False)
    refresh_token = Column(String(500), nullable=False, index=True)
    expires_at = Column(DateTime, nullable=False)
    ip_address = Column(String(45))
    user_agent = Column(Text)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

    # Relationship back to the owning User (mirrors User.sessions)
    user = relationship("User", back_populates="sessions")

    def __repr__(self):
        details = f"user_id={self.user_id}, expires_at='{self.expires_at}'"
        return f"<UserSession({details})>"

    def is_expired(self) -> bool:
        """Return True once the current UTC time is past ``expires_at``."""
        now = datetime.utcnow()
        return now > self.expires_at

    def deactivate(self):
        """Mark this session as inactive."""
        self.is_active = False
class Permission(Base):
    """A named permission, optionally grouped by module."""

    __tablename__ = "permissions"

    permission_id = Column(Integer, primary_key=True, index=True)
    permission_name = Column(String(50), unique=True, nullable=False)
    description = Column(Text)
    module = Column(String(30), index=True)  # permissions are managed per module
    created_at = Column(DateTime, default=func.now())

    def __repr__(self):
        details = f"name='{self.permission_name}', module='{self.module}'"
        return f"<Permission({details})>"
class RolePermission(Base):
    """Mapping row that grants one permission to one role name."""

    __tablename__ = "role_permissions"

    role_permission_id = Column(Integer, primary_key=True, index=True)
    role = Column(String(20), nullable=False, index=True)
    # Granted permission; the foreign key is declared with ON DELETE CASCADE.
    permission_id = Column(Integer, ForeignKey("permissions.permission_id", ondelete="CASCADE"))
    created_at = Column(DateTime, default=func.now())

    # Relationship to the granted Permission
    permission = relationship("Permission")

    def __repr__(self):
        details = f"role='{self.role}', permission_id={self.permission_id}"
        return f"<RolePermission({details})>"
class UserRepository:
    """Data-access layer for users, login logs, sessions and role permissions.

    Read helpers never raise: on a database error they log it, roll the
    session back and return an empty result (``None`` or ``[]``). Write
    helpers roll back and re-raise, except ``record_login_log`` which is
    best-effort and only logs the failure.
    """

    def __init__(self, db: Session):
        """Bind the repository to an existing SQLAlchemy session."""
        self.db = db

    def _rollback_quietly(self) -> None:
        """Roll the session back after a failed read, without raising.

        A failed statement can leave the session's transaction unusable until
        it is rolled back; the read helpers call this so the same session
        keeps working for later calls, matching what the write helpers do.
        """
        try:
            self.db.rollback()
        except Exception as rollback_error:
            logger.error(f"Rollback after failed read also failed: {str(rollback_error)}")

    def find_by_username(self, username: str) -> Optional[User]:
        """Return the user with this username, or None if absent or on error."""
        try:
            return self.db.query(User).filter(User.username == username).first()
        except Exception as e:
            logger.error(f"Failed to find user by username {username}: {str(e)}")
            self._rollback_quietly()
            return None

    def find_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with this primary key, or None if absent or on error."""
        try:
            return self.db.query(User).filter(User.user_id == user_id).first()
        except Exception as e:
            logger.error(f"Failed to find user by id {user_id}: {str(e)}")
            self._rollback_quietly()
            return None

    def find_by_email(self, email: str) -> Optional[User]:
        """Return the first user with this e-mail, or None if absent or on error."""
        try:
            return self.db.query(User).filter(User.email == email).first()
        except Exception as e:
            logger.error(f"Failed to find user by email {email}: {str(e)}")
            self._rollback_quietly()
            return None

    def create_user(self, user_data: Dict[str, Any]) -> User:
        """Create, commit and return a new user built from ``user_data``.

        ``user_data`` is passed straight to the ``User`` constructor.
        NOTE(review): a 'password' entry is stored exactly as given here, so
        the caller presumably hashes it first - confirm against callers.

        Raises:
            Exception: whatever the insert/commit raised, after rollback.
        """
        try:
            user = User(**user_data)
            self.db.add(user)
            self.db.commit()
            self.db.refresh(user)
            logger.info(f"User created: {user.username}")
            return user
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to create user: {str(e)}")
            raise

    def update_user(self, user: User) -> User:
        """Commit pending changes on the session and return the refreshed user.

        Raises:
            Exception: whatever the commit/refresh raised, after rollback.
        """
        try:
            self.db.commit()
            self.db.refresh(user)
            logger.info(f"User updated: {user.username}")
            return user
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to update user {user.username}: {str(e)}")
            raise

    def delete_user(self, user: User) -> None:
        """Delete the user and commit.

        Raises:
            Exception: whatever the delete/commit raised, after rollback.
        """
        try:
            self.db.delete(user)
            self.db.commit()
            logger.info(f"User deleted: {user.username}")
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to delete user {user.username}: {str(e)}")
            raise

    def get_all_users(self, skip: int = 0, limit: int = 100) -> List[User]:
        """Return one page of users, or an empty list on error.

        Args:
            skip: number of rows to skip.
            limit: maximum number of rows to return.
        """
        try:
            # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so
            # pages could overlap or miss rows; order by primary key to make
            # pagination stable.
            return (
                self.db.query(User)
                .order_by(User.user_id)
                .offset(skip)
                .limit(limit)
                .all()
            )
        except Exception as e:
            logger.error(f"Failed to get all users: {str(e)}")
            self._rollback_quietly()
            return []

    def get_user_permissions(self, role: str) -> List[str]:
        """Return the permission names mapped to ``role``, or [] on error."""
        try:
            # The role is passed as a bound parameter, never interpolated.
            query = text("""
                SELECT p.permission_name
                FROM permissions p
                JOIN role_permissions rp ON p.permission_id = rp.permission_id
                WHERE rp.role = :role
            """)
            result = self.db.execute(query, {"role": role})
            return [row[0] for row in result.fetchall()]
        except Exception as e:
            logger.error(f"Failed to get permissions for role {role}: {str(e)}")
            self._rollback_quietly()
            return []

    def record_login_log(self, user_id: int, ip_address: str, user_agent: str,
                         status: str, failure_reason: Optional[str] = None) -> None:
        """Record one login attempt.

        Best-effort: a failure to write the log is rolled back and logged but
        not raised.
        """
        try:
            log = LoginLog(
                user_id=user_id,
                ip_address=ip_address,
                user_agent=user_agent,
                login_status=status,
                failure_reason=failure_reason
            )
            self.db.add(log)
            self.db.commit()
            logger.debug(f"Login log recorded for user_id {user_id}: {status}")
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to record login log: {str(e)}")

    def create_session(self, user_id: int, refresh_token: str, expires_at: datetime,
                       ip_address: str, user_agent: str) -> UserSession:
        """Create, commit and return a new session row for the user.

        Raises:
            Exception: whatever the insert/commit raised, after rollback.
        """
        try:
            session = UserSession(
                user_id=user_id,
                refresh_token=refresh_token,
                expires_at=expires_at,
                ip_address=ip_address,
                user_agent=user_agent
            )
            self.db.add(session)
            self.db.commit()
            self.db.refresh(session)
            logger.debug(f"Session created for user_id {user_id}")
            return session
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to create session: {str(e)}")
            raise

    def find_session_by_token(self, refresh_token: str) -> Optional[UserSession]:
        """Return the active session holding this refresh token, or None.

        Expiry is not checked here; only the ``is_active`` flag is.
        """
        try:
            return self.db.query(UserSession).filter(
                UserSession.refresh_token == refresh_token,
                # SQL expression, not a Python truth test - "== True" is intentional.
                UserSession.is_active == True  # noqa: E712
            ).first()
        except Exception as e:
            logger.error(f"Failed to find session by token: {str(e)}")
            self._rollback_quietly()
            return None

    def deactivate_user_sessions(self, user_id: int) -> None:
        """Mark every session of the user as inactive and commit.

        Raises:
            Exception: whatever the update/commit raised, after rollback.
        """
        try:
            self.db.query(UserSession).filter(
                UserSession.user_id == user_id
            ).update({"is_active": False})
            self.db.commit()
            logger.info(f"All sessions deactivated for user_id {user_id}")
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to deactivate sessions for user_id {user_id}: {str(e)}")
            raise