Files
TK-BOM-Project/backend/app/routers/purchase_request.py
hyungi b9442928da 🔧 자재 분류기 개선 및 ELL 키워드 분류 문제 해결 (테스트 필요 - 안되는거 같음)
분류기 개선사항:
1. ELL-O-LET vs 일반 엘보 분류 개선
   - OLET 우선순위 확인 로직 추가
   - ELL 키워드 충돌 문제 해결

2. 엘보 서브타입 강화
   - 90DEG_LONG_RADIUS, 90DEG_SHORT_RADIUS 등 조합형 추가
   - 더 구체적인 키워드 패턴 지원

3. 레듀스 플랜지 분류 개선
   - REDUCING FLANGE가 FITTING이 아닌 FLANGE로 분류되도록 수정
   - 특별 우선순위 로직 추가

4. 90 ELL SW 분류 문제 해결
   - fitting_keywords에 ELL 키워드 추가
   - ELBOW description_keywords에 ELL, 90 ELL, 45 ELL 추가

기술적 개선:
- 키워드 우선순위 체계 강화
- 구체적인 패턴 매칭 개선
- 분류 신뢰도 향상

플랜지 카테고리 개선:
- 타입 풀네임 표시 (WN → WELD NECK FLANGE)
- 끝단처리 별도 컬럼 추가 (RF → RAISED FACE)
- 엑셀 내보내기 구조 개선 (P열 납기일, 관리항목 4개)
2025-10-15 17:43:10 +09:00

613 lines
24 KiB
Python

"""
구매신청 관리 API
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from typing import Optional, List, Dict
from datetime import datetime
import os
import json
from ..database import get_db
from ..auth.middleware import get_current_user
from ..utils.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Every endpoint in this module is registered under the /purchase-request prefix.
router = APIRouter(prefix="/purchase-request", tags=["Purchase Request"])
# Directory where generated Excel files (and their companion JSON files) are stored.
# The path is relative, so it resolves against the process's working directory.
EXCEL_DIR = "exports"
# Create the export directory at import time; a pre-existing directory is not an error.
os.makedirs(EXCEL_DIR, exist_ok=True)
class PurchaseRequestCreate(BaseModel):
    """Request body accepted by the purchase-request creation endpoint."""

    # ID stored in purchase_requests.file_id for the created request.
    file_id: int
    # Optional job number and material category recorded on the request.
    job_no: Optional[str] = None
    category: Optional[str] = None
    # IDs of the materials to request; materials_data is matched to this
    # list by position when the request items are inserted.
    material_ids: List[int] = []
    materials_data: List[Dict] = []
    # Grouped material information; when non-empty it is used for the
    # Excel export instead of materials_data.
    grouped_materials: Optional[List[Dict]] = []
@router.post("/create")
async def create_purchase_request(
    request_data: PurchaseRequestCreate,
    current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Create a purchase request (exporting to Excel is what creates the request).

    In order, this:
      1. builds a request number ``PR-<YYYYMMDD>-<NNN>`` from the number of
         requests already recorded with today's prefix,
      2. writes the material data to a JSON file and an Excel file in EXCEL_DIR,
      3. inserts the purchase_requests row,
      4. inserts one purchase_request_items row per material ID that is not
         already present in purchase_request_items,
      5. commits and re-counts the stored items for verification.

    Returns a dict with the request number/ID and the requested, inserted and
    verified item counts.

    Raises:
        HTTPException: 500 on any failure; the DB transaction is rolled back.
    """
    try:
        # Build the purchase request number from today's date and a running count.
        date_part = datetime.now().strftime('%Y%m%d')
        todays_requests = db.execute(
            text("""
                SELECT COUNT(*) as count
                FROM purchase_requests
                WHERE request_no LIKE :pattern
            """),
            {"pattern": f"PR-{date_part}%"},
        ).fetchone().count
        request_no = f"PR-{date_part}-{str(todays_requests + 1).zfill(3)}"

        # Persist the material data both as JSON and as an Excel file.
        excel_filename = f"{request_no}.xlsx"
        json_path = os.path.join(EXCEL_DIR, f"{request_no}.json")
        excel_path = os.path.join(EXCEL_DIR, excel_filename)

        # JSON copy (includes the grouping information).
        save_materials_data(
            request_data.materials_data,
            json_path,
            request_no,
            request_data.job_no,
            request_data.grouped_materials,
        )

        # Excel file: prefer the grouped rows, fall back to the raw rows.
        create_excel_file(
            request_data.grouped_materials or request_data.materials_data,
            excel_path,
            request_no,
            request_data.job_no,
        )

        # Insert the purchase request record.
        requested_total = len(request_data.material_ids)
        header_params = {
            "request_no": request_no,
            "file_id": request_data.file_id,
            "job_no": request_data.job_no,
            "category": request_data.category,
            "material_count": requested_total,
            # The Excel file name is stored here (not the JSON file name).
            "excel_file_path": excel_filename,
            "requested_by": current_user.get("user_id"),
        }
        header_row = db.execute(
            text("""
                INSERT INTO purchase_requests (
                    request_no, file_id, job_no, category,
                    material_count, excel_file_path, requested_by
                ) VALUES (
                    :request_no, :file_id, :job_no, :category,
                    :material_count, :excel_file_path, :requested_by
                ) RETURNING request_id
            """),
            header_params,
        ).fetchone()
        request_id = header_row.request_id

        # Store the per-material detail rows.
        logger.info(f"Processing {requested_total} material IDs for purchase request {request_no}")
        # Only the first 10 IDs are logged.
        logger.info(f"First 10 Material IDs: {request_data.material_ids[:10]}")
        logger.info(f"Category: {request_data.category}, Job: {request_data.job_no}")

        already_requested_stmt = text("""
            SELECT 1 FROM purchase_request_items
            WHERE material_id = :material_id
        """)
        insert_item_stmt = text("""
            INSERT INTO purchase_request_items (
                request_id, material_id, quantity, unit, user_requirement
            ) VALUES (
                :request_id, :material_id, :quantity, :unit, :user_requirement
            )
        """)

        inserted_count = 0
        for position, material_id in enumerate(request_data.material_ids):
            # Skip materials that already appear in any purchase request.
            already_requested = db.execute(
                already_requested_stmt, {"material_id": material_id}
            ).fetchone()
            if already_requested:
                logger.warning(f"Material {material_id} already in another purchase request, skipping")
                continue

            # materials_data is matched to material_ids by position; a missing
            # entry is treated as an empty record.
            if position < len(request_data.materials_data):
                material_data = request_data.materials_data[position]
            else:
                material_data = {}

            # Convert the quantity to an integer (fractional part dropped).
            try:
                quantity = int(float(str(material_data.get("quantity", 0))))
            except (ValueError, TypeError):
                quantity = 0

            db.execute(insert_item_stmt, {
                "request_id": request_id,
                "material_id": material_id,
                "quantity": quantity,
                "unit": material_data.get("unit", ""),
                "user_requirement": material_data.get("user_requirement", ""),
            })
            inserted_count += 1

        db.commit()
        logger.info(f"Purchase request created: {request_no} with {inserted_count} materials (out of {requested_total} requested)")

        # Verify how many items were actually stored.
        verified_count = db.execute(
            text("""
                SELECT COUNT(*) as count FROM purchase_request_items WHERE request_id = :request_id
            """),
            {"request_id": request_id},
        ).fetchone().count
        logger.info(f"✅ DB 검증: purchase_request_items에 {verified_count}개 저장됨")

        return {
            "success": True,
            "request_no": request_no,
            "request_id": request_id,
            "material_count": requested_total,
            "inserted_count": inserted_count,
            "verified_count": verified_count,
            "message": f"구매신청 {request_no}이 생성되었습니다"
        }
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to create purchase request: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 생성 실패: {str(e)}"
        )
@router.get("/list")
async def get_purchase_requests(
    file_id: Optional[int] = None,
    job_no: Optional[str] = None,
    status: Optional[str] = None,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    List purchase requests, newest first.

    Each of ``file_id``, ``job_no`` and ``status`` is an optional filter; a
    filter left as None is not applied.

    Returns a dict with ``success``, the ``requests`` list and its ``count``.

    Raises:
        HTTPException: 500 when the query fails.
    """
    # NOTE(review): the authentication dependency is commented out in the
    # signature, so this endpoint does not require a logged-in user - confirm
    # that this is intended.
    try:
        listing_sql = text("""
            SELECT
                pr.request_id,
                pr.request_no,
                pr.file_id,
                pr.job_no,
                pr.category,
                pr.material_count,
                pr.excel_file_path,
                pr.requested_at,
                pr.status,
                u.name as requested_by,
                f.original_filename,
                j.job_name,
                COUNT(pri.item_id) as item_count
            FROM purchase_requests pr
            LEFT JOIN users u ON pr.requested_by = u.user_id
            LEFT JOIN files f ON pr.file_id = f.id
            LEFT JOIN jobs j ON pr.job_no = j.job_no
            LEFT JOIN purchase_request_items pri ON pr.request_id = pri.request_id
            WHERE 1=1
                AND (:file_id IS NULL OR pr.file_id = :file_id)
                AND (:job_no IS NULL OR pr.job_no = :job_no)
                AND (:status IS NULL OR pr.status = :status)
            GROUP BY
                pr.request_id, pr.request_no, pr.file_id, pr.job_no,
                pr.category, pr.material_count, pr.excel_file_path,
                pr.requested_at, pr.status, u.name, f.original_filename, j.job_name
            ORDER BY pr.requested_at DESC
        """)
        filters = {"file_id": file_id, "job_no": job_no, "status": status}
        rows = db.execute(listing_sql, filters).fetchall()

        # One plain dict per request; the timestamp is serialized as ISO-8601.
        requests = [
            {
                "request_id": record.request_id,
                "request_no": record.request_no,
                "file_id": record.file_id,
                "job_no": record.job_no,
                "job_name": record.job_name,
                "category": record.category,
                "material_count": record.material_count,
                "item_count": record.item_count,
                "excel_file_path": record.excel_file_path,
                "requested_at": record.requested_at.isoformat() if record.requested_at else None,
                "status": record.status,
                "requested_by": record.requested_by,
                "source_file": record.original_filename
            }
            for record in rows
        ]

        return {
            "success": True,
            "requests": requests,
            "count": len(requests)
        }
    except Exception as e:
        logger.error(f"Failed to get purchase requests: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 목록 조회 실패: {str(e)}"
        )
def _load_grouped_materials(stored_file_name):
    """Load the grouped-material list saved alongside a request's Excel file.

    ``purchase_requests.excel_file_path`` holds the Excel file name
    (``<request_no>.xlsx``), while the grouping data is written to
    ``<request_no>.json`` in the same directory. The JSON path is therefore
    derived by swapping the extension; a stored name that already ends in
    ``.json`` resolves to itself.

    Returns an empty list when no file name is stored, the JSON file does not
    exist, or it cannot be read (best effort: the failure is logged, not raised).
    """
    if not stored_file_name:
        return []
    json_name = os.path.splitext(stored_file_name)[0] + ".json"
    json_path = os.path.join(EXCEL_DIR, json_name)
    if not os.path.exists(json_path):
        return []
    try:
        with open(json_path, 'r', encoding='utf-8', errors='ignore') as f:
            data = json.load(f)
        return data.get("grouped_materials", []) or []
    except Exception as e:
        logger.warning(f"⚠️ JSON 파일 읽기 오류 (무시): {e}")
        return []


def _safe_str(value):
    """Return *value* as text: '' for None, UTF-8 decoding (errors ignored) for bytes."""
    if value is None:
        return ''
    try:
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='ignore')
        return str(value)
    except Exception:
        return str(value) if value else ''


@router.get("/{request_id}/materials")
async def get_request_materials(
    request_id: int,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Return the materials contained in a purchase request, with grouping info.

    Args:
        request_id: ID of the purchase request.
        db: Database session.

    Returns:
        A dict with ``success``, the per-item ``materials`` list, the
        ``grouped_materials`` list loaded from the request's JSON file, and
        ``count`` (number of groups when grouping data exists, otherwise the
        number of materials).

    Raises:
        HTTPException: 500 when the lookup fails.
    """
    # NOTE(review): the authentication dependency is commented out in the
    # signature - confirm that this endpoint is meant to be unauthenticated.
    try:
        # Look up the stored file name so the companion JSON file can be found.
        info_query = text("""
            SELECT excel_file_path
            FROM purchase_requests
            WHERE request_id = :request_id
        """)
        info_result = db.execute(info_query, {"request_id": request_id}).fetchone()
        grouped_materials = _load_grouped_materials(
            info_result.excel_file_path if info_result else None
        )

        # Per-item material rows joined with the category-specific detail tables.
        query = text("""
            SELECT
                pri.item_id,
                pri.material_id,
                pri.quantity as requested_quantity,
                pri.unit as requested_unit,
                pri.user_requirement,
                pri.is_ordered,
                pri.is_received,
                m.original_description,
                m.classified_category,
                m.size_spec,
                m.main_nom,
                m.red_nom,
                m.schedule,
                m.material_grade,
                m.full_material_grade,
                m.quantity as original_quantity,
                m.unit as original_unit,
                m.classification_details,
                pd.outer_diameter, pd.schedule as pipe_schedule, pd.material_spec, pd.manufacturing_method,
                pd.end_preparation, pd.length_mm,
                fd.fitting_type, fd.fitting_subtype, fd.connection_method as fitting_connection,
                fd.pressure_rating as fitting_pressure, fd.schedule as fitting_schedule,
                fld.flange_type, fld.facing_type,
                fld.pressure_rating as flange_pressure,
                gd.gasket_type, gd.gasket_subtype, gd.material_type as gasket_material,
                gd.filler_material, gd.pressure_rating as gasket_pressure, gd.thickness as gasket_thickness,
                bd.bolt_type, bd.material_standard as bolt_material, bd.length as bolt_length
            FROM purchase_request_items pri
            JOIN materials m ON pri.material_id = m.id
            LEFT JOIN pipe_details pd ON m.id = pd.material_id
            LEFT JOIN fitting_details fd ON m.id = fd.material_id
            LEFT JOIN flange_details fld ON m.id = fld.material_id
            LEFT JOIN gasket_details gd ON m.id = gd.material_id
            LEFT JOIN bolt_details bd ON m.id = bd.material_id
            WHERE pri.request_id = :request_id
            ORDER BY m.classified_category, m.original_description
        """)
        results = db.execute(query, {"request_id": request_id}).fetchall()

        materials = []
        for row in results:
            try:
                # Requested quantity wins; fall back to the original BOM quantity.
                # The value is truncated to an integer.
                qty = row.requested_quantity or row.original_quantity
                try:
                    qty_int = int(float(qty)) if qty else 0
                except (ValueError, TypeError, OverflowError):
                    qty_int = 0

                original_description = _safe_str(row.original_description)
                size_spec = _safe_str(row.size_spec)
                material_grade = _safe_str(row.material_grade)
                full_material_grade = _safe_str(row.full_material_grade)
                user_requirement = _safe_str(row.user_requirement)
            except Exception as e:
                # Best effort: keep the row in the response with default values,
                # but leave a trace instead of swallowing the error silently.
                logger.warning(f"Failed to convert purchase request item row, using defaults: {e}")
                qty_int = 0
                original_description = ''
                size_spec = ''
                material_grade = ''
                full_material_grade = ''
                user_requirement = ''

            # Same record layout as the BOM page.
            material_dict = {
                "item_id": row.item_id,
                "material_id": row.material_id,
                "id": row.material_id,
                "original_description": original_description,
                "classified_category": _safe_str(row.classified_category),
                "size_spec": size_spec,
                "size_inch": _safe_str(row.main_nom),
                "main_nom": _safe_str(row.main_nom),
                "red_nom": _safe_str(row.red_nom),
                "schedule": _safe_str(row.schedule),
                "material_grade": material_grade,
                "full_material_grade": full_material_grade,
                "quantity": qty_int,
                "unit": _safe_str(row.requested_unit or row.original_unit),
                "user_requirement": user_requirement,
                "is_ordered": row.is_ordered,
                "is_received": row.is_received,
                "classification_details": _safe_str(row.classification_details)
            }

            # Attach the detail record matching the material's category, but
            # only when its key column is populated.
            if row.classified_category == 'PIPE' and row.manufacturing_method:
                material_dict["pipe_details"] = {
                    "manufacturing_method": _safe_str(row.manufacturing_method),
                    "schedule": _safe_str(row.pipe_schedule),
                    "material_spec": _safe_str(row.material_spec),
                    "end_preparation": _safe_str(row.end_preparation),
                    "length_mm": row.length_mm
                }
            elif row.classified_category == 'FITTING' and row.fitting_type:
                material_dict["fitting_details"] = {
                    "fitting_type": _safe_str(row.fitting_type),
                    "fitting_subtype": _safe_str(row.fitting_subtype),
                    "connection_method": _safe_str(row.fitting_connection),
                    "pressure_rating": _safe_str(row.fitting_pressure),
                    "schedule": _safe_str(row.fitting_schedule)
                }
            elif row.classified_category == 'FLANGE' and row.flange_type:
                material_dict["flange_details"] = {
                    "flange_type": _safe_str(row.flange_type),
                    "facing_type": _safe_str(row.facing_type),
                    "pressure_rating": _safe_str(row.flange_pressure)
                }
            elif row.classified_category == 'GASKET' and row.gasket_type:
                material_dict["gasket_details"] = {
                    "gasket_type": _safe_str(row.gasket_type),
                    "gasket_subtype": _safe_str(row.gasket_subtype),
                    "material_type": _safe_str(row.gasket_material),
                    "filler_material": _safe_str(row.filler_material),
                    "pressure_rating": _safe_str(row.gasket_pressure),
                    "thickness": _safe_str(row.gasket_thickness)
                }
            elif row.classified_category == 'BOLT' and row.bolt_type:
                material_dict["bolt_details"] = {
                    "bolt_type": _safe_str(row.bolt_type),
                    "material_standard": _safe_str(row.bolt_material),
                    "length": _safe_str(row.bolt_length)
                }

            materials.append(material_dict)

        return {
            "success": True,
            "materials": materials,
            "grouped_materials": grouped_materials,
            "count": len(grouped_materials) if grouped_materials else len(materials)
        }
    except Exception as e:
        logger.error(f"Failed to get request materials: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"구매신청 자재 조회 실패: {str(e)}"
        )
@router.get("/{request_id}/download-excel")
async def download_request_excel(
    request_id: int,
    # current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Download the Excel file that was generated when the purchase request was created.

    Args:
        request_id: ID of the purchase request.
        db: Database session.

    Returns:
        A FileResponse streaming the stored ``.xlsx`` file.

    Raises:
        HTTPException: 404 when the request does not exist, has no stored file
            name, or the file is missing on disk; 500 on any other failure.
    """
    # NOTE(review): the authentication dependency is commented out in the
    # signature - confirm that this endpoint is meant to be unauthenticated.
    try:
        # Look up the purchase request.
        query = text("""
            SELECT request_no, excel_file_path, job_no
            FROM purchase_requests
            WHERE request_id = :request_id
        """)
        result = db.execute(query, {"request_id": request_id}).fetchone()
        if not result:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="구매신청을 찾을 수 없습니다"
            )

        # A request without a stored file name has nothing to download; without
        # this guard os.path.join(None) would surface as a 500 TypeError.
        stored_name = result.excel_file_path
        excel_file_path = os.path.join(EXCEL_DIR, stored_name) if stored_name else None
        if not excel_file_path or not os.path.isfile(excel_file_path):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="엑셀 파일을 찾을 수 없습니다"
            )

        # job_no is optional when a request is created, so leave it out of the
        # download name rather than emitting a literal "None_" prefix.
        if result.job_no:
            download_name = f"{result.job_no}_{result.request_no}.xlsx"
        else:
            download_name = f"{result.request_no}.xlsx"

        # Serve the stored file as-is.
        return FileResponse(
            path=excel_file_path,
            filename=download_name,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )
    except HTTPException:
        # Propagate the 404s raised above unchanged.
        raise
    except Exception as e:
        logger.error(f"Failed to download request excel: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"엑셀 다운로드 실패: {str(e)}"
        )
def _with_int_quantity(record: Dict) -> Dict:
    """Return a shallow copy of *record* whose 'quantity', if present, is an int.

    The fractional part is dropped. Values that cannot be converted (non-numeric
    text, None, NaN, infinity) become 0.
    """
    cleaned = record.copy()
    if 'quantity' in cleaned:
        try:
            cleaned['quantity'] = int(float(cleaned['quantity']))
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers infinite values such as "inf" or 1e999.
            cleaned['quantity'] = 0
    return cleaned


def save_materials_data(materials_data: List[Dict], file_path: str, request_no: str, job_no: Optional[str], grouped_materials: Optional[List[Dict]] = None):
    """
    Save material data as JSON (so the frontend can rebuild the same Excel layout).

    Args:
        materials_data: Per-material records; each is shallow-copied, never mutated.
        file_path: Destination path of the JSON file (written as UTF-8).
        request_no: Purchase request number stored in the file.
        job_no: Job number stored in the file (may be None).
        grouped_materials: Optional grouped records, cleaned the same way.

    The written object has the keys ``request_no``, ``job_no``, ``created_at``
    (ISO timestamp), ``materials`` and ``grouped_materials``; every
    ``quantity`` in the two lists is stored as an integer.
    """
    data_to_save = {
        "request_no": request_no,
        "job_no": job_no,
        "created_at": datetime.now().isoformat(),
        "materials": [_with_int_quantity(m) for m in (materials_data or [])],
        "grouped_materials": [_with_int_quantity(g) for g in (grouped_materials or [])]
    }
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data_to_save, f, ensure_ascii=False, indent=2)
def create_excel_file(materials_data: List[Dict], file_path: str, request_no: str, job_no: str):
    """
    Build an Excel workbook from material data (same layout as the BOM page).

    One sheet is created per material category. Each sheet has a styled header
    row followed by one row per material; the due date is written in column P,
    directly under its header.

    Args:
        materials_data: Material records. The keys read are 'category',
            'quantity', 'size', 'schedule', 'material_grade' and
            'user_requirement'.
        file_path: Destination path of the ``.xlsx`` file.
        request_no: Purchase request number (currently not written to the file).
        job_no: Job number (currently not written to the file).
    """
    import openpyxl
    from openpyxl.styles import Font, PatternFill, Alignment, Border, Side

    # Header layout: four management columns (L-O), due date in column P,
    # then the remaining management columns.
    headers = ['TAGNO', '품목명', '수량', '통화구분', '단가', '크기', '압력등급', '스케줄',
               '재질', '상세내역', '사용자요구', '관리항목1', '관리항목2', '관리항목3',
               '관리항목4', '납기일(YYYY-MM-DD)', '관리항목5', '관리항목6', '관리항목7',
               '관리항목8', '관리항목9', '관리항목10']

    # Header styling is identical for every cell, so build it once.
    header_font = Font(bold=True, color="000000", size=12, name="맑은 고딕")
    header_fill = PatternFill(start_color="B3D9FF", end_color="B3D9FF", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center")
    thin_edge = Side(style="thin", color="666666")
    header_border = Border(top=thin_edge, bottom=thin_edge, left=thin_edge, right=thin_edge)

    # Group the materials by category, keeping first-seen order. A missing or
    # empty category falls back to 'UNKNOWN'.
    category_groups = {}
    for material in (materials_data or []):
        category = str(material.get('category') or 'UNKNOWN')
        category_groups.setdefault(category, []).append(material)
    if not category_groups:
        # A workbook without any sheet cannot be saved; emit a header-only sheet.
        category_groups = {'UNKNOWN': []}

    wb = openpyxl.Workbook()
    wb.remove(wb.active)  # drop the default sheet; one sheet per category follows

    due_date = datetime.now().strftime('%Y-%m-%d')

    for category, items in category_groups.items():
        ws = wb.create_sheet(title=category)

        # Header row.
        for col, header in enumerate(headers, 1):
            cell = ws.cell(row=1, column=col, value=header)
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = header_alignment
            cell.border = header_border

        # Data rows.
        for row_idx, material in enumerate(items, 2):
            data = [
                '',                                     # TAGNO
                category,                               # item name
                material.get('quantity', 0),            # quantity
                'KRW',                                  # currency
                1,                                      # unit price
                material.get('size', '-'),              # size
                '-',                                    # pressure rating (to be improved)
                material.get('schedule', '-'),          # schedule
                material.get('material_grade', '-'),    # material
                '-',                                    # details (to be improved)
                material.get('user_requirement', ''),   # user requirement
                '', '', '', '',                         # management items 1-4 (columns L-O)
                due_date                                # due date (column P)
            ]
            for col, value in enumerate(data, 1):
                ws.cell(row=row_idx, column=col, value=value)

        # Fit each column to its longest value, clamped to 10-50 characters.
        for column_cells in ws.columns:
            longest = max((len(str(cell.value)) for cell in column_cells), default=0)
            column_letter = column_cells[0].column_letter
            ws.column_dimensions[column_letter].width = min(max(longest + 2, 10), 50)

    wb.save(file_path)