Nginx terminates TLS and forwards HTTP internally. Secure=True cookies don't get sent when the backend sees HTTP connections. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
202 lines
5.7 KiB
Python
202 lines
5.7 KiB
Python
"""인증 API — 로그인, 토큰 갱신, TOTP 검증
|
|
|
|
access token: 응답 body (프론트에서 메모리 보관)
|
|
refresh token: HttpOnly cookie (XSS 방어)
|
|
"""
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Cookie, Depends, HTTPException, Request, Response, status
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import (
|
|
REFRESH_TOKEN_EXPIRE_DAYS,
|
|
create_access_token,
|
|
create_refresh_token,
|
|
decode_token,
|
|
get_current_user,
|
|
hash_password,
|
|
verify_password,
|
|
verify_totp,
|
|
)
|
|
from core.database import get_session
|
|
from models.user import User
|
|
|
|
# Router on which the authentication endpoints in this module are registered.
router = APIRouter()
|
|
|
|
|
|
# ─── 요청/응답 스키마 ───
|
|
|
|
|
|
class LoginRequest(BaseModel):
    """Request body for the login endpoint."""

    username: str
    password: str
    # Optional one-time code; omitted (None) when the client does not send one.
    totp_code: str | None = None
|
|
|
|
|
|
class AccessTokenResponse(BaseModel):
    """Response body carrying a newly issued access token."""

    access_token: str
    # Token scheme reported to the client; defaults to "bearer".
    token_type: str = "bearer"
|
|
|
|
|
|
class ChangePasswordRequest(BaseModel):
    """Request body for the change-password endpoint."""

    # Must match the stored password before the change is applied.
    current_password: str
    new_password: str
|
|
|
|
|
|
class UserResponse(BaseModel):
    """User information returned to the client."""

    # Pydantic v2 configuration: allow building the model from objects that
    # expose the fields as attributes. This replaces the class-based
    # ``Config``, which pydantic v2 still accepts but reports as deprecated.
    model_config = {"from_attributes": True}

    id: int
    username: str
    is_active: bool
    totp_enabled: bool
    last_login_at: datetime | None
|
|
|
|
|
|
# ─── 헬퍼 ───
|
|
|
|
def _set_refresh_cookie(response: Response, token: str) -> None:
    """Store the refresh token on *response* as an HttpOnly cookie.

    Args:
        response: Outgoing response that receives the cookie.
        token: Encoded refresh token used as the cookie value.
    """
    response.set_cookie(
        key="refresh_token",
        value=token,
        httponly=True,
        # Intentionally not Secure: Nginx terminates TLS and the internal
        # traffic reaching this backend is plain HTTP.
        # NOTE(review): the Secure attribute is evaluated by the browser
        # against its own connection, so it may be safe to enable when all
        # public traffic is HTTPS -- confirm against the deployment.
        secure=False,
        samesite="lax",
        max_age=REFRESH_TOKEN_EXPIRE_DAYS * 86400,  # days -> seconds
        path="/api/auth",
    )
|
|
|
|
|
|
# ─── 엔드포인트 ───
|
|
|
|
|
|
@router.post("/login", response_model=AccessTokenResponse)
async def login(
    body: LoginRequest,
    response: Response,
    session: Annotated[AsyncSession, Depends(get_session)],
) -> AccessTokenResponse:
    """Log in: access token in the response body, refresh token in a cookie.

    Args:
        body: Submitted username, password and optional TOTP code.
        response: Response object that receives the refresh-token cookie.
        session: Database session used to look up and update the user.

    Returns:
        The newly issued access token.

    Raises:
        HTTPException: 401 when the username/password pair is wrong, or when
            a TOTP code is required but missing or invalid; 403 when the
            account is inactive.
    """
    result = await session.execute(
        select(User).where(User.username == body.username)
    )
    user = result.scalar_one_or_none()

    # Same error for "no such user" and "wrong password".
    if not user or not verify_password(body.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="아이디 또는 비밀번호가 올바르지 않습니다",
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="비활성화된 계정입니다",
        )

    # TOTP verification, only for accounts that have a secret configured.
    if user.totp_secret:
        if not body.totp_code:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="TOTP 코드가 필요합니다",
            )
        if not verify_totp(body.totp_code, user.totp_secret):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="TOTP 코드가 올바르지 않습니다",
            )

    # Read the username before committing: if the session expires instances
    # on commit, touching an ORM attribute afterwards would need an implicit
    # reload, which an AsyncSession cannot perform from plain attribute access.
    username = user.username

    # Record the time of this successful login.
    user.last_login_at = datetime.now(timezone.utc)
    await session.commit()

    # Refresh token goes into the HttpOnly cookie, access token into the body.
    _set_refresh_cookie(response, create_refresh_token(username))

    return AccessTokenResponse(
        access_token=create_access_token(username),
    )
|
|
|
|
|
|
@router.post("/refresh", response_model=AccessTokenResponse)
async def refresh_token(
    response: Response,
    session: Annotated[AsyncSession, Depends(get_session)],
    refresh_token: Annotated[str | None, Cookie()] = None,
) -> AccessTokenResponse:
    """Issue a new token pair from the refresh token stored in the cookie.

    Args:
        response: Response object that receives the rotated refresh cookie.
        session: Database session used to look up the user.
        refresh_token: Value of the ``refresh_token`` cookie, or None when
            the cookie is absent.

    Returns:
        The newly issued access token.

    Raises:
        HTTPException: 401 when the cookie is missing, the token does not
            decode to a payload of type "refresh", or no active user matches
            the token subject.
    """
    if not refresh_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="리프레시 토큰이 없습니다",
        )

    payload = decode_token(refresh_token)
    # Reject undecodable tokens and tokens that are not refresh tokens.
    if not payload or payload.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유효하지 않은 리프레시 토큰",
        )

    username = payload.get("sub")
    # Only active users may refresh.
    result = await session.execute(
        select(User).where(User.username == username, User.is_active.is_(True))
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유저를 찾을 수 없음",
        )

    # A fresh refresh token replaces the cookie on every refresh.
    _set_refresh_cookie(response, create_refresh_token(user.username))

    return AccessTokenResponse(
        access_token=create_access_token(user.username),
    )
|
|
|
|
|
|
@router.post("/logout")
async def logout(response: Response) -> dict[str, str]:
    """Log out by deleting the refresh-token cookie.

    Args:
        response: Response object on which the cookie deletion is set.

    Returns:
        A confirmation message.
    """
    # The path is passed explicitly so the deletion targets the cookie
    # scoped to "/api/auth".
    response.delete_cookie("refresh_token", path="/api/auth")
    return {"message": "로그아웃 완료"}
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
async def get_me(user: Annotated[User, Depends(get_current_user)]) -> UserResponse:
    """Return information about the currently authenticated user.

    Args:
        user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The user's public fields; ``totp_enabled`` is derived from whether a
        TOTP secret is set, so the secret itself is never returned.
    """
    return UserResponse(
        id=user.id,
        username=user.username,
        is_active=user.is_active,
        totp_enabled=bool(user.totp_secret),
        last_login_at=user.last_login_at,
    )
|
|
|
|
|
|
@router.post("/change-password")
async def change_password(
    body: ChangePasswordRequest,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> dict[str, str]:
    """Change the authenticated user's password.

    Args:
        body: Current password and the desired new password.
        user: User resolved by the ``get_current_user`` dependency.
        session: Database session used to persist the new hash.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 400 when the supplied current password does not match.
    """
    if not verify_password(body.current_password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="현재 비밀번호가 올바르지 않습니다",
        )

    # Only the hash of the new password is stored.
    # NOTE(review): presumably `user` is attached to this same `session`
    # so the commit persists the change -- verify in get_current_user.
    user.password_hash = hash_password(body.new_password)
    await session.commit()
    return {"message": "비밀번호가 변경되었습니다"}
|