- DEVONthink 의존성 제거 → kb_writer 전환 (news_digest, inbox_processor, mail pipeline) - devonthink_bridge.py, plist 삭제 - morning_briefing.py 신규 (매일 07:30, 일정·메일·보고·뉴스 → Synology Chat) - intent_service.py 분류기 프롬프트 개선 + 키워드 fallback - migrate-v5.sql (news_digest_log kb_path 컬럼) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
604 lines
22 KiB
Python
604 lines
22 KiB
Python
"""Intent Service — 의도 분류 + 날짜 파싱 + Claude API fallback (port 8099)
|
|
|
|
n8n에서 호출하는 통합 서비스.
|
|
- POST /classify — 의도 분류 (Ollama → Claude fallback)
|
|
- POST /parse-date — 한국어 날짜/시간 파싱
|
|
- POST /chat — 자유 대화 (Ollama → Claude fallback)
|
|
- GET /api-usage — API 사용량 조회
|
|
- GET /health
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from datetime import datetime, timedelta, date
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import JSONResponse
|
|
|
|
load_dotenv()
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
logger = logging.getLogger("intent_service")
|
|
|
|
KST = ZoneInfo("Asia/Seoul")
|
|
|
|
# Ollama (GPU 서버)
|
|
GPU_OLLAMA_URL = os.getenv("GPU_OLLAMA_URL", "http://192.168.1.186:11434")
|
|
OLLAMA_CLASSIFY_MODEL = os.getenv("OLLAMA_CLASSIFY_MODEL", "id-9b:latest")
|
|
OLLAMA_CHAT_MODEL = os.getenv("OLLAMA_CHAT_MODEL", "id-9b:latest")
|
|
OLLAMA_TIMEOUT = int(os.getenv("OLLAMA_TIMEOUT", "15"))
|
|
|
|
# Claude API
|
|
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
|
|
CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-haiku-4-5-20251001")
|
|
API_MONTHLY_LIMIT = float(os.getenv("API_MONTHLY_LIMIT", "10.0")) # USD
|
|
|
|
# API 사용량 파일
|
|
USAGE_DIR = Path.home() / ".syn-chat-bot"
|
|
USAGE_FILE = USAGE_DIR / "api_usage.json"
|
|
|
|
# 이드 시스템 프롬프트 (자유 대화용)
|
|
ID_SYSTEM_PROMPT = """너는 '이드'라는 이름의 AI 비서야. 한국어로 대화해.
|
|
간결하고 실용적으로 답변하되, 친근한 톤을 유지해.
|
|
불필요한 인사나 꾸밈말은 생략하고 핵심만 전달해."""
|
|
|
|
# 의도 분류 프롬프트 (n8n 파이프라인 호환)
|
|
def _build_classify_prompt(user_text: str) -> str:
|
|
now = datetime.now(KST)
|
|
today = now.strftime("%Y-%m-%d")
|
|
current_time = now.strftime("%H:%M:%S")
|
|
day_names = ["월", "화", "수", "목", "금", "토", "일"]
|
|
day_of_week = day_names[now.weekday()]
|
|
|
|
return f"""현재: {today} {current_time} (KST, {day_of_week}요일). 사용자 메시지를 분석하고 아래 JSON 형식으로만 응답하세요. 다른 텍스트는 출력하지 마세요.
|
|
|
|
{{
|
|
"intent": "greeting|question|log_event|calendar|todo|reminder|mail|note|photo|command|report|other",
|
|
"response_tier": "local|api_light|api_heavy",
|
|
"needs_rag": true/false,
|
|
"rag_target": ["documents", "tk_company", "chat_memory"],
|
|
"department_hint": "안전|생산|구매|품질|총무|시설|null",
|
|
"report_domain": "안전|시설설비|품질|null",
|
|
"query": "검색용 쿼리 (needs_rag=false면 null)",
|
|
"title": "추출된 제목 (calendar/todo/note 시)",
|
|
"raw_datetime": "원문의 날짜/시간 표현 (calendar/todo 시)"
|
|
}}
|
|
|
|
intent 분류:
|
|
- log_event: 사실 기록/등록 요청 ("~구입","~완료","~교체","~점검","~수령","~입고","~등록")
|
|
- report: 긴급 사고/재해 신고만 ("사고","부상","화재","누수","폭발","붕괴" + 즉각 대응 필요)
|
|
- question: 정보 질문/조회
|
|
- greeting: 인사/잡담/감사
|
|
- calendar: 일정 등록/조회/삭제 ("일정","회의","미팅","약속","~시에 ~등록","오늘 일정","내일 뭐 있어")
|
|
- todo: 작업/할일/과제 ("~까지 ~작성","~해야 해","할 일","작업")
|
|
- reminder: 알림 설정 ("~시에 알려줘","리마인드") → calendar로 처리
|
|
- mail: 메일 관련 조회 ("메일 확인","받은 메일","이메일","메일 왔어?")
|
|
- note: 메모/기록 요청 ("기록해","메모해","저장해","적어둬")
|
|
※ 애매하면 log_event로 분류 (기록 누락보다 안전)
|
|
|
|
response_tier 판단:
|
|
- local: 인사, 잡담, log_event, report, calendar, todo, reminder, note, 단순 질문, mail 간단조회
|
|
- api_light: 장문 요약(200자+), 다국어 번역, 비교 분석, RAG 결과 종합
|
|
- api_heavy: 법률 해석, 복잡한 다단계 추론, 다중 문서 교차 분석
|
|
※ 판단이 애매하면 local 우선
|
|
|
|
needs_rag 판단:
|
|
- true: 회사문서/절차 질문, 이전 기록 조회("최근","아까","전에"), 기술질문
|
|
- false: 인사, 잡담, 일반상식, log_event, report, calendar, todo, note
|
|
|
|
사용자 메시지: {user_text}"""
|
|
|
|
app = FastAPI(title="Intent Service")
|
|
|
|
|
|
# ==================== 날짜 파싱 ====================
|
|
|
|
# 요일 매핑
|
|
WEEKDAY_MAP = {
|
|
"월요일": 0, "화요일": 1, "수요일": 2, "목요일": 3,
|
|
"금요일": 4, "토요일": 5, "일요일": 6,
|
|
"월": 0, "화": 1, "수": 2, "목": 3, "금": 4, "토": 5, "일": 6,
|
|
}
|
|
|
|
# 상대 날짜 매핑
|
|
RELATIVE_DATE_MAP = {
|
|
"오늘": 0, "금일": 0,
|
|
"내일": 1, "명일": 1,
|
|
"모레": 2, "내일모레": 2,
|
|
"글피": 3,
|
|
}
|
|
|
|
|
|
def _next_weekday(base: date, weekday: int) -> date:
|
|
"""base 이후 가장 가까운 특정 요일 날짜."""
|
|
days_ahead = weekday - base.weekday()
|
|
if days_ahead <= 0:
|
|
days_ahead += 7
|
|
return base + timedelta(days=days_ahead)
|
|
|
|
|
|
def _this_weekday(base: date, weekday: int) -> date:
|
|
"""이번주의 특정 요일 (지났더라도 이번주)."""
|
|
days_diff = weekday - base.weekday()
|
|
return base + timedelta(days=days_diff)
|
|
|
|
|
|
def parse_korean_datetime(text: str, now: datetime | None = None) -> dict:
|
|
"""한국어 날짜/시간 표현을 파싱.
|
|
|
|
Returns:
|
|
{
|
|
"success": True/False,
|
|
"datetime": "2026-03-20T15:00:00+09:00" (ISO format),
|
|
"date_only": True/False (시간 없이 날짜만 파싱된 경우),
|
|
"original": "내일 3시"
|
|
}
|
|
"""
|
|
if not text or not text.strip():
|
|
return {"success": False, "error": "empty input", "original": text}
|
|
|
|
if now is None:
|
|
now = datetime.now(KST)
|
|
today = now.date()
|
|
|
|
text = text.strip()
|
|
parsed_date = None
|
|
parsed_time = None
|
|
date_only = True
|
|
|
|
# 1. 절대 날짜: YYYY-MM-DD, YYYY/MM/DD
|
|
m = re.search(r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', text)
|
|
if m:
|
|
try:
|
|
parsed_date = date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
|
|
except ValueError:
|
|
pass
|
|
|
|
# 2. M/D 또는 M월D일
|
|
if parsed_date is None:
|
|
m = re.search(r'(\d{1,2})[/월]\s*(\d{1,2})[일]?', text)
|
|
if m:
|
|
month, day = int(m.group(1)), int(m.group(2))
|
|
if 1 <= month <= 12 and 1 <= day <= 31:
|
|
try:
|
|
candidate = date(today.year, month, day)
|
|
# 이미 지난 날짜면 내년
|
|
if candidate < today:
|
|
candidate = date(today.year + 1, month, day)
|
|
parsed_date = candidate
|
|
except ValueError:
|
|
pass
|
|
|
|
# 3. 상대 날짜: 오늘, 내일, 모레, 글피
|
|
if parsed_date is None:
|
|
for keyword, delta in RELATIVE_DATE_MAP.items():
|
|
if keyword in text:
|
|
parsed_date = today + timedelta(days=delta)
|
|
break
|
|
|
|
# 4. 다음주/이번주 + 요일
|
|
if parsed_date is None:
|
|
m = re.search(r'(다음\s*주|차주|다다음\s*주)\s*(월|화|수|목|금|토|일)(?:요일)?', text)
|
|
if m:
|
|
prefix = m.group(1).replace(" ", "")
|
|
weekday = WEEKDAY_MAP[m.group(2)]
|
|
next_monday = _next_weekday(today, 0) # 다음주 월요일
|
|
if "다다음" in prefix:
|
|
next_monday += timedelta(weeks=1)
|
|
parsed_date = next_monday + timedelta(days=weekday)
|
|
else:
|
|
m = re.search(r'이번\s*주\s*(월|화|수|목|금|토|일)(?:요일)?', text)
|
|
if m:
|
|
weekday = WEEKDAY_MAP[m.group(1)]
|
|
parsed_date = _this_weekday(today, weekday)
|
|
|
|
# 5. "X요일" (단독) — 이번주에 아직 안 지났으면 이번주, 지났으면 다음주
|
|
if parsed_date is None:
|
|
m = re.search(r'(월|화|수|목|금|토|일)요일', text)
|
|
if m:
|
|
weekday = WEEKDAY_MAP[m.group(1)]
|
|
candidate = _this_weekday(today, weekday)
|
|
if candidate < today:
|
|
candidate = _next_weekday(today, weekday)
|
|
parsed_date = candidate
|
|
|
|
# 6. 기한 표현: "이번주까지", "이번 주 내로"
|
|
if parsed_date is None:
|
|
if re.search(r'이번\s*주\s*(까지|내로|안에|중)', text):
|
|
# 이번주 금요일
|
|
parsed_date = _this_weekday(today, 4) # 금요일
|
|
if parsed_date < today:
|
|
parsed_date = today
|
|
|
|
# 시간 파싱
|
|
# 오후/오전 X시 (Y분)
|
|
m = re.search(r'(오전|오후|아침|저녁|밤)?\s*(\d{1,2})\s*시\s*(?:(\d{1,2})\s*분)?', text)
|
|
if m:
|
|
period = m.group(1) or ""
|
|
hour = int(m.group(2))
|
|
minute = int(m.group(3)) if m.group(3) else 0
|
|
|
|
if period in ("오후", "저녁", "밤") and hour < 12:
|
|
hour += 12
|
|
elif period == "오전" and hour == 12:
|
|
hour = 0
|
|
elif not period and hour < 8:
|
|
# 시간대 미지정 + 8시 미만 → 오후로 추정
|
|
hour += 12
|
|
|
|
parsed_time = (hour, minute)
|
|
date_only = False
|
|
|
|
# HH:MM 패턴
|
|
if parsed_time is None:
|
|
m = re.search(r'(\d{1,2}):(\d{2})', text)
|
|
if m:
|
|
hour, minute = int(m.group(1)), int(m.group(2))
|
|
if 0 <= hour <= 23 and 0 <= minute <= 59:
|
|
parsed_time = (hour, minute)
|
|
date_only = False
|
|
|
|
# 날짜도 시간도 없으면 실패
|
|
if parsed_date is None and parsed_time is None:
|
|
return {"success": False, "error": "날짜/시간을 인식할 수 없습니다", "original": text}
|
|
|
|
# 날짜 없이 시간만 → 오늘 (이미 지났으면 내일)
|
|
if parsed_date is None and parsed_time is not None:
|
|
parsed_date = today
|
|
check_dt = datetime(today.year, today.month, today.day,
|
|
parsed_time[0], parsed_time[1], tzinfo=KST)
|
|
if check_dt <= now:
|
|
parsed_date = today + timedelta(days=1)
|
|
|
|
# 시간 없이 날짜만 → 날짜만 반환
|
|
if parsed_time is None:
|
|
result_dt = datetime(parsed_date.year, parsed_date.month, parsed_date.day,
|
|
0, 0, 0, tzinfo=KST)
|
|
else:
|
|
result_dt = datetime(parsed_date.year, parsed_date.month, parsed_date.day,
|
|
parsed_time[0], parsed_time[1], 0, tzinfo=KST)
|
|
|
|
return {
|
|
"success": True,
|
|
"datetime": result_dt.isoformat(),
|
|
"date_only": date_only,
|
|
"original": text,
|
|
}
|
|
|
|
|
|
# ==================== API 사용량 추적 ====================
|
|
|
|
def _load_usage() -> dict:
|
|
"""API 사용량 JSON 로드."""
|
|
if USAGE_FILE.exists():
|
|
try:
|
|
return json.loads(USAGE_FILE.read_text())
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _save_usage(data: dict):
|
|
"""API 사용량 JSON 저장."""
|
|
USAGE_DIR.mkdir(parents=True, exist_ok=True)
|
|
USAGE_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False))
|
|
|
|
|
|
def _record_api_call(input_tokens: int = 0, output_tokens: int = 0):
|
|
"""Claude API 호출 기록 (월별 누적)."""
|
|
month_key = datetime.now(KST).strftime("%Y-%m")
|
|
usage = _load_usage()
|
|
|
|
if month_key not in usage:
|
|
usage[month_key] = {"calls": 0, "input_tokens": 0, "output_tokens": 0, "estimated_cost_usd": 0.0}
|
|
|
|
entry = usage[month_key]
|
|
entry["calls"] += 1
|
|
entry["input_tokens"] += input_tokens
|
|
entry["output_tokens"] += output_tokens
|
|
|
|
# Haiku 4.5 가격: input $0.80/MTok, output $4.00/MTok
|
|
cost = (input_tokens * 0.8 + output_tokens * 4.0) / 1_000_000
|
|
entry["estimated_cost_usd"] = round(entry["estimated_cost_usd"] + cost, 4)
|
|
|
|
_save_usage(usage)
|
|
|
|
|
|
def _get_monthly_cost() -> float:
|
|
"""이번 달 API 비용."""
|
|
month_key = datetime.now(KST).strftime("%Y-%m")
|
|
usage = _load_usage()
|
|
return usage.get(month_key, {}).get("estimated_cost_usd", 0.0)
|
|
|
|
|
|
# ==================== Ollama / Claude 호출 ====================
|
|
|
|
async def _call_ollama(prompt: str, system: str = "/no_think",
|
|
model: str | None = None, timeout: int | None = None) -> str | None:
|
|
"""GPU Ollama 호출. 실패 시 None 반환."""
|
|
model = model or OLLAMA_CLASSIFY_MODEL
|
|
timeout = timeout or OLLAMA_TIMEOUT
|
|
try:
|
|
async with httpx.AsyncClient(timeout=timeout) as client:
|
|
resp = await client.post(f"{GPU_OLLAMA_URL}/api/generate", json={
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"system": system,
|
|
"stream": False,
|
|
"options": {"num_ctx": 4096, "temperature": 0.1},
|
|
})
|
|
data = resp.json()
|
|
return data.get("response", "").strip()
|
|
except Exception as e:
|
|
logger.warning(f"Ollama call failed: {e}")
|
|
return None
|
|
|
|
|
|
async def _call_claude(prompt: str, system: str | None = None,
|
|
max_tokens: int = 1024) -> tuple[str | None, int, int]:
|
|
"""Claude API Haiku 호출. Returns (response, input_tokens, output_tokens)."""
|
|
if not ANTHROPIC_API_KEY or ANTHROPIC_API_KEY.startswith("sk-ant-xxxxx"):
|
|
logger.warning("Claude API key not configured")
|
|
return None, 0, 0
|
|
|
|
# 예산 체크
|
|
if _get_monthly_cost() >= API_MONTHLY_LIMIT:
|
|
logger.warning(f"API monthly limit reached: ${_get_monthly_cost():.2f} >= ${API_MONTHLY_LIMIT}")
|
|
return None, 0, 0
|
|
|
|
messages = [{"role": "user", "content": prompt}]
|
|
body = {
|
|
"model": CLAUDE_MODEL,
|
|
"max_tokens": max_tokens,
|
|
"messages": messages,
|
|
}
|
|
if system:
|
|
body["system"] = system
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
resp = await client.post(
|
|
"https://api.anthropic.com/v1/messages",
|
|
json=body,
|
|
headers={
|
|
"x-api-key": ANTHROPIC_API_KEY,
|
|
"anthropic-version": "2023-06-01",
|
|
"content-type": "application/json",
|
|
},
|
|
)
|
|
data = resp.json()
|
|
if resp.status_code != 200:
|
|
logger.error(f"Claude API error: {resp.status_code} {data}")
|
|
return None, 0, 0
|
|
|
|
text = ""
|
|
for block in data.get("content", []):
|
|
if block.get("type") == "text":
|
|
text += block["text"]
|
|
|
|
in_tok = data.get("usage", {}).get("input_tokens", 0)
|
|
out_tok = data.get("usage", {}).get("output_tokens", 0)
|
|
_record_api_call(in_tok, out_tok)
|
|
|
|
return text.strip(), in_tok, out_tok
|
|
except Exception as e:
|
|
logger.error(f"Claude API call failed: {e}")
|
|
return None, 0, 0
|
|
|
|
|
|
# ==================== 엔드포인트 ====================
|
|
|
|
def _keyword_fallback(text: str) -> dict:
|
|
"""AI 실패 시 키워드 기반 분류 (ultimate safety net)."""
|
|
t = text
|
|
intent = "question"
|
|
response_tier = "api_light"
|
|
needs_rag = False
|
|
rag_target = []
|
|
|
|
if re.search(r'일정|회의|미팅|약속|스케줄|캘린더', t) and re.search(r'등록|잡아|추가|만들|넣어|수정|삭제|취소', t):
|
|
intent, response_tier = "calendar", "local"
|
|
elif re.search(r'일정|스케줄|뭐\s*있', t) and re.search(r'오늘|내일|이번|다음', t):
|
|
intent, response_tier = "calendar", "local"
|
|
elif re.search(r'까지|해야|할\s*일|작업', t) and re.search(r'작성|보고서|정리|준비|제출', t):
|
|
intent, response_tier = "todo", "local"
|
|
elif re.search(r'기록해|메모해|저장해|적어둬|메모\s*저장|노트', t):
|
|
intent, response_tier = "note", "local"
|
|
elif re.search(r'메일|이메일|받은\s*편지|mail', t) or (re.search(r'매일', t) and re.search(r'확인|왔|온|요약|읽', t)):
|
|
intent, response_tier = "mail", "local"
|
|
elif re.search(r'\d+시', t) and re.search(r'알려|리마인드|알림', t):
|
|
intent, response_tier = "calendar", "local"
|
|
elif re.search(r'구입|완료|교체|점검|수령|입고|발주', t) and not re.search(r'\?|까$|나$', t):
|
|
intent, response_tier = "log_event", "local"
|
|
else:
|
|
if len(text) <= 30 and not re.search(r'요약|번역|분석|비교', t):
|
|
response_tier = "local"
|
|
needs_rag = bool(re.search(r'회사|절차|문서|안전|품질|규정|아까|전에|기억', t))
|
|
if needs_rag:
|
|
rag_target = ["documents"]
|
|
if re.search(r'회사|절차|안전|품질', t):
|
|
rag_target.append("tk_company")
|
|
if re.search(r'아까|이전|전에|기억', t):
|
|
rag_target.append("chat_memory")
|
|
|
|
return {
|
|
"intent": intent, "response_tier": response_tier,
|
|
"needs_rag": needs_rag, "rag_target": rag_target,
|
|
"department_hint": None, "report_domain": None,
|
|
"query": text, "title": "", "raw_datetime": "",
|
|
"fallback": True, "fallback_method": "keyword",
|
|
}
|
|
|
|
|
|
@app.post("/classify")
|
|
async def classify_intent(request: Request):
|
|
"""의도 분류. body: {message: str}
|
|
|
|
n8n 호환 출력: {intent, response_tier, needs_rag, rag_target, ..., title, raw_datetime, source}
|
|
"""
|
|
body = await request.json()
|
|
message = body.get("message", "").strip()
|
|
if not message:
|
|
return JSONResponse({"success": False, "error": "message required"}, status_code=400)
|
|
|
|
prompt = _build_classify_prompt(message)
|
|
|
|
# 1차: Ollama
|
|
result_text = await _call_ollama(prompt, system="/no_think")
|
|
source = "ollama"
|
|
|
|
# 2차: Claude fallback
|
|
if result_text is None:
|
|
logger.info("Classification fallback to Claude API")
|
|
result_text, _, _ = await _call_claude(prompt, system="JSON만 출력하라. 다른 텍스트 없이.")
|
|
source = "claude"
|
|
|
|
# 완전 실패 → 키워드 fallback
|
|
if not result_text:
|
|
logger.warning("All AI classification failed, using keyword fallback")
|
|
fb = _keyword_fallback(message)
|
|
fb["source"] = "keyword"
|
|
fb["success"] = True
|
|
return fb
|
|
|
|
# JSON 파싱
|
|
try:
|
|
json_match = re.search(r'\{[^}]+\}', result_text, re.DOTALL)
|
|
if json_match:
|
|
parsed = json.loads(json_match.group())
|
|
else:
|
|
parsed = json.loads(result_text)
|
|
except json.JSONDecodeError:
|
|
logger.warning(f"JSON parse failed: {result_text[:200]}")
|
|
fb = _keyword_fallback(message)
|
|
fb["source"] = source
|
|
fb["success"] = True
|
|
return fb
|
|
|
|
intent = parsed.get("intent", "question")
|
|
response_tier = parsed.get("response_tier", "api_light")
|
|
needs_rag = parsed.get("needs_rag", False)
|
|
rag_target = parsed.get("rag_target", [])
|
|
if not isinstance(rag_target, list):
|
|
rag_target = []
|
|
title = parsed.get("title", "")
|
|
raw_datetime = parsed.get("raw_datetime", "")
|
|
|
|
return {
|
|
"success": True,
|
|
"intent": intent,
|
|
"response_tier": response_tier,
|
|
"needs_rag": needs_rag,
|
|
"rag_target": rag_target,
|
|
"department_hint": parsed.get("department_hint"),
|
|
"report_domain": parsed.get("report_domain"),
|
|
"query": parsed.get("query", message),
|
|
"title": title,
|
|
"raw_datetime": raw_datetime,
|
|
"source": source,
|
|
"fallback": False,
|
|
}
|
|
|
|
|
|
@app.post("/parse-date")
|
|
async def parse_date(request: Request):
|
|
"""한국어 날짜/시간 파싱. body: {text: str}"""
|
|
body = await request.json()
|
|
text = body.get("text", "")
|
|
return parse_korean_datetime(text)
|
|
|
|
|
|
@app.post("/chat")
|
|
async def chat(request: Request):
|
|
"""자유 대화. body: {message: str, system?: str, rag_context?: str}
|
|
|
|
1차 Ollama → 실패 시 Claude API (응답에 ☁️ 표시).
|
|
"""
|
|
body = await request.json()
|
|
message = body.get("message", "").strip()
|
|
system = body.get("system", ID_SYSTEM_PROMPT)
|
|
rag_context = body.get("rag_context", "")
|
|
|
|
if not message:
|
|
return JSONResponse({"success": False, "error": "message required"}, status_code=400)
|
|
|
|
# RAG 컨텍스트가 있으면 프롬프트에 추가
|
|
prompt = ""
|
|
if rag_context:
|
|
prompt += f"[참고 자료]\n{rag_context}\n\n"
|
|
prompt += f"사용자: {message}\n이드:"
|
|
|
|
# 1차: Ollama (id-9b, 대화 모델)
|
|
response = await _call_ollama(prompt, system=system, model=OLLAMA_CHAT_MODEL, timeout=30)
|
|
source = "ollama"
|
|
|
|
# 2차: Claude fallback
|
|
if response is None:
|
|
logger.info("Chat fallback to Claude API")
|
|
response, _, _ = await _call_claude(prompt, system=system)
|
|
source = "claude"
|
|
|
|
# 완전 실패
|
|
if not response:
|
|
return {
|
|
"success": False,
|
|
"response": "AI 서비스 일시 중단. 잠시 후 다시 시도해주세요.",
|
|
"source": "error",
|
|
}
|
|
|
|
# Claude API 응답이면 표시 추가
|
|
if source == "claude":
|
|
response = response.rstrip() + " ☁️"
|
|
|
|
return {
|
|
"success": True,
|
|
"response": response,
|
|
"source": source,
|
|
}
|
|
|
|
|
|
@app.get("/api-usage")
|
|
async def api_usage():
|
|
"""API 사용량 조회."""
|
|
usage = _load_usage()
|
|
month_key = datetime.now(KST).strftime("%Y-%m")
|
|
current = usage.get(month_key, {"calls": 0, "input_tokens": 0, "output_tokens": 0, "estimated_cost_usd": 0.0})
|
|
return {
|
|
"current_month": month_key,
|
|
"calls": current["calls"],
|
|
"input_tokens": current["input_tokens"],
|
|
"output_tokens": current["output_tokens"],
|
|
"estimated_cost_usd": current["estimated_cost_usd"],
|
|
"monthly_limit_usd": API_MONTHLY_LIMIT,
|
|
"remaining_usd": round(API_MONTHLY_LIMIT - current["estimated_cost_usd"], 4),
|
|
"all_months": usage,
|
|
}
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
"""헬스체크."""
|
|
gpu_ok = False
|
|
try:
|
|
async with httpx.AsyncClient(timeout=5) as client:
|
|
resp = await client.get(f"{GPU_OLLAMA_URL}/api/tags")
|
|
gpu_ok = resp.status_code == 200
|
|
except Exception:
|
|
pass
|
|
|
|
claude_configured = bool(ANTHROPIC_API_KEY and not ANTHROPIC_API_KEY.startswith("sk-ant-xxxxx"))
|
|
|
|
return {
|
|
"status": "ok",
|
|
"gpu_ollama": "ok" if gpu_ok else "unreachable",
|
|
"claude_api": "configured" if claude_configured else "not_configured",
|
|
"monthly_cost_usd": _get_monthly_cost(),
|
|
"monthly_limit_usd": API_MONTHLY_LIMIT,
|
|
}
|