From 21f68698983c456086f8f656ec6e4ee5cc58a14a Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Mon, 6 Apr 2026 12:40:39 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20EXAONE=20=EB=B6=84=EB=A5=98=EA=B8=B0=20?= =?UTF-8?q?=E2=80=94=20direct/route/clarify=20=EB=9D=BC=EC=9A=B0=ED=8C=85?= =?UTF-8?q?=20+=20=EB=8C=80=ED=99=94=20=EA=B8=B0=EC=96=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - EXAONE: 분류기+프롬프트엔지니어+직접응답 (JSON 출력) - 간단한 질문은 EXAONE이 직접 답변 (파이프라인 스킵) - 복잡한 질문은 AI 최적화 프롬프트로 Gemma에 전달 - 모호한 질문은 사용자에게 추가 질문 (clarify) - user별 최근 대화 기억 (최대 10개, 1시간 TTL) - ModelAdapter: messages 직접 전달 옵션 추가 Co-Authored-By: Claude Opus 4.6 (1M context) --- nanoclaude/services/backend_registry.py | 47 ++++--- nanoclaude/services/conversation.py | 52 +++++++ nanoclaude/services/model_adapter.py | 28 ++-- nanoclaude/services/worker.py | 178 +++++++++++++++--------- 4 files changed, 208 insertions(+), 97 deletions(-) create mode 100644 nanoclaude/services/conversation.py diff --git a/nanoclaude/services/backend_registry.py b/nanoclaude/services/backend_registry.py index aacc134..ac196b7 100644 --- a/nanoclaude/services/backend_registry.py +++ b/nanoclaude/services/backend_registry.py @@ -10,40 +10,45 @@ from services.model_adapter import ModelAdapter logger = logging.getLogger(__name__) -REWRITER_PROMPT = ( - "너는 질문 재구성 전문가다. " - "사용자의 질문을 분석하여 의도를 명확히 하고, 더 명확한 질문으로 재작성하라. " - "재구성된 질문만 출력하라. 부연 설명이나 답변은 절대 하지 마라. " - "인사, 잡담, 간단한 질문, 1~2문장 질문은 원문 그대로 출력하라. " - "복잡하거나 모호한 질문만 재구성하라." -) +CLASSIFIER_PROMPT = """\ +너는 AI 라우터다. 사용자 메시지를 분석하여 JSON으로 응답하라. +반드시 아래 3가지 action 중 하나를 선택하라: + +1. "direct" — 인사, 잡담, 간단한 질문, 자기소개 요청 등. 네가 직접 답변한다. +2. "route" — 복잡한 질문, 분석, 설명, 코딩 등. 추론 모델에게 넘긴다. 이때 prompt 필드에 추론 모델이 이해하기 좋게 정리된 프롬프트를 작성하라. +3. "clarify" — 질문이 모호하거나 정보가 부족할 때. 사용자에게 추가 질문한다. + +반드시 아래 JSON 형식으로만 응답하라. JSON 외 텍스트는 절대 출력하지 마라: +{"action": "direct|route|clarify", "response": "direct/clarify일 때 사용자에게 보낼 텍스트", "prompt": "route일 때 추론 모델에게 보낼 프롬프트"} + +너의 이름은 '이드'이고, 상냥하고 친근하게 대화한다. +너는 GPU 서버의 EXAONE 모델과 맥미니의 Gemma4 모델로 구성된 NanoClaude 파이프라인에서 돌아간다. +대화 이력이 있으면 맥락을 고려하라.\ +""" REASONER_PROMPT = ( "너는 '이드'라는 이름의 상냥하고 친근한 AI 어시스턴트야. " - "너는 GPU 서버의 EXAONE 모델(질문 정리)과 맥미니의 Gemma4 모델(답변 생성)로 구성된 " - "NanoClaude 파이프라인 위에서 돌아가고 있어. " - "간결하고 자연스럽게 대화해. 인사에는 짧게 인사로 답하고, " + "간결하고 자연스럽게 대화해. " "질문에는 핵심만 명확하게 답해. " - "불필요한 구조화(번호 매기기, 헤더, 마크다운)는 피하고, 대화하듯 편하게 답변해. " - "너 자신에 대한 질문에는 솔직하게 답해." + "불필요한 구조화(번호 매기기, 헤더, 마크다운)는 피하고, 대화하듯 편하게 답변해." ) class BackendRegistry: def __init__(self) -> None: - self.rewriter: ModelAdapter | None = None - self.reasoner: ModelAdapter | None = None - self._health: dict[str, bool] = {"rewriter": False, "reasoner": False} - self._latency: dict[str, float] = {"rewriter": 0.0, "reasoner": 0.0} + self.classifier: ModelAdapter | None = None # EXAONE: 분류 + 직접응답 + self.reasoner: ModelAdapter | None = None # Gemma4: 추론 + self._health: dict[str, bool] = {"classifier": False, "reasoner": False} + self._latency: dict[str, float] = {"classifier": 0.0, "reasoner": 0.0} self._health_task: asyncio.Task | None = None def init_from_settings(self, settings) -> None: - self.rewriter = ModelAdapter( + self.classifier = ModelAdapter( name="EXAONE", base_url=settings.exaone_base_url, model=settings.exaone_model, - system_prompt=REWRITER_PROMPT, - temperature=settings.exaone_temperature, + system_prompt=CLASSIFIER_PROMPT, + temperature=0.3, # 분류는 낮은 temperature timeout=settings.exaone_timeout, ) self.reasoner = ModelAdapter( @@ -68,7 +73,7 @@ class BackendRegistry: await asyncio.sleep(interval) async def _check_all(self) -> None: - for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]: + for role, adapter in [("classifier", self.classifier), ("reasoner", self.reasoner)]: if not adapter: continue start = time.monotonic() @@ -86,7 +91,7 @@ class BackendRegistry: def health_summary(self) -> dict: result = {} - for role, adapter in [("rewriter", self.rewriter), ("reasoner", self.reasoner)]: + for role, adapter in [("classifier", self.classifier), ("reasoner", self.reasoner)]: if adapter: result[role] = { "name": adapter.name, diff --git a/nanoclaude/services/conversation.py b/nanoclaude/services/conversation.py new file mode 100644 index 0000000..cb719c8 --- /dev/null +++ b/nanoclaude/services/conversation.py @@ -0,0 +1,52 @@ +"""Conversation — user별 최근 대화 기억.""" + +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass, field +from time import time + +MAX_HISTORY = 10 # user당 최근 대화 수 +HISTORY_TTL = 3600.0 # 1시간 이후 대화 만료 + + +@dataclass +class Message: + role: str # "user" | "assistant" + content: str + timestamp: float = field(default_factory=time) + + +class ConversationStore: + def __init__(self) -> None: + self._history: dict[str, list[Message]] = defaultdict(list) + + def add(self, user_id: str, role: str, content: str) -> None: + msgs = self._history[user_id] + msgs.append(Message(role=role, content=content)) + # 최대 개수 제한 + if len(msgs) > MAX_HISTORY: + self._history[user_id] = msgs[-MAX_HISTORY:] + + def get(self, user_id: str) -> list[Message]: + """만료되지 않은 최근 대화 반환.""" + now = time() + msgs = self._history.get(user_id, []) + # TTL 필터 + fresh = [m for m in msgs if now - m.timestamp < HISTORY_TTL] + self._history[user_id] = fresh + return fresh + + def format_for_prompt(self, user_id: str) -> str: + """EXAONE에 전달할 대화 이력 포맷.""" + msgs = self.get(user_id) + if not msgs: + return "" + lines = [] + for m in msgs[-6:]: # 최근 6개만 + prefix = "사용자" if m.role == "user" else "이드" + lines.append(f"{prefix}: {m.content}") + return "\n".join(lines) + + +conversation_store = ConversationStore() diff --git a/nanoclaude/services/model_adapter.py b/nanoclaude/services/model_adapter.py index 7885e3a..5b289c2 100644 --- a/nanoclaude/services/model_adapter.py +++ b/nanoclaude/services/model_adapter.py @@ -31,14 +31,16 @@ class ModelAdapter: self.temperature = temperature self.timeout = timeout - async def stream_chat(self, message: str) -> AsyncGenerator[str, None]: - """스트리밍 호출. content chunk를 yield.""" - payload = { - "model": self.model, - "messages": [ + async def stream_chat(self, message: str, *, messages: list[dict] | None = None) -> AsyncGenerator[str, None]: + """스트리밍 호출. content chunk를 yield. messages 직접 전달 가능.""" + if messages is None: + messages = [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": message}, - ], + ] + payload = { + "model": self.model, + "messages": messages, "stream": True, "temperature": self.temperature, } @@ -78,14 +80,16 @@ class ModelAdapter: logger.error("%s read timeout", self.name) raise - async def complete_chat(self, message: str) -> str: - """비스트리밍 호출. 전체 응답 텍스트 반환.""" - payload = { - "model": self.model, - "messages": [ + async def complete_chat(self, message: str, *, messages: list[dict] | None = None) -> str: + """비스트리밍 호출. 전체 응답 텍스트 반환. messages 직접 전달 가능.""" + if messages is None: + messages = [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": message}, - ], + ] + payload = { + "model": self.model, + "messages": messages, "stream": False, "temperature": self.temperature, } diff --git a/nanoclaude/services/worker.py b/nanoclaude/services/worker.py index 079ce3b..8892aaa 100644 --- a/nanoclaude/services/worker.py +++ b/nanoclaude/services/worker.py @@ -1,8 +1,9 @@ -"""Worker — 2단계 파이프라인: EXAONE rewrite → Gemma reasoning (cancel-safe + fallback).""" +"""Worker — EXAONE 분류 → direct/route/clarify 분기 (cancel-safe + fallback).""" from __future__ import annotations import asyncio +import json import logging from time import time @@ -10,6 +11,7 @@ from config import settings from db.database import log_completion, log_request from models.schemas import JobStatus from services.backend_registry import backend_registry +from services.conversation import conversation_store from services.job_manager import Job, job_manager from services.state_stream import state_stream from services.synology_sender import send_to_synology @@ -17,38 +19,38 @@ from services.synology_sender import send_to_synology logger = logging.getLogger(__name__) HEARTBEAT_INTERVAL = 4.0 -REWRITE_HEARTBEAT = 2.0 -MAX_REWRITE_LENGTH = 1000 +CLASSIFY_HEARTBEAT = 2.0 +MAX_PROMPT_LENGTH = 1000 SYNOLOGY_MAX_LEN = 1500 -async def _complete_with_heartbeat(adapter, message: str, job_id: str) -> str: - """complete_chat + heartbeat 병행. rewrite 대기 중 사용자 체감 멈춤 방지.""" +async def _complete_with_heartbeat(adapter, message: str, job_id: str, *, messages=None, beat_msg="처리 중...") -> str: + """complete_chat + heartbeat 병행.""" result_holder: dict[str, str] = {} exc_holder: list[Exception] = [] async def call(): try: - result_holder["text"] = await adapter.complete_chat(message) + result_holder["text"] = await adapter.complete_chat(message, messages=messages) except Exception as e: exc_holder.append(e) task = asyncio.create_task(call()) while not task.done(): - await asyncio.sleep(REWRITE_HEARTBEAT) + await asyncio.sleep(CLASSIFY_HEARTBEAT) if not task.done(): - await state_stream.push(job_id, "processing", {"message": "질문을 재구성하고 있습니다..."}) + await state_stream.push(job_id, "processing", {"message": beat_msg}) if exc_holder: raise exc_holder[0] return result_holder.get("text", "") -async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str]) -> bool: - """스트리밍 + cancel 체크. 정상 완료 시 True, cancel 시 False.""" +async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[str], *, messages=None) -> bool: + """스트리밍 + cancel 체크.""" last_heartbeat = asyncio.get_event_loop().time() - async for chunk in adapter.stream_chat(message): + async for chunk in adapter.stream_chat(message, messages=messages): if job.status == JobStatus.cancelled: return False collected.append(chunk) @@ -62,68 +64,108 @@ async def _stream_with_cancel(adapter, message: str, job: Job, collected: list[s return True +def _parse_classification(raw: str) -> dict: + """EXAONE JSON 응답 파싱. 실패 시 direct fallback.""" + raw = raw.strip() + # JSON 블록 추출 (```json ... ``` 감싸는 경우 대응) + if "```" in raw: + start = raw.find("{") + end = raw.rfind("}") + 1 + if start >= 0 and end > start: + raw = raw[start:end] + try: + result = json.loads(raw) + if "action" in result: + return result + except json.JSONDecodeError: + pass + # JSON 파싱 실패 → direct로 취급 (raw 텍스트가 직접 응답) + return {"action": "direct", "response": raw, "prompt": ""} + + +async def _send_callback(job: Job, text: str) -> None: + """Synology callback이면 전송.""" + if job.callback == "synology": + if len(text) > SYNOLOGY_MAX_LEN: + text = text[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)" + await send_to_synology(text) + + async def run(job: Job) -> None: - """EXAONE rewrite → Gemma reasoning 파이프라인 (fallback + cancel-safe).""" + """EXAONE 분류 → direct/route/clarify 분기.""" start_time = time() - rewrite_model = None + user_id = job.callback_meta.get("user_id", "api") + classify_model = None reasoning_model = None rewritten_message = "" try: - await log_request(job.id, job.message, "pipeline", job.created_at) + await log_request(job.id, job.message, "classify", job.created_at) except Exception: logger.warning("Failed to log request for job %s", job.id, exc_info=True) try: # --- ACK --- - await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다. 분석을 시작합니다."}) + await state_stream.push(job.id, "ack", {"message": "요청을 확인했습니다."}) job_manager.set_status(job.id, JobStatus.processing) - # --- Cancel 체크 #1 --- if job.status == JobStatus.cancelled: return - use_pipeline = settings.pipeline_enabled and backend_registry.is_healthy("reasoner") + classify_model = backend_registry.classifier.model + + # --- 대화 이력 포함하여 분류 요청 --- + history = conversation_store.format_for_prompt(user_id) + classify_input = job.message + if history: + classify_input = f"[대화 이력]\n{history}\n\n[현재 메시지]\n{job.message}" + + await state_stream.push(job.id, "processing", {"message": "메시지를 분석하고 있습니다..."}) + classify_start = time() + + try: + raw_result = await _complete_with_heartbeat( + backend_registry.classifier, classify_input, job.id, + beat_msg="메시지를 분석하고 있습니다..." + ) + except Exception: + logger.warning("Classification failed for job %s, falling back to direct", job.id) + raw_result = "" + + classify_latency = (time() - classify_start) * 1000 + classification = _parse_classification(raw_result) + action = classification.get("action", "direct") + response_text = classification.get("response", "") + route_prompt = classification.get("prompt", "") + + logger.info("Job %s classified as '%s'", job.id, action) + + # 대화 기록: 사용자 메시지 + conversation_store.add(user_id, "user", job.message) + collected: list[str] = [] - if not use_pipeline: - # === EXAONE 단독 모드 (Phase 1 fallback) === - rewrite_model = backend_registry.rewriter.model - await state_stream.push(job.id, "processing", {"message": "EXAONE 모델이 응답을 생성하고 있습니다..."}) + if job.status == JobStatus.cancelled: + return - ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected) - if not ok: - return - else: - # === 파이프라인 모드: EXAONE rewrite → Gemma reasoning === - rewrite_model = backend_registry.rewriter.model + if action == "clarify": + # === CLARIFY: 추가 질문 === + collected.append(response_text) + await state_stream.push(job.id, "result", {"content": response_text}) + conversation_store.add(user_id, "assistant", response_text) + + elif action == "route" and settings.pipeline_enabled and backend_registry.is_healthy("reasoner"): + # === ROUTE: Gemma reasoning === reasoning_model = backend_registry.reasoner.model - - # --- Rewrite --- - await state_stream.push(job.id, "processing", {"message": "질문을 재구성하고 있습니다..."}) - rewrite_start = time() - - try: - rewritten_message = await _complete_with_heartbeat( - backend_registry.rewriter, job.message, job.id - ) - rewritten_message = rewritten_message[:MAX_REWRITE_LENGTH] - except Exception: - logger.warning("Rewrite failed for job %s, using original message", job.id) - rewritten_message = job.message - - rewrite_latency = (time() - rewrite_start) * 1000 + rewritten_message = (route_prompt or job.message)[:MAX_PROMPT_LENGTH] job.rewritten_message = rewritten_message - # --- Rewrite 결과 SSE 노출 (Synology에서는 숨김) --- if job.callback != "synology": await state_stream.push(job.id, "rewrite", {"content": rewritten_message}) - # --- Cancel 체크 #2 --- if job.status == JobStatus.cancelled: return - # --- Reasoning --- await state_stream.push(job.id, "processing", {"message": "Gemma 4가 응답을 생성하고 있습니다..."}) try: @@ -131,47 +173,55 @@ async def run(job: Job) -> None: if not ok: return except Exception: - # Gemma streaming 중간 실패 → EXAONE fallback - logger.warning("Reasoner failed for job %s, falling back to rewriter", job.id, exc_info=True) - + logger.warning("Reasoner failed for job %s, falling back to EXAONE", job.id, exc_info=True) if job.status == JobStatus.cancelled: return - await state_stream.push(job.id, "processing", {"message": "모델 전환 중..."}) - reasoning_model = rewrite_model # fallback 기록 - - ok = await _stream_with_cancel(backend_registry.rewriter, job.message, job, collected) + reasoning_model = classify_model + ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected) if not ok: return + if collected: + conversation_store.add(user_id, "assistant", "".join(collected)) + + else: + # === DIRECT: EXAONE 직접 응답 === + if response_text: + # 분류기가 이미 응답을 생성함 + collected.append(response_text) + await state_stream.push(job.id, "result", {"content": response_text}) + else: + # 분류 실패 → EXAONE 스트리밍으로 직접 응답 + await state_stream.push(job.id, "processing", {"message": "응답을 생성하고 있습니다..."}) + ok = await _stream_with_cancel(backend_registry.classifier, job.message, job, collected) + if not ok: + return + + if collected: + conversation_store.add(user_id, "assistant", "".join(collected)) + # --- Complete --- if not collected: job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."}) status = "failed" - if job.callback == "synology": - await send_to_synology("⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.") + await _send_callback(job, "⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.") else: job_manager.set_status(job.id, JobStatus.completed) await state_stream.push(job.id, "done", {"message": "완료"}) status = "completed" - # Synology callback: 결과 전송 - if job.callback == "synology": - full_response = "".join(collected) - if len(full_response) > SYNOLOGY_MAX_LEN: - full_response = full_response[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)" - await send_to_synology(full_response) + await _send_callback(job, "".join(collected)) # --- DB 로깅 --- latency_ms = (time() - start_time) * 1000 - response_text = "".join(collected) try: await log_completion( - job.id, status, len(response_text), latency_ms, time(), - rewrite_model=rewrite_model, + job.id, status, len("".join(collected)), latency_ms, time(), + rewrite_model=classify_model, reasoning_model=reasoning_model, rewritten_message=rewritten_message, - rewrite_latency_ms=rewrite_latency if use_pipeline else 0, + rewrite_latency_ms=classify_latency, ) except Exception: logger.warning("Failed to log completion for job %s", job.id, exc_info=True)