Files
TK-BOM-Project/backend/app/auth/models.py
Hyungi Ahn e27020ae9b feat: 구매신청 기능 완성 및 SUPPORT/SPECIAL 카테고리 개선
- 모든 카테고리 구매신청 기능 완성 (PIPE, FITTING, VALVE, FLANGE, GASKET, BOLT, SUPPORT, SPECIAL, UNKNOWN)
- 구매신청 완료 항목: 회색 배경, 체크박스 비활성화, '구매신청완료' 배지 표시
- 전체 선택/구매신청 시 이미 구매신청된 항목 자동 제외
- 구매신청 quantity 타입 에러 수정 (문자열 -> 정수 변환)

SUPPORT 카테고리 (구 U-BOLT):
- U-BOLT -> SUPPORT로 카테고리명 변경
- 클램프, 유볼트, 우레탄블럭슈 분류 개선
- 테이블 헤더: 선택-종류-타입-크기-디스크립션-추가요구-사용자요구-수량
- 크기 정보 main_nom 필드에서 가져오기 (배관 인치)
- 엑셀 내보내기 형식 조정

SPECIAL 카테고리:
- SPECIAL 키워드 자재 자동 분류 (SPECIFICATION 제외)
- 파일 업로드 시 SPECIAL 카테고리 처리 로직 추가
- 도면번호 필드 추가 (drawing_name, line_no)
- 타입 필드: 크기/스케줄/재질 제외한 핵심 정보 표시
- 엑셀 DWG_NAME, LINE_NUM 컬럼 파싱 및 저장

FITTING 카테고리:
- 테이블 컬럼 너비 조정 (선택 2%, 종류 8.5%, 수량 12%)

구매신청 관리:
- 엑셀 재다운로드 형식 개선 (BOM 페이지와 동일한 형식)
- 그룹화된 자재 정보 포함하여 저장 및 다운로드
2025-10-14 12:39:25 +09:00

412 lines
15 KiB
Python

"""
인증 시스템 모델
TK-FB-Project의 사용자 모델을 참고하여 SQLAlchemy 기반으로 구현
"""
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import text
import bcrypt
from ..database import Base
from ..utils.logger import get_logger
logger = get_logger(__name__)
class User(Base):
"""사용자 모델"""
__tablename__ = "users"
user_id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
password = Column(String(255), nullable=False)
name = Column(String(100), nullable=False)
email = Column(String(100), index=True)
# 권한 관리 - 3단계 시스템: system(제작자) > admin(관리자) > user(사용자)
role = Column(String(20), default='user', nullable=False) # system, admin, user
access_level = Column(String(20), default='worker', nullable=False) # 호환성 유지
# 계정 상태 관리
is_active = Column(Boolean, default=True, nullable=False) # DEPRECATED: Use status instead
status = Column(String(20), default='active', nullable=False) # pending, active, suspended, deleted
failed_login_attempts = Column(Integer, default=0)
locked_until = Column(DateTime, nullable=True)
# 추가 정보
department = Column(String(50))
position = Column(String(50))
phone = Column(String(20))
# 타임스탬프
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
last_login_at = Column(DateTime, nullable=True)
# 관계 설정
login_logs = relationship("LoginLog", back_populates="user", cascade="all, delete-orphan")
sessions = relationship("UserSession", back_populates="user", cascade="all, delete-orphan")
def __repr__(self):
return f"<User(username='{self.username}', name='{self.name}', role='{self.role}')>"
def to_dict(self) -> Dict[str, Any]:
"""사용자 정보를 딕셔너리로 변환 (비밀번호 제외)"""
return {
'user_id': self.user_id,
'username': self.username,
'name': self.name,
'email': self.email,
'role': self.role,
'access_level': self.access_level,
'is_active': self.is_active,
'department': self.department,
'position': self.position,
'phone': self.phone,
'created_at': self.created_at,
'last_login_at': self.last_login_at
}
def check_password(self, password: str) -> bool:
"""비밀번호 확인"""
try:
return bcrypt.checkpw(password.encode('utf-8'), self.password.encode('utf-8'))
except Exception as e:
logger.error(f"Password check failed for user {self.username}: {str(e)}")
return False
def set_password(self, password: str):
"""비밀번호 설정 (해싱)"""
try:
salt = bcrypt.gensalt()
self.password = bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
except Exception as e:
logger.error(f"Password hashing failed for user {self.username}: {str(e)}")
raise
def is_locked(self) -> bool:
"""계정 잠금 상태 확인"""
if self.locked_until is None:
return False
return datetime.utcnow() < self.locked_until
def lock_account(self, minutes: int = 15):
"""계정 잠금"""
self.locked_until = datetime.utcnow() + timedelta(minutes=minutes)
logger.warning(f"User account locked: {self.username} for {minutes} minutes")
def unlock_account(self):
"""계정 잠금 해제"""
self.locked_until = None
self.failed_login_attempts = 0
logger.info(f"User account unlocked: {self.username}")
def increment_failed_attempts(self):
"""로그인 실패 횟수 증가"""
self.failed_login_attempts += 1
if self.failed_login_attempts >= 5:
self.lock_account()
def reset_failed_attempts(self):
"""로그인 실패 횟수 초기화"""
self.failed_login_attempts = 0
self.locked_until = None
def update_last_login(self):
"""마지막 로그인 시간 업데이트"""
self.last_login_at = datetime.utcnow()
# 권한 체크 메서드들
def is_system(self) -> bool:
"""시스템 관리자 권한 확인"""
return self.role == 'system'
def is_admin(self) -> bool:
"""관리자 권한 확인 (시스템 관리자 포함)"""
return self.role in ['system', 'admin']
def is_user(self) -> bool:
"""일반 사용자 권한 확인"""
return self.role == 'user'
def can_create_users(self) -> bool:
"""사용자 생성 권한 확인 (시스템 관리자만)"""
return self.is_system()
def can_view_logs(self) -> bool:
"""로그 조회 권한 확인 (관리자 이상)"""
return self.is_admin()
def can_manage_system(self) -> bool:
"""시스템 관리 권한 확인 (시스템 관리자만)"""
return self.is_system()
def get_role_display_name(self) -> str:
"""역할 표시명 반환"""
role_names = {
'system': '시스템 관리자',
'admin': '관리자',
'user': '사용자'
}
return role_names.get(self.role, '알 수 없음')
class LoginLog(Base):
"""로그인 이력 모델"""
__tablename__ = "login_logs"
log_id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False)
login_time = Column(DateTime, default=func.now())
ip_address = Column(String(45))
user_agent = Column(Text)
login_status = Column(String(20), nullable=False) # 'success' or 'failed'
failure_reason = Column(String(100))
session_duration = Column(Integer) # 세션 지속 시간 (초)
created_at = Column(DateTime, default=func.now())
# 관계 설정
user = relationship("User", back_populates="login_logs")
def __repr__(self):
return f"<LoginLog(user_id={self.user_id}, status='{self.login_status}', time='{self.login_time}')>"
class UserSession(Base):
"""사용자 세션 모델 (Refresh Token 관리)"""
__tablename__ = "user_sessions"
session_id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False)
refresh_token = Column(String(500), nullable=False, index=True)
expires_at = Column(DateTime, nullable=False)
ip_address = Column(String(45))
user_agent = Column(Text)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
# 관계 설정
user = relationship("User", back_populates="sessions")
def __repr__(self):
return f"<UserSession(user_id={self.user_id}, expires_at='{self.expires_at}')>"
def is_expired(self) -> bool:
"""세션 만료 여부 확인"""
return datetime.utcnow() > self.expires_at
def deactivate(self):
"""세션 비활성화"""
self.is_active = False
class Permission(Base):
"""권한 모델"""
__tablename__ = "permissions"
permission_id = Column(Integer, primary_key=True, index=True)
permission_name = Column(String(50), unique=True, nullable=False)
description = Column(Text)
module = Column(String(30), index=True) # 모듈별 권한 관리
created_at = Column(DateTime, default=func.now())
def __repr__(self):
return f"<Permission(name='{self.permission_name}', module='{self.module}')>"
class RolePermission(Base):
"""역할-권한 매핑 모델"""
__tablename__ = "role_permissions"
role_permission_id = Column(Integer, primary_key=True, index=True)
role = Column(String(20), nullable=False, index=True)
permission_id = Column(Integer, ForeignKey("permissions.permission_id", ondelete="CASCADE"))
created_at = Column(DateTime, default=func.now())
# 관계 설정
permission = relationship("Permission")
def __repr__(self):
return f"<RolePermission(role='{self.role}', permission_id={self.permission_id})>"
class UserRepository:
"""사용자 데이터 접근 계층"""
def __init__(self, db: Session):
self.db = db
def find_by_username(self, username: str) -> Optional[User]:
"""사용자명으로 사용자 조회"""
try:
return self.db.query(User).filter(User.username == username).first()
except Exception as e:
logger.error(f"Failed to find user by username {username}: {str(e)}")
return None
def find_by_id(self, user_id: int) -> Optional[User]:
"""사용자 ID로 사용자 조회"""
try:
return self.db.query(User).filter(User.user_id == user_id).first()
except Exception as e:
logger.error(f"Failed to find user by id {user_id}: {str(e)}")
return None
def find_by_email(self, email: str) -> Optional[User]:
"""이메일로 사용자 조회"""
try:
return self.db.query(User).filter(User.email == email).first()
except Exception as e:
logger.error(f"Failed to find user by email {email}: {str(e)}")
return None
def create_user(self, user_data: Dict[str, Any]) -> User:
"""새 사용자 생성"""
try:
user = User(**user_data)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
logger.info(f"User created: {user.username}")
return user
except Exception as e:
self.db.rollback()
logger.error(f"Failed to create user: {str(e)}")
raise
def update_user(self, user: User) -> User:
"""사용자 정보 업데이트"""
try:
self.db.commit()
self.db.refresh(user)
logger.info(f"User updated: {user.username}")
return user
except Exception as e:
self.db.rollback()
logger.error(f"Failed to update user {user.username}: {str(e)}")
raise
def delete_user(self, user: User):
"""사용자 삭제"""
try:
self.db.delete(user)
self.db.commit()
logger.info(f"User deleted: {user.username}")
except Exception as e:
self.db.rollback()
logger.error(f"Failed to delete user {user.username}: {str(e)}")
raise
def get_all_users(self, skip: int = 0, limit: int = 100) -> List[User]:
"""활성 사용자만 조회 (status='active')"""
try:
# status 필드가 있으면 status='active', 없으면 is_active=True (하위 호환성)
users = self.db.query(User)
if hasattr(User, 'status'):
users = users.filter(User.status == 'active')
else:
users = users.filter(User.is_active == True)
return users.offset(skip).limit(limit).all()
except Exception as e:
logger.error(f"Failed to get all users: {str(e)}")
return []
def get_user_permissions(self, role: str) -> List[str]:
"""사용자 역할에 따른 권한 목록 조회"""
try:
query = text("""
SELECT p.permission_name
FROM permissions p
JOIN role_permissions rp ON p.permission_id = rp.permission_id
WHERE rp.role = :role
""")
result = self.db.execute(query, {"role": role})
return [row[0] for row in result.fetchall()]
except Exception as e:
logger.error(f"Failed to get permissions for role {role}: {str(e)}")
return []
def record_login_log(self, user_id: int, ip_address: str, user_agent: str,
status: str, failure_reason: str = None):
"""로그인 이력 기록"""
try:
log = LoginLog(
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
login_status=status,
failure_reason=failure_reason
)
self.db.add(log)
self.db.commit()
logger.debug(f"Login log recorded for user_id {user_id}: {status}")
except Exception as e:
self.db.rollback()
logger.error(f"Failed to record login log: {str(e)}")
def create_session(self, user_id: int, refresh_token: str, expires_at: datetime,
ip_address: str, user_agent: str) -> UserSession:
"""사용자 세션 생성"""
try:
session = UserSession(
user_id=user_id,
refresh_token=refresh_token,
expires_at=expires_at,
ip_address=ip_address,
user_agent=user_agent
)
self.db.add(session)
self.db.commit()
self.db.refresh(session)
logger.debug(f"Session created for user_id {user_id}")
return session
except Exception as e:
self.db.rollback()
logger.error(f"Failed to create session: {str(e)}")
raise
def find_session_by_token(self, refresh_token: str) -> Optional[UserSession]:
"""리프레시 토큰으로 세션 조회"""
try:
return self.db.query(UserSession).filter(
UserSession.refresh_token == refresh_token,
UserSession.is_active == True
).first()
except Exception as e:
logger.error(f"Failed to find session by token: {str(e)}")
return None
def deactivate_user_sessions(self, user_id: int):
"""사용자의 모든 세션 비활성화"""
try:
self.db.query(UserSession).filter(
UserSession.user_id == user_id
).update({"is_active": False})
self.db.commit()
logger.info(f"All sessions deactivated for user_id {user_id}")
except Exception as e:
self.db.rollback()
logger.error(f"Failed to deactivate sessions for user_id {user_id}: {str(e)}")
raise