Files
ai-server/server/auth.py
Hyungi Ahn 1e098999c1 feat: AI 서버 관리 페이지 Phase 3 보안 강화 - JWT 인증 시스템
🔐 JWT 기반 로그인 시스템:
- 로그인 페이지: 아름다운 애니메이션과 보안 정보 표시
- JWT 토큰: 24시간 또는 30일 (Remember Me) 만료 설정
- 비밀번호 암호화: bcrypt 해싱으로 안전한 저장
- 계정 잠금: 5회 실패 시 15분 자동 잠금

👥 사용자 계정 관리:
- admin/admin123 (관리자 권한)
- hyungi/hyungi123 (시스템 권한)
- 역할 기반 접근 제어 (RBAC)

🛡️ 보안 기능:
- 토큰 자동 검증 및 만료 처리
- 감사 로그: 로그인/로그아웃/관리 작업 추적
- 안전한 세션 관리 및 토큰 정리
- 클라이언트 사이드 토큰 검증

🎨 UI/UX 개선:
- 로그인 페이지: 그라디언트 배경, 플로팅 아이콘 애니메이션
- 사용자 메뉴: 헤더에 사용자명과 로그아웃 버튼 표시
- 보안 표시: SSL, 세션 타임아웃, JWT 인증 정보
- 반응형 디자인 및 다크모드 지원

🔧 기술 구현:
- FastAPI HTTPBearer 보안 스키마
- PyJWT 토큰 생성/검증
- bcrypt 비밀번호 해싱
- 클라이언트-서버 토큰 동기화

새 파일:
- templates/login.html: 로그인 페이지 HTML
- static/login.css: 로그인 페이지 스타일
- static/login.js: 로그인 JavaScript 로직
- server/auth.py: JWT 인증 시스템 (실제 서버용)

수정된 파일:
- test_admin.py: 테스트 서버에 JWT 인증 추가
- static/admin.js: JWT 토큰 기반 API 요청으로 변경
- templates/admin.html: 사용자 메뉴 및 로그아웃 버튼 추가
- static/admin.css: 사용자 메뉴 스타일 추가

보안 레벨: Phase 1 (API Key) → Phase 3 (JWT + 감사로그)
2025-08-18 15:24:01 +09:00

230 lines
7.7 KiB
Python

"""
JWT Authentication System for AI Server Admin
Phase 3: Security Enhancement
"""
import jwt
import bcrypt
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from fastapi import HTTPException, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
# JWT Configuration
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", secrets.token_urlsafe(32))
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_HOURS = 24
JWT_REMEMBER_DAYS = 30
# Security
security = HTTPBearer()
# In-memory user store (in production, use a proper database)
USERS_DB = {
"admin": {
"username": "admin",
"password_hash": bcrypt.hashpw("admin123".encode('utf-8'), bcrypt.gensalt()).decode('utf-8'),
"role": "admin",
"created_at": datetime.now().isoformat(),
"last_login": None,
"login_attempts": 0,
"locked_until": None
},
"hyungi": {
"username": "hyungi",
"password_hash": bcrypt.hashpw("hyungi123".encode('utf-8'), bcrypt.gensalt()).decode('utf-8'),
"role": "system",
"created_at": datetime.now().isoformat(),
"last_login": None,
"login_attempts": 0,
"locked_until": None
}
}
# Login attempt tracking
LOGIN_ATTEMPTS = {}
MAX_LOGIN_ATTEMPTS = 5
LOCKOUT_DURATION_MINUTES = 15
class AuthManager:
@staticmethod
def hash_password(password: str) -> str:
"""Hash password using bcrypt"""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
@staticmethod
def verify_password(password: str, password_hash: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))
@staticmethod
def create_jwt_token(user_data: Dict[str, Any], remember_me: bool = False) -> str:
"""Create JWT token"""
expiration = datetime.utcnow() + timedelta(
days=JWT_REMEMBER_DAYS if remember_me else 0,
hours=JWT_EXPIRATION_HOURS if not remember_me else 0
)
payload = {
"username": user_data["username"],
"role": user_data["role"],
"exp": expiration,
"iat": datetime.utcnow(),
"type": "remember" if remember_me else "session"
}
return jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
@staticmethod
def verify_jwt_token(token: str) -> Dict[str, Any]:
"""Verify and decode JWT token"""
try:
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
@staticmethod
def is_account_locked(username: str) -> bool:
"""Check if account is locked due to failed attempts"""
user = USERS_DB.get(username)
if not user:
return False
if user["locked_until"]:
locked_until = datetime.fromisoformat(user["locked_until"])
if datetime.now() < locked_until:
return True
else:
# Unlock account
user["locked_until"] = None
user["login_attempts"] = 0
return False
@staticmethod
def record_login_attempt(username: str, success: bool, ip_address: str = None):
"""Record login attempt"""
user = USERS_DB.get(username)
if not user:
return
if success:
user["login_attempts"] = 0
user["locked_until"] = None
user["last_login"] = datetime.now().isoformat()
else:
user["login_attempts"] += 1
# Lock account after max attempts
if user["login_attempts"] >= MAX_LOGIN_ATTEMPTS:
user["locked_until"] = (
datetime.now() + timedelta(minutes=LOCKOUT_DURATION_MINUTES)
).isoformat()
@staticmethod
def authenticate_user(username: str, password: str) -> Optional[Dict[str, Any]]:
"""Authenticate user credentials"""
user = USERS_DB.get(username)
if not user:
return None
if AuthManager.is_account_locked(username):
raise HTTPException(
status_code=423,
detail=f"Account locked due to too many failed attempts. Try again later."
)
if AuthManager.verify_password(password, user["password_hash"]):
return user
return None
# Dependency functions
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Get current authenticated user from JWT token"""
try:
payload = AuthManager.verify_jwt_token(credentials.credentials)
username = payload.get("username")
user = USERS_DB.get(username)
if not user:
raise HTTPException(status_code=401, detail="User not found")
return {
"username": user["username"],
"role": user["role"],
"token_type": payload.get("type", "session")
}
except Exception as e:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
async def require_admin_role(current_user: dict = Depends(get_current_user)):
"""Require admin or system role"""
if current_user["role"] not in ["admin", "system"]:
raise HTTPException(status_code=403, detail="Admin privileges required")
return current_user
async def require_system_role(current_user: dict = Depends(get_current_user)):
"""Require system role"""
if current_user["role"] != "system":
raise HTTPException(status_code=403, detail="System privileges required")
return current_user
# Legacy API key support (for backward compatibility)
async def get_current_user_or_api_key(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = None
):
"""Support both JWT and API key authentication"""
# Try JWT first
if credentials:
try:
return await get_current_user(credentials)
except HTTPException:
pass
# Fall back to API key
api_key = x_api_key or request.headers.get("X-API-Key")
if api_key and api_key == os.getenv("API_KEY", "test-admin-key-123"):
return {
"username": "api_user",
"role": "system",
"token_type": "api_key"
}
raise HTTPException(status_code=401, detail="Authentication required")
# Audit logging
class AuditLogger:
@staticmethod
def log_login(username: str, success: bool, ip_address: str = None, user_agent: str = None):
"""Log login attempt"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"event": "login_attempt",
"username": username,
"success": success,
"ip_address": ip_address,
"user_agent": user_agent
}
print(f"AUDIT: {log_entry}") # In production, use proper logging
@staticmethod
def log_admin_action(username: str, action: str, details: str = None, ip_address: str = None):
"""Log admin action"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"event": "admin_action",
"username": username,
"action": action,
"details": details,
"ip_address": ip_address
}
print(f"AUDIT: {log_entry}") # In production, use proper logging