Files
hyungi_document_server/app/core/auth.py
Hyungi Ahn a601991f48 feat: implement Phase 0 auth system, setup wizard, and Docker config
- Add users table to migration, User ORM model
- Implement JWT+TOTP auth API (login, refresh, me, change-password)
- Add first-run setup wizard with rate-limited admin creation,
  TOTP QR enrollment (secret saved only after verification), and
  NAS path verification — served as Jinja2 single-page HTML
- Add setup redirect middleware (bypasses /health, /docs, /openapi.json)
- Mount config.yaml, scripts, logs volumes in docker-compose
- Route API vs frontend traffic in Caddyfile
- Include admin seed script as CLI fallback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 13:21:45 +09:00

87 lines
2.7 KiB
Python

"""JWT + TOTP 2FA 인증"""
from datetime import datetime, timedelta, timezone
from typing import Annotated
import pyotp
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import get_session
# Shared passlib context used by verify_password() and hash_password();
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# HTTP Bearer scheme; injected as a FastAPI dependency in get_current_user().
security = HTTPBearer()
# JWT settings
ALGORITHM = "HS256"  # algorithm used for both encoding and decoding tokens
ACCESS_TOKEN_EXPIRE_MINUTES = 15  # access-token lifetime, in minutes
REFRESH_TOKEN_EXPIRE_DAYS = 7  # refresh-token lifetime, in days
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level passlib ``pwd_context``.

    Args:
        plain: Password as entered by the user.
        hashed: Previously stored password hash.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def hash_password(password: str) -> str:
    """Hash a plaintext password with the module-level passlib ``pwd_context``.

    Args:
        password: Plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    digest = pwd_context.hash(password)
    return digest
def create_access_token(subject: str) -> str:
    """Issue a signed JWT access token for ``subject``.

    The token carries ``sub``, ``exp`` and ``type="access"`` claims and expires
    ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now (UTC).

    Args:
        subject: Value stored in the ``sub`` claim.

    Returns:
        The encoded token, signed with ``settings.jwt_secret``.
    """
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {
        "sub": subject,
        "exp": datetime.now(timezone.utc) + lifetime,
        "type": "access",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
def create_refresh_token(subject: str) -> str:
    """Issue a signed JWT refresh token for ``subject``.

    The token carries ``sub``, ``exp`` and ``type="refresh"`` claims and expires
    ``REFRESH_TOKEN_EXPIRE_DAYS`` days from now (UTC).

    Args:
        subject: Value stored in the ``sub`` claim.

    Returns:
        The encoded token, signed with ``settings.jwt_secret``.
    """
    lifetime = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    claims = {
        "sub": subject,
        "exp": datetime.now(timezone.utc) + lifetime,
        "type": "refresh",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
def decode_token(token: str) -> dict | None:
    """Decode and verify a JWT issued by this module.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded claims, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
    except JWTError:
        # Callers treat None as "token not usable".
        return None
    return claims
def verify_totp(code: str, secret: str | None = None) -> bool:
    """Verify a TOTP code against a per-user secret or the global setting.

    Args:
        code: One-time code supplied by the user.
        secret: Per-user TOTP secret; when falsy, ``settings.totp_secret`` is
            used instead.

    Returns:
        The result of ``pyotp.TOTP(...).verify(code)``, or ``True`` when no
        secret is available at all.
    """
    effective_secret = secret if secret else settings.totp_secret
    if effective_secret:
        return pyotp.TOTP(effective_secret).verify(code)
    # Skip the check when TOTP is not configured.
    # NOTE(review): this accepts any code while no secret exists — confirm
    # that this is the intended behavior for all callers.
    return True
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Look up the current user from a Bearer access token.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        session: Async database session provided by ``get_session``.

    Returns:
        The ``User`` row whose ``username`` equals the token's ``sub`` claim
        and whose ``is_active`` flag is true.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not of type
            ``"access"``, has no ``sub`` claim, or no matching active user
            exists. Every 401 carries a ``WWW-Authenticate: Bearer`` header.
    """
    # Imported at call time rather than at module top — presumably to avoid a
    # circular import between core.auth and models.user; confirm.
    from models.user import User

    # RFC 7235 requires a WWW-Authenticate challenge on every 401 response.
    challenge = {"WWW-Authenticate": "Bearer"}
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="유효하지 않은 토큰",
        headers=challenge,
    )

    payload = decode_token(credentials.credentials)
    if not payload or payload.get("type") != "access":
        raise invalid_token

    username = payload.get("sub")
    if not username:
        # A token without a subject cannot identify anyone; reject it before
        # issuing a query that would compare the username against NULL.
        raise invalid_token

    result = await session.execute(
        select(User).where(User.username == username, User.is_active.is_(True))
    )
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="유저를 찾을 수 없음",
            headers=challenge,
        )
    return user