314 lines
11 KiB
Python
314 lines
11 KiB
Python
"""
|
|
FastAPI 브릿지 분석 시스템
|
|
Phase 4: 데이터 분석 및 성능 모니터링
|
|
"""
|
|
import time
|
|
import json
|
|
import asyncio
|
|
from typing import Dict, List, Any, Optional
|
|
from collections import defaultdict, deque
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime, timedelta
|
|
|
|
import aiohttp
|
|
from fastapi import Request, Response
|
|
|
|
from config import settings
|
|
|
|
@dataclass
class RequestMetric:
    """Measurements captured for a single HTTP request.

    Attributes:
        timestamp: Epoch seconds at which the request was observed.
        method: HTTP method of the request.
        path: URL path of the request.
        status_code: HTTP status code of the response.
        response_time: Elapsed handling time, in seconds.
        cache_hit: Whether the request was counted as a cache hit.
        user_agent: Value of the User-Agent header, if one was sent.
        ip_address: Client host address, if known.
    """

    # Required fields: positional order is part of the constructor signature.
    timestamp: float
    method: str
    path: str
    status_code: int
    response_time: float

    # Optional fields.
    cache_hit: bool = False
    user_agent: Optional[str] = None
    ip_address: Optional[str] = None
|
|
|
|
class AnalyticsManager:
    """In-memory store for request metrics and the aggregates derived from them.

    Raw metrics are kept in a deque capped at ``max_entries``. The per-hour,
    per-endpoint and per-client aggregates are plain dictionaries that this
    class never prunes, so they grow with the number of distinct hours,
    ``METHOD:path`` pairs and client addresses seen.
    """

    # Width, in seconds, of the "recent" / "active" window used by the reports.
    _RECENT_WINDOW_SECONDS = 3600
    # strftime pattern naming an hourly bucket (naive local time).
    _HOUR_KEY_FORMAT = "%Y-%m-%d_%H"

    def __init__(self, max_entries: int = 10000):
        """Create an empty manager.

        Args:
            max_entries: Maximum number of raw metrics retained; older entries
                are dropped by the deque once the cap is reached.
        """
        self.max_entries = max_entries
        self.metrics: deque = deque(maxlen=max_entries)
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "total_requests": 0,
        }

        # Per-hour statistics, keyed by _HOUR_KEY_FORMAT.
        # "cache_hit_rate" is a percentage derived from "cache_hits".
        self.hourly_stats = defaultdict(lambda: {
            "requests": 0,
            "avg_response_time": 0,
            "cache_hits": 0,
            "cache_hit_rate": 0,
            "errors": 0,
        })

        # Per-endpoint statistics, keyed by "METHOD:path".
        self.endpoint_stats = defaultdict(lambda: {
            "requests": 0,
            "avg_response_time": 0,
            "min_response_time": float('inf'),
            "max_response_time": 0,
            "cache_hits": 0,
            "errors": 0,
        })

        # Per-client statistics, keyed by IP address.
        self.client_stats = defaultdict(lambda: {
            "requests": 0,
            "last_request": None,
            "user_agents": set(),
        })

    @staticmethod
    def _blend_response_time(bucket: Dict[str, Any], response_time: float) -> None:
        """Fold one sample into the bucket's exponentially weighted average.

        Must be called after ``bucket["requests"]`` has been incremented: the
        first sample seeds the average, later samples are blended 90/10.
        Seeding on the request count (instead of ``avg == 0``) keeps a
        legitimate average of 0 from being overwritten by the next sample.
        """
        if bucket["requests"] == 1:
            bucket["avg_response_time"] = response_time
        else:
            bucket["avg_response_time"] = (
                bucket["avg_response_time"] * 0.9 + response_time * 0.1
            )

    def _count_active_clients(self, now: float) -> int:
        """Count clients whose last request is within the recent window of ``now``."""
        return sum(
            1
            for stats in self.client_stats.values()
            if now - (stats["last_request"] or 0) < self._RECENT_WINDOW_SECONDS
        )

    def record_request(self, metric: "RequestMetric"):
        """Store one request metric and update every aggregate.

        Args:
            metric: Object exposing ``timestamp``, ``method``, ``path``,
                ``status_code``, ``response_time``, ``cache_hit``,
                ``user_agent`` and ``ip_address``.
        """
        self.metrics.append(metric)
        is_error = metric.status_code >= 400

        # Overall cache counters.
        self.cache_stats["total_requests"] += 1
        self.cache_stats["hits" if metric.cache_hit else "misses"] += 1

        # Hourly bucket.
        hour_key = datetime.fromtimestamp(metric.timestamp).strftime(
            self._HOUR_KEY_FORMAT
        )
        hourly = self.hourly_stats[hour_key]
        hourly["requests"] += 1
        self._blend_response_time(hourly, metric.response_time)
        if metric.cache_hit:
            hourly["cache_hits"] += 1
        # Keep the hourly hit rate (percent) current; it used to stay at 0.
        hourly["cache_hit_rate"] = (hourly["cache_hits"] / hourly["requests"]) * 100
        if is_error:
            hourly["errors"] += 1

        # Endpoint bucket.
        endpoint = self.endpoint_stats[f"{metric.method}:{metric.path}"]
        endpoint["requests"] += 1
        self._blend_response_time(endpoint, metric.response_time)
        endpoint["min_response_time"] = min(
            endpoint["min_response_time"], metric.response_time
        )
        endpoint["max_response_time"] = max(
            endpoint["max_response_time"], metric.response_time
        )
        if metric.cache_hit:
            endpoint["cache_hits"] += 1
        if is_error:
            endpoint["errors"] += 1

        # Client bucket (only when the address is known).
        if metric.ip_address:
            client = self.client_stats[metric.ip_address]
            client["requests"] += 1
            client["last_request"] = metric.timestamp
            if metric.user_agent:
                client["user_agents"].add(metric.user_agent)

    def get_summary_stats(self) -> Dict[str, Any]:
        """Return headline numbers.

        Returns:
            A dict with total request count, overall cache hit rate (percent),
            the request count and mean response time (ms) over the retained
            metrics of the last hour, the number of clients active in the last
            hour, and the number of distinct endpoints.
        """
        # Sample the clock once so every comparison uses the same instant.
        now = time.time()

        total_requests = self.cache_stats["total_requests"]
        cache_hit_rate = 0
        if total_requests > 0:
            cache_hit_rate = (self.cache_stats["hits"] / total_requests) * 100

        recent_times = [
            m.response_time
            for m in self.metrics
            if now - m.timestamp < self._RECENT_WINDOW_SECONDS
        ]
        avg_response_time = 0
        if recent_times:
            avg_response_time = sum(recent_times) / len(recent_times)

        return {
            "total_requests": total_requests,
            "cache_hit_rate": round(cache_hit_rate, 2),
            "recent_hour_requests": len(recent_times),
            "avg_response_time_ms": round(avg_response_time * 1000, 2),
            "active_clients": self._count_active_clients(now),
            "total_endpoints": len(self.endpoint_stats),
        }

    def get_top_endpoints(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the busiest endpoints, most requests first.

        Args:
            limit: Maximum number of endpoints to return; values below 1
                yield an empty list.
        """
        ranked = sorted(
            self.endpoint_stats.items(),
            key=lambda item: item[1]["requests"],
            reverse=True,
        )

        result = []
        # Clamp so a negative limit cannot slice from the end of the ranking.
        for endpoint, stats in ranked[:max(limit, 0)]:
            # Split on the first colon only; the path may itself contain ':'.
            method, path = endpoint.split(":", 1)
            cache_rate = 0
            if stats["requests"] > 0:
                cache_rate = (stats["cache_hits"] / stats["requests"]) * 100

            # An untouched bucket still carries the +inf sentinel; report 0.
            if stats["min_response_time"] != float('inf'):
                min_ms = round(stats["min_response_time"] * 1000, 2)
            else:
                min_ms = 0

            result.append({
                "method": method,
                "path": path,
                "requests": stats["requests"],
                "avg_response_time_ms": round(stats["avg_response_time"] * 1000, 2),
                "min_response_time_ms": min_ms,
                "max_response_time_ms": round(stats["max_response_time"] * 1000, 2),
                "cache_hit_rate": round(cache_rate, 2),
                "error_count": stats["errors"],
            })

        return result

    def get_hourly_trends(self, hours: int = 24) -> Dict[str, List]:
        """Return per-hour series for the last ``hours`` hours, oldest first.

        Args:
            hours: Number of hourly buckets to report, ending with the
                current hour.

        Returns:
            A dict of parallel lists: hour labels (``HH:00``), request counts,
            average response times (ms) and error rates (percent).
        """
        now = datetime.now()
        empty = {"requests": 0, "avg_response_time": 0, "errors": 0}
        trends = {
            "hours": [],
            "requests": [],
            "avg_response_times": [],
            "error_rates": [],
        }

        # Walk from the oldest bucket to the current one so plain appends
        # produce chronological order (no front insertion needed).
        for offset in range(hours - 1, -1, -1):
            hour = now - timedelta(hours=offset)
            # .get() avoids creating empty buckets in the defaultdict.
            stats = self.hourly_stats.get(hour.strftime(self._HOUR_KEY_FORMAT), empty)

            error_rate = 0
            if stats["requests"] > 0:
                error_rate = (stats["errors"] / stats["requests"]) * 100

            trends["hours"].append(hour.strftime("%H:00"))
            trends["requests"].append(stats["requests"])
            trends["avg_response_times"].append(
                round(stats["avg_response_time"] * 1000, 2)
            )
            trends["error_rates"].append(round(error_rate, 2))

        return trends

    def get_client_analysis(self) -> Dict[str, Any]:
        """Return client counts and the ten clients with the most requests."""
        busiest = sorted(
            self.client_stats.items(),
            key=lambda item: item[1]["requests"],
            reverse=True,
        )[:10]

        client_list = [
            {
                "ip": ip,
                "requests": stats["requests"],
                "last_request": (
                    datetime.fromtimestamp(stats["last_request"]).isoformat()
                    if stats["last_request"]
                    else None
                ),
                "user_agents": len(stats["user_agents"]),
            }
            for ip, stats in busiest
        ]

        return {
            "total_clients": len(self.client_stats),
            "active_clients_1h": self._count_active_clients(time.time()),
            "top_clients": client_list,
        }

    def predict_load(self) -> Dict[str, Any]:
        """Extrapolate next hour's request count from the last two hourly buckets.

        The prediction is the current bucket plus the difference between the
        current and the previous bucket, floored at 0.

        NOTE(review): the current bucket only holds the requests seen so far
        this hour, so it is compared against a complete previous hour.
        """
        # Read the clock once so both keys derive from the same instant.
        now = datetime.now()
        empty = {"requests": 0}
        current, previous = (
            self.hourly_stats.get(
                (now - timedelta(hours=offset)).strftime(self._HOUR_KEY_FORMAT),
                empty,
            )["requests"]
            for offset in (0, 1)
        )

        # Simple linear trend; always defined, so no "unknown" state exists.
        trend = current - previous
        if trend > 0:
            direction = "increasing"
        elif trend < 0:
            direction = "decreasing"
        else:
            direction = "stable"

        return {
            "current_hour_requests": current,
            "predicted_next_hour": round(max(0, current + trend)),
            "trend": direction,
        }
|
|
|
|
# Global analytics manager instance, created once at import time.
analytics_manager: AnalyticsManager = AnalyticsManager()
|
|
|
|
class AnalyticsMiddleware:
    """ASGI middleware that times each HTTP request and records a metric.

    Non-HTTP scopes are passed through untouched. For HTTP scopes the
    response status is captured from the ``http.response.start`` message and
    a ``RequestMetric`` is handed to the module-level ``analytics_manager``,
    including when the wrapped application raises.
    """

    def __init__(self, app):
        """Wrap ``app``, the next ASGI application in the chain."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection, recording a metric for HTTP requests."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)
        # Wall clock for the metric's timestamp; monotonic clock for the
        # duration so clock adjustments cannot skew or negate it.
        start_time = time.time()
        timer_start = time.perf_counter()

        # Mutable holder so the nested wrapper can report the status back.
        response_data = {"status_code": 200, "started": False}

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                response_data["status_code"] = message["status"]
                response_data["started"] = True
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        except Exception:
            # No response was started, so the default 200 would be wrong.
            # Record a server error instead; the exception still propagates.
            if not response_data["started"]:
                response_data["status_code"] = 500
            raise
        finally:
            self._record_metric(
                request,
                start_time,
                time.perf_counter() - timer_start,
                response_data["status_code"],
            )

    @staticmethod
    def _record_metric(request, start_time, response_time, status_code):
        """Build the metric for a finished request and store it."""
        # Cache hits are guessed, not observed: a "/cache/" path or a
        # response faster than 10 ms counts as a hit (simple implementation).
        cache_hit = "/cache/" in request.url.path or response_time < 0.01

        metric = RequestMetric(
            timestamp=start_time,
            method=request.method,
            path=request.url.path,
            status_code=status_code,
            response_time=response_time,
            cache_hit=cache_hit,
            user_agent=request.headers.get("user-agent"),
            ip_address=request.client.host if request.client else None,
        )

        analytics_manager.record_request(metric)