"""OpenAI-compatible proxy (MLX server, vLLM, etc.) — SSE passthrough.""" from __future__ import annotations import json import logging from collections.abc import AsyncGenerator import httpx logger = logging.getLogger(__name__) async def stream_chat( base_url: str, model: str, messages: list[dict], **kwargs, ) -> AsyncGenerator[str, None]: """Proxy OpenAI-compatible chat streaming. SSE passthrough with model field override.""" payload = { "model": model, "messages": messages, "stream": True, **{k: v for k, v in kwargs.items() if v is not None}, } async with httpx.AsyncClient(timeout=120.0) as client: async with client.stream( "POST", f"{base_url}/v1/chat/completions", json=payload, ) as resp: if resp.status_code != 200: body = await resp.aread() error_msg = body.decode("utf-8", errors="replace") yield _error_event(f"Backend error ({resp.status_code}): {error_msg}") return async for line in resp.aiter_lines(): if not line.strip(): continue # Pass through SSE lines as-is (already in OpenAI format) if line.startswith("data: "): yield f"{line}\n\n" elif line == "data: [DONE]": yield "data: [DONE]\n\n" async def complete_chat( base_url: str, model: str, messages: list[dict], **kwargs, ) -> dict: """Non-streaming OpenAI-compatible chat.""" payload = { "model": model, "messages": messages, "stream": False, **{k: v for k, v in kwargs.items() if v is not None}, } async with httpx.AsyncClient(timeout=120.0) as client: resp = await client.post(f"{base_url}/v1/chat/completions", json=payload) resp.raise_for_status() return resp.json() def _error_event(message: str) -> str: error = { "id": "chatcmpl-gateway", "object": "chat.completion.chunk", "model": "error", "choices": [ { "index": 0, "delta": {"content": f"[Error] {message}"}, "finish_reason": "stop", } ], } return f"data: {json.dumps(error)}\n\ndata: [DONE]\n\n"