""" 인증 관련 API 라우터 """ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update from datetime import datetime from ...core.database import get_db from ...core.security import verify_password, create_access_token, create_refresh_token, get_password_hash from ...core.config import settings from ...models.user import User from ...schemas.auth import ( LoginRequest, TokenResponse, RefreshTokenRequest, UserInfo, ChangePasswordRequest, CreateUserRequest ) from ..dependencies import get_current_active_user, get_current_admin_user router = APIRouter() @router.post("/login", response_model=TokenResponse) async def login( login_data: LoginRequest, db: AsyncSession = Depends(get_db) ): """사용자 로그인""" # 사용자 조회 result = await db.execute( select(User).where(User.email == login_data.email) ) user = result.scalar_one_or_none() # 사용자 존재 및 비밀번호 확인 if not user or not verify_password(login_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password" ) # 비활성 사용자 확인 if not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Inactive user" ) # 토큰 생성 access_token = create_access_token(data={"sub": str(user.id)}) refresh_token = create_refresh_token(data={"sub": str(user.id)}) # 마지막 로그인 시간 업데이트 await db.execute( update(User) .where(User.id == user.id) .values(last_login=datetime.utcnow()) ) await db.commit() return TokenResponse( access_token=access_token, refresh_token=refresh_token, expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) @router.post("/refresh", response_model=TokenResponse) async def refresh_token( refresh_data: RefreshTokenRequest, db: AsyncSession = Depends(get_db) ): """토큰 갱신""" from ...core.security import verify_token try: # 리프레시 토큰 검증 payload = verify_token(refresh_data.refresh_token, token_type="refresh") user_id = payload.get("sub") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) # 사용자 존재 확인 result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive" ) # 새 토큰 생성 access_token = create_access_token(data={"sub": str(user.id)}) new_refresh_token = create_refresh_token(data={"sub": str(user.id)}) return TokenResponse( access_token=access_token, refresh_token=new_refresh_token, expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) except Exception: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) @router.get("/me", response_model=UserInfo) async def get_current_user_info( current_user: User = Depends(get_current_active_user) ): """현재 사용자 정보 조회""" return UserInfo.model_validate(current_user) @router.put("/change-password") async def change_password( password_data: ChangePasswordRequest, current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db) ): """비밀번호 변경""" # 현재 비밀번호 확인 if not verify_password(password_data.current_password, current_user.hashed_password): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect current password" ) # 새 비밀번호 해싱 및 업데이트 new_hashed_password = get_password_hash(password_data.new_password) await db.execute( update(User) .where(User.id == current_user.id) .values(hashed_password=new_hashed_password) ) await db.commit() return {"message": "Password changed successfully"} @router.post("/create-user", response_model=UserInfo) async def create_user( user_data: CreateUserRequest, admin_user: User = Depends(get_current_admin_user), db: AsyncSession = Depends(get_db) ): """새 사용자 생성 (관리자 전용)""" # 이메일 중복 확인 result = await db.execute( select(User).where(User.email == user_data.email) ) existing_user = result.scalar_one_or_none() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) # 새 사용자 생성 new_user = User( email=user_data.email, hashed_password=get_password_hash(user_data.password), full_name=user_data.full_name, is_admin=user_data.is_admin, is_active=True ) db.add(new_user) await db.commit() await db.refresh(new_user) return UserInfo.from_orm(new_user) @router.post("/logout") async def logout( current_user: User = Depends(get_current_active_user) ): """로그아웃 (클라이언트에서 토큰 삭제)""" # 실제로는 클라이언트에서 토큰을 삭제하면 됨 # 필요시 토큰 블랙리스트 구현 가능 return {"message": "Logged out successfully"}