Files
hyungi 74876b674c feat(auth): JWT iat + users.password_changed_at invalidation (PR-Docsrv-JWT-Invalidation-1)
PR-Infra-Sec-1H Phase 0 audit 에서 DS jwt invalidation 정책 부재 확정.
password rotation 으로 구 365d JWT (voice-memo-bot 등) invalidate 안 되는
hard gate STOP 진입 → 선행 PR 분리.

- migration 269: users.password_changed_at timestamptz NULL (legacy 호환)
- create_access_token / create_refresh_token: payload 에 iat (int 초) 추가
- verify_password_changed_at helper: int(password_changed_at.timestamp()) > int(iat) 시 401
- get_current_user + refresh_token route: verify helper 호출
- change_password / setup signup / seed_admin INSERT+UPDATE: password_changed_at 갱신

NULL = 검증 skip (migration 직후 운영 영향 0). 첫 password 변경 후만 iat
검증 활성. Sec-1H 의 G-token-old hard gate 통과 path 확보.
2026-05-17 06:20:46 +00:00

237 lines
6.6 KiB
Python

"""첫 접속 셋업 위자드 API
유저가 0명일 때만 동작. 셋업 완료 후 자동 비활성화.
"""
import time
from pathlib import Path
from typing import Annotated
import pyotp
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import create_access_token, create_refresh_token, hash_password
from core.config import settings
from core.database import get_session
from models.user import User
# Router that collects every setup-wizard endpoint declared in this module.
router = APIRouter()
# Jinja2 environment rooted at the "templates" directory two levels above this file.
templates = Jinja2Templates(directory=Path(__file__).parent.parent / "templates")
# ─── Rate limiting (in-memory, single process) ───
# Maximum number of recorded failures tolerated inside one window.
RATE_LIMIT_MAX = 5
# Length of the sliding window in seconds (5 minutes).
RATE_LIMIT_WINDOW = 5 * 60
# Failure timestamps (time.time() values) keyed by client IP.
_failed_attempts: dict[str, list[float]] = dict()
def _check_rate_limit(client_ip: str):
    """Reject a client that has accumulated too many recent failures.

    Timestamps older than ``RATE_LIMIT_WINDOW`` seconds are discarded before
    counting, so the limit applies to a sliding window.

    Args:
        client_ip: Key identifying the caller in ``_failed_attempts``.

    Raises:
        HTTPException: 429 when at least ``RATE_LIMIT_MAX`` failures remain
            inside the window.
    """
    now = time.time()
    recent = [
        stamp
        for stamp in _failed_attempts.get(client_ip, [])
        if now - stamp < RATE_LIMIT_WINDOW
    ]
    if recent:
        _failed_attempts[client_ip] = recent
    else:
        # Drop the key instead of storing an empty list: otherwise every
        # distinct client that ever calls this leaves a permanent entry and
        # the dict grows without bound. _record_failure() recreates the key
        # on demand via setdefault().
        _failed_attempts.pop(client_ip, None)
    if len(recent) >= RATE_LIMIT_MAX:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"너무 많은 시도입니다. {RATE_LIMIT_WINDOW // 60}분 후 다시 시도하세요.",
        )
def _record_failure(client_ip: str):
    """Append the current time to the failure history of ``client_ip``."""
    history = _failed_attempts.setdefault(client_ip, [])
    history.append(time.time())
# ─── 헬퍼: 셋업 필요 여부 ───
async def _needs_setup(session: AsyncSession) -> bool:
    """Return True when a COUNT over ``User.id`` yields zero rows."""
    count_query = select(func.count(User.id))
    outcome = await session.execute(count_query)
    user_total = outcome.scalar()
    return user_total == 0
async def _require_setup(session: AsyncSession):
    """Guard for endpoints that may only run while setup is still pending.

    Raises:
        HTTPException: 403 when ``_needs_setup`` reports that setup is done.
    """
    if await _needs_setup(session):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="셋업이 이미 완료되었습니다",
    )
# ─── 스키마 ───
# Response body of GET /status.
class SetupStatusResponse(BaseModel):
    # True while no user exists yet (value produced by _needs_setup).
    needs_setup: bool
# Request body of POST /admin.
class CreateAdminRequest(BaseModel):
    # Login name; create_admin rejects values shorter than 2 characters.
    username: str
    # Plain-text password; create_admin rejects values shorter than 8 characters.
    password: str
# Response body of POST /admin.
class CreateAdminResponse(BaseModel):
    # Human-readable confirmation message.
    message: str
    # Value returned by create_access_token for the new user.
    access_token: str
    # Value returned by create_refresh_token for the new user.
    refresh_token: str
# Response body of POST /totp/init.
class TOTPInitResponse(BaseModel):
    # Freshly generated base32 TOTP secret.
    secret: str
    # Provisioning URI built from the secret by pyotp.
    otpauth_uri: str
# Request body of POST /totp/verify.
class TOTPVerifyRequest(BaseModel):
    # TOTP secret supplied by the client (the server does not store it before verification).
    secret: str
    # One-time code to check against the secret.
    code: str
# Request body of POST /verify-nas.
class VerifyNASRequest(BaseModel):
    # Filesystem path to probe.
    path: str
# Response body of POST /verify-nas.
class VerifyNASResponse(BaseModel):
    # Whether the path exists.
    exists: bool
    # Whether the path is a directory that could be listed and had at least one entry.
    readable: bool
    # Whether a scratch file could be written and removed under the path.
    writable: bool
    # The probed path, echoed back as a string.
    path: str
# ─── 엔드포인트 ───
@router.get("/status", response_model=SetupStatusResponse)
async def setup_status(session: Annotated[AsyncSession, Depends(get_session)]):
    """Report whether the first-run setup still has to be completed."""
    pending = await _needs_setup(session)
    return SetupStatusResponse(needs_setup=pending)
@router.post("/admin", response_model=CreateAdminResponse)
async def create_admin(
    body: CreateAdminRequest,
    request: Request,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create the administrator account (only while no user exists).

    The setup guard runs first, then the per-IP rate limit, then input
    validation. Each validation failure is recorded against the client IP.

    Raises:
        HTTPException: 403 when setup is already done, 429 when the client is
            rate limited, 400 when the username or password is too short.
    """
    await _require_setup(session)

    # Fall back to a shared bucket when the client address is not available.
    client_ip = request.client.host if request.client else "unknown"
    _check_rate_limit(client_ip)

    # Validation: the username rule is checked before the password rule.
    violation = None
    if len(body.username) < 2:
        violation = "아이디는 2자 이상이어야 합니다"
    elif len(body.password) < 8:
        violation = "비밀번호는 8자 이상이어야 합니다"
    if violation is not None:
        _record_failure(client_ip)
        raise HTTPException(status_code=400, detail=violation)

    hashed = hash_password(body.password)
    admin = User(
        username=body.username,
        password_hash=hashed,
        is_active=True,
        # Stamp the password change time together with the initial password.
        password_changed_at=datetime.now(timezone.utc),
    )
    session.add(admin)
    await session.commit()

    # Tokens are issued only after the commit succeeded.
    access = create_access_token(body.username)
    refresh = create_refresh_token(body.username)
    return CreateAdminResponse(
        message=f"관리자 '{body.username}' 계정이 생성되었습니다",
        access_token=access,
        refresh_token=refresh,
    )
@router.post("/totp/init", response_model=TOTPInitResponse)
async def totp_init(
    request: Request,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Generate a TOTP secret and its otpauth URI without persisting anything.

    Raises:
        HTTPException: 403 from the setup guard when setup is already done.
    """
    await _require_setup(session)
    new_secret = pyotp.random_base32()
    provisioning_uri = pyotp.TOTP(new_secret).provisioning_uri(
        name="admin",
        issuer_name="hyungi Document Server",
    )
    return TOTPInitResponse(secret=new_secret, otpauth_uri=provisioning_uri)
@router.post("/totp/verify")
async def totp_verify(
    body: TOTPVerifyRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Verify a TOTP code and store the secret on the newest user.

    Raises:
        HTTPException: 403 from the setup guard, 400 when the code does not
            match the supplied secret, 404 when no user row is found.
    """
    await _require_setup(session)

    code_ok = pyotp.TOTP(body.secret).verify(body.code)
    if not code_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="TOTP 코드가 올바르지 않습니다. 다시 시도하세요.",
        )

    # Store the secret on the most recently created user.
    # NOTE(review): the guard above only passes while the users table is
    # empty, so this lookup is expected to find nothing and end in the 404
    # below unless a user is inserted in between. Confirm the intended order
    # of the wizard steps.
    newest_user_query = select(User).order_by(User.id.desc()).limit(1)
    lookup = await session.execute(newest_user_query)
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="유저를 찾을 수 없습니다")

    user.totp_secret = body.secret
    await session.commit()
    return {"message": "TOTP 2FA가 활성화되었습니다"}
@router.post("/verify-nas", response_model=VerifyNASResponse)
async def verify_nas(
    body: VerifyNASRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Probe a NAS mount path for existence, readability and writability.

    ``readable`` is true only for a directory that can be listed and contains
    at least one entry (an empty directory therefore reports False).
    ``writable`` is true when a scratch file can be created and removed
    directly under the path.

    Args:
        body: Carries the filesystem path to probe.
        session: Database session used by the setup guard.

    Returns:
        VerifyNASResponse describing the probe outcome for the path.

    Raises:
        HTTPException: 403 from the setup guard when setup is already done.
    """
    await _require_setup(session)
    path = Path(body.path)

    # NOTE(review): the filesystem calls below are synchronous inside an
    # async handler; a slow or hung mount would stall the event loop —
    # confirm this is acceptable for the setup wizard.
    try:
        exists = path.exists()
    except OSError:
        # stat() can fail (e.g. PermissionError on a parent directory).
        # Report the path as unusable instead of answering with HTTP 500.
        exists = False

    readable = False
    writable = False
    if exists:
        try:
            readable = path.is_dir() and any(True for _ in path.iterdir())
        except OSError:
            # Listing was denied or failed: the directory is not readable.
            readable = False

        test_file = path / ".pkm_write_test"
        try:
            test_file.write_text("test")
            test_file.unlink()
            writable = True
        except OSError:
            # Best effort: any failure simply leaves writable as False.
            pass

    return VerifyNASResponse(
        exists=exists,
        readable=readable,
        writable=writable,
        path=str(path),
    )
@router.get("/", response_class=HTMLResponse)
async def setup_page(
    request: Request,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Serve the setup wizard page, or redirect to /docs once setup is done."""
    if await _needs_setup(session):
        return templates.TemplateResponse(request, "setup.html")

    # Imported lazily, only on the redirect path.
    from fastapi.responses import RedirectResponse

    return RedirectResponse(url="/docs")