"""HTTP API of the local AI server: health, search, chat (optional RAG), and index upkeep."""
from __future__ import annotations

import logging
from typing import Any, Dict, List

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from .config import settings
from .index_store import IndexRow, JsonlIndex
from .ollama_client import OllamaClient

logger = logging.getLogger(__name__)

app = FastAPI(title="Local AI Server", version="0.1.0")
ollama = OllamaClient(settings.ollama_host)
index = JsonlIndex(settings.index_path)

# When no model is requested, conversations whose combined message content is
# longer than this many characters are routed to the boost model.
BOOST_CHAR_THRESHOLD = 2000

# Maximum number of characters of each hit's text returned by /search.
SEARCH_SNIPPET_CHARS = 400

# Instruction that precedes the retrieved documents in the system message.
# This is runtime text sent to the model; it is intentionally left untranslated.
RAG_SYSTEM_PREAMBLE = (
    "당신은 문서 기반 비서입니다. 제공된 컨텍스트만 신뢰하고, 모르면 모른다고 답하세요.\n\n"
)


class ChatRequest(BaseModel):
    """Request body of POST /chat."""

    # Explicit model name; when empty/None the server picks base or boost model.
    model: str | None = None
    # Conversation so far; the handlers read the "role" and "content" keys.
    messages: List[Dict[str, str]]
    # Whether to prepend documents retrieved from the index as a system message.
    use_rag: bool = True
    # Number of hits requested from the index for RAG.
    top_k: int = 5
    # Route to the boost model regardless of message length (only when no model is given).
    force_boost: bool = False
    # Passed through unchanged to ollama.chat.
    options: Dict[str, Any] | None = None


class SearchRequest(BaseModel):
    """Request body of POST /search."""

    query: str
    top_k: int = 5


class UpsertRow(BaseModel):
    """One document row submitted to POST /index/upsert."""

    id: str
    text: str
    # Stored as "api" when omitted or empty.
    source: str | None = None


class UpsertRequest(BaseModel):
    """Request body of POST /index/upsert."""

    rows: List[UpsertRow]
    # When False the rows are stored with an empty vector.
    embed: bool = True
    # Embedding model override; defaults to settings.embedding_model.
    model: str | None = None
    # NOTE(review): not read by any handler in this file — confirm whether
    # batching is expected to happen elsewhere.
    batch: int = 16


@app.get("/health")
def health() -> Dict[str, Any]:
    """Report the configured model names and the number of loaded index rows."""
    return {
        "status": "ok",
        "base_model": settings.base_model,
        "boost_model": settings.boost_model,
        "embedding_model": settings.embedding_model,
        "index_loaded": len(index.rows) if index else 0,
    }


@app.post("/search")
def search(req: SearchRequest) -> Dict[str, Any]:
    """Embed the query and return the top-k index hits with truncated text.

    Returns an empty result list without calling the embedding backend when
    the index has no rows.

    Raises:
        HTTPException: status 500 when embedding or index search fails.
    """
    if not index.rows:
        return {"results": []}

    try:
        qvec = ollama.embeddings(settings.embedding_model, req.query)
        hits = index.search(qvec, top_k=req.top_k)
    except Exception as e:
        # Log the traceback here: converting to HTTPException would otherwise drop it.
        logger.exception("search failed")
        raise HTTPException(status_code=500, detail=f"search_error: {e}") from e

    return {
        "results": [
            {
                "id": row.id,
                "score": float(score),
                "text": row.text[:SEARCH_SNIPPET_CHARS],
                "source": row.source,
            }
            for row, score in hits
        ]
    }


def _route_model(req: ChatRequest) -> str:
    """Pick the boost model for forced or long conversations, else the base model."""
    total_chars = sum(len(m.get("content", "")) for m in req.messages)
    if req.force_boost or total_chars > BOOST_CHAR_THRESHOLD:
        return settings.boost_model
    return settings.base_model


def _retrieve_context(req: ChatRequest) -> List[str]:
    """Return the texts of the index hits for the joined user messages, or []."""
    if not (req.use_rag and index.rows):
        return []

    # Only user-authored turns form the retrieval query.
    query = "\n".join(
        m.get("content", "") for m in req.messages if m.get("role") == "user"
    ).strip()
    if not query:
        return []

    qvec = ollama.embeddings(settings.embedding_model, query)
    hits = index.search(qvec, top_k=req.top_k)
    return [row.text for row, _score in hits]


def _build_messages(
    history: List[Dict[str, str]], context_docs: List[str]
) -> List[Dict[str, str]]:
    """Return a copy of history, prefixed by a system message when docs exist."""
    if not context_docs:
        return list(history)

    docs = "\n\n".join(
        f"[DOC {i}]\n{text}" for i, text in enumerate(context_docs, start=1)
    )
    return [{"role": "system", "content": RAG_SYSTEM_PREAMBLE + docs}, *history]


@app.post("/chat")
def chat(req: ChatRequest) -> Dict[str, Any]:
    """Run a non-streaming chat completion, optionally grounded on index hits.

    The model is the requested one, or is routed by message length /
    ``force_boost`` when none is given.

    Returns:
        A dict with the model name used and the value returned by ollama.chat.

    Raises:
        HTTPException: status 500 when retrieval or the chat call fails.
    """
    model = req.model or _route_model(req)

    try:
        context_docs = _retrieve_context(req)
        messages = _build_messages(req.messages, context_docs)
        resp = ollama.chat(model, messages, stream=False, options=req.options)
    except Exception as e:
        logger.exception("chat failed")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"model": model, "response": resp}


@app.post("/index/upsert")
def index_upsert(req: UpsertRequest) -> Dict[str, Any]:
    """Embed the submitted rows (unless ``embed`` is False) and append them to the index.

    Returns:
        ``{"added": n}`` where n is the value returned by ``index.append``
        (0 when no rows were submitted).

    Raises:
        HTTPException: status 500 when embedding or appending fails.
    """
    if not req.rows:
        return {"added": 0}

    model = req.model or settings.embedding_model
    try:
        to_append = [
            IndexRow(
                id=row.id,
                text=row.text,
                vector=ollama.embeddings(model, row.text) if req.embed else [],
                source=row.source or "api",
            )
            for row in req.rows
        ]
        added = index.append(to_append)
    except Exception as e:
        logger.exception("index upsert failed")
        raise HTTPException(status_code=500, detail=f"index_upsert_error: {e}") from e

    return {"added": added}


@app.post("/index/reload")
def index_reload() -> Dict[str, Any]:
    """Call ``index.reload()`` and report the total it returns."""
    total = index.reload()
    return {"total": total}


# Paperless webhook placeholder (to be wired with user-provided details)
class PaperlessHook(BaseModel):
    """Payload of the Paperless webhook."""

    document_id: int
    title: str | None = None
    tags: List[str] | None = None


@app.post("/paperless/hook")
def paperless_hook(hook: PaperlessHook) -> Dict[str, Any]:
    """Acknowledge a Paperless webhook call without processing it."""
    # NOTE: extension point - the Paperless API could be queried here for the
    # document text/metadata, and the index updated with the same logic as
    # scripts/embed_ollama.py.
    return {"status": "ack", "document_id": hook.document_id}