#!/usr/bin/env python3
r"""Phase 2A — candidate embedding reindex (documents + chunks pair).

plan: phase-2a-embedding-diagnose.md v4 § 7 Phase 2

Usage (run inside the DS fastapi container):
    docker exec hyungi_document_server-fastapi-1 python -m tests.search_eval.reindex_candidate \
        --slug me5_large_inst \
        --endpoint http://embedding-cand-me5-inst:80/embed \
        --snapshot-doc-id-max 25180 \
        --snapshot-chunk-id-max 56526

The snapshot bounds used in the example are the ones recorded in
tests/search_eval/baselines/v0_2_phase2a_snapshot_2026-05-23.json
(snapshot_doc_id_max=25180, snapshot_chunk_id_max=56526, captured
2026-05-23T05:48:25Z). Both the baseline rebaseline and the candidate reindex
are restricted to ids <= snapshot; production ingest keeps running.

The candidate TEI services (embedding-cand-me5-inst,
embedding-cand-snowflake-l-v2) are declared in
docker-compose.override.cand.yml with ``restart: unless-stopped``.

Idempotent: rows that already exist in the candidate table are skipped via a
LEFT JOIN anti-join, and inserts use ON CONFLICT DO NOTHING.
"""

import argparse
import asyncio
import hashlib
import re
import sys
import time
import unicodedata
from types import SimpleNamespace

import httpx
from sqlalchemy import text

# /app is the working dir inside fastapi container; ensures app.* importable
sys.path.insert(0, "/app")
from core.database import async_session  # noqa: E402
from workers.embed_worker import _build_embed_input  # noqa: E402

# The slug is interpolated into SQL as part of a table name (identifiers cannot
# be bound as parameters), so it is restricted to plain identifier characters.
_SLUG_RE = re.compile(r"[A-Za-z0-9_]+")

# Transport-level failures and HTTP error statuses that are retried with
# backoff. TimeoutException covers connect/read/write/pool timeouts, which are
# not subclasses of ConnectError/ReadError.
_RETRYABLE_TEI_ERRORS = (
    httpx.TimeoutException,
    httpx.RemoteProtocolError,
    httpx.ReadError,
    httpx.ConnectError,
    httpx.HTTPStatusError,
)


def _validate_slug(slug: str) -> str:
    """Return *slug* unchanged if it is safe to embed in a table name.

    Raises:
        ValueError: if *slug* is empty or contains anything other than ASCII
            letters, digits and underscores.
    """
    if not _SLUG_RE.fullmatch(slug):
        raise ValueError(
            f"invalid slug {slug!r}: only letters, digits and underscores are allowed"
        )
    return slug


def _positive_int(value) -> int:
    """Convert *value* to an int and require it to be >= 1.

    Raises:
        ValueError: if *value* is not an integer or is smaller than 1.
    """
    number = int(value)
    if number < 1:
        raise ValueError(f"expected a positive integer, got {value!r}")
    return number


def _log_progress(tag: str, slug: str, processed: int, already_done: int, total: int, start: float) -> None:
    """Print one progress line; the rate only counts rows handled in this run."""
    elapsed = time.monotonic() - start
    rate = (processed - already_done) / elapsed if elapsed > 0 else 0
    print(f"[{tag}] slug={slug} done={processed}/{total} rate={rate:.1f}/sec", flush=True)


def canonical_hash(s: str) -> str:
    """Return the SHA-256 hex digest of *s* after strip() and NFKC normalization."""
    normalized = unicodedata.normalize("NFKC", s.strip())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


async def tei_embed_batch(endpoint: str, texts: list[str], retries: int = 8) -> list[list[float]]:
    """POST *texts* to the embedding endpoint and return one vector per text.

    Args:
        endpoint: Full URL of the embed route.
        texts: Inputs to embed; sent as ``{"inputs": texts, "truncate": True}``.
        retries: Maximum number of attempts.

    Returns:
        The decoded JSON response, a list with exactly ``len(texts)`` lists.
        An empty *texts* returns ``[]`` without contacting the server.

    Raises:
        ValueError: if the response is not a list of lists or its length does
            not match the number of inputs (not retried).
        RuntimeError: if every attempt failed with a retryable error; the last
            error is attached as ``__cause__``.
    """
    if not texts:
        return []

    last_err = None
    for attempt in range(retries):
        try:
            async with httpx.AsyncClient(timeout=180.0) as client:
                r = await client.post(endpoint, json={"inputs": texts, "truncate": True})
                r.raise_for_status()
                data = r.json()
        except _RETRYABLE_TEI_ERRORS as e:
            last_err = e
            if attempt + 1 >= retries:
                # No point in sleeping when there is no further attempt.
                print(f'[reindex-cand] TEI error attempt={attempt+1}/{retries} err={type(e).__name__} giving up', flush=True)
                break
            # Exponential backoff: 10s, 20s, 40s, 80s, then capped at 90s.
            # NOTE(review): presumably sized so a restarted TEI container can
            # come back up between attempts — confirm.
            wait = min(10.0 * (2 ** attempt), 90.0)
            print(f'[reindex-cand] TEI error attempt={attempt+1}/{retries} err={type(e).__name__} sleeping={wait}s', flush=True)
            await asyncio.sleep(wait)
            continue

        # Shape problems are not transient, so they are raised immediately.
        if not isinstance(data, list) or not all(isinstance(v, list) for v in data):
            raise ValueError(f"Unexpected TEI response shape: {type(data).__name__}")
        if len(data) != len(texts):
            raise ValueError(f"TEI batch size mismatch: sent {len(texts)} got {len(data)}")
        return data

    raise RuntimeError(f'TEI exhausted retries: {last_err}') from last_err


async def reindex_documents(slug: str, endpoint: str, snapshot_doc_id_max: int, batch_size: int) -> None:
    """Embed pending documents and store them in ``documents_cand_<slug>``.

    A document is pending when its id is <= *snapshot_doc_id_max*, it is not
    soft-deleted, it has a production embedding, and it has no row in the
    candidate table yet. Documents whose built embed input is empty are
    counted as processed but not inserted.

    Args:
        slug: Candidate identifier; becomes part of the table name.
        endpoint: Embed URL passed to ``tei_embed_batch``.
        snapshot_doc_id_max: Upper bound (inclusive) on ``documents.id``.
        batch_size: Number of documents embedded and committed per step.

    Raises:
        ValueError: if *slug* is not a safe identifier or *batch_size* < 1.
    """
    batch_size = _positive_int(batch_size)
    table = f"documents_cand_{_validate_slug(slug)}"

    async with async_session() as session:
        already_done = (await session.execute(text(f"SELECT count(*) FROM {table}"))).scalar() or 0
        total = (await session.execute(text(
            "SELECT count(*) FROM documents "
            "WHERE id <= :max AND deleted_at IS NULL AND embedding IS NOT NULL"
        ), {"max": snapshot_doc_id_max})).scalar() or 0

    print(f"[reindex-cand-docs] slug={slug} total={total} already_done={already_done}", flush=True)
    start = time.monotonic()
    processed = already_done

    # Only ids are listed up front; the text columns are fetched per batch so
    # the whole corpus is never held in memory at once.
    pending_ids_stmt = text(f"""
        SELECT d.id
        FROM documents d
        LEFT JOIN {table} c ON c.doc_id = d.id
        WHERE d.id <= :max
          AND d.deleted_at IS NULL
          AND d.embedding IS NOT NULL
          AND c.doc_id IS NULL
        ORDER BY d.id
    """)
    batch_rows_stmt = text(f"""
        SELECT d.id, d.title, d.ai_summary, d.ai_tags, d.extracted_text
        FROM documents d
        LEFT JOIN {table} c ON c.doc_id = d.id
        WHERE d.id BETWEEN :lo AND :hi
          AND d.deleted_at IS NULL
          AND d.embedding IS NOT NULL
          AND c.doc_id IS NULL
        ORDER BY d.id
    """)
    insert_stmt = text(f"""
        INSERT INTO {table} (doc_id, embed_input, embed_input_hash, embedding)
        VALUES (:doc_id, :embed_input, :embed_input_hash, CAST(:embedding AS vector))
        ON CONFLICT (doc_id) DO NOTHING
    """)

    async with async_session() as session:
        id_rows = (await session.execute(pending_ids_stmt, {"max": snapshot_doc_id_max})).fetchall()
        pending_ids = [row[0] for row in id_rows]

        for offset in range(0, len(pending_ids), batch_size):
            id_slice = pending_ids[offset:offset + batch_size]
            # ids are sorted, so the slice is exactly the range [first, last].
            batch = (await session.execute(
                batch_rows_stmt, {"lo": id_slice[0], "hi": id_slice[-1]}
            )).fetchall()
            # End the read transaction before the (possibly minutes-long)
            # embedding call so no transaction sits idle meanwhile.
            await session.rollback()

            valid = []
            for row in batch:
                doc = SimpleNamespace(
                    title=row[1], ai_summary=row[2], ai_tags=row[3], extracted_text=row[4]
                )
                embed_input = _build_embed_input(doc)
                if embed_input:
                    valid.append((row[0], embed_input))

            if not valid:
                processed += len(id_slice)
                continue

            embeddings = await tei_embed_batch(endpoint, [t for _, t in valid])

            insert_rows = [{
                "doc_id": doc_id,
                "embed_input": t,
                "embed_input_hash": canonical_hash(t),
                # The list's str() form ("[0.1, 0.2, ...]") is cast to vector in SQL.
                "embedding": str(emb),
            } for (doc_id, t), emb in zip(valid, embeddings)]

            await session.execute(insert_stmt, insert_rows)
            await session.commit()

            processed += len(id_slice)
            _log_progress("reindex-cand-docs", slug, processed, already_done, total, start)

    print(f"[reindex-cand-docs] slug={slug} COMPLETE total={total} elapsed={time.monotonic()-start:.1f}s", flush=True)


async def reindex_chunks(slug: str, endpoint: str, snapshot_chunk_id_max: int, batch_size: int) -> None:
    """Embed pending chunks and store them in ``document_chunks_cand_<slug>``.

    A chunk is pending when its id is <= *snapshot_chunk_id_max* and the
    candidate table has no row with the same (doc_id, chunk_index). Chunks
    whose text is empty or whitespace-only are counted as processed but not
    inserted. The chunk metadata columns are copied alongside the embedding.

    Args:
        slug: Candidate identifier; becomes part of the table name.
        endpoint: Embed URL passed to ``tei_embed_batch``.
        snapshot_chunk_id_max: Upper bound (inclusive) on ``document_chunks.id``.
        batch_size: Number of chunks embedded and committed per step.

    Raises:
        ValueError: if *slug* is not a safe identifier or *batch_size* < 1.
    """
    batch_size = _positive_int(batch_size)
    table = f"document_chunks_cand_{_validate_slug(slug)}"

    async with async_session() as session:
        already_done = (await session.execute(text(f"SELECT count(*) FROM {table}"))).scalar() or 0
        total = (await session.execute(text(
            "SELECT count(*) FROM document_chunks WHERE id <= :max"
        ), {"max": snapshot_chunk_id_max})).scalar() or 0

    print(f"[reindex-cand-chunks] slug={slug} total={total} already_done={already_done}", flush=True)
    start = time.monotonic()
    processed = already_done

    # Only ids are listed up front; chunk bodies are fetched per batch.
    pending_ids_stmt = text(f"""
        SELECT c.id
        FROM document_chunks c
        LEFT JOIN {table} cc ON cc.doc_id = c.doc_id AND cc.chunk_index = c.chunk_index
        WHERE c.id <= :max AND cc.id IS NULL
        ORDER BY c.id
    """)
    batch_rows_stmt = text(f"""
        SELECT c.id, c.doc_id, c.chunk_index, c.chunk_type, c.section_title, c.heading_path, c.page,
               c.language, c.country, c.source, c.domain_category, c.text
        FROM document_chunks c
        LEFT JOIN {table} cc ON cc.doc_id = c.doc_id AND cc.chunk_index = c.chunk_index
        WHERE c.id BETWEEN :lo AND :hi AND cc.id IS NULL
        ORDER BY c.id
    """)
    insert_stmt = text(f"""
        INSERT INTO {table}
            (doc_id, chunk_index, chunk_type, section_title, heading_path, page,
             language, country, source, domain_category, text, text_canonical_hash, embedding)
        VALUES
            (:doc_id, :chunk_index, :chunk_type, :section_title, :heading_path, :page,
             :language, :country, :source, :domain_category, :text, :text_canonical_hash,
             CAST(:embedding AS vector))
        ON CONFLICT (doc_id, chunk_index) DO NOTHING
    """)

    async with async_session() as session:
        id_rows = (await session.execute(pending_ids_stmt, {"max": snapshot_chunk_id_max})).fetchall()
        pending_ids = [row[0] for row in id_rows]

        for offset in range(0, len(pending_ids), batch_size):
            id_slice = pending_ids[offset:offset + batch_size]
            # ids are sorted, so the slice is exactly the range [first, last].
            batch = (await session.execute(
                batch_rows_stmt, {"lo": id_slice[0], "hi": id_slice[-1]}
            )).fetchall()
            # End the read transaction before the (possibly minutes-long)
            # embedding call so no transaction sits idle meanwhile.
            await session.rollback()

            # Column 11 is c.text; blank chunks cannot be embedded.
            valid = [row for row in batch if row[11] and row[11].strip()]
            if not valid:
                processed += len(id_slice)
                continue

            embeddings = await tei_embed_batch(endpoint, [row[11] for row in valid])

            insert_rows = [{
                "doc_id": row[1], "chunk_index": row[2], "chunk_type": row[3],
                "section_title": row[4], "heading_path": row[5], "page": row[6],
                "language": row[7], "country": row[8], "source": row[9],
                "domain_category": row[10], "text": row[11],
                "text_canonical_hash": canonical_hash(row[11]),
                # The list's str() form ("[0.1, 0.2, ...]") is cast to vector in SQL.
                "embedding": str(emb),
            } for row, emb in zip(valid, embeddings)]

            await session.execute(insert_stmt, insert_rows)
            await session.commit()

            processed += len(id_slice)
            _log_progress("reindex-cand-chunks", slug, processed, already_done, total, start)

    print(f"[reindex-cand-chunks] slug={slug} COMPLETE total={total} elapsed={time.monotonic()-start:.1f}s", flush=True)


async def main() -> None:
    """Parse the command line and run the document and/or chunk reindex."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--slug", required=True, type=_validate_slug)
    ap.add_argument("--endpoint", required=True)
    ap.add_argument("--snapshot-doc-id-max", type=int, required=True)
    ap.add_argument("--snapshot-chunk-id-max", type=int, required=True)
    ap.add_argument("--batch-size", type=_positive_int, default=8)
    # Passing both flags used to skip both phases silently; argparse now
    # rejects the combination.
    scope = ap.add_mutually_exclusive_group()
    scope.add_argument("--documents-only", action="store_true")
    scope.add_argument("--chunks-only", action="store_true")
    args = ap.parse_args()

    if not args.chunks_only:
        await reindex_documents(args.slug, args.endpoint, args.snapshot_doc_id_max, args.batch_size)
    if not args.documents_only:
        await reindex_chunks(args.slug, args.endpoint, args.snapshot_chunk_id_max, args.batch_size)


if __name__ == "__main__":
    asyncio.run(main())