Files
tk-factory-services/system3-nonconformance/api/routers/auth.py
Hyungi Ahn 6495b8af32 feat: SSO 쿠키 인증 통합 + 서브도메인 라우팅 아키텍처
- Path-based 라우팅을 서브도메인 기반으로 전환
  (tkfb/tkreport/tkqc.technicalkorea.net)
- 3개 시스템 프론트엔드에 SSO 쿠키 인증 통합
  (domain=.technicalkorea.net, localStorage 폴백)
- Gateway: 포털+로그인+System1 프록시, 쿠키 SSO 설정
- System 1: 토큰키 통일, nginx.conf 생성, 신고페이지 리다이렉트
- System 2: api-base.js/app-init.js 생성, getSSOToken() 통합
- System 3: TokenManager 쿠키 지원, 중앙 로그인 리다이렉트
- docker-compose.yml에 cloudflared 서비스 추가
- DEPLOY-GUIDE.md 배포 가이드 작성

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 18:41:44 +09:00

207 lines
6.7 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List
from pydantic import BaseModel
from database.database import get_db
from database.models import User, UserRole
from database import schemas
from services.auth_service import (
authenticate_user, create_access_token, verify_token,
get_password_hash, verify_password
)
router = APIRouter(prefix="/api/auth", tags=["auth"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = verify_token(token, credentials_exception)
username = payload.get("sub")
# 로컬 DB에서 사용자 조회
user = db.query(User).filter(User.username == username).first()
if user is not None:
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
# DB에 없는 SSO 사용자: JWT payload로 임시 User 객체 생성
sso_role = payload.get("role", "user")
role_map = {
"Admin": UserRole.admin,
"System Admin": UserRole.admin,
}
mapped_role = role_map.get(sso_role, UserRole.user)
sso_user = User(
id=0,
username=username,
hashed_password="",
full_name=payload.get("name", username),
role=mapped_role,
is_active=True,
)
return sso_user
async def get_current_admin(current_user: User = Depends(get_current_user)):
if current_user.role != UserRole.admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
@router.options("/login")
async def login_options():
"""OPTIONS preflight 요청 처리"""
return {"message": "OK"}
@router.post("/login", response_model=schemas.Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.username})
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}
@router.get("/me", response_model=schemas.User)
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
@router.get("/users", response_model=List[schemas.User])
async def get_all_users(
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
"""모든 사용자 목록 조회 (관리자 전용)"""
users = db.query(User).filter(User.is_active == True).all()
return users
@router.post("/users", response_model=schemas.User)
async def create_user(
user: schemas.UserCreate,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
# 중복 확인
db_user = db.query(User).filter(User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
# 사용자 생성
db_user = User(
username=user.username,
hashed_password=get_password_hash(user.password),
full_name=user.full_name,
role=user.role
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.get("/users", response_model=List[schemas.User])
async def read_users(
skip: int = 0,
limit: int = 100,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.put("/users/{user_id}", response_model=schemas.User)
async def update_user(
user_id: int,
user_update: schemas.UserUpdate,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# 업데이트
update_data = user_update.dict(exclude_unset=True)
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(db_user, field, value)
db.commit()
db.refresh(db_user)
return db_user
@router.delete("/users/{username}")
async def delete_user(
username: str,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.username == username).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# hyungi 계정은 삭제 불가
if db_user.username == "hyungi":
raise HTTPException(status_code=400, detail="Cannot delete primary admin user")
db.delete(db_user)
db.commit()
return {"detail": "User deleted successfully"}
@router.post("/change-password")
async def change_password(
password_change: schemas.PasswordChange,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 현재 비밀번호 확인
if not verify_password(password_change.current_password, current_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect current password"
)
# 새 비밀번호 설정
current_user.hashed_password = get_password_hash(password_change.new_password)
db.commit()
return {"detail": "Password changed successfully"}
class PasswordReset(BaseModel):
new_password: str
@router.post("/users/{user_id}/reset-password")
async def reset_user_password(
user_id: int,
password_reset: PasswordReset,
current_admin: User = Depends(get_current_admin),
db: Session = Depends(get_db)
):
"""사용자 비밀번호 초기화 (관리자 전용)"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
# 새 비밀번호로 업데이트
db_user.hashed_password = get_password_hash(password_reset.new_password)
db.commit()
return {"detail": f"Password reset successfully for user {db_user.username}"}