""" 구매신청 관리 API """ from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import FileResponse from pydantic import BaseModel from sqlalchemy import text from sqlalchemy.orm import Session from typing import Optional, List, Dict from datetime import datetime import os import json from ..database import get_db from ..auth.middleware import get_current_user from ..utils.logger import get_logger logger = get_logger(__name__) router = APIRouter(prefix="/purchase-request", tags=["Purchase Request"]) # 엑셀 파일 저장 경로 EXCEL_DIR = "exports" os.makedirs(EXCEL_DIR, exist_ok=True) class PurchaseRequestCreate(BaseModel): file_id: int job_no: Optional[str] = None category: Optional[str] = None material_ids: List[int] = [] materials_data: List[Dict] = [] grouped_materials: Optional[List[Dict]] = [] # 그룹화된 자재 정보 @router.post("/create") async def create_purchase_request( request_data: PurchaseRequestCreate, current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청 생성 (엑셀 내보내기 = 구매신청) """ try: # 구매신청 번호 생성 today = datetime.now().strftime('%Y%m%d') count_query = text(""" SELECT COUNT(*) as count FROM purchase_requests WHERE request_no LIKE :pattern """) count = db.execute(count_query, {"pattern": f"PR-{today}%"}).fetchone().count request_no = f"PR-{today}-{str(count + 1).zfill(3)}" # 자재 데이터를 JSON과 엑셀 파일로 저장 json_filename = f"{request_no}.json" excel_filename = f"{request_no}.xlsx" json_path = os.path.join(EXCEL_DIR, json_filename) excel_path = os.path.join(EXCEL_DIR, excel_filename) # JSON 저장 save_materials_data( request_data.materials_data, json_path, request_no, request_data.job_no, request_data.grouped_materials # 그룹화 정보 추가 ) # 엑셀 파일 생성 및 저장 create_excel_file( request_data.grouped_materials or request_data.materials_data, excel_path, request_no, request_data.job_no ) # 구매신청 레코드 생성 insert_request = text(""" INSERT INTO purchase_requests ( request_no, file_id, job_no, category, material_count, excel_file_path, requested_by ) VALUES ( :request_no, :file_id, :job_no, :category, :material_count, :excel_file_path, :requested_by ) RETURNING request_id """) result = db.execute(insert_request, { "request_no": request_no, "file_id": request_data.file_id, "job_no": request_data.job_no, "category": request_data.category, "material_count": len(request_data.material_ids), "excel_file_path": excel_filename, # 엑셀 파일명 저장 (JSON 대신) "requested_by": current_user.get("user_id") }) request_id = result.fetchone().request_id # 구매신청 자재 상세 저장 logger.info(f"Processing {len(request_data.material_ids)} material IDs for purchase request {request_no}") logger.info(f"First 10 Material IDs: {request_data.material_ids[:10]}") # 처음 10개만 로그 logger.info(f"Category: {request_data.category}, Job: {request_data.job_no}") inserted_count = 0 for i, material_id in enumerate(request_data.material_ids): material_data = request_data.materials_data[i] if i < len(request_data.materials_data) else {} # 이미 구매신청된 자재인지 확인 check_existing = text(""" SELECT 1 FROM purchase_request_items WHERE material_id = :material_id """) existing = db.execute(check_existing, {"material_id": material_id}).fetchone() if not existing: insert_item = text(""" INSERT INTO purchase_request_items ( request_id, material_id, quantity, unit, user_requirement ) VALUES ( :request_id, :material_id, :quantity, :unit, :user_requirement ) """) # quantity를 정수로 변환 (소수점 제거) quantity_str = str(material_data.get("quantity", 0)) try: quantity = int(float(quantity_str)) except (ValueError, TypeError): quantity = 0 db.execute(insert_item, { "request_id": request_id, "material_id": material_id, "quantity": quantity, "unit": material_data.get("unit", ""), "user_requirement": material_data.get("user_requirement", "") }) inserted_count += 1 else: logger.warning(f"Material {material_id} already in another purchase request, skipping") db.commit() logger.info(f"Purchase request created: {request_no} with {inserted_count} materials (out of {len(request_data.material_ids)} requested)") # 실제 저장된 자재 확인 verify_query = text(""" SELECT COUNT(*) as count FROM purchase_request_items WHERE request_id = :request_id """) verified_count = db.execute(verify_query, {"request_id": request_id}).fetchone().count logger.info(f"✅ DB 검증: purchase_request_items에 {verified_count}개 저장됨") return { "success": True, "request_no": request_no, "request_id": request_id, "material_count": len(request_data.material_ids), "inserted_count": inserted_count, "verified_count": verified_count, "message": f"구매신청 {request_no}이 생성되었습니다" } except Exception as e: db.rollback() logger.error(f"Failed to create purchase request: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 생성 실패: {str(e)}" ) @router.get("/list") async def get_purchase_requests( file_id: Optional[int] = None, job_no: Optional[str] = None, status: Optional[str] = None, # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청 목록 조회 """ try: query = text(""" SELECT pr.request_id, pr.request_no, pr.file_id, pr.job_no, pr.category, pr.material_count, pr.excel_file_path, pr.requested_at, pr.status, u.name as requested_by, f.original_filename, j.job_name, COUNT(pri.id) as item_count FROM purchase_requests pr LEFT JOIN users u ON pr.requested_by = u.user_id LEFT JOIN files f ON pr.file_id = f.id LEFT JOIN jobs j ON pr.job_no = j.job_no LEFT JOIN purchase_request_items pri ON pr.request_id = pri.request_id WHERE 1=1 AND (:file_id IS NULL OR pr.file_id = :file_id) AND (:job_no IS NULL OR pr.job_no = :job_no) AND (:status IS NULL OR pr.status = :status) GROUP BY pr.request_id, pr.request_no, pr.file_id, pr.job_no, pr.category, pr.material_count, pr.excel_file_path, pr.requested_at, pr.status, u.name, f.original_filename, j.job_name ORDER BY pr.requested_at DESC """) results = db.execute(query, { "file_id": file_id, "job_no": job_no, "status": status }).fetchall() requests = [] for row in results: requests.append({ "request_id": row.request_id, "request_no": row.request_no, "file_id": row.file_id, "job_no": row.job_no, "job_name": row.job_name, "category": row.category, "material_count": row.material_count, "item_count": row.item_count, "excel_file_path": row.excel_file_path, "requested_at": row.requested_at.isoformat() if row.requested_at else None, "status": row.status, "requested_by": row.requested_by, "source_file": row.original_filename }) return { "success": True, "requests": requests, "count": len(requests) } except Exception as e: logger.error(f"Failed to get purchase requests: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 목록 조회 실패: {str(e)}" ) @router.get("/{request_id}/materials") async def get_request_materials( request_id: int, # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청에 포함된 자재 목록 조회 (그룹화 정보 포함) """ try: # 구매신청 정보 조회하여 JSON 파일 경로 가져오기 info_query = text(""" SELECT excel_file_path FROM purchase_requests WHERE request_id = :request_id """) info_result = db.execute(info_query, {"request_id": request_id}).fetchone() grouped_materials = [] if info_result and info_result.excel_file_path: json_path = os.path.join(EXCEL_DIR, info_result.excel_file_path) if os.path.exists(json_path): with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) grouped_materials = data.get("grouped_materials", []) # 개별 자재 정보 조회 (기존 코드) query = text(""" SELECT pri.item_id, pri.material_id, pri.quantity as requested_quantity, pri.unit as requested_unit, pri.user_requirement, pri.is_ordered, pri.is_received, m.original_description, m.classified_category, m.size_spec, m.main_nom, m.red_nom, m.schedule, m.material_grade, m.full_material_grade, m.quantity as original_quantity, m.unit as original_unit, m.classification_details, pd.outer_diameter, pd.schedule as pipe_schedule, pd.material_spec, pd.manufacturing_method, pd.end_preparation, pd.length_mm, fd.fitting_type, fd.fitting_subtype, fd.connection_method as fitting_connection, fd.pressure_rating as fitting_pressure, fd.schedule as fitting_schedule, fld.flange_type, fld.facing_type, fld.pressure_rating as flange_pressure, gd.gasket_type, gd.gasket_subtype, gd.material_type as gasket_material, gd.filler_material, gd.pressure_rating as gasket_pressure, gd.thickness as gasket_thickness, bd.bolt_type, bd.material_standard as bolt_material, bd.length as bolt_length FROM purchase_request_items pri JOIN materials m ON pri.material_id = m.id LEFT JOIN pipe_details pd ON m.id = pd.material_id LEFT JOIN fitting_details fd ON m.id = fd.material_id LEFT JOIN flange_details fld ON m.id = fld.material_id LEFT JOIN gasket_details gd ON m.id = gd.material_id LEFT JOIN bolt_details bd ON m.id = bd.material_id WHERE pri.request_id = :request_id ORDER BY m.classified_category, m.original_description """) results = db.execute(query, {"request_id": request_id}).fetchall() materials = [] for row in results: # quantity를 정수로 변환 (소수점 제거) qty = row.requested_quantity or row.original_quantity try: qty_int = int(float(qty)) if qty else 0 except (ValueError, TypeError): qty_int = 0 # BOM 페이지와 동일한 형식으로 데이터 구성 material_dict = { "item_id": row.item_id, "material_id": row.material_id, "id": row.material_id, "original_description": row.original_description, "classified_category": row.classified_category, "size_spec": row.size_spec, "size_inch": row.main_nom, "main_nom": row.main_nom, "red_nom": row.red_nom, "schedule": row.schedule, "material_grade": row.material_grade, "full_material_grade": row.full_material_grade, "quantity": qty_int, "unit": row.requested_unit or row.original_unit, "user_requirement": row.user_requirement, "is_ordered": row.is_ordered, "is_received": row.is_received, "classification_details": row.classification_details } # 카테고리별 상세 정보 추가 if row.classified_category == 'PIPE' and row.manufacturing_method: material_dict["pipe_details"] = { "manufacturing_method": row.manufacturing_method, "schedule": row.pipe_schedule, "material_spec": row.material_spec, "end_preparation": row.end_preparation, "length_mm": row.length_mm } elif row.classified_category == 'FITTING' and row.fitting_type: material_dict["fitting_details"] = { "fitting_type": row.fitting_type, "fitting_subtype": row.fitting_subtype, "connection_method": row.fitting_connection, "pressure_rating": row.fitting_pressure, "schedule": row.fitting_schedule } elif row.classified_category == 'FLANGE' and row.flange_type: material_dict["flange_details"] = { "flange_type": row.flange_type, "facing_type": row.facing_type, "pressure_rating": row.flange_pressure } elif row.classified_category == 'GASKET' and row.gasket_type: material_dict["gasket_details"] = { "gasket_type": row.gasket_type, "gasket_subtype": row.gasket_subtype, "material_type": row.gasket_material, "filler_material": row.filler_material, "pressure_rating": row.gasket_pressure, "thickness": row.gasket_thickness } elif row.classified_category == 'BOLT' and row.bolt_type: material_dict["bolt_details"] = { "bolt_type": row.bolt_type, "material_standard": row.bolt_material, "length": row.bolt_length } materials.append(material_dict) return { "success": True, "materials": materials, "grouped_materials": grouped_materials, # 그룹화 정보 추가 "count": len(grouped_materials) if grouped_materials else len(materials) } except Exception as e: logger.error(f"Failed to get request materials: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 자재 조회 실패: {str(e)}" ) @router.get("/{request_id}/download-excel") async def download_request_excel( request_id: int, # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청 엑셀 파일 직접 다운로드 (BOM 페이지에서 생성한 파일 그대로) """ from fastapi.responses import FileResponse try: # 구매신청 정보 조회 query = text(""" SELECT request_no, excel_file_path, job_no FROM purchase_requests WHERE request_id = :request_id """) result = db.execute(query, {"request_id": request_id}).fetchone() if not result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="구매신청을 찾을 수 없습니다" ) excel_file_path = os.path.join(EXCEL_DIR, result.excel_file_path) if not os.path.exists(excel_file_path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="엑셀 파일을 찾을 수 없습니다" ) # 엑셀 파일 직접 다운로드 return FileResponse( path=excel_file_path, filename=f"{result.job_no}_{result.request_no}.xlsx", media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ) except HTTPException: raise except Exception as e: logger.error(f"Failed to download request excel: {str(e)}") raise HTTPException( status_code=500, detail=f"엑셀 다운로드 실패: {str(e)}" ) def save_materials_data(materials_data: List[Dict], file_path: str, request_no: str, job_no: str, grouped_materials: List[Dict] = None): """ 자재 데이터를 JSON으로 저장 (프론트엔드에서 동일한 엑셀 포맷으로 생성하기 위해) """ # 수량을 정수로 변환하여 저장 cleaned_materials = [] for material in materials_data: cleaned_material = material.copy() if 'quantity' in cleaned_material: try: cleaned_material['quantity'] = int(float(cleaned_material['quantity'])) except (ValueError, TypeError): cleaned_material['quantity'] = 0 cleaned_materials.append(cleaned_material) # 그룹화된 자재도 수량 정수 변환 cleaned_grouped = [] if grouped_materials: for group in grouped_materials: cleaned_group = group.copy() if 'quantity' in cleaned_group: try: cleaned_group['quantity'] = int(float(cleaned_group['quantity'])) except (ValueError, TypeError): cleaned_group['quantity'] = 0 cleaned_grouped.append(cleaned_group) data_to_save = { "request_no": request_no, "job_no": job_no, "created_at": datetime.now().isoformat(), "materials": cleaned_materials, "grouped_materials": cleaned_grouped or [] } with open(file_path, 'w', encoding='utf-8') as f: json.dump(data_to_save, f, ensure_ascii=False, indent=2) def create_excel_file(materials_data: List[Dict], file_path: str, request_no: str, job_no: str): """ 자재 데이터로 엑셀 파일 생성 (BOM 페이지와 동일한 형식) """ import openpyxl from openpyxl.styles import Font, PatternFill, Alignment, Border, Side # 새 워크북 생성 wb = openpyxl.Workbook() wb.remove(wb.active) # 기본 시트 제거 # 카테고리별 그룹화 category_groups = {} for material in materials_data: category = material.get('category', 'UNKNOWN') if category not in category_groups: category_groups[category] = [] category_groups[category].append(material) # 각 카테고리별 시트 생성 for category, items in category_groups.items(): if not items: continue ws = wb.create_sheet(title=category) # 헤더 정의 headers = ['TAGNO', '품목명', '수량', '통화구분', '단가', '크기', '압력등급', '스케줄', '재질', '상세내역', '사용자요구', '관리항목1', '관리항목7', '관리항목8', '관리항목9', '관리항목10', '납기일(YYYY-MM-DD)'] # 헤더 작성 for col, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col, value=header) cell.font = Font(bold=True, color="000000", size=12, name="맑은 고딕") cell.fill = PatternFill(start_color="B3D9FF", end_color="B3D9FF", fill_type="solid") cell.alignment = Alignment(horizontal="center", vertical="center") cell.border = Border( top=Side(style="thin", color="666666"), bottom=Side(style="thin", color="666666"), left=Side(style="thin", color="666666"), right=Side(style="thin", color="666666") ) # 데이터 작성 for row_idx, material in enumerate(items, 2): data = [ '', # TAGNO category, # 품목명 material.get('quantity', 0), # 수량 'KRW', # 통화구분 1, # 단가 material.get('size', '-'), # 크기 '-', # 압력등급 (추후 개선) material.get('schedule', '-'), # 스케줄 material.get('material_grade', '-'), # 재질 '-', # 상세내역 (추후 개선) material.get('user_requirement', ''), # 사용자요구 '', '', '', '', '', # 관리항목들 datetime.now().strftime('%Y-%m-%d') # 납기일 ] for col, value in enumerate(data, 1): ws.cell(row=row_idx, column=col, value=value) # 컬럼 너비 자동 조정 for column in ws.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = min(max(max_length + 2, 10), 50) ws.column_dimensions[column_letter].width = adjusted_width # 파일 저장 wb.save(file_path)