""" 인증 컨트롤러 TK-FB-Project의 authController.js를 참고하여 FastAPI용으로 구현 """ from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from pydantic import BaseModel, EmailStr, validator from typing import Optional, List, Dict, Any from ..database import get_db from .auth_service import get_auth_service from .jwt_service import jwt_service from .models import UserRepository from ..utils.logger import get_logger logger = get_logger(__name__) router = APIRouter() security = HTTPBearer() # Pydantic 모델들 class LoginRequest(BaseModel): username: str password: str class RegisterRequest(BaseModel): username: str password: str name: str email: Optional[str] = None access_level: str = 'worker' department: Optional[str] = None position: Optional[str] = None phone: Optional[str] = None role: str = "user" @validator('email', pre=True) def validate_email(cls, v): if v == '' or v is None: return None # 간단한 이메일 형식 검증 if '@' not in v or '.' not in v.split('@')[-1]: raise ValueError('올바른 이메일 형식을 입력해주세요') return v class RefreshTokenRequest(BaseModel): refresh_token: str class LoginResponse(BaseModel): success: bool access_token: str refresh_token: str token_type: str expires_in: int user: Dict[str, Any] redirect_url: str permissions: List[str] class RefreshTokenResponse(BaseModel): success: bool access_token: str token_type: str expires_in: int user: Dict[str, Any] class RegisterResponse(BaseModel): success: bool message: str user_id: int username: str class LogoutResponse(BaseModel): success: bool message: str class UserInfoResponse(BaseModel): success: bool user: Dict[str, Any] permissions: List[str] @router.post("/login", response_model=LoginResponse) async def login( login_data: LoginRequest, request: Request, db: Session = Depends(get_db) ): """ 사용자 로그인 Args: login_data: 로그인 정보 (사용자명, 비밀번호) request: FastAPI Request 객체 db: 데이터베이스 세션 Returns: LoginResponse: 로그인 결과 (토큰, 사용자 정보 등) """ try: auth_service = get_auth_service(db) result = await auth_service.login( username=login_data.username, password=login_data.password, request=request ) return LoginResponse(**result) except Exception as e: logger.error(f"Login endpoint error: {str(e)}") raise @router.post("/register", response_model=RegisterResponse) async def register( register_data: RegisterRequest, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 사용자 등록 (시스템 관리자만 가능) Args: register_data: 등록 정보 credentials: JWT 토큰 (시스템 관리자 권한 필요) db: 데이터베이스 세션 Returns: RegisterResponse: 등록 결과 """ try: # 토큰 검증 및 권한 확인 from .jwt_service import jwt_service payload = jwt_service.verify_access_token(credentials.credentials) # 시스템 관리자 권한 확인 if payload['role'] != 'system': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="계정 생성은 시스템 관리자만 가능합니다" ) auth_service = get_auth_service(db) result = await auth_service.register(register_data.dict()) logger.info(f"User registered by system admin: {register_data.username} (created by: {payload['username']})") return RegisterResponse(**result) except HTTPException: raise except Exception as e: logger.error(f"Register endpoint error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="사용자 등록 중 오류가 발생했습니다" ) @router.post("/refresh", response_model=RefreshTokenResponse) async def refresh_token( refresh_data: RefreshTokenRequest, request: Request, db: Session = Depends(get_db) ): """ 토큰 갱신 Args: refresh_data: 리프레시 토큰 정보 request: FastAPI Request 객체 db: 데이터베이스 세션 Returns: RefreshTokenResponse: 새로운 토큰 정보 """ try: auth_service = get_auth_service(db) result = await auth_service.refresh_token( refresh_token=refresh_data.refresh_token, request=request ) return RefreshTokenResponse(**result) except Exception as e: logger.error(f"Refresh token endpoint error: {str(e)}") raise @router.post("/logout", response_model=LogoutResponse) async def logout( refresh_data: RefreshTokenRequest, db: Session = Depends(get_db) ): """ 로그아웃 Args: refresh_data: 리프레시 토큰 정보 db: 데이터베이스 세션 Returns: LogoutResponse: 로그아웃 결과 """ try: auth_service = get_auth_service(db) result = await auth_service.logout(refresh_data.refresh_token) return LogoutResponse(**result) except Exception as e: logger.error(f"Logout endpoint error: {str(e)}") raise @router.get("/me", response_model=UserInfoResponse) async def get_current_user_info( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 현재 사용자 정보 조회 Args: credentials: JWT 토큰 db: 데이터베이스 세션 Returns: UserInfoResponse: 사용자 정보 및 권한 """ try: # 토큰 검증 payload = jwt_service.verify_access_token(credentials.credentials) user_id = payload['user_id'] # 사용자 정보 조회 user_repo = UserRepository(db) user = user_repo.find_by_id(user_id) if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="사용자를 찾을 수 없거나 비활성화된 계정입니다" ) # 권한 정보 조회 permissions = user_repo.get_user_permissions(user.role) return UserInfoResponse( success=True, user=user.to_dict(), permissions=permissions ) except HTTPException: raise except Exception as e: logger.error(f"Get current user info error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="사용자 정보 조회 중 오류가 발생했습니다" ) @router.get("/verify") async def verify_token( credentials: HTTPAuthorizationCredentials = Depends(security) ): """ 토큰 검증 Args: credentials: JWT 토큰 Returns: Dict: 토큰 검증 결과 """ try: payload = jwt_service.verify_access_token(credentials.credentials) return { 'success': True, 'valid': True, 'user_id': payload['user_id'], 'username': payload['username'], 'role': payload['role'], 'expires_at': payload.get('exp') } except HTTPException as e: return { 'success': False, 'valid': False, 'error': e.detail } except Exception as e: logger.error(f"Token verification error: {str(e)}") return { 'success': False, 'valid': False, 'error': '토큰 검증 중 오류가 발생했습니다' } # 관리자 전용 엔드포인트들 @router.get("/users") async def get_all_users( skip: int = 0, limit: int = 100, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 모든 사용자 목록 조회 (관리자 전용) Args: skip: 건너뛸 레코드 수 limit: 조회할 레코드 수 credentials: JWT 토큰 db: 데이터베이스 세션 Returns: Dict: 사용자 목록 """ try: # 토큰 검증 및 권한 확인 payload = jwt_service.verify_access_token(credentials.credentials) if payload['role'] not in ['admin', 'system']: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="관리자 이상의 권한이 필요합니다" ) # 사용자 목록 조회 user_repo = UserRepository(db) users = user_repo.get_all_users(skip=skip, limit=limit) return { 'success': True, 'users': [user.to_dict() for user in users], 'total_count': len(users) } except HTTPException: raise except Exception as e: logger.error(f"Get all users error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="사용자 목록 조회 중 오류가 발생했습니다" ) @router.delete("/users/{user_id}") async def delete_user( user_id: int, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 사용자 삭제 (관리자 전용) Args: user_id: 삭제할 사용자 ID credentials: JWT 토큰 db: 데이터베이스 세션 Returns: Dict: 삭제 결과 """ try: # 토큰 검증 및 권한 확인 payload = jwt_service.verify_access_token(credentials.credentials) if payload['role'] != 'system': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="사용자 삭제는 시스템 관리자만 가능합니다" ) # 자기 자신 삭제 방지 if payload['user_id'] == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="자기 자신은 삭제할 수 없습니다" ) # 사용자 조회 및 삭제 user_repo = UserRepository(db) user = user_repo.find_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="해당 사용자를 찾을 수 없습니다" ) user_repo.delete_user(user) logger.info(f"User deleted by admin: {user.username} (deleted by: {payload['username']})") return { 'success': True, 'message': '사용자가 삭제되었습니다', 'deleted_user_id': user_id } except HTTPException: raise except Exception as e: logger.error(f"Delete user error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="사용자 삭제 중 오류가 발생했습니다" ) # 로그 관리 API (관리자 이상) @router.get("/logs/login") async def get_login_logs( skip: int = 0, limit: int = 100, user_id: Optional[int] = None, status: Optional[str] = None, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 로그인 로그 조회 (관리자 이상) Args: skip: 건너뛸 레코드 수 limit: 조회할 레코드 수 user_id: 특정 사용자 ID 필터 status: 로그인 상태 필터 (success/failed) credentials: JWT 토큰 db: 데이터베이스 세션 Returns: Dict: 로그인 로그 목록 """ try: # 토큰 검증 및 권한 확인 from .jwt_service import jwt_service payload = jwt_service.verify_access_token(credentials.credentials) if payload['role'] not in ['admin', 'system']: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="로그 조회는 관리자 이상의 권한이 필요합니다" ) # 로그인 로그 조회 from .models import LoginLog, User query = db.query(LoginLog).join(User) if user_id: query = query.filter(LoginLog.user_id == user_id) if status: query = query.filter(LoginLog.login_status == status) logs = query.order_by(LoginLog.login_time.desc()).offset(skip).limit(limit).all() return { 'success': True, 'logs': [ { 'log_id': log.log_id, 'user_id': log.user_id, 'username': log.user.username, 'name': log.user.name, 'login_time': log.login_time, 'ip_address': log.ip_address, 'user_agent': log.user_agent, 'login_status': log.login_status, 'failure_reason': log.failure_reason } for log in logs ], 'total_count': len(logs) } except HTTPException: raise except Exception as e: logger.error(f"Get login logs error: {str(e)}") raise HTTPException( status_code=500, detail="로그인 로그 조회 중 오류가 발생했습니다" ) @router.get("/logs/system") async def get_system_logs( skip: int = 0, limit: int = 100, level: Optional[str] = None, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 시스템 로그 조회 (관리자 이상) Args: skip: 건너뛸 레코드 수 limit: 조회할 레코드 수 level: 로그 레벨 필터 (ERROR, WARNING, INFO, DEBUG) credentials: JWT 토큰 db: 데이터베이스 세션 Returns: Dict: 시스템 로그 목록 """ try: # 토큰 검증 및 권한 확인 from .jwt_service import jwt_service payload = jwt_service.verify_access_token(credentials.credentials) if payload['role'] not in ['admin', 'system']: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="시스템 로그 조회는 관리자 이상의 권한이 필요합니다" ) # 로그 파일에서 최근 로그 읽기 (임시 구현) import os from ..config import get_settings settings = get_settings() log_file_path = settings.logging.file_path logs = [] if os.path.exists(log_file_path): try: with open(log_file_path, 'r', encoding='utf-8') as f: lines = f.readlines() # 최근 로그부터 처리 recent_lines = lines[-limit-skip:] if len(lines) > skip else lines for line in reversed(recent_lines): if line.strip(): # 간단한 로그 파싱 (실제로는 더 정교한 파싱 필요) parts = line.strip().split(' - ') if len(parts) >= 4: timestamp = parts[0] module = parts[1] log_level = parts[2] message = ' - '.join(parts[3:]) if not level or log_level == level: logs.append({ 'timestamp': timestamp, 'module': module, 'level': log_level, 'message': message }) if len(logs) >= limit: break except Exception as e: logger.error(f"Failed to read log file: {str(e)}") return { 'success': True, 'logs': logs, 'total_count': len(logs) } except HTTPException: raise except Exception as e: logger.error(f"Get system logs error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="시스템 로그 조회 중 오류가 발생했습니다" ) # 사용자 역할 변경 API (시스템 관리자만) @router.put("/users/{user_id}/role") async def change_user_role( user_id: int, new_role: str, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 사용자 역할 변경 (시스템 관리자만) Args: user_id: 변경할 사용자 ID new_role: 새로운 역할 (system, admin, user) credentials: JWT 토큰 db: 데이터베이스 세션 Returns: Dict: 변경 결과 """ try: # 토큰 검증 및 권한 확인 from .jwt_service import jwt_service payload = jwt_service.verify_access_token(credentials.credentials) if payload['role'] != 'system': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="사용자 역할 변경은 시스템 관리자만 가능합니다" ) # 유효한 역할인지 확인 if new_role not in ['system', 'admin', 'user']: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="유효하지 않은 역할입니다. (system, admin, user 중 선택)" ) # 자기 자신의 역할 변경 방지 if payload['user_id'] == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="자기 자신의 역할은 변경할 수 없습니다" ) # 사용자 조회 및 역할 변경 from .models import UserRepository user_repo = UserRepository(db) user = user_repo.find_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="해당 사용자를 찾을 수 없습니다" ) old_role = user.role user.role = new_role user_repo.update_user(user) logger.info(f"User role changed: {user.username} ({old_role} → {new_role}) by {payload['username']}") return { 'success': True, 'message': f'사용자 역할이 변경되었습니다: {old_role} → {new_role}', 'user_id': user_id, 'old_role': old_role, 'new_role': new_role } except HTTPException: raise except Exception as e: logger.error(f"Change user role error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="사용자 역할 변경 중 오류가 발생했습니다" ) # 프론트엔드 오류 로그 수집 API @router.post("/logs/frontend-error") async def log_frontend_error( error_data: dict, request: Request, db: Session = Depends(get_db) ): """ 프론트엔드 오류 로그 수집 Args: error_data: 프론트엔드에서 전송한 오류 데이터 request: FastAPI Request 객체 db: 데이터베이스 세션 Returns: Dict: 로그 저장 결과 """ try: from datetime import datetime # 클라이언트 정보 추가 client_ip = request.client.host user_agent = request.headers.get("user-agent", "") # 오류 데이터 보강 enhanced_error_data = { **error_data, 'client_ip': client_ip, 'server_timestamp': datetime.utcnow().isoformat(), 'user_agent': user_agent } # 로그로 기록 logger.error(f"Frontend Error: {error_data.get('type', 'unknown')} - {error_data.get('message', 'No message')}", extra={'frontend_error': enhanced_error_data}) # 데이터베이스에 저장 (선택적) # TODO: 필요시 frontend_errors 테이블 생성하여 저장 return { 'success': True, 'message': '오류가 기록되었습니다', 'timestamp': enhanced_error_data['server_timestamp'] } except Exception as e: logger.error(f"Failed to log frontend error: {str(e)}") return { 'success': False, 'message': '오류 기록에 실패했습니다' } # 프로필 업데이트 API class ProfileUpdateRequest(BaseModel): name: str email: Optional[EmailStr] = None department: Optional[str] = None position: Optional[str] = None @router.put("/profile") async def update_profile( profile_data: ProfileUpdateRequest, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 사용자 프로필 업데이트 Args: profile_data: 업데이트할 프로필 정보 credentials: JWT 토큰 db: 데이터베이스 세션 Returns: Dict: 업데이트 결과 """ try: # 토큰 검증 from .jwt_service import jwt_service payload = jwt_service.verify_access_token(credentials.credentials) user_id = payload['user_id'] # 사용자 조회 from .models import UserRepository user_repo = UserRepository(db) user = user_repo.find_by_id(user_id) if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="사용자를 찾을 수 없습니다" ) # 이메일 중복 확인 (다른 사용자가 사용 중인지) if profile_data.email and profile_data.email != user.email: existing_email = user_repo.find_by_email(profile_data.email) if existing_email and existing_email.user_id != user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="이미 사용 중인 이메일입니다" ) # 프로필 업데이트 user.name = profile_data.name user.email = profile_data.email user.department = profile_data.department user.position = profile_data.position user_repo.update_user(user) logger.info(f"Profile updated for user: {user.username}") return { 'success': True, 'message': '프로필이 업데이트되었습니다', 'user': user.to_dict() } except HTTPException: raise except Exception as e: logger.error(f"Profile update error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="프로필 업데이트 중 오류가 발생했습니다" ) # 비밀번호 변경 API class ChangePasswordRequest(BaseModel): current_password: str new_password: str @router.put("/change-password") async def change_password( password_data: ChangePasswordRequest, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ 사용자 비밀번호 변경 Args: password_data: 현재 비밀번호와 새 비밀번호 credentials: JWT 토큰 db: 데이터베이스 세션 Returns: Dict: 변경 결과 """ try: # 토큰 검증 from .jwt_service import jwt_service payload = jwt_service.verify_access_token(credentials.credentials) user_id = payload['user_id'] # 사용자 조회 from .models import UserRepository user_repo = UserRepository(db) user = user_repo.find_by_id(user_id) if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="사용자를 찾을 수 없습니다" ) # 현재 비밀번호 확인 if not user.check_password(password_data.current_password): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="현재 비밀번호가 올바르지 않습니다" ) # 새 비밀번호 유효성 검사 if len(password_data.new_password) < 8: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="새 비밀번호는 8자 이상이어야 합니다" ) # 비밀번호 변경 user.set_password(password_data.new_password) user_repo.update_user(user) logger.info(f"Password changed for user: {user.username}") return { 'success': True, 'message': '비밀번호가 변경되었습니다' } except HTTPException: raise except Exception as e: logger.error(f"Password change error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="비밀번호 변경 중 오류가 발생했습니다" )