""" 구매신청 관리 API """ from fastapi import APIRouter, Depends, HTTPException, status, Body, File, UploadFile, Form from fastapi.responses import FileResponse from pydantic import BaseModel from sqlalchemy import text from sqlalchemy.orm import Session from typing import Optional, List, Dict from datetime import datetime from pathlib import Path import os import json from ..database import get_db from ..auth.middleware import get_current_user from ..utils.logger import get_logger logger = get_logger(__name__) router = APIRouter(prefix="/purchase-request", tags=["Purchase Request"]) # 엑셀 파일 저장 경로 EXCEL_DIR = "uploads/excel_exports" os.makedirs(EXCEL_DIR, exist_ok=True) class PurchaseRequestCreate(BaseModel): file_id: int job_no: Optional[str] = None category: Optional[str] = None material_ids: List[int] = [] materials_data: List[Dict] = [] grouped_materials: Optional[List[Dict]] = [] # 그룹화된 자재 정보 @router.post("/create") async def create_purchase_request( request_data: PurchaseRequestCreate, current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청 생성 (엑셀 내보내기 = 구매신청) """ try: # 구매신청 번호 생성 today = datetime.now().strftime('%Y%m%d') count_query = text(""" SELECT COUNT(*) as count FROM purchase_requests WHERE request_no LIKE :pattern """) count = db.execute(count_query, {"pattern": f"PR-{today}%"}).fetchone().count request_no = f"PR-{today}-{str(count + 1).zfill(3)}" # 자재 데이터를 JSON과 엑셀 파일로 저장 json_filename = f"{request_no}.json" excel_filename = f"{request_no}.xlsx" json_path = os.path.join(EXCEL_DIR, json_filename) excel_path = os.path.join(EXCEL_DIR, excel_filename) # JSON 저장 save_materials_data( request_data.materials_data, json_path, request_no, request_data.job_no, request_data.grouped_materials # 그룹화 정보 추가 ) # 엑셀 파일 생성 및 저장 create_excel_file( request_data.grouped_materials or request_data.materials_data, excel_path, request_no, request_data.job_no ) # 구매신청 레코드 생성 insert_request = text(""" INSERT INTO purchase_requests ( request_no, file_id, job_no, category, material_count, excel_file_path, requested_by ) VALUES ( :request_no, :file_id, :job_no, :category, :material_count, :excel_file_path, :requested_by ) RETURNING request_id """) result = db.execute(insert_request, { "request_no": request_no, "file_id": request_data.file_id, "job_no": request_data.job_no, "category": request_data.category, "material_count": len(request_data.material_ids), "excel_file_path": excel_filename, # 엑셀 파일명 저장 (JSON 대신) "requested_by": current_user.get("user_id") }) request_id = result.fetchone().request_id # 구매신청 자재 상세 저장 logger.info(f"Processing {len(request_data.material_ids)} material IDs for purchase request {request_no}") logger.info(f"First 10 Material IDs: {request_data.material_ids[:10]}") # 처음 10개만 로그 logger.info(f"Category: {request_data.category}, Job: {request_data.job_no}") inserted_count = 0 for i, material_id in enumerate(request_data.material_ids): material_data = request_data.materials_data[i] if i < len(request_data.materials_data) else {} # 이미 구매신청된 자재인지 확인 check_existing = text(""" SELECT 1 FROM purchase_request_items WHERE material_id = :material_id """) existing = db.execute(check_existing, {"material_id": material_id}).fetchone() if not existing: insert_item = text(""" INSERT INTO purchase_request_items ( request_id, material_id, quantity, unit, user_requirement ) VALUES ( :request_id, :material_id, :quantity, :unit, :user_requirement ) """) # quantity를 정수로 변환 (소수점 제거) quantity_str = str(material_data.get("quantity", 0)) try: quantity = int(float(quantity_str)) except (ValueError, TypeError): quantity = 0 db.execute(insert_item, { "request_id": request_id, "material_id": material_id, "quantity": quantity, "unit": material_data.get("unit", ""), "user_requirement": material_data.get("user_requirement", "") }) inserted_count += 1 else: logger.warning(f"Material {material_id} already in another purchase request, skipping") db.commit() logger.info(f"Purchase request created: {request_no} with {inserted_count} materials (out of {len(request_data.material_ids)} requested)") # 실제 저장된 자재 확인 verify_query = text(""" SELECT COUNT(*) as count FROM purchase_request_items WHERE request_id = :request_id """) verified_count = db.execute(verify_query, {"request_id": request_id}).fetchone().count logger.info(f"✅ DB 검증: purchase_request_items에 {verified_count}개 저장됨") return { "success": True, "request_no": request_no, "request_id": request_id, "material_count": len(request_data.material_ids), "inserted_count": inserted_count, "verified_count": verified_count, "message": f"구매신청 {request_no}이 생성되었습니다" } except Exception as e: db.rollback() logger.error(f"Failed to create purchase request: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 생성 실패: {str(e)}" ) @router.get("/list") async def get_purchase_requests( file_id: Optional[int] = None, job_no: Optional[str] = None, status: Optional[str] = None, # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청 목록 조회 """ try: query = text(""" SELECT pr.request_id, pr.request_no, pr.file_id, pr.job_no, pr.category, pr.material_count, pr.excel_file_path, pr.requested_at, pr.status, u.name as requested_by, f.original_filename, j.job_name, COUNT(pri.item_id) as item_count FROM purchase_requests pr LEFT JOIN users u ON pr.requested_by = u.user_id LEFT JOIN files f ON pr.file_id = f.id LEFT JOIN jobs j ON pr.job_no = j.job_no LEFT JOIN purchase_request_items pri ON pr.request_id = pri.request_id WHERE 1=1 AND (:file_id IS NULL OR pr.file_id = :file_id) AND (:job_no IS NULL OR pr.job_no = :job_no) AND (:status IS NULL OR pr.status = :status) GROUP BY pr.request_id, pr.request_no, pr.file_id, pr.job_no, pr.category, pr.material_count, pr.excel_file_path, pr.requested_at, pr.status, u.name, f.original_filename, j.job_name ORDER BY pr.requested_at DESC """) results = db.execute(query, { "file_id": file_id, "job_no": job_no, "status": status }).fetchall() requests = [] for row in results: requests.append({ "request_id": row.request_id, "request_no": row.request_no, "file_id": row.file_id, "job_no": row.job_no, "job_name": row.job_name, "category": row.category, "material_count": row.material_count, "item_count": row.item_count, "excel_file_path": row.excel_file_path, "requested_at": row.requested_at.isoformat() if row.requested_at else None, "status": row.status, "requested_by": row.requested_by, "source_file": row.original_filename }) return { "success": True, "requests": requests, "count": len(requests) } except Exception as e: logger.error(f"Failed to get purchase requests: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 목록 조회 실패: {str(e)}" ) @router.get("/{request_id}/materials") async def get_request_materials( request_id: int, # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청에 포함된 자재 목록 조회 (그룹화 정보 포함) """ try: # 구매신청 정보 조회하여 JSON 파일 경로 가져오기 info_query = text(""" SELECT excel_file_path FROM purchase_requests WHERE request_id = :request_id """) info_result = db.execute(info_query, {"request_id": request_id}).fetchone() grouped_materials = [] if info_result and info_result.excel_file_path: json_path = os.path.join(EXCEL_DIR, info_result.excel_file_path) if os.path.exists(json_path): try: with open(json_path, 'r', encoding='utf-8', errors='ignore') as f: data = json.load(f) grouped_materials = data.get("grouped_materials", []) except Exception as e: print(f"⚠️ JSON 파일 읽기 오류 (무시): {e}") grouped_materials = [] # 개별 자재 정보 조회 (기존 코드) query = text(""" SELECT pri.item_id, pri.material_id, pri.quantity as requested_quantity, pri.unit as requested_unit, pri.user_requirement, pri.is_ordered, pri.is_received, m.original_description, m.classified_category, m.size_spec, m.main_nom, m.red_nom, m.schedule, m.material_grade, m.full_material_grade, m.quantity as original_quantity, m.unit as original_unit, m.classification_details, pd.outer_diameter, pd.schedule as pipe_schedule, pd.material_spec, pd.manufacturing_method, pd.end_preparation, pd.length_mm, fd.fitting_type, fd.fitting_subtype, fd.connection_method as fitting_connection, fd.pressure_rating as fitting_pressure, fd.schedule as fitting_schedule, fld.flange_type, fld.facing_type, fld.pressure_rating as flange_pressure, gd.gasket_type, gd.gasket_subtype, gd.material_type as gasket_material, gd.filler_material, gd.pressure_rating as gasket_pressure, gd.thickness as gasket_thickness, bd.bolt_type, bd.material_standard as bolt_material, bd.length as bolt_length FROM purchase_request_items pri JOIN materials m ON pri.material_id = m.id LEFT JOIN pipe_details pd ON m.id = pd.material_id LEFT JOIN fitting_details fd ON m.id = fd.material_id LEFT JOIN flange_details fld ON m.id = fld.material_id LEFT JOIN gasket_details gd ON m.id = gd.material_id LEFT JOIN bolt_details bd ON m.id = bd.material_id WHERE pri.request_id = :request_id ORDER BY m.classified_category, m.original_description """) # 🎯 데이터베이스 쿼리 실행 results = db.execute(query, {"request_id": request_id}).fetchall() materials = [] # 🎯 안전한 문자열 변환 함수 def safe_str(value): if value is None: return '' try: if isinstance(value, bytes): return value.decode('utf-8', errors='ignore') return str(value) except Exception: return str(value) if value else '' for row in results: try: # quantity를 정수로 변환 (소수점 제거) qty = row.requested_quantity or row.original_quantity try: qty_int = int(float(qty)) if qty else 0 except (ValueError, TypeError): qty_int = 0 # 안전한 문자열 변환 original_description = safe_str(row.original_description) size_spec = safe_str(row.size_spec) material_grade = safe_str(row.material_grade) full_material_grade = safe_str(row.full_material_grade) user_requirement = safe_str(row.user_requirement) except Exception as e: # 오류 발생 시 기본값 사용 qty_int = 0 original_description = '' size_spec = '' material_grade = '' full_material_grade = '' user_requirement = '' # BOM 페이지와 동일한 형식으로 데이터 구성 material_dict = { "item_id": row.item_id, "material_id": row.material_id, "id": row.material_id, "original_description": original_description, "classified_category": safe_str(row.classified_category), "size_spec": size_spec, "size_inch": safe_str(row.main_nom), "main_nom": safe_str(row.main_nom), "red_nom": safe_str(row.red_nom), "schedule": safe_str(row.schedule), "material_grade": material_grade, "full_material_grade": full_material_grade, "quantity": qty_int, "unit": safe_str(row.requested_unit or row.original_unit), "user_requirement": user_requirement, "is_ordered": row.is_ordered, "is_received": row.is_received, "classification_details": safe_str(row.classification_details) } # 카테고리별 상세 정보 추가 (안전한 문자열 처리) if row.classified_category == 'PIPE' and row.manufacturing_method: material_dict["pipe_details"] = { "manufacturing_method": safe_str(row.manufacturing_method), "schedule": safe_str(row.pipe_schedule), "material_spec": safe_str(row.material_spec), "end_preparation": safe_str(row.end_preparation), "length_mm": row.length_mm } elif row.classified_category == 'FITTING' and row.fitting_type: material_dict["fitting_details"] = { "fitting_type": safe_str(row.fitting_type), "fitting_subtype": safe_str(row.fitting_subtype), "connection_method": safe_str(row.fitting_connection), "pressure_rating": safe_str(row.fitting_pressure), "schedule": safe_str(row.fitting_schedule) } elif row.classified_category == 'FLANGE' and row.flange_type: material_dict["flange_details"] = { "flange_type": safe_str(row.flange_type), "facing_type": safe_str(row.facing_type), "pressure_rating": safe_str(row.flange_pressure) } elif row.classified_category == 'GASKET' and row.gasket_type: material_dict["gasket_details"] = { "gasket_type": safe_str(row.gasket_type), "gasket_subtype": safe_str(row.gasket_subtype), "material_type": safe_str(row.gasket_material), "filler_material": safe_str(row.filler_material), "pressure_rating": safe_str(row.gasket_pressure), "thickness": safe_str(row.gasket_thickness) } elif row.classified_category == 'BOLT' and row.bolt_type: material_dict["bolt_details"] = { "bolt_type": safe_str(row.bolt_type), "material_standard": safe_str(row.bolt_material), "length": safe_str(row.bolt_length) } materials.append(material_dict) return { "success": True, "materials": materials, "grouped_materials": grouped_materials, # 그룹화 정보 추가 "count": len(grouped_materials) if grouped_materials else len(materials) } except Exception as e: logger.error(f"Failed to get request materials: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 자재 조회 실패: {str(e)}" ) @router.get("/requested-materials") async def get_requested_material_ids( file_id: Optional[int] = None, job_no: Optional[str] = None, db: Session = Depends(get_db) ): """ 구매신청된 자재 ID 목록 조회 (BOM 페이지에서 비활성화용) """ try: query = text(""" SELECT DISTINCT pri.material_id FROM purchase_request_items pri JOIN purchase_requests pr ON pri.request_id = pr.request_id WHERE 1=1 AND (:file_id IS NULL OR pr.file_id = :file_id) AND (:job_no IS NULL OR pr.job_no = :job_no) """) results = db.execute(query, { "file_id": file_id, "job_no": job_no }).fetchall() material_ids = [row.material_id for row in results] return { "success": True, "requested_material_ids": material_ids, "count": len(material_ids) } except Exception as e: logger.error(f"Failed to get requested material IDs: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 자재 ID 조회 실패: {str(e)}" ) @router.patch("/{request_id}/title") async def update_request_title( request_id: int, title: str = Body(..., embed=True), # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청 제목(request_no) 업데이트 """ try: # 구매신청 존재 확인 check_query = text(""" SELECT request_no FROM purchase_requests WHERE request_id = :request_id """) existing = db.execute(check_query, {"request_id": request_id}).fetchone() if not existing: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="구매신청을 찾을 수 없습니다" ) # 제목 업데이트 update_query = text(""" UPDATE purchase_requests SET request_no = :title WHERE request_id = :request_id """) db.execute(update_query, { "request_id": request_id, "title": title }) db.commit() return { "success": True, "message": "구매신청 제목이 업데이트되었습니다", "old_title": existing.request_no, "new_title": title } except HTTPException: raise except Exception as e: db.rollback() logger.error(f"Failed to update request title: {str(e)}") raise HTTPException( status_code=500, detail=f"구매신청 제목 업데이트 실패: {str(e)}" ) @router.get("/{request_id}/download-excel") async def download_request_excel( request_id: int, # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청 엑셀 파일 직접 다운로드 (BOM 페이지에서 생성한 파일 그대로) """ from fastapi.responses import FileResponse try: # 구매신청 정보 조회 query = text(""" SELECT request_no, excel_file_path, job_no FROM purchase_requests WHERE request_id = :request_id """) result = db.execute(query, {"request_id": request_id}).fetchone() if not result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="구매신청을 찾을 수 없습니다" ) excel_file_path = os.path.join(EXCEL_DIR, result.excel_file_path) if not os.path.exists(excel_file_path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="엑셀 파일을 찾을 수 없습니다" ) # 엑셀 파일 직접 다운로드 return FileResponse( path=excel_file_path, filename=f"{result.job_no}_{result.request_no}.xlsx", media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ) except HTTPException: raise except Exception as e: logger.error(f"Failed to download request excel: {str(e)}") raise HTTPException( status_code=500, detail=f"엑셀 다운로드 실패: {str(e)}" ) def save_materials_data(materials_data: List[Dict], file_path: str, request_no: str, job_no: str, grouped_materials: List[Dict] = None): """ 자재 데이터를 JSON으로 저장 (프론트엔드에서 동일한 엑셀 포맷으로 생성하기 위해) """ # 수량을 정수로 변환하여 저장 cleaned_materials = [] for material in materials_data: cleaned_material = material.copy() if 'quantity' in cleaned_material: try: cleaned_material['quantity'] = int(float(cleaned_material['quantity'])) except (ValueError, TypeError): cleaned_material['quantity'] = 0 cleaned_materials.append(cleaned_material) # 그룹화된 자재도 수량 정수 변환 cleaned_grouped = [] if grouped_materials: for group in grouped_materials: cleaned_group = group.copy() if 'quantity' in cleaned_group: try: cleaned_group['quantity'] = int(float(cleaned_group['quantity'])) except (ValueError, TypeError): cleaned_group['quantity'] = 0 cleaned_grouped.append(cleaned_group) data_to_save = { "request_no": request_no, "job_no": job_no, "created_at": datetime.now().isoformat(), "materials": cleaned_materials, "grouped_materials": cleaned_grouped or [] } with open(file_path, 'w', encoding='utf-8') as f: json.dump(data_to_save, f, ensure_ascii=False, indent=2) def create_excel_file(materials_data: List[Dict], file_path: str, request_no: str, job_no: str): """ 자재 데이터로 엑셀 파일 생성 (BOM 페이지와 동일한 형식) """ import openpyxl from openpyxl.styles import Font, PatternFill, Alignment, Border, Side # 새 워크북 생성 wb = openpyxl.Workbook() wb.remove(wb.active) # 기본 시트 제거 # 카테고리별 그룹화 category_groups = {} for material in materials_data: category = material.get('category', 'UNKNOWN') if category not in category_groups: category_groups[category] = [] category_groups[category].append(material) # 각 카테고리별 시트 생성 for category, items in category_groups.items(): if not items: continue ws = wb.create_sheet(title=category) # 헤더 정의 (P열에 납기일, 관리항목 통일) headers = ['TAGNO', '품목명', '수량', '통화구분', '단가', '크기', '압력등급', '스케줄', '재질', '상세내역', '사용자요구', '관리항목1', '관리항목2', '관리항목3', '관리항목4', '납기일(YYYY-MM-DD)', '관리항목5', '관리항목6', '관리항목7', '관리항목8', '관리항목9', '관리항목10'] # 헤더 작성 for col, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col, value=header) cell.font = Font(bold=True, color="000000", size=12, name="맑은 고딕") cell.fill = PatternFill(start_color="B3D9FF", end_color="B3D9FF", fill_type="solid") cell.alignment = Alignment(horizontal="center", vertical="center") cell.border = Border( top=Side(style="thin", color="666666"), bottom=Side(style="thin", color="666666"), left=Side(style="thin", color="666666"), right=Side(style="thin", color="666666") ) # 데이터 작성 for row_idx, material in enumerate(items, 2): data = [ '', # TAGNO category, # 품목명 material.get('quantity', 0), # 수량 'KRW', # 통화구분 1, # 단가 material.get('size', '-'), # 크기 '-', # 압력등급 (추후 개선) material.get('schedule', '-'), # 스케줄 material.get('material_grade', '-'), # 재질 '-', # 상세내역 (추후 개선) material.get('user_requirement', ''), # 사용자요구 '', '', '', '', '', # 관리항목들 datetime.now().strftime('%Y-%m-%d') # 납기일 ] for col, value in enumerate(data, 1): ws.cell(row=row_idx, column=col, value=value) # 컬럼 너비 자동 조정 for column in ws.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = min(max(max_length + 2, 10), 50) ws.column_dimensions[column_letter].width = adjusted_width # 파일 저장 wb.save(file_path) @router.post("/upload-excel") async def upload_request_excel( excel_file: UploadFile = File(...), request_id: int = Form(...), category: str = Form(...), # current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 구매신청에 대한 엑셀 파일 업로드 (BOM에서 생성한 원본 파일) """ try: # 구매신청 정보 조회 query = text(""" SELECT request_no, job_no FROM purchase_requests WHERE request_id = :request_id """) result = db.execute(query, {"request_id": request_id}).fetchone() if not result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="구매신청을 찾을 수 없습니다" ) # 엑셀 저장 디렉토리 생성 excel_dir = Path("uploads/excel_exports") excel_dir.mkdir(parents=True, exist_ok=True) # 파일명 생성 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") safe_filename = f"{result.job_no}_{result.request_no}_{timestamp}.xlsx" file_path = excel_dir / safe_filename # 파일 저장 content = await excel_file.read() with open(file_path, "wb") as f: f.write(content) # 구매신청 테이블에 엑셀 파일 경로 업데이트 update_query = text(""" UPDATE purchase_requests SET excel_file_path = :excel_file_path WHERE request_id = :request_id """) db.execute(update_query, { "excel_file_path": safe_filename, "request_id": request_id }) db.commit() logger.info(f"엑셀 파일 업로드 완료: {safe_filename}") return { "success": True, "message": "엑셀 파일이 성공적으로 업로드되었습니다", "file_path": safe_filename } except HTTPException: raise except Exception as e: db.rollback() logger.error(f"Failed to upload excel file: {str(e)}") raise HTTPException( status_code=500, detail=f"엑셀 파일 업로드 실패: {str(e)}" )