235bbf9881
사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버: - queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling, 완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지) - deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 — 강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지 - llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) — 'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제 - analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
288 lines
9.3 KiB
Python
288 lines
9.3 KiB
Python
"""DS-Mac-mini-26B-Priority-Gate-1 (B-1) unit smoke — 6 scenario.
|
|
|
|
heap + inflight 기반 MLX gate 의 ordering/cancellation/fast-path 검증. 실 LLM
|
|
호출 없이 gate 자체의 acquire/release/dispatch 만 시뮬레이션.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
import pytest
|
|
|
|
from services.search.llm_gate import (
|
|
Priority,
|
|
_reset_for_test,
|
|
acquire_mlx_gate,
|
|
get_mlx_gate,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _reset_gate(monkeypatch):
    """Give every test a freshly reset, single-slot MLX gate.

    Since the 2026-06-12 capacity generalization the gate can admit more than
    one holder. Every scenario in this module assumes strict serialization,
    so ``mlx_gate_concurrency`` is pinned to 1 here (capacity > 1 behavior is
    covered by test_fair_share.py). Gate state is reset both before and after
    the test body.
    """
    from core.config import settings as app_settings

    monkeypatch.setattr(app_settings, "mlx_gate_concurrency", 1)
    # Reset only once the override is in place (order kept deliberately).
    _reset_for_test()
    yield
    _reset_for_test()
|
|
|
|
|
|
async def _hold(priority: Priority, duration_s: float, log: list, label: str):
    """Take the gate at *priority*, keep it for *duration_s* seconds, give it back.

    Appends ``("acquired", label)`` once the gate is entered and
    ``("released", label)`` after the gate context has been left.
    """
    note = log.append
    gate_ctx = acquire_mlx_gate(priority)
    async with gate_ctx:
        note(("acquired", label))
        await asyncio.sleep(duration_s)
    note(("released", label))
|
|
|
|
|
|
# ─── Scenario 1: FIFO within same priority ────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fifo_within_same_priority():
    """Three BACKGROUND requests queued one after another are served strictly
    one at a time, in arrival order (bg1 -> bg2 -> bg3).

    Both entry and exit are recorded. Checking only the acquisition order
    would pass even if the gate admitted every caller at once, because tasks
    created in order also start in order; the enter/exit interleaving below
    proves that each holder leaves before the next one enters.
    """
    log: list = []

    async def acquirer(label: str):
        async with acquire_mlx_gate(Priority.BACKGROUND):
            log.append(("enter", label))
            await asyncio.sleep(0.05)
            log.append(("exit", label))

    t1 = asyncio.create_task(acquirer("bg1"))
    await asyncio.sleep(0.001)  # let bg1 take the fast path first
    t2 = asyncio.create_task(acquirer("bg2"))
    await asyncio.sleep(0.001)
    t3 = asyncio.create_task(acquirer("bg3"))

    # Bounded wait: a gate that never dispatches a waiter fails instead of hanging.
    await asyncio.wait_for(asyncio.gather(t1, t2, t3), timeout=2.0)

    expected = [
        ("enter", "bg1"), ("exit", "bg1"),
        ("enter", "bg2"), ("exit", "bg2"),
        ("enter", "bg3"), ("exit", "bg3"),
    ]
    assert log == expected, f"FIFO 위반: {log}"
|
|
|
|
|
|
# ─── Scenario 2: Foreground jumps queue ───────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_foreground_jumps_queue():
    """A queued FOREGROUND request overtakes every queued BACKGROUND request.

    bg0 holds the gate while bg1..bg4 queue behind it, then fg queues last.
    When bg0 releases, fg must be dispatched next; after fg releases, bg1..bg4
    follow in FIFO order.
    """
    log: list = []
    bg_release_signals = [asyncio.Event() for _ in range(5)]
    fg_release_signal = asyncio.Event()

    async def bg(i: int):
        async with acquire_mlx_gate(Priority.BACKGROUND):
            log.append(f"bg{i}")
            await bg_release_signals[i].wait()

    async def fg():
        async with acquire_mlx_gate(Priority.FOREGROUND):
            log.append("fg")
            await fg_release_signal.wait()

    # bg0 takes the fast path and holds the gate.
    t_bg0 = asyncio.create_task(bg(0))
    await asyncio.sleep(0.01)  # until bg0 is certainly inflight
    # bg1..bg4 queue behind bg0.
    t_bgs = [asyncio.create_task(bg(i)) for i in range(1, 5)]
    await asyncio.sleep(0.01)
    # fg queues last but must jump ahead of every background waiter.
    t_fg = asyncio.create_task(fg())
    await asyncio.sleep(0.01)

    tasks = [t_bg0, *t_bgs, t_fg]
    try:
        # bg0 releases -> next dispatch must be fg.
        bg_release_signals[0].set()
        await asyncio.sleep(0.05)
        assert log == ["bg0", "fg"], f"FG jump 위반 (bg1 먼저 들어옴?): {log}"

        # fg releases -> bg1.
        fg_release_signal.set()
        await asyncio.sleep(0.05)
        assert log == ["bg0", "fg", "bg1"], f"FIFO 위반: {log}"

        # Release the rest; bounded so a stranded waiter fails instead of hanging.
        for signal in bg_release_signals[1:]:
            signal.set()
        await asyncio.wait_for(asyncio.gather(*tasks), timeout=2.0)
    finally:
        # If anything above failed, the remaining tasks would otherwise stay
        # parked on never-set Events until the loop shuts down. cancel() is a
        # no-op for tasks that already finished.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    assert log == ["bg0", "fg", "bg1", "bg2", "bg3", "bg4"], (
        f"최종 순서: {log}"
    )
|
|
|
|
|
|
# ─── Scenario 3: Long-running background blocks foreground (intended) ────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_long_running_background_blocks_foreground():
    """No preemption: an inflight BACKGROUND holder is never interrupted.

    A FOREGROUND request that arrives while the background call holds the gate
    waits for the remainder of that call before it starts.
    """
    # Inside a coroutine the running loop is the documented way to read loop time.
    loop = asyncio.get_running_loop()
    log: list = []
    bg_started = asyncio.Event()

    async def bg():
        async with acquire_mlx_gate(Priority.BACKGROUND):
            log.append(("bg_start", loop.time()))
            bg_started.set()
            await asyncio.sleep(0.3)
            log.append(("bg_end", loop.time()))

    async def fg():
        # Contend only once bg is really holding the gate, then arrive mid-hold.
        await bg_started.wait()
        await asyncio.sleep(0.05)
        async with acquire_mlx_gate(Priority.FOREGROUND):
            log.append(("fg_start", loop.time()))

    t_bg = asyncio.create_task(bg())
    t_fg = asyncio.create_task(fg())
    # Bounded wait: a gate that never hands over fails instead of hanging.
    await asyncio.wait_for(asyncio.gather(t_bg, t_fg), timeout=2.0)

    events = dict(log)
    # fg may only start after bg_end (small tolerance for clock granularity).
    assert events["fg_start"] >= events["bg_end"] - 0.01, (
        f"preemption 발생? fg_start={events['fg_start']} bg_end={events['bg_end']}"
    )
|
|
|
|
|
|
# ─── Scenario 4: Mixed concurrent enqueue (FG FIFO 먼저, BG FIFO 후) ──
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_mixed_concurrent_enqueue():
    """Two FOREGROUND and three BACKGROUND callers queue almost simultaneously.

    Once the holder releases, every FOREGROUND caller runs first (in arrival
    order), followed by every BACKGROUND caller (in arrival order).
    """
    log: list = []
    holding = asyncio.Event()
    let_go = asyncio.Event()

    async def holder():
        async with acquire_mlx_gate(Priority.BACKGROUND):
            holding.set()
            await let_go.wait()

    def make_worker(priority, prefix):
        # Each worker records "<prefix><i>" on entry and holds the gate briefly.
        async def worker(i: int):
            async with acquire_mlx_gate(priority):
                log.append(f"{prefix}{i}")
                await asyncio.sleep(0.005)

        return worker

    fg = make_worker(Priority.FOREGROUND, "fg")
    bg = make_worker(Priority.BACKGROUND, "bg")

    # Occupy the gate through the fast path first.
    holder_task = asyncio.create_task(holder())
    await holding.wait()

    # Interleaved arrivals: fg0, bg0, fg1, bg1, bg2.
    arrivals = ((fg, 0), (bg, 0), (fg, 1), (bg, 1), (bg, 2))
    waiters = [asyncio.create_task(worker(i)) for worker, i in arrivals]
    await asyncio.sleep(0.01)  # give every waiter time to enqueue

    # Release the holder; dispatch must now follow priority, then FIFO.
    let_go.set()
    await asyncio.gather(holder_task, *waiters)

    assert log == ["fg0", "fg1", "bg0", "bg1", "bg2"], f"priority order 위반: {log}"
|
|
|
|
|
|
# ─── Scenario 5: Backward compat — async with get_mlx_gate() ──────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_backward_compat_get_mlx_gate():
    """Legacy ``async with get_mlx_gate():`` callers are treated as BACKGROUND.

    While another caller holds the gate, a legacy caller queues first and a
    FOREGROUND caller queues second. On release the FOREGROUND caller must
    still run first, which only holds if the legacy path maps to BACKGROUND.
    """
    log: list = []
    holder_in = asyncio.Event()
    holder_out = asyncio.Event()

    async def bg_holder():
        async with acquire_mlx_gate(Priority.BACKGROUND):
            holder_in.set()
            await holder_out.wait()

    async def legacy_caller():
        async with get_mlx_gate():
            log.append("legacy")
            await asyncio.sleep(0.01)

    async def fg_caller():
        async with acquire_mlx_gate(Priority.FOREGROUND):
            log.append("fg")
            await asyncio.sleep(0.01)

    holder_task = asyncio.create_task(bg_holder())
    await holder_in.wait()

    # Queue the legacy caller first, then the FOREGROUND caller, both behind the holder.
    legacy_task = asyncio.create_task(legacy_caller())
    await asyncio.sleep(0.005)
    fg_task = asyncio.create_task(fg_caller())
    await asyncio.sleep(0.01)

    holder_out.set()
    await asyncio.gather(holder_task, legacy_task, fg_task)

    # Legacy maps to BACKGROUND, so fg must be dispatched ahead of it.
    assert log == ["fg", "legacy"], f"legacy 가 FOREGROUND 로 처리됨? {log}"
|
|
|
|
|
|
# ─── Scenario 6: Cancelled waiter skip ────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancelled_waiter_skip():
    """A waiter cancelled while queued is skipped, not dispatched.

    Sequence: bg1 holds the gate -> bg2 queues -> fg1 queues -> bg2 is
    cancelled -> bg1 releases. The next dispatch must go to fg1. The dead bg2
    entry must neither run nor wedge the gate. A wedged gate is reported as a
    test failure, not a hang.
    """
    log: list = []
    bg1_release = asyncio.Event()

    async def bg1():
        async with acquire_mlx_gate(Priority.BACKGROUND):
            log.append("bg1")
            await bg1_release.wait()

    async def bg2():
        async with acquire_mlx_gate(Priority.BACKGROUND):
            log.append("bg2")  # must never be reached (cancelled while queued)

    async def fg1():
        async with acquire_mlx_gate(Priority.FOREGROUND):
            log.append("fg1")

    t_bg1 = asyncio.create_task(bg1())
    await asyncio.sleep(0.01)  # bg1 takes the fast path and holds the gate
    t_bg2 = asyncio.create_task(bg2())
    await asyncio.sleep(0.01)  # bg2 is now waiting behind bg1
    t_fg1 = asyncio.create_task(fg1())
    await asyncio.sleep(0.01)  # fg1 is now waiting as well

    # Cancel bg2 while it is still queued. Presumably its entry stays in the
    # gate's heap in a cancelled state and has to be skipped on dispatch.
    t_bg2.cancel()
    try:
        await t_bg2
    except asyncio.CancelledError:
        pass

    # Releasing bg1 must hand the gate to fg1, skipping the dead bg2 entry.
    bg1_release.set()
    try:
        # Bounded wait: if the cancelled entry swallows the slot, fg1 is never
        # dispatched and an unbounded gather would hang the whole suite.
        await asyncio.wait_for(asyncio.gather(t_bg1, t_fg1), timeout=2.0)
    except asyncio.TimeoutError:
        pytest.fail(f"cancelled bg2 가 dispatch 됐거나 gate stuck: {log}")

    # bg2 never ran (cancelled); order is bg1 then fg1.
    assert log == ["bg1", "fg1"], (
        f"cancelled bg2 가 dispatch 됐거나 gate stuck: {log}"
    )
|