"""PR-2 of DS AI routing policy ([[document-server-ai-routing-policy]], 2026-05-23):
explicit backend dispatcher for /api/search/ask. Every backend goes through llm-router :8890.

## Policy (PR-2 of routing policy, MVP option C — only the ask path is swapped)

- default (`backend` not given) / `gemma-macmini` / `mac-mini-default`
  → RouterBackend(alias="mac-mini-default", requires_gate=True)
  → the router calls tier_b (Mac mini :8801 gemma-4-26b). The permanent llm_gate rule is preserved.
- `qwen-macbook` → RouterBackend(alias="qwen-macbook", requires_gate=False)
  → the router calls the named upstream (M5 Max :8810 Qwen3.6-27B).
- `claude-cloud` → RouterBackend(alias="claude-cloud", requires_gate=False)
  → the router passes through 503 provider_not_configured. Activation = separate PR.
- `auto` → RouterBackend(alias=None, requires_gate=True)
  → the router picks the tier via rule + LLM triage. For safety the Mac mini gate
    protection is kept conservatively.
- anything else → ValueError (the caller maps it to 400/422)

## Permanent rules

- Single Mac mini 26B inference at a time (llm_gate,
  [[feedback_docstring_invariant_swap_audit]]) is preserved = the requires_gate=True
  branch keeps `acquire_mlx_gate(Priority.FOREGROUND)`. Going through the router gives
  the same client-side mutex effect.
- The BackendUnavailable mapping policy
  ([[feedback_no_silent_fallback_explicit_opt_in]]) is preserved. Zero silent
  fallback = when the router returns 503/502 it becomes BackendUnavailable as is.

## Rollback

Setting the env `DS_BACKENDS_VIA_ROUTER=false` immediately returns to the legacy path
(direct calls through GemmaMacMiniBackend + QwenMacBookBackend). The legacy classes are
kept for one week and then removed in a separate cleanup PR.
"""

from __future__ import annotations

import asyncio
import os
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import httpx

from core.config import settings
from core.utils import setup_logger
from services.search.llm_gate import Priority, acquire_mlx_gate

if TYPE_CHECKING:
    from ai.client import AIClient

logger = setup_logger("llm_backend")

# Explicit backend identifiers.
QWEN_MACBOOK = "qwen-macbook"
GEMMA_MACMINI = "gemma-macmini"
MAC_MINI_DEFAULT = "mac-mini-default"
CLAUDE_CLOUD = "claude-cloud"
AUTO = "auto"

# Allowed user-facing alias keys (kept in sync with the Query pattern — app/api/search.py:457).
_ALLOWED_ALIASES = {GEMMA_MACMINI, QWEN_MACBOOK, MAC_MINI_DEFAULT, CLAUDE_CLOUD, AUTO}


class BackendUnavailable(Exception):
    """An explicitly requested backend is temporarily unavailable.

    The /ask wrapper maps this exception to HTTP 503.

    Attributes:
        backend_name: Identifier of the backend that could not serve the request.
        reason: Short machine-readable cause (e.g. ``router_503``).
    """

    def __init__(self, backend_name: str, reason: str):
        self.backend_name = backend_name
        self.reason = reason
        super().__init__(f"{backend_name} unavailable: {reason}")


class BackendBase(ABC):
    """Common interface implemented by every /ask LLM backend."""

    name: str

    @abstractmethod
    async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
        """Turn a prompt into the completion body (OpenAI-compatible chat content).

        On failure raises `BackendUnavailable` or a general exception. General
        exceptions are mapped by synthesis_service to status="llm_error"
        (existing behavior); only BackendUnavailable is mapped to 503.
        """

    async def generate_with_tools(
        self,
        messages: list[dict],
        tools: list[dict],
        *,
        tool_choice: str = "auto",
        timeout_read_s: int,
    ) -> dict:
        """OpenAI-compatible chat completion with tool calling, for the ReAct loop.

        The default raises NotImplementedError. Only RouterBackend and the
        legacy QwenMacBookBackend override it, so a ReAct endpoint that calls
        an unsupported backend gets an explicit error.
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not implement generate_with_tools"
        )


# ──────────────────────────────────────────────────────────────────────────
# RouterBackend (new in PR-2, default path)
# ──────────────────────────────────────────────────────────────────────────


class RouterBackend(BackendBase):
    """Backend that sends every ask request through llm-router :8890.

    The MLX gate is applied per alias (``requires_gate``).

    Response shape = the router forwards the upstream OpenAI-compatible
    response unchanged. The qwen-macbook tool-calling response is mlx-vlm's
    OpenAI-standard-compatible shape
    (tests/fixtures/qwen_tool_call_response.json,
    [[reference_mlx_vlm_tool_calling]]).
    """

    def __init__(
        self,
        *,
        router_url: str,
        alias: str | None,
        requires_gate: bool,
        timeout_connect_s: int,
    ):
        """Store the routing configuration.

        Args:
            router_url: Base URL of the router; a trailing ``/`` is stripped.
            alias: Model alias sent as ``model``; ``None`` means "auto"
                (router rule + triage), in which case no ``model`` is sent.
            requires_gate: Whether requests must hold the MLX gate.
            timeout_connect_s: Connect timeout in seconds.
        """
        self.name = alias or AUTO
        self.router_url = router_url.rstrip("/")
        self.alias = alias  # None means "auto" (router rule + triage)
        self.requires_gate = requires_gate
        self.timeout_connect_s = timeout_connect_s

    def _build_payload(
        self,
        messages_or_prompt: str | list[dict],
        *,
        tools: list[dict] | None = None,
        tool_choice: str | None = None,
    ) -> dict:
        """Build the chat-completions request body.

        A plain string is wrapped into a single user message; anything else is
        passed through as the ``messages`` list. ``tool_choice`` is forwarded
        only together with ``tools`` and only when it is "auto" or "none".
        """
        if isinstance(messages_or_prompt, str):
            messages = [{"role": "user", "content": messages_or_prompt}]
        else:
            messages = messages_or_prompt
        payload: dict = {"messages": messages, "max_tokens": 4096}
        if self.alias:
            payload["model"] = self.alias
        if tools:
            payload["tools"] = tools
            if tool_choice in ("auto", "none"):
                payload["tool_choice"] = tool_choice
        return payload

    @staticmethod
    def _safe_json(resp) -> object:
        """Return the decoded JSON body of an error response, or ``{}``.

        Parsing an error body is best-effort: a malformed body must never mask
        the status-code based mapping done by the caller.
        """
        try:
            return resp.json()
        except Exception:
            return {}

    @staticmethod
    def _error_detail(body: object) -> str:
        """Extract a short (<= 32 chars) error label from a router error body.

        Accepts both a plain-string ``error`` and a structured ``error``
        object (``type`` / ``error_reason``); anything else yields "unknown".
        """
        if not isinstance(body, dict):
            return "unknown"
        error = body.get("error", "unknown")
        if isinstance(error, dict):
            error = error.get("type") or error.get("error_reason") or "unknown"
        if error is None:
            return "unknown"
        return str(error)[:32]

    async def _post(self, payload: dict, *, timeout_read_s: int) -> dict:
        """POST the payload to the router and return the decoded JSON response.

        Raises:
            BackendUnavailable: Router answered 503/502 or another 5xx, or the
                connection / read / write / pool step failed.
            ValueError: Router answered 400 (treated as a code bug, e.g. an
                unknown alias); the caller converts it to a 5xx.
            httpx.HTTPStatusError: Any other non-success status.
        """
        timeout = httpx.Timeout(
            connect=float(self.timeout_connect_s),
            read=float(timeout_read_s),
            write=10.0,
            pool=5.0,
        )
        url = f"{self.router_url}/v1/chat/completions"
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                resp = await client.post(url, json=payload)
                status = resp.status_code

                # Router 503 (provider_not_configured / other router-side 503)
                # → BackendUnavailable.
                if status == 503:
                    body = self._safe_json(resp)
                    err = body.get("error") if isinstance(body, dict) else None
                    reason = None
                    if isinstance(err, dict):
                        reason = err.get("type") or err.get("error_reason")
                    raise BackendUnavailable(self.name, reason or "router_503")

                # Router 400 unknown_alias → code bug. General exception
                # (the caller converts it to a 5xx).
                if status == 400:
                    body = self._safe_json(resp)
                    raise ValueError(
                        f"router rejected alias={self.alias!r} body={body!r}"
                    )

                # Router 502 (upstream unavailable, cold M5, ...)
                # → BackendUnavailable. The detail is extracted defensively so
                # a structured or missing error field cannot turn this into a
                # TypeError/AttributeError.
                if status == 502:
                    detail = self._error_detail(self._safe_json(resp))
                    raise BackendUnavailable(self.name, f"upstream_502_{detail}")

                resp.raise_for_status()
                return resp.json()
        except (
            httpx.ConnectError,
            httpx.ConnectTimeout,
            httpx.ReadTimeout,
            httpx.PoolTimeout,
            httpx.WriteTimeout,
            httpx.RemoteProtocolError,
        ) as exc:
            logger.warning(
                "router_backend unavailable alias=%s url=%s exc=%s",
                self.alias,
                url,
                type(exc).__name__,
            )
            raise BackendUnavailable(
                self.name, f"router_{type(exc).__name__}"
            ) from exc
        except httpx.HTTPStatusError as exc:
            if 500 <= exc.response.status_code < 600:
                logger.warning(
                    "router_backend 5xx alias=%s status=%d",
                    self.alias,
                    exc.response.status_code,
                )
                raise BackendUnavailable(
                    self.name, f"router_http_{exc.response.status_code}"
                ) from exc
            raise

    async def _post_gated(self, payload: dict, *, timeout_read_s: int) -> dict:
        """Run `_post`, holding the MLX gate when this alias requires it.

        On the gated path the request is additionally bounded by
        ``asyncio.timeout(timeout_read_s)`` while the gate is held.
        """
        if not self.requires_gate:
            return await self._post(payload, timeout_read_s=timeout_read_s)
        async with acquire_mlx_gate(Priority.FOREGROUND):
            async with asyncio.timeout(timeout_read_s):
                return await self._post(payload, timeout_read_s=timeout_read_s)

    async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
        """Return the content of the first choice for a single-prompt request."""
        payload = self._build_payload(prompt)
        data = await self._post_gated(payload, timeout_read_s=timeout_read_s)
        return data["choices"][0]["message"]["content"]

    async def generate_with_tools(
        self,
        messages: list[dict],
        tools: list[dict],
        *,
        tool_choice: str = "auto",
        timeout_read_s: int,
    ) -> dict:
        """Return the first choice's full message dict for a tool-calling request."""
        payload = self._build_payload(
            messages,
            tools=tools,
            tool_choice=tool_choice,
        )
        data = await self._post_gated(payload, timeout_read_s=timeout_read_s)
        return data["choices"][0]["message"]


# ──────────────────────────────────────────────────────────────────────────
# Legacy backends (rollback safety, used only when DS_BACKENDS_VIA_ROUTER=false)
# To be removed by a separate cleanup PR after one week
# ([[feedback_closure_gate_vs_observation]] — dual-path = rollback safety only,
# no time-based observation gate).
# ──────────────────────────────────────────────────────────────────────────


class GemmaMacMiniBackend(BackendBase):
    """[LEGACY] Direct call to the Mac mini ai.primary. Only when DS_BACKENDS_VIA_ROUTER=false."""

    name = GEMMA_MACMINI

    async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
        """Call the primary model through AIClient while holding the MLX gate."""
        # Lazy import — ai.client depends on settings.ai.
        from ai.client import AIClient

        client = AIClient()
        try:
            async with acquire_mlx_gate(Priority.FOREGROUND):
                async with asyncio.timeout(timeout_read_s):
                    return await client._call_chat(client.ai.primary, prompt)
        finally:
            # Closing is best-effort: a close failure must not replace the
            # result or the original exception, but it is worth a trace.
            try:
                await client.close()
            except Exception:
                logger.debug(
                    "gemma-macmini[legacy] client close failed", exc_info=True
                )


class QwenMacBookBackend(BackendBase):
    """[LEGACY] Direct call to the MacBook M5 Max mlx-vlm.server (Tailscale).

    Only used when DS_BACKENDS_VIA_ROUTER=false.
    """

    name = QWEN_MACBOOK
    # Class-wide semaphore (size 1), created lazily by _get_gate().
    _gate: asyncio.Semaphore | None = None

    def __init__(self, base_url: str, model: str, timeout_connect_s: int):
        """Store the upstream URL (trailing ``/`` stripped), model and connect timeout."""
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout_connect_s = timeout_connect_s

    @classmethod
    def _get_gate(cls) -> asyncio.Semaphore:
        """Return the shared semaphore, creating it on first use."""
        if cls._gate is None:
            cls._gate = asyncio.Semaphore(1)
        return cls._gate

    async def _post_chat(
        self, payload: dict, *, timeout_read_s: int, log_label: str
    ) -> dict:
        """POST a chat-completions payload under the gate; return the first message.

        Args:
            payload: Complete request body.
            timeout_read_s: Read timeout in seconds.
            log_label: Prefix used in warning logs to tell call sites apart.

        Raises:
            BackendUnavailable: Connection-level failure or an HTTP 5xx.
            httpx.HTTPStatusError: Any non-5xx error status.
        """
        gate = self._get_gate()
        timeout = httpx.Timeout(
            connect=float(self.timeout_connect_s),
            read=float(timeout_read_s),
            write=10.0,
            pool=5.0,
        )
        url = f"{self.base_url}/v1/chat/completions"
        async with gate:
            try:
                async with httpx.AsyncClient(timeout=timeout) as client:
                    resp = await client.post(url, json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                    return data["choices"][0]["message"]
            except (
                httpx.ConnectError,
                httpx.ConnectTimeout,
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                httpx.WriteTimeout,
                httpx.RemoteProtocolError,
            ) as exc:
                logger.warning(
                    "%s unavailable url=%s exc=%s",
                    log_label,
                    url,
                    type(exc).__name__,
                )
                raise BackendUnavailable(self.name, type(exc).__name__) from exc
            except httpx.HTTPStatusError as exc:
                if 500 <= exc.response.status_code < 600:
                    logger.warning(
                        "%s 5xx status=%d",
                        log_label,
                        exc.response.status_code,
                    )
                    raise BackendUnavailable(
                        self.name, f"http_{exc.response.status_code}"
                    ) from exc
                raise

    async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
        """Return the content of the first choice for a single-prompt request."""
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 4096,
        }
        message = await self._post_chat(
            payload,
            timeout_read_s=timeout_read_s,
            log_label="qwen-macbook[legacy]",
        )
        return message["content"]

    async def generate_with_tools(
        self,
        messages: list[dict],
        tools: list[dict],
        *,
        tool_choice: str = "auto",
        timeout_read_s: int,
    ) -> dict:
        """Return the first choice's full message dict for a tool-calling request.

        ``tool_choice`` is forwarded only together with ``tools`` and only
        when it is "auto" or "none".
        """
        payload: dict = {
            "model": self.model,
            "messages": messages,
            "max_tokens": 4096,
        }
        if tools:
            payload["tools"] = tools
            if tool_choice in ("auto", "none"):
                payload["tool_choice"] = tool_choice
        return await self._post_chat(
            payload,
            timeout_read_s=timeout_read_s,
            log_label="qwen-macbook[legacy](tools)",
        )


# ──────────────────────────────────────────────────────────────────────────
# Dispatcher (PR-2: dual-path with DS_BACKENDS_VIA_ROUTER env flag)
# ──────────────────────────────────────────────────────────────────────────


def _via_router() -> bool:
    """Tell whether the RouterBackend path is active.

    `DS_BACKENDS_VIA_ROUTER=true` (default) = RouterBackend. Any other value
    selects the legacy GemmaMacMiniBackend/QwenMacBookBackend (rollback
    safety). Surrounding whitespace and letter case are ignored so that a
    value like "true " from an env file does not silently pick the legacy path.
    """
    return os.getenv("DS_BACKENDS_VIA_ROUTER", "true").strip().lower() == "true"


# Instance caches, keyed by canonical backend name.
_ROUTER_BACKENDS: dict[str, RouterBackend] = {}
_LEGACY_BACKENDS: dict[str, BackendBase] = {}

# Hard-coded MVP default used when neither settings nor env provide a URL.
_DEFAULT_ROUTER_URL = "http://100.76.254.116:8890"

# Canonical name -> (alias sent to the router, whether the MLX gate is held).
# auto = the router's rule + triage; it is likely to land on tier_b, so the
# gate protection is kept conservatively.
_ROUTER_SPECS: dict[str, tuple[str | None, bool]] = {
    MAC_MINI_DEFAULT: (MAC_MINI_DEFAULT, True),
    QWEN_MACBOOK: (QWEN_MACBOOK, False),
    CLAUDE_CLOUD: (CLAUDE_CLOUD, False),
    AUTO: (None, True),
}


def _router_url() -> str:
    """Resolve the router URL: settings first, then env, then the MVP default.

    An empty `LLM_ROUTER_URL` is treated as unset so it cannot produce an
    empty base URL.
    """
    cfg = settings.search.ask.backend
    cfg_url = getattr(cfg, "router_url", "") or ""
    if cfg_url:
        return cfg_url
    return os.getenv("LLM_ROUTER_URL") or _DEFAULT_ROUTER_URL


def _build_router_backend(alias: str | None, requires_gate: bool) -> RouterBackend:
    """Create a RouterBackend for `alias` using the configured URL and connect timeout."""
    cfg = settings.search.ask.backend
    return RouterBackend(
        router_url=_router_url(),
        alias=alias,
        requires_gate=requires_gate,
        timeout_connect_s=cfg.timeout_connect_s,
    )


def _build_qwen_backend() -> QwenMacBookBackend:
    """Create the legacy QwenMacBookBackend from settings."""
    cfg = settings.search.ask.backend
    return QwenMacBookBackend(
        base_url=cfg.macbook_url,
        model=cfg.macbook_model,
        timeout_connect_s=cfg.timeout_connect_s,
    )


def _get_router_backend(name: str | None) -> RouterBackend:
    """RouterBackend path (PR-2 default): return the cached backend for `name`.

    An empty/None name and `gemma-macmini` are both served by the
    `mac-mini-default` alias.

    Raises:
        ValueError: `name` is not a known backend.
    """
    key = (name or "").strip().lower()
    if key in ("", GEMMA_MACMINI):
        key = MAC_MINI_DEFAULT
    spec = _ROUTER_SPECS.get(key)
    if spec is None:
        raise ValueError(f"unknown backend: {name!r}")
    backend = _ROUTER_BACKENDS.get(key)
    if backend is None:
        alias, requires_gate = spec
        backend = _build_router_backend(alias=alias, requires_gate=requires_gate)
        _ROUTER_BACKENDS[key] = backend
    return backend


def _get_legacy_backend(name: str | None) -> BackendBase:
    """Rollback path, used only when DS_BACKENDS_VIA_ROUTER=false.

    Raises:
        ValueError: `name` is unknown, or is `claude-cloud` (router-only).
    """
    key = (name or "").strip().lower() or GEMMA_MACMINI
    # Legacy knows neither the mac-mini-default alias nor the "auto" concept;
    # both fall back to the default backend.
    if key in (MAC_MINI_DEFAULT, AUTO):
        key = GEMMA_MACMINI
    if key == CLAUDE_CLOUD:
        raise ValueError(
            f"backend {CLAUDE_CLOUD!r} requires DS_BACKENDS_VIA_ROUTER=true"
        )
    if key not in (GEMMA_MACMINI, QWEN_MACBOOK):
        raise ValueError(f"unknown backend: {name!r}")
    if key not in _LEGACY_BACKENDS:
        if key == GEMMA_MACMINI:
            _LEGACY_BACKENDS[key] = GemmaMacMiniBackend()
        else:
            _LEGACY_BACKENDS[key] = _build_qwen_backend()
    return _LEGACY_BACKENDS[key]


def get_backend(name: str | None) -> BackendBase:
    """Return the (cached) backend instance for `name`.

    DS_BACKENDS_VIA_ROUTER=true (default, PR-2) → RouterBackend
    DS_BACKENDS_VIA_ROUTER=false → legacy GemmaMacMiniBackend / QwenMacBookBackend

    Raises:
        ValueError: `name` is not a backend known to the active path.
    """
    if _via_router():
        return _get_router_backend(name)
    return _get_legacy_backend(name)


def reset_backends_for_test() -> None:
    """Drop all cached backends and the legacy Qwen gate.

    Called by test fixtures to re-create backend instances after changing
    settings. Not for use in production code.
    """
    _ROUTER_BACKENDS.clear()
    _LEGACY_BACKENDS.clear()
    QwenMacBookBackend._gate = None