Files
TK-FB-Project/fastapi-bridge/cache.py

204 lines
6.6 KiB
Python

"""
FastAPI 브릿지 캐싱 시스템
Phase 3: Redis 캐싱 (메모리 캐시로 폴백)
"""
import asyncio
import hashlib
import json
import time
from typing import Any, Dict, Optional, Union
from functools import wraps
import redis.asyncio as redis
from config import settings
class CacheManager:
"""캐시 관리자 - Redis 우선, 메모리 캐시 폴백"""
def __init__(self):
self.redis_client: Optional[redis.Redis] = None
self.memory_cache: Dict[str, Dict[str, Any]] = {}
self.is_redis_available = False
async def connect(self):
"""Redis 연결 시도, 실패시 메모리 캐시 사용"""
try:
self.redis_client = redis.from_url(
settings.REDIS_URL,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
# Redis 연결 테스트
await self.redis_client.ping()
self.is_redis_available = True
print("✅ Redis 연결 성공")
except Exception as e:
print(f"⚠️ Redis 연결 실패, 메모리 캐시 사용: {e}")
self.is_redis_available = False
self.redis_client = None
async def disconnect(self):
"""연결 종료"""
if self.redis_client:
await self.redis_client.close()
def _generate_key(self, prefix: str, *args, **kwargs) -> str:
"""캐시 키 생성"""
# 파라미터들을 문자열로 변환
key_data = f"{prefix}:{':'.join(map(str, args))}"
if kwargs:
key_data += f":{json.dumps(kwargs, sort_keys=True)}"
# 긴 키는 해시로 단축
if len(key_data) > 100:
key_hash = hashlib.md5(key_data.encode()).hexdigest()
return f"{prefix}:{key_hash}"
return key_data
async def get(self, key: str) -> Optional[Any]:
"""캐시에서 값 조회"""
if self.is_redis_available and self.redis_client:
try:
value = await self.redis_client.get(key)
if value:
return json.loads(value)
except Exception as e:
print(f"Redis GET 오류: {e}")
# 메모리 캐시 폴백
if key in self.memory_cache:
cache_entry = self.memory_cache[key]
if cache_entry['expires_at'] > time.time():
return cache_entry['data']
else:
# 만료된 캐시 삭제
del self.memory_cache[key]
return None
async def set(self, key: str, value: Any, ttl: int = 300) -> bool:
"""캐시에 값 저장"""
json_value = json.dumps(value, ensure_ascii=False)
if self.is_redis_available and self.redis_client:
try:
await self.redis_client.setex(key, ttl, json_value)
return True
except Exception as e:
print(f"Redis SET 오류: {e}")
# 메모리 캐시 폴백
self.memory_cache[key] = {
'data': value,
'expires_at': time.time() + ttl
}
# 메모리 캐시 크기 제한 (최대 1000개)
if len(self.memory_cache) > 1000:
# 가장 오래된 10개 항목 제거
sorted_keys = sorted(
self.memory_cache.keys(),
key=lambda k: self.memory_cache[k]['expires_at']
)
for key_to_remove in sorted_keys[:10]:
del self.memory_cache[key_to_remove]
return True
async def delete(self, key: str) -> bool:
"""캐시에서 값 삭제"""
deleted = False
if self.is_redis_available and self.redis_client:
try:
result = await self.redis_client.delete(key)
deleted = result > 0
except Exception as e:
print(f"Redis DELETE 오류: {e}")
# 메모리 캐시에서도 삭제
if key in self.memory_cache:
del self.memory_cache[key]
deleted = True
return deleted
async def clear_pattern(self, pattern: str) -> int:
"""패턴 매칭으로 키 삭제"""
deleted_count = 0
if self.is_redis_available and self.redis_client:
try:
keys = await self.redis_client.keys(pattern)
if keys:
deleted_count = await self.redis_client.delete(*keys)
except Exception as e:
print(f"Redis CLEAR_PATTERN 오류: {e}")
# 메모리 캐시에서 패턴 매칭 삭제
import fnmatch
keys_to_delete = [
key for key in self.memory_cache.keys()
if fnmatch.fnmatch(key, pattern)
]
for key in keys_to_delete:
del self.memory_cache[key]
deleted_count += 1
return deleted_count
def get_stats(self) -> Dict[str, Any]:
"""캐시 통계"""
stats = {
"redis_available": self.is_redis_available,
"memory_cache_size": len(self.memory_cache),
"cache_type": "Redis" if self.is_redis_available else "Memory"
}
# 만료된 메모리 캐시 정리
current_time = time.time()
expired_keys = [
key for key, entry in self.memory_cache.items()
if entry['expires_at'] <= current_time
]
for key in expired_keys:
del self.memory_cache[key]
stats["expired_keys_cleaned"] = len(expired_keys)
return stats
# 전역 캐시 매니저
cache_manager = CacheManager()
def cached(prefix: str = "default", ttl: int = 300):
"""캐싱 데코레이터"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 캐시 키 생성
cache_key = cache_manager._generate_key(prefix, *args, **kwargs)
# 캐시에서 조회
cached_result = await cache_manager.get(cache_key)
if cached_result is not None:
print(f"🟢 캐시 히트: {cache_key}")
return cached_result
# 캐시 미스 - 함수 실행
print(f"🟡 캐시 미스: {cache_key}")
result = await func(*args, **kwargs)
# 결과 캐싱
await cache_manager.set(cache_key, result, ttl)
return result
return wrapper
return decorator