- 부적합 라이프사이클 전 과정에서 Qdrant 임베딩 자동 동기화 - 관리함 5개 저장 함수 + 수신함 상태 변경 시 fire-and-forget sync - 30분 주기 전체 재동기화 안전망 (FastAPI lifespan 백그라운드 태스크) - build_document_text에 카테고리(final_category/category) 포함 - RAG 질의에 DB 통계 집계 지원 (카테고리별/부서별 건수) - Qdrant client.search → query_points API 마이그레이션 - AI 어시스턴트 페이지 권한 추가 (tkuser) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
212 lines
7.6 KiB
Python
212 lines
7.6 KiB
Python
import logging
|
|
import time
|
|
|
|
from services.ollama_client import ollama_client
|
|
from services.embedding_service import search_similar_by_text, build_document_text
|
|
from services.db_client import get_issue_by_id, get_category_stats, get_department_stats
|
|
from services.utils import load_prompt
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_stats_cache = {"data": "", "expires": 0}
|
|
STATS_CACHE_TTL = 300 # 5분
|
|
|
|
STATS_KEYWORDS = {"많이", "빈도", "추이", "비율", "통계", "몇 건", "자주", "빈번", "유형별", "부서별"}
|
|
|
|
|
|
def _needs_stats(question: str) -> bool:
|
|
"""키워드 매칭으로 통계성 질문인지 판별"""
|
|
return any(kw in question for kw in STATS_KEYWORDS)
|
|
|
|
|
|
def _build_stats_summary() -> str:
|
|
"""DB 집계 통계 요약 (5분 TTL 캐싱, 실패 시 빈 문자열)"""
|
|
now = time.time()
|
|
if _stats_cache["data"] and now < _stats_cache["expires"]:
|
|
return _stats_cache["data"]
|
|
try:
|
|
lines = ["[전체 통계 요약]"]
|
|
cats = get_category_stats()
|
|
if cats:
|
|
total = sum(c["count"] for c in cats)
|
|
lines.append(f"총 부적합 건수: {total}건")
|
|
lines.append("카테고리별:")
|
|
for c in cats[:10]:
|
|
pct = round(c["count"] / total * 100, 1)
|
|
lines.append(f" - {c['category']}: {c['count']}건 ({pct}%)")
|
|
depts = get_department_stats()
|
|
if depts:
|
|
lines.append("부서별:")
|
|
for d in depts[:10]:
|
|
lines.append(f" - {d['department']}: {d['count']}건")
|
|
if len(lines) <= 1:
|
|
return "" # 데이터 없으면 빈 문자열
|
|
result = "\n".join(lines)
|
|
_stats_cache["data"] = result
|
|
_stats_cache["expires"] = now + STATS_CACHE_TTL
|
|
return result
|
|
except Exception as e:
|
|
logger.warning(f"Stats summary failed: {e}")
|
|
return ""
|
|
|
|
|
|
def _format_retrieved_issues(results: list[dict]) -> str:
|
|
if not results:
|
|
return "관련 과거 사례가 없습니다."
|
|
lines = []
|
|
for i, r in enumerate(results, 1):
|
|
meta = r.get("metadata", {})
|
|
similarity = round(r.get("similarity", 0) * 100)
|
|
doc = (r.get("document", ""))[:500]
|
|
cat = meta.get("category", "")
|
|
dept = meta.get("responsible_department", "")
|
|
status = meta.get("review_status", "")
|
|
has_sol = meta.get("has_solution", "false")
|
|
date = meta.get("report_date", "")
|
|
issue_id = meta.get("issue_id", r["id"])
|
|
lines.append(
|
|
f"[사례 {i}] No.{issue_id} (유사도 {similarity}%)\n"
|
|
f" 분류: {cat} | 부서: {dept} | 상태: {status} | 날짜: {date} | 해결여부: {'O' if has_sol == 'true' else 'X'}\n"
|
|
f" 내용: {doc}"
|
|
)
|
|
return "\n\n".join(lines)
|
|
|
|
|
|
async def rag_suggest_solution(issue_id: int) -> dict:
|
|
"""과거 유사 이슈의 해결 사례를 참고하여 해결방안을 제안"""
|
|
issue = get_issue_by_id(issue_id)
|
|
if not issue:
|
|
return {"available": False, "error": "이슈를 찾을 수 없습니다"}
|
|
|
|
doc_text = build_document_text(issue)
|
|
if not doc_text.strip():
|
|
return {"available": False, "error": "이슈 내용이 비어있습니다"}
|
|
|
|
# 해결 완료된 유사 이슈 검색
|
|
similar = await search_similar_by_text(
|
|
doc_text, n_results=5, filters={"has_solution": "true"}
|
|
)
|
|
# 해결 안 된 것도 포함 (참고용)
|
|
if len(similar) < 3:
|
|
all_similar = await search_similar_by_text(doc_text, n_results=5)
|
|
seen = {r["id"] for r in similar}
|
|
for r in all_similar:
|
|
if r["id"] not in seen:
|
|
similar.append(r)
|
|
if len(similar) >= 5:
|
|
break
|
|
|
|
context = _format_retrieved_issues(similar)
|
|
template = load_prompt("prompts/rag_suggest_solution.txt")
|
|
prompt = template.format(
|
|
description=issue.get("description", ""),
|
|
detail_notes=issue.get("detail_notes", ""),
|
|
category=issue.get("category", ""),
|
|
retrieved_cases=context,
|
|
)
|
|
|
|
response = await ollama_client.generate_text(prompt)
|
|
return {
|
|
"available": True,
|
|
"issue_id": issue_id,
|
|
"suggestion": response,
|
|
"referenced_issues": [
|
|
{
|
|
"id": r.get("metadata", {}).get("issue_id", r["id"]),
|
|
"similarity": round(r.get("similarity", 0) * 100),
|
|
"has_solution": r.get("metadata", {}).get("has_solution", "false") == "true",
|
|
}
|
|
for r in similar
|
|
],
|
|
}
|
|
|
|
|
|
async def rag_ask(question: str, project_id: int = None) -> dict:
|
|
"""부적합 데이터를 기반으로 자연어 질문에 답변"""
|
|
# 프로젝트 필터 없이 전체 데이터에서 검색 (과거 미지정 데이터 포함)
|
|
results = await search_similar_by_text(
|
|
question, n_results=7, filters=None
|
|
)
|
|
logger.info(f"RAG ask: question='{question[:50]}', results={len(results)}")
|
|
context = _format_retrieved_issues(results)
|
|
|
|
# 통계성 질문일 때만 DB 집계 포함 (토큰 절약)
|
|
stats = _build_stats_summary() if _needs_stats(question) else ""
|
|
|
|
template = load_prompt("prompts/rag_qa.txt")
|
|
prompt = template.format(
|
|
question=question,
|
|
stats_summary=stats,
|
|
retrieved_cases=context,
|
|
)
|
|
|
|
response = await ollama_client.generate_text(prompt)
|
|
return {
|
|
"available": True,
|
|
"answer": response,
|
|
"sources": [
|
|
{
|
|
"id": r.get("metadata", {}).get("issue_id", r["id"]),
|
|
"similarity": round(r.get("similarity", 0) * 100),
|
|
"snippet": (r.get("document", ""))[:100],
|
|
}
|
|
for r in results
|
|
],
|
|
}
|
|
|
|
|
|
async def rag_analyze_pattern(description: str, n_results: int = 10) -> dict:
|
|
"""유사 부적합 패턴 분석 — 반복되는 문제인지, 근본 원인은 무엇인지"""
|
|
results = await search_similar_by_text(description, n_results=n_results)
|
|
context = _format_retrieved_issues(results)
|
|
|
|
template = load_prompt("prompts/rag_pattern.txt")
|
|
prompt = template.format(
|
|
description=description,
|
|
retrieved_cases=context,
|
|
total_similar=len(results),
|
|
)
|
|
|
|
response = await ollama_client.generate_text(prompt)
|
|
return {
|
|
"available": True,
|
|
"analysis": response,
|
|
"similar_count": len(results),
|
|
"sources": [
|
|
{
|
|
"id": r.get("metadata", {}).get("issue_id", r["id"]),
|
|
"similarity": round(r.get("similarity", 0) * 100),
|
|
"category": r.get("metadata", {}).get("category", ""),
|
|
}
|
|
for r in results
|
|
],
|
|
}
|
|
|
|
|
|
async def rag_classify_with_context(description: str, detail_notes: str = "") -> dict:
|
|
"""과거 사례를 참고하여 더 정확한 분류 수행 (기존 classify 강화)"""
|
|
query = f"{description} {detail_notes}".strip()
|
|
similar = await search_similar_by_text(query, n_results=5)
|
|
context = _format_retrieved_issues(similar)
|
|
|
|
template = load_prompt("prompts/rag_classify.txt")
|
|
prompt = template.format(
|
|
description=description,
|
|
detail_notes=detail_notes,
|
|
retrieved_cases=context,
|
|
)
|
|
|
|
raw = await ollama_client.generate_text(prompt)
|
|
import json
|
|
try:
|
|
start = raw.find("{")
|
|
end = raw.rfind("}") + 1
|
|
if start >= 0 and end > start:
|
|
result = json.loads(raw[start:end])
|
|
result["rag_enhanced"] = True
|
|
result["referenced_count"] = len(similar)
|
|
return {"available": True, **result}
|
|
except json.JSONDecodeError:
|
|
pass
|
|
return {"available": True, "raw_response": raw, "rag_enhanced": True}
|