"""
FastAPI bridge analytics system
Phase 4: data analytics and performance monitoring
"""

import time
import json
import asyncio
from typing import Dict, List, Any, Optional
from collections import defaultdict, deque
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta

import aiohttp
from fastapi import Request, Response

from config import settings


@dataclass
class RequestMetric:
    """Measurements captured for a single handled HTTP request."""

    timestamp: float  # wall-clock time (time.time()) at which handling started
    method: str
    path: str
    status_code: int
    response_time: float  # elapsed handling time, in seconds
    cache_hit: bool = False
    user_agent: Optional[str] = None
    ip_address: Optional[str] = None


class AnalyticsManager:
    """In-memory store and aggregator for request metrics.

    Raw metrics are kept in a bounded deque; aggregates are kept per hour,
    per endpoint ("METHOD:path") and per client IP.

    NOTE: only ``metrics`` is bounded. ``hourly_stats``, ``endpoint_stats``
    and ``client_stats`` are never pruned, so they grow with the number of
    distinct hours, paths and client IPs seen.
    """

    # Look-back window, in seconds, for "recent" metrics and "active" clients.
    _RECENT_WINDOW_SECONDS = 3600
    # Format of the keys used in ``hourly_stats`` (local time, one per hour).
    _HOUR_KEY_FORMAT = "%Y-%m-%d_%H"

    def __init__(self, max_entries: int = 10000):
        """Create an empty manager keeping at most *max_entries* raw metrics."""
        self.max_entries = max_entries
        self.metrics: deque = deque(maxlen=max_entries)
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "total_requests": 0,
        }

        # Per-hour statistics. "cache_hit_rate" is part of the record shape
        # but is not updated by record_request.
        self.hourly_stats = defaultdict(lambda: {
            "requests": 0,
            "avg_response_time": 0,
            "cache_hit_rate": 0,
            "errors": 0,
        })

        # Per-endpoint statistics, keyed by "METHOD:path".
        self.endpoint_stats = defaultdict(lambda: {
            "requests": 0,
            "avg_response_time": 0,
            "min_response_time": float('inf'),
            "max_response_time": 0,
            "cache_hits": 0,
            "errors": 0,
        })

        # Per-client statistics, keyed by IP address.
        self.client_stats = defaultdict(lambda: {
            "requests": 0,
            "last_request": None,
            "user_agents": set(),
        })

    @staticmethod
    def _smoothed_average(previous: float, sample: float, is_first: bool) -> float:
        """Return the exponentially smoothed average after adding *sample*.

        The first sample seeds the average. Seeding is decided by request
        count rather than by ``previous == 0`` so that a genuine 0.0 average
        is not mistaken for "no data yet".
        """
        if is_first:
            return sample
        return previous * 0.9 + sample * 0.1

    def _count_active_clients(self, now: float) -> int:
        """Count clients whose last request falls within the recent window."""
        return sum(
            1
            for stats in self.client_stats.values()
            if now - (stats["last_request"] or 0) < self._RECENT_WINDOW_SECONDS
        )

    def record_request(self, metric: RequestMetric):
        """Store *metric* and fold it into every aggregate.

        Status codes >= 400 count as errors. "avg_response_time" values are
        exponentially smoothed averages, not arithmetic means.
        """
        self.metrics.append(metric)
        is_error = metric.status_code >= 400

        # Global cache counters.
        self.cache_stats["total_requests"] += 1
        if metric.cache_hit:
            self.cache_stats["hits"] += 1
        else:
            self.cache_stats["misses"] += 1

        # Per-hour aggregate (hour bucket in local time).
        hour_key = datetime.fromtimestamp(metric.timestamp).strftime(
            self._HOUR_KEY_FORMAT
        )
        hourly = self.hourly_stats[hour_key]
        hourly["requests"] += 1
        hourly["avg_response_time"] = self._smoothed_average(
            hourly["avg_response_time"],
            metric.response_time,
            is_first=hourly["requests"] == 1,
        )
        if is_error:
            hourly["errors"] += 1

        # Per-endpoint aggregate.
        endpoint = self.endpoint_stats[f"{metric.method}:{metric.path}"]
        endpoint["requests"] += 1
        endpoint["avg_response_time"] = self._smoothed_average(
            endpoint["avg_response_time"],
            metric.response_time,
            is_first=endpoint["requests"] == 1,
        )
        endpoint["min_response_time"] = min(
            endpoint["min_response_time"], metric.response_time
        )
        endpoint["max_response_time"] = max(
            endpoint["max_response_time"], metric.response_time
        )
        if metric.cache_hit:
            endpoint["cache_hits"] += 1
        if is_error:
            endpoint["errors"] += 1

        # Per-client aggregate; skipped when the client IP is unknown.
        if metric.ip_address:
            client = self.client_stats[metric.ip_address]
            client["requests"] += 1
            client["last_request"] = metric.timestamp
            if metric.user_agent:
                client["user_agents"].add(metric.user_agent)

    def get_summary_stats(self) -> Dict[str, Any]:
        """Return overall counters plus figures for the last hour.

        "cache_hit_rate" is a percentage over all recorded requests;
        "avg_response_time_ms" is the arithmetic mean over the raw metrics
        of the last hour that are still held in the bounded deque.
        """
        total_requests = self.cache_stats["total_requests"]
        cache_hit_rate = 0
        if total_requests > 0:
            cache_hit_rate = (self.cache_stats["hits"] / total_requests) * 100

        # Read the clock once so every comparison uses the same instant.
        now = time.time()
        recent_metrics = [
            m for m in self.metrics
            if now - m.timestamp < self._RECENT_WINDOW_SECONDS
        ]

        avg_response_time = 0
        if recent_metrics:
            avg_response_time = (
                sum(m.response_time for m in recent_metrics) / len(recent_metrics)
            )

        return {
            "total_requests": total_requests,
            "cache_hit_rate": round(cache_hit_rate, 2),
            "recent_hour_requests": len(recent_metrics),
            "avg_response_time_ms": round(avg_response_time * 1000, 2),
            "active_clients": self._count_active_clients(now),
            "total_endpoints": len(self.endpoint_stats),
        }

    def get_top_endpoints(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* endpoints, most requested first."""
        ranked = sorted(
            self.endpoint_stats.items(),
            key=lambda item: item[1]["requests"],
            reverse=True,
        )

        result = []
        for endpoint, stats in ranked[:limit]:
            # Keys are "METHOD:path"; split once because paths may contain ":".
            method, path = endpoint.split(":", 1)

            cache_rate = 0
            if stats["requests"] > 0:
                cache_rate = (stats["cache_hits"] / stats["requests"]) * 100

            # The minimum starts at +inf; report 0 until a sample replaces it.
            min_time = stats["min_response_time"]
            min_time_ms = round(min_time * 1000, 2) if min_time != float('inf') else 0

            result.append({
                "method": method,
                "path": path,
                "requests": stats["requests"],
                "avg_response_time_ms": round(stats["avg_response_time"] * 1000, 2),
                "min_response_time_ms": min_time_ms,
                "max_response_time_ms": round(stats["max_response_time"] * 1000, 2),
                "cache_hit_rate": round(cache_rate, 2),
                "error_count": stats["errors"],
            })

        return result

    def get_hourly_trends(self, hours: int = 24) -> Dict[str, List]:
        """Return parallel per-hour series for the last *hours* hours.

        Series are ordered oldest first and end with the current hour. Hours
        without recorded data are reported as zeros.
        """
        now = datetime.now()
        empty_hour = {"requests": 0, "avg_response_time": 0, "errors": 0}
        trends = {
            "hours": [],
            "requests": [],
            "avg_response_times": [],
            "error_rates": [],
        }

        # Walk from the oldest hour to the current one so we can append.
        for offset in range(hours - 1, -1, -1):
            hour = now - timedelta(hours=offset)
            # .get() avoids creating empty entries in the defaultdict.
            stats = self.hourly_stats.get(
                hour.strftime(self._HOUR_KEY_FORMAT), empty_hour
            )

            error_rate = 0
            if stats["requests"] > 0:
                error_rate = (stats["errors"] / stats["requests"]) * 100

            trends["hours"].append(hour.strftime("%H:00"))
            trends["requests"].append(stats["requests"])
            trends["avg_response_times"].append(
                round(stats["avg_response_time"] * 1000, 2)
            )
            trends["error_rates"].append(round(error_rate, 2))

        return trends

    def get_client_analysis(self) -> Dict[str, Any]:
        """Return client totals and the ten clients with the most requests."""
        top_clients = sorted(
            self.client_stats.items(),
            key=lambda item: item[1]["requests"],
            reverse=True,
        )[:10]

        client_list = [
            {
                "ip": ip,
                "requests": stats["requests"],
                "last_request": (
                    datetime.fromtimestamp(stats["last_request"]).isoformat()
                    if stats["last_request"] else None
                ),
                # Number of distinct user agents seen, not the agents themselves.
                "user_agents": len(stats["user_agents"]),
            }
            for ip, stats in top_clients
        ]

        return {
            "total_clients": len(self.client_stats),
            "active_clients_1h": self._count_active_clients(time.time()),
            "top_clients": client_list,
        }

    def predict_load(self) -> Dict[str, Any]:
        """Predict next hour's request count by simple linear extrapolation.

        The prediction is the current hour's count plus the change since the
        previous hour, floored at 0. "trend" is one of "increasing",
        "decreasing" or "stable".
        """
        # Read the clock once so both buckets are relative to the same instant.
        now = datetime.now()
        current, previous = (
            self.hourly_stats.get(
                (now - timedelta(hours=offset)).strftime(self._HOUR_KEY_FORMAT),
                {"requests": 0},
            )["requests"]
            for offset in range(2)
        )

        trend = current - previous
        predicted_next_hour = max(0, current + trend)

        if trend > 0:
            trend_label = "increasing"
        elif trend < 0:
            trend_label = "decreasing"
        else:
            trend_label = "stable"

        return {
            "current_hour_requests": current,
            "predicted_next_hour": round(predicted_next_hour),
            "trend": trend_label,
        }


# Module-wide analytics manager shared by the middleware.
analytics_manager = AnalyticsManager()


class AnalyticsMiddleware:
    """ASGI middleware that records a RequestMetric for every HTTP request."""

    def __init__(self, app):
        """Wrap the downstream ASGI application *app*."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI event, timing it when it is an HTTP request.

        Non-HTTP scopes are passed through untouched. If the downstream app
        raises, the request is still recorded (as status 500 when no response
        had been started) and the exception is re-raised.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)
        started_at = time.time()  # wall-clock timestamp stored in the metric
        started_clock = time.perf_counter()  # monotonic clock for the duration

        # Filled in by send_wrapper once the response start is observed.
        response_data = {"status_code": 200, "started": False}

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                response_data["status_code"] = message["status"]
                response_data["started"] = True
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        except Exception:
            # No response went out, so the default 200 would be misleading.
            if not response_data["started"]:
                response_data["status_code"] = 500
            self._record(
                request,
                started_at,
                time.perf_counter() - started_clock,
                response_data["status_code"],
            )
            raise

        self._record(
            request,
            started_at,
            time.perf_counter() - started_clock,
            response_data["status_code"],
        )

    @staticmethod
    def _record(request, started_at, response_time, status_code):
        """Build the metric for *request* and hand it to the global manager."""
        path = request.url.path

        # Heuristic only: a request counts as a cache hit when its path
        # contains "/cache/" or it completed in under 10 ms.
        cache_hit = "/cache/" in path or response_time < 0.01

        metric = RequestMetric(
            timestamp=started_at,
            method=request.method,
            path=path,
            status_code=status_code,
            response_time=response_time,
            cache_hit=cache_hit,
            user_agent=request.headers.get("user-agent"),
            ip_address=request.client.host if request.client else None,
        )

        analytics_manager.record_request(metric)