Files
hyungi_document_server/tests/test_priority_gate.py
Hyungi Ahn 7c9aff393a feat(search): MLX priority gate (B-1, Priority.FOREGROUND vs BACKGROUND)
DS-Mac-mini-26B-Priority-Gate-1 — Mac mini 26B single-inference gate 를
FIFO Semaphore → 우선순위 기반 heap dispatch 로 교체. concurrency 1 유지,
queue ordering 만 foreground 우선.

API:
- Priority(IntEnum): FOREGROUND=0, BACKGROUND=100
- acquire_mlx_gate(priority=DEFAULT_PRIORITY) async context manager
- DEFAULT_PRIORITY = BACKGROUND (안전 default, foreground 짓밟지 않음)
- get_mlx_gate() legacy wrapper — context-manager only 호환

구현:
- _inflight: bool + _waiters heap [(priority, seq, future, enqueue_ts)]
- fast-path: not inflight and not waiters → 즉시 inflight, Future 생성 X
- _dispatch_next_locked: cancelled/done Future skip (heap 잔재 risk 회피)
- release: lock 안에서 pop, set_result 는 loop.call_soon (lock 밖) reentry deadlock 회피
- dispatch / enqueue / release / WARN log (observability)
- BACKGROUND wait_ms > 300_000 (5분) 시 starvation WARN — aging 은 Phase 2 deferred

Tests (tests/test_priority_gate.py, 6 scenario):
1. FIFO within same priority
2. Foreground jumps queue (bg5 대기 중 fg 들어오면 즉시 다음 슬롯)
3. Long-running background blocks foreground (preemption X, intended)
4. Mixed concurrent enqueue (FG fifo 먼저, BG fifo 후)
5. Backward compat (legacy get_mlx_gate() = BACKGROUND 매핑)
6. Cancelled waiter skip (heap 의 죽은 Future 건너뜀, gate stuck X)

Site 교체는 별 commit (refactor(search): swap 10 call sites).

plan: ~/.claude/plans/hermes-polymorphic-rossum.md
2026-05-17 08:42:58 +09:00

282 lines
9.0 KiB
Python

"""DS-Mac-mini-26B-Priority-Gate-1 (B-1) unit smoke — 6 scenario.
heap + inflight 기반 MLX gate 의 ordering/cancellation/fast-path 검증. 실 LLM
호출 없이 gate 자체의 acquire/release/dispatch 만 시뮬레이션.
"""
from __future__ import annotations
import asyncio
import logging
import pytest
from services.search.llm_gate import (
Priority,
_reset_for_test,
acquire_mlx_gate,
get_mlx_gate,
)
@pytest.fixture(autouse=True)
def _reset_gate():
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다)."""
_reset_for_test()
yield
_reset_for_test()
async def _hold(priority: Priority, duration_s: float, log: list, label: str):
"""단순 gate holder — acquire 후 sleep 후 release."""
async with acquire_mlx_gate(priority):
log.append(("acquired", label))
await asyncio.sleep(duration_s)
log.append(("released", label))
# ─── Scenario 1: FIFO within same priority ────────────────────────────────
@pytest.mark.asyncio
async def test_fifo_within_same_priority():
"""BACKGROUND 3건 직렬 enqueue → 순서대로 dispatch (1→2→3)."""
log: list = []
async def acquirer(label: str, started: asyncio.Event):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(label)
started.set()
await asyncio.sleep(0.05)
e1, e2, e3 = asyncio.Event(), asyncio.Event(), asyncio.Event()
t1 = asyncio.create_task(acquirer("bg1", e1))
await asyncio.sleep(0.001) # bg1 이 fast-path 먼저
t2 = asyncio.create_task(acquirer("bg2", e2))
await asyncio.sleep(0.001)
t3 = asyncio.create_task(acquirer("bg3", e3))
await asyncio.gather(t1, t2, t3)
assert log == ["bg1", "bg2", "bg3"], f"FIFO 위반: {log}"
# ─── Scenario 2: Foreground jumps queue ───────────────────────────────────
@pytest.mark.asyncio
async def test_foreground_jumps_queue():
"""BACKGROUND 5건 enqueue (1 inflight, 2~5 대기) → FOREGROUND 1건 enqueue
→ 1번 release 후 다음은 FOREGROUND (2~5 모두 뒤로).
"""
log: list = []
bg_release_signals = [asyncio.Event() for _ in range(5)]
fg_release_signal = asyncio.Event()
async def bg(i: int):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(f"bg{i}")
await bg_release_signals[i].wait()
async def fg():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg")
await fg_release_signal.wait()
# bg0 fast-path inflight
t_bg0 = asyncio.create_task(bg(0))
await asyncio.sleep(0.01) # bg0 이 inflight 확실해질 때까지
# bg1~4 enqueue (대기)
t_bgs = [asyncio.create_task(bg(i)) for i in range(1, 5)]
await asyncio.sleep(0.01)
# fg enqueue (대기) — 모든 background 보다 앞으로 jump 해야 함
t_fg = asyncio.create_task(fg())
await asyncio.sleep(0.01)
# bg0 release → 다음 dispatch 는 FG
bg_release_signals[0].set()
await asyncio.sleep(0.05)
assert log == ["bg0", "fg"], f"FG jump 위반 (bg1 먼저 들어옴?): {log}"
# fg release → bg1
fg_release_signal.set()
await asyncio.sleep(0.05)
assert log == ["bg0", "fg", "bg1"], f"FIFO 위반: {log}"
# 나머지 풀어줌
for i in range(1, 5):
bg_release_signals[i].set()
await asyncio.gather(t_bg0, *t_bgs, t_fg)
assert log == ["bg0", "fg", "bg1", "bg2", "bg3", "bg4"], (
f"최종 순서: {log}"
)
# ─── Scenario 3: Long-running background blocks foreground (intended) ────
@pytest.mark.asyncio
async def test_long_running_background_blocks_foreground():
"""이미 inflight 인 background 는 끊지 않는다 (preemption X). foreground 들어와도
현재 점유 background 의 남은 시간만큼 대기.
"""
log: list = []
async def bg():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(("bg_start", asyncio.get_event_loop().time()))
await asyncio.sleep(0.3)
log.append(("bg_end", asyncio.get_event_loop().time()))
async def fg():
await asyncio.sleep(0.05) # bg 가 inflight 진입한 후 fg 시도
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append(("fg_start", asyncio.get_event_loop().time()))
t_bg = asyncio.create_task(bg())
t_fg = asyncio.create_task(fg())
await asyncio.gather(t_bg, t_fg)
events = {label: t for label, t in log}
# fg 는 bg_end 이후에만 시작 가능
assert events["fg_start"] >= events["bg_end"] - 0.01, (
f"preemption 발생? fg_start={events['fg_start']} bg_end={events['bg_end']}"
)
# ─── Scenario 4: Mixed concurrent enqueue (FG FIFO 먼저, BG FIFO 후) ──
@pytest.mark.asyncio
async def test_mixed_concurrent_enqueue():
"""2 FOREGROUND + 3 BACKGROUND 가 거의 동시 → FG fifo 먼저, BG fifo 후."""
log: list = []
holder_started = asyncio.Event()
holder_release = asyncio.Event()
async def holder():
async with acquire_mlx_gate(Priority.BACKGROUND):
holder_started.set()
await holder_release.wait()
async def fg(i: int):
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append(f"fg{i}")
await asyncio.sleep(0.005)
async def bg(i: int):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(f"bg{i}")
await asyncio.sleep(0.005)
# holder 가 fast-path 점유
t_h = asyncio.create_task(holder())
await holder_started.wait()
# 거의 동시 enqueue (gather)
tasks = [
asyncio.create_task(fg(0)),
asyncio.create_task(bg(0)),
asyncio.create_task(fg(1)),
asyncio.create_task(bg(1)),
asyncio.create_task(bg(2)),
]
await asyncio.sleep(0.01) # 모두 enqueue 까지 진행
# holder release → priority 순서대로 dispatch
holder_release.set()
await asyncio.gather(t_h, *tasks)
# fg0, fg1 가 먼저 (FG FIFO) → bg0, bg1, bg2 (BG FIFO)
assert log == ["fg0", "fg1", "bg0", "bg1", "bg2"], f"priority order 위반: {log}"
# ─── Scenario 5: Backward compat — async with get_mlx_gate() ──────────────
@pytest.mark.asyncio
async def test_backward_compat_get_mlx_gate():
"""`async with get_mlx_gate():` legacy 형태가 BACKGROUND priority 로 매핑."""
log: list = []
async def legacy_caller():
async with get_mlx_gate():
log.append("legacy")
await asyncio.sleep(0.01)
async def fg_caller():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg")
await asyncio.sleep(0.01)
# 동시 enqueue
bg_holder_started = asyncio.Event()
bg_holder_release = asyncio.Event()
async def bg_holder():
async with acquire_mlx_gate(Priority.BACKGROUND):
bg_holder_started.set()
await bg_holder_release.wait()
t_h = asyncio.create_task(bg_holder())
await bg_holder_started.wait()
t_legacy = asyncio.create_task(legacy_caller())
await asyncio.sleep(0.005)
t_fg = asyncio.create_task(fg_caller())
await asyncio.sleep(0.01)
bg_holder_release.set()
await asyncio.gather(t_h, t_legacy, t_fg)
# legacy 는 BACKGROUND 매핑이므로 fg 가 먼저 (priority order)
assert log == ["fg", "legacy"], f"legacy 가 FOREGROUND 로 처리됨? {log}"
# ─── Scenario 6: Cancelled waiter skip ────────────────────────────────────
@pytest.mark.asyncio
async def test_cancelled_waiter_skip():
"""bg1 inflight → bg2 enqueue → fg1 enqueue → bg2 cancel → bg1 release →
다음 dispatch 가 fg1 (죽은 bg2 entry skip).
"""
log: list = []
bg1_release = asyncio.Event()
async def bg1():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append("bg1")
await bg1_release.wait()
async def bg2():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append("bg2") # 도달 안 해야 함 (cancel)
async def fg1():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg1")
t_bg1 = asyncio.create_task(bg1())
await asyncio.sleep(0.01) # bg1 fast-path inflight
t_bg2 = asyncio.create_task(bg2())
await asyncio.sleep(0.01) # bg2 enqueue 대기
t_fg1 = asyncio.create_task(fg1())
await asyncio.sleep(0.01) # fg1 enqueue 대기
# bg2 cancel — Future 가 cancelled 상태로 heap 잔존
t_bg2.cancel()
try:
await t_bg2
except asyncio.CancelledError:
pass
# bg1 release — 다음 dispatch 는 fg1 (bg2 죽은 entry skip)
bg1_release.set()
await asyncio.gather(t_bg1, t_fg1)
# bg2 도달 안 함 (cancelled), 순서 = bg1, fg1
assert log == ["bg1", "fg1"], (
f"cancelled bg2 가 dispatch 됐거나 gate stuck: {log}"
)