3fb613916a
클라우드 소비자(Claude/MCP)에 cloud-eligibility allowlist 강제 — DS 접근규격 갭2.
- auth: create_access_token egress claim(기본 local·비파괴) + get_egress_class 의존성
- AxisFilter.cloud_egress + _axis_sql allowlist 술어(토큰 claim 유래·쿼리파라미터 아님=우회불가)
- 규칙: external OR (work ∩ bucket∈{Eng,Safety,Law} ∩ ∉{voice,chat,memo} ∩ ≠memo ∩ user_note없음)
검증(cloud vs local): 인프라알림([Hyungi_NAS] tk-*api)·work/Programming(리디북스) 차단,
work/Engineering(hoop stress·ASME) 통과, external 통과. local=전부(무회귀).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
170 lines
6.3 KiB
Python
170 lines
6.3 KiB
Python
"""JWT + TOTP 2FA 인증"""
|
|
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Annotated
|
|
|
|
import bcrypt
|
|
import pyotp
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError, jwt
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.config import settings
|
|
from core.database import get_session
|
|
|
|
# Bearer-credentials extractor; the dependency functions in this module receive
# their `credentials` argument through Depends(security).
security = HTTPBearer()
|
|
|
|
# JWT settings (signing algorithm and token lifetimes)
|
|
# Signing algorithm passed to jwt.encode / jwt.decode by the token helpers in this module.
ALGORITHM = "HS256"
|
|
# Default access-token lifetime in minutes; create_access_token uses it when
# no explicit expires_minutes is given.
ACCESS_TOKEN_EXPIRE_MINUTES = 15
|
|
# Refresh-token lifetime in days, applied by create_refresh_token.
REFRESH_TOKEN_EXPIRE_DAYS = 7
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: Password as typed by the user.
        hashed: Stored bcrypt hash, as produced by ``hash_password``.

    Returns:
        True when the password matches the hash, False otherwise. A hash or
        password that bcrypt refuses to process counts as "no match".
    """
    try:
        return bcrypt.checkpw(plain.encode(), hashed.encode())
    except ValueError:
        # bcrypt raises ValueError for a malformed stored hash ("Invalid salt");
        # str.encode raises UnicodeEncodeError (a ValueError subclass) for
        # unencodable input. Neither can be a successful match, so report a
        # failed verification instead of letting the error escape.
        return False
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return a freshly salted bcrypt hash of ``password`` as text."""
    raw = password.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw, salt)
    return digest.decode()
|
|
|
|
|
|
def create_access_token(subject: str, expires_minutes: int | None = None, egress: str = "local") -> str:
    """Issue a signed access JWT for ``subject``.

    Args:
        subject: Value stored in the ``sub`` claim.
        expires_minutes: Token lifetime in minutes; ``None`` selects
            ``ACCESS_TOKEN_EXPIRE_MINUTES``.
        egress: Value stored in the ``egress`` claim (defaults to "local").

    Returns:
        The encoded JWT string.
    """
    if expires_minutes is None:
        lifetime = ACCESS_TOKEN_EXPIRE_MINUTES
    else:
        lifetime = expires_minutes
    issued_at = datetime.now(timezone.utc)
    # Claim order is kept as sub, exp, iat, type, egress.
    claims = {
        "sub": subject,
        "exp": issued_at + timedelta(minutes=lifetime),
        "iat": int(issued_at.timestamp()),
        "type": "access",
        "egress": egress,
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def create_voice_memo_bot_token(username: str) -> str | None:
|
|
# Voice Memo PoC v1 — bot 계정 한정 long-expiry access token (env gate + username hard-match).
|
|
# 일반 사용자 호출 시 None 반환. 정식 service-account/api_keys 는 Phase 2.
|
|
if os.getenv("VOICE_MEMO_BOT_TOKEN_ENABLED", "false").lower() != "true":
|
|
return None
|
|
bot_username = os.getenv("VOICE_MEMO_BOT_USERNAME", "voice-memo-bot")
|
|
if username != bot_username:
|
|
return None
|
|
expire_days = int(os.getenv("VOICE_MEMO_BOT_TOKEN_EXPIRE_DAYS", "365"))
|
|
return create_access_token(username, expires_minutes=expire_days * 24 * 60)
|
|
|
|
|
|
def create_laptop_worker_bot_token(username: str) -> str | None:
|
|
# PR-Worker-Pool-Registry-1B — laptop-worker-bot 계정 한정 long-expiry token (voice-memo 동형).
|
|
if os.getenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "false").lower() != "true":
|
|
return None
|
|
bot_username = os.getenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
|
|
if username != bot_username:
|
|
return None
|
|
expire_days = int(os.getenv("LAPTOP_WORKER_BOT_TOKEN_EXPIRE_DAYS", "365"))
|
|
return create_access_token(username, expires_minutes=expire_days * 24 * 60)
|
|
|
|
|
|
def create_refresh_token(subject: str) -> str:
    """Issue a signed refresh JWT for ``subject``, valid for ``REFRESH_TOKEN_EXPIRE_DAYS`` days."""
    issued_at = datetime.now(timezone.utc)
    # Claim order is kept as sub, exp, iat, type.
    claims = {
        "sub": subject,
        "exp": issued_at + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
        "iat": int(issued_at.timestamp()),
        "type": "refresh",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def decode_token(token: str) -> dict | None:
    """Decode and verify a JWT with the module's secret and algorithm.

    Returns:
        The claims dict, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
    except JWTError:
        return None
    return claims
|
|
|
|
|
|
|
|
|
|
def verify_password_changed_at(payload: dict, user) -> None:
    """Reject tokens issued before the user's last password change.

    Legacy compatibility: when ``user.password_changed_at`` is ``None`` the
    check is skipped. Otherwise the token's ``iat`` (integer seconds) must be
    greater than or equal to the password-change time truncated to whole
    seconds.

    Raises:
        HTTPException: 401 when ``iat`` is missing or predates the change.
    """
    changed_at = user.password_changed_at
    if changed_at is None:
        return

    issued_at = payload.get("iat")
    changed_epoch = int(changed_at.timestamp())
    token_is_current = issued_at is not None and int(issued_at) >= changed_epoch
    if token_is_current:
        return

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="비밀번호 변경 후 재로그인 필요",
    )
|
|
|
|
def verify_totp(code: str, secret: str | None = None) -> bool:
    """Verify a TOTP code against the per-user secret or the global setting.

    Args:
        code: The one-time code to check.
        secret: Per-user secret; when empty or ``None`` the global
            ``settings.totp_secret`` is used instead.

    Returns:
        The verification result, or True when no secret is configured at all
        (TOTP not set up, so the check is skipped).
    """
    effective_secret = secret if secret else settings.totp_secret
    if effective_secret:
        return pyotp.TOTP(effective_secret).verify(code)
    # No secret configured anywhere: TOTP is skipped.
    return True
|
|
|
|
|
|
async def get_egress_class(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
) -> str:
    """Return the egress class carried in the bearer token's ``egress`` claim.

    The value is 'cloud' or 'local' (cloud-egress allowlist, "gap 2"). A valid
    token without the claim is treated as 'local', so tokens issued before the
    claim existed keep working. The class comes from the signed token rather
    than a query parameter, so the caller cannot switch it off.

    Args:
        credentials: Bearer credentials extracted by ``security``.

    Returns:
        The ``egress`` claim value, or "local" when the claim is absent.

    Raises:
        HTTPException: 401 when the token cannot be decoded.
    """
    payload = decode_token(credentials.credentials)
    if payload is None:
        # Fail closed: an undecodable token must not fall through to "local",
        # which is the unrestricted class. Same response as get_current_user.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유효하지 않은 토큰",
        )
    return payload.get("egress", "local")
|
|
|
|
|
|
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Resolve the active user identified by the Bearer access token.

    Raises:
        HTTPException: 401 when the token is invalid or not an access token,
            when no active user matches its ``sub`` claim, or when
            ``verify_password_changed_at`` rejects the token.
    """
    from models.user import User

    claims = decode_token(credentials.credentials)
    is_access_token = bool(claims) and claims.get("type") == "access"
    if not is_access_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유효하지 않은 토큰",
        )

    # Only active accounts whose username equals the token subject qualify.
    lookup = select(User).where(
        User.username == claims.get("sub"),
        User.is_active.is_(True),
    )
    account = (await session.execute(lookup)).scalar_one_or_none()
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유저를 찾을 수 없음",
        )

    verify_password_changed_at(claims, account)
    return account
|
|
|
|
|
|
async def require_admin(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Require an authenticated admin user.

    Per the original note, this guards news-source CRUD, collection triggers,
    digest regeneration and similar operations.

    Raises:
        HTTPException: 403 when the authenticated user is not an admin, plus
            whatever ``get_current_user`` raises.
    """
    current = await get_current_user(credentials, session)
    if current.is_admin:
        return current
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="관리자 권한 필요",
    )
|
|
|
|
|
|
async def require_worker_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Authenticate callers of /internal/worker/* (PR-Worker-Pool-Registry-1B).

    Only the laptop-worker bot account is accepted; the voice-memo bot and
    regular user tokens are rejected with 403.

    Raises:
        HTTPException: 403 when the authenticated username is not the
            configured worker bot, plus whatever ``get_current_user`` raises.
    """
    caller = await get_current_user(credentials, session)
    allowed_username = os.getenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
    if caller.username == allowed_username:
        return caller
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="worker user only",
    )
|