From 42dfe82c9bf56df2a38d73c1559d742547c184bd Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Wed, 8 Apr 2026 12:31:29 +0900 Subject: [PATCH] =?UTF-8?q?feat(chunk):=20Phase=201.2-E=20reindex=20?= =?UTF-8?q?=EC=8A=A4=ED=81=AC=EB=A6=BD=ED=8A=B8=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tests/scripts/reindex_all_chunks.py — 전체 documents chunk 재인덱싱 도구. 핵심 요건 (사용자 정의): - asyncio.Semaphore(N) — 동시 처리 수 제한 (기본 3, Ollama bge-m3 부하 조절) - checkpoint resume — JSON 파일 atomic swap, 중간 실패/중단 후 재시작 가능 - rate limiting — 작업 간 sleep 0.1초 (Ollama API 보호) - 진행 로그 — [REINDEX] N/total (P%) ETA: ... fails: N (~2% 단위) CLI: - --concurrency, --checkpoint, --rate-limit, --limit (dry-run), --skip-existing 야간 배치 (00:00~06:00): PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \ --concurrency 3 --checkpoint checkpoints/reindex.json \ > logs/reindex.log 2>&1 & --- tests/scripts/__init__.py | 0 tests/scripts/reindex_all_chunks.py | 204 ++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 tests/scripts/__init__.py create mode 100644 tests/scripts/reindex_all_chunks.py diff --git a/tests/scripts/__init__.py b/tests/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/scripts/reindex_all_chunks.py b/tests/scripts/reindex_all_chunks.py new file mode 100644 index 0000000..851fb8d --- /dev/null +++ b/tests/scripts/reindex_all_chunks.py @@ -0,0 +1,204 @@ +"""문서 chunk 재인덱싱 (Phase 1.2-E). + +전체 documents를 chunk_worker로 재처리. 야간 배치 권장 (00:00~06:00). + +핵심 요건 (사용자 정의): +- concurrency 제한 (asyncio.Semaphore) — Ollama 부하 조절 +- checkpoint resume (중간 실패/중단 대비) +- rate limiting (Ollama API 보호) +- 진행 로그 ([REINDEX] N/total (P%) ETA: ...) + +사용: + cd /home/hyungi/Documents/code/hyungi_Document_Server + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \\ + --concurrency 3 \\ + --checkpoint checkpoints/reindex.json \\ + > logs/reindex.log 2>&1 & + +dry-run (5개만): + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --limit 5 + +기존 chunks 보유 doc 건너뛰기: + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --skip-existing + +기존 chunks 강제 재처리 (chunk_worker가 자동으로 delete + insert): + PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py +""" + +import argparse +import asyncio +import json +import sys +import time +from pathlib import Path + +# PYTHONPATH=app 가정 +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "app")) + +from sqlalchemy import select # noqa: E402 +from sqlalchemy.ext.asyncio import async_sessionmaker # noqa: E402 + +from core.database import engine # noqa: E402 +from core.utils import setup_logger # noqa: E402 +from models.chunk import DocumentChunk # noqa: E402 +from models.document import Document # noqa: E402 +from workers.chunk_worker import process # noqa: E402 + +logger = setup_logger("reindex") + + +def load_checkpoint(path: Path) -> set[int]: + """checkpoint 파일에서 처리 완료 doc_id 집합 복원.""" + if not path.exists(): + return set() + try: + data = json.loads(path.read_text()) + return set(data.get("processed", [])) + except (json.JSONDecodeError, KeyError) as e: + logger.warning(f"checkpoint {path} invalid ({e}) → 새로 시작") + return set() + + +def save_checkpoint(path: Path, processed: set[int]) -> None: + """처리 완료 doc_id를 checkpoint 파일에 저장 (incremental).""" + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(json.dumps({"processed": sorted(processed)}, indent=2)) + tmp.replace(path) # atomic swap + + +def format_eta(elapsed: float, done: int, total: int) -> str: + """남은 작업 시간 ETA 포맷.""" + if done == 0: + return "?" + rate = done / elapsed + remaining = (total - done) / rate + if remaining < 60: + return f"{remaining:.0f}s" + if remaining < 3600: + return f"{remaining / 60:.0f}m" + return f"{remaining / 3600:.1f}h" + + +async def main(): + parser = argparse.ArgumentParser(description="문서 chunk 재인덱싱 (Phase 1.2-E)") + parser.add_argument( + "--concurrency", + type=int, + default=3, + help="동시 처리 doc 수 (default 3, Ollama bge-m3 부하 조절)", + ) + parser.add_argument( + "--checkpoint", + type=Path, + default=Path("checkpoints/reindex.json"), + help="checkpoint 파일 경로 (resume 가능)", + ) + parser.add_argument( + "--rate-limit", + type=float, + default=0.1, + help="작업 간 sleep (초, Ollama 보호)", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="처리할 doc 수 제한 (dry-run 용)", + ) + parser.add_argument( + "--skip-existing", + action="store_true", + help="이미 chunks 있는 doc skip (재처리 생략)", + ) + args = parser.parse_args() + + Session = async_sessionmaker(engine) + + # 1. 대상 docs 수집 + async with Session() as session: + query = ( + select(Document.id) + .where( + Document.deleted_at.is_(None), + Document.extracted_text.is_not(None), + ) + .order_by(Document.id) + ) + result = await session.execute(query) + all_doc_ids = [row[0] for row in result] + + if args.skip_existing: + existing_query = select(DocumentChunk.doc_id).distinct() + existing_result = await session.execute(existing_query) + existing = {row[0] for row in existing_result} + logger.info(f"skip-existing: 기존 chunks 보유 doc {len(existing)}건") + else: + existing = set() + + # 2. checkpoint resume + processed = load_checkpoint(args.checkpoint) + if processed: + logger.info(f"checkpoint: 이미 처리됨 {len(processed)}건 (resume)") + + # 3. 처리 대상 = 전체 - skip_existing - checkpoint + targets = [d for d in all_doc_ids if d not in processed and d not in existing] + if args.limit: + targets = targets[: args.limit] + + total = len(targets) + logger.info( + f"REINDEX 시작: 전체 {len(all_doc_ids)} docs / 처리 대상 {total} docs" + f" / concurrency={args.concurrency} rate_limit={args.rate_limit}s" + ) + + if total == 0: + logger.info("처리할 doc 없음. 종료.") + return + + semaphore = asyncio.Semaphore(args.concurrency) + done_count = 0 + fail_count = 0 + start_time = time.monotonic() + log_interval = max(1, total // 50) # ~2% 단위 진행 로그 + + async def process_one(doc_id: int) -> None: + nonlocal done_count, fail_count + async with semaphore: + try: + async with Session() as session: + await process(doc_id, session) + await session.commit() + # rate limit (Ollama 보호) + await asyncio.sleep(args.rate_limit) + done_count += 1 + processed.add(doc_id) + + # 진행 로그 + 체크포인트 저장 + if done_count % log_interval == 0 or done_count == total: + elapsed = time.monotonic() - start_time + pct = (done_count / total) * 100 + eta = format_eta(elapsed, done_count, total) + logger.info( + f"[REINDEX] {done_count}/{total} ({pct:.1f}%)" + f" ETA: {eta} elapsed: {elapsed:.0f}s fails: {fail_count}" + ) + save_checkpoint(args.checkpoint, processed) + except Exception as e: + fail_count += 1 + logger.warning( + f"[REINDEX] doc {doc_id} 실패: {type(e).__name__}: {e}" + ) + + tasks = [process_one(doc_id) for doc_id in targets] + await asyncio.gather(*tasks) + + elapsed = time.monotonic() - start_time + save_checkpoint(args.checkpoint, processed) + logger.info( + f"[REINDEX] 완료: {done_count}/{total} done, {fail_count} fails, {elapsed:.0f}s" + ) + + +if __name__ == "__main__": + asyncio.run(main())