🔐 JWT 기반 로그인 시스템: - 로그인 페이지: 아름다운 애니메이션과 보안 정보 표시 - JWT 토큰: 24시간 또는 30일 (Remember Me) 만료 설정 - 비밀번호 암호화: bcrypt 해싱으로 안전한 저장 - 계정 잠금: 5회 실패 시 15분 자동 잠금 👥 사용자 계정 관리: - admin/admin123 (관리자 권한) - hyungi/hyungi123 (시스템 권한) - 역할 기반 접근 제어 (RBAC) 🛡️ 보안 기능: - 토큰 자동 검증 및 만료 처리 - 감사 로그: 로그인/로그아웃/관리 작업 추적 - 안전한 세션 관리 및 토큰 정리 - 클라이언트 사이드 토큰 검증 🎨 UI/UX 개선: - 로그인 페이지: 그라디언트 배경, 플로팅 아이콘 애니메이션 - 사용자 메뉴: 헤더에 사용자명과 로그아웃 버튼 표시 - 보안 표시: SSL, 세션 타임아웃, JWT 인증 정보 - 반응형 디자인 및 다크모드 지원 🔧 기술 구현: - FastAPI HTTPBearer 보안 스키마 - PyJWT 토큰 생성/검증 - bcrypt 비밀번호 해싱 - 클라이언트-서버 토큰 동기화 새 파일: - templates/login.html: 로그인 페이지 HTML - static/login.css: 로그인 페이지 스타일 - static/login.js: 로그인 JavaScript 로직 - server/auth.py: JWT 인증 시스템 (실제 서버용) 수정된 파일: - test_admin.py: 테스트 서버에 JWT 인증 추가 - static/admin.js: JWT 토큰 기반 API 요청으로 변경 - templates/admin.html: 사용자 메뉴 및 로그아웃 버튼 추가 - static/admin.css: 사용자 메뉴 스타일 추가 보안 레벨: Phase 1 (API Key) → Phase 3 (JWT + 감사로그)
842 lines
28 KiB
Python
from __future__ import annotations
|
|
|
|
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import HTMLResponse
|
|
from pydantic import BaseModel
|
|
from typing import List, Dict, Any
|
|
import shutil
|
|
from pathlib import Path
|
|
import os
|
|
from datetime import datetime
|
|
|
|
from .config import settings
|
|
from .ollama_client import OllamaClient
|
|
from .index_store import JsonlIndex
|
|
from .security import require_api_key
|
|
from .paperless_client import PaperlessClient
|
|
from .utils import chunk_text
|
|
from .pipeline import DocumentPipeline
|
|
|
|
|
|
app = FastAPI(title="Local AI Server", version="0.2.1")

# Template and static-file setup.
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")

# Serve generated HTML outputs as static files (only if the directory exists
# at import time; a directory created later requires a restart to be served).
if Path("outputs/html").exists():
    app.mount("/html", StaticFiles(directory="outputs/html"), name="html")

# CORS: comma-separated origin list from CORS_ORIGINS, defaulting to "*".
# (Fix: removed a redundant `import os` here — `os` is already imported at the
# top of the file.)
cors_origins = os.getenv("CORS_ORIGINS", "*")
origins = [o.strip() for o in cors_origins.split(",") if o.strip()] or ["*"]
app.add_middleware(
    CORSMiddleware,
    # NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
    # rejected by browsers under the CORS spec — confirm the intended origins.
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared singletons: Ollama client, vector index, and document pipeline.
ollama = OllamaClient(settings.ollama_host)
index = JsonlIndex(settings.index_path)
pipeline = DocumentPipeline(ollama, settings.embedding_model, settings.boost_model, output_dir=settings.output_dir)
|
|
|
|
|
|
class ChatRequest(BaseModel):
    """Request body for POST /chat."""

    model: str | None = None  # explicit model name; auto-selected when None
    messages: List[Dict[str, str]]  # chat history: {"role": ..., "content": ...}
    use_rag: bool = True  # prepend retrieved index context as a system prompt
    top_k: int = 5  # number of index hits to retrieve for RAG
    force_boost: bool = False  # force the boost model regardless of input size
    options: Dict[str, Any] | None = None  # passed through to Ollama unchanged
|
|
|
|
|
|
class SearchRequest(BaseModel):
    """Request body for POST /search."""

    query: str  # free-text query to embed and search with
    top_k: int = 5  # number of results to return
|
|
|
|
class UpsertRow(BaseModel):
    """A single row to insert into the vector index."""

    id: str  # unique row identifier
    text: str  # raw text content
    source: str | None = None  # provenance label; handler defaults it to "api"
|
|
|
|
class UpsertRequest(BaseModel):
    """Request body for POST /index/upsert."""

    rows: List[UpsertRow]  # rows to append to the index
    embed: bool = True  # compute embeddings server-side per row
    model: str | None = None  # embedding model override
    batch: int = 16  # NOTE(review): declared but never read by the handler
|
|
|
|
|
|
class PipelineIngestRequest(BaseModel):
    """Request body for POST /pipeline/ingest."""

    doc_id: str  # identifier used to key index rows and output files
    text: str  # raw document text to process
    generate_html: bool = True  # render an HTML view of the document
    translate: bool = True  # translate into target_language
    target_language: str = "ko"  # translation target (ISO code)
    summarize: bool = False  # also produce a summary
    summary_sentences: int = 5  # summary length in sentences
    summary_language: str | None = None  # presumably falls back to target_language — confirm in pipeline
    html_basename: str | None = None  # override for the output HTML filename
|
|
|
|
|
|
@app.get("/health")
|
|
def health() -> Dict[str, Any]:
|
|
return {
|
|
"status": "ok",
|
|
"base_model": settings.base_model,
|
|
"boost_model": settings.boost_model,
|
|
"embedding_model": settings.embedding_model,
|
|
"index_loaded": len(index.rows) if index else 0,
|
|
}
|
|
|
|
|
|
@app.post("/search")
|
|
def search(req: SearchRequest) -> Dict[str, Any]:
|
|
if not index.rows:
|
|
return {"results": []}
|
|
qvec = ollama.embeddings(settings.embedding_model, req.query)
|
|
results = index.search(qvec, top_k=req.top_k)
|
|
return {
|
|
"results": [
|
|
{"id": r.id, "score": float(score), "text": r.text[:400], "source": r.source}
|
|
for r, score in results
|
|
]
|
|
}
|
|
|
|
|
|
@app.post("/chat")
|
|
def chat(req: ChatRequest) -> Dict[str, Any]:
|
|
model = req.model
|
|
if not model:
|
|
# 언어 감지(매우 단순): 영문 비율이 높으면 영어 모델, 아니면 기본/부스팅
|
|
user_text = "\n".join(m.get("content", "") for m in req.messages if m.get("role") == "user")
|
|
ascii_letters = sum(ch.isascii() and ch.isalpha() for ch in user_text)
|
|
non_ascii_letters = sum((not ch.isascii()) and ch.isalpha() for ch in user_text)
|
|
english_ratio = ascii_letters / max(ascii_letters + non_ascii_letters, 1)
|
|
total_chars = len(user_text)
|
|
if english_ratio > settings.english_ratio_threshold:
|
|
model = settings.english_model
|
|
else:
|
|
model = settings.boost_model if (req.force_boost or total_chars > 2000) else settings.base_model
|
|
|
|
context_docs: List[str] = []
|
|
if req.use_rag and index.rows:
|
|
q = "\n".join([m.get("content", "") for m in req.messages if m.get("role") == "user"]).strip()
|
|
if q:
|
|
qvec = ollama.embeddings(settings.embedding_model, q)
|
|
hits = index.search(qvec, top_k=req.top_k)
|
|
context_docs = [r.text for r, _ in hits]
|
|
|
|
sys_prompt = ""
|
|
if context_docs:
|
|
sys_prompt = (
|
|
"당신은 문서 기반 비서입니다. 제공된 컨텍스트만 신뢰하고, 모르면 모른다고 답하세요.\n\n"
|
|
+ "\n\n".join(f"[DOC {i+1}]\n{t}" for i, t in enumerate(context_docs))
|
|
)
|
|
|
|
messages: List[Dict[str, str]] = []
|
|
if sys_prompt:
|
|
messages.append({"role": "system", "content": sys_prompt})
|
|
messages.extend(req.messages)
|
|
|
|
try:
|
|
resp = ollama.chat(model, messages, stream=False, options=req.options)
|
|
return {"model": model, "response": resp}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/index/upsert")
|
|
def index_upsert(req: UpsertRequest) -> Dict[str, Any]:
|
|
try:
|
|
if not req.rows:
|
|
return {"added": 0}
|
|
model = req.model or settings.embedding_model
|
|
new_rows = []
|
|
for r in req.rows:
|
|
vec = ollama.embeddings(model, r.text) if req.embed else []
|
|
new_rows.append({
|
|
"id": r.id,
|
|
"text": r.text,
|
|
"vector": vec,
|
|
"source": r.source or "api",
|
|
})
|
|
# convert to IndexRow and append
|
|
from .index_store import IndexRow
|
|
to_append = [IndexRow(**nr) for nr in new_rows]
|
|
added = index.append(to_append)
|
|
return {"added": added}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"index_upsert_error: {e}")
|
|
|
|
|
|
@app.post("/index/reload")
|
|
def index_reload() -> Dict[str, Any]:
|
|
total = index.reload()
|
|
return {"total": total}
|
|
|
|
|
|
@app.post("/pipeline/ingest")
|
|
def pipeline_ingest(req: PipelineIngestRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
|
|
result = pipeline.process(
|
|
doc_id=req.doc_id,
|
|
text=req.text,
|
|
index=index,
|
|
generate_html=req.generate_html,
|
|
translate=req.translate,
|
|
target_language=req.target_language,
|
|
summarize=req.summarize,
|
|
summary_sentences=req.summary_sentences,
|
|
summary_language=req.summary_language,
|
|
html_basename=req.html_basename,
|
|
)
|
|
exported_html: str | None = None
|
|
if result.html_path and settings.export_html_dir:
|
|
Path(settings.export_html_dir).mkdir(parents=True, exist_ok=True)
|
|
dst = str(Path(settings.export_html_dir) / Path(result.html_path).name)
|
|
shutil.copyfile(result.html_path, dst)
|
|
exported_html = dst
|
|
return {"status": "ok", "doc_id": result.doc_id, "added": result.added_chunks, "chunks": result.chunks, "html_path": result.html_path, "exported_html": exported_html}
|
|
|
|
|
|
@app.post("/pipeline/ingest_file")
|
|
async def pipeline_ingest_file(
|
|
_: None = Depends(require_api_key),
|
|
file: UploadFile = File(...),
|
|
doc_id: str = Form(...),
|
|
generate_html: bool = Form(True),
|
|
translate: bool = Form(True),
|
|
target_language: str = Form("ko"),
|
|
) -> Dict[str, Any]:
|
|
content_type = (file.content_type or "").lower()
|
|
raw = await file.read()
|
|
text = ""
|
|
if "text/plain" in content_type or file.filename.endswith(".txt"):
|
|
try:
|
|
text = raw.decode("utf-8")
|
|
except Exception:
|
|
text = raw.decode("latin-1", errors="ignore")
|
|
elif "pdf" in content_type or file.filename.endswith(".pdf"):
|
|
try:
|
|
from pypdf import PdfReader
|
|
from io import BytesIO
|
|
reader = PdfReader(BytesIO(raw))
|
|
parts: List[str] = []
|
|
for p in reader.pages:
|
|
try:
|
|
parts.append(p.extract_text() or "")
|
|
except Exception:
|
|
parts.append("")
|
|
text = "\n\n".join(parts)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=f"pdf_extract_error: {e}")
|
|
else:
|
|
raise HTTPException(status_code=400, detail="unsupported_file_type (only .txt/.pdf)")
|
|
|
|
if not text.strip():
|
|
raise HTTPException(status_code=400, detail="empty_text_after_extraction")
|
|
|
|
result = pipeline.process(
|
|
doc_id=doc_id,
|
|
text=text,
|
|
index=index,
|
|
generate_html=generate_html,
|
|
translate=translate,
|
|
target_language=target_language,
|
|
html_basename=file.filename,
|
|
)
|
|
exported_html: str | None = None
|
|
if result.html_path and settings.export_html_dir:
|
|
Path(settings.export_html_dir).mkdir(parents=True, exist_ok=True)
|
|
dst = str(Path(settings.export_html_dir) / Path(result.html_path).name)
|
|
shutil.copyfile(result.html_path, dst)
|
|
exported_html = dst
|
|
if settings.export_upload_dir:
|
|
Path(settings.export_upload_dir).mkdir(parents=True, exist_ok=True)
|
|
orig_name = f"{doc_id}__{file.filename}"
|
|
with open(str(Path(settings.export_upload_dir) / orig_name), "wb") as f:
|
|
f.write(raw)
|
|
return {"status": "ok", "doc_id": result.doc_id, "added": result.added_chunks, "chunks": result.chunks, "html_path": result.html_path, "exported_html": exported_html}
|
|
|
|
|
|
# Paperless webhook placeholder (to be wired with user-provided details)
class PaperlessHook(BaseModel):
    """Webhook payload sent by Paperless when a document should be indexed."""

    document_id: int  # Paperless document id to fetch and index
    title: str | None = None  # optional; currently unused by the handler
    tags: List[str] | None = None  # optional; currently unused by the handler
|
|
|
|
|
|
@app.post("/paperless/hook")
|
|
def paperless_hook(hook: PaperlessHook, _: None = Depends(require_api_key)) -> Dict[str, Any]:
|
|
# Fetch text from Paperless and upsert into index
|
|
client = PaperlessClient(settings.paperless_base_url, settings.paperless_token)
|
|
text = client.get_document_text(hook.document_id)
|
|
parts = chunk_text(text)
|
|
model = settings.embedding_model
|
|
from .index_store import IndexRow
|
|
to_append = []
|
|
for i, t in enumerate(parts):
|
|
vec = ollama.embeddings(model, t)
|
|
to_append.append(IndexRow(id=f"paperless:{hook.document_id}:{i}", text=t, vector=vec, source="paperless"))
|
|
added = index.append(to_append)
|
|
return {"status": "indexed", "document_id": hook.document_id, "chunks": added}
|
|
|
|
|
|
class PaperlessSyncRequest(BaseModel):
    """Request body for POST /paperless/sync."""

    page_size: int = 50  # Paperless API page size
    ordering: str = "-created"  # Paperless ordering expression (newest first)
    tags: List[int] | None = None  # restrict sync to these tag ids
    query: str | None = None  # optional full-text filter
    limit: int = 200  # stop after roughly this many fetched documents
|
|
|
|
|
|
@app.post("/paperless/sync")
|
|
def paperless_sync(req: PaperlessSyncRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
|
|
client = PaperlessClient(settings.paperless_base_url, settings.paperless_token)
|
|
from .index_store import IndexRow
|
|
added_total = 0
|
|
skipped = 0
|
|
next_url: str | None = None
|
|
fetched = 0
|
|
|
|
while True:
|
|
if next_url:
|
|
import requests as _rq
|
|
resp = _rq.get(next_url, headers=client._headers(), timeout=60)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
else:
|
|
data = client.list_documents(page_size=req.page_size, ordering=req.ordering, tags=req.tags, query=req.query)
|
|
results = data.get("results", [])
|
|
to_append: List[IndexRow] = []
|
|
for doc in results:
|
|
doc_id = doc.get("id")
|
|
if not doc_id:
|
|
continue
|
|
try:
|
|
text = client.get_document_text(int(doc_id))
|
|
if not text:
|
|
skipped += 1
|
|
continue
|
|
parts = chunk_text(text)
|
|
for i, t in enumerate(parts):
|
|
vec = ollama.embeddings(settings.embedding_model, t)
|
|
to_append.append(IndexRow(id=f"paperless:{doc_id}:{i}", text=t, vector=vec, source="paperless"))
|
|
except Exception:
|
|
skipped += 1
|
|
continue
|
|
if to_append:
|
|
added_total += index.append(to_append)
|
|
fetched += len(results)
|
|
if fetched >= req.limit:
|
|
break
|
|
next_url = data.get("next")
|
|
if not next_url:
|
|
break
|
|
|
|
return {"status": "synced", "added": added_total, "skipped": skipped}
|
|
|
|
|
|
# OpenAI-compatible chat completions (minimal)
class ChatCompletionsRequest(BaseModel):
    """Subset of the OpenAI chat-completions request schema."""

    model: str | None = None  # defaults to settings.base_model
    messages: List[Dict[str, str]]  # OpenAI-style message list
    temperature: float | None = None  # forwarded to Ollama options
    max_tokens: int | None = None  # accepted for compatibility; not forwarded
|
|
|
|
|
|
@app.post("/v1/chat/completions")
|
|
def chat_completions(req: ChatCompletionsRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
|
|
chosen = req.model or settings.base_model
|
|
opts: Dict[str, Any] = {}
|
|
if req.temperature is not None:
|
|
opts["temperature"] = req.temperature
|
|
# Note: Ollama ignores max_tokens field; left here for interface similarity
|
|
resp = ollama.chat(chosen, req.messages, stream=False, options=opts)
|
|
# Minimal OpenAI-like response shape
|
|
return {
|
|
"id": "chatcmpl-local",
|
|
"object": "chat.completion",
|
|
"model": chosen,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"message": resp.get("message", {"role": "assistant", "content": resp.get("response", "")}),
|
|
"finish_reason": resp.get("done_reason", "stop"),
|
|
}
|
|
],
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# UI 라우트들
|
|
# =============================================================================
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def dashboard(request: Request):
|
|
"""메인 대시보드 페이지"""
|
|
# 서버 상태 가져오기
|
|
status = {
|
|
"base_model": settings.base_model,
|
|
"boost_model": settings.boost_model,
|
|
"embedding_model": settings.embedding_model,
|
|
"index_loaded": len(index.rows) if index else 0,
|
|
}
|
|
|
|
# 최근 문서 (임시 데이터 - 실제로는 DB나 파일에서 가져올 것)
|
|
recent_documents = []
|
|
|
|
# 통계 (임시 데이터)
|
|
stats = {
|
|
"total_documents": len(index.rows) if index else 0,
|
|
"total_chunks": len(index.rows) if index else 0,
|
|
"today_processed": 0,
|
|
}
|
|
|
|
return templates.TemplateResponse("index.html", {
|
|
"request": request,
|
|
"status": status,
|
|
"recent_documents": recent_documents,
|
|
"stats": stats,
|
|
})
|
|
|
|
|
|
@app.get("/upload", response_class=HTMLResponse)
|
|
async def upload_page(request: Request):
|
|
"""파일 업로드 페이지"""
|
|
return templates.TemplateResponse("upload.html", {
|
|
"request": request,
|
|
"api_key": os.getenv("API_KEY", "")
|
|
})
|
|
|
|
|
|
def format_file_size(bytes_size):
    """Return a human-readable size string for a byte count.

    Fix: the original computed the unit index as int(bytes_size / 1024), which
    is a division, not a logarithm — e.g. 2048 bytes rendered as "0.00 MB" and
    anything >= 4096 clamped to a near-zero "GB" value. Repeated division
    selects the correct unit; values >= 1024 GB still clamp to GB.
    """
    if bytes_size == 0:
        return "0 Bytes"
    units = ["Bytes", "KB", "MB", "GB"]
    size = float(bytes_size)
    unit_index = 0
    while size >= 1024 and unit_index < len(units) - 1:
        size /= 1024
        unit_index += 1
    return f"{size:.2f} {units[unit_index]}"
|
|
|
|
|
|
@app.get("/documents", response_class=HTMLResponse)
|
|
async def documents_page(request: Request):
|
|
"""문서 관리 페이지"""
|
|
# HTML 파일 목록 가져오기
|
|
html_dir = Path("outputs/html")
|
|
html_files = []
|
|
if html_dir.exists():
|
|
for file in html_dir.glob("*.html"):
|
|
stat = file.stat()
|
|
html_files.append({
|
|
"name": file.name,
|
|
"size": stat.st_size,
|
|
"created": datetime.fromtimestamp(stat.st_ctime).strftime("%Y-%m-%d %H:%M"),
|
|
"url": f"/html/{file.name}"
|
|
})
|
|
|
|
# 날짜순 정렬 (최신순)
|
|
html_files.sort(key=lambda x: x["created"], reverse=True)
|
|
|
|
return templates.TemplateResponse("documents.html", {
|
|
"request": request,
|
|
"documents": html_files,
|
|
"formatFileSize": format_file_size,
|
|
})
|
|
|
|
|
|
@app.get("/chat", response_class=HTMLResponse)
|
|
async def chat_page(request: Request):
|
|
"""AI 챗봇 페이지"""
|
|
# 서버 상태 정보
|
|
status = {
|
|
"base_model": settings.base_model,
|
|
"boost_model": settings.boost_model,
|
|
"embedding_model": settings.embedding_model,
|
|
"index_loaded": len(index.rows) if index else 0,
|
|
}
|
|
|
|
return templates.TemplateResponse("chat.html", {
|
|
"request": request,
|
|
"status": status,
|
|
"current_time": datetime.now().strftime("%H:%M"),
|
|
"api_key": os.getenv("API_KEY", "")
|
|
})
|
|
|
|
|
|
# Admin Dashboard Routes
@app.get("/admin", response_class=HTMLResponse)
async def admin_dashboard(request: Request, api_key: str = Depends(require_api_key)):
    """Admin dashboard page (API-key protected).

    NOTE(review): a second `@app.get("/admin")` handler with the same function
    name is registered near the end of this file (JWT-protected). FastAPI keeps
    both routes and matches the first one registered, so this API-key variant
    wins and the JWT variant is dead code — one of the two should be removed.
    """
    return templates.TemplateResponse("admin.html", {
        "request": request,
        "server_port": settings.ai_server_port,
        "ollama_host": settings.ollama_host,
    })
|
|
|
|
|
|
@app.get("/admin/ollama/status")
|
|
async def admin_ollama_status(api_key: str = Depends(require_api_key)):
|
|
"""Ollama 서버 상태 확인"""
|
|
try:
|
|
# Ollama 서버에 ping 요청
|
|
response = await ollama.client.get(f"{settings.ollama_host}/api/tags")
|
|
if response.status_code == 200:
|
|
return {"status": "online", "models_count": len(response.json().get("models", []))}
|
|
else:
|
|
return {"status": "offline", "error": f"HTTP {response.status_code}"}
|
|
except Exception as e:
|
|
return {"status": "offline", "error": str(e)}
|
|
|
|
|
|
@app.get("/admin/models")
|
|
async def admin_get_models(api_key: str = Depends(require_api_key)):
|
|
"""설치된 모델 목록 조회"""
|
|
try:
|
|
models_data = await ollama.list_models()
|
|
models = []
|
|
|
|
for model in models_data.get("models", []):
|
|
models.append({
|
|
"name": model.get("name", "Unknown"),
|
|
"size": model.get("size", 0),
|
|
"status": "ready",
|
|
"is_active": model.get("name") == settings.base_model,
|
|
"last_used": model.get("modified_at"),
|
|
})
|
|
|
|
return {"models": models}
|
|
except Exception as e:
|
|
return {"models": [], "error": str(e)}
|
|
|
|
|
|
@app.get("/admin/models/active")
|
|
async def admin_get_active_model(api_key: str = Depends(require_api_key)):
|
|
"""현재 활성 모델 조회"""
|
|
return {"model": settings.base_model}
|
|
|
|
|
|
@app.post("/admin/models/test")
|
|
async def admin_test_model(request: dict, api_key: str = Depends(require_api_key)):
|
|
"""모델 테스트"""
|
|
model_name = request.get("model")
|
|
if not model_name:
|
|
raise HTTPException(status_code=400, detail="Model name is required")
|
|
|
|
try:
|
|
# 간단한 테스트 메시지 전송
|
|
test_response = await ollama.generate(
|
|
model=model_name,
|
|
prompt="Hello, this is a test. Please respond with 'Test successful'.",
|
|
stream=False
|
|
)
|
|
|
|
return {
|
|
"result": f"Test successful. Model responded: {test_response.get('response', 'No response')[:100]}..."
|
|
}
|
|
except Exception as e:
|
|
return {"result": f"Test failed: {str(e)}"}
|
|
|
|
|
|
# API Key Management (placeholder — a real implementation needs a database)
# key_id -> {"name", "key", "created_at", "usage_count"}; contents are lost
# on every restart.
api_keys_store = {}  # temporary in-memory store
|
|
|
|
|
|
@app.get("/admin/api-keys")
|
|
async def admin_get_api_keys(api_key: str = Depends(require_api_key)):
|
|
"""API 키 목록 조회"""
|
|
keys = []
|
|
for key_id, key_data in api_keys_store.items():
|
|
keys.append({
|
|
"id": key_id,
|
|
"name": key_data.get("name", "Unnamed"),
|
|
"key": key_data.get("key", ""),
|
|
"created_at": key_data.get("created_at", datetime.now().isoformat()),
|
|
"usage_count": key_data.get("usage_count", 0),
|
|
})
|
|
|
|
return {"api_keys": keys}
|
|
|
|
|
|
@app.post("/admin/api-keys")
|
|
async def admin_create_api_key(request: dict, api_key: str = Depends(require_api_key)):
|
|
"""새 API 키 생성"""
|
|
import secrets
|
|
import uuid
|
|
|
|
name = request.get("name", "Unnamed Key")
|
|
new_key = secrets.token_urlsafe(32)
|
|
key_id = str(uuid.uuid4())
|
|
|
|
api_keys_store[key_id] = {
|
|
"name": name,
|
|
"key": new_key,
|
|
"created_at": datetime.now().isoformat(),
|
|
"usage_count": 0,
|
|
}
|
|
|
|
return {"api_key": new_key, "key_id": key_id}
|
|
|
|
|
|
@app.delete("/admin/api-keys/{key_id}")
|
|
async def admin_delete_api_key(key_id: str, api_key: str = Depends(require_api_key)):
|
|
"""API 키 삭제"""
|
|
if key_id in api_keys_store:
|
|
del api_keys_store[key_id]
|
|
return {"message": "API key deleted successfully"}
|
|
else:
|
|
raise HTTPException(status_code=404, detail="API key not found")
|
|
|
|
|
|
# Phase 2: Advanced Model Management
@app.post("/admin/models/download")
async def admin_download_model(request: dict, api_key: str = Depends(require_api_key)):
    """Start pulling a model through Ollama."""
    model_name = request.get("model")
    if not model_name:
        raise HTTPException(status_code=400, detail="Model name is required")

    try:
        details = await ollama.pull_model(model_name)
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to download model: {str(e)}"
        }
    return {
        "success": True,
        "message": f"Model '{model_name}' download started",
        "details": details
    }
|
|
|
|
|
|
@app.delete("/admin/models/{model_name}")
|
|
async def admin_delete_model(model_name: str, api_key: str = Depends(require_api_key)):
|
|
"""모델 삭제"""
|
|
try:
|
|
# Ollama 모델 삭제
|
|
result = await ollama.delete_model(model_name)
|
|
return {
|
|
"success": True,
|
|
"message": f"Model '{model_name}' deleted successfully",
|
|
"details": result
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"error": f"Failed to delete model: {str(e)}"
|
|
}
|
|
|
|
|
|
@app.get("/admin/models/available")
|
|
async def admin_get_available_models(api_key: str = Depends(require_api_key)):
|
|
"""다운로드 가능한 모델 목록"""
|
|
# 인기 있는 모델들 목록 (실제로는 Ollama 레지스트리에서 가져와야 함)
|
|
available_models = [
|
|
{
|
|
"name": "llama3.2:1b",
|
|
"description": "Meta의 Llama 3.2 1B 모델 - 가벼운 작업용",
|
|
"size": "1.3GB",
|
|
"tags": ["chat", "lightweight"]
|
|
},
|
|
{
|
|
"name": "llama3.2:3b",
|
|
"description": "Meta의 Llama 3.2 3B 모델 - 균형잡힌 성능",
|
|
"size": "2.0GB",
|
|
"tags": ["chat", "recommended"]
|
|
},
|
|
{
|
|
"name": "qwen2.5:7b",
|
|
"description": "Alibaba의 Qwen 2.5 7B 모델 - 다국어 지원",
|
|
"size": "4.1GB",
|
|
"tags": ["chat", "multilingual"]
|
|
},
|
|
{
|
|
"name": "gemma2:2b",
|
|
"description": "Google의 Gemma 2 2B 모델 - 효율적인 추론",
|
|
"size": "1.6GB",
|
|
"tags": ["chat", "efficient"]
|
|
},
|
|
{
|
|
"name": "codellama:7b",
|
|
"description": "Meta의 Code Llama 7B - 코드 생성 특화",
|
|
"size": "3.8GB",
|
|
"tags": ["code", "programming"]
|
|
},
|
|
{
|
|
"name": "mistral:7b",
|
|
"description": "Mistral AI의 7B 모델 - 고성능 추론",
|
|
"size": "4.1GB",
|
|
"tags": ["chat", "performance"]
|
|
}
|
|
]
|
|
|
|
return {"available_models": available_models}
|
|
|
|
|
|
# Phase 2: System Monitoring
@app.get("/admin/system/stats")
async def admin_get_system_stats(api_key: str = Depends(require_api_key)):
    """Report CPU, memory, disk, and (when available) GPU utilization.

    Returns an error payload instead of raising so the admin UI can keep
    polling. Requires psutil; GPU stats additionally need GPUtil + NVIDIA.
    """
    import psutil
    import GPUtil

    try:
        # CPU usage (1-second sampling interval).
        cpu_percent = psutil.cpu_percent(interval=1)
        cpu_count = psutil.cpu_count()

        # Memory usage.
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        memory_used = memory.used // (1024**3)  # GB
        memory_total = memory.total // (1024**3)  # GB

        # Disk usage of the root filesystem.
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        disk_used = disk.used // (1024**3)  # GB
        disk_total = disk.total // (1024**3)  # GB

        # GPU usage (NVIDIA only); having no GPUs is not treated as an error.
        gpu_stats = []
        try:
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                gpu_stats.append({
                    "name": gpu.name,
                    "load": gpu.load * 100,
                    "memory_used": gpu.memoryUsed,
                    "memory_total": gpu.memoryTotal,
                    "temperature": gpu.temperature
                })
        except Exception:
            # Fix: was a bare `except:` which also swallowed SystemExit and
            # KeyboardInterrupt; narrowed to Exception.
            gpu_stats = []

        return {
            "cpu": {
                "usage_percent": cpu_percent,
                "core_count": cpu_count
            },
            "memory": {
                "usage_percent": memory_percent,
                "used_gb": memory_used,
                "total_gb": memory_total
            },
            "disk": {
                "usage_percent": disk_percent,
                "used_gb": disk_used,
                "total_gb": disk_total
            },
            "gpu": gpu_stats,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "error": f"Failed to get system stats: {str(e)}",
            "timestamp": datetime.now().isoformat()
        }
|
|
|
|
|
|
# Phase 3: JWT Authentication System
|
|
from .auth import AuthManager, AuditLogger, get_current_user, require_admin_role, require_system_role
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
|
|
async def login_page(request: Request):
|
|
"""로그인 페이지"""
|
|
return templates.TemplateResponse("login.html", {"request": request})
|
|
|
|
@app.post("/admin/login")
|
|
async def admin_login(request: Request):
|
|
"""JWT 기반 로그인"""
|
|
try:
|
|
data = await request.json()
|
|
username = data.get("username", "").strip()
|
|
password = data.get("password", "")
|
|
remember_me = data.get("remember_me", False)
|
|
|
|
if not username or not password:
|
|
return {"success": False, "message": "Username and password are required"}
|
|
|
|
# Get client IP
|
|
client_ip = request.client.host
|
|
user_agent = request.headers.get("user-agent", "")
|
|
|
|
try:
|
|
# Authenticate user
|
|
user = AuthManager.authenticate_user(username, password)
|
|
|
|
if user:
|
|
# Create JWT token
|
|
token = AuthManager.create_jwt_token(user, remember_me)
|
|
|
|
# Record successful login
|
|
AuthManager.record_login_attempt(username, True, client_ip)
|
|
AuditLogger.log_login(username, True, client_ip, user_agent)
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "Login successful",
|
|
"token": token,
|
|
"user": {
|
|
"username": user["username"],
|
|
"role": user["role"]
|
|
}
|
|
}
|
|
else:
|
|
# Record failed login
|
|
AuthManager.record_login_attempt(username, False, client_ip)
|
|
AuditLogger.log_login(username, False, client_ip, user_agent)
|
|
|
|
return {"success": False, "message": "Invalid username or password"}
|
|
|
|
except HTTPException as e:
|
|
# Account locked
|
|
AuditLogger.log_login(username, False, client_ip, user_agent)
|
|
return {"success": False, "message": e.detail}
|
|
|
|
except Exception as e:
|
|
return {"success": False, "message": "Login error occurred"}
|
|
|
|
@app.get("/admin/verify-token")
|
|
async def verify_token(current_user: dict = Depends(get_current_user)):
|
|
"""JWT 토큰 검증"""
|
|
return {
|
|
"valid": True,
|
|
"user": current_user
|
|
}
|
|
|
|
@app.post("/admin/logout")
|
|
async def admin_logout(request: Request, current_user: dict = Depends(get_current_user)):
|
|
"""로그아웃 (클라이언트에서 토큰 삭제)"""
|
|
client_ip = request.client.host
|
|
AuditLogger.log_admin_action(
|
|
current_user["username"],
|
|
"logout",
|
|
"User logged out",
|
|
client_ip
|
|
)
|
|
|
|
return {"success": True, "message": "Logged out successfully"}
|
|
|
|
# Update existing admin routes to use JWT authentication
@app.get("/admin", response_class=HTMLResponse)
async def admin_dashboard(request: Request, current_user: dict = Depends(require_admin_role)):
    """Admin dashboard page (JWT auth required).

    NOTE(review): this redefines `admin_dashboard` and re-registers GET /admin,
    which is already handled earlier in this file with API-key auth. FastAPI
    matches the earlier registration first, so this JWT-protected handler is
    never reached — the earlier route must be removed for this one to apply.
    """
    return templates.TemplateResponse("admin.html", {
        "request": request,
        "server_port": settings.ai_server_port,
        "ollama_host": settings.ollama_host,
        "current_user": current_user
    })
|
|
|