feat(chunk): Phase 1.2-E reindex 스크립트 추가

tests/scripts/reindex_all_chunks.py — 전체 documents chunk 재인덱싱 도구.

핵심 요건 (사용자 정의):
- asyncio.Semaphore(N) — 동시 처리 수 제한 (기본 3, Ollama bge-m3 부하 조절)
- checkpoint resume — JSON 파일 atomic swap, 중간 실패/중단 후 재시작 가능
- rate limiting — 작업 간 sleep 0.1초 (Ollama API 보호)
- 진행 로그 — [REINDEX] N/total (P%) ETA: ... fails: N (~2% 단위)

CLI:
- --concurrency, --checkpoint, --rate-limit, --limit (dry-run), --skip-existing

야간 배치 (00:00~06:00):
  PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \
    --concurrency 3 --checkpoint checkpoints/reindex.json \
    > logs/reindex.log 2>&1 &
This commit is contained in:
Hyungi Ahn
2026-04-08 12:31:29 +09:00
parent 731d1396e8
commit 42dfe82c9b
2 changed files with 204 additions and 0 deletions

View File

View File

@@ -0,0 +1,204 @@
"""문서 chunk 재인덱싱 (Phase 1.2-E).
전체 documents를 chunk_worker로 재처리. 야간 배치 권장 (00:00~06:00).
핵심 요건 (사용자 정의):
- concurrency 제한 (asyncio.Semaphore) — Ollama 부하 조절
- checkpoint resume (중간 실패/중단 대비)
- rate limiting (Ollama API 보호)
- 진행 로그 ([REINDEX] N/total (P%) ETA: ...)
사용:
cd /home/hyungi/Documents/code/hyungi_Document_Server
PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py \\
--concurrency 3 \\
--checkpoint checkpoints/reindex.json \\
> logs/reindex.log 2>&1 &
dry-run (5개만):
PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --limit 5
기존 chunks 보유 doc 건너뛰기:
PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py --skip-existing
기존 chunks 강제 재처리 (chunk_worker가 자동으로 delete + insert):
PYTHONPATH=app .venv/bin/python tests/scripts/reindex_all_chunks.py
"""
import argparse
import asyncio
import json
import sys
import time
from pathlib import Path
# PYTHONPATH=app 가정
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "app"))
from sqlalchemy import select # noqa: E402
from sqlalchemy.ext.asyncio import async_sessionmaker # noqa: E402
from core.database import engine # noqa: E402
from core.utils import setup_logger # noqa: E402
from models.chunk import DocumentChunk # noqa: E402
from models.document import Document # noqa: E402
from workers.chunk_worker import process # noqa: E402
# Module-wide logger; project helper configures handlers/format (see core.utils).
logger = setup_logger("reindex")
def load_checkpoint(path: Path) -> set[int]:
    """Restore the set of already-processed doc_ids from a checkpoint file.

    Args:
        path: JSON checkpoint file ({"processed": [int, ...]}).

    Returns:
        The recovered doc_id set, or an empty set when the file is missing
        or unreadable/corrupt — a broken checkpoint degrades to a fresh run
        instead of crashing the batch.
    """
    if not path.exists():
        return set()
    try:
        data = json.loads(path.read_text())
        # AttributeError/TypeError cover a JSON top level that is not an
        # object (e.g. a bare list) — `data.get` would otherwise crash here,
        # defeating the "invalid checkpoint → start over" intent.
        # OSError covers an existing but unreadable file.
        return set(data.get("processed", []))
    except (json.JSONDecodeError, KeyError, AttributeError, TypeError, OSError) as e:
        logger.warning(f"checkpoint {path} invalid ({e}) → 새로 시작")
        return set()
def save_checkpoint(path: Path, processed: set[int]) -> None:
    """Persist the processed doc_id set to *path* (incremental, crash-safe).

    Writes to a sibling ``.tmp`` file first, then renames it over the
    target so readers never observe a half-written checkpoint.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps({"processed": sorted(processed)}, indent=2)
    staging = path.with_suffix(path.suffix + ".tmp")
    staging.write_text(payload)
    # os.replace semantics: atomic swap on POSIX filesystems.
    staging.replace(path)
def format_eta(elapsed: float, done: int, total: int) -> str:
    """Format the estimated time remaining as a short human-readable string.

    Args:
        elapsed: wall-clock seconds spent so far.
        done: number of items completed.
        total: total number of items.

    Returns:
        "?" when no rate can be computed yet; otherwise "Ns" (< 1 min),
        "Nm" (< 1 h) or "N.Nh".
    """
    # Guard elapsed <= 0 as well as done == 0: the original divided
    # done / elapsed, which raises ZeroDivisionError when the first item
    # finishes within the monotonic clock's resolution.
    if done == 0 or elapsed <= 0:
        return "?"
    rate = done / elapsed
    remaining = (total - done) / rate
    if remaining < 60:
        return f"{remaining:.0f}s"
    if remaining < 3600:
        return f"{remaining / 60:.0f}m"
    return f"{remaining / 3600:.1f}h"
async def main() -> None:
    """Re-chunk every eligible document through chunk_worker.

    Collects target doc ids from the DB, resumes from a JSON checkpoint,
    and fans the work out under an asyncio.Semaphore with a per-task
    rate-limit sleep. Progress is logged roughly every 2% and the
    checkpoint is saved at each log interval and once at the end.
    """
    parser = argparse.ArgumentParser(description="문서 chunk 재인덱싱 (Phase 1.2-E)")
    parser.add_argument(
        "--concurrency",
        type=int,
        default=3,
        help="동시 처리 doc 수 (default 3, Ollama bge-m3 부하 조절)",
    )
    parser.add_argument(
        "--checkpoint",
        type=Path,
        default=Path("checkpoints/reindex.json"),
        help="checkpoint 파일 경로 (resume 가능)",
    )
    parser.add_argument(
        "--rate-limit",
        type=float,
        default=0.1,
        help="작업 간 sleep (초, Ollama 보호)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="처리할 doc 수 제한 (dry-run 용)",
    )
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        help="이미 chunks 있는 doc skip (재처리 생략)",
    )
    args = parser.parse_args()
    Session = async_sessionmaker(engine)
    # 1. Collect candidate docs: not soft-deleted and text already extracted.
    async with Session() as session:
        query = (
            select(Document.id)
            .where(
                Document.deleted_at.is_(None),
                Document.extracted_text.is_not(None),
            )
            .order_by(Document.id)
        )
        result = await session.execute(query)
        all_doc_ids = [row[0] for row in result]
        if args.skip_existing:
            # Distinct doc_ids that already have chunks; these are excluded below.
            existing_query = select(DocumentChunk.doc_id).distinct()
            existing_result = await session.execute(existing_query)
            existing = {row[0] for row in existing_result}
            logger.info(f"skip-existing: 기존 chunks 보유 doc {len(existing)}")
        else:
            existing = set()
    # 2. Checkpoint resume: ids already processed in a previous (partial) run.
    processed = load_checkpoint(args.checkpoint)
    if processed:
        logger.info(f"checkpoint: 이미 처리됨 {len(processed)}건 (resume)")
    # 3. Targets = all docs minus checkpointed minus already-chunked (if --skip-existing).
    targets = [d for d in all_doc_ids if d not in processed and d not in existing]
    if args.limit:
        targets = targets[: args.limit]
    total = len(targets)
    logger.info(
        f"REINDEX 시작: 전체 {len(all_doc_ids)} docs / 처리 대상 {total} docs"
        f" / concurrency={args.concurrency} rate_limit={args.rate_limit}s"
    )
    if total == 0:
        logger.info("처리할 doc 없음. 종료.")
        return
    semaphore = asyncio.Semaphore(args.concurrency)
    done_count = 0
    fail_count = 0
    start_time = time.monotonic()
    # Log roughly every 2% of progress (at least every item for tiny batches).
    log_interval = max(1, total // 50)
    async def process_one(doc_id: int) -> None:
        """Process one doc under the semaphore; count success/failure.

        NOTE(review): counters are plain nonlocal ints — safe here because
        all tasks run on one event loop, not across threads.
        """
        nonlocal done_count, fail_count
        async with semaphore:
            try:
                # Fresh session per doc; chunk_worker.process presumably
                # deletes + reinserts chunks for the doc (per module docstring).
                async with Session() as session:
                    await process(doc_id, session)
                    await session.commit()
                # Rate limit between jobs (protects the Ollama API).
                await asyncio.sleep(args.rate_limit)
                done_count += 1
                processed.add(doc_id)
                # Progress log + checkpoint save at each interval (and at the
                # final successful doc, which may not land on an interval).
                if done_count % log_interval == 0 or done_count == total:
                    elapsed = time.monotonic() - start_time
                    pct = (done_count / total) * 100
                    eta = format_eta(elapsed, done_count, total)
                    logger.info(
                        f"[REINDEX] {done_count}/{total} ({pct:.1f}%)"
                        f" ETA: {eta} elapsed: {elapsed:.0f}s fails: {fail_count}"
                    )
                    save_checkpoint(args.checkpoint, processed)
            except Exception as e:
                # Best-effort batch: a failed doc is counted and logged but
                # does not abort the run (it is retried on the next invocation
                # since it never enters the checkpoint).
                fail_count += 1
                logger.warning(
                    f"[REINDEX] doc {doc_id} 실패: {type(e).__name__}: {e}"
                )
    tasks = [process_one(doc_id) for doc_id in targets]
    await asyncio.gather(*tasks)
    elapsed = time.monotonic() - start_time
    # Final checkpoint save covers ids processed after the last interval save.
    save_checkpoint(args.checkpoint, processed)
    logger.info(
        f"[REINDEX] 완료: {done_count}/{total} done, {fail_count} fails, {elapsed:.0f}s"
    )
if __name__ == "__main__":
    # Script entry point: run the async reindex driver to completion.
    asyncio.run(main())