c3d237766d
- 후보 섀도 테이블 6종(전부 vector 타입 — eval=exact scan 이라 인덱스 불요, halfvec 은 C-1 소관) - workers/phase2a_cand_backfill: resumable(NOT EXISTS)·배치 커밋·동결셋 한정(--doc/chunk-id-max), 문서/청크 입력 = production 경로 동일 구성 + plain - CANDIDATE_BACKEND_MAP += cand_qwen06/qwen4/qwen4m (embed_kind=ollama, 쿼리측 instruct prefix G-1 핀 문자열, qwen4m = dimensions 1024 MRL) - qwen4m 적재는 qwen4 에서 SQL 파생(subvector+l2_normalize) — 본 CLI 비대상 Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
143 lines
6.3 KiB
Python
143 lines
6.3 KiB
Python
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
|
|
|
|
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
|
|
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
|
|
|
|
설계 원칙 (plan r3):
|
|
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) — 중단/재실행 시 이어서.
|
|
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
|
|
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
|
|
cand 테이블은 동결 범위로만 INSERT (retrieval cand path 가 snapshot filter 를 안 타는 전제).
|
|
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
|
|
chunk_worker 의 [제목][섹션][본문]) + plain (instruct prefix 는 쿼리 측 전용 — G-1 불변식).
|
|
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
|
|
- qwen4m 은 본 CLI 대상이 아님 — qwen4 적재 후 SQL 파생(subvector+l2_normalize), plan E-1.
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import hashlib
|
|
import time
|
|
|
|
import httpx
|
|
from sqlalchemy import text
|
|
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from workers.embed_worker import _build_embed_input
|
|
|
|
# Module-level logger, created through the project's setup_logger helper.
logger = setup_logger("phase2a_cand_backfill")
|
|
|
|
# URL every embedding batch is POSTed to (Ollama batch-embed endpoint).
OLLAMA_EMBED = "http://ollama:11434/api/embed"

# Backfill targets, keyed by the value accepted for ``--target``.
#   model  - Ollama model name sent with each embed request
#   dim    - presumably the embedding dimensionality; not read by the code
#            visible in this module (TODO confirm where it is consumed)
#   docs   - candidate table that receives document-level embeddings
#   chunks - candidate table that receives chunk-level embeddings
TARGETS = {
    "qwen06": {
        "model": "qwen3-embedding:0.6b",
        "dim": 1024,
        "docs": "documents_cand_qwen06",
        "chunks": "document_chunks_cand_qwen06",
    },
    "qwen4": {
        "model": "qwen3-embedding:4b",
        "dim": 2560,
        "docs": "documents_cand_qwen4",
        "chunks": "document_chunks_cand_qwen4",
    },
}
|
|
|
|
|
|
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
|
|
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
|
|
r.raise_for_status()
|
|
embs = r.json()["embeddings"]
|
|
if len(embs) != len(texts):
|
|
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
|
|
return embs
|
|
|
|
|
|
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
    """Fill the document candidate table for ``target``, one batch at a time.

    Each iteration opens a fresh session, selects up to ``batch`` document ids
    that satisfy the filter below and have no row yet in the candidate table,
    embeds their inputs, inserts the results and commits. The loop ends when
    the selection comes back empty, so re-running after an interruption only
    processes what is still missing.

    Args:
        target: Entry of ``TARGETS``; ``"docs"`` names the candidate table and
            ``"model"`` the embedding model.
        doc_id_max: Upper bound (inclusive) on ``documents.id``.
        batch: Maximum number of documents handled per iteration.
        http: HTTP client forwarded to ``_embed_batch``.

    Returns:
        Number of documents selected for processing during this call.
    """
    cand_table = target["docs"]

    # Documents inside the id bound that have a baseline embedding, are not
    # soft-deleted, and are still absent from the candidate table.
    pending_sql = text(f"""
        SELECT d.id FROM documents d
        WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
          AND NOT EXISTS (SELECT 1 FROM {cand_table} c WHERE c.doc_id = d.id)
        ORDER BY d.id LIMIT :b
    """)
    # ON CONFLICT DO NOTHING leaves an already-present doc_id untouched.
    insert_sql = text(f"""
        INSERT INTO {cand_table} (doc_id, embed_input_hash, embedding)
        VALUES (:i, :h, cast(:e AS vector))
        ON CONFLICT (doc_id) DO NOTHING
    """)

    processed = 0
    while True:
        async with async_session() as session:
            result = await session.execute(pending_sql, {"m": doc_id_max, "b": batch})
            doc_ids = result.scalars().all()
            if not doc_ids:
                break

            documents = [(await session.get(Document, doc_id)) for doc_id in doc_ids]
            embed_inputs = [_build_embed_input(document) for document in documents]
            vectors = await _embed_batch(http, target["model"], embed_inputs)

            for document, embed_input, vector in zip(documents, embed_inputs, vectors):
                # Short fingerprint of the exact text that was embedded.
                input_hash = hashlib.sha256(embed_input.encode()).hexdigest()[:16]
                await session.execute(
                    insert_sql,
                    {"i": document.id, "h": input_hash, "e": str(vector)},
                )
            await session.commit()

        processed += len(doc_ids)
        # True roughly once every ten full batches.
        if processed % (batch * 10) < batch:
            logger.info(f"[{cand_table}] +{processed} (last id={doc_ids[-1]})")
    return processed
|
|
|
|
|
|
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
    """Fill the chunk candidate table for ``target``, one batch at a time.

    Each iteration opens a fresh session, selects up to ``batch`` chunks that
    satisfy the filter below and have no row yet in the candidate table,
    embeds a title/section/body text built from each row, inserts the results
    and commits. The loop ends when the selection comes back empty, so
    re-running after an interruption only processes what is still missing.

    Args:
        target: Entry of ``TARGETS``; ``"chunks"`` names the candidate table
            and ``"model"`` the embedding model.
        chunk_id_max: Upper bound (inclusive) on ``corpus_chunks.id``.
        batch: Maximum number of chunks handled per iteration.
        http: HTTP client forwarded to ``_embed_batch``.

    Returns:
        Number of chunks selected for processing during this call.
    """
    cand_table = target["chunks"]

    # Chunks inside the id bound that have a baseline embedding, belong to a
    # non-deleted document, and are still absent from the candidate table.
    pending_sql = text(f"""
        SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
        FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
        WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
          AND NOT EXISTS (SELECT 1 FROM {cand_table} k WHERE k.id = c.id)
        ORDER BY c.id LIMIT :b
    """)
    # ON CONFLICT DO NOTHING leaves an already-present chunk id untouched.
    insert_sql = text(f"""
        INSERT INTO {cand_table} (id, doc_id, chunk_index, section_title, text, embedding)
        VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
        ON CONFLICT (id) DO NOTHING
    """)

    total = 0
    while True:
        async with async_session() as session:
            rows = (
                await session.execute(pending_sql, {"m": chunk_id_max, "b": batch})
            ).all()
            if not rows:
                break

            # Missing title / section title are embedded as empty strings.
            inputs = [
                f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
                for r in rows
            ]
            embs = await _embed_batch(http, target["model"], inputs)

            for r, e in zip(rows, embs):
                await session.execute(
                    insert_sql,
                    {
                        "i": r.id,
                        "d": r.doc_id,
                        "x": r.chunk_index,
                        "s": r.section_title,
                        "t": r.text,
                        "e": str(e),
                    },
                )
            await session.commit()

        total += len(rows)
        # True roughly once every ten full batches.
        if total % (batch * 10) < batch:
            # rows holds full Row objects here (no .scalars()), so log only the
            # id column; interpolating the Row would dump the whole chunk text.
            logger.info(f"[{cand_table}] +{total} (last id={rows[-1].id})")
    return total
|
|
|
|
|
|
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
    """Backfill documents, then chunks, for one target and log a summary.

    Args:
        target_key: Key into ``TARGETS`` selecting the model and tables.
        doc_id_max: Upper id bound passed to ``backfill_docs``.
        chunk_id_max: Upper id bound passed to ``backfill_chunks``.
        batch: Batch size passed to both backfill steps.
    """
    target = TARGETS[target_key]
    started_at = time.monotonic()

    # One HTTP client is shared by both backfill passes.
    async with httpx.AsyncClient() as http:
        docs_added = await backfill_docs(target, doc_id_max, batch, http)
        chunks_added = await backfill_chunks(target, chunk_id_max, batch, http)
    elapsed_min = (time.monotonic() - started_at) / 60

    # Total row counts of both candidate tables, for comparison against the
    # baseline frozen-set counts.
    async with async_session() as session:
        table_totals = [
            (await session.execute(text(f"SELECT count(*) FROM {table}"))).scalar_one()
            for table in (target["docs"], target["chunks"])
        ]
    docs_total, chunks_total = table_totals

    logger.info(
        f"[{target_key}] 완료 — 이번 run docs +{docs_added} chunks +{chunks_added} ({elapsed_min:.1f}분) · "
        f"누적 docs {docs_total} / chunks {chunks_total} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
    )
|
|
|
|
|
|
def _positive_int(raw: str) -> int:
    """Parse an argparse value as an integer >= 1, rejecting anything else."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def main() -> None:
    """Parse the command line and run the backfill for the chosen target.

    Options: ``--target`` (required, one of the ``TARGETS`` keys),
    ``--doc-id-max`` and ``--chunk-id-max`` (required integers), and
    ``--batch`` (positive integer, default 32).
    """
    p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
    p.add_argument("--target", required=True, choices=sorted(TARGETS))
    p.add_argument("--doc-id-max", type=int, required=True)
    p.add_argument("--chunk-id-max", type=int, required=True)
    # The batch size becomes the SQL LIMIT: 0 would select nothing and report
    # a finished run without doing any work, and a negative value would only
    # fail inside the database. Reject both at the command line.
    p.add_argument("--batch", type=_positive_int, default=32)
    a = p.parse_args()
    asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))


if __name__ == "__main__":
    main()
|