#!/usr/bin/env python3 """ 백그라운드 AI 모델 서비스 및 모니터링 시스템 실시간 모델 상태 추적 및 성능 지표 수집 """ import asyncio import time import psutil import threading from datetime import datetime, timedelta from dataclasses import dataclass, asdict from typing import Dict, List, Optional from pathlib import Path import json import logging from integrated_translation_system import IntegratedTranslationSystem @dataclass class ModelStatus: """개별 모델 상태 정보""" name: str status: str # "loading", "ready", "error", "unloaded" load_time: Optional[float] = None memory_usage_mb: float = 0.0 last_used: Optional[datetime] = None error_message: Optional[str] = None total_processed: int = 0 @dataclass class SystemMetrics: """전체 시스템 성능 지표""" total_memory_usage_mb: float cpu_usage_percent: float active_jobs: int queued_jobs: int completed_jobs_today: int average_processing_time: float uptime_seconds: float class BackgroundAIService: """백그라운드 AI 모델 서비스 관리자""" def __init__(self): self.models_status: Dict[str, ModelStatus] = {} self.translation_system: Optional[IntegratedTranslationSystem] = None self.start_time = datetime.now() self.job_queue = asyncio.Queue() self.active_jobs = {} self.completed_jobs = [] self.metrics_history = [] # 로깅 설정 self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.INFO) # 모델 상태 초기화 self._initialize_model_status() # 백그라운드 워커 및 모니터링 시작 self.worker_task = None self.monitoring_task = None def _initialize_model_status(self): """모델 상태 초기화""" model_names = ["NLLB 번역", "KoBART 요약"] for name in model_names: self.models_status[name] = ModelStatus( name=name, status="unloaded" ) async def start_service(self): """백그라운드 서비스 시작""" self.logger.info("🚀 백그라운드 AI 서비스 시작") # 모델 로딩 시작 asyncio.create_task(self._load_models()) # 백그라운드 워커 시작 self.worker_task = asyncio.create_task(self._process_job_queue()) # 모니터링 시작 self.monitoring_task = asyncio.create_task(self._collect_metrics()) self.logger.info("✅ 백그라운드 서비스 준비 완료") async def _load_models(self): """AI 모델들을 순차적으로 로딩""" try: # NLLB 모델 로딩 시작 self.models_status["NLLB 번역"].status = "loading" load_start = time.time() self.translation_system = IntegratedTranslationSystem() # 실제 모델 로딩 success = await asyncio.to_thread(self.translation_system.load_models) if success: load_time = time.time() - load_start # 각 모델 상태 업데이트 self.models_status["NLLB 번역"].status = "ready" self.models_status["NLLB 번역"].load_time = load_time self.models_status["KoBART 요약"].status = "ready" self.models_status["KoBART 요약"].load_time = load_time self.logger.info(f"✅ 모든 AI 모델 로딩 완료 ({load_time:.1f}초)") else: # 로딩 실패 for model_name in self.models_status.keys(): self.models_status[model_name].status = "error" self.models_status[model_name].error_message = "모델 로딩 실패" self.logger.error("❌ AI 모델 로딩 실패") except Exception as e: self.logger.error(f"❌ 모델 로딩 중 오류: {e}") for model_name in self.models_status.keys(): self.models_status[model_name].status = "error" self.models_status[model_name].error_message = str(e) def _get_memory_usage(self) -> float: """현재 프로세스의 메모리 사용량 반환 (MB)""" try: process = psutil.Process() return process.memory_info().rss / 1024 / 1024 # bytes to MB except: return 0.0 async def _collect_metrics(self): """주기적으로 시스템 메트릭 수집""" while True: try: # 메모리 사용량 업데이트 total_memory = self._get_memory_usage() # 각 모델별 메모리 사용량 추정 (실제로는 더 정교한 측정 필요) if self.models_status["NLLB 번역"].status == "ready": self.models_status["NLLB 번역"].memory_usage_mb = total_memory * 0.6 if self.models_status["KoBART 요약"].status == "ready": self.models_status["KoBART 요약"].memory_usage_mb = total_memory * 0.4 # 전체 시스템 메트릭 수집 metrics = SystemMetrics( total_memory_usage_mb=total_memory, cpu_usage_percent=psutil.cpu_percent(), active_jobs=len(self.active_jobs), queued_jobs=self.job_queue.qsize(), completed_jobs_today=len([ job for job in self.completed_jobs if job.get('completed_at', datetime.min).date() == datetime.now().date() ]), average_processing_time=self._calculate_average_processing_time(), uptime_seconds=(datetime.now() - self.start_time).total_seconds() ) # 메트릭 히스토리에 추가 (최근 100개만 유지) self.metrics_history.append({ 'timestamp': datetime.now(), 'metrics': metrics }) if len(self.metrics_history) > 100: self.metrics_history.pop(0) await asyncio.sleep(5) # 5초마다 수집 except Exception as e: self.logger.error(f"메트릭 수집 오류: {e}") await asyncio.sleep(10) def _calculate_average_processing_time(self) -> float: """최근 처리된 작업들의 평균 처리 시간 계산""" recent_jobs = [ job for job in self.completed_jobs[-20:] # 최근 20개 if 'processing_time' in job ] if not recent_jobs: return 0.0 return sum(job['processing_time'] for job in recent_jobs) / len(recent_jobs) async def _process_job_queue(self): """작업 큐 처리 워커""" while True: try: # 큐에서 작업 가져오기 job = await self.job_queue.get() job_id = job['job_id'] # 활성 작업에 추가 self.active_jobs[job_id] = { 'start_time': datetime.now(), 'job_data': job } # 실제 AI 처리 await self._process_single_job(job) # 완료된 작업으로 이동 processing_time = (datetime.now() - self.active_jobs[job_id]['start_time']).total_seconds() self.completed_jobs.append({ 'job_id': job_id, 'completed_at': datetime.now(), 'processing_time': processing_time }) # 활성 작업에서 제거 del self.active_jobs[job_id] # 모델 사용 시간 업데이트 for model_status in self.models_status.values(): if model_status.status == "ready": model_status.last_used = datetime.now() model_status.total_processed += 1 except Exception as e: self.logger.error(f"작업 처리 오류: {e}") if job_id in self.active_jobs: del self.active_jobs[job_id] async def _process_single_job(self, job): """개별 작업 처리""" # 기존 번역 시스템 로직 사용 if self.translation_system and self.models_status["NLLB 번역"].status == "ready": result = await asyncio.to_thread( self.translation_system.process_document, job['file_path'] ) return result else: raise Exception("AI 모델이 준비되지 않음") async def add_job(self, job_data: Dict): """새 작업을 큐에 추가""" await self.job_queue.put(job_data) def get_dashboard_data(self) -> Dict: """대시보드용 데이터 반환""" current_metrics = self.metrics_history[-1]['metrics'] if self.metrics_history else None return { 'models_status': {name: asdict(status) for name, status in self.models_status.items()}, 'current_metrics': asdict(current_metrics) if current_metrics else None, 'recent_metrics': [ { 'timestamp': entry['timestamp'].isoformat(), 'metrics': asdict(entry['metrics']) } for entry in self.metrics_history[-20:] # 최근 20개 ], 'active_jobs': len(self.active_jobs), 'completed_today': len([ job for job in self.completed_jobs if job.get('completed_at', datetime.min).date() == datetime.now().date() ]) } async def restart_models(self): """모델 재시작""" self.logger.info("🔄 AI 모델 재시작 중...") # 모든 모델 상태를 재로딩으로 설정 for model_status in self.models_status.values(): model_status.status = "loading" model_status.error_message = None # 기존 시스템 정리 if self.translation_system: del self.translation_system # 모델 재로딩 await self._load_models() # 전역 서비스 인스턴스 ai_service = BackgroundAIService()