"""Worker — background task that drives EXAONE call and pushes SSE events.""" from __future__ import annotations import asyncio import logging from time import time from config import settings from db.database import log_completion, log_request from models.schemas import JobStatus from services.exaone_adapter import stream_chat from services.job_manager import Job, job_manager from services.state_stream import state_stream logger = logging.getLogger(__name__) # 무응답 방지: 3~5초 간격으로 processing heartbeat HEARTBEAT_INTERVAL = 4.0 async def run(job: Job) -> None: """EXAONE 호출 → SSE 이벤트 발행.""" start_time = time() # DB 로깅: 요청 기록 try: await log_request(job.id, job.message, settings.exaone_model, job.created_at) except Exception: logger.warning("Failed to log request for job %s", job.id, exc_info=True) try: # --- ACK --- await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."}) job_manager.set_status(job.id, JobStatus.processing) # --- Processing + Streaming --- await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."}) collected: list[str] = [] last_heartbeat = asyncio.get_event_loop().time() async for chunk in stream_chat(job.message): if job.status == JobStatus.cancelled: logger.info("Job %s cancelled during streaming", job.id) await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."}) latency_ms = (time() - start_time) * 1000 try: await log_completion(job.id, "cancelled", len("".join(collected)), latency_ms, time()) except Exception: pass return collected.append(chunk) # Stream partial result await state_stream.push(job.id, "result", {"content": chunk}) # Heartbeat: 긴 침묵 방지 now = asyncio.get_event_loop().time() if now - last_heartbeat >= HEARTBEAT_INTERVAL: await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."}) last_heartbeat = now # --- Complete --- if not collected: job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "EXAONE으로부터 응답을 받지 못했습니다."}) status = "failed" else: job_manager.set_status(job.id, JobStatus.completed) await state_stream.push(job.id, "done", {"message": "완료"}) status = "completed" # DB 로깅: 완료 기록 latency_ms = (time() - start_time) * 1000 response_text = "".join(collected) try: await log_completion(job.id, status, len(response_text), latency_ms, time()) except Exception: logger.warning("Failed to log completion for job %s", job.id, exc_info=True) except asyncio.CancelledError: job_manager.set_status(job.id, JobStatus.cancelled) await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."}) except Exception: logger.exception("Worker failed for job %s", job.id) job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."}) finally: await state_stream.push_done(job.id)