Files
hyungi_document_server/app/ai/client.py
T
Hyungi Ahn b3dbf1a11e fix(ai): parse_json_response — string literal 안만 fix 하는 stateful walker
직전 fallback 의 무차별 newline replace 가 string 외부 (object 구조) 의 raw newline
까지 escape 해서 JSON 거부. 또 LaTeX 수식 (\circ, \text, \, etc) 의 invalid backslash
는 newline 이슈와 별개라 별도 fix 필요.

state machine: in_string 토글 (`\"` 만남). string literal 안에서만:
- raw LF/CR/TAB → \\n/\\r/\\t 로 변환
- backslash 다음에 valid escape char (\"\\/bfnrtu) 면 그대로
- backslash 다음에 invalid (\\c, \\,) 면 backslash 자체를 \\\\ 로 escape
- string 외부 raw newline 은 JSON whitespace 라 보존

운영 데이터 id=243 의 raw 940자에 \\circ \\text \\, \\approx \\times 등 다수 LaTeX +
markdown 줄바꿈 → 새 walker 가 두 케이스 모두 fix. 다른 worker (classify/triage/
study_explanation/evidence/study_session_analysis) 자동 혜택.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 08:00:20 +09:00

281 lines
11 KiB
Python

"""AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5."""
import json
import re
from pathlib import Path
import httpx
from core.config import settings
def strip_thinking(text: str) -> str:
"""Qwen3.5의 <think>...</think> 블록 및 Thinking Process 텍스트 제거"""
# <think> 태그 제거
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
# "Thinking Process:" 등 사고 과정 텍스트 제거 (첫 번째 { 이전의 모든 텍스트)
json_start = text.find("{")
if json_start > 0:
text = text[json_start:]
return text.strip()
def parse_json_response(raw: str) -> dict | None:
"""AI 응답에서 JSON 객체 추출 (think 태그, 코드블록 등 제거).
파싱 시도 순서 (앞 단계가 성공하면 즉시 반환):
1. ``` json fenced 블록 안의 첫 ``{...}`` (DOTALL)
2. balanced 정규식 finditer 의 마지막 매치
3. 전체 cleaned 그대로 json.loads
4. (Phase 4-A 후속) "first ``{`` ~ last ``}``" greedy slice — envelope JSON 안에
내부 따옴표/백틱/뉴라인 때문에 balanced 정규식이 못 잡는 케이스 방어.
raw text 의 첫 ``{`` 부터 마지막 ``}`` 까지 잘라 json.loads. 모델이 JSON 앞뒤
자유 텍스트 섞어도 본체만 추출.
"""
cleaned = strip_thinking(raw)
# 1. 코드블록 내부 JSON 추출
code_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", cleaned, re.DOTALL)
if code_match:
cleaned = code_match.group(1)
# 2. 마지막 유효 JSON 객체 찾기 (balanced 1단계)
matches = list(re.finditer(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", cleaned, re.DOTALL))
for m in reversed(matches):
try:
return json.loads(m.group())
except json.JSONDecodeError:
continue
# 3. 전체 cleaned
try:
result = json.loads(cleaned)
if isinstance(result, dict):
return result
except json.JSONDecodeError:
pass
# 4. greedy slice fallback — first '{' ~ last '}' 까지
first = cleaned.find("{")
last = cleaned.rfind("}")
if first < 0 or last <= first:
return None
candidate = cleaned[first : last + 1]
try:
obj = json.loads(candidate)
return obj if isinstance(obj, dict) else None
except json.JSONDecodeError:
pass
# 5. (Phase 4-A 후속) Markdown 줄바꿈 + LaTeX 수식이 JSON string literal 안에
# raw 로 들어간 케이스 방어. 두 가지 invalid:
# - raw newline (LF/CR/TAB) — JSON 표준 string 안 control char 금지
# - invalid backslash — `\circ`, `\text`, `\,` 같은 LaTeX. JSON valid escape
# 은 `\"`, `\\`, `\/`, `\b`, `\f`, `\n`, `\r`, `\t`, `\uXXXX` 만.
# stateful walker — string literal 안에서만 fix. 외부 (object 구조) 의 newline
# 은 valid whitespace 라 보존.
escaped = _fix_json_string_escapes(candidate)
try:
obj = json.loads(escaped)
return obj if isinstance(obj, dict) else None
except json.JSONDecodeError:
return None
_VALID_JSON_ESCAPES = set('"\\/bfnrtu')
def _fix_json_string_escapes(s: str) -> str:
"""JSON string literal 안의 raw newline + invalid backslash 만 escape.
state machine: in_string 토글 (`"` 마주침). string 안에서만:
- raw LF/CR/TAB → ``\\n``/``\\r``/``\\t`` 로 변환
- 백슬래시 다음에 valid escape char (`"\\/bfnrtu`) 면 그대로
- 백슬래시 다음에 invalid char (`\\c`, `\\,`) 면 백슬래시 자체를 ``\\\\`` 로 escape
string 외부 (`{` `,` `:` 사이) 의 raw newline 등은 JSON whitespace 라 보존.
"""
out: list[str] = []
i = 0
n = len(s)
in_string = False
while i < n:
ch = s[i]
if not in_string:
if ch == '"':
in_string = True
out.append(ch)
i += 1
continue
# in_string
if ch == "\\":
nxt = s[i + 1] if i + 1 < n else ""
if nxt in _VALID_JSON_ESCAPES:
out.append(ch)
out.append(nxt)
i += 2
continue
# invalid escape — backslash 자체를 escape
out.append("\\\\")
i += 1
continue
if ch == '"':
in_string = False
out.append(ch)
i += 1
continue
if ch == "\n":
out.append("\\n")
i += 1
continue
if ch == "\r":
out.append("\\r")
i += 1
continue
if ch == "\t":
out.append("\\t")
i += 1
continue
out.append(ch)
i += 1
return "".join(out)
# 프롬프트 로딩
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
def _load_prompt(name: str) -> str:
return (PROMPTS_DIR / name).read_text(encoding="utf-8")
CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else ""
class AIClient:
"""AI 모델 통합 클라이언트.
B-0 3-tier routing:
- call_triage(): 4B Ollama, 상시 호출 (llm_gate 외부 — 병렬 OK)
- call_primary(): 26B MLX, 에스컬레이션 전용 (llm_gate Semaphore(1) 는 **caller 책임**)
- call_fallback(): triage/primary 실패 시 최후 방어선 (현재 4B 동일)
Legacy: classify() / summarize() 는 기존 호출부(tests/eval runner)를 위해 남겨둠.
신규 worker 경로는 전부 call_triage / call_primary 사용.
"""
def __init__(self):
self.ai = settings.ai
self._http = httpx.AsyncClient(timeout=120)
# ─── 3-tier routing (B-0) ───────────────────────────────────────────────
async def call_triage(self, prompt: str) -> str:
"""4B Ollama 직접 호출. llm_gate 밖 (Ollama 는 concurrent OK).
timeout 은 config.yaml ai.models.triage.timeout (기본 30s).
실패 시 caller 가 에스컬레이션 또는 fallback 판단.
"""
return await self._request(self.ai.triage, prompt)
async def call_primary(self, prompt: str) -> str:
"""26B MLX 호출. 에스컬레이션 전용.
**caller 가 반드시 `async with get_mlx_gate():` 블록 안에서 호출해야 한다.**
Semaphore(1) 로 동시 호출이 1건으로 제한되어 있고, gate 는 primary 전용.
"""
return await self._request(self.ai.primary, prompt)
async def call_fallback(self, prompt: str) -> str:
"""triage/primary 실패 시 최후 방어선. 현재는 triage 와 동일 엔드포인트."""
return await self._request(self.ai.fallback, prompt)
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str) -> dict:
"""[DEPRECATED] 기존 classify_worker 전용. B-1 에서 summary_triage 로 대체.
호출부 정리 전 존속. 신규 코드는 call_triage + prompt_render 를 쓸 것.
"""
prompt = CLASSIFY_PROMPT.replace("{document_text}", text)
response = await self._call_chat(self.ai.primary, prompt)
return response
async def summarize(self, text: str, force_premium: bool = False) -> str:
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체."""
if force_premium:
return await self._call_chat(self.ai.premium, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
return await self._call_chat(self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
async def embed(self, text: str) -> list[float]:
"""벡터 임베딩 — GPU 서버 전용"""
response = await self._http.post(
self.ai.embedding.endpoint,
json={"model": self.ai.embedding.model, "prompt": text},
)
response.raise_for_status()
return response.json()["embedding"]
async def rerank(self, query: str, texts: list[str]) -> list[dict]:
"""TEI bge-reranker-v2-m3 호출 (Phase 1.3).
TEI POST /rerank API:
request: {"query": str, "texts": [str, ...]}
response: [{"index": int, "score": float}, ...] (정렬됨)
timeout은 self.ai.rerank.timeout (config.yaml).
호출자(rerank_service)가 asyncio.Semaphore + try/except로 감쌈.
"""
timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0
response = await self._http.post(
self.ai.rerank.endpoint,
json={"query": query, "texts": texts},
timeout=timeout,
)
response.raise_for_status()
return response.json()
async def _call_chat(self, model_config, prompt: str) -> str:
"""OpenAI 호환 API 호출 + 자동 폴백"""
try:
return await self._request(model_config, prompt)
except (httpx.TimeoutException, httpx.ConnectError):
if model_config == self.ai.primary:
return await self._request(self.ai.fallback, prompt)
raise
async def _request(self, model_config, prompt: str) -> str:
"""단일 모델 API 호출 (OpenAI 호환 + Anthropic Messages API)"""
is_anthropic = "anthropic.com" in model_config.endpoint
if is_anthropic:
import os
headers = {
"x-api-key": os.getenv("CLAUDE_API_KEY", ""),
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
response = await self._http.post(
model_config.endpoint,
headers=headers,
json={
"model": model_config.model,
"max_tokens": model_config.max_tokens,
"messages": [{"role": "user", "content": prompt}],
},
timeout=model_config.timeout,
)
response.raise_for_status()
data = response.json()
return data["content"][0]["text"]
else:
response = await self._http.post(
model_config.endpoint,
json={
"model": model_config.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": model_config.max_tokens,
"chat_template_kwargs": {"enable_thinking": False},
},
timeout=model_config.timeout,
)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
async def close(self):
await self._http.aclose()