"""첫 접속 셋업 위자드 API 유저가 0명일 때만 동작. 셋업 완료 후 자동 비활성화. """ import time from pathlib import Path from typing import Annotated import pyotp from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from pydantic import BaseModel from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from core.auth import create_access_token, create_refresh_token, hash_password from core.config import settings from core.database import get_session from models.user import User router = APIRouter() templates = Jinja2Templates(directory=Path(__file__).parent.parent / "templates") # ─── Rate Limiting (인메모리, 단일 프로세스) ─── _failed_attempts: dict[str, list[float]] = {} RATE_LIMIT_MAX = 5 RATE_LIMIT_WINDOW = 300 # 5분 def _check_rate_limit(client_ip: str): """5분 내 5회 실패 시 차단""" now = time.time() attempts = _failed_attempts.get(client_ip, []) # 윈도우 밖의 기록 제거 attempts = [t for t in attempts if now - t < RATE_LIMIT_WINDOW] _failed_attempts[client_ip] = attempts if len(attempts) >= RATE_LIMIT_MAX: raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=f"너무 많은 시도입니다. {RATE_LIMIT_WINDOW // 60}분 후 다시 시도하세요.", ) def _record_failure(client_ip: str): _failed_attempts.setdefault(client_ip, []).append(time.time()) # ─── 헬퍼: 셋업 필요 여부 ─── async def _needs_setup(session: AsyncSession) -> bool: result = await session.execute(select(func.count(User.id))) return result.scalar() == 0 async def _require_setup(session: AsyncSession): if not await _needs_setup(session): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="셋업이 이미 완료되었습니다", ) # ─── 스키마 ─── class SetupStatusResponse(BaseModel): needs_setup: bool class CreateAdminRequest(BaseModel): username: str password: str class CreateAdminResponse(BaseModel): message: str access_token: str refresh_token: str class TOTPInitResponse(BaseModel): secret: str otpauth_uri: str class TOTPVerifyRequest(BaseModel): secret: str code: str class VerifyNASRequest(BaseModel): path: str class VerifyNASResponse(BaseModel): exists: bool readable: bool writable: bool path: str # ─── 엔드포인트 ─── @router.get("/status", response_model=SetupStatusResponse) async def setup_status(session: Annotated[AsyncSession, Depends(get_session)]): """셋업 필요 여부 확인""" return SetupStatusResponse(needs_setup=await _needs_setup(session)) @router.post("/admin", response_model=CreateAdminResponse) async def create_admin( body: CreateAdminRequest, request: Request, session: Annotated[AsyncSession, Depends(get_session)], ): """관리자 계정 생성 (유저 0명일 때만)""" await _require_setup(session) client_ip = request.client.host if request.client else "unknown" _check_rate_limit(client_ip) # 유효성 검사 if len(body.username) < 2: _record_failure(client_ip) raise HTTPException(status_code=400, detail="아이디는 2자 이상이어야 합니다") if len(body.password) < 8: _record_failure(client_ip) raise HTTPException(status_code=400, detail="비밀번호는 8자 이상이어야 합니다") user = User( username=body.username, password_hash=hash_password(body.password), is_active=True, ) session.add(user) await session.commit() return CreateAdminResponse( message=f"관리자 '{body.username}' 계정이 생성되었습니다", access_token=create_access_token(body.username), refresh_token=create_refresh_token(body.username), ) @router.post("/totp/init", response_model=TOTPInitResponse) async def totp_init( request: Request, session: Annotated[AsyncSession, Depends(get_session)], ): """TOTP 시크릿 생성 + otpauth URI 반환 (DB에 저장하지 않음)""" await _require_setup(session) secret = pyotp.random_base32() totp = pyotp.TOTP(secret) uri = totp.provisioning_uri( name="admin", issuer_name="hyungi Document Server", ) return TOTPInitResponse(secret=secret, otpauth_uri=uri) @router.post("/totp/verify") async def totp_verify( body: TOTPVerifyRequest, session: Annotated[AsyncSession, Depends(get_session)], ): """TOTP 코드 검증 후 DB에 시크릿 저장""" await _require_setup(session) totp = pyotp.TOTP(body.secret) if not totp.verify(body.code): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="TOTP 코드가 올바르지 않습니다. 다시 시도하세요.", ) # 가장 최근 생성된 유저에 저장 (셋업 직후이므로 유저 1명) result = await session.execute( select(User).order_by(User.id.desc()).limit(1) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="유저를 찾을 수 없습니다") user.totp_secret = body.secret await session.commit() return {"message": "TOTP 2FA가 활성화되었습니다"} @router.post("/verify-nas", response_model=VerifyNASResponse) async def verify_nas( body: VerifyNASRequest, session: Annotated[AsyncSession, Depends(get_session)], ): """NAS 마운트 경로 읽기/쓰기 테스트""" await _require_setup(session) path = Path(body.path) exists = path.exists() readable = path.is_dir() and any(True for _ in path.iterdir()) if exists else False writable = False if exists: test_file = path / ".pkm_write_test" try: test_file.write_text("test") test_file.unlink() writable = True except OSError: pass return VerifyNASResponse( exists=exists, readable=readable, writable=writable, path=str(path), ) @router.get("/", response_class=HTMLResponse) async def setup_page( request: Request, session: Annotated[AsyncSession, Depends(get_session)], ): """셋업 위자드 HTML 페이지""" if not await _needs_setup(session): from fastapi.responses import RedirectResponse return RedirectResponse(url="/docs") return templates.TemplateResponse(request, "setup.html")