2d86683636
코드리뷰 AIClient 정비 PR-B (#2 gate·#3 httpx·#4 public). #2 gate 구조 (call-site 컨벤션 — gate 는 caller-managed, AIClient self-gate 금지): · classify_worker consumer call_triage: gate 없이 Mac mini 직타하던 것 → acquire_mlx_gate(BACKGROUND). (drain 경로 call_deep_or_defer 는 맥북 deep 슬롯이라 mini gate 무관, 미적용.) · verifier_service: gate 없이 _request(verifier) 하던 것 → acquire_mlx_gate(FOREGROUND) + call_verifier. classifier/evidence 와 동일 gate 공유로 thundering-herd(22-timeout 사고) 방어. ★재진입 안전 검증: AIClient 메서드 내부 self-gate 0(전부 call-site) + evidence/classifier 는 이미 독립 gate 보유 + api/search 오케스트레이터 gate 미보유 → double-acquire 데드락 불가. #4 public 메서드: call_classifier/call_verifier 추가 → classifier/verifier_service 의 private _request 직접호출 봉인(egress 가드 일관 적용). gate 는 caller-managed 유지(call_primary 와 동일 계약). #3 공유 httpx: 호출마다 AsyncClient 생성(30+ 사이트)을 _get_shared_http() 단일 풀로 — keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(테스트) 시 재생성, close() 는 no-op. py_compile PASS. (잔여 #4: query_analyzer/digest/backends 의 _request·_call_chat 직접호출은 gated 라 안전, 후속 sweep.) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
402 lines
17 KiB
Python
402 lines
17 KiB
Python
"""AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5."""
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from core.config import settings
|
|
|
|
|
|
def strip_thinking(text: str) -> str:
|
|
"""Qwen3.5의 <think>...</think> 블록 및 Thinking Process 텍스트 제거"""
|
|
# <think> 태그 제거
|
|
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
|
|
# "Thinking Process:" 등 사고 과정 텍스트 제거 (첫 번째 { 이전의 모든 텍스트)
|
|
json_start = text.find("{")
|
|
if json_start > 0:
|
|
text = text[json_start:]
|
|
return text.strip()
|
|
|
|
|
|
def parse_json_response(raw: str) -> dict | None:
|
|
"""AI 응답에서 JSON 객체 추출 (think 태그, 코드블록 등 제거).
|
|
|
|
파싱 시도 순서 (앞 단계가 성공하면 즉시 반환):
|
|
1. ``` json fenced 블록 안의 첫 ``{...}`` (DOTALL)
|
|
2. balanced 정규식 finditer 의 마지막 매치
|
|
3. 전체 cleaned 그대로 json.loads
|
|
4. (Phase 4-A 후속) "first ``{`` ~ last ``}``" greedy slice — envelope JSON 안에
|
|
내부 따옴표/백틱/뉴라인 때문에 balanced 정규식이 못 잡는 케이스 방어.
|
|
raw text 의 첫 ``{`` 부터 마지막 ``}`` 까지 잘라 json.loads. 모델이 JSON 앞뒤
|
|
자유 텍스트 섞어도 본체만 추출.
|
|
"""
|
|
cleaned = strip_thinking(raw)
|
|
# 1. 코드블록 내부 JSON 추출
|
|
code_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", cleaned, re.DOTALL)
|
|
if code_match:
|
|
cleaned = code_match.group(1)
|
|
# 2. 마지막 유효 JSON 객체 찾기 (balanced 1단계)
|
|
matches = list(re.finditer(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", cleaned, re.DOTALL))
|
|
for m in reversed(matches):
|
|
try:
|
|
return json.loads(m.group())
|
|
except json.JSONDecodeError:
|
|
continue
|
|
# 3. 전체 cleaned
|
|
try:
|
|
result = json.loads(cleaned)
|
|
if isinstance(result, dict):
|
|
return result
|
|
except json.JSONDecodeError:
|
|
pass
|
|
# 4. greedy slice fallback — first '{' ~ last '}' 까지
|
|
first = cleaned.find("{")
|
|
last = cleaned.rfind("}")
|
|
if first < 0 or last <= first:
|
|
return None
|
|
candidate = cleaned[first : last + 1]
|
|
try:
|
|
obj = json.loads(candidate)
|
|
return obj if isinstance(obj, dict) else None
|
|
except json.JSONDecodeError:
|
|
pass
|
|
# 5. (Phase 4-A 후속) Markdown 줄바꿈 + LaTeX 수식이 JSON string literal 안에
|
|
# raw 로 들어간 케이스 방어. 두 가지 invalid:
|
|
# - raw newline (LF/CR/TAB) — JSON 표준 string 안 control char 금지
|
|
# - invalid backslash — `\circ`, `\text`, `\,` 같은 LaTeX. JSON valid escape
|
|
# 은 `\"`, `\\`, `\/`, `\b`, `\f`, `\n`, `\r`, `\t`, `\uXXXX` 만.
|
|
# stateful walker — string literal 안에서만 fix. 외부 (object 구조) 의 newline
|
|
# 은 valid whitespace 라 보존.
|
|
escaped = _fix_json_string_escapes(candidate)
|
|
try:
|
|
obj = json.loads(escaped)
|
|
return obj if isinstance(obj, dict) else None
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
_VALID_JSON_ESCAPES = set('"\\/bfnrtu')
|
|
|
|
|
|
def _fix_json_string_escapes(s: str) -> str:
|
|
"""JSON string literal 안의 raw newline + invalid backslash 만 escape.
|
|
|
|
state machine: in_string 토글 (`"` 마주침). string 안에서만:
|
|
- raw LF/CR/TAB → ``\\n``/``\\r``/``\\t`` 로 변환
|
|
- 백슬래시 다음에 valid escape char (`"\\/bfnrtu`) 면 그대로
|
|
- 백슬래시 다음에 invalid char (`\\c`, `\\,`) 면 백슬래시 자체를 ``\\\\`` 로 escape
|
|
string 외부 (`{` `,` `:` 사이) 의 raw newline 등은 JSON whitespace 라 보존.
|
|
"""
|
|
out: list[str] = []
|
|
i = 0
|
|
n = len(s)
|
|
in_string = False
|
|
while i < n:
|
|
ch = s[i]
|
|
if not in_string:
|
|
if ch == '"':
|
|
in_string = True
|
|
out.append(ch)
|
|
i += 1
|
|
continue
|
|
# in_string
|
|
if ch == "\\":
|
|
nxt = s[i + 1] if i + 1 < n else ""
|
|
if nxt in _VALID_JSON_ESCAPES:
|
|
out.append(ch)
|
|
out.append(nxt)
|
|
i += 2
|
|
continue
|
|
# invalid escape — backslash 자체를 escape
|
|
out.append("\\\\")
|
|
i += 1
|
|
continue
|
|
if ch == '"':
|
|
in_string = False
|
|
out.append(ch)
|
|
i += 1
|
|
continue
|
|
if ch == "\n":
|
|
out.append("\\n")
|
|
i += 1
|
|
continue
|
|
if ch == "\r":
|
|
out.append("\\r")
|
|
i += 1
|
|
continue
|
|
if ch == "\t":
|
|
out.append("\\t")
|
|
i += 1
|
|
continue
|
|
out.append(ch)
|
|
i += 1
|
|
return "".join(out)
|
|
|
|
def is_deferrable_error(exc: Exception) -> bool:
|
|
"""deep(맥북 M5 Max) 호출 실패가 '보류(StageDeferred)' 대상인지 분류 (ds-macbook-offload-1).
|
|
|
|
보류 = 맥북 일시 불가 신호:
|
|
- HTTP 503 (라우터 upstream_cold / editor_busy / warming — no-silent-fallback 계약)
|
|
- HTTP 502/504 (라우터가 upstream 연결 실패·생성 도중 절단을 502 로 변환 —
|
|
llm_router.py 실측 4곳. 맥북 sleep 절단이 라우터 경유 토폴로지에선 이걸로 표면화)
|
|
- httpx.TransportError 전계열 (ConnectError·ReadError·RemoteProtocolError +
|
|
ConnectTimeout·ReadTimeout 등) — 라우터 자체 불가 / DS↔라우터 구간 절단.
|
|
그 외(400/500, 파싱/검증 오류 등)는 보류가 아니라 호출자의 기존 실패 경로.
|
|
"""
|
|
if isinstance(exc, httpx.HTTPStatusError):
|
|
return exc.response.status_code in (502, 503, 504)
|
|
return isinstance(exc, httpx.TransportError)
|
|
|
|
|
|
async def call_deep_or_defer(
|
|
client: "AIClient",
|
|
prompt: str,
|
|
system: str | None = None,
|
|
cfg: "AIModelConfig | None" = None,
|
|
) -> str:
|
|
"""call_deep + 보류 변환 — 맥북 불가(503/연결/절단)는 StageDeferred 로 raise.
|
|
|
|
deep_summary_worker / summarize_worker(drain) / classify_worker(drain) 가 공유.
|
|
StageDeferred 는 queue_consumer/queue_drain 이 attempts 미소모 + deferred_until
|
|
백오프로 처리한다 (sleep-안전 불변식).
|
|
|
|
cfg: 지정 시 deep 슬롯 대신 이 config 로 호출 (classify drain — deep 슬롯의
|
|
endpoint 는 쓰되 triage 의 temperature/max_tokens 를 적용한 변형).
|
|
"""
|
|
from models.queue import StageDeferred
|
|
|
|
try:
|
|
if cfg is not None:
|
|
return await client._request(cfg, prompt, system=system)
|
|
return await client.call_deep(prompt, system=system)
|
|
except Exception as exc:
|
|
if is_deferrable_error(exc):
|
|
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
|
|
raise
|
|
|
|
|
|
# 프롬프트 로딩
|
|
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
|
|
|
|
|
|
def _load_prompt(name: str) -> str:
|
|
return (PROMPTS_DIR / name).read_text(encoding="utf-8")
|
|
|
|
|
|
CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else ""
|
|
|
|
|
|
# 공유 httpx 클라이언트 — 호출마다 AsyncClient 를 새로 만들던 것(30+ 사이트, 연결풀 재사용 0)을
|
|
# 일원화해 keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(pytest 격리 등) 시 재생성한다.
|
|
# close() 는 공유 풀이라 no-op — 프로세스 종료 시 GC.
|
|
_shared_http: httpx.AsyncClient | None = None
|
|
_shared_http_loop: object | None = None
|
|
|
|
|
|
def _get_shared_http() -> httpx.AsyncClient:
|
|
global _shared_http, _shared_http_loop
|
|
try:
|
|
loop: object | None = asyncio.get_running_loop()
|
|
except RuntimeError:
|
|
loop = None
|
|
if _shared_http is None or _shared_http.is_closed or _shared_http_loop is not loop:
|
|
_shared_http = httpx.AsyncClient(timeout=120)
|
|
_shared_http_loop = loop
|
|
return _shared_http
|
|
|
|
|
|
class AIClient:
|
|
"""AI 모델 통합 클라이언트.
|
|
|
|
B-0 3-tier routing:
|
|
- call_triage(): Mac mini 26B MLX, 상시 호출 (llm_gate 외부 — concurrent 안전성 별 검토)
|
|
- call_primary(): Mac mini 26B MLX, 에스컬레이션 전용 (llm_gate Semaphore(1) 는 **caller 책임**)
|
|
- call_fallback(): triage/primary 실패 시 최후 방어선. Claude Sonnet 4 API (PR #20 swap 완료)
|
|
|
|
Legacy: classify() / summarize() 는 기존 호출부(tests/eval runner)를 위해 남겨둠.
|
|
신규 worker 경로는 전부 call_triage / call_primary 사용.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.ai = settings.ai
|
|
self._http = _get_shared_http()
|
|
|
|
# ─── 3-tier routing (B-0) ───────────────────────────────────────────────
|
|
|
|
async def call_triage(self, prompt: str) -> str:
|
|
"""Mac mini 26B MLX 직접 호출 (config.yaml ai.models.triage). llm_gate 외부 실행 — PR #20 이후 triage/primary 동일 endpoint 라 concurrent 안전성 별 검토.
|
|
|
|
timeout 은 config.yaml ai.models.triage.timeout (기본 30s).
|
|
실패 시 caller 가 에스컬레이션 또는 fallback 판단.
|
|
"""
|
|
return await self._request(self.ai.triage, prompt)
|
|
|
|
async def call_primary(self, prompt: str, system: str | None = None) -> str:
|
|
"""26B MLX 호출. 에스컬레이션 전용.
|
|
|
|
**caller 가 반드시 `async with get_mlx_gate():` 블록 안에서 호출해야 한다.**
|
|
Semaphore(1) 로 동시 호출이 1건으로 제한되어 있고, gate 는 primary 전용.
|
|
|
|
system: 지정 시 별도 system 메시지로 주입(이드 substrate compose 등). None=기존 동작(user 단일).
|
|
"""
|
|
return await self._request(self.ai.primary, prompt, system=system)
|
|
|
|
async def call_fallback(self, prompt: str) -> str:
|
|
"""triage/primary 실패 시 최후 방어선. Claude Sonnet 4 API (config.yaml ai.models.fallback) — PR #20 이후 swap 완료."""
|
|
return await self._request(self.ai.fallback, prompt)
|
|
|
|
async def call_deep(self, prompt: str, system: str | None = None) -> str:
|
|
"""심층 전용 — 맥북 M5 Max Qwen3.6-27B (config.yaml ai.models.deep, ds-macbook-offload-1).
|
|
|
|
llm-router :8890 경유(model=qwen-macbook alias) — 라우터의 wake preflight(~24s)·
|
|
editor_busy 가드를 재사용한다. 맥미니 mlx gate 와 무관(게이트는 맥미니 보호 목적)이라
|
|
gate 없이 호출. 자동 cloud/맥미니 폴백 없음 — 실패는 그대로 전파하고 보류 판단은
|
|
호출자가 is_deferrable_error() 로 한다. 슬롯 부재 시 primary 로 처리(방어적 —
|
|
호출자가 보통 슬롯 유무를 먼저 분기).
|
|
"""
|
|
cfg = self.ai.deep or self.ai.primary
|
|
return await self._request(cfg, prompt, system=system)
|
|
|
|
async def call_classifier(self, prompt: str) -> str:
|
|
"""answerability classifier (config ai.classifier, Mac mini 26B MLX).
|
|
|
|
private _request 직접 호출(classifier_service)을 봉인하는 public 진입점. gate 는
|
|
caller(classifier_service)가 acquire_mlx_gate 로 관리 — call_primary 와 동일한
|
|
caller-managed 계약(여기서 self-gate 하면 caller 와 double-acquire 데드락).
|
|
"""
|
|
return await self._request(self.ai.classifier, prompt)
|
|
|
|
async def call_verifier(self, prompt: str) -> str:
|
|
"""semantic verifier (config ai.verifier, Mac mini 26B MLX).
|
|
|
|
private _request 직접 호출(verifier_service)을 봉인. gate 는 caller(verifier_service)
|
|
가 관리(caller-managed — self-gate 금지).
|
|
"""
|
|
return await self._request(self.ai.verifier, prompt)
|
|
|
|
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
|
|
|
|
async def classify(self, text: str, cfg=None) -> dict:
|
|
"""[DEPRECATED] 기존 classify_worker 전용. B-1 에서 summary_triage 로 대체.
|
|
|
|
호출부 정리 전 존속. 신규 코드는 call_triage + prompt_render 를 쓸 것.
|
|
cfg (2026-06-12 fair-share): 지정 시 primary 대신 해당 config 로 호출 —
|
|
drain classify 가 deep 슬롯(맥북) 경유에 사용. cfg != ai.primary 라
|
|
_call_chat 의 primary→fallback 자동 전환은 발동하지 않는다 (에러 raw 전파).
|
|
"""
|
|
prompt = CLASSIFY_PROMPT.replace("{document_text}", text)
|
|
response = await self._call_chat(cfg or self.ai.primary, prompt)
|
|
return response
|
|
|
|
async def summarize(self, text: str, force_premium: bool = False, cfg=None) -> str:
|
|
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체. cfg = classify() 와 동일."""
|
|
if force_premium:
|
|
return await self._call_chat(self.ai.premium, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
|
|
return await self._call_chat(cfg or self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
|
|
|
|
async def embed(self, text: str) -> list[float]:
|
|
"""벡터 임베딩 — GPU 서버 전용"""
|
|
response = await self._http.post(
|
|
self.ai.embedding.endpoint,
|
|
json={"model": self.ai.embedding.model, "prompt": text, "keep_alive": -1}, # bge-m3 GPU 상주(홈랩 sparse 검색 cold reload ~6s 방지)
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()["embedding"]
|
|
|
|
async def rerank(self, query: str, texts: list[str]) -> list[dict]:
|
|
"""TEI bge-reranker-v2-m3 호출 (Phase 1.3).
|
|
|
|
TEI POST /rerank API:
|
|
request: {"query": str, "texts": [str, ...]}
|
|
response: [{"index": int, "score": float}, ...] (정렬됨)
|
|
|
|
timeout은 self.ai.rerank.timeout (config.yaml).
|
|
호출자(rerank_service)가 asyncio.Semaphore + try/except로 감쌈.
|
|
"""
|
|
timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0
|
|
response = await self._http.post(
|
|
self.ai.rerank.endpoint,
|
|
json={"query": query, "texts": texts},
|
|
timeout=timeout,
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
async def _call_chat(self, model_config, prompt: str) -> str:
|
|
"""OpenAI 호환 API 호출 (R6: 무동의 클라우드 폴백 제거).
|
|
|
|
이전엔 primary(맥미니) TimeoutException/ConnectError 시 동의·과금 통제 없이
|
|
self.ai.fallback(Claude API)로 자동 전환 → 개인 문서/쿼리/메모가 Anthropic 으로
|
|
silent egress. on-prem 추론 프라이버시 계약 위반이라 봉쇄한다. 실패는 그대로 전파:
|
|
배치 워커는 재시도/StageDeferred(R3·queue_consumer), interactive 호출자는 5xx 표면화
|
|
(documents.analyze 등 이미 502/504 변환). 클라우드는 premium explicit-trigger
|
|
(summarize force_premium) 또는 call_fallback 명시 호출로만 — 자동 진입 금지.
|
|
"""
|
|
return await self._request(model_config, prompt)
|
|
|
|
async def _request(self, model_config, prompt: str, system: str | None = None) -> str:
|
|
"""단일 모델 API 호출 (OpenAI 호환 + Anthropic Messages API).
|
|
|
|
system: 지정 시 system 으로 주입(OpenAI=system role 메시지 / Anthropic=top-level system 필드).
|
|
None=user 단일 메시지(기존 동작, 하위호환).
|
|
"""
|
|
is_anthropic = "anthropic.com" in model_config.endpoint
|
|
|
|
if is_anthropic:
|
|
import os
|
|
headers = {
|
|
"x-api-key": os.getenv("CLAUDE_API_KEY", ""),
|
|
"anthropic-version": "2023-06-01",
|
|
"content-type": "application/json",
|
|
}
|
|
body = {
|
|
"model": model_config.model,
|
|
"max_tokens": model_config.max_tokens,
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
}
|
|
if system:
|
|
body["system"] = system
|
|
response = await self._http.post(
|
|
model_config.endpoint,
|
|
headers=headers,
|
|
json=body,
|
|
timeout=model_config.timeout,
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["content"][0]["text"]
|
|
else:
|
|
messages = []
|
|
if system:
|
|
messages.append({"role": "system", "content": system})
|
|
messages.append({"role": "user", "content": prompt})
|
|
payload = {
|
|
"model": model_config.model,
|
|
"messages": messages,
|
|
"max_tokens": model_config.max_tokens,
|
|
"chat_template_kwargs": {"enable_thinking": False},
|
|
}
|
|
if model_config.temperature is not None:
|
|
payload["temperature"] = model_config.temperature
|
|
if model_config.top_p is not None:
|
|
payload["top_p"] = model_config.top_p
|
|
if model_config.repetition_penalty is not None:
|
|
payload["repetition_penalty"] = model_config.repetition_penalty
|
|
if model_config.top_k is not None:
|
|
payload["top_k"] = model_config.top_k
|
|
response = await self._http.post(
|
|
model_config.endpoint,
|
|
json=payload,
|
|
timeout=model_config.timeout,
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["choices"][0]["message"]["content"]
|
|
|
|
async def close(self):
|
|
# 공유 풀(_get_shared_http) 이라 per-use close 안 함 — 연결 재사용. 프로세스 종료 시 GC.
|
|
return None
|