Files
tk-factory-services/system3-nonconformance/api/routers/auth.py
Hyungi Ahn 733bb0cb35 feat: tkuser 통합 관리 서비스 + 전체 시스템 SSO 쿠키 인증 통합
- tkuser 서비스 신규 추가 (API + Web)
  - 사용자/권한/프로젝트/부서/작업자/작업장/설비/작업/휴가 통합 관리
  - 작업장 탭: 공장→작업장 드릴다운 네비게이션 + 구역지도 클릭 연동
  - 작업 탭: 공정(work_types)→작업(tasks) 계층 관리
  - 휴가 탭: 유형 관리 + 연차 배정(근로기준법 자동계산)
- 전 시스템 SSO 쿠키 인증으로 통합 (.technicalkorea.net 공유)
- System 2: 작업 이슈 리포트 기능 강화
- System 3: tkuser API 연동, 페이지 권한 체계 적용
- docker-compose에 tkuser-api, tkuser-web 서비스 추가
- ARCHITECTURE.md, DEPLOYMENT.md 문서 작성

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 13:45:52 +09:00

211 lines
6.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List
from pydantic import BaseModel
from database.database import get_db
from database.models import User, UserRole
from database import schemas
from services.auth_service import (
authenticate_user, create_access_token, verify_token,
get_password_hash, verify_password
)
router = APIRouter(prefix="/api/auth", tags=["auth"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = verify_token(token, credentials_exception)
username = payload.get("sub")
# 로컬 DB에서 사용자 조회
user = db.query(User).filter(User.username == username).first()
if user is not None:
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
# DB에 없는 SSO 사용자: JWT payload로 임시 User 객체 생성
sso_role = payload.get("role", "user")
role_map = {
"Admin": UserRole.admin,
"System Admin": UserRole.system,
"system": UserRole.system,
"admin": UserRole.admin,
"support_team": UserRole.support_team,
"leader": UserRole.leader,
}
mapped_role = role_map.get(sso_role, UserRole.user)
sso_user = User(
id=0,
username=username,
hashed_password="",
full_name=payload.get("name", username),
role=mapped_role,
is_active=True,
)
return sso_user
async def get_current_admin(current_user: User = Depends(get_current_user)):
if current_user.role not in [UserRole.admin, UserRole.system]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
@router.options("/login")
async def login_options():
"""OPTIONS preflight 요청 처리"""
return {"message": "OK"}
@router.post("/login", response_model=schemas.Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.username})
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}
@router.get("/me", response_model=schemas.User)
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
@router.get("/users", response_model=List[schemas.User])
async def get_all_users(
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
"""모든 사용자 목록 조회 (관리자 전용)"""
users = db.query(User).filter(User.is_active == True).all()
return users
@router.post("/users", response_model=schemas.User)
async def create_user(
user: schemas.UserCreate,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
# 중복 확인
db_user = db.query(User).filter(User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
# 사용자 생성
db_user = User(
username=user.username,
hashed_password=get_password_hash(user.password),
full_name=user.full_name,
role=user.role
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.get("/users", response_model=List[schemas.User])
async def read_users(
skip: int = 0,
limit: int = 100,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.put("/users/{user_id}", response_model=schemas.User)
async def update_user(
user_id: int,
user_update: schemas.UserUpdate,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# 업데이트
update_data = user_update.dict(exclude_unset=True)
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(db_user, field, value)
db.commit()
db.refresh(db_user)
return db_user
@router.delete("/users/{username}")
async def delete_user(
username: str,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.username == username).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# hyungi 계정은 삭제 불가
if db_user.username == "hyungi":
raise HTTPException(status_code=400, detail="Cannot delete primary admin user")
db.delete(db_user)
db.commit()
return {"detail": "User deleted successfully"}
@router.post("/change-password")
async def change_password(
password_change: schemas.PasswordChange,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 현재 비밀번호 확인
if not verify_password(password_change.current_password, current_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect current password"
)
# 새 비밀번호 설정
current_user.hashed_password = get_password_hash(password_change.new_password)
db.commit()
return {"detail": "Password changed successfully"}
class PasswordReset(BaseModel):
new_password: str
@router.post("/users/{user_id}/reset-password")
async def reset_user_password(
user_id: int,
password_reset: PasswordReset,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
"""사용자 비밀번호 초기화 (관리자 전용)"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# 새 비밀번호로 업데이트
db_user.hashed_password = get_password_hash(password_reset.new_password)
db.commit()
return {"detail": f"Password reset successfully for user {db_user.username}"}