Files
TK-BOM-Project/backend/app/routers/purchase_request.py
Hyungi Ahn 17843e285f feat: 리비전 관리 시스템 및 구매확정 기능 구현
- 리비전 관리 라우터 및 서비스 추가 (revision_management.py, revision_comparison_service.py, revision_session_service.py)
- 구매확정 기능 구현: materials 테이블에 purchase_confirmed 필드 추가 및 업데이트 로직
- 리비전 비교 로직 구현: 구매확정된 자재 기반으로 신규/변경 자재 자동 분류
- 데이터베이스 스키마 확장: revision_sessions, revision_material_changes, inventory_transfers 테이블 추가
- 구매신청 생성 시 자재 상세 정보 저장 및 purchase_confirmed 자동 업데이트
- 프론트엔드: 리비전 관리 컴포넌트 및 hooks 추가
- 파일 목록 조회 API 추가 (/files/list)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-06 07:36:44 +09:00

835 lines
33 KiB
Python

"""
구매신청 관리 API
"""
from fastapi import APIRouter, Depends, HTTPException, status, Body, File, UploadFile, Form
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from typing import Optional, List, Dict
from datetime import datetime
from pathlib import Path
import os
import json
from ..database import get_db
from ..auth.middleware import get_current_user
from ..utils.logger import get_logger
# Module-level logger and the router on which the endpoints below are registered.
logger = get_logger(__name__)
router = APIRouter(prefix="/purchase-request", tags=["Purchase Request"])
# Directory where exported Excel files (and their JSON companions) are stored;
# it is created at import time if it does not exist yet.
EXCEL_DIR = "uploads/excel_exports"
os.makedirs(EXCEL_DIR, exist_ok=True)
class PurchaseRequestCreate(BaseModel):
    """Request body accepted by the purchase-request creation endpoint."""

    # ID of the row in the files table that the request belongs to.
    file_id: int
    job_no: Optional[str] = None
    category: Optional[str] = None
    # IDs of the requested materials; materials_data is matched to this list
    # by position (materials_data[i] describes material_ids[i]).
    material_ids: List[int] = []
    materials_data: List[Dict] = []
    # Grouped material information (preferred over materials_data for the
    # Excel export when it is non-empty).
    grouped_materials: Optional[List[Dict]] = []
@router.post("/create")
async def create_purchase_request(
    request_data: PurchaseRequestCreate,
    current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Create a purchase request (an Excel export counts as a purchase request).

    Steps:
      1. Allocate a ``PR-YYYYMMDD-NNN`` request number that is not used yet.
      2. Write the JSON and Excel export files into ``EXCEL_DIR``.
      3. Insert the ``purchase_requests`` row and one
         ``purchase_request_items`` row for every material that is not already
         part of a purchase request.
      4. Mark all requested materials as ``purchase_confirmed``.
      5. Store the verified number of saved items in ``total_items``.

    Returns:
        dict with ``success``, ``request_no``, ``request_id``,
        ``material_count``, ``inserted_count``, ``verified_count`` and
        ``message``.

    Raises:
        HTTPException(500): on any failure. The transaction is rolled back and
        export files written by this call (before the first commit) are removed.
    """
    material_ids = request_data.material_ids
    materials_data = request_data.materials_data
    # Export files created by this call; deleted again if the request fails
    # before it has been committed, so no orphan files are left behind.
    written_paths = []

    def to_int_quantity(raw_value):
        # Quantities are stored as whole numbers (fraction dropped). Anything
        # unparsable, including infinities (OverflowError), becomes 0.
        try:
            return int(float(str(raw_value)))
        except (ValueError, TypeError, OverflowError):
            return 0

    try:
        # Debugging aid: log the shape of the incoming request.
        logger.info(f"🔍 구매신청 생성 요청 - file_id: {request_data.file_id}, job_no: {request_data.job_no}")
        logger.info(f"🔍 material_ids 개수: {len(material_ids)}")
        logger.info(f"🔍 materials_data 개수: {len(materials_data)}")
        if material_ids:
            logger.info(f"🔍 material_ids 샘플: {material_ids[:5]}")

        # --- Allocate the request number -------------------------------------
        today = datetime.now().strftime('%Y%m%d')
        count_query = text("""
            SELECT COUNT(*) as count
            FROM purchase_requests
            WHERE request_no LIKE :pattern
        """)
        # .scalar() instead of .fetchone().count: on SQLAlchemy 1.4+ a Row is a
        # Sequence, so ``row.count`` can resolve to the tuple method rather
        # than the column value.
        count = db.execute(count_query, {"pattern": f"PR-{today}%"}).scalar() or 0

        # The COUNT-based number can repeat when an earlier request of the day
        # was renamed or deleted; advance until neither the DB nor the export
        # directory already uses it, so existing files are never overwritten.
        exists_query = text("""
            SELECT 1 FROM purchase_requests
            WHERE request_no = :request_no
        """)
        sequence = count + 1
        while True:
            request_no = f"PR-{today}-{str(sequence).zfill(3)}"
            excel_filename = f"{request_no}.xlsx"
            json_path = os.path.join(EXCEL_DIR, f"{request_no}.json")
            excel_path = os.path.join(EXCEL_DIR, excel_filename)
            number_taken = db.execute(exists_query, {"request_no": request_no}).fetchone()
            if not number_taken and not os.path.exists(json_path) and not os.path.exists(excel_path):
                break
            sequence += 1

        # --- Persist the material data as JSON and as an Excel file ----------
        written_paths.append(json_path)
        save_materials_data(
            materials_data,
            json_path,
            request_no,
            request_data.job_no,
            request_data.grouped_materials  # grouping information
        )
        written_paths.append(excel_path)
        create_excel_file(
            request_data.grouped_materials or materials_data,
            excel_path,
            request_no,
            request_data.job_no
        )

        # --- Insert the purchase request record ------------------------------
        insert_request = text("""
            INSERT INTO purchase_requests (
                request_no, file_id, job_no, category,
                material_count, excel_file_path, requested_by
            ) VALUES (
                :request_no, :file_id, :job_no, :category,
                :material_count, :excel_file_path, :requested_by
            ) RETURNING request_id
        """)
        result = db.execute(insert_request, {
            "request_no": request_no,
            "file_id": request_data.file_id,
            "job_no": request_data.job_no,
            "category": request_data.category,
            "material_count": len(material_ids),
            "excel_file_path": excel_filename,  # Excel file name (not the JSON one)
            "requested_by": current_user.get("user_id")
        })
        request_id = result.fetchone().request_id

        # --- Insert the request items ----------------------------------------
        logger.info(f"Processing {len(material_ids)} material IDs for purchase request {request_no}")
        logger.info(f"First 10 Material IDs: {material_ids[:10]}")
        logger.info(f"Category: {request_data.category}, Job: {request_data.job_no}")

        # One lookup for all IDs instead of one SELECT per material.
        already_requested = set()
        if material_ids:
            existing_rows = db.execute(text("""
                SELECT DISTINCT material_id FROM purchase_request_items
                WHERE material_id = ANY(:material_ids)
            """), {"material_ids": material_ids}).fetchall()
            already_requested = {row.material_id for row in existing_rows}

        insert_item = text("""
            INSERT INTO purchase_request_items (
                request_id, material_id, description, category, subcategory,
                material_grade, size_spec, quantity, unit, drawing_name,
                notes, user_requirement
            ) VALUES (
                :request_id, :material_id, :description, :category, :subcategory,
                :material_grade, :size_spec, :quantity, :unit, :drawing_name,
                :notes, :user_requirement
            )
        """)
        inserted_count = 0
        for index, material_id in enumerate(material_ids):
            if material_id in already_requested:
                logger.warning(f"Material {material_id} already in another purchase request, skipping")
                continue
            # materials_data is matched to material_ids by position.
            material_data = materials_data[index] if index < len(materials_data) else {}
            db.execute(insert_item, {
                "request_id": request_id,
                "material_id": material_id,
                "description": material_data.get("description", material_data.get("original_description", "")),
                "category": material_data.get("category", material_data.get("classified_category", "")),
                "subcategory": material_data.get("subcategory", material_data.get("classified_subcategory", "")),
                "material_grade": material_data.get("material_grade", ""),
                "size_spec": material_data.get("size_spec", ""),
                "quantity": to_int_quantity(material_data.get("quantity", 0)),
                "unit": material_data.get("unit", "EA"),
                "drawing_name": material_data.get("drawing_name", ""),
                "notes": material_data.get("notes", ""),
                "user_requirement": material_data.get("user_requirement", "")
            })
            # A duplicate ID later in the same request must be skipped as well.
            already_requested.add(material_id)
            inserted_count += 1

        # --- Important: flag the materials as purchase-confirmed -------------
        if material_ids:
            update_materials_query = text("""
                UPDATE materials
                SET purchase_confirmed = true,
                    purchase_confirmed_at = NOW(),
                    purchase_confirmed_by = :confirmed_by
                WHERE id = ANY(:material_ids)
            """)
            result = db.execute(update_materials_query, {
                "material_ids": material_ids,
                "confirmed_by": current_user.get("username", "system")
            })
            logger.info(f"🔥 [PURCHASE] UPDATE 결과: {result.rowcount}개 행 업데이트됨")
            logger.info(f"{len(material_ids)}개 자재의 purchase_confirmed를 true로 업데이트")
        else:
            logger.warning("⚠️ [PURCHASE] material_ids가 비어있음!")

        db.commit()
        # The request is persisted now; its export files must survive even if
        # the follow-up bookkeeping below fails.
        written_paths.clear()
        logger.info(f"Purchase request created: {request_no} with {inserted_count} materials (out of {len(material_ids)} requested)")

        # --- Verify what was actually stored and record it -------------------
        verify_query = text("""
            SELECT COUNT(*) as count FROM purchase_request_items WHERE request_id = :request_id
        """)
        verified_count = db.execute(verify_query, {"request_id": request_id}).scalar() or 0
        logger.info(f"✅ DB 검증: purchase_request_items에 {verified_count}개 저장됨")

        update_total_items = text("""
            UPDATE purchase_requests
            SET total_items = :total_items
            WHERE request_id = :request_id
        """)
        db.execute(update_total_items, {
            "request_id": request_id,
            "total_items": verified_count
        })
        db.commit()
        logger.info(f"✅ total_items 업데이트 완료: {verified_count}")

        return {
            "success": True,
            "request_no": request_no,
            "request_id": request_id,
            "material_count": len(material_ids),
            "inserted_count": inserted_count,
            "verified_count": verified_count,
            "message": f"구매신청 {request_no}이 생성되었습니다"
        }
    except Exception as e:
        db.rollback()
        for leftover in written_paths:
            try:
                os.remove(leftover)
            except OSError:
                # Best effort: the file may never have been created.
                pass
        logger.error(f"Failed to create purchase request: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 생성 실패: {str(e)}"
        )
@router.get("/list")
async def get_purchase_requests(
    file_id: Optional[int] = None,
    job_no: Optional[str] = None,
    status: Optional[str] = None,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    List purchase requests, newest first.

    Each of ``file_id``, ``job_no`` and ``status`` is an optional filter; a
    filter left as ``None`` is not applied. Every entry carries the number of
    rows found in ``purchase_request_items`` for that request.

    Raises:
        HTTPException(500): if the query or the serialisation fails.
    """

    def serialize(row):
        # Shape one result row the way the frontend list expects it.
        requested_at = row.request_date.isoformat() if row.request_date else None
        return {
            "request_id": row.request_id,
            "request_no": row.request_no,
            "file_id": row.file_id,
            "job_no": row.job_no,
            "job_name": row.job_name,
            "category": "ALL",  # default value
            "material_count": row.item_count or 0,  # actual number of items
            "item_count": row.item_count,
            "excel_file_path": None,  # not selected by this query
            "requested_at": requested_at,
            "status": row.status,
            "requested_by": row.requested_by,
            "source_file": row.original_filename
        }

    try:
        listing_sql = text("""
            SELECT
                pr.request_id,
                pr.request_no,
                pr.file_id,
                pr.job_no,
                pr.total_items,
                pr.request_date,
                pr.status,
                pr.requested_by_username as requested_by,
                f.original_filename,
                j.job_name,
                COUNT(pri.item_id) as item_count
            FROM purchase_requests pr
            LEFT JOIN files f ON pr.file_id = f.id
            LEFT JOIN jobs j ON pr.job_no = j.job_no
            LEFT JOIN purchase_request_items pri ON pr.request_id = pri.request_id
            WHERE 1=1
                AND (:file_id IS NULL OR pr.file_id = :file_id)
                AND (:job_no IS NULL OR pr.job_no = :job_no)
                AND (:status IS NULL OR pr.status = :status)
            GROUP BY
                pr.request_id, pr.request_no, pr.file_id, pr.job_no,
                pr.total_items, pr.request_date, pr.status,
                pr.requested_by_username, f.original_filename, j.job_name
            ORDER BY pr.request_date DESC
        """)
        filters = {
            "file_id": file_id,
            "job_no": job_no,
            "status": status
        }
        rows = db.execute(listing_sql, filters).fetchall()
        requests = [serialize(row) for row in rows]
        return {
            "success": True,
            "requests": requests,
            "count": len(requests)
        }
    except Exception as e:
        logger.error(f"Failed to get purchase requests: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 목록 조회 실패: {str(e)}"
        )
@router.get("/{request_id}/materials")
async def get_request_materials(
    request_id: int,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Return the materials contained in a purchase request, plus the grouping
    information saved when the request was created.

    ``grouped_materials`` is read from the JSON companion file in
    ``EXCEL_DIR``. ``excel_file_path`` holds the *Excel* file name, so the JSON
    name is derived from it (same stem, ``.json`` suffix), with
    ``<request_no>.json`` as a fallback. Reading is best effort: any problem
    with the file results in an empty grouping, never in an error.

    Returns:
        dict with ``success``, ``materials``, ``grouped_materials`` and
        ``count`` (number of groups when grouping exists, otherwise number of
        materials).

    Raises:
        HTTPException(500): if the database query or the serialisation fails.
    """

    def safe_str(value):
        # Tolerant string conversion: None -> '', bytes decoded as UTF-8.
        if value is None:
            return ''
        try:
            if isinstance(value, bytes):
                return value.decode('utf-8', errors='ignore')
            return str(value)
        except Exception:
            return str(value) if value else ''

    def load_grouped_materials(request_no, stored_name):
        # Candidate JSON file names, most specific first. basename() keeps the
        # lookup inside EXCEL_DIR even if a stored value contains separators.
        candidates = []
        if stored_name:
            base_name = os.path.basename(str(stored_name))
            stem, extension = os.path.splitext(base_name)
            # Rows that already store the JSON file name are used as-is.
            candidates.append(base_name if extension.lower() == ".json" else f"{stem}.json")
        if request_no:
            fallback = f"{os.path.basename(str(request_no))}.json"
            if fallback not in candidates:
                candidates.append(fallback)

        for candidate in candidates:
            json_path = os.path.join(EXCEL_DIR, candidate)
            if not os.path.isfile(json_path):
                continue
            try:
                with open(json_path, 'r', encoding='utf-8', errors='ignore') as f:
                    data = json.load(f)
            except (OSError, ValueError) as e:
                # Unreadable / malformed file: ignore and try the next one.
                logger.warning(f"⚠️ JSON 파일 읽기 오류 (무시): {e}")
                continue
            if isinstance(data, dict):
                return data.get("grouped_materials", []) or []
        return []

    try:
        # Look up the request to locate its JSON companion file.
        info_query = text("""
            SELECT request_no, excel_file_path
            FROM purchase_requests
            WHERE request_id = :request_id
        """)
        info_result = db.execute(info_query, {"request_id": request_id}).fetchone()
        grouped_materials = []
        if info_result:
            grouped_materials = load_grouped_materials(
                info_result.request_no, info_result.excel_file_path
            )

        # Individual material rows with their category-specific detail tables.
        query = text("""
            SELECT
                pri.item_id,
                pri.material_id,
                pri.quantity as requested_quantity,
                pri.unit as requested_unit,
                pri.user_requirement,
                pri.is_ordered,
                pri.is_received,
                m.original_description,
                m.classified_category,
                m.size_spec,
                m.main_nom,
                m.red_nom,
                m.schedule,
                m.material_grade,
                m.full_material_grade,
                m.quantity as original_quantity,
                m.unit as original_unit,
                m.classification_details,
                pd.outer_diameter, pd.schedule as pipe_schedule, pd.material_spec, pd.manufacturing_method,
                pd.end_preparation, pd.length_mm,
                fd.fitting_type, fd.fitting_subtype, fd.connection_method as fitting_connection,
                fd.pressure_rating as fitting_pressure, fd.schedule as fitting_schedule,
                fld.flange_type, fld.facing_type,
                fld.pressure_rating as flange_pressure,
                gd.gasket_type, gd.gasket_subtype, gd.material_type as gasket_material,
                gd.filler_material, gd.pressure_rating as gasket_pressure, gd.thickness as gasket_thickness,
                bd.bolt_type, bd.material_standard as bolt_material, bd.length as bolt_length
            FROM purchase_request_items pri
            JOIN materials m ON pri.material_id = m.id
            LEFT JOIN pipe_details pd ON m.id = pd.material_id
            LEFT JOIN fitting_details fd ON m.id = fd.material_id
            LEFT JOIN flange_details fld ON m.id = fld.material_id
            LEFT JOIN gasket_details gd ON m.id = gd.material_id
            LEFT JOIN bolt_details bd ON m.id = bd.material_id
            WHERE pri.request_id = :request_id
            ORDER BY m.classified_category, m.original_description
        """)
        results = db.execute(query, {"request_id": request_id}).fetchall()

        materials = []
        for row in results:
            try:
                # Requested quantity wins over the BOM quantity; stored as a
                # whole number (fraction dropped).
                qty = row.requested_quantity or row.original_quantity
                try:
                    qty_int = int(float(qty)) if qty else 0
                except (ValueError, TypeError, OverflowError):
                    qty_int = 0
                original_description = safe_str(row.original_description)
                size_spec = safe_str(row.size_spec)
                material_grade = safe_str(row.material_grade)
                full_material_grade = safe_str(row.full_material_grade)
                user_requirement = safe_str(row.user_requirement)
            except Exception:
                # Fall back to neutral defaults rather than failing the list.
                qty_int = 0
                original_description = ''
                size_spec = ''
                material_grade = ''
                full_material_grade = ''
                user_requirement = ''

            # Same shape as the BOM page uses.
            material_dict = {
                "item_id": row.item_id,
                "material_id": row.material_id,
                "id": row.material_id,
                "original_description": original_description,
                "classified_category": safe_str(row.classified_category),
                "size_spec": size_spec,
                "size_inch": safe_str(row.main_nom),
                "main_nom": safe_str(row.main_nom),
                "red_nom": safe_str(row.red_nom),
                "schedule": safe_str(row.schedule),
                "material_grade": material_grade,
                "full_material_grade": full_material_grade,
                "quantity": qty_int,
                "unit": safe_str(row.requested_unit or row.original_unit),
                "user_requirement": user_requirement,
                "is_ordered": row.is_ordered,
                "is_received": row.is_received,
                "classification_details": safe_str(row.classification_details)
            }

            # Attach the detail block that matches the material's category.
            if row.classified_category == 'PIPE' and row.manufacturing_method:
                material_dict["pipe_details"] = {
                    "manufacturing_method": safe_str(row.manufacturing_method),
                    "schedule": safe_str(row.pipe_schedule),
                    "material_spec": safe_str(row.material_spec),
                    "end_preparation": safe_str(row.end_preparation),
                    "length_mm": row.length_mm
                }
            elif row.classified_category == 'FITTING' and row.fitting_type:
                material_dict["fitting_details"] = {
                    "fitting_type": safe_str(row.fitting_type),
                    "fitting_subtype": safe_str(row.fitting_subtype),
                    "connection_method": safe_str(row.fitting_connection),
                    "pressure_rating": safe_str(row.fitting_pressure),
                    "schedule": safe_str(row.fitting_schedule)
                }
            elif row.classified_category == 'FLANGE' and row.flange_type:
                material_dict["flange_details"] = {
                    "flange_type": safe_str(row.flange_type),
                    "facing_type": safe_str(row.facing_type),
                    "pressure_rating": safe_str(row.flange_pressure)
                }
            elif row.classified_category == 'GASKET' and row.gasket_type:
                material_dict["gasket_details"] = {
                    "gasket_type": safe_str(row.gasket_type),
                    "gasket_subtype": safe_str(row.gasket_subtype),
                    "material_type": safe_str(row.gasket_material),
                    "filler_material": safe_str(row.filler_material),
                    "pressure_rating": safe_str(row.gasket_pressure),
                    "thickness": safe_str(row.gasket_thickness)
                }
            elif row.classified_category == 'BOLT' and row.bolt_type:
                material_dict["bolt_details"] = {
                    "bolt_type": safe_str(row.bolt_type),
                    "material_standard": safe_str(row.bolt_material),
                    "length": safe_str(row.bolt_length)
                }
            materials.append(material_dict)

        return {
            "success": True,
            "materials": materials,
            "grouped_materials": grouped_materials,  # grouping information
            "count": len(grouped_materials) if grouped_materials else len(materials)
        }
    except Exception as e:
        logger.error(f"Failed to get request materials: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 자재 조회 실패: {str(e)}"
        )
@router.get("/requested-materials")
async def get_requested_material_ids(
    file_id: Optional[int] = None,
    job_no: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    Return the distinct IDs of materials that already belong to a purchase
    request (the BOM page uses them to disable those rows).

    ``file_id`` and ``job_no`` are optional filters on the owning purchase
    request; a filter left as ``None`` is not applied.

    Raises:
        HTTPException(500): if the query fails.
    """
    try:
        lookup_sql = text("""
            SELECT DISTINCT pri.material_id
            FROM purchase_request_items pri
            JOIN purchase_requests pr ON pri.request_id = pr.request_id
            WHERE 1=1
                AND (:file_id IS NULL OR pr.file_id = :file_id)
                AND (:job_no IS NULL OR pr.job_no = :job_no)
        """)
        filters = {
            "file_id": file_id,
            "job_no": job_no
        }
        material_ids = []
        for record in db.execute(lookup_sql, filters).fetchall():
            material_ids.append(record.material_id)
        payload = {
            "success": True,
            "requested_material_ids": material_ids,
            "count": len(material_ids)
        }
        return payload
    except Exception as e:
        logger.error(f"Failed to get requested material IDs: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 자재 ID 조회 실패: {str(e)}"
        )
@router.patch("/{request_id}/title")
async def update_request_title(
    request_id: int,
    title: str = Body(..., embed=True),
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Rename a purchase request. The title is stored in the ``request_no``
    column, so this overwrites the request number.

    Returns:
        dict with ``success``, ``message``, ``old_title`` and ``new_title``.

    Raises:
        HTTPException(404): if no purchase request has this ``request_id``.
        HTTPException(500): if the update fails (the transaction is rolled back).
    """
    try:
        # The current title is needed both as existence check and for the reply.
        current_row = db.execute(
            text("""
                SELECT request_no FROM purchase_requests
                WHERE request_id = :request_id
            """),
            {"request_id": request_id},
        ).fetchone()
        if current_row is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="구매신청을 찾을 수 없습니다"
            )

        rename_sql = text("""
            UPDATE purchase_requests
            SET request_no = :title
            WHERE request_id = :request_id
        """)
        rename_params = {"request_id": request_id, "title": title}
        db.execute(rename_sql, rename_params)
        db.commit()

        return {
            "success": True,
            "message": "구매신청 제목이 업데이트되었습니다",
            "old_title": current_row.request_no,
            "new_title": title
        }
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) pass through untouched.
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to update request title: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 제목 업데이트 실패: {str(e)}"
        )
@router.get("/{request_id}/download-excel")
async def download_request_excel(
    request_id: int,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Download the Excel file stored for a purchase request (the file exactly as
    it was generated / uploaded).

    Returns:
        FileResponse streaming the file as ``<job_no>_<request_no>.xlsx``.

    Raises:
        HTTPException(404): if the purchase request does not exist, has no
            Excel file recorded, or the recorded file is missing on disk.
        HTTPException(500): on any unexpected failure.
    """
    try:
        # Look up the request and the name of its stored Excel file.
        query = text("""
            SELECT request_no, excel_file_path, job_no
            FROM purchase_requests
            WHERE request_id = :request_id
        """)
        result = db.execute(query, {"request_id": request_id}).fetchone()
        if not result:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="구매신청을 찾을 수 없습니다"
            )

        # A NULL/empty excel_file_path would make os.path.join() raise
        # TypeError (-> 500) or resolve to the directory itself, so it is
        # treated like a missing file. isfile() also rejects directories.
        stored_name = result.excel_file_path
        excel_file_path = os.path.join(EXCEL_DIR, stored_name) if stored_name else None
        if not excel_file_path or not os.path.isfile(excel_file_path):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="엑셀 파일을 찾을 수 없습니다"
            )

        # Serve the stored file directly.
        return FileResponse(
            path=excel_file_path,
            filename=f"{result.job_no}_{result.request_no}.xlsx",
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to download request excel: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"엑셀 다운로드 실패: {str(e)}"
        )
def save_materials_data(materials_data: List[Dict], file_path: str, request_no: str, job_no: Optional[str], grouped_materials: Optional[List[Dict]] = None):
    """
    Save material data as a JSON file (so the frontend can rebuild the Excel
    file in the same format later).

    Args:
        materials_data: material dicts; each is shallow-copied, never mutated.
        file_path: destination path of the JSON file (overwritten if present).
        request_no: purchase request number stored in the file.
        job_no: job number stored in the file (may be ``None``).
        grouped_materials: optional grouped material dicts, stored under
            ``grouped_materials`` (empty list when omitted).

    A ``quantity`` key, where present, is truncated to an ``int``; values that
    cannot be converted (non-numeric text, ``None``, NaN, infinities) become 0.

    Raises:
        OSError: if the file cannot be written.
    """

    def with_int_quantity(record: Dict) -> Dict:
        # Shallow copy so the caller's dict keeps its original quantity.
        cleaned = record.copy()
        if 'quantity' in cleaned:
            try:
                cleaned['quantity'] = int(float(cleaned['quantity']))
            except (ValueError, TypeError, OverflowError):
                # OverflowError: int(float("inf")) - treat like any bad value.
                cleaned['quantity'] = 0
        return cleaned

    data_to_save = {
        "request_no": request_no,
        "job_no": job_no,
        "created_at": datetime.now().isoformat(),
        "materials": [with_int_quantity(m) for m in (materials_data or [])],
        "grouped_materials": [with_int_quantity(g) for g in (grouped_materials or [])]
    }
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data_to_save, f, ensure_ascii=False, indent=2)
def create_excel_file(materials_data: List[Dict], file_path: str, request_no: str, job_no: str):
    """
    Build an Excel workbook from material data (same layout as the BOM page)
    and save it to ``file_path``.

    Materials are grouped by their ``category`` key (``'UNKNOWN'`` when the key
    is missing or empty) and every category gets its own sheet with a styled
    header row. When there are no materials at all, a single header-only sheet
    is written, because a workbook without sheets cannot be saved.

    Args:
        materials_data: material (or grouped material) dicts.
        file_path: destination ``.xlsx`` path.
        request_no: accepted for interface compatibility; not used here.
        job_no: accepted for interface compatibility; not used here.
    """
    import openpyxl
    from openpyxl.styles import Font, PatternFill, Alignment, Border, Side

    # Header layout: the due date belongs in column P (the 16th column).
    due_date_header = '납기일(YYYY-MM-DD)'
    headers = ['TAGNO', '품목명', '수량', '통화구분', '단가', '크기', '압력등급', '스케줄',
               '재질', '상세내역', '사용자요구', '관리항목1', '관리항목2', '관리항목3',
               '관리항목4', due_date_header, '관리항목5', '관리항목6', '관리항목7',
               '관리항목8', '관리항목9', '관리항목10']
    due_date_index = headers.index(due_date_header)
    due_date = datetime.now().strftime('%Y-%m-%d')

    # Header styling, built once and shared by every header cell.
    header_font = Font(bold=True, color="000000", size=12, name="맑은 고딕")
    header_fill = PatternFill(start_color="B3D9FF", end_color="B3D9FF", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center")
    thin_side = Side(style="thin", color="666666")
    header_border = Border(top=thin_side, bottom=thin_side, left=thin_side, right=thin_side)

    def safe_sheet_title(category) -> str:
        # Sheet titles may not contain \ / * ? : [ ] and Excel limits them to
        # 31 characters; openpyxl raises on the invalid characters.
        cleaned = "".join("_" if ch in '\\/*?:[]' else ch for ch in str(category))
        return cleaned[:31] or 'UNKNOWN'

    def write_sheet(category, items):
        ws = wb.create_sheet(title=safe_sheet_title(category))

        for col, header in enumerate(headers, 1):
            cell = ws.cell(row=1, column=col, value=header)
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = header_alignment
            cell.border = header_border

        for row_idx, material in enumerate(items, 2):
            row_values = [
                '',                                   # TAGNO
                category,                             # item name
                material.get('quantity', 0),          # quantity
                'KRW',                                # currency
                1,                                    # unit price
                material.get('size', '-'),            # size
                '-',                                  # pressure rating (to be improved)
                material.get('schedule', '-'),        # schedule
                material.get('material_grade', '-'),  # material grade
                '-',                                  # details (to be improved)
                material.get('user_requirement', ''), # user requirement
            ]
            # Pad the management columns so the date lands exactly under the
            # due-date header instead of one column to its right.
            row_values.extend([''] * (due_date_index - len(row_values)))
            row_values.append(due_date)
            for col, value in enumerate(row_values, 1):
                ws.cell(row=row_idx, column=col, value=value)

        # Fit each column to its longest value, clamped to 10..50 characters.
        for column in ws.columns:
            longest = max((len(str(cell.value)) for cell in column), default=0)
            ws.column_dimensions[column[0].column_letter].width = min(max(longest + 2, 10), 50)

    wb = openpyxl.Workbook()
    wb.remove(wb.active)  # drop the default sheet; one sheet per category follows

    # Group by category, keeping first-seen order.
    category_groups = {}
    for material in materials_data or []:
        category = material.get('category') or 'UNKNOWN'
        category_groups.setdefault(category, []).append(material)

    for category, items in category_groups.items():
        write_sheet(category, items)

    if not wb.worksheets:
        # Saving a workbook with zero sheets fails; emit a header-only sheet.
        write_sheet('UNKNOWN', [])

    wb.save(file_path)
@router.post("/upload-excel")
async def upload_request_excel(
    excel_file: UploadFile = File(...),
    request_id: int = Form(...),
    category: str = Form(...),
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Upload the Excel file for a purchase request (the original file generated
    on the BOM page) and record its name in ``excel_file_path``.

    The stored name is ``<job_no>_<request_no>_<timestamp>.xlsx``. Both
    ``job_no`` and ``request_no`` are sanitised first: ``request_no`` can be
    edited by users and may contain path separators or other characters that
    are not valid in a file name. ``category`` is accepted but not used.

    Returns:
        dict with ``success``, ``message`` and ``file_path`` (the stored name).

    Raises:
        HTTPException(404): if the purchase request does not exist.
        HTTPException(500): on any other failure; the transaction is rolled
            back and a file written by this call is removed.
    """

    def safe_filename_part(value) -> str:
        # Keep letters, digits, '-', '_' and '.'; replace everything else
        # (path separators included) so the result stays a single file name.
        cleaned = "".join(ch if (ch.isalnum() or ch in "-_.") else "_" for ch in str(value))
        return cleaned.strip(".") or "unnamed"

    file_path = None
    try:
        # Look up the purchase request.
        query = text("""
            SELECT request_no, job_no
            FROM purchase_requests
            WHERE request_id = :request_id
        """)
        result = db.execute(query, {"request_id": request_id}).fetchone()
        if not result:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="구매신청을 찾을 수 없습니다"
            )

        # Make sure the export directory exists.
        excel_dir = Path(EXCEL_DIR)
        excel_dir.mkdir(parents=True, exist_ok=True)

        # Build the stored file name.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_filename = (
            f"{safe_filename_part(result.job_no)}_"
            f"{safe_filename_part(result.request_no)}_{timestamp}.xlsx"
        )
        file_path = excel_dir / safe_filename

        # Save the uploaded content.
        content = await excel_file.read()
        with open(file_path, "wb") as f:
            f.write(content)

        # Point the purchase request at the new file.
        update_query = text("""
            UPDATE purchase_requests
            SET excel_file_path = :excel_file_path
            WHERE request_id = :request_id
        """)
        db.execute(update_query, {
            "excel_file_path": safe_filename,
            "request_id": request_id
        })
        db.commit()
        logger.info(f"엑셀 파일 업로드 완료: {safe_filename}")
        return {
            "success": True,
            "message": "엑셀 파일이 성공적으로 업로드되었습니다",
            "file_path": safe_filename
        }
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        # Do not leave a file behind that no purchase request refers to.
        if file_path is not None:
            try:
                os.remove(file_path)
            except OSError:
                # Best effort: the file may never have been created.
                pass
        logger.error(f"Failed to upload excel file: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"엑셀 파일 업로드 실패: {str(e)}"
        )