Files
TK-BOM-Project/backend/app/routers/purchase_request.py
Hyungi Ahn 72126ef78d fix: 구매신청 엑셀 수량 표시 개선 및 FLANGE 품목명 개선
- 구매신청 관리 페이지 수량을 정수로 표시 (3.000 EA → 3 EA)
- JSON 저장 시 수량 정수 변환
- FLANGE 품목명 세분화: WN RF, SO RF, ORIFICE FLANGE, SPECTACLE BLIND 등
- 구매관리 페이지 엑셀 다운로드 데이터 구조 개선
- 디버그 로그 추가
2025-10-14 15:29:01 +09:00

421 lines
16 KiB
Python

"""
구매신청 관리 API
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from typing import Optional, List, Dict
from datetime import datetime
import os
import json
from ..database import get_db
from ..auth.middleware import get_current_user
from ..utils.logger import get_logger
# Logger obtained from the project's logging helper, keyed by this module's name.
logger = get_logger(__name__)
# Every route declared in this file is served under the /purchase-request prefix.
router = APIRouter(prefix="/purchase-request", tags=["Purchase Request"])
# Directory (relative path) where purchase-request data files are stored.
EXCEL_DIR = "exports"
# Created at import time so later file writes do not fail on a missing directory.
os.makedirs(EXCEL_DIR, exist_ok=True)
class PurchaseRequestCreate(BaseModel):
    """Request body accepted by the purchase-request creation endpoint."""

    # Identifier stored in purchase_requests.file_id.
    file_id: int

    # Optional descriptive fields stored alongside the request.
    job_no: Optional[str] = None
    category: Optional[str] = None

    # IDs of the materials to attach to the request.
    material_ids: List[int] = []

    # Per-material payload; entry i is paired with material_ids[i] by the
    # creation endpoint ("quantity", "unit" and "user_requirement" are read).
    materials_data: List[Dict] = []

    # Grouped material information, persisted as-is for later re-download.
    grouped_materials: Optional[List[Dict]] = []
@router.post("/create")
async def create_purchase_request(
    request_data: PurchaseRequestCreate,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Create a purchase request (exporting to Excel doubles as requesting purchase).

    Steps:
      1. Build the request number ``PR-<YYYYMMDD>-<NNN>`` from today's date
         and the number of existing requests sharing that date prefix.
      2. Save the submitted material data as ``<request_no>.json`` under
         ``EXCEL_DIR`` so the same data can be downloaded again later.
      3. Insert the ``purchase_requests`` row.
      4. Insert one ``purchase_request_items`` row per material ID, skipping
         IDs that already appear in ``purchase_request_items``.
      5. Commit, then count the stored items as a sanity check.

    Args:
        request_data: Request body; ``materials_data[i]`` is paired with
            ``material_ids[i]`` (missing entries are treated as empty dicts).
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        A dict with ``success``, ``request_no``, ``request_id``,
        ``material_count`` (IDs submitted), ``inserted_count`` (rows inserted
        by this call), ``verified_count`` (rows counted after commit) and a
        user-facing ``message``.

    Raises:
        HTTPException: 500 carrying the error text when any step fails; the
            DB transaction is rolled back first. The JSON file written in
            step 2 is not removed on failure.
    """
    # NOTE(review): authentication is commented out in the signature, and
    # requested_by is hard-coded to 1 below — confirm this is intentional.
    try:
        # Request number generation.
        # NOTE(review): the sequence comes from COUNT(*), so two concurrent
        # requests can compute the same number — confirm a unique constraint
        # on request_no exists.
        today = datetime.now().strftime('%Y%m%d')
        count_query = text("""
            SELECT COUNT(*) as count
            FROM purchase_requests
            WHERE request_no LIKE :pattern
        """)
        count = db.execute(count_query, {"pattern": f"PR-{today}%"}).fetchone().count
        request_no = f"PR-{today}-{str(count + 1).zfill(3)}"

        # Persist the material data as JSON (used for later re-download).
        json_filename = f"{request_no}.json"
        json_path = os.path.join(EXCEL_DIR, json_filename)
        save_materials_data(
            request_data.materials_data,
            json_path,
            request_no,
            request_data.job_no,
            request_data.grouped_materials  # grouping info
        )

        # Create the purchase-request header row.
        insert_request = text("""
            INSERT INTO purchase_requests (
            request_no, file_id, job_no, category,
            material_count, excel_file_path, requested_by
            ) VALUES (
            :request_no, :file_id, :job_no, :category,
            :material_count, :excel_path, :requested_by
            ) RETURNING request_id
        """)
        result = db.execute(insert_request, {
            "request_no": request_no,
            "file_id": request_data.file_id,
            "job_no": request_data.job_no,
            "category": request_data.category,
            "material_count": len(request_data.material_ids),
            # Only the file name is stored; readers join it with EXCEL_DIR.
            "excel_path": json_filename,
            "requested_by": 1  # current_user.get("user_id")
        })
        request_id = result.fetchone().request_id

        # Store the per-material detail rows.
        logger.info(f"Processing {len(request_data.material_ids)} material IDs for purchase request {request_no}")
        logger.info(f"First 10 Material IDs: {request_data.material_ids[:10]}")  # log only the first 10
        logger.info(f"Category: {request_data.category}, Job: {request_data.job_no}")

        check_existing = text("""
            SELECT 1 FROM purchase_request_items
            WHERE material_id = :material_id
        """)
        insert_item = text("""
            INSERT INTO purchase_request_items (
            request_id, material_id, quantity, unit, user_requirement
            ) VALUES (
            :request_id, :material_id, :quantity, :unit, :user_requirement
            )
        """)

        inserted_count = 0
        for i, material_id in enumerate(request_data.material_ids):
            material_data = request_data.materials_data[i] if i < len(request_data.materials_data) else {}

            # Skip materials that already belong to any purchase request.
            existing = db.execute(check_existing, {"material_id": material_id}).fetchone()
            if existing:
                logger.warning(f"Material {material_id} already in another purchase request, skipping")
                continue

            # Convert quantity to an integer (fractional part is dropped).
            quantity_str = str(material_data.get("quantity", 0))
            try:
                quantity = int(float(quantity_str))
            except (ValueError, TypeError, OverflowError):
                # OverflowError covers infinite values (e.g. "inf", 1e999),
                # which float() accepts but int() cannot represent; treat
                # them like any other unusable quantity.
                quantity = 0

            db.execute(insert_item, {
                "request_id": request_id,
                "material_id": material_id,
                "quantity": quantity,
                "unit": material_data.get("unit", ""),
                "user_requirement": material_data.get("user_requirement", "")
            })
            inserted_count += 1

        db.commit()
        logger.info(f"Purchase request created: {request_no} with {inserted_count} materials (out of {len(request_data.material_ids)} requested)")

        # Verify how many items were actually stored.
        verify_query = text("""
            SELECT COUNT(*) as count FROM purchase_request_items WHERE request_id = :request_id
        """)
        verified_count = db.execute(verify_query, {"request_id": request_id}).fetchone().count
        logger.info(f"✅ DB 검증: purchase_request_items에 {verified_count}개 저장됨")

        return {
            "success": True,
            "request_no": request_no,
            "request_id": request_id,
            "material_count": len(request_data.material_ids),
            "inserted_count": inserted_count,
            "verified_count": verified_count,
            "message": f"구매신청 {request_no}이 생성되었습니다"
        }
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to create purchase request: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"구매신청 생성 실패: {str(e)}"
        )
@router.get("/list")
async def get_purchase_requests(
    file_id: Optional[int] = None,
    job_no: Optional[str] = None,
    status: Optional[str] = None,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    List purchase requests, newest first.

    Args:
        file_id: When given, only requests for this file are returned.
        job_no: When given, only requests for this job number are returned.
        status: When given, only requests with this status are returned.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        A dict with ``success``, ``requests`` (one dict per request, including
        the requester name, source file name, job name and the number of
        stored items) and ``count``.

    Raises:
        HTTPException: 500 carrying the error text when the query fails.
    """
    # The ``status`` query parameter shadows the module-level ``fastapi.status``
    # import inside this function, so ``status.HTTP_500_...`` would raise
    # AttributeError in the error handler. Re-import the constants under a
    # different local name instead of renaming the public query parameter.
    from fastapi import status as http_status

    try:
        query = text("""
            SELECT
            pr.request_id,
            pr.request_no,
            pr.file_id,
            pr.job_no,
            pr.category,
            pr.material_count,
            pr.excel_file_path,
            pr.requested_at,
            pr.status,
            u.name as requested_by,
            f.original_filename,
            j.job_name,
            COUNT(pri.item_id) as item_count
            FROM purchase_requests pr
            LEFT JOIN users u ON pr.requested_by = u.user_id
            LEFT JOIN files f ON pr.file_id = f.id
            LEFT JOIN jobs j ON pr.job_no = j.job_no
            LEFT JOIN purchase_request_items pri ON pr.request_id = pri.request_id
            WHERE 1=1
            AND (:file_id IS NULL OR pr.file_id = :file_id)
            AND (:job_no IS NULL OR pr.job_no = :job_no)
            AND (:status IS NULL OR pr.status = :status)
            GROUP BY
            pr.request_id, pr.request_no, pr.file_id, pr.job_no,
            pr.category, pr.material_count, pr.excel_file_path,
            pr.requested_at, pr.status, u.name, f.original_filename, j.job_name
            ORDER BY pr.requested_at DESC
        """)
        # Each filter is disabled by binding NULL (None) to its parameter.
        results = db.execute(query, {
            "file_id": file_id,
            "job_no": job_no,
            "status": status
        }).fetchall()

        requests = [
            {
                "request_id": row.request_id,
                "request_no": row.request_no,
                "file_id": row.file_id,
                "job_no": row.job_no,
                "job_name": row.job_name,
                "category": row.category,
                "material_count": row.material_count,
                "item_count": row.item_count,
                "excel_file_path": row.excel_file_path,
                "requested_at": row.requested_at.isoformat() if row.requested_at else None,
                "status": row.status,
                "requested_by": row.requested_by,
                "source_file": row.original_filename
            }
            for row in results
        ]
        return {
            "success": True,
            "requests": requests,
            "count": len(requests)
        }
    except Exception as e:
        logger.error(f"Failed to get purchase requests: {str(e)}")
        raise HTTPException(
            status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"구매신청 목록 조회 실패: {str(e)}"
        )
@router.get("/{request_id}/materials")
async def get_request_materials(
    request_id: int,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Return the materials attached to a purchase request, with grouping info.

    The grouping info comes from the JSON file saved when the request was
    created (if the file still exists); the individual rows come from
    ``purchase_request_items`` joined with ``materials``.

    Returns a dict with ``success``, ``materials``, ``grouped_materials`` and
    ``count`` (number of groups when grouping info is available, otherwise
    the number of individual materials).

    Raises HTTPException 500 carrying the error text on any failure.
    """
    def _whole_number(value):
        # Empty/zero values and unparsable values both become 0; otherwise
        # the fractional part is dropped.
        if not value:
            return 0
        try:
            return int(float(value))
        except (ValueError, TypeError):
            return 0

    try:
        # Look up the stored JSON file name for this request.
        header_sql = text("""
            SELECT excel_file_path
            FROM purchase_requests
            WHERE request_id = :request_id
        """)
        header = db.execute(header_sql, {"request_id": request_id}).fetchone()

        grouped_materials = []
        stored_name = header.excel_file_path if header else None
        if stored_name:
            json_path = os.path.join(EXCEL_DIR, stored_name)
            if os.path.exists(json_path):
                with open(json_path, 'r', encoding='utf-8') as f:
                    payload = json.load(f)
                grouped_materials = payload.get("grouped_materials", [])

        # Individual material rows.
        items_sql = text("""
            SELECT
            pri.item_id,
            pri.material_id,
            pri.quantity as requested_quantity,
            pri.unit as requested_unit,
            pri.user_requirement,
            pri.is_ordered,
            pri.is_received,
            m.original_description,
            m.classified_category,
            m.size_spec,
            m.schedule,
            m.material_grade,
            m.quantity as original_quantity,
            m.unit as original_unit
            FROM purchase_request_items pri
            JOIN materials m ON pri.material_id = m.id
            WHERE pri.request_id = :request_id
            ORDER BY m.classified_category, m.original_description
        """)
        item_rows = db.execute(items_sql, {"request_id": request_id}).fetchall()

        materials = [
            {
                "item_id": item.item_id,
                "material_id": item.material_id,
                "description": item.original_description,
                "category": item.classified_category,
                "size": item.size_spec,
                "schedule": item.schedule,
                "material_grade": item.material_grade,
                # The requested quantity wins; the material's own quantity is the fallback.
                "quantity": _whole_number(item.requested_quantity or item.original_quantity),
                "unit": item.requested_unit or item.original_unit,
                "user_requirement": item.user_requirement,
                "is_ordered": item.is_ordered,
                "is_received": item.is_received
            }
            for item in item_rows
        ]

        return {
            "success": True,
            "materials": materials,
            "grouped_materials": grouped_materials,  # grouping info
            "count": len(grouped_materials or materials)
        }
    except Exception as e:
        logger.error(f"Failed to get request materials: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"구매신청 자재 조회 실패: {str(e)}"
        )
@router.get("/{request_id}/download-excel")
async def download_request_excel(
    request_id: int,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Return the saved material data of a purchase request.

    Despite the route name, this returns the JSON data written at creation
    time; the frontend builds the Excel file from it.

    Args:
        request_id: Primary key of the purchase request.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        A dict with ``success``, ``request_no``, ``job_no``, ``materials``
        and ``grouped_materials``.

    Raises:
        HTTPException: 404 when the request does not exist, or when its data
            file is not recorded or no longer exists on disk; 500 carrying
            the error text for any other failure.
    """
    try:
        # Look up the purchase request.
        query = text("""
            SELECT request_no, excel_file_path, job_no
            FROM purchase_requests
            WHERE request_id = :request_id
        """)
        result = db.execute(query, {"request_id": request_id}).fetchone()
        if not result:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="구매신청을 찾을 수 없습니다"
            )

        # A missing (NULL/empty) stored file name is reported as "data file
        # not found" rather than letting os.path.join raise TypeError, which
        # would surface as a 500.
        stored_name = result.excel_file_path
        file_path = os.path.join(EXCEL_DIR, stored_name) if stored_name else None
        if file_path is None or not os.path.exists(file_path):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="데이터 파일을 찾을 수 없습니다"
            )

        # Read the saved JSON payload.
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        return {
            "success": True,
            "request_no": result.request_no,
            "job_no": result.job_no,
            "materials": data.get("materials", []),
            "grouped_materials": data.get("grouped_materials", [])  # grouping info is returned too
        }
    except HTTPException:
        # Let the deliberate 404s through untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to download request excel: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"엑셀 다운로드 실패: {str(e)}"
        )
def _quantity_to_int(value) -> int:
    """Return *value* truncated to an int, or 0 when it cannot be converted."""
    try:
        return int(float(value))
    except (ValueError, TypeError, OverflowError):
        # OverflowError: float() accepts infinite values ("inf", 1e999) that
        # int() cannot represent; treat them like any other bad quantity.
        return 0


def _with_integer_quantity(records: Optional[List[Dict]]) -> List[Dict]:
    """Return shallow copies of *records* whose 'quantity', if present, is an int."""
    cleaned = []
    for record in records or []:
        entry = record.copy()  # leave the caller's dicts untouched
        if 'quantity' in entry:
            entry['quantity'] = _quantity_to_int(entry['quantity'])
        cleaned.append(entry)
    return cleaned


def save_materials_data(materials_data: List[Dict], file_path: str, request_no: str, job_no: Optional[str], grouped_materials: Optional[List[Dict]] = None):
    """
    Save material data as JSON so the frontend can rebuild the same Excel file.

    Quantities of both the individual and the grouped materials are stored as
    integers (fractional part dropped; unusable values become 0). Entries
    without a 'quantity' key are stored unchanged.

    Args:
        materials_data: Individual material dicts.
        file_path: Destination path of the JSON file (overwritten if present).
        request_no: Purchase request number recorded in the file.
        job_no: Job number recorded in the file; may be None.
        grouped_materials: Optional grouped material dicts.

    The written object has the keys ``request_no``, ``job_no``,
    ``created_at`` (ISO timestamp of the write), ``materials`` and
    ``grouped_materials``.
    """
    data_to_save = {
        "request_no": request_no,
        "job_no": job_no,
        "created_at": datetime.now().isoformat(),
        "materials": _with_integer_quantity(materials_data),
        "grouped_materials": _with_integer_quantity(grouped_materials)
    }
    with open(file_path, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps non-ASCII text readable in the file.
        json.dump(data_to_save, f, ensure_ascii=False, indent=2)