Files
gpu-services/nanoclaude/services/job_manager.py
Hyungi Ahn c4c32170f1 feat: NanoClaude Phase 2 — EXAONE→Gemma 파이프라인, 큐, 상태 API
- ModelAdapter: 범용 OpenAI-compat 어댑터 (stream/complete/health)
- BackendRegistry: rewriter(EXAONE) + reasoner(Gemma4) 헬스체크 루프
- 2단계 파이프라인: EXAONE rewrite → Gemma reasoning (SSE rewrite 이벤트 노출)
- Fallback: 맥미니 다운 시 EXAONE 단독 모드, stream 중간 실패 시 자동 전환
- Cancel-safe: rewrite 전/후, streaming loop 내, fallback 경로 모두 체크
- Rewrite heartbeat: complete_chat 대기 중 2초 간격 processing 이벤트
- JobQueue: Semaphore(3) 기반 동시성 제한, 정확한 queue position
- GET /chat/{job_id}/status, GET /queue/stats 엔드포인트
- DB: rewrite_model, reasoning_model, rewritten_message 컬럼 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 12:04:15 +09:00

60 lines
1.5 KiB
Python

"""JobManager — job lifecycle: create, track status, cancel."""
from __future__ import annotations
import asyncio
import uuid
from dataclasses import dataclass, field
from time import time
from models.schemas import JobStatus
@dataclass
class Job:
id: str
message: str
status: JobStatus = JobStatus.queued
created_at: float = field(default_factory=time)
task: asyncio.Task | None = field(default=None, repr=False)
pipeline: bool = True
rewritten_message: str = ""
class JobManager:
def __init__(self) -> None:
self._jobs: dict[str, Job] = {}
def create(self, message: str) -> Job:
job_id = uuid.uuid4().hex[:12]
job = Job(id=job_id, message=message)
self._jobs[job_id] = job
return job
def get(self, job_id: str) -> Job | None:
return self._jobs.get(job_id)
def set_status(self, job_id: str, status: JobStatus) -> None:
job = self._jobs.get(job_id)
if job:
job.status = status
def cancel(self, job_id: str) -> bool:
job = self._jobs.get(job_id)
if not job:
return False
if job.status in (JobStatus.completed, JobStatus.failed, JobStatus.cancelled):
return False
job.status = JobStatus.cancelled
if job.task and not job.task.done():
job.task.cancel()
return True
def attach_task(self, job_id: str, task: asyncio.Task) -> None:
job = self._jobs.get(job_id)
if job:
job.task = task
job_manager = JobManager()