"""2026-06-12 fair-share 번들 — gate capacity 일반화 / call_deep_or_defer cfg / drain classify. worker-process 레벨(DB 필요)의 deep 폴백·classify drain 은 라이브 E2E 로 검증하고, 여기서는 새 메커니즘의 seam 만 단위 검증한다. """ from __future__ import annotations import asyncio from types import SimpleNamespace import httpx import pytest from core.config import settings from services.search.llm_gate import Priority, _reset_for_test, acquire_mlx_gate, gate_status @pytest.fixture(autouse=True) def _reset_gate(monkeypatch): monkeypatch.setattr(settings, "mlx_gate_concurrency", 2) _reset_for_test() yield _reset_for_test() # ─── gate capacity 2 ───────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_two_concurrent_holders_overlap(): """capacity=2: 두 holder 가 동시에 inflight — 서로를 기다리지 않는다.""" log: list = [] async def hold(label: str): async with acquire_mlx_gate(Priority.BACKGROUND): log.append(("in", label)) await asyncio.sleep(0.05) log.append(("out", label)) await asyncio.gather(hold("a"), hold("b")) # 둘 다 진입한 뒤에 첫 release 가 나와야 함 (overlap 증명) assert log[0][0] == "in" and log[1][0] == "in" @pytest.mark.asyncio async def test_third_waits_until_slot_frees(): """capacity=2: 3번째는 대기, 첫 release 후 진입.""" order: list = [] release_a = asyncio.Event() async def hold(label: str, wait_event: asyncio.Event | None): async with acquire_mlx_gate(Priority.BACKGROUND): order.append(("in", label)) if wait_event: await wait_event.wait() else: await asyncio.sleep(0.01) order.append(("out", label)) t_a = asyncio.create_task(hold("a", release_a)) t_b = asyncio.create_task(hold("b", release_a)) await asyncio.sleep(0.02) # a, b inflight 진입 대기 assert ("in", "a") in order and ("in", "b") in order assert gate_status()["inflight"] == 2 t_c = asyncio.create_task(hold("c", None)) await asyncio.sleep(0.02) assert ("in", "c") not in order # 슬롯 2 점유 중 — c 는 대기 assert gate_status()["waiters"] == 1 release_a.set() await asyncio.gather(t_a, t_b, t_c) assert ("in", "c") in order @pytest.mark.asyncio async def test_capacity_one_serializes(): """concurrency=1 이면 기존 직렬화 그대로 (무회귀).""" from core.config import settings as s s_backup = s.mlx_gate_concurrency s.mlx_gate_concurrency = 1 try: _reset_for_test() log: list = [] async def hold(label: str): async with acquire_mlx_gate(Priority.BACKGROUND): log.append(("in", label)) await asyncio.sleep(0.02) log.append(("out", label)) await asyncio.gather(hold("a"), hold("b")) # 직렬: in/out 쌍이 겹치지 않음 assert [e[0] for e in log] == ["in", "out", "in", "out"] finally: s.mlx_gate_concurrency = s_backup # ─── call_deep_or_defer cfg override ───────────────────────────────────────── @pytest.mark.asyncio async def test_call_deep_or_defer_cfg_override(): """cfg 지정 시 deep 슬롯 대신 해당 config 로 _request 호출.""" from ai.client import call_deep_or_defer seen: dict = {} class FakeClient: ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot")) async def _request(self, cfg, prompt, system=None): seen["cfg"] = cfg return "ok" async def call_deep(self, prompt, system=None): seen["cfg"] = self.ai.deep return "ok" override = SimpleNamespace(model="deep-endpoint-triage-sampling", temperature=0.0) out = await call_deep_or_defer(FakeClient(), "p", cfg=override) assert out == "ok" assert seen["cfg"] is override @pytest.mark.asyncio async def test_call_deep_or_defer_cfg_still_defers(): """cfg 경로에서도 보류 분류(502/503/TransportError → StageDeferred) 동일 적용.""" from ai.client import call_deep_or_defer from models.queue import StageDeferred class FakeClient: ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot")) async def _request(self, cfg, prompt, system=None): raise httpx.ConnectError("down") with pytest.raises(StageDeferred): await call_deep_or_defer(FakeClient(), "p", cfg=SimpleNamespace(model="x")) # ─── drain stages ──────────────────────────────────────────────────────────── def test_drain_stages_include_classify(): from workers.queue_drain import DRAIN_STAGES assert set(DRAIN_STAGES) == {"summarize", "deep_summary", "classify"}