feat: 회고 시스템 Phase 1 캡처 파이프라인

chat_bridge에 회고 채널 텍스트 폴링 + n8n 포워딩 추가.
n8n 워크플로우(8노드): Webhook → Validate → Qwen 분류 → PostgreSQL INSERT → Chat 확인.
retrospect 스키마 + 3 테이블 (entries, reviews, patterns).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-03-19 14:57:13 +09:00
parent 5a067de189
commit de5dde43ab
6 changed files with 350 additions and 4 deletions

View File

@@ -60,6 +60,14 @@ IMAP_SSL=true
KARAKEEP_URL=http://192.168.1.227:3000
KARAKEEP_API_KEY=changeme
# 회고 시스템 (chat_bridge.py → n8n retrospect pipeline)
# Synology Chat #회고 채널 ID (0이면 비활성)
RETROSPECT_CHANNEL_ID=0
# #회고 채널 Incoming Webhook URL (확인 메시지 발송용)
RETROSPECT_CHAT_WEBHOOK_URL=https://your-nas:5001/webapi/entry.cgi?api=SYNO.Chat.External&method=incoming&version=2&token=RETRO_TOKEN
# n8n retrospect webhook (chat_bridge → n8n 포워딩)
N8N_RETROSPECT_WEBHOOK_URL=http://localhost:5678/webhook/retrospect
# Intent Service — Claude API fallback 월간 예산 (USD)
API_MONTHLY_LIMIT=10.00

View File

@@ -43,10 +43,11 @@ bot-n8n (맥미니 Docker) — 51노드 파이프라인
별도 워크플로우:
Mail Processing Pipeline (9노드) — mail_bridge 날짜 기반 조회 → dedup → 분류 → mail_logs
Retrospect Capture Pipeline (8노드) — chat_bridge 텍스트 포워딩 → Qwen 분류 → retrospect.entries 저장 → Chat 확인
네이티브 서비스 (맥미니):
heic_converter (:8090) — HEIC→JPEG 변환 (macOS sips)
chat_bridge (:8091) — DSM Chat API 브릿지 (사진 폴링/다운로드)
chat_bridge (:8091) — DSM Chat API 브릿지 (사진 폴링/다운로드 + 회고 채널 텍스트 폴링)
caldav_bridge (:8092) — CalDAV REST 래퍼 (Synology Calendar, VEVENT+VTODO)
mail_bridge (:8094) — IMAP 날짜 기반 메일 조회 (MailPlus)
kb_writer (:8095) — 마크다운 KB 저장
@@ -71,7 +72,7 @@ NAS (192.168.1.227):
| Ollama (GPU) | 192.168.1.186 (RTX 4070Ti Super) | 11434 | id-9b:latest (이드 특화 분류+응답), qwen3.5:9b-q8_0 (레거시 백업) |
| Claude Haiku Vision | Anthropic API | — | 사진 분석+구조화 (field_report, log_event) |
| heic_converter | 네이티브 (맥미니) | 8090 | HEIC→JPEG 변환 (macOS sips) |
| chat_bridge | 네이티브 (맥미니) | 8091 | DSM Chat API 브릿지 (사진 폴링/다운로드) |
| chat_bridge | 네이티브 (맥미니) | 8091 | DSM Chat API 브릿지 (사진 폴링/다운로드 + 회고 텍스트 폴링) |
| caldav_bridge | 네이티브 (맥미니) | 8092 | CalDAV REST 래퍼 (Synology Calendar) |
| mail_bridge | 네이티브 (맥미니) | 8094 | IMAP 날짜 기반 메일 조회 (MailPlus) |
| kb_writer | 네이티브 (맥미니) | 8095 | 마크다운 KB 저장 |
@@ -105,6 +106,7 @@ NAS (192.168.1.227):
**기존**: `ai_configs`, `routing_rules`, `prompts`, `chat_logs`, `mail_accounts`
**신규**: `document_ingestion_log`, `field_reports`, `classification_logs`, `mail_logs`, `calendar_events` (+caldav_uid, +description, +created_by), `report_cache`, `api_usage_monthly`, `news_digest_log`
**회고**: `retrospect.entries`, `retrospect.reviews`, `retrospect.patterns` (별도 스키마)
상세 스키마는 [docs/architecture.md](docs/architecture.md) 참조.

View File

@@ -1,4 +1,4 @@
"""DSM Chat API Bridge — 사진 폴링 + 다운로드 서비스 (port 8091)"""
"""DSM Chat API Bridge — 사진 폴링 + 회고 텍스트 포워딩 서비스 (port 8091)"""
import asyncio
import base64
@@ -25,10 +25,15 @@ CHAT_CHANNEL_ID = int(os.getenv("CHAT_CHANNEL_ID", "17"))
SYNOLOGY_CHAT_WEBHOOK_URL = os.getenv("SYNOLOGY_CHAT_WEBHOOK_URL", "")
HEIC_CONVERTER_URL = os.getenv("HEIC_CONVERTER_URL", "http://127.0.0.1:8090")
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "5"))
RETROSPECT_CHANNEL_ID = int(os.getenv("RETROSPECT_CHANNEL_ID", "0"))
RETROSPECT_CHAT_WEBHOOK_URL = os.getenv("RETROSPECT_CHAT_WEBHOOK_URL", "")
N8N_RETROSPECT_WEBHOOK_URL = os.getenv("N8N_RETROSPECT_WEBHOOK_URL",
"http://localhost:5678/webhook/retrospect")
# State
sid: str = ""
last_seen_post_id: int = 0
retro_last_seen_post_id: int = 0
pending_photos: dict[int, dict] = {} # user_id -> {post_id, create_at, filename}
@@ -140,6 +145,47 @@ async def poll_channel(client: httpx.AsyncClient):
logger.error(f"Poll error: {e}")
async def forward_to_n8n(post: dict):
payload = {
"text": post.get("msg", ""),
"user_id": post.get("creator_id", 0),
"username": post.get("display_name", post.get("username", "unknown")),
"post_id": post.get("post_id", 0),
"timestamp": post.get("create_at", 0),
}
try:
async with httpx.AsyncClient(verify=False) as client:
resp = await client.post(N8N_RETROSPECT_WEBHOOK_URL,
json=payload, timeout=10)
logger.info(f"Forwarded retrospect post_id={post.get('post_id')} "
f"to n8n: {resp.status_code}")
except Exception as e:
logger.error(f"Failed to forward to n8n: {e}")
async def poll_retrospect_channel(client: httpx.AsyncClient):
global retro_last_seen_post_id
if not RETROSPECT_CHANNEL_ID:
return
try:
data = await api_call(client, "SYNO.Chat.Post", 8, "list",
{"channel_id": RETROSPECT_CHANNEL_ID, "limit": 10})
posts = extract_posts(data)
for post in posts:
post_id = post.get("post_id", 0)
if post_id <= retro_last_seen_post_id:
continue
# 텍스트 메시지만 포워딩 (파일/시스템 메시지 제외)
if post.get("type", "normal") == "normal" and post.get("msg", "").strip():
await forward_to_n8n(post)
if posts:
max_id = max(p.get("post_id", 0) for p in posts)
if max_id > retro_last_seen_post_id:
retro_last_seen_post_id = max_id
except Exception as e:
logger.error(f"Retrospect poll error: {e}")
async def polling_loop():
async with httpx.AsyncClient(verify=False) as client:
# Login
@@ -155,7 +201,7 @@ async def polling_loop():
await asyncio.sleep(5)
# Initialize last_seen_post_id
global last_seen_post_id
global last_seen_post_id, retro_last_seen_post_id
try:
data = await api_call(client, "SYNO.Chat.Post", 8, "list",
{"channel_id": CHAT_CHANNEL_ID, "limit": 5})
@@ -166,9 +212,23 @@ async def polling_loop():
except Exception as e:
logger.warning(f"Failed to init last_seen_post_id: {e}")
# Initialize retro_last_seen_post_id
if RETROSPECT_CHANNEL_ID:
try:
data = await api_call(client, "SYNO.Chat.Post", 8, "list",
{"channel_id": RETROSPECT_CHANNEL_ID, "limit": 1})
posts = extract_posts(data)
if posts:
retro_last_seen_post_id = max(p.get("post_id", 0) for p in posts)
logger.info(f"Initialized retro_last_seen_post_id={retro_last_seen_post_id}")
except Exception as e:
logger.warning(f"Failed to init retro_last_seen_post_id: {e}")
# Poll loop
while True:
await poll_channel(client)
await asyncio.sleep(0.5) # DSM API 호출 간격
await poll_retrospect_channel(client)
await asyncio.sleep(POLL_INTERVAL)
@@ -286,6 +346,8 @@ async def health():
"status": "ok",
"sid_active": bool(sid),
"last_seen_post_id": last_seen_post_id,
"retro_last_seen_post_id": retro_last_seen_post_id,
"retro_channel_id": RETROSPECT_CHANNEL_ID,
"pending_photos": {
str(uid): info["filename"]
for uid, info in pending_photos.items()

View File

@@ -33,6 +33,7 @@ services:
- CALDAV_BRIDGE_URL=http://host.docker.internal:8092
- MAIL_BRIDGE_URL=http://host.docker.internal:8094
- KB_WRITER_URL=http://host.docker.internal:8095
- RETROSPECT_CHAT_WEBHOOK_URL=${RETROSPECT_CHAT_WEBHOOK_URL}
- NODE_FUNCTION_ALLOW_BUILTIN=crypto,http,https,url
volumes:
- ./n8n/data:/home/node/.n8n

50
init/migrate-v6.sql Normal file
View File

@@ -0,0 +1,50 @@
-- migrate-v6.sql: 회고 시스템 (retrospect) 스키마 + 테이블
-- 실행: docker exec -i bot-postgres psql -U bot -d chatbot < init/migrate-v6.sql
CREATE SCHEMA IF NOT EXISTS retrospect;
CREATE TABLE IF NOT EXISTS retrospect.entries (
id BIGSERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
raw_text TEXT NOT NULL,
domain VARCHAR(20) NOT NULL, -- work | health | finance
entry_type VARCHAR(20) NOT NULL, -- event | reflection | regret | log
sentiment VARCHAR(10) NOT NULL, -- positive | neutral | negative
tags TEXT[] DEFAULT '{}',
confidence REAL DEFAULT 0.0,
source VARCHAR(20) DEFAULT 'chat',
reviewed BOOLEAN DEFAULT FALSE,
source_post_id INTEGER,
username VARCHAR(100)
);
CREATE INDEX IF NOT EXISTS idx_entries_created ON retrospect.entries (created_at DESC);
CREATE INDEX IF NOT EXISTS idx_entries_domain ON retrospect.entries (domain);
CREATE INDEX IF NOT EXISTS idx_entries_type ON retrospect.entries (entry_type);
CREATE INDEX IF NOT EXISTS idx_entries_reviewed ON retrospect.entries (reviewed) WHERE NOT reviewed;
CREATE INDEX IF NOT EXISTS idx_entries_tags ON retrospect.entries USING GIN(tags);
-- 중복 방지: chat_bridge 재시작 시 같은 post 재처리 방지
CREATE UNIQUE INDEX IF NOT EXISTS idx_entries_post_id ON retrospect.entries (source_post_id)
WHERE source_post_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS retrospect.reviews (
id BIGSERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
review_type VARCHAR(10) NOT NULL, -- daily | weekly | monthly
period_start DATE NOT NULL,
period_end DATE NOT NULL,
content TEXT NOT NULL,
entry_ids BIGINT[] DEFAULT '{}',
model_used VARCHAR(50) NOT NULL
);
CREATE TABLE IF NOT EXISTS retrospect.patterns (
id BIGSERIAL PRIMARY KEY,
discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
domain VARCHAR(20) NOT NULL,
pattern_type VARCHAR(20) NOT NULL, -- recurring_mistake | habit | correlation | improvement
description TEXT NOT NULL,
occurrences INTEGER DEFAULT 1,
last_seen DATE NOT NULL,
status VARCHAR(20) DEFAULT 'active'
);

View File

@@ -0,0 +1,223 @@
{
"id": "mPretrocapture001",
"name": "회고 캡처 파이프라인",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "retrospect",
"responseMode": "responseNode",
"options": {}
},
"id": "r1000001-0000-0000-0000-000000000001",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [
0,
300
],
"webhookId": "retrospect"
},
{
"parameters": {
"jsCode": "const body = $input.first().json.body || $input.first().json;\nconst text = (body.text || '').trim();\nconst postId = body.post_id || 0;\nconst username = body.username || 'unknown';\nconst userId = body.user_id || 0;\nconst timestamp = body.timestamp || 0;\n\nconst valid = text.length > 0;\n\nreturn [{ json: { text, post_id: postId, username, user_id: userId, timestamp, valid } }];"
},
"id": "r1000001-0000-0000-0000-000000000002",
"name": "Validate",
"type": "n8n-nodes-base.code",
"typeVersion": 1,
"position": [
220,
300
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict"
},
"conditions": [
{
"id": "valid-check",
"leftValue": "={{ $json.valid }}",
"rightValue": true,
"operator": {
"type": "boolean",
"operation": "true"
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "r1000001-0000-0000-0000-000000000003",
"name": "IF Valid?",
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [
440,
300
]
},
{
"parameters": {
"jsCode": "function httpPost(url, body, { timeout = 15000, headers = {} } = {}) {\n return new Promise((resolve, reject) => {\n const data = JSON.stringify(body);\n const u = require('url').parse(url);\n const mod = require(u.protocol === 'https:' ? 'https' : 'http');\n const req = mod.request({\n hostname: u.hostname, port: u.port, path: u.path,\n method: 'POST',\n headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(data), ...headers }\n }, (res) => {\n let body = '';\n res.on('data', c => body += c);\n res.on('end', () => {\n if (res.statusCode >= 400) return reject(new Error(url + ' \\u2192 ' + res.statusCode + ': ' + body.slice(0, 200)));\n try { resolve(JSON.parse(body)); } catch(e) { reject(new Error('JSON parse error: ' + body.slice(0, 200))); }\n });\n });\n req.on('error', reject);\n req.setTimeout(timeout, () => { req.destroy(); reject(new Error(url + ' \\u2192 timeout after ' + timeout + 'ms')); });\n req.write(data);\n req.end();\n });\n}\n\nconst item = $input.first().json;\nconst text = item.text;\n\nconst prompt = `다음 텍스트를 분류하세요. JSON만 출력.\n\n분류 기준:\n- domain: work (업무, 회의, 프로젝트) | health (건강, 운동, 식사, 수면) | finance (지출, 투자, 예산)\n- entry_type: event (일어난 일) | reflection (생각, 깨달음) | regret (후회, 아쉬움) | log (단순 기록)\n- sentiment: positive | neutral | negative\n- tags: 관련 키워드 배열 (2~5개, 한국어)\n- confidence: 0.0~1.0 (분류 확신도)\n\n출력 형식:\n{\"domain\": \"...\", \"entry_type\": \"...\", \"sentiment\": \"...\", \"tags\": [\"...\"], \"confidence\": 0.0}\n\n텍스트: ${text}`;\n\ntry {\n const r = await httpPost(`${$env.GPU_OLLAMA_URL}/api/generate`,\n { model: 'id-9b:latest', system: '/no_think', prompt, stream: false, format: 'json', think: false },\n { timeout: 15000 }\n );\n const cls = JSON.parse(r.response);\n const validDomains = ['work', 'health', 'finance'];\n const validTypes = ['event', 'reflection', 'regret', 'log'];\n const validSentiments = ['positive', 'neutral', 'negative'];\n return [{ json: {\n ...item,\n domain: validDomains.includes(cls.domain) ? cls.domain : 'work',\n entry_type: validTypes.includes(cls.entry_type) ? cls.entry_type : 'log',\n sentiment: validSentiments.includes(cls.sentiment) ? cls.sentiment : 'neutral',\n tags: Array.isArray(cls.tags) ? cls.tags.slice(0, 5) : [],\n confidence: typeof cls.confidence === 'number' ? Math.min(1, Math.max(0, cls.confidence)) : 0.5,\n gpu_ok: true\n } }];\n} catch(e) {\n // GPU 타임아웃/장애 시 기본값으로 저장 (엔트리 유실 금지)\n return [{ json: {\n ...item,\n domain: 'work',\n entry_type: 'log',\n sentiment: 'neutral',\n tags: [],\n confidence: 0.0,\n gpu_ok: false\n } }];\n}"
},
"id": "r1000001-0000-0000-0000-000000000004",
"name": "Qwen Classify",
"type": "n8n-nodes-base.code",
"typeVersion": 1,
"position": [
660,
200
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "=INSERT INTO retrospect.entries (raw_text, domain, entry_type, sentiment, tags, confidence, source_post_id, username) VALUES ('{{ $json.text.replace(/'/g, \"''\") }}', '{{ $json.domain }}', '{{ $json.entry_type }}', '{{ $json.sentiment }}', '{{ \"{\" + ($json.tags || []).map(t => '\"' + t.replace(/\"/g, '') + '\"').join(',') + \"}\" }}', {{ $json.confidence }}, {{ $json.post_id || 'NULL' }}, '{{ ($json.username || \"unknown\").replace(/'/g, \"''\") }}') ON CONFLICT (source_post_id) WHERE source_post_id IS NOT NULL DO NOTHING",
"options": {}
},
"id": "r1000001-0000-0000-0000-000000000005",
"name": "PostgreSQL INSERT",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2.5,
"position": [
880,
200
],
"credentials": {
"postgres": {
"id": "KaxU8iKtraFfsrTF",
"name": "bot-postgres"
}
}
},
{
"parameters": {
"jsCode": "function httpPost(url, body, { timeout = 10000, headers = {} } = {}) {\n return new Promise((resolve, reject) => {\n const data = JSON.stringify(body);\n const u = require('url').parse(url);\n const mod = require(u.protocol === 'https:' ? 'https' : 'http');\n const options = {\n hostname: u.hostname, port: u.port, path: u.path,\n method: 'POST',\n headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(data), ...headers },\n rejectUnauthorized: false\n };\n const req = mod.request(options, (res) => {\n let body = '';\n res.on('data', c => body += c);\n res.on('end', () => resolve({ statusCode: res.statusCode, body }));\n });\n req.on('error', reject);\n req.setTimeout(timeout, () => { req.destroy(); reject(new Error('timeout')); });\n req.write(data);\n req.end();\n });\n}\n\nconst item = $input.first().json;\nconst webhookUrl = $env.RETROSPECT_CHAT_WEBHOOK_URL || '';\n\nif (webhookUrl) {\n const tagsStr = (item.tags || []).map(t => '#' + t).join(' ');\n const msg = `\\u2713 [${item.domain}] ${tagsStr}`;\n try {\n await httpPost(webhookUrl, { text: msg }, { timeout: 10000 });\n } catch(e) {\n console.log('Chat ack failed: ' + e.message);\n }\n}\n\nreturn [{ json: { ...item, ack_sent: true } }];"
},
"id": "r1000001-0000-0000-0000-000000000006",
"name": "Chat Confirm",
"type": "n8n-nodes-base.code",
"typeVersion": 1,
"position": [
1100,
200
]
},
{
"parameters": {
"respondWith": "json",
"responseBody": "={ \"status\": \"ok\" }",
"options": {}
},
"id": "r1000001-0000-0000-0000-000000000007",
"name": "Respond 200",
"type": "n8n-nodes-base.respondToWebhook",
"typeVersion": 1.1,
"position": [
1320,
200
]
},
{
"parameters": {
"respondWith": "json",
"responseBody": "={ \"status\": \"skipped\" }",
"options": {}
},
"id": "r1000001-0000-0000-0000-000000000008",
"name": "Respond Skip",
"type": "n8n-nodes-base.respondToWebhook",
"typeVersion": 1.1,
"position": [
660,
420
]
}
],
"connections": {
"Webhook": {
"main": [
[
{
"node": "Validate",
"type": "main",
"index": 0
}
]
]
},
"Validate": {
"main": [
[
{
"node": "IF Valid?",
"type": "main",
"index": 0
}
]
]
},
"IF Valid?": {
"main": [
[
{
"node": "Qwen Classify",
"type": "main",
"index": 0
}
],
[
{
"node": "Respond Skip",
"type": "main",
"index": 0
}
]
]
},
"Qwen Classify": {
"main": [
[
{
"node": "PostgreSQL INSERT",
"type": "main",
"index": 0
}
]
]
},
"PostgreSQL INSERT": {
"main": [
[
{
"node": "Chat Confirm",
"type": "main",
"index": 0
}
]
]
},
"Chat Confirm": {
"main": [
[
{
"node": "Respond 200",
"type": "main",
"index": 0
}
]
]
}
},
"settings": {
"executionOrder": "v1"
}
}