Files
TK-BOM-Project/backend/app/services/database_service.py
Hyungi Ahn 3398f71b80
Some checks failed
SonarQube Analysis / SonarQube Scan (push) Has been cancelled
🔄 전반적인 시스템 리팩토링 완료
 백엔드 구조 개선:
- DatabaseService: 공통 DB 쿼리 로직 통합
- FileUploadService: 파일 업로드 로직 모듈화 및 트랜잭션 관리 개선
- 서비스 레이어 패턴 도입으로 코드 재사용성 향상

 프론트엔드 컴포넌트 개선:
- LoadingSpinner, ErrorMessage, ConfirmDialog 공통 컴포넌트 생성
- 재사용 가능한 컴포넌트 라이브러리 구축
- deprecated/backup 파일들 완전 제거

 성능 최적화:
- optimize_database.py: 핵심 DB 인덱스 자동 생성
- 쿼리 최적화 및 통계 업데이트 자동화
- VACUUM ANALYZE 자동 실행

 코드 정리:
- 개별 SQL 마이그레이션 파일들을 legacy/ 폴더로 정리
- 중복된 마이그레이션 스크립트 정리
- 깔끔하고 체계적인 프로젝트 구조 완성

 자동 마이그레이션 시스템 강화:
- complete_migrate.py: SQLAlchemy 기반 완전한 마이그레이션
- analyze_and_fix_schema.py: 백엔드 코드 분석 기반 스키마 수정
- fix_missing_tables.py: 누락된 테이블/컬럼 자동 생성
- start.sh: 배포 시 자동 실행 순서 최적화
2025-10-20 08:41:06 +09:00

351 lines
16 KiB
Python

"""
데이터베이스 공통 서비스 레이어
중복된 DB 쿼리 로직을 통합하고 재사용 가능한 서비스 제공
"""
from sqlalchemy.orm import Session
from sqlalchemy import text, and_, or_
from typing import List, Dict, Any, Optional, Union
from ..models import Material, File, User, Project
from ..utils.logger import get_logger
from ..utils.error_handlers import ErrorResponse
logger = get_logger(__name__)
class DatabaseService:
"""데이터베이스 공통 서비스"""
def __init__(self, db: Session):
self.db = db
def execute_query(self, query: str, params: Dict = None) -> Any:
"""안전한 쿼리 실행"""
try:
result = self.db.execute(text(query), params or {})
return result
except Exception as e:
logger.error(f"Query execution failed: {query[:100]}... Error: {e}")
raise
def get_materials_with_details(
self,
file_id: Optional[int] = None,
job_no: Optional[str] = None,
limit: int = 1000,
offset: int = 0,
exclude_requested: bool = False
) -> Dict[str, Any]:
"""자재 상세 정보 조회 (통합된 쿼리)"""
where_conditions = ["1=1"]
params = {"limit": limit, "offset": offset}
if file_id:
where_conditions.append("m.file_id = :file_id")
params["file_id"] = file_id
if job_no:
where_conditions.append("f.job_no = :job_no")
params["job_no"] = job_no
if exclude_requested:
where_conditions.append("(m.purchase_confirmed IS NULL OR m.purchase_confirmed = false)")
# 통합된 자재 조회 쿼리
query = f"""
SELECT
m.id, m.file_id, m.line_number, m.row_number,
m.original_description, m.classified_category, m.classified_subcategory,
m.material_grade, m.full_material_grade, m.schedule, m.size_spec,
m.main_nom, m.red_nom, m.quantity, m.unit, m.length,
m.drawing_name, m.area_code, m.line_no,
m.classification_confidence, m.classification_details,
m.purchase_confirmed, m.confirmed_quantity, m.purchase_status,
m.purchase_confirmed_by, m.purchase_confirmed_at,
m.revision_status, m.material_hash, m.normalized_description,
m.drawing_reference, m.notes, m.created_at,
-- 파일 정보
f.filename, f.original_filename, f.job_no, f.revision, f.bom_name,
f.upload_date, f.uploaded_by,
-- 프로젝트 정보
p.project_name, p.client_name,
-- 상세 정보들 (LEFT JOIN)
pd.material_standard as pipe_material_standard,
pd.manufacturing_method, pd.end_preparation, pd.wall_thickness,
fd.fitting_type, fd.fitting_subtype, fd.connection_type as fitting_connection_type,
fd.main_size as fitting_main_size, fd.reduced_size as fitting_reduced_size,
fld.flange_type, fld.flange_subtype, fld.pressure_rating as flange_pressure_rating,
fld.face_type, fld.connection_method,
vd.valve_type, vd.valve_subtype, vd.actuation_type,
vd.pressure_rating as valve_pressure_rating, vd.temperature_rating,
bd.bolt_type, bd.bolt_subtype, bd.thread_type, bd.head_type,
bd.material_standard as bolt_material_standard,
bd.pressure_rating as bolt_pressure_rating,
gd.gasket_type, gd.gasket_subtype, gd.material_type as gasket_material_type,
gd.filler_material, gd.pressure_rating as gasket_pressure_rating,
gd.size_inches as gasket_size_inches, gd.thickness as gasket_thickness,
gd.temperature_range as gasket_temperature_range, gd.fire_safe,
-- 구매 추적 정보
mpt.confirmed_quantity as tracking_confirmed_quantity,
mpt.purchase_status as tracking_purchase_status,
mpt.confirmed_by as tracking_confirmed_by,
mpt.confirmed_at as tracking_confirmed_at,
-- 최종 분류 (구매 추적 정보 우선)
CASE
WHEN mpt.id IS NOT NULL THEN
CASE
WHEN mpt.description LIKE '%PIPE%' OR mpt.description LIKE '%파이프%' THEN 'PIPE'
WHEN mpt.description LIKE '%FITTING%' OR mpt.description LIKE '%피팅%' THEN 'FITTING'
WHEN mpt.description LIKE '%VALVE%' OR mpt.description LIKE '%밸브%' THEN 'VALVE'
WHEN mpt.description LIKE '%FLANGE%' OR mpt.description LIKE '%플랜지%' THEN 'FLANGE'
WHEN mpt.description LIKE '%BOLT%' OR mpt.description LIKE '%볼트%' THEN 'BOLT'
WHEN mpt.description LIKE '%GASKET%' OR mpt.description LIKE '%가스켓%' THEN 'GASKET'
WHEN mpt.description LIKE '%INSTRUMENT%' OR mpt.description LIKE '%계기%' THEN 'INSTRUMENT'
ELSE m.classified_category
END
ELSE m.classified_category
END as final_classified_category,
-- 검증 상태
CASE WHEN mpt.id IS NOT NULL THEN true ELSE m.is_verified END as final_is_verified,
CASE WHEN mpt.id IS NOT NULL THEN 'purchase_calculation' ELSE m.verified_by END as final_verified_by
FROM materials m
LEFT JOIN files f ON m.file_id = f.id
LEFT JOIN projects p ON f.project_id = p.id
LEFT JOIN pipe_details pd ON m.id = pd.material_id
LEFT JOIN fitting_details fd ON m.id = fd.material_id
LEFT JOIN flange_details fld ON m.id = fld.material_id
LEFT JOIN valve_details vd ON m.id = vd.material_id
LEFT JOIN bolt_details bd ON m.id = bd.material_id
LEFT JOIN gasket_details gd ON m.id = gd.material_id
LEFT JOIN material_purchase_tracking mpt ON (
m.material_hash = mpt.material_hash
AND f.job_no = mpt.job_no
AND f.revision = mpt.revision
)
WHERE {' AND '.join(where_conditions)}
ORDER BY m.line_number ASC, m.id ASC
LIMIT :limit OFFSET :offset
"""
try:
result = self.execute_query(query, params)
materials = [dict(row._mapping) for row in result.fetchall()]
# 총 개수 조회
count_query = f"""
SELECT COUNT(*) as total
FROM materials m
LEFT JOIN files f ON m.file_id = f.id
WHERE {' AND '.join(where_conditions[:-1])} -- LIMIT/OFFSET 제외
"""
count_result = self.execute_query(count_query, {k: v for k, v in params.items() if k not in ['limit', 'offset']})
total_count = count_result.scalar()
return {
"materials": materials,
"total_count": total_count,
"limit": limit,
"offset": offset
}
except Exception as e:
logger.error(f"Failed to get materials with details: {e}")
raise
def get_purchase_request_materials(
self,
job_no: str,
category: Optional[str] = None,
limit: int = 1000
) -> List[Dict]:
"""구매 신청 자재 조회"""
where_conditions = ["f.job_no = :job_no", "m.purchase_confirmed = true"]
params = {"job_no": job_no, "limit": limit}
if category and category != 'ALL':
where_conditions.append("m.classified_category = :category")
params["category"] = category
query = f"""
SELECT
m.id, m.original_description, m.classified_category,
m.material_grade, m.schedule, m.size_spec, m.main_nom, m.red_nom,
CAST(m.quantity AS INTEGER) as requested_quantity,
CAST(m.confirmed_quantity AS INTEGER) as original_quantity,
m.unit, m.drawing_name, m.line_no,
f.job_no, f.revision, f.bom_name
FROM materials m
JOIN files f ON m.file_id = f.id
WHERE {' AND '.join(where_conditions)}
ORDER BY m.classified_category, m.original_description
LIMIT :limit
"""
try:
result = self.execute_query(query, params)
return [dict(row._mapping) for row in result.fetchall()]
except Exception as e:
logger.error(f"Failed to get purchase request materials: {e}")
raise
def safe_delete_related_data(self, file_id: int) -> Dict[str, Any]:
"""파일 관련 데이터 안전 삭제"""
deletion_results = {}
# 삭제 순서 (외래키 제약 조건 고려)
deletion_queries = [
("support_details", "DELETE FROM support_details WHERE file_id = :file_id"),
("fitting_details", "DELETE FROM fitting_details WHERE material_id IN (SELECT id FROM materials WHERE file_id = :file_id)"),
("flange_details", "DELETE FROM flange_details WHERE material_id IN (SELECT id FROM materials WHERE file_id = :file_id)"),
("valve_details", "DELETE FROM valve_details WHERE material_id IN (SELECT id FROM materials WHERE file_id = :file_id)"),
("gasket_details", "DELETE FROM gasket_details WHERE material_id IN (SELECT id FROM materials WHERE file_id = :file_id)"),
("bolt_details", "DELETE FROM bolt_details WHERE material_id IN (SELECT id FROM materials WHERE file_id = :file_id)"),
("instrument_details", "DELETE FROM instrument_details WHERE material_id IN (SELECT id FROM materials WHERE file_id = :file_id)"),
("user_requirements", "DELETE FROM user_requirements WHERE file_id = :file_id"),
("purchase_request_items", "DELETE FROM purchase_request_items WHERE material_id IN (SELECT id FROM materials WHERE file_id = :file_id)"),
("purchase_requests", "DELETE FROM purchase_requests WHERE file_id = :file_id"),
("user_activity_logs", "DELETE FROM user_activity_logs WHERE target_id = :file_id AND target_type = 'file'"),
("exported_materials", """
DELETE FROM exported_materials
WHERE export_id IN (SELECT id FROM excel_exports WHERE file_id = :file_id)
"""),
("excel_exports", "DELETE FROM excel_exports WHERE file_id = :file_id"),
("materials", "DELETE FROM materials WHERE file_id = :file_id"),
]
for table_name, query in deletion_queries:
try:
result = self.execute_query(query, {"file_id": file_id})
deleted_count = result.rowcount
deletion_results[table_name] = {"success": True, "deleted_count": deleted_count}
logger.info(f"Deleted {deleted_count} records from {table_name}")
except Exception as e:
deletion_results[table_name] = {"success": False, "error": str(e)}
logger.warning(f"Failed to delete from {table_name}: {e}")
return deletion_results
def bulk_insert_materials(self, materials_data: List[Dict], file_id: int) -> int:
"""자재 데이터 대량 삽입"""
if not materials_data:
return 0
try:
# 대량 삽입을 위한 쿼리 준비
insert_query = """
INSERT INTO materials (
file_id, line_number, row_number, original_description,
classified_category, classified_subcategory, material_grade,
full_material_grade, schedule, size_spec, main_nom, red_nom,
quantity, unit, length, drawing_name, area_code, line_no,
classification_confidence, classification_details,
revision_status, material_hash, normalized_description,
created_at
) VALUES (
:file_id, :line_number, :row_number, :original_description,
:classified_category, :classified_subcategory, :material_grade,
:full_material_grade, :schedule, :size_spec, :main_nom, :red_nom,
:quantity, :unit, :length, :drawing_name, :area_code, :line_no,
:classification_confidence, :classification_details,
:revision_status, :material_hash, :normalized_description,
CURRENT_TIMESTAMP
)
"""
# 데이터 준비
insert_data = []
for material in materials_data:
insert_data.append({
"file_id": file_id,
"line_number": material.get("line_number"),
"row_number": material.get("row_number"),
"original_description": material.get("original_description", ""),
"classified_category": material.get("classified_category"),
"classified_subcategory": material.get("classified_subcategory"),
"material_grade": material.get("material_grade"),
"full_material_grade": material.get("full_material_grade"),
"schedule": material.get("schedule"),
"size_spec": material.get("size_spec"),
"main_nom": material.get("main_nom"),
"red_nom": material.get("red_nom"),
"quantity": material.get("quantity", 0),
"unit": material.get("unit", "EA"),
"length": material.get("length"),
"drawing_name": material.get("drawing_name"),
"area_code": material.get("area_code"),
"line_no": material.get("line_no"),
"classification_confidence": material.get("classification_confidence"),
"classification_details": material.get("classification_details"),
"revision_status": material.get("revision_status", "new"),
"material_hash": material.get("material_hash"),
"normalized_description": material.get("normalized_description"),
})
# 대량 삽입 실행
self.db.execute(text(insert_query), insert_data)
self.db.commit()
logger.info(f"Successfully inserted {len(insert_data)} materials")
return len(insert_data)
except Exception as e:
self.db.rollback()
logger.error(f"Failed to bulk insert materials: {e}")
raise
class MaterialQueryBuilder:
"""자재 쿼리 빌더"""
@staticmethod
def build_materials_query(
filters: Dict[str, Any] = None,
joins: List[str] = None,
order_by: str = "m.line_number ASC"
) -> str:
"""동적 자재 쿼리 생성"""
base_query = """
SELECT m.*, f.job_no, f.revision, f.bom_name
FROM materials m
LEFT JOIN files f ON m.file_id = f.id
"""
# 추가 조인
if joins:
for join in joins:
base_query += f" {join}"
# 필터 조건
where_conditions = ["1=1"]
if filters:
for key, value in filters.items():
if value is not None:
where_conditions.append(f"m.{key} = :{key}")
if where_conditions:
base_query += f" WHERE {' AND '.join(where_conditions)}"
# 정렬
if order_by:
base_query += f" ORDER BY {order_by}"
return base_query