GPU 서버 중앙 AI 라우팅 서비스 초기 구현: - OpenAI 호환 API (/v1/chat/completions, /v1/models, /v1/embeddings) - 모델 레지스트리 + 백엔드 헬스체크 (30초 루프) - Ollama SSE 프록시 (NDJSON → OpenAI SSE 변환) - JWT 인증 이중 경로 (httpOnly 쿠키 + Bearer 토큰) - owner/guest 역할 분리, 로그인 rate limiting - 백엔드별 rate limiting (NanoClaude 대비) - SQLite 스키마 사전 정의 (aiosqlite + WAL) - Docker Compose + Caddy 리버스 프록시 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
97 lines
2.7 KiB
Python
97 lines
2.7 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
|
|
from jose import JWTError, jwt
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.requests import Request
|
|
|
|
from config import settings
|
|
|
|
# Paths that don't require authentication
|
|
PUBLIC_PATHS = {"/", "/health", "/auth/login", "/docs", "/openapi.json"}
|
|
PUBLIC_PREFIXES = ("/health/",)
|
|
|
|
|
|
class AuthMiddleware(BaseHTTPMiddleware):
|
|
async def dispatch(self, request: Request, call_next):
|
|
path = request.url.path
|
|
|
|
# Skip auth for public paths
|
|
if path in PUBLIC_PATHS or any(path.startswith(p) for p in PUBLIC_PREFIXES):
|
|
request.state.role = "anonymous"
|
|
return await call_next(request)
|
|
|
|
# Skip auth for OPTIONS (CORS preflight)
|
|
if request.method == "OPTIONS":
|
|
return await call_next(request)
|
|
|
|
# Try Bearer token first, then cookie
|
|
token = _extract_token(request)
|
|
if not token:
|
|
request.state.role = "anonymous"
|
|
return await call_next(request)
|
|
|
|
# Verify JWT
|
|
payload = _verify_token(token)
|
|
if payload:
|
|
request.state.role = payload.get("role", "guest")
|
|
else:
|
|
request.state.role = "anonymous"
|
|
|
|
return await call_next(request)
|
|
|
|
|
|
def create_token(role: str) -> str:
|
|
payload = {
|
|
"role": role,
|
|
"exp": time.time() + settings.jwt_expire_hours * 3600,
|
|
"iat": time.time(),
|
|
}
|
|
return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
|
|
|
|
|
|
def _extract_token(request: Request) -> str | None:
|
|
# 1. Authorization: Bearer header
|
|
auth_header = request.headers.get("authorization", "")
|
|
if auth_header.startswith("Bearer "):
|
|
return auth_header[7:]
|
|
|
|
# 2. httpOnly cookie
|
|
return request.cookies.get("token")
|
|
|
|
|
|
def _verify_token(token: str) -> dict | None:
|
|
try:
|
|
payload = jwt.decode(
|
|
token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]
|
|
)
|
|
if payload.get("exp", 0) < time.time():
|
|
return None
|
|
return payload
|
|
except JWTError:
|
|
return None
|
|
|
|
|
|
# Login rate limiting (IP-based)
|
|
_login_attempts: dict[str, list[float]] = {}
|
|
MAX_ATTEMPTS = 5
|
|
LOCKOUT_SECONDS = 60
|
|
|
|
|
|
def check_login_rate_limit(ip: str) -> bool:
|
|
"""Returns True if login is allowed for this IP."""
|
|
now = time.time()
|
|
attempts = _login_attempts.get(ip, [])
|
|
# Clean old attempts
|
|
attempts = [t for t in attempts if now - t < LOCKOUT_SECONDS]
|
|
_login_attempts[ip] = attempts
|
|
return len(attempts) < MAX_ATTEMPTS
|
|
|
|
|
|
def record_login_attempt(ip: str):
|
|
now = time.time()
|
|
if ip not in _login_attempts:
|
|
_login_attempts[ip] = []
|
|
_login_attempts[ip].append(now)
|