Files
hyungi_document_server/app/services/search/react_loop.py
T
hyungi 51c3f6df10 feat(search): /ask/react endpoint with Qwen native tool calling ReAct loop
PR-DocSrv-Ask-ToolCalling-ReAct-1 — Qwen3.6-27B-8bit 의 native tool calling
으로 ReAct loop 도입. 기존 /api/search/ask 무수정. 트랙 B (frontend /ask SSE)
와 파일 단위 충돌 0 (search.py 의 ask() 함수 line diff = 0, 순수 추가).

핵심 invariant:
- 별 endpoint /api/search/ask/react (qwen-macbook only, implicit opt-in)
- MacBook unavailable 시 HTTP 503 + error_reason=macbook_unavailable.
  Gemma 자동 fallback X (정정 4 의 연장)

G0 (구현 전 hard gate, plan b-velvety-hare.md):
- G0-1 fixture (tests/fixtures/qwen_tool_call_response.json): 실제 mlx-vlm
  응답 박제. shape = OpenAI 표준 호환 (choices[0].message.tool_calls +
  function.arguments JSON string). generate_with_tools() 가 본 shape 기준 구현.
- G0-2 counter semantics: max_tool_rounds=2 + max_llm_calls=3 + search_exec_max=2.
  마지막 LLM 호출은 tool_choice="none" + system instruction 으로 final 강제.
- G0-3 trace exposure: default response 의 debug_trace=null. debug=true 시만
  채움. server log 에는 항상 round 기록.

backends.py (193 → 261줄):
- QwenMacBookBackend.generate_with_tools(messages, tools, tool_choice)
  신규 method. 기존 generate() 무수정. BackendUnavailable 처리 동일.

react_loop.py 신규 (275줄):
- agentic_ask_loop(session, query, *, backend, max_tool_rounds, debug)
- tool round 안에서 run_search 호출, results dedup by id, final round 강제,
  partial=True 조건 (final content 빈 경우)

search.py (+82줄):
- POST /api/search/ask/react + AskReactRequest/Response schema
- BackendUnavailable → JSONResponse(503, error_reason=macbook_unavailable)

config.yaml + config.py:
- search.ask.react: { enabled, max_tool_rounds=2, search_tool_limit=5,
  search_tool_mode=hybrid }

tests (566줄, 18 신규 + 23 회귀 모두 PASS):
- test_react_loop.py 13건: G0-1 fixture shape / G0-2 counter cap / G0-3 trace
  exposure / BackendUnavailable propagation / sources dedup
- test_search_ask_react_endpoint.py 5건: 503 + run_search 호출 0 / 정상 200 /
  debug=true trace 노출 / max rounds partial
- 회귀 (test_ask_eval_auth 9 + test_search_ask_macbook_503 5 +
  test_backend_dispatcher 9) 모두 PASS

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 13:43:47 +00:00

276 lines
9.2 KiB
Python

"""PR-DocSrv-Ask-ToolCalling-ReAct-1: Qwen native tool calling 로 ReAct loop.
G0-2 counter semantics ([[b-velvety-hare]] § Pre-Implementation Gate):
- max_tool_rounds = 2 (tool 호출 round cap)
- max_llm_calls = 3 (= max_tool_rounds + 1, final round 포함)
- search_exec_max = max_tool_rounds (round 당 search 1회 이상 가능 — 모델 결정)
- 마지막 LLM call 은 tool_choice="none" + system instruction 으로 final answer 강제
G0-1 fixture (tests/fixtures/qwen_tool_call_response.json) 기준 parsing —
mlx-vlm 의 OpenAI 표준 호환, `tool_calls[].function.arguments` 는 JSON string.
G0-3 trace exposure:
- `debug=True` 시만 `debug_trace` 채움. server log 에는 항상 round 기록.
- default response = `debug_trace=None`.
Invariant (정정 4 의 자연 연장):
- backend = `QwenMacBookBackend` only. Gemma 자동 fallback 금지.
- `BackendUnavailable` 은 호출자 (search.py) 가 503 + `error_reason=macbook_unavailable`
로 매핑.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.utils import setup_logger
from services.llm.backends import QwenMacBookBackend
from services.search.search_pipeline import run_search
logger = setup_logger("react_loop")
_PROMPT_PATH = Path(__file__).resolve().parents[2] / "prompts" / "react_ask.txt"
_FINAL_INSTRUCTION = (
"이제는 검색 도구를 더 이상 호출하지 마시고, 위 evidence 만으로 "
"한국어 최종 답을 작성하세요."
)
_TOOLS = [
{
"type": "function",
"function": {
"name": "search",
"description": "사내 문서 청크 검색. q 만 넘기면 hybrid 모드로 limit 건 반환.",
"parameters": {
"type": "object",
"properties": {
"q": {
"type": "string",
"description": "검색 질의문 (한국어 가능)",
},
},
"required": ["q"],
},
},
}
]
@dataclass
class ReactResult:
final_answer: str
iterations: int
partial: bool
sources: list[dict[str, Any]] = field(default_factory=list)
debug_trace: list[dict[str, Any]] | None = None
def _load_system_prompt() -> str:
try:
return _PROMPT_PATH.read_text(encoding="utf-8")
except OSError:
logger.warning("react_ask.txt missing path=%s — fallback prompt", _PROMPT_PATH)
return (
"당신은 사내 문서 자료를 기반으로 정확한 한국어 답변을 제공하는 비서입니다. "
"필요하면 `search` 도구를 호출해 evidence 를 모으고, 충분하다 판단되면 "
"최종 답을 작성하세요. 근거 없는 추측은 피하세요."
)
def _result_payload(pr, *, limit: int) -> tuple[str, list[dict[str, Any]]]:
"""run_search() PipelineResult → (LLM-side JSON string, sources-side dict list).
LLM-side: snippet 600자 컷, score / title / doc_id 포함.
Sources-side: snippet 제외, id / doc_id / title / score 만.
"""
items_llm: list[dict[str, Any]] = []
items_src: list[dict[str, Any]] = []
for r in (pr.results or [])[:limit]:
rid = getattr(r, "id", None) or getattr(r, "chunk_id", None)
doc_id = getattr(r, "doc_id", None)
title = getattr(r, "title", "") or ""
score = getattr(r, "score", None)
snippet = (getattr(r, "snippet", "") or getattr(r, "text", "") or "")[:600]
items_llm.append(
{
"id": rid,
"doc_id": doc_id,
"title": title,
"snippet": snippet,
"score": score,
}
)
items_src.append(
{"id": rid, "doc_id": doc_id, "title": title, "score": score}
)
return (
json.dumps({"results": items_llm, "count": len(items_llm)}, ensure_ascii=False),
items_src,
)
async def agentic_ask_loop(
session: AsyncSession,
query: str,
*,
backend: QwenMacBookBackend,
max_tool_rounds: int | None = None,
debug: bool = False,
) -> ReactResult:
"""ReAct loop entry point.
Args:
session: AsyncSession (caller-managed)
query: 사용자 원본 질의
backend: QwenMacBookBackend instance (qwen-macbook only — Gemma 미지원)
max_tool_rounds: None 시 config.search.ask.react.max_tool_rounds
debug: True 시 `debug_trace` 채움
"""
cfg = settings.search.ask.react
if max_tool_rounds is None:
max_tool_rounds = cfg.max_tool_rounds
timeout_read_s = settings.search.ask.backend.timeout_read_s
limit = cfg.search_tool_limit
mode = cfg.search_tool_mode
messages: list[dict] = [
{"role": "system", "content": _load_system_prompt()},
{"role": "user", "content": query},
]
sources: list[dict[str, Any]] = []
seen_ids: set[Any] = set()
trace: list[dict[str, Any]] = []
# Tool rounds — 최대 max_tool_rounds 회 (LLM call #1 .. #max_tool_rounds)
for round_idx in range(max_tool_rounds):
msg = await backend.generate_with_tools(
messages,
_TOOLS,
tool_choice="auto",
timeout_read_s=timeout_read_s,
)
tool_calls = msg.get("tool_calls") or []
trace.append(
{
"phase": "tool_round",
"round": round_idx,
"tool_call_count": len(tool_calls),
"content_present": bool(msg.get("content")),
}
)
logger.info(
"react_loop round=%d tool_calls=%d content=%s",
round_idx,
len(tool_calls),
"yes" if msg.get("content") else "no",
)
if not tool_calls:
# LLM 이 tool 호출 안 함 → 종합문 직접 반환 (early exit)
content = msg.get("content") or ""
return ReactResult(
final_answer=content,
iterations=round_idx + 1,
partial=not bool(content),
sources=sources,
debug_trace=trace if debug else None,
)
# assistant message (tool_calls 포함) 추가
messages.append(
{
"role": "assistant",
"content": msg.get("content"),
"tool_calls": tool_calls,
}
)
# 각 tool call 실행
for tc in tool_calls:
fn = tc.get("function") or {}
tc_id = tc.get("id") or ""
fn_name = fn.get("name")
if fn_name != "search":
messages.append(
{
"role": "tool",
"tool_call_id": tc_id,
"content": json.dumps(
{"error": f"unknown tool {fn_name!r}"},
ensure_ascii=False,
),
}
)
trace.append({"phase": "tool_unknown", "name": fn_name})
continue
try:
args = json.loads(fn.get("arguments") or "{}")
except json.JSONDecodeError:
args = {}
q_arg = (args.get("q") or "").strip() or query
pr = await run_search(
session,
q_arg,
mode=mode,
limit=limit,
rerank=True,
analyze=False,
)
tool_content, round_sources = _result_payload(pr, limit=limit)
for s in round_sources:
sid = s.get("id")
if sid is not None and sid in seen_ids:
continue
if sid is not None:
seen_ids.add(sid)
sources.append(s)
messages.append(
{
"role": "tool",
"tool_call_id": tc_id,
"content": tool_content,
}
)
trace.append(
{
"phase": "search",
"q": q_arg,
"result_count": len(pr.results or []),
}
)
# Final round — LLM call #(max_tool_rounds + 1). tool_choice="none" 강제
messages.append({"role": "system", "content": _FINAL_INSTRUCTION})
final_msg = await backend.generate_with_tools(
messages,
tools=[],
tool_choice="none",
timeout_read_s=timeout_read_s,
)
final_content = final_msg.get("content") or ""
trace.append(
{
"phase": "final",
"content_present": bool(final_content),
"tool_calls_ignored": len(final_msg.get("tool_calls") or []),
}
)
logger.info(
"react_loop final content=%s tool_calls_ignored=%d",
"yes" if final_content else "no",
len(final_msg.get("tool_calls") or []),
)
return ReactResult(
final_answer=final_content,
iterations=max_tool_rounds,
partial=not bool(final_content),
sources=sources,
debug_trace=trace if debug else None,
)