diff --git a/.env.example b/.env.example index df7653c..cfc53dc 100644 --- a/.env.example +++ b/.env.example @@ -21,4 +21,6 @@ REASONING_BASE_URL=http://192.168.1.122:8800 REASONING_MODEL=mlx-community/gemma-4-26b-a4b-it-8bit PIPELINE_ENABLED=true MAX_CONCURRENT_JOBS=3 +SYNOLOGY_INCOMING_URL= +SYNOLOGY_OUTGOING_TOKEN= NANOCLAUDE_API_KEY= diff --git a/docker-compose.yml b/docker-compose.yml index 276bd84..8a6739c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,6 +59,8 @@ services: - PIPELINE_ENABLED=${PIPELINE_ENABLED:-true} - MAX_CONCURRENT_JOBS=${MAX_CONCURRENT_JOBS:-3} - DB_PATH=/app/data/nanoclaude.db + - SYNOLOGY_INCOMING_URL=${SYNOLOGY_INCOMING_URL:-} + - SYNOLOGY_OUTGOING_TOKEN=${SYNOLOGY_OUTGOING_TOKEN:-} - API_KEY=${NANOCLAUDE_API_KEY:-} volumes: - nano_data:/app/data diff --git a/nanoclaude/config.py b/nanoclaude/config.py index 8c4f8c3..966f5b1 100644 --- a/nanoclaude/config.py +++ b/nanoclaude/config.py @@ -30,6 +30,10 @@ class Settings(BaseSettings): # DB db_path: str = "/app/data/nanoclaude.db" + # Synology Chat (비어있으면 비활성화) + synology_incoming_url: str = "" + synology_outgoing_token: str = "" + # Optional API key (empty = disabled) api_key: str = "" diff --git a/nanoclaude/main.py b/nanoclaude/main.py index fecc679..1de9448 100644 --- a/nanoclaude/main.py +++ b/nanoclaude/main.py @@ -1,4 +1,4 @@ -"""NanoClaude — 비동기 job 기반 AI Gateway (Phase 2: EXAONE → Gemma 파이프라인).""" +"""NanoClaude — 비동기 job 기반 AI Gateway (Phase 3: Synology Chat 연동).""" from __future__ import annotations @@ -11,7 +11,7 @@ from fastapi.responses import JSONResponse from config import settings from db.database import init_db -from routers import chat +from routers import chat, synology from services.backend_registry import backend_registry from services import job_queue as jq_module @@ -57,6 +57,7 @@ async def check_api_key(request: Request, call_next): app.include_router(chat.router) +app.include_router(synology.router) @app.get("/") diff --git a/nanoclaude/routers/synology.py b/nanoclaude/routers/synology.py new file mode 100644 index 0000000..b3feaee --- /dev/null +++ b/nanoclaude/routers/synology.py @@ -0,0 +1,78 @@ +"""Synology Chat webhook — outgoing webhook 수신 + 파이프라인 연결.""" + +from __future__ import annotations + +import logging +import time + +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse + +from config import settings +from services.job_manager import job_manager +from services import job_queue as jq_module +from services.state_stream import state_stream +from services.synology_sender import send_to_synology + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["synology"]) + +# 중복 요청 방지 (retry 대비) — {user_id}:{timestamp} → expire time +_recent: dict[str, float] = {} +DEDUP_TTL = 30.0 # 30초 내 동일 요청 무시 + + +def _cleanup_recent(): + now = time.time() + expired = [k for k, v in _recent.items() if now - v > DEDUP_TTL] + for k in expired: + del _recent[k] + + +@router.post("/webhook/synology") +async def synology_webhook(request: Request): + """Synology Chat outgoing webhook 수신.""" + if not settings.synology_incoming_url: + return JSONResponse(status_code=503, content={"detail": "Synology integration disabled"}) + + # Parse form data + form = await request.form() + token = form.get("token", "") + text = form.get("text", "") + username = form.get("username", "") + user_id = form.get("user_id", "") + timestamp = form.get("timestamp", "") + + # Token 검증 + if token != settings.synology_outgoing_token: + logger.warning("Invalid Synology token from %s", username) + return JSONResponse(status_code=403, content={"detail": "Invalid token"}) + + if not text or not text.strip(): + return JSONResponse(status_code=200, content={"text": "빈 메시지입니다."}) + + # 중복 요청 방지 + _cleanup_recent() + dedup_key = f"{user_id}:{timestamp}" + if dedup_key in _recent: + logger.info("Duplicate webhook ignored: %s", dedup_key) + return JSONResponse(status_code=200, content={}) + _recent[dedup_key] = time.time() + + # Job 생성 + job = job_manager.create(text.strip()) + job.callback = "synology" + job.callback_meta = {"username": username, "user_id": user_id} + state_stream.create(job.id) + + logger.info("Synology job %s from %s: %s", job.id, username, text[:50]) + + # "처리 중" 메시지 먼저 전송 (typing 느낌) + await send_to_synology(f"🤖 분석 중... (from {username})") + + # 파이프라인 시작 (비동기) + await jq_module.job_queue.submit(job) + + # 즉시 200 반환 (Synology는 빠른 응답 기대) + return JSONResponse(status_code=200, content={}) diff --git a/nanoclaude/services/job_manager.py b/nanoclaude/services/job_manager.py index 723f0fc..3ff8609 100644 --- a/nanoclaude/services/job_manager.py +++ b/nanoclaude/services/job_manager.py @@ -19,6 +19,8 @@ class Job: task: asyncio.Task | None = field(default=None, repr=False) pipeline: bool = True rewritten_message: str = "" + callback: str = "" # "synology" | "" + callback_meta: dict = field(default_factory=dict) # username, user_id 등 class JobManager: diff --git a/nanoclaude/services/synology_sender.py b/nanoclaude/services/synology_sender.py new file mode 100644 index 0000000..88195a5 --- /dev/null +++ b/nanoclaude/services/synology_sender.py @@ -0,0 +1,35 @@ +"""Synology Chat — incoming webhook으로 응답 전송.""" + +from __future__ import annotations + +import json +import logging + +import httpx + +from config import settings + +logger = logging.getLogger(__name__) + + +async def send_to_synology(text: str) -> bool: + """Incoming webhook URL로 메시지 전송. 성공 시 True.""" + if not settings.synology_incoming_url: + logger.warning("Synology incoming URL not configured") + return False + + payload = json.dumps({"text": text}, ensure_ascii=False) + + try: + async with httpx.AsyncClient(verify=False, timeout=10.0) as client: + resp = await client.post( + settings.synology_incoming_url, + data={"payload": payload}, + ) + if resp.status_code == 200: + return True + logger.error("Synology send failed: %d %s", resp.status_code, resp.text) + return False + except Exception: + logger.exception("Failed to send to Synology Chat") + return False diff --git a/nanoclaude/services/worker.py b/nanoclaude/services/worker.py index 9f84edb..079ce3b 100644 --- a/nanoclaude/services/worker.py +++ b/nanoclaude/services/worker.py @@ -12,12 +12,14 @@ from models.schemas import JobStatus from services.backend_registry import backend_registry from services.job_manager import Job, job_manager from services.state_stream import state_stream +from services.synology_sender import send_to_synology logger = logging.getLogger(__name__) HEARTBEAT_INTERVAL = 4.0 REWRITE_HEARTBEAT = 2.0 MAX_REWRITE_LENGTH = 1000 +SYNOLOGY_MAX_LEN = 1500 async def _complete_with_heartbeat(adapter, message: str, job_id: str) -> str: @@ -113,8 +115,9 @@ async def run(job: Job) -> None: rewrite_latency = (time() - rewrite_start) * 1000 job.rewritten_message = rewritten_message - # --- Rewrite 결과 SSE 노출 --- - await state_stream.push(job.id, "rewrite", {"content": rewritten_message}) + # --- Rewrite 결과 SSE 노출 (Synology에서는 숨김) --- + if job.callback != "synology": + await state_stream.push(job.id, "rewrite", {"content": rewritten_message}) # --- Cancel 체크 #2 --- if job.status == JobStatus.cancelled: @@ -146,10 +149,18 @@ async def run(job: Job) -> None: job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "응답을 받지 못했습니다."}) status = "failed" + if job.callback == "synology": + await send_to_synology("⚠️ 응답을 받지 못했습니다. 다시 시도해주세요.") else: job_manager.set_status(job.id, JobStatus.completed) await state_stream.push(job.id, "done", {"message": "완료"}) status = "completed" + # Synology callback: 결과 전송 + if job.callback == "synology": + full_response = "".join(collected) + if len(full_response) > SYNOLOGY_MAX_LEN: + full_response = full_response[:SYNOLOGY_MAX_LEN] + "\n\n...(생략됨)" + await send_to_synology(full_response) # --- DB 로깅 --- latency_ms = (time() - start_time) * 1000 @@ -176,6 +187,11 @@ async def run(job: Job) -> None: logger.exception("Worker failed for job %s", job.id) job_manager.set_status(job.id, JobStatus.failed) await state_stream.push(job.id, "error", {"message": "내부 오류가 발생했습니다."}) + if job.callback == "synology": + try: + await send_to_synology("⚠️ 처리 중 오류가 발생했습니다. 다시 시도해주세요.") + except Exception: + pass try: await log_completion(job.id, "failed", 0, (time() - start_time) * 1000, time()) except Exception: