"""JWT + TOTP 2FA 인증""" import os from datetime import datetime, timedelta, timezone from typing import Annotated import bcrypt import pyotp from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings from core.database import get_session security = HTTPBearer() # JWT 설정 ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 15 REFRESH_TOKEN_EXPIRE_DAYS = 7 def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def create_access_token(subject: str, expires_minutes: int | None = None) -> str: minutes = expires_minutes if expires_minutes is not None else ACCESS_TOKEN_EXPIRE_MINUTES now = datetime.now(timezone.utc) expire = now + timedelta(minutes=minutes) payload = {"sub": subject, "exp": expire, "iat": int(now.timestamp()), "type": "access"} return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM) def create_voice_memo_bot_token(username: str) -> str | None: # Voice Memo PoC v1 — bot 계정 한정 long-expiry access token (env gate + username hard-match). # 일반 사용자 호출 시 None 반환. 정식 service-account/api_keys 는 Phase 2. if os.getenv("VOICE_MEMO_BOT_TOKEN_ENABLED", "false").lower() != "true": return None bot_username = os.getenv("VOICE_MEMO_BOT_USERNAME", "voice-memo-bot") if username != bot_username: return None expire_days = int(os.getenv("VOICE_MEMO_BOT_TOKEN_EXPIRE_DAYS", "365")) return create_access_token(username, expires_minutes=expire_days * 24 * 60) def create_laptop_worker_bot_token(username: str) -> str | None: # PR-Worker-Pool-Registry-1B — laptop-worker-bot 계정 한정 long-expiry token (voice-memo 동형). if os.getenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "false").lower() != "true": return None bot_username = os.getenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") if username != bot_username: return None expire_days = int(os.getenv("LAPTOP_WORKER_BOT_TOKEN_EXPIRE_DAYS", "365")) return create_access_token(username, expires_minutes=expire_days * 24 * 60) def create_refresh_token(subject: str) -> str: now = datetime.now(timezone.utc) expire = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) payload = {"sub": subject, "exp": expire, "iat": int(now.timestamp()), "type": "refresh"} return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM) def decode_token(token: str) -> dict | None: try: return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM]) except JWTError: return None def verify_password_changed_at(payload: dict, user) -> None: # legacy 호환: password_changed_at NULL 이면 검증 skip (migration 전 발급 token 유지) # password 변경 후 발급 token 만 검증 — iat (int 초) >= int(password_changed_at.timestamp()) if user.password_changed_at is None: return iat = payload.get("iat") pwd_changed_int = int(user.password_changed_at.timestamp()) if iat is None or pwd_changed_int > int(iat): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="비밀번호 변경 후 재로그인 필요", ) def verify_totp(code: str, secret: str | None = None) -> bool: """TOTP 코드 검증 (유저별 secret 또는 글로벌 설정)""" totp_secret = secret or settings.totp_secret if not totp_secret: return True # TOTP 미설정 시 스킵 totp = pyotp.TOTP(totp_secret) return totp.verify(code) async def get_current_user( credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], session: Annotated[AsyncSession, Depends(get_session)], ): """Bearer 토큰에서 현재 유저 조회""" from models.user import User payload = decode_token(credentials.credentials) if not payload or payload.get("type") != "access": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="유효하지 않은 토큰", ) username = payload.get("sub") result = await session.execute( select(User).where(User.username == username, User.is_active.is_(True)) ) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="유저를 찾을 수 없음", ) verify_password_changed_at(payload, user) return user async def require_admin( credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], session: Annotated[AsyncSession, Depends(get_session)], ): """관리자 권한 확인 — 뉴스 소스 CRUD, 수집 트리거, digest 재생성 등""" user = await get_current_user(credentials, session) if not user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="관리자 권한 필요", ) return user async def require_worker_user( credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], session: Annotated[AsyncSession, Depends(get_session)], ): """PR-Worker-Pool-Registry-1B — /internal/worker/* 인증. laptop-worker-bot 만 허용. voice-memo-bot 또는 일반 사용자 토큰 → 403. """ user = await get_current_user(credentials, session) bot_username = os.getenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") if user.username != bot_username: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="worker user only", ) return user