From 2d866836366b7626220cff0bf901339d22cd206b Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 26 Jun 2026 20:07:30 +0900 Subject: [PATCH] =?UTF-8?q?refactor(ai):=20AIClient=20PR-B=20=E2=80=94=20g?= =?UTF-8?q?ate=20=EB=88=84=EB=9D=BD=20=EA=B2=BD=EB=A1=9C=20=EB=B4=89?= =?UTF-8?q?=EC=9D=B8=20+=20=EA=B3=B5=EC=9C=A0=20httpx=20+=20public=20class?= =?UTF-8?q?ifier/verifier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 코드리뷰 AIClient 정비 PR-B (#2 gate·#3 httpx·#4 public). #2 gate 구조 (call-site 컨벤션 — gate 는 caller-managed, AIClient self-gate 금지): · classify_worker consumer call_triage: gate 없이 Mac mini 직타하던 것 → acquire_mlx_gate(BACKGROUND). (drain 경로 call_deep_or_defer 는 맥북 deep 슬롯이라 mini gate 무관, 미적용.) · verifier_service: gate 없이 _request(verifier) 하던 것 → acquire_mlx_gate(FOREGROUND) + call_verifier. classifier/evidence 와 동일 gate 공유로 thundering-herd(22-timeout 사고) 방어. ★재진입 안전 검증: AIClient 메서드 내부 self-gate 0(전부 call-site) + evidence/classifier 는 이미 독립 gate 보유 + api/search 오케스트레이터 gate 미보유 → double-acquire 데드락 불가. #4 public 메서드: call_classifier/call_verifier 추가 → classifier/verifier_service 의 private _request 직접호출 봉인(egress 가드 일관 적용). gate 는 caller-managed 유지(call_primary 와 동일 계약). #3 공유 httpx: 호출마다 AsyncClient 생성(30+ 사이트)을 _get_shared_http() 단일 풀로 — keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(테스트) 시 재생성, close() 는 no-op. py_compile PASS. (잔여 #4: query_analyzer/digest/backends 의 _request·_call_chat 직접호출은 gated 라 안전, 후속 sweep.) Co-Authored-By: Claude Opus 4.8 (1M context) --- app/ai/client.py | 42 +++++++++++++++++++++-- app/services/search/classifier_service.py | 2 +- app/services/search/verifier_service.py | 8 +++-- app/workers/classify_worker.py | 6 +++- 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/app/ai/client.py b/app/ai/client.py index 3c278a0..cd87f5b 100644 --- a/app/ai/client.py +++ b/app/ai/client.py @@ -1,5 +1,6 @@ """AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5.""" +import asyncio import json import re from pathlib import Path @@ -188,6 +189,25 @@ def _load_prompt(name: str) -> str: CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else "" +# 공유 httpx 클라이언트 — 호출마다 AsyncClient 를 새로 만들던 것(30+ 사이트, 연결풀 재사용 0)을 +# 일원화해 keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(pytest 격리 등) 시 재생성한다. +# close() 는 공유 풀이라 no-op — 프로세스 종료 시 GC. +_shared_http: httpx.AsyncClient | None = None +_shared_http_loop: object | None = None + + +def _get_shared_http() -> httpx.AsyncClient: + global _shared_http, _shared_http_loop + try: + loop: object | None = asyncio.get_running_loop() + except RuntimeError: + loop = None + if _shared_http is None or _shared_http.is_closed or _shared_http_loop is not loop: + _shared_http = httpx.AsyncClient(timeout=120) + _shared_http_loop = loop + return _shared_http + + class AIClient: """AI 모델 통합 클라이언트. @@ -202,7 +222,7 @@ class AIClient: def __init__(self): self.ai = settings.ai - self._http = httpx.AsyncClient(timeout=120) + self._http = _get_shared_http() # ─── 3-tier routing (B-0) ─────────────────────────────────────────────── @@ -240,6 +260,23 @@ class AIClient: cfg = self.ai.deep or self.ai.primary return await self._request(cfg, prompt, system=system) + async def call_classifier(self, prompt: str) -> str: + """answerability classifier (config ai.classifier, Mac mini 26B MLX). + + private _request 직접 호출(classifier_service)을 봉인하는 public 진입점. gate 는 + caller(classifier_service)가 acquire_mlx_gate 로 관리 — call_primary 와 동일한 + caller-managed 계약(여기서 self-gate 하면 caller 와 double-acquire 데드락). + """ + return await self._request(self.ai.classifier, prompt) + + async def call_verifier(self, prompt: str) -> str: + """semantic verifier (config ai.verifier, Mac mini 26B MLX). + + private _request 직접 호출(verifier_service)을 봉인. gate 는 caller(verifier_service) + 가 관리(caller-managed — self-gate 금지). + """ + return await self._request(self.ai.verifier, prompt) + # ─── Legacy API (classify_worker 교체 시 제거 예정) ─────────────────── async def classify(self, text: str, cfg=None) -> dict: @@ -360,4 +397,5 @@ class AIClient: return data["choices"][0]["message"]["content"] async def close(self): - await self._http.aclose() + # 공유 풀(_get_shared_http) 이라 per-use close 안 함 — 연결 재사용. 프로세스 종료 시 GC. + return None diff --git a/app/services/search/classifier_service.py b/app/services/search/classifier_service.py index 626b5c9..da44f48 100644 --- a/app/services/search/classifier_service.py +++ b/app/services/search/classifier_service.py @@ -102,7 +102,7 @@ async def classify( # "MLX primary 호출 경로는 예외 없이 gate 획득 필수". async with acquire_mlx_gate(Priority.FOREGROUND): async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): - raw = await client._request(settings.ai.classifier, prompt) + raw = await client.call_classifier(prompt) _failure_count = 0 except asyncio.TimeoutError: _failure_count += 1 diff --git a/app/services/search/verifier_service.py b/app/services/search/verifier_service.py index 757986b..c2a8fe5 100644 --- a/app/services/search/verifier_service.py +++ b/app/services/search/verifier_service.py @@ -11,7 +11,7 @@ ## 핵심 원칙 - **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse - **Timeout 3s** — 느리면 없는 게 낫다 (fail open) -- MLX gate 미사용 (PR #20 이후 Mac mini 26B endpoint — concurrent 안전성 별 검토) +- MLX gate 사용 (Mac mini 26B endpoint — classifier/evidence 와 동일 gate 공유, 동시 race 방지) """ from __future__ import annotations @@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Literal from ai.client import AIClient, _load_prompt, parse_json_response from core.config import settings from core.utils import setup_logger +from .llm_gate import Priority, acquire_mlx_gate if TYPE_CHECKING: from .evidence_service import EvidenceItem @@ -132,8 +133,9 @@ async def verify( prompt = _build_input(answer, evidence) client = AIClient() try: - async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): - raw = await client._request(settings.ai.verifier, prompt) + async with acquire_mlx_gate(Priority.FOREGROUND): + async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): + raw = await client.call_verifier(prompt) _failure_count = 0 except asyncio.TimeoutError: _failure_count += 1 diff --git a/app/workers/classify_worker.py b/app/workers/classify_worker.py index 8a14982..9d9d3b0 100644 --- a/app/workers/classify_worker.py +++ b/app/workers/classify_worker.py @@ -40,6 +40,7 @@ from ai.client import ( ) from ai.envelope import EscalationEnvelope from core.config import settings +from services.search.llm_gate import Priority, acquire_mlx_gate from core.utils import setup_logger from models.document import Document from models.queue import StageDeferred, enqueue_stage @@ -673,7 +674,10 @@ async def _run_tier_triage( # 는 아래 generic except 에 먹히지 않게 먼저 전파. raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg) else: - raw_triage = await client.call_triage(prompt) + # consumer 경로 call_triage 는 PR #20 이후 primary 와 동일 Mac mini endpoint — + # evidence/classifier 처럼 gate 안에서 호출(영구 룰: 같은 endpoint 예외 없이 gate). + async with acquire_mlx_gate(Priority.BACKGROUND): + raw_triage = await client.call_triage(prompt) except StageDeferred: raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전) except Exception as exc: