Files
hyungi_document_server/app/core/auth.py
T
Hyungi Ahn 5038007998 fix(news): SSRF validation + admin auth + API key masking + collect lock + XML safety
- 신규 url_validator.py: SSRF 차단 (private IP/loopback/link-local/reserved/multicast/CGNAT 블록, HTTPS only)
- require_admin dependency 추가 — 소스 CRUD, /collect, /digest/regenerate에 적용
- User.is_admin 컬럼 + migration 104
- NYT API key 로그 마스킹 (쿼리스트링 제거)
- RSS fetch: redirect 수동 처리(3회, target 재검증), 5MB 크기 제한, content-type 허용목록, feed.bozo 체크
- /collect 재진입 차단 (asyncio.Lock, 단일 인스턴스 한정)
- HTTP feed allowlist (코드 레벨 상수, API 미노출)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 14:32:55 +09:00

100 lines
3.1 KiB
Python

"""JWT + TOTP 2FA 인증"""
from datetime import datetime, timedelta, timezone
from typing import Annotated
import bcrypt
import pyotp
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import get_session
security = HTTPBearer()
# JWT 설정
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode(), hashed.encode())
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def create_access_token(subject: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {"sub": subject, "exp": expire, "type": "access"}
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
def create_refresh_token(subject: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
payload = {"sub": subject, "exp": expire, "type": "refresh"}
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
def decode_token(token: str) -> dict | None:
try:
return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
except JWTError:
return None
def verify_totp(code: str, secret: str | None = None) -> bool:
"""TOTP 코드 검증 (유저별 secret 또는 글로벌 설정)"""
totp_secret = secret or settings.totp_secret
if not totp_secret:
return True # TOTP 미설정 시 스킵
totp = pyotp.TOTP(totp_secret)
return totp.verify(code)
async def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""Bearer 토큰에서 현재 유저 조회"""
from models.user import User
payload = decode_token(credentials.credentials)
if not payload or payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="유효하지 않은 토큰",
)
username = payload.get("sub")
result = await session.execute(
select(User).where(User.username == username, User.is_active.is_(True))
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="유저를 찾을 수 없음",
)
return user
async def require_admin(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""관리자 권한 확인 — 뉴스 소스 CRUD, 수집 트리거, digest 재생성 등"""
user = await get_current_user(credentials, session)
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="관리자 권한 필요",
)
return user