Files
tk-factory-services/ai-service/services/db_client.py
Hyungi Ahn 5b1b89254c feat: RAG 임베딩 자동 동기화 + AI 서비스 개선
- 부적합 라이프사이클 전 과정에서 Qdrant 임베딩 자동 동기화
  - 관리함 5개 저장 함수 + 수신함 상태 변경 시 fire-and-forget sync
  - 30분 주기 전체 재동기화 안전망 (FastAPI lifespan 백그라운드 태스크)
  - build_document_text에 카테고리(final_category/category) 포함
- RAG 질의에 DB 통계 집계 지원 (카테고리별/부서별 건수)
- Qdrant client.search → query_points API 마이그레이션
- AI 어시스턴트 페이지 권한 추가 (tkuser)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 13:05:32 +09:00

130 lines
4.4 KiB
Python

from urllib.parse import quote_plus
from sqlalchemy import create_engine, text
from config import settings
def get_engine():
    """Create the SQLAlchemy engine for the MySQL database named in ``settings``.

    The connection URL is assembled with ``URL.create`` rather than by string
    formatting, so the user name and password are passed through verbatim and
    need no manual percent-encoding (characters such as ``@``, ``:``, ``/`` or
    spaces in either credential would otherwise corrupt the URL).

    Returns:
        An ``Engine`` using the ``mysql+pymysql`` driver, with
        ``pool_pre_ping=True`` and ``pool_size=5``.
    """
    # Local import keeps this block self-contained; sqlalchemy is already a
    # dependency of this module.
    from sqlalchemy.engine import URL

    url = URL.create(
        drivername="mysql+pymysql",
        username=settings.DB_USER,
        password=settings.DB_PASSWORD,
        host=settings.DB_HOST,
        port=int(settings.DB_PORT),
        database=settings.DB_NAME,
    )
    return create_engine(url, pool_pre_ping=True, pool_size=5)
# Module-level engine, built once when this module is imported; the query
# helpers in this module open their connections from it.
engine = get_engine()
def get_all_issues() -> list[dict]:
    """Return every row of ``qc_issues`` as a dict, ordered by ``id``.

    Each dict is keyed by the selected column names.
    """
    query = text(
        "SELECT id, category, description, detail_notes, "
        "final_description, final_category, solution, "
        "management_comment, cause_detail, project_id, "
        "review_status, report_date, responsible_department, "
        "location_info "
        "FROM qc_issues ORDER BY id"
    )
    with engine.connect() as connection:
        records = connection.execute(query).mappings()
        return [dict(record) for record in records]
def get_issue_by_id(issue_id: int) -> dict | None:
    """Look up a single ``qc_issues`` row by its ``id``.

    Args:
        issue_id: Value bound to the ``:id`` parameter of the query.

    Returns:
        The row as a dict keyed by column name, or ``None`` when no row
        has that id.
    """
    query = text(
        "SELECT id, category, description, detail_notes, "
        "final_description, final_category, solution, "
        "management_comment, cause_detail, project_id, "
        "review_status, report_date, responsible_department, "
        "location_info "
        "FROM qc_issues WHERE id = :id"
    )
    params = {"id": issue_id}
    with engine.connect() as connection:
        record = connection.execute(query, params).mappings().fetchone()
        if record is None:
            return None
        return dict(record)
def get_issues_since(last_id: int) -> list[dict]:
    """Return the ``qc_issues`` rows whose ``id`` is greater than ``last_id``.

    Args:
        last_id: Exclusive lower bound on ``id`` (bound to ``:last_id``).

    Returns:
        Matching rows as dicts keyed by column name, ordered by ``id``.
    """
    query = text(
        "SELECT id, category, description, detail_notes, "
        "final_description, final_category, solution, "
        "management_comment, cause_detail, project_id, "
        "review_status, report_date, responsible_department, "
        "location_info "
        "FROM qc_issues WHERE id > :last_id ORDER BY id"
    )
    params = {"last_id": last_id}
    with engine.connect() as connection:
        records = connection.execute(query, params).mappings()
        return list(map(dict, records))
def get_daily_qc_stats(date_str: str) -> dict:
    """Return table-wide ``qc_issues`` counters plus the count reported on a date.

    Args:
        date_str: Date compared against ``DATE(report_date)`` (bound to
            ``:d``). Presumably an ISO ``YYYY-MM-DD`` string -- confirm
            against callers.

    Returns:
        A dict with integer values for the keys ``total``, ``new_today``,
        ``in_progress``, ``completed`` and ``pending``. ``total``,
        ``in_progress``, ``completed`` and ``pending`` cover the whole table;
        only ``new_today`` is restricted to ``date_str``. An empty dict is
        returned if the query yields no row.
    """
    query = text(
        "SELECT "
        " COUNT(*) as total, "
        " SUM(CASE WHEN DATE(report_date) = :d THEN 1 ELSE 0 END) as new_today, "
        " SUM(CASE WHEN review_status = 'in_progress' THEN 1 ELSE 0 END) as in_progress, "
        " SUM(CASE WHEN review_status = 'completed' THEN 1 ELSE 0 END) as completed, "
        " SUM(CASE WHEN review_status = 'pending_review' THEN 1 ELSE 0 END) as pending "
        "FROM qc_issues"
    )
    with engine.connect() as conn:
        row = conn.execute(query, {"d": date_str}).fetchone()
        if row is None:
            return {}
        # MySQL's SUM() yields NULL when the table is empty and DECIMAL
        # otherwise, while COUNT(*) yields an integer. Normalize every
        # counter to a plain int (NULL -> 0) so callers get one type.
        return {key: int(value or 0) for key, value in row._mapping.items()}
def get_category_stats() -> list[dict]:
    """Aggregate nonconformance counts per category.

    The category of a row is ``final_category`` when it is not NULL,
    otherwise ``category``.

    Returns:
        Dicts with keys ``category`` and ``count``, ordered by ``count``
        descending.
    """
    query = text(
        "SELECT COALESCE(final_category, category) AS category, "
        "COUNT(*) AS count "
        "FROM qc_issues "
        "GROUP BY COALESCE(final_category, category) "
        "ORDER BY count DESC"
    )
    with engine.connect() as connection:
        records = connection.execute(query).mappings()
        return [dict(record) for record in records]
def get_department_stats() -> list[dict]:
    """Aggregate nonconformance counts per responsible department.

    Rows whose ``responsible_department`` is NULL or an empty string are
    excluded.

    Returns:
        Dicts with keys ``department`` and ``count``, ordered by ``count``
        descending.
    """
    query = text(
        "SELECT responsible_department AS department, "
        "COUNT(*) AS count "
        "FROM qc_issues "
        "WHERE responsible_department IS NOT NULL "
        "AND responsible_department != '' "
        "GROUP BY responsible_department "
        "ORDER BY count DESC"
    )
    with engine.connect() as connection:
        records = connection.execute(query).mappings()
        return list(map(dict, records))
def get_issues_for_date(date_str: str) -> list[dict]:
    """Return the ``qc_issues`` rows whose ``DATE(report_date)`` equals ``date_str``.

    Args:
        date_str: Date value bound to the ``:d`` parameter.

    Returns:
        Dicts with the keys ``id``, ``category``, ``description``,
        ``detail_notes``, ``review_status``, ``responsible_department`` and
        ``solution``, ordered by ``id``.
    """
    query = text(
        "SELECT id, category, description, detail_notes, "
        "review_status, responsible_department, solution "
        "FROM qc_issues "
        "WHERE DATE(report_date) = :d "
        "ORDER BY id"
    )
    params = {"d": date_str}
    with engine.connect() as connection:
        records = connection.execute(query, params).mappings()
        return [dict(record) for record in records]