Files
gpu-services/nanoclaude/services/worker.py
Hyungi Ahn e970ebdbea feat: NanoClaude 프로덕션 통합 — Docker, Caddy, aiosqlite 로깅
- docker-compose에 nanoclaude 서비스 추가 (포트 8100)
- Caddy /nano/* → nanoclaude 리버스 프록시 (SSE flush)
- aiosqlite 요청/응답 로깅 (request_logs 테이블)
- .env.example, CLAUDE.md 업데이트

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 11:19:15 +09:00

87 lines
3.3 KiB
Python

"""Worker — background task that drives EXAONE call and pushes SSE events."""
from __future__ import annotations
import asyncio
import logging
from time import time
from config import settings
from db.database import log_completion, log_request
from models.schemas import JobStatus
from services.exaone_adapter import stream_chat
from services.job_manager import Job, job_manager
from services.state_stream import state_stream
logger = logging.getLogger(__name__)
# Minimum number of seconds between "processing" heartbeat events, meant to
# keep the client from seeing a long silence (intended cadence: every 3-5 s).
HEARTBEAT_INTERVAL = 4.0
async def _record_completion(
    job: Job, status: str, response_chars: int, latency_ms: float
) -> None:
    """Best-effort write of a job's terminal outcome to the request log."""
    try:
        await log_completion(job.id, status, response_chars, latency_ms, time())
    except Exception:
        # Logging must never take the worker down; just leave a trace.
        logger.warning("Failed to log completion for job %s", job.id, exc_info=True)


async def run(job: Job) -> None:
    """Drive one EXAONE call for *job* and publish its progress as SSE events.

    Event sequence pushed to ``state_stream``: ``ack`` -> ``processing`` ->
    zero or more ``result`` chunks (interleaved with ``processing`` heartbeats)
    -> ``done`` on success or ``error`` on failure/cancellation. The stream is
    always closed with ``push_done`` and the terminal outcome is always
    written to the request log, whichever path is taken.

    Args:
        job: The job to execute; ``job.id``, ``job.message``, ``job.created_at``
            and ``job.status`` are read.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when the task itself
            is cancelled, so the cancellation is visible to whoever awaits or
            supervises the task.
    """
    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine; its clock is monotonic, so elapsed-time maths is immune to
    # wall-clock adjustments.
    loop = asyncio.get_running_loop()
    started = loop.time()

    collected: list[str] = []
    # Outcome recorded in the request log; pessimistic until proven otherwise.
    # NOTE(review): "cancelled" is a new value for the completion log (only
    # "completed"/"failed" were written before) — confirm consumers accept it.
    status = "failed"

    # Request logging is best-effort: a DB problem must not block the job.
    try:
        await log_request(job.id, job.message, settings.exaone_model, job.created_at)
    except Exception:
        logger.warning("Failed to log request for job %s", job.id, exc_info=True)

    try:
        # --- ACK ---
        await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."})
        job_manager.set_status(job.id, JobStatus.processing)

        # --- Processing + streaming ---
        await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."})
        last_heartbeat = loop.time()

        async for chunk in stream_chat(job.message):
            # Cooperative cancellation: the status flag is flipped elsewhere
            # and checked here before each chunk is forwarded.
            if job.status == JobStatus.cancelled:
                logger.info("Job %s cancelled during streaming", job.id)
                status = "cancelled"
                await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
                return

            collected.append(chunk)
            # Forward the partial result as soon as it arrives.
            await state_stream.push(job.id, "result", {"content": chunk})

            # Heartbeat so long generations do not look stalled.
            # NOTE(review): this is only evaluated when a chunk arrives, so a
            # stall inside stream_chat itself produces no heartbeat.
            now = loop.time()
            if now - last_heartbeat >= HEARTBEAT_INTERVAL:
                await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."})
                last_heartbeat = now

        # --- Complete ---
        if collected:
            job_manager.set_status(job.id, JobStatus.completed)
            await state_stream.push(job.id, "done", {"message": "완료"})
            status = "completed"
        else:
            # The model produced no output at all: treat as a failure.
            job_manager.set_status(job.id, JobStatus.failed)
            await state_stream.push(job.id, "error", {"message": "EXAONE으로부터 응답을 받지 못했습니다."})
            status = "failed"
    except asyncio.CancelledError:
        status = "cancelled"
        job_manager.set_status(job.id, JobStatus.cancelled)
        await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
        # Never swallow cancellation: the task must end up in the cancelled
        # state for timeouts, task groups and shutdown to behave correctly.
        raise
    except Exception:
        status = "failed"
        logger.exception("Worker failed for job %s", job.id)
        job_manager.set_status(job.id, JobStatus.failed)
        await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."})
    finally:
        # Record the outcome on every path (success, failure, cancellation),
        # then close the stream; the nested finally guarantees push_done runs
        # even if the log write is itself interrupted.
        try:
            latency_ms = (loop.time() - started) * 1000
            response_chars = len("".join(collected))
            await _record_completion(job, status, response_chars, latency_ms)
        finally:
            await state_stream.push_done(job.id)