feat(auth): JWT iat + users.password_changed_at invalidation (PR-Docsrv-JWT-Invalidation-1)
PR-Infra-Sec-1H Phase 0 audit 에서 DS jwt invalidation 정책 부재 확정. password rotation 으로 구 365d JWT (voice-memo-bot 등) invalidate 안 되는 hard gate STOP 진입 → 선행 PR 분리. - migration 269: users.password_changed_at timestamptz NULL (legacy 호환) - create_access_token / create_refresh_token: payload 에 iat (int 초) 추가 - verify_password_changed_at helper: int(password_changed_at.timestamp()) > int(iat) 시 401 - get_current_user + refresh_token route: verify helper 호출 - change_password / setup signup / seed_admin INSERT+UPDATE: password_changed_at 갱신 NULL = 검증 skip (migration 직후 운영 영향 0). 첫 password 변경 후만 iat 검증 활성. Sec-1H 의 G-token-old hard gate 통과 path 확보.
This commit is contained in:
@@ -19,6 +19,7 @@ from core.auth import (
|
||||
create_voice_memo_bot_token,
|
||||
decode_token,
|
||||
get_current_user,
|
||||
verify_password_changed_at,
|
||||
hash_password,
|
||||
verify_password,
|
||||
verify_totp,
|
||||
@@ -161,6 +162,7 @@ async def refresh_token(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="유저를 찾을 수 없음",
|
||||
)
|
||||
verify_password_changed_at(payload, user)
|
||||
|
||||
# 새 refresh token → cookie
|
||||
_set_refresh_cookie(response, create_refresh_token(user.username))
|
||||
@@ -203,5 +205,6 @@ async def change_password(
|
||||
)
|
||||
|
||||
user.password_hash = hash_password(body.new_password)
|
||||
user.password_changed_at = datetime.now(timezone.utc)
|
||||
await session.commit()
|
||||
return {"message": "비밀번호가 변경되었습니다"}
|
||||
|
||||
@@ -8,6 +8,7 @@ from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
import pyotp
|
||||
from datetime import datetime, timezone
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
@@ -137,6 +138,7 @@ async def create_admin(
|
||||
username=body.username,
|
||||
password_hash=hash_password(body.password),
|
||||
is_active=True,
|
||||
password_changed_at=datetime.now(timezone.utc),
|
||||
)
|
||||
session.add(user)
|
||||
await session.commit()
|
||||
|
||||
+22
-4
@@ -33,8 +33,9 @@ def hash_password(password: str) -> str:
|
||||
|
||||
def create_access_token(subject: str, expires_minutes: int | None = None) -> str:
|
||||
minutes = expires_minutes if expires_minutes is not None else ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
expire = datetime.now(timezone.utc) + timedelta(minutes=minutes)
|
||||
payload = {"sub": subject, "exp": expire, "type": "access"}
|
||||
now = datetime.now(timezone.utc)
|
||||
expire = now + timedelta(minutes=minutes)
|
||||
payload = {"sub": subject, "exp": expire, "iat": int(now.timestamp()), "type": "access"}
|
||||
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
|
||||
|
||||
|
||||
@@ -51,8 +52,9 @@ def create_voice_memo_bot_token(username: str) -> str | None:
|
||||
|
||||
|
||||
def create_refresh_token(subject: str) -> str:
|
||||
expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
||||
payload = {"sub": subject, "exp": expire, "type": "refresh"}
|
||||
now = datetime.now(timezone.utc)
|
||||
expire = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
||||
payload = {"sub": subject, "exp": expire, "iat": int(now.timestamp()), "type": "refresh"}
|
||||
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
|
||||
|
||||
|
||||
@@ -63,6 +65,21 @@ def decode_token(token: str) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
def verify_password_changed_at(payload: dict, user) -> None:
|
||||
# legacy 호환: password_changed_at NULL 이면 검증 skip (migration 전 발급 token 유지)
|
||||
# password 변경 후 발급 token 만 검증 — iat (int 초) >= int(password_changed_at.timestamp())
|
||||
if user.password_changed_at is None:
|
||||
return
|
||||
iat = payload.get("iat")
|
||||
pwd_changed_int = int(user.password_changed_at.timestamp())
|
||||
if iat is None or pwd_changed_int > int(iat):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="비밀번호 변경 후 재로그인 필요",
|
||||
)
|
||||
|
||||
def verify_totp(code: str, secret: str | None = None) -> bool:
|
||||
"""TOTP 코드 검증 (유저별 secret 또는 글로벌 설정)"""
|
||||
totp_secret = secret or settings.totp_secret
|
||||
@@ -96,6 +113,7 @@ async def get_current_user(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="유저를 찾을 수 없음",
|
||||
)
|
||||
verify_password_changed_at(payload, user)
|
||||
return user
|
||||
|
||||
|
||||
|
||||
@@ -21,3 +21,4 @@ class User(Base):
|
||||
DateTime(timezone=True), default=datetime.now
|
||||
)
|
||||
last_login_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
password_changed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
-- 2026-05-17 PR-Docsrv-JWT-Invalidation-1: users.password_changed_at 컬럼 추가.
|
||||
-- JWT iat (issued_at) claim 과 비교해 password 변경 시 구 access/refresh token 자동 invalidation.
|
||||
-- NULL = 검증 skip (legacy 호환). change-password / seed_admin / setup signup 시 now() 갱신.
|
||||
ALTER TABLE users ADD COLUMN IF NOT EXISTS password_changed_at timestamptz;
|
||||
@@ -55,14 +55,14 @@ async def seed_admin():
|
||||
if result.scalar_one_or_none():
|
||||
print(f"'{username}' 계정이 이미 존재합니다. 비밀번호를 업데이트합니다.")
|
||||
await session.execute(
|
||||
text("UPDATE users SET password_hash = :hash WHERE username = :username"),
|
||||
text("UPDATE users SET password_hash = :hash, password_changed_at = NOW() WHERE username = :username"),
|
||||
{"hash": password_hash, "username": username},
|
||||
)
|
||||
else:
|
||||
await session.execute(
|
||||
text(
|
||||
"INSERT INTO users (username, password_hash, is_active) "
|
||||
"VALUES (:username, :hash, TRUE)"
|
||||
"INSERT INTO users (username, password_hash, is_active, password_changed_at) "
|
||||
"VALUES (:username, :hash, TRUE, NOW())"
|
||||
),
|
||||
{"username": username, "hash": password_hash},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user