feat(search): MLX priority gate (B-1, Priority.FOREGROUND vs BACKGROUND)

DS-Mac-mini-26B-Priority-Gate-1 — Mac mini 26B single-inference gate 를
FIFO Semaphore → 우선순위 기반 heap dispatch 로 교체. concurrency 1 유지,
queue ordering 만 foreground 우선.

API:
- Priority(IntEnum): FOREGROUND=0, BACKGROUND=100
- acquire_mlx_gate(priority=DEFAULT_PRIORITY) async context manager
- DEFAULT_PRIORITY = BACKGROUND (안전 default, foreground 짓밟지 않음)
- get_mlx_gate() legacy wrapper — context-manager only 호환

구현:
- _inflight: bool + _waiters heap [(priority, seq, future, enqueue_ts)]
- fast-path: not inflight and not waiters → 즉시 inflight, Future 생성 X
- _dispatch_next_locked: cancelled/done Future skip (heap 잔재 risk 회피)
- release: lock 안에서 pop, set_result 는 loop.call_soon (lock 밖) reentry deadlock 회피
- dispatch / enqueue / release / WARN log (observability)
- BACKGROUND wait_ms > 300_000 (5분) 시 starvation WARN — aging 은 Phase 2 deferred

Tests (tests/test_priority_gate.py, 6 scenario):
1. FIFO within same priority
2. Foreground jumps queue (bg5 대기 중 fg 들어오면 즉시 다음 슬롯)
3. Long-running background blocks foreground (preemption X, intended)
4. Mixed concurrent enqueue (FG fifo 먼저, BG fifo 후)
5. Backward compat (legacy get_mlx_gate() = BACKGROUND 매핑)
6. Cancelled waiter skip (heap 의 죽은 Future 건너뜀, gate stuck X)

Site 교체는 별 commit (refactor(search): swap 10 call sites).

plan: ~/.claude/plans/hermes-polymorphic-rossum.md
This commit is contained in:
Hyungi Ahn
2026-05-17 08:42:58 +09:00
parent 7e346d2d3f
commit 7c9aff393a
2 changed files with 473 additions and 23 deletions
+192 -23
View File
@@ -1,17 +1,17 @@
"""MLX single-inference 전역 gate (Phase 3.1.1).
"""MLX single-inference 전역 gate (Phase 3.1.1 + B-1 Priority Gate).
Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout).
이 모듈은 analyzer / evidence / synthesis 등 **모든 MLX-bound LLM 호출**이
공유하는 `asyncio.Semaphore(1)`를 제공한다. MLX를 호출하는 경로는 예외 없이
`async with get_mlx_gate():` 블록 안에서만 `AIClient._call_chat(ai.primary, ...)`
를 호출해야 한다.
이 모듈은 analyzer / evidence / classifier / synthesis 등 **모든 MLX-bound LLM
호출**이 공유하는 **우선순위 기반 gate** 를 제공한다. concurrency 는 1 고정이지만
queue 의 ordering 은 `Priority.FOREGROUND` (user-facing ask) 가 `Priority.BACKGROUND`
(digest/briefing/worker) 보다 먼저 dispatch.
## 영구 룰
- **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
evidence_service / synthesis_service 세 곳이 현재 사용자. 이후 경로가 늘어도
evidence / classifier / synthesis 4 곳이 현재 사용자. 이후 경로가 늘어도
동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시
동시 실행 발생).
- **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을
@@ -22,37 +22,206 @@ Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
inference 특성이 깨지지 않는 한 이 값을 올리지 말 것.
## 확장 여지 (지금은 구현하지 않음)
## 우선순위 정책 (B-1, 2026-05-17)
트래픽 증가 시 "우선순위 역전"(/ask가 analyzer background task 뒤에 밀림)이
문제가 되면 `asyncio.PriorityQueue` 기반 우선순위 큐로 교체 가능. Gate 자체
분리(get_analyzer_gate / get_ask_gate)는 single-inference에서 throughput
개선이 없으므로 의미 없음.
- `Priority.FOREGROUND = 0`: user-facing path (`/api/search/ask`, 사용자 동기
API, Hermes orchestrator 경유). 가능한 빨리 dispatch.
- `Priority.BACKGROUND = 100`: digest / briefing / classify-escalate /
study_* worker / query_analyzer prewarm. foreground 가 비어 있을 때만 dispatch.
- **DEFAULT_PRIORITY = BACKGROUND**: priority 미지정 호출은 foreground 짓밟지
않는다 (안전 default).
- **preemption 없음**: 이미 inflight 인 background 는 끊지 않는다. foreground 가
들어와도 현재 점유 background 의 남은 시간만큼은 대기. 단 background 2~5
까지 줄 서있던 큐는 foreground 가 앞으로 jump.
- **starvation aging 없음** (Phase 2 deferred). 단 BACKGROUND wait_ms > 5분이면
WARN 로그 — 원인 추적 단서.
## 사용 예
```python
from services.search.llm_gate import acquire_mlx_gate, Priority
async def user_ask_path(...):
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(30):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
async def background_worker(...):
async with acquire_mlx_gate(Priority.BACKGROUND):
...
```
## 확장 여지
- aging (background 대기 시간 → priority boost) — Phase 2
- concurrency > 1 일반화 — B-2 (Throughput)
- 별 gate 분리 (`get_analyzer_gate` / `get_ask_gate`) — single-inference 에서
throughput 개선 없으므로 의미 없음 (PriorityQueue 안의 priority 만으로 충분)
"""
from __future__ import annotations
import asyncio
import heapq
import itertools
import time
from contextlib import asynccontextmanager
from enum import IntEnum
from typing import AsyncIterator
from core.utils import setup_logger
logger = setup_logger("llm_gate")
# MLX primary는 single-inference → 1
MLX_CONCURRENCY = 1
# 첫 호출 시 현재 event loop에 바인딩된 Semaphore 생성 (lazy init)
_mlx_gate: asyncio.Semaphore | None = None
# Background waiter wait_ms 가 이 값 초과 시 WARN (starvation 신호, aging mitigation 은 Phase 2)
STARVATION_WARN_MS = 300_000 # 5 min
def get_mlx_gate() -> asyncio.Semaphore:
"""MLX primary 호출 경로 공용 gate. 최초 호출 시 lazy init.
class Priority(IntEnum):
"""MLX gate dispatch 우선순위. 낮을수록 먼저 dispatch."""
FOREGROUND = 0
BACKGROUND = 100
DEFAULT_PRIORITY: Priority = Priority.BACKGROUND
# ── Internal state (lazy init on first acquire) ─────────────────────────────
# Tuple format: (priority: int, seq: int, future: asyncio.Future, enqueue_ts: float)
_waiters: list[tuple[int, int, asyncio.Future, float]] = []
_seq = itertools.count()
_inflight: bool = False
_lock: asyncio.Lock | None = None
def _get_lock() -> asyncio.Lock:
"""Lazy init Lock on the current event loop."""
global _lock
if _lock is None:
_lock = asyncio.Lock()
return _lock
def _dispatch_next_locked() -> asyncio.Future | None:
"""다음 살아있는 waiter 의 Future 를 pop 후 반환. cancelled/done 인 entry skip.
caller 는 lock 보유 상태에서 호출. 반환된 Future 의 set_result 는 lock 밖에서.
"""
while _waiters:
priority, seq, fut, enqueue_ts = heapq.heappop(_waiters)
if fut.cancelled() or fut.done():
continue # timeout/cancel 후 죽은 Future 건너뜀
return fut
return None
@asynccontextmanager
async def acquire_mlx_gate(
priority: Priority = DEFAULT_PRIORITY,
) -> AsyncIterator[None]:
"""우선순위 기반 MLX primary gate.
Args:
priority: Priority.FOREGROUND (user-facing) 또는 BACKGROUND (worker).
미지정 시 BACKGROUND (안전 default).
사용 예:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(30):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
⚠ `asyncio.timeout`은 반드시 gate 안쪽에 둘 것. 바깥에 두면 gate 대기만으로
timeout이 발동한다.
⚠ `asyncio.timeout` 은 반드시 gate 안쪽 (Future await 후) 에 둘 것.
"""
global _mlx_gate
if _mlx_gate is None:
_mlx_gate = asyncio.Semaphore(MLX_CONCURRENCY)
return _mlx_gate
global _inflight, _waiters
lock = _get_lock()
seq = next(_seq)
enqueue_ts = time.monotonic()
waited = False
fut: asyncio.Future | None = None
async with lock:
if not _inflight and not _waiters:
# fast path — 즉시 inflight 진입, Future 생성 안 함
_inflight = True
else:
# 대기열 진입
fut = asyncio.get_event_loop().create_future()
heapq.heappush(_waiters, (int(priority), seq, fut, enqueue_ts))
queue_len = len(_waiters)
logger.debug(
"mlx_gate enqueue priority=%s seq=%d queue_len=%d",
priority.name, seq, queue_len,
)
waited = True
if waited and fut is not None:
# lock 밖에서 await — release 가 lock 안에서 set_result 하면 reentry deadlock
await fut
# inflight 진입 — wait_ms 측정 + dispatch log + starvation WARN
wait_ms = (time.monotonic() - enqueue_ts) * 1000.0 if waited else 0.0
if waited:
async with lock:
queue_len_post = len(_waiters)
logger.info(
"mlx_gate dispatch priority=%s seq=%d wait_ms=%.0f queue_len=%d",
priority.name, seq, wait_ms, queue_len_post,
)
if priority == Priority.BACKGROUND and wait_ms > STARVATION_WARN_MS:
logger.warning(
"mlx_gate background waiter starved wait_ms=%.0f priority=%s seq=%d",
wait_ms, priority.name, seq,
)
inflight_start = time.monotonic()
try:
yield
finally:
duration_ms = (time.monotonic() - inflight_start) * 1000.0
next_fut: asyncio.Future | None = None
async with lock:
next_fut = _dispatch_next_locked()
if next_fut is None:
_inflight = False
# _inflight 는 True 유지 (다음 waiter 가 진입 예정)
logger.debug(
"mlx_gate release duration_ms=%.0f priority=%s seq=%d",
duration_ms, priority.name, seq,
)
if next_fut is not None:
# lock 밖에서 set_result — reentry deadlock 회피
loop = asyncio.get_event_loop()
loop.call_soon(next_fut.set_result, None)
# ── Backward compat: context-manager only wrapper ────────────────────────────
def get_mlx_gate():
"""Legacy wrapper — `async with get_mlx_gate():` 형태만 호환.
내부적으로 `acquire_mlx_gate(DEFAULT_PRIORITY)` (= BACKGROUND) 로 위임한다.
새 호출 site 는 `acquire_mlx_gate(Priority.FOREGROUND|BACKGROUND)` 명시 사용.
⚠ **Semaphore-like API 미지원** — `sem = get_mlx_gate(); await sem.acquire()`
같은 직접 acquire/release 패턴은 동작하지 않는다. 발견 시 호출 site 를
`async with acquire_mlx_gate(...)` 로 명시적 교체.
"""
return acquire_mlx_gate(DEFAULT_PRIORITY)
# ── Test helpers (conftest reset) ────────────────────────────────────────────
def _reset_for_test() -> None:
"""테스트 fixture 가 fresh loop 마다 호출. production code 에서 사용 X."""
global _waiters, _inflight, _lock, _seq
_waiters = []
_inflight = False
_lock = None
_seq = itertools.count()
+281
View File
@@ -0,0 +1,281 @@
"""DS-Mac-mini-26B-Priority-Gate-1 (B-1) unit smoke — 6 scenario.
heap + inflight 기반 MLX gate 의 ordering/cancellation/fast-path 검증. 실 LLM
호출 없이 gate 자체의 acquire/release/dispatch 만 시뮬레이션.
"""
from __future__ import annotations
import asyncio
import logging
import pytest
from services.search.llm_gate import (
Priority,
_reset_for_test,
acquire_mlx_gate,
get_mlx_gate,
)
@pytest.fixture(autouse=True)
def _reset_gate():
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다)."""
_reset_for_test()
yield
_reset_for_test()
async def _hold(priority: Priority, duration_s: float, log: list, label: str):
"""단순 gate holder — acquire 후 sleep 후 release."""
async with acquire_mlx_gate(priority):
log.append(("acquired", label))
await asyncio.sleep(duration_s)
log.append(("released", label))
# ─── Scenario 1: FIFO within same priority ────────────────────────────────
@pytest.mark.asyncio
async def test_fifo_within_same_priority():
"""BACKGROUND 3건 직렬 enqueue → 순서대로 dispatch (1→2→3)."""
log: list = []
async def acquirer(label: str, started: asyncio.Event):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(label)
started.set()
await asyncio.sleep(0.05)
e1, e2, e3 = asyncio.Event(), asyncio.Event(), asyncio.Event()
t1 = asyncio.create_task(acquirer("bg1", e1))
await asyncio.sleep(0.001) # bg1 이 fast-path 먼저
t2 = asyncio.create_task(acquirer("bg2", e2))
await asyncio.sleep(0.001)
t3 = asyncio.create_task(acquirer("bg3", e3))
await asyncio.gather(t1, t2, t3)
assert log == ["bg1", "bg2", "bg3"], f"FIFO 위반: {log}"
# ─── Scenario 2: Foreground jumps queue ───────────────────────────────────
@pytest.mark.asyncio
async def test_foreground_jumps_queue():
"""BACKGROUND 5건 enqueue (1 inflight, 2~5 대기) → FOREGROUND 1건 enqueue
→ 1번 release 후 다음은 FOREGROUND (2~5 모두 뒤로).
"""
log: list = []
bg_release_signals = [asyncio.Event() for _ in range(5)]
fg_release_signal = asyncio.Event()
async def bg(i: int):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(f"bg{i}")
await bg_release_signals[i].wait()
async def fg():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg")
await fg_release_signal.wait()
# bg0 fast-path inflight
t_bg0 = asyncio.create_task(bg(0))
await asyncio.sleep(0.01) # bg0 이 inflight 확실해질 때까지
# bg1~4 enqueue (대기)
t_bgs = [asyncio.create_task(bg(i)) for i in range(1, 5)]
await asyncio.sleep(0.01)
# fg enqueue (대기) — 모든 background 보다 앞으로 jump 해야 함
t_fg = asyncio.create_task(fg())
await asyncio.sleep(0.01)
# bg0 release → 다음 dispatch 는 FG
bg_release_signals[0].set()
await asyncio.sleep(0.05)
assert log == ["bg0", "fg"], f"FG jump 위반 (bg1 먼저 들어옴?): {log}"
# fg release → bg1
fg_release_signal.set()
await asyncio.sleep(0.05)
assert log == ["bg0", "fg", "bg1"], f"FIFO 위반: {log}"
# 나머지 풀어줌
for i in range(1, 5):
bg_release_signals[i].set()
await asyncio.gather(t_bg0, *t_bgs, t_fg)
assert log == ["bg0", "fg", "bg1", "bg2", "bg3", "bg4"], (
f"최종 순서: {log}"
)
# ─── Scenario 3: Long-running background blocks foreground (intended) ────
@pytest.mark.asyncio
async def test_long_running_background_blocks_foreground():
"""이미 inflight 인 background 는 끊지 않는다 (preemption X). foreground 들어와도
현재 점유 background 의 남은 시간만큼 대기.
"""
log: list = []
async def bg():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(("bg_start", asyncio.get_event_loop().time()))
await asyncio.sleep(0.3)
log.append(("bg_end", asyncio.get_event_loop().time()))
async def fg():
await asyncio.sleep(0.05) # bg 가 inflight 진입한 후 fg 시도
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append(("fg_start", asyncio.get_event_loop().time()))
t_bg = asyncio.create_task(bg())
t_fg = asyncio.create_task(fg())
await asyncio.gather(t_bg, t_fg)
events = {label: t for label, t in log}
# fg 는 bg_end 이후에만 시작 가능
assert events["fg_start"] >= events["bg_end"] - 0.01, (
f"preemption 발생? fg_start={events['fg_start']} bg_end={events['bg_end']}"
)
# ─── Scenario 4: Mixed concurrent enqueue (FG FIFO 먼저, BG FIFO 후) ──
@pytest.mark.asyncio
async def test_mixed_concurrent_enqueue():
"""2 FOREGROUND + 3 BACKGROUND 가 거의 동시 → FG fifo 먼저, BG fifo 후."""
log: list = []
holder_started = asyncio.Event()
holder_release = asyncio.Event()
async def holder():
async with acquire_mlx_gate(Priority.BACKGROUND):
holder_started.set()
await holder_release.wait()
async def fg(i: int):
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append(f"fg{i}")
await asyncio.sleep(0.005)
async def bg(i: int):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(f"bg{i}")
await asyncio.sleep(0.005)
# holder 가 fast-path 점유
t_h = asyncio.create_task(holder())
await holder_started.wait()
# 거의 동시 enqueue (gather)
tasks = [
asyncio.create_task(fg(0)),
asyncio.create_task(bg(0)),
asyncio.create_task(fg(1)),
asyncio.create_task(bg(1)),
asyncio.create_task(bg(2)),
]
await asyncio.sleep(0.01) # 모두 enqueue 까지 진행
# holder release → priority 순서대로 dispatch
holder_release.set()
await asyncio.gather(t_h, *tasks)
# fg0, fg1 가 먼저 (FG FIFO) → bg0, bg1, bg2 (BG FIFO)
assert log == ["fg0", "fg1", "bg0", "bg1", "bg2"], f"priority order 위반: {log}"
# ─── Scenario 5: Backward compat — async with get_mlx_gate() ──────────────
@pytest.mark.asyncio
async def test_backward_compat_get_mlx_gate():
"""`async with get_mlx_gate():` legacy 형태가 BACKGROUND priority 로 매핑."""
log: list = []
async def legacy_caller():
async with get_mlx_gate():
log.append("legacy")
await asyncio.sleep(0.01)
async def fg_caller():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg")
await asyncio.sleep(0.01)
# 동시 enqueue
bg_holder_started = asyncio.Event()
bg_holder_release = asyncio.Event()
async def bg_holder():
async with acquire_mlx_gate(Priority.BACKGROUND):
bg_holder_started.set()
await bg_holder_release.wait()
t_h = asyncio.create_task(bg_holder())
await bg_holder_started.wait()
t_legacy = asyncio.create_task(legacy_caller())
await asyncio.sleep(0.005)
t_fg = asyncio.create_task(fg_caller())
await asyncio.sleep(0.01)
bg_holder_release.set()
await asyncio.gather(t_h, t_legacy, t_fg)
# legacy 는 BACKGROUND 매핑이므로 fg 가 먼저 (priority order)
assert log == ["fg", "legacy"], f"legacy 가 FOREGROUND 로 처리됨? {log}"
# ─── Scenario 6: Cancelled waiter skip ────────────────────────────────────
@pytest.mark.asyncio
async def test_cancelled_waiter_skip():
"""bg1 inflight → bg2 enqueue → fg1 enqueue → bg2 cancel → bg1 release →
다음 dispatch 가 fg1 (죽은 bg2 entry skip).
"""
log: list = []
bg1_release = asyncio.Event()
async def bg1():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append("bg1")
await bg1_release.wait()
async def bg2():
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append("bg2") # 도달 안 해야 함 (cancel)
async def fg1():
async with acquire_mlx_gate(Priority.FOREGROUND):
log.append("fg1")
t_bg1 = asyncio.create_task(bg1())
await asyncio.sleep(0.01) # bg1 fast-path inflight
t_bg2 = asyncio.create_task(bg2())
await asyncio.sleep(0.01) # bg2 enqueue 대기
t_fg1 = asyncio.create_task(fg1())
await asyncio.sleep(0.01) # fg1 enqueue 대기
# bg2 cancel — Future 가 cancelled 상태로 heap 잔존
t_bg2.cancel()
try:
await t_bg2
except asyncio.CancelledError:
pass
# bg1 release — 다음 dispatch 는 fg1 (bg2 죽은 entry skip)
bg1_release.set()
await asyncio.gather(t_bg1, t_fg1)
# bg2 도달 안 함 (cancelled), 순서 = bg1, fg1
assert log == ["bg1", "fg1"], (
f"cancelled bg2 가 dispatch 됐거나 gate stuck: {log}"
)