fix(nanoclaude): prevent classifier router JSON leak in fallback path

When the classifier (gemma4:e4b) timed out or returned unparseable
output, the worker's "direct" branch re-called backend_registry.classifier
with the original user message. The classifier still had CLASSIFIER_PROMPT
attached, so it dutifully emitted router JSON like
{"action": "route", "response": "추론 모델에게 전달할게요!", ...}
which was streamed verbatim to Synology Chat as the bot's answer.
The reasoning model (Gemma 26B on Mac mini) was never actually invoked.

Changes:
- New services/classifier_io.py with parse_classification (returns explicit
  classification_failed instead of silently morphing to direct) and
  looks_like_router_json (defense-in-depth guard on any user-facing output).
- New BackendRegistry.chat_fallback adapter — same physical model as the
  classifier but with CHAT_FALLBACK_PROMPT (no JSON, no routing meta).
  This is what the worker now uses for failed-classification recovery.
- worker.py direct branch split into two:
    * elif action=="direct" and response_text and not router_json → push as-is
    * else → _fetch_fallback_text via chat_fallback (never the classifier),
      with leak guard suppressing router-shaped output.
- Belt-and-suspenders leak check on the final concatenated answer before
  _send_callback fires.
- Static safe message ("분류기가 응답을 제대로 만들지 못했어요...") when the
  fallback path produces nothing usable.

Tests:
- 28 unit tests in tests/test_classifier_io.py covering parser failure
  modes and the leak guard (incl. verbatim production payload).
- Integration tests in tests/test_worker_fallback.py asserting
  backend_registry.classifier is NOT called by the fallback path,
  chat_fallback IS called, router JSON output is suppressed, and the
  chat_fallback adapter system_prompt != CLASSIFIER_PROMPT.

Out of scope: long-input pre-routing optimization, EXAONE_* env rename,
full model routing redesign.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-05-02 08:33:54 +09:00
parent cc2c9467fe
commit 86c076fcf9
9 changed files with 571 additions and 38 deletions
+3
View File
@@ -0,0 +1,3 @@
[pytest]
asyncio_mode = auto
testpaths = tests
+3
View File
@@ -0,0 +1,3 @@
-r requirements.txt
pytest>=8.0
pytest-asyncio>=0.23
+24
View File
@@ -76,11 +76,27 @@ REASONER_PROMPT = (
"불필요한 구조화(번호 매기기, 헤더, 마크다운)는 피하고, 대화하듯 편하게 답변해."
)
# Why a separate prompt: the classifier model is shared (same physical model
# as `classifier`), but its system_prompt forces JSON router output. Any path
# that wants a *user-facing* answer from that model must use this adapter,
# never `classifier` directly — otherwise router JSON leaks to chat surfaces.
CHAT_FALLBACK_PROMPT = (
"너는 '이드'라는 이름의 상냥하고 친근한 AI 어시스턴트야. "
"사용자 메시지에 자연스러운 한국어로 짧고 명확하게 답해. "
"JSON, 마크다운, 코드블록, 라우팅 결정문(예: \"추론 모델에게 전달할게요\") 같은 메타 텍스트는 절대 출력하지 마. "
"확실히 모르는 내용이면 솔직히 모른다고 답하고 사용자에게 다시 물어봐."
)
class BackendRegistry:
def __init__(self) -> None:
self.classifier: ModelAdapter | None = None # EXAONE: 분류 + 직접응답
self.reasoner: ModelAdapter | None = None # Gemma4: 추론
# chat_fallback shares the classifier's physical model but uses
# CHAT_FALLBACK_PROMPT instead of CLASSIFIER_PROMPT, so it produces
# natural-language replies. Used when classification fails — never
# reuse `classifier` for user-facing generation.
self.chat_fallback: ModelAdapter | None = None
self._health: dict[str, bool] = {"classifier": False, "reasoner": False}
self._latency: dict[str, float] = {"classifier": 0.0, "reasoner": 0.0}
self._health_baseline: dict[str, float] = {}
@@ -110,6 +126,14 @@ class BackendRegistry:
timeout=settings.reasoning_timeout,
max_tokens=16000,
)
self.chat_fallback = ModelAdapter(
name="ChatFallback",
base_url=settings.exaone_base_url,
model=settings.exaone_model,
system_prompt=CHAT_FALLBACK_PROMPT,
temperature=settings.exaone_temperature,
timeout=settings.exaone_timeout,
)
def start_health_loop(self, interval: float = 30.0) -> None:
self._health_task = asyncio.create_task(self._health_loop(interval))
+118
View File
@@ -0,0 +1,118 @@
"""Pure helpers for classifier I/O — parsing classifier output and detecting
router-shaped JSON that must never reach the user.
Kept dependency-free so unit tests can import without bootstrapping settings,
DB, or HTTP clients.
"""
from __future__ import annotations
import json
ROUTER_ACTIONS = frozenset(
{"route", "direct", "clarify", "tools", "system_status", "classification_failed"}
)
ROUTER_KEYS = frozenset({"action", "tool", "operation", "params", "prompt"})
_CLASSIFICATION_FAILED: dict = {
"action": "classification_failed",
"response": "",
"prompt": "",
}
def parse_classification(raw: str) -> dict:
"""Parse classifier output into a dict that always carries an `action`.
Returns the parsed JSON when:
- the payload contains a JSON object with an `action` key.
Returns ``{"action": "classification_failed", ...}`` when:
- input is empty or whitespace only,
- no JSON object is present,
- the JSON cannot be decoded,
- the decoded value is not a dict,
- the decoded dict has no `action` key.
The failure path used to be `{"action": "direct", "response": <raw>}`,
which let a downstream branch re-call the classifier and stream
classifier-prompt JSON straight to the user. Returning an explicit
failure sentinel forces callers to take a non-classifier code path.
"""
if raw is None:
return dict(_CLASSIFICATION_FAILED)
stripped = raw.strip()
if not stripped:
return dict(_CLASSIFICATION_FAILED)
start = stripped.find("{")
end = stripped.rfind("}")
if start < 0 or end <= start:
return dict(_CLASSIFICATION_FAILED)
candidate = stripped[start : end + 1]
try:
result = json.loads(candidate)
except json.JSONDecodeError:
return dict(_CLASSIFICATION_FAILED)
if not isinstance(result, dict) or "action" not in result:
return dict(_CLASSIFICATION_FAILED)
return result
def looks_like_router_json(text: str) -> bool:
"""Return True if ``text`` carries router/classifier JSON.
The classifier system prompt forces the model to emit JSON of shape
``{"action": "...", "response": "...", "prompt": "..."}`` (and several
tool variants). When the fallback path accidentally streams that JSON
to the user, the chat surface displays the raw routing decision instead
of an answer. This guard lets us drop such output before send.
Detection rules (any one is enough):
- contains a JSON object whose ``action`` value is a known router action,
- contains a JSON object with two or more known router-shaped keys.
Returns False for empty input, plain text, or unrelated JSON.
"""
if not text:
return False
stripped = text.strip()
if not stripped:
return False
# Strip leading/trailing markdown fences, if any.
for fence in ("```json", "```"):
if stripped.startswith(fence):
stripped = stripped[len(fence) :].lstrip()
break
if stripped.endswith("```"):
stripped = stripped[: -len("```")].rstrip()
start = stripped.find("{")
end = stripped.rfind("}")
if start < 0 or end <= start:
return False
candidate = stripped[start : end + 1]
try:
obj = json.loads(candidate)
except json.JSONDecodeError:
return False
if not isinstance(obj, dict):
return False
action = obj.get("action")
if isinstance(action, str) and action in ROUTER_ACTIONS:
return True
matching = ROUTER_KEYS & set(obj.keys())
if len(matching) >= 2:
return True
return False
+95 -32
View File
@@ -11,6 +11,7 @@ from config import settings
from db.database import log_completion, log_request
from models.schemas import JobStatus
from services.backend_registry import backend_registry
from services.classifier_io import looks_like_router_json, parse_classification
from services.conversation import conversation_store
from services.job_manager import Job, job_manager
from services.state_stream import state_stream
@@ -68,24 +69,44 @@ async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[s
return True
def _parse_classification(raw: str) -> dict:
"""EXAONE JSON 응답 파싱. 실패 시 direct fallback."""
raw = raw.strip()
# 어떤 형태든 첫 번째 { ~ 마지막 } 추출
start = raw.find("{")
end = raw.rfind("}")
if start >= 0 and end > start:
json_str = raw[start:end + 1]
async def _fetch_fallback_text(job: Job) -> str:
"""Generate a safe natural-language reply when classification fails.
Uses ``backend_registry.chat_fallback`` (same physical model as the
classifier, but with a chat-shaped system prompt). Never reuses
``backend_registry.classifier`` — that adapter has CLASSIFIER_PROMPT
attached, so it would emit router JSON straight to the user.
Returns "" when the fallback adapter is unavailable, the call fails,
or the output looks like router JSON. Callers treat empty as "send
the safe error message instead".
"""
adapter = backend_registry.chat_fallback
if adapter is None:
logger.warning("chat_fallback adapter not initialized for job %s", job.id)
return ""
await state_stream.push(
job.id, "processing", {"message": "응답을 생성하고 있습니다..."}
)
try:
result = json.loads(json_str)
if "action" in result:
return result
except json.JSONDecodeError:
pass
# JSON 파싱 실패 → direct로 취급 (raw 텍스트가 직접 응답)
# 마크다운/코드블록 잔재 제거
cleaned = raw.replace("```json", "").replace("```", "").replace("`json", "").replace("`", "").strip()
return {"action": "direct", "response": cleaned, "prompt": ""}
text = await _complete_with_heartbeat(
adapter, job.message, job.id,
beat_msg="응답을 생성하고 있습니다...",
)
except Exception:
logger.warning("Chat fallback failed for job %s", job.id, exc_info=True)
return ""
if looks_like_router_json(text):
logger.warning(
"Router-like JSON detected in fallback output for job %s, suppressing",
job.id,
)
return ""
return text.strip()
async def _build_system_status(force_measure: bool = True) -> str:
@@ -327,9 +348,9 @@ async def run(job: Job) -> None:
raw_result = ""
classify_latency = (time() - classify_start) * 1000
classification = _parse_classification(raw_result)
classification = parse_classification(raw_result)
action = classification.get("action", "direct")
action = classification.get("action", "classification_failed")
response_text = classification.get("response", "")
route_prompt = classification.get("prompt", "")
@@ -339,6 +360,10 @@ async def run(job: Job) -> None:
await conversation_store.add(user_id, "user", job.message)
collected: list[str] = []
# True iff we entered the chat-fallback branch (classification failed,
# or a routed action could not run). Drives the safe error message at
# the Complete gate.
fallback_path = False
if job.status == JobStatus.cancelled:
return
@@ -574,33 +599,71 @@ async def run(job: Job) -> None:
if collected:
await conversation_store.add(user_id, "assistant", "".join(collected))
else:
# === DIRECT: EXAONE 직접 응답 ===
if response_text:
# 분류기가 이미 응답을 생성함
elif action == "direct" and response_text and not looks_like_router_json(response_text):
# === DIRECT: 분류기가 자체 답변을 만든 정상 경로 ===
collected.append(response_text)
await state_stream.push(job.id, "result", {"content": response_text})
else:
# 분류 실패 → EXAONE 스트리밍으로 직접 응답
await state_stream.push(job.id, "processing", {"message": "응답을 생성하고 있습니다..."})
ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected)
if not ok:
return
if collected:
await conversation_store.add(user_id, "assistant", "".join(collected))
else:
# === FALLBACK ===
# Reached when:
# - classification_failed (parser couldn't extract router JSON),
# - action="direct" but response is empty / itself router JSON,
# - action="route" but pipeline disabled or reasoner unhealthy,
# - any unknown action.
#
# MUST NOT call backend_registry.classifier here. That adapter
# carries CLASSIFIER_PROMPT, so reusing it for user-facing
# generation streams router JSON to chat. Use chat_fallback,
# which shares the physical model but uses a chat-shaped prompt.
fallback_path = True
if action == "direct" and response_text and looks_like_router_json(response_text):
logger.warning(
"Direct response_text looked like router JSON for job %s, redirecting to chat_fallback",
job.id,
)
fallback_text = await _fetch_fallback_text(job)
if fallback_text:
collected.append(fallback_text)
await state_stream.push(job.id, "result", {"content": fallback_text})
await conversation_store.add(user_id, "assistant", fallback_text)
# --- Complete ---
if not collected:
job_manager.set_status(job.id, JobStatus.failed)
await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."})
status = "failed"
await _send_callback(job, "⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.")
safe_msg = (
"분류기가 응답을 제대로 만들지 못했어요. 다시 한 번 요청해 주세요."
if fallback_path
else "⚠️ 응답을 받지 못했습니다. 다시 시도해주세요."
)
await _send_callback(job, safe_msg)
else:
# Belt-and-suspenders leak guard: even if everything above behaved,
# never let router JSON become the user-visible final answer.
final_text = "".join(collected)
if looks_like_router_json(final_text):
logger.warning(
"Router-like JSON in collected output for job %s, replacing with safe message",
job.id,
)
job_manager.set_status(job.id, JobStatus.failed)
await state_stream.push(
job.id, "error", {"message": "응답을 받지 못했습니다."}
)
status = "failed"
await _send_callback(
job,
"분류기가 응답을 제대로 만들지 못했어요. 다시 한 번 요청해 주세요.",
)
else:
job_manager.set_status(job.id, JobStatus.completed)
await state_stream.push(job.id, "done", {"message": "완료"})
status = "completed"
await _send_callback(job, "".join(collected))
await _send_callback(job, final_text)
# --- DB 로깅 ---
latency_ms = (time() - start_time) * 1000
View File
+12
View File
@@ -0,0 +1,12 @@
"""pytest configuration — pin nanoclaude/ on sys.path so tests can use the
same `services.x` imports the running app does.
"""
from __future__ import annotations
import sys
from pathlib import Path
NANOCLAUDE_ROOT = Path(__file__).resolve().parent.parent
if str(NANOCLAUDE_ROOT) not in sys.path:
sys.path.insert(0, str(NANOCLAUDE_ROOT))
+158
View File
@@ -0,0 +1,158 @@
"""Unit tests for services.classifier_io — parser and router-JSON guard.
These are pure-function tests. They run without bootstrapping the rest of
the worker's imports.
"""
from __future__ import annotations
from services.classifier_io import looks_like_router_json, parse_classification
# ---------- parse_classification ----------
def test_empty_raw_returns_classification_failed():
result = parse_classification("")
assert result["action"] == "classification_failed"
assert result["response"] == ""
assert result["prompt"] == ""
def test_whitespace_only_returns_classification_failed():
assert parse_classification(" \n\t ")["action"] == "classification_failed"
def test_none_returns_classification_failed():
assert parse_classification(None)["action"] == "classification_failed" # type: ignore[arg-type]
def test_non_json_text_returns_classification_failed():
assert parse_classification("그냥 평범한 답변입니다.")["action"] == "classification_failed"
def test_json_without_action_key_returns_classification_failed():
assert parse_classification('{"foo": "bar", "baz": 1}')["action"] == "classification_failed"
def test_invalid_json_returns_classification_failed():
assert parse_classification('{"action": "direct", broken')["action"] == "classification_failed"
def test_non_dict_json_returns_classification_failed():
assert parse_classification("[1, 2, 3]")["action"] == "classification_failed"
def test_valid_direct_returns_unchanged():
raw = '{"action": "direct", "response": "안녕!", "prompt": ""}'
result = parse_classification(raw)
assert result["action"] == "direct"
assert result["response"] == "안녕!"
def test_valid_route_returns_unchanged():
raw = '{"action": "route", "response": "분석 중", "prompt": "양자역학 설명"}'
result = parse_classification(raw)
assert result["action"] == "route"
assert result["prompt"] == "양자역학 설명"
def test_valid_tools_returns_unchanged():
raw = '{"action": "tools", "tool": "calendar", "operation": "today", "params": {}}'
result = parse_classification(raw)
assert result["action"] == "tools"
assert result["tool"] == "calendar"
def test_json_embedded_in_surrounding_text_extracts():
raw = 'Sure! {"action": "direct", "response": "hi", "prompt": ""} Done.'
assert parse_classification(raw)["action"] == "direct"
def test_empty_does_not_become_direct():
"""Regression: empty classifier output must NOT silently become a direct
action. The old behavior caused the worker's direct branch to re-call the
classifier with the user's message, leaking router JSON to chat."""
assert parse_classification("")["action"] != "direct"
def test_non_json_text_does_not_become_direct():
"""Regression: raw natural-language text must NOT become a direct action
that streams the raw text to the user. The classifier prompt biases the
model toward JSON output, so non-JSON output is a malfunction signal."""
assert parse_classification("이건 평문 답변이야.")["action"] != "direct"
# ---------- looks_like_router_json ----------
def test_router_json_with_action_route_detected():
assert looks_like_router_json('{"action": "route", "response": "...", "prompt": "..."}') is True
def test_router_json_with_action_direct_detected():
assert looks_like_router_json('{"action": "direct", "response": "hi", "prompt": ""}') is True
def test_router_json_with_action_tools_detected():
text = '{"action": "tools", "tool": "calendar", "operation": "today", "params": {}}'
assert looks_like_router_json(text) is True
def test_router_json_with_action_clarify_detected():
assert looks_like_router_json('{"action": "clarify", "response": "어떤 의미?", "prompt": ""}') is True
def test_router_json_with_action_classification_failed_detected():
assert looks_like_router_json('{"action": "classification_failed"}') is True
def test_natural_text_not_detected():
assert looks_like_router_json("안녕하세요! 무엇을 도와드릴까요?") is False
def test_empty_string_not_detected():
assert looks_like_router_json("") is False
def test_whitespace_only_not_detected():
assert looks_like_router_json(" \n ") is False
def test_text_with_braces_but_no_json_not_detected():
assert looks_like_router_json("이건 {중괄호 가} 있는 자연어 답변이에요.") is False
def test_unrelated_json_not_detected():
assert looks_like_router_json('{"name": "이드", "version": "1.0"}') is False
def test_json_with_two_router_keys_detected():
"""Even without an `action` field, two router-shaped keys signal leakage."""
assert looks_like_router_json('{"prompt": "do x", "tool": "calendar"}') is True
def test_json_with_one_router_key_not_detected():
"""A single router-like key in otherwise-normal JSON should not trip the guard."""
assert looks_like_router_json('{"prompt": "사용자가 입력한 prompt"}') is False
def test_code_fenced_router_json_detected():
text = '```json\n{"action": "route", "response": "...", "prompt": "..."}\n```'
assert looks_like_router_json(text) is True
def test_unknown_action_value_with_other_router_keys_detected():
"""Unknown action value but other router keys present → still leaks shape."""
text = '{"action": "weird", "tool": "calendar", "operation": "today"}'
assert looks_like_router_json(text) is True
def test_actual_bug_payload_detected():
"""Verbatim production leak (Synology Chat 2026-05-02 08:16:25)."""
text = (
'{"action": "route", "response": "사용자님의 깊은 감정이 담긴 글이네요. '
'이 글을 바탕으로 질문에 답하기 위해서는 자세한 분석이 필요해요. '
'추론 모델에게 전달할게요!", "prompt": "노래 가사를 분석해주세요."}'
)
assert looks_like_router_json(text) is True
+152
View File
@@ -0,0 +1,152 @@
"""Integration-style tests for the worker's fallback path.
Pinned behaviors:
- When classification fails, the direct/fallback path must NOT call
``backend_registry.classifier`` (its CLASSIFIER_PROMPT would leak
router JSON to the user).
- The fallback path must call ``backend_registry.chat_fallback``.
- Router-shaped JSON returned from chat_fallback is suppressed.
- chat_fallback adapter being None or raising returns "".
These tests touch ``services.worker``, which imports the rest of the app
(config, db, tools). Run them in the nanoclaude venv where all deps live;
locally without those deps, pytest will skip the module.
"""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
# Skip the whole module if heavy deps (aiosqlite, caldav, ...) aren't
# available. Prevents local devs from seeing a wall of ImportError.
pytest.importorskip("aiosqlite")
pytest.importorskip("caldav")
from services import worker # noqa: E402
from services.backend_registry import backend_registry # noqa: E402
def _make_job(message: str = "안녕"):
job = MagicMock()
job.id = "test-job"
job.message = message
job.callback = "synology"
job.status = "processing"
job.response_sent = False
return job
@pytest.fixture(autouse=True)
def _patch_state_stream(monkeypatch):
"""state_stream.push is awaited from inside _fetch_fallback_text.
Replace with an AsyncMock so the test doesn't try to send real events."""
fake_stream = MagicMock()
fake_stream.push = AsyncMock()
fake_stream.push_done = AsyncMock()
monkeypatch.setattr(worker, "state_stream", fake_stream)
return fake_stream
@pytest.fixture(autouse=True)
def _bypass_heartbeat(monkeypatch):
"""_complete_with_heartbeat normally sleeps 2s between heartbeats. Bypass
by replacing it with a thin shim that just awaits adapter.complete_chat."""
async def _direct_call(adapter, message, job_id, *, messages=None, beat_msg=""):
return await adapter.complete_chat(message, messages=messages)
monkeypatch.setattr(worker, "_complete_with_heartbeat", _direct_call)
@pytest.mark.asyncio
async def test_fallback_uses_chat_fallback_not_classifier(monkeypatch):
classifier_mock = MagicMock()
classifier_mock.complete_chat = AsyncMock(
return_value="this should never reach the user"
)
chat_fallback_mock = MagicMock()
chat_fallback_mock.complete_chat = AsyncMock(
return_value="자연스러운 답변입니다."
)
monkeypatch.setattr(backend_registry, "classifier", classifier_mock)
monkeypatch.setattr(backend_registry, "chat_fallback", chat_fallback_mock)
result = await worker._fetch_fallback_text(_make_job())
assert result == "자연스러운 답변입니다."
chat_fallback_mock.complete_chat.assert_awaited_once()
classifier_mock.complete_chat.assert_not_called()
@pytest.mark.asyncio
async def test_fallback_suppresses_router_json_output(monkeypatch):
chat_fallback_mock = MagicMock()
# The exact bug shape from production: chat-side model emits router JSON.
chat_fallback_mock.complete_chat = AsyncMock(
return_value=(
'{"action": "route", "response": "사용자님의 깊은 감정이 담긴 글이네요.", '
'"prompt": "노래 가사를 분석해주세요."}'
)
)
monkeypatch.setattr(backend_registry, "chat_fallback", chat_fallback_mock)
result = await worker._fetch_fallback_text(_make_job())
assert result == "" # Suppressed by leak guard
@pytest.mark.asyncio
async def test_fallback_returns_empty_when_adapter_missing(monkeypatch):
monkeypatch.setattr(backend_registry, "chat_fallback", None)
result = await worker._fetch_fallback_text(_make_job())
assert result == ""
@pytest.mark.asyncio
async def test_fallback_returns_empty_on_adapter_exception(monkeypatch):
chat_fallback_mock = MagicMock()
chat_fallback_mock.complete_chat = AsyncMock(
side_effect=RuntimeError("connection refused")
)
monkeypatch.setattr(backend_registry, "chat_fallback", chat_fallback_mock)
result = await worker._fetch_fallback_text(_make_job())
assert result == ""
@pytest.mark.asyncio
async def test_fallback_strips_whitespace(monkeypatch):
chat_fallback_mock = MagicMock()
chat_fallback_mock.complete_chat = AsyncMock(
return_value=" 안녕하세요!\n "
)
monkeypatch.setattr(backend_registry, "chat_fallback", chat_fallback_mock)
result = await worker._fetch_fallback_text(_make_job())
assert result == "안녕하세요!"
@pytest.mark.asyncio
async def test_chat_fallback_adapter_uses_chat_prompt_not_classifier_prompt():
"""Spec contract: the chat_fallback adapter must NOT carry CLASSIFIER_PROMPT.
If init_from_settings ever wires the wrong prompt onto chat_fallback, this
test catches it before the leak hits production."""
from config import settings
from services.backend_registry import (
BackendRegistry,
CHAT_FALLBACK_PROMPT,
CLASSIFIER_PROMPT,
)
registry = BackendRegistry()
registry.init_from_settings(settings)
assert registry.chat_fallback is not None
assert registry.chat_fallback.system_prompt == CHAT_FALLBACK_PROMPT
assert registry.chat_fallback.system_prompt != CLASSIFIER_PROMPT