75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
"""SSO JWT 인증 미들웨어 — tkeg"""
|
|
import jwt
|
|
import os
|
|
from fastapi import Depends, HTTPException, status, Request
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from typing import Optional
|
|
|
|
# Shared HMAC secret used to verify SSO tokens. Read once at import time from
# the SECRET_KEY environment variable; it is the empty string when unset.
SSO_JWT_SECRET = os.environ.get("SECRET_KEY", "")

# The only signature algorithm accepted when decoding tokens.
ALGORITHM = "HS256"

# Bearer-header extractor. auto_error=False lets requests without an
# Authorization header reach the sso_token cookie fallback in _extract_token.
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def _extract_token(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials],
) -> str:
    """Return the raw token from the Bearer header or the ``sso_token`` cookie.

    Args:
        request: Incoming request; its cookies are consulted only when no
            Bearer credentials are available.
        credentials: Parsed ``Authorization`` header, or ``None`` when absent.

    Returns:
        The token string. A non-empty header value takes precedence over the
        cookie.

    Raises:
        HTTPException: 401 when neither source provides a non-empty token.
    """
    header_token = credentials.credentials if credentials else None
    if header_token:
        return header_token

    cookie_token = request.cookies.get("sso_token", "")
    if cookie_token:
        return cookie_token

    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="인증 토큰이 필요합니다")
|
|
|
|
|
|
def _decode_token(token: str) -> dict:
    """Verify the signature of an SSO JWT and return its claims.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded claims dictionary.

    Raises:
        HTTPException: 500 when no signing secret is configured; 401 when the
            token has expired or is otherwise invalid.
    """
    if not SSO_JWT_SECRET:
        # Fail closed: SSO_JWT_SECRET falls back to "" when SECRET_KEY is
        # unset, and verifying against an empty HMAC key would accept tokens
        # that anyone can mint.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="서버 인증 설정이 올바르지 않습니다",
        )

    try:
        return jwt.decode(token, SSO_JWT_SECRET, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before InvalidTokenError so that expiry keeps its
        # own, more specific message.
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰이 만료되었습니다") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="유효하지 않은 토큰입니다") from exc
|
|
|
|
|
|
async def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> dict:
    """Resolve the authenticated user from the request's SSO JWT.

    Args:
        request: Incoming request (used for the cookie fallback).
        credentials: Bearer credentials injected by the ``security`` dependency,
            or ``None`` when no ``Authorization`` header was sent.

    Returns:
        A dict with the keys ``user_id``, ``username``, ``name``, ``role`` and
        ``department``. ``name`` falls back to the username and ``role`` to
        ``"user"`` when the corresponding claim is missing.

    Raises:
        HTTPException: 401 when the token is missing or rejected, or when it
            carries neither a ``sub`` nor a ``username`` claim.
    """
    claims = _decode_token(_extract_token(request, credentials))

    # "sub" is preferred; "username" is accepted as a fallback claim.
    username = claims.get("sub") or claims.get("username")
    if not username:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰에 사용자 정보가 없습니다")

    display_name = claims.get("name", username)
    role = claims.get("role", "user")
    return {
        "user_id": claims.get("user_id"),
        "username": username,
        "name": display_name,
        "role": role,
        "department": claims.get("department"),
    }
|
|
|
|
|
|
async def get_current_user_optional(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> Optional[dict]:
    """Optional authentication: return the user, or ``None`` if it cannot be resolved.

    Any ``HTTPException`` raised by ``get_current_user`` is swallowed, so a
    missing token and an expired or invalid token all yield ``None``.

    Args:
        request: Incoming request.
        credentials: Bearer credentials injected by the ``security`` dependency,
            or ``None`` when no ``Authorization`` header was sent.

    Returns:
        The user dict produced by ``get_current_user``, or ``None``.
    """
    try:
        user = await get_current_user(request, credentials)
    except HTTPException:
        user = None
    return user
|
|
|
|
|
|
def require_roles(allowed_roles):
    """Build a FastAPI dependency that admits only users with an allowed role.

    Args:
        allowed_roles: Iterable of role names, or a single role name given as
            a ``str``. Role names are compared exactly (case-sensitive).

    Returns:
        An async dependency that returns the authenticated user dict, or raises
        ``HTTPException`` 403 when the user's ``role`` is not allowed.
    """
    # Normalise once, at factory time:
    # - a bare string would otherwise be tested with substring semantics
    #   (role "min" would pass require_roles("admin"));
    # - a one-shot iterable would be exhausted after the first request.
    # A tuple (not a set) is used so that an unhashable "role" claim still
    # results in a plain 403 instead of a TypeError.
    if isinstance(allowed_roles, str):
        roles = (allowed_roles,)
    else:
        roles = tuple(allowed_roles)

    async def checker(user: dict = Depends(get_current_user)):
        if user.get("role") not in roles:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="권한이 부족합니다")
        return user

    return checker
|
|
|
|
|
|
# Dependency for administrator-only routes. Roles are matched exactly, so each
# accepted spelling of the admin/system role is listed explicitly.
require_admin = require_roles(
    [
        "admin",
        "system",
        "Admin",
        "System Admin",
    ]
)
|