Files
hyungi 235bbf9881 ops(pipeline): fair-share 번들 — drain classify 합류 + deep 맥미니 폴백 + mlx 게이트 동시 2
사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버:
- queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling,
  완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지)
- deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 —
  강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지
- llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) —
  'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제
- analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 06:56:02 +09:00

151 lines
5.1 KiB
Python

"""2026-06-12 fair-share 번들 — gate capacity 일반화 / call_deep_or_defer cfg / drain classify.
worker-process 레벨(DB 필요)의 deep 폴백·classify drain 은 라이브 E2E 로 검증하고,
여기서는 새 메커니즘의 seam 만 단위 검증한다.
"""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
import httpx
import pytest
from core.config import settings
from services.search.llm_gate import Priority, _reset_for_test, acquire_mlx_gate, gate_status
@pytest.fixture(autouse=True)
def _reset_gate(monkeypatch):
    """Give every test in this module a freshly reset gate at capacity 2.

    The capacity override goes through ``monkeypatch`` so pytest restores the
    real setting afterwards; the gate is reset on the way in and on the way out.
    """
    reset_gate = _reset_for_test
    # Patch before the first reset (presumably so any gate state rebuilt by the
    # reset sees capacity 2 — TODO confirm against llm_gate._reset_for_test).
    monkeypatch.setattr(settings, "mlx_gate_concurrency", 2)
    reset_gate()
    yield
    reset_gate()
# ─── gate capacity 2 ─────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_two_concurrent_holders_overlap():
    """With capacity=2, two holders are inflight at the same time and never wait on each other."""
    events: list = []

    async def occupy(name: str):
        async with acquire_mlx_gate(Priority.BACKGROUND):
            events.append(("in", name))
            await asyncio.sleep(0.05)
            events.append(("out", name))

    await asyncio.gather(occupy("a"), occupy("b"))
    # Overlap proof: both entries are recorded before either holder exits.
    leading_kinds = [kind for kind, _ in events[:2]]
    assert leading_kinds == ["in", "in"]
@pytest.mark.asyncio
async def test_third_waits_until_slot_frees():
    """With capacity=2 a third holder waits, and enters only after a slot is released.

    ``a`` and ``b`` both block on the same event, so together they occupy both
    slots until the test sets it; ``c`` must stay queued as a waiter until then.
    """
    order: list = []
    release = asyncio.Event()  # shared by a and b: setting it frees both slots

    async def hold(label: str, wait_event: asyncio.Event | None):
        async with acquire_mlx_gate(Priority.BACKGROUND):
            order.append(("in", label))
            if wait_event:
                await wait_event.wait()
            else:
                await asyncio.sleep(0.01)
            # Recorded while still inside the gate, i.e. before the slot is freed.
            order.append(("out", label))

    tasks: list[asyncio.Task] = []
    try:
        tasks.append(asyncio.create_task(hold("a", release)))
        tasks.append(asyncio.create_task(hold("b", release)))
        await asyncio.sleep(0.02)  # give a and b time to acquire both slots
        assert ("in", "a") in order and ("in", "b") in order
        assert gate_status()["inflight"] == 2

        tasks.append(asyncio.create_task(hold("c", None)))
        await asyncio.sleep(0.02)
        assert ("in", "c") not in order  # both slots are taken, so c must wait
        assert gate_status()["waiters"] == 1

        release.set()
        await asyncio.gather(*tasks)
    finally:
        # If an assertion above fails early, don't leave tasks pending (and
        # holding gate slots) beyond the end of the test. No-op on success.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    assert ("in", "c") in order
    # The docstring's actual claim: c entered only after some holder released.
    first_out = min(i for i, (kind, _) in enumerate(order) if kind == "out")
    assert order.index(("in", "c")) > first_out
@pytest.mark.asyncio
async def test_capacity_one_serializes(monkeypatch):
    """With concurrency=1 the gate keeps its original strict serialization (no regression)."""
    # Override the autouse fixture's capacity of 2 through the same monkeypatch
    # instance; pytest undoes both patches at teardown, so no manual restore is needed.
    monkeypatch.setattr(settings, "mlx_gate_concurrency", 1)
    _reset_for_test()
    log: list = []

    async def hold(label: str):
        async with acquire_mlx_gate(Priority.BACKGROUND):
            log.append(("in", label))
            await asyncio.sleep(0.02)
            log.append(("out", label))

    await asyncio.gather(hold("a"), hold("b"))
    # Serialized: in/out pairs never overlap ...
    assert [kind for kind, _ in log] == ["in", "out", "in", "out"]
    # ... and each in/out pair belongs to a single holder, covering both a and b.
    assert log[0][1] == log[1][1] and log[2][1] == log[3][1]
    assert {log[0][1], log[2][1]} == {"a", "b"}
# ─── call_deep_or_defer cfg override ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_call_deep_or_defer_cfg_override():
    """When ``cfg`` is given, ``_request`` is called with that config instead of the deep slot."""
    from ai.client import call_deep_or_defer

    recorded: list = []

    class StubClient:
        ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))

        async def _request(self, cfg, prompt, system=None):
            recorded.append(cfg)
            return "ok"

        async def call_deep(self, prompt, system=None):
            # Records the deep slot instead; the identity check below would
            # catch a fall-back to this path.
            recorded.append(self.ai.deep)
            return "ok"

    override = SimpleNamespace(model="deep-endpoint-triage-sampling", temperature=0.0)
    reply = await call_deep_or_defer(StubClient(), "p", cfg=override)
    assert reply == "ok"
    assert recorded[-1] is override
@pytest.mark.asyncio
async def test_call_deep_or_defer_cfg_still_defers():
    """The deferral classification also applies on the ``cfg`` path.

    The classification maps 502/503/TransportError to StageDeferred; this
    test exercises only the transport-error case (``httpx.ConnectError``).
    """
    from ai.client import call_deep_or_defer
    from models.queue import StageDeferred

    class UnreachableClient:
        ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))

        async def _request(self, cfg, prompt, system=None):
            raise httpx.ConnectError("down")

    client = UnreachableClient()
    explicit_cfg = SimpleNamespace(model="x")
    with pytest.raises(StageDeferred):
        await call_deep_or_defer(client, "p", cfg=explicit_cfg)
# ─── drain stages ────────────────────────────────────────────────────────────
def test_drain_stages_include_classify():
    """The set of drain stages is exactly summarize, deep_summary and classify."""
    from workers.queue_drain import DRAIN_STAGES

    expected = frozenset({"summarize", "deep_summary", "classify"})
    assert frozenset(DRAIN_STAGES) == expected