Files
tk-factory-services/ai-service/db/vector_store.py
Hyungi Ahn 5b1b89254c feat: RAG 임베딩 자동 동기화 + AI 서비스 개선
- 부적합 라이프사이클 전 과정에서 Qdrant 임베딩 자동 동기화
  - 관리함 5개 저장 함수 + 수신함 상태 변경 시 fire-and-forget sync
  - 30분 주기 전체 재동기화 안전망 (FastAPI lifespan 백그라운드 태스크)
  - build_document_text에 카테고리(final_category/category) 포함
- RAG 질의에 DB 통계 집계 지원 (카테고리별/부서별 건수)
- Qdrant client.search → query_points API 마이그레이션
- AI 어시스턴트 페이지 권한 추가 (tkuser)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 13:05:32 +09:00

108 lines
3.5 KiB
Python

import logging
import uuid
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from config import settings
logger = logging.getLogger(__name__)
class VectorStore:
    """Wrapper around a single Qdrant collection holding document embeddings.

    The Qdrant client is not created in ``__init__``; call ``initialize()``
    before using any method that talks to Qdrant.
    """

    # Size of the stored vectors; bge-m3's default output is 1024 dims.
    VECTOR_SIZE = 1024

    # Payload keys owned by this class. ``query()`` removes them from the
    # returned "metadata" and ``upsert()`` never lets metadata overwrite them.
    _RESERVED_PAYLOAD_KEYS = ("document", "original_id")

    def __init__(self):
        self.client = None
        self.collection = settings.QDRANT_COLLECTION  # e.g. "tk_qc_issues"

    def initialize(self):
        """Create the Qdrant client and make sure the collection exists."""
        self.client = QdrantClient(url=settings.QDRANT_URL)
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the collection with cosine distance if it is missing."""
        existing = {c.name for c in self.client.get_collections().collections}
        if self.collection in existing:
            return
        self.client.create_collection(
            collection_name=self.collection,
            vectors_config=VectorParams(size=self.VECTOR_SIZE, distance=Distance.COSINE),
        )

    @staticmethod
    def _to_uuid(doc_id) -> str:
        """Convert a string/integer ID to a UUID5 string (Qdrant-compatible).

        The ID is stringified first, so ``1`` and ``"1"`` map to the same UUID.
        """
        return str(uuid.uuid5(uuid.NAMESPACE_URL, str(doc_id)))

    @staticmethod
    def _build_payload(doc_id, document: str, metadata: dict = None) -> dict:
        """Build the point payload: metadata plus the reserved keys.

        The reserved keys ("document", "original_id") are written last so
        that a metadata entry with the same name cannot replace the stored
        document text or the original ID that ``query()`` reads back.
        The caller's ``metadata`` dict is not modified.
        """
        payload = dict(metadata) if metadata else {}
        payload["document"] = document
        payload["original_id"] = str(doc_id)
        return payload

    def upsert(
        self,
        doc_id: str,
        document: str,
        embedding: list[float],
        metadata: dict = None,
    ):
        """Insert or overwrite the point for ``doc_id``.

        Args:
            doc_id: Original document ID; mapped to a point ID via ``_to_uuid``.
            document: Document text stored in the payload.
            embedding: Vector stored for the point.
            metadata: Optional extra payload fields.
        """
        point = PointStruct(
            id=self._to_uuid(doc_id),
            vector=embedding,
            payload=self._build_payload(doc_id, document, metadata),
        )
        self.client.upsert(collection_name=self.collection, points=[point])

    def query(
        self,
        embedding: list[float],
        n_results: int = 5,
        where: dict = None,
    ) -> list[dict]:
        """Return the nearest points to ``embedding``.

        Args:
            embedding: Query vector.
            n_results: Maximum number of hits to return.
            where: Optional ``{field: value}`` equality conditions; all must match.

        Returns:
            A list of dicts with keys "id", "document", "distance",
            "metadata" and "similarity". If the Qdrant call raises, the
            error is logged and an empty list is returned.
        """
        query_filter = self._build_filter(where) if where else None
        try:
            response = self.client.query_points(
                collection_name=self.collection,
                query=embedding,
                limit=n_results,
                query_filter=query_filter,
                with_payload=True,
            )
        except Exception as e:
            # Best effort: a failed search yields no results instead of raising.
            logger.error("Qdrant search failed: %s", e, exc_info=True)
            return []

        items = []
        for hit in response.points:
            payload = hit.payload or {}
            metadata = {
                key: value
                for key, value in payload.items()
                if key not in self._RESERVED_PAYLOAD_KEYS
            }
            items.append(
                {
                    "id": payload.get("original_id", str(hit.id)),
                    "document": payload.get("document", ""),
                    "distance": round(1 - hit.score, 4),  # cosine score -> distance
                    "metadata": metadata,
                    "similarity": round(hit.score, 4),
                }
            )
        return items

    @staticmethod
    def _build_filter(where: dict) -> "Filter":
        """Convert a ChromaDB-style ``where`` dict to a Qdrant ``Filter``.

        Every ``key: value`` pair becomes an exact-match condition and all
        conditions are combined under ``must``.
        """
        conditions = [
            FieldCondition(key=key, match=MatchValue(value=value))
            for key, value in where.items()
        ]
        return Filter(must=conditions)

    def delete(self, doc_id: str):
        """Delete the point that corresponds to ``doc_id``."""
        self.client.delete(
            collection_name=self.collection,
            points_selector=[self._to_uuid(doc_id)],
        )

    def count(self) -> int:
        """Return the number of points reported for the collection.

        Qdrant's collection info declares ``points_count`` as optional, so a
        missing value is reported as 0 to honour the ``int`` return type.
        """
        info = self.client.get_collection(collection_name=self.collection)
        return info.points_count or 0

    def stats(self) -> dict:
        """Return the document count and the collection name."""
        return {
            "total_documents": self.count(),
            "collection_name": self.collection,
        }
# Module-level instance created at import time; its Qdrant client stays None
# until initialize() is called.
vector_store = VectorStore()