Files
ai-server/server/main.py
Hyungi Ahn b752e56b94 feat: AI 서버 관리 페이지 Phase 2 고급 기능 구현
🤖 모델 관리 고도화:
- 모델 다운로드: 인기 모델들 원클릭 설치 (llama, qwen, gemma, codellama, mistral)
- 모델 삭제: 확인 모달과 함께 안전한 삭제 기능
- 사용 가능한 모델 목록: 태그별 분류 (chat, code, lightweight 등)
- 모델 상세 정보: 설명, 크기, 용도별 태그 표시

📊 실시간 시스템 모니터링:
- CPU/메모리/디스크/GPU 사용률 원형 프로그레스바
- 색상 코딩: 사용률에 따른 시각적 구분 (녹색/주황/빨강)
- 실시간 업데이트: 30초마다 자동 새로고침
- 시스템 리소스 상세 정보 (코어 수, 용량, 온도 등)

🎨 고급 UI/UX:
- 모달 창: 부드러운 애니메이션과 블러 효과
- 원형 프로그레스바: CSS 기반 실시간 업데이트
- 반응형 디자인: 모바일 최적화
- 태그 시스템: 모델 분류 및 시각화

🔧 새 API 엔드포인트:
- POST /admin/models/download - 모델 다운로드
- DELETE /admin/models/{model_name} - 모델 삭제
- GET /admin/models/available - 다운로드 가능한 모델 목록
- GET /admin/system/stats - 시스템 리소스 사용률

수정된 파일:
- server/main.py: Phase 2 API 엔드포인트 추가
- test_admin.py: 테스트 모드 Phase 2 기능 추가
- templates/admin.html: 시스템 모니터링 섹션, 모달 창 추가
- static/admin.css: 모니터링 차트, 모달 스타일 추가
- static/admin.js: Phase 2 기능 JavaScript 구현
2025-08-18 13:45:04 +09:00

749 lines
25 KiB
Python

from __future__ import annotations
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from typing import List, Dict, Any
import shutil
from pathlib import Path
import os
from datetime import datetime
from .config import settings
from .ollama_client import OllamaClient
from .index_store import JsonlIndex
from .security import require_api_key
from .paperless_client import PaperlessClient
from .utils import chunk_text
from .pipeline import DocumentPipeline
# Application object shared by every route registered below.
app = FastAPI(title="Local AI Server", version="0.2.1")
# Template and static-file configuration (paths are relative to the working directory).
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
# Also serve the generated HTML output directory as static files.
# The check runs once at import time, so "/html" is only mounted when the
# directory already exists at startup.
if Path("outputs/html").exists():
    app.mount("/html", StaticFiles(directory="outputs/html"), name="html")
# CORS: CORS_ORIGINS is a comma-separated list; blank entries are dropped and
# an empty result falls back to the wildcard.
import os
cors_origins = os.getenv("CORS_ORIGINS", "*")
origins = [o.strip() for o in cors_origins.split(",") if o.strip()] or ["*"]
# NOTE(review): the default combines a wildcard origin with
# allow_credentials=True — confirm this is intended for deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Module-level singletons used by the route handlers.
ollama = OllamaClient(settings.ollama_host)
index = JsonlIndex(settings.index_path)
pipeline = DocumentPipeline(ollama, settings.embedding_model, settings.boost_model, output_dir=settings.output_dir)
# Request body for POST /chat.
class ChatRequest(BaseModel):
    # Explicit model name; when empty the /chat handler picks one itself.
    model: str | None = None
    # Conversation messages as dicts; the handler reads the "role" and "content" keys.
    messages: List[Dict[str, str]]
    # When true (and the index has rows), retrieved documents are prepended as a system prompt.
    use_rag: bool = True
    # Number of index hits requested for retrieval.
    top_k: int = 5
    # Forces the boost model when the handler falls through to base/boost selection.
    force_boost: bool = False
    # Passed through unchanged as the `options` argument of ollama.chat.
    options: Dict[str, Any] | None = None
# Request body for POST /search.
class SearchRequest(BaseModel):
    # Text that is embedded and matched against the index.
    query: str
    # Number of hits requested from index.search.
    top_k: int = 5
# One row submitted to POST /index/upsert.
class UpsertRow(BaseModel):
    # Row identifier stored with the index entry.
    id: str
    # Text that is stored and (optionally) embedded.
    text: str
    # Origin label; the upsert handler substitutes "api" when this is empty.
    source: str | None = None
# Request body for POST /index/upsert.
class UpsertRequest(BaseModel):
    # Rows to append to the index.
    rows: List[UpsertRow]
    # When false, rows are stored with an empty vector instead of an embedding.
    embed: bool = True
    # Embedding model override; the handler falls back to settings.embedding_model.
    model: str | None = None
    # NOTE(review): not read by the upsert handler in this file — confirm whether it is still needed.
    batch: int = 16
# Request body for POST /pipeline/ingest; every field is forwarded to
# pipeline.process under the same keyword name.
class PipelineIngestRequest(BaseModel):
    # Identifier of the document being ingested.
    doc_id: str
    # Raw document text.
    text: str
    generate_html: bool = True
    translate: bool = True
    target_language: str = "ko"
    summarize: bool = False
    summary_sentences: int = 5
    summary_language: str | None = None
    html_basename: str | None = None
@app.get("/health")
def health() -> Dict[str, Any]:
    """Return a liveness payload with the configured model names and the index row count."""
    return dict(
        status="ok",
        base_model=settings.base_model,
        boost_model=settings.boost_model,
        embedding_model=settings.embedding_model,
        # Zero when the index object is falsy.
        index_loaded=len(index.rows) if index else 0,
    )
@app.post("/search")
def search(req: SearchRequest) -> Dict[str, Any]:
    """Embed the query and return the closest index rows.

    Each result carries the row id, the score as a float, the first 400
    characters of the row text, and the row source. An empty index
    short-circuits to an empty result list without calling the embedder.
    """
    if not index.rows:
        return {"results": []}

    query_vector = ollama.embeddings(settings.embedding_model, req.query)
    found: List[Dict[str, Any]] = []
    for row, score in index.search(query_vector, top_k=req.top_k):
        found.append(
            {"id": row.id, "score": float(score), "text": row.text[:400], "source": row.source}
        )
    return {"results": found}
@app.post("/chat")
def chat(req: ChatRequest) -> Dict[str, Any]:
    """Answer a chat request, optionally grounding it with retrieved documents.

    When no model is given, one is chosen from the user's text: the English
    model if the share of ASCII letters exceeds
    ``settings.english_ratio_threshold``, otherwise the boost model for
    forced or long (> 2000 characters) input, otherwise the base model.

    Raises:
        HTTPException: 500 with the error text when the chat call fails.
    """
    # All user-authored content, joined once; used for both model choice and retrieval.
    user_text = "\n".join(
        msg.get("content", "") for msg in req.messages if msg.get("role") == "user"
    )

    model = req.model
    if not model:
        # Very simple language detection based on the ratio of ASCII letters.
        ascii_count = sum(1 for ch in user_text if ch.isascii() and ch.isalpha())
        other_count = sum(1 for ch in user_text if ch.isalpha() and not ch.isascii())
        english_ratio = ascii_count / max(ascii_count + other_count, 1)
        if english_ratio > settings.english_ratio_threshold:
            model = settings.english_model
        elif req.force_boost or len(user_text) > 2000:
            model = settings.boost_model
        else:
            model = settings.base_model

    context_docs: List[str] = []
    question = user_text.strip()
    if req.use_rag and index.rows and question:
        question_vector = ollama.embeddings(settings.embedding_model, question)
        context_docs = [row.text for row, _score in index.search(question_vector, top_k=req.top_k)]

    messages: List[Dict[str, str]] = list(req.messages)
    if context_docs:
        # The system prompt is only added when retrieval produced documents.
        numbered = "\n\n".join(f"[DOC {i+1}]\n{t}" for i, t in enumerate(context_docs))
        system_prompt = (
            "당신은 문서 기반 비서입니다. 제공된 컨텍스트만 신뢰하고, 모르면 모른다고 답하세요.\n\n"
            + numbered
        )
        messages.insert(0, {"role": "system", "content": system_prompt})

    try:
        resp = ollama.chat(model, messages, stream=False, options=req.options)
        return {"model": model, "response": resp}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/index/upsert")
def index_upsert(req: UpsertRequest) -> Dict[str, Any]:
    """Append the submitted rows to the index, embedding them unless disabled.

    Returns ``{"added": n}`` with the count reported by ``index.append``.

    Raises:
        HTTPException: 500 with an ``index_upsert_error`` detail on any failure.
    """
    try:
        if not req.rows:
            return {"added": 0}

        embed_model = req.model or settings.embedding_model
        # Phase 1: compute every vector (empty list when embedding is disabled).
        vectors = [
            ollama.embeddings(embed_model, row.text) if req.embed else []
            for row in req.rows
        ]

        # Phase 2: convert to IndexRow objects and append.
        from .index_store import IndexRow
        index_rows = [
            IndexRow(id=row.id, text=row.text, vector=vec, source=row.source or "api")
            for row, vec in zip(req.rows, vectors)
        ]
        return {"added": index.append(index_rows)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"index_upsert_error: {e}")
@app.post("/index/reload")
def index_reload() -> Dict[str, Any]:
    """Reload the index and return the value reported by ``index.reload()`` as ``total``."""
    return {"total": index.reload()}
@app.post("/pipeline/ingest")
def pipeline_ingest(req: PipelineIngestRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Run the document pipeline on raw text and optionally export the generated HTML.

    The HTML file is copied into ``settings.export_html_dir`` only when the
    pipeline produced one and that setting is non-empty.
    """
    result = pipeline.process(
        doc_id=req.doc_id,
        text=req.text,
        index=index,
        generate_html=req.generate_html,
        translate=req.translate,
        target_language=req.target_language,
        summarize=req.summarize,
        summary_sentences=req.summary_sentences,
        summary_language=req.summary_language,
        html_basename=req.html_basename,
    )

    exported_html: str | None = None
    if result.html_path and settings.export_html_dir:
        export_dir = Path(settings.export_html_dir)
        export_dir.mkdir(parents=True, exist_ok=True)
        exported_html = str(export_dir / Path(result.html_path).name)
        shutil.copyfile(result.html_path, exported_html)

    return {
        "status": "ok",
        "doc_id": result.doc_id,
        "added": result.added_chunks,
        "chunks": result.chunks,
        "html_path": result.html_path,
        "exported_html": exported_html,
    }
@app.post("/pipeline/ingest_file")
async def pipeline_ingest_file(
    _: None = Depends(require_api_key),
    file: UploadFile = File(...),
    doc_id: str = Form(...),
    generate_html: bool = Form(True),
    translate: bool = Form(True),
    target_language: str = Form("ko"),
) -> Dict[str, Any]:
    """Extract text from an uploaded .txt or .pdf file and run the document pipeline.

    The generated HTML is copied to ``settings.export_html_dir`` and the raw
    upload is stored in ``settings.export_upload_dir`` when those settings
    are non-empty.

    Raises:
        HTTPException: 400 for unsupported file types, PDF extraction
            failures, or when no text could be extracted.
    """
    content_type = (file.content_type or "").lower()
    # The filename is client-controlled and may be missing: drop any directory
    # components before it is used for type detection or as part of a path.
    safe_name = Path(file.filename or "").name
    lowered_name = safe_name.lower()
    raw = await file.read()

    text = ""
    if "text/plain" in content_type or lowered_name.endswith(".txt"):
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            # latin-1 maps every byte, so this fallback cannot fail.
            text = raw.decode("latin-1", errors="ignore")
    elif "pdf" in content_type or lowered_name.endswith(".pdf"):
        try:
            from pypdf import PdfReader
            from io import BytesIO
            reader = PdfReader(BytesIO(raw))
            parts: List[str] = []
            for page in reader.pages:
                # Best effort per page: a page that fails to extract contributes nothing.
                try:
                    parts.append(page.extract_text() or "")
                except Exception:
                    parts.append("")
            text = "\n\n".join(parts)
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"pdf_extract_error: {e}") from e
    else:
        raise HTTPException(status_code=400, detail="unsupported_file_type (only .txt/.pdf)")

    if not text.strip():
        raise HTTPException(status_code=400, detail="empty_text_after_extraction")

    result = pipeline.process(
        doc_id=doc_id,
        text=text,
        index=index,
        generate_html=generate_html,
        translate=translate,
        target_language=target_language,
        html_basename=safe_name or None,
    )

    exported_html: str | None = None
    if result.html_path and settings.export_html_dir:
        export_dir = Path(settings.export_html_dir)
        export_dir.mkdir(parents=True, exist_ok=True)
        exported_html = str(export_dir / Path(result.html_path).name)
        shutil.copyfile(result.html_path, exported_html)

    if settings.export_upload_dir:
        upload_dir = Path(settings.export_upload_dir)
        upload_dir.mkdir(parents=True, exist_ok=True)
        # doc_id is also client-supplied; keep only the final path component so
        # the stored file can never land outside the upload directory.
        stored_name = Path(f"{doc_id}__{safe_name}").name
        (upload_dir / stored_name).write_bytes(raw)

    return {
        "status": "ok",
        "doc_id": result.doc_id,
        "added": result.added_chunks,
        "chunks": result.chunks,
        "html_path": result.html_path,
        "exported_html": exported_html,
    }
# Paperless webhook placeholder (to be wired with user-provided details)
# Request body for POST /paperless/hook.
class PaperlessHook(BaseModel):
    # Paperless document whose text is fetched and indexed.
    document_id: int
    # NOTE(review): title and tags are accepted but not read by the hook handler in this file.
    title: str | None = None
    tags: List[str] | None = None
@app.post("/paperless/hook")
def paperless_hook(hook: PaperlessHook, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Fetch one Paperless document's text, chunk it, embed each chunk, and index it.

    Chunk ids have the form ``paperless:<document_id>:<chunk_index>``.
    """
    client = PaperlessClient(settings.paperless_base_url, settings.paperless_token)
    document_text = client.get_document_text(hook.document_id)
    chunks = chunk_text(document_text)
    embed_model = settings.embedding_model

    from .index_store import IndexRow
    rows = [
        IndexRow(
            id=f"paperless:{hook.document_id}:{position}",
            text=chunk,
            vector=ollama.embeddings(embed_model, chunk),
            source="paperless",
        )
        for position, chunk in enumerate(chunks)
    ]
    appended = index.append(rows)
    return {"status": "indexed", "document_id": hook.document_id, "chunks": appended}
# Request body for POST /paperless/sync.
class PaperlessSyncRequest(BaseModel):
    # page_size, ordering, tags and query are forwarded to client.list_documents
    # for the first page request.
    page_size: int = 50
    ordering: str = "-created"
    tags: List[int] | None = None
    query: str | None = None
    # The sync loop stops once at least this many listed documents have been
    # fetched; the check runs after each page.
    limit: int = 200
@app.post("/paperless/sync")
def paperless_sync(req: PaperlessSyncRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Page through Paperless documents and index the text of each one.

    Returns the total number of rows reported by ``index.append`` as
    ``added`` and the number of documents skipped (empty text or any error
    while fetching/embedding) as ``skipped``.
    """
    client = PaperlessClient(settings.paperless_base_url, settings.paperless_token)
    from .index_store import IndexRow
    added_total = 0
    skipped = 0
    next_url: str | None = None
    fetched = 0
    while True:
        if next_url:
            # Follow-up pages: request the "next" URL from the previous
            # response directly, reusing the client's headers.
            import requests as _rq
            resp = _rq.get(next_url, headers=client._headers(), timeout=60)
            resp.raise_for_status()
            data = resp.json()
        else:
            # First page: let the client build the listing request.
            data = client.list_documents(page_size=req.page_size, ordering=req.ordering, tags=req.tags, query=req.query)
        results = data.get("results", [])
        to_append: List[IndexRow] = []
        for doc in results:
            doc_id = doc.get("id")
            if not doc_id:
                continue
            try:
                text = client.get_document_text(int(doc_id))
                if not text:
                    skipped += 1
                    continue
                parts = chunk_text(text)
                for i, t in enumerate(parts):
                    vec = ollama.embeddings(settings.embedding_model, t)
                    to_append.append(IndexRow(id=f"paperless:{doc_id}:{i}", text=t, vector=vec, source="paperless"))
            except Exception:
                # Best effort: a failing document is counted and the sync continues.
                # Chunks already queued for this document before the failure stay queued.
                skipped += 1
                continue
        if to_append:
            # Rows are appended once per page.
            added_total += index.append(to_append)
        fetched += len(results)
        # The limit is checked after a whole page is processed, so the final
        # page may take the total past req.limit.
        if fetched >= req.limit:
            break
        next_url = data.get("next")
        if not next_url:
            break
    return {"status": "synced", "added": added_total, "skipped": skipped}
# OpenAI-compatible chat completions (minimal)
# Request body for POST /v1/chat/completions.
class ChatCompletionsRequest(BaseModel):
    # Model name; the handler falls back to settings.base_model when empty.
    model: str | None = None
    # Conversation messages, passed to ollama.chat unchanged.
    messages: List[Dict[str, str]]
    # Forwarded as the "temperature" option when provided.
    temperature: float | None = None
    # Accepted for interface similarity only; the handler does not forward it.
    max_tokens: int | None = None
@app.post("/v1/chat/completions")
def chat_completions(req: ChatCompletionsRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Minimal OpenAI-style chat completion backed by ``ollama.chat``.

    Only ``temperature`` is forwarded as an option; ``max_tokens`` is
    accepted for interface similarity but not passed on.
    """
    chosen = req.model or settings.base_model
    opts: Dict[str, Any] = {} if req.temperature is None else {"temperature": req.temperature}

    resp = ollama.chat(chosen, req.messages, stream=False, options=opts)

    # Fall back to a synthesized assistant message when the reply has no "message" key.
    fallback_message = {"role": "assistant", "content": resp.get("response", "")}
    choice = {
        "index": 0,
        "message": resp.get("message", fallback_message),
        "finish_reason": resp.get("done_reason", "stop"),
    }
    # Minimal OpenAI-like response shape
    return {
        "id": "chatcmpl-local",
        "object": "chat.completion",
        "model": chosen,
        "choices": [choice],
    }
# =============================================================================
# UI routes
# =============================================================================
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
    """Render the main dashboard page."""
    row_count = len(index.rows) if index else 0

    # Server status shown on the page.
    status = dict(
        base_model=settings.base_model,
        boost_model=settings.boost_model,
        embedding_model=settings.embedding_model,
        index_loaded=row_count,
    )
    # Placeholder statistics: documents and chunks both report the index row count.
    stats = dict(total_documents=row_count, total_chunks=row_count, today_processed=0)

    context = {
        "request": request,
        "status": status,
        # Placeholder: recent documents are not tracked yet.
        "recent_documents": [],
        "stats": stats,
    }
    return templates.TemplateResponse("index.html", context)
@app.get("/upload", response_class=HTMLResponse)
async def upload_page(request: Request):
    """Render the file upload page."""
    # NOTE(review): this route has no API-key dependency, yet the API_KEY
    # environment value is handed to the template — confirm that exposing it
    # to every visitor is intended.
    context = {"request": request, "api_key": os.getenv("API_KEY", "")}
    return templates.TemplateResponse("upload.html", context)
def format_file_size(bytes_size):
    """Format a byte count as a human-readable string with two decimals.

    Units step by 1024 through Bytes, KB, MB and GB; anything larger stays
    in GB. A size of zero is rendered as ``"0 Bytes"``.

    The unit is found by dividing by 1024 until the value drops below one
    step. The earlier implementation used ``int(bytes_size / 1024)`` as the
    unit index, which jumped straight to MB/GB for small files.
    """
    if bytes_size == 0:
        return "0 Bytes"
    k = 1024
    sizes = ["Bytes", "KB", "MB", "GB"]
    value = float(bytes_size)
    unit = 0
    # Stop at the last unit so very large sizes are still expressed in GB.
    while value >= k and unit < len(sizes) - 1:
        value /= k
        unit += 1
    return f"{value:.2f} {sizes[unit]}"
@app.get("/documents", response_class=HTMLResponse)
async def documents_page(request: Request):
    """Render the document management page listing generated HTML files."""

    def describe(path):
        # One template entry per HTML file under outputs/html.
        info = path.stat()
        return {
            "name": path.name,
            "size": info.st_size,
            "created": datetime.fromtimestamp(info.st_ctime).strftime("%Y-%m-%d %H:%M"),
            "url": f"/html/{path.name}"
        }

    html_dir = Path("outputs/html")
    entries = [describe(p) for p in html_dir.glob("*.html")] if html_dir.exists() else []
    # Newest first, ordered by the formatted timestamp string.
    documents = sorted(entries, key=lambda entry: entry["created"], reverse=True)

    context = {
        "request": request,
        "documents": documents,
        "formatFileSize": format_file_size,
    }
    return templates.TemplateResponse("documents.html", context)
@app.get("/chat", response_class=HTMLResponse)
async def chat_page(request: Request):
    """Render the AI chatbot page."""
    # Server status information shown on the page.
    status = dict(
        base_model=settings.base_model,
        boost_model=settings.boost_model,
        embedding_model=settings.embedding_model,
        index_loaded=len(index.rows) if index else 0,
    )
    # NOTE(review): this route has no API-key dependency, yet the API_KEY
    # environment value is handed to the template — confirm that exposing it
    # to every visitor is intended.
    context = {
        "request": request,
        "status": status,
        "current_time": datetime.now().strftime("%H:%M"),
        "api_key": os.getenv("API_KEY", "")
    }
    return templates.TemplateResponse("chat.html", context)
# Admin Dashboard Routes
@app.get("/admin", response_class=HTMLResponse)
async def admin_dashboard(request: Request, api_key: str = Depends(require_api_key)):
    """Render the admin dashboard page."""
    context = {
        "request": request,
        "server_port": settings.ai_server_port,
        "ollama_host": settings.ollama_host,
    }
    return templates.TemplateResponse("admin.html", context)
@app.get("/admin/ollama/status")
async def admin_ollama_status(api_key: str = Depends(require_api_key)):
    """Check whether the Ollama server is reachable.

    Any failure is reported as an ``offline`` payload instead of an HTTP error.
    """
    # NOTE(review): this awaits ``ollama.client.get`` while other routes call
    # the client synchronously — confirm OllamaClient exposes an async ``client``.
    try:
        # Ping the Ollama server via its tag listing.
        response = await ollama.client.get(f"{settings.ollama_host}/api/tags")
        if response.status_code != 200:
            return {"status": "offline", "error": f"HTTP {response.status_code}"}
        installed = response.json().get("models", [])
        return {"status": "online", "models_count": len(installed)}
    except Exception as e:
        return {"status": "offline", "error": str(e)}
@app.get("/admin/models")
async def admin_get_models(api_key: str = Depends(require_api_key)):
    """List installed models.

    On failure an empty list is returned together with the error text.
    """
    try:
        listing = await ollama.list_models()
        models = [
            {
                "name": entry.get("name", "Unknown"),
                "size": entry.get("size", 0),
                "status": "ready",
                # Active means the name matches the configured base model.
                "is_active": entry.get("name") == settings.base_model,
                "last_used": entry.get("modified_at"),
            }
            for entry in listing.get("models", [])
        ]
        return {"models": models}
    except Exception as e:
        return {"models": [], "error": str(e)}
@app.get("/admin/models/active")
async def admin_get_active_model(api_key: str = Depends(require_api_key)):
    """Return the currently active model, i.e. the configured base model."""
    active_model = settings.base_model
    return {"model": active_model}
@app.post("/admin/models/test")
async def admin_test_model(request: dict, api_key: str = Depends(require_api_key)):
    """Send a short test prompt to a model and report the outcome.

    Raises:
        HTTPException: 400 when the body has no ``model`` entry.
    """
    model_name = request.get("model")
    if not model_name:
        raise HTTPException(status_code=400, detail="Model name is required")

    try:
        # Send a simple test message.
        test_response = await ollama.generate(
            model=model_name,
            prompt="Hello, this is a test. Please respond with 'Test successful'.",
            stream=False
        )
        # Only the first 100 characters of the reply are echoed back.
        snippet = test_response.get('response', 'No response')[:100]
        return {"result": f"Test successful. Model responded: {snippet}..."}
    except Exception as e:
        return {"result": f"Test failed: {str(e)}"}
# API Key Management (placeholder - a real implementation needs a database)
api_keys_store = {}  # temporary in-memory store


@app.get("/admin/api-keys")
async def admin_get_api_keys(api_key: str = Depends(require_api_key)):
    """List the API keys held in the in-memory store."""
    listing = [
        {
            "id": stored_id,
            "name": record.get("name", "Unnamed"),
            "key": record.get("key", ""),
            "created_at": record.get("created_at", datetime.now().isoformat()),
            "usage_count": record.get("usage_count", 0),
        }
        for stored_id, record in api_keys_store.items()
    ]
    return {"api_keys": listing}
@app.post("/admin/api-keys")
async def admin_create_api_key(request: dict, api_key: str = Depends(require_api_key)):
    """Create a new API key in the in-memory store and return it with its id."""
    import secrets
    import uuid

    label = request.get("name", "Unnamed Key")
    new_key = secrets.token_urlsafe(32)
    key_id = str(uuid.uuid4())

    record = {
        "name": label,
        "key": new_key,
        "created_at": datetime.now().isoformat(),
        "usage_count": 0,
    }
    api_keys_store[key_id] = record
    return {"api_key": new_key, "key_id": key_id}
@app.delete("/admin/api-keys/{key_id}")
async def admin_delete_api_key(key_id: str, api_key: str = Depends(require_api_key)):
    """Delete an API key from the in-memory store.

    Raises:
        HTTPException: 404 when the key id is not in the store.
    """
    if key_id not in api_keys_store:
        raise HTTPException(status_code=404, detail="API key not found")
    del api_keys_store[key_id]
    return {"message": "API key deleted successfully"}
# Phase 2: Advanced Model Management
@app.post("/admin/models/download")
async def admin_download_model(request: dict, api_key: str = Depends(require_api_key)):
    """Download (pull) a model through Ollama.

    Failures are reported in the payload with ``success`` set to False.

    Raises:
        HTTPException: 400 when the body has no ``model`` entry.
    """
    model_name = request.get("model")
    if not model_name:
        raise HTTPException(status_code=400, detail="Model name is required")

    try:
        # Run the Ollama pull; the call is awaited before the response is built.
        pull_result = await ollama.pull_model(model_name)
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to download model: {str(e)}"
        }
    return {
        "success": True,
        "message": f"Model '{model_name}' download started",
        "details": pull_result
    }
@app.delete("/admin/models/{model_name}")
async def admin_delete_model(model_name: str, api_key: str = Depends(require_api_key)):
    """Delete a model through Ollama.

    Failures are reported in the payload with ``success`` set to False.
    """
    try:
        # Delete the model in Ollama.
        delete_result = await ollama.delete_model(model_name)
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to delete model: {str(e)}"
        }
    return {
        "success": True,
        "message": f"Model '{model_name}' deleted successfully",
        "details": delete_result
    }
@app.get("/admin/models/available")
async def admin_get_available_models(api_key: str = Depends(require_api_key)):
    """Return the list of models offered for download."""
    # Hard-coded list of popular models (ideally this would come from the
    # Ollama registry). Columns: name, description, size, tags.
    catalog = (
        ("llama3.2:1b", "Meta의 Llama 3.2 1B 모델 - 가벼운 작업용", "1.3GB", ["chat", "lightweight"]),
        ("llama3.2:3b", "Meta의 Llama 3.2 3B 모델 - 균형잡힌 성능", "2.0GB", ["chat", "recommended"]),
        ("qwen2.5:7b", "Alibaba의 Qwen 2.5 7B 모델 - 다국어 지원", "4.1GB", ["chat", "multilingual"]),
        ("gemma2:2b", "Google의 Gemma 2 2B 모델 - 효율적인 추론", "1.6GB", ["chat", "efficient"]),
        ("codellama:7b", "Meta의 Code Llama 7B - 코드 생성 특화", "3.8GB", ["code", "programming"]),
        ("mistral:7b", "Mistral AI의 7B 모델 - 고성능 추론", "4.1GB", ["chat", "performance"]),
    )
    available_models = [
        {"name": name, "description": description, "size": size, "tags": tags}
        for name, description, size, tags in catalog
    ]
    return {"available_models": available_models}
# Phase 2: System Monitoring
@app.get("/admin/system/stats")
async def admin_get_system_stats(api_key: str = Depends(require_api_key)):
    """Report CPU, memory, disk and GPU utilisation.

    Returns a payload with ``cpu``, ``memory``, ``disk``, ``gpu`` and
    ``timestamp`` entries. Any failure while collecting the base statistics
    (including a missing ``psutil``) yields an ``error`` payload instead of
    an HTTP error. GPU statistics are optional: when ``GPUtil`` is missing
    or the query fails, ``gpu`` is an empty list.
    """
    import asyncio

    try:
        # Imported inside the try so a missing package produces the error
        # payload below rather than an unhandled exception.
        import psutil

        # CPU usage: the one-second sampling interval blocks, so run it in a
        # worker thread to keep the event loop responsive.
        cpu_percent = await asyncio.to_thread(psutil.cpu_percent, interval=1)
        cpu_count = psutil.cpu_count()

        # Memory usage
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        memory_used = memory.used // (1024**3)  # GB
        memory_total = memory.total // (1024**3)  # GB

        # Disk usage of the root filesystem
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        disk_used = disk.used // (1024**3)  # GB
        disk_total = disk.total // (1024**3)  # GB

        # GPU usage is best effort and must not fail the whole request.
        gpu_stats = []
        try:
            import GPUtil

            for gpu in GPUtil.getGPUs():
                gpu_stats.append({
                    "name": gpu.name,
                    "load": gpu.load * 100,
                    "memory_used": gpu.memoryUsed,
                    "memory_total": gpu.memoryTotal,
                    "temperature": gpu.temperature
                })
        except Exception:
            # Drop partial results so the payload is all-or-nothing.
            gpu_stats = []

        return {
            "cpu": {
                "usage_percent": cpu_percent,
                "core_count": cpu_count
            },
            "memory": {
                "usage_percent": memory_percent,
                "used_gb": memory_used,
                "total_gb": memory_total
            },
            "disk": {
                "usage_percent": disk_percent,
                "used_gb": disk_used,
                "total_gb": disk_total
            },
            "gpu": gpu_stats,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "error": f"Failed to get system stats: {str(e)}",
            "timestamp": datetime.now().isoformat()
        }