5038007998
- 신규 url_validator.py: SSRF 차단 (private IP/loopback/link-local/reserved/multicast/CGNAT 블록, HTTPS only) - require_admin dependency 추가 — 소스 CRUD, /collect, /digest/regenerate에 적용 - User.is_admin 컬럼 + migration 104 - NYT API key 로그 마스킹 (쿼리스트링 제거) - RSS fetch: redirect 수동 처리(3회, target 재검증), 5MB 크기 제한, content-type 허용목록, feed.bozo 체크 - /collect 재진입 차단 (asyncio.Lock, 단일 인스턴스 한정) - HTTP feed allowlist (코드 레벨 상수, API 미노출) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
100 lines
3.1 KiB
Python
100 lines
3.1 KiB
Python
"""JWT + TOTP 2FA 인증"""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Annotated
|
|
|
|
import bcrypt
|
|
import pyotp
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError, jwt
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.config import settings
|
|
from core.database import get_session
|
|
|
|
# Bearer-token security scheme; injected with Depends(security) so the
# dependencies below receive the request's Authorization credentials.
security = HTTPBearer()
|
|
|
|
# JWT settings: signing algorithm and token lifetimes.
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15  # access-token lifetime, in minutes
REFRESH_TOKEN_EXPIRE_DAYS: int = 7  # refresh-token lifetime, in days
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: Password exactly as supplied by the user.
        hashed: Stored bcrypt hash string.

    Returns:
        True when ``plain`` matches ``hashed``. False on a mismatch, or when
        bcrypt rejects the inputs (for example a malformed stored hash).
    """
    try:
        return bcrypt.checkpw(plain.encode(), hashed.encode())
    except ValueError:
        # bcrypt raises ValueError for a hash it cannot parse ("Invalid salt").
        # Treat that as a failed check rather than letting it escape as an
        # unhandled server error during login.
        return False
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return a bcrypt hash of ``password`` as text, using a fresh salt."""
    raw_password = password.encode()
    hashed_bytes = bcrypt.hashpw(raw_password, bcrypt.gensalt())
    return hashed_bytes.decode()
|
|
|
|
|
|
def create_access_token(subject: str) -> str:
    """Issue a signed access JWT for ``subject``.

    The token carries ``sub``, ``exp`` (now + ACCESS_TOKEN_EXPIRE_MINUTES,
    in UTC) and ``type`` set to ``"access"``.
    """
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {
        "sub": subject,
        "exp": datetime.now(timezone.utc) + lifetime,
        "type": "access",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def create_refresh_token(subject: str) -> str:
    """Issue a signed refresh JWT for ``subject``.

    The token carries ``sub``, ``exp`` (now + REFRESH_TOKEN_EXPIRE_DAYS,
    in UTC) and ``type`` set to ``"refresh"``.
    """
    lifetime = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    claims = {
        "sub": subject,
        "exp": datetime.now(timezone.utc) + lifetime,
        "type": "refresh",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def decode_token(token: str) -> dict | None:
    """Decode and verify ``token``; return its claims, or None if decoding fails.

    Any ``JWTError`` raised by ``jwt.decode`` is converted to a ``None`` result.
    """
    try:
        claims = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
    except JWTError:
        return None
    else:
        return claims
|
|
|
|
|
|
def verify_totp(code: str, secret: str | None = None) -> bool:
    """Verify a TOTP code against the per-user secret or the global setting.

    A truthy ``secret`` is used as-is; otherwise ``settings.totp_secret`` is
    used. When neither is set, verification is skipped and True is returned.
    """
    effective_secret = secret if secret else settings.totp_secret
    if effective_secret:
        return pyotp.TOTP(effective_secret).verify(code)
    # TOTP is not configured: skip verification.
    return True
|
|
|
|
|
|
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Look up the active user identified by the bearer access token.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        session: Async database session used for the user lookup.

    Returns:
        The ``User`` row whose username equals the token's ``sub`` claim and
        whose ``is_active`` flag is true.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not an access
            token, or has no ``sub`` claim; 401 when no matching active user
            exists.
    """
    # Kept as a function-local import, as in the original
    # (presumably to avoid a circular import — confirm before moving it).
    from models.user import User

    # Challenge header that accompanies every 401 for the Bearer scheme.
    auth_challenge = {"WWW-Authenticate": "Bearer"}

    payload = decode_token(credentials.credentials)
    username = payload.get("sub") if payload else None
    # Reject tokens without a subject up front: otherwise the query below
    # would compare the username column against NULL.
    if not payload or payload.get("type") != "access" or not username:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유효하지 않은 토큰",
            headers=auth_challenge,
        )

    result = await session.execute(
        select(User).where(User.username == username, User.is_active.is_(True))
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유저를 찾을 수 없음",
            headers=auth_challenge,
        )
    return user
|
|
|
|
|
|
async def require_admin(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Require an administrator (news-source CRUD, collection trigger, digest regeneration).

    Resolves the caller through ``get_current_user`` and returns that user
    when ``is_admin`` is truthy; otherwise raises a 403 ``HTTPException``.
    """
    current = await get_current_user(credentials, session)
    if current.is_admin:
        return current
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="관리자 권한 필요",
    )
|