async def save_conversation_message(user_id: str, role: str, content: str, created_at: float) -> None:
    """Persist one conversation message.

    Args:
        user_id: Owner of the conversation.
        role: Speaker role stored verbatim in the ``role`` column.
        content: Message text.
        created_at: Timestamp stored in the REAL ``created_at`` column.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(
            "INSERT INTO conversation_messages (user_id, role, content, created_at) VALUES (?, ?, ?, ?)",
            (user_id, role, content, created_at),
        )
        await db.commit()


async def load_conversation_messages(user_id: str, since: float, limit: int = 40) -> list[dict]:
    """Load the most recent messages of ``user_id``, returned oldest first.

    Args:
        user_id: Owner of the conversation.
        since: Lower bound (inclusive) for ``created_at``.
        limit: Maximum number of rows to return; the newest rows win.

    Returns:
        A list of ``{"role", "content", "created_at"}`` dicts in chronological order.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT role, content, created_at FROM conversation_messages "
            "WHERE user_id=? AND created_at >= ? "
            # The AUTOINCREMENT id breaks ties between rows that share a
            # timestamp; without it SQLite may return them in any order, which
            # can swap messages and makes the LIMIT cut-off arbitrary.
            "ORDER BY created_at DESC, id DESC LIMIT ?",
            (user_id, since, limit),
        )
        rows = await cursor.fetchall()
    # The query yields newest first; flip it so callers get chronological order.
    return [
        {"role": row["role"], "content": row["content"], "created_at": row["created_at"]}
        for row in reversed(rows)
    ]


async def cleanup_old_conversations(days: int = 7) -> int:
    """Delete conversation messages older than ``days`` days.

    Args:
        days: Retention window in days, measured back from the current time.

    Returns:
        The number of rows deleted.
    """
    cutoff = time() - (days * 86400)  # 86400 seconds per day
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute("DELETE FROM conversation_messages WHERE created_at < ?", (cutoff,))
        await db.commit()
        return cursor.rowcount
async def add(self, user_id: str, role: str, content: str) -> None:
    """Append a message to the in-memory cache and write it through to the DB.

    Args:
        user_id: Owner of the conversation.
        role: Speaker role of the message.
        content: Message text.

    A DB failure is logged and swallowed, so the message still stays in the
    in-memory cache.
    """
    from db.database import save_conversation_message

    msg = Message(role=role, content=content)

    # In-memory cache, capped at the MAX_HISTORY most recent messages.
    msgs = self._history[user_id]
    msgs.append(msg)
    if len(msgs) > MAX_HISTORY:
        self._history[user_id] = msgs[-MAX_HISTORY:]

    # Write-through; persistence is best-effort.
    try:
        await save_conversation_message(user_id, role, content, msg.timestamp)
    except Exception:
        logger.warning("Failed to persist conversation for %s", user_id, exc_info=True)


async def get(self, user_id: str) -> list[Message]:
    """Return the non-expired recent messages of ``user_id``.

    Messages come from the in-memory cache. When the cache holds nothing
    fresh and the user has not been lazily loaded yet, the last HISTORY_TTL
    seconds of messages are read from the DB (at most DB_LOAD_LIMIT) and
    cached.

    Args:
        user_id: Owner of the conversation.

    Returns:
        Fresh messages, oldest first; empty when there are none or when the
        DB load fails.
    """
    now = time()

    cached = self._history.get(user_id, [])
    fresh = [m for m in cached if now - m.timestamp < HISTORY_TTL]
    if fresh or user_id in self._loaded_users:
        return fresh

    # Imported only on the lazy-load path so cache hits never touch the DB layer.
    from db.database import load_conversation_messages

    # Mark before awaiting so a concurrent get() for the same user does not
    # start a second load while this one is in flight.
    self._loaded_users.add(user_id)
    try:
        db_msgs = await load_conversation_messages(user_id, now - HISTORY_TTL, limit=DB_LOAD_LIMIT)
    except Exception:
        # Drop the mark again: otherwise one transient DB error would disable
        # history restore for this user until the process restarts.
        self._loaded_users.discard(user_id)
        logger.warning("Failed to load conversation from DB for %s", user_id, exc_info=True)
        return fresh

    fresh = [
        Message(role=m["role"], content=m["content"], timestamp=m["created_at"])
        for m in db_msgs
    ]
    if fresh:
        logger.info("Loaded %d messages from DB for %s", len(fresh), user_id)
    self._history[user_id] = fresh
    return fresh
status_text = await _build_system_status(force_measure=True) collected.append(status_text) await state_stream.push(job.id, "result", {"content": status_text}) - conversation_store.add(user_id, "assistant", status_text) + await conversation_store.add(user_id, "assistant", status_text) elif action == "tools": # === TOOLS: 도구 실행 === @@ -299,7 +299,7 @@ async def run(job: Job) -> None: response = "확인할 일정이 없습니다. 다시 요청해주세요." collected.append(response) await state_stream.push(job.id, "result", {"content": response}) - conversation_store.add(user_id, "assistant", response) + await conversation_store.add(user_id, "assistant", response) else: try: result = await asyncio.wait_for(execute_tool(tool_name, operation, draft), timeout=TOOL_TIMEOUT) @@ -309,7 +309,7 @@ async def run(job: Job) -> None: response = result.get("summary", "") if result["ok"] else result.get("error", "⚠️ 오류") collected.append(response) await state_stream.push(job.id, "result", {"content": response}) - conversation_store.add(user_id, "assistant", response) + await conversation_store.add(user_id, "assistant", response) else: # 일반 도구 실행 try: @@ -355,13 +355,13 @@ async def run(job: Job) -> None: collected.append(response) await state_stream.push(job.id, "result", {"content": response}) - conversation_store.add(user_id, "assistant", "".join(collected)) + await conversation_store.add(user_id, "assistant", "".join(collected)) elif action == "clarify": # === CLARIFY: 추가 질문 === collected.append(response_text) await state_stream.push(job.id, "result", {"content": response_text}) - conversation_store.add(user_id, "assistant", response_text) + await conversation_store.add(user_id, "assistant", response_text) elif action == "route" and settings.pipeline_enabled and backend_registry.is_healthy("reasoner"): # === ROUTE: Gemma reasoning === @@ -396,7 +396,7 @@ async def run(job: Job) -> None: reasoner_system = f"{backend_registry.reasoner.system_prompt}\n\n현재 시간: {now_kst} (한국 표준시)" # 대화 이력을 OpenAI messages 형식으로 변환 (현재 
user 메시지 포함됨) - history_msgs = conversation_store.get(user_id)[-10:] + history_msgs = (await conversation_store.get(user_id))[-10:] reasoner_messages = [{"role": "system", "content": reasoner_system}] for m in history_msgs: reasoner_messages.append({"role": m.role, "content": m.content}) @@ -419,7 +419,7 @@ async def run(job: Job) -> None: return if collected: - conversation_store.add(user_id, "assistant", "".join(collected)) + await conversation_store.add(user_id, "assistant", "".join(collected)) else: # === DIRECT: EXAONE 직접 응답 === @@ -435,7 +435,7 @@ async def run(job: Job) -> None: return if collected: - conversation_store.add(user_id, "assistant", "".join(collected)) + await conversation_store.add(user_id, "assistant", "".join(collected)) # --- Complete --- if not collected: