refactor(ai): AIClient PR-B — gate 누락 경로 봉인 + 공유 httpx + public classifier/verifier
코드리뷰 AIClient 정비 PR-B (#2 gate·#3 httpx·#4 public). #2 gate 구조 (call-site 컨벤션 — gate 는 caller-managed, AIClient self-gate 금지): · classify_worker consumer call_triage: gate 없이 Mac mini 직타하던 것 → acquire_mlx_gate(BACKGROUND). (drain 경로 call_deep_or_defer 는 맥북 deep 슬롯이라 mini gate 무관, 미적용.) · verifier_service: gate 없이 _request(verifier) 하던 것 → acquire_mlx_gate(FOREGROUND) + call_verifier. classifier/evidence 와 동일 gate 공유로 thundering-herd(22-timeout 사고) 방어. ★재진입 안전 검증: AIClient 메서드 내부 self-gate 0(전부 call-site) + evidence/classifier 는 이미 독립 gate 보유 + api/search 오케스트레이터 gate 미보유 → double-acquire 데드락 불가. #4 public 메서드: call_classifier/call_verifier 추가 → classifier/verifier_service 의 private _request 직접호출 봉인(egress 가드 일관 적용). gate 는 caller-managed 유지(call_primary 와 동일 계약). #3 공유 httpx: 호출마다 AsyncClient 생성(30+ 사이트)을 _get_shared_http() 단일 풀로 — keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(테스트) 시 재생성, close() 는 no-op. py_compile PASS. (잔여 #4: query_analyzer/digest/backends 의 _request·_call_chat 직접호출은 gated 라 안전, 후속 sweep.) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+40
-2
@@ -1,5 +1,6 @@
|
||||
"""AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
@@ -188,6 +189,25 @@ def _load_prompt(name: str) -> str:
|
||||
CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else ""
|
||||
|
||||
|
||||
# 공유 httpx 클라이언트 — 호출마다 AsyncClient 를 새로 만들던 것(30+ 사이트, 연결풀 재사용 0)을
|
||||
# 일원화해 keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(pytest 격리 등) 시 재생성한다.
|
||||
# close() 는 공유 풀이라 no-op — 프로세스 종료 시 GC.
|
||||
_shared_http: httpx.AsyncClient | None = None
|
||||
_shared_http_loop: object | None = None
|
||||
|
||||
|
||||
def _get_shared_http() -> httpx.AsyncClient:
|
||||
global _shared_http, _shared_http_loop
|
||||
try:
|
||||
loop: object | None = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = None
|
||||
if _shared_http is None or _shared_http.is_closed or _shared_http_loop is not loop:
|
||||
_shared_http = httpx.AsyncClient(timeout=120)
|
||||
_shared_http_loop = loop
|
||||
return _shared_http
|
||||
|
||||
|
||||
class AIClient:
|
||||
"""AI 모델 통합 클라이언트.
|
||||
|
||||
@@ -202,7 +222,7 @@ class AIClient:
|
||||
|
||||
def __init__(self):
|
||||
self.ai = settings.ai
|
||||
self._http = httpx.AsyncClient(timeout=120)
|
||||
self._http = _get_shared_http()
|
||||
|
||||
# ─── 3-tier routing (B-0) ───────────────────────────────────────────────
|
||||
|
||||
@@ -240,6 +260,23 @@ class AIClient:
|
||||
cfg = self.ai.deep or self.ai.primary
|
||||
return await self._request(cfg, prompt, system=system)
|
||||
|
||||
async def call_classifier(self, prompt: str) -> str:
|
||||
"""answerability classifier (config ai.classifier, Mac mini 26B MLX).
|
||||
|
||||
private _request 직접 호출(classifier_service)을 봉인하는 public 진입점. gate 는
|
||||
caller(classifier_service)가 acquire_mlx_gate 로 관리 — call_primary 와 동일한
|
||||
caller-managed 계약(여기서 self-gate 하면 caller 와 double-acquire 데드락).
|
||||
"""
|
||||
return await self._request(self.ai.classifier, prompt)
|
||||
|
||||
async def call_verifier(self, prompt: str) -> str:
|
||||
"""semantic verifier (config ai.verifier, Mac mini 26B MLX).
|
||||
|
||||
private _request 직접 호출(verifier_service)을 봉인. gate 는 caller(verifier_service)
|
||||
가 관리(caller-managed — self-gate 금지).
|
||||
"""
|
||||
return await self._request(self.ai.verifier, prompt)
|
||||
|
||||
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
|
||||
|
||||
async def classify(self, text: str, cfg=None) -> dict:
|
||||
@@ -360,4 +397,5 @@ class AIClient:
|
||||
return data["choices"][0]["message"]["content"]
|
||||
|
||||
async def close(self):
|
||||
await self._http.aclose()
|
||||
# 공유 풀(_get_shared_http) 이라 per-use close 안 함 — 연결 재사용. 프로세스 종료 시 GC.
|
||||
return None
|
||||
|
||||
@@ -102,7 +102,7 @@ async def classify(
|
||||
# "MLX primary 호출 경로는 예외 없이 gate 획득 필수".
|
||||
async with acquire_mlx_gate(Priority.FOREGROUND):
|
||||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||
raw = await client._request(settings.ai.classifier, prompt)
|
||||
raw = await client.call_classifier(prompt)
|
||||
_failure_count = 0
|
||||
except asyncio.TimeoutError:
|
||||
_failure_count += 1
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
## 핵심 원칙
|
||||
- **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse
|
||||
- **Timeout 3s** — 느리면 없는 게 낫다 (fail open)
|
||||
- MLX gate 미사용 (PR #20 이후 Mac mini 26B endpoint — concurrent 안전성 별 검토)
|
||||
- MLX gate 사용 (Mac mini 26B endpoint — classifier/evidence 와 동일 gate 공유, 동시 race 방지)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Literal
|
||||
from ai.client import AIClient, _load_prompt, parse_json_response
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
from .llm_gate import Priority, acquire_mlx_gate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .evidence_service import EvidenceItem
|
||||
@@ -132,8 +133,9 @@ async def verify(
|
||||
prompt = _build_input(answer, evidence)
|
||||
client = AIClient()
|
||||
try:
|
||||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||
raw = await client._request(settings.ai.verifier, prompt)
|
||||
async with acquire_mlx_gate(Priority.FOREGROUND):
|
||||
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
|
||||
raw = await client.call_verifier(prompt)
|
||||
_failure_count = 0
|
||||
except asyncio.TimeoutError:
|
||||
_failure_count += 1
|
||||
|
||||
@@ -40,6 +40,7 @@ from ai.client import (
|
||||
)
|
||||
from ai.envelope import EscalationEnvelope
|
||||
from core.config import settings
|
||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.queue import StageDeferred, enqueue_stage
|
||||
@@ -673,7 +674,10 @@ async def _run_tier_triage(
|
||||
# 는 아래 generic except 에 먹히지 않게 먼저 전파.
|
||||
raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg)
|
||||
else:
|
||||
raw_triage = await client.call_triage(prompt)
|
||||
# consumer 경로 call_triage 는 PR #20 이후 primary 와 동일 Mac mini endpoint —
|
||||
# evidence/classifier 처럼 gate 안에서 호출(영구 룰: 같은 endpoint 예외 없이 gate).
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
raw_triage = await client.call_triage(prompt)
|
||||
except StageDeferred:
|
||||
raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전)
|
||||
except Exception as exc:
|
||||
|
||||
Reference in New Issue
Block a user