"""DSM Chat API Bridge — 사진 폴링 + 회고 텍스트 포워딩 서비스 (port 8091)""" import asyncio import base64 import json import logging import os import time from contextlib import asynccontextmanager import httpx from dotenv import load_dotenv from fastapi import FastAPI, Request from fastapi.responses import JSONResponse load_dotenv() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("chat_bridge") DSM_HOST = os.getenv("DSM_HOST", "http://192.168.1.227:5000") DSM_ACCOUNT = os.getenv("DSM_ACCOUNT", "chatbot-api") DSM_PASSWORD = os.getenv("DSM_PASSWORD", "") CHAT_CHANNEL_ID = int(os.getenv("CHAT_CHANNEL_ID", "17")) SYNOLOGY_CHAT_WEBHOOK_URL = os.getenv("SYNOLOGY_CHAT_WEBHOOK_URL", "") HEIC_CONVERTER_URL = os.getenv("HEIC_CONVERTER_URL", "http://127.0.0.1:8090") POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "5")) RETROSPECT_CHANNEL_ID = int(os.getenv("RETROSPECT_CHANNEL_ID", "0")) RETROSPECT_CHAT_WEBHOOK_URL = os.getenv("RETROSPECT_CHAT_WEBHOOK_URL", "") N8N_RETROSPECT_WEBHOOK_URL = os.getenv("N8N_RETROSPECT_WEBHOOK_URL", "http://localhost:5678/webhook/retrospect") # 회고 채널에서 포워딩할 사용자 ID (봇 메시지 제외) RETROSPECT_USER_IDS = {int(x) for x in os.getenv("RETROSPECT_USER_IDS", "6").split(",") if x.strip()} # State sid: str = "" last_seen_post_id: int = 0 retro_last_seen_post_id: int = 0 pending_photos: dict[int, dict] = {} # user_id -> {post_id, create_at, filename} async def dsm_login(client: httpx.AsyncClient) -> str: global sid resp = await client.get(f"{DSM_HOST}/webapi/entry.cgi", params={ "api": "SYNO.API.Auth", "version": 7, "method": "login", "account": DSM_ACCOUNT, "passwd": DSM_PASSWORD, }, timeout=15) data = resp.json() if data.get("success"): sid = data["data"]["sid"] logger.info("DSM login successful") return sid raise RuntimeError(f"DSM login failed: {data}") async def api_call(client: httpx.AsyncClient, api: str, version: int, method: str, params: dict | None = None, retries: int = 1) -> dict: global sid all_params = {"api": api, "version": version, "method": method, "_sid": sid} if params: all_params.update(params) resp = await client.get(f"{DSM_HOST}/webapi/entry.cgi", params=all_params, timeout=15) data = resp.json() if data.get("success"): return data["data"] err_code = data.get("error", {}).get("code") if err_code == 119 and retries > 0: logger.warning("Session expired, re-logging in...") await dsm_login(client) return await api_call(client, api, version, method, params, retries - 1) raise RuntimeError(f"API {api}.{method} failed: {data}") async def download_file(client: httpx.AsyncClient, post_id: int) -> bytes: global sid resp = await client.get(f"{DSM_HOST}/webapi/entry.cgi", params={ "api": "SYNO.Chat.Post.File", "version": 2, "method": "get", "post_id": post_id, "_sid": sid, }, timeout=30) if resp.status_code != 200: raise RuntimeError(f"File download failed: HTTP {resp.status_code}") ct = resp.headers.get("content-type", "") if "json" in ct: data = resp.json() if not data.get("success"): raise RuntimeError(f"File download API error: {data}") return resp.content def extract_posts(data) -> list: if isinstance(data, list): return data if isinstance(data, dict): for key in ("posts", "data"): if key in data and isinstance(data[key], list): return data[key] return [] async def send_chat_ack(): if not SYNOLOGY_CHAT_WEBHOOK_URL: logger.warning("No SYNOLOGY_CHAT_WEBHOOK_URL, skipping ack") return async with httpx.AsyncClient(verify=False) as client: payload = json.dumps( {"text": "\U0001f4f7 사진이 확인되었습니다. 설명을 입력해주세요."}) await client.post(SYNOLOGY_CHAT_WEBHOOK_URL, data={"payload": payload}, timeout=10) async def poll_channel(client: httpx.AsyncClient): global last_seen_post_id try: data = await api_call(client, "SYNO.Chat.Post", 8, "list", {"channel_id": CHAT_CHANNEL_ID, "limit": 10}) posts = extract_posts(data) now_ms = int(time.time() * 1000) expired = [uid for uid, info in pending_photos.items() if now_ms - info["create_at"] > 300_000] for uid in expired: del pending_photos[uid] logger.info(f"Expired pending photo for user_id={uid}") for post in posts: post_id = post.get("post_id", 0) if post_id <= last_seen_post_id: continue if (post.get("type") == "file" and post.get("file_props", {}).get("is_image")): user_id = post.get("creator_id", 0) pending_photos[user_id] = { "post_id": post_id, "create_at": post.get("create_at", now_ms), "filename": post.get("file_props", {}).get("name", "unknown"), } logger.info(f"Photo detected: post_id={post_id} user_id={user_id} " f"file={pending_photos[user_id]['filename']}") await send_chat_ack() if posts: max_id = max(p.get("post_id", 0) for p in posts) if max_id > last_seen_post_id: last_seen_post_id = max_id except Exception as e: logger.error(f"Poll error: {e}") async def forward_to_n8n(post: dict): payload = { "text": post.get("message", ""), "user_id": post.get("creator_id", 0), "username": str(post.get("creator_id", "unknown")), "post_id": post.get("post_id", 0), "timestamp": post.get("create_at", 0), } try: async with httpx.AsyncClient(verify=False) as client: resp = await client.post(N8N_RETROSPECT_WEBHOOK_URL, json=payload, timeout=10) logger.info(f"Forwarded retrospect post_id={post.get('post_id')} " f"to n8n: {resp.status_code}") except Exception as e: logger.error(f"Failed to forward to n8n: {e}") async def poll_retrospect_channel(client: httpx.AsyncClient): global retro_last_seen_post_id if not RETROSPECT_CHANNEL_ID: return try: data = await api_call(client, "SYNO.Chat.Post", 8, "list", {"channel_id": RETROSPECT_CHANNEL_ID, "limit": 10}) posts = extract_posts(data) for post in posts: post_id = post.get("post_id", 0) if post_id <= retro_last_seen_post_id: continue # 텍스트 메시지만 포워딩 (파일/시스템/봇 메시지 제외) if (post.get("type", "normal") == "normal" and post.get("message", "").strip() and post.get("creator_id", 0) in RETROSPECT_USER_IDS): await forward_to_n8n(post) if posts: max_id = max(p.get("post_id", 0) for p in posts) if max_id > retro_last_seen_post_id: retro_last_seen_post_id = max_id except Exception as e: logger.error(f"Retrospect poll error: {e}") async def polling_loop(): async with httpx.AsyncClient(verify=False) as client: # Login for attempt in range(3): try: await dsm_login(client) break except Exception as e: logger.error(f"DSM login attempt {attempt+1} failed: {e}") if attempt == 2: logger.error("All login attempts failed, polling disabled") return await asyncio.sleep(5) # Initialize last_seen_post_id global last_seen_post_id, retro_last_seen_post_id try: data = await api_call(client, "SYNO.Chat.Post", 8, "list", {"channel_id": CHAT_CHANNEL_ID, "limit": 5}) posts = extract_posts(data) if posts: last_seen_post_id = max(p.get("post_id", 0) for p in posts) logger.info(f"Initialized last_seen_post_id={last_seen_post_id}") except Exception as e: logger.warning(f"Failed to init last_seen_post_id: {e}") # Initialize retro_last_seen_post_id if RETROSPECT_CHANNEL_ID: try: data = await api_call(client, "SYNO.Chat.Post", 8, "list", {"channel_id": RETROSPECT_CHANNEL_ID, "limit": 1}) posts = extract_posts(data) if posts: retro_last_seen_post_id = max(p.get("post_id", 0) for p in posts) logger.info(f"Initialized retro_last_seen_post_id={retro_last_seen_post_id}") except Exception as e: logger.warning(f"Failed to init retro_last_seen_post_id: {e}") # Poll loop while True: await poll_channel(client) await asyncio.sleep(0.5) # DSM API 호출 간격 await poll_retrospect_channel(client) await asyncio.sleep(POLL_INTERVAL) @asynccontextmanager async def lifespan(app: FastAPI): task = asyncio.create_task(polling_loop()) yield task.cancel() try: await task except asyncio.CancelledError: pass app = FastAPI(lifespan=lifespan) def is_heic(data: bytes, filename: str) -> bool: if filename.lower().endswith((".heic", ".heif")): return True if len(data) >= 12: ftyp = data[4:12].decode("ascii", errors="ignore") if "ftyp" in ftyp and any(x in ftyp for x in ["heic", "heix", "mif1"]): return True return False @app.post("/chat/recent-photo") async def recent_photo(request: Request): body = await request.json() channel_id = body.get("channel_id", CHAT_CHANNEL_ID) user_id = body.get("user_id", 0) before_timestamp = body.get("before_timestamp", int(time.time() * 1000)) # 1. Check pending_photos (polling already detected) photo_info = pending_photos.get(user_id) # 2. Fallback: search via API if not photo_info: try: async with httpx.AsyncClient(verify=False) as client: if not sid: await dsm_login(client) data = await api_call(client, "SYNO.Chat.Post", 8, "list", {"channel_id": channel_id, "limit": 20}) posts = extract_posts(data) now_ms = int(time.time() * 1000) for post in sorted(posts, key=lambda p: p.get("create_at", 0), reverse=True): if (post.get("type") == "file" and post.get("file_props", {}).get("is_image") and post.get("creator_id") == user_id and post.get("create_at", 0) < before_timestamp and now_ms - post.get("create_at", 0) < 300_000): photo_info = { "post_id": post["post_id"], "create_at": post["create_at"], "filename": post.get("file_props", {}).get("name", "unknown"), } logger.info(f"Fallback found photo: post_id={post['post_id']}") break except Exception as e: logger.error(f"Fallback search error: {e}") if not photo_info: return JSONResponse({"found": False}) # 3. Download try: async with httpx.AsyncClient(verify=False) as client: if not sid: await dsm_login(client) file_data = await download_file(client, photo_info["post_id"]) except Exception as e: logger.error(f"File download error: {e}") return JSONResponse({"found": False, "error": str(e)}) # 4. HEIC conversion if needed filename = photo_info.get("filename", "unknown") fmt = "jpeg" b64 = base64.b64encode(file_data).decode() if is_heic(file_data, filename): try: async with httpx.AsyncClient() as client: conv_resp = await client.post( f"{HEIC_CONVERTER_URL}/convert/heic-to-jpeg", json={"base64": b64}, timeout=30) conv_data = conv_resp.json() b64 = conv_data["base64"] fmt = "jpeg" logger.info(f"HEIC converted: {filename}") except Exception as e: logger.warning(f"HEIC conversion failed: {e}, returning raw") fmt = "heic" # 5. Consume pending photo if user_id in pending_photos: del pending_photos[user_id] return JSONResponse({ "found": True, "base64": b64, "format": fmt, "filename": filename, "size": len(file_data), }) @app.get("/health") async def health(): return { "status": "ok", "sid_active": bool(sid), "last_seen_post_id": last_seen_post_id, "retro_last_seen_post_id": retro_last_seen_post_id, "retro_channel_id": RETROSPECT_CHANNEL_ID, "pending_photos": { str(uid): info["filename"] for uid, info in pending_photos.items() }, }