feat: AI 서비스 및 AI 어시스턴트 전용 페이지 추가

- ai-service: Ollama 기반 AI 서비스 (분류, 시맨틱 검색, RAG Q&A, 패턴 분석)
- AI 어시스턴트 페이지: 채팅형 Q&A, 시맨틱 검색, 패턴 분석, 분류 테스트
- 권한 시스템에 ai_assistant 페이지 등록 (기본 비활성)
- 기존 페이지에 AI 기능 통합 (대시보드, 수신함, 관리함)
- docker-compose, gateway, nginx 설정 업데이트

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-03-06 09:38:30 +09:00
parent d385ce7ac1
commit b3012b8320
44 changed files with 2914 additions and 53 deletions

View File

View File

@@ -0,0 +1,60 @@
import json
from services.ollama_client import ollama_client
from config import settings
CLASSIFY_PROMPT_PATH = "prompts/classify_issue.txt"
SUMMARIZE_PROMPT_PATH = "prompts/summarize_issue.txt"
def _load_prompt(path: str) -> str:
with open(path, "r", encoding="utf-8") as f:
return f.read()
async def classify_issue(description: str, detail_notes: str = "") -> dict:
template = _load_prompt(CLASSIFY_PROMPT_PATH)
prompt = template.format(
description=description or "",
detail_notes=detail_notes or "",
)
raw = await ollama_client.generate_text(prompt)
try:
start = raw.find("{")
end = raw.rfind("}") + 1
if start >= 0 and end > start:
return json.loads(raw[start:end])
except json.JSONDecodeError:
pass
return {"raw_response": raw, "parse_error": True}
async def summarize_issue(
description: str, detail_notes: str = "", solution: str = ""
) -> dict:
template = _load_prompt(SUMMARIZE_PROMPT_PATH)
prompt = template.format(
description=description or "",
detail_notes=detail_notes or "",
solution=solution or "",
)
raw = await ollama_client.generate_text(prompt)
try:
start = raw.find("{")
end = raw.rfind("}") + 1
if start >= 0 and end > start:
return json.loads(raw[start:end])
except json.JSONDecodeError:
pass
return {"summary": raw.strip()}
async def classify_and_summarize(
description: str, detail_notes: str = ""
) -> dict:
classification = await classify_issue(description, detail_notes)
summary_result = await summarize_issue(description, detail_notes)
return {
"classification": classification,
"summary": summary_result.get("summary", ""),
}

View File

@@ -0,0 +1,97 @@
from urllib.parse import quote_plus
from sqlalchemy import create_engine, text
from config import settings
def get_engine():
password = quote_plus(settings.DB_PASSWORD)
url = (
f"mysql+pymysql://{settings.DB_USER}:{password}"
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)
return create_engine(url, pool_pre_ping=True, pool_size=5)
engine = get_engine()
def get_all_issues() -> list[dict]:
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT id, category, description, detail_notes, "
"final_description, final_category, solution, "
"management_comment, cause_detail, project_id, "
"review_status, report_date, responsible_department, "
"location_info "
"FROM qc_issues ORDER BY id"
)
)
return [dict(row._mapping) for row in result]
def get_issue_by_id(issue_id: int) -> dict | None:
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT id, category, description, detail_notes, "
"final_description, final_category, solution, "
"management_comment, cause_detail, project_id, "
"review_status, report_date, responsible_department, "
"location_info "
"FROM qc_issues WHERE id = :id"
),
{"id": issue_id},
)
row = result.fetchone()
return dict(row._mapping) if row else None
def get_issues_since(last_id: int) -> list[dict]:
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT id, category, description, detail_notes, "
"final_description, final_category, solution, "
"management_comment, cause_detail, project_id, "
"review_status, report_date, responsible_department, "
"location_info "
"FROM qc_issues WHERE id > :last_id ORDER BY id"
),
{"last_id": last_id},
)
return [dict(row._mapping) for row in result]
def get_daily_qc_stats(date_str: str) -> dict:
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT "
" COUNT(*) as total, "
" SUM(CASE WHEN DATE(report_date) = :d THEN 1 ELSE 0 END) as new_today, "
" SUM(CASE WHEN review_status = 'in_progress' THEN 1 ELSE 0 END) as in_progress, "
" SUM(CASE WHEN review_status = 'completed' THEN 1 ELSE 0 END) as completed, "
" SUM(CASE WHEN review_status = 'pending_review' THEN 1 ELSE 0 END) as pending "
"FROM qc_issues"
),
{"d": date_str},
)
row = result.fetchone()
return dict(row._mapping) if row else {}
def get_issues_for_date(date_str: str) -> list[dict]:
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT id, category, description, detail_notes, "
"review_status, responsible_department, solution "
"FROM qc_issues "
"WHERE DATE(report_date) = :d "
"ORDER BY id"
),
{"d": date_str},
)
return [dict(row._mapping) for row in result]

View File

@@ -0,0 +1,144 @@
from services.ollama_client import ollama_client
from db.vector_store import vector_store
from db.metadata_store import metadata_store
from services.db_client import get_all_issues, get_issue_by_id, get_issues_since
def build_document_text(issue: dict) -> str:
parts = []
if issue.get("description"):
parts.append(issue["description"])
if issue.get("final_description"):
parts.append(issue["final_description"])
if issue.get("detail_notes"):
parts.append(issue["detail_notes"])
if issue.get("solution"):
parts.append(f"해결: {issue['solution']}")
if issue.get("management_comment"):
parts.append(f"의견: {issue['management_comment']}")
if issue.get("cause_detail"):
parts.append(f"원인: {issue['cause_detail']}")
return " ".join(parts)
def build_metadata(issue: dict) -> dict:
meta = {"issue_id": issue["id"]}
for key in [
"category", "project_id", "review_status",
"responsible_department", "location_info",
]:
val = issue.get(key)
if val is not None:
meta[key] = str(val)
rd = issue.get("report_date")
if rd:
meta["report_date"] = str(rd)[:10]
meta["has_solution"] = "true" if issue.get("solution") else "false"
return meta
async def sync_all_issues() -> dict:
issues = get_all_issues()
synced = 0
skipped = 0
for issue in issues:
doc_text = build_document_text(issue)
if not doc_text.strip():
skipped += 1
continue
try:
embedding = await ollama_client.generate_embedding(doc_text)
vector_store.upsert(
doc_id=f"issue_{issue['id']}",
document=doc_text,
embedding=embedding,
metadata=build_metadata(issue),
)
synced += 1
except Exception as e:
skipped += 1
if issues:
max_id = max(i["id"] for i in issues)
metadata_store.set_last_synced_id(max_id)
return {"synced": synced, "skipped": skipped, "total": len(issues)}
async def sync_single_issue(issue_id: int) -> dict:
issue = get_issue_by_id(issue_id)
if not issue:
return {"status": "not_found"}
doc_text = build_document_text(issue)
if not doc_text.strip():
return {"status": "empty_text"}
embedding = await ollama_client.generate_embedding(doc_text)
vector_store.upsert(
doc_id=f"issue_{issue['id']}",
document=doc_text,
embedding=embedding,
metadata=build_metadata(issue),
)
return {"status": "synced", "issue_id": issue_id}
async def sync_incremental() -> dict:
last_id = metadata_store.get_last_synced_id()
issues = get_issues_since(last_id)
synced = 0
for issue in issues:
doc_text = build_document_text(issue)
if not doc_text.strip():
continue
try:
embedding = await ollama_client.generate_embedding(doc_text)
vector_store.upsert(
doc_id=f"issue_{issue['id']}",
document=doc_text,
embedding=embedding,
metadata=build_metadata(issue),
)
synced += 1
except Exception:
pass
if issues:
max_id = max(i["id"] for i in issues)
metadata_store.set_last_synced_id(max_id)
return {"synced": synced, "new_issues": len(issues)}
async def search_similar_by_id(issue_id: int, n_results: int = 5) -> list[dict]:
issue = get_issue_by_id(issue_id)
if not issue:
return []
doc_text = build_document_text(issue)
if not doc_text.strip():
return []
embedding = await ollama_client.generate_embedding(doc_text)
results = vector_store.query(
embedding=embedding,
n_results=n_results + 1,
)
# exclude self
filtered = []
for r in results:
if r["id"] != f"issue_{issue_id}":
filtered.append(r)
return filtered[:n_results]
async def search_similar_by_text(query: str, n_results: int = 5, filters: dict = None) -> list[dict]:
embedding = await ollama_client.generate_embedding(query)
where = None
if filters:
conditions = []
for k, v in filters.items():
if v is not None:
conditions.append({k: str(v)})
if len(conditions) == 1:
where = conditions[0]
elif len(conditions) > 1:
where = {"$and": conditions}
return vector_store.query(
embedding=embedding,
n_results=n_results,
where=where,
)

View File

@@ -0,0 +1,57 @@
import httpx
from config import settings
class OllamaClient:
def __init__(self):
self.base_url = settings.OLLAMA_BASE_URL
self.timeout = httpx.Timeout(float(settings.OLLAMA_TIMEOUT), connect=10.0)
async def generate_embedding(self, text: str) -> list[float]:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(
f"{self.base_url}/api/embeddings",
json={"model": settings.OLLAMA_EMBED_MODEL, "prompt": text},
)
response.raise_for_status()
return response.json()["embedding"]
async def batch_embeddings(self, texts: list[str]) -> list[list[float]]:
results = []
for text in texts:
emb = await self.generate_embedding(text)
results.append(emb)
return results
async def generate_text(self, prompt: str, system: str = None) -> str:
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(
f"{self.base_url}/api/chat",
json={
"model": settings.OLLAMA_TEXT_MODEL,
"messages": messages,
"stream": False,
"options": {"temperature": 0.3, "num_predict": 2048},
},
)
response.raise_for_status()
return response.json()["message"]["content"]
async def check_health(self) -> dict:
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
response = await client.get(f"{self.base_url}/api/tags")
models = response.json().get("models", [])
return {
"status": "connected",
"models": [m["name"] for m in models],
}
except Exception:
return {"status": "disconnected"}
ollama_client = OllamaClient()

View File

@@ -0,0 +1,164 @@
from services.ollama_client import ollama_client
from services.embedding_service import search_similar_by_text, build_document_text
from services.db_client import get_issue_by_id
def _load_prompt(path: str) -> str:
with open(path, "r", encoding="utf-8") as f:
return f.read()
def _format_retrieved_issues(results: list[dict]) -> str:
if not results:
return "관련 과거 사례가 없습니다."
lines = []
for i, r in enumerate(results, 1):
meta = r.get("metadata", {})
similarity = round(r.get("similarity", 0) * 100)
doc = (r.get("document", ""))[:500]
cat = meta.get("category", "")
dept = meta.get("responsible_department", "")
status = meta.get("review_status", "")
has_sol = meta.get("has_solution", "false")
date = meta.get("report_date", "")
issue_id = meta.get("issue_id", r["id"])
lines.append(
f"[사례 {i}] No.{issue_id} (유사도 {similarity}%)\n"
f" 분류: {cat} | 부서: {dept} | 상태: {status} | 날짜: {date} | 해결여부: {'O' if has_sol == 'true' else 'X'}\n"
f" 내용: {doc}"
)
return "\n\n".join(lines)
async def rag_suggest_solution(issue_id: int) -> dict:
"""과거 유사 이슈의 해결 사례를 참고하여 해결방안을 제안"""
issue = get_issue_by_id(issue_id)
if not issue:
return {"available": False, "error": "이슈를 찾을 수 없습니다"}
doc_text = build_document_text(issue)
if not doc_text.strip():
return {"available": False, "error": "이슈 내용이 비어있습니다"}
# 해결 완료된 유사 이슈 검색
similar = await search_similar_by_text(
doc_text, n_results=5, filters={"has_solution": "true"}
)
# 해결 안 된 것도 포함 (참고용)
if len(similar) < 3:
all_similar = await search_similar_by_text(doc_text, n_results=5)
seen = {r["id"] for r in similar}
for r in all_similar:
if r["id"] not in seen:
similar.append(r)
if len(similar) >= 5:
break
context = _format_retrieved_issues(similar)
template = _load_prompt("prompts/rag_suggest_solution.txt")
prompt = template.format(
description=issue.get("description", ""),
detail_notes=issue.get("detail_notes", ""),
category=issue.get("category", ""),
retrieved_cases=context,
)
response = await ollama_client.generate_text(prompt)
return {
"available": True,
"issue_id": issue_id,
"suggestion": response,
"referenced_issues": [
{
"id": r.get("metadata", {}).get("issue_id", r["id"]),
"similarity": round(r.get("similarity", 0) * 100),
"has_solution": r.get("metadata", {}).get("has_solution", "false") == "true",
}
for r in similar
],
}
async def rag_ask(question: str, project_id: int = None) -> dict:
"""부적합 데이터를 기반으로 자연어 질문에 답변"""
# 프로젝트 필터 없이 전체 데이터에서 검색 (과거 미지정 데이터 포함)
results = await search_similar_by_text(
question, n_results=15, filters=None
)
context = _format_retrieved_issues(results)
template = _load_prompt("prompts/rag_qa.txt")
prompt = template.format(
question=question,
retrieved_cases=context,
)
response = await ollama_client.generate_text(prompt)
return {
"available": True,
"answer": response,
"sources": [
{
"id": r.get("metadata", {}).get("issue_id", r["id"]),
"similarity": round(r.get("similarity", 0) * 100),
"snippet": (r.get("document", ""))[:100],
}
for r in results
],
}
async def rag_analyze_pattern(description: str, n_results: int = 10) -> dict:
"""유사 부적합 패턴 분석 — 반복되는 문제인지, 근본 원인은 무엇인지"""
results = await search_similar_by_text(description, n_results=n_results)
context = _format_retrieved_issues(results)
template = _load_prompt("prompts/rag_pattern.txt")
prompt = template.format(
description=description,
retrieved_cases=context,
total_similar=len(results),
)
response = await ollama_client.generate_text(prompt)
return {
"available": True,
"analysis": response,
"similar_count": len(results),
"sources": [
{
"id": r.get("metadata", {}).get("issue_id", r["id"]),
"similarity": round(r.get("similarity", 0) * 100),
"category": r.get("metadata", {}).get("category", ""),
}
for r in results
],
}
async def rag_classify_with_context(description: str, detail_notes: str = "") -> dict:
"""과거 사례를 참고하여 더 정확한 분류 수행 (기존 classify 강화)"""
query = f"{description} {detail_notes}".strip()
similar = await search_similar_by_text(query, n_results=5)
context = _format_retrieved_issues(similar)
template = _load_prompt("prompts/rag_classify.txt")
prompt = template.format(
description=description,
detail_notes=detail_notes,
retrieved_cases=context,
)
raw = await ollama_client.generate_text(prompt)
import json
try:
start = raw.find("{")
end = raw.rfind("}") + 1
if start >= 0 and end > start:
result = json.loads(raw[start:end])
result["rag_enhanced"] = True
result["referenced_count"] = len(similar)
return {"available": True, **result}
except json.JSONDecodeError:
pass
return {"available": True, "raw_response": raw, "rag_enhanced": True}

View File

@@ -0,0 +1,122 @@
import httpx
from services.ollama_client import ollama_client
from services.db_client import get_daily_qc_stats, get_issues_for_date
from config import settings
REPORT_PROMPT_PATH = "prompts/daily_report.txt"
def _load_prompt(path: str) -> str:
with open(path, "r", encoding="utf-8") as f:
return f.read()
async def _fetch_system1_data(date_str: str, token: str) -> dict:
headers = {"Authorization": f"Bearer {token}"}
data = {"attendance": None, "work_reports": None, "patrol": None}
try:
async with httpx.AsyncClient(timeout=15.0) as client:
# 근태
try:
r = await client.get(
f"{settings.SYSTEM1_API_URL}/api/attendance/daily-status",
params={"date": date_str},
headers=headers,
)
if r.status_code == 200:
data["attendance"] = r.json()
except Exception:
pass
# 작업보고
try:
r = await client.get(
f"{settings.SYSTEM1_API_URL}/api/daily-work-reports/summary",
params={"date": date_str},
headers=headers,
)
if r.status_code == 200:
data["work_reports"] = r.json()
except Exception:
pass
# 순회점검
try:
r = await client.get(
f"{settings.SYSTEM1_API_URL}/api/patrol/today-status",
params={"date": date_str},
headers=headers,
)
if r.status_code == 200:
data["patrol"] = r.json()
except Exception:
pass
except Exception:
pass
return data
def _format_attendance(data) -> str:
if not data:
return "데이터 없음"
if isinstance(data, dict):
parts = []
for k, v in data.items():
parts.append(f" {k}: {v}")
return "\n".join(parts)
return str(data)
def _format_work_reports(data) -> str:
if not data:
return "데이터 없음"
return str(data)
def _format_qc_issues(issues: list[dict], stats: dict) -> str:
lines = []
lines.append(f"전체: {stats.get('total', 0)}")
lines.append(f"금일 신규: {stats.get('new_today', 0)}")
lines.append(f"진행중: {stats.get('in_progress', 0)}")
lines.append(f"완료: {stats.get('completed', 0)}")
lines.append(f"미검토: {stats.get('pending', 0)}")
if issues:
lines.append("\n금일 신규 이슈:")
for iss in issues[:10]:
cat = iss.get("category", "")
desc = (iss.get("description") or "")[:50]
status = iss.get("review_status", "")
lines.append(f" - [{cat}] {desc} (상태: {status})")
return "\n".join(lines)
def _format_patrol(data) -> str:
if not data:
return "데이터 없음"
return str(data)
async def generate_daily_report(
date_str: str, project_id: int = None, token: str = ""
) -> dict:
system1_data = await _fetch_system1_data(date_str, token)
qc_stats = get_daily_qc_stats(date_str)
qc_issues = get_issues_for_date(date_str)
template = _load_prompt(REPORT_PROMPT_PATH)
prompt = template.format(
date=date_str,
attendance_data=_format_attendance(system1_data["attendance"]),
work_report_data=_format_work_reports(system1_data["work_reports"]),
qc_issue_data=_format_qc_issues(qc_issues, qc_stats),
patrol_data=_format_patrol(system1_data["patrol"]),
)
report_text = await ollama_client.generate_text(prompt)
return {
"date": date_str,
"report": report_text,
"stats": {
"qc": qc_stats,
"new_issues_count": len(qc_issues),
},
}