Files
M-Project/backend/routers/auth.py
hyungi 1339e5dded feat: 작업보고서 시스템 완성
- 일일 공수 입력 기능
- 부적합 사항 등록 (이미지 선택사항)
- 날짜별 부적합 조회 (시간순 나열)
- 목록 관리 (인라인 편집, 작업시간 확인 버튼)
- 보고서 생성 (총 공수/부적합 시간 분리)
- JWT 인증 및 권한 관리
- Docker 기반 배포 환경 구성
2025-09-17 10:41:25 +09:00

132 lines
4.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List
from database.database import get_db
from database.models import User, UserRole
from database import schemas
from services.auth_service import (
authenticate_user, create_access_token, verify_token,
get_password_hash, verify_password
)
router = APIRouter(prefix="/api/auth", tags=["auth"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_data = verify_token(token, credentials_exception)
user = db.query(User).filter(User.username == token_data.username).first()
if user is None:
raise credentials_exception
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
async def get_current_admin(current_user: User = Depends(get_current_user)):
if current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
@router.post("/login", response_model=schemas.Token)
async def login(login_data: schemas.LoginRequest, db: Session = Depends(get_db)):
user = authenticate_user(db, login_data.username, login_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.username})
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}
@router.get("/me", response_model=schemas.User)
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
@router.post("/users", response_model=schemas.User)
async def create_user(
user: schemas.UserCreate,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
# 중복 확인
db_user = db.query(User).filter(User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
# 사용자 생성
db_user = User(
username=user.username,
hashed_password=get_password_hash(user.password),
full_name=user.full_name,
role=user.role
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.get("/users", response_model=List[schemas.User])
async def read_users(
skip: int = 0,
limit: int = 100,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.put("/users/{user_id}", response_model=schemas.User)
async def update_user(
user_id: int,
user_update: schemas.UserUpdate,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# 업데이트
update_data = user_update.dict(exclude_unset=True)
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(db_user, field, value)
db.commit()
db.refresh(db_user)
return db_user
@router.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# 관리자는 삭제 불가
if db_user.role == UserRole.ADMIN:
raise HTTPException(status_code=400, detail="Cannot delete admin user")
db.delete(db_user)
db.commit()
return {"detail": "User deleted successfully"}