feat: 이드 도구 확장 — 캘린더/메일/문서 연동 (read-only + 캘린더 생성 확인)

- tools/calendar_tool.py: CalDAV search/today/create_draft/create_confirmed
- tools/email_tool.py: IMAP search/read (전송 비활성화)
- tools/document_tool.py: Document Server search/read (read-only)
- tools/registry.py: 도구 디스패처 + WRITE_OPS 안전장치 + 에러 표준화
- 분류기: "tools" 액션 추가, 도구 목록/파라미터 스키마/규칙 명시
- Worker: tools 분기 + tool timeout 10초 + payload 2000자 제한
- conversation: pending_draft (TTL 5분) + create 확인 플로우
- 현재 시간을 분류기에 전달 (날짜 질문 대응)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-06 13:39:15 +09:00
parent 40c5d3cf21
commit 6e24da56a4
12 changed files with 647 additions and 15 deletions

View File

@@ -21,6 +21,15 @@ REASONING_BASE_URL=http://192.168.1.122:8800
REASONING_MODEL=mlx-community/gemma-4-26b-a4b-it-8bit
PIPELINE_ENABLED=true
MAX_CONCURRENT_JOBS=3
CALDAV_URL=
CALDAV_USER=
CALDAV_PASS=
MAILPLUS_HOST=
MAILPLUS_PORT=993
MAILPLUS_USER=
MAILPLUS_PASS=
DOCUMENT_API_URL=
DOCUMENT_API_TOKEN=
SYNOLOGY_INCOMING_URL=
SYNOLOGY_OUTGOING_TOKEN=
NANOCLAUDE_API_KEY=

View File

@@ -59,6 +59,15 @@ services:
- PIPELINE_ENABLED=${PIPELINE_ENABLED:-true}
- MAX_CONCURRENT_JOBS=${MAX_CONCURRENT_JOBS:-3}
- DB_PATH=/app/data/nanoclaude.db
- CALDAV_URL=${CALDAV_URL:-}
- CALDAV_USER=${CALDAV_USER:-}
- CALDAV_PASS=${CALDAV_PASS:-}
- MAILPLUS_HOST=${MAILPLUS_HOST:-}
- MAILPLUS_PORT=${MAILPLUS_PORT:-993}
- MAILPLUS_USER=${MAILPLUS_USER:-}
- MAILPLUS_PASS=${MAILPLUS_PASS:-}
- DOCUMENT_API_URL=${DOCUMENT_API_URL:-}
- DOCUMENT_API_TOKEN=${DOCUMENT_API_TOKEN:-}
- SYNOLOGY_INCOMING_URL=${SYNOLOGY_INCOMING_URL:-}
- SYNOLOGY_OUTGOING_TOKEN=${SYNOLOGY_OUTGOING_TOKEN:-}
- API_KEY=${NANOCLAUDE_API_KEY:-}

View File

@@ -30,6 +30,21 @@ class Settings(BaseSettings):
# DB
db_path: str = "/app/data/nanoclaude.db"
# Calendar (CalDAV)
caldav_url: str = ""
caldav_user: str = ""
caldav_pass: str = ""
# MailPlus (read-only)
mailplus_host: str = ""
mailplus_port: int = 993
mailplus_user: str = ""
mailplus_pass: str = ""
# Document Server (read-only)
document_api_url: str = ""
document_api_token: str = ""
# Synology Chat (비어있으면 비활성화)
synology_incoming_url: str = ""
synology_outgoing_token: str = ""

View File

@@ -4,3 +4,4 @@ httpx==0.27.0
pydantic-settings==2.5.0
aiosqlite==0.20.0
python-multipart==0.0.9
caldav>=1.3.0

View File

@@ -11,18 +11,37 @@ from services.model_adapter import ModelAdapter
logger = logging.getLogger(__name__)
CLASSIFIER_PROMPT = """\
너는 AI 라우터다. 사용 메시지를 분석하여 JSON으로 응답하라.
반드시 아래 3가지 action 중 하나를 선택하라:
너는 AI 라우터다. 사용<EFBFBD><EFBFBD><EFBFBD> 메시지를 분석하여 JSON으로 응답하라.
반드시 아래 4가<EFBFBD><EFBFBD> action 중 하나를 선택하라:
1. "direct" — 인사, 잡담, 간단한 질문, 자기소개 요청 등. 네가 직접 답변한다.
2. "route" — 복잡한 질문, 분석, 설명, 코딩 등. 추론 모델에게 넘긴다. 이때 prompt 필드에 추론 모델이 이해하기 좋게 정리된 프롬프트를 작성하라.
3. "clarify" — 질문이 모호하거나 정보가 부족할 때. 사용자에게 추가 질문다.
2. "route" — 복잡한 질문, 분석, 설명, 코딩 등. 추론 모델에게 넘긴다. prompt 필드에 추론 모델 프롬프트를 작성하라.
3. "clarify" — 질문이 모호하거나 정보가 부족할 때. 사용자에게 추가 질문<EFBFBD><EFBFBD><EFBFBD>다.
4. "tools" — 캘린더, 이메일, 문서 조회/조작이 필요할 때. tool/operation/params를 지정하라.
반드시 아래 JSON 형식으로만 응답하라. JSON 외 텍스트는 절대 출력하지 마라:
{"action": "direct|route|clarify", "response": "direct/clarify일 때 사용자에게 보낼 텍스트", "prompt": "route일 때 추론 모델에게 보낼 프롬프트"}
사용 가능한 도구:
- calendar.today() — 오늘 일정 조회
- calendar.search(date_from, date_to) — 기간 일정 검색. 날짜는 YYYY-MM-DD
- calendar.create_draft(title, date, time, description) — 일정 생성 초안. 시간은 HH:MM. 실제 생성 아님
- calendar.create_confirmed() — pending_draft가 있을 때, 사용자가 "확인/예/yes" 한 경우만 사용
- email.search(query, days) — 메일 검색. days 기본 7
- email.read(uid) — 메일 본문 조회
- document.search(query) — 문서 검색
- document.read(doc_id) — 문서 조회
너의 이름은 '이드'이고, 상냥하고 친근하게 대화한다.
너는 GPU 서버의 EXAONE 모델과 맥미니의 Gemma4 모델로 구성된 NanoClaude 파이프라인에서 돌아간다.
메일 전송은 불가능하다. 메일 보내달라는 요청은 거부하라.
JSON 형식:
- direct/route/clarify: {"action": "...", "response": "...", "prompt": "..."}
- tools: {"action": "tools", "tool": "calendar|email|document", "operation": "...", "params": {...}}
규칙:
- JSON 외 텍스트는 절대 출력하지 마라
- 날짜는 YYYY-MM-DD, 시간은 HH:MM
- pending_draft가 있다고 [대화 이력]에 표시되어 있을 때만 create_confirmed 사용
- 오늘 날짜 정보가 [현재 시간]에 있으니 참고하라
<EFBFBD><EFBFBD><EFBFBD>의 이름은 '이드'. 상냥하고 친근하게 대화한다.
대화 이력이 있으면 맥락을 고려하라.\
"""

View File

@@ -8,6 +8,7 @@ from time import time
MAX_HISTORY = 10 # user당 최근 대화 수
HISTORY_TTL = 3600.0 # 1시간 이후 대화 만료
DRAFT_TTL = 300.0 # pending_draft 5분 만료
@dataclass
@@ -20,6 +21,7 @@ class Message:
class ConversationStore:
def __init__(self) -> None:
self._history: dict[str, list[Message]] = defaultdict(list)
self._pending_drafts: dict[str, tuple[dict, float]] = {} # user_id → (draft_data, created_at)
def add(self, user_id: str, role: str, content: str) -> None:
msgs = self._history[user_id]
@@ -40,13 +42,33 @@ class ConversationStore:
def format_for_prompt(self, user_id: str) -> str:
"""EXAONE에 전달할 대화 이력 포맷."""
msgs = self.get(user_id)
if not msgs:
return ""
lines = []
for m in msgs[-6:]: # 최근 6개만
prefix = "사용자" if m.role == "user" else "이드"
lines.append(f"{prefix}: {m.content}")
# pending_draft 표시
draft = self.get_pending_draft(user_id)
if draft:
lines.append(f"[시스템: pending_draft 있음 — {draft.get('title', '일정')} {draft.get('date', '')} {draft.get('time', '')}]")
if not lines:
return ""
return "\n".join(lines)
def set_pending_draft(self, user_id: str, draft_data: dict) -> None:
self._pending_drafts[user_id] = (draft_data, time())
def get_pending_draft(self, user_id: str) -> dict | None:
entry = self._pending_drafts.get(user_id)
if not entry:
return None
data, created_at = entry
if time() - created_at > DRAFT_TTL:
del self._pending_drafts[user_id]
return None
return data
def clear_pending_draft(self, user_id: str) -> None:
self._pending_drafts.pop(user_id, None)
conversation_store = ConversationStore()

View File

@@ -1,4 +1,4 @@
"""Worker — EXAONE 분류 → direct/route/clarify 분기 (cancel-safe + fallback)."""
"""Worker — EXAONE 분류 → direct/route/clarify/tools 분기 (cancel-safe + fallback)."""
from __future__ import annotations
@@ -15,6 +15,7 @@ from services.conversation import conversation_store
from services.job_manager import Job, job_manager
from services.state_stream import state_stream
from services.synology_sender import send_to_synology
from tools.registry import execute_tool
logger = logging.getLogger(__name__)
@@ -22,6 +23,8 @@ HEARTBEAT_INTERVAL = 4.0
CLASSIFY_HEARTBEAT = 2.0
MAX_PROMPT_LENGTH = 1000
SYNOLOGY_MAX_LEN = 4000
MAX_TOOL_PAYLOAD = 2000
TOOL_TIMEOUT = 10.0
async def _complete_with_heartbeat(adapter, message: str, job_id: str, *, messages=None, beat_msg="처리 중...") -> str:
@@ -114,11 +117,14 @@ async def run(job: Job) -> None:
classify_model = backend_registry.classifier.model
# --- 대화 이력 포함하여 분류 요청 ---
# --- 대화 이력 + 현재 시간 포함하여 분류 요청 ---
from datetime import datetime
now_str = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
history = conversation_store.format_for_prompt(user_id)
classify_input = job.message
classify_input = f"[현재 시간]\n{now_str}\n\n"
if history:
classify_input = f"[대화 이력]\n{history}\n\n[현재 메시지]\n{job.message}"
classify_input += f"[대화 이력]\n{history}\n\n"
classify_input += f"[현재 메시지]\n{job.message}"
await state_stream.push(job.id, "processing", {"message": "메시지를 분석하고 있습니다..."})
classify_start = time()
@@ -148,7 +154,76 @@ async def run(job: Job) -> None:
if job.status == JobStatus.cancelled:
return
if action == "clarify":
if action == "tools":
# === TOOLS: 도구 실행 ===
tool_name = classification.get("tool", "")
operation = classification.get("operation", "")
params = classification.get("params", {})
logger.info("Job %s tool call: %s.%s(%s)", job.id, tool_name, operation, params)
await state_stream.push(job.id, "processing", {"message": f"🔧 {tool_name} 도구를 사용하고 있습니다..."})
# create_confirmed → pending_draft에서 데이터 가져오기
if operation == "create_confirmed":
draft = conversation_store.get_pending_draft(user_id)
if not draft:
response = "확인할 일정이 없습니다. 다시 요청해주세요."
collected.append(response)
await state_stream.push(job.id, "result", {"content": response})
conversation_store.add(user_id, "assistant", response)
else:
try:
result = await asyncio.wait_for(execute_tool(tool_name, operation, draft), timeout=TOOL_TIMEOUT)
except asyncio.TimeoutError:
result = {"ok": False, "tool": tool_name, "operation": operation, "data": [], "summary": "", "error": "⚠️ 서비스 응답 시간이 초과되었습니다."}
conversation_store.clear_pending_draft(user_id)
response = result.get("summary", "") if result["ok"] else result.get("error", "⚠️ 오류")
collected.append(response)
await state_stream.push(job.id, "result", {"content": response})
conversation_store.add(user_id, "assistant", response)
else:
# 일반 도구 실행
try:
result = await asyncio.wait_for(execute_tool(tool_name, operation, params), timeout=TOOL_TIMEOUT)
except asyncio.TimeoutError:
result = {"ok": False, "tool": tool_name, "operation": operation, "data": [], "summary": "", "error": "⚠️ 서비스 응답 시간이 초과되었습니다."}
if not result["ok"]:
response = result.get("error", "⚠️ 서비스를 사용할 수 없습니다.")
collected.append(response)
await state_stream.push(job.id, "result", {"content": response})
else:
# create_draft → pending에 저장 + 확인 요청
if operation == "create_draft":
conversation_store.set_pending_draft(user_id, result["data"])
response = result["summary"] + "\n\n'확인' 또는 '취소'로 답해주세요."
collected.append(response)
await state_stream.push(job.id, "result", {"content": response})
else:
# 결과를 EXAONE에 전달하여 자연어로 정리
tool_json = json.dumps(result["data"], ensure_ascii=False)
if len(tool_json) > MAX_TOOL_PAYLOAD:
tool_json = tool_json[:MAX_TOOL_PAYLOAD] + "...(truncated)"
format_input = f"[도구 결과]\n{tool_json}\n\n위 데이터를 바탕으로 사용자에게 자연스럽고 간결하게 답해."
try:
response = await _complete_with_heartbeat(
backend_registry.classifier, format_input, job.id,
beat_msg="결과를 정리하고 있습니다..."
)
# 포맷팅 응답이 JSON으로 올 수도 있으니 파싱 시도
try:
parsed = json.loads(response)
response = parsed.get("response", response)
except (json.JSONDecodeError, AttributeError):
pass
except Exception:
response = result.get("summary", "결과를 조회했습니다.")
collected.append(response)
await state_stream.push(job.id, "result", {"content": response})
conversation_store.add(user_id, "assistant", "".join(collected))
elif action == "clarify":
# === CLARIFY: 추가 질문 ===
collected.append(response_text)
await state_stream.push(job.id, "result", {"content": response_text})

View File

View File

@@ -0,0 +1,140 @@
"""Calendar 도구 — CalDAV를 통한 일정 조회/생성."""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timedelta
import caldav
from config import settings
logger = logging.getLogger(__name__)
TOOL_NAME = "calendar"
def _make_result(ok: bool, operation: str, data=None, summary: str = "", error: str | None = None) -> dict:
return {"ok": ok, "tool": TOOL_NAME, "operation": operation, "data": data or [], "summary": summary, "error": error}
def _get_client():
if not settings.caldav_url or not settings.caldav_user:
return None
return caldav.DAVClient(
url=settings.caldav_url,
username=settings.caldav_user,
password=settings.caldav_pass,
ssl_verify_cert=False,
)
def _parse_event(event) -> dict:
"""VEVENT → dict."""
try:
vevent = event.vobject_instance.vevent
summary = str(vevent.summary.value) if hasattr(vevent, "summary") else "(제목 없음)"
dtstart = vevent.dtstart.value
dtend = vevent.dtend.value if hasattr(vevent, "dtend") else None
start_str = dtstart.strftime("%Y-%m-%d %H:%M") if hasattr(dtstart, "strftime") else str(dtstart)
end_str = dtend.strftime("%Y-%m-%d %H:%M") if dtend and hasattr(dtend, "strftime") else ""
return {"summary": summary, "start": start_str, "end": end_str}
except Exception:
return {"summary": "(파싱 실패)", "start": "", "end": ""}
async def today() -> dict:
"""오늘 일정 조회."""
now = datetime.now()
start = now.replace(hour=0, minute=0, second=0)
end = start + timedelta(days=1)
return await search(start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"))
async def search(date_from: str, date_to: str) -> dict:
"""기간 내 일정 검색."""
try:
client = _get_client()
if not client:
return _make_result(False, "search", error="CalDAV 설정이 없습니다.")
principal = client.principal()
calendars = principal.calendars()
if not calendars:
return _make_result(True, "search", data=[], summary="캘린더가 없습니다.")
start = datetime.strptime(date_from, "%Y-%m-%d")
end = datetime.strptime(date_to, "%Y-%m-%d") + timedelta(days=1)
events = []
for cal in calendars:
try:
results = cal.date_search(start=start, end=end, expand=True)
for ev in results:
events.append(_parse_event(ev))
except Exception:
continue
summary = f"{date_from}~{date_to} 기간에 {len(events)}개의 일정이 있습니다."
return _make_result(True, "search", data=events, summary=summary)
except Exception as e:
logger.exception("Calendar search failed")
return _make_result(False, "search", error=str(e))
async def create_draft(title: str, date: str, time: str, description: str = "") -> dict:
"""일정 생성 초안 (실제 생성 안 함)."""
title = title or "일정"
draft = {
"title": title,
"date": date,
"time": time,
"description": description,
"display": f"📅 제목: {title}\n날짜: {date} {time}\n설명: {description or '없음'}",
}
return _make_result(True, "create_draft", data=draft, summary=draft["display"])
async def create_confirmed(draft_data: dict) -> dict:
"""사용자 확인 후 실제 CalDAV 이벤트 생성."""
try:
client = _get_client()
if not client:
return _make_result(False, "create_confirmed", error="CalDAV 설정이 없습니다.")
title = draft_data.get("title", "일정")
date_str = draft_data.get("date", "")
time_str = draft_data.get("time", "00:00")
description = draft_data.get("description", "")
dt_start = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M")
dt_end = dt_start + timedelta(hours=1)
uid = str(uuid.uuid4())
vcal = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//NanoClaude//이드//KO
BEGIN:VEVENT
UID:{uid}
DTSTART:{dt_start.strftime('%Y%m%dT%H%M%S')}
DTEND:{dt_end.strftime('%Y%m%dT%H%M%S')}
SUMMARY:{title}
DESCRIPTION:{description}
END:VEVENT
END:VCALENDAR"""
principal = client.principal()
calendars = principal.calendars()
if not calendars:
return _make_result(False, "create_confirmed", error="캘린더를 찾을 수 없습니다.")
calendars[0].save_event(vcal)
return _make_result(True, "create_confirmed", data={"uid": uid, "title": title}, summary=f"'{title}' 일정이 등록되었습니다!")
except Exception as e:
logger.exception("Calendar create failed")
return _make_result(False, "create_confirmed", error=str(e))

View File

@@ -0,0 +1,91 @@
"""Document 도구 — Document Server REST API (read-only)."""
from __future__ import annotations
import logging
import httpx
from config import settings
logger = logging.getLogger(__name__)
TOOL_NAME = "document"
MAX_RESULTS = 5
def _make_result(ok: bool, operation: str, data=None, summary: str = "", error: str | None = None) -> dict:
return {"ok": ok, "tool": TOOL_NAME, "operation": operation, "data": data or [], "summary": summary, "error": error}
def _headers() -> dict:
return {"Authorization": f"Bearer {settings.document_api_token}"} if settings.document_api_token else {}
async def search(query: str) -> dict:
"""문서 하이브리드 검색."""
if not settings.document_api_url:
return _make_result(False, "search", error="Document Server 설정이 없습니다.")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{settings.document_api_url}/search/",
params={"q": query, "mode": "hybrid"},
headers=_headers(),
)
if resp.status_code != 200:
return _make_result(False, "search", error=f"API 응답 오류 ({resp.status_code})")
results = resp.json()
if isinstance(results, dict):
results = results.get("results", results.get("data", []))
# top-N 제한
results = results[:MAX_RESULTS]
items = []
for doc in results:
items.append({
"id": doc.get("id", ""),
"title": doc.get("title", "(제목 없음)"),
"domain": doc.get("domain", ""),
"preview": str(doc.get("content", doc.get("snippet", "")))[:200],
})
summary = f"'{query}' 검색 결과 {len(items)}"
return _make_result(True, "search", data=items, summary=summary)
except Exception as e:
logger.exception("Document search failed")
return _make_result(False, "search", error=str(e))
async def read(doc_id: str) -> dict:
"""문서 내용 조회."""
if not settings.document_api_url:
return _make_result(False, "read", error="Document Server 설정이 없습니다.")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{settings.document_api_url}/documents/{doc_id}",
headers=_headers(),
)
if resp.status_code == 404:
return _make_result(False, "read", error=f"문서 {doc_id}를 찾을 수 없습니다.")
if resp.status_code != 200:
return _make_result(False, "read", error=f"API 응답 오류 ({resp.status_code})")
doc = resp.json()
data = {
"id": doc.get("id", ""),
"title": doc.get("title", ""),
"domain": doc.get("domain", ""),
"content": str(doc.get("content", doc.get("markdown_content", "")))[:2000],
}
return _make_result(True, "read", data=data, summary=f"문서: {data['title']}")
except Exception as e:
logger.exception("Document read failed")
return _make_result(False, "read", error=str(e))

View File

@@ -0,0 +1,151 @@
"""Email 도구 — IMAP을 통한 메일 조회 (read-only)."""
from __future__ import annotations
import email
import email.header
import imaplib
import logging
from datetime import datetime, timedelta
from config import settings
logger = logging.getLogger(__name__)
TOOL_NAME = "email"
MAX_RESULTS = 10
PREVIEW_LEN = 200
def _make_result(ok: bool, operation: str, data=None, summary: str = "", error: str | None = None) -> dict:
return {"ok": ok, "tool": TOOL_NAME, "operation": operation, "data": data or [], "summary": summary, "error": error}
def _decode_header(raw: str) -> str:
parts = email.header.decode_header(raw)
decoded = []
for part, charset in parts:
if isinstance(part, bytes):
decoded.append(part.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(part)
return "".join(decoded)
def _get_body(msg) -> str:
"""메일 본문 추출."""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
# fallback to html
for part in msg.walk():
if part.get_content_type() == "text/html":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
return ""
def _connect():
if not settings.mailplus_host or not settings.mailplus_user:
return None
conn = imaplib.IMAP4_SSL(settings.mailplus_host, settings.mailplus_port)
conn.login(settings.mailplus_user, settings.mailplus_pass)
return conn
async def search(query: str = "", days: int = 7) -> dict:
"""최근 메일 검색."""
try:
conn = _connect()
if not conn:
return _make_result(False, "search", error="메일 설정이 없습니다.")
conn.select("INBOX", readonly=True)
since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y")
if query:
criteria = f'(SINCE {since} SUBJECT "{query}")'
else:
criteria = f"(SINCE {since})"
_, data = conn.search(None, criteria)
uids = data[0].split()
# 최신 MAX_RESULTS개만
uids = uids[-MAX_RESULTS:]
uids.reverse()
results = []
for uid in uids:
_, msg_data = conn.fetch(uid, "(RFC822.HEADER)")
if not msg_data or not msg_data[0]:
continue
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subject = _decode_header(msg.get("Subject", "(제목 없음)"))
from_addr = _decode_header(msg.get("From", ""))
date_str = msg.get("Date", "")
results.append({
"uid": uid.decode(),
"subject": subject,
"from": from_addr,
"date": date_str[:20],
})
conn.close()
conn.logout()
summary = f"최근 {days}일간 {len(results)}개의 메일이 있습니다."
return _make_result(True, "search", data=results, summary=summary)
except Exception as e:
logger.exception("Email search failed")
return _make_result(False, "search", error=str(e))
async def read(uid: str) -> dict:
"""특정 메일 본문 조회."""
try:
conn = _connect()
if not conn:
return _make_result(False, "read", error="메일 설정이 없습니다.")
conn.select("INBOX", readonly=True)
_, msg_data = conn.fetch(uid.encode(), "(RFC822)")
if not msg_data or not msg_data[0]:
conn.close()
conn.logout()
return _make_result(False, "read", error=f"UID {uid} 메일을 찾을 수 없습니다.")
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subject = _decode_header(msg.get("Subject", ""))
from_addr = _decode_header(msg.get("From", ""))
date_str = msg.get("Date", "")
body = _get_body(msg)[:PREVIEW_LEN * 5] # read는 더 긴 본문
conn.close()
conn.logout()
data = {"uid": uid, "subject": subject, "from": from_addr, "date": date_str, "body": body}
return _make_result(True, "read", data=data, summary=f"메일: {subject}")
except Exception as e:
logger.exception("Email read failed")
return _make_result(False, "read", error=str(e))

View File

@@ -0,0 +1,100 @@
"""Tool Registry — 도구 실행 디스패처 + 안전장치."""
from __future__ import annotations
import logging
from tools import calendar_tool, document_tool, email_tool
logger = logging.getLogger(__name__)
# 에러 메시지 표준화 (내부 에러 노출 안 함)
ERROR_MESSAGES = {
"calendar": "⚠️ 캘린더 서비스를 사용할 수 없습니다. 잠시 후 다시 시도해주세요.",
"email": "⚠️ 메일 서비스를 사용할 수 없습니다. 잠시 후 다시 시도해주세요.",
"document": "⚠️ 문서 서비스를 사용할 수 없습니다. 잠시 후 다시 시도해주세요.",
}
# 허용된 operations
ALLOWED_OPS = {
"calendar": {"today", "search", "create_draft", "create_confirmed"},
"email": {"search", "read"},
"document": {"search", "read"},
}
# payload hard limit
MAX_TOOL_PAYLOAD = 2000
async def execute_tool(tool_name: str, operation: str, params: dict) -> dict:
"""도구 실행 디스패처."""
# 도구 존재 확인
if tool_name not in ALLOWED_OPS:
return _error(tool_name, operation, f"알 수 없는 도구: {tool_name}")
# operation 허용 확인
if operation not in ALLOWED_OPS[tool_name]:
return _error(tool_name, operation, f"허용되지 않은 작업: {tool_name}.{operation}")
try:
if tool_name == "calendar":
result = await _exec_calendar(operation, params)
elif tool_name == "email":
result = await _exec_email(operation, params)
elif tool_name == "document":
result = await _exec_document(operation, params)
else:
result = _error(tool_name, operation, "미구현")
if not result["ok"]:
logger.warning("Tool %s.%s failed: %s", tool_name, operation, result.get("error"))
result["error"] = ERROR_MESSAGES.get(tool_name, "⚠️ 서비스를 사용할 수 없습니다.")
return result
except Exception:
logger.exception("Tool %s.%s exception", tool_name, operation)
return _error(tool_name, operation, ERROR_MESSAGES.get(tool_name, "⚠️ 서비스 오류"))
async def _exec_calendar(operation: str, params: dict) -> dict:
if operation == "today":
return await calendar_tool.today()
elif operation == "search":
return await calendar_tool.search(
params.get("date_from", ""),
params.get("date_to", ""),
)
elif operation == "create_draft":
return await calendar_tool.create_draft(
params.get("title", ""),
params.get("date", ""),
params.get("time", "00:00"),
params.get("description", ""),
)
elif operation == "create_confirmed":
return await calendar_tool.create_confirmed(params)
return _error("calendar", operation, "미구현")
async def _exec_email(operation: str, params: dict) -> dict:
if operation == "search":
return await email_tool.search(
params.get("query", ""),
params.get("days", 7),
)
elif operation == "read":
return await email_tool.read(params.get("uid", ""))
return _error("email", operation, "미구현")
async def _exec_document(operation: str, params: dict) -> dict:
if operation == "search":
return await document_tool.search(params.get("query", ""))
elif operation == "read":
return await document_tool.read(params.get("doc_id", ""))
return _error("document", operation, "미구현")
def _error(tool: str, operation: str, msg: str) -> dict:
return {"ok": False, "tool": tool, "operation": operation, "data": [], "summary": "", "error": msg}