86c076fcf9
When the classifier (gemma4:e4b) timed out or returned unparseable
output, the worker's "direct" branch re-called backend_registry.classifier
with the original user message. The classifier still had CLASSIFIER_PROMPT
attached, so it dutifully emitted router JSON like
{"action": "route", "response": "추론 모델에게 전달할게요!", ...}
which was streamed verbatim to Synology Chat as the bot's answer.
The reasoning model (Gemma 26B on Mac mini) was never actually invoked.
Changes:
- New services/classifier_io.py with parse_classification (returns explicit
classification_failed instead of silently morphing to direct) and
looks_like_router_json (defense-in-depth guard on any user-facing output).
- New BackendRegistry.chat_fallback adapter — same physical model as the
classifier but with CHAT_FALLBACK_PROMPT (no JSON, no routing meta).
This is what the worker now uses for failed-classification recovery.
- worker.py direct branch split into two:
* elif action=="direct" and response_text and not router_json → push as-is
* else → _fetch_fallback_text via chat_fallback (never the classifier),
with leak guard suppressing router-shaped output.
- Belt-and-suspenders leak check on the final concatenated answer before
_send_callback fires.
- Static safe message ("분류기가 응답을 제대로 만들지 못했어요...") when the
fallback path produces nothing usable.
Tests:
- 28 unit tests in tests/test_classifier_io.py covering parser failure
modes and the leak guard (incl. verbatim production payload).
- Integration tests in tests/test_worker_fallback.py asserting
backend_registry.classifier is NOT called by the fallback path,
chat_fallback IS called, router JSON output is suppressed, and the
chat_fallback adapter system_prompt != CLASSIFIER_PROMPT.
Out of scope: long-input pre-routing optimization, EXAONE_* env rename,
full model routing redesign.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
710 lines
36 KiB
Python
710 lines
36 KiB
Python
"""Worker — EXAONE 분류 → direct/route/clarify/tools 분기 (cancel-safe + fallback)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from time import time
|
|
|
|
from config import settings
|
|
from db.database import log_completion, log_request
|
|
from models.schemas import JobStatus
|
|
from services.backend_registry import backend_registry
|
|
from services.classifier_io import looks_like_router_json, parse_classification
|
|
from services.conversation import conversation_store
|
|
from services.job_manager import Job, job_manager
|
|
from services.state_stream import state_stream
|
|
from services.synology_sender import send_to_synology
|
|
from tools.registry import execute_tool
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Minimum seconds between "processing" heartbeat events while a reply is being streamed.
HEARTBEAT_INTERVAL = 4.0
|
|
# Seconds between heartbeat events while waiting on a non-streaming completion.
CLASSIFY_HEARTBEAT = 2.0
|
|
# Maximum number of characters of the user message forwarded to the reasoner.
MAX_PROMPT_LENGTH = 1000
|
|
# Callback text longer than this many characters is truncated before sending to Synology.
SYNOLOGY_MAX_LEN = 4000
|
|
# Maximum number of characters of serialized tool output handed to the formatting model.
MAX_TOOL_PAYLOAD = 2000
|
|
# Default timeout, in seconds, for a single tool execution.
TOOL_TIMEOUT = 10.0
|
|
# Longer timeout, in seconds, applied only to the document "ask" operation.
DOCUMENT_ASK_TIMEOUT = 35.0
|
|
|
|
|
|
async def _complete_with_heartbeat(adapter, message: str, job_id: str, *, messages=None, beat_msg="처리 중...") -> str:
    """Run ``adapter.complete_chat`` while emitting periodic heartbeat events.

    The completion runs in a background task. While it is pending, a
    "processing" event carrying ``beat_msg`` is pushed to the state stream
    every ``CLASSIFY_HEARTBEAT`` seconds. The function returns as soon as the
    completion finishes instead of waiting out the current heartbeat interval.

    Args:
        adapter: Object exposing ``async complete_chat(message, messages=...)``.
        message: Text passed to ``complete_chat``.
        job_id: Job identifier used as the state-stream key.
        messages: Optional message list forwarded to ``complete_chat``.
        beat_msg: Text carried by each heartbeat event.

    Returns:
        Whatever ``complete_chat`` returned, or "" if the completion task
        itself ended up cancelled.

    Raises:
        Any exception raised by ``complete_chat`` or by the heartbeat push.
    """

    async def call():
        # Wrapped in a coroutine so that errors raised while *creating* the
        # completion call also surface through the task.
        return await adapter.complete_chat(message, messages=messages)

    task = asyncio.create_task(call())
    try:
        while True:
            # Wakes up immediately when the task finishes; the timeout only
            # elapses when it is time to emit another heartbeat.
            finished, _ = await asyncio.wait({task}, timeout=CLASSIFY_HEARTBEAT)
            if finished:
                break
            await state_stream.push(job_id, "processing", {"message": beat_msg})
    finally:
        # If we leave early (caller cancelled, heartbeat push failed), do not
        # leave the backend call running unattended.
        if not task.done():
            task.cancel()

    if task.cancelled():
        return ""
    return task.result()
|
|
|
|
|
|
async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str], *, messages=None) -> bool:
    """Stream a chat reply chunk by chunk, stopping if the job gets cancelled.

    Each chunk is appended to ``collected`` and pushed to the state stream as
    a "result" event. A "processing" heartbeat is pushed whenever at least
    ``HEARTBEAT_INTERVAL`` seconds have passed since the previous one.

    Args:
        adapter: Object exposing ``stream_chat(message, messages=...)`` that
            returns an async iterable of text chunks.
        message: Text passed to ``stream_chat``.
        job: Job whose ``status`` is checked before every chunk is handled.
        collected: List that receives the chunks in order (mutated in place).
        messages: Optional message list forwarded to ``stream_chat``.

    Returns:
        True when the stream ran to completion, False when the job was found
        cancelled mid-stream.
    """
    loop = asyncio.get_running_loop()
    last_heartbeat = loop.time()
    stream = adapter.stream_chat(message, messages=messages)

    try:
        async for chunk in stream:
            if job.status == JobStatus.cancelled:
                return False

            collected.append(chunk)
            await state_stream.push(job.id, "result", {"content": chunk})

            now = loop.time()
            if now - last_heartbeat >= HEARTBEAT_INTERVAL:
                await state_stream.push(job.id, "processing", {"message": "응답 생성 중..."})
                last_heartbeat = now

        return True
    finally:
        # Leaving the loop early (cancel, error) would otherwise keep the
        # underlying stream open until garbage collection. Closing an already
        # finished async generator is a no-op.
        close = getattr(stream, "aclose", None)
        if close is not None:
            await close()
|
|
|
|
|
|
async def _fetch_fallback_text(job: Job) -> str:
    """Produce a plain natural-language reply for a job whose classification failed.

    The reply comes from ``backend_registry.chat_fallback``. The classifier
    adapter is deliberately not used here: it carries CLASSIFIER_PROMPT and
    would hand router JSON straight to the user.

    Returns:
        The stripped reply text, or "" when the fallback adapter is missing,
        the completion raises, or the reply looks like router JSON. Callers
        treat "" as "send the safe error message instead".
    """
    progress_note = "응답을 생성하고 있습니다..."
    fallback_adapter = backend_registry.chat_fallback

    if fallback_adapter is None:
        logger.warning("chat_fallback adapter not initialized for job %s", job.id)
        return ""

    await state_stream.push(job.id, "processing", {"message": progress_note})

    try:
        reply = await _complete_with_heartbeat(
            fallback_adapter,
            job.message,
            job.id,
            beat_msg=progress_note,
        )
    except Exception:
        logger.warning("Chat fallback failed for job %s", job.id, exc_info=True)
        return ""

    # Happy path first: anything that is not router-shaped goes out as-is.
    if not looks_like_router_json(reply):
        return reply.strip()

    logger.warning(
        "Router-like JSON detected in fallback output for job %s, suppressing",
        job.id,
    )
    return ""
|
|
|
|
|
|
async def _build_system_status(force_measure: bool = True) -> str:
    """Render the multi-line system status report.

    For each of the "classifier" and "reasoner" roles present in the health
    summary, the report shows connectivity and load, followed by either the
    status-API figures or the hybrid health/ping figures. The last line shows
    this worker's own queue counters.

    Args:
        force_measure: Forwarded to ``backend_registry.get_load_status``.

    Returns:
        The report as a single newline-joined string.
    """
    from services import job_queue as jq_module

    health = backend_registry.health_summary()
    report = ["🤖 시스템 상태:", ""]

    for role in ("classifier", "reasoner"):
        info = health.get(role)
        if not info:
            continue

        load = await backend_registry.get_load_status(role, force_measure=force_measure)
        if info["healthy"]:
            connected = "정상"
        else:
            connected = "연결 안 됨"
        load_str = load["load"]
        source = load.get("source", "hybrid")

        report.append(f"{info['name']}: {connected} — {load_str}")

        if source == "status_api":
            # Figures read straight from the backend's status API.
            active = load.get("active_jobs", 0)
            total = load.get("total_requests", 0)
            report.append(f" active_jobs: {active} (total: {total}) [status API]")
            report.append(f" health {load['health_ms']:.0f}ms")
        else:
            # Hybrid fallback: health latency against its baseline, plus an optional ping.
            baseline = load["health_baseline_ms"]
            if baseline > 0:
                ratio = load["health_ms"] / baseline
            else:
                ratio = 1.0
            report.append(f" health {load['health_ms']:.0f}ms (baseline {baseline:.0f}ms, {ratio:.1f}배)")
            if load["measured"]:
                report.append(f" ping {load['inference_ms']:.0f}ms")
            else:
                report.append(" ping: 측정 안 함")

        report.append("")

    if jq_module.job_queue:
        queue = jq_module.job_queue.stats
    else:
        queue = {"pending": 0, "active": 0}
    report.append(f"내 작업 큐: 처리 중 {queue.get('active', 0)}건, 대기 {queue.get('pending', 0)}건")

    return "\n".join(report)
|
|
|
|
|
|
async def _send_callback(job: Job, text: str) -> None:
    """Deliver ``text`` through the Synology callback and mark the job as answered.

    Does nothing unless ``job.callback`` is "synology". Text longer than
    ``SYNOLOGY_MAX_LEN`` characters is cut at that length and gets a
    truncation marker appended.
    """
    if job.callback != "synology":
        return

    if len(text) <= SYNOLOGY_MAX_LEN:
        outgoing = text
    else:
        outgoing = text[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)"

    await send_to_synology(outgoing)
    job.response_sent = True
|
|
|
|
|
|
def _pre_route(message: str) -> dict | None:
    """Keyword-based pre-routing that lets obvious requests skip the classifier.

    The message is lower-cased and stripped, then checked against keyword
    groups in this order: calendar, mail, document-by-id analysis, document
    or domain question, infrastructure tools, system status, and finally the
    short confirm / cancel replies.

    Args:
        message: Raw user message.

    Returns:
        A classification dict (``{"action": ..., ...}``) when a rule matches,
        or None when the classifier model has to decide. None is also
        returned on purpose for calendar *creation* requests, because the
        model must extract the event parameters.
    """
    import re
    from datetime import datetime, timedelta, timezone

    msg = message.lower().strip()
    # Dates are computed in KST explicitly, the same zone run() uses for its
    # prompts, so "tomorrow" / "this week" do not depend on the host timezone.
    now = datetime.now(timezone(timedelta(hours=9)))

    def has_any(keywords):
        return any(k in msg for k in keywords)

    def tool_call(tool, operation, params=None):
        return {
            "action": "tools",
            "tool": tool,
            "operation": operation,
            "params": {} if params is None else params,
        }

    # --- Calendar ---
    # "스케줄러" (scheduler) contains the calendar keyword "스케줄"; drop it
    # before matching so scheduler questions can reach the infra branch.
    calendar_text = msg.replace("스케줄러", "")
    if any(k in calendar_text for k in ["일정", "캘린더", "스케줄", "약속", "미팅", "회의"]):
        # Creation request: the classifier has to extract the parameters.
        if has_any(["잡아", "만들", "등록", "추가", "넣어"]):
            return None
        if "오늘" in msg:
            return tool_call("calendar", "today")
        if "내일" in msg:
            tmr = (now + timedelta(days=1)).strftime("%Y-%m-%d")
            return tool_call("calendar", "search", {"date_from": tmr, "date_to": tmr})
        if "이번" in msg and ("주" in msg or "week" in msg):
            monday = now - timedelta(days=now.weekday())
            sunday = monday + timedelta(days=6)
            return tool_call(
                "calendar",
                "search",
                {"date_from": monday.strftime("%Y-%m-%d"), "date_to": sunday.strftime("%Y-%m-%d")},
            )
        # Default: today's schedule.
        return tool_call("calendar", "today")

    # --- Mail ---
    if has_any(["메일", "이메일", "mail", "편지"]):
        # Narrow to one day only for "today" or today's actual date; a
        # hard-coded date literal would misfire on every other day.
        today_labels = (f"{now.month}월 {now.day}일", f"{now.month}월{now.day}일")
        days = 1 if ("오늘" in msg or has_any(today_labels)) else 7

        # Folder filter.
        folder = ""
        if has_any(["테크니컬", "회사", "technicalkorea", "네이버웍스"]):
            folder = "Technicalkorea"
        elif has_any(["gmail", "구글", "지메일"]):
            folder = "Gmail"
        elif has_any(["개인", "inbox"]):
            folder = "INBOX"
        return tool_call("email", "search", {"query": "", "days": days, "folder": folder})

    # --- Tier 2: explicit document id + analysis keyword -> full-text analysis ---
    doc_id_match = re.search(r'(\d{3,6})\s*번', msg) or re.search(r'#(\d{3,6})', msg)
    analyze_signals = ["전체", "요약", "분석", "정리", "읽어", "전문", "자세히"]
    if doc_id_match and has_any(analyze_signals):
        # Strip the id reference and filler words; if nothing specific is
        # left, fall back to a generic analysis request.
        clean_query = re.sub(r'#?\d{3,6}\s*번?\s*', '', msg).strip()
        clean_query = re.sub(r'^(문서|자료|파일)\s*', '', clean_query).strip()
        clean_query = re.sub(r'\s*(해줘|해주세요|줘|좀)\s*$', '', clean_query).strip()
        if not clean_query or clean_query in analyze_signals:
            clean_query = "이 문서의 핵심 내용을 분석하고, 주요 사항과 실무상 중요한 포인트를 정리해주세요."
        return tool_call("document", "analyze", {"doc_id": doc_id_match.group(1), "query": clean_query})

    # --- Document search / question ---
    doc_entry = has_any(["문서", "도큐먼트", "자료", "파일"])
    doc_action = has_any(["찾아", "검색", "확인", "알려", "설명", "뭐야"])

    # Domain keywords route to the document tool even without "find a document" wording.
    domain_keywords = [
        "산업안전", "산안법", "안전보건", "위험성평가", "중대재해",
        "asme", "압력용기", "배관", "용접", "기계안전",
        "화학물질", "유해물질", "msds", "ghs",
        "법령", "시행령", "시행규칙", "고시", "규정",
        "산업안전기사", "건설안전", "전기안전",
    ]
    domain_hit = has_any(domain_keywords)

    if (doc_entry and doc_action) or domain_hit:
        query = msg
        # Remove the explicit document / request words when they are present.
        if doc_entry:
            for rm in ["문서", "도큐먼트", "자료", "파일", "찾아줘", "찾아", "검색", "확인", "해줘", "줘", "좀"]:
                query = query.replace(rm, "")
            query = query.strip()
        if query:
            # Only an explicit list request uses search_full; everything else is "ask".
            list_signals = ["목록", "리스트", "검색 결과", "몇 개", "list", "제목만", "문서만"]
            operation = "search_full" if has_any(list_signals) else "ask"
            return tool_call("document", operation, {"query": query})

    # --- Infrastructure tools ---
    infra_keywords = [
        "docker", "컨테이너", "디스크", "용량", "헬스체크", "tailscale",
        "ollama 모델", "mlx 모델", "스케줄러", "scheduler",
        "큐 상태", "queue", "처리 큐", "verify", "검증",
    ]
    if has_any(infra_keywords):
        # Container status.
        if has_any(["docker", "컨테이너"]):
            host = "nas-company" if has_any(["nas", "회사"]) else "gpu"
            return tool_call("infra", "status", {"host": host})
        # Disk usage.
        if has_any(["디스크", "용량"]):
            host = ""
            if "gpu" in msg:
                host = "gpu"
            elif has_any(["맥미니", "macmini", "mac mini"]):
                host = "macmini"
            return tool_call("infra", "disk", {"host": host})
        # Health check.
        if "헬스" in msg or "health" in msg:
            return tool_call("infra", "health")
        # Network.
        if "tailscale" in msg or "네트워크" in msg:
            return tool_call("infra", "network")
        # Model list.
        if has_any(["ollama 모델", "mlx 모델"]):
            host = "mlx" if "mlx" in msg else "gpu"
            return tool_call("infra", "models", {"host": host})
        # Scheduler.
        if has_any(["스케줄러", "scheduler"]):
            return tool_call("infra", "scheduler")
        # Queue.
        if has_any(["큐 상태", "queue", "처리 큐"]):
            return tool_call("infra", "queue")
        # Verification.
        if has_any(["verify", "검증"]):
            return tool_call("infra", "verify", {"check_name": "gpu-snapshot"})

    # --- System status question: return only a marker, the worker looks it up ---
    status_topic = has_any(["추론 모델", "gemma", "젬마", "서버 상태", "시스템 상태"])
    status_phrase = has_any(["상태", "젬마", "gemma"]) and has_any(
        ["돌아", "일하", "작동", "상태", "건강", "살아", "대기", "큐", "밀려", "바빠", "느려", "많"]
    )
    if status_topic or status_phrase:
        return {"action": "system_status", "response": "", "prompt": ""}

    # --- Reply to a pending draft confirmation ---
    if msg in ("확인", "예", "yes", "ㅇㅇ", "응", "네", "좋아", "ok"):
        return tool_call("calendar", "create_confirmed")

    if msg in ("취소", "아니", "no", "ㄴㄴ"):
        return {"action": "direct", "response": "알겠어, 취소했어!", "prompt": ""}

    return None
|
|
|
|
|
|
def _tool_timeout_result(tool_name: str, operation: str) -> dict:
    """Build the failure payload used when a tool call exceeds its timeout."""
    return {
        "ok": False,
        "tool": tool_name,
        "operation": operation,
        "data": [],
        "summary": "",
        "error": "⚠️ 서비스 응답 시간이 초과되었습니다.",
    }


async def run(job: Job) -> None:
    """Process one job: pre-route, classify, dispatch, then report the outcome.

    Flow:
      1. Acknowledge the job and mark it as processing.
      2. Try keyword pre-routing; otherwise ask the classifier model and
         parse its output with ``parse_classification``.
      3. Dispatch on the resulting action: ``system_status``, ``tools``,
         ``clarify``, ``route`` (reasoner), ``direct``; anything else goes to
         the chat-fallback branch.
      4. Mark the job completed or failed, send the Synology callback if one
         is configured, and log the completion.

    User-facing generation never goes through ``backend_registry.classifier``
    with its default prompt, because that adapter carries CLASSIFIER_PROMPT
    and would emit router JSON. Recovery paths use ``_fetch_fallback_text``.

    Args:
        job: The job to process. ``job.status`` is re-checked at several
            points so that a cancelled job stops early.

    Returns:
        None. Results are delivered through the state stream, the
        conversation store and, for Synology jobs, the chat callback.
    """
    from datetime import datetime, timedelta, timezone

    start_time = time()
    user_id = job.callback_meta.get("user_id", "api")
    classify_model = None
    reasoning_model = None
    rewritten_message = ""
    kst = timezone(timedelta(hours=9))

    try:
        await log_request(job.id, job.message, "classify", job.created_at)
    except Exception:
        logger.warning("Failed to log request for job %s", job.id, exc_info=True)

    try:
        # --- ACK ---
        await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다."})
        job_manager.set_status(job.id, JobStatus.processing)

        if job.status == JobStatus.cancelled:
            return

        classify_model = backend_registry.classifier.model

        # --- Pre-routing (keyword based, skips the classifier model) ---
        pre = _pre_route(job.message)
        classify_latency = 0

        if pre:
            classification = pre
            logger.info("Job %s pre-routed: %s.%s", job.id, pre.get("tool", ""), pre.get("operation", pre.get("action", "")))
        else:
            # --- Classifier call ---
            now_str = datetime.now(kst).strftime("%Y년 %m월 %d일 %H:%M (%A) KST")
            history = await conversation_store.format_for_prompt(user_id)
            classify_input = f"[현재 시간]\n{now_str}\n\n"
            if history:
                classify_input += f"[대화 이력]\n{history}\n\n"
            classify_input += f"[현재 메시지]\n{job.message}"

            await state_stream.push(job.id, "processing", {"message": "메시지를 분석하고 있습니다..."})
            classify_start = time()

            try:
                raw_result = await _complete_with_heartbeat(
                    backend_registry.classifier, classify_input, job.id,
                    beat_msg="메시지를 분석하고 있습니다..."
                )
            except Exception:
                # An empty result makes parse_classification report a failed
                # classification, which is handled by the fallback branch.
                logger.warning(
                    "Classification failed for job %s, falling back to direct",
                    job.id,
                    exc_info=True,
                )
                raw_result = ""

            classify_latency = (time() - classify_start) * 1000
            classification = parse_classification(raw_result)

        action = classification.get("action", "classification_failed")
        response_text = classification.get("response", "")

        logger.info("Job %s classified as '%s'", job.id, action)

        # Conversation log: the user's message.
        await conversation_store.add(user_id, "user", job.message)

        collected: list[str] = []
        # True iff we entered the chat-fallback branch (classification failed,
        # or a routed action could not run). Drives the safe error message at
        # the Complete gate.
        fallback_path = False

        if job.status == JobStatus.cancelled:
            return

        if action == "system_status":
            # === SYSTEM STATUS: explicit user request, so measure the ping ===
            await state_stream.push(job.id, "processing", {"message": "시스템 상태 확인 중..."})
            status_text = await _build_system_status(force_measure=True)
            collected.append(status_text)
            await state_stream.push(job.id, "result", {"content": status_text})
            await conversation_store.add(user_id, "assistant", status_text)

        elif action == "tools":
            # === TOOLS: run a tool ===
            tool_name = classification.get("tool", "")
            operation = classification.get("operation", "")
            params = classification.get("params", {})

            logger.info("Job %s tool call: %s.%s(%s)", job.id, tool_name, operation, params)
            await state_stream.push(job.id, "processing", {"message": f"🔧 {tool_name} 도구를 사용하고 있습니다..."})

            if operation == "create_confirmed":
                # The confirmed event data comes from the stored pending draft.
                draft = conversation_store.get_pending_draft(user_id)
                if not draft:
                    response = "확인할 일정이 없습니다. 다시 요청해주세요."
                else:
                    try:
                        result = await asyncio.wait_for(execute_tool(tool_name, operation, draft), timeout=TOOL_TIMEOUT)
                    except asyncio.TimeoutError:
                        result = _tool_timeout_result(tool_name, operation)
                    conversation_store.clear_pending_draft(user_id)
                    response = result.get("summary", "") if result["ok"] else result.get("error", "⚠️ 오류")
                collected.append(response)
                await state_stream.push(job.id, "result", {"content": response})
                await conversation_store.add(user_id, "assistant", response)
            else:
                # Tell Synology users that a document lookup has started.
                if tool_name == "document" and job.callback == "synology":
                    notice = "서고를 확인하는 중입니다..."
                    await send_to_synology(notice, raw=True)

                # document.ask gets a longer timeout and an interim notice.
                timeout = DOCUMENT_ASK_TIMEOUT if (tool_name == "document" and operation == "ask") else TOOL_TIMEOUT
                doc_start = time()
                if tool_name == "document":
                    logger.info("Job %s document.%s domain_hit=%s list_request=%s query=%s",
                                job.id, operation,
                                any(k in job.message.lower() for k in ["산업안전", "위험성평가", "asme", "법령"]),
                                any(s in job.message.lower() for s in ["목록", "리스트", "제목만"]),
                                params.get("query", "")[:50])

                if tool_name == "document" and operation == "ask" and job.callback == "synology":
                    # document.ask can take long: send an interim notice once
                    # 10 seconds have passed, but return as soon as the tool
                    # finishes rather than polling on a fixed sleep.
                    tool_task = asyncio.create_task(
                        asyncio.wait_for(execute_tool(tool_name, operation, params), timeout=timeout)
                    )
                    try:
                        notice_delay = max(0.0, 10 - (time() - doc_start))
                        finished, _ = await asyncio.wait({tool_task}, timeout=notice_delay)
                        if not finished:
                            await send_to_synology("자료를 분석하고 있습니다...", raw=True)
                            await asyncio.wait({tool_task})
                    finally:
                        # Do not leave the tool call running if we exit early.
                        if not tool_task.done():
                            tool_task.cancel()
                    try:
                        result = tool_task.result()
                    except asyncio.TimeoutError:
                        result = _tool_timeout_result(tool_name, operation)
                else:
                    try:
                        result = await asyncio.wait_for(execute_tool(tool_name, operation, params), timeout=timeout)
                    except asyncio.TimeoutError:
                        result = _tool_timeout_result(tool_name, operation)

                if tool_name == "document":
                    logger.info("Job %s document.%s ok=%s elapsed=%.1fs", job.id, operation, result.get("ok"), time() - doc_start)

                if not result["ok"]:
                    response = result.get("error", "⚠️ 서비스를 사용할 수 없습니다.")
                    collected.append(response)
                    await state_stream.push(job.id, "result", {"content": response})
                elif operation == "create_draft":
                    # Store the draft and ask the user to confirm it.
                    conversation_store.set_pending_draft(user_id, result["data"])
                    response = result["summary"] + "\n\n'확인' 또는 '취소'로 답해주세요."
                    collected.append(response)
                    await state_stream.push(job.id, "result", {"content": response})
                elif result.get("render_mode") == "final":
                    # Already formatted by the tool: skip model re-formatting.
                    response = result.get("rendered_text", result.get("summary", "결과를 조회했습니다."))
                    collected.append(response)
                    await state_stream.push(job.id, "result", {"content": response})
                elif result.get("render_mode") == "analyze":
                    # Tier 2: hand the document text to the reasoner for analysis.
                    doc_data = result["data"]
                    doc_start_analyze = time()
                    logger.info("Job %s document.analyze doc_id=%s query=%s", job.id, doc_data.get("doc_id"), doc_data.get("query", "")[:80])
                    summary_text = doc_data.get("ai_summary") or "(요약 없음)"
                    analyze_messages = [
                        {"role": "system", "content": (
                            "너는 산업안전 문서 분석 전문가야. "
                            "아래 문서를 분석하여 질문에 답해. "
                            "규칙: "
                            "1) 문서에 있는 내용만 근거로 삼아라. "
                            "2) 문서에 없는 내용은 추정하지 말고 '문서에 명시되지 않음'이라고 답해라. "
                            "3) 답변은 다음 구조로: "
                            " [근거] 법령/기준 인용 (있으면) "
                            " [해설] 실무 적용 방법 "
                            " [사례] 유사 사고/재해 사례 (문서에 있으면) "
                            " [요약] 왜 중요한지 한 줄 "
                            "4) 해당 층이 문서에 없으면 그 섹션은 생략해라. "
                            "5) 순수 텍스트만 (마크다운/코드블록 금지)."
                        )},
                        {"role": "user", "content": (
                            f"[문서: {doc_data['title']}]\n"
                            f"유형: {doc_data.get('document_type', '미분류')}\n"
                            f"요약: {summary_text}\n\n"
                            f"{doc_data['content'][:12000]}\n\n"
                            f"[질문]\n{doc_data['query']}"
                        )},
                    ]
                    if job.callback == "synology":
                        await send_to_synology("자료를 분석하고 있습니다...", raw=True)
                    ok = await _stream_with_cancel(
                        backend_registry.reasoner, "", job, collected, messages=analyze_messages
                    )
                    logger.info("Job %s document.analyze ok=%s elapsed=%.1fs", job.id, ok, time() - doc_start_analyze)
                    if not ok:
                        return
                else:
                    # Let the model turn the raw tool data into plain text.
                    # An explicit message list is passed, so the prompt used
                    # here is the formatting prompt below.
                    tool_json = json.dumps(result["data"], ensure_ascii=False)
                    if len(tool_json) > MAX_TOOL_PAYLOAD:
                        tool_json = tool_json[:MAX_TOOL_PAYLOAD] + "...(truncated)"
                    format_messages = [
                        {"role": "system", "content": (
                            "너는 이드, 상냥한 AI 어시스턴트야. "
                            "도구 결과를 사용자에게 간결하게 전달해. "
                            "규칙: "
                            "1) 순수 텍스트만 (마크다운/코드블록 금지). "
                            "2) 데이터에 없는 날짜/시간/숫자를 절대 지어내지 마. "
                            "3) 목록은 짧게 한 줄씩. "
                            "4) 정상/이상만 구분해서 요약 우선, 상세는 뒤에."
                        )},
                        {"role": "user", "content": f"아래 도구 결과를 정리해줘. 요약 한 줄 + 목록:\n\n{tool_json}"},
                    ]
                    try:
                        response = await _complete_with_heartbeat(
                            backend_registry.classifier, "", job.id,
                            messages=format_messages,
                            beat_msg="결과를 정리하고 있습니다..."
                        )
                        # If the model still answered in JSON, pull out its "response" field.
                        if response.strip().startswith("{"):
                            try:
                                extracted = json.loads(response).get("response", response)
                            except (json.JSONDecodeError, AttributeError):
                                extracted = response
                            # Only accept text; anything else would break the join below.
                            if isinstance(extracted, str):
                                response = extracted
                    except Exception:
                        response = result.get("summary", "결과를 조회했습니다.")
                    collected.append(response)
                    await state_stream.push(job.id, "result", {"content": response})

                # Store the assistant reply exactly once for every tool outcome.
                await conversation_store.add(user_id, "assistant", "".join(collected))

        elif action == "clarify":
            # === CLARIFY: ask the user a follow-up question ===
            collected.append(response_text)
            await state_stream.push(job.id, "result", {"content": response_text})
            await conversation_store.add(user_id, "assistant", response_text)

        elif action == "route" and settings.pipeline_enabled and backend_registry.is_healthy("reasoner"):
            # === ROUTE: reasoner model ===
            reasoning_model = backend_registry.reasoner.model
            # The original user message is forwarded as-is; the classifier's
            # own "prompt" field is not trusted because it tends to pre-write
            # the answer.
            rewritten_message = job.message[:MAX_PROMPT_LENGTH]
            job.rewritten_message = rewritten_message

            # Busy notice (hybrid load check, no forced inference measurement).
            if job.callback == "synology":
                load = await backend_registry.get_load_status("reasoner", force_measure=False)
                if load["load"] in ("바쁨", "매우 바쁨"):
                    await send_to_synology(
                        f"⏳ 추론 모델이 현재 바쁜 상태야({load['load']}). 좀 걸릴 수 있어...",
                        raw=True
                    )

            if job.callback != "synology":
                await state_stream.push(job.id, "rewrite", {"content": rewritten_message})
            else:
                await send_to_synology("📝 더 깊이 살펴볼게요...", raw=True)

            if job.status == JobStatus.cancelled:
                return

            await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."})

            # Inject the current KST time into the system prompt.
            now_kst = datetime.now(kst).strftime("%Y년 %m월 %d일 %H:%M (%A) KST")
            reasoner_system = f"{backend_registry.reasoner.system_prompt}\n\n현재 시간: {now_kst} (한국 표준시)"

            # Conversation history as chat messages (the current user message
            # was stored above, so it is normally the last entry).
            history_msgs = (await conversation_store.get(user_id))[-10:]
            reasoner_messages = [{"role": "system", "content": reasoner_system}]
            for m in history_msgs:
                reasoner_messages.append({"role": m.role, "content": m.content})
            # Safety net: append the current message if it is not the last entry.
            if reasoner_messages[-1].get("content") != rewritten_message:
                reasoner_messages.append({"role": "user", "content": rewritten_message})

            try:
                ok = await _stream_with_cancel(backend_registry.reasoner, rewritten_message, job, collected, messages=reasoner_messages)
                if not ok:
                    return
            except Exception:
                logger.warning("Reasoner failed for job %s, falling back to EXAONE", job.id, exc_info=True)
                if job.status == JobStatus.cancelled:
                    return
                await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."})
                reasoning_model = classify_model
                # Recover through chat_fallback, never the classifier adapter:
                # with CLASSIFIER_PROMPT attached it would stream router JSON
                # to the user chunk by chunk.
                fallback_text = await _fetch_fallback_text(job)
                if job.status == JobStatus.cancelled:
                    return
                if fallback_text:
                    collected.append(fallback_text)
                    await state_stream.push(job.id, "result", {"content": fallback_text})

            if collected:
                await conversation_store.add(user_id, "assistant", "".join(collected))

        elif action == "direct" and response_text and not looks_like_router_json(response_text):
            # === DIRECT: the classifier wrote a proper answer itself ===
            collected.append(response_text)
            await state_stream.push(job.id, "result", {"content": response_text})
            await conversation_store.add(user_id, "assistant", "".join(collected))

        else:
            # === FALLBACK ===
            # Reached when:
            #   - classification_failed (parser couldn't extract router JSON),
            #   - action="direct" but response is empty / itself router JSON,
            #   - action="route" but pipeline disabled or reasoner unhealthy,
            #   - any unknown action.
            #
            # MUST NOT call backend_registry.classifier here. That adapter
            # carries CLASSIFIER_PROMPT, so reusing it for user-facing
            # generation streams router JSON to chat. Use chat_fallback,
            # which shares the physical model but uses a chat-shaped prompt.
            fallback_path = True
            if action == "direct" and response_text and looks_like_router_json(response_text):
                logger.warning(
                    "Direct response_text looked like router JSON for job %s, redirecting to chat_fallback",
                    job.id,
                )
            fallback_text = await _fetch_fallback_text(job)
            if fallback_text:
                collected.append(fallback_text)
                await state_stream.push(job.id, "result", {"content": fallback_text})
                await conversation_store.add(user_id, "assistant", fallback_text)

        # --- Complete ---
        if not collected:
            job_manager.set_status(job.id, JobStatus.failed)
            await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."})
            status = "failed"
            safe_msg = (
                "분류기가 응답을 제대로 만들지 못했어요. 다시 한 번 요청해 주세요."
                if fallback_path
                else "⚠️ 응답을 받지 못했습니다. 다시 시도해주세요."
            )
            await _send_callback(job, safe_msg)
        else:
            # Belt-and-suspenders leak guard: even if everything above behaved,
            # never let router JSON become the user-visible final answer.
            final_text = "".join(collected)
            if looks_like_router_json(final_text):
                logger.warning(
                    "Router-like JSON in collected output for job %s, replacing with safe message",
                    job.id,
                )
                job_manager.set_status(job.id, JobStatus.failed)
                await state_stream.push(
                    job.id, "error", {"message": "응답을 받지 못했습니다."}
                )
                status = "failed"
                await _send_callback(
                    job,
                    "분류기가 응답을 제대로 만들지 못했어요. 다시 한 번 요청해 주세요.",
                )
            else:
                job_manager.set_status(job.id, JobStatus.completed)
                await state_stream.push(job.id, "done", {"message": "완료"})
                status = "completed"
                await _send_callback(job, final_text)

        # --- DB logging ---
        latency_ms = (time() - start_time) * 1000
        try:
            await log_completion(
                job.id, status, len("".join(collected)), latency_ms, time(),
                rewrite_model=classify_model,
                reasoning_model=reasoning_model,
                rewritten_message=rewritten_message,
                rewrite_latency_ms=classify_latency,
            )
        except Exception:
            logger.warning("Failed to log completion for job %s", job.id, exc_info=True)

    except asyncio.CancelledError:
        job_manager.set_status(job.id, JobStatus.cancelled)
        await state_stream.push(job.id, "error", {"message": "작업이 취소되었습니다."})
        try:
            await log_completion(job.id, "cancelled", 0, (time() - start_time) * 1000, time())
        except Exception:
            # Best effort: a logging failure must not mask the cancellation.
            logger.debug("Failed to log cancellation for job %s", job.id, exc_info=True)
    except Exception:
        logger.exception("Worker failed for job %s", job.id)
        job_manager.set_status(job.id, JobStatus.failed)
        await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."})
        if job.callback == "synology":
            try:
                await send_to_synology("⚠️ 처리 중 오류가 발생했습니다. 다시 시도해주세요.", raw=True)
                job.response_sent = True
            except Exception:
                # Best effort: the failure itself is already logged above.
                logger.debug("Failed to send error notice for job %s", job.id, exc_info=True)
        try:
            await log_completion(job.id, "failed", 0, (time() - start_time) * 1000, time())
        except Exception:
            logger.debug("Failed to log failure for job %s", job.id, exc_info=True)
    finally:
        await state_stream.push_done(job.id)
        # Never leave a Synology user without any reply: if nothing was sent
        # at all, send a generic error message.
        if job.callback == "synology" and not job.response_sent:
            try:
                await send_to_synology("⚠️ 요청을 처리하지 못했습니다. 다시 시도해주세요.", raw=True)
            except Exception:
                logger.debug("Failed to send last-resort notice for job %s", job.id, exc_info=True)
|