- 파이프라인 42→51노드 확장 (calendar/mail/note 핸들러 추가) - 네이티브 서비스 6개: heic_converter(:8090), chat_bridge(:8091), caldav_bridge(:8092), devonthink_bridge(:8093), inbox_processor, news_digest - 분류기 v2→v3: calendar, reminder, mail, note intent 추가 - Mail Processing Pipeline (7노드, IMAP 폴링) - LaunchAgent plist 6개 + manage_services.sh - migrate-v3.sql: news_digest_log + calendar_events 확장 - 개발 문서 현행화 (CLAUDE.md, QUICK_REFERENCE.md, docs/architecture.md) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
294 lines
10 KiB
Python
294 lines
10 KiB
Python
"""DSM Chat API Bridge — 사진 폴링 + 다운로드 서비스 (port 8091)"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import JSONResponse
|
|
|
|
# Load variables from a local .env file before any setting below is read.
load_dotenv()

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("chat_bridge")

# --- Settings: each one is read from the environment, with a fallback default ---

# DSM endpoint and the credentials used for SYNO.API.Auth login.
DSM_HOST = os.environ.get("DSM_HOST", "http://192.168.1.227:5000")
DSM_ACCOUNT = os.environ.get("DSM_ACCOUNT", "chatbot-api")
DSM_PASSWORD = os.environ.get("DSM_PASSWORD", "")

# Chat channel that is polled for new posts.
CHAT_CHANNEL_ID = int(os.environ.get("CHAT_CHANNEL_ID", "17"))

# Webhook used to post the acknowledgement message; empty disables the ack.
SYNOLOGY_CHAT_WEBHOOK_URL = os.environ.get("SYNOLOGY_CHAT_WEBHOOK_URL", "")

# Base URL of the HEIC -> JPEG conversion service.
HEIC_CONVERTER_URL = os.environ.get("HEIC_CONVERTER_URL", "http://127.0.0.1:8090")

# Seconds to sleep between two channel polls.
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "5"))

# --- Mutable module state shared by the poller and the HTTP handlers ---

# Current DSM session id; empty until the first successful login.
sid: str = ""

# Highest post_id already processed by the poller.
last_seen_post_id: int = 0

# user_id -> {post_id, create_at, filename}
pending_photos: dict[int, dict] = {}
|
|
|
|
|
|
async def dsm_login(client: httpx.AsyncClient) -> str:
    """Log in to DSM and store the resulting session id in the module-level ``sid``.

    Sends a ``SYNO.API.Auth`` (version 7) ``login`` request using
    ``DSM_ACCOUNT`` / ``DSM_PASSWORD``.

    Returns:
        The new session id (also assigned to the global ``sid``).

    Raises:
        RuntimeError: if the response does not report ``success``.
    """
    global sid

    login_query = {
        "api": "SYNO.API.Auth", "version": 7, "method": "login",
        "account": DSM_ACCOUNT, "passwd": DSM_PASSWORD,
    }
    response = await client.get(f"{DSM_HOST}/webapi/entry.cgi",
                                params=login_query, timeout=15)
    payload = response.json()

    if not payload.get("success"):
        raise RuntimeError(f"DSM login failed: {payload}")

    sid = payload["data"]["sid"]
    logger.info("DSM login successful")
    return sid
|
|
|
|
|
|
# DSM common error codes that mean the current session id can no longer be used:
# 106 = session timeout, 107 = session interrupted by a duplicate login,
# 119 = SID not found / invalid session.
_SESSION_ERROR_CODES = frozenset({106, 107, 119})


async def api_call(client: httpx.AsyncClient, api: str, version: int,
                   method: str, params: dict | None = None, retries: int = 1) -> dict:
    """Call a DSM Web API method with the current session and return its ``data``.

    Args:
        client: HTTP client used for the request (and for a re-login if needed).
        api: DSM API name, e.g. ``"SYNO.Chat.Post"``.
        version: API version number.
        method: API method name.
        params: Extra query parameters merged over the standard ones.
        retries: How many times to re-login and retry when DSM reports that
            the session is no longer valid.

    Returns:
        The ``data`` member of a successful response.

    Raises:
        RuntimeError: if the call fails and no retry is left, or the failure
            is not a session error.
    """
    url = f"{DSM_HOST}/webapi/entry.cgi"
    attempts_left = retries

    while True:
        # Re-read the module-level sid on every attempt: dsm_login() replaces it.
        query = {"api": api, "version": version, "method": method, "_sid": sid}
        if params:
            query.update(params)

        resp = await client.get(url, params=query, timeout=15)
        data = resp.json()
        if data.get("success"):
            return data["data"]

        # "error" may be missing or null; treat both as "no code".
        err_code = (data.get("error") or {}).get("code")
        if err_code in _SESSION_ERROR_CODES and attempts_left > 0:
            logger.warning("Session expired, re-logging in...")
            await dsm_login(client)
            attempts_left -= 1
            continue

        raise RuntimeError(f"API {api}.{method} failed: {data}")
|
|
|
|
|
|
async def download_file(client: httpx.AsyncClient, post_id: int,
                        retries: int = 1) -> bytes:
    """Download the file attached to a chat post via ``SYNO.Chat.Post.File``.

    Args:
        client: HTTP client used for the request (and for a re-login if needed).
        post_id: Id of the post whose attachment is requested.
        retries: How many times to re-login and retry when DSM answers with a
            session error (106, 107 or 119) instead of the file.

    Returns:
        The raw response body.

    Raises:
        RuntimeError: on a non-200 HTTP status, or when DSM answers with a
            JSON error that is not recoverable by a re-login.
    """
    attempts_left = retries

    while True:
        # Re-read the module-level sid on every attempt: dsm_login() replaces it.
        resp = await client.get(f"{DSM_HOST}/webapi/entry.cgi", params={
            "api": "SYNO.Chat.Post.File", "version": 2, "method": "get",
            "post_id": post_id, "_sid": sid,
        }, timeout=30)
        if resp.status_code != 200:
            raise RuntimeError(f"File download failed: HTTP {resp.status_code}")

        # A JSON body means DSM returned an API envelope, not the file bytes.
        if "json" not in resp.headers.get("content-type", ""):
            return resp.content

        data = resp.json()
        if data.get("success"):
            return resp.content

        err_code = (data.get("error") or {}).get("code")
        if err_code in (106, 107, 119) and attempts_left > 0:
            # Same recovery as api_call(): the session died, so log in again.
            logger.warning("Session expired during file download, re-logging in...")
            await dsm_login(client)
            attempts_left -= 1
            continue

        raise RuntimeError(f"File download API error: {data}")
|
|
|
|
|
|
def extract_posts(data) -> list:
    """Return the list of posts contained in an API payload.

    A list is returned unchanged. For a dict, the first of the keys
    ``"posts"`` / ``"data"`` whose value is a list is returned. Anything
    else yields an empty list.
    """
    if isinstance(data, list):
        return data
    if not isinstance(data, dict):
        return []

    for field in ("posts", "data"):
        candidate = data.get(field)
        if isinstance(candidate, list):
            return candidate
    return []
|
|
|
|
|
|
async def send_chat_ack():
    """Post the "photo received" acknowledgement to the chat webhook.

    Does nothing (apart from a warning) when ``SYNOLOGY_CHAT_WEBHOOK_URL``
    is not configured.
    """
    if not SYNOLOGY_CHAT_WEBHOOK_URL:
        logger.warning("No SYNOLOGY_CHAT_WEBHOOK_URL, skipping ack")
        return

    # The webhook receives a form field "payload" holding a JSON document.
    message = {"text": "\U0001f4f7 사진이 확인되었습니다. 설명을 입력해주세요."}
    form = {"payload": json.dumps(message)}

    async with httpx.AsyncClient(verify=False) as webhook_client:
        await webhook_client.post(SYNOLOGY_CHAT_WEBHOOK_URL, data=form, timeout=10)
|
|
|
|
|
|
async def poll_channel(client: httpx.AsyncClient):
    """Run one polling cycle over the chat channel.

    Steps:
      1. Fetch the latest 10 posts of ``CHAT_CHANNEL_ID``.
      2. Drop entries of ``pending_photos`` older than 300 000 ms.
      3. Register every new image post (``post_id`` above the cursor) as the
         creator's pending photo and send the chat acknowledgement.
      4. Advance ``last_seen_post_id`` to the highest ``post_id`` seen.

    All errors are logged and swallowed so the caller's loop keeps running.
    """
    global last_seen_post_id
    try:
        data = await api_call(client, "SYNO.Chat.Post", 8, "list",
                              {"channel_id": CHAT_CHANNEL_ID, "limit": 10})
        posts = extract_posts(data)

        now_ms = int(time.time() * 1000)
        expired = [uid for uid, info in pending_photos.items()
                   if now_ms - info["create_at"] > 300_000]
        for uid in expired:
            pending_photos.pop(uid, None)
            logger.info(f"Expired pending photo for user_id={uid}")

        for post in posts:
            post_id = post.get("post_id", 0)
            if post_id <= last_seen_post_id:
                continue

            # "file_props" may be absent or null; treat both as empty.
            file_props = post.get("file_props") or {}
            if post.get("type") == "file" and file_props.get("is_image"):
                user_id = post.get("creator_id", 0)
                pending_photos[user_id] = {
                    "post_id": post_id,
                    "create_at": post.get("create_at", now_ms),
                    "filename": file_props.get("name", "unknown"),
                }
                logger.info(f"Photo detected: post_id={post_id} user_id={user_id} "
                            f"file={pending_photos[user_id]['filename']}")
                # The ack is a courtesy message. If it fails, the cursor below
                # must still advance; otherwise this post would be registered
                # again on every cycle, even after it has been consumed.
                try:
                    await send_chat_ack()
                except Exception as e:
                    logger.warning(f"Chat ack failed for post_id={post_id}: {e}")

        # Advance the cursor only after the whole batch was examined: the
        # order of `posts` is not assumed, so moving it inside the loop could
        # skip posts with a lower id that appear later in the list.
        if posts:
            max_id = max(p.get("post_id", 0) for p in posts)
            if max_id > last_seen_post_id:
                last_seen_post_id = max_id
    except Exception as e:
        # logger.exception keeps the traceback; some errors have an empty str().
        logger.exception(f"Poll error: {e}")
|
|
|
|
|
|
async def polling_loop():
    """Background task: log in to DSM, seed the post cursor, then poll forever.

    Login is attempted up to three times, 5 seconds apart; if all attempts
    fail the function returns and no polling happens. A failure while
    seeding ``last_seen_post_id`` is only logged.
    """
    global last_seen_post_id
    max_attempts = 3

    async with httpx.AsyncClient(verify=False) as client:
        # Login
        attempt = 0
        while True:
            try:
                await dsm_login(client)
            except Exception as exc:
                logger.error(f"DSM login attempt {attempt+1} failed: {exc}")
                if attempt == max_attempts - 1:
                    logger.error("All login attempts failed, polling disabled")
                    return
                await asyncio.sleep(5)
                attempt += 1
            else:
                break

        # Initialize last_seen_post_id so posts that already exist are not
        # treated as new on the first poll.
        try:
            initial = await api_call(client, "SYNO.Chat.Post", 8, "list",
                                     {"channel_id": CHAT_CHANNEL_ID, "limit": 5})
            recent_posts = extract_posts(initial)
            if recent_posts:
                last_seen_post_id = max(p.get("post_id", 0) for p in recent_posts)
            logger.info(f"Initialized last_seen_post_id={last_seen_post_id}")
        except Exception as exc:
            logger.warning(f"Failed to init last_seen_post_id: {exc}")

        # Poll loop
        while True:
            await poll_channel(client)
            await asyncio.sleep(POLL_INTERVAL)
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: run the poller while the app is serving.

    The polling task is created on startup; on shutdown it is cancelled and
    awaited, and the resulting ``CancelledError`` is ignored.
    """
    poller = asyncio.create_task(polling_loop())

    yield

    # Shutdown: stop the poller and wait until it has unwound.
    poller.cancel()
    try:
        await poller
    except asyncio.CancelledError:
        pass


app = FastAPI(lifespan=lifespan)
|
|
|
|
|
|
def is_heic(data: bytes, filename: str) -> bool:
    """Tell whether a file looks like HEIC/HEIF.

    True when the filename ends in ``.heic`` / ``.heif`` (case-insensitive),
    or when bytes 4..12 of ``data`` contain ``ftyp`` together with one of the
    brands ``heic``, ``heix`` or ``mif1``.
    """
    if filename.lower().endswith((".heic", ".heif")):
        return True

    # Fewer than 12 bytes cannot hold the size + "ftyp" + brand header.
    if len(data) < 12:
        return False

    box_header = data[4:12].decode("ascii", errors="ignore")
    if "ftyp" not in box_header:
        return False
    return any(brand in box_header for brand in ("heic", "heix", "mif1"))
|
|
|
|
|
|
def _int_or_raw(value):
    """Return ``int(value)`` when possible, otherwise ``value`` unchanged."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return value


@app.post("/chat/recent-photo")
async def recent_photo(request: Request):
    """Return the most recent photo a user posted, as base64.

    Request JSON (all optional): ``channel_id`` (default ``CHAT_CHANNEL_ID``),
    ``user_id`` (default 0), ``before_timestamp`` in ms (default: now).
    Numeric strings are accepted for all three.

    Lookup order:
      1. the user's entry in ``pending_photos`` (filled by the poller);
      2. otherwise the newest image post by that user among the latest 20
         posts that is older than ``before_timestamp`` and younger than
         300 000 ms.

    The file is downloaded, sent to the HEIC converter when it looks like
    HEIC, and returned as ``{"found": True, "base64", "format", "filename",
    "size"}``. ``size`` is the length of the downloaded (unconverted) file.
    When nothing is found or the download fails, ``{"found": False}`` is
    returned (with ``"error"`` on download failure).
    """
    body = await request.json()
    # pending_photos keys and the ids in chat posts are ints, so a caller
    # sending "5" instead of 5 would otherwise never match anything.
    channel_id = _int_or_raw(body.get("channel_id", CHAT_CHANNEL_ID))
    user_id = _int_or_raw(body.get("user_id", 0))
    before_timestamp = _int_or_raw(
        body.get("before_timestamp", int(time.time() * 1000)))

    # 1. Check pending_photos (polling already detected)
    photo_info = pending_photos.get(user_id)

    # 2. Fallback: search via API
    if not photo_info:
        try:
            async with httpx.AsyncClient(verify=False) as client:
                if not sid:
                    await dsm_login(client)
                data = await api_call(client, "SYNO.Chat.Post", 8, "list",
                                      {"channel_id": channel_id, "limit": 20})
                posts = extract_posts(data)
                now_ms = int(time.time() * 1000)
                for post in sorted(posts,
                                   key=lambda p: p.get("create_at", 0),
                                   reverse=True):
                    # "file_props" may be absent or null; treat both as empty.
                    file_props = post.get("file_props") or {}
                    if (post.get("type") == "file"
                            and file_props.get("is_image")
                            and post.get("creator_id") == user_id
                            and post.get("create_at", 0) < before_timestamp
                            and now_ms - post.get("create_at", 0) < 300_000):
                        photo_info = {
                            "post_id": post["post_id"],
                            "create_at": post["create_at"],
                            "filename": file_props.get("name", "unknown"),
                        }
                        logger.info(f"Fallback found photo: post_id={post['post_id']}")
                        break
        except Exception as e:
            logger.error(f"Fallback search error: {e}")

    if not photo_info:
        return JSONResponse({"found": False})

    # 3. Download
    try:
        async with httpx.AsyncClient(verify=False) as client:
            if not sid:
                await dsm_login(client)
            file_data = await download_file(client, photo_info["post_id"])
    except Exception as e:
        logger.error(f"File download error: {e}")
        return JSONResponse({"found": False, "error": str(e)})

    # 4. HEIC conversion if needed
    filename = photo_info.get("filename", "unknown")
    fmt = "jpeg"
    b64 = base64.b64encode(file_data).decode()

    if is_heic(file_data, filename):
        try:
            async with httpx.AsyncClient() as client:
                conv_resp = await client.post(
                    f"{HEIC_CONVERTER_URL}/convert/heic-to-jpeg",
                    json={"base64": b64}, timeout=30)
                conv_data = conv_resp.json()
                b64 = conv_data["base64"]
                fmt = "jpeg"
                logger.info(f"HEIC converted: {filename}")
        except Exception as e:
            # Best effort: hand back the original bytes, labelled as HEIC.
            logger.warning(f"HEIC conversion failed: {e}, returning raw")
            fmt = "heic"

    # 5. Consume pending photo -- but only the one that was actually served.
    # The poller may have registered a newer photo for this user while this
    # handler was awaiting network calls; that entry must stay.
    current = pending_photos.get(user_id)
    if current is not None and current.get("post_id") == photo_info["post_id"]:
        del pending_photos[user_id]

    return JSONResponse({
        "found": True,
        "base64": b64,
        "format": fmt,
        "filename": filename,
        "size": len(file_data),
    })
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Report service state: session presence, post cursor and pending photos.

    ``pending_photos`` maps each user id (as a string) to the filename of
    that user's pending photo.
    """
    pending_summary = {}
    for uid, info in pending_photos.items():
        pending_summary[str(uid)] = info["filename"]

    return {
        "status": "ok",
        "sid_active": bool(sid),
        "last_seen_post_id": last_seen_post_id,
        "pending_photos": pending_summary,
    }
|