Files
ai-server/test_admin.py
Hyungi Ahn 1e098999c1 feat: AI 서버 관리 페이지 Phase 3 보안 강화 - JWT 인증 시스템
🔐 JWT 기반 로그인 시스템:
- 로그인 페이지: 아름다운 애니메이션과 보안 정보 표시
- JWT 토큰: 24시간 또는 30일 (Remember Me) 만료 설정
- 비밀번호 암호화: bcrypt 해싱으로 안전한 저장
- 계정 잠금: 5회 실패 시 15분 자동 잠금

👥 사용자 계정 관리:
- admin/admin123 (관리자 권한)
- hyungi/hyungi123 (시스템 권한)
- 역할 기반 접근 제어 (RBAC)

🛡️ 보안 기능:
- 토큰 자동 검증 및 만료 처리
- 감사 로그: 로그인/로그아웃/관리 작업 추적
- 안전한 세션 관리 및 토큰 정리
- 클라이언트 사이드 토큰 검증

🎨 UI/UX 개선:
- 로그인 페이지: 그라디언트 배경, 플로팅 아이콘 애니메이션
- 사용자 메뉴: 헤더에 사용자명과 로그아웃 버튼 표시
- 보안 표시: SSL, 세션 타임아웃, JWT 인증 정보
- 반응형 디자인 및 다크모드 지원

🔧 기술 구현:
- FastAPI HTTPBearer 보안 스키마
- PyJWT 토큰 생성/검증
- bcrypt 비밀번호 해싱
- 클라이언트-서버 토큰 동기화

새 파일:
- templates/login.html: 로그인 페이지 HTML
- static/login.css: 로그인 페이지 스타일
- static/login.js: 로그인 JavaScript 로직
- server/auth.py: JWT 인증 시스템 (실제 서버용)

수정된 파일:
- test_admin.py: 테스트 서버에 JWT 인증 추가
- static/admin.js: JWT 토큰 기반 API 요청으로 변경
- templates/admin.html: 사용자 메뉴 및 로그아웃 버튼 추가
- static/admin.css: 사용자 메뉴 스타일 추가

보안 레벨: Phase 1 (API Key) → Phase 3 (JWT + 감사로그)
2025-08-18 15:24:01 +09:00

474 lines
15 KiB
Python

#!/usr/bin/env python3
"""
AI Server Admin Dashboard Test Server
맥북프로에서 관리 페이지만 테스트하기 위한 간단한 서버
"""
import os
import secrets
import uuid
import jwt
import bcrypt
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, Request, HTTPException, Depends, Header
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import uvicorn
# FastAPI 앱 초기화
app = FastAPI(title="AI Server Admin Dashboard (Test Mode)")
# 정적 파일 및 템플릿 설정
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# 테스트용 설정
TEST_API_KEY = os.getenv("API_KEY", "test-admin-key-123")
TEST_SERVER_PORT = 28080
TEST_OLLAMA_HOST = "http://localhost:11434"
# JWT 설정
JWT_SECRET_KEY = "test-jwt-secret-key-for-development"
JWT_ALGORITHM = "HS256"
security = HTTPBearer(auto_error=False)
# 테스트용 사용자 데이터
TEST_USERS = {
"admin": {
"username": "admin",
"password_hash": bcrypt.hashpw("admin123".encode('utf-8'), bcrypt.gensalt()).decode('utf-8'),
"role": "admin"
},
"hyungi": {
"username": "hyungi",
"password_hash": bcrypt.hashpw("hyungi123".encode('utf-8'), bcrypt.gensalt()).decode('utf-8'),
"role": "system"
}
}
# 임시 데이터 저장소
api_keys_store = {
"test-key-1": {
"name": "Test Key 1",
"key": "test-api-key-abcd1234",
"created_at": datetime.now().isoformat(),
"usage_count": 42,
},
"test-key-2": {
"name": "Development Key",
"key": "dev-api-key-efgh5678",
"created_at": datetime.now().isoformat(),
"usage_count": 128,
}
}
# 테스트용 모델 데이터
test_models = [
{
"name": "llama3.2:3b",
"size": 2048000000, # 2GB
"status": "ready",
"is_active": True,
"last_used": datetime.now().isoformat(),
},
{
"name": "qwen2.5:7b",
"size": 4096000000, # 4GB
"status": "ready",
"is_active": False,
"last_used": "2024-12-20T10:30:00",
},
{
"name": "gemma2:2b",
"size": 1536000000, # 1.5GB
"status": "inactive",
"is_active": False,
"last_used": "2024-12-19T15:45:00",
}
]
# JWT 인증 함수들
def create_jwt_token(user_data: dict, remember_me: bool = False) -> str:
"""JWT 토큰 생성"""
expiration = datetime.utcnow() + timedelta(
days=30 if remember_me else 0,
hours=24 if not remember_me else 0
)
payload = {
"username": user_data["username"],
"role": user_data["role"],
"exp": expiration,
"iat": datetime.utcnow()
}
return jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
def verify_jwt_token(token: str) -> dict:
"""JWT 토큰 검증"""
try:
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
def verify_password(password: str, password_hash: str) -> bool:
"""비밀번호 검증"""
return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))
async def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)):
"""현재 인증된 사용자 가져오기"""
if not credentials:
raise HTTPException(status_code=401, detail="Authentication required")
try:
payload = verify_jwt_token(credentials.credentials)
username = payload.get("username")
user = TEST_USERS.get(username)
if not user:
raise HTTPException(status_code=401, detail="User not found")
return {
"username": user["username"],
"role": user["role"]
}
except HTTPException:
raise
except Exception as e:
print(f"JWT verification error: {e}")
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
def require_api_key(x_api_key: Optional[str] = Header(None), api_key: Optional[str] = None):
"""API 키 검증 (테스트 모드에서는 URL 파라미터도 허용)"""
# URL 파라미터로 API 키가 전달된 경우
if api_key and api_key == TEST_API_KEY:
return api_key
# 헤더로 API 키가 전달된 경우
if x_api_key and x_api_key == TEST_API_KEY:
return x_api_key
# 테스트 모드에서는 기본 허용
return "test-mode"
async def require_admin_role(current_user: dict = Depends(get_current_user)):
"""관리자 권한 필요"""
if current_user["role"] not in ["admin", "system"]:
raise HTTPException(status_code=403, detail="Admin privileges required")
return current_user
# 유연한 인증 (JWT 또는 API 키)
async def flexible_auth(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = Header(None)
):
"""JWT 또는 API 키 인증"""
# JWT 토큰 시도
if credentials:
try:
return await get_current_user(credentials)
except HTTPException:
pass
# API 키 시도 (테스트 모드)
if x_api_key == TEST_API_KEY:
return {"username": "api_user", "role": "system"}
# 둘 다 실패하면 로그인 페이지로 리다이렉트
raise HTTPException(status_code=401, detail="Authentication required")
@app.get("/", response_class=HTMLResponse)
async def root():
"""루트 페이지 - 관리 페이지로 리다이렉트"""
return HTMLResponse("""
<html>
<head><title>AI Server Test</title></head>
<body>
<h1>AI Server Admin Dashboard (Test Mode)</h1>
<p>이것은 맥북프로에서 관리 페이지를 테스트하기 위한 서버입니다.</p>
<p><a href="/admin">관리 페이지로 이동</a></p>
<p><strong>API Key:</strong> <code>test-admin-key-123</code></p>
</body>
</html>
""")
@app.get("/health")
async def health_check():
"""헬스 체크"""
return {"status": "ok", "mode": "test", "timestamp": datetime.now().isoformat()}
# JWT 인증 엔드포인트들
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
"""로그인 페이지"""
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/admin/login")
async def admin_login(request: Request):
"""JWT 기반 로그인 (테스트 모드)"""
try:
data = await request.json()
username = data.get("username", "").strip()
password = data.get("password", "")
remember_me = data.get("remember_me", False)
if not username or not password:
return {"success": False, "message": "Username and password are required"}
# 사용자 인증
user = TEST_USERS.get(username)
if user and verify_password(password, user["password_hash"]):
# JWT 토큰 생성
token = create_jwt_token(user, remember_me)
return {
"success": True,
"message": "Login successful",
"token": token,
"user": {
"username": user["username"],
"role": user["role"]
}
}
else:
return {"success": False, "message": "Invalid username or password"}
except Exception as e:
return {"success": False, "message": "Login error occurred"}
@app.get("/admin/verify-token")
async def verify_token(current_user: dict = Depends(get_current_user)):
"""JWT 토큰 검증"""
return {
"valid": True,
"user": current_user
}
@app.post("/admin/logout")
async def admin_logout(current_user: dict = Depends(get_current_user)):
"""로그아웃"""
return {"success": True, "message": "Logged out successfully"}
# Admin Dashboard Routes
@app.get("/admin", response_class=HTMLResponse)
async def admin_dashboard(request: Request):
"""관리자 대시보드 페이지 (클라이언트에서 JWT 검증)"""
# HTML 페이지를 먼저 반환하고, JavaScript에서 토큰 검증
return templates.TemplateResponse("admin.html", {
"request": request,
"server_port": TEST_SERVER_PORT,
"ollama_host": TEST_OLLAMA_HOST,
})
@app.get("/admin/ollama/status")
async def admin_ollama_status(api_key: str = Depends(require_api_key)):
"""Ollama 서버 상태 확인 (테스트 모드)"""
return {"status": "offline", "error": "Test mode - Ollama not available"}
@app.get("/admin/models")
async def admin_get_models(api_key: str = Depends(require_api_key)):
"""설치된 모델 목록 조회 (테스트 데이터)"""
return {"models": test_models}
@app.get("/admin/models/active")
async def admin_get_active_model(api_key: str = Depends(require_api_key)):
"""현재 활성 모델 조회"""
active_model = next((m for m in test_models if m["is_active"]), None)
return {"model": active_model["name"] if active_model else "None"}
@app.post("/admin/models/test")
async def admin_test_model(request: dict, api_key: str = Depends(require_api_key)):
"""모델 테스트 (시뮬레이션)"""
model_name = request.get("model")
if not model_name:
raise HTTPException(status_code=400, detail="Model name is required")
# 테스트 모드에서는 시뮬레이션 결과 반환
return {
"result": f"Test mode simulation: Model '{model_name}' would respond with 'Hello! This is a test response from {model_name}.'"
}
@app.get("/admin/api-keys")
async def admin_get_api_keys(api_key: str = Depends(require_api_key)):
"""API 키 목록 조회"""
keys = []
for key_id, key_data in api_keys_store.items():
keys.append({
"id": key_id,
"name": key_data.get("name", "Unnamed"),
"key": key_data.get("key", ""),
"created_at": key_data.get("created_at", datetime.now().isoformat()),
"usage_count": key_data.get("usage_count", 0),
})
return {"api_keys": keys}
@app.post("/admin/api-keys")
async def admin_create_api_key(request: dict, api_key: str = Depends(require_api_key)):
"""새 API 키 생성"""
name = request.get("name", "Unnamed Key")
new_key = secrets.token_urlsafe(32)
key_id = str(uuid.uuid4())
api_keys_store[key_id] = {
"name": name,
"key": new_key,
"created_at": datetime.now().isoformat(),
"usage_count": 0,
}
return {"api_key": new_key, "key_id": key_id}
@app.delete("/admin/api-keys/{key_id}")
async def admin_delete_api_key(key_id: str, api_key: str = Depends(require_api_key)):
"""API 키 삭제"""
if key_id in api_keys_store:
del api_keys_store[key_id]
return {"message": "API key deleted successfully"}
else:
raise HTTPException(status_code=404, detail="API key not found")
# Phase 2: Advanced Model Management (Test Mode)
@app.post("/admin/models/download")
async def admin_download_model(request: dict, api_key: str = Depends(require_api_key)):
"""모델 다운로드 (테스트 모드)"""
model_name = request.get("model")
if not model_name:
raise HTTPException(status_code=400, detail="Model name is required")
# 테스트 모드에서는 시뮬레이션
return {
"success": True,
"message": f"Test mode: Model '{model_name}' download simulation started",
"details": f"In real mode, this would download {model_name} from Ollama registry"
}
@app.delete("/admin/models/{model_name}")
async def admin_delete_model(model_name: str, api_key: str = Depends(require_api_key)):
"""모델 삭제 (테스트 모드)"""
# 테스트 데이터에서 모델 제거
global test_models
test_models = [m for m in test_models if m["name"] != model_name]
return {
"success": True,
"message": f"Test mode: Model '{model_name}' deleted from test data",
"details": f"In real mode, this would delete {model_name} from Ollama"
}
@app.get("/admin/models/available")
async def admin_get_available_models(api_key: str = Depends(require_api_key)):
"""다운로드 가능한 모델 목록"""
available_models = [
{
"name": "llama3.2:1b",
"description": "Meta의 Llama 3.2 1B 모델 - 가벼운 작업용",
"size": "1.3GB",
"tags": ["chat", "lightweight"]
},
{
"name": "llama3.2:3b",
"description": "Meta의 Llama 3.2 3B 모델 - 균형잡힌 성능",
"size": "2.0GB",
"tags": ["chat", "recommended"]
},
{
"name": "qwen2.5:7b",
"description": "Alibaba의 Qwen 2.5 7B 모델 - 다국어 지원",
"size": "4.1GB",
"tags": ["chat", "multilingual"]
},
{
"name": "gemma2:2b",
"description": "Google의 Gemma 2 2B 모델 - 효율적인 추론",
"size": "1.6GB",
"tags": ["chat", "efficient"]
},
{
"name": "codellama:7b",
"description": "Meta의 Code Llama 7B - 코드 생성 특화",
"size": "3.8GB",
"tags": ["code", "programming"]
},
{
"name": "mistral:7b",
"description": "Mistral AI의 7B 모델 - 고성능 추론",
"size": "4.1GB",
"tags": ["chat", "performance"]
}
]
return {"available_models": available_models}
# Phase 2: System Monitoring (Test Mode)
@app.get("/admin/system/stats")
async def admin_get_system_stats(api_key: str = Depends(require_api_key)):
"""시스템 리소스 사용률 조회 (테스트 데이터)"""
import random
# 테스트용 랜덤 데이터 생성
return {
"cpu": {
"usage_percent": round(random.uniform(10, 80), 1),
"core_count": 8
},
"memory": {
"usage_percent": round(random.uniform(30, 90), 1),
"used_gb": round(random.uniform(4, 12), 1),
"total_gb": 16
},
"disk": {
"usage_percent": round(random.uniform(20, 70), 1),
"used_gb": round(random.uniform(50, 200), 1),
"total_gb": 500
},
"gpu": [
{
"name": "Test GPU (Simulated)",
"load": round(random.uniform(0, 100), 1),
"memory_used": round(random.uniform(1000, 8000)),
"memory_total": 8192,
"temperature": round(random.uniform(45, 75))
}
],
"timestamp": datetime.now().isoformat()
}
if __name__ == "__main__":
print("🚀 AI Server Admin Dashboard (Test Mode)")
print(f"📍 Server: http://localhost:{TEST_SERVER_PORT}")
print(f"🔧 Admin: http://localhost:{TEST_SERVER_PORT}/admin")
print(f"🔑 API Key: {TEST_API_KEY}")
print("=" * 50)
uvicorn.run(
app,
host="0.0.0.0",
port=TEST_SERVER_PORT,
log_level="info"
)