""" 구매 수량 계산 서비스 - 자재별 여유율 적용 - PIPE: 절단 손실 + 6M 단위 계산 - 기타: 최소 주문 수량 적용 """ import math from typing import Dict, List, Tuple from sqlalchemy.orm import Session from sqlalchemy import text # 자재별 기본 여유율 SAFETY_FACTORS = { 'PIPE': 1.15, # 15% 추가 (절단 손실) 'FITTING': 1.10, # 10% 추가 (연결 오차) 'VALVE': 1.50, # 50% 추가 (예비품) 'FLANGE': 1.10, # 10% 추가 'BOLT': 1.20, # 20% 추가 (분실율) 'GASKET': 1.25, # 25% 추가 (교체주기) 'INSTRUMENT': 1.00, # 0% 추가 (정확한 수량) 'DEFAULT': 1.10 # 기본 10% 추가 } # 최소 주문 수량 (자재별) MINIMUM_ORDER_QTY = { 'PIPE': 6000, # 6M 단위 'FITTING': 1, # 개별 주문 가능 'VALVE': 1, # 개별 주문 가능 'FLANGE': 1, # 개별 주문 가능 'BOLT': 50, # 박스 단위 (50개) 'GASKET': 10, # 세트 단위 'INSTRUMENT': 1, # 개별 주문 가능 'DEFAULT': 1 } def calculate_pipe_purchase_quantity(materials: List[Dict]) -> Dict: """ PIPE 구매 수량 계산 - 각 절단마다 3mm 손실 - 6,000mm (6M) 단위로 올림 """ total_bom_length = 0 cutting_count = 0 pipe_details = [] for material in materials: # 길이 정보 추출 length_mm = float(material.get('length_mm', 0) or 0) if length_mm > 0: total_bom_length += length_mm cutting_count += 1 pipe_details.append({ 'description': material.get('original_description', ''), 'length_mm': length_mm, 'quantity': material.get('quantity', 1) }) # 절단 손실 계산 (각 절단마다 3mm) cutting_loss = cutting_count * 3 # 총 필요 길이 = BOM 길이 + 절단 손실 required_length = total_bom_length + cutting_loss # 6M 단위로 올림 계산 standard_length = 6000 # 6M = 6,000mm pipes_needed = math.ceil(required_length / standard_length) if required_length > 0 else 0 total_purchase_length = pipes_needed * standard_length waste_length = total_purchase_length - required_length if pipes_needed > 0 else 0 return { 'bom_quantity': total_bom_length, 'cutting_count': cutting_count, 'cutting_loss': cutting_loss, 'required_length': required_length, 'pipes_count': pipes_needed, 'calculated_qty': total_purchase_length, 'waste_length': waste_length, 'utilization_rate': (required_length / total_purchase_length * 100) if total_purchase_length > 0 else 0, 'unit': 'mm', 'pipe_details': pipe_details } def calculate_standard_purchase_quantity(category: str, bom_quantity: float, safety_factor: float = None) -> Dict: """ 일반 자재 구매 수량 계산 - 여유율 적용 - 최소 주문 수량 적용 """ # 여유율 결정 if safety_factor is None: safety_factor = SAFETY_FACTORS.get(category, SAFETY_FACTORS['DEFAULT']) # 1단계: 여유율 적용 safety_qty = bom_quantity * safety_factor # 2단계: 최소 주문 수량 확인 min_order_qty = MINIMUM_ORDER_QTY.get(category, MINIMUM_ORDER_QTY['DEFAULT']) # 3단계: 최소 주문 수량과 비교하여 큰 값 선택 calculated_qty = max(safety_qty, min_order_qty) # 4단계: 특별 처리 (BOLT는 박스 단위로 올림) if category == 'BOLT' and calculated_qty > min_order_qty: calculated_qty = math.ceil(calculated_qty / min_order_qty) * min_order_qty return { 'bom_quantity': bom_quantity, 'safety_factor': safety_factor, 'safety_qty': safety_qty, 'min_order_qty': min_order_qty, 'calculated_qty': calculated_qty, 'waste_quantity': calculated_qty - bom_quantity, 'utilization_rate': (bom_quantity / calculated_qty * 100) if calculated_qty > 0 else 0 } def generate_purchase_items_from_materials(db: Session, file_id: int, job_no: str, revision: str) -> List[Dict]: """ 자재 데이터로부터 구매 품목 생성 """ # 1. 파일의 모든 자재 조회 materials_query = text(""" SELECT m.*, pd.length_mm, pd.outer_diameter, pd.schedule, pd.material_spec as pipe_material_spec, fd.fitting_type, fd.connection_method as fitting_connection, vd.valve_type, vd.connection_method as valve_connection, vd.pressure_rating as valve_pressure, fl.flange_type, fl.pressure_rating as flange_pressure, gd.gasket_type, gd.material_type as gasket_material, bd.bolt_type, bd.material_standard, bd.diameter, id.instrument_type FROM materials m LEFT JOIN pipe_details pd ON m.id = pd.material_id LEFT JOIN fitting_details fd ON m.id = fd.material_id LEFT JOIN valve_details vd ON m.id = vd.material_id LEFT JOIN flange_details fl ON m.id = fl.material_id LEFT JOIN gasket_details gd ON m.id = gd.material_id LEFT JOIN bolt_details bd ON m.id = bd.material_id LEFT JOIN instrument_details id ON m.id = id.material_id WHERE m.file_id = :file_id ORDER BY m.classified_category, m.original_description """) materials = db.execute(materials_query, {"file_id": file_id}).fetchall() # 2. 카테고리별로 그룹핑 grouped_materials = {} for material in materials: category = material.classified_category or 'OTHER' if category not in grouped_materials: grouped_materials[category] = [] grouped_materials[category].append(dict(material)) # 3. 각 카테고리별로 구매 품목 생성 purchase_items = [] for category, category_materials in grouped_materials.items(): if category == 'PIPE': # PIPE는 재질+사이즈+스케줄별로 그룹핑 pipe_groups = {} for material in category_materials: # 그룹핑 키 생성 material_spec = material.get('pipe_material_spec') or material.get('material_grade', '') outer_diameter = material.get('outer_diameter') or material.get('main_nom', '') schedule = material.get('schedule', '') group_key = f"{material_spec}|{outer_diameter}|{schedule}" if group_key not in pipe_groups: pipe_groups[group_key] = [] pipe_groups[group_key].append(material) # 각 PIPE 그룹별로 구매 수량 계산 for group_key, group_materials in pipe_groups.items(): pipe_calc = calculate_pipe_purchase_quantity(group_materials) if pipe_calc['calculated_qty'] > 0: material_spec, outer_diameter, schedule = group_key.split('|') # 품목 코드 생성 item_code = generate_item_code('PIPE', material_spec, outer_diameter, schedule) # 사양 생성 spec_parts = [f"PIPE {outer_diameter}"] if schedule: spec_parts.append(schedule) if material_spec: spec_parts.append(material_spec) specification = ', '.join(spec_parts) purchase_item = { 'item_code': item_code, 'category': 'PIPE', 'specification': specification, 'material_spec': material_spec, 'size_spec': outer_diameter, 'unit': 'mm', **pipe_calc, 'job_no': job_no, 'revision': revision, 'file_id': file_id, 'materials': group_materials } purchase_items.append(purchase_item) else: # 기타 자재들은 사양별로 그룹핑 spec_groups = generate_material_specs_for_category(category_materials, category) for spec_key, spec_data in spec_groups.items(): if spec_data['totalQuantity'] > 0: # 구매 수량 계산 calc_result = calculate_standard_purchase_quantity( category, spec_data['totalQuantity'] ) # 품목 코드 생성 item_code = generate_item_code(category, spec_data.get('material_spec', ''), spec_data.get('size_display', '')) purchase_item = { 'item_code': item_code, 'category': category, 'specification': spec_data.get('full_spec', spec_key), 'material_spec': spec_data.get('material_spec', ''), 'size_spec': spec_data.get('size_display', ''), 'unit': spec_data.get('unit', 'EA'), **calc_result, 'job_no': job_no, 'revision': revision, 'file_id': file_id, 'materials': spec_data['items'] } purchase_items.append(purchase_item) return purchase_items def generate_material_specs_for_category(materials: List[Dict], category: str) -> Dict: """카테고리별 자재 사양 그룹핑 (MaterialsPage.jsx 로직과 동일)""" specs = {} for material in materials: spec_key = '' spec_data = {} if category == 'FITTING': fitting_type = material.get('fitting_type', 'FITTING') connection_method = material.get('fitting_connection', '') material_spec = material.get('material_grade', '') main_nom = material.get('main_nom', '') red_nom = material.get('red_nom', '') size_display = f"{main_nom} x {red_nom}" if red_nom else main_nom spec_parts = [fitting_type] if connection_method: spec_parts.append(connection_method) full_spec = ', '.join(spec_parts) spec_key = f"FITTING|{full_spec}|{material_spec}|{size_display}" spec_data = { 'category': 'FITTING', 'full_spec': full_spec, 'material_spec': material_spec, 'size_display': size_display, 'unit': 'EA' } elif category == 'VALVE': valve_type = material.get('valve_type', 'VALVE') connection_method = material.get('valve_connection', '') pressure_rating = material.get('valve_pressure', '') material_spec = material.get('material_grade', '') main_nom = material.get('main_nom', '') spec_parts = [valve_type.replace('_', ' ')] if connection_method: spec_parts.append(connection_method.replace('_', ' ')) if pressure_rating: spec_parts.append(pressure_rating) full_spec = ', '.join(spec_parts) spec_key = f"VALVE|{full_spec}|{material_spec}|{main_nom}" spec_data = { 'category': 'VALVE', 'full_spec': full_spec, 'material_spec': material_spec, 'size_display': main_nom, 'unit': 'EA' } elif category == 'FLANGE': flange_type = material.get('flange_type', 'FLANGE') pressure_rating = material.get('flange_pressure', '') material_spec = material.get('material_grade', '') main_nom = material.get('main_nom', '') spec_parts = [flange_type] if pressure_rating: spec_parts.append(pressure_rating) full_spec = ', '.join(spec_parts) spec_key = f"FLANGE|{full_spec}|{material_spec}|{main_nom}" spec_data = { 'category': 'FLANGE', 'full_spec': full_spec, 'material_spec': material_spec, 'size_display': main_nom, 'unit': 'EA' } elif category == 'BOLT': bolt_type = material.get('bolt_type', 'BOLT') material_standard = material.get('material_standard', '') diameter = material.get('diameter', material.get('main_nom', '')) material_spec = material_standard or material.get('material_grade', '') spec_parts = [bolt_type.replace('_', ' ')] if material_standard: spec_parts.append(material_standard) full_spec = ', '.join(spec_parts) spec_key = f"BOLT|{full_spec}|{material_spec}|{diameter}" spec_data = { 'category': 'BOLT', 'full_spec': full_spec, 'material_spec': material_spec, 'size_display': diameter, 'unit': 'EA' } elif category == 'GASKET': gasket_type = material.get('gasket_type', 'GASKET') gasket_material = material.get('gasket_material', '') material_spec = gasket_material or material.get('material_grade', '') main_nom = material.get('main_nom', '') spec_parts = [gasket_type] if gasket_material: spec_parts.append(gasket_material) full_spec = ', '.join(spec_parts) spec_key = f"GASKET|{full_spec}|{material_spec}|{main_nom}" spec_data = { 'category': 'GASKET', 'full_spec': full_spec, 'material_spec': material_spec, 'size_display': main_nom, 'unit': 'EA' } elif category == 'INSTRUMENT': instrument_type = material.get('instrument_type', 'INSTRUMENT') material_spec = material.get('material_grade', '') main_nom = material.get('main_nom', '') full_spec = instrument_type.replace('_', ' ') spec_key = f"INSTRUMENT|{full_spec}|{material_spec}|{main_nom}" spec_data = { 'category': 'INSTRUMENT', 'full_spec': full_spec, 'material_spec': material_spec, 'size_display': main_nom, 'unit': 'EA' } else: # 기타 자재 material_spec = material.get('material_grade', '') size_display = material.get('main_nom') or material.get('size_spec', '') spec_key = f"{category}|{material_spec}|{size_display}" spec_data = { 'category': category, 'full_spec': material_spec, 'material_spec': material_spec, 'size_display': size_display, 'unit': 'EA' } # 스펙별 수량 집계 if spec_key not in specs: specs[spec_key] = { **spec_data, 'totalQuantity': 0, 'count': 0, 'items': [] } specs[spec_key]['totalQuantity'] += material.get('quantity', 0) specs[spec_key]['count'] += 1 specs[spec_key]['items'].append(material) return specs def generate_item_code(category: str, material_spec: str = '', size_spec: str = '', schedule: str = '') -> str: """구매 품목 코드 생성""" import hashlib # 기본 접두사 prefix = f"PI-{category}" # 재질 약어 생성 material_abbr = '' if 'A106' in material_spec: material_abbr = 'A106' elif 'A333' in material_spec: material_abbr = 'A333' elif 'SS316' in material_spec or '316' in material_spec: material_abbr = 'SS316' elif 'A105' in material_spec: material_abbr = 'A105' elif material_spec: material_abbr = material_spec.replace(' ', '')[:6] # 사이즈 약어 size_abbr = size_spec.replace('"', 'IN').replace(' ', '').replace('x', 'X')[:10] # 스케줄 (PIPE용) schedule_abbr = schedule.replace(' ', '')[:6] # 유니크 해시 생성 (중복 방지) unique_str = f"{category}|{material_spec}|{size_spec}|{schedule}" hash_suffix = hashlib.md5(unique_str.encode()).hexdigest()[:4].upper() # 최종 코드 조합 code_parts = [prefix] if material_abbr: code_parts.append(material_abbr) if size_abbr: code_parts.append(size_abbr) if schedule_abbr: code_parts.append(schedule_abbr) code_parts.append(hash_suffix) return '-'.join(code_parts) def save_purchase_items_to_db(db: Session, purchase_items: List[Dict]) -> List[int]: """구매 품목을 데이터베이스에 저장""" saved_ids = [] for item in purchase_items: # 기존 품목 확인 (동일 사양이 있는지) existing_query = text(""" SELECT id FROM purchase_items WHERE job_no = :job_no AND revision = :revision AND item_code = :item_code """) existing = db.execute(existing_query, { 'job_no': item['job_no'], 'revision': item['revision'], 'item_code': item['item_code'] }).fetchone() if existing: # 기존 품목 업데이트 update_query = text(""" UPDATE purchase_items SET bom_quantity = :bom_quantity, calculated_qty = :calculated_qty, safety_factor = :safety_factor, cutting_loss = :cutting_loss, pipes_count = :pipes_count, waste_length = :waste_length, detailed_spec = :detailed_spec, updated_at = CURRENT_TIMESTAMP WHERE id = :id """) db.execute(update_query, { 'id': existing.id, 'bom_quantity': item['bom_quantity'], 'calculated_qty': item['calculated_qty'], 'safety_factor': item.get('safety_factor', 1.0), 'cutting_loss': item.get('cutting_loss', 0), 'pipes_count': item.get('pipes_count'), 'waste_length': item.get('waste_length'), 'detailed_spec': item.get('detailed_spec', '{}') }) saved_ids.append(existing.id) else: # 새 품목 생성 insert_query = text(""" INSERT INTO purchase_items ( item_code, category, specification, material_spec, size_spec, unit, bom_quantity, safety_factor, minimum_order_qty, calculated_qty, cutting_loss, standard_length, pipes_count, waste_length, job_no, revision, file_id, is_active, created_by ) VALUES ( :item_code, :category, :specification, :material_spec, :size_spec, :unit, :bom_quantity, :safety_factor, :minimum_order_qty, :calculated_qty, :cutting_loss, :standard_length, :pipes_count, :waste_length, :job_no, :revision, :file_id, :is_active, :created_by ) RETURNING id """) result = db.execute(insert_query, { 'item_code': item['item_code'], 'category': item['category'], 'specification': item['specification'], 'material_spec': item['material_spec'], 'size_spec': item['size_spec'], 'unit': item['unit'], 'bom_quantity': item['bom_quantity'], 'safety_factor': item.get('safety_factor', 1.0), 'minimum_order_qty': item.get('min_order_qty', 0), 'calculated_qty': item['calculated_qty'], 'cutting_loss': item.get('cutting_loss', 0), 'standard_length': item.get('standard_length', 6000 if item['category'] == 'PIPE' else None), 'pipes_count': item.get('pipes_count'), 'waste_length': item.get('waste_length'), 'job_no': item['job_no'], 'revision': item['revision'], 'file_id': item['file_id'], 'is_active': True, 'created_by': 'system' }) result_row = result.fetchone() new_id = result_row[0] if result_row else None saved_ids.append(new_id) # 개별 자재와 구매 품목 연결 for material in item['materials']: mapping_query = text(""" INSERT INTO material_purchase_mapping (material_id, purchase_item_id) VALUES (:material_id, :purchase_item_id) ON CONFLICT (material_id, purchase_item_id) DO NOTHING """) db.execute(mapping_query, { 'material_id': material['id'], 'purchase_item_id': new_id }) db.commit() return saved_ids