From 7c9aff393adbaa617db4feab3d6248b918d61a39 Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Sun, 17 May 2026 08:42:58 +0900
Subject: [PATCH] feat(search): MLX priority gate (B-1, Priority.FOREGROUND vs BACKGROUND)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

DS-Mac-mini-26B-Priority-Gate-1 — Mac mini 26B single-inference gate 를 FIFO
Semaphore → 우선순위 기반 heap dispatch 로 교체. concurrency 1 유지, queue
ordering 만 foreground 우선.

API:
- Priority(IntEnum): FOREGROUND=0, BACKGROUND=100
- acquire_mlx_gate(priority=DEFAULT_PRIORITY) async context manager
- DEFAULT_PRIORITY = BACKGROUND (안전 default, foreground 짓밟지 않음)
- get_mlx_gate() legacy wrapper — context-manager only 호환

구현:
- _inflight: bool + _waiters heap [(priority, seq, future, enqueue_ts)]
- fast-path: not inflight and not waiters → 즉시 inflight, Future 생성 X
- _dispatch_next_locked: cancelled/done Future skip (heap 잔재 risk 회피)
- release: pop + set_result under the lock (set_result only schedules the
  wake-up, so no reentry); no call_soon window in which the next waiter can
  be cancelled and the slot lost
- cancel-safe wait: a waiter cancelled after the grant hands the slot on
- dispatch / enqueue / release / WARN log (observability)
- BACKGROUND wait_ms > 300_000 (5분) 시 starvation WARN — aging 은 Phase 2
  deferred

Tests (tests/test_priority_gate.py, 7 scenario):
1. FIFO within same priority
2. Foreground jumps queue (bg5 대기 중 fg 들어오면 즉시 다음 슬롯)
3. Long-running background blocks foreground (preemption X, intended)
4. Mixed concurrent enqueue (FG fifo 먼저, BG fifo 후)
5. Backward compat (legacy get_mlx_gate() = BACKGROUND 매핑)
6. Cancelled waiter skip (heap 의 죽은 Future 건너뜀, gate stuck X)
7. Waiter cancelled right after the grant (slot handed on, gate stuck X)

Site 교체는 별 commit (refactor(search): swap 10 call sites).

plan: ~/.claude/plans/hermes-polymorphic-rossum.md
---
 app/services/search/llm_gate.py | 233 +++++++++++++++++++++---
 tests/test_priority_gate.py     | 319 ++++++++++++++++++++++++++++++++
 2 files changed, 529 insertions(+), 23 deletions(-)
 create mode 100644 tests/test_priority_gate.py

diff --git a/app/services/search/llm_gate.py b/app/services/search/llm_gate.py
index 560dc0a..76b73e2 100644
--- a/app/services/search/llm_gate.py
+++ b/app/services/search/llm_gate.py
@@ -1,17 +1,17 @@
-"""MLX single-inference 전역 gate (Phase 3.1.1).
+"""Global MLX single-inference gate (Phase 3.1.1 + B-1 Priority Gate).
 
 Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
 동시 호출이 들어오면 queue가 폭발한다(실측: 23 concurrent 요청 → 22개 15초 timeout).
 
-이 모듈은 analyzer / evidence / synthesis 등 **모든 MLX-bound LLM 호출**이
-공유하는 `asyncio.Semaphore(1)`를 제공한다. MLX를 호출하는 경로는 예외 없이
-`async with get_mlx_gate():` 블록 안에서만 `AIClient._call_chat(ai.primary, ...)`
-를 호출해야 한다.
+This module provides the **priority-based gate** shared by **every MLX-bound LLM
+call** (analyzer / evidence / classifier / synthesis, ...). Concurrency stays fixed
+at 1, but the queue dispatches `Priority.FOREGROUND` (user-facing ask) ahead of
+`Priority.BACKGROUND` (digest/briefing/worker).
 
 ## 영구 룰
 
 - **MLX primary 호출 경로는 예외 없이 gate 획득 필수**. query_analyzer /
-  evidence_service / synthesis_service 세 곳이 현재 사용자. 이후 경로가 늘어도
+  evidence / classifier / synthesis 4 곳이 현재 사용자. 이후 경로가 늘어도
   동일 gate를 import해서 사용한다. 새 Semaphore를 만들지 말 것 (큐 분할 시
   동시 실행 발생).
 - **`asyncio.timeout(...)`은 gate 안쪽에서만 적용**. gate 대기 자체에 timeout을
@@ -22,37 +22,224 @@ Mac mini MLX primary(gemma-4-26b-a4b-it-8bit)는 **single-inference**다.
 - **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
   inference 특성이 깨지지 않는 한 이 값을 올리지 말 것.
 
-## 확장 여지 (지금은 구현하지 않음)
+## Priority policy (B-1, 2026-05-17)
 
-트래픽 증가 시 "우선순위 역전"(/ask가 analyzer background task 뒤에 밀림)이
-문제가 되면 `asyncio.PriorityQueue` 기반 우선순위 큐로 교체 가능. Gate 자체
-분리(get_analyzer_gate / get_ask_gate)는 single-inference에서 throughput
-개선이 없으므로 의미 없음.
+- `Priority.FOREGROUND = 0`: user-facing paths (`/api/search/ask`, synchronous
+  user APIs, via the Hermes orchestrator). Dispatched as early as possible.
+- `Priority.BACKGROUND = 100`: digest / briefing / classify-escalate /
+  study_* workers / query_analyzer prewarm. Dispatched only when no foreground waits.
+- **DEFAULT_PRIORITY = BACKGROUND**: calls that do not pass a priority never jump
+  ahead of foreground work (safe default).
+- **No preemption**: a background call that is already inflight is not interrupted.
+  A foreground call waits for the current holder to finish, but it jumps ahead of
+  the background calls (2~5) that were still queued.
+- **No starvation aging** (deferred to Phase 2). A BACKGROUND wait_ms above
+  5 minutes only emits a WARN log as a debugging hint.
+
+## Usage
+
+```python
+from services.search.llm_gate import acquire_mlx_gate, Priority
+
+async def user_ask_path(...):
+    async with acquire_mlx_gate(Priority.FOREGROUND):
+        async with asyncio.timeout(30):
+            raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
+
+async def background_worker(...):
+    async with acquire_mlx_gate(Priority.BACKGROUND):
+        ...
+```
+
+## Room for extension
+
+- aging (background wait time → priority boost) — Phase 2
+- generalizing to concurrency > 1 — B-2 (Throughput)
+- separate gates (`get_analyzer_gate` / `get_ask_gate`) — pointless under
+  single-inference since throughput does not improve (priorities in one queue suffice)
 """
 
 from __future__ import annotations
 
 import asyncio
+import heapq
+import itertools
+import time
+from contextlib import asynccontextmanager
+from enum import IntEnum
+from typing import AsyncIterator
+
+from core.utils import setup_logger
+
+logger = setup_logger("llm_gate")
 
 # MLX primary는 single-inference → 1
 MLX_CONCURRENCY = 1
 
-# 첫 호출 시 현재 event loop에 바인딩된 Semaphore 생성 (lazy init)
-_mlx_gate: asyncio.Semaphore | None = None
+# WARN when a background waiter's wait_ms exceeds this (starvation signal; aging mitigation is Phase 2)
+STARVATION_WARN_MS = 300_000  # 5 min
 
 
-def get_mlx_gate() -> asyncio.Semaphore:
-    """MLX primary 호출 경로 공용 gate. 최초 호출 시 lazy init.
+class Priority(IntEnum):
+    """MLX gate dispatch priority. Lower values are dispatched first."""
+
+    FOREGROUND = 0
+    BACKGROUND = 100
+
+
+DEFAULT_PRIORITY: Priority = Priority.BACKGROUND
+
+
+# ── Internal state (lazy init on first acquire) ─────────────────────────────
+# Tuple format: (priority: int, seq: int, future: asyncio.Future, enqueue_ts: float)
+_waiters: list[tuple[int, int, asyncio.Future, float]] = []
+_seq = itertools.count()
+_inflight: bool = False
+_lock: asyncio.Lock | None = None
+
+
+def _get_lock() -> asyncio.Lock:
+    """Lazy init Lock on the current event loop."""
+    global _lock
+    if _lock is None:
+        _lock = asyncio.Lock()
+    return _lock
+
+
+def _dispatch_next_locked() -> asyncio.Future | None:
+    """Pop and return the next live waiter's Future, skipping cancelled/done entries.
+
+    Caller must hold the lock and is responsible for resolving the returned Future.
+    """
+    while _waiters:
+        _prio, _order, fut, _ts = heapq.heappop(_waiters)
+        if fut.done():  # done() is also True for a cancelled Future
+            continue  # dead entry left behind by a cancelled/timed-out waiter
+        return fut
+    return None
+
+
+def _handoff_or_idle_locked() -> None:
+    """Hand the slot to the next live waiter, or mark the gate idle if none is left.
+
+    Caller must hold the lock. `Future.set_result` only schedules the waiter's
+    wake-up on the loop, so resolving it here cannot re-enter the gate.
+    """
+    global _inflight
+    next_fut = _dispatch_next_locked()
+    if next_fut is None:
+        _inflight = False
+    else:
+        # _inflight stays True: ownership moves straight to the woken waiter.
+        next_fut.set_result(None)
+
+
+@asynccontextmanager
+async def acquire_mlx_gate(
+    priority: Priority = DEFAULT_PRIORITY,
+) -> AsyncIterator[None]:
+    """Priority-based gate for the MLX primary.
+
+    Args:
+        priority: Priority.FOREGROUND (user-facing) or BACKGROUND (worker).
+            Defaults to BACKGROUND (safe default).
 
     사용 예:
-        async with get_mlx_gate():
-            async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
+        async with acquire_mlx_gate(Priority.FOREGROUND):
+            async with asyncio.timeout(30):
                 raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
 
-    ⚠ `asyncio.timeout`은 반드시 gate 안쪽에 둘 것. 바깥에 두면 gate 대기만으로
-    timeout이 발동한다.
+    ⚠ Keep `asyncio.timeout` inside the gate (after the Future await), never around it.
     """
-    global _mlx_gate
-    if _mlx_gate is None:
-        _mlx_gate = asyncio.Semaphore(MLX_CONCURRENCY)
-    return _mlx_gate
+    global _inflight
+
+    lock = _get_lock()
+    seq = next(_seq)
+    enqueue_ts = time.monotonic()
+    waited = False
+    fut: asyncio.Future | None = None
+
+    async with lock:
+        if not _inflight and not _waiters:
+            # Fast path: gate idle and nobody queued, so take the slot without a Future.
+            _inflight = True
+        else:
+            # Slow path: join the priority heap and wait to be handed the slot.
+            fut = asyncio.get_running_loop().create_future()
+            heapq.heappush(_waiters, (int(priority), seq, fut, enqueue_ts))
+            queue_len = len(_waiters)
+            logger.debug(
+                "mlx_gate enqueue priority=%s seq=%d queue_len=%d",
+                priority.name, seq, queue_len,
+            )
+            waited = True
+
+    if waited and fut is not None:
+        try:
+            # Awaited outside the lock: the releasing holder needs the lock to hand over.
+            await fut
+        except asyncio.CancelledError:
+            # Cancelled after the slot was granted but before this task resumed:
+            # pass the slot on, otherwise _inflight stays True with no owner.
+            if fut.done() and not fut.cancelled():
+                async with lock:
+                    _handoff_or_idle_locked()
+            raise
+
+    # Slot acquired — measure wait_ms, log the dispatch, WARN on starvation.
+    wait_ms = (time.monotonic() - enqueue_ts) * 1000.0 if waited else 0.0
+    if waited:
+        async with lock:
+            queue_len_post = len(_waiters)
+        logger.info(
+            "mlx_gate dispatch priority=%s seq=%d wait_ms=%.0f queue_len=%d",
+            priority.name, seq, wait_ms, queue_len_post,
+        )
+        if priority == Priority.BACKGROUND and wait_ms > STARVATION_WARN_MS:
+            logger.warning(
+                "mlx_gate background waiter starved wait_ms=%.0f priority=%s seq=%d",
+                wait_ms, priority.name, seq,
+            )
+
+    inflight_start = time.monotonic()
+    try:
+        yield
+    finally:
+        duration_ms = (time.monotonic() - inflight_start) * 1000.0
+        async with lock:
+            # Resolve the next waiter's Future right here. Deferring it with
+            # call_soon left a window in which that waiter could be cancelled;
+            # set_result then raised InvalidStateError and the gate stayed stuck.
+            _handoff_or_idle_locked()
+        logger.debug(
+            "mlx_gate release duration_ms=%.0f priority=%s seq=%d",
+            duration_ms, priority.name, seq,
+        )
+
+
+# ── Backward compat: context-manager only wrapper ────────────────────────────
+
+
+def get_mlx_gate():
+    """Legacy wrapper — only the `async with get_mlx_gate():` form is supported.
+
+    Delegates to `acquire_mlx_gate(DEFAULT_PRIORITY)` (= BACKGROUND).
+    New call sites should use `acquire_mlx_gate(Priority.FOREGROUND|BACKGROUND)` explicitly.
+
+    ⚠ **No Semaphore-like API** — direct acquire/release patterns such as
+    `sem = get_mlx_gate(); await sem.acquire()` do not work. Replace any such call
+    site with an explicit `async with acquire_mlx_gate(...)`.
+    """
+    return acquire_mlx_gate(DEFAULT_PRIORITY)
+
+
+# ── Test helpers (conftest reset) ────────────────────────────────────────────
+
+
+def _reset_for_test() -> None:
+    """Called by test fixtures for each fresh loop. Not for production code."""
+    global _waiters, _inflight, _lock, _seq
+    _waiters = []
+    _inflight = False
+    _lock = None
+    _seq = itertools.count()
diff --git a/tests/test_priority_gate.py b/tests/test_priority_gate.py
new file mode 100644
index 0000000..ae0eecf
--- /dev/null
+++ b/tests/test_priority_gate.py
@@ -0,0 +1,319 @@
+"""DS-Mac-mini-26B-Priority-Gate-1 (B-1) unit smoke — 7 scenarios.
+
+Checks ordering/cancellation/fast-path of the heap + inflight based MLX gate. No real
+LLM call is made; only the gate's own acquire/release/dispatch is simulated.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+
+import pytest
+
+from services.search.llm_gate import (
+    Priority,
+    _reset_for_test,
+    acquire_mlx_gate,
+    get_mlx_gate,
+)
+
+
+@pytest.fixture(autouse=True)
+def _reset_gate():
+    """Reset the gate state around every test (one fresh event loop each)."""
+    _reset_for_test()
+    yield
+    _reset_for_test()
+
+
+async def _hold(priority: Priority, duration_s: float, log: list, label: str):
+    """Simple gate holder — acquire, sleep, then release."""
+    async with acquire_mlx_gate(priority):
+        log.append(("acquired", label))
+        await asyncio.sleep(duration_s)
+    log.append(("released", label))
+
+
+# ─── Scenario 1: FIFO within same priority ────────────────────────────────
+
+
+@pytest.mark.asyncio
+async def test_fifo_within_same_priority():
+    """Three BACKGROUND calls enqueued in sequence are dispatched in order (1→2→3)."""
+    log: list = []
+
+    async def acquirer(label: str, started: asyncio.Event):
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            log.append(label)
+            started.set()
+            await asyncio.sleep(0.05)
+
+    e1, e2, e3 = asyncio.Event(), asyncio.Event(), asyncio.Event()
+    t1 = asyncio.create_task(acquirer("bg1", e1))
+    await asyncio.sleep(0.001)  # let bg1 take the fast path first
+    t2 = asyncio.create_task(acquirer("bg2", e2))
+    await asyncio.sleep(0.001)
+    t3 = asyncio.create_task(acquirer("bg3", e3))
+
+    await asyncio.gather(t1, t2, t3)
+    assert log == ["bg1", "bg2", "bg3"], f"FIFO 위반: {log}"
+
+
+# ─── Scenario 2: Foreground jumps queue ───────────────────────────────────
+
+
+@pytest.mark.asyncio
+async def test_foreground_jumps_queue():
+    """Five BACKGROUND calls (1 inflight, 2~5 queued), then one FOREGROUND call
+    → after the first release the next slot goes to FOREGROUND (2~5 all fall behind).
+    """
+    log: list = []
+    bg_release_signals = [asyncio.Event() for _ in range(5)]
+    fg_release_signal = asyncio.Event()
+
+    async def bg(i: int):
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            log.append(f"bg{i}")
+            await bg_release_signals[i].wait()
+
+    async def fg():
+        async with acquire_mlx_gate(Priority.FOREGROUND):
+            log.append("fg")
+            await fg_release_signal.wait()
+
+    # bg0 fast-path inflight
+    t_bg0 = asyncio.create_task(bg(0))
+    await asyncio.sleep(0.01)  # wait until bg0 is definitely inflight
+    # bg1~4 enqueue (waiting)
+    t_bgs = [asyncio.create_task(bg(i)) for i in range(1, 5)]
+    await asyncio.sleep(0.01)
+    # fg enqueue (waiting) — must jump ahead of every background waiter
+    t_fg = asyncio.create_task(fg())
+    await asyncio.sleep(0.01)
+
+    # bg0 release → the next dispatch is FG
+    bg_release_signals[0].set()
+    await asyncio.sleep(0.05)
+    assert log == ["bg0", "fg"], f"FG jump 위반 (bg1 먼저 들어옴?): {log}"
+
+    # fg release → bg1
+    fg_release_signal.set()
+    await asyncio.sleep(0.05)
+    assert log == ["bg0", "fg", "bg1"], f"FIFO 위반: {log}"
+
+    # release the rest
+    for i in range(1, 5):
+        bg_release_signals[i].set()
+    await asyncio.gather(t_bg0, *t_bgs, t_fg)
+
+    assert log == ["bg0", "fg", "bg1", "bg2", "bg3", "bg4"], (
+        f"최종 순서: {log}"
+    )
+
+
+# ─── Scenario 3: Long-running background blocks foreground (intended) ────
+
+
+@pytest.mark.asyncio
+async def test_long_running_background_blocks_foreground():
+    """An inflight background call is never interrupted (no preemption). A foreground
+    call that arrives meanwhile waits for the current holder's remaining time.
+    """
+    log: list = []
+
+    async def bg():
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            log.append(("bg_start", asyncio.get_event_loop().time()))
+            await asyncio.sleep(0.3)
+            log.append(("bg_end", asyncio.get_event_loop().time()))
+
+    async def fg():
+        await asyncio.sleep(0.05)  # try fg only after bg is inflight
+        async with acquire_mlx_gate(Priority.FOREGROUND):
+            log.append(("fg_start", asyncio.get_event_loop().time()))
+
+    t_bg = asyncio.create_task(bg())
+    t_fg = asyncio.create_task(fg())
+    await asyncio.gather(t_bg, t_fg)
+
+    events = {label: t for label, t in log}
+    # fg may start only after bg_end
+    assert events["fg_start"] >= events["bg_end"] - 0.01, (
+        f"preemption 발생? fg_start={events['fg_start']} bg_end={events['bg_end']}"
+    )
+
+
+# ─── Scenario 4: Mixed concurrent enqueue (FG FIFO 먼저, BG FIFO 후) ──
+
+
+@pytest.mark.asyncio
+async def test_mixed_concurrent_enqueue():
+    """2 FOREGROUND + 3 BACKGROUND enqueued almost together → FG FIFO first, then BG FIFO."""
+    log: list = []
+    holder_started = asyncio.Event()
+    holder_release = asyncio.Event()
+
+    async def holder():
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            holder_started.set()
+            await holder_release.wait()
+
+    async def fg(i: int):
+        async with acquire_mlx_gate(Priority.FOREGROUND):
+            log.append(f"fg{i}")
+            await asyncio.sleep(0.005)
+
+    async def bg(i: int):
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            log.append(f"bg{i}")
+            await asyncio.sleep(0.005)
+
+    # holder occupies the gate via the fast path
+    t_h = asyncio.create_task(holder())
+    await holder_started.wait()
+
+    # near-simultaneous enqueue (gather)
+    tasks = [
+        asyncio.create_task(fg(0)),
+        asyncio.create_task(bg(0)),
+        asyncio.create_task(fg(1)),
+        asyncio.create_task(bg(1)),
+        asyncio.create_task(bg(2)),
+    ]
+    await asyncio.sleep(0.01)  # let all of them reach the queue
+
+    # holder release → dispatch in priority order
+    holder_release.set()
+    await asyncio.gather(t_h, *tasks)
+
+    # fg0, fg1 first (FG FIFO) → bg0, bg1, bg2 (BG FIFO)
+    assert log == ["fg0", "fg1", "bg0", "bg1", "bg2"], f"priority order 위반: {log}"
+
+
+# ─── Scenario 5: Backward compat — async with get_mlx_gate() ──────────────
+
+
+@pytest.mark.asyncio
+async def test_backward_compat_get_mlx_gate():
+    """The legacy `async with get_mlx_gate():` form maps to BACKGROUND priority."""
+    log: list = []
+
+    async def legacy_caller():
+        async with get_mlx_gate():
+            log.append("legacy")
+            await asyncio.sleep(0.01)
+
+    async def fg_caller():
+        async with acquire_mlx_gate(Priority.FOREGROUND):
+            log.append("fg")
+            await asyncio.sleep(0.01)
+
+    # enqueue both behind a holder
+    bg_holder_started = asyncio.Event()
+    bg_holder_release = asyncio.Event()
+
+    async def bg_holder():
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            bg_holder_started.set()
+            await bg_holder_release.wait()
+
+    t_h = asyncio.create_task(bg_holder())
+    await bg_holder_started.wait()
+
+    t_legacy = asyncio.create_task(legacy_caller())
+    await asyncio.sleep(0.005)
+    t_fg = asyncio.create_task(fg_caller())
+    await asyncio.sleep(0.01)
+
+    bg_holder_release.set()
+    await asyncio.gather(t_h, t_legacy, t_fg)
+
+    # legacy maps to BACKGROUND, so fg goes first (priority order)
+    assert log == ["fg", "legacy"], f"legacy 가 FOREGROUND 로 처리됨? {log}"
+
+
+# ─── Scenario 6: Cancelled waiter skip ────────────────────────────────────
+
+
+@pytest.mark.asyncio
+async def test_cancelled_waiter_skip():
+    """bg1 inflight → bg2 enqueue → fg1 enqueue → bg2 cancel → bg1 release →
+    the next dispatch is fg1 (the dead bg2 entry is skipped).
+    """
+    log: list = []
+    bg1_release = asyncio.Event()
+
+    async def bg1():
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            log.append("bg1")
+            await bg1_release.wait()
+
+    async def bg2():
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            log.append("bg2")  # must never be reached (cancelled)
+
+    async def fg1():
+        async with acquire_mlx_gate(Priority.FOREGROUND):
+            log.append("fg1")
+
+    t_bg1 = asyncio.create_task(bg1())
+    await asyncio.sleep(0.01)  # bg1 fast-path inflight
+    t_bg2 = asyncio.create_task(bg2())
+    await asyncio.sleep(0.01)  # bg2 queued
+    t_fg1 = asyncio.create_task(fg1())
+    await asyncio.sleep(0.01)  # fg1 queued
+
+    # bg2 cancel — its Future stays in the heap in cancelled state
+    t_bg2.cancel()
+    try:
+        await t_bg2
+    except asyncio.CancelledError:
+        pass
+
+    # bg1 release — the next dispatch is fg1 (dead bg2 entry skipped)
+    bg1_release.set()
+    await asyncio.gather(t_bg1, t_fg1)
+
+    # bg2 never ran (cancelled), order = bg1, fg1
+    assert log == ["bg1", "fg1"], (
+        f"cancelled bg2 가 dispatch 됐거나 gate stuck: {log}"
+    )
+
+
+# ─── Scenario 7: Waiter cancelled right after being granted the slot ──────
+
+
+@pytest.mark.asyncio
+async def test_cancel_after_grant_does_not_wedge_gate():
+    """A waiter cancelled in the same loop step as the handoff must pass the
+    slot on instead of leaving the gate inflight with no owner.
+    """
+    log: list = []
+    holder_release = asyncio.Event()
+    doomed_tasks: list = []
+
+    async def holder():
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            await holder_release.wait()
+        # Runs in the same loop step as the release, before "doomed" resumes.
+        doomed_tasks[0].cancel()
+
+    async def waiter(label: str):
+        async with acquire_mlx_gate(Priority.BACKGROUND):
+            log.append(label)
+
+    t_holder = asyncio.create_task(holder())
+    await asyncio.sleep(0.01)  # holder takes the fast path
+    doomed_tasks.append(asyncio.create_task(waiter("doomed")))
+    await asyncio.sleep(0.01)  # "doomed" is queued first
+    t_next = asyncio.create_task(waiter("next"))
+    await asyncio.sleep(0.01)  # "next" is queued behind it
+
+    holder_release.set()
+    await t_holder
+    # Without the handoff on cancellation this wait would time out.
+    await asyncio.wait_for(t_next, timeout=1.0)
+
+    assert doomed_tasks[0].cancelled()
+    assert log == ["next"], f"gate stuck after cancel-on-grant: {log}"