Files
ai-server/server/main.py
Hyungi Ahn e102ce6db9 feat: AI 서버 관리 페이지 Phase 1 구현
- 웹 기반 관리 대시보드 추가 (/admin)
- 시스템 상태 모니터링 (AI 서버, Ollama, 활성 모델, API 호출)
- 모델 관리 기능 (목록 조회, 테스트, 새로고침)
- API 키 관리 시스템 (생성, 조회, 삭제)
- 반응형 UI/UX 디자인 (모바일 지원)
- 테스트 모드 서버 (test_admin.py) 추가
- 보안: API 키 기반 인증, 키 마스킹
- 실시간 업데이트 (30초 자동 새로고침)

구현 파일:
- templates/admin.html: 관리 페이지 HTML
- static/admin.css: 관리 페이지 스타일
- static/admin.js: 관리 페이지 JavaScript
- server/main.py: 관리 API 엔드포인트 추가
- test_admin.py: 맥북프로 테스트용 서버
- README.md: 관리 페이지 문서 업데이트
2025-08-18 13:33:39 +09:00

598 lines
20 KiB
Python

from __future__ import annotations
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from typing import List, Dict, Any
import shutil
from pathlib import Path
import os
from datetime import datetime
from .config import settings
from .ollama_client import OllamaClient
from .index_store import JsonlIndex
from .security import require_api_key
from .paperless_client import PaperlessClient
from .utils import chunk_text
from .pipeline import DocumentPipeline
app = FastAPI(title="Local AI Server", version="0.2.1")
# Template and static-file configuration
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
# Also serve the HTML output directory as static files.
# The mount is only registered when the directory already exists at import time.
if Path("outputs/html").exists():
    app.mount("/html", StaticFiles(directory="outputs/html"), name="html")
# CORS: allowed origins come from the comma-separated CORS_ORIGINS env var (default "*")
import os
cors_origins = os.getenv("CORS_ORIGINS", "*")
# Drop blank entries; fall back to the wildcard when nothing usable remains
origins = [o.strip() for o in cors_origins.split(",") if o.strip()] or ["*"]
# NOTE(review): the default pairs wildcard origins with allow_credentials=True —
# confirm this is intended before exposing the server beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Module-level service objects shared by the route handlers in this file
ollama = OllamaClient(settings.ollama_host)
index = JsonlIndex(settings.index_path)
pipeline = DocumentPipeline(ollama, settings.embedding_model, settings.boost_model, output_dir=settings.output_dir)
class ChatRequest(BaseModel):
    """Request body for POST /chat."""

    # Model name; when omitted the handler picks one from the user text
    model: str | None = None
    # Conversation messages; the handler reads each message's "role" and "content" keys
    messages: List[Dict[str, str]]
    # When true (and the index has rows) retrieved documents are prepended as a system prompt
    use_rag: bool = True
    # Number of index hits requested for retrieval
    top_k: int = 5
    # Prefer the boost model over the base model when the handler auto-selects
    force_boost: bool = False
    # Passed to ollama.chat as its options argument
    options: Dict[str, Any] | None = None
class SearchRequest(BaseModel):
    """Request body for POST /search."""

    # Text that is embedded and matched against the index
    query: str
    # Number of hits requested from the index
    top_k: int = 5
class UpsertRow(BaseModel):
    """One row submitted to POST /index/upsert."""

    # Identifier stored with the row
    id: str
    # Text that is stored and (optionally) embedded
    text: str
    # Origin label; the upsert handler substitutes "api" when this is empty
    source: str | None = None
class UpsertRequest(BaseModel):
    """Request body for POST /index/upsert."""

    # Rows to append to the index
    rows: List[UpsertRow]
    # When false, rows are stored with an empty vector instead of an embedding
    embed: bool = True
    # Embedding model override; the handler falls back to settings.embedding_model
    model: str | None = None
    # NOTE(review): not read by the upsert handler visible in this file — confirm intended use
    batch: int = 16
class PipelineIngestRequest(BaseModel):
    """Request body for POST /pipeline/ingest.

    Every field is forwarded as the same-named keyword argument to
    ``pipeline.process``.
    """

    doc_id: str
    text: str
    generate_html: bool = True
    translate: bool = True
    target_language: str = "ko"
    summarize: bool = False
    summary_sentences: int = 5
    summary_language: str | None = None
    html_basename: str | None = None
@app.get("/health")
def health() -> Dict[str, Any]:
    """Return a static "ok" status with the configured model names and index row count."""
    payload: Dict[str, Any] = dict(
        status="ok",
        base_model=settings.base_model,
        boost_model=settings.boost_model,
        embedding_model=settings.embedding_model,
    )
    # A falsy index object is reported as zero loaded rows
    payload["index_loaded"] = len(index.rows) if index else 0
    return payload
@app.post("/search")
def search(req: SearchRequest) -> Dict[str, Any]:
    """Embed the query and return the best-matching index rows.

    Returns ``{"results": [...]}`` where each entry carries the row id, the
    score as a float, the first 400 characters of the row text, and its
    source. An empty index short-circuits to an empty result list without
    calling the embedding model.
    """
    if not index.rows:
        return {"results": []}

    query_vector = ollama.embeddings(settings.embedding_model, req.query)
    matches: List[Dict[str, Any]] = []
    for row, similarity in index.search(query_vector, top_k=req.top_k):
        matches.append(
            {
                "id": row.id,
                "score": float(similarity),
                "text": row.text[:400],  # preview only, not the full chunk
                "source": row.source,
            }
        )
    return {"results": matches}
@app.post("/chat")
def chat(req: ChatRequest) -> Dict[str, Any]:
    """Answer a chat request, optionally grounding it in indexed documents.

    When no model is named, one is chosen from the user text: the English
    model if the share of ASCII letters exceeds
    ``settings.english_ratio_threshold``; otherwise the boost model when
    ``force_boost`` is set or the text is longer than 2000 characters, and
    the base model in every other case. With ``use_rag`` enabled and a
    non-empty index, the top hits are injected as a system message.

    Raises:
        HTTPException: 500 with the error text when the Ollama chat call fails.
    """
    # All user-authored content, joined once and reused for both steps below
    user_text = "\n".join(
        msg.get("content", "") for msg in req.messages if msg.get("role") == "user"
    )

    model = req.model
    if not model:
        # Very simple language detection based on the ratio of ASCII letters
        ascii_count = 0
        other_count = 0
        for ch in user_text:
            if not ch.isalpha():
                continue
            if ch.isascii():
                ascii_count += 1
            else:
                other_count += 1
        english_ratio = ascii_count / max(ascii_count + other_count, 1)

        if english_ratio > settings.english_ratio_threshold:
            model = settings.english_model
        elif req.force_boost or len(user_text) > 2000:
            model = settings.boost_model
        else:
            model = settings.base_model

    context_docs: List[str] = []
    if req.use_rag and index.rows:
        query = user_text.strip()
        if query:
            query_vector = ollama.embeddings(settings.embedding_model, query)
            context_docs = [
                row.text for row, _ in index.search(query_vector, top_k=req.top_k)
            ]

    messages: List[Dict[str, str]] = []
    if context_docs:
        doc_sections = "\n\n".join(
            f"[DOC {n}]\n{doc}" for n, doc in enumerate(context_docs, start=1)
        )
        messages.append(
            {
                "role": "system",
                "content": (
                    "당신은 문서 기반 비서입니다. 제공된 컨텍스트만 신뢰하고, 모르면 모른다고 답하세요.\n\n"
                    + doc_sections
                ),
            }
        )
    messages.extend(req.messages)

    try:
        resp = ollama.chat(model, messages, stream=False, options=req.options)
        return {"model": model, "response": resp}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/index/upsert")
def index_upsert(req: UpsertRequest) -> Dict[str, Any]:
    """Append the submitted rows to the index, embedding them unless ``embed`` is false.

    Returns ``{"added": n}`` with the count reported by ``index.append``.

    Raises:
        HTTPException: 500 prefixed with ``index_upsert_error`` on any failure.
    """
    try:
        if not req.rows:
            return {"added": 0}

        embed_model = req.model or settings.embedding_model
        # Rows are stored with an empty vector when embedding is disabled
        vectors = [
            ollama.embeddings(embed_model, row.text) if req.embed else []
            for row in req.rows
        ]

        from .index_store import IndexRow

        index_rows = [
            IndexRow(id=row.id, text=row.text, vector=vec, source=row.source or "api")
            for row, vec in zip(req.rows, vectors)
        ]
        return {"added": index.append(index_rows)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"index_upsert_error: {e}")
@app.post("/index/reload")
def index_reload() -> Dict[str, Any]:
    """Call ``index.reload()`` and return its result under the ``total`` key."""
    return {"total": index.reload()}
@app.post("/pipeline/ingest")
def pipeline_ingest(req: PipelineIngestRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Run the document pipeline on raw text supplied in the request body.

    When the pipeline produced an HTML file and ``settings.export_html_dir``
    is set, the file is also copied there and the copy's path is returned as
    ``exported_html`` (otherwise ``None``).
    """
    process_kwargs: Dict[str, Any] = {
        "doc_id": req.doc_id,
        "text": req.text,
        "index": index,
        "generate_html": req.generate_html,
        "translate": req.translate,
        "target_language": req.target_language,
        "summarize": req.summarize,
        "summary_sentences": req.summary_sentences,
        "summary_language": req.summary_language,
        "html_basename": req.html_basename,
    }
    result = pipeline.process(**process_kwargs)

    exported_html: str | None = None
    if result.html_path and settings.export_html_dir:
        export_dir = Path(settings.export_html_dir)
        export_dir.mkdir(parents=True, exist_ok=True)
        target = str(export_dir / Path(result.html_path).name)
        shutil.copyfile(result.html_path, target)
        exported_html = target

    return {
        "status": "ok",
        "doc_id": result.doc_id,
        "added": result.added_chunks,
        "chunks": result.chunks,
        "html_path": result.html_path,
        "exported_html": exported_html,
    }
@app.post("/pipeline/ingest_file")
async def pipeline_ingest_file(
    _: None = Depends(require_api_key),
    file: UploadFile = File(...),
    doc_id: str = Form(...),
    generate_html: bool = Form(True),
    translate: bool = Form(True),
    target_language: str = Form("ko"),
) -> Dict[str, Any]:
    """Extract text from an uploaded .txt or .pdf file and run the document pipeline.

    The file type is decided from the content type or the filename extension
    (compared case-insensitively). The generated HTML is copied to
    ``settings.export_html_dir`` and the raw upload is saved to
    ``settings.export_upload_dir`` when those settings are configured.

    Raises:
        HTTPException: 400 when the type is unsupported, PDF extraction
            fails, or no text remains after extraction.
    """
    content_type = (file.content_type or "").lower()
    # The client-supplied filename is untrusted and may be missing: keep only
    # its final path component so it can never escape a target directory.
    safe_name = Path((file.filename or "").replace("\\", "/")).name
    lowered_name = safe_name.lower()
    raw = await file.read()

    text = ""
    if "text/plain" in content_type or lowered_name.endswith(".txt"):
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            # latin-1 maps every byte, so this fallback cannot fail
            text = raw.decode("latin-1", errors="ignore")
    elif "pdf" in content_type or lowered_name.endswith(".pdf"):
        try:
            from pypdf import PdfReader
            from io import BytesIO

            reader = PdfReader(BytesIO(raw))
            parts: List[str] = []
            for page in reader.pages:
                try:
                    parts.append(page.extract_text() or "")
                except Exception:
                    # Best effort: one unreadable page must not abort the whole document
                    parts.append("")
            text = "\n\n".join(parts)
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"pdf_extract_error: {e}") from e
    else:
        raise HTTPException(status_code=400, detail="unsupported_file_type (only .txt/.pdf)")

    if not text.strip():
        raise HTTPException(status_code=400, detail="empty_text_after_extraction")

    result = pipeline.process(
        doc_id=doc_id,
        text=text,
        index=index,
        generate_html=generate_html,
        translate=translate,
        target_language=target_language,
        html_basename=safe_name or None,
    )

    exported_html: str | None = None
    if result.html_path and settings.export_html_dir:
        export_dir = Path(settings.export_html_dir)
        export_dir.mkdir(parents=True, exist_ok=True)
        dst = str(export_dir / Path(result.html_path).name)
        shutil.copyfile(result.html_path, dst)
        exported_html = dst

    if settings.export_upload_dir:
        upload_dir = Path(settings.export_upload_dir)
        upload_dir.mkdir(parents=True, exist_ok=True)
        # doc_id is form input as well: neutralise path separators before using it in a filename
        safe_doc_id = doc_id.replace("/", "_").replace("\\", "_")
        orig_name = f"{safe_doc_id}__{safe_name}"
        with open(upload_dir / orig_name, "wb") as f:
            f.write(raw)

    return {
        "status": "ok",
        "doc_id": result.doc_id,
        "added": result.added_chunks,
        "chunks": result.chunks,
        "html_path": result.html_path,
        "exported_html": exported_html,
    }
# Paperless webhook placeholder (to be wired with user-provided details)
class PaperlessHook(BaseModel):
    """Request body for POST /paperless/hook."""

    # Paperless document whose text is fetched and indexed
    document_id: int
    # NOTE(review): title and tags are not read by the hook handler visible in this file
    title: str | None = None
    tags: List[str] | None = None
@app.post("/paperless/hook")
def paperless_hook(hook: PaperlessHook, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Fetch one Paperless document's text, chunk and embed it, and append it to the index.

    Row ids follow the pattern ``paperless:<document_id>:<chunk index>``.
    """
    paperless = PaperlessClient(settings.paperless_base_url, settings.paperless_token)
    document_text = paperless.get_document_text(hook.document_id)
    chunks = chunk_text(document_text)
    embed_model = settings.embedding_model

    from .index_store import IndexRow

    rows = [
        IndexRow(
            id=f"paperless:{hook.document_id}:{position}",
            text=chunk,
            vector=ollama.embeddings(embed_model, chunk),
            source="paperless",
        )
        for position, chunk in enumerate(chunks)
    ]
    return {
        "status": "indexed",
        "document_id": hook.document_id,
        "chunks": index.append(rows),
    }
class PaperlessSyncRequest(BaseModel):
    """Request body for POST /paperless/sync."""

    # page_size, ordering, tags and query are forwarded to PaperlessClient.list_documents
    page_size: int = 50
    ordering: str = "-created"
    tags: List[int] | None = None
    query: str | None = None
    # Pagination stops once at least this many listed documents have been fetched
    limit: int = 200
@app.post("/paperless/sync")
def paperless_sync(req: PaperlessSyncRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Page through Paperless documents and index their text.

    At most ``req.limit`` listed documents are processed: the final page is
    truncated so the limit is honoured exactly, and a non-positive limit
    processes nothing. A document with no text, or whose fetch/embedding
    fails, is counted in ``skipped`` and contributes no rows at all.

    Returns:
        ``{"status": "synced", "added": <rows appended>, "skipped": <documents skipped>}``
    """
    client = PaperlessClient(settings.paperless_base_url, settings.paperless_token)
    from .index_store import IndexRow

    added_total = 0
    skipped = 0
    fetched = 0
    next_url: str | None = None

    while fetched < req.limit:
        if next_url:
            # Follow the pagination link from the previous response directly
            import requests as _rq

            resp = _rq.get(next_url, headers=client._headers(), timeout=60)
            resp.raise_for_status()
            data = resp.json()
        else:
            data = client.list_documents(
                page_size=req.page_size,
                ordering=req.ordering,
                tags=req.tags,
                query=req.query,
            )

        # Truncate the page so no more than `limit` documents are ever processed
        results = data.get("results", [])[: req.limit - fetched]

        to_append: List[IndexRow] = []
        for doc in results:
            doc_id = doc.get("id")
            if not doc_id:
                continue
            try:
                text = client.get_document_text(int(doc_id))
                if not text:
                    skipped += 1
                    continue
                # Build this document's rows separately so a failure midway
                # does not leave a partially indexed document behind.
                doc_rows = [
                    IndexRow(
                        id=f"paperless:{doc_id}:{i}",
                        text=chunk,
                        vector=ollama.embeddings(settings.embedding_model, chunk),
                        source="paperless",
                    )
                    for i, chunk in enumerate(chunk_text(text))
                ]
            except Exception:
                # Best effort: one bad document must not abort the whole sync
                skipped += 1
                continue
            to_append.extend(doc_rows)

        if to_append:
            added_total += index.append(to_append)

        fetched += len(results)
        next_url = data.get("next")
        if not next_url:
            break

    return {"status": "synced", "added": added_total, "skipped": skipped}
# OpenAI-compatible chat completions (minimal)
class ChatCompletionsRequest(BaseModel):
    """Request body for POST /v1/chat/completions."""

    # Model name; the handler falls back to settings.base_model
    model: str | None = None
    # Conversation messages, passed to ollama.chat unchanged
    messages: List[Dict[str, str]]
    # Forwarded as the "temperature" option when provided
    temperature: float | None = None
    # Accepted for interface similarity with the OpenAI request shape
    max_tokens: int | None = None
@app.post("/v1/chat/completions")
def chat_completions(req: ChatCompletionsRequest, _: None = Depends(require_api_key)) -> Dict[str, Any]:
    """Minimal OpenAI-style chat completion backed by the Ollama client.

    ``temperature`` and ``max_tokens`` are translated into Ollama generation
    options; fields left unset are not sent. The reply is wrapped in a
    single-choice ``chat.completion`` object.
    """
    chosen = req.model or settings.base_model

    opts: Dict[str, Any] = {}
    if req.temperature is not None:
        opts["temperature"] = req.temperature
    if req.max_tokens is not None:
        # Ollama has no max_tokens field; its equivalent generation option is
        # num_predict. Assumes OllamaClient.chat forwards `options` to Ollama
        # as-is, which the temperature option above already relies on.
        opts["num_predict"] = req.max_tokens

    resp = ollama.chat(chosen, req.messages, stream=False, options=opts)

    # Prefer the chat-style "message" payload; otherwise wrap a plain "response" string
    message = resp.get("message", {"role": "assistant", "content": resp.get("response", "")})
    return {
        "id": "chatcmpl-local",
        "object": "chat.completion",
        "model": chosen,
        "choices": [
            {
                "index": 0,
                "message": message,
                "finish_reason": resp.get("done_reason", "stop"),
            }
        ],
    }
# =============================================================================
# UI routes
# =============================================================================
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
    """Render the main dashboard page (index.html)."""
    # Server status shown on the page
    status = dict(
        base_model=settings.base_model,
        boost_model=settings.boost_model,
        embedding_model=settings.embedding_model,
        index_loaded=len(index.rows) if index else 0,
    )
    # Statistics (placeholder data): both counters currently report the index row count
    stats = dict(
        total_documents=len(index.rows) if index else 0,
        total_chunks=len(index.rows) if index else 0,
        today_processed=0,
    )
    context = {
        "request": request,
        "status": status,
        # Recent documents (placeholder: always empty for now)
        "recent_documents": [],
        "stats": stats,
    }
    return templates.TemplateResponse("index.html", context)
@app.get("/upload", response_class=HTMLResponse)
async def upload_page(request: Request):
    """Render the file upload page (upload.html)."""
    # NOTE(review): the API_KEY env value is handed to the template of a route
    # that has no auth dependency — confirm this exposure is intended.
    context = {"request": request, "api_key": os.getenv("API_KEY", "")}
    return templates.TemplateResponse("upload.html", context)
def format_file_size(bytes_size):
    """Format a byte count as a human-readable string.

    Uses base-1024 units from Bytes up to GB and two decimal places, e.g.
    ``2048 -> "2.00 KB"``. Values beyond the GB range stay expressed in GB.

    Args:
        bytes_size: Size in bytes.

    Returns:
        ``"0 Bytes"`` for zero, otherwise ``"<value> <unit>"``.
    """
    if bytes_size == 0:
        return "0 Bytes"
    k = 1024
    sizes = ["Bytes", "KB", "MB", "GB"]
    # Pick the largest unit that does not exceed the magnitude, capped at the
    # last known unit. The magnitude is compared so negative input keeps a sane unit.
    unit = 0
    threshold = k
    while unit < len(sizes) - 1 and abs(bytes_size) >= threshold:
        unit += 1
        threshold *= k
    return f"{bytes_size / (k ** unit):.2f} {sizes[unit]}"
@app.get("/documents", response_class=HTMLResponse)
async def documents_page(request: Request):
    """Render the document management page listing generated HTML files."""

    def describe(path):
        # One stat() call per file supplies both the size and the timestamp
        info = path.stat()
        return {
            "name": path.name,
            "size": info.st_size,
            "created": datetime.fromtimestamp(info.st_ctime).strftime("%Y-%m-%d %H:%M"),
            "url": f"/html/{path.name}",
        }

    # Collect the HTML files from the output directory, if it exists
    html_dir = Path("outputs/html")
    entries = [describe(path) for path in html_dir.glob("*.html")] if html_dir.exists() else []

    # Newest first; the formatted timestamp sorts correctly as a string
    documents = sorted(entries, key=lambda entry: entry["created"], reverse=True)

    context = {
        "request": request,
        "documents": documents,
        "formatFileSize": format_file_size,
    }
    return templates.TemplateResponse("documents.html", context)
@app.get("/chat", response_class=HTMLResponse)
async def chat_page(request: Request):
    """Render the AI chatbot page (chat.html)."""
    # Server status information shown on the page
    status = dict(
        base_model=settings.base_model,
        boost_model=settings.boost_model,
        embedding_model=settings.embedding_model,
        index_loaded=len(index.rows) if index else 0,
    )
    context = {
        "request": request,
        "status": status,
        "current_time": datetime.now().strftime("%H:%M"),
        "api_key": os.getenv("API_KEY", ""),
    }
    return templates.TemplateResponse("chat.html", context)
# Admin dashboard routes
@app.get("/admin", response_class=HTMLResponse)
async def admin_dashboard(request: Request, api_key: str = Depends(require_api_key)):
    """Render the admin dashboard page (admin.html)."""
    context = {
        "request": request,
        "server_port": settings.ai_server_port,
        "ollama_host": settings.ollama_host,
    }
    return templates.TemplateResponse("admin.html", context)
@app.get("/admin/ollama/status")
async def admin_ollama_status(api_key: str = Depends(require_api_key)):
    """Check whether the Ollama server responds.

    Any failure, including a non-200 reply, is reported as
    ``{"status": "offline", "error": ...}`` rather than raised.
    """
    # NOTE(review): other handlers in this file call the Ollama client
    # synchronously; confirm `ollama.client` exists and its `get` is awaitable.
    try:
        # Probe the tags endpoint as a ping
        tags_response = await ollama.client.get(f"{settings.ollama_host}/api/tags")
        if tags_response.status_code != 200:
            return {"status": "offline", "error": f"HTTP {tags_response.status_code}"}
        installed = tags_response.json().get("models", [])
        return {"status": "online", "models_count": len(installed)}
    except Exception as e:
        return {"status": "offline", "error": str(e)}
@app.get("/admin/models")
async def admin_get_models(api_key: str = Depends(require_api_key)):
    """List the installed models.

    On failure an empty list is returned together with the error text.
    """
    try:
        listing = await ollama.list_models()
        summaries = [
            {
                "name": entry.get("name", "Unknown"),
                "size": entry.get("size", 0),
                "status": "ready",
                # The model matching the configured base model is flagged as active
                "is_active": entry.get("name") == settings.base_model,
                "last_used": entry.get("modified_at"),
            }
            for entry in listing.get("models", [])
        ]
        return {"models": summaries}
    except Exception as e:
        return {"models": [], "error": str(e)}
@app.get("/admin/models/active")
async def admin_get_active_model(api_key: str = Depends(require_api_key)):
    """Return the configured base model as the currently active model."""
    active_model = settings.base_model
    return {"model": active_model}
@app.post("/admin/models/test")
async def admin_test_model(request: dict, api_key: str = Depends(require_api_key)):
    """Send a short test prompt to the named model and report the outcome.

    Raises:
        HTTPException: 400 when the body has no ``model`` value.
    """
    model_name = request.get("model")
    if not model_name:
        raise HTTPException(status_code=400, detail="Model name is required")

    try:
        # Send a simple test message
        test_response = await ollama.generate(
            model=model_name,
            prompt="Hello, this is a test. Please respond with 'Test successful'.",
            stream=False,
        )
        # Only the first 100 characters of the reply are echoed back
        snippet = test_response.get("response", "No response")[:100]
        return {"result": f"Test successful. Model responded: {snippet}..."}
    except Exception as e:
        return {"result": f"Test failed: {str(e)}"}
# API key management (placeholder — a real implementation needs a database)
api_keys_store = {}  # temporary in-memory store, keyed by key id
@app.get("/admin/api-keys")
async def admin_get_api_keys(api_key: str = Depends(require_api_key)):
    """List every entry in the in-memory API key store."""
    listing = [
        {
            "id": key_id,
            "name": record.get("name", "Unnamed"),
            "key": record.get("key", ""),
            "created_at": record.get("created_at", datetime.now().isoformat()),
            "usage_count": record.get("usage_count", 0),
        }
        for key_id, record in api_keys_store.items()
    ]
    return {"api_keys": listing}
@app.post("/admin/api-keys")
async def admin_create_api_key(request: dict, api_key: str = Depends(require_api_key)):
    """Generate a new API key, record it in the in-memory store, and return it with its id."""
    import secrets
    import uuid

    label = request.get("name", "Unnamed Key")
    new_key = secrets.token_urlsafe(32)
    key_id = str(uuid.uuid4())

    record = {"name": label, "key": new_key}
    record["created_at"] = datetime.now().isoformat()
    record["usage_count"] = 0
    api_keys_store[key_id] = record

    return {"api_key": new_key, "key_id": key_id}
@app.delete("/admin/api-keys/{key_id}")
async def admin_delete_api_key(key_id: str, api_key: str = Depends(require_api_key)):
    """Remove an API key from the in-memory store.

    Raises:
        HTTPException: 404 when no key with the given id exists.
    """
    if key_id not in api_keys_store:
        raise HTTPException(status_code=404, detail="API key not found")
    del api_keys_store[key_id]
    return {"message": "API key deleted successfully"}