"""Worker — EXAONE 분류 → direct/route/clarify/tools 분기 (cancel-safe + fallback).""" from __future__ import annotations import asyncio import json import logging from time import time from config import settings from db.database import log_completion, log_request from models.schemas import JobStatus from services.backend_registry import backend_registry from services.classifier_io import looks_like_router_json, parse_classification from services.conversation import conversation_store from services.job_manager import Job, job_manager from services.state_stream import state_stream from services.synology_sender import send_to_synology from tools.registry import execute_tool logger = logging.getLogger(__name__) HEARTBEAT_INTERVAL = 4.0 CLASSIFY_HEARTBEAT = 2.0 MAX_PROMPT_LENGTH = 1000 SYNOLOGY_MAX_LEN = 4000 MAX_TOOL_PAYLOAD = 2000 TOOL_TIMEOUT = 10.0 DOCUMENT_ASK_TIMEOUT = 35.0 async def _complete_with_heartbeat(adapter, message: str, job_id: str, *, messages=None, beat_msg="처리 중...") -> str: """complete_chat + heartbeat 병행.""" result_holder: dict[str, str] = {} exc_holder: list[Exception] = [] async def call(): try: result_holder["text"] = await adapter.complete_chat(message, messages=messages) except Exception as e: exc_holder.append(e) task = asyncio.create_task(call()) while not task.done(): await asyncio.sleep(CLASSIFY_HEARTBEAT) if not task.done(): await state_stream.push(job_id, "processing", {"message": beat_msg}) if exc_holder: raise exc_holder[0] return result_holder.get("text", "") async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str], *, messages=None) -> bool: """스트리밍 + cancel 체크.""" last_heartbeat = asyncio.get_event_loop().time() async for chunk in adapter.stream_chat(message, messages=messages): if job.status == JobStatus.cancelled: return False collected.append(chunk) await state_stream.push(job.id, "result", {"content": chunk}) now = asyncio.get_event_loop().time() if now - last_heartbeat >= HEARTBEAT_INTERVAL: await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."}) last_heartbeat = now return True async def _fetch_fallback_text(job: Job) -> str: """Generate a safe natural-language reply when classification fails. Uses ``backend_registry.chat_fallback`` (same physical model as the classifier, but with a chat-shaped system prompt). Never reuses ``backend_registry.classifier`` — that adapter has CLASSIFIER_PROMPT attached, so it would emit router JSON straight to the user. Returns "" when the fallback adapter is unavailable, the call fails, or the output looks like router JSON. Callers treat empty as "send the safe error message instead". """ adapter = backend_registry.chat_fallback if adapter is None: logger.warning("chat_fallback adapter not initialized for job %s", job.id) return "" await state_stream.push( job.id, "processing", {"message": "응답을 생성하고 있습니다..."} ) try: text = await _complete_with_heartbeat( adapter, job.message, job.id, beat_msg="응답을 생성하고 있습니다...", ) except Exception: logger.warning("Chat fallback failed for job %s", job.id, exc_info=True) return "" if looks_like_router_json(text): logger.warning( "Router-like JSON detected in fallback output for job %s, suppressing", job.id, ) return "" return text.strip() async def _build_system_status(force_measure: bool = True) -> str: """시스템 상태 메시지 생성. force_measure=True면 inference latency 1회 측정.""" from services import job_queue as jq_module health = backend_registry.health_summary() lines = ["🤖 시스템 상태:", ""] for role in ("classifier", "reasoner"): info = health.get(role) if not info: continue load = await backend_registry.get_load_status(role, force_measure=force_measure) connected = "정상" if info["healthy"] else "연결 안 됨" load_str = load["load"] source = load.get("source", "hybrid") line1 = f"{info['name']}: {connected} — {load_str}" lines.append(line1) if source == "status_api": # /status API 직접 조회 — 정확한 active_jobs active = load.get("active_jobs", 0) total = load.get("total_requests", 0) lines.append(f" active_jobs: {active} (total: {total}) [status API]") lines.append(f" health {load['health_ms']:.0f}ms") else: # hybrid fallback (health baseline + 조건부 ping) baseline = load["health_baseline_ms"] ratio = (load["health_ms"] / baseline) if baseline > 0 else 1.0 lines.append(f" health {load['health_ms']:.0f}ms (baseline {baseline:.0f}ms, {ratio:.1f}배)") if load["measured"]: lines.append(f" ping {load['inference_ms']:.0f}ms") else: lines.append(" ping: 측정 안 함") lines.append("") queue = jq_module.job_queue.stats if jq_module.job_queue else {"pending": 0, "active": 0} lines.append(f"내 작업 큐: 처리 중 {queue.get('active', 0)}건, 대기 {queue.get('pending', 0)}건") return "\n".join(lines) async def _send_callback(job: Job, text: str) -> None: """Synology callback이면 전송 + response_sent 플래그.""" if job.callback == "synology": if len(text) > SYNOLOGY_MAX_LEN: text = text[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)" await send_to_synology(text) job.response_sent = True def _pre_route(message: str) -> dict | None: """키워드 기반 사전 라우팅. EXAONE 7.8B 분류기 보완.""" from datetime import datetime, timedelta msg = message.lower().strip() now = datetime.now() # 캘린더 키워드 cal_keywords = ["일정", "캘린더", "스케줄", "약속", "미팅", "회의"] if any(k in msg for k in cal_keywords): # 생성 요청 if any(k in msg for k in ["잡아", "만들", "등록", "추가", "넣어"]): return None # EXAONE이 파라미터 추출해야 함 # 오늘 if "오늘" in msg: return {"action": "tools", "tool": "calendar", "operation": "today", "params": {}} # 내일 if "내일" in msg: tmr = (now + timedelta(days=1)).strftime("%Y-%m-%d") return {"action": "tools", "tool": "calendar", "operation": "search", "params": {"date_from": tmr, "date_to": tmr}} # 이번주 if "이번" in msg and ("주" in msg or "week" in msg): monday = now - timedelta(days=now.weekday()) sunday = monday + timedelta(days=6) return {"action": "tools", "tool": "calendar", "operation": "search", "params": {"date_from": monday.strftime("%Y-%m-%d"), "date_to": sunday.strftime("%Y-%m-%d")}} # 기본: 오늘 return {"action": "tools", "tool": "calendar", "operation": "today", "params": {}} # 메일 키워드 if any(k in msg for k in ["메일", "이메일", "mail", "편지"]): query = "" days = 7 folder = "" if "오늘" in msg or "4월 6일" in msg or "6일" in msg: days = 1 # 폴더 필터링 if any(k in msg for k in ["테크니컬", "회사", "technicalkorea", "네이버웍스"]): folder = "Technicalkorea" elif any(k in msg for k in ["gmail", "구글", "지메일"]): folder = "Gmail" elif any(k in msg for k in ["개인", "inbox"]): folder = "INBOX" return {"action": "tools", "tool": "email", "operation": "search", "params": {"query": query, "days": days, "folder": folder}} # Tier 2: 특정 문서 ID 명시 + 분석 키워드 → 전문 분석 import re doc_id_match = re.search(r'(\d{3,6})\s*번', msg) if not doc_id_match: doc_id_match = re.search(r'#(\d{3,6})', msg) analyze_signals = ["전체", "요약", "분석", "정리", "읽어", "전문", "자세히"] if doc_id_match and any(s in msg for s in analyze_signals): # query에서 doc_id 참조 제거 + 구체적 질문 없으면 기본 분석 요청 clean_query = re.sub(r'#?\d{3,6}\s*번?\s*', '', msg).strip() clean_query = re.sub(r'^(문서|자료|파일)\s*', '', clean_query).strip() clean_query = re.sub(r'\s*(해줘|해주세요|줘|좀)\s*$', '', clean_query).strip() if not clean_query or clean_query in ["분석", "요약", "정리", "전체", "읽어", "전문", "자세히"]: clean_query = "이 문서의 핵심 내용을 분석하고, 주요 사항과 실무상 중요한 포인트를 정리해주세요." return {"action": "tools", "tool": "document", "operation": "analyze", "params": {"doc_id": doc_id_match.group(1), "query": clean_query}} # 문서 키워드 — 질문형/탐색형 점수 기반 분기 doc_entry = any(k in msg for k in ["문서", "도큐먼트", "자료", "파일"]) doc_action = any(k in msg for k in ["찾아", "검색", "확인", "알려", "설명", "뭐야"]) # 도메인 키워드 — "문서 찾아줘" 없이도 전문 분야 질문이면 자동 라우팅 domain_keywords = [ "산업안전", "산안법", "안전보건", "위험성평가", "중대재해", "asme", "압력용기", "배관", "용접", "기계안전", "화학물질", "유해물질", "msds", "ghs", "법령", "시행령", "시행규칙", "고시", "규정", "산업안전기사", "건설안전", "전기안전", ] domain_hit = any(k in msg.lower() for k in domain_keywords) if (doc_entry and doc_action) or domain_hit: query = msg # 명시적 문서 키워드가 있으면 제거 if doc_entry: for rm in ["문서", "도큐먼트", "자료", "파일", "찾아줘", "찾아", "검색", "확인", "해줘", "줘", "좀"]: query = query.replace(rm, "") query = query.strip() if query: # 명시적 목록 요청만 search_full, 나머지는 ask (근거 중심 답변 기본값) list_signals = ["목록", "리스트", "검색 결과", "몇 개", "list", "제목만", "문서만"] list_request = any(s in msg for s in list_signals) operation = "search_full" if list_request else "ask" return {"action": "tools", "tool": "document", "operation": operation, "params": {"query": query}} # 인프라 도구 키워드 infra_keywords = ["docker", "컨테이너", "디스크", "용량", "헬스체크", "tailscale", "ollama 모델", "mlx 모델", "스케줄러", "scheduler", "큐 상태", "queue", "처리 큐", "verify", "검증"] if any(k in msg for k in infra_keywords): # docker/컨테이너 상태 if any(k in msg for k in ["docker", "컨테이너"]): host = "nas-company" if any(k in msg for k in ["nas", "회사"]) else "gpu" return {"action": "tools", "tool": "infra", "operation": "status", "params": {"host": host}} # 디스크 if any(k in msg for k in ["디스크", "용량"]): host = "" if "gpu" in msg: host = "gpu" elif any(k in msg for k in ["맥미니", "macmini", "mac mini"]): host = "macmini" return {"action": "tools", "tool": "infra", "operation": "disk", "params": {"host": host}} # 헬스체크 if "헬스" in msg or "health" in msg: return {"action": "tools", "tool": "infra", "operation": "health", "params": {}} # tailscale if "tailscale" in msg or "네트워크" in msg: return {"action": "tools", "tool": "infra", "operation": "network", "params": {}} # 모델 목록 if any(k in msg for k in ["ollama 모델", "mlx 모델"]): host = "mlx" if "mlx" in msg else "gpu" return {"action": "tools", "tool": "infra", "operation": "models", "params": {"host": host}} # 스케줄러 if any(k in msg for k in ["스케줄러", "scheduler"]): return {"action": "tools", "tool": "infra", "operation": "scheduler", "params": {}} # 큐 if any(k in msg for k in ["큐 상태", "queue", "처리 큐"]): return {"action": "tools", "tool": "infra", "operation": "queue", "params": {}} # 검증 if any(k in msg for k in ["verify", "검증"]): return {"action": "tools", "tool": "infra", "operation": "verify", "params": {"check_name": "gpu-snapshot"}} # 시스템 상태 질문 — 마커만 반환 (worker에서 비동기 조회) if any(k in msg for k in ["추론 모델", "gemma", "젬마", "서버 상태", "시스템 상태"]) or \ (any(k in msg for k in ["상태", "젬마", "gemma"]) and any(k in msg for k in ["돌아", "일하", "작동", "상태", "건강", "살아", "대기", "큐", "밀려", "바빠", "느려", "많"])): return {"action": "system_status", "response": "", "prompt": ""} # pending_draft 확인 응답 if msg in ("확인", "예", "yes", "ㅇㅇ", "응", "네", "좋아", "ok"): return {"action": "tools", "tool": "calendar", "operation": "create_confirmed", "params": {}} if msg in ("취소", "아니", "no", "ㄴㄴ"): return {"action": "direct", "response": "알겠어, 취소했어!", "prompt": ""} return None async def run(job: Job) -> None: """사전 라우팅 → EXAONE 분류 → direct/route/clarify/tools 분기.""" start_time = time() user_id = job.callback_meta.get("user_id", "api") classify_model = None reasoning_model = None rewritten_message = "" try: await log_request(job.id, job.message, "classify", job.created_at) except Exception: logger.warning("Failed to log request for job %s", job.id, exc_info=True) try: # --- ACK --- await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다."}) job_manager.set_status(job.id, JobStatus.processing) if job.status == JobStatus.cancelled: return classify_model = backend_registry.classifier.model # --- 사전 라우팅 (키워드 기반, EXAONE 스킵) --- pre = _pre_route(job.message) classify_latency = 0 if pre: classification = pre logger.info("Job %s pre-routed: %s.%s", job.id, pre.get("tool", ""), pre.get("operation", pre.get("action", ""))) else: # --- EXAONE 분류기 호출 --- from datetime import datetime, timezone, timedelta kst = timezone(timedelta(hours=9)) now_kst = datetime.now(kst) now_str = now_kst.strftime("%Y년 %m월 %d일 %H:%M (%A) KST") history = await conversation_store.format_for_prompt(user_id) classify_input = f"[현재 시간]\n{now_str}\n\n" if history: classify_input += f"[대화 이력]\n{history}\n\n" classify_input += f"[현재 메시지]\n{job.message}" await state_stream.push(job.id, "processing", {"message": "메시지를 분석하고 있습니다..."}) classify_start = time() try: raw_result = await _complete_with_heartbeat( backend_registry.classifier, classify_input, job.id, beat_msg="메시지를 분석하고 있습니다..." ) except Exception: logger.warning("Classification failed for job %s, falling back to direct", job.id) raw_result = "" classify_latency = (time() - classify_start) * 1000 classification = parse_classification(raw_result) action = classification.get("action", "classification_failed") response_text = classification.get("response", "") route_prompt = classification.get("prompt", "") logger.info("Job %s classified as '%s'", job.id, action) # 대화 기록: 사용자 메시지 await conversation_store.add(user_id, "user", job.message) collected: list[str] = [] # True iff we entered the chat-fallback branch (classification failed, # or a routed action could not run). Drives the safe error message at # the Complete gate. fallback_path = False if job.status == JobStatus.cancelled: return if action == "system_status": # === SYSTEM STATUS: 부하 상태 조회 (사용자 명시 요청 → ping 측정) === await state_stream.push(job.id, "processing", {"message": "시스템 상태 확인 중..."}) status_text = await _build_system_status(force_measure=True) collected.append(status_text) await state_stream.push(job.id, "result", {"content": status_text}) await conversation_store.add(user_id, "assistant", status_text) elif action == "tools": # === TOOLS: 도구 실행 === tool_name = classification.get("tool", "") operation = classification.get("operation", "") params = classification.get("params", {}) logger.info("Job %s tool call: %s.%s(%s)", job.id, tool_name, operation, params) await state_stream.push(job.id, "processing", {"message": f"🔧 {tool_name} 도구를 사용하고 있습니다..."}) # create_confirmed → pending_draft에서 데이터 가져오기 if operation == "create_confirmed": draft = conversation_store.get_pending_draft(user_id) if not draft: response = "확인할 일정이 없습니다. 다시 요청해주세요." collected.append(response) await state_stream.push(job.id, "result", {"content": response}) await conversation_store.add(user_id, "assistant", response) else: try: result = await asyncio.wait_for(execute_tool(tool_name, operation, draft), timeout=TOOL_TIMEOUT) except asyncio.TimeoutError: result = {"ok": False, "tool": tool_name, "operation": operation, "data": [], "summary": "", "error": "⚠️ 서비스 응답 시간이 초과되었습니다."} conversation_store.clear_pending_draft(user_id) response = result.get("summary", "") if result["ok"] else result.get("error", "⚠️ 오류") collected.append(response) await state_stream.push(job.id, "result", {"content": response}) await conversation_store.add(user_id, "assistant", response) else: # 문서 도구 호출 시 안내 문구 if tool_name == "document" and job.callback == "synology": notice = "서고를 확인하는 중입니다..." await send_to_synology(notice, raw=True) # 일반 도구 실행 (document.ask는 긴 timeout + 중간 안내) timeout = DOCUMENT_ASK_TIMEOUT if (tool_name == "document" and operation == "ask") else TOOL_TIMEOUT doc_start = time() if tool_name == "document": logger.info("Job %s document.%s domain_hit=%s list_request=%s query=%s", job.id, operation, any(k in job.message.lower() for k in ["산업안전", "위험성평가", "asme", "법령"]), any(s in job.message.lower() for s in ["목록", "리스트", "제목만"]), params.get("query", "")[:50]) # document.ask는 Gemma 분석으로 오래 걸림 → 10초 후 중간 안내 if tool_name == "document" and operation == "ask" and job.callback == "synology": tool_task = asyncio.create_task( asyncio.wait_for(execute_tool(tool_name, operation, params), timeout=timeout) ) analyze_notice_sent = False while not tool_task.done(): await asyncio.sleep(2) if not tool_task.done() and not analyze_notice_sent and (time() - doc_start) >= 10: await send_to_synology("자료를 분석하고 있습니다...", raw=True) analyze_notice_sent = True try: result = tool_task.result() except asyncio.TimeoutError: result = {"ok": False, "tool": tool_name, "operation": operation, "data": [], "summary": "", "error": "⚠️ 서비스 응답 시간이 초과되었습니다."} else: try: result = await asyncio.wait_for(execute_tool(tool_name, operation, params), timeout=timeout) except asyncio.TimeoutError: result = {"ok": False, "tool": tool_name, "operation": operation, "data": [], "summary": "", "error": "⚠️ 서비스 응답 시간이 초과되었습니다."} if tool_name == "document": logger.info("Job %s document.%s ok=%s elapsed=%.1fs", job.id, operation, result.get("ok"), time() - doc_start) if not result["ok"]: response = result.get("error", "⚠️ 서비스를 사용할 수 없습니다.") collected.append(response) await state_stream.push(job.id, "result", {"content": response}) else: # create_draft → pending에 저장 + 확인 요청 if operation == "create_draft": conversation_store.set_pending_draft(user_id, result["data"]) response = result["summary"] + "\n\n'확인' 또는 '취소'로 답해주세요." collected.append(response) await state_stream.push(job.id, "result", {"content": response}) elif result.get("render_mode") == "final": # 이미 포맷된 응답 (document.ask 등) — EXAONE 재포맷 스킵 response = result.get("rendered_text", result.get("summary", "결과를 조회했습니다.")) collected.append(response) await state_stream.push(job.id, "result", {"content": response}) elif result.get("render_mode") == "analyze": # Tier 2: 문서 전문 → Gemma 분석 doc_data = result["data"] doc_start_analyze = time() logger.info("Job %s document.analyze doc_id=%s query=%s", job.id, doc_data.get("doc_id"), doc_data.get("query", "")[:80]) summary_text = doc_data.get("ai_summary") or "(요약 없음)" analyze_messages = [ {"role": "system", "content": ( "너는 산업안전 문서 분석 전문가야. " "아래 문서를 분석하여 질문에 답해. " "규칙: " "1) 문서에 있는 내용만 근거로 삼아라. " "2) 문서에 없는 내용은 추정하지 말고 '문서에 명시되지 않음'이라고 답해라. " "3) 답변은 다음 구조로: " " [근거] 법령/기준 인용 (있으면) " " [해설] 실무 적용 방법 " " [사례] 유사 사고/재해 사례 (문서에 있으면) " " [요약] 왜 중요한지 한 줄 " "4) 해당 층이 문서에 없으면 그 섹션은 생략해라. " "5) 순수 텍스트만 (마크다운/코드블록 금지)." )}, {"role": "user", "content": ( f"[문서: {doc_data['title']}]\n" f"유형: {doc_data.get('document_type', '미분류')}\n" f"요약: {summary_text}\n\n" f"{doc_data['content'][:12000]}\n\n" f"[질문]\n{doc_data['query']}" )}, ] if job.callback == "synology": await send_to_synology("자료를 분석하고 있습니다...", raw=True) ok = await _stream_with_cancel( backend_registry.reasoner, "", job, collected, messages=analyze_messages ) logger.info("Job %s document.analyze ok=%s elapsed=%.1fs", job.id, ok, time() - doc_start_analyze) if not ok: return if collected: await conversation_store.add(user_id, "assistant", "".join(collected)) else: # 결과를 EXAONE에 전달하여 자연어로 정리 (평문 프롬프트 사용) tool_json = json.dumps(result["data"], ensure_ascii=False) if len(tool_json) > MAX_TOOL_PAYLOAD: tool_json = tool_json[:MAX_TOOL_PAYLOAD] + "...(truncated)" format_messages = [ {"role": "system", "content": ( "너는 이드, 상냥한 AI 어시스턴트야. " "도구 결과를 사용자에게 간결하게 전달해. " "규칙: " "1) 순수 텍스트만 (마크다운/코드블록 금지). " "2) 데이터에 없는 날짜/시간/숫자를 절대 지어내지 마. " "3) 목록은 짧게 한 줄씩. " "4) 정상/이상만 구분해서 요약 우선, 상세는 뒤에." )}, {"role": "user", "content": f"아래 도구 결과를 정리해줘. 요약 한 줄 + 목록:\n\n{tool_json}"}, ] try: response = await _complete_with_heartbeat( backend_registry.classifier, "", job.id, messages=format_messages, beat_msg="결과를 정리하고 있습니다..." ) # 혹시 JSON 잔재가 있으면 추출 if response.strip().startswith("{"): try: parsed = json.loads(response) response = parsed.get("response", response) except (json.JSONDecodeError, AttributeError): pass except Exception: response = result.get("summary", "결과를 조회했습니다.") collected.append(response) await state_stream.push(job.id, "result", {"content": response}) await conversation_store.add(user_id, "assistant", "".join(collected)) elif action == "clarify": # === CLARIFY: 추가 질문 === collected.append(response_text) await state_stream.push(job.id, "result", {"content": response_text}) await conversation_store.add(user_id, "assistant", response_text) elif action == "route" and settings.pipeline_enabled and backend_registry.is_healthy("reasoner"): # === ROUTE: Gemma reasoning === reasoning_model = backend_registry.reasoner.model # 원본 사용자 메시지를 그대로 전달 (route_prompt는 신뢰 안 함 — EXAONE이 답변까지 미리 작성하는 문제) rewritten_message = job.message[:MAX_PROMPT_LENGTH] job.rewritten_message = rewritten_message # Gemma busy 안내 (Hybrid 부하 판단 — inference 강제 측정 안 함) if job.callback == "synology": load = await backend_registry.get_load_status("reasoner", force_measure=False) if load["load"] in ("바쁨", "매우 바쁨"): await send_to_synology( f"⏳ 추론 모델이 현재 바쁜 상태야({load['load']}). 좀 걸릴 수 있어...", raw=True ) if job.callback != "synology": await state_stream.push(job.id, "rewrite", {"content": rewritten_message}) else: await send_to_synology("📝 더 깊이 살펴볼게요...", raw=True) if job.status == JobStatus.cancelled: return await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."}) # KST 현재 시간을 system prompt에 주입 from datetime import datetime, timezone, timedelta kst = timezone(timedelta(hours=9)) now_kst = datetime.now(kst).strftime("%Y년 %m월 %d일 %H:%M (%A) KST") reasoner_system = f"{backend_registry.reasoner.system_prompt}\n\n현재 시간: {now_kst} (한국 표준시)" # 대화 이력을 OpenAI messages 형식으로 변환 (현재 user 메시지 포함됨) history_msgs = (await conversation_store.get(user_id))[-10:] reasoner_messages = [{"role": "system", "content": reasoner_system}] for m in history_msgs: reasoner_messages.append({"role": m.role, "content": m.content}) # 현재 메시지가 history 마지막에 없으면 추가 (안전장치) if not reasoner_messages or reasoner_messages[-1].get("content") != rewritten_message: reasoner_messages.append({"role": "user", "content": rewritten_message}) try: ok = await _stream_with_cancel(backend_registry.reasoner, rewritten_message, job, collected, messages=reasoner_messages) if not ok: return except Exception: logger.warning("Reasoner failed for job %s, falling back to EXAONE", job.id, exc_info=True) if job.status == JobStatus.cancelled: return await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."}) reasoning_model = classify_model ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected) if not ok: return if collected: await conversation_store.add(user_id, "assistant", "".join(collected)) elif action == "direct" and response_text and not looks_like_router_json(response_text): # === DIRECT: 분류기가 자체 답변을 만든 정상 경로 === collected.append(response_text) await state_stream.push(job.id, "result", {"content": response_text}) if collected: await conversation_store.add(user_id, "assistant", "".join(collected)) else: # === FALLBACK === # Reached when: # - classification_failed (parser couldn't extract router JSON), # - action="direct" but response is empty / itself router JSON, # - action="route" but pipeline disabled or reasoner unhealthy, # - any unknown action. # # MUST NOT call backend_registry.classifier here. That adapter # carries CLASSIFIER_PROMPT, so reusing it for user-facing # generation streams router JSON to chat. Use chat_fallback, # which shares the physical model but uses a chat-shaped prompt. fallback_path = True if action == "direct" and response_text and looks_like_router_json(response_text): logger.warning( "Direct response_text looked like router JSON for job %s, redirecting to chat_fallback", job.id, ) fallback_text = await _fetch_fallback_text(job) if fallback_text: collected.append(fallback_text) await state_stream.push(job.id, "result", {"content": fallback_text}) await conversation_store.add(user_id, "assistant", fallback_text) # --- Complete --- if not collected: job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."}) status = "failed" safe_msg = ( "분류기가 응답을 제대로 만들지 못했어요. 다시 한 번 요청해 주세요." if fallback_path else "⚠️ 응답을 받지 못했습니다. 다시 시도해주세요." ) await _send_callback(job, safe_msg) else: # Belt-and-suspenders leak guard: even if everything above behaved, # never let router JSON become the user-visible final answer. final_text = "".join(collected) if looks_like_router_json(final_text): logger.warning( "Router-like JSON in collected output for job %s, replacing with safe message", job.id, ) job_manager.set_status(job.id, JobStatus.failed) await state_stream.push( job.id, "error", {"message": "응답을 받지 못했습니다."} ) status = "failed" await _send_callback( job, "분류기가 응답을 제대로 만들지 못했어요. 다시 한 번 요청해 주세요.", ) else: job_manager.set_status(job.id, JobStatus.completed) await state_stream.push(job.id, "done", {"message": "완료"}) status = "completed" await _send_callback(job, final_text) # --- DB 로깅 --- latency_ms = (time() - start_time) * 1000 try: await log_completion( job.id, status, len("".join(collected)), latency_ms, time(), rewrite_model=classify_model, reasoning_model=reasoning_model, rewritten_message=rewritten_message, rewrite_latency_ms=classify_latency, ) except Exception: logger.warning("Failed to log completion for job %s", job.id, exc_info=True) except asyncio.CancelledError: job_manager.set_status(job.id, JobStatus.cancelled) await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."}) try: await log_completion(job.id, "cancelled", 0, (time() - start_time) * 1000, time()) except Exception: pass except Exception: logger.exception("Worker failed for job %s", job.id) job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."}) if job.callback == "synology": try: await send_to_synology("⚠️ 처리 중 오류가 발생했습니다. 다시 시도해주세요.", raw=True) job.response_sent = True except Exception: pass try: await log_completion(job.id, "failed", 0, (time() - start_time) * 1000, time()) except Exception: pass finally: await state_stream.push_done(job.id) # Synology 무응답 방지: 응답이 한 번도 안 갔으면 에러 메시지 if job.callback == "synology" and not job.response_sent: try: await send_to_synology("⚠️ 요청을 처리하지 못했습니다. 다시 시도해주세요.", raw=True) except Exception: pass