""" 파일 관리 비즈니스 로직 API 레이어에서 분리된 핵심 비즈니스 로직 """ from typing import List, Dict, Optional, Tuple from sqlalchemy.orm import Session from sqlalchemy import text from fastapi import HTTPException from ..utils.logger import get_logger from ..utils.cache_manager import tkmp_cache from ..utils.transaction_manager import TransactionManager, async_transactional from ..schemas.response_models import FileInfo from ..config import get_settings logger = get_logger(__name__) settings = get_settings() class FileService: """파일 관리 서비스""" def __init__(self, db: Session): self.db = db self.transaction_manager = TransactionManager(db) async def get_files( self, job_no: Optional[str] = None, show_history: bool = False, use_cache: bool = True ) -> Tuple[List[Dict], bool]: """ 파일 목록 조회 Args: job_no: 작업 번호 show_history: 이력 표시 여부 use_cache: 캐시 사용 여부 Returns: Tuple[List[Dict], bool]: (파일 목록, 캐시 히트 여부) """ try: logger.info(f"파일 목록 조회 - job_no: {job_no}, show_history: {show_history}") # 캐시 확인 if use_cache: cached_files = tkmp_cache.get_file_list(job_no, show_history) if cached_files: logger.info(f"캐시에서 파일 목록 반환 - {len(cached_files)}개 파일") return cached_files, True # 데이터베이스에서 조회 query, params = self._build_file_query(job_no, show_history) result = self.db.execute(text(query), params) files = result.fetchall() # 결과 변환 file_list = self._convert_files_to_dict(files) # 캐시에 저장 if use_cache: tkmp_cache.set_file_list(file_list, job_no, show_history) logger.debug("파일 목록 캐시 저장 완료") logger.info(f"파일 목록 조회 완료 - {len(file_list)}개 파일 반환") return file_list, False except Exception as e: logger.error(f"파일 목록 조회 실패: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"파일 목록 조회 실패: {str(e)}") def _build_file_query(self, job_no: Optional[str], show_history: bool) -> Tuple[str, Dict]: """파일 조회 쿼리 생성""" if show_history: # 전체 이력 표시 query = "SELECT * FROM files" params = {} if job_no: query += " WHERE job_no = :job_no" params["job_no"] = job_no query += " ORDER BY original_filename, revision DESC" else: # 최신 리비전만 표시 if job_no: query = """ SELECT f1.* FROM files f1 INNER JOIN ( SELECT original_filename, MAX(revision) as max_revision FROM files WHERE job_no = :job_no GROUP BY original_filename ) f2 ON f1.original_filename = f2.original_filename AND f1.revision = f2.max_revision WHERE f1.job_no = :job_no ORDER BY f1.upload_date DESC """ params = {"job_no": job_no} else: query = "SELECT * FROM files ORDER BY upload_date DESC" params = {} return query, params def _convert_files_to_dict(self, files) -> List[Dict]: """파일 결과를 딕셔너리로 변환""" return [ { "id": f.id, "filename": f.original_filename, "original_filename": f.original_filename, "name": f.original_filename, "job_no": f.job_no, "bom_name": f.bom_name or f.original_filename, "revision": f.revision or "Rev.0", "parsed_count": f.parsed_count or 0, "bom_type": f.file_type or "unknown", "status": "active" if f.is_active else "inactive", "file_size": f.file_size, "created_at": f.upload_date, "upload_date": f.upload_date, "description": f"파일: {f.original_filename}" } for f in files ] async def delete_file(self, file_id: int) -> Dict: """ 파일 삭제 (트랜잭션 관리 적용) Args: file_id: 파일 ID Returns: Dict: 삭제 결과 """ try: logger.info(f"파일 삭제 요청 - file_id: {file_id}") # 트랜잭션 내에서 삭제 작업 수행 with self.transaction_manager.transaction(): # 파일 정보 조회 file_info = self._get_file_info(file_id) if not file_info: raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다") # 관련 데이터 삭제 (세이브포인트 사용) with self.transaction_manager.savepoint("delete_related_data"): self._delete_related_data(file_id) # 파일 삭제 with self.transaction_manager.savepoint("delete_file_record"): self._delete_file_record(file_id) # 트랜잭션이 성공적으로 완료되면 캐시 무효화 self._invalidate_file_cache(file_id, file_info) logger.info(f"파일 삭제 완료 - file_id: {file_id}, filename: {file_info.original_filename}") return { "success": True, "message": "파일과 관련 데이터가 삭제되었습니다", "deleted_file_id": file_id } except HTTPException: raise except Exception as e: logger.error(f"파일 삭제 실패 - file_id: {file_id}, error: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"파일 삭제 실패: {str(e)}") def _get_file_info(self, file_id: int): """파일 정보 조회""" file_query = text("SELECT * FROM files WHERE id = :file_id") file_result = self.db.execute(file_query, {"file_id": file_id}) return file_result.fetchone() def _delete_related_data(self, file_id: int): """관련 데이터 삭제""" # 상세 테이블 목록 detail_tables = [ 'pipe_details', 'fitting_details', 'valve_details', 'flange_details', 'bolt_details', 'gasket_details', 'instrument_details' ] # 해당 파일의 materials ID 조회 material_ids_query = text("SELECT id FROM materials WHERE file_id = :file_id") material_ids_result = self.db.execute(material_ids_query, {"file_id": file_id}) material_ids = [row[0] for row in material_ids_result] if material_ids: logger.info(f"관련 자재 데이터 삭제 - {len(material_ids)}개 자재") # 각 상세 테이블에서 관련 데이터 삭제 for table in detail_tables: delete_detail_query = text(f"DELETE FROM {table} WHERE material_id = ANY(:material_ids)") self.db.execute(delete_detail_query, {"material_ids": material_ids}) # materials 테이블 데이터 삭제 materials_query = text("DELETE FROM materials WHERE file_id = :file_id") self.db.execute(materials_query, {"file_id": file_id}) def _delete_file_record(self, file_id: int): """파일 레코드 삭제""" delete_query = text("DELETE FROM files WHERE id = :file_id") self.db.execute(delete_query, {"file_id": file_id}) def _invalidate_file_cache(self, file_id: int, file_info): """파일 관련 캐시 무효화""" tkmp_cache.invalidate_file_cache(file_id) if hasattr(file_info, 'job_no') and file_info.job_no: tkmp_cache.invalidate_job_cache(file_info.job_no) async def get_file_statistics(self, job_no: Optional[str] = None) -> Dict: """ 파일 통계 조회 Args: job_no: 작업 번호 Returns: Dict: 파일 통계 """ try: # 캐시 확인 if job_no: cached_stats = tkmp_cache.get_statistics(job_no, "file_stats") if cached_stats: return cached_stats # 통계 쿼리 실행 stats_query = self._build_statistics_query(job_no) result = self.db.execute(text(stats_query["query"]), stats_query["params"]) stats_data = result.fetchall() # 통계 데이터 변환 statistics = self._convert_statistics_data(stats_data) # 캐시에 저장 if job_no: tkmp_cache.set_statistics(statistics, job_no, "file_stats") return statistics except Exception as e: logger.error(f"파일 통계 조회 실패: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"파일 통계 조회 실패: {str(e)}") def _build_statistics_query(self, job_no: Optional[str]) -> Dict: """통계 쿼리 생성""" base_query = """ SELECT COUNT(*) as total_files, COUNT(DISTINCT job_no) as total_jobs, SUM(CASE WHEN is_active = true THEN 1 ELSE 0 END) as active_files, SUM(file_size) as total_size, AVG(file_size) as avg_size, MAX(upload_date) as latest_upload, MIN(upload_date) as earliest_upload FROM files """ params = {} if job_no: base_query += " WHERE job_no = :job_no" params["job_no"] = job_no return {"query": base_query, "params": params} def _convert_statistics_data(self, stats_data) -> Dict: """통계 데이터 변환""" if not stats_data: return { "total_files": 0, "total_jobs": 0, "active_files": 0, "total_size": 0, "avg_size": 0, "latest_upload": None, "earliest_upload": None } stats = stats_data[0] return { "total_files": stats.total_files or 0, "total_jobs": stats.total_jobs or 0, "active_files": stats.active_files or 0, "total_size": stats.total_size or 0, "total_size_mb": round((stats.total_size or 0) / (1024 * 1024), 2), "avg_size": stats.avg_size or 0, "avg_size_mb": round((stats.avg_size or 0) / (1024 * 1024), 2), "latest_upload": stats.latest_upload, "earliest_upload": stats.earliest_upload } async def validate_file_access(self, file_id: int, user_id: Optional[str] = None) -> bool: """ 파일 접근 권한 검증 Args: file_id: 파일 ID user_id: 사용자 ID Returns: bool: 접근 권한 여부 """ try: # 파일 존재 여부 확인 file_info = self._get_file_info(file_id) if not file_info: return False # 파일이 활성 상태인지 확인 if not file_info.is_active: logger.warning(f"비활성 파일 접근 시도 - file_id: {file_id}") return False # 추가 권한 검증 로직 (필요시 구현) # 예: 사용자별 프로젝트 접근 권한 등 return True except Exception as e: logger.error(f"파일 접근 권한 검증 실패: {str(e)}", exc_info=True) return False def get_file_service(db: Session) -> FileService: """파일 서비스 팩토리 함수""" return FileService(db)