# TK-FB-Project/fastapi-bridge/analytics.py
"""
FastAPI 브릿지 분석 시스템
Phase 4: 데이터 분석 및 성능 모니터링
"""
import time
import json
import asyncio
from typing import Dict, List, Any, Optional
from collections import defaultdict, deque
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import aiohttp
from fastapi import Request, Response
from config import settings
@dataclass
class RequestMetric:
    """Single HTTP request measurement captured by the analytics middleware."""
    timestamp: float                     # epoch seconds when the request started
    method: str                          # HTTP method, e.g. "GET"
    path: str                            # URL path component of the request
    status_code: int                     # HTTP response status code
    response_time: float                 # wall-clock duration in seconds
    cache_hit: bool = False              # heuristic flag set by the middleware
    user_agent: Optional[str] = None     # raw User-Agent header, if present
    ip_address: Optional[str] = None     # client IP, if known
class AnalyticsManager:
    """In-memory analytics store for request metrics.

    Keeps a bounded history of raw RequestMetric entries plus rolled-up
    cache, per-hour, per-endpoint, and per-client statistics, and exposes
    summary/trend/forecast views over them.
    """

    def __init__(self, max_entries: int = 10000):
        """
        Args:
            max_entries: maximum number of raw metrics retained; older
                entries are evicted automatically via ``deque(maxlen=...)``.
        """
        self.max_entries = max_entries
        # Raw request history, bounded to max_entries (oldest evicted first).
        self.metrics: deque = deque(maxlen=max_entries)
        # Global cache counters across all recorded requests.
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "total_requests": 0,
        }
        # Per-hour rollups keyed "YYYY-MM-DD_HH" (local time).
        # NOTE: "cache_hit_rate" is kept for backward compatibility with the
        # original schema but is never updated by record_request.
        self.hourly_stats = defaultdict(lambda: {
            "requests": 0,
            "avg_response_time": 0,
            "cache_hit_rate": 0,
            "errors": 0,
        })
        # Per-endpoint rollups keyed "METHOD:/path".
        self.endpoint_stats = defaultdict(lambda: {
            "requests": 0,
            "avg_response_time": 0,
            "min_response_time": float('inf'),
            "max_response_time": 0,
            "cache_hits": 0,
            "errors": 0,
        })
        # Per-client-IP rollups.
        self.client_stats = defaultdict(lambda: {
            "requests": 0,
            "last_request": None,
            "user_agents": set(),
        })

    @staticmethod
    def _ema(current: float, sample: float) -> float:
        """Exponential moving average (90% old / 10% new).

        Seeds with the first sample when no average exists yet (current == 0),
        matching the original seeding behavior.
        """
        return sample if current == 0 else current * 0.9 + sample * 0.1

    def record_request(self, metric: "RequestMetric"):
        """Record one request metric and update every rollup bucket."""
        self.metrics.append(metric)
        # Global cache counters.
        self.cache_stats["total_requests"] += 1
        if metric.cache_hit:
            self.cache_stats["hits"] += 1
        else:
            self.cache_stats["misses"] += 1
        self._update_hourly(metric)
        self._update_endpoint(metric)
        self._update_client(metric)

    def _update_hourly(self, metric: "RequestMetric"):
        """Update the rollup bucket for the metric's local-time hour."""
        hour_key = datetime.fromtimestamp(metric.timestamp).strftime("%Y-%m-%d_%H")
        hourly = self.hourly_stats[hour_key]
        hourly["requests"] += 1
        hourly["avg_response_time"] = self._ema(
            hourly["avg_response_time"], metric.response_time
        )
        if metric.status_code >= 400:
            hourly["errors"] += 1

    def _update_endpoint(self, metric: "RequestMetric"):
        """Update the rollup bucket keyed by 'METHOD:path'."""
        endpoint = self.endpoint_stats[f"{metric.method}:{metric.path}"]
        endpoint["requests"] += 1
        endpoint["avg_response_time"] = self._ema(
            endpoint["avg_response_time"], metric.response_time
        )
        endpoint["min_response_time"] = min(
            endpoint["min_response_time"], metric.response_time
        )
        endpoint["max_response_time"] = max(
            endpoint["max_response_time"], metric.response_time
        )
        if metric.cache_hit:
            endpoint["cache_hits"] += 1
        if metric.status_code >= 400:
            endpoint["errors"] += 1

    def _update_client(self, metric: "RequestMetric"):
        """Update the rollup bucket for the client IP, when one is known."""
        if not metric.ip_address:
            return
        client = self.client_stats[metric.ip_address]
        client["requests"] += 1
        client["last_request"] = metric.timestamp
        if metric.user_agent:
            client["user_agents"].add(metric.user_agent)

    def get_summary_stats(self) -> Dict[str, Any]:
        """Return high-level totals: cache hit rate, last-hour traffic,
        active clients, and endpoint count."""
        total_requests = self.cache_stats["total_requests"]
        cache_hit_rate = 0
        if total_requests > 0:
            cache_hit_rate = (self.cache_stats["hits"] / total_requests) * 100
        # One clock read for all the "within the last hour" comparisons
        # (original called time.time() once per item).
        now = time.time()
        recent_metrics = [m for m in self.metrics if now - m.timestamp < 3600]
        avg_response_time = 0
        if recent_metrics:
            avg_response_time = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
        return {
            "total_requests": total_requests,
            "cache_hit_rate": round(cache_hit_rate, 2),
            "recent_hour_requests": len(recent_metrics),
            "avg_response_time_ms": round(avg_response_time * 1000, 2),
            "active_clients": len([
                ip for ip, stats in self.client_stats.items()
                if now - (stats["last_request"] or 0) < 3600
            ]),
            "total_endpoints": len(self.endpoint_stats),
        }

    def get_top_endpoints(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to `limit` endpoints sorted by request count (desc)."""
        sorted_endpoints = sorted(
            self.endpoint_stats.items(),
            key=lambda item: item[1]["requests"],
            reverse=True,
        )
        result = []
        for endpoint, stats in sorted_endpoints[:limit]:
            # Keys are "METHOD:path"; split once so paths containing ':' survive.
            method, path = endpoint.split(":", 1)
            cache_rate = 0
            if stats["requests"] > 0:
                cache_rate = (stats["cache_hits"] / stats["requests"]) * 100
            # min_response_time stays inf until a request is seen; report 0 then.
            min_ms = (
                round(stats["min_response_time"] * 1000, 2)
                if stats["min_response_time"] != float('inf') else 0
            )
            result.append({
                "method": method,
                "path": path,
                "requests": stats["requests"],
                "avg_response_time_ms": round(stats["avg_response_time"] * 1000, 2),
                "min_response_time_ms": min_ms,
                "max_response_time_ms": round(stats["max_response_time"] * 1000, 2),
                "cache_hit_rate": round(cache_rate, 2),
                "error_count": stats["errors"],
            })
        return result

    def get_hourly_trends(self, hours: int = 24) -> Dict[str, List]:
        """Return per-hour series (oldest first) covering the last `hours` hours."""
        now = datetime.now()
        trends = {
            "hours": [],
            "requests": [],
            "avg_response_times": [],
            "error_rates": [],
        }
        # Iterate oldest -> newest so we can append; the original used
        # insert(0, ...) per element, which is quadratic in `hours`.
        for i in range(hours - 1, -1, -1):
            hour = now - timedelta(hours=i)
            stats = self.hourly_stats.get(hour.strftime("%Y-%m-%d_%H"), {
                "requests": 0,
                "avg_response_time": 0,
                "errors": 0,
            })
            error_rate = 0
            if stats["requests"] > 0:
                error_rate = (stats["errors"] / stats["requests"]) * 100
            trends["hours"].append(hour.strftime("%H:00"))
            trends["requests"].append(stats["requests"])
            trends["avg_response_times"].append(round(stats["avg_response_time"] * 1000, 2))
            trends["error_rates"].append(round(error_rate, 2))
        return trends

    def get_client_analysis(self) -> Dict[str, Any]:
        """Summarize client-IP activity: totals, active-in-last-hour, top 10."""
        total_clients = len(self.client_stats)
        now = time.time()  # hoisted: single clock read for the activity window
        active_clients = len([
            ip for ip, stats in self.client_stats.items()
            if now - (stats["last_request"] or 0) < 3600
        ])
        top_clients = sorted(
            self.client_stats.items(),
            key=lambda item: item[1]["requests"],
            reverse=True,
        )[:10]
        client_list = []
        for ip, stats in top_clients:
            last = stats["last_request"]
            client_list.append({
                "ip": ip,
                "requests": stats["requests"],
                "last_request": datetime.fromtimestamp(last).isoformat() if last else None,
                "user_agents": len(stats["user_agents"]),
            })
        return {
            "total_clients": total_clients,
            "active_clients_1h": active_clients,
            "top_clients": client_list,
        }

    def predict_load(self) -> Dict[str, Any]:
        """Naive linear forecast of next-hour request volume from the
        current hourly bucket and the one before it."""
        recent_hours = []
        for i in range(3):  # current hour plus the two before it
            hour = datetime.now() - timedelta(hours=i)
            stats = self.hourly_stats.get(hour.strftime("%Y-%m-%d_%H"), {"requests": 0})
            recent_hours.append(stats["requests"])
        # recent_hours always holds 3 entries (range(3)), so trend is always
        # computable. BUG FIX: the original defined `trend` only on one branch
        # and then read it unconditionally in the return expression, guarded by
        # a fragile `'trend' in locals()` check that ran *after* `trend > 0`.
        trend = recent_hours[0] - recent_hours[1]
        predicted_next_hour = max(0, recent_hours[0] + trend)
        if trend > 0:
            direction = "increasing"
        elif trend < 0:
            direction = "decreasing"
        else:
            direction = "stable"
        return {
            "current_hour_requests": recent_hours[0],
            "predicted_next_hour": round(predicted_next_hour),
            "trend": direction,
        }
# Module-level singleton shared by the middleware and any API handlers.
analytics_manager = AnalyticsManager()
class AnalyticsMiddleware:
    """ASGI middleware that times each HTTP request and feeds the resulting
    RequestMetric into the global analytics_manager."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Pass non-HTTP traffic (e.g. lifespan, websocket) straight through.
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        request = Request(scope, receive)
        started = time.time()
        # Mutable cell so the send wrapper can report the status code back.
        captured = {"status_code": 200}

        async def tracking_send(message):
            # The status code arrives on the response-start event.
            if message["type"] == "http.response.start":
                captured["status_code"] = message["status"]
            await send(message)

        # Run the downstream app with our instrumented send callable.
        await self.app(scope, receive, tracking_send)
        elapsed = time.time() - started
        # Cache hits are guessed heuristically (simple implementation):
        # a cache-path URL or a very fast response counts as a hit.
        was_cached = "/cache/" in request.url.path or elapsed < 0.01
        analytics_manager.record_request(RequestMetric(
            timestamp=started,
            method=request.method,
            path=request.url.path,
            status_code=captured["status_code"],
            response_time=elapsed,
            cache_hit=was_cached,
            user_agent=request.headers.get("user-agent"),
            ip_address=request.client.host if request.client else None,
        ))