"""SSO JWT 인증 미들웨어 — tkeg""" import jwt import os from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from typing import Optional SSO_JWT_SECRET = os.getenv("SECRET_KEY", "") ALGORITHM = "HS256" security = HTTPBearer(auto_error=False) def _extract_token( request: Request, credentials: Optional[HTTPAuthorizationCredentials], ) -> str: """Bearer header 또는 sso_token 쿠키에서 토큰 추출""" if credentials and credentials.credentials: return credentials.credentials token = request.cookies.get("sso_token", "") if not token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="인증 토큰이 필요합니다") return token def _decode_token(token: str) -> dict: try: payload = jwt.decode(token, SSO_JWT_SECRET, algorithms=[ALGORITHM]) except jwt.ExpiredSignatureError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰이 만료되었습니다") except jwt.InvalidTokenError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="유효하지 않은 토큰입니다") return payload async def get_current_user( request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), ) -> dict: """SSO JWT에서 사용자 정보 추출""" token = _extract_token(request, credentials) payload = _decode_token(token) username = payload.get("sub") or payload.get("username") if not username: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰에 사용자 정보가 없습니다") return { "user_id": payload.get("user_id"), "username": username, "name": payload.get("name", username), "role": payload.get("role", "user"), "department": payload.get("department"), } async def get_current_user_optional( request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), ) -> Optional[dict]: """선택적 인증 — 토큰 없으면 None""" try: return await get_current_user(request, credentials) except HTTPException: return None def require_roles(allowed_roles): async def checker(user: dict = Depends(get_current_user)): if user.get("role") not in allowed_roles: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="권한이 부족합니다") return user return checker require_admin = require_roles(["admin", "system", "Admin", "System Admin"])