Files
hyungi_document_server/tests/test_priority_gate.py
hyungi 235bbf9881 ops(pipeline): fair-share 번들 — drain classify 합류 + deep 맥미니 폴백 + mlx 게이트 동시 2
사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버:
- queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling,
  완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지)
- deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 —
  강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지
- llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) —
  'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제
- analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 06:56:02 +09:00

288 lines
9.3 KiB
Python

"""DS-Mac-mini-26B-Priority-Gate-1 (B-1) unit smoke — 6 scenario.
heap + inflight 기반 MLX gate 의 ordering/cancellation/fast-path 검증. 실 LLM
호출 없이 gate 자체의 acquire/release/dispatch 만 시뮬레이션.
"""
from __future__ import annotations
import asyncio
import logging
import pytest
from services.search.llm_gate import (
Priority,
_reset_for_test,
acquire_mlx_gate,
get_mlx_gate,
)
@pytest.fixture(autouse=True)
def _reset_gate(monkeypatch):
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다).
2026-06-12 capacity 일반화 이후 본 파일의 직렬화 가정 보존을 위해
concurrency=1 로 고정 (capacity>1 동작은 test_fair_share.py 가 커버).
"""
from core.config import settings
monkeypatch.setattr(settings, "mlx_gate_concurrency", 1)
_reset_for_test()
yield
_reset_for_test()
async def _hold(priority: Priority, duration_s: float, log: list, label: str):
"""단순 gate holder — acquire 후 sleep 후 release."""
async with acquire_mlx_gate(priority):
log.append(("acquired", label))
await asyncio.sleep(duration_s)
log.append(("released", label))
# ─── Scenario 1: FIFO within same priority ────────────────────────────────
@pytest.mark.asyncio
async def test_fifo_within_same_priority():
"""BACKGROUND 3건 직렬 enqueue → 순서대로 dispatch (1→2→3)."""
log: list = []
async def acquirer(label: str, started: asyncio.Event):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(label)
started.set()
await asyncio.sleep(0.05)
e1, e2, e3 = asyncio.Event(), asyncio.Event(), asyncio.Event()
t1 = asyncio.create_task(acquirer("bg1", e1))
await asyncio.sleep(0.001) # bg1 이 fast-path 먼저
t2 = asyncio.create_task(acquirer("bg2", e2))
await asyncio.sleep(0.001)
t3 = asyncio.create_task(acquirer("bg3", e3))
await asyncio.gather(t1, t2, t3)
assert log == ["bg1", "bg2", "bg3"], f"FIFO 위반: {log}"
# ─── Scenario 2: Foreground jumps queue ───────────────────────────────────
@pytest.mark.asyncio
async def test_foreground_jumps_queue():
"""BACKGROUND 5건 enqueue (1 inflight, 2~5 대기) → FOREGROUND 1건 enqueue
→ 1번 release 후 다음은 FOREGROUND (2~5 모두 뒤로).
"""
log: list = []
bg_release_signals = [asyncio.Event() for _ in range(5)]
fg_release_signal = asyncio.Event()
async def bg(i: int):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(f"bg{i}")
await bg_release_signals[i].wait()
async def fg():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg")
await fg_release_signal.wait()
# bg0 fast-path inflight
t_bg0 = asyncio.create_task(bg(0))
await asyncio.sleep(0.01) # bg0 이 inflight 확실해질 때까지
# bg1~4 enqueue (대기)
t_bgs = [asyncio.create_task(bg(i)) for i in range(1, 5)]
await asyncio.sleep(0.01)
# fg enqueue (대기) — 모든 background 보다 앞으로 jump 해야 함
t_fg = asyncio.create_task(fg())
await asyncio.sleep(0.01)
# bg0 release → 다음 dispatch 는 FG
bg_release_signals[0].set()
await asyncio.sleep(0.05)
assert log == ["bg0", "fg"], f"FG jump 위반 (bg1 먼저 들어옴?): {log}"
# fg release → bg1
fg_release_signal.set()
await asyncio.sleep(0.05)
assert log == ["bg0", "fg", "bg1"], f"FIFO 위반: {log}"
# 나머지 풀어줌
for i in range(1, 5):
bg_release_signals[i].set()
await asyncio.gather(t_bg0, *t_bgs, t_fg)
assert log == ["bg0", "fg", "bg1", "bg2", "bg3", "bg4"], (
f"최종 순서: {log}"
)
# ─── Scenario 3: Long-running background blocks foreground (intended) ────
@pytest.mark.asyncio
async def test_long_running_background_blocks_foreground():
"""이미 inflight 인 background 는 끊지 않는다 (preemption X). foreground 들어와도
현재 점유 background 의 남은 시간만큼 대기.
"""
log: list = []
async def bg():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(("bg_start", asyncio.get_event_loop().time()))
await asyncio.sleep(0.3)
log.append(("bg_end", asyncio.get_event_loop().time()))
async def fg():
await asyncio.sleep(0.05) # bg 가 inflight 진입한 후 fg 시도
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append(("fg_start", asyncio.get_event_loop().time()))
t_bg = asyncio.create_task(bg())
t_fg = asyncio.create_task(fg())
await asyncio.gather(t_bg, t_fg)
events = {label: t for label, t in log}
# fg 는 bg_end 이후에만 시작 가능
assert events["fg_start"] >= events["bg_end"] - 0.01, (
f"preemption 발생? fg_start={events['fg_start']} bg_end={events['bg_end']}"
)
# ─── Scenario 4: Mixed concurrent enqueue (FG FIFO 먼저, BG FIFO 후) ──
@pytest.mark.asyncio
async def test_mixed_concurrent_enqueue():
"""2 FOREGROUND + 3 BACKGROUND 가 거의 동시 → FG fifo 먼저, BG fifo 후."""
log: list = []
holder_started = asyncio.Event()
holder_release = asyncio.Event()
async def holder():
async with acquire_mlx_gate(Priority.BACKGROUND):
holder_started.set()
await holder_release.wait()
async def fg(i: int):
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append(f"fg{i}")
await asyncio.sleep(0.005)
async def bg(i: int):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(f"bg{i}")
await asyncio.sleep(0.005)
# holder 가 fast-path 점유
t_h = asyncio.create_task(holder())
await holder_started.wait()
# 거의 동시 enqueue (gather)
tasks = [
asyncio.create_task(fg(0)),
asyncio.create_task(bg(0)),
asyncio.create_task(fg(1)),
asyncio.create_task(bg(1)),
asyncio.create_task(bg(2)),
]
await asyncio.sleep(0.01) # 모두 enqueue 까지 진행
# holder release → priority 순서대로 dispatch
holder_release.set()
await asyncio.gather(t_h, *tasks)
# fg0, fg1 가 먼저 (FG FIFO) → bg0, bg1, bg2 (BG FIFO)
assert log == ["fg0", "fg1", "bg0", "bg1", "bg2"], f"priority order 위반: {log}"
# ─── Scenario 5: Backward compat — async with get_mlx_gate() ──────────────
@pytest.mark.asyncio
async def test_backward_compat_get_mlx_gate():
"""`async with get_mlx_gate():` legacy 형태가 BACKGROUND priority 로 매핑."""
log: list = []
async def legacy_caller():
async with get_mlx_gate():
log.append("legacy")
await asyncio.sleep(0.01)
async def fg_caller():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg")
await asyncio.sleep(0.01)
# 동시 enqueue
bg_holder_started = asyncio.Event()
bg_holder_release = asyncio.Event()
async def bg_holder():
async with acquire_mlx_gate(Priority.BACKGROUND):
bg_holder_started.set()
await bg_holder_release.wait()
t_h = asyncio.create_task(bg_holder())
await bg_holder_started.wait()
t_legacy = asyncio.create_task(legacy_caller())
await asyncio.sleep(0.005)
t_fg = asyncio.create_task(fg_caller())
await asyncio.sleep(0.01)
bg_holder_release.set()
await asyncio.gather(t_h, t_legacy, t_fg)
# legacy 는 BACKGROUND 매핑이므로 fg 가 먼저 (priority order)
assert log == ["fg", "legacy"], f"legacy 가 FOREGROUND 로 처리됨? {log}"
# ─── Scenario 6: Cancelled waiter skip ────────────────────────────────────
@pytest.mark.asyncio
async def test_cancelled_waiter_skip():
"""bg1 inflight → bg2 enqueue → fg1 enqueue → bg2 cancel → bg1 release →
다음 dispatch 가 fg1 (죽은 bg2 entry skip).
"""
log: list = []
bg1_release = asyncio.Event()
async def bg1():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append("bg1")
await bg1_release.wait()
async def bg2():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append("bg2") # 도달 안 해야 함 (cancel)
async def fg1():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg1")
t_bg1 = asyncio.create_task(bg1())
await asyncio.sleep(0.01) # bg1 fast-path inflight
t_bg2 = asyncio.create_task(bg2())
await asyncio.sleep(0.01) # bg2 enqueue 대기
t_fg1 = asyncio.create_task(fg1())
await asyncio.sleep(0.01) # fg1 enqueue 대기
# bg2 cancel — Future 가 cancelled 상태로 heap 잔존
t_bg2.cancel()
try:
await t_bg2
except asyncio.CancelledError:
pass
# bg1 release — 다음 dispatch 는 fg1 (bg2 죽은 entry skip)
bg1_release.set()
await asyncio.gather(t_bg1, t_fg1)
# bg2 도달 안 함 (cancelled), 순서 = bg1, fg1
assert log == ["bg1", "fg1"], (
f"cancelled bg2 가 dispatch 됐거나 gate stuck: {log}"
)