feat: Phase 3 보안 강화 - API 키 AES-256 암호화
- server/encryption.py: AES-256 암호화/복호화 함수 추가 - test_admin.py: API 키 암호화 저장 및 조회 로직 구현 - static/admin.js: 암호화 상태 표시 UI 추가 - static/admin.css: 암호화 배지 스타일 추가 API 키가 이제 AES-256으로 암호화되어 저장됩니다.
This commit is contained in:
127
server/encryption.py
Normal file
127
server/encryption.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""
|
||||
AES-256 Encryption Module for API Keys
|
||||
Phase 3: Security Enhancement
|
||||
"""
|
||||
|
||||
import os
|
||||
import base64
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from typing import Optional
|
||||
import secrets
|
||||
|
||||
class APIKeyEncryption:
|
||||
def __init__(self, master_password: Optional[str] = None):
|
||||
"""
|
||||
Initialize encryption with master password
|
||||
If no password provided, uses environment variable or generates one
|
||||
"""
|
||||
self.master_password = master_password or os.getenv("ENCRYPTION_KEY") or self._generate_master_key()
|
||||
self.salt = b'ai_server_salt_2025' # Fixed salt for consistency
|
||||
self._fernet = self._create_fernet()
|
||||
|
||||
def _generate_master_key(self) -> str:
|
||||
"""Generate a secure master key"""
|
||||
key = secrets.token_urlsafe(32)
|
||||
print(f"🔑 Generated new encryption key: {key}")
|
||||
print("⚠️ IMPORTANT: Save this key in your environment variables!")
|
||||
print(f" export ENCRYPTION_KEY='{key}'")
|
||||
return key
|
||||
|
||||
def _create_fernet(self) -> Fernet:
|
||||
"""Create Fernet cipher from master password"""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=self.salt,
|
||||
iterations=100000,
|
||||
)
|
||||
key = base64.urlsafe_b64encode(kdf.derive(self.master_password.encode()))
|
||||
return Fernet(key)
|
||||
|
||||
def encrypt_api_key(self, api_key: str) -> str:
|
||||
"""Encrypt API key using AES-256"""
|
||||
try:
|
||||
encrypted_bytes = self._fernet.encrypt(api_key.encode())
|
||||
return base64.urlsafe_b64encode(encrypted_bytes).decode()
|
||||
except Exception as e:
|
||||
raise Exception(f"Encryption failed: {str(e)}")
|
||||
|
||||
def decrypt_api_key(self, encrypted_api_key: str) -> str:
|
||||
"""Decrypt API key"""
|
||||
try:
|
||||
encrypted_bytes = base64.urlsafe_b64decode(encrypted_api_key.encode())
|
||||
decrypted_bytes = self._fernet.decrypt(encrypted_bytes)
|
||||
return decrypted_bytes.decode()
|
||||
except Exception as e:
|
||||
raise Exception(f"Decryption failed: {str(e)}")
|
||||
|
||||
def is_encrypted(self, api_key: str) -> bool:
|
||||
"""Check if API key is already encrypted"""
|
||||
try:
|
||||
# Try to decode as base64 - encrypted keys are base64 encoded
|
||||
base64.urlsafe_b64decode(api_key.encode())
|
||||
# Try to decrypt - if successful, it's encrypted
|
||||
self.decrypt_api_key(api_key)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
def rotate_encryption_key(self, new_master_password: str, encrypted_keys: list) -> list:
|
||||
"""Rotate encryption key - decrypt with old key, encrypt with new key"""
|
||||
old_fernet = self._fernet
|
||||
|
||||
# Create new Fernet with new password
|
||||
self.master_password = new_master_password
|
||||
self._fernet = self._create_fernet()
|
||||
|
||||
rotated_keys = []
|
||||
for encrypted_key in encrypted_keys:
|
||||
try:
|
||||
# Decrypt with old key
|
||||
decrypted = old_fernet.decrypt(base64.urlsafe_b64decode(encrypted_key.encode()))
|
||||
# Encrypt with new key
|
||||
new_encrypted = self.encrypt_api_key(decrypted.decode())
|
||||
rotated_keys.append(new_encrypted)
|
||||
except Exception as e:
|
||||
print(f"Failed to rotate key: {e}")
|
||||
rotated_keys.append(encrypted_key) # Keep original if rotation fails
|
||||
|
||||
return rotated_keys
|
||||
|
||||
# Global encryption instance
|
||||
encryption = APIKeyEncryption()
|
||||
|
||||
def encrypt_api_key(api_key: str) -> str:
|
||||
"""Convenience function to encrypt API key"""
|
||||
return encryption.encrypt_api_key(api_key)
|
||||
|
||||
def decrypt_api_key(encrypted_api_key: str) -> str:
|
||||
"""Convenience function to decrypt API key"""
|
||||
return encryption.decrypt_api_key(encrypted_api_key)
|
||||
|
||||
def is_encrypted(api_key: str) -> bool:
|
||||
"""Convenience function to check if API key is encrypted"""
|
||||
return encryption.is_encrypted(api_key)
|
||||
|
||||
# Test the encryption system
|
||||
if __name__ == "__main__":
|
||||
# Test encryption/decryption
|
||||
test_key = "test-api-key-12345"
|
||||
|
||||
print("🧪 Testing API Key Encryption...")
|
||||
print(f"Original: {test_key}")
|
||||
|
||||
# Encrypt
|
||||
encrypted = encrypt_api_key(test_key)
|
||||
print(f"Encrypted: {encrypted}")
|
||||
|
||||
# Decrypt
|
||||
decrypted = decrypt_api_key(encrypted)
|
||||
print(f"Decrypted: {decrypted}")
|
||||
|
||||
# Verify
|
||||
print(f"✅ Match: {test_key == decrypted}")
|
||||
print(f"🔒 Is Encrypted: {is_encrypted(encrypted)}")
|
||||
print(f"🔓 Is Plain: {not is_encrypted(test_key)}")
|
||||
@@ -314,6 +314,32 @@ body {
|
||||
font-weight: 600;
|
||||
color: #2c3e50;
|
||||
margin-bottom: 0.3rem;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 0.5rem;
|
||||
flex-wrap: wrap;
|
||||
}
|
||||
|
||||
.encryption-badge {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 0.3rem;
|
||||
padding: 0.2rem 0.5rem;
|
||||
border-radius: 12px;
|
||||
font-size: 0.7rem;
|
||||
font-weight: 500;
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.5px;
|
||||
background: #27ae60;
|
||||
color: white;
|
||||
}
|
||||
|
||||
.encryption-badge.plain {
|
||||
background: #e74c3c;
|
||||
}
|
||||
|
||||
.encryption-badge i {
|
||||
font-size: 0.6rem;
|
||||
}
|
||||
|
||||
.api-key-value {
|
||||
|
||||
@@ -221,11 +221,15 @@ class AdminDashboard {
|
||||
container.innerHTML = apiKeys.map(key => `
|
||||
<div class="api-key-item">
|
||||
<div class="api-key-info">
|
||||
<div class="api-key-name">${key.name || 'Unnamed Key'}</div>
|
||||
<div class="api-key-name">
|
||||
${key.name || 'Unnamed Key'}
|
||||
${key.encrypted ? '<span class="encryption-badge"><i class="fas fa-lock"></i> Encrypted</span>' : '<span class="encryption-badge plain"><i class="fas fa-unlock"></i> Plain</span>'}
|
||||
</div>
|
||||
<div class="api-key-value">${this.maskApiKey(key.key)}</div>
|
||||
<div class="api-key-meta">
|
||||
Created: ${new Date(key.created_at).toLocaleString('ko-KR')} |
|
||||
Uses: ${key.usage_count || 0}
|
||||
${key.encrypted ? ' | 🔒 AES-256 Encrypted' : ' | ⚠️ Plain Text'}
|
||||
</div>
|
||||
</div>
|
||||
<div class="api-key-actions">
|
||||
|
||||
101
test_admin.py
101
test_admin.py
@@ -20,6 +20,18 @@ from fastapi.templating import Jinja2Templates
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
import uvicorn
|
||||
|
||||
# Import encryption module
|
||||
try:
|
||||
from server.encryption import encrypt_api_key, decrypt_api_key, is_encrypted
|
||||
ENCRYPTION_AVAILABLE = True
|
||||
except ImportError:
|
||||
print("⚠️ Encryption module not available, using plain text storage")
|
||||
ENCRYPTION_AVAILABLE = False
|
||||
|
||||
def encrypt_api_key(key): return key
|
||||
def decrypt_api_key(key): return key
|
||||
def is_encrypted(key): return False
|
||||
|
||||
# FastAPI 앱 초기화
|
||||
app = FastAPI(title="AI Server Admin Dashboard (Test Mode)")
|
||||
|
||||
@@ -51,21 +63,44 @@ TEST_USERS = {
|
||||
}
|
||||
}
|
||||
|
||||
# 임시 데이터 저장소
|
||||
api_keys_store = {
|
||||
"test-key-1": {
|
||||
"name": "Test Key 1",
|
||||
"key": "test-api-key-abcd1234",
|
||||
"created_at": datetime.now().isoformat(),
|
||||
"usage_count": 42,
|
||||
},
|
||||
"test-key-2": {
|
||||
"name": "Development Key",
|
||||
"key": "dev-api-key-efgh5678",
|
||||
"created_at": datetime.now().isoformat(),
|
||||
"usage_count": 128,
|
||||
# 임시 데이터 저장소 (암호화된 API 키)
|
||||
def initialize_api_keys():
|
||||
"""Initialize API keys with encryption"""
|
||||
keys = {
|
||||
"test-key-1": {
|
||||
"name": "Test Key 1",
|
||||
"key": "test-api-key-abcd1234",
|
||||
"created_at": datetime.now().isoformat(),
|
||||
"usage_count": 42,
|
||||
},
|
||||
"test-key-2": {
|
||||
"name": "Development Key",
|
||||
"key": "dev-api-key-efgh5678",
|
||||
"created_at": datetime.now().isoformat(),
|
||||
"usage_count": 128,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Encrypt API keys if encryption is available
|
||||
if ENCRYPTION_AVAILABLE:
|
||||
for key_id, key_data in keys.items():
|
||||
if not is_encrypted(key_data["key"]):
|
||||
try:
|
||||
key_data["key"] = encrypt_api_key(key_data["key"])
|
||||
key_data["encrypted"] = True
|
||||
print(f"🔒 Encrypted API key: {key_data['name']}")
|
||||
except Exception as e:
|
||||
print(f"❌ Failed to encrypt key {key_data['name']}: {e}")
|
||||
key_data["encrypted"] = False
|
||||
else:
|
||||
key_data["encrypted"] = True
|
||||
else:
|
||||
for key_data in keys.values():
|
||||
key_data["encrypted"] = False
|
||||
|
||||
return keys
|
||||
|
||||
api_keys_store = initialize_api_keys()
|
||||
|
||||
# 테스트용 모델 데이터
|
||||
test_models = [
|
||||
@@ -307,15 +342,25 @@ async def admin_test_model(request: dict, api_key: str = Depends(require_api_key
|
||||
|
||||
@app.get("/admin/api-keys")
|
||||
async def admin_get_api_keys(api_key: str = Depends(require_api_key)):
|
||||
"""API 키 목록 조회"""
|
||||
"""API 키 목록 조회 (복호화된 키 반환)"""
|
||||
keys = []
|
||||
for key_id, key_data in api_keys_store.items():
|
||||
# Decrypt key for display
|
||||
display_key = key_data.get("key", "")
|
||||
if ENCRYPTION_AVAILABLE and key_data.get("encrypted", False):
|
||||
try:
|
||||
display_key = decrypt_api_key(display_key)
|
||||
except Exception as e:
|
||||
print(f"❌ Failed to decrypt key {key_data.get('name')}: {e}")
|
||||
display_key = "DECRYPTION_FAILED"
|
||||
|
||||
keys.append({
|
||||
"id": key_id,
|
||||
"name": key_data.get("name", "Unnamed"),
|
||||
"key": key_data.get("key", ""),
|
||||
"key": display_key,
|
||||
"created_at": key_data.get("created_at", datetime.now().isoformat()),
|
||||
"usage_count": key_data.get("usage_count", 0),
|
||||
"encrypted": key_data.get("encrypted", False),
|
||||
})
|
||||
|
||||
return {"api_keys": keys}
|
||||
@@ -323,19 +368,37 @@ async def admin_get_api_keys(api_key: str = Depends(require_api_key)):
|
||||
|
||||
@app.post("/admin/api-keys")
|
||||
async def admin_create_api_key(request: dict, api_key: str = Depends(require_api_key)):
|
||||
"""새 API 키 생성"""
|
||||
"""새 API 키 생성 (암호화 저장)"""
|
||||
name = request.get("name", "Unnamed Key")
|
||||
new_key = secrets.token_urlsafe(32)
|
||||
key_id = str(uuid.uuid4())
|
||||
|
||||
# Encrypt the key before storing
|
||||
stored_key = new_key
|
||||
encrypted = False
|
||||
|
||||
if ENCRYPTION_AVAILABLE:
|
||||
try:
|
||||
stored_key = encrypt_api_key(new_key)
|
||||
encrypted = True
|
||||
print(f"🔒 Created encrypted API key: {name}")
|
||||
except Exception as e:
|
||||
print(f"❌ Failed to encrypt new key {name}: {e}")
|
||||
encrypted = False
|
||||
|
||||
api_keys_store[key_id] = {
|
||||
"name": name,
|
||||
"key": new_key,
|
||||
"key": stored_key,
|
||||
"created_at": datetime.now().isoformat(),
|
||||
"usage_count": 0,
|
||||
"encrypted": encrypted,
|
||||
}
|
||||
|
||||
return {"api_key": new_key, "key_id": key_id}
|
||||
return {
|
||||
"api_key": new_key, # Return plain key to user (only time they'll see it)
|
||||
"key_id": key_id,
|
||||
"encrypted": encrypted
|
||||
}
|
||||
|
||||
|
||||
@app.delete("/admin/api-keys/{key_id}")
|
||||
|
||||
Reference in New Issue
Block a user