hyungi_document_server/app/services/llm/backends.py
hyungi 51c3f6df10 feat(search): /ask/react endpoint with Qwen native tool calling ReAct loop
PR-DocSrv-Ask-ToolCalling-ReAct-1: introduces a ReAct loop built on the
native tool calling of Qwen3.6-27B-8bit. The existing /api/search/ask is
unmodified. Zero file-level conflicts with track B (frontend /ask SSE): the
line diff of the ask() function in search.py is 0 (pure addition).

Core invariants:
- Separate endpoint /api/search/ask/react (qwen-macbook only, implicit opt-in)
- When the MacBook is unavailable: HTTP 503 + error_reason=macbook_unavailable.
  No automatic Gemma fallback (an extension of Correction 4)
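
A minimal sketch of the 503 invariant above, assuming nothing beyond the standard library. `to_http_response` and the `detail` field are hypothetical names used only for illustration; the real endpoint returns a JSONResponse, and the exception class is redefined here only so the snippet runs on its own.

```python
class BackendUnavailable(Exception):
    """Stand-in for the exception in backends.py, redefined for a standalone sketch."""

    def __init__(self, backend_name: str, reason: str):
        self.backend_name = backend_name
        self.reason = reason
        super().__init__(f"{backend_name} unavailable: {reason}")


def to_http_response(exc: BackendUnavailable) -> tuple[int, dict]:
    # Hypothetical helper: returns (status_code, JSON body) rather than a
    # framework response object. Note that no Gemma retry is attempted.
    # The "detail" key is an assumption, not part of the documented contract.
    return 503, {"error_reason": "macbook_unavailable", "detail": exc.reason}


status, body = to_http_response(BackendUnavailable("qwen-macbook", "ConnectError"))
print(status, body["error_reason"])
```

The point of the sketch is the absence of a fallback branch: the only outcome of an unavailable MacBook is the 503 payload.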

G0 (hard gate before implementation, plan b-velvety-hare.md):
- G0-1 fixture (tests/fixtures/qwen_tool_call_response.json): a captured
  real mlx-vlm response. shape = OpenAI-standard compatible
  (choices[0].message.tool_calls + function.arguments as a JSON string).
  generate_with_tools() is implemented against this shape.
- G0-2 counter semantics: max_tool_rounds=2 + max_llm_calls=3 + search_exec_max=2.
  The last LLM call forces a final answer via tool_choice="none" + a system
  instruction.
- G0-3 trace exposure: debug_trace=null in the default response. It is
  populated only when debug=true. Rounds are always recorded in the server log.
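
The G0-2 counters can be sketched as a bounded loop. This is an illustrative reading of the caps, not the contents of react_loop.py: `bounded_react`, `fake_llm`, the system instruction text, and the "search budget exhausted" message are all assumptions.

```python
import json

MAX_TOOL_ROUNDS = 2   # rounds in which the model may request tools
MAX_LLM_CALLS = 3     # tool rounds plus one forced final call
SEARCH_EXEC_MAX = 2   # total search executions across all rounds


def bounded_react(call_llm, run_search, query: str) -> dict:
    """call_llm(messages, tool_choice) -> message dict; run_search(q) -> list."""
    messages = [{"role": "user", "content": query}]
    llm_calls = 0
    searches = 0
    for _ in range(MAX_TOOL_ROUNDS):
        msg = call_llm(messages, "auto")
        llm_calls += 1
        tool_calls = msg.get("tool_calls") or []
        if not tool_calls:
            # The model answered directly; no forced final call is needed.
            return {"answer": msg.get("content") or "",
                    "llm_calls": llm_calls, "searches": searches}
        messages.append(msg)
        for tc in tool_calls:
            if searches < SEARCH_EXEC_MAX:
                args = json.loads(tc["function"]["arguments"])
                content = json.dumps(run_search(args["query"]))
                searches += 1
            else:
                content = "search budget exhausted"
            messages.append({"role": "tool", "tool_call_id": tc["id"],
                             "content": content})
    # Tool rounds are used up: force a final answer with tool_choice="none".
    messages.append({"role": "system",
                     "content": "Answer now using the results above."})
    msg = call_llm(messages, "none")
    llm_calls += 1
    return {"answer": msg.get("content") or "",
            "llm_calls": llm_calls, "searches": searches}


def fake_llm(messages, tool_choice):
    # Test double that always asks for a search while tools are allowed.
    if tool_choice == "auto":
        return {"role": "assistant", "content": None, "tool_calls": [
            {"id": "c1", "function": {"name": "search",
                                      "arguments": json.dumps({"query": "q"})}}]}
    return {"role": "assistant", "content": "final answer"}


result = bounded_react(fake_llm, lambda q: [{"id": 1}], "question")
print(result)
```

With a model that never stops requesting tools, the loop still ends after two tool rounds and one forced final call, which is where max_llm_calls=3 comes from.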

backends.py (193 → 261 lines):
- New method QwenMacBookBackend.generate_with_tools(messages, tools, tool_choice).
  The existing generate() is unmodified. BackendUnavailable handling is identical.
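
Because function.arguments arrives as a JSON string, callers of generate_with_tools() need a decode step. A small sketch follows; the `message` dict is a hypothetical example in the OpenAI-compatible shape, not a copy of the real fixture, and `extract_calls` is an illustrative helper name.

```python
import json

# Hypothetical assistant message; values are illustrative only.
message = {
    "role": "assistant",
    "content": None,
    "tool_calls": [
        {
            "id": "call_0",
            "type": "function",
            "function": {
                "name": "search",
                "arguments": "{\"query\": \"backup policy\", \"limit\": 5}",
            },
        }
    ],
}


def extract_calls(msg: dict) -> list[tuple[str, dict]]:
    """Return (tool name, decoded arguments) pairs; empty if no tool calls."""
    calls = []
    for tc in msg.get("tool_calls") or []:
        fn = tc["function"]
        # arguments is a JSON string, so it must be decoded before use.
        calls.append((fn["name"], json.loads(fn["arguments"])))
    return calls


parsed = extract_calls(message)
print(parsed)
```

The `or []` guard covers both a missing `tool_calls` key and an explicit null, which is how a plain-content reply would look.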

react_loop.py, new (275 lines):
- agentic_ask_loop(session, query, *, backend, max_tool_rounds, debug)
- Within a tool round it calls run_search, dedups results by id, forces the
  final round, and sets partial=True when the final content is empty
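
The dedup and partial conditions above could look roughly like this. Both helper names are hypothetical, and treating whitespace-only content as empty is an assumption rather than documented behavior.

```python
def dedup_by_id(batches: list[list[dict]]) -> list[dict]:
    """Merge search result batches, keeping the first hit seen for each id."""
    seen = set()
    merged = []
    for batch in batches:
        for hit in batch:
            if hit["id"] not in seen:
                seen.add(hit["id"])
                merged.append(hit)
    return merged


def is_partial(final_content) -> bool:
    # Assumption: None, "" and whitespace-only content all count as empty.
    return not (final_content or "").strip()


round_1 = [{"id": 10, "title": "a"}, {"id": 11, "title": "b"}]
round_2 = [{"id": 11, "title": "b"}, {"id": 12, "title": "c"}]
sources = dedup_by_id([round_1, round_2])
print([s["id"] for s in sources], is_partial(""), is_partial("answer"))
```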

search.py (+82 lines):
- POST /api/search/ask/react + AskReactRequest/Response schema
- BackendUnavailable → JSONResponse(503, error_reason=macbook_unavailable)

config.yaml + config.py:
- search.ask.react: { enabled, max_tool_rounds=2, search_tool_limit=5,
  search_tool_mode=hybrid }
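
One way to model the search.ask.react section in code, as a sketch only. `ReactConfig` and `load_react_config` are hypothetical names, and since the commit message gives no default for `enabled`, it is left as a required field here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReactConfig:
    enabled: bool                    # no default is stated in the source
    max_tool_rounds: int = 2
    search_tool_limit: int = 5
    search_tool_mode: str = "hybrid"


def load_react_config(raw: dict) -> ReactConfig:
    """Pick search.ask.react out of an already-parsed config mapping."""
    section = raw.get("search", {}).get("ask", {}).get("react", {})
    return ReactConfig(**section)


cfg = load_react_config({"search": {"ask": {"react": {"enabled": True}}}})
print(cfg)
```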

tests (566 lines, 18 new + 23 regression, all PASS):
- test_react_loop.py, 13 cases: G0-1 fixture shape / G0-2 counter cap / G0-3 trace
  exposure / BackendUnavailable propagation / sources dedup
- test_search_ask_react_endpoint.py, 5 cases: 503 + zero run_search calls /
  normal 200 / trace exposed with debug=true / max rounds partial
- Regression (test_ask_eval_auth 9 + test_search_ask_macbook_503 5 +
  test_backend_dispatcher 9) all PASS

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 13:43:47 +00:00

262 lines
9.2 KiB
Python

"""PR-MacBook-RAG-Backend-1: explicit backend dispatcher for /api/search/ask.

## Policy (Correction 4)
- Default (`backend` unspecified) = Gemma Mac mini. The existing code path is
  100% preserved.
- Only the explicit opt-in `backend="qwen-macbook"` calls the MacBook M5 Max
  mlx-vlm.server.
- When the MacBook is unavailable, `BackendUnavailable` is raised and the /ask
  wrapper responds with 503 + `error_reason="macbook_unavailable"`.
  **Automatic Gemma fallback is forbidden**.

## Permanent rules
- The Qwen backend **must not occupy the Mac mini llm_gate**. Separate
  endpoint, separate concurrency.
  → A dedicated MacBook-only `asyncio.Semaphore(1)` (single-inference assumption).
- The Gemma backend keeps the existing path as-is
  (acquire_mlx_gate(FOREGROUND) + ai.primary). This preserves the llm_gate
  permanent rule (the [[feedback_docstring_invariant_swap_audit]] case).
"""
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import httpx

from core.config import settings
from core.utils import setup_logger
from services.search.llm_gate import Priority, acquire_mlx_gate

if TYPE_CHECKING:
    from ai.client import AIClient

logger = setup_logger("llm_backend")

# Explicit backend identifiers. None / "gemma-macmini" select the default Gemma path.
QWEN_MACBOOK = "qwen-macbook"
GEMMA_MACMINI = "gemma-macmini"


class BackendUnavailable(Exception):
    """An explicitly requested backend is temporarily unavailable. The /ask wrapper maps this to 503."""

    def __init__(self, backend_name: str, reason: str):
        self.backend_name = backend_name
        self.reason = reason
        super().__init__(f"{backend_name} unavailable: {reason}")


class BackendBase(ABC):
    name: str

    @abstractmethod
    async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
        """Prompt → body text (OpenAI-compatible chat completion content).

        On failure raises `BackendUnavailable` or a generic exception. Generic
        exceptions are mapped by synthesis_service to status="llm_error"
        (existing behavior). Only BackendUnavailable maps to 503.
        """


class GemmaMacMiniBackend(BackendBase):
    """The existing Mac mini ai.primary path as-is. Zero code changes on this path."""

    name = GEMMA_MACMINI

    async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
        # Lazy import: ai.client depends on settings.ai
        from ai.client import AIClient

        client = AIClient()
        try:
            async with acquire_mlx_gate(Priority.FOREGROUND):
                async with asyncio.timeout(timeout_read_s):
                    return await client._call_chat(client.ai.primary, prompt)
        finally:
            try:
                await client.close()
            except Exception:
                pass


class QwenMacBookBackend(BackendBase):
    """Direct call to the MacBook M5 Max mlx-vlm.server (over Tailscale).

    - Does not occupy the Mac mini llm_gate (meaningless for a separate
      endpoint, and not subject to the permanent no-queue-splitting rule)
    - The MacBook itself is assumed to be single-inference → separate semaphore(1)
    - Connection refused / DNS / timeout / 5xx → BackendUnavailable
    """

    name = QWEN_MACBOOK
    _gate: asyncio.Semaphore | None = None

    def __init__(self, base_url: str, model: str, timeout_connect_s: int):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout_connect_s = timeout_connect_s

    @classmethod
    def _get_gate(cls) -> asyncio.Semaphore:
        if cls._gate is None:
            cls._gate = asyncio.Semaphore(1)
        return cls._gate

    async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
        gate = self._get_gate()
        timeout = httpx.Timeout(
            connect=float(self.timeout_connect_s),
            read=float(timeout_read_s),
            write=10.0,
            pool=5.0,
        )
        url = f"{self.base_url}/v1/chat/completions"
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 4096,
        }
        async with gate:
            try:
                async with httpx.AsyncClient(timeout=timeout) as client:
                    resp = await client.post(url, json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                    return data["choices"][0]["message"]["content"]
            except (
                httpx.ConnectError,
                httpx.ConnectTimeout,
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                httpx.WriteTimeout,
                httpx.RemoteProtocolError,
            ) as exc:
                logger.warning(
                    "qwen-macbook unavailable url=%s exc=%s",
                    url, type(exc).__name__,
                )
                raise BackendUnavailable(self.name, type(exc).__name__) from exc
            except httpx.HTTPStatusError as exc:
                # Only 5xx counts as unavailable; 4xx is a caller error and
                # propagates as a generic exception.
                if 500 <= exc.response.status_code < 600:
                    logger.warning(
                        "qwen-macbook 5xx status=%d", exc.response.status_code,
                    )
                    raise BackendUnavailable(
                        self.name, f"http_{exc.response.status_code}"
                    ) from exc
                raise

    async def generate_with_tools(
        self,
        messages: list[dict],
        tools: list[dict],
        *,
        tool_choice: str = "auto",
        timeout_read_s: int,
    ) -> dict:
        """OpenAI-compatible chat completion with tool calling (for the ReAct loop).

        Returns: the `choices[0].message` dict as-is, containing both
        `content` (Optional[str]) and `tool_calls` (Optional[list]).

        Response shape follows the G0-1 fixture
        `tests/fixtures/qwen_tool_call_response.json` (mlx-vlm, OpenAI-standard
        compatible). tool_calls[].function.arguments arrives as a
        **JSON string**, so the caller must json.loads it.

        - `tool_choice="auto"`: the model decides whether to call a tool
        - `tool_choice="none"`: tool calls forbidden, content only (final round)
        - `tools=[]` + `tool_choice="none"`: force a final answer without tool definitions
        """
        gate = self._get_gate()
        timeout = httpx.Timeout(
            connect=float(self.timeout_connect_s),
            read=float(timeout_read_s),
            write=10.0,
            pool=5.0,
        )
        url = f"{self.base_url}/v1/chat/completions"
        payload: dict = {
            "model": self.model,
            "messages": messages,
            "max_tokens": 4096,
        }
        if tools:
            payload["tools"] = tools
            if tool_choice in ("auto", "none"):
                payload["tool_choice"] = tool_choice
        async with gate:
            try:
                async with httpx.AsyncClient(timeout=timeout) as client:
                    resp = await client.post(url, json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                    return data["choices"][0]["message"]
            except (
                httpx.ConnectError,
                httpx.ConnectTimeout,
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                httpx.WriteTimeout,
                httpx.RemoteProtocolError,
            ) as exc:
                logger.warning(
                    "qwen-macbook(tools) unavailable url=%s exc=%s",
                    url, type(exc).__name__,
                )
                raise BackendUnavailable(self.name, type(exc).__name__) from exc
            except httpx.HTTPStatusError as exc:
                if 500 <= exc.response.status_code < 600:
                    logger.warning(
                        "qwen-macbook(tools) 5xx status=%d", exc.response.status_code,
                    )
                    raise BackendUnavailable(
                        self.name, f"http_{exc.response.status_code}"
                    ) from exc
                raise


# ── dispatcher ─────────────────────────────────────────────────────────────

_BACKENDS: dict[str, BackendBase] = {}


def _build_qwen_backend() -> QwenMacBookBackend:
    b = settings.search.ask.backend
    return QwenMacBookBackend(
        base_url=b.macbook_url,
        model=b.macbook_model,
        timeout_connect_s=b.timeout_connect_s,
    )


def get_backend(name: str | None) -> BackendBase:
    """Return the backend instance for `name` (cached).

    - None / "" / "gemma-macmini" → Gemma Mac mini (default)
    - "qwen-macbook" → MacBook Qwen
    - anything else → ValueError (the caller maps it to 400)
    """
    key = (name or "").strip().lower() or GEMMA_MACMINI
    if key not in (GEMMA_MACMINI, QWEN_MACBOOK):
        raise ValueError(f"unknown backend: {name!r}")
    if key not in _BACKENDS:
        if key == GEMMA_MACMINI:
            _BACKENDS[key] = GemmaMacMiniBackend()
        else:
            _BACKENDS[key] = _build_qwen_backend()
    return _BACKENDS[key]


def reset_backends_for_test() -> None:
    """Called by test fixtures to rebuild backend instances after changing settings.

    Not for use in production code.
    """
    _BACKENDS.clear()
    QwenMacBookBackend._gate = None