Files
hyungi_document_server/app/services/llm/backends.py
T
hyungi bcf644f893 refactor(search): /api/search/ask dispatcher route via llm-router
PR-2 of DS AI routing policy (2026-05-23, see plan
~/.claude/plans/document-server-ai-cheeky-reddy.md +
memory project_document_server_ai_routing_policy).

DS 의 모든 backend 호출이 llm-router :8890 단일 경유. 정칙 정합:
- 신규 RouterBackend (services/llm/backends.py) — alias 별 router POST
  + requires_gate 분기 (mac-mini-default 만 llm_gate FOREGROUND 보호).
- 기존 GemmaMacMiniBackend + QwenMacBookBackend = legacy 보존
  (DS_BACKENDS_VIA_ROUTER=false rollback safety only). 1주 후 별
  cleanup PR (PR-DS-Backends-Legacy-Cleanup-1) 로 폐기.
- get_backend factory dual-path (env flag) — backward-compat
  (gemma-macmini alias → mac-mini-default 매핑).
- search.py:457 Query pattern 확장: mac-mini-default|claude-cloud|auto
  추가. /ask/react 의 isinstance(QwenMacBookBackend) → hasattr
  duck-typing (RouterBackend + Legacy 모두 generate_with_tools 구현).
- SearchAskBackendConfig 에 router_url 신규 (env LLM_ROUTER_URL 또는
  hardcoded MVP default http://100.76.254.116:8890).
- docker-compose.yml fastapi env 에 LLM_ROUTER_URL +
  DS_BACKENDS_VIA_ROUTER 추가.

AIClient (_call_chat, call_triage, call_primary, call_fallback) 경유
path 는 별 PR (PR-AIClient-Router-Migration-1) — MVP scope C 채택,
회귀 risk 최소화.

Closure (즉시 fixture/matrix):
- factory smoke 6 alias (None/mac-mini-default/gemma-macmini/
  qwen-macbook/claude-cloud/auto) + 1 invalid (nonsense → ValueError).
- live 3 case: mac-mini-default 200 \"pong! 🏓\" + qwen-macbook cold
  502 upstream_502_primary=ConnectError + claude-cloud 503
  provider_not_configured.
- silent fallback 0 + direct M5/Mac mini socket 0
  (RouterBackend 만 router 호출).

Backup: ~/.local/share/ds-routing-pr2-backups/20260523/
(backends.py + config.py + search.py + docker-compose.yml).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 03:41:29 +00:00

520 lines
20 KiB
Python

"""PR-2 of DS AI routing policy ([[document-server-ai-routing-policy]], 2026-05-23):
/api/search/ask 의 명시 backend dispatcher. 모든 backend = llm-router :8890 경유.
## 정책 (PR-2 of routing policy, MVP 옵션 C — ask path 만 swap)
- 기본 (`backend` 미지정) / `gemma-macmini` / `mac-mini-default`
→ RouterBackend(alias="mac-mini-default", requires_gate=True)
→ router 가 tier_b (Mac mini :8801 gemma-4-26b) 호출. llm_gate 영구 룰 보존.
- `qwen-macbook`
→ RouterBackend(alias="qwen-macbook", requires_gate=False)
→ router 가 named upstream (M5 Max :8810 Qwen3.6-27B) 호출.
- `claude-cloud`
→ RouterBackend(alias="claude-cloud", requires_gate=False)
→ router 가 503 provider_not_configured pass-through. activation = 별 PR.
- `auto`
→ RouterBackend(alias=None, requires_gate=True)
→ router 가 rule + LLM triage 로 tier 결정. 안전상 Mac mini gate 보호 보수적.
- 그 외 → ValueError (호출자가 400/422 으로 매핑)
## 영구 룰
- Mac mini 26B 단일 inference (llm_gate, [[feedback_docstring_invariant_swap_audit]])
보존 = requires_gate=True 분기에서 `acquire_mlx_gate(Priority.FOREGROUND)` 유지.
router 경유로도 client-side mutex 효과는 동일.
- BackendUnavailable 매핑 정책 ([[feedback_no_silent_fallback_explicit_opt_in]]) 보존.
silent fallback 0 = router 가 503/502 반환하면 그대로 BackendUnavailable.
## Rollback
`DS_BACKENDS_VIA_ROUTER=false` env 로 legacy path (GemmaMacMiniBackend +
QwenMacBookBackend 직접 호출) 즉시 복귀. legacy class 1주 보존 후 별 cleanup PR.
"""
from __future__ import annotations
import asyncio
import os
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
import httpx
from core.config import settings
from core.utils import setup_logger
from services.search.llm_gate import Priority, acquire_mlx_gate
if TYPE_CHECKING:
from ai.client import AIClient
logger = setup_logger("llm_backend")
# 명시 backend 식별자.
QWEN_MACBOOK = "qwen-macbook"
GEMMA_MACMINI = "gemma-macmini"
MAC_MINI_DEFAULT = "mac-mini-default"
CLAUDE_CLOUD = "claude-cloud"
AUTO = "auto"
# Allowed user-facing alias keys (Query pattern 과 동기 — app/api/search.py:457).
_ALLOWED_ALIASES = {GEMMA_MACMINI, QWEN_MACBOOK, MAC_MINI_DEFAULT, CLAUDE_CLOUD, AUTO}
class BackendUnavailable(Exception):
"""명시 backend 가 일시 비가용. /ask wrapper 가 503 으로 매핑."""
def __init__(self, backend_name: str, reason: str):
self.backend_name = backend_name
self.reason = reason
super().__init__(f"{backend_name} unavailable: {reason}")
class BackendBase(ABC):
name: str
@abstractmethod
async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
"""프롬프트 → 본문 (OpenAI 호환 chat completion content).
실패 시 `BackendUnavailable` 또는 일반 예외. 일반 예외는 synthesis_service
가 status="llm_error" 로 매핑 (기존 동작). BackendUnavailable 만 503 으로 매핑.
"""
async def generate_with_tools(
self,
messages: list[dict],
tools: list[dict],
*,
tool_choice: str = "auto",
timeout_read_s: int,
) -> dict:
"""ReAct loop 용 OpenAI 호환 chat completion with tool calling.
Default = NotImplementedError. RouterBackend 와 QwenMacBookBackend (legacy)
만 override. ReAct endpoint 가 미지원 backend 호출하면 명확한 에러.
"""
raise NotImplementedError(
f"{type(self).__name__} does not implement generate_with_tools"
)
# ──────────────────────────────────────────────────────────────────────────
# RouterBackend (PR-2 신규, 기본 path)
# ──────────────────────────────────────────────────────────────────────────
class RouterBackend(BackendBase):
"""모든 ask path 가 llm-router :8890 경유. alias 별 gate 적용.
response shape = router 가 upstream OpenAI 호환 응답을 그대로 forward.
qwen-macbook tool calling response = mlx-vlm OpenAI 표준 호환
(tests/fixtures/qwen_tool_call_response.json, [[reference_mlx_vlm_tool_calling]]).
"""
def __init__(
self,
*,
router_url: str,
alias: str | None,
requires_gate: bool,
timeout_connect_s: int,
):
self.name = alias or AUTO
self.router_url = router_url.rstrip("/")
self.alias = alias # None means "auto" (router rule + triage)
self.requires_gate = requires_gate
self.timeout_connect_s = timeout_connect_s
def _build_payload(
self,
messages_or_prompt,
*,
tools: list[dict] | None = None,
tool_choice: str | None = None,
) -> dict:
if isinstance(messages_or_prompt, str):
payload: dict = {
"messages": [{"role": "user", "content": messages_or_prompt}],
"max_tokens": 4096,
}
else:
payload = {
"messages": messages_or_prompt,
"max_tokens": 4096,
}
if self.alias:
payload["model"] = self.alias
if tools:
payload["tools"] = tools
if tool_choice in ("auto", "none"):
payload["tool_choice"] = tool_choice
return payload
async def _post(self, payload: dict, *, timeout_read_s: int) -> dict:
timeout = httpx.Timeout(
connect=float(self.timeout_connect_s),
read=float(timeout_read_s),
write=10.0,
pool=5.0,
)
url = f"{self.router_url}/v1/chat/completions"
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(url, json=payload)
# router 가 503 (provider_not_configured / 기타 router-side 503) → BackendUnavailable
if resp.status_code == 503:
try:
body = resp.json()
err = body.get("error", {}) if isinstance(body, dict) else {}
reason = (
err.get("type")
or err.get("error_reason")
or "router_503"
)
except Exception:
reason = "router_503"
raise BackendUnavailable(self.name, reason)
# router 가 400 unknown_alias → 코드 bug. 일반 예외 (호출자가 5xx 로 변환)
if resp.status_code == 400:
try:
body = resp.json()
except Exception:
body = {}
raise ValueError(
f"router rejected alias={self.alias!r} body={body!r}"
)
# router 가 502 (upstream unavailable, M5 cold 등) → BackendUnavailable
if resp.status_code == 502:
try:
body = resp.json()
except Exception:
body = {}
raise BackendUnavailable(
self.name,
f"upstream_502_{body.get('error', 'unknown')[:32]}",
)
resp.raise_for_status()
return resp.json()
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.PoolTimeout,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
) as exc:
logger.warning(
"router_backend unavailable alias=%s url=%s exc=%s",
self.alias, url, type(exc).__name__,
)
raise BackendUnavailable(
self.name, f"router_{type(exc).__name__}"
) from exc
except httpx.HTTPStatusError as exc:
if 500 <= exc.response.status_code < 600:
logger.warning(
"router_backend 5xx alias=%s status=%d",
self.alias, exc.response.status_code,
)
raise BackendUnavailable(
self.name, f"router_http_{exc.response.status_code}"
) from exc
raise
async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
payload = self._build_payload(prompt)
if self.requires_gate:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(timeout_read_s):
data = await self._post(payload, timeout_read_s=timeout_read_s)
else:
data = await self._post(payload, timeout_read_s=timeout_read_s)
return data["choices"][0]["message"]["content"]
async def generate_with_tools(
self,
messages: list[dict],
tools: list[dict],
*,
tool_choice: str = "auto",
timeout_read_s: int,
) -> dict:
payload = self._build_payload(
messages, tools=tools, tool_choice=tool_choice,
)
if self.requires_gate:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(timeout_read_s):
data = await self._post(payload, timeout_read_s=timeout_read_s)
else:
data = await self._post(payload, timeout_read_s=timeout_read_s)
return data["choices"][0]["message"]
# ──────────────────────────────────────────────────────────────────────────
# Legacy backends (rollback safety, DS_BACKENDS_VIA_ROUTER=false 시만 사용)
# 1주 후 별 cleanup PR 로 폐기 ([[feedback_closure_gate_vs_observation]] —
# dual-path = rollback safety only, 시간 관찰 게이트 0).
# ──────────────────────────────────────────────────────────────────────────
class GemmaMacMiniBackend(BackendBase):
"""[LEGACY] 기존 Mac mini ai.primary 직접 호출. DS_BACKENDS_VIA_ROUTER=false 시만."""
name = GEMMA_MACMINI
async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
# 지연 import — ai.client 가 settings.ai 의존
from ai.client import AIClient
client = AIClient()
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(timeout_read_s):
return await client._call_chat(client.ai.primary, prompt)
finally:
try:
await client.close()
except Exception:
pass
class QwenMacBookBackend(BackendBase):
"""[LEGACY] MacBook M5 Max mlx-vlm.server (Tailscale) 직접 호출. DS_BACKENDS_VIA_ROUTER=false 시만."""
name = QWEN_MACBOOK
_gate: asyncio.Semaphore | None = None
def __init__(self, base_url: str, model: str, timeout_connect_s: int):
self.base_url = base_url.rstrip("/")
self.model = model
self.timeout_connect_s = timeout_connect_s
@classmethod
def _get_gate(cls) -> asyncio.Semaphore:
if cls._gate is None:
cls._gate = asyncio.Semaphore(1)
return cls._gate
async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
gate = self._get_gate()
timeout = httpx.Timeout(
connect=float(self.timeout_connect_s),
read=float(timeout_read_s),
write=10.0,
pool=5.0,
)
url = f"{self.base_url}/v1/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 4096,
}
async with gate:
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"]
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.PoolTimeout,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
) as exc:
logger.warning(
"qwen-macbook[legacy] unavailable url=%s exc=%s",
url, type(exc).__name__,
)
raise BackendUnavailable(self.name, type(exc).__name__) from exc
except httpx.HTTPStatusError as exc:
if 500 <= exc.response.status_code < 600:
logger.warning(
"qwen-macbook[legacy] 5xx status=%d",
exc.response.status_code,
)
raise BackendUnavailable(
self.name, f"http_{exc.response.status_code}"
) from exc
raise
async def generate_with_tools(
self,
messages: list[dict],
tools: list[dict],
*,
tool_choice: str = "auto",
timeout_read_s: int,
) -> dict:
gate = self._get_gate()
timeout = httpx.Timeout(
connect=float(self.timeout_connect_s),
read=float(timeout_read_s),
write=10.0,
pool=5.0,
)
url = f"{self.base_url}/v1/chat/completions"
payload: dict = {
"model": self.model,
"messages": messages,
"max_tokens": 4096,
}
if tools:
payload["tools"] = tools
if tool_choice in ("auto", "none"):
payload["tool_choice"] = tool_choice
async with gate:
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.PoolTimeout,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
) as exc:
logger.warning(
"qwen-macbook[legacy](tools) unavailable url=%s exc=%s",
url, type(exc).__name__,
)
raise BackendUnavailable(self.name, type(exc).__name__) from exc
except httpx.HTTPStatusError as exc:
if 500 <= exc.response.status_code < 600:
logger.warning(
"qwen-macbook[legacy](tools) 5xx status=%d",
exc.response.status_code,
)
raise BackendUnavailable(
self.name, f"http_{exc.response.status_code}"
) from exc
raise
# ──────────────────────────────────────────────────────────────────────────
# Dispatcher (PR-2: dual-path with DS_BACKENDS_VIA_ROUTER env flag)
# ──────────────────────────────────────────────────────────────────────────
def _via_router() -> bool:
"""`DS_BACKENDS_VIA_ROUTER=true` (default) = RouterBackend.
false 시 legacy GemmaMacMiniBackend/QwenMacBookBackend (rollback safety).
"""
return os.getenv("DS_BACKENDS_VIA_ROUTER", "true").lower() == "true"
_ROUTER_BACKENDS: dict[str, RouterBackend] = {}
_LEGACY_BACKENDS: dict[str, BackendBase] = {}
def _router_url() -> str:
"""router URL = settings 우선, fallback env, fallback hardcoded MVP default."""
cfg = settings.search.ask.backend
cfg_url = getattr(cfg, "router_url", "") or ""
if cfg_url:
return cfg_url
return os.getenv("LLM_ROUTER_URL", "http://100.76.254.116:8890")
def _build_router_backend(alias: str | None, requires_gate: bool) -> RouterBackend:
cfg = settings.search.ask.backend
return RouterBackend(
router_url=_router_url(),
alias=alias,
requires_gate=requires_gate,
timeout_connect_s=cfg.timeout_connect_s,
)
def _build_qwen_backend() -> QwenMacBookBackend:
cfg = settings.search.ask.backend
return QwenMacBookBackend(
base_url=cfg.macbook_url,
model=cfg.macbook_model,
timeout_connect_s=cfg.timeout_connect_s,
)
def _get_router_backend(name: str | None) -> RouterBackend:
"""RouterBackend path. PR-2 default."""
key = (name or "").strip().lower()
if key in ("", GEMMA_MACMINI, MAC_MINI_DEFAULT):
cache_key = MAC_MINI_DEFAULT
if cache_key not in _ROUTER_BACKENDS:
_ROUTER_BACKENDS[cache_key] = _build_router_backend(
alias=MAC_MINI_DEFAULT, requires_gate=True,
)
return _ROUTER_BACKENDS[cache_key]
if key == QWEN_MACBOOK:
if QWEN_MACBOOK not in _ROUTER_BACKENDS:
_ROUTER_BACKENDS[QWEN_MACBOOK] = _build_router_backend(
alias=QWEN_MACBOOK, requires_gate=False,
)
return _ROUTER_BACKENDS[QWEN_MACBOOK]
if key == CLAUDE_CLOUD:
if CLAUDE_CLOUD not in _ROUTER_BACKENDS:
_ROUTER_BACKENDS[CLAUDE_CLOUD] = _build_router_backend(
alias=CLAUDE_CLOUD, requires_gate=False,
)
return _ROUTER_BACKENDS[CLAUDE_CLOUD]
if key == AUTO:
if AUTO not in _ROUTER_BACKENDS:
# auto = router 의 rule + triage. tier_b 갈 가능성 큼 → gate 보호 보수적.
_ROUTER_BACKENDS[AUTO] = _build_router_backend(
alias=None, requires_gate=True,
)
return _ROUTER_BACKENDS[AUTO]
raise ValueError(f"unknown backend: {name!r}")
def _get_legacy_backend(name: str | None) -> BackendBase:
"""Rollback path. DS_BACKENDS_VIA_ROUTER=false 시만."""
key = (name or "").strip().lower() or GEMMA_MACMINI
if key == MAC_MINI_DEFAULT:
key = GEMMA_MACMINI # legacy 는 mac-mini-default alias 모름
if key == AUTO:
key = GEMMA_MACMINI # legacy 에 auto 개념 없음 → default 로
if key == CLAUDE_CLOUD:
raise ValueError(
f"backend {CLAUDE_CLOUD!r} requires DS_BACKENDS_VIA_ROUTER=true"
)
if key not in (GEMMA_MACMINI, QWEN_MACBOOK):
raise ValueError(f"unknown backend: {name!r}")
if key not in _LEGACY_BACKENDS:
if key == GEMMA_MACMINI:
_LEGACY_BACKENDS[key] = GemmaMacMiniBackend()
else:
_LEGACY_BACKENDS[key] = _build_qwen_backend()
return _LEGACY_BACKENDS[key]
def get_backend(name: str | None) -> BackendBase:
"""name 으로 backend 인스턴스 반환 (캐싱).
DS_BACKENDS_VIA_ROUTER=true (default, PR-2) → RouterBackend
DS_BACKENDS_VIA_ROUTER=false → legacy GemmaMacMiniBackend / QwenMacBookBackend
"""
if _via_router():
return _get_router_backend(name)
return _get_legacy_backend(name)
def reset_backends_for_test() -> None:
"""test fixture 가 settings 변경 후 backend 인스턴스 재생성하려고 호출.
production code 에서 사용 X.
"""
_ROUTER_BACKENDS.clear()
_LEGACY_BACKENDS.clear()
QwenMacBookBackend._gate = None