feat(search): /ask backend dispatcher (qwen-macbook opt-in, no silent fallback)

PR-MacBook-RAG-Backend-1 — /api/search/ask 의 명시 backend 선택 진입점.

핵심 invariant (정정 4):
- backend 미지정 = Gemma Mac mini default, 응답 contract 변동 0
- backend="qwen-macbook" 명시 opt-in 만 MacBook M5 Max mlx-vlm.server 호출
- MacBook unavailable 시 HTTP 503 + error_reason=macbook_unavailable
- 자동 fallback 절대 금지 — 실패 path 에서 Gemma backend.generate() 호출 0

backend dispatcher (services/llm/):
- BackendBase / GemmaMacMiniBackend / QwenMacBookBackend / BackendUnavailable
- Qwen backend 는 Mac mini llm_gate 점유 X, 별 Semaphore(1) — llm_gate
  docstring 의 single-inference 영구 룰은 같은 endpoint 한정으로 scope 명시
- httpx Connect/Read/Pool/Timeout/5xx → BackendUnavailable, 4xx 전파

synthesis_service.py:
- backend 인자 추가, status="backend_unavailable" 신규
- cache key 에 backend_name 포함 (qwen ↔ gemma 캐시 충돌 차단)

config:
- search.ask.backend.{macmini_url, macbook_url, macbook_model,
  timeout_connect_s=1, timeout_read_s=30}
- MacBook endpoint = http://100.118.112.84:8810 (M5 Max Tailscale bind)

tests (14 신규):
- tests/services/test_backend_dispatcher.py (9): dispatcher 정합성 + Qwen
  generate path (mock 200 / dead port / 5xx / 4xx) + cache identity
- tests/api/test_search_ask_macbook_503.py (5): 정정 4 핵심 invariant.
  backend=qwen-macbook 비가용 시 gemma.generate.assert_not_called()

기존 ask 회귀 0 (test_ask_eval_auth 9건 등 85건 모두 PASS).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-05-22 12:38:48 +00:00
parent 224843ba25
commit a7b8f15870
9 changed files with 910 additions and 42 deletions
+72 -3
View File
@@ -15,6 +15,7 @@ import time
from typing import Annotated, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, Header, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
@@ -261,7 +262,10 @@ class AskResponse(BaseModel):
ai_answer: str | None
citations: list[Citation]
synthesis_status: Literal[
"completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error"
"completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error",
# PR-MacBook-RAG-Backend-1: 200 응답에는 등장하지 않음 (해당 status 는 503 분기).
# Literal 호환성 위해 포함.
"backend_unavailable",
]
synthesis_ms: float
confidence: Literal["high", "medium", "low"] | None
@@ -274,6 +278,11 @@ class AskResponse(BaseModel):
covered_aspects: list[str] | None = None
missing_aspects: list[str] | None = None
confirmed_items: list[ConfirmedItem] | None = None
# PR-MacBook-RAG-Backend-1: backend dispatcher metadata.
# backend 미지정 호출은 둘 다 None 으로 유지 (기존 호출자 호환 — Hermes docsrv_ask /
# voice-memo-bot 응답 형식 변동 0). 명시 opt-in 시만 채워짐.
backend_requested: str | None = None
backend_used: str | None = None
debug: AskDebug | None = None
@@ -445,6 +454,19 @@ async def ask(
background_tasks: BackgroundTasks,
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
backend: Annotated[
str | None,
Query(
pattern="^(qwen-macbook|gemma-macmini)$",
description=(
"PR-MacBook-RAG-Backend-1: 명시 backend opt-in. "
"미지정 = gemma-macmini (Mac mini, default). "
"'qwen-macbook' = MacBook M5 Max Qwen 3.6 27B. "
"MacBook unavailable 시 503 + error_reason=macbook_unavailable "
"(자동 fallback 없음 — 다시 호출하거나 backend 인자 제거 후 재시도)."
),
),
] = None,
x_source: Annotated[str | None, Header(alias="X-Source")] = None,
x_eval_case_id: Annotated[str | None, Header(alias="X-Eval-Case-Id")] = None,
x_eval_token: Annotated[str | None, Header(alias="X-Eval-Token")] = None,
@@ -617,14 +639,55 @@ async def ask(
completeness="insufficient",
covered_aspects=classifier_result.covered_aspects or None,
missing_aspects=classifier_result.missing_aspects or None,
# refusal gate 단계에서는 backend 호출 자체가 일어나지 않음 →
# backend_used = None. backend_requested 는 호출자 의도 표시용.
backend_requested=backend,
backend_used=None,
debug=debug_obj,
)
# 4. Synthesis
# 4. Synthesis (backend dispatcher 적용 — PR-MacBook-RAG-Backend-1)
t_synth = time.perf_counter()
sr = await synthesize(q, evidence, debug=debug)
sr = await synthesize(q, evidence, debug=debug, backend=backend)
synth_ms = (time.perf_counter() - t_synth) * 1000
# 4.1. backend_unavailable → 503 fail-fast (자동 fallback 금지)
# 명시 opt-in backend (예: qwen-macbook) 가 비가용일 때만 발생. /ask wrapper 는
# 절대 다른 backend 로 재시도하지 않음. 사용자가 backend 인자 제거 또는 wake 후 재시도.
if sr.status == "backend_unavailable":
backend_requested_val = backend or "gemma-macmini"
total_ms = (time.perf_counter() - t_total) * 1000
logger.warning(
"ask backend_unavailable backend=%s query=%r total_ms=%.0f flags=%s",
backend_requested_val, q[:80], total_ms,
",".join(sr.hallucination_flags) if sr.hallucination_flags else "-",
)
# error_reason 명명 — macbook_unavailable 만 정착 (자동 fallback 부재).
error_reason = (
"macbook_unavailable"
if backend_requested_val == "qwen-macbook"
else "backend_unavailable"
)
# telemetry — search 만 기록 (ask_events 는 200 응답 path 전용)
background_tasks.add_task(
record_search_event, q, user.id, pr.results, "hybrid",
pr.confidence_signal, pr.analyzer_confidence,
)
return JSONResponse(
status_code=503,
content={
"error": "backend_unavailable",
"error_reason": error_reason,
"backend_requested": backend_requested_val,
"backend_used": None,
"query": q,
"detail": (
"명시 선택한 backend 가 일시적으로 응답할 수 없습니다. "
"MacBook 깨우거나 backend 인자를 제거하고 (기본 Gemma) 다시 호출하세요."
),
},
)
# 5. Grounding check + Verifier (조건부 병렬) + re-gate (Phase 3.5b)
grounding = grounding_check(q, sr.answer or "", evidence)
@@ -846,6 +909,10 @@ async def ask(
defense_layers=defense_log,
)
# backend_used: synthesize 가 실제 호출한 backend (backend 인자 그대로 신뢰 OK —
# backend_unavailable 은 위 503 분기에서 이미 return 됨).
backend_used_val = backend or "gemma-macmini"
return AskResponse(
results=pr.results,
ai_answer=sr.answer,
@@ -861,5 +928,7 @@ async def ask(
covered_aspects=covered_aspects,
missing_aspects=missing_aspects,
confirmed_items=confirmed_items,
backend_requested=backend,
backend_used=backend_used_val,
debug=debug_obj,
)
+34
View File
@@ -35,6 +35,29 @@ class DeepSummaryBacklogConfig(BaseModel):
window_minutes: int = 30
class SearchAskBackendConfig(BaseModel):
"""PR-MacBook-RAG-Backend-1: /api/search/ask backend dispatcher.
backend 미지정 = Gemma Mac mini (settings.ai.primary 경로 그대로).
backend="qwen-macbook" 명시 opt-in = MacBook M5 Max mlx-vlm.server.
MacBook unavailable 시 503 + error_reason=macbook_unavailable (자동 fallback 없음).
"""
macmini_url: str = "http://100.76.254.116:8801"
macbook_url: str = "http://100.118.112.84:8810"
macbook_model: str = "mlx-community/Qwen3.6-27B-8bit"
timeout_connect_s: int = 1
timeout_read_s: int = 30
class SearchAskConfig(BaseModel):
backend: SearchAskBackendConfig = SearchAskBackendConfig()
class SearchConfig(BaseModel):
ask: SearchAskConfig = SearchAskConfig()
class AIConfig(BaseModel):
gateway_endpoint: str
# B-0: 3-tier routing. triage/primary = Mac mini 26B MLX (PR #20 endpoint 통합). fallback = Claude Sonnet 4 API.
@@ -62,6 +85,9 @@ class Settings(BaseModel):
# AI
ai: AIConfig | None = None
# PR-MacBook-RAG-Backend-1: /api/search/ask backend dispatcher
search: SearchConfig = SearchConfig()
# NAS
nas_mount_path: str = "/documents"
nas_pkm_root: str = "/documents/PKM"
@@ -171,6 +197,13 @@ def load_settings() -> Settings:
nas_mount = raw["nas"].get("mount_path", nas_mount)
nas_pkm = raw["nas"].get("pkm_root", nas_pkm)
search_cfg = SearchConfig()
if config_path.exists() and raw and "search" in raw:
sb = (raw.get("search") or {}).get("ask", {}).get("backend", {}) or {}
search_cfg = SearchConfig(
ask=SearchAskConfig(backend=SearchAskBackendConfig(**sb))
)
taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {}
document_types = raw.get("document_types", []) if config_path.exists() and raw else []
upload_cfg = (
@@ -182,6 +215,7 @@ def load_settings() -> Settings:
return Settings(
database_url=database_url,
ai=ai_config,
search=search_cfg,
nas_mount_path=nas_mount,
nas_pkm_root=nas_pkm,
jwt_secret=jwt_secret,
+24
View File
@@ -0,0 +1,24 @@
"""PR-MacBook-RAG-Backend-1: /api/search/ask backend dispatcher.
이 패키지는 ask 의 LLM 호출자만 사용한다. 다른 generation 경로 (classifier /
verifier / evidence / triage / digest 등) 는 본 dispatcher 를 통과하지 않는다 —
모두 Mac mini ai.primary 로 고정.
"""
from .backends import (
BackendBase,
BackendUnavailable,
GemmaMacMiniBackend,
QwenMacBookBackend,
get_backend,
reset_backends_for_test,
)
__all__ = [
"BackendBase",
"BackendUnavailable",
"GemmaMacMiniBackend",
"QwenMacBookBackend",
"get_backend",
"reset_backends_for_test",
]
+193
View File
@@ -0,0 +1,193 @@
"""PR-MacBook-RAG-Backend-1: /api/search/ask 의 명시 backend dispatcher.
## 정책 (정정 4)
- 기본 (`backend` 미지정) = Gemma Mac mini. 기존 코드 경로 100% 보존.
- 명시 opt-in `backend="qwen-macbook"` 만 MacBook M5 Max mlx-vlm.server 호출.
- MacBook unavailable 시 `BackendUnavailable` 예외 → /ask wrapper 가 503 +
`error_reason="macbook_unavailable"` 응답. **Gemma 자동 fallback 금지**.
## 영구 룰
- Qwen backend 는 **Mac mini llm_gate 점유 금지**. 별 endpoint, 별 concurrency.
→ MacBook 전용 `asyncio.Semaphore(1)` (single-inference 가정) 분리.
- Gemma backend 는 기존 path 그대로 (acquire_mlx_gate(FOREGROUND) + ai.primary).
llm_gate 영구 룰 ([[feedback_docstring_invariant_swap_audit]] 케이스) 보존.
"""
from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
import httpx
from core.config import settings
from core.utils import setup_logger
from services.search.llm_gate import Priority, acquire_mlx_gate
if TYPE_CHECKING:
from ai.client import AIClient
logger = setup_logger("llm_backend")
# 명시 backend 식별자. None / "gemma-macmini" 는 default Gemma path.
QWEN_MACBOOK = "qwen-macbook"
GEMMA_MACMINI = "gemma-macmini"
class BackendUnavailable(Exception):
"""명시 backend 가 일시 비가용. /ask wrapper 가 503 으로 매핑."""
def __init__(self, backend_name: str, reason: str):
self.backend_name = backend_name
self.reason = reason
super().__init__(f"{backend_name} unavailable: {reason}")
class BackendBase(ABC):
name: str
@abstractmethod
async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
"""프롬프트 → 본문 (OpenAI 호환 chat completion content).
실패 시 `BackendUnavailable` 또는 일반 예외. 일반 예외는 synthesis_service
가 status="llm_error" 로 매핑 (기존 동작). BackendUnavailable 만 503 으로 매핑.
"""
class GemmaMacMiniBackend(BackendBase):
"""기존 Mac mini ai.primary 경로 그대로. 코드 변경 0 path."""
name = GEMMA_MACMINI
async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
# 지연 import — ai.client 가 settings.ai 의존
from ai.client import AIClient
client = AIClient()
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(timeout_read_s):
return await client._call_chat(client.ai.primary, prompt)
finally:
try:
await client.close()
except Exception:
pass
class QwenMacBookBackend(BackendBase):
"""MacBook M5 Max mlx-vlm.server (Tailscale) 직접 호출.
- Mac mini llm_gate 점유 X (별 endpoint 라 의미 없음 + 큐 분할 금지 영구 룰의
대상이 아님)
- MacBook 자체 single-inference 가정 → 별 semaphore(1)
- 연결 거부 / DNS / timeout / 5xx → BackendUnavailable
"""
name = QWEN_MACBOOK
_gate: asyncio.Semaphore | None = None
def __init__(self, base_url: str, model: str, timeout_connect_s: int):
self.base_url = base_url.rstrip("/")
self.model = model
self.timeout_connect_s = timeout_connect_s
@classmethod
def _get_gate(cls) -> asyncio.Semaphore:
if cls._gate is None:
cls._gate = asyncio.Semaphore(1)
return cls._gate
async def generate(self, prompt: str, *, timeout_read_s: int) -> str:
gate = self._get_gate()
timeout = httpx.Timeout(
connect=float(self.timeout_connect_s),
read=float(timeout_read_s),
write=10.0,
pool=5.0,
)
url = f"{self.base_url}/v1/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 4096,
}
async with gate:
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"]
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.PoolTimeout,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
) as exc:
logger.warning(
"qwen-macbook unavailable url=%s exc=%s",
url, type(exc).__name__,
)
raise BackendUnavailable(self.name, type(exc).__name__) from exc
except httpx.HTTPStatusError as exc:
# 5xx 만 unavailable, 4xx 는 호출자 잘못 → 일반 예외 전파
if 500 <= exc.response.status_code < 600:
logger.warning(
"qwen-macbook 5xx status=%d", exc.response.status_code,
)
raise BackendUnavailable(
self.name, f"http_{exc.response.status_code}"
) from exc
raise
# ── dispatcher ─────────────────────────────────────────────────────────────
_BACKENDS: dict[str, BackendBase] = {}
def _build_qwen_backend() -> QwenMacBookBackend:
b = settings.search.ask.backend
return QwenMacBookBackend(
base_url=b.macbook_url,
model=b.macbook_model,
timeout_connect_s=b.timeout_connect_s,
)
def get_backend(name: str | None) -> BackendBase:
"""name 으로 backend 인스턴스 반환 (캐싱).
- None / "" / "gemma-macmini" → Gemma Mac mini (default)
- "qwen-macbook" → MacBook Qwen
- 그 외 → ValueError (호출자가 400 으로 매핑)
"""
key = (name or "").strip().lower() or GEMMA_MACMINI
if key not in (GEMMA_MACMINI, QWEN_MACBOOK):
raise ValueError(f"unknown backend: {name!r}")
if key not in _BACKENDS:
if key == GEMMA_MACMINI:
_BACKENDS[key] = GemmaMacMiniBackend()
else:
_BACKENDS[key] = _build_qwen_backend()
return _BACKENDS[key]
def reset_backends_for_test() -> None:
"""test fixture 가 settings 변경 후 backend 인스턴스 재생성하려고 호출.
production code 에서 사용 X.
"""
_BACKENDS.clear()
QwenMacBookBackend._gate = None
+15 -8
View File
@@ -3,17 +3,24 @@
Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout).
이 모듈은 analyzer / evidence / classifier / synthesis 등 **모든 MLX-bound LLM
호출**이 공유하는 **우선순위 기반 gate** 를 제공한다. concurrency 는 1 고정이지만
queue 의 ordering 은 `Priority.FOREGROUND` (user-facing ask) 가 `Priority.BACKGROUND`
(digest/briefing/worker) 보다 먼저 dispatch.
이 모듈은 analyzer / evidence / classifier / synthesis(gemma-macmini backend
한정) 등 **Mac mini MLX endpoint 로 향하는 모든 호출**이 공유하는 **우선순위
기반 gate** 를 제공한다. concurrency 는 1 고정이지만 queue 의 ordering 은
`Priority.FOREGROUND` (user-facing ask) 가 `Priority.BACKGROUND` (digest/
briefing/worker) 보다 먼저 dispatch.
PR-MacBook-RAG-Backend-1 부터 `services.llm.QwenMacBookBackend` 는 별 endpoint
(MacBook mlx-vlm.server) 라 본 gate 와 무관 — 자체 Semaphore(1) 사용.
## 영구 룰
- **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
evidence / classifier / synthesis 4 곳이 현재 사용자. 이후 경로가 늘어도
동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시
동시 실행 발생).
- **Mac mini MLX endpoint 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
evidence / classifier / `synthesis (gemma-macmini backend)` 가 현재 사용자.
이후 경로가 늘어도 **같은 Mac mini endpoint** 라면 동일 gate를 import해서
사용한다. 새 Semaphore를 만들지 말 것 (같은 endpoint 에서 큐 분할 시 동시 실행
발생, [[feedback_docstring_invariant_swap_audit]] PR #20 사고 케이스).
다른 endpoint (MacBook 등) 는 그 endpoint 전용 별 gate 를 둔다 — 본 gate 와
무관.
- **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을
걸면 "대기만으로 timeout 발동" 버그가 재발한다(query_analyzer 초기 이슈).
- **fallback(Claude Sonnet 4 API) 경로는 gate 제외**. PR #20 이후 fallback = Claude API. 단 현재
+71 -31
View File
@@ -9,10 +9,18 @@ evidence span 을 Gemma 4 에 전달해 citation 기반 답변을 생성한다.
`EvidenceItem.full_snippet` 을 프롬프트에 포함하면 LLM 이 span 밖 내용을
hallucinate 한다. 이 규칙이 깨지면 시스템 무너짐 → docstring + 코드 패턴으로
방어 (함수 상단에서 제한 뷰만 만든다).
- **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error) 와
low confidence / refused 는 캐시 금지. 잘못된 답변 고정 방지.
- **MLX gate 공유**: `get_mlx_gate()` 경유. analyzer / evidence 와 동일 semaphore.
- **timeout 15s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate
- **cache 는 성공 + 고신뢰에만**: 실패 (timeout/parse_failed/llm_error/
backend_unavailable) 와 low confidence / refused 는 캐시 금지. 잘못된 답변
고정 방지.
- **backend dispatcher**: PR-MacBook-RAG-Backend-1 부터 LLM 호출은
`services.llm.get_backend(name)` 경유. Gemma backend 는 기존 Mac mini MLX
gate (analyzer/evidence 와 공유 semaphore) 그대로. Qwen backend 는 MacBook
endpoint + 별 semaphore (Mac mini gate 점유 X). 새 backend 추가 시 본
invariant 만 지키면 됨 — 큐 분할 영구 룰은 **같은 endpoint** 한정 적용.
- **명시 opt-in 만 Qwen**: `backend` 인자가 `"qwen-macbook"` 일 때만 MacBook
호출. 미지정 (None) 은 항상 Gemma. Qwen 비가용 시 status="backend_unavailable"
로 반환 — /ask wrapper 가 503 으로 매핑하며 Gemma 자동 fallback 금지.
- **timeout 30s**: `asyncio.timeout` 은 gate 안쪽에서만 적용. 바깥에 두면 gate
대기만으로 timeout 발동.
- **citation 검증**: 본문 `[n]` 범위 초과는 제거 + `hallucination_flags` 기록.
answer 수정본을 반환하되 status 는 completed 유지 (silent fix + observable).
@@ -30,8 +38,7 @@ from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import Priority, acquire_mlx_gate
from services.llm import BackendUnavailable, get_backend
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
@@ -52,6 +59,10 @@ SynthesisStatus = Literal[
"no_evidence",
"parse_failed",
"llm_error",
# PR-MacBook-RAG-Backend-1: 명시 opt-in backend (예: qwen-macbook) 가 일시
# 비가용일 때만 발생. /ask wrapper 가 503 + error_reason=macbook_unavailable
# 로 매핑. **Gemma 자동 fallback 금지** (silent fallback 방지 영구 룰).
"backend_unavailable",
]
@@ -95,16 +106,19 @@ def _model_version() -> str:
return "unknown-model"
def _cache_key(query: str, chunk_ids: list[int]) -> str:
"""(query + sorted chunk_ids + PROMPT_VERSION + model) sha256."""
def _cache_key(query: str, chunk_ids: list[int], backend_name: str) -> str:
"""(query + sorted chunk_ids + PROMPT_VERSION + model + backend) sha256.
backend_name 을 키에 포함 — Qwen 과 Gemma 캐시 충돌 방지.
"""
sorted_ids = ",".join(str(c) for c in sorted(chunk_ids))
raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}"
raw = f"{query}|{sorted_ids}|{PROMPT_VERSION}|{_model_version()}|{backend_name}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def get_cached(query: str, chunk_ids: list[int]) -> SynthesisResult | None:
def get_cached(query: str, chunk_ids: list[int], backend_name: str = "gemma-macmini") -> SynthesisResult | None:
"""캐시 조회. TTL 경과는 자동 삭제."""
key = _cache_key(query, chunk_ids)
key = _cache_key(query, chunk_ids, backend_name)
entry = _CACHE.get(key)
if entry is None:
return None
@@ -124,11 +138,11 @@ def _should_cache(result: SynthesisResult) -> bool:
)
def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult) -> None:
def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult, backend_name: str = "gemma-macmini") -> None:
"""조건부 저장 + FIFO eviction."""
if not _should_cache(result):
return
key = _cache_key(query, chunk_ids)
key = _cache_key(query, chunk_ids, backend_name)
if key in _CACHE:
_CACHE[key] = result
return
@@ -224,14 +238,26 @@ async def synthesize(
evidence: list["EvidenceItem"],
ai_client: AIClient | None = None,
debug: bool = False,
backend: str | None = None,
) -> SynthesisResult:
"""evidence → grounded answer.
Failure modes 는 모두 SynthesisResult 로 반환한다 (예외는 외부로 전파되지
않음). 호출자 (`/ask` wrapper) 가 status 를 보고 user-facing 메시지를
결정한다.
Args:
backend: 명시 backend 선택 (PR-MacBook-RAG-Backend-1).
- None / "gemma-macmini" (default): Mac mini Gemma 4 26B. 기존 경로 100% 보존.
- "qwen-macbook": MacBook M5 Max Qwen 3.6 27B. unavailable 시
status="backend_unavailable" 반환 (Gemma 자동 fallback 금지).
ai_client: legacy 인자. Gemma path 는 backend 객체가 자체 AIClient 생성하므로
전달돼도 무시된다. Qwen path 는 사용하지 않음. 하위 호환용으로 보존.
"""
t_start = time.perf_counter()
backend_obj = get_backend(backend)
backend_name = backend_obj.name
# ── evidence 비면 즉시 no_evidence ─────────────────
if not evidence:
@@ -253,7 +279,7 @@ async def synthesize(
chunk_ids = [
(e.chunk_id if e.chunk_id is not None else -e.doc_id) for e in evidence
]
cached = get_cached(query, chunk_ids)
cached = get_cached(query, chunk_ids, backend_name)
if cached is not None:
return SynthesisResult(
status=cached.status,
@@ -286,32 +312,45 @@ async def synthesize(
prompt = _render_prompt(query, evidence)
prompt_preview = prompt[:500] if debug else None
# ── LLM 호출 ───────────────────────────────────────
client_owned = False
if ai_client is None:
ai_client = AIClient()
client_owned = True
# ── LLM 호출 (backend dispatcher) ──────────────────
# 각 backend 는 자체 gate/concurrency/timeout 보호 책임. asyncio.timeout 은
# backend.generate 안쪽에서 발동 (gate 안쪽 영구 룰 보존).
raw: str | None = None
llm_error: str | None = None
backend_unavailable_reason: str | None = None
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
raw = await backend_obj.generate(
prompt, timeout_read_s=int(LLM_TIMEOUT_MS / 1000),
)
except BackendUnavailable as exc:
# 명시 opt-in backend 일시 비가용. 절대 다른 backend 로 자동 fallback 하지 않는다.
backend_unavailable_reason = exc.reason
except asyncio.TimeoutError:
llm_error = "timeout"
except Exception as exc:
llm_error = f"llm_error:{type(exc).__name__}"
finally:
if client_owned:
try:
await ai_client.close()
except Exception:
pass
elapsed_ms = (time.perf_counter() - t_start) * 1000
if backend_unavailable_reason is not None:
logger.warning(
"synthesis backend_unavailable backend=%s reason=%s query=%r evidence_n=%d elapsed_ms=%.0f",
backend_name, backend_unavailable_reason, query[:80], len(evidence), elapsed_ms,
)
return SynthesisResult(
status="backend_unavailable",
answer=None,
used_citations=[],
confidence=None,
refused=False,
refuse_reason=None,
elapsed_ms=elapsed_ms,
cache_hit=False,
hallucination_flags=[f"backend_unavailable:{backend_name}:{backend_unavailable_reason}"],
raw_preview=None,
)
if llm_error is not None:
status: SynthesisStatus = "timeout" if llm_error == "timeout" else "llm_error"
logger.warning(
@@ -412,7 +451,8 @@ async def synthesize(
)
logger.info(
"synthesis ok query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f",
"synthesis ok backend=%s query=%r evidence_n=%d answer_len=%d citations=%d conf=%s flags=%s elapsed_ms=%.0f",
backend_name,
query[:80],
len(evidence),
len(corrected_answer_final or ""),
@@ -423,5 +463,5 @@ async def synthesize(
)
# 조건부 캐시 저장
set_cached(query, chunk_ids, result)
set_cached(query, chunk_ids, result, backend_name)
return result
+13
View File
@@ -70,6 +70,19 @@ ai:
pending_threshold: 5 # deep_summary stage 의 pending+processing
window_minutes: 30
# ─── /api/search/ask backend dispatcher (PR-MacBook-RAG-Backend-1) ───
# backend 미지정 (default) → Gemma Mac mini (settings.ai.primary 경로 그대로, 변동 0).
# backend="qwen-macbook" 명시 opt-in → MacBook M5 Max mlx-vlm.server. unavailable 시 503.
# 자동 fallback 없음 ([[macbook-inference-endpoint-role]] Invariant 1).
search:
ask:
backend:
macmini_url: "http://100.76.254.116:8801" # Gemma 경로 = settings.ai.primary 가 권위, 본 키는 spec 일관성 + 변경 추적용
macbook_url: "http://100.118.112.84:8810" # MacBook M5 Max Tailscale interface bind
macbook_model: "mlx-community/Qwen3.6-27B-8bit"
timeout_connect_s: 1 # MacBook sleep/wake 빠른 감지 (자동 fallback 부재 → 빠른 503)
timeout_read_s: 30 # synthesis_service.LLM_TIMEOUT_MS=30000 와 align
nas:
mount_path: "/documents"
pkm_root: "/documents/PKM"
+291
View File
@@ -0,0 +1,291 @@
"""PR-MacBook-RAG-Backend-1 정정 4 핵심 테스트.
검증 invariant (synthesize 함수 레벨 — /ask wrapper 의 503 매핑은 search.py 의
status="backend_unavailable" 분기로 1:1 deterministic):
1. backend="qwen-macbook" + MacBook URL 죽은 포트
→ synthesize() 가 SynthesisResult(status="backend_unavailable", ...) 반환
→ Gemma backend 의 generate() 가 **단 1번도 호출되지 않음** (자동 fallback 부재)
2. backend 미지정 (None)
→ Gemma backend.generate() 호출, Qwen backend.generate() 호출 0
→ 기존 호출자 (Hermes docsrv_ask / voice-memo-bot) 회귀 0
3. backend="qwen-macbook" + MacBook 정상 응답
→ status="completed" + answer 채워짐, Gemma backend 호출 0
테스트 전략:
- synthesize() 가 호출하는 backend dispatcher (services.llm.get_backend) 를
monkeypatch 해서 mock backend 주입.
- Gemma backend 의 generate AsyncMock 호출 횟수를 추적.
- 정정 4 의 핵심 가드: `gemma_backend.generate.assert_not_called()`
"""
from __future__ import annotations
import asyncio
import os
import sys
from dataclasses import dataclass
from unittest.mock import AsyncMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "app"))
# ── 가짜 evidence (synthesize 의 no_evidence 분기 회피용 최소 객체) ─────────
@dataclass
class _FakeEvidence:
n: int = 1
doc_id: int = 100
chunk_id: int | None = 200
title: str | None = "fake doc"
span_text: str = "이것은 짧은 근거 텍스트입니다."
source: str = "llm"
def _make_evidence():
return [_FakeEvidence()]
# ── backend mock ───────────────────────────────────────────────────────────
def _gemma_mock(content: str = "GEMMA_SHOULD_NEVER_BE_CALLED"):
m = AsyncMock()
m.name = "gemma-macmini"
m.generate = AsyncMock(return_value=content)
return m
def _qwen_mock_success(content: str):
m = AsyncMock()
m.name = "qwen-macbook"
m.generate = AsyncMock(return_value=content)
return m
def _qwen_mock_unavailable():
from services.llm import BackendUnavailable
m = AsyncMock()
m.name = "qwen-macbook"
m.generate = AsyncMock(
side_effect=BackendUnavailable("qwen-macbook", "ConnectError")
)
return m
# ── 공통 fixture: synthesis_service 에 mock backend 주입 ───────────────────
@pytest.fixture
def patched_backends(monkeypatch):
"""services.llm.get_backend 를 mock dispatcher 로 치환.
Returns (gemma_mock, qwen_mock, set_qwen_unavailable_fn).
"""
from services.search import synthesis_service
gemma = _gemma_mock()
qwen_holder = {"backend": _qwen_mock_success(
'{"answer":"Qwen ok [1]","confidence":"high","refused":false}'
)}
def _fake_get_backend(name: str | None):
key = (name or "").strip().lower() or "gemma-macmini"
if key == "gemma-macmini":
return gemma
if key == "qwen-macbook":
return qwen_holder["backend"]
raise ValueError(f"unknown backend: {name!r}")
monkeypatch.setattr(synthesis_service, "get_backend", _fake_get_backend)
# synthesis_service 캐시 비움 (qwen vs gemma 캐시 분리 invariant)
synthesis_service._CACHE.clear()
def _swap_qwen_unavailable():
qwen_holder["backend"] = _qwen_mock_unavailable()
return gemma, qwen_holder, _swap_qwen_unavailable
# ── 정정 4 핵심: backend=qwen-macbook + MacBook 비가용 → Gemma 호출 0 ─────
def test_qwen_unavailable_yields_backend_unavailable_status_and_gemma_not_called(
patched_backends,
):
"""**정정 4 의 핵심 invariant**.
backend="qwen-macbook" 명시 + Qwen 호출이 BackendUnavailable 로 실패 →
synthesize() 는 status="backend_unavailable" 반환. Gemma backend 의
generate() 는 **단 한 번도 호출되지 않음** (silent fallback 금지).
"""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, swap_qwen_unavailable = patched_backends
swap_qwen_unavailable()
qwen = qwen_holder["backend"]
result = asyncio.run(
synthesize(
query="압력용기 최대허용응력은?",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
# 1. status
assert result.status == "backend_unavailable"
assert result.answer is None
assert result.confidence is None
assert result.refused is False
# 2. flag 에 backend 비가용 사유 기록
assert any(
f.startswith("backend_unavailable:qwen-macbook:") for f in result.hallucination_flags
), f"expected backend_unavailable flag, got {result.hallucination_flags}"
# 3. ★ 핵심 가드 ★ — Gemma backend 자동 fallback 금지
gemma.generate.assert_not_called()
# 4. Qwen 은 1회만 호출 (재시도 없음)
assert qwen.generate.call_count == 1
def test_qwen_unavailable_result_not_cached(patched_backends):
"""비가용 결과는 캐시 X — 다음 호출이 다시 Qwen 시도해야 함."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, swap_qwen_unavailable = patched_backends
swap_qwen_unavailable()
qwen = qwen_holder["backend"]
asyncio.run(
synthesize(
query="동일 쿼리",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
asyncio.run(
synthesize(
query="동일 쿼리",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
# 두 번 모두 실제 호출 (캐시 적중 X) — Gemma 는 여전히 0
assert qwen.generate.call_count == 2
gemma.generate.assert_not_called()
# ── 정정 4: backend 미지정 → 기존 Gemma path (회귀 0) ─────────────────────
def test_default_backend_calls_gemma_not_qwen(patched_backends):
"""backend 미지정 = 기본 Gemma. Qwen 호출 0."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, _ = patched_backends
qwen = qwen_holder["backend"]
gemma.generate.return_value = (
'{"answer":"Gemma 답변 [1]","confidence":"high","refused":false}'
)
result = asyncio.run(
synthesize(
query="기본 호출",
evidence=_make_evidence(),
backend=None, # 명시 None = default
)
)
assert result.status == "completed"
assert result.answer is not None and "Gemma" in result.answer
# Qwen 은 호출 0
qwen.generate.assert_not_called()
# Gemma 는 1회
assert gemma.generate.call_count == 1
# ── backend="qwen-macbook" + 정상 응답 ──────────────────────────────────────
def test_qwen_success_does_not_call_gemma(patched_backends):
"""Qwen 정상 응답 시 Gemma 는 호출되지 않음 (대칭 invariant)."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, _ = patched_backends
qwen = qwen_holder["backend"]
result = asyncio.run(
synthesize(
query="정상 호출",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
assert result.status == "completed"
assert result.answer is not None and "Qwen" in result.answer
# Gemma 는 0회
gemma.generate.assert_not_called()
# Qwen 은 1회
assert qwen.generate.call_count == 1
# ── 캐시 분리 (qwen vs gemma 키 충돌 없음) ─────────────────────────────────
def test_qwen_and_gemma_have_separate_caches(patched_backends):
"""같은 query 라도 backend 다르면 캐시 분리 — Qwen 결과가 Gemma 호출 답으로 둔갑하지 않음."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, _ = patched_backends
qwen = qwen_holder["backend"]
gemma.generate.return_value = (
'{"answer":"GEMMA_ANSWER [1]","confidence":"high","refused":false}'
)
qwen.generate.return_value = (
'{"answer":"QWEN_ANSWER [1]","confidence":"high","refused":false}'
)
r_qwen_1 = asyncio.run(
synthesize(
query="같은 query",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
r_gemma_1 = asyncio.run(
synthesize(
query="같은 query",
evidence=_make_evidence(),
backend=None,
)
)
r_qwen_2 = asyncio.run(
synthesize(
query="같은 query",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
assert "QWEN_ANSWER" in (r_qwen_1.answer or "")
assert "GEMMA_ANSWER" in (r_gemma_1.answer or "")
# 두 번째 Qwen 호출은 캐시 적중 — 결과는 동일하지만 generate 추가 호출 X
assert "QWEN_ANSWER" in (r_qwen_2.answer or "")
assert r_qwen_2.cache_hit is True
# generate 호출 횟수: Qwen 1 (두번째는 캐시), Gemma 1
assert qwen.generate.call_count == 1
assert gemma.generate.call_count == 1
+197
View File
@@ -0,0 +1,197 @@
"""PR-MacBook-RAG-Backend-1: backend dispatcher 단위 테스트.
- get_backend(None) / get_backend("gemma-macmini") → GemmaMacMiniBackend
- get_backend("qwen-macbook") → QwenMacBookBackend (config 값 반영)
- get_backend("unknown") → ValueError
- QwenMacBookBackend.generate() — mock httpx 200 OK → content 반환
- QwenMacBookBackend.generate() — dead port → BackendUnavailable("ConnectError")
목적: 정정 4 (자동 fallback 부재) 의 핵심 빌딩블럭 검증. dispatcher 자체 무결성.
"""
from __future__ import annotations
import asyncio
import os
import sys
from unittest.mock import AsyncMock, patch
import httpx
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "app"))
@pytest.fixture(autouse=True)
def _reset_dispatcher():
"""각 테스트 격리 — backend 인스턴스 캐시 초기화."""
from services.llm import reset_backends_for_test
reset_backends_for_test()
yield
reset_backends_for_test()
def test_get_backend_default_is_gemma():
"""backend 미지정 (None) = Gemma Mac mini default."""
from services.llm import get_backend
b = get_backend(None)
assert b.name == "gemma-macmini"
def test_get_backend_explicit_gemma():
"""gemma-macmini 명시도 동일."""
from services.llm import get_backend
b = get_backend("gemma-macmini")
assert b.name == "gemma-macmini"
def test_get_backend_qwen_macbook_uses_config():
"""qwen-macbook 은 settings.search.ask.backend 값 그대로 반영."""
from core.config import settings
from services.llm import QwenMacBookBackend, get_backend
b = get_backend("qwen-macbook")
assert isinstance(b, QwenMacBookBackend)
assert b.name == "qwen-macbook"
# config.yaml 의 search.ask.backend.macbook_url 그대로
assert b.base_url == settings.search.ask.backend.macbook_url.rstrip("/")
assert b.model == settings.search.ask.backend.macbook_model
assert b.timeout_connect_s == settings.search.ask.backend.timeout_connect_s
def test_get_backend_unknown_raises_value_error():
"""미지원 backend 이름 → ValueError (호출자가 400 으로 매핑)."""
from services.llm import get_backend
with pytest.raises(ValueError, match="unknown backend"):
get_backend("claude-opus")
def test_get_backend_cached_returns_same_instance():
"""동일 backend 재호출 시 인스턴스 캐시."""
from services.llm import get_backend
b1 = get_backend("qwen-macbook")
b2 = get_backend("qwen-macbook")
assert b1 is b2
def test_qwen_generate_success_mocked():
"""mock 200 OK → choices[0].message.content 반환."""
from services.llm import QwenMacBookBackend
fake_payload = {
"choices": [{"message": {"content": "hello from qwen"}}],
}
class _Resp:
status_code = 200
def raise_for_status(self):
return None
def json(self):
return fake_payload
async def _fake_post(self, url, json=None):
return _Resp()
backend = QwenMacBookBackend(
base_url="http://test:8810",
model="test-model",
timeout_connect_s=1,
)
with patch.object(httpx.AsyncClient, "post", new=_fake_post):
result = asyncio.run(backend.generate("hi", timeout_read_s=2))
assert result == "hello from qwen"
def test_qwen_generate_dead_port_raises_backend_unavailable():
"""실제 dead port (127.0.0.1:1) → BackendUnavailable.
정정 4 의 핵심: 명시 Qwen 호출이 실패하면 예외가 통과돼야 한다.
synthesis_service 가 이 예외를 잡아 status="backend_unavailable" 로 매핑.
"""
from services.llm import BackendUnavailable, QwenMacBookBackend
backend = QwenMacBookBackend(
base_url="http://127.0.0.1:1",
model="test-model",
timeout_connect_s=1,
)
with pytest.raises(BackendUnavailable) as exc_info:
asyncio.run(backend.generate("hi", timeout_read_s=2))
assert exc_info.value.backend_name == "qwen-macbook"
assert "ConnectError" in exc_info.value.reason or "Timeout" in exc_info.value.reason
def test_qwen_generate_http_5xx_raises_backend_unavailable():
"""5xx 응답도 BackendUnavailable 로 매핑."""
from services.llm import BackendUnavailable, QwenMacBookBackend
class _Resp:
status_code = 503
def raise_for_status(self):
raise httpx.HTTPStatusError(
"service unavailable",
request=httpx.Request("POST", "http://test:8810/v1/chat/completions"),
response=httpx.Response(503),
)
def json(self):
return {}
async def _fake_post(self, url, json=None):
return _Resp()
backend = QwenMacBookBackend(
base_url="http://test:8810",
model="test-model",
timeout_connect_s=1,
)
with patch.object(httpx.AsyncClient, "post", new=_fake_post):
with pytest.raises(BackendUnavailable) as exc_info:
asyncio.run(backend.generate("hi", timeout_read_s=2))
assert exc_info.value.backend_name == "qwen-macbook"
assert "503" in exc_info.value.reason
def test_qwen_generate_http_4xx_not_backend_unavailable():
"""4xx (호출자 잘못) 은 BackendUnavailable 아님 — 일반 예외 전파."""
from services.llm import BackendUnavailable, QwenMacBookBackend
class _Resp:
status_code = 400
def raise_for_status(self):
raise httpx.HTTPStatusError(
"bad request",
request=httpx.Request("POST", "http://test:8810/v1/chat/completions"),
response=httpx.Response(400),
)
def json(self):
return {}
async def _fake_post(self, url, json=None):
return _Resp()
backend = QwenMacBookBackend(
base_url="http://test:8810",
model="test-model",
timeout_connect_s=1,
)
with patch.object(httpx.AsyncClient, "post", new=_fake_post):
with pytest.raises(httpx.HTTPStatusError):
asyncio.run(backend.generate("hi", timeout_read_s=2))