feat(tkeg): tkeg BOM 자재관리 서비스 초기 세팅 (api + web + docker-compose)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
74
tkeg/api/app/auth/middleware.py
Normal file
74
tkeg/api/app/auth/middleware.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""SSO JWT 인증 미들웨어 — tkeg"""
|
||||
import jwt
|
||||
import os
|
||||
from fastapi import Depends, HTTPException, status, Request
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from typing import Optional
|
||||
|
||||
SSO_JWT_SECRET = os.getenv("SECRET_KEY", "")
|
||||
ALGORITHM = "HS256"
|
||||
security = HTTPBearer(auto_error=False)
|
||||
|
||||
|
||||
def _extract_token(
|
||||
request: Request,
|
||||
credentials: Optional[HTTPAuthorizationCredentials],
|
||||
) -> str:
|
||||
"""Bearer header 또는 sso_token 쿠키에서 토큰 추출"""
|
||||
if credentials and credentials.credentials:
|
||||
return credentials.credentials
|
||||
token = request.cookies.get("sso_token", "")
|
||||
if not token:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="인증 토큰이 필요합니다")
|
||||
return token
|
||||
|
||||
|
||||
def _decode_token(token: str) -> dict:
|
||||
try:
|
||||
payload = jwt.decode(token, SSO_JWT_SECRET, algorithms=[ALGORITHM])
|
||||
except jwt.ExpiredSignatureError:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰이 만료되었습니다")
|
||||
except jwt.InvalidTokenError:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="유효하지 않은 토큰입니다")
|
||||
return payload
|
||||
|
||||
|
||||
async def get_current_user(
|
||||
request: Request,
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
||||
) -> dict:
|
||||
"""SSO JWT에서 사용자 정보 추출"""
|
||||
token = _extract_token(request, credentials)
|
||||
payload = _decode_token(token)
|
||||
username = payload.get("sub") or payload.get("username")
|
||||
if not username:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="토큰에 사용자 정보가 없습니다")
|
||||
return {
|
||||
"user_id": payload.get("user_id"),
|
||||
"username": username,
|
||||
"name": payload.get("name", username),
|
||||
"role": payload.get("role", "user"),
|
||||
"department": payload.get("department"),
|
||||
}
|
||||
|
||||
|
||||
async def get_current_user_optional(
|
||||
request: Request,
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
||||
) -> Optional[dict]:
|
||||
"""선택적 인증 — 토큰 없으면 None"""
|
||||
try:
|
||||
return await get_current_user(request, credentials)
|
||||
except HTTPException:
|
||||
return None
|
||||
|
||||
|
||||
def require_roles(allowed_roles):
|
||||
async def checker(user: dict = Depends(get_current_user)):
|
||||
if user.get("role") not in allowed_roles:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="권한이 부족합니다")
|
||||
return user
|
||||
return checker
|
||||
|
||||
|
||||
require_admin = require_roles(["admin", "system", "Admin", "System Admin"])
|
||||
Reference in New Issue
Block a user