diff --git a/app/ai/client.py b/app/ai/client.py index 3c278a0..cd87f5b 100644 --- a/app/ai/client.py +++ b/app/ai/client.py @@ -1,5 +1,6 @@ """AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5.""" +import asyncio import json import re from pathlib import Path @@ -188,6 +189,25 @@ def _load_prompt(name: str) -> str: CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else "" +# 공유 httpx 클라이언트 — 호출마다 AsyncClient 를 새로 만들던 것(30+ 사이트, 연결풀 재사용 0)을 +# 일원화해 keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(pytest 격리 등) 시 재생성한다. +# close() 는 공유 풀이라 no-op — 프로세스 종료 시 GC. +_shared_http: httpx.AsyncClient | None = None +_shared_http_loop: object | None = None + + +def _get_shared_http() -> httpx.AsyncClient: + global _shared_http, _shared_http_loop + try: + loop: object | None = asyncio.get_running_loop() + except RuntimeError: + loop = None + if _shared_http is None or _shared_http.is_closed or _shared_http_loop is not loop: + _shared_http = httpx.AsyncClient(timeout=120) + _shared_http_loop = loop + return _shared_http + + class AIClient: """AI 모델 통합 클라이언트. @@ -202,7 +222,7 @@ class AIClient: def __init__(self): self.ai = settings.ai - self._http = httpx.AsyncClient(timeout=120) + self._http = _get_shared_http() # ─── 3-tier routing (B-0) ─────────────────────────────────────────────── @@ -240,6 +260,23 @@ class AIClient: cfg = self.ai.deep or self.ai.primary return await self._request(cfg, prompt, system=system) + async def call_classifier(self, prompt: str) -> str: + """answerability classifier (config ai.classifier, Mac mini 26B MLX). + + private _request 직접 호출(classifier_service)을 봉인하는 public 진입점. gate 는 + caller(classifier_service)가 acquire_mlx_gate 로 관리 — call_primary 와 동일한 + caller-managed 계약(여기서 self-gate 하면 caller 와 double-acquire 데드락). + """ + return await self._request(self.ai.classifier, prompt) + + async def call_verifier(self, prompt: str) -> str: + """semantic verifier (config ai.verifier, Mac mini 26B MLX). + + private _request 직접 호출(verifier_service)을 봉인. gate 는 caller(verifier_service) + 가 관리(caller-managed — self-gate 금지). + """ + return await self._request(self.ai.verifier, prompt) + # ─── Legacy API (classify_worker 교체 시 제거 예정) ─────────────────── async def classify(self, text: str, cfg=None) -> dict: @@ -360,4 +397,5 @@ class AIClient: return data["choices"][0]["message"]["content"] async def close(self): - await self._http.aclose() + # 공유 풀(_get_shared_http) 이라 per-use close 안 함 — 연결 재사용. 프로세스 종료 시 GC. + return None diff --git a/app/services/search/classifier_service.py b/app/services/search/classifier_service.py index 626b5c9..da44f48 100644 --- a/app/services/search/classifier_service.py +++ b/app/services/search/classifier_service.py @@ -102,7 +102,7 @@ async def classify( # "MLX primary 호출 경로는 예외 없이 gate 획득 필수". async with acquire_mlx_gate(Priority.FOREGROUND): async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): - raw = await client._request(settings.ai.classifier, prompt) + raw = await client.call_classifier(prompt) _failure_count = 0 except asyncio.TimeoutError: _failure_count += 1 diff --git a/app/services/search/verifier_service.py b/app/services/search/verifier_service.py index 757986b..c2a8fe5 100644 --- a/app/services/search/verifier_service.py +++ b/app/services/search/verifier_service.py @@ -11,7 +11,7 @@ ## 핵심 원칙 - **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse - **Timeout 3s** — 느리면 없는 게 낫다 (fail open) -- MLX gate 미사용 (PR #20 이후 Mac mini 26B endpoint — concurrent 안전성 별 검토) +- MLX gate 사용 (Mac mini 26B endpoint — classifier/evidence 와 동일 gate 공유, 동시 race 방지) """ from __future__ import annotations @@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Literal from ai.client import AIClient, _load_prompt, parse_json_response from core.config import settings from core.utils import setup_logger +from .llm_gate import Priority, acquire_mlx_gate if TYPE_CHECKING: from .evidence_service import EvidenceItem @@ -132,8 +133,9 @@ async def verify( prompt = _build_input(answer, evidence) client = AIClient() try: - async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): - raw = await client._request(settings.ai.verifier, prompt) + async with acquire_mlx_gate(Priority.FOREGROUND): + async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): + raw = await client.call_verifier(prompt) _failure_count = 0 except asyncio.TimeoutError: _failure_count += 1 diff --git a/app/workers/classify_worker.py b/app/workers/classify_worker.py index 8a14982..9d9d3b0 100644 --- a/app/workers/classify_worker.py +++ b/app/workers/classify_worker.py @@ -40,6 +40,7 @@ from ai.client import ( ) from ai.envelope import EscalationEnvelope from core.config import settings +from services.search.llm_gate import Priority, acquire_mlx_gate from core.utils import setup_logger from models.document import Document from models.queue import StageDeferred, enqueue_stage @@ -673,7 +674,10 @@ async def _run_tier_triage( # 는 아래 generic except 에 먹히지 않게 먼저 전파. raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg) else: - raw_triage = await client.call_triage(prompt) + # consumer 경로 call_triage 는 PR #20 이후 primary 와 동일 Mac mini endpoint — + # evidence/classifier 처럼 gate 안에서 호출(영구 룰: 같은 endpoint 예외 없이 gate). + async with acquire_mlx_gate(Priority.BACKGROUND): + raw_triage = await client.call_triage(prompt) except StageDeferred: raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전) except Exception as exc: