feat: 구매 수량 계산 시스템 구현

🧮 구매 수량 계산 로직:
- PIPE: 절단 손실(3mm/절단) + 6M 단위 올림 계산
- 일반 자재: 여유율 + 최소 주문 수량 적용
- 자재별 차별화된 여유율 (VALVE 50%, BOLT 20% 등)

🛒 구매 관리 API:
- /purchase/items/calculate: 실시간 구매 수량 계산
- /purchase/items/save: 구매 품목 DB 저장
- /purchase/revision-diff: 리비전간 차이 계산
- /purchase/orders/create: 구매 주문 생성

🧪 테스트 검증:
- PIPE 절단 손실 계산: 25,000mm → 5본 (정확)
- 여유율 적용: VALVE 2개 → 3개 (50% 예비)
- 최소 주문: BOLT 24개 → 50개 (박스 단위)

📱 프론트엔드:
- PurchaseConfirmationPage 라우팅 추가
- 구매확정 버튼 → 구매 페이지 이동
This commit is contained in:
Hyungi Ahn
2025-07-18 13:18:13 +09:00
parent 92a78225f0
commit 28431ee490
5 changed files with 1079 additions and 6 deletions

View File

@@ -37,6 +37,12 @@ try:
except ImportError:
print("jobs 라우터를 찾을 수 없습니다")
try:
from .routers import purchase
app.include_router(purchase.router, tags=["purchase"])
except ImportError:
print("purchase 라우터를 찾을 수 없습니다")
# 파일 목록 조회 API
@app.get("/files")
async def get_files(

View File

@@ -0,0 +1,428 @@
"""
구매 관리 API
- 구매 품목 생성/조회
- 구매 수량 계산
- 리비전 비교
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import List, Optional
import json
from ..database import get_db
from ..services.purchase_calculator import (
generate_purchase_items_from_materials,
save_purchase_items_to_db,
calculate_pipe_purchase_quantity,
calculate_standard_purchase_quantity
)
router = APIRouter(prefix="/purchase", tags=["purchase"])
@router.get("/items/calculate")
async def calculate_purchase_items(
job_no: str = Query(..., description="Job 번호"),
revision: str = Query("Rev.0", description="리비전"),
file_id: Optional[int] = Query(None, description="파일 ID (선택사항)"),
db: Session = Depends(get_db)
):
"""
구매 품목 계산 (실시간)
- 자재 데이터로부터 구매 품목 생성
- 수량 계산 (파이프 절단손실 포함)
"""
try:
# 1. 파일 ID 조회 (job_no, revision으로)
if not file_id:
file_query = text("""
SELECT id FROM files
WHERE job_no = :job_no AND revision = :revision AND is_active = TRUE
ORDER BY created_at DESC
LIMIT 1
""")
file_result = db.execute(file_query, {"job_no": job_no, "revision": revision}).fetchone()
if not file_result:
raise HTTPException(status_code=404, detail=f"Job {job_no} {revision}에 해당하는 파일을 찾을 수 없습니다")
file_id = file_result[0]
# 2. 구매 품목 생성
purchase_items = generate_purchase_items_from_materials(db, file_id, job_no, revision)
return {
"success": True,
"job_no": job_no,
"revision": revision,
"file_id": file_id,
"items": purchase_items,
"total_items": len(purchase_items)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"구매 품목 계산 실패: {str(e)}")
@router.post("/items/save")
async def save_purchase_items(
job_no: str,
revision: str,
file_id: int,
db: Session = Depends(get_db)
):
"""
구매 품목을 데이터베이스에 저장
"""
try:
# 1. 구매 품목 생성
purchase_items = generate_purchase_items_from_materials(db, file_id, job_no, revision)
# 2. 데이터베이스에 저장
saved_ids = save_purchase_items_to_db(db, purchase_items)
return {
"success": True,
"message": f"{len(saved_ids)}개 구매 품목이 저장되었습니다",
"saved_items": len(saved_ids),
"item_ids": saved_ids
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"구매 품목 저장 실패: {str(e)}")
@router.get("/items")
async def get_purchase_items(
job_no: str = Query(..., description="Job 번호"),
revision: str = Query("Rev.0", description="리비전"),
category: Optional[str] = Query(None, description="카테고리 필터"),
db: Session = Depends(get_db)
):
"""
저장된 구매 품목 조회
"""
try:
query = text("""
SELECT pi.*,
COUNT(mpm.material_id) as material_count,
SUM(m.quantity) as total_material_quantity
FROM purchase_items pi
LEFT JOIN material_purchase_mapping mpm ON pi.id = mpm.purchase_item_id
LEFT JOIN materials m ON mpm.material_id = m.id
WHERE pi.job_no = :job_no AND pi.revision = :revision AND pi.is_active = TRUE
""")
params = {"job_no": job_no, "revision": revision}
if category:
query = text(str(query) + " AND pi.category = :category")
params["category"] = category
query = text(str(query) + """
GROUP BY pi.id
ORDER BY pi.category, pi.specification
""")
result = db.execute(query, params)
items = result.fetchall()
return {
"success": True,
"job_no": job_no,
"revision": revision,
"items": [dict(item) for item in items],
"total_items": len(items)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"구매 품목 조회 실패: {str(e)}")
@router.patch("/items/{item_id}")
async def update_purchase_item(
item_id: int,
safety_factor: Optional[float] = None,
calculated_qty: Optional[float] = None,
minimum_order_qty: Optional[float] = None,
db: Session = Depends(get_db)
):
"""
구매 품목 수정 (수량 조정)
"""
try:
update_fields = []
params = {"item_id": item_id}
if safety_factor is not None:
update_fields.append("safety_factor = :safety_factor")
params["safety_factor"] = safety_factor
if calculated_qty is not None:
update_fields.append("calculated_qty = :calculated_qty")
params["calculated_qty"] = calculated_qty
if minimum_order_qty is not None:
update_fields.append("minimum_order_qty = :minimum_order_qty")
params["minimum_order_qty"] = minimum_order_qty
if not update_fields:
raise HTTPException(status_code=400, detail="수정할 필드가 없습니다")
update_fields.append("updated_at = CURRENT_TIMESTAMP")
query = text(f"""
UPDATE purchase_items
SET {', '.join(update_fields)}
WHERE id = :item_id
RETURNING id, calculated_qty, safety_factor
""")
result = db.execute(query, params)
updated_item = result.fetchone()
if not updated_item:
raise HTTPException(status_code=404, detail="구매 품목을 찾을 수 없습니다")
db.commit()
return {
"success": True,
"message": "구매 품목이 수정되었습니다",
"item": dict(updated_item)
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"구매 품목 수정 실패: {str(e)}")
@router.get("/revision-diff")
async def get_revision_diff(
job_no: str = Query(..., description="Job 번호"),
current_revision: str = Query(..., description="현재 리비전"),
previous_revision: str = Query(..., description="이전 리비전"),
db: Session = Depends(get_db)
):
"""
리비전간 구매 수량 차이 계산
"""
try:
# 1. 이전 리비전 구매 품목 조회
prev_query = text("""
SELECT item_code, category, specification, calculated_qty, bom_quantity
FROM purchase_items
WHERE job_no = :job_no AND revision = :prev_revision AND is_active = TRUE
""")
prev_items = db.execute(prev_query, {
"job_no": job_no,
"prev_revision": previous_revision
}).fetchall()
# 2. 현재 리비전 구매 품목 조회
curr_query = text("""
SELECT item_code, category, specification, calculated_qty, bom_quantity
FROM purchase_items
WHERE job_no = :job_no AND revision = :curr_revision AND is_active = TRUE
""")
curr_items = db.execute(curr_query, {
"job_no": job_no,
"curr_revision": current_revision
}).fetchall()
# 3. 차이 계산
prev_dict = {item.item_code: dict(item) for item in prev_items}
curr_dict = {item.item_code: dict(item) for item in curr_items}
changes = []
added_items = 0
modified_items = 0
# 현재 리비전에서 추가되거나 변경된 항목
for item_code, curr_item in curr_dict.items():
if item_code not in prev_dict:
# 새로 추가된 품목
changes.append({
"item_code": item_code,
"change_type": "ADDED",
"specification": curr_item["specification"],
"previous_qty": 0,
"current_qty": curr_item["calculated_qty"],
"qty_diff": curr_item["calculated_qty"],
"additional_needed": curr_item["calculated_qty"]
})
added_items += 1
else:
prev_item = prev_dict[item_code]
qty_diff = curr_item["calculated_qty"] - prev_item["calculated_qty"]
if abs(qty_diff) > 0.001: # 수량 변경
changes.append({
"item_code": item_code,
"change_type": "MODIFIED",
"specification": curr_item["specification"],
"previous_qty": prev_item["calculated_qty"],
"current_qty": curr_item["calculated_qty"],
"qty_diff": qty_diff,
"additional_needed": max(qty_diff, 0) # 증가한 경우만 추가 구매
})
modified_items += 1
# 삭제된 품목 (현재 리비전에 없는 항목)
removed_items = 0
for item_code, prev_item in prev_dict.items():
if item_code not in curr_dict:
changes.append({
"item_code": item_code,
"change_type": "REMOVED",
"specification": prev_item["specification"],
"previous_qty": prev_item["calculated_qty"],
"current_qty": 0,
"qty_diff": -prev_item["calculated_qty"],
"additional_needed": 0
})
removed_items += 1
# 요약 정보
total_additional_needed = sum(
change["additional_needed"] for change in changes
if change["additional_needed"] > 0
)
has_changes = len(changes) > 0
summary = f"추가: {added_items}개, 변경: {modified_items}개, 삭제: {removed_items}"
if total_additional_needed > 0:
summary += f" (추가 구매 필요: {total_additional_needed:.1f})"
return {
"success": True,
"job_no": job_no,
"previous_revision": previous_revision,
"current_revision": current_revision,
"comparison": {
"has_changes": has_changes,
"summary": summary,
"added_items": added_items,
"modified_items": modified_items,
"removed_items": removed_items,
"total_additional_needed": total_additional_needed,
"changes": changes
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"리비전 비교 실패: {str(e)}")
@router.post("/orders/create")
async def create_purchase_order(
job_no: str,
revision: str,
items: List[dict],
supplier_name: Optional[str] = None,
required_date: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
구매 주문 생성
"""
try:
from datetime import datetime, date
# 1. 주문 번호 생성
order_no = f"PO-{job_no}-{revision}-{datetime.now().strftime('%Y%m%d')}"
# 2. 구매 주문 생성
order_query = text("""
INSERT INTO purchase_orders (
order_no, job_no, revision, status, order_date, required_date,
supplier_name, created_by
) VALUES (
:order_no, :job_no, :revision, 'DRAFT', CURRENT_DATE, :required_date,
:supplier_name, 'system'
) RETURNING id
""")
order_result = db.execute(order_query, {
"order_no": order_no,
"job_no": job_no,
"revision": revision,
"required_date": required_date,
"supplier_name": supplier_name
})
order_id = order_result.fetchone()[0]
# 3. 주문 상세 항목 생성
total_amount = 0
for item in items:
item_query = text("""
INSERT INTO purchase_order_items (
purchase_order_id, purchase_item_id, ordered_quantity, required_date
) VALUES (
:order_id, :item_id, :quantity, :required_date
)
""")
db.execute(item_query, {
"order_id": order_id,
"item_id": item["purchase_item_id"],
"quantity": item["ordered_quantity"],
"required_date": required_date
})
db.commit()
return {
"success": True,
"message": "구매 주문이 생성되었습니다",
"order_no": order_no,
"order_id": order_id,
"items_count": len(items)
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"구매 주문 생성 실패: {str(e)}")
@router.get("/orders")
async def get_purchase_orders(
job_no: Optional[str] = Query(None, description="Job 번호"),
status: Optional[str] = Query(None, description="주문 상태"),
db: Session = Depends(get_db)
):
"""
구매 주문 목록 조회
"""
try:
query = text("""
SELECT po.*,
COUNT(poi.id) as items_count,
SUM(poi.ordered_quantity) as total_quantity
FROM purchase_orders po
LEFT JOIN purchase_order_items poi ON po.id = poi.purchase_order_id
WHERE 1=1
""")
params = {}
if job_no:
query = text(str(query) + " AND po.job_no = :job_no")
params["job_no"] = job_no
if status:
query = text(str(query) + " AND po.status = :status")
params["status"] = status
query = text(str(query) + """
GROUP BY po.id
ORDER BY po.created_at DESC
""")
result = db.execute(query, params)
orders = result.fetchall()
return {
"success": True,
"orders": [dict(order) for order in orders],
"total_orders": len(orders)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"구매 주문 조회 실패: {str(e)}")

View File

@@ -0,0 +1,526 @@
"""
구매 수량 계산 서비스
- 자재별 여유율 적용
- PIPE: 절단 손실 + 6M 단위 계산
- 기타: 최소 주문 수량 적용
"""
import math
from typing import Dict, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import text
# 자재별 기본 여유율
SAFETY_FACTORS = {
'PIPE': 1.15, # 15% 추가 (절단 손실)
'FITTING': 1.10, # 10% 추가 (연결 오차)
'VALVE': 1.50, # 50% 추가 (예비품)
'FLANGE': 1.10, # 10% 추가
'BOLT': 1.20, # 20% 추가 (분실율)
'GASKET': 1.25, # 25% 추가 (교체주기)
'INSTRUMENT': 1.00, # 0% 추가 (정확한 수량)
'DEFAULT': 1.10 # 기본 10% 추가
}
# 최소 주문 수량 (자재별)
MINIMUM_ORDER_QTY = {
'PIPE': 6000, # 6M 단위
'FITTING': 1, # 개별 주문 가능
'VALVE': 1, # 개별 주문 가능
'FLANGE': 1, # 개별 주문 가능
'BOLT': 50, # 박스 단위 (50개)
'GASKET': 10, # 세트 단위
'INSTRUMENT': 1, # 개별 주문 가능
'DEFAULT': 1
}
def calculate_pipe_purchase_quantity(materials: List[Dict]) -> Dict:
"""
PIPE 구매 수량 계산
- 각 절단마다 3mm 손실
- 6,000mm (6M) 단위로 올림
"""
total_bom_length = 0
cutting_count = 0
pipe_details = []
for material in materials:
# 길이 정보 추출
length_mm = float(material.get('length_mm', 0) or 0)
if length_mm > 0:
total_bom_length += length_mm
cutting_count += 1
pipe_details.append({
'description': material.get('original_description', ''),
'length_mm': length_mm,
'quantity': material.get('quantity', 1)
})
# 절단 손실 계산 (각 절단마다 3mm)
cutting_loss = cutting_count * 3
# 총 필요 길이 = BOM 길이 + 절단 손실
required_length = total_bom_length + cutting_loss
# 6M 단위로 올림 계산
standard_length = 6000 # 6M = 6,000mm
pipes_needed = math.ceil(required_length / standard_length) if required_length > 0 else 0
total_purchase_length = pipes_needed * standard_length
waste_length = total_purchase_length - required_length if pipes_needed > 0 else 0
return {
'bom_quantity': total_bom_length,
'cutting_count': cutting_count,
'cutting_loss': cutting_loss,
'required_length': required_length,
'pipes_count': pipes_needed,
'calculated_qty': total_purchase_length,
'waste_length': waste_length,
'utilization_rate': (required_length / total_purchase_length * 100) if total_purchase_length > 0 else 0,
'unit': 'mm',
'pipe_details': pipe_details
}
def calculate_standard_purchase_quantity(category: str, bom_quantity: float,
safety_factor: float = None) -> Dict:
"""
일반 자재 구매 수량 계산
- 여유율 적용
- 최소 주문 수량 적용
"""
# 여유율 결정
if safety_factor is None:
safety_factor = SAFETY_FACTORS.get(category, SAFETY_FACTORS['DEFAULT'])
# 1단계: 여유율 적용
safety_qty = bom_quantity * safety_factor
# 2단계: 최소 주문 수량 확인
min_order_qty = MINIMUM_ORDER_QTY.get(category, MINIMUM_ORDER_QTY['DEFAULT'])
# 3단계: 최소 주문 수량과 비교하여 큰 값 선택
calculated_qty = max(safety_qty, min_order_qty)
# 4단계: 특별 처리 (BOLT는 박스 단위로 올림)
if category == 'BOLT' and calculated_qty > min_order_qty:
calculated_qty = math.ceil(calculated_qty / min_order_qty) * min_order_qty
return {
'bom_quantity': bom_quantity,
'safety_factor': safety_factor,
'safety_qty': safety_qty,
'min_order_qty': min_order_qty,
'calculated_qty': calculated_qty,
'waste_quantity': calculated_qty - bom_quantity,
'utilization_rate': (bom_quantity / calculated_qty * 100) if calculated_qty > 0 else 0
}
def generate_purchase_items_from_materials(db: Session, file_id: int,
job_no: str, revision: str) -> List[Dict]:
"""
자재 데이터로부터 구매 품목 생성
"""
# 1. 파일의 모든 자재 조회
materials_query = text("""
SELECT m.*,
pd.length_mm, pd.outer_diameter, pd.schedule, pd.material_spec as pipe_material_spec,
fd.fitting_type, fd.connection_method as fitting_connection,
vd.valve_type, vd.connection_method as valve_connection, vd.pressure_rating as valve_pressure,
fl.flange_type, fl.pressure_rating as flange_pressure,
gd.gasket_type, gd.material_type as gasket_material,
bd.bolt_type, bd.material_standard, bd.diameter,
id.instrument_type
FROM materials m
LEFT JOIN pipe_details pd ON m.id = pd.material_id
LEFT JOIN fitting_details fd ON m.id = fd.material_id
LEFT JOIN valve_details vd ON m.id = vd.material_id
LEFT JOIN flange_details fl ON m.id = fl.material_id
LEFT JOIN gasket_details gd ON m.id = gd.material_id
LEFT JOIN bolt_details bd ON m.id = bd.material_id
LEFT JOIN instrument_details id ON m.id = id.material_id
WHERE m.file_id = :file_id
ORDER BY m.classified_category, m.original_description
""")
materials = db.execute(materials_query, {"file_id": file_id}).fetchall()
# 2. 카테고리별로 그룹핑
grouped_materials = {}
for material in materials:
category = material.classified_category or 'OTHER'
if category not in grouped_materials:
grouped_materials[category] = []
grouped_materials[category].append(dict(material))
# 3. 각 카테고리별로 구매 품목 생성
purchase_items = []
for category, category_materials in grouped_materials.items():
if category == 'PIPE':
# PIPE는 재질+사이즈+스케줄별로 그룹핑
pipe_groups = {}
for material in category_materials:
# 그룹핑 키 생성
material_spec = material.get('pipe_material_spec') or material.get('material_grade', '')
outer_diameter = material.get('outer_diameter') or material.get('main_nom', '')
schedule = material.get('schedule', '')
group_key = f"{material_spec}|{outer_diameter}|{schedule}"
if group_key not in pipe_groups:
pipe_groups[group_key] = []
pipe_groups[group_key].append(material)
# 각 PIPE 그룹별로 구매 수량 계산
for group_key, group_materials in pipe_groups.items():
pipe_calc = calculate_pipe_purchase_quantity(group_materials)
if pipe_calc['calculated_qty'] > 0:
material_spec, outer_diameter, schedule = group_key.split('|')
# 품목 코드 생성
item_code = generate_item_code('PIPE', material_spec, outer_diameter, schedule)
# 사양 생성
spec_parts = [f"PIPE {outer_diameter}"]
if schedule: spec_parts.append(schedule)
if material_spec: spec_parts.append(material_spec)
specification = ', '.join(spec_parts)
purchase_item = {
'item_code': item_code,
'category': 'PIPE',
'specification': specification,
'material_spec': material_spec,
'size_spec': outer_diameter,
'unit': 'mm',
**pipe_calc,
'job_no': job_no,
'revision': revision,
'file_id': file_id,
'materials': group_materials
}
purchase_items.append(purchase_item)
else:
# 기타 자재들은 사양별로 그룹핑
spec_groups = generate_material_specs_for_category(category_materials, category)
for spec_key, spec_data in spec_groups.items():
if spec_data['totalQuantity'] > 0:
# 구매 수량 계산
calc_result = calculate_standard_purchase_quantity(
category,
spec_data['totalQuantity']
)
# 품목 코드 생성
item_code = generate_item_code(category, spec_data.get('material_spec', ''),
spec_data.get('size_display', ''))
purchase_item = {
'item_code': item_code,
'category': category,
'specification': spec_data.get('full_spec', spec_key),
'material_spec': spec_data.get('material_spec', ''),
'size_spec': spec_data.get('size_display', ''),
'unit': spec_data.get('unit', 'EA'),
**calc_result,
'job_no': job_no,
'revision': revision,
'file_id': file_id,
'materials': spec_data['items']
}
purchase_items.append(purchase_item)
return purchase_items
def generate_material_specs_for_category(materials: List[Dict], category: str) -> Dict:
"""카테고리별 자재 사양 그룹핑 (MaterialsPage.jsx 로직과 동일)"""
specs = {}
for material in materials:
spec_key = ''
spec_data = {}
if category == 'FITTING':
fitting_type = material.get('fitting_type', 'FITTING')
connection_method = material.get('fitting_connection', '')
material_spec = material.get('material_grade', '')
main_nom = material.get('main_nom', '')
red_nom = material.get('red_nom', '')
size_display = f"{main_nom} x {red_nom}" if red_nom else main_nom
spec_parts = [fitting_type]
if connection_method: spec_parts.append(connection_method)
full_spec = ', '.join(spec_parts)
spec_key = f"FITTING|{full_spec}|{material_spec}|{size_display}"
spec_data = {
'category': 'FITTING',
'full_spec': full_spec,
'material_spec': material_spec,
'size_display': size_display,
'unit': 'EA'
}
elif category == 'VALVE':
valve_type = material.get('valve_type', 'VALVE')
connection_method = material.get('valve_connection', '')
pressure_rating = material.get('valve_pressure', '')
material_spec = material.get('material_grade', '')
main_nom = material.get('main_nom', '')
spec_parts = [valve_type.replace('_', ' ')]
if connection_method: spec_parts.append(connection_method.replace('_', ' '))
if pressure_rating: spec_parts.append(pressure_rating)
full_spec = ', '.join(spec_parts)
spec_key = f"VALVE|{full_spec}|{material_spec}|{main_nom}"
spec_data = {
'category': 'VALVE',
'full_spec': full_spec,
'material_spec': material_spec,
'size_display': main_nom,
'unit': 'EA'
}
elif category == 'FLANGE':
flange_type = material.get('flange_type', 'FLANGE')
pressure_rating = material.get('flange_pressure', '')
material_spec = material.get('material_grade', '')
main_nom = material.get('main_nom', '')
spec_parts = [flange_type]
if pressure_rating: spec_parts.append(pressure_rating)
full_spec = ', '.join(spec_parts)
spec_key = f"FLANGE|{full_spec}|{material_spec}|{main_nom}"
spec_data = {
'category': 'FLANGE',
'full_spec': full_spec,
'material_spec': material_spec,
'size_display': main_nom,
'unit': 'EA'
}
elif category == 'BOLT':
bolt_type = material.get('bolt_type', 'BOLT')
material_standard = material.get('material_standard', '')
diameter = material.get('diameter', material.get('main_nom', ''))
material_spec = material_standard or material.get('material_grade', '')
spec_parts = [bolt_type.replace('_', ' ')]
if material_standard: spec_parts.append(material_standard)
full_spec = ', '.join(spec_parts)
spec_key = f"BOLT|{full_spec}|{material_spec}|{diameter}"
spec_data = {
'category': 'BOLT',
'full_spec': full_spec,
'material_spec': material_spec,
'size_display': diameter,
'unit': 'EA'
}
elif category == 'GASKET':
gasket_type = material.get('gasket_type', 'GASKET')
gasket_material = material.get('gasket_material', '')
material_spec = gasket_material or material.get('material_grade', '')
main_nom = material.get('main_nom', '')
spec_parts = [gasket_type]
if gasket_material: spec_parts.append(gasket_material)
full_spec = ', '.join(spec_parts)
spec_key = f"GASKET|{full_spec}|{material_spec}|{main_nom}"
spec_data = {
'category': 'GASKET',
'full_spec': full_spec,
'material_spec': material_spec,
'size_display': main_nom,
'unit': 'EA'
}
elif category == 'INSTRUMENT':
instrument_type = material.get('instrument_type', 'INSTRUMENT')
material_spec = material.get('material_grade', '')
main_nom = material.get('main_nom', '')
full_spec = instrument_type.replace('_', ' ')
spec_key = f"INSTRUMENT|{full_spec}|{material_spec}|{main_nom}"
spec_data = {
'category': 'INSTRUMENT',
'full_spec': full_spec,
'material_spec': material_spec,
'size_display': main_nom,
'unit': 'EA'
}
else:
# 기타 자재
material_spec = material.get('material_grade', '')
size_display = material.get('main_nom') or material.get('size_spec', '')
spec_key = f"{category}|{material_spec}|{size_display}"
spec_data = {
'category': category,
'full_spec': material_spec,
'material_spec': material_spec,
'size_display': size_display,
'unit': 'EA'
}
# 스펙별 수량 집계
if spec_key not in specs:
specs[spec_key] = {
**spec_data,
'totalQuantity': 0,
'count': 0,
'items': []
}
specs[spec_key]['totalQuantity'] += material.get('quantity', 0)
specs[spec_key]['count'] += 1
specs[spec_key]['items'].append(material)
return specs
def generate_item_code(category: str, material_spec: str = '', size_spec: str = '',
schedule: str = '') -> str:
"""구매 품목 코드 생성"""
import hashlib
# 기본 접두사
prefix = f"PI-{category}"
# 재질 약어 생성
material_abbr = ''
if 'A106' in material_spec:
material_abbr = 'A106'
elif 'A333' in material_spec:
material_abbr = 'A333'
elif 'SS316' in material_spec or '316' in material_spec:
material_abbr = 'SS316'
elif 'A105' in material_spec:
material_abbr = 'A105'
elif material_spec:
material_abbr = material_spec.replace(' ', '')[:6]
# 사이즈 약어
size_abbr = size_spec.replace('"', 'IN').replace(' ', '').replace('x', 'X')[:10]
# 스케줄 (PIPE용)
schedule_abbr = schedule.replace(' ', '')[:6]
# 유니크 해시 생성 (중복 방지)
unique_str = f"{category}|{material_spec}|{size_spec}|{schedule}"
hash_suffix = hashlib.md5(unique_str.encode()).hexdigest()[:4].upper()
# 최종 코드 조합
code_parts = [prefix]
if material_abbr: code_parts.append(material_abbr)
if size_abbr: code_parts.append(size_abbr)
if schedule_abbr: code_parts.append(schedule_abbr)
code_parts.append(hash_suffix)
return '-'.join(code_parts)
def save_purchase_items_to_db(db: Session, purchase_items: List[Dict]) -> List[int]:
"""구매 품목을 데이터베이스에 저장"""
saved_ids = []
for item in purchase_items:
# 기존 품목 확인 (동일 사양이 있는지)
existing_query = text("""
SELECT id FROM purchase_items
WHERE job_no = :job_no AND revision = :revision AND item_code = :item_code
""")
existing = db.execute(existing_query, {
'job_no': item['job_no'],
'revision': item['revision'],
'item_code': item['item_code']
}).fetchone()
if existing:
# 기존 품목 업데이트
update_query = text("""
UPDATE purchase_items SET
bom_quantity = :bom_quantity,
calculated_qty = :calculated_qty,
safety_factor = :safety_factor,
cutting_loss = :cutting_loss,
pipes_count = :pipes_count,
waste_length = :waste_length,
detailed_spec = :detailed_spec,
updated_at = CURRENT_TIMESTAMP
WHERE id = :id
""")
db.execute(update_query, {
'id': existing.id,
'bom_quantity': item['bom_quantity'],
'calculated_qty': item['calculated_qty'],
'safety_factor': item.get('safety_factor', 1.0),
'cutting_loss': item.get('cutting_loss', 0),
'pipes_count': item.get('pipes_count'),
'waste_length': item.get('waste_length'),
'detailed_spec': item.get('detailed_spec', '{}')
})
saved_ids.append(existing.id)
else:
# 새 품목 생성
insert_query = text("""
INSERT INTO purchase_items (
item_code, category, specification, material_spec, size_spec, unit,
bom_quantity, safety_factor, minimum_order_qty, calculated_qty,
cutting_loss, standard_length, pipes_count, waste_length,
job_no, revision, file_id, is_active, created_by
) VALUES (
:item_code, :category, :specification, :material_spec, :size_spec, :unit,
:bom_quantity, :safety_factor, :minimum_order_qty, :calculated_qty,
:cutting_loss, :standard_length, :pipes_count, :waste_length,
:job_no, :revision, :file_id, :is_active, :created_by
) RETURNING id
""")
result = db.execute(insert_query, {
'item_code': item['item_code'],
'category': item['category'],
'specification': item['specification'],
'material_spec': item['material_spec'],
'size_spec': item['size_spec'],
'unit': item['unit'],
'bom_quantity': item['bom_quantity'],
'safety_factor': item.get('safety_factor', 1.0),
'minimum_order_qty': item.get('min_order_qty', 0),
'calculated_qty': item['calculated_qty'],
'cutting_loss': item.get('cutting_loss', 0),
'standard_length': item.get('standard_length', 6000 if item['category'] == 'PIPE' else None),
'pipes_count': item.get('pipes_count'),
'waste_length': item.get('waste_length'),
'job_no': item['job_no'],
'revision': item['revision'],
'file_id': item['file_id'],
'is_active': True,
'created_by': 'system'
})
result_row = result.fetchone()
new_id = result_row[0] if result_row else None
saved_ids.append(new_id)
# 개별 자재와 구매 품목 연결
for material in item['materials']:
mapping_query = text("""
INSERT INTO material_purchase_mapping (material_id, purchase_item_id)
VALUES (:material_id, :purchase_item_id)
ON CONFLICT (material_id, purchase_item_id) DO NOTHING
""")
db.execute(mapping_query, {
'material_id': material['id'],
'purchase_item_id': new_id
})
db.commit()
return saved_ids

View File

@@ -1,17 +1,20 @@
import React from 'react';
import { BrowserRouter as Router, Routes, Route, Navigate } from 'react-router-dom';
import JobSelectionPage from './pages/JobSelectionPage';
import BOMStatusPage from './pages/BOMStatusPage';
import { BrowserRouter as Router, Routes, Route } from 'react-router-dom';
import ProjectSelectionPage from './pages/ProjectSelectionPage';
import BOMManagerPage from './pages/BOMManagerPage';
import MaterialsPage from './pages/MaterialsPage';
import BOMStatusPage from './pages/BOMStatusPage';
import PurchaseConfirmationPage from './pages/PurchaseConfirmationPage';
function App() {
return (
<Router>
<Routes>
<Route path="/" element={<JobSelectionPage />} />
<Route path="/bom-status" element={<BOMStatusPage />} />
<Route path="/" element={<ProjectSelectionPage />} />
<Route path="/bom-manager" element={<BOMManagerPage />} />
<Route path="/materials" element={<MaterialsPage />} />
<Route path="*" element={<Navigate to="/" />} />
<Route path="/bom-status" element={<BOMStatusPage />} />
<Route path="/purchase-confirmation" element={<PurchaseConfirmationPage />} />
</Routes>
</Router>
);

110
test_purchase_calculator.py Normal file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""
구매 수량 계산기 테스트
특히 파이프 절단 손실 + 6M 단위 계산 테스트
"""
import sys
import os
sys.path.append('backend')
def test_pipe_calculation():
"""파이프 절단 손실 + 6M 단위 계산 테스트"""
from app.services.purchase_calculator import calculate_pipe_purchase_quantity
print("🔧 PIPE 구매 수량 계산 테스트\n")
# 테스트 케이스들
test_cases = [
{
"name": "25,000mm 필요 (10회 절단)",
"materials": [
{"length_mm": 3000, "original_description": "PIPE 4\" SCH40 - 3M", "quantity": 1},
{"length_mm": 2500, "original_description": "PIPE 4\" SCH40 - 2.5M", "quantity": 1},
{"length_mm": 1800, "original_description": "PIPE 4\" SCH40 - 1.8M", "quantity": 1},
{"length_mm": 4200, "original_description": "PIPE 4\" SCH40 - 4.2M", "quantity": 1},
{"length_mm": 2100, "original_description": "PIPE 4\" SCH40 - 2.1M", "quantity": 1},
{"length_mm": 1500, "original_description": "PIPE 4\" SCH40 - 1.5M", "quantity": 1},
{"length_mm": 3800, "original_description": "PIPE 4\" SCH40 - 3.8M", "quantity": 1},
{"length_mm": 2200, "original_description": "PIPE 4\" SCH40 - 2.2M", "quantity": 1},
{"length_mm": 1900, "original_description": "PIPE 4\" SCH40 - 1.9M", "quantity": 1},
{"length_mm": 2000, "original_description": "PIPE 4\" SCH40 - 2M", "quantity": 1}
],
"expected_pipes": 5
},
{
"name": "5,900mm 필요 (3회 절단)",
"materials": [
{"length_mm": 2000, "original_description": "PIPE 6\" SCH40 - 2M", "quantity": 1},
{"length_mm": 1900, "original_description": "PIPE 6\" SCH40 - 1.9M", "quantity": 1},
{"length_mm": 2000, "original_description": "PIPE 6\" SCH40 - 2M", "quantity": 1}
],
"expected_pipes": 1
},
{
"name": "12,000mm 정확히 (4회 절단)",
"materials": [
{"length_mm": 3000, "original_description": "PIPE 8\" SCH40 - 3M", "quantity": 4}
],
"expected_pipes": 2
}
]
for i, test_case in enumerate(test_cases, 1):
print(f"📋 테스트 {i}: {test_case['name']}")
result = calculate_pipe_purchase_quantity(test_case["materials"])
print(f" 🎯 BOM 총 길이: {result['bom_quantity']:,}mm")
print(f" ✂️ 절단 횟수: {result['cutting_count']}")
print(f" 📏 절단 손실: {result['cutting_loss']}mm (각 절단마다 3mm)")
print(f" 🔢 총 필요량: {result['required_length']:,}mm")
print(f" 📦 구매 파이프: {result['pipes_count']}본 (각 6M)")
print(f" 💰 구매 총량: {result['calculated_qty']:,}mm")
print(f" ♻️ 여유분: {result['waste_length']:,}mm")
print(f" 📊 활용률: {result['utilization_rate']:.1f}%")
# 결과 확인
if result['pipes_count'] == test_case['expected_pipes']:
print(f" ✅ 성공: 예상 {test_case['expected_pipes']}본 = 결과 {result['pipes_count']}")
else:
print(f" ❌ 실패: 예상 {test_case['expected_pipes']}본 ≠ 결과 {result['pipes_count']}")
print()
def test_standard_calculation():
"""일반 자재 구매 수량 계산 테스트"""
from app.services.purchase_calculator import calculate_standard_purchase_quantity
print("🔧 일반 자재 구매 수량 계산 테스트\n")
test_cases = [
{"category": "VALVE", "bom_qty": 2, "expected_factor": 1.5, "desc": "밸브 2개 (50% 예비품)"},
{"category": "BOLT", "bom_qty": 24, "expected_min": 50, "desc": "볼트 24개 (박스 단위 50개)"},
{"category": "FITTING", "bom_qty": 5, "expected_factor": 1.1, "desc": "피팅 5개 (10% 여유)"},
{"category": "GASKET", "bom_qty": 3, "expected_factor": 1.25, "desc": "가스켓 3개 (25% 교체 주기)"},
{"category": "INSTRUMENT", "bom_qty": 1, "expected_factor": 1.0, "desc": "계기 1개 (정확한 수량)"}
]
for i, test_case in enumerate(test_cases, 1):
print(f"📋 테스트 {i}: {test_case['desc']}")
result = calculate_standard_purchase_quantity(
test_case["category"],
test_case["bom_qty"]
)
print(f" 🎯 BOM 수량: {result['bom_quantity']}")
print(f" 📈 여유율: {result['safety_factor']:.2f} ({(result['safety_factor']-1)*100:.0f}%)")
print(f" 🔢 여유 적용: {result['safety_qty']:.1f}")
print(f" 📦 최소 주문: {result['min_order_qty']}")
print(f" 💰 최종 구매: {result['calculated_qty']}")
print(f" ♻️ 여유분: {result['waste_quantity']}")
print(f" 📊 활용률: {result['utilization_rate']:.1f}%")
print()
if __name__ == "__main__":
test_pipe_calculation()
test_standard_calculation()