feat: EXAONE 분류기 — direct/route/clarify 라우팅 + 대화 기억
- EXAONE: 분류기+프롬프트엔지니어+직접응답 (JSON 출력) - 간단한 질문은 EXAONE이 직접 답변 (파이프라인 스킵) - 복잡한 질문은 AI 최적화 프롬프트로 Gemma에 전달 - 모호한 질문은 사용자에게 추가 질문 (clarify) - user별 최근 대화 기억 (최대 10개, 1시간 TTL) - ModelAdapter: messages 직접 전달 옵션 추가 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,40 +10,45 @@ from services.model_adapter import ModelAdapter
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
REWRITER_PROMPT = (
|
CLASSIFIER_PROMPT = """\
|
||||||
"너는 질문 재구성 전문가다. "
|
너는 AI 라우터다. 사용자 메시지를 분석하여 JSON으로 응답하라.
|
||||||
"사용자의 질문을 분석하여 의도를 명확히 하고, 더 명확한 질문으로 재작성하라. "
|
반드시 아래 3가지 action 중 하나를 선택하라:
|
||||||
"재구성된 질문만 출력하라. 부연 설명이나 답변은 절대 하지 마라. "
|
|
||||||
"인사, 잡담, 간단한 질문, 1~2문장 질문은 원문 그대로 출력하라. "
|
1. "direct" — 인사, 잡담, 간단한 질문, 자기소개 요청 등. 네가 직접 답변한다.
|
||||||
"복잡하거나 모호한 질문만 재구성하라."
|
2. "route" — 복잡한 질문, 분석, 설명, 코딩 등. 추론 모델에게 넘긴다. 이때 prompt 필드에 추론 모델이 이해하기 좋게 정리된 프롬프트를 작성하라.
|
||||||
)
|
3. "clarify" — 질문이 모호하거나 정보가 부족할 때. 사용자에게 추가 질문한다.
|
||||||
|
|
||||||
|
반드시 아래 JSON 형식으로만 응답하라. JSON 외 텍스트는 절대 출력하지 마라:
|
||||||
|
{"action": "direct|route|clarify", "response": "direct/clarify일 때 사용자에게 보낼 텍스트", "prompt": "route일 때 추론 모델에게 보낼 프롬프트"}
|
||||||
|
|
||||||
|
너의 이름은 '이드'이고, 상냥하고 친근하게 대화한다.
|
||||||
|
너는 GPU 서버의 EXAONE 모델과 맥미니의 Gemma4 모델로 구성된 NanoClaude 파이프라인에서 돌아간다.
|
||||||
|
대화 이력이 있으면 맥락을 고려하라.\
|
||||||
|
"""
|
||||||
|
|
||||||
REASONER_PROMPT = (
|
REASONER_PROMPT = (
|
||||||
"너는 '이드'라는 이름의 상냥하고 친근한 AI 어시스턴트야. "
|
"너는 '이드'라는 이름의 상냥하고 친근한 AI 어시스턴트야. "
|
||||||
"너는 GPU 서버의 EXAONE 모델(질문 정리)과 맥미니의 Gemma4 모델(답변 생성)로 구성된 "
|
"간결하고 자연스럽게 대화해. "
|
||||||
"NanoClaude 파이프라인 위에서 돌아가고 있어. "
|
|
||||||
"간결하고 자연스럽게 대화해. 인사에는 짧게 인사로 답하고, "
|
|
||||||
"질문에는 핵심만 명확하게 답해. "
|
"질문에는 핵심만 명확하게 답해. "
|
||||||
"불필요한 구조화(번호 매기기, 헤더, 마크다운)는 피하고, 대화하듯 편하게 답변해. "
|
"불필요한 구조화(번호 매기기, 헤더, 마크다운)는 피하고, 대화하듯 편하게 답변해."
|
||||||
"너 자신에 대한 질문에는 솔직하게 답해."
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class BackendRegistry:
|
class BackendRegistry:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.rewriter: ModelAdapter | None = None
|
self.classifier: ModelAdapter | None = None # EXAONE: 분류 + 직접응답
|
||||||
self.reasoner: ModelAdapter | None = None
|
self.reasoner: ModelAdapter | None = None # Gemma4: 추론
|
||||||
self._health: dict[str, bool] = {"rewriter": False, "reasoner": False}
|
self._health: dict[str, bool] = {"classifier": False, "reasoner": False}
|
||||||
self._latency: dict[str, float] = {"rewriter": 0.0, "reasoner": 0.0}
|
self._latency: dict[str, float] = {"classifier": 0.0, "reasoner": 0.0}
|
||||||
self._health_task: asyncio.Task | None = None
|
self._health_task: asyncio.Task | None = None
|
||||||
|
|
||||||
def init_from_settings(self, settings) -> None:
|
def init_from_settings(self, settings) -> None:
|
||||||
self.rewriter = ModelAdapter(
|
self.classifier = ModelAdapter(
|
||||||
name="EXAONE",
|
name="EXAONE",
|
||||||
base_url=settings.exaone_base_url,
|
base_url=settings.exaone_base_url,
|
||||||
model=settings.exaone_model,
|
model=settings.exaone_model,
|
||||||
system_prompt=REWRITER_PROMPT,
|
system_prompt=CLASSIFIER_PROMPT,
|
||||||
temperature=settings.exaone_temperature,
|
temperature=0.3, # 분류는 낮은 temperature
|
||||||
timeout=settings.exaone_timeout,
|
timeout=settings.exaone_timeout,
|
||||||
)
|
)
|
||||||
self.reasoner = ModelAdapter(
|
self.reasoner = ModelAdapter(
|
||||||
@@ -68,7 +73,7 @@ class BackendRegistry:
|
|||||||
await asyncio.sleep(interval)
|
await asyncio.sleep(interval)
|
||||||
|
|
||||||
async def _check_all(self) -> None:
|
async def _check_all(self) -> None:
|
||||||
for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]:
|
for role, adapter in [("classifier", self.classifier), ("reasoner", self.reasoner)]:
|
||||||
if not adapter:
|
if not adapter:
|
||||||
continue
|
continue
|
||||||
start = time.monotonic()
|
start = time.monotonic()
|
||||||
@@ -86,7 +91,7 @@ class BackendRegistry:
|
|||||||
|
|
||||||
def health_summary(self) -> dict:
|
def health_summary(self) -> dict:
|
||||||
result = {}
|
result = {}
|
||||||
for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]:
|
for role, adapter in [("classifier", self.classifier), ("reasoner", self.reasoner)]:
|
||||||
if adapter:
|
if adapter:
|
||||||
result[role] = {
|
result[role] = {
|
||||||
"name": adapter.name,
|
"name": adapter.name,
|
||||||
|
|||||||
52
nanoclaude/services/conversation.py
Normal file
52
nanoclaude/services/conversation.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
"""Conversation — user별 최근 대화 기억."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
MAX_HISTORY = 10 # user당 최근 대화 수
|
||||||
|
HISTORY_TTL = 3600.0 # 1시간 이후 대화 만료
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Message:
|
||||||
|
role: str # "user" | "assistant"
|
||||||
|
content: str
|
||||||
|
timestamp: float = field(default_factory=time)
|
||||||
|
|
||||||
|
|
||||||
|
class ConversationStore:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._history: dict[str, list[Message]] = defaultdict(list)
|
||||||
|
|
||||||
|
def add(self, user_id: str, role: str, content: str) -> None:
|
||||||
|
msgs = self._history[user_id]
|
||||||
|
msgs.append(Message(role=role, content=content))
|
||||||
|
# 최대 개수 제한
|
||||||
|
if len(msgs) > MAX_HISTORY:
|
||||||
|
self._history[user_id] = msgs[-MAX_HISTORY:]
|
||||||
|
|
||||||
|
def get(self, user_id: str) -> list[Message]:
|
||||||
|
"""만료되지 않은 최근 대화 반환."""
|
||||||
|
now = time()
|
||||||
|
msgs = self._history.get(user_id, [])
|
||||||
|
# TTL 필터
|
||||||
|
fresh = [m for m in msgs if now - m.timestamp < HISTORY_TTL]
|
||||||
|
self._history[user_id] = fresh
|
||||||
|
return fresh
|
||||||
|
|
||||||
|
def format_for_prompt(self, user_id: str) -> str:
|
||||||
|
"""EXAONE에 전달할 대화 이력 포맷."""
|
||||||
|
msgs = self.get(user_id)
|
||||||
|
if not msgs:
|
||||||
|
return ""
|
||||||
|
lines = []
|
||||||
|
for m in msgs[-6:]: # 최근 6개만
|
||||||
|
prefix = "사용자" if m.role == "user" else "이드"
|
||||||
|
lines.append(f"{prefix}: {m.content}")
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
conversation_store = ConversationStore()
|
||||||
@@ -31,14 +31,16 @@ class ModelAdapter:
|
|||||||
self.temperature = temperature
|
self.temperature = temperature
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
|
|
||||||
async def stream_chat(self, message: str) -> AsyncGenerator[str, None]:
|
async def stream_chat(self, message: str, *, messages: list[dict] | None = None) -> AsyncGenerator[str, None]:
|
||||||
"""스트리밍 호출. content chunk를 yield."""
|
"""스트리밍 호출. content chunk를 yield. messages 직접 전달 가능."""
|
||||||
payload = {
|
if messages is None:
|
||||||
"model": self.model,
|
messages = [
|
||||||
"messages": [
|
|
||||||
{"role": "system", "content": self.system_prompt},
|
{"role": "system", "content": self.system_prompt},
|
||||||
{"role": "user", "content": message},
|
{"role": "user", "content": message},
|
||||||
],
|
]
|
||||||
|
payload = {
|
||||||
|
"model": self.model,
|
||||||
|
"messages": messages,
|
||||||
"stream": True,
|
"stream": True,
|
||||||
"temperature": self.temperature,
|
"temperature": self.temperature,
|
||||||
}
|
}
|
||||||
@@ -78,14 +80,16 @@ class ModelAdapter:
|
|||||||
logger.error("%s read timeout", self.name)
|
logger.error("%s read timeout", self.name)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def complete_chat(self, message: str) -> str:
|
async def complete_chat(self, message: str, *, messages: list[dict] | None = None) -> str:
|
||||||
"""비스트리밍 호출. 전체 응답 텍스트 반환."""
|
"""비스트리밍 호출. 전체 응답 텍스트 반환. messages 직접 전달 가능."""
|
||||||
payload = {
|
if messages is None:
|
||||||
"model": self.model,
|
messages = [
|
||||||
"messages": [
|
|
||||||
{"role": "system", "content": self.system_prompt},
|
{"role": "system", "content": self.system_prompt},
|
||||||
{"role": "user", "content": message},
|
{"role": "user", "content": message},
|
||||||
],
|
]
|
||||||
|
payload = {
|
||||||
|
"model": self.model,
|
||||||
|
"messages": messages,
|
||||||
"stream": False,
|
"stream": False,
|
||||||
"temperature": self.temperature,
|
"temperature": self.temperature,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
"""Worker — 2단계 파이프라인: EXAONE rewrite → Gemma reasoning (cancel-safe + fallback)."""
|
"""Worker — EXAONE 분류 → direct/route/clarify 분기 (cancel-safe + fallback)."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
@@ -10,6 +11,7 @@ from config import settings
|
|||||||
from db.database import log_completion, log_request
|
from db.database import log_completion, log_request
|
||||||
from models.schemas import JobStatus
|
from models.schemas import JobStatus
|
||||||
from services.backend_registry import backend_registry
|
from services.backend_registry import backend_registry
|
||||||
|
from services.conversation import conversation_store
|
||||||
from services.job_manager import Job, job_manager
|
from services.job_manager import Job, job_manager
|
||||||
from services.state_stream import state_stream
|
from services.state_stream import state_stream
|
||||||
from services.synology_sender import send_to_synology
|
from services.synology_sender import send_to_synology
|
||||||
@@ -17,38 +19,38 @@ from services.synology_sender import send_to_synology
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
HEARTBEAT_INTERVAL = 4.0
|
HEARTBEAT_INTERVAL = 4.0
|
||||||
REWRITE_HEARTBEAT = 2.0
|
CLASSIFY_HEARTBEAT = 2.0
|
||||||
MAX_REWRITE_LENGTH = 1000
|
MAX_PROMPT_LENGTH = 1000
|
||||||
SYNOLOGY_MAX_LEN = 1500
|
SYNOLOGY_MAX_LEN = 1500
|
||||||
|
|
||||||
|
|
||||||
async def _complete_with_heartbeat(adapter, message: str, job_id: str) -> str:
|
async def _complete_with_heartbeat(adapter, message: str, job_id: str, *, messages=None, beat_msg="처리 중...") -> str:
|
||||||
"""complete_chat + heartbeat 병행. rewrite 대기 중 사용자 체감 멈춤 방지."""
|
"""complete_chat + heartbeat 병행."""
|
||||||
result_holder: dict[str, str] = {}
|
result_holder: dict[str, str] = {}
|
||||||
exc_holder: list[Exception] = []
|
exc_holder: list[Exception] = []
|
||||||
|
|
||||||
async def call():
|
async def call():
|
||||||
try:
|
try:
|
||||||
result_holder["text"] = await adapter.complete_chat(message)
|
result_holder["text"] = await adapter.complete_chat(message, messages=messages)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
exc_holder.append(e)
|
exc_holder.append(e)
|
||||||
|
|
||||||
task = asyncio.create_task(call())
|
task = asyncio.create_task(call())
|
||||||
while not task.done():
|
while not task.done():
|
||||||
await asyncio.sleep(REWRITE_HEARTBEAT)
|
await asyncio.sleep(CLASSIFY_HEARTBEAT)
|
||||||
if not task.done():
|
if not task.done():
|
||||||
await state_stream.push(job_id, "processing", {"message": "질문을 재구성하고 있습니다..."})
|
await state_stream.push(job_id, "processing", {"message": beat_msg})
|
||||||
|
|
||||||
if exc_holder:
|
if exc_holder:
|
||||||
raise exc_holder[0]
|
raise exc_holder[0]
|
||||||
return result_holder.get("text", "")
|
return result_holder.get("text", "")
|
||||||
|
|
||||||
|
|
||||||
async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str]) -> bool:
|
async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str], *, messages=None) -> bool:
|
||||||
"""스트리밍 + cancel 체크. 정상 완료 시 True, cancel 시 False."""
|
"""스트리밍 + cancel 체크."""
|
||||||
last_heartbeat = asyncio.get_event_loop().time()
|
last_heartbeat = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
async for chunk in adapter.stream_chat(message):
|
async for chunk in adapter.stream_chat(message, messages=messages):
|
||||||
if job.status == JobStatus.cancelled:
|
if job.status == JobStatus.cancelled:
|
||||||
return False
|
return False
|
||||||
collected.append(chunk)
|
collected.append(chunk)
|
||||||
@@ -62,68 +64,108 @@ async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[s
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_classification(raw: str) -> dict:
|
||||||
|
"""EXAONE JSON 응답 파싱. 실패 시 direct fallback."""
|
||||||
|
raw = raw.strip()
|
||||||
|
# JSON 블록 추출 (```json ... ``` 감싸는 경우 대응)
|
||||||
|
if "```" in raw:
|
||||||
|
start = raw.find("{")
|
||||||
|
end = raw.rfind("}") + 1
|
||||||
|
if start >= 0 and end > start:
|
||||||
|
raw = raw[start:end]
|
||||||
|
try:
|
||||||
|
result = json.loads(raw)
|
||||||
|
if "action" in result:
|
||||||
|
return result
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
pass
|
||||||
|
# JSON 파싱 실패 → direct로 취급 (raw 텍스트가 직접 응답)
|
||||||
|
return {"action": "direct", "response": raw, "prompt": ""}
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_callback(job: Job, text: str) -> None:
|
||||||
|
"""Synology callback이면 전송."""
|
||||||
|
if job.callback == "synology":
|
||||||
|
if len(text) > SYNOLOGY_MAX_LEN:
|
||||||
|
text = text[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)"
|
||||||
|
await send_to_synology(text)
|
||||||
|
|
||||||
|
|
||||||
async def run(job: Job) -> None:
|
async def run(job: Job) -> None:
|
||||||
"""EXAONE rewrite → Gemma reasoning 파이프라인 (fallback + cancel-safe)."""
|
"""EXAONE 분류 → direct/route/clarify 분기."""
|
||||||
start_time = time()
|
start_time = time()
|
||||||
rewrite_model = None
|
user_id = job.callback_meta.get("user_id", "api")
|
||||||
|
classify_model = None
|
||||||
reasoning_model = None
|
reasoning_model = None
|
||||||
rewritten_message = ""
|
rewritten_message = ""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await log_request(job.id, job.message, "pipeline", job.created_at)
|
await log_request(job.id, job.message, "classify", job.created_at)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warning("Failed to log request for job %s", job.id, exc_info=True)
|
logger.warning("Failed to log request for job %s", job.id, exc_info=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# --- ACK ---
|
# --- ACK ---
|
||||||
await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."})
|
await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다."})
|
||||||
job_manager.set_status(job.id, JobStatus.processing)
|
job_manager.set_status(job.id, JobStatus.processing)
|
||||||
|
|
||||||
# --- Cancel 체크 #1 ---
|
|
||||||
if job.status == JobStatus.cancelled:
|
if job.status == JobStatus.cancelled:
|
||||||
return
|
return
|
||||||
|
|
||||||
use_pipeline = settings.pipeline_enabled and backend_registry.is_healthy("reasoner")
|
classify_model = backend_registry.classifier.model
|
||||||
|
|
||||||
|
# --- 대화 이력 포함하여 분류 요청 ---
|
||||||
|
history = conversation_store.format_for_prompt(user_id)
|
||||||
|
classify_input = job.message
|
||||||
|
if history:
|
||||||
|
classify_input = f"[대화 이력]\n{history}\n\n[현재 메시지]\n{job.message}"
|
||||||
|
|
||||||
|
await state_stream.push(job.id, "processing", {"message": "메시지를 분석하고 있습니다..."})
|
||||||
|
classify_start = time()
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw_result = await _complete_with_heartbeat(
|
||||||
|
backend_registry.classifier, classify_input, job.id,
|
||||||
|
beat_msg="메시지를 분석하고 있습니다..."
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.warning("Classification failed for job %s, falling back to direct", job.id)
|
||||||
|
raw_result = ""
|
||||||
|
|
||||||
|
classify_latency = (time() - classify_start) * 1000
|
||||||
|
classification = _parse_classification(raw_result)
|
||||||
|
action = classification.get("action", "direct")
|
||||||
|
response_text = classification.get("response", "")
|
||||||
|
route_prompt = classification.get("prompt", "")
|
||||||
|
|
||||||
|
logger.info("Job %s classified as '%s'", job.id, action)
|
||||||
|
|
||||||
|
# 대화 기록: 사용자 메시지
|
||||||
|
conversation_store.add(user_id, "user", job.message)
|
||||||
|
|
||||||
collected: list[str] = []
|
collected: list[str] = []
|
||||||
|
|
||||||
if not use_pipeline:
|
if job.status == JobStatus.cancelled:
|
||||||
# === EXAONE 단독 모드 (Phase 1 fallback) ===
|
return
|
||||||
rewrite_model = backend_registry.rewriter.model
|
|
||||||
await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."})
|
|
||||||
|
|
||||||
ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected)
|
if action == "clarify":
|
||||||
if not ok:
|
# === CLARIFY: 추가 질문 ===
|
||||||
return
|
collected.append(response_text)
|
||||||
else:
|
await state_stream.push(job.id, "result", {"content": response_text})
|
||||||
# === 파이프라인 모드: EXAONE rewrite → Gemma reasoning ===
|
conversation_store.add(user_id, "assistant", response_text)
|
||||||
rewrite_model = backend_registry.rewriter.model
|
|
||||||
|
elif action == "route" and settings.pipeline_enabled and backend_registry.is_healthy("reasoner"):
|
||||||
|
# === ROUTE: Gemma reasoning ===
|
||||||
reasoning_model = backend_registry.reasoner.model
|
reasoning_model = backend_registry.reasoner.model
|
||||||
|
rewritten_message = (route_prompt or job.message)[:MAX_PROMPT_LENGTH]
|
||||||
# --- Rewrite ---
|
|
||||||
await state_stream.push(job.id, "processing", {"message": "질문을 재구성하고 있습니다..."})
|
|
||||||
rewrite_start = time()
|
|
||||||
|
|
||||||
try:
|
|
||||||
rewritten_message = await _complete_with_heartbeat(
|
|
||||||
backend_registry.rewriter, job.message, job.id
|
|
||||||
)
|
|
||||||
rewritten_message = rewritten_message[:MAX_REWRITE_LENGTH]
|
|
||||||
except Exception:
|
|
||||||
logger.warning("Rewrite failed for job %s, using original message", job.id)
|
|
||||||
rewritten_message = job.message
|
|
||||||
|
|
||||||
rewrite_latency = (time() - rewrite_start) * 1000
|
|
||||||
job.rewritten_message = rewritten_message
|
job.rewritten_message = rewritten_message
|
||||||
|
|
||||||
# --- Rewrite 결과 SSE 노출 (Synology에서는 숨김) ---
|
|
||||||
if job.callback != "synology":
|
if job.callback != "synology":
|
||||||
await state_stream.push(job.id, "rewrite", {"content": rewritten_message})
|
await state_stream.push(job.id, "rewrite", {"content": rewritten_message})
|
||||||
|
|
||||||
# --- Cancel 체크 #2 ---
|
|
||||||
if job.status == JobStatus.cancelled:
|
if job.status == JobStatus.cancelled:
|
||||||
return
|
return
|
||||||
|
|
||||||
# --- Reasoning ---
|
|
||||||
await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."})
|
await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -131,47 +173,55 @@ async def run(job: Job) -> None:
|
|||||||
if not ok:
|
if not ok:
|
||||||
return
|
return
|
||||||
except Exception:
|
except Exception:
|
||||||
# Gemma streaming 중간 실패 → EXAONE fallback
|
logger.warning("Reasoner failed for job %s, falling back to EXAONE", job.id, exc_info=True)
|
||||||
logger.warning("Reasoner failed for job %s, falling back to rewriter", job.id, exc_info=True)
|
|
||||||
|
|
||||||
if job.status == JobStatus.cancelled:
|
if job.status == JobStatus.cancelled:
|
||||||
return
|
return
|
||||||
|
|
||||||
await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."})
|
await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."})
|
||||||
reasoning_model = rewrite_model # fallback 기록
|
reasoning_model = classify_model
|
||||||
|
ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected)
|
||||||
ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected)
|
|
||||||
if not ok:
|
if not ok:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if collected:
|
||||||
|
conversation_store.add(user_id, "assistant", "".join(collected))
|
||||||
|
|
||||||
|
else:
|
||||||
|
# === DIRECT: EXAONE 직접 응답 ===
|
||||||
|
if response_text:
|
||||||
|
# 분류기가 이미 응답을 생성함
|
||||||
|
collected.append(response_text)
|
||||||
|
await state_stream.push(job.id, "result", {"content": response_text})
|
||||||
|
else:
|
||||||
|
# 분류 실패 → EXAONE 스트리밍으로 직접 응답
|
||||||
|
await state_stream.push(job.id, "processing", {"message": "응답을 생성하고 있습니다..."})
|
||||||
|
ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected)
|
||||||
|
if not ok:
|
||||||
|
return
|
||||||
|
|
||||||
|
if collected:
|
||||||
|
conversation_store.add(user_id, "assistant", "".join(collected))
|
||||||
|
|
||||||
# --- Complete ---
|
# --- Complete ---
|
||||||
if not collected:
|
if not collected:
|
||||||
job_manager.set_status(job.id, JobStatus.failed)
|
job_manager.set_status(job.id, JobStatus.failed)
|
||||||
await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."})
|
await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."})
|
||||||
status = "failed"
|
status = "failed"
|
||||||
if job.callback == "synology":
|
await _send_callback(job, "⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.")
|
||||||
await send_to_synology("⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.")
|
|
||||||
else:
|
else:
|
||||||
job_manager.set_status(job.id, JobStatus.completed)
|
job_manager.set_status(job.id, JobStatus.completed)
|
||||||
await state_stream.push(job.id, "done", {"message": "완료"})
|
await state_stream.push(job.id, "done", {"message": "완료"})
|
||||||
status = "completed"
|
status = "completed"
|
||||||
# Synology callback: 결과 전송
|
await _send_callback(job, "".join(collected))
|
||||||
if job.callback == "synology":
|
|
||||||
full_response = "".join(collected)
|
|
||||||
if len(full_response) > SYNOLOGY_MAX_LEN:
|
|
||||||
full_response = full_response[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)"
|
|
||||||
await send_to_synology(full_response)
|
|
||||||
|
|
||||||
# --- DB 로깅 ---
|
# --- DB 로깅 ---
|
||||||
latency_ms = (time() - start_time) * 1000
|
latency_ms = (time() - start_time) * 1000
|
||||||
response_text = "".join(collected)
|
|
||||||
try:
|
try:
|
||||||
await log_completion(
|
await log_completion(
|
||||||
job.id, status, len(response_text), latency_ms, time(),
|
job.id, status, len("".join(collected)), latency_ms, time(),
|
||||||
rewrite_model=rewrite_model,
|
rewrite_model=classify_model,
|
||||||
reasoning_model=reasoning_model,
|
reasoning_model=reasoning_model,
|
||||||
rewritten_message=rewritten_message,
|
rewritten_message=rewritten_message,
|
||||||
rewrite_latency_ms=rewrite_latency if use_pipeline else 0,
|
rewrite_latency_ms=classify_latency,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warning("Failed to log completion for job %s", job.id, exc_info=True)
|
logger.warning("Failed to log completion for job %s", job.id, exc_info=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user