"""ds-macbook-offload-1 P2-4 — deep 슬롯 라우팅 / 보류(StageDeferred) / drain 가드 테스트. DB 불요(unit) — AIClient 는 __new__ 로 settings 우회, drain 가드는 settings monkeypatch. 통합(보류 백오프 DB 기록, claim 경합)은 P3-2 E2E 게이트에서 라이브 실측. fixture = tests/fixtures/qwen_router_chat_completion.json (2026-06-11 라이브 박제 — 라우터 :8890 경유 model=qwen-macbook, production 호출 형상과 동일 body, 13.2s 실측). """ import json from pathlib import Path from types import SimpleNamespace import httpx import pytest from ai.client import AIClient, call_deep_or_defer, is_deferrable_error from models.queue import StageDeferred FIXTURE = Path(__file__).parent / "fixtures" / "qwen_router_chat_completion.json" def _client(deep_cfg, primary_cfg): """settings 비의존 AIClient — __init__ 우회 후 ai 슬롯만 주입.""" client = AIClient.__new__(AIClient) client.ai = SimpleNamespace(deep=deep_cfg, primary=primary_cfg) return client def _http_status_error(status: int) -> httpx.HTTPStatusError: req = httpx.Request("POST", "http://router:8890/v1/chat/completions") resp = httpx.Response(status, request=req) return httpx.HTTPStatusError(f"status {status}", request=req, response=resp) # ─── is_deferrable_error 분류 ────────────────────────────────────────────── @pytest.mark.parametrize("exc", [ _http_status_error(503), # 라우터 upstream_cold/editor_busy/warming _http_status_error(502), # 라우터: upstream 연결 실패/생성 중 절단 변환 _http_status_error(504), httpx.ConnectError("connection refused"), # 라우터 자체 불가 httpx.ConnectTimeout("connect timeout"), httpx.ReadTimeout("read timeout"), # DS↔라우터 구간 절단 httpx.ReadError("connection reset"), httpx.RemoteProtocolError("server disconnected"), ]) def test_deferrable_errors(exc): assert is_deferrable_error(exc) is True @pytest.mark.parametrize("exc", [ _http_status_error(400), # unknown alias 등 — 설정 오류는 보류 아님 _http_status_error(500), ValueError("parse"), RuntimeError("boom"), ]) def test_non_deferrable_errors(exc): assert is_deferrable_error(exc) is False # ─── call_deep 슬롯 선택 ─────────────────────────────────────────────────── 
@pytest.mark.asyncio
async def test_call_deep_uses_deep_slot():
    """call_deep hands the deep slot config to ``_request`` and returns its result."""
    deep = SimpleNamespace(model="qwen-macbook")
    primary = SimpleNamespace(model="gemma-26b")
    client = _client(deep, primary)
    captured = {}

    async def fake_request(cfg, prompt, system=None):
        # Record which slot config the client chose for this call.
        captured["cfg"] = cfg
        return "ok"

    client._request = fake_request
    assert await client.call_deep("p") == "ok"
    assert captured["cfg"] is deep


@pytest.mark.asyncio
async def test_call_deep_falls_back_to_primary_when_slot_absent():
    """An absent slot means the feature is inactive.

    Defensive primary — not a silent downgrade, just the existing path as-is.
    """
    primary = SimpleNamespace(model="gemma-26b")
    client = _client(None, primary)
    captured = {}

    async def fake_request(cfg, prompt, system=None):
        # Record which slot config the client chose for this call.
        captured["cfg"] = cfg
        return "ok"

    client._request = fake_request
    await client.call_deep("p")
    assert captured["cfg"] is primary


# ─── call_deep_or_defer deferral conversion ───────────────────────────────

@pytest.mark.asyncio
@pytest.mark.parametrize("exc", [
    _http_status_error(503),
    httpx.ConnectError("refused"),
    httpx.ReadTimeout("cut mid-generation"),
])
async def test_defer_conversion(exc):
    """A deferrable failure raised by ``_request`` surfaces as StageDeferred."""
    client = _client(SimpleNamespace(model="qwen-macbook"), None)

    async def fail_request(cfg, prompt, system=None):
        raise exc

    client._request = fail_request
    with pytest.raises(StageDeferred):
        await call_deep_or_defer(client, "p")


@pytest.mark.asyncio
async def test_non_deferrable_propagates():
    """400 / generic errors are not StageDeferred — they propagate to the caller's existing failure path."""
    client = _client(SimpleNamespace(model="qwen-macbook"), None)

    async def fail_request(cfg, prompt, system=None):
        raise _http_status_error(400)

    client._request = fail_request
    with pytest.raises(httpx.HTTPStatusError):
        await call_deep_or_defer(client, "p")


def test_stage_deferred_carries_backoff():
    """StageDeferred built with only a reason exposes retry_after_minutes == 30."""
    e = StageDeferred("macbook_unavailable:ConnectError")
    assert e.retry_after_minutes == 30


def test_router_fixture_shape():
    """Pin that the ``_request`` parse path (choices[0].message.content) matches the router's real response shape."""
    # Decode explicitly as UTF-8: the fixture is a JSON capture, and relying on
    # the locale's preferred encoding makes the test platform-dependent.
    data = json.loads(FIXTURE.read_text(encoding="utf-8"))
    content = data["choices"][0]["message"]["content"]
    assert isinstance(content, str) and len(content) > 0
    assert data["choices"][0]["message"]["role"] == "assistant"
    # The router answers with the alias replaced by the upstream local path,
    # so the model that actually served the request stays traceable.
    assert "Qwen3.6-27B-8bit" in data["model"]


# ─── drain guard (no silent downgrade) ────────────────────────────────────

@pytest.mark.asyncio
async def test_drain_requires_deep_slot(monkeypatch):
    """drain exits (SystemExit) when settings.ai.deep is None."""
    import workers.queue_drain as qd
    monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=None)))
    with pytest.raises(SystemExit):
        await qd.drain("summarize", 1)


@pytest.mark.asyncio
async def test_drain_rejects_non_drain_stage(monkeypatch):
    """classify joined DRAIN_STAGES on 2026-06-12 (fair-share) — the rejected stages are extract and the like."""
    import workers.queue_drain as qd
    monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=object())))
    with pytest.raises(SystemExit):
        await qd.drain("extract", 1)