"""DS-Mac-mini-26B-Priority-Gate-1 (B-1) unit smoke — 6 scenario. heap + inflight 기반 MLX gate 의 ordering/cancellation/fast-path 검증. 실 LLM 호출 없이 gate 자체의 acquire/release/dispatch 만 시뮬레이션. """ from __future__ import annotations import asyncio import logging import pytest from services.search.llm_gate import ( Priority, _reset_for_test, acquire_mlx_gate, get_mlx_gate, ) @pytest.fixture(autouse=True) def _reset_gate(): """각 테스트 시작 시 gate 상태 reset (fresh event loop 마다).""" _reset_for_test() yield _reset_for_test() async def _hold(priority: Priority, duration_s: float, log: list, label: str): """단순 gate holder — acquire 후 sleep 후 release.""" async with acquire_mlx_gate(priority): log.append(("acquired", label)) await asyncio.sleep(duration_s) log.append(("released", label)) # ─── Scenario 1: FIFO within same priority ──────────────────────────────── @pytest.mark.asyncio async def test_fifo_within_same_priority(): """BACKGROUND 3건 직렬 enqueue → 순서대로 dispatch (1→2→3).""" log: list = [] async def acquirer(label: str, started: asyncio.Event): async with acquire_mlx_gate(Priority.BACKGROUND): log.append(label) started.set() await asyncio.sleep(0.05) e1, e2, e3 = asyncio.Event(), asyncio.Event(), asyncio.Event() t1 = asyncio.create_task(acquirer("bg1", e1)) await asyncio.sleep(0.001) # bg1 이 fast-path 먼저 t2 = asyncio.create_task(acquirer("bg2", e2)) await asyncio.sleep(0.001) t3 = asyncio.create_task(acquirer("bg3", e3)) await asyncio.gather(t1, t2, t3) assert log == ["bg1", "bg2", "bg3"], f"FIFO 위반: {log}" # ─── Scenario 2: Foreground jumps queue ─────────────────────────────────── @pytest.mark.asyncio async def test_foreground_jumps_queue(): """BACKGROUND 5건 enqueue (1 inflight, 2~5 대기) → FOREGROUND 1건 enqueue → 1번 release 후 다음은 FOREGROUND (2~5 모두 뒤로). """ log: list = [] bg_release_signals = [asyncio.Event() for _ in range(5)] fg_release_signal = asyncio.Event() async def bg(i: int): async with acquire_mlx_gate(Priority.BACKGROUND): log.append(f"bg{i}") await bg_release_signals[i].wait() async def fg(): async with acquire_mlx_gate(Priority.FOREGROUND): log.append("fg") await fg_release_signal.wait() # bg0 fast-path inflight t_bg0 = asyncio.create_task(bg(0)) await asyncio.sleep(0.01) # bg0 이 inflight 확실해질 때까지 # bg1~4 enqueue (대기) t_bgs = [asyncio.create_task(bg(i)) for i in range(1, 5)] await asyncio.sleep(0.01) # fg enqueue (대기) — 모든 background 보다 앞으로 jump 해야 함 t_fg = asyncio.create_task(fg()) await asyncio.sleep(0.01) # bg0 release → 다음 dispatch 는 FG bg_release_signals[0].set() await asyncio.sleep(0.05) assert log == ["bg0", "fg"], f"FG jump 위반 (bg1 먼저 들어옴?): {log}" # fg release → bg1 fg_release_signal.set() await asyncio.sleep(0.05) assert log == ["bg0", "fg", "bg1"], f"FIFO 위반: {log}" # 나머지 풀어줌 for i in range(1, 5): bg_release_signals[i].set() await asyncio.gather(t_bg0, *t_bgs, t_fg) assert log == ["bg0", "fg", "bg1", "bg2", "bg3", "bg4"], ( f"최종 순서: {log}" ) # ─── Scenario 3: Long-running background blocks foreground (intended) ──── @pytest.mark.asyncio async def test_long_running_background_blocks_foreground(): """이미 inflight 인 background 는 끊지 않는다 (preemption X). foreground 들어와도 현재 점유 background 의 남은 시간만큼 대기. """ log: list = [] async def bg(): async with acquire_mlx_gate(Priority.BACKGROUND): log.append(("bg_start", asyncio.get_event_loop().time())) await asyncio.sleep(0.3) log.append(("bg_end", asyncio.get_event_loop().time())) async def fg(): await asyncio.sleep(0.05) # bg 가 inflight 진입한 후 fg 시도 async with acquire_mlx_gate(Priority.FOREGROUND): log.append(("fg_start", asyncio.get_event_loop().time())) t_bg = asyncio.create_task(bg()) t_fg = asyncio.create_task(fg()) await asyncio.gather(t_bg, t_fg) events = {label: t for label, t in log} # fg 는 bg_end 이후에만 시작 가능 assert events["fg_start"] >= events["bg_end"] - 0.01, ( f"preemption 발생? fg_start={events['fg_start']} bg_end={events['bg_end']}" ) # ─── Scenario 4: Mixed concurrent enqueue (FG FIFO 먼저, BG FIFO 후) ── @pytest.mark.asyncio async def test_mixed_concurrent_enqueue(): """2 FOREGROUND + 3 BACKGROUND 가 거의 동시 → FG fifo 먼저, BG fifo 후.""" log: list = [] holder_started = asyncio.Event() holder_release = asyncio.Event() async def holder(): async with acquire_mlx_gate(Priority.BACKGROUND): holder_started.set() await holder_release.wait() async def fg(i: int): async with acquire_mlx_gate(Priority.FOREGROUND): log.append(f"fg{i}") await asyncio.sleep(0.005) async def bg(i: int): async with acquire_mlx_gate(Priority.BACKGROUND): log.append(f"bg{i}") await asyncio.sleep(0.005) # holder 가 fast-path 점유 t_h = asyncio.create_task(holder()) await holder_started.wait() # 거의 동시 enqueue (gather) tasks = [ asyncio.create_task(fg(0)), asyncio.create_task(bg(0)), asyncio.create_task(fg(1)), asyncio.create_task(bg(1)), asyncio.create_task(bg(2)), ] await asyncio.sleep(0.01) # 모두 enqueue 까지 진행 # holder release → priority 순서대로 dispatch holder_release.set() await asyncio.gather(t_h, *tasks) # fg0, fg1 가 먼저 (FG FIFO) → bg0, bg1, bg2 (BG FIFO) assert log == ["fg0", "fg1", "bg0", "bg1", "bg2"], f"priority order 위반: {log}" # ─── Scenario 5: Backward compat — async with get_mlx_gate() ────────────── @pytest.mark.asyncio async def test_backward_compat_get_mlx_gate(): """`async with get_mlx_gate():` legacy 형태가 BACKGROUND priority 로 매핑.""" log: list = [] async def legacy_caller(): async with get_mlx_gate(): log.append("legacy") await asyncio.sleep(0.01) async def fg_caller(): async with acquire_mlx_gate(Priority.FOREGROUND): log.append("fg") await asyncio.sleep(0.01) # 동시 enqueue bg_holder_started = asyncio.Event() bg_holder_release = asyncio.Event() async def bg_holder(): async with acquire_mlx_gate(Priority.BACKGROUND): bg_holder_started.set() await bg_holder_release.wait() t_h = asyncio.create_task(bg_holder()) await bg_holder_started.wait() t_legacy = asyncio.create_task(legacy_caller()) await asyncio.sleep(0.005) t_fg = asyncio.create_task(fg_caller()) await asyncio.sleep(0.01) bg_holder_release.set() await asyncio.gather(t_h, t_legacy, t_fg) # legacy 는 BACKGROUND 매핑이므로 fg 가 먼저 (priority order) assert log == ["fg", "legacy"], f"legacy 가 FOREGROUND 로 처리됨? {log}" # ─── Scenario 6: Cancelled waiter skip ──────────────────────────────────── @pytest.mark.asyncio async def test_cancelled_waiter_skip(): """bg1 inflight → bg2 enqueue → fg1 enqueue → bg2 cancel → bg1 release → 다음 dispatch 가 fg1 (죽은 bg2 entry skip). """ log: list = [] bg1_release = asyncio.Event() async def bg1(): async with acquire_mlx_gate(Priority.BACKGROUND): log.append("bg1") await bg1_release.wait() async def bg2(): async with acquire_mlx_gate(Priority.BACKGROUND): log.append("bg2") # 도달 안 해야 함 (cancel) async def fg1(): async with acquire_mlx_gate(Priority.FOREGROUND): log.append("fg1") t_bg1 = asyncio.create_task(bg1()) await asyncio.sleep(0.01) # bg1 fast-path inflight t_bg2 = asyncio.create_task(bg2()) await asyncio.sleep(0.01) # bg2 enqueue 대기 t_fg1 = asyncio.create_task(fg1()) await asyncio.sleep(0.01) # fg1 enqueue 대기 # bg2 cancel — Future 가 cancelled 상태로 heap 잔존 t_bg2.cancel() try: await t_bg2 except asyncio.CancelledError: pass # bg1 release — 다음 dispatch 는 fg1 (bg2 죽은 entry skip) bg1_release.set() await asyncio.gather(t_bg1, t_fg1) # bg2 도달 안 함 (cancelled), 순서 = bg1, fg1 assert log == ["bg1", "fg1"], ( f"cancelled bg2 가 dispatch 됐거나 gate stuck: {log}" )