Files
TK-BOM-Project/backend/app/auth/auth_controller.py
Hyungi Ahn e27020ae9b feat: 구매신청 기능 완성 및 SUPPORT/SPECIAL 카테고리 개선
- 모든 카테고리 구매신청 기능 완성 (PIPE, FITTING, VALVE, FLANGE, GASKET, BOLT, SUPPORT, SPECIAL, UNKNOWN)
- 구매신청 완료 항목: 회색 배경, 체크박스 비활성화, '구매신청완료' 배지 표시
- 전체 선택/구매신청 시 이미 구매신청된 항목 자동 제외
- 구매신청 quantity 타입 에러 수정 (문자열 -> 정수 변환)

SUPPORT 카테고리 (구 U-BOLT):
- U-BOLT -> SUPPORT로 카테고리명 변경
- 클램프, 유볼트, 우레탄블럭슈 분류 개선
- 테이블 헤더: 선택-종류-타입-크기-디스크립션-추가요구-사용자요구-수량
- 크기 정보 main_nom 필드에서 가져오기 (배관 인치)
- 엑셀 내보내기 형식 조정

SPECIAL 카테고리:
- SPECIAL 키워드 자재 자동 분류 (SPECIFICATION 제외)
- 파일 업로드 시 SPECIAL 카테고리 처리 로직 추가
- 도면번호 필드 추가 (drawing_name, line_no)
- 타입 필드: 크기/스케줄/재질 제외한 핵심 정보 표시
- 엑셀 DWG_NAME, LINE_NUM 컬럼 파싱 및 저장

FITTING 카테고리:
- 테이블 컬럼 너비 조정 (선택 2%, 종류 8.5%, 수량 12%)

구매신청 관리:
- 엑셀 재다운로드 형식 개선 (BOM 페이지와 동일한 형식)
- 그룹화된 자재 정보 포함하여 저장 및 다운로드
2025-10-14 12:39:25 +09:00

1163 lines
36 KiB
Python

"""
인증 컨트롤러
TK-FB-Project의 authController.js를 참고하여 FastAPI용으로 구현
"""
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr, validator
from typing import Optional, List, Dict, Any
from ..database import get_db
from .auth_service import get_auth_service
from .jwt_service import jwt_service
from .models import UserRepository
from ..utils.logger import get_logger
logger = get_logger(__name__)
router = APIRouter()
security = HTTPBearer()
# Pydantic 모델들
class LoginRequest(BaseModel):
username: str
password: str
class RegisterRequest(BaseModel):
username: str
password: str
name: str
email: Optional[str] = None
access_level: str = 'worker'
department: Optional[str] = None
position: Optional[str] = None
phone: Optional[str] = None
role: str = "user"
@validator('email', pre=True)
def validate_email(cls, v):
if v == '' or v is None:
return None
# 간단한 이메일 형식 검증
if '@' not in v or '.' not in v.split('@')[-1]:
raise ValueError('올바른 이메일 형식을 입력해주세요')
return v
class RefreshTokenRequest(BaseModel):
refresh_token: str
class SignupRequest(BaseModel):
username: str
password: str
name: str
email: Optional[str] = None
department: Optional[str] = None
position: Optional[str] = None
phone: Optional[str] = None
reason: Optional[str] = None # 가입 사유
class LoginResponse(BaseModel):
success: bool
access_token: str
refresh_token: str
token_type: str
expires_in: int
user: Dict[str, Any]
redirect_url: str
permissions: List[str]
class RefreshTokenResponse(BaseModel):
success: bool
access_token: str
token_type: str
expires_in: int
user: Dict[str, Any]
class RegisterResponse(BaseModel):
success: bool
message: str
user_id: int
username: str
class LogoutResponse(BaseModel):
success: bool
message: str
class UserInfoResponse(BaseModel):
success: bool
user: Dict[str, Any]
permissions: List[str]
@router.post("/login", response_model=LoginResponse)
async def login(
login_data: LoginRequest,
request: Request,
db: Session = Depends(get_db)
):
"""
사용자 로그인
Args:
login_data: 로그인 정보 (사용자명, 비밀번호)
request: FastAPI Request 객체
db: 데이터베이스 세션
Returns:
LoginResponse: 로그인 결과 (토큰, 사용자 정보 등)
"""
try:
auth_service = get_auth_service(db)
result = await auth_service.login(
username=login_data.username,
password=login_data.password,
request=request
)
return LoginResponse(**result)
except Exception as e:
logger.error(f"Login endpoint error: {str(e)}")
raise
@router.post("/register", response_model=RegisterResponse)
async def register(
register_data: RegisterRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 등록 (시스템 관리자만 가능)
Args:
register_data: 등록 정보
credentials: JWT 토큰 (시스템 관리자 권한 필요)
db: 데이터베이스 세션
Returns:
RegisterResponse: 등록 결과
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
# 시스템 관리자 권한 확인
if payload['role'] != 'system':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="계정 생성은 시스템 관리자만 가능합니다"
)
auth_service = get_auth_service(db)
result = await auth_service.register(register_data.dict())
logger.info(f"User registered by system admin: {register_data.username} (created by: {payload['username']})")
return RegisterResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Register endpoint error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 등록 중 오류가 발생했습니다"
)
@router.post("/refresh", response_model=RefreshTokenResponse)
async def refresh_token(
refresh_data: RefreshTokenRequest,
request: Request,
db: Session = Depends(get_db)
):
"""
토큰 갱신
Args:
refresh_data: 리프레시 토큰 정보
request: FastAPI Request 객체
db: 데이터베이스 세션
Returns:
RefreshTokenResponse: 새로운 토큰 정보
"""
try:
auth_service = get_auth_service(db)
result = await auth_service.refresh_token(
refresh_token=refresh_data.refresh_token,
request=request
)
return RefreshTokenResponse(**result)
except Exception as e:
logger.error(f"Refresh token endpoint error: {str(e)}")
raise
@router.post("/logout", response_model=LogoutResponse)
async def logout(
refresh_data: RefreshTokenRequest,
db: Session = Depends(get_db)
):
"""
로그아웃
Args:
refresh_data: 리프레시 토큰 정보
db: 데이터베이스 세션
Returns:
LogoutResponse: 로그아웃 결과
"""
try:
auth_service = get_auth_service(db)
result = await auth_service.logout(refresh_data.refresh_token)
return LogoutResponse(**result)
except Exception as e:
logger.error(f"Logout endpoint error: {str(e)}")
raise
@router.get("/me", response_model=UserInfoResponse)
async def get_current_user_info(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
현재 사용자 정보 조회
Args:
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
UserInfoResponse: 사용자 정보 및 권한
"""
try:
# 토큰 검증
payload = jwt_service.verify_access_token(credentials.credentials)
user_id = payload['user_id']
# 사용자 정보 조회
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="사용자를 찾을 수 없거나 비활성화된 계정입니다"
)
# 권한 정보 조회
permissions = user_repo.get_user_permissions(user.role)
return UserInfoResponse(
success=True,
user=user.to_dict(),
permissions=permissions
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Get current user info error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 정보 조회 중 오류가 발생했습니다"
)
@router.get("/verify")
async def verify_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
):
"""
토큰 검증
Args:
credentials: JWT 토큰
Returns:
Dict: 토큰 검증 결과
"""
try:
payload = jwt_service.verify_access_token(credentials.credentials)
return {
'success': True,
'valid': True,
'user_id': payload['user_id'],
'username': payload['username'],
'role': payload['role'],
'expires_at': payload.get('exp')
}
except HTTPException as e:
return {
'success': False,
'valid': False,
'error': e.detail
}
except Exception as e:
logger.error(f"Token verification error: {str(e)}")
return {
'success': False,
'valid': False,
'error': '토큰 검증 중 오류가 발생했습니다'
}
# 관리자 전용 엔드포인트들
@router.get("/users/suspended")
async def get_suspended_users(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
정지된 사용자 목록 조회 (관리자 전용)
Args:
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 정지된 사용자 목록
"""
try:
# 토큰 검증 및 권한 확인
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['admin', 'system']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="관리자 이상의 권한이 필요합니다"
)
# 정지된 사용자 조회
from sqlalchemy import text
query = text("""
SELECT
user_id, username, name, email, role, department, position,
phone, status, created_at, updated_at
FROM users
WHERE status = 'suspended'
ORDER BY updated_at DESC
""")
results = db.execute(query).fetchall()
suspended_users = []
for row in results:
suspended_users.append({
"user_id": row.user_id,
"username": row.username,
"name": row.name,
"email": row.email,
"role": row.role,
"department": row.department,
"position": row.position,
"phone": row.phone,
"status": row.status,
"created_at": row.created_at.isoformat() if row.created_at else None,
"updated_at": row.updated_at.isoformat() if row.updated_at else None
})
return {
"success": True,
"users": suspended_users,
"count": len(suspended_users)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get suspended users: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="정지된 사용자 목록 조회 중 오류가 발생했습니다"
)
@router.get("/users")
async def get_all_users(
skip: int = 0,
limit: int = 100,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
모든 사용자 목록 조회 (관리자 전용)
Args:
skip: 건너뛸 레코드 수
limit: 조회할 레코드 수
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 사용자 목록
"""
try:
# 토큰 검증 및 권한 확인
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['admin', 'system']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="관리자 이상의 권한이 필요합니다"
)
# 사용자 목록 조회
user_repo = UserRepository(db)
users = user_repo.get_all_users(skip=skip, limit=limit)
return {
'success': True,
'users': [user.to_dict() for user in users],
'total_count': len(users)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Get all users error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 목록 조회 중 오류가 발생했습니다"
)
@router.patch("/users/{user_id}/suspend")
async def suspend_user(
user_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 정지 (관리자 전용)
Args:
user_id: 정지할 사용자 ID
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 정지 결과
"""
try:
# 토큰 검증 및 권한 확인
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['system', 'admin']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="관리자만 사용자를 정지할 수 있습니다"
)
# 자기 자신 정지 방지
if payload['user_id'] == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="자기 자신은 정지할 수 없습니다"
)
# 사용자 정지
from sqlalchemy import text
update_query = text("""
UPDATE users
SET status = 'suspended',
is_active = FALSE,
updated_at = CURRENT_TIMESTAMP
WHERE user_id = :user_id AND status = 'active'
RETURNING user_id, username, name, status
""")
result = db.execute(update_query, {"user_id": user_id}).fetchone()
db.commit()
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="사용자를 찾을 수 없거나 이미 정지된 상태입니다"
)
logger.info(f"User {result.username} suspended by {payload['username']}")
return {
"success": True,
"message": f"{result.name} 사용자가 정지되었습니다",
"user": {
"user_id": result.user_id,
"username": result.username,
"name": result.name,
"status": result.status
}
}
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f"Failed to suspend user {user_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"사용자 정지 중 오류가 발생했습니다: {str(e)}"
)
@router.patch("/users/{user_id}/reactivate")
async def reactivate_user(
user_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 재활성화 (관리자 전용)
Args:
user_id: 재활성화할 사용자 ID
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 재활성화 결과
"""
try:
# 토큰 검증 및 권한 확인
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['system', 'admin']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="관리자만 사용자를 재활성화할 수 있습니다"
)
# 사용자 재활성화
from sqlalchemy import text
update_query = text("""
UPDATE users
SET status = 'active',
is_active = TRUE,
failed_login_attempts = 0,
locked_until = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE user_id = :user_id AND status = 'suspended'
RETURNING user_id, username, name, status
""")
result = db.execute(update_query, {"user_id": user_id}).fetchone()
db.commit()
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="사용자를 찾을 수 없거나 정지 상태가 아닙니다"
)
logger.info(f"User {result.username} reactivated by {payload['username']}")
return {
"success": True,
"message": f"{result.name} 사용자가 재활성화되었습니다",
"user": {
"user_id": result.user_id,
"username": result.username,
"name": result.name,
"status": result.status
}
}
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f"Failed to reactivate user {user_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"사용자 재활성화 중 오류가 발생했습니다: {str(e)}"
)
@router.delete("/users/{user_id}")
async def delete_user(
user_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 삭제 (관리자 전용)
Args:
user_id: 삭제할 사용자 ID
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 삭제 결과
"""
try:
# 토큰 검증 및 권한 확인
payload = jwt_service.verify_access_token(credentials.credentials)
# admin role도 사용자 삭제 가능하도록 수정
if payload['role'] not in ['system', 'admin']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="사용자 삭제는 관리자만 가능합니다"
)
# 자기 자신 삭제 방지
if payload['user_id'] == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="자기 자신은 삭제할 수 없습니다"
)
# BOM 데이터 존재 여부 확인
from sqlalchemy import text
# files 테이블에서 uploaded_by가 이 사용자인 레코드 확인
check_files = text("""
SELECT COUNT(*) as count
FROM files
WHERE uploaded_by = :user_id
""")
files_result = db.execute(check_files, {"user_id": user_id}).fetchone()
has_files = files_result.count > 0 if files_result else False
# user_requirements 테이블 확인
check_requirements = text("""
SELECT COUNT(*) as count
FROM user_requirements
WHERE created_by = :user_id
""")
requirements_result = db.execute(check_requirements, {"user_id": user_id}).fetchone()
has_requirements = requirements_result.count > 0 if requirements_result else False
has_bom_data = has_files or has_requirements
# 사용자 조회
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="해당 사용자를 찾을 수 없습니다"
)
if has_bom_data:
# BOM 데이터가 있으면 소프트 삭제 (status='deleted')
soft_delete = text("""
UPDATE users
SET status = 'deleted',
is_active = FALSE,
updated_at = CURRENT_TIMESTAMP
WHERE user_id = :user_id
RETURNING username, name
""")
result = db.execute(soft_delete, {"user_id": user_id}).fetchone()
db.commit()
logger.info(f"User soft-deleted (has BOM data): {result.username} (deleted by: {payload['username']})")
return {
'success': True,
'message': f'{result.name} 사용자가 비활성화되었습니다 (BOM 데이터 보존)',
'soft_deleted': True,
'deleted_user_id': user_id
}
else:
# BOM 데이터가 없으면 완전 삭제
user_repo.delete_user(user)
logger.info(f"User hard-deleted (no BOM data): {user.username} (deleted by: {payload['username']})")
return {
'success': True,
'message': '사용자가 완전히 삭제되었습니다',
'soft_deleted': False,
'deleted_user_id': user_id
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Delete user error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 삭제 중 오류가 발생했습니다"
)
# 로그 관리 API (관리자 이상)
@router.get("/logs/login")
async def get_login_logs(
skip: int = 0,
limit: int = 100,
user_id: Optional[int] = None,
status: Optional[str] = None,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
로그인 로그 조회 (관리자 이상)
Args:
skip: 건너뛸 레코드 수
limit: 조회할 레코드 수
user_id: 특정 사용자 ID 필터
status: 로그인 상태 필터 (success/failed)
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 로그인 로그 목록
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['admin', 'system']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="로그 조회는 관리자 이상의 권한이 필요합니다"
)
# 로그인 로그 조회
from .models import LoginLog, User
query = db.query(LoginLog).join(User)
if user_id:
query = query.filter(LoginLog.user_id == user_id)
if status:
query = query.filter(LoginLog.login_status == status)
logs = query.order_by(LoginLog.login_time.desc()).offset(skip).limit(limit).all()
return {
'success': True,
'logs': [
{
'log_id': log.log_id,
'user_id': log.user_id,
'username': log.user.username,
'name': log.user.name,
'login_time': log.login_time,
'ip_address': log.ip_address,
'user_agent': log.user_agent,
'login_status': log.login_status,
'failure_reason': log.failure_reason
}
for log in logs
],
'total_count': len(logs)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Get login logs error: {str(e)}")
raise HTTPException(
status_code=500,
detail="로그인 로그 조회 중 오류가 발생했습니다"
)
@router.get("/logs/system")
async def get_system_logs(
skip: int = 0,
limit: int = 100,
level: Optional[str] = None,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
시스템 로그 조회 (관리자 이상)
Args:
skip: 건너뛸 레코드 수
limit: 조회할 레코드 수
level: 로그 레벨 필터 (ERROR, WARNING, INFO, DEBUG)
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 시스템 로그 목록
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] not in ['admin', 'system']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="시스템 로그 조회는 관리자 이상의 권한이 필요합니다"
)
# 로그 파일에서 최근 로그 읽기 (임시 구현)
import os
from ..config import get_settings
settings = get_settings()
log_file_path = settings.logging.file_path
logs = []
if os.path.exists(log_file_path):
try:
with open(log_file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 최근 로그부터 처리
recent_lines = lines[-limit-skip:] if len(lines) > skip else lines
for line in reversed(recent_lines):
if line.strip():
# 간단한 로그 파싱 (실제로는 더 정교한 파싱 필요)
parts = line.strip().split(' - ')
if len(parts) >= 4:
timestamp = parts[0]
module = parts[1]
log_level = parts[2]
message = ' - '.join(parts[3:])
if not level or log_level == level:
logs.append({
'timestamp': timestamp,
'module': module,
'level': log_level,
'message': message
})
if len(logs) >= limit:
break
except Exception as e:
logger.error(f"Failed to read log file: {str(e)}")
return {
'success': True,
'logs': logs,
'total_count': len(logs)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Get system logs error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="시스템 로그 조회 중 오류가 발생했습니다"
)
# 사용자 역할 변경 API (시스템 관리자만)
@router.put("/users/{user_id}/role")
async def change_user_role(
user_id: int,
new_role: str,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 역할 변경 (시스템 관리자만)
Args:
user_id: 변경할 사용자 ID
new_role: 새로운 역할 (system, admin, user)
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 변경 결과
"""
try:
# 토큰 검증 및 권한 확인
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
if payload['role'] != 'system':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="사용자 역할 변경은 시스템 관리자만 가능합니다"
)
# 유효한 역할인지 확인
if new_role not in ['system', 'admin', 'user']:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="유효하지 않은 역할입니다. (system, admin, user 중 선택)"
)
# 자기 자신의 역할 변경 방지
if payload['user_id'] == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="자기 자신의 역할은 변경할 수 없습니다"
)
# 사용자 조회 및 역할 변경
from .models import UserRepository
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="해당 사용자를 찾을 수 없습니다"
)
old_role = user.role
user.role = new_role
user_repo.update_user(user)
logger.info(f"User role changed: {user.username} ({old_role}{new_role}) by {payload['username']}")
return {
'success': True,
'message': f'사용자 역할이 변경되었습니다: {old_role}{new_role}',
'user_id': user_id,
'old_role': old_role,
'new_role': new_role
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Change user role error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="사용자 역할 변경 중 오류가 발생했습니다"
)
# 프론트엔드 오류 로그 수집 API
@router.post("/logs/frontend-error")
async def log_frontend_error(
error_data: dict,
request: Request,
db: Session = Depends(get_db)
):
"""
프론트엔드 오류 로그 수집
Args:
error_data: 프론트엔드에서 전송한 오류 데이터
request: FastAPI Request 객체
db: 데이터베이스 세션
Returns:
Dict: 로그 저장 결과
"""
try:
from datetime import datetime
# 클라이언트 정보 추가
client_ip = request.client.host
user_agent = request.headers.get("user-agent", "")
# 오류 데이터 보강
enhanced_error_data = {
**error_data,
'client_ip': client_ip,
'server_timestamp': datetime.utcnow().isoformat(),
'user_agent': user_agent
}
# 로그로 기록
logger.error(f"Frontend Error: {error_data.get('type', 'unknown')} - {error_data.get('message', 'No message')}",
extra={'frontend_error': enhanced_error_data})
# 데이터베이스에 저장 (선택적)
# TODO: 필요시 frontend_errors 테이블 생성하여 저장
return {
'success': True,
'message': '오류가 기록되었습니다',
'timestamp': enhanced_error_data['server_timestamp']
}
except Exception as e:
logger.error(f"Failed to log frontend error: {str(e)}")
return {
'success': False,
'message': '오류 기록에 실패했습니다'
}
# 프로필 업데이트 API
class ProfileUpdateRequest(BaseModel):
name: str
email: Optional[EmailStr] = None
department: Optional[str] = None
position: Optional[str] = None
@router.put("/profile")
async def update_profile(
profile_data: ProfileUpdateRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 프로필 업데이트
Args:
profile_data: 업데이트할 프로필 정보
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 업데이트 결과
"""
try:
# 토큰 검증
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
user_id = payload['user_id']
# 사용자 조회
from .models import UserRepository
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="사용자를 찾을 수 없습니다"
)
# 이메일 중복 확인 (다른 사용자가 사용 중인지)
if profile_data.email and profile_data.email != user.email:
existing_email = user_repo.find_by_email(profile_data.email)
if existing_email and existing_email.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="이미 사용 중인 이메일입니다"
)
# 프로필 업데이트
user.name = profile_data.name
user.email = profile_data.email
user.department = profile_data.department
user.position = profile_data.position
user_repo.update_user(user)
logger.info(f"Profile updated for user: {user.username}")
return {
'success': True,
'message': '프로필이 업데이트되었습니다',
'user': user.to_dict()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Profile update error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="프로필 업데이트 중 오류가 발생했습니다"
)
# 비밀번호 변경 API
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
@router.put("/change-password")
async def change_password(
password_data: ChangePasswordRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
"""
사용자 비밀번호 변경
Args:
password_data: 현재 비밀번호와 새 비밀번호
credentials: JWT 토큰
db: 데이터베이스 세션
Returns:
Dict: 변경 결과
"""
try:
# 토큰 검증
from .jwt_service import jwt_service
payload = jwt_service.verify_access_token(credentials.credentials)
user_id = payload['user_id']
# 사용자 조회
from .models import UserRepository
user_repo = UserRepository(db)
user = user_repo.find_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="사용자를 찾을 수 없습니다"
)
# 현재 비밀번호 확인
if not user.check_password(password_data.current_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="현재 비밀번호가 올바르지 않습니다"
)
# 새 비밀번호 유효성 검사
if len(password_data.new_password) < 8:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="새 비밀번호는 8자 이상이어야 합니다"
)
# 비밀번호 변경
user.set_password(password_data.new_password)
user_repo.update_user(user)
logger.info(f"Password changed for user: {user.username}")
return {
'success': True,
'message': '비밀번호가 변경되었습니다'
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Password change error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="비밀번호 변경 중 오류가 발생했습니다"
)