feat: NanoClaude Phase 1 — 비동기 job 기반 AI Gateway 코어 구현

POST /chat → job_id ACK, GET /chat/{job_id}/stream → SSE 스트리밍,
EXAONE Ollama adapter, JobManager, StateStream, Worker 구조

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-06 11:12:04 +09:00
parent 4917fd568f
commit d946b769e5
13 changed files with 450 additions and 0 deletions

View File

View File

@@ -0,0 +1,90 @@
"""EXAONE Adapter — Ollama OpenAI-compat endpoint를 통한 EXAONE 호출."""
from __future__ import annotations

import json
import logging
from collections.abc import AsyncGenerator

import httpx

from config import settings
logger = logging.getLogger(__name__)

# System prompt (Korean): frames the model as "NanoClaude", an assistant that
# identifies the user's intent, organizes the phrasing, and produces a clear,
# structured answer.
SYSTEM_PROMPT = (
    "너는 NanoClaude, 사용자의 질문을 이해하고 정리하여 명확한 답변을 제공하는 AI 어시스턴트다. "
    "사용자의 질문 의도를 파악하고, 문장을 정리하며, 구조화된 응답을 생성한다."
)
async def stream_chat(message: str) -> AsyncGenerator[str, None]:
    """Stream a chat completion from EXAONE via the Ollama OpenAI-compat endpoint.

    Sends *message* (together with the fixed system prompt) with ``stream=True``
    and yields each content delta as it arrives.  Transport failures are
    reported as a single ``[Error] ...`` chunk instead of raising, so callers
    can forward the text straight to the client.

    Args:
        message: The user's chat message.

    Yields:
        Content delta strings, or one Korean ``[Error] ...`` string on failure.
    """
    payload = {
        "model": settings.exaone_model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": message},
        ],
        "stream": True,
        "temperature": settings.exaone_temperature,
    }
    async with httpx.AsyncClient(timeout=settings.exaone_timeout) as client:
        try:
            async with client.stream(
                "POST",
                f"{settings.exaone_base_url}/v1/chat/completions",
                json=payload,
            ) as resp:
                if resp.status_code != 200:
                    body = await resp.aread()
                    # errors="replace": a non-UTF-8 error body must not raise
                    # UnicodeDecodeError inside the error handler itself.
                    logger.error(
                        "EXAONE error %d: %s",
                        resp.status_code,
                        body.decode(errors="replace"),
                    )
                    yield f"[Error] EXAONE 응답 실패 ({resp.status_code})"
                    return
                async for line in resp.aiter_lines():
                    line = line.strip()
                    if not line or not line.startswith("data: "):
                        continue
                    payload_str = line[len("data: "):]
                    if payload_str == "[DONE]":
                        return
                    # Extract the content delta from an OpenAI-format chunk.
                    # Malformed chunks are skipped rather than aborting the stream.
                    try:
                        chunk = json.loads(payload_str)
                        delta = chunk.get("choices", [{}])[0].get("delta", {})
                        content = delta.get("content", "")
                        if content:
                            yield content
                    except (json.JSONDecodeError, IndexError, KeyError):
                        continue
        except httpx.ConnectError:
            logger.error("EXAONE connection failed: %s", settings.exaone_base_url)
            yield "[Error] EXAONE 서버에 연결할 수 없습니다."
        except httpx.TimeoutException:
            # Covers read AND connect/pool timeouts (ReadTimeout is a subclass);
            # previously a ConnectTimeout escaped this handler entirely.
            logger.error("EXAONE timeout")
            yield "[Error] EXAONE 응답 시간이 초과되었습니다."
async def complete_chat(message: str) -> str:
    """Ask EXAONE for a full, non-streaming chat completion.

    Builds the same system+user conversation as :func:`stream_chat` but with
    ``stream=False`` and returns the assistant's complete reply text.

    Args:
        message: The user's chat message.

    Returns:
        The assistant message content from the first choice.

    Raises:
        httpx.HTTPStatusError: if the endpoint answers with a non-2xx status.
    """
    request_body = {
        "model": settings.exaone_model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": message},
        ],
        "stream": False,
        "temperature": settings.exaone_temperature,
    }
    endpoint = f"{settings.exaone_base_url}/v1/chat/completions"
    async with httpx.AsyncClient(timeout=settings.exaone_timeout) as http:
        response = await http.post(endpoint, json=request_body)
        response.raise_for_status()
        reply = response.json()
        return reply["choices"][0]["message"]["content"]

View File

@@ -0,0 +1,57 @@
"""JobManager — job lifecycle: create, track status, cancel."""
from __future__ import annotations
import asyncio
import uuid
from dataclasses import dataclass, field
from time import time
from models.schemas import JobStatus
@dataclass
class Job:
    """One chat request tracked through its lifecycle."""

    id: str  # short hex id returned to the client (see JobManager.create)
    message: str  # original user message forwarded to the model
    status: JobStatus = JobStatus.queued  # current lifecycle state; starts queued
    created_at: float = field(default_factory=time)  # epoch seconds at creation
    task: asyncio.Task | None = field(default=None, repr=False)  # driving task, attached later
class JobManager:
    """In-memory registry of :class:`Job` objects keyed by their short id."""

    def __init__(self) -> None:
        self._jobs: dict[str, Job] = {}

    def create(self, message: str) -> Job:
        """Register a new queued job for *message* and return it."""
        new_id = uuid.uuid4().hex[:12]
        new_job = Job(id=new_id, message=message)
        self._jobs[new_id] = new_job
        return new_job

    def get(self, job_id: str) -> Job | None:
        """Look up a job, or None when the id is unknown."""
        return self._jobs.get(job_id)

    def set_status(self, job_id: str, status: JobStatus) -> None:
        """Update a job's status; unknown ids are silently ignored."""
        target = self._jobs.get(job_id)
        if target is not None:
            target.status = status

    def cancel(self, job_id: str) -> bool:
        """Cancel a live job.

        Returns True only when the job exists and was not already in a
        terminal state; also cancels its asyncio task if one is running.
        """
        target = self._jobs.get(job_id)
        if target is None:
            return False
        terminal = (JobStatus.completed, JobStatus.failed, JobStatus.cancelled)
        if target.status in terminal:
            return False
        target.status = JobStatus.cancelled
        running = target.task
        if running is not None and not running.done():
            running.cancel()
        return True

    def attach_task(self, job_id: str, task: asyncio.Task) -> None:
        """Bind the asyncio task that drives this job, if the job exists."""
        found = self._jobs.get(job_id)
        if found is not None:
            found.task = task


# Module-level singleton used by the API layer and worker.
job_manager = JobManager()

View File

@@ -0,0 +1,55 @@
"""StateStream — per-job SSE event queue."""
from __future__ import annotations
import asyncio
import json
import logging
from collections.abc import AsyncGenerator
logger = logging.getLogger(__name__)
class StateStream:
"""Manages per-job asyncio.Queue for SSE events."""
def __init__(self) -> None:
self._queues: dict[str, asyncio.Queue] = {}
def create(self, job_id: str) -> None:
self._queues[job_id] = asyncio.Queue()
async def push(self, job_id: str, event: str, data: dict) -> None:
q = self._queues.get(job_id)
if q:
await q.put((event, data))
async def push_done(self, job_id: str) -> None:
"""Push sentinel to signal stream end."""
q = self._queues.get(job_id)
if q:
await q.put(None)
async def subscribe(self, job_id: str) -> AsyncGenerator[str, None]:
"""Yield SSE-formatted strings until done sentinel."""
q = self._queues.get(job_id)
if not q:
yield _sse("error", {"message": "Job not found"})
return
while True:
item = await q.get()
if item is None:
break
event, data = item
yield _sse(event, data)
def cleanup(self, job_id: str) -> None:
self._queues.pop(job_id, None)
def _sse(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
state_stream = StateStream()

View File

@@ -0,0 +1,65 @@
"""Worker — background task that drives EXAONE call and pushes SSE events."""
from __future__ import annotations
import asyncio
import logging
from models.schemas import JobStatus
from services.exaone_adapter import stream_chat
from services.job_manager import Job, job_manager
from services.state_stream import state_stream
logger = logging.getLogger(__name__)
# Anti-silence measure: emit a "processing" heartbeat roughly every 3-5 seconds.
HEARTBEAT_INTERVAL = 4.0
async def run(job: Job) -> None:
    """Drive one job: call EXAONE, publish SSE events, finalize job status.

    Event sequence: ``ack`` → ``processing`` → ``result`` (one per chunk, with
    periodic ``processing`` heartbeats) → ``done``; any cancellation or failure
    is reported as an ``error`` event.  The stream-end sentinel is always
    pushed in ``finally`` so subscribers are never left blocked.

    Args:
        job: The job to execute; its status is updated via ``job_manager``.
    """
    try:
        # --- ACK ---
        await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."})
        job_manager.set_status(job.id, JobStatus.processing)
        # --- Processing + Streaming ---
        await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."})
        collected: list[str] = []
        # Hoisted: one get_running_loop() (the modern API; get_event_loop()
        # is deprecated from coroutines) instead of a lookup per chunk.
        loop = asyncio.get_running_loop()
        last_heartbeat = loop.time()
        async for chunk in stream_chat(job.message):
            # Cooperative cancellation: JobManager.cancel flips the status.
            if job.status == JobStatus.cancelled:
                logger.info("Job %s cancelled during streaming", job.id)
                await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
                return
            collected.append(chunk)
            # Stream the partial result to the subscriber.
            await state_stream.push(job.id, "result", {"content": chunk})
            # Heartbeat to avoid long silences between visible updates.
            # NOTE(review): this fires only while chunks keep arriving; a
            # stalled stream_chat produces no heartbeat — confirm acceptable.
            now = loop.time()
            if now - last_heartbeat >= HEARTBEAT_INTERVAL:
                await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."})
                last_heartbeat = now
        # --- Complete ---
        if not collected:
            job_manager.set_status(job.id, JobStatus.failed)
            await state_stream.push(job.id, "error", {"message": "EXAONE으로부터 응답을 받지 못했습니다."})
        else:
            job_manager.set_status(job.id, JobStatus.completed)
            await state_stream.push(job.id, "done", {"message": "완료"})
    except asyncio.CancelledError:
        # Task.cancel() landed mid-await: mark cancelled and notify the client.
        # NOTE(review): CancelledError is deliberately swallowed so the error
        # event can still be delivered; re-raising after cleanup would be the
        # stricter asyncio idiom — confirm callers don't await the task result.
        job_manager.set_status(job.id, JobStatus.cancelled)
        await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
    except Exception:
        logger.exception("Worker failed for job %s", job.id)
        job_manager.set_status(job.id, JobStatus.failed)
        await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."})
    finally:
        # Always unblock subscribers, whichever path we took.
        await state_stream.push_done(job.id)