Files
TK-BOM-Project/backend/app/routers/files.py
Hyungi Ahn 5f7a6f0b3a feat: 자재 분류 시스템 개선 및 상세 테이블 추가
- 모든 자재 카테고리별 상세 테이블 생성 (fitting, valve, flange, bolt, gasket, instrument)
- PIPE, FITTING, VALVE 분류 결과를 각 상세 테이블에 저장하는 로직 구현
- 프론트엔드 라우팅 정리 및 BOM 현황 페이지 기능 개선
- 자재확인 페이지 에러 처리 개선

TODO: FLANGE, BOLT, GASKET, INSTRUMENT 저장 로직 추가 필요
2025-07-17 10:44:19 +09:00

980 lines
38 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import List, Optional
import os
import shutil
from datetime import datetime
import uuid
import pandas as pd
import re
from pathlib import Path
from ..database import get_db
from app.services.material_classifier import classify_material
from app.services.bolt_classifier import classify_bolt
from app.services.flange_classifier import classify_flange
from app.services.fitting_classifier import classify_fitting
from app.services.gasket_classifier import classify_gasket
from app.services.instrument_classifier import classify_instrument
from app.services.pipe_classifier import classify_pipe
from app.services.valve_classifier import classify_valve
router = APIRouter()
# Directory where uploaded BOM files are written; created at import time if absent.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
# Upload endpoint only accepts these spreadsheet/CSV extensions (lowercase, with dot).
ALLOWED_EXTENSIONS = {".xlsx", ".xls", ".csv"}
@router.get("/")
async def get_files_info():
return {
"message": "파일 관리 API",
"allowed_extensions": list(ALLOWED_EXTENSIONS),
"upload_directory": str(UPLOAD_DIR)
}
@router.get("/test")
async def test_endpoint():
return {"status": "파일 API가 정상 작동합니다!"}
@router.post("/add-missing-columns")
async def add_missing_columns(db: Session = Depends(get_db)):
"""누락된 컬럼들 추가"""
try:
db.execute(text("ALTER TABLE files ADD COLUMN IF NOT EXISTS parsed_count INTEGER DEFAULT 0"))
db.execute(text("ALTER TABLE materials ADD COLUMN IF NOT EXISTS row_number INTEGER"))
db.commit()
return {
"success": True,
"message": "누락된 컬럼들이 추가되었습니다",
"added_columns": ["files.parsed_count", "materials.row_number"]
}
except Exception as e:
db.rollback()
return {"success": False, "error": f"컬럼 추가 실패: {str(e)}"}
def validate_file_extension(filename: str) -> bool:
    """Return True when *filename*'s extension (case-insensitive) is allowed."""
    suffix = Path(filename).suffix.lower()
    return suffix in ALLOWED_EXTENSIONS
def generate_unique_filename(original_filename: str) -> str:
    """Build a collision-resistant name: <stem>_<YYYYmmdd_HHMMSS>_<8 hex chars><suffix>."""
    original = Path(original_filename)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    short_uid = uuid.uuid4().hex[:8]
    return f"{original.stem}_{timestamp}_{short_uid}{original.suffix}"
def parse_dataframe(df):
    """Normalize an uploaded BOM DataFrame into a list of material dicts.

    Column headers are matched case-insensitively against known aliases
    (English and Korean). Rows whose description is empty/NaN are skipped.

    Args:
        df: raw DataFrame read from the uploaded CSV/Excel file.

    Returns:
        list[dict]: one dict per material row with keys original_description,
        quantity, unit, size_spec, material_grade, length, line_number,
        row_number.
    """
    df = df.dropna(how='all')
    # Headers may be non-string (e.g. integer headers from a header-less
    # sheet); coerce to str first so the .str accessors cannot raise.
    df.columns = df.columns.astype(str).str.strip().str.lower()
    # Standard field -> accepted header aliases, checked in order.
    column_mapping = {
        'description': ['description', 'item', 'material', '품명', '자재명'],
        'quantity': ['qty', 'quantity', 'ea', '수량'],
        'main_size': ['main_nom', 'nominal_diameter', 'nd', '주배관'],
        'red_size': ['red_nom', 'reduced_diameter', '축소배관'],
        'length': ['length', 'len', '길이'],
        'weight': ['weight', 'wt', '중량'],
        'dwg_name': ['dwg_name', 'drawing', '도면명'],
        'line_num': ['line_num', 'line_number', '라인번호']
    }
    mapped_columns = {}
    for standard_col, possible_names in column_mapping.items():
        for possible_name in possible_names:
            if possible_name in df.columns:
                mapped_columns[standard_col] = possible_name
                break
    materials = []
    for index, row in df.iterrows():
        # Unmapped fields fall back to '' as a Series key, which Series.get
        # resolves to the default — i.e. missing columns yield empty values.
        description = str(row.get(mapped_columns.get('description', ''), ''))
        quantity_raw = row.get(mapped_columns.get('quantity', ''), 0)
        try:
            quantity = float(quantity_raw) if pd.notna(quantity_raw) else 0
        except (ValueError, TypeError):
            # Non-numeric quantity cell (e.g. "N/A"): treat as zero.
            # Narrowed from a bare except so real bugs are not swallowed.
            quantity = 0
        # Extract an ASTM grade fragment from the description when present.
        material_grade = ""
        if "ASTM" in description.upper():
            astm_match = re.search(r'ASTM\s+([A-Z0-9\s]+)', description.upper())
            if astm_match:
                material_grade = astm_match.group(0).strip()
        # Combine main/reduced sizes ("2\" x 1\"" style) when both exist;
        # str(NaN) produces the literal 'nan', hence the string comparisons.
        main_size = str(row.get(mapped_columns.get('main_size', ''), ''))
        red_size = str(row.get(mapped_columns.get('red_size', ''), ''))
        if main_size != 'nan' and red_size != 'nan' and red_size != '':
            size_spec = f"{main_size} x {red_size}"
        elif main_size != 'nan' and main_size != '':
            size_spec = main_size
        else:
            size_spec = ""
        # Extract LENGTH info (units are whatever the source sheet uses).
        length_raw = row.get(mapped_columns.get('length', ''), '')
        length_value = None
        if pd.notna(length_raw) and str(length_raw).strip() != '':
            try:
                length_value = float(str(length_raw).strip())
            except (ValueError, TypeError):
                length_value = None
        if description and description not in ['nan', 'None', '']:
            materials.append({
                'original_description': description,
                'quantity': quantity,
                'unit': "EA",
                'size_spec': size_spec,
                'material_grade': material_grade,
                'length': length_value,
                'line_number': index + 1,
                'row_number': index + 1
            })
    return materials
def parse_file_data(file_path):
    """Read an uploaded BOM file (.csv/.xlsx/.xls) and parse it into material dicts.

    Args:
        file_path: path to the saved upload on disk.

    Returns:
        list[dict]: parsed materials (see parse_dataframe).

    Raises:
        HTTPException 400: unsupported extension or any read/parse failure.
    """
    file_extension = Path(file_path).suffix.lower()
    try:
        if file_extension == ".csv":
            df = pd.read_csv(file_path, encoding='utf-8')
        elif file_extension in [".xlsx", ".xls"]:
            df = pd.read_excel(file_path, sheet_name=0)
        else:
            raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식")
        return parse_dataframe(df)
    except HTTPException:
        # Bug fix: the "지원하지 않는 파일 형식" 400 above was previously caught
        # by the generic handler and re-wrapped as "파일 파싱 실패". Propagate
        # HTTPExceptions unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"파일 파싱 실패: {str(e)}")
@router.post("/upload")
async def upload_file(
file: UploadFile = File(...),
job_no: str = Form(...),
revision: str = Form("Rev.0"),
db: Session = Depends(get_db)
):
if not validate_file_extension(file.filename):
raise HTTPException(
status_code=400,
detail=f"지원하지 않는 파일 형식입니다. 허용된 확장자: {', '.join(ALLOWED_EXTENSIONS)}"
)
if file.size and file.size > 10 * 1024 * 1024:
raise HTTPException(status_code=400, detail="파일 크기는 10MB를 초과할 수 없습니다")
unique_filename = generate_unique_filename(file.filename)
file_path = UPLOAD_DIR / unique_filename
try:
print("파일 저장 시작")
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
print(f"파일 저장 완료: {file_path}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"파일 저장 실패: {str(e)}")
try:
print("파일 파싱 시작")
materials_data = parse_file_data(str(file_path))
parsed_count = len(materials_data)
print(f"파싱 완료: {parsed_count}개 자재")
# 파일 정보 저장
print("DB 저장 시작")
file_insert_query = text("""
INSERT INTO files (filename, original_filename, file_path, job_no, revision, description, file_size, parsed_count, is_active)
VALUES (:filename, :original_filename, :file_path, :job_no, :revision, :description, :file_size, :parsed_count, :is_active)
RETURNING id
""")
file_result = db.execute(file_insert_query, {
"filename": unique_filename,
"original_filename": file.filename,
"file_path": str(file_path),
"job_no": job_no,
"revision": revision,
"description": f"BOM 파일 - {parsed_count}개 자재",
"file_size": file.size,
"parsed_count": parsed_count,
"is_active": True
})
file_id = file_result.fetchone()[0]
print(f"파일 저장 완료: file_id = {file_id}")
# 자재 데이터 저장 (분류 포함)
materials_inserted = 0
for material_data in materials_data:
# 자재 타입 분류기 적용 (PIPE, FITTING, VALVE 등)
description = material_data["original_description"]
size_spec = material_data["size_spec"]
# 각 분류기로 시도 (올바른 매개변수 사용)
print(f"분류 시도: {description}")
# LENGTH 정보 추출
length_value = None
if "length" in material_data:
try:
length_value = float(material_data["length"])
except (ValueError, TypeError):
length_value = None
classification_result = classify_pipe("", description, size_spec, length_value)
print(f"PIPE 분류 결과: {classification_result.get('category', 'UNKNOWN')} (신뢰도: {classification_result.get('overall_confidence', 0)})")
if classification_result.get("overall_confidence", 0) < 0.5:
classification_result = classify_fitting("", description, size_spec)
print(f"FITTING 분류 결과: {classification_result.get('category', 'UNKNOWN')} (신뢰도: {classification_result.get('overall_confidence', 0)})")
if classification_result.get("overall_confidence", 0) < 0.5:
classification_result = classify_valve("", description, size_spec)
print(f"VALVE 분류 결과: {classification_result.get('category', 'UNKNOWN')} (신뢰도: {classification_result.get('overall_confidence', 0)})")
if classification_result.get("overall_confidence", 0) < 0.5:
classification_result = classify_flange("", description, size_spec)
print(f"FLANGE 분류 결과: {classification_result.get('category', 'UNKNOWN')} (신뢰도: {classification_result.get('overall_confidence', 0)})")
if classification_result.get("overall_confidence", 0) < 0.5:
classification_result = classify_bolt("", description, size_spec)
print(f"BOLT 분류 결과: {classification_result.get('category', 'UNKNOWN')} (신뢰도: {classification_result.get('overall_confidence', 0)})")
if classification_result.get("overall_confidence", 0) < 0.5:
classification_result = classify_gasket("", description, size_spec)
print(f"GASKET 분류 결과: {classification_result.get('category', 'UNKNOWN')} (신뢰도: {classification_result.get('overall_confidence', 0)})")
if classification_result.get("overall_confidence", 0) < 0.5:
classification_result = classify_instrument("", description, size_spec)
print(f"INSTRUMENT 분류 결과: {classification_result.get('category', 'UNKNOWN')} (신뢰도: {classification_result.get('overall_confidence', 0)})")
print(f"최종 분류 결과: {classification_result.get('category', 'UNKNOWN')}")
# 기본 자재 정보 저장
material_insert_query = text("""
INSERT INTO materials (
file_id, original_description, quantity, unit, size_spec,
material_grade, line_number, row_number, classified_category,
classification_confidence, is_verified, created_at
)
VALUES (
:file_id, :original_description, :quantity, :unit, :size_spec,
:material_grade, :line_number, :row_number, :classified_category,
:classification_confidence, :is_verified, :created_at
)
RETURNING id
""")
material_result = db.execute(material_insert_query, {
"file_id": file_id,
"original_description": material_data["original_description"],
"quantity": material_data["quantity"],
"unit": material_data["unit"],
"size_spec": material_data["size_spec"],
"material_grade": material_data["material_grade"],
"line_number": material_data["line_number"],
"row_number": material_data["row_number"],
"classified_category": classification_result.get("category", "UNKNOWN"),
"classification_confidence": classification_result.get("overall_confidence", 0.0),
"is_verified": False,
"created_at": datetime.now()
})
material_id = material_result.fetchone()[0]
materials_inserted += 1
# PIPE 분류 결과인 경우 상세 정보 저장
if classification_result.get("category") == "PIPE":
print("PIPE 상세 정보 저장 시작")
# 길이 정보 추출
length_mm = None
if "length_info" in classification_result:
length_mm = classification_result["length_info"].get("length_mm")
pipe_detail_insert_query = text("""
INSERT INTO pipe_details (
file_id, material_standard, material_grade, material_type,
manufacturing_method, end_preparation, schedule, wall_thickness,
nominal_size, length_mm, material_confidence, manufacturing_confidence,
end_prep_confidence, schedule_confidence
)
VALUES (
:file_id, :material_standard, :material_grade, :material_type,
:manufacturing_method, :end_preparation, :schedule, :wall_thickness,
:nominal_size, :length_mm, :material_confidence, :manufacturing_confidence,
:end_prep_confidence, :schedule_confidence
)
""")
# 재질 정보
material_info = classification_result.get("material", {})
manufacturing_info = classification_result.get("manufacturing", {})
end_prep_info = classification_result.get("end_preparation", {})
schedule_info = classification_result.get("schedule", {})
size_info = classification_result.get("size_info", {})
db.execute(pipe_detail_insert_query, {
"file_id": file_id,
"material_standard": material_info.get("standard"),
"material_grade": material_info.get("grade"),
"material_type": material_info.get("material_type"),
"manufacturing_method": manufacturing_info.get("method"),
"end_preparation": end_prep_info.get("type"),
"schedule": schedule_info.get("schedule"),
"wall_thickness": schedule_info.get("wall_thickness"),
"nominal_size": size_info.get("nominal_size"),
"length_mm": length_mm,
"material_confidence": material_info.get("confidence", 0.0),
"manufacturing_confidence": manufacturing_info.get("confidence", 0.0),
"end_prep_confidence": end_prep_info.get("confidence", 0.0),
"schedule_confidence": schedule_info.get("confidence", 0.0)
})
print("PIPE 상세 정보 저장 완료")
db.commit()
print(f"자재 저장 완료: {materials_inserted}")
return {
"success": True,
"message": f"업로드 성공! {materials_inserted}개 자재가 분류되었습니다.",
"original_filename": file.filename,
"file_id": file_id,
"materials_count": materials_inserted,
"parsed_count": parsed_count
}
except Exception as e:
db.rollback()
if os.path.exists(file_path):
os.remove(file_path)
raise HTTPException(status_code=500, detail=f"파일 처리 실패: {str(e)}")
@router.get("/files")
async def get_files(
job_no: Optional[str] = None,
db: Session = Depends(get_db)
):
"""파일 목록 조회"""
try:
query = """
SELECT id, filename, original_filename, job_no, revision,
description, file_size, parsed_count, created_at, is_active
FROM files
WHERE is_active = TRUE
"""
params = {}
if job_no:
query += " AND job_no = :job_no"
params["job_no"] = job_no
query += " ORDER BY created_at DESC"
result = db.execute(text(query), params)
files = result.fetchall()
return [
{
"id": file.id,
"filename": file.filename,
"original_filename": file.original_filename,
"job_no": file.job_no,
"revision": file.revision,
"description": file.description,
"file_size": file.file_size,
"parsed_count": file.parsed_count,
"created_at": file.created_at,
"is_active": file.is_active
}
for file in files
]
except Exception as e:
raise HTTPException(status_code=500, detail=f"파일 목록 조회 실패: {str(e)}")
@router.delete("/files/{file_id}")
async def delete_file(file_id: int, db: Session = Depends(get_db)):
    """Delete a file row and all of its material rows.

    Raises:
        HTTPException 404: no file with the given id exists.
        HTTPException 500: any database error (transaction rolled back).
    """
    try:
        # Delete child rows first so the files delete cannot violate the FK.
        db.execute(text("DELETE FROM materials WHERE file_id = :file_id"), {"file_id": file_id})
        result = db.execute(text("DELETE FROM files WHERE id = :file_id"), {"file_id": file_id})
        if result.rowcount == 0:
            raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
        db.commit()
        return {
            "success": True,
            "message": "파일이 삭제되었습니다"
        }
    except HTTPException:
        # Bug fix: the 404 above was previously caught by the generic handler
        # and re-raised as a 500. Roll back the materials delete and propagate.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"파일 삭제 실패: {str(e)}")
@router.get("/materials")
async def get_materials(
project_id: Optional[int] = None,
file_id: Optional[int] = None,
job_no: Optional[str] = None,
filename: Optional[str] = None,
revision: Optional[str] = None,
skip: int = 0,
limit: int = 100,
search: Optional[str] = None,
item_type: Optional[str] = None,
material_grade: Optional[str] = None,
size_spec: Optional[str] = None,
file_filter: Optional[str] = None,
sort_by: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
저장된 자재 목록 조회 (job_no, filename, revision 3가지로 필터링 가능)
"""
try:
query = """
SELECT m.id, m.file_id, m.original_description, m.quantity, m.unit,
m.size_spec, m.material_grade, m.line_number, m.row_number,
m.created_at, m.classified_category, m.classification_confidence,
f.original_filename, f.project_id, f.job_no, f.revision,
p.official_project_code, p.project_name
FROM materials m
LEFT JOIN files f ON m.file_id = f.id
LEFT JOIN projects p ON f.project_id = p.id
WHERE 1=1
"""
params = {}
if project_id:
query += " AND f.project_id = :project_id"
params["project_id"] = project_id
if file_id:
query += " AND m.file_id = :file_id"
params["file_id"] = file_id
if job_no:
query += " AND f.job_no = :job_no"
params["job_no"] = job_no
if filename:
query += " AND f.original_filename = :filename"
params["filename"] = filename
if revision:
query += " AND f.revision = :revision"
params["revision"] = revision
if search:
query += " AND (m.original_description ILIKE :search OR m.material_grade ILIKE :search)"
params["search"] = f"%{search}%"
if item_type:
query += " AND m.classified_category = :item_type"
params["item_type"] = item_type
if material_grade:
query += " AND m.material_grade ILIKE :material_grade"
params["material_grade"] = f"%{material_grade}%"
if size_spec:
query += " AND m.size_spec ILIKE :size_spec"
params["size_spec"] = f"%{size_spec}%"
if file_filter:
query += " AND f.original_filename ILIKE :file_filter"
params["file_filter"] = f"%{file_filter}%"
# 정렬 처리
if sort_by:
if sort_by == "quantity_desc":
query += " ORDER BY m.quantity DESC"
elif sort_by == "quantity_asc":
query += " ORDER BY m.quantity ASC"
elif sort_by == "name_asc":
query += " ORDER BY m.original_description ASC"
elif sort_by == "name_desc":
query += " ORDER BY m.original_description DESC"
elif sort_by == "created_desc":
query += " ORDER BY m.created_at DESC"
elif sort_by == "created_asc":
query += " ORDER BY m.created_at ASC"
else:
query += " ORDER BY m.line_number ASC"
else:
query += " ORDER BY m.line_number ASC"
query += " LIMIT :limit OFFSET :skip"
params["limit"] = limit
params["skip"] = skip
result = db.execute(text(query), params)
materials = result.fetchall()
# 전체 개수 조회
count_query = """
SELECT COUNT(*) as total
FROM materials m
LEFT JOIN files f ON m.file_id = f.id
WHERE 1=1
"""
count_params = {}
if project_id:
count_query += " AND f.project_id = :project_id"
count_params["project_id"] = project_id
if file_id:
count_query += " AND m.file_id = :file_id"
count_params["file_id"] = file_id
if search:
count_query += " AND (m.original_description ILIKE :search OR m.material_grade ILIKE :search)"
count_params["search"] = f"%{search}%"
if item_type:
count_query += " AND m.classified_category = :item_type"
count_params["item_type"] = item_type
if material_grade:
count_query += " AND m.material_grade ILIKE :material_grade"
count_params["material_grade"] = f"%{material_grade}%"
if size_spec:
count_query += " AND m.size_spec ILIKE :size_spec"
count_params["size_spec"] = f"%{size_spec}%"
if file_filter:
count_query += " AND f.original_filename ILIKE :file_filter"
count_params["file_filter"] = f"%{file_filter}%"
count_result = db.execute(text(count_query), count_params)
total_count = count_result.fetchone()[0]
return {
"success": True,
"total_count": total_count,
"returned_count": len(materials),
"skip": skip,
"limit": limit,
"materials": [
{
"id": m.id,
"file_id": m.file_id,
"filename": m.original_filename,
"project_id": m.project_id,
"project_code": m.official_project_code,
"project_name": m.project_name,
"original_description": m.original_description,
"quantity": float(m.quantity) if m.quantity else 0,
"unit": m.unit,
"size_spec": m.size_spec,
"material_grade": m.material_grade,
"line_number": m.line_number,
"row_number": m.row_number,
"classified_category": m.classified_category,
"classification_confidence": float(m.classification_confidence) if m.classification_confidence else 0.0,
"created_at": m.created_at
}
for m in materials
]
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"자재 조회 실패: {str(e)}")
@router.get("/materials/summary")
async def get_materials_summary(
project_id: Optional[int] = None,
file_id: Optional[int] = None,
db: Session = Depends(get_db)
):
"""자재 요약 통계"""
try:
query = """
SELECT
COUNT(*) as total_items,
COUNT(DISTINCT m.original_description) as unique_descriptions,
COUNT(DISTINCT m.size_spec) as unique_sizes,
COUNT(DISTINCT m.material_grade) as unique_materials,
SUM(m.quantity) as total_quantity,
AVG(m.quantity) as avg_quantity,
MIN(m.created_at) as earliest_upload,
MAX(m.created_at) as latest_upload
FROM materials m
LEFT JOIN files f ON m.file_id = f.id
WHERE 1=1
"""
params = {}
if project_id:
query += " AND f.project_id = :project_id"
params["project_id"] = project_id
if file_id:
query += " AND m.file_id = :file_id"
params["file_id"] = file_id
result = db.execute(text(query), params)
summary = result.fetchone()
return {
"success": True,
"summary": {
"total_items": summary.total_items,
"unique_descriptions": summary.unique_descriptions,
"unique_sizes": summary.unique_sizes,
"unique_materials": summary.unique_materials,
"total_quantity": float(summary.total_quantity) if summary.total_quantity else 0,
"avg_quantity": round(float(summary.avg_quantity), 2) if summary.avg_quantity else 0,
"earliest_upload": summary.earliest_upload,
"latest_upload": summary.latest_upload
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"요약 조회 실패: {str(e)}")
@router.get("/materials/compare-revisions")
async def compare_revisions(
job_no: str,
filename: str,
old_revision: str,
new_revision: str,
db: Session = Depends(get_db)
):
"""
리비전 간 자재 비교
"""
try:
# 기존 리비전 자재 조회
old_materials_query = text("""
SELECT m.original_description, m.quantity, m.unit, m.size_spec,
m.material_grade, m.classified_category, m.classification_confidence
FROM materials m
JOIN files f ON m.file_id = f.id
WHERE f.job_no = :job_no
AND f.original_filename = :filename
AND f.revision = :old_revision
""")
old_result = db.execute(old_materials_query, {
"job_no": job_no,
"filename": filename,
"old_revision": old_revision
})
old_materials = old_result.fetchall()
# 새 리비전 자재 조회
new_materials_query = text("""
SELECT m.original_description, m.quantity, m.unit, m.size_spec,
m.material_grade, m.classified_category, m.classification_confidence
FROM materials m
JOIN files f ON m.file_id = f.id
WHERE f.job_no = :job_no
AND f.original_filename = :filename
AND f.revision = :new_revision
""")
new_result = db.execute(new_materials_query, {
"job_no": job_no,
"filename": filename,
"new_revision": new_revision
})
new_materials = new_result.fetchall()
# 자재 키 생성 함수
def create_material_key(material):
return f"{material.original_description}_{material.size_spec}_{material.material_grade}"
# 기존 자재를 딕셔너리로 변환
old_materials_dict = {}
for material in old_materials:
key = create_material_key(material)
old_materials_dict[key] = {
"original_description": material.original_description,
"quantity": float(material.quantity) if material.quantity else 0,
"unit": material.unit,
"size_spec": material.size_spec,
"material_grade": material.material_grade,
"classified_category": material.classified_category,
"classification_confidence": material.classification_confidence
}
# 새 자재를 딕셔너리로 변환
new_materials_dict = {}
for material in new_materials:
key = create_material_key(material)
new_materials_dict[key] = {
"original_description": material.original_description,
"quantity": float(material.quantity) if material.quantity else 0,
"unit": material.unit,
"size_spec": material.size_spec,
"material_grade": material.material_grade,
"classified_category": material.classified_category,
"classification_confidence": material.classification_confidence
}
# 변경 사항 분석
all_keys = set(old_materials_dict.keys()) | set(new_materials_dict.keys())
added_items = []
removed_items = []
changed_items = []
for key in all_keys:
old_item = old_materials_dict.get(key)
new_item = new_materials_dict.get(key)
if old_item and not new_item:
# 삭제된 항목
removed_items.append({
"key": key,
"item": old_item,
"change_type": "removed"
})
elif not old_item and new_item:
# 추가된 항목
added_items.append({
"key": key,
"item": new_item,
"change_type": "added"
})
elif old_item and new_item:
# 수량 변경 확인
if old_item["quantity"] != new_item["quantity"]:
changed_items.append({
"key": key,
"old_item": old_item,
"new_item": new_item,
"quantity_change": new_item["quantity"] - old_item["quantity"],
"change_type": "quantity_changed"
})
# 분류별 통계
def calculate_category_stats(items):
stats = {}
for item in items:
category = item.get("item", {}).get("classified_category", "OTHER")
if category not in stats:
stats[category] = {"count": 0, "total_quantity": 0}
stats[category]["count"] += 1
stats[category]["total_quantity"] += item.get("item", {}).get("quantity", 0)
return stats
added_stats = calculate_category_stats(added_items)
removed_stats = calculate_category_stats(removed_items)
changed_stats = calculate_category_stats(changed_items)
return {
"success": True,
"comparison": {
"old_revision": old_revision,
"new_revision": new_revision,
"filename": filename,
"job_no": job_no,
"summary": {
"added_count": len(added_items),
"removed_count": len(removed_items),
"changed_count": len(changed_items),
"total_changes": len(added_items) + len(removed_items) + len(changed_items)
},
"changes": {
"added": added_items,
"removed": removed_items,
"changed": changed_items
},
"category_stats": {
"added": added_stats,
"removed": removed_stats,
"changed": changed_stats
}
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"리비전 비교 실패: {str(e)}")
@router.get("/pipe-details")
async def get_pipe_details(
file_id: Optional[int] = None,
job_no: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
PIPE 상세 정보 조회
"""
try:
query = """
SELECT pd.*, f.original_filename, f.job_no, f.revision,
m.original_description, m.quantity, m.unit
FROM pipe_details pd
LEFT JOIN files f ON pd.file_id = f.id
LEFT JOIN materials m ON pd.file_id = m.file_id
AND m.classified_category = 'PIPE'
WHERE 1=1
"""
params = {}
if file_id:
query += " AND pd.file_id = :file_id"
params["file_id"] = file_id
if job_no:
query += " AND f.job_no = :job_no"
params["job_no"] = job_no
query += " ORDER BY pd.created_at DESC"
result = db.execute(text(query), params)
pipe_details = result.fetchall()
return [
{
"id": pd.id,
"file_id": pd.file_id,
"original_filename": pd.original_filename,
"job_no": pd.job_no,
"revision": pd.revision,
"original_description": pd.original_description,
"quantity": pd.quantity,
"unit": pd.unit,
"material_standard": pd.material_standard,
"material_grade": pd.material_grade,
"material_type": pd.material_type,
"manufacturing_method": pd.manufacturing_method,
"end_preparation": pd.end_preparation,
"schedule": pd.schedule,
"wall_thickness": pd.wall_thickness,
"nominal_size": pd.nominal_size,
"length_mm": pd.length_mm,
"material_confidence": pd.material_confidence,
"manufacturing_confidence": pd.manufacturing_confidence,
"end_prep_confidence": pd.end_prep_confidence,
"schedule_confidence": pd.schedule_confidence,
"created_at": pd.created_at,
"updated_at": pd.updated_at
}
for pd in pipe_details
]
except Exception as e:
raise HTTPException(status_code=500, detail=f"PIPE 상세 정보 조회 실패: {str(e)}")
@router.get("/user-requirements")
async def get_user_requirements(
file_id: Optional[int] = None,
job_no: Optional[str] = None,
status: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
사용자 요구사항 조회
"""
try:
query = """
SELECT ur.*, f.original_filename, f.job_no, f.revision,
rt.type_name, rt.category
FROM user_requirements ur
LEFT JOIN files f ON ur.file_id = f.id
LEFT JOIN requirement_types rt ON ur.requirement_type = rt.type_code
WHERE 1=1
"""
params = {}
if file_id:
query += " AND ur.file_id = :file_id"
params["file_id"] = file_id
if job_no:
query += " AND f.job_no = :job_no"
params["job_no"] = job_no
if status:
query += " AND ur.status = :status"
params["status"] = status
query += " ORDER BY ur.created_at DESC"
result = db.execute(text(query), params)
requirements = result.fetchall()
return [
{
"id": req.id,
"file_id": req.file_id,
"original_filename": req.original_filename,
"job_no": req.job_no,
"revision": req.revision,
"requirement_type": req.requirement_type,
"type_name": req.type_name,
"category": req.category,
"requirement_title": req.requirement_title,
"requirement_description": req.requirement_description,
"requirement_spec": req.requirement_spec,
"status": req.status,
"priority": req.priority,
"assigned_to": req.assigned_to,
"due_date": req.due_date,
"created_at": req.created_at,
"updated_at": req.updated_at
}
for req in requirements
]
except Exception as e:
raise HTTPException(status_code=500, detail=f"사용자 요구사항 조회 실패: {str(e)}")
@router.post("/user-requirements")
async def create_user_requirement(
file_id: int,
requirement_type: str,
requirement_title: str,
requirement_description: Optional[str] = None,
requirement_spec: Optional[str] = None,
priority: str = "NORMAL",
assigned_to: Optional[str] = None,
due_date: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
사용자 요구사항 생성
"""
try:
insert_query = text("""
INSERT INTO user_requirements (
file_id, requirement_type, requirement_title, requirement_description,
requirement_spec, priority, assigned_to, due_date
)
VALUES (
:file_id, :requirement_type, :requirement_title, :requirement_description,
:requirement_spec, :priority, :assigned_to, :due_date
)
RETURNING id
""")
result = db.execute(insert_query, {
"file_id": file_id,
"requirement_type": requirement_type,
"requirement_title": requirement_title,
"requirement_description": requirement_description,
"requirement_spec": requirement_spec,
"priority": priority,
"assigned_to": assigned_to,
"due_date": due_date
})
requirement_id = result.fetchone()[0]
db.commit()
return {
"success": True,
"message": "요구사항이 생성되었습니다",
"requirement_id": requirement_id
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"요구사항 생성 실패: {str(e)}")