Files
tk-factory-services/tkeg/api/app/routers/material_comparison.py
Hyungi Ahn f09c86ee01 fix(security): CRITICAL 보안 이슈 13건 일괄 수정
- SEC-42: JWT algorithm HS256 명시 (sign 5곳, verify 3곳)
- SEC-44: MariaDB/PhpMyAdmin 포트 127.0.0.1 바인딩
- SEC-29: escHtml = escapeHtml alias 추가 (XSS 방지)
- SEC-39: Python Dockerfile 4개 non-root user + chown
- SEC-43: deploy-remote.sh 삭제 (평문 비밀번호 포함)
- SEC-11,12: SQL SET ? → 명시적 컬럼 whitelist + IN절 parameterized
- QA-34: vacation approveRequest/cancelRequest 트랜잭션 래핑
- SEC-32,34: material_comparison.py 5개 엔드포인트 인증 + confirmed_by
- SEC-33: files.py 17개 미인증 엔드포인트 인증 추가
- SEC-37: chatbot 프롬프트 인젝션 방어 (sanitize + XML 구분자)
- SEC-38: fastapi-bridge 프록시 JWT 검증 + 캐시 키 user_id 포함
- SEC-58/QA-98: monthly-comparison API_BASE_URL 수정 + 401 처리
- SEC-61: monthlyComparisonModel SELECT FOR UPDATE 추가
- SEC-63: proxyInputController 에러 메시지 노출 제거
- QA-103: pageAccessRoutes error→message 통일
- SEC-62: tbm-create onclick 인젝션 → data-attribute event delegation
- QA-99: tbm-mobile/create 캐시 버스팅 갱신
- QA-100,101: ESC 키 리스너 cleanup 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 10:48:58 +09:00

669 lines
26 KiB
Python

"""
자재 비교 및 발주 추적 API
- 리비전간 자재 비교
- 추가 발주 필요량 계산
- 발주 상태 관리
"""
import logging
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import text
import json
from typing import List, Optional, Dict
from datetime import datetime
from ..database import get_db
from ..auth.middleware import get_current_user
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/materials", tags=["material-comparison"])
@router.post("/compare-revisions")
async def compare_material_revisions(
job_no: str,
current_revision: str,
previous_revision: Optional[str] = None,
save_result: bool = True,
db: Session = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
"""
리비전간 자재 비교 및 추가 발주 필요량 계산
- 해시 기반 고성능 비교
- 누적 재고 고려한 실제 구매 필요량 계산
"""
try:
# 1. 파일 정보 조회
current_file = await get_file_by_revision(db, job_no, current_revision)
if not current_file:
raise HTTPException(status_code=404, detail=f"{job_no} {current_revision} 파일을 찾을 수 없습니다")
# 2. 이전 리비전 자동 탐지
if not previous_revision:
previous_revision = await get_previous_revision(db, job_no, current_revision)
previous_file = None
if previous_revision:
previous_file = await get_file_by_revision(db, job_no, previous_revision)
# 3. 자재 비교 실행
comparison_result = await perform_material_comparison(
db, current_file, previous_file, job_no
)
# 4. 결과 저장
comparison_id = None
if save_result and previous_file and previous_revision:
try:
comparison_id = await save_comparison_result(
db, job_no, current_revision, previous_revision,
current_file["id"], previous_file["id"], comparison_result
)
except Exception as e:
logger.warning(f"비교 결과 저장 실패 (비교 자체는 성공): {e}")
return {
"success": True,
"job_no": job_no,
"current_revision": current_revision,
"previous_revision": previous_revision,
"comparison_id": comparison_id,
"summary": comparison_result["summary"],
"new_items": comparison_result["new_items"],
"modified_items": comparison_result["modified_items"],
"removed_items": comparison_result["removed_items"]
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"자재 비교 실패: {str(e)}")
@router.get("/comparison-history")
async def get_comparison_history(
job_no: str = Query(..., description="Job 번호"),
limit: int = Query(10, ge=1, le=50),
db: Session = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
"""
자재 비교 이력 조회
"""
try:
query = text("""
SELECT
id, current_revision, previous_revision,
new_items_count, modified_items_count, removed_items_count,
upload_date, created_by
FROM material_revisions_comparison
WHERE job_no = :job_no
ORDER BY upload_date DESC
LIMIT :limit
""")
result = db.execute(query, {"job_no": job_no, "limit": limit})
comparisons = result.fetchall()
return {
"success": True,
"job_no": job_no,
"comparisons": [
{
"id": comp[0],
"current_revision": comp[1],
"previous_revision": comp[2],
"new_items_count": comp[3],
"modified_items_count": comp[4],
"removed_items_count": comp[5],
"upload_date": comp[6],
"created_by": comp[7]
}
for comp in comparisons
]
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"비교 이력 조회 실패: {str(e)}")
@router.get("/inventory-status")
async def get_material_inventory_status(
job_no: str = Query(..., description="Job 번호"),
material_hash: Optional[str] = Query(None, description="특정 자재 해시"),
db: Session = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
"""
자재별 누적 재고 현황 조회
"""
try:
# 임시로 빈 결과 반환 (추후 개선)
return {
"success": True,
"job_no": job_no,
"inventory": []
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"재고 현황 조회 실패: {str(e)}")
@router.post("/confirm-purchase")
async def confirm_material_purchase(
job_no: str,
revision: str,
confirmations: List[Dict],
db: Session = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
"""
자재 발주 확정 처리
confirmations = [
{
"material_hash": "abc123",
"confirmed_quantity": 100,
"supplier_name": "ABC공급업체",
"unit_price": 1000
}
]
"""
confirmed_by = current_user.get('username', current_user.get('name', 'unknown'))
try:
# 입력 데이터 검증
if not job_no or not revision:
raise HTTPException(status_code=400, detail="Job 번호와 리비전은 필수입니다")
if not confirmations:
raise HTTPException(status_code=400, detail="확정할 자재가 없습니다")
# 각 확정 항목 검증
for i, confirmation in enumerate(confirmations):
if not confirmation.get("material_hash"):
raise HTTPException(status_code=400, detail=f"{i+1}번째 항목의 material_hash가 없습니다")
confirmed_qty = confirmation.get("confirmed_quantity")
if confirmed_qty is None or confirmed_qty < 0:
raise HTTPException(status_code=400, detail=f"{i+1}번째 항목의 확정 수량이 유효하지 않습니다")
unit_price = confirmation.get("unit_price", 0)
if unit_price < 0:
raise HTTPException(status_code=400, detail=f"{i+1}번째 항목의 단가가 유효하지 않습니다")
confirmed_items = []
for confirmation in confirmations:
# 발주 추적 테이블에 저장/업데이트
upsert_query = text("""
INSERT INTO material_purchase_tracking (
job_no, material_hash, revision, description, size_spec, unit,
bom_quantity, calculated_quantity, confirmed_quantity,
purchase_status, supplier_name, unit_price, total_price,
confirmed_by, confirmed_at
)
SELECT
:job_no, m.material_hash, :revision, m.original_description,
m.size_spec, m.unit, m.quantity, :calculated_qty, :confirmed_qty,
'CONFIRMED', :supplier_name, :unit_price, :total_price,
:confirmed_by, CURRENT_TIMESTAMP
FROM materials m
WHERE m.material_hash = :material_hash
AND m.file_id = (
SELECT id FROM files
WHERE job_no = :job_no AND revision = :revision
ORDER BY upload_date DESC LIMIT 1
)
LIMIT 1
ON CONFLICT (job_no, material_hash, revision)
DO UPDATE SET
confirmed_quantity = :confirmed_qty,
purchase_status = 'CONFIRMED',
supplier_name = :supplier_name,
unit_price = :unit_price,
total_price = :total_price,
confirmed_by = :confirmed_by,
confirmed_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
RETURNING id, description, confirmed_quantity
""")
calculated_qty = confirmation.get("calculated_quantity", confirmation["confirmed_quantity"])
total_price = confirmation["confirmed_quantity"] * confirmation.get("unit_price", 0)
result = db.execute(upsert_query, {
"job_no": job_no,
"revision": revision,
"material_hash": confirmation["material_hash"],
"calculated_qty": calculated_qty,
"confirmed_qty": confirmation["confirmed_quantity"],
"supplier_name": confirmation.get("supplier_name", ""),
"unit_price": confirmation.get("unit_price", 0),
"total_price": total_price,
"confirmed_by": confirmed_by
})
confirmed_item = result.fetchone()
if confirmed_item:
confirmed_items.append({
"id": confirmed_item[0],
"material_hash": confirmed_item[1],
"confirmed_quantity": confirmed_item[2],
"supplier_name": confirmed_item[3],
"unit_price": confirmed_item[4],
"total_price": confirmed_item[5]
})
db.commit()
return {
"success": True,
"message": f"{len(confirmed_items)}개 자재 발주가 확정되었습니다",
"confirmed_items": confirmed_items,
"job_no": job_no,
"revision": revision
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"발주 확정 실패: {str(e)}")
@router.get("/purchase-status")
async def get_purchase_status(
job_no: str = Query(..., description="Job 번호"),
revision: Optional[str] = Query(None, description="리비전 (전체 조회시 생략)"),
status: Optional[str] = Query(None, description="발주 상태 필터"),
db: Session = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
"""
발주 상태 조회
"""
try:
where_conditions = ["job_no = :job_no"]
params = {"job_no": job_no}
if revision:
where_conditions.append("revision = :revision")
params["revision"] = revision
if status:
where_conditions.append("purchase_status = :status")
params["status"] = status
query = text(f"""
SELECT
material_hash, revision, description, size_spec, unit,
bom_quantity, calculated_quantity, confirmed_quantity,
purchase_status, supplier_name, unit_price, total_price,
order_date, delivery_date, confirmed_by, confirmed_at
FROM material_purchase_tracking
WHERE {' AND '.join(where_conditions)}
ORDER BY revision DESC, description
""")
result = db.execute(query, params)
purchases = result.fetchall()
# 상태별 요약
status_summary = {}
total_amount = 0
for purchase in purchases:
status_key = purchase.purchase_status
if status_key not in status_summary:
status_summary[status_key] = {"count": 0, "total_amount": 0}
status_summary[status_key]["count"] += 1
status_summary[status_key]["total_amount"] += purchase.total_price or 0
total_amount += purchase.total_price or 0
return {
"success": True,
"job_no": job_no,
"revision": revision,
"purchases": [purchase._asdict() if hasattr(purchase, '_asdict') else dict(zip(purchase.keys(), purchase)) for purchase in purchases],
"summary": {
"total_items": len(purchases),
"total_amount": total_amount,
"status_breakdown": status_summary
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"발주 상태 조회 실패: {str(e)}")
# ========== 헬퍼 함수들 ==========
async def get_file_by_revision(db: Session, job_no: str, revision: str) -> Optional[Dict]:
"""리비전으로 파일 정보 조회"""
query = text("""
SELECT id, original_filename, revision, upload_date
FROM files
WHERE job_no = :job_no AND revision = :revision AND is_active = TRUE
ORDER BY upload_date DESC
LIMIT 1
""")
result = db.execute(query, {"job_no": job_no, "revision": revision})
file_row = result.fetchone()
if file_row:
return {
"id": file_row[0],
"original_filename": file_row[1],
"revision": file_row[2],
"upload_date": file_row[3]
}
return None
async def get_previous_revision(db: Session, job_no: str, current_revision: str) -> Optional[str]:
"""이전 리비전 자동 탐지 - 숫자 기반 비교"""
# 현재 리비전의 숫자 추출
try:
current_rev_num = int(current_revision.replace("Rev.", ""))
except (ValueError, AttributeError):
current_rev_num = 0
query = text("""
SELECT revision
FROM files
WHERE job_no = :job_no AND is_active = TRUE
ORDER BY revision DESC
""")
result = db.execute(query, {"job_no": job_no})
revisions = result.fetchall()
# 현재 리비전보다 낮은 리비전 중 가장 높은 것 찾기
previous_revision = None
highest_prev_num = -1
for row in revisions:
rev = row[0]
try:
rev_num = int(rev.replace("Rev.", ""))
if rev_num < current_rev_num and rev_num > highest_prev_num:
highest_prev_num = rev_num
previous_revision = rev
except (ValueError, AttributeError):
continue
return previous_revision
async def perform_material_comparison(
db: Session,
current_file: Dict,
previous_file: Optional[Dict],
job_no: str
) -> Dict:
"""
핵심 자재 비교 로직 - 간단한 버전
"""
# 1. 현재 리비전 자재 목록 (해시별로 그룹화)
current_materials = await get_materials_by_hash(db, current_file["id"])
# 2. 이전 리비전 자재 목록
previous_materials = {}
if previous_file:
previous_materials = await get_materials_by_hash(db, previous_file["id"])
# 3. 비교 실행
new_items = []
modified_items = []
removed_items = []
# 신규/변경 항목 찾기
for material_hash, current_item in current_materials.items():
current_qty = current_item["quantity"]
if material_hash not in previous_materials:
# 완전히 새로운 항목
new_item = {
"material_hash": material_hash,
"description": current_item["description"],
"size_spec": current_item["size_spec"],
"material_grade": current_item["material_grade"],
"quantity": current_qty,
"category": current_item["category"],
"unit": current_item["unit"]
}
# 파이프인 경우 pipe_details 정보 포함
if current_item.get("pipe_details"):
new_item["pipe_details"] = current_item["pipe_details"]
new_items.append(new_item)
else:
# 기존 항목 - 수량 변경 체크
previous_qty = previous_materials[material_hash]["quantity"]
qty_change = current_qty - previous_qty
if qty_change != 0:
modified_item = {
"material_hash": material_hash,
"description": current_item["description"],
"size_spec": current_item["size_spec"],
"material_grade": current_item["material_grade"],
"previous_quantity": previous_qty,
"current_quantity": current_qty,
"quantity_change": qty_change,
"category": current_item["category"],
"unit": current_item["unit"]
}
# 파이프인 경우 이전/현재 pipe_details 모두 포함
if current_item.get("pipe_details"):
modified_item["pipe_details"] = current_item["pipe_details"]
# 이전 리비전 pipe_details도 포함
previous_item = previous_materials[material_hash]
if previous_item.get("pipe_details"):
modified_item["previous_pipe_details"] = previous_item["pipe_details"]
# 실제 길이 변화 계산 (현재 총길이 - 이전 총길이)
if current_item.get("pipe_details"):
current_total = current_item["pipe_details"]["total_length_mm"]
previous_total = previous_item["pipe_details"]["total_length_mm"]
length_change = current_total - previous_total
modified_item["length_change"] = length_change
logger.info(f"실제 길이 변화: {current_item['description'][:50]} - 이전:{previous_total:.0f}mm -> 현재:{current_total:.0f}mm (변화:{length_change:+.0f}mm)")
modified_items.append(modified_item)
# 삭제된 항목 찾기
for material_hash, previous_item in previous_materials.items():
if material_hash not in current_materials:
removed_item = {
"material_hash": material_hash,
"description": previous_item["description"],
"size_spec": previous_item["size_spec"],
"material_grade": previous_item["material_grade"],
"quantity": previous_item["quantity"],
"category": previous_item["category"],
"unit": previous_item["unit"]
}
# 파이프인 경우 pipe_details 정보 포함
if previous_item.get("pipe_details"):
removed_item["pipe_details"] = previous_item["pipe_details"]
removed_items.append(removed_item)
return {
"summary": {
"total_current_items": len(current_materials),
"total_previous_items": len(previous_materials),
"new_items_count": len(new_items),
"modified_items_count": len(modified_items),
"removed_items_count": len(removed_items)
},
"new_items": new_items,
"modified_items": modified_items,
"removed_items": removed_items
}
async def get_materials_by_hash(db: Session, file_id: int) -> Dict[str, Dict]:
"""파일의 자재를 해시별로 그룹화하여 조회"""
import hashlib
# 로그 제거
query = text("""
SELECT
m.id,
m.original_description,
m.size_spec,
m.material_grade,
m.quantity,
m.classified_category,
m.unit,
pd.length_mm,
m.line_number
FROM materials m
LEFT JOIN pipe_details pd ON m.id = pd.material_id
WHERE m.file_id = :file_id
ORDER BY m.line_number
""")
result = db.execute(query, {"file_id": file_id})
materials = result.fetchall()
# 로그 제거
# 🔄 같은 파이프들을 Python에서 올바르게 그룹핑
materials_dict = {}
for mat in materials:
# 자재 해시 생성 (description + size_spec + material_grade)
hash_source = f"{mat[1] or ''}|{mat[2] or ''}|{mat[3] or ''}"
material_hash = hashlib.md5(hash_source.encode()).hexdigest()
# 개별 자재 로그 제거 (너무 많음)
if material_hash in materials_dict:
# 🔄 기존 항목에 수량 합계
existing = materials_dict[material_hash]
# 파이프가 아닌 경우만 quantity 합산 (파이프는 개별 길이가 다르므로 합산하지 않음)
if mat[5] != 'PIPE':
existing["quantity"] += float(mat[4]) if mat[4] else 0.0
existing["line_number"] += f", {mat[8]}" if mat[8] else ""
# 파이프인 경우 길이 정보 합산
if mat[5] == 'PIPE' and mat[7] is not None:
if "pipe_details" in existing:
# 총길이 합산: 기존 총길이 + 현재 파이프의 실제 길이 (DB에 저장된 개별 길이)
current_total = existing["pipe_details"]["total_length_mm"]
current_count = existing["pipe_details"]["pipe_count"]
# ✅ DB에서 가져온 length_mm는 이미 개별 파이프의 실제 길이이므로 수량을 곱하지 않음
individual_length = float(mat[7]) # 개별 파이프의 실제 길이
existing["pipe_details"]["total_length_mm"] = current_total + individual_length
existing["pipe_details"]["pipe_count"] = current_count + 1 # 파이프 개수는 1개씩 증가
# 평균 단위 길이 재계산
total_length = existing["pipe_details"]["total_length_mm"]
total_count = existing["pipe_details"]["pipe_count"]
existing["pipe_details"]["length_mm"] = total_length / total_count
# 파이프 합산 로그 제거 (너무 많음)
else:
# 첫 파이프 정보 설정
individual_length = float(mat[7]) # 개별 파이프의 실제 길이
existing["pipe_details"] = {
"length_mm": individual_length,
"total_length_mm": individual_length, # 첫 번째 파이프이므로 개별 길이와 동일
"pipe_count": 1 # 첫 번째 파이프이므로 1개
}
else:
# 🆕 새 항목 생성
material_data = {
"material_hash": material_hash,
"description": mat[1], # original_description
"size_spec": mat[2],
"material_grade": mat[3],
"quantity": float(mat[4]) if mat[4] else 0.0,
"category": mat[5], # classified_category
"unit": mat[6] or 'EA',
"line_number": str(mat[8]) if mat[8] else ''
}
# 파이프인 경우 pipe_details 정보 추가
if mat[5] == 'PIPE' and mat[7] is not None:
individual_length = float(mat[7]) # 개별 파이프의 실제 길이
material_data["pipe_details"] = {
"length_mm": individual_length, # 개별 파이프 길이
"total_length_mm": individual_length, # 첫 번째 파이프이므로 개별 길이와 동일
"pipe_count": 1 # 첫 번째 파이프이므로 1개
}
# 파이프는 quantity를 1로 설정 (pipe_count와 동일)
material_data["quantity"] = 1
materials_dict[material_hash] = material_data
# 파이프 데이터 요약만 출력
pipe_count = sum(1 for data in materials_dict.values() if data.get('category') == 'PIPE')
pipe_with_details = sum(1 for data in materials_dict.values()
if data.get('category') == 'PIPE' and 'pipe_details' in data)
logger.info(f"자재 처리 완료: 총 {len(materials_dict)}개, 파이프 {pipe_count}개 (길이정보: {pipe_with_details}개)")
return materials_dict
async def get_current_inventory(db: Session, job_no: str) -> Dict[str, float]:
"""현재까지의 누적 재고량 조회 - 임시로 빈 딕셔너리 반환"""
# TODO: 실제 재고 시스템 구현 후 활성화
return {}
async def save_comparison_result(
db: Session,
job_no: str,
current_revision: str,
previous_revision: str,
current_file_id: int,
previous_file_id: int,
comparison_result: Dict
) -> int:
"""비교 결과를 데이터베이스에 저장"""
# 메인 비교 레코드 저장
insert_query = text("""
INSERT INTO material_revisions_comparison (
job_no, current_revision, previous_revision,
current_file_id, previous_file_id,
total_current_items, total_previous_items,
new_items_count, modified_items_count, removed_items_count,
comparison_details, created_by
) VALUES (
:job_no, :current_revision, :previous_revision,
:current_file_id, :previous_file_id,
:total_current_items, :total_previous_items,
:new_items_count, :modified_items_count, :removed_items_count,
:comparison_details, 'system'
)
ON CONFLICT (job_no, current_revision, previous_revision)
DO UPDATE SET
total_current_items = :total_current_items,
total_previous_items = :total_previous_items,
new_items_count = :new_items_count,
modified_items_count = :modified_items_count,
removed_items_count = :removed_items_count,
comparison_details = :comparison_details,
upload_date = CURRENT_TIMESTAMP
RETURNING id
""")
import json
summary = comparison_result["summary"]
result = db.execute(insert_query, {
"job_no": job_no,
"current_revision": current_revision,
"previous_revision": previous_revision,
"current_file_id": current_file_id,
"previous_file_id": previous_file_id,
"total_current_items": summary["total_current_items"],
"total_previous_items": summary["total_previous_items"],
"new_items_count": summary["new_items_count"],
"modified_items_count": summary["modified_items_count"],
"removed_items_count": summary["removed_items_count"],
"comparison_details": json.dumps(comparison_result, ensure_ascii=False)
})
comparison_id = result.fetchone()[0]
db.commit()
return comparison_id