""" 인증 시스템 모델 TK-FB-Project의 사용자 모델을 참고하여 SQLAlchemy 기반으로 구현 """ from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey from sqlalchemy.orm import relationship from sqlalchemy.sql import func from datetime import datetime, timedelta from typing import Optional, List, Dict, Any from sqlalchemy.orm import Session from sqlalchemy import text import bcrypt from ..database import Base from ..utils.logger import get_logger logger = get_logger(__name__) class User(Base): """사용자 모델""" __tablename__ = "users" user_id = Column(Integer, primary_key=True, index=True) username = Column(String(50), unique=True, index=True, nullable=False) password = Column(String(255), nullable=False) name = Column(String(100), nullable=False) email = Column(String(100), index=True) # 권한 관리 - 3단계 시스템: system(제작자) > admin(관리자) > user(사용자) role = Column(String(20), default='user', nullable=False) # system, admin, user access_level = Column(String(20), default='worker', nullable=False) # 호환성 유지 # 계정 상태 관리 is_active = Column(Boolean, default=True, nullable=False) # DEPRECATED: Use status instead status = Column(String(20), default='active', nullable=False) # pending, active, suspended, deleted failed_login_attempts = Column(Integer, default=0) locked_until = Column(DateTime, nullable=True) # 추가 정보 department = Column(String(50)) position = Column(String(50)) phone = Column(String(20)) # 타임스탬프 created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) last_login_at = Column(DateTime, nullable=True) # 관계 설정 login_logs = relationship("LoginLog", back_populates="user", cascade="all, delete-orphan") sessions = relationship("UserSession", back_populates="user", cascade="all, delete-orphan") def __repr__(self): return f"" def to_dict(self) -> Dict[str, Any]: """사용자 정보를 딕셔너리로 변환 (비밀번호 제외)""" return { 'user_id': self.user_id, 'username': self.username, 'name': self.name, 'email': self.email, 'role': self.role, 'access_level': self.access_level, 'is_active': self.is_active, 'department': self.department, 'position': self.position, 'phone': self.phone, 'created_at': self.created_at, 'last_login_at': self.last_login_at } def check_password(self, password: str) -> bool: """비밀번호 확인""" try: return bcrypt.checkpw(password.encode('utf-8'), self.password.encode('utf-8')) except Exception as e: logger.error(f"Password check failed for user {self.username}: {str(e)}") return False def set_password(self, password: str): """비밀번호 설정 (해싱)""" try: salt = bcrypt.gensalt() self.password = bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8') except Exception as e: logger.error(f"Password hashing failed for user {self.username}: {str(e)}") raise def is_locked(self) -> bool: """계정 잠금 상태 확인""" if self.locked_until is None: return False return datetime.utcnow() < self.locked_until def lock_account(self, minutes: int = 15): """계정 잠금""" self.locked_until = datetime.utcnow() + timedelta(minutes=minutes) logger.warning(f"User account locked: {self.username} for {minutes} minutes") def unlock_account(self): """계정 잠금 해제""" self.locked_until = None self.failed_login_attempts = 0 logger.info(f"User account unlocked: {self.username}") def increment_failed_attempts(self): """로그인 실패 횟수 증가""" self.failed_login_attempts += 1 if self.failed_login_attempts >= 5: self.lock_account() def reset_failed_attempts(self): """로그인 실패 횟수 초기화""" self.failed_login_attempts = 0 self.locked_until = None def update_last_login(self): """마지막 로그인 시간 업데이트""" self.last_login_at = datetime.utcnow() # 권한 체크 메서드들 def is_system(self) -> bool: """시스템 관리자 권한 확인""" return self.role == 'system' def is_admin(self) -> bool: """관리자 권한 확인 (시스템 관리자 포함)""" return self.role in ['system', 'admin'] def is_user(self) -> bool: """일반 사용자 권한 확인""" return self.role == 'user' def can_create_users(self) -> bool: """사용자 생성 권한 확인 (시스템 관리자만)""" return self.is_system() def can_view_logs(self) -> bool: """로그 조회 권한 확인 (관리자 이상)""" return self.is_admin() def can_manage_system(self) -> bool: """시스템 관리 권한 확인 (시스템 관리자만)""" return self.is_system() def get_role_display_name(self) -> str: """역할 표시명 반환""" role_names = { 'system': '시스템 관리자', 'admin': '관리자', 'user': '사용자' } return role_names.get(self.role, '알 수 없음') class LoginLog(Base): """로그인 이력 모델""" __tablename__ = "login_logs" log_id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False) login_time = Column(DateTime, default=func.now()) ip_address = Column(String(45)) user_agent = Column(Text) login_status = Column(String(20), nullable=False) # 'success' or 'failed' failure_reason = Column(String(100)) session_duration = Column(Integer) # 세션 지속 시간 (초) created_at = Column(DateTime, default=func.now()) # 관계 설정 user = relationship("User", back_populates="login_logs") def __repr__(self): return f"" class UserSession(Base): """사용자 세션 모델 (Refresh Token 관리)""" __tablename__ = "user_sessions" session_id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False) refresh_token = Column(String(500), nullable=False, index=True) expires_at = Column(DateTime, nullable=False) ip_address = Column(String(45)) user_agent = Column(Text) is_active = Column(Boolean, default=True) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) # 관계 설정 user = relationship("User", back_populates="sessions") def __repr__(self): return f"" def is_expired(self) -> bool: """세션 만료 여부 확인""" return datetime.utcnow() > self.expires_at def deactivate(self): """세션 비활성화""" self.is_active = False class Permission(Base): """권한 모델""" __tablename__ = "permissions" permission_id = Column(Integer, primary_key=True, index=True) permission_name = Column(String(50), unique=True, nullable=False) description = Column(Text) module = Column(String(30), index=True) # 모듈별 권한 관리 created_at = Column(DateTime, default=func.now()) def __repr__(self): return f"" class RolePermission(Base): """역할-권한 매핑 모델""" __tablename__ = "role_permissions" role_permission_id = Column(Integer, primary_key=True, index=True) role = Column(String(20), nullable=False, index=True) permission_id = Column(Integer, ForeignKey("permissions.permission_id", ondelete="CASCADE")) created_at = Column(DateTime, default=func.now()) # 관계 설정 permission = relationship("Permission") def __repr__(self): return f"" class UserRepository: """사용자 데이터 접근 계층""" def __init__(self, db: Session): self.db = db def find_by_username(self, username: str) -> Optional[User]: """사용자명으로 사용자 조회""" try: return self.db.query(User).filter(User.username == username).first() except Exception as e: logger.error(f"Failed to find user by username {username}: {str(e)}") return None def find_by_id(self, user_id: int) -> Optional[User]: """사용자 ID로 사용자 조회""" try: return self.db.query(User).filter(User.user_id == user_id).first() except Exception as e: logger.error(f"Failed to find user by id {user_id}: {str(e)}") return None def find_by_email(self, email: str) -> Optional[User]: """이메일로 사용자 조회""" try: return self.db.query(User).filter(User.email == email).first() except Exception as e: logger.error(f"Failed to find user by email {email}: {str(e)}") return None def create_user(self, user_data: Dict[str, Any]) -> User: """새 사용자 생성""" try: user = User(**user_data) self.db.add(user) self.db.commit() self.db.refresh(user) logger.info(f"User created: {user.username}") return user except Exception as e: self.db.rollback() logger.error(f"Failed to create user: {str(e)}") raise def update_user(self, user: User) -> User: """사용자 정보 업데이트""" try: self.db.commit() self.db.refresh(user) logger.info(f"User updated: {user.username}") return user except Exception as e: self.db.rollback() logger.error(f"Failed to update user {user.username}: {str(e)}") raise def delete_user(self, user: User): """사용자 삭제""" try: self.db.delete(user) self.db.commit() logger.info(f"User deleted: {user.username}") except Exception as e: self.db.rollback() logger.error(f"Failed to delete user {user.username}: {str(e)}") raise def get_all_users(self, skip: int = 0, limit: int = 100) -> List[User]: """활성 사용자만 조회 (status='active')""" try: # status 필드가 있으면 status='active', 없으면 is_active=True (하위 호환성) users = self.db.query(User) if hasattr(User, 'status'): users = users.filter(User.status == 'active') else: users = users.filter(User.is_active == True) return users.offset(skip).limit(limit).all() except Exception as e: logger.error(f"Failed to get all users: {str(e)}") return [] def get_user_permissions(self, role: str) -> List[str]: """사용자 역할에 따른 권한 목록 조회""" try: query = text(""" SELECT p.permission_name FROM permissions p JOIN role_permissions rp ON p.permission_id = rp.permission_id WHERE rp.role = :role """) result = self.db.execute(query, {"role": role}) return [row[0] for row in result.fetchall()] except Exception as e: logger.error(f"Failed to get permissions for role {role}: {str(e)}") return [] def record_login_log(self, user_id: int, ip_address: str, user_agent: str, status: str, failure_reason: str = None): """로그인 이력 기록""" try: log = LoginLog( user_id=user_id, ip_address=ip_address, user_agent=user_agent, login_status=status, failure_reason=failure_reason ) self.db.add(log) self.db.commit() logger.debug(f"Login log recorded for user_id {user_id}: {status}") except Exception as e: self.db.rollback() logger.error(f"Failed to record login log: {str(e)}") def create_session(self, user_id: int, refresh_token: str, expires_at: datetime, ip_address: str, user_agent: str) -> UserSession: """사용자 세션 생성""" try: session = UserSession( user_id=user_id, refresh_token=refresh_token, expires_at=expires_at, ip_address=ip_address, user_agent=user_agent ) self.db.add(session) self.db.commit() self.db.refresh(session) logger.debug(f"Session created for user_id {user_id}") return session except Exception as e: self.db.rollback() logger.error(f"Failed to create session: {str(e)}") raise def find_session_by_token(self, refresh_token: str) -> Optional[UserSession]: """리프레시 토큰으로 세션 조회""" try: return self.db.query(UserSession).filter( UserSession.refresh_token == refresh_token, UserSession.is_active == True ).first() except Exception as e: logger.error(f"Failed to find session by token: {str(e)}") return None def deactivate_user_sessions(self, user_id: int): """사용자의 모든 세션 비활성화""" try: self.db.query(UserSession).filter( UserSession.user_id == user_id ).update({"is_active": False}) self.db.commit() logger.info(f"All sessions deactivated for user_id {user_id}") except Exception as e: self.db.rollback() logger.error(f"Failed to deactivate sessions for user_id {user_id}: {str(e)}") raise