Files
Hyungi Ahn 5b1b89254c feat: RAG 임베딩 자동 동기화 + AI 서비스 개선
- 부적합 라이프사이클 전 과정에서 Qdrant 임베딩 자동 동기화
  - 관리함 5개 저장 함수 + 수신함 상태 변경 시 fire-and-forget sync
  - 30분 주기 전체 재동기화 안전망 (FastAPI lifespan 백그라운드 태스크)
  - build_document_text에 카테고리(final_category/category) 포함
- RAG 질의에 DB 통계 집계 지원 (카테고리별/부서별 건수)
- Qdrant client.search → query_points API 마이그레이션
- AI 어시스턴트 페이지 권한 추가 (tkuser)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 13:05:32 +09:00

212 lines
7.6 KiB
Python

import logging
import time
from services.ollama_client import ollama_client
from services.embedding_service import search_similar_by_text, build_document_text
from services.db_client import get_issue_by_id, get_category_stats, get_department_stats
from services.utils import load_prompt
logger = logging.getLogger(__name__)
_stats_cache = {"data": "", "expires": 0}
STATS_CACHE_TTL = 300 # 5분
STATS_KEYWORDS = {"많이", "빈도", "추이", "비율", "통계", "몇 건", "자주", "빈번", "유형별", "부서별"}
def _needs_stats(question: str) -> bool:
"""키워드 매칭으로 통계성 질문인지 판별"""
return any(kw in question for kw in STATS_KEYWORDS)
def _build_stats_summary() -> str:
"""DB 집계 통계 요약 (5분 TTL 캐싱, 실패 시 빈 문자열)"""
now = time.time()
if _stats_cache["data"] and now < _stats_cache["expires"]:
return _stats_cache["data"]
try:
lines = ["[전체 통계 요약]"]
cats = get_category_stats()
if cats:
total = sum(c["count"] for c in cats)
lines.append(f"총 부적합 건수: {total}")
lines.append("카테고리별:")
for c in cats[:10]:
pct = round(c["count"] / total * 100, 1)
lines.append(f" - {c['category']}: {c['count']}건 ({pct}%)")
depts = get_department_stats()
if depts:
lines.append("부서별:")
for d in depts[:10]:
lines.append(f" - {d['department']}: {d['count']}")
if len(lines) <= 1:
return "" # 데이터 없으면 빈 문자열
result = "\n".join(lines)
_stats_cache["data"] = result
_stats_cache["expires"] = now + STATS_CACHE_TTL
return result
except Exception as e:
logger.warning(f"Stats summary failed: {e}")
return ""
def _format_retrieved_issues(results: list[dict]) -> str:
if not results:
return "관련 과거 사례가 없습니다."
lines = []
for i, r in enumerate(results, 1):
meta = r.get("metadata", {})
similarity = round(r.get("similarity", 0) * 100)
doc = (r.get("document", ""))[:500]
cat = meta.get("category", "")
dept = meta.get("responsible_department", "")
status = meta.get("review_status", "")
has_sol = meta.get("has_solution", "false")
date = meta.get("report_date", "")
issue_id = meta.get("issue_id", r["id"])
lines.append(
f"[사례 {i}] No.{issue_id} (유사도 {similarity}%)\n"
f" 분류: {cat} | 부서: {dept} | 상태: {status} | 날짜: {date} | 해결여부: {'O' if has_sol == 'true' else 'X'}\n"
f" 내용: {doc}"
)
return "\n\n".join(lines)
async def rag_suggest_solution(issue_id: int) -> dict:
"""과거 유사 이슈의 해결 사례를 참고하여 해결방안을 제안"""
issue = get_issue_by_id(issue_id)
if not issue:
return {"available": False, "error": "이슈를 찾을 수 없습니다"}
doc_text = build_document_text(issue)
if not doc_text.strip():
return {"available": False, "error": "이슈 내용이 비어있습니다"}
# 해결 완료된 유사 이슈 검색
similar = await search_similar_by_text(
doc_text, n_results=5, filters={"has_solution": "true"}
)
# 해결 안 된 것도 포함 (참고용)
if len(similar) < 3:
all_similar = await search_similar_by_text(doc_text, n_results=5)
seen = {r["id"] for r in similar}
for r in all_similar:
if r["id"] not in seen:
similar.append(r)
if len(similar) >= 5:
break
context = _format_retrieved_issues(similar)
template = load_prompt("prompts/rag_suggest_solution.txt")
prompt = template.format(
description=issue.get("description", ""),
detail_notes=issue.get("detail_notes", ""),
category=issue.get("category", ""),
retrieved_cases=context,
)
response = await ollama_client.generate_text(prompt)
return {
"available": True,
"issue_id": issue_id,
"suggestion": response,
"referenced_issues": [
{
"id": r.get("metadata", {}).get("issue_id", r["id"]),
"similarity": round(r.get("similarity", 0) * 100),
"has_solution": r.get("metadata", {}).get("has_solution", "false") == "true",
}
for r in similar
],
}
async def rag_ask(question: str, project_id: int = None) -> dict:
"""부적합 데이터를 기반으로 자연어 질문에 답변"""
# 프로젝트 필터 없이 전체 데이터에서 검색 (과거 미지정 데이터 포함)
results = await search_similar_by_text(
question, n_results=7, filters=None
)
logger.info(f"RAG ask: question='{question[:50]}', results={len(results)}")
context = _format_retrieved_issues(results)
# 통계성 질문일 때만 DB 집계 포함 (토큰 절약)
stats = _build_stats_summary() if _needs_stats(question) else ""
template = load_prompt("prompts/rag_qa.txt")
prompt = template.format(
question=question,
stats_summary=stats,
retrieved_cases=context,
)
response = await ollama_client.generate_text(prompt)
return {
"available": True,
"answer": response,
"sources": [
{
"id": r.get("metadata", {}).get("issue_id", r["id"]),
"similarity": round(r.get("similarity", 0) * 100),
"snippet": (r.get("document", ""))[:100],
}
for r in results
],
}
async def rag_analyze_pattern(description: str, n_results: int = 10) -> dict:
"""유사 부적합 패턴 분석 — 반복되는 문제인지, 근본 원인은 무엇인지"""
results = await search_similar_by_text(description, n_results=n_results)
context = _format_retrieved_issues(results)
template = load_prompt("prompts/rag_pattern.txt")
prompt = template.format(
description=description,
retrieved_cases=context,
total_similar=len(results),
)
response = await ollama_client.generate_text(prompt)
return {
"available": True,
"analysis": response,
"similar_count": len(results),
"sources": [
{
"id": r.get("metadata", {}).get("issue_id", r["id"]),
"similarity": round(r.get("similarity", 0) * 100),
"category": r.get("metadata", {}).get("category", ""),
}
for r in results
],
}
async def rag_classify_with_context(description: str, detail_notes: str = "") -> dict:
"""과거 사례를 참고하여 더 정확한 분류 수행 (기존 classify 강화)"""
query = f"{description} {detail_notes}".strip()
similar = await search_similar_by_text(query, n_results=5)
context = _format_retrieved_issues(similar)
template = load_prompt("prompts/rag_classify.txt")
prompt = template.format(
description=description,
detail_notes=detail_notes,
retrieved_cases=context,
)
raw = await ollama_client.generate_text(prompt)
import json
try:
start = raw.find("{")
end = raw.rfind("}") + 1
if start >= 0 and end > start:
result = json.loads(raw[start:end])
result["rag_enhanced"] = True
result["referenced_count"] = len(similar)
return {"available": True, **result}
except json.JSONDecodeError:
pass
return {"available": True, "raw_response": raw, "rag_enhanced": True}