feat(eval): Phase 2A Diagnose, Phase 2: candidate reindex (me5 + snowflake pair)

Deliverable for phase-2a-embedding-diagnose.md v4 § 7 Phase 2.
Pair invariant (R2-2): documents_cand and document_chunks_cand are swapped together; partial swaps are forbidden.
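
One way to make the pair invariant hard to violate is to derive both table names from a single slug, so a caller cannot pick a documents table from one candidate and a chunks table from another. The sketch below is only an illustration: the `CandidatePair` class is hypothetical and not part of this commit; only the table-name prefixes come from the script.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CandidatePair:
    """Hypothetical helper: one slug always yields both candidate tables."""

    slug: str

    @property
    def documents_table(self) -> str:
        # Same prefix the reindex script uses for document-level embeddings.
        return f"documents_cand_{self.slug}"

    @property
    def chunks_table(self) -> str:
        # Same prefix the reindex script uses for chunk-level embeddings.
        return f"document_chunks_cand_{self.slug}"


pair = CandidatePair("snowflake_l_v2")
print(pair.documents_table, pair.chunks_table)
```

Because the dataclass is frozen, the slug cannot be changed after construction, so the two names always stay in sync.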

- snapshot freeze (R2-D): v0_2_phase2a_snapshot_2026-05-23.json
  - SNAPSHOT_DOC_ID_MAX=25180 / SNAPSHOT_CHUNK_ID_MAX=56526
  - documents_n=21365 (embedded, active) / chunks_n=30605
  - production ingest is never paused; every candidate reindex and baseline rebaseline measurement is restricted to id <= snapshot max
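
As a rough illustration of how the frozen bounds might be consumed, the sketch below parses a snapshot document with the two id fields committed here and turns them into the flags that reindex_candidate.py accepts. The helper name `snapshot_cli_args` is hypothetical and not part of this commit.

```python
import json


def snapshot_cli_args(snapshot_json: str) -> list[str]:
    """Hypothetical helper: map snapshot JSON fields to reindex CLI flags."""
    snap = json.loads(snapshot_json)
    return [
        "--snapshot-doc-id-max", str(snap["snapshot_doc_id_max"]),
        "--snapshot-chunk-id-max", str(snap["snapshot_chunk_id_max"]),
    ]


example = '{"snapshot_doc_id_max": 25180, "snapshot_chunk_id_max": 56526}'
print(snapshot_cli_args(example))
```

Reading the bounds from the file rather than retyping them keeps every measurement run pinned to the same corpus.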

- reindex_candidate.py added (R2-5):
  - reindex_documents(): reuses the production _build_embed_input() via import
  - reindex_chunks(): uses document_chunks.text as-is (no re-chunking)
  - TEI batch=8 (avoids the TEI 1.7 internal queue overflow) + truncate=true (mE5 512-token context)
  - up to 8 attempts with exponential backoff (10/20/40/80s, then capped at 90s) for automatic recovery from TEI SIGSEGV
  - idempotent via ON CONFLICT DO NOTHING (safe to cancel and resume)
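
The backoff schedule follows from the expression used in the script, `min(10.0 * (2 ** attempt), 90.0)`. A minimal sketch that lists the wait per attempt; the function name `backoff_schedule` is hypothetical and exists only for illustration:

```python
def backoff_schedule(retries: int = 8, base: float = 10.0, cap: float = 90.0) -> list[float]:
    """Wait in seconds after each failed attempt: doubles from base, capped at cap."""
    return [min(base * (2 ** attempt), cap) for attempt in range(retries)]


print(backoff_schedule())
# [10.0, 20.0, 40.0, 80.0, 90.0, 90.0, 90.0, 90.0]
```

The cap takes effect from the fifth attempt onward, since the uncapped value would be 160s.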

- docker-compose.override.cand.yml: restart=unless-stopped (automatic recovery from TEI 1.7 panics)

DB artifacts (4 tables):
  - documents_cand_me5_large_inst       : 21365 rows (dim 1024) + ivfflat lists=100
  - document_chunks_cand_me5_large_inst : 30605 rows (dim 1024) + ivfflat lists=100
  - documents_cand_snowflake_l_v2       : 21365 rows (dim 1024) + ivfflat lists=100
  - document_chunks_cand_snowflake_l_v2 : 30605 rows (dim 1024) + ivfflat lists=100
  - ivfflat.probes=20 preserved (same as production)
  - smoke retrieval (nearest-neighbor SQL) PASS for both candidates
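
The smoke query itself is not included in this commit. A minimal sketch of what a nearest-neighbor check against a candidate table could look like is shown below, assuming the ivfflat indexes were built for cosine distance (pgvector's `<=>` operator); the function name `smoke_query`, the slug pattern, and the bind names `:q` and `:k` are all hypothetical.

```python
import re

# Assumption: slugs are lowercase identifiers such as "me5_large_inst".
_SLUG_RE = re.compile(r"[a-z0-9_]+")


def smoke_query(slug: str, chunks: bool = False, probes: int = 20) -> tuple[str, str]:
    """Return (SET statement, kNN statement) for one candidate table."""
    if not _SLUG_RE.fullmatch(slug):
        # The slug is interpolated into a table name, so reject anything unusual.
        raise ValueError(f"unsafe slug: {slug!r}")
    prefix = "document_chunks_cand_" if chunks else "documents_cand_"
    table = prefix + slug
    set_probes = f"SET ivfflat.probes = {int(probes)}"
    knn = (
        f"SELECT doc_id, embedding <=> CAST(:q AS vector) AS distance "
        f"FROM {table} "
        f"ORDER BY embedding <=> CAST(:q AS vector) LIMIT :k"
    )
    return set_probes, knn


for statement in smoke_query("me5_large_inst"):
    print(statement)
```

If the indexes use a different operator class, the distance operator would need to change to match, otherwise the planner may not use the index.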

Production impact:
  - documents / document_chunks: no column or row changes
  - config.yaml: no changes (ollama bge-m3 unchanged)
  - production fastapi/postgres/reranker: no changes (isolated under the embed-cand profile)

Next step: Phase 3 (add a slug-based dispatcher to the DS API + retrieval_service, then rebaseline the baseline and measure the 2 candidates on 51 cases).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hyungi
2026-05-23 06:26:14 +00:00
parent 943ac5f59c
commit a67df0a10b
3 changed files with 217 additions and 0 deletions
+2
@@ -21,6 +21,7 @@ services:
   embedding-cand-me5-inst:
     image: ghcr.io/huggingface/text-embeddings-inference:1.7
+    restart: unless-stopped
     container_name: hyungi_document_server-embedding-cand-me5-inst-1
     expose:
       - "80"
@@ -47,6 +48,7 @@ services:
   embedding-cand-snowflake-l-v2:
     image: ghcr.io/huggingface/text-embeddings-inference:1.7
+    restart: unless-stopped
     container_name: hyungi_document_server-embedding-cand-snowflake-l-v2-1
     expose:
       - "80"
@@ -0,0 +1,9 @@
{
  "snapshot_doc_id_max": 25180,
  "snapshot_chunk_id_max": 56526,
  "documents_n": 21365,
  "chunks_n": 30605,
  "captured_at": "2026-05-23T05:48:25Z",
  "description": "Phase 2A measurement corpus snapshot freeze. Baseline rebaseline and candidate reindex are both restricted to the id <= snapshot range. Production ingest keeps running.",
  "plan": "phase-2a-embedding-diagnose.md v4"
}
+206
@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""Phase 2A: candidate embedding reindex (documents + chunks pair).

plan: phase-2a-embedding-diagnose.md v4 § 7 Phase 2

Usage (run inside the DS fastapi container):
    docker exec hyungi_document_server-fastapi-1 python -m tests.search_eval.reindex_candidate \
        --slug me5_large_inst \
        --endpoint http://embedding-cand-me5-inst:80/embed \
        --snapshot-doc-id-max 25180 \
        --snapshot-chunk-id-max 56526

Idempotent: the LEFT JOIN skips rows that were already processed, and inserts use ON CONFLICT DO NOTHING.
"""
import argparse
import asyncio
import hashlib
import sys
import time
import unicodedata
from types import SimpleNamespace

import httpx
from sqlalchemy import text

# /app is the working dir inside the fastapi container; this makes core.* and workers.* importable
sys.path.insert(0, "/app")

from core.database import async_session
from workers.embed_worker import _build_embed_input


def canonical_hash(s: str) -> str:
    # Strip, then NFKC-normalize, so visually equivalent inputs hash identically.
    normalized = unicodedata.normalize("NFKC", s.strip())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


async def tei_embed_batch(endpoint: str, texts: list[str], retries: int = 8) -> list[list[float]]:
    last_err = None
    for attempt in range(retries):
        try:
            async with httpx.AsyncClient(timeout=180.0) as client:
                r = await client.post(endpoint, json={"inputs": texts, "truncate": True})
                r.raise_for_status()
                data = r.json()
                # Shape errors are not retried: they indicate a contract problem, not a crash.
                if not isinstance(data, list) or not all(isinstance(v, list) for v in data):
                    raise ValueError(f"Unexpected TEI response shape: {type(data).__name__}")
                if len(data) != len(texts):
                    raise ValueError(f"TEI batch size mismatch: sent {len(texts)} got {len(data)}")
                return data
        except (httpx.RemoteProtocolError, httpx.ReadError, httpx.ConnectError, httpx.HTTPStatusError) as e:
            last_err = e
            if attempt + 1 == retries:
                break  # out of attempts: do not sleep before giving up
            # Exponential backoff: 10s, 20s, 40s, 80s, then capped at 90s.
            wait = min(10.0 * (2 ** attempt), 90.0)
            print(f"[reindex-cand] TEI error attempt={attempt+1}/{retries} err={type(e).__name__} sleeping={wait}s", flush=True)
            await asyncio.sleep(wait)
    raise RuntimeError(f"TEI exhausted retries: {last_err}")


async def reindex_documents(slug: str, endpoint: str, snapshot_doc_id_max: int, batch_size: int) -> None:
    table = f"documents_cand_{slug}"
    async with async_session() as session:
        already_done = (await session.execute(text(f"SELECT count(*) FROM {table}"))).scalar() or 0
        total = (await session.execute(text(
            "SELECT count(*) FROM documents "
            "WHERE id <= :max AND deleted_at IS NULL AND embedding IS NOT NULL"
        ), {"max": snapshot_doc_id_max})).scalar() or 0
    print(f"[reindex-cand-docs] slug={slug} total={total} already_done={already_done}", flush=True)

    start = time.time()
    processed = already_done
    async with async_session() as session:
        # LEFT JOIN + IS NULL selects only documents not yet present in the candidate table.
        stmt = text(f"""
            SELECT d.id, d.title, d.ai_summary, d.ai_tags, d.extracted_text
            FROM documents d
            LEFT JOIN {table} c ON c.doc_id = d.id
            WHERE d.id <= :max
              AND d.deleted_at IS NULL
              AND d.embedding IS NOT NULL
              AND c.doc_id IS NULL
            ORDER BY d.id
        """)
        result = await session.execute(stmt, {"max": snapshot_doc_id_max})
        rows = result.fetchall()

        for i in range(0, len(rows), batch_size):
            batch = rows[i:i + batch_size]
            doc_objs = [
                SimpleNamespace(title=r[1], ai_summary=r[2], ai_tags=r[3], extracted_text=r[4])
                for r in batch
            ]
            texts_built = [_build_embed_input(d) for d in doc_objs]
            # Keep the original batch index so embeddings can be matched back to doc ids.
            valid = [(idx, t) for idx, t in enumerate(texts_built) if t]
            if not valid:
                processed += len(batch)
                continue

            valid_texts = [t for _, t in valid]
            embeddings = await tei_embed_batch(endpoint, valid_texts)
            insert_rows = [{
                "doc_id": batch[idx][0],
                "embed_input": t,
                "embed_input_hash": canonical_hash(t),
                "embedding": str(emb),
            } for (idx, t), emb in zip(valid, embeddings)]

            await session.execute(text(f"""
                INSERT INTO {table} (doc_id, embed_input, embed_input_hash, embedding)
                VALUES (:doc_id, :embed_input, :embed_input_hash, CAST(:embedding AS vector))
                ON CONFLICT (doc_id) DO NOTHING
            """), insert_rows)
            await session.commit()

            processed += len(batch)
            elapsed = time.time() - start
            rate = (processed - already_done) / elapsed if elapsed > 0 else 0
            print(f"[reindex-cand-docs] slug={slug} done={processed}/{total} rate={rate:.1f}/sec", flush=True)

    print(f"[reindex-cand-docs] slug={slug} COMPLETE total={total} elapsed={time.time()-start:.1f}s", flush=True)


async def reindex_chunks(slug: str, endpoint: str, snapshot_chunk_id_max: int, batch_size: int) -> None:
    table = f"document_chunks_cand_{slug}"
    async with async_session() as session:
        already_done = (await session.execute(text(f"SELECT count(*) FROM {table}"))).scalar() or 0
        total = (await session.execute(text(
            "SELECT count(*) FROM document_chunks WHERE id <= :max"
        ), {"max": snapshot_chunk_id_max})).scalar() or 0
    print(f"[reindex-cand-chunks] slug={slug} total={total} already_done={already_done}", flush=True)

    start = time.time()
    processed = already_done
    async with async_session() as session:
        # Chunks are matched on (doc_id, chunk_index); already-copied chunks are skipped.
        stmt = text(f"""
            SELECT c.id, c.doc_id, c.chunk_index, c.chunk_type, c.section_title, c.heading_path, c.page,
                   c.language, c.country, c.source, c.domain_category, c.text
            FROM document_chunks c
            LEFT JOIN {table} cc ON cc.doc_id = c.doc_id AND cc.chunk_index = c.chunk_index
            WHERE c.id <= :max AND cc.id IS NULL
            ORDER BY c.id
        """)
        result = await session.execute(stmt, {"max": snapshot_chunk_id_max})
        rows = result.fetchall()

        for i in range(0, len(rows), batch_size):
            batch = rows[i:i + batch_size]
            chunk_texts = [r[11] for r in batch]
            # Skip empty or whitespace-only chunks; keep indices to map embeddings back.
            valid = [(idx, t) for idx, t in enumerate(chunk_texts) if t and t.strip()]
            if not valid:
                processed += len(batch)
                continue

            valid_texts = [t for _, t in valid]
            embeddings = await tei_embed_batch(endpoint, valid_texts)
            insert_rows = []
            for (idx, t), emb in zip(valid, embeddings):
                r = batch[idx]
                insert_rows.append({
                    "doc_id": r[1], "chunk_index": r[2], "chunk_type": r[3],
                    "section_title": r[4], "heading_path": r[5], "page": r[6],
                    "language": r[7], "country": r[8], "source": r[9],
                    "domain_category": r[10], "text": t,
                    "text_canonical_hash": canonical_hash(t),
                    "embedding": str(emb),
                })

            await session.execute(text(f"""
                INSERT INTO {table}
                    (doc_id, chunk_index, chunk_type, section_title, heading_path, page,
                     language, country, source, domain_category, text, text_canonical_hash, embedding)
                VALUES
                    (:doc_id, :chunk_index, :chunk_type, :section_title, :heading_path, :page,
                     :language, :country, :source, :domain_category, :text, :text_canonical_hash,
                     CAST(:embedding AS vector))
                ON CONFLICT (doc_id, chunk_index) DO NOTHING
            """), insert_rows)
            await session.commit()

            processed += len(batch)
            elapsed = time.time() - start
            rate = (processed - already_done) / elapsed if elapsed > 0 else 0
            print(f"[reindex-cand-chunks] slug={slug} done={processed}/{total} rate={rate:.1f}/sec", flush=True)

    print(f"[reindex-cand-chunks] slug={slug} COMPLETE total={total} elapsed={time.time()-start:.1f}s", flush=True)


async def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument("--slug", required=True)
    ap.add_argument("--endpoint", required=True)
    ap.add_argument("--snapshot-doc-id-max", type=int, required=True)
    ap.add_argument("--snapshot-chunk-id-max", type=int, required=True)
    ap.add_argument("--batch-size", type=int, default=8)
    ap.add_argument("--documents-only", action="store_true")
    ap.add_argument("--chunks-only", action="store_true")
    args = ap.parse_args()

    if not args.chunks_only:
        await reindex_documents(args.slug, args.endpoint, args.snapshot_doc_id_max, args.batch_size)
    if not args.documents_only:
        await reindex_chunks(args.slug, args.endpoint, args.snapshot_chunk_id_max, args.batch_size)


if __name__ == "__main__":
    asyncio.run(main())
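
Two small details of the script can be checked in isolation. The sketch below copies `canonical_hash` as it appears in the script and shows that inputs differing only in surrounding whitespace or in NFKC-equivalent characters (here, full-width Latin letters) hash to the same value. It also prints the text form that `str(emb)` produces for a Python list, which is the bracketed literal the script passes to `CAST(:embedding AS vector)`. The sample strings and vector are made up for illustration.

```python
import hashlib
import unicodedata


def canonical_hash(s: str) -> str:
    # Same logic as the script: strip, NFKC-normalize, then SHA-256 over UTF-8 bytes.
    normalized = unicodedata.normalize("NFKC", s.strip())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


# Full-width "ＡＢＣ" folds to ASCII "ABC" under NFKC; outer spaces are stripped.
same = canonical_hash("  ＡＢＣ ") == canonical_hash("ABC")
print(same)  # True

# A Python list of floats stringifies to a bracketed, comma-separated literal.
vector_literal = str([0.1, -0.25, 3.0])
print(vector_literal)  # [0.1, -0.25, 3.0]
```

Hashing the normalized text means a later comparison of `embed_input_hash` values is not thrown off by cosmetic differences in the source text.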