555 lines
21 KiB
Python
555 lines
21 KiB
Python
from fastapi import FastAPI, UploadFile, File, Form
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from sqlalchemy import text
|
|
from .database import get_db
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import Depends
|
|
from typing import Optional, List, Dict
|
|
import os
|
|
import shutil
|
|
|
|
# FastAPI 앱 생성
|
|
app = FastAPI(
|
|
title="TK-MP BOM Management API",
|
|
description="자재 분류 및 프로젝트 관리 시스템",
|
|
version="1.0.0"
|
|
)
|
|
|
|
# CORS 설정
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# 라우터들 import 및 등록
|
|
try:
|
|
from .routers import files
|
|
app.include_router(files.router, prefix="/files", tags=["files"])
|
|
except ImportError:
|
|
print("files 라우터를 찾을 수 없습니다")
|
|
|
|
try:
|
|
from .routers import jobs
|
|
app.include_router(jobs.router, prefix="/jobs", tags=["jobs"])
|
|
except ImportError:
|
|
print("jobs 라우터를 찾을 수 없습니다")
|
|
|
|
# 파일 목록 조회 API
|
|
@app.get("/files")
|
|
async def get_files(
|
|
job_no: Optional[str] = None, # project_id 대신 job_no 사용
|
|
show_history: bool = False, # 이력 표시 여부
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""파일 목록 조회 (BOM별 그룹화)"""
|
|
try:
|
|
if show_history:
|
|
# 전체 이력 표시
|
|
query = "SELECT * FROM files"
|
|
params = {}
|
|
|
|
if job_no:
|
|
query += " WHERE job_no = :job_no"
|
|
params["job_no"] = job_no
|
|
|
|
query += " ORDER BY original_filename, revision DESC"
|
|
else:
|
|
# 최신 리비전만 표시
|
|
if job_no:
|
|
query = """
|
|
SELECT f1.* FROM files f1
|
|
INNER JOIN (
|
|
SELECT original_filename, MAX(revision) as max_revision
|
|
FROM files
|
|
WHERE job_no = :job_no
|
|
GROUP BY original_filename
|
|
) f2 ON f1.original_filename = f2.original_filename
|
|
AND f1.revision = f2.max_revision
|
|
WHERE f1.job_no = :job_no
|
|
ORDER BY f1.upload_date DESC
|
|
"""
|
|
params = {"job_no": job_no}
|
|
else:
|
|
# job_no가 없으면 전체 파일 조회
|
|
query = "SELECT * FROM files ORDER BY upload_date DESC"
|
|
params = {}
|
|
|
|
result = db.execute(text(query), params)
|
|
files = result.fetchall()
|
|
|
|
return [
|
|
{
|
|
"id": f.id,
|
|
"filename": f.original_filename,
|
|
"original_filename": f.original_filename,
|
|
"name": f.original_filename,
|
|
"job_no": f.job_no, # job_no 사용
|
|
"bom_name": f.original_filename, # 파일명을 BOM 이름으로 사용
|
|
"bom_type": f.file_type or "unknown", # file_type을 BOM 종류로 사용
|
|
"status": "active" if f.is_active else "inactive", # is_active 상태
|
|
"file_size": f.file_size,
|
|
"created_at": f.upload_date,
|
|
"upload_date": f.upload_date,
|
|
"revision": f.revision or "Rev.0", # 실제 리비전 또는 기본값
|
|
"description": f"파일: {f.original_filename}"
|
|
}
|
|
for f in files
|
|
]
|
|
except Exception as e:
|
|
print(f"파일 목록 조회 에러: {str(e)}")
|
|
return {"error": f"파일 목록 조회 실패: {str(e)}"}
|
|
|
|
# 파일 삭제 API
|
|
@app.delete("/files/{file_id}")
|
|
async def delete_file(
|
|
file_id: int,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""파일 삭제"""
|
|
try:
|
|
# 먼저 파일 정보 조회
|
|
file_query = text("SELECT * FROM files WHERE id = :file_id")
|
|
file_result = db.execute(file_query, {"file_id": file_id})
|
|
file = file_result.fetchone()
|
|
|
|
if not file:
|
|
return {"error": "파일을 찾을 수 없습니다"}
|
|
|
|
# 관련 자재 데이터 삭제
|
|
materials_query = text("DELETE FROM materials WHERE file_id = :file_id")
|
|
db.execute(materials_query, {"file_id": file_id})
|
|
|
|
# 파일 삭제
|
|
delete_query = text("DELETE FROM files WHERE id = :file_id")
|
|
db.execute(delete_query, {"file_id": file_id})
|
|
|
|
db.commit()
|
|
return {"success": True, "message": "파일과 관련 데이터가 삭제되었습니다"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"error": f"파일 삭제 실패: {str(e)}"}
|
|
|
|
# 프로젝트 관리 API (비활성화 - jobs 테이블 사용)
|
|
# projects 테이블은 더 이상 사용하지 않음
|
|
# ):
|
|
# """프로젝트 수정"""
|
|
# try:
|
|
# update_query = text("""
|
|
# UPDATE projects
|
|
# SET project_name = :project_name, status = :status
|
|
# WHERE id = :project_id
|
|
# """)
|
|
#
|
|
# db.execute(update_query, {
|
|
# "project_id": project_id,
|
|
# "project_name": project_data["project_name"],
|
|
# "status": project_data["status"]
|
|
# })
|
|
#
|
|
# db.commit()
|
|
# return {"success": True}
|
|
# except Exception as e:
|
|
# db.rollback()
|
|
# return {"error": f"프로젝트 수정 실패: {str(e)}"}
|
|
|
|
# @app.delete("/projects/{project_id}")
|
|
# async def delete_project(
|
|
# project_id: int,
|
|
# db: Session = Depends(get_db)
|
|
# ):
|
|
# """프로젝트 삭제"""
|
|
# try:
|
|
# delete_query = text("DELETE FROM projects WHERE id = :project_id")
|
|
# db.execute(delete_query, {"project_id": project_id})
|
|
# db.commit()
|
|
# return {"success": True}
|
|
# except Exception as e:
|
|
# db.rollback()
|
|
# return {"error": f"프로젝트 삭제 실패: {str(e)}"}
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return {
|
|
"message": "TK-MP BOM Management API",
|
|
"version": "1.0.0",
|
|
"endpoints": ["/docs", "/jobs", "/files", "/projects"]
|
|
}
|
|
|
|
# Jobs API
|
|
# @app.get("/jobs")
|
|
# async def get_jobs(db: Session = Depends(get_db)):
|
|
# """Jobs 목록 조회"""
|
|
# try:
|
|
# # jobs 테이블에서 데이터 조회
|
|
# query = text("""
|
|
# SELECT
|
|
# job_no,
|
|
# job_name,
|
|
# client_name,
|
|
# end_user,
|
|
# epc_company,
|
|
# status,
|
|
# created_at
|
|
# FROM jobs
|
|
# WHERE is_active = true
|
|
# ORDER BY created_at DESC
|
|
# """)
|
|
#
|
|
# result = db.execute(query)
|
|
# jobs = result.fetchall()
|
|
#
|
|
# return [
|
|
# {
|
|
# "job_no": job.job_no,
|
|
# "job_name": job.job_name,
|
|
# "client_name": job.client_name,
|
|
# "end_user": job.end_user,
|
|
# "epc_company": job.epc_company,
|
|
# "status": job.status or "진행중",
|
|
# "created_at": job.created_at
|
|
# }
|
|
# for job in jobs
|
|
# ]
|
|
# except Exception as e:
|
|
# print(f"Jobs 조회 에러: {str(e)}")
|
|
# return {"error": f"Jobs 조회 실패: {str(e)}"}
|
|
|
|
# 파일 업로드 API
|
|
@app.post("/upload")
|
|
async def upload_file(
|
|
file: UploadFile = File(...),
|
|
job_no: str = Form(...), # project_id 대신 job_no 사용
|
|
bom_name: str = Form(""), # BOM 이름 추가
|
|
bom_type: str = Form(""),
|
|
revision: str = Form("Rev.0"),
|
|
parent_bom_id: Optional[int] = Form(None),
|
|
description: str = Form(""),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""파일 업로드 및 자재 분류 (자동 리비전 관리)"""
|
|
try:
|
|
print("=== main.py 업로드 API 호출됨 ===")
|
|
print(f"파일명: {file.filename}")
|
|
print(f"job_no: {job_no}")
|
|
print(f"bom_name: {bom_name}")
|
|
print(f"bom_type: {bom_type}")
|
|
|
|
# job_no로 job 확인
|
|
job_query = text("SELECT job_no FROM jobs WHERE job_no = :job_no AND is_active = true")
|
|
job_result = db.execute(job_query, {"job_no": job_no})
|
|
job = job_result.fetchone()
|
|
|
|
if not job:
|
|
return {"error": f"Job No. '{job_no}'에 해당하는 작업을 찾을 수 없습니다."}
|
|
|
|
# 업로드 디렉토리 생성
|
|
upload_dir = "uploads"
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
|
|
# 파일 저장
|
|
if file.filename:
|
|
file_path = os.path.join(upload_dir, file.filename)
|
|
print(f"파일 저장 경로: {file_path}")
|
|
print(f"원본 파일명: {file.filename}")
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
print(f"파일 저장 완료: {file_path}")
|
|
else:
|
|
return {"error": "파일명이 없습니다."}
|
|
|
|
# 파일 크기 계산
|
|
file_size = os.path.getsize(file_path)
|
|
|
|
# 파일 타입 결정
|
|
file_type = "excel" if file.filename.endswith(('.xls', '.xlsx')) else "csv" if file.filename.endswith('.csv') else "unknown"
|
|
|
|
# BOM 종류별 자동 리비전 관리
|
|
if bom_type and not parent_bom_id:
|
|
# 같은 job_no의 같은 파일명에 대한 최신 리비전 조회
|
|
latest_revision_query = text("""
|
|
SELECT revision FROM files
|
|
WHERE job_no = :job_no AND original_filename = :filename
|
|
ORDER BY revision DESC LIMIT 1
|
|
""")
|
|
|
|
result = db.execute(latest_revision_query, {
|
|
"job_no": job_no,
|
|
"filename": file.filename
|
|
})
|
|
|
|
latest_file = result.fetchone()
|
|
if latest_file:
|
|
# 기존 리비전이 있으면 다음 리비전 번호 생성
|
|
current_rev = latest_file.revision
|
|
if current_rev.startswith("Rev."):
|
|
try:
|
|
rev_num = int(current_rev.replace("Rev.", ""))
|
|
revision = f"Rev.{rev_num + 1}"
|
|
except ValueError:
|
|
revision = "Rev.1"
|
|
else:
|
|
revision = "Rev.1"
|
|
else:
|
|
# 첫 번째 업로드인 경우 Rev.0
|
|
revision = "Rev.0"
|
|
|
|
# 데이터베이스에 파일 정보 저장
|
|
insert_query = text("""
|
|
INSERT INTO files (
|
|
job_no, filename, original_filename, file_path,
|
|
file_size, upload_date, revision, file_type, uploaded_by
|
|
) VALUES (
|
|
:job_no, :filename, :original_filename, :file_path,
|
|
:file_size, NOW(), :revision, :file_type, :uploaded_by
|
|
) RETURNING id
|
|
""")
|
|
|
|
result = db.execute(insert_query, {
|
|
"job_no": job_no,
|
|
"filename": file.filename,
|
|
"original_filename": file.filename,
|
|
"file_path": file_path,
|
|
"file_size": file_size,
|
|
"revision": revision,
|
|
"file_type": file_type,
|
|
"uploaded_by": "system"
|
|
})
|
|
|
|
file_id = result.fetchone()[0]
|
|
|
|
# 1차: 파일 파싱 (CSV/Excel 파일 읽기)
|
|
materials_data = parse_file(file_path)
|
|
|
|
# 2차: 각 자재를 분류기로 분류
|
|
classified_materials = []
|
|
|
|
# 리비전 업로드인 경우 기존 분류 정보 가져오기
|
|
existing_classifications = {}
|
|
if parent_bom_id:
|
|
parent_materials = db.execute(
|
|
text("SELECT original_description, classified_category, classified_subcategory, material_grade, schedule, size_spec FROM materials WHERE file_id = :file_id"),
|
|
{"file_id": parent_bom_id}
|
|
).fetchall()
|
|
|
|
for material in parent_materials:
|
|
existing_classifications[material.original_description] = {
|
|
"classified_category": material.classified_category,
|
|
"classified_subcategory": material.classified_subcategory,
|
|
"material_grade": material.material_grade,
|
|
"schedule": material.schedule,
|
|
"size_spec": material.size_spec
|
|
}
|
|
|
|
for material in materials_data:
|
|
# 리비전 업로드인 경우 기존 분류 사용, 아니면 새로 분류
|
|
if parent_bom_id and material.get("original_description") in existing_classifications:
|
|
existing_class = existing_classifications[material.get("original_description")]
|
|
classified_material = {
|
|
**material,
|
|
**existing_class,
|
|
"classification_confidence": 1.0 # 기존 분류이므로 높은 신뢰도
|
|
}
|
|
else:
|
|
classified_material = classify_material_item(material)
|
|
classified_materials.append(classified_material)
|
|
|
|
# 3차: 분류된 자재를 데이터베이스에 저장
|
|
for material in classified_materials:
|
|
insert_material_query = text("""
|
|
INSERT INTO materials (
|
|
file_id, line_number, original_description,
|
|
classified_category, classified_subcategory,
|
|
material_grade, schedule, size_spec,
|
|
quantity, unit, drawing_name, area_code, line_no,
|
|
classification_confidence, is_verified, created_at
|
|
) VALUES (
|
|
:file_id, :line_number, :original_description,
|
|
:classified_category, :classified_subcategory,
|
|
:material_grade, :schedule, :size_spec,
|
|
:quantity, :unit, :drawing_name, :area_code, :line_no,
|
|
:classification_confidence, :is_verified, NOW()
|
|
)
|
|
""")
|
|
|
|
db.execute(insert_material_query, {
|
|
"file_id": file_id,
|
|
"line_number": material.get("line_number", 0),
|
|
"original_description": material.get("original_description", ""),
|
|
"classified_category": material.get("classified_category", ""),
|
|
"classified_subcategory": material.get("classified_subcategory", ""),
|
|
"material_grade": material.get("material_grade", ""),
|
|
"schedule": material.get("schedule", ""),
|
|
"size_spec": material.get("size_spec", ""),
|
|
"quantity": material.get("quantity", 0),
|
|
"unit": material.get("unit", ""),
|
|
"drawing_name": material.get("drawing_name", ""),
|
|
"area_code": material.get("area_code", ""),
|
|
"line_no": material.get("line_no", ""),
|
|
"classification_confidence": material.get("classification_confidence", 0.0),
|
|
"is_verified": False
|
|
})
|
|
|
|
db.commit()
|
|
|
|
return {
|
|
"success": True,
|
|
"file_id": file_id,
|
|
"filename": file.filename,
|
|
"materials_count": len(classified_materials),
|
|
"revision": revision,
|
|
"message": f"파일이 성공적으로 업로드되고 {len(classified_materials)}개의 자재가 분류되었습니다. (리비전: {revision})"
|
|
}
|
|
except Exception as e:
|
|
db.rollback()
|
|
print(f"업로드 실패: {str(e)}")
|
|
# HTTP 400 에러로 변경
|
|
from fastapi import HTTPException
|
|
raise HTTPException(status_code=400, detail=f"파일 업로드 및 분류 실패: {str(e)}")
|
|
|
|
def parse_file(file_path: str) -> List[Dict]:
|
|
"""파일 파싱 (CSV/Excel)"""
|
|
import pandas as pd
|
|
import os
|
|
|
|
try:
|
|
print(f"parse_file 호출됨: {file_path}")
|
|
print(f"파일 존재 여부: {os.path.exists(file_path)}")
|
|
print(f"파일 확장자: {os.path.splitext(file_path)[1]}")
|
|
|
|
# 파일 확장자를 소문자로 변환하여 검증
|
|
file_extension = os.path.splitext(file_path)[1].lower()
|
|
print(f"소문자 변환된 확장자: {file_extension}")
|
|
|
|
if file_extension == '.csv':
|
|
df = pd.read_csv(file_path)
|
|
elif file_extension in ['.xls', '.xlsx']:
|
|
df = pd.read_excel(file_path)
|
|
else:
|
|
print(f"지원되지 않는 파일 형식: {file_path}")
|
|
print(f"파일 확장자: {file_extension}")
|
|
raise ValueError("지원하지 않는 파일 형식입니다.")
|
|
|
|
print(f"파일 파싱 시작: {file_path}")
|
|
print(f"데이터프레임 형태: {df.shape}")
|
|
print(f"컬럼명: {list(df.columns)}")
|
|
|
|
# 컬럼명 매핑 (대소문자 구분 없이)
|
|
column_mapping = {
|
|
'description': ['DESCRIPTION', 'Description', 'description', 'DESC', 'Desc', 'desc', 'ITEM', 'Item', 'item'],
|
|
'quantity': ['QTY', 'Quantity', 'quantity', 'QTY.', 'Qty', 'qty', 'AMOUNT', 'Amount', 'amount'],
|
|
'unit': ['UNIT', 'Unit', 'unit', 'UOM', 'Uom', 'uom'],
|
|
'drawing': ['DRAWING', 'Drawing', 'drawing', 'DWG', 'Dwg', 'dwg'],
|
|
'area': ['AREA', 'Area', 'area', 'AREA_CODE', 'Area_Code', 'area_code'],
|
|
'line': ['LINE', 'Line', 'line', 'LINE_NO', 'Line_No', 'line_no', 'PIPELINE', 'Pipeline', 'pipeline']
|
|
}
|
|
|
|
# 실제 컬럼명 찾기
|
|
found_columns = {}
|
|
for target_col, possible_names in column_mapping.items():
|
|
for col_name in possible_names:
|
|
if col_name in df.columns:
|
|
found_columns[target_col] = col_name
|
|
break
|
|
|
|
print(f"찾은 컬럼 매핑: {found_columns}")
|
|
|
|
materials = []
|
|
for index, row in df.iterrows():
|
|
# 빈 행 건너뛰기
|
|
if row.isna().all():
|
|
continue
|
|
|
|
# 안전한 값 추출
|
|
description = str(row.get(found_columns.get('description', ''), '') or '')
|
|
quantity_raw = row.get(found_columns.get('quantity', 1), 1)
|
|
quantity = float(quantity_raw) if quantity_raw is not None else 1.0
|
|
unit = str(row.get(found_columns.get('unit', 'EA'), 'EA') or 'EA')
|
|
drawing = str(row.get(found_columns.get('drawing', ''), '') or '')
|
|
area = str(row.get(found_columns.get('area', ''), '') or '')
|
|
line = str(row.get(found_columns.get('line', ''), '') or '')
|
|
|
|
material = {
|
|
"line_number": index + 1,
|
|
"original_description": description,
|
|
"quantity": quantity,
|
|
"unit": unit,
|
|
"drawing_name": drawing,
|
|
"area_code": area,
|
|
"line_no": line
|
|
}
|
|
|
|
# 빈 설명은 건너뛰기
|
|
if not material["original_description"] or material["original_description"].strip() == '':
|
|
continue
|
|
|
|
materials.append(material)
|
|
|
|
print(f"파싱된 자재 수: {len(materials)}")
|
|
if materials:
|
|
print(f"첫 번째 자재 예시: {materials[0]}")
|
|
|
|
return materials
|
|
except Exception as e:
|
|
print(f"파일 파싱 오류: {str(e)}")
|
|
raise Exception(f"파일 파싱 실패: {str(e)}")
|
|
|
|
def classify_material_item(material: Dict) -> Dict:
|
|
"""개별 자재 분류"""
|
|
from .services import (
|
|
pipe_classifier, fitting_classifier, bolt_classifier,
|
|
valve_classifier, instrument_classifier, flange_classifier,
|
|
gasket_classifier, material_classifier
|
|
)
|
|
|
|
description = material.get("original_description", "")
|
|
|
|
# 각 분류기로 분류 시도
|
|
classifiers = [
|
|
("PIPE", pipe_classifier.classify_pipe),
|
|
("FITTING", fitting_classifier.classify_fitting),
|
|
("BOLT", bolt_classifier.classify_bolt),
|
|
("VALVE", valve_classifier.classify_valve),
|
|
("INSTRUMENT", instrument_classifier.classify_instrument),
|
|
("FLANGE", flange_classifier.classify_flange),
|
|
("GASKET", gasket_classifier.classify_gasket)
|
|
]
|
|
|
|
best_result = None
|
|
best_confidence = 0.0
|
|
|
|
for category, classifier_func in classifiers:
|
|
try:
|
|
result = classifier_func(description)
|
|
if result and result.get("confidence", 0) > best_confidence:
|
|
best_result = result
|
|
best_confidence = result.get("confidence", 0)
|
|
except Exception:
|
|
continue
|
|
|
|
# 재질 분류
|
|
material_result = material_classifier.classify_material(description)
|
|
|
|
# 최종 결과 조합
|
|
final_result = {
|
|
**material,
|
|
"classified_category": best_result.get("category", "UNKNOWN") if best_result else "UNKNOWN",
|
|
"classified_subcategory": best_result.get("subcategory", "") if best_result else "",
|
|
"material_grade": material_result.get("grade", "") if material_result else "",
|
|
"schedule": best_result.get("schedule", "") if best_result else "",
|
|
"size_spec": best_result.get("size_spec", "") if best_result else "",
|
|
"classification_confidence": best_confidence
|
|
}
|
|
|
|
return final_result
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
return {"status": "healthy", "timestamp": "2024-07-15"}
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
|