From c3d237766db32aee99f6f1b3fb398407bd715fac Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 12 Jun 2026 08:29:53 +0900 Subject: [PATCH] =?UTF-8?q?feat(search):=20Phase=202A=20E-1=20=E2=80=94=20?= =?UTF-8?q?Qwen=20=ED=9B=84=EB=B3=B4=203=EC=A2=85=20=EB=B0=B1=ED=95=84=20C?= =?UTF-8?q?LI=20+=20eval=20=EB=94=94=EC=8A=A4=ED=8C=A8=EC=B2=98=20?= =?UTF-8?q?=ED=99=95=EC=9E=A5=20(=EB=A7=88=EC=9D=B4=EA=B7=B8=20328~333)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 후보 섀도 테이블 6종(전부 vector 타입 — eval=exact scan 이라 인덱스 불요, halfvec 은 C-1 소관) - workers/phase2a_cand_backfill: resumable(NOT EXISTS)·배치 커밋·동결셋 한정(--doc/chunk-id-max), 문서/청크 입력 = production 경로 동일 구성 + plain - CANDIDATE_BACKEND_MAP += cand_qwen06/qwen4/qwen4m (embed_kind=ollama, 쿼리측 instruct prefix G-1 핀 문자열, qwen4m = dimensions 1024 MRL) - qwen4m 적재는 qwen4 에서 SQL 파생(subvector+l2_normalize) — 본 CLI 비대상 Co-Authored-By: Claude Fable 5 --- app/services/search/retrieval_service.py | 66 +++++++- app/workers/phase2a_cand_backfill.py | 142 ++++++++++++++++++ migrations/328_documents_cand_qwen06.sql | 8 + .../329_document_chunks_cand_qwen06.sql | 10 ++ migrations/330_documents_cand_qwen4.sql | 8 + migrations/331_document_chunks_cand_qwen4.sql | 10 ++ migrations/332_documents_cand_qwen4m.sql | 8 + .../333_document_chunks_cand_qwen4m.sql | 10 ++ tests/test_phase2a_backend.py | 96 ++++++++++++ 9 files changed, 357 insertions(+), 1 deletion(-) create mode 100644 app/workers/phase2a_cand_backfill.py create mode 100644 migrations/328_documents_cand_qwen06.sql create mode 100644 migrations/329_document_chunks_cand_qwen06.sql create mode 100644 migrations/330_documents_cand_qwen4.sql create mode 100644 migrations/331_document_chunks_cand_qwen4.sql create mode 100644 migrations/332_documents_cand_qwen4m.sql create mode 100644 migrations/333_document_chunks_cand_qwen4m.sql create mode 100644 tests/test_phase2a_backend.py diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py index ac771ad..56a338b 100644 --- a/app/services/search/retrieval_service.py +++ b/app/services/search/retrieval_service.py @@ -63,8 +63,41 @@ CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = { "chunks_table": "document_chunks_cand_snowflake_l_v2", "embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed", }, + # ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ─── + # embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용, + # G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재. + # qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측). + "cand_qwen06": { + "docs_table": "documents_cand_qwen06", + "chunks_table": "document_chunks_cand_qwen06", + "embed_endpoint": "http://ollama:11434/api/embed", + "embed_kind": "ollama", + "embed_model": "qwen3-embedding:0.6b", + }, + "cand_qwen4": { + "docs_table": "documents_cand_qwen4", + "chunks_table": "document_chunks_cand_qwen4", + "embed_endpoint": "http://ollama:11434/api/embed", + "embed_kind": "ollama", + "embed_model": "qwen3-embedding:4b", + }, + "cand_qwen4m": { + "docs_table": "documents_cand_qwen4m", + "chunks_table": "document_chunks_cand_qwen4m", + "embed_endpoint": "http://ollama:11434/api/embed", + "embed_kind": "ollama", + "embed_model": "qwen3-embedding:4b", + "embed_dimensions": 1024, + }, } +# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 — +# 문구 변경 = 저장=조회 불변식 위반과 동급. 쿼리 측 전용, 문서 적재는 plain). +QWEN3_QUERY_INSTRUCT = ( + "Instruct: Given a web search query, retrieve relevant passages that answer the query" + "\nQuery: " +) + # 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist. _VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$") # corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point). @@ -137,6 +170,34 @@ async def _embed_query_via_tei(endpoint: str, text_: str) -> list[float] | None: return None +async def _embed_query_via_ollama(cfg: dict, text_: str) -> list[float] | None: + """Phase 2A 후보 쿼리 임베딩 — Ollama /api/embed + 비대칭 instruct prefix. + + 쿼리 측 전용: QWEN3_QUERY_INSTRUCT 를 선두에 붙인다 (문서 적재 = plain). + embed_dimensions 지정(qwen4m) 시 Ollama dimensions 옵션 = MRL truncate+재정규화 + (G-1 fixture: 1024 출력 L2=1.0 실측). cache 미사용 — slug 별 분포 상이. + """ + if not text_: + return None + import httpx + body: dict = {"model": cfg["embed_model"], "input": [QWEN3_QUERY_INSTRUCT + text_]} + if cfg.get("embed_dimensions"): + body["dimensions"] = cfg["embed_dimensions"] + try: + async with httpx.AsyncClient(timeout=60.0) as c: + r = await c.post(cfg["embed_endpoint"], json=body) + r.raise_for_status() + embs = r.json().get("embeddings") + if not isinstance(embs, list) or not embs or not isinstance(embs[0], list): + raise ValueError("unexpected /api/embed shape") + return embs[0] + except Exception as exc: + logger.warning( + "candidate ollama embed failed model=%s err=%r", cfg.get("embed_model"), exc + ) + return None + + def _query_embed_key(text_: str) -> str: return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest() @@ -323,7 +384,10 @@ async def search_vector( else: docs_table = cfg["docs_table"] chunks_table = cfg["chunks_table"] - query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query) + if cfg.get("embed_kind") == "ollama": + query_embedding = await _embed_query_via_ollama(cfg, query) + else: + query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query) logger.info( "[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s " diff --git a/app/workers/phase2a_cand_backfill.py b/app/workers/phase2a_cand_backfill.py new file mode 100644 index 0000000..4c734d2 --- /dev/null +++ b/app/workers/phase2a_cand_backfill.py @@ -0,0 +1,142 @@ +"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1). + + docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \ + --target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32] + +설계 원칙 (plan r3): + - resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) — 중단/재실행 시 이어서. + 배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트". + - 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL). + cand 테이블은 동결 범위로만 INSERT (retrieval cand path 가 snapshot filter 를 안 타는 전제). + - 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input / + chunk_worker 의 [제목][섹션][본문]) + plain (instruct prefix 는 쿼리 측 전용 — G-1 불변식). + - 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력). + - qwen4m 은 본 CLI 대상이 아님 — qwen4 적재 후 SQL 파생(subvector+l2_normalize), plan E-1. +""" + +import argparse +import asyncio +import hashlib +import time + +import httpx +from sqlalchemy import text + +from core.database import async_session +from core.utils import setup_logger +from models.document import Document +from workers.embed_worker import _build_embed_input + +logger = setup_logger("phase2a_cand_backfill") + +OLLAMA_EMBED = "http://ollama:11434/api/embed" + +TARGETS = { + "qwen06": { + "model": "qwen3-embedding:0.6b", "dim": 1024, + "docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06", + }, + "qwen4": { + "model": "qwen3-embedding:4b", "dim": 2560, + "docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4", + }, +} + + +async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]: + r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600) + r.raise_for_status() + embs = r.json()["embeddings"] + if len(embs) != len(texts): + raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}") + return embs + + +async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int: + total = 0 + while True: + async with async_session() as session: + rows = (await session.execute(text(f""" + SELECT d.id FROM documents d + WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL + AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id) + ORDER BY d.id LIMIT :b + """), {"m": doc_id_max, "b": batch})).scalars().all() + if not rows: + break + docs = [(await session.get(Document, i)) for i in rows] + inputs = [_build_embed_input(d) for d in docs] + embs = await _embed_batch(http, target["model"], inputs) + for d, inp, e in zip(docs, inputs, embs): + await session.execute(text(f""" + INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding) + VALUES (:i, :h, cast(:e AS vector)) + ON CONFLICT (doc_id) DO NOTHING + """), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)}) + await session.commit() + total += len(rows) + if total % (batch * 10) < batch: + logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})") + return total + + +async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int: + total = 0 + while True: + async with async_session() as session: + rows = (await session.execute(text(f""" + SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title + FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id + WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL + AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id) + ORDER BY c.id LIMIT :b + """), {"m": chunk_id_max, "b": batch})).all() + if not rows: + break + inputs = [ + f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}" + for r in rows + ] + embs = await _embed_batch(http, target["model"], inputs) + for r, e in zip(rows, embs): + await session.execute(text(f""" + INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding) + VALUES (:i, :d, :x, :s, :t, cast(:e AS vector)) + ON CONFLICT (id) DO NOTHING + """), {"i": r.id, "d": r.doc_id, "x": r.chunk_index, + "s": r.section_title, "t": r.text, "e": str(e)}) + await session.commit() + total += len(rows) + if total % (batch * 10) < batch: + logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})") + return total + + +async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None: + target = TARGETS[target_key] + start = time.monotonic() + async with httpx.AsyncClient() as http: + nd = await backfill_docs(target, doc_id_max, batch, http) + nc = await backfill_chunks(target, chunk_id_max, batch, http) + mins = (time.monotonic() - start) / 60 + async with async_session() as session: + cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one() + cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one() + logger.info( + f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · " + f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)" + ) + + +def main() -> None: + p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)") + p.add_argument("--target", required=True, choices=sorted(TARGETS)) + p.add_argument("--doc-id-max", type=int, required=True) + p.add_argument("--chunk-id-max", type=int, required=True) + p.add_argument("--batch", type=int, default=32) + a = p.parse_args() + asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch)) + + +if __name__ == "__main__": + main() diff --git a/migrations/328_documents_cand_qwen06.sql b/migrations/328_documents_cand_qwen06.sql new file mode 100644 index 0000000..b955c9c --- /dev/null +++ b/migrations/328_documents_cand_qwen06.sql @@ -0,0 +1,8 @@ +-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement). +-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관). +CREATE TABLE IF NOT EXISTS documents_cand_qwen06 ( + doc_id BIGINT PRIMARY KEY, + embed_input_hash TEXT, + embedding vector(1024) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/329_document_chunks_cand_qwen06.sql b/migrations/329_document_chunks_cand_qwen06.sql new file mode 100644 index 0000000..f99966f --- /dev/null +++ b/migrations/329_document_chunks_cand_qwen06.sql @@ -0,0 +1,10 @@ +-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement). +CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen06 ( + id BIGINT PRIMARY KEY, + doc_id BIGINT NOT NULL, + chunk_index INTEGER, + section_title TEXT, + text TEXT, + embedding vector(1024) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/330_documents_cand_qwen4.sql b/migrations/330_documents_cand_qwen4.sql new file mode 100644 index 0000000..c10df59 --- /dev/null +++ b/migrations/330_documents_cand_qwen4.sql @@ -0,0 +1,8 @@ +-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement). +-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관). +CREATE TABLE IF NOT EXISTS documents_cand_qwen4 ( + doc_id BIGINT PRIMARY KEY, + embed_input_hash TEXT, + embedding vector(2560) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/331_document_chunks_cand_qwen4.sql b/migrations/331_document_chunks_cand_qwen4.sql new file mode 100644 index 0000000..e42fa25 --- /dev/null +++ b/migrations/331_document_chunks_cand_qwen4.sql @@ -0,0 +1,10 @@ +-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement). +CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4 ( + id BIGINT PRIMARY KEY, + doc_id BIGINT NOT NULL, + chunk_index INTEGER, + section_title TEXT, + text TEXT, + embedding vector(2560) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/332_documents_cand_qwen4m.sql b/migrations/332_documents_cand_qwen4m.sql new file mode 100644 index 0000000..6e5d5de --- /dev/null +++ b/migrations/332_documents_cand_qwen4m.sql @@ -0,0 +1,8 @@ +-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement). +-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관). +CREATE TABLE IF NOT EXISTS documents_cand_qwen4m ( + doc_id BIGINT PRIMARY KEY, + embed_input_hash TEXT, + embedding vector(1024) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/333_document_chunks_cand_qwen4m.sql b/migrations/333_document_chunks_cand_qwen4m.sql new file mode 100644 index 0000000..7c94001 --- /dev/null +++ b/migrations/333_document_chunks_cand_qwen4m.sql @@ -0,0 +1,10 @@ +-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement). +CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4m ( + id BIGINT PRIMARY KEY, + doc_id BIGINT NOT NULL, + chunk_index INTEGER, + section_title TEXT, + text TEXT, + embedding vector(1024) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/tests/test_phase2a_backend.py b/tests/test_phase2a_backend.py new file mode 100644 index 0000000..0b4c220 --- /dev/null +++ b/tests/test_phase2a_backend.py @@ -0,0 +1,96 @@ +"""Phase 2A (embedding-phase2a-1) — Qwen 후보 디스패처/쿼리 임베딩 단위 테스트.""" + +from __future__ import annotations + +import pytest + +from services.search import retrieval_service as rs + + +def test_resolve_qwen_backends(): + for slug in ("cand_qwen06", "cand_qwen4", "cand_qwen4m"): + cfg = rs._resolve_backend(slug) + assert cfg["docs_table"].startswith("documents_cand_qwen") + assert cfg["chunks_table"].startswith("document_chunks_cand_qwen") + assert cfg["embed_kind"] == "ollama" + # 테이블명이 2단계 SQL allowlist 도 통과해야 함 (R2-B1) + assert rs._VALID_DOCS_TABLE.match(cfg["docs_table"]) + assert rs._VALID_CHUNKS_TABLE.match(cfg["chunks_table"]) + assert rs._resolve_backend("baseline") is None + with pytest.raises(ValueError): + rs._resolve_backend("cand_unknown") + + +def test_qwen4m_has_mrl_dimensions(): + assert rs._resolve_backend("cand_qwen4m")["embed_dimensions"] == 1024 + assert "embed_dimensions" not in rs._resolve_backend("cand_qwen4") + + +class _FakeResp: + def __init__(self, embs): + self._embs = embs + + def raise_for_status(self): + return None + + def json(self): + return {"embeddings": self._embs} + + +class _FakeClient: + """httpx.AsyncClient 대역 — post body 캡처.""" + + captured: dict = {} + + def __init__(self, *a, **k): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + return False + + async def post(self, url, json=None): + _FakeClient.captured = {"url": url, "json": json} + dim = (json or {}).get("dimensions") or 1024 + return _FakeResp([[0.1] * dim]) + + +@pytest.mark.asyncio +async def test_ollama_query_embed_applies_instruct_prefix(monkeypatch): + import httpx + + monkeypatch.setattr(httpx, "AsyncClient", _FakeClient) + cfg = rs._resolve_backend("cand_qwen06") + out = await rs._embed_query_via_ollama(cfg, "압력용기 수압시험") + assert out is not None and len(out) == 1024 + body = _FakeClient.captured["json"] + assert body["model"] == "qwen3-embedding:0.6b" + assert body["input"][0].startswith(rs.QWEN3_QUERY_INSTRUCT) + assert body["input"][0].endswith("압력용기 수압시험") + assert "dimensions" not in body + + +@pytest.mark.asyncio +async def test_ollama_query_embed_mrl_dimensions(monkeypatch): + import httpx + + monkeypatch.setattr(httpx, "AsyncClient", _FakeClient) + cfg = rs._resolve_backend("cand_qwen4m") + out = await rs._embed_query_via_ollama(cfg, "q") + assert _FakeClient.captured["json"]["dimensions"] == 1024 + assert len(out) == 1024 + + +@pytest.mark.asyncio +async def test_ollama_query_embed_failure_returns_none(monkeypatch): + import httpx + + class _Boom(_FakeClient): + async def post(self, url, json=None): + raise httpx.ConnectError("down") + + monkeypatch.setattr(httpx, "AsyncClient", _Boom) + cfg = rs._resolve_backend("cand_qwen06") + assert await rs._embed_query_via_ollama(cfg, "q") is None