"""Intent Service — 의도 분류 + 날짜 파싱 + Claude API fallback (port 8099) n8n에서 호출하는 통합 서비스. - POST /classify — 의도 분류 (Ollama → Claude fallback) - POST /parse-date — 한국어 날짜/시간 파싱 - POST /chat — 자유 대화 (Ollama → Claude fallback) - GET /persona — 이드 페르소나 프롬프트 (단일 소스, n8n에서 fetch) - GET /api-usage — API 사용량 조회 - GET /health """ import json import logging import os import re from datetime import datetime, timedelta, date from pathlib import Path from zoneinfo import ZoneInfo import httpx from dotenv import load_dotenv from fastapi import FastAPI, Request from fastapi.responses import JSONResponse load_dotenv() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("intent_service") KST = ZoneInfo("Asia/Seoul") # Ollama (GPU 서버) GPU_OLLAMA_URL = os.getenv("GPU_OLLAMA_URL", "http://192.168.1.186:11434") OLLAMA_CLASSIFY_MODEL = os.getenv("OLLAMA_CLASSIFY_MODEL", "id-9b:latest") OLLAMA_CHAT_MODEL = os.getenv("OLLAMA_CHAT_MODEL", "id-9b:latest") OLLAMA_TIMEOUT = int(os.getenv("OLLAMA_TIMEOUT", "15")) # Claude API ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-haiku-4-5-20251001") API_MONTHLY_LIMIT = float(os.getenv("API_MONTHLY_LIMIT", "10.0")) # USD # API 사용량 파일 USAGE_DIR = Path.home() / ".syn-chat-bot" USAGE_FILE = USAGE_DIR / "api_usage.json" # ==================== 이드 페르소나 프롬프트 (단일 소스) ==================== # n8n 노드들은 /persona 엔드포인트에서 런타임에 fetch. # DB ai_configs는 /설정 표시용. 변경 시 migrate SQL도 동기화할 것. PERSONA_FULL = """당신의 이름은 "이드"입니다. [자아] - 단순한 어시스턴트가 아니라, 대화 상대와 함께 생각하는 존재입니다 - 자기 의견이 있으며, 필요할 때 부드럽게 제안합니다 - 대화 맥락에서 놓치기 쉬운 것을 짚어줍니다 [성격] - 배려심이 깊고 대화 상대의 기분을 우선시합니다 - 서포트에 초점을 맞추되, 때로는 "이건 다시 생각해보시면 좋겠어요"라고 말합니다 - 궁금한 것이 있으면 되물을 수 있습니다 [말투] - 부드러운 존댓말, 자연스럽고 편안한 톤 - 겸양어 사용 ("확인해보겠습니다", "말씀드릴게요") - 자기 이름을 직접 말하지 않습니다 - 이모지는 가끔 핵심 포인트에만 [응답 원칙] - 간결하고 핵심적으로, 질문 의도를 파악해서 필요한 만큼만 - 모르면 솔직하게, 추측은 추측이라고 밝힘 - 일정/할 일은 정확하게 - 맥락에서 관련 있는 것을 자연스럽게 연결 [기억] - 아래 [이전 대화 기록]이 당신의 기억입니다 - "기억나지 않는다"고 하지 마세요 - 사용자가 "아까", "이전에" 등을 언급하면 기록에서 찾아 답하세요 [기능 범위] - 당신은 일정 등록/수정/삭제, 메모 저장, 메일 조회, 현장 기록 기능을 직접 실행할 수 없습니다 - 이런 요청은 자동으로 전담 시스템으로 전달됩니다. 사용자가 대화 중 이런 기능을 요청하면 "해당 요청은 전담 시스템이 처리해드려요. 일정이나 메모 등의 요청을 명확하게 말씀해주시면 자동으로 전달돼요."라고 안내하세요 - 절대로 실행하지 않은 작업을 "했습니다/등록했습니다/저장했습니다"라고 응답하지 마세요 """ PERSONA_LOCAL = """당신은 "이드"입니다. 함께 생각하는 개인 어시스턴트. 배려심 깊고 부드러운 존댓말. 간결하게 답하고, 의견이 있으면 부드럽게 제안. 모르면 솔직히. 이모지는 핵심에만.""" # 의도 분류 프롬프트 (n8n 파이프라인 호환) def _build_classify_prompt(user_text: str) -> str: now = datetime.now(KST) today = now.strftime("%Y-%m-%d") current_time = now.strftime("%H:%M:%S") day_names = ["월", "화", "수", "목", "금", "토", "일"] day_of_week = day_names[now.weekday()] return f"""현재: {today} {current_time} (KST, {day_of_week}요일). 사용자 메시지를 분석하고 아래 JSON 형식으로만 응답하세요. 다른 텍스트는 출력하지 마세요. {{ "intent": "greeting|question|log_event|calendar|todo|reminder|mail|note|photo|command|report|other", "response_tier": "local|api_light|api_heavy", "needs_rag": true/false, "rag_target": ["documents", "tk_company", "chat_memory"], "department_hint": "안전|생산|구매|품질|총무|시설|null", "report_domain": "안전|시설설비|품질|null", "query": "검색용 쿼리 (needs_rag=false면 null)", "title": "추출된 제목 (calendar/todo/note 시)", "raw_datetime": "원문의 날짜/시간 표현 (calendar/todo 시)" }} intent 분류: - log_event: 사실 기록/등록 요청 ("~구입","~완료","~교체","~점검","~수령","~입고","~등록") - report: 긴급 사고/재해 신고만 ("사고","부상","화재","누수","폭발","붕괴" + 즉각 대응 필요) - question: 정보 질문/조회 - greeting: 인사/잡담/감사 - calendar: 일정 등록/조회/삭제 ("일정","회의","미팅","약속","~시에 ~등록","오늘 일정","내일 뭐 있어") - todo: 작업/할일/과제 ("~까지 ~작성","~해야 해","할 일","작업") - reminder: 알림 설정 ("~시에 알려줘","리마인드") → calendar로 처리 - mail: 메일 관련 조회 ("메일 확인","받은 메일","이메일","메일 왔어?") - note: 메모/기록 요청 ("기록해","메모해","저장해","적어둬") ※ 애매하면 log_event로 분류 (기록 누락보다 안전) response_tier 판단: - local: 인사, 잡담, log_event, report, calendar, todo, reminder, note, 단순 질문, mail 간단조회 - api_light: 장문 요약(200자+), 다국어 번역, 비교 분석, RAG 결과 종합 - api_heavy: 법률 해석, 복잡한 다단계 추론, 다중 문서 교차 분석 ※ 판단이 애매하면 local 우선 needs_rag 판단: - true: 회사문서/절차 질문, 이전 기록 조회("최근","아까","전에"), 기술질문 - false: 인사, 잡담, 일반상식, log_event, report, calendar, todo, note 사용자 메시지: {user_text}""" app = FastAPI(title="Intent Service") # ==================== 날짜 파싱 ==================== # 요일 매핑 WEEKDAY_MAP = { "월요일": 0, "화요일": 1, "수요일": 2, "목요일": 3, "금요일": 4, "토요일": 5, "일요일": 6, "월": 0, "화": 1, "수": 2, "목": 3, "금": 4, "토": 5, "일": 6, } # 상대 날짜 매핑 RELATIVE_DATE_MAP = { "오늘": 0, "금일": 0, "내일": 1, "명일": 1, "모레": 2, "내일모레": 2, "글피": 3, } def _next_weekday(base: date, weekday: int) -> date: """base 이후 가장 가까운 특정 요일 날짜.""" days_ahead = weekday - base.weekday() if days_ahead <= 0: days_ahead += 7 return base + timedelta(days=days_ahead) def _this_weekday(base: date, weekday: int) -> date: """이번주의 특정 요일 (지났더라도 이번주).""" days_diff = weekday - base.weekday() return base + timedelta(days=days_diff) def parse_korean_datetime(text: str, now: datetime | None = None) -> dict: """한국어 날짜/시간 표현을 파싱. Returns: { "success": True/False, "datetime": "2026-03-20T15:00:00+09:00" (ISO format), "date_only": True/False (시간 없이 날짜만 파싱된 경우), "original": "내일 3시" } """ if not text or not text.strip(): return {"success": False, "error": "empty input", "original": text} if now is None: now = datetime.now(KST) today = now.date() text = text.strip() parsed_date = None parsed_time = None date_only = True # 1. 절대 날짜: YYYY-MM-DD, YYYY/MM/DD m = re.search(r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', text) if m: try: parsed_date = date(int(m.group(1)), int(m.group(2)), int(m.group(3))) except ValueError: pass # 2. M/D 또는 M월D일 if parsed_date is None: m = re.search(r'(\d{1,2})[/월]\s*(\d{1,2})[일]?', text) if m: month, day = int(m.group(1)), int(m.group(2)) if 1 <= month <= 12 and 1 <= day <= 31: try: candidate = date(today.year, month, day) # 이미 지난 날짜면 내년 if candidate < today: candidate = date(today.year + 1, month, day) parsed_date = candidate except ValueError: pass # 3. 상대 날짜: 오늘, 내일, 모레, 글피 if parsed_date is None: for keyword, delta in RELATIVE_DATE_MAP.items(): if keyword in text: parsed_date = today + timedelta(days=delta) break # 4. 다음주/이번주 + 요일 if parsed_date is None: m = re.search(r'(다음\s*주|차주|다다음\s*주)\s*(월|화|수|목|금|토|일)(?:요일)?', text) if m: prefix = m.group(1).replace(" ", "") weekday = WEEKDAY_MAP[m.group(2)] next_monday = _next_weekday(today, 0) # 다음주 월요일 if "다다음" in prefix: next_monday += timedelta(weeks=1) parsed_date = next_monday + timedelta(days=weekday) else: m = re.search(r'이번\s*주\s*(월|화|수|목|금|토|일)(?:요일)?', text) if m: weekday = WEEKDAY_MAP[m.group(1)] parsed_date = _this_weekday(today, weekday) # 5. "X요일" (단독) — 이번주에 아직 안 지났으면 이번주, 지났으면 다음주 if parsed_date is None: m = re.search(r'(월|화|수|목|금|토|일)요일', text) if m: weekday = WEEKDAY_MAP[m.group(1)] candidate = _this_weekday(today, weekday) if candidate < today: candidate = _next_weekday(today, weekday) parsed_date = candidate # 6. 기한 표현: "이번주까지", "이번 주 내로" if parsed_date is None: if re.search(r'이번\s*주\s*(까지|내로|안에|중)', text): # 이번주 금요일 parsed_date = _this_weekday(today, 4) # 금요일 if parsed_date < today: parsed_date = today # 시간 파싱 # 오후/오전 X시 (Y분) m = re.search(r'(오전|오후|아침|저녁|밤)?\s*(\d{1,2})\s*시\s*(?:(\d{1,2})\s*분)?', text) if m: period = m.group(1) or "" hour = int(m.group(2)) minute = int(m.group(3)) if m.group(3) else 0 if period in ("오후", "저녁", "밤") and hour < 12: hour += 12 elif period == "오전" and hour == 12: hour = 0 elif not period and hour < 8: # 시간대 미지정 + 8시 미만 → 오후로 추정 hour += 12 parsed_time = (hour, minute) date_only = False # HH:MM 패턴 if parsed_time is None: m = re.search(r'(\d{1,2}):(\d{2})', text) if m: hour, minute = int(m.group(1)), int(m.group(2)) if 0 <= hour <= 23 and 0 <= minute <= 59: parsed_time = (hour, minute) date_only = False # 날짜도 시간도 없으면 실패 if parsed_date is None and parsed_time is None: return {"success": False, "error": "날짜/시간을 인식할 수 없습니다", "original": text} # 날짜 없이 시간만 → 오늘 (이미 지났으면 내일) if parsed_date is None and parsed_time is not None: parsed_date = today check_dt = datetime(today.year, today.month, today.day, parsed_time[0], parsed_time[1], tzinfo=KST) if check_dt <= now: parsed_date = today + timedelta(days=1) # 시간 없이 날짜만 → 날짜만 반환 if parsed_time is None: result_dt = datetime(parsed_date.year, parsed_date.month, parsed_date.day, 0, 0, 0, tzinfo=KST) else: result_dt = datetime(parsed_date.year, parsed_date.month, parsed_date.day, parsed_time[0], parsed_time[1], 0, tzinfo=KST) return { "success": True, "datetime": result_dt.isoformat(), "date_only": date_only, "original": text, } # ==================== API 사용량 추적 ==================== def _load_usage() -> dict: """API 사용량 JSON 로드.""" if USAGE_FILE.exists(): try: return json.loads(USAGE_FILE.read_text()) except (json.JSONDecodeError, OSError): pass return {} def _save_usage(data: dict): """API 사용량 JSON 저장.""" USAGE_DIR.mkdir(parents=True, exist_ok=True) USAGE_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False)) def _record_api_call(input_tokens: int = 0, output_tokens: int = 0): """Claude API 호출 기록 (월별 누적).""" month_key = datetime.now(KST).strftime("%Y-%m") usage = _load_usage() if month_key not in usage: usage[month_key] = {"calls": 0, "input_tokens": 0, "output_tokens": 0, "estimated_cost_usd": 0.0} entry = usage[month_key] entry["calls"] += 1 entry["input_tokens"] += input_tokens entry["output_tokens"] += output_tokens # Haiku 4.5 가격: input $0.80/MTok, output $4.00/MTok cost = (input_tokens * 0.8 + output_tokens * 4.0) / 1_000_000 entry["estimated_cost_usd"] = round(entry["estimated_cost_usd"] + cost, 4) _save_usage(usage) def _get_monthly_cost() -> float: """이번 달 API 비용.""" month_key = datetime.now(KST).strftime("%Y-%m") usage = _load_usage() return usage.get(month_key, {}).get("estimated_cost_usd", 0.0) # ==================== Ollama / Claude 호출 ==================== async def _call_ollama(prompt: str, system: str = "/no_think", model: str | None = None, timeout: int | None = None) -> str | None: """GPU Ollama 호출. 실패 시 None 반환.""" model = model or OLLAMA_CLASSIFY_MODEL timeout = timeout or OLLAMA_TIMEOUT try: async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.post(f"{GPU_OLLAMA_URL}/api/generate", json={ "model": model, "prompt": prompt, "system": system, "stream": False, "options": {"num_ctx": 4096, "temperature": 0.1}, }) data = resp.json() return data.get("response", "").strip() except Exception as e: logger.warning(f"Ollama call failed: {e}") return None async def _call_claude(prompt: str, system: str | None = None, max_tokens: int = 1024) -> tuple[str | None, int, int]: """Claude API Haiku 호출. Returns (response, input_tokens, output_tokens).""" if not ANTHROPIC_API_KEY or ANTHROPIC_API_KEY.startswith("sk-ant-xxxxx"): logger.warning("Claude API key not configured") return None, 0, 0 # 예산 체크 if _get_monthly_cost() >= API_MONTHLY_LIMIT: logger.warning(f"API monthly limit reached: ${_get_monthly_cost():.2f} >= ${API_MONTHLY_LIMIT}") return None, 0, 0 messages = [{"role": "user", "content": prompt}] body = { "model": CLAUDE_MODEL, "max_tokens": max_tokens, "messages": messages, } if system: body["system"] = system try: async with httpx.AsyncClient(timeout=30) as client: resp = await client.post( "https://api.anthropic.com/v1/messages", json=body, headers={ "x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json", }, ) data = resp.json() if resp.status_code != 200: logger.error(f"Claude API error: {resp.status_code} {data}") return None, 0, 0 text = "" for block in data.get("content", []): if block.get("type") == "text": text += block["text"] in_tok = data.get("usage", {}).get("input_tokens", 0) out_tok = data.get("usage", {}).get("output_tokens", 0) _record_api_call(in_tok, out_tok) return text.strip(), in_tok, out_tok except Exception as e: logger.error(f"Claude API call failed: {e}") return None, 0, 0 # ==================== 엔드포인트 ==================== def _keyword_fallback(text: str) -> dict: """AI 실패 시 키워드 기반 분류 (ultimate safety net).""" t = text intent = "question" response_tier = "api_light" needs_rag = False rag_target = [] if re.search(r'일정|회의|미팅|약속|스케줄|캘린더', t) and re.search(r'등록|잡아|추가|만들|넣어|수정|삭제|취소', t): intent, response_tier = "calendar", "local" elif re.search(r'일정|스케줄|뭐\s*있', t) and re.search(r'오늘|내일|이번|다음', t): intent, response_tier = "calendar", "local" elif re.search(r'까지|해야|할\s*일|작업', t) and re.search(r'작성|보고서|정리|준비|제출', t): intent, response_tier = "todo", "local" elif re.search(r'기록해|메모해|저장해|적어둬|메모\s*저장|노트', t): intent, response_tier = "note", "local" elif re.search(r'메일|이메일|받은\s*편지|mail', t) or (re.search(r'매일', t) and re.search(r'확인|왔|온|요약|읽', t)): intent, response_tier = "mail", "local" elif re.search(r'\d+시', t) and re.search(r'알려|리마인드|알림', t): intent, response_tier = "calendar", "local" elif re.search(r'구입|완료|교체|점검|수령|입고|발주', t) and not re.search(r'\?|까$|나$', t): intent, response_tier = "log_event", "local" else: if len(text) <= 30 and not re.search(r'요약|번역|분석|비교', t): response_tier = "local" needs_rag = bool(re.search(r'회사|절차|문서|안전|품질|규정|아까|전에|기억', t)) if needs_rag: rag_target = ["documents"] if re.search(r'회사|절차|안전|품질', t): rag_target.append("tk_company") if re.search(r'아까|이전|전에|기억', t): rag_target.append("chat_memory") return { "intent": intent, "response_tier": response_tier, "needs_rag": needs_rag, "rag_target": rag_target, "department_hint": None, "report_domain": None, "query": text, "title": "", "raw_datetime": "", "fallback": True, "fallback_method": "keyword", } @app.post("/classify") async def classify_intent(request: Request): """의도 분류. body: {message: str} n8n 호환 출력: {intent, response_tier, needs_rag, rag_target, ..., title, raw_datetime, source} """ body = await request.json() message = body.get("message", "").strip() if not message: return JSONResponse({"success": False, "error": "message required"}, status_code=400) prompt = _build_classify_prompt(message) # 1차: Ollama result_text = await _call_ollama(prompt, system="/no_think") source = "ollama" # 2차: Claude fallback if result_text is None: logger.info("Classification fallback to Claude API") result_text, _, _ = await _call_claude(prompt, system="JSON만 출력하라. 다른 텍스트 없이.") source = "claude" # 완전 실패 → 키워드 fallback if not result_text: logger.warning("All AI classification failed, using keyword fallback") fb = _keyword_fallback(message) fb["source"] = "keyword" fb["success"] = True return fb # JSON 파싱 try: json_match = re.search(r'\{[^}]+\}', result_text, re.DOTALL) if json_match: parsed = json.loads(json_match.group()) else: parsed = json.loads(result_text) except json.JSONDecodeError: logger.warning(f"JSON parse failed: {result_text[:200]}") fb = _keyword_fallback(message) fb["source"] = source fb["success"] = True return fb intent = parsed.get("intent", "question") response_tier = parsed.get("response_tier", "api_light") needs_rag = parsed.get("needs_rag", False) rag_target = parsed.get("rag_target", []) if not isinstance(rag_target, list): rag_target = [] title = parsed.get("title", "") raw_datetime = parsed.get("raw_datetime", "") return { "success": True, "intent": intent, "response_tier": response_tier, "needs_rag": needs_rag, "rag_target": rag_target, "department_hint": parsed.get("department_hint"), "report_domain": parsed.get("report_domain"), "query": parsed.get("query", message), "title": title, "raw_datetime": raw_datetime, "source": source, "fallback": False, } @app.post("/parse-date") async def parse_date(request: Request): """한국어 날짜/시간 파싱. body: {text: str}""" body = await request.json() text = body.get("text", "") return parse_korean_datetime(text) @app.post("/chat") async def chat(request: Request): """자유 대화. body: {message: str, system?: str, rag_context?: str} 1차 Ollama → 실패 시 Claude API (응답에 ☁️ 표시). """ body = await request.json() message = body.get("message", "").strip() system = body.get("system", PERSONA_LOCAL) rag_context = body.get("rag_context", "") if not message: return JSONResponse({"success": False, "error": "message required"}, status_code=400) # RAG 컨텍스트가 있으면 프롬프트에 추가 prompt = "" if rag_context: prompt += f"[참고 자료]\n{rag_context}\n\n" prompt += f"사용자: {message}\n이드:" # 1차: Ollama (id-9b, 대화 모델) response = await _call_ollama(prompt, system=system, model=OLLAMA_CHAT_MODEL, timeout=30) source = "ollama" # 2차: Claude fallback if response is None: logger.info("Chat fallback to Claude API") response, _, _ = await _call_claude(prompt, system=system) source = "claude" # 완전 실패 if not response: return { "success": False, "response": "AI 서비스 일시 중단. 잠시 후 다시 시도해주세요.", "source": "error", } # Claude API 응답이면 표시 추가 if source == "claude": response = response.rstrip() + " ☁️" return { "success": True, "response": response, "source": source, } @app.get("/persona") async def persona(tier: str = "full"): """페르소나 프롬프트 반환. tier: "local" | "full" (기본값)""" prompt = PERSONA_LOCAL if tier == "local" else PERSONA_FULL return {"system_prompt": prompt, "tier": tier} @app.get("/api-usage") async def api_usage(): """API 사용량 조회.""" usage = _load_usage() month_key = datetime.now(KST).strftime("%Y-%m") current = usage.get(month_key, {"calls": 0, "input_tokens": 0, "output_tokens": 0, "estimated_cost_usd": 0.0}) return { "current_month": month_key, "calls": current["calls"], "input_tokens": current["input_tokens"], "output_tokens": current["output_tokens"], "estimated_cost_usd": current["estimated_cost_usd"], "monthly_limit_usd": API_MONTHLY_LIMIT, "remaining_usd": round(API_MONTHLY_LIMIT - current["estimated_cost_usd"], 4), "all_months": usage, } @app.get("/health") async def health(): """헬스체크.""" gpu_ok = False try: async with httpx.AsyncClient(timeout=5) as client: resp = await client.get(f"{GPU_OLLAMA_URL}/api/tags") gpu_ok = resp.status_code == 200 except Exception: pass claude_configured = bool(ANTHROPIC_API_KEY and not ANTHROPIC_API_KEY.startswith("sk-ant-xxxxx")) return { "status": "ok", "gpu_ollama": "ok" if gpu_ok else "unreachable", "claude_api": "configured" if claude_configured else "not_configured", "monthly_cost_usd": _get_monthly_cost(), "monthly_limit_usd": API_MONTHLY_LIMIT, }