"""문서 chunk 재인덱싱 (Phase 1.2-E). 전체 documents를 chunk_worker로 재처리. 야간 배치 권장 (00:00~06:00). 핵심 요건 (사용자 정의): - concurrency 제한 (asyncio.Semaphore) — Ollama 부하 조절 - checkpoint resume (중간 실패/중단 대비) - rate limiting (Ollama API 보호) - 진행 로그 ([REINDEX] N/total (P%) ETA: ...) 사용: cd /home/hyungi/Documents/code/hyungi_Document_Server PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \\ --concurrency 3 \\ --checkpoint checkpoints/reindex.json \\ > logs/reindex.log 2>&1 & dry-run (5개만): PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --limit 5 기존 chunks 보유 doc 건너뛰기: PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --skip-existing 기존 chunks 강제 재처리 (chunk_worker가 자동으로 delete + insert): PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py """ import argparse import asyncio import json import sys import time from pathlib import Path # PYTHONPATH=app 가정 sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "app")) from sqlalchemy import select # noqa: E402 from sqlalchemy.ext.asyncio import async_sessionmaker # noqa: E402 from core.database import engine # noqa: E402 from core.utils import setup_logger # noqa: E402 from models.chunk import DocumentChunk # noqa: E402 from models.document import Document # noqa: E402 from workers.chunk_worker import process # noqa: E402 logger = setup_logger("reindex") def load_checkpoint(path: Path) -> set[int]: """checkpoint 파일에서 처리 완료 doc_id 집합 복원.""" if not path.exists(): return set() try: data = json.loads(path.read_text()) return set(data.get("processed", [])) except (json.JSONDecodeError, KeyError) as e: logger.warning(f"checkpoint {path} invalid ({e}) → 새로 시작") return set() def save_checkpoint(path: Path, processed: set[int]) -> None: """처리 완료 doc_id를 checkpoint 파일에 저장 (incremental).""" path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(json.dumps({"processed": sorted(processed)}, indent=2)) tmp.replace(path) # atomic swap def format_eta(elapsed: float, done: int, total: int) -> str: """남은 작업 시간 ETA 포맷.""" if done == 0: return "?" rate = done / elapsed remaining = (total - done) / rate if remaining < 60: return f"{remaining:.0f}s" if remaining < 3600: return f"{remaining / 60:.0f}m" return f"{remaining / 3600:.1f}h" async def main(): parser = argparse.ArgumentParser(description="문서 chunk 재인덱싱 (Phase 1.2-E)") parser.add_argument( "--concurrency", type=int, default=3, help="동시 처리 doc 수 (default 3, Ollama bge-m3 부하 조절)", ) parser.add_argument( "--checkpoint", type=Path, default=Path("checkpoints/reindex.json"), help="checkpoint 파일 경로 (resume 가능)", ) parser.add_argument( "--rate-limit", type=float, default=0.1, help="작업 간 sleep (초, Ollama 보호)", ) parser.add_argument( "--limit", type=int, default=None, help="처리할 doc 수 제한 (dry-run 용)", ) parser.add_argument( "--skip-existing", action="store_true", help="이미 chunks 있는 doc skip (재처리 생략)", ) args = parser.parse_args() Session = async_sessionmaker(engine) # 1. 대상 docs 수집 async with Session() as session: query = ( select(Document.id) .where( Document.deleted_at.is_(None), Document.extracted_text.is_not(None), ) .order_by(Document.id) ) result = await session.execute(query) all_doc_ids = [row[0] for row in result] if args.skip_existing: existing_query = select(DocumentChunk.doc_id).distinct() existing_result = await session.execute(existing_query) existing = {row[0] for row in existing_result} logger.info(f"skip-existing: 기존 chunks 보유 doc {len(existing)}건") else: existing = set() # 2. checkpoint resume processed = load_checkpoint(args.checkpoint) if processed: logger.info(f"checkpoint: 이미 처리됨 {len(processed)}건 (resume)") # 3. 처리 대상 = 전체 - skip_existing - checkpoint targets = [d for d in all_doc_ids if d not in processed and d not in existing] if args.limit: targets = targets[: args.limit] total = len(targets) logger.info( f"REINDEX 시작: 전체 {len(all_doc_ids)} docs / 처리 대상 {total} docs" f" / concurrency={args.concurrency} rate_limit={args.rate_limit}s" ) if total == 0: logger.info("처리할 doc 없음. 종료.") return semaphore = asyncio.Semaphore(args.concurrency) done_count = 0 fail_count = 0 start_time = time.monotonic() log_interval = max(1, total // 50) # ~2% 단위 진행 로그 async def process_one(doc_id: int) -> None: nonlocal done_count, fail_count async with semaphore: try: async with Session() as session: await process(doc_id, session) await session.commit() # rate limit (Ollama 보호) await asyncio.sleep(args.rate_limit) done_count += 1 processed.add(doc_id) # 진행 로그 + 체크포인트 저장 if done_count % log_interval == 0 or done_count == total: elapsed = time.monotonic() - start_time pct = (done_count / total) * 100 eta = format_eta(elapsed, done_count, total) logger.info( f"[REINDEX] {done_count}/{total} ({pct:.1f}%)" f" ETA: {eta} elapsed: {elapsed:.0f}s fails: {fail_count}" ) save_checkpoint(args.checkpoint, processed) except Exception as e: fail_count += 1 logger.warning( f"[REINDEX] doc {doc_id} 실패: {type(e).__name__}: {e}" ) tasks = [process_one(doc_id) for doc_id in targets] await asyncio.gather(*tasks) elapsed = time.monotonic() - start_time save_checkpoint(args.checkpoint, processed) logger.info( f"[REINDEX] 완료: {done_count}/{total} done, {fail_count} fails, {elapsed:.0f}s" ) if __name__ == "__main__": asyncio.run(main())