diff --git a/.env.example b/.env.example index b5aa0e1..df7653c 100644 --- a/.env.example +++ b/.env.example @@ -17,4 +17,8 @@ DB_PATH=/app/data/gateway.db # NanoClaude EXAONE_MODEL=exaone3.5:7.8b-instruct-q8_0 +REASONING_BASE_URL=http://192.168.1.122:8800 +REASONING_MODEL=mlx-community/gemma-4-26b-a4b-it-8bit +PIPELINE_ENABLED=true +MAX_CONCURRENT_JOBS=3 NANOCLAUDE_API_KEY= diff --git a/CLAUDE.md b/CLAUDE.md index 67211de..6141e6d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -16,7 +16,8 @@ GPU 서버(RTX 4070 Ti Super)에서 운영하는 중앙 AI 라우팅 서비스. - GPU Ollama: host.docker.internal:11434 - 맥미니 Ollama: 100.115.153.119:11434 -- NanoClaude: localhost:8100 (비동기 job 기반 AI Gateway) +- 맥미니 MLX: 192.168.1.122:8800 (Gemma 4) +- NanoClaude: localhost:8100 (EXAONE → Gemma 파이프라인) ## 개발 @@ -41,7 +42,9 @@ OpenAI 호환: `/v1/chat/completions`, `/v1/models`, `/v1/embeddings` ## NanoClaude API 비동기 job 기반: `POST /nano/chat` → `{ job_id }`, `GET /nano/chat/{job_id}/stream` → SSE +상태: `GET /nano/chat/{job_id}/status`, 큐: `GET /nano/queue/stats` 취소: `POST /nano/chat/{job_id}/cancel` +파이프라인: EXAONE (rewrite) → Gemma 4 (reasoning), 맥미니 다운 시 EXAONE fallback ## 백엔드 설정 diff --git a/docker-compose.yml b/docker-compose.yml index 04a25a8..276bd84 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,6 +54,10 @@ services: environment: - EXAONE_BASE_URL=http://host.docker.internal:11434 - EXAONE_MODEL=${EXAONE_MODEL:-exaone3.5:7.8b-instruct-q8_0} + - REASONING_BASE_URL=${REASONING_BASE_URL:-http://192.168.1.122:8800} + - REASONING_MODEL=${REASONING_MODEL:-mlx-community/gemma-4-26b-a4b-it-8bit} + - PIPELINE_ENABLED=${PIPELINE_ENABLED:-true} + - MAX_CONCURRENT_JOBS=${MAX_CONCURRENT_JOBS:-3} - DB_PATH=/app/data/nanoclaude.db - API_KEY=${NANOCLAUDE_API_KEY:-} volumes: diff --git a/nanoclaude/config.py b/nanoclaude/config.py index 2295dad..8c4f8c3 100644 --- a/nanoclaude/config.py +++ b/nanoclaude/config.py @@ -2,11 +2,26 @@ from pydantic_settings import BaseSettings class Settings(BaseSettings): - # EXAONE via Ollama + # EXAONE (rewriter) via Ollama exaone_base_url: str = "http://localhost:11434" exaone_model: str = "exaone3.5:7.8b-instruct-q8_0" exaone_temperature: float = 0.7 - exaone_timeout: float = 120.0 + exaone_timeout: float = 30.0 # rewrite는 짧아야 함 + + # Gemma 4 (reasoner) via MLX on Mac mini + reasoning_base_url: str = "http://192.168.1.122:8800" + reasoning_model: str = "mlx-community/gemma-4-26b-a4b-it-8bit" + reasoning_temperature: float = 0.7 + reasoning_timeout: float = 180.0 + + # Pipeline + pipeline_enabled: bool = True # False = EXAONE 단독 모드 (Phase 1 fallback) + + # Queue + max_concurrent_jobs: int = 3 + + # Health check + health_check_interval: float = 30.0 # Server host: str = "0.0.0.0" diff --git a/nanoclaude/db/database.py b/nanoclaude/db/database.py index a95dbc9..25d27ee 100644 --- a/nanoclaude/db/database.py +++ b/nanoclaude/db/database.py @@ -14,18 +14,35 @@ CREATE TABLE IF NOT EXISTS request_logs ( response_chars INTEGER DEFAULT 0, latency_ms REAL DEFAULT 0, created_at REAL NOT NULL, - completed_at REAL + completed_at REAL, + rewrite_model TEXT, + reasoning_model TEXT, + rewritten_message TEXT, + rewrite_latency_ms REAL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_logs_job ON request_logs(job_id); CREATE INDEX IF NOT EXISTS idx_logs_created ON request_logs(created_at); """ +# Phase 1 → Phase 2 마이그레이션 (이미 존재하면 무시) +MIGRATIONS = [ + "ALTER TABLE request_logs ADD COLUMN rewrite_model TEXT", + "ALTER TABLE request_logs ADD COLUMN reasoning_model TEXT", + "ALTER TABLE request_logs ADD COLUMN rewritten_message TEXT", + "ALTER TABLE request_logs ADD COLUMN rewrite_latency_ms REAL DEFAULT 0", +] + async def init_db(): async with aiosqlite.connect(settings.db_path) as db: await db.execute("PRAGMA journal_mode=WAL") await db.executescript(SCHEMA) + for migration in MIGRATIONS: + try: + await db.execute(migration) + except Exception: + pass # 이미 존재하는 컬럼 await db.commit() @@ -38,10 +55,26 @@ async def log_request(job_id: str, message: str, model: str, created_at: float): await db.commit() -async def log_completion(job_id: str, status: str, response_chars: int, latency_ms: float, completed_at: float): +async def log_completion( + job_id: str, + status: str, + response_chars: int, + latency_ms: float, + completed_at: float, + *, + rewrite_model: str | None = None, + reasoning_model: str | None = None, + rewritten_message: str | None = None, + rewrite_latency_ms: float = 0, +): async with aiosqlite.connect(settings.db_path) as db: await db.execute( - "UPDATE request_logs SET status=?, response_chars=?, latency_ms=?, completed_at=? WHERE job_id=?", - (status, response_chars, latency_ms, completed_at, job_id), + """UPDATE request_logs + SET status=?, response_chars=?, latency_ms=?, completed_at=?, + rewrite_model=?, reasoning_model=?, rewritten_message=?, rewrite_latency_ms=? + WHERE job_id=?""", + (status, response_chars, latency_ms, completed_at, + rewrite_model, reasoning_model, rewritten_message, rewrite_latency_ms, + job_id), ) await db.commit() diff --git a/nanoclaude/main.py b/nanoclaude/main.py index 273220d..8c60ba1 100644 --- a/nanoclaude/main.py +++ b/nanoclaude/main.py @@ -1,4 +1,4 @@ -"""NanoClaude — 비동기 job 기반 AI Gateway.""" +"""NanoClaude — 비동기 job 기반 AI Gateway (Phase 2: EXAONE → Gemma 파이프라인).""" from __future__ import annotations @@ -12,22 +12,29 @@ from fastapi.responses import JSONResponse from config import settings from db.database import init_db from routers import chat +from services.backend_registry import backend_registry +from services.job_queue import init_queue, job_queue logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s — %(message)s", ) + @asynccontextmanager async def lifespan(app: FastAPI): await init_db() + backend_registry.init_from_settings(settings) + backend_registry.start_health_loop(settings.health_check_interval) + init_queue(settings.max_concurrent_jobs) yield + backend_registry.stop_health_loop() app = FastAPI( title="NanoClaude", - version="0.1.0", - description="비동기 job 기반 AI Gateway — Phase 1", + version="0.2.0", + description="비동기 job 기반 AI Gateway — Phase 2 (EXAONE → Gemma 파이프라인)", lifespan=lifespan, ) @@ -54,9 +61,14 @@ app.include_router(chat.router) @app.get("/") async def root(): - return {"service": "NanoClaude", "version": "0.1.0", "phase": 1} + return {"service": "NanoClaude", "version": "0.2.0", "phase": 2} @app.get("/health") async def health(): - return {"status": "ok"} + from services.job_queue import job_queue + return { + "status": "ok", + "backends": backend_registry.health_summary(), + "queue": job_queue.stats if job_queue else {}, + } diff --git a/nanoclaude/models/schemas.py b/nanoclaude/models/schemas.py index 5121322..98273df 100644 --- a/nanoclaude/models/schemas.py +++ b/nanoclaude/models/schemas.py @@ -27,6 +27,14 @@ class CancelResponse(BaseModel): status: str +class JobStatusResponse(BaseModel): + job_id: str + status: JobStatus + created_at: float + pipeline: bool + queue_position: int | None = None + + class SSEEvent(BaseModel): - event: str # ack | processing | result | error | done + event: str # ack | processing | rewrite | result | error | done | queued data: dict diff --git a/nanoclaude/routers/chat.py b/nanoclaude/routers/chat.py index a7c3009..aaa97ef 100644 --- a/nanoclaude/routers/chat.py +++ b/nanoclaude/routers/chat.py @@ -1,15 +1,13 @@ -"""Chat router — POST /chat, GET /chat/{job_id}/stream, POST /chat/{job_id}/cancel.""" +"""Chat router — POST /chat, GET /chat/{job_id}/stream, GET /chat/{job_id}/status, POST /chat/{job_id}/cancel.""" from __future__ import annotations -import asyncio - from fastapi import APIRouter, HTTPException from fastapi.responses import StreamingResponse -from models.schemas import CancelResponse, ChatRequest, ChatResponse -from services import worker +from models.schemas import CancelResponse, ChatRequest, ChatResponse, JobStatusResponse from services.job_manager import job_manager +from services.job_queue import job_queue from services.state_stream import state_stream router = APIRouter(tags=["chat"]) @@ -17,13 +15,10 @@ router = APIRouter(tags=["chat"]) @router.post("/chat", response_model=ChatResponse) async def create_chat(body: ChatRequest): - """job_id 즉시 반환 (ACK). 백그라운드에서 EXAONE 처리 시작.""" + """job_id 즉시 반환 (ACK). 백그라운드에서 파이프라인 처리 시작.""" job = job_manager.create(body.message) state_stream.create(job.id) - - task = asyncio.create_task(worker.run(job)) - job_manager.attach_task(job.id, task) - + await job_queue.submit(job) return ChatResponse(job_id=job.id) @@ -52,6 +47,22 @@ async def _stream_with_cleanup(job_id: str): state_stream.cleanup(job_id) +@router.get("/chat/{job_id}/status", response_model=JobStatusResponse) +async def job_status(job_id: str): + """job 상태 조회 (SSE 없이).""" + job = job_manager.get(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + return JobStatusResponse( + job_id=job.id, + status=job.status, + created_at=job.created_at, + pipeline=job.pipeline, + queue_position=job_queue.position(job.id) if job_queue else None, + ) + + @router.post("/chat/{job_id}/cancel", response_model=CancelResponse) async def cancel_chat(job_id: str): """진행 중인 job 취소.""" @@ -59,3 +70,11 @@ async def cancel_chat(job_id: str): if not success: raise HTTPException(status_code=404, detail="Job not found or already finished") return CancelResponse(status="cancelled") + + +@router.get("/queue/stats") +async def queue_stats(): + """큐 통계.""" + if job_queue: + return job_queue.stats + return {"pending": 0, "active": 0} diff --git a/nanoclaude/services/backend_registry.py b/nanoclaude/services/backend_registry.py new file mode 100644 index 0000000..d0ca946 --- /dev/null +++ b/nanoclaude/services/backend_registry.py @@ -0,0 +1,93 @@ +"""BackendRegistry — 모델 어댑터 관리 + 헬스체크 루프.""" + +from __future__ import annotations + +import asyncio +import logging +import time + +from services.model_adapter import ModelAdapter + +logger = logging.getLogger(__name__) + +REWRITER_PROMPT = ( + "너는 질문 재구성 전문가다. " + "사용자의 질문을 분석하여 의도를 명확히 하고, 구조화된 질문으로 재작성하라. " + "재구성된 질문만 출력하라. 부연 설명이나 답변은 절대 하지 마라." +) + +REASONER_PROMPT = ( + "너는 NanoClaude, 사용자의 질문에 구조화되고 정확한 답변을 제공하는 AI 어시스턴트다. " + "논리적으로 사고하고, 명확하게 설명하며, 필요시 예시를 포함하라." +) + + +class BackendRegistry: + def __init__(self) -> None: + self.rewriter: ModelAdapter | None = None + self.reasoner: ModelAdapter | None = None + self._health: dict[str, bool] = {"rewriter": False, "reasoner": False} + self._latency: dict[str, float] = {"rewriter": 0.0, "reasoner": 0.0} + self._health_task: asyncio.Task | None = None + + def init_from_settings(self, settings) -> None: + self.rewriter = ModelAdapter( + name="EXAONE", + base_url=settings.exaone_base_url, + model=settings.exaone_model, + system_prompt=REWRITER_PROMPT, + temperature=settings.exaone_temperature, + timeout=settings.exaone_timeout, + ) + self.reasoner = ModelAdapter( + name="Gemma4", + base_url=settings.reasoning_base_url, + model=settings.reasoning_model, + system_prompt=REASONER_PROMPT, + temperature=settings.reasoning_temperature, + timeout=settings.reasoning_timeout, + ) + + def start_health_loop(self, interval: float = 30.0) -> None: + self._health_task = asyncio.create_task(self._health_loop(interval)) + + def stop_health_loop(self) -> None: + if self._health_task and not self._health_task.done(): + self._health_task.cancel() + + async def _health_loop(self, interval: float) -> None: + while True: + await self._check_all() + await asyncio.sleep(interval) + + async def _check_all(self) -> None: + for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]: + if not adapter: + continue + start = time.monotonic() + healthy = await adapter.health_check() + elapsed = round((time.monotonic() - start) * 1000, 1) + prev = self._health[role] + self._health[role] = healthy + self._latency[role] = elapsed + if prev != healthy: + status = "UP" if healthy else "DOWN" + logger.warning("%s (%s) → %s (%.0fms)", adapter.name, role, status, elapsed) + + def is_healthy(self, role: str) -> bool: + return self._health.get(role, False) + + def health_summary(self) -> dict: + result = {} + for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]: + if adapter: + result[role] = { + "name": adapter.name, + "model": adapter.model, + "healthy": self._health[role], + "latency_ms": self._latency[role], + } + return result + + +backend_registry = BackendRegistry() diff --git a/nanoclaude/services/exaone_adapter.py b/nanoclaude/services/exaone_adapter.py deleted file mode 100644 index 0979418..0000000 --- a/nanoclaude/services/exaone_adapter.py +++ /dev/null @@ -1,90 +0,0 @@ -"""EXAONE Adapter — Ollama OpenAI-compat endpoint를 통한 EXAONE 호출.""" - -from __future__ import annotations - -import logging -from collections.abc import AsyncGenerator - -import httpx - -from config import settings - -logger = logging.getLogger(__name__) - -SYSTEM_PROMPT = ( - "너는 NanoClaude, 사용자의 질문을 이해하고 정리하여 명확한 답변을 제공하는 AI 어시스턴트다. " - "사용자의 질문 의도를 파악하고, 문장을 정리하며, 구조화된 응답을 생성한다." -) - - -async def stream_chat(message: str) -> AsyncGenerator[str, None]: - """EXAONE 스트리밍 호출. OpenAI-compat SSE를 chunk 단위로 yield.""" - payload = { - "model": settings.exaone_model, - "messages": [ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": message}, - ], - "stream": True, - "temperature": settings.exaone_temperature, - } - - async with httpx.AsyncClient(timeout=settings.exaone_timeout) as client: - try: - async with client.stream( - "POST", - f"{settings.exaone_base_url}/v1/chat/completions", - json=payload, - ) as resp: - if resp.status_code != 200: - body = await resp.aread() - logger.error("EXAONE error %d: %s", resp.status_code, body.decode()) - yield f"[Error] EXAONE 응답 실패 ({resp.status_code})" - return - - async for line in resp.aiter_lines(): - line = line.strip() - if not line or not line.startswith("data: "): - continue - payload_str = line[len("data: "):] - if payload_str == "[DONE]": - return - # Extract content delta from OpenAI-format chunk - try: - import json - chunk = json.loads(payload_str) - delta = chunk.get("choices", [{}])[0].get("delta", {}) - content = delta.get("content", "") - if content: - yield content - except (json.JSONDecodeError, IndexError, KeyError): - continue - - except httpx.ConnectError: - logger.error("EXAONE connection failed: %s", settings.exaone_base_url) - yield "[Error] EXAONE 서버에 연결할 수 없습니다." - except httpx.ReadTimeout: - logger.error("EXAONE read timeout") - yield "[Error] EXAONE 응답 시간이 초과되었습니다." - - -async def complete_chat(message: str) -> str: - """EXAONE 비스트리밍 호출. 전체 응답 텍스트 반환.""" - payload = { - "model": settings.exaone_model, - "messages": [ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": message}, - ], - "stream": False, - "temperature": settings.exaone_temperature, - } - - async with httpx.AsyncClient(timeout=settings.exaone_timeout) as client: - resp = await client.post( - f"{settings.exaone_base_url}/v1/chat/completions", - json=payload, - ) - resp.raise_for_status() - data = resp.json() - return data["choices"][0]["message"]["content"] diff --git a/nanoclaude/services/job_manager.py b/nanoclaude/services/job_manager.py index 84f963b..723f0fc 100644 --- a/nanoclaude/services/job_manager.py +++ b/nanoclaude/services/job_manager.py @@ -17,6 +17,8 @@ class Job: status: JobStatus = JobStatus.queued created_at: float = field(default_factory=time) task: asyncio.Task | None = field(default=None, repr=False) + pipeline: bool = True + rewritten_message: str = "" class JobManager: diff --git a/nanoclaude/services/job_queue.py b/nanoclaude/services/job_queue.py new file mode 100644 index 0000000..f3831d8 --- /dev/null +++ b/nanoclaude/services/job_queue.py @@ -0,0 +1,57 @@ +"""JobQueue — Semaphore 기반 동시성 제한.""" + +from __future__ import annotations + +import asyncio +import logging + +from services import worker +from services.job_manager import Job, job_manager +from services.state_stream import state_stream + +logger = logging.getLogger(__name__) + + +class JobQueue: + def __init__(self, max_concurrent: int = 3) -> None: + self._semaphore = asyncio.Semaphore(max_concurrent) + self._waiting: list[str] = [] # 대기 중 job_id (순서 보장) + self._active: set[str] = set() + + async def submit(self, job: Job) -> asyncio.Task: + task = asyncio.create_task(self._run_with_semaphore(job)) + job_manager.attach_task(job.id, task) + return task + + async def _run_with_semaphore(self, job: Job) -> None: + self._waiting.append(job.id) + pos = self.position(job.id) + if pos and pos > 0: + await state_stream.push(job.id, "queued", {"position": pos}) + + try: + async with self._semaphore: + self._waiting.remove(job.id) + self._active.add(job.id) + await worker.run(job) + finally: + self._active.discard(job.id) + + def position(self, job_id: str) -> int | None: + try: + return self._waiting.index(job_id) + 1 + except ValueError: + return None + + @property + def stats(self) -> dict: + return {"pending": len(self._waiting), "active": len(self._active)} + + +job_queue: JobQueue | None = None + + +def init_queue(max_concurrent: int = 3) -> JobQueue: + global job_queue + job_queue = JobQueue(max_concurrent) + return job_queue diff --git a/nanoclaude/services/model_adapter.py b/nanoclaude/services/model_adapter.py new file mode 100644 index 0000000..e13641b --- /dev/null +++ b/nanoclaude/services/model_adapter.py @@ -0,0 +1,109 @@ +"""ModelAdapter — 범용 OpenAI-compat 모델 어댑터.""" + +from __future__ import annotations + +import json +import logging +from collections.abc import AsyncGenerator + +import httpx + +logger = logging.getLogger(__name__) + + +class ModelAdapter: + """OpenAI-compatible /v1/chat/completions 백엔드 범용 어댑터. + Ollama, MLX 등 모두 동일 인터페이스로 호출.""" + + def __init__( + self, + name: str, + base_url: str, + model: str, + system_prompt: str, + temperature: float = 0.7, + timeout: float = 120.0, + ): + self.name = name + self.base_url = base_url + self.model = model + self.system_prompt = system_prompt + self.temperature = temperature + self.timeout = timeout + + async def stream_chat(self, message: str) -> AsyncGenerator[str, None]: + """스트리밍 호출. content chunk를 yield.""" + payload = { + "model": self.model, + "messages": [ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": message}, + ], + "stream": True, + "temperature": self.temperature, + } + + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + async with client.stream( + "POST", + f"{self.base_url}/v1/chat/completions", + json=payload, + ) as resp: + if resp.status_code != 200: + body = await resp.aread() + logger.error("%s error %d: %s", self.name, resp.status_code, body.decode()) + raise RuntimeError(f"{self.name} 응답 실패 ({resp.status_code})") + + async for line in resp.aiter_lines(): + line = line.strip() + if not line or not line.startswith("data: "): + continue + payload_str = line[len("data: "):] + if payload_str == "[DONE]": + return + try: + chunk = json.loads(payload_str) + delta = chunk.get("choices", [{}])[0].get("delta", {}) + content = delta.get("content", "") + if content: + yield content + except (json.JSONDecodeError, IndexError, KeyError): + continue + + except httpx.ConnectError: + logger.error("%s connection failed: %s", self.name, self.base_url) + raise + except httpx.ReadTimeout: + logger.error("%s read timeout", self.name) + raise + + async def complete_chat(self, message: str) -> str: + """비스트리밍 호출. 전체 응답 텍스트 반환.""" + payload = { + "model": self.model, + "messages": [ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": message}, + ], + "stream": False, + "temperature": self.temperature, + } + + async with httpx.AsyncClient(timeout=self.timeout) as client: + resp = await client.post( + f"{self.base_url}/v1/chat/completions", + json=payload, + ) + resp.raise_for_status() + data = resp.json() + return data["choices"][0]["message"]["content"] + + async def health_check(self) -> bool: + """GET /v1/models — 3초 timeout.""" + try: + async with httpx.AsyncClient(timeout=3.0) as client: + resp = await client.get(f"{self.base_url}/v1/models") + return resp.status_code < 500 + except Exception: + return False diff --git a/nanoclaude/services/worker.py b/nanoclaude/services/worker.py index 19aa1ce..9f84edb 100644 --- a/nanoclaude/services/worker.py +++ b/nanoclaude/services/worker.py @@ -1,4 +1,4 @@ -"""Worker — background task that drives EXAONE call and pushes SSE events.""" +"""Worker — 2단계 파이프라인: EXAONE rewrite → Gemma reasoning (cancel-safe + fallback).""" from __future__ import annotations @@ -9,23 +9,66 @@ from time import time from config import settings from db.database import log_completion, log_request from models.schemas import JobStatus -from services.exaone_adapter import stream_chat +from services.backend_registry import backend_registry from services.job_manager import Job, job_manager from services.state_stream import state_stream logger = logging.getLogger(__name__) -# 무응답 방지: 3~5초 간격으로 processing heartbeat HEARTBEAT_INTERVAL = 4.0 +REWRITE_HEARTBEAT = 2.0 +MAX_REWRITE_LENGTH = 1000 + + +async def _complete_with_heartbeat(adapter, message: str, job_id: str) -> str: + """complete_chat + heartbeat 병행. rewrite 대기 중 사용자 체감 멈춤 방지.""" + result_holder: dict[str, str] = {} + exc_holder: list[Exception] = [] + + async def call(): + try: + result_holder["text"] = await adapter.complete_chat(message) + except Exception as e: + exc_holder.append(e) + + task = asyncio.create_task(call()) + while not task.done(): + await asyncio.sleep(REWRITE_HEARTBEAT) + if not task.done(): + await state_stream.push(job_id, "processing", {"message": "질문을 재구성하고 있습니다..."}) + + if exc_holder: + raise exc_holder[0] + return result_holder.get("text", "") + + +async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str]) -> bool: + """스트리밍 + cancel 체크. 정상 완료 시 True, cancel 시 False.""" + last_heartbeat = asyncio.get_event_loop().time() + + async for chunk in adapter.stream_chat(message): + if job.status == JobStatus.cancelled: + return False + collected.append(chunk) + await state_stream.push(job.id, "result", {"content": chunk}) + + now = asyncio.get_event_loop().time() + if now - last_heartbeat >= HEARTBEAT_INTERVAL: + await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."}) + last_heartbeat = now + + return True async def run(job: Job) -> None: - """EXAONE 호출 → SSE 이벤트 발행.""" + """EXAONE rewrite → Gemma reasoning 파이프라인 (fallback + cancel-safe).""" start_time = time() + rewrite_model = None + reasoning_model = None + rewritten_message = "" - # DB 로깅: 요청 기록 try: - await log_request(job.id, job.message, settings.exaone_model, job.created_at) + await log_request(job.id, job.message, "pipeline", job.created_at) except Exception: logger.warning("Failed to log request for job %s", job.id, exc_info=True) @@ -34,49 +77,91 @@ async def run(job: Job) -> None: await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."}) job_manager.set_status(job.id, JobStatus.processing) - # --- Processing + Streaming --- - await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."}) + # --- Cancel 체크 #1 --- + if job.status == JobStatus.cancelled: + return + use_pipeline = settings.pipeline_enabled and backend_registry.is_healthy("reasoner") collected: list[str] = [] - last_heartbeat = asyncio.get_event_loop().time() - async for chunk in stream_chat(job.message): + if not use_pipeline: + # === EXAONE 단독 모드 (Phase 1 fallback) === + rewrite_model = backend_registry.rewriter.model + await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."}) + + ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected) + if not ok: + return + else: + # === 파이프라인 모드: EXAONE rewrite → Gemma reasoning === + rewrite_model = backend_registry.rewriter.model + reasoning_model = backend_registry.reasoner.model + + # --- Rewrite --- + await state_stream.push(job.id, "processing", {"message": "질문을 재구성하고 있습니다..."}) + rewrite_start = time() + + try: + rewritten_message = await _complete_with_heartbeat( + backend_registry.rewriter, job.message, job.id + ) + rewritten_message = rewritten_message[:MAX_REWRITE_LENGTH] + except Exception: + logger.warning("Rewrite failed for job %s, using original message", job.id) + rewritten_message = job.message + + rewrite_latency = (time() - rewrite_start) * 1000 + job.rewritten_message = rewritten_message + + # --- Rewrite 결과 SSE 노출 --- + await state_stream.push(job.id, "rewrite", {"content": rewritten_message}) + + # --- Cancel 체크 #2 --- if job.status == JobStatus.cancelled: - logger.info("Job %s cancelled during streaming", job.id) - await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."}) - latency_ms = (time() - start_time) * 1000 - try: - await log_completion(job.id, "cancelled", len("".join(collected)), latency_ms, time()) - except Exception: - pass return - collected.append(chunk) + # --- Reasoning --- + await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."}) - # Stream partial result - await state_stream.push(job.id, "result", {"content": chunk}) + try: + ok = await _stream_with_cancel(backend_registry.reasoner, rewritten_message, job, collected) + if not ok: + return + except Exception: + # Gemma streaming 중간 실패 → EXAONE fallback + logger.warning("Reasoner failed for job %s, falling back to rewriter", job.id, exc_info=True) - # Heartbeat: 긴 침묵 방지 - now = asyncio.get_event_loop().time() - if now - last_heartbeat >= HEARTBEAT_INTERVAL: - await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."}) - last_heartbeat = now + if job.status == JobStatus.cancelled: + return + + await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."}) + reasoning_model = rewrite_model # fallback 기록 + + ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected) + if not ok: + return # --- Complete --- if not collected: job_manager.set_status(job.id, JobStatus.failed) - await state_stream.push(job.id, "error", {"message": "EXAONE으로부터 응답을 받지 못했습니다."}) + await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."}) status = "failed" else: job_manager.set_status(job.id, JobStatus.completed) await state_stream.push(job.id, "done", {"message": "완료"}) status = "completed" - # DB 로깅: 완료 기록 + # --- DB 로깅 --- latency_ms = (time() - start_time) * 1000 response_text = "".join(collected) try: - await log_completion(job.id, status, len(response_text), latency_ms, time()) + await log_completion( + job.id, status, len(response_text), latency_ms, time(), + rewrite_model=rewrite_model, + reasoning_model=reasoning_model, + rewritten_message=rewritten_message, + rewrite_latency_ms=rewrite_latency if use_pipeline else 0, + ) except Exception: logger.warning("Failed to log completion for job %s", job.id, exc_info=True)