""" 대용량 파일 처리 최적화 유틸리티 메모리 효율적인 파일 처리 및 청크 기반 처리 """ import pandas as pd import asyncio from typing import Iterator, List, Dict, Any, Optional, Callable from pathlib import Path import tempfile import os from concurrent.futures import ThreadPoolExecutor import gc from fastapi import HTTPException from .logger import get_logger from ..config import get_settings logger = get_logger(__name__) settings = get_settings() class FileProcessor: """대용량 파일 처리 최적화 클래스""" def __init__(self, chunk_size: int = 1000, max_workers: int = 4): self.chunk_size = chunk_size self.max_workers = max_workers self.executor = ThreadPoolExecutor(max_workers=max_workers) def read_excel_chunks(self, file_path: str, sheet_name: str = None) -> Iterator[pd.DataFrame]: """ 엑셀 파일을 청크 단위로 읽기 Args: file_path: 파일 경로 sheet_name: 시트명 (None이면 첫 번째 시트) Yields: DataFrame: 청크 단위 데이터 """ try: # 파일 크기 확인 file_size = os.path.getsize(file_path) logger.info(f"엑셀 파일 처리 시작 - 파일: {file_path}, 크기: {file_size} bytes") # 전체 행 수 확인 (메모리 효율적으로) with pd.ExcelFile(file_path) as xls: if sheet_name is None: sheet_name = xls.sheet_names[0] # 첫 번째 청크로 컬럼 정보 확인 first_chunk = pd.read_excel(xls, sheet_name=sheet_name, nrows=self.chunk_size) total_rows = len(first_chunk) # 전체 데이터를 청크로 나누어 처리 processed_rows = 0 chunk_num = 0 while processed_rows < total_rows: try: # 청크 읽기 chunk = pd.read_excel( xls, sheet_name=sheet_name, skiprows=processed_rows + 1 if processed_rows > 0 else 0, nrows=self.chunk_size, header=0 if processed_rows == 0 else None ) if chunk.empty: break # 첫 번째 청크가 아닌 경우 컬럼명 설정 if processed_rows > 0: chunk.columns = first_chunk.columns chunk_num += 1 processed_rows += len(chunk) logger.debug(f"청크 {chunk_num} 처리 - 행 수: {len(chunk)}, 누적: {processed_rows}") yield chunk # 메모리 정리 del chunk gc.collect() except Exception as e: logger.error(f"청크 {chunk_num} 처리 중 오류: {e}") break logger.info(f"엑셀 파일 처리 완료 - 총 {chunk_num}개 청크, {processed_rows}행 처리") except Exception as e: logger.error(f"엑셀 파일 읽기 실패: {e}") raise def read_csv_chunks(self, file_path: str, encoding: str = 'utf-8') -> Iterator[pd.DataFrame]: """ CSV 파일을 청크 단위로 읽기 Args: file_path: 파일 경로 encoding: 인코딩 (기본: utf-8) Yields: DataFrame: 청크 단위 데이터 """ try: file_size = os.path.getsize(file_path) logger.info(f"CSV 파일 처리 시작 - 파일: {file_path}, 크기: {file_size} bytes") chunk_num = 0 total_rows = 0 # pandas의 chunksize 옵션 사용 for chunk in pd.read_csv(file_path, chunksize=self.chunk_size, encoding=encoding): chunk_num += 1 total_rows += len(chunk) logger.debug(f"CSV 청크 {chunk_num} 처리 - 행 수: {len(chunk)}, 누적: {total_rows}") yield chunk # 메모리 정리 gc.collect() logger.info(f"CSV 파일 처리 완료 - 총 {chunk_num}개 청크, {total_rows}행 처리") except Exception as e: logger.error(f"CSV 파일 읽기 실패: {e}") raise async def process_file_async( self, file_path: str, processor_func: Callable[[pd.DataFrame], List[Dict]], file_type: str = "excel" ) -> List[Dict]: """ 파일을 비동기적으로 처리 Args: file_path: 파일 경로 processor_func: 각 청크를 처리할 함수 file_type: 파일 타입 ("excel" 또는 "csv") Returns: List[Dict]: 처리된 결과 리스트 """ try: logger.info(f"비동기 파일 처리 시작 - {file_path}") results = [] chunk_futures = [] # 파일 타입에 따른 청크 리더 선택 if file_type.lower() == "csv": chunk_reader = self.read_csv_chunks(file_path) else: chunk_reader = self.read_excel_chunks(file_path) # 청크별 비동기 처리 for chunk in chunk_reader: # 스레드 풀에서 청크 처리 future = asyncio.get_event_loop().run_in_executor( self.executor, processor_func, chunk ) chunk_futures.append(future) # 너무 많은 청크가 동시에 처리되지 않도록 제한 if len(chunk_futures) >= self.max_workers: # 완료된 작업들 수집 completed_results = await asyncio.gather(*chunk_futures) for result in completed_results: if result: results.extend(result) chunk_futures = [] gc.collect() # 남은 청크들 처리 if chunk_futures: completed_results = await asyncio.gather(*chunk_futures) for result in completed_results: if result: results.extend(result) logger.info(f"비동기 파일 처리 완료 - 총 {len(results)}개 항목 처리") return results except Exception as e: logger.error(f"비동기 파일 처리 실패: {e}") raise def optimize_dataframe_memory(self, df: pd.DataFrame) -> pd.DataFrame: """ DataFrame 메모리 사용량 최적화 Args: df: 최적화할 DataFrame Returns: DataFrame: 최적화된 DataFrame """ try: original_memory = df.memory_usage(deep=True).sum() # 수치형 컬럼 최적화 for col in df.select_dtypes(include=['int64']).columns: col_min = df[col].min() col_max = df[col].max() if col_min >= -128 and col_max <= 127: df[col] = df[col].astype('int8') elif col_min >= -32768 and col_max <= 32767: df[col] = df[col].astype('int16') elif col_min >= -2147483648 and col_max <= 2147483647: df[col] = df[col].astype('int32') # 실수형 컬럼 최적화 for col in df.select_dtypes(include=['float64']).columns: df[col] = pd.to_numeric(df[col], downcast='float') # 문자열 컬럼 최적화 (카테고리형으로 변환) for col in df.select_dtypes(include=['object']).columns: if df[col].nunique() / len(df) < 0.5: # 고유값이 50% 미만인 경우 df[col] = df[col].astype('category') optimized_memory = df.memory_usage(deep=True).sum() memory_reduction = (original_memory - optimized_memory) / original_memory * 100 logger.debug(f"DataFrame 메모리 최적화 완료 - 감소율: {memory_reduction:.1f}%") return df except Exception as e: logger.warning(f"DataFrame 메모리 최적화 실패: {e}") return df def create_temp_file(self, suffix: str = '.tmp') -> str: """ 임시 파일 생성 Args: suffix: 파일 확장자 Returns: str: 임시 파일 경로 """ temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) temp_file.close() logger.debug(f"임시 파일 생성: {temp_file.name}") return temp_file.name def cleanup_temp_file(self, file_path: str): """ 임시 파일 정리 Args: file_path: 삭제할 파일 경로 """ try: if os.path.exists(file_path): os.unlink(file_path) logger.debug(f"임시 파일 삭제: {file_path}") except Exception as e: logger.warning(f"임시 파일 삭제 실패: {file_path}, error: {e}") def get_file_info(self, file_path: str) -> Dict[str, Any]: """ 파일 정보 조회 Args: file_path: 파일 경로 Returns: Dict: 파일 정보 """ try: file_stat = os.stat(file_path) file_ext = Path(file_path).suffix.lower() info = { "file_path": file_path, "file_size": file_stat.st_size, "file_size_mb": round(file_stat.st_size / (1024 * 1024), 2), "file_extension": file_ext, "is_large_file": file_stat.st_size > 10 * 1024 * 1024, # 10MB 이상 "recommended_chunk_size": self._calculate_optimal_chunk_size(file_stat.st_size) } # 파일 타입별 추가 정보 if file_ext in ['.xlsx', '.xls']: info["file_type"] = "excel" info["processing_method"] = "chunk_based" if info["is_large_file"] else "full_load" elif file_ext == '.csv': info["file_type"] = "csv" info["processing_method"] = "chunk_based" if info["is_large_file"] else "full_load" return info except Exception as e: logger.error(f"파일 정보 조회 실패: {e}") return {"error": str(e)} def _calculate_optimal_chunk_size(self, file_size: int) -> int: """ 파일 크기에 따른 최적 청크 크기 계산 Args: file_size: 파일 크기 (bytes) Returns: int: 최적 청크 크기 """ # 파일 크기에 따른 청크 크기 조정 if file_size < 1024 * 1024: # 1MB 미만 return 500 elif file_size < 10 * 1024 * 1024: # 10MB 미만 return 1000 elif file_size < 50 * 1024 * 1024: # 50MB 미만 return 2000 else: # 50MB 이상 return 5000 def __del__(self): """소멸자 - 스레드 풀 정리""" if hasattr(self, 'executor'): self.executor.shutdown(wait=True) # 전역 파일 프로세서 인스턴스 file_processor = FileProcessor() def parse_dataframe(df): """DataFrame을 파싱하여 자재 데이터로 변환""" df = df.dropna(how='all') # 원본 컬럼명 출력 # 로그 제거 df.columns = df.columns.str.strip().str.lower() # 로그 제거 column_mapping = { 'description': ['description', 'item', 'material', '품명', '자재명'], 'quantity': ['qty', 'quantity', 'ea', '수량'], 'main_size': ['main_nom', 'nominal_diameter', 'nd', '주배관'], 'red_size': ['red_nom', 'reduced_diameter', '축소배관'], 'length': ['length', 'len', '길이'], 'weight': ['weight', 'wt', '중량'], 'dwg_name': ['dwg_name', 'drawing', '도면명'], 'line_num': ['line_num', 'line_number', '라인번호'] } mapped_columns = {} for standard_col, possible_names in column_mapping.items(): for possible_name in possible_names: if possible_name in df.columns: mapped_columns[standard_col] = possible_name break print(f"📋 엑셀 컬럼 매핑 결과: {mapped_columns}") print(f"📋 원본 컬럼명들: {list(df.columns)}") materials = [] for index, row in df.iterrows(): description = str(row.get(mapped_columns.get('description', ''), '')).strip() if not description or description.lower() in ['nan', 'none', '']: continue # 수량 처리 quantity_raw = row.get(mapped_columns.get('quantity', ''), 0) try: quantity = float(quantity_raw) if pd.notna(quantity_raw) else 0 except (ValueError, TypeError): quantity = 0 if quantity <= 0: continue # 길이 처리 length_raw = row.get(mapped_columns.get('length', ''), 0) try: length = float(length_raw) if pd.notna(length_raw) else 0 except (ValueError, TypeError): length = 0 # 도면명 처리 dwg_name = str(row.get(mapped_columns.get('dwg_name', ''), '')).strip() if dwg_name.lower() in ['nan', 'none']: dwg_name = '' # 라인번호 처리 line_num = str(row.get(mapped_columns.get('line_num', ''), '')).strip() if line_num.lower() in ['nan', 'none']: line_num = '' # 사이즈 처리 main_size = str(row.get(mapped_columns.get('main_size', ''), '')).strip() if main_size.lower() in ['nan', 'none']: main_size = '' red_size = str(row.get(mapped_columns.get('red_size', ''), '')).strip() if red_size.lower() in ['nan', 'none']: red_size = '' materials.append({ 'original_description': description, 'quantity': quantity, 'unit': 'EA', # 기본 단위 'length': length, 'drawing_name': dwg_name, 'line_no': line_num, 'main_nom': main_size, 'red_nom': red_size, 'row_number': index + 1 }) return materials def parse_file_data(file_path): """파일을 파싱하여 자재 데이터 추출""" file_extension = Path(file_path).suffix.lower() try: if file_extension == ".csv": df = pd.read_csv(file_path, encoding='utf-8') elif file_extension in [".xlsx", ".xls"]: df = pd.read_excel(file_path, sheet_name=0) else: raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식") return parse_dataframe(df) except Exception as e: raise HTTPException(status_code=400, detail=f"파일 파싱 실패: {str(e)}")