Files
hyungi_document_server/tests/test_macbook_offload_deep_slot.py
2026-06-12 07:22:47 +09:00

161 lines
6.0 KiB
Python

"""ds-macbook-offload-1 P2-4 — deep 슬롯 라우팅 / 보류(StageDeferred) / drain 가드 테스트.
DB 불요(unit) — AIClient 는 __new__ 로 settings 우회, drain 가드는 settings monkeypatch.
통합(보류 백오프 DB 기록, claim 경합)은 P3-2 E2E 게이트에서 라이브 실측.
fixture = tests/fixtures/qwen_router_chat_completion.json (2026-06-11 라이브 박제 —
라우터 :8890 경유 model=qwen-macbook, production 호출 형상과 동일 body, 13.2s 실측).
"""
import json
from pathlib import Path
from types import SimpleNamespace
import httpx
import pytest
from ai.client import AIClient, call_deep_or_defer, is_deferrable_error
from models.queue import StageDeferred
# Recorded router chat-completion response, stored in the "fixtures" directory next to this file.
FIXTURE = Path(__file__).parent.joinpath("fixtures", "qwen_router_chat_completion.json")
def _client(deep_cfg, primary_cfg):
    """Build a settings-independent AIClient.

    ``__init__`` is bypassed via ``__new__``; only the ``ai`` namespace with
    its ``deep`` and ``primary`` slots is injected.
    """
    slots = SimpleNamespace(deep=deep_cfg, primary=primary_cfg)
    stub = AIClient.__new__(AIClient)
    stub.ai = slots
    return stub
def _http_status_error(status: int) -> httpx.HTTPStatusError:
    """Return an HTTPStatusError for a POST to the router carrying *status*."""
    request = httpx.Request("POST", "http://router:8890/v1/chat/completions")
    response = httpx.Response(status, request=request)
    message = f"status {status}"
    return httpx.HTTPStatusError(message, request=request, response=response)
# ─── is_deferrable_error 분류 ──────────────────────────────────────────────
@pytest.mark.parametrize("exc", (
    _http_status_error(503),  # router: upstream_cold / editor_busy / warming
    _http_status_error(502),  # router: upstream connect failure / mid-generation cut, converted
    _http_status_error(504),
    httpx.ConnectError("connection refused"),  # router itself unreachable
    httpx.ConnectTimeout("connect timeout"),
    httpx.ReadTimeout("read timeout"),  # connection cut on the DS<->router leg
    httpx.ReadError("connection reset"),
    httpx.RemoteProtocolError("server disconnected"),
))
def test_deferrable_errors(exc):
    """Each listed 502/503/504 status or transport error is classified as deferrable."""
    verdict = is_deferrable_error(exc)
    assert verdict is True
@pytest.mark.parametrize("exc", (
    _http_status_error(400),  # e.g. unknown alias — configuration errors are not deferred
    _http_status_error(500),
    ValueError("parse"),
    RuntimeError("boom"),
))
def test_non_deferrable_errors(exc):
    """400, 500 and non-HTTP exceptions are not classified as deferrable."""
    verdict = is_deferrable_error(exc)
    assert verdict is False
# ─── call_deep 슬롯 선택 ───────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_call_deep_uses_deep_slot():
    """call_deep hands the deep slot config to _request and returns its result."""
    deep_slot = SimpleNamespace(model="qwen-macbook")
    primary_slot = SimpleNamespace(model="gemma-26b")
    seen = {}

    async def fake_request(cfg, prompt, system=None):
        # Record which slot config the client chose to send.
        seen["cfg"] = cfg
        return "ok"

    client = _client(deep_slot, primary_slot)
    client._request = fake_request

    reply = await client.call_deep("p")
    assert reply == "ok"
    assert seen["cfg"] is deep_slot
@pytest.mark.asyncio
async def test_call_deep_falls_back_to_primary_when_slot_absent():
    """An absent slot means the feature is not enabled.

    Defensive primary: not a silent downgrade, just the existing path unchanged.
    """
    primary_slot = SimpleNamespace(model="gemma-26b")
    seen = {}

    async def fake_request(cfg, prompt, system=None):
        # Record which slot config the client chose to send.
        seen["cfg"] = cfg
        return "ok"

    client = _client(None, primary_slot)
    client._request = fake_request

    await client.call_deep("p")
    assert seen["cfg"] is primary_slot
# ─── call_deep_or_defer 보류 변환 ──────────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.parametrize("exc", (
    _http_status_error(503),
    httpx.ConnectError("refused"),
    httpx.ReadTimeout("cut mid-generation"),
))
async def test_defer_conversion(exc):
    """A deferrable failure raised by _request surfaces as StageDeferred."""

    async def fail_request(cfg, prompt, system=None):
        raise exc

    deep_slot = SimpleNamespace(model="qwen-macbook")
    client = _client(deep_slot, None)
    client._request = fail_request

    with pytest.raises(StageDeferred):
        await call_deep_or_defer(client, "p")
@pytest.mark.asyncio
async def test_non_deferrable_propagates():
    """400 / generic errors are not StageDeferred — they propagate to the caller's existing failure path."""

    async def fail_request(cfg, prompt, system=None):
        raise _http_status_error(400)

    deep_slot = SimpleNamespace(model="qwen-macbook")
    client = _client(deep_slot, None)
    client._request = fail_request

    with pytest.raises(httpx.HTTPStatusError):
        await call_deep_or_defer(client, "p")
def test_stage_deferred_carries_backoff():
    """A StageDeferred built from a reason string alone reports retry_after_minutes == 30."""
    deferred = StageDeferred("macbook_unavailable:ConnectError")
    backoff = deferred.retry_after_minutes
    assert backoff == 30
def test_router_fixture_shape():
    """Pin that the _request parse path (choices[0].message.content) matches a real router response shape."""
    # JSON is UTF-8 by specification; decode explicitly so the test does not
    # depend on the platform's locale-default encoding.
    data = json.loads(FIXTURE.read_text(encoding="utf-8"))
    message = data["choices"][0]["message"]
    content = message["content"]
    assert isinstance(content, str) and len(content) > 0
    assert message["role"] == "assistant"
    # The router replies with the alias replaced by the upstream local path,
    # so the model that actually handled the request stays traceable.
    assert "Qwen3.6-27B-8bit" in data["model"]
# ─── drain 가드 (silent 강등 금지) ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_drain_requires_deep_slot(monkeypatch):
    """With settings.ai.deep set to None, drain("summarize", 1) must raise SystemExit."""
    import workers.queue_drain as qd

    settings_without_deep = SimpleNamespace(ai=SimpleNamespace(deep=None))
    monkeypatch.setattr(qd, "settings", settings_without_deep)

    with pytest.raises(SystemExit):
        await qd.drain("summarize", 1)
@pytest.mark.asyncio
async def test_drain_rejects_non_drain_stage(monkeypatch):
    """classify joined DRAIN_STAGES on 2026-06-12 via fair-share — the rejected stages are extract and the like."""
    import workers.queue_drain as qd

    # A deep slot is present here, so SystemExit is expected for the "extract" stage.
    settings_with_deep = SimpleNamespace(ai=SimpleNamespace(deep=object()))
    monkeypatch.setattr(qd, "settings", settings_with_deep)

    with pytest.raises(SystemExit):
        await qd.drain("extract", 1)