feat: NanoClaude Phase 2 — EXAONE→Gemma 파이프라인, 큐, 상태 API

- ModelAdapter: 범용 OpenAI-compat 어댑터 (stream/complete/health)
- BackendRegistry: rewriter(EXAONE) + reasoner(Gemma4) 헬스체크 루프
- 2단계 파이프라인: EXAONE rewrite → Gemma reasoning (SSE rewrite 이벤트 노출)
- Fallback: 맥미니 다운 시 EXAONE 단독 모드, stream 중간 실패 시 자동 전환
- Cancel-safe: rewrite 전/후, streaming loop 내, fallback 경로 모두 체크
- Rewrite heartbeat: complete_chat 대기 중 2초 간격 processing 이벤트
- JobQueue: Semaphore(3) 기반 동시성 제한, 정확한 queue position
- GET /chat/{job_id}/status, GET /queue/stats 엔드포인트
- DB: rewrite_model, reasoning_model, rewritten_message 컬럼 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-06 12:04:15 +09:00
parent 8c41a5dead
commit c4c32170f1
14 changed files with 495 additions and 141 deletions

View File

@@ -17,4 +17,8 @@ DB_PATH=/app/data/gateway.db
# NanoClaude
EXAONE_MODEL=exaone3.5:7.8b-instruct-q8_0
REASONING_BASE_URL=http://192.168.1.122:8800
REASONING_MODEL=mlx-community/gemma-4-26b-a4b-it-8bit
PIPELINE_ENABLED=true
MAX_CONCURRENT_JOBS=3
NANOCLAUDE_API_KEY=

View File

@@ -16,7 +16,8 @@ GPU 서버(RTX 4070 Ti Super)에서 운영하는 중앙 AI 라우팅 서비스.
- GPU Ollama: host.docker.internal:11434
- 맥미니 Ollama: 100.115.153.119:11434
- NanoClaude: localhost:8100 (비동기 job 기반 AI Gateway)
- 맥미니 MLX: 192.168.1.122:8800 (Gemma 4)
- NanoClaude: localhost:8100 (EXAONE → Gemma 파이프라인)
## 개발
@@ -41,7 +42,9 @@ OpenAI 호환: `/v1/chat/completions`, `/v1/models`, `/v1/embeddings`
## NanoClaude API
비동기 job 기반: `POST /nano/chat` → `{ job_id }`, `GET /nano/chat/{job_id}/stream` → SSE
상태: `GET /nano/chat/{job_id}/status`, 큐: `GET /nano/queue/stats`
취소: `POST /nano/chat/{job_id}/cancel`
파이프라인: EXAONE (rewrite) → Gemma 4 (reasoning), 맥미니 다운 시 EXAONE fallback
## 백엔드 설정

View File

@@ -54,6 +54,10 @@ services:
environment:
- EXAONE_BASE_URL=http://host.docker.internal:11434
- EXAONE_MODEL=${EXAONE_MODEL:-exaone3.5:7.8b-instruct-q8_0}
- REASONING_BASE_URL=${REASONING_BASE_URL:-http://192.168.1.122:8800}
- REASONING_MODEL=${REASONING_MODEL:-mlx-community/gemma-4-26b-a4b-it-8bit}
- PIPELINE_ENABLED=${PIPELINE_ENABLED:-true}
- MAX_CONCURRENT_JOBS=${MAX_CONCURRENT_JOBS:-3}
- DB_PATH=/app/data/nanoclaude.db
- API_KEY=${NANOCLAUDE_API_KEY:-}
volumes:

View File

@@ -2,11 +2,26 @@ from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# EXAONE via Ollama
# EXAONE (rewriter) via Ollama
exaone_base_url: str = "http://localhost:11434"
exaone_model: str = "exaone3.5:7.8b-instruct-q8_0"
exaone_temperature: float = 0.7
exaone_timeout: float = 120.0
exaone_timeout: float = 30.0 # rewrite는 짧아야 함
# Gemma 4 (reasoner) via MLX on Mac mini
reasoning_base_url: str = "http://192.168.1.122:8800"
reasoning_model: str = "mlx-community/gemma-4-26b-a4b-it-8bit"
reasoning_temperature: float = 0.7
reasoning_timeout: float = 180.0
# Pipeline
pipeline_enabled: bool = True # False = EXAONE 단독 모드 (Phase 1 fallback)
# Queue
max_concurrent_jobs: int = 3
# Health check
health_check_interval: float = 30.0
# Server
host: str = "0.0.0.0"

View File

@@ -14,18 +14,35 @@ CREATE TABLE IF NOT EXISTS request_logs (
response_chars INTEGER DEFAULT 0,
latency_ms REAL DEFAULT 0,
created_at REAL NOT NULL,
completed_at REAL
completed_at REAL,
rewrite_model TEXT,
reasoning_model TEXT,
rewritten_message TEXT,
rewrite_latency_ms REAL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_logs_job ON request_logs(job_id);
CREATE INDEX IF NOT EXISTS idx_logs_created ON request_logs(created_at);
"""
# Phase 1 → Phase 2 migration: additive columns on request_logs.
# Each ALTER fails once the column already exists, which is expected.
MIGRATIONS = [
    "ALTER TABLE request_logs ADD COLUMN rewrite_model TEXT",
    "ALTER TABLE request_logs ADD COLUMN reasoning_model TEXT",
    "ALTER TABLE request_logs ADD COLUMN rewritten_message TEXT",
    "ALTER TABLE request_logs ADD COLUMN rewrite_latency_ms REAL DEFAULT 0",
]


async def init_db():
    """Create the schema (idempotent) and apply additive column migrations.

    Enables WAL journaling, runs SCHEMA via executescript, then applies each
    migration. "duplicate column" errors are expected and ignored; any other
    migration failure is logged instead of being silently swallowed (the old
    blanket `except Exception: pass` hid real errors such as a locked DB).
    """
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute("PRAGMA journal_mode=WAL")
        await db.executescript(SCHEMA)
        for migration in MIGRATIONS:
            try:
                await db.execute(migration)
            except Exception as exc:
                # SQLite reports re-added columns as "duplicate column name".
                if "duplicate column" not in str(exc).lower():
                    import logging
                    logging.getLogger(__name__).warning(
                        "Migration failed: %s (%s)", migration, exc
                    )
        await db.commit()
@@ -38,10 +55,26 @@ async def log_request(job_id: str, message: str, model: str, created_at: float):
await db.commit()
async def log_completion(job_id: str, status: str, response_chars: int, latency_ms: float, completed_at: float):
async def log_completion(
job_id: str,
status: str,
response_chars: int,
latency_ms: float,
completed_at: float,
*,
rewrite_model: str | None = None,
reasoning_model: str | None = None,
rewritten_message: str | None = None,
rewrite_latency_ms: float = 0,
):
async with aiosqlite.connect(settings.db_path) as db:
await db.execute(
"UPDATE request_logs SET status=?, response_chars=?, latency_ms=?, completed_at=? WHERE job_id=?",
(status, response_chars, latency_ms, completed_at, job_id),
"""UPDATE request_logs
SET status=?, response_chars=?, latency_ms=?, completed_at=?,
rewrite_model=?, reasoning_model=?, rewritten_message=?, rewrite_latency_ms=?
WHERE job_id=?""",
(status, response_chars, latency_ms, completed_at,
rewrite_model, reasoning_model, rewritten_message, rewrite_latency_ms,
job_id),
)
await db.commit()

View File

@@ -1,4 +1,4 @@
"""NanoClaude — 비동기 job 기반 AI Gateway."""
"""NanoClaude — 비동기 job 기반 AI Gateway (Phase 2: EXAONE → Gemma 파이프라인)."""
from __future__ import annotations
@@ -12,22 +12,29 @@ from fastapi.responses import JSONResponse
from config import settings
from db.database import init_db
from routers import chat
from services.backend_registry import backend_registry
from services.job_queue import init_queue, job_queue
# Root logging configuration for the service.
# BUGFIX: the format previously read "%(name)s%(message)s" — logger name and
# message were concatenated with no separator. Exposed as a constant so it
# can be inspected/tested.
LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s %(message)s"

logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db()
backend_registry.init_from_settings(settings)
backend_registry.start_health_loop(settings.health_check_interval)
init_queue(settings.max_concurrent_jobs)
yield
backend_registry.stop_health_loop()
app = FastAPI(
title="NanoClaude",
version="0.1.0",
description="비동기 job 기반 AI Gateway — Phase 1",
version="0.2.0",
description="비동기 job 기반 AI Gateway — Phase 2 (EXAONE → Gemma 파이프라인)",
lifespan=lifespan,
)
@@ -54,9 +61,14 @@ app.include_router(chat.router)
@app.get("/")
async def root():
return {"service": "NanoClaude", "version": "0.1.0", "phase": 1}
return {"service": "NanoClaude", "version": "0.2.0", "phase": 2}
@app.get("/health")
async def health():
return {"status": "ok"}
from services.job_queue import job_queue
return {
"status": "ok",
"backends": backend_registry.health_summary(),
"queue": job_queue.stats if job_queue else {},
}

View File

@@ -27,6 +27,14 @@ class CancelResponse(BaseModel):
status: str
class JobStatusResponse(BaseModel):
job_id: str
status: JobStatus
created_at: float
pipeline: bool
queue_position: int | None = None
class SSEEvent(BaseModel):
event: str # ack | processing | result | error | done
event: str # ack | processing | rewrite | result | error | done | queued
data: dict

View File

@@ -1,15 +1,13 @@
"""Chat router — POST /chat, GET /chat/{job_id}/stream, POST /chat/{job_id}/cancel."""
"""Chat router — POST /chat, GET /chat/{job_id}/stream, GET /chat/{job_id}/status, POST /chat/{job_id}/cancel."""
from __future__ import annotations
import asyncio
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from models.schemas import CancelResponse, ChatRequest, ChatResponse
from services import worker
from models.schemas import CancelResponse, ChatRequest, ChatResponse, JobStatusResponse
from services.job_manager import job_manager
from services.job_queue import job_queue
from services.state_stream import state_stream
router = APIRouter(tags=["chat"])
@@ -17,13 +15,10 @@ router = APIRouter(tags=["chat"])
@router.post("/chat", response_model=ChatResponse)
async def create_chat(body: ChatRequest):
"""job_id 즉시 반환 (ACK). 백그라운드에서 EXAONE 처리 시작."""
"""job_id 즉시 반환 (ACK). 백그라운드에서 파이프라인 처리 시작."""
job = job_manager.create(body.message)
state_stream.create(job.id)
task = asyncio.create_task(worker.run(job))
job_manager.attach_task(job.id, task)
await job_queue.submit(job)
return ChatResponse(job_id=job.id)
@@ -52,6 +47,22 @@ async def _stream_with_cleanup(job_id: str):
state_stream.cleanup(job_id)
@router.get("/chat/{job_id}/status", response_model=JobStatusResponse)
async def job_status(job_id: str):
"""job 상태 조회 (SSE 없이)."""
job = job_manager.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return JobStatusResponse(
job_id=job.id,
status=job.status,
created_at=job.created_at,
pipeline=job.pipeline,
queue_position=job_queue.position(job.id) if job_queue else None,
)
@router.post("/chat/{job_id}/cancel", response_model=CancelResponse)
async def cancel_chat(job_id: str):
"""진행 중인 job 취소."""
@@ -59,3 +70,11 @@ async def cancel_chat(job_id: str):
if not success:
raise HTTPException(status_code=404, detail="Job not found or already finished")
return CancelResponse(status="cancelled")
@router.get("/queue/stats")
async def queue_stats():
"""큐 통계."""
if job_queue:
return job_queue.stats
return {"pending": 0, "active": 0}

View File

@@ -0,0 +1,93 @@
"""BackendRegistry — 모델 어댑터 관리 + 헬스체크 루프."""
from __future__ import annotations
import asyncio
import logging
import time
from services.model_adapter import ModelAdapter
logger = logging.getLogger(__name__)
# System prompt for the rewriter stage: EXAONE must reformulate the user's
# question only — never answer it. (Runtime string; kept in Korean.)
REWRITER_PROMPT = (
    "너는 질문 재구성 전문가다. "
    "사용자의 질문을 분석하여 의도를 명확히 하고, 구조화된 질문으로 재작성하라. "
    "재구성된 질문만 출력하라. 부연 설명이나 답변은 절대 하지 마라."
)

# System prompt for the reasoner stage (Gemma): produces the final answer.
REASONER_PROMPT = (
    "너는 NanoClaude, 사용자의 질문에 구조화되고 정확한 답변을 제공하는 AI 어시스턴트다. "
    "논리적으로 사고하고, 명확하게 설명하며, 필요시 예시를 포함하라."
)


class BackendRegistry:
    """Holds the rewriter/reasoner ModelAdapters and runs a periodic health loop.

    Health state starts pessimistic (both False) until the first check passes,
    so the pipeline falls back to rewriter-only mode right after startup.
    """

    def __init__(self) -> None:
        self.rewriter: ModelAdapter | None = None
        self.reasoner: ModelAdapter | None = None
        # Last observed health and round-trip latency per role.
        self._health: dict[str, bool] = {"rewriter": False, "reasoner": False}
        self._latency: dict[str, float] = {"rewriter": 0.0, "reasoner": 0.0}
        self._health_task: asyncio.Task | None = None

    def init_from_settings(self, settings) -> None:
        """Build both adapters from application settings (call before start_health_loop)."""
        self.rewriter = ModelAdapter(
            name="EXAONE",
            base_url=settings.exaone_base_url,
            model=settings.exaone_model,
            system_prompt=REWRITER_PROMPT,
            temperature=settings.exaone_temperature,
            timeout=settings.exaone_timeout,
        )
        self.reasoner = ModelAdapter(
            name="Gemma4",
            base_url=settings.reasoning_base_url,
            model=settings.reasoning_model,
            system_prompt=REASONER_PROMPT,
            temperature=settings.reasoning_temperature,
            timeout=settings.reasoning_timeout,
        )

    def start_health_loop(self, interval: float = 30.0) -> None:
        """Start the background polling task (requires a running event loop)."""
        self._health_task = asyncio.create_task(self._health_loop(interval))

    def stop_health_loop(self) -> None:
        """Cancel the polling task if it is still running."""
        if self._health_task and not self._health_task.done():
            self._health_task.cancel()

    async def _health_loop(self, interval: float) -> None:
        """Poll all backends forever; one failed round must not kill the loop."""
        while True:
            try:
                await self._check_all()
            except asyncio.CancelledError:
                raise
            except Exception:
                # BUGFIX: previously any unexpected exception here terminated
                # the loop permanently, freezing health state at its last value.
                logger.exception("Health check round failed")
            await asyncio.sleep(interval)

    async def _check_all(self) -> None:
        """Check each configured adapter, recording health, latency, and transitions."""
        for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]:
            if not adapter:
                continue
            start = time.monotonic()
            healthy = await adapter.health_check()
            elapsed = round((time.monotonic() - start) * 1000, 1)
            prev = self._health[role]
            self._health[role] = healthy
            self._latency[role] = elapsed
            if prev != healthy:
                # Log only on state transitions (UP↔DOWN) to avoid noise.
                status = "UP" if healthy else "DOWN"
                logger.warning("%s (%s) → %s (%.0fms)", adapter.name, role, status, elapsed)

    def is_healthy(self, role: str) -> bool:
        """Return last known health for *role* ("rewriter"/"reasoner"); unknown → False."""
        return self._health.get(role, False)

    def health_summary(self) -> dict:
        """Per-role health/latency snapshot for the /health endpoint."""
        result = {}
        for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]:
            if adapter:
                result[role] = {
                    "name": adapter.name,
                    "model": adapter.model,
                    "healthy": self._health[role],
                    "latency_ms": self._latency[role],
                }
        return result


# Module-level singleton shared by the app and the worker.
backend_registry = BackendRegistry()

View File

@@ -1,90 +0,0 @@
"""EXAONE Adapter — Ollama OpenAI-compat endpoint를 통한 EXAONE 호출."""
from __future__ import annotations
import logging
from collections.abc import AsyncGenerator
import httpx
from config import settings
logger = logging.getLogger(__name__)
# System prompt sent to EXAONE on every request (runtime string — kept in
# Korean). Roughly: "You are NanoClaude, an AI assistant that understands and
# organizes the user's question and provides a clear, structured answer."
SYSTEM_PROMPT = (
    "너는 NanoClaude, 사용자의 질문을 이해하고 정리하여 명확한 답변을 제공하는 AI 어시스턴트다. "
    "사용자의 질문 의도를 파악하고, 문장을 정리하며, 구조화된 응답을 생성한다."
)
async def stream_chat(message: str) -> AsyncGenerator[str, None]:
    """EXAONE streaming call; yields OpenAI-compat content deltas chunk by chunk.

    Errors are yielded as user-facing "[Error] ..." strings rather than raised,
    so the SSE consumer always receives something renderable.
    """
    # PERF: hoisted out of the per-line loop — the original re-ran
    # `import json` on every received SSE line.
    import json

    payload = {
        "model": settings.exaone_model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": message},
        ],
        "stream": True,
        "temperature": settings.exaone_temperature,
    }
    async with httpx.AsyncClient(timeout=settings.exaone_timeout) as client:
        try:
            async with client.stream(
                "POST",
                f"{settings.exaone_base_url}/v1/chat/completions",
                json=payload,
            ) as resp:
                if resp.status_code != 200:
                    body = await resp.aread()
                    logger.error("EXAONE error %d: %s", resp.status_code, body.decode())
                    yield f"[Error] EXAONE 응답 실패 ({resp.status_code})"
                    return
                async for line in resp.aiter_lines():
                    line = line.strip()
                    # SSE frames of interest all start with "data: ".
                    if not line or not line.startswith("data: "):
                        continue
                    payload_str = line[len("data: "):]
                    if payload_str == "[DONE]":
                        return
                    # Extract the content delta from the OpenAI-format chunk;
                    # malformed frames are skipped rather than aborting the stream.
                    try:
                        chunk = json.loads(payload_str)
                        delta = chunk.get("choices", [{}])[0].get("delta", {})
                        content = delta.get("content", "")
                        if content:
                            yield content
                    except (json.JSONDecodeError, IndexError, KeyError):
                        continue
        except httpx.ConnectError:
            logger.error("EXAONE connection failed: %s", settings.exaone_base_url)
            yield "[Error] EXAONE 서버에 연결할 수 없습니다."
        except httpx.ReadTimeout:
            logger.error("EXAONE read timeout")
            yield "[Error] EXAONE 응답 시간이 초과되었습니다."
async def complete_chat(message: str) -> str:
    """EXAONE non-streaming call; returns the complete response text."""
    request_body = {
        "model": settings.exaone_model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": message},
        ],
        "stream": False,
        "temperature": settings.exaone_temperature,
    }
    url = f"{settings.exaone_base_url}/v1/chat/completions"
    async with httpx.AsyncClient(timeout=settings.exaone_timeout) as client:
        resp = await client.post(url, json=request_body)
        resp.raise_for_status()
        data = resp.json()
    return data["choices"][0]["message"]["content"]

View File

@@ -17,6 +17,8 @@ class Job:
status: JobStatus = JobStatus.queued
created_at: float = field(default_factory=time)
task: asyncio.Task | None = field(default=None, repr=False)
pipeline: bool = True
rewritten_message: str = ""
class JobManager:

View File

@@ -0,0 +1,57 @@
"""JobQueue — Semaphore 기반 동시성 제한."""
from __future__ import annotations
import asyncio
import logging
from services import worker
from services.job_manager import Job, job_manager
from services.state_stream import state_stream
logger = logging.getLogger(__name__)
class JobQueue:
    """Limits concurrent job execution via a Semaphore and tracks queue order.

    `_waiting` preserves FIFO order (source of queue positions); `_active`
    holds jobs currently running under the semaphore.
    """

    def __init__(self, max_concurrent: int = 3) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._waiting: list[str] = []  # waiting job_ids, FIFO order
        self._active: set[str] = set()

    async def submit(self, job: Job) -> asyncio.Task:
        """Schedule *job* and attach the task so cancel() can reach it."""
        task = asyncio.create_task(self._run_with_semaphore(job))
        job_manager.attach_task(job.id, task)
        return task

    async def _run_with_semaphore(self, job: Job) -> None:
        """Wait for a semaphore slot, then run the worker; keep bookkeeping exact."""
        self._waiting.append(job.id)
        pos = self.position(job.id)
        if pos and pos > 0:
            await state_stream.push(job.id, "queued", {"position": pos})
        try:
            async with self._semaphore:
                self._waiting.remove(job.id)
                self._active.add(job.id)
                await worker.run(job)
        finally:
            self._active.discard(job.id)
            # BUGFIX: if this task is cancelled while still blocked on the
            # semaphore, the id previously stayed in `_waiting` forever,
            # corrupting positions and the pending count.
            if job.id in self._waiting:
                self._waiting.remove(job.id)

    def position(self, job_id: str) -> int | None:
        """1-based position of *job_id* in the waiting queue; None if not waiting."""
        try:
            return self._waiting.index(job_id) + 1
        except ValueError:
            return None

    @property
    def stats(self) -> dict:
        """Counts for the /queue/stats endpoint."""
        return {"pending": len(self._waiting), "active": len(self._active)}


# Module-level singleton, created at app startup via init_queue().
job_queue: JobQueue | None = None


def init_queue(max_concurrent: int = 3) -> JobQueue:
    """Create the global queue (called from the FastAPI lifespan hook)."""
    global job_queue
    job_queue = JobQueue(max_concurrent)
    return job_queue

View File

@@ -0,0 +1,109 @@
"""ModelAdapter — 범용 OpenAI-compat 모델 어댑터."""
from __future__ import annotations
import json
import logging
from collections.abc import AsyncGenerator
import httpx
logger = logging.getLogger(__name__)
class ModelAdapter:
    """Generic adapter for OpenAI-compatible /v1/chat/completions backends.

    Ollama, MLX, etc. are all driven through the same interface; the caller
    only supplies base_url/model/system_prompt. `stream_chat` raises on
    transport/HTTP failure (callers implement fallback), while `health_check`
    never raises.
    """
    def __init__(
        self,
        name: str,
        base_url: str,
        model: str,
        system_prompt: str,
        temperature: float = 0.7,
        timeout: float = 120.0,
    ):
        self.name = name                    # display name used in logs ("EXAONE", "Gemma4")
        self.base_url = base_url            # backend root, no trailing /v1
        self.model = model
        self.system_prompt = system_prompt  # prepended as the system message on every call
        self.temperature = temperature
        self.timeout = timeout              # per-request httpx timeout (seconds)

    async def stream_chat(self, message: str) -> AsyncGenerator[str, None]:
        """Streaming call; yields content chunks.

        Raises RuntimeError on a non-200 response and re-raises httpx
        ConnectError/ReadTimeout after logging, so the worker can fall back.
        """
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": message},
            ],
            "stream": True,
            "temperature": self.temperature,
        }
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            try:
                async with client.stream(
                    "POST",
                    f"{self.base_url}/v1/chat/completions",
                    json=payload,
                ) as resp:
                    if resp.status_code != 200:
                        body = await resp.aread()
                        logger.error("%s error %d: %s", self.name, resp.status_code, body.decode())
                        raise RuntimeError(f"{self.name} 응답 실패 ({resp.status_code})")
                    async for line in resp.aiter_lines():
                        line = line.strip()
                        # SSE frames of interest all start with "data: ".
                        if not line or not line.startswith("data: "):
                            continue
                        payload_str = line[len("data: "):]
                        if payload_str == "[DONE]":
                            return
                        # Extract the content delta from the OpenAI-format chunk;
                        # malformed frames are skipped, not fatal.
                        try:
                            chunk = json.loads(payload_str)
                            delta = chunk.get("choices", [{}])[0].get("delta", {})
                            content = delta.get("content", "")
                            if content:
                                yield content
                        except (json.JSONDecodeError, IndexError, KeyError):
                            continue
            except httpx.ConnectError:
                logger.error("%s connection failed: %s", self.name, self.base_url)
                raise
            except httpx.ReadTimeout:
                logger.error("%s read timeout", self.name)
                raise

    async def complete_chat(self, message: str) -> str:
        """Non-streaming call; returns the full response text.

        Raises httpx.HTTPStatusError on non-2xx (via raise_for_status).
        """
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": message},
            ],
            "stream": False,
            "temperature": self.temperature,
        }
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.post(
                f"{self.base_url}/v1/chat/completions",
                json=payload,
            )
            resp.raise_for_status()
            data = resp.json()
            return data["choices"][0]["message"]["content"]

    async def health_check(self) -> bool:
        """GET /v1/models with a 3-second timeout; True if the backend answers < 500."""
        try:
            async with httpx.AsyncClient(timeout=3.0) as client:
                resp = await client.get(f"{self.base_url}/v1/models")
                return resp.status_code < 500
        except Exception:
            # Any transport error counts as unhealthy; never propagate.
            return False

View File

@@ -1,4 +1,4 @@
"""Worker — background task that drives EXAONE call and pushes SSE events."""
"""Worker — 2단계 파이프라인: EXAONE rewrite → Gemma reasoning (cancel-safe + fallback)."""
from __future__ import annotations
@@ -9,23 +9,66 @@ from time import time
from config import settings
from db.database import log_completion, log_request
from models.schemas import JobStatus
from services.exaone_adapter import stream_chat
from services.backend_registry import backend_registry
from services.job_manager import Job, job_manager
from services.state_stream import state_stream
logger = logging.getLogger(__name__)
# 무응답 방지: 3~5초 간격으로 processing heartbeat
HEARTBEAT_INTERVAL = 4.0
REWRITE_HEARTBEAT = 2.0
MAX_REWRITE_LENGTH = 1000
async def _complete_with_heartbeat(adapter, message: str, job_id: str) -> str:
"""complete_chat + heartbeat 병행. rewrite 대기 중 사용자 체감 멈춤 방지."""
result_holder: dict[str, str] = {}
exc_holder: list[Exception] = []
async def call():
try:
result_holder["text"] = await adapter.complete_chat(message)
except Exception as e:
exc_holder.append(e)
task = asyncio.create_task(call())
while not task.done():
await asyncio.sleep(REWRITE_HEARTBEAT)
if not task.done():
await state_stream.push(job_id, "processing", {"message": "질문을 재구성하고 있습니다..."})
if exc_holder:
raise exc_holder[0]
return result_holder.get("text", "")
async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str]) -> bool:
    """Stream adapter output into *collected* with per-chunk cancel checks.

    Each chunk is appended and pushed as a 'result' SSE event; a 'processing'
    heartbeat is interleaved when HEARTBEAT_INTERVAL elapses between pushes.
    Returns True on normal completion, False if the job was cancelled mid-stream.
    """
    # Hoist the loop lookup; get_running_loop() is the modern replacement for
    # get_event_loop() inside coroutines (the latter is deprecated there).
    loop = asyncio.get_running_loop()
    last_heartbeat = loop.time()
    async for chunk in adapter.stream_chat(message):
        if job.status == JobStatus.cancelled:
            return False
        collected.append(chunk)
        await state_stream.push(job.id, "result", {"content": chunk})
        now = loop.time()
        if now - last_heartbeat >= HEARTBEAT_INTERVAL:
            await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."})
            last_heartbeat = now
    return True
async def run(job: Job) -> None:
"""EXAONE 호출 → SSE 이벤트 발행."""
"""EXAONE rewrite → Gemma reasoning 파이프라인 (fallback + cancel-safe)."""
start_time = time()
rewrite_model = None
reasoning_model = None
rewritten_message = ""
# DB 로깅: 요청 기록
try:
await log_request(job.id, job.message, settings.exaone_model, job.created_at)
await log_request(job.id, job.message, "pipeline", job.created_at)
except Exception:
logger.warning("Failed to log request for job %s", job.id, exc_info=True)
@@ -34,49 +77,91 @@ async def run(job: Job) -> None:
await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."})
job_manager.set_status(job.id, JobStatus.processing)
# --- Processing + Streaming ---
await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."})
# --- Cancel 체크 #1 ---
if job.status == JobStatus.cancelled:
return
use_pipeline = settings.pipeline_enabled and backend_registry.is_healthy("reasoner")
collected: list[str] = []
last_heartbeat = asyncio.get_event_loop().time()
async for chunk in stream_chat(job.message):
if not use_pipeline:
# === EXAONE 단독 모드 (Phase 1 fallback) ===
rewrite_model = backend_registry.rewriter.model
await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."})
ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected)
if not ok:
return
else:
# === 파이프라인 모드: EXAONE rewrite → Gemma reasoning ===
rewrite_model = backend_registry.rewriter.model
reasoning_model = backend_registry.reasoner.model
# --- Rewrite ---
await state_stream.push(job.id, "processing", {"message": "질문을 재구성하고 있습니다..."})
rewrite_start = time()
try:
rewritten_message = await _complete_with_heartbeat(
backend_registry.rewriter, job.message, job.id
)
rewritten_message = rewritten_message[:MAX_REWRITE_LENGTH]
except Exception:
logger.warning("Rewrite failed for job %s, using original message", job.id)
rewritten_message = job.message
rewrite_latency = (time() - rewrite_start) * 1000
job.rewritten_message = rewritten_message
# --- Rewrite 결과 SSE 노출 ---
await state_stream.push(job.id, "rewrite", {"content": rewritten_message})
# --- Cancel 체크 #2 ---
if job.status == JobStatus.cancelled:
logger.info("Job %s cancelled during streaming", job.id)
await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
latency_ms = (time() - start_time) * 1000
try:
await log_completion(job.id, "cancelled", len("".join(collected)), latency_ms, time())
except Exception:
pass
return
collected.append(chunk)
# --- Reasoning ---
await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."})
# Stream partial result
await state_stream.push(job.id, "result", {"content": chunk})
try:
ok = await _stream_with_cancel(backend_registry.reasoner, rewritten_message, job, collected)
if not ok:
return
except Exception:
# Gemma streaming 중간 실패 → EXAONE fallback
logger.warning("Reasoner failed for job %s, falling back to rewriter", job.id, exc_info=True)
# Heartbeat: 긴 침묵 방지
now = asyncio.get_event_loop().time()
if now - last_heartbeat >= HEARTBEAT_INTERVAL:
await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."})
last_heartbeat = now
if job.status == JobStatus.cancelled:
return
await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."})
reasoning_model = rewrite_model # fallback 기록
ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected)
if not ok:
return
# --- Complete ---
if not collected:
job_manager.set_status(job.id, JobStatus.failed)
await state_stream.push(job.id, "error", {"message": "EXAONE으로부터 응답을 받지 못했습니다."})
await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."})
status = "failed"
else:
job_manager.set_status(job.id, JobStatus.completed)
await state_stream.push(job.id, "done", {"message": "완료"})
status = "completed"
# DB 로깅: 완료 기록
# --- DB 로깅 ---
latency_ms = (time() - start_time) * 1000
response_text = "".join(collected)
try:
await log_completion(job.id, status, len(response_text), latency_ms, time())
await log_completion(
job.id, status, len(response_text), latency_ms, time(),
rewrite_model=rewrite_model,
reasoning_model=reasoning_model,
rewritten_message=rewritten_message,
rewrite_latency_ms=rewrite_latency if use_pipeline else 0,
)
except Exception:
logger.warning("Failed to log completion for job %s", job.id, exc_info=True)