From d946b769e506aac9af543a95a3be5a61eb7f92ae Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Mon, 6 Apr 2026 11:12:04 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20NanoClaude=20Phase=201=20=E2=80=94=20?= =?UTF-8?q?=EB=B9=84=EB=8F=99=EA=B8=B0=20job=20=EA=B8=B0=EB=B0=98=20AI=20G?= =?UTF-8?q?ateway=20=EC=BD=94=EC=96=B4=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /chat → job_id ACK, GET /chat/{job_id}/stream → SSE 스트리밍, EXAONE Ollama adapter, JobManager, StateStream, Worker 구조 Co-Authored-By: Claude Opus 4.6 (1M context) --- nanoclaude/Dockerfile | 12 ++++ nanoclaude/config.py | 21 +++++++ nanoclaude/main.py | 53 ++++++++++++++++ nanoclaude/models/__init__.py | 0 nanoclaude/models/schemas.py | 32 ++++++++++ nanoclaude/requirements.txt | 4 ++ nanoclaude/routers/__init__.py | 0 nanoclaude/routers/chat.py | 61 ++++++++++++++++++ nanoclaude/services/__init__.py | 0 nanoclaude/services/exaone_adapter.py | 90 +++++++++++++++++++++++++++ nanoclaude/services/job_manager.py | 57 +++++++++++++++++ nanoclaude/services/state_stream.py | 55 ++++++++++++++++ nanoclaude/services/worker.py | 65 +++++++++++++++++++ 13 files changed, 450 insertions(+) create mode 100644 nanoclaude/Dockerfile create mode 100644 nanoclaude/config.py create mode 100644 nanoclaude/main.py create mode 100644 nanoclaude/models/__init__.py create mode 100644 nanoclaude/models/schemas.py create mode 100644 nanoclaude/requirements.txt create mode 100644 nanoclaude/routers/__init__.py create mode 100644 nanoclaude/routers/chat.py create mode 100644 nanoclaude/services/__init__.py create mode 100644 nanoclaude/services/exaone_adapter.py create mode 100644 nanoclaude/services/job_manager.py create mode 100644 nanoclaude/services/state_stream.py create mode 100644 nanoclaude/services/worker.py diff --git a/nanoclaude/Dockerfile b/nanoclaude/Dockerfile new file mode 100644 index 0000000..5e3216e --- /dev/null +++ b/nanoclaude/Dockerfile @@ -0,0 
@app.middleware("http")
async def check_api_key(request: Request, call_next):
    """Enforce the optional bearer-token check on incoming requests.

    Authentication is disabled when ``settings.api_key`` is empty. Otherwise
    the ``Authorization`` header must equal ``Bearer <api_key>``, except for:

    * ``/`` and ``/health``, which stay open so probes work without a key;
    * ``OPTIONS`` requests. Browsers send CORS preflights without
      credentials, and this middleware is registered after ``CORSMiddleware``
      and therefore wraps it, so rejecting ``OPTIONS`` here would keep every
      preflight from reaching the CORS layer.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The downstream response, or a 401 JSON response
        (``{"detail": "Invalid API key"}``) when the token does not match.
    """
    import secrets  # function-scope import; only this check needs it

    if not settings.api_key:
        return await call_next(request)

    if request.method == "OPTIONS" or request.url.path in ("/", "/health"):
        return await call_next(request)

    supplied = request.headers.get("Authorization", "")
    expected = f"Bearer {settings.api_key}"
    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and a
    # constant-time compare avoids leaking how much of the token matched.
    if not secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        return JSONResponse(status_code=401, content={"detail": "Invalid API key"})

    return await call_next(request)
@router.post("/chat", response_model=ChatResponse)
async def create_chat(body: ChatRequest):
    """Register a job, start its worker in the background and return the job id."""
    new_job = job_manager.create(body.message)
    state_stream.create(new_job.id)

    # The task handle is stored on the job so that cancel_chat can stop it.
    worker_task = asyncio.create_task(worker.run(new_job))
    job_manager.attach_task(new_job.id, worker_task)

    return ChatResponse(job_id=new_job.id)


@router.get("/chat/{job_id}/stream")
async def stream_chat(job_id: str):
    """Stream the job's status and result events as ``text/event-stream``.

    Raises:
        HTTPException: 404 when ``job_id`` is unknown to the job manager.
    """
    if job_manager.get(job_id) is None:
        raise HTTPException(status_code=404, detail="Job not found")

    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        _stream_with_cleanup(job_id),
        media_type="text/event-stream",
        headers=sse_headers,
    )


async def _stream_with_cleanup(job_id: str):
    """Relay the job's SSE frames, dropping its queue once the stream ends."""
    frames = state_stream.subscribe(job_id)
    try:
        async for frame in frames:
            yield frame
    finally:
        state_stream.cleanup(job_id)


@router.post("/chat/{job_id}/cancel", response_model=CancelResponse)
async def cancel_chat(job_id: str):
    """Cancel a job that is still in progress.

    Raises:
        HTTPException: 404 when the job manager reports nothing to cancel.
    """
    if job_manager.cancel(job_id):
        return CancelResponse(status="cancelled")
    raise HTTPException(status_code=404, detail="Job not found or already finished")
async def stream_chat(message: str) -> AsyncGenerator[str, None]:
    """Stream a chat completion from EXAONE and yield its text deltas.

    Posts ``message`` (preceded by ``SYSTEM_PROMPT``) to the OpenAI-compatible
    ``/v1/chat/completions`` endpoint under ``settings.exaone_base_url`` with
    ``stream=True`` and parses the server-sent ``data:`` lines.

    Args:
        message: The user message to send.

    Yields:
        Each non-empty ``choices[0].delta.content`` value, in arrival order.
        Failures are reported in-band: on a non-200 status, a connection
        error or a read timeout, one string starting with ``[Error]`` is
        yielded and the stream ends.
    """
    import json  # function-scope import, executed once rather than per SSE line

    request_body = {
        "model": settings.exaone_model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": message},
        ],
        "stream": True,
        "temperature": settings.exaone_temperature,
    }
    data_prefix = "data:"  # the space after the colon is optional in SSE

    async with httpx.AsyncClient(timeout=settings.exaone_timeout) as client:
        try:
            async with client.stream(
                "POST",
                f"{settings.exaone_base_url}/v1/chat/completions",
                json=request_body,
            ) as resp:
                if resp.status_code != 200:
                    body = await resp.aread()
                    # errors="replace": a non-UTF-8 error body must not turn
                    # the failure report itself into an exception.
                    logger.error(
                        "EXAONE error %d: %s",
                        resp.status_code,
                        body.decode(errors="replace"),
                    )
                    yield f"[Error] EXAONE 응답 실패 ({resp.status_code})"
                    return

                async for line in resp.aiter_lines():
                    line = line.strip()
                    if not line.startswith(data_prefix):
                        continue
                    payload_str = line[len(data_prefix):].strip()
                    if payload_str == "[DONE]":
                        return

                    # Skip anything that is not a well-formed OpenAI-style
                    # chunk (bad JSON, non-object JSON, missing/empty choices,
                    # null delta) instead of aborting the whole stream.
                    try:
                        delta = json.loads(payload_str)["choices"][0]["delta"]
                        content = delta.get("content")
                    except (json.JSONDecodeError, LookupError, TypeError, AttributeError):
                        continue

                    # Yield outside the try so exceptions thrown into the
                    # generator at this point are never swallowed above.
                    if content:
                        yield content

        except httpx.ConnectError:
            logger.error("EXAONE connection failed: %s", settings.exaone_base_url)
            yield "[Error] EXAONE 서버에 연결할 수 없습니다."
        except httpx.ReadTimeout:
            logger.error("EXAONE read timeout")
            yield "[Error] EXAONE 응답 시간이 초과되었습니다."
@dataclass
class Job:
    """One chat request together with its lifecycle state."""

    # Identifier of the job; JobManager.create assigns the first 12 hex
    # characters of a uuid4.
    id: str
    # The message text the job was created with.
    message: str
    # Lifecycle state; a new job starts as queued.
    status: JobStatus = JobStatus.queued
    # Result of time.time() taken when the instance is constructed.
    created_at: float = field(default_factory=time)
    # Background asyncio task for this job, or None until one is attached;
    # left out of the generated repr.
    task: asyncio.Task | None = field(default=None, repr=False)
class StateStream:
    """Registry holding one ``asyncio.Queue`` of SSE events per job id."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = {}

    def create(self, job_id: str) -> None:
        """Register a fresh, empty queue under ``job_id``."""
        self._queues[job_id] = asyncio.Queue()

    async def push(self, job_id: str, event: str, data: dict) -> None:
        """Enqueue ``(event, data)`` for the job; do nothing if it has no queue."""
        queue = self._queues.get(job_id)
        if queue is None:
            return
        await queue.put((event, data))

    async def push_done(self, job_id: str) -> None:
        """Enqueue the ``None`` sentinel that tells ``subscribe`` to stop."""
        queue = self._queues.get(job_id)
        if queue is None:
            return
        await queue.put(None)

    async def subscribe(self, job_id: str) -> AsyncGenerator[str, None]:
        """Yield SSE-formatted frames for the job until the sentinel arrives.

        For an unknown ``job_id`` a single ``error`` frame is yielded instead.
        """
        queue = self._queues.get(job_id)
        if queue is None:
            yield _sse("error", {"message": "Job not found"})
            return

        while (item := await queue.get()) is not None:
            name, payload = item
            yield _sse(name, payload)

    def cleanup(self, job_id: str) -> None:
        """Forget the job's queue; unknown ids are ignored."""
        self._queues.pop(job_id, None)


def _sse(event: str, data: dict) -> str:
    """Render one server-sent event frame with a JSON ``data`` line."""
    encoded = json.dumps(data, ensure_ascii=False)
    return "event: {}\ndata: {}\n\n".format(event, encoded)
async def _emit_heartbeats(job_id: str) -> None:
    """Push a ``processing`` event every ``HEARTBEAT_INTERVAL`` seconds until cancelled."""
    while True:
        await asyncio.sleep(HEARTBEAT_INTERVAL)
        await state_stream.push(job_id, "processing", {"message": "응답 생성 중..."})


async def run(job: Job) -> None:
    """Drive one job: call EXAONE and publish its progress as SSE events.

    Event order on the job's stream: ``ack``, ``processing``, zero or more
    ``result`` events (one per streamed chunk) interleaved with periodic
    ``processing`` heartbeats, then ``done`` or ``error``. The end-of-stream
    sentinel is always pushed last, whatever the outcome.

    The heartbeat runs as its own task so that it also fires while the model
    is silent; checking the clock only when a chunk arrives cannot do that.

    Args:
        job: The job to process; its status is updated through ``job_manager``.

    Raises:
        asyncio.CancelledError: Re-raised after the cancellation has been
            recorded and reported, so the task ends in the cancelled state.
    """
    try:
        # --- ACK ---
        await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."})
        job_manager.set_status(job.id, JobStatus.processing)

        # --- Processing + streaming ---
        await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."})

        collected: list[str] = []
        heartbeat = asyncio.create_task(_emit_heartbeats(job.id))
        try:
            async for chunk in stream_chat(job.message):
                if job.status == JobStatus.cancelled:
                    logger.info("Job %s cancelled during streaming", job.id)
                    await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
                    return

                collected.append(chunk)
                # Forward the partial result as soon as it arrives.
                await state_stream.push(job.id, "result", {"content": chunk})
        finally:
            # Stop the heartbeat before any terminal event or the sentinel.
            heartbeat.cancel()

        # --- Complete ---
        if not collected:
            job_manager.set_status(job.id, JobStatus.failed)
            await state_stream.push(job.id, "error", {"message": "EXAONE으로부터 응답을 받지 못했습니다."})
        else:
            job_manager.set_status(job.id, JobStatus.completed)
            await state_stream.push(job.id, "done", {"message": "완료"})

    except asyncio.CancelledError:
        job_manager.set_status(job.id, JobStatus.cancelled)
        await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
        raise  # propagate cancellation once cleanup is done
    except Exception:
        logger.exception("Worker failed for job %s", job.id)
        job_manager.set_status(job.id, JobStatus.failed)
        await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."})
    finally:
        await state_stream.push_done(job.id)