feat(search): Phase 2A E-1 — Qwen 후보 3종 백필 CLI + eval 디스패처 확장 (마이그 328~333)

- 후보 섀도 테이블 6종(전부 vector 타입 — eval=exact scan 이라 인덱스 불요, halfvec 은 C-1 소관)
- workers/phase2a_cand_backfill: resumable(NOT EXISTS)·배치 커밋·동결셋 한정(--doc/chunk-id-max),
  문서/청크 입력 = production 경로 동일 구성 + plain
- CANDIDATE_BACKEND_MAP += cand_qwen06/qwen4/qwen4m (embed_kind=ollama, 쿼리측 instruct prefix
  G-1 핀 문자열, qwen4m = dimensions 1024 MRL)
- qwen4m 적재는 qwen4 에서 SQL 파생(subvector+l2_normalize) — 본 CLI 비대상

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-12 08:29:53 +09:00
parent 5bc68c95f6
commit c3d237766d
9 changed files with 357 additions and 1 deletions
+65 -1
View File
@@ -63,8 +63,41 @@ CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
"chunks_table": "document_chunks_cand_snowflake_l_v2",
"embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
},
# ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ───
# embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용,
# G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재.
# qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측).
"cand_qwen06": {
"docs_table": "documents_cand_qwen06",
"chunks_table": "document_chunks_cand_qwen06",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:0.6b",
},
"cand_qwen4": {
"docs_table": "documents_cand_qwen4",
"chunks_table": "document_chunks_cand_qwen4",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
},
"cand_qwen4m": {
"docs_table": "documents_cand_qwen4m",
"chunks_table": "document_chunks_cand_qwen4m",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
"embed_dimensions": 1024,
},
}
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
# 문구 변경 = 저장=조회 불변식 위반과 동급. 쿼리 측 전용, 문서 적재는 plain).
QWEN3_QUERY_INSTRUCT = (
"Instruct: Given a web search query, retrieve relevant passages that answer the query"
"\nQuery: "
)
# 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist.
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
# corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point).
@@ -137,6 +170,34 @@ async def _embed_query_via_tei(endpoint: str, text_: str) -> list[float] | None:
return None
async def _embed_query_via_ollama(cfg: dict, text_: str) -> list[float] | None:
"""Phase 2A 후보 쿼리 임베딩 — Ollama /api/embed + 비대칭 instruct prefix.
쿼리 측 전용: QWEN3_QUERY_INSTRUCT 를 선두에 붙인다 (문서 적재 = plain).
embed_dimensions 지정(qwen4m) 시 Ollama dimensions 옵션 = MRL truncate+재정규화
(G-1 fixture: 1024 출력 L2=1.0 실측). cache 미사용 — slug 별 분포 상이.
"""
if not text_:
return None
import httpx
body: dict = {"model": cfg["embed_model"], "input": [QWEN3_QUERY_INSTRUCT + text_]}
if cfg.get("embed_dimensions"):
body["dimensions"] = cfg["embed_dimensions"]
try:
async with httpx.AsyncClient(timeout=60.0) as c:
r = await c.post(cfg["embed_endpoint"], json=body)
r.raise_for_status()
embs = r.json().get("embeddings")
if not isinstance(embs, list) or not embs or not isinstance(embs[0], list):
raise ValueError("unexpected /api/embed shape")
return embs[0]
except Exception as exc:
logger.warning(
"candidate ollama embed failed model=%s err=%r", cfg.get("embed_model"), exc
)
return None
def _query_embed_key(text_: str) -> str:
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
@@ -323,7 +384,10 @@ async def search_vector(
else:
docs_table = cfg["docs_table"]
chunks_table = cfg["chunks_table"]
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
if cfg.get("embed_kind") == "ollama":
query_embedding = await _embed_query_via_ollama(cfg, query)
else:
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
logger.info(
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
+142
View File
@@ -0,0 +1,142 @@
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
설계 원칙 (plan r3):
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) — 중단/재실행 시 이어서.
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
cand 테이블은 동결 범위로만 INSERT (retrieval cand path 가 snapshot filter 를 안 타는 전제).
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
chunk_worker 의 [제목][섹션][본문]) + plain (instruct prefix 는 쿼리 측 전용 — G-1 불변식).
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
- qwen4m 은 본 CLI 대상이 아님 — qwen4 적재 후 SQL 파생(subvector+l2_normalize), plan E-1.
"""
import argparse
import asyncio
import hashlib
import time
import httpx
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from workers.embed_worker import _build_embed_input
logger = setup_logger("phase2a_cand_backfill")
OLLAMA_EMBED = "http://ollama:11434/api/embed"
TARGETS = {
"qwen06": {
"model": "qwen3-embedding:0.6b", "dim": 1024,
"docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06",
},
"qwen4": {
"model": "qwen3-embedding:4b", "dim": 2560,
"docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4",
},
}
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
r.raise_for_status()
embs = r.json()["embeddings"]
if len(embs) != len(texts):
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
return embs
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT d.id FROM documents d
WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id)
ORDER BY d.id LIMIT :b
"""), {"m": doc_id_max, "b": batch})).scalars().all()
if not rows:
break
docs = [(await session.get(Document, i)) for i in rows]
inputs = [_build_embed_input(d) for d in docs]
embs = await _embed_batch(http, target["model"], inputs)
for d, inp, e in zip(docs, inputs, embs):
await session.execute(text(f"""
INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding)
VALUES (:i, :h, cast(:e AS vector))
ON CONFLICT (doc_id) DO NOTHING
"""), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})")
return total
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id)
ORDER BY c.id LIMIT :b
"""), {"m": chunk_id_max, "b": batch})).all()
if not rows:
break
inputs = [
f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
for r in rows
]
embs = await _embed_batch(http, target["model"], inputs)
for r, e in zip(rows, embs):
await session.execute(text(f"""
INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding)
VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
ON CONFLICT (id) DO NOTHING
"""), {"i": r.id, "d": r.doc_id, "x": r.chunk_index,
"s": r.section_title, "t": r.text, "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})")
return total
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
target = TARGETS[target_key]
start = time.monotonic()
async with httpx.AsyncClient() as http:
nd = await backfill_docs(target, doc_id_max, batch, http)
nc = await backfill_chunks(target, chunk_id_max, batch, http)
mins = (time.monotonic() - start) / 60
async with async_session() as session:
cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one()
cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one()
logger.info(
f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · "
f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
)
def main() -> None:
p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
p.add_argument("--target", required=True, choices=sorted(TARGETS))
p.add_argument("--doc-id-max", type=int, required=True)
p.add_argument("--chunk-id-max", type=int, required=True)
p.add_argument("--batch", type=int, default=32)
a = p.parse_args()
asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))
if __name__ == "__main__":
main()
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen06 (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen06 (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen4 (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(2560) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4 (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(2560) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen4m (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4m (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+96
View File
@@ -0,0 +1,96 @@
"""Phase 2A (embedding-phase2a-1) — Qwen 후보 디스패처/쿼리 임베딩 단위 테스트."""
from __future__ import annotations
import pytest
from services.search import retrieval_service as rs
def test_resolve_qwen_backends():
for slug in ("cand_qwen06", "cand_qwen4", "cand_qwen4m"):
cfg = rs._resolve_backend(slug)
assert cfg["docs_table"].startswith("documents_cand_qwen")
assert cfg["chunks_table"].startswith("document_chunks_cand_qwen")
assert cfg["embed_kind"] == "ollama"
# 테이블명이 2단계 SQL allowlist 도 통과해야 함 (R2-B1)
assert rs._VALID_DOCS_TABLE.match(cfg["docs_table"])
assert rs._VALID_CHUNKS_TABLE.match(cfg["chunks_table"])
assert rs._resolve_backend("baseline") is None
with pytest.raises(ValueError):
rs._resolve_backend("cand_unknown")
def test_qwen4m_has_mrl_dimensions():
assert rs._resolve_backend("cand_qwen4m")["embed_dimensions"] == 1024
assert "embed_dimensions" not in rs._resolve_backend("cand_qwen4")
class _FakeResp:
def __init__(self, embs):
self._embs = embs
def raise_for_status(self):
return None
def json(self):
return {"embeddings": self._embs}
class _FakeClient:
"""httpx.AsyncClient 대역 — post body 캡처."""
captured: dict = {}
def __init__(self, *a, **k):
pass
async def __aenter__(self):
return self
async def __aexit__(self, *a):
return False
async def post(self, url, json=None):
_FakeClient.captured = {"url": url, "json": json}
dim = (json or {}).get("dimensions") or 1024
return _FakeResp([[0.1] * dim])
@pytest.mark.asyncio
async def test_ollama_query_embed_applies_instruct_prefix(monkeypatch):
import httpx
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
cfg = rs._resolve_backend("cand_qwen06")
out = await rs._embed_query_via_ollama(cfg, "압력용기 수압시험")
assert out is not None and len(out) == 1024
body = _FakeClient.captured["json"]
assert body["model"] == "qwen3-embedding:0.6b"
assert body["input"][0].startswith(rs.QWEN3_QUERY_INSTRUCT)
assert body["input"][0].endswith("압력용기 수압시험")
assert "dimensions" not in body
@pytest.mark.asyncio
async def test_ollama_query_embed_mrl_dimensions(monkeypatch):
import httpx
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
cfg = rs._resolve_backend("cand_qwen4m")
out = await rs._embed_query_via_ollama(cfg, "q")
assert _FakeClient.captured["json"]["dimensions"] == 1024
assert len(out) == 1024
@pytest.mark.asyncio
async def test_ollama_query_embed_failure_returns_none(monkeypatch):
import httpx
class _Boom(_FakeClient):
async def post(self, url, json=None):
raise httpx.ConnectError("down")
monkeypatch.setattr(httpx, "AsyncClient", _Boom)
cfg = rs._resolve_backend("cand_qwen06")
assert await rs._embed_query_via_ollama(cfg, "q") is None