diff --git a/.env.example b/.env.example index ac652be..81da44f 100644 --- a/.env.example +++ b/.env.example @@ -60,6 +60,14 @@ IMAP_SSL=true KARAKEEP_URL=http://192.168.1.227:3000 KARAKEEP_API_KEY=changeme +# 회고 시스템 (chat_bridge.py → n8n retrospect pipeline) +# Synology Chat #회고 채널 ID (0이면 비활성) +RETROSPECT_CHANNEL_ID=0 +# #회고 채널 Incoming Webhook URL (확인 메시지 발송용) +RETROSPECT_CHAT_WEBHOOK_URL=https://your-nas:5001/webapi/entry.cgi?api=SYNO.Chat.External&method=incoming&version=2&token=RETRO_TOKEN +# n8n retrospect webhook (chat_bridge → n8n 포워딩) +N8N_RETROSPECT_WEBHOOK_URL=http://localhost:5678/webhook/retrospect + # Intent Service — Claude API fallback 월간 예산 (USD) API_MONTHLY_LIMIT=10.00 diff --git a/CLAUDE.md b/CLAUDE.md index 70bfc49..de936fc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,10 +43,11 @@ bot-n8n (맥미니 Docker) — 51노드 파이프라인 별도 워크플로우: Mail Processing Pipeline (9노드) — mail_bridge 날짜 기반 조회 → dedup → 분류 → mail_logs + Retrospect Capture Pipeline (8노드) — chat_bridge 텍스트 포워딩 → Qwen 분류 → retrospect.entries 저장 → Chat 확인 네이티브 서비스 (맥미니): heic_converter (:8090) — HEIC→JPEG 변환 (macOS sips) - chat_bridge (:8091) — DSM Chat API 브릿지 (사진 폴링/다운로드) + chat_bridge (:8091) — DSM Chat API 브릿지 (사진 폴링/다운로드 + 회고 채널 텍스트 폴링) caldav_bridge (:8092) — CalDAV REST 래퍼 (Synology Calendar, VEVENT+VTODO) mail_bridge (:8094) — IMAP 날짜 기반 메일 조회 (MailPlus) kb_writer (:8095) — 마크다운 KB 저장 @@ -71,7 +72,7 @@ NAS (192.168.1.227): | Ollama (GPU) | 192.168.1.186 (RTX 4070Ti Super) | 11434 | id-9b:latest (이드 특화 분류+응답), qwen3.5:9b-q8_0 (레거시 백업) | | Claude Haiku Vision | Anthropic API | — | 사진 분석+구조화 (field_report, log_event) | | heic_converter | 네이티브 (맥미니) | 8090 | HEIC→JPEG 변환 (macOS sips) | -| chat_bridge | 네이티브 (맥미니) | 8091 | DSM Chat API 브릿지 (사진 폴링/다운로드) | +| chat_bridge | 네이티브 (맥미니) | 8091 | DSM Chat API 브릿지 (사진 폴링/다운로드 + 회고 텍스트 폴링) | | caldav_bridge | 네이티브 (맥미니) | 8092 | CalDAV REST 래퍼 (Synology Calendar) | | mail_bridge | 네이티브 (맥미니) | 8094 | IMAP 날짜 기반 메일 조회 (MailPlus) | | kb_writer | 네이티브 (맥미니) | 8095 | 마크다운 KB 저장 | @@ -105,6 +106,7 @@ NAS (192.168.1.227): **기존**: `ai_configs`, `routing_rules`, `prompts`, `chat_logs`, `mail_accounts` **신규**: `document_ingestion_log`, `field_reports`, `classification_logs`, `mail_logs`, `calendar_events` (+caldav_uid, +description, +created_by), `report_cache`, `api_usage_monthly`, `news_digest_log` +**회고**: `retrospect.entries`, `retrospect.reviews`, `retrospect.patterns` (별도 스키마) 상세 스키마는 [docs/architecture.md](docs/architecture.md) 참조. diff --git a/chat_bridge.py b/chat_bridge.py index 248e9c5..d80d456 100644 --- a/chat_bridge.py +++ b/chat_bridge.py @@ -1,4 +1,4 @@ -"""DSM Chat API Bridge — 사진 폴링 + 다운로드 서비스 (port 8091)""" +"""DSM Chat API Bridge — 사진 폴링 + 회고 텍스트 포워딩 서비스 (port 8091)""" import asyncio import base64 @@ -25,10 +25,15 @@ CHAT_CHANNEL_ID = int(os.getenv("CHAT_CHANNEL_ID", "17")) SYNOLOGY_CHAT_WEBHOOK_URL = os.getenv("SYNOLOGY_CHAT_WEBHOOK_URL", "") HEIC_CONVERTER_URL = os.getenv("HEIC_CONVERTER_URL", "http://127.0.0.1:8090") POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "5")) +RETROSPECT_CHANNEL_ID = int(os.getenv("RETROSPECT_CHANNEL_ID", "0")) +RETROSPECT_CHAT_WEBHOOK_URL = os.getenv("RETROSPECT_CHAT_WEBHOOK_URL", "") +N8N_RETROSPECT_WEBHOOK_URL = os.getenv("N8N_RETROSPECT_WEBHOOK_URL", + "http://localhost:5678/webhook/retrospect") # State sid: str = "" last_seen_post_id: int = 0 +retro_last_seen_post_id: int = 0 pending_photos: dict[int, dict] = {} # user_id -> {post_id, create_at, filename} @@ -140,6 +145,47 @@ async def poll_channel(client: httpx.AsyncClient): logger.error(f"Poll error: {e}") +async def forward_to_n8n(post: dict): + payload = { + "text": post.get("msg", ""), + "user_id": post.get("creator_id", 0), + "username": post.get("display_name", post.get("username", "unknown")), + "post_id": post.get("post_id", 0), + "timestamp": post.get("create_at", 0), + } + try: + async with httpx.AsyncClient(verify=False) as client: + resp = await client.post(N8N_RETROSPECT_WEBHOOK_URL, + json=payload, timeout=10) + logger.info(f"Forwarded retrospect post_id={post.get('post_id')} " + f"to n8n: {resp.status_code}") + except Exception as e: + logger.error(f"Failed to forward to n8n: {e}") + + +async def poll_retrospect_channel(client: httpx.AsyncClient): + global retro_last_seen_post_id + if not RETROSPECT_CHANNEL_ID: + return + try: + data = await api_call(client, "SYNO.Chat.Post", 8, "list", + {"channel_id": RETROSPECT_CHANNEL_ID, "limit": 10}) + posts = extract_posts(data) + for post in posts: + post_id = post.get("post_id", 0) + if post_id <= retro_last_seen_post_id: + continue + # 텍스트 메시지만 포워딩 (파일/시스템 메시지 제외) + if post.get("type", "normal") == "normal" and post.get("msg", "").strip(): + await forward_to_n8n(post) + if posts: + max_id = max(p.get("post_id", 0) for p in posts) + if max_id > retro_last_seen_post_id: + retro_last_seen_post_id = max_id + except Exception as e: + logger.error(f"Retrospect poll error: {e}") + + async def polling_loop(): async with httpx.AsyncClient(verify=False) as client: # Login @@ -155,7 +201,7 @@ async def polling_loop(): await asyncio.sleep(5) # Initialize last_seen_post_id - global last_seen_post_id + global last_seen_post_id, retro_last_seen_post_id try: data = await api_call(client, "SYNO.Chat.Post", 8, "list", {"channel_id": CHAT_CHANNEL_ID, "limit": 5}) @@ -166,9 +212,23 @@ async def polling_loop(): except Exception as e: logger.warning(f"Failed to init last_seen_post_id: {e}") + # Initialize retro_last_seen_post_id + if RETROSPECT_CHANNEL_ID: + try: + data = await api_call(client, "SYNO.Chat.Post", 8, "list", + {"channel_id": RETROSPECT_CHANNEL_ID, "limit": 1}) + posts = extract_posts(data) + if posts: + retro_last_seen_post_id = max(p.get("post_id", 0) for p in posts) + logger.info(f"Initialized retro_last_seen_post_id={retro_last_seen_post_id}") + except Exception as e: + logger.warning(f"Failed to init retro_last_seen_post_id: {e}") + # Poll loop while True: await poll_channel(client) + await asyncio.sleep(0.5) # DSM API 호출 간격 + await poll_retrospect_channel(client) await asyncio.sleep(POLL_INTERVAL) @@ -286,6 +346,8 @@ async def health(): "status": "ok", "sid_active": bool(sid), "last_seen_post_id": last_seen_post_id, + "retro_last_seen_post_id": retro_last_seen_post_id, + "retro_channel_id": RETROSPECT_CHANNEL_ID, "pending_photos": { str(uid): info["filename"] for uid, info in pending_photos.items() diff --git a/docker-compose.yml b/docker-compose.yml index 74887e3..44a43c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -33,6 +33,7 @@ services: - CALDAV_BRIDGE_URL=http://host.docker.internal:8092 - MAIL_BRIDGE_URL=http://host.docker.internal:8094 - KB_WRITER_URL=http://host.docker.internal:8095 + - RETROSPECT_CHAT_WEBHOOK_URL=${RETROSPECT_CHAT_WEBHOOK_URL} - NODE_FUNCTION_ALLOW_BUILTIN=crypto,http,https,url volumes: - ./n8n/data:/home/node/.n8n diff --git a/init/migrate-v6.sql b/init/migrate-v6.sql new file mode 100644 index 0000000..113ffef --- /dev/null +++ b/init/migrate-v6.sql @@ -0,0 +1,50 @@ +-- migrate-v6.sql: 회고 시스템 (retrospect) 스키마 + 테이블 +-- 실행: docker exec -i bot-postgres psql -U bot -d chatbot < init/migrate-v6.sql + +CREATE SCHEMA IF NOT EXISTS retrospect; + +CREATE TABLE IF NOT EXISTS retrospect.entries ( + id BIGSERIAL PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + raw_text TEXT NOT NULL, + domain VARCHAR(20) NOT NULL, -- work | health | finance + entry_type VARCHAR(20) NOT NULL, -- event | reflection | regret | log + sentiment VARCHAR(10) NOT NULL, -- positive | neutral | negative + tags TEXT[] DEFAULT '{}', + confidence REAL DEFAULT 0.0, + source VARCHAR(20) DEFAULT 'chat', + reviewed BOOLEAN DEFAULT FALSE, + source_post_id INTEGER, + username VARCHAR(100) +); + +CREATE INDEX IF NOT EXISTS idx_entries_created ON retrospect.entries (created_at DESC); +CREATE INDEX IF NOT EXISTS idx_entries_domain ON retrospect.entries (domain); +CREATE INDEX IF NOT EXISTS idx_entries_type ON retrospect.entries (entry_type); +CREATE INDEX IF NOT EXISTS idx_entries_reviewed ON retrospect.entries (reviewed) WHERE NOT reviewed; +CREATE INDEX IF NOT EXISTS idx_entries_tags ON retrospect.entries USING GIN(tags); +-- 중복 방지: chat_bridge 재시작 시 같은 post 재처리 방지 +CREATE UNIQUE INDEX IF NOT EXISTS idx_entries_post_id ON retrospect.entries (source_post_id) + WHERE source_post_id IS NOT NULL; + +CREATE TABLE IF NOT EXISTS retrospect.reviews ( + id BIGSERIAL PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + review_type VARCHAR(10) NOT NULL, -- daily | weekly | monthly + period_start DATE NOT NULL, + period_end DATE NOT NULL, + content TEXT NOT NULL, + entry_ids BIGINT[] DEFAULT '{}', + model_used VARCHAR(50) NOT NULL +); + +CREATE TABLE IF NOT EXISTS retrospect.patterns ( + id BIGSERIAL PRIMARY KEY, + discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + domain VARCHAR(20) NOT NULL, + pattern_type VARCHAR(20) NOT NULL, -- recurring_mistake | habit | correlation | improvement + description TEXT NOT NULL, + occurrences INTEGER DEFAULT 1, + last_seen DATE NOT NULL, + status VARCHAR(20) DEFAULT 'active' +); diff --git a/n8n/workflows/retrospect-capture-pipeline.json b/n8n/workflows/retrospect-capture-pipeline.json new file mode 100644 index 0000000..840f0f1 --- /dev/null +++ b/n8n/workflows/retrospect-capture-pipeline.json @@ -0,0 +1,223 @@ +{ + "id": "mPretrocapture001", + "name": "회고 캡처 파이프라인", + "nodes": [ + { + "parameters": { + "httpMethod": "POST", + "path": "retrospect", + "responseMode": "responseNode", + "options": {} + }, + "id": "r1000001-0000-0000-0000-000000000001", + "name": "Webhook", + "type": "n8n-nodes-base.webhook", + "typeVersion": 2, + "position": [ + 0, + 300 + ], + "webhookId": "retrospect" + }, + { + "parameters": { + "jsCode": "const body = $input.first().json.body || $input.first().json;\nconst text = (body.text || '').trim();\nconst postId = body.post_id || 0;\nconst username = body.username || 'unknown';\nconst userId = body.user_id || 0;\nconst timestamp = body.timestamp || 0;\n\nconst valid = text.length > 0;\n\nreturn [{ json: { text, post_id: postId, username, user_id: userId, timestamp, valid } }];" + }, + "id": "r1000001-0000-0000-0000-000000000002", + "name": "Validate", + "type": "n8n-nodes-base.code", + "typeVersion": 1, + "position": [ + 220, + 300 + ] + }, + { + "parameters": { + "conditions": { + "options": { + "caseSensitive": true, + "leftValue": "", + "typeValidation": "strict" + }, + "conditions": [ + { + "id": "valid-check", + "leftValue": "={{ $json.valid }}", + "rightValue": true, + "operator": { + "type": "boolean", + "operation": "true" + } + } + ], + "combinator": "and" + }, + "options": {} + }, + "id": "r1000001-0000-0000-0000-000000000003", + "name": "IF Valid?", + "type": "n8n-nodes-base.if", + "typeVersion": 2.2, + "position": [ + 440, + 300 + ] + }, + { + "parameters": { + "jsCode": "function httpPost(url, body, { timeout = 15000, headers = {} } = {}) {\n return new Promise((resolve, reject) => {\n const data = JSON.stringify(body);\n const u = require('url').parse(url);\n const mod = require(u.protocol === 'https:' ? 'https' : 'http');\n const req = mod.request({\n hostname: u.hostname, port: u.port, path: u.path,\n method: 'POST',\n headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(data), ...headers }\n }, (res) => {\n let body = '';\n res.on('data', c => body += c);\n res.on('end', () => {\n if (res.statusCode >= 400) return reject(new Error(url + ' \\u2192 ' + res.statusCode + ': ' + body.slice(0, 200)));\n try { resolve(JSON.parse(body)); } catch(e) { reject(new Error('JSON parse error: ' + body.slice(0, 200))); }\n });\n });\n req.on('error', reject);\n req.setTimeout(timeout, () => { req.destroy(); reject(new Error(url + ' \\u2192 timeout after ' + timeout + 'ms')); });\n req.write(data);\n req.end();\n });\n}\n\nconst item = $input.first().json;\nconst text = item.text;\n\nconst prompt = `다음 텍스트를 분류하세요. JSON만 출력.\n\n분류 기준:\n- domain: work (업무, 회의, 프로젝트) | health (건강, 운동, 식사, 수면) | finance (지출, 투자, 예산)\n- entry_type: event (일어난 일) | reflection (생각, 깨달음) | regret (후회, 아쉬움) | log (단순 기록)\n- sentiment: positive | neutral | negative\n- tags: 관련 키워드 배열 (2~5개, 한국어)\n- confidence: 0.0~1.0 (분류 확신도)\n\n출력 형식:\n{\"domain\": \"...\", \"entry_type\": \"...\", \"sentiment\": \"...\", \"tags\": [\"...\"], \"confidence\": 0.0}\n\n텍스트: ${text}`;\n\ntry {\n const r = await httpPost(`${$env.GPU_OLLAMA_URL}/api/generate`,\n { model: 'id-9b:latest', system: '/no_think', prompt, stream: false, format: 'json', think: false },\n { timeout: 15000 }\n );\n const cls = JSON.parse(r.response);\n const validDomains = ['work', 'health', 'finance'];\n const validTypes = ['event', 'reflection', 'regret', 'log'];\n const validSentiments = ['positive', 'neutral', 'negative'];\n return [{ json: {\n ...item,\n domain: validDomains.includes(cls.domain) ? cls.domain : 'work',\n entry_type: validTypes.includes(cls.entry_type) ? cls.entry_type : 'log',\n sentiment: validSentiments.includes(cls.sentiment) ? cls.sentiment : 'neutral',\n tags: Array.isArray(cls.tags) ? cls.tags.slice(0, 5) : [],\n confidence: typeof cls.confidence === 'number' ? Math.min(1, Math.max(0, cls.confidence)) : 0.5,\n gpu_ok: true\n } }];\n} catch(e) {\n // GPU 타임아웃/장애 시 기본값으로 저장 (엔트리 유실 금지)\n return [{ json: {\n ...item,\n domain: 'work',\n entry_type: 'log',\n sentiment: 'neutral',\n tags: [],\n confidence: 0.0,\n gpu_ok: false\n } }];\n}" + }, + "id": "r1000001-0000-0000-0000-000000000004", + "name": "Qwen Classify", + "type": "n8n-nodes-base.code", + "typeVersion": 1, + "position": [ + 660, + 200 + ] + }, + { + "parameters": { + "operation": "executeQuery", + "query": "=INSERT INTO retrospect.entries (raw_text, domain, entry_type, sentiment, tags, confidence, source_post_id, username) VALUES ('{{ $json.text.replace(/'/g, \"''\") }}', '{{ $json.domain }}', '{{ $json.entry_type }}', '{{ $json.sentiment }}', '{{ \"{\" + ($json.tags || []).map(t => '\"' + t.replace(/\"/g, '') + '\"').join(',') + \"}\" }}', {{ $json.confidence }}, {{ $json.post_id || 'NULL' }}, '{{ ($json.username || \"unknown\").replace(/'/g, \"''\") }}') ON CONFLICT (source_post_id) WHERE source_post_id IS NOT NULL DO NOTHING", + "options": {} + }, + "id": "r1000001-0000-0000-0000-000000000005", + "name": "PostgreSQL INSERT", + "type": "n8n-nodes-base.postgres", + "typeVersion": 2.5, + "position": [ + 880, + 200 + ], + "credentials": { + "postgres": { + "id": "KaxU8iKtraFfsrTF", + "name": "bot-postgres" + } + } + }, + { + "parameters": { + "jsCode": "function httpPost(url, body, { timeout = 10000, headers = {} } = {}) {\n return new Promise((resolve, reject) => {\n const data = JSON.stringify(body);\n const u = require('url').parse(url);\n const mod = require(u.protocol === 'https:' ? 'https' : 'http');\n const options = {\n hostname: u.hostname, port: u.port, path: u.path,\n method: 'POST',\n headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(data), ...headers },\n rejectUnauthorized: false\n };\n const req = mod.request(options, (res) => {\n let body = '';\n res.on('data', c => body += c);\n res.on('end', () => resolve({ statusCode: res.statusCode, body }));\n });\n req.on('error', reject);\n req.setTimeout(timeout, () => { req.destroy(); reject(new Error('timeout')); });\n req.write(data);\n req.end();\n });\n}\n\nconst item = $input.first().json;\nconst webhookUrl = $env.RETROSPECT_CHAT_WEBHOOK_URL || '';\n\nif (webhookUrl) {\n const tagsStr = (item.tags || []).map(t => '#' + t).join(' ');\n const msg = `\\u2713 [${item.domain}] ${tagsStr}`;\n try {\n await httpPost(webhookUrl, { text: msg }, { timeout: 10000 });\n } catch(e) {\n console.log('Chat ack failed: ' + e.message);\n }\n}\n\nreturn [{ json: { ...item, ack_sent: true } }];" + }, + "id": "r1000001-0000-0000-0000-000000000006", + "name": "Chat Confirm", + "type": "n8n-nodes-base.code", + "typeVersion": 1, + "position": [ + 1100, + 200 + ] + }, + { + "parameters": { + "respondWith": "json", + "responseBody": "={ \"status\": \"ok\" }", + "options": {} + }, + "id": "r1000001-0000-0000-0000-000000000007", + "name": "Respond 200", + "type": "n8n-nodes-base.respondToWebhook", + "typeVersion": 1.1, + "position": [ + 1320, + 200 + ] + }, + { + "parameters": { + "respondWith": "json", + "responseBody": "={ \"status\": \"skipped\" }", + "options": {} + }, + "id": "r1000001-0000-0000-0000-000000000008", + "name": "Respond Skip", + "type": "n8n-nodes-base.respondToWebhook", + "typeVersion": 1.1, + "position": [ + 660, + 420 + ] + } + ], + "connections": { + "Webhook": { + "main": [ + [ + { + "node": "Validate", + "type": "main", + "index": 0 + } + ] + ] + }, + "Validate": { + "main": [ + [ + { + "node": "IF Valid?", + "type": "main", + "index": 0 + } + ] + ] + }, + "IF Valid?": { + "main": [ + [ + { + "node": "Qwen Classify", + "type": "main", + "index": 0 + } + ], + [ + { + "node": "Respond Skip", + "type": "main", + "index": 0 + } + ] + ] + }, + "Qwen Classify": { + "main": [ + [ + { + "node": "PostgreSQL INSERT", + "type": "main", + "index": 0 + } + ] + ] + }, + "PostgreSQL INSERT": { + "main": [ + [ + { + "node": "Chat Confirm", + "type": "main", + "index": 0 + } + ] + ] + }, + "Chat Confirm": { + "main": [ + [ + { + "node": "Respond 200", + "type": "main", + "index": 0 + } + ] + ] + } + }, + "settings": { + "executionOrder": "v1" + } +}