hyungi_document_server/app/services/search/react_loop.py
hyungi 6a85087b83 feat(eid): Eid persona substrate W2~W4: DS compose, weakness diagnosis, code-layer egress removal
Document Server-side build (W2~W4) of the 'Eid' persona substrate that runs through every local LLM.
Design = PKM eid-persona-substrate (converged over r1~r3) / impl = eid-persona-impl.

W2: compose + surface wiring:
- app/eid/compose.py: a single system string built as persona→rules→overlay→task + a static ROUTE_MAP
  (no runtime sniffing) + fail-loud on missing rules · quiet on missing persona · fail-loud on overflow.
- The three free-prose surfaces (react_ask·study_subject_note·study_question_explanation): duplicated identity and
  generic policy text trimmed + wired to compose (additive system parameter on AIClient). Domain calibration preserved.
- STRICT JSON machine surfaces (briefing_comparative·digest_topic) stay frozen at persona-ZERO (invariant #3).
- app/prompts/substrate/: persona (vendored external compile artifact) + rules (generation-guard subset) + 5 overlays.
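
The layering described for compose.py can be pictured with a minimal sketch. Everything in it is hypothetical: `compose_sketch`, `SubstrateError`, the contents of `ROUTE_MAP_SKETCH`, and the `max_chars` budget are illustration-only names and values, not the actual app/eid/compose.py API.

```python
from typing import Dict, Optional


class SubstrateError(RuntimeError):
    """Hypothetical error type for the fail-loud cases (missing rules, overflow)."""


# Static surface -> overlay routing, fixed at import time (no runtime sniffing).
# The overlay names are made up for illustration.
ROUTE_MAP_SKETCH: Dict[str, str] = {
    "react_ask": "ask",
    "study_subject_note": "study",
}


def compose_sketch(
    surface: str,
    *,
    persona: Optional[str],
    rules: Optional[str],
    overlays: Dict[str, str],
    task: str,
    max_chars: int = 8000,  # assumed budget, not taken from the source
) -> str:
    """Join persona -> rules -> overlay -> task into one system string."""
    if surface not in ROUTE_MAP_SKETCH:
        raise SubstrateError(f"unrouted surface: {surface}")
    # rules are mandatory: fail loudly when they are missing.
    if not (rules and rules.strip()):
        raise SubstrateError("rules layer missing")
    overlay = overlays.get(ROUTE_MAP_SKETCH[surface], "")
    # persona is optional: an absent persona is skipped quietly.
    layers = [persona or "", rules, overlay, task]
    system = "\n\n".join(part.strip() for part in layers if part.strip())
    # overflow is also a loud failure rather than a silent truncation.
    if len(system) > max_chars:
        raise SubstrateError(f"system prompt overflow: {len(system)} > {max_chars}")
    return system
```

The ordering is fixed in one place, so a surface cannot reorder layers or drop the rules layer by accident.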

W3: migration + worker + study_diagnosis:
- migration 301~305: eid_* append-only ledgers (weaknesses / review drafts / retrospectives) + approval_requests (mutable queue) + 2 schedule-derived views.
- app/workers/study_weakness.py: derives weaknesses by aggregating study_question_progress.pattern_state
  (0 LLM calls) + bounded tiers (watch/review/focus). Nightly cron.
- study_diagnosis surface: translates the latest snapshot into coach language (code decides the weaknesses; the LLM only quotes block values).
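
A rough sketch of an LLM-free weakness derivation with bounded tiers follows. The row shape `(topic, answered_wrong)`, the function names, and both cut-off values are assumptions made for illustration; the source does not state how `pattern_state` is encoded or where the tier boundaries sit.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Hypothetical cut-offs; the real thresholds are not given in the source.
REVIEW_AT = 0.3
FOCUS_AT = 0.6


def tier_for(wrong_ratio: float) -> str:
    """Map a wrong-answer ratio onto one of the three bounded tiers."""
    if wrong_ratio >= FOCUS_AT:
        return "focus"
    if wrong_ratio >= REVIEW_AT:
        return "review"
    return "watch"


def derive_weakness(rows: Iterable[Tuple[str, bool]]) -> Dict[str, str]:
    """Aggregate (topic, answered_wrong) rows into topic -> tier, using code only."""
    wrong: Dict[str, int] = defaultdict(int)
    total: Dict[str, int] = defaultdict(int)
    for topic, answered_wrong in rows:
        total[topic] += 1
        wrong[topic] += int(answered_wrong)
    return {topic: tier_for(wrong[topic] / total[topic]) for topic in total}
```

Because the tier is computed in code, a downstream prompt only needs to quote the resulting value.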

W4-1: code-layer egress removal:
- app/eid/ai.py EidAIClient: Eid surfaces = call_primary (internal MLX) only. The external LLM fallback path is
  structurally sealed (call_fallback raises · automatic fallback removed · external endpoints blocked). The egress worker stays separate.
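
The structural sealing can be sketched as a client whose fallback method always raises. `PrimaryOnlyClient`, `EgressBlocked`, and the synchronous callable-based signature are hypothetical simplifications; the real EidAIClient in app/eid/ai.py may differ (for example, it is likely async).

```python
from typing import Callable, Optional


class EgressBlocked(RuntimeError):
    """Hypothetical error raised when an external path is attempted."""


class PrimaryOnlyClient:
    """Sketch of a client that can only reach the internal primary model."""

    def __init__(self, primary: Callable[[str, Optional[str]], str]) -> None:
        # `primary` stands in for the internal MLX call; no fallback is stored at all.
        self._primary = primary

    def call_primary(self, prompt: str, *, system: Optional[str] = None) -> str:
        # Errors from the primary propagate; there is no automatic fallback.
        return self._primary(prompt, system)

    def call_fallback(self, *args: object, **kwargs: object) -> str:
        # Sealed by construction: the method exists only to fail loudly.
        raise EgressBlocked("external LLM fallback is disabled on Eid surfaces")
```

Blocking at the class level means a caller cannot reach an external endpoint by picking a different method, which is a stronger guarantee than relying on call-site discipline.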

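Three load-bearing corrections (forced by environment grounding, not design regressions):
- rules = the full operational ruleset → the generation-guard subset (the HTML-output rules conflict with study tasks).
- append-only = REVOKE → CREATE RULE DO INSTEAD NOTHING (REVOKE has no effect with a single owner role, and
  the migration validator rejects plpgsql BEGIN) + NOT NULL actor/source_* stamps.
- Eid LLM sealing = path discipline → structured into EidAIClient.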

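Verification: 30 eid pure unit tests pass + py_compile + migration-validator simulation + egress adversarial audit COMPLETE.
Tests that depend on DB/LLM/httpx (append-only RULE·EidAIClient·E2E) run on staging (Docker).
The W4-2 network belt is conditionally deferred (the code layer suffices as a first line; promote it if it becomes a hard gate after the P0-3② remote measurement).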

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 15:13:20 +09:00


"""PR-DocSrv-Ask-ToolCalling-ReAct-1: Qwen native tool calling 로 ReAct loop.

G0-2 counter semantics ([[b-velvety-hare]] § Pre-Implementation Gate):
- max_tool_rounds = 2 (cap on tool-calling rounds)
- max_llm_calls = 3 (= max_tool_rounds + 1, including the final round)
- search_exec_max = max_tool_rounds (a round may run one or more searches; the model decides)
- the last LLM call forces a final answer via tool_choice="none" + a system instruction

G0-1 fixture (tests/fixtures/qwen_tool_call_response.json) is the parsing reference:
mlx-vlm follows the OpenAI standard, so `tool_calls[].function.arguments` is a JSON string.

G0-3 trace exposure:
- `debug_trace` is filled only when `debug=True`. Rounds are always written to the server log.
- default response = `debug_trace=None`.

Invariant (a natural extension of correction 4):
- backend = `QwenMacBookBackend` only. Automatic Gemma fallback is forbidden.
- `BackendUnavailable` is mapped by the caller (search.py) to 503 +
  `error_reason=macbook_unavailable`.
"""
from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession

from core.config import settings
from core.utils import setup_logger
from eid.compose import compose
from services.llm.backends import QwenMacBookBackend
from services.search.search_pipeline import run_search

logger = setup_logger("react_loop")

_PROMPT_PATH = Path(__file__).resolve().parents[2] / "prompts" / "react_ask.txt"

_FINAL_INSTRUCTION = (
    "이제는 검색 도구를 더 이상 호출하지 마시고, 위 evidence 만으로 "
    "한국어 최종 답을 작성하세요."
)

_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "search",
            "description": "사내 문서 청크 검색. q 만 넘기면 hybrid 모드로 limit 건 반환.",
            "parameters": {
                "type": "object",
                "properties": {
                    "q": {
                        "type": "string",
                        "description": "검색 질의문 (한국어 가능)",
                    },
                },
                "required": ["q"],
            },
        },
    }
]


@dataclass
class ReactResult:
    final_answer: str
    iterations: int
    partial: bool
    sources: list[dict[str, Any]] = field(default_factory=list)
    debug_trace: list[dict[str, Any]] | None = None


def _load_react_task() -> str:
    """Instructions specific to the react_ask surface (task layer).

    Identity and evidence policy belong to the substrate (persona/rules);
    only the search-loop mechanics live here.
    """
    try:
        return _PROMPT_PATH.read_text(encoding="utf-8")
    except OSError:
        logger.warning("react_ask.txt missing path=%s — fallback task", _PROMPT_PATH)
        return (
            "작업 원칙: 필요하면 `search` 도구를 호출해 evidence 를 모으고(최대 2회), "
            "충분하다 판단되면 그 evidence 만으로 한국어 최종 답을 작성하세요. "
            "출처는 sources 필드로 별도 노출됩니다."
        )


def _load_system_prompt() -> str:
    """System string combining the Eid substrate (persona → rules) with the react_ask task (W2-1 compose)."""
    return compose("react_ask", task=_load_react_task())


def _result_payload(pr, *, limit: int) -> tuple[str, list[dict[str, Any]]]:
    """run_search() PipelineResult → (LLM-side JSON string, sources-side dict list).

    LLM-side: snippet cut to 600 characters; includes score / title / doc_id.
    Sources-side: no snippet; only id / doc_id / title / score.
    """
    items_llm: list[dict[str, Any]] = []
    items_src: list[dict[str, Any]] = []
    for r in (pr.results or [])[:limit]:
        # Result objects may expose either `id` or `chunk_id`, and either `snippet` or `text`.
        rid = getattr(r, "id", None) or getattr(r, "chunk_id", None)
        doc_id = getattr(r, "doc_id", None)
        title = getattr(r, "title", "") or ""
        score = getattr(r, "score", None)
        snippet = (getattr(r, "snippet", "") or getattr(r, "text", "") or "")[:600]
        items_llm.append(
            {
                "id": rid,
                "doc_id": doc_id,
                "title": title,
                "snippet": snippet,
                "score": score,
            }
        )
        items_src.append(
            {"id": rid, "doc_id": doc_id, "title": title, "score": score}
        )
    return (
        json.dumps({"results": items_llm, "count": len(items_llm)}, ensure_ascii=False),
        items_src,
    )


async def agentic_ask_loop(
    session: AsyncSession,
    query: str,
    *,
    backend: QwenMacBookBackend,
    max_tool_rounds: int | None = None,
    debug: bool = False,
) -> ReactResult:
    """ReAct loop entry point.

    Args:
        session: AsyncSession (caller-managed)
        query: the user's original query
        backend: QwenMacBookBackend instance (qwen-macbook only; Gemma is not supported)
        max_tool_rounds: when None, uses config.search.ask.react.max_tool_rounds
        debug: when True, fills `debug_trace`
    """
    cfg = settings.search.ask.react
    if max_tool_rounds is None:
        max_tool_rounds = cfg.max_tool_rounds
    timeout_read_s = settings.search.ask.backend.timeout_read_s
    limit = cfg.search_tool_limit
    mode = cfg.search_tool_mode

    messages: list[dict] = [
        {"role": "system", "content": _load_system_prompt()},
        {"role": "user", "content": query},
    ]
    sources: list[dict[str, Any]] = []
    seen_ids: set[Any] = set()
    trace: list[dict[str, Any]] = []

    # Tool rounds: at most max_tool_rounds (LLM call #1 .. #max_tool_rounds)
    for round_idx in range(max_tool_rounds):
        msg = await backend.generate_with_tools(
            messages,
            _TOOLS,
            tool_choice="auto",
            timeout_read_s=timeout_read_s,
        )
        tool_calls = msg.get("tool_calls") or []
        trace.append(
            {
                "phase": "tool_round",
                "round": round_idx,
                "tool_call_count": len(tool_calls),
                "content_present": bool(msg.get("content")),
            }
        )
        logger.info(
            "react_loop round=%d tool_calls=%d content=%s",
            round_idx,
            len(tool_calls),
            "yes" if msg.get("content") else "no",
        )

        if not tool_calls:
            # The LLM made no tool call → return its synthesized answer directly (early exit)
            content = msg.get("content") or ""
            return ReactResult(
                final_answer=content,
                iterations=round_idx + 1,
                partial=not bool(content),
                sources=sources,
                debug_trace=trace if debug else None,
            )

        # Append the assistant message (including its tool_calls)
        messages.append(
            {
                "role": "assistant",
                "content": msg.get("content"),
                "tool_calls": tool_calls,
            }
        )

        # Execute each tool call
        for tc in tool_calls:
            fn = tc.get("function") or {}
            tc_id = tc.get("id") or ""
            fn_name = fn.get("name")
            if fn_name != "search":
                # Unknown tool: answer with an error payload so the tool_call_id is still resolved
                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tc_id,
                        "content": json.dumps(
                            {"error": f"unknown tool {fn_name!r}"},
                            ensure_ascii=False,
                        ),
                    }
                )
                trace.append({"phase": "tool_unknown", "name": fn_name})
                continue

            # `arguments` is a JSON string; malformed JSON falls back to empty args
            try:
                args = json.loads(fn.get("arguments") or "{}")
            except json.JSONDecodeError:
                args = {}
            # An empty or missing `q` falls back to the user's original query
            q_arg = (args.get("q") or "").strip() or query

            pr = await run_search(
                session,
                q_arg,
                mode=mode,
                limit=limit,
                rerank=True,
                analyze=False,
            )
            tool_content, round_sources = _result_payload(pr, limit=limit)

            # Deduplicate sources by id across rounds; entries without an id are always kept
            for s in round_sources:
                sid = s.get("id")
                if sid is not None and sid in seen_ids:
                    continue
                if sid is not None:
                    seen_ids.add(sid)
                sources.append(s)

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tc_id,
                    "content": tool_content,
                }
            )
            trace.append(
                {
                    "phase": "search",
                    "q": q_arg,
                    "result_count": len(pr.results or []),
                }
            )

    # Final round: LLM call #(max_tool_rounds + 1). Force tool_choice="none"
    messages.append({"role": "system", "content": _FINAL_INSTRUCTION})
    final_msg = await backend.generate_with_tools(
        messages,
        tools=[],
        tool_choice="none",
        timeout_read_s=timeout_read_s,
    )
    final_content = final_msg.get("content") or ""
    trace.append(
        {
            "phase": "final",
            "content_present": bool(final_content),
            "tool_calls_ignored": len(final_msg.get("tool_calls") or []),
        }
    )
    logger.info(
        "react_loop final content=%s tool_calls_ignored=%d",
        "yes" if final_content else "no",
        len(final_msg.get("tool_calls") or []),
    )
    return ReactResult(
        final_answer=final_content,
        iterations=max_tool_rounds,
        partial=not bool(final_content),
        sources=sources,
        debug_trace=trace if debug else None,
    )
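
The counter semantics of the loop above (up to `max_tool_rounds` calls with `tool_choice="auto"`, then one forced final call with `tool_choice="none"`) can be exercised in isolation with a scripted stand-in. `ScriptedBackend` and `bounded_loop` are hypothetical names for this sketch only; searches are not executed, and the second return value counts LLM calls, which is a choice of this sketch rather than the meaning of `ReactResult.iterations`.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class ScriptedBackend:
    """Hypothetical stand-in for the tool-calling backend; replays canned replies."""

    def __init__(self, replies: List[Dict[str, Any]]) -> None:
        self._replies = list(replies)
        self.calls: List[str] = []  # records the tool_choice of every LLM call

    async def generate_with_tools(
        self, messages: List[Dict[str, Any]], tools: List[Any], *, tool_choice: str
    ) -> Dict[str, Any]:
        self.calls.append(tool_choice)
        return self._replies.pop(0)


async def bounded_loop(backend: ScriptedBackend, max_tool_rounds: int = 2) -> Tuple[str, int]:
    """Return (answer, number_of_llm_calls) under the round cap."""
    messages: List[Dict[str, Any]] = []
    for round_idx in range(max_tool_rounds):
        msg = await backend.generate_with_tools(messages, ["search"], tool_choice="auto")
        if not msg.get("tool_calls"):
            # Early exit: the model answered without asking for a tool.
            return msg.get("content") or "", round_idx + 1
        messages.append(msg)  # a real loop would also append the tool results here
    # Round cap reached: one last call in which tools are disabled.
    final = await backend.generate_with_tools(messages, [], tool_choice="none")
    return final.get("content") or "", max_tool_rounds + 1


tool_reply = {"content": None, "tool_calls": [{"id": "c1"}]}
backend = ScriptedBackend([tool_reply, tool_reply, {"content": "done"}])
answer, llm_calls = asyncio.run(bounded_loop(backend))
```

With two tool-calling replies followed by a plain answer, the stand-in sees two "auto" calls and one "none" call, matching max_llm_calls = max_tool_rounds + 1.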