Files
hyungi_document_server/scripts/pkm_api_server.py
hyungi 5db2f4f6fa feat: RAG 파이프라인 — pkm_api_server.py에 검색/임베딩 엔드포인트 추가
- POST /rag/query: 질문 → GPU bge-m3 임베딩 → Qdrant 검색 → MLX 35B 답변 생성
  - DEVONthink 링크(x-devonthink-item://UUID) 포함 응답
- POST /devonthink/embed: 단일 문서 UUID → Qdrant 임베딩 트리거
- POST /devonthink/embed-batch: 배치 문서 임베딩
- docstring 범위 갱신: DEVONthink + OmniFocus + RAG 검색

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 13:32:49 +09:00

395 lines
14 KiB
Python

#!/usr/bin/env python3
"""
PKM Host API Server
DEVONthink + OmniFocus AppleScript 중계 + RAG 검색 경량 HTTP 서버.
NanoClaw 컨테이너에서 호출. LaunchAgent(GUI 세션)로 실행 필수.
범위: DEVONthink + OmniFocus + RAG 검색.
"""
import json
import os
import subprocess
import sys
from pathlib import Path
from flask import Flask, request, jsonify
# Put this script's own directory first on sys.path so the sibling module
# pkm_utils can be imported regardless of the current working directory.
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import load_credentials
# Flask application object; the route handlers below register themselves on it.
app = Flask(__name__)
def run_applescript(script: str, timeout: int = 120) -> str:
    """Execute an AppleScript snippet via ``osascript`` and return its output.

    Args:
        script: AppleScript source handed to ``osascript -e``.
        timeout: Seconds after which ``subprocess.run`` aborts the call.

    Returns:
        The script's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: ``osascript`` exited non-zero; the message is its
            stripped standard error.
        subprocess.TimeoutExpired: The script ran longer than ``timeout``.
    """
    command = ['osascript', '-e', script]
    completed = subprocess.run(
        command, capture_output=True, text=True, timeout=timeout
    )
    if completed.returncode == 0:
        return completed.stdout.strip()
    raise RuntimeError(completed.stderr.strip())
# --- DEVONthink ---
@app.route('/devonthink/stats')
def devonthink_stats():
    """Report how many root-level DEVONthink records were added/modified today.

    The AppleScript walks the children of each database's root, counts records
    whose creation or modification date is at or after today's midnight, and
    emits one ``name:added:modified`` entry per database that has at least one
    such record, joined by ``|``.

    Returns:
        JSON ``{success, data: {databases, total_added, total_modified}}``,
        or ``{success: False, error}`` with HTTP 500 on any failure.
    """
    try:
        script = (
            'tell application id "DNtp"\n'
            ' set today to current date\n'
            ' set time of today to 0\n'
            ' set stats to {}\n'
            ' repeat with db in databases\n'
            ' set dbName to name of db\n'
            ' set addedCount to 0\n'
            ' set modifiedCount to 0\n'
            ' repeat with rec in children of root of db\n'
            ' try\n'
            ' if creation date of rec >= today then set addedCount to addedCount + 1\n'
            ' if modification date of rec >= today then set modifiedCount to modifiedCount + 1\n'
            ' end try\n'
            ' end repeat\n'
            ' if addedCount > 0 or modifiedCount > 0 then\n'
            ' set end of stats to dbName & ":" & addedCount & ":" & modifiedCount\n'
            ' end if\n'
            ' end repeat\n'
            ' set AppleScript\'s text item delimiters to "|"\n'
            ' return stats as text\n'
            'end tell'
        )
        result = run_applescript(script)
        stats = {}
        if result:
            for item in result.split('|'):
                # Split from the right: the two counters are always the last
                # two fields, so a database name that itself contains ':' is
                # kept intact instead of being silently dropped.
                parts = item.rsplit(':', 2)
                if len(parts) == 3:
                    stats[parts[0]] = {
                        'added': int(parts[1]),
                        'modified': int(parts[2]),
                    }
        total_added = sum(s['added'] for s in stats.values())
        total_modified = sum(s['modified'] for s in stats.values())
        return jsonify(success=True, data={
            'databases': stats,
            'total_added': total_added,
            'total_modified': total_modified,
        })
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500
@app.route('/devonthink/search')
def devonthink_search():
    """Search DEVONthink and return up to ``limit`` matching records.

    Query parameters:
        q: Search string (required).
        limit: Maximum number of records to return; defaults to 10.

    Returns:
        JSON ``{success, data: [{name, database, modified}], count}``.
        HTTP 400 when ``q`` is missing or ``limit`` is not an integer,
        HTTP 500 with ``{success: False, error}`` if the AppleScript fails.
    """
    q = request.args.get('q', '')
    if not q:
        return jsonify(success=False, error='q parameter required'), 400
    try:
        limit = int(request.args.get('limit', '10'))
    except ValueError:
        # A malformed limit is a client error, not a server crash.
        return jsonify(success=False, error='limit must be an integer'), 400
    # SECURITY: q is untrusted and is embedded inside an AppleScript string
    # literal. Escape backslashes first, then double quotes, so the value
    # cannot terminate the literal and inject arbitrary AppleScript.
    escaped_q = q.replace('\\', '\\\\').replace('"', '\\"')
    try:
        script = (
            'tell application id "DNtp"\n'
            f' set results to search "{escaped_q}"\n'
            ' set output to {}\n'
            f' set maxCount to {limit}\n'
            ' set i to 0\n'
            ' repeat with rec in results\n'
            ' if i >= maxCount then exit repeat\n'
            ' set recName to name of rec\n'
            ' set recDB to name of database of rec\n'
            ' set recDate to modification date of rec as text\n'
            ' set end of output to recName & "||" & recDB & "||" & recDate\n'
            ' set i to i + 1\n'
            ' end repeat\n'
            ' set AppleScript\'s text item delimiters to linefeed\n'
            ' return output as text\n'
            'end tell'
        )
        result = run_applescript(script)
        items = []
        if result:
            # One record per line, fields separated by "||"; lines that do not
            # split into exactly three fields are skipped.
            for line in result.split('\n'):
                parts = line.split('||')
                if len(parts) == 3:
                    items.append({
                        'name': parts[0],
                        'database': parts[1],
                        'modified': parts[2],
                    })
        return jsonify(success=True, data=items, count=len(items))
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500
@app.route('/devonthink/inbox-count')
def devonthink_inbox_count():
    """Return the number of root-level records in the DEVONthink "Inbox" database.

    Returns:
        JSON ``{success, data: {inbox_count}}``, or ``{success: False, error}``
        with HTTP 500 when the script fails or its output is not an integer.
    """
    inbox_script = (
        'tell application id "DNtp"\n'
        ' set inboxDB to database "Inbox"\n'
        ' return count of children of root of inboxDB\n'
        'end tell'
    )
    try:
        raw_count = run_applescript(inbox_script)
        payload = {'inbox_count': int(raw_count)}
        return jsonify(success=True, data=payload)
    except Exception as exc:
        return jsonify(success=False, error=str(exc)), 500
# --- OmniFocus ---
@app.route('/omnifocus/stats')
def omnifocus_stats():
    """Return today's OmniFocus counters: completed, added and overdue tasks.

    The AppleScript returns the three counts as ``completed|added|overdue``;
    a missing trailing field is reported as 0.

    Returns:
        JSON ``{success, data: {completed, added, overdue}}``, or
        ``{success: False, error}`` with HTTP 500 on any failure.
    """
    stats_script = (
        'tell application "OmniFocus"\n'
        ' tell default document\n'
        ' set today to current date\n'
        ' set time of today to 0\n'
        ' set completedCount to count of (every flattened task whose completed is true and completion date >= today)\n'
        ' set addedCount to count of (every flattened task whose creation date >= today)\n'
        ' set overdueCount to count of (every flattened task whose completed is false and due date < today and due date is not missing value)\n'
        ' return (completedCount as text) & "|" & (addedCount as text) & "|" & (overdueCount as text)\n'
        ' end tell\n'
        'end tell'
    )
    field_names = ('completed', 'added', 'overdue')
    try:
        fields = run_applescript(stats_script).split('|')
        counters = {
            name: int(fields[idx]) if idx < len(fields) else 0
            for idx, name in enumerate(field_names)
        }
        return jsonify(success=True, data=counters)
    except Exception as exc:
        return jsonify(success=False, error=str(exc)), 500
@app.route('/omnifocus/overdue')
def omnifocus_overdue():
    """List incomplete OmniFocus tasks whose due date is before today.

    The AppleScript emits one task per line as ``name||project||due date``;
    the project is empty when the containing project cannot be read.

    Returns:
        JSON ``{success, data: [{name, project, due_date}], count}``, or
        ``{success: False, error}`` with HTTP 500 on any failure.
    """
    overdue_script = (
        'tell application "OmniFocus"\n'
        ' tell default document\n'
        ' set today to current date\n'
        ' set time of today to 0\n'
        ' set overdueTasks to every flattened task whose completed is false and due date < today and due date is not missing value\n'
        ' set output to {}\n'
        ' repeat with t in overdueTasks\n'
        ' set taskName to name of t\n'
        ' set dueDate to due date of t as text\n'
        ' set projName to ""\n'
        ' try\n'
        ' set projName to name of containing project of t\n'
        ' end try\n'
        ' set end of output to taskName & "||" & projName & "||" & dueDate\n'
        ' end repeat\n'
        ' set AppleScript\'s text item delimiters to linefeed\n'
        ' return output as text\n'
        ' end tell\n'
        'end tell'
    )
    try:
        raw = run_applescript(overdue_script)
        overdue = []
        for row in (raw.split('\n') if raw else []):
            # Pad with empty strings so missing trailing fields default to ''.
            name, project, due = (row.split('||') + ['', ''])[:3]
            overdue.append({'name': name, 'project': project, 'due_date': due})
        return jsonify(success=True, data=overdue, count=len(overdue))
    except Exception as exc:
        return jsonify(success=False, error=str(exc)), 500
@app.route('/omnifocus/today')
def omnifocus_today():
    """List incomplete OmniFocus tasks that are due or deferred to today.

    A task qualifies when its due date or its defer date lies between today's
    midnight (inclusive) and tomorrow's midnight (exclusive). The AppleScript
    emits one task per line as ``name||project``.

    Returns:
        JSON ``{success, data: [{name, project}], count}``, or
        ``{success: False, error}`` with HTTP 500 on any failure.
    """
    today_script = (
        'tell application "OmniFocus"\n'
        ' tell default document\n'
        ' set today to current date\n'
        ' set time of today to 0\n'
        ' set tomorrow to today + 1 * days\n'
        ' set todayTasks to every flattened task whose completed is false and ((due date >= today and due date < tomorrow) or (defer date >= today and defer date < tomorrow))\n'
        ' set output to {}\n'
        ' repeat with t in todayTasks\n'
        ' set taskName to name of t\n'
        ' set projName to ""\n'
        ' try\n'
        ' set projName to name of containing project of t\n'
        ' end try\n'
        ' set end of output to taskName & "||" & projName\n'
        ' end repeat\n'
        ' set AppleScript\'s text item delimiters to linefeed\n'
        ' return output as text\n'
        ' end tell\n'
        'end tell'
    )
    try:
        raw = run_applescript(today_script)
        todays = []
        for row in (raw.split('\n') if raw else []):
            # Pad so a line without a project field still yields project ''.
            name, project = (row.split('||') + [''])[:2]
            todays.append({'name': name, 'project': project})
        return jsonify(success=True, data=todays, count=len(todays))
    except Exception as exc:
        return jsonify(success=False, error=str(exc)), 500
# --- RAG ---
def _get_gpu_ip():
    """Return the ``GPU_SERVER_IP`` entry from the loaded credentials.

    The result is ``None`` when the key is absent.
    """
    return load_credentials().get("GPU_SERVER_IP")
def _embed_text(text: str, gpu_ip: str) -> list[float] | None:
    """Embed ``text`` with the ``bge-m3`` model served on the GPU host.

    Args:
        text: Text to embed; only the first 8000 characters are sent.
        gpu_ip: Host of the embedding server (port 11434, ``/api/embed``).

    Returns:
        The first embedding vector from the response, or ``None`` if the
        request, the HTTP status check, or the response parsing fails.
    """
    import requests as req
    try:
        response = req.post(
            f"http://{gpu_ip}:11434/api/embed",
            json={"model": "bge-m3", "input": [text[:8000]]},
            timeout=60,
        )
        response.raise_for_status()
        embeddings = response.json().get("embeddings", [[]])
        return embeddings[0]
    except Exception:
        # Best-effort: callers treat None as "embedding failed".
        return None
def _search_qdrant(vector: list[float], limit: int = 20) -> list[dict]:
    """Run a similarity search against the local ``pkm_documents`` collection.

    Args:
        vector: Query embedding.
        limit: Maximum number of hits requested.

    Returns:
        The ``result`` list from the response (payloads included), or an
        empty list when the key is missing.

    Raises:
        requests.HTTPError: The search endpoint answered with an error status.
    """
    import requests as req
    search_url = "http://localhost:6333/collections/pkm_documents/points/search"
    body = {"vector": vector, "limit": limit, "with_payload": True}
    response = req.post(search_url, json=body, timeout=10)
    response.raise_for_status()
    return response.json().get("result", [])
def _llm_generate(prompt: str) -> str:
    """Generate an answer by posting ``prompt`` to the local chat-completions API.

    Args:
        prompt: Full prompt, sent as a single user message.

    Returns:
        The content of the first choice's message.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        KeyError, IndexError: The response lacks the expected structure.
    """
    import requests as req
    body = {
        "model": "mlx-community/Qwen3.5-35B-A3B-4bit",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": 2048,
    }
    response = req.post(
        "http://localhost:8800/v1/chat/completions", json=body, timeout=120
    )
    response.raise_for_status()
    first_choice = response.json()["choices"][0]
    return first_choice["message"]["content"]
@app.route('/rag/query', methods=['POST'])
def rag_query():
    """Answer a question with RAG: embed -> vector search -> LLM generation.

    Request JSON:
        q: Question text (required, non-empty string).
        limit: Number of hits requested from the vector search; defaults to
            10. Only the top 5 hits are used as LLM context and sources.

    Returns:
        JSON ``{success, answer, sources, query}`` where each source carries
        ``title``, ``uuid``, ``score`` and an ``x-devonthink-item://`` link
        (``None`` when the payload has no uuid). If the search returns no
        hits, a fixed "no documents found" answer with empty ``sources`` is
        returned. HTTP 400 for invalid input, HTTP 500 with
        ``{success: False, error}`` for configuration or downstream failures.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; treat it like an empty object
        # instead of crashing on .get().
        data = {}
    q = data.get('q', '')
    if not q or not isinstance(q, str):
        return jsonify(success=False, error='q parameter required'), 400
    raw_limit = data.get('limit')
    try:
        limit = 10 if raw_limit is None else int(raw_limit)
    except (TypeError, ValueError):
        return jsonify(success=False, error='limit must be an integer'), 400
    if limit < 1:
        return jsonify(success=False, error='limit must be a positive integer'), 400
    try:
        # Looked up inside the try so a credential-loading failure is reported
        # as a JSON error like every other failure of this endpoint.
        gpu_ip = _get_gpu_ip()
        if not gpu_ip:
            return jsonify(success=False, error='GPU_SERVER_IP not configured'), 500
        # 1. Embed the query.
        query_vec = _embed_text(q, gpu_ip)
        if not query_vec:
            return jsonify(success=False, error='embedding failed'), 500
        # 2. Vector search.
        results = _search_qdrant(query_vec, limit=limit)
        if not results:
            return jsonify(success=True, answer="관련 문서를 찾지 못했습니다.", sources=[])
        # 3. Assemble the context from the top 5 hits.
        sources = []
        context_parts = []
        for r in results[:5]:
            payload = r.get("payload", {})
            title = payload.get("title", "")
            preview = payload.get("text_preview", "")
            doc_uuid = payload.get("uuid", "")
            sources.append({
                "title": title,
                "uuid": doc_uuid,
                "score": round(r.get("score", 0), 3),
                "link": f"x-devonthink-item://{doc_uuid}" if doc_uuid else None,
            })
            context_parts.append(f"[{title}]\n{preview}")
        context = "\n\n---\n\n".join(context_parts)
        # 4. Generate the answer with the LLM.
        prompt = f"""다음 문서들을 참고하여 질문에 답변해주세요.
## 참고 문서
{context}
## 질문
{q}
답변은 한국어로, 참고한 문서 제목을 언급해주세요."""
        answer = _llm_generate(prompt)
        return jsonify(success=True, answer=answer, sources=sources, query=q)
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500
@app.route('/devonthink/embed', methods=['POST'])
def devonthink_embed():
    """Trigger embedding of a single document identified by its UUID.

    Runs ``embed_to_qdrant.py`` (next to this file) with the interpreter at
    ``../venv/bin/python3``, passing the UUID as the only argument.

    Request JSON:
        uuid: Document UUID (required).

    Returns:
        JSON ``{success: True, uuid}`` on exit code 0; HTTP 400 when ``uuid``
        is missing; HTTP 500 with the script's stderr or the exception text
        otherwise.
    """
    body = request.get_json(silent=True) or {}
    doc_uuid = body.get('uuid', '')
    if not doc_uuid:
        return jsonify(success=False, error='uuid parameter required'), 400
    try:
        scripts_dir = Path(__file__).parent
        interpreter = str(scripts_dir.parent / "venv" / "bin" / "python3")
        embed_script = str(scripts_dir / "embed_to_qdrant.py")
        completed = subprocess.run(
            [interpreter, embed_script, doc_uuid],
            capture_output=True, text=True, timeout=120
        )
        if completed.returncode == 0:
            return jsonify(success=True, uuid=doc_uuid)
        return jsonify(success=False, error=completed.stderr.strip()), 500
    except Exception as exc:
        return jsonify(success=False, error=str(exc)), 500
@app.route('/devonthink/embed-batch', methods=['POST'])
def devonthink_embed_batch():
    """Trigger embedding for a batch of document UUIDs, one subprocess each.

    Each UUID is passed to ``embed_to_qdrant.py`` (next to this file) using
    the interpreter at ``../venv/bin/python3``. A failure of one document
    does not stop the batch.

    Request JSON:
        uuids: Non-empty JSON array of document UUIDs (required).

    Returns:
        JSON ``{success: True, total, succeeded, results}`` where every result
        has ``uuid`` and ``success``; failed items additionally carry
        ``error`` (the script's stderr or the exception text). HTTP 400 when
        ``uuids`` is missing, empty, or not an array.
    """
    data = request.get_json(silent=True)
    uuids = data.get('uuids') if isinstance(data, dict) else None
    # Require a real list: a bare string would otherwise be iterated character
    # by character and spawn one subprocess per character.
    if not isinstance(uuids, list) or not uuids:
        return jsonify(success=False, error='uuids array required'), 400
    results = []
    venv_python = str(Path(__file__).parent.parent / "venv" / "bin" / "python3")
    embed_script = str(Path(__file__).parent / "embed_to_qdrant.py")
    for doc_uuid in uuids:
        try:
            result = subprocess.run(
                [venv_python, embed_script, doc_uuid],
                capture_output=True, text=True, timeout=120
            )
            entry = {"uuid": doc_uuid, "success": result.returncode == 0}
            if result.returncode != 0:
                # Surface the reason, matching the single-document endpoint.
                entry["error"] = result.stderr.strip()
            results.append(entry)
        except Exception as e:
            # Timeouts, a missing interpreter, or a non-string UUID end up here.
            results.append({"uuid": doc_uuid, "success": False, "error": str(e)})
    succeeded = sum(1 for r in results if r["success"])
    return jsonify(success=True, total=len(uuids), succeeded=succeeded, results=results)
@app.route('/health')
def health():
    """Health check that also advertises the endpoints served by this process."""
    available_endpoints = [
        '/devonthink/stats', '/devonthink/search?q=',
        '/devonthink/inbox-count', '/devonthink/embed', '/devonthink/embed-batch',
        '/omnifocus/stats', '/omnifocus/overdue', '/omnifocus/today',
        '/rag/query',
    ]
    return jsonify(success=True, service='pkm-api', endpoints=available_endpoints)
if __name__ == '__main__':
    # The optional first command-line argument overrides the default port 9900.
    cli_args = sys.argv[1:]
    port = int(cli_args[0]) if cli_args else 9900
    print(f'PKM API Server starting on port {port}')
    # Bind to loopback only.
    app.run(host='127.0.0.1', port=port)