Files
hyungi_document_server/app/core/auth.py
T
Hyungi Ahn 52f86acda7 feat(auth): voice-memo bot 365d access token (PoC v1)
bot 계정(`voice-memo-bot`) 한정 long-expiry access token 발급 경로 추가.
일반 사용자 흐름 영향 0 (env gate default false).

- core/auth.py: create_voice_memo_bot_token() 신규 (env gate + username hard-match)
- api/auth.py: login route 에 bot 분기 (bot 이면 long token 반환, 일반은 기존 흐름)
- docker-compose.yml: 3 env (VOICE_MEMO_BOT_TOKEN_ENABLED/_USERNAME/_EXPIRE_DAYS) default false

OpenClaw `/voice-memo` plugin → DS `/memos/` Bearer proxy 의 auth 기반.
정식 service-account/api_keys 테이블은 Phase 2 (multi-service 인입 추가 시점).

plan: ~/.claude/plans/rosy-launching-otter.md
project: ~/.claude/projects/-Users-hyungiahn/memory/project_voice_memo_pipeline.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 12:24:18 +09:00

114 lines
3.9 KiB
Python

"""JWT + TOTP 2FA 인증"""
import os
from datetime import datetime, timedelta, timezone
from typing import Annotated
import bcrypt
import pyotp
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import get_session
# Bearer-credential extractor; injected via Depends(security) in
# get_current_user and require_admin.
security = HTTPBearer()
# JWT settings
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default access-token lifetime (minutes) when create_access_token gets no override.
ACCESS_TOKEN_EXPIRE_MINUTES = 15
# Refresh-token lifetime (days) used by create_refresh_token.
REFRESH_TOKEN_EXPIRE_DAYS = 7
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Both arguments are encoded to bytes before being handed to
    ``bcrypt.checkpw``; its result is returned unchanged.

    Args:
        plain: Password as typed by the user.
        hashed: Stored bcrypt hash in text form.

    Returns:
        Whatever ``bcrypt.checkpw`` reports for the pair.
    """
    candidate = plain.encode()
    stored_hash = hashed.encode()
    return bcrypt.checkpw(candidate, stored_hash)
def hash_password(password: str) -> str:
    """Hash ``password`` with bcrypt and a freshly generated salt.

    Args:
        password: Plaintext password to hash.

    Returns:
        The bcrypt hash decoded to ``str``.
    """
    raw = password.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw, salt)
    return digest.decode()
def create_access_token(subject: str, expires_minutes: int | None = None) -> str:
    """Issue a signed JWT whose ``type`` claim is ``"access"``.

    Args:
        subject: Value stored in the ``sub`` claim.
        expires_minutes: Lifetime in minutes. ``None`` falls back to
            ``ACCESS_TOKEN_EXPIRE_MINUTES``; an explicit ``0`` is honoured.

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using
        ``ALGORITHM``.
    """
    if expires_minutes is None:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    else:
        lifetime = timedelta(minutes=expires_minutes)

    claims = {
        "sub": subject,
        "exp": datetime.now(timezone.utc) + lifetime,
        "type": "access",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
def create_voice_memo_bot_token(username: str) -> str | None:
# Voice Memo PoC v1 — bot 계정 한정 long-expiry access token (env gate + username hard-match).
# 일반 사용자 호출 시 None 반환. 정식 service-account/api_keys 는 Phase 2.
if os.getenv("VOICE_MEMO_BOT_TOKEN_ENABLED", "false").lower() != "true":
return None
bot_username = os.getenv("VOICE_MEMO_BOT_USERNAME", "voice-memo-bot")
if username != bot_username:
return None
expire_days = int(os.getenv("VOICE_MEMO_BOT_TOKEN_EXPIRE_DAYS", "365"))
return create_access_token(username, expires_minutes=expire_days * 24 * 60)
def create_refresh_token(subject: str) -> str:
    """Issue a signed JWT whose ``type`` claim is ``"refresh"``.

    Args:
        subject: Value stored in the ``sub`` claim.

    Returns:
        The encoded token, expiring ``REFRESH_TOKEN_EXPIRE_DAYS`` days from
        now (UTC) and signed with ``settings.jwt_secret`` using ``ALGORITHM``.
    """
    now = datetime.now(timezone.utc)
    claims = {
        "sub": subject,
        "exp": now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
        "type": "refresh",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
def decode_token(token: str) -> dict | None:
    """Decode a JWT with ``settings.jwt_secret`` and ``ALGORITHM``.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded claims, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
    except JWTError:
        claims = None
    return claims
def verify_totp(code: str, secret: str | None = None) -> bool:
    """Verify a TOTP code against the per-user secret or the global setting.

    Args:
        code: One-time code supplied by the user.
        secret: Per-user TOTP secret; when falsy, ``settings.totp_secret`` is
            used instead.

    Returns:
        The result of ``pyotp.TOTP(...).verify(code)``, or ``True`` when no
        secret is configured at either level (verification is skipped).
    """
    effective_secret = secret if secret else settings.totp_secret
    if effective_secret:
        return pyotp.TOTP(effective_secret).verify(code)
    # TOTP is not configured anywhere: skip the check.
    return True
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Look up the current user from the Bearer token.

    Args:
        credentials: Bearer credentials extracted by ``security``.
        session: Async database session.

    Returns:
        The active ``User`` whose ``username`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or is not of type
            ``"access"``, and 401 when no matching active user exists.
    """
    # Imported at call time rather than module level
    # (presumably to avoid a circular import — confirm).
    from models.user import User

    claims = decode_token(credentials.credentials)
    is_access_token = bool(claims) and claims.get("type") == "access"
    if not is_access_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유효하지 않은 토큰",
        )

    query = select(User).where(
        User.username == claims.get("sub"),
        User.is_active.is_(True),
    )
    user = (await session.execute(query)).scalar_one_or_none()
    if user:
        return user
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="유저를 찾을 수 없음",
    )
async def require_admin(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Require administrator rights (news-source CRUD, collection triggers, digest regeneration, etc.).

    Args:
        credentials: Bearer credentials extracted by ``security``.
        session: Async database session.

    Returns:
        The authenticated user when ``is_admin`` is truthy.

    Raises:
        HTTPException: 403 when the user is not an admin; 401 errors from
            ``get_current_user`` propagate unchanged.
    """
    current = await get_current_user(credentials, session)
    if current.is_admin:
        return current
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="관리자 권한 필요",
    )