Files
tk-factory-services/system3-nonconformance/api/services/auth_service.py
Hyungi Ahn 64e3c1227d refactor: 미사용 의존성 제거 + deprecated API 수정 + 정리
- SSO/System2: 미사용 redis, bcrypt, multer 의존성 제거
- System2: dbPool.js shim 삭제, workIssueModel을 config/database 직접 참조로 변경
- System3: deprecated datetime.utcnow() → datetime.now(timezone.utc)
- System3: deprecated @app.on_event("startup") → lifespan 패턴
- System3: 중복 /users 라우트 제거, 불필요 파일 삭제
- health-check.sh: tkuser API/Web 체크 추가
- tkuser nginx: upstream 이름 수정 (tk-system2-api → system2-api)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 08:35:11 +09:00

86 lines
3.1 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
import os
import bcrypt as bcrypt_lib
from database.models import User, UserRole
from database.schemas import TokenData # kept for compatibility
# 환경 변수 - SSO 공유 시크릿 사용 (docker-compose에서 SECRET_KEY=SSO_JWT_SECRET)
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "10080")) # 7 days
# 비밀번호 암호화 (pbkdf2_sha256 - 로컬 인증용)
pwd_context = CryptContext(
schemes=["pbkdf2_sha256"],
deprecated="auto"
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""비밀번호 검증 - bcrypt + pbkdf2_sha256 둘 다 지원 (SSO 호환)"""
if not plain_password or not hashed_password:
return False
# bcrypt 형식 ($2b$ 또는 $2a$)
if hashed_password.startswith(("$2b$", "$2a$")):
try:
return bcrypt_lib.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
except Exception:
return False
# pbkdf2_sha256 형식 (passlib)
try:
return pwd_context.verify(plain_password, hashed_password)
except Exception:
return False
def get_password_hash(password: str) -> str:
"""비밀번호 해시 생성 (pbkdf2_sha256)"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str, credentials_exception):
"""JWT 토큰 검증 - SSO 토큰 전체 페이로드 반환"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
return payload
except JWTError:
raise credentials_exception
def authenticate_user(db: Session, username: str, password: str):
user = db.query(User).filter(User.username == username).first()
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_admin_user(db: Session):
"""관리자 계정 확인 (SSO에서 관리, 여기서는 조회만)"""
admin_username = os.getenv("ADMIN_USERNAME", "hyungi")
existing_admin = db.query(User).filter(User.username == admin_username).first()
if existing_admin:
print(f"관리자 계정 확인됨: {admin_username} (role: {existing_admin.role.value})")
else:
print(f"경고: 관리자 계정이 sso_users에 없습니다: {admin_username}")