from sqlalchemy.orm import Session from sqlalchemy import text from typing import List, Dict, Optional import json from datetime import datetime from app.services.integrated_classifier import classify_material_integrated, should_exclude_material from app.services.bolt_classifier import classify_bolt from app.services.flange_classifier import classify_flange from app.services.fitting_classifier import classify_fitting from app.services.gasket_classifier import classify_gasket from app.services.instrument_classifier import classify_instrument from app.services.valve_classifier import classify_valve from app.services.support_classifier import classify_support from app.services.plate_classifier import classify_plate from app.services.structural_classifier import classify_structural from app.services.pipe_classifier import classify_pipe_for_purchase, extract_end_preparation_info from app.services.material_grade_extractor import extract_full_material_grade class MaterialService: """자재 처리 및 저장을 담당하는 서비스""" @staticmethod def process_and_save_materials( db: Session, file_id: int, materials_data: List[Dict], revision_comparison: Optional[Dict] = None, parent_file_id: Optional[int] = None, purchased_materials_map: Optional[Dict] = None ) -> int: """ 자재 목록을 분류하고 DB에 저장합니다. Args: db: DB 세션 file_id: 파일 ID materials_data: 파싱된 자재 데이터 목록 revision_comparison: 리비전 비교 결과 parent_file_id: 이전 리비전 파일 ID purchased_materials_map: 구매 확정된 자재 매핑 정보 Returns: 저장된 자재 수 """ materials_inserted = 0 # 변경/신규 자재 키 집합 (리비전 추적용) changed_materials_keys = set() new_materials_keys = set() # 리비전 업로드인 경우 변경사항 분석 if parent_file_id is not None: MaterialService._analyze_changes( db, parent_file_id, materials_data, changed_materials_keys, new_materials_keys ) # 변경 없는 자재 (확정된 자재) 먼저 처리 if revision_comparison and revision_comparison.get("has_previous_confirmation", False): unchanged_materials = revision_comparison.get("unchanged_materials", []) for material_data in unchanged_materials: MaterialService._save_unchanged_material(db, file_id, material_data) materials_inserted += 1 # 분류가 필요한 자재 처리 (신규 또는 변경된 자재) # revision_comparison에서 필터링된 목록이 있으면 그것을 사용, 아니면 전체 materials_to_classify = materials_data if revision_comparison and revision_comparison.get("materials_to_classify"): materials_to_classify = revision_comparison.get("materials_to_classify") print(f"🔧 자재 분류 및 저장 시작: {len(materials_to_classify)}개") for material_data in materials_to_classify: MaterialService._classify_and_save_single_material( db, file_id, material_data, changed_materials_keys, new_materials_keys, purchased_materials_map ) materials_inserted += 1 return materials_inserted @staticmethod def _analyze_changes(db: Session, parent_file_id: int, materials_data: List[Dict], changed_keys: set, new_keys: set): """이전 리비전과 비교하여 변경/신규 자재를 식별합니다.""" try: prev_materials_query = text(""" SELECT original_description, size_spec, material_grade, main_nom, drawing_name, line_no, quantity FROM materials WHERE file_id = :parent_file_id """) prev_materials = db.execute(prev_materials_query, {"parent_file_id": parent_file_id}).fetchall() prev_dict = {} for pm in prev_materials: key = MaterialService._generate_material_key( pm.drawing_name, pm.line_no, pm.original_description, pm.size_spec, pm.material_grade ) prev_dict[key] = float(pm.quantity) if pm.quantity else 0 for mat in materials_data: new_key = MaterialService._generate_material_key( mat.get("dwg_name"), mat.get("line_num"), mat["original_description"], mat.get("size_spec"), mat.get("material_grade") ) if new_key in prev_dict: if abs(prev_dict[new_key] - float(mat.get("quantity", 0))) > 0.001: changed_keys.add(new_key) else: new_keys.add(new_key) except Exception as e: print(f"❌ 변경사항 분석 실패: {e}") @staticmethod def _generate_material_key(dwg, line, desc, size, grade): """자재 고유 키 생성""" parts = [] if dwg: parts.append(str(dwg)) elif line: parts.append(str(line)) parts.append(str(desc)) parts.append(str(size or '')) parts.append(str(grade or '')) return "|".join(parts) @staticmethod def _save_unchanged_material(db: Session, file_id: int, material_data: Dict): """변경 없는(확정된) 자재 저장""" previous_item = material_data.get("previous_item", {}) query = text(""" INSERT INTO materials ( file_id, original_description, classified_category, confidence, quantity, unit, size_spec, material_grade, specification, reused_from_confirmation, created_at ) VALUES ( :file_id, :desc, :category, 1.0, :qty, :unit, :size, :grade, :spec, TRUE, :created_at ) """) db.execute(query, { "file_id": file_id, "desc": material_data["original_description"], "category": previous_item.get("category", "UNCLASSIFIED"), "qty": material_data["quantity"], "unit": material_data.get("unit", "EA"), "size": material_data.get("size_spec", ""), "grade": previous_item.get("material", ""), "spec": previous_item.get("specification", ""), "created_at": datetime.now() }) @staticmethod def _classify_and_save_single_material( db: Session, file_id: int, material_data: Dict, changed_keys: set, new_keys: set, purchased_map: Optional[Dict] ): """단일 자재 분류 및 저장 (상세 정보 포함)""" description = material_data["original_description"] main_nom = material_data.get("main_nom", "") red_nom = material_data.get("red_nom", "") length_val = material_data.get("length") # 1. 통합 분류 integrated_result = classify_material_integrated(description, main_nom, red_nom, length_val) classification_result = integrated_result # 2. 상세 분류 if not should_exclude_material(description): category = integrated_result.get('category') if category == "PIPE": classification_result = classify_pipe_for_purchase("", description, main_nom, length_val) elif category == "FITTING": classification_result = classify_fitting("", description, main_nom, red_nom) elif category == "FLANGE": classification_result = classify_flange("", description, main_nom, red_nom) elif category == "VALVE": classification_result = classify_valve("", description, main_nom) elif category == "BOLT": classification_result = classify_bolt("", description, main_nom) elif category == "GASKET": classification_result = classify_gasket("", description, main_nom) elif category == "INSTRUMENT": classification_result = classify_instrument("", description, main_nom) elif category == "SUPPORT": classification_result = classify_support("", description, main_nom) elif category == "PLATE": classification_result = classify_plate("", description, main_nom) elif category == "STRUCTURAL": classification_result = classify_structural("", description, main_nom) # 신뢰도 조정 if integrated_result.get('confidence', 0) < 0.5: classification_result['overall_confidence'] = min( classification_result.get('overall_confidence', 1.0), integrated_result.get('confidence', 0.0) + 0.2 ) else: classification_result = {"category": "EXCLUDE", "overall_confidence": 0.95} # 3. 구매 확정 정보 상속 확인 is_purchase_confirmed = False purchase_confirmed_at = None purchase_confirmed_by = None if purchased_map: key = f"{description.strip().upper()}|{material_data.get('size_spec', '')}" if key in purchased_map: info = purchased_map[key] is_purchase_confirmed = True purchase_confirmed_at = info.get("purchase_confirmed_at") purchase_confirmed_by = info.get("purchase_confirmed_by") # 4. 자재 기본 정보 저장 full_grade = extract_full_material_grade(description) or material_data.get("material_grade", "") insert_query = text(""" INSERT INTO materials ( file_id, original_description, quantity, unit, size_spec, main_nom, red_nom, material_grade, full_material_grade, line_number, row_number, classified_category, classification_confidence, is_verified, drawing_name, line_no, created_at, purchase_confirmed, purchase_confirmed_at, purchase_confirmed_by, revision_status ) VALUES ( :file_id, :desc, :qty, :unit, :size, :main, :red, :grade, :full_grade, :line_num, :row_num, :category, :confidence, :verified, :dwg, :line, :created_at, :confirmed, :confirmed_at, :confirmed_by, :status ) RETURNING id """) # 리비전 상태 결정 mat_key = MaterialService._generate_material_key( material_data.get("dwg_name"), material_data.get("line_num"), description, material_data.get("size_spec"), material_data.get("material_grade") ) rev_status = 'changed' if mat_key in changed_keys else ('active' if mat_key in new_keys else None) result = db.execute(insert_query, { "file_id": file_id, "desc": description, "qty": material_data["quantity"], "unit": material_data["unit"], "size": material_data.get("size_spec", ""), "main": main_nom, "red": red_nom, "grade": material_data.get("material_grade", ""), "full_grade": full_grade, "line_num": material_data.get("line_number"), "row_num": material_data.get("row_number"), "category": classification_result.get("category", "UNCLASSIFIED"), "confidence": classification_result.get("overall_confidence", 0.0), "verified": False, "dwg": material_data.get("dwg_name"), "line": material_data.get("line_num"), "created_at": datetime.now(), "confirmed": is_purchase_confirmed, "confirmed_at": purchase_confirmed_at, "confirmed_by": purchase_confirmed_by, "status": rev_status }) material_id = result.fetchone()[0] # 5. 상세 정보 저장 (별도 메서드로 분리) MaterialService._save_material_details( db, material_id, file_id, classification_result, material_data ) @staticmethod def _save_material_details(db: Session, material_id: int, file_id: int, result: Dict, data: Dict): """카테고리별 상세 정보 저장""" category = result.get("category") if category == "PIPE": MaterialService._save_pipe_details(db, material_id, file_id, result, data) elif category == "FITTING": MaterialService._save_fitting_details(db, material_id, file_id, result, data) elif category == "FLANGE": MaterialService._save_flange_details(db, material_id, file_id, result, data) elif category == "BOLT": MaterialService._save_bolt_details(db, material_id, file_id, result, data) elif category == "VALVE": MaterialService._save_valve_details(db, material_id, file_id, result, data) elif category == "GASKET": MaterialService._save_gasket_details(db, material_id, file_id, result, data) elif category == "SUPPORT": MaterialService._save_support_details(db, material_id, file_id, result, data) elif category == "PLATE": MaterialService._save_plate_details(db, material_id, file_id, result, data) elif category == "STRUCTURAL": MaterialService._save_structural_details(db, material_id, file_id, result, data) @staticmethod def _save_plate_details(db: Session, mid: int, fid: int, res: Dict, data: Dict): """판재 정보 업데이트 (현재는 materials 테이블에 요약 정보 저장)""" details = res.get("details", {}) spec = f"{details.get('thickness')}T x {details.get('dimensions')}" db.execute(text(""" UPDATE materials SET size_spec = :size, material_grade = :mat WHERE id = :id """), {"size": spec, "mat": details.get("material"), "id": mid}) @staticmethod def _save_structural_details(db: Session, mid: int, fid: int, res: Dict, data: Dict): """형강 정보 업데이트 (현재는 materials 테이블에 요약 정보 저장)""" details = res.get("details", {}) spec = f"{details.get('type')} {details.get('dimension')}" db.execute(text(""" UPDATE materials SET size_spec = :size WHERE id = :id """), {"size": spec, "id": mid}) # --- 각 카테고리별 상세 저장 메서드들 (기존 로직 이관) --- @staticmethod def inherit_purchase_requests(db: Session, current_file_id: int, parent_file_id: int): """이전 리비전의 구매신청 정보를 상속합니다.""" try: print(f"🔄 구매신청 정보 상속 처리 시작...") # 1. 이전 리비전에서 그룹별 구매신청 수량 집계 prev_purchase_summary = text(""" SELECT m.original_description, m.size_spec, m.material_grade, m.drawing_name, COUNT(DISTINCT pri.material_id) as purchased_count, SUM(pri.quantity) as total_purchased_qty, MIN(pri.request_id) as request_id FROM materials m JOIN purchase_request_items pri ON m.id = pri.material_id WHERE m.file_id = :parent_file_id GROUP BY m.original_description, m.size_spec, m.material_grade, m.drawing_name """) prev_purchases = db.execute(prev_purchase_summary, {"parent_file_id": parent_file_id}).fetchall() # 2. 새 리비전에서 같은 그룹의 자재에 수량만큼 구매신청 상속 for prev_purchase in prev_purchases: purchased_count = prev_purchase.purchased_count # 새 리비전에서 같은 그룹의 자재 조회 (순서대로) new_group_materials = text(""" SELECT id, quantity FROM materials WHERE file_id = :file_id AND original_description = :description AND COALESCE(size_spec, '') = :size_spec AND COALESCE(material_grade, '') = :material_grade AND COALESCE(drawing_name, '') = :drawing_name ORDER BY id LIMIT :limit """) new_materials = db.execute(new_group_materials, { "file_id": current_file_id, "description": prev_purchase.original_description, "size_spec": prev_purchase.size_spec or '', "material_grade": prev_purchase.material_grade or '', "drawing_name": prev_purchase.drawing_name or '', "limit": purchased_count }).fetchall() # 구매신청 수량만큼만 상속 for new_mat in new_materials: inherit_query = text(""" INSERT INTO purchase_request_items ( request_id, material_id, quantity, unit, user_requirement ) VALUES ( :request_id, :material_id, :quantity, 'EA', '' ) ON CONFLICT DO NOTHING """) db.execute(inherit_query, { "request_id": prev_purchase.request_id, "material_id": new_mat.id, "quantity": new_mat.quantity }) inherited_count = len(new_materials) if inherited_count > 0: print(f" ✅ {prev_purchase.original_description[:30]}... (도면: {prev_purchase.drawing_name or 'N/A'}) → {inherited_count}/{purchased_count}개 상속") # 커밋은 호출하는 쪽에서 일괄 처리하거나 여기서 처리 # db.commit() print(f"✅ 구매신청 정보 상속 완료") except Exception as e: print(f"❌ 구매신청 정보 상속 실패: {str(e)}") # 상속 실패는 전체 프로세스를 중단하지 않음 @staticmethod def _save_pipe_details(db, mid, fid, res, data): # PIPE 상세 저장 로직 end_prep_info = extract_end_preparation_info(data["original_description"]) # 1. End Prep 정보 저장 db.execute(text(""" INSERT INTO pipe_end_preparations ( material_id, file_id, end_preparation_type, end_preparation_code, machining_required, cutting_note, original_description, confidence ) VALUES ( :mid, :fid, :type, :code, :req, :note, :desc, :conf ) """), { "mid": mid, "fid": fid, "type": end_prep_info["end_preparation_type"], "code": end_prep_info["end_preparation_code"], "req": end_prep_info["machining_required"], "note": end_prep_info["cutting_note"], "desc": end_prep_info["original_description"], "conf": end_prep_info["confidence"] }) # 2. Pipe Details 저장 length_info = res.get("length_info", {}) length_mm = length_info.get("length_mm") or data.get("length", 0.0) mat_info = res.get("material", {}) sch_info = res.get("schedule", {}) # 재질 정보 업데이트 if mat_info.get("grade"): db.execute(text("UPDATE materials SET material_grade = :g WHERE id = :id"), {"g": mat_info.get("grade"), "id": mid}) db.execute(text(""" INSERT INTO pipe_details ( material_id, file_id, outer_diameter, schedule, material_spec, manufacturing_method, length_mm ) VALUES (:mid, :fid, :od, :sch, :spec, :method, :len) """), { "mid": mid, "fid": fid, "od": data.get("main_nom") or data.get("size_spec"), "sch": sch_info.get("schedule", "UNKNOWN") if isinstance(sch_info, dict) else str(sch_info), "spec": mat_info.get("grade", "") if isinstance(mat_info, dict) else "", "method": res.get("manufacturing", {}).get("method", "UNKNOWN") if isinstance(res.get("manufacturing"), dict) else "UNKNOWN", "len": length_mm or 0.0 }) @staticmethod def _save_fitting_details(db, mid, fid, res, data): fit_type = res.get("fitting_type", {}) mat_info = res.get("material", {}) if mat_info.get("grade"): db.execute(text("UPDATE materials SET material_grade = :g WHERE id = :id"), {"g": mat_info.get("grade"), "id": mid}) db.execute(text(""" INSERT INTO fitting_details ( material_id, file_id, fitting_type, fitting_subtype, connection_method, pressure_rating, material_grade, main_size, reduced_size ) VALUES (:mid, :fid, :type, :subtype, :conn, :rating, :grade, :main, :red) """), { "mid": mid, "fid": fid, "type": fit_type.get("type", "UNKNOWN") if isinstance(fit_type, dict) else str(fit_type), "subtype": fit_type.get("subtype", "UNKNOWN") if isinstance(fit_type, dict) else "UNKNOWN", "conn": res.get("connection_method", {}).get("method", "UNKNOWN") if isinstance(res.get("connection_method"), dict) else "UNKNOWN", "rating": res.get("pressure_rating", {}).get("rating", "UNKNOWN") if isinstance(res.get("pressure_rating"), dict) else "UNKNOWN", "grade": mat_info.get("grade", "") if isinstance(mat_info, dict) else "", "main": data.get("main_nom") or data.get("size_spec"), "red": data.get("red_nom", "") }) @staticmethod def _save_flange_details(db, mid, fid, res, data): flg_type = res.get("flange_type", {}) mat_info = res.get("material", {}) if mat_info.get("grade"): db.execute(text("UPDATE materials SET material_grade = :g WHERE id = :id"), {"g": mat_info.get("grade"), "id": mid}) db.execute(text(""" INSERT INTO flange_details ( material_id, file_id, flange_type, pressure_rating, facing_type, material_grade, size_inches ) VALUES (:mid, :fid, :type, :rating, :face, :grade, :size) """), { "mid": mid, "fid": fid, "type": flg_type.get("type", "UNKNOWN") if isinstance(flg_type, dict) else str(flg_type), "rating": res.get("pressure_rating", {}).get("rating", "UNKNOWN") if isinstance(res.get("pressure_rating"), dict) else "UNKNOWN", "face": res.get("face_finish", {}).get("finish", "UNKNOWN") if isinstance(res.get("face_finish"), dict) else "UNKNOWN", "grade": mat_info.get("grade", "") if isinstance(mat_info, dict) else "", "size": data.get("main_nom") or data.get("size_spec") }) @staticmethod def _save_bolt_details(db, mid, fid, res, data): fast_type = res.get("fastener_type", {}) mat_info = res.get("material", {}) dim_info = res.get("dimensions", {}) if mat_info.get("grade"): db.execute(text("UPDATE materials SET material_grade = :g WHERE id = :id"), {"g": mat_info.get("grade"), "id": mid}) # 볼트 타입 결정 (특수 용도 고려) bolt_type = fast_type.get("type", "UNKNOWN") if isinstance(fast_type, dict) else str(fast_type) special_apps = res.get("special_applications", {}).get("detected_applications", []) if "LT" in special_apps: bolt_type = "LT_BOLT" elif "PSV" in special_apps: bolt_type = "PSV_BOLT" # 코팅 타입 desc_upper = data["original_description"].upper() coating = "UNKNOWN" if "GALV" in desc_upper: coating = "GALVANIZED" elif "ZINC" in desc_upper: coating = "ZINC_PLATED" db.execute(text(""" INSERT INTO bolt_details ( material_id, file_id, bolt_type, thread_type, diameter, length, material_grade, coating_type ) VALUES (:mid, :fid, :type, :thread, :dia, :len, :grade, :coating) """), { "mid": mid, "fid": fid, "type": bolt_type, "thread": res.get("thread_specification", {}).get("standard", "UNKNOWN") if isinstance(res.get("thread_specification"), dict) else "UNKNOWN", "dia": dim_info.get("nominal_size", data.get("main_nom", "")), "len": dim_info.get("length", ""), "grade": mat_info.get("grade", "") if isinstance(mat_info, dict) else "", "coating": coating }) @staticmethod def _save_valve_details(db, mid, fid, res, data): val_type = res.get("valve_type", {}) mat_info = res.get("material", {}) if mat_info.get("grade"): db.execute(text("UPDATE materials SET material_grade = :g WHERE id = :id"), {"g": mat_info.get("grade"), "id": mid}) db.execute(text(""" INSERT INTO valve_details ( material_id, file_id, valve_type, connection_method, pressure_rating, body_material, size_inches ) VALUES (:mid, :fid, :type, :conn, :rating, :body, :size) """), { "mid": mid, "fid": fid, "type": val_type.get("type", "UNKNOWN") if isinstance(val_type, dict) else str(val_type), "conn": res.get("connection_method", {}).get("method", "UNKNOWN") if isinstance(res.get("connection_method"), dict) else "UNKNOWN", "rating": res.get("pressure_rating", {}).get("rating", "UNKNOWN") if isinstance(res.get("pressure_rating"), dict) else "UNKNOWN", "body": mat_info.get("grade", "") if isinstance(mat_info, dict) else "", "size": data.get("main_nom") or data.get("size_spec") }) @staticmethod def _save_gasket_details(db, mid, fid, res, data): gask_type = res.get("gasket_type", {}) db.execute(text(""" INSERT INTO gasket_details ( material_id, file_id, gasket_type, pressure_rating, size_inches ) VALUES (:mid, :fid, :type, :rating, :size) """), { "mid": mid, "fid": fid, "type": gask_type.get("type", "UNKNOWN") if isinstance(gask_type, dict) else str(gask_type), "rating": res.get("pressure_rating", {}).get("rating", "UNKNOWN") if isinstance(res.get("pressure_rating"), dict) else "UNKNOWN", "size": data.get("main_nom") or data.get("size_spec") }) @staticmethod def _save_support_details(db, mid, fid, res, data): db.execute(text(""" INSERT INTO support_details ( material_id, file_id, support_type, pipe_size ) VALUES (:mid, :fid, :type, :size) """), { "mid": mid, "fid": fid, "type": res.get("support_type", "UNKNOWN"), "size": res.get("size_info", {}).get("pipe_size", "") })