Compare commits

..

1 Commits

Author SHA1 Message Date
hyungi 250896cdfa feat(eid): deep 모드 = ReAct 자동검색 + 근거 카드 (ds-eid-ask-absorb P1)
- deep 분기 _eid_chat_deep: 비생성 probe → phase:searching → agentic_ask_loop
  (tool_choice=auto 가 검색 여부 자율 판단, 검색 불요는 early-exit 대화) → final_answer
  + eid_sources envelope → DONE. heartbeat {phase:ping}(~10s, 프록시 idle timeout 차단)
  · mid-stream BackendUnavailable → in-stream error envelope · disconnect 시 task.cancel()
  + await(고아화·27B 점유 방지).
- daily = call_stream 무변경(맥미니 대화). deep = 맥북 27B ReAct (tool calling 27B 전용,
  맥미니 26B token-leak 미검증). 멀티턴 = 메시지 단독 처리(agentic_ask_loop query: str,
  history 2단계 백로그).
- EidEvidenceCard.svelte 접이식 근거 카드(sources 순서번호·제목·점수) + 프론트 SSE 파서
  확장(ping/searching/error/eid_sources) + 검색 중 표시 + 이력 보존.
- 테스트: deep 4건(검색성/대화성/probe-503/mid-stream-error) + 기존 call_stream 회귀 daily
  로 이전 = 29 passed.
- 동반(이전 eid-chat 세션 미커밋): /api/eid/status endpoint + llm_gate.gate_status +
  test_eid_status (채팅 대기 UI 의 '대기 vs 고장' 구분용, 5 passed).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:51:00 +09:00
16 changed files with 677 additions and 546 deletions
-44
View File
@@ -134,38 +134,6 @@ def _fix_json_string_escapes(s: str) -> str:
i += 1
return "".join(out)
def is_deferrable_error(exc: Exception) -> bool:
"""deep(맥북 M5 Max) 호출 실패가 '보류(StageDeferred)' 대상인지 분류 (ds-macbook-offload-1).
보류 = 맥북 일시 불가 신호:
- HTTP 503 (라우터 upstream_cold / editor_busy / warming — no-silent-fallback 계약)
- HTTP 502/504 (라우터가 upstream 연결 실패·생성 도중 절단을 502 로 변환 —
llm_router.py 실측 4곳. 맥북 sleep 절단이 라우터 경유 토폴로지에선 이걸로 표면화)
- httpx.TransportError 전계열 (ConnectError·ReadError·RemoteProtocolError +
ConnectTimeout·ReadTimeout 등) — 라우터 자체 불가 / DS↔라우터 구간 절단.
그 외(400/500, 파싱/검증 오류 등)는 보류가 아니라 호출자의 기존 실패 경로.
"""
if isinstance(exc, httpx.HTTPStatusError):
return exc.response.status_code in (502, 503, 504)
return isinstance(exc, httpx.TransportError)
async def call_deep_or_defer(client: "AIClient", prompt: str, system: str | None = None) -> str:
"""call_deep + 보류 변환 — 맥북 불가(503/연결/절단)는 StageDeferred 로 raise.
deep_summary_worker / summarize_worker(drain) 가 공유. StageDeferred 는 queue_consumer/
queue_drain 이 attempts 미소모 + deferred_until 백오프로 처리한다 (sleep-안전 불변식).
"""
from models.queue import StageDeferred
try:
return await client.call_deep(prompt, system=system)
except Exception as exc:
if is_deferrable_error(exc):
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
# 프롬프트 로딩
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
@@ -217,18 +185,6 @@ class AIClient:
"""triage/primary 실패 시 최후 방어선. Claude Sonnet 4 API (config.yaml ai.models.fallback) — PR #20 이후 swap 완료."""
return await self._request(self.ai.fallback, prompt)
async def call_deep(self, prompt: str, system: str | None = None) -> str:
"""심층 전용 — 맥북 M5 Max Qwen3.6-27B (config.yaml ai.models.deep, ds-macbook-offload-1).
llm-router :8890 경유(model=qwen-macbook alias) — 라우터의 wake preflight(~24s)·
editor_busy 가드를 재사용한다. 맥미니 mlx gate 와 무관(게이트는 맥미니 보호 목적)이라
gate 없이 호출. 자동 cloud/맥미니 폴백 없음 — 실패는 그대로 전파하고 보류 판단은
호출자가 is_deferrable_error() 로 한다. 슬롯 부재 시 primary 로 처리(방어적 —
호출자가 보통 슬롯 유무를 먼저 분기).
"""
cfg = self.ai.deep or self.ai.primary
return await self._request(cfg, prompt, system=system)
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str) -> dict:
+157 -4
View File
@@ -18,24 +18,58 @@ backend 실패는 /api/search/ask 와 동일 shape 의 503 + error_reason 매핑
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
from typing import Annotated, Literal
import httpx
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field, field_validator, model_validator
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from eid import compose as eid_compose
from eid.ai import EidAIClient
from models.user import User
from services.llm.backends import BackendUnavailable
from services.llm.backends import BackendUnavailable, _router_url, get_backend
from services.search import llm_gate
from services.search.react_loop import agentic_ask_loop
logger = setup_logger("eid_chat")
router = APIRouter()
# ── ds-eid-ask-absorb P1: deep 모드 = ReAct 자동검색 (qwen-macbook 27B) ──
# 비생성 reachability probe — router 도달만 확인(coarse). 27B(맥북) 자체 미가용은
# 첫 generate_with_tools 호출의 BackendUnavailable → mid-stream error envelope 로 커버
# (plan: probe 정밀도 불필요, TOCTOU 는 in-stream error 가 처리). ~2s 타임아웃·생성 슬롯 비점유.
_DEEP_PROBE_TIMEOUT = httpx.Timeout(connect=2.0, read=2.0, write=2.0, pool=2.0)
# heartbeat: ReAct 다회 tool call 시 수십초 무출력 → 프록시 idle timeout 차단.
# `{"phase":"ping"}` no-op 이벤트 (프론트 envelope 파서가 자연 스킵 — `: ping` comment 는
# POST SSE fetch 파서가 처리 보장 안 됨).
_HEARTBEAT_INTERVAL_S = 10.0
async def _probe_router_reachable() -> bool:
"""router(:8890) /v1/models GET — 도달 확인(비생성). 실패/비200 = 미가용."""
url = f"{_router_url().rstrip('/')}/v1/models"
try:
async with httpx.AsyncClient(timeout=_DEEP_PROBE_TIMEOUT) as client:
resp = await client.get(url)
return resp.status_code == 200
except Exception:
return False
def _sse(obj: dict) -> bytes:
"""SSE 이벤트 1건 — data: <json>\\n\\n. final_answer 는 OpenAI 호환 choices.delta.content
로, sources/phase 는 별 envelope 키로(프론트가 분기). model/usage 머신 메타 미포함."""
return b"data: " + json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n\n"
class ChatMessage(BaseModel):
"""채팅 턴 1건. role=system 은 Literal 밖 → 422 (system 합본은 서버 compose 만 주입)."""
@@ -71,16 +105,130 @@ class ChatRequest(BaseModel):
return self
@router.get("/status")
async def eid_status(
user: Annotated[User, Depends(get_current_user)],
):
"""이드 backend 점유 상태 스냅샷 — GET /api/eid/status (UI 의 "대기 vs 고장" 구분용).
daily(맥미니 MLX) 의 DS 프로세스 내부 llm_gate 점유만 본다 — 외부 소비자
(맥미니 자체 derived-worker·Hermes 등)의 endpoint 점유는 미포착.
따라서 busy=true 는 확실(지금 줄이 있다), false 는 근사(외부 점유 가능성 잔존).
가벼움 보장: DB 0 / LLM 0 / 본문 로깅 0 — 폴링 대상으로 안전.
자동 fallback 판단 근거로 쓰지 않는다 (모드 전환 = 명시 버튼만, 정책).
"""
snap = llm_gate.gate_status()
inflight = bool(snap["inflight"])
waiters = int(snap["waiters"])
return {
"daily": {
"busy": inflight or waiters > 0,
"inflight": inflight,
"waiters": waiters,
}
}
def _backend_unavailable_response(body: ChatRequest, reason: str, backend_name: str) -> JSONResponse:
"""스트림 시작 전 27B 미가용 → ask 컨벤션과 동일 shape 503 (자동 fallback 0)."""
logger.warning(
"eid_chat backend_unavailable mode=%s turns=%d status=503 reason=%s",
body.mode, len(body.messages), reason,
)
return JSONResponse(
status_code=503,
content={
"error": "backend_unavailable",
"error_reason": reason,
"backend_requested": backend_name,
"detail": (
"심층 엔진(검색)이 일시적으로 응답할 수 없습니다. "
"잠시 후 다시 시도하거나 일상 모드로 물어보세요."
),
},
)
async def _eid_chat_deep(body: ChatRequest, session: AsyncSession) -> StreamingResponse | JSONResponse:
"""deep 모드 = ReAct 자동검색. ReAct(`tool_choice=auto`)가 검색 여부를 LLM 자율 판단 —
검색 불요 질문은 early-exit 으로 대화 답변. substrate(persona+rules+react_ask task)는
agentic_ask_loop 내부 compose("react_ask") 가 주입(evidence-first 자동 상속).
멀티턴 = 1단계는 마지막 user 메시지 단독 처리(agentic_ask_loop 가 query: str — history
미지원). 후속 질문 대명사 해소는 2단계 백로그.
"""
# ① 첫 SSE 바이트(=HTTP 200 확정) 전 비생성 probe — router 도달 실패 시 503 (재매핑 가능 구간)
if not await _probe_router_reachable():
return _backend_unavailable_response(body, "macbook_unavailable", "qwen-macbook")
query = body.messages[-1].content # 메시지 단독 처리 (마지막 user 턴)
backend = get_backend("qwen-macbook")
async def _stream() -> AsyncIterator[bytes]:
# ② phase:searching 방출 = HTTP 200 확정. 이후 미가용은 503 불가 → in-stream error.
yield _sse({"phase": "searching"})
task = asyncio.create_task(agentic_ask_loop(session, query, backend=backend))
try:
# heartbeat: task 미완 동안 ~10s 마다 ping (shield 로 wait_for 취소가 task 안 죽임)
while not task.done():
try:
await asyncio.wait_for(asyncio.shield(task), timeout=_HEARTBEAT_INTERVAL_S)
except asyncio.TimeoutError:
yield _sse({"phase": "ping"})
result = task.result() # BackendUnavailable 은 여기서 raise (mid-stream)
# final_answer = OpenAI 호환 1청크(프론트 기존 content 누적 경로 재사용)
yield _sse({"choices": [{"delta": {"content": result.final_answer}}]})
# 근거 = 별 envelope (citation 번호 없음 — 프론트가 순서 기반). partial = 근거 부족 표식
yield _sse({"eid_sources": result.sources, "partial": result.partial})
yield b"data: [DONE]\n\n"
logger.info(
"eid_chat deep ok turns=%d sources=%d partial=%s iters=%d",
len(body.messages), len(result.sources), result.partial, result.iterations,
)
except BackendUnavailable as exc:
# mid-stream 미가용(검색 중 AC 분리·뚜껑 닫힘) — 200 이미 송신, in-stream error envelope.
# error 뒤 [DONE] = 프론트 sawDone 로 '중단' 오경보 방지(명시 error notice 유지).
logger.warning(
"eid_chat deep mid-stream unavailable turns=%d reason=%s",
len(body.messages), exc.reason,
)
yield _sse({"phase": "error", "error_reason": exc.reason})
yield b"data: [DONE]\n\n"
except asyncio.CancelledError:
raise # 클라 disconnect — finally 가 task 정리
except Exception:
logger.exception("eid_chat deep stream failed turns=%d", len(body.messages))
yield _sse({"phase": "error", "error_reason": "deep_failed"})
yield b"data: [DONE]\n\n"
finally:
# 클라 disconnect 시 ReAct task 고아화 방지 — cancel + await(전파 완료 보장).
# 안 하면 27B 가 닫힌 연결 위해 수분 점유, router 동시성상 다음 검색 대기.
if not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
return StreamingResponse(
_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-store", "X-Accel-Buffering": "no"},
)
@router.post("/chat")
async def eid_chat(
body: ChatRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""이드 채팅 — router SSE 스트리밍 pass-through.
"""이드 채팅 — daily = router SSE pass-through(대화) / deep = ReAct 자동검색(근거).
503 경로 (둘 다 자동 fallback 없음):
503 경로 (모두 자동 fallback 없음):
- substrate_degraded: rules.md 부재 (D-6 fail-closed, 채팅 진행 거부)
- backend_unavailable: 스트림 시작 전 backend 실패 (ask 컨벤션과 동일 shape)
- backend_unavailable: 스트림 시작 전 backend 실패 (daily/deep 공통, ask 컨벤션 shape)
"""
# D-6: rules 부재 = fail-closed. 채팅은 안전·정책 가드 없이 진행하지 않는다(배너 X).
if not eid_compose.rules_present():
@@ -99,6 +247,11 @@ async def eid_chat(
},
)
# deep = ReAct 자동검색 (별 흐름 — probe + 동기 ReAct → SSE 변환)
if body.mode == "deep":
return await _eid_chat_deep(body, session)
# daily = 순수 대화 SSE pass-through (기존)
system = eid_compose.compose("eid_chat", task="")
client = EidAIClient()
stream = client.call_stream(
-5
View File
@@ -98,10 +98,6 @@ class AIConfig(BaseModel):
classifier: AIModelConfig | None = None
# Phase 3.5b: semantic verifier (optional — 없으면 grounding-only). PR #20 이후 Mac mini 26B MLX endpoint (initial = exaone3.5).
verifier: AIModelConfig | None = None
# ds-macbook-offload-1: 심층 전용 슬롯 (optional). 맥북 M5 Max Qwen3.6-27B — llm-router :8890
# 경유(model=qwen-macbook alias, wake preflight 재사용). 부재 시 deep_summary 는 기존
# primary(맥미니 26B) 경로 그대로 = 기능 미활성. 명시 opt-in — silent fallback 없음.
deep: AIModelConfig | None = None
# Legacy: vision 슬롯 (현재 사용처 0 — Document Server 는 OCR/STT 별도 서비스).
# 제거 진행 중이므로 optional 로 관대한 로딩 유지.
vision: AIModelConfig | None = None
@@ -222,7 +218,6 @@ def load_settings() -> Settings:
verifier=(
AIModelConfig(**models["verifier"]) if "verifier" in models else None
),
deep=(AIModelConfig(**models["deep"]) if "deep" in models else None),
deep_summary_backlog=DeepSummaryBacklogConfig(
**ai_raw.get("deep_summary_backlog", {})
),
+1 -28
View File
@@ -2,41 +2,14 @@
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, func, or_, text
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.types import TIMESTAMP
from core.database import Base
class StageDeferred(Exception):
"""워커가 '지금은 처리 불가 — 자료 손상 없이 보류' 를 선언하는 신호 (ds-macbook-offload-1).
맥북(M5 Max) deep 슬롯 경로 전용: 503(upstream_cold/editor_busy/warming) · 연결 실패 ·
생성 중 절단(read-timeout, 맥북 sleep) 시 raise. queue_consumer/queue_drain 이 attempts 를
소모하지 않고 pending 복귀 + payload.deferred_until 백오프를 기록한다. 결과 쓰기는 호출
완주 + 파싱 성공 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0 (sleep-안전 불변식).
"""
def __init__(self, reason: str, retry_after_minutes: int = 30):
super().__init__(reason)
self.retry_after_minutes = retry_after_minutes
def not_deferred_condition():
"""보류 백오프(payload.deferred_until, ISO 문자열) 가 미래인 행을 claim 에서 제외.
payload 없음 / 키 없음 = 통과. queue_consumer 와 queue_drain 의 claim 이 공유한다.
"""
deferred = ProcessingQueue.payload["deferred_until"].astext
return or_(
deferred.is_(None),
deferred.cast(TIMESTAMP(timezone=True)) <= func.now(),
)
class ProcessingQueue(Base):
__tablename__ = "processing_queue"
+8
View File
@@ -222,6 +222,14 @@ def get_mlx_gate():
return acquire_mlx_gate(DEFAULT_PRIORITY)
# ── Read-only status (UI 표시용) ─────────────────────────────────────────────
def gate_status() -> dict:
"""현재 gate 점유 스냅샷 (read-only, lock-free 근사치 — UI 표시용)."""
return {"inflight": _inflight, "waiters": len(_waiters)}
# ── Test helpers (conftest reset) ────────────────────────────────────────────
+7 -21
View File
@@ -20,12 +20,12 @@ from sqlalchemy.ext.asyncio import AsyncSession
import json
import re
from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking
from ai.client import AIClient, parse_json_response, strip_thinking
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue, StageDeferred
from models.queue import ProcessingQueue
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate
@@ -101,30 +101,17 @@ async def process(document_id: int, session: AsyncSession) -> None:
)
client = AIClient()
# ds-macbook-offload-1: deep 슬롯 구성 시 맥북 M5 Max 경유(라우터). 부재 시 기존 경로 그대로.
deep_cfg = client.ai.deep
used_cfg = deep_cfg or settings.ai.primary
latency_ms = 0
parse_error: str | None = None
deep_out = DeepSummaryOutput()
try:
start = time.perf_counter()
if deep_cfg is not None:
# 맥북 경유 — 맥미니 mlx gate 미점유(게이트는 맥미니 보호 목적). 맥북 불가
# (503/연결/생성 중 sleep 절단)는 StageDeferred = 보류, 맥미니 강등 없음.
# doc 쓰기는 완주+파싱 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
raw = await call_deep_or_defer(client, prompt)
else:
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
raw = await client.call_primary(prompt)
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
raw = await client.call_primary(prompt)
latency_ms = int((time.perf_counter() - start) * 1000)
except StageDeferred:
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), consumer 가 백오프 기록.
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
raise
except Exception as exc:
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
logger.warning(f"[deep] 26B 호출 실패 id={document_id}: {exc}")
parse_error = "call_failed"
raw = ""
finally:
@@ -160,13 +147,12 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc_id=document_id,
user_id=None,
mode="summary_deep",
text_limit=used_cfg.context_char_limit or 260000,
text_limit=settings.ai.primary.context_char_limit or 260000,
truncated=False,
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
# deep 슬롯 사용 시 실처리 모델(qwen-macbook alias) 기록 — 어느 머신이 처리했는지 추적
model_name=used_cfg.model,
model_name=settings.ai.primary.model,
prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
error_code=parse_error,
source="document_server",
+2 -23
View File
@@ -15,7 +15,7 @@ from sqlalchemy.orm import aliased
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
from models.queue import ProcessingQueue, enqueue_stage
logger = setup_logger("queue_consumer")
@@ -216,14 +216,13 @@ async def _process_stage(stage, worker_fn):
"""
batch_size = BATCH_SIZE.get(stage, 3)
# pending 항목 조회 (보류 백오프 deferred_until 미래 항목 제외 — ds-macbook-offload-1)
# pending 항목 조회
async with async_session() as session:
result = await session.execute(
select(ProcessingQueue.id, ProcessingQueue.document_id)
.where(
ProcessingQueue.stage == stage,
ProcessingQueue.status == "pending",
not_deferred_condition(),
)
.order_by(ProcessingQueue.created_at)
.limit(batch_size)
@@ -277,26 +276,6 @@ async def _process_stage(stage, worker_fn):
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} 완료")
except StageDeferred as defer:
# 보류 (ds-macbook-offload-1): 맥북 일시 불가(sleep/cold/editor_busy) — 실패 아님.
# attempts 는 claim 시 선증가분을 반환(미소모)하고 deferred_until 백오프 후 자연 재개.
# 워커는 완주 전 doc 쓰기를 하지 않으므로 이 시점의 데이터 변경 = 0 (sleep-안전).
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item:
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
continue
item.status = "pending"
item.started_at = None
item.attempts = max(0, item.attempts - 1)
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
await session.commit()
logger.info(
f"[{stage}] document_id={document_id} 보류({defer}) — "
f"{defer.retry_after_minutes}분 후 재개"
)
except Exception as e:
# 실패 처리
async with async_session() as session:
-183
View File
@@ -1,183 +0,0 @@
"""수동 burst-drain CLI — 맥미니 백로그를 사용자가 의도적으로 맥북(M5 Max)으로 소화.
ds-macbook-offload-1 P2-3. 운영 패턴 = csb_collector --bulk 동일 (컨테이너 실행,
장기 배치 fastapi 재생성 = in-flight 절단이지만 멱등 재실행으로 무손실).
docker compose exec fastapi python -m workers.queue_drain --stage summarize --limit 200
설계 원칙:
- deep 슬롯(config.yaml ai.models.deep) 필수 부재 명시 종료 (silent 강등 금지)
- claim = FOR UPDATE SKIP LOCKED 단건 전이 consumer(1 주기) 이중처리 0
- per-item 커밋 = sleep-안전: 중단돼도 완료분 무손상, 진행 1건만 stale recovery
(10) pending 복귀. 재실행 멱등 (summarize ai_summary 존재 skip)
- 보류(StageDeferred = 맥북 sleep/cold/editor_busy/네트워크 플랩): attempts 반환 +
deferred_until 백오프 기록. 연속 보류 --defer-retries(기본 5)회까지 --defer-wait
(기본 120s) 간격 재시도( 단위 플랩 흡수), 한도 도달 = sleep 판정으로 run 종료
불가 상태의 맥북을 계속 두드리지 않는다
- 폴백 0: 맥미니/cloud 강등 없음
"""
import argparse
import asyncio
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition
logger = setup_logger("queue_drain")
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 (consumer 도 deep 슬롯 시 맥북 경유).
# classify 는 triage 경량 호출이라 맥미니 적합 — 대상에서 제외 (plan Q-4).
DRAIN_STAGES = ("summarize", "deep_summary")
async def _claim_one(stage: str) -> tuple[int, int] | None:
"""pending 1건을 processing 으로 원자 전이 (SKIP LOCKED — consumer 와 경합 안전)."""
async with async_session() as session:
item = (await session.execute(
select(ProcessingQueue)
.where(
ProcessingQueue.stage == stage,
ProcessingQueue.status == "pending",
not_deferred_condition(),
)
.order_by(ProcessingQueue.created_at)
.limit(1)
.with_for_update(skip_locked=True)
)).scalar_one_or_none()
if item is None:
return None
item.status = "processing"
item.started_at = datetime.now(timezone.utc)
item.attempts += 1
claimed = (item.id, item.document_id)
await session.commit()
return claimed
async def _mark_completed(queue_id: int) -> None:
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if item:
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await session.commit()
async def _mark_deferred(queue_id: int, defer: StageDeferred) -> None:
"""보류: attempts 반환(미소모) + deferred_until 백오프 — consumer 의 처리와 동형."""
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if item:
item.status = "pending"
item.started_at = None
item.attempts = max(0, item.attempts - 1)
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
await session.commit()
async def _mark_failed(queue_id: int, exc: Exception) -> None:
"""실패: consumer 와 동일 재시도 정책 (attempts >= max → failed, 아니면 pending 복귀)."""
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if item:
err_text = str(exc) or repr(exc) or type(exc).__name__
item.error_message = err_text[:500]
if item.attempts >= item.max_attempts:
item.status = "failed"
else:
item.status = "pending"
item.started_at = None
await session.commit()
async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None:
if stage not in DRAIN_STAGES:
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)")
if settings.ai.deep is None:
raise SystemExit(
"config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음"
" (맥미니로의 silent 강등 금지)"
)
from workers.deep_summary_worker import process as deep_summary_process
from workers.summarize_worker import process as summarize_process
done = failed = 0
deferred = False
consecutive_defers = 0
while done + failed < limit:
claimed = await _claim_one(stage)
if claimed is None:
logger.info(f"[drain:{stage}] pending 소진 — 종료")
break
queue_id, document_id = claimed
try:
async with async_session() as worker_session:
if stage == "summarize":
await summarize_process(document_id, worker_session, use_deep=True)
else:
# deep_summary 는 deep 슬롯 구성 시 워커가 자체적으로 맥북 경유
await deep_summary_process(document_id, worker_session)
await worker_session.commit()
await _mark_completed(queue_id)
done += 1
consecutive_defers = 0
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
except StageDeferred as defer:
# 일시 불가는 종류가 둘: 진짜 sleep(장시간) vs 일시 네트워크 플랩(분 단위 —
# 2026-06-11 실측: Tailscale direct 경로 ~10분 플랩으로 32/300 조기 종료).
# 연속 보류 한도까지 대기 후 재시도해 플랩을 흡수, 한도 도달 시 종료(sleep 판정).
await _mark_deferred(queue_id, defer)
consecutive_defers += 1
if consecutive_defers >= defer_retries:
deferred = True
logger.warning(
f"[drain:{stage}] doc={document_id} 맥북 불가({defer}) — 연속 보류 "
f"{consecutive_defers}회 한도 도달, run 종료. 맥북 깨운 뒤(또는 "
f"{defer.retry_after_minutes}분 후) 재실행"
)
break
logger.warning(
f"[drain:{stage}] doc={document_id} 맥북 일시 불가({defer}) — "
f"{defer_wait}s 대기 후 재시도 ({consecutive_defers}/{defer_retries})"
)
await asyncio.sleep(defer_wait)
except Exception as exc:
await _mark_failed(queue_id, exc)
failed += 1
logger.error(f"[drain:{stage}] doc={document_id} 실패: {exc}")
# 종료 요약 (잔여 = 지금 시점 pending 수)
async with async_session() as session:
from sqlalchemy import func as sa_func
remaining = (await session.execute(
select(sa_func.count()).select_from(ProcessingQueue).where(
ProcessingQueue.stage == stage, ProcessingQueue.status == "pending",
)
)).scalar_one()
logger.info(
f"[drain:{stage}] 요약 — 완료 {done} · 실패 {failed} · "
f"보류종료 {'' if deferred else '아니오'} · 잔여 pending {remaining}"
)
def main() -> None:
parser = argparse.ArgumentParser(description="맥북(M5 Max) burst-drain — 수동 백로그 분담 레버")
parser.add_argument("--stage", required=True, choices=DRAIN_STAGES)
parser.add_argument("--limit", type=int, default=50, help="이번 run 최대 처리 건수 (기본 50)")
parser.add_argument("--defer-retries", type=int, default=5,
help="연속 보류 허용 횟수 — 네트워크 플랩 흡수 (기본 5, 한도 도달 시 종료)")
parser.add_argument("--defer-wait", type=int, default=120,
help="보류 재시도 간 대기 초 (기본 120)")
args = parser.parse_args()
asyncio.run(drain(args.stage, args.limit, args.defer_retries, args.defer_wait))
if __name__ == "__main__":
main()
+7 -35
View File
@@ -2,37 +2,27 @@
P3 of family-adaptive-bengio (2026-05-23): 50k 초과 input sliding window
(cumulative carry-over) 분할 처리. 50k 이하 input 기존 동작 유지.
ds-macbook-offload-1: use_deep=True (queue_drain 전용) 맥북 M5 Max deep 슬롯으로
호출 맥미니 백로그를 사용자가 의도적으로 분담시키는 수동 레버. 기본(consumer) 경로는
use_deep=False 기존 동작 그대로. 맥북 불가 StageDeferred (강등 0, 부분 쓰기 0).
"""
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, call_deep_or_defer, strip_thinking
from ai.client import AIClient, strip_thinking
from core.utils import setup_logger
from models.document import Document
logger = setup_logger("summarize_worker")
CHUNK_SIZE = 50000
# client.summarize() 의 단일 프롬프트와 동일 문구 — deep 경로가 같은 과업을 수행하도록 고정
SUMMARY_PROMPT_SINGLE = "다음 문서를 500자 이내로 요약해주세요:\n\n{text}"
SUMMARY_PROMPT_CONTINUATION = (
"이전 부분 요약:\n{prior}\n\n다음 부분:\n{text}\n\n"
"위 두 정보를 합쳐 전체 문서를 500자 이내로 요약해주세요."
)
async def process(document_id: int, session: AsyncSession, *, use_deep: bool = False) -> None:
"""문서 AI 요약 생성 (분류 없이 요약만).
use_deep: queue_drain 전용 deep 슬롯(맥북) 경유. 슬롯 미구성 명시 에러
(silent 강등 금지). consumer 기본 경로는 False (기존 동작 무변경).
"""
async def process(document_id: int, session: AsyncSession) -> None:
"""문서 AI 요약 생성 (분류 없이 요약만)"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
@@ -45,29 +35,13 @@ async def process(document_id: int, session: AsyncSession, *, use_deep: bool = F
return
client = AIClient()
if use_deep and client.ai.deep is None:
await client.close()
raise ValueError("use_deep=True 인데 config.yaml ai.models.deep 슬롯 미구성 — silent 강등 금지")
used_cfg = client.ai.deep if use_deep else client.ai.primary
async def _summarize_first(text_part: str) -> str:
if use_deep:
return await call_deep_or_defer(client, SUMMARY_PROMPT_SINGLE.format(text=text_part))
return await client.summarize(text_part)
async def _summarize_continuation(prompt: str) -> str:
if use_deep:
return await call_deep_or_defer(client, prompt)
return await client.call_primary(prompt)
try:
text = doc.extracted_text
total_chars = len(text)
if total_chars <= CHUNK_SIZE:
summary = await _summarize_first(text)
summary = await client.summarize(text)
logger.info(
f"[요약] document_id={document_id}: single chunk ({total_chars}자)"
+ (" via deep(맥북)" if use_deep else "")
)
else:
chunks = [text[i:i + CHUNK_SIZE] for i in range(0, total_chars, CHUNK_SIZE)]
@@ -78,10 +52,10 @@ async def process(document_id: int, session: AsyncSession, *, use_deep: bool = F
carry = ""
for idx, chunk in enumerate(chunks):
if idx == 0:
partial = await _summarize_first(chunk)
partial = await client.summarize(chunk)
else:
prompt = SUMMARY_PROMPT_CONTINUATION.format(prior=carry, text=chunk)
partial = await _summarize_continuation(prompt)
partial = await client.call_primary(prompt)
carry = strip_thinking(partial)
logger.info(
f"[요약] document_id={document_id}: chunk {idx + 1}/{len(chunks)} done "
@@ -89,10 +63,8 @@ async def process(document_id: int, session: AsyncSession, *, use_deep: bool = F
)
summary = carry
# sleep-안전 불변식: 쓰기는 전체 완주 후에만 — 중간 절단은 StageDeferred 로 빠져
# 이 지점에 도달하지 않는다 (carry 는 로컬 변수, doc 무변경).
doc.ai_summary = strip_thinking(summary)
doc.ai_model_version = used_cfg.model
doc.ai_model_version = client.ai.primary.model
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
f"[요약] document_id={document_id}: {len(doc.ai_summary)}자 final"
@@ -0,0 +1,31 @@
<!--
EidEvidenceCard — 이드 채팅 deep(검색) 답변의 근거 카드 (ds-eid-ask-absorb P1).
ReactResult.sources = {id, doc_id, title, score} (citation 번호 n 없음 — /ask 의 Citation 과
다름) → 순서 기반 번호([1],[2]...). 1단계 카드 = 제목·출처·점수 (스니펫은 react_loop
_result_payload items_src 에 없음 — 2단계 후보). 접이식 <details> 로 채팅 흐름 보존.
디자인 토큰만 (CLAUDE.md lint:tokens).
-->
<script lang="ts">
type EidSource = { id?: number; doc_id?: number; title?: string; score?: number };
let { sources, partial = false }: { sources: EidSource[]; partial?: boolean } = $props();
</script>
{#if sources.length}
<details class="mt-2 rounded-lg border border-default bg-surface text-xs max-w-[85%] sm:max-w-[75%]">
<summary class="cursor-pointer px-3 py-2 text-dim hover:text-text select-none font-semibold">
근거 {sources.length}{partial ? ' · 부분 답변 (확정 근거 부족)' : ''}
</summary>
<ul class="px-3 pb-2.5 flex flex-col gap-1.5">
{#each sources as src, i (src.id ?? i)}
<li class="flex items-start gap-2">
<span class="text-accent font-bold shrink-0">[{i + 1}]</span>
<span class="flex-1 min-w-0 text-text break-words">{src.title || `문서 ${src.doc_id ?? '?'}`}</span>
{#if typeof src.score === 'number'}
<span class="text-faint shrink-0 tabular-nums">{src.score.toFixed(2)}</span>
{/if}
</li>
{/each}
</ul>
</details>
{/if}
+194 -10
View File
@@ -17,6 +17,12 @@
macbook_unavailable / substrate_degraded / 기타 detail). 자동 fallback
금지 — 다른 모드로 자동 전환하지 않는다. 스트림 도중 중단 = 받은 부분
유지 + 표시.
- 대기 표시(첫 바이트 전): 경과 타이머 1초 갱신 + 3초 후 GET /api/eid/status
1회·이후 8초 간격 재조회(실패는 조용히 무시 — 기능 비차단)로 "대기"와
"고장"을 정직하게 구분. daily.busy=true 면 줄 서는 중 안내. 15초 경과 +
daily 모드면 [심층으로 전환]/[취소] 버튼 노출 — 전환은 명시 클릭만
(자동 fallback 금지 정책 위반 아님). 첫 바이트 도착/스트림 종료 시
타이머·폴링 즉시 정리.
- 이력: localStorage `eid_chat:v1` (키 상수는 $lib/eidChat — logout 시 제거와 공유).
전송 payload 는 마지막 20턴(40 messages) cap.
- 입력 한도: 메시지당 8,000자 클라 선차단(서버 422 검증과 동일 한도).
@@ -25,15 +31,25 @@
-->
<script lang="ts">
import { onMount, onDestroy } from 'svelte';
import { apiFetchRaw } from '$lib/api';
import { api, apiFetchRaw } from '$lib/api';
import { EID_CHAT_STORAGE_KEY } from '$lib/eidChat';
import Button from '$lib/components/ui/Button.svelte';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import EidEvidenceCard from '$lib/components/eid/EidEvidenceCard.svelte';
import { MessageCircle, SendHorizontal, RotateCcw, AlertCircle } from 'lucide-svelte';
type ChatMode = 'daily' | 'deep';
type ChatMessage = { role: 'user' | 'assistant'; content: string };
// deep(검색) 답변은 sources(근거)·partial 동반. daily 답변은 없음.
type EidSource = { id?: number; doc_id?: number; title?: string; score?: number };
type ChatMessage = {
role: 'user' | 'assistant';
content: string;
sources?: EidSource[];
partial?: boolean;
};
type Notice = { kind: 'warn' | 'error'; message: string; retryable: boolean };
// GET /api/eid/status 응답 — 대기 중 바쁨 신호 조회에 필요한 필드만 좁게 정의
type EidStatus = { daily?: { busy?: boolean } };
// 이력 키 — logout(stores/auth.ts) 의 이력 제거와 단일 상수 공유
const STORAGE_KEY = EID_CHAT_STORAGE_KEY;
@@ -45,6 +61,10 @@
const MAX_MESSAGE_CHARS = 8000;
// 한도 근접 카운터 노출 시작점
const COUNTER_THRESHOLD = 7500;
// 대기 표시(첫 바이트 전): 상태 폴링 시작 시점(초) / 재조회 간격(초) / 행동 버튼 노출 시점(초)
const STATUS_POLL_START_SEC = 3;
const STATUS_POLL_INTERVAL_SEC = 8;
const WAIT_ACTIONS_SEC = 15;
const DEEP_CAPTION =
'장문·무거운 질문에 적합 — 잠들어 있으면 자동 기동 (처음 응답까지 최대 ~1분)';
@@ -64,11 +84,72 @@
let streaming = $state(false);
let streamingText = $state('');
let notice = $state<Notice | null>(null);
// deep(검색) 모드 첫 바이트 전 단계 — 'searching' 이면 대기 표시를 "근거 검색 중"으로
let deepPhase = $state<'searching' | null>(null);
let scrollEl: HTMLDivElement | undefined = $state();
let textareaEl: HTMLTextAreaElement | undefined = $state();
let abortCtrl: AbortController | null = null;
// ── 대기 추적 (첫 바이트 전) ────────────────────────
// 경과 초 + daily 엔진 바쁨 여부(null = 미확인). 토큰(세대 카운터)으로
// 스트림별 소유를 구분 — abort 직후 즉시 재전송(심층 전환) 경로에서
// 이전 스트림의 늦은 정리가 새 스트림의 타이머를 죽이지 않게 한다.
let waitSeconds = $state(0);
let dailyBusy = $state<boolean | null>(null);
let waitIntervalId: ReturnType<typeof setInterval> | null = null;
let waitTokenSeq = 0;
let waitToken = 0; // 현재 활성 추적 토큰 (0 = 추적 없음)
function startWaitTracking(streamMode: ChatMode): number {
// 이전 추적 잔여 정리 (전환 재전송처럼 stop 전에 start 가 오는 경로 방어)
if (waitIntervalId !== null) {
clearInterval(waitIntervalId);
waitIntervalId = null;
}
const token = ++waitTokenSeq;
waitToken = token;
waitSeconds = 0;
dailyBusy = null;
waitIntervalId = setInterval(() => {
if (waitToken !== token) return; // 정리 누락 방어 — 무해 no-op
waitSeconds += 1;
// 바쁨 신호 폴링: 3초 경과 시 1회 + 이후 8초 간격 (3, 11, 19, ...).
// daily 모드 전용 — deep 대기는 기존 wake 안내 + 경과 타이머만.
if (
streamMode === 'daily' &&
waitSeconds >= STATUS_POLL_START_SEC &&
(waitSeconds - STATUS_POLL_START_SEC) % STATUS_POLL_INTERVAL_SEC === 0
) {
void pollEidStatus(token);
}
}, 1000);
return token;
}
// token 가드: 본인 소유 추적만 정리 — 다른 스트림이 이어받았으면 no-op
function stopWaitTracking(token: number) {
if (token !== waitToken) return;
waitToken = 0;
if (waitIntervalId !== null) {
clearInterval(waitIntervalId);
waitIntervalId = null;
}
waitSeconds = 0;
dailyBusy = null;
}
// 상태 조회 — 실패는 조용히 무시 (대기 표시는 타이머만으로 유지, 기능 비차단)
async function pollEidStatus(token: number) {
try {
const status = await api<EidStatus>('/eid/status');
if (token !== waitToken) return; // 스트림 종료/교체 후 도착한 늦은 응답 폐기
dailyBusy = status?.daily?.busy === true;
} catch {
// 무시 — 바쁨 신호는 부가 정보일 뿐 채팅 기능을 차단하지 않는다
}
}
// ── localStorage 이력 ───────────────────────────────
function persist() {
if (typeof window === 'undefined') return;
@@ -97,9 +178,15 @@
typeof (m as ChatMessage).content === 'string'
)
// 배열 크기 가드 + content 8,000자 clamp — 외부에서 손상/비대해진
// 이력이 전송 payload 를 오염시키지 않도록 복원 시점에 정규화
// 이력이 전송 payload 를 오염시키지 않도록 복원 시점에 정규화.
// sources/partial(deep 답변 근거)은 보존 — 전송 payload 엔 안 실림(runStream map 이 role/content 만).
.slice(-MAX_STORED_MESSAGES)
.map((m) => ({ role: m.role, content: m.content.slice(0, MAX_MESSAGE_CHARS) }));
.map((m) => ({
role: m.role,
content: m.content.slice(0, MAX_MESSAGE_CHARS),
sources: Array.isArray((m as ChatMessage).sources) ? (m as ChatMessage).sources : undefined,
partial: (m as ChatMessage).partial === true || undefined,
}));
}
} catch {
// 손상된 이력은 무시 (새 대화로 시작)
@@ -107,7 +194,11 @@
}
onMount(() => restore());
onDestroy(() => abortCtrl?.abort());
onDestroy(() => {
abortCtrl?.abort();
// 페이지 이탈 시 대기 타이머/폴링 정리 (abort 의 finally 와 이중이어도 무해)
if (waitIntervalId !== null) clearInterval(waitIntervalId);
});
// ── 자동 스크롤 (새 메시지 / 스트림 청크마다 하단 고정) ──
$effect(() => {
@@ -235,12 +326,39 @@
void runStream();
}
// ── 대기 중 행동 버튼 (daily + 15초 경과) ────────────
// [심층으로 전환] — 명시 클릭에 의한 모드 전환 (자동 fallback 금지 정책
// 위반 아님). 현재 fetch abort → 같은 user 턴을 mode=deep 으로 즉시 재전송.
// abort 된 이전 스트림의 finally 는 abortCtrl 비교 + 대기 token 가드로
// 새 스트림 상태를 건드리지 않는다 (새 대화 abort race 가드와 동일 구조).
function switchToDeep() {
if (!streaming || mode !== 'daily') return;
mode = 'deep'; // 모드 토글 상태도 deep 으로 갱신
abortCtrl?.abort();
void runStream();
}
// [취소] — abort 후 방금 push 한 user 턴 pop + 입력창 본문 복원
// (422 처리와 동일 패턴: 이력 오염 차단 + localStorage 재저장).
// placeholder 제거는 abort 된 스트림의 finally(streaming=false)가 처리.
function cancelWait() {
if (!streaming) return;
abortCtrl?.abort();
if (messages.length > 0 && messages[messages.length - 1].role === 'user') {
const popped = messages.pop();
if (popped && !input) input = popped.content;
persist();
}
}
async function runStream() {
notice = null;
streaming = true;
streamingText = '';
const ctrl = new AbortController();
abortCtrl = ctrl;
// 첫 바이트 전 대기 추적 시작 — 본 스트림 소유 토큰으로 정리 시점 제어
const waitTok = startWaitTracking(mode);
const payload = {
mode,
@@ -251,6 +369,9 @@
let acc = '';
let sawDone = false;
// deep(검색) 답변 동반 데이터 — daily 는 안 옴
let accSources: EidSource[] = [];
let accPartial = false;
try {
const res = await apiFetchRaw('/eid/chat', {
@@ -301,9 +422,35 @@
try {
const obj = JSON.parse(data) as {
choices?: Array<{ delta?: { content?: unknown } }>;
phase?: string;
error_reason?: string;
eid_sources?: EidSource[];
partial?: boolean;
};
// deep(검색) envelope 분기 — daily 응답엔 없음
if (obj?.phase === 'ping') return false; // heartbeat — 무시
if (obj?.phase === 'searching') {
deepPhase = 'searching'; // 대기 표시를 "근거 검색 중"으로
return false;
}
if (obj?.phase === 'error') {
// in-stream 미가용/실패 — 받은 부분 유지 + 명시 표시 (자동 fallback 0).
// 뒤따르는 [DONE] 이 sawDone 처리하므로 '중단' 오경보 없음.
notice = mapErrorReason(obj.error_reason, '');
return false;
}
if (Array.isArray(obj?.eid_sources)) {
accSources = obj.eid_sources;
accPartial = obj.partial === true;
return false;
}
const piece = obj?.choices?.[0]?.delta?.content;
if (typeof piece === 'string' && piece) {
// 첫 바이트 도착 — 대기 타이머/폴링 제거, 기존 스트리밍 표시로 전환
if (!acc) {
stopWaitTracking(waitTok);
deepPhase = null;
}
acc += piece;
streamingText = acc;
}
@@ -356,7 +503,7 @@
}
} catch (err) {
if ((err as Error)?.name === 'AbortError') {
// 새 대화 등 사용자 의도 중단 — 안내 불필요
// 새 대화 / 대기 취소 / 심층 전환 등 사용자 의도 중단 — 안내 불필요
return;
}
// 스트림 도중 네트워크 에러 — 받은 부분 유지 + 표시
@@ -368,14 +515,23 @@
}
: { kind: 'error', message: '요청에 실패했습니다 — 네트워크를 확인하세요.', retryable: true };
} finally {
// 스트림 종료 — 대기 타이머/폴링 정리. 첫 바이트에서 이미 정리됐거나
// 전환 재전송으로 새 스트림이 추적을 이어받았으면 token 가드로 no-op.
stopWaitTracking(waitTok);
// abort(새 대화/페이지 이탈) 시에는 push 하지 않음 — 새 대화로 비운
// messages 에 이전 스트림 잔여분이 흘러들어가는 race 방지.
if (acc && !ctrl.signal.aborted) {
messages.push({ role: 'assistant', content: acc });
messages.push({
role: 'assistant',
content: acc,
sources: accSources.length ? accSources : undefined,
partial: accPartial || undefined,
});
}
if (abortCtrl === ctrl) {
streaming = false;
streamingText = '';
deepPhase = null;
abortCtrl = null;
}
persist();
@@ -398,6 +554,24 @@
// 입력 길이(전송 기준 = trim 후) — 7,500자부터 카운터 노출, 8,000자 초과 차단
let inputLength = $derived(input.trim().length);
let overLimit = $derived(inputLength > MAX_MESSAGE_CHARS);
// 첫 바이트 전 placeholder 문구 — "대기"와 "고장"의 정직한 구분:
// 바쁨 확인 = 줄 서는 중 / 비-바쁨 확인 = 생성 준비 중 / 미확인 = 응답 대기 중.
// deep 모드는 폴링하지 않으므로 항상 미확인(타이머만) — wake 안내는 헤더 caption.
let waitPlaceholder = $derived(
deepPhase === 'searching'
? `이드가 문서·뉴스에서 근거를 찾는 중 · ${waitSeconds}초`
: dailyBusy === true
? `엔진이 다른 작업을 처리하고 있어요 — 차례가 오면 바로 시작됩니다 (대기 ${waitSeconds}초)`
: dailyBusy === false
? `응답 생성 준비 중 · ${waitSeconds}초`
: `응답 대기 중 · ${waitSeconds}초`
);
// 행동 버튼 노출: daily 모드 + 첫 바이트 전 + 15초 경과
let showWaitActions = $derived(
streaming && !streamingText && mode === 'daily' && waitSeconds >= WAIT_ACTIONS_SEC
);
</script>
<svelte:head>
@@ -473,25 +647,35 @@
</div>
</div>
{:else}
<div class="flex justify-start">
<div class="flex flex-col items-start">
<div class="max-w-[85%] sm:max-w-[75%] px-3.5 py-2.5 rounded-lg rounded-bl-sm bg-surface border border-default text-text text-sm whitespace-pre-wrap break-words">
{msg.content}
</div>
{#if msg.sources?.length}
<EidEvidenceCard sources={msg.sources} partial={msg.partial ?? false} />
{/if}
</div>
{/if}
{/each}
<!-- 스트리밍 중 assistant 부분 응답 -->
<!-- 스트리밍 중 assistant 부분 응답 / 첫 바이트 전 대기 표시 -->
{#if streaming}
<div class="flex justify-start">
<div class="max-w-[85%] sm:max-w-[75%] px-3.5 py-2.5 rounded-lg rounded-bl-sm bg-surface border border-default text-text text-sm whitespace-pre-wrap break-words">
{#if streamingText}
{streamingText}<span class="inline-block w-1.5 h-3.5 ml-0.5 align-middle bg-accent animate-pulse rounded-sm"></span>
{:else}
<span class="text-dim animate-pulse">응답 준비 중...</span>
<span class="text-dim animate-pulse">{waitPlaceholder}</span>
{/if}
</div>
</div>
<!-- 대기 행동 버튼: daily + 15초 경과 — 전환은 명시 클릭만 (자동 fallback 금지) -->
{#if showWaitActions}
<div class="flex justify-start gap-2">
<Button variant="secondary" size="sm" onclick={switchToDeep}>심층으로 전환</Button>
<Button variant="ghost" size="sm" onclick={cancelWait}>취소</Button>
</div>
{/if}
{/if}
<!-- 에러/안내 카드: 자동 fallback 없이 명시 표시만 -->
+152
View File
@@ -0,0 +1,152 @@
"""POST /api/eid/chat mode=deep — ReAct 자동검색 SSE 변환 (ds-eid-ask-absorb P1).
DB·LLM 0: get_session/get_current_user dependency override, probe·agentic_ask_loop·
get_backend monkeypatch. 실제 검색·27B 호출 없음.
검증: 검색성phase:searching+content+eid_sources+DONE / probe 실패503 /
mid-stream BackendUnavailablein-stream error envelope / 대화성sources .
"""
from __future__ import annotations
import json
import sys
import types
from pathlib import Path
import pytest
import pytest_asyncio
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "app"))
import api.eid_chat as eid_chat # noqa: E402
from api.eid_chat import router as eid_chat_router # noqa: E402
from core.auth import get_current_user # noqa: E402
from core.database import get_session # noqa: E402
from services.llm.backends import BackendUnavailable # noqa: E402
from services.search.react_loop import ReactResult # noqa: E402
_DEEP = {"mode": "deep", "messages": [{"role": "user", "content": "콜드박스 위험성평가 찾아줘"}]}
async def _async_true() -> bool:
return True
async def _async_false() -> bool:
return False
def _build_app() -> FastAPI:
app = FastAPI()
app.include_router(eid_chat_router, prefix="/api/eid")
app.dependency_overrides[get_current_user] = lambda: types.SimpleNamespace(
id=1, username="test-user"
)
async def _fake_session():
yield None # deep 경로는 session 을 agentic_ask_loop 에 넘기기만(여기선 monkeypatch)
app.dependency_overrides[get_session] = _fake_session
return app
def _data_objs(raw: bytes) -> list[dict]:
out: list[dict] = []
for line in raw.split(b"\n"):
if line.startswith(b"data: ") and line[len(b"data: "):].strip() != b"[DONE]":
try:
out.append(json.loads(line[len(b"data: "):]))
except Exception:
pass
return out
@pytest_asyncio.fixture
async def client():
async with AsyncClient(
transport=ASGITransport(app=_build_app()), base_url="http://test"
) as ac:
yield ac
@pytest.fixture(autouse=True)
def _rules_present(monkeypatch):
# D-6 fail-closed 가드 통과 (substrate degraded 아님)
monkeypatch.setattr(eid_chat.eid_compose, "rules_present", lambda: True)
@pytest.mark.asyncio
async def test_deep_search_sse_shape(client, monkeypatch):
"""검색성 질문 → phase:searching + final content + eid_sources + DONE 순서."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
async def _fake_loop(session, query, *, backend, **kw):
return ReactResult(
final_answer="콜드박스 위험성평가는 TK-RA-2026-OT1-01 입니다.",
iterations=1,
partial=False,
sources=[{"id": 1, "doc_id": 10, "title": "OT1 콜드박스 위험성평가", "score": 0.91}],
)
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _fake_loop)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 200
objs = _data_objs(r.content)
assert "searching" in [o.get("phase") for o in objs if "phase" in o]
content = "".join(
o["choices"][0]["delta"]["content"] for o in objs if "choices" in o
)
assert "OT1-01" in content
srcs = [o["eid_sources"] for o in objs if "eid_sources" in o]
assert srcs and srcs[0][0]["title"] == "OT1 콜드박스 위험성평가"
assert b"data: [DONE]" in r.content
@pytest.mark.asyncio
async def test_deep_conversational_no_sources(client, monkeypatch):
"""대화성(검색 불요) → ReAct early-exit, sources 빈 배열."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
async def _chat_loop(session, query, *, backend, **kw):
return ReactResult(final_answer="안녕하세요, 이드입니다.", iterations=1, partial=False, sources=[])
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _chat_loop)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 200
objs = _data_objs(r.content)
srcs = [o["eid_sources"] for o in objs if "eid_sources" in o]
assert srcs and srcs[0] == [] # 검색 안 함 = 근거 카드 안 뜸
@pytest.mark.asyncio
async def test_deep_probe_fail_503(client, monkeypatch):
"""probe 실패(router 미도달) → 첫 바이트 전 503 macbook_unavailable."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_false)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 503
assert r.json()["error_reason"] == "macbook_unavailable"
@pytest.mark.asyncio
async def test_deep_midstream_error_envelope(client, monkeypatch):
"""검색 중 BackendUnavailable(AC 분리 등) → in-stream error envelope + DONE."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
async def _fail_loop(session, query, *, backend, **kw):
raise BackendUnavailable("qwen-macbook", "macbook_unavailable")
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _fail_loop)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 200 # 스트림 이미 시작(probe 통과) → 200 + in-stream error
objs = _data_objs(r.content)
errs = [o for o in objs if o.get("phase") == "error"]
assert errs and errs[0]["error_reason"] == "macbook_unavailable"
assert b"data: [DONE]" in r.content
+6 -2
View File
@@ -131,6 +131,8 @@ async def test_503_substrate_degraded(client, monkeypatch):
@pytest.mark.asyncio
async def test_503_backend_unavailable_prestream(client, monkeypatch):
# call_stream 회귀(prestream 503)는 daily 로 검증 — deep 은 이제 ReAct 별 경로
# (probe·agentic_ask_loop), deep 의 503/midstream 은 test_eid_chat_deep.py 가 커버.
async def fake_call_stream(self, mode, messages, system):
raise BackendUnavailable("qwen-macbook", "macbook_unavailable")
yield b"" # pragma: no cover — async generator 형태 유지용
@@ -138,7 +140,7 @@ async def test_503_backend_unavailable_prestream(client, monkeypatch):
monkeypatch.setattr(EidAIClient, "call_stream", fake_call_stream)
r = await client.post(
"/api/eid/chat",
json={"mode": "deep", "messages": [{"role": "user", "content": "x"}]},
json={"mode": "daily", "messages": [{"role": "user", "content": "x"}]},
)
assert r.status_code == 503
js = r.json()
@@ -192,9 +194,11 @@ async def test_200_midstream_abort_quiet(client, monkeypatch):
raise BackendUnavailable("qwen-macbook", "stream_deadline_exceeded")
monkeypatch.setattr(EidAIClient, "call_stream", fake_call_stream)
# call_stream midstream 회귀는 daily 로 — deep midstream 은 in-stream error envelope
# 경로(test_eid_chat_deep.test_deep_midstream_error_envelope)로 분리됨.
r = await client.post(
"/api/eid/chat",
json={"mode": "deep", "messages": [{"role": "user", "content": "x"}]},
json={"mode": "daily", "messages": [{"role": "user", "content": "x"}]},
)
assert r.status_code == 200
assert r.content == b'data: {"x": 1}\n\n'
+112
View File
@@ -0,0 +1,112 @@
"""GET /api/eid/status endpoint 테스트 — inline ASGI app (DB 의존 0).
실행 환경: fastapi + httpx 필요 test_eid_chat_endpoint.py 동일 idiom.
DB 0 / LLM 0: get_current_user dependency_overrides 대체, gate 점유는
llm_gate.gate_status monkeypatch (eid_chat 모듈 attribute 호출하므로 유효).
무인증 케이스는 실제 auth 경로지만 decode 단계에서 거부돼 DB 접근 반환.
"""
from __future__ import annotations
import sys
import types
from pathlib import Path
import pytest
import pytest_asyncio
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "app"))
from api.eid_chat import router as eid_chat_router # noqa: E402
from core.auth import get_current_user # noqa: E402
from services.search import llm_gate # noqa: E402
def _build_app(*, override_auth: bool = True) -> FastAPI:
"""main.py 등록 방식과 동일 prefix(/api/eid)로 라우터만 올린 inline app."""
app = FastAPI()
app.include_router(eid_chat_router, prefix="/api/eid")
if override_auth:
app.dependency_overrides[get_current_user] = lambda: types.SimpleNamespace(
id=1, username="test-user"
)
return app
@pytest_asyncio.fixture
async def client():
async with AsyncClient(
transport=ASGITransport(app=_build_app()), base_url="http://test"
) as ac:
yield ac
# ── 401 무인증 ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_unauthenticated_rejected():
async with AsyncClient(
transport=ASGITransport(app=_build_app(override_auth=False)),
base_url="http://test",
) as ac:
# 헤더 자체 부재 — HTTPBearer 단계 거부 (fastapi 기본 403, 버전별 401 허용)
r = await ac.get("/api/eid/status")
assert r.status_code in (401, 403)
# 위조 토큰 — decode_token 실패 → 401 (DB 접근 전 거부)
r2 = await ac.get(
"/api/eid/status", headers={"Authorization": "Bearer bogus-token"}
)
assert r2.status_code == 401
# ── 200 shape ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_200_shape(client, monkeypatch):
"""응답 shape — daily 키 아래 busy/inflight/waiters 3필드, 타입 고정."""
monkeypatch.setattr(
llm_gate, "gate_status", lambda: {"inflight": False, "waiters": 0}
)
r = await client.get("/api/eid/status")
assert r.status_code == 200, r.text
js = r.json()
assert set(js.keys()) == {"daily"}
assert set(js["daily"].keys()) == {"busy", "inflight", "waiters"}
assert isinstance(js["daily"]["busy"], bool)
assert isinstance(js["daily"]["inflight"], bool)
assert isinstance(js["daily"]["waiters"], int)
# ── busy 판정 — gate_status monkeypatch ──────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.parametrize(
"snap, expected",
[
# 유휴 — busy=false (근사: 외부 소비자 점유는 미포착)
(
{"inflight": False, "waiters": 0},
{"busy": False, "inflight": False, "waiters": 0},
),
# inflight 만 — busy=true (확실)
(
{"inflight": True, "waiters": 0},
{"busy": True, "inflight": True, "waiters": 0},
),
# waiters 만 — busy=true (inflight or waiters>0 의 or 분기)
(
{"inflight": False, "waiters": 3},
{"busy": True, "inflight": False, "waiters": 3},
),
],
)
async def test_busy_from_gate_status(client, monkeypatch, snap, expected):
monkeypatch.setattr(llm_gate, "gate_status", lambda: dict(snap))
r = await client.get("/api/eid/status")
assert r.status_code == 200, r.text
assert r.json() == {"daily": expected}
-32
View File
@@ -1,32 +0,0 @@
{
"id": "chatcmpl-80cd8ddc-7788-4605-b40e-3975fe7e1326",
"object": "chat.completion",
"created": 1781149952,
"model": "/Users/hyungi/mlx-models/Qwen3.6-27B-8bit",
"choices": [
{
"index": 0,
"finish_reason": "stop",
"message": {
"role": "assistant",
"content": "\uc81c\uacf5\ub41c \ubb38\uc11c\ub294 \uc555\ub825\uc6a9\uae30 \uac80\uc0ac\uc758 \uae30\uc900\uc774 \ub418\ub294 \uaddc\uc815\uc744 \uba85\uc2dc\ud558\uace0 \uc788\uc2b5\ub2c8\ub2e4. \ud575\uc2ec \ub0b4\uc6a9\uc740 \uc555\ub825\uc6a9\uae30\uc5d0 \ub300\ud55c \ubaa8\ub4e0 \uac80\uc0ac \uc808\ucc28\uc640 \uae30\uc900\uc774 'ASME Section VIII Div 1'\uc774\ub77c\ub294 \uad6d\uc81c\uc801\uc73c\ub85c \uc778\uc815\ubc1b\ub294 \uc555\ub825\uc6a9\uae30 \uc124\uacc4 \ubc0f \uc81c\uc791 \uaddc\uc815\uc5d0 \ub530\ub77c \uc5c4\uaca9\ud558\uac8c \uc218\ud589\ub418\uc5b4\uc57c \ud55c\ub2e4\ub294 \uac83\uc785\ub2c8\ub2e4. \uc774\ub294 \uc548\uc804\uc131\uacfc \uc2e0\ub8b0\uc131\uc744 \ubcf4\uc7a5\ud558\uae30 \uc704\ud55c \ud544\uc218\uc801\uc778 \uc694\uad6c\uc0ac\ud56d\uc73c\ub85c, \ud574\ub2f9 \uaddc\uc815\uc744 \uc900\uc218\ud568\uc73c\ub85c\uc368 \uc555\ub825\uc6a9\uae30\uc758 \uad6c\uc870\uc801 \ubb34\uacb0\uc131\uacfc \uc6b4\uc601 \uc548\uc804\uc131\uc744 \ud655\ubcf4\ud560 \uc218 \uc788\uc2b5\ub2c8\ub2e4. \ub530\ub77c\uc11c \uad00\ub828 \uc5c5\ubb34 \uc218\ud589 \uc2dc \ubc18\ub4dc\uc2dc \uc774 \uaddc\uc815\uc744 \ucc38\uc870\ud558\uc5ec \uac80\uc0ac\ub97c \uc9c4\ud589\ud574\uc57c \ud569\ub2c8\ub2e4.",
"reasoning": null,
"tool_calls": null,
"tool_call_id": null,
"name": null
},
"logprobs": null
}
],
"usage": {
"prompt_tokens": 44,
"completion_tokens": 118,
"total_tokens": 162,
"prompt_tokens_details": {
"cached_tokens": 0
},
"prompt_tps": 0.0,
"generation_tps": 0.0,
"peak_memory": 29.804702642
}
}
-159
View File
@@ -1,159 +0,0 @@
"""ds-macbook-offload-1 P2-4 — deep 슬롯 라우팅 / 보류(StageDeferred) / drain 가드 테스트.
DB 불요(unit) AIClient __new__ settings 우회, drain 가드는 settings monkeypatch.
통합(보류 백오프 DB 기록, claim 경합) P3-2 E2E 게이트에서 라이브 실측.
fixture = tests/fixtures/qwen_router_chat_completion.json (2026-06-11 라이브 박제
라우터 :8890 경유 model=qwen-macbook, production 호출 형상과 동일 body, 13.2s 실측).
"""
import json
from pathlib import Path
from types import SimpleNamespace
import httpx
import pytest
from ai.client import AIClient, call_deep_or_defer, is_deferrable_error
from models.queue import StageDeferred
FIXTURE = Path(__file__).parent / "fixtures" / "qwen_router_chat_completion.json"
def _client(deep_cfg, primary_cfg):
"""settings 비의존 AIClient — __init__ 우회 후 ai 슬롯만 주입."""
client = AIClient.__new__(AIClient)
client.ai = SimpleNamespace(deep=deep_cfg, primary=primary_cfg)
return client
def _http_status_error(status: int) -> httpx.HTTPStatusError:
req = httpx.Request("POST", "http://router:8890/v1/chat/completions")
resp = httpx.Response(status, request=req)
return httpx.HTTPStatusError(f"status {status}", request=req, response=resp)
# ─── is_deferrable_error 분류 ──────────────────────────────────────────────
@pytest.mark.parametrize("exc", [
_http_status_error(503), # 라우터 upstream_cold/editor_busy/warming
_http_status_error(502), # 라우터: upstream 연결 실패/생성 중 절단 변환
_http_status_error(504),
httpx.ConnectError("connection refused"), # 라우터 자체 불가
httpx.ConnectTimeout("connect timeout"),
httpx.ReadTimeout("read timeout"), # DS↔라우터 구간 절단
httpx.ReadError("connection reset"),
httpx.RemoteProtocolError("server disconnected"),
])
def test_deferrable_errors(exc):
assert is_deferrable_error(exc) is True
@pytest.mark.parametrize("exc", [
_http_status_error(400), # unknown alias 등 — 설정 오류는 보류 아님
_http_status_error(500),
ValueError("parse"),
RuntimeError("boom"),
])
def test_non_deferrable_errors(exc):
assert is_deferrable_error(exc) is False
# ─── call_deep 슬롯 선택 ───────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_call_deep_uses_deep_slot():
deep = SimpleNamespace(model="qwen-macbook")
primary = SimpleNamespace(model="gemma-26b")
client = _client(deep, primary)
captured = {}
async def fake_request(cfg, prompt, system=None):
captured["cfg"] = cfg
return "ok"
client._request = fake_request
assert await client.call_deep("p") == "ok"
assert captured["cfg"] is deep
@pytest.mark.asyncio
async def test_call_deep_falls_back_to_primary_when_slot_absent():
"""슬롯 부재 = 기능 미활성 (방어적 primary — silent 강등이 아니라 기존 경로 그대로)."""
primary = SimpleNamespace(model="gemma-26b")
client = _client(None, primary)
captured = {}
async def fake_request(cfg, prompt, system=None):
captured["cfg"] = cfg
return "ok"
client._request = fake_request
await client.call_deep("p")
assert captured["cfg"] is primary
# ─── call_deep_or_defer 보류 변환 ──────────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.parametrize("exc", [
_http_status_error(503),
httpx.ConnectError("refused"),
httpx.ReadTimeout("cut mid-generation"),
])
async def test_defer_conversion(exc):
client = _client(SimpleNamespace(model="qwen-macbook"), None)
async def fail_request(cfg, prompt, system=None):
raise exc
client._request = fail_request
with pytest.raises(StageDeferred):
await call_deep_or_defer(client, "p")
@pytest.mark.asyncio
async def test_non_deferrable_propagates():
"""400/일반 오류는 StageDeferred 아님 — 호출자 기존 실패 경로로 전파."""
client = _client(SimpleNamespace(model="qwen-macbook"), None)
async def fail_request(cfg, prompt, system=None):
raise _http_status_error(400)
client._request = fail_request
with pytest.raises(httpx.HTTPStatusError):
await call_deep_or_defer(client, "p")
def test_stage_deferred_carries_backoff():
e = StageDeferred("macbook_unavailable:ConnectError")
assert e.retry_after_minutes == 30
def test_router_fixture_shape():
"""_request 파싱 경로(choices[0].message.content)가 라우터 실응답 형상과 일치하는지 고정."""
data = json.loads(FIXTURE.read_text())
content = data["choices"][0]["message"]["content"]
assert isinstance(content, str) and len(content) > 0
assert data["choices"][0]["message"]["role"] == "assistant"
# 라우터가 alias 를 upstream 로컬 경로로 치환해 응답 — 실처리 모델 추적 가능
assert "Qwen3.6-27B-8bit" in data["model"]
# ─── drain 가드 (silent 강등 금지) ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_drain_requires_deep_slot(monkeypatch):
import workers.queue_drain as qd
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=None)))
with pytest.raises(SystemExit):
await qd.drain("summarize", 1)
@pytest.mark.asyncio
async def test_drain_rejects_non_drain_stage(monkeypatch):
import workers.queue_drain as qd
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=object())))
with pytest.raises(SystemExit):
await qd.drain("classify", 1)