From dd0d7833f657ecab123f52045fb45a93b6148fa4 Mon Sep 17 00:00:00 2001 From: hyungi Date: Mon, 30 Mar 2026 16:41:19 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20DEVONthink=20=EC=A0=84=EC=B2=B4=20?= =?UTF-8?q?=EB=AC=B8=EC=84=9C=20=EB=B0=B0=EC=B9=98=20=EC=9E=84=EB=B2=A0?= =?UTF-8?q?=EB=94=A9=20=EC=8A=A4=ED=81=AC=EB=A6=BD=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - batch_embed.py: 9,000+ 문서 배치 임베딩 - DB별 순차 처리, 500건씩 AppleScript 배치 텍스트 추출 - GPU bge-m3 배치 임베딩 (32건/호출) - Qdrant 배치 upsert (100건/호출) - --sync: 삭제된 문서 Qdrant 정리 (고아 포인트 제거) - --force: 전체 재임베딩 - --db: 특정 DB만 처리 - GPU 헬스체크 + Qdrant UUID 중복 스킵 - 페이로드: uuid, title, db_name, text_preview, embedded_at Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/batch_embed.py | 359 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 359 insertions(+) create mode 100644 scripts/batch_embed.py diff --git a/scripts/batch_embed.py b/scripts/batch_embed.py new file mode 100644 index 0000000..c5910c9 --- /dev/null +++ b/scripts/batch_embed.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 +""" +DEVONthink 전체 문서 배치 임베딩 +- DB별 순차 처리, 500건씩 AppleScript 배치 텍스트 추출 +- GPU bge-m3 배치 임베딩 (32건/호출) +- Qdrant 배치 upsert (100건/호출) +- --sync: 삭제된 문서 Qdrant 정리 +- --force: 전체 재임베딩 +- --db: 특정 DB만 처리 + +사용법: + python3 batch_embed.py # 신규 문서만 + python3 batch_embed.py --sync # 신규 + 삭제 동기화 + python3 batch_embed.py --force # 전체 재임베딩 + python3 batch_embed.py --db "04_Industrial safety" +""" + +import argparse +import sys +import uuid as uuid_mod +import time +import requests +from pathlib import Path +from datetime import datetime + +sys.path.insert(0, str(Path(__file__).parent)) +from pkm_utils import setup_logger, load_credentials, run_applescript_inline + +logger = setup_logger("batch_embed") + +QDRANT_URL = "http://localhost:6333" +COLLECTION = "pkm_documents" +EMBED_BATCH_SIZE = 32 +QDRANT_BATCH_SIZE = 100 +APPLESCRIPT_CHUNK = 500 + + +# --- GPU 헬스체크 --- + +def check_gpu_health(gpu_ip: str) -> bool: + """GPU bge-m3 API ping""" + try: + resp = requests.post( + f"http://{gpu_ip}:11434/api/embed", + json={"model": "bge-m3", "input": ["test"]}, + timeout=10, + ) + return resp.status_code == 200 + except Exception: + return False + + +# --- Qdrant --- + +def get_existing_uuids_from_qdrant() -> set[str]: + """Qdrant에 이미 저장된 UUID 집합 조회""" + uuids = set() + offset = None + while True: + body = {"limit": 1000, "with_payload": {"include": ["uuid"]}} + if offset: + body["offset"] = offset + resp = requests.post( + f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll", + json=body, timeout=30, + ) + resp.raise_for_status() + result = resp.json()["result"] + points = result.get("points", []) + for p in points: + uuid_val = p.get("payload", {}).get("uuid") + if uuid_val: + uuids.add(uuid_val) + offset = result.get("next_page_offset") + if not offset or not points: + break + return uuids + + +def delete_from_qdrant(point_ids: list[int]): + """Qdrant에서 포인트 삭제""" + if not point_ids: + return + resp = requests.post( + f"{QDRANT_URL}/collections/{COLLECTION}/points/delete", + json={"points": point_ids}, + timeout=30, + ) + resp.raise_for_status() + + +def uuid_to_point_id(doc_uuid: str) -> int: + return uuid_mod.uuid5(uuid_mod.NAMESPACE_URL, doc_uuid).int >> 64 + + +def store_batch_in_qdrant(docs: list[dict]): + """Qdrant 배치 upsert""" + if not docs: + return + points = [] + for doc in docs: + points.append({ + "id": uuid_to_point_id(doc["uuid"]), + "vector": doc["embedding"], + "payload": { + "uuid": doc["uuid"], + "title": doc["title"], + "db_name": doc.get("db_name", ""), + "text_preview": doc.get("text", "")[:200], + "source": "devonthink", + "embedded_at": datetime.now().isoformat(), + }, + }) + + for i in range(0, len(points), QDRANT_BATCH_SIZE): + batch = points[i:i + QDRANT_BATCH_SIZE] + resp = requests.put( + f"{QDRANT_URL}/collections/{COLLECTION}/points", + json={"points": batch}, + timeout=60, + ) + resp.raise_for_status() + + +# --- GPU 임베딩 --- + +def get_embeddings_batch(texts: list[str], gpu_ip: str) -> list[list[float]]: + """GPU bge-m3 배치 임베딩 (4000자 제한 — bge-m3 토큰 한도 고려)""" + truncated = [t[:4000] for t in texts] + resp = requests.post( + f"http://{gpu_ip}:11434/api/embed", + json={"model": "bge-m3", "input": truncated}, + timeout=120, + ) + resp.raise_for_status() + return resp.json().get("embeddings", []) + + +# --- DEVONthink 텍스트 추출 --- + +def get_db_names() -> list[str]: + """DEVONthink DB 이름 목록""" + script = ''' + tell application id "DNtp" + set dbNames to {} + repeat with db in databases + set end of dbNames to name of db + end repeat + set AppleScript's text item delimiters to linefeed + return dbNames as text + end tell + ''' + result = run_applescript_inline(script) + return [n.strip() for n in result.split("\n") if n.strip()] + + +def get_db_document_uuids(db_name: str) -> list[str]: + """특정 DB의 임베딩 대상 UUID 목록 (그룹 제외, 텍스트 10자 이상)""" + script = f''' + tell application id "DNtp" + set theDB to database "{db_name}" + set allDocs to contents of theDB + set output to {{}} + repeat with rec in allDocs + try + set recType to type of rec as string + if recType is not "group" then + set recText to plain text of rec + if length of recText > 10 then + set end of output to uuid of rec + end if + end if + end try + end repeat + set AppleScript's text item delimiters to linefeed + return output as text + end tell + ''' + try: + result = run_applescript_inline(script) + return [u.strip() for u in result.split("\n") if u.strip()] + except Exception as e: + logger.error(f"UUID 수집 실패 [{db_name}]: {e}") + return [] + + +def get_documents_batch(uuids: list[str]) -> list[dict]: + """UUID 리스트로 배치 텍스트 추출 (AppleScript 1회 호출)""" + if not uuids: + return [] + + # UUID를 AppleScript 리스트로 변환 + uuid_list = ", ".join(f'"{u}"' for u in uuids) + script = f''' + tell application id "DNtp" + set uuidList to {{{uuid_list}}} + set output to {{}} + repeat with u in uuidList + try + set theRecord to get record with uuid u + set recText to plain text of theRecord + set recTitle to name of theRecord + set recDB to name of database of theRecord + if length of recText > 8000 then + set recText to text 1 thru 8000 of recText + end if + set end of output to u & "|||" & recTitle & "|||" & recDB & "|||" & recText + on error + set end of output to u & "|||ERROR|||||||" + end try + end repeat + set AppleScript's text item delimiters to linefeed & "<<<>>>" & linefeed + return output as text + end tell + ''' + try: + result = run_applescript_inline(script) + except Exception as e: + logger.error(f"배치 텍스트 추출 실패: {e}") + return [] + + docs = [] + for entry in result.split("\n<<<>>>\n"): + entry = entry.strip() + if not entry or "|||ERROR|||" in entry: + continue + parts = entry.split("|||", 3) + if len(parts) >= 4: + text = parts[3].strip() + if len(text) >= 10: + docs.append({ + "uuid": parts[0].strip(), + "title": parts[1].strip(), + "db_name": parts[2].strip(), + "text": text, + }) + return docs + + +# --- 메인 배치 --- + +def run_batch(gpu_ip: str, target_db: str = None, force: bool = False, sync: bool = False): + """배치 임베딩 실행""" + + # GPU 헬스체크 + if not check_gpu_health(gpu_ip): + logger.error(f"GPU 서버 연결 실패 ({gpu_ip}) — 종료") + sys.exit(1) + logger.info(f"GPU 서버 연결 확인: {gpu_ip}") + + # 기존 임베딩 UUID 조회 + existing_uuids = set() + if not force: + existing_uuids = get_existing_uuids_from_qdrant() + logger.info(f"Qdrant 기존 임베딩: {len(existing_uuids)}건") + + # DB 목록 + db_names = [target_db] if target_db else get_db_names() + logger.info(f"처리 대상 DB: {db_names}") + + total_embedded = 0 + total_skipped = 0 + total_failed = 0 + all_dt_uuids = set() + + for db_name in db_names: + logger.info(f"--- DB: {db_name} ---") + + # UUID 수집 + uuids = get_db_document_uuids(db_name) + all_dt_uuids.update(uuids) + logger.info(f" 문서: {len(uuids)}건") + + # 기존 스킵 + if not force: + new_uuids = [u for u in uuids if u not in existing_uuids] + skipped = len(uuids) - len(new_uuids) + total_skipped += skipped + if skipped > 0: + logger.info(f" 스킵: {skipped}건 (이미 임베딩)") + uuids = new_uuids + + if not uuids: + continue + + # 500건씩 AppleScript 배치 텍스트 추출 + for chunk_start in range(0, len(uuids), APPLESCRIPT_CHUNK): + chunk_uuids = uuids[chunk_start:chunk_start + APPLESCRIPT_CHUNK] + docs = get_documents_batch(chunk_uuids) + + if not docs: + continue + + # 32건씩 GPU 임베딩 + for batch_start in range(0, len(docs), EMBED_BATCH_SIZE): + batch = docs[batch_start:batch_start + EMBED_BATCH_SIZE] + texts = [d["text"] for d in batch] + + try: + embeddings = get_embeddings_batch(texts, gpu_ip) + if len(embeddings) != len(batch): + logger.warning(f"임베딩 수 불일치: {len(embeddings)} != {len(batch)}") + total_failed += len(batch) + continue + + for doc, emb in zip(batch, embeddings): + doc["embedding"] = emb + + store_batch_in_qdrant(batch) + total_embedded += len(batch) + + except Exception as e: + logger.error(f"배치 임베딩 실패: {e}") + total_failed += len(batch) + + progress = chunk_start + len(chunk_uuids) + logger.info(f" 진행: {progress}/{len(uuids)}") + + # --sync: 고아 포인트 삭제 + orphan_deleted = 0 + if sync and all_dt_uuids: + orphan_uuids = existing_uuids - all_dt_uuids + if orphan_uuids: + orphan_ids = [uuid_to_point_id(u) for u in orphan_uuids] + delete_from_qdrant(orphan_ids) + orphan_deleted = len(orphan_uuids) + logger.info(f"고아 포인트 삭제: {orphan_deleted}건") + + # 통계 + logger.info("=== 배치 임베딩 완료 ===") + logger.info(f" 임베딩: {total_embedded}건") + logger.info(f" 스킵: {total_skipped}건") + logger.info(f" 실패: {total_failed}건") + if orphan_deleted: + logger.info(f" 고아 삭제: {orphan_deleted}건") + + # Qdrant 최종 카운트 + try: + resp = requests.get(f"{QDRANT_URL}/collections/{COLLECTION}", timeout=10) + count = resp.json()["result"]["points_count"] + logger.info(f" Qdrant 총 포인트: {count}건") + except Exception: + pass + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="DEVONthink 배치 임베딩") + parser.add_argument("--force", action="store_true", help="전체 재임베딩") + parser.add_argument("--sync", action="store_true", help="삭제 동기화 포함") + parser.add_argument("--db", type=str, help="특정 DB만 처리") + args = parser.parse_args() + + creds = load_credentials() + gpu_ip = creds.get("GPU_SERVER_IP") + if not gpu_ip: + logger.error("GPU_SERVER_IP 미설정") + sys.exit(1) + + run_batch(gpu_ip, target_db=args.db, force=args.force, sync=args.sync)