"""mlx-vlm proxy — Ollama-compatible API for mlx-vlm inference (port 11435).

Exposes an Ollama-compatible ``/api/generate`` endpoint so that clients such
as n8n can use it transparently. Serves Qwen3.5-27B-4bit through mlx-vlm and
disables "thinking" automatically by prefilling the assistant turn.
"""
import asyncio
import logging
import time
from functools import partial

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("mlx_proxy")

MODEL_PATH = "mlx-community/Qwen3.5-27B-4bit"
DISPLAY_NAME = "qwen3.5:27b"

app = FastAPI()

# Global model state — loaded once at startup
model = None
processor = None


@app.on_event("startup")
async def startup():
    """Load the model and processor into the module-level globals."""
    global model, processor
    logger.info("Loading %s ...", MODEL_PATH)
    # Imported lazily so that importing this module does not pull in mlx_vlm.
    from mlx_vlm import load

    model, processor = load(MODEL_PATH)
    logger.info("Model ready")


def _generate_sync(
    prompt: str, system: str, max_tokens: int, temperature: float, is_json: bool
) -> dict:
    """Blocking generation — runs in a thread pool executor.

    Args:
        prompt: User prompt text.
        system: Optional system prompt; any ``/no_think`` marker is removed.
        max_tokens: Passed to mlx-vlm as ``max_tokens``.
        temperature: Passed to mlx-vlm as ``temp``.
        is_json: When true, a "respond with JSON only" instruction is added
            to the system prompt unless it already mentions ``JSON``.

    Returns:
        A dict with ``text`` (stripped output), ``n_tokens`` (token count of
        the output, approximate on tokenizer failure) and ``elapsed``
        (generation wall time in seconds).
    """
    from mlx_vlm import generate as mlx_generate
    from mlx_vlm.utils import apply_chat_template

    messages = []

    # System prompt — strip /no_think (thinking is handled by the prefill)
    sys_text = (system or "").replace("/no_think", "").strip()
    if is_json and sys_text and "JSON" not in sys_text:
        sys_text += "\nJSON으로만 응답하세요."
    elif is_json and not sys_text:
        sys_text = "JSON으로만 응답하세요."
    if sys_text:
        messages.append({"role": "system", "content": sys_text})

    messages.append({"role": "user", "content": prompt})

    # Prefill: disable thinking — the model starts as if that stage were
    # already complete.
    # NOTE(review): the prefill below is whitespace only; if it was meant to
    # contain an (empty) thinking block, the markup may have been lost —
    # confirm the intended content.
    messages.append({"role": "assistant", "content": "\n\n\n"})

    formatted = apply_chat_template(processor, model.config, messages)

    t0 = time.perf_counter()
    output = mlx_generate(
        model, processor, formatted, max_tokens=max_tokens, temp=temperature
    )
    elapsed = time.perf_counter() - t0

    output = output.strip()

    # Token count (approximate via tokenizer); deliberately best-effort, so a
    # tokenizer failure falls back to a rough length-based estimate.
    try:
        n_tokens = len(processor.tokenizer.encode(output))
    except Exception:
        n_tokens = max(1, len(output) // 3)

    return {"text": output, "n_tokens": n_tokens, "elapsed": elapsed}


@app.post("/api/generate")
async def api_generate(request: Request):
    """Handle an Ollama-style ``/api/generate`` request (non-streaming).

    Reads ``prompt``, ``system``, ``format`` and ``options.num_predict`` /
    ``options.temperature`` from the JSON body. Returns 400 for a malformed
    body or a missing prompt, and 500 if generation raises.
    """
    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)
    if not isinstance(body, dict):
        return JSONResponse({"error": "JSON object required"}, status_code=400)

    prompt = body.get("prompt", "")
    system = body.get("system", "")
    # `or {}` also covers an explicit `"options": null` from the client.
    opts = body.get("options") or {}
    max_tokens = opts.get("num_predict", 2048)
    temperature = opts.get("temperature", 0.7)
    is_json = body.get("format") == "json"

    if not prompt:
        return JSONResponse({"error": "prompt required"}, status_code=400)

    try:
        # Inside a coroutine the running loop is the one to use.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            partial(_generate_sync, prompt, system, max_tokens, temperature, is_json),
        )
    except Exception as e:
        # Top-level boundary: log with traceback and report to the client.
        logger.exception("Generation failed: %s", e)
        return JSONResponse({"error": str(e)}, status_code=500)

    return {
        "model": DISPLAY_NAME,
        "response": result["text"],
        "done": True,
        "eval_count": result["n_tokens"],
        # Seconds converted to integer nanoseconds.
        "eval_duration": int(result["elapsed"] * 1e9),
    }


@app.get("/health")
async def health():
    """Report ``ok`` once the model global is set, ``loading`` otherwise."""
    return {"status": "ok" if model is not None else "loading", "model": MODEL_PATH}