"""Worker — EXAONE 분류 → direct/route/clarify 분기 (cancel-safe + fallback).""" from __future__ import annotations import asyncio import json import logging from time import time from config import settings from db.database import log_completion, log_request from models.schemas import JobStatus from services.backend_registry import backend_registry from services.conversation import conversation_store from services.job_manager import Job, job_manager from services.state_stream import state_stream from services.synology_sender import send_to_synology logger = logging.getLogger(__name__) HEARTBEAT_INTERVAL = 4.0 CLASSIFY_HEARTBEAT = 2.0 MAX_PROMPT_LENGTH = 1000 SYNOLOGY_MAX_LEN = 4000 async def _complete_with_heartbeat(adapter, message: str, job_id: str, *, messages=None, beat_msg="처리 중...") -> str: """complete_chat + heartbeat 병행.""" result_holder: dict[str, str] = {} exc_holder: list[Exception] = [] async def call(): try: result_holder["text"] = await adapter.complete_chat(message, messages=messages) except Exception as e: exc_holder.append(e) task = asyncio.create_task(call()) while not task.done(): await asyncio.sleep(CLASSIFY_HEARTBEAT) if not task.done(): await state_stream.push(job_id, "processing", {"message": beat_msg}) if exc_holder: raise exc_holder[0] return result_holder.get("text", "") async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str], *, messages=None) -> bool: """스트리밍 + cancel 체크.""" last_heartbeat = asyncio.get_event_loop().time() async for chunk in adapter.stream_chat(message, messages=messages): if job.status == JobStatus.cancelled: return False collected.append(chunk) await state_stream.push(job.id, "result", {"content": chunk}) now = asyncio.get_event_loop().time() if now - last_heartbeat >= HEARTBEAT_INTERVAL: await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."}) last_heartbeat = now return True def _parse_classification(raw: str) -> dict: """EXAONE JSON 응답 파싱. 실패 시 direct fallback.""" raw = raw.strip() # JSON 블록 추출 (```json ... ``` 감싸는 경우 대응) if "```" in raw: start = raw.find("{") end = raw.rfind("}") + 1 if start >= 0 and end > start: raw = raw[start:end] try: result = json.loads(raw) if "action" in result: return result except json.JSONDecodeError: pass # JSON 파싱 실패 → direct로 취급 (raw 텍스트가 직접 응답) return {"action": "direct", "response": raw, "prompt": ""} async def _send_callback(job: Job, text: str) -> None: """Synology callback이면 전송.""" if job.callback == "synology": if len(text) > SYNOLOGY_MAX_LEN: text = text[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)" await send_to_synology(text) async def run(job: Job) -> None: """EXAONE 분류 → direct/route/clarify 분기.""" start_time = time() user_id = job.callback_meta.get("user_id", "api") classify_model = None reasoning_model = None rewritten_message = "" try: await log_request(job.id, job.message, "classify", job.created_at) except Exception: logger.warning("Failed to log request for job %s", job.id, exc_info=True) try: # --- ACK --- await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다."}) job_manager.set_status(job.id, JobStatus.processing) if job.status == JobStatus.cancelled: return classify_model = backend_registry.classifier.model # --- 대화 이력 포함하여 분류 요청 --- history = conversation_store.format_for_prompt(user_id) classify_input = job.message if history: classify_input = f"[대화 이력]\n{history}\n\n[현재 메시지]\n{job.message}" await state_stream.push(job.id, "processing", {"message": "메시지를 분석하고 있습니다..."}) classify_start = time() try: raw_result = await _complete_with_heartbeat( backend_registry.classifier, classify_input, job.id, beat_msg="메시지를 분석하고 있습니다..." ) except Exception: logger.warning("Classification failed for job %s, falling back to direct", job.id) raw_result = "" classify_latency = (time() - classify_start) * 1000 classification = _parse_classification(raw_result) action = classification.get("action", "direct") response_text = classification.get("response", "") route_prompt = classification.get("prompt", "") logger.info("Job %s classified as '%s'", job.id, action) # 대화 기록: 사용자 메시지 conversation_store.add(user_id, "user", job.message) collected: list[str] = [] if job.status == JobStatus.cancelled: return if action == "clarify": # === CLARIFY: 추가 질문 === collected.append(response_text) await state_stream.push(job.id, "result", {"content": response_text}) conversation_store.add(user_id, "assistant", response_text) elif action == "route" and settings.pipeline_enabled and backend_registry.is_healthy("reasoner"): # === ROUTE: Gemma reasoning === reasoning_model = backend_registry.reasoner.model rewritten_message = (route_prompt or job.message)[:MAX_PROMPT_LENGTH] job.rewritten_message = rewritten_message if job.callback != "synology": await state_stream.push(job.id, "rewrite", {"content": rewritten_message}) else: await send_to_synology("📝 더 깊이 살펴볼게요...") if job.status == JobStatus.cancelled: return await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."}) try: ok = await _stream_with_cancel(backend_registry.reasoner, rewritten_message, job, collected) if not ok: return except Exception: logger.warning("Reasoner failed for job %s, falling back to EXAONE", job.id, exc_info=True) if job.status == JobStatus.cancelled: return await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."}) reasoning_model = classify_model ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected) if not ok: return if collected: conversation_store.add(user_id, "assistant", "".join(collected)) else: # === DIRECT: EXAONE 직접 응답 === if response_text: # 분류기가 이미 응답을 생성함 collected.append(response_text) await state_stream.push(job.id, "result", {"content": response_text}) else: # 분류 실패 → EXAONE 스트리밍으로 직접 응답 await state_stream.push(job.id, "processing", {"message": "응답을 생성하고 있습니다..."}) ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected) if not ok: return if collected: conversation_store.add(user_id, "assistant", "".join(collected)) # --- Complete --- if not collected: job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."}) status = "failed" await _send_callback(job, "⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.") else: job_manager.set_status(job.id, JobStatus.completed) await state_stream.push(job.id, "done", {"message": "완료"}) status = "completed" await _send_callback(job, "".join(collected)) # --- DB 로깅 --- latency_ms = (time() - start_time) * 1000 try: await log_completion( job.id, status, len("".join(collected)), latency_ms, time(), rewrite_model=classify_model, reasoning_model=reasoning_model, rewritten_message=rewritten_message, rewrite_latency_ms=classify_latency, ) except Exception: logger.warning("Failed to log completion for job %s", job.id, exc_info=True) except asyncio.CancelledError: job_manager.set_status(job.id, JobStatus.cancelled) await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."}) try: await log_completion(job.id, "cancelled", 0, (time() - start_time) * 1000, time()) except Exception: pass except Exception: logger.exception("Worker failed for job %s", job.id) job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."}) if job.callback == "synology": try: await send_to_synology("⚠️ 처리 중 오류가 발생했습니다. 다시 시도해주세요.") except Exception: pass try: await log_completion(job.id, "failed", 0, (time() - start_time) * 1000, time()) except Exception: pass finally: await state_stream.push_done(job.id)