From 235bbf98816ebc5e60a4bad4f5029dadd273f29c Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 12 Jun 2026 06:56:02 +0900 Subject: [PATCH] =?UTF-8?q?ops(pipeline):=20fair-share=20=EB=B2=88?= =?UTF-8?q?=EB=93=A4=20=E2=80=94=20drain=20classify=20=ED=95=A9=EB=A5=98?= =?UTF-8?q?=20+=20deep=20=EB=A7=A5=EB=AF=B8=EB=8B=88=20=ED=8F=B4=EB=B0=B1?= =?UTF-8?q?=20+=20mlx=20=EA=B2=8C=EC=9D=B4=ED=8A=B8=20=EB=8F=99=EC=8B=9C?= =?UTF-8?q?=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버: - queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling, 완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지) - deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 — 강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지 - llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) — 'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제 - analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import) Co-Authored-By: Claude Fable 5 --- app/ai/client.py | 17 +++- app/core/config.py | 12 +++ app/models/analyze_event.py | 5 + app/services/search/llm_gate.py | 48 ++++++--- app/workers/classify_worker.py | 26 ++++- app/workers/deep_summary_worker.py | 34 +++++-- app/workers/queue_drain.py | 24 +++-- config.yaml | 5 + tests/test_fair_share.py | 150 +++++++++++++++++++++++++++++ tests/test_priority_gate.py | 10 +- 10 files changed, 295 insertions(+), 36 deletions(-) create mode 100644 tests/test_fair_share.py diff --git a/app/ai/client.py b/app/ai/client.py index bebe291..485d745 100644 --- a/app/ai/client.py +++ b/app/ai/client.py @@ -150,15 +150,26 @@ def is_deferrable_error(exc: Exception) -> bool: return isinstance(exc, httpx.TransportError) -async def call_deep_or_defer(client: "AIClient", prompt: str, system: str | None = None) -> str: +async def call_deep_or_defer( + client: "AIClient", + prompt: str, + system: str | None = None, + cfg: "AIModelConfig | None" = None, +) -> str: """call_deep + 보류 변환 — 맥북 불가(503/연결/절단)는 StageDeferred 로 raise. - deep_summary_worker / summarize_worker(drain) 가 공유. StageDeferred 는 queue_consumer/ - queue_drain 이 attempts 미소모 + deferred_until 백오프로 처리한다 (sleep-안전 불변식). + deep_summary_worker / summarize_worker(drain) / classify_worker(drain) 가 공유. + StageDeferred 는 queue_consumer/queue_drain 이 attempts 미소모 + deferred_until + 백오프로 처리한다 (sleep-안전 불변식). + + cfg: 지정 시 deep 슬롯 대신 이 config 로 호출 (classify drain — deep 슬롯의 + endpoint 는 쓰되 triage 의 temperature/max_tokens 를 적용한 변형). """ from models.queue import StageDeferred try: + if cfg is not None: + return await client._request(cfg, prompt, system=system) return await client.call_deep(prompt, system=system) except Exception as exc: if is_deferrable_error(exc): diff --git a/app/core/config.py b/app/core/config.py index 1bbd60e..5cf3302 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -165,6 +165,10 @@ class Settings(BaseModel): # 빈 리스트 = 무동작 (기존 동작 그대로). pipeline_held_stages: list[str] = [] + # mlx gate 동시 실행 상한 (2026-06-12, config.yaml pipeline.mlx_gate_concurrency). + # 1 = 구 single-inference 동작. 2 = continuous batching 활용 (llm_gate docstring 참조). + mlx_gate_concurrency: int = 1 + # PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini # GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거. study_explanation_enabled: bool = True @@ -252,12 +256,19 @@ def load_settings() -> Settings: ) pipeline_held_stages: list[str] = [] + mlx_gate_concurrency = 1 if config_path.exists() and raw and "pipeline" in raw: held_raw = (raw.get("pipeline") or {}).get("held_stages") or [] # 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용. if not isinstance(held_raw, (list, tuple)): held_raw = [held_raw] pipeline_held_stages = [str(s) for s in held_raw] + try: + mlx_gate_concurrency = max( + 1, int((raw.get("pipeline") or {}).get("mlx_gate_concurrency", 1)) + ) + except (TypeError, ValueError): + mlx_gate_concurrency = 1 taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {} document_types = raw.get("document_types", []) if config_path.exists() and raw else [] @@ -288,6 +299,7 @@ def load_settings() -> Settings: study_card_extract_enabled=study_card_extract_enabled, internal_worker_token=internal_worker_token, pipeline_held_stages=pipeline_held_stages, + mlx_gate_concurrency=mlx_gate_concurrency, ) diff --git a/app/models/analyze_event.py b/app/models/analyze_event.py index fe1d5ab..c12eebf 100644 --- a/app/models/analyze_event.py +++ b/app/models/analyze_event.py @@ -14,6 +14,11 @@ from sqlalchemy.orm import Mapped, mapped_column from core.database import Base +# FK("users.id") 해석에 users 테이블 메타데이터 필요 — fastapi 앱은 어차피 전 모델을 +# import 하지만, CLI 단독 실행(queue_drain 등)은 본 모듈만 끌어와 INSERT 시 +# "could not find table 'users'" 로 실패했다 (2026-06-12 drain 로그 실측). 명시 import. +from models.user import User # noqa: F401 + class AnalyzeEvent(Base): __tablename__ = "analyze_events" diff --git a/app/services/search/llm_gate.py b/app/services/search/llm_gate.py index 7e4add0..a643ceb 100644 --- a/app/services/search/llm_gate.py +++ b/app/services/search/llm_gate.py @@ -26,8 +26,11 @@ PR-MacBook-RAG-Backend-1 부터 `services.llm.QwenMacBookBackend` 는 별 endpoi - **fallback(Claude Sonnet 4 API) 경로는 gate 제외**. PR #20 이후 fallback = Claude API. 단 현재 구현상 `AIClient._call_chat` 내부에서 primary→fallback 전환이 일어나므로 fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음). -- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single- - inference 특성이 깨지지 않는 한 이 값을 올리지 말 것. +- ~~**MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**~~ → **2026-06-12 개정**: + 구 룰의 전제(서버 = single-inference)가 소멸 — 현 mlx_vlm server 는 continuous + batching 으로 동시 스트림 흡수(실측). 상한은 config `pipeline.mlx_gate_concurrency` + (기본 1, 운영 2). **게이트 자체(상한+우선순위 큐)는 영구 유지** — thundering herd + (23 concurrent → 22 timeout 사고) 방지는 계속 이 상한이 담당. 무제한 금지. ## 우선순위 정책 (B-1, 2026-05-17) @@ -80,8 +83,22 @@ from core.utils import setup_logger logger = setup_logger("llm_gate") -# MLX primary는 single-inference → 1 -MLX_CONCURRENCY = 1 + +def _capacity() -> int: + """게이트 동시 실행 상한 — config.yaml `pipeline.mlx_gate_concurrency` (기본 1). + + 2026-06-12 일반화: "MLX_CONCURRENCY = 1 고정" 영구 룰의 전제(구 서버 = single- + inference, 23 concurrent → 22 timeout 실측)가 소멸 — 현 mlx_vlm server 는 + continuous batching 으로 동시 스트림을 흡수(2026-06-11 밤 6~8 concurrent 실측 + 정상). 게이트 자체(상한 + 우선순위)는 유지하고 상한만 config 로 — thundering + herd 재발 방지는 이 상한이 계속 담당한다. 런타임 매 acquire 시 조회라 + config 변경 + 프로세스 재기동으로 반영, 테스트는 settings monkeypatch. + """ + from core.config import settings + try: + return max(1, int(getattr(settings, "mlx_gate_concurrency", 1))) + except (TypeError, ValueError): + return 1 # Background waiter wait_ms 가 이 값 초과 시 WARN (starvation 신호, aging mitigation 은 Phase 2) STARVATION_WARN_MS = 300_000 # 5 min @@ -101,7 +118,7 @@ DEFAULT_PRIORITY: Priority = Priority.BACKGROUND # Tuple format: (priority: int, seq: int, future: asyncio.Future, enqueue_ts: float) _waiters: list[tuple[int, int, asyncio.Future, float]] = [] _seq = itertools.count() -_inflight: bool = False +_inflight_n: int = 0 # 동시 실행 수 (구 bool — capacity 일반화로 카운터) _lock: asyncio.Lock | None = None @@ -143,7 +160,7 @@ async def acquire_mlx_gate( ⚠ `asyncio.timeout` 은 반드시 gate 안쪽 (Future await 후) 에 둘 것. """ - global _inflight, _waiters + global _inflight_n, _waiters lock = _get_lock() seq = next(_seq) @@ -152,9 +169,9 @@ async def acquire_mlx_gate( fut: asyncio.Future | None = None async with lock: - if not _inflight and not _waiters: + if _inflight_n < _capacity() and not _waiters: # fast path — 즉시 inflight 진입, Future 생성 안 함 - _inflight = True + _inflight_n += 1 else: # 대기열 진입 fut = asyncio.get_event_loop().create_future() @@ -194,8 +211,8 @@ async def acquire_mlx_gate( async with lock: next_fut = _dispatch_next_locked() if next_fut is None: - _inflight = False - # _inflight 는 True 유지 (다음 waiter 가 진입 예정) + _inflight_n = max(0, _inflight_n - 1) + # next_fut 가 있으면 슬롯 handover — 카운트 유지 (다음 waiter 가 진입 예정) logger.debug( "mlx_gate release duration_ms=%.0f priority=%s seq=%d", duration_ms, priority.name, seq, @@ -226,8 +243,11 @@ def get_mlx_gate(): def gate_status() -> dict: - """현재 gate 점유 스냅샷 (read-only, lock-free 근사치 — UI 표시용).""" - return {"inflight": _inflight, "waiters": len(_waiters)} + """현재 gate 점유 스냅샷 (read-only, lock-free 근사치 — UI 표시용). + + inflight = 동시 실행 수(int). 기존 소비자(eid status)는 bool() 캐스팅이라 호환. + """ + return {"inflight": _inflight_n, "waiters": len(_waiters)} # ── Test helpers (conftest reset) ──────────────────────────────────────────── @@ -235,8 +255,8 @@ def gate_status() -> dict: def _reset_for_test() -> None: """테스트 fixture 가 fresh loop 마다 호출. production code 에서 사용 X.""" - global _waiters, _inflight, _lock, _seq + global _waiters, _inflight_n, _lock, _seq _waiters = [] - _inflight = False + _inflight_n = 0 _lock = None _seq = itertools.count() diff --git a/app/workers/classify_worker.py b/app/workers/classify_worker.py index dc7f622..14e2548 100644 --- a/app/workers/classify_worker.py +++ b/app/workers/classify_worker.py @@ -31,12 +31,12 @@ from pydantic import BaseModel, Field, ValidationError from sqlalchemy import text as sql_text from sqlalchemy.ext.asyncio import AsyncSession -from ai.client import AIClient, parse_json_response, strip_thinking +from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking from ai.envelope import EscalationEnvelope from core.config import settings from core.utils import setup_logger from models.document import Document -from models.queue import enqueue_stage +from models.queue import StageDeferred, enqueue_stage from policy.prompt_render import render_4b, policy_version as compute_policy_version from policy.routing import decide_routing from services.document_telemetry import record_analyze_event @@ -345,13 +345,20 @@ _FRONTMATTER_PRESERVED_KEYS = { # ───────────────────────── main process ──────────────────────────────── -async def process(document_id: int, session: AsyncSession) -> None: +async def process( + document_id: int, session: AsyncSession, *, use_deep: bool = False +) -> None: """문서 분류 + 요약 + tier triage. 1) Legacy: classify() → ai_domain/document_type/ai_tags/ai_confidence/ai_suggestion 2) Legacy: summarize() → ai_summary 3) PR-B B-1: summary_triage (4B) → ai_tldr/ai_bullets/ai_analysis_tier='triage' + use_deep (2026-06-12 fair-share, queue_drain 전용): triage LLM 호출을 deep 슬롯 + (맥북, 라우터 경유)으로 보낸다 — sampling 은 triage 의 temperature/max_tokens 를 + 유지(분류 결정성), endpoint 만 교체. 맥북 불가 = StageDeferred 전파(drain 이 + 보류 처리). False(기본/consumer) = 기존 call_triage(맥미니 직접) 그대로. + 예외 — source_channel='law_monitor': 법령은 외부 source-of-truth (law.go.kr) 보유 + immutable + 자동 재수집. AI 분류는 무가치 + 본문 해석 환각 위험. 26B legacy + 4B triage 전부 skip. @@ -590,7 +597,18 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT]) try: - raw_triage = await client.call_triage(prompt) + if use_deep and settings.ai.deep is not None: + # drain 전용 — deep 슬롯 endpoint + triage sampling (결정성 유지). + # 맥북 불가(StageDeferred)는 아래 generic except 에 먹히지 않게 먼저 전파. + deep_triage_cfg = settings.ai.deep.model_copy(update={ + "temperature": settings.ai.triage.temperature, + "max_tokens": settings.ai.triage.max_tokens, + }) + raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg) + else: + raw_triage = await client.call_triage(prompt) + except StageDeferred: + raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전) except Exception as exc: logger.warning( "[triage] 4B 호출 실패 id=%s type=%s repr=%r", diff --git a/app/workers/deep_summary_worker.py b/app/workers/deep_summary_worker.py index a84fa5a..728c181 100644 --- a/app/workers/deep_summary_worker.py +++ b/app/workers/deep_summary_worker.py @@ -54,8 +54,18 @@ class DeepSummaryOutput(BaseModel): confidence: float = 0.5 -async def process(document_id: int, session: AsyncSession) -> None: - """deep_summary 큐 pickup → 26B 호출 → 필드 저장.""" +async def process( + document_id: int, session: AsyncSession, *, defer_on_deep_unavailable: bool = False +) -> None: + """deep_summary 큐 pickup → LLM 호출 → 필드 저장. + + defer_on_deep_unavailable: + False (기본, consumer 경로) = 맥북(deep 슬롯) 우선 시도, 불가 시 즉시 + 맥미니 primary 로 처리. 2026-06-12 fair-share: 양 머신이 동일 모델 + (Qwen3.6-27B-6bit)이라 폴백 = 품질 강등이 아니라 단순 분배. + True (queue_drain 전용) = 맥북 불가를 StageDeferred 로 올려 drain 이 + 보류 후 run 을 멈춘다 (drain = 맥북 분담 전용 레버 시멘틱 유지). + """ doc = await session.get(Document, document_id) if not doc: raise ValueError(f"deep_summary: document id={document_id} 없음") @@ -111,16 +121,26 @@ async def process(document_id: int, session: AsyncSession) -> None: try: start = time.perf_counter() if deep_cfg is not None: - # 맥북 경유 — 맥미니 mlx gate 미점유(게이트는 맥미니 보호 목적). 맥북 불가 - # (503/연결/생성 중 sleep 절단)는 StageDeferred = 보류, 맥미니 강등 없음. - # doc 쓰기는 완주+파싱 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0. - raw = await call_deep_or_defer(client, prompt) + # 맥북 우선 — 맥미니 mlx gate 미점유(별 endpoint). doc 쓰기는 완주+파싱 + # 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0. + try: + raw = await call_deep_or_defer(client, prompt) + except StageDeferred: + if defer_on_deep_unavailable: + raise # drain 전용 — 맥북 레버 시멘틱 (보류 후 run 종료) + # consumer 경로: 동일 모델이라 강등 아님 — 맥미니가 즉시 처리 (2026-06-12) + logger.info( + f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)" + ) + used_cfg = settings.ai.primary + async with acquire_mlx_gate(Priority.BACKGROUND): + raw = await client.call_primary(prompt) else: async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker raw = await client.call_primary(prompt) latency_ms = int((time.perf_counter() - start) * 1000) except StageDeferred: - # 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), consumer 가 백오프 기록. + # 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), drain 이 백오프 기록. logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)") raise except Exception as exc: diff --git a/app/workers/queue_drain.py b/app/workers/queue_drain.py index bfe2cbc..5859378 100644 --- a/app/workers/queue_drain.py +++ b/app/workers/queue_drain.py @@ -30,9 +30,12 @@ from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition logger = setup_logger("queue_drain") -# summarize = 맥미니 백로그 본체 / deep_summary = 심층 (consumer 도 deep 슬롯 시 맥북 경유). -# classify 는 triage 경량 호출이라 맥미니 적합 — 대상에서 제외 (plan Q-4). -DRAIN_STAGES = ("summarize", "deep_summary") +# summarize = 맥미니 백로그 본체 / deep_summary = 심층 / classify = triage 분류. +# classify 는 2026-06-12 fair-share 로 합류 — 구 제외 사유(plan Q-4 "triage 경량 = 맥미니 +# 적합")는 Gemma a4b(42 tok/s) 전제. Qwen 27B 전환 후 classify 가 장문 프리필로 컨슈머 +# 사이클을 점유하는 최대 병목이라, 맥북(프리필 ~5배)이 가장 효과적인 분담처다. +# classify 완료 시 enqueue_next_stage(embed/chunk/markdown) 필수 — 누락 = DAG 단절. +DRAIN_STAGES = ("summarize", "deep_summary", "classify") async def _claim_one(stage: str) -> tuple[int, int] | None: @@ -98,14 +101,16 @@ async def _mark_failed(queue_id: int, exc: Exception) -> None: async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None: if stage not in DRAIN_STAGES: - raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)") + raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용") if settings.ai.deep is None: raise SystemExit( "config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음" " (맥미니로의 silent 강등 금지)" ) + from workers.classify_worker import process as classify_process from workers.deep_summary_worker import process as deep_summary_process + from workers.queue_consumer import enqueue_next_stage from workers.summarize_worker import process as summarize_process done = failed = 0 @@ -121,11 +126,18 @@ async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int async with async_session() as worker_session: if stage == "summarize": await summarize_process(document_id, worker_session, use_deep=True) + elif stage == "classify": + await classify_process(document_id, worker_session, use_deep=True) else: - # deep_summary 는 deep 슬롯 구성 시 워커가 자체적으로 맥북 경유 - await deep_summary_process(document_id, worker_session) + # deep_summary: drain 은 맥북 전용 레버 — 불가 시 보류(폴백은 consumer 만) + await deep_summary_process( + document_id, worker_session, defer_on_deep_unavailable=True + ) await worker_session.commit() await _mark_completed(queue_id) + # 다음 stage 연쇄 — classify 는 embed/chunk/markdown enqueue (consumer 와 동형, + # summarize/deep_summary 는 next_stages 미등록이라 no-op) + await enqueue_next_stage(document_id, stage) done += 1 consecutive_defers = 0 logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료") diff --git a/config.yaml b/config.yaml index 8f857ad..0d9afa3 100644 --- a/config.yaml +++ b/config.yaml @@ -199,3 +199,8 @@ schedule: # 이력: 2026-06-11 맥미니 모델 확정까지 8키 홀드 → 同日 Qwen3.6-27B-6bit 전환과 함께 해제([]). pipeline: held_stages: [] + # mlx gate 동시 실행 상한 (2026-06-12 fair-share): 구 "1 고정" 룰의 전제(single-inference + # 서버)가 소멸 — 현 mlx_vlm 은 continuous batching (2026-06-11 밤 6~8 concurrent 실측 정상). + # 2 = 워커 LLM 호출과 인터랙티브(ask/eid)가 서로 안 막힘 + 집계 throughput ~1.8배. + # 게이트(상한+우선순위)는 유지 — thundering herd 방지. 1 로 되돌리면 구 동작. + mlx_gate_concurrency: 2 diff --git a/tests/test_fair_share.py b/tests/test_fair_share.py new file mode 100644 index 0000000..db67374 --- /dev/null +++ b/tests/test_fair_share.py @@ -0,0 +1,150 @@ +"""2026-06-12 fair-share 번들 — gate capacity 일반화 / call_deep_or_defer cfg / drain classify. + +worker-process 레벨(DB 필요)의 deep 폴백·classify drain 은 라이브 E2E 로 검증하고, +여기서는 새 메커니즘의 seam 만 단위 검증한다. +""" + +from __future__ import annotations + +import asyncio +from types import SimpleNamespace + +import httpx +import pytest + +from core.config import settings +from services.search.llm_gate import Priority, _reset_for_test, acquire_mlx_gate, gate_status + + +@pytest.fixture(autouse=True) +def _reset_gate(monkeypatch): + monkeypatch.setattr(settings, "mlx_gate_concurrency", 2) + _reset_for_test() + yield + _reset_for_test() + + +# ─── gate capacity 2 ───────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_two_concurrent_holders_overlap(): + """capacity=2: 두 holder 가 동시에 inflight — 서로를 기다리지 않는다.""" + log: list = [] + + async def hold(label: str): + async with acquire_mlx_gate(Priority.BACKGROUND): + log.append(("in", label)) + await asyncio.sleep(0.05) + log.append(("out", label)) + + await asyncio.gather(hold("a"), hold("b")) + # 둘 다 진입한 뒤에 첫 release 가 나와야 함 (overlap 증명) + assert log[0][0] == "in" and log[1][0] == "in" + + +@pytest.mark.asyncio +async def test_third_waits_until_slot_frees(): + """capacity=2: 3번째는 대기, 첫 release 후 진입.""" + order: list = [] + release_a = asyncio.Event() + + async def hold(label: str, wait_event: asyncio.Event | None): + async with acquire_mlx_gate(Priority.BACKGROUND): + order.append(("in", label)) + if wait_event: + await wait_event.wait() + else: + await asyncio.sleep(0.01) + order.append(("out", label)) + + t_a = asyncio.create_task(hold("a", release_a)) + t_b = asyncio.create_task(hold("b", release_a)) + await asyncio.sleep(0.02) # a, b inflight 진입 대기 + assert ("in", "a") in order and ("in", "b") in order + assert gate_status()["inflight"] == 2 + + t_c = asyncio.create_task(hold("c", None)) + await asyncio.sleep(0.02) + assert ("in", "c") not in order # 슬롯 2 점유 중 — c 는 대기 + assert gate_status()["waiters"] == 1 + + release_a.set() + await asyncio.gather(t_a, t_b, t_c) + assert ("in", "c") in order + + +@pytest.mark.asyncio +async def test_capacity_one_serializes(): + """concurrency=1 이면 기존 직렬화 그대로 (무회귀).""" + from core.config import settings as s + + s_backup = s.mlx_gate_concurrency + s.mlx_gate_concurrency = 1 + try: + _reset_for_test() + log: list = [] + + async def hold(label: str): + async with acquire_mlx_gate(Priority.BACKGROUND): + log.append(("in", label)) + await asyncio.sleep(0.02) + log.append(("out", label)) + + await asyncio.gather(hold("a"), hold("b")) + # 직렬: in/out 쌍이 겹치지 않음 + assert [e[0] for e in log] == ["in", "out", "in", "out"] + finally: + s.mlx_gate_concurrency = s_backup + + +# ─── call_deep_or_defer cfg override ───────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_call_deep_or_defer_cfg_override(): + """cfg 지정 시 deep 슬롯 대신 해당 config 로 _request 호출.""" + from ai.client import call_deep_or_defer + + seen: dict = {} + + class FakeClient: + ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot")) + + async def _request(self, cfg, prompt, system=None): + seen["cfg"] = cfg + return "ok" + + async def call_deep(self, prompt, system=None): + seen["cfg"] = self.ai.deep + return "ok" + + override = SimpleNamespace(model="deep-endpoint-triage-sampling", temperature=0.0) + out = await call_deep_or_defer(FakeClient(), "p", cfg=override) + assert out == "ok" + assert seen["cfg"] is override + + +@pytest.mark.asyncio +async def test_call_deep_or_defer_cfg_still_defers(): + """cfg 경로에서도 보류 분류(502/503/TransportError → StageDeferred) 동일 적용.""" + from ai.client import call_deep_or_defer + from models.queue import StageDeferred + + class FakeClient: + ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot")) + + async def _request(self, cfg, prompt, system=None): + raise httpx.ConnectError("down") + + with pytest.raises(StageDeferred): + await call_deep_or_defer(FakeClient(), "p", cfg=SimpleNamespace(model="x")) + + +# ─── drain stages ──────────────────────────────────────────────────────────── + + +def test_drain_stages_include_classify(): + from workers.queue_drain import DRAIN_STAGES + + assert set(DRAIN_STAGES) == {"summarize", "deep_summary", "classify"} diff --git a/tests/test_priority_gate.py b/tests/test_priority_gate.py index ae0eecf..169ad7a 100644 --- a/tests/test_priority_gate.py +++ b/tests/test_priority_gate.py @@ -20,8 +20,14 @@ from services.search.llm_gate import ( @pytest.fixture(autouse=True) -def _reset_gate(): - """각 테스트 시작 시 gate 상태 reset (fresh event loop 마다).""" +def _reset_gate(monkeypatch): + """각 테스트 시작 시 gate 상태 reset (fresh event loop 마다). + + 2026-06-12 capacity 일반화 이후 본 파일의 직렬화 가정 보존을 위해 + concurrency=1 로 고정 (capacity>1 동작은 test_fair_share.py 가 커버). + """ + from core.config import settings + monkeypatch.setattr(settings, "mlx_gate_concurrency", 1) _reset_for_test() yield _reset_for_test()