083aa3126a
- embed_worker: ai_summary 누락 시 text[:800] fallback → ToC 감지 + 서술형 문단 우선 선택 (보수적 휴리스틱, 강신호 2개 이상 + 스킵 상한) - retrieval_service: snippet 200자 → 1200자 (리랭커/evidence에 더 넓은 문맥 제공) - evidence_service: CANDIDATE_SNIPPET_CHARS 800 → 1200 (LLM evidence window 확대) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
171 lines
6.4 KiB
Python
171 lines
6.4 KiB
Python
"""벡터 임베딩 워커 — GPU 서버 bge-m3 호출 (doc-level recall vector)
|
|
|
|
## 구조 원칙 (영구)
|
|
|
|
doc-level embedding 은 "요약 벡터" (recall 담당). chunk-level embedding (chunk_worker)
|
|
이 precision 을 담당하는 hybrid 구조 (`retrieval_service._search_vector_docs` 참조).
|
|
|
|
**본문 일부를 임베딩 입력으로 쓰면 안 된다**. 500k자 교재의 앞 6000자는 표지+목차 —
|
|
임베딩 품질이 쓰레기가 된다. 대신 AI 가 이미 생성한 `ai_summary` 를 중심으로 한
|
|
metadata (title + summary + tags) 를 입력으로 사용한다.
|
|
|
|
이 선택의 이점:
|
|
- 입력 길이 ~1500자 이하 → Ollama 기본 context 안전 (num_ctx 조정 불필요)
|
|
- AI 요약은 "전체 문서의 압축 의미" → doc-level 역할에 정확히 부합
|
|
- 태그는 상위 semantic signal → noise 없음
|
|
"""
|
|
|
|
import re
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from ai.client import AIClient
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
|
|
logger = setup_logger("embed_worker")
|
|
|
|
# ─── 품질 가드 상수 ──────────────────────────────────
|
|
MIN_SUMMARY_CHARS = 50 # 너무 짧은 요약은 저품질 — 본문 fallback 사용
|
|
MAX_TAGS = 5 # 상위 N개만 (과도한 태그는 임베딩 노이즈)
|
|
FALLBACK_PREFIX_CHARS = 800 # ai_summary 누락/저품질 시 본문 프리픽스
|
|
TOC_MAX_SKIP_RATIO = 3 # 전체 텍스트의 1/N 초과 스킵 금지
|
|
TOC_MIN_SIGNALS = 2 # 강한 신호 N개 이상일 때만 발동
|
|
PARA_MIN_CHARS = 100 # 서술형 문단 최소 길이
|
|
PARA_MAX_SYMBOL_RATIO = 0.3 # 숫자/기호 밀도 상한
|
|
|
|
EMBED_MODEL_VERSION = "bge-m3"
|
|
|
|
# ─── ToC 패턴 ────────────────────────────────────────
|
|
_RE_TOC_HEADING = re.compile(r"(?m)^[\s]*목\s*차[\s]*$")
|
|
_RE_DOTTED_LEADER = re.compile(r"[.·…]{3,}\s*\d+")
|
|
_RE_CHAPTER_PAGE = re.compile(r"(?m)^[\s]*제\s*\d+\s*[장절편]\b.*\d+\s*$")
|
|
|
|
|
|
def _find_toc_end(text: str) -> int:
|
|
"""목차 영역 끝 위치를 보수적으로 탐지.
|
|
|
|
강한 신호 2개 이상 동시 충족 시에만 발동.
|
|
스킵 상한: 전체 텍스트의 1/TOC_MAX_SKIP_RATIO.
|
|
"""
|
|
max_pos = len(text) // TOC_MAX_SKIP_RATIO
|
|
signals = 0
|
|
toc_end = 0
|
|
|
|
# 신호 A: "목차" 헤딩
|
|
m = _RE_TOC_HEADING.search(text[:max_pos])
|
|
if m:
|
|
signals += 1
|
|
toc_end = max(toc_end, m.end())
|
|
|
|
# 신호 B: dotted leaders + 숫자 (3줄 이상 연속)
|
|
leaders = list(_RE_DOTTED_LEADER.finditer(text[:max_pos]))
|
|
if len(leaders) >= 3:
|
|
signals += 1
|
|
toc_end = max(toc_end, leaders[-1].end())
|
|
|
|
# 신호 C: "제N장/절" + 페이지번호 (3줄 이상)
|
|
chapters = list(_RE_CHAPTER_PAGE.finditer(text[:max_pos]))
|
|
if len(chapters) >= 3:
|
|
signals += 1
|
|
toc_end = max(toc_end, chapters[-1].end())
|
|
|
|
if signals < TOC_MIN_SIGNALS:
|
|
return 0
|
|
|
|
return min(toc_end, max_pos)
|
|
|
|
|
|
def _is_substantive(paragraph: str) -> bool:
|
|
"""서술형 문단인지 판별 (표/목록/깨진 OCR 제외)."""
|
|
if len(paragraph) < PARA_MIN_CHARS:
|
|
return False
|
|
# 마침표 포함 (서술문)
|
|
if "다." not in paragraph and "." not in paragraph:
|
|
return False
|
|
# 숫자/특수문자 밀도 체크
|
|
non_text = sum(1 for c in paragraph if c.isdigit() or c in "·•–-|/\\()[]{}=+*#<>")
|
|
if non_text / len(paragraph) > PARA_MAX_SYMBOL_RATIO:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _find_substantive_text(text: str) -> str | None:
|
|
"""ToC 스킵 후 첫 서술형 문단을 찾아 반환."""
|
|
toc_end = _find_toc_end(text)
|
|
candidate = text[toc_end:] if toc_end > 0 else text
|
|
|
|
for para in candidate.split("\n\n"):
|
|
stripped = para.strip()
|
|
if _is_substantive(stripped):
|
|
return stripped[:FALLBACK_PREFIX_CHARS]
|
|
|
|
return None
|
|
|
|
|
|
def _build_embed_input(doc: Document) -> str:
|
|
"""doc-level recall vector 용 metadata 입력 빌더.
|
|
|
|
Returns:
|
|
임베딩 모델에 보낼 문자열. 평균 ~500~1500자.
|
|
|
|
품질 가드:
|
|
- ai_summary 가 MIN_SUMMARY_CHARS 미만이면 저품질로 보고 본문 fallback
|
|
- tags 는 상위 MAX_TAGS 개만 (과도한 태그는 임베딩에 노이즈)
|
|
- ai_domain 은 현 단계에서 제외 (taxonomy 품질이 안정화될 때까지)
|
|
"""
|
|
parts = [f"제목: {(doc.title or '').strip()}"]
|
|
|
|
summary = (doc.ai_summary or "").strip()
|
|
use_summary = len(summary) >= MIN_SUMMARY_CHARS
|
|
if use_summary:
|
|
parts.append(f"요약: {summary}")
|
|
|
|
# tags: 리스트면 상위 MAX_TAGS, 문자열이면 그대로 (이상 케이스)
|
|
if doc.ai_tags:
|
|
if isinstance(doc.ai_tags, list):
|
|
tags_list = [str(t).strip() for t in doc.ai_tags[:MAX_TAGS] if t]
|
|
tags_str = ", ".join(tags_list)
|
|
else:
|
|
tags_str = str(doc.ai_tags)
|
|
if tags_str:
|
|
parts.append(f"키워드: {tags_str}")
|
|
|
|
# ai_summary 품질 미달 시 본문 fallback (ToC 회피 + 서술형 문단 우선)
|
|
if not use_summary and doc.extracted_text:
|
|
substantive = _find_substantive_text(doc.extracted_text)
|
|
if substantive:
|
|
parts.append(f"본문: {substantive}")
|
|
else:
|
|
# 서술형 문단 못 찾으면 기존 fallback
|
|
parts.append(f"본문: {doc.extracted_text[:FALLBACK_PREFIX_CHARS]}")
|
|
|
|
return "\n".join(p for p in parts if p).strip()
|
|
|
|
|
|
async def process(document_id: int, session: AsyncSession) -> None:
|
|
"""문서 벡터 임베딩 생성 (doc-level recall vector)"""
|
|
doc = await session.get(Document, document_id)
|
|
if not doc:
|
|
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
|
|
|
|
embed_input = _build_embed_input(doc)
|
|
|
|
if not embed_input:
|
|
logger.warning(f"[임베딩] document_id={document_id}: 빈 입력, 스킵")
|
|
return
|
|
|
|
client = AIClient()
|
|
try:
|
|
vector = await client.embed(embed_input)
|
|
doc.embedding = vector
|
|
doc.embed_model_version = EMBED_MODEL_VERSION
|
|
doc.embedded_at = datetime.now(timezone.utc)
|
|
logger.info(
|
|
f"[임베딩] document_id={document_id}: {len(vector)}차원 벡터 "
|
|
f"(input_len={len(embed_input)}, has_summary={bool(doc.ai_summary)})"
|
|
)
|
|
finally:
|
|
await client.close()
|