74876b674c
PR-Infra-Sec-1H Phase 0 audit 에서 DS jwt invalidation 정책 부재 확정. password rotation 으로 구 365d JWT (voice-memo-bot 등) invalidate 안 되는 hard gate STOP 진입 → 선행 PR 분리. - migration 269: users.password_changed_at timestamptz NULL (legacy 호환) - create_access_token / create_refresh_token: payload 에 iat (int 초) 추가 - verify_password_changed_at helper: int(password_changed_at.timestamp()) > int(iat) 시 401 - get_current_user + refresh_token route: verify helper 호출 - change_password / setup signup / seed_admin INSERT+UPDATE: password_changed_at 갱신 NULL = 검증 skip (migration 직후 운영 영향 0). 첫 password 변경 후만 iat 검증 활성. Sec-1H 의 G-token-old hard gate 통과 path 확보.
132 lines
4.6 KiB
Python
132 lines
4.6 KiB
Python
"""JWT + TOTP 2FA 인증"""
|
|
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Annotated
|
|
|
|
import bcrypt
|
|
import pyotp
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError, jwt
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.config import settings
|
|
from core.database import get_session
|
|
|
|
security = HTTPBearer()
|
|
|
|
# JWT 설정
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 15
|
|
REFRESH_TOKEN_EXPIRE_DAYS = 7
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
|
|
return bcrypt.checkpw(plain.encode(), hashed.encode())
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
|
|
|
|
|
def create_access_token(subject: str, expires_minutes: int | None = None) -> str:
|
|
minutes = expires_minutes if expires_minutes is not None else ACCESS_TOKEN_EXPIRE_MINUTES
|
|
now = datetime.now(timezone.utc)
|
|
expire = now + timedelta(minutes=minutes)
|
|
payload = {"sub": subject, "exp": expire, "iat": int(now.timestamp()), "type": "access"}
|
|
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def create_voice_memo_bot_token(username: str) -> str | None:
|
|
# Voice Memo PoC v1 — bot 계정 한정 long-expiry access token (env gate + username hard-match).
|
|
# 일반 사용자 호출 시 None 반환. 정식 service-account/api_keys 는 Phase 2.
|
|
if os.getenv("VOICE_MEMO_BOT_TOKEN_ENABLED", "false").lower() != "true":
|
|
return None
|
|
bot_username = os.getenv("VOICE_MEMO_BOT_USERNAME", "voice-memo-bot")
|
|
if username != bot_username:
|
|
return None
|
|
expire_days = int(os.getenv("VOICE_MEMO_BOT_TOKEN_EXPIRE_DAYS", "365"))
|
|
return create_access_token(username, expires_minutes=expire_days * 24 * 60)
|
|
|
|
|
|
def create_refresh_token(subject: str) -> str:
|
|
now = datetime.now(timezone.utc)
|
|
expire = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
|
payload = {"sub": subject, "exp": expire, "iat": int(now.timestamp()), "type": "refresh"}
|
|
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def decode_token(token: str) -> dict | None:
|
|
try:
|
|
return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
|
|
except JWTError:
|
|
return None
|
|
|
|
|
|
|
|
|
|
def verify_password_changed_at(payload: dict, user) -> None:
|
|
# legacy 호환: password_changed_at NULL 이면 검증 skip (migration 전 발급 token 유지)
|
|
# password 변경 후 발급 token 만 검증 — iat (int 초) >= int(password_changed_at.timestamp())
|
|
if user.password_changed_at is None:
|
|
return
|
|
iat = payload.get("iat")
|
|
pwd_changed_int = int(user.password_changed_at.timestamp())
|
|
if iat is None or pwd_changed_int > int(iat):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="비밀번호 변경 후 재로그인 필요",
|
|
)
|
|
|
|
def verify_totp(code: str, secret: str | None = None) -> bool:
|
|
"""TOTP 코드 검증 (유저별 secret 또는 글로벌 설정)"""
|
|
totp_secret = secret or settings.totp_secret
|
|
if not totp_secret:
|
|
return True # TOTP 미설정 시 스킵
|
|
totp = pyotp.TOTP(totp_secret)
|
|
return totp.verify(code)
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""Bearer 토큰에서 현재 유저 조회"""
|
|
from models.user import User
|
|
|
|
payload = decode_token(credentials.credentials)
|
|
if not payload or payload.get("type") != "access":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="유효하지 않은 토큰",
|
|
)
|
|
|
|
username = payload.get("sub")
|
|
result = await session.execute(
|
|
select(User).where(User.username == username, User.is_active.is_(True))
|
|
)
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="유저를 찾을 수 없음",
|
|
)
|
|
verify_password_changed_at(payload, user)
|
|
return user
|
|
|
|
|
|
async def require_admin(
|
|
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""관리자 권한 확인 — 뉴스 소스 CRUD, 수집 트리거, digest 재생성 등"""
|
|
user = await get_current_user(credentials, session)
|
|
if not user.is_admin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="관리자 권한 필요",
|
|
)
|
|
return user
|