feat(eid): 이드 채팅 표면 — /api/eid/chat SSE 스트리밍 + /chat 페이지 (P1)

- compose: eid_chat surface 등록(persona+rules, 자유-prose) + rules_present() 라이브 판정(D-6 fail-closed)
- EidAIClient.call_stream: 닫힌 mode 매핑(daily→mac-mini-default/deep→qwen-macbook), router 경유,
  MLX gate(FOREGROUND)+wall-clock 300s deadline, SSE 라인 relay(model→mode 치환·usage 제거),
  router 400 fail-loud, error_reason allowlist sanitize
- POST /api/eid/chat: JWT, role=system 422 거부, 8000자/40턴/총량 32000 cap,
  503 error_reason(ask 컨벤션), 본문 무로깅
- frontend /chat: 이드 표면 문법(일상/심층, 모델·머신명 비노출), SSE 파서(경계 buf·flush·[DONE]),
  error_reason UX, 8000자 선차단+422 오염 차단, localStorage 이력(logout 시 제거), nav 등록
- Caddyfile: encode 명시 match로 text/event-stream gzip 버퍼링 제외
- tests: 신규 32+ (fixture: router 경유 26B/27B SSE 박제), tests/eid 61 + ask 회귀 9 = 70 passed
- 적대 리뷰 3렌즈 18 finding 반영 13/13. 배포는 D26 게이트(fix/hwp 머지+Soft Lock) 대기

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-11 10:51:39 +09:00
parent d3aa640f65
commit cd06ef0403
16 changed files with 1641 additions and 3 deletions
+168
View File
@@ -0,0 +1,168 @@
"""이드 채팅 표면 — POST /api/eid/chat (eid-chat 트랙).
확정 결정:
- D-1 경로 = /api/eid/chat (main.py prefix=/api/eid + 본 라우터 POST /chat)
- D-2 mode 닫힌 어휘: daily(mac-mini-default) / deep(qwen-macbook). 클라는 mode 만 보냄 —
claude-cloud / auto 금지 (Literal 로 422 차단). 심층(deep) 모드 무게이트.
- D-3 독립 /chat 라우트 (frontend) — 본 모듈은 백엔드 API 만.
- D-5 LLM 호출 = EidAIClient.call_stream 한 곳 (이드 egress 봉쇄 불변식 #5,
RouterBackend 직접 호출 금지).
- D-6 rules.md 부재 = 503 substrate_degraded fail-closed — 다른 표면의 degraded 배너
컨벤션(compose._rules)과 달리 채팅은 진행 자체를 거부.
응답 = router SSE 라인 단위 중계 (text/event-stream — call_stream 이 model 필드를 mode
어휘로 치환·usage 제거, 프레이밍 보존. 본 모듈은 무변형 relay). 스트림 시작 전
backend 실패는 /api/search/ask 와 동일 shape 의 503 + error_reason 매핑(자동 fallback 0).
로그는 메타 1줄(mode·턴수·status)만 — 대화 본문 로깅 0.
"""
from __future__ import annotations
from typing import Annotated, Literal
import httpx
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field, field_validator, model_validator
from core.auth import get_current_user
from core.utils import setup_logger
from eid import compose as eid_compose
from eid.ai import EidAIClient
from models.user import User
from services.llm.backends import BackendUnavailable
logger = setup_logger("eid_chat")
router = APIRouter()
class ChatMessage(BaseModel):
"""채팅 턴 1건. role=system 은 Literal 밖 → 422 (system 합본은 서버 compose 만 주입)."""
role: Literal["user", "assistant"]
content: str = Field(min_length=1, max_length=8000)
# 대화 총량 cap (전 메시지 content 합) — per-message 8000·40턴 제한과 별도의 총량 상한
_TOTAL_CONTENT_CAP = 32000
class ChatRequest(BaseModel):
"""POST /api/eid/chat body. mode 는 닫힌 어휘(D-2), messages 는 1~40턴 + 총량 32000자."""
mode: Literal["daily", "deep"]
messages: list[ChatMessage] = Field(min_length=1, max_length=40)
@field_validator("messages")
@classmethod
def _last_turn_is_user(cls, v: list[ChatMessage]) -> list[ChatMessage]:
if v and v[-1].role != "user":
raise ValueError("마지막 메시지는 role=user 여야 합니다")
return v
@model_validator(mode="after")
def _total_content_cap(self) -> "ChatRequest":
if sum(len(m.content) for m in self.messages) > _TOTAL_CONTENT_CAP:
raise ValueError(
"대화 총량 초과 — 새 대화로 시작하거나 입력을 줄여주세요 "
f"(전체 메시지 합 {_TOTAL_CONTENT_CAP}자 제한)"
)
return self
@router.post("/chat")
async def eid_chat(
body: ChatRequest,
user: Annotated[User, Depends(get_current_user)],
):
"""이드 채팅 — router SSE 스트리밍 pass-through.
503 두 경로 (둘 다 자동 fallback 없음):
- substrate_degraded: rules.md 부재 (D-6 fail-closed, 채팅 진행 거부)
- backend_unavailable: 스트림 시작 전 backend 실패 (ask 컨벤션과 동일 shape)
"""
# D-6: rules 부재 = fail-closed. 채팅은 안전·정책 가드 없이 진행하지 않는다(배너 X).
if not eid_compose.rules_present():
logger.error(
"eid_chat substrate_degraded mode=%s turns=%d status=503 — rules.md 부재, 채팅 거부",
body.mode, len(body.messages),
)
return JSONResponse(
status_code=503,
content={
"detail": (
"이드 substrate 가 degraded 상태입니다 (운영 규칙 rules.md 부재). "
"복구 전까지 채팅을 진행하지 않습니다."
),
"error_reason": "substrate_degraded",
},
)
system = eid_compose.compose("eid_chat", task="")
client = EidAIClient()
stream = client.call_stream(
body.mode, [m.model_dump() for m in body.messages], system,
)
# async generator 는 첫 __anext__ 에서야 실제 요청 전송 — 스트림 시작 전 실패(연결/4xx/5xx)
# 를 503 으로 매핑하기 위해 첫 chunk 를 여기서 먼저 당긴다.
try:
first = await anext(stream, None)
except BackendUnavailable as exc:
logger.warning(
"eid_chat backend_unavailable mode=%s turns=%d status=503 reason=%s",
body.mode, len(body.messages), exc.reason,
)
await client.close()
return JSONResponse(
status_code=503,
content={
"error": "backend_unavailable",
"error_reason": exc.reason,
"backend_requested": exc.backend_name,
"detail": (
"선택한 모드의 backend 가 일시적으로 응답할 수 없습니다. "
"잠시 후 다시 시도하거나 mode 를 바꿔 호출하세요."
),
},
)
except BaseException:
await client.close()
raise
# 메타 로그 1줄 — 본문 로깅 0 (대화 내용은 어디에도 남기지 않는다)
logger.info(
"eid_chat stream mode=%s turns=%d status=200", body.mode, len(body.messages)
)
async def _passthrough():
# call_stream 방출분 무변형 relay (정화는 call_stream 라인 단위 한 곳). 취소·
# disconnect 포함 finally 에서 generator aclose → AsyncExitStack 이 upstream 정리.
try:
try:
if first is not None:
yield first
async for chunk in stream:
yield chunk
except (BackendUnavailable, httpx.HTTPError) as exc:
# 스트림 시작 후 절단 — status 200 은 이미 송신돼 재매핑 불가. 메타 로그
# 1줄만 남기고 조용히 종료(traceback 전파 0) — 프론트는 [DONE] 부재로 처리.
logger.warning(
"eid_chat stream aborted mode=%s turns=%d reason=%s",
body.mode, len(body.messages),
getattr(exc, "reason", type(exc).__name__),
)
return
finally:
# stream.aclose() 가 예외여도 client.close() 는 보장 (중첩 finally)
try:
await stream.aclose()
finally:
await client.close()
return StreamingResponse(
_passthrough(),
media_type="text/event-stream",
headers={"Cache-Control": "no-store", "X-Accel-Buffering": "no"},
)
+193
View File
@@ -11,11 +11,116 @@ endpoint 를 못 부른다(silent fallback 0, rules no-silent-fallback).
- _request() → endpoint 에 anthropic.com 있으면 raise(primary 오결선 방어, 이중보증)
call_primary / call_triage / embed / rerank 는 그대로(내부 inference·임베딩 허용).
egress 워커·시스템 경로는 기존 AIClient 유지 — fallback 은 시스템만, 이드만 박탈(분리).
eid-chat (D-5): 이드 채팅 SSE 스트리밍도 이 클래스의 call_stream() 한 곳 — RouterBackend
직접 호출 금지, mode 어휘는 _CHAT_ALIAS 닫힌 매핑(daily/deep)만, 미지 mode = EidEgressBlocked.
"""
from __future__ import annotations
import asyncio
import json
import re
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
import httpx
from ai.client import AIClient
from services.llm.backends import (
MAC_MINI_DEFAULT,
QWEN_MACBOOK,
BackendUnavailable,
_router_url, # router URL 단일 출처 재사용 (settings → env LLM_ROUTER_URL → MVP default)
)
from services.search.llm_gate import Priority, acquire_mlx_gate
# 이드 채팅 mode → router alias 닫힌 매핑 (D-2). 클라는 mode 만 보냄 — claude-cloud/auto 금지.
_CHAT_ALIAS: dict[str, str] = {
"daily": MAC_MINI_DEFAULT, # router tier_b → Mac mini :8801 gemma-4-26b
"deep": QWEN_MACBOOK, # router named upstream → M5 Max Qwen3.6-27B (무게이트, D-2)
}
# read 는 per-chunk 적용이라 MacBook wake(24s)+토큰 생성 간격 커버. connect 는 내부 router 라 짧게.
_STREAM_TIMEOUT = httpx.Timeout(connect=5.0, read=120.0, write=30.0, pool=5.0)
# 스트림 중계 전체(업스트림 진입~종료) wall-clock 상한. per-chunk read timeout 만으로는
# 토큰이 계속 흐르는 한 무한 점유 가능 → daily 는 mlx gate 를 물고 있어 deadline 필수.
# deep 도 동일 적용(단순·일관). 정상 스트림(max_tokens 2048, ~90tps ≈ 23s)은 여유 통과.
_STREAM_DEADLINE_S = 300.0
# error_reason allowlist — 이 밖(대문자/공백/JSON 직렬화 파편)은 일반화해 비노출
_REASON_ALLOWED = re.compile(r"[a-z0-9_]{1,64}")
# 스트림 시작 전 transport 계열 실패 → BackendUnavailable 매핑 대상 (RouterBackend._post 와 동일 목록)
_TRANSPORT_ERRORS = (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.PoolTimeout,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
)
def _stream_error_reason(status_code: int, body: bytes) -> str:
"""스트림 시작 전 4xx/5xx 응답 본문 → error_reason 추출.
어휘는 /api/search/ask(RouterBackend._post)와 일치 — router 가 주는 error.type /
error.error_reason (macbook_unavailable / warming / editor_busy / upstream_cold /
provider_not_configured 등) 우선, 없으면 status 기반 router_503 / upstream_502 /
router_http_<status>.
최종 reason 은 [a-z0-9_]{1,64} allowlist 검사 — 불일치(대문자/공백/dict 직렬화
파편)는 upstream_502(502 계열) / router_error(그 외) 로 일반화해 외부 비노출.
"""
try:
data = json.loads(body.decode("utf-8", errors="replace"))
except Exception:
data = {}
err = data.get("error", {}) if isinstance(data, dict) else {}
reason: str | None = None
if isinstance(err, dict):
raw = err.get("type") or err.get("error_reason")
if raw:
reason = str(raw)
if reason is None and isinstance(data, dict) and data.get("error_reason"):
reason = str(data["error_reason"])
if reason is None:
if status_code == 502:
reason = "upstream_502"
elif status_code == 503:
reason = "router_503"
else:
reason = f"router_http_{status_code}"
if _REASON_ALLOWED.fullmatch(reason):
return reason
return "upstream_502" if status_code == 502 else "router_error"
def _rewrite_sse_line(line: bytes, mode: str) -> bytes:
"""SSE 라인 1건 정화 — data: JSON 의 model 을 mode 어휘로 치환 + usage 제거.
fixture 실측: 27B chunk 의 model 필드가 맥북 파일시스템 절대경로
("/Users/.../mlx-models/Qwen3.6-27B-8bit")를 노출 — 표면 문법 '모델·머신명
비노출'과 충돌해 라인 단위로 재작성한다. usage(tps/peak_memory 등 머신
텔레메트리)도 함께 제거. [DONE]·비-data 라인(빈 줄 포함)·파싱 실패 라인은
원문 그대로(방어적) — SSE 프레이밍(data: 라인 + 빈 줄) 보존.
"""
if not line.startswith(b"data: "):
return line
payload = line[len(b"data: "):]
if payload.strip() == b"[DONE]":
return line
try:
obj = json.loads(payload)
except Exception:
return line
if not isinstance(obj, dict):
return line
obj["model"] = mode
obj.pop("usage", None)
return b"data: " + json.dumps(obj, ensure_ascii=False).encode("utf-8")
class EidEgressBlocked(RuntimeError):
@@ -39,3 +144,91 @@ class EidAIClient(AIClient):
if "anthropic.com" in endpoint:
raise EidEgressBlocked(f"이드: 외부 endpoint 차단 ({endpoint}). 내부 inference 만.")
return await super()._request(model_config, prompt, system=system)
async def call_stream(
self, mode: str, messages: list[dict], system: str
) -> AsyncIterator[bytes]:
"""이드 채팅 SSE 스트림 — router /v1/chat/completions stream=true 라인 단위 중계 (D-5).
mode : "daily" | "deep" — _CHAT_ALIAS 닫힌 매핑. 미지 mode = EidEgressBlocked
(이드 LLM 호출 봉쇄는 이 클래스 한 곳, 불변식 #5).
messages : user/assistant 턴 목록 (system role 금지 — system 인자로만 주입).
system : compose("eid_chat", ...) 합본. messages 맨 앞에 system role 로 끼움.
스트림 시작 전 실패(연결 실패·5xx 응답) = BackendUnavailable(reason 어휘는 ask
와 동일). router 400 = 닫힌 매핑에서 alias drift 코드 버그 → ValueError fail-loud
(RouterBackend._post 컨벤션 미러). 스트림 시작 후엔 bytes 를 라인 버퍼링해
_rewrite_sse_line 으로 model 치환(mode 어휘)·usage 제거만 하고 프레이밍은 보존.
취소/disconnect 시 AsyncExitStack 이 response·client 정리(upstream 닫힘 보장).
daily(mac-mini-default)는 Mac mini MLX 단일 inference 영구 룰(llm_gate docstring
"예외 없이 gate 획득 필수")에 따라 acquire_mlx_gate(FOREGROUND) 안에서 스트리밍 —
RouterBackend 의 requires_gate=True 와 동일한 client-side mutex 효과.
deep(qwen-macbook)은 별 endpoint 라 무게이트 (D-2, RouterBackend 동형).
중계 전체(업스트림 진입~종료)는 asyncio.timeout(_STREAM_DEADLINE_S) wall-clock
deadline 안 — llm_gate 계약 "timeout 은 gate 안쪽" 준수(gate 대기엔 미적용).
초과 시 BackendUnavailable(alias, "stream_deadline_exceeded") 로 수렴.
"""
alias = _CHAT_ALIAS.get(mode)
if alias is None:
raise EidEgressBlocked(
f"이드: 미지 chat mode {mode!r} — 닫힌 매핑(daily/deep) 외 호출 차단."
)
router_url = _router_url()
if "anthropic.com" in router_url:
# 기존 _request 패턴 미러 — router URL 오결선 시 외부 egress 방어 (이중보증)
raise EidEgressBlocked(f"이드: 외부 endpoint 차단 ({router_url}). 내부 router 만.")
url = f"{router_url.rstrip('/')}/v1/chat/completions"
payload = {
"model": alias,
"messages": [{"role": "system", "content": system}] + messages,
"stream": True,
"max_tokens": 2048,
"temperature": 0.4,
}
async with AsyncExitStack() as stack:
if alias == MAC_MINI_DEFAULT:
await stack.enter_async_context(acquire_mlx_gate(Priority.FOREGROUND))
client = await stack.enter_async_context(httpx.AsyncClient(timeout=_STREAM_TIMEOUT))
try:
# wall-clock deadline — gate 획득 *후* 진입 (llm_gate "timeout 은 gate 안쪽")
async with asyncio.timeout(_STREAM_DEADLINE_S):
try:
resp = await stack.enter_async_context(
client.stream("POST", url, json=payload)
)
except _TRANSPORT_ERRORS as exc:
# 스트림 시작 전 연결 계열 실패 — reason 어휘 = RouterBackend(router_*) 와 일치
raise BackendUnavailable(alias, f"router_{type(exc).__name__}") from exc
if resp.status_code == 400:
# 닫힌 매핑에서 400 = alias drift 코드 버그 — RouterBackend._post 미러,
# BackendUnavailable(일시 비가용) 아님 → fail-loud
body = await resp.aread()
try:
data = json.loads(body.decode("utf-8", errors="replace"))
except Exception:
data = {}
raise ValueError(f"router rejected alias={alias!r} body={data!r}")
if resp.status_code >= 400:
body = await resp.aread()
raise BackendUnavailable(
alias, _stream_error_reason(resp.status_code, body)
)
buf = b""
try:
async for chunk in resp.aiter_bytes():
buf += chunk
# 라인 버퍼링 — 청크 경계에서 b"\n" 분리, 잔여 버퍼 유지
while (nl := buf.find(b"\n")) != -1:
line, buf = buf[:nl], buf[nl + 1:]
yield _rewrite_sse_line(line, mode) + b"\n"
except _TRANSPORT_ERRORS as exc:
# 시작 후 중단 — 이미 보낸 chunk 는 전송됨. typed 예외로 수렴(caller 가 끊고 정리).
raise BackendUnavailable(alias, f"router_{type(exc).__name__}") from exc
if buf:
# 스트림 끝 잔여분 flush (개행 없는 마지막 라인 — 원문에 없던 \n 추가 안 함)
yield _rewrite_sse_line(buf, mode)
except TimeoutError as exc:
# asyncio.timeout 초과 — 게이트 점유 무한화 차단, typed 예외로 수렴
raise BackendUnavailable(alias, "stream_deadline_exceeded") from exc
+13
View File
@@ -50,6 +50,8 @@ _ROUTE: dict[str, dict] = {
"react_ask": {"overlay": None, "variant": "full"},
"study_subject_note": {"overlay": None, "variant": "full"},
"study_question_explanation": {"overlay": None, "variant": "full"},
# 이드 채팅 표면 (D-1 /api/eid/chat) — 자유-prose(base), persona ON (불변식 #3)
"eid_chat": {"overlay": None, "variant": "full"},
# 미래 active eid 표면 — 기능 overlay (W3+ 에서 호출 배선)
"study_diagnosis": {"overlay": "study", "variant": "full"},
"document_brief": {"overlay": "document", "variant": "full"},
@@ -113,6 +115,17 @@ def is_composed_surface(surface: str) -> bool:
return surface in _ROUTE
def rules_present() -> bool:
"""rules.md 존재 여부 — 채팅 표면(D-6)의 fail-closed 판정 재료.
기존 _rules() 의 degraded 배너 컨벤션(다른 표면, fail-loud 진행)은 그대로 둔다 —
여긴 '진행 거부' 판정만 제공하고 강제는 호출부(/api/eid/chat) 책임.
lru_cache 된 _read 를 쓰지 않고 매 호출 직접 stat — D-6 게이트는 살아있는 판정
이어야 한다(캐시 동결 시 rules.md 부재/복구가 영원히 반영 안 됨).
"""
return (_SUBSTRATE_DIR / "rules.md").is_file()
def compose(surface: str, task: str, *, variant: str | None = None,
budget_chars: int | None = None) -> str:
"""persona → rules → overlay → task 단일 system 문자열 합성.
+3
View File
@@ -17,6 +17,7 @@ from api.digest import router as digest_router
from api.document_notes import router as document_notes_router
from api.document_reads import router as document_reads_router
from api.documents import router as documents_router
from api.eid_chat import router as eid_chat_router
from api.events import router as events_router
from api.library import router as library_router
from api.memos import router as memos_router
@@ -174,6 +175,8 @@ app.include_router(documents_router, prefix="/api/documents", tags=["documents"]
app.include_router(document_reads_router, prefix="/api/documents", tags=["document-reads"])
app.include_router(document_notes_router, prefix="/api/documents", tags=["document-notes"])
app.include_router(search_router, prefix="/api/search", tags=["search"])
# 이드 채팅 표면 (D-1) — POST /api/eid/chat. SSE 스트리밍, EidAIClient.call_stream 봉쇄 경유.
app.include_router(eid_chat_router, prefix="/api/eid", tags=["eid-chat"])
app.include_router(memos_router, prefix="/api/memos", tags=["memos"])
app.include_router(events_router, prefix="/api/events", tags=["events"])