Files
TK-BOM-Project/backend/app/auth/auth_controller.py
Hyungi Ahn e14f8b69c7
Some checks failed
SonarQube Analysis / SonarQube Scan (push) Has been cancelled
회원가입 신청 기능 완성 (간소화)
백엔드:
- signup_routes.py 신규 생성
- POST /auth/signup-request: 회원가입 신청
- GET /auth/signup-requests: 승인 대기 목록 (관리자)
- POST /auth/approve-signup/{id}: 승인 (관리자)
- DELETE /auth/reject-signup/{id}: 거부 (관리자)
- main.py에 signup_router 등록

프론트엔드:
- SimpleLogin에 회원가입 폼 추가
- 필수 항목만: 사용자명, 비밀번호, 비밀번호 확인, 이름
- 간단하고 깔끔한 UI
- 비밀번호 일치 검사 및 최소 길이 검사
- 제출 후 승인 대기 안내 메시지
2025-10-14 07:28:06 +09:00

897 lines
26 KiB
Python

"""
인증 컨트롤러
TK-FB-Project의 authController.js를 참고하여 FastAPI용으로 구현
"""
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr, validator
from typing import Optional, List, Dict, Any
from ..database import get_db
from .auth_service import get_auth_service
from .jwt_service import jwt_service
from .models import UserRepository
from ..utils.logger import get_logger
logger = get_logger(__name__)
router = APIRouter()
security = HTTPBearer()
# Pydantic 모델들
class LoginRequest(BaseModel):
username: str
password: str
class RegisterRequest(BaseModel):
username: str
password: str
name: str
email: Optional[str] = None
access_level: str = 'worker'
department: Optional[str] = None
position: Optional[str] = None
phone: Optional[str] = None
role: str = "user"
@validator('email', pre=True)
def validate_email(cls, v):
if v == '' or v is None:
return None
# 간단한 이메일 형식 검증
if '@' not in v or '.' not in v.split('@')[-1]:
raise ValueError('올바른 이메일 형식을 입력해주세요')
return v
class RefreshTokenRequest(BaseModel):
refresh_token: str
class SignupRequest(BaseModel):
username: str
password: str
name: str
email: Optional[str] = None
department: Optional[str] = None
position: Optional[str] = None
phone: Optional[str] = None
reason: Optional[str] = None # 가입 사유
class LoginResponse(BaseModel):
success: bool
access_token: str
refresh_token: str
token_type: str
expires_in: int
user: Dict[str, Any]
redirect_url: str
permissions: List[str]
class RefreshTokenResponse(BaseModel):
success: bool
access_token: str
token_type: str
expires_in: int
user: Dict[str, Any]
class RegisterResponse(BaseModel):
success: bool
message: str
user_id: int
username: str
class LogoutResponse(BaseModel):
success: bool
message: str
class UserInfoResponse(BaseModel):
success: bool
user: Dict[str, Any]
permissions: List[str]
@router.post("/login", response_model=LoginResponse)
async def login(
login_data: LoginRequest,
request: Request,
db: Session = Depends(get_db)
):
"""
사용자 로그인
Args:
login_data: 로그인 정보 (사용자명, 비밀번호)
request: FastAPI Request 객체
db: 데이터베이스 세션
Returns:
LoginResponse: 로그인 결과 (토큰, 사용자 정보 등)
"""
try:
auth_service = get_auth_service(db)
result = await auth_service.login(
username=login_data.username,
password=login_data.password,
request=request
)
return LoginResponse(**result)
except Exception as e:
logger.error(f"Login endpoint error: {str(e)}")
raise
@router.post("/register", response_model=RegisterResponse)
async def register(
register_data: RegisterRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 등록 (시스템 관리자만 가능)
Args:
register_data: 등록 정보
credentials: JWT 토큰 (시스템 관리자 권한 필요)
db: 데이터베이스 세션
Returns:
RegisterResponse: 등록 결과
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
# 시스템 관리자 권한 확인
if payload['role'] != 'system':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="계정 생성은 시스템 관리자만 가능합니다"
)
auth_service = get_auth_service(db)
result = await auth_service.register(register_data.dict())
logger.info(f"User registered by system admin: {register_data.username} (created by: {payload['username']})")
return RegisterResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Register endpoint error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 등록 중 오류가 발생했습니다"
)
@router.post("/refresh", response_model=RefreshTokenResponse)
async def refresh_token(
refresh_data: RefreshTokenRequest,
request: Request,
db: Session = Depends(get_db)
):
"""
토큰 갱신
Args:
refresh_data: 리프레시 토큰 정보
request: FastAPI Request 객체
db: 데이터베이스 세션
Returns:
RefreshTokenResponse: 새로운 토큰 정보
"""
try:
auth_service = get_auth_service(db)
result = await auth_service.refresh_token(
refresh_token=refresh_data.refresh_token,
request=request
)
return RefreshTokenResponse(**result)
except Exception as e:
logger.error(f"Refresh token endpoint error: {str(e)}")
raise
@router.post("/logout", response_model=LogoutResponse)
async def logout(
refresh_data: RefreshTokenRequest,
db: Session = Depends(get_db)
):
"""
로그아웃
Args:
refresh_data: 리프레시 토큰 정보
db: 데이터베이스 세션
Returns:
LogoutResponse: 로그아웃 결과
"""
try:
auth_service = get_auth_service(db)
result = await auth_service.logout(refresh_data.refresh_token)
return LogoutResponse(**result)
except Exception as e:
logger.error(f"Logout endpoint error: {str(e)}")
raise
@router.get("/me", response_model=UserInfoResponse)
async def get_current_user_info(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
현재 사용자 정보 조회
Args:
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
UserInfoResponse: 사용자 정보 및 권한
"""
try:
# 토큰 검증
payload = jwt_service.verify_access_token(credentials.credentials)
user_id = payload['user_id']
# 사용자 정보 조회
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="사용자를 찾을 수 없거나 비활성화된 계정입니다"
)
# 권한 정보 조회
permissions = user_repo.get_user_permissions(user.role)
return UserInfoResponse(
success=True,
user=user.to_dict(),
permissions=permissions
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Get current user info error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 정보 조회 중 오류가 발생했습니다"
)
@router.get("/verify")
async def verify_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
):
"""
토큰 검증
Args:
credentials: JWT 토큰
Returns:
Dict: 토큰 검증 결과
"""
try:
payload = jwt_service.verify_access_token(credentials.credentials)
return {
'success': True,
'valid': True,
'user_id': payload['user_id'],
'username': payload['username'],
'role': payload['role'],
'expires_at': payload.get('exp')
}
except HTTPException as e:
return {
'success': False,
'valid': False,
'error': e.detail
}
except Exception as e:
logger.error(f"Token verification error: {str(e)}")
return {
'success': False,
'valid': False,
'error': '토큰 검증 중 오류가 발생했습니다'
}
# 관리자 전용 엔드포인트들
@router.get("/users")
async def get_all_users(
skip: int = 0,
limit: int = 100,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
모든 사용자 목록 조회 (관리자 전용)
Args:
skip: 건너뛸 레코드 수
limit: 조회할 레코드 수
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 사용자 목록
"""
try:
# 토큰 검증 및 권한 확인
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['admin', 'system']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="관리자 이상의 권한이 필요합니다"
)
# 사용자 목록 조회
user_repo = UserRepository(db)
users = user_repo.get_all_users(skip=skip, limit=limit)
return {
'success': True,
'users': [user.to_dict() for user in users],
'total_count': len(users)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Get all users error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 목록 조회 중 오류가 발생했습니다"
)
@router.delete("/users/{user_id}")
async def delete_user(
user_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 삭제 (관리자 전용)
Args:
user_id: 삭제할 사용자 ID
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 삭제 결과
"""
try:
# 토큰 검증 및 권한 확인
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] != 'system':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="사용자 삭제는 시스템 관리자만 가능합니다"
)
# 자기 자신 삭제 방지
if payload['user_id'] == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="자기 자신은 삭제할 수 없습니다"
)
# 사용자 조회 및 삭제
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="해당 사용자를 찾을 수 없습니다"
)
user_repo.delete_user(user)
logger.info(f"User deleted by admin: {user.username} (deleted by: {payload['username']})")
return {
'success': True,
'message': '사용자가 삭제되었습니다',
'deleted_user_id': user_id
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Delete user error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 삭제 중 오류가 발생했습니다"
)
# 로그 관리 API (관리자 이상)
@router.get("/logs/login")
async def get_login_logs(
skip: int = 0,
limit: int = 100,
user_id: Optional[int] = None,
status: Optional[str] = None,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
로그인 로그 조회 (관리자 이상)
Args:
skip: 건너뛸 레코드 수
limit: 조회할 레코드 수
user_id: 특정 사용자 ID 필터
status: 로그인 상태 필터 (success/failed)
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 로그인 로그 목록
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['admin', 'system']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="로그 조회는 관리자 이상의 권한이 필요합니다"
)
# 로그인 로그 조회
from .models import LoginLog, User
query = db.query(LoginLog).join(User)
if user_id:
query = query.filter(LoginLog.user_id == user_id)
if status:
query = query.filter(LoginLog.login_status == status)
logs = query.order_by(LoginLog.login_time.desc()).offset(skip).limit(limit).all()
return {
'success': True,
'logs': [
{
'log_id': log.log_id,
'user_id': log.user_id,
'username': log.user.username,
'name': log.user.name,
'login_time': log.login_time,
'ip_address': log.ip_address,
'user_agent': log.user_agent,
'login_status': log.login_status,
'failure_reason': log.failure_reason
}
for log in logs
],
'total_count': len(logs)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Get login logs error: {str(e)}")
raise HTTPException(
status_code=500,
detail="로그인 로그 조회 중 오류가 발생했습니다"
)
@router.get("/logs/system")
async def get_system_logs(
skip: int = 0,
limit: int = 100,
level: Optional[str] = None,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
시스템 로그 조회 (관리자 이상)
Args:
skip: 건너뛸 레코드 수
limit: 조회할 레코드 수
level: 로그 레벨 필터 (ERROR, WARNING, INFO, DEBUG)
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 시스템 로그 목록
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['admin', 'system']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="시스템 로그 조회는 관리자 이상의 권한이 필요합니다"
)
# 로그 파일에서 최근 로그 읽기 (임시 구현)
import os
from ..config import get_settings
settings = get_settings()
log_file_path = settings.logging.file_path
logs = []
if os.path.exists(log_file_path):
try:
with open(log_file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 최근 로그부터 처리
recent_lines = lines[-limit-skip:] if len(lines) > skip else lines
for line in reversed(recent_lines):
if line.strip():
# 간단한 로그 파싱 (실제로는 더 정교한 파싱 필요)
parts = line.strip().split(' - ')
if len(parts) >= 4:
timestamp = parts[0]
module = parts[1]
log_level = parts[2]
message = ' - '.join(parts[3:])
if not level or log_level == level:
logs.append({
'timestamp': timestamp,
'module': module,
'level': log_level,
'message': message
})
if len(logs) >= limit:
break
except Exception as e:
logger.error(f"Failed to read log file: {str(e)}")
return {
'success': True,
'logs': logs,
'total_count': len(logs)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Get system logs error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="시스템 로그 조회 중 오류가 발생했습니다"
)
# 사용자 역할 변경 API (시스템 관리자만)
@router.put("/users/{user_id}/role")
async def change_user_role(
user_id: int,
new_role: str,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 역할 변경 (시스템 관리자만)
Args:
user_id: 변경할 사용자 ID
new_role: 새로운 역할 (system, admin, user)
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 변경 결과
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] != 'system':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="사용자 역할 변경은 시스템 관리자만 가능합니다"
)
# 유효한 역할인지 확인
if new_role not in ['system', 'admin', 'user']:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="유효하지 않은 역할입니다. (system, admin, user 중 선택)"
)
# 자기 자신의 역할 변경 방지
if payload['user_id'] == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="자기 자신의 역할은 변경할 수 없습니다"
)
# 사용자 조회 및 역할 변경
from .models import UserRepository
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="해당 사용자를 찾을 수 없습니다"
)
old_role = user.role
user.role = new_role
user_repo.update_user(user)
logger.info(f"User role changed: {user.username} ({old_role}{new_role}) by {payload['username']}")
return {
'success': True,
'message': f'사용자 역할이 변경되었습니다: {old_role}{new_role}',
'user_id': user_id,
'old_role': old_role,
'new_role': new_role
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Change user role error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 역할 변경 중 오류가 발생했습니다"
)
# 프론트엔드 오류 로그 수집 API
@router.post("/logs/frontend-error")
async def log_frontend_error(
error_data: dict,
request: Request,
db: Session = Depends(get_db)
):
"""
프론트엔드 오류 로그 수집
Args:
error_data: 프론트엔드에서 전송한 오류 데이터
request: FastAPI Request 객체
db: 데이터베이스 세션
Returns:
Dict: 로그 저장 결과
"""
try:
from datetime import datetime
# 클라이언트 정보 추가
client_ip = request.client.host
user_agent = request.headers.get("user-agent", "")
# 오류 데이터 보강
enhanced_error_data = {
**error_data,
'client_ip': client_ip,
'server_timestamp': datetime.utcnow().isoformat(),
'user_agent': user_agent
}
# 로그로 기록
logger.error(f"Frontend Error: {error_data.get('type', 'unknown')} - {error_data.get('message', 'No message')}",
extra={'frontend_error': enhanced_error_data})
# 데이터베이스에 저장 (선택적)
# TODO: 필요시 frontend_errors 테이블 생성하여 저장
return {
'success': True,
'message': '오류가 기록되었습니다',
'timestamp': enhanced_error_data['server_timestamp']
}
except Exception as e:
logger.error(f"Failed to log frontend error: {str(e)}")
return {
'success': False,
'message': '오류 기록에 실패했습니다'
}
# 프로필 업데이트 API
class ProfileUpdateRequest(BaseModel):
name: str
email: Optional[EmailStr] = None
department: Optional[str] = None
position: Optional[str] = None
@router.put("/profile")
async def update_profile(
profile_data: ProfileUpdateRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 프로필 업데이트
Args:
profile_data: 업데이트할 프로필 정보
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 업데이트 결과
"""
try:
# 토큰 검증
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
user_id = payload['user_id']
# 사용자 조회
from .models import UserRepository
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="사용자를 찾을 수 없습니다"
)
# 이메일 중복 확인 (다른 사용자가 사용 중인지)
if profile_data.email and profile_data.email != user.email:
existing_email = user_repo.find_by_email(profile_data.email)
if existing_email and existing_email.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="이미 사용 중인 이메일입니다"
)
# 프로필 업데이트
user.name = profile_data.name
user.email = profile_data.email
user.department = profile_data.department
user.position = profile_data.position
user_repo.update_user(user)
logger.info(f"Profile updated for user: {user.username}")
return {
'success': True,
'message': '프로필이 업데이트되었습니다',
'user': user.to_dict()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Profile update error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="프로필 업데이트 중 오류가 발생했습니다"
)
# 비밀번호 변경 API
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
@router.put("/change-password")
async def change_password(
password_data: ChangePasswordRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 비밀번호 변경
Args:
password_data: 현재 비밀번호와 새 비밀번호
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 변경 결과
"""
try:
# 토큰 검증
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
user_id = payload['user_id']
# 사용자 조회
from .models import UserRepository
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="사용자를 찾을 수 없습니다"
)
# 현재 비밀번호 확인
if not user.check_password(password_data.current_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="현재 비밀번호가 올바르지 않습니다"
)
# 새 비밀번호 유효성 검사
if len(password_data.new_password) < 8:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="새 비밀번호는 8자 이상이어야 합니다"
)
# 비밀번호 변경
user.set_password(password_data.new_password)
user_repo.update_user(user)
logger.info(f"Password changed for user: {user.username}")
return {
'success': True,
'message': '비밀번호가 변경되었습니다'
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Password change error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="비밀번호 변경 중 오류가 발생했습니다"
)