# Files
# tk-factory-services/tkeg/api/app/auth/middleware.py
# 2026-03-16 15:41:58 +09:00
#
# 75 lines
# 2.6 KiB
# Python

"""SSO JWT 인증 미들웨어 — tkeg"""
import jwt
import os
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
# Shared secret used to verify SSO-issued JWTs; read once, at import time, from
# the SECRET_KEY environment variable.
# NOTE(review): defaults to an empty string when SECRET_KEY is unset — confirm
# every deployment sets it, because an empty HMAC key offers no protection.
SSO_JWT_SECRET = os.getenv("SECRET_KEY", "")
# The only signature algorithm this module passes to jwt.decode().
ALGORITHM = "HS256"
# Bearer-header extractor. auto_error=False is expected to yield None instead of
# raising when the Authorization header is missing, which lets _extract_token
# fall back to the sso_token cookie.
security = HTTPBearer(auto_error=False)
def _extract_token(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials],
) -> str:
    """Return the raw token from the Bearer header or the ``sso_token`` cookie.

    The Bearer credentials take precedence; the cookie is consulted only when
    no non-empty Bearer value is present.

    Args:
        request: Incoming request whose cookies are inspected as a fallback.
        credentials: Parsed Authorization header, or ``None`` when absent.

    Returns:
        The token string, unverified.

    Raises:
        HTTPException: 401 when neither source provides a non-empty token.
    """
    bearer_value = credentials.credentials if credentials else None
    if bearer_value:
        return bearer_value

    cookie_value = request.cookies.get("sso_token", "")
    if cookie_value:
        return cookie_value

    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="인증 토큰이 필요합니다")
def _decode_token(token: str) -> dict:
    """Verify *token* against the SSO secret and return its claims.

    Args:
        token: Encoded JWT as received from the client.

    Returns:
        The decoded payload produced by ``jwt.decode``.

    Raises:
        HTTPException: 500 when no signing secret is configured; 401 when the
            token has expired or fails validation for any other reason.
    """
    if not SSO_JWT_SECRET:
        # Fail closed: with an empty secret, a token signed with an empty key
        # would verify, so refuse to authenticate anyone until SECRET_KEY is set.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="서버 인증 설정 오류입니다",
        )

    try:
        return jwt.decode(token, SSO_JWT_SECRET, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before InvalidTokenError, which is its base class.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰이 만료되었습니다"
        ) from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="유효하지 않은 토큰입니다"
        ) from exc
async def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> dict:
    """Authenticate the request and build a user dict from the SSO JWT claims.

    The username comes from the ``sub`` claim, falling back to ``username``
    when ``sub`` is missing or empty.

    Args:
        request: Incoming request (used for the cookie fallback).
        credentials: Bearer credentials resolved by the ``security`` dependency.

    Returns:
        A dict with the keys ``user_id``, ``username``, ``name``, ``role`` and
        ``department``. ``name`` defaults to the username and ``role`` to
        ``"user"`` when the corresponding claim is absent.

    Raises:
        HTTPException: 401 when the token is missing, invalid, expired, or
            carries no usable username.
    """
    claims = _decode_token(_extract_token(request, credentials))

    username = claims.get("sub")
    if not username:
        username = claims.get("username")
    if not username:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰에 사용자 정보가 없습니다")

    return dict(
        user_id=claims.get("user_id"),
        username=username,
        name=claims.get("name", username),
        role=claims.get("role", "user"),
        department=claims.get("department"),
    )
async def get_current_user_optional(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> Optional[dict]:
    """Optional authentication: return the user dict, or ``None`` if unauthenticated.

    Any ``HTTPException`` raised by ``get_current_user`` (missing, expired or
    otherwise rejected token) is swallowed and reported as ``None``.

    Args:
        request: Incoming request (used for the cookie fallback).
        credentials: Bearer credentials resolved by the ``security`` dependency.

    Returns:
        The same dict ``get_current_user`` returns, or ``None``.
    """
    try:
        user = await get_current_user(request, credentials)
    except HTTPException:
        return None
    return user
def require_roles(allowed_roles):
    """Build a FastAPI dependency that admits only users with an allowed role.

    Args:
        allowed_roles: A single role name or an iterable of role names. The
            comparison with the user's ``role`` is exact (case-sensitive).

    Returns:
        An async dependency that returns the authenticated user dict, or
        raises ``HTTPException`` 403 when the user's role is not allowed.
    """
    # Normalise once, up front:
    #  - a bare string would otherwise make `in` do substring matching
    #    (e.g. "adm" in "admin" is True);
    #  - a one-shot iterable would be exhausted after the first request.
    # A tuple (not a set) keeps `in` safe for unhashable role values.
    if isinstance(allowed_roles, str):
        roles = (allowed_roles,)
    else:
        roles = tuple(allowed_roles)

    async def checker(user: dict = Depends(get_current_user)):
        if user.get("role") not in roles:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="권한이 부족합니다")
        return user

    return checker
# Ready-made dependency for admin-only routes: the user's role must equal one
# of these names exactly, which is why several spellings are listed.
require_admin = require_roles(["admin", "system", "Admin", "System Admin"])