"""Worker — 2단계 파이프라인: EXAONE rewrite → Gemma reasoning (cancel-safe + fallback).""" from __future__ import annotations import asyncio import logging from time import time from config import settings from db.database import log_completion, log_request from models.schemas import JobStatus from services.backend_registry import backend_registry from services.job_manager import Job, job_manager from services.state_stream import state_stream from services.synology_sender import send_to_synology logger = logging.getLogger(__name__) HEARTBEAT_INTERVAL = 4.0 REWRITE_HEARTBEAT = 2.0 MAX_REWRITE_LENGTH = 1000 SYNOLOGY_MAX_LEN = 1500 async def _complete_with_heartbeat(adapter, message: str, job_id: str) -> str: """complete_chat + heartbeat 병행. rewrite 대기 중 사용자 체감 멈춤 방지.""" result_holder: dict[str, str] = {} exc_holder: list[Exception] = [] async def call(): try: result_holder["text"] = await adapter.complete_chat(message) except Exception as e: exc_holder.append(e) task = asyncio.create_task(call()) while not task.done(): await asyncio.sleep(REWRITE_HEARTBEAT) if not task.done(): await state_stream.push(job_id, "processing", {"message": "질문을 재구성하고 있습니다..."}) if exc_holder: raise exc_holder[0] return result_holder.get("text", "") async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str]) -> bool: """스트리밍 + cancel 체크. 정상 완료 시 True, cancel 시 False.""" last_heartbeat = asyncio.get_event_loop().time() async for chunk in adapter.stream_chat(message): if job.status == JobStatus.cancelled: return False collected.append(chunk) await state_stream.push(job.id, "result", {"content": chunk}) now = asyncio.get_event_loop().time() if now - last_heartbeat >= HEARTBEAT_INTERVAL: await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."}) last_heartbeat = now return True async def run(job: Job) -> None: """EXAONE rewrite → Gemma reasoning 파이프라인 (fallback + cancel-safe).""" start_time = time() rewrite_model = None reasoning_model = None rewritten_message = "" try: await log_request(job.id, job.message, "pipeline", job.created_at) except Exception: logger.warning("Failed to log request for job %s", job.id, exc_info=True) try: # --- ACK --- await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."}) job_manager.set_status(job.id, JobStatus.processing) # --- Cancel 체크 #1 --- if job.status == JobStatus.cancelled: return use_pipeline = settings.pipeline_enabled and backend_registry.is_healthy("reasoner") collected: list[str] = [] if not use_pipeline: # === EXAONE 단독 모드 (Phase 1 fallback) === rewrite_model = backend_registry.rewriter.model await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."}) ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected) if not ok: return else: # === 파이프라인 모드: EXAONE rewrite → Gemma reasoning === rewrite_model = backend_registry.rewriter.model reasoning_model = backend_registry.reasoner.model # --- Rewrite --- await state_stream.push(job.id, "processing", {"message": "질문을 재구성하고 있습니다..."}) rewrite_start = time() try: rewritten_message = await _complete_with_heartbeat( backend_registry.rewriter, job.message, job.id ) rewritten_message = rewritten_message[:MAX_REWRITE_LENGTH] except Exception: logger.warning("Rewrite failed for job %s, using original message", job.id) rewritten_message = job.message rewrite_latency = (time() - rewrite_start) * 1000 job.rewritten_message = rewritten_message # --- Rewrite 결과 SSE 노출 (Synology에서는 숨김) --- if job.callback != "synology": await state_stream.push(job.id, "rewrite", {"content": rewritten_message}) # --- Cancel 체크 #2 --- if job.status == JobStatus.cancelled: return # --- Reasoning --- await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."}) try: ok = await _stream_with_cancel(backend_registry.reasoner, rewritten_message, job, collected) if not ok: return except Exception: # Gemma streaming 중간 실패 → EXAONE fallback logger.warning("Reasoner failed for job %s, falling back to rewriter", job.id, exc_info=True) if job.status == JobStatus.cancelled: return await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."}) reasoning_model = rewrite_model # fallback 기록 ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected) if not ok: return # --- Complete --- if not collected: job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."}) status = "failed" if job.callback == "synology": await send_to_synology("⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.") else: job_manager.set_status(job.id, JobStatus.completed) await state_stream.push(job.id, "done", {"message": "완료"}) status = "completed" # Synology callback: 결과 전송 if job.callback == "synology": full_response = "".join(collected) if len(full_response) > SYNOLOGY_MAX_LEN: full_response = full_response[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)" await send_to_synology(full_response) # --- DB 로깅 --- latency_ms = (time() - start_time) * 1000 response_text = "".join(collected) try: await log_completion( job.id, status, len(response_text), latency_ms, time(), rewrite_model=rewrite_model, reasoning_model=reasoning_model, rewritten_message=rewritten_message, rewrite_latency_ms=rewrite_latency if use_pipeline else 0, ) except Exception: logger.warning("Failed to log completion for job %s", job.id, exc_info=True) except asyncio.CancelledError: job_manager.set_status(job.id, JobStatus.cancelled) await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."}) try: await log_completion(job.id, "cancelled", 0, (time() - start_time) * 1000, time()) except Exception: pass except Exception: logger.exception("Worker failed for job %s", job.id) job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."}) if job.callback == "synology": try: await send_to_synology("⚠️ 처리 중 오류가 발생했습니다. 다시 시도해주세요.") except Exception: pass try: await log_completion(job.id, "failed", 0, (time() - start_time) * 1000, time()) except Exception: pass finally: await state_stream.push_done(job.id)