#!/usr/bin/env python3
"""
PKM Host API Server.

Lightweight HTTP server that relays AppleScript calls to DEVONthink and
OmniFocus and exposes RAG search. Called from the NanoClaw container.
Must be run as a LaunchAgent (GUI session).
Scope: DEVONthink + OmniFocus + RAG search.
"""

import json
import os
import subprocess
import sys
from pathlib import Path

from flask import Flask, request, jsonify

sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import load_credentials

app = Flask(__name__)


def run_applescript(script: str, timeout: int = 120) -> str:
    """Run an AppleScript source string through ``osascript``.

    Args:
        script: AppleScript source passed to ``osascript -e``.
        timeout: Seconds to wait before ``subprocess.run`` gives up.

    Returns:
        The script's stdout with surrounding whitespace stripped.

    Raises:
        RuntimeError: If ``osascript`` exits non-zero; the message is its
            stripped stderr.
        subprocess.TimeoutExpired: If the script exceeds ``timeout``.
    """
    result = subprocess.run(
        ['osascript', '-e', script],
        capture_output=True, text=True, timeout=timeout
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip())
    return result.stdout.strip()


def _json_body() -> dict:
    """Return the request's JSON body as a dict.

    A missing, unparsable, or non-object body (list, string, number, ...)
    yields an empty dict so route handlers can call ``.get`` safely.
    """
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


# --- DEVONthink ---

@app.route('/devonthink/stats')
def devonthink_stats():
    """Return the document count of every DEVONthink database plus totals."""
    try:
        # Fetch only per-database document counts quickly
        # (use `count` instead of iterating over children).
        script = (
            'tell application id "DNtp"\n'
            ' set stats to {}\n'
            ' repeat with db in databases\n'
            ' set dbName to name of db\n'
            ' set docCount to count of contents of db\n'
            ' set end of stats to dbName & ":" & docCount\n'
            ' end repeat\n'
            ' set AppleScript\'s text item delimiters to "|"\n'
            ' return stats as text\n'
            'end tell'
        )
        result = run_applescript(script)
        stats = {}
        total = 0
        if result:
            for item in result.split('|'):
                # Split on the LAST colon so database names that themselves
                # contain ':' are not silently dropped.
                name, sep, raw_count = item.rpartition(':')
                if not sep:
                    continue
                count = int(raw_count)
                stats[name] = {'count': count}
                total += count
        return jsonify(success=True, data={
            'databases': stats,
            'total_documents': total,
            'database_count': len(stats),
        })
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


@app.route('/devonthink/search')
def devonthink_search():
    """Search DEVONthink for ``q`` and return up to ``limit`` records.

    Query parameters:
        q: Search string (required).
        limit: Maximum number of records to return (default 10).
    """
    q = request.args.get('q', '')
    if not q:
        return jsonify(success=False, error='q parameter required'), 400
    try:
        limit = int(request.args.get('limit', '10'))
    except ValueError:
        # A malformed limit is a client error, not a server failure.
        return jsonify(success=False, error='limit must be an integer'), 400
    try:
        # Escape the query (which may be Korean) for embedding in an
        # AppleScript string literal: backslashes first, then double quotes.
        safe_q = q.replace('\\', '\\\\').replace('"', '\\"')
        script = (
            'tell application id "DNtp"\n'
            f' set results to search "{safe_q}"\n'
            ' set output to {}\n'
            f' set maxCount to {limit}\n'
            ' set i to 0\n'
            ' repeat with rec in results\n'
            ' if i >= maxCount then exit repeat\n'
            ' set recName to name of rec\n'
            ' set recDB to name of database of rec\n'
            ' set recDate to modification date of rec as text\n'
            ' set end of output to recName & "||" & recDB & "||" & recDate\n'
            ' set i to i + 1\n'
            ' end repeat\n'
            ' set AppleScript\'s text item delimiters to linefeed\n'
            ' return output as text\n'
            'end tell'
        )
        result = run_applescript(script)
        items = []
        if result:
            for line in result.split('\n'):
                # Split from the right so a record name containing "||"
                # stays intact; lines without all three fields are skipped.
                parts = line.rsplit('||', 2)
                if len(parts) == 3:
                    items.append({
                        'name': parts[0],
                        'database': parts[1],
                        'modified': parts[2],
                    })
        return jsonify(success=True, data=items, count=len(items))
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


@app.route('/devonthink/inbox-count')
def devonthink_inbox_count():
    """Return the number of children at the root of the "Inbox" database."""
    try:
        script = (
            'tell application id "DNtp"\n'
            ' set inboxDB to database "Inbox"\n'
            ' return count of children of root of inboxDB\n'
            'end tell'
        )
        count = int(run_applescript(script))
        return jsonify(success=True, data={'inbox_count': count})
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


# --- OmniFocus ---

@app.route('/omnifocus/stats')
def omnifocus_stats():
    """Return today's completed / added task counts and the overdue count."""
    try:
        script = (
            'tell application "OmniFocus"\n'
            ' tell default document\n'
            ' set today to current date\n'
            ' set time of today to 0\n'
            ' set completedCount to count of (every flattened task whose completed is true and completion date >= today)\n'
            ' set addedCount to count of (every flattened task whose creation date >= today)\n'
            ' set overdueCount to count of (every flattened task whose completed is false and due date < today and due date is not missing value)\n'
            ' return (completedCount as text) & "|" & (addedCount as text) & "|" & (overdueCount as text)\n'
            ' end tell\n'
            'end tell'
        )
        result = run_applescript(script)
        parts = result.split('|')
        return jsonify(success=True, data={
            'completed': int(parts[0]) if len(parts) > 0 else 0,
            'added': int(parts[1]) if len(parts) > 1 else 0,
            'overdue': int(parts[2]) if len(parts) > 2 else 0
        })
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


@app.route('/omnifocus/overdue')
def omnifocus_overdue():
    """List incomplete OmniFocus tasks whose due date is before today."""
    try:
        script = (
            'tell application "OmniFocus"\n'
            ' tell default document\n'
            ' set today to current date\n'
            ' set time of today to 0\n'
            ' set overdueTasks to every flattened task whose completed is false and due date < today and due date is not missing value\n'
            ' set output to {}\n'
            ' repeat with t in overdueTasks\n'
            ' set taskName to name of t\n'
            ' set dueDate to due date of t as text\n'
            ' set projName to ""\n'
            ' try\n'
            ' set projName to name of containing project of t\n'
            ' end try\n'
            ' set end of output to taskName & "||" & projName & "||" & dueDate\n'
            ' end repeat\n'
            ' set AppleScript\'s text item delimiters to linefeed\n'
            ' return output as text\n'
            ' end tell\n'
            'end tell'
        )
        result = run_applescript(script)
        tasks = []
        if result:
            for line in result.split('\n'):
                # Split from the right: the trailing two fields are project
                # and due date, so "||" inside a task name is preserved.
                parts = line.rsplit('||', 2)
                tasks.append({
                    'name': parts[0],
                    'project': parts[1] if len(parts) > 1 else '',
                    'due_date': parts[2] if len(parts) > 2 else ''
                })
        return jsonify(success=True, data=tasks, count=len(tasks))
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


@app.route('/omnifocus/today')
def omnifocus_today():
    """List incomplete OmniFocus tasks due or deferred to today."""
    try:
        script = (
            'tell application "OmniFocus"\n'
            ' tell default document\n'
            ' set today to current date\n'
            ' set time of today to 0\n'
            ' set tomorrow to today + 1 * days\n'
            ' set todayTasks to every flattened task whose completed is false and ((due date >= today and due date < tomorrow) or (defer date >= today and defer date < tomorrow))\n'
            ' set output to {}\n'
            ' repeat with t in todayTasks\n'
            ' set taskName to name of t\n'
            ' set projName to ""\n'
            ' try\n'
            ' set projName to name of containing project of t\n'
            ' end try\n'
            ' set end of output to taskName & "||" & projName\n'
            ' end repeat\n'
            ' set AppleScript\'s text item delimiters to linefeed\n'
            ' return output as text\n'
            ' end tell\n'
            'end tell'
        )
        result = run_applescript(script)
        tasks = []
        if result:
            for line in result.split('\n'):
                # Only the last "||" separates name from project.
                parts = line.rsplit('||', 1)
                tasks.append({
                    'name': parts[0],
                    'project': parts[1] if len(parts) > 1 else '',
                })
        return jsonify(success=True, data=tasks, count=len(tasks))
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


# --- RAG ---

def _get_gpu_ip():
    """Return the ``GPU_SERVER_IP`` entry from the loaded credentials."""
    creds = load_credentials()
    return creds.get("GPU_SERVER_IP")


def _embed_text(text: str, gpu_ip: str) -> list[float] | None:
    """Embed text with the bge-m3 model on the GPU server.

    Only the first 8000 characters of ``text`` are sent. Returns the first
    embedding vector, or ``None`` if the request or response handling fails
    (best-effort: the caller turns ``None`` into an error response).
    """
    import requests as req
    try:
        resp = req.post(f"http://{gpu_ip}:11434/api/embed",
                        json={"model": "bge-m3", "input": [text[:8000]]},
                        timeout=60)
        resp.raise_for_status()
        return resp.json().get("embeddings", [[]])[0]
    except Exception:
        return None


def _search_qdrant(vector: list[float], limit: int = 20) -> list[dict]:
    """Run a similarity search against the local Qdrant collection.

    Posts to the ``pkm_documents`` collection on localhost:6333 and returns
    the ``result`` list of the response (empty list if absent). Raises on a
    non-2xx response via ``raise_for_status``.
    """
    import requests as req
    resp = req.post("http://localhost:6333/collections/pkm_documents/points/search",
                    json={"vector": vector, "limit": limit, "with_payload": True},
                    timeout=10)
    resp.raise_for_status()
    return resp.json().get("result", [])


def _llm_generate(prompt: str) -> str:
    """Generate an answer with MLX on the Mac Mini (with thinking filtered).

    Sends ``prompt`` as a single user message to the chat-completions
    endpoint on localhost:8800 and passes the returned content through
    ``pkm_utils.strip_thinking``.
    """
    import requests as req
    from pkm_utils import strip_thinking
    resp = req.post("http://localhost:8800/v1/chat/completions", json={
        "model": "mlx-community/Qwen3.5-35B-A3B-4bit",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": 2048,
        "enable_thinking": False,
    }, timeout=120)
    resp.raise_for_status()
    content = resp.json()["choices"][0]["message"]["content"]
    return strip_thinking(content)


@app.route('/rag/query', methods=['POST'])
def rag_query():
    """RAG query: embed the question, search Qdrant, generate an LLM answer.

    JSON body:
        q: Question text (required).
        limit: Number of Qdrant hits to request (default 10). Only the top
            five hits are used as LLM context and returned as sources.
    """
    data = _json_body()
    q = data.get('q', '')
    if not q:
        return jsonify(success=False, error='q parameter required'), 400
    try:
        limit = int(data.get('limit', 10))
    except (TypeError, ValueError):
        # Reject a malformed limit here instead of letting Qdrant fail later.
        return jsonify(success=False, error='limit must be an integer'), 400

    gpu_ip = _get_gpu_ip()
    if not gpu_ip:
        return jsonify(success=False, error='GPU_SERVER_IP not configured'), 500

    try:
        # 1. Embed the query.
        query_vec = _embed_text(q, gpu_ip)
        if not query_vec:
            return jsonify(success=False, error='embedding failed'), 500

        # 2. Search Qdrant.
        results = _search_qdrant(query_vec, limit=limit)
        if not results:
            return jsonify(success=True, answer="관련 문서를 찾지 못했습니다.", sources=[])

        # 3. Assemble the context from the top five hits.
        sources = []
        context_parts = []
        for r in results[:5]:
            payload = r.get("payload", {})
            title = payload.get("title", "")
            preview = payload.get("text_preview", "")
            doc_uuid = payload.get("uuid", "")
            sources.append({
                "title": title,
                "uuid": doc_uuid,
                "score": round(r.get("score", 0), 3),
                "link": f"x-devonthink-item://{doc_uuid}" if doc_uuid else None,
            })
            context_parts.append(f"[{title}]\n{preview}")

        context = "\n\n---\n\n".join(context_parts)

        # 4. Generate the LLM answer.
        prompt = f"""다음 문서들을 참고하여 질문에 답변해주세요. ## 참고 문서 {context} ## 질문 {q} 답변은 한국어로, 참고한 문서 제목을 언급해주세요."""

        answer = _llm_generate(prompt)
        return jsonify(success=True, answer=answer, sources=sources, query=q)
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


def _run_embed(doc_uuid: str) -> subprocess.CompletedProcess:
    """Run ``embed_to_qdrant.py`` for one document UUID using the venv Python.

    The interpreter is ``../venv/bin/python3`` and the script sits next to
    this file. Raises whatever ``subprocess.run`` raises (e.g. on timeout).
    """
    venv_python = str(Path(__file__).parent.parent / "venv" / "bin" / "python3")
    embed_script = str(Path(__file__).parent / "embed_to_qdrant.py")
    return subprocess.run(
        [venv_python, embed_script, doc_uuid],
        capture_output=True, text=True, timeout=120
    )


@app.route('/devonthink/embed', methods=['POST'])
def devonthink_embed():
    """Trigger embedding of a single document (JSON body: ``uuid``)."""
    data = _json_body()
    doc_uuid = data.get('uuid', '')
    if not doc_uuid:
        return jsonify(success=False, error='uuid parameter required'), 400
    if not isinstance(doc_uuid, str):
        # A non-string cannot be passed as a subprocess argument.
        return jsonify(success=False, error='uuid must be a string'), 400

    try:
        result = _run_embed(doc_uuid)
        if result.returncode != 0:
            return jsonify(success=False, error=result.stderr.strip()), 500
        return jsonify(success=True, uuid=doc_uuid)
    except Exception as e:
        return jsonify(success=False, error=str(e)), 500


@app.route('/devonthink/embed-batch', methods=['POST'])
def devonthink_embed_batch():
    """Trigger embedding of several documents (JSON body: ``uuids`` list).

    Documents are processed sequentially; a failure of one item is recorded
    in its result entry and does not stop the batch.
    """
    data = _json_body()
    uuids = data.get('uuids', [])
    # Require a real list: a bare string would otherwise be iterated
    # character by character.
    if not uuids or not isinstance(uuids, list):
        return jsonify(success=False, error='uuids array required'), 400

    results = []
    for doc_uuid in uuids:
        try:
            result = _run_embed(doc_uuid)
            entry = {"uuid": doc_uuid, "success": result.returncode == 0}
            if result.returncode != 0:
                # Surface the script's stderr so callers can see why it failed.
                entry["error"] = result.stderr.strip()
            results.append(entry)
        except Exception as e:
            results.append({"uuid": doc_uuid, "success": False, "error": str(e)})

    succeeded = sum(1 for r in results if r["success"])
    return jsonify(success=True, total=len(uuids), succeeded=succeeded, results=results)


@app.route('/health')
def health():
    """Report service liveness and the list of available endpoints."""
    return jsonify(success=True, service='pkm-api', endpoints=[
        '/devonthink/stats', '/devonthink/search?q=', '/devonthink/inbox-count',
        '/devonthink/embed', '/devonthink/embed-batch',
        '/omnifocus/stats', '/omnifocus/overdue', '/omnifocus/today',
        '/rag/query',
    ])


if __name__ == '__main__':
    # Optional first CLI argument overrides the default port.
    port = int(sys.argv[1]) if len(sys.argv) > 1 else 9900
    print(f'PKM API Server starting on port {port}')
    app.run(host='127.0.0.1', port=port)