From 010e25cb23142712d4895fb70225bbd086593138 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Thu, 9 Apr 2026 13:45:55 +0900 Subject: [PATCH] =?UTF-8?q?fix(queue):=20doc-level=20embed=20metadata=20?= =?UTF-8?q?=EA=B8=B0=EB=B0=98=20+=20NUL=20=EB=B0=94=EC=9D=B4=ED=8A=B8=20st?= =?UTF-8?q?rip=20+=20=EB=B9=88=20=EC=98=88=EC=99=B8=20fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit embed_worker: - extracted_text[:6000] → title + ai_summary + tags(top 5) metadata 입력 - 500k자 문서의 표지+목차가 임베딩되는 구조적 버그 해결 - Ollama 기본 context 안전 (~1500자 이하), num_ctx 조정 불필요 - ai_summary < 50자 시 본문 800자 fallback - ai_domain 은 초기 제외 (taxonomy 노이즈 방지) extract_worker: - kordoc / 직접 읽기 / LibreOffice 3 경로 모두 \x00 strip - asyncpg CharacterNotInRepertoireError 재발 방지 queue_consumer: - str(e) or repr(e) or type(e).__name__ fallback - 빈 메시지 예외(24건 발생) 다음부터 클래스명이라도 기록 plan: ~/.claude/plans/quiet-meandering-nova.md Co-Authored-By: Claude Opus 4.6 (1M context) --- app/workers/embed_worker.py | 76 +++++++++++++++++++++++++++++------ app/workers/extract_worker.py | 9 +++-- app/workers/queue_consumer.py | 4 +- 3 files changed, 72 insertions(+), 17 deletions(-) diff --git a/app/workers/embed_worker.py b/app/workers/embed_worker.py index 15871a4..bdd371b 100644 --- a/app/workers/embed_worker.py +++ b/app/workers/embed_worker.py @@ -1,4 +1,19 @@ -"""벡터 임베딩 워커 — GPU 서버 bge-m3 호출""" +"""벡터 임베딩 워커 — GPU 서버 bge-m3 호출 (doc-level recall vector) + +## 구조 원칙 (영구) + +doc-level embedding 은 "요약 벡터" (recall 담당). chunk-level embedding (chunk_worker) +이 precision 을 담당하는 hybrid 구조 (`retrieval_service._search_vector_docs` 참조). + +**본문 일부를 임베딩 입력으로 쓰면 안 된다**. 500k자 교재의 앞 6000자는 표지+목차 — +임베딩 품질이 쓰레기가 된다. 대신 AI 가 이미 생성한 `ai_summary` 를 중심으로 한 +metadata (title + summary + tags) 를 입력으로 사용한다. + +이 선택의 이점: +- 입력 길이 ~1500자 이하 → Ollama 기본 context 안전 (num_ctx 조정 불필요) +- AI 요약은 "전체 문서의 압축 의미" → doc-level 역할에 정확히 부합 +- 태그는 상위 semantic signal → noise 없음 +""" from datetime import datetime, timezone @@ -10,27 +25,59 @@ from models.document import Document logger = setup_logger("embed_worker") -# 임베딩용 텍스트 최대 길이 (bge-m3: 8192 토큰) -MAX_EMBED_TEXT = 6000 +# ─── 품질 가드 상수 ────────────────────────────────── +MIN_SUMMARY_CHARS = 50 # 너무 짧은 요약은 저품질 — 본문 fallback 사용 +MAX_TAGS = 5 # 상위 N개만 (과도한 태그는 임베딩 노이즈) +FALLBACK_PREFIX_CHARS = 800 # ai_summary 누락/저품질 시 본문 프리픽스 + EMBED_MODEL_VERSION = "bge-m3" +def _build_embed_input(doc: Document) -> str: + """doc-level recall vector 용 metadata 입력 빌더. + + Returns: + 임베딩 모델에 보낼 문자열. 평균 ~500~1500자. + + 품질 가드: + - ai_summary 가 MIN_SUMMARY_CHARS 미만이면 저품질로 보고 본문 fallback + - tags 는 상위 MAX_TAGS 개만 (과도한 태그는 임베딩에 노이즈) + - ai_domain 은 현 단계에서 제외 (taxonomy 품질이 안정화될 때까지) + """ + parts = [f"제목: {(doc.title or '').strip()}"] + + summary = (doc.ai_summary or "").strip() + use_summary = len(summary) >= MIN_SUMMARY_CHARS + if use_summary: + parts.append(f"요약: {summary}") + + # tags: 리스트면 상위 MAX_TAGS, 문자열이면 그대로 (이상 케이스) + if doc.ai_tags: + if isinstance(doc.ai_tags, list): + tags_list = [str(t).strip() for t in doc.ai_tags[:MAX_TAGS] if t] + tags_str = ", ".join(tags_list) + else: + tags_str = str(doc.ai_tags) + if tags_str: + parts.append(f"키워드: {tags_str}") + + # ai_summary 품질 미달 시 본문 프리픽스 fallback (최소 recall 확보) + if not use_summary and doc.extracted_text: + parts.append(f"본문: {doc.extracted_text[:FALLBACK_PREFIX_CHARS]}") + + return "\n".join(p for p in parts if p).strip() + + async def process(document_id: int, session: AsyncSession) -> None: - """문서 벡터 임베딩 생성""" + """문서 벡터 임베딩 생성 (doc-level recall vector)""" doc = await session.get(Document, document_id) if not doc: raise ValueError(f"문서 ID {document_id}를 찾을 수 없음") - if not doc.extracted_text: - raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음") - - # title + 본문 앞부분을 결합하여 임베딩 입력 생성 - title_part = doc.title or "" - text_part = doc.extracted_text[:MAX_EMBED_TEXT] - embed_input = f"{title_part}\n\n{text_part}".strip() + embed_input = _build_embed_input(doc) if not embed_input: - logger.warning(f"[임베딩] document_id={document_id}: 빈 텍스트, 스킵") + logger.warning(f"[임베딩] document_id={document_id}: 빈 입력, 스킵") return client = AIClient() @@ -39,6 +86,9 @@ async def process(document_id: int, session: AsyncSession) -> None: doc.embedding = vector doc.embed_model_version = EMBED_MODEL_VERSION doc.embedded_at = datetime.now(timezone.utc) - logger.info(f"[임베딩] document_id={document_id}: {len(vector)}차원 벡터 생성") + logger.info( + f"[임베딩] document_id={document_id}: {len(vector)}차원 벡터 " + f"(input_len={len(embed_input)}, has_summary={bool(doc.ai_summary)})" + ) finally: await client.close() diff --git a/app/workers/extract_worker.py b/app/workers/extract_worker.py index 52d5119..649a6dc 100644 --- a/app/workers/extract_worker.py +++ b/app/workers/extract_worker.py @@ -39,7 +39,8 @@ async def process(document_id: int, session: AsyncSession) -> None: if not full_path.exists(): raise FileNotFoundError(f"파일 없음: {full_path}") text = full_path.read_text(encoding="utf-8", errors="replace") - doc.extracted_text = text + # NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지) + doc.extracted_text = text.replace("\x00", "") doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = "direct_read" logger.info(f"[텍스트] {doc.file_path} ({len(text)}자)") @@ -70,7 +71,8 @@ async def process(document_id: int, session: AsyncSession) -> None: resp.raise_for_status() data = resp.json() - doc.extracted_text = data.get("markdown", "") + # NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지) + doc.extracted_text = data.get("markdown", "").replace("\x00", "") doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = EXTRACTOR_VERSION logger.info(f"[kordoc] {doc.file_path} ({len(doc.extracted_text)}자)") @@ -106,7 +108,8 @@ async def process(document_id: int, session: AsyncSession) -> None: out_file = tmp_dir / f"input_{document_id}.{out_ext}" if out_file.exists(): text = out_file.read_text(encoding="utf-8", errors="replace") - doc.extracted_text = text[:15000] + # NUL 바이트 제거 (Postgres TEXT 저장 시 CharacterNotInRepertoireError 방지) + doc.extracted_text = text.replace("\x00", "")[:15000] doc.extracted_at = datetime.now(timezone.utc) doc.extractor_version = "libreoffice" out_file.unlink() diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 2729034..9ac74eb 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -133,7 +133,9 @@ async def consume_queue(): if not item: logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") continue - item.error_message = str(e)[:500] + # 빈 메시지 방어: str → repr → 클래스명 순 fallback + err_text = str(e) or repr(e) or type(e).__name__ + item.error_message = err_text[:500] if item.attempts >= item.max_attempts: item.status = "failed" logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")