ops(pipeline): fair-share 번들 — drain classify 합류 + deep 맥미니 폴백 + mlx 게이트 동시 2
사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버: - queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling, 완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지) - deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 — 강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지 - llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) — 'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제 - analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,150 @@
|
||||
"""2026-06-12 fair-share 번들 — gate capacity 일반화 / call_deep_or_defer cfg / drain classify.
|
||||
|
||||
worker-process 레벨(DB 필요)의 deep 폴백·classify drain 은 라이브 E2E 로 검증하고,
|
||||
여기서는 새 메커니즘의 seam 만 단위 검증한다.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from core.config import settings
|
||||
from services.search.llm_gate import Priority, _reset_for_test, acquire_mlx_gate, gate_status
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_gate(monkeypatch):
|
||||
monkeypatch.setattr(settings, "mlx_gate_concurrency", 2)
|
||||
_reset_for_test()
|
||||
yield
|
||||
_reset_for_test()
|
||||
|
||||
|
||||
# ─── gate capacity 2 ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_two_concurrent_holders_overlap():
|
||||
"""capacity=2: 두 holder 가 동시에 inflight — 서로를 기다리지 않는다."""
|
||||
log: list = []
|
||||
|
||||
async def hold(label: str):
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
log.append(("in", label))
|
||||
await asyncio.sleep(0.05)
|
||||
log.append(("out", label))
|
||||
|
||||
await asyncio.gather(hold("a"), hold("b"))
|
||||
# 둘 다 진입한 뒤에 첫 release 가 나와야 함 (overlap 증명)
|
||||
assert log[0][0] == "in" and log[1][0] == "in"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_third_waits_until_slot_frees():
|
||||
"""capacity=2: 3번째는 대기, 첫 release 후 진입."""
|
||||
order: list = []
|
||||
release_a = asyncio.Event()
|
||||
|
||||
async def hold(label: str, wait_event: asyncio.Event | None):
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
order.append(("in", label))
|
||||
if wait_event:
|
||||
await wait_event.wait()
|
||||
else:
|
||||
await asyncio.sleep(0.01)
|
||||
order.append(("out", label))
|
||||
|
||||
t_a = asyncio.create_task(hold("a", release_a))
|
||||
t_b = asyncio.create_task(hold("b", release_a))
|
||||
await asyncio.sleep(0.02) # a, b inflight 진입 대기
|
||||
assert ("in", "a") in order and ("in", "b") in order
|
||||
assert gate_status()["inflight"] == 2
|
||||
|
||||
t_c = asyncio.create_task(hold("c", None))
|
||||
await asyncio.sleep(0.02)
|
||||
assert ("in", "c") not in order # 슬롯 2 점유 중 — c 는 대기
|
||||
assert gate_status()["waiters"] == 1
|
||||
|
||||
release_a.set()
|
||||
await asyncio.gather(t_a, t_b, t_c)
|
||||
assert ("in", "c") in order
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_capacity_one_serializes():
|
||||
"""concurrency=1 이면 기존 직렬화 그대로 (무회귀)."""
|
||||
from core.config import settings as s
|
||||
|
||||
s_backup = s.mlx_gate_concurrency
|
||||
s.mlx_gate_concurrency = 1
|
||||
try:
|
||||
_reset_for_test()
|
||||
log: list = []
|
||||
|
||||
async def hold(label: str):
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
log.append(("in", label))
|
||||
await asyncio.sleep(0.02)
|
||||
log.append(("out", label))
|
||||
|
||||
await asyncio.gather(hold("a"), hold("b"))
|
||||
# 직렬: in/out 쌍이 겹치지 않음
|
||||
assert [e[0] for e in log] == ["in", "out", "in", "out"]
|
||||
finally:
|
||||
s.mlx_gate_concurrency = s_backup
|
||||
|
||||
|
||||
# ─── call_deep_or_defer cfg override ─────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_deep_or_defer_cfg_override():
|
||||
"""cfg 지정 시 deep 슬롯 대신 해당 config 로 _request 호출."""
|
||||
from ai.client import call_deep_or_defer
|
||||
|
||||
seen: dict = {}
|
||||
|
||||
class FakeClient:
|
||||
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
|
||||
|
||||
async def _request(self, cfg, prompt, system=None):
|
||||
seen["cfg"] = cfg
|
||||
return "ok"
|
||||
|
||||
async def call_deep(self, prompt, system=None):
|
||||
seen["cfg"] = self.ai.deep
|
||||
return "ok"
|
||||
|
||||
override = SimpleNamespace(model="deep-endpoint-triage-sampling", temperature=0.0)
|
||||
out = await call_deep_or_defer(FakeClient(), "p", cfg=override)
|
||||
assert out == "ok"
|
||||
assert seen["cfg"] is override
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_deep_or_defer_cfg_still_defers():
|
||||
"""cfg 경로에서도 보류 분류(502/503/TransportError → StageDeferred) 동일 적용."""
|
||||
from ai.client import call_deep_or_defer
|
||||
from models.queue import StageDeferred
|
||||
|
||||
class FakeClient:
|
||||
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
|
||||
|
||||
async def _request(self, cfg, prompt, system=None):
|
||||
raise httpx.ConnectError("down")
|
||||
|
||||
with pytest.raises(StageDeferred):
|
||||
await call_deep_or_defer(FakeClient(), "p", cfg=SimpleNamespace(model="x"))
|
||||
|
||||
|
||||
# ─── drain stages ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_drain_stages_include_classify():
|
||||
from workers.queue_drain import DRAIN_STAGES
|
||||
|
||||
assert set(DRAIN_STAGES) == {"summarize", "deep_summary", "classify"}
|
||||
@@ -20,8 +20,14 @@ from services.search.llm_gate import (
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_gate():
|
||||
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다)."""
|
||||
def _reset_gate(monkeypatch):
|
||||
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다).
|
||||
|
||||
2026-06-12 capacity 일반화 이후 본 파일의 직렬화 가정 보존을 위해
|
||||
concurrency=1 로 고정 (capacity>1 동작은 test_fair_share.py 가 커버).
|
||||
"""
|
||||
from core.config import settings
|
||||
monkeypatch.setattr(settings, "mlx_gate_concurrency", 1)
|
||||
_reset_for_test()
|
||||
yield
|
||||
_reset_for_test()
|
||||
|
||||
Reference in New Issue
Block a user