Files
tk-factory-services/system3-nonconformance/api/services/auth_service.py
Hyungi Ahn 550633b89d feat: 3-System 분리 프로젝트 초기 코드 작성
TK-FB(공장관리+신고)와 M-Project(부적합관리)를 3개 독립 시스템으로
분리하기 위한 전체 코드 구조 작성.
- SSO 인증 서비스 (bcrypt + pbkdf2 이중 해시 지원)
- System 1: 공장관리 (TK-FB 기반, 신고 코드 제거)
- System 2: 신고 (TK-FB에서 workIssue 코드 추출)
- System 3: 부적합관리 (M-Project 기반)
- Gateway 포털 (path-based 라우팅)
- 통합 docker-compose.yml 및 배포 스크립트

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 14:40:11 +09:00

99 lines
3.5 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
import os
import bcrypt as bcrypt_lib
from database.models import User, UserRole
from database.schemas import TokenData
# 환경 변수 - SSO 공유 시크릿 사용 (docker-compose에서 SECRET_KEY=SSO_JWT_SECRET)
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "10080")) # 7 days
# 비밀번호 암호화 (pbkdf2_sha256 - 로컬 인증용)
pwd_context = CryptContext(
schemes=["pbkdf2_sha256"],
deprecated="auto"
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""비밀번호 검증 - bcrypt + pbkdf2_sha256 둘 다 지원 (SSO 호환)"""
if not plain_password or not hashed_password:
return False
# bcrypt 형식 ($2b$ 또는 $2a$)
if hashed_password.startswith(("$2b$", "$2a$")):
try:
return bcrypt_lib.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
except Exception:
return False
# pbkdf2_sha256 형식 (passlib)
try:
return pwd_context.verify(plain_password, hashed_password)
except Exception:
return False
def get_password_hash(password: str) -> str:
"""비밀번호 해시 생성 (pbkdf2_sha256)"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str, credentials_exception):
"""JWT 토큰 검증 - SSO 토큰 페이로드 구조 지원"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# SSO 토큰: "sub" 필드에 username
# 기존 M-Project 토큰: "sub" 필드에 username
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
return token_data
except JWTError:
raise credentials_exception
def authenticate_user(db: Session, username: str, password: str):
user = db.query(User).filter(User.username == username).first()
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_admin_user(db: Session):
"""초기 관리자 계정 생성"""
admin_username = os.getenv("ADMIN_USERNAME", "hyungi")
admin_password = os.getenv("ADMIN_PASSWORD", "djg3-jj34-X3Q3")
existing_admin = db.query(User).filter(User.username == admin_username).first()
if not existing_admin:
admin_user = User(
username=admin_username,
hashed_password=get_password_hash(admin_password),
full_name="관리자",
role=UserRole.admin,
is_active=True
)
db.add(admin_user)
db.commit()
print(f"관리자 계정 생성됨: {admin_username}")
else:
print(f"관리자 계정이 이미 존재함: {admin_username}")